Skip to content

Commit 2b502ab

Browse files
committed
Resolve issues with integrity signing request pipeline
* Replace the libtrace message queues used for sending requests with ZMQ.
* Include the forwarding thread ID in requests and responses, so that each response is routed back to the collector thread that asked for it.
* Use entirely string-based keys to identify integrity "chains", because a 4-byte CIN in which one of the bytes was 0x00 would cause problems.
* Add an extra map that stores the string key of each chain associated with an LIID, so the key can be looked up using a uint64_t hash rather than being constructed every time it is needed.
1 parent 2b6a862 commit 2b502ab

File tree

9 files changed

+216
-54
lines changed

9 files changed

+216
-54
lines changed

configure.ac

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ AC_CHECK_LIB([uuid], [uuid_parse],,libuuid_found=0)
5252

5353
if test "x$libzmq_found" = "x1"; then
5454
COLLECTOR_LIBS="$COLLECTOR_LIBS -lzmq"
55+
MEDIATOR_LIBS="$MEDIATOR_LIBS -lzmq"
5556
fi
5657

5758
if test "x$libssl11_found" = "x1"; then

src/mediator/coll_recv_thread.c

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ static void remove_expired_liid_queues(coll_recv_t *col) {
331331
etsili_clear_preencoded_fields(known->preencoded_etsi);
332332
free(known->preencoded_etsi);
333333
}
334+
clear_digest_key_map(known);
334335
HASH_DELETE(hh, col->known_liids, known);
335336
free(known);
336337
}
@@ -677,6 +678,7 @@ static col_known_liid_t *create_new_known_liid(coll_recv_t *col,
677678
found->provisioner_withdrawn = 0;
678679
found->preencoded_etsi = calloc(OPENLI_PREENCODE_LAST,
679680
sizeof(wandder_encode_job_t));
681+
found->digest_cin_keys = NULL;
680682

681683
snprintf(qname, 1024, "%s-iri", found->liid);
682684
found->queuenames[0] = strdup(qname);
@@ -1301,10 +1303,15 @@ static void cleanup_collector_thread(coll_recv_t *col) {
13011303
etsili_clear_preencoded_fields(known->preencoded_etsi);
13021304
free(known->preencoded_etsi);
13031305
}
1306+
clear_digest_key_map(known);
13041307
HASH_DELETE(hh, col->known_liids, known);
13051308
free(known);
13061309
}
13071310

1311+
if (col->zmq_requests) {
1312+
zmq_close(col->zmq_requests);
1313+
}
1314+
13081315
if (col->ipaddr && col->forwarder_id >= 0) {
13091316
logger(LOG_INFO, "OpenLI mediator: exiting collector thread %d for %s",
13101317
col->forwarder_id, col->ipaddr);
@@ -1359,6 +1366,7 @@ static void *start_collector_thread(void *params) {
13591366
int epoll_fd = -1, timerexpired, nfds;
13601367
med_epoll_ev_t *timerev, *queuecheck = NULL;
13611368
struct epoll_event evs[64];
1369+
int zero = 0;
13621370

13631371
if (col->ipaddr == NULL) {
13641372
logger(LOG_INFO, "OpenLI Mediator: started collector thread for NULL collector IP??");
@@ -1376,6 +1384,26 @@ static void *start_collector_thread(void *params) {
13761384
}
13771385
unlock_med_collector_config(col->parentconfig);
13781386

1387+
/* create the ZMQ PUSH socket used to send requests to the main thread */
1388+
col->zmq_requests = zmq_socket(col->zmq_ctxt, ZMQ_PUSH);
1389+
if (zmq_connect(col->zmq_requests, "inproc://openlimed_collrecv_requests")
1390+
< 0) {
1391+
logger(LOG_INFO,
1392+
"OpenLI Mediator: collector thread for %s failed to connect to the ZMQ for pushing requests back to the main thread: %s",
1393+
col->ipaddr, strerror(errno));
1394+
zmq_close(col->zmq_requests);
1395+
col->zmq_requests = NULL;
1396+
}
1397+
1398+
if (col->zmq_requests && zmq_setsockopt(col->zmq_requests, ZMQ_LINGER,
1399+
&zero, sizeof(zero)) != 0) {
1400+
logger(LOG_INFO,
1401+
"OpenLI Mediator: collector thread for %s failed to configure the ZMQ for pushing requests back to the main thread: %s",
1402+
col->ipaddr, strerror(errno));
1403+
zmq_close(col->zmq_requests);
1404+
col->zmq_requests = NULL;
1405+
}
1406+
13791407
epoll_fd = epoll_create1(0);
13801408

13811409
timerev = col->colev = col->rmq_colev = NULL;
@@ -1664,8 +1692,10 @@ static void init_new_colrecv_thread(mediator_collector_t *medcol,
16641692

16651693
libtrace_message_queue_init(&(newcol->in_main),
16661694
sizeof(col_thread_msg_t));
1667-
libtrace_message_queue_init(&(newcol->out_main),
1668-
sizeof(col_thread_msg_t));
1695+
1696+
newcol->zmq_ctxt = medcol->zmq_ctxt;
1697+
newcol->zmq_requests = NULL;
1698+
16691699
pthread_create(&(newcol->tid), NULL, start_collector_thread, newcol);
16701700
}
16711701

@@ -1743,7 +1773,6 @@ void mediator_disconnect_all_collectors(mediator_collector_t *medcol) {
17431773

17441774
pthread_join(col->tid, NULL);
17451775
libtrace_message_queue_destroy(&(col->in_main));
1746-
libtrace_message_queue_destroy(&(col->out_main));
17471776
tofree = col;
17481777
col = col->next;
17491778
free(tofree);
@@ -1834,7 +1863,10 @@ void mediator_clean_collectors(mediator_collector_t *medcol) {
18341863

18351864
pthread_join(tofree->tid, NULL);
18361865
libtrace_message_queue_destroy(&(tofree->in_main));
1837-
libtrace_message_queue_destroy(&(tofree->out_main));
1866+
1867+
if (tofree->zmq_requests) {
1868+
zmq_close(tofree->zmq_requests);
1869+
}
18381870

18391871
if (tofree == col && newhead != oldhead) {
18401872
/* we are removing the head, so we need

src/mediator/coll_recv_thread.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include <libtrace/message_queue.h>
3131
#include <libwandder_etsili.h>
32+
#include <zmq.h>
3233
#include <openssl/evp.h>
3334
#include <openssl/err.h>
3435
#include "netcomms.h"
@@ -123,6 +124,13 @@ typedef struct col_thread_msg {
123124

124125
typedef struct agency_digest_config agency_digest_config_t;
125126

127+
/** An entry in the per-LIID map (see digest_cin_keys in col_known_liid_t)
 *  that associates a numeric hash with the string key of an integrity
 *  "chain", so that the string does not have to be rebuilt for every lookup.
 */
typedef struct digest_map_key {
128+
/** 64-bit hash value used to find this entry.
 *  NOTE(review): presumably derived from the CIN and used as the uthash
 *  key -- confirm against the code that populates the map.
 */
uint64_t key_cin;
129+
/** The string key that identifies the integrity chain.
 *  NOTE(review): ownership is not visible here; presumably released by
 *  clear_digest_key_map() -- confirm.
 */
const char *keystring;
130+
131+
/** uthash bookkeeping handle */
UT_hash_handle hh;
132+
} digest_map_key_t;
133+
126134
/** Structure for keeping track of the LIIDs that a collector receive thread
127135
* has seen
128136
*/
@@ -158,6 +166,8 @@ typedef struct col_known_liid {
158166

159167
wandder_encode_job_t *preencoded_etsi;
160168

169+
digest_map_key_t *digest_cin_keys;
170+
161171
UT_hash_handle hh;
162172
} col_known_liid_t;
163173

@@ -389,10 +399,13 @@ struct single_coll_receiver {
389399
*/
390400
libtrace_message_queue_t in_main;
391401

402+
/** Global ZMQ context for the entire mediator process */
403+
void *zmq_ctxt;
404+
392405
/** The ZMQ socket on which this thread will send requests (e.g.
393406
* integrity check signing requests) back to the main mediator thread.
394407
*/
395-
libtrace_message_queue_t out_main;
408+
void *zmq_requests;
396409

397410
/** Flag that indicates whether RMQ has told us that it is "connection
398411
* blocked, i.e. no longer able to accept published messages
@@ -450,6 +463,8 @@ typedef struct mediator_collectors {
450463
/** A hashmap containing the set of collector receive threads */
451464
coll_recv_t *threads;
452465

466+
void *zmq_ctxt;
467+
453468
/** Shared configuration for all collector receive threads */
454469
mediator_collector_config_t config;
455470

@@ -613,6 +628,7 @@ int send_integrity_check_signing_request(coll_recv_t *col,
613628
void handle_integrity_check_signature_response(coll_recv_t *col,
614629
struct ics_sign_response_message *resp);
615630
void destroy_integrity_sign_job(ics_sign_request_t *job);
631+
void clear_digest_key_map(col_known_liid_t *known);
616632

617633
/* defined in mediator_encryption.c */
618634
payload_encryption_method_t check_encryption_requirements(

src/mediator/mediator.c

Lines changed: 82 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,13 @@ static void destroy_med_state(mediator_state_t *state) {
191191
free(state->col_clean_timerev);
192192
}
193193

194+
if (state->zmq_request_collrecv) {
195+
zmq_close(state->zmq_request_collrecv);
196+
}
197+
if (state->zmq_ctxt) {
198+
zmq_ctx_destroy(state->zmq_ctxt);
199+
}
200+
194201
}
195202

196203
/** Reads the configuration for a mediator instance and sets the relevant
@@ -281,12 +288,33 @@ static int init_mediator_config(mediator_state_t *state,
281288
* @return -1 if an error occurs, 0 otherwise
282289
*/
283290
static int init_med_state(mediator_state_t *state, char *configfile) {
291+
int zero = 0;
292+
284293
state->listenerev = NULL;
285294
state->timerev = NULL;
286295
state->col_clean_timerev = NULL;
287296
state->epoll_fd = -1;
288297
state->saved_agencies = NULL;
289298

299+
state->zmq_ctxt = zmq_ctx_new();
300+
state->zmq_request_collrecv = zmq_socket(state->zmq_ctxt, ZMQ_PULL);
301+
if (zmq_bind(state->zmq_request_collrecv,
302+
"inproc://openlimed_collrecv_requests") < 0) {
303+
logger(LOG_INFO,
304+
"OpenLI mediator: unable to bind to ZMQ socket for receiving requests from the collrecv threads: %s", strerror(errno));
305+
zmq_close(state->zmq_request_collrecv);
306+
state->zmq_request_collrecv = NULL;
307+
}
308+
309+
if (state->zmq_request_collrecv &&
310+
zmq_setsockopt(state->zmq_request_collrecv, ZMQ_LINGER, &zero,
311+
sizeof(zero)) != 0) {
312+
logger(LOG_INFO,
313+
"OpenLI mediator: unable to configure ZMQ socket for receiving requests from the collrecv threads: %s", strerror(errno));
314+
zmq_close(state->zmq_request_collrecv);
315+
state->zmq_request_collrecv = NULL;
316+
}
317+
290318
init_provisioner_instance(&(state->provisioner), &(state->sslconf.ctx));
291319
if (init_mediator_config(state, configfile) < 0) {
292320
return -1;
@@ -306,6 +334,7 @@ static int init_med_state(mediator_state_t *state, char *configfile) {
306334

307335
/* Initialise state and config for the collector receive threads */
308336
state->collector_threads.threads = NULL;
337+
state->collector_threads.zmq_ctxt = state->zmq_ctxt;
309338
init_med_collector_config(&(state->collector_threads.config),
310339
state->etsitls,
311340
&(state->sslconf), &(state->RMQ_conf), state->mediatorid,
@@ -932,11 +961,17 @@ static int receive_ics_signature(mediator_state_t *state, uint8_t *msgbody,
932961
goto tidyup_err;
933962
}
934963

935-
memset(&msg, 0, sizeof(msg));
936-
msg.type = MED_COLL_INTEGRITY_SIGN_RESULT;
937-
msg.arg = (uint64_t)resp;
964+
while (col != NULL) {
965+
if (col->forwarder_id >= 0 &&
966+
(uint32_t)col->forwarder_id == resp->requestedby_fwd) {
967+
memset(&msg, 0, sizeof(msg));
968+
msg.type = MED_COLL_INTEGRITY_SIGN_RESULT;
969+
msg.arg = (uint64_t)resp;
938970

939-
libtrace_message_queue_put(&(col->in_main), &msg);
971+
libtrace_message_queue_put(&(col->in_main), &msg);
972+
}
973+
col = col->next;
974+
}
940975

941976
return 0;
942977

@@ -1593,6 +1628,48 @@ static int reload_mediator_config(mediator_state_t *currstate) {
15931628

15941629
}
15951630

1631+
/** Reads pending requests that the collector receive threads have pushed
 *  onto the main thread's ZMQ PULL socket and acts on each one.
 *
 *  The only request type handled is MED_COLL_INTEGRITY_SIGN_REQUEST, which
 *  is passed on to the provisioner. Any other type is logged and dropped.
 *
 *  The socket is read without blocking and at most MAX_REQUESTS_PER_CALL
 *  messages (valid or not) are consumed per invocation, which bounds the
 *  work done by a single call. If the socket reports an error other than
 *  EAGAIN or EINTR, it is closed and state->zmq_request_collrecv is set to
 *  NULL, so subsequent calls do nothing.
 *
 *  @param state        The global state for this mediator instance.
 */
static void process_collector_thread_requests(mediator_state_t *state) {

    enum { MAX_REQUESTS_PER_CALL = 10 };
    int reqs_processed;

    for (reqs_processed = 0; reqs_processed < MAX_REQUESTS_PER_CALL;
            reqs_processed ++) {
        /* Receive straight into a correctly typed and aligned object
         * instead of casting a raw byte buffer to a struct pointer.
         */
        col_thread_msg_t colmsg;
        int r;

        if (state->zmq_request_collrecv == NULL) {
            break;
        }

        r = zmq_recv(state->zmq_request_collrecv, &colmsg, sizeof(colmsg),
                ZMQ_DONTWAIT);
        if (r < 0) {
            if (errno == EAGAIN || errno == EINTR) {
                /* no requests available */
                break;
            }
            /* zmq_strerror() also understands the ZMQ-specific errno
             * values (e.g. ETERM) that strerror() does not know about.
             */
            logger(LOG_INFO, "OpenLI mediator: error while reading a request from one of the collector threads: %s", zmq_strerror(errno));
            zmq_close(state->zmq_request_collrecv);
            state->zmq_request_collrecv = NULL;
            break;
        }

        /* zmq_recv() returns the full size of the message, even if it had
         * to truncate the copy, so anything at least as large as our
         * message structure has completely populated colmsg.
         */
        if (r < (int)(sizeof(col_thread_msg_t))) {
            logger(LOG_INFO, "OpenLI mediator: unexpected message size received from one of the collector threads: %d\n", r);
            /* still counts against the per-call budget */
            continue;
        }

        if (colmsg.type == MED_COLL_INTEGRITY_SIGN_REQUEST) {
            struct ics_sign_request_message *signreq;

            /* The request pointer is carried in the integer 'arg' field */
            signreq = (struct ics_sign_request_message *)
                    (uintptr_t)(colmsg.arg);
            if (send_ics_signing_request_to_provisioner(
                    &state->provisioner, signreq) < 0) {
                logger(LOG_INFO, "OpenLI mediator: failed to pass on integrity check signing request to the provisioner");
            }

        } else {
            logger(LOG_INFO, "OpenLI mediator: invalid message type received by main thread from collector thread (%u)", colmsg.type);
        }
    }
}
1672+
15961673
/** The main loop of the mediator process.
15971674
*
15981675
* Continually checks for registered epoll events, e.g. timers expiring,
@@ -1611,7 +1688,6 @@ static void run(mediator_state_t *state) {
16111688
struct epoll_event evs[64];
16121689
int provfail = 0;
16131690
med_epoll_ev_t *signalev;
1614-
coll_recv_t *col_t, *tmp;
16151691

16161692
/* Register the epoll event for received signals */
16171693
signalev = create_mediator_fdevent(state->epoll_fd, NULL,
@@ -1638,9 +1714,6 @@ static void run(mediator_state_t *state) {
16381714
goto runfailure;
16391715
}
16401716

1641-
/* TODO this timer should be longer, but for testing I've set to fire
1642-
* more frequently
1643-
*/
16441717
if (start_mediator_timer(state->col_clean_timerev, 30) < 0) {
16451718
logger(LOG_INFO,
16461719
"OpenLI Mediator: failed to start collector cleanup timer");
@@ -1689,23 +1762,7 @@ static void run(mediator_state_t *state) {
16891762
/* Check for integrity check signing requests from the collector
16901763
* threads.
16911764
*/
1692-
HASH_ITER(hh, state->collector_threads.threads, col_t, tmp) {
1693-
col_thread_msg_t colmsg;
1694-
while (libtrace_message_queue_try_get(&(col_t->out_main),
1695-
(void *)&colmsg) != LIBTRACE_MQ_FAILED) {
1696-
if (colmsg.type == MED_COLL_INTEGRITY_SIGN_REQUEST) {
1697-
struct ics_sign_request_message *signreq;
1698-
signreq = (struct ics_sign_request_message *)(colmsg.arg);
1699-
if (send_ics_signing_request_to_provisioner(
1700-
&state->provisioner, signreq) < 0) {
1701-
logger(LOG_INFO, "OpenLI mediator: failed to pass on integrity check signing request to the provisioner");
1702-
}
1703-
1704-
} else {
1705-
logger(LOG_INFO, "OpenLI mediator: invalid message type received by main thread from collector thread (%u)", colmsg.type);
1706-
}
1707-
}
1708-
}
1765+
process_collector_thread_requests(state);
17091766

17101767
/* This timer will force us to stop checking epoll and go back
17111768
* to the start of this loop (i.e. checking if we should halt the

src/mediator/mediator.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <libwandder.h>
3232
#include <libwandder_etsili.h>
3333
#include <uthash.h>
34+
#include <zmq.h>
3435
#include "netcomms.h"
3536
#include "export_buffer.h"
3637
#include "util.h"
@@ -115,6 +116,13 @@ typedef struct med_state {
115116
/** The RabbitMQ configuration for the mediator */
116117
openli_RMQ_config_t RMQ_conf;
117118

119+
/** ZeroMQ context for creating ZMQ sockets */
120+
void *zmq_ctxt;
121+
122+
/** ZeroMQ socket for receiving requests from collector threads */
123+
void *zmq_request_collrecv;
124+
125+
118126
} mediator_state_t;
119127

120128
#endif

0 commit comments

Comments
 (0)