Skip to content

Commit 704de47

Browse files
authored
Merge pull request #9 from berezovskyi-oleksandr/feature/configurable-batch-size
Make BatchSize configurable
2 parents d316da7 + afc00a3 commit 704de47

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ FluntBit custom output plugin which allows sending messages to AWS-SQS.
1111
| PluginTagAttribute | attribute name of the message tag | no |
1212
| QueueMessageGroupId | the group id required for fifo queues | fifo-only |
1313
| ProxyUrl | the proxy address between fluentbit and sqs (if exists) | no |
14+
| BatchSize | set amount of messages to be sent in a batch request | yes |
1415

1516
```conf
1617
[SERVICE]
@@ -32,6 +33,7 @@ FluntBit custom output plugin which allows sending messages to AWS-SQS.
3233
Match *
3334
QueueUrl http://aws-sqs-url.com
3435
QueueRegion eu-central-1
36+
BatchSize 10
3537
```
3638

3739
## Installation

out_sqs.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import (
44
"C"
55
"errors"
66
"fmt"
7+
"os"
8+
"strconv"
79
"time"
810
"unsafe"
9-
"os"
1011

1112
"github.com/aws/aws-sdk-go/aws"
1213
"github.com/aws/aws-sdk-go/aws/credentials"
@@ -24,7 +25,7 @@ import (
2425
// integer representation for this plugin log level
2526
// 0 - debug
2627
// 1 - info
27-
// 2 - error
28+
// 2 - error
2829
var sqsOutLogLevel int
2930

3031
// MessageCounter is used for count the current SQS Batch messages
@@ -39,6 +40,7 @@ type sqsConfig struct {
3940
mySQS *sqs.SQS
4041
pluginTagAttribute string
4142
proxyURL string
43+
batchSize int
4244
}
4345

4446
//export FLBPluginRegister
@@ -54,12 +56,14 @@ func FLBPluginInit(plugin unsafe.Pointer) int {
5456
queueMessageGroupID := output.FLBPluginConfigKey(plugin, "QueueMessageGroupId")
5557
pluginTagAttribute := output.FLBPluginConfigKey(plugin, "PluginTagAttribute")
5658
proxyURL := output.FLBPluginConfigKey(plugin, "ProxyUrl")
59+
batchSizeString := output.FLBPluginConfigKey(plugin, "BatchSize")
5760

5861
writeInfoLog(fmt.Sprintf("QueueUrl is: %s", queueURL))
5962
writeInfoLog(fmt.Sprintf("QueueRegion is: %s", queueRegion))
6063
writeInfoLog(fmt.Sprintf("QueueMessageGroupId is: %s", queueMessageGroupID))
6164
writeInfoLog(fmt.Sprintf("pluginTagAttribute is: %s", pluginTagAttribute))
6265
writeInfoLog(fmt.Sprintf("ProxyUrl is: %s", proxyURL))
66+
writeInfoLog(fmt.Sprintf("BatchSize is: %s", batchSizeString))
6367

6468
if queueURL == "" {
6569
writeErrorLog(errors.New("QueueUrl configuration key is mandatory"))
@@ -78,6 +82,12 @@ func FLBPluginInit(plugin unsafe.Pointer) int {
7882
}
7983
}
8084

85+
batchSize, err := strconv.Atoi(batchSizeString)
86+
if err != nil || (0 > batchSize && batchSize > 10) {
87+
writeErrorLog(errors.New("BatchSize should be integer value between 1 and 10"))
88+
return output.FLB_ERROR
89+
}
90+
8191
writeInfoLog("retrieving aws credentials from environment variables")
8292
awsCredentials := credentials.NewEnvCredentials()
8393
var myAWSSession *session.Session
@@ -126,6 +136,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int {
126136
queueMessageGroupID: queueMessageGroupID,
127137
mySQS: sqs.New(myAWSSession),
128138
pluginTagAttribute: pluginTagAttribute,
139+
batchSize: batchSize,
129140
})
130141

131142
return output.FLB_OK
@@ -212,7 +223,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
212223

213224
SqsRecords = append(SqsRecords, sqsRecord)
214225

215-
if MessageCounter == 10 {
226+
if MessageCounter == sqsConf.batchSize {
216227
err := sendBatchToSqs(sqsConf, SqsRecords)
217228

218229
SqsRecords = nil

0 commit comments

Comments
 (0)