feat: remove buntdb dep
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -2,4 +2,3 @@ db
|
|||||||
*.log
|
*.log
|
||||||
tmp
|
tmp
|
||||||
*.jar
|
*.jar
|
||||||
bunt.db
|
|
||||||
1
.vscode/settings.json
vendored
1
.vscode/settings.json
vendored
@@ -1,7 +1,6 @@
|
|||||||
{
|
{
|
||||||
"cSpell.words": [
|
"cSpell.words": [
|
||||||
"apiextensions",
|
"apiextensions",
|
||||||
"buntdb",
|
|
||||||
"controllerutil",
|
"controllerutil",
|
||||||
"deepcopy",
|
"deepcopy",
|
||||||
"Finalizer",
|
"Finalizer",
|
||||||
|
|||||||
@@ -10,8 +10,6 @@ import (
|
|||||||
|
|
||||||
"gitea.com/logicamp/lc"
|
"gitea.com/logicamp/lc"
|
||||||
api "github.com/logi-camp/go-flink-client"
|
api "github.com/logi-camp/go-flink-client"
|
||||||
"github.com/tidwall/buntdb"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@@ -23,13 +21,6 @@ func main() {
|
|||||||
// init kubernetes flink job crd watch
|
// init kubernetes flink job crd watch
|
||||||
crdInstance := crd.New()
|
crdInstance := crd.New()
|
||||||
|
|
||||||
// create database instance
|
|
||||||
db, err := buntdb.Open(config.DatabasePath)
|
|
||||||
if err != nil {
|
|
||||||
lc.Logger.Fatal("[main] error on open db", zap.Error(err))
|
|
||||||
}
|
|
||||||
defer db.Close()
|
|
||||||
|
|
||||||
// create flink api instance
|
// create flink api instance
|
||||||
c, err := api.New(config.FlinkApiUrl)
|
c, err := api.New(config.FlinkApiUrl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -44,7 +35,7 @@ func main() {
|
|||||||
fmt.Println(clusterConfig)
|
fmt.Println(clusterConfig)
|
||||||
|
|
||||||
// init flink job manager
|
// init flink job manager
|
||||||
manager.NewManager(c, db, crdInstance)
|
manager.NewManager(c, crdInstance)
|
||||||
|
|
||||||
// for _, jobDef := range config.Jobs {
|
// for _, jobDef := range config.Jobs {
|
||||||
// managed_job.NewManagedJob(c, db, jobDef)
|
// managed_job.NewManagedJob(c, db, jobDef)
|
||||||
|
|||||||
@@ -1,2 +1 @@
|
|||||||
flinkApiUrl: 127.0.0.1:8981
|
flinkApiUrl: 127.0.0.1:8981
|
||||||
databasePath: ./bunt.db
|
|
||||||
12
crds.yaml
12
crds.yaml
@@ -71,6 +71,15 @@ spec:
|
|||||||
type: string
|
type: string
|
||||||
error:
|
error:
|
||||||
type: string
|
type: string
|
||||||
|
lastSavepointPath:
|
||||||
|
type: string
|
||||||
|
lifeCycleStatus:
|
||||||
|
type: string
|
||||||
|
savepointTriggerId:
|
||||||
|
type: string
|
||||||
|
lastSavepointDate:
|
||||||
|
type: string
|
||||||
|
format: time
|
||||||
additionalPrinterColumns:
|
additionalPrinterColumns:
|
||||||
- name: Status
|
- name: Status
|
||||||
type: string
|
type: string
|
||||||
@@ -78,3 +87,6 @@ spec:
|
|||||||
- name: Age
|
- name: Age
|
||||||
type: date
|
type: date
|
||||||
jsonPath: .metadata.creationTimestamp
|
jsonPath: .metadata.creationTimestamp
|
||||||
|
- name: LifeCycleStatus
|
||||||
|
type: string
|
||||||
|
jsonPath: .status.lifeCycleStatus
|
||||||
1
go.mod
1
go.mod
@@ -6,7 +6,6 @@ require (
|
|||||||
gitea.com/logicamp/lc v1.14.6
|
gitea.com/logicamp/lc v1.14.6
|
||||||
github.com/logi-camp/go-flink-client v0.2.0
|
github.com/logi-camp/go-flink-client v0.2.0
|
||||||
github.com/matoous/go-nanoid/v2 v2.1.0
|
github.com/matoous/go-nanoid/v2 v2.1.0
|
||||||
github.com/tidwall/buntdb v1.3.2
|
|
||||||
go.uber.org/zap v1.27.0
|
go.uber.org/zap v1.27.0
|
||||||
k8s.io/apimachinery v0.31.3
|
k8s.io/apimachinery v0.31.3
|
||||||
k8s.io/client-go v0.31.3
|
k8s.io/client-go v0.31.3
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -376,8 +376,6 @@ github.com/tidwall/assert v0.1.0 h1:aWcKyRBUAdLoVebxo95N7+YZVTFF/ASTr7BN4sLP6XI=
|
|||||||
github.com/tidwall/assert v0.1.0/go.mod h1:QLYtGyeqse53vuELQheYl9dngGCJQ+mTtlxcktb+Kj8=
|
github.com/tidwall/assert v0.1.0/go.mod h1:QLYtGyeqse53vuELQheYl9dngGCJQ+mTtlxcktb+Kj8=
|
||||||
github.com/tidwall/btree v1.4.2 h1:PpkaieETJMUxYNADsjgtNRcERX7mGc/GP2zp/r5FM3g=
|
github.com/tidwall/btree v1.4.2 h1:PpkaieETJMUxYNADsjgtNRcERX7mGc/GP2zp/r5FM3g=
|
||||||
github.com/tidwall/btree v1.4.2/go.mod h1:LGm8L/DZjPLmeWGjv5kFrY8dL4uVhMmzmmLYmsObdKE=
|
github.com/tidwall/btree v1.4.2/go.mod h1:LGm8L/DZjPLmeWGjv5kFrY8dL4uVhMmzmmLYmsObdKE=
|
||||||
github.com/tidwall/buntdb v1.3.2 h1:qd+IpdEGs0pZci37G4jF51+fSKlkuUTMXuHhXL1AkKg=
|
|
||||||
github.com/tidwall/buntdb v1.3.2/go.mod h1:lZZrZUWzlyDJKlLQ6DKAy53LnG7m5kHyrEHvvcDmBpU=
|
|
||||||
github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||||
github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw=
|
github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw=
|
||||||
github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ package crd
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flink-kube-operator/internal/crd/v1alpha1"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"gitea.com/logicamp/lc"
|
"gitea.com/logicamp/lc"
|
||||||
@@ -12,12 +11,9 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (crd Crd) SetJobStatus(jobUid types.UID, status string) error {
|
func (crd Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
|
||||||
job := GetJob(jobUid)
|
job := GetJob(jobUid)
|
||||||
// Define the patch data (JSON Merge Patch format)
|
|
||||||
patchData := map[string]interface{}{
|
|
||||||
"status": v1alpha1.FlinkJobStatus{},
|
|
||||||
}
|
|
||||||
patchBytes, err := json.Marshal(patchData)
|
patchBytes, err := json.Marshal(patchData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error marshaling patch data: %w", err)
|
return fmt.Errorf("error marshaling patch data: %w", err)
|
||||||
15
internal/crd/status.go
Normal file
15
internal/crd/status.go
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
package crd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flink-kube-operator/internal/crd/v1alpha1"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (crd Crd) SetJobStatus(jobUid types.UID, status v1alpha1.FlinkJobStatus) error {
|
||||||
|
// Define the patch data (JSON Merge Patch format)
|
||||||
|
patchData := map[string]interface{}{
|
||||||
|
"status": status,
|
||||||
|
}
|
||||||
|
return crd.Patch(jobUid, patchData)
|
||||||
|
}
|
||||||
@@ -1,6 +1,9 @@
|
|||||||
package v1alpha1
|
package v1alpha1
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -16,8 +19,13 @@ type FlinkJobSpec struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type FlinkJobStatus struct {
|
type FlinkJobStatus struct {
|
||||||
JobStatus *string `json:"jobStatus,omitempty"`
|
JobStatus JobStatus `json:"jobStatus,omitempty"`
|
||||||
LifeCycleStatus *string `json:"lifeCycleStatus,omitempty"`
|
LifeCycleStatus *string `json:"lifeCycleStatus,omitempty"`
|
||||||
|
LastSavepointPath *string `json:"lastSavepointPath,omitempty"`
|
||||||
|
JobId *string `json:"jobId,omitempty"`
|
||||||
|
Error *string `json:"error,omitempty"`
|
||||||
|
SavepointTriggerId *string `json:"savepointTriggerId,omitempty"`
|
||||||
|
LastSavepointDate *time.Time `json:"lastSavepointDate,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||||
@@ -34,3 +42,35 @@ type FlinkJobList struct {
|
|||||||
metaV1.ListMeta `json:"metadata,omitempty"`
|
metaV1.ListMeta `json:"metadata,omitempty"`
|
||||||
Items []FlinkJob `json:"items"`
|
Items []FlinkJob `json:"items"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrNoJobId = errors.New("[managed-job] no job id")
|
||||||
|
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
|
||||||
|
ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
|
||||||
|
)
|
||||||
|
|
||||||
|
type JobStatus string
|
||||||
|
|
||||||
|
var (
|
||||||
|
JobStatusInitializing JobStatus = "INITIALIZING"
|
||||||
|
JobStatusRunning JobStatus = "RUNNING"
|
||||||
|
JobStatusCreating JobStatus = "CREATING"
|
||||||
|
JobStatusError JobStatus = "ERROR"
|
||||||
|
JobStatusReconciling JobStatus = "RECONCILING"
|
||||||
|
JobStatusFailed JobStatus = "FAILED"
|
||||||
|
JobStatusFailing JobStatus = "FAILING"
|
||||||
|
JobStatusRestarting JobStatus = "RESTARTING"
|
||||||
|
JobStatusFinished JobStatus = "FINISHED"
|
||||||
|
JobStatusCanceled JobStatus = "CANCELED"
|
||||||
|
JobStatusCancelling JobStatus = "CANCELLING"
|
||||||
|
JobStatusSuspended JobStatus = "SUSPENDED"
|
||||||
|
)
|
||||||
|
|
||||||
|
type LifeCycleStatus string
|
||||||
|
|
||||||
|
const (
|
||||||
|
LifeCycleStatusInitializing LifeCycleStatus = "INITIALIZING"
|
||||||
|
LifeCycleStatusRestoring LifeCycleStatus = "RESTORING"
|
||||||
|
LifeCycleStatusHealthy LifeCycleStatus = "HEALTHY"
|
||||||
|
LifeCycleStatusFailed LifeCycleStatus = "FAILED"
|
||||||
|
)
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package managed_job
|
package managed_job
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"flink-kube-operator/internal/crd/v1alpha1"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gitea.com/logicamp/lc"
|
"gitea.com/logicamp/lc"
|
||||||
@@ -30,20 +31,30 @@ func (job *ManagedJob) Cycle() {
|
|||||||
lc.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", string(job.def.UID)))
|
lc.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", string(job.def.UID)))
|
||||||
|
|
||||||
// Init job
|
// Init job
|
||||||
if job.state == nil {
|
if job.def.Status.JobStatus == "" {
|
||||||
err := job.upload()
|
err := job.upload()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
job.setError("[upload-error] " + err.Error())
|
job.crd.Patch(job.def.UID, map[string]interface{}{
|
||||||
|
"status": map[string]interface{}{
|
||||||
|
"error": "[upload-error] " + err.Error(),
|
||||||
|
},
|
||||||
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = job.run()
|
err = job.run()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
job.setError("[run-error] " + err.Error())
|
job.crd.Patch(job.def.UID, map[string]interface{}{
|
||||||
|
"status": map[string]interface{}{
|
||||||
|
"error": "[run-error] " + err.Error(),
|
||||||
|
},
|
||||||
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
job.crd.SetJobStatus(job.def.UID, string(job.state.Status))
|
// job.crd.SetJobStatus(job.def.UID, v1alpha1.FlinkJobStatus{
|
||||||
|
// JobStatus: job.def.Status.JobStatus,
|
||||||
|
// })
|
||||||
|
|
||||||
// Check for set running or error state
|
// Check for set running or error state
|
||||||
/* if job.state.Status == JobStatusCreating || job.state.Status == JobStatusFailing {
|
/* if job.state.Status == JobStatusCreating || job.state.Status == JobStatusFailing {
|
||||||
@@ -54,9 +65,9 @@ func (job *ManagedJob) Cycle() {
|
|||||||
return
|
return
|
||||||
} */
|
} */
|
||||||
|
|
||||||
if job.state.Status == JobStatusRunning {
|
if job.def.Status.JobStatus == v1alpha1.JobStatusRunning {
|
||||||
if (job.def.Spec.SavepointInterval.Duration != 0) && ((job.state.LastSavepointDate == nil) || time.Now().Add(-job.def.Spec.SavepointInterval.Duration).After(*job.state.LastSavepointDate)) {
|
if (job.def.Spec.SavepointInterval.Duration != 0) && ((job.def.Status.LastSavepointDate == nil) || time.Now().Add(-job.def.Spec.SavepointInterval.Duration).After(*job.def.Status.LastSavepointDate)) {
|
||||||
if job.state.SavepointTriggerId == nil {
|
if job.def.Status.SavepointTriggerId == nil {
|
||||||
job.createSavepoint()
|
job.createSavepoint()
|
||||||
} else {
|
} else {
|
||||||
job.trackSavepoint()
|
job.trackSavepoint()
|
||||||
@@ -64,10 +75,10 @@ func (job *ManagedJob) Cycle() {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if job.state.Status == JobStatusFailed {
|
if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
|
||||||
job.restore()
|
job.restore()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
lc.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.state.Status)))
|
lc.Logger.Warn("[managed-job] [cycle]", zap.String("unhanded job status", string(job.def.Status.JobStatus)))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,23 +5,19 @@ import (
|
|||||||
"flink-kube-operator/internal/crd/v1alpha1"
|
"flink-kube-operator/internal/crd/v1alpha1"
|
||||||
|
|
||||||
api "github.com/logi-camp/go-flink-client"
|
api "github.com/logi-camp/go-flink-client"
|
||||||
"github.com/tidwall/buntdb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ManagedJob struct {
|
type ManagedJob struct {
|
||||||
def v1alpha1.FlinkJob
|
def v1alpha1.FlinkJob
|
||||||
client *api.Client
|
client *api.Client
|
||||||
jarId string
|
jarId string
|
||||||
db *buntdb.DB
|
|
||||||
state *jobState
|
|
||||||
crd *crd.Crd
|
crd *crd.Crd
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewManagedJob(client *api.Client, db *buntdb.DB, def v1alpha1.FlinkJob, crd *crd.Crd) *ManagedJob {
|
func NewManagedJob(client *api.Client, def v1alpha1.FlinkJob, crd *crd.Crd) *ManagedJob {
|
||||||
job := &ManagedJob{
|
job := &ManagedJob{
|
||||||
def: def,
|
def: def,
|
||||||
client: client,
|
client: client,
|
||||||
db: db,
|
|
||||||
crd: crd,
|
crd: crd,
|
||||||
}
|
}
|
||||||
//job.startCycle()
|
//job.startCycle()
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
package managed_job
|
package managed_job
|
||||||
|
|
||||||
func (job *ManagedJob) Stop() {
|
func (job *ManagedJob) Stop() {
|
||||||
job.client.StopJob(*job.state.JobId)
|
job.client.StopJob(*job.def.Status.JobId)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
package managed_job
|
package managed_job
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"flink-kube-operator/internal/crd/v1alpha1"
|
||||||
|
|
||||||
"gitea.com/logicamp/lc"
|
"gitea.com/logicamp/lc"
|
||||||
api "github.com/logi-camp/go-flink-client"
|
api "github.com/logi-camp/go-flink-client"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@@ -8,16 +10,16 @@ import (
|
|||||||
|
|
||||||
// restore the job from savepoint and jarId in managedJob
|
// restore the job from savepoint and jarId in managedJob
|
||||||
func (job *ManagedJob) restore() error {
|
func (job *ManagedJob) restore() error {
|
||||||
if job.state.LastSavepointPath == nil {
|
if job.def.Status.LastSavepointPath == nil {
|
||||||
lc.Logger.Error("[managed-job] [restore]", zap.Error(ErrNoSavepointPath))
|
lc.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath))
|
||||||
return ErrNoSavepointPath
|
return v1alpha1.ErrNoSavepointPath
|
||||||
}
|
}
|
||||||
lc.Logger.Debug("[managed-job] [restore] restoring", zap.String("savepointPath", *job.state.LastSavepointPath))
|
lc.Logger.Debug("[managed-job] [restore] restoring", zap.String("savepointPath", *job.def.Status.LastSavepointPath))
|
||||||
runJarResp, err := job.client.RunJar(api.RunOpts{
|
runJarResp, err := job.client.RunJar(api.RunOpts{
|
||||||
JarID: job.jarId,
|
JarID: job.jarId,
|
||||||
AllowNonRestoredState: true,
|
AllowNonRestoredState: true,
|
||||||
EntryClass: job.def.Spec.EntryClass,
|
EntryClass: job.def.Spec.EntryClass,
|
||||||
SavepointPath: *job.state.LastSavepointPath,
|
SavepointPath: *job.def.Status.LastSavepointPath,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lc.Logger.Error("[managed-job] [run]", zap.Error(err))
|
lc.Logger.Error("[managed-job] [run]", zap.Error(err))
|
||||||
@@ -25,10 +27,17 @@ func (job *ManagedJob) restore() error {
|
|||||||
}
|
}
|
||||||
lc.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp))
|
lc.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp))
|
||||||
|
|
||||||
job.state.JobId = &runJarResp.JobId
|
// job.def.Status.JobId = &runJarResp.JobId
|
||||||
job.state.Status = JobStatusCreating
|
// job.def.Status.JobStatus = v1alpha1.JobStatusCreating
|
||||||
job.state.Error = nil
|
// job.def.Status.Error = nil
|
||||||
job.updateState(*job.state)
|
job.crd.Patch(job.def.UID, map[string]interface{}{
|
||||||
|
"status": map[string]interface{}{
|
||||||
|
"jobId": &runJarResp.JobId,
|
||||||
|
"jobStatus": v1alpha1.JobStatusCreating,
|
||||||
|
"lifeCycleStatus": v1alpha1.LifeCycleStatusRestoring,
|
||||||
|
"error": nil,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package managed_job
|
package managed_job
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"flink-kube-operator/internal/crd/v1alpha1"
|
||||||
"flink-kube-operator/internal/jar"
|
"flink-kube-operator/internal/jar"
|
||||||
|
|
||||||
"gitea.com/logicamp/lc"
|
"gitea.com/logicamp/lc"
|
||||||
@@ -40,12 +41,23 @@ func (job *ManagedJob) run() error {
|
|||||||
}
|
}
|
||||||
lc.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp))
|
lc.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp))
|
||||||
|
|
||||||
if job.state == nil {
|
// if job.state == nil {
|
||||||
job.state = &jobState{}
|
// job.state = &jobState{}
|
||||||
}
|
// }
|
||||||
job.state.JobId = &runJarResp.JobId
|
// job.state.JobId = &runJarResp.JobId
|
||||||
job.state.Status = JobStatusCreating
|
// job.state.Status = v1alpha1.JobStatusCreating
|
||||||
job.updateState(*job.state)
|
// job.updateState(*job.state)
|
||||||
|
// job.crd.SetJobStatus(job.def.UID, v1alpha1.FlinkJobStatus{
|
||||||
|
// JobId: job.state.JobId,
|
||||||
|
// })
|
||||||
|
job.crd.Patch(job.def.UID, map[string]interface{}{
|
||||||
|
"status": map[string]interface{}{
|
||||||
|
"jobId": &runJarResp.JobId,
|
||||||
|
"jobStatus": v1alpha1.JobStatusCreating,
|
||||||
|
"lifeCycleStatus": v1alpha1.LifeCycleStatusInitializing,
|
||||||
|
"error": nil,
|
||||||
|
},
|
||||||
|
})
|
||||||
//job.updateState(jobState{JobId: &runJarResp.JobId, Status: JobStatusCreating})
|
//job.updateState(jobState{JobId: &runJarResp.JobId, Status: JobStatusCreating})
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
package managed_job
|
package managed_job
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"flink-kube-operator/internal/crd/v1alpha1"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"gitea.com/logicamp/lc"
|
"gitea.com/logicamp/lc"
|
||||||
api "github.com/logi-camp/go-flink-client"
|
api "github.com/logi-camp/go-flink-client"
|
||||||
@@ -9,39 +11,53 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (job ManagedJob) createSavepoint() error {
|
func (job ManagedJob) createSavepoint() error {
|
||||||
if job.state.JobId == nil {
|
if job.def.Status.JobId == nil {
|
||||||
lc.Logger.Debug("[managed-job] [savepoint] no job id")
|
lc.Logger.Debug("[managed-job] [savepoint] no job id")
|
||||||
return ErrNoJobId
|
return v1alpha1.ErrNoJobId
|
||||||
}
|
}
|
||||||
lc.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("interval", job.def.Spec.SavepointInterval.String()))
|
lc.Logger.Info("[managed-job] [savepoint] creating savepoint", zap.String("interval", job.def.Spec.SavepointInterval.String()))
|
||||||
resp, err := job.client.SavePoints(*job.state.JobId, "/flink-data/savepoints-2/", false)
|
resp, err := job.client.SavePoints(*job.def.Status.JobId, "/flink-data/savepoints-2/", false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lc.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err))
|
lc.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
lc.Logger.Debug("[managed-job] [savepoint]", zap.Any("savepoint-resp", resp))
|
lc.Logger.Debug("[managed-job] [savepoint]", zap.Any("savepoint-resp", resp))
|
||||||
job.setSavepointTriggerId(resp.RequestID)
|
|
||||||
|
job.crd.Patch(job.def.UID, map[string]interface{}{
|
||||||
|
"status": map[string]interface{}{
|
||||||
|
"savepointTriggerId": resp.RequestID,
|
||||||
|
},
|
||||||
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (job ManagedJob) trackSavepoint() error {
|
func (job ManagedJob) trackSavepoint() error {
|
||||||
if job.state.JobId == nil {
|
if job.def.Status.JobId == nil {
|
||||||
lc.Logger.Debug("[managed-job] [savepoint] no job id")
|
lc.Logger.Debug("[managed-job] [savepoint] no job id")
|
||||||
return ErrNoJobId
|
return v1alpha1.ErrNoJobId
|
||||||
}
|
}
|
||||||
if job.state.SavepointTriggerId == nil {
|
if job.def.Status.SavepointTriggerId == nil {
|
||||||
lc.Logger.Debug("[managed-job] [savepoint] no job id")
|
lc.Logger.Debug("[managed-job] [savepoint] no job id")
|
||||||
return ErrNoSavepointTriggerId
|
return v1alpha1.ErrNoSavepointTriggerId
|
||||||
}
|
}
|
||||||
resp, err := job.client.TrackSavepoint(*job.state.JobId, *job.state.SavepointTriggerId)
|
resp, err := job.client.TrackSavepoint(*job.def.Status.JobId, *job.def.Status.SavepointTriggerId)
|
||||||
lc.Logger.Debug("[managed-job] [savepoint] track savepoint", zap.Any("status.Id", resp.Status.Id), zap.Any("failureCause.stacktrace", resp.Operation.FailureCause.StackTrace), zap.Error(err))
|
lc.Logger.Debug("[managed-job] [savepoint] track savepoint", zap.Any("status.Id", resp.Status.Id), zap.Any("failureCause.stacktrace", resp.Operation.FailureCause.StackTrace), zap.Error(err))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 {
|
if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 {
|
||||||
job.removeSavepointTriggerId()
|
job.crd.Patch(job.def.UID, map[string]interface{}{
|
||||||
|
"status": map[string]interface{}{
|
||||||
|
"savepointTriggerId": nil,
|
||||||
|
},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if resp.Status.Id == api.SavepointStatusInCompleted {
|
if resp.Status.Id == api.SavepointStatusInCompleted {
|
||||||
job.setSavepointLocation(resp.Operation.Location)
|
job.crd.Patch(job.def.UID, map[string]interface{}{
|
||||||
|
"status": map[string]interface{}{
|
||||||
|
"lastSavepointPath": resp.Operation.Location,
|
||||||
|
"lastSavepointDate": time.Now(),
|
||||||
|
},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -1,70 +1,77 @@
|
|||||||
package managed_job
|
package managed_job
|
||||||
|
|
||||||
import (
|
// import (
|
||||||
"encoding/json"
|
// "encoding/json"
|
||||||
"time"
|
// "flink-kube-operator/internal/crd/v1alpha1"
|
||||||
|
// "time"
|
||||||
|
|
||||||
"github.com/tidwall/buntdb"
|
// "github.com/tidwall/buntdb"
|
||||||
)
|
// )
|
||||||
|
|
||||||
// get state of job from local db
|
// // get state of job from local db
|
||||||
func (job *ManagedJob) loadState() {
|
// func (job *ManagedJob) loadState() {
|
||||||
job.db.View(
|
// job.db.View(
|
||||||
func(tx *buntdb.Tx) error {
|
// func(tx *buntdb.Tx) error {
|
||||||
if val, err := tx.Get(string(job.def.GetUID())); err != nil {
|
// if val, err := tx.Get(string(job.def.GetUID())); err != nil {
|
||||||
return err
|
// return err
|
||||||
} else {
|
// } else {
|
||||||
return json.Unmarshal([]byte(val), job.state)
|
// return json.Unmarshal([]byte(val), job.state)
|
||||||
}
|
// }
|
||||||
})
|
// })
|
||||||
}
|
// }
|
||||||
|
|
||||||
// save state of job to local db
|
// // save state of job to local db
|
||||||
func (job *ManagedJob) updateState(state jobState) {
|
// func (job *ManagedJob) updateState(state jobState) {
|
||||||
job.state = &state
|
// job.state = &state
|
||||||
|
|
||||||
value, _ := json.Marshal(job.state)
|
// value, _ := json.Marshal(job.state)
|
||||||
job.db.Update(func(tx *buntdb.Tx) error {
|
// job.db.Update(func(tx *buntdb.Tx) error {
|
||||||
_, _, err := tx.Set(string(job.def.GetUID()), string(value), nil)
|
// _, _, err := tx.Set(string(job.def.GetUID()), string(value), nil)
|
||||||
return err
|
// return err
|
||||||
})
|
// })
|
||||||
}
|
// }
|
||||||
|
|
||||||
func (job *ManagedJob) setError(errMsg string) {
|
// func (job *ManagedJob) setError(errMsg string) {
|
||||||
if job.state == nil {
|
// if job.state == nil {
|
||||||
job.state = &jobState{}
|
// job.state = &jobState{}
|
||||||
}
|
// }
|
||||||
job.state.Error = &errMsg
|
// job.state.Error = &errMsg
|
||||||
job.state.Status = JobStatusError
|
// job.state.Status = v1alpha1.JobStatusError
|
||||||
job.updateState(*job.state)
|
// job.updateState(*job.state)
|
||||||
}
|
// job.crd.SetJobStatus(job.def.UID, v1alpha1.FlinkJobStatus{
|
||||||
|
// JobStatus: job.state.Status,
|
||||||
|
// })
|
||||||
|
// }
|
||||||
|
|
||||||
func (job *ManagedJob) setSavepointLocation(savepointId string) {
|
// func (job *ManagedJob) setSavepointLocation(savepointId string) {
|
||||||
job.state.LastSavepointPath = &savepointId
|
// job.state.LastSavepointPath = &savepointId
|
||||||
job.state.SavepointTriggerId = nil
|
// job.state.SavepointTriggerId = nil
|
||||||
n := time.Now()
|
// n := time.Now()
|
||||||
job.state.LastSavepointDate = &n
|
// job.state.LastSavepointDate = &n
|
||||||
job.updateState(*job.state)
|
// job.updateState(*job.state)
|
||||||
}
|
// job.crd.SetJobStatus(job.def.UID, v1alpha1.FlinkJobStatus{
|
||||||
|
// LastSavepointPath: job.state.LastSavepointPath,
|
||||||
|
// })
|
||||||
|
// }
|
||||||
|
|
||||||
func (job *ManagedJob) setSavepointTriggerId(savepointReqId string) {
|
// func (job *ManagedJob) setSavepointTriggerId(savepointReqId string) {
|
||||||
job.state.SavepointTriggerId = &savepointReqId
|
// job.state.SavepointTriggerId = &savepointReqId
|
||||||
job.updateState(*job.state)
|
// job.updateState(*job.state)
|
||||||
}
|
// }
|
||||||
|
|
||||||
func (job *ManagedJob) removeSavepointTriggerId() {
|
// func (job *ManagedJob) removeSavepointTriggerId() {
|
||||||
job.state.SavepointTriggerId = nil
|
// job.state.SavepointTriggerId = nil
|
||||||
job.updateState(*job.state)
|
// job.updateState(*job.state)
|
||||||
}
|
// }
|
||||||
|
|
||||||
func (job *ManagedJob) SetStatus(status JobStatus) {
|
// func (job *ManagedJob) SetStatus(status JobStatus) {
|
||||||
job.state.Status = status
|
// job.state.Status = status
|
||||||
job.updateState(*job.state)
|
// job.updateState(*job.state)
|
||||||
}
|
// }
|
||||||
|
|
||||||
func (job *ManagedJob) GetJobId() *string {
|
func (job *ManagedJob) GetJobId() *string {
|
||||||
if job.state != nil && job.state.JobId != nil {
|
if job.def.Status.JobId != nil {
|
||||||
return job.state.JobId
|
return job.def.Status.JobId
|
||||||
} else {
|
} else {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package managed_job
|
package managed_job
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"flink-kube-operator/internal/crd/v1alpha1"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"gitea.com/logicamp/lc"
|
"gitea.com/logicamp/lc"
|
||||||
@@ -8,22 +9,36 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (job *ManagedJob) checkStatus() error {
|
func (job *ManagedJob) checkStatus() error {
|
||||||
if job.state.JobId == nil {
|
if job.def.Status.JobId == nil {
|
||||||
lc.Logger.Debug("[managed-job] [status] no job id")
|
lc.Logger.Debug("[managed-job] [status] no job id")
|
||||||
return ErrNoJobId
|
return v1alpha1.ErrNoJobId
|
||||||
}
|
}
|
||||||
statusResp, err := job.client.Job(*job.state.JobId)
|
statusResp, err := job.client.Job(*job.def.Status.JobId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lc.Logger.Debug("[managed-job] [status] cannot fetch status", zap.Error(err))
|
lc.Logger.Debug("[managed-job] [status] cannot fetch status", zap.Error(err))
|
||||||
if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 {
|
if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 {
|
||||||
job.updateState(jobState{
|
// job.updateState(jobState{
|
||||||
JobId: job.state.JobId,
|
// JobId: job.state.JobId,
|
||||||
Status: JobStatusNotFound,
|
// Status: v1alpha1.JobStatusNotFound,
|
||||||
|
// })
|
||||||
|
job.crd.Patch(job.def.UID, map[string]interface{}{
|
||||||
|
"status": map[string]interface{}{
|
||||||
|
"jobId": &job.def.Status.JobId,
|
||||||
|
"jobStatus": "",
|
||||||
|
"lifeCycleStatus": v1alpha1.LifeCycleStatusFailed,
|
||||||
|
"error": "Job not found",
|
||||||
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
//lc.Logger.Debug("[managed-job] [status]", zap.Any("status-resp", statusResp))
|
job.crd.Patch(job.def.UID, map[string]interface{}{
|
||||||
job.SetStatus(JobStatus(statusResp.State))
|
"status": map[string]interface{}{
|
||||||
|
"jobId": &job.def.Status.JobId,
|
||||||
|
"jobStatus": statusResp.State,
|
||||||
|
"lifeCycleStatus": v1alpha1.LifeCycleStatusFailed,
|
||||||
|
"error": "Job not found",
|
||||||
|
},
|
||||||
|
})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,36 +1,12 @@
|
|||||||
package managed_job
|
package managed_job
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"flink-kube-operator/internal/crd/v1alpha1"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type JobStatus string
|
|
||||||
|
|
||||||
var (
|
|
||||||
ErrNoJobId = errors.New("[managed-job] no job id")
|
|
||||||
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
|
|
||||||
ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
JobStatusInitializing JobStatus = "INITIALIZING"
|
|
||||||
JobStatusRunning JobStatus = "RUNNING"
|
|
||||||
JobStatusCreating JobStatus = "CREATING"
|
|
||||||
JobStatusNotFound JobStatus = "NotFound"
|
|
||||||
JobStatusError JobStatus = "ERROR"
|
|
||||||
JobStatusReconciling JobStatus = "RECONCILING"
|
|
||||||
JobStatusFailed JobStatus = "FAILED"
|
|
||||||
JobStatusFailing JobStatus = "FAILING"
|
|
||||||
JobStatusRestarting JobStatus = "RESTARTING"
|
|
||||||
JobStatusFinished JobStatus = "FINISHED"
|
|
||||||
JobStatusCanceled JobStatus = "CANCELED"
|
|
||||||
JobStatusCancelling JobStatus = "CANCELLING"
|
|
||||||
JobStatusSuspended JobStatus = "SUSPENDED"
|
|
||||||
)
|
|
||||||
|
|
||||||
type jobState struct {
|
type jobState struct {
|
||||||
Status JobStatus `json:"status"`
|
Status v1alpha1.JobStatus `json:"status"`
|
||||||
Error *string `json:"error"`
|
Error *string `json:"error"`
|
||||||
Info *string `json:"info"`
|
Info *string `json:"info"`
|
||||||
JobId *string `json:"job_id"`
|
JobId *string `json:"job_id"`
|
||||||
|
|||||||
@@ -2,13 +2,13 @@ package manager
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"flink-kube-operator/internal/crd"
|
"flink-kube-operator/internal/crd"
|
||||||
|
"flink-kube-operator/internal/crd/v1alpha1"
|
||||||
"flink-kube-operator/internal/managed_job"
|
"flink-kube-operator/internal/managed_job"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gitea.com/logicamp/lc"
|
"gitea.com/logicamp/lc"
|
||||||
api "github.com/logi-camp/go-flink-client"
|
api "github.com/logi-camp/go-flink-client"
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
"github.com/tidwall/buntdb"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
)
|
)
|
||||||
@@ -18,7 +18,7 @@ type Manager struct {
|
|||||||
managedJobs map[types.UID]managed_job.ManagedJob
|
managedJobs map[types.UID]managed_job.ManagedJob
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewManager(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) Manager {
|
func NewManager(client *api.Client, crdInstance *crd.Crd) Manager {
|
||||||
ticker := time.NewTicker(5 * time.Second)
|
ticker := time.NewTicker(5 * time.Second)
|
||||||
quit := make(chan struct{})
|
quit := make(chan struct{})
|
||||||
mgr := Manager{
|
mgr := Manager{
|
||||||
@@ -30,7 +30,7 @@ func NewManager(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) Manager
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
mgr.cycle(client, db, crdInstance)
|
mgr.cycle(client, crdInstance)
|
||||||
case <-quit:
|
case <-quit:
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
return
|
return
|
||||||
@@ -40,7 +40,7 @@ func NewManager(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) Manager
|
|||||||
return mgr
|
return mgr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *Manager) cycle(client *api.Client, db *buntdb.DB, crdInstance *crd.Crd) {
|
func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
|
||||||
jobsOverviews, err := mgr.client.JobsOverview()
|
jobsOverviews, err := mgr.client.JobsOverview()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lc.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(err))
|
lc.Logger.Error("[manager] [cycle] cannot check flink jobs status", zap.Error(err))
|
||||||
@@ -53,7 +53,7 @@ func (mgr *Manager) cycle(client *api.Client, db *buntdb.DB, crdInstance *crd.Cr
|
|||||||
if ok {
|
if ok {
|
||||||
managedJob.Update(def)
|
managedJob.Update(def)
|
||||||
} else {
|
} else {
|
||||||
managedJob = *managed_job.NewManagedJob(client, db, def, crdInstance)
|
managedJob = *managed_job.NewManagedJob(client, def, crdInstance)
|
||||||
//mgr.managedJobs[uid] = managedJob
|
//mgr.managedJobs[uid] = managedJob
|
||||||
}
|
}
|
||||||
jobOverview, ok := lo.Find(jobsOverviews.Jobs, func(job api.JobOverview) bool {
|
jobOverview, ok := lo.Find(jobsOverviews.Jobs, func(job api.JobOverview) bool {
|
||||||
@@ -65,7 +65,18 @@ func (mgr *Manager) cycle(client *api.Client, db *buntdb.DB, crdInstance *crd.Cr
|
|||||||
})
|
})
|
||||||
if ok {
|
if ok {
|
||||||
lc.Logger.Debug("[manager] read status from flink", zap.String("name", jobOverview.Name), zap.String("state", jobOverview.State))
|
lc.Logger.Debug("[manager] read status from flink", zap.String("name", jobOverview.Name), zap.String("state", jobOverview.State))
|
||||||
managedJob.SetStatus(managed_job.JobStatus(jobOverview.State))
|
var jobLifeCycleStatus *string
|
||||||
|
if jobOverview.State == string(v1alpha1.JobStatusRunning) {
|
||||||
|
status := string(v1alpha1.LifeCycleStatusHealthy)
|
||||||
|
jobLifeCycleStatus = &status
|
||||||
|
}
|
||||||
|
|
||||||
|
crdInstance.Patch(uid, map[string]interface{}{
|
||||||
|
"status": map[string]interface{}{
|
||||||
|
"jobStatus": v1alpha1.JobStatus(jobOverview.State),
|
||||||
|
"lifeCycleStatus": jobLifeCycleStatus,
|
||||||
|
},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
managedJob.Cycle()
|
managedJob.Cycle()
|
||||||
mgr.managedJobs[uid] = managedJob
|
mgr.managedJobs[uid] = managedJob
|
||||||
|
|||||||
Reference in New Issue
Block a user