hammer2 - cluster / hammer2 service daemon work
[dragonfly.git] / sbin / hammer2 / cmd_service.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 #include <sys/xdiskioctl.h>
39
40 struct diskcon {
41         TAILQ_ENTRY(diskcon) entry;
42         char    *disk;
43 };
44
45 struct service_node_opaque {
46         char    cl_label[64];
47         char    fs_label[64];
48         dmsg_media_block_t block;
49         int     attached;
50         int     servicing;
51         int     servicefd;
52 };
53
54 #define WS " \r\n"
55
56 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
57 pthread_mutex_t diskmtx;
58
59 static void *service_thread(void *data);
60 static void *udev_thread(void *data);
61 static void master_reconnect(const char *mntpt);
62 static void disk_reconnect(const char *disk);
63 static void disk_disconnect(void *handle);
64 static void udev_check_disks(void);
65 static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
66
67 static void xdisk_reconnect(struct service_node_opaque *info);
68 static void xdisk_disconnect(void *handle);
69 static void *xdisk_attach_tmpthread(void *data);
70
71 /*
72  * Start-up the master listener daemon for the machine.
73  *
74  * The master listener serves as a rendezvous point in the cluster, accepting
75  * connections, performing registrations and authentications, maintaining
76  * the spanning tree, and keeping track of message state so disconnects can
77  * be handled properly.
78  *
79  * Once authenticated only low-level messaging protocols (which includes
80  * tracking persistent messages) are handled by this daemon.  This daemon
81  * does not run the higher level quorum or locking protocols.
82  *
83  * This daemon can also be told to maintain connections to other nodes,
84  * forming a messaging backbone, which in turn allows PFS's (if desired) to
85  * simply connect to the master daemon via localhost if desired.
86  * Backbones are specified via /etc/hammer2.conf.
87  */
88 int
89 cmd_service(void)
90 {
91         struct sockaddr_in lsin;
92         int on;
93         int lfd;
94
95         /*
96          * Acquire socket and set options
97          */
98         if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
99                 fprintf(stderr, "master_listen: socket(): %s\n",
100                         strerror(errno));
101                 return 1;
102         }
103         on = 1;
104         setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
105
106         /*
107          * Setup listen port and try to bind.  If the bind fails we assume
108          * that a master listener process is already running and silently
109          * fail.
110          */
111         bzero(&lsin, sizeof(lsin));
112         lsin.sin_family = AF_INET;
113         lsin.sin_addr.s_addr = INADDR_ANY;
114         lsin.sin_port = htons(DMSG_LISTEN_PORT);
115         if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
116                 close(lfd);
117                 if (QuietOpt == 0) {
118                         fprintf(stderr,
119                                 "master listen: daemon already running\n");
120                 }
121                 return 0;
122         }
123         if (QuietOpt == 0)
124                 fprintf(stderr, "master listen: startup\n");
125         listen(lfd, 50);
126
127         /*
128          * Fork and disconnect the controlling terminal and parent process,
129          * executing the specified function as a pthread.
130          *
131          * Returns to the original process which can then continue running.
132          * In debug mode this call will create the pthread without forking
133          * and set NormalExit to 0, instead of fork.
134          */
135         hammer2_demon(service_thread, (void *)(intptr_t)lfd);
136         if (NormalExit)
137                 close(lfd);
138         return 0;
139 }
140
141 /*
142  * Master listen/accept thread.  Accept connections on the master socket,
143  * starting a pthread for each one.
144  */
145 static
146 void *
147 service_thread(void *data)
148 {
149         struct sockaddr_in asin;
150         socklen_t alen;
151         pthread_t thread;
152         dmsg_master_service_info_t *info;
153         int lfd = (int)(intptr_t)data;
154         int fd;
155         int i;
156         int count;
157         struct statfs *mntbuf = NULL;
158         struct statvfs *mntvbuf = NULL;
159
160         /*
161          * Nobody waits for us
162          */
163         setproctitle("hammer2 master listen");
164         pthread_detach(pthread_self());
165
166         dmsg_node_handler = service_node_handler;
167
168         /*
169          * Start up a thread to handle block device monitoring
170          */
171         thread = NULL;
172         pthread_create(&thread, NULL, udev_thread, NULL);
173
174         /*
175          * Scan existing hammer2 mounts and reconnect to them using
176          * HAMMER2IOC_RECLUSTER.
177          */
178         count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
179         for (i = 0; i < count; ++i) {
180                 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
181                         master_reconnect(mntbuf[i].f_mntonname);
182         }
183
184         /*
185          * Accept connections and create pthreads to handle them after
186          * validating the IP.
187          */
188         for (;;) {
189                 alen = sizeof(asin);
190                 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
191                 if (fd < 0) {
192                         if (errno == EINTR)
193                                 continue;
194                         break;
195                 }
196                 thread = NULL;
197                 fprintf(stderr, "service_thread: accept fd %d\n", fd);
198                 info = malloc(sizeof(*info));
199                 bzero(info, sizeof(*info));
200                 info->fd = fd;
201                 info->detachme = 1;
202                 info->dbgmsg_callback = hammer2_shell_parse;
203                 pthread_create(&thread, NULL, dmsg_master_service, info);
204         }
205         return (NULL);
206 }
207
208 /*
209  * Node discovery code on received SPANs (or loss of SPANs).  This code
210  * is used to track the availability of remote block devices and install
211  * or deinstall them using the xdisk driver (/dev/xdisk).
212  *
213  * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
214  * the data handed to it.  When opened, a virtual circuit is forged and
215  * maintained to the block device server via DMSG.  Temporary failures
216  * stall the device until successfully reconnected or explicitly destroyed.
217  */
218 static
219 void
220 service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
221 {
222         struct service_node_opaque *info = *opaquep;
223
224         switch(op) {
225         case DMSG_NODEOP_ADD:
226                 if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
227                         break;
228                 if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
229                         break;
230                 if (info == NULL) {
231                         info = malloc(sizeof(*info));
232                         bzero(info, sizeof(*info));
233                         *opaquep = info;
234                 }
235                 snprintf(info->cl_label, sizeof(info->cl_label),
236                          "%s", msg->any.lnk_span.cl_label);
237                 snprintf(info->fs_label, sizeof(info->fs_label),
238                          "%s", msg->any.lnk_span.fs_label);
239                 info->block = msg->any.lnk_span.media.block;
240                 fprintf(stderr, "NODE ADD %s serno %s\n",
241                         info->cl_label, info->fs_label);
242                 xdisk_reconnect(info);
243                 break;
244         case DMSG_NODEOP_DEL:
245                 if (info) {
246                         fprintf(stderr, "NODE DEL %s serno %s\n",
247                                 info->cl_label, info->fs_label);
248                         pthread_mutex_lock(&diskmtx);
249                         *opaquep = NULL;
250                         info->attached = 0;
251                         if (info->servicing == 0)
252                                 free(info);
253                         else
254                                 shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
255                         pthread_mutex_unlock(&diskmtx);
256                 }
257                 break;
258         default:
259                 break;
260         }
261 }
262
263 /*
264  * Monitor block devices.  Currently polls every ~10 seconds or so.
265  */
266 static
267 void *
268 udev_thread(void *data __unused)
269 {
270         int     fd;
271         int     seq = 0;
272
273         pthread_detach(pthread_self());
274
275         if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
276                 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
277                         UDEV_DEVICE_PATH);
278                 pthread_exit(NULL);
279         }
280         udev_check_disks();
281         while (ioctl(fd, UDEVWAIT, &seq) == 0) {
282                 udev_check_disks();
283                 sleep(1);
284         }
285         return (NULL);
286 }
287
288 /*
289  * Retrieve the list of disk attachments and attempt to export
290  * them.
291  */
292 static
293 void
294 udev_check_disks(void)
295 {
296         char tmpbuf[1024];
297         char *buf = NULL;
298         char *disk;
299         int error;
300         size_t n;
301
302         for (;;) {
303                 n = 0;
304                 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
305                 if (error < 0 || n == 0)
306                         break;
307                 if (n >= sizeof(tmpbuf))
308                         buf = malloc(n + 1);
309                 else
310                         buf = tmpbuf;
311                 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
312                 if (error == 0) {
313                         buf[n] = 0;
314                         break;
315                 }
316                 if (buf != tmpbuf) {
317                         free(buf);
318                         buf = NULL;
319                 }
320                 if (errno != ENOMEM)
321                         break;
322         }
323         if (buf) {
324                 fprintf(stderr, "DISKS: %s\n", buf);
325                 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
326                         disk_reconnect(disk);
327                 }
328                 if (buf != tmpbuf)
329                         free(buf);
330         }
331 }
332
333 /*
334  * Normally the mount program supplies a cluster communications
335  * descriptor to the hammer2 vfs on mount, but if you kill the service
336  * daemon and restart it that link will be lost.
337  *
338  * This procedure attempts to [re]connect to existing mounts when
339  * the service daemon is started up before going into its accept
340  * loop.
341  *
342  * NOTE: A hammer2 mount point can only accomodate one connection at a time
343  *       so this will disconnect any existing connection during the
344  *       reconnect.
345  */
346 static
347 void
348 master_reconnect(const char *mntpt)
349 {
350         struct hammer2_ioc_recluster recls;
351         dmsg_master_service_info_t *info;
352         pthread_t thread;
353         int fd;
354         int pipefds[2];
355
356         fd = open(mntpt, O_RDONLY);
357         if (fd < 0) {
358                 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
359                 return;
360         }
361         if (pipe(pipefds) < 0) {
362                 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
363                 close(fd);
364                 return;
365         }
366         bzero(&recls, sizeof(recls));
367         recls.fd = pipefds[0];
368         if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
369                 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
370                 close(pipefds[0]);
371                 close(pipefds[1]);
372                 close(fd);
373                 return;
374         }
375         close(pipefds[0]);
376         close(fd);
377
378         info = malloc(sizeof(*info));
379         bzero(info, sizeof(*info));
380         info->fd = pipefds[1];
381         info->detachme = 1;
382         info->dbgmsg_callback = hammer2_shell_parse;
383         pthread_create(&thread, NULL, dmsg_master_service, info);
384 }
385
386 /*
387  * Reconnect a physical disk service to the mesh.
388  */
389 static
390 void
391 disk_reconnect(const char *disk)
392 {
393         struct disk_ioc_recluster recls;
394         struct diskcon *dc;
395         dmsg_master_service_info_t *info;
396         pthread_t thread;
397         int fd;
398         int pipefds[2];
399         char *path;
400
401         /*
402          * Urm, this will auto-create mdX+1, just ignore for now.
403          * This mechanic needs to be fixed.  It might actually be nice
404          * to be able to export md disks.
405          */
406         if (strncmp(disk, "md", 2) == 0)
407                 return;
408         if (strncmp(disk, "xa", 2) == 0)
409                 return;
410
411         /*
412          * Check if already connected
413          */
414         pthread_mutex_lock(&diskmtx);
415         TAILQ_FOREACH(dc, &diskconq, entry) {
416                 if (strcmp(dc->disk, disk) == 0)
417                         break;
418         }
419         pthread_mutex_unlock(&diskmtx);
420         if (dc)
421                 return;
422
423         /*
424          * Not already connected, create a connection to the kernel
425          * disk driver.
426          */
427         asprintf(&path, "/dev/%s", disk);
428         fd = open(path, O_RDONLY);
429         if (fd < 0) {
430                 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
431                 free(path);
432                 return;
433         }
434         free(path);
435         if (pipe(pipefds) < 0) {
436                 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
437                 close(fd);
438                 return;
439         }
440         bzero(&recls, sizeof(recls));
441         recls.fd = pipefds[0];
442         if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
443                 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
444                 close(pipefds[0]);
445                 close(pipefds[1]);
446                 close(fd);
447                 return;
448         }
449         close(pipefds[0]);
450         close(fd);
451
452         dc = malloc(sizeof(*dc));
453         dc->disk = strdup(disk);
454         pthread_mutex_lock(&diskmtx);
455         TAILQ_INSERT_TAIL(&diskconq, dc, entry);
456         pthread_mutex_unlock(&diskmtx);
457
458         info = malloc(sizeof(*info));
459         bzero(info, sizeof(*info));
460         info->fd = pipefds[1];
461         info->detachme = 1;
462         info->dbgmsg_callback = hammer2_shell_parse;
463         info->exit_callback = disk_disconnect;
464         info->handle = dc;
465         pthread_create(&thread, NULL, dmsg_master_service, info);
466 }
467
468 static
469 void
470 disk_disconnect(void *handle)
471 {
472         struct diskcon *dc = handle;
473
474         fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
475
476         pthread_mutex_lock(&diskmtx);
477         TAILQ_REMOVE(&diskconq, dc, entry);
478         pthread_mutex_unlock(&diskmtx);
479         free(dc->disk);
480         free(dc);
481 }
482
483 /*
484  * [re]connect a remote disk service to the local system via /dev/xdisk.
485  */
486 static
487 void
488 xdisk_reconnect(struct service_node_opaque *xdisk)
489 {
490         struct xdisk_attach_ioctl *xaioc;
491         dmsg_master_service_info_t *info;
492         pthread_t thread;
493         int pipefds[2];
494
495         if (pipe(pipefds) < 0) {
496                 fprintf(stderr, "reconnect %s: pipe() failed\n",
497                         xdisk->cl_label);
498                 return;
499         }
500
501         info = malloc(sizeof(*info));
502         bzero(info, sizeof(*info));
503         info->fd = pipefds[1];
504         info->detachme = 1;
505         info->dbgmsg_callback = hammer2_shell_parse;
506         info->exit_callback = xdisk_disconnect;
507         info->handle = xdisk;
508         xdisk->servicing = 1;
509         xdisk->servicefd = info->fd;
510         pthread_create(&thread, NULL, dmsg_master_service, info);
511
512         /*
513          * We have to run the attach in its own pthread because it will
514          * synchronously interact with the messaging subsystem over the
515          * pipe.  If we do it here we will deadlock.
516          */
517         xaioc = malloc(sizeof(*xaioc));
518         bzero(xaioc, sizeof(xaioc));
519         snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
520                  "%s", xdisk->cl_label);
521         snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
522                  "X-%s", xdisk->fs_label);
523         xaioc->bytes = xdisk->block.bytes;
524         xaioc->blksize = xdisk->block.blksize;
525         xaioc->fd = pipefds[0];
526
527         pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
528 }
529
530 static
531 void *
532 xdisk_attach_tmpthread(void *data)
533 {
534         struct xdisk_attach_ioctl *xaioc = data;
535         int fd;
536
537         pthread_detach(pthread_self());
538
539         fd = open("/dev/xdisk", O_RDWR, 0600);
540         if (fd < 0) {
541                 fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
542         }
543         if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
544                 fprintf(stderr, "reconnect %s: xdisk attach failed\n",
545                         xaioc->cl_label);
546         }
547         close(xaioc->fd);
548         close(fd);
549         return (NULL);
550 }
551
552 static
553 void
554 xdisk_disconnect(void *handle)
555 {
556         struct service_node_opaque *info = handle;
557
558         assert(info->servicing == 1);
559
560         pthread_mutex_lock(&diskmtx);
561         info->servicing = 0;
562         if (info->attached == 0)
563                 free(info);
564         pthread_mutex_unlock(&diskmtx);
565 }