Skip to content

Commit fb84baa

Browse files
committed
Improved datastore speed and the deduplication mechanism so that database writes are not overloaded
1 parent 1c65887 commit fb84baa

File tree

3 files changed

+100
-10
lines changed

3 files changed

+100
-10
lines changed

db-connector.go

Lines changed: 73 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12301,7 +12301,6 @@ func SetDatastoreKeyBulk(ctx context.Context, allKeys []CacheKeyData) ([]Datasto
1230112301
nameKey := "org_cache"
1230212302
timeNow := int64(time.Now().Unix())
1230312303

12304-
newArray := []CacheKeyData{}
1230512304
dbKeys := []*datastore.Key{}
1230612305

1230712306
existingInfo := []DatastoreKeyMini{}
@@ -12348,6 +12347,8 @@ func SetDatastoreKeyBulk(ctx context.Context, allKeys []CacheKeyData) ([]Datasto
1234812347
go func(cacheData CacheKeyData, index int) {
1234912348
defer wg.Done()
1235012349

12350+
cacheData.Existed = false
12351+
cacheData.Changed = false
1235112352
cacheData.Created = timeNow
1235212353
cacheData.Edited = timeNow
1235312354

@@ -12368,8 +12369,9 @@ func SetDatastoreKeyBulk(ctx context.Context, allKeys []CacheKeyData) ([]Datasto
1236812369
cacheData.PublicAuthorization = config.PublicAuthorization
1236912370

1237012371
cacheData.Existed = true
12371-
}
12372+
}
1237212373

12374+
cacheData.Changed = true
1237312375
sameValue := false
1237412376
if getCacheError == nil && config.Value == cacheData.Value {
1237512377
sameValue = true
@@ -12429,7 +12431,8 @@ func SetDatastoreKeyBulk(ctx context.Context, allKeys []CacheKeyData) ([]Datasto
1242912431

1243012432
if sameValue {
1243112433
if debug {
12432-
log.Printf("[DEBUG] SAME VALUE FOR KEY %s in category %s. SHOULD skip datastore write.", cacheData.Key, cacheData.Category)
12434+
cacheData.Changed = false
12435+
//log.Printf("[DEBUG] SAME VALUE FOR KEY %s in category %s. SHOULD skip datastore write.", cacheData.Key, cacheData.Category)
1243312436
}
1243412437

1243512438
// FIXME: Should NOT be returning keys
@@ -12453,19 +12456,26 @@ func SetDatastoreKeyBulk(ctx context.Context, allKeys []CacheKeyData) ([]Datasto
1245312456
close(datastoreKeys)
1245412457

1245512458
// Ensures no duplicates
12459+
newArray := []CacheKeyData{}
1245612460
handledKeys := []string{}
12461+
12462+
skippedKeys := []string{}
1245712463
for key := range cacheKeys {
1245812464
if key.Key == "" {
1245912465
continue
1246012466
}
12461-
12467+
12468+
// Assumes duplicates
1246212469
checkKey := fmt.Sprintf("%s_%s", key.Key, key.Category)
1246312470
if ArrayContains(handledKeys, checkKey) {
12471+
if debug {
12472+
log.Printf("[DEBUG] Skipping duplicate key %s in category %s", key.Key, key.Category)
12473+
}
12474+
12475+
handledKeys = append(handledKeys, checkKey)
1246412476
continue
1246512477
}
1246612478

12467-
handledKeys = append(handledKeys, checkKey)
12468-
1246912479
// Details to help with filtering old vs new
1247012480
// Built for the "is_in_datastore" shuffle tools action
1247112481
minKey := DatastoreKeyMini{
@@ -12474,8 +12484,15 @@ func SetDatastoreKeyBulk(ctx context.Context, allKeys []CacheKeyData) ([]Datasto
1247412484
}
1247512485

1247612486
existingInfo = append(existingInfo, minKey)
12487+
if !key.Changed {
12488+
parsedKey := fmt.Sprintf("%s_%s_%s", key.OrgId, key.Key, key.Category)
12489+
skippedKeys = append(skippedKeys, parsedKey)
12490+
//log.Printf("[DEBUG] Key %s did NOT change, skipping database", parsedKey)
12491+
continue
12492+
}
1247712493

1247812494
key.Existed = false
12495+
key.Changed = false
1247912496
newArray = append(newArray, key)
1248012497

1248112498
}
@@ -12486,22 +12503,67 @@ func SetDatastoreKeyBulk(ctx context.Context, allKeys []CacheKeyData) ([]Datasto
1248612503
continue
1248712504
}
1248812505

12506+
// Duplicate handler
1248912507
if ArrayContains(handledKeys, key.Name) {
1249012508
continue
1249112509
}
1249212510

12493-
// Look for empty keys and continue if so:
12494-
//datastoreKeys <- *datastore.NameKey("", "", nil)
12495-
//cacheKeys <- CacheKeyData{}
12511+
if ArrayContains(skippedKeys, key.Name) {
12512+
continue
12513+
}
1249612514

12515+
// Look for empty keys and continue if so:
1249712516
handledKeys = append(handledKeys, key.Name)
1249812517
dbKeys = append(dbKeys, &key)
1249912518
}
1250012519

12520+
// Autofixer on the fly
12521+
if len(newArray) != len(dbKeys) {
12522+
dbKeys = []*datastore.Key{}
12523+
12524+
// FIXME: newArray backwards to ALWAYS have latest key? Is latest last
12525+
// or first in the array? :thinking:
12526+
handledKeys := []string{}
12527+
skippedIndexes := []int{}
12528+
for skipIndex, cacheData:= range newArray {
12529+
datastoreId := fmt.Sprintf("%s_%s", cacheData.OrgId, cacheData.Key)
12530+
if len(cacheData.Category) > 0 && cacheData.Category != "default" {
12531+
// Adds category on the end
12532+
datastoreId = fmt.Sprintf("%s_%s", datastoreId, cacheData.Category)
12533+
}
12534+
12535+
if ArrayContains(handledKeys, datastoreId) {
12536+
skippedIndexes = append(skippedIndexes, skipIndex)
12537+
continue
12538+
}
12539+
12540+
handledKeys = append(handledKeys, datastoreId)
12541+
12542+
dbKeys = append(dbKeys, datastore.NameKey(nameKey, strings.ToLower(datastoreId), nil) )
12543+
}
12544+
12545+
// Cleanup newArray again due to transactional handler
12546+
// Example where problems show up are nested items:
12547+
// Multiple emails in the same thread
12548+
if len(skippedIndexes) > 0 {
12549+
newDeduped := []CacheKeyData{}
12550+
for index, val := range newArray {
12551+
if ArrayContainsInt(skippedIndexes, index) {
12552+
continue
12553+
}
12554+
12555+
newDeduped = append(newDeduped, val)
12556+
}
12557+
12558+
newArray = newDeduped
12559+
}
12560+
}
12561+
1250112562
// New struct, to not add body, author etc
1250212563
if project.DbType == "opensearch" {
1250312564
var buf bytes.Buffer
1250412565

12566+
// Bulk encoding them
1250512567
for _, cacheData := range newArray {
1250612568
cacheId := fmt.Sprintf("%s_%s", cacheData.OrgId, cacheData.Key)
1250712569
if len(cacheData.Category) > 0 && cacheData.Category != "default" {
@@ -13864,6 +13926,8 @@ func GetAllCacheKeys(ctx context.Context, orgId string, category string, max int
1386413926
query := datastore.NewQuery(nameKey).Filter("OrgId =", orgId).Order("-Edited")
1386513927
if len(category) > 0 {
1386613928
query = query.Filter("category =", category)
13929+
} else {
13930+
query = query.Filter("category =", "")
1386713931
}
1386813932

1386913933
query = query.Limit(max)

shared.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3605,6 +3605,19 @@ func ArrayContainsLower(visited []string, id string) bool {
36053605
for _, item := range visited {
36063606
if strings.ToLower(item) == strings.ToLower(id) {
36073607
found = true
3608+
break
3609+
}
3610+
}
3611+
3612+
return found
3613+
}
3614+
3615+
func ArrayContainsInt(visited []int, id int) bool {
3616+
found := false
3617+
for _, item := range visited {
3618+
if item == id {
3619+
found = true
3620+
break
36083621
}
36093622
}
36103623

@@ -3616,6 +3629,7 @@ func ArrayContains(visited []string, id string) bool {
36163629
for _, item := range visited {
36173630
if item == id {
36183631
found = true
3632+
break
36193633
}
36203634
}
36213635

@@ -19435,9 +19449,20 @@ func HandleSetDatastoreKey(resp http.ResponseWriter, request *http.Request) {
1943519449
KeysExisted []DatastoreKeyMini `json:"keys_existed"`
1943619450
}
1943719451

19452+
/*
19453+
// For testing deduplication
1943819454
if debug {
19439-
log.Printf("[DEBUG] EXISTINGINFO: %#v", existingInfo)
19455+
found := []string{}
19456+
for _, existing := range existingInfo {
19457+
if ArrayContains(found, existing.Key) {
19458+
log.Printf("[DEBUG] Key %s already found in existing info", existing.Key)
19459+
continue
19460+
}
19461+
19462+
found = append(found, existing.Key)
19463+
}
1944019464
}
19465+
*/
1944119466

1944219467
returnData := returnStruct{
1944319468
Success: true,

structs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,6 +1144,7 @@ type CacheKeyData struct {
11441144
Edited int64 `json:"edited" datastore:"Edited"`
11451145

11461146
Existed bool `json:"existed,omitempty" datastore:"Existed"` // If the key existed before the update. Should always be set back to false.
1147+
Changed bool `json:"changed,omitempty" datastore:"Changed"` // If the value was changed. Should always be set back to false.
11471148
Encrypted bool `json:"encrypted" datastore:"Encrypted"`
11481149
FormattedKey string `json:"formatted_key,omitempty" datastore:"FormattedKey"`
11491150
PublicAuthorization string `json:"public_authorization,omitempty" datastore:"PublicAuthorization"` // Used for public authorization

0 commit comments

Comments (0)