Merge branch 'vendor/DIFFUTILS'
[dragonfly.git] / sbin / hammer2 / cmd_service.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
40
41 struct hammer2_media_config {
42         hammer2_volconf_t       copy_run;
43         hammer2_volconf_t       copy_pend;
44         pthread_t               thread;
45         pthread_cond_t          cond;
46         int                     ctl;
47         int                     fd;
48         int                     pipefd[2];      /* signal stop */
49         dmsg_iocom_t            iocom;
50         pthread_t               iocom_thread;
51         enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
52 };
53
54 typedef struct hammer2_media_config hammer2_media_config_t;
55
56 #define H2CONFCTL_STOP          0x00000001
57 #define H2CONFCTL_UPDATE        0x00000002
58
59 struct diskcon {
60         TAILQ_ENTRY(diskcon) entry;
61         char    *disk;
62 };
63
64 struct service_node_opaque {
65         char    cl_label[64];
66         char    fs_label[64];
67         dmsg_media_block_t block;
68         int     attached;
69         int     servicing;
70         int     servicefd;
71 };
72
73 struct autoconn {
74         TAILQ_ENTRY(autoconn) entry;
75         char    *host;
76         int     stage;
77         int     stopme;
78         int     pipefd[2];      /* {read,write} */
79         enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
80         pthread_t thread;
81 };
82
83 #define WS " \r\n"
84
85 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
86 static pthread_mutex_t diskmtx;
87 static pthread_mutex_t confmtx;
88
89 static void *service_thread(void *data);
90 static void *udev_thread(void *data);
91 static void *autoconn_thread(void *data);
92 static void master_reconnect(const char *mntpt);
93 static void disk_reconnect(const char *disk);
94 static void disk_disconnect(void *handle);
95 static void udev_check_disks(void);
96 static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged);
97 static void *hammer2_volconf_thread(void *info);
98 static void hammer2_volconf_signal(dmsg_iocom_t *iocom);
99 static void hammer2_volconf_start(hammer2_media_config_t *conf,
100                         const char *hostname);
101 static void hammer2_volconf_stop(hammer2_media_config_t *conf);
102
103
104 static void xdisk_connect(void);
105
106 /*
107  * Start-up the master listener daemon for the machine.  This daemon runs
108  * a UDP discovery protocol, a TCP rendezvous, and scans certain files
109  * and directories for work.
110  *
111  * --
112  *
113  * The only purpose for the UDP discovery protocol is to determine what
114  * other IPs on the LAN are running the hammer2 service daemon.  DNS is not
115  * required to operate, but hostnames (if assigned) must be unique.  If
116  * no hostname is assigned the host's IP is used as the name.  This name
117  * is broadcast along with the mtime of the originator's private key.
118  *
119  * Receiving hammer2 service daemons which are able to match the label against
120  * /etc/hammer2/remote/<label>.pub will initiate a persistent connection
121  * to the target.  Removal of the file will cause a disconnection.  A failed
122  * public key negotiation stops further connection attempts until either the
123  * file is updated or the remote mtime is updated.
124  *
125  * Generally speaking this results in a web of connections, typically a
126  * combination of point-to-point for the more important links and relayed
127  * (spanning tree) for less important or filtered links.
128  *
129  * --
130  *
131  * The TCP listener serves as a rendezvous point in the cluster, accepting
132  * connections, performing registrations and authentications, maintaining
133  * the spanning tree, and keeping track of message state so disconnects can
134  * be handled properly.
135  *
136  * Once authenticated only low-level messaging protocols (which includes
137  * tracking persistent messages) are handled by this daemon.  This daemon
138  * does not run the higher level quorum or locking protocols.
139  *
140  * --
141  *
142  * The file /etc/hammer2/autoconn, if it exists, contains a list of targets
143  * to connect to (which do not have to be on the local lan).  This list will
144  * be retried until a connection can be established.  The file is not usually
145  * needed for linkages local to the LAN.
146  */
147 int
148 cmd_service(void)
149 {
150         struct sockaddr_in lsin;
151         int on;
152         int lfd;
153
154         /*
155          * Acquire socket and set options
156          */
157         if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
158                 fprintf(stderr, "master_listen: socket(): %s\n",
159                         strerror(errno));
160                 return 1;
161         }
162         on = 1;
163         setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
164
165         /*
166          * Setup listen port and try to bind.  If the bind fails we assume
167          * that a master listener process is already running and silently
168          * fail.
169          */
170         bzero(&lsin, sizeof(lsin));
171         lsin.sin_family = AF_INET;
172         lsin.sin_addr.s_addr = INADDR_ANY;
173         lsin.sin_port = htons(DMSG_LISTEN_PORT);
174         if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
175                 close(lfd);
176                 if (QuietOpt == 0) {
177                         fprintf(stderr,
178                                 "master listen: daemon already running\n");
179                 }
180                 return 0;
181         }
182         if (QuietOpt == 0)
183                 fprintf(stderr, "master listen: startup\n");
184         listen(lfd, 50);
185
186         /*
187          * Fork and disconnect the controlling terminal and parent process,
188          * executing the specified function as a pthread.
189          *
190          * Returns to the original process which can then continue running.
191          * In debug mode this call will create the pthread without forking
192          * and set NormalExit to 0, instead of fork.
193          */
194         hammer2_demon(service_thread, (void *)(intptr_t)lfd);
195         if (NormalExit)
196                 close(lfd);
197         return 0;
198 }
199
200 /*
201  * Master listen/accept thread.  Accept connections on the master socket,
202  * starting a pthread for each one.
203  */
204 static
205 void *
206 service_thread(void *data)
207 {
208         struct sockaddr_in asin;
209         socklen_t alen;
210         pthread_t thread;
211         dmsg_master_service_info_t *info;
212         int lfd = (int)(intptr_t)data;
213         int fd;
214         int i;
215         int count;
216         int opt;
217         struct statfs *mntbuf = NULL;
218         struct statvfs *mntvbuf = NULL;
219
220         /*
221          * Nobody waits for us
222          */
223         setproctitle("hammer2 master listen");
224         pthread_detach(pthread_self());
225
226         /*
227          * Start up a thread to handle block device monitoring for
228          * export to the cluster.
229          */
230         thread = NULL;
231         pthread_create(&thread, NULL, udev_thread, NULL);
232
233         /*
234          * Start up a thread to tie /dev/xdisk into the cluster
235          * controller.
236          */
237         xdisk_connect();
238
239         /*
240          * Start thread to manage /etc/hammer2/autoconn
241          */
242         thread = NULL;
243         pthread_create(&thread, NULL, autoconn_thread, NULL);
244
245         /*
246          * Scan existing hammer2 mounts and reconnect to them using
247          * HAMMER2IOC_RECLUSTER.
248          */
249         count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
250         for (i = 0; i < count; ++i) {
251                 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
252                         master_reconnect(mntbuf[i].f_mntonname);
253         }
254
255         /*
256          * Accept connections and create pthreads to handle them after
257          * validating the IP.
258          */
259         for (;;) {
260                 alen = sizeof(asin);
261                 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
262                 if (fd < 0) {
263                         if (errno == EINTR)
264                                 continue;
265                         break;
266                 }
267                 opt = 1;
268                 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof opt);
269                 thread = NULL;
270                 fprintf(stderr, "service_thread: accept fd %d\n", fd);
271                 info = malloc(sizeof(*info));
272                 bzero(info, sizeof(*info));
273                 info->fd = fd;
274                 info->detachme = 1;
275                 info->usrmsg_callback = hammer2_usrmsg_handler;
276                 info->label = strdup("client");
277                 pthread_create(&thread, NULL, dmsg_master_service, info);
278         }
279         return (NULL);
280 }
281
282 /*
283  * Handle/Monitor the dmsg stream.  If unmanaged is set we are responsible
284  * for responding for the message, otherwise if it is not set libdmsg has
285  * already done some preprocessing and will respond to the message for us
286  * when we return.
287  *
288  * We primarily monitor for VOLCONFs
289  */
290 static
291 void
292 hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged)
293 {
294         dmsg_state_t *state;
295         hammer2_media_config_t *conf;
296         dmsg_lnk_hammer2_volconf_t *msgconf;
297         int i;
298
299         /*
300          * Only process messages which are part of a LNK_CONN stream
301          */
302         state = msg->state;
303         if (state == NULL ||
304             (state->rxcmd & DMSGF_BASECMDMASK) != DMSG_LNK_CONN) {
305                 hammer2_shell_parse(msg, unmanaged);
306                 return;
307         }
308
309         switch(msg->tcmd) {
310         case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
311         case DMSG_LNK_CONN | DMSGF_DELETE:
312         case DMSG_LNK_ERROR | DMSGF_DELETE:
313                 /*
314                  * Deleting connection, clean out all volume configs
315                  */
316                 if (state->media == NULL || state->media->usrhandle == NULL)
317                         break;
318                 conf = state->media->usrhandle;
319                 fprintf(stderr, "Shutting down media spans\n");
320                 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
321                         if (conf[i].thread) {
322                                 conf[i].ctl = H2CONFCTL_STOP;
323                                 pthread_cond_signal(&conf[i].cond);
324                         }
325                 }
326                 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
327                         if (conf[i].thread) {
328                                 pthread_join(conf[i].thread, NULL);
329                                 conf->thread = NULL;
330                                 pthread_cond_destroy(&conf[i].cond);
331                         }
332                 }
333                 state->media->usrhandle = NULL;
334                 free(conf);
335                 break;
336         case DMSG_LNK_HAMMER2_VOLCONF:
337                 /*
338                  * One-way volume-configuration message is transmitted
339                  * over the open LNK_CONN transaction.
340                  */
341                 fprintf(stderr, "RECEIVED VOLCONF\n");
342
343                 if ((conf = state->media->usrhandle) == NULL) {
344                         conf = malloc(sizeof(*conf) * HAMMER2_COPYID_COUNT);
345                         bzero(conf, sizeof(*conf) * HAMMER2_COPYID_COUNT);
346                         state->media->usrhandle = conf;
347                 }
348                 msgconf = H2_LNK_VOLCONF(msg);
349
350                 if (msgconf->index < 0 ||
351                     msgconf->index >= HAMMER2_COPYID_COUNT) {
352                         fprintf(stderr,
353                                 "VOLCONF: ILLEGAL INDEX %d\n",
354                                 msgconf->index);
355                         break;
356                 }
357                 if (msgconf->copy.path[sizeof(msgconf->copy.path) - 1] != 0 ||
358                     msgconf->copy.path[0] == 0) {
359                         fprintf(stderr,
360                                 "VOLCONF: ILLEGAL PATH %d\n",
361                                 msgconf->index);
362                         break;
363                 }
364                 conf += msgconf->index;
365                 pthread_mutex_lock(&confmtx);
366                 conf->copy_pend = msgconf->copy;
367                 conf->ctl |= H2CONFCTL_UPDATE;
368                 pthread_mutex_unlock(&confmtx);
369                 if (conf->thread == NULL) {
370                         fprintf(stderr, "VOLCONF THREAD STARTED\n");
371                         pthread_cond_init(&conf->cond, NULL);
372                         pthread_create(&conf->thread, NULL,
373                                        hammer2_volconf_thread, (void *)conf);
374                 }
375                 pthread_cond_signal(&conf->cond);
376                 break;
377         default:
378                 if (unmanaged)
379                         dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
380                 break;
381         }
382 }
383
384 static void *
385 hammer2_volconf_thread(void *info)
386 {
387         hammer2_media_config_t *conf = info;
388
389         setproctitle("hammer2 volconf");
390
391         pthread_mutex_lock(&confmtx);
392         while ((conf->ctl & H2CONFCTL_STOP) == 0) {
393                 if (conf->ctl & H2CONFCTL_UPDATE) {
394                         fprintf(stderr, "VOLCONF UPDATE\n");
395                         conf->ctl &= ~H2CONFCTL_UPDATE;
396                         if (bcmp(&conf->copy_run, &conf->copy_pend,
397                                  sizeof(conf->copy_run)) == 0) {
398                                 fprintf(stderr, "VOLCONF: no changes\n");
399                                 continue;
400                         }
401                         /*
402                          * XXX TODO - auto reconnect on lookup failure or
403                          *              connect failure or stream failure.
404                          */
405
406                         pthread_mutex_unlock(&confmtx);
407                         hammer2_volconf_stop(conf);
408                         conf->copy_run = conf->copy_pend;
409                         if (conf->copy_run.copyid != 0 &&
410                             strncmp(conf->copy_run.path, "span:", 5) == 0) {
411                                 hammer2_volconf_start(conf,
412                                                       conf->copy_run.path + 5);
413                         }
414                         pthread_mutex_lock(&confmtx);
415                         fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
416                 }
417                 if (conf->state == H2MC_CONNECT) {
418                         hammer2_volconf_start(conf, conf->copy_run.path + 5);
419                         pthread_mutex_unlock(&confmtx);
420                         sleep(5);
421                         pthread_mutex_lock(&confmtx);
422                 } else {
423                         pthread_cond_wait(&conf->cond, &confmtx);
424                 }
425         }
426         pthread_mutex_unlock(&confmtx);
427         hammer2_volconf_stop(conf);
428         return(NULL);
429 }
430
431 static
432 void
433 hammer2_volconf_start(hammer2_media_config_t *conf, const char *hostname)
434 {
435         dmsg_master_service_info_t *info;
436
437         switch(conf->state) {
438         case H2MC_STOPPED:
439         case H2MC_CONNECT:
440                 conf->fd = dmsg_connect(hostname);
441                 if (conf->fd < 0) {
442                         fprintf(stderr, "Unable to connect to %s\n", hostname);
443                         conf->state = H2MC_CONNECT;
444                 } else if (pipe(conf->pipefd) < 0) {
445                         close(conf->fd);
446                         fprintf(stderr, "pipe() failed during volconf\n");
447                         conf->state = H2MC_CONNECT;
448                 } else {
449                         fprintf(stderr, "VOLCONF CONNECT\n");
450                         info = malloc(sizeof(*info));
451                         bzero(info, sizeof(*info));
452                         info->fd = conf->fd;
453                         info->altfd = conf->pipefd[0];
454                         info->altmsg_callback = hammer2_volconf_signal;
455                         info->usrmsg_callback = hammer2_usrmsg_handler;
456                         info->detachme = 0;
457                         conf->state = H2MC_RUNNING;
458                         pthread_create(&conf->iocom_thread, NULL,
459                                        dmsg_master_service, info);
460                 }
461                 break;
462         case H2MC_RUNNING:
463                 break;
464         }
465 }
466
467 static
468 void
469 hammer2_volconf_stop(hammer2_media_config_t *conf)
470 {
471         switch(conf->state) {
472         case H2MC_STOPPED:
473                 break;
474         case H2MC_CONNECT:
475                 conf->state = H2MC_STOPPED;
476                 break;
477         case H2MC_RUNNING:
478                 close(conf->pipefd[1]);
479                 conf->pipefd[1] = -1;
480                 pthread_join(conf->iocom_thread, NULL);
481                 conf->iocom_thread = NULL;
482                 conf->state = H2MC_STOPPED;
483                 break;
484         }
485 }
486
487 static
488 void
489 hammer2_volconf_signal(dmsg_iocom_t *iocom)
490 {
491         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
492 }
493
494 /*
495  * Monitor block devices.  Currently polls every ~10 seconds or so.
496  */
497 static
498 void *
499 udev_thread(void *data __unused)
500 {
501         int     fd;
502         int     seq = 0;
503
504         pthread_detach(pthread_self());
505         setproctitle("hammer2 udev_thread");
506
507         if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
508                 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
509                         UDEV_DEVICE_PATH);
510                 pthread_exit(NULL);
511         }
512         udev_check_disks();
513         while (ioctl(fd, UDEVWAIT, &seq) == 0) {
514                 udev_check_disks();
515                 sleep(1);
516         }
517         return (NULL);
518 }
519
520 static void *autoconn_connect_thread(void *data);
521 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
522
523 static
524 void *
525 autoconn_thread(void *data __unused)
526 {
527         TAILQ_HEAD(, autoconn) autolist;
528         struct autoconn *ac;
529         struct autoconn *next;
530         pthread_t thread;
531         struct stat st;
532         time_t  t;
533         time_t  lmod;
534         int     found_last;
535         FILE    *fp;
536         char    buf[256];
537
538         TAILQ_INIT(&autolist);
539         found_last = 0;
540         lmod = 0;
541
542         pthread_detach(pthread_self());
543         setproctitle("hammer2 autoconn_thread");
544         for (;;) {
545                 /*
546                  * Polling interval
547                  */
548                 sleep(5);
549
550                 /*
551                  * Poll the file.  Loop up if the synchronized state (lmod)
552                  * has not changed.
553                  */
554                 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
555                         if (lmod == st.st_mtime)
556                                 continue;
557                         fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
558                         if (fp == NULL)
559                                 continue;
560                 } else {
561                         if (lmod == 0)
562                                 continue;
563                         fp = NULL;
564                 }
565
566                 /*
567                  * Wait at least 5 seconds after the file is created or
568                  * removed.
569                  *
570                  * Do not update the synchronized state.
571                  */
572                 if (fp == NULL && found_last) {
573                         found_last = 0;
574                         continue;
575                 } else if (fp && found_last == 0) {
576                         fclose(fp);
577                         found_last = 1;
578                         continue;
579                 }
580
581                 /*
582                  * Don't scan the file until the time progresses past the
583                  * file's mtime, so we can validate that the file was not
584                  * further modified during our scan.
585                  *
586                  * Do not update the synchronized state.
587                  */
588                 time(&t);
589                 if (fp) {
590                         if (t == st.st_mtime) {
591                                 fclose(fp);
592                                 continue;
593                         }
594                         t = st.st_mtime;
595                 } else {
596                         t = 0;
597                 }
598
599                 /*
600                  * Set staging to disconnect, then scan the file.
601                  */
602                 TAILQ_FOREACH(ac, &autolist, entry)
603                         ac->stage = 0;
604                 while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
605                         char *host;
606
607                         if ((host = strtok(buf, " \t\r\n")) == NULL ||
608                             host[0] == '#') {
609                                 continue;
610                         }
611                         TAILQ_FOREACH(ac, &autolist, entry) {
612                                 if (strcmp(host, ac->host) == 0)
613                                         break;
614                         }
615                         if (ac == NULL) {
616                                 ac = malloc(sizeof(*ac));
617                                 bzero(ac, sizeof(*ac));
618                                 ac->host = strdup(host);
619                                 ac->state = AUTOCONN_INACTIVE;
620                                 TAILQ_INSERT_TAIL(&autolist, ac, entry);
621                         }
622                         ac->stage = 1;
623                 }
624
625                 /*
626                  * Ignore the scan (and retry again) if the file was
627                  * modified during the scan.
628                  *
629                  * Do not update the synchronized state.
630                  */
631                 if (fp) {
632                         if (fstat(fileno(fp), &st) < 0) {
633                                 fclose(fp);
634                                 continue;
635                         }
636                         fclose(fp);
637                         if (t != st.st_mtime)
638                                 continue;
639                 }
640
641                 /*
642                  * Update the synchronized state and reconfigure the
643                  * connect list as needed.
644                  */
645                 lmod = t;
646                 next = TAILQ_FIRST(&autolist);
647                 while ((ac = next) != NULL) {
648                         next = TAILQ_NEXT(ac, entry);
649
650                         /*
651                          * Staging, initiate
652                          */
653                         if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
654                                 if (pipe(ac->pipefd) == 0) {
655                                         ac->stopme = 0;
656                                         ac->state = AUTOCONN_ACTIVE;
657                                         thread = NULL;
658                                         pthread_create(&thread, NULL,
659                                                        autoconn_connect_thread,
660                                                        ac);
661                                 }
662                         }
663
664                         /*
665                          * Unstaging, stop active connection.
666                          *
667                          * We write to the pipe which causes the iocom_core
668                          * to call autoconn_disconnect_signal().
669                          */
670                         if (ac->stage == 0 &&
671                             ac->state == AUTOCONN_ACTIVE) {
672                                 if (ac->stopme == 0) {
673                                         char dummy = 0;
674                                         ac->stopme = 1;
675                                         write(ac->pipefd[1], &dummy, 1);
676                                 }
677                         }
678
679                         /*
680                          * Unstaging, delete inactive connection.
681                          */
682                         if (ac->stage == 0 &&
683                             ac->state == AUTOCONN_INACTIVE) {
684                                 TAILQ_REMOVE(&autolist, ac, entry);
685                                 free(ac->host);
686                                 free(ac);
687                                 continue;
688                         }
689                 }
690                 sleep(5);
691         }
692         return(NULL);
693 }
694
695 static
696 void *
697 autoconn_connect_thread(void *data)
698 {
699         dmsg_master_service_info_t *info;
700         struct autoconn *ac;
701         void *res;
702         int fd;
703
704         ac = data;
705         pthread_detach(pthread_self());
706         setproctitle("hammer2 dmsg");
707
708         while (ac->stopme == 0) {
709                 fd = dmsg_connect(ac->host);
710                 if (fd < 0) {
711                         if (DMsgDebugOpt > 2) {
712                                 fprintf(stderr,
713                                         "autoconn: Connect failure: %s\n",
714                                         ac->host);
715                         }
716                         sleep(5);
717                         continue;
718                 }
719                 fprintf(stderr, "autoconn: Connect %s\n", ac->host);
720
721                 info = malloc(sizeof(*info));
722                 bzero(info, sizeof(*info));
723                 info->fd = fd;
724                 info->altfd = ac->pipefd[0];
725                 info->altmsg_callback = autoconn_disconnect_signal;
726                 info->usrmsg_callback = hammer2_usrmsg_handler;
727                 info->detachme = 0;
728                 info->noclosealt = 1;
729                 pthread_create(&ac->thread, NULL, dmsg_master_service, info);
730                 pthread_join(ac->thread, &res);
731         }
732         close(ac->pipefd[0]);
733         ac->state = AUTOCONN_INACTIVE;
734         /* auto structure can be ripped out here */
735         return(NULL);
736 }
737
738 static
739 void
740 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
741 {
742         fprintf(stderr, "autoconn: Shutting down socket\n");
743         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
744 }
745
746 /*
747  * Retrieve the list of disk attachments and attempt to export
748  * them.
749  */
750 static
751 void
752 udev_check_disks(void)
753 {
754         char tmpbuf[1024];
755         char *buf = NULL;
756         char *disk;
757         int error;
758         size_t n;
759
760         for (;;) {
761                 n = 0;
762                 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
763                 if (error < 0 || n == 0)
764                         break;
765                 if (n >= sizeof(tmpbuf))
766                         buf = malloc(n + 1);
767                 else
768                         buf = tmpbuf;
769                 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
770                 if (error == 0) {
771                         buf[n] = 0;
772                         break;
773                 }
774                 if (buf != tmpbuf) {
775                         free(buf);
776                         buf = NULL;
777                 }
778                 if (errno != ENOMEM)
779                         break;
780         }
781         if (buf) {
782                 fprintf(stderr, "DISKS: %s\n", buf);
783                 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
784                         disk_reconnect(disk);
785                 }
786                 if (buf != tmpbuf)
787                         free(buf);
788         }
789 }
790
791 /*
792  * Normally the mount program supplies a cluster communications
793  * descriptor to the hammer2 vfs on mount, but if you kill the service
794  * daemon and restart it that link will be lost.
795  *
796  * This procedure attempts to [re]connect to existing mounts when
797  * the service daemon is started up before going into its accept
798  * loop.
799  *
800  * NOTE: A hammer2 mount point can only accomodate one connection at a time
801  *       so this will disconnect any existing connection during the
802  *       reconnect.
803  */
804 static
805 void
806 master_reconnect(const char *mntpt)
807 {
808         struct hammer2_ioc_recluster recls;
809         dmsg_master_service_info_t *info;
810         pthread_t thread;
811         int fd;
812         int pipefds[2];
813
814         fd = open(mntpt, O_RDONLY);
815         if (fd < 0) {
816                 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
817                 return;
818         }
819         if (pipe(pipefds) < 0) {
820                 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
821                 close(fd);
822                 return;
823         }
824         bzero(&recls, sizeof(recls));
825         recls.fd = pipefds[0];
826         if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
827                 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
828                 close(pipefds[0]);
829                 close(pipefds[1]);
830                 close(fd);
831                 return;
832         }
833         close(pipefds[0]);
834         close(fd);
835
836         info = malloc(sizeof(*info));
837         bzero(info, sizeof(*info));
838         info->fd = pipefds[1];
839         info->detachme = 1;
840         info->usrmsg_callback = hammer2_usrmsg_handler;
841         info->label = strdup("hammer2");
842         pthread_create(&thread, NULL, dmsg_master_service, info);
843 }
844
845 /*
846  * Reconnect a physical disk service to the mesh.
847  */
848 static
849 void
850 disk_reconnect(const char *disk)
851 {
852         struct disk_ioc_recluster recls;
853         struct diskcon *dc;
854         dmsg_master_service_info_t *info;
855         pthread_t thread;
856         int fd;
857         int pipefds[2];
858         char *path;
859
860         /*
861          * Urm, this will auto-create mdX+1, just ignore for now.
862          * This mechanic needs to be fixed.  It might actually be nice
863          * to be able to export md disks.
864          */
865         if (strncmp(disk, "md", 2) == 0)
866                 return;
867         if (strncmp(disk, "xa", 2) == 0)
868                 return;
869
870         /*
871          * Check if already connected
872          */
873         pthread_mutex_lock(&diskmtx);
874         TAILQ_FOREACH(dc, &diskconq, entry) {
875                 if (strcmp(dc->disk, disk) == 0)
876                         break;
877         }
878         pthread_mutex_unlock(&diskmtx);
879         if (dc)
880                 return;
881
882         /*
883          * Not already connected, create a connection to the kernel
884          * disk driver.
885          */
886         asprintf(&path, "/dev/%s", disk);
887         fd = open(path, O_RDONLY);
888         if (fd < 0) {
889                 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
890                 free(path);
891                 return;
892         }
893         free(path);
894         if (pipe(pipefds) < 0) {
895                 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
896                 close(fd);
897                 return;
898         }
899         bzero(&recls, sizeof(recls));
900         recls.fd = pipefds[0];
901         if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
902                 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
903                 close(pipefds[0]);
904                 close(pipefds[1]);
905                 close(fd);
906                 return;
907         }
908         close(pipefds[0]);
909         close(fd);
910
911         dc = malloc(sizeof(*dc));
912         dc->disk = strdup(disk);
913         pthread_mutex_lock(&diskmtx);
914         TAILQ_INSERT_TAIL(&diskconq, dc, entry);
915         pthread_mutex_unlock(&diskmtx);
916
917         info = malloc(sizeof(*info));
918         bzero(info, sizeof(*info));
919         info->fd = pipefds[1];
920         info->detachme = 1;
921         info->usrmsg_callback = hammer2_usrmsg_handler;
922         info->exit_callback = disk_disconnect;
923         info->handle = dc;
924         info->label = strdup(dc->disk);
925         pthread_create(&thread, NULL, dmsg_master_service, info);
926 }
927
928 static
929 void
930 disk_disconnect(void *handle)
931 {
932         struct diskcon *dc = handle;
933
934         fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
935
936         pthread_mutex_lock(&diskmtx);
937         TAILQ_REMOVE(&diskconq, dc, entry);
938         pthread_mutex_unlock(&diskmtx);
939         free(dc->disk);
940         free(dc);
941 }
942
943 /*
944  * Connect our cluster controller to /dev/xdisk.  xdisk will pick up
945  * SPAN messages that we route to it, makes remote block devices
946  * available to the host, and can issue dmsg transactions based on
947  * device requests.
948  */
949 static
950 void
951 xdisk_connect(void)
952 {
953         dmsg_master_service_info_t *info;
954         struct xdisk_attach_ioctl xaioc;
955         pthread_t thread;
956         int pipefds[2];
957         int xfd;
958         int error;
959
960         /*
961          * Is /dev/xdisk available?
962          */
963         xfd = open("/dev/xdisk", O_RDWR, 0600);
964         if (xfd < 0) {
965                 fprintf(stderr, "xdisk_connect: Unable to open /dev/xdisk\n");
966                 return;
967         }
968
969         if (pipe(pipefds) < 0) {
970                 fprintf(stderr, "xdisk_connect: pipe() failed\n");
971                 return;
972         }
973
974         /*
975          * Pipe between cluster controller (this user process).
976          */
977         info = malloc(sizeof(*info));
978         bzero(info, sizeof(*info));
979         info->fd = pipefds[1];
980         info->detachme = 1;
981         info->usrmsg_callback = hammer2_usrmsg_handler;
982         info->exit_callback = NULL;
983         pthread_create(&thread, NULL, dmsg_master_service, info);
984
985         /*
986          * And the xdisk device.
987          */
988         bzero(&xaioc, sizeof(xaioc));
989         xaioc.fd = pipefds[0];
990         error = ioctl(xfd, XDISKIOCATTACH, &xaioc);
991         close(pipefds[0]);
992         close(xfd);
993
994         if (error < 0) {
995                 fprintf(stderr,
996                         "xdisk_connect: cannot attach %s\n",
997                         strerror(errno));
998                 return;
999         }
1000 }