feat: add rest routes to take the control of flink jobs

This commit is contained in:
2025-01-09 00:22:53 +03:30
parent 4dd82c6380
commit 0df874b222
14 changed files with 1019 additions and 12 deletions

View File

@@ -14,12 +14,12 @@ func (job *ManagedJob) Cycle() {
// Init job
if job.def.Status.LifeCycleStatus == "" && job.def.Status.JobStatus == "" {
job.run(false)
job.Run(false)
return
}
if job.def.Status.JobStatus == v1alpha1.JobStatusFinished && job.def.Status.LifeCycleStatus == v1alpha1.LifeCycleStatusGracefullyPaused {
job.run(true)
job.Run(true)
return
}

View File

@@ -2,7 +2,6 @@ package managed_job
import (
"flink-kube-operator/internal/jar"
"flink-kube-operator/pkg"
"go.uber.org/zap"
@@ -30,3 +29,16 @@ func (job *ManagedJob) upload() error {
})
return nil
}
func (job *ManagedJob) RemoveJar() {
if job.def.Status.JarId != nil {
err := job.client.DeleteJar(*job.def.Status.JarId)
pkg.Logger.Error("[managed-job] [jar]", zap.Error(err))
err = job.crd.Patch(job.def.UID, map[string]interface{}{
"status": map[string]interface{}{
"jarId": nil,
},
})
pkg.Logger.Error("[managed-job] [jar]", zap.Error(err))
}
}

View File

@@ -10,7 +10,7 @@ import (
"go.uber.org/zap"
)
func (job *ManagedJob) pause() error {
func (job *ManagedJob) Pause() error {
var err error
if job.def.Status.JobId != nil {
result, stopJobErr := job.client.StopJobWithSavepoint(*job.def.Status.JobId, os.Getenv("SAVEPOINT_PATH"), false)

View File

@@ -1,5 +1,5 @@
package managed_job
func (job *ManagedJob) Stop() {
job.client.StopJob(*job.def.Status.JobId)
func (job *ManagedJob) Stop() error {
return job.client.StopJob(*job.def.Status.JobId)
}

View File

@@ -11,8 +11,8 @@ import (
"go.uber.org/zap"
)
// run the job from savepoint and jarId in managedJob
func (job *ManagedJob) run(restoreMode bool) error {
// Run the job from savepoint and jarId in managedJob
func (job *ManagedJob) Run(restoreMode bool) error {
var savepointPath string
if job.def.Status.LastSavepointPath == nil {
pkg.Logger.Error("[managed-job] [restore]", zap.Error(v1alpha1.ErrNoSavepointPath))

View File

@@ -18,7 +18,7 @@ func (job *ManagedJob) upgrade() {
"jarId": job.def.Status.JarId,
},
})
err := job.pause()
err := job.Pause()
if err != nil {
pkg.Logger.Error("[managed-job] [upgrade] error in pausing", zap.Error(err))
return
@@ -30,7 +30,7 @@ func (job *ManagedJob) upgrade() {
zap.Error(err),
)
err = job.run(true)
err = job.Run(true)
if err != nil {
pkg.Logger.Error("[managed-job] [upgrade] error in running", zap.Error(err))
return

View File

@@ -20,10 +20,16 @@ type Manager struct {
processingJobsIds []types.UID
}
var mgr Manager
func GetManager() Manager {
return mgr
}
func NewManager(client *api.Client, crdInstance *crd.Crd) Manager {
ticker := time.NewTicker(5 * time.Second)
quit := make(chan struct{})
mgr := Manager{
mgr = Manager{
client: client,
managedJobs: map[types.UID]managed_job.ManagedJob{},
processingJobsIds: []types.UID{},
@@ -110,3 +116,11 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
})
}
}
func (mgr *Manager) GetJobs() map[types.UID]managed_job.ManagedJob {
return mgr.managedJobs
}
func (mgr *Manager) GetJob(id types.UID) managed_job.ManagedJob {
return mgr.managedJobs[id]
}

28
internal/rest/base.go Normal file
View File

@@ -0,0 +1,28 @@
package rest
import (
"fmt"
"log"
"github.com/danielgtaylor/huma/v2"
humaFiber "github.com/danielgtaylor/huma/v2/adapters/humafiber"
"github.com/gofiber/fiber/v2"
)
func Init() {
app := fiber.New()
config := huma.DefaultConfig("Go API", "1.0.0")
config.Servers = []*huma.Server{{}}
config.Components.SecuritySchemes = map[string]*huma.SecurityScheme{
"auth": {
Type: "http",
Scheme: "bearer",
BearerFormat: "JWT",
},
}
api := humaFiber.New(app, config)
initRouter(api)
log.Fatal(app.Listen(fmt.Sprintf(":%s", "3000")))
}

View File

@@ -0,0 +1,81 @@
package controller
import (
"context"
"flink-kube-operator/internal/crd"
"flink-kube-operator/internal/crd/v1alpha1"
"flink-kube-operator/internal/manager"
"k8s.io/apimachinery/pkg/types"
)
type GetJobsReq struct {
}
type GetJobsResp struct {
Body []v1alpha1.FlinkJob
}
func GetJobs(ctx context.Context, req *GetJobsReq) (*GetJobsResp, error) {
jobs := []v1alpha1.FlinkJob{}
for _, key := range crd.GetAllJobKeys() {
job := crd.GetJob(key)
job.ManagedFields = nil
jobs = append(jobs, job)
}
return &GetJobsResp{Body: jobs}, nil
}
type StopJobReq struct {
JobUId string `path:"uid"`
}
type StopJobRespBody struct {
Success bool `json:"success"`
}
type StopJobResp struct {
Body StopJobRespBody
}
func StopJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := manager.GetManager()
job := mgr.GetJob(types.UID(req.JobUId))
err := job.Stop()
if err != nil {
return nil, err
}
return &StopJobResp{Body: StopJobRespBody{
Success: true,
}}, nil
}
func StartJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := manager.GetManager()
job := mgr.GetJob(types.UID(req.JobUId))
err := job.Run(true)
if err != nil {
return nil, err
}
return &StopJobResp{Body: StopJobRespBody{
Success: true,
}}, nil
}
func RemoveJobJar(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := manager.GetManager()
job := mgr.GetJob(types.UID(req.JobUId))
job.RemoveJar()
return &StopJobResp{Body: StopJobRespBody{
Success: true,
}}, nil
}
func PauseJob(ctx context.Context, req *StopJobReq) (*StopJobResp, error) {
mgr := manager.GetManager()
job := mgr.GetJob(types.UID(req.JobUId))
job.Pause()
return &StopJobResp{Body: StopJobRespBody{
Success: true,
}}, nil
}

55
internal/rest/router.go Normal file
View File

@@ -0,0 +1,55 @@
package rest
import (
"flink-kube-operator/internal/rest/controller"
"net/http"
"github.com/danielgtaylor/huma/v2"
)
func initRouter(api huma.API) {
huma.Register(api, huma.Operation{
OperationID: "get-jobs",
Method: http.MethodGet,
Path: "/jobs",
Summary: "Get Jobs",
Description: "Get Flink Jobs",
Tags: []string{"Job"},
}, controller.GetJobs)
huma.Register(api, huma.Operation{
OperationID: "stop-job",
Method: http.MethodPost,
Path: "/jobs/{uid}/stop",
Summary: "Stop Job",
Description: "Stop Flink Job",
Tags: []string{"Job"},
}, controller.StopJob)
huma.Register(api, huma.Operation{
OperationID: "start-job",
Method: http.MethodPost,
Path: "/jobs/{uid}/start",
Summary: "Start Job",
Description: "Start Flink Job",
Tags: []string{"Job"},
}, controller.StartJob)
huma.Register(api, huma.Operation{
OperationID: "remove-jar",
Method: http.MethodPost,
Path: "/jobs/{uid}/remove-jar",
Summary: "Remove Job Jar",
Description: "Remove Flink Job Jar",
Tags: []string{"Job"},
}, controller.RemoveJobJar)
huma.Register(api, huma.Operation{
OperationID: "pause-job",
Method: http.MethodPost,
Path: "/jobs/{uid}/pause",
Summary: "Pause Job",
Description: "Pause Flink Job",
Tags: []string{"Job"},
}, controller.PauseJob)
}