Compare commits
11 Commits
feature/ku ... 37936c8c58
| Author | SHA1 | Date |
|---|---|---|
| | 37936c8c58 | |
| | 4ed533f284 | |
| | 00030195c8 | |
| | 7e33fd6cef | |
| | 5bc047dbd1 | |
| | 07b8a36e63 | |
| | 5e3f093f08 | |
| | 03fe9910a3 | |
| | 438296ec35 | |
| | 6a475c7755 | |
| | a3a806a54f | |
.vscode/launch.json (vendored, 2 changed lines)
```diff
@@ -10,7 +10,7 @@
             "request": "launch",
             "mode": "auto",
             "env": {
-                "FLINK_API_URL": "127.0.0.1:8081",
+                "FLINK_API_URL": "flink.bz2:8081",
                 "SAVEPOINT_PATH": "/opt/flink/savepoints"
             },
             "cwd": "${workspaceFolder}",
```
```diff
@@ -20,8 +20,8 @@ COPY . .

 # Build
 ENV GOCACHE=/root/.cache/go-build
-RUN --mount=type=cache,target="/go" --mount=type=cache,target="/root/.cache/go-build" CGO_ENABLED=1 GOOS=linux go build -ldflags '-s -w' -o /flink-kube-operator ./cmd/operator
-RUN upx -q -5 /flink-kube-operator
+RUN --mount=type=cache,target="/go" --mount=type=cache,target="/root/.cache/go-build" CGO_ENABLED=1 GOOS=linux go build -ldflags '-s -w' -o /flink-kube-operator ./cmd/operator \
+    && upx -q -9 /flink-kube-operator

 FROM public.ecr.aws/docker/library/busybox:1.37.0 AS final
```
```diff
@@ -17,6 +17,11 @@ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafk
 RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/
 RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/
+RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/


 # Command to start Flink JobManager and TaskManager in a mini-cluster setup
 CMD ["bin/start-cluster.sh"]
```
README.md (new file, 6 lines)
````diff
@@ -0,0 +1,6 @@
+Installation:
+
+```bash
+helm repo add lc-flink-operator https://git.logicamp.tech/Logicamp/flink-kube-operator/raw/branch/main/helm/
+helm install flink-kube-operator lc-flink-operator/flink-kube-operator
+```
````
```diff
@@ -36,6 +36,10 @@ spec:
               type: integer
             jarUri:
               type: string
+            args:
+              type: array
+              items:
+                type: string
             savepointInterval:
               type: string
               format: duration
```
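For orientation, a custom resource exercising the new `args` field might look like the sketch below. Only the field names (`parallelism`, `jarUri`, `entryClass`, `args`, `savepointInterval`) come from this change and the Go spec further down; the API group, kind name, and every value are illustrative assumptions.

```yaml
# Hypothetical manifest: the apiVersion group and all values are assumptions;
# field names match the CRD schema above, v1alpha1 echoes the Go package name.
apiVersion: flink.logicamp.tech/v1alpha1
kind: FlinkJob
metadata:
  name: example-job
spec:
  parallelism: 2
  jarUri: https://example.com/jobs/example-job.jar
  entryClass: com.example.ExampleJob
  args:                    # new array-of-string field added in this change
    - "--input-topic"
    - "events"
  savepointInterval: 10m   # declared with format: duration in the schema
```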
```diff
@@ -1,24 +0,0 @@
-apiVersion: v2
-name: flink-kube-operator
-description: A Helm chart for Kubernetes
-
-# A chart can be either an 'application' or a 'library' chart.
-#
-# Application charts are a collection of templates that can be packaged into versioned archives
-# to be deployed.
-#
-# Library charts provide useful utilities or functions for the chart developer. They're included as
-# a dependency of application charts to inject those utilities and functions into the rendering
-# pipeline. Library charts do not define any templates and therefore cannot be deployed.
-type: application
-
-# This is the chart version. This version number should be incremented each time you make changes
-# to the chart and its templates, including the app version.
-# Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.1.0
-
-# This is the version number of the application being deployed. This version number should be
-# incremented each time you make changes to the application. Versions are not expected to
-# follow Semantic Versioning. They should reflect the version the application is using.
-# It is recommended to use it with quotes.
-appVersion: "1.16.0"
```
helm/chart/Chart.yaml (new file, 6 lines)
```diff
@@ -0,0 +1,6 @@
+apiVersion: v2
+name: flink-kube-operator
+description: Helm chart for flink kube operator
+type: application
+version: 0.1.2
+appVersion: "0.1.0"
```
helm/chart/templates/flink/data.pvc.yaml (new file, 10 lines)
```diff
@@ -0,0 +1,10 @@
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: {{ .Values.flink.state.data.pvcName }}
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: {{ .Values.flink.state.data.size }} # Use size defined in values.yaml
```
```diff
@@ -1,21 +1,20 @@
 apiVersion: apps/v1
 kind: Deployment
 metadata:
-  name: {{ .Release.Name }}-flink # Adding the flink prefix to the name
+  name: {{ .Release.Name }}-flink
   labels:
-    app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the labels
-    app.kubernetes.io/instance: {{ .Release.Name }} # Using the release name for instance
-    app.kubernetes.io/managed-by: Helm
+    app.kubernetes.io/name: {{ .Release.Name }}-flink
+    app.kubernetes.io/instance: {{ .Release.Name }}
 spec:
   replicas: 1
   selector:
     matchLabels:
-      app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the selector
+      app.kubernetes.io/name: {{ .Release.Name }}-flink
       app.kubernetes.io/instance: {{ .Release.Name }}
   template:
     metadata:
       labels:
-        app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the template labels
+        app.kubernetes.io/name: {{ .Release.Name }}-flink
         app.kubernetes.io/instance: {{ .Release.Name }}
     spec:
       serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
```
```diff
@@ -40,32 +39,33 @@
             taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
             parallelism.default: {{ .Values.flink.parallelism.default }}
             state.backend: {{ .Values.flink.state.backend }}
-            state.savepoints.dir: {{ .Values.flink.state.savepoints.dir }}
             rest.port: 8081
             rootLogger.level = DEBUG
             rootLogger.appenderRef.console.ref = ConsoleAppender
-            web.upload.dir: /opt/flink/data/web-upload
-            state.checkpoints.dir: file:///tmp/flink-checkpoints
             high-availability.type: kubernetes
-            high-availability.storageDir: file:///opt/flink/ha
-            kubernetes.cluster-id: cluster-one
             kubernetes.namespace: {{ .Release.Namespace }}
+            kubernetes.cluster-id: cluster-one
+            web.upload.dir: file://{{ .Values.flink.state.data.dir }}/web-upload
+            state.checkpoints.dir: file://{{ .Values.flink.state.data.dir }}/checkpoints
+            state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb
+            high-availability.storageDir: file://{{ .Values.flink.state.data.dir }}
+            state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }}
           volumeMounts:
             - name: flink-data
-              mountPath: /opt/flink/data
-              subPath: data
-            - name: flink-data
-              mountPath: /opt/flink/web-upload
-              subPath: web-upload
+              mountPath: {{ .Values.flink.state.data.dir }}
+            - name: flink-ha
+              mountPath: {{ .Values.flink.state.ha.dir }}
             - name: flink-savepoints
-              mountPath: /opt/flink/savepoints
-            - name: flink-savepoints
-              mountPath: /opt/flink/ha
-              subPath: ha
+              mountPath: {{ .Values.flink.state.savepoints.dir }}

       volumes:
         - name: flink-data
-          emptyDir: {} # Temporary storage for internal data
+          persistentVolumeClaim:
+            claimName: {{ .Values.flink.state.data.pvcName }} # PVC for savepoints persistence
         - name: flink-savepoints
           persistentVolumeClaim:
             claimName: {{ .Values.flink.state.savepoints.pvcName }} # PVC for savepoints persistence
+        - name: flink-ha
+          persistentVolumeClaim:
+            claimName: {{ .Values.flink.state.ha.pvcName }} # PVC for savepoints persistence
```
helm/chart/templates/flink/savepoint.pvc.yaml (new file, 10 lines)
```diff
@@ -0,0 +1,10 @@
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: {{ .Values.flink.state.savepoints.pvcName }}
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: {{ .Values.flink.state.savepoints.size }} # Use size defined in values.yaml
```
```diff
@@ -1,15 +1,15 @@
 apiVersion: v1
 kind: Service
 metadata:
-  name: flink # Adding the flink prefix to the service name
+  name: flink
   labels:
-    app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to labels
+    app.kubernetes.io/name: {{ .Release.Name }}-flink
     app.kubernetes.io/instance: {{ .Release.Name }}
 spec:
   ports:
     - port: 8081
       targetPort: 8081
   selector:
-    app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to selector
+    app.kubernetes.io/name: {{ .Release.Name }}-flink
     app.kubernetes.io/instance: {{ .Release.Name }}
   type: ClusterIP # Change to LoadBalancer if you want external access
```
```diff
@@ -125,14 +125,17 @@ flink:
   state:
     backend: rocksdb # Use RocksDB for state backend
     savepoints:
-      dir: "file:///opt/flink/savepoints" # Directory to store savepoints
+      dir: "/opt/flink/savepoints" # Directory to store savepoints
       pvcName: flink-savepoints-pvc # PVC for savepoints persistence
       size: 10Gi # PVC size for savepoints storage
+    data:
+      dir: "/opt/flink/data" # Directory to store checkpoints/web-upload/rocksdb
+      pvcName: flink-data-pvc # PVC for checkpoints/web-upload/rocksdb
+      size: 10Gi # PVC size for checkpoints/web-upload/rocksdb
+    ha:
+      dir: "/opt/flink/ha" # Directory to store ha data
+      pvcName: flink-ha-pvc # PVC for ha
+      size: 10Gi # PVC size for ha

   taskManager:
     numberOfTaskSlots: 100 # Number of task slots for TaskManager

-  persistence:
-    enabled: true
-    size: 10Gi # PVC size for savepoints storage
-    accessModes:
-      - ReadWriteOnce
```
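As a usage sketch, the new storage knobs could be overridden at install time with a values file; the keys below come from the hunk above, while the file name and sizes are illustrative assumptions.

```yaml
# my-values.yaml (hypothetical): keys from the chart's values.yaml, sizes are examples.
flink:
  state:
    savepoints:
      size: 20Gi   # enlarge the savepoints PVC
    data:
      size: 50Gi   # checkpoints, web uploads, and RocksDB local dirs share this PVC
    ha:
      size: 5Gi    # HA metadata is comparatively small
```

Applied with `helm install flink-kube-operator lc-flink-operator/flink-kube-operator -f my-values.yaml`.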
BIN helm/flink-kube-operator-0.1.0.tgz (new file, binary file not shown)
BIN helm/flink-kube-operator-0.1.1.tgz (new file, binary file not shown)
BIN helm/flink-kube-operator-0.1.2.tgz (new file, binary file not shown)
helm/index.yaml (new file, 34 lines)
```diff
@@ -0,0 +1,34 @@
+apiVersion: v1
+entries:
+  flink-kube-operator:
+  - apiVersion: v2
+    appVersion: 0.1.0
+    created: "2024-12-20T17:12:30.023189281+03:30"
+    description: Helm chart for flink kube operator
+    digest: 89345b1a9a79aa18b646705aeb8cfdc547629600cb8a00708a3f64d188f296f2
+    name: flink-kube-operator
+    type: application
+    urls:
+    - flink-kube-operator-0.1.2.tgz
+    version: 0.1.2
+  - apiVersion: v2
+    appVersion: 0.1.0
+    created: "2024-12-20T17:12:30.022644192+03:30"
+    description: Helm chart for flink kube operator
+    digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2
+    name: flink-kube-operator
+    type: application
+    urls:
+    - flink-kube-operator-0.1.1.tgz
+    version: 0.1.1
+  - apiVersion: v2
+    appVersion: 0.1.0
+    created: "2024-12-20T17:12:30.022187811+03:30"
+    description: Helm chart for flink kube operator
+    digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da
+    name: flink-kube-operator
+    type: application
+    urls:
+    - flink-kube-operator-0.1.0.tgz
+    version: 0.1.0
+generated: "2024-12-20T17:12:30.021533384+03:30"
```
```diff
@@ -1,10 +0,0 @@
-apiVersion: v1
-kind: PersistentVolumeClaim
-metadata:
-  name: {{ .Values.flink.state.savepoints.pvcName }} # Adding the flink prefix to PVC name
-spec:
-  accessModes:
-    - ReadWriteOnce
-  resources:
-    requests:
-      storage: {{ .Values.flink.persistence.size }} # Use size defined in values.yaml
```
```diff
@@ -14,6 +14,7 @@ import (

 func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
 	job := GetJob(jobUid)
+	pkg.Logger.Debug("[patch-job]", zap.Any("jobUid", jobUid))

 	patchBytes, err := json.Marshal(patchData)
 	if err != nil {
```
```diff
@@ -16,6 +16,7 @@ type FlinkJobSpec struct {
 	JarURI string `json:"jarUri"`
 	SavepointInterval metaV1.Duration `json:"savepointInterval"`
 	EntryClass string `json:"entryClass"`
+	Args []string `json:"args"`
 }

 type FlinkJobStatus struct {
```
```diff
@@ -54,6 +55,7 @@
 	ErrNoJarId = errors.New("[managed-job] no jar id")
 	ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
 	ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
+	ErrOnStartingJob = errors.New("[managed-job] error on starting job")
 )

 type JobStatus string
```
```diff
@@ -26,7 +26,6 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
 	}
 	defer watcher.Stop()
 	for event := range watcher.ResultChan() {
-		pkg.Logger.Debug("[crd] event received", zap.Any("type", event.Type))
 		unstructuredJob := event.Object.(*unstructured.Unstructured)
 		unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
 		if err != nil {
```
```diff
@@ -50,10 +49,10 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
 		switch event.Type {
 		case watch.Bookmark:
 		case watch.Modified:
-			pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
+			//pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
 			crd.repsert(job)
 		case watch.Added:
-			pkg.Logger.Info("[crd] [watch] new flink job created")
+			//pkg.Logger.Info("[crd] [watch] new flink job created")
 			crd.repsert(job)
 		case watch.Deleted:
 		}
```
```diff
@@ -41,6 +41,12 @@ func (job *ManagedJob) Cycle() {
 	if job.def.Status.JobStatus == v1alpha1.JobStatusCreating {
 		return
 	}
+
+	if job.def.Status.JobStatus == v1alpha1.JobStatusFailed {
+		job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
+		job.crd.SetJobStatus(job.def.UID, job.def.Status)
+		return
+	}
 	// if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
 	// 	//job.restore()
 	// 	return
```
```diff
@@ -42,16 +42,25 @@ func (job *ManagedJob) run(restoreMode bool) error {
 			AllowNonRestoredState: true,
 			EntryClass: job.def.Spec.EntryClass,
 			SavepointPath: savepointPath,
+			Parallelism: job.def.Spec.Parallelism,
+			ProgramArg: job.def.Spec.Args,
 		})
 		if err == nil {
 			pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))
 			jobId = &runJarResp.JobId
 			break
 		} else {
-			if strings.ContainsAny(err.Error(), ".jar does not exist") {
+			if strings.Contains(err.Error(), ".jar does not exist") {
 				pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
 				shouldUpload = true
 			} else {
+				pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
+				stringErr := err.Error()
+				job.def.Status.Error = &stringErr
+				job.def.Status.JobStatus = ""
+				job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
+				job.crd.SetJobStatus(job.def.UID, job.def.Status)
+				return v1alpha1.ErrOnStartingJob
 			}
 		}
 	}
```
```diff
@@ -66,9 +75,9 @@ func (job *ManagedJob) run(restoreMode bool) error {
 			})
-			return nil
 		}
+		shouldUpload = false
 		continue
 	}
 	return nil
 }

 // job.def.Status.JobId = &runJarResp.JobId
```
```diff
@@ -58,6 +58,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {

 	// Loop over job definitions as Kubernetes CRD
 	for _, uid := range crd.GetAllJobKeys() {
+		pkg.Logger.Debug("mgr.processingJobsIds", zap.Any("processingJobIds", mgr.processingJobsIds))
 		if lo.Contains(mgr.processingJobsIds, uid) {
 			pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid))
 			continue
```