bc9b0e4faf5cb302e1b1130be6a58765e4099456
[dragonfly.git] / tools / tools / netrate / kq_sendrecv / kq_sendcli / kq_sendcli.c
1 #include <sys/param.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/queue.h>
5 #include <sys/socket.h>
6 #include <sys/sysctl.h>
7 #include <sys/time.h>
8
9 #include <machine/atomic.h>
10 #ifdef __FreeBSD__
11 #include <machine/cpu.h>
12 #endif
13 #include <machine/cpufunc.h>
14
15 #include <arpa/inet.h>
16 #include <netinet/in.h>
17
18 #include <err.h>
19 #include <errno.h>
20 #include <pthread.h>
21 #include <pthread_np.h>
22 #include <signal.h>
23 #include <stdbool.h>
24 #include <stdio.h>
25 #include <stdint.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29
30 #include "kq_sendrecv_proto.h"
31
/*
 * Note about the sender start synchronization.
 *
 * We apply a two-stage synchronization.  The first stage uses a pthread
 * condition variable (it sleeps) to wait for the establishment of all
 * connections, which could be slow.  The second stage uses g_nwait of
 * send_globctx; all relevant threads spin on g_nwait.  The main thread
 * spin-waits for all senders to increment g_nwait.  Each sender thread
 * increments g_nwait, then spin-waits for the main thread to reset
 * g_nwait to 0.  This way, we can make sure that all senders start at
 * roughly the same time.
 */
44
/*
 * Fallbacks for systems whose <sys/time.h> lacks these helpers.
 *
 * timespecsub(vvp, uvp): *vvp -= *uvp in place, then bring tv_nsec back
 * into range by borrowing one second when it went negative.  Both
 * arguments are evaluated more than once.
 */
#ifndef timespecsub
#define timespecsub(vvp, uvp)						\
	do {								\
		(vvp)->tv_nsec -= (uvp)->tv_nsec;			\
		(vvp)->tv_sec -= (uvp)->tv_sec;				\
		if ((vvp)->tv_nsec < 0) {				\
			(vvp)->tv_nsec += 1000000000;			\
			--(vvp)->tv_sec;				\
		}							\
	} while (0)
#endif

/*
 * timespeccmp(tvp, uvp, cmp): compare two timespecs with the relational
 * operator cmp, looking at tv_nsec only when the seconds are equal.
 */
#ifndef timespeccmp
#define timespeccmp(tvp, uvp, cmp)					\
	(((tvp)->tv_sec != (uvp)->tv_sec) ?				\
	    ((tvp)->tv_sec cmp (uvp)->tv_sec) :				\
	    ((tvp)->tv_nsec cmp (uvp)->tv_nsec))
#endif
63
/*
 * Optional diagnostics; change the "#if 0" to "#if 1" to enable.
 *   SEND_DEBUG:      print the receiver address list and per-thread
 *                    connection setup progress to stderr.
 *   SEND_TIME_DEBUG: print each sender's start/end timestamps.
 */
#if 0
#define SEND_DEBUG
#endif
#if 0
#define SEND_TIME_DEBUG
#endif

/* Default for -l (seconds) and -b (bytes); kevent(2) batch size. */
#define SEND_DUR		10
#define SEND_BUFLEN		(128 * 1024)
#define SEND_EVENT_MAX		64

/*
 * Default for -r, in milliseconds.
 *
 * A completed TCP 3-way handshake does not mean the remote application
 * will ever accept(2) the connection; worse, the remote network stack
 * may drop it silently, without an RST.  A plain blocking read(2) would
 * then hang until keepalive kicked in, which takes far too long.  So
 * blocking reads are bounded by this synthetic timeout (SO_RCVTIMEO);
 * when one times out, the connection is re-established and the read is
 * retried.
 */
#define SEND_READTO_MS		1000

/*
 * Alignment of every struct conn_ctx (also passed to aligned_alloc(3)).
 * Cache-line sized, presumably so contexts driven by different sender
 * threads do not share a cache line -- TODO confirm intent.
 */
#if defined(__DragonFly__)
#define SEND_CONN_CTX_ALIGN	__VM_CACHELINE_SIZE
#elif defined(__FreeBSD__)
#define SEND_CONN_CTX_ALIGN	CACHE_LINE_SIZE
#else
#define SEND_CONN_CTX_ALIGN	64	/* XXX guess for other platforms */
#endif
94
95 struct conn_ctx {
96         int                     c_s;
97         int                     c_err;
98         uint64_t                c_stat;
99         struct timespec         c_terr;
100
101         STAILQ_ENTRY(conn_ctx)  c_glob_link;
102         STAILQ_ENTRY(conn_ctx)  c_link;
103         struct sockaddr_in      c_in;
104         int                     c_thr_id;
105 } __aligned(SEND_CONN_CTX_ALIGN);
106
107 STAILQ_HEAD(conn_ctx_list, conn_ctx);
108
109 struct send_globctx {
110         struct conn_ctx_list    g_conn;
111
112         int                     g_dur;
113         int                     g_nconn;
114         pthread_mutex_t         g_lock;
115         pthread_cond_t          g_cond;
116
117         volatile u_int          g_nwait;
118         int                     g_readto_ms;    /* unit: ms */
119         int                     g_buflen;
120         bool                    g_sendfile;
121 };
122
123 struct send_thrctx {
124         struct conn_ctx_list    t_conn;
125         pthread_mutex_t         t_lock;
126         pthread_cond_t          t_cond;
127
128         struct send_globctx     *t_glob;
129         struct timespec         t_start;
130         struct timespec         t_end;
131         double                  t_run_us;       /* unit: us */
132
133         pthread_t               t_tid;
134         int                     t_id;
135 };
136
137 static void     send_build_addrlist(const struct sockaddr_in *, int,
138                     const struct sockaddr_in **, int *, int);
139 static void     *send_thread(void *);
140
/*
 * One iteration of a busy-wait loop: issue the platform's "spinning"
 * hint (cpu_pause() on DragonFly, cpu_spinwait() on FreeBSD).  On any
 * other platform this compiles to nothing.
 */
static __inline void
send_spinwait(void)
{
#ifdef __DragonFly__
	cpu_pause();
#endif
#if !defined(__DragonFly__) && defined(__FreeBSD__)
	cpu_spinwait();
#endif
}
152
/*
 * Print the command-line synopsis for program name cmd to stderr and
 * terminate with exit status 2.  Does not return.
 */
static void
usage(const char *cmd)
{
	fprintf(stderr,
	    "%s -4 addr4 [-4 addr4 ...] [-p port] -c conns [-t nthreads] "
	    "[-l sec] [-r readto_ms] [-S] [-E] [-b buflen] [-B]\n",
	    cmd);
	exit(2);
}
161
162 int
163 main(int argc, char *argv[])
164 {
165         struct send_globctx glob;
166         struct send_thrctx *ctx_arr, *ctx;
167         struct sockaddr_in *in_arr, *in;
168         const struct sockaddr_in *daddr;
169         struct timespec run, end, start;
170         double total_run_us, total, conn_min, conn_max;
171         double jain, jain_res;
172         int jain_cnt;
173         struct conn_ctx *conn;
174         sigset_t sigset;
175         int opt, i, ncpus;
176         int in_arr_cnt, in_arr_sz, ndaddr;
177         int nthr, nconn, dur, readto_ms, buflen;
178         int log_err, err_cnt, has_minmax;
179         u_short port = RECV_PORT;
180         uint32_t idx;
181         size_t sz;
182         bool do_sendfile = false, bindcpu = false;
183
184         sigemptyset(&sigset);
185         sigaddset(&sigset, SIGPIPE);
186         if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
187                 err(1, "sigprocmask failed");
188
189         sz = sizeof(ncpus);
190         if (sysctlbyname("hw.ncpu", &ncpus, &sz, NULL, 0) < 0)
191                 err(1, "sysctl hw.ncpu failed");
192         nthr = ncpus;
193
194         in_arr_sz = 4;
195         in_arr_cnt = 0;
196         in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in));
197         if (in_arr == NULL)
198                 err(1, "malloc failed");
199
200         log_err = 0;
201         nconn = 0;
202         dur = SEND_DUR;
203         readto_ms = SEND_READTO_MS;
204         buflen = SEND_BUFLEN;
205
206         while ((opt = getopt(argc, argv, "4:BESb:c:l:p:r:t:")) != -1) {
207                 switch (opt) {
208                 case '4':
209                         if (in_arr_cnt == in_arr_sz) {
210                                 in_arr_sz *= 2;
211                                 in_arr = reallocf(in_arr,
212                                     in_arr_sz * sizeof(struct sockaddr_in));
213                                 if (in_arr == NULL)
214                                         err(1, "reallocf failed");
215                         }
216                         in = &in_arr[in_arr_cnt];
217                         ++in_arr_cnt;
218
219                         memset(in, 0, sizeof(*in));
220                         in->sin_family = AF_INET;
221                         if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0)
222                                 errx(1, "inet_pton failed %s", optarg);
223                         break;
224
225                 case 'B':
226                         bindcpu = true;
227                         break;
228
229                 case 'E':
230                         log_err = 1;
231                         break;
232
233                 case 'S':
234                         do_sendfile = true;
235                         break;
236
237                 case 'b':
238                         buflen = strtol(optarg, NULL, 10);
239                         if (buflen <= 0)
240                                 errx(1, "invalid -b");
241                         break;
242
243                 case 'c':
244                         nconn = strtol(optarg, NULL, 10);
245                         if (nconn <= 0)
246                                 errx(1, "invalid -c");
247                         break;
248
249                 case 'l':
250                         dur = strtoul(optarg, NULL, 10);
251                         if (dur == 0)
252                                 errx(1, "invalid -l");
253                         break;
254
255                 case 'p':
256                         port = strtoul(optarg, NULL, 10);
257                         break;
258
259                 case 'r':
260                         readto_ms = strtol(optarg, NULL, 10);
261                         if (readto_ms <= 0)
262                                 errx(1, "invalid -r");
263                         break;
264
265                 case 't':
266                         nthr = strtol(optarg, NULL, 10);
267                         if (nthr <= 0)
268                                 errx(1, "invalid -t");
269                         break;
270
271                 default:
272                         usage(argv[0]);
273                 }
274         }
275         if (in_arr_cnt == 0 || nconn == 0)
276                 errx(1, "either -4 or -c are specified");
277
278         if (nthr > nconn)
279                 nthr = nconn;
280
281         for (i = 0; i < in_arr_cnt; ++i)
282                 in_arr[i].sin_port = htons(port);
283
284         ctx_arr = calloc(nthr, sizeof(struct send_thrctx));
285         if (ctx_arr == NULL)
286                 err(1, "calloc failed");
287
288         memset(&glob, 0, sizeof(glob));
289         STAILQ_INIT(&glob.g_conn);
290         glob.g_nconn = nconn;
291         glob.g_nwait = 1; /* count self in */
292         glob.g_dur = dur;
293         glob.g_readto_ms = readto_ms;
294         glob.g_sendfile = do_sendfile;
295         glob.g_buflen = buflen;
296         pthread_mutex_init(&glob.g_lock, NULL);
297         pthread_cond_init(&glob.g_cond, NULL);
298
299         pthread_set_name_np(pthread_self(), "main");
300
301         /* Build receiver address list */
302         send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms);
303
304         /*
305          * Start senders.
306          */
307         for (i = 0; i < nthr; ++i) {
308                 pthread_attr_t attr;
309                 int error;
310
311                 ctx = &ctx_arr[i];
312                 STAILQ_INIT(&ctx->t_conn);
313                 ctx->t_id = i;
314                 ctx->t_glob = &glob;
315                 pthread_mutex_init(&ctx->t_lock, NULL);
316                 pthread_cond_init(&ctx->t_cond, NULL);
317
318                 pthread_attr_init(&attr);
319                 if (bindcpu) {
320 #ifdef __FreeBSD__
321                         cpuset_t mask;
322 #else
323                         cpu_set_t mask;
324 #endif
325
326                         CPU_ZERO(&mask);
327                         CPU_SET(i % ncpus, &mask);
328                         error = pthread_attr_setaffinity_np(&attr,
329                             sizeof(mask), &mask);
330                         if (error) {
331                                 errc(1, error, "pthread_attr_setaffinity_np "
332                                     "failed");
333                         }
334                 }
335
336                 error = pthread_create(&ctx->t_tid, &attr, send_thread, ctx);
337                 if (error)
338                         errc(1, error, "pthread_create failed");
339                 pthread_attr_destroy(&attr);
340         }
341
342         /*
343          * Distribute connections to senders.
344          *
345          * NOTE:
346          * We start from a random position in the address list, so that the
347          * first several receiving servers will not be abused, if the number
348          * of connections is small and there are many clients.
349          */
350         idx = arc4random_uniform(ndaddr);
351         for (i = 0; i < nconn; ++i) {
352                 const struct sockaddr_in *da;
353
354                 da = &daddr[idx % ndaddr];
355                 ++idx;
356
357                 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
358                 if (conn == NULL)
359                         err(1, "aligned_alloc failed");
360                 memset(conn, 0, sizeof(*conn));
361                 conn->c_in = *da;
362                 conn->c_s = -1;
363
364                 ctx = &ctx_arr[i % nthr];
365                 conn->c_thr_id = ctx->t_id;
366
367                 pthread_mutex_lock(&ctx->t_lock);
368                 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
369                 pthread_mutex_unlock(&ctx->t_lock);
370                 pthread_cond_signal(&ctx->t_cond);
371
372                 /* Add to the global list for results gathering */
373                 STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link);
374         }
375
376         /*
377          * No more connections; notify the senders.
378          *
379          * NOTE:
380          * The marker for 'the end of connection list' has 0 in its
381          * c_in.sin_port.
382          */
383         for (i = 0; i < nthr; ++i) {
384                 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
385                 if (conn == NULL)
386                         err(1, "aligned_alloc failed");
387                 memset(conn, 0, sizeof(*conn));
388                 conn->c_s = -1;
389
390                 ctx = &ctx_arr[i];
391                 pthread_mutex_lock(&ctx->t_lock);
392                 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
393                 pthread_mutex_unlock(&ctx->t_lock);
394                 pthread_cond_signal(&ctx->t_cond);
395         }
396
397         /*
398          * Sender start sync, stage 1:
399          * Wait for connections establishment (slow).
400          */
401         pthread_mutex_lock(&glob.g_lock);
402         while (glob.g_nconn != 0)
403                 pthread_cond_wait(&glob.g_cond, &glob.g_lock);
404         pthread_mutex_unlock(&glob.g_lock);
405
406         /*
407          * Sender start sync, stage 2:
408          * Wait for senders to spin-wait; and once all senders spin-wait,
409          * release them by resetting g_nwait.
410          */
411         while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0)
412                 send_spinwait();
413
414         fprintf(stderr, "start %d seconds sending test: %d threads, "
415             "%d connections\n", dur, nthr, nconn);
416
417         /*
418          * Wait for the senders to finish and gather the results.
419          */
420
421         memset(&end, 0, sizeof(end));           /* XXX stupid gcc warning */
422         memset(&start, 0, sizeof(start));       /* XXX stupid gcc warning */
423
424         for (i = 0; i < nthr; ++i) {
425                 ctx = &ctx_arr[i];
426                 pthread_join(ctx->t_tid, NULL);
427
428                 run = ctx->t_end;
429                 timespecsub(&run, &ctx->t_start);
430                 ctx->t_run_us = ((double)run.tv_sec * 1000000.0) +
431                     ((double)run.tv_nsec / 1000.0);
432
433                 if (i == 0) {
434                         start = ctx->t_start;
435                         end = ctx->t_end;
436                 } else {
437                         if (timespeccmp(&start, &ctx->t_start, >))
438                                 start = ctx->t_start;
439                         if (timespeccmp(&end, &ctx->t_end, <))
440                                 end = ctx->t_end;
441                 }
442
443 #ifdef SEND_TIME_DEBUG
444                 fprintf(stderr, "start %ld.%ld, end %ld.%ld\n",
445                     ctx->t_start.tv_sec, ctx->t_start.tv_nsec,
446                     ctx->t_end.tv_sec, ctx->t_end.tv_nsec);
447 #endif
448         }
449
450 #ifdef SEND_TIME_DEBUG
451         fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n",
452             start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec);
453 #endif
454
455         run = end;
456         timespecsub(&run, &start);
457         total_run_us = ((double)run.tv_sec * 1000000.0) +
458             ((double)run.tv_nsec / 1000.0);
459         total = 0.0;
460
461         err_cnt = 0;
462         has_minmax = 0;
463         conn_min = 0.0;
464         conn_max = 0.0;
465
466         jain = 0.0;
467         jain_res = 0.0;
468         jain_cnt = 0;
469
470         STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
471                 total += conn->c_stat;
472                 if (conn->c_err == 0) {
473                         double perf;    /* unit: Mbps */
474
475                         perf = (conn->c_stat * 8.0) /
476                             ctx_arr[conn->c_thr_id].t_run_us;
477                         if (!has_minmax) {
478                                 conn_min = perf;
479                                 conn_max = perf;
480                                 has_minmax = 1;
481                         } else {
482                                 if (perf > conn_max)
483                                         conn_max = perf;
484                                 if (perf < conn_min)
485                                         conn_min = perf;
486                         }
487                         jain += (perf * perf);
488                         jain_res += perf;
489                         ++jain_cnt;
490                 } else {
491                         ++err_cnt;
492                 }
493         }
494
495         jain *= jain_cnt;
496         jain = (jain_res * jain_res) / jain;
497
498         printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, "
499             "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max,
500             jain, err_cnt);
501
502         if (log_err && err_cnt) {
503                 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
504                         char name[INET_ADDRSTRLEN];
505                         double tmp_run;
506
507                         if (conn->c_err == 0)
508                                 continue;
509
510                         run = conn->c_terr;
511                         timespecsub(&run, &ctx_arr[conn->c_thr_id].t_start);
512                         tmp_run = ((double)run.tv_sec * 1000000.0) +
513                             ((double)run.tv_nsec / 1000.0);
514                         fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, "
515                             "errno %d\n",
516                             conn->c_thr_id,
517                             inet_ntop(AF_INET, &conn->c_in.sin_addr,
518                                 name, sizeof(name)),
519                             ntohs(conn->c_in.sin_port),
520                             run.tv_sec, (conn->c_stat * 8.0) / tmp_run,
521                             conn->c_err);
522                         --err_cnt;
523                         if (err_cnt == 0)
524                                 break;
525                 }
526         }
527
528         exit(0);
529 }
530
531 static void
532 send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt,
533     const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms)
534 {
535         struct sockaddr_in *daddr;
536         struct timeval readto;
537         int i, ndaddr;
538
539         daddr = NULL;
540         ndaddr = 0;
541
542         memset(&readto, 0, sizeof(readto));
543         readto.tv_sec = readto_ms / 1000;
544         readto.tv_usec = (readto_ms % 1000) * 1000;
545
546         for (i = 0; i < in_arr_cnt; ++i) {
547                 const struct sockaddr_in *in = &in_arr[i];
548                 struct recv_info info_hdr;
549                 uint16_t *ports;
550                 int s, n, ports_sz, d;
551
552 again:
553                 s = socket(AF_INET, SOCK_STREAM, 0);
554                 if (s < 0)
555                         err(1, "socket failed");
556
557                 if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0)
558                         err(1, "connect failed");
559
560                 if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO,
561                     &readto, sizeof(readto)) < 0)
562                         err(1, "setsockopt(RCVTIMEO) failed");
563
564                 n = read(s, &info_hdr, sizeof(info_hdr));
565                 if (n != sizeof(info_hdr)) {
566                         if (n < 0) {
567                                 if (errno == EAGAIN) {
568                                         close(s);
569                                         goto again;
570                                 }
571                                 err(1, "read info hdr failed");
572                         } else {
573                                 errx(1, "read truncated info hdr");
574                         }
575                 }
576                 if (info_hdr.ndport == 0) {
577                         close(s);
578                         continue;
579                 }
580
581                 ports_sz = info_hdr.ndport * sizeof(uint16_t);
582                 ports = malloc(ports_sz);
583                 if (ports == NULL)
584                         err(1, "malloc failed");
585
586                 n = read(s, ports, ports_sz);
587                 if (n != ports_sz) {
588                         if (n < 0) {
589                                 if (errno == EAGAIN) {
590                                         free(ports);
591                                         close(s);
592                                         goto again;
593                                 }
594                                 err(1, "read ports failed");
595                         } else {
596                                 errx(1, "read truncated ports");
597                         }
598                 }
599
600                 daddr = reallocf(daddr,
601                     (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in));
602                 if (daddr == NULL)
603                         err(1, "reallocf failed");
604
605                 for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) {
606                         struct sockaddr_in *da = &daddr[d];
607
608                         *da = *in;
609                         da->sin_port = ports[d - ndaddr];
610                 }
611                 ndaddr += info_hdr.ndport;
612
613                 free(ports);
614                 close(s);
615         }
616
617 #ifdef SEND_DEBUG
618         for (i = 0; i < ndaddr; ++i) {
619                 const struct sockaddr_in *da = &daddr[i];
620                 char name[INET_ADDRSTRLEN];
621
622                 fprintf(stderr, "%s:%d\n",
623                     inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)),
624                     ntohs(da->sin_port));
625         }
626 #endif
627
628         *daddr0 = daddr;
629         *ndaddr0 = ndaddr;
630 }
631
632 static void *
633 send_thread(void *xctx)
634 {
635         struct send_thrctx *ctx = xctx;
636         struct conn_ctx *timeo;
637         struct kevent chg_evt;
638         uint8_t *buf;
639         int nconn = 0, kq, n, fd = -1, buflen;
640         char name[32];
641
642         snprintf(name, sizeof(name), "snd%d", ctx->t_id);
643         pthread_set_name_np(pthread_self(), name);
644
645         buflen = ctx->t_glob->g_buflen;
646         buf = malloc(buflen);
647         if (buf == NULL)
648                 err(1, "malloc(%d) failed", buflen);
649
650         if (ctx->t_glob->g_sendfile) {
651                 char filename[] = "sendtmpXXX";
652
653                 fd = mkstemp(filename);
654                 if (fd < 0)
655                         err(1, "mkstemp failed");
656                 if (write(fd, buf, buflen) != buflen)
657                         err(1, "write to file failed");
658                 unlink(filename);
659                 free(buf);
660                 buf = NULL;
661         }
662
663         kq = kqueue();
664         if (kq < 0)
665                 err(1, "kqueue failed");
666
667         /*
668          * Establish the connections assigned to us and add the
669          * established connections to kqueue.
670          */
671         for (;;) {
672 #ifdef SEND_DEBUG
673                 char addr_name[INET_ADDRSTRLEN];
674 #endif
675                 struct timeval readto;
676                 struct conn_ctx *conn;
677                 struct conn_ack ack;
678                 int on;
679
680                 pthread_mutex_lock(&ctx->t_lock);
681                 while (STAILQ_EMPTY(&ctx->t_conn))
682                         pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
683                 conn = STAILQ_FIRST(&ctx->t_conn);
684                 STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link);
685                 pthread_mutex_unlock(&ctx->t_lock);
686
687                 if (conn->c_in.sin_port == 0) {
688                         /*
689                          * The marker for 'the end of connection list'.
690                          * See the related comment in main thread.
691                          *
692                          * NOTE:
693                          * We reuse the marker as the udata for the
694                          * kqueue timer.
695                          */
696                         timeo = conn;
697                         break;
698                 }
699
700                 ++nconn;
701 #ifdef SEND_DEBUG
702                 fprintf(stderr, "%s %s:%d\n", name,
703                     inet_ntop(AF_INET, &conn->c_in.sin_addr,
704                         addr_name, sizeof(addr_name)),
705                     ntohs(conn->c_in.sin_port));
706 #endif
707
708 again:
709                 conn->c_s = socket(AF_INET, SOCK_STREAM, 0);
710                 if (conn->c_s < 0)
711                         err(1, "socket failed");
712
713                 if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in,
714                     sizeof(conn->c_in)) < 0)
715                         err(1, "connect failed");
716
717                 memset(&readto, 0, sizeof(readto));
718                 readto.tv_sec = ctx->t_glob->g_readto_ms / 1000;
719                 readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000;
720                 if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto,
721                     sizeof(readto)) < 0)
722                         err(1, "setsockopt(RCVTIMEO) failed");
723
724                 n = read(conn->c_s, &ack, sizeof(ack));
725                 if (n != sizeof(ack)) {
726                         if (n < 0) {
727                                 if (errno == EAGAIN) {
728                                         close(conn->c_s);
729                                         goto again;
730                                 }
731                                 err(1, "read ack failed");
732                         } else {
733                                 errx(1, "read truncated ack");
734                         }
735                 }
736
737                 on = 1;
738                 if (ioctl(conn->c_s, FIONBIO, &on, sizeof(on)) < 0)
739                         err(1, "ioctl(FIONBIO) failed");
740
741                 EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn);
742                 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
743                 if (n < 0)
744                         err(1, "kevent add failed");
745         }
746 #ifdef SEND_DEBUG
747         fprintf(stderr, "%s conn %d\n", name, nconn);
748 #endif
749
750         /*
751          * Sender start sync, stage 1:
752          * Wait for connections establishment (slow).
753          */
754         pthread_mutex_lock(&ctx->t_glob->g_lock);
755         ctx->t_glob->g_nconn -= nconn;
756         pthread_cond_broadcast(&ctx->t_glob->g_cond);
757         while (ctx->t_glob->g_nconn != 0)
758                 pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock);
759         pthread_mutex_unlock(&ctx->t_glob->g_lock);
760
761         /*
762          * Sender start sync, stage2.
763          */
764         /* Increase the g_nwait. */
765         atomic_add_int(&ctx->t_glob->g_nwait, 1);
766         /* Spin-wait for main thread to release us (reset g_nwait). */
767         while (ctx->t_glob->g_nwait)
768                 send_spinwait();
769
770 #ifdef SEND_DEBUG
771         fprintf(stderr, "%s start\n", name);
772 #endif
773
774         /*
775          * Wire a kqueue timer, so that the sending can be terminated
776          * as requested.
777          *
778          * NOTE:
779          * Set -2 to c_s for timer udata, so we could distinguish it
780          * from real connections.
781          */
782         timeo->c_s = -2;
783         EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0,
784             ctx->t_glob->g_dur * 1000L, timeo);
785         n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
786         if (n < 0)
787                 err(1, "kevent add failed");
788
789         clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start);
790         for (;;) {
791                 struct kevent evt[SEND_EVENT_MAX];
792                 int nevt, i;
793
794                 nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL);
795                 if (nevt < 0)
796                         err(1, "kevent failed");
797
798                 for (i = 0; i < nevt; ++i) {
799                         struct conn_ctx *conn = evt[i].udata;
800
801                         if (conn->c_s < 0) {
802                                 if (conn->c_s == -2) {
803                                         /* Timer expired */
804                                         goto done;
805                                 }
806                                 continue;
807                         }
808
809                         if (fd >= 0) {
810                                 off_t m, off;
811                                 size_t len;
812
813                                 off = conn->c_stat % buflen;
814                                 len = buflen - off;
815
816                                 n = sendfile(fd, conn->c_s, off, len, NULL,
817                                     &m, 0);
818                                 if (n == 0 || (n < 0 && errno == EAGAIN))
819                                         n = m;
820                         } else {
821                                 n = write(conn->c_s, buf, buflen);
822                         }
823
824                         if (n < 0) {
825                                 if (errno != EAGAIN) {
826                                         conn->c_err = errno;
827                                         clock_gettime(CLOCK_MONOTONIC_PRECISE,
828                                             &conn->c_terr);
829                                         close(conn->c_s);
830                                         conn->c_s = -1;
831                                 }
832                         } else {
833                                 conn->c_stat += n;
834                         }
835                 }
836         }
837 done:
838         clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end);
839
840         if (fd >= 0)
841                 close(fd);
842         if (buf != NULL)
843                 free(buf);
844         return NULL;
845 }