feat: initialize
This commit is contained in:
12
internal/config/config.type.go
Normal file
12
internal/config/config.type.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package config
|
||||
|
||||
type JobDef struct {
|
||||
Key string `yaml:"key"`
|
||||
Name string `yaml:"name"`
|
||||
EntryClass string `yaml:"entryClass"`
|
||||
JarURI string `yaml:"jarURI"`
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Jobs []JobDef `yaml:"jobs"`
|
||||
}
|
||||
70
internal/jar/jar.go
Normal file
70
internal/jar/jar.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package jar
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"gitea.com/logicamp/lc"
|
||||
api "github.com/logi-camp/go-flink-client"
|
||||
gonanoid "github.com/matoous/go-nanoid/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type JarFile struct {
|
||||
uri string
|
||||
filePath string
|
||||
}
|
||||
|
||||
func NewJarFile(URI string) (*JarFile, error) {
|
||||
jarFile := &JarFile{
|
||||
uri: URI,
|
||||
}
|
||||
err := jarFile.Download()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return jarFile, nil
|
||||
}
|
||||
|
||||
func (JarFile JarFile) Upload(flinkClient *api.Client) (fileName string, err error) {
|
||||
|
||||
resp, err := flinkClient.UploadJar(JarFile.filePath)
|
||||
if err != nil {
|
||||
lc.Logger.Error("[main] error uploading jar", zap.Error(err))
|
||||
}
|
||||
filePathParts := strings.Split(resp.FileName, "/")
|
||||
fileName = filePathParts[len(filePathParts)-1]
|
||||
if resp.Status != "success" {
|
||||
err = errors.New("jar upload was not success")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (jarFile *JarFile) Download() error {
|
||||
fileName, _ := gonanoid.New()
|
||||
jarFile.filePath = fileName + ".jar"
|
||||
out, err := os.Create(jarFile.filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer out.Close()
|
||||
resp, err := http.Get(jarFile.uri)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
_, err = io.Copy(out, resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (jarFile JarFile) Delete() error {
|
||||
return os.Remove(jarFile.filePath)
|
||||
}
|
||||
71
internal/managed_job/cycle.go
Normal file
71
internal/managed_job/cycle.go
Normal file
@@ -0,0 +1,71 @@
|
||||
package managed_job
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"gitea.com/logicamp/lc"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (job *ManagedJob) startCycle() {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
quit := make(chan struct{})
|
||||
|
||||
// load job state from db
|
||||
job.loadState()
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
job.cycle()
|
||||
case <-quit:
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (job *ManagedJob) cycle() {
|
||||
lc.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobKey", job.def.Key))
|
||||
|
||||
// Init job
|
||||
if job.state == nil {
|
||||
err := job.upload()
|
||||
if err != nil {
|
||||
job.setError("[upload-error] " + err.Error())
|
||||
return
|
||||
}
|
||||
err = job.run()
|
||||
if err != nil {
|
||||
job.setError("[run-error] " + err.Error())
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
// Check for set running or error state
|
||||
if job.state.Status == JobStatusCreating {
|
||||
err := job.checkStatus()
|
||||
if errors.Is(err, ErrNoJobId) {
|
||||
job.state = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if job.state.Status == JobStatusRunning {
|
||||
err := job.checkStatus()
|
||||
if errors.Is(err, ErrNoJobId) {
|
||||
job.state = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
//if job.state.LastSavepointDate == nil || time.Now().Add(-time.Minute*3).After(*job.state.LastSavepointDate) {
|
||||
err := job.createSavepoint()
|
||||
if errors.Is(err, ErrNoJobId) {
|
||||
job.state = nil
|
||||
}
|
||||
//}
|
||||
|
||||
}
|
||||
26
internal/managed_job/new.go
Normal file
26
internal/managed_job/new.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package managed_job
|
||||
|
||||
import (
|
||||
"flink-kube-operator/internal/config"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
api "github.com/logi-camp/go-flink-client"
|
||||
)
|
||||
|
||||
type ManagedJob struct {
|
||||
def config.JobDef
|
||||
client *api.Client
|
||||
jarId string
|
||||
db *badger.DB
|
||||
state *jobState
|
||||
}
|
||||
|
||||
func NewManagedJob(client *api.Client, db *badger.DB, def config.JobDef) *ManagedJob {
|
||||
job := &ManagedJob{
|
||||
def: def,
|
||||
client: client,
|
||||
db: db,
|
||||
}
|
||||
job.startCycle()
|
||||
return job
|
||||
}
|
||||
46
internal/managed_job/run.go
Normal file
46
internal/managed_job/run.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package managed_job
|
||||
|
||||
import (
|
||||
"flink-kube-operator/internal/jar"
|
||||
|
||||
"gitea.com/logicamp/lc"
|
||||
api "github.com/logi-camp/go-flink-client"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// upload jar file and set the jarId for later usages
|
||||
func (job *ManagedJob) upload() error {
|
||||
jarFile, err := jar.NewJarFile(job.def.JarURI)
|
||||
if err != nil {
|
||||
lc.Logger.Debug("[main] error on download jar", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
fileName, err := jarFile.Upload(job.client)
|
||||
jarFile.Delete()
|
||||
if err != nil {
|
||||
lc.Logger.Debug("[main] error on upload jar", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
lc.Logger.Debug("[main] after upload jar", zap.Any("upload-jar-resp", fileName))
|
||||
|
||||
job.jarId = fileName
|
||||
return nil
|
||||
}
|
||||
|
||||
// run the job from saved jarId in managedJob
|
||||
func (job *ManagedJob) run() error {
|
||||
runJarResp, err := job.client.RunJar(api.RunOpts{
|
||||
JarID: job.jarId,
|
||||
AllowNonRestoredState: true,
|
||||
EntryClass: job.def.EntryClass,
|
||||
})
|
||||
if err != nil {
|
||||
lc.Logger.Error("[managed-job] [run]", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
lc.Logger.Debug("[main] after run jar", zap.Any("run-jar-resp", runJarResp))
|
||||
|
||||
job.updateState(jobState{JobId: &runJarResp.JobId, Status: JobStatusCreating})
|
||||
|
||||
return err
|
||||
}
|
||||
20
internal/managed_job/savepoint.go
Normal file
20
internal/managed_job/savepoint.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package managed_job
|
||||
|
||||
import (
|
||||
"gitea.com/logicamp/lc"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (job ManagedJob) createSavepoint() error {
|
||||
if job.state.JobId == nil {
|
||||
lc.Logger.Debug("[managed-job] [savepoint] no job id")
|
||||
return ErrNoJobId
|
||||
}
|
||||
resp, err := job.client.SavePoints(*job.state.JobId, "/flink-data/savepoints-2/", false)
|
||||
if err != nil {
|
||||
lc.Logger.Error("[managed-job] [savepoint] error in creating savepoint", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
lc.Logger.Debug("[managed-job] [savepoint]", zap.Any("savepoint-resp", resp))
|
||||
return nil
|
||||
}
|
||||
55
internal/managed_job/state.go
Normal file
55
internal/managed_job/state.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package managed_job
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
)
|
||||
|
||||
// get state of job from local db
|
||||
func (job *ManagedJob) loadState() {
|
||||
err := job.db.View(
|
||||
func(tx *badger.Txn) error {
|
||||
if val, err := tx.Get([]byte(job.def.Key)); err != nil {
|
||||
return err
|
||||
} else if val != nil {
|
||||
val.Value(func(val []byte) error {
|
||||
job.state = &jobState{}
|
||||
return json.Unmarshal(val, job.state)
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if errors.Is(err, badger.ErrKeyNotFound) {
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
|
||||
// save state of job to local db
|
||||
func (job *ManagedJob) updateState(state jobState) {
|
||||
job.state = &state
|
||||
|
||||
value, _ := json.Marshal(job.state)
|
||||
job.db.Update(func(txn *badger.Txn) error {
|
||||
err := txn.Set([]byte(job.def.Key), value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return txn.Commit()
|
||||
})
|
||||
}
|
||||
|
||||
func (job *ManagedJob) setError(errMsg string) {
|
||||
job.state.Error = &errMsg
|
||||
job.state.Status = JobStatusError
|
||||
job.updateState(*job.state)
|
||||
}
|
||||
|
||||
func (job *ManagedJob) setSavepointId(savepointId string) {
|
||||
job.state.LastSavepointId = &savepointId
|
||||
n := time.Now()
|
||||
job.state.LastSavepointDate = &n
|
||||
job.updateState(*job.state)
|
||||
}
|
||||
32
internal/managed_job/status.go
Normal file
32
internal/managed_job/status.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package managed_job
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"gitea.com/logicamp/lc"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (job *ManagedJob) checkStatus() error {
|
||||
if job.state.JobId == nil {
|
||||
lc.Logger.Debug("[managed-job] [status] no job id")
|
||||
return ErrNoJobId
|
||||
}
|
||||
statusResp, err := job.client.Job(*job.state.JobId)
|
||||
if err != nil {
|
||||
lc.Logger.Debug("[managed-job] [status] cannot fetch status", zap.Error(err))
|
||||
if strings.IndexAny(err.Error(), "http status not 2xx: 404") == 0 {
|
||||
job.updateState(jobState{
|
||||
JobId: job.state.JobId,
|
||||
Status: JobStatusNotFound,
|
||||
})
|
||||
}
|
||||
return err
|
||||
}
|
||||
lc.Logger.Debug("[managed-job] [status]", zap.Any("status-resp", statusResp))
|
||||
job.updateState(jobState{
|
||||
JobId: job.state.JobId,
|
||||
Status: JobStatus(statusResp.State),
|
||||
})
|
||||
return err
|
||||
}
|
||||
29
internal/managed_job/type.go
Normal file
29
internal/managed_job/type.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package managed_job
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
type JobStatus string
|
||||
|
||||
var (
|
||||
ErrNoJobId = errors.New("[managed-job] no job id")
|
||||
)
|
||||
|
||||
const (
|
||||
JobStatusRunning JobStatus = "RUNNING"
|
||||
JobStatusCreating JobStatus = "CREATING"
|
||||
JobStatusNotFound JobStatus = "NotFound"
|
||||
JobStatusError JobStatus = "ERROR"
|
||||
JobStatusReconciling JobStatus = "RECONCILING"
|
||||
)
|
||||
|
||||
type jobState struct {
|
||||
Status JobStatus `json:"status"`
|
||||
Error *string `json:"error"`
|
||||
Info *string `json:"info"`
|
||||
JobId *string `json:"job_id"`
|
||||
LastSavepointId *string `json:"last_savepoint_id"`
|
||||
LastSavepointDate *time.Time `json:"last_savepoint_time"`
|
||||
}
|
||||
Reference in New Issue
Block a user