Skip to content

Commit e1823f8

Browse files
authored
feat: Add sync sharding (#1891)
#### Summary Goes with cloudquery/plugin-pb-go#401. ~~Still testing this so in draft~~ Part of cloudquery/cloudquery-issues#2214 (internal issue) ---
1 parent b05d24b commit e1823f8

File tree

12 files changed

+198
-18
lines changed

12 files changed

+198
-18
lines changed

examples/simple_plugin/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ require (
5454
github.com/oapi-codegen/runtime v1.1.1 // indirect
5555
github.com/pierrec/lz4/v4 v4.1.21 // indirect
5656
github.com/pmezard/go-difflib v1.0.0 // indirect
57+
github.com/samber/lo v1.47.0 // indirect
5758
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect
5859
github.com/spf13/cobra v1.8.1 // indirect
5960
github.com/spf13/pflag v1.0.5 // indirect

examples/simple_plugin/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
126126
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
127127
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
128128
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
129+
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
130+
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
129131
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw=
130132
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU=
131133
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
github.com/hashicorp/go-retryablehttp v0.7.7
2323
github.com/invopop/jsonschema v0.12.0
2424
github.com/rs/zerolog v1.33.0
25+
github.com/samber/lo v1.47.0
2526
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1
2627
github.com/spf13/cobra v1.8.1
2728
github.com/stretchr/testify v1.9.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
126126
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
127127
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
128128
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
129+
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
130+
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
129131
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw=
130132
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU=
131133
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=

internal/servers/plugin/v3/plugin.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,12 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
178178
Connection: req.Backend.Connection,
179179
}
180180
}
181+
if req.Shard != nil {
182+
syncOptions.Shard = &plugin.Shard{
183+
Num: req.Shard.Num,
184+
Total: req.Shard.Total,
185+
}
186+
}
181187

182188
go func() {
183189
defer flushMetrics()

plugin/plugin_source.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,18 @@ type BackendOptions struct {
1515
Connection string
1616
}
1717

18+
type Shard struct {
19+
Num int32
20+
Total int32
21+
}
22+
1823
type SyncOptions struct {
1924
Tables []string
2025
SkipTables []string
2126
SkipDependentTables bool
2227
DeterministicCQID bool
2328
BackendOptions *BackendOptions
29+
Shard *Shard
2430
}
2531

2632
type SourceClient interface {

scheduler/scheduler.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cloudquery/plugin-sdk/v4/message"
1414
"github.com/cloudquery/plugin-sdk/v4/schema"
1515
"github.com/rs/zerolog"
16+
"github.com/samber/lo"
1617
"github.com/thoas/go-funk"
1718
"go.opentelemetry.io/otel"
1819
"go.opentelemetry.io/otel/attribute"
@@ -90,6 +91,12 @@ func WithInvocationID(invocationID string) Option {
9091
}
9192
}
9293

94+
func WithShard(num int32, total int32) SyncOption {
95+
return func(s *syncClient) {
96+
s.shard = &shard{num: num, total: total}
97+
}
98+
}
99+
93100
type Client interface {
94101
ID() string
95102
}
@@ -119,6 +126,11 @@ type Scheduler struct {
119126
invocationID string
120127
}
121128

129+
type shard struct {
130+
num int32
131+
total int32
132+
}
133+
122134
type syncClient struct {
123135
tables schema.Tables
124136
client schema.ClientMeta
@@ -128,6 +140,8 @@ type syncClient struct {
128140
metrics *Metrics
129141
logger zerolog.Logger
130142
invocationID string
143+
144+
shard *shard
131145
}
132146

133147
func NewScheduler(opts ...Option) *Scheduler {
@@ -346,3 +360,24 @@ func maxDepth(tables schema.Tables) uint64 {
346360
}
347361
return depth
348362
}
363+
364+
func shardTableClients(tableClients []tableClient, shard *shard) []tableClient {
365+
// For sharding to work as expected, tableClients must be deterministic between different shards.
366+
if shard == nil || len(tableClients) == 0 {
367+
return tableClients
368+
}
369+
num := int(shard.num)
370+
total := int(shard.total)
371+
chunkSize := len(tableClients) / total
372+
if chunkSize == 0 {
373+
chunkSize = 1
374+
}
375+
chunks := lo.Chunk(tableClients, chunkSize)
376+
if num > len(chunks) {
377+
return nil
378+
}
379+
if len(chunks) > total && num == total {
380+
return append(chunks[num-1], chunks[num]...)
381+
}
382+
return chunks[num-1]
383+
}

scheduler/scheduler_debug.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ func (s *syncClient) syncTest(ctx context.Context, syncMultiplier int, resolvedR
6767
}
6868
}
6969
shuffle(allClients, seed)
70+
allClients = shardTableClients(allClients, s.shard)
7071

7172
var wg sync.WaitGroup
7273
for _, tc := range allClients {

scheduler/scheduler_dfs.go

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,27 +40,34 @@ func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *sche
4040
s.metrics.initWithClients(table, clients)
4141
}
4242

43-
var wg sync.WaitGroup
43+
tableClients := make([]tableClient, 0)
4444
for i, table := range s.tables {
45-
table := table
46-
clients := preInitialisedClients[i]
47-
for _, client := range clients {
48-
client := client
49-
if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil {
50-
// This means context was cancelled
51-
wg.Wait()
52-
return
53-
}
54-
wg.Add(1)
55-
go func() {
56-
defer wg.Done()
57-
defer s.scheduler.tableSems[0].Release(1)
58-
// not checking for error here as nothing much todo.
59-
// the error is logged and this happens when context is cancelled
60-
s.resolveTableDfs(ctx, table, client, nil, resolvedResources, 1)
61-
}()
45+
for _, client := range preInitialisedClients[i] {
46+
tableClients = append(tableClients, tableClient{table: table, client: client})
6247
}
6348
}
49+
tableClients = shardTableClients(tableClients, s.shard)
50+
51+
var wg sync.WaitGroup
52+
for _, tc := range tableClients {
53+
table := tc.table
54+
cl := tc.client
55+
if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil {
56+
// This means context was cancelled
57+
wg.Wait()
58+
return
59+
}
60+
wg.Add(1)
61+
go func() {
62+
defer wg.Done()
63+
defer s.scheduler.tableSems[0].Release(1)
64+
// not checking for error here as nothing much to do.
65+
// the error is logged and this happens when context is cancelled
66+
// Round Robin currently uses the DFS algorithm to resolve the tables, but this
67+
// may change in the future.
68+
s.resolveTableDfs(ctx, table, cl, nil, resolvedResources, 1)
69+
}()
70+
}
6471

6572
// Wait for all the worker goroutines to finish
6673
wg.Wait()

scheduler/scheduler_round_robin.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func (s *syncClient) syncRoundRobin(ctx context.Context, resolvedResources chan<
3737
}
3838

3939
tableClients := roundRobinInterleave(s.tables, preInitialisedClients)
40+
tableClients = shardTableClients(tableClients, s.shard)
4041

4142
var wg sync.WaitGroup
4243
for _, tc := range tableClients {

0 commit comments

Comments
 (0)