hammer2 - refactor libdmsg volume configuration
authorMatthew Dillon <dillon@apollo.backplane.com>
Sat, 19 Apr 2014 19:55:21 +0000 (12:55 -0700)
committerMatthew Dillon <dillon@apollo.backplane.com>
Sat, 19 Apr 2014 19:55:21 +0000 (12:55 -0700)
* Move the libdmsg volume configuration structures and commands, which
  are hammer2-specific, over to hammer2.

sbin/hammer2/cmd_debug.c
sbin/hammer2/cmd_service.c
sbin/hammer2/hammer2.h
sys/vfs/hammer2/hammer2_disk.h
sys/vfs/hammer2/hammer2_flush.c
sys/vfs/hammer2/hammer2_vfsops.c

index c73f35d..756f00a 100644 (file)
@@ -172,27 +172,50 @@ static void shell_span(dmsg_circuit_t *circuit, char *cmdbuf);
 static void shell_circ(dmsg_circuit_t *circuit, char *cmdbuf);
 
 void
-hammer2_shell_parse(dmsg_msg_t *msg)
+hammer2_shell_parse(dmsg_msg_t *msg, int unmanaged)
 {
-       dmsg_circuit_t *circuit = msg->circuit;
-       char *cmdbuf = msg->aux_data;
-       char *cmd = strsep(&cmdbuf, " \t");
+       dmsg_circuit_t *circuit;
+       char *cmdbuf;
+       char *cmdp;
+       uint32_t cmd;
 
-       if (cmd == NULL || *cmd == 0) {
+       /*
+        * Filter on debug shell commands only
+        */
+       cmd = msg->any.head.cmd;
+       if ((cmd & DMSGF_PROTOS) != DMSG_PROTO_DBG) {
+               if (unmanaged)
+                       dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
+               return;
+       }
+       if ((cmd & DMSGF_CMDSWMASK) != DMSG_DBG_SHELL) {
+               if (unmanaged)
+                       dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
+               return;
+       }
+
+       /*
+        * Debug shell command
+        */
+       circuit = msg->circuit;
+       cmdbuf = msg->aux_data;
+       cmdp = strsep(&cmdbuf, " \t");
+
+       if (cmdp == NULL || *cmdp == 0) {
                ;
-       } else if (strcmp(cmd, "span") == 0) {
+       } else if (strcmp(cmdp, "span") == 0) {
                shell_span(circuit, cmdbuf);
-       } else if (strcmp(cmd, "circ") == 0) {
+       } else if (strcmp(cmdp, "circ") == 0) {
                shell_circ(circuit, cmdbuf);
-       } else if (strcmp(cmd, "tree") == 0) {
+       } else if (strcmp(cmdp, "tree") == 0) {
                dmsg_shell_tree(circuit, cmdbuf); /* dump spanning tree */
-       } else if (strcmp(cmd, "help") == 0 || strcmp(cmd, "?") == 0) {
+       } else if (strcmp(cmdp, "help") == 0 || strcmp(cmdp, "?") == 0) {
                dmsg_circuit_printf(circuit, "help            Command help\n");
                dmsg_circuit_printf(circuit, "span <host>     Span to target host\n");
                dmsg_circuit_printf(circuit, "circ <msgid>    Create VC to msgid of rx SPAN\n");
                dmsg_circuit_printf(circuit, "tree            Dump spanning tree\n");
        } else {
-               dmsg_circuit_printf(circuit, "Unrecognized command: %s\n", cmd);
+               dmsg_circuit_printf(circuit, "Unrecognized command: %s\n", cmdp);
        }
 }
 
@@ -227,7 +250,7 @@ shell_span(dmsg_circuit_t *circuit, char *cmdbuf)
                bzero(info, sizeof(*info));
                info->fd = fd;
                info->detachme = 1;
-               info->dbgmsg_callback = hammer2_shell_parse;
+               info->usrmsg_callback = hammer2_shell_parse;
                info->label = strdup("client");
 
                pthread_create(&thread, NULL, dmsg_master_service, info);
index a3b70f7..e48e755 100644 (file)
 #include <sys/xdiskioctl.h>
 #include <machine/atomic.h>
 
+struct hammer2_media_config {
+       hammer2_volconf_t       copy_run;
+       hammer2_volconf_t       copy_pend;
+       pthread_t               thread;
+       pthread_cond_t          cond;
+       int                     ctl;
+       int                     fd;
+       int                     pipefd[2];      /* signal stop */
+       dmsg_iocom_t            iocom;
+       pthread_t               iocom_thread;
+       enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
+};
+
+typedef struct hammer2_media_config hammer2_media_config_t;
+
+#define H2CONFCTL_STOP          0x00000001
+#define H2CONFCTL_UPDATE        0x00000002
+
 struct diskcon {
        TAILQ_ENTRY(diskcon) entry;
        char    *disk;
@@ -65,7 +83,8 @@ struct autoconn {
 #define WS " \r\n"
 
 TAILQ_HEAD(, diskcon) diskconq = TAILQ_HEAD_INITIALIZER(diskconq);
-pthread_mutex_t diskmtx;
+static pthread_mutex_t diskmtx;
+static pthread_mutex_t confmtx;
 
 static void *service_thread(void *data);
 static void *udev_thread(void *data);
@@ -74,7 +93,14 @@ static void master_reconnect(const char *mntpt);
 static void disk_reconnect(const char *disk);
 static void disk_disconnect(void *handle);
 static void udev_check_disks(void);
-static void service_node_handler(void **opaque, struct dmsg_msg *msg, int op);
+static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged);
+static void hammer2_node_handler(void **opaque, struct dmsg_msg *msg, int op);
+static void *hammer2_volconf_thread(void *info);
+static void hammer2_volconf_signal(dmsg_iocom_t *iocom);
+static void hammer2_volconf_start(hammer2_media_config_t *conf,
+                       const char *hostname);
+static void hammer2_volconf_stop(hammer2_media_config_t *conf);
+
 
 static void xdisk_reconnect(struct service_node_opaque *info);
 static void xdisk_disconnect(void *handle);
@@ -199,8 +225,6 @@ service_thread(void *data)
        setproctitle("hammer2 master listen");
        pthread_detach(pthread_self());
 
-       dmsg_node_handler = service_node_handler;
-
        /*
         * Start up a thread to handle block device monitoring
         */
@@ -241,13 +265,224 @@ service_thread(void *data)
                bzero(info, sizeof(*info));
                info->fd = fd;
                info->detachme = 1;
-               info->dbgmsg_callback = hammer2_shell_parse;
+               info->usrmsg_callback = hammer2_usrmsg_handler;
+               info->node_handler = hammer2_node_handler;
                info->label = strdup("client");
                pthread_create(&thread, NULL, dmsg_master_service, info);
        }
        return (NULL);
 }
 
+/*
+ * Handle/Monitor the dmsg stream.  If unmanaged is set we are responsible
+ * for responding for the message, otherwise if it is not set libdmsg has
+ * already done some preprocessing and will respond to the message for us
+ * when we return.
+ *
+ * We primarily monitor for VOLCONFs
+ */
+static
+void
+hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged)
+{
+       dmsg_state_t *state;
+       hammer2_media_config_t *conf;
+       dmsg_lnk_hammer2_volconf_t *msgconf;
+       int i;
+
+       /*
+        * Only process messages which are part of a LNK_CONN stream
+        */
+       state = msg->state;
+       if (state == NULL ||
+           (state->rxcmd & DMSGF_BASECMDMASK) != DMSG_LNK_CONN) {
+               hammer2_shell_parse(msg, unmanaged);
+               return;
+       }
+
+       switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
+       case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
+       case DMSG_LNK_CONN | DMSGF_DELETE:
+       case DMSG_LNK_ERROR | DMSGF_DELETE:
+               /*
+                * Deleting connection, clean out all volume configs
+                */
+               if (state->media == NULL || state->media->usrhandle == NULL)
+                       break;
+               conf = state->media->usrhandle;
+               fprintf(stderr, "Shutting down media spans\n");
+               for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
+                       if (conf[i].thread) {
+                               conf[i].ctl = H2CONFCTL_STOP;
+                               pthread_cond_signal(&conf[i].cond);
+                       }
+               }
+               for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
+                       if (conf[i].thread) {
+                               pthread_join(conf[i].thread, NULL);
+                               conf->thread = NULL;
+                               pthread_cond_destroy(&conf[i].cond);
+                       }
+               }
+               state->media->usrhandle = NULL;
+               free(conf);
+               break;
+       case DMSG_LNK_HAMMER2_VOLCONF:
+               /*
+                * One-way volume-configuration message is transmitted
+                * over the open LNK_CONN transaction.
+                */
+               fprintf(stderr, "RECEIVED VOLCONF\n");
+
+               if ((conf = state->media->usrhandle) == NULL) {
+                       conf = malloc(sizeof(*conf) * HAMMER2_COPYID_COUNT);
+                       bzero(conf, sizeof(*conf) * HAMMER2_COPYID_COUNT);
+                       state->media->usrhandle = conf;
+               }
+               msgconf = H2_LNK_VOLCONF(msg);
+
+               if (msgconf->index < 0 ||
+                   msgconf->index >= HAMMER2_COPYID_COUNT) {
+                       fprintf(stderr,
+                               "VOLCONF: ILLEGAL INDEX %d\n",
+                               msgconf->index);
+                       break;
+               }
+               if (msgconf->copy.path[sizeof(msgconf->copy.path) - 1] != 0 ||
+                   msgconf->copy.path[0] == 0) {
+                       fprintf(stderr,
+                               "VOLCONF: ILLEGAL PATH %d\n",
+                               msgconf->index);
+                       break;
+               }
+               conf += msgconf->index;
+               pthread_mutex_lock(&confmtx);
+               conf->copy_pend = msgconf->copy;
+               conf->ctl |= H2CONFCTL_UPDATE;
+               pthread_mutex_unlock(&confmtx);
+               if (conf->thread == NULL) {
+                       fprintf(stderr, "VOLCONF THREAD STARTED\n");
+                       pthread_cond_init(&conf->cond, NULL);
+                       pthread_create(&conf->thread, NULL,
+                                      hammer2_volconf_thread, (void *)conf);
+               }
+               pthread_cond_signal(&conf->cond);
+               break;
+       default:
+               if (unmanaged)
+                       dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
+               break;
+       }
+}
+
+static void *
+hammer2_volconf_thread(void *info)
+{
+       hammer2_media_config_t *conf = info;
+
+       pthread_mutex_lock(&confmtx);
+       while ((conf->ctl & H2CONFCTL_STOP) == 0) {
+               if (conf->ctl & H2CONFCTL_UPDATE) {
+                       fprintf(stderr, "VOLCONF UPDATE\n");
+                       conf->ctl &= ~H2CONFCTL_UPDATE;
+                       if (bcmp(&conf->copy_run, &conf->copy_pend,
+                                sizeof(conf->copy_run)) == 0) {
+                               fprintf(stderr, "VOLCONF: no changes\n");
+                               continue;
+                       }
+                       /*
+                        * XXX TODO - auto reconnect on lookup failure or
+                        *              connect failure or stream failure.
+                        */
+
+                       pthread_mutex_unlock(&confmtx);
+                       hammer2_volconf_stop(conf);
+                       conf->copy_run = conf->copy_pend;
+                       if (conf->copy_run.copyid != 0 &&
+                           strncmp(conf->copy_run.path, "span:", 5) == 0) {
+                               hammer2_volconf_start(conf,
+                                                     conf->copy_run.path + 5);
+                       }
+                       pthread_mutex_lock(&confmtx);
+                       fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
+               }
+               if (conf->state == H2MC_CONNECT) {
+                       hammer2_volconf_start(conf, conf->copy_run.path + 5);
+                       pthread_mutex_unlock(&confmtx);
+                       sleep(5);
+                       pthread_mutex_lock(&confmtx);
+               } else {
+                       pthread_cond_wait(&conf->cond, &confmtx);
+               }
+       }
+       pthread_mutex_unlock(&confmtx);
+       hammer2_volconf_stop(conf);
+       return(NULL);
+}
+
+static
+void
+hammer2_volconf_start(hammer2_media_config_t *conf, const char *hostname)
+{
+       dmsg_master_service_info_t *info;
+
+       switch(conf->state) {
+       case H2MC_STOPPED:
+       case H2MC_CONNECT:
+               conf->fd = dmsg_connect(hostname);
+               if (conf->fd < 0) {
+                       fprintf(stderr, "Unable to connect to %s\n", hostname);
+                       conf->state = H2MC_CONNECT;
+               } else if (pipe(conf->pipefd) < 0) {
+                       close(conf->fd);
+                       fprintf(stderr, "pipe() failed during volconf\n");
+                       conf->state = H2MC_CONNECT;
+               } else {
+                       fprintf(stderr, "VOLCONF CONNECT\n");
+                       info = malloc(sizeof(*info));
+                       bzero(info, sizeof(*info));
+                       info->fd = conf->fd;
+                       info->altfd = conf->pipefd[0];
+                       info->altmsg_callback = hammer2_volconf_signal;
+                       info->detachme = 0;
+                       conf->state = H2MC_RUNNING;
+                       pthread_create(&conf->iocom_thread, NULL,
+                                      dmsg_master_service, info);
+               }
+               break;
+       case H2MC_RUNNING:
+               break;
+       }
+}
+
+static
+void
+hammer2_volconf_stop(hammer2_media_config_t *conf)
+{
+       switch(conf->state) {
+       case H2MC_STOPPED:
+               break;
+       case H2MC_CONNECT:
+               conf->state = H2MC_STOPPED;
+               break;
+       case H2MC_RUNNING:
+               close(conf->pipefd[1]);
+               conf->pipefd[1] = -1;
+               pthread_join(conf->iocom_thread, NULL);
+               conf->iocom_thread = NULL;
+               conf->state = H2MC_STOPPED;
+               break;
+       }
+}
+
+static
+void
+hammer2_volconf_signal(dmsg_iocom_t *iocom)
+{
+       atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
+}
+
+
 /*
  * Node discovery code on received SPANs (or loss of SPANs).  This code
  * is used to track the availability of remote block devices and install
@@ -260,7 +495,7 @@ service_thread(void *data)
  */
 static
 void
-service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
+hammer2_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
 {
        struct service_node_opaque *info = *opaquep;
 
@@ -643,7 +878,8 @@ master_reconnect(const char *mntpt)
        bzero(info, sizeof(*info));
        info->fd = pipefds[1];
        info->detachme = 1;
-       info->dbgmsg_callback = hammer2_shell_parse;
+       info->usrmsg_callback = hammer2_usrmsg_handler;
+       info->node_handler = hammer2_node_handler;
        info->label = strdup("hammer2");
        pthread_create(&thread, NULL, dmsg_master_service, info);
 }
@@ -724,7 +960,8 @@ disk_reconnect(const char *disk)
        bzero(info, sizeof(*info));
        info->fd = pipefds[1];
        info->detachme = 1;
-       info->dbgmsg_callback = hammer2_shell_parse;
+       info->usrmsg_callback = hammer2_usrmsg_handler;
+       info->node_handler = hammer2_node_handler;
        info->exit_callback = disk_disconnect;
        info->handle = dc;
        info->label = strdup(dc->disk);
@@ -768,7 +1005,8 @@ xdisk_reconnect(struct service_node_opaque *xdisk)
        bzero(info, sizeof(*info));
        info->fd = pipefds[1];
        info->detachme = 1;
-       info->dbgmsg_callback = hammer2_shell_parse;
+       info->usrmsg_callback = hammer2_usrmsg_handler;
+       info->node_handler = hammer2_node_handler;
        info->exit_callback = xdisk_disconnect;
        info->handle = xdisk;
        xdisk->servicing = 1;
index 2b2c1fa..a04c9b3 100644 (file)
@@ -154,5 +154,5 @@ hammer2_key_t dirhash(const unsigned char *name, size_t len);
 uint32_t hammer2_icrc32(const void *buf, size_t size);
 uint32_t hammer2_icrc32c(const void *buf, size_t size, uint32_t crc);
 
-void hammer2_shell_parse(dmsg_msg_t *msg);
+void hammer2_shell_parse(dmsg_msg_t *msg, int unmanaged);
 void print_inode(char* inode_string);
index 727d427..e563b86 100644 (file)
@@ -451,8 +451,6 @@ typedef struct hammer2_volconf hammer2_volconf_t;
 #define DMSG_VOLF_CONN_EF      0x40    /* media errors flagged */
 #define DMSG_VOLF_CONN_PRI     0x0F    /* select priority 0-15 (15=best) */
 
-#define DMSG_COPYID_COUNT      256     /* WARNING! embedded in hammer2 vol */
-
 struct dmsg_lnk_hammer2_volconf {
        dmsg_hdr_t              head;
        hammer2_volconf_t       copy;   /* copy spec */
@@ -849,16 +847,7 @@ typedef struct hammer2_inode_data hammer2_inode_data_t;
 #define HAMMER2_COPYID_NONE            0
 #define HAMMER2_COPYID_LOCAL           ((uint8_t)-1)
 
-/*
- * PEER types identify connections and help cluster controller filter
- * out unwanted SPANs.
- */
-#define HAMMER2_PEER_NONE              DMSG_PEER_NONE
-#define HAMMER2_PEER_CLUSTER           DMSG_PEER_CLUSTER
-#define HAMMER2_PEER_BLOCK             DMSG_PEER_BLOCK
-#define HAMMER2_PEER_HAMMER2           DMSG_PEER_HAMMER2
-
-#define HAMMER2_COPYID_COUNT           DMSG_COPYID_COUNT
+#define HAMMER2_COPYID_COUNT           256
 
 /*
  * PFS types identify a PFS on media and in LNK_SPAN messages.
index 70a1466..159b94c 100644 (file)
@@ -965,7 +965,7 @@ hammer2_flush_core(hammer2_flush_info_t *info, hammer2_chain_t **chainp,
                 */
                hmp->voldata.mirror_tid = chain->bref.mirror_tid;
                hmp->voldata.freemap_tid = hmp->fchain.bref.mirror_tid;
-               kprintf("mirror_tid %08x\n", chain->bref.mirror_tid);
+               kprintf("mirror_tid %08jx\n", (intmax_t)chain->bref.mirror_tid);
 
                /*
                 * The volume header is flushed manually by the syncer, not
index 6d65a2e..56afb80 100644 (file)
@@ -451,7 +451,7 @@ hammer2_vfs_mount(struct mount *mp, char *path, caddr_t data,
                                if (error)
                                        break;
                        }
-                       hammer2_inode_install_hidden(pmp);
+                       /*hammer2_inode_install_hidden(pmp);*/
 
                        return error;
                }
@@ -762,7 +762,7 @@ hammer2_vfs_mount(struct mount *mp, char *path, caddr_t data,
                lockmgr(&hammer2_mntlk, LK_RELEASE);
 
                kprintf("ok\n");
-               /* hammer2_inode_install_hidden(pmp); */
+               hammer2_inode_install_hidden(pmp);
 
                return ERANGE;
        }
@@ -2570,7 +2570,7 @@ hammer2_cluster_reconnect(hammer2_pfsmount_t *pmp, struct file *fp)
         * clients (otherwise millions of clients would present a serious
         * problem).  The fs_label also serves to restrict the namespace.
         */
-       pmp->iocom.auto_lnk_conn.peer_mask = 1LLU << HAMMER2_PEER_HAMMER2;
+       pmp->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_HAMMER2;
        pmp->iocom.auto_lnk_conn.pfs_mask = (uint64_t)-1;
        switch (ipdata->pfs_type) {
        case DMSG_PFSTYPE_CLIENT: