Skip to content

Commit f3fa9f5

Browse files
singpolymasjaeckel
authored andcommitted
Allow serializing SM state
It is useful in some applications to get a snapshot of the stream management state for storage outside the process, in order to recover from crashes and other things. The get_sm_state already present is not suitable for this because it returns a live object tied in to the current context and such, and containing much unneeded internal-api data. Introduce xmpp_sm_state_set_callback which is called every time the state changes with a serialized state, and a dual xmpp_sm_state_restore which takes in this serialized state and sets up a new sm_state based on that. The serialization is considered opaque from the PoV of the API, but is based on CBOR to facilitate easy debugging.
1 parent 5690c4e commit f3fa9f5

File tree

7 files changed

+446
-3
lines changed

7 files changed

+446
-3
lines changed

Makefile.am

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ STATIC_TESTS = \
190190
tests/test_jid \
191191
tests/test_ctx \
192192
tests/test_send_queue \
193+
tests/test_serialize_sm \
193194
tests/test_string \
194195
tests/test_resolver
195196

@@ -285,6 +286,11 @@ tests_test_send_queue_CFLAGS = -I$(top_srcdir)/src
285286
tests_test_send_queue_LDADD = $(STROPHE_LIBS)
286287
tests_test_send_queue_LDFLAGS = -static
287288

289+
tests_test_serialize_sm_SOURCES = tests/test_serialize_sm.c tests/test.c tests/test.h
290+
tests_test_serialize_sm_CFLAGS = -I$(top_srcdir)/src
291+
tests_test_serialize_sm_LDADD = $(STROPHE_LIBS)
292+
tests_test_serialize_sm_LDFLAGS = -static
293+
288294
tests_test_snprintf_SOURCES = tests/test_snprintf.c
289295
tests_test_snprintf_CFLAGS = -I$(top_srcdir)/src
290296

src/auth.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,6 +1232,7 @@ static void _sm_enable(xmpp_conn_t *conn)
12321232
send_stanza(conn, enable, XMPP_QUEUE_SM_STROPHE);
12331233
conn->sm_state->sm_sent_nr = 0;
12341234
conn->sm_state->sm_enabled = 1;
1235+
trigger_sm_callback(conn);
12351236
}
12361237

12371238
static int
@@ -1486,6 +1487,8 @@ static int _handle_sm(xmpp_conn_t *const conn,
14861487
name = NULL;
14871488
}
14881489

1490+
trigger_sm_callback(conn);
1491+
14891492
err_sm:
14901493
if (!name) {
14911494
char *err = "Couldn't convert stanza to text!";

src/common.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,8 @@ struct _xmpp_conn_t {
323323
hash_t *id_handlers;
324324
xmpp_handlist_t *handlers;
325325
xmpp_sockopt_callback sockopt_cb;
326+
xmpp_sm_callback sm_callback;
327+
void *sm_callback_ctx;
326328
};
327329

328330
void conn_disconnect(xmpp_conn_t *conn);
@@ -376,6 +378,7 @@ void handler_add(xmpp_conn_t *conn,
376378
void handler_system_delete_all(xmpp_conn_t *conn);
377379

378380
/* utility functions */
381+
void trigger_sm_callback(xmpp_conn_t *conn);
379382
void reset_sm_state(xmpp_sm_state_t *sm_state);
380383
void disconnect_mem_error(xmpp_conn_t *conn);
381384

src/conn.c

Lines changed: 269 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,6 +1284,267 @@ xmpp_sm_state_t *xmpp_conn_get_sm_state(xmpp_conn_t *conn)
12841284
return ret;
12851285
}
12861286

1287+
void xmpp_sm_state_set_callback(xmpp_conn_t *conn, xmpp_sm_callback cb, void *ctx)
1288+
{
1289+
conn->sm_callback = cb;
1290+
conn->sm_callback_ctx = ctx;
1291+
}
1292+
1293+
int xmpp_sm_state_restore(xmpp_conn_t *conn, const unsigned char *sm_state, size_t sm_state_len)
1294+
{
1295+
/* We can only set the SM state when we're disconnected */
1296+
if (conn->state != XMPP_STATE_DISCONNECTED) {
1297+
strophe_error(conn->ctx, "conn",
1298+
"SM state can only be set the when we're disconnected");
1299+
return XMPP_EINVOP;
1300+
}
1301+
1302+
if (conn->sm_state) {
1303+
strophe_error(conn->ctx, "conn", "SM state is already set!");
1304+
return XMPP_EINVOP;
1305+
}
1306+
1307+
if (sm_state_len < 5*6) {
1308+
strophe_error(conn->ctx, "conn", "Provided sm_state data is too short");
1309+
return XMPP_EINVOP;
1310+
}
1311+
1312+
if (memcmp(sm_state, "\x1a\x00\x00\x00\x00", 5) != 0) {
1313+
strophe_error(conn->ctx, "conn", "Unknown sm_state version");
1314+
return XMPP_EINVOP;
1315+
}
1316+
sm_state += 5;
1317+
1318+
conn->sm_state = strophe_alloc(conn->ctx, sizeof(*conn->sm_state));
1319+
if (!conn->sm_state) return XMPP_EMEM;
1320+
1321+
memset(conn->sm_state, 0, sizeof(*conn->sm_state));
1322+
conn->sm_state->sm_queue.head = NULL;
1323+
conn->sm_state->sm_queue.tail = NULL;
1324+
conn->sm_state->ctx = conn->ctx;
1325+
1326+
conn->sm_state->sm_support = 1;
1327+
conn->sm_state->sm_enabled = 1;
1328+
conn->sm_state->can_resume = 1;
1329+
conn->sm_state->resume = 1;
1330+
1331+
sm_state++;
1332+
conn->sm_state->sm_sent_nr = *sm_state++ << 24;
1333+
conn->sm_state->sm_sent_nr |= *sm_state++ << 16;
1334+
conn->sm_state->sm_sent_nr |= *sm_state++ << 8;
1335+
conn->sm_state->sm_sent_nr |= *sm_state++;
1336+
1337+
sm_state++;
1338+
conn->sm_state->sm_handled_nr = *sm_state++ << 24;
1339+
conn->sm_state->sm_handled_nr |= *sm_state++ << 16;
1340+
conn->sm_state->sm_handled_nr |= *sm_state++ << 8;
1341+
conn->sm_state->sm_handled_nr |= *sm_state++;
1342+
1343+
sm_state++;
1344+
size_t id_len = *sm_state++ << 24;
1345+
id_len |= *sm_state++ << 16;
1346+
id_len |= *sm_state++ << 8;
1347+
id_len |= *sm_state++;
1348+
conn->sm_state->id = strophe_alloc(conn->ctx, id_len + 1);
1349+
if (!conn->sm_state->id) {
1350+
xmpp_free_sm_state(conn->sm_state);
1351+
return XMPP_EMEM;
1352+
}
1353+
memcpy(conn->sm_state->id, sm_state, id_len);
1354+
conn->sm_state->id[id_len] = '\0';
1355+
sm_state += id_len;
1356+
1357+
sm_state++;
1358+
conn->send_queue_len = *sm_state++ << 24;
1359+
conn->send_queue_len |= *sm_state++ << 16;
1360+
conn->send_queue_len |= *sm_state++ << 8;
1361+
conn->send_queue_len |= *sm_state++;
1362+
conn->send_queue_user_len = conn->send_queue_len;
1363+
for (int i = 0; i < conn->send_queue_len; i++) {
1364+
xmpp_send_queue_t *item = strophe_alloc(conn->ctx, sizeof(*item));
1365+
if (!item) {
1366+
xmpp_free_sm_state(conn->sm_state);
1367+
return XMPP_EMEM;
1368+
}
1369+
memset(item, 0, sizeof(*item));
1370+
1371+
sm_state++;
1372+
item->len = *sm_state++ << 24;
1373+
item->len |= *sm_state++ << 16;
1374+
item->len |= *sm_state++ << 8;
1375+
item->len |= *sm_state++;
1376+
item->data = strophe_alloc(conn->ctx, item->len + 1);
1377+
if (!item->data) {
1378+
xmpp_free_sm_state(conn->sm_state);
1379+
return XMPP_EMEM;
1380+
}
1381+
memcpy(item->data, sm_state, item->len);
1382+
item->data[item->len] = '\0';
1383+
sm_state += item->len;
1384+
1385+
item->written = 0;
1386+
item->wip = 0;
1387+
item->userdata = NULL;
1388+
item->owner = XMPP_QUEUE_USER;
1389+
1390+
if (!conn->send_queue_tail) {
1391+
conn->send_queue_head = item;
1392+
conn->send_queue_tail = item;
1393+
} else {
1394+
conn->send_queue_tail->next = item;
1395+
conn->send_queue_tail = item;
1396+
}
1397+
}
1398+
1399+
sm_state++;
1400+
size_t sm_q_len = *sm_state++ << 24;
1401+
sm_q_len |= *sm_state++ << 16;
1402+
sm_q_len |= *sm_state++ << 8;
1403+
sm_q_len |= *sm_state++;
1404+
for (size_t i = 0; i < sm_q_len; i++) {
1405+
xmpp_send_queue_t *item = strophe_alloc(conn->ctx, sizeof(*item));
1406+
if (!item) {
1407+
xmpp_free_sm_state(conn->sm_state);
1408+
return XMPP_EMEM;
1409+
}
1410+
memset(item, 0, sizeof(*item));
1411+
1412+
sm_state++;
1413+
item->sm_h = *sm_state++ << 24;
1414+
item->sm_h |= *sm_state++ << 16;
1415+
item->sm_h |= *sm_state++ << 8;
1416+
item->sm_h |= *sm_state++;
1417+
sm_state++;
1418+
item->len = *sm_state++ << 24;
1419+
item->len |= *sm_state++ << 16;
1420+
item->len |= *sm_state++ << 8;
1421+
item->len |= *sm_state++;
1422+
item->data = strophe_alloc(conn->ctx, item->len + 1);
1423+
if (!item->data) {
1424+
xmpp_free_sm_state(conn->sm_state);
1425+
return XMPP_EMEM;
1426+
}
1427+
memcpy(item->data, sm_state, item->len);
1428+
item->data[item->len] = '\0';
1429+
sm_state += item->len;
1430+
1431+
item->written = 0;
1432+
item->wip = 0;
1433+
item->userdata = NULL;
1434+
item->owner = XMPP_QUEUE_USER;
1435+
add_queue_back(&conn->sm_state->sm_queue, item);
1436+
}
1437+
1438+
return XMPP_EOK;
1439+
}
1440+
1441+
size_t xmpp_conn_serialize_sm_state(xmpp_conn_t *conn, unsigned char **buf)
1442+
{
1443+
if (!conn->sm_state->sm_support || !conn->sm_state->sm_enabled || !conn->sm_state->can_resume) {
1444+
*buf = NULL;
1445+
return 0;
1446+
}
1447+
1448+
size_t id_len = strlen(conn->sm_state->id);
1449+
xmpp_send_queue_t *peek = conn->sm_state->sm_queue.head;
1450+
size_t sm_queue_len = 0;
1451+
size_t sm_queue_size = 0;
1452+
while (peek) {
1453+
sm_queue_len++;
1454+
sm_queue_size += 10 + peek->len;
1455+
peek = peek->next;
1456+
}
1457+
1458+
size_t send_queue_len = 0;
1459+
size_t send_queue_size = 0;
1460+
peek = conn->send_queue_head;
1461+
while (peek) {
1462+
send_queue_len++;
1463+
send_queue_size += 5 + peek->len;
1464+
peek = peek->next;
1465+
}
1466+
1467+
size_t buf_size = 5 + 5 + 5 + 5 + id_len + 5 + send_queue_size + 5 + sm_queue_size;
1468+
*buf = strophe_alloc(conn->ctx, buf_size);
1469+
unsigned char *next = *buf;
1470+
1471+
memcpy(next, "\x1a\x00\x00\x00\x00", 5); // Version
1472+
next += 5;
1473+
1474+
*next++ = 0x1a;
1475+
*next++ = (conn->sm_state->sm_sent_nr >> 24) & 0xFF;
1476+
*next++ = (conn->sm_state->sm_sent_nr >> 16) & 0xFF;
1477+
*next++ = (conn->sm_state->sm_sent_nr >> 8) & 0xFF;
1478+
*next++ = conn->sm_state->sm_sent_nr & 0xFF;
1479+
1480+
*next++ = 0x1a;
1481+
*next++ = (conn->sm_state->sm_handled_nr >> 24) & 0xFF;
1482+
*next++ = (conn->sm_state->sm_handled_nr >> 16) & 0xFF;
1483+
*next++ = (conn->sm_state->sm_handled_nr >> 8) & 0xFF;
1484+
*next++ = conn->sm_state->sm_handled_nr & 0xFF;
1485+
1486+
*next++ = 0x7a;
1487+
*next++ = (id_len >> 24) & 0xFF;
1488+
*next++ = (id_len >> 16) & 0xFF;
1489+
*next++ = (id_len >> 8) & 0xFF;
1490+
*next++ = id_len & 0xFF;
1491+
memcpy(next, conn->sm_state->id, id_len);
1492+
next += id_len;
1493+
1494+
*next++ = 0x9a;
1495+
*next++ = (send_queue_len >> 24) & 0xFF;
1496+
*next++ = (send_queue_len >> 16) & 0xFF;
1497+
*next++ = (send_queue_len >> 8) & 0xFF;
1498+
*next++ = send_queue_len & 0xFF;
1499+
1500+
peek = conn->send_queue_head;
1501+
while (peek) {
1502+
*next++ = 0x7a;
1503+
*next++ = (peek->len >> 24) & 0xFF;
1504+
*next++ = (peek->len >> 16) & 0xFF;
1505+
*next++ = (peek->len >> 8) & 0xFF;
1506+
*next++ = peek->len & 0xFF;
1507+
memcpy(next, peek->data, peek->len);
1508+
next += peek->len;
1509+
peek = peek->next;
1510+
}
1511+
1512+
*next++ = 0xba;
1513+
*next++ = (sm_queue_len >> 24) & 0xFF;
1514+
*next++ = (sm_queue_len >> 16) & 0xFF;
1515+
*next++ = (sm_queue_len >> 8) & 0xFF;
1516+
*next++ = sm_queue_len & 0xFF;
1517+
1518+
peek = conn->sm_state->sm_queue.head;
1519+
while (peek) {
1520+
*next++ = 0x1a;
1521+
*next++ = (peek->sm_h >> 24) & 0xFF;
1522+
*next++ = (peek->sm_h >> 16) & 0xFF;
1523+
*next++ = (peek->sm_h >> 8) & 0xFF;
1524+
*next++ = peek->sm_h & 0xFF;
1525+
1526+
*next++ = 0x7a;
1527+
*next++ = (peek->len >> 24) & 0xFF;
1528+
*next++ = (peek->len >> 16) & 0xFF;
1529+
*next++ = (peek->len >> 8) & 0xFF;
1530+
*next++ = peek->len & 0xFF;
1531+
memcpy(next, peek->data, peek->len);
1532+
next += peek->len;
1533+
peek = peek->next;
1534+
}
1535+
1536+
return buf_size;
1537+
}
1538+
1539+
void trigger_sm_callback(xmpp_conn_t *conn) {
1540+
if (!conn || !conn->sm_callback) return;
1541+
1542+
unsigned char *buf;
1543+
size_t size = xmpp_conn_serialize_sm_state(conn, &buf);
1544+
conn->sm_callback(conn, conn->sm_callback_ctx, buf, size);
1545+
strophe_free(conn->ctx, buf);
1546+
}
1547+
12871548
static void _reset_sm_state_for_reconnect(xmpp_conn_t *conn)
12881549
{
12891550
xmpp_sm_state_t *s = conn->sm_state;
@@ -1490,7 +1751,9 @@ char *xmpp_conn_send_queue_drop_element(xmpp_conn_t *conn,
14901751
conn->sm_state->r_sent = 0;
14911752
}
14921753
/* Finally drop the element */
1493-
return _drop_send_queue_element(conn, t);
1754+
char *r = _drop_send_queue_element(conn, t);
1755+
trigger_sm_callback(conn);
1756+
return r;
14941757
}
14951758

14961759
/* timed handler for cleanup if normal disconnect procedure takes too long */
@@ -1684,6 +1947,7 @@ static void _handle_stream_end(char *name, void *userdata)
16841947
strophe_debug(conn->ctx, "xmpp", "RECV: </stream:stream>");
16851948
/* the session has been terminated properly, i.e. it can't be resumed */
16861949
conn->sm_state->can_resume = 0;
1950+
trigger_sm_callback(conn);
16871951
conn_disconnect_clean(conn);
16881952
}
16891953

@@ -1759,6 +2023,7 @@ static void _conn_sm_handle_stanza(xmpp_conn_t *const conn,
17592023
conn->sm_state->r_sent = 0;
17602024
}
17612025
}
2026+
trigger_sm_callback(conn);
17622027
}
17632028

17642029
static unsigned short _conn_default_port(xmpp_conn_t *conn,
@@ -2045,8 +2310,10 @@ static int _send_raw(xmpp_conn_t *conn,
20452310
strophe_debug_verbose(1, conn->ctx, "conn", "Q_ADD: %p", item);
20462311
if (!(owner & XMPP_QUEUE_SM) && conn->sm_state->sm_enabled &&
20472312
!conn->sm_state->r_sent) {
2048-
send_raw(conn, req_ack, strlen(req_ack), XMPP_QUEUE_SM_STROPHE, item);
20492313
conn->sm_state->r_sent = 1;
2314+
send_raw(conn, req_ack, strlen(req_ack), XMPP_QUEUE_SM_STROPHE, item);
2315+
} else {
2316+
trigger_sm_callback(conn);
20502317
}
20512318
return XMPP_EOK;
20522319
}

src/event.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ void xmpp_run_once(xmpp_ctx_t *ctx, unsigned long timeout)
174174
/* if we've sent everything update the tail */
175175
if (!sq)
176176
conn->send_queue_tail = NULL;
177+
trigger_sm_callback(conn);
177178
}
178179
intf->flush(intf);
179180

strophe.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#define __LIBSTROPHE_STROPHE_H__
1919

2020
#include <stddef.h> /* size_t */
21+
#include <stdint.h>
2122

2223
#ifdef __cplusplus
2324
extern "C" {
@@ -414,8 +415,11 @@ typedef enum {
414415
char *xmpp_conn_send_queue_drop_element(xmpp_conn_t *conn,
415416
xmpp_queue_element_t which);
416417

417-
xmpp_sm_state_t *xmpp_conn_get_sm_state(xmpp_conn_t *conn);
418+
typedef void (*xmpp_sm_callback)(xmpp_conn_t *conn, void *ctx, const unsigned char *sm_state, size_t sm_state_len);
418419
int xmpp_conn_set_sm_state(xmpp_conn_t *conn, xmpp_sm_state_t *sm_state);
420+
xmpp_sm_state_t *xmpp_conn_get_sm_state(xmpp_conn_t *conn);
421+
void xmpp_sm_state_set_callback(xmpp_conn_t *conn, xmpp_sm_callback cb, void *ctx);
422+
int xmpp_sm_state_restore(xmpp_conn_t *conn, const unsigned char *sm_state, size_t sm_state_len);
419423

420424
void xmpp_free_sm_state(xmpp_sm_state_t *sm_state);
421425

0 commit comments

Comments
 (0)