Skip to content

Commit a2855d2

Browse files
committed
out_azure_blob: add zstd compression support
Signed-off-by: Nico Berlee <[email protected]>
1 parent f4108db commit a2855d2

File tree

8 files changed

+406
-32
lines changed

8 files changed

+406
-32
lines changed

plugins/out_azure_blob/azure_blob.c

Lines changed: 87 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
#include <fluent-bit/flb_pack.h>
2525
#include <fluent-bit/flb_config_map.h>
2626
#include <fluent-bit/flb_gzip.h>
27+
#include <fluent-bit/flb_compression.h>
28+
#include <fluent-bit/flb_zstd.h>
2729
#include <fluent-bit/flb_base64.h>
2830
#include <fluent-bit/flb_sqldb.h>
2931
#include <fluent-bit/flb_input_blob.h>
@@ -136,7 +138,7 @@ static int construct_request_buffer(struct flb_azure_blob *ctx, flb_sds_t new_da
136138
}
137139
body = buffered_data = tmp;
138140
memcpy(body + buffer_size, new_data, flb_sds_len(new_data));
139-
if (ctx->compress_gzip == FLB_FALSE){
141+
if (ctx->compression == FLB_COMPRESSION_ALGORITHM_NONE) {
140142
body[body_size] = '\0';
141143
}
142144
}
@@ -149,6 +151,38 @@ static int construct_request_buffer(struct flb_azure_blob *ctx, flb_sds_t new_da
149151
return 0;
150152
}
151153

154+
/*
155+
* Compress a payload using the configured algorithm. Returns 0 on success and
156+
* negative on failure so callers can gracefully fall back to sending the raw
157+
* payload.
*
* FLB_COMPRESSION_ALGORITHM_GZIP is routed to flb_gzip_compress() and
* FLB_COMPRESSION_ALGORITHM_ZSTD to flb_zstd_compress(); the selected
* compressor's return value is passed through unchanged. Any other algorithm
* value (including FLB_COMPRESSION_ALGORITHM_NONE) returns -1, and this
* function itself never writes to out_data / out_len in that case.
*
* NOTE(review): on success *out_data presumably points to a buffer owned by
* the caller - confirm against the flb_gzip / flb_zstd APIs.
158+
*/
159+
static int azure_blob_compress_payload(int algorithm,
160+
void *in_data, size_t in_len,
161+
void **out_data, size_t *out_len)
162+
{
163+
if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
164+
return flb_gzip_compress(in_data, in_len, out_data, out_len);
165+
}
166+
else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
167+
return flb_zstd_compress(in_data, in_len, out_data, out_len);
168+
}
169+
170+
/* no compressor is wired up for this algorithm value */
return -1;
171+
}
172+
173+
/*
* Map a compression algorithm to its human-friendly label for logs.
*
* Returns the string literal "gzip" or "zstd" for the matching
* FLB_COMPRESSION_ALGORITHM_* value and "unknown" for anything else, so the
* result is never NULL and must not be freed or modified.
*/
174+
static const char *azure_blob_compression_name(int algorithm)
175+
{
176+
if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
177+
return "gzip";
178+
}
179+
else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
180+
return "zstd";
181+
}
182+
183+
/* any other value, FLB_COMPRESSION_ALGORITHM_NONE included */
return "unknown";
184+
}
185+
152186
void generate_random_string_blob(char *str, size_t length)
153187
{
154188
const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
@@ -332,8 +366,12 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx,
332366
{
333367
int ret;
334368
int compressed = FLB_FALSE;
335-
int content_encoding = FLB_FALSE;
336-
int content_type = FLB_FALSE;
369+
int content_encoding = AZURE_BLOB_CE_NONE;
370+
int content_type = AZURE_BLOB_CT_NONE;
371+
int compression_algorithm = FLB_COMPRESSION_ALGORITHM_NONE;
372+
int network_compression_algorithm = ctx->compression;
373+
int network_compression_applied = FLB_FALSE;
374+
int blob_compression_applied = FLB_FALSE;
337375
size_t b_sent;
338376
void *payload_buf;
339377
size_t payload_size;
@@ -358,27 +396,62 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx,
358396
payload_buf = data;
359397
payload_size = bytes;
360398

399+
/* Determine compression algorithm */
400+
if (network_compression_algorithm != FLB_COMPRESSION_ALGORITHM_NONE) {
401+
compression_algorithm = network_compression_algorithm;
402+
}
403+
404+
if (ctx->compress_blob == FLB_TRUE) {
405+
if (compression_algorithm == FLB_COMPRESSION_ALGORITHM_NONE) {
406+
compression_algorithm = FLB_COMPRESSION_ALGORITHM_GZIP;
407+
}
408+
}
409+
361410
/* Handle compression requests */
362-
if (ctx->compress_gzip == FLB_TRUE || ctx->compress_blob == FLB_TRUE) {
363-
ret = flb_gzip_compress((void *) data, bytes, &payload_buf, &payload_size);
411+
if (compression_algorithm != FLB_COMPRESSION_ALGORITHM_NONE) {
412+
ret = azure_blob_compress_payload(compression_algorithm,
413+
(void *) data, bytes,
414+
&payload_buf, &payload_size);
364415
if (ret == 0) {
365416
compressed = FLB_TRUE;
417+
if (network_compression_algorithm != FLB_COMPRESSION_ALGORITHM_NONE) {
418+
network_compression_applied = FLB_TRUE;
419+
}
420+
if (ctx->compress_blob == FLB_TRUE) {
421+
blob_compression_applied = FLB_TRUE;
422+
}
366423
}
367424
else {
425+
const char *alg_name;
426+
427+
alg_name = azure_blob_compression_name(compression_algorithm);
368428
flb_plg_warn(ctx->ins,
369-
"cannot gzip payload, disabling compression");
429+
"cannot %s payload, disabling compression",
430+
alg_name);
370431
payload_buf = data;
371432
payload_size = bytes;
433+
compression_algorithm = FLB_COMPRESSION_ALGORITHM_NONE;
372434
}
373435
}
374436

375437
/* set http header flags */
376-
if (ctx->compress_blob == FLB_TRUE) {
438+
if (blob_compression_applied == FLB_TRUE) {
377439
content_encoding = AZURE_BLOB_CE_NONE;
378-
content_type = AZURE_BLOB_CT_GZIP;
440+
441+
if (compression_algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
442+
content_type = AZURE_BLOB_CT_ZSTD;
443+
}
444+
else {
445+
content_type = AZURE_BLOB_CT_GZIP;
446+
}
379447
}
380-
else if (compressed == FLB_TRUE) {
381-
content_encoding = AZURE_BLOB_CE_GZIP;
448+
else if (network_compression_applied == FLB_TRUE) {
449+
if (network_compression_algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
450+
content_encoding = AZURE_BLOB_CE_GZIP;
451+
}
452+
else if (network_compression_algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
453+
content_encoding = AZURE_BLOB_CE_ZSTD;
454+
}
382455
content_type = AZURE_BLOB_CT_JSON;
383456
}
384457

@@ -1783,14 +1856,15 @@ static struct flb_config_map config_map[] = {
17831856
{
17841857
FLB_CONFIG_MAP_STR, "compress", NULL,
17851858
0, FLB_FALSE, 0,
1786-
"Set payload compression in network transfer. Option available is 'gzip'"
1859+
"Set payload compression in network transfer. Options: 'gzip', 'zstd'"
17871860
},
17881861

17891862
{
17901863
FLB_CONFIG_MAP_BOOL, "compress_blob", "false",
17911864
0, FLB_TRUE, offsetof(struct flb_azure_blob, compress_blob),
1792-
"Enable block blob GZIP compression in the final blob file. This option is "
1793-
"not compatible with 'appendblob' block type"
1865+
"Enable block blob compression in the final blob file (defaults to gzip, "
1866+
"uses the 'compress' codec when set). This option is not compatible with "
1867+
"'appendblob' block type"
17941868
},
17951869

17961870
{

plugins/out_azure_blob/azure_blob.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@
3030
#define AZURE_BLOB_CT_NONE 0
3131
#define AZURE_BLOB_CT_JSON 1 /* application/json */
3232
#define AZURE_BLOB_CT_GZIP 2 /* application/gzip */
33+
#define AZURE_BLOB_CT_ZSTD 3 /* application/zstd */
3334

3435
/* Content-Encoding */
3536
#define AZURE_BLOB_CE "Content-Encoding"
3637
#define AZURE_BLOB_CE_NONE 0
3738
#define AZURE_BLOB_CE_GZIP 1 /* gzip */
39+
#define AZURE_BLOB_CE_ZSTD 2 /* zstd */
3840

3941
/* service endpoint */
4042
#define AZURE_ENDPOINT_PREFIX ".blob.core.windows.net"
@@ -54,7 +56,7 @@
5456
struct flb_azure_blob {
5557
int auto_create_container;
5658
int emulator_mode;
57-
int compress_gzip;
59+
int compression;
5860
int compress_blob;
5961
flb_sds_t account_name;
6062
flb_sds_t container_name;

plugins/out_azure_blob/azure_blob_blockblob.c

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <fluent-bit/flb_sds.h>
2424
#include <fluent-bit/flb_hash.h>
2525
#include <fluent-bit/flb_crypto_constants.h>
26+
#include <fluent-bit/flb_compression.h>
2627

2728
#include <math.h>
2829

@@ -31,6 +32,19 @@
3132
#include "azure_blob_uri.h"
3233
#include "azure_blob_http.h"
3334

35+
/*
* Select the file-name suffix for a block blob based on the plugin context.
*
* Returns "" when ctx->compress_blob is not FLB_TRUE, ".zst" when blob
* compression is enabled and ctx->compression is
* FLB_COMPRESSION_ALGORITHM_ZSTD, and ".gz" in every other case. The returned
* pointer is a string literal; callers must not free or modify it.
*/
static const char *azb_blob_extension(struct flb_azure_blob *ctx)
36+
{
37+
if (ctx->compress_blob != FLB_TRUE) {
38+
return "";
39+
}
40+
41+
if (ctx->compression == FLB_COMPRESSION_ALGORITHM_ZSTD) {
42+
return ".zst";
43+
}
44+
45+
/*
* gzip, or no 'compress' codec configured: http_send_blob() falls back to
* gzip for compress_blob in that case, so the suffix is ".gz" here too
*/
return ".gz";
46+
}
47+
3448
flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name)
3549
{
3650
flb_sds_t uri;
@@ -60,7 +74,7 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name,
6074
{
6175
int len;
6276
flb_sds_t uri;
63-
char *ext;
77+
const char *ext;
6478
char *encoded_blockid;
6579

6680
len = strlen(blockid);
@@ -75,12 +89,7 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name,
7589
return NULL;
7690
}
7791

78-
if (ctx->compress_blob == FLB_TRUE) {
79-
ext = ".gz";
80-
}
81-
else {
82-
ext = "";
83-
}
92+
ext = azb_blob_extension(ctx);
8493

8594
if (ctx->path) {
8695
if (ms > 0) {
@@ -114,20 +123,15 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name,
114123
flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx,
115124
char *tag, uint64_t ms, char *str)
116125
{
117-
char *ext;
126+
const char *ext;
118127
flb_sds_t uri;
119128

120129
uri = azb_uri_container(ctx);
121130
if (!uri) {
122131
return NULL;
123132
}
124133

125-
if (ctx->compress_blob == FLB_TRUE) {
126-
ext = ".gz";
127-
}
128-
else {
129-
ext = "";
130-
}
134+
ext = azb_blob_extension(ctx);
131135

132136
if (ctx->path) {
133137
flb_sds_printf(&uri, "/%s/%s.%s.%" PRIu64 "%s?comp=blocklist", ctx->path, tag, str,

plugins/out_azure_blob/azure_blob_conf.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <fluent-bit/flb_output_plugin.h>
2121
#include <fluent-bit/flb_base64.h>
2222
#include <fluent-bit/flb_pack.h>
23+
#include <fluent-bit/flb_compression.h>
2324

2425
#include "azure_blob.h"
2526
#include "azure_blob_conf.h"
@@ -655,12 +656,19 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
655656
return NULL;
656657
}
657658

658-
/* Compress (gzip) */
659+
/* Compress payload over the wire */
659660
tmp = (char *) flb_output_get_property("compress", ins);
660-
ctx->compress_gzip = FLB_FALSE;
661+
ctx->compression = FLB_COMPRESSION_ALGORITHM_NONE;
661662
if (tmp) {
662663
if (strcasecmp(tmp, "gzip") == 0) {
663-
ctx->compress_gzip = FLB_TRUE;
664+
ctx->compression = FLB_COMPRESSION_ALGORITHM_GZIP;
665+
}
666+
else if (strcasecmp(tmp, "zstd") == 0) {
667+
ctx->compression = FLB_COMPRESSION_ALGORITHM_ZSTD;
668+
}
669+
else {
670+
flb_plg_error(ctx->ins, "invalid compress value '%s' (supported: gzip, zstd)", tmp);
671+
return NULL;
664672
}
665673
}
666674

plugins/out_azure_blob/azure_blob_http.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,9 @@ flb_sds_t azb_http_canonical_request(struct flb_azure_blob *ctx,
193193
if (content_encoding == AZURE_BLOB_CE_GZIP) {
194194
encoding = "gzip";
195195
}
196+
else if (content_encoding == AZURE_BLOB_CE_ZSTD) {
197+
encoding = "zstd";
198+
}
196199
else {
197200
encoding = "";
198201
}
@@ -223,6 +226,9 @@ flb_sds_t azb_http_canonical_request(struct flb_azure_blob *ctx,
223226
else if (content_type == AZURE_BLOB_CT_GZIP) {
224227
ctype = "application/gzip";
225228
}
229+
else if (content_type == AZURE_BLOB_CT_ZSTD) {
230+
ctype = "application/zstd";
231+
}
226232

227233
flb_sds_printf(&can_req,
228234
"\n" /* Content-MD5 */
@@ -315,12 +321,22 @@ int azb_http_client_setup(struct flb_azure_blob *ctx, struct flb_http_client *c,
315321
AZURE_BLOB_CT, sizeof(AZURE_BLOB_CT) - 1,
316322
"application/gzip", 16);
317323
}
324+
else if (content_type == AZURE_BLOB_CT_ZSTD) {
325+
flb_http_add_header(c,
326+
AZURE_BLOB_CT, sizeof(AZURE_BLOB_CT) - 1,
327+
"application/zstd", 16);
328+
}
318329

319330
if (content_encoding == AZURE_BLOB_CE_GZIP) {
320331
flb_http_add_header(c,
321332
AZURE_BLOB_CE, sizeof(AZURE_BLOB_CE) - 1,
322333
"gzip", 4);
323334
}
335+
else if (content_encoding == AZURE_BLOB_CE_ZSTD) {
336+
flb_http_add_header(c,
337+
AZURE_BLOB_CE, sizeof(AZURE_BLOB_CE) - 1,
338+
"zstd", 4);
339+
}
324340

325341
/* Azure header: x-ms-blob-type */
326342
if (blob_type == FLB_TRUE) {

plugins/out_azure_blob/azure_blob_http.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ int azb_http_client_setup(struct flb_azure_blob *ctx, struct flb_http_client *c,
3131
flb_sds_t azb_http_canonical_request(struct flb_azure_blob *ctx,
3232
struct flb_http_client *c,
3333
ssize_t content_length,
34-
int content_type);
34+
int content_type,
35+
int content_encoding);
3536

3637
#endif

tests/internal/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ set(UNIT_TESTS_FILES
1818
unit_sizes.c
1919
hashtable.c
2020
http_client.c
21+
azure_blob.c
2122
utils.c
2223
gzip.c
2324
zstd.c

0 commit comments

Comments
 (0)