Compare commits
11 Commits
feature/ku
...
37936c8c58
| Author | SHA1 | Date | |
|---|---|---|---|
| 37936c8c58 | |||
| 4ed533f284 | |||
| 00030195c8 | |||
| 7e33fd6cef | |||
| 5bc047dbd1 | |||
| 07b8a36e63 | |||
| 5e3f093f08 | |||
| 03fe9910a3 | |||
| 438296ec35 | |||
| 6a475c7755 | |||
| a3a806a54f |
2
.vscode/launch.json
vendored
2
.vscode/launch.json
vendored
@@ -10,7 +10,7 @@
|
|||||||
"request": "launch",
|
"request": "launch",
|
||||||
"mode": "auto",
|
"mode": "auto",
|
||||||
"env": {
|
"env": {
|
||||||
"FLINK_API_URL": "127.0.0.1:8081",
|
"FLINK_API_URL": "flink.bz2:8081",
|
||||||
"SAVEPOINT_PATH": "/opt/flink/savepoints"
|
"SAVEPOINT_PATH": "/opt/flink/savepoints"
|
||||||
},
|
},
|
||||||
"cwd": "${workspaceFolder}",
|
"cwd": "${workspaceFolder}",
|
||||||
|
|||||||
@@ -20,8 +20,8 @@ COPY . .
|
|||||||
|
|
||||||
# Build
|
# Build
|
||||||
ENV GOCACHE=/root/.cache/go-build
|
ENV GOCACHE=/root/.cache/go-build
|
||||||
RUN --mount=type=cache,target="/go" --mount=type=cache,target="/root/.cache/go-build" CGO_ENABLED=1 GOOS=linux go build -ldflags '-s -w' -o /flink-kube-operator ./cmd/operator
|
RUN --mount=type=cache,target="/go" --mount=type=cache,target="/root/.cache/go-build" CGO_ENABLED=1 GOOS=linux go build -ldflags '-s -w' -o /flink-kube-operator ./cmd/operator \
|
||||||
RUN upx -q -5 /flink-kube-operator
|
&& upx -q -9 /flink-kube-operator
|
||||||
|
|
||||||
FROM public.ecr.aws/docker/library/busybox:1.37.0 AS final
|
FROM public.ecr.aws/docker/library/busybox:1.37.0 AS final
|
||||||
|
|
||||||
|
|||||||
@@ -17,6 +17,11 @@ RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafk
|
|||||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
|
RUN wget -q https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.9.0/kafka-clients-3.9.0.jar -P /opt/flink/lib/
|
||||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/
|
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.20.0/flink-avro-1.20.0.jar -P /opt/flink/lib/
|
||||||
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/
|
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.20.0/flink-avro-confluent-registry-1.20.0.jar -P /opt/flink/lib/
|
||||||
|
RUN wget -q https://repo1.maven.org/maven2/name/nkonev/flink/flink-sql-connector-clickhouse/1.17.1-8/flink-sql-connector-clickhouse-1.17.1-8.jar -P /opt/flink/lib/
|
||||||
|
RUN wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.2.1/flink-sql-connector-postgres-cdc-3.2.1.jar -P /opt/flink/lib/
|
||||||
|
RUN wget -q https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P /opt/flink/lib/
|
||||||
|
RUN wget -q https://repo1.maven.org/maven2/net/objecthunter/exp4j/0.4.5/exp4j-0.4.5.jar -P /opt/flink/lib/
|
||||||
|
|
||||||
|
|
||||||
# Command to start Flink JobManager and TaskManager in a mini-cluster setup
|
# Command to start Flink JobManager and TaskManager in a mini-cluster setup
|
||||||
CMD ["bin/start-cluster.sh"]
|
CMD ["bin/start-cluster.sh"]
|
||||||
6
README.md
Normal file
6
README.md
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
Installation:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
helm repo add lc-flink-operator https://git.logicamp.tech/Logicamp/flink-kube-operator/raw/branch/main/helm/
|
||||||
|
helm install flink-kube-operator lc-flink-operator/flink-kube-operator
|
||||||
|
```
|
||||||
@@ -36,6 +36,10 @@ spec:
|
|||||||
type: integer
|
type: integer
|
||||||
jarUri:
|
jarUri:
|
||||||
type: string
|
type: string
|
||||||
|
args:
|
||||||
|
type: array
|
||||||
|
items:
|
||||||
|
type: string
|
||||||
savepointInterval:
|
savepointInterval:
|
||||||
type: string
|
type: string
|
||||||
format: duration
|
format: duration
|
||||||
|
|||||||
@@ -1,24 +0,0 @@
|
|||||||
apiVersion: v2
|
|
||||||
name: flink-kube-operator
|
|
||||||
description: A Helm chart for Kubernetes
|
|
||||||
|
|
||||||
# A chart can be either an 'application' or a 'library' chart.
|
|
||||||
#
|
|
||||||
# Application charts are a collection of templates that can be packaged into versioned archives
|
|
||||||
# to be deployed.
|
|
||||||
#
|
|
||||||
# Library charts provide useful utilities or functions for the chart developer. They're included as
|
|
||||||
# a dependency of application charts to inject those utilities and functions into the rendering
|
|
||||||
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
|
|
||||||
type: application
|
|
||||||
|
|
||||||
# This is the chart version. This version number should be incremented each time you make changes
|
|
||||||
# to the chart and its templates, including the app version.
|
|
||||||
# Versions are expected to follow Semantic Versioning (https://semver.org/)
|
|
||||||
version: 0.1.0
|
|
||||||
|
|
||||||
# This is the version number of the application being deployed. This version number should be
|
|
||||||
# incremented each time you make changes to the application. Versions are not expected to
|
|
||||||
# follow Semantic Versioning. They should reflect the version the application is using.
|
|
||||||
# It is recommended to use it with quotes.
|
|
||||||
appVersion: "1.16.0"
|
|
||||||
6
helm/chart/Chart.yaml
Normal file
6
helm/chart/Chart.yaml
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
apiVersion: v2
|
||||||
|
name: flink-kube-operator
|
||||||
|
description: Helm chart for flink kube operator
|
||||||
|
type: application
|
||||||
|
version: 0.1.2
|
||||||
|
appVersion: "0.1.0"
|
||||||
10
helm/chart/templates/flink/data.pvc.yaml
Normal file
10
helm/chart/templates/flink/data.pvc.yaml
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
apiVersion: v1
|
||||||
|
kind: PersistentVolumeClaim
|
||||||
|
metadata:
|
||||||
|
name: {{ .Values.flink.state.data.pvcName }}
|
||||||
|
spec:
|
||||||
|
accessModes:
|
||||||
|
- ReadWriteOnce
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
storage: {{ .Values.flink.state.data.size }} # Use size defined in values.yaml
|
||||||
@@ -1,21 +1,20 @@
|
|||||||
apiVersion: apps/v1
|
apiVersion: apps/v1
|
||||||
kind: Deployment
|
kind: Deployment
|
||||||
metadata:
|
metadata:
|
||||||
name: {{ .Release.Name }}-flink # Adding the flink prefix to the name
|
name: {{ .Release.Name }}-flink
|
||||||
labels:
|
labels:
|
||||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the labels
|
app.kubernetes.io/name: {{ .Release.Name }}-flink
|
||||||
app.kubernetes.io/instance: {{ .Release.Name }} # Using the release name for instance
|
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||||
app.kubernetes.io/managed-by: Helm
|
|
||||||
spec:
|
spec:
|
||||||
replicas: 1
|
replicas: 1
|
||||||
selector:
|
selector:
|
||||||
matchLabels:
|
matchLabels:
|
||||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the selector
|
app.kubernetes.io/name: {{ .Release.Name }}-flink
|
||||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||||
template:
|
template:
|
||||||
metadata:
|
metadata:
|
||||||
labels:
|
labels:
|
||||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to the template labels
|
app.kubernetes.io/name: {{ .Release.Name }}-flink
|
||||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||||
spec:
|
spec:
|
||||||
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
|
serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
|
||||||
@@ -40,32 +39,33 @@ spec:
|
|||||||
taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
|
taskmanager.numberOfTaskSlots: {{ .Values.flink.taskManager.numberOfTaskSlots }}
|
||||||
parallelism.default: {{ .Values.flink.parallelism.default }}
|
parallelism.default: {{ .Values.flink.parallelism.default }}
|
||||||
state.backend: {{ .Values.flink.state.backend }}
|
state.backend: {{ .Values.flink.state.backend }}
|
||||||
state.savepoints.dir: {{ .Values.flink.state.savepoints.dir }}
|
|
||||||
rest.port: 8081
|
rest.port: 8081
|
||||||
rootLogger.level = DEBUG
|
rootLogger.level = DEBUG
|
||||||
rootLogger.appenderRef.console.ref = ConsoleAppender
|
rootLogger.appenderRef.console.ref = ConsoleAppender
|
||||||
web.upload.dir: /opt/flink/data/web-upload
|
|
||||||
state.checkpoints.dir: file:///tmp/flink-checkpoints
|
|
||||||
high-availability.type: kubernetes
|
high-availability.type: kubernetes
|
||||||
high-availability.storageDir: file:///opt/flink/ha
|
|
||||||
kubernetes.cluster-id: cluster-one
|
|
||||||
kubernetes.namespace: {{ .Release.Namespace }}
|
kubernetes.namespace: {{ .Release.Namespace }}
|
||||||
|
kubernetes.cluster-id: cluster-one
|
||||||
|
web.upload.dir: file://{{ .Values.flink.state.data.dir }}/web-upload
|
||||||
|
state.checkpoints.dir: file://{{ .Values.flink.state.data.dir }}/checkpoints
|
||||||
|
state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb
|
||||||
|
high-availability.storageDir: file://{{ .Values.flink.state.data.dir }}
|
||||||
|
state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }}
|
||||||
volumeMounts:
|
volumeMounts:
|
||||||
- name: flink-data
|
- name: flink-data
|
||||||
mountPath: /opt/flink/data
|
mountPath: {{ .Values.flink.state.data.dir }}
|
||||||
subPath: data
|
- name: flink-ha
|
||||||
- name: flink-data
|
mountPath: {{ .Values.flink.state.ha.dir }}
|
||||||
mountPath: /opt/flink/web-upload
|
|
||||||
subPath: web-upload
|
|
||||||
- name: flink-savepoints
|
- name: flink-savepoints
|
||||||
mountPath: /opt/flink/savepoints
|
mountPath: {{ .Values.flink.state.savepoints.dir }}
|
||||||
- name: flink-savepoints
|
|
||||||
mountPath: /opt/flink/ha
|
|
||||||
subPath: ha
|
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
- name: flink-data
|
- name: flink-data
|
||||||
emptyDir: {} # Temporary storage for internal data
|
persistentVolumeClaim:
|
||||||
|
claimName: {{ .Values.flink.state.data.pvcName }} # PVC for savepoints persistence
|
||||||
- name: flink-savepoints
|
- name: flink-savepoints
|
||||||
persistentVolumeClaim:
|
persistentVolumeClaim:
|
||||||
claimName: {{ .Values.flink.state.savepoints.pvcName }} # PVC for savepoints persistence
|
claimName: {{ .Values.flink.state.savepoints.pvcName }} # PVC for savepoints persistence
|
||||||
|
- name: flink-ha
|
||||||
|
persistentVolumeClaim:
|
||||||
|
claimName: {{ .Values.flink.state.ha.pvcName }} # PVC for savepoints persistence
|
||||||
10
helm/chart/templates/flink/savepoint.pvc.yaml
Normal file
10
helm/chart/templates/flink/savepoint.pvc.yaml
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
apiVersion: v1
|
||||||
|
kind: PersistentVolumeClaim
|
||||||
|
metadata:
|
||||||
|
name: {{ .Values.flink.state.savepoints.pvcName }}
|
||||||
|
spec:
|
||||||
|
accessModes:
|
||||||
|
- ReadWriteOnce
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
storage: {{ .Values.flink.state.savepoints.size }} # Use size defined in values.yaml
|
||||||
@@ -1,15 +1,15 @@
|
|||||||
apiVersion: v1
|
apiVersion: v1
|
||||||
kind: Service
|
kind: Service
|
||||||
metadata:
|
metadata:
|
||||||
name: flink # Adding the flink prefix to the service name
|
name: flink
|
||||||
labels:
|
labels:
|
||||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to labels
|
app.kubernetes.io/name: {{ .Release.Name }}-flink
|
||||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||||
spec:
|
spec:
|
||||||
ports:
|
ports:
|
||||||
- port: 8081
|
- port: 8081
|
||||||
targetPort: 8081
|
targetPort: 8081
|
||||||
selector:
|
selector:
|
||||||
app.kubernetes.io/name: {{ .Release.Name }}-flink # Adding the flink prefix to selector
|
app.kubernetes.io/name: {{ .Release.Name }}-flink
|
||||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||||
type: ClusterIP # Change to LoadBalancer if you want external access
|
type: ClusterIP # Change to LoadBalancer if you want external access
|
||||||
@@ -125,14 +125,17 @@ flink:
|
|||||||
state:
|
state:
|
||||||
backend: rocksdb # Use RocksDB for state backend
|
backend: rocksdb # Use RocksDB for state backend
|
||||||
savepoints:
|
savepoints:
|
||||||
dir: "file:///opt/flink/savepoints" # Directory to store savepoints
|
dir: "/opt/flink/savepoints" # Directory to store savepoints
|
||||||
pvcName: flink-savepoints-pvc # PVC for savepoints persistence
|
pvcName: flink-savepoints-pvc # PVC for savepoints persistence
|
||||||
|
size: 10Gi # PVC size for savepoints storage
|
||||||
|
data:
|
||||||
|
dir: "/opt/flink/data" # Directory to store checkpoints/web-upload/rocksdb
|
||||||
|
pvcName: flink-data-pvc # PVC for checkpoints/web-upload/rocksdb
|
||||||
|
size: 10Gi # PVC size for checkpoints/web-upload/rocksdb
|
||||||
|
ha:
|
||||||
|
dir: "/opt/flink/ha" # Directory to store ha data
|
||||||
|
pvcName: flink-ha-pvc # PVC for ha
|
||||||
|
size: 10Gi # PVC size for ha
|
||||||
|
|
||||||
taskManager:
|
taskManager:
|
||||||
numberOfTaskSlots: 100 # Number of task slots for TaskManager
|
numberOfTaskSlots: 100 # Number of task slots for TaskManager
|
||||||
|
|
||||||
persistence:
|
|
||||||
enabled: true
|
|
||||||
size: 10Gi # PVC size for savepoints storage
|
|
||||||
accessModes:
|
|
||||||
- ReadWriteOnce
|
|
||||||
BIN
helm/flink-kube-operator-0.1.0.tgz
Normal file
BIN
helm/flink-kube-operator-0.1.0.tgz
Normal file
Binary file not shown.
BIN
helm/flink-kube-operator-0.1.1.tgz
Normal file
BIN
helm/flink-kube-operator-0.1.1.tgz
Normal file
Binary file not shown.
BIN
helm/flink-kube-operator-0.1.2.tgz
Normal file
BIN
helm/flink-kube-operator-0.1.2.tgz
Normal file
Binary file not shown.
34
helm/index.yaml
Normal file
34
helm/index.yaml
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
apiVersion: v1
|
||||||
|
entries:
|
||||||
|
flink-kube-operator:
|
||||||
|
- apiVersion: v2
|
||||||
|
appVersion: 0.1.0
|
||||||
|
created: "2024-12-20T17:12:30.023189281+03:30"
|
||||||
|
description: Helm chart for flink kube operator
|
||||||
|
digest: 89345b1a9a79aa18b646705aeb8cfdc547629600cb8a00708a3f64d188f296f2
|
||||||
|
name: flink-kube-operator
|
||||||
|
type: application
|
||||||
|
urls:
|
||||||
|
- flink-kube-operator-0.1.2.tgz
|
||||||
|
version: 0.1.2
|
||||||
|
- apiVersion: v2
|
||||||
|
appVersion: 0.1.0
|
||||||
|
created: "2024-12-20T17:12:30.022644192+03:30"
|
||||||
|
description: Helm chart for flink kube operator
|
||||||
|
digest: 1d2af9af6b9889cc2962d627946464766f1b65b05629073b7fffb9a98cd957e2
|
||||||
|
name: flink-kube-operator
|
||||||
|
type: application
|
||||||
|
urls:
|
||||||
|
- flink-kube-operator-0.1.1.tgz
|
||||||
|
version: 0.1.1
|
||||||
|
- apiVersion: v2
|
||||||
|
appVersion: 0.1.0
|
||||||
|
created: "2024-12-20T17:12:30.022187811+03:30"
|
||||||
|
description: Helm chart for flink kube operator
|
||||||
|
digest: 0890d955904e6a3b2155c086a933b27e45266d896fb69eaad0e811dea40414da
|
||||||
|
name: flink-kube-operator
|
||||||
|
type: application
|
||||||
|
urls:
|
||||||
|
- flink-kube-operator-0.1.0.tgz
|
||||||
|
version: 0.1.0
|
||||||
|
generated: "2024-12-20T17:12:30.021533384+03:30"
|
||||||
@@ -1,10 +0,0 @@
|
|||||||
apiVersion: v1
|
|
||||||
kind: PersistentVolumeClaim
|
|
||||||
metadata:
|
|
||||||
name: {{ .Values.flink.state.savepoints.pvcName }} # Adding the flink prefix to PVC name
|
|
||||||
spec:
|
|
||||||
accessModes:
|
|
||||||
- ReadWriteOnce
|
|
||||||
resources:
|
|
||||||
requests:
|
|
||||||
storage: {{ .Values.flink.persistence.size }} # Use size defined in values.yaml
|
|
||||||
@@ -14,6 +14,7 @@ import (
|
|||||||
|
|
||||||
func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
|
func (crd *Crd) Patch(jobUid types.UID, patchData map[string]interface{}) error {
|
||||||
job := GetJob(jobUid)
|
job := GetJob(jobUid)
|
||||||
|
pkg.Logger.Debug("[patch-job]", zap.Any("jobUid", jobUid))
|
||||||
|
|
||||||
patchBytes, err := json.Marshal(patchData)
|
patchBytes, err := json.Marshal(patchData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ type FlinkJobSpec struct {
|
|||||||
JarURI string `json:"jarUri"`
|
JarURI string `json:"jarUri"`
|
||||||
SavepointInterval metaV1.Duration `json:"savepointInterval"`
|
SavepointInterval metaV1.Duration `json:"savepointInterval"`
|
||||||
EntryClass string `json:"entryClass"`
|
EntryClass string `json:"entryClass"`
|
||||||
|
Args []string `json:"args"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type FlinkJobStatus struct {
|
type FlinkJobStatus struct {
|
||||||
@@ -54,6 +55,7 @@ var (
|
|||||||
ErrNoJarId = errors.New("[managed-job] no jar id")
|
ErrNoJarId = errors.New("[managed-job] no jar id")
|
||||||
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
|
ErrNoSavepointTriggerId = errors.New("[managed-job] no savepoint trigger id")
|
||||||
ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
|
ErrNoSavepointPath = errors.New("[managed-job] no savepoint path")
|
||||||
|
ErrOnStartingJob = errors.New("[managed-job] error on starting job")
|
||||||
)
|
)
|
||||||
|
|
||||||
type JobStatus string
|
type JobStatus string
|
||||||
|
|||||||
@@ -26,7 +26,6 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
|
|||||||
}
|
}
|
||||||
defer watcher.Stop()
|
defer watcher.Stop()
|
||||||
for event := range watcher.ResultChan() {
|
for event := range watcher.ResultChan() {
|
||||||
pkg.Logger.Debug("[crd] event received", zap.Any("type", event.Type))
|
|
||||||
unstructuredJob := event.Object.(*unstructured.Unstructured)
|
unstructuredJob := event.Object.(*unstructured.Unstructured)
|
||||||
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
|
unstructuredMap, _, err := unstructured.NestedMap(unstructuredJob.Object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -50,10 +49,10 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
|
|||||||
switch event.Type {
|
switch event.Type {
|
||||||
case watch.Bookmark:
|
case watch.Bookmark:
|
||||||
case watch.Modified:
|
case watch.Modified:
|
||||||
pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
|
//pkg.Logger.Info("[crd] [watch] flink job modified", zap.String("jobName", job.GetName()))
|
||||||
crd.repsert(job)
|
crd.repsert(job)
|
||||||
case watch.Added:
|
case watch.Added:
|
||||||
pkg.Logger.Info("[crd] [watch] new flink job created")
|
//pkg.Logger.Info("[crd] [watch] new flink job created")
|
||||||
crd.repsert(job)
|
crd.repsert(job)
|
||||||
case watch.Deleted:
|
case watch.Deleted:
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,6 +41,12 @@ func (job *ManagedJob) Cycle() {
|
|||||||
if job.def.Status.JobStatus == v1alpha1.JobStatusCreating {
|
if job.def.Status.JobStatus == v1alpha1.JobStatusCreating {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if job.def.Status.JobStatus == v1alpha1.JobStatusFailed {
|
||||||
|
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
|
||||||
|
job.crd.SetJobStatus(job.def.UID, job.def.Status)
|
||||||
|
return
|
||||||
|
}
|
||||||
// if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
|
// if job.def.Status.JobStatus == v1alpha1.JobStatusFailed && job.def.Status.LastSavepointPath != nil {
|
||||||
// //job.restore()
|
// //job.restore()
|
||||||
// return
|
// return
|
||||||
|
|||||||
@@ -42,16 +42,25 @@ func (job *ManagedJob) run(restoreMode bool) error {
|
|||||||
AllowNonRestoredState: true,
|
AllowNonRestoredState: true,
|
||||||
EntryClass: job.def.Spec.EntryClass,
|
EntryClass: job.def.Spec.EntryClass,
|
||||||
SavepointPath: savepointPath,
|
SavepointPath: savepointPath,
|
||||||
|
Parallelism: job.def.Spec.Parallelism,
|
||||||
|
ProgramArg: job.def.Spec.Args,
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))
|
pkg.Logger.Info("[managed-job] [run] jar successfully ran", zap.Any("run-jar-resp", runJarResp))
|
||||||
jobId = &runJarResp.JobId
|
jobId = &runJarResp.JobId
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
if strings.ContainsAny(err.Error(), ".jar does not exist") {
|
if strings.Contains(err.Error(), ".jar does not exist") {
|
||||||
|
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
|
||||||
shouldUpload = true
|
shouldUpload = true
|
||||||
} else {
|
} else {
|
||||||
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
|
pkg.Logger.Error("[managed-job] [run] unhandled jar run Flink error", zap.Error(err))
|
||||||
|
stringErr := err.Error()
|
||||||
|
job.def.Status.Error = &stringErr
|
||||||
|
job.def.Status.JobStatus = ""
|
||||||
|
job.def.Status.LifeCycleStatus = v1alpha1.LifeCycleStatusFailed
|
||||||
|
job.crd.SetJobStatus(job.def.UID, job.def.Status)
|
||||||
|
return v1alpha1.ErrOnStartingJob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -66,9 +75,9 @@ func (job *ManagedJob) run(restoreMode bool) error {
|
|||||||
})
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
shouldUpload = false
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// job.def.Status.JobId = &runJarResp.JobId
|
// job.def.Status.JobId = &runJarResp.JobId
|
||||||
|
|||||||
@@ -58,6 +58,7 @@ func (mgr *Manager) cycle(client *api.Client, crdInstance *crd.Crd) {
|
|||||||
|
|
||||||
// Loop over job definitions as Kubernetes CRD
|
// Loop over job definitions as Kubernetes CRD
|
||||||
for _, uid := range crd.GetAllJobKeys() {
|
for _, uid := range crd.GetAllJobKeys() {
|
||||||
|
pkg.Logger.Debug("mgr.processingJobsIds", zap.Any("processingJobIds", mgr.processingJobsIds))
|
||||||
if lo.Contains(mgr.processingJobsIds, uid) {
|
if lo.Contains(mgr.processingJobsIds, uid) {
|
||||||
pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid))
|
pkg.Logger.Warn("[manager] already in process", zap.Any("uid", uid))
|
||||||
continue
|
continue
|
||||||
|
|||||||
Reference in New Issue
Block a user