8 Commits

19 changed files with 145 additions and 32 deletions

View File

@@ -27,6 +27,7 @@ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10
RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-2/zstd-jni-1.5.7-2.jar -P /opt/flink/lib/
# Command to start Flink JobManager and TaskManager in a mini-cluster setup # Command to start Flink JobManager and TaskManager in a mini-cluster setup
CMD ["bin/start-cluster.sh"] CMD ["bin/start-cluster.sh"]

2
go.mod
View File

@@ -5,7 +5,7 @@ go 1.23.2
require ( require (
github.com/danielgtaylor/huma/v2 v2.27.0 github.com/danielgtaylor/huma/v2 v2.27.0
github.com/gofiber/fiber/v2 v2.52.6 github.com/gofiber/fiber/v2 v2.52.6
github.com/logi-camp/go-flink-client v0.2.1 github.com/logi-camp/go-flink-client v0.2.0
github.com/samber/lo v1.47.0 github.com/samber/lo v1.47.0
go.uber.org/zap v1.27.0 go.uber.org/zap v1.27.0
k8s.io/apimachinery v0.31.3 k8s.io/apimachinery v0.31.3

View File

@@ -2,9 +2,5 @@ apiVersion: v2
name: flink-kube-operator name: flink-kube-operator
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
type: application type: application
version: 1.0.0 version: 1.2.1
appVersion: "0.1.1" appVersion: "0.1.1"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2

Binary file not shown.

View File

@@ -0,0 +1,12 @@
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-checkpoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.checkpoint.size }}
{{- end }}

View File

@@ -18,14 +18,24 @@
high-availability.type: kubernetes high-availability.type: kubernetes
kubernetes.namespace: {{ .Release.Namespace }} kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }} kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
execution.checkpointing.interval: {{ .Values.flink.checkpoint.interval }} execution.checkpointing.interval: {{ .Values.flink.state.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.checkpoint.mode }} execution.checkpointing.mode: {{ .Values.flink.state.checkpoint.mode }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
state.checkpoints.dir: file:///opt/flink/checkpoints/
{{- else if eq .Values.flink.state.checkpoint.storageType "s3" }}
state.checkpoints.dir: s3://flink/checkpoints/ state.checkpoints.dir: s3://flink/checkpoints/
{{- end }}
state.backend.rocksdb.localdir: /opt/flink/rocksdb state.backend.rocksdb.localdir: /opt/flink/rocksdb
high-availability.storageDir: /opt/flink/ha high-availability.storageDir: /opt/flink/ha
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
state.savepoints.dir: file:///opt/flink/checkpoints/
{{- else if eq .Values.flink.state.savepoint.storageType "s3" }}
state.savepoints.dir: s3://flink/savepoints/ state.savepoints.dir: s3://flink/savepoints/
{{- end }}
state.backend.incremental: {{ .Values.flink.state.incremental }} state.backend.incremental: {{ .Values.flink.state.incremental }}
rest.profiling.enabled: true rest.profiling.enabled: true
s3.endpoint: http://{{ .Release.Name }}-minio:9000 s3.endpoint: http://{{ .Release.Name }}-minio:9000
s3.path.style.access: true s3.path.style.access: true
{{- toYaml .Values.flink.properties | default "" | nindent 4 }}
{{- end }} {{- end }}

View File

@@ -1,7 +1,7 @@
apiVersion: v1 apiVersion: v1
kind: PersistentVolumeClaim kind: PersistentVolumeClaim
metadata: metadata:
name: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }} name: {{ .Release.Name }}-flink-ha-pvc
spec: spec:
accessModes: accessModes:
- ReadWriteOnce - ReadWriteOnce

View File

@@ -64,12 +64,28 @@ spec:
volumeMounts: volumeMounts:
- name: flink-ha - name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }} mountPath: {{ .Values.flink.state.ha.dir }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
mountPath: /opt/flink/savepoint
{{- end }}
volumes: volumes:
- name: flink-ha - name: flink-ha
persistentVolumeClaim: persistentVolumeClaim:
claimName: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }} claimName: {{ .Release.Name }}-flink-ha-pvc
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-savepoint-pvc
{{- end }}
{{- with .Values.nodeSelector }} {{- with .Values.nodeSelector }}
nodeSelector: nodeSelector:
{{- toYaml . | nindent 8 }} {{- toYaml . | nindent 8 }}

View File

@@ -0,0 +1,12 @@
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-savepoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.savepoint.size }}
{{- end }}

View File

@@ -46,8 +46,18 @@ spec:
volumeMounts: volumeMounts:
- name: rocksdb-storage - name: rocksdb-storage
mountPath: /opt/flink/rocksdb mountPath: /opt/flink/rocksdb
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
resources: resources:
{{- toYaml .Values.flink.taskManager.resources | nindent 10 }} {{- toYaml .Values.flink.taskManager.resources | nindent 10 }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
volumes:
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
volumeClaimTemplates: volumeClaimTemplates:
- metadata: - metadata:
name: rocksdb-storage name: rocksdb-storage

View File

@@ -38,8 +38,7 @@ podAnnotations: {}
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ # For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
podLabels: {} podLabels: {}
podSecurityContext: {} podSecurityContext: {} # fsGroup: 2000
# fsGroup: 2000
securityContext: {} securityContext: {}
# capabilities: # capabilities:
@@ -64,10 +63,10 @@ ingress:
# kubernetes.io/ingress.class: nginx # kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true" # kubernetes.io/tls-acme: "true"
hosts: hosts:
- host: chart-example.local - host: chart-example.local
paths: paths:
- path: / - path: /
pathType: ImplementationSpecific pathType: ImplementationSpecific
tls: [] tls: []
# - secretName: chart-example-tls # - secretName: chart-example-tls
# hosts: # hosts:
@@ -106,7 +105,6 @@ autoscaling:
config: config:
flinkApiUrl: flink:8081 flinkApiUrl: flink:8081
nodeSelector: {} nodeSelector: {}
tolerations: [] tolerations: []
@@ -120,35 +118,41 @@ flink:
tag: 1.20.1-scala_2.12-java17-minicluster tag: 1.20.1-scala_2.12-java17-minicluster
parallelism: parallelism:
default: 1 # Default parallelism for Flink jobs default: 1 # Default parallelism for Flink jobs
checkpoint:
interval: 5min
mode: EXACTLY_ONCE
state: state:
backend: rocksdb # Use RocksDB for state backend backend: rocksdb # Use RocksDB for state backend
incremental: true incremental: true
ha: ha:
dir: "/opt/flink/ha" # Directory to store ha data dir: "/opt/flink/ha" # Directory to store ha data
pvcName: flink-ha-pvc # PVC for ha pvcName: flink-ha-pvc # PVC for ha
size: 10Gi # PVC size for ha size: 10Gi # PVC size for ha
checkpoint:
storageType: s3 # s3 / filesystem
interval: 5min
mode: EXACTLY_ONCE
size: 8Gi
savepoint:
storageType: s3
size: 8Gi
jobManager: jobManager:
processMemory: 4096m # Size of job manager process memory processMemory: 4096m # Size of job manager process memory
properties:
jobmanager.rpc.timeout: 300s
taskManager: taskManager:
numberOfTaskSlots: 12 # Number of task slots for task manager numberOfTaskSlots: 12 # Number of task slots for task manager
processMemory: 4096m # Size of task manager process memory processMemory: 4096m # Size of task manager process memory
replicas: 1 # Number of task manager replicas replicas: 1 # Number of task manager replicas
storage: storage:
rocksDb: rocksDb:
size: 4Gi size: 4Gi
resources: resources:
limits: limits:
cpu: 3 cpu: 3
memory: 4Gi memory: 4Gi
requests: requests:
cpu: 1 cpu: 1
memory: 2Gi memory: 2Gi

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,6 +1,58 @@
apiVersion: v1 apiVersion: v1
entries: entries:
flink-kube-operator: flink-kube-operator:
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T14:01:29.891695937+03:30"
description: Helm chart for flink kube operator
digest: 404ed2c28ff43b630b44c1215be5369417a1b9b2747ae24e2963a6b81813e7dc
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.1.tgz
version: 1.2.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T12:47:25.848097207+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 3458b9be97d2a4bcf8574706e44ea9f7fdeb11e83058a615566e6e094a51b920
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.0.tgz
version: 1.2.0
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-15T12:06:59.425538953+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 2b307a113476eebb34f58308bf1d4d0d36ca5e08fe6541f369a1c231ae0a71be
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.1.2.tgz
version: 1.1.2
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-12T23:13:39.394371646+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 14b08b443b4118cee4c279f62b498bc040b4a3e7ebafa8e195606e3d9b21810a
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.1.tgz
version: 1.0.1
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.1 appVersion: 0.1.1
created: "2025-04-06T01:52:09.478716316+03:30" created: "2025-04-06T01:52:09.478716316+03:30"
@@ -165,4 +217,4 @@ entries:
urls: urls:
- flink-kube-operator-0.1.0.tgz - flink-kube-operator-0.1.0.tgz
version: 0.1.0 version: 0.1.0
generated: "2025-04-06T01:52:09.466886557+03:30" generated: "2025-05-17T14:01:29.891695937+03:30"

View File

@@ -43,7 +43,7 @@ func (job *ManagedJob) Run(restoreMode bool) error {
EntryClass: job.def.Spec.EntryClass, EntryClass: job.def.Spec.EntryClass,
SavepointPath: savepointPath, SavepointPath: savepointPath,
Parallelism: job.def.Spec.Parallelism, Parallelism: job.def.Spec.Parallelism,
ProgramArgsList: job.def.Spec.Args, ProgramArg: job.def.Spec.Args,
}) })
if err == nil { if err == nil {
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp)) pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))