ec7d2730a10bbe7a224bdee32856f0d62e8fd66c
[dragonfly.git] / sbin / hammer2 / cmd_service.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 #include <sys/xdiskioctl.h>
39
40 struct diskcon {
41         TAILQ_ENTRY(diskcon) entry;
42         char    *disk;
43 };
44
45 struct service_node_opaque {
46         char    cl_label[64];
47         char    fs_label[64];
48         dmsg_media_block_t block;
49         int     attached;
50         int     servicing;
51         int     servicefd;
52 };
53
54 #define WS " \r\n"
55
56 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
57 pthread_mutex_t diskmtx;
58
59 static void *service_thread(void *data);
60 static void *udev_thread(void *data);
61 static void master_reconnect(const char *mntpt);
62 static void disk_reconnect(const char *disk);
63 static void disk_disconnect(void *handle);
64 static void udev_check_disks(void);
65 static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
66
67 static void xdisk_reconnect(struct service_node_opaque *info);
68 static void xdisk_disconnect(void *handle);
69 static void *xdisk_attach_tmpthread(void *data);
70
71 /*
72  * Start-up the master listener daemon for the machine.
73  *
74  * The master listener serves as a rendezvous point in the cluster, accepting
75  * connections, performing registrations and authentications, maintaining
76  * the spanning tree, and keeping track of message state so disconnects can
77  * be handled properly.
78  *
79  * Once authenticated only low-level messaging protocols (which includes
80  * tracking persistent messages) are handled by this daemon.  This daemon
81  * does not run the higher level quorum or locking protocols.
82  *
83  * This daemon can also be told to maintain connections to other nodes,
84  * forming a messaging backbone, which in turn allows PFS's (if desired) to
85  * simply connect to the master daemon via localhost if desired.
86  * Backbones are specified via /etc/hammer2.conf.
87  */
88 int
89 cmd_service(void)
90 {
91         struct sockaddr_in lsin;
92         int on;
93         int lfd;
94
95         /*
96          * Acquire socket and set options
97          */
98         if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
99                 fprintf(stderr, "master_listen: socket(): %s\n",
100                         strerror(errno));
101                 return 1;
102         }
103         on = 1;
104         setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
105
106         /*
107          * Setup listen port and try to bind.  If the bind fails we assume
108          * that a master listener process is already running and silently
109          * fail.
110          */
111         bzero(&lsin, sizeof(lsin));
112         lsin.sin_family = AF_INET;
113         lsin.sin_addr.s_addr = INADDR_ANY;
114         lsin.sin_port = htons(DMSG_LISTEN_PORT);
115         if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
116                 close(lfd);
117                 if (QuietOpt == 0) {
118                         fprintf(stderr,
119                                 "master listen: daemon already running\n");
120                 }
121                 return 0;
122         }
123         if (QuietOpt == 0)
124                 fprintf(stderr, "master listen: startup\n");
125         listen(lfd, 50);
126
127         /*
128          * Fork and disconnect the controlling terminal and parent process,
129          * executing the specified function as a pthread.
130          *
131          * Returns to the original process which can then continue running.
132          * In debug mode this call will create the pthread without forking
133          * and set NormalExit to 0, instead of fork.
134          */
135         hammer2_demon(service_thread, (void *)(intptr_t)lfd);
136         if (NormalExit)
137                 close(lfd);
138         return 0;
139 }
140
141 /*
142  * Master listen/accept thread.  Accept connections on the master socket,
143  * starting a pthread for each one.
144  */
145 static
146 void *
147 service_thread(void *data)
148 {
149         struct sockaddr_in asin;
150         socklen_t alen;
151         pthread_t thread;
152         dmsg_master_service_info_t *info;
153         int lfd = (int)(intptr_t)data;
154         int fd;
155         int i;
156         int count;
157         struct statfs *mntbuf = NULL;
158         struct statvfs *mntvbuf = NULL;
159
160         /*
161          * Nobody waits for us
162          */
163         setproctitle("hammer2 master listen");
164         pthread_detach(pthread_self());
165
166         dmsg_node_handler = service_node_handler;
167
168         /*
169          * Start up a thread to handle block device monitoring
170          */
171         thread = NULL;
172         pthread_create(&thread, NULL, udev_thread, NULL);
173
174         /*
175          * Scan existing hammer2 mounts and reconnect to them using
176          * HAMMER2IOC_RECLUSTER.
177          */
178         count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
179         for (i = 0; i < count; ++i) {
180                 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
181                         master_reconnect(mntbuf[i].f_mntonname);
182         }
183
184         /*
185          * Accept connections and create pthreads to handle them after
186          * validating the IP.
187          */
188         for (;;) {
189                 alen = sizeof(asin);
190                 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
191                 if (fd < 0) {
192                         if (errno == EINTR)
193                                 continue;
194                         break;
195                 }
196                 thread = NULL;
197                 fprintf(stderr, "service_thread: accept fd %d\n", fd);
198                 info = malloc(sizeof(*info));
199                 bzero(info, sizeof(*info));
200                 info->fd = fd;
201                 info->detachme = 1;
202                 info->dbgmsg_callback = hammer2_shell_parse;
203                 info->label = strdup("client");
204                 pthread_create(&thread, NULL, dmsg_master_service, info);
205         }
206         return (NULL);
207 }
208
209 /*
210  * Node discovery code on received SPANs (or loss of SPANs).  This code
211  * is used to track the availability of remote block devices and install
212  * or deinstall them using the xdisk driver (/dev/xdisk).
213  *
214  * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
215  * the data handed to it.  When opened, a virtual circuit is forged and
216  * maintained to the block device server via DMSG.  Temporary failures
217  * stall the device until successfully reconnected or explicitly destroyed.
218  */
219 static
220 void
221 service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
222 {
223         struct service_node_opaque *info = *opaquep;
224
225         switch(op) {
226         case DMSG_NODEOP_ADD:
227                 if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
228                         break;
229                 if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
230                         break;
231                 if (info == NULL) {
232                         info = malloc(sizeof(*info));
233                         bzero(info, sizeof(*info));
234                         *opaquep = info;
235                 }
236                 snprintf(info->cl_label, sizeof(info->cl_label),
237                          "%s", msg->any.lnk_span.cl_label);
238                 snprintf(info->fs_label, sizeof(info->fs_label),
239                          "%s", msg->any.lnk_span.fs_label);
240                 info->block = msg->any.lnk_span.media.block;
241                 fprintf(stderr, "NODE ADD %s serno %s\n",
242                         info->cl_label, info->fs_label);
243                 xdisk_reconnect(info);
244                 break;
245         case DMSG_NODEOP_DEL:
246                 if (info) {
247                         fprintf(stderr, "NODE DEL %s serno %s\n",
248                                 info->cl_label, info->fs_label);
249                         pthread_mutex_lock(&diskmtx);
250                         *opaquep = NULL;
251                         info->attached = 0;
252                         if (info->servicing == 0)
253                                 free(info);
254                         else
255                                 shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
256                         pthread_mutex_unlock(&diskmtx);
257                 }
258                 break;
259         default:
260                 break;
261         }
262 }
263
264 /*
265  * Monitor block devices.  Currently polls every ~10 seconds or so.
266  */
267 static
268 void *
269 udev_thread(void *data __unused)
270 {
271         int     fd;
272         int     seq = 0;
273
274         pthread_detach(pthread_self());
275
276         if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
277                 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
278                         UDEV_DEVICE_PATH);
279                 pthread_exit(NULL);
280         }
281         udev_check_disks();
282         while (ioctl(fd, UDEVWAIT, &seq) == 0) {
283                 udev_check_disks();
284                 sleep(1);
285         }
286         return (NULL);
287 }
288
289 /*
290  * Retrieve the list of disk attachments and attempt to export
291  * them.
292  */
293 static
294 void
295 udev_check_disks(void)
296 {
297         char tmpbuf[1024];
298         char *buf = NULL;
299         char *disk;
300         int error;
301         size_t n;
302
303         for (;;) {
304                 n = 0;
305                 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
306                 if (error < 0 || n == 0)
307                         break;
308                 if (n >= sizeof(tmpbuf))
309                         buf = malloc(n + 1);
310                 else
311                         buf = tmpbuf;
312                 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
313                 if (error == 0) {
314                         buf[n] = 0;
315                         break;
316                 }
317                 if (buf != tmpbuf) {
318                         free(buf);
319                         buf = NULL;
320                 }
321                 if (errno != ENOMEM)
322                         break;
323         }
324         if (buf) {
325                 fprintf(stderr, "DISKS: %s\n", buf);
326                 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
327                         disk_reconnect(disk);
328                 }
329                 if (buf != tmpbuf)
330                         free(buf);
331         }
332 }
333
334 /*
335  * Normally the mount program supplies a cluster communications
336  * descriptor to the hammer2 vfs on mount, but if you kill the service
337  * daemon and restart it that link will be lost.
338  *
339  * This procedure attempts to [re]connect to existing mounts when
340  * the service daemon is started up before going into its accept
341  * loop.
342  *
343  * NOTE: A hammer2 mount point can only accomodate one connection at a time
344  *       so this will disconnect any existing connection during the
345  *       reconnect.
346  */
347 static
348 void
349 master_reconnect(const char *mntpt)
350 {
351         struct hammer2_ioc_recluster recls;
352         dmsg_master_service_info_t *info;
353         pthread_t thread;
354         int fd;
355         int pipefds[2];
356
357         fd = open(mntpt, O_RDONLY);
358         if (fd < 0) {
359                 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
360                 return;
361         }
362         if (pipe(pipefds) < 0) {
363                 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
364                 close(fd);
365                 return;
366         }
367         bzero(&recls, sizeof(recls));
368         recls.fd = pipefds[0];
369         if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
370                 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
371                 close(pipefds[0]);
372                 close(pipefds[1]);
373                 close(fd);
374                 return;
375         }
376         close(pipefds[0]);
377         close(fd);
378
379         info = malloc(sizeof(*info));
380         bzero(info, sizeof(*info));
381         info->fd = pipefds[1];
382         info->detachme = 1;
383         info->dbgmsg_callback = hammer2_shell_parse;
384         info->label = strdup("hammer2");
385         pthread_create(&thread, NULL, dmsg_master_service, info);
386 }
387
388 /*
389  * Reconnect a physical disk service to the mesh.
390  */
391 static
392 void
393 disk_reconnect(const char *disk)
394 {
395         struct disk_ioc_recluster recls;
396         struct diskcon *dc;
397         dmsg_master_service_info_t *info;
398         pthread_t thread;
399         int fd;
400         int pipefds[2];
401         char *path;
402
403         /*
404          * Urm, this will auto-create mdX+1, just ignore for now.
405          * This mechanic needs to be fixed.  It might actually be nice
406          * to be able to export md disks.
407          */
408         if (strncmp(disk, "md", 2) == 0)
409                 return;
410         if (strncmp(disk, "xa", 2) == 0)
411                 return;
412
413         /*
414          * Check if already connected
415          */
416         pthread_mutex_lock(&diskmtx);
417         TAILQ_FOREACH(dc, &diskconq, entry) {
418                 if (strcmp(dc->disk, disk) == 0)
419                         break;
420         }
421         pthread_mutex_unlock(&diskmtx);
422         if (dc)
423                 return;
424
425         /*
426          * Not already connected, create a connection to the kernel
427          * disk driver.
428          */
429         asprintf(&path, "/dev/%s", disk);
430         fd = open(path, O_RDONLY);
431         if (fd < 0) {
432                 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
433                 free(path);
434                 return;
435         }
436         free(path);
437         if (pipe(pipefds) < 0) {
438                 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
439                 close(fd);
440                 return;
441         }
442         bzero(&recls, sizeof(recls));
443         recls.fd = pipefds[0];
444         if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
445                 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
446                 close(pipefds[0]);
447                 close(pipefds[1]);
448                 close(fd);
449                 return;
450         }
451         close(pipefds[0]);
452         close(fd);
453
454         dc = malloc(sizeof(*dc));
455         dc->disk = strdup(disk);
456         pthread_mutex_lock(&diskmtx);
457         TAILQ_INSERT_TAIL(&diskconq, dc, entry);
458         pthread_mutex_unlock(&diskmtx);
459
460         info = malloc(sizeof(*info));
461         bzero(info, sizeof(*info));
462         info->fd = pipefds[1];
463         info->detachme = 1;
464         info->dbgmsg_callback = hammer2_shell_parse;
465         info->exit_callback = disk_disconnect;
466         info->handle = dc;
467         info->label = strdup(dc->disk);
468         pthread_create(&thread, NULL, dmsg_master_service, info);
469 }
470
471 static
472 void
473 disk_disconnect(void *handle)
474 {
475         struct diskcon *dc = handle;
476
477         fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
478
479         pthread_mutex_lock(&diskmtx);
480         TAILQ_REMOVE(&diskconq, dc, entry);
481         pthread_mutex_unlock(&diskmtx);
482         free(dc->disk);
483         free(dc);
484 }
485
486 /*
487  * [re]connect a remote disk service to the local system via /dev/xdisk.
488  */
489 static
490 void
491 xdisk_reconnect(struct service_node_opaque *xdisk)
492 {
493         struct xdisk_attach_ioctl *xaioc;
494         dmsg_master_service_info_t *info;
495         pthread_t thread;
496         int pipefds[2];
497
498         if (pipe(pipefds) < 0) {
499                 fprintf(stderr, "reconnect %s: pipe() failed\n",
500                         xdisk->cl_label);
501                 return;
502         }
503
504         info = malloc(sizeof(*info));
505         bzero(info, sizeof(*info));
506         info->fd = pipefds[1];
507         info->detachme = 1;
508         info->dbgmsg_callback = hammer2_shell_parse;
509         info->exit_callback = xdisk_disconnect;
510         info->handle = xdisk;
511         xdisk->servicing = 1;
512         xdisk->servicefd = info->fd;
513         info->label = strdup(xdisk->cl_label);
514         pthread_create(&thread, NULL, dmsg_master_service, info);
515
516         /*
517          * We have to run the attach in its own pthread because it will
518          * synchronously interact with the messaging subsystem over the
519          * pipe.  If we do it here we will deadlock.
520          */
521         xaioc = malloc(sizeof(*xaioc));
522         bzero(xaioc, sizeof(xaioc));
523         snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
524                  "%s", xdisk->cl_label);
525         snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
526                  "X-%s", xdisk->fs_label);
527         xaioc->bytes = xdisk->block.bytes;
528         xaioc->blksize = xdisk->block.blksize;
529         xaioc->fd = pipefds[0];
530
531         pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
532 }
533
534 static
535 void *
536 xdisk_attach_tmpthread(void *data)
537 {
538         struct xdisk_attach_ioctl *xaioc = data;
539         int fd;
540
541         pthread_detach(pthread_self());
542
543         fd = open("/dev/xdisk", O_RDWR, 0600);
544         if (fd < 0) {
545                 fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
546         }
547         if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
548                 fprintf(stderr, "reconnect %s: xdisk attach failed\n",
549                         xaioc->cl_label);
550         }
551         close(xaioc->fd);
552         close(fd);
553         return (NULL);
554 }
555
556 static
557 void
558 xdisk_disconnect(void *handle)
559 {
560         struct service_node_opaque *info = handle;
561
562         assert(info->servicing == 1);
563
564         pthread_mutex_lock(&diskmtx);
565         info->servicing = 0;
566         if (info->attached == 0)
567                 free(info);
568         pthread_mutex_unlock(&diskmtx);
569 }