Skip to content

Commit 679ebca

Browse files
authored
Merge pull request #1 from DumbMachine/feature/impl-get-updates
Minor docs update and added GetUpdates method
2 parents a604cbf + 1ed9932 commit 679ebca

File tree

6 files changed

+169
-87
lines changed

6 files changed

+169
-87
lines changed

README.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,63 @@ For successful cancellations it is important that `ProcessingFunc`, the function
3333

3434
# Task Options Walkthrough
3535

36+
## Watch for updates
37+
38+
( Introduced in v0.3 )
39+
40+
Listen for updates to task metadata
41+
42+
```go
43+
func main() {
44+
client := nq.NewPublishClient(nq.NatsClientOpt{
45+
Addr: "nats://127.0.0.1:4222",
46+
}, nq.NoAuthentcation(),
47+
)
48+
49+
defer client.Close()
50+
51+
bytesPayload1, err := json.Marshal(UrlPayload{Url: "https://httpstat.us/200?sleep=10000"})
52+
if err != nil {
53+
panic(err)
54+
}
55+
56+
var wg sync.WaitGroup
57+
task1 := nq.NewTask(QueueDev, bytesPayload1)
58+
if ack, err := client.Enqueue(task1); err == nil {
59+
log.Printf("Watching updates queue=%s taskID=%s payload=%s", ack.Queue, ack.ID, ack.Payload)
60+
wg.Add(1)
61+
updates, err := client.GetUpdates(ack.ID)
62+
if err != nil {
63+
panic(err)
64+
}
65+
// listening for updates
66+
go func() {
67+
defer wg.Done()
68+
69+
for {
70+
msg, ok := <-updates
71+
if !ok {
72+
// channel closed
73+
return
74+
}
75+
log.Printf("Change detected, status=%s", msg.GetStatus())
76+
}
77+
}()
78+
} else {
79+
log.Printf("err=%s", err)
80+
}
81+
wg.Wait()
82+
}
83+
84+
```
85+
86+
```
87+
2022/08/29 22:17:15 Watching updates queue=scrap-url-dev taskID=yzaKwBIcbGEt8sMGgMJcZ0 payload={"url":"https://httpstat.us/200?sleep=10000"}
88+
2022/08/29 22:17:15 Change detected, status=pending
89+
2022/08/29 22:17:16 Change detected, status=processing
90+
2022/08/29 22:17:28 Change detected, status=completed
91+
```
92+
3693
## Retrying
3794

3895
By default `task` is submitted for retry, if it returns non-nil error.

nq.go

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
// Copyright 2022 Ratin Kumar. All rights reserved.
2-
// Use of this source code is governed by a MIT license
3-
// that can be found in the LICENSE file.
4-
51
// nq provides a go package to publish/process tasks via nats
62
package nq
73

@@ -13,20 +9,16 @@ import (
139
"github.com/nats-io/nuid"
1410
)
1511

12+
// Task is a representation work to be performed by a worker
1613
type Task struct {
1714
// Stream subject
1815
queue string
1916
// Payload for task
2017
payload []byte
21-
opts []TaskOption
22-
// Result writer for retention
18+
// Options to configure task processing behavior
19+
opts []TaskOption
2320
}
2421

25-
// !TODO: Remove this later
26-
// func (p *PublishClient) Subscribe(pattern string, handler ProcessingFunc, contextMap ContextStore) error {
27-
// // return p.broker.Subscribe(pattern, handler, contextMap)
28-
// }
29-
3022
// Value zero indicates no timeout and no deadline.
3123
var (
3224
noTimeout time.Duration = 0
@@ -51,22 +43,27 @@ type TaskOption interface {
5143
Value() interface{}
5244
}
5345

54-
// Internal option representations.
5546
type (
5647
retryOption int
5748
taskIDOption string
5849
timeoutOption time.Duration
5950
deadlineOption time.Time
6051
)
6152

53+
// Returns an options to specify maximum number of times a task will be retried before being marked as failed.
54+
//
55+
// -ve retry count is assigned defaultRetry ( 0 )
6256
func Retry(n int) TaskOption {
57+
if n < 0 {
58+
return retryOption(defaultMaxRetry)
59+
}
6360
return retryOption(n)
6461
}
6562
func (n retryOption) String() string { return fmt.Sprintf("MaxRetry(%d)", int(n)) }
6663
func (n retryOption) Type() TaskOptionType { return MaxRetryOpt }
6764
func (n retryOption) Value() interface{} { return int(n) }
6865

69-
// TaskID returns an option to specify the task ID.
66+
// TaskID returns an option to specify the task ID
7067
func TaskID(id string) TaskOption {
7168
return taskIDOption(id)
7269
}
@@ -75,11 +72,11 @@ func (id taskIDOption) String() string { return fmt.Sprintf("TaskID(%q)",
7572
func (id taskIDOption) Type() TaskOptionType { return TaskIDOpt }
7673
func (id taskIDOption) Value() interface{} { return string(id) }
7774

78-
// Timeout returns an option to specify how long a task may run.
75+
// Timeout returns an option to specify how long a task can run before being cancelled.
7976
//
80-
// Zero duration means no limit ( math.MaxInt64 is chosen )
77+
// Zero duration means no limit ( math.MaxInt32 )
8178
//
82-
// If there's a conflicting Deadline option, whichever comes earliest
79+
// If both Deadline and Timeout options are set, whichever comes earliest
8380
// will be used.
8481
func Timeout(d time.Duration) TaskOption {
8582
return timeoutOption(d)
@@ -90,10 +87,8 @@ func (d timeoutOption) Type() TaskOptionType { return TimeoutOpt }
9087
func (d timeoutOption) Value() interface{} { return time.Duration(d) }
9188

9289
// Deadline returns an option to specify the deadline for the given task.
93-
// If it reaches the deadline before the Handler returns, then the task
94-
// will be retried.
9590
//
96-
// If there's a conflicting Timeout option, whichever comes earliest
91+
// If both Deadline and Timeout options are set, whichever comes earliest
9792
// will be used.
9893
func Deadline(t time.Time) TaskOption {
9994
return deadlineOption(t)
@@ -116,7 +111,7 @@ type option struct {
116111
func withDefaultOptions(opts ...TaskOption) (option, error) {
117112
res := option{
118113
timeout: 0,
119-
retry: 0,
114+
retry: defaultMaxRetry,
120115
deadline: time.Time{},
121116
// TODO: store generator per server
122117
taskID: nuid.New().Next(),
@@ -144,7 +139,9 @@ func withDefaultOptions(opts ...TaskOption) (option, error) {
144139
return res, nil
145140
}
146141

147-
// payload is jsonified data of whatever the ProcessingFunc expects
142+
// NewTask returns a new Task given queue and byte payload
143+
//
144+
// TaskOption can be used to configure task processing
148145
func NewTask(queue string, payload []byte, opts ...TaskOption) *Task {
149146
return &Task{
150147
queue: queue,

options.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const (
1313
defaultKVName = "nq-store"
1414
defaultReconnectWait = time.Second * 10
1515
defaultAuthenticationType = NoAuthenticationOpt
16+
defaultMaxRetry = 0
1617
)
1718

1819
var defaultNatsOptions = make([]nats.Option, 0)

publisher.go

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,44 @@ import (
44
"context"
55
"encoding/json"
66
"log"
7-
8-
"github.com/nats-io/nats.go"
97
)
108

119
// Signature for function executed by a worker.
1210
// `ProcessingFunc` type are be registered to subjects, process messages published by client
1311
type ProcessingFunc func(context.Context, *TaskPayload) error
1412

13+
type ListenUpdates struct {
14+
m map[string]chan string
15+
}
16+
17+
type Inspector struct {
18+
broker *NatsBroker
19+
listeners *ListenUpdates
20+
}
21+
22+
func NewInspector(broker *NatsBroker) *Inspector {
23+
return &Inspector{
24+
broker: broker,
25+
listeners: &ListenUpdates{make(map[string]chan string)},
26+
}
27+
}
28+
29+
func (i *Inspector) AddAnother(queue string, sendUpdatesTo chan string) error {
30+
return nil
31+
}
32+
func (i *Inspector) Servers() {}
33+
func (i *Inspector) Queues() {}
34+
35+
// Client responsible for interaction with nq tasks
36+
//
37+
// Client is used to enqueue / cancel tasks or fetch metadata for tasks
1538
type PublishClient struct {
1639
broker *NatsBroker
1740
kv ResultHandlerIFACE
1841
}
1942

43+
// NewPublishClient returns a new Client instance, given nats connection options, to interact with nq tasks
44+
//
2045
func NewPublishClient(config NatsClientOpt, opts ...ClientConnectionOption) *PublishClient {
2146
opt, err := withDefaultClientOptions(opts...)
2247
if err != nil {
@@ -40,12 +65,6 @@ func NewPublishClient(config NatsClientOpt, opts ...ClientConnectionOption) *Pub
4065
return &PublishClient{broker: broker, kv: kv}
4166
}
4267

43-
type PackagePubAck struct {
44-
// ID assigned to published message
45-
ID string
46-
*nats.PubAck
47-
}
48-
4968
func (p *PublishClient) Stats(queue string) error {
5069
return p.broker.Stats(NewQueue(queue))
5170
}
@@ -68,8 +87,7 @@ func (p *PublishClient) publishMessage(msg *TaskMessage) (*TaskMessage, error) {
6887
}
6988
}
7089

71-
// Publish a TaskMessage into a stream
72-
func (p *PublishClient) PublishToSubject(task *Task, opts ...TaskOption) (*TaskMessage, error) {
90+
func (p *PublishClient) publishToSubject(task *Task, opts ...TaskOption) (*TaskMessage, error) {
7391
opts = append(task.opts, opts...)
7492
opt, err := withDefaultOptions(opts...)
7593
if err != nil {
@@ -98,14 +116,29 @@ func (p *PublishClient) PublishToSubject(task *Task, opts ...TaskOption) (*TaskM
98116
return p.publishMessage(taskMessage)
99117
}
100118

119+
// GetUpdates can be used get changes to a task's metadata
120+
//
121+
// Returns error if failed to start watching for changes
122+
// Channel is closed, once task reaches terminal state
123+
func (p *PublishClient) GetUpdates(taskID string) (chan *TaskMessage, error) {
124+
if status, err := p.kv.Watch(taskID); err != nil {
125+
return nil, err
126+
} else {
127+
return status, err
128+
}
129+
}
130+
131+
// Enqueue can be used to enqueu given task to a queue
132+
//
133+
// Returns TaskMessage and nil error is enqueued successfully, else non-nill error
101134
func (p *PublishClient) Enqueue(task *Task, opts ...TaskOption) (*TaskMessage, error) {
102135
q := NewQueue(task.queue)
103136
p.broker.ConnectoQueue(q)
104137

105-
return p.PublishToSubject(task, opts...)
138+
return p.publishToSubject(task, opts...)
106139
}
107140

108-
// Fetch qname from kv store instead
141+
// Cancel sends `cancel` request for given task to workers
109142
func (p *PublishClient) Cancel(id string) error {
110143
if taskInfo, err := p.kv.Get(id); err != nil {
111144
return ErrTaskNotFound
@@ -140,22 +173,31 @@ func (p *PublishClient) CancelInQueue(id string, qname string) error {
140173
return p.cancelInStream(id, q)
141174
}
142175

143-
func (p *PublishClient) createStream(streamName, subject string, policy nats.RetentionPolicy) error {
144-
if err := p.broker.AddStream(nats.StreamConfig{
145-
Name: streamName,
146-
Subjects: []string{subject},
147-
Retention: policy,
148-
}); err != nil {
149-
return err
150-
}
151-
return nil
152-
}
153-
176+
// Delete a queue
177+
//
178+
// Deletes underlying nats stream assosociated with a queue
154179
func (p *PublishClient) DeleteQueue(qname string) {
155180
q := NewQueue(qname)
181+
// delete task stream
156182
if err := p.broker.DeleteStream(q.stream); err != nil {
157183
log.Printf("error deleting stream=%s", q.stream)
158184
}
185+
// delete cancellation stream
186+
if err := p.broker.DeleteStream(q.cancelStream); err != nil {
187+
log.Printf("error deleting stream=%s", q.stream)
188+
}
189+
}
190+
191+
// Fetch fetches TaskMessage for given task
192+
//
193+
func (p *PublishClient) Fetch(id string) (*TaskMessage, error) {
194+
return p.kv.Get(id)
195+
}
196+
197+
// Close closes the connection with nats
198+
func (p *PublishClient) Close() error {
199+
defer p.broker.Close()
200+
return nil
159201
}
160202

161203
func (p *PublishClient) cancelInStream(id string, q *Queue) error {
@@ -174,13 +216,3 @@ func (p *PublishClient) cancelInStream(id string, q *Queue) error {
174216
return nil
175217
}
176218
}
177-
178-
func (p *PublishClient) Fetch(id string) (*TaskMessage, error) {
179-
return p.kv.Get(id)
180-
}
181-
182-
// Also delete stream for cleanup
183-
func (p *PublishClient) Close() error {
184-
defer p.broker.Close()
185-
return nil
186-
}

utils.go

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -49,24 +49,8 @@ func CancelStreamNameToStreamName(stream, subject string) string {
4949

5050
//
5151
//
52-
// Internally `Queue`s represent an abstraction over a nats stream -> subject
52+
// Internal `Queue`s represent an abstraction over a nats stream -> subject
5353
type Queue struct {
54-
// // Name of this queue. User facing
55-
// //
56-
// // In nats terminalogy this corresponds to stream/subject
57-
// name string
58-
59-
// streamName string
60-
61-
// // // Durable name for a stream subscription
62-
// // // Used to maintain sequence number after reconnection to nats-server
63-
// // durableName string
64-
65-
// cancelStreamName string
66-
67-
// // Name of cancel subject
68-
// cancelName string
69-
7054
stream string
7155
subject string
7256
cancelStream string
@@ -79,13 +63,6 @@ func NewQueue(name string) *Queue {
7963
subject: fmt.Sprintf("%s.task", name),
8064
cancelStream: fmt.Sprintf("%s/cancel", name),
8165
cancelSubject: fmt.Sprintf("%s.cancel", name),
82-
// // Stream and subject for recieving tasks
83-
// streamName: fmt.Sprintf("stream/%s", name),
84-
// name: name,
85-
86-
// // Stream and subject for recieving cancellations
87-
// cancelName: fmt.Sprintf("cancel/%s", name),
88-
// cancelStreamName: fmt.Sprintf("stream/cancel/%s", name),
8966
}
9067
}
9168

0 commit comments

Comments
 (0)