feat: change db to buntdb

This commit is contained in:
2024-11-30 21:08:06 +03:30
parent 412a5292cb
commit d8b87ac6ee
6 changed files with 345 additions and 93 deletions

View File

@@ -3,19 +3,19 @@ package managed_job
import (
"flink-kube-operator/internal/config"
"github.com/dgraph-io/badger/v4"
api "github.com/logi-camp/go-flink-client"
"github.com/tidwall/buntdb"
)
type ManagedJob struct {
def config.JobDef
client *api.Client
jarId string
db *badger.DB
db *buntdb.DB
state *jobState
}
func NewManagedJob(client *api.Client, db *badger.DB, def config.JobDef) *ManagedJob {
func NewManagedJob(client *api.Client, db *buntdb.DB, def config.JobDef) *ManagedJob {
job := &ManagedJob{
def: def,
client: client,

View File

@@ -6,21 +6,18 @@ import (
"time"
"github.com/dgraph-io/badger/v4"
"github.com/tidwall/buntdb"
)
// get state of job from local db
func (job *ManagedJob) loadState() {
err := job.db.View(
func(tx *badger.Txn) error {
if val, err := tx.Get([]byte(job.def.Key)); err != nil {
func(tx *buntdb.Tx) error {
if val, err := tx.Get(job.def.Key); err != nil {
return err
} else if val != nil {
val.Value(func(val []byte) error {
job.state = &jobState{}
return json.Unmarshal(val, job.state)
})
} else {
return json.Unmarshal([]byte(val), job.state)
}
return nil
})
if errors.Is(err, badger.ErrKeyNotFound) {
err = nil
@@ -32,12 +29,12 @@ func (job *ManagedJob) updateState(state jobState) {
job.state = &state
value, _ := json.Marshal(job.state)
job.db.Update(func(txn *badger.Txn) error {
err := txn.Set([]byte(job.def.Key), value)
job.db.Update(func(tx *buntdb.Tx) error {
_, _, err := tx.Set(job.def.Key, string(value), nil)
if err != nil {
return err
}
return txn.Commit()
return tx.Commit()
})
}