feat(savepoint): create and track for getting savepoint location

This commit is contained in:
2024-11-30 02:11:30 +03:30
parent 3b0aff5688
commit 8f4cb093b1
7 changed files with 66 additions and 22 deletions

View File

@@ -58,14 +58,14 @@ func (job *ManagedJob) cycle() {
if errors.Is(err, ErrNoJobId) {
job.state = nil
}
if job.state.LastSavepointDate == nil || time.Now().Add(-time.Minute*3).After(*job.state.LastSavepointDate) {
if job.state.SavepointTriggerId == nil {
job.createSavepoint()
} else {
job.trackSavepoint()
}
}
return
}
//if job.state.LastSavepointDate == nil || time.Now().Add(-time.Minute*3).After(*job.state.LastSavepointDate) {
err := job.createSavepoint()
if errors.Is(err, ErrNoJobId) {
job.state = nil
}
//}
}

View File

@@ -1,7 +1,10 @@
package managed_job
import (
"strings"
"gitea.com/logicamp/lc"
api "github.com/logi-camp/go-flink-client"
"go.uber.org/zap"
)
@@ -16,5 +19,29 @@ func (job ManagedJob) createSavepoint() error {
return err
}
lc.Logger.Debug("[managed-job] [savepoint]", zap.Any("savepoint-resp", resp))
job.setSavepointTriggerId(resp.RequestID)
return nil
}
func (job ManagedJob) trackSavepoint() error {
if job.state.JobId == nil {
lc.Logger.Debug("[managed-job] [savepoint] no job id")
return ErrNoJobId
}
if job.state.SavepointTriggerId == nil {
lc.Logger.Debug("[managed-job] [savepoint] no job id")
return ErrNoSavepointTriggerId
}
resp, err := job.client.TrackSavepoint(*job.state.JobId, *job.state.SavepointTriggerId)
lc.Logger.Debug("[managed-job] [savepoint] track savepoint", zap.Any("resp", resp), zap.Error(err))
if err != nil {
if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 {
job.removeSavepointTriggerId()
}
}
if resp.Status.Id == api.SavepointStatusInCompleted {
job.setSavepointLocation(resp.Operation.Location)
}
return nil
}

View File

@@ -47,9 +47,25 @@ func (job *ManagedJob) setError(errMsg string) {
job.updateState(*job.state)
}
func (job *ManagedJob) setSavepointId(savepointId string) {
job.state.LastSavepointId = &savepointId
func (job *ManagedJob) setSavepointLocation(savepointId string) {
job.state.LastSavepointLocation = &savepointId
job.state.SavepointTriggerId = nil
n := time.Now()
job.state.LastSavepointDate = &n
job.updateState(*job.state)
}
func (job *ManagedJob) setSavepointTriggerId(savepointReqId string) {
job.state.SavepointTriggerId = &savepointReqId
job.updateState(*job.state)
}
func (job *ManagedJob) removeSavepointTriggerId() {
job.state.SavepointTriggerId = nil
job.updateState(*job.state)
}
func (job *ManagedJob) setStatus(status JobStatus) {
job.state.Status = status
job.updateState(*job.state)
}

View File

@@ -23,10 +23,7 @@ func (job *ManagedJob) checkStatus() error {
}
return err
}
lc.Logger.Debug("[managed-job] [status]", zap.Any("status-resp", statusResp))
job.updateState(jobState{
JobId: job.state.JobId,
Status: JobStatus(statusResp.State),
})
//lc.Logger.Debug("[managed-job] [status]", zap.Any("status-resp", statusResp))
job.setStatus(JobStatus(statusResp.State))
return err
}

View File

@@ -8,7 +8,8 @@ import (
type JobStatus string
var (
ErrNoJobId = errors.New("[managed-job] no job id")
ErrNoJobId = errors.New("[managed-job] no job id")
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
)
const (
@@ -20,10 +21,11 @@ const (
)
type jobState struct {
Status JobStatus `json:"status"`
Error *string `json:"error"`
Info *string `json:"info"`
JobId *string `json:"job_id"`
LastSavepointId *string `json:"last_savepoint_id"`
LastSavepointDate *time.Time `json:"last_savepoint_time"`
Status JobStatus `json:"status"`
Error *string `json:"error"`
Info *string `json:"info"`
JobId *string `json:"job_id"`
LastSavepointLocation *string `json:"last_savepoint_location"`
SavepointTriggerId *string `json:"savepoint_trigger_id"`
LastSavepointDate *time.Time `json:"last_savepoint_time"`
}