feat(crd): add kube api and crds

This commit is contained in:
2024-11-30 19:04:02 +03:30
parent e95634c942
commit 19b874cba6
14 changed files with 600 additions and 3 deletions

View File

@@ -0,0 +1,74 @@
package client
import (
"context"
"flink-kube-operator/internal/crd/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
)
type FlinkJobInterface interface {
List(opts metav1.ListOptions) (*v1alpha1.FlinkJobList, error)
Get(name string, options metav1.GetOptions) (*v1alpha1.FlinkJob, error)
Create(*v1alpha1.FlinkJob) (*v1alpha1.FlinkJob, error)
Watch(opts metav1.ListOptions) (watch.Interface, error)
// ...
}
type FlinkJobClient struct {
restClient rest.Interface
ns string
}
func (c *FlinkJobClient) List(opts metav1.ListOptions) (*v1alpha1.FlinkJobList, error) {
result := v1alpha1.FlinkJobList{}
err := c.restClient.
Get().
Namespace(c.ns).
Resource("FlinkJobs").
VersionedParams(&opts, scheme.ParameterCodec).
Do(context.Background()).
Into(&result)
return &result, err
}
func (c *FlinkJobClient) Get(name string, opts metav1.GetOptions) (*v1alpha1.FlinkJob, error) {
result := v1alpha1.FlinkJob{}
err := c.restClient.
Get().
Namespace(c.ns).
Resource("FlinkJobs").
Name(name).
VersionedParams(&opts, scheme.ParameterCodec).
Do(context.Background()).
Into(&result)
return &result, err
}
func (c *FlinkJobClient) Create(FlinkJob *v1alpha1.FlinkJob) (*v1alpha1.FlinkJob, error) {
result := v1alpha1.FlinkJob{}
err := c.restClient.
Post().
Namespace(c.ns).
Resource("FlinkJobs").
Body(FlinkJob).
Do(context.Background()).
Into(&result)
return &result, err
}
func (c *FlinkJobClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
opts.Watch = true
return c.restClient.
Get().
Namespace(c.ns).
Resource("FlinkJobs").
VersionedParams(&opts, scheme.ParameterCodec).
Watch(context.Background())
}

12
internal/crd/crd.type.go Normal file
View File

@@ -0,0 +1,12 @@
package crd
import (
"k8s.io/apimachinery/pkg/runtime/schema"
)
// Define the FlinkJob resource GVR (Group, Version, Resource)
var flinkJobGVR = schema.GroupVersionResource{
Group: "flink.logicamp.tech",
Version: "v1beta1",
Resource: "flink-jobs",
}

38
internal/crd/new.go Normal file
View File

@@ -0,0 +1,38 @@
package crd
import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
type Crd struct {
client dynamic.NamespaceableResourceInterface
}
func New() {
// Get Kubernetes config
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
config, err = rest.InClusterConfig()
if err != nil {
panic(err)
}
}
// Create dynamic client
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
panic(err)
}
// Get FlinkJob resource interface
flinkJobClient := dynamicClient.Resource(flinkJobGVR)
crd := Crd{
client: flinkJobClient,
}
// Watch for FlinkJob creation
go crd.watchFlinkJobs()
}

View File

@@ -0,0 +1,28 @@
package v1alpha1
import (
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
//go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen object paths=$GOFILE
type FlinkJobSpec struct {
Name string `json:"name"`
Parallelism int `json:"parallelism"`
Parallelism2 int `json:"parallelism2"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type FlinkJob struct {
metaV1.TypeMeta `json:",inline"`
metaV1.ObjectMeta `json:"metadata,omitempty"`
Spec FlinkJobSpec `json:"spec"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type FlinkJobList struct {
metaV1.TypeMeta `json:",inline"`
metaV1.ListMeta `json:"metadata,omitempty"`
Items []FlinkJob `json:"items"`
}

View File

@@ -0,0 +1,27 @@
package v1alpha1
import (
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
const GroupName = "flink.logicamp.tech"
const GroupVersion = "v1alpha1"
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: GroupVersion}
var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&FlinkJob{},
&FlinkJobList{},
)
metaV1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}

View File

@@ -0,0 +1,67 @@
//go:build !ignore_autogenerated
// Code generated by controller-gen. DO NOT EDIT.
package v1alpha1
import (
runtime "k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *FlinkJob) DeepCopyInto(out *FlinkJob) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
out.Spec = in.Spec
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlinkJob.
func (in *FlinkJob) DeepCopy() *FlinkJob {
if in == nil {
return nil
}
out := new(FlinkJob)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *FlinkJob) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *FlinkJobList) DeepCopyInto(out *FlinkJobList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]FlinkJob, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlinkJobList.
func (in *FlinkJobList) DeepCopy() *FlinkJobList {
if in == nil {
return nil
}
out := new(FlinkJobList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *FlinkJobList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}

51
internal/crd/watch.go Normal file
View File

@@ -0,0 +1,51 @@
package crd
import (
"context"
"fmt"
"gitea.com/logicamp/lc"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)
func (crd Crd) watchFlinkJobs() {
lc.Logger.Debug("[crd] starting watch")
watcher, err := crd.client.Watch(context.Background(), metav1.ListOptions{})
if err != nil {
panic(err)
}
defer watcher.Stop()
for event := range watcher.ResultChan() {
lc.Logger.Debug("[crd] event received")
switch event.Type {
case watch.Added:
job := event.Object.(*unstructured.Unstructured)
fmt.Printf("New FlinkJob created: %s\n", job.GetName())
// Handle the new FlinkJob
handleNewFlinkJob(job)
}
}
}
func handleNewFlinkJob(job *unstructured.Unstructured) {
// Extract job details
name := job.GetName()
namespace := job.GetNamespace()
// Get specific fields using unstructured.Unstructured methods
spec, found, err := unstructured.NestedMap(job.Object, "spec")
if err != nil || !found {
fmt.Printf("Error getting spec for job %s: %v\n", name, err)
return
}
// Process job specification
fmt.Printf("Processing FlinkJob %s in namespace %s\n", name, namespace)
lc.Logger.Debug("[crd] [watch]", zap.Any("spec", spec))
// Add your custom logic here
}