Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 122 additions & 110 deletions src/comsock.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015-2021 The NATS Authors
// Copyright 2015-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -152,6 +152,7 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port)
int64_t totalTimeout = 0;
int64_t timeoutPerIP = 0;
struct addrinfo *tmpStorage[64];
bool hasProxyConnectCb = ctx->proxyConnectCb != NULL;

if (phost == NULL)
return nats_setError(NATS_ADDRESS_MISSING, "%s", "No host specified");
Expand All @@ -170,139 +171,150 @@ natsSock_ConnectTcp(natsSockCtx *ctx, const char *phost, int port)

snprintf(sport, sizeof(sport), "%d", port);

if ((ctx->orderIP == 46) || (ctx->orderIP == 64))
max = 2;

start = nats_Now();

for (i=0; i<max; i++)
// Call the proxy connect callback if provided
if (hasProxyConnectCb)
{
struct addrinfo hints;
struct addrinfo *servinfo = NULL;
int count = 0;
struct addrinfo *p;

memset(&hints,0,sizeof(hints));
hints.ai_socktype = SOCK_STREAM;
// Invoke the proxy connect callback.
ctx->fd = ctx->proxyConnectCb(host, port);
if (ctx->fd == NATS_SOCK_INVALID)
s = nats_setError(NATS_SYS_ERROR, "proxy connect socket error: %d", NATS_SOCK_GET_ERROR);
}
else
{
if ((ctx->orderIP == 46) || (ctx->orderIP == 64))
max = 2;

switch (ctx->orderIP)
for (i = 0; i < max; i++)
{
struct addrinfo hints;
struct addrinfo* servinfo = NULL;
int count = 0;
struct addrinfo* p;

memset(&hints, 0, sizeof(hints));
hints.ai_socktype = SOCK_STREAM;

switch (ctx->orderIP)
{
case 4: hints.ai_family = AF_INET; break;
case 6: hints.ai_family = AF_INET6; break;
case 46: hints.ai_family = (i == 0 ? AF_INET : AF_INET6); break;
case 64: hints.ai_family = (i == 0 ? AF_INET6 : AF_INET); break;
default: hints.ai_family = AF_UNSPEC;
}

if ((res = getaddrinfo(host, sport, &hints, &servinfo)) != 0)
{
s = nats_setError(NATS_SYS_ERROR, "getaddrinfo error: %s",
gai_strerror(res));
continue;
}
servInfos[numServInfo] = servinfo;
for (p = servinfo; (p != NULL); p = p->ai_next)
{
count++;
numIPs++;
}
natsSock_ShuffleIPs(ctx, tmpStorage, sizeof(tmpStorage), &(servInfos[numServInfo]), count);
numServInfo++;
}
// If we got a getaddrinfo() and there is no servInfos to try to connect to
// bail out now.
if ((s != NATS_OK) && (numServInfo == 0))
return NATS_UPDATE_ERR_STACK(s);

if ((res = getaddrinfo(host, sport, &hints, &servinfo)) != 0)
// I don't think it can be the case if s == OK and/or numServInfo >= 1...
if (numIPs == 0)
{
s = nats_setError(NATS_SYS_ERROR, "getaddrinfo error: %s",
gai_strerror(res));
continue;
for (i = 0; i < numServInfo; i++)
nats_FreeAddrInfo(servInfos[i]);

return NATS_UPDATE_ERR_STACK(NATS_NO_SERVER);
}
servInfos[numServInfo] = servinfo;
for (p = servinfo; (p != NULL); p = p->ai_next)

// Check if there has been a deadline set.
totalTimeout = natsDeadline_GetTimeout(&(ctx->writeDeadline));
if (totalTimeout > 0)
{
count++;
numIPs++;
// If so, compute a timeout based on the number of IPs we are going
// to possibly try to connect to.
timeoutPerIP = totalTimeout / numIPs;
// If really small, give at least a 10ms timeout
if (timeoutPerIP < 10)
timeoutPerIP = 10;
}
natsSock_ShuffleIPs(ctx, tmpStorage, sizeof(tmpStorage), &(servInfos[numServInfo]), count);
numServInfo++;
}
// If we got a getaddrinfo() and there is no servInfos to try to connect to
// bail out now.
if ((s != NATS_OK) && (numServInfo == 0))
return NATS_UPDATE_ERR_STACK(s);

// I don't think it can be the case if s == OK and/or numServInfo >= 1...
if (numIPs == 0)
{
for (i=0; i<numServInfo; i++)
nats_FreeAddrInfo(servInfos[i]);

return NATS_UPDATE_ERR_STACK(NATS_NO_SERVER);
}

// Check if there has been a deadline set.
totalTimeout = natsDeadline_GetTimeout(&(ctx->writeDeadline));
if (totalTimeout > 0)
{
// If so, compute a timeout based on the number of IPs we are going
// to possibly try to connect to.
timeoutPerIP = totalTimeout / numIPs;
// If really small, give at least a 10ms timeout
if (timeoutPerIP < 10)
timeoutPerIP = 10;
}

for (i=0; i<numServInfo; i++)
{
struct addrinfo *p;
for (i=0; i<numServInfo; i++)
{
struct addrinfo *p;

for (p = servInfos[i]; (p != NULL); p = p->ai_next)
{
ctx->fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if (ctx->fd == NATS_SOCK_INVALID)
{
s = nats_setError(NATS_SYS_ERROR, "socket error: %d", NATS_SOCK_GET_ERROR);
continue;
}
for (p = servInfos[i]; (p != NULL); p = p->ai_next)
{
ctx->fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if (ctx->fd == NATS_SOCK_INVALID)
{
s = nats_setError(NATS_SYS_ERROR, "socket error: %d", NATS_SOCK_GET_ERROR);
continue;
}

// Reset 's' for this loop iteration.
s = NATS_OK;
// Reset 's' for this loop iteration.
s = NATS_OK;

#ifdef SO_NOSIGPIPE
int set = 1;
if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1)
{
s = nats_setError(NATS_SYS_ERROR,
int set = 1;
if (setsockopt(ctx->fd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&set, sizeof(int)) == -1)
{
s = nats_setError(NATS_SYS_ERROR,
"setsockopt SO_NOSIGPIPE error: %d",
NATS_SOCK_GET_ERROR);
}
}
#endif
if (s == NATS_OK)
s = natsSock_SetBlocking(ctx->fd, false);

if (s == NATS_OK)
{
res = connect(ctx->fd, p->ai_addr, (natsSockLen) p->ai_addrlen);
if ((res == NATS_SOCK_ERROR)
&& (NATS_SOCK_GET_ERROR == NATS_SOCK_CONNECT_IN_PROGRESS))
{
if (timeoutPerIP > 0)
natsDeadline_Init(&(ctx->writeDeadline), timeoutPerIP);

s = natsSock_WaitReady(WAIT_FOR_CONNECT, ctx);
if ((s == NATS_OK) && !natsSock_IsConnected(ctx->fd))
s = NATS_TIMEOUT;
}
else if (res == NATS_SOCK_ERROR)
{
s = nats_setDefaultError(NATS_NO_SERVER);
}
}

if (s == NATS_OK)
{
s = natsSock_SetCommonTcpOptions(ctx->fd);
// We have connected OK and completed setting options, so we are done.
if (s == NATS_OK)
break;
}

_closeFd(ctx->fd);
ctx->fd = NATS_SOCK_INVALID;
}
if (s == NATS_OK)
{
// Clear the error stack in case we got errors in the loop until
// being able to successfully connect.
nats_clearLastError();
break;
}
if (s == NATS_OK)
s = natsSock_SetBlocking(ctx->fd, false);

if (s == NATS_OK)
{
res = connect(ctx->fd, p->ai_addr, (natsSockLen) p->ai_addrlen);
if ((res == NATS_SOCK_ERROR)
&& (NATS_SOCK_GET_ERROR == NATS_SOCK_CONNECT_IN_PROGRESS))
{
if (timeoutPerIP > 0)
natsDeadline_Init(&(ctx->writeDeadline), timeoutPerIP);

s = natsSock_WaitReady(WAIT_FOR_CONNECT, ctx);
if ((s == NATS_OK) && !natsSock_IsConnected(ctx->fd))
s = NATS_TIMEOUT;
}
else if (res == NATS_SOCK_ERROR)
{
s = nats_setDefaultError(NATS_NO_SERVER);
}
}

if (s == NATS_OK)
{
s = natsSock_SetCommonTcpOptions(ctx->fd);
// We have connected OK and completed setting options, so we are done.
if (s == NATS_OK)
break;
}

_closeFd(ctx->fd);
ctx->fd = NATS_SOCK_INVALID;
}
if (s == NATS_OK)
{
// Clear the error stack in case we got errors in the loop until
// being able to successfully connect.
nats_clearLastError();
break;
}
}

for (i=0; i<numServInfo; i++)
nats_FreeAddrInfo(servInfos[i]);
}
for (i=0; i<numServInfo; i++)
nats_FreeAddrInfo(servInfos[i]);

// If there was a deadline, reset the deadline with whatever is left.
if (totalTimeout > 0)
Expand Down
5 changes: 4 additions & 1 deletion src/conn.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015-2024 The NATS Authors
// Copyright 2015-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -403,6 +403,9 @@ _createConn(natsConnection *nc)
// Set ctx.noRandomize based on public NoRandomize option.
nc->sockCtx.noRandomize = nc->opts->noRandomize;

// Set the proxy connect callback
nc->sockCtx.proxyConnectCb = nc->opts->proxyConnectCb;

s = natsSock_ConnectTcp(&(nc->sockCtx), nc->cur->url->host, nc->cur->url->port);
if (s == NATS_OK)
nc->sockCtx.fdActive = true;
Expand Down
21 changes: 21 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,14 @@ typedef void (*natsMsgHandler)(
typedef void (*natsConnectionHandler)(
natsConnection *nc, void *closure);

/** \brief Callback used to handle connections via proxy.
*
* This callback is used to handle proxy connection. It is invoked before the
* main connections and return the socket that will be used to connect to the server.
*/
typedef natsSock (*natsProxyConnHandler)(
char* host, int port);

/** \brief Callback used to notify the user of errors encountered while processing
* inbound messages.
*
Expand Down Expand Up @@ -2951,6 +2959,19 @@ natsOptions_SetMaxPendingMsgs(natsOptions *opts, int maxPending);
NATS_EXTERN natsStatus
natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending);

/** \brief Sets the proxy connection handler.
*
* Specifies the callback to invoke for proxy connection returning the socket to use.
*
* @see natsProxyConnHandler
*
* @param opts the pointer to the #natsOptions object.
* @param proxyConnHandler the proxy connection handler callback.
* the callback. `closure` can be `NULL`.
*/
NATS_EXTERN natsStatus
natsOptions_SetProxyConnHandler(natsOptions* opts, natsProxyConnHandler proxyConnHandler);

/** \brief Sets the error handler for asynchronous events.
*
* Specifies the callback to invoke when an asynchronous error
Expand Down
20 changes: 12 additions & 8 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ struct __natsOptions
natsConnectionHandler microClosedCb;
natsErrHandler microAsyncErrCb;

natsProxyConnHandler proxyConnectCb;

int64_t pingInterval;
int maxPingsOut;
int maxPendingMsgs;
Expand Down Expand Up @@ -644,22 +646,24 @@ typedef struct __natsPongList

typedef struct __natsSockCtx
{
natsSock fd;
bool fdActive;
natsSock fd;
bool fdActive;

natsDeadline readDeadline;
natsDeadline writeDeadline;
natsDeadline readDeadline;
natsDeadline writeDeadline;

SSL *ssl;
SSL *ssl;

// This is true when we are using an external event loop (such as libuv).
bool useEventLoop;
bool useEventLoop;

int orderIP; // possible values: 0,4,6,46,64
int orderIP; // possible values: 0,4,6,46,64

// By default, the list of IPs returned by the hostname resolution will
// be shuffled. This option, if `true`, will disable the shuffling.
bool noRandomize;
bool noRandomize;

natsProxyConnHandler proxyConnectCb;

} natsSockCtx;

Expand Down
14 changes: 13 additions & 1 deletion src/opts.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015-2024 The NATS Authors
// Copyright 2015-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -989,6 +989,18 @@ natsOptions_SetMaxPendingBytes(natsOptions* opts, int64_t maxPending)
return NATS_OK;
}

natsStatus
natsOptions_SetProxyConnHandler(natsOptions* opts, natsProxyConnHandler proxyConnHandler)
{
LOCK_AND_CHECK_OPTIONS(opts, 0)

opts->proxyConnectCb = proxyConnHandler;

UNLOCK_OPTS(opts);

return NATS_OK;
}

natsStatus
natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler,
void *closure)
Expand Down
Loading