2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
38 #include <sys/xdiskioctl.h>
41 TAILQ_ENTRY(diskcon) entry;
45 struct service_node_opaque {
48 dmsg_media_block_t block;
56 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
57 pthread_mutex_t diskmtx;
59 static void *service_thread(void *data);
60 static void *udev_thread(void *data);
61 static void master_reconnect(const char *mntpt);
62 static void disk_reconnect(const char *disk);
63 static void disk_disconnect(void *handle);
64 static void udev_check_disks(void);
65 static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
67 static void xdisk_reconnect(struct service_node_opaque *info);
68 static void xdisk_disconnect(void *handle);
69 static void *xdisk_attach_tmpthread(void *data);
72 * Start up the master listener daemon for the machine.
74 * The master listener serves as a rendezvous point in the cluster, accepting
75 * connections, performing registrations and authentications, maintaining
76 * the spanning tree, and keeping track of message state so disconnects can
77 * be handled properly.
79 * Once authenticated only low-level messaging protocols (which includes
80 * tracking persistent messages) are handled by this daemon. This daemon
81 * does not run the higher level quorum or locking protocols.
83 * This daemon can also be told to maintain connections to other nodes,
84 * forming a messaging backbone, which in turn allows PFS's (if desired) to
85 * simply connect to the master daemon via localhost.
86 * Backbones are specified via /etc/hammer2.conf.
91 struct sockaddr_in lsin;
96 * Acquire socket and set options
98 if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
99 fprintf(stderr, "master_listen: socket(): %s\n",
104 setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
107 * Setup listen port and try to bind. If the bind fails we assume
108 * that a master listener process is already running and silently
111 bzero(&lsin, sizeof(lsin));
112 lsin.sin_family = AF_INET;
113 lsin.sin_addr.s_addr = INADDR_ANY;
114 lsin.sin_port = htons(DMSG_LISTEN_PORT);
115 if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
119 "master listen: daemon already running\n");
124 fprintf(stderr, "master listen: startup\n");
128 * Fork and disconnect the controlling terminal and parent process,
129 * executing the specified function as a pthread.
131 * Returns to the original process which can then continue running.
132 * In debug mode this call will create the pthread without forking
133 * and set NormalExit to 0, instead of fork.
135 hammer2_demon(service_thread, (void *)(intptr_t)lfd);
142 * Master listen/accept thread. Accept connections on the master socket,
143 * starting a pthread for each one.
147 service_thread(void *data)
149 struct sockaddr_in asin;
152 dmsg_master_service_info_t *info;
153 int lfd = (int)(intptr_t)data;
157 struct statfs *mntbuf = NULL;
158 struct statvfs *mntvbuf = NULL;
161 * Nobody waits for us
163 setproctitle("hammer2 master listen");
164 pthread_detach(pthread_self());
166 dmsg_node_handler = service_node_handler;
169 * Start up a thread to handle block device monitoring
172 pthread_create(&thread, NULL, udev_thread, NULL);
175 * Scan existing hammer2 mounts and reconnect to them using
176 * HAMMER2IOC_RECLUSTER.
178 count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
179 for (i = 0; i < count; ++i) {
180 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
181 master_reconnect(mntbuf[i].f_mntonname);
185 * Accept connections and create pthreads to handle them after
190 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
197 fprintf(stderr, "service_thread: accept fd %d\n", fd);
198 info = malloc(sizeof(*info));
199 bzero(info, sizeof(*info));
202 info->dbgmsg_callback = hammer2_shell_parse;
203 pthread_create(&thread, NULL, dmsg_master_service, info);
209 * Node discovery code on received SPANs (or loss of SPANs). This code
210 * is used to track the availability of remote block devices and install
211 * or deinstall them using the xdisk driver (/dev/xdisk).
213 * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
214 * the data handed to it. When opened, a virtual circuit is forged and
215 * maintained to the block device server via DMSG. Temporary failures
216 * stall the device until successfully reconnected or explicitly destroyed.
220 service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
222 struct service_node_opaque *info = *opaquep;
225 case DMSG_NODEOP_ADD:
226 if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
228 if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
231 info = malloc(sizeof(*info));
232 bzero(info, sizeof(*info));
235 snprintf(info->cl_label, sizeof(info->cl_label),
236 "%s", msg->any.lnk_span.cl_label);
237 snprintf(info->fs_label, sizeof(info->fs_label),
238 "%s", msg->any.lnk_span.fs_label);
239 info->block = msg->any.lnk_span.media.block;
240 fprintf(stderr, "NODE ADD %s serno %s\n",
241 info->cl_label, info->fs_label);
242 xdisk_reconnect(info);
244 case DMSG_NODEOP_DEL:
246 fprintf(stderr, "NODE DEL %s serno %s\n",
247 info->cl_label, info->fs_label);
248 pthread_mutex_lock(&diskmtx);
251 if (info->servicing == 0)
254 shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
255 pthread_mutex_unlock(&diskmtx);
264 * Monitor block devices. Currently polls every ~10 seconds or so.
268 udev_thread(void *data __unused)
273 pthread_detach(pthread_self());
275 if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
276 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
281 while (ioctl(fd, UDEVWAIT, &seq) == 0) {
289 * Retrieve the list of disk attachments and attempt to export
294 udev_check_disks(void)
304 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
305 if (error < 0 || n == 0)
307 if (n >= sizeof(tmpbuf))
311 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
324 fprintf(stderr, "DISKS: %s\n", buf);
325 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
326 disk_reconnect(disk);
334 * Normally the mount program supplies a cluster communications
335 * descriptor to the hammer2 vfs on mount, but if you kill the service
336 * daemon and restart it that link will be lost.
338 * This procedure attempts to [re]connect to existing mounts when
339 * the service daemon is started up before going into its accept
342 * NOTE: A hammer2 mount point can only accommodate one connection at a time
343 * so this will disconnect any existing connection during the
348 master_reconnect(const char *mntpt)
350 struct hammer2_ioc_recluster recls;
351 dmsg_master_service_info_t *info;
356 fd = open(mntpt, O_RDONLY);
358 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
361 if (pipe(pipefds) < 0) {
362 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
366 bzero(&recls, sizeof(recls));
367 recls.fd = pipefds[0];
368 if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
369 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
378 info = malloc(sizeof(*info));
379 bzero(info, sizeof(*info));
380 info->fd = pipefds[1];
382 info->dbgmsg_callback = hammer2_shell_parse;
383 pthread_create(&thread, NULL, dmsg_master_service, info);
387 * Reconnect a physical disk service to the mesh.
391 disk_reconnect(const char *disk)
393 struct disk_ioc_recluster recls;
395 dmsg_master_service_info_t *info;
402 * Urm, this will auto-create mdX+1, just ignore for now.
403 * This mechanic needs to be fixed. It might actually be nice
404 * to be able to export md disks.
406 if (strncmp(disk, "md", 2) == 0)
408 if (strncmp(disk, "xa", 2) == 0)
412 * Check if already connected
414 pthread_mutex_lock(&diskmtx);
415 TAILQ_FOREACH(dc, &diskconq, entry) {
416 if (strcmp(dc->disk, disk) == 0)
419 pthread_mutex_unlock(&diskmtx);
424 * Not already connected, create a connection to the kernel
427 asprintf(&path, "/dev/%s", disk);
428 fd = open(path, O_RDONLY);
430 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
435 if (pipe(pipefds) < 0) {
436 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
440 bzero(&recls, sizeof(recls));
441 recls.fd = pipefds[0];
442 if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
443 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
452 dc = malloc(sizeof(*dc));
453 dc->disk = strdup(disk);
454 pthread_mutex_lock(&diskmtx);
455 TAILQ_INSERT_TAIL(&diskconq, dc, entry);
456 pthread_mutex_unlock(&diskmtx);
458 info = malloc(sizeof(*info));
459 bzero(info, sizeof(*info));
460 info->fd = pipefds[1];
462 info->dbgmsg_callback = hammer2_shell_parse;
463 info->exit_callback = disk_disconnect;
465 pthread_create(&thread, NULL, dmsg_master_service, info);
470 disk_disconnect(void *handle)
472 struct diskcon *dc = handle;
474 fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
476 pthread_mutex_lock(&diskmtx);
477 TAILQ_REMOVE(&diskconq, dc, entry);
478 pthread_mutex_unlock(&diskmtx);
484 * [re]connect a remote disk service to the local system via /dev/xdisk.
488 xdisk_reconnect(struct service_node_opaque *xdisk)
490 struct xdisk_attach_ioctl *xaioc;
491 dmsg_master_service_info_t *info;
495 if (pipe(pipefds) < 0) {
496 fprintf(stderr, "reconnect %s: pipe() failed\n",
501 info = malloc(sizeof(*info));
502 bzero(info, sizeof(*info));
503 info->fd = pipefds[1];
505 info->dbgmsg_callback = hammer2_shell_parse;
506 info->exit_callback = xdisk_disconnect;
507 info->handle = xdisk;
508 xdisk->servicing = 1;
509 xdisk->servicefd = info->fd;
510 pthread_create(&thread, NULL, dmsg_master_service, info);
513 * We have to run the attach in its own pthread because it will
514 * synchronously interact with the messaging subsystem over the
515 * pipe. If we do it here we will deadlock.
517 xaioc = malloc(sizeof(*xaioc));
518 bzero(xaioc, sizeof(*xaioc));	/* zero the whole struct, not just pointer-size bytes */
519 snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
520 "%s", xdisk->cl_label);
521 snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
522 "X-%s", xdisk->fs_label);
523 xaioc->bytes = xdisk->block.bytes;
524 xaioc->blksize = xdisk->block.blksize;
525 xaioc->fd = pipefds[0];
527 pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
532 xdisk_attach_tmpthread(void *data)
534 struct xdisk_attach_ioctl *xaioc = data;
537 pthread_detach(pthread_self());
539 fd = open("/dev/xdisk", O_RDWR, 0600);
541 fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
543 if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
544 fprintf(stderr, "reconnect %s: xdisk attach failed\n",
554 xdisk_disconnect(void *handle)
556 struct service_node_opaque *info = handle;
558 assert(info->servicing == 1);
560 pthread_mutex_lock(&diskmtx);
562 if (info->attached == 0)
564 pthread_mutex_unlock(&diskmtx);