Initial import of binutils 2.22 on the new vendor branch
[dragonfly.git] / contrib / lvm2 / dist / daemons / clvmd / clvmd.c
1 /*      $NetBSD: clvmd.c,v 1.1.1.3 2009/12/02 00:27:05 haad Exp $       */
2
3 /*
4  * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved.
5  * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
6  *
7  * This file is part of LVM2.
8  *
9  * This copyrighted material is made available to anyone wishing to use,
10  * modify, copy, or redistribute it subject to the terms and conditions
11  * of the GNU General Public License v.2.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software Foundation,
15  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16  */
17
18 /*
19  * CLVMD: Cluster LVM daemon
20  */
21
22 #define _GNU_SOURCE
23 #define _FILE_OFFSET_BITS 64
24
25 #include <configure.h>
26 #include <libdevmapper.h>
27
28 #include <pthread.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <sys/socket.h>
32 #include <sys/uio.h>
33 #include <sys/un.h>
34 #include <sys/time.h>
35 #include <sys/ioctl.h>
36 #include <sys/utsname.h>
37 #include <netinet/in.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <stddef.h>
41 #include <stdarg.h>
42 #include <signal.h>
43 #include <unistd.h>
44 #include <fcntl.h>
45 #include <getopt.h>
46 #include <syslog.h>
47 #include <errno.h>
48 #include <limits.h>
49 #ifdef HAVE_COROSYNC_CONFDB_H
50 #include <corosync/confdb.h>
51 #endif
52
53 #include "clvmd-comms.h"
54 #include "lvm-functions.h"
55 #include "clvm.h"
56 #include "lvm-version.h"
57 #include "clvmd.h"
58 #include "refresh_clvmd.h"
59 #include "lvm-logging.h"
60
61 #ifndef TRUE
62 #define TRUE 1
63 #endif
64 #ifndef FALSE
65 #define FALSE 0
66 #endif
67
68 #define MAX_RETRIES 4
69
70 #define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0)
71
72 /* Head of the fd list. Also contains
73    the cluster_socket details */
74 static struct local_client local_client_head;
75
76 static unsigned short global_xid = 0;   /* Last transaction ID issued */
77
78 struct cluster_ops *clops = NULL;
79
80 static char our_csid[MAX_CSID_LEN];
81 static unsigned max_csid_len;
82 static unsigned max_cluster_message;
83 static unsigned max_cluster_member_name_len;
84
85 /* Structure of items on the LVM thread list (the dm_list headed by
 * lvm_cmd_head). NOTE(review): the code that fills these fields is outside
 * this view (presumably add_to_lvmqueue()); field notes below that go beyond
 * the declarations are assumptions to confirm. */
86 struct lvm_thread_cmd {
87         struct dm_list list;    /* List linkage */
88
89         struct local_client *client;    /* Client the queued item belongs to */
90         struct clvm_header *msg;        /* Message to process; main_loop queues NULL here to request client cleanup */
91         char csid[MAX_CSID_LEN];        /* Cluster node ID associated with the item - presumably the sender; TODO confirm */
92         int remote;             /* Flag */
93         int msglen;                     /* Presumably the length of msg in bytes - TODO confirm */
94         unsigned short xid;             /* Presumably a transaction ID (cf. global_xid) - TODO confirm */
95 };
96
97 debug_t debug;
98 static pthread_t lvm_thread;
99 static pthread_mutex_t lvm_thread_mutex;
100 static pthread_cond_t lvm_thread_cond;
101 static pthread_mutex_t lvm_start_mutex;
102 static struct dm_list lvm_cmd_head;
103 static volatile sig_atomic_t quit = 0;
104 static volatile sig_atomic_t reread_config = 0;
105 static int child_pipe[2];
106
107 /* Reasons the daemon failed initialisation */
108 #define DFAIL_INIT       1
109 #define DFAIL_LOCAL_SOCK 2
110 #define DFAIL_CLUSTER_IF 3
111 #define DFAIL_MALLOC     4
112 #define DFAIL_TIMEOUT    5
113 #define SUCCESS          0
114
115 typedef enum {IF_AUTO, IF_CMAN, IF_GULM, IF_OPENAIS, IF_COROSYNC} if_type_t;
116
117 typedef void *(lvm_pthread_fn_t)(void*);
118
119 /* Prototypes for code further down */
120 static void sigusr2_handler(int sig);
121 static void sighup_handler(int sig);
122 static void sigterm_handler(int sig);
123 static void send_local_reply(struct local_client *client, int status,
124                              int clientid);
125 static void free_reply(struct local_client *client);
126 static void send_version_message(void);
127 static void *pre_and_post_thread(void *arg);
128 static int send_message(void *buf, int msglen, const char *csid, int fd,
129                         const char *errtext);
130 static int read_from_local_sock(struct local_client *thisfd);
131 static int process_local_command(struct clvm_header *msg, int msglen,
132                                  struct local_client *client,
133                                  unsigned short xid);
134 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
135                                    const char *csid);
136 static int process_reply(const struct clvm_header *msg, int msglen,
137                          const char *csid);
138 static int open_local_sock(void);
139 static int check_local_clvmd(void);
140 static struct local_client *find_client(int clientid);
141 static void main_loop(int local_sock, int cmd_timeout);
142 static void be_daemon(int start_timeout);
143 static int check_all_clvmds_running(struct local_client *client);
144 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
145                                      int len, const char *csid,
146                                      struct local_client **new_client);
147 static void lvm_thread_fn(void *) __attribute__ ((noreturn));
148 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
149                            int msglen, const char *csid);
150 static int distribute_command(struct local_client *thisfd);
151 static void hton_clvm(struct clvm_header *hdr);
152 static void ntoh_clvm(struct clvm_header *hdr);
153 static void add_reply_to_list(struct local_client *client, int status,
154                               const char *csid, const char *buf, int len);
155 static if_type_t parse_cluster_interface(char *ifname);
156 static if_type_t get_cluster_type(void);
157
/*
 * Print the command-line help text for clvmd.
 *
 * prog is the program name shown in the synopsis line; file is the stream
 * the text is written to. The "Available cluster managers" line lists only
 * the back ends selected at build time through the USE_* macros.
 */
static void usage(char *prog, FILE *file)
{
	/* Everything after the synopsis line is fixed text */
	static const char option_text[] =
		"\n"
		"   -V       Show version of clvmd\n"
		"   -h       Show this help information\n"
		"   -d       Set debug level\n"
		"            If starting clvmd then don't fork, run in the foreground\n"
		"   -R       Tell all running clvmds in the cluster to reload their device cache\n"
		"   -C       Sets debug level (from -d) on all clvmd instances clusterwide\n"
		"   -t<secs> Command timeout (default 60 seconds)\n"
		"   -T<secs> Startup timeout (default none)\n"
		"   -I<cmgr> Cluster manager (default: auto)\n"
		"            Available cluster managers: "
#ifdef USE_COROSYNC
		"corosync "
#endif
#ifdef USE_CMAN
		"cman "
#endif
#ifdef USE_OPENAIS
		"openais "
#endif
#ifdef USE_GULM
		"gulm "
#endif
		"\n";

	fprintf(file, "Usage:\n%s [Vhd]\n", prog);
	fputs(option_text, file);
}
187
188 /* Called to signal the parent how well we got on during initialisation */
189 static void child_init_signal(int status)
190 {
191         if (child_pipe[1]) {
192                 write(child_pipe[1], &status, sizeof(status));
193                 close(child_pipe[1]);
194         }
195         if (status)
196                 exit(status);
197 }
198
199
200 void debuglog(const char *fmt, ...)
201 {
202         time_t P;
203         va_list ap;
204         static int syslog_init = 0;
205
206         if (debug == DEBUG_STDERR) {
207                 va_start(ap,fmt);
208                 time(&P);
209                 fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime(&P)+4 );
210                 vfprintf(stderr, fmt, ap);
211                 va_end(ap);
212         }
213         if (debug == DEBUG_SYSLOG) {
214                 if (!syslog_init) {
215                         openlog("clvmd", LOG_PID, LOG_DAEMON);
216                         syslog_init = 1;
217                 }
218
219                 va_start(ap,fmt);
220                 vsyslog(LOG_DEBUG, fmt, ap);
221                 va_end(ap);
222         }
223 }
224
225 static const char *decode_cmd(unsigned char cmdl)
226 {
227         static char buf[128];
228         const char *command;
229
230         switch (cmdl) {
231         case CLVMD_CMD_TEST:
232                 command = "TEST";
233                 break;
234         case CLVMD_CMD_LOCK_VG:
235                 command = "LOCK_VG";
236                 break;
237         case CLVMD_CMD_LOCK_LV:
238                 command = "LOCK_LV";
239                 break;
240         case CLVMD_CMD_REFRESH:
241                 command = "REFRESH";
242                 break;
243         case CLVMD_CMD_SET_DEBUG:
244                 command = "SET_DEBUG";
245                 break;
246         case CLVMD_CMD_GET_CLUSTERNAME:
247                 command = "GET_CLUSTERNAME";
248                 break;
249         case CLVMD_CMD_VG_BACKUP:
250                 command = "VG_BACKUP";
251                 break;
252         case CLVMD_CMD_REPLY:
253                 command = "REPLY";
254                 break;
255         case CLVMD_CMD_VERSION:
256                 command = "VERSION";
257                 break;
258         case CLVMD_CMD_GOAWAY:
259                 command = "GOAWAY";
260                 break;
261         case CLVMD_CMD_LOCK:
262                 command = "LOCK";
263                 break;
264         case CLVMD_CMD_UNLOCK:
265                 command = "UNLOCK";
266                 break;
267         case CLVMD_CMD_LOCK_QUERY:
268                 command = "LOCK_QUERY";
269                 break;
270         default:
271                 command = "unknown";
272                 break;
273         }
274
275         sprintf(buf, "%s (0x%x)", command, cmdl);
276
277         return buf;
278 }
279
280 int main(int argc, char *argv[])
281 {
282         int local_sock;
283         struct local_client *newfd;
284         struct utsname nodeinfo;
285         signed char opt;
286         int cmd_timeout = DEFAULT_CMD_TIMEOUT;
287         int start_timeout = 0;
288         if_type_t cluster_iface = IF_AUTO;
289         sigset_t ss;
290         int using_gulm = 0;
291         int debug_opt = 0;
292         int clusterwide_opt = 0;
293
294         /* Deal with command-line arguments */
295         opterr = 0;
296         optind = 0;
297         while ((opt = getopt(argc, argv, "?vVhd::t:RT:CI:")) != EOF) {
298                 switch (opt) {
299                 case 'h':
300                         usage(argv[0], stdout);
301                         exit(0);
302
303                 case '?':
304                         usage(argv[0], stderr);
305                         exit(0);
306
307                 case 'R':
308                         return refresh_clvmd()==1?0:1;
309
310                 case 'C':
311                         clusterwide_opt = 1;
312                         break;
313
314                 case 'd':
315                         debug_opt = 1;
316                         if (optarg)
317                                 debug = atoi(optarg);
318                         else
319                                 debug = DEBUG_STDERR;
320                         break;
321
322                 case 't':
323                         cmd_timeout = atoi(optarg);
324                         if (!cmd_timeout) {
325                                 fprintf(stderr, "command timeout is invalid\n");
326                                 usage(argv[0], stderr);
327                                 exit(1);
328                         }
329                         break;
330                 case 'I':
331                         cluster_iface = parse_cluster_interface(optarg);
332                         break;
333                 case 'T':
334                         start_timeout = atoi(optarg);
335                         if (start_timeout <= 0) {
336                                 fprintf(stderr, "startup timeout is invalid\n");
337                                 usage(argv[0], stderr);
338                                 exit(1);
339                         }
340                         break;
341
342                 case 'V':
343                         printf("Cluster LVM daemon version: %s\n", LVM_VERSION);
344                         printf("Protocol version:           %d.%d.%d\n",
345                                CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION,
346                                CLVMD_PATCH_VERSION);
347                         exit(1);
348                         break;
349
350                 }
351         }
352
353         /* Setting debug options on an existing clvmd */
354         if (debug_opt && !check_local_clvmd()) {
355
356                 /* Sending to stderr makes no sense for a detached daemon */
357                 if (debug == DEBUG_STDERR)
358                         debug = DEBUG_SYSLOG;
359                 return debug_clvmd(debug, clusterwide_opt)==1?0:1;
360         }
361
362         /* Fork into the background (unless requested not to) */
363         if (debug != DEBUG_STDERR) {
364                 be_daemon(start_timeout);
365         }
366
367         DEBUGLOG("CLVMD started\n");
368
369         /* Open the Unix socket we listen for commands on.
370            We do this before opening the cluster socket so that
371            potential clients will block rather than error if we are running
372            but the cluster is not ready yet */
373         local_sock = open_local_sock();
374         if (local_sock < 0)
375                 child_init_signal(DFAIL_LOCAL_SOCK);
376
377         /* Set up signal handlers, USR1 is for cluster change notifications (in cman)
378            USR2 causes child threads to exit.
379            HUP causes gulm version to re-read nodes list from CCS.
380            PIPE should be ignored */
381         signal(SIGUSR2, sigusr2_handler);
382         signal(SIGHUP,  sighup_handler);
383         signal(SIGPIPE, SIG_IGN);
384
385         /* Block SIGUSR2/SIGINT/SIGTERM in process */
386         sigemptyset(&ss);
387         sigaddset(&ss, SIGUSR2);
388         sigaddset(&ss, SIGINT);
389         sigaddset(&ss, SIGTERM);
390         sigprocmask(SIG_BLOCK, &ss, NULL);
391
392         /* Initialise the LVM thread variables */
393         dm_list_init(&lvm_cmd_head);
394         pthread_mutex_init(&lvm_thread_mutex, NULL);
395         pthread_cond_init(&lvm_thread_cond, NULL);
396         pthread_mutex_init(&lvm_start_mutex, NULL);
397         init_lvhash();
398
399         /* Start the cluster interface */
400         if (cluster_iface == IF_AUTO)
401                 cluster_iface = get_cluster_type();
402
403 #ifdef USE_CMAN
404         if ((cluster_iface == IF_AUTO || cluster_iface == IF_CMAN) && (clops = init_cman_cluster())) {
405                 max_csid_len = CMAN_MAX_CSID_LEN;
406                 max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE;
407                 max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN;
408                 syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN");
409         }
410 #endif
411 #ifdef USE_GULM
412         if (!clops)
413                 if ((cluster_iface == IF_AUTO || cluster_iface == IF_GULM) && (clops = init_gulm_cluster())) {
414                         max_csid_len = GULM_MAX_CSID_LEN;
415                         max_cluster_message = GULM_MAX_CLUSTER_MESSAGE;
416                         max_cluster_member_name_len = GULM_MAX_CLUSTER_MEMBER_NAME_LEN;
417                         using_gulm = 1;
418                         syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to GULM");
419                 }
420 #endif
421 #ifdef USE_COROSYNC
422         if (!clops)
423                 if (((cluster_iface == IF_AUTO || cluster_iface == IF_COROSYNC) && (clops = init_corosync_cluster()))) {
424                         max_csid_len = COROSYNC_CSID_LEN;
425                         max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE;
426                         max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN;
427                         syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync");
428                 }
429 #endif
430 #ifdef USE_OPENAIS
431         if (!clops)
432                 if ((cluster_iface == IF_AUTO || cluster_iface == IF_OPENAIS) && (clops = init_openais_cluster())) {
433                         max_csid_len = OPENAIS_CSID_LEN;
434                         max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE;
435                         max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN;
436                         syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS");
437                 }
438 #endif
439
440         if (!clops) {
441                 DEBUGLOG("Can't initialise cluster interface\n");
442                 log_error("Can't initialise cluster interface\n");
443                 child_init_signal(DFAIL_CLUSTER_IF);
444         }
445         DEBUGLOG("Cluster ready, doing some more initialisation\n");
446
447         /* Save our CSID */
448         uname(&nodeinfo);
449         clops->get_our_csid(our_csid);
450
451         /* Initialise the FD list head */
452         local_client_head.fd = clops->get_main_cluster_fd();
453         local_client_head.type = CLUSTER_MAIN_SOCK;
454         local_client_head.callback = clops->cluster_fd_callback;
455
456         /* Add the local socket to the list */
457         newfd = malloc(sizeof(struct local_client));
458         if (!newfd)
459                 child_init_signal(DFAIL_MALLOC);
460
461         newfd->fd = local_sock;
462         newfd->removeme = 0;
463         newfd->type = LOCAL_RENDEZVOUS;
464         newfd->callback = local_rendezvous_callback;
465         newfd->next = local_client_head.next;
466         local_client_head.next = newfd;
467
468         /* This needs to be started after cluster initialisation
469            as it may need to take out locks */
470         DEBUGLOG("starting LVM thread\n");
471
472         /* Don't let anyone else to do work until we are started */
473         pthread_mutex_lock(&lvm_start_mutex);
474         pthread_create(&lvm_thread, NULL, (lvm_pthread_fn_t*)lvm_thread_fn,
475                         (void *)(long)using_gulm);
476
477         /* Tell the rest of the cluster our version number */
478         /* CMAN can do this immediately, gulm needs to wait until
479            the core initialisation has finished and the node list
480            has been gathered */
481         if (clops->cluster_init_completed)
482                 clops->cluster_init_completed();
483
484         DEBUGLOG("clvmd ready for work\n");
485         child_init_signal(SUCCESS);
486
487         /* Try to shutdown neatly */
488         signal(SIGTERM, sigterm_handler);
489         signal(SIGINT, sigterm_handler);
490
491         /* Do some work */
492         main_loop(local_sock, cmd_timeout);
493
494         destroy_lvm();
495
496         return 0;
497 }
498
/*
 * Hook for the cluster layer to call once its own initialisation is done
 * (the GuLM back end is the one that needs this). All it does is send our
 * version message.
 */
void clvmd_cluster_init_completed()
{
	send_version_message();
}
505
/*
 * fd callback for an accepted local client connection that has data.
 *
 * The work is done by read_from_local_sock(), whose result is returned
 * unchanged. buf, len and csid are unused here; *new_client is always set
 * to NULL.
 */
static int local_sock_callback(struct local_client *thisfd, char *buf, int len,
			       const char *csid,
			       struct local_client **new_client)
{
	/* Reading from an existing connection never creates a client */
	*new_client = NULL;

	return read_from_local_sock(thisfd);
}
514
515 /* Data on a connected socket */
516 static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
517                                      int len, const char *csid,
518                                      struct local_client **new_client)
519 {
520         /* Someone connected to our local socket, accept it. */
521
522         struct sockaddr_un socka;
523         struct local_client *newfd;
524         socklen_t sl = sizeof(socka);
525         int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl);
526
527         if (client_fd == -1 && errno == EINTR)
528                 return 1;
529
530         if (client_fd >= 0) {
531                 newfd = malloc(sizeof(struct local_client));
532                 if (!newfd) {
533                         close(client_fd);
534                         return 1;
535                 }
536                 newfd->fd = client_fd;
537                 newfd->type = LOCAL_SOCK;
538                 newfd->xid = 0;
539                 newfd->removeme = 0;
540                 newfd->callback = local_sock_callback;
541                 newfd->bits.localsock.replies = NULL;
542                 newfd->bits.localsock.expected_replies = 0;
543                 newfd->bits.localsock.cmd = NULL;
544                 newfd->bits.localsock.in_progress = FALSE;
545                 newfd->bits.localsock.sent_out = FALSE;
546                 newfd->bits.localsock.threadid = 0;
547                 newfd->bits.localsock.finished = 0;
548                 newfd->bits.localsock.pipe_client = NULL;
549                 newfd->bits.localsock.private = NULL;
550                 newfd->bits.localsock.all_success = 1;
551                 DEBUGLOG("Got new connection on fd %d\n", newfd->fd);
552                 *new_client = newfd;
553         }
554         return 1;
555 }
556
557 static int local_pipe_callback(struct local_client *thisfd, char *buf,
558                                int maxlen, const char *csid,
559                                struct local_client **new_client)
560 {
561         int len;
562         char buffer[PIPE_BUF];
563         struct local_client *sock_client = thisfd->bits.pipe.client;
564         int status = -1;        /* in error by default */
565
566         len = read(thisfd->fd, buffer, sizeof(int));
567         if (len == -1 && errno == EINTR)
568                 return 1;
569
570         if (len == sizeof(int)) {
571                 memcpy(&status, buffer, sizeof(int));
572         }
573
574         DEBUGLOG("read on PIPE %d: %d bytes: status: %d\n",
575                  thisfd->fd, len, status);
576
577         /* EOF on pipe or an error, close it */
578         if (len <= 0) {
579                 int jstat;
580                 void *ret = &status;
581                 close(thisfd->fd);
582
583                 /* Clear out the cross-link */
584                 if (thisfd->bits.pipe.client != NULL)
585                         thisfd->bits.pipe.client->bits.localsock.pipe_client =
586                             NULL;
587
588                 /* Reap child thread */
589                 if (thisfd->bits.pipe.threadid) {
590                         jstat = pthread_join(thisfd->bits.pipe.threadid, &ret);
591                         thisfd->bits.pipe.threadid = 0;
592                         if (thisfd->bits.pipe.client != NULL)
593                                 thisfd->bits.pipe.client->bits.localsock.
594                                     threadid = 0;
595                 }
596                 return -1;
597         } else {
598                 DEBUGLOG("background routine status was %d, sock_client=%p\n",
599                          status, sock_client);
600                 /* But has the client gone away ?? */
601                 if (sock_client == NULL) {
602                         DEBUGLOG
603                             ("Got PIPE response for dead client, ignoring it\n");
604                 } else {
605                         /* If error then just return that code */
606                         if (status)
607                                 send_local_reply(sock_client, status,
608                                                  sock_client->fd);
609                         else {
610                                 if (sock_client->bits.localsock.state ==
611                                     POST_COMMAND) {
612                                         send_local_reply(sock_client, 0,
613                                                          sock_client->fd);
614                                 } else  // PRE_COMMAND finished.
615                                 {
616                                         if (
617                                             (status =
618                                              distribute_command(sock_client)) !=
619                                             0) send_local_reply(sock_client,
620                                                                 EFBIG,
621                                                                 sock_client->
622                                                                 fd);
623                                 }
624                         }
625                 }
626         }
627         return len;
628 }
629
630 /* If a noed is up, look for it in the reply array, if it's not there then
631    add one with "ETIMEDOUT".
632    NOTE: This won't race with real replies because they happen in the same thread.
633 */
634 static void timedout_callback(struct local_client *client, const char *csid,
635                               int node_up)
636 {
637         if (node_up) {
638                 struct node_reply *reply;
639                 char nodename[max_cluster_member_name_len];
640
641                 clops->name_from_csid(csid, nodename);
642                 DEBUGLOG("Checking for a reply from %s\n", nodename);
643                 pthread_mutex_lock(&client->bits.localsock.reply_mutex);
644
645                 reply = client->bits.localsock.replies;
646                 while (reply && strcmp(reply->node, nodename) != 0) {
647                         reply = reply->next;
648                 }
649
650                 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
651
652                 if (!reply) {
653                         DEBUGLOG("Node %s timed-out\n", nodename);
654                         add_reply_to_list(client, ETIMEDOUT, csid,
655                                           "Command timed out", 18);
656                 }
657         }
658 }
659
660 /* Called when the request has timed out on at least one node. We fill in
661    the remaining node entries with ETIMEDOUT and return.
662
663    By the time we get here the node that caused
664    the timeout could have gone down, in which case we will never get the expected
665    number of replies that triggers the post command so we need to do it here
666 */
667 static void request_timed_out(struct local_client *client)
668 {
669         DEBUGLOG("Request timed-out. padding\n");
670         clops->cluster_do_node_callback(client, timedout_callback);
671
672         if (client->bits.localsock.num_replies !=
673             client->bits.localsock.expected_replies) {
674                 /* Post-process the command */
675                 if (client->bits.localsock.threadid) {
676                         pthread_mutex_lock(&client->bits.localsock.mutex);
677                         client->bits.localsock.state = POST_COMMAND;
678                         pthread_cond_signal(&client->bits.localsock.cond);
679                         pthread_mutex_unlock(&client->bits.localsock.mutex);
680                 }
681         }
682 }
683
684 /* This is where the real work happens */
685 static void main_loop(int local_sock, int cmd_timeout)
686 {
687         DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout);
688
689         sigset_t ss;
690         sigemptyset(&ss);
691         sigaddset(&ss, SIGINT);
692         sigaddset(&ss, SIGTERM);
693         pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
694         /* Main loop */
695         while (!quit) {
696                 fd_set in;
697                 int select_status;
698                 struct local_client *thisfd;
699                 struct timeval tv = { cmd_timeout, 0 };
700                 int quorate = clops->is_quorate();
701
702                 /* Wait on the cluster FD and all local sockets/pipes */
703                 local_client_head.fd = clops->get_main_cluster_fd();
704                 FD_ZERO(&in);
705                 for (thisfd = &local_client_head; thisfd != NULL;
706                      thisfd = thisfd->next) {
707
708                         if (thisfd->removeme)
709                                 continue;
710
711                         /* if the cluster is not quorate then don't listen for new requests */
712                         if ((thisfd->type != LOCAL_RENDEZVOUS &&
713                              thisfd->type != LOCAL_SOCK) || quorate)
714                                 FD_SET(thisfd->fd, &in);
715                 }
716
717                 select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv);
718
719                 if (reread_config) {
720                         int saved_errno = errno;
721
722                         reread_config = 0;
723                         if (clops->reread_config)
724                                 clops->reread_config();
725                         errno = saved_errno;
726                 }
727
728                 if (select_status > 0) {
729                         struct local_client *lastfd = NULL;
730                         char csid[MAX_CSID_LEN];
731                         char buf[max_cluster_message];
732
733                         for (thisfd = &local_client_head; thisfd != NULL;
734                              thisfd = thisfd->next) {
735
736                                 if (thisfd->removeme) {
737                                         struct local_client *free_fd;
738                                         lastfd->next = thisfd->next;
739                                         free_fd = thisfd;
740                                         thisfd = lastfd;
741
742                                         DEBUGLOG("removeme set for fd %d\n", free_fd->fd);
743
744                                         /* Queue cleanup, this also frees the client struct */
745                                         add_to_lvmqueue(free_fd, NULL, 0, NULL);
746                                         break;
747                                 }
748
749                                 if (FD_ISSET(thisfd->fd, &in)) {
750                                         struct local_client *newfd = NULL;
751                                         int ret;
752
753                                         /* Do callback */
754                                         ret =
755                                             thisfd->callback(thisfd, buf,
756                                                              sizeof(buf), csid,
757                                                              &newfd);
758                                         /* Ignore EAGAIN */
759                                         if (ret < 0 && (errno == EAGAIN ||
760                                                         errno == EINTR)) continue;
761
762                                         /* Got error or EOF: Remove it from the list safely */
763                                         if (ret <= 0) {
764                                                 struct local_client *free_fd;
765                                                 int type = thisfd->type;
766
767                                                 /* If the cluster socket shuts down, so do we */
768                                                 if (type == CLUSTER_MAIN_SOCK ||
769                                                     type == CLUSTER_INTERNAL)
770                                                         goto closedown;
771
772                                                 DEBUGLOG("ret == %d, errno = %d. removing client\n",
773                                                          ret, errno);
774                                                 lastfd->next = thisfd->next;
775                                                 free_fd = thisfd;
776                                                 thisfd = lastfd;
777                                                 close(free_fd->fd);
778
779                                                 /* Queue cleanup, this also frees the client struct */
780                                                 add_to_lvmqueue(free_fd, NULL, 0, NULL);
781                                                 break;
782                                         }
783
784                                         /* New client...simply add it to the list */
785                                         if (newfd) {
786                                                 newfd->next = thisfd->next;
787                                                 thisfd->next = newfd;
788                                                 break;
789                                         }
790                                 }
791                                 lastfd = thisfd;
792                         }
793                 }
794
795                 /* Select timed out. Check for clients that have been waiting too long for a response */
796                 if (select_status == 0) {
797                         time_t the_time = time(NULL);
798
799                         for (thisfd = &local_client_head; thisfd != NULL;
800                              thisfd = thisfd->next) {
801                                 if (thisfd->type == LOCAL_SOCK
802                                     && thisfd->bits.localsock.sent_out
803                                     && thisfd->bits.localsock.sent_time +
804                                     cmd_timeout < the_time
805                                     && thisfd->bits.localsock.
806                                     expected_replies !=
807                                     thisfd->bits.localsock.num_replies) {
808                                         /* Send timed out message + replies we already have */
809                                         DEBUGLOG
810                                             ("Request timed-out (send: %ld, now: %ld)\n",
811                                              thisfd->bits.localsock.sent_time,
812                                              the_time);
813
814                                         thisfd->bits.localsock.all_success = 0;
815
816                                         request_timed_out(thisfd);
817                                 }
818                         }
819                 }
820                 if (select_status < 0) {
821                         if (errno == EINTR)
822                                 continue;
823
824 #ifdef DEBUG
825                         perror("select error");
826                         exit(-1);
827 #endif
828                 }
829         }
830
831       closedown:
832         clops->cluster_closedown();
833         close(local_sock);
834 }
835
836 static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout)
837 {
838         int child_status;
839         int sstat;
840         fd_set fds;
841         struct timeval tv = {timeout, 0};
842
843         FD_ZERO(&fds);
844         FD_SET(c_pipe, &fds);
845
846         sstat = select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL);
847         if (sstat == 0) {
848                 fprintf(stderr, "clvmd startup timed out\n");
849                 exit(DFAIL_TIMEOUT);
850         }
851         if (sstat == 1) {
852                 if (read(c_pipe, &child_status, sizeof(child_status)) !=
853                     sizeof(child_status)) {
854
855                         fprintf(stderr, "clvmd failed in initialisation\n");
856                         exit(DFAIL_INIT);
857                 }
858                 else {
859                         switch (child_status) {
860                         case SUCCESS:
861                                 break;
862                         case DFAIL_INIT:
863                                 fprintf(stderr, "clvmd failed in initialisation\n");
864                                 break;
865                         case DFAIL_LOCAL_SOCK:
866                                 fprintf(stderr, "clvmd could not create local socket\n");
867                                 fprintf(stderr, "Another clvmd is probably already running\n");
868                                 break;
869                         case DFAIL_CLUSTER_IF:
870                                 fprintf(stderr, "clvmd could not connect to cluster manager\n");
871                                 fprintf(stderr, "Consult syslog for more information\n");
872                                 break;
873                         case DFAIL_MALLOC:
874                                 fprintf(stderr, "clvmd failed, not enough memory\n");
875                                 break;
876                         default:
877                                 fprintf(stderr, "clvmd failed, error was %d\n", child_status);
878                                 break;
879                         }
880                         exit(child_status);
881                 }
882         }
883         fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno));
884         exit(DFAIL_INIT);
885 }
886
887 /*
888  * Fork into the background and detach from our parent process.
889  * In the interests of user-friendliness we wait for the daemon
890  * to complete initialisation before returning its status
891  * the the user.
892  */
893 static void be_daemon(int timeout)
894 {
895         pid_t pid;
896         int devnull = open("/dev/null", O_RDWR);
897         if (devnull == -1) {
898                 perror("Can't open /dev/null");
899                 exit(3);
900         }
901
902         pipe(child_pipe);
903
904         switch (pid = fork()) {
905         case -1:
906                 perror("clvmd: can't fork");
907                 exit(2);
908
909         case 0:         /* Child */
910                 close(child_pipe[0]);
911                 break;
912
913         default:       /* Parent */
914                 close(child_pipe[1]);
915                 wait_for_child(child_pipe[0], timeout);
916         }
917
918         /* Detach ourself from the calling environment */
919         if (close(0) || close(1) || close(2)) {
920                 perror("Error closing terminal FDs");
921                 exit(4);
922         }
923         setsid();
924
925         if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0
926             || dup2(devnull, 2) < 0) {
927                 perror("Error setting terminal FDs to /dev/null");
928                 log_error("Error setting terminal FDs to /dev/null: %m");
929                 exit(5);
930         }
931         if (chdir("/")) {
932                 log_error("Error setting current directory to /: %m");
933                 exit(6);
934         }
935
936 }
937
938 /* Called when we have a read from the local socket.
939    was in the main loop but it's grown up and is a big girl now */
940 static int read_from_local_sock(struct local_client *thisfd)
941 {
942         int len;
943         int argslen;
944         int missing_len;
945         char buffer[PIPE_BUF];
946
947         len = read(thisfd->fd, buffer, sizeof(buffer));
948         if (len == -1 && errno == EINTR)
949                 return 1;
950
951         DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len);
952
953         /* EOF or error on socket */
954         if (len <= 0) {
955                 int *status;
956                 int jstat;
957
958                 DEBUGLOG("EOF on local socket: inprogress=%d\n",
959                          thisfd->bits.localsock.in_progress);
960
961                 thisfd->bits.localsock.finished = 1;
962
963                 /* If the client went away in mid command then tidy up */
964                 if (thisfd->bits.localsock.in_progress) {
965                         pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2);
966                         pthread_mutex_lock(&thisfd->bits.localsock.mutex);
967                         thisfd->bits.localsock.state = POST_COMMAND;
968                         pthread_cond_signal(&thisfd->bits.localsock.cond);
969                         pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
970
971                         /* Free any unsent buffers */
972                         free_reply(thisfd);
973                 }
974
975                 /* Kill the subthread & free resources */
976                 if (thisfd->bits.localsock.threadid) {
977                         DEBUGLOG("Waiting for child thread\n");
978                         pthread_mutex_lock(&thisfd->bits.localsock.mutex);
979                         thisfd->bits.localsock.state = PRE_COMMAND;
980                         pthread_cond_signal(&thisfd->bits.localsock.cond);
981                         pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
982
983                         jstat =
984                             pthread_join(thisfd->bits.localsock.threadid,
985                                          (void **) &status);
986                         DEBUGLOG("Joined child thread\n");
987
988                         thisfd->bits.localsock.threadid = 0;
989                         pthread_cond_destroy(&thisfd->bits.localsock.cond);
990                         pthread_mutex_destroy(&thisfd->bits.localsock.mutex);
991
992                         /* Remove the pipe client */
993                         if (thisfd->bits.localsock.pipe_client != NULL) {
994                                 struct local_client *newfd;
995                                 struct local_client *lastfd = NULL;
996                                 struct local_client *free_fd = NULL;
997
998                                 close(thisfd->bits.localsock.pipe_client->fd);  /* Close pipe */
999                                 close(thisfd->bits.localsock.pipe);
1000
1001                                 /* Remove pipe client */
1002                                 for (newfd = &local_client_head; newfd != NULL;
1003                                      newfd = newfd->next) {
1004                                         if (thisfd->bits.localsock.
1005                                             pipe_client == newfd) {
1006                                                 thisfd->bits.localsock.
1007                                                     pipe_client = NULL;
1008
1009                                                 lastfd->next = newfd->next;
1010                                                 free_fd = newfd;
1011                                                 newfd->next = lastfd;
1012                                                 free(free_fd);
1013                                                 break;
1014                                         }
1015                                         lastfd = newfd;
1016                                 }
1017                         }
1018                 }
1019
1020                 /* Free the command buffer */
1021                 free(thisfd->bits.localsock.cmd);
1022
1023                 /* Clear out the cross-link */
1024                 if (thisfd->bits.localsock.pipe_client != NULL)
1025                         thisfd->bits.localsock.pipe_client->bits.pipe.client =
1026                             NULL;
1027
1028                 close(thisfd->fd);
1029                 return 0;
1030         } else {
1031                 int comms_pipe[2];
1032                 struct local_client *newfd;
1033                 char csid[MAX_CSID_LEN];
1034                 struct clvm_header *inheader;
1035                 int status;
1036
1037                 inheader = (struct clvm_header *) buffer;
1038
1039                 /* Fill in the client ID */
1040                 inheader->clientid = htonl(thisfd->fd);
1041
1042                 /* If we are already busy then return an error */
1043                 if (thisfd->bits.localsock.in_progress) {
1044                         struct clvm_header reply;
1045                         reply.cmd = CLVMD_CMD_REPLY;
1046                         reply.status = EBUSY;
1047                         reply.arglen = 0;
1048                         reply.flags = 0;
1049                         send_message(&reply, sizeof(reply), our_csid,
1050                                      thisfd->fd,
1051                                      "Error sending EBUSY reply to local user");
1052                         return len;
1053                 }
1054
1055                 /* Free any old buffer space */
1056                 free(thisfd->bits.localsock.cmd);
1057
1058                 /* See if we have the whole message */
1059                 argslen =
1060                     len - strlen(inheader->node) - sizeof(struct clvm_header);
1061                 missing_len = inheader->arglen - argslen;
1062
1063                 if (missing_len < 0)
1064                         missing_len = 0;
1065
1066                 /* Save the message */
1067                 thisfd->bits.localsock.cmd = malloc(len + missing_len);
1068
1069                 if (!thisfd->bits.localsock.cmd) {
1070                         struct clvm_header reply;
1071                         reply.cmd = CLVMD_CMD_REPLY;
1072                         reply.status = ENOMEM;
1073                         reply.arglen = 0;
1074                         reply.flags = 0;
1075                         send_message(&reply, sizeof(reply), our_csid,
1076                                      thisfd->fd,
1077                                      "Error sending ENOMEM reply to local user");
1078                         return 0;
1079                 }
1080                 memcpy(thisfd->bits.localsock.cmd, buffer, len);
1081                 thisfd->bits.localsock.cmd_len = len + missing_len;
1082                 inheader = (struct clvm_header *) thisfd->bits.localsock.cmd;
1083
1084                 /* If we don't have the full message then read the rest now */
1085                 if (missing_len) {
1086                         char *argptr =
1087                             inheader->node + strlen(inheader->node) + 1;
1088
1089                         while (missing_len > 0 && len >= 0) {
1090                                 DEBUGLOG
1091                                     ("got %d bytes, need another %d (total %d)\n",
1092                                      argslen, missing_len, inheader->arglen);
1093                                 len = read(thisfd->fd, argptr + argslen,
1094                                            missing_len);
1095                                 if (len >= 0) {
1096                                         missing_len -= len;
1097                                         argslen += len;
1098                                 }
1099                         }
1100                 }
1101
1102                 /* Initialise and lock the mutex so the subthread will wait after
1103                    finishing the PRE routine */
1104                 if (!thisfd->bits.localsock.threadid) {
1105                         pthread_mutex_init(&thisfd->bits.localsock.mutex, NULL);
1106                         pthread_cond_init(&thisfd->bits.localsock.cond, NULL);
1107                         pthread_mutex_init(&thisfd->bits.localsock.reply_mutex, NULL);
1108                 }
1109
1110                 /* Only run the command if all the cluster nodes are running CLVMD */
1111                 if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) &&
1112                     (check_all_clvmds_running(thisfd) == -1)) {
1113                         thisfd->bits.localsock.expected_replies = 0;
1114                         thisfd->bits.localsock.num_replies = 0;
1115                         send_local_reply(thisfd, EHOSTDOWN, thisfd->fd);
1116                         return len;
1117                 }
1118
1119                 /* Check the node name for validity */
1120                 if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) {
1121                         /* Error, node is not in the cluster */
1122                         struct clvm_header reply;
1123                         DEBUGLOG("Unknown node: '%s'\n", inheader->node);
1124
1125                         reply.cmd = CLVMD_CMD_REPLY;
1126                         reply.status = ENOENT;
1127                         reply.flags = 0;
1128                         reply.arglen = 0;
1129                         send_message(&reply, sizeof(reply), our_csid,
1130                                      thisfd->fd,
1131                                      "Error sending ENOENT reply to local user");
1132                         thisfd->bits.localsock.expected_replies = 0;
1133                         thisfd->bits.localsock.num_replies = 0;
1134                         thisfd->bits.localsock.in_progress = FALSE;
1135                         thisfd->bits.localsock.sent_out = FALSE;
1136                         return len;
1137                 }
1138
1139                 /* If we already have a subthread then just signal it to start */
1140                 if (thisfd->bits.localsock.threadid) {
1141                         pthread_mutex_lock(&thisfd->bits.localsock.mutex);
1142                         thisfd->bits.localsock.state = PRE_COMMAND;
1143                         pthread_cond_signal(&thisfd->bits.localsock.cond);
1144                         pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
1145                         return len;
1146                 }
1147
1148                 /* Create a pipe and add the reading end to our FD list */
1149                 pipe(comms_pipe);
1150                 newfd = malloc(sizeof(struct local_client));
1151                 if (!newfd) {
1152                         struct clvm_header reply;
1153                         close(comms_pipe[0]);
1154                         close(comms_pipe[1]);
1155
1156                         reply.cmd = CLVMD_CMD_REPLY;
1157                         reply.status = ENOMEM;
1158                         reply.arglen = 0;
1159                         reply.flags = 0;
1160                         send_message(&reply, sizeof(reply), our_csid,
1161                                      thisfd->fd,
1162                                      "Error sending ENOMEM reply to local user");
1163                         return len;
1164                 }
1165                 DEBUGLOG("creating pipe, [%d, %d]\n", comms_pipe[0],
1166                          comms_pipe[1]);
1167                 newfd->fd = comms_pipe[0];
1168                 newfd->removeme = 0;
1169                 newfd->type = THREAD_PIPE;
1170                 newfd->callback = local_pipe_callback;
1171                 newfd->next = thisfd->next;
1172                 newfd->bits.pipe.client = thisfd;
1173                 newfd->bits.pipe.threadid = 0;
1174                 thisfd->next = newfd;
1175
1176                 /* Store a cross link to the pipe */
1177                 thisfd->bits.localsock.pipe_client = newfd;
1178
1179                 thisfd->bits.localsock.pipe = comms_pipe[1];
1180
1181                 /* Make sure the thread has a copy of it's own ID */
1182                 newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid;
1183
1184                 /* Run the pre routine */
1185                 thisfd->bits.localsock.in_progress = TRUE;
1186                 thisfd->bits.localsock.state = PRE_COMMAND;
1187                 DEBUGLOG("Creating pre&post thread\n");
1188                 status = pthread_create(&thisfd->bits.localsock.threadid, NULL,
1189                                pre_and_post_thread, thisfd);
1190                 DEBUGLOG("Created pre&post thread, state = %d\n", status);
1191         }
1192         return len;
1193 }
1194
1195 /* Add a file descriptor from the cluster or comms interface to
1196    our list of FDs for select
1197 */
1198 int add_client(struct local_client *new_client)
1199 {
1200         new_client->next = local_client_head.next;
1201         local_client_head.next = new_client;
1202
1203         return 0;
1204 }
1205
1206 /* Called when the pre-command has completed successfully - we
1207    now execute the real command on all the requested nodes */
1208 static int distribute_command(struct local_client *thisfd)
1209 {
1210         struct clvm_header *inheader =
1211             (struct clvm_header *) thisfd->bits.localsock.cmd;
1212         int len = thisfd->bits.localsock.cmd_len;
1213
1214         thisfd->xid = global_xid++;
1215         DEBUGLOG("distribute command: XID = %d\n", thisfd->xid);
1216
1217         /* Forward it to other nodes in the cluster if needed */
1218         if (!(inheader->flags & CLVMD_FLAG_LOCAL)) {
1219                 /* if node is empty then do it on the whole cluster */
1220                 if (inheader->node[0] == '\0') {
1221                         thisfd->bits.localsock.expected_replies =
1222                             clops->get_num_nodes();
1223                         thisfd->bits.localsock.num_replies = 0;
1224                         thisfd->bits.localsock.sent_time = time(NULL);
1225                         thisfd->bits.localsock.in_progress = TRUE;
1226                         thisfd->bits.localsock.sent_out = TRUE;
1227
1228                         /* Do it here first */
1229                         add_to_lvmqueue(thisfd, inheader, len, NULL);
1230
1231                         DEBUGLOG("Sending message to all cluster nodes\n");
1232                         inheader->xid = thisfd->xid;
1233                         send_message(inheader, len, NULL, -1,
1234                                      "Error forwarding message to cluster");
1235                 } else {
1236                         /* Do it on a single node */
1237                         char csid[MAX_CSID_LEN];
1238
1239                         if (clops->csid_from_name(csid, inheader->node)) {
1240                                 /* This has already been checked so should not happen */
1241                                 return 0;
1242                         } else {
1243                                 /* OK, found a node... */
1244                                 thisfd->bits.localsock.expected_replies = 1;
1245                                 thisfd->bits.localsock.num_replies = 0;
1246                                 thisfd->bits.localsock.in_progress = TRUE;
1247
1248                                 /* Are we the requested node ?? */
1249                                 if (memcmp(csid, our_csid, max_csid_len) == 0) {
1250                                         DEBUGLOG("Doing command on local node only\n");
1251                                         add_to_lvmqueue(thisfd, inheader, len, NULL);
1252                                 } else {
1253                                         DEBUGLOG("Sending message to single node: %s\n",
1254                                                  inheader->node);
1255                                         inheader->xid = thisfd->xid;
1256                                         send_message(inheader, len,
1257                                                      csid, -1,
1258                                                      "Error forwarding message to cluster node");
1259                                 }
1260                         }
1261                 }
1262         } else {
1263                 /* Local explicitly requested, ignore nodes */
1264                 thisfd->bits.localsock.in_progress = TRUE;
1265                 thisfd->bits.localsock.expected_replies = 1;
1266                 thisfd->bits.localsock.num_replies = 0;
1267                 add_to_lvmqueue(thisfd, inheader, len, NULL);
1268         }
1269         return 0;
1270 }
1271
1272 /* Process a command from a remote node and return the result */
1273 static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
1274                                    const char *csid)
1275 {
1276         char *replyargs;
1277         char nodename[max_cluster_member_name_len];
1278         int replylen = 0;
1279         int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1280         int status;
1281         int msg_malloced = 0;
1282
1283         /* Get the node name as we /may/ need it later */
1284         clops->name_from_csid(csid, nodename);
1285
1286         DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n",
1287                  decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename);
1288
1289         /* Check for GOAWAY and sulk */
1290         if (msg->cmd == CLVMD_CMD_GOAWAY) {
1291
1292                 DEBUGLOG("Told to go away by %s\n", nodename);
1293                 log_error("Told to go away by %s\n", nodename);
1294                 exit(99);
1295         }
1296
1297         /* Version check is internal - don't bother exposing it in
1298            clvmd-command.c */
1299         if (msg->cmd == CLVMD_CMD_VERSION) {
1300                 int version_nums[3];
1301                 char node[256];
1302
1303                 memcpy(version_nums, msg->args, sizeof(version_nums));
1304
1305                 clops->name_from_csid(csid, node);
1306                 DEBUGLOG("Remote node %s is version %d.%d.%d\n",
1307                          node,
1308                          ntohl(version_nums[0]),
1309                          ntohl(version_nums[1]), ntohl(version_nums[2]));
1310
1311                 if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) {
1312                         struct clvm_header byebyemsg;
1313                         DEBUGLOG
1314                             ("Telling node %s to go away because of incompatible version number\n",
1315                              node);
1316                         log_notice
1317                             ("Telling node %s to go away because of incompatible version number %d.%d.%d\n",
1318                              node, ntohl(version_nums[0]),
1319                              ntohl(version_nums[1]), ntohl(version_nums[2]));
1320
1321                         byebyemsg.cmd = CLVMD_CMD_GOAWAY;
1322                         byebyemsg.status = 0;
1323                         byebyemsg.flags = 0;
1324                         byebyemsg.arglen = 0;
1325                         byebyemsg.clientid = 0;
1326                         clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg),
1327                                              our_csid,
1328                                              "Error Sending GOAWAY message");
1329                 } else {
1330                         clops->add_up_node(csid);
1331                 }
1332                 return;
1333         }
1334
1335         /* Allocate a default reply buffer */
1336         replyargs = malloc(max_cluster_message - sizeof(struct clvm_header));
1337
1338         if (replyargs != NULL) {
1339                 /* Run the command */
1340                 status =
1341                     do_command(NULL, msg, msglen, &replyargs, buflen,
1342                                &replylen);
1343         } else {
1344                 status = ENOMEM;
1345         }
1346
1347         /* If it wasn't a reply, then reply */
1348         if (msg->cmd != CLVMD_CMD_REPLY) {
1349                 char *aggreply;
1350
1351                 aggreply =
1352                     realloc(replyargs, replylen + sizeof(struct clvm_header));
1353                 if (aggreply) {
1354                         struct clvm_header *agghead =
1355                             (struct clvm_header *) aggreply;
1356
1357                         replyargs = aggreply;
1358                         /* Move it up so there's room for a header in front of the data */
1359                         memmove(aggreply + offsetof(struct clvm_header, args),
1360                                 replyargs, replylen);
1361
1362                         agghead->xid = msg->xid;
1363                         agghead->cmd = CLVMD_CMD_REPLY;
1364                         agghead->status = status;
1365                         agghead->flags = 0;
1366                         agghead->clientid = msg->clientid;
1367                         agghead->arglen = replylen;
1368                         agghead->node[0] = '\0';
1369                         send_message(aggreply,
1370                                      sizeof(struct clvm_header) +
1371                                      replylen, csid, fd,
1372                                      "Error sending command reply");
1373                 } else {
1374                         struct clvm_header head;
1375
1376                         DEBUGLOG("Error attempting to realloc return buffer\n");
1377                         /* Return a failure response */
1378                         head.cmd = CLVMD_CMD_REPLY;
1379                         head.status = ENOMEM;
1380                         head.flags = 0;
1381                         head.clientid = msg->clientid;
1382                         head.arglen = 0;
1383                         head.node[0] = '\0';
1384                         send_message(&head, sizeof(struct clvm_header), csid,
1385                                      fd, "Error sending ENOMEM command reply");
1386                         return;
1387                 }
1388         }
1389
1390         /* Free buffer if it was malloced */
1391         if (msg_malloced) {
1392                 free(msg);
1393         }
1394         free(replyargs);
1395 }
1396
1397 /* Add a reply to a command to the list of replies for this client.
1398    If we have got a full set then send them to the waiting client down the local
1399    socket */
1400 static void add_reply_to_list(struct local_client *client, int status,
1401                               const char *csid, const char *buf, int len)
1402 {
1403         struct node_reply *reply;
1404
1405         pthread_mutex_lock(&client->bits.localsock.reply_mutex);
1406
1407         /* Add it to the list of replies */
1408         reply = malloc(sizeof(struct node_reply));
1409         if (reply) {
1410                 reply->status = status;
1411                 clops->name_from_csid(csid, reply->node);
1412                 DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len);
1413
1414                 if (len > 0) {
1415                         reply->replymsg = malloc(len);
1416                         if (!reply->replymsg) {
1417                                 reply->status = ENOMEM;
1418                         } else {
1419                                 memcpy(reply->replymsg, buf, len);
1420                         }
1421                 } else {
1422                         reply->replymsg = NULL;
1423                 }
1424                 /* Hook it onto the reply chain */
1425                 reply->next = client->bits.localsock.replies;
1426                 client->bits.localsock.replies = reply;
1427         } else {
1428                 /* It's all gone horribly wrong... */
1429                 pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1430                 send_local_reply(client, ENOMEM, client->fd);
1431                 return;
1432         }
1433         DEBUGLOG("Got %d replies, expecting: %d\n",
1434                  client->bits.localsock.num_replies + 1,
1435                  client->bits.localsock.expected_replies);
1436
1437         /* If we have the whole lot then do the post-process */
1438         if (++client->bits.localsock.num_replies ==
1439             client->bits.localsock.expected_replies) {
1440                 /* Post-process the command */
1441                 if (client->bits.localsock.threadid) {
1442                         pthread_mutex_lock(&client->bits.localsock.mutex);
1443                         client->bits.localsock.state = POST_COMMAND;
1444                         pthread_cond_signal(&client->bits.localsock.cond);
1445                         pthread_mutex_unlock(&client->bits.localsock.mutex);
1446                 }
1447         }
1448         pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
1449 }
1450
1451 /* This is the thread that runs the PRE and post commands for a particular connection */
1452 static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg)
1453 {
1454         struct local_client *client = (struct local_client *) arg;
1455         int status;
1456         int write_status;
1457         sigset_t ss;
1458         int pipe_fd = client->bits.localsock.pipe;
1459
1460         DEBUGLOG("in sub thread: client = %p\n", client);
1461
1462         /* Don't start until the LVM thread is ready */
1463         pthread_mutex_lock(&lvm_start_mutex);
1464         pthread_mutex_unlock(&lvm_start_mutex);
1465         DEBUGLOG("Sub thread ready for work.\n");
1466
1467         /* Ignore SIGUSR1 (handled by master process) but enable
1468            SIGUSR2 (kills subthreads) */
1469         sigemptyset(&ss);
1470         sigaddset(&ss, SIGUSR1);
1471         pthread_sigmask(SIG_BLOCK, &ss, NULL);
1472
1473         sigdelset(&ss, SIGUSR1);
1474         sigaddset(&ss, SIGUSR2);
1475         pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
1476
1477         /* Loop around doing PRE and POST functions until the client goes away */
1478         while (!client->bits.localsock.finished) {
1479                 /* Execute the code */
1480                 status = do_pre_command(client);
1481
1482                 if (status)
1483                         client->bits.localsock.all_success = 0;
1484
1485                 DEBUGLOG("Writing status %d down pipe %d\n", status, pipe_fd);
1486
1487                 /* Tell the parent process we have finished this bit */
1488                 do {
1489                         write_status = write(pipe_fd, &status, sizeof(int));
1490                         if (write_status == sizeof(int))
1491                                 break;
1492                         if (write_status < 0 &&
1493                             (errno == EINTR || errno == EAGAIN))
1494                                 continue;
1495                         log_error("Error sending to pipe: %m\n");
1496                         break;
1497                 } while(1);
1498
1499                 if (status) {
1500                         client->bits.localsock.state = POST_COMMAND;
1501                         goto next_pre;
1502                 }
1503
1504                 /* We may need to wait for the condition variable before running the post command */
1505                 pthread_mutex_lock(&client->bits.localsock.mutex);
1506                 DEBUGLOG("Waiting to do post command - state = %d\n",
1507                          client->bits.localsock.state);
1508
1509                 if (client->bits.localsock.state != POST_COMMAND) {
1510                         pthread_cond_wait(&client->bits.localsock.cond,
1511                                           &client->bits.localsock.mutex);
1512                 }
1513                 pthread_mutex_unlock(&client->bits.localsock.mutex);
1514
1515                 DEBUGLOG("Got post command condition...\n");
1516
1517                 /* POST function must always run, even if the client aborts */
1518                 status = 0;
1519                 do_post_command(client);
1520
1521                 do {
1522                         write_status = write(pipe_fd, &status, sizeof(int));
1523                         if (write_status == sizeof(int))
1524                                 break;
1525                         if (write_status < 0 &&
1526                             (errno == EINTR || errno == EAGAIN))
1527                                 continue;
1528                         log_error("Error sending to pipe: %m\n");
1529                         break;
1530                 } while(1);
1531 next_pre:
1532                 DEBUGLOG("Waiting for next pre command\n");
1533
1534                 pthread_mutex_lock(&client->bits.localsock.mutex);
1535                 if (client->bits.localsock.state != PRE_COMMAND &&
1536                     !client->bits.localsock.finished) {
1537                         pthread_cond_wait(&client->bits.localsock.cond,
1538                                           &client->bits.localsock.mutex);
1539                 }
1540                 pthread_mutex_unlock(&client->bits.localsock.mutex);
1541
1542                 DEBUGLOG("Got pre command condition...\n");
1543         }
1544         DEBUGLOG("Subthread finished\n");
1545         pthread_exit((void *) 0);
1546 }
1547
1548 /* Process a command on the local node and store the result */
1549 static int process_local_command(struct clvm_header *msg, int msglen,
1550                                  struct local_client *client,
1551                                  unsigned short xid)
1552 {
1553         char *replybuf = malloc(max_cluster_message);
1554         int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
1555         int replylen = 0;
1556         int status;
1557
1558         DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n",
1559                  decode_cmd(msg->cmd), msg, msglen, client);
1560
1561         if (replybuf == NULL)
1562                 return -1;
1563
1564         status = do_command(client, msg, msglen, &replybuf, buflen, &replylen);
1565
1566         if (status)
1567                 client->bits.localsock.all_success = 0;
1568
1569         /* If we took too long then discard the reply */
1570         if (xid == client->xid) {
1571                 add_reply_to_list(client, status, our_csid, replybuf, replylen);
1572         } else {
1573                 DEBUGLOG
1574                     ("Local command took too long, discarding xid %d, current is %d\n",
1575                      xid, client->xid);
1576         }
1577
1578         free(replybuf);
1579         return status;
1580 }
1581
1582 static int process_reply(const struct clvm_header *msg, int msglen, const char *csid)
1583 {
1584         struct local_client *client = NULL;
1585
1586         client = find_client(msg->clientid);
1587         if (!client) {
1588                 DEBUGLOG("Got message for unknown client 0x%x\n",
1589                          msg->clientid);
1590                 log_error("Got message for unknown client 0x%x\n",
1591                           msg->clientid);
1592                 return -1;
1593         }
1594
1595         if (msg->status)
1596                 client->bits.localsock.all_success = 0;
1597
1598         /* Gather replies together for this client id */
1599         if (msg->xid == client->xid) {
1600                 add_reply_to_list(client, msg->status, csid, msg->args,
1601                                   msg->arglen);
1602         } else {
1603                 DEBUGLOG("Discarding reply with old XID %d, current = %d\n",
1604                          msg->xid, client->xid);
1605         }
1606         return 0;
1607 }
1608
1609 /* Send an aggregated reply back to the client */
1610 static void send_local_reply(struct local_client *client, int status, int fd)
1611 {
1612         struct clvm_header *clientreply;
1613         struct node_reply *thisreply = client->bits.localsock.replies;
1614         char *replybuf;
1615         char *ptr;
1616         int message_len = 0;
1617
1618         DEBUGLOG("Send local reply\n");
1619
1620         /* Work out the total size of the reply */
1621         while (thisreply) {
1622                 if (thisreply->replymsg)
1623                         message_len += strlen(thisreply->replymsg) + 1;
1624                 else
1625                         message_len++;
1626
1627                 message_len += strlen(thisreply->node) + 1 + sizeof(int);
1628
1629                 thisreply = thisreply->next;
1630         }
1631
1632         /* Add in the size of our header */
1633         message_len = message_len + sizeof(struct clvm_header) + 1;
1634         replybuf = malloc(message_len);
1635
1636         clientreply = (struct clvm_header *) replybuf;
1637         clientreply->status = status;
1638         clientreply->cmd = CLVMD_CMD_REPLY;
1639         clientreply->node[0] = '\0';
1640         clientreply->flags = 0;
1641
1642         ptr = clientreply->args;
1643
1644         /* Add in all the replies, and free them as we go */
1645         thisreply = client->bits.localsock.replies;
1646         while (thisreply) {
1647                 struct node_reply *tempreply = thisreply;
1648
1649                 strcpy(ptr, thisreply->node);
1650                 ptr += strlen(thisreply->node) + 1;
1651
1652                 if (thisreply->status)
1653                         clientreply->flags |= CLVMD_FLAG_NODEERRS;
1654
1655                 memcpy(ptr, &thisreply->status, sizeof(int));
1656                 ptr += sizeof(int);
1657
1658                 if (thisreply->replymsg) {
1659                         strcpy(ptr, thisreply->replymsg);
1660                         ptr += strlen(thisreply->replymsg) + 1;
1661                 } else {
1662                         ptr[0] = '\0';
1663                         ptr++;
1664                 }
1665                 thisreply = thisreply->next;
1666
1667                 free(tempreply->replymsg);
1668                 free(tempreply);
1669         }
1670
1671         /* Terminate with an empty node name */
1672         *ptr = '\0';
1673
1674         clientreply->arglen = ptr - clientreply->args + 1;
1675
1676         /* And send it */
1677         send_message(replybuf, message_len, our_csid, fd,
1678                      "Error sending REPLY to client");
1679         free(replybuf);
1680
1681         /* Reset comms variables */
1682         client->bits.localsock.replies = NULL;
1683         client->bits.localsock.expected_replies = 0;
1684         client->bits.localsock.in_progress = FALSE;
1685         client->bits.localsock.sent_out = FALSE;
1686 }
1687
1688 /* Just free a reply chain baceuse it wasn't used. */
1689 static void free_reply(struct local_client *client)
1690 {
1691         /* Add in all the replies, and free them as we go */
1692         struct node_reply *thisreply = client->bits.localsock.replies;
1693         while (thisreply) {
1694                 struct node_reply *tempreply = thisreply;
1695
1696                 thisreply = thisreply->next;
1697
1698                 free(tempreply->replymsg);
1699                 free(tempreply);
1700         }
1701         client->bits.localsock.replies = NULL;
1702 }
1703
1704 /* Send our version number to the cluster */
1705 static void send_version_message()
1706 {
1707         char message[sizeof(struct clvm_header) + sizeof(int) * 3];
1708         struct clvm_header *msg = (struct clvm_header *) message;
1709         int version_nums[3];
1710
1711         msg->cmd = CLVMD_CMD_VERSION;
1712         msg->status = 0;
1713         msg->flags = 0;
1714         msg->clientid = 0;
1715         msg->arglen = sizeof(version_nums);
1716
1717         version_nums[0] = htonl(CLVMD_MAJOR_VERSION);
1718         version_nums[1] = htonl(CLVMD_MINOR_VERSION);
1719         version_nums[2] = htonl(CLVMD_PATCH_VERSION);
1720
1721         memcpy(&msg->args, version_nums, sizeof(version_nums));
1722
1723         hton_clvm(msg);
1724
1725         clops->cluster_send_message(message, sizeof(message), NULL,
1726                              "Error Sending version number");
1727 }
1728
1729 /* Send a message to either a local client or another server */
1730 static int send_message(void *buf, int msglen, const char *csid, int fd,
1731                         const char *errtext)
1732 {
1733         int len = 0;
1734         int saved_errno = 0;
1735         struct timespec delay;
1736         struct timespec remtime;
1737
1738         int retry_cnt = 0;
1739
1740         /* Send remote messages down the cluster socket */
1741         if (csid == NULL || !ISLOCAL_CSID(csid)) {
1742                 hton_clvm((struct clvm_header *) buf);
1743                 return clops->cluster_send_message(buf, msglen, csid, errtext);
1744         } else {
1745                 int ptr = 0;
1746
1747                 /* Make sure it all goes */
1748                 do {
1749                         if (retry_cnt > MAX_RETRIES)
1750                         {
1751                                 errno = saved_errno;
1752                                 log_error("%s", errtext);
1753                                 errno = saved_errno;
1754                                 break;
1755                         }
1756
1757                         len = write(fd, buf + ptr, msglen - ptr);
1758
1759                         if (len <= 0) {
1760                                 if (errno == EINTR)
1761                                         continue;
1762                                 if (errno == EAGAIN ||
1763                                     errno == EIO ||
1764                                     errno == ENOSPC) {
1765                                         saved_errno = errno;
1766                                         retry_cnt++;
1767
1768                                         delay.tv_sec = 0;
1769                                         delay.tv_nsec = 100000;
1770                                         remtime.tv_sec = 0;
1771                                         remtime.tv_nsec = 0;
1772                                         (void) nanosleep (&delay, &remtime);
1773
1774                                         continue;
1775                                 }
1776                                 log_error("%s", errtext);
1777                                 break;
1778                         }
1779                         ptr += len;
1780                 } while (ptr < msglen);
1781         }
1782         return len;
1783 }
1784
1785 static int process_work_item(struct lvm_thread_cmd *cmd)
1786 {
1787         /* If msg is NULL then this is a cleanup request */
1788         if (cmd->msg == NULL) {
1789                 DEBUGLOG("process_work_item: free fd %d\n", cmd->client->fd);
1790                 cmd_client_cleanup(cmd->client);
1791                 free(cmd->client);
1792                 return 0;
1793         }
1794
1795         if (!cmd->remote) {
1796                 DEBUGLOG("process_work_item: local\n");
1797                 process_local_command(cmd->msg, cmd->msglen, cmd->client,
1798                                       cmd->xid);
1799         } else {
1800                 DEBUGLOG("process_work_item: remote\n");
1801                 process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd,
1802                                        cmd->csid);
1803         }
1804         return 0;
1805 }
1806
1807 /*
1808  * Routine that runs in the "LVM thread".
1809  */
1810 static void lvm_thread_fn(void *arg)
1811 {
1812         struct dm_list *cmdl, *tmp;
1813         sigset_t ss;
1814         int using_gulm = (int)(long)arg;
1815
1816         DEBUGLOG("LVM thread function started\n");
1817
1818         /* Ignore SIGUSR1 & 2 */
1819         sigemptyset(&ss);
1820         sigaddset(&ss, SIGUSR1);
1821         sigaddset(&ss, SIGUSR2);
1822         pthread_sigmask(SIG_BLOCK, &ss, NULL);
1823
1824         /* Initialise the interface to liblvm */
1825         init_lvm(using_gulm);
1826
1827         /* Allow others to get moving */
1828         pthread_mutex_unlock(&lvm_start_mutex);
1829
1830         /* Now wait for some actual work */
1831         for (;;) {
1832                 DEBUGLOG("LVM thread waiting for work\n");
1833
1834                 pthread_mutex_lock(&lvm_thread_mutex);
1835                 if (dm_list_empty(&lvm_cmd_head))
1836                         pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex);
1837
1838                 dm_list_iterate_safe(cmdl, tmp, &lvm_cmd_head) {
1839                         struct lvm_thread_cmd *cmd;
1840
1841                         cmd =
1842                             dm_list_struct_base(cmdl, struct lvm_thread_cmd, list);
1843                         dm_list_del(&cmd->list);
1844                         pthread_mutex_unlock(&lvm_thread_mutex);
1845
1846                         process_work_item(cmd);
1847                         free(cmd->msg);
1848                         free(cmd);
1849
1850                         pthread_mutex_lock(&lvm_thread_mutex);
1851                 }
1852                 pthread_mutex_unlock(&lvm_thread_mutex);
1853         }
1854 }
1855
1856 /* Pass down some work to the LVM thread */
1857 static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
1858                            int msglen, const char *csid)
1859 {
1860         struct lvm_thread_cmd *cmd;
1861
1862         cmd = malloc(sizeof(struct lvm_thread_cmd));
1863         if (!cmd)
1864                 return ENOMEM;
1865
1866         if (msglen) {
1867                 cmd->msg = malloc(msglen);
1868                 if (!cmd->msg) {
1869                         log_error("Unable to allocate buffer space\n");
1870                         free(cmd);
1871                         return -1;
1872                 }
1873                 memcpy(cmd->msg, msg, msglen);
1874         }
1875         else {
1876                 cmd->msg = NULL;
1877         }
1878         cmd->client = client;
1879         cmd->msglen = msglen;
1880         cmd->xid = client->xid;
1881
1882         if (csid) {
1883                 memcpy(cmd->csid, csid, max_csid_len);
1884                 cmd->remote = 1;
1885         } else {
1886                 cmd->remote = 0;
1887         }
1888
1889         DEBUGLOG
1890             ("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n",
1891              cmd, client, msg, msglen, csid, cmd->xid);
1892         pthread_mutex_lock(&lvm_thread_mutex);
1893         dm_list_add(&lvm_cmd_head, &cmd->list);
1894         pthread_cond_signal(&lvm_thread_cond);
1895         pthread_mutex_unlock(&lvm_thread_mutex);
1896
1897         return 0;
1898 }
1899
1900 /* Return 0 if we can talk to an existing clvmd */
1901 static int check_local_clvmd(void)
1902 {
1903         int local_socket;
1904         struct sockaddr_un sockaddr;
1905         int ret = 0;
1906
1907         /* Open local socket */
1908         if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
1909                 return -1;
1910         }
1911
1912         memset(&sockaddr, 0, sizeof(sockaddr));
1913         memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1914         sockaddr.sun_family = AF_UNIX;
1915
1916         if (connect(local_socket,(struct sockaddr *) &sockaddr,
1917                     sizeof(sockaddr))) {
1918                 ret = -1;
1919         }
1920
1921         close(local_socket);
1922         return ret;
1923 }
1924
1925
1926 /* Open the local socket, that's the one we talk to libclvm down */
1927 static int open_local_sock()
1928 {
1929         int local_socket;
1930         struct sockaddr_un sockaddr;
1931
1932         /* Open local socket */
1933         if (CLVMD_SOCKNAME[0] != '\0')
1934                 unlink(CLVMD_SOCKNAME);
1935         local_socket = socket(PF_UNIX, SOCK_STREAM, 0);
1936         if (local_socket < 0) {
1937                 log_error("Can't create local socket: %m");
1938                 return -1;
1939         }
1940         /* Set Close-on-exec & non-blocking */
1941         fcntl(local_socket, F_SETFD, 1);
1942         fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK);
1943
1944         memset(&sockaddr, 0, sizeof(sockaddr));
1945         memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
1946         sockaddr.sun_family = AF_UNIX;
1947         if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) {
1948                 log_error("can't bind local socket: %m");
1949                 close(local_socket);
1950                 return -1;
1951         }
1952         if (listen(local_socket, 1) != 0) {
1953                 log_error("listen local: %m");
1954                 close(local_socket);
1955                 return -1;
1956         }
1957         if (CLVMD_SOCKNAME[0] != '\0')
1958                 chmod(CLVMD_SOCKNAME, 0600);
1959
1960         return local_socket;
1961 }
1962
1963 void process_message(struct local_client *client, const char *buf, int len,
1964                      const char *csid)
1965 {
1966         struct clvm_header *inheader;
1967
1968         inheader = (struct clvm_header *) buf;
1969         ntoh_clvm(inheader);    /* Byteswap fields */
1970         if (inheader->cmd == CLVMD_CMD_REPLY)
1971                 process_reply(inheader, len, csid);
1972         else
1973                 add_to_lvmqueue(client, inheader, len, csid);
1974 }
1975
1976
/*
 * Per-node callback used by check_all_clvmds_running(): for a node that is
 * not up, add an EHOSTDOWN reply on its behalf to the client's list.
 */
static void check_all_callback(struct local_client *client, const char *csid,
			       int node_up)
{
	if (node_up)
		return;

	/* 18 = the message text plus its terminating NUL */
	add_reply_to_list(client, EHOSTDOWN, csid, "CLVMD not running", 18);
}
1984
1985 /* Check to see if all CLVMDs are running (ie one on
1986    every node in the cluster).
1987    If not, returns -1 and prints out a list of errant nodes */
1988 static int check_all_clvmds_running(struct local_client *client)
1989 {
1990         DEBUGLOG("check_all_clvmds_running\n");
1991         return clops->cluster_do_node_callback(client, check_all_callback);
1992 }
1993
1994 /* Return a local_client struct given a client ID.
1995    client IDs are in network byte order */
1996 static struct local_client *find_client(int clientid)
1997 {
1998         struct local_client *thisfd;
1999         for (thisfd = &local_client_head; thisfd != NULL; thisfd = thisfd->next) {
2000                 if (thisfd->fd == ntohl(clientid))
2001                         return thisfd;
2002         }
2003         return NULL;
2004 }
2005
2006 /* Byte-swapping routines for the header so we
2007    work in a heterogeneous environment */
2008 static void hton_clvm(struct clvm_header *hdr)
2009 {
2010         hdr->status = htonl(hdr->status);
2011         hdr->arglen = htonl(hdr->arglen);
2012         hdr->xid = htons(hdr->xid);
2013         /* Don't swap clientid as it's only a token as far as
2014            remote nodes are concerned */
2015 }
2016
2017 static void ntoh_clvm(struct clvm_header *hdr)
2018 {
2019         hdr->status = ntohl(hdr->status);
2020         hdr->arglen = ntohl(hdr->arglen);
2021         hdr->xid = ntohs(hdr->xid);
2022 }
2023
/*
 * SIGUSR2 handler - the signal is sent to kill subthreads.
 * The handler itself only logs.
 * NOTE(review): confirm DEBUGLOG is safe to call from a signal handler.
 */
static void sigusr2_handler(int sig)
{
	(void) sig;

	DEBUGLOG("SIGUSR2 received\n");
}
2030
2031 static void sigterm_handler(int sig)
2032 {
2033         DEBUGLOG("SIGTERM received\n");
2034         quit = 1;
2035         return;
2036 }
2037
2038 static void sighup_handler(int sig)
2039 {
2040         DEBUGLOG("got SIGHUP\n");
2041         reread_config = 1;
2042 }
2043
2044 int sync_lock(const char *resource, int mode, int flags, int *lockid)
2045 {
2046         return clops->sync_lock(resource, mode, flags, lockid);
2047 }
2048
2049 int sync_unlock(const char *resource, int lockid)
2050 {
2051         return clops->sync_unlock(resource, lockid);
2052 }
2053
2054 static if_type_t parse_cluster_interface(char *ifname)
2055 {
2056         if_type_t iface = IF_AUTO;
2057
2058         if (!strcmp(ifname, "auto"))
2059                 iface = IF_AUTO;
2060         if (!strcmp(ifname, "cman"))
2061                 iface = IF_CMAN;
2062         if (!strcmp(ifname, "gulm"))
2063                 iface = IF_GULM;
2064         if (!strcmp(ifname, "openais"))
2065                 iface = IF_OPENAIS;
2066         if (!strcmp(ifname, "corosync"))
2067                 iface = IF_COROSYNC;
2068
2069         return iface;
2070 }
2071
2072 /*
2073  * Try and find a cluster system in corosync's objdb, if it is running. This is
2074  * only called if the command-line option is not present, and if it fails
2075  * we still try the interfaces in order.
2076  */
2077 static if_type_t get_cluster_type()
2078 {
2079 #ifdef HAVE_COROSYNC_CONFDB_H
2080         confdb_handle_t handle;
2081         if_type_t type = IF_AUTO;
2082         int result;
2083         char buf[255];
2084         size_t namelen = sizeof(buf);
2085         hdb_handle_t cluster_handle;
2086         hdb_handle_t clvmd_handle;
2087         confdb_callbacks_t callbacks = {
2088                 .confdb_key_change_notify_fn = NULL,
2089                 .confdb_object_create_change_notify_fn = NULL,
2090                 .confdb_object_delete_change_notify_fn = NULL
2091         };
2092
2093         result = confdb_initialize (&handle, &callbacks);
2094         if (result != CS_OK)
2095                 return type;
2096
2097         result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
2098         if (result != CS_OK)
2099                 goto out;
2100
2101         result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
2102         if (result != CS_OK)
2103                 goto out;
2104
2105         result = confdb_object_find_start(handle, cluster_handle);
2106         if (result != CS_OK)
2107                 goto out;
2108
2109         result = confdb_object_find(handle, cluster_handle, (void *)"clvmd", strlen("clvmd"), &clvmd_handle);
2110         if (result != CS_OK)
2111                 goto out;
2112
2113         result = confdb_key_get(handle, clvmd_handle, (void *)"interface", strlen("interface"), buf, &namelen);
2114         if (result != CS_OK)
2115                 goto out;
2116
2117         buf[namelen] = '\0';
2118         type = parse_cluster_interface(buf);
2119         DEBUGLOG("got interface type '%s' from confdb\n", buf);
2120 out:
2121         confdb_finalize(handle);
2122         return type;
2123 #else
2124         return IF_AUTO;
2125 #endif
2126 }