btools: Obsolete pre hostprog era cc_tools.
[dragonfly.git] / sbin / hammer2 / cmd_service.c
CommitLineData
9ab15106
MD
1/*
2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
17 * distribution.
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 * SUCH DAMAGE.
34 */
35
36#include "hammer2.h"
37
ce48a6c5 38#include <sys/xdiskioctl.h>
98126869 39#include <machine/atomic.h>
ce48a6c5 40
b53e7c4f
MD
41struct hammer2_media_config {
42 hammer2_volconf_t copy_run;
43 hammer2_volconf_t copy_pend;
44 pthread_t thread;
45 pthread_cond_t cond;
46 int ctl;
47 int fd;
48 int pipefd[2]; /* signal stop */
49 dmsg_iocom_t iocom;
50 pthread_t iocom_thread;
51 enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
52};
53
54typedef struct hammer2_media_config hammer2_media_config_t;
55
/* Request bits kept in hammer2_media_config.ctl */
#define H2CONFCTL_STOP		0x00000001	/* ends the volconf thread loop */
#define H2CONFCTL_UPDATE	0x00000002	/* copy_pend has been rewritten */
58
185ace93
MD
/*
 * One entry per disk that has been handed to a service thread; linked
 * on diskconq.
 */
struct diskcon {
	TAILQ_ENTRY(diskcon)	entry;	/* diskconq linkage */
	char			*disk;	/* strdup()'d disk name */
};
63
ce48a6c5
MD
64struct service_node_opaque {
65 char cl_label[64];
66 char fs_label[64];
67 dmsg_media_block_t block;
68 int attached;
69 int servicing;
70 int servicefd;
71};
72
2892d211
MD
/*
 * One host listed in the autoconn file, tracked by autoconn_thread().
 */
struct autoconn {
	TAILQ_ENTRY(autoconn)	entry;		/* autolist linkage */
	char			*host;		/* strdup()'d host name */
	int			stage;		/* 1 if seen in the latest scan */
	int			stopme;		/* asks the connect thread to exit */
	int			pipefd[2];	/* {read,write} */
	enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
	pthread_t		thread;		/* dmsg_master_service thread */
};
82
185ace93
MD
/* Separators used when splitting the kern.disks list */
#define WS " \r\n"

/* Disks currently handed to a service thread, protected by diskmtx */
TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);

/*
 * Statically initialize the mutexes; POSIX does not define the behavior
 * of locking a mutex that was never initialized.
 */
static pthread_mutex_t diskmtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t confmtx = PTHREAD_MUTEX_INITIALIZER;
185ace93 88
5bc5bca2
MD
89static void *service_thread(void *data);
90static void *udev_thread(void *data);
2892d211 91static void *autoconn_thread(void *data);
e1648a68 92static void master_reconnect(const char *mntpt);
185ace93
MD
93static void disk_reconnect(const char *disk);
94static void disk_disconnect(void *handle);
5bc5bca2 95static void udev_check_disks(void);
b53e7c4f 96static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged);
b53e7c4f
MD
97static void *hammer2_volconf_thread(void *info);
98static void hammer2_volconf_signal(dmsg_iocom_t *iocom);
99static void hammer2_volconf_start(hammer2_media_config_t *conf,
100 const char *hostname);
101static void hammer2_volconf_stop(hammer2_media_config_t *conf);
102
ce48a6c5 103
d30cab67 104static void xdisk_connect(void);
9ab15106
MD
105
106/*
de936870
MD
107 * Start-up the master listener daemon for the machine. This daemon runs
108 * a UDP discovery protocol, a TCP rendezvous, and scans certain files
109 * and directories for work.
9ab15106 110 *
de936870
MD
111 * --
112 *
113 * The only purpose for the UDP discovery protocol is to determine what
114 * other IPs on the LAN are running the hammer2 service daemon. DNS is not
115 * required to operate, but hostnames (if assigned) must be unique. If
116 * no hostname is assigned the host's IP is used as the name. This name
117 * is broadcast along with the mtime of the originator's private key.
118 *
119 * Receiving hammer2 service daemons which are able to match the label against
120 * /etc/hammer2/remote/<label>.pub will initiate a persistent connection
121 * to the target. Removal of the file will cause a disconnection. A failed
122 * public key negotiation stops further connection attempts until either the
123 * file is updated or the remote mtime is updated.
124 *
125 * Generally speaking this results in a web of connections, typically a
126 * combination of point-to-point for the more important links and relayed
127 * (spanning tree) for less important or filtered links.
128 *
129 * --
130 *
131 * The TCP listener serves as a rendezvous point in the cluster, accepting
9ab15106
MD
132 * connections, performing registrations and authentications, maintaining
133 * the spanning tree, and keeping track of message state so disconnects can
134 * be handled properly.
135 *
136 * Once authenticated only low-level messaging protocols (which includes
137 * tracking persistent messages) are handled by this daemon. This daemon
138 * does not run the higher level quorum or locking protocols.
139 *
de936870
MD
140 * --
141 *
142 * The file /etc/hammer2/autoconn, if it exists, contains a list of targets
143 * to connect to (which do not have to be on the local lan). This list will
144 * be retried until a connection can be established. The file is not usually
145 * needed for linkages local to the LAN.
9ab15106
MD
146 */
147int
62efe6ec 148cmd_service(void)
9ab15106
MD
149{
150 struct sockaddr_in lsin;
151 int on;
152 int lfd;
153
154 /*
155 * Acquire socket and set options
156 */
157 if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
62efe6ec 158 fprintf(stderr, "master_listen: socket(): %s\n",
9ab15106
MD
159 strerror(errno));
160 return 1;
161 }
162 on = 1;
163 setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
164
165 /*
166 * Setup listen port and try to bind. If the bind fails we assume
167 * that a master listener process is already running and silently
168 * fail.
169 */
170 bzero(&lsin, sizeof(lsin));
171 lsin.sin_family = AF_INET;
172 lsin.sin_addr.s_addr = INADDR_ANY;
0c3a8cd0 173 lsin.sin_port = htons(DMSG_LISTEN_PORT);
9ab15106
MD
174 if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
175 close(lfd);
9b8b748f
MD
176 if (QuietOpt == 0) {
177 fprintf(stderr,
178 "master listen: daemon already running\n");
179 }
9ab15106
MD
180 return 0;
181 }
9b8b748f
MD
182 if (QuietOpt == 0)
183 fprintf(stderr, "master listen: startup\n");
9ab15106
MD
184 listen(lfd, 50);
185
186 /*
187 * Fork and disconnect the controlling terminal and parent process,
188 * executing the specified function as a pthread.
189 *
190 * Returns to the original process which can then continue running.
191 * In debug mode this call will create the pthread without forking
192 * and set NormalExit to 0, instead of fork.
193 */
5bc5bca2 194 hammer2_demon(service_thread, (void *)(intptr_t)lfd);
9ab15106
MD
195 if (NormalExit)
196 close(lfd);
197 return 0;
198}
199
200/*
201 * Master listen/accept thread. Accept connections on the master socket,
202 * starting a pthread for each one.
203 */
204static
205void *
5bc5bca2 206service_thread(void *data)
9ab15106
MD
207{
208 struct sockaddr_in asin;
209 socklen_t alen;
210 pthread_t thread;
0c3a8cd0 211 dmsg_master_service_info_t *info;
9ab15106
MD
212 int lfd = (int)(intptr_t)data;
213 int fd;
e1648a68
MD
214 int i;
215 int count;
d30cab67 216 int opt;
e1648a68
MD
217 struct statfs *mntbuf = NULL;
218 struct statvfs *mntvbuf = NULL;
9ab15106
MD
219
220 /*
221 * Nobody waits for us
222 */
223 setproctitle("hammer2 master listen");
224 pthread_detach(pthread_self());
225
5bc5bca2 226 /*
d30cab67
MD
227 * Start up a thread to handle block device monitoring for
228 * export to the cluster.
5bc5bca2
MD
229 */
230 thread = NULL;
231 pthread_create(&thread, NULL, udev_thread, NULL);
232
d30cab67
MD
233 /*
234 * Start up a thread to tie /dev/xdisk into the cluster
235 * controller.
236 */
237 xdisk_connect();
238
2892d211
MD
239 /*
240 * Start thread to manage /etc/hammer2/autoconn
241 */
242 thread = NULL;
243 pthread_create(&thread, NULL, autoconn_thread, NULL);
244
e1648a68
MD
245 /*
246 * Scan existing hammer2 mounts and reconnect to them using
247 * HAMMER2IOC_RECLUSTER.
248 */
249 count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
250 for (i = 0; i < count; ++i) {
251 if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
252 master_reconnect(mntbuf[i].f_mntonname);
253 }
254
9ab15106
MD
255 /*
256 * Accept connections and create pthreads to handle them after
257 * validating the IP.
258 */
259 for (;;) {
260 alen = sizeof(asin);
261 fd = accept(lfd, (struct sockaddr *)&asin, &alen);
262 if (fd < 0) {
263 if (errno == EINTR)
264 continue;
265 break;
266 }
d30cab67
MD
267 opt = 1;
268 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof opt);
9ab15106 269 thread = NULL;
5bc5bca2 270 fprintf(stderr, "service_thread: accept fd %d\n", fd);
e1648a68
MD
271 info = malloc(sizeof(*info));
272 bzero(info, sizeof(*info));
273 info->fd = fd;
274 info->detachme = 1;
b53e7c4f 275 info->usrmsg_callback = hammer2_usrmsg_handler;
f306de83 276 info->label = strdup("client");
0c3a8cd0 277 pthread_create(&thread, NULL, dmsg_master_service, info);
9ab15106
MD
278 }
279 return (NULL);
280}
281
b53e7c4f
MD
282/*
283 * Handle/Monitor the dmsg stream. If unmanaged is set we are responsible
284 * for responding for the message, otherwise if it is not set libdmsg has
285 * already done some preprocessing and will respond to the message for us
286 * when we return.
287 *
288 * We primarily monitor for VOLCONFs
289 */
290static
291void
292hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged)
293{
294 dmsg_state_t *state;
295 hammer2_media_config_t *conf;
296 dmsg_lnk_hammer2_volconf_t *msgconf;
297 int i;
298
299 /*
300 * Only process messages which are part of a LNK_CONN stream
301 */
302 state = msg->state;
303 if (state == NULL ||
304 (state->rxcmd & DMSGF_BASECMDMASK) != DMSG_LNK_CONN) {
305 hammer2_shell_parse(msg, unmanaged);
306 return;
307 }
308
063639ba 309 switch(msg->tcmd) {
b53e7c4f
MD
310 case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
311 case DMSG_LNK_CONN | DMSGF_DELETE:
312 case DMSG_LNK_ERROR | DMSGF_DELETE:
313 /*
314 * Deleting connection, clean out all volume configs
315 */
316 if (state->media == NULL || state->media->usrhandle == NULL)
317 break;
318 conf = state->media->usrhandle;
319 fprintf(stderr, "Shutting down media spans\n");
320 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
321 if (conf[i].thread) {
322 conf[i].ctl = H2CONFCTL_STOP;
323 pthread_cond_signal(&conf[i].cond);
324 }
325 }
326 for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
327 if (conf[i].thread) {
328 pthread_join(conf[i].thread, NULL);
329 conf->thread = NULL;
330 pthread_cond_destroy(&conf[i].cond);
331 }
332 }
333 state->media->usrhandle = NULL;
334 free(conf);
335 break;
336 case DMSG_LNK_HAMMER2_VOLCONF:
337 /*
338 * One-way volume-configuration message is transmitted
339 * over the open LNK_CONN transaction.
340 */
341 fprintf(stderr, "RECEIVED VOLCONF\n");
342
343 if ((conf = state->media->usrhandle) == NULL) {
344 conf = malloc(sizeof(*conf) * HAMMER2_COPYID_COUNT);
345 bzero(conf, sizeof(*conf) * HAMMER2_COPYID_COUNT);
346 state->media->usrhandle = conf;
347 }
348 msgconf = H2_LNK_VOLCONF(msg);
349
350 if (msgconf->index < 0 ||
351 msgconf->index >= HAMMER2_COPYID_COUNT) {
352 fprintf(stderr,
353 "VOLCONF: ILLEGAL INDEX %d\n",
354 msgconf->index);
355 break;
356 }
357 if (msgconf->copy.path[sizeof(msgconf->copy.path) - 1] != 0 ||
358 msgconf->copy.path[0] == 0) {
359 fprintf(stderr,
360 "VOLCONF: ILLEGAL PATH %d\n",
361 msgconf->index);
362 break;
363 }
364 conf += msgconf->index;
365 pthread_mutex_lock(&confmtx);
366 conf->copy_pend = msgconf->copy;
367 conf->ctl |= H2CONFCTL_UPDATE;
368 pthread_mutex_unlock(&confmtx);
369 if (conf->thread == NULL) {
370 fprintf(stderr, "VOLCONF THREAD STARTED\n");
371 pthread_cond_init(&conf->cond, NULL);
372 pthread_create(&conf->thread, NULL,
373 hammer2_volconf_thread, (void *)conf);
374 }
375 pthread_cond_signal(&conf->cond);
376 break;
377 default:
378 if (unmanaged)
379 dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
380 break;
381 }
382}
383
384static void *
385hammer2_volconf_thread(void *info)
386{
387 hammer2_media_config_t *conf = info;
388
944ddad0
MD
389 setproctitle("hammer2 volconf");
390
b53e7c4f
MD
391 pthread_mutex_lock(&confmtx);
392 while ((conf->ctl & H2CONFCTL_STOP) == 0) {
393 if (conf->ctl & H2CONFCTL_UPDATE) {
394 fprintf(stderr, "VOLCONF UPDATE\n");
395 conf->ctl &= ~H2CONFCTL_UPDATE;
396 if (bcmp(&conf->copy_run, &conf->copy_pend,
397 sizeof(conf->copy_run)) == 0) {
398 fprintf(stderr, "VOLCONF: no changes\n");
399 continue;
400 }
401 /*
402 * XXX TODO - auto reconnect on lookup failure or
403 * connect failure or stream failure.
404 */
405
406 pthread_mutex_unlock(&confmtx);
407 hammer2_volconf_stop(conf);
408 conf->copy_run = conf->copy_pend;
409 if (conf->copy_run.copyid != 0 &&
410 strncmp(conf->copy_run.path, "span:", 5) == 0) {
411 hammer2_volconf_start(conf,
412 conf->copy_run.path + 5);
413 }
414 pthread_mutex_lock(&confmtx);
415 fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
416 }
417 if (conf->state == H2MC_CONNECT) {
418 hammer2_volconf_start(conf, conf->copy_run.path + 5);
419 pthread_mutex_unlock(&confmtx);
420 sleep(5);
421 pthread_mutex_lock(&confmtx);
422 } else {
423 pthread_cond_wait(&conf->cond, &confmtx);
424 }
425 }
426 pthread_mutex_unlock(&confmtx);
427 hammer2_volconf_stop(conf);
428 return(NULL);
429}
430
431static
432void
433hammer2_volconf_start(hammer2_media_config_t *conf, const char *hostname)
434{
435 dmsg_master_service_info_t *info;
436
437 switch(conf->state) {
438 case H2MC_STOPPED:
439 case H2MC_CONNECT:
440 conf->fd = dmsg_connect(hostname);
441 if (conf->fd < 0) {
442 fprintf(stderr, "Unable to connect to %s\n", hostname);
443 conf->state = H2MC_CONNECT;
444 } else if (pipe(conf->pipefd) < 0) {
445 close(conf->fd);
446 fprintf(stderr, "pipe() failed during volconf\n");
447 conf->state = H2MC_CONNECT;
448 } else {
449 fprintf(stderr, "VOLCONF CONNECT\n");
450 info = malloc(sizeof(*info));
451 bzero(info, sizeof(*info));
452 info->fd = conf->fd;
453 info->altfd = conf->pipefd[0];
454 info->altmsg_callback = hammer2_volconf_signal;
afa78c43 455 info->usrmsg_callback = hammer2_usrmsg_handler;
b53e7c4f
MD
456 info->detachme = 0;
457 conf->state = H2MC_RUNNING;
458 pthread_create(&conf->iocom_thread, NULL,
459 dmsg_master_service, info);
460 }
461 break;
462 case H2MC_RUNNING:
463 break;
464 }
465}
466
467static
468void
469hammer2_volconf_stop(hammer2_media_config_t *conf)
470{
471 switch(conf->state) {
472 case H2MC_STOPPED:
473 break;
474 case H2MC_CONNECT:
475 conf->state = H2MC_STOPPED;
476 break;
477 case H2MC_RUNNING:
478 close(conf->pipefd[1]);
479 conf->pipefd[1] = -1;
480 pthread_join(conf->iocom_thread, NULL);
481 conf->iocom_thread = NULL;
482 conf->state = H2MC_STOPPED;
483 break;
484 }
485}
486
487static
488void
489hammer2_volconf_signal(dmsg_iocom_t *iocom)
490{
491 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
492}
493
5bc5bca2
MD
494/*
495 * Monitor block devices. Currently polls every ~10 seconds or so.
496 */
497static
498void *
499udev_thread(void *data __unused)
500{
501 int fd;
502 int seq = 0;
503
504 pthread_detach(pthread_self());
944ddad0 505 setproctitle("hammer2 udev_thread");
5bc5bca2
MD
506
507 if ((fd = open(UDEV_DEVICE_PATH, O_RDWR)) < 0) {
508 fprintf(stderr, "udev_thread: unable to open \"%s\"\n",
509 UDEV_DEVICE_PATH);
510 pthread_exit(NULL);
511 }
512 udev_check_disks();
513 while (ioctl(fd, UDEVWAIT, &seq) == 0) {
514 udev_check_disks();
515 sleep(1);
516 }
517 return (NULL);
518}
519
2892d211
MD
520static void *autoconn_connect_thread(void *data);
521static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
522
523static
524void *
525autoconn_thread(void *data __unused)
526{
527 TAILQ_HEAD(, autoconn) autolist;
528 struct autoconn *ac;
529 struct autoconn *next;
530 pthread_t thread;
531 struct stat st;
532 time_t t;
533 time_t lmod;
534 int found_last;
535 FILE *fp;
536 char buf[256];
537
538 TAILQ_INIT(&autolist);
539 found_last = 0;
540 lmod = 0;
541
542 pthread_detach(pthread_self());
944ddad0 543 setproctitle("hammer2 autoconn_thread");
2892d211
MD
544 for (;;) {
545 /*
546 * Polling interval
547 */
548 sleep(5);
549
550 /*
551 * Poll the file. Loop up if the synchronized state (lmod)
552 * has not changed.
553 */
554 if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
555 if (lmod == st.st_mtime)
556 continue;
557 fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
558 if (fp == NULL)
559 continue;
560 } else {
561 if (lmod == 0)
562 continue;
563 fp = NULL;
564 }
565
566 /*
567 * Wait at least 5 seconds after the file is created or
568 * removed.
569 *
570 * Do not update the synchronized state.
571 */
572 if (fp == NULL && found_last) {
573 found_last = 0;
574 continue;
575 } else if (fp && found_last == 0) {
576 fclose(fp);
577 found_last = 1;
578 continue;
579 }
580
581 /*
582 * Don't scan the file until the time progresses past the
583 * file's mtime, so we can validate that the file was not
584 * further modified during our scan.
585 *
586 * Do not update the synchronized state.
587 */
588 time(&t);
589 if (fp) {
590 if (t == st.st_mtime) {
591 fclose(fp);
592 continue;
593 }
594 t = st.st_mtime;
595 } else {
596 t = 0;
597 }
598
599 /*
600 * Set staging to disconnect, then scan the file.
601 */
602 TAILQ_FOREACH(ac, &autolist, entry)
603 ac->stage = 0;
604 while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
605 char *host;
606
607 if ((host = strtok(buf, " \t\r\n")) == NULL ||
608 host[0] == '#') {
609 continue;
610 }
611 TAILQ_FOREACH(ac, &autolist, entry) {
612 if (strcmp(host, ac->host) == 0)
613 break;
614 }
615 if (ac == NULL) {
616 ac = malloc(sizeof(*ac));
617 bzero(ac, sizeof(*ac));
618 ac->host = strdup(host);
619 ac->state = AUTOCONN_INACTIVE;
620 TAILQ_INSERT_TAIL(&autolist, ac, entry);
621 }
622 ac->stage = 1;
623 }
624
625 /*
626 * Ignore the scan (and retry again) if the file was
627 * modified during the scan.
628 *
629 * Do not update the synchronized state.
630 */
631 if (fp) {
632 if (fstat(fileno(fp), &st) < 0) {
633 fclose(fp);
634 continue;
635 }
636 fclose(fp);
637 if (t != st.st_mtime)
638 continue;
639 }
640
641 /*
642 * Update the synchronized state and reconfigure the
643 * connect list as needed.
644 */
645 lmod = t;
646 next = TAILQ_FIRST(&autolist);
647 while ((ac = next) != NULL) {
648 next = TAILQ_NEXT(ac, entry);
649
650 /*
651 * Staging, initiate
652 */
653 if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
654 if (pipe(ac->pipefd) == 0) {
655 ac->stopme = 0;
656 ac->state = AUTOCONN_ACTIVE;
657 thread = NULL;
658 pthread_create(&thread, NULL,
659 autoconn_connect_thread,
660 ac);
661 }
662 }
663
664 /*
665 * Unstaging, stop active connection.
98126869
MD
666 *
667 * We write to the pipe which causes the iocom_core
668 * to call autoconn_disconnect_signal().
2892d211
MD
669 */
670 if (ac->stage == 0 &&
671 ac->state == AUTOCONN_ACTIVE) {
672 if (ac->stopme == 0) {
98126869 673 char dummy = 0;
2892d211 674 ac->stopme = 1;
98126869 675 write(ac->pipefd[1], &dummy, 1);
2892d211
MD
676 }
677 }
678
679 /*
680 * Unstaging, delete inactive connection.
681 */
682 if (ac->stage == 0 &&
683 ac->state == AUTOCONN_INACTIVE) {
684 TAILQ_REMOVE(&autolist, ac, entry);
685 free(ac->host);
686 free(ac);
687 continue;
688 }
689 }
690 sleep(5);
691 }
692 return(NULL);
693}
694
695static
696void *
697autoconn_connect_thread(void *data)
698{
699 dmsg_master_service_info_t *info;
700 struct autoconn *ac;
701 void *res;
702 int fd;
703
704 ac = data;
705 pthread_detach(pthread_self());
944ddad0 706 setproctitle("hammer2 dmsg");
2892d211
MD
707
708 while (ac->stopme == 0) {
709 fd = dmsg_connect(ac->host);
710 if (fd < 0) {
5ab1caed
MD
711 if (DMsgDebugOpt > 2) {
712 fprintf(stderr,
713 "autoconn: Connect failure: %s\n",
714 ac->host);
715 }
2892d211
MD
716 sleep(5);
717 continue;
718 }
719 fprintf(stderr, "autoconn: Connect %s\n", ac->host);
720
721 info = malloc(sizeof(*info));
722 bzero(info, sizeof(*info));
723 info->fd = fd;
724 info->altfd = ac->pipefd[0];
725 info->altmsg_callback = autoconn_disconnect_signal;
afa78c43 726 info->usrmsg_callback = hammer2_usrmsg_handler;
2892d211 727 info->detachme = 0;
98126869 728 info->noclosealt = 1;
2892d211
MD
729 pthread_create(&ac->thread, NULL, dmsg_master_service, info);
730 pthread_join(ac->thread, &res);
731 }
732 close(ac->pipefd[0]);
733 ac->state = AUTOCONN_INACTIVE;
734 /* auto structure can be ripped out here */
735 return(NULL);
736}
737
738static
739void
740autoconn_disconnect_signal(dmsg_iocom_t *iocom)
741{
742 fprintf(stderr, "autoconn: Shutting down socket\n");
98126869 743 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
2892d211
MD
744}
745
5bc5bca2
MD
746/*
747 * Retrieve the list of disk attachments and attempt to export
748 * them.
749 */
750static
751void
752udev_check_disks(void)
753{
754 char tmpbuf[1024];
755 char *buf = NULL;
185ace93 756 char *disk;
5bc5bca2
MD
757 int error;
758 size_t n;
759
760 for (;;) {
761 n = 0;
762 error = sysctlbyname("kern.disks", NULL, &n, NULL, 0);
763 if (error < 0 || n == 0)
764 break;
765 if (n >= sizeof(tmpbuf))
766 buf = malloc(n + 1);
767 else
768 buf = tmpbuf;
769 error = sysctlbyname("kern.disks", buf, &n, NULL, 0);
770 if (error == 0) {
771 buf[n] = 0;
772 break;
773 }
774 if (buf != tmpbuf) {
775 free(buf);
776 buf = NULL;
777 }
778 if (errno != ENOMEM)
779 break;
780 }
781 if (buf) {
782 fprintf(stderr, "DISKS: %s\n", buf);
185ace93
MD
783 for (disk = strtok(buf, WS); disk; disk = strtok(NULL, WS)) {
784 disk_reconnect(disk);
785 }
5bc5bca2
MD
786 if (buf != tmpbuf)
787 free(buf);
788 }
789}
790
e1648a68
MD
791/*
792 * Normally the mount program supplies a cluster communications
793 * descriptor to the hammer2 vfs on mount, but if you kill the service
794 * daemon and restart it that link will be lost.
795 *
796 * This procedure attempts to [re]connect to existing mounts when
797 * the service daemon is started up before going into its accept
798 * loop.
eae0d690
MD
799 *
800 * NOTE: A hammer2 mount point can only accomodate one connection at a time
801 * so this will disconnect any existing connection during the
802 * reconnect.
e1648a68
MD
803 */
804static
805void
806master_reconnect(const char *mntpt)
807{
808 struct hammer2_ioc_recluster recls;
0c3a8cd0 809 dmsg_master_service_info_t *info;
e1648a68
MD
810 pthread_t thread;
811 int fd;
812 int pipefds[2];
813
814 fd = open(mntpt, O_RDONLY);
815 if (fd < 0) {
816 fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
817 return;
818 }
819 if (pipe(pipefds) < 0) {
820 fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
2063f4d7 821 close(fd);
e1648a68
MD
822 return;
823 }
824 bzero(&recls, sizeof(recls));
825 recls.fd = pipefds[0];
826 if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
827 fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
828 close(pipefds[0]);
829 close(pipefds[1]);
830 close(fd);
831 return;
832 }
833 close(pipefds[0]);
2063f4d7 834 close(fd);
e1648a68
MD
835
836 info = malloc(sizeof(*info));
837 bzero(info, sizeof(*info));
838 info->fd = pipefds[1];
839 info->detachme = 1;
b53e7c4f 840 info->usrmsg_callback = hammer2_usrmsg_handler;
f306de83 841 info->label = strdup("hammer2");
0c3a8cd0 842 pthread_create(&thread, NULL, dmsg_master_service, info);
9ab15106 843}
185ace93
MD
844
845/*
ce48a6c5 846 * Reconnect a physical disk service to the mesh.
185ace93
MD
847 */
848static
849void
850disk_reconnect(const char *disk)
851{
852 struct disk_ioc_recluster recls;
853 struct diskcon *dc;
854 dmsg_master_service_info_t *info;
855 pthread_t thread;
856 int fd;
857 int pipefds[2];
858 char *path;
859
860 /*
861 * Urm, this will auto-create mdX+1, just ignore for now.
862 * This mechanic needs to be fixed. It might actually be nice
863 * to be able to export md disks.
864 */
865 if (strncmp(disk, "md", 2) == 0)
866 return;
ce48a6c5
MD
867 if (strncmp(disk, "xa", 2) == 0)
868 return;
185ace93
MD
869
870 /*
871 * Check if already connected
872 */
873 pthread_mutex_lock(&diskmtx);
874 TAILQ_FOREACH(dc, &diskconq, entry) {
875 if (strcmp(dc->disk, disk) == 0)
876 break;
877 }
878 pthread_mutex_unlock(&diskmtx);
879 if (dc)
880 return;
881
882 /*
883 * Not already connected, create a connection to the kernel
884 * disk driver.
885 */
886 asprintf(&path, "/dev/%s", disk);
887 fd = open(path, O_RDONLY);
888 if (fd < 0) {
889 fprintf(stderr, "reconnect %s: no access to disk\n", disk);
890 free(path);
891 return;
892 }
893 free(path);
894 if (pipe(pipefds) < 0) {
895 fprintf(stderr, "reconnect %s: pipe() failed\n", disk);
896 close(fd);
897 return;
898 }
899 bzero(&recls, sizeof(recls));
900 recls.fd = pipefds[0];
901 if (ioctl(fd, DIOCRECLUSTER, &recls) < 0) {
902 fprintf(stderr, "reconnect %s: ioctl failed\n", disk);
903 close(pipefds[0]);
904 close(pipefds[1]);
905 close(fd);
906 return;
907 }
908 close(pipefds[0]);
909 close(fd);
910
911 dc = malloc(sizeof(*dc));
912 dc->disk = strdup(disk);
913 pthread_mutex_lock(&diskmtx);
914 TAILQ_INSERT_TAIL(&diskconq, dc, entry);
915 pthread_mutex_unlock(&diskmtx);
916
917 info = malloc(sizeof(*info));
918 bzero(info, sizeof(*info));
919 info->fd = pipefds[1];
920 info->detachme = 1;
b53e7c4f 921 info->usrmsg_callback = hammer2_usrmsg_handler;
185ace93
MD
922 info->exit_callback = disk_disconnect;
923 info->handle = dc;
f306de83 924 info->label = strdup(dc->disk);
185ace93
MD
925 pthread_create(&thread, NULL, dmsg_master_service, info);
926}
927
928static
929void
930disk_disconnect(void *handle)
931{
932 struct diskcon *dc = handle;
933
934 fprintf(stderr, "DISK_DISCONNECT %s\n", dc->disk);
935
936 pthread_mutex_lock(&diskmtx);
937 TAILQ_REMOVE(&diskconq, dc, entry);
938 pthread_mutex_unlock(&diskmtx);
939 free(dc->disk);
940 free(dc);
941}
ce48a6c5
MD
942
943/*
d30cab67
MD
944 * Connect our cluster controller to /dev/xdisk. xdisk will pick up
945 * SPAN messages that we route to it, makes remote block devices
946 * available to the host, and can issue dmsg transactions based on
947 * device requests.
ce48a6c5
MD
948 */
949static
950void
d30cab67 951xdisk_connect(void)
ce48a6c5 952{
ce48a6c5 953 dmsg_master_service_info_t *info;
d30cab67 954 struct xdisk_attach_ioctl xaioc;
ce48a6c5
MD
955 pthread_t thread;
956 int pipefds[2];
d30cab67
MD
957 int xfd;
958 int error;
959
960 /*
961 * Is /dev/xdisk available?
962 */
963 xfd = open("/dev/xdisk", O_RDWR, 0600);
964 if (xfd < 0) {
965 fprintf(stderr, "xdisk_connect: Unable to open /dev/xdisk\n");
966 return;
967 }
ce48a6c5
MD
968
969 if (pipe(pipefds) < 0) {
d30cab67 970 fprintf(stderr, "xdisk_connect: pipe() failed\n");
ce48a6c5
MD
971 return;
972 }
973
d30cab67
MD
974 /*
975 * Pipe between cluster controller (this user process).
976 */
ce48a6c5
MD
977 info = malloc(sizeof(*info));
978 bzero(info, sizeof(*info));
979 info->fd = pipefds[1];
980 info->detachme = 1;
b53e7c4f 981 info->usrmsg_callback = hammer2_usrmsg_handler;
d30cab67 982 info->exit_callback = NULL;
ce48a6c5
MD
983 pthread_create(&thread, NULL, dmsg_master_service, info);
984
985 /*
d30cab67 986 * And the xdisk device.
ce48a6c5 987 */
d30cab67
MD
988 bzero(&xaioc, sizeof(xaioc));
989 xaioc.fd = pipefds[0];
990 error = ioctl(xfd, XDISKIOCATTACH, &xaioc);
991 close(pipefds[0]);
992 close(xfd);
ce48a6c5 993
d30cab67
MD
994 if (error < 0) {
995 fprintf(stderr,
996 "xdisk_connect: cannot attach %s\n",
997 strerror(errno));
998 return;
ce48a6c5 999 }
ce48a6c5 1000}