2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
42 TAILQ_ENTRY(diskcon) entry;
46 struct service_node_opaque {
49 dmsg_media_block_t block;
56 TAILQ_ENTRY(autoconn) entry;
60 int pipefd[2]; /* {read,write} */
61 enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
67 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
68 pthread_mutex_t diskmtx;
70 static void *service_thread(void *data);
71 static void *udev_thread(void *data);
72 static void *autoconn_thread(void *data);
73 static void master_reconnect(const char *mntpt);
74 static void disk_reconnect(const char *disk);
75 static void disk_disconnect(void *handle);
76 static void udev_check_disks(void);
77 static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
79 static void xdisk_reconnect(struct service_node_opaque *info);
80 static void xdisk_disconnect(void *handle);
81 static void *xdisk_attach_tmpthread(void *data);
84 * Start-up the master listener daemon for the machine. This daemon runs
85 * a UDP discovery protocol, a TCP rendezvous, and scans certain files
86 * and directories for work.
90 * The only purpose for the UDP discovery protocol is to determine what
91 * other IPs on the LAN are running the hammer2 service daemon. DNS is not
92 * required to operate, but hostnames (if assigned) must be unique. If
93 * no hostname is assigned the host's IP is used as the name. This name
94 * is broadcast along with the mtime of the originator's private key.
96 * Receiving hammer2 service daemons which are able to match the label against
97 * /etc/hammer2/remote/<label>.pub will initiate a persistent connection
98 * to the target. Removal of the file will cause a disconnection. A failed
99 * public key negotiation stops further connection attempts until either the
100 * file is updated or the remote mtime is updated.
102 * Generally speaking this results in a web of connections, typically a
103 * combination of point-to-point for the more important links and relayed
104 * (spanning tree) for less important or filtered links.
108 * The TCP listener serves as a rendezvous point in the cluster, accepting
109 * connections, performing registrations and authentications, maintaining
110 * the spanning tree, and keeping track of message state so disconnects can
111 * be handled properly.
113 * Once authenticated only low-level messaging protocols (which includes
114 * tracking persistent messages) are handled by this daemon. This daemon
115 * does not run the higher level quorum or locking protocols.
119 * The file /etc/hammer2/autoconn, if it exists, contains a list of targets
120 * to connect to (which do not have to be on the local lan). This list will
121 * be retried until a connection can be established. The file is not usually
122 * needed for linkages local to the LAN.
127 struct sockaddr_in lsin;
132 * Acquire socket and set options
134 if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
135 fprintf(stderr, "master_listen: socket(): %s\n",
140 setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
143 * Setup listen port and try to bind. If the bind fails we assume
144 * that a master listener process is already running and silently
147 bzero(&lsin, sizeof(lsin));
148 lsin.sin_family = AF_INET;
149 lsin.sin_addr.s_addr = INADDR_ANY;
150 lsin.sin_port = htons(DMSG_LISTEN_PORT);
151 if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
155 "master listen: daemon already running\n");
160 fprintf(stderr, "master listen: startup\n");
164 * Fork and disconnect the controlling terminal and parent process,
165 * executing the specified function as a pthread.
167 * Returns to the original process which can then continue running.
168 * In debug mode this call will create the pthread without forking
169 * and set NormalExit to 0, instead of fork.
171 hammer2_demon(service_thread, (void *)(intptr_t)lfd);
178 * Master listen/accept thread. Accept connections on the master socket,
179 * starting a pthread for each one.
183 service_thread(void *data)
185 struct sockaddr_in asin;
188 dmsg_master_service_info_t *info;
189 int lfd = (int)(intptr_t)data;
193 struct statfs *mntbuf = NULL;
194 struct statvfs *mntvbuf = NULL;
197 * Nobody waits for us
199 setproctitle("hammer2 master listen");
200 pthread_detach(pthread_self());
202 dmsg_node_handler = service_node_handler;
205 * Start up a thread to handle block device monitoring
208 pthread_create(&thread, NULL, udev_thread, NULL);
211 * Start thread to manage /etc/hammer2/autoconn
214 pthread_create(&thread, NULL, autoconn_thread, NULL);
217 * Scan existing hammer2 mounts and reconnect to them using
218 * HAMMER2IOC_RECLUSTER.
220 count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
221 for (i = 0; i < count; ++i) {
222 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
223 master_reconnect(mntbuf[i].f_mntonname);
227 * Accept connections and create pthreads to handle them after
232 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
239 fprintf(stderr, "service_thread: accept fd %d\n", fd);
240 info = malloc(sizeof(*info));
241 bzero(info, sizeof(*info));
244 info->dbgmsg_callback = hammer2_shell_parse;
245 info->label = strdup("client");
246 pthread_create(&thread, NULL, dmsg_master_service, info);
252 * Node discovery code on received SPANs (or loss of SPANs). This code
253 * is used to track the availability of remote block devices and install
254 * or deinstall them using the xdisk driver (/dev/xdisk).
256 * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
257 * the data handed to it. When opened, a virtual circuit is forged and
258 * maintained to the block device server via DMSG. Temporary failures
259 * stall the device until successfully reconnected or explicitly destroyed.
263 service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
265 struct service_node_opaque *info = *opaquep;
268 case DMSG_NODEOP_ADD:
269 if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
271 if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
274 info = malloc(sizeof(*info));
275 bzero(info, sizeof(*info));
278 snprintf(info->cl_label, sizeof(info->cl_label),
279 "%s", msg->any.lnk_span.cl_label);
280 snprintf(info->fs_label, sizeof(info->fs_label),
281 "%s", msg->any.lnk_span.fs_label);
282 info->block = msg->any.lnk_span.media.block;
283 fprintf(stderr, "NODE ADD %s serno %s\n",
284 info->cl_label, info->fs_label);
286 xdisk_reconnect(info);
288 case DMSG_NODEOP_DEL:
290 fprintf(stderr, "NODE DEL %s serno %s\n",
291 info->cl_label, info->fs_label);
292 pthread_mutex_lock(&diskmtx);
295 if (info->servicing == 0)
298 shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
299 pthread_mutex_unlock(&diskmtx);
308 * Monitor block devices. Currently polls every ~10 seconds or so.
312 udev_thread(void *data __unused)
317 pthread_detach(pthread_self());
319 if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
320 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
325 while (ioctl(fd, UDEVWAIT, &seq) == 0) {
332 static void *autoconn_connect_thread(void *data);
333 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
337 autoconn_thread(void *data __unused)
339 TAILQ_HEAD(, autoconn) autolist;
341 struct autoconn *next;
350 TAILQ_INIT(&autolist);
354 pthread_detach(pthread_self());
362 * Poll the file. Loop up if the synchronized state (lmod)
365 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
366 if (lmod == st.st_mtime)
368 fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
378 * Wait at least 5 seconds after the file is created or
381 * Do not update the synchronized state.
383 if (fp == NULL && found_last) {
386 } else if (fp && found_last == 0) {
393 * Don't scan the file until the time progresses past the
394 * file's mtime, so we can validate that the file was not
395 * further modified during our scan.
397 * Do not update the synchronized state.
401 if (t == st.st_mtime) {
411 * Set staging to disconnect, then scan the file.
413 TAILQ_FOREACH(ac, &autolist, entry)
415 while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
418 if ((host = strtok(buf, " \t\r\n")) == NULL ||
422 TAILQ_FOREACH(ac, &autolist, entry) {
423 if (strcmp(host, ac->host) == 0)
427 ac = malloc(sizeof(*ac));
428 bzero(ac, sizeof(*ac));
429 ac->host = strdup(host);
430 ac->state = AUTOCONN_INACTIVE;
431 TAILQ_INSERT_TAIL(&autolist, ac, entry);
437 * Ignore the scan (and retry again) if the file was
438 * modified during the scan.
440 * Do not update the synchronized state.
443 if (fstat(fileno(fp), &st) < 0) {
448 if (t != st.st_mtime)
453 * Update the synchronized state and reconfigure the
454 * connect list as needed.
457 next = TAILQ_FIRST(&autolist);
458 while ((ac = next) != NULL) {
459 next = TAILQ_NEXT(ac, entry);
464 if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
465 if (pipe(ac->pipefd) == 0) {
467 ac->state = AUTOCONN_ACTIVE;
469 pthread_create(&thread, NULL,
470 autoconn_connect_thread,
476 * Unstaging, stop active connection.
478 * We write to the pipe which causes the iocom_core
479 * to call autoconn_disconnect_signal().
481 if (ac->stage == 0 &&
482 ac->state == AUTOCONN_ACTIVE) {
483 if (ac->stopme == 0) {
486 write(ac->pipefd[1], &dummy, 1);
491 * Unstaging, delete inactive connection.
493 if (ac->stage == 0 &&
494 ac->state == AUTOCONN_INACTIVE) {
495 TAILQ_REMOVE(&autolist, ac, entry);
508 autoconn_connect_thread(void *data)
510 dmsg_master_service_info_t *info;
516 pthread_detach(pthread_self());
518 while (ac->stopme == 0) {
519 fd = dmsg_connect(ac->host);
521 fprintf(stderr, "autoconn: Connect failure: %s\n",
526 fprintf(stderr, "autoconn: Connect %s\n", ac->host);
528 info = malloc(sizeof(*info));
529 bzero(info, sizeof(*info));
531 info->altfd = ac->pipefd[0];
532 info->altmsg_callback = autoconn_disconnect_signal;
534 info->noclosealt = 1;
535 pthread_create(&ac->thread, NULL, dmsg_master_service, info);
536 pthread_join(ac->thread, &res);
538 close(ac->pipefd[0]);
539 ac->state = AUTOCONN_INACTIVE;
540 /* auto structure can be ripped out here */
546 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
548 fprintf(stderr, "autoconn: Shutting down socket\n");
549 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
553 * Retrieve the list of disk attachments and attempt to export
558 udev_check_disks(void)
568 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
569 if (error < 0 || n == 0)
571 if (n >= sizeof(tmpbuf))
575 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
588 fprintf(stderr, "DISKS: %s\n", buf);
589 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
590 disk_reconnect(disk);
598 * Normally the mount program supplies a cluster communications
599 * descriptor to the hammer2 vfs on mount, but if you kill the service
600 * daemon and restart it that link will be lost.
602 * This procedure attempts to [re]connect to existing mounts when
603 * the service daemon is started up before going into its accept
606 * NOTE: A hammer2 mount point can only accomodate one connection at a time
607 * so this will disconnect any existing connection during the
612 master_reconnect(const char *mntpt)
614 struct hammer2_ioc_recluster recls;
615 dmsg_master_service_info_t *info;
620 fd = open(mntpt, O_RDONLY);
622 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
625 if (pipe(pipefds) < 0) {
626 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
630 bzero(&recls, sizeof(recls));
631 recls.fd = pipefds[0];
632 if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
633 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
642 info = malloc(sizeof(*info));
643 bzero(info, sizeof(*info));
644 info->fd = pipefds[1];
646 info->dbgmsg_callback = hammer2_shell_parse;
647 info->label = strdup("hammer2");
648 pthread_create(&thread, NULL, dmsg_master_service, info);
652 * Reconnect a physical disk service to the mesh.
656 disk_reconnect(const char *disk)
658 struct disk_ioc_recluster recls;
660 dmsg_master_service_info_t *info;
667 * Urm, this will auto-create mdX+1, just ignore for now.
668 * This mechanic needs to be fixed. It might actually be nice
669 * to be able to export md disks.
671 if (strncmp(disk, "md", 2) == 0)
673 if (strncmp(disk, "xa", 2) == 0)
677 * Check if already connected
679 pthread_mutex_lock(&diskmtx);
680 TAILQ_FOREACH(dc, &diskconq, entry) {
681 if (strcmp(dc->disk, disk) == 0)
684 pthread_mutex_unlock(&diskmtx);
689 * Not already connected, create a connection to the kernel
692 asprintf(&path, "/dev/%s", disk);
693 fd = open(path, O_RDONLY);
695 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
700 if (pipe(pipefds) < 0) {
701 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
705 bzero(&recls, sizeof(recls));
706 recls.fd = pipefds[0];
707 if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
708 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
717 dc = malloc(sizeof(*dc));
718 dc->disk = strdup(disk);
719 pthread_mutex_lock(&diskmtx);
720 TAILQ_INSERT_TAIL(&diskconq, dc, entry);
721 pthread_mutex_unlock(&diskmtx);
723 info = malloc(sizeof(*info));
724 bzero(info, sizeof(*info));
725 info->fd = pipefds[1];
727 info->dbgmsg_callback = hammer2_shell_parse;
728 info->exit_callback = disk_disconnect;
730 info->label = strdup(dc->disk);
731 pthread_create(&thread, NULL, dmsg_master_service, info);
736 disk_disconnect(void *handle)
738 struct diskcon *dc = handle;
740 fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
742 pthread_mutex_lock(&diskmtx);
743 TAILQ_REMOVE(&diskconq, dc, entry);
744 pthread_mutex_unlock(&diskmtx);
750 * [re]connect a remote disk service to the local system via /dev/xdisk.
754 xdisk_reconnect(struct service_node_opaque *xdisk)
756 struct xdisk_attach_ioctl *xaioc;
757 dmsg_master_service_info_t *info;
761 if (pipe(pipefds) < 0) {
762 fprintf(stderr, "reconnect %s: pipe() failed\n",
767 info = malloc(sizeof(*info));
768 bzero(info, sizeof(*info));
769 info->fd = pipefds[1];
771 info->dbgmsg_callback = hammer2_shell_parse;
772 info->exit_callback = xdisk_disconnect;
773 info->handle = xdisk;
774 xdisk->servicing = 1;
775 xdisk->servicefd = info->fd;
776 info->label = strdup(xdisk->cl_label);
777 pthread_create(&thread, NULL, dmsg_master_service, info);
780 * We have to run the attach in its own pthread because it will
781 * synchronously interact with the messaging subsystem over the
782 * pipe. If we do it here we will deadlock.
784 xaioc = malloc(sizeof(*xaioc));
785 bzero(xaioc, sizeof(*xaioc));
786 snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
787 "%s", xdisk->cl_label);
788 snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
789 "X-%s", xdisk->fs_label);
790 xaioc->bytes = xdisk->block.bytes;
791 xaioc->blksize = xdisk->block.blksize;
792 xaioc->fd = pipefds[0];
794 pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
799 xdisk_attach_tmpthread(void *data)
801 struct xdisk_attach_ioctl *xaioc = data;
804 pthread_detach(pthread_self());
806 fd = open("/dev/xdisk", O_RDWR, 0600);
808 fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
810 if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
811 fprintf(stderr, "reconnect %s: xdisk attach failed\n",
821 xdisk_disconnect(void *handle)
823 struct service_node_opaque *info = handle;
825 assert(info->servicing == 1);
827 pthread_mutex_lock(&diskmtx);
829 if (info->attached == 0)
831 pthread_mutex_unlock(&diskmtx);