9 Commits

20 changed files with 165 additions and 32 deletions

View File

@@ -27,6 +27,7 @@ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10
RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-2/zstd-jni-1.5.7-2.jar -P /opt/flink/lib/
# Command to start Flink JobManager and TaskManager in a mini-cluster setup
CMD ["bin/start-cluster.sh"]

2
go.mod
View File

@@ -5,7 +5,7 @@ go 1.23.2
require (
github.com/danielgtaylor/huma/v2 v2.27.0
github.com/gofiber/fiber/v2 v2.52.6
github.com/logi-camp/go-flink-client v0.2.1
github.com/logi-camp/go-flink-client v0.2.0
github.com/samber/lo v1.47.0
go.uber.org/zap v1.27.0
k8s.io/apimachinery v0.31.3

View File

@@ -2,9 +2,5 @@ apiVersion: v2
name: flink-kube-operator
description: Helm chart for flink kube operator
type: application
version: 1.0.0
version: 1.2.2
appVersion: "0.1.1"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2

Binary file not shown.

View File

@@ -0,0 +1,12 @@
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-checkpoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.checkpoint.size }}
{{- end }}

View File

@@ -18,14 +18,24 @@
high-availability.type: kubernetes
kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
execution.checkpointing.interval: {{ .Values.flink.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.checkpoint.mode }}
execution.checkpointing.interval: {{ .Values.flink.state.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.state.checkpoint.mode }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
state.checkpoints.dir: file:///opt/flink/checkpoints/
{{- else if eq .Values.flink.state.checkpoint.storageType "s3" }}
state.checkpoints.dir: s3://flink/checkpoints/
{{- end }}
state.backend.rocksdb.localdir: /opt/flink/rocksdb
high-availability.storageDir: /opt/flink/ha
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
state.savepoints.dir: file:///opt/flink/checkpoints/
{{- else if eq .Values.flink.state.savepoint.storageType "s3" }}
state.savepoints.dir: s3://flink/savepoints/
{{- end }}
state.backend.incremental: {{ .Values.flink.state.incremental }}
rest.profiling.enabled: true
s3.endpoint: http://{{ .Release.Name }}-minio:9000
s3.path.style.access: true
{{- toYaml .Values.flink.properties | default "" | nindent 4 }}
{{- end }}

View File

@@ -1,7 +1,7 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }}
name: {{ .Release.Name }}-flink-ha-pvc
spec:
accessModes:
- ReadWriteOnce

View File

@@ -64,12 +64,28 @@ spec:
volumeMounts:
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
mountPath: /opt/flink/savepoint
{{- end }}
volumes:
- name: flink-ha
persistentVolumeClaim:
claimName: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }}
claimName: {{ .Release.Name }}-flink-ha-pvc
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-savepoint-pvc
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View File

@@ -0,0 +1,12 @@
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-savepoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.savepoint.size }}
{{- end }}

View File

@@ -46,8 +46,28 @@ spec:
volumeMounts:
- name: rocksdb-storage
mountPath: /opt/flink/rocksdb
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
mountPath: /opt/flink/savepoint
{{- end }}
resources:
{{- toYaml .Values.flink.taskManager.resources | nindent 10 }}
volumes:
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-savepoint-pvc
{{- end }}
volumeClaimTemplates:
- metadata:
name: rocksdb-storage

View File

@@ -38,8 +38,7 @@ podAnnotations: {}
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
podLabels: {}
podSecurityContext: {}
# fsGroup: 2000
podSecurityContext: {} # fsGroup: 2000
securityContext: {}
# capabilities:
@@ -106,7 +105,6 @@ autoscaling:
config:
flinkApiUrl: flink:8081
nodeSelector: {}
tolerations: []
@@ -122,10 +120,6 @@ flink:
parallelism:
default: 1 # Default parallelism for Flink jobs
checkpoint:
interval: 5min
mode: EXACTLY_ONCE
state:
backend: rocksdb # Use RocksDB for state backend
incremental: true
@@ -133,10 +127,20 @@ flink:
dir: "/opt/flink/ha" # Directory to store ha data
pvcName: flink-ha-pvc # PVC for ha
size: 10Gi # PVC size for ha
checkpoint:
storageType: s3 # s3 / filesystem
interval: 5min
mode: EXACTLY_ONCE
size: 8Gi
savepoint:
storageType: s3
size: 8Gi
jobManager:
processMemory: 4096m # Size of job manager process memory
properties:
jobmanager.rpc.timeout: 300s
taskManager:
numberOfTaskSlots: 12 # Number of task slots for task manager

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,6 +1,68 @@
apiVersion: v1
entries:
flink-kube-operator:
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T14:34:55.317942453+03:30"
description: Helm chart for flink kube operator
digest: 422a34dc173ebe29adccd46d7ef94505cc022ff20ccbfb85ac3e6e201cba476c
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.2.tgz
version: 1.2.2
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T14:01:29.891695937+03:30"
description: Helm chart for flink kube operator
digest: 404ed2c28ff43b630b44c1215be5369417a1b9b2747ae24e2963a6b81813e7dc
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.1.tgz
version: 1.2.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T12:47:25.848097207+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 3458b9be97d2a4bcf8574706e44ea9f7fdeb11e83058a615566e6e094a51b920
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.0.tgz
version: 1.2.0
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-15T12:06:59.425538953+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 2b307a113476eebb34f58308bf1d4d0d36ca5e08fe6541f369a1c231ae0a71be
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.1.2.tgz
version: 1.1.2
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-12T23:13:39.394371646+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 14b08b443b4118cee4c279f62b498bc040b4a3e7ebafa8e195606e3d9b21810a
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.1.tgz
version: 1.0.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-06T01:52:09.478716316+03:30"
@@ -165,4 +227,4 @@ entries:
urls:
- flink-kube-operator-0.1.0.tgz
version: 0.1.0
generated: "2025-04-06T01:52:09.466886557+03:30"
generated: "2025-05-17T14:34:55.317942453+03:30"

View File

@@ -43,7 +43,7 @@ func (job *ManagedJob) Run(restoreMode bool) error {
EntryClass: job.def.Spec.EntryClass,
SavepointPath: savepointPath,
Parallelism: job.def.Spec.Parallelism,
ProgramArgsList: job.def.Spec.Args,
ProgramArg: job.def.Spec.Args,
})
if err == nil {
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))