diff --git a/.vscode/launch.json b/.vscode/launch.json index e4bbc26..5208140 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -10,7 +10,7 @@ "request": "launch", "mode": "auto", "env": { - "FLINK_API_URL": "127.0.0.1:8081", + "FLINK_API_URL": "flink.bz2:8081", "SAVEPOINT_PATH": "/opt/flink/savepoints" }, "cwd": "${workspaceFolder}", diff --git a/internal/crd/patch.go b/internal/crd/patch.go index afda2e4..523c0f2 100644 --- a/internal/crd/patch.go +++ b/internal/crd/patch.go @@ -14,6 +14,7 @@ import ( func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error { job := GetJob(jobUid) + pkg.Logger.Debug("[patch-job]", zap.Any("jobUid", jobUid)) patchBytes, err := json.Marshal(patchData) if err != nil { diff --git a/internal/crd/v1alpha1/flink_job.go b/internal/crd/v1alpha1/flink_job.go index 54acb62..550dbdf 100644 --- a/internal/crd/v1alpha1/flink_job.go +++ b/internal/crd/v1alpha1/flink_job.go @@ -54,6 +54,7 @@ var ( ErrNoJarId = errors.New("[managed-job] no jar id") ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id") ErrNoSavepointPath = errors.New("[managed-job] no savepoint path") + ErrOnStartingJob = errors.New("[managed-job] error on starting job") ) type JobStatus string diff --git a/internal/crd/watch.go b/internal/crd/watch.go index fd4e7b9..aa12a74 100644 --- a/internal/crd/watch.go +++ b/internal/crd/watch.go @@ -26,7 +26,6 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable { } defer watcher.Stop() for event := range watcher.ResultChan() { - pkg.Logger.Debug("[crd] event received", zap.Any("type", event.Type)) unstructuredJob := event.Object.(*unstructured.Unstructured) unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object) if err != nil { @@ -50,10 +49,10 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable { switch event.Type { case watch.Bookmark: case watch.Modified: - pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName())) + //pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName())) crd.repsert(job) case watch.Added: - pkg.Logger.Info("[crd] [watch] new flink job created") + //pkg.Logger.Info("[crd] [watch] new flink job created") crd.repsert(job) case watch.Deleted: } diff --git a/internal/managed_job/run.go b/internal/managed_job/run.go index 01adae7..4f3553b 100644 --- a/internal/managed_job/run.go +++ b/internal/managed_job/run.go @@ -48,10 +48,17 @@ func (job *ManagedJob) run(restoreMode bool) error { jobId = &runJarResp.JobId break } else { - if strings.ContainsAny(err.Error(), ".jar does not exist") { + if strings.Contains(err.Error(), ".jar does not exist") { + pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err)) shouldUpload = true } else { pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err)) + stringErr := err.Error() + job.def.Status.Error = &stringErr + job.def.Status.JobStatus = "" + job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed + job.crd.SetJobStatus(job.def.UID, job.def.Status) + return v1alpha1.ErrOnStartingJob } } } @@ -66,6 +73,7 @@ func (job *ManagedJob) run(restoreMode bool) error { }) return nil } + shouldUpload = false continue } return nil diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 2eeccf2..749e7f8 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -58,6 +58,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) { // Loop over job definitions as Kubernetes CRD for _, uid := range crd.GetAllJobKeys() { + pkg.Logger.Debug("mgr.processingJobsIds", zap.Any("processingJobIds", mgr.processingJobsIds)) if lo.Contains(mgr.processingJobsIds, uid) { pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid)) continue