11 Commits

32 changed files with 135 additions and 73 deletions

2
.vscode/launch.json vendored
View File

@@ -10,7 +10,7 @@
"request": "launch",
"mode": "auto",
"env": {
"FLINK_API_URL": "127.0.0.1:8081",
"FLINK_API_URL": "flink.bz2:8081",
"SAVEPOINT_PATH": "/opt/flink/savepoints"
},
"cwd": "${workspaceFolder}",

View File

@@ -20,8 +20,8 @@ COPY . .
# Build
ENV GOCACHE=/root/.cache/go-build
RUN --mount=type=cache,target="/go" --mount=type=cache,target="/root/.cache/go-build" CGO_ENABLED=1 GOOS=linux go build -ldflags '-s -w' -o /flink-kube-operator ./cmd/operator
RUN upx -q -5 /flink-kube-operator
RUN --mount=type=cache,target="/go" --mount=type=cache,target="/root/.cache/go-build" CGO_ENABLED=1 GOOS=linux go build -ldflags '-s -w' -o /flink-kube-operator ./cmd/operator \
&& upx -q -9 /flink-kube-operator
FROM public.ecr.aws/docker/library/busybox:1.37.0 AS final

View File

@@ -17,6 +17,11 @@ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafk
RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/
RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/
# Command to start Flink JobManager and TaskManager in a mini-cluster setup
CMD ["bin/start-cluster.sh"]

6
README.md Normal file
View File

@@ -0,0 +1,6 @@
Installation:
```bash
helm repo add lc-flink-operator https://git.logicamp.tech/Logicamp/flink-kube-operator/raw/branch/main/helm/
helm install flink-kube-operator lc-flink-operator/flink-kube-operator
```

View File

@@ -36,6 +36,10 @@ spec:
type: integer
jarUri:
type: string
args:
type: array
items:
type: string
savepointInterval:
type: string
format: duration

View File

@@ -1,24 +0,0 @@
apiVersion: v2
name: flink-kube-operator
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.16.0"

6
helm/chart/Chart.yaml Normal file
View File

@@ -0,0 +1,6 @@
apiVersion: v2
name: flink-kube-operator
description: Helm chart for flink kube operator
type: application
version: 0.1.2
appVersion: "0.1.0"

View File

@@ -0,0 +1,10 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Values.flink.state.data.pvcName }}
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: {{ .Values.flink.state.data.size }} # Use size defined in values.yaml

View File

@@ -1,21 +1,20 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Release.Name }}-flink # Adding the flink prefix to the name
name: {{ .Release.Name }}-flink
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the labels
app.kubernetes.io/instance: {{ .Release.Name }} # Using the release name for instance
app.kubernetes.io/managed-by: Helm
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
replicas: 1
selector:
matchLabels:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the selector
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
template:
metadata:
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the template labels
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
@@ -40,32 +39,33 @@ spec:
taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
parallelism.default: {{ .Values.flink.parallelism.default }}
state.backend: {{ .Values.flink.state.backend }}
state.savepoints.dir: {{ .Values.flink.state.savepoints.dir }}
rest.port: 8081
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender
web.upload.dir: /opt/flink/data/web-upload
state.checkpoints.dir: file:///tmp/flink-checkpoints
high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/ha
kubernetes.cluster-id: cluster-one
kubernetes.namespace: {{ .Release.Namespace }}
kubernetes.cluster-id: cluster-one
web.upload.dir: file://{{ .Values.flink.state.data.dir }}/web-upload
state.checkpoints.dir: file://{{ .Values.flink.state.data.dir }}/checkpoints
state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb
high-availability.storageDir: file://{{ .Values.flink.state.data.dir }}
state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }}
volumeMounts:
- name: flink-data
mountPath: /opt/flink/data
subPath: data
- name: flink-data
mountPath: /opt/flink/web-upload
subPath: web-upload
mountPath: {{ .Values.flink.state.data.dir }}
- name: flink-ha
mountPath: {{ .Values.flink.state.ha.dir }}
- name: flink-savepoints
mountPath: /opt/flink/savepoints
- name: flink-savepoints
mountPath: /opt/flink/ha
subPath: ha
mountPath: {{ .Values.flink.state.savepoints.dir }}
volumes:
- name: flink-data
emptyDir: {} # Temporary storage for internal data
persistentVolumeClaim:
claimName: {{ .Values.flink.state.data.pvcName }} # PVC for checkpoints/web-upload/rocksdb data persistence
- name: flink-savepoints
persistentVolumeClaim:
claimName: {{ .Values.flink.state.savepoints.pvcName }} # PVC for savepoints persistence
- name: flink-ha
persistentVolumeClaim:
claimName: {{ .Values.flink.state.ha.pvcName }} # PVC for high-availability metadata persistence

View File

@@ -0,0 +1,10 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Values.flink.state.savepoints.pvcName }}
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: {{ .Values.flink.state.savepoints.size }} # Use size defined in values.yaml

View File

@@ -1,15 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: flink # Adding the flink prefix to the service name
name: flink
labels:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to labels
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
ports:
- port: 8081
targetPort: 8081
selector:
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to selector
app.kubernetes.io/name: {{ .Release.Name }}-flink
app.kubernetes.io/instance: {{ .Release.Name }}
type: ClusterIP # Change to LoadBalancer if you want external access

View File

@@ -125,14 +125,17 @@ flink:
state:
backend: rocksdb # Use RocksDB for state backend
savepoints:
dir: "file:///opt/flink/savepoints" # Directory to store savepoints
dir: "/opt/flink/savepoints" # Directory to store savepoints
pvcName: flink-savepoints-pvc # PVC for savepoints persistence
size: 10Gi # PVC size for savepoints storage
data:
dir: "/opt/flink/data" # Directory to store checkpoints/web-upload/rocksdb
pvcName: flink-data-pvc # PVC for checkpoints/web-upload/rocksdb
size: 10Gi # PVC size for checkpoints/web-upload/rocksdb
ha:
dir: "/opt/flink/ha" # Directory to store ha data
pvcName: flink-ha-pvc # PVC for ha
size: 10Gi # PVC size for ha
taskManager:
numberOfTaskSlots: 100 # Number of task slots for TaskManager
persistence:
enabled: true
size: 10Gi # PVC size for savepoints storage
accessModes:
- ReadWriteOnce

Binary file not shown.

Binary file not shown.

Binary file not shown.

34
helm/index.yaml Normal file
View File

@@ -0,0 +1,34 @@
apiVersion: v1
entries:
flink-kube-operator:
- apiVersion: v2
appVersion: 0.1.0
created: "2024-12-20T17:12:30.023189281+03:30"
description: Helm chart for flink kube operator
digest: 89345b1a9a79aa18b646705aeb8cfdc547629600cb8a00708a3f64d188f296f2
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.2.tgz
version: 0.1.2
- apiVersion: v2
appVersion: 0.1.0
created: "2024-12-20T17:12:30.022644192+03:30"
description: Helm chart for flink kube operator
digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.1.tgz
version: 0.1.1
- apiVersion: v2
appVersion: 0.1.0
created: "2024-12-20T17:12:30.022187811+03:30"
description: Helm chart for flink kube operator
digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da
name: flink-kube-operator
type: application
urls:
- flink-kube-operator-0.1.0.tgz
version: 0.1.0
generated: "2024-12-20T17:12:30.021533384+03:30"

View File

@@ -1,10 +0,0 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .Values.flink.state.savepoints.pvcName }} # Adding the flink prefix to PVC name
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: {{ .Values.flink.persistence.size }} # Use size defined in values.yaml

View File

@@ -14,6 +14,7 @@ import (
func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
job := GetJob(jobUid)
pkg.Logger.Debug("[patch-job]", zap.Any("jobUid", jobUid))
patchBytes, err := json.Marshal(patchData)
if err != nil {

View File

@@ -16,6 +16,7 @@ type FlinkJobSpec struct {
JarURI string `json:"jarUri"`
SavepointInterval metaV1.Duration `json:"savepointInterval"`
EntryClass string `json:"entryClass"`
Args []string `json:"args"`
}
type FlinkJobStatus struct {
@@ -54,6 +55,7 @@ var (
ErrNoJarId = errors.New("[managed-job] no jar id")
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
ErrOnStartingJob = errors.New("[managed-job] error on starting job")
)
type JobStatus string

View File

@@ -26,7 +26,6 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
}
defer watcher.Stop()
for event := range watcher.ResultChan() {
pkg.Logger.Debug("[crd] event received", zap.Any("type", event.Type))
unstructuredJob := event.Object.(*unstructured.Unstructured)
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
if err != nil {
@@ -50,10 +49,10 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
switch event.Type {
case watch.Bookmark:
case watch.Modified:
pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
//pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
crd.repsert(job)
case watch.Added:
pkg.Logger.Info("[crd] [watch] new flink job created")
//pkg.Logger.Info("[crd] [watch] new flink job created")
crd.repsert(job)
case watch.Deleted:
}

View File

@@ -41,6 +41,12 @@ func (job *ManagedJob) Cycle() {
if job.def.Status.JobStatus == v1alpha1.JobStatusCreating {
return
}
if job.def.Status.JobStatus == v1alpha1.JobStatusFailed {
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
job.crd.SetJobStatus(job.def.UID, job.def.Status)
return
}
// if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
// //job.restore()
// return

View File

@@ -42,16 +42,25 @@ func (job *ManagedJob) run(restoreMode bool) error {
AllowNonRestoredState: true,
EntryClass: job.def.Spec.EntryClass,
SavepointPath: savepointPath,
Parallelism: job.def.Spec.Parallelism,
ProgramArg: job.def.Spec.Args,
})
if err == nil {
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))
jobId = &runJarResp.JobId
break
} else {
if strings.ContainsAny(err.Error(), ".jar does not exist") {
if strings.Contains(err.Error(), ".jar does not exist") {
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
shouldUpload = true
} else {
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
stringErr := err.Error()
job.def.Status.Error = &stringErr
job.def.Status.JobStatus = ""
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
job.crd.SetJobStatus(job.def.UID, job.def.Status)
return v1alpha1.ErrOnStartingJob
}
}
}
@@ -66,9 +75,9 @@ func (job *ManagedJob) run(restoreMode bool) error {
})
return nil
}
shouldUpload = false
continue
}
return nil
}
// job.def.Status.JobId = &runJarResp.JobId

View File

@@ -58,6 +58,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
// Loop over job definitions as Kubernetes CRD
for _, uid := range crd.GetAllJobKeys() {
pkg.Logger.Debug("mgr.processingJobsIds", zap.Any("processingJobIds", mgr.processingJobsIds))
if lo.Contains(mgr.processingJobsIds, uid) {
pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid))
continue