cluster - more libdmsg work
diff --git a/lib/libdmsg/msg_lnk.c b/lib/libdmsg/msg_lnk.c
index c76da55..e2e2b73 100644
@@ -432,6 +432,9 @@ dmsg_lnk_conn(dmsg_msg_t *msg)
 
        pthread_mutex_lock(&cluster_mtx);
 
+       fprintf(stderr, "dmsg_lnk_conn: msg %p cmd %08x state %p txcmd %08x rxcmd %08x\n",
+               msg, msg->any.head.cmd, state, state->txcmd, state->rxcmd);
+
        switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
        case DMSG_LNK_CONN | DMSGF_CREATE:
        case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
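The new diagnostic dumps a transaction's txcmd and rxcmd flag words alongside the incoming command. For reference, a minimal sketch (not part of this commit; it assumes only the dmsg_state_t fields and the DMSGF_DELETE bit already visible above) of how those two words encode a transaction's lifecycle:

static const char *
dmsg_state_phase(dmsg_state_t *state)
{
        int txdel = (state->txcmd & DMSGF_DELETE) != 0;
        int rxdel = (state->rxcmd & DMSGF_DELETE) != 0;

        if (txdel && rxdel)
                return("closed");               /* both half-closes seen */
        if (txdel)
                return("tx-half-closed");       /* we sent our DELETE */
        if (rxdel)
                return("rx-half-closed");       /* peer sent its DELETE */
        return("open");
}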
@@ -785,9 +788,13 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
        dmsg_msg_t *fwd_msg;
        dmsg_iocom_t *iocomA;
        dmsg_iocom_t *iocomB;
+       int disconnect;
 
        /*pthread_mutex_lock(&cluster_mtx);*/
 
+       if (DMsgDebugOpt >= 4)
+               fprintf(stderr, "CIRC receive cmd=%08x\n", msg->any.head.cmd);
+
        switch (msg->any.head.cmd & (DMSGF_CREATE |
                                     DMSGF_DELETE |
                                     DMSGF_REPLY)) {
@@ -819,9 +826,19 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                tx_state = RB_FIND(dmsg_state_tree,
                                   &iocomA->circuit0.statewr_tree,
                                   &dummy);
-               /* XXX state refs */
-               assert(tx_state);
                pthread_mutex_unlock(&iocomA->mtx);
+               if (tx_state == NULL) {
+                       /* XXX SMP race */
+                       fprintf(stderr, "dmsg_lnk_circ: no circuit\n");
+                       dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
+                       break;
+               }
+               if (tx_state->icmd != DMSG_LNK_SPAN) {
+                       /* XXX SMP race */
+                       fprintf(stderr, "dmsg_lnk_circ: not LNK_SPAN\n");
+                       dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
+                       break;
+               }
 
                /* locate h2span_link */
                rx_state = tx_state->any.relay->source_rt;
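The two checks above replace a hard assertion: a racing teardown on another thread can remove the SPAN state, or leave a state that is no longer a LNK_SPAN, before this circuit request is processed, so the transaction now fails cleanly with DMSG_ERR_CANTCIRC. The same guarded lookup factored out, as a sketch (hypothetical helper; every identifier is taken from the surrounding code):

static dmsg_state_t *
find_span_tx_state(dmsg_iocom_t *iocomA, dmsg_state_t *dummy)
{
        dmsg_state_t *tx_state;

        pthread_mutex_lock(&iocomA->mtx);
        tx_state = RB_FIND(dmsg_state_tree, &iocomA->circuit0.statewr_tree,
                           dummy);
        pthread_mutex_unlock(&iocomA->mtx);

        /* NULL tells the caller to reply with DMSG_ERR_CANTCIRC */
        if (tx_state == NULL || tx_state->icmd != DMSG_LNK_SPAN)
                return(NULL);
        return(tx_state);
}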
@@ -834,7 +851,7 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                 * it received from us as <target>.
                 */
                circA = dmsg_alloc(sizeof(*circA));
-               circA->iocom = iocomA;
+               dmsg_circuit_init(iocomA, circA);
                circA->state = msg->state;      /* LNK_CIRC state */
                circA->msgid = msg->state->msgid;
                circA->span_state = tx_state;   /* H2SPAN_RELAY state */
@@ -845,6 +862,7 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                iocomB = rx_state->iocom;
 
                circB = dmsg_alloc(sizeof(*circB));
+               dmsg_circuit_init(iocomB, circB);
 
                /*
                 * Create a LNK_CIRC transaction on B
@@ -853,23 +871,40 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                                         0, DMSG_LNK_CIRC | DMSGF_CREATE,
                                         dmsg_lnk_circ, circB);
                fwd_msg->state->any.circ = circB;
-               circB->iocom = iocomB;
+               fwd_msg->any.lnk_circ.target = rx_state->msgid;
                circB->state = fwd_msg->state;  /* LNK_CIRC state */
                circB->msgid = fwd_msg->any.head.msgid;
                circB->span_state = rx_state;   /* H2SPAN_LINK state */
                circB->is_relay = 0;
                circB->refs = 2;                /* state and peer */
 
+               if (DMsgDebugOpt >= 4)
+                       fprintf(stderr, "CIRC forward %p->%p\n", circA, circB);
+
                /*
                 * Link the two circuits together.
                 */
                circA->peer = circB;
                circB->peer = circA;
 
+               if (iocomA < iocomB) {
+                       pthread_mutex_lock(&iocomA->mtx);
+                       pthread_mutex_lock(&iocomB->mtx);
+               } else {
+                       pthread_mutex_lock(&iocomB->mtx);
+                       pthread_mutex_lock(&iocomA->mtx);
+               }
                if (RB_INSERT(dmsg_circuit_tree, &iocomA->circuit_tree, circA))
                        assert(0);
                if (RB_INSERT(dmsg_circuit_tree, &iocomB->circuit_tree, circB))
                        assert(0);
+               if (iocomA < iocomB) {
+                       pthread_mutex_unlock(&iocomB->mtx);
+                       pthread_mutex_unlock(&iocomA->mtx);
+               } else {
+                       pthread_mutex_unlock(&iocomA->mtx);
+                       pthread_mutex_unlock(&iocomB->mtx);
+               }
 
                dmsg_msg_write(fwd_msg);
 
@@ -882,13 +917,19 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                 * to (B).
                 */
                iocomA = msg->iocom;
+               if (msg->state->any.circ == NULL) {
+                       /* already returned an error/deleted */
+                       break;
+               }
                circA = msg->state->any.circ;
                circB = circA->peer;
                assert(msg->state == circA->state);
 
                /*
-                * If we are closing A and the peer B is closed, disconnect.
+                * We are closing B's send side.  If B's receive side is
+                * already closed we disconnect the circuit from B's state.
                 */
+               disconnect = 0;
                if (circB && (state = circB->state) != NULL) {
                        if (state->rxcmd & DMSGF_DELETE) {
                                circB->state = NULL;
@@ -896,23 +937,32 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                                dmsg_circuit_drop(circB);
                        }
                        dmsg_state_reply(state, msg->any.head.error);
+                       disconnect = 1;
                }
 
                /*
-                * If both sides now closed terminate the peer association
-                * and the state association.  This may drop up to two refs
-                * on circA and one on circB.
+                * We received a close on A.  If A's send side is already
+                * closed we disconnect the circuit from A's state.
                 */
-               if (circA->state->txcmd & DMSGF_DELETE) {
+               if (circA && (state = circA->state) != NULL) {
+                       if (state->txcmd & DMSGF_DELETE) {
+                               circA->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circA);
+                       }
+                       disconnect = 1;
+               }
+
+               /*
+                * Disconnect the peer<->peer association
+                */
+               if (disconnect) {
                        if (circB) {
                                circA->peer = NULL;
                                circB->peer = NULL;
                                dmsg_circuit_drop(circA);
                                dmsg_circuit_drop(circB); /* XXX SMP */
                        }
-                       circA->state->any.circ = NULL;
-                       circA->state = NULL;
-                       dmsg_circuit_drop(circA);
                }
                break;
        case DMSGF_REPLY | DMSGF_CREATE:
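The DELETE paths above and below now share one rule: a circuit is detached from its LNK_CIRC state, and the ref that state held is dropped, only once both halves of that transaction carry DMSGF_DELETE; the two peer refs (see circB->refs = 2, "state and peer", earlier) are dropped in the separate disconnect block. The per-side rule, condensed into a hedged sketch (hypothetical helper; fields and functions are those used above):

static void
circ_detach_if_fully_closed(dmsg_circuit_t *circ, int check_txcmd)
{
        dmsg_state_t *state = circ->state;
        uint32_t half;

        if (state == NULL)
                return;
        half = check_txcmd ? state->txcmd : state->rxcmd;
        if (half & DMSGF_DELETE) {
                circ->state = NULL;             /* sever state<->circuit */
                state->any.circ = NULL;
                dmsg_circuit_drop(circ);        /* drop the state's ref */
        }
}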
@@ -924,9 +974,11 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                 * via the virtual circuit before seeing this reply.
                 */
                circB = msg->state->any.circ;
+               assert(circB);
                circA = circB->peer;
                assert(msg->state == circB->state);
-               if (circA && (msg->any.head.cmd & DMSGF_DELETE) == 0) {
+               assert(circA);
+               if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
                        dmsg_state_result(circA->state, msg->any.head.error);
                        break;
                }
@@ -943,8 +995,10 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                assert(msg->state == circB->state);
 
                /*
-                * If we are closing A and the peer B is closed, disconnect.
+                * We received a close on (B), propagate to (A).  If we have
+                * already received the close from (A) we disconnect the state.
                 */
+               disconnect = 0;
                if (circA && (state = circA->state) != NULL) {
                        if (state->rxcmd & DMSGF_DELETE) {
                                circA->state = NULL;
@@ -952,23 +1006,32 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                                dmsg_circuit_drop(circA);
                        }
                        dmsg_state_reply(state, msg->any.head.error);
+                       disconnect = 1;
+               }
+
+               /*
+                * We received a close on (B).  If (B)'s send side is already
+                * closed we disconnect the state.
+                */
+               if (circB && (state = circB->state) != NULL) {
+                       if (state->txcmd & DMSGF_DELETE) {
+                               circB->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circB);
+                       }
+                       disconnect = 1;
                }
 
                /*
-                * If both sides now closed terminate the peer association
-                * and the state association.  This may drop up to two refs
-                * on circA and one on circB.
+                * Disconnect the peer<->peer association
                 */
-               if (circB->state->txcmd & DMSGF_DELETE) {
+               if (disconnect) {
                        if (circA) {
                                circB->peer = NULL;
                                circA->peer = NULL;
                                dmsg_circuit_drop(circB);
                                dmsg_circuit_drop(circA); /* XXX SMP */
                        }
-                       circB->state->any.circ = NULL;
-                       circB->state = NULL;
-                       dmsg_circuit_drop(circB);
                }
                break;
        }
@@ -1235,6 +1298,7 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
         */
        while (relay && relay->source_rt->any.link->node == node) {
                next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
+               fprintf(stderr, "RELAY DELETE FROM EXTRAS\n");
                dmsg_relay_delete(relay);
                relay = next_relay;
        }
@@ -1296,6 +1360,7 @@ dmsg_lnk_relay(dmsg_msg_t *msg)
 
        if (msg->any.head.cmd & DMSGF_DELETE) {
                pthread_mutex_lock(&cluster_mtx);
+               fprintf(stderr, "RELAY DELETE FROM LNK_RELAY MSG\n");
                if ((relay = state->any.relay) != NULL) {
                        dmsg_relay_delete(relay);
                } else {
@@ -1305,7 +1370,9 @@ dmsg_lnk_relay(dmsg_msg_t *msg)
        }
 }
 
-
+/*
+ * cluster_mtx held by caller
+ */
 static
 void
 dmsg_relay_delete(h2span_relay_t *relay)
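Given the locking contract documented above, a call site that does not already hold cluster_mtx would need a wrapper along these lines (hypothetical; dmsg_relay_delete() and cluster_mtx both appear in this file):

/* callable without cluster_mtx held */
static void
dmsg_relay_delete_unlocked(h2span_relay_t *relay)
{
        pthread_mutex_lock(&cluster_mtx);
        dmsg_relay_delete(relay);
        pthread_mutex_unlock(&cluster_mtx);
}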
@@ -1445,20 +1512,27 @@ dmsg_circuit_relay(dmsg_msg_t *msg)
        /*
         * Lookup the circuit on the incoming iocom.
         */
-       pthread_mutex_lock(&cluster_mtx);
+       pthread_mutex_lock(&iocom->mtx);
 
        dummy.msgid = msg->any.head.circuit;
        circ = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &dummy);
        assert(circ);
        peer = circ->peer;
+       dmsg_circuit_hold(peer);
+
+       if (DMsgDebugOpt >= 4) {
+               fprintf(stderr,
+                       "CIRC relay %08x %p->%p\n",
+                       msg->any.head.cmd, circ, peer);
+       }
 
        msg->iocom = peer->iocom;
        msg->any.head.circuit = peer->msgid;
+       dmsg_circuit_drop_locked(msg->circuit);
+       msg->circuit = peer;
 
-       pthread_mutex_unlock(&cluster_mtx);
+       pthread_mutex_unlock(&iocom->mtx);
 
-       fprintf(stderr, "ROUTE MESSAGE VC %08x to %08x\n",
-               (uint32_t)circ->msgid, (uint32_t)peer->msgid);
        dmsg_msg_write(msg);
        error = DMSG_IOQ_ERROR_ROUTED;
 
@@ -1527,6 +1601,8 @@ dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
 
 /*
  * Dumps the spanning tree
+ *
+ * DEBUG ONLY
  */
 void
 dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
@@ -1534,6 +1610,7 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
        h2span_cluster_t *cls;
        h2span_node_t *node;
        h2span_link_t *slink;
+       h2span_relay_t *relay;
        char *uustr = NULL;
 
        pthread_mutex_lock(&cluster_mtx);
@@ -1549,9 +1626,18 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
                                node->fs_label);
                        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
                                dmsg_circuit_printf(circuit,
-                                           "\tLink dist=%d via %d\n",
+                                           "\tSLink msgid %016jx "
+                                           "dist=%d via %d\n",
+                                           (intmax_t)slink->state->msgid,
                                            slink->dist,
                                            slink->state->iocom->sock_fd);
+                               TAILQ_FOREACH(relay, &slink->relayq, entry) {
+                                       dmsg_circuit_printf(circuit,
+                                           "\t    Relay-out msgid %016jx "
+                                           "via %d\n",
+                                           (intmax_t)relay->target_rt->msgid,
+                                           relay->target_rt->iocom->sock_fd);
+                               }
                        }
                }
        }
@@ -1564,6 +1650,39 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
 #endif
 }
 
+/*
+ * DEBUG ONLY
+ *
+ * Locate the state representing an incoming LNK_SPAN given its msgid.
+ */
+int
+dmsg_debug_findspan(uint64_t msgid, dmsg_state_t **statep)
+{
+       h2span_cluster_t *cls;
+       h2span_node_t *node;
+       h2span_link_t *slink;
+
+       pthread_mutex_lock(&cluster_mtx);
+       RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
+               RB_FOREACH(node, h2span_node_tree, &cls->tree) {
+                       RB_FOREACH(slink, h2span_link_tree, &node->tree) {
+                               if (slink->state->msgid == msgid) {
+                                       *statep = slink->state;
+                                       goto found;
+                               }
+                       }
+               }
+       }
+       pthread_mutex_unlock(&cluster_mtx);
+       *statep = NULL;
+       return(ENOENT);
+found:
+       pthread_mutex_unlock(&cluster_mtx);
+       return(0);
+}
+
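A sketch of how a debugging shell command might consume the new lookup (hypothetical caller; dmsg_circuit_printf() with this calling shape is used earlier in the file):

void
dmsg_shell_span(dmsg_circuit_t *circuit, uint64_t msgid)
{
        dmsg_state_t *state;

        if (dmsg_debug_findspan(msgid, &state) == 0) {
                dmsg_circuit_printf(circuit, "SPAN %016jx state %p\n",
                                    (intmax_t)msgid, state);
        } else {
                dmsg_circuit_printf(circuit, "SPAN %016jx not found\n",
                                    (intmax_t)msgid);
        }
}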
 /*
  * Random number sub-sort value to add to SPAN rnss fields on relay.
  * This allows us to differentiate spans with the same <dist> field