tools/kq_recvserv: Add option to set read size
[dragonfly.git] / tools / tools / netrate / kq_sendrecv / kq_recvserv / kq_recvserv.c
1 #include <sys/types.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/socket.h>
5 #include <sys/sysctl.h>
6
7 #include <arpa/inet.h>
8 #include <netinet/in.h>
9
10 #include <err.h>
11 #include <errno.h>
12 #include <pthread.h>
13 #include <pthread_np.h>
14 #include <signal.h>
15 #include <stdio.h>
16 #include <stdint.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <unistd.h>
20
21 #include "kq_sendrecv_proto.h"
22
23 #define RECV_EVENT_MAX          64
24 #define RECV_BUFLEN             (128 * 1024)
25
26 struct recv_thrctx {
27         int                     t_id;
28         struct sockaddr_in      t_in;
29
30         pthread_mutex_t         t_lock;
31         pthread_cond_t          t_cond;
32
33         pthread_t               t_tid;
34 };
35
36 static void     *recv_thread(void *);
37
38 static int      recv_buflen = RECV_BUFLEN;
39
40 static void
41 usage(const char *cmd)
42 {
43         fprintf(stderr, "%s [-4 addr4] [-p port] [-t nthreads] [-D] "
44             "[-b buflen]\n", cmd);
45         exit(2);
46 }
47
48 int
49 main(int argc, char *argv[])
50 {
51         struct recv_thrctx *ctx_arr;
52         struct recv_info *info;
53         struct sockaddr_in in;
54         sigset_t sigset;
55         int opt, s, on, nthr, i, info_sz, do_daemon;
56         size_t sz;
57
58         sigemptyset(&sigset);
59         sigaddset(&sigset, SIGPIPE);
60         if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
61                 err(1, "sigprocmask failed");
62
63         sz = sizeof(nthr);
64         if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0)
65                 err(1, "sysctl hw.ncpu failed");
66
67         memset(&in, 0, sizeof(in));
68         in.sin_family = AF_INET;
69         in.sin_addr.s_addr = htonl(INADDR_ANY);
70         in.sin_port = htons(RECV_PORT);
71
72         do_daemon = 1;
73
74         while ((opt = getopt(argc, argv, "4:Db:p:t:")) != -1) {
75                 switch (opt) {
76                 case '4':
77                         if (inet_pton(AF_INET, optarg, &in.sin_addr) <= 0)
78                                 errx(1, "inet_pton failed %s", optarg);
79                         break;
80
81                 case 'D':
82                         do_daemon = 0;
83                         break;
84
85                 case 'b':
86                         recv_buflen = strtol(optarg, NULL, 10);
87                         if (recv_buflen <= 0)
88                                 errx(1, "invalid -b");
89                         break;
90
91                 case 'p':
92                         in.sin_port = htons(strtoul(optarg, NULL, 10));
93                         break;
94
95                 case 't':
96                         nthr = strtol(optarg, NULL, 10);
97                         if (nthr <= 0)
98                                 errx(1, "invalid -t");
99                         break;
100
101                 default:
102                         usage(argv[0]);
103                 }
104         }
105
106         s = socket(AF_INET, SOCK_STREAM, 0);
107         if (s < 0)
108                 err(1, "socket failed");
109
110         on = 1;
111         if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
112                 err(1, "setsockopt(REUSEPADDR) failed");
113
114         if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0)
115                 err(1, "bind failed");
116
117         if (listen(s, -1) < 0)
118                 err(1, "listen failed");
119
120         ctx_arr = calloc(nthr, sizeof(struct recv_thrctx));
121         if (ctx_arr == NULL)
122                 err(1, "calloc failed");
123
124         info_sz = __offsetof(struct recv_info, dport[nthr]);
125         info = calloc(1, info_sz);
126         if (info == NULL)
127                 err(1, "calloc failed");
128         info->ndport = nthr;
129
130         if (do_daemon)
131                 daemon(0, 0);
132
133         pthread_set_name_np(pthread_self(), "main");
134
135         for (i = 0; i < nthr; ++i) {
136                 struct recv_thrctx *ctx = &ctx_arr[i];
137                 int error;
138
139                 ctx->t_in = in;
140                 ctx->t_in.sin_port = 0;
141
142                 ctx->t_id = i;
143                 pthread_mutex_init(&ctx->t_lock, NULL);
144                 pthread_cond_init(&ctx->t_cond, NULL);
145
146                 /* Start receiver */
147                 error = pthread_create(&ctx->t_tid, NULL, recv_thread, ctx);
148                 if (error)
149                         errc(1, error, "pthread_create %d failed", i);
150
151                 /*
152                  * Wait for the receiver to select a proper data port
153                  * and start a listen socket on the data port.
154                  */
155                 pthread_mutex_lock(&ctx->t_lock);
156                 while (ctx->t_in.sin_port == 0)
157                         pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
158                 pthread_mutex_unlock(&ctx->t_lock);
159
160                 info->dport[i] = ctx->t_in.sin_port;
161         }
162
163         /*
164          * Send information, e.g. data ports, back to the clients.
165          */
166         for (;;) {
167                 int s1;
168
169                 s1 = accept(s, NULL, NULL);
170                 if (s1 < 0)
171                         continue;
172                 write(s1, info, info_sz);
173                 close(s1);
174         }
175
176         /* NEVER REACHED */
177         exit(0);
178 }
179
180 static void *
181 recv_thread(void *xctx)
182 {
183         struct recv_thrctx *ctx = xctx;
184         struct kevent change_evt0[RECV_EVENT_MAX];
185         struct conn_ack ack;
186         uint8_t *buf;
187         char name[32];
188         u_short port;
189         int s, kq, nchange;
190
191         /*
192          * Select a proper data port and create a listen socket on it.
193          */
194         port = RECV_PORT + ctx->t_id;
195         for (;;) {
196                 struct sockaddr_in in = ctx->t_in;
197                 int on;
198
199                 ++port;
200                 if (port < RECV_PORT)
201                         errx(1, "failed to find a data port");
202                 in.sin_port = htons(port);
203
204                 s = socket(AF_INET, SOCK_STREAM, 0);
205                 if (s < 0)
206                         err(1, "socket failed");
207
208                 on = 1;
209                 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
210                         err(1, "setsockopt(REUSEADDR) failed");
211
212                 on = 1;
213                 if (ioctl(s, FIONBIO, &on, sizeof(on)) < 0)
214                         err(1, "ioctl(FIONBIO) failed");
215
216                 if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) {
217                         close(s);
218                         continue;
219                 }
220
221                 if (listen(s, -1) < 0)
222                         err(1, "listen failed");
223
224                 break;
225         }
226
227         kq = kqueue();
228         if (kq < 0)
229                 err(1, "kqueue failed");
230
231         buf = malloc(recv_buflen);
232         if (buf == NULL)
233                 err(1, "malloc %d failed", recv_buflen);
234
235         memset(&ack, 0, sizeof(ack));
236
237         snprintf(name, sizeof(name), "rcv%d %d", ctx->t_id, port);
238         pthread_set_name_np(pthread_self(), name);
239
240         /*
241          * Inform the main thread that we are ready.
242          */
243         pthread_mutex_lock(&ctx->t_lock);
244         ctx->t_in.sin_port = htons(port);
245         pthread_mutex_unlock(&ctx->t_lock);
246         pthread_cond_signal(&ctx->t_cond);
247
248         EV_SET(&change_evt0[0], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
249         nchange = 1;
250
251         for (;;) {
252                 const struct kevent *change_evt = NULL;
253                 struct kevent evt[RECV_EVENT_MAX];
254                 int nevt, i;
255
256                 if (nchange > 0)
257                         change_evt = change_evt0;
258
259                 nevt = kevent(kq, change_evt, nchange, evt, RECV_EVENT_MAX,
260                     NULL);
261                 if (nevt < 0)
262                         err(1, "kevent failed");
263                 nchange = 0;
264
265                 for (i = 0; i < nevt; ++i) {
266                         int n;
267
268                         if (evt[i].ident == (u_int)s) {
269                                 while (nchange < RECV_EVENT_MAX) {
270                                         int s1;
271
272                                         s1 = accept(s, NULL, NULL);
273                                         if (s1 < 0)
274                                                 break;
275
276                                         /* TODO: keepalive */
277
278                                         n = write(s1, &ack, sizeof(ack));
279                                         if (n != sizeof(ack)) {
280                                                 close(s1);
281                                                 continue;
282                                         }
283
284                                         EV_SET(&change_evt0[nchange], s1,
285                                             EVFILT_READ, EV_ADD, 0, 0, NULL);
286                                         ++nchange;
287                                 }
288                         } else {
289                                 n = read(evt[i].ident, buf, recv_buflen);
290                                 if (n <= 0) {
291                                         if (n == 0 || errno != EAGAIN)
292                                                 close(evt[i].ident);
293                                 }
294                         }
295                 }
296         }
297
298         /* NEVER REACHED */
299         return NULL;
300 }