Merge remote-tracking branch 'origin/vendor/BINUTILS227'
[dragonfly.git] / tools / tools / netrate / kq_sendrecv / kq_recvserv / kq_recvserv.c
1 #include <sys/types.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/socket.h>
5 #include <sys/sysctl.h>
6
7 #include <arpa/inet.h>
8 #include <netinet/in.h>
9
10 #include <err.h>
11 #include <errno.h>
12 #include <pthread.h>
13 #include <pthread_np.h>
14 #include <signal.h>
15 #include <stdio.h>
16 #include <stdint.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <unistd.h>
20
21 #include "kq_sendrecv_proto.h"
22
23 #define RECV_EVENT_MAX          64
24 #define RECV_BUFLEN             (128 * 1024)
25
26 struct recv_thrctx {
27         int                     t_id;
28         struct sockaddr_in      t_in;
29
30         pthread_mutex_t         t_lock;
31         pthread_cond_t          t_cond;
32
33         pthread_t               t_tid;
34 };
35
36 static void     *recv_thread(void *);
37
38 static void
39 usage(const char *cmd)
40 {
41         fprintf(stderr, "%s [-4 addr4] [-p port] [-t nthreads] [-D]\n", cmd);
42         exit(2);
43 }
44
45 int
46 main(int argc, char *argv[])
47 {
48         struct recv_thrctx *ctx_arr;
49         struct recv_info *info;
50         struct sockaddr_in in;
51         sigset_t sigset;
52         int opt, s, on, nthr, i, info_sz, do_daemon;
53         size_t sz;
54
55         sigemptyset(&sigset);
56         sigaddset(&sigset, SIGPIPE);
57         if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
58                 err(1, "sigprocmask failed");
59
60         sz = sizeof(nthr);
61         if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0)
62                 err(1, "sysctl hw.ncpu failed");
63
64         memset(&in, 0, sizeof(in));
65         in.sin_family = AF_INET;
66         in.sin_addr.s_addr = htonl(INADDR_ANY);
67         in.sin_port = htons(RECV_PORT);
68
69         do_daemon = 1;
70
71         while ((opt = getopt(argc, argv, "4:Dp:t:")) != -1) {
72                 switch (opt) {
73                 case '4':
74                         if (inet_pton(AF_INET, optarg, &in.sin_addr) <= 0)
75                                 errx(1, "inet_pton failed %s", optarg);
76                         break;
77
78                 case 'D':
79                         do_daemon = 0;
80                         break;
81
82                 case 'p':
83                         in.sin_port = htons(strtoul(optarg, NULL, 10));
84                         break;
85
86                 case 't':
87                         nthr = strtol(optarg, NULL, 10);
88                         if (nthr <= 0)
89                                 errx(1, "invalid -t");
90                         break;
91
92                 default:
93                         usage(argv[0]);
94                 }
95         }
96
97         s = socket(AF_INET, SOCK_STREAM, 0);
98         if (s < 0)
99                 err(1, "socket failed");
100
101         on = 1;
102         if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
103                 err(1, "setsockopt(REUSEPADDR) failed");
104
105         if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0)
106                 err(1, "bind failed");
107
108         if (listen(s, -1) < 0)
109                 err(1, "listen failed");
110
111         ctx_arr = calloc(nthr, sizeof(struct recv_thrctx));
112         if (ctx_arr == NULL)
113                 err(1, "calloc failed");
114
115         info_sz = __offsetof(struct recv_info, dport[nthr]);
116         info = calloc(1, info_sz);
117         if (info == NULL)
118                 err(1, "calloc failed");
119         info->ndport = nthr;
120
121         if (do_daemon)
122                 daemon(0, 0);
123
124         pthread_set_name_np(pthread_self(), "main");
125
126         for (i = 0; i < nthr; ++i) {
127                 struct recv_thrctx *ctx = &ctx_arr[i];
128                 int error;
129
130                 ctx->t_in = in;
131                 ctx->t_in.sin_port = 0;
132
133                 ctx->t_id = i;
134                 pthread_mutex_init(&ctx->t_lock, NULL);
135                 pthread_cond_init(&ctx->t_cond, NULL);
136
137                 /* Start receiver */
138                 error = pthread_create(&ctx->t_tid, NULL, recv_thread, ctx);
139                 if (error)
140                         errc(1, error, "pthread_create %d failed", i);
141
142                 /*
143                  * Wait for the receiver to select a proper data port
144                  * and start a listen socket on the data port.
145                  */
146                 pthread_mutex_lock(&ctx->t_lock);
147                 while (ctx->t_in.sin_port == 0)
148                         pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
149                 pthread_mutex_unlock(&ctx->t_lock);
150
151                 info->dport[i] = ctx->t_in.sin_port;
152         }
153
154         /*
155          * Send information, e.g. data ports, back to the clients.
156          */
157         for (;;) {
158                 int s1;
159
160                 s1 = accept(s, NULL, NULL);
161                 if (s1 < 0)
162                         continue;
163                 write(s1, info, info_sz);
164                 close(s1);
165         }
166
167         /* NEVER REACHED */
168         exit(0);
169 }
170
171 static void *
172 recv_thread(void *xctx)
173 {
174         struct recv_thrctx *ctx = xctx;
175         struct kevent change_evt0[RECV_EVENT_MAX];
176         struct conn_ack ack;
177         uint8_t *buf;
178         char name[32];
179         u_short port;
180         int s, kq, nchange;
181
182         /*
183          * Select a proper data port and create a listen socket on it.
184          */
185         port = RECV_PORT + ctx->t_id;
186         for (;;) {
187                 struct sockaddr_in in = ctx->t_in;
188                 int on;
189
190                 ++port;
191                 if (port < RECV_PORT)
192                         errx(1, "failed to find a data port");
193                 in.sin_port = htons(port);
194
195                 s = socket(AF_INET, SOCK_STREAM, 0);
196                 if (s < 0)
197                         err(1, "socket failed");
198
199                 on = 1;
200                 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
201                         err(1, "setsockopt(REUSEADDR) failed");
202
203                 on = 1;
204                 if (ioctl(s, FIONBIO, &on, sizeof(on)) < 0)
205                         err(1, "ioctl(FIONBIO) failed");
206
207                 if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) {
208                         close(s);
209                         continue;
210                 }
211
212                 if (listen(s, -1) < 0)
213                         err(1, "listen failed");
214
215                 break;
216         }
217
218         kq = kqueue();
219         if (kq < 0)
220                 err(1, "kqueue failed");
221
222         buf = malloc(RECV_BUFLEN);
223         if (buf == NULL)
224                 err(1, "malloc %d failed", RECV_BUFLEN);
225
226         memset(&ack, 0, sizeof(ack));
227
228         snprintf(name, sizeof(name), "rcv%d %d", ctx->t_id, port);
229         pthread_set_name_np(pthread_self(), name);
230
231         /*
232          * Inform the main thread that we are ready.
233          */
234         pthread_mutex_lock(&ctx->t_lock);
235         ctx->t_in.sin_port = htons(port);
236         pthread_mutex_unlock(&ctx->t_lock);
237         pthread_cond_signal(&ctx->t_cond);
238
239         EV_SET(&change_evt0[0], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
240         nchange = 1;
241
242         for (;;) {
243                 const struct kevent *change_evt = NULL;
244                 struct kevent evt[RECV_EVENT_MAX];
245                 int nevt, i;
246
247                 if (nchange > 0)
248                         change_evt = change_evt0;
249
250                 nevt = kevent(kq, change_evt, nchange, evt, RECV_EVENT_MAX,
251                     NULL);
252                 if (nevt < 0)
253                         err(1, "kevent failed");
254                 nchange = 0;
255
256                 for (i = 0; i < nevt; ++i) {
257                         int n;
258
259                         if (evt[i].ident == (u_int)s) {
260                                 while (nchange < RECV_EVENT_MAX) {
261                                         int s1;
262
263                                         s1 = accept(s, NULL, NULL);
264                                         if (s1 < 0)
265                                                 break;
266
267                                         /* TODO: keepalive */
268
269                                         n = write(s1, &ack, sizeof(ack));
270                                         if (n != sizeof(ack)) {
271                                                 close(s1);
272                                                 continue;
273                                         }
274
275                                         EV_SET(&change_evt0[nchange], s1,
276                                             EVFILT_READ, EV_ADD, 0, 0, NULL);
277                                         ++nchange;
278                                 }
279                         } else {
280                                 n = read(evt[i].ident, buf, RECV_BUFLEN);
281                                 if (n <= 0) {
282                                         if (n == 0 || errno != EAGAIN)
283                                                 close(evt[i].ident);
284                                 }
285                         }
286                 }
287         }
288
289         /* NEVER REACHED */
290         return NULL;
291 }