From: Matthew Dillon Date: Sat, 11 Aug 2012 21:56:01 +0000 (-0700) Subject: Merge branches 'hammer2' and 'master' of ssh://crater.dragonflybsd.org/repository... X-Git-Tag: v3.4.0rc~1019 X-Git-Url: http://gitweb.dragonflybsd.org/dragonfly.git/commitdiff_plain/e1648a68d2a340848f1c8ec7bd695c6285927392 Merge branches 'hammer2' and 'master' of ssh://crater.dragonflybsd.org/repository/git/dragonfly into hammer2 --- e1648a68d2a340848f1c8ec7bd695c6285927392 diff --cc sbin/dhclient/dhclient.c index 9d421ed,9d421ed..7ec01ff --- a/sbin/dhclient/dhclient.c +++ b/sbin/dhclient/dhclient.c @@@ -170,15 -170,15 +170,26 @@@ routehandler(void struct iaddr a; ssize_t n; ++ /* ++ * Read one message non-blocking, ignore signals. ++ */ do { n = read(routefd, &msg, sizeof(msg)); } while (n == -1 && errno == EINTR); ++ /* ++ * No operation if no messages ready or the message is ++ * not sized properly. ++ */ rtm = (struct rt_msghdr *)msg; if (n < sizeof(rtm->rtm_msglen) || n < rtm->rtm_msglen || -- rtm->rtm_version != RTM_VERSION) ++ rtm->rtm_version != RTM_VERSION) { return; ++ } ++ /* ++ * Process the message. ++ */ switch (rtm->rtm_type) { case RTM_NEWADDR: ifam = (struct ifa_msghdr *)rtm; @@@ -187,8 -187,8 +198,10 @@@ if (findproto((char *)(ifam + 1), ifam->ifam_addrs) != AF_INET) break; sa = get_ifa((char *)(ifam + 1), ifam->ifam_addrs); -- if (sa == NULL) ++ if (sa == NULL) { ++ warning("RTM_NEWADDR: No IFA"); goto die; ++ } if ((a.len = sizeof(struct in_addr)) > sizeof(a.iabuf)) error("king bula sez: len mismatch"); @@@ -205,6 -205,6 +218,7 @@@ /* new addr is the one we set */ break; ++ warning("RTM_NEWADDR: Our IFA not found"); goto die; case RTM_DELADDR: ifam = (struct ifa_msghdr *)rtm; @@@ -214,13 -214,13 +228,16 @@@ break; if (scripttime == 0 || t < scripttime + 10) break; ++ warning("RTM_DELADDR: Third-party %d", (int)(t - scripttime)); goto die; case RTM_IFINFO: ifm = (struct if_msghdr *)rtm; if (ifm->ifm_index != ifi->index) break; -- if ((rtm->rtm_flags & RTF_UP) == 0) ++ if ((rtm->rtm_flags & RTF_UP) == 0) { ++ warning("RTM_IFINFO: Interface not up"); goto die; ++ } linkstat = LINK_STATE_IS_UP(ifm->ifm_data.ifi_link_state) ? 1 : 0; @@@ -238,8 -238,8 +255,10 @@@ case RTM_IFANNOUNCE: ifan = (struct if_announcemsghdr *)rtm; if (ifan->ifan_what == IFAN_DEPARTURE && -- ifan->ifan_index == ifi->index) ++ ifan->ifan_index == ifi->index) { ++ warning("RTM_IFANNOUNCE: Interface departure"); goto die; ++ } break; default: break; diff --cc sbin/hammer2/cmd_service.c index 82ca26b,0000000..7564773 mode 100644,000000..100644 --- a/sbin/hammer2/cmd_service.c +++ b/sbin/hammer2/cmd_service.c @@@ -1,280 -1,0 +1,349 @@@ +/* + * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved. + * + * This code is derived from software contributed to The DragonFly Project + * by Matthew Dillon + * by Venkatesh Srinivas + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of The DragonFly Project nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific, prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include "hammer2.h" + +static void *master_accept(void *data); +static void master_auth_signal(hammer2_router_t *router); +static void master_auth_rxmsg(hammer2_msg_t *msg); +static void master_link_signal(hammer2_router_t *router); +static void master_link_rxmsg(hammer2_msg_t *msg); ++static void master_reconnect(const char *mntpt); + +/* + * Start-up the master listener daemon for the machine. + * + * The master listener serves as a rendezvous point in the cluster, accepting + * connections, performing registrations and authentications, maintaining + * the spanning tree, and keeping track of message state so disconnects can + * be handled properly. + * + * Once authenticated only low-level messaging protocols (which includes + * tracking persistent messages) are handled by this daemon. This daemon + * does not run the higher level quorum or locking protocols. + * + * This daemon can also be told to maintain connections to other nodes, + * forming a messaging backbone, which in turn allows PFS's (if desired) to + * simply connect to the master daemon via localhost if desired. + * Backbones are specified via /etc/hammer2.conf. + */ +int +cmd_service(void) +{ + struct sockaddr_in lsin; + int on; + int lfd; + + /* + * Acquire socket and set options + */ + if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + fprintf(stderr, "master_listen: socket(): %s\n", + strerror(errno)); + return 1; + } + on = 1; + setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); + + /* + * Setup listen port and try to bind. If the bind fails we assume + * that a master listener process is already running and silently + * fail. + */ + bzero(&lsin, sizeof(lsin)); + lsin.sin_family = AF_INET; + lsin.sin_addr.s_addr = INADDR_ANY; + lsin.sin_port = htons(HAMMER2_LISTEN_PORT); + if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) { + close(lfd); + if (QuietOpt == 0) { + fprintf(stderr, + "master listen: daemon already running\n"); + } + return 0; + } + if (QuietOpt == 0) + fprintf(stderr, "master listen: startup\n"); + listen(lfd, 50); + + /* + * Fork and disconnect the controlling terminal and parent process, + * executing the specified function as a pthread. + * + * Returns to the original process which can then continue running. + * In debug mode this call will create the pthread without forking + * and set NormalExit to 0, instead of fork. + */ + hammer2_demon(master_accept, (void *)(intptr_t)lfd); + if (NormalExit) + close(lfd); + return 0; +} + +/* + * Master listen/accept thread. Accept connections on the master socket, + * starting a pthread for each one. + */ +static +void * +master_accept(void *data) +{ + struct sockaddr_in asin; + socklen_t alen; + pthread_t thread; ++ hammer2_master_service_info_t *info; + int lfd = (int)(intptr_t)data; + int fd; ++ int i; ++ int count; ++ struct statfs *mntbuf = NULL; ++ struct statvfs *mntvbuf = NULL; + + /* + * Nobody waits for us + */ + setproctitle("hammer2 master listen"); + pthread_detach(pthread_self()); + + /* ++ * Scan existing hammer2 mounts and reconnect to them using ++ * HAMMER2IOC_RECLUSTER. ++ */ ++ count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT); ++ for (i = 0; i < count; ++i) { ++ if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0) ++ master_reconnect(mntbuf[i].f_mntonname); ++ } ++ ++ /* + * Accept connections and create pthreads to handle them after + * validating the IP. + */ + for (;;) { + alen = sizeof(asin); + fd = accept(lfd, (struct sockaddr *)&asin, &alen); + if (fd < 0) { + if (errno == EINTR) + continue; + break; + } + thread = NULL; + fprintf(stderr, "master_accept: accept fd %d\n", fd); - pthread_create(&thread, NULL, - master_service, (void *)(intptr_t)fd); ++ info = malloc(sizeof(*info)); ++ bzero(info, sizeof(*info)); ++ info->fd = fd; ++ info->detachme = 1; ++ pthread_create(&thread, NULL, master_service, info); + } + return (NULL); +} + +/* ++ * Normally the mount program supplies a cluster communications ++ * descriptor to the hammer2 vfs on mount, but if you kill the service ++ * daemon and restart it that link will be lost. ++ * ++ * This procedure attempts to [re]connect to existing mounts when ++ * the service daemon is started up before going into its accept ++ * loop. ++ */ ++static ++void ++master_reconnect(const char *mntpt) ++{ ++ struct hammer2_ioc_recluster recls; ++ hammer2_master_service_info_t *info; ++ pthread_t thread; ++ int fd; ++ int pipefds[2]; ++ ++ fd = open(mntpt, O_RDONLY); ++ if (fd < 0) { ++ fprintf(stderr, "reconnect %s: no access to mount\n", mntpt); ++ return; ++ } ++ if (pipe(pipefds) < 0) { ++ fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt); ++ return; ++ } ++ bzero(&recls, sizeof(recls)); ++ recls.fd = pipefds[0]; ++ if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) { ++ fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt); ++ close(pipefds[0]); ++ close(pipefds[1]); ++ close(fd); ++ return; ++ } ++ close(pipefds[0]); ++ ++ info = malloc(sizeof(*info)); ++ bzero(info, sizeof(*info)); ++ info->fd = pipefds[1]; ++ info->detachme = 1; ++ pthread_create(&thread, NULL, master_service, info); ++} ++ ++/* + * Service an accepted connection (runs as a pthread) + * + * (also called from a couple of other places) + */ +void * +master_service(void *data) +{ ++ hammer2_master_service_info_t *info = data; + hammer2_iocom_t iocom; - int fd; + - fd = (int)(intptr_t)data; - hammer2_iocom_init(&iocom, fd, -1, ++ if (info->detachme) ++ pthread_detach(pthread_self()); ++ ++ hammer2_iocom_init(&iocom, info->fd, -1, + master_auth_signal, + master_auth_rxmsg, + NULL); + hammer2_iocom_core(&iocom); + + fprintf(stderr, + "iocom on fd %d terminated error rx=%d, tx=%d\n", - fd, iocom.ioq_rx.error, iocom.ioq_tx.error); - close(fd); ++ info->fd, iocom.ioq_rx.error, iocom.ioq_tx.error); ++ close(info->fd); ++ info->fd = -1; /* safety */ ++ free(info); + + return (NULL); +} + +/************************************************************************ + * AUTHENTICATION * + ************************************************************************ + * + * Callback via hammer2_iocom_core(). + * + * Additional messaging-based authentication must occur before normal + * message operation. The connection has already been encrypted at + * this point. + */ +static void master_auth_conn_rx(hammer2_msg_t *msg); + +static +void +master_auth_signal(hammer2_router_t *router) +{ + hammer2_msg_t *msg; + + /* + * Transmit LNK_CONN, enabling the SPAN protocol if both sides + * agree. + * + * XXX put additional authentication states here + */ + msg = hammer2_msg_alloc(router, 0, HAMMER2_LNK_CONN | + HAMMER2_MSGF_CREATE, + master_auth_conn_rx, NULL); + snprintf(msg->any.lnk_conn.label, sizeof(msg->any.lnk_conn.label), "*"); + hammer2_msg_write(msg); + + hammer2_router_restate(router, + master_link_signal, + master_link_rxmsg, + NULL); +} + +static +void +master_auth_conn_rx(hammer2_msg_t *msg) +{ + if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) + hammer2_msg_reply(msg, 0); +} + +static +void +master_auth_rxmsg(hammer2_msg_t *msg __unused) +{ +} + +/************************************************************************ + * POST-AUTHENTICATION SERVICE MSGS * + ************************************************************************ + * + * Callback via hammer2_iocom_core(). + */ +static +void +master_link_signal(hammer2_router_t *router) +{ + hammer2_msg_lnk_signal(router); +} + +static +void +master_link_rxmsg(hammer2_msg_t *msg) +{ + hammer2_state_t *state; + uint32_t cmd; + + /* + * If the message state has a function established we just + * call the function, otherwise we call the appropriate + * link-level protocol related to the original command and + * let it sort it out. + * + * Non-transactional one-off messages, on the otherhand, + * might have REPLY set. + */ + state = msg->state; + cmd = state ? state->msg->any.head.cmd : msg->any.head.cmd; + + fprintf(stderr, "service-receive: %s\n", hammer2_msg_str(msg)); + + if (state && state->func) { + assert(state->func != NULL); + state->func(msg); + } else { + switch(cmd & HAMMER2_MSGF_PROTOS) { + case HAMMER2_MSG_PROTO_LNK: + hammer2_msg_lnk(msg); + break; + case HAMMER2_MSG_PROTO_DBG: + hammer2_msg_dbg(msg); + break; + default: + hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP); + break; + } + } +} diff --cc sbin/hammer2/hammer2.h index 5c91865,0000000..ddcbc08 mode 100644,000000..100644 --- a/sbin/hammer2/hammer2.h +++ b/sbin/hammer2/hammer2.h @@@ -1,196 -1,0 +1,203 @@@ +/* + * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved. + * + * This code is derived from software contributed to The DragonFly Project + * by Matthew Dillon + * by Venkatesh Srinivas + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of The DragonFly Project nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific, prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +/* + * Rollup headers for hammer2 utility + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "network.h" + +#define HAMMER2_DEFAULT_DIR "/etc/hammer2" +#define HAMMER2_PATH_REMOTE HAMMER2_DEFAULT_DIR "/remote" + +struct hammer2_idmap { + struct hammer2_idmap *next; + uint32_t ran_beg; /* inclusive */ + uint32_t ran_end; /* inclusive */ +}; + +typedef struct hammer2_idmap hammer2_idmap_t; + ++struct hammer2_master_service_info { ++ int fd; ++ int detachme; ++}; ++ ++typedef struct hammer2_master_service_info hammer2_master_service_info_t; ++ +extern int DebugOpt; +extern int VerboseOpt; +extern int QuietOpt; +extern int NormalExit; + +/* + * Hammer2 command APIs + */ +int hammer2_ioctl_handle(const char *sel_path); +void hammer2_demon(void *(*func)(void *), void *arg); + +int cmd_remote_connect(const char *sel_path, const char *url); +int cmd_remote_disconnect(const char *sel_path, const char *url); +int cmd_remote_status(const char *sel_path, int all_opt); + +int cmd_pfs_getid(const char *sel_path, const char *name, int privateid); +int cmd_pfs_list(const char *sel_path); +int cmd_pfs_create(const char *sel_path, const char *name, + uint8_t pfs_type, const char *uuid_str); +int cmd_pfs_delete(const char *sel_path, const char *name); + +int cmd_service(void); +int cmd_stat(int ac, const char **av); +int cmd_leaf(const char *sel_path); +int cmd_shell(const char *hostname); +int cmd_debugspan(const char *hostname); +int cmd_show(const char *devpath); +int cmd_rsainit(const char *dir_path); +int cmd_rsaenc(const char **keys, int nkeys); +int cmd_rsadec(const char **keys, int nkeys); + +/* + * Msg support functions + */ +void hammer2_bswap_head(hammer2_msg_hdr_t *head); +void hammer2_ioq_init(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq); +void hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq); +void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd, + void (*state_func)(hammer2_router_t *), + void (*rcvmsg_func)(hammer2_msg_t *), + void (*altmsg_func)(hammer2_iocom_t *)); +void hammer2_router_restate(hammer2_router_t *router, + void (*state_func)(hammer2_router_t *), + void (*rcvmsg_func)(hammer2_msg_t *), + void (*altmsg_func)(hammer2_iocom_t *)); +void hammer2_router_signal(hammer2_router_t *router); +void hammer2_iocom_done(hammer2_iocom_t *iocom); +hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router, + size_t aux_size, uint32_t cmd, + void (*func)(hammer2_msg_t *), void *data); +void hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error); +void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error); +void hammer2_state_reply(hammer2_state_t *state, uint32_t error); + +void hammer2_msg_free(hammer2_msg_t *msg); + +void hammer2_iocom_core(hammer2_iocom_t *iocom); +hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom); +void hammer2_msg_write(hammer2_msg_t *msg); + +void hammer2_iocom_drain(hammer2_iocom_t *iocom); +void hammer2_iocom_flush1(hammer2_iocom_t *iocom); +void hammer2_iocom_flush2(hammer2_iocom_t *iocom); + +void hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg); +void hammer2_state_free(hammer2_state_t *state); + +hammer2_router_t *hammer2_router_alloc(void); +void hammer2_router_connect(hammer2_router_t *router); +void hammer2_router_disconnect(hammer2_router_t **routerp); + +/* + * Msg protocol functions + */ +void hammer2_msg_lnk_signal(hammer2_router_t *router); +void hammer2_msg_lnk(hammer2_msg_t *msg); +void hammer2_msg_dbg(hammer2_msg_t *msg); + +/* + * Crypto functions + */ +void hammer2_crypto_setup(void); +void hammer2_crypto_negotiate(hammer2_iocom_t *iocom); +void hammer2_crypto_decrypt(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq); +int hammer2_crypto_encrypt(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq, + struct iovec *iov, int n, size_t *nactp); + +/* + * Misc functions + */ +const char *hammer2_time64_to_str(uint64_t htime64, char **strp); +const char *hammer2_uuid_to_str(uuid_t *uuid, char **strp); +const char *hammer2_iptype_to_str(uint8_t type); +const char *hammer2_pfstype_to_str(uint8_t type); +const char *sizetostr(hammer2_off_t size); +const char * hammer2_basecmd_str(uint32_t cmd); +const char *hammer2_msg_str(hammer2_msg_t *msg); +int hammer2_connect(const char *hostname); + +void shell_tree(hammer2_router_t *router, char *cmdbuf); + +void *master_service(void *data); + +void hammer2_msg_debug(hammer2_iocom_t *iocom, hammer2_msg_t *msg); +void router_printf(hammer2_router_t *router, const char *ctl, ...); +void *hammer2_alloc(size_t bytes); +void hammer2_free(void *ptr); diff --cc sbin/hammer2/msg.c index ff38357,0000000..f4cb65c mode 100644,000000..100644 --- a/sbin/hammer2/msg.c +++ b/sbin/hammer2/msg.c @@@ -1,2232 -1,0 +1,2239 @@@ +/* + * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved. + * + * This code is derived from software contributed to The DragonFly Project + * by Matthew Dillon + * by Venkatesh Srinivas + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of The DragonFly Project nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific, prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include "hammer2.h" + +static int hammer2_state_msgrx(hammer2_msg_t *msg); +static void hammer2_state_cleanuptx(hammer2_msg_t *msg); + +/* + * ROUTER TREE - Represents available routes for message routing, indexed + * by their spanid. The router structure is embedded in + * either an iocom, h2span_link, or h2span_relay (see msg_lnk.c). + */ +int +hammer2_router_cmp(hammer2_router_t *router1, hammer2_router_t *router2) +{ + if (router1->target < router2->target) + return(-1); + if (router1->target > router2->target) + return(1); + return(0); +} + +RB_GENERATE(hammer2_router_tree, hammer2_router, rbnode, hammer2_router_cmp); + +static pthread_mutex_t router_mtx; +static struct hammer2_router_tree router_ltree = RB_INITIALIZER(router_ltree); +static struct hammer2_router_tree router_rtree = RB_INITIALIZER(router_rtree); + +/* + * STATE TREE - Represents open transactions which are indexed by their + * {router,msgid} relative to the governing iocom. + * + * router is usually iocom->router since state isn't stored + * for relayed messages. + */ +int +hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2) +{ +#if 0 + if (state1->router < state2->router) + return(-1); + if (state1->router > state2->router) + return(1); +#endif + if (state1->msgid < state2->msgid) + return(-1); + if (state1->msgid > state2->msgid) + return(1); + return(0); +} + +RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp); + +/* + * Initialize a low-level ioq + */ +void +hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq) +{ + bzero(ioq, sizeof(*ioq)); + ioq->state = HAMMER2_MSGQ_STATE_HEADER1; + TAILQ_INIT(&ioq->msgq); +} + +/* + * Cleanup queue. + * + * caller holds iocom->mtx. + */ +void +hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq) +{ + hammer2_msg_t *msg; + + while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) { + assert(0); /* shouldn't happen */ + TAILQ_REMOVE(&ioq->msgq, msg, qentry); + hammer2_msg_free(msg); + } + if ((msg = ioq->msg) != NULL) { + ioq->msg = NULL; + hammer2_msg_free(msg); + } +} + +/* + * Initialize a low-level communications channel. + * + * NOTE: The signal_func() is called at least once from the loop and can be + * re-armed via hammer2_iocom_restate(). + */ +void +hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd, + void (*signal_func)(hammer2_router_t *), + void (*rcvmsg_func)(hammer2_msg_t *), + void (*altmsg_func)(hammer2_iocom_t *)) +{ ++ struct stat st; ++ + bzero(iocom, sizeof(*iocom)); + + iocom->router = hammer2_router_alloc(); + iocom->router->signal_callback = signal_func; + iocom->router->rcvmsg_callback = rcvmsg_func; + iocom->router->altmsg_callback = altmsg_func; + /* we do not call hammer2_router_connect() for iocom routers */ + + pthread_mutex_init(&iocom->mtx, NULL); + RB_INIT(&iocom->router->staterd_tree); + RB_INIT(&iocom->router->statewr_tree); + TAILQ_INIT(&iocom->freeq); + TAILQ_INIT(&iocom->freeq_aux); + TAILQ_INIT(&iocom->router->txmsgq); + iocom->router->iocom = iocom; + iocom->sock_fd = sock_fd; + iocom->alt_fd = alt_fd; + iocom->flags = HAMMER2_IOCOMF_RREQ; + if (signal_func) + iocom->flags |= HAMMER2_IOCOMF_SWORK; + hammer2_ioq_init(iocom, &iocom->ioq_rx); + hammer2_ioq_init(iocom, &iocom->ioq_tx); + if (pipe(iocom->wakeupfds) < 0) + assert(0); + fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK); + fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK); + + /* + * Negotiate session crypto synchronously. This will mark the - * connection as error'd if it fails. ++ * connection as error'd if it fails. If this is a pipe it's ++ * a linkage that we set up ourselves to the filesystem and there ++ * is no crypto. + */ - hammer2_crypto_negotiate(iocom); ++ if (fstat(sock_fd, &st) < 0) ++ assert(0); ++ if (S_ISSOCK(st.st_mode)) ++ hammer2_crypto_negotiate(iocom); + + /* + * Make sure our fds are set to non-blocking for the iocom core. + */ + if (sock_fd >= 0) + fcntl(sock_fd, F_SETFL, O_NONBLOCK); +#if 0 + /* if line buffered our single fgets() should be fine */ + if (alt_fd >= 0) + fcntl(alt_fd, F_SETFL, O_NONBLOCK); +#endif +} + +/* + * May only be called from a callback from iocom_core. + * + * Adjust state machine functions, set flags to guarantee that both + * the recevmsg_func and the sendmsg_func is called at least once. + */ +void +hammer2_router_restate(hammer2_router_t *router, + void (*signal_func)(hammer2_router_t *), + void (*rcvmsg_func)(hammer2_msg_t *msg), + void (*altmsg_func)(hammer2_iocom_t *)) +{ + router->signal_callback = signal_func; + router->rcvmsg_callback = rcvmsg_func; + router->altmsg_callback = altmsg_func; + if (signal_func) + router->iocom->flags |= HAMMER2_IOCOMF_SWORK; + else + router->iocom->flags &= ~HAMMER2_IOCOMF_SWORK; +} + +void +hammer2_router_signal(hammer2_router_t *router) +{ + if (router->signal_callback) + router->iocom->flags |= HAMMER2_IOCOMF_SWORK; +} + +/* + * Cleanup a terminating iocom. + * + * Caller should not hold iocom->mtx. The iocom has already been disconnected + * from all possible references to it. + */ +void +hammer2_iocom_done(hammer2_iocom_t *iocom) +{ + hammer2_msg_t *msg; + + if (iocom->sock_fd >= 0) { + close(iocom->sock_fd); + iocom->sock_fd = -1; + } + if (iocom->alt_fd >= 0) { + close(iocom->alt_fd); + iocom->alt_fd = -1; + } + hammer2_ioq_done(iocom, &iocom->ioq_rx); + hammer2_ioq_done(iocom, &iocom->ioq_tx); + if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) { + TAILQ_REMOVE(&iocom->freeq, msg, qentry); + free(msg); + } + if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) { + TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry); + free(msg->aux_data); + msg->aux_data = NULL; + free(msg); + } + if (iocom->wakeupfds[0] >= 0) { + close(iocom->wakeupfds[0]); + iocom->wakeupfds[0] = -1; + } + if (iocom->wakeupfds[1] >= 0) { + close(iocom->wakeupfds[1]); + iocom->wakeupfds[1] = -1; + } + pthread_mutex_destroy(&iocom->mtx); +} + +/* + * Allocate a new one-way message. + */ +hammer2_msg_t * +hammer2_msg_alloc(hammer2_router_t *router, size_t aux_size, uint32_t cmd, + void (*func)(hammer2_msg_t *), void *data) +{ + hammer2_state_t *state = NULL; + hammer2_iocom_t *iocom = router->iocom; + hammer2_msg_t *msg; + int hbytes; + + pthread_mutex_lock(&iocom->mtx); + if (aux_size) { + aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) & + ~HAMMER2_MSG_ALIGNMASK; + if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) + TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry); + } else { + if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) + TAILQ_REMOVE(&iocom->freeq, msg, qentry); + } + if ((cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_REPLY)) == + HAMMER2_MSGF_CREATE) { + /* + * Create state when CREATE is set without REPLY. + * + * NOTE: CREATE in txcmd handled by hammer2_msg_write() + * NOTE: DELETE in txcmd handled by hammer2_state_cleanuptx() + */ + state = malloc(sizeof(*state)); + bzero(state, sizeof(*state)); + state->iocom = iocom; + state->flags = HAMMER2_STATE_DYNAMIC; + state->msgid = (uint64_t)(uintptr_t)state; + state->router = router; + state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE | + HAMMER2_MSGF_DELETE); + state->rxcmd = HAMMER2_MSGF_REPLY; + state->func = func; + state->any.any = data; + pthread_mutex_lock(&iocom->mtx); + RB_INSERT(hammer2_state_tree, + &iocom->router->statewr_tree, + state); + pthread_mutex_unlock(&iocom->mtx); + state->flags |= HAMMER2_STATE_INSERTED; + } + pthread_mutex_unlock(&iocom->mtx); + if (msg == NULL) { + msg = malloc(sizeof(*msg)); + bzero(msg, sizeof(*msg)); + msg->aux_data = NULL; + msg->aux_size = 0; + } + if (msg->aux_size != aux_size) { + if (msg->aux_data) { + free(msg->aux_data); + msg->aux_data = NULL; + msg->aux_size = 0; + } + if (aux_size) { + msg->aux_data = malloc(aux_size); + msg->aux_size = aux_size; + } + } + hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN; + if (hbytes) + bzero(&msg->any.head, hbytes); + msg->hdr_size = hbytes; + msg->any.head.cmd = cmd; + msg->any.head.aux_descr = 0; + msg->any.head.aux_crc = 0; + msg->router = router; + if (state) { + msg->state = state; + state->msg = msg; + msg->any.head.msgid = state->msgid; + } + return (msg); +} + +/* + * Free a message so it can be reused afresh. + * + * NOTE: aux_size can be 0 with a non-NULL aux_data. + */ +static +void +hammer2_msg_free_locked(hammer2_msg_t *msg) +{ + hammer2_iocom_t *iocom = msg->router->iocom; + + msg->state = NULL; + if (msg->aux_data) + TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry); + else + TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry); +} + +void +hammer2_msg_free(hammer2_msg_t *msg) +{ + hammer2_iocom_t *iocom = msg->router->iocom; + + pthread_mutex_lock(&iocom->mtx); + hammer2_msg_free_locked(msg); + pthread_mutex_unlock(&iocom->mtx); +} + +/* + * I/O core loop for an iocom. + * + * Thread localized, iocom->mtx not held. + */ +void +hammer2_iocom_core(hammer2_iocom_t *iocom) +{ + struct pollfd fds[3]; + char dummybuf[256]; + hammer2_msg_t *msg; + int timeout; + int count; + int wi; /* wakeup pipe */ + int si; /* socket */ + int ai; /* alt bulk path socket */ + + while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) { + if ((iocom->flags & (HAMMER2_IOCOMF_RWORK | + HAMMER2_IOCOMF_WWORK | + HAMMER2_IOCOMF_PWORK | + HAMMER2_IOCOMF_SWORK | + HAMMER2_IOCOMF_ARWORK | + HAMMER2_IOCOMF_AWWORK)) == 0) { + /* + * Only poll if no immediate work is pending. + * Otherwise we are just wasting our time calling + * poll. + */ + timeout = 5000; + + count = 0; + wi = -1; + si = -1; + ai = -1; + + /* + * Always check the inter-thread pipe, e.g. + * for iocom->txmsgq work. + */ + wi = count++; + fds[wi].fd = iocom->wakeupfds[0]; + fds[wi].events = POLLIN; + fds[wi].revents = 0; + + /* + * Check the socket input/output direction as + * requested + */ + if (iocom->flags & (HAMMER2_IOCOMF_RREQ | + HAMMER2_IOCOMF_WREQ)) { + si = count++; + fds[si].fd = iocom->sock_fd; + fds[si].events = 0; + fds[si].revents = 0; + + if (iocom->flags & HAMMER2_IOCOMF_RREQ) + fds[si].events |= POLLIN; + if (iocom->flags & HAMMER2_IOCOMF_WREQ) + fds[si].events |= POLLOUT; + } + + /* + * Check the alternative fd for work. + */ + if (iocom->alt_fd >= 0) { + ai = count++; + fds[ai].fd = iocom->alt_fd; + fds[ai].events = POLLIN; + fds[ai].revents = 0; + } + poll(fds, count, timeout); + + if (wi >= 0 && (fds[wi].revents & POLLIN)) + iocom->flags |= HAMMER2_IOCOMF_PWORK; + if (si >= 0 && (fds[si].revents & POLLIN)) + iocom->flags |= HAMMER2_IOCOMF_RWORK; + if (si >= 0 && (fds[si].revents & POLLOUT)) + iocom->flags |= HAMMER2_IOCOMF_WWORK; + if (wi >= 0 && (fds[wi].revents & POLLOUT)) + iocom->flags |= HAMMER2_IOCOMF_WWORK; + if (ai >= 0 && (fds[ai].revents & POLLIN)) + iocom->flags |= HAMMER2_IOCOMF_ARWORK; + } else { + /* + * Always check the pipe + */ + iocom->flags |= HAMMER2_IOCOMF_PWORK; + } + + if (iocom->flags & HAMMER2_IOCOMF_SWORK) { + iocom->flags &= ~HAMMER2_IOCOMF_SWORK; + iocom->router->signal_callback(iocom->router); + } + + /* + * Pending message queues from other threads wake us up + * with a write to the wakeupfds[] pipe. We have to clear + * the pipe with a dummy read. + */ + if (iocom->flags & HAMMER2_IOCOMF_PWORK) { + iocom->flags &= ~HAMMER2_IOCOMF_PWORK; + read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf)); + iocom->flags |= HAMMER2_IOCOMF_RWORK; + iocom->flags |= HAMMER2_IOCOMF_WWORK; + if (TAILQ_FIRST(&iocom->router->txmsgq)) + hammer2_iocom_flush1(iocom); + } + + /* + * Message write sequencing + */ + if (iocom->flags & HAMMER2_IOCOMF_WWORK) + hammer2_iocom_flush1(iocom); + + /* + * Message read sequencing. Run this after the write + * sequencing in case the write sequencing allowed another + * auto-DELETE to occur on the read side. + */ + if (iocom->flags & HAMMER2_IOCOMF_RWORK) { + while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 && + (msg = hammer2_ioq_read(iocom)) != NULL) { + if (DebugOpt) { + fprintf(stderr, "receive %s\n", + hammer2_msg_str(msg)); + } + iocom->router->rcvmsg_callback(msg); + hammer2_state_cleanuprx(iocom, msg); + } + } + + if (iocom->flags & HAMMER2_IOCOMF_ARWORK) { + iocom->flags &= ~HAMMER2_IOCOMF_ARWORK; + iocom->router->altmsg_callback(iocom); + } + } +} + +/* + * Make sure there's enough room in the FIFO to hold the + * needed data. + * + * Assume worst case encrypted form is 2x the size of the + * plaintext equivalent. + */ +static +size_t +hammer2_ioq_makeroom(hammer2_ioq_t *ioq, size_t needed) +{ + size_t bytes; + size_t nmax; + + bytes = ioq->fifo_cdx - ioq->fifo_beg; + nmax = sizeof(ioq->buf) - ioq->fifo_end; + if (bytes + nmax / 2 < needed) { + if (bytes) { + bcopy(ioq->buf + ioq->fifo_beg, + ioq->buf, + bytes); + } + ioq->fifo_cdx -= ioq->fifo_beg; + ioq->fifo_beg = 0; + if (ioq->fifo_cdn < ioq->fifo_end) { + bcopy(ioq->buf + ioq->fifo_cdn, + ioq->buf + ioq->fifo_cdx, + ioq->fifo_end - ioq->fifo_cdn); + } + ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx; + ioq->fifo_cdn = ioq->fifo_cdx; + nmax = sizeof(ioq->buf) - ioq->fifo_end; + } + return(nmax); +} + +/* + * Read the next ready message from the ioq, issuing I/O if needed. + * Caller should retry on a read-event when NULL is returned. + * + * If an error occurs during reception a HAMMER2_LNK_ERROR msg will + * be returned for each open transaction, then the ioq and iocom + * will be errored out and a non-transactional HAMMER2_LNK_ERROR + * msg will be returned as the final message. The caller should not call + * us again after the final message is returned. + * + * Thread localized, iocom->mtx not held. + */ +hammer2_msg_t * +hammer2_ioq_read(hammer2_iocom_t *iocom) +{ + hammer2_ioq_t *ioq = &iocom->ioq_rx; + hammer2_msg_t *msg; + hammer2_msg_hdr_t *head; + hammer2_state_t *state; + ssize_t n; + size_t bytes; + size_t nmax; + uint32_t xcrc32; + int error; + +again: + iocom->flags &= ~(HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_RWORK); + + /* + * If a message is already pending we can just remove and + * return it. Message state has already been processed. + * (currently not implemented) + */ + if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) { + TAILQ_REMOVE(&ioq->msgq, msg, qentry); + return (msg); + } + + /* + * If the stream is errored out we stop processing it. + */ + if (ioq->error) + goto skip; + + /* + * Message read in-progress (msg is NULL at the moment). We don't + * allocate a msg until we have its core header. + */ + nmax = sizeof(ioq->buf) - ioq->fifo_end; + bytes = ioq->fifo_cdx - ioq->fifo_beg; /* already decrypted */ + msg = ioq->msg; + + switch(ioq->state) { + case HAMMER2_MSGQ_STATE_HEADER1: + /* + * Load the primary header, fail on any non-trivial read + * error or on EOF. Since the primary header is the same + * size is the message alignment it will never straddle + * the end of the buffer. + */ + nmax = hammer2_ioq_makeroom(ioq, sizeof(msg->any.head)); + if (bytes < sizeof(msg->any.head)) { + n = read(iocom->sock_fd, + ioq->buf + ioq->fifo_end, + nmax); + if (n <= 0) { + if (n == 0) { + ioq->error = HAMMER2_IOQ_ERROR_EOF; + break; + } + if (errno != EINTR && + errno != EINPROGRESS && + errno != EAGAIN) { + ioq->error = HAMMER2_IOQ_ERROR_SOCK; + break; + } + n = 0; + /* fall through */ + } + ioq->fifo_end += (size_t)n; + nmax -= (size_t)n; + } + + /* + * Decrypt data received so far. Data will be decrypted + * in-place but might create gaps in the FIFO. Partial + * blocks are not immediately decrypted. + * + * WARNING! The header might be in the wrong endian, we + * do not fix it up until we get the entire + * extended header. + */ + if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) { + hammer2_crypto_decrypt(iocom, ioq); + } else { + ioq->fifo_cdx = ioq->fifo_end; + ioq->fifo_cdn = ioq->fifo_end; + } + bytes = ioq->fifo_cdx - ioq->fifo_beg; + + /* + * Insufficient data accumulated (msg is NULL, caller will + * retry on event). + */ + assert(msg == NULL); + if (bytes < sizeof(msg->any.head)) + break; + + /* + * Check and fixup the core header. Note that the icrc + * has to be calculated before any fixups, but the crc + * fields in the msg may have to be swapped like everything + * else. + */ + head = (void *)(ioq->buf + ioq->fifo_beg); + if (head->magic != HAMMER2_MSGHDR_MAGIC && + head->magic != HAMMER2_MSGHDR_MAGIC_REV) { + ioq->error = HAMMER2_IOQ_ERROR_SYNC; + break; + } + + /* + * Calculate the full header size and aux data size + */ + if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) { + ioq->hbytes = (bswap32(head->cmd) & HAMMER2_MSGF_SIZE) * + HAMMER2_MSG_ALIGN; + ioq->abytes = bswap32(head->aux_bytes) * + HAMMER2_MSG_ALIGN; + } else { + ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) * + HAMMER2_MSG_ALIGN; + ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN; + } + if (ioq->hbytes < sizeof(msg->any.head) || + ioq->hbytes > sizeof(msg->any) || + ioq->abytes > HAMMER2_MSGAUX_MAX) { + ioq->error = HAMMER2_IOQ_ERROR_FIELD; + break; + } + + /* + * Allocate the message, the next state will fill it in. + */ + msg = hammer2_msg_alloc(iocom->router, ioq->abytes, 0, + NULL, NULL); + ioq->msg = msg; + + /* + * Fall through to the next state. Make sure that the + * extended header does not straddle the end of the buffer. + * We still want to issue larger reads into our buffer, + * book-keeping is easier if we don't bcopy() yet. + * + * Make sure there is enough room for bloated encrypt data. + */ + nmax = hammer2_ioq_makeroom(ioq, ioq->hbytes); + ioq->state = HAMMER2_MSGQ_STATE_HEADER2; + /* fall through */ + case HAMMER2_MSGQ_STATE_HEADER2: + /* + * Fill out the extended header. + */ + assert(msg != NULL); + if (bytes < ioq->hbytes) { + n = read(iocom->sock_fd, + ioq->buf + ioq->fifo_end, + nmax); + if (n <= 0) { + if (n == 0) { + ioq->error = HAMMER2_IOQ_ERROR_EOF; + break; + } + if (errno != EINTR && + errno != EINPROGRESS && + errno != EAGAIN) { + ioq->error = HAMMER2_IOQ_ERROR_SOCK; + break; + } + n = 0; + /* fall through */ + } + ioq->fifo_end += (size_t)n; + nmax -= (size_t)n; + } + + if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) { + hammer2_crypto_decrypt(iocom, ioq); + } else { + ioq->fifo_cdx = ioq->fifo_end; + ioq->fifo_cdn = ioq->fifo_end; + } + bytes = ioq->fifo_cdx - ioq->fifo_beg; + + /* + * Insufficient data accumulated (set msg NULL so caller will + * retry on event). + */ + if (bytes < ioq->hbytes) { + msg = NULL; + break; + } + + /* + * Calculate the extended header, decrypt data received + * so far. Handle endian-conversion for the entire extended + * header. + */ + head = (void *)(ioq->buf + ioq->fifo_beg); + + /* + * Check the CRC. + */ + if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) + xcrc32 = bswap32(head->hdr_crc); + else + xcrc32 = head->hdr_crc; + head->hdr_crc = 0; + if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) { + ioq->error = HAMMER2_IOQ_ERROR_XCRC; + fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n", + xcrc32, hammer2_icrc32(head, ioq->hbytes), + hammer2_msg_str(msg)); + assert(0); + break; + } + head->hdr_crc = xcrc32; + + if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) { + hammer2_bswap_head(head); + } + + /* + * Copy the extended header into the msg and adjust the + * FIFO. + */ + bcopy(head, &msg->any, ioq->hbytes); + + /* + * We are either done or we fall-through. + */ + if (ioq->abytes == 0) { + ioq->fifo_beg += ioq->hbytes; + break; + } + + /* + * Must adjust bytes (and the state) when falling through. + * nmax doesn't change. + */ + ioq->fifo_beg += ioq->hbytes; + bytes -= ioq->hbytes; + ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1; + /* fall through */ + case HAMMER2_MSGQ_STATE_AUXDATA1: + /* + * Copy the partial or complete payload from remaining + * bytes in the FIFO in order to optimize the makeroom call + * in the AUXDATA2 state. We have to fall-through either + * way so we can check the crc. + * + * msg->aux_size tracks our aux data. + */ + if (bytes >= ioq->abytes) { + bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data, + ioq->abytes); + msg->aux_size = ioq->abytes; + ioq->fifo_beg += ioq->abytes; + assert(ioq->fifo_beg <= ioq->fifo_cdx); + assert(ioq->fifo_cdx <= ioq->fifo_cdn); + bytes -= ioq->abytes; + } else if (bytes) { + bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data, + bytes); + msg->aux_size = bytes; + ioq->fifo_beg += bytes; + if (ioq->fifo_cdx < ioq->fifo_beg) + ioq->fifo_cdx = ioq->fifo_beg; + assert(ioq->fifo_beg <= ioq->fifo_cdx); + assert(ioq->fifo_cdx <= ioq->fifo_cdn); + bytes = 0; + } else { + msg->aux_size = 0; + } + ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2; + /* fall through */ + case HAMMER2_MSGQ_STATE_AUXDATA2: + /* + * Make sure there is enough room for more data. + */ + assert(msg); + nmax = hammer2_ioq_makeroom(ioq, ioq->abytes - msg->aux_size); + + /* + * Read and decrypt more of the payload. + */ + if (msg->aux_size < ioq->abytes) { + assert(bytes == 0); + n = read(iocom->sock_fd, + ioq->buf + ioq->fifo_end, + nmax); + if (n <= 0) { + if (n == 0) { + ioq->error = HAMMER2_IOQ_ERROR_EOF; + break; + } + if (errno != EINTR && + errno != EINPROGRESS && + errno != EAGAIN) { + ioq->error = HAMMER2_IOQ_ERROR_SOCK; + break; + } + n = 0; + /* fall through */ + } + ioq->fifo_end += (size_t)n; + nmax -= (size_t)n; + } + + if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) { + hammer2_crypto_decrypt(iocom, ioq); + } else { + ioq->fifo_cdx = ioq->fifo_end; + ioq->fifo_cdn = ioq->fifo_end; + } + bytes = ioq->fifo_cdx - ioq->fifo_beg; + + if (bytes > ioq->abytes - msg->aux_size) + bytes = ioq->abytes - msg->aux_size; + + if (bytes) { + bcopy(ioq->buf + ioq->fifo_beg, + msg->aux_data + msg->aux_size, + bytes); + msg->aux_size += bytes; + ioq->fifo_beg += bytes; + } + + /* + * Insufficient data accumulated (set msg NULL so caller will + * retry on event). + */ + if (msg->aux_size < ioq->abytes) { + msg = NULL; + break; + } + assert(msg->aux_size == ioq->abytes); + + /* + * Check aux_crc, then we are done. + */ + xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size); + if (xcrc32 != msg->any.head.aux_crc) { + ioq->error = HAMMER2_IOQ_ERROR_ACRC; + break; + } + break; + case HAMMER2_MSGQ_STATE_ERROR: + /* + * Continued calls to drain recorded transactions (returning + * a LNK_ERROR for each one), before we return the final + * LNK_ERROR. + */ + assert(msg == NULL); + break; + default: + /* + * We don't double-return errors, the caller should not + * have called us again after getting an error msg. + */ + assert(0); + break; + } + + /* + * Check the message sequence. The iv[] should prevent any + * possibility of a replay but we add this check anyway. + */ + if (msg && ioq->error == 0) { + if ((msg->any.head.salt & 255) != (ioq->seq & 255)) { + ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ; + } else { + ++ioq->seq; + } + } + + /* + * Process transactional state for the message. + */ + if (msg && ioq->error == 0) { + error = hammer2_state_msgrx(msg); + if (error) { + if (error == HAMMER2_IOQ_ERROR_EALREADY) { + hammer2_msg_free(msg); + goto again; + } + ioq->error = error; + } + } + + /* + * Handle error, RREQ, or completion + * + * NOTE: nmax and bytes are invalid at this point, we don't bother + * to update them when breaking out. + */ + if (ioq->error) { +skip: + /* + * An unrecoverable error causes all active receive + * transactions to be terminated with a LNK_ERROR message. + * + * Once all active transactions are exhausted we set the + * iocom ERROR flag and return a non-transactional LNK_ERROR + * message, which should cause master processing loops to + * terminate. + */ + assert(ioq->msg == msg); + if (msg) { + hammer2_msg_free(msg); + ioq->msg = NULL; + } + + /* + * No more I/O read processing + */ + ioq->state = HAMMER2_MSGQ_STATE_ERROR; + + /* + * Simulate a remote LNK_ERROR DELETE msg for any open + * transactions, ending with a final non-transactional + * LNK_ERROR (that the session can detect) when no + * transactions remain. + */ + msg = hammer2_msg_alloc(iocom->router, 0, 0, NULL, NULL); + bzero(&msg->any.head, sizeof(msg->any.head)); + msg->any.head.magic = HAMMER2_MSGHDR_MAGIC; + msg->any.head.cmd = HAMMER2_LNK_ERROR; + msg->any.head.error = ioq->error; + + pthread_mutex_lock(&iocom->mtx); + hammer2_iocom_drain(iocom); + if ((state = RB_ROOT(&iocom->router->staterd_tree)) != NULL) { + /* + * Active remote transactions are still present. + * Simulate the other end sending us a DELETE. + */ + if (state->rxcmd & HAMMER2_MSGF_DELETE) { + hammer2_msg_free(msg); + msg = NULL; + } else { + /*state->txcmd |= HAMMER2_MSGF_DELETE;*/ + msg->state = state; + msg->router = state->router; + msg->any.head.msgid = state->msgid; + msg->any.head.cmd |= HAMMER2_MSGF_ABORT | + HAMMER2_MSGF_DELETE; + } + } else if ((state = RB_ROOT(&iocom->router->statewr_tree)) != + NULL) { + /* + * Active local transactions are still present. + * Simulate the other end sending us a DELETE. + */ + if (state->rxcmd & HAMMER2_MSGF_DELETE) { + hammer2_msg_free(msg); + msg = NULL; + } else { + msg->state = state; + msg->router = state->router; + msg->any.head.msgid = state->msgid; + msg->any.head.cmd |= HAMMER2_MSGF_ABORT | + HAMMER2_MSGF_DELETE | + HAMMER2_MSGF_REPLY; + if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + msg->any.head.cmd |= + HAMMER2_MSGF_CREATE; + } + } + } else { + /* + * No active local or remote transactions remain. + * Generate a final LNK_ERROR and flag EOF. + */ + msg->state = NULL; + iocom->flags |= HAMMER2_IOCOMF_EOF; + fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd); + } + pthread_mutex_unlock(&iocom->mtx); + + /* + * For the iocom error case we want to set RWORK to indicate + * that more messages might be pending. + * + * It is possible to return NULL when there is more work to + * do because each message has to be DELETEd in both + * directions before we continue on with the next (though + * this could be optimized). The transmit direction will + * re-set RWORK. + */ + if (msg) + iocom->flags |= HAMMER2_IOCOMF_RWORK; + } else if (msg == NULL) { + /* + * Insufficient data received to finish building the message, + * set RREQ and return NULL. + * + * Leave ioq->msg intact. + * Leave the FIFO intact. + */ + iocom->flags |= HAMMER2_IOCOMF_RREQ; + } else { + /* + * Return msg. + * + * The fifo has already been advanced past the message. + * Trivially reset the FIFO indices if possible. + * + * clear the FIFO if it is now empty and set RREQ to wait + * for more from the socket. If the FIFO is not empty set + * TWORK to bypass the poll so we loop immediately. + */ + if (ioq->fifo_beg == ioq->fifo_cdx && + ioq->fifo_cdn == ioq->fifo_end) { + iocom->flags |= HAMMER2_IOCOMF_RREQ; + ioq->fifo_cdx = 0; + ioq->fifo_cdn = 0; + ioq->fifo_beg = 0; + ioq->fifo_end = 0; + } else { + iocom->flags |= HAMMER2_IOCOMF_RWORK; + } + ioq->state = HAMMER2_MSGQ_STATE_HEADER1; + ioq->msg = NULL; + } + return (msg); +} + +/* + * Calculate the header and data crc's and write a low-level message to + * the connection. If aux_crc is non-zero the aux_data crc is already + * assumed to have been set. + * + * A non-NULL msg is added to the queue but not necessarily flushed. + * Calling this function with msg == NULL will get a flush going. + * + * Caller must hold iocom->mtx. + */ +void +hammer2_iocom_flush1(hammer2_iocom_t *iocom) +{ + hammer2_ioq_t *ioq = &iocom->ioq_tx; + hammer2_msg_t *msg; + uint32_t xcrc32; + int hbytes; + hammer2_msg_queue_t tmpq; + + iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK); + TAILQ_INIT(&tmpq); + pthread_mutex_lock(&iocom->mtx); + while ((msg = TAILQ_FIRST(&iocom->router->txmsgq)) != NULL) { + TAILQ_REMOVE(&iocom->router->txmsgq, msg, qentry); + TAILQ_INSERT_TAIL(&tmpq, msg, qentry); + } + pthread_mutex_unlock(&iocom->mtx); + + while ((msg = TAILQ_FIRST(&tmpq)) != NULL) { + /* + * Process terminal connection errors. + */ + TAILQ_REMOVE(&tmpq, msg, qentry); + if (ioq->error) { + TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry); + ++ioq->msgcount; + continue; + } + + /* + * Finish populating the msg fields. The salt ensures that + * the iv[] array is ridiculously randomized and we also + * re-seed our PRNG every 32768 messages just to be sure. + */ + msg->any.head.magic = HAMMER2_MSGHDR_MAGIC; + msg->any.head.salt = (random() << 8) | (ioq->seq & 255); + ++ioq->seq; + if ((ioq->seq & 32767) == 0) + srandomdev(); + + /* + * Calculate aux_crc if 0, then calculate hdr_crc. + */ + if (msg->aux_size && msg->any.head.aux_crc == 0) { + assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0); + xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size); + msg->any.head.aux_crc = xcrc32; + } + msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN; + assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0); + + hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) * + HAMMER2_MSG_ALIGN; + msg->any.head.hdr_crc = 0; + msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes); + + /* + * Enqueue the message (the flush codes handles stream + * encryption). + */ + TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry); + ++ioq->msgcount; + } + hammer2_iocom_flush2(iocom); +} + +/* + * Thread localized, iocom->mtx not held by caller. + */ +void +hammer2_iocom_flush2(hammer2_iocom_t *iocom) +{ + hammer2_ioq_t *ioq = &iocom->ioq_tx; + hammer2_msg_t *msg; + ssize_t n; + struct iovec iov[HAMMER2_IOQ_MAXIOVEC]; + size_t nact; + size_t hbytes; + size_t abytes; + size_t hoff; + size_t aoff; + int iovcnt; + + if (ioq->error) { + hammer2_iocom_drain(iocom); + return; + } + + /* + * Pump messages out the connection by building an iovec. + * + * ioq->hbytes/ioq->abytes tracks how much of the first message + * in the queue has been successfully written out, so we can + * resume writing. + */ + iovcnt = 0; + nact = 0; + hoff = ioq->hbytes; + aoff = ioq->abytes; + + TAILQ_FOREACH(msg, &ioq->msgq, qentry) { + hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) * + HAMMER2_MSG_ALIGN; + abytes = msg->aux_size; + assert(hoff <= hbytes && aoff <= abytes); + + if (hoff < hbytes) { + iov[iovcnt].iov_base = (char *)&msg->any.head + hoff; + iov[iovcnt].iov_len = hbytes - hoff; + nact += hbytes - hoff; + ++iovcnt; + if (iovcnt == HAMMER2_IOQ_MAXIOVEC) + break; + } + if (aoff < abytes) { + assert(msg->aux_data != NULL); + iov[iovcnt].iov_base = (char *)msg->aux_data + aoff; + iov[iovcnt].iov_len = abytes - aoff; + nact += abytes - aoff; + ++iovcnt; + if (iovcnt == HAMMER2_IOQ_MAXIOVEC) + break; + } + hoff = 0; + aoff = 0; + } + if (iovcnt == 0) + return; + + /* + * Encrypt and write the data. The crypto code will move the + * data into the fifo and adjust the iov as necessary. If + * encryption is disabled the iov is left alone. + * + * May return a smaller iov (thus a smaller n), with aggregated + * chunks. May reduce nmax to what fits in the FIFO. + * + * This function sets nact to the number of original bytes now + * encrypted, adding to the FIFO some number of bytes that might + * be greater depending on the crypto mechanic. iov[] is adjusted + * to point at the FIFO if necessary. + * + * NOTE: The return value from the writev() is the post-encrypted + * byte count, not the plaintext count. + */ + if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) { + /* + * Make sure the FIFO has a reasonable amount of space + * left (if not completely full). + */ + if (ioq->fifo_beg > sizeof(ioq->buf) / 2 && + sizeof(ioq->buf) - ioq->fifo_end >= HAMMER2_MSG_ALIGN * 2) { + bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, + ioq->fifo_end - ioq->fifo_beg); + ioq->fifo_cdx -= ioq->fifo_beg; + ioq->fifo_cdn -= ioq->fifo_beg; + ioq->fifo_end -= ioq->fifo_beg; + ioq->fifo_beg = 0; + } + + iovcnt = hammer2_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact); + n = writev(iocom->sock_fd, iov, iovcnt); + if (n > 0) { + ioq->fifo_beg += n; + ioq->fifo_cdn += n; + ioq->fifo_cdx += n; + if (ioq->fifo_beg == ioq->fifo_end) { + ioq->fifo_beg = 0; + ioq->fifo_cdn = 0; + ioq->fifo_cdx = 0; + ioq->fifo_end = 0; + } + } + } else { + n = writev(iocom->sock_fd, iov, iovcnt); + if (n > 0) + nact = n; + else + nact = 0; + } + + /* + * Clean out the transmit queue based on what we successfully + * sent (nact is the plaintext count). ioq->hbytes/abytes + * represents the portion of the first message previously sent. + */ + while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) { + hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) * + HAMMER2_MSG_ALIGN; + abytes = msg->aux_size; + + if ((size_t)nact < hbytes - ioq->hbytes) { + ioq->hbytes += nact; + nact = 0; + break; + } + nact -= hbytes - ioq->hbytes; + ioq->hbytes = hbytes; + if ((size_t)nact < abytes - ioq->abytes) { + ioq->abytes += nact; + nact = 0; + break; + } + nact -= abytes - ioq->abytes; + + TAILQ_REMOVE(&ioq->msgq, msg, qentry); + --ioq->msgcount; + ioq->hbytes = 0; + ioq->abytes = 0; + + hammer2_state_cleanuptx(msg); + } + assert(nact == 0); + + /* + * Process the return value from the write w/regards to blocking. + */ + if (n < 0) { + if (errno != EINTR && + errno != EINPROGRESS && + errno != EAGAIN) { + /* + * Fatal write error + */ + ioq->error = HAMMER2_IOQ_ERROR_SOCK; + hammer2_iocom_drain(iocom); + } else { + /* + * Wait for socket buffer space + */ + iocom->flags |= HAMMER2_IOCOMF_WREQ; + } + } else { + iocom->flags |= HAMMER2_IOCOMF_WREQ; + } + if (ioq->error) { + hammer2_iocom_drain(iocom); + } +} + +/* + * Kill pending msgs on ioq_tx and adjust the flags such that no more + * write events will occur. We don't kill read msgs because we want + * the caller to pull off our contrived terminal error msg to detect + * the connection failure. + * + * Thread localized, iocom->mtx not held by caller. + */ +void +hammer2_iocom_drain(hammer2_iocom_t *iocom) +{ + hammer2_ioq_t *ioq = &iocom->ioq_tx; + hammer2_msg_t *msg; + + iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK); + ioq->hbytes = 0; + ioq->abytes = 0; + + while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) { + TAILQ_REMOVE(&ioq->msgq, msg, qentry); + --ioq->msgcount; + hammer2_state_cleanuptx(msg); + } +} + +/* + * Write a message to an iocom, with additional state processing. + */ +void +hammer2_msg_write(hammer2_msg_t *msg) +{ + hammer2_iocom_t *iocom = msg->router->iocom; + hammer2_state_t *state; + char dummy; + + /* + * Handle state processing, create state if necessary. + */ + pthread_mutex_lock(&iocom->mtx); + if ((state = msg->state) != NULL) { + /* + * Existing transaction (could be reply). It is also + * possible for this to be the first reply (CREATE is set), + * in which case we populate state->txcmd. + * + * state->txcmd is adjusted to hold the final message cmd, + * and we also be sure to set the CREATE bit here. We did + * not set it in hammer2_msg_alloc() because that would have + * not been serialized (state could have gotten ripped out + * from under the message prior to it being transmitted). + */ + if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | + HAMMER2_MSGF_REPLY)) == + HAMMER2_MSGF_CREATE) { + state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; + } + msg->any.head.msgid = state->msgid; + assert(((state->txcmd ^ msg->any.head.cmd) & + HAMMER2_MSGF_REPLY) == 0); + if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) + state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; + } else { + msg->any.head.msgid = 0; + /* XXX set spanid by router */ + } + msg->any.head.source = 0; + msg->any.head.target = msg->router->target; + + /* + * Queue it for output, wake up the I/O pthread. Note that the + * I/O thread is responsible for generating the CRCs and encryption. + */ + TAILQ_INSERT_TAIL(&iocom->router->txmsgq, msg, qentry); + dummy = 0; + write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */ + pthread_mutex_unlock(&iocom->mtx); +} + +/* + * This is a shortcut to formulate a reply to msg with a simple error code, + * It can reply to and terminate a transaction, or it can reply to a one-way + * messages. A HAMMER2_LNK_ERROR command code is utilized to encode + * the error code (which can be 0). Not all transactions are terminated + * with HAMMER2_LNK_ERROR status (the low level only cares about the + * MSGF_DELETE flag), but most are. + * + * Replies to one-way messages are a bit of an oxymoron but the feature + * is used by the debug (DBG) protocol. + * + * The reply contains no extended data. + */ +void +hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error) +{ + hammer2_iocom_t *iocom = msg->router->iocom; + hammer2_state_t *state = msg->state; + hammer2_msg_t *nmsg; + uint32_t cmd; + + + /* + * Reply with a simple error code and terminate the transaction. + */ + cmd = HAMMER2_LNK_ERROR; + + /* + * Check if our direction has even been initiated yet, set CREATE. + * + * Check what direction this is (command or reply direction). Note + * that txcmd might not have been initiated yet. + * + * If our direction has already been closed we just return without + * doing anything. + */ + if (state) { + if (state->txcmd & HAMMER2_MSGF_DELETE) + return; + if (state->txcmd & HAMMER2_MSGF_REPLY) + cmd |= HAMMER2_MSGF_REPLY; + cmd |= HAMMER2_MSGF_DELETE; + } else { + if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0) + cmd |= HAMMER2_MSGF_REPLY; + } + + /* + * Allocate the message and associate it with the existing state. + * We cannot pass MSGF_CREATE to msg_alloc() because that may + * allocate new state. We have our state already. + */ + nmsg = hammer2_msg_alloc(iocom->router, 0, cmd, NULL, NULL); + if (state) { + if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) + nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE; + } + nmsg->any.head.error = error; + nmsg->state = state; + hammer2_msg_write(nmsg); +} + +/* + * Similar to hammer2_msg_reply() but leave the transaction open. That is, + * we are generating a streaming reply or an intermediate acknowledgement + * of some sort as part of the higher level protocol, with more to come + * later. + */ +void +hammer2_msg_result(hammer2_msg_t *msg, uint32_t error) +{ + hammer2_iocom_t *iocom = msg->router->iocom; + hammer2_state_t *state = msg->state; + hammer2_msg_t *nmsg; + uint32_t cmd; + + + /* + * Reply with a simple error code and terminate the transaction. + */ + cmd = HAMMER2_LNK_ERROR; + + /* + * Check if our direction has even been initiated yet, set CREATE. + * + * Check what direction this is (command or reply direction). Note + * that txcmd might not have been initiated yet. + * + * If our direction has already been closed we just return without + * doing anything. + */ + if (state) { + if (state->txcmd & HAMMER2_MSGF_DELETE) + return; + if (state->txcmd & HAMMER2_MSGF_REPLY) + cmd |= HAMMER2_MSGF_REPLY; + /* continuing transaction, do not set MSGF_DELETE */ + } else { + if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0) + cmd |= HAMMER2_MSGF_REPLY; + } + + nmsg = hammer2_msg_alloc(iocom->router, 0, cmd, NULL, NULL); + if (state) { + if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) + nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE; + } + nmsg->any.head.error = error; + nmsg->state = state; + hammer2_msg_write(nmsg); +} + +/* + * Terminate a transaction given a state structure by issuing a DELETE. + */ +void +hammer2_state_reply(hammer2_state_t *state, uint32_t error) +{ + hammer2_msg_t *nmsg; + uint32_t cmd = HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE; + + /* + * Nothing to do if we already transmitted a delete + */ + if (state->txcmd & HAMMER2_MSGF_DELETE) + return; + + /* + * Set REPLY if the other end initiated the command. Otherwise + * we are the command direction. + */ + if (state->txcmd & HAMMER2_MSGF_REPLY) + cmd |= HAMMER2_MSGF_REPLY; + + nmsg = hammer2_msg_alloc(state->iocom->router, 0, cmd, NULL, NULL); + if (state) { + if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) + nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE; + } + nmsg->any.head.error = error; + nmsg->state = state; + hammer2_msg_write(nmsg); +} + +/************************************************************************ + * TRANSACTION STATE HANDLING * + ************************************************************************ + * + */ + +/* + * Process state tracking for a message after reception, prior to + * execution. + * + * Called with msglk held and the msg dequeued. + * + * All messages are called with dummy state and return actual state. + * (One-off messages often just return the same dummy state). + * + * May request that caller discard the message by setting *discardp to 1. + * The returned state is not used in this case and is allowed to be NULL. + * + * -- + * + * These routines handle persistent and command/reply message state via the + * CREATE and DELETE flags. The first message in a command or reply sequence + * sets CREATE, the last message in a command or reply sequence sets DELETE. + * + * There can be any number of intermediate messages belonging to the same + * sequence sent inbetween the CREATE message and the DELETE message, + * which set neither flag. This represents a streaming command or reply. + * + * Any command message received with CREATE set expects a reply sequence to + * be returned. Reply sequences work the same as command sequences except the + * REPLY bit is also sent. Both the command side and reply side can + * degenerate into a single message with both CREATE and DELETE set. Note + * that one side can be streaming and the other side not, or neither, or both. + * + * The msgid is unique for the initiator. That is, two sides sending a new + * message can use the same msgid without colliding. + * + * -- + * + * ABORT sequences work by setting the ABORT flag along with normal message + * state. However, ABORTs can also be sent on half-closed messages, that is + * even if the command or reply side has already sent a DELETE, as long as + * the message has not been fully closed it can still send an ABORT+DELETE + * to terminate the half-closed message state. + * + * Since ABORT+DELETEs can race we silently discard ABORT's for message + * state which has already been fully closed. REPLY+ABORT+DELETEs can + * also race, and in this situation the other side might have already + * initiated a new unrelated command with the same message id. Since + * the abort has not set the CREATE flag the situation can be detected + * and the message will also be discarded. + * + * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE]. + * The ABORT request is essentially integrated into the command instead + * of being sent later on. In this situation the command implementation + * detects that CREATE and ABORT are both set (vs ABORT alone) and can + * special-case non-blocking operation for the command. + * + * NOTE! Messages with ABORT set without CREATE or DELETE are considered + * to be mid-stream aborts for command/reply sequences. ABORTs on + * one-way messages are not supported. + * + * NOTE! If a command sequence does not support aborts the ABORT flag is + * simply ignored. + * + * -- + * + * One-off messages (no reply expected) are sent with neither CREATE or DELETE + * set. One-off messages cannot be aborted and typically aren't processed + * by these routines. The REPLY bit can be used to distinguish whether a + * one-off message is a command or reply. For example, one-off replies + * will typically just contain status updates. + */ +static int +hammer2_state_msgrx(hammer2_msg_t *msg) +{ + hammer2_iocom_t *iocom = msg->router->iocom; + hammer2_state_t *state; + hammer2_state_t dummy; + int error; + + /* + * Lock RB tree and locate existing persistent state, if any. + * + * If received msg is a command state is on staterd_tree. + * If received msg is a reply state is on statewr_tree. + */ + dummy.msgid = msg->any.head.msgid; + pthread_mutex_lock(&iocom->mtx); + if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) { + state = RB_FIND(hammer2_state_tree, + &iocom->router->statewr_tree, &dummy); + } else { + state = RB_FIND(hammer2_state_tree, + &iocom->router->staterd_tree, &dummy); + } + msg->state = state; + pthread_mutex_unlock(&iocom->mtx); + + /* + * Short-cut one-off or mid-stream messages (state may be NULL). + */ + if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE | + HAMMER2_MSGF_ABORT)) == 0) { + return(0); + } + + /* + * Switch on CREATE, DELETE, REPLY, and also handle ABORT from + * inside the case statements. + */ + switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE | + HAMMER2_MSGF_REPLY)) { + case HAMMER2_MSGF_CREATE: + case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE: + /* + * New persistant command received. + */ + if (state) { + fprintf(stderr, "duplicate-trans %s\n", + hammer2_msg_str(msg)); + error = HAMMER2_IOQ_ERROR_TRANS; + assert(0); + break; + } + state = malloc(sizeof(*state)); + bzero(state, sizeof(*state)); + state->iocom = iocom; + state->flags = HAMMER2_STATE_DYNAMIC; + state->msg = msg; + state->txcmd = HAMMER2_MSGF_REPLY; + state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; + state->flags |= HAMMER2_STATE_INSERTED; + state->msgid = msg->any.head.msgid; + state->router = msg->router; + msg->state = state; + pthread_mutex_lock(&iocom->mtx); + RB_INSERT(hammer2_state_tree, + &iocom->router->staterd_tree, state); + pthread_mutex_unlock(&iocom->mtx); + error = 0; + if (DebugOpt) { + fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n", + state, (uint32_t)state->msgid, iocom); + } + break; + case HAMMER2_MSGF_DELETE: + /* + * Persistent state is expected but might not exist if an + * ABORT+DELETE races the close. + */ + if (state == NULL) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = HAMMER2_IOQ_ERROR_EALREADY; + } else { + fprintf(stderr, "missing-state %s\n", + hammer2_msg_str(msg)); + error = HAMMER2_IOQ_ERROR_TRANS; + assert(0); + } + break; + } + + /* + * Handle another ABORT+DELETE case if the msgid has already + * been reused. + */ + if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = HAMMER2_IOQ_ERROR_EALREADY; + } else { + fprintf(stderr, "reused-state %s\n", + hammer2_msg_str(msg)); + error = HAMMER2_IOQ_ERROR_TRANS; + assert(0); + } + break; + } + error = 0; + break; + default: + /* + * Check for mid-stream ABORT command received, otherwise + * allow. + */ + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + if (state == NULL || + (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + error = HAMMER2_IOQ_ERROR_EALREADY; + break; + } + } + error = 0; + break; + case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE: + case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE: + /* + * When receiving a reply with CREATE set the original + * persistent state message should already exist. + */ + if (state == NULL) { + fprintf(stderr, "no-state(r) %s\n", + hammer2_msg_str(msg)); + error = HAMMER2_IOQ_ERROR_TRANS; + assert(0); + break; + } + assert(((state->rxcmd ^ msg->any.head.cmd) & + HAMMER2_MSGF_REPLY) == 0); + state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; + error = 0; + break; + case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE: + /* + * Received REPLY+ABORT+DELETE in case where msgid has + * already been fully closed, ignore the message. + */ + if (state == NULL) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = HAMMER2_IOQ_ERROR_EALREADY; + } else { + fprintf(stderr, "no-state(r,d) %s\n", + hammer2_msg_str(msg)); + error = HAMMER2_IOQ_ERROR_TRANS; + assert(0); + } + break; + } + + /* + * Received REPLY+ABORT+DELETE in case where msgid has + * already been reused for an unrelated message, + * ignore the message. + */ + if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = HAMMER2_IOQ_ERROR_EALREADY; + } else { + fprintf(stderr, "reused-state(r,d) %s\n", + hammer2_msg_str(msg)); + error = HAMMER2_IOQ_ERROR_TRANS; + assert(0); + } + break; + } + error = 0; + break; + case HAMMER2_MSGF_REPLY: + /* + * Check for mid-stream ABORT reply received to sent command. + */ + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + if (state == NULL || + (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + error = HAMMER2_IOQ_ERROR_EALREADY; + break; + } + } + error = 0; + break; + } + return (error); +} + +void +hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) +{ + hammer2_state_t *state; + + if ((state = msg->state) == NULL) { + /* + * Free a non-transactional message, there is no state + * to worry about. + */ + hammer2_msg_free(msg); + } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { + /* + * Message terminating transaction, destroy the related + * state, the original message, and this message (if it + * isn't the original message due to a CREATE|DELETE). + */ + pthread_mutex_lock(&iocom->mtx); + state->rxcmd |= HAMMER2_MSGF_DELETE; + if (state->txcmd & HAMMER2_MSGF_DELETE) { + if (state->msg == msg) + state->msg = NULL; + assert(state->flags & HAMMER2_STATE_INSERTED); + if (state->rxcmd & HAMMER2_MSGF_REPLY) { + assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY); + RB_REMOVE(hammer2_state_tree, + &iocom->router->statewr_tree, state); + } else { + assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); + RB_REMOVE(hammer2_state_tree, + &iocom->router->staterd_tree, state); + } + state->flags &= ~HAMMER2_STATE_INSERTED; + hammer2_state_free(state); + } else { + ; + } + pthread_mutex_unlock(&iocom->mtx); + hammer2_msg_free(msg); + } else if (state->msg != msg) { + /* + * Message not terminating transaction, leave state intact + * and free message if it isn't the CREATE message. + */ + hammer2_msg_free(msg); + } +} + +static void +hammer2_state_cleanuptx(hammer2_msg_t *msg) +{ + hammer2_iocom_t *iocom = msg->router->iocom; + hammer2_state_t *state; + + if ((state = msg->state) == NULL) { + hammer2_msg_free(msg); + } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { + pthread_mutex_lock(&iocom->mtx); + state->txcmd |= HAMMER2_MSGF_DELETE; + if (state->rxcmd & HAMMER2_MSGF_DELETE) { + if (state->msg == msg) + state->msg = NULL; + assert(state->flags & HAMMER2_STATE_INSERTED); + if (state->txcmd & HAMMER2_MSGF_REPLY) { + assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY); + RB_REMOVE(hammer2_state_tree, + &iocom->router->staterd_tree, state); + } else { + assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); + RB_REMOVE(hammer2_state_tree, + &iocom->router->statewr_tree, state); + } + state->flags &= ~HAMMER2_STATE_INSERTED; + hammer2_state_free(state); + } else { + ; + } + pthread_mutex_unlock(&iocom->mtx); + hammer2_msg_free(msg); + } else if (state->msg != msg) { + hammer2_msg_free(msg); + } +} + +/* + * Called with iocom locked + */ +void +hammer2_state_free(hammer2_state_t *state) +{ + hammer2_iocom_t *iocom = state->iocom; + hammer2_msg_t *msg; + char dummy; + + if (DebugOpt) { + fprintf(stderr, "terminate state %p id=%08x\n", + state, (uint32_t)state->msgid); + } + assert(state->any.any == NULL); + msg = state->msg; + state->msg = NULL; + if (msg) + hammer2_msg_free_locked(msg); + free(state); + + /* + * When an iocom error is present we are trying to close down the + * iocom, but we have to wait for all states to terminate before + * we can do so. The iocom rx code will terminate the receive side + * for all transactions by simulating incoming DELETE messages, + * but the state doesn't go away until both sides are terminated. + * + * We may have to wake up the rx code. + */ + if (iocom->ioq_rx.error && + RB_EMPTY(&iocom->router->staterd_tree) && + RB_EMPTY(&iocom->router->statewr_tree)) { + dummy = 0; + write(iocom->wakeupfds[1], &dummy, 1); + } +} + +/************************************************************************ + * ROUTING * + ************************************************************************ + * + * Incoming messages are routed by their spanid, matched up against + * outgoing LNK_SPANs managed by h2span_relay structures (see msg_lnk.c). + * Any replies run through the same router. + * + * Originated messages are routed by their spanid, matched up against + * incoming LNK_SPANs managed by h2span_link structures (see msg_lnk.c). + * Replies come back through the same route. + * + * Keep in mind that ALL MESSAGE TRAFFIC pertaining to a particular + * transaction runs through the same route. Commands and replies both. + * + * An originated message will use a different routing spanid to + * reach a target node than a message which originates from that node. + * They might use the same physical pipes (each pipe can have multiple + * SPANs and RELAYs), but the routes are distinct from the perspective + * of the router. + */ +hammer2_router_t * +hammer2_router_alloc(void) +{ + hammer2_router_t *router; + + router = hammer2_alloc(sizeof(*router)); + TAILQ_INIT(&router->txmsgq); + return (router); +} + +void +hammer2_router_connect(hammer2_router_t *router) +{ + hammer2_router_t *tmp; + + assert(router->link || router->relay); + assert((router->flags & HAMMER2_ROUTER_CONNECTED) == 0); + + pthread_mutex_lock(&router_mtx); + if (router->link) + tmp = RB_INSERT(hammer2_router_tree, &router_ltree, router); + else + tmp = RB_INSERT(hammer2_router_tree, &router_rtree, router); + assert(tmp == NULL); + router->flags |= HAMMER2_ROUTER_CONNECTED; + pthread_mutex_unlock(&router_mtx); +} + +void +hammer2_router_disconnect(hammer2_router_t **routerp) +{ + hammer2_router_t *router; + + router = *routerp; + assert(router->link || router->relay); + assert(router->flags & HAMMER2_ROUTER_CONNECTED); + + pthread_mutex_lock(&router_mtx); + if (router->link) + RB_REMOVE(hammer2_router_tree, &router_ltree, router); + else + RB_REMOVE(hammer2_router_tree, &router_rtree, router); + router->flags &= ~HAMMER2_ROUTER_CONNECTED; + *routerp = NULL; + pthread_mutex_unlock(&router_mtx); +} + +#if 0 +/* + * XXX + */ +hammer2_router_t * +hammer2_route_msg(hammer2_msg_t *msg) +{ +} +#endif + +/************************************************************************ + * DEBUGGING * + ************************************************************************/ + +const char * +hammer2_basecmd_str(uint32_t cmd) +{ + static char buf[64]; + char protobuf[32]; + char cmdbuf[32]; + const char *protostr; + const char *cmdstr; + + switch(cmd & HAMMER2_MSGF_PROTOS) { + case HAMMER2_MSG_PROTO_LNK: + protostr = "LNK_"; + break; + case HAMMER2_MSG_PROTO_DBG: + protostr = "DBG_"; + break; + case HAMMER2_MSG_PROTO_DOM: + protostr = "DOM_"; + break; + case HAMMER2_MSG_PROTO_CAC: + protostr = "CAC_"; + break; + case HAMMER2_MSG_PROTO_QRM: + protostr = "QRM_"; + break; + case HAMMER2_MSG_PROTO_BLK: + protostr = "BLK_"; + break; + case HAMMER2_MSG_PROTO_VOP: + protostr = "VOP_"; + break; + default: + snprintf(protobuf, sizeof(protobuf), "%x_", + (cmd & HAMMER2_MSGF_PROTOS) >> 20); + protostr = protobuf; + break; + } + + switch(cmd & (HAMMER2_MSGF_PROTOS | + HAMMER2_MSGF_CMDS | + HAMMER2_MSGF_SIZE)) { + case HAMMER2_LNK_PAD: + cmdstr = "PAD"; + break; + case HAMMER2_LNK_PING: + cmdstr = "PING"; + break; + case HAMMER2_LNK_AUTH: + cmdstr = "AUTH"; + break; + case HAMMER2_LNK_CONN: + cmdstr = "CONN"; + break; + case HAMMER2_LNK_SPAN: + cmdstr = "SPAN"; + break; + case HAMMER2_LNK_VOLCONF: + cmdstr = "VOLCONF"; + break; + case HAMMER2_LNK_ERROR: + if (cmd & HAMMER2_MSGF_DELETE) + cmdstr = "RETURN"; + else + cmdstr = "RESULT"; + break; + case HAMMER2_DBG_SHELL: + cmdstr = "SHELL"; + break; + default: + snprintf(cmdbuf, sizeof(cmdbuf), + "%06x", (cmd & (HAMMER2_MSGF_PROTOS | + HAMMER2_MSGF_CMDS | + HAMMER2_MSGF_SIZE))); + cmdstr = cmdbuf; + break; + } + snprintf(buf, sizeof(buf), "%s%s", protostr, cmdstr); + return (buf); +} + +const char * +hammer2_msg_str(hammer2_msg_t *msg) +{ + hammer2_state_t *state; + static char buf[256]; + char errbuf[16]; + char statebuf[64]; + char flagbuf[64]; + const char *statestr; + const char *errstr; + uint32_t basecmd; + int i; + + /* + * Parse the state + */ + if ((state = msg->state) != NULL) { + basecmd = (state->rxcmd & HAMMER2_MSGF_REPLY) ? + state->txcmd : state->rxcmd; + snprintf(statebuf, sizeof(statebuf), + " %s=%s,L=%s%s,R=%s%s", + ((state->txcmd & HAMMER2_MSGF_REPLY) ? + "rcvcmd" : "sndcmd"), + hammer2_basecmd_str(basecmd), + ((state->txcmd & HAMMER2_MSGF_CREATE) ? "C" : ""), + ((state->txcmd & HAMMER2_MSGF_DELETE) ? "D" : ""), + ((state->rxcmd & HAMMER2_MSGF_CREATE) ? "C" : ""), + ((state->rxcmd & HAMMER2_MSGF_DELETE) ? "D" : "") + ); + statestr = statebuf; + } else { + statestr = ""; + } + + /* + * Parse the error + */ + switch(msg->any.head.error) { + case 0: + errstr = ""; + break; + case HAMMER2_IOQ_ERROR_SYNC: + errstr = "err=IOQ:NOSYNC"; + break; + case HAMMER2_IOQ_ERROR_EOF: + errstr = "err=IOQ:STREAMEOF"; + break; + case HAMMER2_IOQ_ERROR_SOCK: + errstr = "err=IOQ:SOCKERR"; + break; + case HAMMER2_IOQ_ERROR_FIELD: + errstr = "err=IOQ:BADFIELD"; + break; + case HAMMER2_IOQ_ERROR_HCRC: + errstr = "err=IOQ:BADHCRC"; + break; + case HAMMER2_IOQ_ERROR_XCRC: + errstr = "err=IOQ:BADXCRC"; + break; + case HAMMER2_IOQ_ERROR_ACRC: + errstr = "err=IOQ:BADACRC"; + break; + case HAMMER2_IOQ_ERROR_STATE: + errstr = "err=IOQ:BADSTATE"; + break; + case HAMMER2_IOQ_ERROR_NOPEER: + errstr = "err=IOQ:PEERCONFIG"; + break; + case HAMMER2_IOQ_ERROR_NORKEY: + errstr = "err=IOQ:BADRKEY"; + break; + case HAMMER2_IOQ_ERROR_NOLKEY: + errstr = "err=IOQ:BADLKEY"; + break; + case HAMMER2_IOQ_ERROR_KEYXCHGFAIL: + errstr = "err=IOQ:BADKEYXCHG"; + break; + case HAMMER2_IOQ_ERROR_KEYFMT: + errstr = "err=IOQ:BADFMT"; + break; + case HAMMER2_IOQ_ERROR_BADURANDOM: + errstr = "err=IOQ:BADRANDOM"; + break; + case HAMMER2_IOQ_ERROR_MSGSEQ: + errstr = "err=IOQ:BADSEQ"; + break; + case HAMMER2_IOQ_ERROR_EALREADY: + errstr = "err=IOQ:DUPMSG"; + break; + case HAMMER2_IOQ_ERROR_TRANS: + errstr = "err=IOQ:BADTRANS"; + break; + case HAMMER2_IOQ_ERROR_IVWRAP: + errstr = "err=IOQ:IVWRAP"; + break; + case HAMMER2_IOQ_ERROR_MACFAIL: + errstr = "err=IOQ:MACFAIL"; + break; + case HAMMER2_IOQ_ERROR_ALGO: + errstr = "err=IOQ:ALGOFAIL"; + break; + case HAMMER2_MSG_ERR_NOSUPP: + errstr = "err=NOSUPPORT"; + break; + default: + snprintf(errbuf, sizeof(errbuf), + " err=%d", msg->any.head.error); + errstr = errbuf; + break; + } + + /* + * Message flags + */ + i = 0; + if (msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE | + HAMMER2_MSGF_ABORT | HAMMER2_MSGF_REPLY)) { + flagbuf[i++] = '|'; + if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) + flagbuf[i++] = 'C'; + if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) + flagbuf[i++] = 'D'; + if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) + flagbuf[i++] = 'R'; + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) + flagbuf[i++] = 'A'; + } + flagbuf[i] = 0; + + /* + * Generate the buf + */ + snprintf(buf, sizeof(buf), + "msg=%s%s %s id=%08x src=%08x tgt=%08x %s", + hammer2_basecmd_str(msg->any.head.cmd), + flagbuf, + errstr, + (uint32_t)(intmax_t)msg->any.head.msgid, /* for brevity */ + (uint32_t)(intmax_t)msg->any.head.source, /* for brevity */ + (uint32_t)(intmax_t)msg->any.head.target, /* for brevity */ + statestr); + + return(buf); +} diff --cc sbin/hammer2/msg_lnk.c index 91b7dfe,0000000..cb5afb7 mode 100644,000000..100644 --- a/sbin/hammer2/msg_lnk.c +++ b/sbin/hammer2/msg_lnk.c @@@ -1,1252 -1,0 +1,1257 @@@ +/* + * Copyright (c) 2012 The DragonFly Project. All rights reserved. + * + * This code is derived from software contributed to The DragonFly Project + * by Matthew Dillon + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of The DragonFly Project nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific, prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +/* + * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS + * + * This code supports the LNK_SPAN protocol. Essentially all PFS's + * clients and services rendezvous with the userland hammer2 service and + * open LNK_SPAN transactions using a message header linkid of 0, + * registering any PFS's they have connectivity to with us. + * + * -- + * + * Each registration maintains its own open LNK_SPAN message transaction. + * The SPANs are collected, aggregated, and retransmitted over available + * connections through the maintainance of additional LNK_SPAN message + * transactions on each link. + * + * The msgid for each active LNK_SPAN transaction we receive allows us to + * send a message to the target PFS (which might be one of many belonging + * to the same cluster), by specifying that msgid as the linkid in any + * message we send to the target PFS. + * + * Similarly the msgid we allocate for any LNK_SPAN transaction we transmit + * (and remember we will maintain multiple open LNK_SPAN transactions on + * each connection representing the topology span, so every node sees every + * other node as a separate open transaction). So, similarly the msgid for + * these active transactions which we initiated can be used by the other + * end to route messages through us to another node, ultimately winding up + * at the identified hammer2 PFS. We have to adjust the spanid in the message + * header at each hop to be representative of the outgoing LNK_SPAN we + * are forwarding the message through. + * + * -- + * + * If we were to retransmit every LNK_SPAN transaction we receive it would + * create a huge mess, so we have to aggregate all received LNK_SPAN + * transactions, sort them by the fsid (the cluster) and sub-sort them by + * the pfs_fsid (individual nodes in the cluster), and only retransmit + * (create outgoing transactions) for a subset of the nearest distance-hops + * for each individual node. + * + * The higher level protocols can then issue transactions to the nodes making + * up a cluster to perform all actions required. + * + * -- + * + * Since this is a large topology and a spanning tree protocol, links can + * go up and down all the time. Any time a link goes down its transaction + * is closed. The transaction has to be closed on both ends before we can + * delete (and potentially reuse) the related spanid. The LNK_SPAN being + * closed may have been propagated out to other connections and those related + * LNK_SPANs are also closed. Ultimately all routes via the lost LNK_SPAN + * go away, ultimately reaching all sources and all targets. + * + * Any messages in-transit using a route that goes away will be thrown away. + * Open transactions are only tracked at the two end-points. When a link + * failure propagates to an end-point the related open transactions lose + * their spanid and are automatically aborted. + * + * It is important to note that internal route nodes cannot just associate + * a lost LNK_SPAN transaction with another route to the same destination. + * Message transactions MUST be serialized and MUST be ordered. All messages + * for a transaction must run over the same route. So if the route used by + * an active transaction is lost, the related messages will be fully aborted + * and the higher protocol levels will retry as appropriate. + * + * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation + * back to the originator. Only the originator keeps tracks of a message. + * Routers just pass it through. If a route is lost during transit the + * message is simply thrown away. + * + * It is also important to note that several paths to the same PFS can be + * propagated along the same link, which allows concurrency and even + * redundancy over several network interfaces or via different routes through + * the topology. Any given transaction will use only a single route but busy + * servers will often have hundreds of transactions active simultaniously, + * so having multiple active paths through the network topology for A<->B + * will improve performance. + * + * -- + * + * Most protocols consolidate operations rather than simply relaying them. + * This is particularly true of LEAF protocols (such as strict HAMMER2 + * clients), of which there can be millions connecting into the cluster at + * various points. The SPAN protocol is not used for these LEAF elements. + * + * Instead the primary service they connect to implements a proxy for the + * client protocols so the core topology only has to propagate a couple of + * LNK_SPANs and not millions. LNK_SPANs are meant to be used only for + * core master nodes and satellite slaves and cache nodes. + */ + +#include "hammer2.h" + +/* + * Maximum spanning tree distance. This has the practical effect of + * stopping tail-chasing closed loops when a feeder span is lost. + */ +#define HAMMER2_SPAN_MAXDIST 16 + +/* + * RED-BLACK TREE DEFINITIONS + * + * We need to track: + * + * (1) shared fsid's (a cluster). + * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions. + * + * We need to aggegate all active LNK_SPANs, aggregate, and create our own + * outgoing LNK_SPAN transactions on each of our connections representing + * the aggregated state. + * + * h2span_connect - list of iocom connections who wish to receive SPAN + * propagation from other connections. Might contain + * a filter string. Only iocom's with an open + * LNK_CONN transactions are applicable for SPAN + * propagation. + * + * h2span_relay - List of links relayed (via SPAN). Essentially + * each relay structure represents a LNK_SPAN + * transaction that we initiated, verses h2span_link + * which is a LNK_SPAN transaction that we received. + * + * -- + * + * h2span_cluster - Organizes the shared fsid's. One structure for + * each cluster. + * + * h2span_node - Organizes the nodes in a cluster. One structure + * for each unique {cluster,node}, aka {fsid, pfs_fsid}. + * + * h2span_link - Organizes all incoming and outgoing LNK_SPAN message + * transactions related to a node. + * + * One h2span_link structure for each incoming LNK_SPAN + * transaction. Links selected for propagation back + * out are also where the outgoing LNK_SPAN messages + * are indexed into (so we can propagate changes). + * + * The h2span_link's use a red-black tree to sort the + * distance hop metric for the incoming LNK_SPAN. We + * then select the top N for outgoing. When the + * topology changes the top N may also change and cause + * new outgoing LNK_SPAN transactions to be opened + * and less desireable ones to be closed, causing + * transactional aborts within the message flow in + * the process. + * + * Also note - All outgoing LNK_SPAN message transactions are also + * entered into a red-black tree for use by the routing + * function. This is handled by msg.c in the state + * code, not here. + */ + +struct h2span_link; +struct h2span_relay; +TAILQ_HEAD(h2span_media_queue, h2span_media); +TAILQ_HEAD(h2span_connect_queue, h2span_connect); +TAILQ_HEAD(h2span_relay_queue, h2span_relay); + +RB_HEAD(h2span_cluster_tree, h2span_cluster); +RB_HEAD(h2span_node_tree, h2span_node); +RB_HEAD(h2span_link_tree, h2span_link); +RB_HEAD(h2span_relay_tree, h2span_relay); + +/* + * This represents a media + */ +struct h2span_media { + TAILQ_ENTRY(h2span_media) entry; + uuid_t mediaid; + int refs; + struct h2span_media_config { + hammer2_copy_data_t copy_run; + hammer2_copy_data_t copy_pend; + pthread_t thread; + pthread_cond_t cond; + int ctl; + int fd; + hammer2_iocom_t iocom; + pthread_t iocom_thread; + enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state; + } config[HAMMER2_COPYID_COUNT]; +}; + +typedef struct h2span_media_config h2span_media_config_t; + +#define H2CONFCTL_STOP 0x00000001 +#define H2CONFCTL_UPDATE 0x00000002 + +/* + * Received LNK_CONN transaction enables SPAN protocol over connection. + * (may contain filter). Typically one for each mount and several may + * share the same media. + */ +struct h2span_connect { + TAILQ_ENTRY(h2span_connect) entry; + struct h2span_relay_tree tree; + struct h2span_media *media; + hammer2_state_t *state; +}; + +/* + * All received LNK_SPANs are organized by cluster (pfs_clid), + * node (pfs_fsid), and link (received LNK_SPAN transaction). + */ +struct h2span_cluster { + RB_ENTRY(h2span_cluster) rbnode; + struct h2span_node_tree tree; + uuid_t pfs_clid; /* shared fsid */ + int refs; /* prevents destruction */ +}; + +struct h2span_node { + RB_ENTRY(h2span_node) rbnode; + struct h2span_link_tree tree; + struct h2span_cluster *cls; + uuid_t pfs_fsid; /* unique fsid */ + char label[64]; +}; + +struct h2span_link { + RB_ENTRY(h2span_link) rbnode; + hammer2_state_t *state; /* state<->link */ + struct h2span_node *node; /* related node */ + int32_t dist; + struct h2span_relay_queue relayq; /* relay out */ + struct hammer2_router *router; /* route out this link */ +}; + +/* + * Any LNK_SPAN transactions we receive which are relayed out other + * connections utilize this structure to track the LNK_SPAN transaction + * we initiate on the other connections, if selected for relay. + * + * In many respects this is the core of the protocol... actually figuring + * out what LNK_SPANs to relay. The spanid used for relaying is the + * address of the 'state' structure, which is why h2span_relay has to + * be entered into a RB-TREE based at h2span_connect (so we can look + * up the spanid to validate it). + * + * NOTE: Messages can be received via the LNK_SPAN transaction the + * relay maintains, and can be replied via relay->router, but + * messages are NOT initiated via a relay. Messages are initiated + * via incoming links (h2span_link's). + * + * relay->link represents the link being relayed, NOT the LNK_SPAN + * transaction the relay is holding open. + */ +struct h2span_relay { + RB_ENTRY(h2span_relay) rbnode; /* from h2span_connect */ + TAILQ_ENTRY(h2span_relay) entry; /* from link */ + struct h2span_connect *conn; + hammer2_state_t *state; /* transmitted LNK_SPAN */ + struct h2span_link *link; /* LNK_SPAN being relayed */ + struct hammer2_router *router;/* route out this relay */ +}; + + +typedef struct h2span_media h2span_media_t; +typedef struct h2span_connect h2span_connect_t; +typedef struct h2span_cluster h2span_cluster_t; +typedef struct h2span_node h2span_node_t; +typedef struct h2span_link h2span_link_t; +typedef struct h2span_relay h2span_relay_t; + +static +int +h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2) +{ + return(uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL)); +} + +static +int +h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2) +{ + return(uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL)); +} + +/* + * Sort/subsort must match h2span_relay_cmp() under any given node + * to make the aggregation algorithm easier, so the best links are + * in the same sorted order as the best relays. + * + * NOTE: We cannot use link*->state->msgid because this msgid is created + * by each remote host and thus might wind up being the same. + */ +static +int +h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2) +{ + if (link1->dist < link2->dist) + return(-1); + if (link1->dist > link2->dist) + return(1); +#if 1 + if ((uintptr_t)link1->state < (uintptr_t)link2->state) + return(-1); + if ((uintptr_t)link1->state > (uintptr_t)link2->state) + return(1); +#else + if (link1->state->msgid < link2->state->msgid) + return(-1); + if (link1->state->msgid > link2->state->msgid) + return(1); +#endif + return(0); +} + +/* + * Relay entries are sorted by node, subsorted by distance and link + * address (so we can match up the conn->tree relay topology with + * a node's link topology). + */ +static +int +h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2) +{ + h2span_link_t *link1 = relay1->link; + h2span_link_t *link2 = relay2->link; + + if ((intptr_t)link1->node < (intptr_t)link2->node) + return(-1); + if ((intptr_t)link1->node > (intptr_t)link2->node) + return(1); + if (link1->dist < link2->dist) + return(-1); + if (link1->dist > link2->dist) + return(1); +#if 1 + if ((uintptr_t)link1->state < (uintptr_t)link2->state) + return(-1); + if ((uintptr_t)link1->state > (uintptr_t)link2->state) + return(1); +#else + if (link1->state->msgid < link2->state->msgid) + return(-1); + if (link1->state->msgid > link2->state->msgid) + return(1); +#endif + return(0); +} + +RB_PROTOTYPE_STATIC(h2span_cluster_tree, h2span_cluster, + rbnode, h2span_cluster_cmp); +RB_PROTOTYPE_STATIC(h2span_node_tree, h2span_node, + rbnode, h2span_node_cmp); +RB_PROTOTYPE_STATIC(h2span_link_tree, h2span_link, + rbnode, h2span_link_cmp); +RB_PROTOTYPE_STATIC(h2span_relay_tree, h2span_relay, + rbnode, h2span_relay_cmp); + +RB_GENERATE_STATIC(h2span_cluster_tree, h2span_cluster, + rbnode, h2span_cluster_cmp); +RB_GENERATE_STATIC(h2span_node_tree, h2span_node, + rbnode, h2span_node_cmp); +RB_GENERATE_STATIC(h2span_link_tree, h2span_link, + rbnode, h2span_link_cmp); +RB_GENERATE_STATIC(h2span_relay_tree, h2span_relay, + rbnode, h2span_relay_cmp); + +/* + * Global mutex protects cluster_tree lookups, connq, mediaq. + */ +static pthread_mutex_t cluster_mtx; +static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree); +static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq); +static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq); + +static void hammer2_lnk_span(hammer2_msg_t *msg); +static void hammer2_lnk_conn(hammer2_msg_t *msg); +static void hammer2_lnk_relay(hammer2_msg_t *msg); +static void hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node); +static void hammer2_relay_delete(h2span_relay_t *relay); + +static void *hammer2_volconf_thread(void *info); +static void hammer2_volconf_stop(h2span_media_config_t *conf); +static void hammer2_volconf_start(h2span_media_config_t *conf, + const char *hostname); + +void +hammer2_msg_lnk_signal(hammer2_router_t *router __unused) +{ + pthread_mutex_lock(&cluster_mtx); + hammer2_relay_scan(NULL, NULL); + pthread_mutex_unlock(&cluster_mtx); +} + +/* + * Receive a HAMMER2_MSG_PROTO_LNK message. This only called for + * one-way and opening-transactions since state->func will be assigned + * in all other cases. + */ +void +hammer2_msg_lnk(hammer2_msg_t *msg) +{ + switch(msg->any.head.cmd & HAMMER2_MSGF_BASECMDMASK) { + case HAMMER2_LNK_CONN: + hammer2_lnk_conn(msg); + break; + case HAMMER2_LNK_SPAN: + hammer2_lnk_span(msg); + break; + default: + fprintf(stderr, + "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd); + hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP); + /* state invalid after reply */ + break; + } +} + +void +hammer2_lnk_conn(hammer2_msg_t *msg) +{ + hammer2_state_t *state = msg->state; + h2span_media_t *media; + h2span_media_config_t *conf; + h2span_connect_t *conn; + h2span_relay_t *relay; + char *alloc = NULL; + int i; + + pthread_mutex_lock(&cluster_mtx); + + switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) { + case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE: + case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE: + /* + * On transaction start we allocate a new h2span_connect and + * acknowledge the request, leaving the transaction open. + * We then relay priority-selected SPANs. + */ + fprintf(stderr, "LNK_CONN(%08x): %s/%s\n", + (uint32_t)msg->any.head.msgid, + hammer2_uuid_to_str(&msg->any.lnk_conn.pfs_clid, + &alloc), + msg->any.lnk_conn.label); + free(alloc); + + conn = hammer2_alloc(sizeof(*conn)); + + RB_INIT(&conn->tree); + conn->state = state; + state->func = hammer2_lnk_conn; + state->any.conn = conn; + TAILQ_INSERT_TAIL(&connq, conn, entry); + + /* + * Set up media + */ + TAILQ_FOREACH(media, &mediaq, entry) { + if (uuid_compare(&msg->any.lnk_conn.mediaid, + &media->mediaid, NULL) == 0) { + break; + } + } + if (media == NULL) { + media = hammer2_alloc(sizeof(*media)); + media->mediaid = msg->any.lnk_conn.mediaid; + TAILQ_INSERT_TAIL(&mediaq, media, entry); + } + conn->media = media; + ++media->refs; + + if ((msg->any.head.cmd & HAMMER2_MSGF_DELETE) == 0) { + hammer2_msg_result(msg, 0); + hammer2_router_signal(msg->router); + break; + } + /* FALL THROUGH */ + case HAMMER2_LNK_CONN | HAMMER2_MSGF_DELETE: + case HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE: +deleteconn: + /* + * On transaction terminate we clean out our h2span_connect + * and acknowledge the request, closing the transaction. + */ + fprintf(stderr, "LNK_CONN: Terminated\n"); + conn = state->any.conn; + assert(conn); + + /* + * Clean out the media structure. If refs drops to zero we + * also clean out the media config threads. These threads + * maintain span connections to other hammer2 service daemons. + */ + media = conn->media; + if (--media->refs == 0) { + fprintf(stderr, "Shutting down media spans\n"); + for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) { + conf = &media->config[i]; + + if (conf->thread == NULL) + continue; + conf->ctl = H2CONFCTL_STOP; + pthread_cond_signal(&conf->cond); + } + for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) { + conf = &media->config[i]; + + if (conf->thread == NULL) + continue; + pthread_mutex_unlock(&cluster_mtx); + pthread_join(conf->thread, NULL); + pthread_mutex_lock(&cluster_mtx); + conf->thread = NULL; + pthread_cond_destroy(&conf->cond); + } + fprintf(stderr, "Media shutdown complete\n"); + TAILQ_REMOVE(&mediaq, media, entry); + hammer2_free(media); + } + + /* + * Clean out all relays. This requires terminating each + * relay transaction. + */ + while ((relay = RB_ROOT(&conn->tree)) != NULL) { + hammer2_relay_delete(relay); + } + + /* + * Clean out conn + */ + conn->media = NULL; + conn->state = NULL; + msg->state->any.conn = NULL; + TAILQ_REMOVE(&connq, conn, entry); + hammer2_free(conn); + + hammer2_msg_reply(msg, 0); + /* state invalid after reply */ + break; + case HAMMER2_LNK_VOLCONF: + /* + * One-way volume-configuration message is transmitted + * over the open LNK_CONN transaction. + */ + fprintf(stderr, "RECEIVED VOLCONF\n"); + if (msg->any.lnk_volconf.index < 0 || + msg->any.lnk_volconf.index >= HAMMER2_COPYID_COUNT) { + fprintf(stderr, "VOLCONF: ILLEGAL INDEX %d\n", + msg->any.lnk_volconf.index); + break; + } + if (msg->any.lnk_volconf.copy.path[sizeof(msg->any.lnk_volconf.copy.path) - 1] != 0 || + msg->any.lnk_volconf.copy.path[0] == 0) { + fprintf(stderr, "VOLCONF: ILLEGAL PATH %d\n", + msg->any.lnk_volconf.index); + break; + } + conn = msg->state->any.conn; + if (conn == NULL) { + fprintf(stderr, "VOLCONF: LNK_CONN is missing\n"); + break; + } + conf = &conn->media->config[msg->any.lnk_volconf.index]; + conf->copy_pend = msg->any.lnk_volconf.copy; + conf->ctl |= H2CONFCTL_UPDATE; + if (conf->thread == NULL) { + fprintf(stderr, "VOLCONF THREAD STARTED\n"); + pthread_cond_init(&conf->cond, NULL); + pthread_create(&conf->thread, NULL, + hammer2_volconf_thread, (void *)conf); + } + pthread_cond_signal(&conf->cond); + break; + default: + /* + * Failsafe + */ + if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) + goto deleteconn; + hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP); + break; + } + pthread_mutex_unlock(&cluster_mtx); +} + +void +hammer2_lnk_span(hammer2_msg_t *msg) +{ + hammer2_state_t *state = msg->state; + h2span_cluster_t dummy_cls; + h2span_node_t dummy_node; + h2span_cluster_t *cls; + h2span_node_t *node; + h2span_link_t *slink; + h2span_relay_t *relay; + char *alloc = NULL; + + assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); + + pthread_mutex_lock(&cluster_mtx); + + /* + * On transaction start we initialize the tracking infrastructure + */ + if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) { + assert(state->func == NULL); + state->func = hammer2_lnk_span; + + msg->any.lnk_span.label[sizeof(msg->any.lnk_span.label)-1] = 0; + + /* + * Find the cluster + */ + dummy_cls.pfs_clid = msg->any.lnk_span.pfs_clid; + cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls); + if (cls == NULL) { + cls = hammer2_alloc(sizeof(*cls)); + cls->pfs_clid = msg->any.lnk_span.pfs_clid; + RB_INIT(&cls->tree); + RB_INSERT(h2span_cluster_tree, &cluster_tree, cls); + } + + /* + * Find the node + */ + dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid; + node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node); + if (node == NULL) { + node = hammer2_alloc(sizeof(*node)); + node->pfs_fsid = msg->any.lnk_span.pfs_fsid; + node->cls = cls; + RB_INIT(&node->tree); + RB_INSERT(h2span_node_tree, &cls->tree, node); + snprintf(node->label, sizeof(node->label), + "%s", msg->any.lnk_span.label); + } + + /* + * Create the link + */ + assert(state->any.link == NULL); + slink = hammer2_alloc(sizeof(*slink)); + TAILQ_INIT(&slink->relayq); + slink->node = node; + slink->dist = msg->any.lnk_span.dist; + slink->state = state; + state->any.link = slink; + + /* + * Embedded router structure in link for message forwarding. + * + * The spanning id for the router is the message id of + * the SPAN link it is embedded in, allowing messages to + * be routed via &slink->router. + */ + slink->router = hammer2_router_alloc(); + slink->router->iocom = state->iocom; + slink->router->link = slink; + slink->router->target = state->msgid; + hammer2_router_connect(slink->router); + + RB_INSERT(h2span_link_tree, &node->tree, slink); + + fprintf(stderr, "LNK_SPAN(thr %p): %p %s/%s dist=%d\n", + msg->router->iocom, + slink, + hammer2_uuid_to_str(&msg->any.lnk_span.pfs_clid, + &alloc), + msg->any.lnk_span.label, + msg->any.lnk_span.dist); + free(alloc); +#if 0 + hammer2_relay_scan(NULL, node); +#endif + hammer2_router_signal(msg->router); + } + + /* + * On transaction terminate we remove the tracking infrastructure. + */ + if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { + slink = state->any.link; + assert(slink != NULL); + node = slink->node; + cls = node->cls; + + fprintf(stderr, "LNK_DELE(thr %p): %p %s/%s dist=%d\n", + msg->router->iocom, + slink, + hammer2_uuid_to_str(&cls->pfs_clid, &alloc), + state->msg->any.lnk_span.label, + state->msg->any.lnk_span.dist); + free(alloc); + + /* + * Remove the router from consideration + */ + hammer2_router_disconnect(&slink->router); + + /* + * Clean out all relays. This requires terminating each + * relay transaction. + */ + while ((relay = TAILQ_FIRST(&slink->relayq)) != NULL) { + hammer2_relay_delete(relay); + } + + /* + * Clean out the topology + */ + RB_REMOVE(h2span_link_tree, &node->tree, slink); + if (RB_EMPTY(&node->tree)) { + RB_REMOVE(h2span_node_tree, &cls->tree, node); + if (RB_EMPTY(&cls->tree) && cls->refs == 0) { + RB_REMOVE(h2span_cluster_tree, + &cluster_tree, cls); + hammer2_free(cls); + } + node->cls = NULL; + hammer2_free(node); + node = NULL; + } + state->any.link = NULL; + slink->state = NULL; + slink->node = NULL; + hammer2_free(slink); + + /* + * We have to terminate the transaction + */ + hammer2_state_reply(state, 0); + /* state invalid after reply */ + + /* + * If the node still exists issue any required updates. If + * it doesn't then all related relays have already been + * removed and there's nothing left to do. + */ +#if 0 + if (node) + hammer2_relay_scan(NULL, node); +#endif + if (node) + hammer2_router_signal(msg->router); + } + + pthread_mutex_unlock(&cluster_mtx); +} + +/* + * Messages received on relay SPANs. These are open transactions so it is + * in fact possible for the other end to close the transaction. + * + * XXX MPRACE on state structure + */ +static void +hammer2_lnk_relay(hammer2_msg_t *msg) +{ + hammer2_state_t *state = msg->state; + h2span_relay_t *relay; + + assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY); + + if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { + pthread_mutex_lock(&cluster_mtx); + if ((relay = state->any.relay) != NULL) { + hammer2_relay_delete(relay); + } else { + hammer2_state_reply(state, 0); + } + pthread_mutex_unlock(&cluster_mtx); + } +} + +/* + * Update relay transactions for SPANs. + * + * Called with cluster_mtx held. + */ +static void hammer2_relay_scan_specific(h2span_node_t *node, + h2span_connect_t *conn); + +static void +hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node) +{ + h2span_cluster_t *cls; + + if (node) { + /* + * Iterate specific node + */ + TAILQ_FOREACH(conn, &connq, entry) + hammer2_relay_scan_specific(node, conn); + } else { + /* + * Full iteration. + * + * Iterate cluster ids, nodes, and either a specific connection + * or all connections. + */ + RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) { + /* + * Iterate node ids + */ + RB_FOREACH(node, h2span_node_tree, &cls->tree) { + /* + * Synchronize the node's link (received SPANs) + * with each connection's relays. + */ + if (conn) { + hammer2_relay_scan_specific(node, conn); + } else { + TAILQ_FOREACH(conn, &connq, entry) { + hammer2_relay_scan_specific(node, + conn); + } + assert(conn == NULL); + } + } + } + } +} + +/* + * Update the relay'd SPANs for this (node, conn). + * + * Iterate links and adjust relays to match. We only propagate the top link + * for now (XXX we want to propagate the top two). + * + * The hammer2_relay_scan_cmp() function locates the first relay element + * for any given node. The relay elements will be sub-sorted by dist. + */ +struct relay_scan_info { + h2span_node_t *node; + h2span_relay_t *relay; +}; + +static int +hammer2_relay_scan_cmp(h2span_relay_t *relay, void *arg) +{ + struct relay_scan_info *info = arg; + + if ((intptr_t)relay->link->node < (intptr_t)info->node) + return(-1); + if ((intptr_t)relay->link->node > (intptr_t)info->node) + return(1); + return(0); +} + +static int +hammer2_relay_scan_callback(h2span_relay_t *relay, void *arg) +{ + struct relay_scan_info *info = arg; + + info->relay = relay; + return(-1); +} + +static void +hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn) +{ + struct relay_scan_info info; + h2span_relay_t *relay; + h2span_relay_t *next_relay; + h2span_link_t *slink; + int count = 2; + + info.node = node; + info.relay = NULL; + + /* + * Locate the first related relay for the node on this connection. + * relay will be NULL if there were none. + */ + RB_SCAN(h2span_relay_tree, &conn->tree, + hammer2_relay_scan_cmp, hammer2_relay_scan_callback, &info); + relay = info.relay; + info.relay = NULL; + if (relay) + assert(relay->link->node == node); + + if (DebugOpt > 8) + fprintf(stderr, "relay scan for connection %p\n", conn); + + /* + * Iterate the node's links (received SPANs) in distance order, + * lowest (best) dist first. + */ + /* fprintf(stderr, "LOOP\n"); */ + RB_FOREACH(slink, h2span_link_tree, &node->tree) { + /* + fprintf(stderr, "SLINK %p RELAY %p(%p)\n", + slink, relay, relay ? relay->link : NULL); + */ + /* + * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION. + * + * Track relays while iterating the best links and construct + * missing relays when necessary. + * + * (If some prior better link was removed it would have also + * removed the relay, so the relay can only match exactly or + * be worse). + */ + if (relay && relay->link == slink) { + /* + * Match, relay already in-place, get the next + * relay to match against the next slink. + */ + relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay); + if (--count == 0) + break; + } else if (slink->dist > HAMMER2_SPAN_MAXDIST) { + /* + * No match but span distance is too great, + * do not relay. This prevents endless closed + * loops with ever-incrementing distances when + * the seed span is lost in the graph. + * + * All later spans will also be too far away so + * we can break out of the loop. + */ + break; + } else if (slink->state->iocom == conn->state->iocom) { + /* + * No match but we would transmit a LNK_SPAN + * out the same connection it came in on, which + * can be trivially optimized out. + */ + break; + } else { + /* + * No match, distance is ok, construct a new relay. + * (slink is better than relay). + */ + hammer2_msg_t *msg; + + assert(relay == NULL || + relay->link->node != slink->node || + relay->link->dist >= slink->dist); + relay = hammer2_alloc(sizeof(*relay)); + relay->conn = conn; + relay->link = slink; + + msg = hammer2_msg_alloc(conn->state->iocom->router, 0, + HAMMER2_LNK_SPAN | + HAMMER2_MSGF_CREATE, + hammer2_lnk_relay, relay); + relay->state = msg->state; + relay->router = hammer2_router_alloc(); + relay->router->iocom = relay->state->iocom; + relay->router->relay = relay; + relay->router->target = relay->state->msgid; + + msg->any.lnk_span = slink->state->msg->any.lnk_span; + msg->any.lnk_span.dist = slink->dist + 1; + + hammer2_router_connect(relay->router); + + RB_INSERT(h2span_relay_tree, &conn->tree, relay); + TAILQ_INSERT_TAIL(&slink->relayq, relay, entry); + + hammer2_msg_write(msg); + + fprintf(stderr, + "RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d " + "FD %d state %p\n", + slink, + relay, + node->cls, node, slink->dist, + conn->state->iocom->sock_fd, relay->state); + + /* + * Match (created new relay), get the next relay to + * match against the next slink. + */ + relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay); + if (--count == 0) + break; + } + } + + /* + * Any remaining relay's belonging to this connection which match + * the node are in excess of the current aggregate spanning state + * and should be removed. + */ + while (relay && relay->link->node == node) { + next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay); + hammer2_relay_delete(relay); + relay = next_relay; + } +} + +static +void +hammer2_relay_delete(h2span_relay_t *relay) +{ + fprintf(stderr, + "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n", + relay->link, + relay, + relay->link->node->cls, relay->link->node, + relay->link->dist, + relay->conn->state->iocom->sock_fd, relay->state); + + hammer2_router_disconnect(&relay->router); + + RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay); + TAILQ_REMOVE(&relay->link->relayq, relay, entry); + + if (relay->state) { + relay->state->any.relay = NULL; + hammer2_state_reply(relay->state, 0); + /* state invalid after reply */ + relay->state = NULL; + } + relay->conn = NULL; + relay->link = NULL; + hammer2_free(relay); +} + +static void * +hammer2_volconf_thread(void *info) +{ + h2span_media_config_t *conf = info; + + pthread_mutex_lock(&cluster_mtx); + while ((conf->ctl & H2CONFCTL_STOP) == 0) { + if (conf->ctl & H2CONFCTL_UPDATE) { + fprintf(stderr, "VOLCONF UPDATE\n"); + conf->ctl &= ~H2CONFCTL_UPDATE; + if (bcmp(&conf->copy_run, &conf->copy_pend, + sizeof(conf->copy_run)) == 0) { + fprintf(stderr, "VOLCONF: no changes\n"); + continue; + } + /* + * XXX TODO - auto reconnect on lookup failure or + * connect failure or stream failure. + */ + + pthread_mutex_unlock(&cluster_mtx); + hammer2_volconf_stop(conf); + conf->copy_run = conf->copy_pend; + if (conf->copy_run.copyid != 0 && + strncmp(conf->copy_run.path, "span:", 5) == 0) { + hammer2_volconf_start(conf, + conf->copy_run.path + 5); + } + pthread_mutex_lock(&cluster_mtx); + fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state); + } + if (conf->state == H2MC_CONNECT) { + hammer2_volconf_start(conf, conf->copy_run.path + 5); + pthread_mutex_unlock(&cluster_mtx); + sleep(5); + pthread_mutex_lock(&cluster_mtx); + } else { + pthread_cond_wait(&conf->cond, &cluster_mtx); + } + } + pthread_mutex_unlock(&cluster_mtx); + hammer2_volconf_stop(conf); + return(NULL); +} + +static +void +hammer2_volconf_stop(h2span_media_config_t *conf) +{ + switch(conf->state) { + case H2MC_STOPPED: + break; + case H2MC_CONNECT: + conf->state = H2MC_STOPPED; + break; + case H2MC_RUNNING: + shutdown(conf->fd, SHUT_WR); + pthread_join(conf->iocom_thread, NULL); + conf->iocom_thread = NULL; + break; + } +} + +static +void +hammer2_volconf_start(h2span_media_config_t *conf, const char *hostname) +{ ++ hammer2_master_service_info_t *info; ++ + switch(conf->state) { + case H2MC_STOPPED: + case H2MC_CONNECT: + conf->fd = hammer2_connect(hostname); + if (conf->fd < 0) { + fprintf(stderr, "Unable to connect to %s\n", hostname); + conf->state = H2MC_CONNECT; + } else { - pthread_create(&conf->iocom_thread, NULL, - master_service, - (void *)(intptr_t)conf->fd); ++ info = malloc(sizeof(*info)); ++ bzero(info, sizeof(*info)); ++ info->fd = conf->fd; ++ info->detachme = 0; + conf->state = H2MC_RUNNING; ++ pthread_create(&conf->iocom_thread, NULL, ++ master_service, info); + } + break; + case H2MC_RUNNING: + break; + } +} + +/************************************************************************ + * ROUTER AND MESSAGING HANDLES * + ************************************************************************ + * + * Basically the idea here is to provide a stable data structure which + * can be localized to the caller for higher level protocols to work with. + * Depends on the context, these hammer2_handle's can be pooled by use-case + * and remain persistent through a client (or mount point's) life. + */ + +#if 0 +/* + * Obtain a stable handle on a cluster given its uuid. This ties directly + * into the global cluster topology, creating the structure if necessary + * (even if the uuid does not exist or does not exist yet), and preventing + * the structure from getting ripped out from under us while we hold a + * pointer to it. + */ +h2span_cluster_t * +hammer2_cluster_get(uuid_t *pfs_clid) +{ + h2span_cluster_t dummy_cls; + h2span_cluster_t *cls; + + dummy_cls.pfs_clid = *pfs_clid; + pthread_mutex_lock(&cluster_mtx); + cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls); + if (cls) + ++cls->refs; + pthread_mutex_unlock(&cluster_mtx); + return (cls); +} + +void +hammer2_cluster_put(h2span_cluster_t *cls) +{ + pthread_mutex_lock(&cluster_mtx); + assert(cls->refs > 0); + --cls->refs; + if (RB_EMPTY(&cls->tree) && cls->refs == 0) { + RB_REMOVE(h2span_cluster_tree, + &cluster_tree, cls); + hammer2_free(cls); + } + pthread_mutex_unlock(&cluster_mtx); +} + +/* + * Obtain a stable handle to a specific cluster node given its uuid. + * This handle does NOT lock in the route to the node and is typically + * used as part of the hammer2_handle_*() API to obtain a set of + * stable nodes. + */ +h2span_node_t * +hammer2_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid) +{ +} + +#endif + +#if 0 +/* + * Acquire a persistent router structure given the cluster and node ids. + * Messages can be transacted via this structure while held. If the route + * is lost messages will return failure. + */ +hammer2_router_t * +hammer2_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid) +{ +} + +/* + * Release previously acquired router. + */ +void +hammer2_router_put(hammer2_router_t *router) +{ +} +#endif + +/************************************************************************ + * DEBUGGER * + ************************************************************************/ +/* + * Dumps the spanning tree + */ +void +shell_tree(hammer2_router_t *router, char *cmdbuf __unused) +{ + h2span_cluster_t *cls; + h2span_node_t *node; + h2span_link_t *slink; + char *uustr = NULL; + + pthread_mutex_lock(&cluster_mtx); + RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) { + router_printf(router, "Cluster %s\n", + hammer2_uuid_to_str(&cls->pfs_clid, &uustr)); + RB_FOREACH(node, h2span_node_tree, &cls->tree) { + router_printf(router, " Node %s (%s)\n", + hammer2_uuid_to_str(&node->pfs_fsid, &uustr), + node->label); + RB_FOREACH(slink, h2span_link_tree, &node->tree) { + router_printf(router, "\tLink dist=%d via %d\n", + slink->dist, + slink->state->iocom->sock_fd); + } + } + } + pthread_mutex_unlock(&cluster_mtx); + if (uustr) + free(uustr); +#if 0 + TAILQ_FOREACH(conn, &connq, entry) { + } +#endif +}