17 Commits

Author SHA1 Message Date
5ca1c28b33 fix(helm): wrong savepoint path config when storage-type is filesystem 2025-07-18 18:12:21 +03:30
d73292ac54 fix: resolve missing task manager statefulset savepoint pvc mount 2025-05-17 14:35:22 +03:30
f0df5ff937 fix: wrong fieldPath in task-manager statefulset spec.hostname 2025-05-17 14:02:01 +03:30
83c4b5ded2 feat(helm): add filesystem savepoint storage mode 2025-05-17 13:02:24 +03:30
89647f3b5b fix(helm): add flink taskmanager host env to task manager 2025-04-15 12:08:17 +03:30
dedbe00fba fix(helm): wrong checkpoint path flink properties 2025-04-13 10:38:15 +03:30
62c340bc64 feat(helm): add filesystem checkpoint storage mode 2025-04-13 10:00:32 +03:30
44ff3627fc feat(helm): add flink properties variable to values 2025-04-12 23:14:52 +03:30
392004d99a ci(docker): add zstd dependency jar to flink docker file 2025-04-12 23:07:53 +03:30
22c7d712f4 feat: update flink http client library 2025-04-07 13:20:39 +03:30
2dd625ec7c feat: update flink http client library 2025-04-07 11:28:33 +03:30
c991215a9d Merge branch 'main' of https://git.logicamp.tech/Logicamp/flink-kube-operator 2025-04-06 08:48:54 +03:30
1c32bfbbe0 chore: create index and chart package 2025-04-06 01:53:33 +03:30
f210090dff Merge branch 'feature/new-helm-structure' into HEAD 2025-04-06 01:49:21 +03:30
54008669cb fix(helm): wrong savepoint and checkpoint s3 configs 2025-04-06 01:49:00 +03:30
830e265162 feat: apply new helm structure
use minio s3 for savepoint and checkpoint path
separate task-manager, job-manager and operator
use statefulset for task-manager to handle replication
support basic credential for download jar request
update to flink 1.20.1
2025-04-05 01:39:02 +03:30
556d9ff6af fix: wong update status in some situations 2025-03-05 11:40:22 +03:30
39 changed files with 582 additions and 284 deletions

View File

@@ -14,6 +14,7 @@
"nindent", "nindent",
"reactivex", "reactivex",
"repsert", "repsert",
"taskmanager",
"tolerations" "tolerations"
] ]
} }

View File

@@ -1,8 +1,8 @@
FROM public.ecr.aws/docker/library/golang:1.23.4-bookworm AS build FROM public.ecr.aws/docker/library/golang:1.24.1-bookworm AS build
ARG upx_version=4.2.4 ARG upx_version=4.2.4
RUN apt-get update && apt-get install -y --no-install-recommends xz-utils && \ RUN apt-get update && apt-get install -y --no-install-recommends xz-utils ca-certificates && \
curl -Ls https://github.com/upx/upx/releases/download/v${upx_version}/upx-${upx_version}-amd64_linux.tar.xz -o - | tar xvJf - -C /tmp && \ curl -Ls https://github.com/upx/upx/releases/download/v${upx_version}/upx-${upx_version}-amd64_linux.tar.xz -o - | tar xvJf - -C /tmp && \
cp /tmp/upx-${upx_version}-amd64_linux/upx /usr/local/bin/ && \ cp /tmp/upx-${upx_version}-amd64_linux/upx /usr/local/bin/ && \
chmod +x /usr/local/bin/upx && \ chmod +x /usr/local/bin/upx && \
@@ -27,6 +27,7 @@ FROM public.ecr.aws/docker/library/busybox:1.37.0 AS final
COPY --from=build /flink-kube-operator /flink-kube-operator COPY --from=build /flink-kube-operator /flink-kube-operator
COPY --from=build /etc/ssl/certs /etc/ssl/certs
EXPOSE 8083 EXPOSE 8083

View File

@@ -1,4 +1,4 @@
FROM public.ecr.aws/docker/library/flink:1.20.0-scala_2.12-java17 FROM public.ecr.aws/docker/library/flink:1.20.1-scala_2.12-java17
# Set working directory # Set working directory
WORKDIR /opt/flink WORKDIR /opt/flink
@@ -15,17 +15,19 @@ RUN chmod +x /opt/flink/bin/start-cluster.sh
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.4.0-1.20/flink-connector-kafka-3.4.0-1.20.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.4.0-1.20/flink-connector-kafka-3.4.0-1.20.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.1/flink-avro-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.1/flink-avro-confluent-registry-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/
RUN wget -q https://jdbc.postgresql.org/download/postgresql-42.7.4.jar -P /opt/flink/lib/ RUN wget -q https://jdbc.postgresql.org/download/postgresql-42.7.4.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-jdbc-driver/1.20.0/flink-sql-jdbc-driver-1.20.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-jdbc-driver/1.20.1/flink-sql-jdbc-driver-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-2/zstd-jni-1.5.7-2.jar -P /opt/flink/lib/
# Command to start Flink JobManager and TaskManager in a mini-cluster setup # Command to start Flink JobManager and TaskManager in a mini-cluster setup
CMD ["bin/start-cluster.sh"] CMD ["bin/start-cluster.sh"]

View File

@@ -36,6 +36,10 @@ spec:
type: integer type: integer
jarUri: jarUri:
type: string type: string
jarURIBasicAuthUsername:
type: string
jarURIBasicAuthPassword:
type: string
args: args:
type: array type: array
items: items:

View File

@@ -3,13 +3,14 @@ apiVersion: flink.logicamp.tech/v1alpha1
kind: FlinkJob kind: FlinkJob
metadata: metadata:
name: my-flink-job name: my-flink-job
namespace: default
spec: spec:
key: word-count key: word-count
name: "Word Count Example" name: "Word Count Example"
entryClass: "org.apache.flink.examples.java.wordcount.WordCount" entryClass: "tech.logicamp.logiline.FacilityEnrichment"
parallelism: 2 parallelism: 1
jarUri: "http://192.168.7.7:8080/product-enrichment-processor.jar" jarUri: "https://git.logicamp.tech/api/packages/logiline/generic/facility-enrichment/1.0.0/facility-enrichment.jar"
jarURIBasicAuthUsername: logiline-actrunner
jarURIBasicAuthPassword: daeweeb7ohpaiw3oojiCoong
flinkConfiguration: flinkConfiguration:
taskmanager.numberOfTaskSlots: "2" taskmanager.numberOfTaskSlots: "1"
parallelism.default: "2" parallelism.default: "1"

2
go.sum
View File

@@ -64,6 +64,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/logi-camp/go-flink-client v0.2.0 h1:PIyfJq7FjW28bnvemReCicIuQD7JzVgJDk2xPTZUS2s= github.com/logi-camp/go-flink-client v0.2.0 h1:PIyfJq7FjW28bnvemReCicIuQD7JzVgJDk2xPTZUS2s=
github.com/logi-camp/go-flink-client v0.2.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= github.com/logi-camp/go-flink-client v0.2.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
github.com/logi-camp/go-flink-client v0.2.1 h1:STfKamFm9+2SxxfZO3ysdFsb5MViQdThB4UHbnkUOE8=
github.com/logi-camp/go-flink-client v0.2.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=

6
helm/chart/Chart.lock Normal file
View File

@@ -0,0 +1,6 @@
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
digest: sha256:9a822e9c5a4eee1b6515c143150c1dd6f84ceb080a7be4573e09396c5916f7d3
generated: "2025-04-04T14:42:09.771390014+03:30"

View File

@@ -2,5 +2,5 @@ apiVersion: v2
name: flink-kube-operator name: flink-kube-operator
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
type: application type: application
version: 0.1.14 version: 1.2.3
appVersion: "0.1.0" appVersion: "0.1.1"

View File

@@ -17,6 +17,6 @@
{{- else if contains "ClusterIP" .Values.service.type }} {{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "flink-kube-operator.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}") export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "flink-kube-operator.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}") export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
echo "Visit http://127.0.0.1:8080 to use your application" echo "Visit http://127.0.0.1:8081 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8081:$CONTAINER_PORT
{{- end }} {{- end }}

View File

@@ -0,0 +1,12 @@
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-checkpoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.checkpoint.size }}
{{- end }}

View File

@@ -0,0 +1,43 @@
{{- define "flink.env" -}}
- name: JOB_MANAGER_RPC_ADDRESS
value: "localhost"
- name: NAMESPACE
value: {{ .Release.Namespace }}
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: {{ .Release.Name }}-flink-job-manager
jobmanager.memory.process.size: {{ .Values.flink.jobManager.processMemory }}
taskmanager.memory.process.size: {{ .Values.flink.taskManager.processMemory }}
taskmanager.data.port: 6125
taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
parallelism.default: {{ .Values.flink.parallelism.default }}
state.backend: {{ .Values.flink.state.backend }}
rest.port: 8081
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender
high-availability.type: kubernetes
kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
execution.checkpointing.interval: {{ .Values.flink.state.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.state.checkpoint.mode }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
state.checkpoints.dir: file:///opt/flink/checkpoints/
{{- else if eq .Values.flink.state.checkpoint.storageType "s3" }}
state.checkpoints.dir: s3://flink/checkpoints/
{{- end }}
state.backend.rocksdb.localdir: /opt/flink/rocksdb
high-availability.storageDir: /opt/flink/ha
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
state.savepoints.dir: file:///opt/flink/savepoints/
{{- else if eq .Values.flink.state.savepoint.storageType "s3" }}
state.savepoints.dir: s3://flink/savepoints/
{{- end }}
state.backend.incremental: {{ .Values.flink.state.incremental }}
rest.profiling.enabled: true
{{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
s3.endpoint: http://{{ .Release.Name }}-minio:9000
s3.path.style.access: true
{{- end }}
{{- toYaml .Values.flink.properties | default "" | nindent 4 }}
{{- end }}

View File

@@ -1,10 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Values.flink.state.data.pvcName }}
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: {{ .Values.flink.state.data.size }} # Use size defined in values.yaml

View File

@@ -1,165 +0,0 @@
{{- define "flink.env" -}}
- name: JOB_MANAGER_RPC_ADDRESS
value: "localhost"
- name: NAMESPACE
value: {{ .Release.Namespace }}
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: localhost
jobmanager.memory.process.size: {{ .Values.flink.jobManager.processMemory }}
taskmanager.memory.process.size: {{ .Values.flink.taskManager.processMemory }}
taskmanager.data.port: 6125
taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
parallelism.default: {{ .Values.flink.parallelism.default }}
state.backend: {{ .Values.flink.state.backend }}
rest.port: 8081
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender
high-availability.type: kubernetes
kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
execution.checkpointing.interval: {{ .Values.flink.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.checkpoint.mode }}
web.upload.dir: {{ .Values.flink.state.data.dir }}/web-upload
state.checkpoints.dir: file://{{ .Values.flink.state.data.dir }}/checkpoints
state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb
high-availability.storageDir: file://{{ .Values.flink.state.ha.dir }}
state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }}
state.backend.incremental: {{ .Values.flink.state.incremental }}
rest.profiling.enabled: true
{{- end }}
{{- define "flink.volumeMounts" -}}
- name: flink-data
mountPath: {{ .Values.flink.state.data.dir }}/data
- name: flink-data
mountPath: {{ .Values.flink.state.data.dir }}/rocksdb
subPath: rocksdb
- name: flink-data
mountPath: {{ .Values.flink.state.data.dir }}/checkpoints
subPath: checkpoints
- name: flink-data
mountPath: {{ .Values.flink.state.data.dir }}/web-upload
subPath: web-upload
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
- name: flink-savepoints
mountPath: {{ .Values.flink.state.savepoints.dir }}
{{- end }}
{{- define "flink.volumes" -}}
- name: flink-data
persistentVolumeClaim:
claimName: {{ .Values.flink.state.data.pvcName }}
- name: flink-savepoints
persistentVolumeClaim:
claimName: {{ .Values.flink.state.savepoints.pvcName }}
- name: flink-ha
persistentVolumeClaim:
claimName: {{ .Values.flink.state.ha.pvcName }}
{{- end }}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Release.Name }}-flink
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
template:
metadata:
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
initContainers:
- name: volume-mount-hack
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
runAsUser: 0
command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.data.dir }}/data {{ .Values.flink.state.data.dir }}/rocksdb {{ .Values.flink.state.data.dir }}/checkpoints {{ .Values.flink.state.data.dir }}/web-upload {{ .Values.flink.state.ha.dir }} {{ .Values.flink.state.savepoints.dir }}"]
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
containers:
- name: jobmanager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["jobmanager"]
ports:
- containerPort: 6123 # JobManager RPC port
name: rpc
- containerPort: 6124 # JobManager blob server port
name: blob
- containerPort: 6125 # JobManager queryable state port
name: query
- containerPort: 8081 # JobManager Web UI port
name: ui
env:
{{- include "flink.env" . | nindent 12 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
- name: taskmanager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["taskmanager"]
ports:
- containerPort: 6121 # TaskManager data port
name: data
- containerPort: 6122 # TaskManager RPC port
name: rpc
env:
{{- include "flink.env" . | nindent 12 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
- name: operator
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
env:
- name: FLINK_API_URL
value: localhost:8081
- name: SAVEPOINT_PATH
value: file://{{ .Values.flink.state.savepoints.dir }}
- name: NAMESPACE
value: "{{ .Release.Namespace }}"
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
volumes:
{{- include "flink.volumes" . | nindent 8 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@@ -1,7 +1,7 @@
apiVersion: v1 apiVersion: v1
kind: PersistentVolumeClaim kind: PersistentVolumeClaim
metadata: metadata:
name: {{ .Values.flink.state.ha.pvcName }} name: {{ .Release.Name }}-flink-ha-pvc
spec: spec:
accessModes: accessModes:
- ReadWriteOnce - ReadWriteOnce

View File

@@ -0,0 +1,102 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Release.Name }}-flink-job-manager
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
template:
metadata:
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
initContainers:
- name: volume-mount-hack
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
runAsUser: 0
command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.ha.dir }}"]
volumeMounts:
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
containers:
- name: jobmanager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["jobmanager"]
ports:
- containerPort: 6123 # JobManager RPC port
name: rpc
- containerPort: 6124 # JobManager blob server port
name: blob
- containerPort: 6125 # JobManager queryable state port
name: query
- containerPort: 8081 # JobManager Web UI port
name: ui
env:
{{- include "flink.env" . | nindent 12 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
{{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
- name: S3_ENDPOINT
value: "http://minio-service:9000"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_access_key
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_secret_key
{{- end }}
volumeMounts:
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
mountPath: /opt/flink/savepoints
{{- end }}
volumes:
- name: flink-ha
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-ha-pvc
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-savepoint-pvc
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@@ -0,0 +1,28 @@
apiVersion: v1
kind: Service
metadata:
name: {{ .Release.Name }}-flink-job-manager
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink-job-manager
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
ports:
- name: flink-web-ui
port: 8081
targetPort: 8081
- name: rpc
port: 6123
targetPort: 6123
- name: blob
port: 6124
targetPort: 6124
- name: query
port: 6125
targetPort: 6125
- name: operator
port: 3000
targetPort: 3000
selector:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
type: ClusterIP # Change to LoadBalancer if you want external access

View File

@@ -0,0 +1,12 @@
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Release.Name }}-flink-savepoint-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: {{ .Values.flink.state.savepoint.size }}
{{- end }}

View File

@@ -1,10 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Values.flink.state.savepoints.pvcName }}
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: {{ .Values.flink.state.savepoints.size }} # Use size defined in values.yaml

View File

@@ -1,19 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: flink
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
ports:
- port: 8081
name: flink-web-ui
targetPort: 8081
- port: 3000
name: operator
targetPort: 3000
selector:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
type: ClusterIP # Change to LoadBalancer if you want external access

View File

@@ -0,0 +1,80 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: {{ .Release.Name }}-flink-task-manager
labels:
app: {{ .Release.Name }}-flink-operator
component: taskmanager
spec:
serviceName: {{ .Release.Name }}-flink-task-manager
replicas: {{ .Values.flink.taskManager.replicas }}
selector:
matchLabels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-task-manager
template:
metadata:
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-task-manager
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
containers:
- name: task-manager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["taskmanager"]
env:
{{- include "flink.env" . | nindent 8 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
{{- if or (eq .Values.flink.state.checkpoint.storageType "s3") (eq .Values.flink.state.savepoint.storageType "s3") }}
- name: S3_ENDPOINT
value: "http://minio-service:9000"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_access_key
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_secret_key
{{- end }}
volumeMounts:
- name: rocksdb-storage
mountPath: /opt/flink/rocksdb
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
mountPath: /opt/flink/savepoints
{{- end }}
resources:
{{- toYaml .Values.flink.taskManager.resources | nindent 10 }}
volumes:
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
{{- if eq .Values.flink.state.savepoint.storageType "filesystem" }}
- name: flink-savepoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-savepoint-pvc
{{- end }}
volumeClaimTemplates:
- metadata:
name: rocksdb-storage
spec:
accessModes: [ ReadWriteOnce ]
resources:
requests:
storage: {{ .Values.flink.taskManager.storage.rocksDb.size }}

View File

@@ -1,9 +1,10 @@
apiVersion: v1 apiVersion: v1
kind: Service kind: Service
metadata: metadata:
name: {{ include "flink-kube-operator.fullname" . }} name: {{ .Release.Name }}-flink-operator
labels: labels:
{{- include "flink-kube-operator.labels" . | nindent 4 }} app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
spec: spec:
type: {{ .Values.service.type }} type: {{ .Values.service.type }}
ports: ports:
@@ -12,4 +13,5 @@ spec:
protocol: TCP protocol: TCP
name: http name: http
selector: selector:
{{- include "flink-kube-operator.selectorLabels" . | nindent 4 }} app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator

View File

@@ -0,0 +1,70 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: {{ .Release.Name }}-flink-operator
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
spec:
serviceName: {{ .Release.Name }}-flink-operator
replicas: 1
selector:
matchLabels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
template:
metadata:
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
initContainers:
- name: wait-for-jobmanager
image: curlimages/curl:8.5.0 # Lightweight curl image
command:
- sh
- -c
- |
echo "Waiting for Flink JobManager to be ready..."
until curl -sSf "http://{{ .Release.Name }}-flink-job-manager:8081/taskmanagers"; do
echo "JobManager not ready yet - retrying in 5s..."
sleep 5
done
echo "JobManager is ready!"
containers:
- name: operator
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
env:
- name: FLINK_API_URL
value: {{ .Release.Name }}-flink-job-manager:8081
- name: NAMESPACE
value: "{{ .Release.Namespace }}"
{{- if eq .Values.flink.state.savepoint.storageType "s3" }}
- name: SAVEPOINT_PATH
value: s3://flink/savepoints/
- name: S3_ENDPOINT
value: "http://{{ .Release.Name }}-minio:9000"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-minio
key: root-user
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-minio
key: root-password
{{- else }}
- name: SAVEPOINT_PATH
value: /opt/flink/savepoints/
{{- end }}

View File

@@ -38,8 +38,7 @@ podAnnotations: {}
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ # For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
podLabels: {} podLabels: {}
podSecurityContext: {} podSecurityContext: {} # fsGroup: 2000
# fsGroup: 2000
securityContext: {} securityContext: {}
# capabilities: # capabilities:
@@ -64,10 +63,10 @@ ingress:
# kubernetes.io/ingress.class: nginx # kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true" # kubernetes.io/tls-acme: "true"
hosts: hosts:
- host: chart-example.local - host: chart-example.local
paths: paths:
- path: / - path: /
pathType: ImplementationSpecific pathType: ImplementationSpecific
tls: [] tls: []
# - secretName: chart-example-tls # - secretName: chart-example-tls
# hosts: # hosts:
@@ -106,7 +105,6 @@ autoscaling:
config: config:
flinkApiUrl: flink:8081 flinkApiUrl: flink:8081
nodeSelector: {} nodeSelector: {}
tolerations: [] tolerations: []
@@ -117,37 +115,44 @@ affinity: {}
flink: flink:
image: image:
repository: lcr.logicamp.tech/library/flink repository: lcr.logicamp.tech/library/flink
tag: 1.20.0-scala_2.12-java17-minicluster tag: 1.20.1-scala_2.12-java17-minicluster
parallelism: parallelism:
default: 1 # Default parallelism for Flink jobs default: 1 # Default parallelism for Flink jobs
checkpoint:
interval: 5min
mode: EXACTLY_ONCE
state: state:
backend: rocksdb # Use RocksDB for state backend backend: rocksdb # Use RocksDB for state backend
incremental: true incremental: true
savepoints:
dir: "/opt/flink/savepoints" # Directory to store savepoints
pvcName: flink-savepoints-pvc # PVC for savepoints persistence
size: 10Gi # PVC size for savepoints storage
data:
dir: "/opt/flink/data" # Directory to store checkpoints/web-upload/rocksdb
pvcName: flink-data-pvc # PVC for checkpoints/web-upload/rocksdb
size: 10Gi # PVC size for checkpoints/web-upload/rocksdb
ha: ha:
dir: "/opt/flink/ha" # Directory to store ha data dir: "/opt/flink/ha" # Directory to store ha data
pvcName: flink-ha-pvc # PVC for ha pvcName: flink-ha-pvc # PVC for ha
size: 10Gi # PVC size for ha size: 10Gi # PVC size for ha
checkpoint:
storageType: s3 # s3 / filesystem
interval: 5min
mode: EXACTLY_ONCE
size: 8Gi
savepoint:
storageType: s3
size: 8Gi
jobManager: jobManager:
processMemory: 4096m # Size of job manager process memory processMemory: 4096m # Size of job manager process memory
properties:
jobmanager.rpc.timeout: 300s
taskManager: taskManager:
numberOfTaskSlots: 12 # Number of task slots for TaskManager numberOfTaskSlots: 12 # Number of task slots for task manager
processMemory: 4096m # Size of task manager process memory processMemory: 4096m # Size of task manager process memory
replicas: 1 # Number of task manager replicas
# clusterId: some-id storage:
rocksDb:
size: 4Gi
resources:
limits:
cpu: 3
memory: 4Gi
requests:
cpu: 1
memory: 2Gi

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,6 +1,92 @@
apiVersion: v1 apiVersion: v1
entries: entries:
flink-kube-operator: flink-kube-operator:
- apiVersion: v2
appVersion: 0.1.1
created: "2025-07-18T18:09:46.27166563+03:30"
description: Helm chart for flink kube operator
digest: 597f2c07884bb5411dcc6e1a9cdf7672977858efe30273a46fb6525eb6013091
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.3.tgz
version: 1.2.3
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T14:34:55.317942453+03:30"
description: Helm chart for flink kube operator
digest: 422a34dc173ebe29adccd46d7ef94505cc022ff20ccbfb85ac3e6e201cba476c
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.2.tgz
version: 1.2.2
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T14:01:29.891695937+03:30"
description: Helm chart for flink kube operator
digest: 404ed2c28ff43b630b44c1215be5369417a1b9b2747ae24e2963a6b81813e7dc
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.1.tgz
version: 1.2.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-05-17T12:47:25.848097207+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 3458b9be97d2a4bcf8574706e44ea9f7fdeb11e83058a615566e6e094a51b920
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.2.0.tgz
version: 1.2.0
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-15T12:06:59.425538953+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 2b307a113476eebb34f58308bf1d4d0d36ca5e08fe6541f369a1c231ae0a71be
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.1.2.tgz
version: 1.1.2
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-12T23:13:39.394371646+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 14b08b443b4118cee4c279f62b498bc040b4a3e7ebafa8e195606e3d9b21810a
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.1.tgz
version: 1.0.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-06T01:52:09.478716316+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: e177bc2f11987f4add27c09e521476eabaa456df1b9621321200b58f3a330813
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.0.tgz
version: 1.0.0
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.971040367+03:30" created: "2025-04-04T13:50:27.971040367+03:30"
@@ -53,7 +139,7 @@ entries:
version: 0.1.10 version: 0.1.10
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.975218534+03:30" created: "2025-03-04T18:04:35.495842696+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: abc08853c65ba36ff3485f182555522408e150f2508d4cac672d588972ddca3c digest: abc08853c65ba36ff3485f182555522408e150f2508d4cac672d588972ddca3c
name: flink-kube-operator name: flink-kube-operator
@@ -63,7 +149,7 @@ entries:
version: 0.1.9 version: 0.1.9
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.974750898+03:30" created: "2025-03-04T18:04:35.495392608+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 3986a0a2348db1e17a1524eb0d87eabf6d64050d4007c5b393f723393cc4b675 digest: 3986a0a2348db1e17a1524eb0d87eabf6d64050d4007c5b393f723393cc4b675
name: flink-kube-operator name: flink-kube-operator
@@ -73,7 +159,7 @@ entries:
version: 0.1.8 version: 0.1.8
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.974306458+03:30" created: "2025-03-04T18:04:35.494948853+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 1bbeb92ecd10e36fa7d742a61cced0d842139ada0cfeff6fa1b0cf8718189235 digest: 1bbeb92ecd10e36fa7d742a61cced0d842139ada0cfeff6fa1b0cf8718189235
name: flink-kube-operator name: flink-kube-operator
@@ -83,7 +169,7 @@ entries:
version: 0.1.7 version: 0.1.7
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.973833587+03:30" created: "2025-03-04T18:04:35.49450822+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 4031f4a79e65f6c5e60b6ebf9dd7e2a663b1fb6f893056ad81ca33660f94406e digest: 4031f4a79e65f6c5e60b6ebf9dd7e2a663b1fb6f893056ad81ca33660f94406e
name: flink-kube-operator name: flink-kube-operator
@@ -93,7 +179,7 @@ entries:
version: 0.1.6 version: 0.1.6
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.972800097+03:30" created: "2025-03-04T18:04:35.494040193+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 22ed155c8538ca5e7dc26863304eb9f76b09c454edbf709a891d7ccc440f35f6 digest: 22ed155c8538ca5e7dc26863304eb9f76b09c454edbf709a891d7ccc440f35f6
name: flink-kube-operator name: flink-kube-operator
@@ -103,7 +189,7 @@ entries:
version: 0.1.5 version: 0.1.5
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.972374168+03:30" created: "2025-03-04T18:04:35.493584927+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: b548a64ef89bbcd12d92fefffd1fd37758e8fccda02aecd97c7519a08f10fa4a digest: b548a64ef89bbcd12d92fefffd1fd37758e8fccda02aecd97c7519a08f10fa4a
name: flink-kube-operator name: flink-kube-operator
@@ -113,7 +199,7 @@ entries:
version: 0.1.4 version: 0.1.4
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.971952322+03:30" created: "2025-03-04T18:04:35.493138547+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 05a9664f574e2d5d1cca764efb6481ad21b9176663b907973a8ef5264f15a91f digest: 05a9664f574e2d5d1cca764efb6481ad21b9176663b907973a8ef5264f15a91f
name: flink-kube-operator name: flink-kube-operator
@@ -123,7 +209,7 @@ entries:
version: 0.1.3 version: 0.1.3
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.971461428+03:30" created: "2025-03-04T18:04:35.492696005+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 89345b1a9a79aa18b646705aeb8cfdc547629600cb8a00708a3f64d188f296f2 digest: 89345b1a9a79aa18b646705aeb8cfdc547629600cb8a00708a3f64d188f296f2
name: flink-kube-operator name: flink-kube-operator
@@ -133,7 +219,7 @@ entries:
version: 0.1.2 version: 0.1.2
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.968770748+03:30" created: "2025-03-04T18:04:35.490170385+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2 digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2
name: flink-kube-operator name: flink-kube-operator
@@ -143,7 +229,7 @@ entries:
version: 0.1.1 version: 0.1.1
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.968266924+03:30" created: "2025-03-04T18:04:35.489734651+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da
name: flink-kube-operator name: flink-kube-operator
@@ -151,4 +237,4 @@ entries:
urls: urls:
- flink-kube-operator-0.1.0.tgz - flink-kube-operator-0.1.0.tgz
version: 0.1.0 version: 0.1.0
generated: "2025-04-04T13:50:27.967565847+03:30" generated: "2025-07-18T18:09:46.244672127+03:30"

View File

@@ -10,13 +10,15 @@ import (
//go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE //go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE
type FlinkJobSpec struct { type FlinkJobSpec struct {
Key string `json:"key"` Key string `json:"key"`
Name string `json:"name"` Name string `json:"name"`
Parallelism int `json:"parallelism"` Parallelism int `json:"parallelism"`
JarURI string `json:"jarUri"` JarURI string `json:"jarUri"`
SavepointInterval metaV1.Duration `json:"savepointInterval"` JarURIBasicAuthUsername *string `json:"jarURIBasicAuthUsername"`
EntryClass string `json:"entryClass"` JarURIBasicAuthPassword *string `json:"jarURIBasicAuthPassword"`
Args []string `json:"args"` SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"`
Args []string `json:"args"`
} }
type FlinkJobStatus struct { type FlinkJobStatus struct {

View File

@@ -2,10 +2,12 @@ package jar
import ( import (
"crypto/rand" "crypto/rand"
"encoding/base64"
"encoding/hex" "encoding/hex"
"errors" "errors"
"io" "io"
"net/http" "net/http"
"net/http/cookiejar"
"os" "os"
"strings" "strings"
@@ -16,13 +18,17 @@ import (
) )
type JarFile struct { type JarFile struct {
uri string uri string
filePath string filePath string
basicAuthUsername *string
basicAuthPassword *string
} }
func NewJarFile(URI string) (*JarFile, error) { func NewJarFile(URI string, basicAuthUsername *string, basicAuthPassword *string) (*JarFile, error) {
jarFile := &JarFile{ jarFile := &JarFile{
uri: URI, uri: URI,
basicAuthUsername: basicAuthUsername,
basicAuthPassword: basicAuthPassword,
} }
err := jarFile.Download() err := jarFile.Download()
if err != nil { if err != nil {
@@ -57,9 +63,45 @@ func (jarFile *JarFile) Download() error {
} }
defer out.Close() defer out.Close()
resp, err := http.Get(jarFile.uri)
if err != nil || resp.StatusCode > 299 { var resp *http.Response
if jarFile.basicAuthPassword != nil && jarFile.basicAuthUsername != nil {
basicAuth := func(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}
redirectPolicyFunc := func(req *http.Request, via []*http.Request) error {
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
return nil
}
client := &http.Client{
Jar: &cookiejar.Jar{},
CheckRedirect: redirectPolicyFunc,
}
req, err := http.NewRequest("GET", jarFile.uri, nil)
if err != nil {
jarFile.delete()
return err
}
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
resp, err = client.Do(req)
} else {
resp, err = http.Get(jarFile.uri)
}
if err != nil {
jarFile.delete() jarFile.delete()
pkg.Logger.Error("error in downloading jar", zap.Error(err))
return err
}
if resp.StatusCode > 299 {
respBody := []byte{}
resp.Body.Read(respBody)
err = errors.New(string(respBody) + " status:" + resp.Status)
pkg.Logger.Error("error in downloading jar", zap.Error(err))
return err return err
} }

View File

@@ -13,7 +13,7 @@ func (job *ManagedJob) Cycle() {
// pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName())) // pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName()))
// Init job // Init job
if job.def.Status.LifeCycleStatus == "" && job.def.Status.JobStatus == "" { if job.def.Status.LifeCycleStatus == "" && (job.def.Status.JobStatus == "" || job.def.Status.JobStatus == v1alpha1.JobStatusFinished) {
job.Run(false) job.Run(false)
return return
} }

View File

@@ -9,7 +9,7 @@ import (
// upload jar file and set the jarId for later usages // upload jar file and set the jarId for later usages
func (job *ManagedJob) upload() error { func (job *ManagedJob) upload() error {
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI) jarFile, err := jar.NewJarFile(job.def.Spec.JarURI, job.def.Spec.JarURIBasicAuthUsername, job.def.Spec.JarURIBasicAuthPassword)
if err != nil { if err != nil {
pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err)) pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err))
return err return err

View File

@@ -117,14 +117,15 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
"status": patchStatusObj, "status": patchStatusObj,
}) })
} else { } else {
patchStatusObj := map[string]interface{}{ // TODO handle job not found status
"jobStatus": "", // patchStatusObj := map[string]interface{}{
"lifeCycleStatus": string(v1alpha1.LifeCycleStatusFailed), // "jobStatus": "",
} // "lifeCycleStatus": string(v1alpha1.LifeCycleStatusFailed),
// }
crdInstance.Patch(uid, map[string]interface{}{ // crdInstance.Patch(uid, map[string]interface{}{
"status": patchStatusObj, // "status": patchStatusObj,
}) // })
} }
managedJob.Cycle() managedJob.Cycle()