2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
38 #include <sys/xdiskioctl.h>
41 TAILQ_ENTRY(diskcon) entry;
45 struct service_node_opaque {
48 dmsg_media_block_t block;
56 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
57 pthread_mutex_t diskmtx;
59 static void *service_thread(void *data);
60 static void *udev_thread(void *data);
61 static void master_reconnect(const char *mntpt);
62 static void disk_reconnect(const char *disk);
63 static void disk_disconnect(void *handle);
64 static void udev_check_disks(void);
65 static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
67 static void xdisk_reconnect(struct service_node_opaque *info);
68 static void xdisk_disconnect(void *handle);
69 static void *xdisk_attach_tmpthread(void *data);
72 * Start up the master listener daemon for the machine.
74 * The master listener serves as a rendezvous point in the cluster, accepting
75 * connections, performing registrations and authentications, maintaining
76 * the spanning tree, and keeping track of message state so disconnects can
77 * be handled properly.
79 * Once authenticated, only low-level messaging protocols (which include
80 * tracking persistent messages) are handled by this daemon. This daemon
81 * does not run the higher level quorum or locking protocols.
83 * This daemon can also be told to maintain connections to other nodes,
84 * forming a messaging backbone, which in turn allows PFS's (if desired) to
85 * simply connect to the master daemon via localhost.
86 * Backbones are specified via /etc/hammer2.conf.
91 struct sockaddr_in lsin;
96 * Acquire socket and set options
98 if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
99 fprintf(stderr, "master_listen: socket(): %s\n",
104 setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
107 * Set up the listen port and try to bind. If the bind fails we assume
108 * that a master listener process is already running and silently
111 bzero(&lsin, sizeof(lsin));
112 lsin.sin_family = AF_INET;
113 lsin.sin_addr.s_addr = INADDR_ANY;
114 lsin.sin_port = htons(DMSG_LISTEN_PORT);
115 if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
119 "master listen: daemon already running\n");
124 fprintf(stderr, "master listen: startup\n");
128 * Fork and disconnect the controlling terminal and parent process,
129 * executing the specified function as a pthread.
131 * Returns to the original process which can then continue running.
132 * In debug mode this call will create the pthread without forking
133 * and set NormalExit to 0, instead of fork.
135 hammer2_demon(service_thread, (void *)(intptr_t)lfd);
142 * Master listen/accept thread. Accept connections on the master socket,
143 * starting a pthread for each one.
147 service_thread(void *data)
149 struct sockaddr_in asin;
152 dmsg_master_service_info_t *info;
153 int lfd = (int)(intptr_t)data;
157 struct statfs *mntbuf = NULL;
158 struct statvfs *mntvbuf = NULL;
161 * Nobody waits for us
163 setproctitle("hammer2 master listen");
164 pthread_detach(pthread_self());
166 dmsg_node_handler = service_node_handler;
169 * Start up a thread to handle block device monitoring
172 pthread_create(&thread, NULL, udev_thread, NULL);
175 * Scan existing hammer2 mounts and reconnect to them using
176 * HAMMER2IOC_RECLUSTER.
178 count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
179 for (i = 0; i < count; ++i) {
180 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
181 master_reconnect(mntbuf[i].f_mntonname);
185 * Accept connections and create pthreads to handle them after
190 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
197 fprintf(stderr, "service_thread: accept fd %d\n", fd);
198 info = malloc(sizeof(*info));
199 bzero(info, sizeof(*info));
202 info->dbgmsg_callback = hammer2_shell_parse;
203 info->label = strdup("client");
204 pthread_create(&thread, NULL, dmsg_master_service, info);
210 * Node discovery code on received SPANs (or loss of SPANs). This code
211 * is used to track the availability of remote block devices and install
212 * or deinstall them using the xdisk driver (/dev/xdisk).
214 * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
215 * the data handed to it. When opened, a virtual circuit is forged and
216 * maintained to the block device server via DMSG. Temporary failures
217 * stall the device until successfully reconnected or explicitly destroyed.
221 service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
223 struct service_node_opaque *info = *opaquep;
226 case DMSG_NODEOP_ADD:
227 if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
229 if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
232 info = malloc(sizeof(*info));
233 bzero(info, sizeof(*info));
236 snprintf(info->cl_label, sizeof(info->cl_label),
237 "%s", msg->any.lnk_span.cl_label);
238 snprintf(info->fs_label, sizeof(info->fs_label),
239 "%s", msg->any.lnk_span.fs_label);
240 info->block = msg->any.lnk_span.media.block;
241 fprintf(stderr, "NODE ADD %s serno %s\n",
242 info->cl_label, info->fs_label);
244 xdisk_reconnect(info);
246 case DMSG_NODEOP_DEL:
248 fprintf(stderr, "NODE DEL %s serno %s\n",
249 info->cl_label, info->fs_label);
250 pthread_mutex_lock(&diskmtx);
253 if (info->servicing == 0)
256 shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
257 pthread_mutex_unlock(&diskmtx);
266 * Monitor block devices. Currently polls every ~10 seconds or so.
270 udev_thread(void *data __unused)
275 pthread_detach(pthread_self());
277 if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
278 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
283 while (ioctl(fd, UDEVWAIT, &seq) == 0) {
291 * Retrieve the list of disk attachments and attempt to export
296 udev_check_disks(void)
306 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
307 if (error < 0 || n == 0)
309 if (n >= sizeof(tmpbuf))
313 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
326 fprintf(stderr, "DISKS: %s\n", buf);
327 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
328 disk_reconnect(disk);
336 * Normally the mount program supplies a cluster communications
337 * descriptor to the hammer2 vfs on mount, but if you kill the service
338 * daemon and restart it that link will be lost.
340 * This procedure attempts to [re]connect to existing mounts when
341 * the service daemon is started up before going into its accept
344 * NOTE: A hammer2 mount point can only accommodate one connection at a time
345 * so this will disconnect any existing connection during the
350 master_reconnect(const char *mntpt)
352 struct hammer2_ioc_recluster recls;
353 dmsg_master_service_info_t *info;
358 fd = open(mntpt, O_RDONLY);
360 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
363 if (pipe(pipefds) < 0) {
364 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
368 bzero(&recls, sizeof(recls));
369 recls.fd = pipefds[0];
370 if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
371 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
380 info = malloc(sizeof(*info));
381 bzero(info, sizeof(*info));
382 info->fd = pipefds[1];
384 info->dbgmsg_callback = hammer2_shell_parse;
385 info->label = strdup("hammer2");
386 pthread_create(&thread, NULL, dmsg_master_service, info);
390 * Reconnect a physical disk service to the mesh.
394 disk_reconnect(const char *disk)
396 struct disk_ioc_recluster recls;
398 dmsg_master_service_info_t *info;
405 * Urm, this will auto-create mdX+1; just ignore it for now.
406 * This mechanic needs to be fixed. It might actually be nice
407 * to be able to export md disks.
409 if (strncmp(disk, "md", 2) == 0)
411 if (strncmp(disk, "xa", 2) == 0)
415 * Check if already connected
417 pthread_mutex_lock(&diskmtx);
418 TAILQ_FOREACH(dc, &diskconq, entry) {
419 if (strcmp(dc->disk, disk) == 0)
422 pthread_mutex_unlock(&diskmtx);
427 * Not already connected, create a connection to the kernel
430 asprintf(&path, "/dev/%s", disk);
431 fd = open(path, O_RDONLY);
433 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
438 if (pipe(pipefds) < 0) {
439 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
443 bzero(&recls, sizeof(recls));
444 recls.fd = pipefds[0];
445 if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
446 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
455 dc = malloc(sizeof(*dc));
456 dc->disk = strdup(disk);
457 pthread_mutex_lock(&diskmtx);
458 TAILQ_INSERT_TAIL(&diskconq, dc, entry);
459 pthread_mutex_unlock(&diskmtx);
461 info = malloc(sizeof(*info));
462 bzero(info, sizeof(*info));
463 info->fd = pipefds[1];
465 info->dbgmsg_callback = hammer2_shell_parse;
466 info->exit_callback = disk_disconnect;
468 info->label = strdup(dc->disk);
469 pthread_create(&thread, NULL, dmsg_master_service, info);
474 disk_disconnect(void *handle)
476 struct diskcon *dc = handle;
478 fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
480 pthread_mutex_lock(&diskmtx);
481 TAILQ_REMOVE(&diskconq, dc, entry);
482 pthread_mutex_unlock(&diskmtx);
488 * [re]connect a remote disk service to the local system via /dev/xdisk.
492 xdisk_reconnect(struct service_node_opaque *xdisk)
494 struct xdisk_attach_ioctl *xaioc;
495 dmsg_master_service_info_t *info;
499 if (pipe(pipefds) < 0) {
500 fprintf(stderr, "reconnect %s: pipe() failed\n",
505 info = malloc(sizeof(*info));
506 bzero(info, sizeof(*info));
507 info->fd = pipefds[1];
509 info->dbgmsg_callback = hammer2_shell_parse;
510 info->exit_callback = xdisk_disconnect;
511 info->handle = xdisk;
512 xdisk->servicing = 1;
513 xdisk->servicefd = info->fd;
514 info->label = strdup(xdisk->cl_label);
515 pthread_create(&thread, NULL, dmsg_master_service, info);
518 * We have to run the attach in its own pthread because it will
519 * synchronously interact with the messaging subsystem over the
520 * pipe. If we do it here we will deadlock.
522 xaioc = malloc(sizeof(*xaioc));
523 bzero(xaioc, sizeof(xaioc));
524 snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
525 "%s", xdisk->cl_label);
526 snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
527 "X-%s", xdisk->fs_label);
528 xaioc->bytes = xdisk->block.bytes;
529 xaioc->blksize = xdisk->block.blksize;
530 xaioc->fd = pipefds[0];
532 pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
537 xdisk_attach_tmpthread(void *data)
539 struct xdisk_attach_ioctl *xaioc = data;
542 pthread_detach(pthread_self());
544 fd = open("/dev/xdisk", O_RDWR, 0600);
546 fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
548 if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
549 fprintf(stderr, "reconnect %s: xdisk attach failed\n",
559 xdisk_disconnect(void *handle)
561 struct service_node_opaque *info = handle;
563 assert(info->servicing == 1);
565 pthread_mutex_lock(&diskmtx);
567 if (info->attached == 0)
569 pthread_mutex_unlock(&diskmtx);