Skip to content

Commit b916b40

Browse files
committed
Fix various issues with RMQ publish confirmations
* resolve bug where an ACK was being consumed by our "blocking" check
* fix bug where integrity hash PDUs were not being included in the set of unconfirmed publishes
* reduce ACK fetching timeout to a single second
* fix bug where saved message counts were not properly reset if all of the messages of that type were ACKed but messages of another type remained unACKed.
* do not destroy the RMQ socket if the ACK check times out, as we can just try again later
* fix segfault due to continuing to process data or timer events after another event had caused us to tear down the RMQ connection
1 parent 2b502ab commit b916b40

File tree

4 files changed

+63
-88
lines changed

4 files changed

+63
-88
lines changed

src/mediator/coll_recv_thread.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@
3636
#include "med_epoll.h"
3737
#include "etsili_core.h"
3838

39+
#define COLL_OUTSTANDING_PUB_CONFIRMS(col) \
40+
(col->saved_iri_msg_cnt > 0 || col->saved_cc_msg_cnt > 0 || \
41+
col->saved_raw_msg_cnt > 0)
42+
3943
/** This file implements a "collector receive" thread for the OpenLI mediator.
4044
* Each OpenLI collector that reports to a mediator will be handled using
4145
* a separate instance of one of these threads.
@@ -1116,6 +1120,7 @@ static int receive_collector(coll_recv_t *col, med_epoll_ev_t *mev) {
11161120

11171121
processacks:
11181122
if (col->amqp_producer_state &&
1123+
COLL_OUTSTANDING_PUB_CONFIRMS(col) &&
11191124
consume_mediator_RMQ_producer_acks(col) == 0) {
11201125
/* RMQ failed to acknowledge everything we published, have to
11211126
* reconnect and re-publish
@@ -1620,6 +1625,13 @@ static void *start_collector_thread(void *params) {
16201625
move_thread_into_error_state(col, 1);
16211626
break;
16221627
}
1628+
if (col->amqp_producer_state == NULL) {
1629+
/* We've dropped our internal RMQ session -- no point
1630+
* in continuing processing data / responding to timer
1631+
* events until we've had a chance to reconnect
1632+
*/
1633+
break;
1634+
}
16231635
}
16241636
}
16251637
/* If we get here, the message timer expired -- loop around and

src/mediator/coll_recv_thread.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
*
5252
*/
5353

54-
#define MAX_SAVED_RECEIVED_DATA 32
54+
#define MAX_SAVED_RECEIVED_DATA 256
5555

5656
/** Types of messages that can be sent between the main mediator thread and a
5757
* collector receive thread.

src/mediator/mediator_integrity_check.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,12 @@ uint8_t send_integrity_check_hash_pdu(coll_recv_t *col,
453453

454454
encres = generate_integrity_check_hash_pdu(ics, medid, operatorid,
455455
col->etsiencoder, col->etsidecoder);
456+
if (collrecv_save_message(col, (unsigned char *)ics->liid, encres->encoded,
457+
encres->len, ics->msgtype) < 0) {
458+
wandder_release_encoded_result(col->etsiencoder, encres);
459+
return -1;
460+
}
461+
456462
if (ics->msgtype == OPENLI_PROTO_ETSI_CC) {
457463
r = publish_cc_on_mediator_liid_RMQ_queue(col->amqp_producer_state,
458464
encres->encoded, encres->len, found->liid,
@@ -522,6 +528,9 @@ int send_integrity_check_sign_pdu(coll_recv_t *col,
522528

523529
if (collrecv_save_message(col, (unsigned char *)ics->liid, encres->encoded,
524530
encres->len, ics->msgtype) < 0) {
531+
if (operatorid) {
532+
free(operatorid);
533+
}
525534
wandder_release_encoded_result(col->etsiencoder, encres);
526535
return -1;
527536
}

src/mediator/mediator_rmq.c

Lines changed: 41 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -146,70 +146,6 @@ static int register_RMQ_consumer(amqp_connection_state_t state,
146146
return 0;
147147
}
148148

149-
static int update_mediator_rmq_connection_block_status(
150-
amqp_connection_state_t state, uint8_t *is_blocked) {
151-
152-
/* copy of code from export_buffer.c, but with different log
153-
* messages when things go awry
154-
*/
155-
amqp_frame_t frame;
156-
struct timeval tv;
157-
int x, ret;
158-
159-
tv.tv_sec = tv.tv_usec = 0;
160-
x = amqp_simple_wait_frame_noblock(state, &frame, &tv);
161-
162-
if (x != AMQP_STATUS_OK && x != AMQP_STATUS_TIMEOUT) {
163-
logger(LOG_INFO,
164-
"OpenLI mediator: unable to check status of an internal RMQ publishing socket");
165-
return -1;
166-
}
167-
168-
if (*is_blocked) {
169-
ret = 0;
170-
} else {
171-
ret = 1;
172-
}
173-
174-
if (x == AMQP_STATUS_TIMEOUT) {
175-
return ret;
176-
}
177-
178-
if (AMQP_FRAME_METHOD == frame.frame_type) {
179-
switch(frame.payload.method.id) {
180-
case AMQP_CONNECTION_BLOCKED_METHOD:
181-
if ((*is_blocked) == 0) {
182-
logger(LOG_INFO,
183-
"OpenLI mediator: RMQ is unable to handle any more published ETSI records!");
184-
logger(LOG_INFO,
185-
"OpenLI mediator: this is a SERIOUS problem -- received ETSI records are going to be dropped!");
186-
}
187-
*is_blocked = 1;
188-
ret = 0;
189-
break;
190-
case AMQP_CONNECTION_UNBLOCKED_METHOD:
191-
if ((*is_blocked) == 1) {
192-
logger(LOG_INFO,
193-
"OpenLI mediator: RMQ has become unblocked and will resume publishing ETSI records.");
194-
ret = 0;
195-
} else {
196-
ret = 1;
197-
}
198-
*is_blocked = 0;
199-
break;
200-
case AMQP_CONNECTION_CLOSE_METHOD:
201-
logger(LOG_INFO,
202-
"OpenLI mediator: 'close' exception occurred on an internal RMQ connection -- must restart connection");
203-
return -1;
204-
case AMQP_CHANNEL_CLOSE_METHOD:
205-
logger(LOG_INFO,
206-
"OpenLI mediator: channel exception occurred on an internal RMQ connection -- must reset connection");
207-
return -1;
208-
}
209-
}
210-
return ret;
211-
}
212-
213149
/** Disables consumption from a RabbitMQ queue by an existing connection
214150
*
215151
* @param state The RMQ connection to disassociate the queue from
@@ -253,11 +189,6 @@ int declare_mediator_liid_RMQ_queue(amqp_connection_state_t state,
253189
return 0;
254190
}
255191

256-
/*
257-
if (update_mediator_rmq_connection_block_status(state, is_blocked) < 0) {
258-
return -1;
259-
}
260-
*/
261192
if (*is_blocked) {
262193
return 0;
263194
}
@@ -285,11 +216,7 @@ int declare_mediator_rawip_RMQ_queue(amqp_connection_state_t state,
285216
char *liid, uint8_t *is_blocked) {
286217

287218
char queuename[1024];
288-
/*
289-
if (update_mediator_rmq_connection_block_status(state, is_blocked) < 0) {
290-
return -1;
291-
}
292-
*/
219+
293220
if (*is_blocked == 0) {
294221
snprintf(queuename, 1024, "%s-rawip", liid);
295222
return declare_RMQ_queue(state, queuename, 4);
@@ -332,10 +259,6 @@ static int produce_mediator_RMQ(amqp_connection_state_t state,
332259
props.expiration = amqp_cstring_bytes(expirystr);
333260
}
334261

335-
if (update_mediator_rmq_connection_block_status(state, is_blocked) < 0) {
336-
return -1;
337-
}
338-
339262
if ((*is_blocked) == 0) {
340263
pub_ret = amqp_basic_publish(state, channel, amqp_cstring_bytes(""),
341264
amqp_cstring_bytes(queuename), 0, 0, &props, message_bytes);
@@ -992,7 +915,7 @@ static int consume_other_frame(amqp_connection_state_t state) {
992915
int consume_mediator_RMQ_producer_acks(coll_recv_t *col) {
993916

994917
size_t i;
995-
int r, elapsed;
918+
int r;
996919
amqp_frame_t frame;
997920
saved_received_data_t *sav;
998921
struct timeval tv;
@@ -1027,7 +950,6 @@ int consume_mediator_RMQ_producer_acks(coll_recv_t *col) {
1027950
}
1028951
}
1029952

1030-
elapsed = 0;
1031953
while (cc_await > 0 || iri_await > 0 || raw_await > 0) {
1032954
/* keep consuming until we get an ACK or a NACK -- everything else
1033955
* can just be ignored (note that this will block until we get what
@@ -1038,13 +960,25 @@ int consume_mediator_RMQ_producer_acks(coll_recv_t *col) {
1038960
&tv);
1039961

1040962
if (r == AMQP_STATUS_TIMEOUT) {
1041-
elapsed ++;
1042-
if (elapsed >= 3) {
1043-
/* we've gone 3 seconds with no acknowledgement, we'll
1044-
* have to retry the publish
1045-
*/
1046-
return 0;
963+
/* No acknowledgements this second, we need to move on instead so
964+
* that we're not blocking and hope that something turns up
965+
* next time we check for acks.
966+
*/
967+
if (cc_await == 0) {
968+
col->saved_cc_msg_cnt = 0;
969+
}
970+
if (iri_await == 0) {
971+
col->saved_iri_msg_cnt = 0;
972+
}
973+
if (raw_await == 0) {
974+
col->saved_raw_msg_cnt = 0;
1047975
}
976+
if (col->saved_raw_msg_cnt < MAX_SAVED_RECEIVED_DATA &&
977+
col->saved_iri_msg_cnt < MAX_SAVED_RECEIVED_DATA &&
978+
col->saved_cc_msg_cnt < MAX_SAVED_RECEIVED_DATA) {
979+
col->queue_full = 0;
980+
}
981+
return 1;
1048982
} else if (r != AMQP_STATUS_OK) {
1049983
/* broker failure, must retry */
1050984
return 0;
@@ -1055,7 +989,12 @@ int consume_mediator_RMQ_producer_acks(coll_recv_t *col) {
1055989

1056990
switch(frame.payload.method.id) {
1057991
case AMQP_CONNECTION_CLOSE_METHOD:
992+
logger(LOG_INFO,
993+
"OpenLI mediator: 'close' exception occurred on an internal RMQ connection -- must restart connection");
994+
return 0;
1058995
case AMQP_CHANNEL_CLOSE_METHOD:
996+
logger(LOG_INFO,
997+
"OpenLI mediator: channel exception occurred on an internal RMQ connection -- must reset connection");
1059998
return 0;
1060999
case AMQP_BASIC_ACK_METHOD:
10611000
ack = (amqp_basic_ack_t *)frame.payload.method.decoded;
@@ -1078,7 +1017,6 @@ int consume_mediator_RMQ_producer_acks(coll_recv_t *col) {
10781017
} else {
10791018
break;
10801019
}
1081-
10821020
while (*start < MAX_SAVED_RECEIVED_DATA && *await > 0) {
10831021
saved_received_data_t *next = &(sav[*start]);
10841022

@@ -1103,6 +1041,22 @@ int consume_mediator_RMQ_producer_acks(coll_recv_t *col) {
11031041

11041042
case AMQP_BASIC_NACK_METHOD:
11051043
return 0;
1044+
case AMQP_CONNECTION_BLOCKED_METHOD:
1045+
if ((col->rmq_blocked) == 0) {
1046+
logger(LOG_INFO,
1047+
"OpenLI mediator: RMQ is unable to handle any more published ETSI records!");
1048+
logger(LOG_INFO,
1049+
"OpenLI mediator: this is a SERIOUS problem -- received ETSI records are going to be dropped!");
1050+
}
1051+
col->rmq_blocked = 1;
1052+
break;
1053+
case AMQP_CONNECTION_UNBLOCKED_METHOD:
1054+
if ((col->rmq_blocked) == 1) {
1055+
logger(LOG_INFO,
1056+
"OpenLI mediator: RMQ has become unblocked and will resume publishing ETSI records.");
1057+
}
1058+
col->rmq_blocked = 0;
1059+
break;
11061060
}
11071061
}
11081062
}

0 commit comments

Comments
 (0)