Skip to content

Commit 8b4c63b

Browse files
committed
Callbacks when a stanza is ack'd or fails to send
This adds two new callbacks, one of which runs with a stanza id when a stanza sends and gets an sm ack from the server, and one which runs with a stanza id when a stanza fails and will not be retried. Incomplete and presented for discussion of the design. An id element has been added to the send/sm queue in order to facilitate passing this to the callback without re-parsing the stanza. This is also added to the sm serialization, though we could parse the stanza on restore to avoid this I suppose. TODO: fail everything on stream end
1 parent a2b3b6b commit 8b4c63b

File tree

3 files changed

+70
-4
lines changed

3 files changed

+70
-4
lines changed

src/common.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ struct _xmpp_send_queue_t {
163163
xmpp_send_queue_owner_t owner;
164164
void *userdata;
165165
uint32_t sm_h;
166+
char *id;
166167

167168
xmpp_send_queue_t *prev, *next;
168169
};
@@ -330,6 +331,10 @@ struct _xmpp_conn_t {
330331
xmpp_sockopt_callback sockopt_cb;
331332
xmpp_sm_callback sm_callback;
332333
void *sm_callback_ctx;
334+
xmpp_sm_ack_callback sm_ack_callback;
335+
void *sm_ack_callback_ctx;
336+
xmpp_sm_ack_callback sm_fail_callback;
337+
void *sm_fail_callback_ctx;
333338
};
334339

335340
void conn_disconnect(xmpp_conn_t *conn);

src/conn.c

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* A part of those functions is listed under the \ref TLS section.
2121
*/
2222

23+
#include <assert.h>
2324
#include <errno.h>
2425
#include <netinet/in.h>
2526
#include <stdarg.h>
@@ -115,6 +116,7 @@ static void _send_valist(xmpp_conn_t *conn,
115116
static int _send_raw(xmpp_conn_t *conn,
116117
char *data,
117118
size_t len,
119+
const char *id,
118120
xmpp_send_queue_owner_t owner,
119121
void *userdata);
120122

@@ -1279,6 +1281,22 @@ void xmpp_conn_set_sm_callback(xmpp_conn_t *conn,
12791281
conn->sm_callback_ctx = ctx;
12801282
}
12811283

1284+
void xmpp_conn_set_sm_ack_callback(xmpp_conn_t *conn,
1285+
xmpp_sm_ack_callback cb,
1286+
void *ctx)
1287+
{
1288+
conn->sm_ack_callback = cb;
1289+
conn->sm_ack_callback_ctx = ctx;
1290+
}
1291+
1292+
void xmpp_conn_set_sm_fail_callback(xmpp_conn_t *conn,
1293+
xmpp_sm_ack_callback cb,
1294+
void *ctx)
1295+
{
1296+
conn->sm_fail_callback = cb;
1297+
conn->sm_fail_callback_ctx = ctx;
1298+
}
1299+
12821300
struct sm_restore {
12831301
xmpp_conn_t *conn;
12841302
const unsigned char *state;
@@ -1324,7 +1342,8 @@ static int sm_load_string(struct sm_restore *sm, char **val, size_t *len)
13241342
memcpy(*val, sm->state, l);
13251343
(*val)[l] = '\0';
13261344
sm->state += l;
1327-
*len = l;
1345+
if (len)
1346+
*len = l;
13281347
return 0;
13291348
}
13301349

@@ -1454,10 +1473,17 @@ int xmpp_conn_restore_sm_state(xmpp_conn_t *conn,
14541473
ret = sm_load_string(&sm, &item->data, &item->len);
14551474
if (ret)
14561475
goto err_reload;
1476+
if (sm.state < sm.state_end) {
1477+
ret = sm_load_string(&sm, &item->id, NULL);
1478+
if (ret)
1479+
goto err_reload;
1480+
}
14571481

14581482
item->owner = XMPP_QUEUE_USER;
14591483
}
14601484

1485+
assert(sm.state == sm.state_end);
1486+
14611487
return XMPP_EOK;
14621488

14631489
err_reload:
@@ -1496,6 +1522,7 @@ static size_t sm_state_serialize(xmpp_conn_t *conn, unsigned char **buf)
14961522
while (peek) {
14971523
sm_queue_len++;
14981524
sm_queue_size += 10 + peek->len;
1525+
if (peek->id) sm_queue_size += 5 + strlen(peek->id);
14991526
peek = peek->next;
15001527
}
15011528

@@ -1505,6 +1532,7 @@ static size_t sm_state_serialize(xmpp_conn_t *conn, unsigned char **buf)
15051532
while (peek) {
15061533
send_queue_len++;
15071534
send_queue_size += 5 + peek->len;
1535+
if (peek->id) send_queue_size += 5 + strlen(peek->id);
15081536
peek = peek->next;
15091537
}
15101538

@@ -1563,6 +1591,17 @@ static size_t sm_state_serialize(xmpp_conn_t *conn, unsigned char **buf)
15631591
goto err_serialize;
15641592
memcpy(next, peek->data, peek->len);
15651593
next += peek->len;
1594+
1595+
if (peek->id) {
1596+
uint32_t len = strlen(peek->id);
1597+
if (sm_store_u32(&next, end, 0x7a, len))
1598+
goto err_serialize;
1599+
if (next + len > end)
1600+
goto err_serialize;
1601+
memcpy(next, peek->id, len);
1602+
next += len;
1603+
}
1604+
15661605
peek = peek->next;
15671606
}
15681607

@@ -1813,6 +1852,10 @@ char *xmpp_conn_send_queue_drop_element(xmpp_conn_t *conn,
18131852
if (!t)
18141853
return NULL;
18151854

1855+
if (conn->sm_ack_callback && t->id) {
1856+
conn->sm_ack_callback(conn, conn->sm_ack_callback_ctx, t->id);
1857+
}
1858+
18161859
/* In case there exists a SM stanza that is linked to the
18171860
* one we're currently dropping, also delete that one.
18181861
*/
@@ -2088,6 +2131,10 @@ static void _conn_sm_handle_stanza(xmpp_conn_t *const conn,
20882131
e = pop_queue_front(&conn->sm_state->sm_queue);
20892132
strophe_debug_verbose(2, conn->ctx, "conn",
20902133
"SM_Q_DROP: %p, h=%lu", e, e->sm_h);
2134+
if (conn->sm_ack_callback && e->id) {
2135+
conn->sm_ack_callback(conn, conn->sm_ack_callback_ctx,
2136+
e->id);
2137+
}
20912138
c = queue_element_free(conn->ctx, e);
20922139
strophe_free(conn->ctx, c);
20932140
}
@@ -2115,6 +2162,7 @@ char *queue_element_free(xmpp_ctx_t *ctx, xmpp_send_queue_t *e)
21152162
{
21162163
char *ret = e->data;
21172164
strophe_debug_verbose(2, ctx, "conn", "Q_FREE: %p", e);
2165+
strophe_free(ctx, e->id);
21182166
memset(e, 0, sizeof(*e));
21192167
strophe_free(ctx, e);
21202168
strophe_debug_verbose(3, ctx, "conn", "Q_CONTENT: %s", ret);
@@ -2231,7 +2279,7 @@ void send_raw(xmpp_conn_t *conn,
22312279
return;
22322280
}
22332281

2234-
_send_raw(conn, d, len, owner, userdata);
2282+
_send_raw(conn, d, len, NULL, owner, userdata);
22352283
}
22362284

22372285
static void _send_valist(xmpp_conn_t *conn,
@@ -2266,7 +2314,7 @@ static void _send_valist(xmpp_conn_t *conn,
22662314
va_end(apdup);
22672315

22682316
/* len - 1 so we don't send trailing \0 */
2269-
_send_raw(conn, bigbuf, len - 1, owner, NULL);
2317+
_send_raw(conn, bigbuf, len - 1, NULL, owner, NULL);
22702318
} else {
22712319
/* go through send_raw() which does the strdup() for us */
22722320
send_raw(conn, buf, len, owner, NULL);
@@ -2300,7 +2348,8 @@ void send_stanza(xmpp_conn_t *conn,
23002348
goto out;
23012349
}
23022350

2303-
_send_raw(conn, buf, len, owner, NULL);
2351+
_send_raw(conn, buf, len, xmpp_stanza_get_attribute(stanza, "id"), owner,
2352+
NULL);
23042353
out:
23052354
xmpp_stanza_release(stanza);
23062355
}
@@ -2342,6 +2391,7 @@ xmpp_send_queue_t *pop_queue_front(xmpp_queue_t *queue)
23422391
static int _send_raw(xmpp_conn_t *conn,
23432392
char *data,
23442393
size_t len,
2394+
const char *id,
23452395
xmpp_send_queue_owner_t owner,
23462396
void *userdata)
23472397
{
@@ -2358,6 +2408,7 @@ static int _send_raw(xmpp_conn_t *conn,
23582408

23592409
item->data = data;
23602410
item->len = len;
2411+
item->id = id ? strophe_strdup(conn->ctx, id) : NULL;
23612412
item->next = NULL;
23622413
item->prev = conn->send_queue_tail;
23632414
item->written = 0;

strophe.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,16 @@ int xmpp_conn_restore_sm_state(xmpp_conn_t *conn,
448448
const unsigned char *sm_state,
449449
size_t sm_state_len);
450450

451+
typedef void (*xmpp_sm_ack_callback)(xmpp_conn_t *conn,
452+
void *ctx,
453+
const char *id);
454+
void xmpp_conn_set_sm_ack_callback(xmpp_conn_t *conn,
455+
xmpp_sm_ack_callback cb,
456+
void *ctx);
457+
void xmpp_conn_set_sm_fail_callback(xmpp_conn_t *conn,
458+
xmpp_sm_ack_callback cb,
459+
void *ctx);
460+
451461
void xmpp_free_sm_state(xmpp_sm_state_t *sm_state);
452462

453463
int xmpp_connect_client(xmpp_conn_t *conn,

0 commit comments

Comments
 (0)