cluster - more libdmsg work
[dragonfly.git] / lib / libdmsg / msg_lnk.c
index 2544d8b..e2e2b73 100644 (file)
@@ -792,6 +792,9 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
 
        /*pthread_mutex_lock(&cluster_mtx);*/
 
+       if (DMsgDebugOpt >= 4)
+               fprintf(stderr, "CIRC receive cmd=%08x\n", msg->any.head.cmd);
+
        switch (msg->any.head.cmd & (DMSGF_CREATE |
                                     DMSGF_DELETE |
                                     DMSGF_REPLY)) {
@@ -825,10 +828,17 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                                   &dummy);
                pthread_mutex_unlock(&iocomA->mtx);
                if (tx_state == NULL) {
+                       /* XXX SMP race */
                        fprintf(stderr, "dmsg_lnk_circ: no circuit\n");
                        dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
                        break;
                }
+               if (tx_state->icmd != DMSG_LNK_SPAN) {
+                       /* XXX SMP race */
+                       fprintf(stderr, "dmsg_lnk_circ: not LNK_SPAN\n");
+                       dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
+                       break;
+               }
 
                /* locate h2span_link */
                rx_state = tx_state->any.relay->source_rt;
@@ -841,7 +851,7 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                 * it received from us as <target>.
                 */
                circA = dmsg_alloc(sizeof(*circA));
-               circA->iocom = iocomA;
+               dmsg_circuit_init(iocomA, circA);
                circA->state = msg->state;      /* LNK_CIRC state */
                circA->msgid = msg->state->msgid;
                circA->span_state = tx_state;   /* H2SPAN_RELAY state */
@@ -852,6 +862,7 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                iocomB = rx_state->iocom;
 
                circB = dmsg_alloc(sizeof(*circB));
+               dmsg_circuit_init(iocomB, circB);
 
                /*
                 * Create a LNK_CIRC transaction on B
@@ -860,23 +871,40 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                                         0, DMSG_LNK_CIRC | DMSGF_CREATE,
                                         dmsg_lnk_circ, circB);
                fwd_msg->state->any.circ = circB;
-               circB->iocom = iocomB;
+               fwd_msg->any.lnk_circ.target = rx_state->msgid;
                circB->state = fwd_msg->state;  /* LNK_CIRC state */
                circB->msgid = fwd_msg->any.head.msgid;
                circB->span_state = rx_state;   /* H2SPAN_LINK state */
                circB->is_relay = 0;
                circB->refs = 2;                /* state and peer */
 
+               if (DMsgDebugOpt >= 4)
+                       fprintf(stderr, "CIRC forward %p->%p\n", circA, circB);
+
                /*
                 * Link the two circuits together.
                 */
                circA->peer = circB;
                circB->peer = circA;
 
+               if (iocomA < iocomB) {
+                       pthread_mutex_lock(&iocomA->mtx);
+                       pthread_mutex_lock(&iocomB->mtx);
+               } else {
+                       pthread_mutex_lock(&iocomB->mtx);
+                       pthread_mutex_lock(&iocomA->mtx);
+               }
                if (RB_INSERT(dmsg_circuit_tree, &iocomA->circuit_tree, circA))
                        assert(0);
                if (RB_INSERT(dmsg_circuit_tree, &iocomB->circuit_tree, circB))
                        assert(0);
+               if (iocomA < iocomB) {
+                       pthread_mutex_unlock(&iocomB->mtx);
+                       pthread_mutex_unlock(&iocomA->mtx);
+               } else {
+                       pthread_mutex_unlock(&iocomA->mtx);
+                       pthread_mutex_unlock(&iocomB->mtx);
+               }
 
                dmsg_msg_write(fwd_msg);
 
@@ -1270,6 +1298,7 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
         */
        while (relay && relay->source_rt->any.link->node == node) {
                next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
+               fprintf(stderr, "RELAY DELETE FROM EXTRAS\n");
                dmsg_relay_delete(relay);
                relay = next_relay;
        }
@@ -1331,6 +1360,7 @@ dmsg_lnk_relay(dmsg_msg_t *msg)
 
        if (msg->any.head.cmd & DMSGF_DELETE) {
                pthread_mutex_lock(&cluster_mtx);
+               fprintf(stderr, "RELAY DELETE FROM LNK_RELAY MSG\n");
                if ((relay = state->any.relay) != NULL) {
                        dmsg_relay_delete(relay);
                } else {
@@ -1340,7 +1370,9 @@ dmsg_lnk_relay(dmsg_msg_t *msg)
        }
 }
 
-
+/*
+ * cluster_mtx held by caller
+ */
 static
 void
 dmsg_relay_delete(h2span_relay_t *relay)
@@ -1480,17 +1512,26 @@ dmsg_circuit_relay(dmsg_msg_t *msg)
        /*
         * Lookup the circuit on the incoming iocom.
         */
-       pthread_mutex_lock(&cluster_mtx);
+       pthread_mutex_lock(&iocom->mtx);
 
        dummy.msgid = msg->any.head.circuit;
        circ = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &dummy);
        assert(circ);
        peer = circ->peer;
+       dmsg_circuit_hold(peer);
+
+       if (DMsgDebugOpt >= 4) {
+               fprintf(stderr,
+                       "CIRC relay %08x %p->%p\n",
+                       msg->any.head.cmd, circ, peer);
+       }
 
        msg->iocom = peer->iocom;
        msg->any.head.circuit = peer->msgid;
+       dmsg_circuit_drop_locked(msg->circuit);
+       msg->circuit = peer;
 
-       pthread_mutex_unlock(&cluster_mtx);
+       pthread_mutex_unlock(&iocom->mtx);
 
        dmsg_msg_write(msg);
        error = DMSG_IOQ_ERROR_ROUTED;
@@ -1560,6 +1601,8 @@ dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
 
 /*
  * Dumps the spanning tree
+ *
+ * DEBUG ONLY
  */
 void
 dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
@@ -1567,6 +1610,7 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
        h2span_cluster_t *cls;
        h2span_node_t *node;
        h2span_link_t *slink;
+       h2span_relay_t *relay;
        char *uustr = NULL;
 
        pthread_mutex_lock(&cluster_mtx);
@@ -1582,9 +1626,18 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
                                node->fs_label);
                        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
                                dmsg_circuit_printf(circuit,
-                                           "\tLink dist=%d via %d\n",
+                                           "\tSLink msgid %016jx "
+                                           "dist=%d via %d\n",
+                                           (intmax_t)slink->state->msgid,
                                            slink->dist,
                                            slink->state->iocom->sock_fd);
+                               TAILQ_FOREACH(relay, &slink->relayq, entry) {
+                                       dmsg_circuit_printf(circuit,
+                                           "\t    Relay-out msgid %016jx "
+                                           "via %d\n",
+                                           (intmax_t)relay->target_rt->msgid,
+                                           relay->target_rt->iocom->sock_fd);
+                               }
                        }
                }
        }
@@ -1597,6 +1650,39 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
 #endif
 }
 
+/*
+ * DEBUG ONLY
+ *
+ * Locate the state representing an incoming LNK_SPAN given its msgid.
+ * Returns 0 and sets *statep on a match, else sets *statep to NULL and
+ * returns ENOENT.  No reference is taken on the returned state.
+ */
+int
+dmsg_debug_findspan(uint64_t msgid, dmsg_state_t **statep)
+{
+       h2span_cluster_t *cls;
+       h2span_node_t *node;
+       h2span_link_t *slink;
+
+       pthread_mutex_lock(&cluster_mtx);
+       RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
+               RB_FOREACH(node, h2span_node_tree, &cls->tree) {
+                       RB_FOREACH(slink, h2span_link_tree, &node->tree) {
+                               if (slink->state->msgid == msgid) {
+                                       *statep = slink->state;
+                                       goto found;
+                               }
+                       }
+               }
+       }
+       pthread_mutex_unlock(&cluster_mtx);
+       *statep = NULL;
+       return(ENOENT);
+found:
+       pthread_mutex_unlock(&cluster_mtx);
+       return(0);
+}
+
 /*
  * Random number sub-sort value to add to SPAN rnss fields on relay.
  * This allows us to differentiate spans with the same <dist> field