bd213bb8ce188ed86d4cd9cb72ed6447c2ec7542
[dragonfly.git] / tools / tools / netrate / kq_sendrecv / kq_recvserv / kq_recvserv.c
1 #include <sys/types.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/socket.h>
5 #include <sys/sysctl.h>
6
7 #include <arpa/inet.h>
8 #include <netinet/in.h>
9
10 #include <err.h>
11 #include <errno.h>
12 #include <pthread.h>
13 #include <pthread_np.h>
14 #include <signal.h>
15 #include <stdio.h>
16 #include <stdint.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <unistd.h>
20
21 #include "kq_sendrecv_proto.h"
22
23 #define RECV_EVENT_MAX          64
24 #define RECV_BUFLEN             (128 * 1024)
25
26 struct recv_thrctx {
27         int                     t_id;
28         struct sockaddr_in      t_in;
29
30         pthread_mutex_t         t_lock;
31         pthread_cond_t          t_cond;
32
33         pthread_t               t_tid;
34 };
35
36 static void     *recv_thread(void *);
37
38 static int      recv_buflen = RECV_BUFLEN;
39 static int      recv_reuseport = 0;
40
41 static void
42 usage(const char *cmd)
43 {
44         fprintf(stderr, "%s [-4 addr4] [-p port] [-t nthreads] [-D] [-R] "
45             "[-b buflen]\n", cmd);
46         exit(2);
47 }
48
49 int
50 main(int argc, char *argv[])
51 {
52         struct recv_thrctx *ctx_arr;
53         struct recv_info *info;
54         struct sockaddr_in in;
55         sigset_t sigset;
56         int opt, s, on, nthr, i, info_sz, do_daemon;
57         size_t sz;
58
59         sigemptyset(&sigset);
60         sigaddset(&sigset, SIGPIPE);
61         if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
62                 err(1, "sigprocmask failed");
63
64         sz = sizeof(nthr);
65         if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0)
66                 err(1, "sysctl hw.ncpu failed");
67
68         memset(&in, 0, sizeof(in));
69         in.sin_family = AF_INET;
70         in.sin_addr.s_addr = htonl(INADDR_ANY);
71         in.sin_port = htons(RECV_PORT);
72
73         do_daemon = 1;
74
75         while ((opt = getopt(argc, argv, "4:DRb:p:t:")) != -1) {
76                 switch (opt) {
77                 case '4':
78                         if (inet_pton(AF_INET, optarg, &in.sin_addr) <= 0)
79                                 errx(1, "inet_pton failed %s", optarg);
80                         break;
81
82                 case 'D':
83                         do_daemon = 0;
84                         break;
85
86                 case 'R':
87 #ifdef __DragonFly__
88                         recv_reuseport = 1;
89 #else
90                         /* Not supported on other BSDs */
91 #endif
92                         break;
93
94                 case 'b':
95                         recv_buflen = strtol(optarg, NULL, 10);
96                         if (recv_buflen <= 0)
97                                 errx(1, "invalid -b");
98                         break;
99
100                 case 'p':
101                         in.sin_port = htons(strtoul(optarg, NULL, 10));
102                         break;
103
104                 case 't':
105                         nthr = strtol(optarg, NULL, 10);
106                         if (nthr <= 0)
107                                 errx(1, "invalid -t");
108                         break;
109
110                 default:
111                         usage(argv[0]);
112                 }
113         }
114
115         s = socket(AF_INET, SOCK_STREAM, 0);
116         if (s < 0)
117                 err(1, "socket failed");
118
119         on = 1;
120         if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
121                 err(1, "setsockopt(REUSEPADDR) failed");
122
123         if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0)
124                 err(1, "bind failed");
125
126         if (listen(s, -1) < 0)
127                 err(1, "listen failed");
128
129         ctx_arr = calloc(nthr, sizeof(struct recv_thrctx));
130         if (ctx_arr == NULL)
131                 err(1, "calloc failed");
132
133         info_sz = __offsetof(struct recv_info, dport[nthr]);
134         info = calloc(1, info_sz);
135         if (info == NULL)
136                 err(1, "calloc failed");
137         info->ndport = nthr;
138
139         if (do_daemon)
140                 daemon(0, 0);
141
142         pthread_set_name_np(pthread_self(), "main");
143
144         for (i = 0; i < nthr; ++i) {
145                 struct recv_thrctx *ctx = &ctx_arr[i];
146                 int error;
147
148                 ctx->t_in = in;
149                 ctx->t_in.sin_port = 0;
150
151                 ctx->t_id = i;
152                 pthread_mutex_init(&ctx->t_lock, NULL);
153                 pthread_cond_init(&ctx->t_cond, NULL);
154
155                 /* Start receiver */
156                 error = pthread_create(&ctx->t_tid, NULL, recv_thread, ctx);
157                 if (error)
158                         errc(1, error, "pthread_create %d failed", i);
159
160                 /*
161                  * Wait for the receiver to select a proper data port
162                  * and start a listen socket on the data port.
163                  */
164                 pthread_mutex_lock(&ctx->t_lock);
165                 while (ctx->t_in.sin_port == 0)
166                         pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
167                 pthread_mutex_unlock(&ctx->t_lock);
168
169                 info->dport[i] = ctx->t_in.sin_port;
170         }
171
172         /*
173          * Send information, e.g. data ports, back to the clients.
174          */
175         for (;;) {
176                 int s1;
177
178                 s1 = accept(s, NULL, NULL);
179                 if (s1 < 0)
180                         continue;
181                 write(s1, info, info_sz);
182                 close(s1);
183         }
184
185         /* NEVER REACHED */
186         exit(0);
187 }
188
189 static void *
190 recv_thread(void *xctx)
191 {
192         struct recv_thrctx *ctx = xctx;
193         struct kevent change_evt0[RECV_EVENT_MAX];
194         struct conn_ack ack;
195         uint8_t *buf;
196         char name[32];
197         u_short port;
198         int s, kq, nchange;
199
200         /*
201          * Select a proper data port and create a listen socket on it.
202          */
203         if (recv_reuseport)
204                 port = RECV_PORT;
205         else
206                 port = RECV_PORT + ctx->t_id;
207         for (;;) {
208                 struct sockaddr_in in = ctx->t_in;
209                 int on;
210
211                 ++port;
212                 if (port < RECV_PORT)
213                         errx(1, "failed to find a data port");
214                 in.sin_port = htons(port);
215
216                 s = socket(AF_INET, SOCK_STREAM, 0);
217                 if (s < 0)
218                         err(1, "socket failed");
219
220                 on = 1;
221                 if (recv_reuseport) {
222                         if (setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &on,
223                             sizeof(on)))
224                                 err(1, "setsockopt(REUSEPORT) failed");
225                 } else {
226                         if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on,
227                             sizeof(on)))
228                                 err(1, "setsockopt(REUSEADDR) failed");
229                 }
230
231                 on = 1;
232                 if (ioctl(s, FIONBIO, &on, sizeof(on)) < 0)
233                         err(1, "ioctl(FIONBIO) failed");
234
235                 if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) {
236                         close(s);
237                         continue;
238                 }
239
240                 if (listen(s, -1) < 0)
241                         err(1, "listen failed");
242
243                 break;
244         }
245
246         kq = kqueue();
247         if (kq < 0)
248                 err(1, "kqueue failed");
249
250         buf = malloc(recv_buflen);
251         if (buf == NULL)
252                 err(1, "malloc %d failed", recv_buflen);
253
254         memset(&ack, 0, sizeof(ack));
255
256         snprintf(name, sizeof(name), "rcv%d %d", ctx->t_id, port);
257         pthread_set_name_np(pthread_self(), name);
258
259         /*
260          * Inform the main thread that we are ready.
261          */
262         pthread_mutex_lock(&ctx->t_lock);
263         ctx->t_in.sin_port = htons(port);
264         pthread_mutex_unlock(&ctx->t_lock);
265         pthread_cond_signal(&ctx->t_cond);
266
267         EV_SET(&change_evt0[0], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
268         nchange = 1;
269
270         for (;;) {
271                 const struct kevent *change_evt = NULL;
272                 struct kevent evt[RECV_EVENT_MAX];
273                 int nevt, i;
274
275                 if (nchange > 0)
276                         change_evt = change_evt0;
277
278                 nevt = kevent(kq, change_evt, nchange, evt, RECV_EVENT_MAX,
279                     NULL);
280                 if (nevt < 0)
281                         err(1, "kevent failed");
282                 nchange = 0;
283
284                 for (i = 0; i < nevt; ++i) {
285                         int n;
286
287                         if (evt[i].ident == (u_int)s) {
288                                 while (nchange < RECV_EVENT_MAX) {
289                                         int s1;
290
291                                         s1 = accept(s, NULL, NULL);
292                                         if (s1 < 0)
293                                                 break;
294
295                                         /* TODO: keepalive */
296
297                                         n = write(s1, &ack, sizeof(ack));
298                                         if (n != sizeof(ack)) {
299                                                 close(s1);
300                                                 continue;
301                                         }
302
303                                         EV_SET(&change_evt0[nchange], s1,
304                                             EVFILT_READ, EV_ADD, 0, 0, NULL);
305                                         ++nchange;
306                                 }
307                         } else {
308                                 n = read(evt[i].ident, buf, recv_buflen);
309                                 if (n <= 0) {
310                                         if (n == 0 || errno != EAGAIN)
311                                                 close(evt[i].ident);
312                                 }
313                         }
314                 }
315         }
316
317         /* NEVER REACHED */
318         return NULL;
319 }