From 772669ffc752ad0933362899ebca783767209b20 Mon Sep 17 00:00:00 2001 From: Sepherosa Ziehau Date: Wed, 30 Mar 2016 21:58:17 +0800 Subject: [PATCH] tools/kq_sendrecv: Implement kqueue(2) based TCP traffic generator It is mainly used to generate TCP traffic w/ a large number of TCP connections, e.g. > 10K connections. netperf is no longer suitable to generate this kind of TCP traffic, since 10K+ processes is not desirable. iperf3 is also not suitable, since it is implemented using select(2). --- tools/tools/netrate/kq_sendrecv/Makefile | 3 + tools/tools/netrate/kq_sendrecv/Makefile.inc | 8 + .../kq_sendrecv/include/kq_sendrecv_proto.h | 24 + .../netrate/kq_sendrecv/kq_recvserv/Makefile | 3 + .../kq_sendrecv/kq_recvserv/kq_recvserv.c | 291 +++++++ .../netrate/kq_sendrecv/kq_sendcli/Makefile | 3 + .../kq_sendrecv/kq_sendcli/kq_sendcli.c | 769 ++++++++++++++++++ 7 files changed, 1101 insertions(+) create mode 100644 tools/tools/netrate/kq_sendrecv/Makefile create mode 100644 tools/tools/netrate/kq_sendrecv/Makefile.inc create mode 100644 tools/tools/netrate/kq_sendrecv/include/kq_sendrecv_proto.h create mode 100644 tools/tools/netrate/kq_sendrecv/kq_recvserv/Makefile create mode 100644 tools/tools/netrate/kq_sendrecv/kq_recvserv/kq_recvserv.c create mode 100644 tools/tools/netrate/kq_sendrecv/kq_sendcli/Makefile create mode 100644 tools/tools/netrate/kq_sendrecv/kq_sendcli/kq_sendcli.c diff --git a/tools/tools/netrate/kq_sendrecv/Makefile b/tools/tools/netrate/kq_sendrecv/Makefile new file mode 100644 index 0000000000..3fae201e58 --- /dev/null +++ b/tools/tools/netrate/kq_sendrecv/Makefile @@ -0,0 +1,3 @@ +SUBDIR = kq_sendcli kq_recvserv + +.include diff --git a/tools/tools/netrate/kq_sendrecv/Makefile.inc b/tools/tools/netrate/kq_sendrecv/Makefile.inc new file mode 100644 index 0000000000..01b3be462f --- /dev/null +++ b/tools/tools/netrate/kq_sendrecv/Makefile.inc @@ -0,0 +1,8 @@ +COPTS += -fno-common -I${.CURDIR}/../include +LDADD += -lpthread +DPADD += 
${DESTDIR}${LIBDIR}/libpthread.a + +WARNS = 6 +MAN = + +BINDIR = /usr/local/bin diff --git a/tools/tools/netrate/kq_sendrecv/include/kq_sendrecv_proto.h b/tools/tools/netrate/kq_sendrecv/include/kq_sendrecv_proto.h new file mode 100644 index 0000000000..c23666ea8e --- /dev/null +++ b/tools/tools/netrate/kq_sendrecv/include/kq_sendrecv_proto.h @@ -0,0 +1,24 @@ +#ifndef _KQ_SENDRECV_PROTO_H_ +#define _KQ_SENDRECV_PROTO_H_ + +#include +#include + +#define RECV_PORT 11236 + +struct conn_ack { + uint16_t version; + uint16_t rsvd; /* reserved 0 */ + uint32_t rsvd1; /* reserved 0 */ + uint64_t dummy; +} __packed; + +struct recv_info { + uint16_t version; + uint16_t ndport; + uint32_t rsvd; /* reserved 0 */ + + uint16_t dport[]; /* network byte order */ +} __packed; + +#endif /* !_KQ_SENDRECV_PROTO_H_ */ diff --git a/tools/tools/netrate/kq_sendrecv/kq_recvserv/Makefile b/tools/tools/netrate/kq_sendrecv/kq_recvserv/Makefile new file mode 100644 index 0000000000..b85f02316b --- /dev/null +++ b/tools/tools/netrate/kq_sendrecv/kq_recvserv/Makefile @@ -0,0 +1,3 @@ +PROG = kq_recvserv + +.include diff --git a/tools/tools/netrate/kq_sendrecv/kq_recvserv/kq_recvserv.c b/tools/tools/netrate/kq_sendrecv/kq_recvserv/kq_recvserv.c new file mode 100644 index 0000000000..2b4f68f36f --- /dev/null +++ b/tools/tools/netrate/kq_sendrecv/kq_recvserv/kq_recvserv.c @@ -0,0 +1,291 @@ +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "kq_sendrecv_proto.h" + +#define RECV_EVENT_MAX 64 +#define RECV_BUFLEN (128 * 1024) + +struct recv_thrctx { + int t_id; + struct sockaddr_in t_in; + + pthread_mutex_t t_lock; + pthread_cond_t t_cond; + + pthread_t t_tid; +}; + +static void *recv_thread(void *); + +static void +usage(const char *cmd) +{ + fprintf(stderr, "%s [-4 addr4] [-p port] [-t nthreads] [-D]\n", cmd); + exit(2); +} + +int +main(int argc, char *argv[]) +{ + 
struct recv_thrctx *ctx_arr; + struct recv_info *info; + struct sockaddr_in in; + sigset_t sigset; + int opt, s, on, nthr, i, info_sz, do_daemon; + size_t sz; + + sigemptyset(&sigset); + sigaddset(&sigset, SIGPIPE); + if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0) + err(1, "sigprocmask failed"); + + sz = sizeof(nthr); + if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0) + err(1, "sysctl hw.ncpu failed"); + + memset(&in, 0, sizeof(in)); + in.sin_family = AF_INET; + in.sin_addr.s_addr = htonl(INADDR_ANY); + in.sin_port = htons(RECV_PORT); + + do_daemon = 1; + + while ((opt = getopt(argc, argv, "4:Dp:t:")) != -1) { + switch (opt) { + case '4': + if (inet_pton(AF_INET, optarg, &in.sin_addr) <= 0) + errx(1, "inet_pton failed %s", optarg); + break; + + case 'D': + do_daemon = 0; + break; + + case 'p': + in.sin_port = htons(strtoul(optarg, NULL, 10)); + break; + + case 't': + nthr = strtol(optarg, NULL, 10); + if (nthr <= 0) + errx(1, "invalid -t"); + break; + + default: + usage(argv[0]); + } + } + + s = socket(AF_INET, SOCK_STREAM, 0); + if (s < 0) + err(1, "socket failed"); + + on = 1; + if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) + err(1, "setsockopt(REUSEADDR) failed"); + + if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) + err(1, "bind failed"); + + if (listen(s, -1) < 0) + err(1, "listen failed"); + + ctx_arr = calloc(nthr, sizeof(struct recv_thrctx)); + if (ctx_arr == NULL) + err(1, "calloc failed"); + + info_sz = __offsetof(struct recv_info, dport[nthr]); + info = calloc(1, info_sz); + if (info == NULL) + err(1, "calloc failed"); + info->ndport = nthr; + + if (do_daemon && daemon(0, 0) < 0) + err(1, "daemon failed"); + + pthread_set_name_np(pthread_self(), "main"); + + for (i = 0; i < nthr; ++i) { + struct recv_thrctx *ctx = &ctx_arr[i]; + int error; + + ctx->t_in = in; + ctx->t_in.sin_port = 0; + + ctx->t_id = i; + pthread_mutex_init(&ctx->t_lock, NULL); + pthread_cond_init(&ctx->t_cond, NULL); + + /* Start receiver */ + error = pthread_create(&ctx->t_tid, 
NULL, recv_thread, ctx); + if (error) + errc(1, error, "pthread_create %d failed", i); + + /* + * Wait for the receiver to select a proper data port + * and start a listen socket on the data port. + */ + pthread_mutex_lock(&ctx->t_lock); + while (ctx->t_in.sin_port == 0) + pthread_cond_wait(&ctx->t_cond, &ctx->t_lock); + pthread_mutex_unlock(&ctx->t_lock); + + info->dport[i] = ctx->t_in.sin_port; + } + + /* + * Send information, e.g. data ports, back to the clients. + */ + for (;;) { + int s1; + + s1 = accept(s, NULL, NULL); + if (s1 < 0) + continue; + write(s1, info, info_sz); + close(s1); + } + + /* NEVER REACHED */ + exit(0); +} + +static void * +recv_thread(void *xctx) +{ + struct recv_thrctx *ctx = xctx; + struct kevent change_evt0[RECV_EVENT_MAX]; + struct conn_ack ack; + uint8_t *buf; + char name[32]; + u_short port; + int s, kq, nchange; + + /* + * Select a proper data port and create a listen socket on it. + */ + port = RECV_PORT + ctx->t_id; + for (;;) { + struct sockaddr_in in = ctx->t_in; + int on; + + ++port; + if (port < RECV_PORT) + errx(1, "failed to find a data port"); + in.sin_port = htons(port); + + s = socket(AF_INET, SOCK_STREAM, 0); + if (s < 0) + err(1, "socket failed"); + + on = 1; + if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) + err(1, "setsockopt(REUSEADDR) failed"); + + on = 1; + if (ioctl(s, FIONBIO, &on, sizeof(on)) < 0) + err(1, "ioctl(FIONBIO) failed"); + + if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) { + close(s); + continue; + } + + if (listen(s, -1) < 0) + err(1, "listen failed"); + + break; + } + + kq = kqueue(); + if (kq < 0) + err(1, "kqueue failed"); + + buf = malloc(RECV_BUFLEN); + if (buf == NULL) + err(1, "malloc %d failed", RECV_BUFLEN); + + memset(&ack, 0, sizeof(ack)); + + snprintf(name, sizeof(name), "rcv%d %d", ctx->t_id, port); + pthread_set_name_np(pthread_self(), name); + + /* + * Inform the main thread that we are ready. 
+ */ + pthread_mutex_lock(&ctx->t_lock); + ctx->t_in.sin_port = htons(port); + pthread_mutex_unlock(&ctx->t_lock); + pthread_cond_signal(&ctx->t_cond); + + EV_SET(&change_evt0[0], s, EVFILT_READ, EV_ADD, 0, 0, NULL); + nchange = 1; + + for (;;) { + const struct kevent *change_evt = NULL; + struct kevent evt[RECV_EVENT_MAX]; + int nevt, i; + + if (nchange > 0) + change_evt = change_evt0; + + nevt = kevent(kq, change_evt, nchange, evt, RECV_EVENT_MAX, + NULL); + if (nevt < 0) + err(1, "kevent failed"); + nchange = 0; + + for (i = 0; i < nevt; ++i) { + int n; + + if (evt[i].ident == (u_int)s) { + while (nchange < RECV_EVENT_MAX) { + int s1; + + s1 = accept(s, NULL, NULL); + if (s1 < 0) + break; + + /* TODO: keepalive */ + + n = write(s1, &ack, sizeof(ack)); + if (n != sizeof(ack)) { + close(s1); + continue; + } + + EV_SET(&change_evt0[nchange], s1, + EVFILT_READ, EV_ADD, 0, 0, NULL); + ++nchange; + } + } else { + n = read(evt[i].ident, buf, RECV_BUFLEN); + if (n <= 0) { + if (n == 0 || errno != EAGAIN) + close(evt[i].ident); + } + } + } + } + + /* NEVER REACHED */ + return NULL; +} diff --git a/tools/tools/netrate/kq_sendrecv/kq_sendcli/Makefile b/tools/tools/netrate/kq_sendrecv/kq_sendcli/Makefile new file mode 100644 index 0000000000..fc53e363fd --- /dev/null +++ b/tools/tools/netrate/kq_sendrecv/kq_sendcli/Makefile @@ -0,0 +1,3 @@ +PROG = kq_sendcli + +.include diff --git a/tools/tools/netrate/kq_sendrecv/kq_sendcli/kq_sendcli.c b/tools/tools/netrate/kq_sendrecv/kq_sendcli/kq_sendcli.c new file mode 100644 index 0000000000..b2f9b19977 --- /dev/null +++ b/tools/tools/netrate/kq_sendrecv/kq_sendcli/kq_sendcli.c @@ -0,0 +1,769 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#ifdef __FreeBSD__ +#include +#endif +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "kq_sendrecv_proto.h" + +/* + * Note about the sender start synchronization. 
+ * + * We apply two-stage synchronization. The first stage uses pthread + * condition (it sleeps), which waits for the establishment of all + * connections, which could be slow. The second stage uses g_nwait + * of send_globctx; all relevant threads spin on g_nwait. The main + * thread spin-waits for all senders to increase g_nwait. The sender + * thread increases the g_nwait, then it spin-waits for main thread + * to reset g_nwait. In this way, we can make sure that all senders + * start roughly at the same time. + */ + +#ifndef timespecsub +#define timespecsub(vvp, uvp) \ + do { \ + (vvp)->tv_sec -= (uvp)->tv_sec; \ + (vvp)->tv_nsec -= (uvp)->tv_nsec; \ + if ((vvp)->tv_nsec < 0) { \ + (vvp)->tv_sec--; \ + (vvp)->tv_nsec += 1000000000; \ + } \ + } while (0) +#endif + +#ifndef timespeccmp +#define timespeccmp(tvp, uvp, cmp) \ + (((tvp)->tv_sec == (uvp)->tv_sec) ? \ + ((tvp)->tv_nsec cmp (uvp)->tv_nsec) : \ + ((tvp)->tv_sec cmp (uvp)->tv_sec)) +#endif + +#if 0 +#define SEND_DEBUG +#endif +#if 0 +#define SEND_TIME_DEBUG +#endif + +#define SEND_DUR 10 +#define SEND_EVENT_MAX 64 +#define SEND_BUFLEN (128 * 1024) + +/* + * The successful 3-way handshake on the connection does not mean the + * remote application can accept(2) this connection. Even worse, the + * remote side's network stack may drop the connection silently, i.e. + * w/o RST. If this happened, the blocking read(2) would not return, + * until the keepalive kicked in, which would take quite some time. + * This is obviously not what we want here, so use synthetic timeout + * for blocking read(2). Here, we will retry if a blocking read(2) + * times out. 
+ */ +#define SEND_READTO_MS 1000 /* unit: ms */ + +#if defined(__DragonFly__) +#define SEND_CONN_CTX_ALIGN __VM_CACHELINE_SIZE +#elif defined(__FreeBSD__) +#define SEND_CONN_CTX_ALIGN CACHE_LINE_SIZE +#else +#define SEND_CONN_CTX_ALIGN 64 /* XXX */ +#endif + +struct conn_ctx { + int c_s; + int c_err; + uint64_t c_stat; + struct timespec c_terr; + + STAILQ_ENTRY(conn_ctx) c_glob_link; + STAILQ_ENTRY(conn_ctx) c_link; + struct sockaddr_in c_in; + int c_thr_id; +} __aligned(SEND_CONN_CTX_ALIGN); + +STAILQ_HEAD(conn_ctx_list, conn_ctx); + +struct send_globctx { + struct conn_ctx_list g_conn; + + int g_dur; + int g_nconn; + pthread_mutex_t g_lock; + pthread_cond_t g_cond; + + volatile u_int g_nwait; + int g_readto_ms; /* unit: ms */ +}; + +struct send_thrctx { + struct conn_ctx_list t_conn; + pthread_mutex_t t_lock; + pthread_cond_t t_cond; + + struct send_globctx *t_glob; + struct timespec t_start; + struct timespec t_end; + double t_run_us; /* unit: us */ + + pthread_t t_tid; + int t_id; +}; + +static void send_build_addrlist(const struct sockaddr_in *, int, + const struct sockaddr_in **, int *, int); +static void *send_thread(void *); + +static __inline void +send_spinwait(void) +{ +#if defined(__DragonFly__) + cpu_pause(); +#elif defined(__FreeBSD__) + cpu_spinwait(); +#else + /* XXX nothing */ +#endif +} + +static void +usage(const char *cmd) +{ + fprintf(stderr, "%s -4 addr4 [-4 addr4 ...] 
[-p port] " + "-c conns [-t nthreads] [-l sec] [-r readto_ms] [-E]\n", cmd); + exit(2); +} + +int +main(int argc, char *argv[]) +{ + struct send_globctx glob; + struct send_thrctx *ctx_arr, *ctx; + struct sockaddr_in *in_arr, *in; + const struct sockaddr_in *daddr; + struct timespec run, end, start; + double total_run_us, total, conn_min, conn_max; + double jain, jain_res; + int jain_cnt; + struct conn_ctx *conn; + sigset_t sigset; + int opt, i; + int in_arr_cnt, in_arr_sz, ndaddr; + int nthr, nconn, dur, readto_ms; + int log_err, err_cnt, has_minmax; + u_short port = RECV_PORT; + uint32_t idx; + size_t sz; + + sigemptyset(&sigset); + sigaddset(&sigset, SIGPIPE); + if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0) + err(1, "sigprocmask failed"); + + sz = sizeof(nthr); + if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0) + err(1, "sysctl hw.ncpu failed"); + + in_arr_sz = 4; + in_arr_cnt = 0; + in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in)); + if (in_arr == NULL) + err(1, "malloc failed"); + + log_err = 0; + nconn = 0; + dur = SEND_DUR; + readto_ms = SEND_READTO_MS; + + while ((opt = getopt(argc, argv, "4:Ec:l:p:r:t:")) != -1) { + switch (opt) { + case '4': + if (in_arr_cnt == in_arr_sz) { + in_arr_sz *= 2; + in_arr = reallocf(in_arr, + in_arr_sz * sizeof(struct sockaddr_in)); + if (in_arr == NULL) + err(1, "reallocf failed"); + } + in = &in_arr[in_arr_cnt]; + ++in_arr_cnt; + + memset(in, 0, sizeof(*in)); + in->sin_family = AF_INET; + if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0) + errx(1, "inet_pton failed %s", optarg); + break; + + case 'E': + log_err = 1; + break; + + case 'c': + nconn = strtol(optarg, NULL, 10); + if (nconn <= 0) + errx(1, "invalid -c"); + break; + + case 'l': + dur = strtol(optarg, NULL, 10); + if (dur <= 0) + errx(1, "invalid -l"); + break; + + case 'p': + port = strtoul(optarg, NULL, 10); + break; + + case 'r': + readto_ms = strtol(optarg, NULL, 10); + if (readto_ms <= 0) + errx(1, "invalid -r"); + break; + + case 't': + nthr = 
strtol(optarg, NULL, 10); + if (nthr <= 0) + errx(1, "invalid -t"); + break; + + default: + usage(argv[0]); + } + } + if (in_arr_cnt == 0 || nconn == 0) + errx(1, "both -4 and -c must be specified"); + + if (nthr > nconn) + nthr = nconn; + + for (i = 0; i < in_arr_cnt; ++i) + in_arr[i].sin_port = htons(port); + + ctx_arr = calloc(nthr, sizeof(struct send_thrctx)); + if (ctx_arr == NULL) + err(1, "calloc failed"); + + memset(&glob, 0, sizeof(glob)); + STAILQ_INIT(&glob.g_conn); + glob.g_nconn = nconn; + glob.g_nwait = 1; /* count self in */ + glob.g_dur = dur; + glob.g_readto_ms = readto_ms; + pthread_mutex_init(&glob.g_lock, NULL); + pthread_cond_init(&glob.g_cond, NULL); + + pthread_set_name_np(pthread_self(), "main"); + + /* Build receiver address list */ + send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms); + + /* + * Start senders. + */ + for (i = 0; i < nthr; ++i) { + int error; + + ctx = &ctx_arr[i]; + STAILQ_INIT(&ctx->t_conn); + ctx->t_id = i; + ctx->t_glob = &glob; + pthread_mutex_init(&ctx->t_lock, NULL); + pthread_cond_init(&ctx->t_cond, NULL); + + error = pthread_create(&ctx->t_tid, NULL, send_thread, ctx); + if (error) + errc(1, error, "pthread_create failed"); + } + + /* + * Distribute connections to senders. + * + * NOTE: + * We start from a random position in the address list, so that the + * first several receiving servers will not be abused, if the number + * of connections is small and there are many clients. 
+ */ + idx = arc4random_uniform(ndaddr); + for (i = 0; i < nconn; ++i) { + const struct sockaddr_in *da; + + da = &daddr[idx % ndaddr]; + ++idx; + + conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn)); + if (conn == NULL) + err(1, "aligned_alloc failed"); + memset(conn, 0, sizeof(*conn)); + conn->c_in = *da; + conn->c_s = -1; + + ctx = &ctx_arr[i % nthr]; + conn->c_thr_id = ctx->t_id; + + pthread_mutex_lock(&ctx->t_lock); + STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link); + pthread_mutex_unlock(&ctx->t_lock); + pthread_cond_signal(&ctx->t_cond); + + /* Add to the global list for results gathering */ + STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link); + } + + /* + * No more connections; notify the senders. + * + * NOTE: + * The marker for 'the end of connection list' has 0 in its + * c_in.sin_port. + */ + for (i = 0; i < nthr; ++i) { + conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn)); + if (conn == NULL) + err(1, "aligned_alloc failed"); + memset(conn, 0, sizeof(*conn)); + conn->c_s = -1; + + ctx = &ctx_arr[i]; + pthread_mutex_lock(&ctx->t_lock); + STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link); + pthread_mutex_unlock(&ctx->t_lock); + pthread_cond_signal(&ctx->t_cond); + } + + /* + * Sender start sync, stage 1: + * Wait for connections establishment (slow). + */ + pthread_mutex_lock(&glob.g_lock); + while (glob.g_nconn != 0) + pthread_cond_wait(&glob.g_cond, &glob.g_lock); + pthread_mutex_unlock(&glob.g_lock); + + /* + * Sender start sync, stage 2: + * Wait for senders to spin-wait; and once all senders spin-wait, + * release them by resetting g_nwait. + */ + while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0) + send_spinwait(); + + fprintf(stderr, "start %d seconds sending test: %d threads, " + "%d connections\n", dur, nthr, nconn); + + /* + * Wait for the senders to finish and gather the results. 
+ */ + + memset(&end, 0, sizeof(end)); /* XXX stupid gcc warning */ + memset(&start, 0, sizeof(start)); /* XXX stupid gcc warning */ + + for (i = 0; i < nthr; ++i) { + ctx = &ctx_arr[i]; + pthread_join(ctx->t_tid, NULL); + + run = ctx->t_end; + timespecsub(&run, &ctx->t_start); + ctx->t_run_us = ((double)run.tv_sec * 1000000.0) + + ((double)run.tv_nsec / 1000.0); + + if (i == 0) { + start = ctx->t_start; + end = ctx->t_end; + } else { + if (timespeccmp(&start, &ctx->t_start, >)) + start = ctx->t_start; + if (timespeccmp(&end, &ctx->t_end, <)) + end = ctx->t_end; + } + +#ifdef SEND_TIME_DEBUG + fprintf(stderr, "start %ld.%ld, end %ld.%ld\n", + ctx->t_start.tv_sec, ctx->t_start.tv_nsec, + ctx->t_end.tv_sec, ctx->t_end.tv_nsec); +#endif + } + +#ifdef SEND_TIME_DEBUG + fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n", + start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec); +#endif + + run = end; + timespecsub(&run, &start); + total_run_us = ((double)run.tv_sec * 1000000.0) + + ((double)run.tv_nsec / 1000.0); + total = 0.0; + + err_cnt = 0; + has_minmax = 0; + conn_min = 0.0; + conn_max = 0.0; + + jain = 0.0; + jain_res = 0.0; + jain_cnt = 0; + + STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) { + total += conn->c_stat; + if (conn->c_err == 0) { + double perf; /* unit: Mbps */ + + perf = (conn->c_stat * 8.0) / + ctx_arr[conn->c_thr_id].t_run_us; + if (!has_minmax) { + conn_min = perf; + conn_max = perf; + has_minmax = 1; + } else { + if (perf > conn_max) + conn_max = perf; + if (perf < conn_min) + conn_min = perf; + } + jain += (perf * perf); + jain_res += perf; + ++jain_cnt; + } else { + ++err_cnt; + } + } + + /* Jain's fairness index; 0 (not 0/0 NaN) if no data was sent */ + jain = (jain > 0.0) ? 
(jain_res * jain_res) / (jain * jain_cnt) : 0.0; + + printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, " + "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max, + jain, err_cnt); + + if (log_err && err_cnt) { + STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) { + char name[INET_ADDRSTRLEN]; + double tmp_run; + + if 
(conn->c_err == 0) + continue; + + run = conn->c_terr; + timespecsub(&run, &ctx_arr[conn->c_thr_id].t_start); + tmp_run = ((double)run.tv_sec * 1000000.0) + + ((double)run.tv_nsec / 1000.0); + fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, " + "errno %d\n", + conn->c_thr_id, + inet_ntop(AF_INET, &conn->c_in.sin_addr, + name, sizeof(name)), + ntohs(conn->c_in.sin_port), + run.tv_sec, (conn->c_stat * 8.0) / tmp_run, + conn->c_err); + --err_cnt; + if (err_cnt == 0) + break; + } + } + + exit(0); +} + +static void +send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt, + const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms) +{ + struct sockaddr_in *daddr; + struct timeval readto; + int i, ndaddr; + + daddr = NULL; + ndaddr = 0; + + memset(&readto, 0, sizeof(readto)); + readto.tv_sec = readto_ms / 1000; + readto.tv_usec = (readto_ms % 1000) * 1000; + + for (i = 0; i < in_arr_cnt; ++i) { + const struct sockaddr_in *in = &in_arr[i]; + struct recv_info info_hdr; + uint16_t *ports; + int s, n, ports_sz, d; + +again: + s = socket(AF_INET, SOCK_STREAM, 0); + if (s < 0) + err(1, "socket failed"); + + if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0) + err(1, "connect failed"); + + if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, + &readto, sizeof(readto)) < 0) + err(1, "setsockopt(RCVTIMEO) failed"); + + n = read(s, &info_hdr, sizeof(info_hdr)); + if (n != sizeof(info_hdr)) { + if (n < 0) { + if (errno == EAGAIN) { + close(s); + goto again; + } + err(1, "read info hdr failed"); + } else { + errx(1, "read truncated info hdr"); + } + } + if (info_hdr.ndport == 0) { + close(s); + continue; + } + + ports_sz = info_hdr.ndport * sizeof(uint16_t); + ports = malloc(ports_sz); + if (ports == NULL) + err(1, "malloc failed"); + + n = read(s, ports, ports_sz); + if (n != ports_sz) { + if (n < 0) { + if (errno == EAGAIN) { + free(ports); + close(s); + goto again; + } + err(1, "read ports failed"); + } else { + errx(1, "read truncated ports"); + } + } + + 
daddr = reallocf(daddr, + (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in)); + if (daddr == NULL) + err(1, "reallocf failed"); + + for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) { + struct sockaddr_in *da = &daddr[d]; + + *da = *in; + da->sin_port = ports[d - ndaddr]; + } + ndaddr += info_hdr.ndport; + + free(ports); + close(s); + } + +#ifdef SEND_DEBUG + for (i = 0; i < ndaddr; ++i) { + const struct sockaddr_in *da = &daddr[i]; + char name[INET_ADDRSTRLEN]; + + fprintf(stderr, "%s:%d\n", + inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)), + ntohs(da->sin_port)); + } +#endif + if (ndaddr == 0) errx(1, "no receiver data ports"); + *daddr0 = daddr; + *ndaddr0 = ndaddr; +} + +static void * +send_thread(void *xctx) +{ + struct send_thrctx *ctx = xctx; + struct conn_ctx *timeo; + struct kevent chg_evt; + uint8_t *buf; + int nconn = 0, kq, n; + char name[32]; + + snprintf(name, sizeof(name), "snd%d", ctx->t_id); + pthread_set_name_np(pthread_self(), name); + + buf = malloc(SEND_BUFLEN); + if (buf == NULL) + err(1, "malloc failed"); + + kq = kqueue(); + if (kq < 0) + err(1, "kqueue failed"); + + /* + * Establish the connections assigned to us and add the + * established connections to kqueue. + */ + for (;;) { +#ifdef SEND_DEBUG + char addr_name[INET_ADDRSTRLEN]; +#endif + struct timeval readto; + struct conn_ctx *conn; + struct conn_ack ack; + int on; + + pthread_mutex_lock(&ctx->t_lock); + while (STAILQ_EMPTY(&ctx->t_conn)) + pthread_cond_wait(&ctx->t_cond, &ctx->t_lock); + conn = STAILQ_FIRST(&ctx->t_conn); + STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link); + pthread_mutex_unlock(&ctx->t_lock); + + if (conn->c_in.sin_port == 0) { + /* + * The marker for 'the end of connection list'. + * See the related comment in main thread. + * + * NOTE: + * We reuse the marker as the udata for the + * kqueue timer. 
+ */ + timeo = conn; + break; + } + + ++nconn; +#ifdef SEND_DEBUG + fprintf(stderr, "%s %s:%d\n", name, + inet_ntop(AF_INET, &conn->c_in.sin_addr, + addr_name, sizeof(addr_name)), + ntohs(conn->c_in.sin_port)); +#endif + +again: + conn->c_s = socket(AF_INET, SOCK_STREAM, 0); + if (conn->c_s < 0) + err(1, "socket failed"); + + if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in, + sizeof(conn->c_in)) < 0) + err(1, "connect failed"); + + memset(&readto, 0, sizeof(readto)); + readto.tv_sec = ctx->t_glob->g_readto_ms / 1000; + readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000; + if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto, + sizeof(readto)) < 0) + err(1, "setsockopt(RCVTIMEO) failed"); + + n = read(conn->c_s, &ack, sizeof(ack)); + if (n != sizeof(ack)) { + if (n < 0) { + if (errno == EAGAIN) { + close(conn->c_s); + goto again; + } + err(1, "read ack failed"); + } else { + errx(1, "read truncated ack"); + } + } + + on = 1; + if (ioctl(conn->c_s, FIONBIO, &on, sizeof(on)) < 0) + err(1, "ioctl(FIONBIO) failed"); + + EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn); + n = kevent(kq, &chg_evt, 1, NULL, 0, NULL); + if (n < 0) + err(1, "kevent add failed"); + } +#ifdef SEND_DEBUG + fprintf(stderr, "%s conn %d\n", name, nconn); +#endif + + /* + * Sender start sync, stage 1: + * Wait for connections establishment (slow). + */ + pthread_mutex_lock(&ctx->t_glob->g_lock); + ctx->t_glob->g_nconn -= nconn; + pthread_cond_broadcast(&ctx->t_glob->g_cond); + while (ctx->t_glob->g_nconn != 0) + pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock); + pthread_mutex_unlock(&ctx->t_glob->g_lock); + + /* + * Sender start sync, stage2. + */ + /* Increase the g_nwait. */ + atomic_add_int(&ctx->t_glob->g_nwait, 1); + /* Spin-wait for main thread to release us (reset g_nwait). 
*/ + while (ctx->t_glob->g_nwait) + send_spinwait(); + +#ifdef SEND_DEBUG + fprintf(stderr, "%s start\n", name); +#endif + + /* + * Wire a kqueue timer, so that the sending can be terminated + * as requested. + * + * NOTE: + * Set -2 to c_s for timer udata, so we could distinguish it + * from real connections. + */ + timeo->c_s = -2; + EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0, + ctx->t_glob->g_dur * 1000L, timeo); + n = kevent(kq, &chg_evt, 1, NULL, 0, NULL); + if (n < 0) + err(1, "kevent add failed"); + + clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start); + for (;;) { + struct kevent evt[SEND_EVENT_MAX]; + int nevt, i; + + nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL); + if (nevt < 0) + err(1, "kevent failed"); + + for (i = 0; i < nevt; ++i) { + struct conn_ctx *conn = evt[i].udata; + + if (conn->c_s < 0) { + if (conn->c_s == -2) { + /* Timer expired */ + goto done; + } + continue; + } + + n = write(conn->c_s, buf, SEND_BUFLEN); + if (n < 0) { + if (errno != EAGAIN) { + conn->c_err = errno; + clock_gettime(CLOCK_MONOTONIC_PRECISE, + &conn->c_terr); + close(conn->c_s); + conn->c_s = -1; + } + } else { + conn->c_stat += n; + } + } + } +done: + clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end); + return NULL; +} -- 2.41.0