2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
42 TAILQ_ENTRY(diskcon) entry;
46 struct service_node_opaque {
49 dmsg_media_block_t block;
56 TAILQ_ENTRY(autoconn) entry;
60 int pipefd[2]; /* {read,write} */
61 enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
67 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
68 pthread_mutex_t diskmtx;
70 static void *service_thread(void *data);
71 static void *udev_thread(void *data);
72 static void *autoconn_thread(void *data);
73 static void master_reconnect(const char *mntpt);
74 static void disk_reconnect(const char *disk);
75 static void disk_disconnect(void *handle);
76 static void udev_check_disks(void);
77 static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
79 static void xdisk_reconnect(struct service_node_opaque *info);
80 static void xdisk_disconnect(void *handle);
81 static void *xdisk_attach_tmpthread(void *data);
84 * Start-up the master listener daemon for the machine.
86 * The master listener serves as a rendezvous point in the cluster, accepting
87 * connections, performing registrations and authentications, maintaining
88 * the spanning tree, and keeping track of message state so disconnects can
89 * be handled properly.
91 * Once authenticated only low-level messaging protocols (which include
92 * tracking persistent messages) are handled by this daemon. This daemon
93 * does not run the higher level quorum or locking protocols.
95 * This daemon can also be told to maintain connections to other nodes,
96 * forming a messaging backbone, which in turn allows PFS's (if desired) to
97 * simply connect to the master daemon via localhost.
98 * Backbones are specified via /etc/hammer2.conf.
103 struct sockaddr_in lsin;
108 * Acquire socket and set options
110 if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
111 fprintf(stderr, "master_listen: socket(): %s\n",
116 setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
119 * Setup listen port and try to bind. If the bind fails we assume
120 * that a master listener process is already running and silently
123 bzero(&lsin, sizeof(lsin));
124 lsin.sin_family = AF_INET;
125 lsin.sin_addr.s_addr = INADDR_ANY;
126 lsin.sin_port = htons(DMSG_LISTEN_PORT);
127 if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
131 "master listen: daemon already running\n");
136 fprintf(stderr, "master listen: startup\n");
140 * Fork and disconnect the controlling terminal and parent process,
141 * executing the specified function as a pthread.
143 * Returns to the original process which can then continue running.
144 * In debug mode this call will create the pthread without forking
145 * and set NormalExit to 0 rather than forking.
147 hammer2_demon(service_thread, (void *)(intptr_t)lfd);
154 * Master listen/accept thread. Accept connections on the master socket,
155 * starting a pthread for each one.
159 service_thread(void *data)
161 struct sockaddr_in asin;
164 dmsg_master_service_info_t *info;
165 int lfd = (int)(intptr_t)data;
169 struct statfs *mntbuf = NULL;
170 struct statvfs *mntvbuf = NULL;
173 * Nobody waits for us
175 setproctitle("hammer2 master listen");
176 pthread_detach(pthread_self());
178 dmsg_node_handler = service_node_handler;
181 * Start up a thread to handle block device monitoring
184 pthread_create(&thread, NULL, udev_thread, NULL);
187 * Start thread to manage /etc/hammer2/autoconn
190 pthread_create(&thread, NULL, autoconn_thread, NULL);
193 * Scan existing hammer2 mounts and reconnect to them using
194 * HAMMER2IOC_RECLUSTER.
196 count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
197 for (i = 0; i < count; ++i) {
198 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
199 master_reconnect(mntbuf[i].f_mntonname);
203 * Accept connections and create pthreads to handle them after
208 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
215 fprintf(stderr, "service_thread: accept fd %d\n", fd);
216 info = malloc(sizeof(*info));
217 bzero(info, sizeof(*info));
220 info->dbgmsg_callback = hammer2_shell_parse;
221 info->label = strdup("client");
222 pthread_create(&thread, NULL, dmsg_master_service, info);
228 * Node discovery code on received SPANs (or loss of SPANs). This code
229 * is used to track the availability of remote block devices and install
230 * or deinstall them using the xdisk driver (/dev/xdisk).
232 * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
233 * the data handed to it. When opened, a virtual circuit is forged and
234 * maintained to the block device server via DMSG. Temporary failures
235 * stall the device until successfully reconnected or explicitly destroyed.
239 service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
241 struct service_node_opaque *info = *opaquep;
244 case DMSG_NODEOP_ADD:
245 if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
247 if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
250 info = malloc(sizeof(*info));
251 bzero(info, sizeof(*info));
254 snprintf(info->cl_label, sizeof(info->cl_label),
255 "%s", msg->any.lnk_span.cl_label);
256 snprintf(info->fs_label, sizeof(info->fs_label),
257 "%s", msg->any.lnk_span.fs_label);
258 info->block = msg->any.lnk_span.media.block;
259 fprintf(stderr, "NODE ADD %s serno %s\n",
260 info->cl_label, info->fs_label);
262 xdisk_reconnect(info);
264 case DMSG_NODEOP_DEL:
266 fprintf(stderr, "NODE DEL %s serno %s\n",
267 info->cl_label, info->fs_label);
268 pthread_mutex_lock(&diskmtx);
271 if (info->servicing == 0)
274 shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
275 pthread_mutex_unlock(&diskmtx);
284 * Monitor block devices. Currently polls every ~10 seconds or so.
288 udev_thread(void *data __unused)
293 pthread_detach(pthread_self());
295 if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
296 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
301 while (ioctl(fd, UDEVWAIT, &seq) == 0) {
308 static void *autoconn_connect_thread(void *data);
309 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
313 autoconn_thread(void *data __unused)
315 TAILQ_HEAD(, autoconn) autolist;
317 struct autoconn *next;
326 TAILQ_INIT(&autolist);
330 pthread_detach(pthread_self());
338 * Poll the file. Loop up if the synchronized state (lmod)
341 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
342 if (lmod == st.st_mtime)
344 fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
354 * Wait at least 5 seconds after the file is created or
357 * Do not update the synchronized state.
359 if (fp == NULL && found_last) {
362 } else if (fp && found_last == 0) {
369 * Don't scan the file until the time progresses past the
370 * file's mtime, so we can validate that the file was not
371 * further modified during our scan.
373 * Do not update the synchronized state.
377 if (t == st.st_mtime) {
387 * Set staging to disconnect, then scan the file.
389 TAILQ_FOREACH(ac, &autolist, entry)
391 while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
394 if ((host = strtok(buf, " \t\r\n")) == NULL ||
398 TAILQ_FOREACH(ac, &autolist, entry) {
399 if (strcmp(host, ac->host) == 0)
403 ac = malloc(sizeof(*ac));
404 bzero(ac, sizeof(*ac));
405 ac->host = strdup(host);
406 ac->state = AUTOCONN_INACTIVE;
407 TAILQ_INSERT_TAIL(&autolist, ac, entry);
413 * Ignore the scan (and retry again) if the file was
414 * modified during the scan.
416 * Do not update the synchronized state.
419 if (fstat(fileno(fp), &st) < 0) {
424 if (t != st.st_mtime)
429 * Update the synchronized state and reconfigure the
430 * connect list as needed.
433 next = TAILQ_FIRST(&autolist);
434 while ((ac = next) != NULL) {
435 next = TAILQ_NEXT(ac, entry);
440 if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
441 if (pipe(ac->pipefd) == 0) {
443 ac->state = AUTOCONN_ACTIVE;
445 pthread_create(&thread, NULL,
446 autoconn_connect_thread,
452 * Unstaging, stop active connection.
454 * We write to the pipe which causes the iocom_core
455 * to call autoconn_disconnect_signal().
457 if (ac->stage == 0 &&
458 ac->state == AUTOCONN_ACTIVE) {
459 if (ac->stopme == 0) {
462 write(ac->pipefd[1], &dummy, 1);
467 * Unstaging, delete inactive connection.
469 if (ac->stage == 0 &&
470 ac->state == AUTOCONN_INACTIVE) {
471 TAILQ_REMOVE(&autolist, ac, entry);
484 autoconn_connect_thread(void *data)
486 dmsg_master_service_info_t *info;
492 pthread_detach(pthread_self());
494 while (ac->stopme == 0) {
495 fd = dmsg_connect(ac->host);
497 fprintf(stderr, "autoconn: Connect failure: %s\n",
502 fprintf(stderr, "autoconn: Connect %s\n", ac->host);
504 info = malloc(sizeof(*info));
505 bzero(info, sizeof(*info));
507 info->altfd = ac->pipefd[0];
508 info->altmsg_callback = autoconn_disconnect_signal;
510 info->noclosealt = 1;
511 pthread_create(&ac->thread, NULL, dmsg_master_service, info);
512 pthread_join(ac->thread, &res);
514 close(ac->pipefd[0]);
515 ac->state = AUTOCONN_INACTIVE;
516 /* auto structure can be ripped out here */
522 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
524 fprintf(stderr, "autoconn: Shutting down socket\n");
525 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
529 * Retrieve the list of disk attachments and attempt to export
534 udev_check_disks(void)
544 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
545 if (error < 0 || n == 0)
547 if (n >= sizeof(tmpbuf))
551 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
564 fprintf(stderr, "DISKS: %s\n", buf);
565 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
566 disk_reconnect(disk);
574 * Normally the mount program supplies a cluster communications
575 * descriptor to the hammer2 vfs on mount, but if you kill the service
576 * daemon and restart it that link will be lost.
578 * This procedure attempts to [re]connect to existing mounts when
579 * the service daemon is started up before going into its accept
582 * NOTE: A hammer2 mount point can only accommodate one connection at a time
583 * so this will disconnect any existing connection during the
588 master_reconnect(const char *mntpt)
590 struct hammer2_ioc_recluster recls;
591 dmsg_master_service_info_t *info;
596 fd = open(mntpt, O_RDONLY);
598 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
601 if (pipe(pipefds) < 0) {
602 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
606 bzero(&recls, sizeof(recls));
607 recls.fd = pipefds[0];
608 if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
609 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
618 info = malloc(sizeof(*info));
619 bzero(info, sizeof(*info));
620 info->fd = pipefds[1];
622 info->dbgmsg_callback = hammer2_shell_parse;
623 info->label = strdup("hammer2");
624 pthread_create(&thread, NULL, dmsg_master_service, info);
628 * Reconnect a physical disk service to the mesh.
632 disk_reconnect(const char *disk)
634 struct disk_ioc_recluster recls;
636 dmsg_master_service_info_t *info;
643 * Urm, this will auto-create mdX+1, just ignore for now.
644 * This mechanic needs to be fixed. It might actually be nice
645 * to be able to export md disks.
647 if (strncmp(disk, "md", 2) == 0)
649 if (strncmp(disk, "xa", 2) == 0)
653 * Check if already connected
655 pthread_mutex_lock(&diskmtx);
656 TAILQ_FOREACH(dc, &diskconq, entry) {
657 if (strcmp(dc->disk, disk) == 0)
660 pthread_mutex_unlock(&diskmtx);
665 * Not already connected, create a connection to the kernel
668 asprintf(&path, "/dev/%s", disk);
669 fd = open(path, O_RDONLY);
671 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
676 if (pipe(pipefds) < 0) {
677 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
681 bzero(&recls, sizeof(recls));
682 recls.fd = pipefds[0];
683 if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
684 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
693 dc = malloc(sizeof(*dc));
694 dc->disk = strdup(disk);
695 pthread_mutex_lock(&diskmtx);
696 TAILQ_INSERT_TAIL(&diskconq, dc, entry);
697 pthread_mutex_unlock(&diskmtx);
699 info = malloc(sizeof(*info));
700 bzero(info, sizeof(*info));
701 info->fd = pipefds[1];
703 info->dbgmsg_callback = hammer2_shell_parse;
704 info->exit_callback = disk_disconnect;
706 info->label = strdup(dc->disk);
707 pthread_create(&thread, NULL, dmsg_master_service, info);
712 disk_disconnect(void *handle)
714 struct diskcon *dc = handle;
716 fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
718 pthread_mutex_lock(&diskmtx);
719 TAILQ_REMOVE(&diskconq, dc, entry);
720 pthread_mutex_unlock(&diskmtx);
726 * [re]connect a remote disk service to the local system via /dev/xdisk.
730 xdisk_reconnect(struct service_node_opaque *xdisk)
732 struct xdisk_attach_ioctl *xaioc;
733 dmsg_master_service_info_t *info;
737 if (pipe(pipefds) < 0) {
738 fprintf(stderr, "reconnect %s: pipe() failed\n",
743 info = malloc(sizeof(*info));
744 bzero(info, sizeof(*info));
745 info->fd = pipefds[1];
747 info->dbgmsg_callback = hammer2_shell_parse;
748 info->exit_callback = xdisk_disconnect;
749 info->handle = xdisk;
750 xdisk->servicing = 1;
751 xdisk->servicefd = info->fd;
752 info->label = strdup(xdisk->cl_label);
753 pthread_create(&thread, NULL, dmsg_master_service, info);
756 * We have to run the attach in its own pthread because it will
757 * synchronously interact with the messaging subsystem over the
758 * pipe. If we do it here we will deadlock.
760 xaioc = malloc(sizeof(*xaioc));
761 bzero(xaioc, sizeof(*xaioc)); /* clear the whole struct, not just sizeof(pointer) bytes */
762 snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
763 "%s", xdisk->cl_label);
764 snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
765 "X-%s", xdisk->fs_label);
766 xaioc->bytes = xdisk->block.bytes;
767 xaioc->blksize = xdisk->block.blksize;
768 xaioc->fd = pipefds[0];
770 pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
775 xdisk_attach_tmpthread(void *data)
777 struct xdisk_attach_ioctl *xaioc = data;
780 pthread_detach(pthread_self());
782 fd = open("/dev/xdisk", O_RDWR, 0600);
784 fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
786 if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
787 fprintf(stderr, "reconnect %s: xdisk attach failed\n",
797 xdisk_disconnect(void *handle)
799 struct service_node_opaque *info = handle;
801 assert(info->servicing == 1);
803 pthread_mutex_lock(&diskmtx);
805 if (info->attached == 0)
807 pthread_mutex_unlock(&diskmtx);