Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions worker/include/dpdk_filter/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@

#ifdef DEBUG
#define LOG_INFO(info, ...) \
fprintf(stdout, "[INFO] %s: %d: " info "\n", __func__, __LINE__, \
fprintf(stderr, "[INFO] %s: %d: " info "\n", __func__, __LINE__, \
##__VA_ARGS__)

#define LOG_ERROR(error, ...) \
fprintf(stdout, "[ERROR] %s: %d: " error "\n", __func__, __LINE__, \
fprintf(stderr, "[ERROR] %s: %d: " error "\n", __func__, __LINE__, \
##__VA_ARGS__)

#define LOG_WARNING(warning, ...) \
fprintf(stdout, "[WARNING] %s: %d: " warning "\n", __func__, __LINE__, \
fprintf(stderr, "[WARNING] %s: %d: " warning "\n", __func__, __LINE__, \
##__VA_ARGS__)

#else
Expand All @@ -24,11 +24,11 @@
} while (0)

#define LOG_ERROR(error, ...) \
fprintf(stdout, "[ERROR] %s: %d: " error "\n", __func__, __LINE__, \
fprintf(stderr, "[ERROR] %s: %d: " error "\n", __func__, __LINE__, \
##__VA_ARGS__)

#define LOG_WARNING(warning, ...) \
fprintf(stdout, "[WARNING] %s: %d: " warning "\n", __func__, __LINE__, \
fprintf(stderr, "[WARNING] %s: %d: " warning "\n", __func__, __LINE__, \
##__VA_ARGS__)

#endif
Expand Down
8 changes: 4 additions & 4 deletions worker/src/dpdk_filter/filtr_packets.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,12 @@ bool main_filtring_by_domain(struct requested_classification *req_clas,
struct info_of_pakage *info_pac) {

if (check_domain_is_block(info_pac->domain, policy->block_domains) == true) {
LOG_INFO("This domain is blocked");
LOG_INFO("Domain '%s' is blocked", info_pac->domain);
return false;
}

if (check_domain_is_allow(info_pac->domain, policy->allow_domains) == true) {
LOG_INFO("This domain is allowed");
LOG_INFO("Domain '%s' is allowed", info_pac->domain);
return true;
}

Expand All @@ -180,12 +180,12 @@ bool main_filtring_by_ip(struct requested_classification *req_clas,
struct info_of_pakage *info_pac) {

if (check_ip_is_block(info_pac, policy) == true) {
LOG_INFO("This ip is blocked");
LOG_INFO("IPv%d dst is blocked", info_pac->ip_version == IP_4 ? 4 : 6);
return false;
}

if (check_ip_is_allow(info_pac, policy) == true) {
LOG_INFO("This ip is allowed");
LOG_INFO("IPv%d dst is allowed", info_pac->ip_version == IP_4 ? 4 : 6);
return true;
}

Expand Down
10 changes: 10 additions & 0 deletions worker/src/dpdk_filter/pars_packets.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ void parsing_pakage(struct rte_mbuf *packet, struct info_of_pakage *info_pac) {
uint32_t l3_offset = hdr_lens.l2_len;

if (pkt_type & RTE_PTYPE_L3_IPV4) {
LOG_INFO("IPv4 packet detected");
info_pac->ip_version = IP_4;
info_pac->ethernet_type_protocol = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);

Expand All @@ -37,12 +38,15 @@ void parsing_pakage(struct rte_mbuf *packet, struct info_of_pakage *info_pac) {
struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(
packet, struct rte_tcp_hdr *, l3_offset + hdr_lens.l3_len);
info_pac->number_port = tcp_hdr->dst_port;
LOG_INFO("TCP dst port: %hu", ntohs(info_pac->number_port));
} else if (pkt_type & RTE_PTYPE_L4_UDP) {
struct rte_udp_hdr *udp_hdr = rte_pktmbuf_mtod_offset(
packet, struct rte_udp_hdr *, l3_offset + hdr_lens.l3_len);
info_pac->number_port = udp_hdr->dst_port;
LOG_INFO("UDP dst port: %hu", ntohs(info_pac->number_port));
}
} else if (pkt_type & RTE_PTYPE_L3_IPV6) {
LOG_INFO("IPv6 packet detected");
info_pac->ip_version = IP_6;
info_pac->ethernet_type_protocol = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV6);

Expand All @@ -58,15 +62,18 @@ void parsing_pakage(struct rte_mbuf *packet, struct info_of_pakage *info_pac) {
struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(
packet, struct rte_tcp_hdr *, l3_offset + hdr_lens.l3_len);
info_pac->number_port = tcp_hdr->dst_port;
LOG_INFO("TCP dst port: %hu", ntohs(info_pac->number_port));
} else if (pkt_type & RTE_PTYPE_L4_UDP) {
struct rte_udp_hdr *udp_hdr = rte_pktmbuf_mtod_offset(
packet, struct rte_udp_hdr *, l3_offset + hdr_lens.l3_len);
info_pac->number_port = udp_hdr->dst_port;
LOG_INFO("UDP dst port: %hu", ntohs(info_pac->number_port));
}
}

if ((pkt_type & RTE_PTYPE_L4_UDP) &&
info_pac->number_port == rte_cpu_to_be_16(53)) {
LOG_INFO("DNS query detected");
uint32_t l4_offset = l3_offset + hdr_lens.l3_len + hdr_lens.l4_len;
uint8_t *udp_payload =
rte_pktmbuf_mtod_offset(packet, uint8_t *, l4_offset);
Expand Down Expand Up @@ -95,6 +102,9 @@ void parsing_pakage(struct rte_mbuf *packet, struct info_of_pakage *info_pac) {
pos += label_len;
}
*dst = '\0';
LOG_INFO("Domain extracted: %s", info_pac->domain);
} else {
LOG_WARNING("Malformed DNS name field in packet");
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions worker/src/dpdk_filter/proc_packets.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ void pakage_processing(struct net_port *port_in, struct net_port *port_out,
uint16_t nb_rx =
rte_eth_rx_burst(port_in->port_id, queue_number, pkts, nb_pkts);

if (nb_rx > 0) {
LOG_INFO("Received %hu packets on queue %hu", nb_rx, queue_number);
}
if (atomic_load(&filtring_is_turned_off)) {
for (int i = 0; i < nb_rx; i++) {
package_sending_decision(true, pkts[i], port_out, queue_number);
Expand All @@ -62,6 +65,8 @@ void pakage_processing(struct net_port *port_in, struct net_port *port_out,
struct node_cache_ip *cached_node_ip = NULL;

if (check_is_exception(&info_pac.number_port) == true) {
LOG_INFO("Exception port %hu, forwarding to exception port",
ntohs(info_pac.number_port));
package_sending_decision(true, pkts[i], port_exception, queue_number);
continue;
}
Expand All @@ -79,9 +84,12 @@ void pakage_processing(struct net_port *port_in, struct net_port *port_out,
}

if (ret >= 0 && cached_node_ip) {
LOG_INFO("IP cache hit, decision: %s",
cached_node_ip->solution_is_send ? "send" : "drop");
package_sending_decision(cached_node_ip->solution_is_send, pkts[i],
port_out, queue_number);
} else if (ret == -ENOENT) {
LOG_INFO("IP cache miss, applying filter");

struct requested_classification req_clas; // query to ip controller

Expand Down Expand Up @@ -140,16 +148,22 @@ void pakage_processing(struct net_port *port_in, struct net_port *port_out,
struct node_cache_domain *cached_node_domain = NULL;

if (check_is_exception(&info_pac.number_port) == true) {
LOG_INFO("Exception port %hu, forwarding to exception port",
ntohs(info_pac.number_port));
package_sending_decision(true, pkts[i], port_exception, queue_number);
continue;
}

int ret = lookup_dns_cache(info_pac.domain, &cached_node_domain);

if (ret >= 0 && cached_node_domain) {
LOG_INFO("Domain cache hit for '%s', decision: %s", info_pac.domain,
cached_node_domain->solution_is_send ? "send" : "drop");
package_sending_decision(cached_node_domain->solution_is_send, pkts[i],
port_out, queue_number);
} else if (ret == -ENOENT) {
LOG_INFO("Domain cache miss for '%s', applying filter",
info_pac.domain);

struct requested_classification req_clas; // query to domain controller

Expand Down
1 change: 1 addition & 0 deletions worker/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ int main(int argc, char **argv) {
return 1;
}

spdlog::info("Starting worker ID: {}", worker_id);
spdlog::info("Initialize MetricsCollector with {}:{}", gateway_address,
gateway_port);

Expand Down
5 changes: 4 additions & 1 deletion worker/src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ void Worker::requestPolicyFromController() {
current_policy.ttl_domain = pol.ttl_domain();

current_config_version = pol.config_version();

spdlog::info("Clearing cache due to policy update");
clear_ip_cache();
clear_dns_cache();
spdlog::info("POLICY LOADED");
Expand Down Expand Up @@ -345,6 +345,7 @@ Worker::Worker(uint64_t id) : worker_id(id), state(WorkerState::FREE) {
auto channel =
grpc::CreateChannel(controller_addr, grpc::InsecureChannelCredentials());
stub_ = DataService::NewStub(channel);
spdlog::info("Worker ID: {}", worker_id);
spdlog::info("gRPC channel created to {}", controller_addr);
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
Expand Down Expand Up @@ -409,6 +410,7 @@ void Worker::MainLoop() {
last_stats_time = now;
stats_interval =
MIN_STATS_TIME + (rand() % (MAX_STATS_TIME - MIN_STATS_TIME + 1));
spdlog::info("Next stats report in {}s", stats_interval);
}

int64_t seconds_since_policy = (now - last_policy_time) / 1s;
Expand All @@ -417,6 +419,7 @@ void Worker::MainLoop() {
last_policy_time = now;
policy_interval =
MIN_POLICY_TIME + (rand() % (MAX_POLICY_TIME - MIN_POLICY_TIME + 1));
spdlog::info("Next policy request in {}s", policy_interval);
}
}

Expand Down
Loading