bzero(&remote, sizeof(remote));
while ((remote.copyid = remote.nextid) >= 0) {
- if (ioctl(fd, HAMMER2IOC_REMOTE_GET, &remote) < 0) {
+ if (ioctl(fd, HAMMER2IOC_REMOTE_SCAN, &remote) < 0) {
perror("ioctl");
ecode = 1;
break;
case HAMMER2_LNK_SPAN:
cmdstr = "SPAN";
break;
+ case HAMMER2_LNK_VOLCONF:
+ cmdstr = "VOLCONF";
+ break;
case HAMMER2_LNK_ERROR:
if (cmd & HAMMER2_MSGF_DELETE)
cmdstr = "RETURN";
struct h2span_link;
struct h2span_relay;
+TAILQ_HEAD(h2span_media_queue, h2span_media);
TAILQ_HEAD(h2span_connect_queue, h2span_connect);
TAILQ_HEAD(h2span_relay_queue, h2span_relay);
RB_HEAD(h2span_relay_tree, h2span_relay);
/*
+ * This represents a media
+ */
+struct h2span_media {
+ TAILQ_ENTRY(h2span_media) entry;
+ uuid_t mediaid;
+ int refs;
+ struct h2span_media_config {
+ hammer2_copy_data_t copy_run;
+ hammer2_copy_data_t copy_pend;
+ pthread_t thread;
+ pthread_cond_t cond;
+ int ctl;
+ int fd;
+ hammer2_iocom_t iocom;
+ pthread_t iocom_thread;
+ enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
+ } config[HAMMER2_COPYID_COUNT];
+};
+
+typedef struct h2span_media_config h2span_media_config_t;
+
+#define H2CONFCTL_STOP 0x00000001
+#define H2CONFCTL_UPDATE 0x00000002
+
+/*
* Received LNK_CONN transaction enables SPAN protocol over connection.
- * (may contain filter).
+ * (may contain filter). Typically one for each mount and several may
+ * share the same media.
*/
struct h2span_connect {
TAILQ_ENTRY(h2span_connect) entry;
struct h2span_relay_tree tree;
+ struct h2span_media *media;
hammer2_state_t *state;
};
};
+typedef struct h2span_media h2span_media_t;
typedef struct h2span_connect h2span_connect_t;
typedef struct h2span_cluster h2span_cluster_t;
typedef struct h2span_node h2span_node_t;
rbnode, h2span_relay_cmp);
/*
- * Global mutex protects cluster_tree lookups.
+ * Global mutex protects cluster_tree lookups, connq, mediaq.
*/
static pthread_mutex_t cluster_mtx;
static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq);
+static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq);
static void hammer2_lnk_span(hammer2_msg_t *msg);
static void hammer2_lnk_conn(hammer2_msg_t *msg);
static void hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node);
static void hammer2_relay_delete(h2span_relay_t *relay);
+static void *hammer2_volconf_thread(void *info);
+static void hammer2_volconf_stop(h2span_media_config_t *conf);
+static void hammer2_volconf_start(h2span_media_config_t *conf,
+ const char *hostname);
+
void
hammer2_msg_lnk_signal(hammer2_router_t *router __unused)
{
hammer2_lnk_conn(hammer2_msg_t *msg)
{
hammer2_state_t *state = msg->state;
+ h2span_media_t *media;
+ h2span_media_config_t *conf;
h2span_connect_t *conn;
h2span_relay_t *relay;
char *alloc = NULL;
+ int i;
pthread_mutex_lock(&cluster_mtx);
- /*
- * On transaction start we allocate a new h2span_connect and
- * acknowledge the request, leaving the transaction open.
- * We then relay priority-selected SPANs.
- */
- if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
- state->func = hammer2_lnk_conn;
-
+ switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) {
+ case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE:
+ case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
+ /*
+ * On transaction start we allocate a new h2span_connect and
+ * acknowledge the request, leaving the transaction open.
+ * We then relay priority-selected SPANs.
+ */
fprintf(stderr, "LNK_CONN(%08x): %s/%s\n",
(uint32_t)msg->any.head.msgid,
hammer2_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
RB_INIT(&conn->tree);
conn->state = state;
+ state->func = hammer2_lnk_conn;
state->any.conn = conn;
TAILQ_INSERT_TAIL(&connq, conn, entry);
- hammer2_msg_result(msg, 0);
-
-#if 0
/*
- * Span-synchronize all nodes with the new connection
+ * Set up media
*/
- hammer2_relay_scan(conn, NULL);
-#endif
- hammer2_router_signal(msg->router);
- }
+ TAILQ_FOREACH(media, &mediaq, entry) {
+ if (uuid_compare(&msg->any.lnk_conn.mediaid,
+ &media->mediaid, NULL) == 0) {
+ break;
+ }
+ }
+ if (media == NULL) {
+ media = hammer2_alloc(sizeof(*media));
+ media->mediaid = msg->any.lnk_conn.mediaid;
+ TAILQ_INSERT_TAIL(&mediaq, media, entry);
+ }
+ conn->media = media;
+ ++media->refs;
- /*
- * On transaction terminate we clean out our h2span_connect
- * and acknowledge the request, closing the transaction.
- */
- if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+ if ((msg->any.head.cmd & HAMMER2_MSGF_DELETE) == 0) {
+ hammer2_msg_result(msg, 0);
+ hammer2_router_signal(msg->router);
+ break;
+ }
+ /* FALL THROUGH */
+ case HAMMER2_LNK_CONN | HAMMER2_MSGF_DELETE:
+ case HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE:
+deleteconn:
+ /*
+ * On transaction terminate we clean out our h2span_connect
+ * and acknowledge the request, closing the transaction.
+ */
fprintf(stderr, "LNK_CONN: Terminated\n");
conn = state->any.conn;
assert(conn);
/*
+ * Clean out the media structure. If refs drops to zero we
+ * also clean out the media config threads. These threads
+ * maintain span connections to other hammer2 service daemons.
+ */
+ media = conn->media;
+ if (--media->refs == 0) {
+ fprintf(stderr, "Shutting down media spans\n");
+ for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
+ conf = &media->config[i];
+
+ if (conf->thread == NULL)
+ continue;
+ conf->ctl = H2CONFCTL_STOP;
+ pthread_cond_signal(&conf->cond);
+ }
+ for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
+ conf = &media->config[i];
+
+ if (conf->thread == NULL)
+ continue;
+ pthread_mutex_unlock(&cluster_mtx);
+ pthread_join(conf->thread, NULL);
+ pthread_mutex_lock(&cluster_mtx);
+ conf->thread = NULL;
+ pthread_cond_destroy(&conf->cond);
+ }
+ fprintf(stderr, "Media shutdown complete\n");
+ TAILQ_REMOVE(&mediaq, media, entry);
+ hammer2_free(media);
+ }
+
+ /*
* Clean out all relays. This requires terminating each
* relay transaction.
*/
/*
* Clean out conn
*/
+ conn->media = NULL;
conn->state = NULL;
msg->state->any.conn = NULL;
TAILQ_REMOVE(&connq, conn, entry);
hammer2_msg_reply(msg, 0);
/* state invalid after reply */
+ break;
+ case HAMMER2_LNK_VOLCONF:
+ /*
+ * One-way volume-configuration message is transmitted
+ * over the open LNK_CONN transaction.
+ */
+ fprintf(stderr, "RECEIVED VOLCONF\n");
+ if (msg->any.lnk_volconf.index < 0 ||
+ msg->any.lnk_volconf.index >= HAMMER2_COPYID_COUNT) {
+ fprintf(stderr, "VOLCONF: ILLEGAL INDEX %d\n",
+ msg->any.lnk_volconf.index);
+ break;
+ }
+ if (msg->any.lnk_volconf.copy.path[sizeof(msg->any.lnk_volconf.copy.path) - 1] != 0 ||
+ msg->any.lnk_volconf.copy.path[0] == 0) {
+ fprintf(stderr, "VOLCONF: ILLEGAL PATH %d\n",
+ msg->any.lnk_volconf.index);
+ break;
+ }
+ conn = msg->state->any.conn;
+ if (conn == NULL) {
+ fprintf(stderr, "VOLCONF: LNK_CONN is missing\n");
+ break;
+ }
+ conf = &conn->media->config[msg->any.lnk_volconf.index];
+ conf->copy_pend = msg->any.lnk_volconf.copy;
+ conf->ctl |= H2CONFCTL_UPDATE;
+ if (conf->thread == NULL) {
+ fprintf(stderr, "VOLCONF THREAD STARTED\n");
+ pthread_cond_init(&conf->cond, NULL);
+ pthread_create(&conf->thread, NULL,
+ hammer2_volconf_thread, (void *)conf);
+ }
+ pthread_cond_signal(&conf->cond);
+ break;
+ default:
+ /*
+ * Failsafe
+ */
+ if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
+ goto deleteconn;
+ hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
+ break;
}
pthread_mutex_unlock(&cluster_mtx);
}
hammer2_free(relay);
}
+static void *
+hammer2_volconf_thread(void *info)
+{
+ h2span_media_config_t *conf = info;
+
+ pthread_mutex_lock(&cluster_mtx);
+ while ((conf->ctl & H2CONFCTL_STOP) == 0) {
+ if (conf->ctl & H2CONFCTL_UPDATE) {
+ fprintf(stderr, "VOLCONF UPDATE\n");
+ conf->ctl &= ~H2CONFCTL_UPDATE;
+ if (bcmp(&conf->copy_run, &conf->copy_pend,
+ sizeof(conf->copy_run)) == 0) {
+ fprintf(stderr, "VOLCONF: no changes\n");
+ continue;
+ }
+ /*
+ * XXX TODO - auto reconnect on lookup failure or
+ * connect failure or stream failure.
+ */
+
+ pthread_mutex_unlock(&cluster_mtx);
+ hammer2_volconf_stop(conf);
+ conf->copy_run = conf->copy_pend;
+ if (conf->copy_run.copyid != 0 &&
+ strncmp(conf->copy_run.path, "span:", 5) == 0) {
+ hammer2_volconf_start(conf,
+ conf->copy_run.path + 5);
+ }
+ pthread_mutex_lock(&cluster_mtx);
+ fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
+ }
+ if (conf->state == H2MC_CONNECT) {
+ hammer2_volconf_start(conf, conf->copy_run.path + 5);
+ pthread_mutex_unlock(&cluster_mtx);
+ sleep(5);
+ pthread_mutex_lock(&cluster_mtx);
+ } else {
+ pthread_cond_wait(&conf->cond, &cluster_mtx);
+ }
+ }
+ pthread_mutex_unlock(&cluster_mtx);
+ hammer2_volconf_stop(conf);
+ return(NULL);
+}
+
+static
+void
+hammer2_volconf_stop(h2span_media_config_t *conf)
+{
+ switch(conf->state) {
+ case H2MC_STOPPED:
+ break;
+ case H2MC_CONNECT:
+ conf->state = H2MC_STOPPED;
+ break;
+ case H2MC_RUNNING:
+ shutdown(conf->fd, SHUT_WR);
+ pthread_join(conf->iocom_thread, NULL);
+ conf->iocom_thread = NULL;
+ break;
+ }
+}
+
+static
+void
+hammer2_volconf_start(h2span_media_config_t *conf, const char *hostname)
+{
+ switch(conf->state) {
+ case H2MC_STOPPED:
+ case H2MC_CONNECT:
+ conf->fd = hammer2_connect(hostname);
+ if (conf->fd < 0) {
+ fprintf(stderr, "Unable to connect to %s\n", hostname);
+ conf->state = H2MC_CONNECT;
+ } else {
+ pthread_create(&conf->iocom_thread, NULL,
+ master_service,
+ (void *)(intptr_t)conf->fd);
+ conf->state = H2MC_RUNNING;
+ }
+ break;
+ case H2MC_RUNNING:
+ break;
+ }
+}
+
/************************************************************************
* ROUTER AND MESSAGING HANDLES *
************************************************************************
struct hammer2_state_tree statewr_tree; /* active messages */
hammer2_msg_queue_t txmsgq; /* tx msgq from remote */
uint64_t target; /* for routing */
- int flags;
- int refs; /* refs prevent destruction */
+ int flags;
+ int refs; /* refs prevent destruction */
};
#define HAMMER2_ROUTER_CONNECTED 0x0001 /* on global RB tree */
#define HAMMER2_IOCOMF_SWORK 0x00000100 /* immediate work pending */
#define HAMMER2_IOCOMF_CRYPTED 0x00000200 /* encrypt enabled */
-
-
-
-
-
-
/*
* Crypto algorithm table and related typedefs.
*/
uint32_t msgid_iterator;
struct lock msglk; /* lockmgr lock */
TAILQ_HEAD(, hammer2_msg) msgq; /* transmit queue */
+ struct hammer2_state *conn_state; /* active LNK_CONN state */
struct hammer2_state *freerd_state; /* allocation cache */
struct hammer2_state *freewr_state; /* allocation cache */
struct hammer2_state_tree staterd_tree; /* active messages */
typedef struct hammer2_pfsmount hammer2_pfsmount_t;
-#define HAMMER2_CLUSTERCTL_KILL 0x0001
+/*
+ * msg_ctl flags (atomic)
+ */
+#define HAMMER2_CLUSTERCTL_KILL 0x00000001
+#define HAMMER2_CLUSTERCTL_SLEEPING 0x00000002 /* interlocked w/msglk */
/*
* Transactional state structure, representing an open transaction. The
int hammer2_msg_execute(hammer2_msg_t *msg);
void hammer2_state_free(hammer2_state_t *state);
void hammer2_msg_free(hammer2_msg_t *msg);
-hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd);
-void hammer2_msg_write(hammer2_msg_t *msg,
+hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
int (*func)(hammer2_state_t *, hammer2_msg_t *),
void *data);
+void hammer2_msg_write(hammer2_msg_t *msg);
void hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error);
void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error);
int hammer2_msg_adhoc_input(hammer2_msg_t *msg);
/*
+ * hammer2_vfsops.c
+ */
+void hammer2_clusterctl_wakeup(hammer2_pfsmount_t *pmp);
+void hammer2_volconf_update(hammer2_pfsmount_t *pmp, int index);
+void hammer2_cluster_reconnect(hammer2_pfsmount_t *pmp, struct file *fp);
+
+/*
* hammer2_freemap.c
*/
hammer2_off_t hammer2_freemap_alloc(hammer2_mount_t *hmp,
#include "hammer2.h"
static int hammer2_ioctl_version_get(hammer2_inode_t *ip, void *data);
-static int hammer2_ioctl_remote_get(hammer2_inode_t *ip, void *data);
+static int hammer2_ioctl_recluster(hammer2_inode_t *ip, void *data);
+static int hammer2_ioctl_remote_scan(hammer2_inode_t *ip, void *data);
static int hammer2_ioctl_remote_add(hammer2_inode_t *ip, void *data);
static int hammer2_ioctl_remote_del(hammer2_inode_t *ip, void *data);
static int hammer2_ioctl_remote_rep(hammer2_inode_t *ip, void *data);
case HAMMER2IOC_VERSION_GET:
error = hammer2_ioctl_version_get(ip, data);
break;
- case HAMMER2IOC_REMOTE_GET:
+ case HAMMER2IOC_RECLUSTER:
if (error == 0)
- error = hammer2_ioctl_remote_get(ip, data);
+ error = hammer2_ioctl_recluster(ip, data);
+ break;
+ case HAMMER2IOC_REMOTE_SCAN:
+ if (error == 0)
+ error = hammer2_ioctl_remote_scan(ip, data);
break;
case HAMMER2IOC_REMOTE_ADD:
if (error == 0)
return 0;
}
+static int
+hammer2_ioctl_recluster(hammer2_inode_t *ip, void *data)
+{
+ hammer2_ioc_recluster_t *recl = data;
+ struct file *fp;
+
+ fp = holdfp(curproc->p_fd, recl->fd, -1);
+ if (fp) {
+ kprintf("reconnect to cluster\n");
+ hammer2_cluster_reconnect(ip->pmp, fp);
+ return 0;
+ } else {
+ return EINVAL;
+ }
+}
+
/*
* Retrieve information about a remote
*/
static int
-hammer2_ioctl_remote_get(hammer2_inode_t *ip, void *data)
+hammer2_ioctl_remote_scan(hammer2_inode_t *ip, void *data)
{
hammer2_mount_t *hmp = ip->hmp;
hammer2_ioc_remote_t *remote = data;
*/
while (++copyid < HAMMER2_COPYID_COUNT &&
hmp->voldata.copyinfo[copyid].copyid == 0) {
- ++copyid;
+ ;
}
if (copyid == HAMMER2_COPYID_COUNT)
remote->nextid = -1;
hammer2_ioctl_remote_add(hammer2_inode_t *ip, void *data)
{
hammer2_mount_t *hmp = ip->hmp;
+ hammer2_pfsmount_t *pmp = ip->pmp;
hammer2_ioc_remote_t *remote = data;
int copyid = remote->copyid;
int error = 0;
}
}
hammer2_modify_volume(hmp);
- kprintf("copyid %d\n", copyid);
remote->copy1.copyid = copyid;
hmp->voldata.copyinfo[copyid] = remote->copy1;
+ hammer2_volconf_update(pmp, copyid);
failed:
hammer2_voldata_unlock(hmp);
return (error);
hammer2_ioctl_remote_del(hammer2_inode_t *ip, void *data)
{
hammer2_mount_t *hmp = ip->hmp;
+ hammer2_pfsmount_t *pmp = ip->pmp;
hammer2_ioc_remote_t *remote = data;
int copyid = remote->copyid;
int error = 0;
}
hammer2_modify_volume(hmp);
hmp->voldata.copyinfo[copyid].copyid = 0;
+ hammer2_volconf_update(pmp, copyid);
failed:
hammer2_voldata_unlock(hmp);
return (error);
return (EINVAL);
hammer2_voldata_lock(hmp);
+ /*hammer2_volconf_update(pmp, copyid);*/
hammer2_voldata_unlock(hmp);
return(0);
typedef struct hammer2_ioc_version hammer2_ioc_version_t;
+struct hammer2_ioc_recluster {
+ int fd;
+ char reserved[256 - 4];
+};
+
+typedef struct hammer2_ioc_recluster hammer2_ioc_recluster_t;
+
/*
* Ioctls to manage the volume->copyinfo[] array and to associate or
* disassociate sockets
*/
#define HAMMER2IOC_VERSION_GET _IOWR('h', 64, struct hammer2_ioc_version)
+#define HAMMER2IOC_RECLUSTER _IOWR('h', 65, struct hammer2_ioc_recluster)
-#define HAMMER2IOC_REMOTE_GET _IOWR('h', 68, struct hammer2_ioc_remote)
+#define HAMMER2IOC_REMOTE_SCAN _IOWR('h', 68, struct hammer2_ioc_remote)
#define HAMMER2IOC_REMOTE_ADD _IOWR('h', 69, struct hammer2_ioc_remote)
#define HAMMER2IOC_REMOTE_DEL _IOWR('h', 70, struct hammer2_ioc_remote)
#define HAMMER2IOC_REMOTE_REP _IOWR('h', 71, struct hammer2_ioc_remote)
}
hammer2_msg_t *
-hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd)
+hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
+ int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
{
+ hammer2_pfsmount_t *pmp = router->pmp;
hammer2_msg_t *msg;
+ hammer2_state_t *state;
size_t hbytes;
hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
- router->pmp->mmsg, M_WAITOK | M_ZERO);
+ pmp->mmsg, M_WAITOK | M_ZERO);
msg->hdr_size = hbytes;
msg->router = router;
KKASSERT(router != NULL);
msg->any.head.target = router->target;
msg->any.head.cmd = cmd;
+ if (cmd & HAMMER2_MSGF_CREATE) {
+ /*
+ * New transaction, requires tracking state and a unique
+ * msgid to be allocated.
+ */
+ KKASSERT(msg->state == NULL);
+ state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
+ state->pmp = pmp;
+ state->flags = HAMMER2_STATE_DYNAMIC;
+ state->func = func;
+ state->any.any = data;
+ state->msg = msg;
+ state->msgid = (uint64_t)(uintptr_t)state;
+ state->router = msg->router;
+ msg->state = state;
+ msg->any.head.source = 0;
+ msg->any.head.target = state->router->target;
+ msg->any.head.msgid = state->msgid;
+
+ lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+ if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
+ panic("duplicate msgid allocated");
+ msg->any.head.msgid = state->msgid;
+ lockmgr(&pmp->msglk, LK_RELEASE);
+ }
+
return (msg);
}
* does not write to the message socket/pipe.
*/
void
-hammer2_msg_write(hammer2_msg_t *msg,
- int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
+hammer2_msg_write(hammer2_msg_t *msg)
{
hammer2_pfsmount_t *pmp = msg->router->pmp;
hammer2_state_t *state;
msg->any.head.source = 0;
msg->any.head.target = state->router->target;
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
- if (func) {
- state->func = func;
- state->any.any = data;
- }
- } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
- /*
- * New transaction, requires tracking state and a unique
- * msgid to be allocated.
- */
- KKASSERT(msg->state == NULL);
- state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
- state->pmp = pmp;
- state->flags = HAMMER2_STATE_DYNAMIC;
- state->func = func;
- state->any.any = data;
- state->msg = msg;
- state->msgid = (uint64_t)(uintptr_t)state;
- state->router = msg->router;
- msg->state = state;
- msg->any.head.source = 0;
- msg->any.head.target = state->router->target;
- msg->any.head.msgid = state->msgid;
-
- lockmgr(&pmp->msglk, LK_EXCLUSIVE);
- if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
- panic("duplicate msgid allocated");
- msg->any.head.msgid = state->msgid;
} else {
/*
* One-off message (always uses msgid 0 to distinguish
msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
+ hammer2_clusterctl_wakeup(pmp);
lockmgr(&pmp->msglk, LK_RELEASE);
}
cmd |= HAMMER2_MSGF_REPLY;
}
- nmsg = hammer2_msg_alloc(msg->router, cmd);
+ /* XXX messy mask cmd to avoid allocating state */
+ nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK,
+ NULL, NULL);
+ nmsg->any.head.cmd = cmd;
nmsg->any.head.error = error;
nmsg->state = state;
- hammer2_msg_write(nmsg, NULL, NULL);
+ hammer2_msg_write(nmsg);
}
/*
cmd |= HAMMER2_MSGF_REPLY;
}
- nmsg = hammer2_msg_alloc(msg->router, cmd);
+ /* XXX messy mask cmd to avoid allocating state */
+ nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK,
+ NULL, NULL);
+ nmsg->any.head.cmd = cmd;
nmsg->any.head.error = error;
nmsg->state = state;
- hammer2_msg_write(nmsg, NULL, NULL);
+ hammer2_msg_write(nmsg);
}
* Link layer ops basically talk to just the other side of a direct
* connection.
*
- * PAD - One-way message on link-0, ignored by target. Used to
+ * LNK_PAD - One-way message on link-0, ignored by target. Used to
* pad message buffers on shared-memory transports. Not
* typically used with TCP.
*
- * PING - One-way message on link-0, keep-alive, run by both sides
+ * LNK_PING - One-way message on link-0, keep-alive, run by both sides
* typically 1/sec on idle link, link is lost after 10 seconds
* of inactivity.
*
- * AUTH - Authenticate the connection, negotiate administrative
+ * LNK_AUTH - Authenticate the connection, negotiate administrative
* rights & encryption, protocol class, etc. Only PAD and
* AUTH messages (not even PING) are accepted until
* authentication is complete. This message also identifies
* the host.
*
- * CONN - Enable the SPAN protocol on link-0, possibly also installing
+ * LNK_CONN - Enable the SPAN protocol on link-0, possibly also installing
* a PFS filter (by cluster id, unique id, and/or wildcarded
* name).
*
- * SPAN - A SPAN transaction on link-0 enables messages to be relayed
+ * LNK_SPAN - A SPAN transaction on link-0 enables messages to be relayed
* to/from a particular cluster node. SPANs are received,
* sorted, aggregated, and retransmitted back out across all
* applicable connections.
*
* The leaf protocol also uses this to make a PFS available
* to the cluster (e.g. on-mount).
+ *
+ * LNK_VOLCONF - Volume header configuration change. All hammer2
+ * connections (hammer2 connect ...) stored in the volume
+ * header are spammed at the link level to the hammer2
+ * service daemon, and any live configuration change
+ * thereafter.
*/
#define HAMMER2_LNK_PAD HAMMER2_MSG_LNK(0x000, hammer2_msg_hdr)
#define HAMMER2_LNK_PING HAMMER2_MSG_LNK(0x001, hammer2_msg_hdr)
#define HAMMER2_LNK_AUTH HAMMER2_MSG_LNK(0x010, hammer2_lnk_auth)
#define HAMMER2_LNK_CONN HAMMER2_MSG_LNK(0x011, hammer2_lnk_conn)
#define HAMMER2_LNK_SPAN HAMMER2_MSG_LNK(0x012, hammer2_lnk_span)
+#define HAMMER2_LNK_VOLCONF HAMMER2_MSG_LNK(0x020, hammer2_lnk_volconf)
#define HAMMER2_LNK_ERROR HAMMER2_MSG_LNK(0xFFF, hammer2_msg_hdr)
/*
struct hammer2_lnk_conn {
hammer2_msg_hdr_t head;
+ uuid_t mediaid; /* media configuration id */
uuid_t pfs_clid; /* rendezvous pfs uuid */
uuid_t pfs_fsid; /* unique pfs uuid */
uint8_t pfs_type; /* peer type */
#define HAMMER2_SPAN_PROTO_1 1
/*
+ * LNK_VOLCONF
+ */
+struct hammer2_lnk_volconf {
+ hammer2_msg_hdr_t head;
+ hammer2_copy_data_t copy; /* copy spec */
+ int32_t index;
+ int32_t unused01;
+ uuid_t mediaid;
+ int64_t reserved02[32];
+};
+
+typedef struct hammer2_lnk_volconf hammer2_lnk_volconf_t;
+
+/*
* Debug layer ops operate on any link
*
* SHELL - Persist stream, access the debug shell on the target
hammer2_msg_hdr_t head;
hammer2_lnk_span_t lnk_span;
hammer2_lnk_conn_t lnk_conn;
+ hammer2_lnk_volconf_t lnk_volconf;
};
typedef union hammer2_msg_any hammer2_msg_any_t;
}
/*
+ * Reconnect using the passed file pointer. The caller must ref the
+ * fp for us.
+ */
+void
+hammer2_cluster_reconnect(hammer2_pfsmount_t *pmp, struct file *fp)
+{
+ atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
+ while (pmp->msgrd_td || pmp->msgwr_td) {
+ wakeup(&pmp->msg_ctl);
+ tsleep(pmp, 0, "clstrkl", hz);
+ }
+ atomic_clear_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
+ pmp->msg_fp = fp;
+ lwkt_create(hammer2_cluster_thread_rd, pmp, &pmp->msgrd_td,
+ NULL, 0, -1, "hammer2-msgrd");
+ lwkt_create(hammer2_cluster_thread_wr, pmp, &pmp->msgwr_td,
+ NULL, 0, -1, "hammer2-msgwr");
+}
+
+/*
* Cluster controller thread. Perform messaging functions. We have one
* thread for the reader and one for the writer. The writer handles
* shutdown requests (which should break the reader thread).
error = EINVAL;
break;
}
- msg = hammer2_msg_alloc(&pmp->router, hdr.cmd);
+ /* XXX messy: mask cmd to avoid allocating state */
+ msg = hammer2_msg_alloc(&pmp->router,
+ hdr.cmd & HAMMER2_MSGF_BASECMDMASK,
+ NULL, NULL);
msg->any.head = hdr;
msg->hdr_size = hbytes;
if (hbytes > sizeof(hdr)) {
hammer2_state_free(state);
}
+ /*
+ * XXX simulate MSGF_DELETEs
+ */
while ((state = RB_ROOT(&pmp->staterd_tree)) != NULL) {
- RB_REMOVE(hammer2_state_tree, &pmp->staterd_tree, state);
- hammer2_state_free(state);
+ kprintf("y");
+ if (state->func &&
+ (state->txcmd & HAMMER2_MSGF_DELETE) == 0 &&
+ (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) {
+ lockmgr(&pmp->msglk, LK_RELEASE);
+ msg = hammer2_msg_alloc(&pmp->router,
+ HAMMER2_LNK_ERROR,
+ NULL, NULL);
+ if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
+ msg->any.head.cmd |= HAMMER2_MSGF_CREATE;
+ msg->any.head.cmd |= HAMMER2_MSGF_DELETE;
+ msg->state = state;
+ msg->state->func(state, msg);
+ hammer2_state_cleanuprx(msg);
+ lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+ } else {
+ RB_REMOVE(hammer2_state_tree,
+ &pmp->staterd_tree, state);
+ hammer2_state_free(state);
+ }
}
lockmgr(&pmp->msglk, LK_RELEASE);
* connection.
*/
msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_CONN |
- HAMMER2_MSGF_CREATE);
+ HAMMER2_MSGF_CREATE,
+ hammer2_msg_conn_reply, pmp);
msg->any.lnk_conn.pfs_clid = pmp->iroot->ip_data.pfs_clid;
msg->any.lnk_conn.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
msg->any.lnk_conn.pfs_type = pmp->iroot->ip_data.pfs_type;
if (name_len >= sizeof(msg->any.lnk_conn.label))
name_len = sizeof(msg->any.lnk_conn.label) - 1;
bcopy(pmp->iroot->ip_data.filename, msg->any.lnk_conn.label, name_len);
+ pmp->conn_state = msg->state;
msg->any.lnk_conn.label[name_len] = 0;
- hammer2_msg_write(msg, hammer2_msg_conn_reply, pmp);
+ hammer2_msg_write(msg);
/*
* Transmit loop
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0 && error == 0) {
- lksleep(&pmp->msg_ctl, &pmp->msglk, 0, "msgwr", hz);
+ /*
+ * Sleep if no messages pending. Interlock with flag while
+ * holding msglk.
+ */
+ if (TAILQ_EMPTY(&pmp->msgq)) {
+ atomic_set_int(&pmp->msg_ctl,
+ HAMMER2_CLUSTERCTL_SLEEPING);
+ lksleep(&pmp->msg_ctl, &pmp->msglk, 0, "msgwr", hz);
+ atomic_clear_int(&pmp->msg_ctl,
+ HAMMER2_CLUSTERCTL_SLEEPING);
+ }
+
while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
/*
* Remove msg from the transmit queue and do
hammer2_state_free(state);
}
+ /*
+ * XXX simulate MSGF_DELETEs
+ */
while ((state = RB_ROOT(&pmp->statewr_tree)) != NULL) {
- RB_REMOVE(hammer2_state_tree, &pmp->statewr_tree, state);
- hammer2_state_free(state);
+ kprintf("x");
+ if (state->func &&
+ (state->txcmd & HAMMER2_MSGF_DELETE) == 0 &&
+ (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) {
+ lockmgr(&pmp->msglk, LK_RELEASE);
+ msg = hammer2_msg_alloc(&pmp->router,
+ HAMMER2_LNK_ERROR,
+ NULL, NULL);
+ if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
+ msg->any.head.cmd |= HAMMER2_MSGF_CREATE;
+ msg->any.head.cmd |= HAMMER2_MSGF_DELETE;
+ msg->state = state;
+ msg->state->func(state, msg);
+ hammer2_state_cleanuprx(msg);
+ lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+ } else {
+ RB_REMOVE(hammer2_state_tree,
+ &pmp->statewr_tree, state);
+ hammer2_state_free(state);
+ }
}
lockmgr(&pmp->msglk, LK_RELEASE);
lwkt_exit();
}
+/*
+ * Called with msglk held after queueing a new message, wakes up the
+ * transmit thread. We use an interlock thread to avoid unnecessary
+ * wakeups.
+ */
+void
+hammer2_clusterctl_wakeup(hammer2_pfsmount_t *pmp)
+{
+ if (pmp->msg_ctl & HAMMER2_CLUSTERCTL_SLEEPING) {
+ atomic_clear_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_SLEEPING);
+ wakeup(&pmp->msg_ctl);
+ }
+}
+
static int
hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg)
{
hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
{
hammer2_pfsmount_t *pmp = state->any.pmp;
+ hammer2_mount_t *hmp = pmp->hmp;
size_t name_len;
+ int copyid;
if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
kprintf("LNK_CONN transaction replied to, initiate SPAN\n");
msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_SPAN |
- HAMMER2_MSGF_CREATE);
+ HAMMER2_MSGF_CREATE,
+ hammer2_msg_span_reply, pmp);
msg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid;
msg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
msg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
msg->any.lnk_span.label,
name_len);
msg->any.lnk_span.label[name_len] = 0;
- hammer2_msg_write(msg, hammer2_msg_span_reply, pmp);
+ hammer2_msg_write(msg);
+
+ /*
+ * Dump the configuration stored in the volume header
+ */
+ hammer2_voldata_lock(hmp);
+ for (copyid = 0; copyid < HAMMER2_COPYID_COUNT; ++copyid) {
+ if (hmp->voldata.copyinfo[copyid].copyid == 0)
+ continue;
+ hammer2_volconf_update(pmp, copyid);
+ }
+ hammer2_voldata_unlock(hmp);
}
if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
kprintf("LNK_CONN transaction terminated by remote\n");
+ pmp->conn_state = NULL;
hammer2_msg_reply(msg, 0);
}
return(0);
kprintf("SPAN REPLY - Our span was terminated? %p\n", pmp);
return(0);
}
+
+/*
+ * Volume configuration updates are passed onto the userland service
+ * daemon via the open LNK_CONN transaction.
+ */
+void
+hammer2_volconf_update(hammer2_pfsmount_t *pmp, int index)
+{
+ hammer2_mount_t *hmp = pmp->hmp;
+ hammer2_msg_t *msg;
+
+ /* XXX interlock against connection state termination */
+ kprintf("volconf update %p\n", pmp->conn_state);
+ if (pmp->conn_state) {
+ kprintf("TRANSMIT VOLCONF VIA OPEN CONN TRANSACTION\n");
+ msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_VOLCONF,
+ NULL, NULL);
+ msg->state = pmp->conn_state;
+ msg->any.lnk_volconf.copy = hmp->voldata.copyinfo[index];
+ msg->any.lnk_volconf.mediaid = hmp->voldata.fsid;
+ msg->any.lnk_volconf.index = index;
+ hammer2_msg_write(msg);
+ }
+}