sbin/hammer2: Move hammer2_demon() to cmd_service.c
[dragonfly.git] / sbin / hammer2 / cmd_service.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 #include <sys/xdiskioctl.h>
39 #include <machine/atomic.h>
40
41 struct hammer2_media_config {
42         hammer2_volconf_t       copy_run;
43         hammer2_volconf_t       copy_pend;
44         pthread_t               thread;
45         int                     thread_started;
46         pthread_cond_t          cond;
47         int                     ctl;
48         int                     fd;
49         int                     pipefd[2];      /* signal stop */
50         dmsg_iocom_t            iocom;
51         pthread_t               iocom_thread;
52         enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
53 };
54
55 typedef struct hammer2_media_config hammer2_media_config_t;
56
57 #define H2CONFCTL_STOP          0x00000001
58 #define H2CONFCTL_UPDATE        0x00000002
59
60 struct diskcon {
61         TAILQ_ENTRY(diskcon) entry;
62         char    *disk;
63 };
64
65 struct service_node_opaque {
66         char    cl_label[64];
67         char    fs_label[64];
68         dmsg_media_block_t block;
69         int     attached;
70         int     servicing;
71         int     servicefd;
72 };
73
74 struct autoconn {
75         TAILQ_ENTRY(autoconn) entry;
76         char    *host;
77         int     stage;
78         int     stopme;
79         int     pipefd[2];      /* {read,write} */
80         enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
81         pthread_t thread;
82 };
83
84 #define WS " \r\n"
85
86 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
87 static pthread_mutex_t diskmtx;
88 static pthread_mutex_t confmtx;
89
90 static void *service_thread(void *data);
91 static void *udev_thread(void *data);
92 static void *autoconn_thread(void *data);
93 static void master_reconnect(const char *mntpt);
94 static void disk_reconnect(const char *disk);
95 static void disk_disconnect(void *handle);
96 static void udev_check_disks(void);
97 static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged);
98 static void *hammer2_volconf_thread(void *info);
99 static void hammer2_volconf_signal(dmsg_iocom_t *iocom);
100 static void hammer2_volconf_start(hammer2_media_config_t *conf,
101                         const char *hostname);
102 static void hammer2_volconf_stop(hammer2_media_config_t *conf);
103
104
105 static void xdisk_connect(void);
106
107 /*
108  * Start-up the master listener daemon for the machine.  This daemon runs
109  * a UDP discovery protocol, a TCP rendezvous, and scans certain files
110  * and directories for work.
111  *
112  * --
113  *
114  * The only purpose for the UDP discovery protocol is to determine what
115  * other IPs on the LAN are running the hammer2 service daemon.  DNS is not
116  * required to operate, but hostnames (if assigned) must be unique.  If
117  * no hostname is assigned the host's IP is used as the name.  This name
118  * is broadcast along with the mtime of the originator's private key.
119  *
120  * Receiving hammer2 service daemons which are able to match the label against
121  * /etc/hammer2/remote/<label>.pub will initiate a persistent connection
122  * to the target.  Removal of the file will cause a disconnection.  A failed
123  * public key negotiation stops further connection attempts until either the
124  * file is updated or the remote mtime is updated.
125  *
126  * Generally speaking this results in a web of connections, typically a
127  * combination of point-to-point for the more important links and relayed
128  * (spanning tree) for less important or filtered links.
129  *
130  * --
131  *
132  * The TCP listener serves as a rendezvous point in the cluster, accepting
133  * connections, performing registrations and authentications, maintaining
134  * the spanning tree, and keeping track of message state so disconnects can
135  * be handled properly.
136  *
137  * Once authenticated only low-level messaging protocols (which includes
138  * tracking persistent messages) are handled by this daemon.  This daemon
139  * does not run the higher level quorum or locking protocols.
140  *
141  * --
142  *
143  * The file /etc/hammer2/autoconn, if it exists, contains a list of targets
144  * to connect to (which do not have to be on the local lan).  This list will
145  * be retried until a connection can be established.  The file is not usually
146  * needed for linkages local to the LAN.
147  */
148 int
149 cmd_service(void)
150 {
151         struct sockaddr_in lsin;
152         int on;
153         int lfd;
154
155         /*
156          * Acquire socket and set options
157          */
158         if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
159                 fprintf(stderr, "master_listen: socket(): %s\n",
160                         strerror(errno));
161                 return 1;
162         }
163         on = 1;
164         setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
165
166         /*
167          * Setup listen port and try to bind.  If the bind fails we assume
168          * that a master listener process is already running and silently
169          * fail.
170          */
171         bzero(&lsin, sizeof(lsin));
172         lsin.sin_family = AF_INET;
173         lsin.sin_addr.s_addr = INADDR_ANY;
174         lsin.sin_port = htons(DMSG_LISTEN_PORT);
175         if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
176                 close(lfd);
177                 if (QuietOpt == 0) {
178                         fprintf(stderr,
179                                 "master listen: daemon already running\n");
180                 }
181                 return 0;
182         }
183         if (QuietOpt == 0)
184                 fprintf(stderr, "master listen: startup\n");
185         listen(lfd, 50);
186
187         /*
188          * Fork and disconnect the controlling terminal and parent process,
189          * executing the specified function as a pthread.
190          *
191          * Returns to the original process which can then continue running.
192          * In debug mode this call will create the pthread without forking
193          * and set NormalExit to 0, instead of fork.
194          */
195         hammer2_demon(service_thread, (void *)(intptr_t)lfd);
196         if (NormalExit)
197                 close(lfd);
198         return 0;
199 }
200
201 /*
202  * Master listen/accept thread.  Accept connections on the master socket,
203  * starting a pthread for each one.
204  */
205 static
206 void *
207 service_thread(void *data)
208 {
209         struct sockaddr_in asin;
210         socklen_t alen;
211         pthread_t thread;
212         dmsg_master_service_info_t *info;
213         int lfd = (int)(intptr_t)data;
214         int fd;
215         int i;
216         int count;
217         int opt;
218         struct statfs *mntbuf = NULL;
219         struct statvfs *mntvbuf = NULL;
220
221         /*
222          * Nobody waits for us
223          */
224         setproctitle("hammer2 master listen");
225         pthread_detach(pthread_self());
226
227         /*
228          * Start up a thread to handle block device monitoring for
229          * export to the cluster.
230          */
231         pthread_create(&thread, NULL, udev_thread, NULL);
232
233         /*
234          * Start up a thread to tie /dev/xdisk into the cluster
235          * controller.
236          */
237         xdisk_connect();
238
239         /*
240          * Start thread to manage /etc/hammer2/autoconn
241          */
242         pthread_create(&thread, NULL, autoconn_thread, NULL);
243
244         /*
245          * Scan existing hammer2 mounts and reconnect to them using
246          * HAMMER2IOC_RECLUSTER.
247          */
248         count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
249         for (i = 0; i < count; ++i) {
250                 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
251                         master_reconnect(mntbuf[i].f_mntonname);
252         }
253
254         /*
255          * Accept connections and create pthreads to handle them after
256          * validating the IP.
257          */
258         for (;;) {
259                 alen = sizeof(asin);
260                 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
261                 if (fd < 0) {
262                         if (errno == EINTR)
263                                 continue;
264                         break;
265                 }
266                 opt = 1;
267                 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof opt);
268                 fprintf(stderr, "service_thread: accept fd %d\n", fd);
269                 info = malloc(sizeof(*info));
270                 bzero(info, sizeof(*info));
271                 info->fd = fd;
272                 info->detachme = 1;
273                 info->usrmsg_callback = hammer2_usrmsg_handler;
274                 info->label = strdup("client");
275                 pthread_create(&thread, NULL, dmsg_master_service, info);
276         }
277         return (NULL);
278 }
279
280 /*
281  * Handle/Monitor the dmsg stream.  If unmanaged is set we are responsible
282  * for responding for the message, otherwise if it is not set libdmsg has
283  * already done some preprocessing and will respond to the message for us
284  * when we return.
285  *
286  * We primarily monitor for VOLCONFs
287  */
288 static
289 void
290 hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged)
291 {
292         dmsg_state_t *state;
293         hammer2_media_config_t *conf;
294         dmsg_lnk_hammer2_volconf_t *msgconf;
295         int i;
296
297         /*
298          * Only process messages which are part of a LNK_CONN stream
299          */
300         state = msg->state;
301         if (state == NULL ||
302             (state->rxcmd & DMSGF_BASECMDMASK) != DMSG_LNK_CONN) {
303                 hammer2_shell_parse(msg, unmanaged);
304                 return;
305         }
306
307         switch(msg->tcmd) {
308         case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
309         case DMSG_LNK_CONN | DMSGF_DELETE:
310         case DMSG_LNK_ERROR | DMSGF_DELETE:
311                 /*
312                  * Deleting connection, clean out all volume configs
313                  */
314                 if (state->media == NULL || state->media->usrhandle == NULL)
315                         break;
316                 conf = state->media->usrhandle;
317                 fprintf(stderr, "Shutting down media spans\n");
318                 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
319                         if (conf[i].thread) {
320                                 conf[i].ctl = H2CONFCTL_STOP;
321                                 pthread_cond_signal(&conf[i].cond);
322                         }
323                 }
324                 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
325                         if (conf[i].thread) {
326                                 pthread_join(conf[i].thread, NULL);
327                                 conf->thread_started = 0;
328                                 pthread_cond_destroy(&conf[i].cond);
329                         }
330                 }
331                 state->media->usrhandle = NULL;
332                 free(conf);
333                 break;
334         case DMSG_LNK_HAMMER2_VOLCONF:
335                 /*
336                  * One-way volume-configuration message is transmitted
337                  * over the open LNK_CONN transaction.
338                  */
339                 fprintf(stderr, "RECEIVED VOLCONF\n");
340
341                 if ((conf = state->media->usrhandle) == NULL) {
342                         conf = malloc(sizeof(*conf) * HAMMER2_COPYID_COUNT);
343                         bzero(conf, sizeof(*conf) * HAMMER2_COPYID_COUNT);
344                         state->media->usrhandle = conf;
345                 }
346                 msgconf = H2_LNK_VOLCONF(msg);
347
348                 if (msgconf->index < 0 ||
349                     msgconf->index >= HAMMER2_COPYID_COUNT) {
350                         fprintf(stderr,
351                                 "VOLCONF: ILLEGAL INDEX %d\n",
352                                 msgconf->index);
353                         break;
354                 }
355                 if (msgconf->copy.path[sizeof(msgconf->copy.path) - 1] != 0 ||
356                     msgconf->copy.path[0] == 0) {
357                         fprintf(stderr,
358                                 "VOLCONF: ILLEGAL PATH %d\n",
359                                 msgconf->index);
360                         break;
361                 }
362                 conf += msgconf->index;
363                 pthread_mutex_lock(&confmtx);
364                 conf->copy_pend = msgconf->copy;
365                 conf->ctl |= H2CONFCTL_UPDATE;
366                 pthread_mutex_unlock(&confmtx);
367                 if (conf->thread_started == 0) {
368                         fprintf(stderr, "VOLCONF THREAD STARTED\n");
369                         pthread_cond_init(&conf->cond, NULL);
370                         pthread_create(&conf->thread, NULL,
371                                        hammer2_volconf_thread, (void *)conf);
372                         conf->thread_started = 1;
373                 }
374                 pthread_cond_signal(&conf->cond);
375                 break;
376         default:
377                 if (unmanaged)
378                         dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
379                 break;
380         }
381 }
382
383 static void *
384 hammer2_volconf_thread(void *info)
385 {
386         hammer2_media_config_t *conf = info;
387
388         setproctitle("hammer2 volconf");
389
390         pthread_mutex_lock(&confmtx);
391         while ((conf->ctl & H2CONFCTL_STOP) == 0) {
392                 if (conf->ctl & H2CONFCTL_UPDATE) {
393                         fprintf(stderr, "VOLCONF UPDATE\n");
394                         conf->ctl &= ~H2CONFCTL_UPDATE;
395                         if (bcmp(&conf->copy_run, &conf->copy_pend,
396                                  sizeof(conf->copy_run)) == 0) {
397                                 fprintf(stderr, "VOLCONF: no changes\n");
398                                 continue;
399                         }
400                         /*
401                          * XXX TODO - auto reconnect on lookup failure or
402                          *              connect failure or stream failure.
403                          */
404
405                         pthread_mutex_unlock(&confmtx);
406                         hammer2_volconf_stop(conf);
407                         conf->copy_run = conf->copy_pend;
408                         if (conf->copy_run.copyid != 0 &&
409                             strncmp((char*)conf->copy_run.path, "span:", 5) == 0) {
410                                 hammer2_volconf_start(conf,
411                                                       (char*)conf->copy_run.path + 5);
412                         }
413                         pthread_mutex_lock(&confmtx);
414                         fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
415                 }
416                 if (conf->state == H2MC_CONNECT) {
417                         hammer2_volconf_start(conf, (char*)conf->copy_run.path + 5);
418                         pthread_mutex_unlock(&confmtx);
419                         sleep(5);
420                         pthread_mutex_lock(&confmtx);
421                 } else {
422                         pthread_cond_wait(&conf->cond, &confmtx);
423                 }
424         }
425         pthread_mutex_unlock(&confmtx);
426         hammer2_volconf_stop(conf);
427         return(NULL);
428 }
429
430 static
431 void
432 hammer2_volconf_start(hammer2_media_config_t *conf, const char *hostname)
433 {
434         dmsg_master_service_info_t *info;
435
436         switch(conf->state) {
437         case H2MC_STOPPED:
438         case H2MC_CONNECT:
439                 conf->fd = dmsg_connect(hostname);
440                 if (conf->fd < 0) {
441                         fprintf(stderr, "Unable to connect to %s\n", hostname);
442                         conf->state = H2MC_CONNECT;
443                 } else if (pipe(conf->pipefd) < 0) {
444                         close(conf->fd);
445                         fprintf(stderr, "pipe() failed during volconf\n");
446                         conf->state = H2MC_CONNECT;
447                 } else {
448                         fprintf(stderr, "VOLCONF CONNECT\n");
449                         info = malloc(sizeof(*info));
450                         bzero(info, sizeof(*info));
451                         info->fd = conf->fd;
452                         info->altfd = conf->pipefd[0];
453                         info->altmsg_callback = hammer2_volconf_signal;
454                         info->usrmsg_callback = hammer2_usrmsg_handler;
455                         info->detachme = 0;
456                         conf->state = H2MC_RUNNING;
457                         pthread_create(&conf->iocom_thread, NULL,
458                                        dmsg_master_service, info);
459                 }
460                 break;
461         case H2MC_RUNNING:
462                 break;
463         }
464 }
465
466 static
467 void
468 hammer2_volconf_stop(hammer2_media_config_t *conf)
469 {
470         switch(conf->state) {
471         case H2MC_STOPPED:
472                 break;
473         case H2MC_CONNECT:
474                 conf->state = H2MC_STOPPED;
475                 break;
476         case H2MC_RUNNING:
477                 close(conf->pipefd[1]);
478                 conf->pipefd[1] = -1;
479                 pthread_join(conf->iocom_thread, NULL);
480                 conf->state = H2MC_STOPPED;
481                 break;
482         }
483 }
484
485 static
486 void
487 hammer2_volconf_signal(dmsg_iocom_t *iocom)
488 {
489         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
490 }
491
492 /*
493  * Monitor block devices.  Currently polls every ~10 seconds or so.
494  */
495 static
496 void *
497 udev_thread(void *data __unused)
498 {
499         int     fd;
500         int     seq = 0;
501
502         pthread_detach(pthread_self());
503         setproctitle("hammer2 udev_thread");
504
505         if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
506                 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
507                         UDEV_DEVICE_PATH);
508                 pthread_exit(NULL);
509         }
510         udev_check_disks();
511         while (ioctl(fd, UDEVWAIT, &seq) == 0) {
512                 udev_check_disks();
513                 sleep(1);
514         }
515         return (NULL);
516 }
517
518 static void *autoconn_connect_thread(void *data);
519 static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
520
521 static
522 void *
523 autoconn_thread(void *data __unused)
524 {
525         TAILQ_HEAD(, autoconn) autolist;
526         struct autoconn *ac;
527         struct autoconn *next;
528         pthread_t thread;
529         struct stat st;
530         time_t  t;
531         time_t  lmod;
532         int     found_last;
533         FILE    *fp;
534         char    buf[256];
535
536         TAILQ_INIT(&autolist);
537         found_last = 0;
538         lmod = 0;
539
540         pthread_detach(pthread_self());
541         setproctitle("hammer2 autoconn_thread");
542         for (;;) {
543                 /*
544                  * Polling interval
545                  */
546                 sleep(5);
547
548                 /*
549                  * Poll the file.  Loop up if the synchronized state (lmod)
550                  * has not changed.
551                  */
552                 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
553                         if (lmod == st.st_mtime)
554                                 continue;
555                         fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
556                         if (fp == NULL)
557                                 continue;
558                 } else {
559                         if (lmod == 0)
560                                 continue;
561                         fp = NULL;
562                 }
563
564                 /*
565                  * Wait at least 5 seconds after the file is created or
566                  * removed.
567                  *
568                  * Do not update the synchronized state.
569                  */
570                 if (fp == NULL && found_last) {
571                         found_last = 0;
572                         continue;
573                 } else if (fp && found_last == 0) {
574                         fclose(fp);
575                         found_last = 1;
576                         continue;
577                 }
578
579                 /*
580                  * Don't scan the file until the time progresses past the
581                  * file's mtime, so we can validate that the file was not
582                  * further modified during our scan.
583                  *
584                  * Do not update the synchronized state.
585                  */
586                 time(&t);
587                 if (fp) {
588                         if (t == st.st_mtime) {
589                                 fclose(fp);
590                                 continue;
591                         }
592                         t = st.st_mtime;
593                 } else {
594                         t = 0;
595                 }
596
597                 /*
598                  * Set staging to disconnect, then scan the file.
599                  */
600                 TAILQ_FOREACH(ac, &autolist, entry)
601                         ac->stage = 0;
602                 while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
603                         char *host;
604
605                         if ((host = strtok(buf, " \t\r\n")) == NULL ||
606                             host[0] == '#') {
607                                 continue;
608                         }
609                         TAILQ_FOREACH(ac, &autolist, entry) {
610                                 if (strcmp(host, ac->host) == 0)
611                                         break;
612                         }
613                         if (ac == NULL) {
614                                 ac = malloc(sizeof(*ac));
615                                 bzero(ac, sizeof(*ac));
616                                 ac->host = strdup(host);
617                                 ac->state = AUTOCONN_INACTIVE;
618                                 TAILQ_INSERT_TAIL(&autolist, ac, entry);
619                         }
620                         ac->stage = 1;
621                 }
622
623                 /*
624                  * Ignore the scan (and retry again) if the file was
625                  * modified during the scan.
626                  *
627                  * Do not update the synchronized state.
628                  */
629                 if (fp) {
630                         if (fstat(fileno(fp), &st) < 0) {
631                                 fclose(fp);
632                                 continue;
633                         }
634                         fclose(fp);
635                         if (t != st.st_mtime)
636                                 continue;
637                 }
638
639                 /*
640                  * Update the synchronized state and reconfigure the
641                  * connect list as needed.
642                  */
643                 lmod = t;
644                 next = TAILQ_FIRST(&autolist);
645                 while ((ac = next) != NULL) {
646                         next = TAILQ_NEXT(ac, entry);
647
648                         /*
649                          * Staging, initiate
650                          */
651                         if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
652                                 if (pipe(ac->pipefd) == 0) {
653                                         ac->stopme = 0;
654                                         ac->state = AUTOCONN_ACTIVE;
655                                         pthread_create(&thread, NULL,
656                                                        autoconn_connect_thread,
657                                                        ac);
658                                 }
659                         }
660
661                         /*
662                          * Unstaging, stop active connection.
663                          *
664                          * We write to the pipe which causes the iocom_core
665                          * to call autoconn_disconnect_signal().
666                          */
667                         if (ac->stage == 0 &&
668                             ac->state == AUTOCONN_ACTIVE) {
669                                 if (ac->stopme == 0) {
670                                         char dummy = 0;
671                                         ac->stopme = 1;
672                                         write(ac->pipefd[1], &dummy, 1);
673                                 }
674                         }
675
676                         /*
677                          * Unstaging, delete inactive connection.
678                          */
679                         if (ac->stage == 0 &&
680                             ac->state == AUTOCONN_INACTIVE) {
681                                 TAILQ_REMOVE(&autolist, ac, entry);
682                                 free(ac->host);
683                                 free(ac);
684                                 continue;
685                         }
686                 }
687                 sleep(5);
688         }
689         return(NULL);
690 }
691
692 static
693 void *
694 autoconn_connect_thread(void *data)
695 {
696         dmsg_master_service_info_t *info;
697         struct autoconn *ac;
698         void *res;
699         int fd;
700
701         ac = data;
702         pthread_detach(pthread_self());
703         setproctitle("hammer2 dmsg");
704
705         while (ac->stopme == 0) {
706                 fd = dmsg_connect(ac->host);
707                 if (fd < 0) {
708                         if (DMsgDebugOpt > 2) {
709                                 fprintf(stderr,
710                                         "autoconn: Connect failure: %s\n",
711                                         ac->host);
712                         }
713                         sleep(5);
714                         continue;
715                 }
716                 fprintf(stderr, "autoconn: Connect %s\n", ac->host);
717
718                 info = malloc(sizeof(*info));
719                 bzero(info, sizeof(*info));
720                 info->fd = fd;
721                 info->altfd = ac->pipefd[0];
722                 info->altmsg_callback = autoconn_disconnect_signal;
723                 info->usrmsg_callback = hammer2_usrmsg_handler;
724                 info->detachme = 0;
725                 info->noclosealt = 1;
726                 pthread_create(&ac->thread, NULL, dmsg_master_service, info);
727                 pthread_join(ac->thread, &res);
728         }
729         close(ac->pipefd[0]);
730         ac->state = AUTOCONN_INACTIVE;
731         /* auto structure can be ripped out here */
732         return(NULL);
733 }
734
735 static
736 void
737 autoconn_disconnect_signal(dmsg_iocom_t *iocom)
738 {
739         fprintf(stderr, "autoconn: Shutting down socket\n");
740         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
741 }
742
743 /*
744  * Retrieve the list of disk attachments and attempt to export
745  * them.
746  */
747 static
748 void
749 udev_check_disks(void)
750 {
751         char tmpbuf[1024];
752         char *buf = NULL;
753         char *disk;
754         int error;
755         size_t n;
756
757         for (;;) {
758                 n = 0;
759                 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
760                 if (error < 0 || n == 0)
761                         break;
762                 if (n >= sizeof(tmpbuf))
763                         buf = malloc(n + 1);
764                 else
765                         buf = tmpbuf;
766                 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
767                 if (error == 0) {
768                         buf[n] = 0;
769                         break;
770                 }
771                 if (buf != tmpbuf) {
772                         free(buf);
773                         buf = NULL;
774                 }
775                 if (errno != ENOMEM)
776                         break;
777         }
778         if (buf) {
779                 fprintf(stderr, "DISKS: %s\n", buf);
780                 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
781                         disk_reconnect(disk);
782                 }
783                 if (buf != tmpbuf)
784                         free(buf);
785         }
786 }
787
788 /*
789  * Normally the mount program supplies a cluster communications
790  * descriptor to the hammer2 vfs on mount, but if you kill the service
791  * daemon and restart it that link will be lost.
792  *
793  * This procedure attempts to [re]connect to existing mounts when
794  * the service daemon is started up before going into its accept
795  * loop.
796  *
797  * NOTE: A hammer2 mount point can only accomodate one connection at a time
798  *       so this will disconnect any existing connection during the
799  *       reconnect.
800  */
801 static
802 void
803 master_reconnect(const char *mntpt)
804 {
805         struct hammer2_ioc_recluster recls;
806         dmsg_master_service_info_t *info;
807         pthread_t thread;
808         int fd;
809         int pipefds[2];
810
811         fd = open(mntpt, O_RDONLY);
812         if (fd < 0) {
813                 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
814                 return;
815         }
816         if (pipe(pipefds) < 0) {
817                 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
818                 close(fd);
819                 return;
820         }
821         bzero(&recls, sizeof(recls));
822         recls.fd = pipefds[0];
823         if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
824                 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
825                 close(pipefds[0]);
826                 close(pipefds[1]);
827                 close(fd);
828                 return;
829         }
830         close(pipefds[0]);
831         close(fd);
832
833         info = malloc(sizeof(*info));
834         bzero(info, sizeof(*info));
835         info->fd = pipefds[1];
836         info->detachme = 1;
837         info->usrmsg_callback = hammer2_usrmsg_handler;
838         info->label = strdup("hammer2");
839         pthread_create(&thread, NULL, dmsg_master_service, info);
840 }
841
842 /*
843  * Reconnect a physical disk service to the mesh.
844  */
845 static
846 void
847 disk_reconnect(const char *disk)
848 {
849         struct disk_ioc_recluster recls;
850         struct diskcon *dc;
851         dmsg_master_service_info_t *info;
852         pthread_t thread;
853         int fd;
854         int pipefds[2];
855         char *path;
856
857         /*
858          * Urm, this will auto-create mdX+1, just ignore for now.
859          * This mechanic needs to be fixed.  It might actually be nice
860          * to be able to export md disks.
861          */
862         if (strncmp(disk, "md", 2) == 0)
863                 return;
864         if (strncmp(disk, "xa", 2) == 0)
865                 return;
866
867         /*
868          * Check if already connected
869          */
870         pthread_mutex_lock(&diskmtx);
871         TAILQ_FOREACH(dc, &diskconq, entry) {
872                 if (strcmp(dc->disk, disk) == 0)
873                         break;
874         }
875         pthread_mutex_unlock(&diskmtx);
876         if (dc)
877                 return;
878
879         /*
880          * Not already connected, create a connection to the kernel
881          * disk driver.
882          */
883         asprintf(&path, "/dev/%s", disk);
884         fd = open(path, O_RDONLY);
885         if (fd < 0) {
886                 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
887                 free(path);
888                 return;
889         }
890         free(path);
891         if (pipe(pipefds) < 0) {
892                 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
893                 close(fd);
894                 return;
895         }
896         bzero(&recls, sizeof(recls));
897         recls.fd = pipefds[0];
898         if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
899                 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
900                 close(pipefds[0]);
901                 close(pipefds[1]);
902                 close(fd);
903                 return;
904         }
905         close(pipefds[0]);
906         close(fd);
907
908         dc = malloc(sizeof(*dc));
909         dc->disk = strdup(disk);
910         pthread_mutex_lock(&diskmtx);
911         TAILQ_INSERT_TAIL(&diskconq, dc, entry);
912         pthread_mutex_unlock(&diskmtx);
913
914         info = malloc(sizeof(*info));
915         bzero(info, sizeof(*info));
916         info->fd = pipefds[1];
917         info->detachme = 1;
918         info->usrmsg_callback = hammer2_usrmsg_handler;
919         info->exit_callback = disk_disconnect;
920         info->handle = dc;
921         info->label = strdup(dc->disk);
922         pthread_create(&thread, NULL, dmsg_master_service, info);
923 }
924
925 static
926 void
927 disk_disconnect(void *handle)
928 {
929         struct diskcon *dc = handle;
930
931         fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
932
933         pthread_mutex_lock(&diskmtx);
934         TAILQ_REMOVE(&diskconq, dc, entry);
935         pthread_mutex_unlock(&diskmtx);
936         free(dc->disk);
937         free(dc);
938 }
939
940 /*
941  * Connect our cluster controller to /dev/xdisk.  xdisk will pick up
942  * SPAN messages that we route to it, makes remote block devices
943  * available to the host, and can issue dmsg transactions based on
944  * device requests.
945  */
946 static
947 void
948 xdisk_connect(void)
949 {
950         dmsg_master_service_info_t *info;
951         struct xdisk_attach_ioctl xaioc;
952         pthread_t thread;
953         int pipefds[2];
954         int xfd;
955         int error;
956
957         /*
958          * Is /dev/xdisk available?
959          */
960         xfd = open("/dev/xdisk", O_RDWR, 0600);
961         if (xfd < 0) {
962                 fprintf(stderr, "xdisk_connect: Unable to open /dev/xdisk\n");
963                 return;
964         }
965
966         if (pipe(pipefds) < 0) {
967                 fprintf(stderr, "xdisk_connect: pipe() failed\n");
968                 return;
969         }
970
971         /*
972          * Pipe between cluster controller (this user process).
973          */
974         info = malloc(sizeof(*info));
975         bzero(info, sizeof(*info));
976         info->fd = pipefds[1];
977         info->detachme = 1;
978         info->usrmsg_callback = hammer2_usrmsg_handler;
979         info->exit_callback = NULL;
980         pthread_create(&thread, NULL, dmsg_master_service, info);
981
982         /*
983          * And the xdisk device.
984          */
985         bzero(&xaioc, sizeof(xaioc));
986         xaioc.fd = pipefds[0];
987         error = ioctl(xfd, XDISKIOCATTACH, &xaioc);
988         close(pipefds[0]);
989         close(xfd);
990
991         if (error < 0) {
992                 fprintf(stderr,
993                         "xdisk_connect: cannot attach %s\n",
994                         strerror(errno));
995                 return;
996         }
997 }
998
999 /*
1000  * Execute the specified function as a detached independent process/daemon,
1001  * unless we are in debug mode.  If we are in debug mode the function is
1002  * executed as a pthread in the current process.
1003  */
1004 void
1005 hammer2_demon(void *(*func)(void *), void *arg)
1006 {
1007         pthread_t thread;
1008         pid_t pid;
1009         int ttyfd;
1010
1011         /*
1012          * Do not disconnect in debug mode
1013          */
1014         if (DebugOpt) {
1015                 pthread_create(&thread, NULL, func, arg);
1016                 NormalExit = 0;
1017                 return;
1018         }
1019
1020         /*
1021          * Otherwise disconnect us.  Double-fork to get rid of the ppid
1022          * association and disconnect the TTY.
1023          */
1024         if ((pid = fork()) < 0) {
1025                 fprintf(stderr, "hammer2: fork(): %s\n", strerror(errno));
1026                 exit(1);
1027         }
1028         if (pid > 0) {
1029                 while (waitpid(pid, NULL, 0) != pid)
1030                         ;
1031                 return;         /* parent returns */
1032         }
1033
1034         /*
1035          * Get rid of the TTY/session before double-forking to finish off
1036          * the ppid.
1037          */
1038         ttyfd = open("/dev/null", O_RDWR);
1039         if (ttyfd >= 0) {
1040                 if (ttyfd != 0)
1041                         dup2(ttyfd, 0);
1042                 if (ttyfd != 1)
1043                         dup2(ttyfd, 1);
1044                 if (ttyfd != 2)
1045                         dup2(ttyfd, 2);
1046                 if (ttyfd > 2)
1047                         close(ttyfd);
1048         }
1049
1050         ttyfd = open("/dev/tty", O_RDWR);
1051         if (ttyfd >= 0) {
1052                 ioctl(ttyfd, TIOCNOTTY, 0);
1053                 close(ttyfd);
1054         }
1055         setsid();
1056
1057         /*
1058          * Second fork to disconnect ppid (the original parent waits for
1059          * us to exit).
1060          */
1061         if ((pid = fork()) < 0) {
1062                 _exit(2);
1063         }
1064         if (pid > 0)
1065                 _exit(0);
1066
1067         /*
1068          * The double child
1069          */
1070         setsid();
1071         pthread_create(&thread, NULL, func, arg);
1072         pthread_exit(NULL);
1073         _exit(2);       /* NOT REACHED */
1074 }