This commit is contained in:
2025-04-06 08:48:54 +03:30
35 changed files with 472 additions and 1126 deletions

View File

@@ -5,7 +5,6 @@ import (
"flink-kube-operator/internal/crd/v1alpha1"
"flink-kube-operator/pkg"
"github.com/reactivex/rxgo/v2"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -13,11 +12,10 @@ import (
var FinalizerChannel chan (types.UID) = make(chan (types.UID))
func (crd Crd) manageFinalizer(jobEventObservable rxgo.Observable) {
func (crd Crd) manageFinalizer(jobEventChannel chan FlinkJobCrdEvent) {
finalizerName := "flink-operator.logicamp.tech/finalizer"
for j := range jobEventObservable.Observe() {
jobEvent := j.V.(*FlinkJobCrdEvent)
for jobEvent := range jobEventChannel {
pkg.Logger.Debug("[crd] [manage-finalizer] main loop", zap.String("name", jobEvent.Job.Name))
go func() {
if jobEvent.Job.GetDeletionTimestamp() != nil {

View File

@@ -48,11 +48,13 @@ func New() *Crd {
runtimeClient: runtimeClient,
}
// Watch for FlinkJob creation
jobEventObservable := crd.watchFlinkJobs()
jobEventCh := make(chan FlinkJobCrdEvent)
// add finalizer to new resources
go crd.manageFinalizer(jobEventObservable)
go crd.manageFinalizer(jobEventCh)
// Watch for FlinkJob creation
crd.watchFlinkJobs(jobEventCh)
return &crd
}

View File

@@ -10,13 +10,15 @@ import (
//go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE
type FlinkJobSpec struct {
Key string `json:"key"`
Name string `json:"name"`
Parallelism int `json:"parallelism"`
JarURI string `json:"jarUri"`
SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"`
Args []string `json:"args"`
Key string `json:"key"`
Name string `json:"name"`
Parallelism int `json:"parallelism"`
JarURI string `json:"jarUri"`
JarURIBasicAuthUsername *string `json:"jarURIBasicAuthUsername"`
JarURIBasicAuthPassword *string `json:"jarURIBasicAuthPassword"`
SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"`
Args []string `json:"args"`
}
type FlinkJobStatus struct {

View File

@@ -7,7 +7,6 @@ import (
"flink-kube-operator/pkg"
"github.com/reactivex/rxgo/v2"
"go.uber.org/zap"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -15,60 +14,57 @@ import (
"k8s.io/apimachinery/pkg/watch"
)
func (crd Crd) watchFlinkJobs() rxgo.Observable {
ch := make(chan rxgo.Item)
func (crd Crd) watchFlinkJobs(ch chan FlinkJobCrdEvent) {
go func() {
pkg.Logger.Debug("[crd] starting watch")
watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{})
if err != nil {
panic(err)
}
defer watcher.Stop()
namespace := os.Getenv("NAMESPACE")
pkg.Logger.Debug("[crd] [watch]", zap.String("namespace", namespace))
for event := range watcher.ResultChan() {
unstructuredJob := event.Object.(*unstructured.Unstructured)
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
for {
pkg.Logger.Debug("[crd] starting watch")
watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{})
if err != nil {
pkg.Logger.Error("[crd] [watch]cannot create unstructured map", zap.Error(err))
continue
panic(err)
}
job := &v1alpha1.FlinkJob{}
namespace := os.Getenv("NAMESPACE")
pkg.Logger.Debug("[crd] [watch]", zap.String("namespace", namespace))
for event := range watcher.ResultChan() {
unstructuredJob := event.Object.(*unstructured.Unstructured)
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
if err != nil {
pkg.Logger.Error("[crd] [watch]cannot create unstructured map", zap.Error(err))
continue
}
job := &v1alpha1.FlinkJob{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job)
if err != nil {
pkg.Logger.Error("[crd] [watch]cannot convert unstructured to structured", zap.Error(err))
continue
}
if job.Namespace != namespace {
continue
}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredMap, job)
if err != nil {
pkg.Logger.Error("[crd] [watch]cannot convert unstructured to structured", zap.Error(err))
continue
}
if job.Namespace != namespace {
continue
}
go func() {
ch <- rxgo.Item{
V: &FlinkJobCrdEvent{
go func() {
ch <- FlinkJobCrdEvent{
EventType: event.Type,
Job: job,
},
}
}()
pkg.Logger.Debug("[crd] [watch] change in", zap.String("name", job.Name), zap.String("operation", string(event.Type)))
switch event.Type {
case watch.Bookmark:
case watch.Modified:
//pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
crd.repsert(job)
case watch.Added:
//pkg.Logger.Info("[crd] [watch] new flink job created")
crd.repsert(job)
case watch.Deleted:
crd.remove(job.UID)
}
}()
pkg.Logger.Debug("[crd] [watch] change in", zap.String("name", job.Name))
switch event.Type {
case watch.Bookmark:
case watch.Modified:
//pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
crd.repsert(job)
case watch.Added:
//pkg.Logger.Info("[crd] [watch] new flink job created")
crd.repsert(job)
case watch.Deleted:
crd.remove(job.UID)
}
defer watcher.Stop()
pkg.Logger.Warn("[crd] [watch] Watcher stopped, restarting...")
}
}()
return rxgo.FromChannel(ch)
}

View File

@@ -1,27 +1,34 @@
package jar
import (
"crypto/rand"
"encoding/base64"
"encoding/hex"
"errors"
"io"
"net/http"
"net/http/cookiejar"
"os"
"strings"
"flink-kube-operator/pkg"
api "github.com/logi-camp/go-flink-client"
gonanoid "github.com/matoous/go-nanoid/v2"
"go.uber.org/zap"
)
type JarFile struct {
uri string
filePath string
uri string
filePath string
basicAuthUsername *string
basicAuthPassword *string
}
func NewJarFile(URI string) (*JarFile, error) {
func NewJarFile(URI string, basicAuthUsername *string, basicAuthPassword *string) (*JarFile, error) {
jarFile := &JarFile{
uri: URI,
uri: URI,
basicAuthUsername: basicAuthUsername,
basicAuthPassword: basicAuthPassword,
}
err := jarFile.Download()
if err != nil {
@@ -46,7 +53,9 @@ func (jarFile *JarFile) Upload(flinkClient *api.Client) (fileName string, err er
}
func (jarFile *JarFile) Download() error {
fileName, _ := gonanoid.New()
randBytes := make([]byte, 16)
rand.Read(randBytes)
fileName := hex.EncodeToString(randBytes)
jarFile.filePath = "/tmp/" + fileName + ".jar"
out, err := os.Create(jarFile.filePath)
if err != nil {
@@ -54,9 +63,45 @@ func (jarFile *JarFile) Download() error {
}
defer out.Close()
resp, err := http.Get(jarFile.uri)
if err != nil || resp.StatusCode > 299 {
var resp *http.Response
if jarFile.basicAuthPassword != nil && jarFile.basicAuthUsername != nil {
basicAuth := func(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}
redirectPolicyFunc := func(req *http.Request, via []*http.Request) error {
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
return nil
}
client := &http.Client{
Jar: &cookiejar.Jar{},
CheckRedirect: redirectPolicyFunc,
}
req, err := http.NewRequest("GET", jarFile.uri, nil)
if err != nil {
jarFile.delete()
return err
}
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
resp, err = client.Do(req)
} else {
resp, err = http.Get(jarFile.uri)
}
if err != nil {
jarFile.delete()
pkg.Logger.Error("error in downloading jar", zap.Error(err))
return err
}
if resp.StatusCode > 299 {
respBody := []byte{}
resp.Body.Read(respBody)
err = errors.New(string(respBody) + " status:" + resp.Status)
pkg.Logger.Error("error in downloading jar", zap.Error(err))
return err
}

View File

@@ -9,7 +9,7 @@ import (
// upload jar file and set the jarId for later usages
func (job *ManagedJob) upload() error {
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI)
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI, job.def.Spec.JarURIBasicAuthUsername, job.def.Spec.JarURIBasicAuthPassword)
if err != nil {
pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err))
return err

View File

@@ -96,8 +96,6 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
}
pkg.Logger.Debug("[manager] [cycle] finding job", zap.Any("name", managedJob.def.Name))
jobManagerJobOverview, jobFound := lo.Find(jobManagerJobOverviews.Jobs, func(job api.JobOverview) bool {
jobId := managedJob.GetJobId()
if jobId != nil {