cluster - Major kernel component work (diskiocom, xdisk, kdmsg)
[dragonfly.git] / sbin / hammer2 / cmd_service.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 #include <sys/xdiskioctl.h>
39
40 struct diskcon {
41         TAILQ_ENTRY(diskcon) entry;
42         char    *disk;
43 };
44
45 struct service_node_opaque {
46         char    cl_label[64];
47         char    fs_label[64];
48         dmsg_media_block_t block;
49         int     attached;
50         int     servicing;
51         int     servicefd;
52 };
53
54 #define WS " \r\n"
55
56 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
57 pthread_mutex_t diskmtx;
58
59 static void *service_thread(void *data);
60 static void *udev_thread(void *data);
61 static void master_reconnect(const char *mntpt);
62 static void disk_reconnect(const char *disk);
63 static void disk_disconnect(void *handle);
64 static void udev_check_disks(void);
65 static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
66
67 static void xdisk_reconnect(struct service_node_opaque *info);
68 static void xdisk_disconnect(void *handle);
69 static void *xdisk_attach_tmpthread(void *data);
70
71 /*
72  * Start-up the master listener daemon for the machine.
73  *
74  * The master listener serves as a rendezvous point in the cluster, accepting
75  * connections, performing registrations and authentications, maintaining
76  * the spanning tree, and keeping track of message state so disconnects can
77  * be handled properly.
78  *
79  * Once authenticated only low-level messaging protocols (which includes
80  * tracking persistent messages) are handled by this daemon.  This daemon
81  * does not run the higher level quorum or locking protocols.
82  *
83  * This daemon can also be told to maintain connections to other nodes,
84  * forming a messaging backbone, which in turn allows PFS's (if desired) to
85  * simply connect to the master daemon via localhost if desired.
86  * Backbones are specified via /etc/hammer2.conf.
87  */
88 int
89 cmd_service(void)
90 {
91         struct sockaddr_in lsin;
92         int on;
93         int lfd;
94
95         /*
96          * Acquire socket and set options
97          */
98         if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
99                 fprintf(stderr, "master_listen: socket(): %s\n",
100                         strerror(errno));
101                 return 1;
102         }
103         on = 1;
104         setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
105
106         /*
107          * Setup listen port and try to bind.  If the bind fails we assume
108          * that a master listener process is already running and silently
109          * fail.
110          */
111         bzero(&lsin, sizeof(lsin));
112         lsin.sin_family = AF_INET;
113         lsin.sin_addr.s_addr = INADDR_ANY;
114         lsin.sin_port = htons(DMSG_LISTEN_PORT);
115         if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
116                 close(lfd);
117                 if (QuietOpt == 0) {
118                         fprintf(stderr,
119                                 "master listen: daemon already running\n");
120                 }
121                 return 0;
122         }
123         if (QuietOpt == 0)
124                 fprintf(stderr, "master listen: startup\n");
125         listen(lfd, 50);
126
127         /*
128          * Fork and disconnect the controlling terminal and parent process,
129          * executing the specified function as a pthread.
130          *
131          * Returns to the original process which can then continue running.
132          * In debug mode this call will create the pthread without forking
133          * and set NormalExit to 0, instead of fork.
134          */
135         hammer2_demon(service_thread, (void *)(intptr_t)lfd);
136         if (NormalExit)
137                 close(lfd);
138         return 0;
139 }
140
141 /*
142  * Master listen/accept thread.  Accept connections on the master socket,
143  * starting a pthread for each one.
144  */
145 static
146 void *
147 service_thread(void *data)
148 {
149         struct sockaddr_in asin;
150         socklen_t alen;
151         pthread_t thread;
152         dmsg_master_service_info_t *info;
153         int lfd = (int)(intptr_t)data;
154         int fd;
155         int i;
156         int count;
157         struct statfs *mntbuf = NULL;
158         struct statvfs *mntvbuf = NULL;
159
160         /*
161          * Nobody waits for us
162          */
163         setproctitle("hammer2 master listen");
164         pthread_detach(pthread_self());
165
166         dmsg_node_handler = service_node_handler;
167
168         /*
169          * Start up a thread to handle block device monitoring
170          */
171         thread = NULL;
172         pthread_create(&thread, NULL, udev_thread, NULL);
173
174         /*
175          * Scan existing hammer2 mounts and reconnect to them using
176          * HAMMER2IOC_RECLUSTER.
177          */
178         count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
179         for (i = 0; i < count; ++i) {
180                 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
181                         master_reconnect(mntbuf[i].f_mntonname);
182         }
183
184         /*
185          * Accept connections and create pthreads to handle them after
186          * validating the IP.
187          */
188         for (;;) {
189                 alen = sizeof(asin);
190                 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
191                 if (fd < 0) {
192                         if (errno == EINTR)
193                                 continue;
194                         break;
195                 }
196                 thread = NULL;
197                 fprintf(stderr, "service_thread: accept fd %d\n", fd);
198                 info = malloc(sizeof(*info));
199                 bzero(info, sizeof(*info));
200                 info->fd = fd;
201                 info->detachme = 1;
202                 info->dbgmsg_callback = hammer2_shell_parse;
203                 info->label = strdup("client");
204                 pthread_create(&thread, NULL, dmsg_master_service, info);
205         }
206         return (NULL);
207 }
208
209 /*
210  * Node discovery code on received SPANs (or loss of SPANs).  This code
211  * is used to track the availability of remote block devices and install
212  * or deinstall them using the xdisk driver (/dev/xdisk).
213  *
214  * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
215  * the data handed to it.  When opened, a virtual circuit is forged and
216  * maintained to the block device server via DMSG.  Temporary failures
217  * stall the device until successfully reconnected or explicitly destroyed.
218  */
219 static
220 void
221 service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
222 {
223         struct service_node_opaque *info = *opaquep;
224
225         switch(op) {
226         case DMSG_NODEOP_ADD:
227                 if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
228                         break;
229                 if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
230                         break;
231                 if (info == NULL) {
232                         info = malloc(sizeof(*info));
233                         bzero(info, sizeof(*info));
234                         *opaquep = info;
235                 }
236                 snprintf(info->cl_label, sizeof(info->cl_label),
237                          "%s", msg->any.lnk_span.cl_label);
238                 snprintf(info->fs_label, sizeof(info->fs_label),
239                          "%s", msg->any.lnk_span.fs_label);
240                 info->block = msg->any.lnk_span.media.block;
241                 fprintf(stderr, "NODE ADD %s serno %s\n",
242                         info->cl_label, info->fs_label);
243                 info->attached = 1;
244                 xdisk_reconnect(info);
245                 break;
246         case DMSG_NODEOP_DEL:
247                 if (info) {
248                         fprintf(stderr, "NODE DEL %s serno %s\n",
249                                 info->cl_label, info->fs_label);
250                         pthread_mutex_lock(&diskmtx);
251                         *opaquep = NULL;
252                         info->attached = 0;
253                         if (info->servicing == 0)
254                                 free(info);
255                         else
256                                 shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
257                         pthread_mutex_unlock(&diskmtx);
258                 }
259                 break;
260         default:
261                 break;
262         }
263 }
264
265 /*
266  * Monitor block devices.  Currently polls every ~10 seconds or so.
267  */
268 static
269 void *
270 udev_thread(void *data __unused)
271 {
272         int     fd;
273         int     seq = 0;
274
275         pthread_detach(pthread_self());
276
277         if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
278                 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
279                         UDEV_DEVICE_PATH);
280                 pthread_exit(NULL);
281         }
282         udev_check_disks();
283         while (ioctl(fd, UDEVWAIT, &seq) == 0) {
284                 udev_check_disks();
285                 sleep(1);
286         }
287         return (NULL);
288 }
289
290 /*
291  * Retrieve the list of disk attachments and attempt to export
292  * them.
293  */
294 static
295 void
296 udev_check_disks(void)
297 {
298         char tmpbuf[1024];
299         char *buf = NULL;
300         char *disk;
301         int error;
302         size_t n;
303
304         for (;;) {
305                 n = 0;
306                 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
307                 if (error < 0 || n == 0)
308                         break;
309                 if (n >= sizeof(tmpbuf))
310                         buf = malloc(n + 1);
311                 else
312                         buf = tmpbuf;
313                 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
314                 if (error == 0) {
315                         buf[n] = 0;
316                         break;
317                 }
318                 if (buf != tmpbuf) {
319                         free(buf);
320                         buf = NULL;
321                 }
322                 if (errno != ENOMEM)
323                         break;
324         }
325         if (buf) {
326                 fprintf(stderr, "DISKS: %s\n", buf);
327                 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
328                         disk_reconnect(disk);
329                 }
330                 if (buf != tmpbuf)
331                         free(buf);
332         }
333 }
334
335 /*
336  * Normally the mount program supplies a cluster communications
337  * descriptor to the hammer2 vfs on mount, but if you kill the service
338  * daemon and restart it that link will be lost.
339  *
340  * This procedure attempts to [re]connect to existing mounts when
341  * the service daemon is started up before going into its accept
342  * loop.
343  *
344  * NOTE: A hammer2 mount point can only accomodate one connection at a time
345  *       so this will disconnect any existing connection during the
346  *       reconnect.
347  */
348 static
349 void
350 master_reconnect(const char *mntpt)
351 {
352         struct hammer2_ioc_recluster recls;
353         dmsg_master_service_info_t *info;
354         pthread_t thread;
355         int fd;
356         int pipefds[2];
357
358         fd = open(mntpt, O_RDONLY);
359         if (fd < 0) {
360                 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
361                 return;
362         }
363         if (pipe(pipefds) < 0) {
364                 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
365                 close(fd);
366                 return;
367         }
368         bzero(&recls, sizeof(recls));
369         recls.fd = pipefds[0];
370         if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
371                 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
372                 close(pipefds[0]);
373                 close(pipefds[1]);
374                 close(fd);
375                 return;
376         }
377         close(pipefds[0]);
378         close(fd);
379
380         info = malloc(sizeof(*info));
381         bzero(info, sizeof(*info));
382         info->fd = pipefds[1];
383         info->detachme = 1;
384         info->dbgmsg_callback = hammer2_shell_parse;
385         info->label = strdup("hammer2");
386         pthread_create(&thread, NULL, dmsg_master_service, info);
387 }
388
389 /*
390  * Reconnect a physical disk service to the mesh.
391  */
392 static
393 void
394 disk_reconnect(const char *disk)
395 {
396         struct disk_ioc_recluster recls;
397         struct diskcon *dc;
398         dmsg_master_service_info_t *info;
399         pthread_t thread;
400         int fd;
401         int pipefds[2];
402         char *path;
403
404         /*
405          * Urm, this will auto-create mdX+1, just ignore for now.
406          * This mechanic needs to be fixed.  It might actually be nice
407          * to be able to export md disks.
408          */
409         if (strncmp(disk, "md", 2) == 0)
410                 return;
411         if (strncmp(disk, "xa", 2) == 0)
412                 return;
413
414         /*
415          * Check if already connected
416          */
417         pthread_mutex_lock(&diskmtx);
418         TAILQ_FOREACH(dc, &diskconq, entry) {
419                 if (strcmp(dc->disk, disk) == 0)
420                         break;
421         }
422         pthread_mutex_unlock(&diskmtx);
423         if (dc)
424                 return;
425
426         /*
427          * Not already connected, create a connection to the kernel
428          * disk driver.
429          */
430         asprintf(&path, "/dev/%s", disk);
431         fd = open(path, O_RDONLY);
432         if (fd < 0) {
433                 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
434                 free(path);
435                 return;
436         }
437         free(path);
438         if (pipe(pipefds) < 0) {
439                 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
440                 close(fd);
441                 return;
442         }
443         bzero(&recls, sizeof(recls));
444         recls.fd = pipefds[0];
445         if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
446                 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
447                 close(pipefds[0]);
448                 close(pipefds[1]);
449                 close(fd);
450                 return;
451         }
452         close(pipefds[0]);
453         close(fd);
454
455         dc = malloc(sizeof(*dc));
456         dc->disk = strdup(disk);
457         pthread_mutex_lock(&diskmtx);
458         TAILQ_INSERT_TAIL(&diskconq, dc, entry);
459         pthread_mutex_unlock(&diskmtx);
460
461         info = malloc(sizeof(*info));
462         bzero(info, sizeof(*info));
463         info->fd = pipefds[1];
464         info->detachme = 1;
465         info->dbgmsg_callback = hammer2_shell_parse;
466         info->exit_callback = disk_disconnect;
467         info->handle = dc;
468         info->label = strdup(dc->disk);
469         pthread_create(&thread, NULL, dmsg_master_service, info);
470 }
471
472 static
473 void
474 disk_disconnect(void *handle)
475 {
476         struct diskcon *dc = handle;
477
478         fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
479
480         pthread_mutex_lock(&diskmtx);
481         TAILQ_REMOVE(&diskconq, dc, entry);
482         pthread_mutex_unlock(&diskmtx);
483         free(dc->disk);
484         free(dc);
485 }
486
487 /*
488  * [re]connect a remote disk service to the local system via /dev/xdisk.
489  */
490 static
491 void
492 xdisk_reconnect(struct service_node_opaque *xdisk)
493 {
494         struct xdisk_attach_ioctl *xaioc;
495         dmsg_master_service_info_t *info;
496         pthread_t thread;
497         int pipefds[2];
498
499         if (pipe(pipefds) < 0) {
500                 fprintf(stderr, "reconnect %s: pipe() failed\n",
501                         xdisk->cl_label);
502                 return;
503         }
504
505         info = malloc(sizeof(*info));
506         bzero(info, sizeof(*info));
507         info->fd = pipefds[1];
508         info->detachme = 1;
509         info->dbgmsg_callback = hammer2_shell_parse;
510         info->exit_callback = xdisk_disconnect;
511         info->handle = xdisk;
512         xdisk->servicing = 1;
513         xdisk->servicefd = info->fd;
514         info->label = strdup(xdisk->cl_label);
515         pthread_create(&thread, NULL, dmsg_master_service, info);
516
517         /*
518          * We have to run the attach in its own pthread because it will
519          * synchronously interact with the messaging subsystem over the
520          * pipe.  If we do it here we will deadlock.
521          */
522         xaioc = malloc(sizeof(*xaioc));
523         bzero(xaioc, sizeof(xaioc));
524         snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
525                  "%s", xdisk->cl_label);
526         snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
527                  "X-%s", xdisk->fs_label);
528         xaioc->bytes = xdisk->block.bytes;
529         xaioc->blksize = xdisk->block.blksize;
530         xaioc->fd = pipefds[0];
531
532         pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
533 }
534
535 static
536 void *
537 xdisk_attach_tmpthread(void *data)
538 {
539         struct xdisk_attach_ioctl *xaioc = data;
540         int fd;
541
542         pthread_detach(pthread_self());
543
544         fd = open("/dev/xdisk", O_RDWR, 0600);
545         if (fd < 0) {
546                 fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
547         }
548         if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
549                 fprintf(stderr, "reconnect %s: xdisk attach failed\n",
550                         xaioc->cl_label);
551         }
552         close(xaioc->fd);
553         close(fd);
554         return (NULL);
555 }
556
557 static
558 void
559 xdisk_disconnect(void *handle)
560 {
561         struct service_node_opaque *info = handle;
562
563         assert(info->servicing == 1);
564
565         pthread_mutex_lock(&diskmtx);
566         info->servicing = 0;
567         if (info->attached == 0)
568                 free(info);
569         pthread_mutex_unlock(&diskmtx);
570 }