From ec256cd754f8dcf537f72533fec06c3d90c302bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Dosko=C4=8Dil?= <jan.doskocil@nic.cz> Date: Tue, 13 Aug 2024 11:31:45 +0200 Subject: [PATCH] kxdpgun: refactoring --- src/utils/kxdpgun/main.c | 212 +++++++++++++++++++++------------------ 1 file changed, 113 insertions(+), 99 deletions(-) diff --git a/src/utils/kxdpgun/main.c b/src/utils/kxdpgun/main.c index 91a657c14a..5b19ccac61 100644 --- a/src/utils/kxdpgun/main.c +++ b/src/utils/kxdpgun/main.c @@ -14,12 +14,15 @@ along with this program. If not, see <https://www.gnu.org/licenses/>. */ +#include <arpa/inet.h> #include <assert.h> #include <errno.h> #include <getopt.h> #include <ifaddrs.h> #include <inttypes.h> #include <net/if.h> +#include <netdb.h> +#include <netinet/in.h> #include <poll.h> #include <pthread.h> #include <signal.h> @@ -28,16 +31,11 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> -#include <time.h> -#include <unistd.h> -#include <netdb.h> - -#include <arpa/inet.h> -#include <netinet/in.h> -#include <net/if.h> #include <sys/ioctl.h> -#include <sys/socket.h> #include <sys/resource.h> +#include <sys/socket.h> +#include <time.h> +#include <unistd.h> #include "libknot/libknot.h" #include "libknot/xdp.h" @@ -48,13 +46,10 @@ #endif // ENABLE_QUIC #include "contrib/atomic.h" #include "contrib/macros.h" -#include "contrib/mempattern.h" -#include "contrib/openbsd/strlcat.h" #include "contrib/openbsd/strlcpy.h" #include "contrib/os.h" #include "contrib/sockaddr.h" #include "contrib/toeplitz.h" -#include "contrib/ucw/mempool.h" #include "utils/common/msg.h" #include "utils/common/params.h" #include "utils/kxdpgun/ip_route.h" @@ -76,11 +71,11 @@ volatile knot_atomic_uint64_t stats_trigger = 0; unsigned global_cpu_aff_start = 0; unsigned global_cpu_aff_step = 1; -#define REMOTE_PORT_DEFAULT 53 -#define REMOTE_PORT_DOQ_DEFAULT 853 -#define LOCAL_PORT_MIN 2000 -#define LOCAL_PORT_MAX 65535 -#define QUIC_THREAD_PORTS 100 +#define REMOTE_PORT_DEFAULT 53 +#define REMOTE_PORT_DOQ_DEFAULT 853 +#define LOCAL_PORT_MIN 2000 +#define LOCAL_PORT_MAX 65535 +#define QUIC_THREAD_PORTS 100 #define RCODE_MAX (0x0F + 1) @@ -94,6 +89,8 @@ typedef struct { uint64_t rst_recv; uint64_t size_recv; uint64_t wire_recv; + uint64_t errors; + uint64_t lost; uint64_t rcodes_recv[RCODE_MAX]; pthread_mutex_t mutex; } kxdpgun_stats_t; @@ -119,27 +116,27 @@ typedef struct { struct sockaddr_in6 target_ip; struct sockaddr_storage target_ip_ss; }; - char dev[IFNAMSIZ]; - uint64_t qps, duration; - unsigned at_once; - uint16_t msgid; - uint16_t edns_size; - uint16_t vlan_tci; - uint8_t local_mac[6], target_mac[6]; - uint8_t local_ip_range; - bool ipv6; - bool tcp; - bool quic; - bool quic_full_handshake; - const char *qlog_dir; - const char *sending_mode; - xdp_gun_ignore_t ignore1; - knot_tcp_ignore_t ignore2; - uint16_t target_port; + char dev[IFNAMSIZ]; + uint64_t qps, duration; + unsigned at_once; + uint16_t msgid; + uint16_t edns_size; + uint16_t vlan_tci; + uint8_t local_mac[6], target_mac[6]; + uint8_t local_ip_range; + bool ipv6; + bool tcp; + bool quic; + bool quic_full_handshake; + const char *qlog_dir; + const char *sending_mode; + xdp_gun_ignore_t ignore1; + knot_tcp_ignore_t ignore2; + uint16_t target_port; knot_xdp_filter_flag_t flags; - unsigned n_threads, thread_id; - knot_eth_rss_conf_t *rss_conf; - knot_xdp_config_t xdp_config; + unsigned n_threads, thread_id; + knot_eth_rss_conf_t *rss_conf; + knot_xdp_config_t xdp_config; } xdp_gun_ctx_t; const static xdp_gun_ctx_t ctx_defaults = { @@ -180,6 +177,8 @@ static void clear_stats(kxdpgun_stats_t *st) st->size_recv = 0; st->wire_recv = 0; st->collected = 0; + st->lost = 0; + st->errors = 0; memset(st->rcodes_recv, 0, sizeof(st->rcodes_recv)); pthread_mutex_unlock(&st->mutex); } @@ -187,7 +186,7 @@ static void clear_stats(kxdpgun_stats_t *st) static size_t collect_stats(kxdpgun_stats_t *into, const kxdpgun_stats_t *what) { pthread_mutex_lock(&into->mutex); - into->duration = MAX(into->duration, what->duration); + into->duration = MAX(into->duration, what->duration); into->qry_sent += what->qry_sent; into->synack_recv += what->synack_recv; into->ans_recv += what->ans_recv; @@ -195,6 +194,8 @@ static size_t collect_stats(kxdpgun_stats_t *into, const kxdpgun_stats_t *what) into->rst_recv += what->rst_recv; into->size_recv += what->size_recv; into->wire_recv += what->wire_recv; + into->lost += what->lost; + into->errors += what->errors; for (int i = 0; i < RCODE_MAX; i++) { into->rcodes_recv[i] += what->rcodes_recv[i]; } @@ -268,7 +269,8 @@ static unsigned addr_bits(bool ipv6) return ipv6 ? 128 : 32; } -static void shuffle_sockaddr4(struct sockaddr_in *dst, struct sockaddr_in *src, uint64_t increment) +static void shuffle_sockaddr4(struct sockaddr_in *dst, struct sockaddr_in *src, + uint64_t increment) { memcpy(&dst->sin_addr, &src->sin_addr, sizeof(dst->sin_addr)); if (increment > 0) { @@ -276,7 +278,8 @@ static void shuffle_sockaddr4(struct sockaddr_in *dst, struct sockaddr_in *src, } } -static void shuffle_sockaddr6(struct sockaddr_in6 *dst, struct sockaddr_in6 *src, uint64_t increment) +static void shuffle_sockaddr6(struct sockaddr_in6 *dst, struct sockaddr_in6 *src, + uint64_t increment) { memcpy(&dst->sin6_addr, &src->sin6_addr, sizeof(dst->sin6_addr)); if (increment > 0) { @@ -294,7 +297,8 @@ static void shuffle_sockaddr(struct sockaddr_in6 *dst, struct sockaddr_in6 *src, if (src->sin6_family == AF_INET6) { shuffle_sockaddr6(dst, src, increment); } else { - shuffle_sockaddr4((struct sockaddr_in *)dst, (struct sockaddr_in *)src, increment); + shuffle_sockaddr4((struct sockaddr_in *)dst, (struct sockaddr_in *)src, + increment); } } @@ -312,7 +316,8 @@ static void next_payload(struct pkt_payload **payload, int increment) } } -static void put_dns_payload(struct iovec *put_into, bool zero_copy, xdp_gun_ctx_t *ctx, struct pkt_payload **payl) +static void put_dns_payload(struct iovec *put_into, bool zero_copy, xdp_gun_ctx_t *ctx, + struct pkt_payload **payl) { if (zero_copy) { put_into->iov_base = (*payl)->payload; @@ -473,13 +478,37 @@ static void quic_free_cb(knot_quic_reply_t *rpl) } #endif // ENABLE_QUIC +static void print_thrd_summary(const xdp_gun_ctx_t *ctx, const kxdpgun_stats_t *st) { + char recv_str[40] = "", lost_str[40] = "", err_str[40] = ""; + if (!(ctx->flags & KNOT_XDP_FILTER_DROP)) { + (void)snprintf(recv_str, sizeof(recv_str), ", received %"PRIu64, st->ans_recv); + } + if (st->lost > 0) { + (void)snprintf(lost_str, sizeof(lost_str), ", lost %"PRIu64, st->lost); + } + if (st->errors > 0) { + (void)snprintf(err_str, sizeof(err_str), ", errors %"PRIu64, st->errors); + } + INFO2("thread#%02u: sent %"PRIu64"%s%s%s", + ctx->thread_id, st->qry_sent, recv_str, lost_str, err_str); +} + +static void print_stats_header(const xdp_gun_ctx_t *ctx) { + INFO2("using interface %s, XDP threads %u, IPv%c/%s%s%s, %s mode", ctx->dev, ctx->n_threads, + (ctx->ipv6 ? '6' : '4'), + (ctx->tcp ? "TCP" : ctx->quic ? "QUIC" : "UDP"), + (ctx->sending_mode[0] != '\0' ? " mode " : ""), + (ctx->sending_mode[0] != '\0' ? ctx->sending_mode : ""), + (knot_eth_xdp_mode(if_nametoindex(ctx->dev)) == KNOT_XDP_MODE_FULL ? "native" : "emulated")); +} + void *xdp_gun_thread(void *_ctx) { xdp_gun_ctx_t *ctx = _ctx; struct knot_xdp_socket *xsk = NULL; struct timespec timer; knot_xdp_msg_t pkts[ctx->at_once]; - uint64_t errors = 0, lost = 0, duration = 0; + uint64_t duration = 0; kxdpgun_stats_t local_stats = { 0 }; unsigned stats_triggered = 0; knot_tcp_table_t *tcp_table = NULL; @@ -507,7 +536,8 @@ void *xdp_gun_thread(void *_ctx) ERR2("failed to initialize QUIC context"); goto cleanup; } - quic_table = knot_quic_table_new(ctx->qps * 100, SIZE_MAX, SIZE_MAX, 1232, quic_creds); + quic_table = knot_quic_table_new(ctx->qps * 100, SIZE_MAX, SIZE_MAX, + 1232, quic_creds); if (quic_table == NULL) { ERR2("failed to allocate QUIC connection table"); goto cleanup; @@ -518,12 +548,12 @@ void *xdp_gun_thread(void *_ctx) #endif // ENABLE_QUIC } - knot_xdp_load_bpf_t mode = (ctx->thread_id == 0 ? - KNOT_XDP_LOAD_BPF_ALWAYS : KNOT_XDP_LOAD_BPF_NEVER); + knot_xdp_load_bpf_t mode = + (ctx->thread_id == 0 ? KNOT_XDP_LOAD_BPF_ALWAYS : KNOT_XDP_LOAD_BPF_NEVER); /* * This mutex prevents libbpf from logging: * 'libbpf: can't get link by id (5535): Resource temporarily unavailable' - */ + */ pthread_mutex_lock(&global_stats.mutex); int ret = knot_xdp_init(&xsk, ctx->dev, ctx->thread_id, ctx->flags, LOCAL_PORT_MIN, LOCAL_PORT_MIN, mode, &ctx->xdp_config); @@ -535,13 +565,7 @@ void *xdp_gun_thread(void *_ctx) } if (ctx->thread_id == 0) { - INFO2("using interface %s, XDP threads %u, IPv%c/%s%s%s, %s mode", - ctx->dev, ctx->n_threads, (ctx->ipv6 ? '6' : '4'), - (ctx->tcp ? "TCP" : ctx->quic ? "QUIC" : "UDP"), - (ctx->sending_mode[0] != '\0' ? " mode " : ""), - (ctx->sending_mode[0] != '\0' ? ctx->sending_mode : ""), - (knot_eth_xdp_mode(if_nametoindex(ctx->dev)) == KNOT_XDP_MODE_FULL ? - "native" : "emulated")); + print_stats_header(ctx); } struct pollfd pfd = { knot_xdp_socket_fd(xsk), POLLIN, 0 }; @@ -590,21 +614,20 @@ void *xdp_gun_thread(void *_ctx) timer_start(&timer); while (duration < ctx->duration + extra_wait) { - // sending part if (duration < ctx->duration) { while (1) { knot_xdp_send_prepare(xsk); unsigned alloced = alloc_pkts(pkts, xsk, ctx, tick); if (alloced < ctx->at_once) { - lost += ctx->at_once - alloced; + local_stats.lost += ctx->at_once - alloced; if (alloced == 0) { break; } } if (ctx->tcp) { - for (int i = 0; i < alloced; i++) { + for (uint32_t i = 0; i < alloced; i++) { pkts[i].payload.iov_len = 0; if (!EMPTY_LIST(reuse_conns)) { @@ -678,7 +701,7 @@ void *xdp_gun_thread(void *_ctx) #endif // ENABLE_QUIC break; } else { - for (int i = 0; i < alloced; i++) { + for (uint32_t i = 0; i < alloced; i++) { put_dns_payload(&pkts[i].payload, false, ctx, &payload_ptr); } @@ -686,7 +709,7 @@ void *xdp_gun_thread(void *_ctx) uint32_t really_sent = 0; if (knot_xdp_send(xsk, pkts, alloced, &really_sent) != KNOT_EOK) { - lost += alloced; + local_stats.lost += alloced; } local_stats.qry_sent += really_sent; (void)knot_xdp_send_finish(xsk); @@ -700,7 +723,7 @@ void *xdp_gun_thread(void *_ctx) while (1) { ret = poll(&pfd, 1, 0); if (ret < 0) { - errors++; + local_stats.errors++; break; } if (!pfd.revents) { @@ -720,7 +743,7 @@ void *xdp_gun_thread(void *_ctx) knot_tcp_relay_t *rl = &relays[i]; ret = knot_tcp_recv(rl, &pkts[i], tcp_table, NULL, ctx->ignore2); if (ret != KNOT_EOK) { - errors++; + local_stats.errors++; continue; } @@ -736,7 +759,7 @@ void *xdp_gun_thread(void *_ctx) (ctx->ignore1 & KXDPGUN_IGNORE_LASTBYTE), payl.iov_base, payl.iov_len); if (ret != KNOT_EOK) { - errors++; + local_stats.errors++; } break; case XDP_TCP_CLOSE: @@ -765,7 +788,7 @@ void *xdp_gun_thread(void *_ctx) ret = knot_tcp_send(xsk, relays, recvd, ctx->at_once); if (ret != KNOT_EOK) { - errors++; + local_stats.errors++; } (void)knot_xdp_send_finish(xsk); @@ -787,7 +810,7 @@ void *xdp_gun_thread(void *_ctx) knot_quic_cleanup(&conn, 1); continue; } else if (ret != 0) { - errors++; + local_stats.errors++; knot_quic_cleanup(&conn, 1); break; } @@ -839,7 +862,9 @@ void *xdp_gun_thread(void *_ctx) knot_quic_cleanup(&conn, 1); continue; } else if ((ctx->ignore1 & KXDPGUN_REUSE_CONN)) { - if (conn->streams_count > 1) { // keep the number of outstanding streams below MAX_STREAMS_PER_CONN, while preserving at least one at all times + /* keep the number of outstanding streams below MAX_STREAMS_PER_CONN, + * while preserving at least one at all times */ + if (conn->streams_count > 1) { knot_quic_conn_stream_free(conn, conn->streams_first * 4); } ptrlist_add(&reuse_conns, conn, NULL); @@ -848,27 +873,28 @@ void *xdp_gun_thread(void *_ctx) ret = knot_quic_send(quic_table, conn, &quic_reply, 4, (ctx->ignore1 & KXDPGUN_IGNORE_LASTBYTE) ? KNOT_QUIC_SEND_IGNORE_LASTBYTE : 0); if (ret != KNOT_EOK) { - errors++; + local_stats.errors++; } - if (!(ctx->ignore1 & KXDPGUN_IGNORE_CLOSE) && (conn->flags & KNOT_QUIC_CONN_SESSION_TAKEN) && - stream0 != NULL && stream0->inbufs != NULL && stream0->inbufs->n_inbufs == 0) { + if (!(ctx->ignore1 & KXDPGUN_IGNORE_CLOSE) + && (conn->flags & KNOT_QUIC_CONN_SESSION_TAKEN) + && stream0 != NULL && stream0->inbufs != NULL + && stream0->inbufs->n_inbufs == 0) { assert(!(ctx->ignore2 & XDP_TCP_IGNORE_DATA_ACK)); quic_reply.handle_ret = KNOT_QUIC_HANDLE_RET_CLOSE; ret = knot_quic_send(quic_table, conn, &quic_reply, 1, 0); knot_quic_table_rem(conn, quic_table); knot_quic_cleanup(&conn, 1); if (ret != KNOT_EOK) { - errors++; + local_stats.errors++; } } } (void)knot_xdp_send_finish(xsk); #endif // ENABLE_QUIC } else { - for (int i = 0; i < recvd; i++) { - (void)check_dns_payload(&pkts[i].payload, ctx, - &local_stats); + for (uint32_t i = 0; i < recvd; i++) { + check_dns_payload(&pkts[i].payload, ctx, &local_stats); } } local_stats.wire_recv += wire; @@ -897,9 +923,7 @@ void *xdp_gun_thread(void *_ctx) size_t collected = collect_stats(&global_stats, &local_stats); assert(collected <= ctx->n_threads); if (collected == ctx->n_threads) { - print_stats(&global_stats, ctx->tcp, ctx->quic, - !(ctx->flags & KNOT_XDP_FILTER_DROP), - ctx->qps * ctx->n_threads); + print_stats(&global_stats, ctx->tcp, ctx->quic, !(ctx->flags & KNOT_XDP_FILTER_DROP), ctx->qps * ctx->n_threads); clear_stats(&global_stats); } } @@ -912,18 +936,8 @@ void *xdp_gun_thread(void *_ctx) tick++; } - char recv_str[40] = "", lost_str[40] = "", err_str[40] = ""; - if (!(ctx->flags & KNOT_XDP_FILTER_DROP)) { - (void)snprintf(recv_str, sizeof(recv_str), ", received %"PRIu64, local_stats.ans_recv); - } - if (lost > 0) { - (void)snprintf(lost_str, sizeof(lost_str), ", lost %"PRIu64, lost); - } - if (errors > 0) { - (void)snprintf(err_str, sizeof(err_str), ", errors %"PRIu64, errors); - } - INFO2("thread#%02u: sent %"PRIu64"%s%s%s", - ctx->thread_id, local_stats.qry_sent, recv_str, lost_str, err_str); + print_thrd_summary(ctx, &local_stats); + local_stats.duration = ctx->duration; collect_stats(&global_stats, &local_stats); @@ -1243,6 +1257,7 @@ static int set_mode(const char *arg, knot_xdp_config_t *config) static bool get_opts(int argc, char *argv[], xdp_gun_ctx_t *ctx) { + const char *opts_str = "hV::t:Q:b:rp:T::U::F:I:i:Bl:L:R:v:e:m:G:"; struct option opts[] = { { "help", no_argument, NULL, 'h' }, { "version", optional_argument, NULL, 'V' }, @@ -1264,7 +1279,7 @@ static bool get_opts(int argc, char *argv[], xdp_gun_ctx_t *ctx) { "edns-size", required_argument, NULL, 'e' }, { "mode", required_argument, NULL, 'm' }, { "qlog", required_argument, NULL, 'G' }, - { NULL } + { 0 } }; int opt = 0, arg; @@ -1272,7 +1287,7 @@ static bool get_opts(int argc, char *argv[], xdp_gun_ctx_t *ctx) double argf; char *argcp, *local_ip = NULL; input_t input = { .format = TXT }; - while ((opt = getopt_long(argc, argv, "hV::t:Q:b:rp:T::U::F:I:i:Bl:L:R:v:e:m:G:", opts, NULL)) != -1) { + while ((opt = getopt_long(argc, argv, opts_str, opts, NULL)) != -1) { switch (opt) { case 'h': print_help(); @@ -1460,25 +1475,23 @@ static bool get_opts(int argc, char *argv[], xdp_gun_ctx_t *ctx) int main(int argc, char *argv[]) { + int ecode = EXIT_FAILURE; + xdp_gun_ctx_t ctx = ctx_defaults, *thread_ctxs = NULL; ctx.msgid = time(NULL) % UINT16_MAX; pthread_t *threads = NULL; if (!get_opts(argc, argv, &ctx)) { - free_global_payloads(); - return EXIT_FAILURE; + goto err; } thread_ctxs = calloc(ctx.n_threads, sizeof(*thread_ctxs)); threads = calloc(ctx.n_threads, sizeof(*threads)); if (thread_ctxs == NULL || threads == NULL) { ERR2("out of memory"); - free(thread_ctxs); - free(threads); - free_global_payloads(); - return EXIT_FAILURE; + goto err; } - for (int i = 0; i < ctx.n_threads; i++) { + for (uint32_t i = 0; i < ctx.n_threads; i++) { thread_ctxs[i] = ctx; thread_ctxs[i].thread_id = i; } @@ -1490,8 +1503,7 @@ int main(int argc, char *argv[]) cur_limit.rlim_max != min_limit.rlim_max) { int ret = setrlimit(RLIMIT_MEMLOCK, &min_limit); if (ret != 0) { - WARN2("unable to increase RLIMIT_MEMLOCK: %s", - strerror(errno)); + WARN2("unable to increase RLIMIT_MEMLOCK: %s", strerror(errno)); } } } @@ -1529,10 +1541,12 @@ int main(int argc, char *argv[]) } pthread_mutex_destroy(&global_stats.mutex); + ecode = EXIT_SUCCESS; + +err: free(ctx.rss_conf); free(thread_ctxs); free(threads); free_global_payloads(); - - return EXIT_SUCCESS; + return ecode; } -- GitLab