diff --git a/internal/config/config.type.go b/internal/config/config.type.go index d912156..a1043f3 100644 --- a/internal/config/config.type.go +++ b/internal/config/config.type.go @@ -1 +1,6 @@ package config + +type Config struct { + FlinkApiUrl string `yaml:"flinkApiUrl"` + DatabasePath string `yaml:"databasePath"` +} diff --git a/internal/crd/event.go b/internal/crd/event.go new file mode 100644 index 0000000..8156728 --- /dev/null +++ b/internal/crd/event.go @@ -0,0 +1,12 @@ +package crd + +import ( + "context" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +func (crd Crd) AddEvent(jobUid types.UID, event string) { + crd.client.UpdateStatus(context.Background(), nil, v1.UpdateOptions{}) +} diff --git a/internal/crd/repo.go b/internal/crd/repo.go index c42ed3b..55045b4 100644 --- a/internal/crd/repo.go +++ b/internal/crd/repo.go @@ -9,7 +9,7 @@ import ( var jobs = map[types.UID]*v1alpha1.FlinkJob{} -func (crd Crd) repsert(job *v1alpha1.FlinkJob) { +func (crd *Crd) repsert(job *v1alpha1.FlinkJob) { jobs[job.GetUID()] = job } diff --git a/internal/managed_job/cycle.go b/internal/managed_job/cycle.go index 67bccec..c823679 100644 --- a/internal/managed_job/cycle.go +++ b/internal/managed_job/cycle.go @@ -58,7 +58,8 @@ func (job *ManagedJob) cycle() { if errors.Is(err, ErrNoJobId) { job.state = nil } - if job.state.LastSavepointDate == nil || time.Now().Add(-job.def.Spec.SavepointInterval.Duration).After(*job.state.LastSavepointDate) { + lc.Logger.Debug("savepoint interval", zap.Any("savepoint duration", job.def.Spec.SavepointInterval)) + if (job.def.Spec.SavepointInterval.Duration != 0) && ((job.state.LastSavepointDate == nil) || time.Now().Add(-job.def.Spec.SavepointInterval.Duration).After(*job.state.LastSavepointDate)) { if job.state.SavepointTriggerId == nil { job.createSavepoint() } else { diff --git a/internal/managed_job/new.go b/internal/managed_job/new.go index 1a29bb0..730c1a0 100644 --- a/internal/managed_job/new.go +++ b/internal/managed_job/new.go @@ -24,3 +24,7 @@ func NewManagedJob(client *api.Client, db *buntdb.DB, def v1alpha1.FlinkJob) *Ma job.startCycle() return job } + +func (job *ManagedJob) Update(def v1alpha1.FlinkJob) { + job.def = def +} diff --git a/internal/managed_job/savepoint.go b/internal/managed_job/savepoint.go index 89f5ac1..d8f159e 100644 --- a/internal/managed_job/savepoint.go +++ b/internal/managed_job/savepoint.go @@ -13,6 +13,7 @@ func (job ManagedJob) createSavepoint() error { lc.Logger.Debug("[managed-job] [savepoint] no job id") return ErrNoJobId } + lc.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("interval", job.def.Spec.SavepointInterval.String())) resp, err := job.client.SavePoints(*job.state.JobId, "/flink-data/savepoints-2/", false) if err != nil { lc.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err)) diff --git a/internal/manager/manager.go b/internal/manager/manager.go index f58b84f..5fa783f 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -31,12 +31,12 @@ func Setup(client *api.Client, db *buntdb.DB) { func cycle(client *api.Client, db *buntdb.DB) { for _, uid := range crd.GetAllJobKeys() { - job := crd.GetJob(uid) - _, ok := managedJobs[uid] + def := crd.GetJob(uid) + managedJob, ok := managedJobs[uid] if ok { - + managedJob.Update(def) } else { - managedJob := managed_job.NewManagedJob(client, db, job) + managedJob := managed_job.NewManagedJob(client, db, def) managedJobs[uid] = *managedJob } }