diff --git a/helm/chart/templates/flink/deploy.yaml b/helm/chart/templates/flink/deploy.yaml
index 217e05b..ac279da 100644
--- a/helm/chart/templates/flink/deploy.yaml
+++ b/helm/chart/templates/flink/deploy.yaml
@@ -7,6 +7,8 @@ metadata:
     app.kubernetes.io/instance: {{ .Release.Name }}
 spec:
   replicas: 1
+  strategy:
+    type: Recreate
   selector:
     matchLabels:
       app.kubernetes.io/name: {{ .Release.Name }}-flink
@@ -17,8 +19,28 @@ spec:
         app.kubernetes.io/name: {{ .Release.Name }}-flink
         app.kubernetes.io/instance: {{ .Release.Name }}
     spec:
+      initContainers:
+        - name: volume-mount-hack
+          image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
+          securityContext: { runAsUser: 0 }
+          command: ["sh", "-c", "chown -R flink {{ .Values.flink.state.data.dir }}/data {{ .Values.flink.state.data.dir }}/rocksdb {{ .Values.flink.state.data.dir }}/checkpoints {{ .Values.flink.state.data.dir }}/web-upload {{ .Values.flink.state.ha.dir }} {{ .Values.flink.state.savepoints.dir }}"]
+          volumeMounts:
+            - name: flink-data
+              mountPath: {{ .Values.flink.state.data.dir }}/data
+            - name: flink-data
+              mountPath: {{ .Values.flink.state.data.dir }}/rocksdb
+              subPath: rocksdb
+            - name: flink-data
+              mountPath: {{ .Values.flink.state.data.dir }}/checkpoints
+              subPath: checkpoints
+            - name: flink-data
+              mountPath: {{ .Values.flink.state.data.dir }}/web-upload
+              subPath: web-upload
+            - name: flink-ha
+              mountPath: {{ .Values.flink.state.ha.dir }}
+            - name: flink-savepoints
+              mountPath: {{ .Values.flink.state.savepoints.dir }}
       serviceAccountName: {{ include "flink-kube-operator.serviceAccountName" . }}
       containers:
         - name: flink
           image: {{ .Values.flink.image.repository }}:{{ .Values.flink.image.tag }}
@@ -27,10 +49,12 @@ spec:
             httpGet:
               path: /
               port: 8081
+            failureThreshold: 8
           readinessProbe:
             httpGet:
               path: /
               port: 8081
+            failureThreshold: 8
           ports:
             - containerPort: 8081 # JobManager Web UI port
             - containerPort: 6121 # TaskManager communication port
@@ -38,6 +62,8 @@ spec:
           env:
             - name: JOB_MANAGER_RPC_ADDRESS
               value: "localhost" # JobManager and TaskManager in the same container
+            - name: NAMESPACE
+              value: {{ .Release.Namespace | quote }}
             - name: FLINK_PROPERTIES
               value: |
                 jobmanager.rpc.address: localhost
@@ -58,6 +84,7 @@ spec:
                 state.backend.rocksdb.localdir: file://{{ .Values.flink.state.data.dir }}/rocksdb
                 high-availability.storageDir: file://{{ .Values.flink.state.ha.dir }}
                 state.savepoints.dir: file://{{ .Values.flink.state.savepoints.dir }}
+                state.backend.incremental: false
           volumeMounts:
             - name: flink-data
               mountPath: {{ .Values.flink.state.data.dir }}/data
diff --git a/helm/chart/templates/operator/role.yaml b/helm/chart/templates/operator/role.yaml
index 99c8d30..2f4ec2b 100644
--- a/helm/chart/templates/operator/role.yaml
+++ b/helm/chart/templates/operator/role.yaml
@@ -2,7 +2,6 @@ apiVersion: rbac.authorization.k8s.io/v1
 kind: Role
 metadata:
   name: {{ include "flink-kube-operator.serviceAccountName" . }}
-  namespace: {{ .Release.Namespace }} # Namespace where the role is created
   labels:
     {{- include "flink-kube-operator.labels" . | nindent 4 }}
 rules:
diff --git a/helm/chart/values.yaml b/helm/chart/values.yaml
index c1fd7f6..b68f403 100644
--- a/helm/chart/values.yaml
+++ b/helm/chart/values.yaml
@@ -54,7 +54,7 @@ service:
   # This sets the service type more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
   type: ClusterIP
   # This sets the ports more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#field-spec-ports
-  port: 80
+  port: 3000
 
 # This block is for setting up the ingress for more information can be found here: https://kubernetes.io/docs/concepts/services-networking/ingress/
 ingress:
diff --git a/internal/crd/watch.go b/internal/crd/watch.go
index aa12a74..fd1de5c 100644
--- a/internal/crd/watch.go
+++ b/internal/crd/watch.go
@@ -3,6 +3,7 @@ package crd
 import (
 	"context"
 	"flink-kube-operator/internal/crd/v1alpha1"
+	"os"
 
 	"flink-kube-operator/pkg"
 
@@ -20,7 +21,7 @@ func (crd Crd) watchFlinkJobs() rxgo.Observable {
 	go func() {
 		pkg.Logger.Debug("[crd] starting watch")
 
-		watcher, err := crd.client.Watch(context.Background(), metaV1.ListOptions{})
+		watcher, err := crd.client.Namespace(os.Getenv("NAMESPACE")).Watch(context.Background(), metaV1.ListOptions{})
 		if err != nil {
 			panic(err)
 		}