Merge branch 'vendor/OPENSSL'
[dragonfly.git] / sbin / hammer2 / cmd_service.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
40
41 struct diskcon {
42         TAILQ_ENTRY(diskcon) entry;
43         char    *disk;
44 };
45
46 struct service_node_opaque {
47         char    cl_label[64];
48         char    fs_label[64];
49         dmsg_media_block_t block;
50         int     attached;
51         int     servicing;
52         int     servicefd;
53 };
54
55 struct autoconn {
56         TAILQ_ENTRY(autoconn) entry;
57         char    *host;
58         int     stage;
59         int     stopme;
60         int     pipefd[2];      /* {read,write} */
61         enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
62         pthread_t thread;
63 };
64
65 #define WS " \r\n"
66
67 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
68 pthread_mutex_t diskmtx;
69
70 static void *service_thread(void *data);
71 static void *udev_thread(void *data);
72 static void *autoconn_thread(void *data);
73 static void master_reconnect(const char *mntpt);
74 static void disk_reconnect(const char *disk);
75 static void disk_disconnect(void *handle);
76 static void udev_check_disks(void);
77 static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
78
79 static void xdisk_reconnect(struct service_node_opaque *info);
80 static void xdisk_disconnect(void *handle);
81 static void *xdisk_attach_tmpthread(void *data);
82
83 /*
84  * Start-up the master listener daemon for the machine.  This daemon runs
85  * a UDP discovery protocol, a TCP rendezvous, and scans certain files
86  * and directories for work.
87  *
88  * --
89  *
90  * The only purpose for the UDP discovery protocol is to determine what
91  * other IPs on the LAN are running the hammer2 service daemon.  DNS is not
92  * required to operate, but hostnames (if assigned) must be unique.  If
93  * no hostname is assigned the host's IP is used as the name.  This name
94  * is broadcast along with the mtime of the originator's private key.
95  *
96  * Receiving hammer2 service daemons which are able to match the label against
97  * /etc/hammer2/remote/<label>.pub will initiate a persistent connection
98  * to the target.  Removal of the file will cause a disconnection.  A failed
99  * public key negotiation stops further connection attempts until either the
100  * file is updated or the remote mtime is updated.
101  *
102  * Generally speaking this results in a web of connections, typically a
103  * combination of point-to-point for the more important links and relayed
104  * (spanning tree) for less important or filtered links.
105  *
106  * --
107  *
108  * The TCP listener serves as a rendezvous point in the cluster, accepting
109  * connections, performing registrations and authentications, maintaining
110  * the spanning tree, and keeping track of message state so disconnects can
111  * be handled properly.
112  *
113  * Once authenticated only low-level messaging protocols (which includes
114  * tracking persistent messages) are handled by this daemon.  This daemon
115  * does not run the higher level quorum or locking protocols.
116  *
117  * --
118  *
119  * The file /etc/hammer2/autoconn, if it exists, contains a list of targets
120  * to connect to (which do not have to be on the local lan).  This list will
121  * be retried until a connection can be established.  The file is not usually
122  * needed for linkages local to the LAN.
123  */
124 int
125 cmd_service(void)
126 {
127         struct sockaddr_in lsin;
128         int on;
129         int lfd;
130
131         /*
132          * Acquire socket and set options
133          */
134         if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
135                 fprintf(stderr, "master_listen: socket(): %s\n",
136                         strerror(errno));
137                 return 1;
138         }
139         on = 1;
140         setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
141
142         /*
143          * Setup listen port and try to bind.  If the bind fails we assume
144          * that a master listener process is already running and silently
145          * fail.
146          */
147         bzero(&lsin, sizeof(lsin));
148         lsin.sin_family = AF_INET;
149         lsin.sin_addr.s_addr = INADDR_ANY;
150         lsin.sin_port = htons(DMSG_LISTEN_PORT);
151         if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
152                 close(lfd);
153                 if (QuietOpt == 0) {
154                         fprintf(stderr,
155                                 "master listen: daemon already running\n");
156                 }
157                 return 0;
158         }
159         if (QuietOpt == 0)
160                 fprintf(stderr, "master listen: startup\n");
161         listen(lfd, 50);
162
163         /*
164          * Fork and disconnect the controlling terminal and parent process,
165          * executing the specified function as a pthread.
166          *
167          * Returns to the original process which can then continue running.
168          * In debug mode this call will create the pthread without forking
169          * and set NormalExit to 0, instead of fork.
170          */
171         hammer2_demon(service_thread, (void *)(intptr_t)lfd);
172         if (NormalExit)
173                 close(lfd);
174         return 0;
175 }
176
177 /*
178  * Master listen/accept thread.  Accept connections on the master socket,
179  * starting a pthread for each one.
180  */
181 static
182 void *
183 service_thread(void *data)
184 {
185         struct sockaddr_in asin;
186         socklen_t alen;
187         pthread_t thread;
188         dmsg_master_service_info_t *info;
189         int lfd = (int)(intptr_t)data;
190         int fd;
191         int i;
192         int count;
193         struct statfs *mntbuf = NULL;
194         struct statvfs *mntvbuf = NULL;
195
196         /*
197          * Nobody waits for us
198          */
199         setproctitle("hammer2 master listen");
200         pthread_detach(pthread_self());
201
202         dmsg_node_handler = service_node_handler;
203
204         /*
205          * Start up a thread to handle block device monitoring
206          */
207         thread = NULL;
208         pthread_create(&thread, NULL, udev_thread, NULL);
209
210         /*
211          * Start thread to manage /etc/hammer2/autoconn
212          */
213         thread = NULL;
214         pthread_create(&thread, NULL, autoconn_thread, NULL);
215
216         /*
217          * Scan existing hammer2 mounts and reconnect to them using
218          * HAMMER2IOC_RECLUSTER.
219          */
220         count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
221         for (i = 0; i < count; ++i) {
222                 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
223                         master_reconnect(mntbuf[i].f_mntonname);
224         }
225
226         /*
227          * Accept connections and create pthreads to handle them after
228          * validating the IP.
229          */
230         for (;;) {
231                 alen = sizeof(asin);
232                 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
233                 if (fd < 0) {
234                         if (errno == EINTR)
235                                 continue;
236                         break;
237                 }
238                 thread = NULL;
239                 fprintf(stderr, "service_thread: accept fd %d\n", fd);
240                 info = malloc(sizeof(*info));
241                 bzero(info, sizeof(*info));
242                 info->fd = fd;
243                 info->detachme = 1;
244                 info->dbgmsg_callback = hammer2_shell_parse;
245                 info->label = strdup("client");
246                 pthread_create(&thread, NULL, dmsg_master_service, info);
247         }
248         return (NULL);
249 }
250
251 /*
252  * Node discovery code on received SPANs (or loss of SPANs).  This code
253  * is used to track the availability of remote block devices and install
254  * or deinstall them using the xdisk driver (/dev/xdisk).
255  *
256  * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
257  * the data handed to it.  When opened, a virtual circuit is forged and
258  * maintained to the block device server via DMSG.  Temporary failures
259  * stall the device until successfully reconnected or explicitly destroyed.
260  */
261 static
262 void
263 service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
264 {
265         struct service_node_opaque *info = *opaquep;
266
267         switch(op) {
268         case DMSG_NODEOP_ADD:
269                 if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
270                         break;
271                 if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
272                         break;
273                 if (info == NULL) {
274                         info = malloc(sizeof(*info));
275                         bzero(info, sizeof(*info));
276                         *opaquep = info;
277                 }
278                 snprintf(info->cl_label, sizeof(info->cl_label),
279                          "%s", msg->any.lnk_span.cl_label);
280                 snprintf(info->fs_label, sizeof(info->fs_label),
281                          "%s", msg->any.lnk_span.fs_label);
282                 info->block = msg->any.lnk_span.media.block;
283                 fprintf(stderr, "NODE ADD %s serno %s\n",
284                         info->cl_label, info->fs_label);
285                 info->attached = 1;
286                 xdisk_reconnect(info);
287                 break;
288         case DMSG_NODEOP_DEL:
289                 if (info) {
290                         fprintf(stderr, "NODE DEL %s serno %s\n",
291                                 info->cl_label, info->fs_label);
292                         pthread_mutex_lock(&diskmtx);
293                         *opaquep = NULL;
294                         info->attached = 0;
295                         if (info->servicing == 0)
296                                 free(info);
297                         else
298                                 shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
299                         pthread_mutex_unlock(&diskmtx);
300                 }
301                 break;
302         default:
303                 break;
304         }
305 }
306
307 /*
308  * Monitor block devices.  Currently polls every ~10 seconds or so.
309  */
310 static
311 void *
312 udev_thread(void *data __unused)
313 {
314         int     fd;
315         int     seq = 0;
316
317         pthread_detach(pthread_self());
318
319         if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
320                 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
321                         UDEV_DEVICE_PATH);
322                 pthread_exit(NULL);
323         }
324         udev_check_disks();
325         while (ioctl(fd, UDEVWAIT, &seq) == 0) {
326                 udev_check_disks();
327                 sleep(1);
328         }
329         return (NULL);
330 }
331
332 static void *autoconn_connect_thread(void *data);
333 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
334
335 static
336 void *
337 autoconn_thread(void *data __unused)
338 {
339         TAILQ_HEAD(, autoconn) autolist;
340         struct autoconn *ac;
341         struct autoconn *next;
342         pthread_t thread;
343         struct stat st;
344         time_t  t;
345         time_t  lmod;
346         int     found_last;
347         FILE    *fp;
348         char    buf[256];
349
350         TAILQ_INIT(&autolist);
351         found_last = 0;
352         lmod = 0;
353
354         pthread_detach(pthread_self());
355         for (;;) {
356                 /*
357                  * Polling interval
358                  */
359                 sleep(5);
360
361                 /*
362                  * Poll the file.  Loop up if the synchronized state (lmod)
363                  * has not changed.
364                  */
365                 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
366                         if (lmod == st.st_mtime)
367                                 continue;
368                         fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
369                         if (fp == NULL)
370                                 continue;
371                 } else {
372                         if (lmod == 0)
373                                 continue;
374                         fp = NULL;
375                 }
376
377                 /*
378                  * Wait at least 5 seconds after the file is created or
379                  * removed.
380                  *
381                  * Do not update the synchronized state.
382                  */
383                 if (fp == NULL && found_last) {
384                         found_last = 0;
385                         continue;
386                 } else if (fp && found_last == 0) {
387                         fclose(fp);
388                         found_last = 1;
389                         continue;
390                 }
391
392                 /*
393                  * Don't scan the file until the time progresses past the
394                  * file's mtime, so we can validate that the file was not
395                  * further modified during our scan.
396                  *
397                  * Do not update the synchronized state.
398                  */
399                 time(&t);
400                 if (fp) {
401                         if (t == st.st_mtime) {
402                                 fclose(fp);
403                                 continue;
404                         }
405                         t = st.st_mtime;
406                 } else {
407                         t = 0;
408                 }
409
410                 /*
411                  * Set staging to disconnect, then scan the file.
412                  */
413                 TAILQ_FOREACH(ac, &autolist, entry)
414                         ac->stage = 0;
415                 while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
416                         char *host;
417
418                         if ((host = strtok(buf, " \t\r\n")) == NULL ||
419                             host[0] == '#') {
420                                 continue;
421                         }
422                         TAILQ_FOREACH(ac, &autolist, entry) {
423                                 if (strcmp(host, ac->host) == 0)
424                                         break;
425                         }
426                         if (ac == NULL) {
427                                 ac = malloc(sizeof(*ac));
428                                 bzero(ac, sizeof(*ac));
429                                 ac->host = strdup(host);
430                                 ac->state = AUTOCONN_INACTIVE;
431                                 TAILQ_INSERT_TAIL(&autolist, ac, entry);
432                         }
433                         ac->stage = 1;
434                 }
435
436                 /*
437                  * Ignore the scan (and retry again) if the file was
438                  * modified during the scan.
439                  *
440                  * Do not update the synchronized state.
441                  */
442                 if (fp) {
443                         if (fstat(fileno(fp), &st) < 0) {
444                                 fclose(fp);
445                                 continue;
446                         }
447                         fclose(fp);
448                         if (t != st.st_mtime)
449                                 continue;
450                 }
451
452                 /*
453                  * Update the synchronized state and reconfigure the
454                  * connect list as needed.
455                  */
456                 lmod = t;
457                 next = TAILQ_FIRST(&autolist);
458                 while ((ac = next) != NULL) {
459                         next = TAILQ_NEXT(ac, entry);
460
461                         /*
462                          * Staging, initiate
463                          */
464                         if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
465                                 if (pipe(ac->pipefd) == 0) {
466                                         ac->stopme = 0;
467                                         ac->state = AUTOCONN_ACTIVE;
468                                         thread = NULL;
469                                         pthread_create(&thread, NULL,
470                                                        autoconn_connect_thread,
471                                                        ac);
472                                 }
473                         }
474
475                         /*
476                          * Unstaging, stop active connection.
477                          *
478                          * We write to the pipe which causes the iocom_core
479                          * to call autoconn_disconnect_signal().
480                          */
481                         if (ac->stage == 0 &&
482                             ac->state == AUTOCONN_ACTIVE) {
483                                 if (ac->stopme == 0) {
484                                         char dummy = 0;
485                                         ac->stopme = 1;
486                                         write(ac->pipefd[1], &dummy, 1);
487                                 }
488                         }
489
490                         /*
491                          * Unstaging, delete inactive connection.
492                          */
493                         if (ac->stage == 0 &&
494                             ac->state == AUTOCONN_INACTIVE) {
495                                 TAILQ_REMOVE(&autolist, ac, entry);
496                                 free(ac->host);
497                                 free(ac);
498                                 continue;
499                         }
500                 }
501                 sleep(5);
502         }
503         return(NULL);
504 }
505
506 static
507 void *
508 autoconn_connect_thread(void *data)
509 {
510         dmsg_master_service_info_t *info;
511         struct autoconn *ac;
512         void *res;
513         int fd;
514
515         ac = data;
516         pthread_detach(pthread_self());
517
518         while (ac->stopme == 0) {
519                 fd = dmsg_connect(ac->host);
520                 if (fd < 0) {
521                         fprintf(stderr, "autoconn: Connect failure: %s\n",
522                                 ac->host);
523                         sleep(5);
524                         continue;
525                 }
526                 fprintf(stderr, "autoconn: Connect %s\n", ac->host);
527
528                 info = malloc(sizeof(*info));
529                 bzero(info, sizeof(*info));
530                 info->fd = fd;
531                 info->altfd = ac->pipefd[0];
532                 info->altmsg_callback = autoconn_disconnect_signal;
533                 info->detachme = 0;
534                 info->noclosealt = 1;
535                 pthread_create(&ac->thread, NULL, dmsg_master_service, info);
536                 pthread_join(ac->thread, &res);
537         }
538         close(ac->pipefd[0]);
539         ac->state = AUTOCONN_INACTIVE;
540         /* auto structure can be ripped out here */
541         return(NULL);
542 }
543
544 static
545 void
546 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
547 {
548         fprintf(stderr, "autoconn: Shutting down socket\n");
549         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
550 }
551
552 /*
553  * Retrieve the list of disk attachments and attempt to export
554  * them.
555  */
556 static
557 void
558 udev_check_disks(void)
559 {
560         char tmpbuf[1024];
561         char *buf = NULL;
562         char *disk;
563         int error;
564         size_t n;
565
566         for (;;) {
567                 n = 0;
568                 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
569                 if (error < 0 || n == 0)
570                         break;
571                 if (n >= sizeof(tmpbuf))
572                         buf = malloc(n + 1);
573                 else
574                         buf = tmpbuf;
575                 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
576                 if (error == 0) {
577                         buf[n] = 0;
578                         break;
579                 }
580                 if (buf != tmpbuf) {
581                         free(buf);
582                         buf = NULL;
583                 }
584                 if (errno != ENOMEM)
585                         break;
586         }
587         if (buf) {
588                 fprintf(stderr, "DISKS: %s\n", buf);
589                 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
590                         disk_reconnect(disk);
591                 }
592                 if (buf != tmpbuf)
593                         free(buf);
594         }
595 }
596
597 /*
598  * Normally the mount program supplies a cluster communications
599  * descriptor to the hammer2 vfs on mount, but if you kill the service
600  * daemon and restart it that link will be lost.
601  *
602  * This procedure attempts to [re]connect to existing mounts when
603  * the service daemon is started up before going into its accept
604  * loop.
605  *
606  * NOTE: A hammer2 mount point can only accomodate one connection at a time
607  *       so this will disconnect any existing connection during the
608  *       reconnect.
609  */
610 static
611 void
612 master_reconnect(const char *mntpt)
613 {
614         struct hammer2_ioc_recluster recls;
615         dmsg_master_service_info_t *info;
616         pthread_t thread;
617         int fd;
618         int pipefds[2];
619
620         fd = open(mntpt, O_RDONLY);
621         if (fd < 0) {
622                 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
623                 return;
624         }
625         if (pipe(pipefds) < 0) {
626                 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
627                 close(fd);
628                 return;
629         }
630         bzero(&recls, sizeof(recls));
631         recls.fd = pipefds[0];
632         if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
633                 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
634                 close(pipefds[0]);
635                 close(pipefds[1]);
636                 close(fd);
637                 return;
638         }
639         close(pipefds[0]);
640         close(fd);
641
642         info = malloc(sizeof(*info));
643         bzero(info, sizeof(*info));
644         info->fd = pipefds[1];
645         info->detachme = 1;
646         info->dbgmsg_callback = hammer2_shell_parse;
647         info->label = strdup("hammer2");
648         pthread_create(&thread, NULL, dmsg_master_service, info);
649 }
650
651 /*
652  * Reconnect a physical disk service to the mesh.
653  */
654 static
655 void
656 disk_reconnect(const char *disk)
657 {
658         struct disk_ioc_recluster recls;
659         struct diskcon *dc;
660         dmsg_master_service_info_t *info;
661         pthread_t thread;
662         int fd;
663         int pipefds[2];
664         char *path;
665
666         /*
667          * Urm, this will auto-create mdX+1, just ignore for now.
668          * This mechanic needs to be fixed.  It might actually be nice
669          * to be able to export md disks.
670          */
671         if (strncmp(disk, "md", 2) == 0)
672                 return;
673         if (strncmp(disk, "xa", 2) == 0)
674                 return;
675
676         /*
677          * Check if already connected
678          */
679         pthread_mutex_lock(&diskmtx);
680         TAILQ_FOREACH(dc, &diskconq, entry) {
681                 if (strcmp(dc->disk, disk) == 0)
682                         break;
683         }
684         pthread_mutex_unlock(&diskmtx);
685         if (dc)
686                 return;
687
688         /*
689          * Not already connected, create a connection to the kernel
690          * disk driver.
691          */
692         asprintf(&path, "/dev/%s", disk);
693         fd = open(path, O_RDONLY);
694         if (fd < 0) {
695                 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
696                 free(path);
697                 return;
698         }
699         free(path);
700         if (pipe(pipefds) < 0) {
701                 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
702                 close(fd);
703                 return;
704         }
705         bzero(&recls, sizeof(recls));
706         recls.fd = pipefds[0];
707         if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
708                 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
709                 close(pipefds[0]);
710                 close(pipefds[1]);
711                 close(fd);
712                 return;
713         }
714         close(pipefds[0]);
715         close(fd);
716
717         dc = malloc(sizeof(*dc));
718         dc->disk = strdup(disk);
719         pthread_mutex_lock(&diskmtx);
720         TAILQ_INSERT_TAIL(&diskconq, dc, entry);
721         pthread_mutex_unlock(&diskmtx);
722
723         info = malloc(sizeof(*info));
724         bzero(info, sizeof(*info));
725         info->fd = pipefds[1];
726         info->detachme = 1;
727         info->dbgmsg_callback = hammer2_shell_parse;
728         info->exit_callback = disk_disconnect;
729         info->handle = dc;
730         info->label = strdup(dc->disk);
731         pthread_create(&thread, NULL, dmsg_master_service, info);
732 }
733
734 static
735 void
736 disk_disconnect(void *handle)
737 {
738         struct diskcon *dc = handle;
739
740         fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
741
742         pthread_mutex_lock(&diskmtx);
743         TAILQ_REMOVE(&diskconq, dc, entry);
744         pthread_mutex_unlock(&diskmtx);
745         free(dc->disk);
746         free(dc);
747 }
748
749 /*
750  * [re]connect a remote disk service to the local system via /dev/xdisk.
751  */
752 static
753 void
754 xdisk_reconnect(struct service_node_opaque *xdisk)
755 {
756         struct xdisk_attach_ioctl *xaioc;
757         dmsg_master_service_info_t *info;
758         pthread_t thread;
759         int pipefds[2];
760
761         if (pipe(pipefds) < 0) {
762                 fprintf(stderr, "reconnect %s: pipe() failed\n",
763                         xdisk->cl_label);
764                 return;
765         }
766
767         info = malloc(sizeof(*info));
768         bzero(info, sizeof(*info));
769         info->fd = pipefds[1];
770         info->detachme = 1;
771         info->dbgmsg_callback = hammer2_shell_parse;
772         info->exit_callback = xdisk_disconnect;
773         info->handle = xdisk;
774         xdisk->servicing = 1;
775         xdisk->servicefd = info->fd;
776         info->label = strdup(xdisk->cl_label);
777         pthread_create(&thread, NULL, dmsg_master_service, info);
778
779         /*
780          * We have to run the attach in its own pthread because it will
781          * synchronously interact with the messaging subsystem over the
782          * pipe.  If we do it here we will deadlock.
783          */
784         xaioc = malloc(sizeof(*xaioc));
785         bzero(xaioc, sizeof(*xaioc));
786         snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
787                  "%s", xdisk->cl_label);
788         snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
789                  "X-%s", xdisk->fs_label);
790         xaioc->bytes = xdisk->block.bytes;
791         xaioc->blksize = xdisk->block.blksize;
792         xaioc->fd = pipefds[0];
793
794         pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
795 }
796
797 static
798 void *
799 xdisk_attach_tmpthread(void *data)
800 {
801         struct xdisk_attach_ioctl *xaioc = data;
802         int fd;
803
804         pthread_detach(pthread_self());
805
806         fd = open("/dev/xdisk", O_RDWR, 0600);
807         if (fd < 0) {
808                 fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
809         }
810         if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
811                 fprintf(stderr, "reconnect %s: xdisk attach failed\n",
812                         xaioc->cl_label);
813         }
814         close(xaioc->fd);
815         close(fd);
816         return (NULL);
817 }
818
819 static
820 void
821 xdisk_disconnect(void *handle)
822 {
823         struct service_node_opaque *info = handle;
824
825         assert(info->servicing == 1);
826
827         pthread_mutex_lock(&diskmtx);
828         info->servicing = 0;
829         if (info->attached == 0)
830                 free(info);
831         pthread_mutex_unlock(&diskmtx);
832 }