Compare commits
9 Commits
feature/ku
...
00030195c8
| Author | SHA1 | Date | |
|---|---|---|---|
| 00030195c8 | |||
| 7e33fd6cef | |||
| 5bc047dbd1 | |||
| 07b8a36e63 | |||
| 5e3f093f08 | |||
| 03fe9910a3 | |||
| 438296ec35 | |||
| 6a475c7755 | |||
| a3a806a54f |
2
.vscode/launch.json
vendored
2
.vscode/launch.json
vendored
@@ -10,7 +10,7 @@
|
|||||||
"request": "launch",
|
"request": "launch",
|
||||||
"mode": "auto",
|
"mode": "auto",
|
||||||
"env": {
|
"env": {
|
||||||
"FLINK_API_URL": "127.0.0.1:8081",
|
"FLINK_API_URL": "flink.bz2:8081",
|
||||||
"SAVEPOINT_PATH": "/opt/flink/savepoints"
|
"SAVEPOINT_PATH": "/opt/flink/savepoints"
|
||||||
},
|
},
|
||||||
"cwd": "${workspaceFolder}",
|
"cwd": "${workspaceFolder}",
|
||||||
|
|||||||
@@ -17,6 +17,11 @@ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafk
|
|||||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
|
RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
|
||||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/
|
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/
|
||||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/
|
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/
|
||||||
|
RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/
|
||||||
|
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/
|
||||||
|
RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/
|
||||||
|
RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/
|
||||||
|
|
||||||
|
|
||||||
# Command to start Flink JobManager and TaskManager in a mini-cluster setup
|
# Command to start Flink JobManager and TaskManager in a mini-cluster setup
|
||||||
CMD ["bin/start-cluster.sh"]
|
CMD ["bin/start-cluster.sh"]
|
||||||
6
README.md
Normal file
6
README.md
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
Installation:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
helm repo add lc-flink-operator https://git.logicamp.tech/Logicamp/flink-kube-operator/raw/branch/main/helm/
|
||||||
|
helm install flink-kube-operator lc-flink-operator/flink-kube-operator
|
||||||
|
```
|
||||||
@@ -36,6 +36,10 @@ spec:
|
|||||||
type: integer
|
type: integer
|
||||||
jarUri:
|
jarUri:
|
||||||
type: string
|
type: string
|
||||||
|
args:
|
||||||
|
type: array
|
||||||
|
items:
|
||||||
|
type: string
|
||||||
savepointInterval:
|
savepointInterval:
|
||||||
type: string
|
type: string
|
||||||
format: duration
|
format: duration
|
||||||
|
|||||||
@@ -1,24 +0,0 @@
|
|||||||
apiVersion: v2
|
|
||||||
name: flink-kube-operator
|
|
||||||
description: A Helm chart for Kubernetes
|
|
||||||
|
|
||||||
# A chart can be either an 'application' or a 'library' chart.
|
|
||||||
#
|
|
||||||
# Application charts are a collection of templates that can be packaged into versioned archives
|
|
||||||
# to be deployed.
|
|
||||||
#
|
|
||||||
# Library charts provide useful utilities or functions for the chart developer. They're included as
|
|
||||||
# a dependency of application charts to inject those utilities and functions into the rendering
|
|
||||||
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
|
|
||||||
type: application
|
|
||||||
|
|
||||||
# This is the chart version. This version number should be incremented each time you make changes
|
|
||||||
# to the chart and its templates, including the app version.
|
|
||||||
# Versions are expected to follow Semantic Versioning (https://semver.org/)
|
|
||||||
version: 0.1.0
|
|
||||||
|
|
||||||
# This is the version number of the application being deployed. This version number should be
|
|
||||||
# incremented each time you make changes to the application. Versions are not expected to
|
|
||||||
# follow Semantic Versioning. They should reflect the version the application is using.
|
|
||||||
# It is recommended to use it with quotes.
|
|
||||||
appVersion: "1.16.0"
|
|
||||||
6
helm/chart/Chart.yaml
Normal file
6
helm/chart/Chart.yaml
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
apiVersion: v2
|
||||||
|
name: flink-kube-operator
|
||||||
|
description: Helm chart for flink kube operator
|
||||||
|
type: application
|
||||||
|
version: 0.1.1
|
||||||
|
appVersion: "0.1.0"
|
||||||
@@ -1,21 +1,20 @@
|
|||||||
apiVersion: apps/v1
|
apiVersion: apps/v1
|
||||||
kind: Deployment
|
kind: Deployment
|
||||||
metadata:
|
metadata:
|
||||||
name: {{ .Release.Name }}-flink # Adding the flink prefix to the name
|
name: {{ .Release.Name }}-flink
|
||||||
labels:
|
labels:
|
||||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the labels
|
app.kubernetes.io/name: {{ .Release.Name }}-flink
|
||||||
app.kubernetes.io/instance: {{ .Release.Name }} # Using the release name for instance
|
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||||
app.kubernetes.io/managed-by: Helm
|
|
||||||
spec:
|
spec:
|
||||||
replicas: 1
|
replicas: 1
|
||||||
selector:
|
selector:
|
||||||
matchLabels:
|
matchLabels:
|
||||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the selector
|
app.kubernetes.io/name: {{ .Release.Name }}-flink
|
||||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||||
template:
|
template:
|
||||||
metadata:
|
metadata:
|
||||||
labels:
|
labels:
|
||||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the template labels
|
app.kubernetes.io/name: {{ .Release.Name }}-flink
|
||||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||||
spec:
|
spec:
|
||||||
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
|
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
|
||||||
@@ -54,9 +53,6 @@ spec:
|
|||||||
- name: flink-data
|
- name: flink-data
|
||||||
mountPath: /opt/flink/data
|
mountPath: /opt/flink/data
|
||||||
subPath: data
|
subPath: data
|
||||||
- name: flink-data
|
|
||||||
mountPath: /opt/flink/web-upload
|
|
||||||
subPath: web-upload
|
|
||||||
- name: flink-savepoints
|
- name: flink-savepoints
|
||||||
mountPath: /opt/flink/savepoints
|
mountPath: /opt/flink/savepoints
|
||||||
- name: flink-savepoints
|
- name: flink-savepoints
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
apiVersion: v1
|
apiVersion: v1
|
||||||
kind: PersistentVolumeClaim
|
kind: PersistentVolumeClaim
|
||||||
metadata:
|
metadata:
|
||||||
name: {{ .Values.flink.state.savepoints.pvcName }} # Adding the flink prefix to PVC name
|
name: {{ .Values.flink.state.savepoints.pvcName }}
|
||||||
spec:
|
spec:
|
||||||
accessModes:
|
accessModes:
|
||||||
- ReadWriteOnce
|
- ReadWriteOnce
|
||||||
@@ -1,15 +1,15 @@
|
|||||||
apiVersion: v1
|
apiVersion: v1
|
||||||
kind: Service
|
kind: Service
|
||||||
metadata:
|
metadata:
|
||||||
name: flink # Adding the flink prefix to the service name
|
name: flink
|
||||||
labels:
|
labels:
|
||||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to labels
|
app.kubernetes.io/name: {{ .Release.Name }}-flink
|
||||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||||
spec:
|
spec:
|
||||||
ports:
|
ports:
|
||||||
- port: 8081
|
- port: 8081
|
||||||
targetPort: 8081
|
targetPort: 8081
|
||||||
selector:
|
selector:
|
||||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to selector
|
app.kubernetes.io/name: {{ .Release.Name }}-flink
|
||||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||||
type: ClusterIP # Change to LoadBalancer if you want external access
|
type: ClusterIP # Change to LoadBalancer if you want external access
|
||||||
BIN
helm/flink-kube-operator-0.1.0.tgz
Normal file
BIN
helm/flink-kube-operator-0.1.0.tgz
Normal file
Binary file not shown.
BIN
helm/flink-kube-operator-0.1.1.tgz
Normal file
BIN
helm/flink-kube-operator-0.1.1.tgz
Normal file
Binary file not shown.
24
helm/index.yaml
Normal file
24
helm/index.yaml
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
apiVersion: v1
|
||||||
|
entries:
|
||||||
|
flink-kube-operator:
|
||||||
|
- apiVersion: v2
|
||||||
|
appVersion: 0.1.0
|
||||||
|
created: "2024-12-19T00:39:44.4857163+03:30"
|
||||||
|
description: Helm chart for flink kube operator
|
||||||
|
digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2
|
||||||
|
name: flink-kube-operator
|
||||||
|
type: application
|
||||||
|
urls:
|
||||||
|
- flink-kube-operator-0.1.1.tgz
|
||||||
|
version: 0.1.1
|
||||||
|
- apiVersion: v2
|
||||||
|
appVersion: 0.1.0
|
||||||
|
created: "2024-12-19T00:39:44.485286485+03:30"
|
||||||
|
description: Helm chart for flink kube operator
|
||||||
|
digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da
|
||||||
|
name: flink-kube-operator
|
||||||
|
type: application
|
||||||
|
urls:
|
||||||
|
- flink-kube-operator-0.1.0.tgz
|
||||||
|
version: 0.1.0
|
||||||
|
generated: "2024-12-19T00:39:44.48463577+03:30"
|
||||||
@@ -14,6 +14,7 @@ import (
|
|||||||
|
|
||||||
func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
|
func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
|
||||||
job := GetJob(jobUid)
|
job := GetJob(jobUid)
|
||||||
|
pkg.Logger.Debug("[patch-job]", zap.Any("jobUid", jobUid))
|
||||||
|
|
||||||
patchBytes, err := json.Marshal(patchData)
|
patchBytes, err := json.Marshal(patchData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ type FlinkJobSpec struct {
|
|||||||
JarURI string `json:"jarUri"`
|
JarURI string `json:"jarUri"`
|
||||||
SavepointInterval metaV1.Duration `json:"savepointInterval"`
|
SavepointInterval metaV1.Duration `json:"savepointInterval"`
|
||||||
EntryClass string `json:"entryClass"`
|
EntryClass string `json:"entryClass"`
|
||||||
|
Args []string `json:"args"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type FlinkJobStatus struct {
|
type FlinkJobStatus struct {
|
||||||
@@ -54,6 +55,7 @@ var (
|
|||||||
ErrNoJarId = errors.New("[managed-job] no jar id")
|
ErrNoJarId = errors.New("[managed-job] no jar id")
|
||||||
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
|
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
|
||||||
ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
|
ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
|
||||||
|
ErrOnStartingJob = errors.New("[managed-job] error on starting job")
|
||||||
)
|
)
|
||||||
|
|
||||||
type JobStatus string
|
type JobStatus string
|
||||||
|
|||||||
@@ -26,7 +26,6 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
|
|||||||
}
|
}
|
||||||
defer watcher.Stop()
|
defer watcher.Stop()
|
||||||
for event := range watcher.ResultChan() {
|
for event := range watcher.ResultChan() {
|
||||||
pkg.Logger.Debug("[crd] event received", zap.Any("type", event.Type))
|
|
||||||
unstructuredJob := event.Object.(*unstructured.Unstructured)
|
unstructuredJob := event.Object.(*unstructured.Unstructured)
|
||||||
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
|
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -50,10 +49,10 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
|
|||||||
switch event.Type {
|
switch event.Type {
|
||||||
case watch.Bookmark:
|
case watch.Bookmark:
|
||||||
case watch.Modified:
|
case watch.Modified:
|
||||||
pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
|
//pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
|
||||||
crd.repsert(job)
|
crd.repsert(job)
|
||||||
case watch.Added:
|
case watch.Added:
|
||||||
pkg.Logger.Info("[crd] [watch] new flink job created")
|
//pkg.Logger.Info("[crd] [watch] new flink job created")
|
||||||
crd.repsert(job)
|
crd.repsert(job)
|
||||||
case watch.Deleted:
|
case watch.Deleted:
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,6 +41,12 @@ func (job *ManagedJob) Cycle() {
|
|||||||
if job.def.Status.JobStatus == v1alpha1.JobStatusCreating {
|
if job.def.Status.JobStatus == v1alpha1.JobStatusCreating {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if job.def.Status.JobStatus == v1alpha1.JobStatusFailed {
|
||||||
|
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
|
||||||
|
job.crd.SetJobStatus(job.def.UID, job.def.Status)
|
||||||
|
return
|
||||||
|
}
|
||||||
// if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
|
// if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
|
||||||
// //job.restore()
|
// //job.restore()
|
||||||
// return
|
// return
|
||||||
|
|||||||
@@ -42,16 +42,25 @@ func (job *ManagedJob) run(restoreMode bool) error {
|
|||||||
AllowNonRestoredState: true,
|
AllowNonRestoredState: true,
|
||||||
EntryClass: job.def.Spec.EntryClass,
|
EntryClass: job.def.Spec.EntryClass,
|
||||||
SavepointPath: savepointPath,
|
SavepointPath: savepointPath,
|
||||||
|
Parallelism: job.def.Spec.Parallelism,
|
||||||
|
ProgramArg: job.def.Spec.Args,
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))
|
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))
|
||||||
jobId = &runJarResp.JobId
|
jobId = &runJarResp.JobId
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
if strings.ContainsAny(err.Error(), ".jar does not exist") {
|
if strings.Contains(err.Error(), ".jar does not exist") {
|
||||||
|
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
|
||||||
shouldUpload = true
|
shouldUpload = true
|
||||||
} else {
|
} else {
|
||||||
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
|
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
|
||||||
|
stringErr := err.Error()
|
||||||
|
job.def.Status.Error = &stringErr
|
||||||
|
job.def.Status.JobStatus = ""
|
||||||
|
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
|
||||||
|
job.crd.SetJobStatus(job.def.UID, job.def.Status)
|
||||||
|
return v1alpha1.ErrOnStartingJob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -66,6 +75,7 @@ func (job *ManagedJob) run(restoreMode bool) error {
|
|||||||
})
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
shouldUpload = false
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -58,6 +58,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
|
|||||||
|
|
||||||
// Loop over job definitions as Kubernetes CRD
|
// Loop over job definitions as Kubernetes CRD
|
||||||
for _, uid := range crd.GetAllJobKeys() {
|
for _, uid := range crd.GetAllJobKeys() {
|
||||||
|
pkg.Logger.Debug("mgr.processingJobsIds", zap.Any("processingJobIds", mgr.processingJobsIds))
|
||||||
if lo.Contains(mgr.processingJobsIds, uid) {
|
if lo.Contains(mgr.processingJobsIds, uid) {
|
||||||
pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid))
|
pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid))
|
||||||
continue
|
continue
|
||||||
|
|||||||
Reference in New Issue
Block a user