5 #include <sys/socket.h>
6 #include <sys/sysctl.h>
9 #include <machine/atomic.h>
11 #include <machine/cpu.h>
13 #include <machine/cpufunc.h>
15 #include <arpa/inet.h>
16 #include <netinet/in.h>
21 #include <pthread_np.h>
30 #include "kq_sendrecv_proto.h"
33 * Note about the sender start synchronization.
35 * We apply two stage synchronization. The first stage uses pthread
36 condition (it sleeps), which waits for the establishment of all
37 * connections, which could be slow. The second stage uses g_nwait
38 * of send_globctx; all relevant threads spin on g_nwait. The main
39 * thread spin-waits for all senders to increase g_nwait. The sender
40 thread increases g_nwait, then it spin-waits for the main thread
41 * to reset g_nwait. In this way, we can make sure that all senders
42 * start roughly at the same time.
/*
 * timespecsub(vvp, uvp): two-argument, in-place subtraction *vvp -= *uvp
 * on struct timespec values.  A negative tv_nsec result is normalized by
 * adding 1000000000 (and, presumably, borrowing one second from tv_sec).
 */
46 #define timespecsub(vvp, uvp) \
48 (vvp)->tv_sec -= (uvp)->tv_sec; \
49 (vvp)->tv_nsec -= (uvp)->tv_nsec; \
50 if ((vvp)->tv_nsec < 0) { \
52 (vvp)->tv_nsec += 1000000000; \
/*
 * timespeccmp(a, b, op): apply the relational operator op to two
 * struct timespec pointers.  Seconds decide the result whenever they
 * differ; only equal seconds fall through to comparing nanoseconds.
 */
#define timespeccmp(tvp, uvp, cmp)				\
	(((tvp)->tv_sec != (uvp)->tv_sec) ?			\
	    ((tvp)->tv_sec cmp (uvp)->tv_sec) :			\
	    ((tvp)->tv_nsec cmp (uvp)->tv_nsec))
/* When defined, main prints each sender's start/end timestamps and the overall window on stderr. */
68 #define SEND_TIME_DEBUG
/* Capacity of the kevent array a sender drains per kevent(2) call. */
72 #define SEND_EVENT_MAX 64
/* Default per-thread send buffer length in bytes; -b overrides it. */
73 #define SEND_BUFLEN (128 * 1024)
76 * The successful 3-way handshake on the connection does not mean the
77 * remote application can accept(2) this connection. Even worse, the
78 * remote side's network stack may drop the connection silently, i.e.
79 * w/o RST. If this happened, the blocking read(2) would not return,
80 * until the keepalive kicked in, which would take quite some time.
81 This is obviously not what we want here, so use a synthetic timeout
82 * for blocking read(2). Here, we will retry if a blocking read(2)
85 #define SEND_READTO_MS 1000 /* unit: ms */
/*
 * Alignment used for struct conn_ctx (its __aligned attribute and the
 * aligned_alloc() calls): the platform cache-line size on DragonFly and
 * FreeBSD, otherwise an assumed 64 bytes.
 */
87 #if defined(__DragonFly__)
88 #define SEND_CONN_CTX_ALIGN __VM_CACHELINE_SIZE
89 #elif defined(__FreeBSD__)
90 #define SEND_CONN_CTX_ALIGN CACHE_LINE_SIZE
92 #define SEND_CONN_CTX_ALIGN 64 /* XXX */
/* Time of the send error on this connection (presumably; the error-path store is only partly visible). */
99 struct timespec c_terr;
/* Linkage on send_globctx.g_conn: every connection, walked by main to gather results. */
101 STAILQ_ENTRY(conn_ctx) c_glob_link;
/* Linkage on the owning sender's send_thrctx.t_conn hand-off queue. */
102 STAILQ_ENTRY(conn_ctx) c_link;
/* Destination address; sin_port == 0 marks the end-of-connection-list sentinel. */
103 struct sockaddr_in c_in;
/* Cache-line aligned, presumably so connections driven by different sender threads never share a line. */
105 } __aligned(SEND_CONN_CTX_ALIGN);
/* Tail-queue head type used for both glob.g_conn and each thread's t_conn. */
107 STAILQ_HEAD(conn_ctx_list, conn_ctx);
/* State shared by the main thread and every sender thread. */
109 struct send_globctx {
/* Every connection in creation order; main walks it to report results. */
110 struct conn_ctx_list g_conn;
/* Stage-1 start sync: guards g_nconn, which senders decrement and main waits to reach 0. */
114 pthread_mutex_t g_lock;
115 pthread_cond_t g_cond;
/*
 * Stage-2 spin counter: main sets it to 1, each sender atomically adds 1,
 * and main atomically resets it to 0 once it reaches nthr + 1.  volatile
 * only forces the spin loops to re-read it; updates go through
 * atomic_add_int()/atomic_cmpset_int().
 */
117 volatile u_int g_nwait;
118 int g_readto_ms; /* unit: ms */
/* struct send_thrctx (per sender thread): t_conn is the hand-off queue filled by main, guarded by t_lock and signalled via t_cond. */
124 struct conn_ctx_list t_conn;
125 pthread_mutex_t t_lock;
126 pthread_cond_t t_cond;
/* Back-pointer to the shared context. */
128 struct send_globctx *t_glob;
/* CLOCK_MONOTONIC_PRECISE timestamps bracketing this thread's send loop. */
129 struct timespec t_start;
130 struct timespec t_end;
/* t_end - t_start, computed by main after joining the thread. */
131 double t_run_us; /* unit: us */
/*
 * Forward declarations for the helpers defined later in this file:
 * receiver port discovery and the per-thread sender body.
 */
static void	send_build_addrlist(const struct sockaddr_in *in_arr,
		    int in_arr_cnt, const struct sockaddr_in **daddr0,
		    int *ndaddr0, int readto_ms);
static void	*send_thread(void *xctx);
144 #if defined(__DragonFly__)
146 #elif defined(__FreeBSD__)
/* Print the command-line synopsis, using cmd as the program name, to stderr. */
154 usage(const char *cmd)
156 fprintf(stderr, "%s -4 addr4 [-4 addr4 ...] [-p port] "
157 "-c conns [-t nthreads] [-l sec] [-r readto_ms] [-S] [-E] "
158 "[-b buflen] [-B]\n", cmd);
/*
 * main: sender side of the kq_sendrecv benchmark.
 *
 * Blocks SIGPIPE, reads hw.ncpu and parses the options of the getopt
 * string "4:BESb:c:l:p:r:t:":
 *   -4  receiver address (repeatable);
 *   -b  send buffer length;
 *   -c  connection count;
 *   -l  test duration in seconds;
 *   -p  receiver port;
 *   -r  read timeout in ms;
 *   -t  sender thread count;
 *   -S, -E, -B  flags whose handlers are not visible here, presumably
 *               driving do_sendfile, log_err and bindcpu.
 *
 * It then:
 *   - asks every receiver for its data ports;
 *   - starts nthr sender threads;
 *   - hands the nconn connections out round-robin, starting from a random
 *     destination;
 *   - queues a zeroed end-of-list marker to each thread;
 *   - performs the two-stage start synchronization and joins the senders.
 *
 * Aggregate throughput is measured over the window from the earliest
 * thread start to the latest thread end.  Per-connection rates use the
 * owning thread's run time.  Output covers the total, per-connection
 * min/max, Jain's fairness index and the error count.
 *
 * NOTE(review): "either -4 or -c are specified" is printed when -4 or -c
 * is *missing*; "both -4 and -c must be specified" would match the
 * condition.
 * NOTE(review): the SEND_TIME_DEBUG output formats tv_nsec with "%ld",
 * so 1 s + 5 ns prints as "1.5"; "%ld.%09ld" would be unambiguous.
 * NOTE(review): no range check on the -p value is visible before it is
 * stored into the u_short port.
 */
163 main(int argc, char *argv[])
165 struct send_globctx glob;
166 struct send_thrctx *ctx_arr, *ctx;
167 struct sockaddr_in *in_arr, *in;
168 const struct sockaddr_in *daddr;
169 struct timespec run, end, start;
170 double total_run_us, total, conn_min, conn_max;
171 double jain, jain_res;
173 struct conn_ctx *conn;
176 int in_arr_cnt, in_arr_sz, ndaddr;
177 int nthr, nconn, dur, readto_ms, buflen;
178 int log_err, err_cnt, has_minmax;
179 u_short port = RECV_PORT;
182 bool do_sendfile = false, bindcpu = false;
/*
 * Block SIGPIPE before any thread is created (the sender threads
 * inherit this mask).  Writes to a reset peer then fail with EPIPE
 * instead of terminating the process.
 */
184 sigemptyset(&sigset);
185 sigaddset(&sigset, SIGPIPE);
186 if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
187 err(1, "sigprocmask failed");
190 if (sysctlbyname("hw.ncpu", &ncpus, &sz, NULL, 0) < 0)
191 err(1, "sysctl hw.ncpu failed");
196 in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in));
198 err(1, "malloc failed");
203 readto_ms = SEND_READTO_MS;
204 buflen = SEND_BUFLEN;
206 while ((opt = getopt(argc, argv, "4:BESb:c:l:p:r:t:")) != -1) {
209 if (in_arr_cnt == in_arr_sz) {
211 in_arr = reallocf(in_arr,
212 in_arr_sz * sizeof(struct sockaddr_in));
214 err(1, "reallocf failed");
216 in = &in_arr[in_arr_cnt];
219 memset(in, 0, sizeof(*in));
220 in->sin_family = AF_INET;
221 if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0)
222 errx(1, "inet_pton failed %s", optarg);
238 buflen = strtol(optarg, NULL, 10);
240 errx(1, "invalid -b");
244 nconn = strtol(optarg, NULL, 10);
246 errx(1, "invalid -c");
250 dur = strtoul(optarg, NULL, 10);
252 errx(1, "invalid -l");
256 port = strtoul(optarg, NULL, 10);
260 readto_ms = strtol(optarg, NULL, 10);
262 errx(1, "invalid -r");
266 nthr = strtol(optarg, NULL, 10);
268 errx(1, "invalid -t");
275 if (in_arr_cnt == 0 || nconn == 0)
276 errx(1, "either -4 or -c are specified");
281 for (i = 0; i < in_arr_cnt; ++i)
282 in_arr[i].sin_port = htons(port);
284 ctx_arr = calloc(nthr, sizeof(struct send_thrctx));
286 err(1, "calloc failed");
288 memset(&glob, 0, sizeof(glob));
289 STAILQ_INIT(&glob.g_conn);
290 glob.g_nconn = nconn;
291 glob.g_nwait = 1; /* count self in */
293 glob.g_readto_ms = readto_ms;
294 glob.g_sendfile = do_sendfile;
295 glob.g_buflen = buflen;
296 pthread_mutex_init(&glob.g_lock, NULL);
297 pthread_cond_init(&glob.g_cond, NULL);
299 pthread_set_name_np(pthread_self(), "main");
301 /* Build receiver address list */
302 send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms);
307 for (i = 0; i < nthr; ++i) {
312 STAILQ_INIT(&ctx->t_conn);
315 pthread_mutex_init(&ctx->t_lock, NULL);
316 pthread_cond_init(&ctx->t_cond, NULL);
318 pthread_attr_init(&attr);
323 CPU_SET(i % ncpus, &mask);
324 error = pthread_attr_setaffinity_np(&attr,
325 sizeof(mask), &mask);
327 errc(1, error, "pthread_attr_setaffinity_np "
332 error = pthread_create(&ctx->t_tid, &attr, send_thread, ctx);
334 errc(1, error, "pthread_create failed");
335 pthread_attr_destroy(&attr);
339 * Distribute connections to senders.
342 * We start from a random position in the address list, so that the
343 * first several receiving servers will not be abused, if the number
344 * of connections is small and there are many clients.
346 idx = arc4random_uniform(ndaddr);
347 for (i = 0; i < nconn; ++i) {
348 const struct sockaddr_in *da;
350 da = &daddr[idx % ndaddr];
353 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
355 err(1, "aligned_alloc failed");
356 memset(conn, 0, sizeof(*conn));
360 ctx = &ctx_arr[i % nthr];
361 conn->c_thr_id = ctx->t_id;
363 pthread_mutex_lock(&ctx->t_lock);
364 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
365 pthread_mutex_unlock(&ctx->t_lock);
366 pthread_cond_signal(&ctx->t_cond);
368 /* Add to the global list for results gathering */
369 STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link);
373 * No more connections; notify the senders.
376 * The marker for 'the end of connection list' has 0 in its
379 for (i = 0; i < nthr; ++i) {
380 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
382 err(1, "aligned_alloc failed");
383 memset(conn, 0, sizeof(*conn));
387 pthread_mutex_lock(&ctx->t_lock);
388 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
389 pthread_mutex_unlock(&ctx->t_lock);
390 pthread_cond_signal(&ctx->t_cond);
394 * Sender start sync, stage 1:
395 * Wait for connections establishment (slow).
397 pthread_mutex_lock(&glob.g_lock);
398 while (glob.g_nconn != 0)
399 pthread_cond_wait(&glob.g_cond, &glob.g_lock);
400 pthread_mutex_unlock(&glob.g_lock);
403 * Sender start sync, stage 2:
404 * Wait for senders to spin-wait; and once all senders spin-wait,
405 * release them by resetting g_nwait.
407 while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0)
410 fprintf(stderr, "start %d seconds sending test: %d threads, "
411 "%d connections\n", dur, nthr, nconn);
414 * Wait for the senders to finish and gather the results.
417 memset(&end, 0, sizeof(end)); /* XXX stupid gcc warning */
418 memset(&start, 0, sizeof(start)); /* XXX stupid gcc warning */
420 for (i = 0; i < nthr; ++i) {
422 pthread_join(ctx->t_tid, NULL);
425 timespecsub(&run, &ctx->t_start);
426 ctx->t_run_us = ((double)run.tv_sec * 1000000.0) +
427 ((double)run.tv_nsec / 1000.0);
430 start = ctx->t_start;
433 if (timespeccmp(&start, &ctx->t_start, >))
434 start = ctx->t_start;
435 if (timespeccmp(&end, &ctx->t_end, <))
439 #ifdef SEND_TIME_DEBUG
440 fprintf(stderr, "start %ld.%ld, end %ld.%ld\n",
441 ctx->t_start.tv_sec, ctx->t_start.tv_nsec,
442 ctx->t_end.tv_sec, ctx->t_end.tv_nsec);
446 #ifdef SEND_TIME_DEBUG
447 fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n",
448 start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec);
452 timespecsub(&run, &start);
453 total_run_us = ((double)run.tv_sec * 1000000.0) +
454 ((double)run.tv_nsec / 1000.0);
466 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
467 total += conn->c_stat;
468 if (conn->c_err == 0) {
469 double perf; /* unit: Mbps */
471 perf = (conn->c_stat * 8.0) /
472 ctx_arr[conn->c_thr_id].t_run_us;
483 jain += (perf * perf);
492 jain = (jain_res * jain_res) / jain;
494 printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, "
495 "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max,
498 if (log_err && err_cnt) {
499 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
500 char name[INET_ADDRSTRLEN];
503 if (conn->c_err == 0)
507 timespecsub(&run, &ctx_arr[conn->c_thr_id].t_start);
508 tmp_run = ((double)run.tv_sec * 1000000.0) +
509 ((double)run.tv_nsec / 1000.0);
510 fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, "
513 inet_ntop(AF_INET, &conn->c_in.sin_addr,
515 ntohs(conn->c_in.sin_port),
516 run.tv_sec, (conn->c_stat * 8.0) / tmp_run,
/*
 * send_build_addrlist: ask each receiver in in_arr[0..in_arr_cnt) for its
 * data ports and build the destination address list.
 *
 * For every receiver, a control connection is opened with a readto_ms
 * SO_RCVTIMEO.  A struct recv_info header is read, followed by
 * info_hdr.ndport uint16_t ports, and the destination array is grown with
 * reallocf(3) by one struct sockaddr_in per port.
 *
 * Ports are stored into sin_port without conversion and printed through
 * ntohs(), so the receiver presumably sends them in network byte order.
 * Confirm this against the receiver.
 *
 * Hard failures exit via err()/errx().  Read timeouts (errno == EAGAIN)
 * are handled separately, presumably with the retry described next to
 * SEND_READTO_MS.  The result is presumably returned through *daddr0 and
 * *ndaddr0.
 */
528 send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt,
529 const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms)
531 struct sockaddr_in *daddr;
532 struct timeval readto;
/* Split readto_ms into the struct timeval that SO_RCVTIMEO expects. */
538 memset(&readto, 0, sizeof(readto));
539 readto.tv_sec = readto_ms / 1000;
540 readto.tv_usec = (readto_ms % 1000) * 1000;
542 for (i = 0; i < in_arr_cnt; ++i) {
543 const struct sockaddr_in *in = &in_arr[i];
544 struct recv_info info_hdr;
546 int s, n, ports_sz, d;
549 s = socket(AF_INET, SOCK_STREAM, 0);
551 err(1, "socket failed");
553 if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0)
554 err(1, "connect failed");
556 if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO,
557 &readto, sizeof(readto)) < 0)
558 err(1, "setsockopt(RCVTIMEO) failed");
560 n = read(s, &info_hdr, sizeof(info_hdr));
561 if (n != sizeof(info_hdr)) {
563 if (errno == EAGAIN) {
567 err(1, "read info hdr failed");
569 errx(1, "read truncated info hdr");
572 if (info_hdr.ndport == 0) {
577 ports_sz = info_hdr.ndport * sizeof(uint16_t);
578 ports = malloc(ports_sz);
580 err(1, "malloc failed");
582 n = read(s, ports, ports_sz);
585 if (errno == EAGAIN) {
590 err(1, "read ports failed");
592 errx(1, "read truncated ports");
/* Grow the destination array by one entry per port this receiver advertised. */
596 daddr = reallocf(daddr,
597 (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in));
599 err(1, "reallocf failed");
601 for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) {
602 struct sockaddr_in *da = &daddr[d];
605 da->sin_port = ports[d - ndaddr];
607 ndaddr += info_hdr.ndport;
614 for (i = 0; i < ndaddr; ++i) {
615 const struct sockaddr_in *da = &daddr[i];
616 char name[INET_ADDRSTRLEN];
618 fprintf(stderr, "%s:%d\n",
619 inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)),
620 ntohs(da->sin_port));
/*
 * send_thread: body of one sender thread; xctx is its struct send_thrctx.
 *
 * Setup:
 *   - allocates a g_buflen-byte buffer;
 *   - when g_sendfile is set, creates a temporary file via mkstemp() in
 *     the current directory and writes one buffer's worth of data to it
 *     for sendfile(2);
 *   - creates a kqueue.
 *
 * Connection setup: dequeues connections from t_conn until it reaches the
 * end-of-list marker (c_in.sin_port == 0), which is presumably reused as
 * the timer's udata.  For each real connection it:
 *   - connects;
 *   - sets a g_readto_ms SO_RCVTIMEO;
 *   - waits for an ack from the receiver;
 *   - switches the socket to non-blocking mode;
 *   - registers it for EVFILT_WRITE.
 *
 * Send phase: after the two-stage start synchronization (see the note at
 * the top of the file), it arms a one-shot EVFILT_TIMER of g_dur * 1000 ms
 * (i.e. g_dur seconds).  Between the t_start and t_end timestamps it
 * write()s or sendfile()s to every connection kqueue reports writable.
 * Events whose udata has c_s == -2 are the timer.
 *
 * NOTE(review): the mkstemp(3) template "sendtmpXXX" ends in only three
 * 'X's.  POSIX requires six; BSD implementations that accept fewer draw
 * names from a much smaller space.
 */
629 send_thread(void *xctx)
631 struct send_thrctx *ctx = xctx;
632 struct conn_ctx *timeo;
633 struct kevent chg_evt;
635 int nconn = 0, kq, n, fd = -1, buflen;
638 snprintf(name, sizeof(name), "snd%d", ctx->t_id);
639 pthread_set_name_np(pthread_self(), name);
641 buflen = ctx->t_glob->g_buflen;
642 buf = malloc(buflen);
644 err(1, "malloc(%d) failed", buflen);
646 if (ctx->t_glob->g_sendfile) {
647 char filename[] = "sendtmpXXX";
649 fd = mkstemp(filename);
651 err(1, "mkstemp failed");
652 if (write(fd, buf, buflen) != buflen)
653 err(1, "write to file failed");
661 err(1, "kqueue failed");
664 * Establish the connections assigned to us and add the
665 * established connections to kqueue.
669 char addr_name[INET_ADDRSTRLEN];
671 struct timeval readto;
672 struct conn_ctx *conn;
/* Sleep until main queues the next connection (or the end-of-list marker). */
676 pthread_mutex_lock(&ctx->t_lock);
677 while (STAILQ_EMPTY(&ctx->t_conn))
678 pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
679 conn = STAILQ_FIRST(&ctx->t_conn);
680 STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link);
681 pthread_mutex_unlock(&ctx->t_lock);
683 if (conn->c_in.sin_port == 0) {
685 * The marker for 'the end of connection list'.
686 * See the related comment in main thread.
689 * We reuse the marker as the udata for the
698 fprintf(stderr, "%s %s:%d\n", name,
699 inet_ntop(AF_INET, &conn->c_in.sin_addr,
700 addr_name, sizeof(addr_name)),
701 ntohs(conn->c_in.sin_port));
705 conn->c_s = socket(AF_INET, SOCK_STREAM, 0);
707 err(1, "socket failed");
709 if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in,
710 sizeof(conn->c_in)) < 0)
711 err(1, "connect failed");
713 memset(&readto, 0, sizeof(readto));
714 readto.tv_sec = ctx->t_glob->g_readto_ms / 1000;
715 readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000;
716 if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto,
718 err(1, "setsockopt(RCVTIMEO) failed");
720 n = read(conn->c_s, &ack, sizeof(ack));
721 if (n != sizeof(ack)) {
723 if (errno == EAGAIN) {
727 err(1, "read ack failed");
729 errx(1, "read truncated ack");
/* Non-blocking, so the kqueue-driven send loop never stalls inside write(2). */
734 if (ioctl(conn->c_s, FIONBIO, &on, sizeof(on)) < 0)
735 err(1, "ioctl(FIONBIO) failed");
737 EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn);
738 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
740 err(1, "kevent add failed");
743 fprintf(stderr, "%s conn %d\n", name, nconn);
747 * Sender start sync, stage 1:
748 * Wait for connections establishment (slow).
750 pthread_mutex_lock(&ctx->t_glob->g_lock);
751 ctx->t_glob->g_nconn -= nconn;
752 pthread_cond_broadcast(&ctx->t_glob->g_cond);
753 while (ctx->t_glob->g_nconn != 0)
754 pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock);
755 pthread_mutex_unlock(&ctx->t_glob->g_lock);
758 * Sender start sync, stage2.
760 /* Increase the g_nwait. */
761 atomic_add_int(&ctx->t_glob->g_nwait, 1);
762 /* Spin-wait for main thread to release us (reset g_nwait). */
763 while (ctx->t_glob->g_nwait)
767 fprintf(stderr, "%s start\n", name);
771 * Wire a kqueue timer, so that the sending can be terminated
775 * Set -2 to c_s for timer udata, so we could distinguish it
776 * from real connections.
779 EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0,
780 ctx->t_glob->g_dur * 1000L, timeo);
781 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
783 err(1, "kevent add failed");
785 clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start);
787 struct kevent evt[SEND_EVENT_MAX];
790 nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL);
792 err(1, "kevent failed");
794 for (i = 0; i < nevt; ++i) {
795 struct conn_ctx *conn = evt[i].udata;
798 if (conn->c_s == -2) {
809 off = conn->c_stat % buflen;
812 n = sendfile(fd, conn->c_s, off, len, NULL,
814 if (n == 0 || (n < 0 && errno == EAGAIN))
817 n = write(conn->c_s, buf, buflen);
821 if (errno != EAGAIN) {
823 clock_gettime(CLOCK_MONOTONIC_PRECISE,
834 clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end);