feat: handle job run error and change life-cycle status
This commit is contained in:
2
.vscode/launch.json
vendored
2
.vscode/launch.json
vendored
@@ -10,7 +10,7 @@
|
||||
"request": "launch",
|
||||
"mode": "auto",
|
||||
"env": {
|
||||
"FLINK_API_URL": "127.0.0.1:8081",
|
||||
"FLINK_API_URL": "flink.bz2:8081",
|
||||
"SAVEPOINT_PATH": "/opt/flink/savepoints"
|
||||
},
|
||||
"cwd": "${workspaceFolder}",
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
|
||||
job := GetJob(jobUid)
|
||||
pkg.Logger.Debug("[patch-job]", zap.Any("jobUid", jobUid))
|
||||
|
||||
patchBytes, err := json.Marshal(patchData)
|
||||
if err != nil {
|
||||
|
||||
@@ -54,6 +54,7 @@ var (
|
||||
ErrNoJarId = errors.New("[managed-job] no jar id")
|
||||
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
|
||||
ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
|
||||
ErrOnStartingJob = errors.New("[managed-job] error on starting job")
|
||||
)
|
||||
|
||||
type JobStatus string
|
||||
|
||||
@@ -26,7 +26,6 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
|
||||
}
|
||||
defer watcher.Stop()
|
||||
for event := range watcher.ResultChan() {
|
||||
pkg.Logger.Debug("[crd] event received", zap.Any("type", event.Type))
|
||||
unstructuredJob := event.Object.(*unstructured.Unstructured)
|
||||
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
|
||||
if err != nil {
|
||||
@@ -50,10 +49,10 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
|
||||
switch event.Type {
|
||||
case watch.Bookmark:
|
||||
case watch.Modified:
|
||||
pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
|
||||
//pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
|
||||
crd.repsert(job)
|
||||
case watch.Added:
|
||||
pkg.Logger.Info("[crd] [watch] new flink job created")
|
||||
//pkg.Logger.Info("[crd] [watch] new flink job created")
|
||||
crd.repsert(job)
|
||||
case watch.Deleted:
|
||||
}
|
||||
|
||||
@@ -48,10 +48,17 @@ func (job *ManagedJob) run(restoreMode bool) error {
|
||||
jobId = &runJarResp.JobId
|
||||
break
|
||||
} else {
|
||||
if strings.ContainsAny(err.Error(), ".jar does not exist") {
|
||||
if strings.Contains(err.Error(), ".jar does not exist") {
|
||||
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
|
||||
shouldUpload = true
|
||||
} else {
|
||||
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
|
||||
stringErr := err.Error()
|
||||
job.def.Status.Error = &stringErr
|
||||
job.def.Status.JobStatus = ""
|
||||
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
|
||||
job.crd.SetJobStatus(job.def.UID, job.def.Status)
|
||||
return v1alpha1.ErrOnStartingJob
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -66,6 +73,7 @@ func (job *ManagedJob) run(restoreMode bool) error {
|
||||
})
|
||||
return nil
|
||||
}
|
||||
shouldUpload = false
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -58,6 +58,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
|
||||
|
||||
// Loop over job definitions as Kubernetes CRD
|
||||
for _, uid := range crd.GetAllJobKeys() {
|
||||
pkg.Logger.Debug("mgr.processingJobsIds", zap.Any("processingJobIds", mgr.processingJobsIds))
|
||||
if lo.Contains(mgr.processingJobsIds, uid) {
|
||||
pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid))
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user