hammer2 - Config notifications, cleanup HAMMER2 VFS API
authorMatthew Dillon <dillon@apollo.backplane.com>
Sat, 11 Aug 2012 03:46:51 +0000 (20:46 -0700)
committerMatthew Dillon <dillon@apollo.backplane.com>
Sat, 11 Aug 2012 03:46:51 +0000 (20:46 -0700)
* A hammer2 volume has a PERSISTENT table of 256 entries in the media
  volume header for specifying how the cluster connects together.  Various
  hammer2 directives can list, add, or remove entries from this table.

  This table will be used for several different aspects of the filesystem
  and one of them is to tell the userland hammer2 service daemon what
  other machines to connect to.  That is, we want the cluster configuration
  to be persistently stored as part of a HAMMER2 filesystem.

* Add a notification message from the kernel to the daemon whenever this
  table id modified.  The kernel will also spam the contents of the table
  to the daemon when it first connects to the daemon.

  The service daemon tracks the table and will connect to (or disconnect
  from) the listed targets in real time.  In addition, the service daemon
  will retry a failed connection (or failed DNS lookup) forever as long
  as the entry is intact.  The idea being that a machine in the cluster
  will recover once transitory failures are resolved.

  This is a bit messy at the moment as two pthreads have to be created for
  each connection... one to handle connect, disconnect, and retry operations
  and the other to handle the actual message stream over the connection.

* Clean up the HAMMER2 VFS code's messaging APIs a bit, bringing them
  closer to the hammer2 userland APIs (though of course there will
  always be major differences).

* Adjust the hammer2 VFS to try to clean up open transactional states
  when a socket failure occurs before proceeding with a umount, so the
  related functional states can be triggered and cleaned up.

* Added an ioctl to reconnect a hammer2 mount to the userland hammer2
  service daemon (not yet used).  This will allow us to kill and restart
  the daemon and have it recover the communications pipes between itself
  and the HAMMER2 mounts in the kernel.

sbin/hammer2/cmd_remote.c
sbin/hammer2/msg.c
sbin/hammer2/msg_lnk.c
sbin/hammer2/network.h
sys/vfs/hammer2/hammer2.h
sys/vfs/hammer2/hammer2_ioctl.c
sys/vfs/hammer2/hammer2_ioctl.h
sys/vfs/hammer2/hammer2_msg.c
sys/vfs/hammer2/hammer2_network.h
sys/vfs/hammer2/hammer2_vfsops.c

index 8300e09..97116b1 100644 (file)
@@ -100,7 +100,7 @@ cmd_remote_status(const char *sel_path, int all_opt __unused)
        bzero(&remote, sizeof(remote));
 
        while ((remote.copyid = remote.nextid) >= 0) {
        bzero(&remote, sizeof(remote));
 
        while ((remote.copyid = remote.nextid) >= 0) {
-               if (ioctl(fd, HAMMER2IOC_REMOTE_GET, &remote) < 0) {
+               if (ioctl(fd, HAMMER2IOC_REMOTE_SCAN, &remote) < 0) {
                        perror("ioctl");
                        ecode = 1;
                        break;
                        perror("ioctl");
                        ecode = 1;
                        break;
index 3531379..ff38357 100644 (file)
@@ -2062,6 +2062,9 @@ hammer2_basecmd_str(uint32_t cmd)
        case HAMMER2_LNK_SPAN:
                cmdstr = "SPAN";
                break;
        case HAMMER2_LNK_SPAN:
                cmdstr = "SPAN";
                break;
+       case HAMMER2_LNK_VOLCONF:
+               cmdstr = "VOLCONF";
+               break;
        case HAMMER2_LNK_ERROR:
                if (cmd & HAMMER2_MSGF_DELETE)
                        cmdstr = "RETURN";
        case HAMMER2_LNK_ERROR:
                if (cmd & HAMMER2_MSGF_DELETE)
                        cmdstr = "RETURN";
index 1c0ab69..91b7dfe 100644 (file)
 
 struct h2span_link;
 struct h2span_relay;
 
 struct h2span_link;
 struct h2span_relay;
+TAILQ_HEAD(h2span_media_queue, h2span_media);
 TAILQ_HEAD(h2span_connect_queue, h2span_connect);
 TAILQ_HEAD(h2span_relay_queue, h2span_relay);
 
 TAILQ_HEAD(h2span_connect_queue, h2span_connect);
 TAILQ_HEAD(h2span_relay_queue, h2span_relay);
 
@@ -194,12 +195,39 @@ RB_HEAD(h2span_link_tree, h2span_link);
 RB_HEAD(h2span_relay_tree, h2span_relay);
 
 /*
 RB_HEAD(h2span_relay_tree, h2span_relay);
 
 /*
+ * This represents a media
+ */
+struct h2span_media {
+       TAILQ_ENTRY(h2span_media) entry;
+       uuid_t  mediaid;
+       int     refs;
+       struct h2span_media_config {
+               hammer2_copy_data_t     copy_run;
+               hammer2_copy_data_t     copy_pend;
+               pthread_t               thread;
+               pthread_cond_t          cond;
+               int                     ctl;
+               int                     fd;
+               hammer2_iocom_t         iocom;
+               pthread_t               iocom_thread;
+               enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
+       } config[HAMMER2_COPYID_COUNT];
+};
+
+typedef struct h2span_media_config h2span_media_config_t;
+
+#define H2CONFCTL_STOP         0x00000001
+#define H2CONFCTL_UPDATE       0x00000002
+
+/*
  * Received LNK_CONN transaction enables SPAN protocol over connection.
  * Received LNK_CONN transaction enables SPAN protocol over connection.
- * (may contain filter).
+ * (may contain filter).  Typically one for each mount and several may
+ * share the same media.
  */
 struct h2span_connect {
        TAILQ_ENTRY(h2span_connect) entry;
        struct h2span_relay_tree tree;
  */
 struct h2span_connect {
        TAILQ_ENTRY(h2span_connect) entry;
        struct h2span_relay_tree tree;
+       struct h2span_media *media;
        hammer2_state_t *state;
 };
 
        hammer2_state_t *state;
 };
 
@@ -260,6 +288,7 @@ struct h2span_relay {
 };
 
 
 };
 
 
+typedef struct h2span_media h2span_media_t;
 typedef struct h2span_connect h2span_connect_t;
 typedef struct h2span_cluster h2span_cluster_t;
 typedef struct h2span_node h2span_node_t;
 typedef struct h2span_connect h2span_connect_t;
 typedef struct h2span_cluster h2span_cluster_t;
 typedef struct h2span_node h2span_node_t;
@@ -363,11 +392,12 @@ RB_GENERATE_STATIC(h2span_relay_tree, h2span_relay,
             rbnode, h2span_relay_cmp);
 
 /*
             rbnode, h2span_relay_cmp);
 
 /*
- * Global mutex protects cluster_tree lookups.
+ * Global mutex protects cluster_tree lookups, connq, mediaq.
  */
 static pthread_mutex_t cluster_mtx;
 static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
 static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq);
  */
 static pthread_mutex_t cluster_mtx;
 static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
 static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq);
+static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq);
 
 static void hammer2_lnk_span(hammer2_msg_t *msg);
 static void hammer2_lnk_conn(hammer2_msg_t *msg);
 
 static void hammer2_lnk_span(hammer2_msg_t *msg);
 static void hammer2_lnk_conn(hammer2_msg_t *msg);
@@ -375,6 +405,11 @@ static void hammer2_lnk_relay(hammer2_msg_t *msg);
 static void hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node);
 static void hammer2_relay_delete(h2span_relay_t *relay);
 
 static void hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node);
 static void hammer2_relay_delete(h2span_relay_t *relay);
 
+static void *hammer2_volconf_thread(void *info);
+static void hammer2_volconf_stop(h2span_media_config_t *conf);
+static void hammer2_volconf_start(h2span_media_config_t *conf,
+                               const char *hostname);
+
 void
 hammer2_msg_lnk_signal(hammer2_router_t *router __unused)
 {
 void
 hammer2_msg_lnk_signal(hammer2_router_t *router __unused)
 {
@@ -411,20 +446,23 @@ void
 hammer2_lnk_conn(hammer2_msg_t *msg)
 {
        hammer2_state_t *state = msg->state;
 hammer2_lnk_conn(hammer2_msg_t *msg)
 {
        hammer2_state_t *state = msg->state;
+       h2span_media_t *media;
+       h2span_media_config_t *conf;
        h2span_connect_t *conn;
        h2span_relay_t *relay;
        char *alloc = NULL;
        h2span_connect_t *conn;
        h2span_relay_t *relay;
        char *alloc = NULL;
+       int i;
 
        pthread_mutex_lock(&cluster_mtx);
 
 
        pthread_mutex_lock(&cluster_mtx);
 
-       /*
-        * On transaction start we allocate a new h2span_connect and
-        * acknowledge the request, leaving the transaction open.
-        * We then relay priority-selected SPANs.
-        */
-       if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
-               state->func = hammer2_lnk_conn;
-
+       switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) {
+       case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE:
+       case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
+               /*
+                * On transaction start we allocate a new h2span_connect and
+                * acknowledge the request, leaving the transaction open.
+                * We then relay priority-selected SPANs.
+                */
                fprintf(stderr, "LNK_CONN(%08x): %s/%s\n",
                        (uint32_t)msg->any.head.msgid,
                        hammer2_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
                fprintf(stderr, "LNK_CONN(%08x): %s/%s\n",
                        (uint32_t)msg->any.head.msgid,
                        hammer2_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
@@ -436,30 +474,77 @@ hammer2_lnk_conn(hammer2_msg_t *msg)
 
                RB_INIT(&conn->tree);
                conn->state = state;
 
                RB_INIT(&conn->tree);
                conn->state = state;
+               state->func = hammer2_lnk_conn;
                state->any.conn = conn;
                TAILQ_INSERT_TAIL(&connq, conn, entry);
 
                state->any.conn = conn;
                TAILQ_INSERT_TAIL(&connq, conn, entry);
 
-               hammer2_msg_result(msg, 0);
-
-#if 0
                /*
                /*
-                * Span-synchronize all nodes with the new connection
+                * Set up media
                 */
                 */
-               hammer2_relay_scan(conn, NULL);
-#endif
-               hammer2_router_signal(msg->router);
-       }
+               TAILQ_FOREACH(media, &mediaq, entry) {
+                       if (uuid_compare(&msg->any.lnk_conn.mediaid,
+                                        &media->mediaid, NULL) == 0) {
+                               break;
+                       }
+               }
+               if (media == NULL) {
+                       media = hammer2_alloc(sizeof(*media));
+                       media->mediaid = msg->any.lnk_conn.mediaid;
+                       TAILQ_INSERT_TAIL(&mediaq, media, entry);
+               }
+               conn->media = media;
+               ++media->refs;
 
 
-       /*
-        * On transaction terminate we clean out our h2span_connect
-        * and acknowledge the request, closing the transaction.
-        */
-       if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+               if ((msg->any.head.cmd & HAMMER2_MSGF_DELETE) == 0) {
+                       hammer2_msg_result(msg, 0);
+                       hammer2_router_signal(msg->router);
+                       break;
+               }
+               /* FALL THROUGH */
+       case HAMMER2_LNK_CONN | HAMMER2_MSGF_DELETE:
+       case HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE:
+deleteconn:
+               /*
+                * On transaction terminate we clean out our h2span_connect
+                * and acknowledge the request, closing the transaction.
+                */
                fprintf(stderr, "LNK_CONN: Terminated\n");
                conn = state->any.conn;
                assert(conn);
 
                /*
                fprintf(stderr, "LNK_CONN: Terminated\n");
                conn = state->any.conn;
                assert(conn);
 
                /*
+                * Clean out the media structure. If refs drops to zero we
+                * also clean out the media config threads.  These threads
+                * maintain span connections to other hammer2 service daemons.
+                */
+               media = conn->media;
+               if (--media->refs == 0) {
+                       fprintf(stderr, "Shutting down media spans\n");
+                       for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
+                               conf = &media->config[i];
+
+                               if (conf->thread == NULL)
+                                       continue;
+                               conf->ctl = H2CONFCTL_STOP;
+                               pthread_cond_signal(&conf->cond);
+                       }
+                       for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
+                               conf = &media->config[i];
+
+                               if (conf->thread == NULL)
+                                       continue;
+                               pthread_mutex_unlock(&cluster_mtx);
+                               pthread_join(conf->thread, NULL);
+                               pthread_mutex_lock(&cluster_mtx);
+                               conf->thread = NULL;
+                               pthread_cond_destroy(&conf->cond);
+                       }
+                       fprintf(stderr, "Media shutdown complete\n");
+                       TAILQ_REMOVE(&mediaq, media, entry);
+                       hammer2_free(media);
+               }
+
+               /*
                 * Clean out all relays.  This requires terminating each
                 * relay transaction.
                 */
                 * Clean out all relays.  This requires terminating each
                 * relay transaction.
                 */
@@ -470,6 +555,7 @@ hammer2_lnk_conn(hammer2_msg_t *msg)
                /*
                 * Clean out conn
                 */
                /*
                 * Clean out conn
                 */
+               conn->media = NULL;
                conn->state = NULL;
                msg->state->any.conn = NULL;
                TAILQ_REMOVE(&connq, conn, entry);
                conn->state = NULL;
                msg->state->any.conn = NULL;
                TAILQ_REMOVE(&connq, conn, entry);
@@ -477,6 +563,49 @@ hammer2_lnk_conn(hammer2_msg_t *msg)
 
                hammer2_msg_reply(msg, 0);
                /* state invalid after reply */
 
                hammer2_msg_reply(msg, 0);
                /* state invalid after reply */
+               break;
+       case HAMMER2_LNK_VOLCONF:
+               /*
+                * One-way volume-configuration message is transmitted
+                * over the open LNK_CONN transaction.
+                */
+               fprintf(stderr, "RECEIVED VOLCONF\n");
+               if (msg->any.lnk_volconf.index < 0 ||
+                   msg->any.lnk_volconf.index >= HAMMER2_COPYID_COUNT) {
+                       fprintf(stderr, "VOLCONF: ILLEGAL INDEX %d\n",
+                               msg->any.lnk_volconf.index);
+                       break;
+               }
+               if (msg->any.lnk_volconf.copy.path[sizeof(msg->any.lnk_volconf.copy.path) - 1] != 0 ||
+                   msg->any.lnk_volconf.copy.path[0] == 0) {
+                       fprintf(stderr, "VOLCONF: ILLEGAL PATH %d\n",
+                               msg->any.lnk_volconf.index);
+                       break;
+               }
+               conn = msg->state->any.conn;
+               if (conn == NULL) {
+                       fprintf(stderr, "VOLCONF: LNK_CONN is missing\n");
+                       break;
+               }
+               conf = &conn->media->config[msg->any.lnk_volconf.index];
+               conf->copy_pend = msg->any.lnk_volconf.copy;
+               conf->ctl |= H2CONFCTL_UPDATE;
+               if (conf->thread == NULL) {
+                       fprintf(stderr, "VOLCONF THREAD STARTED\n");
+                       pthread_cond_init(&conf->cond, NULL);
+                       pthread_create(&conf->thread, NULL,
+                                      hammer2_volconf_thread, (void *)conf);
+               }
+               pthread_cond_signal(&conf->cond);
+               break;
+       default:
+               /*
+                * Failsafe
+                */
+               if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
+                       goto deleteconn;
+               hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
+               break;
        }
        pthread_mutex_unlock(&cluster_mtx);
 }
        }
        pthread_mutex_unlock(&cluster_mtx);
 }
@@ -918,6 +1047,92 @@ hammer2_relay_delete(h2span_relay_t *relay)
        hammer2_free(relay);
 }
 
        hammer2_free(relay);
 }
 
+static void *
+hammer2_volconf_thread(void *info)
+{
+       h2span_media_config_t *conf = info;
+
+       pthread_mutex_lock(&cluster_mtx);
+       while ((conf->ctl & H2CONFCTL_STOP) == 0) {
+               if (conf->ctl & H2CONFCTL_UPDATE) {
+                       fprintf(stderr, "VOLCONF UPDATE\n");
+                       conf->ctl &= ~H2CONFCTL_UPDATE;
+                       if (bcmp(&conf->copy_run, &conf->copy_pend,
+                                sizeof(conf->copy_run)) == 0) {
+                               fprintf(stderr, "VOLCONF: no changes\n");
+                               continue;
+                       }
+                       /*
+                        * XXX TODO - auto reconnect on lookup failure or
+                        *              connect failure or stream failure.
+                        */
+
+                       pthread_mutex_unlock(&cluster_mtx);
+                       hammer2_volconf_stop(conf);
+                       conf->copy_run = conf->copy_pend;
+                       if (conf->copy_run.copyid != 0 &&
+                           strncmp(conf->copy_run.path, "span:", 5) == 0) {
+                               hammer2_volconf_start(conf,
+                                                     conf->copy_run.path + 5);
+                       }
+                       pthread_mutex_lock(&cluster_mtx);
+                       fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
+               }
+               if (conf->state == H2MC_CONNECT) {
+                       hammer2_volconf_start(conf, conf->copy_run.path + 5);
+                       pthread_mutex_unlock(&cluster_mtx);
+                       sleep(5);
+                       pthread_mutex_lock(&cluster_mtx);
+               } else {
+                       pthread_cond_wait(&conf->cond, &cluster_mtx);
+               }
+       }
+       pthread_mutex_unlock(&cluster_mtx);
+       hammer2_volconf_stop(conf);
+       return(NULL);
+}
+
+static
+void
+hammer2_volconf_stop(h2span_media_config_t *conf)
+{
+       switch(conf->state) {
+       case H2MC_STOPPED:
+               break;
+       case H2MC_CONNECT:
+               conf->state = H2MC_STOPPED;
+               break;
+       case H2MC_RUNNING:
+               shutdown(conf->fd, SHUT_WR);
+               pthread_join(conf->iocom_thread, NULL);
+               conf->iocom_thread = NULL;
+               break;
+       }
+}
+
+static
+void
+hammer2_volconf_start(h2span_media_config_t *conf, const char *hostname)
+{
+       switch(conf->state) {
+       case H2MC_STOPPED:
+       case H2MC_CONNECT:
+               conf->fd = hammer2_connect(hostname);
+               if (conf->fd < 0) {
+                       fprintf(stderr, "Unable to connect to %s\n", hostname);
+                       conf->state = H2MC_CONNECT;
+               } else {
+                       pthread_create(&conf->iocom_thread, NULL,
+                                      master_service,
+                                      (void *)(intptr_t)conf->fd);
+                       conf->state = H2MC_RUNNING;
+               }
+               break;
+       case H2MC_RUNNING:
+               break;
+       }
+}
+
 /************************************************************************
  *                     ROUTER AND MESSAGING HANDLES                    *
  ************************************************************************
 /************************************************************************
  *                     ROUTER AND MESSAGING HANDLES                    *
  ************************************************************************
index 46399fc..565d387 100644 (file)
@@ -245,8 +245,8 @@ struct hammer2_router {
        struct hammer2_state_tree statewr_tree; /* active messages */
        hammer2_msg_queue_t txmsgq;             /* tx msgq from remote */
        uint64_t        target;                 /* for routing */
        struct hammer2_state_tree statewr_tree; /* active messages */
        hammer2_msg_queue_t txmsgq;             /* tx msgq from remote */
        uint64_t        target;                 /* for routing */
-       int     flags;
-       int     refs;                           /* refs prevent destruction */
+       int             flags;
+       int             refs;                   /* refs prevent destruction */
 };
 
 #define HAMMER2_ROUTER_CONNECTED       0x0001  /* on global RB tree */
 };
 
 #define HAMMER2_ROUTER_CONNECTED       0x0001  /* on global RB tree */
@@ -288,12 +288,6 @@ typedef struct hammer2_iocom hammer2_iocom_t;
 #define HAMMER2_IOCOMF_SWORK   0x00000100      /* immediate work pending */
 #define HAMMER2_IOCOMF_CRYPTED 0x00000200      /* encrypt enabled */
 
 #define HAMMER2_IOCOMF_SWORK   0x00000100      /* immediate work pending */
 #define HAMMER2_IOCOMF_CRYPTED 0x00000200      /* encrypt enabled */
 
-
-
-
-
-
-
 /*
  * Crypto algorithm table and related typedefs.
  */
 /*
  * Crypto algorithm table and related typedefs.
  */
index 068f94f..3000763 100644 (file)
@@ -338,6 +338,7 @@ struct hammer2_pfsmount {
        uint32_t                msgid_iterator;
        struct lock             msglk;          /* lockmgr lock */
        TAILQ_HEAD(, hammer2_msg) msgq;         /* transmit queue */
        uint32_t                msgid_iterator;
        struct lock             msglk;          /* lockmgr lock */
        TAILQ_HEAD(, hammer2_msg) msgq;         /* transmit queue */
+       struct hammer2_state    *conn_state;    /* active LNK_CONN state */
        struct hammer2_state    *freerd_state;  /* allocation cache */
        struct hammer2_state    *freewr_state;  /* allocation cache */
        struct hammer2_state_tree staterd_tree; /* active messages */
        struct hammer2_state    *freerd_state;  /* allocation cache */
        struct hammer2_state    *freewr_state;  /* allocation cache */
        struct hammer2_state_tree staterd_tree; /* active messages */
@@ -347,7 +348,11 @@ struct hammer2_pfsmount {
 
 typedef struct hammer2_pfsmount hammer2_pfsmount_t;
 
 
 typedef struct hammer2_pfsmount hammer2_pfsmount_t;
 
-#define HAMMER2_CLUSTERCTL_KILL        0x0001
+/*
+ * msg_ctl flags (atomic)
+ */
+#define HAMMER2_CLUSTERCTL_KILL                0x00000001
+#define HAMMER2_CLUSTERCTL_SLEEPING    0x00000002 /* interlocked w/msglk */
 
 /*
  * Transactional state structure, representing an open transaction.  The
 
 /*
  * Transactional state structure, representing an open transaction.  The
@@ -557,10 +562,10 @@ void hammer2_state_cleanuptx(hammer2_msg_t *msg);
 int hammer2_msg_execute(hammer2_msg_t *msg);
 void hammer2_state_free(hammer2_state_t *state);
 void hammer2_msg_free(hammer2_msg_t *msg);
 int hammer2_msg_execute(hammer2_msg_t *msg);
 void hammer2_state_free(hammer2_state_t *state);
 void hammer2_msg_free(hammer2_msg_t *msg);
-hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd);
-void hammer2_msg_write(hammer2_msg_t *msg,
+hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
                                int (*func)(hammer2_state_t *, hammer2_msg_t *),
                                void *data);
                                int (*func)(hammer2_state_t *, hammer2_msg_t *),
                                void *data);
+void hammer2_msg_write(hammer2_msg_t *msg);
 void hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error);
 void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error);
 
 void hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error);
 void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error);
 
@@ -571,6 +576,13 @@ int hammer2_msg_dbg_rcvmsg(hammer2_msg_t *msg);
 int hammer2_msg_adhoc_input(hammer2_msg_t *msg);
 
 /*
 int hammer2_msg_adhoc_input(hammer2_msg_t *msg);
 
 /*
+ * hammer2_vfsops.c
+ */
+void hammer2_clusterctl_wakeup(hammer2_pfsmount_t *pmp);
+void hammer2_volconf_update(hammer2_pfsmount_t *pmp, int index);
+void hammer2_cluster_reconnect(hammer2_pfsmount_t *pmp, struct file *fp);
+
+/*
  * hammer2_freemap.c
  */
 hammer2_off_t hammer2_freemap_alloc(hammer2_mount_t *hmp,
  * hammer2_freemap.c
  */
 hammer2_off_t hammer2_freemap_alloc(hammer2_mount_t *hmp,
index 7df7e88..56aaab2 100644 (file)
@@ -43,7 +43,8 @@
 #include "hammer2.h"
 
 static int hammer2_ioctl_version_get(hammer2_inode_t *ip, void *data);
 #include "hammer2.h"
 
 static int hammer2_ioctl_version_get(hammer2_inode_t *ip, void *data);
-static int hammer2_ioctl_remote_get(hammer2_inode_t *ip, void *data);
+static int hammer2_ioctl_recluster(hammer2_inode_t *ip, void *data);
+static int hammer2_ioctl_remote_scan(hammer2_inode_t *ip, void *data);
 static int hammer2_ioctl_remote_add(hammer2_inode_t *ip, void *data);
 static int hammer2_ioctl_remote_del(hammer2_inode_t *ip, void *data);
 static int hammer2_ioctl_remote_rep(hammer2_inode_t *ip, void *data);
 static int hammer2_ioctl_remote_add(hammer2_inode_t *ip, void *data);
 static int hammer2_ioctl_remote_del(hammer2_inode_t *ip, void *data);
 static int hammer2_ioctl_remote_rep(hammer2_inode_t *ip, void *data);
@@ -72,9 +73,13 @@ hammer2_ioctl(hammer2_inode_t *ip, u_long com, void *data, int fflag,
        case HAMMER2IOC_VERSION_GET:
                error = hammer2_ioctl_version_get(ip, data);
                break;
        case HAMMER2IOC_VERSION_GET:
                error = hammer2_ioctl_version_get(ip, data);
                break;
-       case HAMMER2IOC_REMOTE_GET:
+       case HAMMER2IOC_RECLUSTER:
                if (error == 0)
                if (error == 0)
-                       error = hammer2_ioctl_remote_get(ip, data);
+                       error = hammer2_ioctl_recluster(ip, data);
+               break;
+       case HAMMER2IOC_REMOTE_SCAN:
+               if (error == 0)
+                       error = hammer2_ioctl_remote_scan(ip, data);
                break;
        case HAMMER2IOC_REMOTE_ADD:
                if (error == 0)
                break;
        case HAMMER2IOC_REMOTE_ADD:
                if (error == 0)
@@ -139,11 +144,27 @@ hammer2_ioctl_version_get(hammer2_inode_t *ip, void *data)
        return 0;
 }
 
        return 0;
 }
 
+static int
+hammer2_ioctl_recluster(hammer2_inode_t *ip, void *data)
+{
+       hammer2_ioc_recluster_t *recl = data;
+       struct file *fp;
+
+       fp = holdfp(curproc->p_fd, recl->fd, -1);
+       if (fp) {
+               kprintf("reconnect to cluster\n");
+               hammer2_cluster_reconnect(ip->pmp, fp);
+               return 0;
+       } else {
+               return EINVAL;
+       }
+}
+
 /*
  * Retrieve information about a remote
  */
 static int
 /*
  * Retrieve information about a remote
  */
 static int
-hammer2_ioctl_remote_get(hammer2_inode_t *ip, void *data)
+hammer2_ioctl_remote_scan(hammer2_inode_t *ip, void *data)
 {
        hammer2_mount_t *hmp = ip->hmp;
        hammer2_ioc_remote_t *remote = data;
 {
        hammer2_mount_t *hmp = ip->hmp;
        hammer2_ioc_remote_t *remote = data;
@@ -161,7 +182,7 @@ hammer2_ioctl_remote_get(hammer2_inode_t *ip, void *data)
         */
        while (++copyid < HAMMER2_COPYID_COUNT &&
               hmp->voldata.copyinfo[copyid].copyid == 0) {
         */
        while (++copyid < HAMMER2_COPYID_COUNT &&
               hmp->voldata.copyinfo[copyid].copyid == 0) {
-               ++copyid;
+               ;
        }
        if (copyid == HAMMER2_COPYID_COUNT)
                remote->nextid = -1;
        }
        if (copyid == HAMMER2_COPYID_COUNT)
                remote->nextid = -1;
@@ -178,6 +199,7 @@ static int
 hammer2_ioctl_remote_add(hammer2_inode_t *ip, void *data)
 {
        hammer2_mount_t *hmp = ip->hmp;
 hammer2_ioctl_remote_add(hammer2_inode_t *ip, void *data)
 {
        hammer2_mount_t *hmp = ip->hmp;
+       hammer2_pfsmount_t *pmp = ip->pmp;
        hammer2_ioc_remote_t *remote = data;
        int copyid = remote->copyid;
        int error = 0;
        hammer2_ioc_remote_t *remote = data;
        int copyid = remote->copyid;
        int error = 0;
@@ -197,9 +219,9 @@ hammer2_ioctl_remote_add(hammer2_inode_t *ip, void *data)
                }
        }
        hammer2_modify_volume(hmp);
                }
        }
        hammer2_modify_volume(hmp);
-       kprintf("copyid %d\n", copyid);
        remote->copy1.copyid = copyid;
        hmp->voldata.copyinfo[copyid] = remote->copy1;
        remote->copy1.copyid = copyid;
        hmp->voldata.copyinfo[copyid] = remote->copy1;
+       hammer2_volconf_update(pmp, copyid);
 failed:
        hammer2_voldata_unlock(hmp);
        return (error);
 failed:
        hammer2_voldata_unlock(hmp);
        return (error);
@@ -212,6 +234,7 @@ static int
 hammer2_ioctl_remote_del(hammer2_inode_t *ip, void *data)
 {
        hammer2_mount_t *hmp = ip->hmp;
 hammer2_ioctl_remote_del(hammer2_inode_t *ip, void *data)
 {
        hammer2_mount_t *hmp = ip->hmp;
+       hammer2_pfsmount_t *pmp = ip->pmp;
        hammer2_ioc_remote_t *remote = data;
        int copyid = remote->copyid;
        int error = 0;
        hammer2_ioc_remote_t *remote = data;
        int copyid = remote->copyid;
        int error = 0;
@@ -236,6 +259,7 @@ hammer2_ioctl_remote_del(hammer2_inode_t *ip, void *data)
        }
        hammer2_modify_volume(hmp);
        hmp->voldata.copyinfo[copyid].copyid = 0;
        }
        hammer2_modify_volume(hmp);
        hmp->voldata.copyinfo[copyid].copyid = 0;
+       hammer2_volconf_update(pmp, copyid);
 failed:
        hammer2_voldata_unlock(hmp);
        return (error);
 failed:
        hammer2_voldata_unlock(hmp);
        return (error);
@@ -255,6 +279,7 @@ hammer2_ioctl_remote_rep(hammer2_inode_t *ip, void *data)
                return (EINVAL);
 
        hammer2_voldata_lock(hmp);
                return (EINVAL);
 
        hammer2_voldata_lock(hmp);
+       /*hammer2_volconf_update(pmp, copyid);*/
        hammer2_voldata_unlock(hmp);
 
        return(0);
        hammer2_voldata_unlock(hmp);
 
        return(0);
index 6d96b28..f0f13e6 100644 (file)
@@ -55,6 +55,13 @@ struct hammer2_ioc_version {
 
 typedef struct hammer2_ioc_version hammer2_ioc_version_t;
 
 
 typedef struct hammer2_ioc_version hammer2_ioc_version_t;
 
+struct hammer2_ioc_recluster {
+       int                     fd;
+       char                    reserved[256 - 4];
+};
+
+typedef struct hammer2_ioc_recluster hammer2_ioc_recluster_t;
+
 /*
  * Ioctls to manage the volume->copyinfo[] array and to associate or
  * disassociate sockets
 /*
  * Ioctls to manage the volume->copyinfo[] array and to associate or
  * disassociate sockets
@@ -114,8 +121,9 @@ typedef struct hammer2_ioc_inode hammer2_ioc_inode_t;
  */
 
 #define HAMMER2IOC_VERSION_GET _IOWR('h', 64, struct hammer2_ioc_version)
  */
 
 #define HAMMER2IOC_VERSION_GET _IOWR('h', 64, struct hammer2_ioc_version)
+#define HAMMER2IOC_RECLUSTER   _IOWR('h', 65, struct hammer2_ioc_recluster)
 
 
-#define HAMMER2IOC_REMOTE_GET  _IOWR('h', 68, struct hammer2_ioc_remote)
+#define HAMMER2IOC_REMOTE_SCAN _IOWR('h', 68, struct hammer2_ioc_remote)
 #define HAMMER2IOC_REMOTE_ADD  _IOWR('h', 69, struct hammer2_ioc_remote)
 #define HAMMER2IOC_REMOTE_DEL  _IOWR('h', 70, struct hammer2_ioc_remote)
 #define HAMMER2IOC_REMOTE_REP  _IOWR('h', 71, struct hammer2_ioc_remote)
 #define HAMMER2IOC_REMOTE_ADD  _IOWR('h', 69, struct hammer2_ioc_remote)
 #define HAMMER2IOC_REMOTE_DEL  _IOWR('h', 70, struct hammer2_ioc_remote)
 #define HAMMER2IOC_REMOTE_REP  _IOWR('h', 71, struct hammer2_ioc_remote)
index ef84f34..272fa5a 100644 (file)
@@ -585,14 +585,17 @@ hammer2_state_free(hammer2_state_t *state)
 }
 
 hammer2_msg_t *
 }
 
 hammer2_msg_t *
-hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd)
+hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
+                 int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
 {
 {
+       hammer2_pfsmount_t *pmp = router->pmp;
        hammer2_msg_t *msg;
        hammer2_msg_t *msg;
+       hammer2_state_t *state;
        size_t hbytes;
 
        hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
        msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
        size_t hbytes;
 
        hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
        msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
-                     router->pmp->mmsg, M_WAITOK | M_ZERO);
+                     pmp->mmsg, M_WAITOK | M_ZERO);
        msg->hdr_size = hbytes;
        msg->router = router;
        KKASSERT(router != NULL);
        msg->hdr_size = hbytes;
        msg->router = router;
        KKASSERT(router != NULL);
@@ -601,6 +604,32 @@ hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd)
        msg->any.head.target = router->target;
        msg->any.head.cmd = cmd;
 
        msg->any.head.target = router->target;
        msg->any.head.cmd = cmd;
 
+       if (cmd & HAMMER2_MSGF_CREATE) {
+               /*
+                * New transaction, requires tracking state and a unique
+                * msgid to be allocated.
+                */
+               KKASSERT(msg->state == NULL);
+               state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
+               state->pmp = pmp;
+               state->flags = HAMMER2_STATE_DYNAMIC;
+               state->func = func;
+               state->any.any = data;
+               state->msg = msg;
+               state->msgid = (uint64_t)(uintptr_t)state;
+               state->router = msg->router;
+               msg->state = state;
+               msg->any.head.source = 0;
+               msg->any.head.target = state->router->target;
+               msg->any.head.msgid = state->msgid;
+
+               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+               if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
+                       panic("duplicate msgid allocated");
+               msg->any.head.msgid = state->msgid;
+               lockmgr(&pmp->msglk, LK_RELEASE);
+       }
+
        return (msg);
 }
 
        return (msg);
 }
 
@@ -653,8 +682,7 @@ hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
  * does not write to the message socket/pipe.
  */
 void
  * does not write to the message socket/pipe.
  */
 void
-hammer2_msg_write(hammer2_msg_t *msg,
-                 int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
+hammer2_msg_write(hammer2_msg_t *msg)
 {
        hammer2_pfsmount_t *pmp = msg->router->pmp;
        hammer2_state_t *state;
 {
        hammer2_pfsmount_t *pmp = msg->router->pmp;
        hammer2_state_t *state;
@@ -672,33 +700,6 @@ hammer2_msg_write(hammer2_msg_t *msg,
                msg->any.head.source = 0;
                msg->any.head.target = state->router->target;
                lockmgr(&pmp->msglk, LK_EXCLUSIVE);
                msg->any.head.source = 0;
                msg->any.head.target = state->router->target;
                lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-               if (func) {
-                       state->func = func;
-                       state->any.any = data;
-               }
-       } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
-               /*
-                * New transaction, requires tracking state and a unique
-                * msgid to be allocated.
-                */
-               KKASSERT(msg->state == NULL);
-               state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
-               state->pmp = pmp;
-               state->flags = HAMMER2_STATE_DYNAMIC;
-               state->func = func;
-               state->any.any = data;
-               state->msg = msg;
-               state->msgid = (uint64_t)(uintptr_t)state;
-               state->router = msg->router;
-               msg->state = state;
-               msg->any.head.source = 0;
-               msg->any.head.target = state->router->target;
-               msg->any.head.msgid = state->msgid;
-
-               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-               if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
-                       panic("duplicate msgid allocated");
-               msg->any.head.msgid = state->msgid;
        } else {
                /*
                 * One-off message (always uses msgid 0 to distinguish
        } else {
                /*
                 * One-off message (always uses msgid 0 to distinguish
@@ -721,6 +722,7 @@ hammer2_msg_write(hammer2_msg_t *msg,
        msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
 
        TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
        msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
 
        TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
+       hammer2_clusterctl_wakeup(pmp);
        lockmgr(&pmp->msglk, LK_RELEASE);
 }
 
        lockmgr(&pmp->msglk, LK_RELEASE);
 }
 
@@ -765,10 +767,13 @@ hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
                        cmd |= HAMMER2_MSGF_REPLY;
        }
 
                        cmd |= HAMMER2_MSGF_REPLY;
        }
 
-       nmsg = hammer2_msg_alloc(msg->router, cmd);
+       /* XXX messy mask cmd to avoid allocating state */
+       nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK,
+                                NULL, NULL);
+       nmsg->any.head.cmd = cmd;
        nmsg->any.head.error = error;
        nmsg->state = state;
        nmsg->any.head.error = error;
        nmsg->state = state;
-       hammer2_msg_write(nmsg, NULL, NULL);
+       hammer2_msg_write(nmsg);
 }
 
 /*
 }
 
 /*
@@ -813,8 +818,11 @@ hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
                        cmd |= HAMMER2_MSGF_REPLY;
        }
 
                        cmd |= HAMMER2_MSGF_REPLY;
        }
 
-       nmsg = hammer2_msg_alloc(msg->router, cmd);
+       /* XXX messy mask cmd to avoid allocating state */
+       nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK,
+                                NULL, NULL);
+       nmsg->any.head.cmd = cmd;
        nmsg->any.head.error = error;
        nmsg->state = state;
        nmsg->any.head.error = error;
        nmsg->state = state;
-       hammer2_msg_write(nmsg, NULL, NULL);
+       hammer2_msg_write(nmsg);
 }
 }
index 882141c..533e115 100644 (file)
@@ -314,37 +314,44 @@ typedef struct hammer2_msg_hdr hammer2_msg_hdr_t;
  * Link layer ops basically talk to just the other side of a direct
  * connection.
  *
  * Link layer ops basically talk to just the other side of a direct
  * connection.
  *
- * PAD         - One-way message on link-0, ignored by target.  Used to
+ * LNK_PAD     - One-way message on link-0, ignored by target.  Used to
  *               pad message buffers on shared-memory transports.  Not
  *               typically used with TCP.
  *
  *               pad message buffers on shared-memory transports.  Not
  *               typically used with TCP.
  *
- * PING                - One-way message on link-0, keep-alive, run by both sides
+ * LNK_PING    - One-way message on link-0, keep-alive, run by both sides
  *               typically 1/sec on idle link, link is lost after 10 seconds
  *               of inactivity.
  *
  *               typically 1/sec on idle link, link is lost after 10 seconds
  *               of inactivity.
  *
- * AUTH                - Authenticate the connection, negotiate administrative
+ * LNK_AUTH    - Authenticate the connection, negotiate administrative
  *               rights & encryption, protocol class, etc.  Only PAD and
  *               AUTH messages (not even PING) are accepted until
  *               authentication is complete.  This message also identifies
  *               the host.
  *
  *               rights & encryption, protocol class, etc.  Only PAD and
  *               AUTH messages (not even PING) are accepted until
  *               authentication is complete.  This message also identifies
  *               the host.
  *
- * CONN                - Enable the SPAN protocol on link-0, possibly also installing
+ * LNK_CONN    - Enable the SPAN protocol on link-0, possibly also installing
  *               a PFS filter (by cluster id, unique id, and/or wildcarded
  *               name).
  *
  *               a PFS filter (by cluster id, unique id, and/or wildcarded
  *               name).
  *
- * SPAN                - A SPAN transaction on link-0 enables messages to be relayed
+ * LNK_SPAN    - A SPAN transaction on link-0 enables messages to be relayed
  *               to/from a particular cluster node.  SPANs are received,
  *               sorted, aggregated, and retransmitted back out across all
  *               applicable connections.
  *
  *               The leaf protocol also uses this to make a PFS available
  *               to the cluster (e.g. on-mount).
  *               to/from a particular cluster node.  SPANs are received,
  *               sorted, aggregated, and retransmitted back out across all
  *               applicable connections.
  *
  *               The leaf protocol also uses this to make a PFS available
  *               to the cluster (e.g. on-mount).
+ *
+ * LNK_VOLCONF - Volume header configuration change.  All hammer2
+ *               connections (hammer2 connect ...) stored in the volume
+ *               header are spammed at the link level to the hammer2
+ *               service daemon, and any live configuration change
+ *               thereafter.
  */
 #define HAMMER2_LNK_PAD                HAMMER2_MSG_LNK(0x000, hammer2_msg_hdr)
 #define HAMMER2_LNK_PING       HAMMER2_MSG_LNK(0x001, hammer2_msg_hdr)
 #define HAMMER2_LNK_AUTH       HAMMER2_MSG_LNK(0x010, hammer2_lnk_auth)
 #define HAMMER2_LNK_CONN       HAMMER2_MSG_LNK(0x011, hammer2_lnk_conn)
 #define HAMMER2_LNK_SPAN       HAMMER2_MSG_LNK(0x012, hammer2_lnk_span)
  */
 #define HAMMER2_LNK_PAD                HAMMER2_MSG_LNK(0x000, hammer2_msg_hdr)
 #define HAMMER2_LNK_PING       HAMMER2_MSG_LNK(0x001, hammer2_msg_hdr)
 #define HAMMER2_LNK_AUTH       HAMMER2_MSG_LNK(0x010, hammer2_lnk_auth)
 #define HAMMER2_LNK_CONN       HAMMER2_MSG_LNK(0x011, hammer2_lnk_conn)
 #define HAMMER2_LNK_SPAN       HAMMER2_MSG_LNK(0x012, hammer2_lnk_span)
+#define HAMMER2_LNK_VOLCONF    HAMMER2_MSG_LNK(0x020, hammer2_lnk_volconf)
 #define HAMMER2_LNK_ERROR      HAMMER2_MSG_LNK(0xFFF, hammer2_msg_hdr)
 
 /*
 #define HAMMER2_LNK_ERROR      HAMMER2_MSG_LNK(0xFFF, hammer2_msg_hdr)
 
 /*
@@ -368,6 +375,7 @@ struct hammer2_lnk_auth {
 
 struct hammer2_lnk_conn {
        hammer2_msg_hdr_t head;
 
 struct hammer2_lnk_conn {
        hammer2_msg_hdr_t head;
+       uuid_t          mediaid;        /* media configuration id */
        uuid_t          pfs_clid;       /* rendezvous pfs uuid */
        uuid_t          pfs_fsid;       /* unique pfs uuid */
        uint8_t         pfs_type;       /* peer type */
        uuid_t          pfs_clid;       /* rendezvous pfs uuid */
        uuid_t          pfs_fsid;       /* unique pfs uuid */
        uint8_t         pfs_type;       /* peer type */
@@ -443,6 +451,20 @@ typedef struct hammer2_lnk_span hammer2_lnk_span_t;
 #define HAMMER2_SPAN_PROTO_1   1
 
 /*
 #define HAMMER2_SPAN_PROTO_1   1
 
 /*
+ * LNK_VOLCONF
+ */
+struct hammer2_lnk_volconf {
+       hammer2_msg_hdr_t       head;
+       hammer2_copy_data_t     copy;   /* copy spec */
+       int32_t                 index;
+       int32_t                 unused01;
+       uuid_t                  mediaid;
+       int64_t                 reserved02[32];
+};
+
+typedef struct hammer2_lnk_volconf hammer2_lnk_volconf_t;
+
+/*
  * Debug layer ops operate on any link
  *
  * SHELL       - Persist stream, access the debug shell on the target
  * Debug layer ops operate on any link
  *
  * SHELL       - Persist stream, access the debug shell on the target
@@ -539,6 +561,7 @@ union hammer2_msg_any {
        hammer2_msg_hdr_t       head;
        hammer2_lnk_span_t      lnk_span;
        hammer2_lnk_conn_t      lnk_conn;
        hammer2_msg_hdr_t       head;
        hammer2_lnk_span_t      lnk_span;
        hammer2_lnk_conn_t      lnk_conn;
+       hammer2_lnk_volconf_t   lnk_volconf;
 };
 
 typedef union hammer2_msg_any hammer2_msg_any_t;
 };
 
 typedef union hammer2_msg_any hammer2_msg_any_t;
index 3bb3864..7ebe6a5 100644 (file)
@@ -1007,6 +1007,26 @@ hammer2_install_volume_header(hammer2_mount_t *hmp)
 }
 
 /*
 }
 
 /*
+ * Reconnect using the passed file pointer.  The caller must ref the
+ * fp for us.
+ */
+void
+hammer2_cluster_reconnect(hammer2_pfsmount_t *pmp, struct file *fp)
+{
+       atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
+       while (pmp->msgrd_td || pmp->msgwr_td) {
+              wakeup(&pmp->msg_ctl);
+              tsleep(pmp, 0, "clstrkl", hz);
+       }
+       atomic_clear_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
+       pmp->msg_fp = fp;
+       lwkt_create(hammer2_cluster_thread_rd, pmp, &pmp->msgrd_td,
+                   NULL, 0, -1, "hammer2-msgrd");
+       lwkt_create(hammer2_cluster_thread_wr, pmp, &pmp->msgwr_td,
+                   NULL, 0, -1, "hammer2-msgwr");
+}
+
+/*
  * Cluster controller thread.  Perform messaging functions.  We have one
  * thread for the reader and one for the writer.  The writer handles
  * shutdown requests (which should break the reader thread).
  * Cluster controller thread.  Perform messaging functions.  We have one
  * thread for the reader and one for the writer.  The writer handles
  * shutdown requests (which should break the reader thread).
@@ -1043,7 +1063,10 @@ hammer2_cluster_thread_rd(void *arg)
                        error = EINVAL;
                        break;
                }
                        error = EINVAL;
                        break;
                }
-               msg = hammer2_msg_alloc(&pmp->router, hdr.cmd);
+               /* XXX messy: mask cmd to avoid allocating state */
+               msg = hammer2_msg_alloc(&pmp->router,
+                                       hdr.cmd & HAMMER2_MSGF_BASECMDMASK,
+                                       NULL, NULL);
                msg->any.head = hdr;
                msg->hdr_size = hbytes;
                if (hbytes > sizeof(hdr)) {
                msg->any.head = hdr;
                msg->hdr_size = hbytes;
                if (hbytes > sizeof(hdr)) {
@@ -1136,9 +1159,30 @@ hammer2_cluster_thread_rd(void *arg)
                hammer2_state_free(state);
        }
 
                hammer2_state_free(state);
        }
 
+       /*
+        * XXX simulate MSGF_DELETEs
+        */
        while ((state = RB_ROOT(&pmp->staterd_tree)) != NULL) {
        while ((state = RB_ROOT(&pmp->staterd_tree)) != NULL) {
-               RB_REMOVE(hammer2_state_tree, &pmp->staterd_tree, state);
-               hammer2_state_free(state);
+               kprintf("y");
+               if (state->func &&
+                   (state->txcmd & HAMMER2_MSGF_DELETE) == 0 &&
+                   (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) {
+                       lockmgr(&pmp->msglk, LK_RELEASE);
+                       msg = hammer2_msg_alloc(&pmp->router,
+                                               HAMMER2_LNK_ERROR,
+                                               NULL, NULL);
+                       if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
+                               msg->any.head.cmd |= HAMMER2_MSGF_CREATE;
+                       msg->any.head.cmd |= HAMMER2_MSGF_DELETE;
+                       msg->state = state;
+                       msg->state->func(state, msg);
+                       hammer2_state_cleanuprx(msg);
+                       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+               } else {
+                       RB_REMOVE(hammer2_state_tree,
+                                 &pmp->staterd_tree, state);
+                       hammer2_state_free(state);
+               }
        }
        lockmgr(&pmp->msglk, LK_RELEASE);
 
        }
        lockmgr(&pmp->msglk, LK_RELEASE);
 
@@ -1172,7 +1216,8 @@ hammer2_cluster_thread_wr(void *arg)
         * connection.
         */
        msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_CONN |
         * connection.
         */
        msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_CONN |
-                                             HAMMER2_MSGF_CREATE);
+                                             HAMMER2_MSGF_CREATE,
+                               hammer2_msg_conn_reply, pmp);
        msg->any.lnk_conn.pfs_clid = pmp->iroot->ip_data.pfs_clid;
        msg->any.lnk_conn.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
        msg->any.lnk_conn.pfs_type = pmp->iroot->ip_data.pfs_type;
        msg->any.lnk_conn.pfs_clid = pmp->iroot->ip_data.pfs_clid;
        msg->any.lnk_conn.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
        msg->any.lnk_conn.pfs_type = pmp->iroot->ip_data.pfs_type;
@@ -1181,8 +1226,9 @@ hammer2_cluster_thread_wr(void *arg)
        if (name_len >= sizeof(msg->any.lnk_conn.label))
                name_len = sizeof(msg->any.lnk_conn.label) - 1;
        bcopy(pmp->iroot->ip_data.filename, msg->any.lnk_conn.label, name_len);
        if (name_len >= sizeof(msg->any.lnk_conn.label))
                name_len = sizeof(msg->any.lnk_conn.label) - 1;
        bcopy(pmp->iroot->ip_data.filename, msg->any.lnk_conn.label, name_len);
+       pmp->conn_state = msg->state;
        msg->any.lnk_conn.label[name_len] = 0;
        msg->any.lnk_conn.label[name_len] = 0;
-       hammer2_msg_write(msg, hammer2_msg_conn_reply, pmp);
+       hammer2_msg_write(msg);
 
        /*
         * Transmit loop
 
        /*
         * Transmit loop
@@ -1191,7 +1237,18 @@ hammer2_cluster_thread_wr(void *arg)
        lockmgr(&pmp->msglk, LK_EXCLUSIVE);
 
        while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0 && error == 0) {
        lockmgr(&pmp->msglk, LK_EXCLUSIVE);
 
        while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0 && error == 0) {
-               lksleep(&pmp->msg_ctl, &pmp->msglk, 0, "msgwr", hz);
+               /*
+                * Sleep if no messages pending.  Interlock with flag while
+                * holding msglk.
+                */
+               if (TAILQ_EMPTY(&pmp->msgq)) {
+                       atomic_set_int(&pmp->msg_ctl,
+                                      HAMMER2_CLUSTERCTL_SLEEPING);
+                       lksleep(&pmp->msg_ctl, &pmp->msglk, 0, "msgwr", hz);
+                       atomic_clear_int(&pmp->msg_ctl,
+                                        HAMMER2_CLUSTERCTL_SLEEPING);
+               }
+
                while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
                        /*
                         * Remove msg from the transmit queue and do
                while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
                        /*
                         * Remove msg from the transmit queue and do
@@ -1263,9 +1320,30 @@ hammer2_cluster_thread_wr(void *arg)
                hammer2_state_free(state);
        }
 
                hammer2_state_free(state);
        }
 
+       /*
+        * XXX simulate MSGF_DELETEs
+        */
        while ((state = RB_ROOT(&pmp->statewr_tree)) != NULL) {
        while ((state = RB_ROOT(&pmp->statewr_tree)) != NULL) {
-               RB_REMOVE(hammer2_state_tree, &pmp->statewr_tree, state);
-               hammer2_state_free(state);
+               kprintf("x");
+               if (state->func &&
+                   (state->txcmd & HAMMER2_MSGF_DELETE) == 0 &&
+                   (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) {
+                       lockmgr(&pmp->msglk, LK_RELEASE);
+                       msg = hammer2_msg_alloc(&pmp->router,
+                                               HAMMER2_LNK_ERROR,
+                                               NULL, NULL);
+                       if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
+                               msg->any.head.cmd |= HAMMER2_MSGF_CREATE;
+                       msg->any.head.cmd |= HAMMER2_MSGF_DELETE;
+                       msg->state = state;
+                       msg->state->func(state, msg);
+                       hammer2_state_cleanuprx(msg);
+                       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+               } else {
+                       RB_REMOVE(hammer2_state_tree,
+                                 &pmp->statewr_tree, state);
+                       hammer2_state_free(state);
+               }
        }
        lockmgr(&pmp->msglk, LK_RELEASE);
 
        }
        lockmgr(&pmp->msglk, LK_RELEASE);
 
@@ -1281,6 +1359,20 @@ hammer2_cluster_thread_wr(void *arg)
        lwkt_exit();
 }
 
        lwkt_exit();
 }
 
+/*
+ * Called with msglk held after queueing a new message, wakes up the
+ * transmit thread.  We use an interlock thread to avoid unnecessary
+ * wakeups.
+ */
+void
+hammer2_clusterctl_wakeup(hammer2_pfsmount_t *pmp)
+{
+       if (pmp->msg_ctl & HAMMER2_CLUSTERCTL_SLEEPING) {
+               atomic_clear_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_SLEEPING);
+               wakeup(&pmp->msg_ctl);
+       }
+}
+
 static int
 hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg)
 {
 static int
 hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg)
 {
@@ -1315,12 +1407,15 @@ static int
 hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
 {
        hammer2_pfsmount_t *pmp = state->any.pmp;
 hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
 {
        hammer2_pfsmount_t *pmp = state->any.pmp;
+       hammer2_mount_t *hmp = pmp->hmp;
        size_t name_len;
        size_t name_len;
+       int copyid;
 
        if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
                kprintf("LNK_CONN transaction replied to, initiate SPAN\n");
                msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_SPAN |
 
        if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
                kprintf("LNK_CONN transaction replied to, initiate SPAN\n");
                msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_SPAN |
-                                                     HAMMER2_MSGF_CREATE);
+                                                     HAMMER2_MSGF_CREATE,
+                                       hammer2_msg_span_reply, pmp);
                msg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid;
                msg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
                msg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
                msg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid;
                msg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
                msg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
@@ -1332,10 +1427,22 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
                      msg->any.lnk_span.label,
                      name_len);
                msg->any.lnk_span.label[name_len] = 0;
                      msg->any.lnk_span.label,
                      name_len);
                msg->any.lnk_span.label[name_len] = 0;
-               hammer2_msg_write(msg, hammer2_msg_span_reply, pmp);
+               hammer2_msg_write(msg);
+
+               /*
+                * Dump the configuration stored in the volume header
+                */
+               hammer2_voldata_lock(hmp);
+               for (copyid = 0; copyid < HAMMER2_COPYID_COUNT; ++copyid) {
+                       if (hmp->voldata.copyinfo[copyid].copyid == 0)
+                               continue;
+                       hammer2_volconf_update(pmp, copyid);
+               }
+               hammer2_voldata_unlock(hmp);
        }
        if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
                kprintf("LNK_CONN transaction terminated by remote\n");
        }
        if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
                kprintf("LNK_CONN transaction terminated by remote\n");
+               pmp->conn_state = NULL;
                hammer2_msg_reply(msg, 0);
        }
        return(0);
                hammer2_msg_reply(msg, 0);
        }
        return(0);
@@ -1349,3 +1456,27 @@ hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg)
        kprintf("SPAN REPLY - Our span was terminated? %p\n", pmp);
        return(0);
 }
        kprintf("SPAN REPLY - Our span was terminated? %p\n", pmp);
        return(0);
 }
+
+/*
+ * Volume configuration updates are passed onto the userland service
+ * daemon via the open LNK_CONN transaction.
+ */
+void
+hammer2_volconf_update(hammer2_pfsmount_t *pmp, int index)
+{
+       hammer2_mount_t *hmp = pmp->hmp;
+       hammer2_msg_t *msg;
+
+       /* XXX interlock against connection state termination */
+       kprintf("volconf update %p\n", pmp->conn_state);
+       if (pmp->conn_state) {
+               kprintf("TRANSMIT VOLCONF VIA OPEN CONN TRANSACTION\n");
+               msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_VOLCONF,
+                                       NULL, NULL);
+               msg->state = pmp->conn_state;
+               msg->any.lnk_volconf.copy = hmp->voldata.copyinfo[index];
+               msg->any.lnk_volconf.mediaid = hmp->voldata.fsid;
+               msg->any.lnk_volconf.index = index;
+               hammer2_msg_write(msg);
+       }
+}