feat: add manager
This commit is contained in:
@@ -28,7 +28,7 @@ func (job *ManagedJob) startCycle() {
|
||||
}
|
||||
|
||||
func (job *ManagedJob) cycle() {
|
||||
lc.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", job.def.Key))
|
||||
lc.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", string(job.def.UID)))
|
||||
|
||||
// Init job
|
||||
if job.state == nil {
|
||||
@@ -58,7 +58,7 @@ func (job *ManagedJob) cycle() {
|
||||
if errors.Is(err, ErrNoJobId) {
|
||||
job.state = nil
|
||||
}
|
||||
if job.state.LastSavepointDate == nil || time.Now().Add(-job.def.SavepointInterval).After(*job.state.LastSavepointDate) {
|
||||
if job.state.LastSavepointDate == nil || time.Now().Add(-job.def.Spec.SavepointInterval.Duration).After(*job.state.LastSavepointDate) {
|
||||
if job.state.SavepointTriggerId == nil {
|
||||
job.createSavepoint()
|
||||
} else {
|
||||
|
||||
@@ -1,21 +1,21 @@
|
||||
package managed_job
|
||||
|
||||
import (
|
||||
"flink-kube-operator/internal/config"
|
||||
"flink-kube-operator/internal/crd/v1alpha1"
|
||||
|
||||
api "github.com/logi-camp/go-flink-client"
|
||||
"github.com/tidwall/buntdb"
|
||||
)
|
||||
|
||||
type ManagedJob struct {
|
||||
def config.JobDef
|
||||
def v1alpha1.FlinkJob
|
||||
client *api.Client
|
||||
jarId string
|
||||
db *buntdb.DB
|
||||
state *jobState
|
||||
}
|
||||
|
||||
func NewManagedJob(client *api.Client, db *buntdb.DB, def config.JobDef) *ManagedJob {
|
||||
func NewManagedJob(client *api.Client, db *buntdb.DB, def v1alpha1.FlinkJob) *ManagedJob {
|
||||
job := &ManagedJob{
|
||||
def: def,
|
||||
client: client,
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
|
||||
// upload jar file and set the jarId for later usages
|
||||
func (job *ManagedJob) upload() error {
|
||||
jarFile, err := jar.NewJarFile(job.def.JarURI)
|
||||
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI)
|
||||
if err != nil {
|
||||
lc.Logger.Debug("[main] error on download jar", zap.Error(err))
|
||||
return err
|
||||
@@ -32,7 +32,7 @@ func (job *ManagedJob) run() error {
|
||||
runJarResp, err := job.client.RunJar(api.RunOpts{
|
||||
JarID: job.jarId,
|
||||
AllowNonRestoredState: true,
|
||||
EntryClass: job.def.EntryClass,
|
||||
EntryClass: job.def.Spec.EntryClass,
|
||||
})
|
||||
if err != nil {
|
||||
lc.Logger.Error("[managed-job] [run]", zap.Error(err))
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
func (job *ManagedJob) loadState() {
|
||||
err := job.db.View(
|
||||
func(tx *buntdb.Tx) error {
|
||||
if val, err := tx.Get(job.def.Key); err != nil {
|
||||
if val, err := tx.Get(string(job.def.GetUID())); err != nil {
|
||||
return err
|
||||
} else {
|
||||
return json.Unmarshal([]byte(val), job.state)
|
||||
@@ -30,15 +30,15 @@ func (job *ManagedJob) updateState(state jobState) {
|
||||
|
||||
value, _ := json.Marshal(job.state)
|
||||
job.db.Update(func(tx *buntdb.Tx) error {
|
||||
_, _, err := tx.Set(job.def.Key, string(value), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Commit()
|
||||
_, _, err := tx.Set(string(job.def.GetUID()), string(value), nil)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (job *ManagedJob) setError(errMsg string) {
|
||||
if job.state == nil {
|
||||
job.state = &jobState{}
|
||||
}
|
||||
job.state.Error = &errMsg
|
||||
job.state.Status = JobStatusError
|
||||
job.updateState(*job.state)
|
||||
|
||||
Reference in New Issue
Block a user