<sys/time.h>: Add 3rd arg to timespecadd()/sub() and make them public.
[dragonfly.git] / tools / tools / netrate / kq_sendrecv / kq_sendcli / kq_sendcli.c
1 #include <sys/param.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/queue.h>
5 #include <sys/socket.h>
6 #include <sys/sysctl.h>
7 #include <sys/time.h>
8
9 #include <machine/atomic.h>
10 #ifdef __FreeBSD__
11 #include <machine/cpu.h>
12 #endif
13 #include <machine/cpufunc.h>
14
15 #include <arpa/inet.h>
16 #include <netinet/in.h>
17
18 #include <err.h>
19 #include <errno.h>
20 #include <pthread.h>
21 #include <pthread_np.h>
22 #include <signal.h>
23 #include <stdbool.h>
24 #include <stdio.h>
25 #include <stdint.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29
30 #include "kq_sendrecv_proto.h"
31
32 /*
33  * Note about the sender start synchronization.
34  *
35  * We apply two stage synchronization.  The first stage uses pthread
36  * condition (it sleeps), which waits for the establishment for all
37  * connections, which could be slow.  The second stage uses g_nwait
38  * of send_globctx; all relevant threads spin on g_nwait.  The main
39  * thread spin-waits for all senders to increase g_nwait.  The sender
40  * thread increases the g_nwait, then it spin-waits for main thread
41  * to reset g_nwait.  In this way, we can make sure that all senders
42  * start roughly at the same time.
43  */
44
45 #if 0
46 #define SEND_DEBUG
47 #endif
48 #if 0
49 #define SEND_TIME_DEBUG
50 #endif
51
52 #define SEND_DUR                10
53 #define SEND_EVENT_MAX          64
54 #define SEND_BUFLEN             (128 * 1024)
55
56 /*
57  * The successful 3-way handshake on the connection does not mean the
58  * remote application can accept(2) this connection.  Even worse, the
59  * remote side's network stack may drop the connection silently, i.e.
60  * w/o RST.  If this happened, the blocking read(2) would not return,
61  * until the keepalive kicked in, which would take quite some time.
62  * This is obviously not what we want here, so use synthetic timeout
63  * for blocking read(2).  Here, we will retry if a blocking read(2)
64  * times out.
65  */
66 #define SEND_READTO_MS          1000            /* unit: ms */
67
68 #if defined(__DragonFly__)
69 #define SEND_CONN_CTX_ALIGN     __VM_CACHELINE_SIZE
70 #elif defined(__FreeBSD__)
71 #define SEND_CONN_CTX_ALIGN     CACHE_LINE_SIZE
72 #else
73 #define SEND_CONN_CTX_ALIGN     64      /* XXX */
74 #endif
75
76 struct conn_ctx {
77         int                     c_s;
78         int                     c_err;
79         uint64_t                c_stat;
80         struct timespec         c_terr;
81
82         STAILQ_ENTRY(conn_ctx)  c_glob_link;
83         STAILQ_ENTRY(conn_ctx)  c_link;
84         struct sockaddr_in      c_in;
85         int                     c_thr_id;
86 } __aligned(SEND_CONN_CTX_ALIGN);
87
88 STAILQ_HEAD(conn_ctx_list, conn_ctx);
89
90 struct send_globctx {
91         struct conn_ctx_list    g_conn;
92
93         int                     g_dur;
94         int                     g_nconn;
95         pthread_mutex_t         g_lock;
96         pthread_cond_t          g_cond;
97
98         volatile u_int          g_nwait;
99         int                     g_readto_ms;    /* unit: ms */
100         int                     g_buflen;
101         bool                    g_sendfile;
102 };
103
104 struct send_thrctx {
105         struct conn_ctx_list    t_conn;
106         pthread_mutex_t         t_lock;
107         pthread_cond_t          t_cond;
108
109         struct send_globctx     *t_glob;
110         struct timespec         t_start;
111         struct timespec         t_end;
112         double                  t_run_us;       /* unit: us */
113
114         pthread_t               t_tid;
115         int                     t_id;
116 };
117
118 static void     send_build_addrlist(const struct sockaddr_in *, int,
119                     const struct sockaddr_in **, int *, int);
120 static void     *send_thread(void *);
121
122 static __inline void
123 send_spinwait(void)
124 {
125 #if defined(__DragonFly__)
126         cpu_pause();
127 #elif defined(__FreeBSD__)
128         cpu_spinwait();
129 #else
130         /* XXX nothing */
131 #endif
132 }
133
134 static void
135 usage(const char *cmd)
136 {
137         fprintf(stderr, "%s -4 addr4 [-4 addr4 ...] [-p port] "
138             "-c conns [-t nthreads] [-l sec] [-r readto_ms] [-S] [-E] "
139             "[-b buflen] [-B]\n", cmd);
140         exit(2);
141 }
142
143 int
144 main(int argc, char *argv[])
145 {
146         struct send_globctx glob;
147         struct send_thrctx *ctx_arr, *ctx;
148         struct sockaddr_in *in_arr, *in;
149         const struct sockaddr_in *daddr;
150         struct timespec run, end, start;
151         double total_run_us, total, conn_min, conn_max;
152         double jain, jain_res;
153         int jain_cnt;
154         struct conn_ctx *conn;
155         sigset_t sigset;
156         int opt, i, ncpus;
157         int in_arr_cnt, in_arr_sz, ndaddr;
158         int nthr, nconn, dur, readto_ms, buflen;
159         int log_err, err_cnt, has_minmax;
160         u_short port = RECV_PORT;
161         uint32_t idx;
162         size_t sz;
163         bool do_sendfile = false, bindcpu = false;
164
165         sigemptyset(&sigset);
166         sigaddset(&sigset, SIGPIPE);
167         if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
168                 err(1, "sigprocmask failed");
169
170         sz = sizeof(ncpus);
171         if (sysctlbyname("hw.ncpu", &ncpus, &sz, NULL, 0) < 0)
172                 err(1, "sysctl hw.ncpu failed");
173         nthr = ncpus;
174
175         in_arr_sz = 4;
176         in_arr_cnt = 0;
177         in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in));
178         if (in_arr == NULL)
179                 err(1, "malloc failed");
180
181         log_err = 0;
182         nconn = 0;
183         dur = SEND_DUR;
184         readto_ms = SEND_READTO_MS;
185         buflen = SEND_BUFLEN;
186
187         while ((opt = getopt(argc, argv, "4:BESb:c:l:p:r:t:")) != -1) {
188                 switch (opt) {
189                 case '4':
190                         if (in_arr_cnt == in_arr_sz) {
191                                 in_arr_sz *= 2;
192                                 in_arr = reallocf(in_arr,
193                                     in_arr_sz * sizeof(struct sockaddr_in));
194                                 if (in_arr == NULL)
195                                         err(1, "reallocf failed");
196                         }
197                         in = &in_arr[in_arr_cnt];
198                         ++in_arr_cnt;
199
200                         memset(in, 0, sizeof(*in));
201                         in->sin_family = AF_INET;
202                         if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0)
203                                 errx(1, "inet_pton failed %s", optarg);
204                         break;
205
206                 case 'B':
207                         bindcpu = true;
208                         break;
209
210                 case 'E':
211                         log_err = 1;
212                         break;
213
214                 case 'S':
215                         do_sendfile = true;
216                         break;
217
218                 case 'b':
219                         buflen = strtol(optarg, NULL, 10);
220                         if (buflen <= 0)
221                                 errx(1, "invalid -b");
222                         break;
223
224                 case 'c':
225                         nconn = strtol(optarg, NULL, 10);
226                         if (nconn <= 0)
227                                 errx(1, "invalid -c");
228                         break;
229
230                 case 'l':
231                         dur = strtoul(optarg, NULL, 10);
232                         if (dur == 0)
233                                 errx(1, "invalid -l");
234                         break;
235
236                 case 'p':
237                         port = strtoul(optarg, NULL, 10);
238                         break;
239
240                 case 'r':
241                         readto_ms = strtol(optarg, NULL, 10);
242                         if (readto_ms <= 0)
243                                 errx(1, "invalid -r");
244                         break;
245
246                 case 't':
247                         nthr = strtol(optarg, NULL, 10);
248                         if (nthr <= 0)
249                                 errx(1, "invalid -t");
250                         break;
251
252                 default:
253                         usage(argv[0]);
254                 }
255         }
256         if (in_arr_cnt == 0 || nconn == 0)
257                 errx(1, "either -4 or -c are specified");
258
259         if (nthr > nconn)
260                 nthr = nconn;
261
262         for (i = 0; i < in_arr_cnt; ++i)
263                 in_arr[i].sin_port = htons(port);
264
265         ctx_arr = calloc(nthr, sizeof(struct send_thrctx));
266         if (ctx_arr == NULL)
267                 err(1, "calloc failed");
268
269         memset(&glob, 0, sizeof(glob));
270         STAILQ_INIT(&glob.g_conn);
271         glob.g_nconn = nconn;
272         glob.g_nwait = 1; /* count self in */
273         glob.g_dur = dur;
274         glob.g_readto_ms = readto_ms;
275         glob.g_sendfile = do_sendfile;
276         glob.g_buflen = buflen;
277         pthread_mutex_init(&glob.g_lock, NULL);
278         pthread_cond_init(&glob.g_cond, NULL);
279
280         pthread_set_name_np(pthread_self(), "main");
281
282         /* Build receiver address list */
283         send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms);
284
285         /*
286          * Start senders.
287          */
288         for (i = 0; i < nthr; ++i) {
289                 pthread_attr_t attr;
290                 int error;
291
292                 ctx = &ctx_arr[i];
293                 STAILQ_INIT(&ctx->t_conn);
294                 ctx->t_id = i;
295                 ctx->t_glob = &glob;
296                 pthread_mutex_init(&ctx->t_lock, NULL);
297                 pthread_cond_init(&ctx->t_cond, NULL);
298
299                 pthread_attr_init(&attr);
300                 if (bindcpu) {
301 #ifdef __FreeBSD__
302                         cpuset_t mask;
303 #else
304                         cpu_set_t mask;
305 #endif
306
307                         CPU_ZERO(&mask);
308                         CPU_SET(i % ncpus, &mask);
309                         error = pthread_attr_setaffinity_np(&attr,
310                             sizeof(mask), &mask);
311                         if (error) {
312                                 errc(1, error, "pthread_attr_setaffinity_np "
313                                     "failed");
314                         }
315                 }
316
317                 error = pthread_create(&ctx->t_tid, &attr, send_thread, ctx);
318                 if (error)
319                         errc(1, error, "pthread_create failed");
320                 pthread_attr_destroy(&attr);
321         }
322
323         /*
324          * Distribute connections to senders.
325          *
326          * NOTE:
327          * We start from a random position in the address list, so that the
328          * first several receiving servers will not be abused, if the number
329          * of connections is small and there are many clients.
330          */
331         idx = arc4random_uniform(ndaddr);
332         for (i = 0; i < nconn; ++i) {
333                 const struct sockaddr_in *da;
334
335                 da = &daddr[idx % ndaddr];
336                 ++idx;
337
338                 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
339                 if (conn == NULL)
340                         err(1, "aligned_alloc failed");
341                 memset(conn, 0, sizeof(*conn));
342                 conn->c_in = *da;
343                 conn->c_s = -1;
344
345                 ctx = &ctx_arr[i % nthr];
346                 conn->c_thr_id = ctx->t_id;
347
348                 pthread_mutex_lock(&ctx->t_lock);
349                 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
350                 pthread_mutex_unlock(&ctx->t_lock);
351                 pthread_cond_signal(&ctx->t_cond);
352
353                 /* Add to the global list for results gathering */
354                 STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link);
355         }
356
357         /*
358          * No more connections; notify the senders.
359          *
360          * NOTE:
361          * The marker for 'the end of connection list' has 0 in its
362          * c_in.sin_port.
363          */
364         for (i = 0; i < nthr; ++i) {
365                 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
366                 if (conn == NULL)
367                         err(1, "aligned_alloc failed");
368                 memset(conn, 0, sizeof(*conn));
369                 conn->c_s = -1;
370
371                 ctx = &ctx_arr[i];
372                 pthread_mutex_lock(&ctx->t_lock);
373                 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
374                 pthread_mutex_unlock(&ctx->t_lock);
375                 pthread_cond_signal(&ctx->t_cond);
376         }
377
378         /*
379          * Sender start sync, stage 1:
380          * Wait for connections establishment (slow).
381          */
382         pthread_mutex_lock(&glob.g_lock);
383         while (glob.g_nconn != 0)
384                 pthread_cond_wait(&glob.g_cond, &glob.g_lock);
385         pthread_mutex_unlock(&glob.g_lock);
386
387         /*
388          * Sender start sync, stage 2:
389          * Wait for senders to spin-wait; and once all senders spin-wait,
390          * release them by resetting g_nwait.
391          */
392         while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0)
393                 send_spinwait();
394
395         fprintf(stderr, "start %d seconds sending test: %d threads, "
396             "%d connections\n", dur, nthr, nconn);
397
398         /*
399          * Wait for the senders to finish and gather the results.
400          */
401
402         memset(&end, 0, sizeof(end));           /* XXX stupid gcc warning */
403         memset(&start, 0, sizeof(start));       /* XXX stupid gcc warning */
404
405         for (i = 0; i < nthr; ++i) {
406                 ctx = &ctx_arr[i];
407                 pthread_join(ctx->t_tid, NULL);
408
409                 timespecsub(&ctx->t_end, &ctx->t_start, &run);
410                 ctx->t_run_us = ((double)run.tv_sec * 1000000.0) +
411                     ((double)run.tv_nsec / 1000.0);
412
413                 if (i == 0) {
414                         start = ctx->t_start;
415                         end = ctx->t_end;
416                 } else {
417                         if (timespeccmp(&start, &ctx->t_start, >))
418                                 start = ctx->t_start;
419                         if (timespeccmp(&end, &ctx->t_end, <))
420                                 end = ctx->t_end;
421                 }
422
423 #ifdef SEND_TIME_DEBUG
424                 fprintf(stderr, "start %ld.%ld, end %ld.%ld\n",
425                     ctx->t_start.tv_sec, ctx->t_start.tv_nsec,
426                     ctx->t_end.tv_sec, ctx->t_end.tv_nsec);
427 #endif
428         }
429
430 #ifdef SEND_TIME_DEBUG
431         fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n",
432             start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec);
433 #endif
434
435         timespecsub(&end, &start, &run);
436         total_run_us = ((double)run.tv_sec * 1000000.0) +
437             ((double)run.tv_nsec / 1000.0);
438         total = 0.0;
439
440         err_cnt = 0;
441         has_minmax = 0;
442         conn_min = 0.0;
443         conn_max = 0.0;
444
445         jain = 0.0;
446         jain_res = 0.0;
447         jain_cnt = 0;
448
449         STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
450                 total += conn->c_stat;
451                 if (conn->c_err == 0) {
452                         double perf;    /* unit: Mbps */
453
454                         perf = (conn->c_stat * 8.0) /
455                             ctx_arr[conn->c_thr_id].t_run_us;
456                         if (!has_minmax) {
457                                 conn_min = perf;
458                                 conn_max = perf;
459                                 has_minmax = 1;
460                         } else {
461                                 if (perf > conn_max)
462                                         conn_max = perf;
463                                 if (perf < conn_min)
464                                         conn_min = perf;
465                         }
466                         jain += (perf * perf);
467                         jain_res += perf;
468                         ++jain_cnt;
469                 } else {
470                         ++err_cnt;
471                 }
472         }
473
474         jain *= jain_cnt;
475         jain = (jain_res * jain_res) / jain;
476
477         printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, "
478             "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max,
479             jain, err_cnt);
480
481         if (log_err && err_cnt) {
482                 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
483                         char name[INET_ADDRSTRLEN];
484                         double tmp_run;
485
486                         if (conn->c_err == 0)
487                                 continue;
488
489                         run = conn->c_terr;
490                         timespecsub(&conn->c_terr,
491                             &ctx_arr[conn->c_thr_id].t_start, &run);
492                         tmp_run = ((double)run.tv_sec * 1000000.0) +
493                             ((double)run.tv_nsec / 1000.0);
494                         fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, "
495                             "errno %d\n",
496                             conn->c_thr_id,
497                             inet_ntop(AF_INET, &conn->c_in.sin_addr,
498                                 name, sizeof(name)),
499                             ntohs(conn->c_in.sin_port),
500                             run.tv_sec, (conn->c_stat * 8.0) / tmp_run,
501                             conn->c_err);
502                         --err_cnt;
503                         if (err_cnt == 0)
504                                 break;
505                 }
506         }
507
508         exit(0);
509 }
510
511 static void
512 send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt,
513     const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms)
514 {
515         struct sockaddr_in *daddr;
516         struct timeval readto;
517         int i, ndaddr;
518
519         daddr = NULL;
520         ndaddr = 0;
521
522         memset(&readto, 0, sizeof(readto));
523         readto.tv_sec = readto_ms / 1000;
524         readto.tv_usec = (readto_ms % 1000) * 1000;
525
526         for (i = 0; i < in_arr_cnt; ++i) {
527                 const struct sockaddr_in *in = &in_arr[i];
528                 struct recv_info info_hdr;
529                 uint16_t *ports;
530                 int s, n, ports_sz, d;
531
532 again:
533                 s = socket(AF_INET, SOCK_STREAM, 0);
534                 if (s < 0)
535                         err(1, "socket failed");
536
537                 if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0)
538                         err(1, "connect failed");
539
540                 if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO,
541                     &readto, sizeof(readto)) < 0)
542                         err(1, "setsockopt(RCVTIMEO) failed");
543
544                 n = read(s, &info_hdr, sizeof(info_hdr));
545                 if (n != sizeof(info_hdr)) {
546                         if (n < 0) {
547                                 if (errno == EAGAIN) {
548                                         close(s);
549                                         goto again;
550                                 }
551                                 err(1, "read info hdr failed");
552                         } else {
553                                 errx(1, "read truncated info hdr");
554                         }
555                 }
556                 if (info_hdr.ndport == 0) {
557                         close(s);
558                         continue;
559                 }
560
561                 ports_sz = info_hdr.ndport * sizeof(uint16_t);
562                 ports = malloc(ports_sz);
563                 if (ports == NULL)
564                         err(1, "malloc failed");
565
566                 n = read(s, ports, ports_sz);
567                 if (n != ports_sz) {
568                         if (n < 0) {
569                                 if (errno == EAGAIN) {
570                                         free(ports);
571                                         close(s);
572                                         goto again;
573                                 }
574                                 err(1, "read ports failed");
575                         } else {
576                                 errx(1, "read truncated ports");
577                         }
578                 }
579
580                 daddr = reallocf(daddr,
581                     (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in));
582                 if (daddr == NULL)
583                         err(1, "reallocf failed");
584
585                 for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) {
586                         struct sockaddr_in *da = &daddr[d];
587
588                         *da = *in;
589                         da->sin_port = ports[d - ndaddr];
590                 }
591                 ndaddr += info_hdr.ndport;
592
593                 free(ports);
594                 close(s);
595         }
596
597 #ifdef SEND_DEBUG
598         for (i = 0; i < ndaddr; ++i) {
599                 const struct sockaddr_in *da = &daddr[i];
600                 char name[INET_ADDRSTRLEN];
601
602                 fprintf(stderr, "%s:%d\n",
603                     inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)),
604                     ntohs(da->sin_port));
605         }
606 #endif
607
608         *daddr0 = daddr;
609         *ndaddr0 = ndaddr;
610 }
611
612 static void *
613 send_thread(void *xctx)
614 {
615         struct send_thrctx *ctx = xctx;
616         struct conn_ctx *timeo;
617         struct kevent chg_evt;
618         uint8_t *buf;
619         int nconn = 0, kq, n, fd = -1, buflen;
620         char name[32];
621
622         snprintf(name, sizeof(name), "snd%d", ctx->t_id);
623         pthread_set_name_np(pthread_self(), name);
624
625         buflen = ctx->t_glob->g_buflen;
626         buf = malloc(buflen);
627         if (buf == NULL)
628                 err(1, "malloc(%d) failed", buflen);
629
630         if (ctx->t_glob->g_sendfile) {
631                 char filename[] = "sendtmpXXX";
632
633                 fd = mkstemp(filename);
634                 if (fd < 0)
635                         err(1, "mkstemp failed");
636                 if (write(fd, buf, buflen) != buflen)
637                         err(1, "write to file failed");
638                 unlink(filename);
639                 free(buf);
640                 buf = NULL;
641         }
642
643         kq = kqueue();
644         if (kq < 0)
645                 err(1, "kqueue failed");
646
647         /*
648          * Establish the connections assigned to us and add the
649          * established connections to kqueue.
650          */
651         for (;;) {
652 #ifdef SEND_DEBUG
653                 char addr_name[INET_ADDRSTRLEN];
654 #endif
655                 struct timeval readto;
656                 struct conn_ctx *conn;
657                 struct conn_ack ack;
658                 int on;
659
660                 pthread_mutex_lock(&ctx->t_lock);
661                 while (STAILQ_EMPTY(&ctx->t_conn))
662                         pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
663                 conn = STAILQ_FIRST(&ctx->t_conn);
664                 STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link);
665                 pthread_mutex_unlock(&ctx->t_lock);
666
667                 if (conn->c_in.sin_port == 0) {
668                         /*
669                          * The marker for 'the end of connection list'.
670                          * See the related comment in main thread.
671                          *
672                          * NOTE:
673                          * We reuse the marker as the udata for the
674                          * kqueue timer.
675                          */
676                         timeo = conn;
677                         break;
678                 }
679
680                 ++nconn;
681 #ifdef SEND_DEBUG
682                 fprintf(stderr, "%s %s:%d\n", name,
683                     inet_ntop(AF_INET, &conn->c_in.sin_addr,
684                         addr_name, sizeof(addr_name)),
685                     ntohs(conn->c_in.sin_port));
686 #endif
687
688 again:
689                 conn->c_s = socket(AF_INET, SOCK_STREAM, 0);
690                 if (conn->c_s < 0)
691                         err(1, "socket failed");
692
693                 if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in,
694                     sizeof(conn->c_in)) < 0)
695                         err(1, "connect failed");
696
697                 memset(&readto, 0, sizeof(readto));
698                 readto.tv_sec = ctx->t_glob->g_readto_ms / 1000;
699                 readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000;
700                 if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto,
701                     sizeof(readto)) < 0)
702                         err(1, "setsockopt(RCVTIMEO) failed");
703
704                 n = read(conn->c_s, &ack, sizeof(ack));
705                 if (n != sizeof(ack)) {
706                         if (n < 0) {
707                                 if (errno == EAGAIN) {
708                                         close(conn->c_s);
709                                         goto again;
710                                 }
711                                 err(1, "read ack failed");
712                         } else {
713                                 errx(1, "read truncated ack");
714                         }
715                 }
716
717                 on = 1;
718                 if (ioctl(conn->c_s, FIONBIO, &on, sizeof(on)) < 0)
719                         err(1, "ioctl(FIONBIO) failed");
720
721                 EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn);
722                 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
723                 if (n < 0)
724                         err(1, "kevent add failed");
725         }
726 #ifdef SEND_DEBUG
727         fprintf(stderr, "%s conn %d\n", name, nconn);
728 #endif
729
730         /*
731          * Sender start sync, stage 1:
732          * Wait for connections establishment (slow).
733          */
734         pthread_mutex_lock(&ctx->t_glob->g_lock);
735         ctx->t_glob->g_nconn -= nconn;
736         pthread_cond_broadcast(&ctx->t_glob->g_cond);
737         while (ctx->t_glob->g_nconn != 0)
738                 pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock);
739         pthread_mutex_unlock(&ctx->t_glob->g_lock);
740
741         /*
742          * Sender start sync, stage2.
743          */
744         /* Increase the g_nwait. */
745         atomic_add_int(&ctx->t_glob->g_nwait, 1);
746         /* Spin-wait for main thread to release us (reset g_nwait). */
747         while (ctx->t_glob->g_nwait)
748                 send_spinwait();
749
750 #ifdef SEND_DEBUG
751         fprintf(stderr, "%s start\n", name);
752 #endif
753
754         /*
755          * Wire a kqueue timer, so that the sending can be terminated
756          * as requested.
757          *
758          * NOTE:
759          * Set -2 to c_s for timer udata, so we could distinguish it
760          * from real connections.
761          */
762         timeo->c_s = -2;
763         EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0,
764             ctx->t_glob->g_dur * 1000L, timeo);
765         n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
766         if (n < 0)
767                 err(1, "kevent add failed");
768
769         clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start);
770         for (;;) {
771                 struct kevent evt[SEND_EVENT_MAX];
772                 int nevt, i;
773
774                 nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL);
775                 if (nevt < 0)
776                         err(1, "kevent failed");
777
778                 for (i = 0; i < nevt; ++i) {
779                         struct conn_ctx *conn = evt[i].udata;
780
781                         if (conn->c_s < 0) {
782                                 if (conn->c_s == -2) {
783                                         /* Timer expired */
784                                         goto done;
785                                 }
786                                 continue;
787                         }
788
789                         if (fd >= 0) {
790                                 off_t m, off;
791                                 size_t len;
792
793                                 off = conn->c_stat % buflen;
794                                 len = buflen - off;
795
796                                 n = sendfile(fd, conn->c_s, off, len, NULL,
797                                     &m, 0);
798                                 if (n == 0 || (n < 0 && errno == EAGAIN))
799                                         n = m;
800                         } else {
801                                 n = write(conn->c_s, buf, buflen);
802                         }
803
804                         if (n < 0) {
805                                 if (errno != EAGAIN) {
806                                         conn->c_err = errno;
807                                         clock_gettime(CLOCK_MONOTONIC_PRECISE,
808                                             &conn->c_terr);
809                                         close(conn->c_s);
810                                         conn->c_s = -1;
811                                 }
812                         } else {
813                                 conn->c_stat += n;
814                         }
815                 }
816         }
817 done:
818         clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end);
819
820         if (fd >= 0)
821                 close(fd);
822         if (buf != NULL)
823                 free(buf);
824         return NULL;
825 }