13 Commits

Author SHA1 Message Date
89647f3b5b fix(helm): add flink taskmanager host env to task manager 2025-04-15 12:08:17 +03:30
dedbe00fba fix(helm): wrong checkpoint path flink properties 2025-04-13 10:38:15 +03:30
62c340bc64 feat(helm): add filesystem checkpoint storage mode 2025-04-13 10:00:32 +03:30
44ff3627fc feat(helm): add flink properties variable to values 2025-04-12 23:14:52 +03:30
392004d99a ci(docker): add zstd dependency jar to flink docker file 2025-04-12 23:07:53 +03:30
22c7d712f4 feat: update flink http client library 2025-04-07 13:20:39 +03:30
2dd625ec7c feat: update flink http client library 2025-04-07 11:28:33 +03:30
c991215a9d Merge branch 'main' of https://git.logicamp.tech/Logicamp/flink-kube-operator 2025-04-06 08:48:54 +03:30
1c32bfbbe0 chore: create index and chart package 2025-04-06 01:53:33 +03:30
f210090dff Merge branch 'feature/new-helm-structure' into HEAD 2025-04-06 01:49:21 +03:30
54008669cb fix(helm): wrong savepoint and checkpoint s3 configs 2025-04-06 01:49:00 +03:30
830e265162 feat: apply new helm structure
use minio s3 for savepoint and checkpoint path
separate task-manager, job-manager and operator
use statefulset for task-manager to handle replication
support basic credential for download jar request
update to flink 1.20.1
2025-04-05 01:39:02 +03:30
556d9ff6af fix: wrong update status in some situations 2025-03-05 11:40:22 +03:30
34 changed files with 481 additions and 266 deletions

View File

@@ -14,6 +14,7 @@
"nindent", "nindent",
"reactivex", "reactivex",
"repsert", "repsert",
"taskmanager",
"tolerations" "tolerations"
] ]
} }

View File

@@ -1,8 +1,8 @@
FROM public.ecr.aws/docker/library/golang:1.23.4-bookworm AS build FROM public.ecr.aws/docker/library/golang:1.24.1-bookworm AS build
ARG upx_version=4.2.4 ARG upx_version=4.2.4
RUN apt-get update && apt-get install -y --no-install-recommends xz-utils && \ RUN apt-get update && apt-get install -y --no-install-recommends xz-utils ca-certificates && \
curl -Ls https://github.com/upx/upx/releases/download/v${upx_version}/upx-${upx_version}-amd64_linux.tar.xz -o - | tar xvJf - -C /tmp && \ curl -Ls https://github.com/upx/upx/releases/download/v${upx_version}/upx-${upx_version}-amd64_linux.tar.xz -o - | tar xvJf - -C /tmp && \
cp /tmp/upx-${upx_version}-amd64_linux/upx /usr/local/bin/ && \ cp /tmp/upx-${upx_version}-amd64_linux/upx /usr/local/bin/ && \
chmod +x /usr/local/bin/upx && \ chmod +x /usr/local/bin/upx && \
@@ -27,6 +27,7 @@ FROM public.ecr.aws/docker/library/busybox:1.37.0 AS final
COPY --from=build /flink-kube-operator /flink-kube-operator COPY --from=build /flink-kube-operator /flink-kube-operator
COPY --from=build /etc/ssl/certs /etc/ssl/certs
EXPOSE 8083 EXPOSE 8083

View File

@@ -1,4 +1,4 @@
FROM public.ecr.aws/docker/library/flink:1.20.0-scala_2.12-java17 FROM public.ecr.aws/docker/library/flink:1.20.1-scala_2.12-java17
# Set working directory # Set working directory
WORKDIR /opt/flink WORKDIR /opt/flink
@@ -15,17 +15,19 @@ RUN chmod +x /opt/flink/bin/start-cluster.sh
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.4.0-1.20/flink-connector-kafka-3.4.0-1.20.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.4.0-1.20/flink-connector-kafka-3.4.0-1.20.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.1/flink-avro-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.1/flink-avro-confluent-registry-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/
RUN wget -q https://jdbc.postgresql.org/download/postgresql-42.7.4.jar -P /opt/flink/lib/ RUN wget -q https://jdbc.postgresql.org/download/postgresql-42.7.4.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-jdbc-driver/1.20.0/flink-sql-jdbc-driver-1.20.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-jdbc-driver/1.20.1/flink-sql-jdbc-driver-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.12/1.10.3/flink-jdbc_2.12-1.10.3.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/com/aventrix/jnanoid/jnanoid/2.0.0/jnanoid-2.0.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.1/flink-s3-fs-presto-1.20.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-2/zstd-jni-1.5.7-2.jar -P /opt/flink/lib/
# Command to start Flink JobManager and TaskManager in a mini-cluster setup # Command to start Flink JobManager and TaskManager in a mini-cluster setup
CMD ["bin/start-cluster.sh"] CMD ["bin/start-cluster.sh"]

View File

@@ -36,6 +36,10 @@ spec:
type: integer type: integer
jarUri: jarUri:
type: string type: string
jarURIBasicAuthUsername:
type: string
jarURIBasicAuthPassword:
type: string
args: args:
type: array type: array
items: items:

View File

@@ -3,13 +3,14 @@ apiVersion: flink.logicamp.tech/v1alpha1
kind: FlinkJob kind: FlinkJob
metadata: metadata:
name: my-flink-job name: my-flink-job
namespace: default
spec: spec:
key: word-count key: word-count
name: "Word Count Example" name: "Word Count Example"
entryClass: "org.apache.flink.examples.java.wordcount.WordCount" entryClass: "tech.logicamp.logiline.FacilityEnrichment"
parallelism: 2 parallelism: 1
jarUri: "http://192.168.7.7:8080/product-enrichment-processor.jar" jarUri: "https://git.logicamp.tech/api/packages/logiline/generic/facility-enrichment/1.0.0/facility-enrichment.jar"
jarURIBasicAuthUsername: logiline-actrunner
jarURIBasicAuthPassword: daeweeb7ohpaiw3oojiCoong
flinkConfiguration: flinkConfiguration:
taskmanager.numberOfTaskSlots: "2" taskmanager.numberOfTaskSlots: "1"
parallelism.default: "2" parallelism.default: "1"

2
go.sum
View File

@@ -64,6 +64,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/logi-camp/go-flink-client v0.2.0 h1:PIyfJq7FjW28bnvemReCicIuQD7JzVgJDk2xPTZUS2s= github.com/logi-camp/go-flink-client v0.2.0 h1:PIyfJq7FjW28bnvemReCicIuQD7JzVgJDk2xPTZUS2s=
github.com/logi-camp/go-flink-client v0.2.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI= github.com/logi-camp/go-flink-client v0.2.0/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
github.com/logi-camp/go-flink-client v0.2.1 h1:STfKamFm9+2SxxfZO3ysdFsb5MViQdThB4UHbnkUOE8=
github.com/logi-camp/go-flink-client v0.2.1/go.mod h1:A79abedX6wGQI0FoICdZI7SRoGHj15QwMwWowgsKYFI=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=

6
helm/chart/Chart.lock Normal file
View File

@@ -0,0 +1,6 @@
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
digest: sha256:9a822e9c5a4eee1b6515c143150c1dd6f84ceb080a7be4573e09396c5916f7d3
generated: "2025-04-04T14:42:09.771390014+03:30"

View File

@@ -2,5 +2,9 @@ apiVersion: v2
name: flink-kube-operator name: flink-kube-operator
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
type: application type: application
version: 0.1.14 version: 1.1.2
appVersion: "0.1.0" appVersion: "0.1.1"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2

Binary file not shown.

View File

@@ -17,6 +17,6 @@
{{- else if contains "ClusterIP" .Values.service.type }} {{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "flink-kube-operator.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}") export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "flink-kube-operator.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}") export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
echo "Visit http://127.0.0.1:8080 to use your application" echo "Visit http://127.0.0.1:8081 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8081:$CONTAINER_PORT
{{- end }} {{- end }}

View File

@@ -0,0 +1,37 @@
{{- define "flink.env" -}}
- name: JOB_MANAGER_RPC_ADDRESS
value: "localhost"
- name: NAMESPACE
value: {{ .Release.Namespace }}
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: {{ .Release.Name }}-flink-job-manager
jobmanager.memory.process.size: {{ .Values.flink.jobManager.processMemory }}
taskmanager.memory.process.size: {{ .Values.flink.taskManager.processMemory }}
taskmanager.data.port: 6125
taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
parallelism.default: {{ .Values.flink.parallelism.default }}
state.backend: {{ .Values.flink.state.backend }}
rest.port: 8081
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender
high-availability.type: kubernetes
kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
execution.checkpointing.interval: {{ .Values.flink.state.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.state.checkpoint.mode }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
state.checkpoints.dir: file:///opt/flink/checkpoints/
{{- else if eq .Values.flink.state.checkpoint.storageType "s3" }}
state.checkpoints.dir: s3://flink/checkpoints/
{{- end }}
state.backend.rocksdb.localdir: /opt/flink/rocksdb
high-availability.storageDir: /opt/flink/ha
state.savepoints.dir: s3://flink/savepoints/
state.backend.incremental: {{ .Values.flink.state.incremental }}
rest.profiling.enabled: true
s3.endpoint: http://{{ .Release.Name }}-minio:9000
s3.path.style.access: true
{{- toYaml .Values.flink.properties | default "" | nindent 4 }}
{{- end }}

View File

@@ -1,10 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Values.flink.state.data.pvcName }}
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: {{ .Values.flink.state.data.size }} # Use size defined in values.yaml

View File

@@ -1,165 +0,0 @@
{{- define "flink.env" -}}
- name: JOB_MANAGER_RPC_ADDRESS
value: "localhost"
- name: NAMESPACE
value: {{ .Release.Namespace }}
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: localhost
jobmanager.memory.process.size: {{ .Values.flink.jobManager.processMemory }}
taskmanager.memory.process.size: {{ .Values.flink.taskManager.processMemory }}
taskmanager.data.port: 6125
taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
parallelism.default: {{ .Values.flink.parallelism.default }}
state.backend: {{ .Values.flink.state.backend }}
rest.port: 8081
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender
high-availability.type: kubernetes
kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: {{ .Values.clusterId | default (print .Release.Name "-cluster") }}
execution.checkpointing.interval: {{ .Values.flink.checkpoint.interval }}
execution.checkpointing.mode: {{ .Values.flink.checkpoint.mode }}
web.upload.dir: {{ .Values.flink.state.data.dir }}/web-upload
state.checkpoints.dir: file://{{ .Values.flink.state.data.dir }}/checkpoints
state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb
high-availability.storageDir: file://{{ .Values.flink.state.ha.dir }}
state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }}
state.backend.incremental: {{ .Values.flink.state.incremental }}
rest.profiling.enabled: true
{{- end }}
{{- define "flink.volumeMounts" -}}
- name: flink-data
mountPath: {{ .Values.flink.state.data.dir }}/data
- name: flink-data
mountPath: {{ .Values.flink.state.data.dir }}/rocksdb
subPath: rocksdb
- name: flink-data
mountPath: {{ .Values.flink.state.data.dir }}/checkpoints
subPath: checkpoints
- name: flink-data
mountPath: {{ .Values.flink.state.data.dir }}/web-upload
subPath: web-upload
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
- name: flink-savepoints
mountPath: {{ .Values.flink.state.savepoints.dir }}
{{- end }}
{{- define "flink.volumes" -}}
- name: flink-data
persistentVolumeClaim:
claimName: {{ .Values.flink.state.data.pvcName }}
- name: flink-savepoints
persistentVolumeClaim:
claimName: {{ .Values.flink.state.savepoints.pvcName }}
- name: flink-ha
persistentVolumeClaim:
claimName: {{ .Values.flink.state.ha.pvcName }}
{{- end }}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Release.Name }}-flink
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
template:
metadata:
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
initContainers:
- name: volume-mount-hack
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
runAsUser: 0
command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.data.dir }}/data {{ .Values.flink.state.data.dir }}/rocksdb {{ .Values.flink.state.data.dir }}/checkpoints {{ .Values.flink.state.data.dir }}/web-upload {{ .Values.flink.state.ha.dir }} {{ .Values.flink.state.savepoints.dir }}"]
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
containers:
- name: jobmanager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["jobmanager"]
ports:
- containerPort: 6123 # JobManager RPC port
name: rpc
- containerPort: 6124 # JobManager blob server port
name: blob
- containerPort: 6125 # JobManager queryable state port
name: query
- containerPort: 8081 # JobManager Web UI port
name: ui
env:
{{- include "flink.env" . | nindent 12 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
- name: taskmanager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["taskmanager"]
ports:
- containerPort: 6121 # TaskManager data port
name: data
- containerPort: 6122 # TaskManager RPC port
name: rpc
env:
{{- include "flink.env" . | nindent 12 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
- name: operator
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
env:
- name: FLINK_API_URL
value: localhost:8081
- name: SAVEPOINT_PATH
value: file://{{ .Values.flink.state.savepoints.dir }}
- name: NAMESPACE
value: "{{ .Release.Namespace }}"
resources:
{{- toYaml .Values.resources | nindent 12 }}
volumeMounts:
{{- include "flink.volumeMounts" . | nindent 12 }}
volumes:
{{- include "flink.volumes" . | nindent 8 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@@ -1,7 +1,7 @@
apiVersion: v1 apiVersion: v1
kind: PersistentVolumeClaim kind: PersistentVolumeClaim
metadata: metadata:
name: {{ .Values.flink.state.ha.pvcName }} name: {{ .Release.Name }}-flink-ha-pvc
spec: spec:
accessModes: accessModes:
- ReadWriteOnce - ReadWriteOnce

View File

@@ -0,0 +1,91 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Release.Name }}-flink-job-manager
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
template:
metadata:
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
initContainers:
- name: volume-mount-hack
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
runAsUser: 0
command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.ha.dir }}"]
volumeMounts:
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
containers:
- name: jobmanager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["jobmanager"]
ports:
- containerPort: 6123 # JobManager RPC port
name: rpc
- containerPort: 6124 # JobManager blob server port
name: blob
- containerPort: 6125 # JobManager queryable state port
name: query
- containerPort: 8081 # JobManager Web UI port
name: ui
env:
{{- include "flink.env" . | nindent 12 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: S3_ENDPOINT
value: "http://minio-service:9000"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_access_key
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_secret_key
volumeMounts:
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
volumes:
- name: flink-ha
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-ha-pvc
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@@ -0,0 +1,28 @@
apiVersion: v1
kind: Service
metadata:
name: {{ .Release.Name }}-flink-job-manager
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink-job-manager
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
ports:
- name: flink-web-ui
port: 8081
targetPort: 8081
- name: rpc
port: 6123
targetPort: 6123
- name: blob
port: 6124
targetPort: 6124
- name: query
port: 6125
targetPort: 6125
- name: operator
port: 3000
targetPort: 3000
selector:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-job-manager
type: ClusterIP # Change to LoadBalancer if you want external access

View File

@@ -1,10 +1,10 @@
apiVersion: v1 apiVersion: v1
kind: PersistentVolumeClaim kind: PersistentVolumeClaim
metadata: metadata:
name: {{ .Values.flink.state.savepoints.pvcName }} name: {{ .Release.Name }}-flink-checkpoint-pvc
spec: spec:
accessModes: accessModes:
- ReadWriteOnce - ReadWriteMany
resources: resources:
requests: requests:
storage: {{ .Values.flink.state.savepoints.size }} # Use size defined in values.yaml storage: {{ .Values.flink.state.checkpoint.size }}

View File

@@ -1,19 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: flink
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
ports:
- port: 8081
name: flink-web-ui
targetPort: 8081
- port: 3000
name: operator
targetPort: 3000
selector:
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
type: ClusterIP # Change to LoadBalancer if you want external access

View File

@@ -0,0 +1,72 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: {{ .Release.Name }}-flink-task-manager
labels:
app: {{ .Release.Name }}-flink-operator
component: taskmanager
spec:
serviceName: {{ .Release.Name }}-flink-task-manager
replicas: {{ .Values.flink.taskManager.replicas }}
selector:
matchLabels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-task-manager
template:
metadata:
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-task-manager
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
containers:
- name: task-manager
image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
imagePullPolicy: Always
args: ["taskmanager"]
env:
{{- include "flink.env" . | nindent 8 }}
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: S3_ENDPOINT
value: "http://minio-service:9000"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_access_key
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-flink-secrets
key: minio_secret_key
- name: FLINK_TASKMANAGER_HOST
valueFrom:
fieldRef:
fieldPath: spec.hostname
volumeMounts:
- name: rocksdb-storage
mountPath: /opt/flink/rocksdb
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
- name: flink-checkpoint
mountPath: /opt/flink/checkpoints
{{- end }}
resources:
{{- toYaml .Values.flink.taskManager.resources | nindent 10 }}
{{- if eq .Values.flink.state.checkpoint.storageType "filesystem" }}
volumes:
- name: flink-checkpoint
persistentVolumeClaim:
claimName: {{ .Release.Name }}-flink-checkpoint-pvc
{{- end }}
volumeClaimTemplates:
- metadata:
name: rocksdb-storage
spec:
accessModes: [ ReadWriteOnce ]
resources:
requests:
storage: {{ .Values.flink.taskManager.storage.rocksDb.size }}

View File

@@ -1,9 +1,10 @@
apiVersion: v1 apiVersion: v1
kind: Service kind: Service
metadata: metadata:
name: {{ include "flink-kube-operator.fullname" . }} name: {{ .Release.Name }}-flink-operator
labels: labels:
{{- include "flink-kube-operator.labels" . | nindent 4 }} app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
spec: spec:
type: {{ .Values.service.type }} type: {{ .Values.service.type }}
ports: ports:
@@ -12,4 +13,5 @@ spec:
protocol: TCP protocol: TCP
name: http name: http
selector: selector:
{{- include "flink-kube-operator.selectorLabels" . | nindent 4 }} app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator

View File

@@ -0,0 +1,66 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: {{ .Release.Name }}-flink-operator
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
spec:
serviceName: {{ .Release.Name }}-flink-operator
replicas: 1
selector:
matchLabels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
template:
metadata:
labels:
app: {{ .Release.Name }}-flink-operator
component: {{ .Release.Name }}-flink-operator
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
initContainers:
- name: wait-for-jobmanager
image: curlimages/curl:8.5.0 # Lightweight curl image
command:
- sh
- -c
- |
echo "Waiting for Flink JobManager to be ready..."
until curl -sSf "http://{{ .Release.Name }}-flink-job-manager:8081/taskmanagers"; do
echo "JobManager not ready yet - retrying in 5s..."
sleep 5
done
echo "JobManager is ready!"
containers:
- name: operator
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
env:
- name: FLINK_API_URL
value: {{ .Release.Name }}-flink-job-manager:8081
- name: SAVEPOINT_PATH
value: s3://flink/savepoints/
- name: NAMESPACE
value: "{{ .Release.Namespace }}"
- name: S3_ENDPOINT
value: "http://{{ .Release.Name }}-minio:9000"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-minio
key: root-user
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}-minio
key: root-password

View File

@@ -117,37 +117,42 @@ affinity: {}
flink: flink:
image: image:
repository: lcr.logicamp.tech/library/flink repository: lcr.logicamp.tech/library/flink
tag: 1.20.0-scala_2.12-java17-minicluster tag: 1.20.1-scala_2.12-java17-minicluster
parallelism: parallelism:
default: 1 # Default parallelism for Flink jobs default: 1 # Default parallelism for Flink jobs
checkpoint:
interval: 5min
mode: EXACTLY_ONCE
state: state:
backend: rocksdb # Use RocksDB for state backend backend: rocksdb # Use RocksDB for state backend
incremental: true incremental: true
savepoints:
dir: "/opt/flink/savepoints" # Directory to store savepoints
pvcName: flink-savepoints-pvc # PVC for savepoints persistence
size: 10Gi # PVC size for savepoints storage
data:
dir: "/opt/flink/data" # Directory to store checkpoints/web-upload/rocksdb
pvcName: flink-data-pvc # PVC for checkpoints/web-upload/rocksdb
size: 10Gi # PVC size for checkpoints/web-upload/rocksdb
ha: ha:
dir: "/opt/flink/ha" # Directory to store ha data dir: "/opt/flink/ha" # Directory to store ha data
pvcName: flink-ha-pvc # PVC for ha pvcName: flink-ha-pvc # PVC for ha
size: 10Gi # PVC size for ha size: 10Gi # PVC size for ha
checkpoint:
storageType: s3 # s3 / filesystem
interval: 5min
mode: EXACTLY_ONCE
size: 8Gi
jobManager: jobManager:
processMemory: 4096m # Size of job manager process memory processMemory: 4096m # Size of job manager process memory
properties:
jobmanager.rpc.timeout: 300s
taskManager: taskManager:
numberOfTaskSlots: 12 # Number of task slots for TaskManager numberOfTaskSlots: 12 # Number of task slots for task manager
processMemory: 4096m # Size of task manager process memory processMemory: 4096m # Size of task manager process memory
replicas: 1 # Number of task manager replicas
# clusterId: some-id storage:
rocksDb:
size: 4Gi
resources:
limits:
cpu: 3
memory: 4Gi
requests:
cpu: 1
memory: 2Gi

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,6 +1,48 @@
apiVersion: v1 apiVersion: v1
entries: entries:
flink-kube-operator: flink-kube-operator:
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-15T12:06:59.425538953+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 2b307a113476eebb34f58308bf1d4d0d36ca5e08fe6541f369a1c231ae0a71be
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.1.2.tgz
version: 1.1.2
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-12T23:13:39.394371646+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: 14b08b443b4118cee4c279f62b498bc040b4a3e7ebafa8e195606e3d9b21810a
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.1.tgz
version: 1.0.1
- apiVersion: v2
appVersion: 0.1.1
created: "2025-04-06T01:52:09.478716316+03:30"
dependencies:
- name: minio
repository: https://charts.bitnami.com/bitnami
version: 16.0.2
description: Helm chart for flink kube operator
digest: e177bc2f11987f4add27c09e521476eabaa456df1b9621321200b58f3a330813
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-1.0.0.tgz
version: 1.0.0
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.971040367+03:30" created: "2025-04-04T13:50:27.971040367+03:30"
@@ -53,7 +95,7 @@ entries:
version: 0.1.10 version: 0.1.10
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.975218534+03:30" created: "2025-03-04T18:04:35.495842696+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: abc08853c65ba36ff3485f182555522408e150f2508d4cac672d588972ddca3c digest: abc08853c65ba36ff3485f182555522408e150f2508d4cac672d588972ddca3c
name: flink-kube-operator name: flink-kube-operator
@@ -63,7 +105,7 @@ entries:
version: 0.1.9 version: 0.1.9
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.974750898+03:30" created: "2025-03-04T18:04:35.495392608+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 3986a0a2348db1e17a1524eb0d87eabf6d64050d4007c5b393f723393cc4b675 digest: 3986a0a2348db1e17a1524eb0d87eabf6d64050d4007c5b393f723393cc4b675
name: flink-kube-operator name: flink-kube-operator
@@ -73,7 +115,7 @@ entries:
version: 0.1.8 version: 0.1.8
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.974306458+03:30" created: "2025-03-04T18:04:35.494948853+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 1bbeb92ecd10e36fa7d742a61cced0d842139ada0cfeff6fa1b0cf8718189235 digest: 1bbeb92ecd10e36fa7d742a61cced0d842139ada0cfeff6fa1b0cf8718189235
name: flink-kube-operator name: flink-kube-operator
@@ -83,7 +125,7 @@ entries:
version: 0.1.7 version: 0.1.7
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.973833587+03:30" created: "2025-03-04T18:04:35.49450822+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 4031f4a79e65f6c5e60b6ebf9dd7e2a663b1fb6f893056ad81ca33660f94406e digest: 4031f4a79e65f6c5e60b6ebf9dd7e2a663b1fb6f893056ad81ca33660f94406e
name: flink-kube-operator name: flink-kube-operator
@@ -93,7 +135,7 @@ entries:
version: 0.1.6 version: 0.1.6
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.972800097+03:30" created: "2025-03-04T18:04:35.494040193+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 22ed155c8538ca5e7dc26863304eb9f76b09c454edbf709a891d7ccc440f35f6 digest: 22ed155c8538ca5e7dc26863304eb9f76b09c454edbf709a891d7ccc440f35f6
name: flink-kube-operator name: flink-kube-operator
@@ -103,7 +145,7 @@ entries:
version: 0.1.5 version: 0.1.5
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.972374168+03:30" created: "2025-03-04T18:04:35.493584927+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: b548a64ef89bbcd12d92fefffd1fd37758e8fccda02aecd97c7519a08f10fa4a digest: b548a64ef89bbcd12d92fefffd1fd37758e8fccda02aecd97c7519a08f10fa4a
name: flink-kube-operator name: flink-kube-operator
@@ -113,7 +155,7 @@ entries:
version: 0.1.4 version: 0.1.4
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.971952322+03:30" created: "2025-03-04T18:04:35.493138547+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 05a9664f574e2d5d1cca764efb6481ad21b9176663b907973a8ef5264f15a91f digest: 05a9664f574e2d5d1cca764efb6481ad21b9176663b907973a8ef5264f15a91f
name: flink-kube-operator name: flink-kube-operator
@@ -123,7 +165,7 @@ entries:
version: 0.1.3 version: 0.1.3
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.971461428+03:30" created: "2025-03-04T18:04:35.492696005+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 89345b1a9a79aa18b646705aeb8cfdc547629600cb8a00708a3f64d188f296f2 digest: 89345b1a9a79aa18b646705aeb8cfdc547629600cb8a00708a3f64d188f296f2
name: flink-kube-operator name: flink-kube-operator
@@ -133,7 +175,7 @@ entries:
version: 0.1.2 version: 0.1.2
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.968770748+03:30" created: "2025-03-04T18:04:35.490170385+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2 digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2
name: flink-kube-operator name: flink-kube-operator
@@ -143,7 +185,7 @@ entries:
version: 0.1.1 version: 0.1.1
- apiVersion: v2 - apiVersion: v2
appVersion: 0.1.0 appVersion: 0.1.0
created: "2025-04-04T13:50:27.968266924+03:30" created: "2025-03-04T18:04:35.489734651+03:30"
description: Helm chart for flink kube operator description: Helm chart for flink kube operator
digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da
name: flink-kube-operator name: flink-kube-operator
@@ -151,4 +193,4 @@ entries:
urls: urls:
- flink-kube-operator-0.1.0.tgz - flink-kube-operator-0.1.0.tgz
version: 0.1.0 version: 0.1.0
generated: "2025-04-04T13:50:27.967565847+03:30" generated: "2025-04-15T12:06:59.397928815+03:30"

View File

@@ -10,13 +10,15 @@ import (
//go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE //go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE
type FlinkJobSpec struct { type FlinkJobSpec struct {
Key string `json:"key"` Key string `json:"key"`
Name string `json:"name"` Name string `json:"name"`
Parallelism int `json:"parallelism"` Parallelism int `json:"parallelism"`
JarURI string `json:"jarUri"` JarURI string `json:"jarUri"`
SavepointInterval metaV1.Duration `json:"savepointInterval"` JarURIBasicAuthUsername *string `json:"jarURIBasicAuthUsername"`
EntryClass string `json:"entryClass"` JarURIBasicAuthPassword *string `json:"jarURIBasicAuthPassword"`
Args []string `json:"args"` SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"`
Args []string `json:"args"`
} }
type FlinkJobStatus struct { type FlinkJobStatus struct {

View File

@@ -2,10 +2,12 @@ package jar
import ( import (
"crypto/rand" "crypto/rand"
"encoding/base64"
"encoding/hex" "encoding/hex"
"errors" "errors"
"io" "io"
"net/http" "net/http"
"net/http/cookiejar"
"os" "os"
"strings" "strings"
@@ -16,13 +18,17 @@ import (
) )
type JarFile struct { type JarFile struct {
uri string uri string
filePath string filePath string
basicAuthUsername *string
basicAuthPassword *string
} }
func NewJarFile(URI string) (*JarFile, error) { func NewJarFile(URI string, basicAuthUsername *string, basicAuthPassword *string) (*JarFile, error) {
jarFile := &JarFile{ jarFile := &JarFile{
uri: URI, uri: URI,
basicAuthUsername: basicAuthUsername,
basicAuthPassword: basicAuthPassword,
} }
err := jarFile.Download() err := jarFile.Download()
if err != nil { if err != nil {
@@ -57,9 +63,45 @@ func (jarFile *JarFile) Download() error {
} }
defer out.Close() defer out.Close()
resp, err := http.Get(jarFile.uri)
if err != nil || resp.StatusCode > 299 { var resp *http.Response
if jarFile.basicAuthPassword != nil && jarFile.basicAuthUsername != nil {
basicAuth := func(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}
redirectPolicyFunc := func(req *http.Request, via []*http.Request) error {
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
return nil
}
client := &http.Client{
Jar: &cookiejar.Jar{},
CheckRedirect: redirectPolicyFunc,
}
req, err := http.NewRequest("GET", jarFile.uri, nil)
if err != nil {
jarFile.delete()
return err
}
req.Header.Add("Authorization", "Basic "+basicAuth(*jarFile.basicAuthUsername, *jarFile.basicAuthPassword))
resp, err = client.Do(req)
} else {
resp, err = http.Get(jarFile.uri)
}
if err != nil {
jarFile.delete() jarFile.delete()
pkg.Logger.Error("error in downloading jar", zap.Error(err))
return err
}
if resp.StatusCode > 299 {
respBody := []byte{}
resp.Body.Read(respBody)
err = errors.New(string(respBody) + " status:" + resp.Status)
pkg.Logger.Error("error in downloading jar", zap.Error(err))
return err return err
} }

View File

@@ -13,7 +13,7 @@ func (job *ManagedJob) Cycle() {
// pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName())) // pkg.Logger.Debug("[managed-job] [new] check cycle", zap.String("jobName", job.def.GetName()))
// Init job // Init job
if job.def.Status.LifeCycleStatus == "" && job.def.Status.JobStatus == "" { if job.def.Status.LifeCycleStatus == "" && (job.def.Status.JobStatus == "" || job.def.Status.JobStatus == v1alpha1.JobStatusFinished) {
job.Run(false) job.Run(false)
return return
} }

View File

@@ -9,7 +9,7 @@ import (
// upload jar file and set the jarId for later usages // upload jar file and set the jarId for later usages
func (job *ManagedJob) upload() error { func (job *ManagedJob) upload() error {
jarFile, err := jar.NewJarFile(job.def.Spec.JarURI) jarFile, err := jar.NewJarFile(job.def.Spec.JarURI, job.def.Spec.JarURIBasicAuthUsername, job.def.Spec.JarURIBasicAuthPassword)
if err != nil { if err != nil {
pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err)) pkg.Logger.Debug("[manage-job] [upload] error on download jar", zap.Error(err))
return err return err

View File

@@ -117,14 +117,15 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
"status": patchStatusObj, "status": patchStatusObj,
}) })
} else { } else {
patchStatusObj := map[string]interface{}{ // TODO handle job not found status
"jobStatus": "", // patchStatusObj := map[string]interface{}{
"lifeCycleStatus": string(v1alpha1.LifeCycleStatusFailed), // "jobStatus": "",
} // "lifeCycleStatus": string(v1alpha1.LifeCycleStatusFailed),
// }
crdInstance.Patch(uid, map[string]interface{}{ // crdInstance.Patch(uid, map[string]interface{}{
"status": patchStatusObj, // "status": patchStatusObj,
}) // })
} }
managedJob.Cycle() managedJob.Cycle()