Merge branch 'vendor/LIBARCHIVE'
[dragonfly.git] / sbin / hammer2 / cmd_service.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
40
41 struct diskcon {
42         TAILQ_ENTRY(diskcon) entry;
43         char    *disk;
44 };
45
46 struct service_node_opaque {
47         char    cl_label[64];
48         char    fs_label[64];
49         dmsg_media_block_t block;
50         int     attached;
51         int     servicing;
52         int     servicefd;
53 };
54
55 struct autoconn {
56         TAILQ_ENTRY(autoconn) entry;
57         char    *host;
58         int     stage;
59         int     stopme;
60         int     pipefd[2];      /* {read,write} */
61         enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
62         pthread_t thread;
63 };
64
65 #define WS " \r\n"
66
67 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
68 pthread_mutex_t diskmtx;
69
70 static void *service_thread(void *data);
71 static void *udev_thread(void *data);
72 static void *autoconn_thread(void *data);
73 static void master_reconnect(const char *mntpt);
74 static void disk_reconnect(const char *disk);
75 static void disk_disconnect(void *handle);
76 static void udev_check_disks(void);
77 static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
78
79 static void xdisk_reconnect(struct service_node_opaque *info);
80 static void xdisk_disconnect(void *handle);
81 static void *xdisk_attach_tmpthread(void *data);
82
83 /*
84  * Start-up the master listener daemon for the machine.
85  *
86  * The master listener serves as a rendezvous point in the cluster, accepting
87  * connections, performing registrations and authentications, maintaining
88  * the spanning tree, and keeping track of message state so disconnects can
89  * be handled properly.
90  *
91  * Once authenticated only low-level messaging protocols (which includes
92  * tracking persistent messages) are handled by this daemon.  This daemon
93  * does not run the higher level quorum or locking protocols.
94  *
95  * This daemon can also be told to maintain connections to other nodes,
96  * forming a messaging backbone, which in turn allows PFS's (if desired) to
97  * simply connect to the master daemon via localhost if desired.
98  * Backbones are specified via /etc/hammer2.conf.
99  */
100 int
101 cmd_service(void)
102 {
103         struct sockaddr_in lsin;
104         int on;
105         int lfd;
106
107         /*
108          * Acquire socket and set options
109          */
110         if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
111                 fprintf(stderr, "master_listen: socket(): %s\n",
112                         strerror(errno));
113                 return 1;
114         }
115         on = 1;
116         setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
117
118         /*
119          * Setup listen port and try to bind.  If the bind fails we assume
120          * that a master listener process is already running and silently
121          * fail.
122          */
123         bzero(&lsin, sizeof(lsin));
124         lsin.sin_family = AF_INET;
125         lsin.sin_addr.s_addr = INADDR_ANY;
126         lsin.sin_port = htons(DMSG_LISTEN_PORT);
127         if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
128                 close(lfd);
129                 if (QuietOpt == 0) {
130                         fprintf(stderr,
131                                 "master listen: daemon already running\n");
132                 }
133                 return 0;
134         }
135         if (QuietOpt == 0)
136                 fprintf(stderr, "master listen: startup\n");
137         listen(lfd, 50);
138
139         /*
140          * Fork and disconnect the controlling terminal and parent process,
141          * executing the specified function as a pthread.
142          *
143          * Returns to the original process which can then continue running.
144          * In debug mode this call will create the pthread without forking
145          * and set NormalExit to 0, instead of fork.
146          */
147         hammer2_demon(service_thread, (void *)(intptr_t)lfd);
148         if (NormalExit)
149                 close(lfd);
150         return 0;
151 }
152
153 /*
154  * Master listen/accept thread.  Accept connections on the master socket,
155  * starting a pthread for each one.
156  */
157 static
158 void *
159 service_thread(void *data)
160 {
161         struct sockaddr_in asin;
162         socklen_t alen;
163         pthread_t thread;
164         dmsg_master_service_info_t *info;
165         int lfd = (int)(intptr_t)data;
166         int fd;
167         int i;
168         int count;
169         struct statfs *mntbuf = NULL;
170         struct statvfs *mntvbuf = NULL;
171
172         /*
173          * Nobody waits for us
174          */
175         setproctitle("hammer2 master listen");
176         pthread_detach(pthread_self());
177
178         dmsg_node_handler = service_node_handler;
179
180         /*
181          * Start up a thread to handle block device monitoring
182          */
183         thread = NULL;
184         pthread_create(&thread, NULL, udev_thread, NULL);
185
186         /*
187          * Start thread to manage /etc/hammer2/autoconn
188          */
189         thread = NULL;
190         pthread_create(&thread, NULL, autoconn_thread, NULL);
191
192         /*
193          * Scan existing hammer2 mounts and reconnect to them using
194          * HAMMER2IOC_RECLUSTER.
195          */
196         count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
197         for (i = 0; i < count; ++i) {
198                 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
199                         master_reconnect(mntbuf[i].f_mntonname);
200         }
201
202         /*
203          * Accept connections and create pthreads to handle them after
204          * validating the IP.
205          */
206         for (;;) {
207                 alen = sizeof(asin);
208                 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
209                 if (fd < 0) {
210                         if (errno == EINTR)
211                                 continue;
212                         break;
213                 }
214                 thread = NULL;
215                 fprintf(stderr, "service_thread: accept fd %d\n", fd);
216                 info = malloc(sizeof(*info));
217                 bzero(info, sizeof(*info));
218                 info->fd = fd;
219                 info->detachme = 1;
220                 info->dbgmsg_callback = hammer2_shell_parse;
221                 info->label = strdup("client");
222                 pthread_create(&thread, NULL, dmsg_master_service, info);
223         }
224         return (NULL);
225 }
226
227 /*
228  * Node discovery code on received SPANs (or loss of SPANs).  This code
229  * is used to track the availability of remote block devices and install
230  * or deinstall them using the xdisk driver (/dev/xdisk).
231  *
232  * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
233  * the data handed to it.  When opened, a virtual circuit is forged and
234  * maintained to the block device server via DMSG.  Temporary failures
235  * stall the device until successfully reconnected or explicitly destroyed.
236  */
237 static
238 void
239 service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
240 {
241         struct service_node_opaque *info = *opaquep;
242
243         switch(op) {
244         case DMSG_NODEOP_ADD:
245                 if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
246                         break;
247                 if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
248                         break;
249                 if (info == NULL) {
250                         info = malloc(sizeof(*info));
251                         bzero(info, sizeof(*info));
252                         *opaquep = info;
253                 }
254                 snprintf(info->cl_label, sizeof(info->cl_label),
255                          "%s", msg->any.lnk_span.cl_label);
256                 snprintf(info->fs_label, sizeof(info->fs_label),
257                          "%s", msg->any.lnk_span.fs_label);
258                 info->block = msg->any.lnk_span.media.block;
259                 fprintf(stderr, "NODE ADD %s serno %s\n",
260                         info->cl_label, info->fs_label);
261                 info->attached = 1;
262                 xdisk_reconnect(info);
263                 break;
264         case DMSG_NODEOP_DEL:
265                 if (info) {
266                         fprintf(stderr, "NODE DEL %s serno %s\n",
267                                 info->cl_label, info->fs_label);
268                         pthread_mutex_lock(&diskmtx);
269                         *opaquep = NULL;
270                         info->attached = 0;
271                         if (info->servicing == 0)
272                                 free(info);
273                         else
274                                 shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
275                         pthread_mutex_unlock(&diskmtx);
276                 }
277                 break;
278         default:
279                 break;
280         }
281 }
282
283 /*
284  * Monitor block devices.  Currently polls every ~10 seconds or so.
285  */
286 static
287 void *
288 udev_thread(void *data __unused)
289 {
290         int     fd;
291         int     seq = 0;
292
293         pthread_detach(pthread_self());
294
295         if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
296                 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
297                         UDEV_DEVICE_PATH);
298                 pthread_exit(NULL);
299         }
300         udev_check_disks();
301         while (ioctl(fd, UDEVWAIT, &seq) == 0) {
302                 udev_check_disks();
303                 sleep(1);
304         }
305         return (NULL);
306 }
307
308 static void *autoconn_connect_thread(void *data);
309 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
310
311 static
312 void *
313 autoconn_thread(void *data __unused)
314 {
315         TAILQ_HEAD(, autoconn) autolist;
316         struct autoconn *ac;
317         struct autoconn *next;
318         pthread_t thread;
319         struct stat st;
320         time_t  t;
321         time_t  lmod;
322         int     found_last;
323         FILE    *fp;
324         char    buf[256];
325
326         TAILQ_INIT(&autolist);
327         found_last = 0;
328         lmod = 0;
329
330         pthread_detach(pthread_self());
331         for (;;) {
332                 /*
333                  * Polling interval
334                  */
335                 sleep(5);
336
337                 /*
338                  * Poll the file.  Loop up if the synchronized state (lmod)
339                  * has not changed.
340                  */
341                 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
342                         if (lmod == st.st_mtime)
343                                 continue;
344                         fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
345                         if (fp == NULL)
346                                 continue;
347                 } else {
348                         if (lmod == 0)
349                                 continue;
350                         fp = NULL;
351                 }
352
353                 /*
354                  * Wait at least 5 seconds after the file is created or
355                  * removed.
356                  *
357                  * Do not update the synchronized state.
358                  */
359                 if (fp == NULL && found_last) {
360                         found_last = 0;
361                         continue;
362                 } else if (fp && found_last == 0) {
363                         fclose(fp);
364                         found_last = 1;
365                         continue;
366                 }
367
368                 /*
369                  * Don't scan the file until the time progresses past the
370                  * file's mtime, so we can validate that the file was not
371                  * further modified during our scan.
372                  *
373                  * Do not update the synchronized state.
374                  */
375                 time(&t);
376                 if (fp) {
377                         if (t == st.st_mtime) {
378                                 fclose(fp);
379                                 continue;
380                         }
381                         t = st.st_mtime;
382                 } else {
383                         t = 0;
384                 }
385
386                 /*
387                  * Set staging to disconnect, then scan the file.
388                  */
389                 TAILQ_FOREACH(ac, &autolist, entry)
390                         ac->stage = 0;
391                 while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
392                         char *host;
393
394                         if ((host = strtok(buf, " \t\r\n")) == NULL ||
395                             host[0] == '#') {
396                                 continue;
397                         }
398                         TAILQ_FOREACH(ac, &autolist, entry) {
399                                 if (strcmp(host, ac->host) == 0)
400                                         break;
401                         }
402                         if (ac == NULL) {
403                                 ac = malloc(sizeof(*ac));
404                                 bzero(ac, sizeof(*ac));
405                                 ac->host = strdup(host);
406                                 ac->state = AUTOCONN_INACTIVE;
407                                 TAILQ_INSERT_TAIL(&autolist, ac, entry);
408                         }
409                         ac->stage = 1;
410                 }
411
412                 /*
413                  * Ignore the scan (and retry again) if the file was
414                  * modified during the scan.
415                  *
416                  * Do not update the synchronized state.
417                  */
418                 if (fp) {
419                         if (fstat(fileno(fp), &st) < 0) {
420                                 fclose(fp);
421                                 continue;
422                         }
423                         fclose(fp);
424                         if (t != st.st_mtime)
425                                 continue;
426                 }
427
428                 /*
429                  * Update the synchronized state and reconfigure the
430                  * connect list as needed.
431                  */
432                 lmod = t;
433                 next = TAILQ_FIRST(&autolist);
434                 while ((ac = next) != NULL) {
435                         next = TAILQ_NEXT(ac, entry);
436
437                         /*
438                          * Staging, initiate
439                          */
440                         if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
441                                 if (pipe(ac->pipefd) == 0) {
442                                         ac->stopme = 0;
443                                         ac->state = AUTOCONN_ACTIVE;
444                                         thread = NULL;
445                                         pthread_create(&thread, NULL,
446                                                        autoconn_connect_thread,
447                                                        ac);
448                                 }
449                         }
450
451                         /*
452                          * Unstaging, stop active connection.
453                          *
454                          * We write to the pipe which causes the iocom_core
455                          * to call autoconn_disconnect_signal().
456                          */
457                         if (ac->stage == 0 &&
458                             ac->state == AUTOCONN_ACTIVE) {
459                                 if (ac->stopme == 0) {
460                                         char dummy = 0;
461                                         ac->stopme = 1;
462                                         write(ac->pipefd[1], &dummy, 1);
463                                 }
464                         }
465
466                         /*
467                          * Unstaging, delete inactive connection.
468                          */
469                         if (ac->stage == 0 &&
470                             ac->state == AUTOCONN_INACTIVE) {
471                                 TAILQ_REMOVE(&autolist, ac, entry);
472                                 free(ac->host);
473                                 free(ac);
474                                 continue;
475                         }
476                 }
477                 sleep(5);
478         }
479         return(NULL);
480 }
481
482 static
483 void *
484 autoconn_connect_thread(void *data)
485 {
486         dmsg_master_service_info_t *info;
487         struct autoconn *ac;
488         void *res;
489         int fd;
490
491         ac = data;
492         pthread_detach(pthread_self());
493
494         while (ac->stopme == 0) {
495                 fd = dmsg_connect(ac->host);
496                 if (fd < 0) {
497                         fprintf(stderr, "autoconn: Connect failure: %s\n",
498                                 ac->host);
499                         sleep(5);
500                         continue;
501                 }
502                 fprintf(stderr, "autoconn: Connect %s\n", ac->host);
503
504                 info = malloc(sizeof(*info));
505                 bzero(info, sizeof(*info));
506                 info->fd = fd;
507                 info->altfd = ac->pipefd[0];
508                 info->altmsg_callback = autoconn_disconnect_signal;
509                 info->detachme = 0;
510                 info->noclosealt = 1;
511                 pthread_create(&ac->thread, NULL, dmsg_master_service, info);
512                 pthread_join(ac->thread, &res);
513         }
514         close(ac->pipefd[0]);
515         ac->state = AUTOCONN_INACTIVE;
516         /* auto structure can be ripped out here */
517         return(NULL);
518 }
519
520 static
521 void
522 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
523 {
524         fprintf(stderr, "autoconn: Shutting down socket\n");
525         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
526 }
527
528 /*
529  * Retrieve the list of disk attachments and attempt to export
530  * them.
531  */
532 static
533 void
534 udev_check_disks(void)
535 {
536         char tmpbuf[1024];
537         char *buf = NULL;
538         char *disk;
539         int error;
540         size_t n;
541
542         for (;;) {
543                 n = 0;
544                 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
545                 if (error < 0 || n == 0)
546                         break;
547                 if (n >= sizeof(tmpbuf))
548                         buf = malloc(n + 1);
549                 else
550                         buf = tmpbuf;
551                 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
552                 if (error == 0) {
553                         buf[n] = 0;
554                         break;
555                 }
556                 if (buf != tmpbuf) {
557                         free(buf);
558                         buf = NULL;
559                 }
560                 if (errno != ENOMEM)
561                         break;
562         }
563         if (buf) {
564                 fprintf(stderr, "DISKS: %s\n", buf);
565                 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
566                         disk_reconnect(disk);
567                 }
568                 if (buf != tmpbuf)
569                         free(buf);
570         }
571 }
572
573 /*
574  * Normally the mount program supplies a cluster communications
575  * descriptor to the hammer2 vfs on mount, but if you kill the service
576  * daemon and restart it that link will be lost.
577  *
578  * This procedure attempts to [re]connect to existing mounts when
579  * the service daemon is started up before going into its accept
580  * loop.
581  *
582  * NOTE: A hammer2 mount point can only accomodate one connection at a time
583  *       so this will disconnect any existing connection during the
584  *       reconnect.
585  */
586 static
587 void
588 master_reconnect(const char *mntpt)
589 {
590         struct hammer2_ioc_recluster recls;
591         dmsg_master_service_info_t *info;
592         pthread_t thread;
593         int fd;
594         int pipefds[2];
595
596         fd = open(mntpt, O_RDONLY);
597         if (fd < 0) {
598                 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
599                 return;
600         }
601         if (pipe(pipefds) < 0) {
602                 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
603                 close(fd);
604                 return;
605         }
606         bzero(&recls, sizeof(recls));
607         recls.fd = pipefds[0];
608         if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
609                 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
610                 close(pipefds[0]);
611                 close(pipefds[1]);
612                 close(fd);
613                 return;
614         }
615         close(pipefds[0]);
616         close(fd);
617
618         info = malloc(sizeof(*info));
619         bzero(info, sizeof(*info));
620         info->fd = pipefds[1];
621         info->detachme = 1;
622         info->dbgmsg_callback = hammer2_shell_parse;
623         info->label = strdup("hammer2");
624         pthread_create(&thread, NULL, dmsg_master_service, info);
625 }
626
627 /*
628  * Reconnect a physical disk service to the mesh.
629  */
630 static
631 void
632 disk_reconnect(const char *disk)
633 {
634         struct disk_ioc_recluster recls;
635         struct diskcon *dc;
636         dmsg_master_service_info_t *info;
637         pthread_t thread;
638         int fd;
639         int pipefds[2];
640         char *path;
641
642         /*
643          * Urm, this will auto-create mdX+1, just ignore for now.
644          * This mechanic needs to be fixed.  It might actually be nice
645          * to be able to export md disks.
646          */
647         if (strncmp(disk, "md", 2) == 0)
648                 return;
649         if (strncmp(disk, "xa", 2) == 0)
650                 return;
651
652         /*
653          * Check if already connected
654          */
655         pthread_mutex_lock(&diskmtx);
656         TAILQ_FOREACH(dc, &diskconq, entry) {
657                 if (strcmp(dc->disk, disk) == 0)
658                         break;
659         }
660         pthread_mutex_unlock(&diskmtx);
661         if (dc)
662                 return;
663
664         /*
665          * Not already connected, create a connection to the kernel
666          * disk driver.
667          */
668         asprintf(&path, "/dev/%s", disk);
669         fd = open(path, O_RDONLY);
670         if (fd < 0) {
671                 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
672                 free(path);
673                 return;
674         }
675         free(path);
676         if (pipe(pipefds) < 0) {
677                 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
678                 close(fd);
679                 return;
680         }
681         bzero(&recls, sizeof(recls));
682         recls.fd = pipefds[0];
683         if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
684                 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
685                 close(pipefds[0]);
686                 close(pipefds[1]);
687                 close(fd);
688                 return;
689         }
690         close(pipefds[0]);
691         close(fd);
692
693         dc = malloc(sizeof(*dc));
694         dc->disk = strdup(disk);
695         pthread_mutex_lock(&diskmtx);
696         TAILQ_INSERT_TAIL(&diskconq, dc, entry);
697         pthread_mutex_unlock(&diskmtx);
698
699         info = malloc(sizeof(*info));
700         bzero(info, sizeof(*info));
701         info->fd = pipefds[1];
702         info->detachme = 1;
703         info->dbgmsg_callback = hammer2_shell_parse;
704         info->exit_callback = disk_disconnect;
705         info->handle = dc;
706         info->label = strdup(dc->disk);
707         pthread_create(&thread, NULL, dmsg_master_service, info);
708 }
709
710 static
711 void
712 disk_disconnect(void *handle)
713 {
714         struct diskcon *dc = handle;
715
716         fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
717
718         pthread_mutex_lock(&diskmtx);
719         TAILQ_REMOVE(&diskconq, dc, entry);
720         pthread_mutex_unlock(&diskmtx);
721         free(dc->disk);
722         free(dc);
723 }
724
725 /*
726  * [re]connect a remote disk service to the local system via /dev/xdisk.
727  */
728 static
729 void
730 xdisk_reconnect(struct service_node_opaque *xdisk)
731 {
732         struct xdisk_attach_ioctl *xaioc;
733         dmsg_master_service_info_t *info;
734         pthread_t thread;
735         int pipefds[2];
736
737         if (pipe(pipefds) < 0) {
738                 fprintf(stderr, "reconnect %s: pipe() failed\n",
739                         xdisk->cl_label);
740                 return;
741         }
742
743         info = malloc(sizeof(*info));
744         bzero(info, sizeof(*info));
745         info->fd = pipefds[1];
746         info->detachme = 1;
747         info->dbgmsg_callback = hammer2_shell_parse;
748         info->exit_callback = xdisk_disconnect;
749         info->handle = xdisk;
750         xdisk->servicing = 1;
751         xdisk->servicefd = info->fd;
752         info->label = strdup(xdisk->cl_label);
753         pthread_create(&thread, NULL, dmsg_master_service, info);
754
755         /*
756          * We have to run the attach in its own pthread because it will
757          * synchronously interact with the messaging subsystem over the
758          * pipe.  If we do it here we will deadlock.
759          */
760         xaioc = malloc(sizeof(*xaioc));
761         bzero(xaioc, sizeof(xaioc));
762         snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
763                  "%s", xdisk->cl_label);
764         snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
765                  "X-%s", xdisk->fs_label);
766         xaioc->bytes = xdisk->block.bytes;
767         xaioc->blksize = xdisk->block.blksize;
768         xaioc->fd = pipefds[0];
769
770         pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
771 }
772
773 static
774 void *
775 xdisk_attach_tmpthread(void *data)
776 {
777         struct xdisk_attach_ioctl *xaioc = data;
778         int fd;
779
780         pthread_detach(pthread_self());
781
782         fd = open("/dev/xdisk", O_RDWR, 0600);
783         if (fd < 0) {
784                 fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
785         }
786         if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
787                 fprintf(stderr, "reconnect %s: xdisk attach failed\n",
788                         xaioc->cl_label);
789         }
790         close(xaioc->fd);
791         close(fd);
792         return (NULL);
793 }
794
795 static
796 void
797 xdisk_disconnect(void *handle)
798 {
799         struct service_node_opaque *info = handle;
800
801         assert(info->servicing == 1);
802
803         pthread_mutex_lock(&diskmtx);
804         info->servicing = 0;
805         if (info->attached == 0)
806                 free(info);
807         pthread_mutex_unlock(&diskmtx);
808 }