From 1a34728c11e151b7dd897784899b5a763b3b8fe8 Mon Sep 17 00:00:00 2001 From: Matthew Dillon Date: Fri, 10 Aug 2012 20:46:51 -0700 Subject: [PATCH] hammer2 - Config notifications, cleanup HAMMER2 VFS API * A hammer2 volume has a PERSISTENT table of 256 entries in the media volume header for specifying how the cluster connects together. Various hammer2 directives can list, add, or remove entries from this table. This table will be used for several different aspects of the filesystem and one of them is to tell the userland hammer2 service daemon what other machines to connect to. That is, we want the cluster configuration to be persistently stored as part of a HAMMER2 filesystem. * Add a notification message from the kernel to the daemon whenever this table id modified. The kernel will also spam the contents of the table to the daemon when it first connects to the daemon. The service daemon tracks the table and will connect to (or disconnect from) the listed targets in real time. In addition, the service daemon will retry a failed connection (or failed DNS lookup) forever as long as the entry is intact. The idea being that a machine in the cluster will recover once transitory failures are resolved. This is a bit messy at the moment as two pthreads have to be created for each connection... one to handle connect, disconnect, and retry operations and the other to handle the actual message stream over the connection. * Clean up the HAMMER2 VFS code's messaging APIs a bit, bringing them closer to the hammer2 userland APIs (though of course there will always be major differences). * Adjust the hammer2 VFS to try to clean up open transactional states when a socket failure occurs before proceeding with a umount, so the related functional states can be triggered and cleaned up. * Added an ioctl to reconnect a hammer2 mount to the userland hammer2 service daemon (not yet used). This will allow us to kill and restart the daemon and have it recover the communications pipes between itself and the HAMMER2 mounts in the kernel. --- sbin/hammer2/cmd_remote.c | 2 +- sbin/hammer2/msg.c | 3 + sbin/hammer2/msg_lnk.c | 261 +++++++++++++++++++++++++++--- sbin/hammer2/network.h | 10 +- sys/vfs/hammer2/hammer2.h | 18 ++- sys/vfs/hammer2/hammer2_ioctl.c | 37 ++++- sys/vfs/hammer2/hammer2_ioctl.h | 10 +- sys/vfs/hammer2/hammer2_msg.c | 78 +++++---- sys/vfs/hammer2/hammer2_network.h | 33 +++- sys/vfs/hammer2/hammer2_vfsops.c | 151 +++++++++++++++-- 10 files changed, 511 insertions(+), 92 deletions(-) diff --git a/sbin/hammer2/cmd_remote.c b/sbin/hammer2/cmd_remote.c index 8300e09df8..97116b1805 100644 --- a/sbin/hammer2/cmd_remote.c +++ b/sbin/hammer2/cmd_remote.c @@ -100,7 +100,7 @@ cmd_remote_status(const char *sel_path, int all_opt __unused) bzero(&remote, sizeof(remote)); while ((remote.copyid = remote.nextid) >= 0) { - if (ioctl(fd, HAMMER2IOC_REMOTE_GET, &remote) < 0) { + if (ioctl(fd, HAMMER2IOC_REMOTE_SCAN, &remote) < 0) { perror("ioctl"); ecode = 1; break; diff --git a/sbin/hammer2/msg.c b/sbin/hammer2/msg.c index 3531379852..ff38357b49 100644 --- a/sbin/hammer2/msg.c +++ b/sbin/hammer2/msg.c @@ -2062,6 +2062,9 @@ hammer2_basecmd_str(uint32_t cmd) case HAMMER2_LNK_SPAN: cmdstr = "SPAN"; break; + case HAMMER2_LNK_VOLCONF: + cmdstr = "VOLCONF"; + break; case HAMMER2_LNK_ERROR: if (cmd & HAMMER2_MSGF_DELETE) cmdstr = "RETURN"; diff --git a/sbin/hammer2/msg_lnk.c b/sbin/hammer2/msg_lnk.c index 1c0ab6913c..91b7dfe419 100644 --- a/sbin/hammer2/msg_lnk.c +++ b/sbin/hammer2/msg_lnk.c @@ -185,6 +185,7 @@ struct h2span_link; struct h2span_relay; +TAILQ_HEAD(h2span_media_queue, h2span_media); TAILQ_HEAD(h2span_connect_queue, h2span_connect); TAILQ_HEAD(h2span_relay_queue, h2span_relay); @@ -193,13 +194,40 @@ RB_HEAD(h2span_node_tree, h2span_node); RB_HEAD(h2span_link_tree, h2span_link); RB_HEAD(h2span_relay_tree, h2span_relay); +/* + * This represents a media + */ +struct h2span_media { + TAILQ_ENTRY(h2span_media) entry; + uuid_t mediaid; + int refs; + struct h2span_media_config { + hammer2_copy_data_t copy_run; + hammer2_copy_data_t copy_pend; + pthread_t thread; + pthread_cond_t cond; + int ctl; + int fd; + hammer2_iocom_t iocom; + pthread_t iocom_thread; + enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state; + } config[HAMMER2_COPYID_COUNT]; +}; + +typedef struct h2span_media_config h2span_media_config_t; + +#define H2CONFCTL_STOP 0x00000001 +#define H2CONFCTL_UPDATE 0x00000002 + /* * Received LNK_CONN transaction enables SPAN protocol over connection. - * (may contain filter). + * (may contain filter). Typically one for each mount and several may + * share the same media. */ struct h2span_connect { TAILQ_ENTRY(h2span_connect) entry; struct h2span_relay_tree tree; + struct h2span_media *media; hammer2_state_t *state; }; @@ -260,6 +288,7 @@ struct h2span_relay { }; +typedef struct h2span_media h2span_media_t; typedef struct h2span_connect h2span_connect_t; typedef struct h2span_cluster h2span_cluster_t; typedef struct h2span_node h2span_node_t; @@ -363,11 +392,12 @@ RB_GENERATE_STATIC(h2span_relay_tree, h2span_relay, rbnode, h2span_relay_cmp); /* - * Global mutex protects cluster_tree lookups. + * Global mutex protects cluster_tree lookups, connq, mediaq. */ static pthread_mutex_t cluster_mtx; static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree); static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq); +static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq); static void hammer2_lnk_span(hammer2_msg_t *msg); static void hammer2_lnk_conn(hammer2_msg_t *msg); @@ -375,6 +405,11 @@ static void hammer2_lnk_relay(hammer2_msg_t *msg); static void hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node); static void hammer2_relay_delete(h2span_relay_t *relay); +static void *hammer2_volconf_thread(void *info); +static void hammer2_volconf_stop(h2span_media_config_t *conf); +static void hammer2_volconf_start(h2span_media_config_t *conf, + const char *hostname); + void hammer2_msg_lnk_signal(hammer2_router_t *router __unused) { @@ -411,20 +446,23 @@ void hammer2_lnk_conn(hammer2_msg_t *msg) { hammer2_state_t *state = msg->state; + h2span_media_t *media; + h2span_media_config_t *conf; h2span_connect_t *conn; h2span_relay_t *relay; char *alloc = NULL; + int i; pthread_mutex_lock(&cluster_mtx); - /* - * On transaction start we allocate a new h2span_connect and - * acknowledge the request, leaving the transaction open. - * We then relay priority-selected SPANs. - */ - if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) { - state->func = hammer2_lnk_conn; - + switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) { + case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE: + case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE: + /* + * On transaction start we allocate a new h2span_connect and + * acknowledge the request, leaving the transaction open. + * We then relay priority-selected SPANs. + */ fprintf(stderr, "LNK_CONN(%08x): %s/%s\n", (uint32_t)msg->any.head.msgid, hammer2_uuid_to_str(&msg->any.lnk_conn.pfs_clid, @@ -436,29 +474,76 @@ hammer2_lnk_conn(hammer2_msg_t *msg) RB_INIT(&conn->tree); conn->state = state; + state->func = hammer2_lnk_conn; state->any.conn = conn; TAILQ_INSERT_TAIL(&connq, conn, entry); - hammer2_msg_result(msg, 0); - -#if 0 /* - * Span-synchronize all nodes with the new connection + * Set up media */ - hammer2_relay_scan(conn, NULL); -#endif - hammer2_router_signal(msg->router); - } + TAILQ_FOREACH(media, &mediaq, entry) { + if (uuid_compare(&msg->any.lnk_conn.mediaid, + &media->mediaid, NULL) == 0) { + break; + } + } + if (media == NULL) { + media = hammer2_alloc(sizeof(*media)); + media->mediaid = msg->any.lnk_conn.mediaid; + TAILQ_INSERT_TAIL(&mediaq, media, entry); + } + conn->media = media; + ++media->refs; - /* - * On transaction terminate we clean out our h2span_connect - * and acknowledge the request, closing the transaction. - */ - if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { + if ((msg->any.head.cmd & HAMMER2_MSGF_DELETE) == 0) { + hammer2_msg_result(msg, 0); + hammer2_router_signal(msg->router); + break; + } + /* FALL THROUGH */ + case HAMMER2_LNK_CONN | HAMMER2_MSGF_DELETE: + case HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE: +deleteconn: + /* + * On transaction terminate we clean out our h2span_connect + * and acknowledge the request, closing the transaction. + */ fprintf(stderr, "LNK_CONN: Terminated\n"); conn = state->any.conn; assert(conn); + /* + * Clean out the media structure. If refs drops to zero we + * also clean out the media config threads. These threads + * maintain span connections to other hammer2 service daemons. + */ + media = conn->media; + if (--media->refs == 0) { + fprintf(stderr, "Shutting down media spans\n"); + for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) { + conf = &media->config[i]; + + if (conf->thread == NULL) + continue; + conf->ctl = H2CONFCTL_STOP; + pthread_cond_signal(&conf->cond); + } + for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) { + conf = &media->config[i]; + + if (conf->thread == NULL) + continue; + pthread_mutex_unlock(&cluster_mtx); + pthread_join(conf->thread, NULL); + pthread_mutex_lock(&cluster_mtx); + conf->thread = NULL; + pthread_cond_destroy(&conf->cond); + } + fprintf(stderr, "Media shutdown complete\n"); + TAILQ_REMOVE(&mediaq, media, entry); + hammer2_free(media); + } + /* * Clean out all relays. This requires terminating each * relay transaction. @@ -470,6 +555,7 @@ hammer2_lnk_conn(hammer2_msg_t *msg) /* * Clean out conn */ + conn->media = NULL; conn->state = NULL; msg->state->any.conn = NULL; TAILQ_REMOVE(&connq, conn, entry); @@ -477,6 +563,49 @@ hammer2_lnk_conn(hammer2_msg_t *msg) hammer2_msg_reply(msg, 0); /* state invalid after reply */ + break; + case HAMMER2_LNK_VOLCONF: + /* + * One-way volume-configuration message is transmitted + * over the open LNK_CONN transaction. + */ + fprintf(stderr, "RECEIVED VOLCONF\n"); + if (msg->any.lnk_volconf.index < 0 || + msg->any.lnk_volconf.index >= HAMMER2_COPYID_COUNT) { + fprintf(stderr, "VOLCONF: ILLEGAL INDEX %d\n", + msg->any.lnk_volconf.index); + break; + } + if (msg->any.lnk_volconf.copy.path[sizeof(msg->any.lnk_volconf.copy.path) - 1] != 0 || + msg->any.lnk_volconf.copy.path[0] == 0) { + fprintf(stderr, "VOLCONF: ILLEGAL PATH %d\n", + msg->any.lnk_volconf.index); + break; + } + conn = msg->state->any.conn; + if (conn == NULL) { + fprintf(stderr, "VOLCONF: LNK_CONN is missing\n"); + break; + } + conf = &conn->media->config[msg->any.lnk_volconf.index]; + conf->copy_pend = msg->any.lnk_volconf.copy; + conf->ctl |= H2CONFCTL_UPDATE; + if (conf->thread == NULL) { + fprintf(stderr, "VOLCONF THREAD STARTED\n"); + pthread_cond_init(&conf->cond, NULL); + pthread_create(&conf->thread, NULL, + hammer2_volconf_thread, (void *)conf); + } + pthread_cond_signal(&conf->cond); + break; + default: + /* + * Failsafe + */ + if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) + goto deleteconn; + hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP); + break; } pthread_mutex_unlock(&cluster_mtx); } @@ -918,6 +1047,92 @@ hammer2_relay_delete(h2span_relay_t *relay) hammer2_free(relay); } +static void * +hammer2_volconf_thread(void *info) +{ + h2span_media_config_t *conf = info; + + pthread_mutex_lock(&cluster_mtx); + while ((conf->ctl & H2CONFCTL_STOP) == 0) { + if (conf->ctl & H2CONFCTL_UPDATE) { + fprintf(stderr, "VOLCONF UPDATE\n"); + conf->ctl &= ~H2CONFCTL_UPDATE; + if (bcmp(&conf->copy_run, &conf->copy_pend, + sizeof(conf->copy_run)) == 0) { + fprintf(stderr, "VOLCONF: no changes\n"); + continue; + } + /* + * XXX TODO - auto reconnect on lookup failure or + * connect failure or stream failure. + */ + + pthread_mutex_unlock(&cluster_mtx); + hammer2_volconf_stop(conf); + conf->copy_run = conf->copy_pend; + if (conf->copy_run.copyid != 0 && + strncmp(conf->copy_run.path, "span:", 5) == 0) { + hammer2_volconf_start(conf, + conf->copy_run.path + 5); + } + pthread_mutex_lock(&cluster_mtx); + fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state); + } + if (conf->state == H2MC_CONNECT) { + hammer2_volconf_start(conf, conf->copy_run.path + 5); + pthread_mutex_unlock(&cluster_mtx); + sleep(5); + pthread_mutex_lock(&cluster_mtx); + } else { + pthread_cond_wait(&conf->cond, &cluster_mtx); + } + } + pthread_mutex_unlock(&cluster_mtx); + hammer2_volconf_stop(conf); + return(NULL); +} + +static +void +hammer2_volconf_stop(h2span_media_config_t *conf) +{ + switch(conf->state) { + case H2MC_STOPPED: + break; + case H2MC_CONNECT: + conf->state = H2MC_STOPPED; + break; + case H2MC_RUNNING: + shutdown(conf->fd, SHUT_WR); + pthread_join(conf->iocom_thread, NULL); + conf->iocom_thread = NULL; + break; + } +} + +static +void +hammer2_volconf_start(h2span_media_config_t *conf, const char *hostname) +{ + switch(conf->state) { + case H2MC_STOPPED: + case H2MC_CONNECT: + conf->fd = hammer2_connect(hostname); + if (conf->fd < 0) { + fprintf(stderr, "Unable to connect to %s\n", hostname); + conf->state = H2MC_CONNECT; + } else { + pthread_create(&conf->iocom_thread, NULL, + master_service, + (void *)(intptr_t)conf->fd); + conf->state = H2MC_RUNNING; + } + break; + case H2MC_RUNNING: + break; + } +} + /************************************************************************ * ROUTER AND MESSAGING HANDLES * ************************************************************************ diff --git a/sbin/hammer2/network.h b/sbin/hammer2/network.h index 46399fca0c..565d38759a 100644 --- a/sbin/hammer2/network.h +++ b/sbin/hammer2/network.h @@ -245,8 +245,8 @@ struct hammer2_router { struct hammer2_state_tree statewr_tree; /* active messages */ hammer2_msg_queue_t txmsgq; /* tx msgq from remote */ uint64_t target; /* for routing */ - int flags; - int refs; /* refs prevent destruction */ + int flags; + int refs; /* refs prevent destruction */ }; #define HAMMER2_ROUTER_CONNECTED 0x0001 /* on global RB tree */ @@ -288,12 +288,6 @@ typedef struct hammer2_iocom hammer2_iocom_t; #define HAMMER2_IOCOMF_SWORK 0x00000100 /* immediate work pending */ #define HAMMER2_IOCOMF_CRYPTED 0x00000200 /* encrypt enabled */ - - - - - - /* * Crypto algorithm table and related typedefs. */ diff --git a/sys/vfs/hammer2/hammer2.h b/sys/vfs/hammer2/hammer2.h index 068f94fc21..3000763af0 100644 --- a/sys/vfs/hammer2/hammer2.h +++ b/sys/vfs/hammer2/hammer2.h @@ -338,6 +338,7 @@ struct hammer2_pfsmount { uint32_t msgid_iterator; struct lock msglk; /* lockmgr lock */ TAILQ_HEAD(, hammer2_msg) msgq; /* transmit queue */ + struct hammer2_state *conn_state; /* active LNK_CONN state */ struct hammer2_state *freerd_state; /* allocation cache */ struct hammer2_state *freewr_state; /* allocation cache */ struct hammer2_state_tree staterd_tree; /* active messages */ @@ -347,7 +348,11 @@ struct hammer2_pfsmount { typedef struct hammer2_pfsmount hammer2_pfsmount_t; -#define HAMMER2_CLUSTERCTL_KILL 0x0001 +/* + * msg_ctl flags (atomic) + */ +#define HAMMER2_CLUSTERCTL_KILL 0x00000001 +#define HAMMER2_CLUSTERCTL_SLEEPING 0x00000002 /* interlocked w/msglk */ /* * Transactional state structure, representing an open transaction. The @@ -557,10 +562,10 @@ void hammer2_state_cleanuptx(hammer2_msg_t *msg); int hammer2_msg_execute(hammer2_msg_t *msg); void hammer2_state_free(hammer2_state_t *state); void hammer2_msg_free(hammer2_msg_t *msg); -hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd); -void hammer2_msg_write(hammer2_msg_t *msg, +hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd, int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data); +void hammer2_msg_write(hammer2_msg_t *msg); void hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error); void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error); @@ -570,6 +575,13 @@ void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error); int hammer2_msg_dbg_rcvmsg(hammer2_msg_t *msg); int hammer2_msg_adhoc_input(hammer2_msg_t *msg); +/* + * hammer2_vfsops.c + */ +void hammer2_clusterctl_wakeup(hammer2_pfsmount_t *pmp); +void hammer2_volconf_update(hammer2_pfsmount_t *pmp, int index); +void hammer2_cluster_reconnect(hammer2_pfsmount_t *pmp, struct file *fp); + /* * hammer2_freemap.c */ diff --git a/sys/vfs/hammer2/hammer2_ioctl.c b/sys/vfs/hammer2/hammer2_ioctl.c index 7df7e88019..56aaab27ac 100644 --- a/sys/vfs/hammer2/hammer2_ioctl.c +++ b/sys/vfs/hammer2/hammer2_ioctl.c @@ -43,7 +43,8 @@ #include "hammer2.h" static int hammer2_ioctl_version_get(hammer2_inode_t *ip, void *data); -static int hammer2_ioctl_remote_get(hammer2_inode_t *ip, void *data); +static int hammer2_ioctl_recluster(hammer2_inode_t *ip, void *data); +static int hammer2_ioctl_remote_scan(hammer2_inode_t *ip, void *data); static int hammer2_ioctl_remote_add(hammer2_inode_t *ip, void *data); static int hammer2_ioctl_remote_del(hammer2_inode_t *ip, void *data); static int hammer2_ioctl_remote_rep(hammer2_inode_t *ip, void *data); @@ -72,9 +73,13 @@ hammer2_ioctl(hammer2_inode_t *ip, u_long com, void *data, int fflag, case HAMMER2IOC_VERSION_GET: error = hammer2_ioctl_version_get(ip, data); break; - case HAMMER2IOC_REMOTE_GET: + case HAMMER2IOC_RECLUSTER: if (error == 0) - error = hammer2_ioctl_remote_get(ip, data); + error = hammer2_ioctl_recluster(ip, data); + break; + case HAMMER2IOC_REMOTE_SCAN: + if (error == 0) + error = hammer2_ioctl_remote_scan(ip, data); break; case HAMMER2IOC_REMOTE_ADD: if (error == 0) @@ -139,11 +144,27 @@ hammer2_ioctl_version_get(hammer2_inode_t *ip, void *data) return 0; } +static int +hammer2_ioctl_recluster(hammer2_inode_t *ip, void *data) +{ + hammer2_ioc_recluster_t *recl = data; + struct file *fp; + + fp = holdfp(curproc->p_fd, recl->fd, -1); + if (fp) { + kprintf("reconnect to cluster\n"); + hammer2_cluster_reconnect(ip->pmp, fp); + return 0; + } else { + return EINVAL; + } +} + /* * Retrieve information about a remote */ static int -hammer2_ioctl_remote_get(hammer2_inode_t *ip, void *data) +hammer2_ioctl_remote_scan(hammer2_inode_t *ip, void *data) { hammer2_mount_t *hmp = ip->hmp; hammer2_ioc_remote_t *remote = data; @@ -161,7 +182,7 @@ hammer2_ioctl_remote_get(hammer2_inode_t *ip, void *data) */ while (++copyid < HAMMER2_COPYID_COUNT && hmp->voldata.copyinfo[copyid].copyid == 0) { - ++copyid; + ; } if (copyid == HAMMER2_COPYID_COUNT) remote->nextid = -1; @@ -178,6 +199,7 @@ static int hammer2_ioctl_remote_add(hammer2_inode_t *ip, void *data) { hammer2_mount_t *hmp = ip->hmp; + hammer2_pfsmount_t *pmp = ip->pmp; hammer2_ioc_remote_t *remote = data; int copyid = remote->copyid; int error = 0; @@ -197,9 +219,9 @@ hammer2_ioctl_remote_add(hammer2_inode_t *ip, void *data) } } hammer2_modify_volume(hmp); - kprintf("copyid %d\n", copyid); remote->copy1.copyid = copyid; hmp->voldata.copyinfo[copyid] = remote->copy1; + hammer2_volconf_update(pmp, copyid); failed: hammer2_voldata_unlock(hmp); return (error); @@ -212,6 +234,7 @@ static int hammer2_ioctl_remote_del(hammer2_inode_t *ip, void *data) { hammer2_mount_t *hmp = ip->hmp; + hammer2_pfsmount_t *pmp = ip->pmp; hammer2_ioc_remote_t *remote = data; int copyid = remote->copyid; int error = 0; @@ -236,6 +259,7 @@ hammer2_ioctl_remote_del(hammer2_inode_t *ip, void *data) } hammer2_modify_volume(hmp); hmp->voldata.copyinfo[copyid].copyid = 0; + hammer2_volconf_update(pmp, copyid); failed: hammer2_voldata_unlock(hmp); return (error); @@ -255,6 +279,7 @@ hammer2_ioctl_remote_rep(hammer2_inode_t *ip, void *data) return (EINVAL); hammer2_voldata_lock(hmp); + /*hammer2_volconf_update(pmp, copyid);*/ hammer2_voldata_unlock(hmp); return(0); diff --git a/sys/vfs/hammer2/hammer2_ioctl.h b/sys/vfs/hammer2/hammer2_ioctl.h index 6d96b28458..f0f13e684c 100644 --- a/sys/vfs/hammer2/hammer2_ioctl.h +++ b/sys/vfs/hammer2/hammer2_ioctl.h @@ -55,6 +55,13 @@ struct hammer2_ioc_version { typedef struct hammer2_ioc_version hammer2_ioc_version_t; +struct hammer2_ioc_recluster { + int fd; + char reserved[256 - 4]; +}; + +typedef struct hammer2_ioc_recluster hammer2_ioc_recluster_t; + /* * Ioctls to manage the volume->copyinfo[] array and to associate or * disassociate sockets @@ -114,8 +121,9 @@ typedef struct hammer2_ioc_inode hammer2_ioc_inode_t; */ #define HAMMER2IOC_VERSION_GET _IOWR('h', 64, struct hammer2_ioc_version) +#define HAMMER2IOC_RECLUSTER _IOWR('h', 65, struct hammer2_ioc_recluster) -#define HAMMER2IOC_REMOTE_GET _IOWR('h', 68, struct hammer2_ioc_remote) +#define HAMMER2IOC_REMOTE_SCAN _IOWR('h', 68, struct hammer2_ioc_remote) #define HAMMER2IOC_REMOTE_ADD _IOWR('h', 69, struct hammer2_ioc_remote) #define HAMMER2IOC_REMOTE_DEL _IOWR('h', 70, struct hammer2_ioc_remote) #define HAMMER2IOC_REMOTE_REP _IOWR('h', 71, struct hammer2_ioc_remote) diff --git a/sys/vfs/hammer2/hammer2_msg.c b/sys/vfs/hammer2/hammer2_msg.c index ef84f340a1..272fa5a8c2 100644 --- a/sys/vfs/hammer2/hammer2_msg.c +++ b/sys/vfs/hammer2/hammer2_msg.c @@ -585,14 +585,17 @@ hammer2_state_free(hammer2_state_t *state) } hammer2_msg_t * -hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd) +hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd, + int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data) { + hammer2_pfsmount_t *pmp = router->pmp; hammer2_msg_t *msg; + hammer2_state_t *state; size_t hbytes; hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN; msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes, - router->pmp->mmsg, M_WAITOK | M_ZERO); + pmp->mmsg, M_WAITOK | M_ZERO); msg->hdr_size = hbytes; msg->router = router; KKASSERT(router != NULL); @@ -601,6 +604,32 @@ hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd) msg->any.head.target = router->target; msg->any.head.cmd = cmd; + if (cmd & HAMMER2_MSGF_CREATE) { + /* + * New transaction, requires tracking state and a unique + * msgid to be allocated. + */ + KKASSERT(msg->state == NULL); + state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO); + state->pmp = pmp; + state->flags = HAMMER2_STATE_DYNAMIC; + state->func = func; + state->any.any = data; + state->msg = msg; + state->msgid = (uint64_t)(uintptr_t)state; + state->router = msg->router; + msg->state = state; + msg->any.head.source = 0; + msg->any.head.target = state->router->target; + msg->any.head.msgid = state->msgid; + + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state)) + panic("duplicate msgid allocated"); + msg->any.head.msgid = state->msgid; + lockmgr(&pmp->msglk, LK_RELEASE); + } + return (msg); } @@ -653,8 +682,7 @@ hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2) * does not write to the message socket/pipe. */ void -hammer2_msg_write(hammer2_msg_t *msg, - int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data) +hammer2_msg_write(hammer2_msg_t *msg) { hammer2_pfsmount_t *pmp = msg->router->pmp; hammer2_state_t *state; @@ -672,33 +700,6 @@ hammer2_msg_write(hammer2_msg_t *msg, msg->any.head.source = 0; msg->any.head.target = state->router->target; lockmgr(&pmp->msglk, LK_EXCLUSIVE); - if (func) { - state->func = func; - state->any.any = data; - } - } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) { - /* - * New transaction, requires tracking state and a unique - * msgid to be allocated. - */ - KKASSERT(msg->state == NULL); - state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO); - state->pmp = pmp; - state->flags = HAMMER2_STATE_DYNAMIC; - state->func = func; - state->any.any = data; - state->msg = msg; - state->msgid = (uint64_t)(uintptr_t)state; - state->router = msg->router; - msg->state = state; - msg->any.head.source = 0; - msg->any.head.target = state->router->target; - msg->any.head.msgid = state->msgid; - - lockmgr(&pmp->msglk, LK_EXCLUSIVE); - if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state)) - panic("duplicate msgid allocated"); - msg->any.head.msgid = state->msgid; } else { /* * One-off message (always uses msgid 0 to distinguish @@ -721,6 +722,7 @@ hammer2_msg_write(hammer2_msg_t *msg, msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size); TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry); + hammer2_clusterctl_wakeup(pmp); lockmgr(&pmp->msglk, LK_RELEASE); } @@ -765,10 +767,13 @@ hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error) cmd |= HAMMER2_MSGF_REPLY; } - nmsg = hammer2_msg_alloc(msg->router, cmd); + /* XXX messy mask cmd to avoid allocating state */ + nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK, + NULL, NULL); + nmsg->any.head.cmd = cmd; nmsg->any.head.error = error; nmsg->state = state; - hammer2_msg_write(nmsg, NULL, NULL); + hammer2_msg_write(nmsg); } /* @@ -813,8 +818,11 @@ hammer2_msg_result(hammer2_msg_t *msg, uint32_t error) cmd |= HAMMER2_MSGF_REPLY; } - nmsg = hammer2_msg_alloc(msg->router, cmd); + /* XXX messy mask cmd to avoid allocating state */ + nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK, + NULL, NULL); + nmsg->any.head.cmd = cmd; nmsg->any.head.error = error; nmsg->state = state; - hammer2_msg_write(nmsg, NULL, NULL); + hammer2_msg_write(nmsg); } diff --git a/sys/vfs/hammer2/hammer2_network.h b/sys/vfs/hammer2/hammer2_network.h index 882141ce97..533e1152a3 100644 --- a/sys/vfs/hammer2/hammer2_network.h +++ b/sys/vfs/hammer2/hammer2_network.h @@ -314,37 +314,44 @@ typedef struct hammer2_msg_hdr hammer2_msg_hdr_t; * Link layer ops basically talk to just the other side of a direct * connection. * - * PAD - One-way message on link-0, ignored by target. Used to + * LNK_PAD - One-way message on link-0, ignored by target. Used to * pad message buffers on shared-memory transports. Not * typically used with TCP. * - * PING - One-way message on link-0, keep-alive, run by both sides + * LNK_PING - One-way message on link-0, keep-alive, run by both sides * typically 1/sec on idle link, link is lost after 10 seconds * of inactivity. * - * AUTH - Authenticate the connection, negotiate administrative + * LNK_AUTH - Authenticate the connection, negotiate administrative * rights & encryption, protocol class, etc. Only PAD and * AUTH messages (not even PING) are accepted until * authentication is complete. This message also identifies * the host. * - * CONN - Enable the SPAN protocol on link-0, possibly also installing + * LNK_CONN - Enable the SPAN protocol on link-0, possibly also installing * a PFS filter (by cluster id, unique id, and/or wildcarded * name). * - * SPAN - A SPAN transaction on link-0 enables messages to be relayed + * LNK_SPAN - A SPAN transaction on link-0 enables messages to be relayed * to/from a particular cluster node. SPANs are received, * sorted, aggregated, and retransmitted back out across all * applicable connections. * * The leaf protocol also uses this to make a PFS available * to the cluster (e.g. on-mount). + * + * LNK_VOLCONF - Volume header configuration change. All hammer2 + * connections (hammer2 connect ...) stored in the volume + * header are spammed at the link level to the hammer2 + * service daemon, and any live configuration change + * thereafter. */ #define HAMMER2_LNK_PAD HAMMER2_MSG_LNK(0x000, hammer2_msg_hdr) #define HAMMER2_LNK_PING HAMMER2_MSG_LNK(0x001, hammer2_msg_hdr) #define HAMMER2_LNK_AUTH HAMMER2_MSG_LNK(0x010, hammer2_lnk_auth) #define HAMMER2_LNK_CONN HAMMER2_MSG_LNK(0x011, hammer2_lnk_conn) #define HAMMER2_LNK_SPAN HAMMER2_MSG_LNK(0x012, hammer2_lnk_span) +#define HAMMER2_LNK_VOLCONF HAMMER2_MSG_LNK(0x020, hammer2_lnk_volconf) #define HAMMER2_LNK_ERROR HAMMER2_MSG_LNK(0xFFF, hammer2_msg_hdr) /* @@ -368,6 +375,7 @@ struct hammer2_lnk_auth { struct hammer2_lnk_conn { hammer2_msg_hdr_t head; + uuid_t mediaid; /* media configuration id */ uuid_t pfs_clid; /* rendezvous pfs uuid */ uuid_t pfs_fsid; /* unique pfs uuid */ uint8_t pfs_type; /* peer type */ @@ -442,6 +450,20 @@ typedef struct hammer2_lnk_span hammer2_lnk_span_t; #define HAMMER2_SPAN_PROTO_1 1 +/* + * LNK_VOLCONF + */ +struct hammer2_lnk_volconf { + hammer2_msg_hdr_t head; + hammer2_copy_data_t copy; /* copy spec */ + int32_t index; + int32_t unused01; + uuid_t mediaid; + int64_t reserved02[32]; +}; + +typedef struct hammer2_lnk_volconf hammer2_lnk_volconf_t; + /* * Debug layer ops operate on any link * @@ -539,6 +561,7 @@ union hammer2_msg_any { hammer2_msg_hdr_t head; hammer2_lnk_span_t lnk_span; hammer2_lnk_conn_t lnk_conn; + hammer2_lnk_volconf_t lnk_volconf; }; typedef union hammer2_msg_any hammer2_msg_any_t; diff --git a/sys/vfs/hammer2/hammer2_vfsops.c b/sys/vfs/hammer2/hammer2_vfsops.c index 3bb3864f90..7ebe6a5124 100644 --- a/sys/vfs/hammer2/hammer2_vfsops.c +++ b/sys/vfs/hammer2/hammer2_vfsops.c @@ -1006,6 +1006,26 @@ hammer2_install_volume_header(hammer2_mount_t *hmp) return (error); } +/* + * Reconnect using the passed file pointer. The caller must ref the + * fp for us. + */ +void +hammer2_cluster_reconnect(hammer2_pfsmount_t *pmp, struct file *fp) +{ + atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL); + while (pmp->msgrd_td || pmp->msgwr_td) { + wakeup(&pmp->msg_ctl); + tsleep(pmp, 0, "clstrkl", hz); + } + atomic_clear_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL); + pmp->msg_fp = fp; + lwkt_create(hammer2_cluster_thread_rd, pmp, &pmp->msgrd_td, + NULL, 0, -1, "hammer2-msgrd"); + lwkt_create(hammer2_cluster_thread_wr, pmp, &pmp->msgwr_td, + NULL, 0, -1, "hammer2-msgwr"); +} + /* * Cluster controller thread. Perform messaging functions. We have one * thread for the reader and one for the writer. The writer handles @@ -1043,7 +1063,10 @@ hammer2_cluster_thread_rd(void *arg) error = EINVAL; break; } - msg = hammer2_msg_alloc(&pmp->router, hdr.cmd); + /* XXX messy: mask cmd to avoid allocating state */ + msg = hammer2_msg_alloc(&pmp->router, + hdr.cmd & HAMMER2_MSGF_BASECMDMASK, + NULL, NULL); msg->any.head = hdr; msg->hdr_size = hbytes; if (hbytes > sizeof(hdr)) { @@ -1136,9 +1159,30 @@ hammer2_cluster_thread_rd(void *arg) hammer2_state_free(state); } + /* + * XXX simulate MSGF_DELETEs + */ while ((state = RB_ROOT(&pmp->staterd_tree)) != NULL) { - RB_REMOVE(hammer2_state_tree, &pmp->staterd_tree, state); - hammer2_state_free(state); + kprintf("y"); + if (state->func && + (state->txcmd & HAMMER2_MSGF_DELETE) == 0 && + (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) { + lockmgr(&pmp->msglk, LK_RELEASE); + msg = hammer2_msg_alloc(&pmp->router, + HAMMER2_LNK_ERROR, + NULL, NULL); + if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) + msg->any.head.cmd |= HAMMER2_MSGF_CREATE; + msg->any.head.cmd |= HAMMER2_MSGF_DELETE; + msg->state = state; + msg->state->func(state, msg); + hammer2_state_cleanuprx(msg); + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + } else { + RB_REMOVE(hammer2_state_tree, + &pmp->staterd_tree, state); + hammer2_state_free(state); + } } lockmgr(&pmp->msglk, LK_RELEASE); @@ -1172,7 +1216,8 @@ hammer2_cluster_thread_wr(void *arg) * connection. */ msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_CONN | - HAMMER2_MSGF_CREATE); + HAMMER2_MSGF_CREATE, + hammer2_msg_conn_reply, pmp); msg->any.lnk_conn.pfs_clid = pmp->iroot->ip_data.pfs_clid; msg->any.lnk_conn.pfs_fsid = pmp->iroot->ip_data.pfs_fsid; msg->any.lnk_conn.pfs_type = pmp->iroot->ip_data.pfs_type; @@ -1181,8 +1226,9 @@ hammer2_cluster_thread_wr(void *arg) if (name_len >= sizeof(msg->any.lnk_conn.label)) name_len = sizeof(msg->any.lnk_conn.label) - 1; bcopy(pmp->iroot->ip_data.filename, msg->any.lnk_conn.label, name_len); + pmp->conn_state = msg->state; msg->any.lnk_conn.label[name_len] = 0; - hammer2_msg_write(msg, hammer2_msg_conn_reply, pmp); + hammer2_msg_write(msg); /* * Transmit loop @@ -1191,7 +1237,18 @@ hammer2_cluster_thread_wr(void *arg) lockmgr(&pmp->msglk, LK_EXCLUSIVE); while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0 && error == 0) { - lksleep(&pmp->msg_ctl, &pmp->msglk, 0, "msgwr", hz); + /* + * Sleep if no messages pending. Interlock with flag while + * holding msglk. + */ + if (TAILQ_EMPTY(&pmp->msgq)) { + atomic_set_int(&pmp->msg_ctl, + HAMMER2_CLUSTERCTL_SLEEPING); + lksleep(&pmp->msg_ctl, &pmp->msglk, 0, "msgwr", hz); + atomic_clear_int(&pmp->msg_ctl, + HAMMER2_CLUSTERCTL_SLEEPING); + } + while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) { /* * Remove msg from the transmit queue and do @@ -1263,9 +1320,30 @@ hammer2_cluster_thread_wr(void *arg) hammer2_state_free(state); } + /* + * XXX simulate MSGF_DELETEs + */ while ((state = RB_ROOT(&pmp->statewr_tree)) != NULL) { - RB_REMOVE(hammer2_state_tree, &pmp->statewr_tree, state); - hammer2_state_free(state); + kprintf("x"); + if (state->func && + (state->txcmd & HAMMER2_MSGF_DELETE) == 0 && + (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) { + lockmgr(&pmp->msglk, LK_RELEASE); + msg = hammer2_msg_alloc(&pmp->router, + HAMMER2_LNK_ERROR, + NULL, NULL); + if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) + msg->any.head.cmd |= HAMMER2_MSGF_CREATE; + msg->any.head.cmd |= HAMMER2_MSGF_DELETE; + msg->state = state; + msg->state->func(state, msg); + hammer2_state_cleanuprx(msg); + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + } else { + RB_REMOVE(hammer2_state_tree, + &pmp->statewr_tree, state); + hammer2_state_free(state); + } } lockmgr(&pmp->msglk, LK_RELEASE); @@ -1281,6 +1359,20 @@ hammer2_cluster_thread_wr(void *arg) lwkt_exit(); } +/* + * Called with msglk held after queueing a new message, wakes up the + * transmit thread. We use an interlock thread to avoid unnecessary + * wakeups. + */ +void +hammer2_clusterctl_wakeup(hammer2_pfsmount_t *pmp) +{ + if (pmp->msg_ctl & HAMMER2_CLUSTERCTL_SLEEPING) { + atomic_clear_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_SLEEPING); + wakeup(&pmp->msg_ctl); + } +} + static int hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg) { @@ -1315,12 +1407,15 @@ static int hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg) { hammer2_pfsmount_t *pmp = state->any.pmp; + hammer2_mount_t *hmp = pmp->hmp; size_t name_len; + int copyid; if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) { kprintf("LNK_CONN transaction replied to, initiate SPAN\n"); msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_SPAN | - HAMMER2_MSGF_CREATE); + HAMMER2_MSGF_CREATE, + hammer2_msg_span_reply, pmp); msg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid; msg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid; msg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type; @@ -1332,10 +1427,22 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg) msg->any.lnk_span.label, name_len); msg->any.lnk_span.label[name_len] = 0; - hammer2_msg_write(msg, hammer2_msg_span_reply, pmp); + hammer2_msg_write(msg); + + /* + * Dump the configuration stored in the volume header + */ + hammer2_voldata_lock(hmp); + for (copyid = 0; copyid < HAMMER2_COPYID_COUNT; ++copyid) { + if (hmp->voldata.copyinfo[copyid].copyid == 0) + continue; + hammer2_volconf_update(pmp, copyid); + } + hammer2_voldata_unlock(hmp); } if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { kprintf("LNK_CONN transaction terminated by remote\n"); + pmp->conn_state = NULL; hammer2_msg_reply(msg, 0); } return(0); @@ -1349,3 +1456,27 @@ hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg) kprintf("SPAN REPLY - Our span was terminated? %p\n", pmp); return(0); } + +/* + * Volume configuration updates are passed onto the userland service + * daemon via the open LNK_CONN transaction. + */ +void +hammer2_volconf_update(hammer2_pfsmount_t *pmp, int index) +{ + hammer2_mount_t *hmp = pmp->hmp; + hammer2_msg_t *msg; + + /* XXX interlock against connection state termination */ + kprintf("volconf update %p\n", pmp->conn_state); + if (pmp->conn_state) { + kprintf("TRANSMIT VOLCONF VIA OPEN CONN TRANSACTION\n"); + msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_VOLCONF, + NULL, NULL); + msg->state = pmp->conn_state; + msg->any.lnk_volconf.copy = hmp->voldata.copyinfo[index]; + msg->any.lnk_volconf.mediaid = hmp->voldata.fsid; + msg->any.lnk_volconf.index = index; + hammer2_msg_write(msg); + } +} -- 2.41.0