Skip to content

Commit 53f8dc6

Browse files
committed
Add JMirror and Cisco encap format support to UDP sinks
Also: * use seqtrackerid to distribute CCs derived from vendmirrorid based intercepts amongst seqtracker threads, rather than always pushing them to thread 0.
1 parent fe79254 commit 53f8dc6

File tree

6 files changed

+103
-32
lines changed

6 files changed

+103
-32
lines changed

src/collector/alushim_parser.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ int check_alu_intercept(colthread_local_t *loc,
196196
}
197197

198198
/* Create an appropriate IPCC and export it */
199-
if (push_vendor_mirrored_ipcc_job(loc->zmq_pubsocks[0], &(alu->common),
199+
if (push_vendor_mirrored_ipcc_job(
200+
loc->zmq_pubsocks[alu->common.seqtrackerid], &(alu->common),
200201
trace_get_timeval(packet), cin, direction, l3, bodylen) == 0) {
201202
/* for some reason, we failed to create or send the IPCC to
202203
* the sequencing thread? */

src/collector/cisco_parser.c

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,30 @@ static inline uint32_t ciscomirror_get_intercept_id(ciscomirror_hdr_t *hdr) {
4343
return ntohl(hdr->interceptid);
4444
}
4545

46+
uint8_t *decode_cisco_from_udp_payload(uint8_t *payload, uint32_t plen,
47+
uint32_t *shimintid, uint32_t *bodylen) {
48+
49+
ciscomirror_hdr_t *header;
50+
uint32_t rem;
51+
uint8_t *l3;
52+
53+
if (payload == NULL || plen < sizeof(ciscomirror_hdr_t)) {
54+
return NULL;
55+
}
56+
rem = plen;
57+
header = (ciscomirror_hdr_t *)payload;
58+
*shimintid = ciscomirror_get_intercept_id(header);
59+
60+
rem -= sizeof(ciscomirror_hdr_t);
61+
l3 = ((uint8_t *)header) + sizeof(ciscomirror_hdr_t);
62+
63+
if (rem == 0) {
64+
return 0;
65+
}
66+
*bodylen = rem;
67+
return l3;
68+
}
69+
4670
/** Converts a Cisco LI-mirrored packet directly into a CC encoding job for
4771
* any intercepts that have requested its vendor mirror ID.
4872
*
@@ -64,31 +88,23 @@ int generate_cc_from_cisco(colthread_local_t *loc,
6488
libtrace_packet_t *packet, packet_info_t *pinfo,
6589
vendmirror_intercept_list_t *ciscomirrors) {
6690

67-
ciscomirror_hdr_t *hdr = NULL;
6891
void *payload;
6992
uint8_t *l3;
70-
uint32_t rem, cept_id;
93+
uint32_t rem, cept_id, bodylen;
7194
vendmirror_intercept_t *cept, *tmp;
7295
vendmirror_intercept_list_t *vmilist;
7396

7497
payload = get_udp_payload(packet, &rem, NULL, NULL);
75-
if (rem < sizeof(ciscomirror_hdr_t) || payload == NULL) {
98+
l3 = decode_cisco_from_udp_payload(payload, rem, &cept_id, &bodylen);
99+
if (l3 == NULL) {
76100
return 0;
77101
}
78-
hdr = (ciscomirror_hdr_t *)payload;
79-
80-
cept_id = ciscomirror_get_intercept_id(hdr);
81102

82103
HASH_FIND(hh, ciscomirrors, &cept_id, sizeof(cept_id), vmilist);
83104
if (vmilist == NULL) {
84105
return 0;
85106
}
86-
rem -= sizeof(ciscomirror_hdr_t);
87-
l3 = ((uint8_t *)payload) + sizeof(ciscomirror_hdr_t);
88107

89-
if (rem == 0) {
90-
return 0;
91-
}
92108
HASH_ITER(hh, vmilist->intercepts, cept, tmp) {
93109
if (pinfo->tv.tv_sec < cept->common.tostart_time) {
94110
continue;
@@ -98,9 +114,10 @@ int generate_cc_from_cisco(colthread_local_t *loc,
98114
continue;
99115
}
100116
/* Create an appropriate IPCC and export it */
101-
if (push_vendor_mirrored_ipcc_job(loc->zmq_pubsocks[0], &(cept->common),
117+
if (push_vendor_mirrored_ipcc_job(
118+
loc->zmq_pubsocks[cept->common.seqtrackerid], &(cept->common),
102119
trace_get_timeval(packet), cept_id, ETSI_DIR_INDETERMINATE,
103-
l3, rem) == 0) {
120+
l3, bodylen) == 0) {
104121
/* for some reason, we failed to create or send the IPCC to
105122
* the sequencing thread? */
106123

src/collector/cisco_parser.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,21 @@
3232
#include "coreserver.h"
3333
#include "intercept.h"
3434

35+
/** Given the UDP payload of a Cisco LI-mirrored packet, this method will
36+
* skip past the shim header and return a pointer to the original packet
37+
* along with the intercept ID.
38+
*
39+
* @param payload The UDP payload from the mirrored packet
40+
* @param plen The length of the UDP payload
41+
* @param[out] shimintid Updated to contain the intercept ID from the
42+
* shim header
43+
* @param[out] bodylen The amount of bytes remaining after stripping
44+
* the shim header
45+
* @return a pointer to the first byte immediately after the shim header
46+
*/
47+
uint8_t *decode_cisco_from_udp_payload(uint8_t *payload, uint32_t plen,
48+
uint32_t *shimintid, uint32_t *bodylen);
49+
3550
/** Converts a Cisco LI-mirrored packet directly into a CC encoding job for
3651
* any intercepts that have requested its vendor mirror ID.
3752
*

src/collector/jmirror_parser.c

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,42 +44,58 @@ static inline uint32_t jmirror_get_interceptid(jmirrorhdr_t *header) {
4444
return (ntohl(header->interceptid) & 0x3fffffff);
4545
}
4646

47+
uint8_t *decode_jmirror_from_udp_payload(uint8_t *payload, uint32_t plen,
48+
uint32_t *cin, uint32_t *shimintid, uint32_t *bodylen) {
49+
50+
jmirrorhdr_t *header;
51+
uint32_t rem = plen;
52+
uint8_t *l3;
53+
54+
header = (jmirrorhdr_t *)payload;
55+
if (!header || rem < sizeof(jmirrorhdr_t)) {
56+
return NULL;
57+
}
58+
59+
*shimintid = jmirror_get_interceptid(header);
60+
*cin = ntohl(header->sessionid);
61+
62+
rem -= sizeof(jmirrorhdr_t);
63+
l3 = ((uint8_t *)header) + sizeof(jmirrorhdr_t);
64+
65+
if (rem == 0) {
66+
return NULL;
67+
}
68+
*bodylen = rem;
69+
return l3;
70+
}
71+
4772
int check_jmirror_intercept(colthread_local_t *loc,
4873
libtrace_packet_t *packet, packet_info_t *pinfo,
4974
coreserver_t *jmirror_sources,
5075
vendmirror_intercept_list_t *jmirror_ints) {
5176

5277
coreserver_t *cs;
53-
jmirrorhdr_t *header = NULL;
54-
uint32_t rem = 0, cept_id, cin;
78+
uint32_t rem = 0, cept_id, cin, bodylen;
5579
vendmirror_intercept_t *cept, *tmp;
5680
vendmirror_intercept_list_t *vmilist;
57-
char *l3;
81+
uint8_t *l3, *start;
5882

5983
if ((cs = match_packet_to_coreserver(jmirror_sources, pinfo, 1)) == NULL) {
6084
return 0;
6185
}
6286

63-
header = (jmirrorhdr_t *)get_udp_payload(packet, &rem, NULL, NULL);
64-
if (rem < sizeof(jmirrorhdr_t) || header == NULL) {
87+
start = get_udp_payload(packet, &rem, NULL, NULL);
88+
l3 = decode_jmirror_from_udp_payload(start, rem, &cin, &cept_id,
89+
&bodylen);
90+
if (l3 == NULL) {
6591
return 0;
6692
}
6793

68-
cept_id = jmirror_get_interceptid(header);
69-
7094
HASH_FIND(hh, jmirror_ints, &cept_id, sizeof(cept_id), vmilist);
7195
if (vmilist == NULL) {
7296
return 0;
7397
}
7498

75-
rem -= sizeof(jmirrorhdr_t);
76-
l3 = ((char *)header) + sizeof(jmirrorhdr_t);
77-
78-
if (rem == 0) {
79-
return 0;
80-
}
81-
cin = ntohl(header->sessionid);
82-
8399
HASH_ITER(hh, vmilist->intercepts, cept, tmp) {
84100
if (pinfo->tv.tv_sec < cept->common.tostart_time) {
85101
continue;
@@ -89,9 +105,10 @@ int check_jmirror_intercept(colthread_local_t *loc,
89105
continue;
90106
}
91107
/* Create an appropriate IPCC and export it */
92-
if (push_vendor_mirrored_ipcc_job(loc->zmq_pubsocks[0], &(cept->common),
108+
if (push_vendor_mirrored_ipcc_job(
109+
loc->zmq_pubsocks[cept->common.seqtrackerid], &(cept->common),
93110
trace_get_timeval(packet), cin, ETSI_DIR_INDETERMINATE,
94-
l3, rem) == 0) {
111+
l3, bodylen) == 0) {
95112
/* for some reason, we failed to create or send the IPCC to
96113
* the sequencing thread? */
97114

src/collector/jmirror_parser.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
#include "coreserver.h"
3232
#include "intercept.h"
3333

34+
uint8_t *decode_jmirror_from_udp_payload(uint8_t *payload, uint32_t plen,
35+
uint32_t *cin, uint32_t *shimintid, uint32_t *bodylen);
36+
3437
int check_jmirror_intercept(colthread_local_t *loc,
3538
libtrace_packet_t *packet, packet_info_t *pinfo,
3639
coreserver_t *alusources, vendmirror_intercept_list_t *jmirrors);

src/collector/udp_sink_worker.c

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
#include "collector_sync.h"
3232
#include "ipiri.h"
3333
#include "alushim_parser.h"
34+
#include "jmirror_parser.h"
35+
#include "cisco_parser.h"
3436

3537
#include <zmq.h>
3638
#include <unistd.h>
@@ -368,8 +370,24 @@ static int process_udp_datagram(udp_sink_local_t *local, char *key) {
368370
return 0;
369371
}
370372

373+
} else if (local->encapfmt == INTERCEPT_UDP_ENCAP_FORMAT_JMIRROR) {
374+
uint32_t shimintid = 0;
375+
skipptr = decode_jmirror_from_udp_payload(recvbuf, got, &cin,
376+
&shimintid, &iplen);
377+
if (skipptr == NULL) {
378+
return 0;
379+
}
380+
dir = local->direction;
381+
} else if (local->encapfmt == INTERCEPT_UDP_ENCAP_FORMAT_CISCO) {
382+
uint32_t shimintid = 0;
383+
skipptr = decode_cisco_from_udp_payload(recvbuf, got, &shimintid,
384+
&iplen);
385+
if (skipptr == NULL) {
386+
return 0;
387+
}
388+
dir = local->direction;
389+
cin = local->cin;
371390
} else {
372-
// TODO implement other encap methods
373391
return 0;
374392
}
375393

0 commit comments

Comments
 (0)