Skip to content

Commit b38b7e2

Browse files
authored
refactor: abstract contube-go impl (#131)
Signed-off-by: Zike Yang <zike@apache.org>
1 parent 98df98d commit b38b7e2

14 files changed

Lines changed: 280 additions & 151 deletions

File tree

benchmark/bench_test.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
2323
"github.com/functionstream/functionstream/common"
2424
"github.com/functionstream/functionstream/lib"
25+
"github.com/functionstream/functionstream/lib/contube"
2526
"github.com/functionstream/functionstream/perf"
2627
"github.com/functionstream/functionstream/restclient"
2728
"github.com/functionstream/functionstream/server"
@@ -35,12 +36,10 @@ import (
3536

3637
func BenchmarkStressForBasicFunc(b *testing.B) {
3738
s := server.New(server.LoadConfigFromEnv())
38-
go s.Run()
39+
svrCtx, svrCancel := context.WithCancel(context.Background())
40+
go s.Run(svrCtx)
3941
defer func() {
40-
err := s.Close()
41-
if err != nil {
42-
b.Fatal(err)
43-
}
42+
svrCancel()
4443
}()
4544

4645
inputTopic := "test-input-" + strconv.Itoa(rand.Int())
@@ -78,7 +77,7 @@ func BenchmarkStressForBasicFunc(b *testing.B) {
7877

7978
b.ReportAllocs()
8079

81-
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second))
80+
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
8281
defer cancel()
8382

8483
profile := b.Name() + ".pprof"
@@ -95,28 +94,27 @@ func BenchmarkStressForBasicFunc(b *testing.B) {
9594
b.Fatal(err)
9695
}
9796

97+
<-s.WaitForReady(context.Background())
9898
perf.New(pConfig).Run(ctx)
9999

100100
pprof.StopCPUProfile()
101101
}
102102

103103
func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
104-
memoryQueueFactory := lib.NewMemoryQueueFactory(context.Background())
104+
memoryQueueFactory := contube.NewMemoryQueueFactory(context.Background())
105105

106106
svrConf := &lib.Config{
107107
ListenAddr: common.DefaultAddr,
108-
QueueBuilder: func(ctx context.Context, config *lib.Config) (lib.EventQueueFactory, error) {
108+
QueueBuilder: func(ctx context.Context, config *lib.Config) (contube.TubeFactory, error) {
109109
return memoryQueueFactory, nil
110110
},
111111
}
112112

113113
s := server.New(svrConf)
114-
go s.Run()
114+
svrCtx, svrCancel := context.WithCancel(context.Background())
115+
go s.Run(svrCtx)
115116
defer func() {
116-
err := s.Close()
117-
if err != nil {
118-
b.Fatal(err)
119-
}
117+
svrCancel()
120118
}()
121119

122120
inputTopic := "test-input-" + strconv.Itoa(rand.Int())
@@ -132,14 +130,14 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
132130
Output: outputTopic,
133131
Replicas: &replicas,
134132
},
135-
QueueBuilder: func(ctx context.Context, c *lib.Config) (lib.EventQueueFactory, error) {
133+
QueueBuilder: func(ctx context.Context, c *lib.Config) (contube.TubeFactory, error) {
136134
return memoryQueueFactory, nil
137135
},
138136
}
139137

140138
b.ReportAllocs()
141139

142-
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second))
140+
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
143141
defer cancel()
144142

145143
profile := b.Name() + ".pprof"
@@ -156,6 +154,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
156154
b.Fatal(err)
157155
}
158156

157+
<-s.WaitForReady(context.Background())
159158
perf.New(pConfig).Run(ctx)
160159

161160
pprof.StopCPUProfile()

cmd/server/cmd.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package server
1818

1919
import (
20+
"context"
2021
"github.com/functionstream/functionstream/common"
2122
"github.com/functionstream/functionstream/server"
2223
"github.com/spf13/cobra"
@@ -35,7 +36,7 @@ var (
3536
func exec(*cobra.Command, []string) {
3637
common.RunProcess(func() (io.Closer, error) {
3738
s := server.New(server.LoadConfigFromEnv())
38-
go s.Run()
39+
go s.Run(context.Background())
3940
return s, nil
4041
})
4142
}

cmd/standalone/cmd.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package standalone
1818

1919
import (
20+
"context"
2021
"github.com/functionstream/functionstream/common"
2122
"github.com/functionstream/functionstream/server"
2223
"github.com/spf13/cobra"
@@ -35,7 +36,7 @@ var (
3536
func exec(*cobra.Command, []string) {
3637
common.RunProcess(func() (io.Closer, error) {
3738
s := server.New(server.LoadStandaloneConfigFromEnv())
38-
go s.Run()
39+
go s.Run(context.Background())
3940
return s, nil
4041
})
4142
}

lib/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ package lib
1818

1919
import (
2020
"context"
21+
"github.com/functionstream/functionstream/lib/contube"
2122
)
2223

23-
type QueueBuilder func(ctx context.Context, config *Config) (EventQueueFactory, error)
24+
type QueueBuilder func(ctx context.Context, config *Config) (contube.TubeFactory, error)
2425

2526
type Config struct {
2627
ListenAddr string

lib/contube/event_tube.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package contube
18+
19+
import (
20+
"context"
21+
)
22+
23+
type Record interface {
24+
GetPayload() []byte
25+
Commit()
26+
}
27+
28+
type SourceQueueConfig struct {
29+
Topics []string
30+
SubName string
31+
}
32+
33+
type SinkQueueConfig struct {
34+
Topic string
35+
}
36+
37+
const (
38+
TopicKey = "topic"
39+
TopicListKey = "topicList"
40+
SubNameKey = "subName"
41+
)
42+
43+
func NewSourceQueueConfig(config ConfigMap) *SourceQueueConfig {
44+
var result SourceQueueConfig
45+
if topics, ok := config[TopicListKey].([]string); ok {
46+
result.Topics = topics
47+
}
48+
if subName, ok := config[SubNameKey].(string); ok {
49+
result.SubName = subName
50+
}
51+
return &result
52+
}
53+
54+
func (c *SourceQueueConfig) ToConfigMap() ConfigMap {
55+
return ConfigMap{
56+
TopicListKey: c.Topics,
57+
SubNameKey: c.SubName,
58+
}
59+
}
60+
61+
func NewSinkQueueConfig(config ConfigMap) *SinkQueueConfig {
62+
var result SinkQueueConfig
63+
if topic, ok := config[TopicKey].(string); ok {
64+
result.Topic = topic
65+
}
66+
return &result
67+
}
68+
69+
func (c *SinkQueueConfig) ToConfigMap() ConfigMap {
70+
return ConfigMap{
71+
TopicKey: c.Topic,
72+
}
73+
}
74+
75+
type ConfigMap map[string]interface{}
76+
77+
type TubeFactory interface {
78+
// NewSourceTube returns a new channel that can be used to receive events
79+
// The channel would be closed when the context is done
80+
NewSourceTube(ctx context.Context, config ConfigMap) (<-chan Record, error)
81+
// NewSinkTube returns a new channel that can be used to sink events
82+
// The event.Commit() would be invoked after the event is sunk successfully
83+
// The caller should close the channel when it is done
84+
NewSinkTube(ctx context.Context, config ConfigMap) (chan<- Record, error)
85+
}
86+
87+
type RecordImpl struct {
88+
payload []byte
89+
commitFunc func()
90+
}
91+
92+
func NewRecordImpl(payload []byte, ackFunc func()) *RecordImpl {
93+
return &RecordImpl{
94+
payload: payload,
95+
commitFunc: ackFunc,
96+
}
97+
}
98+
99+
func (e *RecordImpl) GetPayload() []byte {
100+
return e.payload
101+
}
102+
103+
func (e *RecordImpl) Commit() {
104+
e.commitFunc()
105+
}
Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package lib
17+
package contube
1818

1919
import (
2020
"context"
@@ -24,7 +24,7 @@ import (
2424
)
2525

2626
type queue struct {
27-
c chan Event
27+
c chan Record
2828
refCnt int32
2929
}
3030

@@ -34,14 +34,14 @@ type MemoryQueueFactory struct {
3434
queues map[string]*queue
3535
}
3636

37-
func NewMemoryQueueFactory(ctx context.Context) EventQueueFactory {
37+
func NewMemoryQueueFactory(ctx context.Context) TubeFactory {
3838
return &MemoryQueueFactory{
3939
ctx: ctx,
4040
queues: make(map[string]*queue),
4141
}
4242
}
4343

44-
func (f *MemoryQueueFactory) getOrCreateChan(name string) chan Event {
44+
func (f *MemoryQueueFactory) getOrCreateChan(name string) chan Record {
4545
f.mu.Lock()
4646
defer f.mu.Unlock()
4747
defer func() {
@@ -53,7 +53,7 @@ func (f *MemoryQueueFactory) getOrCreateChan(name string) chan Event {
5353
atomic.AddInt32(&q.refCnt, 1)
5454
return q.c
5555
}
56-
c := make(chan Event, 100)
56+
c := make(chan Record, 100)
5757
f.queues[name] = &queue{
5858
c: c,
5959
refCnt: 1,
@@ -77,8 +77,9 @@ func (f *MemoryQueueFactory) release(name string) {
7777
"name", name)
7878
}
7979

80-
func (f *MemoryQueueFactory) NewSourceChan(ctx context.Context, config *SourceQueueConfig) (<-chan Event, error) {
81-
result := make(chan Event)
80+
func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error) {
81+
config := NewSourceQueueConfig(configMap)
82+
result := make(chan Record)
8283
for _, topic := range config.Topics {
8384
t := topic
8485
go func() {
@@ -101,9 +102,10 @@ func (f *MemoryQueueFactory) NewSourceChan(ctx context.Context, config *SourceQu
101102
return result, nil
102103
}
103104

104-
func (f *MemoryQueueFactory) NewSinkChan(ctx context.Context, config *SinkQueueConfig) (chan<- Event, error) {
105+
func (f *MemoryQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMap) (chan<- Record, error) {
106+
config := NewSinkQueueConfig(configMap)
105107
c := f.getOrCreateChan(config.Topic)
106-
wrapperC := make(chan Event)
108+
wrapperC := make(chan Record)
107109
go func() {
108110
defer f.release(config.Topic)
109111
for {
@@ -114,7 +116,7 @@ func (f *MemoryQueueFactory) NewSinkChan(ctx context.Context, config *SinkQueueC
114116
if !ok {
115117
return
116118
}
117-
event.Ack()
119+
event.Commit()
118120
c <- event
119121
}
120122
}

0 commit comments

Comments
 (0)