feat(managed-job): add update job method
This commit is contained in:
@@ -1 +1,6 @@
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
FlinkApiUrl string `yaml:"flinkApiUrl"`
|
||||||
|
DatabasePath string `yaml:"databasePath"`
|
||||||
|
}
|
||||||
|
|||||||
12
internal/crd/event.go
Normal file
12
internal/crd/event.go
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
package crd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (crd Crd) AddEvent(jobUid types.UID, event string) {
|
||||||
|
crd.client.UpdateStatus(context.Background(), nil, v1.UpdateOptions{})
|
||||||
|
}
|
||||||
@@ -9,7 +9,7 @@ import (
|
|||||||
|
|
||||||
var jobs = map[types.UID]*v1alpha1.FlinkJob{}
|
var jobs = map[types.UID]*v1alpha1.FlinkJob{}
|
||||||
|
|
||||||
func (crd Crd) repsert(job *v1alpha1.FlinkJob) {
|
func (crd *Crd) repsert(job *v1alpha1.FlinkJob) {
|
||||||
jobs[job.GetUID()] = job
|
jobs[job.GetUID()] = job
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -58,7 +58,8 @@ func (job *ManagedJob) cycle() {
|
|||||||
if errors.Is(err, ErrNoJobId) {
|
if errors.Is(err, ErrNoJobId) {
|
||||||
job.state = nil
|
job.state = nil
|
||||||
}
|
}
|
||||||
if job.state.LastSavepointDate == nil || time.Now().Add(-job.def.Spec.SavepointInterval.Duration).After(*job.state.LastSavepointDate) {
|
lc.Logger.Debug("savepoint interval", zap.Any("savepoint duration", job.def.Spec.SavepointInterval))
|
||||||
|
if (job.def.Spec.SavepointInterval.Duration != 0) && ((job.state.LastSavepointDate == nil) || time.Now().Add(-job.def.Spec.SavepointInterval.Duration).After(*job.state.LastSavepointDate)) {
|
||||||
if job.state.SavepointTriggerId == nil {
|
if job.state.SavepointTriggerId == nil {
|
||||||
job.createSavepoint()
|
job.createSavepoint()
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -24,3 +24,7 @@ func NewManagedJob(client *api.Client, db *buntdb.DB, def v1alpha1.FlinkJob) *Ma
|
|||||||
job.startCycle()
|
job.startCycle()
|
||||||
return job
|
return job
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (job *ManagedJob) Update(def v1alpha1.FlinkJob) {
|
||||||
|
job.def = def
|
||||||
|
}
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ func (job ManagedJob) createSavepoint() error {
|
|||||||
lc.Logger.Debug("[managed-job] [savepoint] no job id")
|
lc.Logger.Debug("[managed-job] [savepoint] no job id")
|
||||||
return ErrNoJobId
|
return ErrNoJobId
|
||||||
}
|
}
|
||||||
|
lc.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("interval", job.def.Spec.SavepointInterval.String()))
|
||||||
resp, err := job.client.SavePoints(*job.state.JobId, "/flink-data/savepoints-2/", false)
|
resp, err := job.client.SavePoints(*job.state.JobId, "/flink-data/savepoints-2/", false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lc.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err))
|
lc.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err))
|
||||||
|
|||||||
@@ -31,12 +31,12 @@ func Setup(client *api.Client, db *buntdb.DB) {
|
|||||||
|
|
||||||
func cycle(client *api.Client, db *buntdb.DB) {
|
func cycle(client *api.Client, db *buntdb.DB) {
|
||||||
for _, uid := range crd.GetAllJobKeys() {
|
for _, uid := range crd.GetAllJobKeys() {
|
||||||
job := crd.GetJob(uid)
|
def := crd.GetJob(uid)
|
||||||
_, ok := managedJobs[uid]
|
managedJob, ok := managedJobs[uid]
|
||||||
if ok {
|
if ok {
|
||||||
|
managedJob.Update(def)
|
||||||
} else {
|
} else {
|
||||||
managedJob := managed_job.NewManagedJob(client, db, job)
|
managedJob := managed_job.NewManagedJob(client, db, def)
|
||||||
managedJobs[uid] = *managedJob
|
managedJobs[uid] = *managedJob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user