From 5ce650d13590ef09a4b86ff6cd03317434b8e086 Mon Sep 17 00:00:00 2001 From: wolfkor Date: Sun, 20 Apr 2025 00:11:02 +0200 Subject: [PATCH 01/11] added proxyConnectCb (#870) --- src/comsock.c | 111 ++++++++++++++++++++++++++++---------------------- src/conn.c | 3 ++ src/nats.h | 21 ++++++++++ src/natsp.h | 20 +++++---- src/opts.c | 11 +++++ 5 files changed, 109 insertions(+), 57 deletions(-) diff --git a/src/comsock.c b/src/comsock.c index 1873be621..f4694358c 100644 --- a/src/comsock.c +++ b/src/comsock.c @@ -152,6 +152,9 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) int64_t totalTimeout = 0; int64_t timeoutPerIP = 0; struct addrinfo *tmpStorage[64]; + bool hasProxyConnectCb = false; + + hasProxyConnectCb = (ctx->proxyConnectCb != NULL); if (phost == NULL) return nats_setError(NATS_ADDRESS_MISSING, "%s", "No host specified"); @@ -170,69 +173,79 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) snprintf(sport, sizeof(sport), "%d", port); - if ((ctx->orderIP == 46) || (ctx->orderIP == 64)) - max = 2; - start = nats_Now(); - - for (i=0; ifd = ctx->proxyConnectCb(host, port); + if (ctx->fd == NATS_SOCK_INVALID) + s = nats_setError(NATS_SYS_ERROR, "proxy socket error: %d", NATS_SOCK_GET_ERROR); + } + else + { + if ((ctx->orderIP == 46) || (ctx->orderIP == 64)) + max = 2; - switch (ctx->orderIP) + for (i = 0; i < max; i++) { + struct addrinfo hints; + struct addrinfo* servinfo = NULL; + int count = 0; + struct addrinfo* p; + + memset(&hints, 0, sizeof(hints)); + hints.ai_socktype = SOCK_STREAM; + + switch (ctx->orderIP) + { case 4: hints.ai_family = AF_INET; break; case 6: hints.ai_family = AF_INET6; break; case 46: hints.ai_family = (i == 0 ? AF_INET : AF_INET6); break; case 64: hints.ai_family = (i == 0 ? AF_INET6 : AF_INET); break; default: hints.ai_family = AF_UNSPEC; + } + + if ((res = getaddrinfo(host, sport, &hints, &servinfo)) != 0) + { + s = nats_setError(NATS_SYS_ERROR, "getaddrinfo error: %s", + gai_strerror(res)); + continue; + } + servInfos[numServInfo] = servinfo; + for (p = servinfo; (p != NULL); p = p->ai_next) + { + count++; + numIPs++; + } + natsSock_ShuffleIPs(ctx, tmpStorage, sizeof(tmpStorage), &(servInfos[numServInfo]), count); + numServInfo++; } + // If we got a getaddrinfo() and there is no servInfos to try to connect to + // bail out now. + if ((s != NATS_OK) && (numServInfo == 0)) + return NATS_UPDATE_ERR_STACK(s); - if ((res = getaddrinfo(host, sport, &hints, &servinfo)) != 0) + // I don't think it can be the case if s == OK and/or numServInfo >= 1... + if (numIPs == 0) { - s = nats_setError(NATS_SYS_ERROR, "getaddrinfo error: %s", - gai_strerror(res)); - continue; + for (i = 0; i < numServInfo; i++) + nats_FreeAddrInfo(servInfos[i]); + + return NATS_UPDATE_ERR_STACK(NATS_NO_SERVER); } - servInfos[numServInfo] = servinfo; - for (p = servinfo; (p != NULL); p = p->ai_next) + + // Check if there has been a deadline set. + totalTimeout = natsDeadline_GetTimeout(&(ctx->writeDeadline)); + if (totalTimeout > 0) { - count++; - numIPs++; + // If so, compute a timeout based on the number of IPs we are going + // to possibly try to connect to. + timeoutPerIP = totalTimeout / numIPs; + // If really small, give at least a 10ms timeout + if (timeoutPerIP < 10) + timeoutPerIP = 10; } - natsSock_ShuffleIPs(ctx, tmpStorage, sizeof(tmpStorage), &(servInfos[numServInfo]), count); - numServInfo++; - } - // If we got a getaddrinfo() and there is no servInfos to try to connect to - // bail out now. - if ((s != NATS_OK) && (numServInfo == 0)) - return NATS_UPDATE_ERR_STACK(s); - - // I don't think it can be the case if s == OK and/or numServInfo >= 1... - if (numIPs == 0) - { - for (i=0; iwriteDeadline)); - if (totalTimeout > 0) - { - // If so, compute a timeout based on the number of IPs we are going - // to possibly try to connect to. - timeoutPerIP = totalTimeout / numIPs; - // If really small, give at least a 10ms timeout - if (timeoutPerIP < 10) - timeoutPerIP = 10; } for (i=0; ifd); // We have connected OK and completed setting options, so we are done. if (s == NATS_OK) - break; + break; } _closeFd(ctx->fd); diff --git a/src/conn.c b/src/conn.c index 376b643d7..a70074aae 100644 --- a/src/conn.c +++ b/src/conn.c @@ -403,6 +403,9 @@ _createConn(natsConnection *nc) // Set ctx.noRandomize based on public NoRandomize option. nc->sockCtx.noRandomize = nc->opts->noRandomize; + //Set the proxy connect callback + nc->sockCtx.proxyConnectCb = nc->opts->proxyConnectCb; + s = natsSock_ConnectTcp(&(nc->sockCtx), nc->cur->url->host, nc->cur->url->port); if (s == NATS_OK) nc->sockCtx.fdActive = true; diff --git a/src/nats.h b/src/nats.h index fd68fc62c..7981ac2bc 100644 --- a/src/nats.h +++ b/src/nats.h @@ -1672,6 +1672,14 @@ typedef void (*natsMsgHandler)( typedef void (*natsConnectionHandler)( natsConnection *nc, void *closure); +/** \brief Callback used to handle connections via proxy. + * + * This callback is used to handle proxy connection. It is invoked before the + * main connections and return the socket that will be used to connect to the server. + */ +typedef natsSock (*natsProxyConnHandler)( + char* host, int port); + /** \brief Callback used to notify the user of errors encountered while processing * inbound messages. * @@ -2951,6 +2959,19 @@ natsOptions_SetMaxPendingMsgs(natsOptions *opts, int maxPending); NATS_EXTERN natsStatus natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending); +/** \brief Sets the proxy connection handler for asynchronous events. + * + * Specifies the callback to invoke for proxy connection. + * + * @see natsProxyConnHandler + * + * @param opts the pointer to the #natsOptions object. + * @param proxyConnHandler the proxy connection handler callback. + * the callback. `closure` can be `NULL`. + */ +NATS_EXTERN natsStatus +natsOptions_SetProxyConnHandler(natsOptions* opts, natsProxyConnHandler proxyConnHandler); + /** \brief Sets the error handler for asynchronous events. * * Specifies the callback to invoke when an asynchronous error diff --git a/src/natsp.h b/src/natsp.h index a750c4c4c..075f35718 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -268,6 +268,8 @@ struct __natsOptions natsConnectionHandler microClosedCb; natsErrHandler microAsyncErrCb; + natsProxyConnHandler proxyConnectCb; + int64_t pingInterval; int maxPingsOut; int maxPendingMsgs; @@ -644,22 +646,24 @@ typedef struct __natsPongList typedef struct __natsSockCtx { - natsSock fd; - bool fdActive; + natsSock fd; + bool fdActive; - natsDeadline readDeadline; - natsDeadline writeDeadline; + natsDeadline readDeadline; + natsDeadline writeDeadline; - SSL *ssl; + SSL *ssl; // This is true when we are using an external event loop (such as libuv). - bool useEventLoop; + bool useEventLoop; - int orderIP; // possible values: 0,4,6,46,64 + int orderIP; // possible values: 0,4,6,46,64 // By default, the list of IPs returned by the hostname resolution will // be shuffled. This option, if `true`, will disable the shuffling. - bool noRandomize; + bool noRandomize; + + natsProxyConnHandler proxyConnectCb; } natsSockCtx; diff --git a/src/opts.c b/src/opts.c index 48c75382c..8ddeb37e1 100644 --- a/src/opts.c +++ b/src/opts.c @@ -989,6 +989,17 @@ natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending) return NATS_OK; } +natsStatus natsOptions_SetProxyConnHandler(natsOptions* opts, natsProxyConnHandler proxyConnHandler) +{ + LOCK_AND_CHECK_OPTIONS(opts, 0); + + opts->proxyConnectCb = proxyConnHandler; + + UNLOCK_OPTS(opts); + + return NATS_OK; +} + natsStatus natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler, void *closure) From 1847437ccee3137b2b404acdce064dc6d7be593a Mon Sep 17 00:00:00 2001 From: wolfkor Date: Sun, 20 Apr 2025 12:48:57 +0200 Subject: [PATCH 02/11] formatting and optimizing (#871) --- src/comsock.c | 133 +++++++++++++++++++++++++------------------------- src/conn.c | 4 +- src/nats.h | 4 +- src/opts.c | 7 +-- 4 files changed, 74 insertions(+), 74 deletions(-) diff --git a/src/comsock.c b/src/comsock.c index f4694358c..18a6530ec 100644 --- a/src/comsock.c +++ b/src/comsock.c @@ -1,4 +1,4 @@ -// Copyright 2015-2021 The NATS Authors +// Copyright 2015-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -152,9 +152,7 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) int64_t totalTimeout = 0; int64_t timeoutPerIP = 0; struct addrinfo *tmpStorage[64]; - bool hasProxyConnectCb = false; - - hasProxyConnectCb = (ctx->proxyConnectCb != NULL); + bool hasProxyConnectCb = ctx->proxyConnectCb != NULL; if (phost == NULL) return nats_setError(NATS_ADDRESS_MISSING, "%s", "No host specified"); @@ -177,10 +175,10 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) // Call the proxy connect callback if provided if (hasProxyConnectCb) { - // Invoke the callback. + // Invoke the proxy connect callback. ctx->fd = ctx->proxyConnectCb(host, port); if (ctx->fd == NATS_SOCK_INVALID) - s = nats_setError(NATS_SYS_ERROR, "proxy socket error: %d", NATS_SOCK_GET_ERROR); + s = nats_setError(NATS_SYS_ERROR, "proxy connect socket error: %d", NATS_SOCK_GET_ERROR); } else { @@ -246,76 +244,77 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) if (timeoutPerIP < 10) timeoutPerIP = 10; } - } - for (i=0; iai_next) - { - ctx->fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); - if (ctx->fd == NATS_SOCK_INVALID) - { - s = nats_setError(NATS_SYS_ERROR, "socket error: %d", NATS_SOCK_GET_ERROR); - continue; - } + for (p = servInfos[i]; (p != NULL); p = p->ai_next) + { + ctx->fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (ctx->fd == NATS_SOCK_INVALID) + { + s = nats_setError(NATS_SYS_ERROR, "socket error: %d", NATS_SOCK_GET_ERROR); + continue; + } - // Reset 's' for this loop iteration. - s = NATS_OK; + // Reset 's' for this loop iteration. + s = NATS_OK; #ifdef SO_NOSIGPIPE - int set = 1; - if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1) - { - s = nats_setError(NATS_SYS_ERROR, + int set = 1; + if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1) + { + s = nats_setError(NATS_SYS_ERROR, "setsockopt SO_NOSIGPIPE error: %d", NATS_SOCK_GET_ERROR); - } + } #endif - if (s == NATS_OK) - s = natsSock_SetBlocking(ctx->fd, false); - - if (s == NATS_OK) - { - res = connect(ctx->fd, p->ai_addr, (natsSockLen) p->ai_addrlen); - if ((res == NATS_SOCK_ERROR) - && (NATS_SOCK_GET_ERROR == NATS_SOCK_CONNECT_IN_PROGRESS)) - { - if (timeoutPerIP > 0) - natsDeadline_Init(&(ctx->writeDeadline), timeoutPerIP); - - s = natsSock_WaitReady(WAIT_FOR_CONNECT, ctx); - if ((s == NATS_OK) && !natsSock_IsConnected(ctx->fd)) - s = NATS_TIMEOUT; - } - else if (res == NATS_SOCK_ERROR) - { - s = nats_setDefaultError(NATS_NO_SERVER); - } - } - - if (s == NATS_OK) - { - s = natsSock_SetCommonTcpOptions(ctx->fd); - // We have connected OK and completed setting options, so we are done. - if (s == NATS_OK) - break; - } - - _closeFd(ctx->fd); - ctx->fd = NATS_SOCK_INVALID; - } - if (s == NATS_OK) - { - // Clear the error stack in case we got errors in the loop until - // being able to successfully connect. - nats_clearLastError(); - break; - } + if (s == NATS_OK) + s = natsSock_SetBlocking(ctx->fd, false); + + if (s == NATS_OK) + { + res = connect(ctx->fd, p->ai_addr, (natsSockLen) p->ai_addrlen); + if ((res == NATS_SOCK_ERROR) + && (NATS_SOCK_GET_ERROR == NATS_SOCK_CONNECT_IN_PROGRESS)) + { + if (timeoutPerIP > 0) + natsDeadline_Init(&(ctx->writeDeadline), timeoutPerIP); + + s = natsSock_WaitReady(WAIT_FOR_CONNECT, ctx); + if ((s == NATS_OK) && !natsSock_IsConnected(ctx->fd)) + s = NATS_TIMEOUT; + } + else if (res == NATS_SOCK_ERROR) + { + s = nats_setDefaultError(NATS_NO_SERVER); + } + } + + if (s == NATS_OK) + { + s = natsSock_SetCommonTcpOptions(ctx->fd); + // We have connected OK and completed setting options, so we are done. + if (s == NATS_OK) + break; + } + + _closeFd(ctx->fd); + ctx->fd = NATS_SOCK_INVALID; + } + if (s == NATS_OK) + { + // Clear the error stack in case we got errors in the loop until + // being able to successfully connect. + nats_clearLastError(); + break; + } + } + + for (i=0; i 0) diff --git a/src/conn.c b/src/conn.c index a70074aae..79534861a 100644 --- a/src/conn.c +++ b/src/conn.c @@ -1,4 +1,4 @@ -// Copyright 2015-2024 The NATS Authors +// Copyright 2015-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -403,7 +403,7 @@ _createConn(natsConnection *nc) // Set ctx.noRandomize based on public NoRandomize option. nc->sockCtx.noRandomize = nc->opts->noRandomize; - //Set the proxy connect callback + // Set the proxy connect callback nc->sockCtx.proxyConnectCb = nc->opts->proxyConnectCb; s = natsSock_ConnectTcp(&(nc->sockCtx), nc->cur->url->host, nc->cur->url->port); diff --git a/src/nats.h b/src/nats.h index 7981ac2bc..d196afc2b 100644 --- a/src/nats.h +++ b/src/nats.h @@ -2959,9 +2959,9 @@ natsOptions_SetMaxPendingMsgs(natsOptions *opts, int maxPending); NATS_EXTERN natsStatus natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending); -/** \brief Sets the proxy connection handler for asynchronous events. +/** \brief Sets the proxy connection handler. * - * Specifies the callback to invoke for proxy connection. + * Specifies the callback to invoke for proxy connection returning the socket to use. * * @see natsProxyConnHandler * diff --git a/src/opts.c b/src/opts.c index 8ddeb37e1..bb7f973af 100644 --- a/src/opts.c +++ b/src/opts.c @@ -1,4 +1,4 @@ -// Copyright 2015-2024 The NATS Authors +// Copyright 2015-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -989,9 +989,10 @@ natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending) return NATS_OK; } -natsStatus natsOptions_SetProxyConnHandler(natsOptions* opts, natsProxyConnHandler proxyConnHandler) +natsStatus +natsOptions_SetProxyConnHandler(natsOptions* opts, natsProxyConnHandler proxyConnHandler) { - LOCK_AND_CHECK_OPTIONS(opts, 0); + LOCK_AND_CHECK_OPTIONS(opts, 0) opts->proxyConnectCb = proxyConnHandler; From ada276b1e5f563e5172a3f3b16b7cf533e3b2f21 Mon Sep 17 00:00:00 2001 From: wolfkor Date: Sat, 26 Jul 2025 20:20:24 +0200 Subject: [PATCH 03/11] review changes (#871) --- src/comsock.c | 256 ++++++++++++++++++++++++++------------------------ src/conn.c | 6 +- src/nats.h | 10 +- test/test.c | 14 +++ 4 files changed, 153 insertions(+), 133 deletions(-) diff --git a/src/comsock.c b/src/comsock.c index 18a6530ec..523dd149f 100644 --- a/src/comsock.c +++ b/src/comsock.c @@ -134,6 +134,18 @@ natsSock_ShuffleIPs(natsSockCtx *ctx, struct addrinfo **tmp, int tmpSize, struct #define MAX_HOST_NAME (256) +void resetDeadline(natsSockCtx* ctx, int64_t start, int64_t totalTimeout) +{ + // If there was a deadline, reset the deadline with whatever is left. + if (totalTimeout > 0) + { + int64_t used = nats_Now() - start; + int64_t left = totalTimeout - used; + + natsDeadline_Init(&(ctx->writeDeadline), (left > 0 ? left : 0)); + } +} + natsStatus natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) { @@ -172,94 +184,96 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) snprintf(sport, sizeof(sport), "%d", port); start = nats_Now(); + // Call the proxy connect callback if provided if (hasProxyConnectCb) { // Invoke the proxy connect callback. - ctx->fd = ctx->proxyConnectCb(host, port); - if (ctx->fd == NATS_SOCK_INVALID) - s = nats_setError(NATS_SYS_ERROR, "proxy connect socket error: %d", NATS_SOCK_GET_ERROR); - } - else - { - if ((ctx->orderIP == 46) || (ctx->orderIP == 64)) - max = 2; + s = ctx->proxyConnectCb(host, port, &ctx->fd); - for (i = 0; i < max; i++) - { - struct addrinfo hints; - struct addrinfo* servinfo = NULL; - int count = 0; - struct addrinfo* p; + resetDeadline(ctx, start, totalTimeout); - memset(&hints, 0, sizeof(hints)); - hints.ai_socktype = SOCK_STREAM; + return NATS_UPDATE_ERR_STACK(s); + } - switch (ctx->orderIP) - { - case 4: hints.ai_family = AF_INET; break; - case 6: hints.ai_family = AF_INET6; break; - case 46: hints.ai_family = (i == 0 ? AF_INET : AF_INET6); break; - case 64: hints.ai_family = (i == 0 ? AF_INET6 : AF_INET); break; - default: hints.ai_family = AF_UNSPEC; - } + if ((ctx->orderIP == 46) || (ctx->orderIP == 64)) + max = 2; - if ((res = getaddrinfo(host, sport, &hints, &servinfo)) != 0) - { - s = nats_setError(NATS_SYS_ERROR, "getaddrinfo error: %s", - gai_strerror(res)); - continue; - } - servInfos[numServInfo] = servinfo; - for (p = servinfo; (p != NULL); p = p->ai_next) - { - count++; - numIPs++; - } - natsSock_ShuffleIPs(ctx, tmpStorage, sizeof(tmpStorage), &(servInfos[numServInfo]), count); - numServInfo++; - } - // If we got a getaddrinfo() and there is no servInfos to try to connect to - // bail out now. - if ((s != NATS_OK) && (numServInfo == 0)) - return NATS_UPDATE_ERR_STACK(s); + for (i = 0; i < max; i++) + { + struct addrinfo hints; + struct addrinfo* servinfo = NULL; + int count = 0; + struct addrinfo* p; + + memset(&hints, 0, sizeof(hints)); + hints.ai_socktype = SOCK_STREAM; + + switch (ctx->orderIP) + { + case 4: hints.ai_family = AF_INET; break; + case 6: hints.ai_family = AF_INET6; break; + case 46: hints.ai_family = (i == 0 ? AF_INET : AF_INET6); break; + case 64: hints.ai_family = (i == 0 ? AF_INET6 : AF_INET); break; + default: hints.ai_family = AF_UNSPEC; + } + + if ((res = getaddrinfo(host, sport, &hints, &servinfo)) != 0) + { + s = nats_setError(NATS_SYS_ERROR, "getaddrinfo error: %s", + gai_strerror(res)); + continue; + } + servInfos[numServInfo] = servinfo; + for (p = servinfo; (p != NULL); p = p->ai_next) + { + count++; + numIPs++; + } + natsSock_ShuffleIPs(ctx, tmpStorage, sizeof(tmpStorage), &(servInfos[numServInfo]), count); + numServInfo++; + } + // If we got a getaddrinfo() and there is no servInfos to try to connect to + // bail out now. + if ((s != NATS_OK) && (numServInfo == 0)) + return NATS_UPDATE_ERR_STACK(s); - // I don't think it can be the case if s == OK and/or numServInfo >= 1... - if (numIPs == 0) - { - for (i = 0; i < numServInfo; i++) - nats_FreeAddrInfo(servInfos[i]); + // I don't think it can be the case if s == OK and/or numServInfo >= 1... + if (numIPs == 0) + { + for (i = 0; i < numServInfo; i++) + nats_FreeAddrInfo(servInfos[i]); - return NATS_UPDATE_ERR_STACK(NATS_NO_SERVER); - } + return NATS_UPDATE_ERR_STACK(NATS_NO_SERVER); + } - // Check if there has been a deadline set. - totalTimeout = natsDeadline_GetTimeout(&(ctx->writeDeadline)); - if (totalTimeout > 0) - { - // If so, compute a timeout based on the number of IPs we are going - // to possibly try to connect to. - timeoutPerIP = totalTimeout / numIPs; - // If really small, give at least a 10ms timeout - if (timeoutPerIP < 10) - timeoutPerIP = 10; - } + // Check if there has been a deadline set. + totalTimeout = natsDeadline_GetTimeout(&(ctx->writeDeadline)); + if (totalTimeout > 0) + { + // If so, compute a timeout based on the number of IPs we are going + // to possibly try to connect to. + timeoutPerIP = totalTimeout / numIPs; + // If really small, give at least a 10ms timeout + if (timeoutPerIP < 10) + timeoutPerIP = 10; + } - for (i=0; iai_next) - { - ctx->fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); - if (ctx->fd == NATS_SOCK_INVALID) - { - s = nats_setError(NATS_SYS_ERROR, "socket error: %d", NATS_SOCK_GET_ERROR); - continue; - } + for (p = servInfos[i]; (p != NULL); p = p->ai_next) + { + ctx->fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (ctx->fd == NATS_SOCK_INVALID) + { + s = nats_setError(NATS_SYS_ERROR, "socket error: %d", NATS_SOCK_GET_ERROR); + continue; + } - // Reset 's' for this loop iteration. - s = NATS_OK; + // Reset 's' for this loop iteration. + s = NATS_OK; #ifdef SO_NOSIGPIPE int set = 1; @@ -270,60 +284,52 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) NATS_SOCK_GET_ERROR); } #endif - if (s == NATS_OK) - s = natsSock_SetBlocking(ctx->fd, false); - - if (s == NATS_OK) - { - res = connect(ctx->fd, p->ai_addr, (natsSockLen) p->ai_addrlen); - if ((res == NATS_SOCK_ERROR) - && (NATS_SOCK_GET_ERROR == NATS_SOCK_CONNECT_IN_PROGRESS)) - { - if (timeoutPerIP > 0) - natsDeadline_Init(&(ctx->writeDeadline), timeoutPerIP); - - s = natsSock_WaitReady(WAIT_FOR_CONNECT, ctx); - if ((s == NATS_OK) && !natsSock_IsConnected(ctx->fd)) - s = NATS_TIMEOUT; - } - else if (res == NATS_SOCK_ERROR) - { - s = nats_setDefaultError(NATS_NO_SERVER); - } - } - - if (s == NATS_OK) - { - s = natsSock_SetCommonTcpOptions(ctx->fd); - // We have connected OK and completed setting options, so we are done. - if (s == NATS_OK) - break; - } - - _closeFd(ctx->fd); - ctx->fd = NATS_SOCK_INVALID; - } - if (s == NATS_OK) - { - // Clear the error stack in case we got errors in the loop until - // being able to successfully connect. - nats_clearLastError(); - break; - } - } - - for (i=0; ifd, false); + + if (s == NATS_OK) + { + res = connect(ctx->fd, p->ai_addr, (natsSockLen) p->ai_addrlen); + if ((res == NATS_SOCK_ERROR) + && (NATS_SOCK_GET_ERROR == NATS_SOCK_CONNECT_IN_PROGRESS)) + { + if (timeoutPerIP > 0) + natsDeadline_Init(&(ctx->writeDeadline), timeoutPerIP); + + s = natsSock_WaitReady(WAIT_FOR_CONNECT, ctx); + if ((s == NATS_OK) && !natsSock_IsConnected(ctx->fd)) + s = NATS_TIMEOUT; + } + else if (res == NATS_SOCK_ERROR) + { + s = nats_setDefaultError(NATS_NO_SERVER); + } + } + + if (s == NATS_OK) + { + s = natsSock_SetCommonTcpOptions(ctx->fd); + // We have connected OK and completed setting options, so we are done. + if (s == NATS_OK) + break; + } + + _closeFd(ctx->fd); + ctx->fd = NATS_SOCK_INVALID; + } + if (s == NATS_OK) + { + // Clear the error stack in case we got errors in the loop until + // being able to successfully connect. + nats_clearLastError(); + break; + } } - // If there was a deadline, reset the deadline with whatever is left. - if (totalTimeout > 0) - { - int64_t used = nats_Now() - start; - int64_t left = totalTimeout - used; + for (i=0; iwriteDeadline), (left > 0 ? left : 0)); - } + resetDeadline(ctx, start, totalTimeout); return NATS_UPDATE_ERR_STACK(s); } diff --git a/src/conn.c b/src/conn.c index 79534861a..260d8b816 100644 --- a/src/conn.c +++ b/src/conn.c @@ -402,9 +402,9 @@ _createConn(natsConnection *nc) // Set ctx.noRandomize based on public NoRandomize option. nc->sockCtx.noRandomize = nc->opts->noRandomize; - - // Set the proxy connect callback - nc->sockCtx.proxyConnectCb = nc->opts->proxyConnectCb; + + // Set the proxy connect callback + nc->sockCtx.proxyConnectCb = nc->opts->proxyConnectCb; s = natsSock_ConnectTcp(&(nc->sockCtx), nc->cur->url->host, nc->cur->url->port); if (s == NATS_OK) diff --git a/src/nats.h b/src/nats.h index d196afc2b..8dfb60320 100644 --- a/src/nats.h +++ b/src/nats.h @@ -1674,11 +1674,12 @@ typedef void (*natsConnectionHandler)( /** \brief Callback used to handle connections via proxy. * - * This callback is used to handle proxy connection. It is invoked before the - * main connections and return the socket that will be used to connect to the server. + * This callback is used to handle connections via proxy. + * It creates a socket, use it for proxy verification, and return it in `fd` + * to be used for the bus connection. */ -typedef natsSock (*natsProxyConnHandler)( - char* host, int port); +typedef natsStatus (*natsProxyConnHandler)( + char* host, int port, natsSock* fd); /** \brief Callback used to notify the user of errors encountered while processing * inbound messages. @@ -2967,7 +2968,6 @@ natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending); * * @param opts the pointer to the #natsOptions object. * @param proxyConnHandler the proxy connection handler callback. - * the callback. `closure` can be `NULL`. */ NATS_EXTERN natsStatus natsOptions_SetProxyConnHandler(natsOptions* opts, natsProxyConnHandler proxyConnHandler); diff --git a/test/test.c b/test/test.c index 0c1a73088..ad76f29b8 100644 --- a/test/test.c +++ b/test/test.c @@ -2601,6 +2601,12 @@ _dummyTokenHandler(void *closure) return "token"; } +static natsStatus +_dummyProxyConnHandler(char* host, int port, natsSock* fd) +{ + return NATS_SOCK_ERROR; +} + static void _dummyErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure) @@ -2896,6 +2902,14 @@ void test_natsOptions(void) s = natsOptions_SetMaxPendingBytes(opts, 1000000); testCond((s == NATS_OK) && (opts->maxPendingBytes == 1000000)) + test("Set Proxy Connection Handler: ") + s = natsOptions_SetProxyConnHandler(opts, _dummyProxyConnHandler); + testCond(s == NATS_OK && opts->proxyConnectCb == _dummyProxyConnHandler) + + test("Remove Proxy Connection Handler: ") + s = natsOptions_SetProxyConnHandler(opts, NULL); + testCond(s == NATS_OK && opts->proxyConnectCb == NULL) + test("Set Error Handler: "); s = natsOptions_SetErrorHandler(opts, _dummyErrHandler, NULL); testCond((s == NATS_OK) && (opts->asyncErrCb == _dummyErrHandler)); From cc7cdd4644758443beb4cc735f0f20ebdd92b86e Mon Sep 17 00:00:00 2001 From: wolfkor Date: Sat, 26 Jul 2025 20:43:56 +0200 Subject: [PATCH 04/11] formatting (#871) --- src/comsock.c | 70 ++++++++++++++++++++++++++------------------------- test/test.c | 4 +-- 2 files changed, 38 insertions(+), 36 deletions(-) diff --git a/src/comsock.c b/src/comsock.c index 523dd149f..76fff90fa 100644 --- a/src/comsock.c +++ b/src/comsock.c @@ -137,8 +137,8 @@ natsSock_ShuffleIPs(natsSockCtx *ctx, struct addrinfo **tmp, int tmpSize, struct void resetDeadline(natsSockCtx* ctx, int64_t start, int64_t totalTimeout) { // If there was a deadline, reset the deadline with whatever is left. - if (totalTimeout > 0) - { + if (totalTimeout > 0) + { int64_t used = nats_Now() - start; int64_t left = totalTimeout - used; @@ -200,38 +200,40 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) max = 2; for (i = 0; i < max; i++) - { - struct addrinfo hints; - struct addrinfo* servinfo = NULL; - int count = 0; - struct addrinfo* p; - - memset(&hints, 0, sizeof(hints)); - hints.ai_socktype = SOCK_STREAM; - - switch (ctx->orderIP) - { - case 4: hints.ai_family = AF_INET; break; - case 6: hints.ai_family = AF_INET6; break; - case 46: hints.ai_family = (i == 0 ? AF_INET : AF_INET6); break; - case 64: hints.ai_family = (i == 0 ? AF_INET6 : AF_INET); break; - default: hints.ai_family = AF_UNSPEC; - } - - if ((res = getaddrinfo(host, sport, &hints, &servinfo)) != 0) - { - s = nats_setError(NATS_SYS_ERROR, "getaddrinfo error: %s", - gai_strerror(res)); - continue; - } - servInfos[numServInfo] = servinfo; - for (p = servinfo; (p != NULL); p = p->ai_next) - { - count++; - numIPs++; - } - natsSock_ShuffleIPs(ctx, tmpStorage, sizeof(tmpStorage), &(servInfos[numServInfo]), count); - numServInfo++; + { + struct addrinfo hints; + struct addrinfo* servinfo = NULL; + int count = 0; + struct addrinfo* p; + + memset(&hints, 0, sizeof(hints)); + hints.ai_socktype = SOCK_STREAM; + + switch (ctx->orderIP) + { + case 4: hints.ai_family = AF_INET; break; + case 6: hints.ai_family = AF_INET6; break; + case 46: hints.ai_family = (i == 0 ? AF_INET : AF_INET6); break; + case 64: hints.ai_family = (i == 0 ? AF_INET6 : AF_INET); break; + default: hints.ai_family = AF_UNSPEC; + } + + if ((res = getaddrinfo(host, sport, &hints, &servinfo)) != 0) + { + s = nats_setError(NATS_SYS_ERROR, "getaddrinfo error: %s", gai_strerror(res)); + continue; + } + + servInfos[numServInfo] = servinfo; + + for (p = servinfo; (p != NULL); p = p->ai_next) + { + count++; + numIPs++; + } + + natsSock_ShuffleIPs(ctx, tmpStorage, sizeof(tmpStorage), &(servInfos[numServInfo]), count); + numServInfo++; } // If we got a getaddrinfo() and there is no servInfos to try to connect to // bail out now. diff --git a/test/test.c b/test/test.c index ad76f29b8..9f55d5768 100644 --- a/test/test.c +++ b/test/test.c @@ -2901,8 +2901,8 @@ void test_natsOptions(void) test("Set Max Pending Bytes : "); s = natsOptions_SetMaxPendingBytes(opts, 1000000); testCond((s == NATS_OK) && (opts->maxPendingBytes == 1000000)) - - test("Set Proxy Connection Handler: ") + + test("Set Proxy Connection Handler: ") s = natsOptions_SetProxyConnHandler(opts, _dummyProxyConnHandler); testCond(s == NATS_OK && opts->proxyConnectCb == _dummyProxyConnHandler) From 8bafa679b7f9a432afe732e9416b46584717ff70 Mon Sep 17 00:00:00 2001 From: wolfkor Date: Sat, 26 Jul 2025 21:05:52 +0200 Subject: [PATCH 05/11] formatting (#871) --- src/comsock.c | 152 +++++++++++++++++++++++++------------------------- 1 file changed, 76 insertions(+), 76 deletions(-) diff --git a/src/comsock.c b/src/comsock.c index 76fff90fa..2e00d2cbb 100644 --- a/src/comsock.c +++ b/src/comsock.c @@ -237,95 +237,95 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) } // If we got a getaddrinfo() and there is no servInfos to try to connect to // bail out now. - if ((s != NATS_OK) && (numServInfo == 0)) - return NATS_UPDATE_ERR_STACK(s); + if ((s != NATS_OK) && (numServInfo == 0)) + return NATS_UPDATE_ERR_STACK(s); // I don't think it can be the case if s == OK and/or numServInfo >= 1... if (numIPs == 0) - { - for (i = 0; i < numServInfo; i++) - nats_FreeAddrInfo(servInfos[i]); - - return NATS_UPDATE_ERR_STACK(NATS_NO_SERVER); + { + for (i = 0; i < numServInfo; i++) + nats_FreeAddrInfo(servInfos[i]); + + return NATS_UPDATE_ERR_STACK(NATS_NO_SERVER); } // Check if there has been a deadline set. totalTimeout = natsDeadline_GetTimeout(&(ctx->writeDeadline)); if (totalTimeout > 0) - { - // If so, compute a timeout based on the number of IPs we are going - // to possibly try to connect to. - timeoutPerIP = totalTimeout / numIPs; - // If really small, give at least a 10ms timeout - if (timeoutPerIP < 10) - timeoutPerIP = 10; + { + // If so, compute a timeout based on the number of IPs we are going + // to possibly try to connect to. + timeoutPerIP = totalTimeout / numIPs; + // If really small, give at least a 10ms timeout + if (timeoutPerIP < 10) + timeoutPerIP = 10; } for (i=0; iai_next) - { - ctx->fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); - if (ctx->fd == NATS_SOCK_INVALID) - { - s = nats_setError(NATS_SYS_ERROR, "socket error: %d", NATS_SOCK_GET_ERROR); - continue; - } - - // Reset 's' for this loop iteration. - s = NATS_OK; + { + struct addrinfo *p; + + for (p = servInfos[i]; (p != NULL); p = p->ai_next) + { + ctx->fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (ctx->fd == NATS_SOCK_INVALID) + { + s = nats_setError(NATS_SYS_ERROR, "socket error: %d", NATS_SOCK_GET_ERROR); + continue; + } + + // Reset 's' for this loop iteration. + s = NATS_OK; #ifdef SO_NOSIGPIPE - int set = 1; - if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1) - { - s = nats_setError(NATS_SYS_ERROR, - "setsockopt SO_NOSIGPIPE error: %d", - NATS_SOCK_GET_ERROR); - } -#endif - if (s == NATS_OK) - s = natsSock_SetBlocking(ctx->fd, false); - - if (s == NATS_OK) - { - res = connect(ctx->fd, p->ai_addr, (natsSockLen) p->ai_addrlen); - if ((res == NATS_SOCK_ERROR) - && (NATS_SOCK_GET_ERROR == NATS_SOCK_CONNECT_IN_PROGRESS)) - { - if (timeoutPerIP > 0) - natsDeadline_Init(&(ctx->writeDeadline), timeoutPerIP); - - s = natsSock_WaitReady(WAIT_FOR_CONNECT, ctx); - if ((s == NATS_OK) && !natsSock_IsConnected(ctx->fd)) - s = NATS_TIMEOUT; - } - else if (res == NATS_SOCK_ERROR) - { - s = nats_setDefaultError(NATS_NO_SERVER); - } - } - - if (s == NATS_OK) - { - s = natsSock_SetCommonTcpOptions(ctx->fd); - // We have connected OK and completed setting options, so we are done. - if (s == NATS_OK) - break; - } - - _closeFd(ctx->fd); - ctx->fd = NATS_SOCK_INVALID; - } - if (s == NATS_OK) - { - // Clear the error stack in case we got errors in the loop until - // being able to successfully connect. - nats_clearLastError(); - break; - } + int set = 1; + if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1) + { + s = nats_setError(NATS_SYS_ERROR, + "setsockopt SO_NOSIGPIPE error: %d", + NATS_SOCK_GET_ERROR); + } +#endif + if (s == NATS_OK) + s = natsSock_SetBlocking(ctx->fd, false); + + if (s == NATS_OK) + { + res = connect(ctx->fd, p->ai_addr, (natsSockLen) p->ai_addrlen); + if ((res == NATS_SOCK_ERROR) + && (NATS_SOCK_GET_ERROR == NATS_SOCK_CONNECT_IN_PROGRESS)) + { + if (timeoutPerIP > 0) + natsDeadline_Init(&(ctx->writeDeadline), timeoutPerIP); + + s = natsSock_WaitReady(WAIT_FOR_CONNECT, ctx); + if ((s == NATS_OK) && !natsSock_IsConnected(ctx->fd)) + s = NATS_TIMEOUT; + } + else if (res == NATS_SOCK_ERROR) + { + s = nats_setDefaultError(NATS_NO_SERVER); + } + } + + if (s == NATS_OK) + { + s = natsSock_SetCommonTcpOptions(ctx->fd); + // We have connected OK and completed setting options, so we are done. + if (s == NATS_OK) + break; + } + + _closeFd(ctx->fd); + ctx->fd = NATS_SOCK_INVALID; + } + if (s == NATS_OK) + { + // Clear the error stack in case we got errors in the loop until + // being able to successfully connect. + nats_clearLastError(); + break; + } } for (i=0; i Date: Sat, 26 Jul 2025 21:16:52 +0200 Subject: [PATCH 06/11] formatting (#871) --- src/comsock.c | 52 +++++++++++++++++++++++++-------------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/src/comsock.c b/src/comsock.c index 2e00d2cbb..ea4e9bc6f 100644 --- a/src/comsock.c +++ b/src/comsock.c @@ -237,13 +237,13 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) } // If we got a getaddrinfo() and there is no servInfos to try to connect to // bail out now. - if ((s != NATS_OK) && (numServInfo == 0)) + if ((s != NATS_OK) && (numServInfo == 0)) return NATS_UPDATE_ERR_STACK(s); // I don't think it can be the case if s == OK and/or numServInfo >= 1... if (numIPs == 0) { - for (i = 0; i < numServInfo; i++) + for (i = 0; i < numServInfo; i++) nats_FreeAddrInfo(servInfos[i]); return NATS_UPDATE_ERR_STACK(NATS_NO_SERVER); @@ -252,39 +252,39 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) // Check if there has been a deadline set. totalTimeout = natsDeadline_GetTimeout(&(ctx->writeDeadline)); if (totalTimeout > 0) - { - // If so, compute a timeout based on the number of IPs we are going - // to possibly try to connect to. - timeoutPerIP = totalTimeout / numIPs; - // If really small, give at least a 10ms timeout - if (timeoutPerIP < 10) + { + // If so, compute a timeout based on the number of IPs we are going + // to possibly try to connect to. + timeoutPerIP = totalTimeout / numIPs; + // If really small, give at least a 10ms timeout + if (timeoutPerIP < 10) timeoutPerIP = 10; } for (i=0; iai_next) - { - ctx->fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); - if (ctx->fd == NATS_SOCK_INVALID) - { - s = nats_setError(NATS_SYS_ERROR, "socket error: %d", NATS_SOCK_GET_ERROR); - continue; + for (p = servInfos[i]; (p != NULL); p = p->ai_next) + { + ctx->fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (ctx->fd == NATS_SOCK_INVALID) + { + s = nats_setError(NATS_SYS_ERROR, "socket error: %d", NATS_SOCK_GET_ERROR); + continue; } - // Reset 's' for this loop iteration. + // Reset 's' for this loop iteration. s = NATS_OK; -#ifdef SO_NOSIGPIPE - int set = 1; - if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1) +#ifdef SO_NOSIGPIPE + int set = 1; + if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1) { - s = nats_setError(NATS_SYS_ERROR, - "setsockopt SO_NOSIGPIPE error: %d", - NATS_SOCK_GET_ERROR); - } + s = nats_setError(NATS_SYS_ERROR, + "setsockopt SO_NOSIGPIPE error: %d", + NATS_SOCK_GET_ERROR); + } #endif if (s == NATS_OK) s = natsSock_SetBlocking(ctx->fd, false); @@ -297,7 +297,7 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) { if (timeoutPerIP > 0) natsDeadline_Init(&(ctx->writeDeadline), timeoutPerIP); - + s = natsSock_WaitReady(WAIT_FOR_CONNECT, ctx); if ((s == NATS_OK) && !natsSock_IsConnected(ctx->fd)) s = NATS_TIMEOUT; @@ -322,7 +322,7 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) if (s == NATS_OK) { // Clear the error stack in case we got errors in the loop until - // being able to successfully connect. + // being able to successfully connect. nats_clearLastError(); break; } From fc7b526cb1016b5e361c88fabefe7f575b1b4202 Mon Sep 17 00:00:00 2001 From: wolfkor Date: Sat, 26 Jul 2025 21:19:50 +0200 Subject: [PATCH 07/11] formatting (#871) --- src/comsock.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/comsock.c b/src/comsock.c index ea4e9bc6f..db7dc38a1 100644 --- a/src/comsock.c +++ b/src/comsock.c @@ -242,10 +242,10 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) // I don't think it can be the case if s == OK and/or numServInfo >= 1... if (numIPs == 0) - { - for (i = 0; i < numServInfo; i++) + { + for (i=0; i Date: Sat, 26 Jul 2025 21:27:27 +0200 Subject: [PATCH 08/11] formatting (#871) --- src/comsock.c | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/comsock.c b/src/comsock.c index db7dc38a1..86562e5ff 100644 --- a/src/comsock.c +++ b/src/comsock.c @@ -224,14 +224,12 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) continue; } - servInfos[numServInfo] = servinfo; - + servInfos[numServInfo] = servinfo; for (p = servinfo; (p != NULL); p = p->ai_next) { count++; numIPs++; - } - + } natsSock_ShuffleIPs(ctx, tmpStorage, sizeof(tmpStorage), &(servInfos[numServInfo]), count); numServInfo++; } @@ -273,20 +271,20 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) s = nats_setError(NATS_SYS_ERROR, "socket error: %d", NATS_SOCK_GET_ERROR); continue; } - + // Reset 's' for this loop iteration. s = NATS_OK; -#ifdef SO_NOSIGPIPE +#ifdef SO_NOSIGPIPE int set = 1; - if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1) - { + if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1) + { s = nats_setError(NATS_SYS_ERROR, "setsockopt SO_NOSIGPIPE error: %d", NATS_SOCK_GET_ERROR); } -#endif - if (s == NATS_OK) +#endif + if (s == NATS_OK) s = natsSock_SetBlocking(ctx->fd, false); if (s == NATS_OK) @@ -329,7 +327,7 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) } for (i=0; i Date: Sat, 26 Jul 2025 21:30:53 +0200 Subject: [PATCH 09/11] formatting (#871) --- src/comsock.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/comsock.c b/src/comsock.c index 86562e5ff..497a60142 100644 --- a/src/comsock.c +++ b/src/comsock.c @@ -277,11 +277,11 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) #ifdef SO_NOSIGPIPE int set = 1; - if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1) + if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1) { s = nats_setError(NATS_SYS_ERROR, - "setsockopt SO_NOSIGPIPE error: %d", - NATS_SOCK_GET_ERROR); + "setsockopt SO_NOSIGPIPE error: %d", + NATS_SOCK_GET_ERROR); } #endif if (s == NATS_OK) From 417531d37e0223ee7351a445af1d01c35ccfdf64 Mon Sep 17 00:00:00 2001 From: wolfkor Date: Sat, 26 Jul 2025 21:33:20 +0200 Subject: [PATCH 10/11] formatting (#871) --- src/comsock.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/comsock.c b/src/comsock.c index 497a60142..1d63e7314 100644 --- a/src/comsock.c +++ b/src/comsock.c @@ -280,8 +280,8 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port) if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1) { s = nats_setError(NATS_SYS_ERROR, - "setsockopt SO_NOSIGPIPE error: %d", - NATS_SOCK_GET_ERROR); + "setsockopt SO_NOSIGPIPE error: %d", + NATS_SOCK_GET_ERROR); } #endif if (s == NATS_OK) From 7a43d40ba91461493fa00cde99edc3316d9465c6 Mon Sep 17 00:00:00 2001 From: wolfkor Date: Sun, 27 Jul 2025 13:10:07 +0200 Subject: [PATCH 11/11] Test ProxyConnectCb (#871) --- test/list_test.txt | 1 + test/test.c | 74 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/test/list_test.txt b/test/list_test.txt index df12951c1..95518947c 100644 --- a/test/list_test.txt +++ b/test/list_test.txt @@ -220,6 +220,7 @@ _test(PingReconnect) _test(ProcessMsgArgs) _test(ProperFalloutAfterMaxAttempts) _test(ProperReconnectDelay) +_test(ProxyConnectCb) _test(PublishMsg) _test(PubSubWithReply) _test(QueueSubscriber) diff --git a/test/test.c b/test/test.c index 15e9b5254..d8faadf1d 100644 --- a/test/test.c +++ b/test/test.c @@ -5966,6 +5966,80 @@ void test_ReconnectServerStats(void) _destroyDefaultThreadArgs(&args); } + +static natsStatus +_proxyConnectCb(char* host, int port, natsSock* pSock) +{ + natsStatus s = NATS_OK; + SOCKADDR_IN sockaddr_in; + + natsSock fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + *pSock = fd; + if (fd == NATS_SOCK_INVALID) + return nats_setError(NATS_SYS_ERROR, "socket error: %d", NATS_SOCK_GET_ERROR); + + // In case of an existing proxy scenario we would use the proxy host and port as the socket address + // and send a connect request. If the connect request is successful (i.e. 200 Connection established), + // it is ready for use. + // For this test, we will just connect to the server directly. + + memset(&sockaddr_in, 0, sizeof(sockaddr_in)); + sockaddr_in.sin_family = AF_INET; + sockaddr_in.sin_port = htons((unsigned short)port); + inet_pton(AF_INET, host, &sockaddr_in.sin_addr); + + struct sockaddr* sockaddr = (struct sockaddr*)&sockaddr_in; + socklen_t socklen = (socklen_t)sizeof(sockaddr_in); + + s = natsSock_SetBlocking(fd, false); + if (s == NATS_OK) + { + int ret = connect(fd, sockaddr, socklen); + if (ret == NATS_SOCK_ERROR) + return nats_setDefaultError(NATS_NO_SERVER); + + s = natsSock_SetCommonTcpOptions(fd); + } + + return s; +} + +void test_ProxyConnectCb(void) +{ + natsStatus s; + natsConnection* nc = NULL; + natsOptions* opts = NULL; + natsPid serverPid = NATS_INVALID_PID; + struct threadArg args; + + test("ProxyConnectCb Fail: ") + + s = _createDefaultThreadArgsForCbTests(&args); + if (s != NATS_OK) + FAIL("Unable to setup test") + + serverPid = _startServer("nats://127.0.0.1:4222", NULL, true); + CHECK_SERVER_STARTED(serverPid) + + s = natsOptions_Create(&opts); + IFOK(s, natsOptions_SetURL(opts, NATS_DEFAULT_URL)) + IFOK(s, natsOptions_SetProxyConnHandler(opts, _dummyProxyConnHandler)) + + s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + if (s == NATS_OK) + FAIL("Test not failed!") + + test("ProxyConnectCb OK: ") + + IFOK(s, natsOptions_SetProxyConnHandler(opts, _proxyConnectCb)) + s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + + if (s != NATS_OK) + FAIL("Test failed!") + + _stopServer(serverPid); +} + static void _disconnectedCb(natsConnection *nc, void *closure) {