tools/kq_recvserv: Add option to set receiving thread's CPU affinity.
[dragonfly.git] / tools / tools / netrate / kq_sendrecv / kq_recvserv / kq_recvserv.c
1 #include <sys/types.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/socket.h>
5 #include <sys/sysctl.h>
6
7 #include <arpa/inet.h>
8 #include <netinet/in.h>
9
10 #include <err.h>
11 #include <errno.h>
12 #include <pthread.h>
13 #include <pthread_np.h>
14 #include <signal.h>
15 #include <stdio.h>
16 #include <stdint.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <unistd.h>
20
21 #include "kq_sendrecv_proto.h"
22
23 #define RECV_EVENT_MAX          64
24 #define RECV_BUFLEN             (128 * 1024)
25
26 struct recv_thrctx {
27         int                     t_id;
28         struct sockaddr_in      t_in;
29
30         pthread_mutex_t         t_lock;
31         pthread_cond_t          t_cond;
32
33         pthread_t               t_tid;
34 };
35
36 static void     *recv_thread(void *);
37
38 static int      recv_buflen = RECV_BUFLEN;
39 static int      recv_reuseport = 0;
40 static int      recv_bindcpu = 0;
41
42 static void
43 usage(const char *cmd)
44 {
45         fprintf(stderr, "%s [-4 addr4] [-p port] [-t nthreads] [-D] [-R] [-B] "
46             "[-b buflen]\n", cmd);
47         exit(2);
48 }
49
50 int
51 main(int argc, char *argv[])
52 {
53         struct recv_thrctx *ctx_arr;
54         struct recv_info *info;
55         struct sockaddr_in in;
56         sigset_t sigset;
57         int opt, s, on, nthr, i, info_sz, do_daemon;
58         size_t sz;
59
60         sigemptyset(&sigset);
61         sigaddset(&sigset, SIGPIPE);
62         if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
63                 err(1, "sigprocmask failed");
64
65         sz = sizeof(nthr);
66         if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0)
67                 err(1, "sysctl hw.ncpu failed");
68
69         memset(&in, 0, sizeof(in));
70         in.sin_family = AF_INET;
71         in.sin_addr.s_addr = htonl(INADDR_ANY);
72         in.sin_port = htons(RECV_PORT);
73
74         do_daemon = 1;
75
76         while ((opt = getopt(argc, argv, "4:BDRb:p:t:")) != -1) {
77                 switch (opt) {
78                 case '4':
79                         if (inet_pton(AF_INET, optarg, &in.sin_addr) <= 0)
80                                 errx(1, "inet_pton failed %s", optarg);
81                         break;
82
83                 case 'B':
84                         recv_bindcpu = 1;
85                         break;
86
87                 case 'D':
88                         do_daemon = 0;
89                         break;
90
91                 case 'R':
92 #ifdef __DragonFly__
93                         recv_reuseport = 1;
94 #else
95                         /* Not supported on other BSDs */
96 #endif
97                         break;
98
99                 case 'b':
100                         recv_buflen = strtol(optarg, NULL, 10);
101                         if (recv_buflen <= 0)
102                                 errx(1, "invalid -b");
103                         break;
104
105                 case 'p':
106                         in.sin_port = htons(strtoul(optarg, NULL, 10));
107                         break;
108
109                 case 't':
110                         nthr = strtol(optarg, NULL, 10);
111                         if (nthr <= 0)
112                                 errx(1, "invalid -t");
113                         break;
114
115                 default:
116                         usage(argv[0]);
117                 }
118         }
119
120         s = socket(AF_INET, SOCK_STREAM, 0);
121         if (s < 0)
122                 err(1, "socket failed");
123
124         on = 1;
125         if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
126                 err(1, "setsockopt(REUSEPADDR) failed");
127
128         if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0)
129                 err(1, "bind failed");
130
131         if (listen(s, -1) < 0)
132                 err(1, "listen failed");
133
134         ctx_arr = calloc(nthr, sizeof(struct recv_thrctx));
135         if (ctx_arr == NULL)
136                 err(1, "calloc failed");
137
138         info_sz = __offsetof(struct recv_info, dport[nthr]);
139         info = calloc(1, info_sz);
140         if (info == NULL)
141                 err(1, "calloc failed");
142         info->ndport = nthr;
143
144         if (do_daemon)
145                 daemon(0, 0);
146
147         pthread_set_name_np(pthread_self(), "main");
148
149         for (i = 0; i < nthr; ++i) {
150                 struct recv_thrctx *ctx = &ctx_arr[i];
151                 int error;
152
153                 ctx->t_in = in;
154                 ctx->t_in.sin_port = 0;
155
156                 ctx->t_id = i;
157                 pthread_mutex_init(&ctx->t_lock, NULL);
158                 pthread_cond_init(&ctx->t_cond, NULL);
159
160                 /* Start receiver */
161                 error = pthread_create(&ctx->t_tid, NULL, recv_thread, ctx);
162                 if (error)
163                         errc(1, error, "pthread_create %d failed", i);
164
165                 /*
166                  * Wait for the receiver to select a proper data port
167                  * and start a listen socket on the data port.
168                  */
169                 pthread_mutex_lock(&ctx->t_lock);
170                 while (ctx->t_in.sin_port == 0)
171                         pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
172                 pthread_mutex_unlock(&ctx->t_lock);
173
174                 info->dport[i] = ctx->t_in.sin_port;
175         }
176
177         /*
178          * Send information, e.g. data ports, back to the clients.
179          */
180         for (;;) {
181                 int s1;
182
183                 s1 = accept(s, NULL, NULL);
184                 if (s1 < 0)
185                         continue;
186                 write(s1, info, info_sz);
187                 close(s1);
188         }
189
190         /* NEVER REACHED */
191         exit(0);
192 }
193
194 static void *
195 recv_thread(void *xctx)
196 {
197         struct recv_thrctx *ctx = xctx;
198         struct kevent change_evt0[RECV_EVENT_MAX];
199         struct conn_ack ack;
200         uint8_t *buf;
201         char name[32];
202         u_short port;
203         int s, kq, nchange;
204
205         /*
206          * Select a proper data port and create a listen socket on it.
207          */
208         if (recv_reuseport)
209                 port = RECV_PORT;
210         else
211                 port = RECV_PORT + ctx->t_id;
212         for (;;) {
213                 struct sockaddr_in in = ctx->t_in;
214                 int on;
215
216                 ++port;
217                 if (port < RECV_PORT)
218                         errx(1, "failed to find a data port");
219                 in.sin_port = htons(port);
220
221                 s = socket(AF_INET, SOCK_STREAM, 0);
222                 if (s < 0)
223                         err(1, "socket failed");
224
225                 on = 1;
226                 if (recv_reuseport) {
227                         if (setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &on,
228                             sizeof(on)))
229                                 err(1, "setsockopt(REUSEPORT) failed");
230                 } else {
231                         if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on,
232                             sizeof(on)))
233                                 err(1, "setsockopt(REUSEADDR) failed");
234                 }
235
236                 on = 1;
237                 if (ioctl(s, FIONBIO, &on, sizeof(on)) < 0)
238                         err(1, "ioctl(FIONBIO) failed");
239
240                 if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) {
241                         close(s);
242                         continue;
243                 }
244
245                 if (listen(s, -1) < 0)
246                         err(1, "listen failed");
247
248                 if (recv_bindcpu) {
249                         int cpu = -1, error;
250                         cpu_set_t mask;
251
252 #ifdef __DragonFly__
253                         if (recv_reuseport) {
254                                 socklen_t olen;
255
256                                 olen = sizeof(cpu);
257                                 if (getsockopt(s, SOL_SOCKET, SO_CPUHINT,
258                                     &cpu, &olen) < 0)
259                                         err(1, "getsockopt(CPUHINT) failed");
260                         }
261 #endif
262                         if (cpu < 0) {
263                                 int ncpus;
264                                 size_t len;
265
266                                 len = sizeof(ncpus);
267                                 if (sysctlbyname("hw.ncpu", &ncpus, &len,
268                                     NULL, 0) < 0)
269                                         err(1, "sysctlbyname hw.ncpu failed");
270                                 cpu = ctx->t_id % ncpus;
271                         }
272
273                         CPU_ZERO(&mask);
274                         CPU_SET(cpu, &mask);
275                         error = pthread_setaffinity_np(pthread_self(),
276                             sizeof(mask), &mask);
277                         if (error) {
278                                 errc(1, error, "pthread_setaffinity_np cpu%d "
279                                     "failed", cpu);
280                         }
281                 }
282                 break;
283         }
284
285         kq = kqueue();
286         if (kq < 0)
287                 err(1, "kqueue failed");
288
289         buf = malloc(recv_buflen);
290         if (buf == NULL)
291                 err(1, "malloc %d failed", recv_buflen);
292
293         memset(&ack, 0, sizeof(ack));
294
295         snprintf(name, sizeof(name), "rcv%d %d", ctx->t_id, port);
296         pthread_set_name_np(pthread_self(), name);
297
298         /*
299          * Inform the main thread that we are ready.
300          */
301         pthread_mutex_lock(&ctx->t_lock);
302         ctx->t_in.sin_port = htons(port);
303         pthread_mutex_unlock(&ctx->t_lock);
304         pthread_cond_signal(&ctx->t_cond);
305
306         EV_SET(&change_evt0[0], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
307         nchange = 1;
308
309         for (;;) {
310                 const struct kevent *change_evt = NULL;
311                 struct kevent evt[RECV_EVENT_MAX];
312                 int nevt, i;
313
314                 if (nchange > 0)
315                         change_evt = change_evt0;
316
317                 nevt = kevent(kq, change_evt, nchange, evt, RECV_EVENT_MAX,
318                     NULL);
319                 if (nevt < 0)
320                         err(1, "kevent failed");
321                 nchange = 0;
322
323                 for (i = 0; i < nevt; ++i) {
324                         int n;
325
326                         if (evt[i].ident == (u_int)s) {
327                                 while (nchange < RECV_EVENT_MAX) {
328                                         int s1;
329
330                                         s1 = accept(s, NULL, NULL);
331                                         if (s1 < 0)
332                                                 break;
333
334                                         /* TODO: keepalive */
335
336                                         n = write(s1, &ack, sizeof(ack));
337                                         if (n != sizeof(ack)) {
338                                                 close(s1);
339                                                 continue;
340                                         }
341
342                                         EV_SET(&change_evt0[nchange], s1,
343                                             EVFILT_READ, EV_ADD, 0, 0, NULL);
344                                         ++nchange;
345                                 }
346                         } else {
347                                 n = read(evt[i].ident, buf, recv_buflen);
348                                 if (n <= 0) {
349                                         if (n == 0 || errno != EAGAIN)
350                                                 close(evt[i].ident);
351                                 }
352                         }
353                 }
354         }
355
356         /* NEVER REACHED */
357         return NULL;
358 }