cluster - Improve hammer2 connect directive, add /etc/hammer2/autoconn
author Matthew Dillon <dillon@apollo.backplane.com>
Tue, 4 Dec 2012 22:23:22 +0000 (14:23 -0800)
committer Matthew Dillon <dillon@apollo.backplane.com>
Tue, 4 Dec 2012 22:29:59 +0000 (14:29 -0800)
* Improve the hammer2 connect directive, fixing a socket descriptor reuse
  race.

* Monitor the file /etc/hammer2/autoconn which contains a list of hosts
  the hammer2 service daemon should automatically maintain connections to.

* Adjust the dmsg_master_service() API to also allow an alternative
  signalling descriptor and callback to be specified.  This simplifies
  forced connection terminations.

* Fix descriptor leak in dmsg_master_service() on closure.

lib/libdmsg/dmsg.h
lib/libdmsg/msg.c
lib/libdmsg/msg_lnk.c
lib/libdmsg/service.c
sbin/hammer2/cmd_service.c

index 8b9dc9b..efd1403 100644 (file)
@@ -329,11 +329,13 @@ struct crypto_algo {
  */
 struct dmsg_master_service_info {
        int     fd;
+       int     altfd;
        int     detachme;
        char    *label;
        void    *handle;
        void    (*dbgmsg_callback)(dmsg_msg_t *msg);
        void    (*exit_callback)(void *handle);
+       void    (*altmsg_callback)(dmsg_iocom_t *);
 };
 
 typedef struct dmsg_master_service_info dmsg_master_service_info_t;
index b579ac8..18173cd 100644 (file)
@@ -1080,9 +1080,12 @@ skip:
                        if (state->rxcmd & DMSGF_DELETE) {
                                dmsg_msg_free(msg);
                                fprintf(stderr,
-                                       "iocom: ioq error %d sleeping\n",
-                                       ioq->error);
-                               sleep(1);       /* XXX */
+                                       "iocom: ioq error(rd) %d sleeping "
+                                       "state %p rxcmd %08x txcmd %08x "
+                                       "func %p\n",
+                                       ioq->error, state, state->rxcmd,
+                                       state->txcmd, state->func);
+                               usleep(100000); /* XXX */
                                atomic_set_int(&iocom->flags,
                                               DMSG_IOCOMF_RWORK);
                                msg = NULL;
@@ -1102,9 +1105,12 @@ skip:
                        if (state->rxcmd & DMSGF_DELETE) {
                                dmsg_msg_free(msg);
                                fprintf(stderr,
-                                       "iocom: ioq error %d sleeping\n",
-                                       ioq->error);
-                               sleep(1);       /* XXX */
+                                       "iocom: ioq error(wr) %d sleeping "
+                                       "state %p rxcmd %08x txcmd %08x "
+                                       "func %p\n",
+                                       ioq->error, state, state->rxcmd,
+                                       state->txcmd, state->func);
+                               usleep(100000); /* XXX */
                                atomic_set_int(&iocom->flags,
                                               DMSG_IOCOMF_RWORK);
                                msg = NULL;
index e2e2b73..a8c3302 100644 (file)
@@ -126,6 +126,7 @@ struct h2span_media {
                pthread_cond_t          cond;
                int                     ctl;
                int                     fd;
+               int                     pipefd[2];      /* signal stop */
                dmsg_iocom_t            iocom;
                pthread_t               iocom_thread;
                enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
@@ -857,7 +858,13 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                circA->span_state = tx_state;   /* H2SPAN_RELAY state */
                circA->is_relay = 1;
                circA->refs = 2;                /* state and peer */
+
+               /*
+                * Upgrade received state so we act on both it and its
+                * peer (created below) symmetrically.
+                */
                msg->state->any.circ = circA;
+               msg->state->func = dmsg_lnk_circ;
 
                iocomB = rx_state->iocom;
 
@@ -932,12 +939,12 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                disconnect = 0;
                if (circB && (state = circB->state) != NULL) {
                        if (state->rxcmd & DMSGF_DELETE) {
+                               disconnect = 1;
                                circB->state = NULL;
                                state->any.circ = NULL;
                                dmsg_circuit_drop(circB);
                        }
                        dmsg_state_reply(state, msg->any.head.error);
-                       disconnect = 1;
                }
 
                /*
@@ -946,11 +953,11 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                 */
                if (circA && (state = circA->state) != NULL) {
                        if (state->txcmd & DMSGF_DELETE) {
+                               disconnect = 1;
                                circA->state = NULL;
                                state->any.circ = NULL;
                                dmsg_circuit_drop(circA);
                        }
-                       disconnect = 1;
                }
 
                /*
@@ -1001,12 +1008,12 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                disconnect = 0;
                if (circA && (state = circA->state) != NULL) {
                        if (state->rxcmd & DMSGF_DELETE) {
+                               disconnect = 1;
                                circA->state = NULL;
                                state->any.circ = NULL;
                                dmsg_circuit_drop(circA);
                        }
                        dmsg_state_reply(state, msg->any.head.error);
-                       disconnect = 1;
                }
 
                /*
@@ -1015,11 +1022,11 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                 */
                if (circB && (state = circB->state) != NULL) {
                        if (state->txcmd & DMSGF_DELETE) {
+                               disconnect = 1;
                                circB->state = NULL;
                                state->any.circ = NULL;
                                dmsg_circuit_drop(circB);
                        }
-                       disconnect = 1;
                }
 
                /*
@@ -1444,23 +1451,7 @@ dmsg_volconf_thread(void *info)
        return(NULL);
 }
 
-static
-void
-dmsg_volconf_stop(h2span_media_config_t *conf)
-{
-       switch(conf->state) {
-       case H2MC_STOPPED:
-               break;
-       case H2MC_CONNECT:
-               conf->state = H2MC_STOPPED;
-               break;
-       case H2MC_RUNNING:
-               shutdown(conf->fd, SHUT_WR);
-               pthread_join(conf->iocom_thread, NULL);
-               conf->iocom_thread = NULL;
-               break;
-       }
-}
+static void dmsg_volconf_signal(dmsg_iocom_t *iocom);
 
 static
 void
@@ -1475,10 +1466,17 @@ dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname)
                if (conf->fd < 0) {
                        fprintf(stderr, "Unable to connect to %s\n", hostname);
                        conf->state = H2MC_CONNECT;
+               } else if (pipe(conf->pipefd) < 0) {
+                       close(conf->fd);
+                       fprintf(stderr, "pipe() failed during volconf\n");
+                       conf->state = H2MC_CONNECT;
                } else {
+                       fprintf(stderr, "VOLCONF CONNECT\n");
                        info = malloc(sizeof(*info));
                        bzero(info, sizeof(*info));
                        info->fd = conf->fd;
+                       info->altfd = conf->pipefd[0];
+                       info->altmsg_callback = dmsg_volconf_signal;
                        info->detachme = 0;
                        conf->state = H2MC_RUNNING;
                        pthread_create(&conf->iocom_thread, NULL,
@@ -1490,6 +1488,33 @@ dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname)
        }
 }
 
+static
+void
+dmsg_volconf_stop(h2span_media_config_t *conf)
+{
+       switch(conf->state) {
+       case H2MC_STOPPED:
+               break;
+       case H2MC_CONNECT:
+               conf->state = H2MC_STOPPED;
+               break;
+       case H2MC_RUNNING:
+               close(conf->pipefd[1]);
+               conf->pipefd[1] = -1;
+               pthread_join(conf->iocom_thread, NULL);
+               conf->iocom_thread = NULL;
+               conf->state = H2MC_STOPPED;
+               break;
+       }
+}
+
+static
+void
+dmsg_volconf_signal(dmsg_iocom_t *iocom)
+{
+       shutdown(iocom->sock_fd, SHUT_RDWR);
+}
+
 /************************************************************************
  *                     MESSAGE ROUTING AND SOURCE VALIDATION           *
  ************************************************************************/
index 655895a..a267060 100644 (file)
@@ -54,17 +54,20 @@ dmsg_master_service(void *data)
        if (info->detachme)
                pthread_detach(pthread_self());
 
-       dmsg_iocom_init(&iocom, info->fd, -1,
-                          master_auth_signal,
-                          master_auth_rxmsg,
-                          info->dbgmsg_callback,
-                          NULL);
+       dmsg_iocom_init(&iocom,
+                       info->fd,
+                       (info->altmsg_callback ? info->altfd : -1),
+                       master_auth_signal,
+                       master_auth_rxmsg,
+                       info->dbgmsg_callback,
+                       info->altmsg_callback);
        if (info->label) {
                dmsg_iocom_label(&iocom, "%s", info->label);
                free(info->label);
                info->label = NULL;
        }
        dmsg_iocom_core(&iocom);
+       dmsg_iocom_done(&iocom);
 
        fprintf(stderr,
                "iocom on fd %d terminated error rx=%d, tx=%d\n",
@@ -114,7 +117,7 @@ master_auth_signal(dmsg_iocom_t *iocom)
        dmsg_iocom_restate(iocom,
                            master_link_signal,
                            master_link_rxmsg,
-                           NULL);
+                           iocom->altmsg_callback);
 }
 
 static
index c494172..512448a 100644 (file)
@@ -51,6 +51,16 @@ struct service_node_opaque {
        int     servicefd;
 };
 
+struct autoconn {
+       TAILQ_ENTRY(autoconn) entry;
+       char    *host;
+       int     stage;
+       int     stopme;
+       int     pipefd[2];      /* {read,write} */
+       enum { AUTOCONN_INACTIVE, AUTOCONN_ACTIVE } state;
+       pthread_t thread;
+};
+
 #define WS " \r\n"
 
 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
@@ -58,6 +68,7 @@ pthread_mutex_t diskmtx;
 
 static void *service_thread(void *data);
 static void *udev_thread(void *data);
+static void *autoconn_thread(void *data);
 static void master_reconnect(const char *mntpt);
 static void disk_reconnect(const char *disk);
 static void disk_disconnect(void *handle);
@@ -171,6 +182,12 @@ service_thread(void *data)
        thread = NULL;
        pthread_create(&thread, NULL, udev_thread, NULL);
 
+       /*
+        * Start thread to manage /etc/hammer2/autoconn
+        */
+       thread = NULL;
+       pthread_create(&thread, NULL, autoconn_thread, NULL);
+
        /*
         * Scan existing hammer2 mounts and reconnect to them using
         * HAMMER2IOC_RECLUSTER.
@@ -287,6 +304,221 @@ udev_thread(void *data __unused)
        return (NULL);
 }
 
+static void *autoconn_connect_thread(void *data);
+static void autoconn_disconnect_signal(dmsg_iocom_t *iocom);
+
+static
+void *
+autoconn_thread(void *data __unused)
+{
+       TAILQ_HEAD(, autoconn) autolist;
+       struct autoconn *ac;
+       struct autoconn *next;
+       pthread_t thread;
+       struct stat st;
+       time_t  t;
+       time_t  lmod;
+       int     found_last;
+       FILE    *fp;
+       char    buf[256];
+
+       TAILQ_INIT(&autolist);
+       found_last = 0;
+       lmod = 0;
+
+       pthread_detach(pthread_self());
+       for (;;) {
+               /*
+                * Polling interval
+                */
+               sleep(5);
+
+               /*
+                * Poll the file.  Loop up if the synchronized state (lmod)
+                * has not changed.
+                */
+               if (stat(HAMMER2_DEFAULT_DIR "/autoconn", &st) == 0) {
+                       if (lmod == st.st_mtime)
+                               continue;
+                       fp = fopen(HAMMER2_DEFAULT_DIR "/autoconn", "r");
+                       if (fp == NULL)
+                               continue;
+               } else {
+                       if (lmod == 0)
+                               continue;
+                       fp = NULL;
+               }
+
+               /*
+                * Wait at least 5 seconds after the file is created or
+                * removed.
+                *
+                * Do not update the synchronized state.
+                */
+               if (fp == NULL && found_last) {
+                       found_last = 0;
+                       continue;
+               } else if (fp && found_last == 0) {
+                       fclose(fp);
+                       found_last = 1;
+                       continue;
+               }
+
+               /*
+                * Don't scan the file until the time progresses past the
+                * file's mtime, so we can validate that the file was not
+                * further modified during our scan.
+                *
+                * Do not update the synchronized state.
+                */
+               time(&t);
+               if (fp) {
+                       if (t == st.st_mtime) {
+                               fclose(fp);
+                               continue;
+                       }
+                       t = st.st_mtime;
+               } else {
+                       t = 0;
+               }
+
+               /*
+                * Set staging to disconnect, then scan the file.
+                */
+               TAILQ_FOREACH(ac, &autolist, entry)
+                       ac->stage = 0;
+               while (fp && fgets(buf, sizeof(buf), fp) != NULL) {
+                       char *host;
+
+                       if ((host = strtok(buf, " \t\r\n")) == NULL ||
+                           host[0] == '#') {
+                               continue;
+                       }
+                       TAILQ_FOREACH(ac, &autolist, entry) {
+                               if (strcmp(host, ac->host) == 0)
+                                       break;
+                       }
+                       if (ac == NULL) {
+                               ac = malloc(sizeof(*ac));
+                               bzero(ac, sizeof(*ac));
+                               ac->host = strdup(host);
+                               ac->state = AUTOCONN_INACTIVE;
+                               TAILQ_INSERT_TAIL(&autolist, ac, entry);
+                       }
+                       ac->stage = 1;
+               }
+
+               /*
+                * Ignore the scan (and retry again) if the file was
+                * modified during the scan.
+                *
+                * Do not update the synchronized state.
+                */
+               if (fp) {
+                       if (fstat(fileno(fp), &st) < 0) {
+                               fclose(fp);
+                               continue;
+                       }
+                       fclose(fp);
+                       if (t != st.st_mtime)
+                               continue;
+               }
+
+               /*
+                * Update the synchronized state and reconfigure the
+                * connect list as needed.
+                */
+               lmod = t;
+               next = TAILQ_FIRST(&autolist);
+               while ((ac = next) != NULL) {
+                       next = TAILQ_NEXT(ac, entry);
+
+                       /*
+                        * Staging, initiate
+                        */
+                       if (ac->stage && ac->state == AUTOCONN_INACTIVE) {
+                               if (pipe(ac->pipefd) == 0) {
+                                       ac->stopme = 0;
+                                       ac->state = AUTOCONN_ACTIVE;
+                                       thread = NULL;
+                                       pthread_create(&thread, NULL,
+                                                      autoconn_connect_thread,
+                                                      ac);
+                               }
+                       }
+
+                       /*
+                        * Unstaging, stop active connection.
+                        */
+                       if (ac->stage == 0 &&
+                           ac->state == AUTOCONN_ACTIVE) {
+                               if (ac->stopme == 0) {
+                                       ac->stopme = 1;
+                                       close(ac->pipefd[1]); /* signal */
+                               }
+                       }
+
+                       /*
+                        * Unstaging, delete inactive connection.
+                        */
+                       if (ac->stage == 0 &&
+                           ac->state == AUTOCONN_INACTIVE) {
+                               TAILQ_REMOVE(&autolist, ac, entry);
+                               free(ac->host);
+                               free(ac);
+                               continue;
+                       }
+               }
+               sleep(5);
+       }
+       return(NULL);
+}
+
+static
+void *
+autoconn_connect_thread(void *data)
+{
+       dmsg_master_service_info_t *info;
+       struct autoconn *ac;
+       void *res;
+       int fd;
+
+       ac = data;
+       pthread_detach(pthread_self());
+
+       while (ac->stopme == 0) {
+               fd = dmsg_connect(ac->host);
+               if (fd < 0) {
+                       fprintf(stderr, "autoconn: Connect failure: %s\n",
+                               ac->host);
+                       sleep(5);
+                       continue;
+               }
+               fprintf(stderr, "autoconn: Connect %s\n", ac->host);
+
+               info = malloc(sizeof(*info));
+               bzero(info, sizeof(*info));
+               info->fd = fd;
+               info->altfd = ac->pipefd[0];
+               info->altmsg_callback = autoconn_disconnect_signal;
+               info->detachme = 0;
+               pthread_create(&ac->thread, NULL, dmsg_master_service, info);
+               pthread_join(ac->thread, &res);
+       }
+       close(ac->pipefd[0]);
+       ac->state = AUTOCONN_INACTIVE;
+       /* auto structure can be ripped out here */
+       return(NULL);
+}
+
+static
+void
+autoconn_disconnect_signal(dmsg_iocom_t *iocom)
+{
+       fprintf(stderr, "autoconn: Shutting down socket\n");
+       shutdown(iocom->sock_fd, SHUT_RDWR);
+}
+
 /*
  * Retrieve the list of disk attachments and attempt to export
  * them.