hammer2 - more dmsg/separation work
[dragonfly.git] / sbin / hammer2 / cmd_service.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
40
41 struct hammer2_media_config {
42         hammer2_volconf_t       copy_run;
43         hammer2_volconf_t       copy_pend;
44         pthread_t               thread;
45         pthread_cond_t          cond;
46         int                     ctl;
47         int                     fd;
48         int                     pipefd[2];      /* signal stop */
49         dmsg_iocom_t            iocom;
50         pthread_t               iocom_thread;
51         enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
52 };
53
54 typedef struct hammer2_media_config hammer2_media_config_t;
55
56 #define H2CONFCTL_STOP          0x00000001
57 #define H2CONFCTL_UPDATE        0x00000002
58
59 struct diskcon {
60         TAILQ_ENTRY(diskcon) entry;
61         char    *disk;
62 };
63
64 struct service_node_opaque {
65         char    cl_label[64];
66         char    fs_label[64];
67         dmsg_media_block_t block;
68         int     attached;
69         int     servicing;
70         int     servicefd;
71 };
72
73 struct autoconn {
74         TAILQ_ENTRY(autoconn) entry;
75         char    *host;
76         int     stage;
77         int     stopme;
78         int     pipefd[2];      /* {read,write} */
79         enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
80         pthread_t thread;
81 };
82
83 #define WS " \r\n"
84
85 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
86 static pthread_mutex_t diskmtx;
87 static pthread_mutex_t confmtx;
88
89 static void *service_thread(void *data);
90 static void *udev_thread(void *data);
91 static void *autoconn_thread(void *data);
92 static void master_reconnect(const char *mntpt);
93 static void disk_reconnect(const char *disk);
94 static void disk_disconnect(void *handle);
95 static void udev_check_disks(void);
96 static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged);
97 static void hammer2_node_handler(void **opaque, struct dmsg_msg *msg, int op);
98 static void *hammer2_volconf_thread(void *info);
99 static void hammer2_volconf_signal(dmsg_iocom_t *iocom);
100 static void hammer2_volconf_start(hammer2_media_config_t *conf,
101                         const char *hostname);
102 static void hammer2_volconf_stop(hammer2_media_config_t *conf);
103
104
105 static void xdisk_reconnect(struct service_node_opaque *info);
106 static void xdisk_disconnect(void *handle);
107 static void *xdisk_attach_tmpthread(void *data);
108
109 /*
110  * Start-up the master listener daemon for the machine.  This daemon runs
111  * a UDP discovery protocol, a TCP rendezvous, and scans certain files
112  * and directories for work.
113  *
114  * --
115  *
116  * The only purpose for the UDP discovery protocol is to determine what
117  * other IPs on the LAN are running the hammer2 service daemon.  DNS is not
118  * required to operate, but hostnames (if assigned) must be unique.  If
119  * no hostname is assigned the host's IP is used as the name.  This name
120  * is broadcast along with the mtime of the originator's private key.
121  *
122  * Receiving hammer2 service daemons which are able to match the label against
123  * /etc/hammer2/remote/<label>.pub will initiate a persistent connection
124  * to the target.  Removal of the file will cause a disconnection.  A failed
125  * public key negotiation stops further connection attempts until either the
126  * file is updated or the remote mtime is updated.
127  *
128  * Generally speaking this results in a web of connections, typically a
129  * combination of point-to-point for the more important links and relayed
130  * (spanning tree) for less important or filtered links.
131  *
132  * --
133  *
134  * The TCP listener serves as a rendezvous point in the cluster, accepting
135  * connections, performing registrations and authentications, maintaining
136  * the spanning tree, and keeping track of message state so disconnects can
137  * be handled properly.
138  *
139  * Once authenticated only low-level messaging protocols (which includes
140  * tracking persistent messages) are handled by this daemon.  This daemon
141  * does not run the higher level quorum or locking protocols.
142  *
143  * --
144  *
145  * The file /etc/hammer2/autoconn, if it exists, contains a list of targets
146  * to connect to (which do not have to be on the local lan).  This list will
147  * be retried until a connection can be established.  The file is not usually
148  * needed for linkages local to the LAN.
149  */
150 int
151 cmd_service(void)
152 {
153         struct sockaddr_in lsin;
154         int on;
155         int lfd;
156
157         /*
158          * Acquire socket and set options
159          */
160         if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
161                 fprintf(stderr, "master_listen: socket(): %s\n",
162                         strerror(errno));
163                 return 1;
164         }
165         on = 1;
166         setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
167
168         /*
169          * Setup listen port and try to bind.  If the bind fails we assume
170          * that a master listener process is already running and silently
171          * fail.
172          */
173         bzero(&lsin, sizeof(lsin));
174         lsin.sin_family = AF_INET;
175         lsin.sin_addr.s_addr = INADDR_ANY;
176         lsin.sin_port = htons(DMSG_LISTEN_PORT);
177         if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
178                 close(lfd);
179                 if (QuietOpt == 0) {
180                         fprintf(stderr,
181                                 "master listen: daemon already running\n");
182                 }
183                 return 0;
184         }
185         if (QuietOpt == 0)
186                 fprintf(stderr, "master listen: startup\n");
187         listen(lfd, 50);
188
189         /*
190          * Fork and disconnect the controlling terminal and parent process,
191          * executing the specified function as a pthread.
192          *
193          * Returns to the original process which can then continue running.
194          * In debug mode this call will create the pthread without forking
195          * and set NormalExit to 0, instead of fork.
196          */
197         hammer2_demon(service_thread, (void *)(intptr_t)lfd);
198         if (NormalExit)
199                 close(lfd);
200         return 0;
201 }
202
203 /*
204  * Master listen/accept thread.  Accept connections on the master socket,
205  * starting a pthread for each one.
206  */
207 static
208 void *
209 service_thread(void *data)
210 {
211         struct sockaddr_in asin;
212         socklen_t alen;
213         pthread_t thread;
214         dmsg_master_service_info_t *info;
215         int lfd = (int)(intptr_t)data;
216         int fd;
217         int i;
218         int count;
219         struct statfs *mntbuf = NULL;
220         struct statvfs *mntvbuf = NULL;
221
222         /*
223          * Nobody waits for us
224          */
225         setproctitle("hammer2 master listen");
226         pthread_detach(pthread_self());
227
228         /*
229          * Start up a thread to handle block device monitoring
230          */
231         thread = NULL;
232         pthread_create(&thread, NULL, udev_thread, NULL);
233
234         /*
235          * Start thread to manage /etc/hammer2/autoconn
236          */
237         thread = NULL;
238         pthread_create(&thread, NULL, autoconn_thread, NULL);
239
240         /*
241          * Scan existing hammer2 mounts and reconnect to them using
242          * HAMMER2IOC_RECLUSTER.
243          */
244         count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
245         for (i = 0; i < count; ++i) {
246                 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
247                         master_reconnect(mntbuf[i].f_mntonname);
248         }
249
250         /*
251          * Accept connections and create pthreads to handle them after
252          * validating the IP.
253          */
254         for (;;) {
255                 alen = sizeof(asin);
256                 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
257                 if (fd < 0) {
258                         if (errno == EINTR)
259                                 continue;
260                         break;
261                 }
262                 thread = NULL;
263                 fprintf(stderr, "service_thread: accept fd %d\n", fd);
264                 info = malloc(sizeof(*info));
265                 bzero(info, sizeof(*info));
266                 info->fd = fd;
267                 info->detachme = 1;
268                 info->usrmsg_callback = hammer2_usrmsg_handler;
269                 info->node_handler = hammer2_node_handler;
270                 info->label = strdup("client");
271                 pthread_create(&thread, NULL, dmsg_master_service, info);
272         }
273         return (NULL);
274 }
275
276 /*
277  * Handle/Monitor the dmsg stream.  If unmanaged is set we are responsible
278  * for responding for the message, otherwise if it is not set libdmsg has
279  * already done some preprocessing and will respond to the message for us
280  * when we return.
281  *
282  * We primarily monitor for VOLCONFs
283  */
284 static
285 void
286 hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged)
287 {
288         dmsg_state_t *state;
289         hammer2_media_config_t *conf;
290         dmsg_lnk_hammer2_volconf_t *msgconf;
291         int i;
292
293         /*
294          * Only process messages which are part of a LNK_CONN stream
295          */
296         state = msg->state;
297         if (state == NULL ||
298             (state->rxcmd & DMSGF_BASECMDMASK) != DMSG_LNK_CONN) {
299                 hammer2_shell_parse(msg, unmanaged);
300                 return;
301         }
302
303         switch(msg->tcmd) {
304         case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
305         case DMSG_LNK_CONN | DMSGF_DELETE:
306         case DMSG_LNK_ERROR | DMSGF_DELETE:
307                 /*
308                  * Deleting connection, clean out all volume configs
309                  */
310                 if (state->media == NULL || state->media->usrhandle == NULL)
311                         break;
312                 conf = state->media->usrhandle;
313                 fprintf(stderr, "Shutting down media spans\n");
314                 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
315                         if (conf[i].thread) {
316                                 conf[i].ctl = H2CONFCTL_STOP;
317                                 pthread_cond_signal(&conf[i].cond);
318                         }
319                 }
320                 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
321                         if (conf[i].thread) {
322                                 pthread_join(conf[i].thread, NULL);
323                                 conf->thread = NULL;
324                                 pthread_cond_destroy(&conf[i].cond);
325                         }
326                 }
327                 state->media->usrhandle = NULL;
328                 free(conf);
329                 break;
330         case DMSG_LNK_HAMMER2_VOLCONF:
331                 /*
332                  * One-way volume-configuration message is transmitted
333                  * over the open LNK_CONN transaction.
334                  */
335                 fprintf(stderr, "RECEIVED VOLCONF\n");
336
337                 if ((conf = state->media->usrhandle) == NULL) {
338                         conf = malloc(sizeof(*conf) * HAMMER2_COPYID_COUNT);
339                         bzero(conf, sizeof(*conf) * HAMMER2_COPYID_COUNT);
340                         state->media->usrhandle = conf;
341                 }
342                 msgconf = H2_LNK_VOLCONF(msg);
343
344                 if (msgconf->index < 0 ||
345                     msgconf->index >= HAMMER2_COPYID_COUNT) {
346                         fprintf(stderr,
347                                 "VOLCONF: ILLEGAL INDEX %d\n",
348                                 msgconf->index);
349                         break;
350                 }
351                 if (msgconf->copy.path[sizeof(msgconf->copy.path) - 1] != 0 ||
352                     msgconf->copy.path[0] == 0) {
353                         fprintf(stderr,
354                                 "VOLCONF: ILLEGAL PATH %d\n",
355                                 msgconf->index);
356                         break;
357                 }
358                 conf += msgconf->index;
359                 pthread_mutex_lock(&confmtx);
360                 conf->copy_pend = msgconf->copy;
361                 conf->ctl |= H2CONFCTL_UPDATE;
362                 pthread_mutex_unlock(&confmtx);
363                 if (conf->thread == NULL) {
364                         fprintf(stderr, "VOLCONF THREAD STARTED\n");
365                         pthread_cond_init(&conf->cond, NULL);
366                         pthread_create(&conf->thread, NULL,
367                                        hammer2_volconf_thread, (void *)conf);
368                 }
369                 pthread_cond_signal(&conf->cond);
370                 break;
371         default:
372                 if (unmanaged)
373                         dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
374                 break;
375         }
376 }
377
378 static void *
379 hammer2_volconf_thread(void *info)
380 {
381         hammer2_media_config_t *conf = info;
382
383         pthread_mutex_lock(&confmtx);
384         while ((conf->ctl & H2CONFCTL_STOP) == 0) {
385                 if (conf->ctl & H2CONFCTL_UPDATE) {
386                         fprintf(stderr, "VOLCONF UPDATE\n");
387                         conf->ctl &= ~H2CONFCTL_UPDATE;
388                         if (bcmp(&conf->copy_run, &conf->copy_pend,
389                                  sizeof(conf->copy_run)) == 0) {
390                                 fprintf(stderr, "VOLCONF: no changes\n");
391                                 continue;
392                         }
393                         /*
394                          * XXX TODO - auto reconnect on lookup failure or
395                          *              connect failure or stream failure.
396                          */
397
398                         pthread_mutex_unlock(&confmtx);
399                         hammer2_volconf_stop(conf);
400                         conf->copy_run = conf->copy_pend;
401                         if (conf->copy_run.copyid != 0 &&
402                             strncmp(conf->copy_run.path, "span:", 5) == 0) {
403                                 hammer2_volconf_start(conf,
404                                                       conf->copy_run.path + 5);
405                         }
406                         pthread_mutex_lock(&confmtx);
407                         fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
408                 }
409                 if (conf->state == H2MC_CONNECT) {
410                         hammer2_volconf_start(conf, conf->copy_run.path + 5);
411                         pthread_mutex_unlock(&confmtx);
412                         sleep(5);
413                         pthread_mutex_lock(&confmtx);
414                 } else {
415                         pthread_cond_wait(&conf->cond, &confmtx);
416                 }
417         }
418         pthread_mutex_unlock(&confmtx);
419         hammer2_volconf_stop(conf);
420         return(NULL);
421 }
422
423 static
424 void
425 hammer2_volconf_start(hammer2_media_config_t *conf, const char *hostname)
426 {
427         dmsg_master_service_info_t *info;
428
429         switch(conf->state) {
430         case H2MC_STOPPED:
431         case H2MC_CONNECT:
432                 conf->fd = dmsg_connect(hostname);
433                 if (conf->fd < 0) {
434                         fprintf(stderr, "Unable to connect to %s\n", hostname);
435                         conf->state = H2MC_CONNECT;
436                 } else if (pipe(conf->pipefd) < 0) {
437                         close(conf->fd);
438                         fprintf(stderr, "pipe() failed during volconf\n");
439                         conf->state = H2MC_CONNECT;
440                 } else {
441                         fprintf(stderr, "VOLCONF CONNECT\n");
442                         info = malloc(sizeof(*info));
443                         bzero(info, sizeof(*info));
444                         info->fd = conf->fd;
445                         info->altfd = conf->pipefd[0];
446                         info->altmsg_callback = hammer2_volconf_signal;
447                         info->usrmsg_callback = hammer2_usrmsg_handler;
448                         info->detachme = 0;
449                         conf->state = H2MC_RUNNING;
450                         pthread_create(&conf->iocom_thread, NULL,
451                                        dmsg_master_service, info);
452                 }
453                 break;
454         case H2MC_RUNNING:
455                 break;
456         }
457 }
458
459 static
460 void
461 hammer2_volconf_stop(hammer2_media_config_t *conf)
462 {
463         switch(conf->state) {
464         case H2MC_STOPPED:
465                 break;
466         case H2MC_CONNECT:
467                 conf->state = H2MC_STOPPED;
468                 break;
469         case H2MC_RUNNING:
470                 close(conf->pipefd[1]);
471                 conf->pipefd[1] = -1;
472                 pthread_join(conf->iocom_thread, NULL);
473                 conf->iocom_thread = NULL;
474                 conf->state = H2MC_STOPPED;
475                 break;
476         }
477 }
478
479 static
480 void
481 hammer2_volconf_signal(dmsg_iocom_t *iocom)
482 {
483         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
484 }
485
486
487 /*
488  * Node discovery code on received SPANs (or loss of SPANs).  This code
489  * is used to track the availability of remote block devices and install
490  * or deinstall them using the xdisk driver (/dev/xdisk).
491  *
492  * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
493  * the data handed to it.  When opened, a virtual circuit is forged and
494  * maintained to the block device server via DMSG.  Temporary failures
495  * stall the device until successfully reconnected or explicitly destroyed.
496  */
497 static
498 void
499 hammer2_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
500 {
501         struct service_node_opaque *info = *opaquep;
502
503         switch(op) {
504         case DMSG_NODEOP_ADD:
505                 if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
506                         break;
507                 if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
508                         break;
509                 if (info == NULL) {
510                         info = malloc(sizeof(*info));
511                         bzero(info, sizeof(*info));
512                         *opaquep = info;
513                 }
514                 snprintf(info->cl_label, sizeof(info->cl_label),
515                          "%s", msg->any.lnk_span.cl_label);
516                 snprintf(info->fs_label, sizeof(info->fs_label),
517                          "%s", msg->any.lnk_span.fs_label);
518                 info->block = msg->any.lnk_span.media.block;
519                 fprintf(stderr, "NODE ADD %s serno %s\n",
520                         info->cl_label, info->fs_label);
521                 info->attached = 1;
522                 xdisk_reconnect(info);
523                 break;
524         case DMSG_NODEOP_DEL:
525                 if (info) {
526                         fprintf(stderr, "NODE DEL %s serno %s\n",
527                                 info->cl_label, info->fs_label);
528                         pthread_mutex_lock(&diskmtx);
529                         *opaquep = NULL;
530                         info->attached = 0;
531                         if (info->servicing == 0)
532                                 free(info);
533                         else
534                                 shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
535                         pthread_mutex_unlock(&diskmtx);
536                 }
537                 break;
538         default:
539                 break;
540         }
541 }
542
543 /*
544  * Monitor block devices.  Currently polls every ~10 seconds or so.
545  */
546 static
547 void *
548 udev_thread(void *data __unused)
549 {
550         int     fd;
551         int     seq = 0;
552
553         pthread_detach(pthread_self());
554
555         if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
556                 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
557                         UDEV_DEVICE_PATH);
558                 pthread_exit(NULL);
559         }
560         udev_check_disks();
561         while (ioctl(fd, UDEVWAIT, &seq) == 0) {
562                 udev_check_disks();
563                 sleep(1);
564         }
565         return (NULL);
566 }
567
568 static void *autoconn_connect_thread(void *data);
569 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
570
571 static
572 void *
573 autoconn_thread(void *data __unused)
574 {
575         TAILQ_HEAD(, autoconn) autolist;
576         struct autoconn *ac;
577         struct autoconn *next;
578         pthread_t thread;
579         struct stat st;
580         time_t  t;
581         time_t  lmod;
582         int     found_last;
583         FILE    *fp;
584         char    buf[256];
585
586         TAILQ_INIT(&autolist);
587         found_last = 0;
588         lmod = 0;
589
590         pthread_detach(pthread_self());
591         for (;;) {
592                 /*
593                  * Polling interval
594                  */
595                 sleep(5);
596
597                 /*
598                  * Poll the file.  Loop up if the synchronized state (lmod)
599                  * has not changed.
600                  */
601                 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
602                         if (lmod == st.st_mtime)
603                                 continue;
604                         fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
605                         if (fp == NULL)
606                                 continue;
607                 } else {
608                         if (lmod == 0)
609                                 continue;
610                         fp = NULL;
611                 }
612
613                 /*
614                  * Wait at least 5 seconds after the file is created or
615                  * removed.
616                  *
617                  * Do not update the synchronized state.
618                  */
619                 if (fp == NULL && found_last) {
620                         found_last = 0;
621                         continue;
622                 } else if (fp && found_last == 0) {
623                         fclose(fp);
624                         found_last = 1;
625                         continue;
626                 }
627
628                 /*
629                  * Don't scan the file until the time progresses past the
630                  * file's mtime, so we can validate that the file was not
631                  * further modified during our scan.
632                  *
633                  * Do not update the synchronized state.
634                  */
635                 time(&t);
636                 if (fp) {
637                         if (t == st.st_mtime) {
638                                 fclose(fp);
639                                 continue;
640                         }
641                         t = st.st_mtime;
642                 } else {
643                         t = 0;
644                 }
645
646                 /*
647                  * Set staging to disconnect, then scan the file.
648                  */
649                 TAILQ_FOREACH(ac, &autolist, entry)
650                         ac->stage = 0;
651                 while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
652                         char *host;
653
654                         if ((host = strtok(buf, " \t\r\n")) == NULL ||
655                             host[0] == '#') {
656                                 continue;
657                         }
658                         TAILQ_FOREACH(ac, &autolist, entry) {
659                                 if (strcmp(host, ac->host) == 0)
660                                         break;
661                         }
662                         if (ac == NULL) {
663                                 ac = malloc(sizeof(*ac));
664                                 bzero(ac, sizeof(*ac));
665                                 ac->host = strdup(host);
666                                 ac->state = AUTOCONN_INACTIVE;
667                                 TAILQ_INSERT_TAIL(&autolist, ac, entry);
668                         }
669                         ac->stage = 1;
670                 }
671
672                 /*
673                  * Ignore the scan (and retry again) if the file was
674                  * modified during the scan.
675                  *
676                  * Do not update the synchronized state.
677                  */
678                 if (fp) {
679                         if (fstat(fileno(fp), &st) < 0) {
680                                 fclose(fp);
681                                 continue;
682                         }
683                         fclose(fp);
684                         if (t != st.st_mtime)
685                                 continue;
686                 }
687
688                 /*
689                  * Update the synchronized state and reconfigure the
690                  * connect list as needed.
691                  */
692                 lmod = t;
693                 next = TAILQ_FIRST(&autolist);
694                 while ((ac = next) != NULL) {
695                         next = TAILQ_NEXT(ac, entry);
696
697                         /*
698                          * Staging, initiate
699                          */
700                         if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
701                                 if (pipe(ac->pipefd) == 0) {
702                                         ac->stopme = 0;
703                                         ac->state = AUTOCONN_ACTIVE;
704                                         thread = NULL;
705                                         pthread_create(&thread, NULL,
706                                                        autoconn_connect_thread,
707                                                        ac);
708                                 }
709                         }
710
711                         /*
712                          * Unstaging, stop active connection.
713                          *
714                          * We write to the pipe which causes the iocom_core
715                          * to call autoconn_disconnect_signal().
716                          */
717                         if (ac->stage == 0 &&
718                             ac->state == AUTOCONN_ACTIVE) {
719                                 if (ac->stopme == 0) {
720                                         char dummy = 0;
721                                         ac->stopme = 1;
722                                         write(ac->pipefd[1], &dummy, 1);
723                                 }
724                         }
725
726                         /*
727                          * Unstaging, delete inactive connection.
728                          */
729                         if (ac->stage == 0 &&
730                             ac->state == AUTOCONN_INACTIVE) {
731                                 TAILQ_REMOVE(&autolist, ac, entry);
732                                 free(ac->host);
733                                 free(ac);
734                                 continue;
735                         }
736                 }
737                 sleep(5);
738         }
739         return(NULL);
740 }
741
742 static
743 void *
744 autoconn_connect_thread(void *data)
745 {
746         dmsg_master_service_info_t *info;
747         struct autoconn *ac;
748         void *res;
749         int fd;
750
751         ac = data;
752         pthread_detach(pthread_self());
753
754         while (ac->stopme == 0) {
755                 fd = dmsg_connect(ac->host);
756                 if (fd < 0) {
757                         fprintf(stderr, "autoconn: Connect failure: %s\n",
758                                 ac->host);
759                         sleep(5);
760                         continue;
761                 }
762                 fprintf(stderr, "autoconn: Connect %s\n", ac->host);
763
764                 info = malloc(sizeof(*info));
765                 bzero(info, sizeof(*info));
766                 info->fd = fd;
767                 info->altfd = ac->pipefd[0];
768                 info->altmsg_callback = autoconn_disconnect_signal;
769                 info->usrmsg_callback = hammer2_usrmsg_handler;
770                 info->detachme = 0;
771                 info->noclosealt = 1;
772                 pthread_create(&ac->thread, NULL, dmsg_master_service, info);
773                 pthread_join(ac->thread, &res);
774         }
775         close(ac->pipefd[0]);
776         ac->state = AUTOCONN_INACTIVE;
777         /* auto structure can be ripped out here */
778         return(NULL);
779 }
780
781 static
782 void
783 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
784 {
785         fprintf(stderr, "autoconn: Shutting down socket\n");
786         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
787 }
788
789 /*
790  * Retrieve the list of disk attachments and attempt to export
791  * them.
792  */
793 static
794 void
795 udev_check_disks(void)
796 {
797         char tmpbuf[1024];
798         char *buf = NULL;
799         char *disk;
800         int error;
801         size_t n;
802
803         for (;;) {
804                 n = 0;
805                 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
806                 if (error < 0 || n == 0)
807                         break;
808                 if (n >= sizeof(tmpbuf))
809                         buf = malloc(n + 1);
810                 else
811                         buf = tmpbuf;
812                 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
813                 if (error == 0) {
814                         buf[n] = 0;
815                         break;
816                 }
817                 if (buf != tmpbuf) {
818                         free(buf);
819                         buf = NULL;
820                 }
821                 if (errno != ENOMEM)
822                         break;
823         }
824         if (buf) {
825                 fprintf(stderr, "DISKS: %s\n", buf);
826                 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
827                         disk_reconnect(disk);
828                 }
829                 if (buf != tmpbuf)
830                         free(buf);
831         }
832 }
833
834 /*
835  * Normally the mount program supplies a cluster communications
836  * descriptor to the hammer2 vfs on mount, but if you kill the service
837  * daemon and restart it that link will be lost.
838  *
839  * This procedure attempts to [re]connect to existing mounts when
840  * the service daemon is started up before going into its accept
841  * loop.
842  *
843  * NOTE: A hammer2 mount point can only accomodate one connection at a time
844  *       so this will disconnect any existing connection during the
845  *       reconnect.
846  */
847 static
848 void
849 master_reconnect(const char *mntpt)
850 {
851         struct hammer2_ioc_recluster recls;
852         dmsg_master_service_info_t *info;
853         pthread_t thread;
854         int fd;
855         int pipefds[2];
856
857         fd = open(mntpt, O_RDONLY);
858         if (fd < 0) {
859                 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
860                 return;
861         }
862         if (pipe(pipefds) < 0) {
863                 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
864                 close(fd);
865                 return;
866         }
867         bzero(&recls, sizeof(recls));
868         recls.fd = pipefds[0];
869         if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
870                 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
871                 close(pipefds[0]);
872                 close(pipefds[1]);
873                 close(fd);
874                 return;
875         }
876         close(pipefds[0]);
877         close(fd);
878
879         info = malloc(sizeof(*info));
880         bzero(info, sizeof(*info));
881         info->fd = pipefds[1];
882         info->detachme = 1;
883         info->usrmsg_callback = hammer2_usrmsg_handler;
884         info->node_handler = hammer2_node_handler;
885         info->label = strdup("hammer2");
886         pthread_create(&thread, NULL, dmsg_master_service, info);
887 }
888
889 /*
890  * Reconnect a physical disk service to the mesh.
891  */
892 static
893 void
894 disk_reconnect(const char *disk)
895 {
896         struct disk_ioc_recluster recls;
897         struct diskcon *dc;
898         dmsg_master_service_info_t *info;
899         pthread_t thread;
900         int fd;
901         int pipefds[2];
902         char *path;
903
904         /*
905          * Urm, this will auto-create mdX+1, just ignore for now.
906          * This mechanic needs to be fixed.  It might actually be nice
907          * to be able to export md disks.
908          */
909         if (strncmp(disk, "md", 2) == 0)
910                 return;
911         if (strncmp(disk, "xa", 2) == 0)
912                 return;
913
914         /*
915          * Check if already connected
916          */
917         pthread_mutex_lock(&diskmtx);
918         TAILQ_FOREACH(dc, &diskconq, entry) {
919                 if (strcmp(dc->disk, disk) == 0)
920                         break;
921         }
922         pthread_mutex_unlock(&diskmtx);
923         if (dc)
924                 return;
925
926         /*
927          * Not already connected, create a connection to the kernel
928          * disk driver.
929          */
930         asprintf(&path, "/dev/%s", disk);
931         fd = open(path, O_RDONLY);
932         if (fd < 0) {
933                 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
934                 free(path);
935                 return;
936         }
937         free(path);
938         if (pipe(pipefds) < 0) {
939                 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
940                 close(fd);
941                 return;
942         }
943         bzero(&recls, sizeof(recls));
944         recls.fd = pipefds[0];
945         if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
946                 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
947                 close(pipefds[0]);
948                 close(pipefds[1]);
949                 close(fd);
950                 return;
951         }
952         close(pipefds[0]);
953         close(fd);
954
955         dc = malloc(sizeof(*dc));
956         dc->disk = strdup(disk);
957         pthread_mutex_lock(&diskmtx);
958         TAILQ_INSERT_TAIL(&diskconq, dc, entry);
959         pthread_mutex_unlock(&diskmtx);
960
961         info = malloc(sizeof(*info));
962         bzero(info, sizeof(*info));
963         info->fd = pipefds[1];
964         info->detachme = 1;
965         info->usrmsg_callback = hammer2_usrmsg_handler;
966         info->node_handler = hammer2_node_handler;
967         info->exit_callback = disk_disconnect;
968         info->handle = dc;
969         info->label = strdup(dc->disk);
970         pthread_create(&thread, NULL, dmsg_master_service, info);
971 }
972
973 static
974 void
975 disk_disconnect(void *handle)
976 {
977         struct diskcon *dc = handle;
978
979         fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
980
981         pthread_mutex_lock(&diskmtx);
982         TAILQ_REMOVE(&diskconq, dc, entry);
983         pthread_mutex_unlock(&diskmtx);
984         free(dc->disk);
985         free(dc);
986 }
987
988 /*
989  * [re]connect a remote disk service to the local system via /dev/xdisk.
990  */
991 static
992 void
993 xdisk_reconnect(struct service_node_opaque *xdisk)
994 {
995         struct xdisk_attach_ioctl *xaioc;
996         dmsg_master_service_info_t *info;
997         pthread_t thread;
998         int pipefds[2];
999
1000         if (pipe(pipefds) < 0) {
1001                 fprintf(stderr, "reconnect %s: pipe() failed\n",
1002                         xdisk->cl_label);
1003                 return;
1004         }
1005
1006         info = malloc(sizeof(*info));
1007         bzero(info, sizeof(*info));
1008         info->fd = pipefds[1];
1009         info->detachme = 1;
1010         info->usrmsg_callback = hammer2_usrmsg_handler;
1011         info->node_handler = hammer2_node_handler;
1012         info->exit_callback = xdisk_disconnect;
1013         info->handle = xdisk;
1014         xdisk->servicing = 1;
1015         xdisk->servicefd = info->fd;
1016         info->label = strdup(xdisk->cl_label);
1017         pthread_create(&thread, NULL, dmsg_master_service, info);
1018
1019         /*
1020          * We have to run the attach in its own pthread because it will
1021          * synchronously interact with the messaging subsystem over the
1022          * pipe.  If we do it here we will deadlock.
1023          */
1024         xaioc = malloc(sizeof(*xaioc));
1025         bzero(xaioc, sizeof(*xaioc));
1026         snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
1027                  "%s", xdisk->cl_label);
1028         snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
1029                  "X-%s", xdisk->fs_label);
1030         xaioc->bytes = xdisk->block.bytes;
1031         xaioc->blksize = xdisk->block.blksize;
1032         xaioc->fd = pipefds[0];
1033
1034         pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
1035 }
1036
1037 static
1038 void *
1039 xdisk_attach_tmpthread(void *data)
1040 {
1041         struct xdisk_attach_ioctl *xaioc = data;
1042         int fd;
1043
1044         pthread_detach(pthread_self());
1045
1046         fd = open("/dev/xdisk", O_RDWR, 0600);
1047         if (fd < 0) {
1048                 fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
1049         }
1050         if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
1051                 fprintf(stderr, "reconnect %s: xdisk attach failed\n",
1052                         xaioc->cl_label);
1053         }
1054         close(xaioc->fd);
1055         close(fd);
1056         return (NULL);
1057 }
1058
1059 static
1060 void
1061 xdisk_disconnect(void *handle)
1062 {
1063         struct service_node_opaque *info = handle;
1064
1065         assert(info->servicing == 1);
1066
1067         pthread_mutex_lock(&diskmtx);
1068         info->servicing = 0;
1069         if (info->attached == 0)
1070                 free(info);
1071         pthread_mutex_unlock(&diskmtx);
1072 }