tools/kq_sendrecv: Implement kqueue(2) based TCP traffic generator
[dragonfly.git] / tools / tools / netrate / kq_sendrecv / kq_sendcli / kq_sendcli.c
1 #include <sys/param.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/queue.h>
5 #include <sys/socket.h>
6 #include <sys/sysctl.h>
7 #include <sys/time.h>
8
9 #include <machine/atomic.h>
10 #ifdef __FreeBSD__
11 #include <machine/cpu.h>
12 #endif
13 #include <machine/cpufunc.h>
14
15 #include <arpa/inet.h>
16 #include <netinet/in.h>
17
18 #include <err.h>
19 #include <errno.h>
20 #include <pthread.h>
21 #include <pthread_np.h>
22 #include <signal.h>
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <unistd.h>
28
29 #include "kq_sendrecv_proto.h"
30
/*
 * Note about the sender start synchronization.
 *
 * We apply a two-stage synchronization.  The first stage uses a pthread
 * condition variable (it sleeps) to wait for the establishment of all
 * connections, which could be slow.  The second stage uses g_nwait
 * of send_globctx; all relevant threads spin on g_nwait.  The main
 * thread spin-waits for all senders to increase g_nwait.  Each sender
 * thread increases g_nwait, then spin-waits for the main thread
 * to reset g_nwait.  In this way, we can make sure that all senders
 * start at roughly the same time.
 */
43
/*
 * In-place timespec subtraction: *vvp -= *uvp.  A negative nanosecond
 * result borrows one second so that tv_nsec stays non-negative.
 * Provided only when the system headers do not already define it.
 *
 * NOTE: both arguments are evaluated more than once.
 */
#ifndef timespecsub
#define timespecsub(vvp, uvp)						\
	do {								\
		(vvp)->tv_sec -= (uvp)->tv_sec;				\
		(vvp)->tv_nsec -= (uvp)->tv_nsec;			\
		if ((vvp)->tv_nsec < 0) {				\
			(vvp)->tv_sec -= 1;				\
			(vvp)->tv_nsec += 1000000000L;			\
		}							\
	} while (0)
#endif
55
/*
 * Compare two timespecs with the relational operator `cmp'.  The
 * seconds decide unless they are equal, in which case the nanoseconds
 * are compared.  Provided only when the system headers do not already
 * define it.
 */
#ifndef timespeccmp
#define timespeccmp(tvp, uvp, cmp)					\
	(((tvp)->tv_sec != (uvp)->tv_sec) ?				\
	    ((tvp)->tv_sec cmp (uvp)->tv_sec) :				\
	    ((tvp)->tv_nsec cmp (uvp)->tv_nsec))
#endif
62
/*
 * Compile-time debug switches; change the "#if 0" to "#if 1" to enable.
 *
 * SEND_DEBUG		print the receiver address list and per-sender
 *			progress messages to stderr.
 * SEND_TIME_DEBUG	print the per-sender and the overall start/end
 *			timestamps to stderr.
 */
#if 0
#define SEND_DEBUG
#endif
#if 0
#define SEND_TIME_DEBUG
#endif

/* Default length of the sending test; unit: seconds (see -l). */
#define SEND_DUR		10
/* Maximum number of events fetched by one kevent(2) call. */
#define SEND_EVENT_MAX		64
/* Size of the buffer handed to each write(2); unit: bytes. */
#define SEND_BUFLEN		(128 * 1024)
73
/*
 * A successful 3-way handshake does not guarantee that the remote
 * application will be able to accept(2) the connection.  Worse, the
 * remote network stack may drop the connection silently, i.e. without
 * sending an RST.  In that case a blocking read(2) would not return
 * until keepalive kicks in, which takes quite some time.  That is
 * obviously not what we want here, so blocking reads are given a
 * synthetic timeout, and the connection is retried whenever such a
 * read times out.
 */
#define SEND_READTO_MS		1000		/* unit: ms */
85
/*
 * Alignment applied to struct conn_ctx: the platform's cache line size
 * where a constant for it is known, otherwise a guess.
 */
#ifdef __DragonFly__
#define SEND_CONN_CTX_ALIGN	__VM_CACHELINE_SIZE
#elif defined(__FreeBSD__)
#define SEND_CONN_CTX_ALIGN	CACHE_LINE_SIZE
#else
#define SEND_CONN_CTX_ALIGN	64	/* XXX */
#endif
93
/*
 * Per-connection state.
 *
 * The main thread allocates one of these per connection, hands it to a
 * sender through the sender's t_conn list and also keeps it on the
 * global list for results gathering.  An entry whose c_in.sin_port is 0
 * is not a connection: it marks the end of a sender's list and is later
 * reused by that sender as the udata of its kqueue timer.
 */
struct conn_ctx {
	/* Socket; -1: not open/closed after an error, -2: timer udata. */
	int c_s;
	/* errno of the write(2) that failed; 0 if no error happened. */
	int c_err;
	/* Number of bytes written so far. */
	uint64_t c_stat;
	/* Time at which c_err was recorded. */
	struct timespec c_terr;

	/* Linkage on send_globctx.g_conn. */
	STAILQ_ENTRY(conn_ctx) c_glob_link;
	/* Linkage on send_thrctx.t_conn. */
	STAILQ_ENTRY(conn_ctx) c_link;
	/* Destination address. */
	struct sockaddr_in c_in;
	/* t_id of the sender owning this connection. */
	int c_thr_id;
} __aligned(SEND_CONN_CTX_ALIGN);

/* Singly-linked tail queue of connection contexts. */
STAILQ_HEAD(conn_ctx_list, conn_ctx);
107
108 struct send_globctx {
109         struct conn_ctx_list    g_conn;
110
111         int                     g_dur;
112         int                     g_nconn;
113         pthread_mutex_t         g_lock;
114         pthread_cond_t          g_cond;
115
116         volatile u_int          g_nwait;
117         int                     g_readto_ms;    /* unit: ms */
118 };
119
120 struct send_thrctx {
121         struct conn_ctx_list    t_conn;
122         pthread_mutex_t         t_lock;
123         pthread_cond_t          t_cond;
124
125         struct send_globctx     *t_glob;
126         struct timespec         t_start;
127         struct timespec         t_end;
128         double                  t_run_us;       /* unit: us */
129
130         pthread_t               t_tid;
131         int                     t_id;
132 };
133
/* Query the receivers and build the list of data addresses. */
static void	send_build_addrlist(const struct sockaddr_in *in_arr,
		    int in_arr_cnt, const struct sockaddr_in **daddr0,
		    int *ndaddr0, int readto_ms);
/* Sender thread entry point; the argument is a struct send_thrctx. */
static void	*send_thread(void *xctx);
137
/*
 * Body of a busy-wait loop: issue the platform's spin-wait hint where
 * one is known, otherwise do nothing.
 */
static __inline void
send_spinwait(void)
{
#ifdef __DragonFly__
	cpu_pause();
#else
#ifdef __FreeBSD__
	cpu_spinwait();
#endif
	/* XXX nothing on other platforms */
#endif
}
149
/*
 * Print the command line synopsis to stderr and exit with status 2.
 * `cmd' is the program name to show.
 */
static void
usage(const char *cmd)
{
	fprintf(stderr,
	    "%s -4 addr4 [-4 addr4 ...] [-p port] -c conns "
	    "[-t nthreads] [-l sec] [-r readto_ms] [-E]\n",
	    cmd);
	exit(2);
}
157
158 int
159 main(int argc, char *argv[])
160 {
161         struct send_globctx glob;
162         struct send_thrctx *ctx_arr, *ctx;
163         struct sockaddr_in *in_arr, *in;
164         const struct sockaddr_in *daddr;
165         struct timespec run, end, start;
166         double total_run_us, total, conn_min, conn_max;
167         double jain, jain_res;
168         int jain_cnt;
169         struct conn_ctx *conn;
170         sigset_t sigset;
171         int opt, i;
172         int in_arr_cnt, in_arr_sz, ndaddr;
173         int nthr, nconn, dur, readto_ms;
174         int log_err, err_cnt, has_minmax;
175         u_short port = RECV_PORT;
176         uint32_t idx;
177         size_t sz;
178
179         sigemptyset(&sigset);
180         sigaddset(&sigset, SIGPIPE);
181         if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
182                 err(1, "sigprocmask failed");
183
184         sz = sizeof(nthr);
185         if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0)
186                 err(1, "sysctl hw.ncpu failed");
187
188         in_arr_sz = 4;
189         in_arr_cnt = 0;
190         in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in));
191         if (in_arr == NULL)
192                 err(1, "malloc failed");
193
194         log_err = 0;
195         nconn = 0;
196         dur = SEND_DUR;
197         readto_ms = SEND_READTO_MS;
198
199         while ((opt = getopt(argc, argv, "4:Ec:l:p:r:t:")) != -1) {
200                 switch (opt) {
201                 case '4':
202                         if (in_arr_cnt == in_arr_sz) {
203                                 in_arr_sz *= 2;
204                                 in_arr = reallocf(in_arr,
205                                     in_arr_sz * sizeof(struct sockaddr_in));
206                                 if (in_arr == NULL)
207                                         err(1, "reallocf failed");
208                         }
209                         in = &in_arr[in_arr_cnt];
210                         ++in_arr_cnt;
211
212                         memset(in, 0, sizeof(*in));
213                         in->sin_family = AF_INET;
214                         if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0)
215                                 errx(1, "inet_pton failed %s", optarg);
216                         break;
217
218                 case 'E':
219                         log_err = 1;
220                         break;
221
222                 case 'c':
223                         nconn = strtol(optarg, NULL, 10);
224                         if (nconn <= 0)
225                                 errx(1, "invalid -c");
226                         break;
227
228                 case 'l':
229                         dur = strtoul(optarg, NULL, 10);
230                         if (dur == 0)
231                                 errx(1, "invalid -l");
232                         break;
233
234                 case 'p':
235                         port = strtoul(optarg, NULL, 10);
236                         break;
237
238                 case 'r':
239                         readto_ms = strtol(optarg, NULL, 10);
240                         if (readto_ms <= 0)
241                                 errx(1, "invalid -r");
242                         break;
243
244                 case 't':
245                         nthr = strtol(optarg, NULL, 10);
246                         if (nthr <= 0)
247                                 errx(1, "invalid -t");
248                         break;
249
250                 default:
251                         usage(argv[0]);
252                 }
253         }
254         if (in_arr_cnt == 0 || nconn == 0)
255                 errx(1, "neither -4 nor -c are specified");
256
257         if (nthr > nconn)
258                 nthr = nconn;
259
260         for (i = 0; i < in_arr_cnt; ++i)
261                 in_arr[i].sin_port = htons(port);
262
263         ctx_arr = calloc(nthr, sizeof(struct send_thrctx));
264         if (ctx_arr == NULL)
265                 err(1, "calloc failed");
266
267         memset(&glob, 0, sizeof(glob));
268         STAILQ_INIT(&glob.g_conn);
269         glob.g_nconn = nconn;
270         glob.g_nwait = 1; /* count self in */
271         glob.g_dur = dur;
272         glob.g_readto_ms = readto_ms;
273         pthread_mutex_init(&glob.g_lock, NULL);
274         pthread_cond_init(&glob.g_cond, NULL);
275
276         pthread_set_name_np(pthread_self(), "main");
277
278         /* Build receiver address list */
279         send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms);
280
281         /*
282          * Start senders.
283          */
284         for (i = 0; i < nthr; ++i) {
285                 int error;
286
287                 ctx = &ctx_arr[i];
288                 STAILQ_INIT(&ctx->t_conn);
289                 ctx->t_id = i;
290                 ctx->t_glob = &glob;
291                 pthread_mutex_init(&ctx->t_lock, NULL);
292                 pthread_cond_init(&ctx->t_cond, NULL);
293
294                 error = pthread_create(&ctx->t_tid, NULL, send_thread, ctx);
295                 if (error)
296                         errc(1, error, "pthread_create failed");
297         }
298
299         /*
300          * Distribute connections to senders.
301          *
302          * NOTE:
303          * We start from a random position in the address list, so that the
304          * first several receiving servers will not be abused, if the number
305          * of connections is small and there are many clients.
306          */
307         idx = arc4random_uniform(ndaddr);
308         for (i = 0; i < nconn; ++i) {
309                 const struct sockaddr_in *da;
310
311                 da = &daddr[idx % ndaddr];
312                 ++idx;
313
314                 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
315                 if (conn == NULL)
316                         err(1, "aligned_alloc failed");
317                 memset(conn, 0, sizeof(*conn));
318                 conn->c_in = *da;
319                 conn->c_s = -1;
320
321                 ctx = &ctx_arr[i % nthr];
322                 conn->c_thr_id = ctx->t_id;
323
324                 pthread_mutex_lock(&ctx->t_lock);
325                 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
326                 pthread_mutex_unlock(&ctx->t_lock);
327                 pthread_cond_signal(&ctx->t_cond);
328
329                 /* Add to the global list for results gathering */
330                 STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link);
331         }
332
333         /*
334          * No more connections; notify the senders.
335          *
336          * NOTE:
337          * The marker for 'the end of connection list' has 0 in its
338          * c_in.sin_port.
339          */
340         for (i = 0; i < nthr; ++i) {
341                 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
342                 if (conn == NULL)
343                         err(1, "aligned_alloc failed");
344                 memset(conn, 0, sizeof(*conn));
345                 conn->c_s = -1;
346
347                 ctx = &ctx_arr[i];
348                 pthread_mutex_lock(&ctx->t_lock);
349                 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
350                 pthread_mutex_unlock(&ctx->t_lock);
351                 pthread_cond_signal(&ctx->t_cond);
352         }
353
354         /*
355          * Sender start sync, stage 1:
356          * Wait for connections establishment (slow).
357          */
358         pthread_mutex_lock(&glob.g_lock);
359         while (glob.g_nconn != 0)
360                 pthread_cond_wait(&glob.g_cond, &glob.g_lock);
361         pthread_mutex_unlock(&glob.g_lock);
362
363         /*
364          * Sender start sync, stage 2:
365          * Wait for senders to spin-wait; and once all senders spin-wait,
366          * release them by resetting g_nwait.
367          */
368         while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0)
369                 send_spinwait();
370
371         fprintf(stderr, "start %d seconds sending test: %d threads, "
372             "%d connections\n", dur, nthr, nconn);
373
374         /*
375          * Wait for the senders to finish and gather the results.
376          */
377
378         memset(&end, 0, sizeof(end));           /* XXX stupid gcc warning */
379         memset(&start, 0, sizeof(start));       /* XXX stupid gcc warning */
380
381         for (i = 0; i < nthr; ++i) {
382                 ctx = &ctx_arr[i];
383                 pthread_join(ctx->t_tid, NULL);
384
385                 run = ctx->t_end;
386                 timespecsub(&run, &ctx->t_start);
387                 ctx->t_run_us = ((double)run.tv_sec * 1000000.0) +
388                     ((double)run.tv_nsec / 1000.0);
389
390                 if (i == 0) {
391                         start = ctx->t_start;
392                         end = ctx->t_end;
393                 } else {
394                         if (timespeccmp(&start, &ctx->t_start, >))
395                                 start = ctx->t_start;
396                         if (timespeccmp(&end, &ctx->t_end, <))
397                                 end = ctx->t_end;
398                 }
399
400 #ifdef SEND_TIME_DEBUG
401                 fprintf(stderr, "start %ld.%ld, end %ld.%ld\n",
402                     ctx->t_start.tv_sec, ctx->t_start.tv_nsec,
403                     ctx->t_end.tv_sec, ctx->t_end.tv_nsec);
404 #endif
405         }
406
407 #ifdef SEND_TIME_DEBUG
408         fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n",
409             start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec);
410 #endif
411
412         run = end;
413         timespecsub(&run, &start);
414         total_run_us = ((double)run.tv_sec * 1000000.0) +
415             ((double)run.tv_nsec / 1000.0);
416         total = 0.0;
417
418         err_cnt = 0;
419         has_minmax = 0;
420         conn_min = 0.0;
421         conn_max = 0.0;
422
423         jain = 0.0;
424         jain_res = 0.0;
425         jain_cnt = 0;
426
427         STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
428                 total += conn->c_stat;
429                 if (conn->c_err == 0) {
430                         double perf;    /* unit: Mbps */
431
432                         perf = (conn->c_stat * 8.0) /
433                             ctx_arr[conn->c_thr_id].t_run_us;
434                         if (!has_minmax) {
435                                 conn_min = perf;
436                                 conn_max = perf;
437                                 has_minmax = 1;
438                         } else {
439                                 if (perf > conn_max)
440                                         conn_max = perf;
441                                 if (perf < conn_min)
442                                         conn_min = perf;
443                         }
444                         jain += (perf * perf);
445                         jain_res += perf;
446                         ++jain_cnt;
447                 } else {
448                         ++err_cnt;
449                 }
450         }
451
452         jain *= jain_cnt;
453         jain = (jain_res * jain_res) / jain;
454
455         printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, "
456             "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max,
457             jain, err_cnt);
458
459         if (log_err && err_cnt) {
460                 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
461                         char name[INET_ADDRSTRLEN];
462                         double tmp_run;
463
464                         if (conn->c_err == 0)
465                                 continue;
466
467                         run = conn->c_terr;
468                         timespecsub(&run, &ctx_arr[conn->c_thr_id].t_start);
469                         tmp_run = ((double)run.tv_sec * 1000000.0) +
470                             ((double)run.tv_nsec / 1000.0);
471                         fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, "
472                             "errno %d\n",
473                             conn->c_thr_id,
474                             inet_ntop(AF_INET, &conn->c_in.sin_addr,
475                                 name, sizeof(name)),
476                             ntohs(conn->c_in.sin_port),
477                             run.tv_sec, (conn->c_stat * 8.0) / tmp_run,
478                             conn->c_err);
479                         --err_cnt;
480                         if (err_cnt == 0)
481                                 break;
482                 }
483         }
484
485         exit(0);
486 }
487
488 static void
489 send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt,
490     const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms)
491 {
492         struct sockaddr_in *daddr;
493         struct timeval readto;
494         int i, ndaddr;
495
496         daddr = NULL;
497         ndaddr = 0;
498
499         memset(&readto, 0, sizeof(readto));
500         readto.tv_sec = readto_ms / 1000;
501         readto.tv_usec = (readto_ms % 1000) * 1000;
502
503         for (i = 0; i < in_arr_cnt; ++i) {
504                 const struct sockaddr_in *in = &in_arr[i];
505                 struct recv_info info_hdr;
506                 uint16_t *ports;
507                 int s, n, ports_sz, d;
508
509 again:
510                 s = socket(AF_INET, SOCK_STREAM, 0);
511                 if (s < 0)
512                         err(1, "socket failed");
513
514                 if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0)
515                         err(1, "connect failed");
516
517                 if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO,
518                     &readto, sizeof(readto)) < 0)
519                         err(1, "setsockopt(RCVTIMEO) failed");
520
521                 n = read(s, &info_hdr, sizeof(info_hdr));
522                 if (n != sizeof(info_hdr)) {
523                         if (n < 0) {
524                                 if (errno == EAGAIN) {
525                                         close(s);
526                                         goto again;
527                                 }
528                                 err(1, "read info hdr failed");
529                         } else {
530                                 errx(1, "read truncated info hdr");
531                         }
532                 }
533                 if (info_hdr.ndport == 0) {
534                         close(s);
535                         continue;
536                 }
537
538                 ports_sz = info_hdr.ndport * sizeof(uint16_t);
539                 ports = malloc(ports_sz);
540                 if (ports == NULL)
541                         err(1, "malloc failed");
542
543                 n = read(s, ports, ports_sz);
544                 if (n != ports_sz) {
545                         if (n < 0) {
546                                 if (errno == EAGAIN) {
547                                         free(ports);
548                                         close(s);
549                                         goto again;
550                                 }
551                                 err(1, "read ports failed");
552                         } else {
553                                 errx(1, "read truncated ports");
554                         }
555                 }
556
557                 daddr = reallocf(daddr,
558                     (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in));
559                 if (daddr == NULL)
560                         err(1, "reallocf failed");
561
562                 for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) {
563                         struct sockaddr_in *da = &daddr[d];
564
565                         *da = *in;
566                         da->sin_port = ports[d - ndaddr];
567                 }
568                 ndaddr += info_hdr.ndport;
569
570                 free(ports);
571                 close(s);
572         }
573
574 #ifdef SEND_DEBUG
575         for (i = 0; i < ndaddr; ++i) {
576                 const struct sockaddr_in *da = &daddr[i];
577                 char name[INET_ADDRSTRLEN];
578
579                 fprintf(stderr, "%s:%d\n",
580                     inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)),
581                     ntohs(da->sin_port));
582         }
583 #endif
584
585         *daddr0 = daddr;
586         *ndaddr0 = ndaddr;
587 }
588
589 static void *
590 send_thread(void *xctx)
591 {
592         struct send_thrctx *ctx = xctx;
593         struct conn_ctx *timeo;
594         struct kevent chg_evt;
595         uint8_t *buf;
596         int nconn = 0, kq, n;
597         char name[32];
598
599         snprintf(name, sizeof(name), "snd%d", ctx->t_id);
600         pthread_set_name_np(pthread_self(), name);
601
602         buf = malloc(SEND_BUFLEN);
603         if (buf == NULL)
604                 err(1, "malloc failed");
605
606         kq = kqueue();
607         if (kq < 0)
608                 err(1, "kqueue failed");
609
610         /*
611          * Establish the connections assigned to us and add the
612          * established connections to kqueue.
613          */
614         for (;;) {
615 #ifdef SEND_DEBUG
616                 char addr_name[INET_ADDRSTRLEN];
617 #endif
618                 struct timeval readto;
619                 struct conn_ctx *conn;
620                 struct conn_ack ack;
621                 int on;
622
623                 pthread_mutex_lock(&ctx->t_lock);
624                 while (STAILQ_EMPTY(&ctx->t_conn))
625                         pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
626                 conn = STAILQ_FIRST(&ctx->t_conn);
627                 STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link);
628                 pthread_mutex_unlock(&ctx->t_lock);
629
630                 if (conn->c_in.sin_port == 0) {
631                         /*
632                          * The marker for 'the end of connection list'.
633                          * See the related comment in main thread.
634                          *
635                          * NOTE:
636                          * We reuse the marker as the udata for the
637                          * kqueue timer.
638                          */
639                         timeo = conn;
640                         break;
641                 }
642
643                 ++nconn;
644 #ifdef SEND_DEBUG
645                 fprintf(stderr, "%s %s:%d\n", name,
646                     inet_ntop(AF_INET, &conn->c_in.sin_addr,
647                         addr_name, sizeof(addr_name)),
648                     ntohs(conn->c_in.sin_port));
649 #endif
650
651 again:
652                 conn->c_s = socket(AF_INET, SOCK_STREAM, 0);
653                 if (conn->c_s < 0)
654                         err(1, "socket failed");
655
656                 if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in,
657                     sizeof(conn->c_in)) < 0)
658                         err(1, "connect failed");
659
660                 memset(&readto, 0, sizeof(readto));
661                 readto.tv_sec = ctx->t_glob->g_readto_ms / 1000;
662                 readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000;
663                 if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto,
664                     sizeof(readto)) < 0)
665                         err(1, "setsockopt(RCVTIMEO) failed");
666
667                 n = read(conn->c_s, &ack, sizeof(ack));
668                 if (n != sizeof(ack)) {
669                         if (n < 0) {
670                                 if (errno == EAGAIN) {
671                                         close(conn->c_s);
672                                         goto again;
673                                 }
674                                 err(1, "read ack failed");
675                         } else {
676                                 errx(1, "read truncated ack");
677                         }
678                 }
679
680                 on = 1;
681                 if (ioctl(conn->c_s, FIONBIO, &on, sizeof(on)) < 0)
682                         err(1, "ioctl(FIONBIO) failed");
683
684                 EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn);
685                 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
686                 if (n < 0)
687                         err(1, "kevent add failed");
688         }
689 #ifdef SEND_DEBUG
690         fprintf(stderr, "%s conn %d\n", name, nconn);
691 #endif
692
693         /*
694          * Sender start sync, stage 1:
695          * Wait for connections establishment (slow).
696          */
697         pthread_mutex_lock(&ctx->t_glob->g_lock);
698         ctx->t_glob->g_nconn -= nconn;
699         pthread_cond_broadcast(&ctx->t_glob->g_cond);
700         while (ctx->t_glob->g_nconn != 0)
701                 pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock);
702         pthread_mutex_unlock(&ctx->t_glob->g_lock);
703
704         /*
705          * Sender start sync, stage2.
706          */
707         /* Increase the g_nwait. */
708         atomic_add_int(&ctx->t_glob->g_nwait, 1);
709         /* Spin-wait for main thread to release us (reset g_nwait). */
710         while (ctx->t_glob->g_nwait)
711                 send_spinwait();
712
713 #ifdef SEND_DEBUG
714         fprintf(stderr, "%s start\n", name);
715 #endif
716
717         /*
718          * Wire a kqueue timer, so that the sending can be terminated
719          * as requested.
720          *
721          * NOTE:
722          * Set -2 to c_s for timer udata, so we could distinguish it
723          * from real connections.
724          */
725         timeo->c_s = -2;
726         EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0,
727             ctx->t_glob->g_dur * 1000L, timeo);
728         n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
729         if (n < 0)
730                 err(1, "kevent add failed");
731
732         clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start);
733         for (;;) {
734                 struct kevent evt[SEND_EVENT_MAX];
735                 int nevt, i;
736
737                 nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL);
738                 if (nevt < 0)
739                         err(1, "kevent failed");
740
741                 for (i = 0; i < nevt; ++i) {
742                         struct conn_ctx *conn = evt[i].udata;
743
744                         if (conn->c_s < 0) {
745                                 if (conn->c_s == -2) {
746                                         /* Timer expired */
747                                         goto done;
748                                 }
749                                 continue;
750                         }
751
752                         n = write(conn->c_s, buf, SEND_BUFLEN);
753                         if (n < 0) {
754                                 if (errno != EAGAIN) {
755                                         conn->c_err = errno;
756                                         clock_gettime(CLOCK_MONOTONIC_PRECISE,
757                                             &conn->c_terr);
758                                         close(conn->c_s);
759                                         conn->c_s = -1;
760                                 }
761                         } else {
762                                 conn->c_stat += n;
763                         }
764                 }
765         }
766 done:
767         clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end);
768         return NULL;
769 }