9 Commits

28 changed files with 78 additions and 42 deletions

2
.vscode/launch.json vendored
View File

@@ -10,7 +10,7 @@
"request": "launch", "request": "launch",
"mode": "auto", "mode": "auto",
"env": { "env": {
"FLINK_API_URL": "127.0.0.1:8081", "FLINK_API_URL": "flink.bz2:8081",
"SAVEPOINT_PATH": "/opt/flink/savepoints" "SAVEPOINT_PATH": "/opt/flink/savepoints"
}, },
"cwd": "${workspaceFolder}", "cwd": "${workspaceFolder}",

View File

@@ -17,6 +17,11 @@ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafk
RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/
# Command to start Flink JobManager and TaskManager in a mini-cluster setup # Command to start Flink JobManager and TaskManager in a mini-cluster setup
CMD ["bin/start-cluster.sh"] CMD ["bin/start-cluster.sh"]

6
README.md Normal file
View File

@@ -0,0 +1,6 @@
Installation:
```bash
helm repo add lc-flink-operator https://git.logicamp.tech/Logicamp/flink-kube-operator/raw/branch/main/helm/
helm install flink-kube-operator lc-flink-operator/flink-kube-operator
```

View File

@@ -36,6 +36,10 @@ spec:
type: integer type: integer
jarUri: jarUri:
type: string type: string
args:
type: array
items:
type: string
savepointInterval: savepointInterval:
type: string type: string
format: duration format: duration

View File

@@ -1,24 +0,0 @@
apiVersion: v2
name: flink-kube-operator
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.16.0"

6
helm/chart/Chart.yaml Normal file
View File

@@ -0,0 +1,6 @@
apiVersion: v2
name: flink-kube-operator
description: Helm chart for flink kube operator
type: application
version: 0.1.1
appVersion: "0.1.0"

View File

@@ -1,21 +1,20 @@
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: {{ .Release.Name }}-flink # Adding the flink prefix to the name name: {{ .Release.Name }}-flink
labels: labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the labels app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }} # Using the release name for instance app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: Helm
spec: spec:
replicas: 1 replicas: 1
selector: selector:
matchLabels: matchLabels:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the selector app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/instance: {{ .Release.Name }}
template: template:
metadata: metadata:
labels: labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the template labels app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/instance: {{ .Release.Name }}
spec: spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }} serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
@@ -54,9 +53,6 @@ spec:
- name: flink-data - name: flink-data
mountPath: /opt/flink/data mountPath: /opt/flink/data
subPath: data subPath: data
- name: flink-data
mountPath: /opt/flink/web-upload
subPath: web-upload
- name: flink-savepoints - name: flink-savepoints
mountPath: /opt/flink/savepoints mountPath: /opt/flink/savepoints
- name: flink-savepoints - name: flink-savepoints

View File

@@ -1,7 +1,7 @@
apiVersion: v1 apiVersion: v1
kind: PersistentVolumeClaim kind: PersistentVolumeClaim
metadata: metadata:
name: {{ .Values.flink.state.savepoints.pvcName }} # Adding the flink prefix to PVC name name: {{ .Values.flink.state.savepoints.pvcName }}
spec: spec:
accessModes: accessModes:
- ReadWriteOnce - ReadWriteOnce

View File

@@ -1,15 +1,15 @@
apiVersion: v1 apiVersion: v1
kind: Service kind: Service
metadata: metadata:
name: flink # Adding the flink prefix to the service name name: flink
labels: labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to labels app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/instance: {{ .Release.Name }}
spec: spec:
ports: ports:
- port: 8081 - port: 8081
targetPort: 8081 targetPort: 8081
selector: selector:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to selector app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/instance: {{ .Release.Name }}
type: ClusterIP # Change to LoadBalancer if you want external access type: ClusterIP # Change to LoadBalancer if you want external access

Binary file not shown.

Binary file not shown.

24
helm/index.yaml Normal file
View File

@@ -0,0 +1,24 @@
apiVersion: v1
entries:
flink-kube-operator:
- apiVersion: v2
appVersion: 0.1.0
created: "2024-12-19T00:39:44.4857163+03:30"
description: Helm chart for flink kube operator
digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.1.tgz
version: 0.1.1
- apiVersion: v2
appVersion: 0.1.0
created: "2024-12-19T00:39:44.485286485+03:30"
description: Helm chart for flink kube operator
digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.0.tgz
version: 0.1.0
generated: "2024-12-19T00:39:44.48463577+03:30"

View File

@@ -14,6 +14,7 @@ import (
func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error { func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
job := GetJob(jobUid) job := GetJob(jobUid)
pkg.Logger.Debug("[patch-job]", zap.Any("jobUid", jobUid))
patchBytes, err := json.Marshal(patchData) patchBytes, err := json.Marshal(patchData)
if err != nil { if err != nil {

View File

@@ -16,6 +16,7 @@ type FlinkJobSpec struct {
JarURI string `json:"jarUri"` JarURI string `json:"jarUri"`
SavepointInterval metaV1.Duration `json:"savepointInterval"` SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"` EntryClass string `json:"entryClass"`
Args []string `json:"args"`
} }
type FlinkJobStatus struct { type FlinkJobStatus struct {
@@ -54,6 +55,7 @@ var (
ErrNoJarId = errors.New("[managed-job] no jar id") ErrNoJarId = errors.New("[managed-job] no jar id")
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id") ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
ErrNoSavepointPath = errors.New("[managed-job] no savepoint path") ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
ErrOnStartingJob = errors.New("[managed-job] error on starting job")
) )
type JobStatus string type JobStatus string

View File

@@ -26,7 +26,6 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
} }
defer watcher.Stop() defer watcher.Stop()
for event := range watcher.ResultChan() { for event := range watcher.ResultChan() {
pkg.Logger.Debug("[crd] event received", zap.Any("type", event.Type))
unstructuredJob := event.Object.(*unstructured.Unstructured) unstructuredJob := event.Object.(*unstructured.Unstructured)
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object) unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
if err != nil { if err != nil {
@@ -50,10 +49,10 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
switch event.Type { switch event.Type {
case watch.Bookmark: case watch.Bookmark:
case watch.Modified: case watch.Modified:
pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName())) //pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
crd.repsert(job) crd.repsert(job)
case watch.Added: case watch.Added:
pkg.Logger.Info("[crd] [watch] new flink job created") //pkg.Logger.Info("[crd] [watch] new flink job created")
crd.repsert(job) crd.repsert(job)
case watch.Deleted: case watch.Deleted:
} }

View File

@@ -41,6 +41,12 @@ func (job *ManagedJob) Cycle() {
if job.def.Status.JobStatus == v1alpha1.JobStatusCreating { if job.def.Status.JobStatus == v1alpha1.JobStatusCreating {
return return
} }
if job.def.Status.JobStatus == v1alpha1.JobStatusFailed {
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
job.crd.SetJobStatus(job.def.UID, job.def.Status)
return
}
// if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil { // if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
// //job.restore() // //job.restore()
// return // return

View File

@@ -42,16 +42,25 @@ func (job *ManagedJob) run(restoreMode bool) error {
AllowNonRestoredState: true, AllowNonRestoredState: true,
EntryClass: job.def.Spec.EntryClass, EntryClass: job.def.Spec.EntryClass,
SavepointPath: savepointPath, SavepointPath: savepointPath,
Parallelism: job.def.Spec.Parallelism,
ProgramArg: job.def.Spec.Args,
}) })
if err == nil { if err == nil {
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp)) pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))
jobId = &runJarResp.JobId jobId = &runJarResp.JobId
break break
} else { } else {
if strings.ContainsAny(err.Error(), ".jar does not exist") { if strings.Contains(err.Error(), ".jar does not exist") {
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
shouldUpload = true shouldUpload = true
} else { } else {
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err)) pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
stringErr := err.Error()
job.def.Status.Error = &stringErr
job.def.Status.JobStatus = ""
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
job.crd.SetJobStatus(job.def.UID, job.def.Status)
return v1alpha1.ErrOnStartingJob
} }
} }
} }
@@ -66,6 +75,7 @@ func (job *ManagedJob) run(restoreMode bool) error {
}) })
return nil return nil
} }
shouldUpload = false
continue continue
} }
return nil return nil

View File

@@ -58,6 +58,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
// Loop over job definitions as Kubernetes CRD // Loop over job definitions as Kubernetes CRD
for _, uid := range crd.GetAllJobKeys() { for _, uid := range crd.GetAllJobKeys() {
pkg.Logger.Debug("mgr.processingJobsIds", zap.Any("processingJobIds", mgr.processingJobsIds))
if lo.Contains(mgr.processingJobsIds, uid) { if lo.Contains(mgr.processingJobsIds, uid) {
pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid)) pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid))
continue continue