feat: apply new helm structure

use minio s3 for savepoint and checkpoint path
separate task-manager, job-manager and operator
use statefulset for task-manager to handle replication
support basic credential for download jar request
update to flink 1.20.1
This commit is contained in:
2025-04-05 01:39:02 +03:30
parent 7f78faeed7
commit 830e265162
26 changed files with 386 additions and 256 deletions

View File

@@ -10,13 +10,15 @@ import (
//go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE
type FlinkJobSpec struct {
Key string `json:"key"`
Name string `json:"name"`
Parallelism int `json:"parallelism"`
JarURI string `json:"jarUri"`
SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"`
Args []string `json:"args"`
Key string `json:"key"`
Name string `json:"name"`
Parallelism int `json:"parallelism"`
JarURI string `json:"jarUri"`
JarURIBasicAuthUsername *string `json:"jarURIBasicAuthUsername"`
JarURIBasicAuthPassword *string `json:"jarURIBasicAuthPassword"`
SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"`
Args []string `json:"args"`
}
type FlinkJobStatus struct {

View File

@@ -2,10 +2,12 @@ package jar
import (
"crypto/rand"
"encoding/base64"
"encoding/hex"
"errors"
"io"
"net/http"
"net/http/cookiejar"
"os"
"strings"
@@ -16,13 +18,17 @@ import (
)
type JarFile struct {
uri string
filePath string
uri string
filePath string
basicAuthUsername *string
basicAuthPassword *string
}
func NewJarFile(URI string) (*JarFile, error) {
func NewJarFile(URI string, basicAuthUsername *string, basicAuthPassword *string) (*JarFile, error) {
jarFile := &JarFile{
uri: URI,
uri: URI,
basicAuthUsername: basicAuthUsername,
basicAuthPassword: basicAuthPassword,
}
err := jarFile.Download()
if err != nil {
@@ -57,9 +63,45 @@ func (jarFile *JarFile) Download() error {
}
defer out.Close()
resp, err := http.Get(jarFile.uri)
if err != nil || resp.StatusCode > 299 {
var resp *http.Response
if jarFile.basicAuthPassword != nil && jarFile.basicAuthUsername != nil {
basicAuth := func(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}
redirectPolicyFunc := func(req *http.Request, via []*http.Request) error {
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
return nil
}
client := &http.Client{
Jar: &cookiejar.Jar{},
CheckRedirect: redirectPolicyFunc,
}
req, err := http.NewRequest("GET", jarFile.uri, nil)
if err != nil {
jarFile.delete()
return err
}
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
resp, err = client.Do(req)
} else {
resp, err = http.Get(jarFile.uri)
}
if err != nil {
jarFile.delete()
pkg.Logger.Error("error in downloading jar", zap.Error(err))
return err
}
if resp.StatusCode > 299 {
respBody := []byte{}
resp.Body.Read(respBody)
err = errors.New(string(respBody) + " status:" + resp.Status)
pkg.Logger.Error("error in downloading jar", zap.Error(err))
return err
}

View File

@@ -9,7 +9,7 @@ import (
// upload jar file and set the jarId for later usages
func (job *ManagedJob) upload() error {
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI)
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI, job.def.Spec.JarURIBasicAuthUsername, job.def.Spec.JarURIBasicAuthPassword)
if err != nil {
pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err))
return err