Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions badgerdriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,28 @@ func (d *Driver) Shutdown(_ context.Context) error {

func (d *Driver) GetData(cmd *flowstate.GetDataCommand) error {
return d.db.View(func(txn *badger.Txn) error {
return getData(txn, cmd.Data)
data := cmd.StateCtx.MustData(cmd.Alias)
if err := getData(txn, data); err != nil {
return err
}
return nil
})
}

func (d *Driver) StoreData(cmd *flowstate.AttachDataCommand) error {
nextRev, err := d.dataRevSeq.Next()
if err != nil {
return fmt.Errorf("get next sequence: %w", err)
}

data := cmd.Data
data.Rev = int64(nextRev)
}

func (d *Driver) StoreData(cmd *flowstate.StoreDataCommand) error {
return d.db.Update(func(txn *badger.Txn) error {
return setData(txn, data)
data := cmd.StateCtx.MustData(cmd.Alias)
nextRev, err := d.dataRevSeq.Next()
if err != nil {
return fmt.Errorf("get next sequence: %w", err)
}

data.Rev = int64(nextRev)
if err := setData(txn, data); err != nil {
return err
}
return nil
})
}

Expand Down
59 changes: 38 additions & 21 deletions badgerdriver/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package badgerdriver

import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -130,46 +131,62 @@ func committedAtIndexPrefix() []byte {
return []byte("flowstate.index.committed_at.")
}

func dataBKey(data *flowstate.Data) []byte {
return []byte(fmt.Sprintf(`flowstate.data.blob.%020d.%s`, data.Rev, data.ID))
func dataBlobKey(data *flowstate.Data) []byte {
return []byte(fmt.Sprintf(`flowstate.data.blob.%020d`, data.Rev))
}

func dataBinaryKey(data *flowstate.Data) []byte {
return []byte(fmt.Sprintf(`flowstate.data.binary.%020d.%s`, data.Rev, data.ID))
func dataAnnotationsKey(data *flowstate.Data) []byte {
return []byte(fmt.Sprintf(`flowstate.data.annotations.%020d`, data.Rev))
}

func setData(txn *badger.Txn, data *flowstate.Data) error {
if err := txn.Set(dataBKey(data), data.B); err != nil {
return fmt.Errorf("set data.B: %w", err)
if err := txn.Set(dataBlobKey(data), data.Blob); err != nil {
return fmt.Errorf("set data.Blob: %w", err)
}

var dataBinary []byte
if data.Binary {
dataBinary = append(dataBinary, 1)
}

if err := txn.Set(dataBinaryKey(data), dataBinary); err != nil {
return fmt.Errorf("set data.Binary: %w", err)
if len(data.Annotations) > 0 {
annotationsJSON, err := json.Marshal(data.Annotations)
if err != nil {
return fmt.Errorf("marshal data.Annotations: %w", err)
}
if err := txn.Set(dataAnnotationsKey(data), annotationsJSON); err != nil {
return fmt.Errorf("set data.Annotations: %w", err)
}
}

return nil
}

func getData(txn *badger.Txn, data *flowstate.Data) error {
item, err := txn.Get(dataBKey(data))
blobItem, err := txn.Get(dataBlobKey(data))
if err != nil {
return fmt.Errorf("get data.Bd: %w", err)
return fmt.Errorf("get data.Blob: %w", err)
}
data.B, err = item.ValueCopy(data.B)
data.Blob, err = blobItem.ValueCopy(data.Blob)
if err != nil {
return fmt.Errorf("copy data.B: %w", err)
return fmt.Errorf("copy data.Blob: %w", err)
}

dataBinary, err := txn.Get(dataBinaryKey(data))
if err != nil {
return fmt.Errorf("get data.Binary: %w", err)
if annotationsItem, err := txn.Get(dataAnnotationsKey(data)); errors.Is(err, badger.ErrKeyNotFound) {
// ok
} else if err != nil {
return fmt.Errorf("get data.Annotations: %w", err)
} else {
if err := annotationsItem.Value(func(val []byte) error {
if len(val) == 0 {
return nil
}
annotations := make(map[string]string)
if err := json.Unmarshal(val, &annotations); err != nil {
return fmt.Errorf("unmarshal annotations: %w", err)
}

data.Annotations = annotations
return nil
}); err != nil {
return fmt.Errorf("copy data.Annotations: %w", err)
}
}
data.Binary = dataBinary.ValueSize() > 0

return nil
}
Expand Down
108 changes: 38 additions & 70 deletions cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ package flowstate
import (
"encoding/base64"
"fmt"
"strconv"
"strings"
"time"

"github.com/oklog/ulid/v2"
)

var _ Command = &TransitCommand{}
Expand All @@ -30,10 +26,6 @@ var _ Command = &GetStatesCommand{}

var _ Command = &GetDelayedStatesCommand{}

var _ Command = &AttachDataCommand{}

var _ Command = &GetDataCommand{}

var _ Command = &CommitCommand{}

var _ Command = &ExecuteCommand{}
Expand Down Expand Up @@ -394,60 +386,47 @@ func (cmd *UnstackCommand) Do() error {
return nil
}

func AttachData(stateCtx *StateCtx, data *Data, alias string) *AttachDataCommand {
return &AttachDataCommand{
func StoreData(stateCtx *StateCtx, alias string) *StoreDataCommand {
return &StoreDataCommand{
StateCtx: stateCtx,
Data: data,
Alias: alias,

Store: true,
}
}

type AttachDataCommand struct {
type StoreDataCommand struct {
command
StateCtx *StateCtx
Data *Data
Alias string
Store bool
}

func (cmd *AttachDataCommand) WithoutStore() *AttachDataCommand {
cmd.Store = false
return cmd
}

func (cmd *AttachDataCommand) Prepare() error {
if cmd.Alias == "" {
return fmt.Errorf("alias is empty")
}
if cmd.Data.ID == "" {
cmd.Data.ID = DataID(ulid.Make().String())
}
if cmd.Data.Rev < 0 {
return fmt.Errorf("Data.Rev is negative")
func (cmd *StoreDataCommand) Prepare() (bool, error) {
d, err := cmd.StateCtx.Data(cmd.Alias)
if err != nil {
return false, err
}
if cmd.Data.B == nil || len(cmd.Data.B) == 0 {
return fmt.Errorf("Data.B is empty")

if d.Rev < 0 {
return false, fmt.Errorf("data rev is negative")
}
if cmd.Data.Rev == 0 && !cmd.Store {
return fmt.Errorf("Data.Rev is zero, but Store is false; this would lead to data loss")

if !d.isDirty() {
referenceData(cmd.StateCtx, cmd.Alias, d.Rev)
return false, nil
}

return nil
d.checksum()

return true, nil
}

func (cmd *AttachDataCommand) Do() {
cmd.StateCtx.Current.SetAnnotation(
dataAnnotation(cmd.Alias),
string(cmd.Data.ID)+":"+strconv.FormatInt(cmd.Data.Rev, 10),
)
func (cmd *StoreDataCommand) post() {
d := cmd.StateCtx.MustData(cmd.Alias)
referenceData(cmd.StateCtx, cmd.Alias, d.Rev)
}

func GetData(stateCtx *StateCtx, data *Data, alias string) *GetDataCommand {
func GetData(stateCtx *StateCtx, alias string) *GetDataCommand {
return &GetDataCommand{
StateCtx: stateCtx,
Data: data,
Alias: alias,
}

Expand All @@ -456,43 +435,32 @@ func GetData(stateCtx *StateCtx, data *Data, alias string) *GetDataCommand {
type GetDataCommand struct {
command
StateCtx *StateCtx
Data *Data
Alias string
}

func (cmd *GetDataCommand) Prepare() error {
if cmd.Data == nil {
return fmt.Errorf("data is nil")
}
if cmd.Alias == "" {
return fmt.Errorf("alias is empty")
}

annotKey := dataAnnotation(cmd.Alias)
idRevStr := cmd.StateCtx.Current.Annotations[annotKey]
if idRevStr == "" {
return fmt.Errorf("annotation %q is not set", annotKey)
}

sepIdx := strings.LastIndexAny(idRevStr, ":")
if sepIdx < 1 || sepIdx+1 == len(idRevStr) {
return fmt.Errorf("annotation %q contains invalid data reference; got %q", annotKey, idRevStr)
func (cmd *GetDataCommand) Prepare() (bool, error) {
rev, err := dereferenceData(cmd.StateCtx, cmd.Alias)
if err != nil {
return false, err
}

id := DataID(idRevStr[:sepIdx])
rev, err := strconv.ParseInt(idRevStr[sepIdx+1:], 10, 64)
d, err := cmd.StateCtx.Data(cmd.Alias)
if err != nil {
return fmt.Errorf("annotation %q contains invalid data revision; got %q: %w", annotKey, idRevStr[sepIdx+1:], err)
cmd.StateCtx.SetData(cmd.Alias, &Data{
Rev: rev,
})
return true, nil
} else if d.Rev == rev {
return false, nil
}

cmd.Data.ID = id
cmd.Data.Rev = rev

return nil
}
d.Rev = rev
d.Blob = d.Blob[:0]
for k := range d.Annotations {
delete(d.Annotations, k)
}

func dataAnnotation(alias string) string {
return "flowstate.data." + string(alias)
return true, nil
}

func nextTransitionOrCurrent(stateCtx *StateCtx, to FlowID) Transition {
Expand Down
Loading