5 Commits

14 changed files with 80 additions and 14 deletions

View File

@@ -27,6 +27,7 @@ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10
RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-2/zstd-jni-1.5.7-2.jar -P /opt/flink/lib/
# Command to start Flink JobManager and TaskManager in a mini-cluster setup
CMD ["bin/start-cluster.sh"]

2
go.mod
View File

@@ -5,7 +5,7 @@ go 1.23.2
require (
github.com/danielgtaylor/huma/v2 v2.27.0
github.com/gofiber/fiber/v2 v2.52.6
github.com/logi-camp/go-flink-client v0.2.1
github.com/logi-camp/go-flink-client v0.2.0
github.com/samber/lo v1.47.0
go.uber.org/zap v1.27.0
k8s.io/apimachinery v0.31.3

View File

@@ -2,7 +2,7 @@ apiVersion: v2
name: flink-kube-operator
description: Helm chart for flink kube operator
type: application
version: 1.0.0
version: 1.1.1
appVersion: "0.1.1"
dependencies:
- name: minio

View File

@@ -18,9 +18,13 @@
high-availability.type: kubernetes
kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
execution.checkpointing.interval: {{ .Values.flink.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.checkpoint.mode }}
execution.checkpointing.interval: {{ .Values.flink.state.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.state.checkpoint.mode }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
state.checkpoints.dir: file:///opt/flink/checkpoints/
{{- else if eq .Values.flink.state.checkpoint.storageType "s3" }}
state.checkpoints.dir: s3://flink/checkpoints/
{{- end }}
state.backend.rocksdb.localdir: /opt/flink/rocksdb
high-availability.storageDir: /opt/flink/ha
state.savepoints.dir: s3://flink/savepoints/
@@ -28,4 +32,6 @@
rest.profiling.enabled: true
s3.endpoint: http://{{ .Release.Name }}-minio:9000
s3.path.style.access: true
{{- toYaml .Values.flink.properties | default "" | nindent 4 }}
{{- end }}

View File

@@ -1,7 +1,7 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }}
name: {{ .Release.Name }}-flink-ha-pvc
spec:
accessModes:
- ReadWriteOnce

View File

@@ -64,12 +64,19 @@ spec:
volumeMounts:
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
volumes:
- name: flink-ha
persistentVolumeClaim:
claimName: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }}
claimName: {{ .Release.Name }}-flink-ha-pvc
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View File

@@ -0,0 +1,10 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-checkpoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.checkpoint.size }}

View File

@@ -46,8 +46,18 @@ spec:
volumeMounts:
- name: rocksdb-storage
mountPath: /opt/flink/rocksdb
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
resources:
{{- toYaml .Values.flink.taskManager.resources | nindent 10 }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
volumes:
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
volumeClaimTemplates:
- metadata:
name: rocksdb-storage

View File

@@ -122,10 +122,6 @@ flink:
parallelism:
default: 1 # Default parallelism for Flink jobs
checkpoint:
interval: 5min
mode: EXACTLY_ONCE
state:
backend: rocksdb # Use RocksDB for state backend
incremental: true
@@ -133,10 +129,18 @@ flink:
dir: "/opt/flink/ha" # Directory to store ha data
pvcName: flink-ha-pvc # PVC for ha
size: 10Gi # PVC size for ha
checkpoint:
storageType: s3 # s3 / filesystem
interval: 5min
mode: EXACTLY_ONCE
size: 8Gi
jobManager:
processMemory: 4096m # Size of job manager process memory
properties:
jobmanager.rpc.timeout: 300s
taskManager:
numberOfTaskSlots: 12 # Number of task slots for task manager

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,6 +1,34 @@
apiVersion: v1
entries:
flink-kube-operator:
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-13T10:37:39.948174933+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 0b5f5e707279a564ad9e6c10fb3e565ff1af0ba2058b3f4bc04546dc8db8f68c
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.1.1.tgz
version: 1.1.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-12T23:13:39.394371646+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 14b08b443b4118cee4c279f62b498bc040b4a3e7ebafa8e195606e3d9b21810a
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.1.tgz
version: 1.0.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-06T01:52:09.478716316+03:30"
@@ -165,4 +193,4 @@ entries:
urls:
- flink-kube-operator-0.1.0.tgz
version: 0.1.0
generated: "2025-04-06T01:52:09.466886557+03:30"
generated: "2025-04-13T10:37:39.928098588+03:30"

View File

@@ -43,7 +43,7 @@ func (job *ManagedJob) Run(restoreMode bool) error {
EntryClass: job.def.Spec.EntryClass,
SavepointPath: savepointPath,
Parallelism: job.def.Spec.Parallelism,
ProgramArgsList: job.def.Spec.Args,
ProgramArg: job.def.Spec.Args,
})
if err == nil {
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))