Skip to content

Commit 6b84432

Browse files
committed
Make body field byte array instead of any type.
1 parent 40f334c commit 6b84432

File tree

14 files changed

+88
-117
lines changed

14 files changed

+88
-117
lines changed

README.md

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,7 @@ Start a new workflow run with provided options.
9595
```go
9696
runID, err := client.Trigger(workflow.TriggerOptions{
9797
Url: "https://your-workflow-endpoint.com/api/process"
98-
Body: map[string]interface{}{
99-
"user_id": 12345,
100-
"action": "purchase",
101-
"items": []string{"item1", "item2", "item3"},
102-
"total_cost": 99.99,
103-
},
98+
Body: []byte("request payload"),
10499
})
105100
if err != nil {
106101
// handle err
@@ -113,9 +108,7 @@ Send a notify message to workflows waiting for a specific event.
113108

114109
```go
115110

116-
messages, err := client.Notify("event-id", map[string]string{
117-
"userId": "testUser",
118-
})
111+
messages, err := client.Notify("event-id", []byte("notify data"))
119112
if err != nil {
120113
// handle err
121114
}

cancel.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (c *Client) cancel(req cancelRequest) (int, error) {
6161
resp, _, err := c.do(requestOptions{
6262
method: http.MethodDelete,
6363
path: []string{"v2", "workflows", "runs"},
64-
body: string(data),
64+
body: data,
6565
header: http.Header{
6666
"Content-Type": []string{"application/json"},
6767
},

client.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package workflow
22

33
import (
4+
"bytes"
45
"encoding/json"
56
"errors"
67
"fmt"
@@ -76,13 +77,13 @@ func NewClientWithEnv() (*Client, error) {
7677
type requestOptions struct {
7778
method string
7879
path []string
79-
body string
80+
body []byte
8081
header http.Header
8182
params url.Values
8283
}
8384

8485
func (c *Client) do(opts requestOptions) ([]byte, int, error) {
85-
request, err := http.NewRequest(opts.method, fmt.Sprintf("%s%s", c.url, strings.Join(opts.path, "/")), strings.NewReader(opts.body))
86+
request, err := http.NewRequest(opts.method, fmt.Sprintf("%s%s", c.url, strings.Join(opts.path, "/")), bytes.NewReader(opts.body))
8687
if err != nil {
8788
return nil, -1, err
8889
}

const.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const (
1010
forwardPrefix = "Upstash-Forward-"
1111

1212
authorizationHeader = "Authorization"
13+
contentTypeHeader = "Content-Type"
1314

1415
initHeader = "Upstash-Workflow-Init"
1516
runIdHeader = "Upstash-Workflow-Runid"

examples/notify/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"encoding/json"
45
"fmt"
56
workflow "workflow-go"
67
)
@@ -12,9 +13,14 @@ func main() {
1213
return
1314
}
1415

15-
notifiedWaiters, err := client.Notify("event-id", map[string]string{
16+
eventData, err := json.Marshal(map[string]string{
1617
"userId": "testUser",
1718
})
19+
if err != nil {
20+
fmt.Printf("failed to marshal request payload: %v\n", err)
21+
return
22+
}
23+
notifiedWaiters, err := client.Notify("event-id", eventData)
1824
if err != nil {
1925
fmt.Printf("failed to notify event: %v\n", err)
2026
return

examples/trigger/main.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"encoding/json"
45
"fmt"
56
workflow "workflow-go"
67
)
@@ -12,19 +13,23 @@ func main() {
1213
return
1314
}
1415

16+
requestPayload, err := json.Marshal(map[string]interface{}{
17+
"user_id": 12345,
18+
"action": "purchase",
19+
"items": []string{"item1", "item2", "item3"},
20+
"total_cost": 99.99,
21+
"timestamp": "2025-04-16T18:05:24+03:00",
22+
})
23+
if err != nil {
24+
return
25+
}
1526
runID, err := client.Trigger(workflow.TriggerOptions{
16-
Url: "https://your-workflow-endpoint.com/api/process",
17-
Body: map[string]interface{}{
18-
"user_id": 12345,
19-
"action": "purchase",
20-
"items": []string{"item1", "item2", "item3"},
21-
"total_cost": 99.99,
22-
"timestamp": "2025-04-16T18:05:24+03:00",
23-
},
27+
Url: "https://your-workflow-endpoint.com/api/process",
28+
Body: requestPayload,
2429
Retries: workflow.Retry(2),
2530
FlowControlKey: "my-flow-control-key",
26-
Rate: workflow.Rate(100),
27-
Parallelism: workflow.Parallelism(100),
31+
Rate: 100,
32+
Parallelism: 100,
2833
})
2934
if err != nil {
3035
fmt.Printf("failed to trigger workflow run: %v\n", err)

logs_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ func TestLogs(t *testing.T) {
1414
for i := 0; i < 3; i++ {
1515
runId, err := client.Trigger(workflow.TriggerOptions{
1616
Url: simpleWorkflowUrl,
17-
Body: map[string]string{
17+
Body: jsonMarshall(t, map[string]string{
1818
"name": "Run Test User",
1919
"id": string(rune(65 + i)),
20-
},
20+
}),
2121
})
2222
assert.NoError(t, err)
2323
assert.NotEmpty(t, runId)
@@ -26,9 +26,9 @@ func TestLogs(t *testing.T) {
2626

2727
longRunId, err := client.Trigger(workflow.TriggerOptions{
2828
Url: longRunningWorkflow,
29-
Body: map[string]string{
29+
Body: jsonMarshall(t, map[string]string{
3030
"name": "Long Running Test",
31-
},
31+
}),
3232
})
3333
assert.NoError(t, err)
3434
assert.NotEmpty(t, longRunId)

notify.go

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,17 @@
11
package workflow
22

33
import (
4-
"encoding/json"
5-
"fmt"
64
"net/http"
7-
"reflect"
85
)
96

107
// Notify notifies waiters waiting for the given event ID.
11-
// Event data is expected to be either a string or a JSON serializable object.
8+
// Event data is sent to the waiters as is.
129
// It returns the list of waiters that were notified.
13-
func (c *Client) Notify(eventId string, eventData any) ([]NotifyMessage, error) {
14-
body := ""
15-
if reflect.ValueOf(eventData).Kind() != reflect.String {
16-
data, err := json.Marshal(eventData)
17-
if err != nil {
18-
return nil, fmt.Errorf("failed to marshall event data: %w", err)
19-
}
20-
body = string(data)
21-
} else {
22-
body = eventData.(string)
23-
}
10+
func (c *Client) Notify(eventId string, eventData []byte) ([]NotifyMessage, error) {
2411
req := requestOptions{
2512
method: http.MethodPost,
2613
path: []string{"v2", "notify", eventId},
27-
body: body,
14+
body: eventData,
2815
}
2916
resp, _, err := c.do(req)
3017
if err != nil {

notify_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ func TestNotify_WithoutWaiter(t *testing.T) {
1212
client, err := workflow.NewClientWithEnv()
1313
assert.NoError(t, err)
1414

15-
messages, err := client.Notify(uuid.NewString(), uuid.NewString())
15+
messages, err := client.Notify(uuid.NewString(), []byte(uuid.NewString()))
1616
assert.NoError(t, err)
1717
assert.Len(t, messages, 0)
1818
}
@@ -26,15 +26,15 @@ func TestNotify(t *testing.T) {
2626
})
2727

2828
eventId := uuid.NewString()
29-
expectedEventData := map[string]string{
29+
expectedEventData := jsonMarshall(t, map[string]string{
3030
"uuid": uuid.NewString(),
31-
}
31+
})
3232
runId, err := client.Trigger(workflow.TriggerOptions{
3333
Url: waitForEvent,
34-
Body: map[string]any{
34+
Body: jsonMarshall(t, map[string]any{
3535
"eventId": eventId,
3636
"expectedEventData": expectedEventData,
37-
},
37+
}),
3838
})
3939
assert.NoError(t, err)
4040
assert.NotEmpty(t, runId)

trigger.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,17 @@ type TriggerOptions struct {
1010
// Url of the workflow
1111
Url string
1212
// Body is the request payload of the new workflow run.
13-
// It is expected to be either a string or JSON serializable object.
14-
Body any
13+
Body []byte
1514
// RunId is the id of the new workflow run. If not provided, a random id will be generated
1615
RunId string
1716
// Retries is the number of retries if a step fails.
18-
Retries *int
17+
Retries *int
18+
// FlowControlKey is the key used to control the flow of new steps.
1919
FlowControlKey string
20-
Rate *int
21-
Parallelism *int
20+
// Rate is the number of new starting steps per period.
21+
Rate int
22+
// Parallelism defines the maximum number of active steps associated with this FlowControlKey.
23+
Parallelism int
2224
// Header is the custom headers that will be forwarded to the workflow.
2325
Header http.Header
2426
}
@@ -42,20 +44,25 @@ func (o *TriggerOptions) header() http.Header {
4244
header.Add(featureSetHeader, featureInitialBody)
4345
if o.FlowControlKey != "" {
4446
value := ""
45-
if o.Rate != nil {
46-
value += "rate=" + strconv.Itoa(*o.Rate)
47+
givenRate := o.Rate != 0
48+
if givenRate {
49+
value += fmt.Sprintf("rate=%d", o.Rate)
4750
}
48-
if len(value) != 0 {
49-
value += ","
50-
}
51-
if o.Parallelism != nil {
52-
value += "parallelism=" + strconv.Itoa(*o.Parallelism)
51+
if o.Parallelism != 0 {
52+
if givenRate {
53+
value += ","
54+
}
55+
value += fmt.Sprintf("parallelism=%d", o.Parallelism)
5356
}
5457
if len(value) > 0 {
5558
header.Set(flowControlKeyHeader, o.FlowControlKey)
5659
header.Set(flowControlValueHeader, value)
5760
}
5861
}
62+
if contentType := o.Header.Get(contentTypeHeader); contentType != "" {
63+
header.Set(contentTypeHeader, contentType)
64+
o.Header.Del(contentTypeHeader)
65+
}
5966
header.Set(fmt.Sprintf("%s%s", forwardPrefix, sdkVersionHeader), sdkVersion)
6067
for k, v := range o.Header {
6168
for _, vv := range v {
@@ -77,15 +84,11 @@ func (c *Client) Trigger(opts TriggerOptions) (runId string, err error) {
7784
if err = opts.validate(); err != nil {
7885
return "", fmt.Errorf("failed to validate options: %w", err)
7986
}
80-
body, _, err := serializeToStr(opts.Body)
81-
if err != nil {
82-
return "", fmt.Errorf("failed to serialize body: %w", err)
83-
}
8487
header := opts.header()
8588
req := requestOptions{
8689
method: http.MethodPost,
8790
path: []string{"v2", "publish", opts.Url},
88-
body: body,
91+
body: opts.Body,
8992
header: header,
9093
}
9194
resp, _, err := c.do(req)

0 commit comments

Comments
 (0)