12 Commits

21 changed files with 165 additions and 41 deletions

View File

@@ -1,4 +1,4 @@
FROM public.ecr.aws/docker/library/golang:1.23.4-bookworm AS build FROM public.ecr.aws/docker/library/golang:1.24.1-bookworm AS build
ARG upx_version=4.2.4 ARG upx_version=4.2.4

View File

@@ -27,6 +27,7 @@ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10
RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-2/zstd-jni-1.5.7-2.jar -P /opt/flink/lib/
# Command to start Flink JobManager and TaskManager in a mini-cluster setup # Command to start Flink JobManager and TaskManager in a mini-cluster setup
CMD ["bin/start-cluster.sh"] CMD ["bin/start-cluster.sh"]

2
go.sum
View File

@@ -64,6 +64,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/logi-camp/go-flink-client v0.2.0 h1:PIyfJq7FjW28bnvemReCicIuQD7JzVgJDk2xPTZUS2s= github.com/logi-camp/go-flink-client v0.2.0 h1:PIyfJq7FjW28bnvemReCicIuQD7JzVgJDk2xPTZUS2s=
github.com/logi-camp/go-flink-client v0.2.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= github.com/logi-camp/go-flink-client v0.2.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
github.com/logi-camp/go-flink-client v0.2.1 h1:STfKamFm9+2SxxfZO3ysdFsb5MViQdThB4UHbnkUOE8=
github.com/logi-camp/go-flink-client v0.2.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=

View File

@@ -2,9 +2,5 @@ apiVersion: v2
name: flink-kube-operator name: flink-kube-operator
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
type: application type: application
version: 0.1.14 version: 1.2.0
appVersion: "0.1.0" appVersion: "0.1.1"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2

Binary file not shown.

View File

@@ -0,0 +1,12 @@
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-checkpoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.checkpoint.size }}
{{- end }}

View File

@@ -18,14 +18,24 @@
high-availability.type: kubernetes high-availability.type: kubernetes
kubernetes.namespace: {{ .Release.Namespace }} kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }} kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
execution.checkpointing.interval: {{ .Values.flink.checkpoint.interval }} execution.checkpointing.interval: {{ .Values.flink.state.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.checkpoint.mode }} execution.checkpointing.mode: {{ .Values.flink.state.checkpoint.mode }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
state.checkpoints.dir: file:///opt/flink/checkpoints/
{{- else if eq .Values.flink.state.checkpoint.storageType "s3" }}
state.checkpoints.dir: s3://flink/checkpoints/ state.checkpoints.dir: s3://flink/checkpoints/
{{- end }}
state.backend.rocksdb.localdir: /opt/flink/rocksdb state.backend.rocksdb.localdir: /opt/flink/rocksdb
high-availability.storageDir: /opt/flink/ha high-availability.storageDir: /opt/flink/ha
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
state.savepoints.dir: file:///opt/flink/checkpoints/
{{- else if eq .Values.flink.state.savepoint.storageType "s3" }}
state.savepoints.dir: s3://flink/savepoints/ state.savepoints.dir: s3://flink/savepoints/
{{- end }}
state.backend.incremental: {{ .Values.flink.state.incremental }} state.backend.incremental: {{ .Values.flink.state.incremental }}
rest.profiling.enabled: true rest.profiling.enabled: true
s3.endpoint: http://{{ .Release.Name }}-minio:9000 s3.endpoint: http://{{ .Release.Name }}-minio:9000
s3.path.style.access: true s3.path.style.access: true
{{- toYaml .Values.flink.properties | default "" | nindent 4 }}
{{- end }} {{- end }}

View File

@@ -1,7 +1,7 @@
apiVersion: v1 apiVersion: v1
kind: PersistentVolumeClaim kind: PersistentVolumeClaim
metadata: metadata:
name: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }} name: {{ .Release.Name }}-flink-ha-pvc
spec: spec:
accessModes: accessModes:
- ReadWriteOnce - ReadWriteOnce

View File

@@ -64,12 +64,28 @@ spec:
volumeMounts: volumeMounts:
- name: flink-ha - name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }} mountPath: {{ .Values.flink.state.ha.dir }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
mountPath: /opt/flink/savepoint
{{- end }}
volumes: volumes:
- name: flink-ha - name: flink-ha
persistentVolumeClaim: persistentVolumeClaim:
claimName: {{ .Release.Name }}-{{ .Values.flink.state.ha.pvcName }} claimName: {{ .Release.Name }}-flink-ha-pvc
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-savepoint-pvc
{{- end }}
{{- with .Values.nodeSelector }} {{- with .Values.nodeSelector }}
nodeSelector: nodeSelector:
{{- toYaml . | nindent 8 }} {{- toYaml . | nindent 8 }}

View File

@@ -0,0 +1,12 @@
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-savepoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.savepoint.size }}
{{- end }}

View File

@@ -43,11 +43,25 @@ spec:
secretKeyRef: secretKeyRef:
name: {{ .Release.Name }}-flink-secrets name: {{ .Release.Name }}-flink-secrets
key: minio_secret_key key: minio_secret_key
- name: FLINK_TASKMANAGER_HOST
valueFrom:
fieldRef:
fieldPath: spec.hostname
volumeMounts: volumeMounts:
- name: rocksdb-storage - name: rocksdb-storage
mountPath: /opt/flink/rocksdb mountPath: /opt/flink/rocksdb
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
resources: resources:
{{- toYaml .Values.flink.taskManager.resources | nindent 10 }} {{- toYaml .Values.flink.taskManager.resources | nindent 10 }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
volumes:
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
volumeClaimTemplates: volumeClaimTemplates:
- metadata: - metadata:
name: rocksdb-storage name: rocksdb-storage

View File

@@ -38,8 +38,7 @@ podAnnotations: {}
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ # For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
podLabels: {} podLabels: {}
podSecurityContext: {} podSecurityContext: {} # fsGroup: 2000
# fsGroup: 2000
securityContext: {} securityContext: {}
# capabilities: # capabilities:
@@ -64,10 +63,10 @@ ingress:
# kubernetes.io/ingress.class: nginx # kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true" # kubernetes.io/tls-acme: "true"
hosts: hosts:
- host: chart-example.local - host: chart-example.local
paths: paths:
- path: / - path: /
pathType: ImplementationSpecific pathType: ImplementationSpecific
tls: [] tls: []
# - secretName: chart-example-tls # - secretName: chart-example-tls
# hosts: # hosts:
@@ -106,7 +105,6 @@ autoscaling:
config: config:
flinkApiUrl: flink:8081 flinkApiUrl: flink:8081
nodeSelector: {} nodeSelector: {}
tolerations: [] tolerations: []
@@ -120,35 +118,41 @@ flink:
tag: 1.20.1-scala_2.12-java17-minicluster tag: 1.20.1-scala_2.12-java17-minicluster
parallelism: parallelism:
default: 1 # Default parallelism for Flink jobs default: 1 # Default parallelism for Flink jobs
checkpoint:
interval: 5min
mode: EXACTLY_ONCE
state: state:
backend: rocksdb # Use RocksDB for state backend backend: rocksdb # Use RocksDB for state backend
incremental: true incremental: true
ha: ha:
dir: "/opt/flink/ha" # Directory to store ha data dir: "/opt/flink/ha" # Directory to store ha data
pvcName: flink-ha-pvc # PVC for ha pvcName: flink-ha-pvc # PVC for ha
size: 10Gi # PVC size for ha size: 10Gi # PVC size for ha
checkpoint:
storageType: s3 # s3 / filesystem
interval: 5min
mode: EXACTLY_ONCE
size: 8Gi
savepoint:
storageType: s3
size: 8Gi
jobManager: jobManager:
processMemory: 4096m # Size of job manager process memory processMemory: 4096m # Size of job manager process memory
properties:
jobmanager.rpc.timeout: 300s
taskManager: taskManager:
numberOfTaskSlots: 12 # Number of task slots for TaskManager numberOfTaskSlots: 12 # Number of task slots for task manager
processMemory: 4096m # Size of task manager process memory processMemory: 4096m # Size of task manager process memory
replicas: 1 replicas: 1 # Number of task manager replicas
storage: storage:
rocksDb: rocksDb:
size: 4Gi size: 4Gi
resources: resources:
limits: limits:
cpu: 3 cpu: 3
memory: 4Gi memory: 4Gi
requests: requests:
cpu: 1 cpu: 1
memory: 2Gi memory: 2Gi

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,6 +1,62 @@
apiVersion: v1 apiVersion: v1
entries: entries:
flink-kube-operator: flink-kube-operator:
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T12:47:25.848097207+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 3458b9be97d2a4bcf8574706e44ea9f7fdeb11e83058a615566e6e094a51b920
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.0.tgz
version: 1.2.0
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-15T12:06:59.425538953+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 2b307a113476eebb34f58308bf1d4d0d36ca5e08fe6541f369a1c231ae0a71be
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.1.2.tgz
version: 1.1.2
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-12T23:13:39.394371646+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 14b08b443b4118cee4c279f62b498bc040b4a3e7ebafa8e195606e3d9b21810a
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.1.tgz
version: 1.0.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-06T01:52:09.478716316+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: e177bc2f11987f4add27c09e521476eabaa456df1b9621321200b58f3a330813
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.0.tgz
version: 1.0.0
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.971040367+03:30" created: "2025-04-04T13:50:27.971040367+03:30"
@@ -151,4 +207,4 @@ entries:
urls: urls:
- flink-kube-operator-0.1.0.tgz - flink-kube-operator-0.1.0.tgz
version: 0.1.0 version: 0.1.0
generated: "2025-04-04T13:50:27.967565847+03:30" generated: "2025-04-15T12:06:59.397928815+03:30"

View File

@@ -13,7 +13,7 @@ func (job *ManagedJob) Cycle() {
// pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName())) // pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName()))
// Init job // Init job
if job.def.Status.LifeCycleStatus == "" && job.def.Status.JobStatus == "" { if job.def.Status.LifeCycleStatus == "" && (job.def.Status.JobStatus == "" || job.def.Status.JobStatus == v1alpha1.JobStatusFinished) {
job.Run(false) job.Run(false)
return return
} }

View File

@@ -117,14 +117,15 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
"status": patchStatusObj, "status": patchStatusObj,
}) })
} else { } else {
patchStatusObj := map[string]interface{}{ // TODO handle job not found status
"jobStatus": "", // patchStatusObj := map[string]interface{}{
"lifeCycleStatus": string(v1alpha1.LifeCycleStatusFailed), // "jobStatus": "",
} // "lifeCycleStatus": string(v1alpha1.LifeCycleStatusFailed),
// }
crdInstance.Patch(uid, map[string]interface{}{ // crdInstance.Patch(uid, map[string]interface{}{
"status": patchStatusObj, // "status": patchStatusObj,
}) // })
} }
managedJob.Cycle() managedJob.Cycle()