/*
 * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS
 *
 * This code supports the LNK_SPAN protocol.  Essentially all PFS's
 * clients and services rendezvous with the userland hammer2 service and
 * open LNK_SPAN transactions using a message header linkid of 0,
 * registering any PFS's they have connectivity to with us.
 *
 * --
 *
 * Each registration maintains its own open LNK_SPAN message transaction.
 * The SPANs are collected, aggregated, and retransmitted over available
 * connections through the maintenance of additional LNK_SPAN message
 * transactions on each link.
 *
 * The msgid for each active LNK_SPAN transaction we receive allows us to
 * send a message to the target PFS (which might be one of many belonging
 * to the same cluster), by specifying that msgid as the linkid in any
 * message we send to the target PFS.
 *
 * Similarly, we allocate a msgid for each LNK_SPAN transaction we transmit
 * (and remember we will maintain multiple open LNK_SPAN transactions on
 * each connection representing the topology span, so every node sees every
 * other node as a separate open transaction).  The msgid for these active
 * transactions which we initiated can likewise be used by the other end
 * to route messages through us to another node, ultimately winding up at
 * the identified hammer2 PFS.  We have to adjust the spanid in the message
 * header at each hop to be representative of the outgoing LNK_SPAN we
 * are forwarding the message through.
 *
 * --
 *
 * If we were to retransmit every LNK_SPAN transaction we receive it would
 * create a huge mess, so we have to aggregate all received LNK_SPAN
 * transactions, sort them by the fsid (the cluster) and sub-sort them by
 * the pfs_fsid (individual nodes in the cluster), and only retransmit
 * (create outgoing transactions) for a subset of the nearest distance-hops
 * for each individual node.
 *
 * The higher level protocols can then issue transactions to the nodes making
 * up a cluster to perform all actions required.
 *
 * --
 *
 * Since this is a large topology and a spanning tree protocol, links can
 * go up and down all the time.  Any time a link goes down its transaction
 * is closed.  The transaction has to be closed on both ends before we can
 * delete (and potentially reuse) the related spanid.  The LNK_SPAN being
 * closed may have been propagated out to other connections and those related
 * LNK_SPANs are also closed.  Ultimately all routes via the lost LNK_SPAN
 * go away, ultimately reaching all sources and all targets.
 *
 * Any messages in-transit using a route that goes away will be thrown away.
 * Open transactions are only tracked at the two end-points.  When a link
 * failure propagates to an end-point the related open transactions lose
 * their spanid and are automatically aborted.
 *
 * It is important to note that internal route nodes cannot just associate
 * a lost LNK_SPAN transaction with another route to the same destination.
 * Message transactions MUST be serialized and MUST be ordered.  All messages
 * for a transaction must run over the same route.  So if the route used by
 * an active transaction is lost, the related messages will be fully aborted
 * and the higher protocol levels will retry as appropriate.
 *
 * It is also important to note that several paths to the same PFS can be
 * propagated along the same link, which allows concurrency and even
 * redundancy over several network interfaces or via different routes through
 * the topology.  Any given transaction will use only a single route but busy
 * servers will often have hundreds of transactions active simultaneously,
 * so having multiple active paths through the network topology for A<->B
 * will improve performance.
 *
 * --
 *
 * Most protocols consolidate operations rather than simply relaying them.
 * This is particularly true of LEAF protocols (such as strict HAMMER2
 * clients), of which there can be millions connecting into the cluster at
 * various points.  The SPAN protocol is not used for these LEAF elements.
 *
 * Instead the primary service they connect to implements a proxy for the
 * client protocols so the core topology only has to propagate a couple of
 * LNK_SPANs and not millions.  LNK_SPANs are meant to be used only for
 * core master nodes and satellite slaves and cache nodes.
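 *
 * Illustration of the aggregation rules (topology and numbers assumed,
 * not from a live cluster): if we hear a SPAN for PFS X directly from X
 * at dist=1 and also via node B at dist=2, both arrivals sort under the
 * same {fsid, pfs_fsid} node and only the nearest subset is
 * re-advertised to our other connections, with dist bumped by one for
 * the extra hop.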
 */

#include "hammer2.h"

/*
 * Maximum spanning tree distance.  This has the practical effect of
 * stopping tail-chasing closed loops when a feeder span is lost.
 */
#define HAMMER2_SPAN_MAXDIST    16
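
/*
 * Illustration (hop counts assumed): if a feeder span is lost while a
 * residual loop remains in the graph, each relay hop re-advertises the
 * span with dist+1 (see hammer2_relay_scan_specific()).  Once dist
 * exceeds HAMMER2_SPAN_MAXDIST the span is no longer relayed, so the
 * orphaned advertisement dies out instead of chasing its tail forever.
 */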

/*
 * RED-BLACK TREE DEFINITIONS
 *
 * We need to track:
 *
 * (1) shared fsid's (a cluster).
 * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions.
 *
 * We need to aggregate all active LNK_SPANs and create our own outgoing
 * LNK_SPAN transactions on each of our connections representing the
 * aggregated state.
 *
 * h2span_connect       - list of iocom connections who wish to receive SPAN
 *                        propagation from other connections.  Might contain
 *                        a filter string.  Only iocoms with an open
 *                        LNK_CONN transaction are applicable for SPAN
 *                        propagation.
 *
 * h2span_relay         - List of links relayed (via SPAN).  Essentially
 *                        each relay structure represents a LNK_SPAN
 *                        transaction that we initiated, versus h2span_link
 *                        which is a LNK_SPAN transaction that we received.
 *
 * --
 *
 * h2span_cluster       - Organizes the shared fsid's.  One structure for
 *                        each cluster.
 *
 * h2span_node          - Organizes the nodes in a cluster.  One structure
 *                        for each unique {cluster,node}, aka {fsid, pfs_fsid}.
 *
 * h2span_link          - Organizes all incoming and outgoing LNK_SPAN message
 *                        transactions related to a node.
 *
 *                        One h2span_link structure for each incoming LNK_SPAN
 *                        transaction.  Links selected for propagation back
 *                        out are also where the outgoing LNK_SPAN messages
 *                        are indexed into (so we can propagate changes).
 *
 *                        The h2span_link structures use a red-black tree to
 *                        sort the distance hop metric for the incoming
 *                        LNK_SPAN.  We then select the top N for outgoing.
 *                        When the topology changes the top N may also change
 *                        and cause new outgoing LNK_SPAN transactions to be
 *                        opened and less desirable ones to be closed,
 *                        causing transactional aborts within the message
 *                        flow in the process.
 *
 * Also note            - All outgoing LNK_SPAN message transactions are also
 *                        entered into a red-black tree for use by the routing
 *                        function.  This is handled by msg.c in the state
 *                        code, not here.
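 *
 * Summary of the relationships described above:
 *
 *	cluster_tree -> h2span_cluster -> h2span_node -> h2span_link (rx)
 *	connq -> h2span_connect -> h2span_relay (tx, also on link->relayq)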
 */

struct h2span_link;
struct h2span_relay;
TAILQ_HEAD(h2span_connect_queue, h2span_connect);
TAILQ_HEAD(h2span_relay_queue, h2span_relay);

RB_HEAD(h2span_cluster_tree, h2span_cluster);
RB_HEAD(h2span_node_tree, h2span_node);
RB_HEAD(h2span_link_tree, h2span_link);
RB_HEAD(h2span_relay_tree, h2span_relay);

/*
 * A received LNK_CONN transaction enables the SPAN protocol over the
 * connection (may contain a filter).
 */
struct h2span_connect {
	TAILQ_ENTRY(h2span_connect) entry;
	struct h2span_relay_tree tree;
	hammer2_state_t *state;
};

/*
 * All received LNK_SPANs are organized by cluster (pfs_clid),
 * node (pfs_fsid), and link (received LNK_SPAN transaction).
 */
struct h2span_cluster {
	RB_ENTRY(h2span_cluster) rbnode;
	struct h2span_node_tree tree;
	uuid_t	pfs_clid;		/* shared fsid */
};

struct h2span_node {
	RB_ENTRY(h2span_node) rbnode;
	struct h2span_link_tree tree;
	struct h2span_cluster *cls;
	uuid_t	pfs_fsid;		/* unique fsid */
	char label[64];
};

struct h2span_link {
	RB_ENTRY(h2span_link) rbnode;
	hammer2_state_t *state;		/* state<->link */
	struct h2span_node *node;	/* related node */
	int32_t	dist;
	struct h2span_relay_queue relayq; /* relay out */
};

/*
 * Any LNK_SPAN transactions we receive which are relayed out other
 * connections utilize this structure to track the LNK_SPAN transaction
 * we initiate on the other connections, if selected for relay.
 *
 * In many respects this is the core of the protocol... actually figuring
 * out what LNK_SPANs to relay.  The spanid used for relaying is the
 * address of the 'state' structure, which is why h2span_relay has to
 * be entered into a RB-TREE based at h2span_connect (so we can look
 * up the spanid to validate it).
 */
struct h2span_relay {
	RB_ENTRY(h2span_relay) rbnode;	/* from h2span_connect */
	TAILQ_ENTRY(h2span_relay) entry; /* from link */
	struct h2span_connect *conn;
	hammer2_state_t *state;		/* transmitted LNK_SPAN */
	struct h2span_link *link;	/* received LNK_SPAN */
};


typedef struct h2span_connect h2span_connect_t;
typedef struct h2span_cluster h2span_cluster_t;
typedef struct h2span_node h2span_node_t;
typedef struct h2span_link h2span_link_t;
typedef struct h2span_relay h2span_relay_t;

static
int
h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
{
	return(uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL));
}

static
int
h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
{
	return(uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL));
}

/*
 * NOTE: Sort/subsort must match h2span_relay_cmp() under any given
 *	 node.
 */
static
int
h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
{
	if (link1->dist < link2->dist)
		return(-1);
	if (link1->dist > link2->dist)
		return(1);
	if ((intptr_t)link1 < (intptr_t)link2)
		return(-1);
	if ((intptr_t)link1 > (intptr_t)link2)
		return(1);
	return(0);
}

/*
 * Relay entries are sorted by node, subsorted by distance and link
 * address (so we can match up the conn->tree relay topology with
 * a node's link topology).
 */
static
int
h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
{
	if ((intptr_t)relay1->link->node < (intptr_t)relay2->link->node)
		return(-1);
	if ((intptr_t)relay1->link->node > (intptr_t)relay2->link->node)
		return(1);
	if ((intptr_t)relay1->link->dist < (intptr_t)relay2->link->dist)
		return(-1);
	if ((intptr_t)relay1->link->dist > (intptr_t)relay2->link->dist)
		return(1);
	if ((intptr_t)relay1->link < (intptr_t)relay2->link)
		return(-1);
	if ((intptr_t)relay1->link > (intptr_t)relay2->link)
		return(1);
	return(0);
}

RB_PROTOTYPE_STATIC(h2span_cluster_tree, h2span_cluster,
	     rbnode, h2span_cluster_cmp);
RB_PROTOTYPE_STATIC(h2span_node_tree, h2span_node,
	     rbnode, h2span_node_cmp);
RB_PROTOTYPE_STATIC(h2span_link_tree, h2span_link,
	     rbnode, h2span_link_cmp);
RB_PROTOTYPE_STATIC(h2span_relay_tree, h2span_relay,
	     rbnode, h2span_relay_cmp);

RB_GENERATE_STATIC(h2span_cluster_tree, h2span_cluster,
	     rbnode, h2span_cluster_cmp);
RB_GENERATE_STATIC(h2span_node_tree, h2span_node,
	     rbnode, h2span_node_cmp);
RB_GENERATE_STATIC(h2span_link_tree, h2span_link,
	     rbnode, h2span_link_cmp);
RB_GENERATE_STATIC(h2span_relay_tree, h2span_relay,
	     rbnode, h2span_relay_cmp);

/*
 * Global mutex protects cluster_tree lookups.
 */
static pthread_mutex_t cluster_mtx;
static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq);

static void hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg);
static void hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg);
static void hammer2_lnk_relay(hammer2_state_t *state, hammer2_msg_t *msg);
static void hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node);
static void hammer2_relay_delete(h2span_relay_t *relay);

/*
 * Receive a HAMMER2_MSG_PROTO_LNK message.  This is only called for
 * one-way and opening transactions, since state->func will be assigned
 * in all other cases.
 */
void
hammer2_msg_lnk(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
{
	switch(msg->any.head.cmd & HAMMER2_MSGF_BASECMDMASK) {
	case HAMMER2_LNK_CONN:
		hammer2_lnk_conn(msg->state, msg);
		break;
	case HAMMER2_LNK_SPAN:
		hammer2_lnk_span(msg->state, msg);
		break;
	default:
		fprintf(stderr,
			"MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
		hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
		/* state invalid after reply */
		break;
	}
}
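
#if 0
/*
 * Sketch only (not compiled in): how the transmitting side might open
 * the LNK_CONN transaction that the dispatcher above receives.  This
 * mirrors the alloc/write pattern used by the relay code below; the
 * helper name, its parameters, and passing a NULL reply callback are
 * assumptions, not part of this file's API.
 */
static void
example_open_lnk_conn(hammer2_iocom_t *iocom, const uuid_t *pfs_clid,
		      const char *label, hammer2_state_t **statep)
{
	hammer2_msg_t *msg;

	/* open the transaction: LNK_CONN command with the CREATE flag */
	msg = hammer2_msg_alloc(iocom, 0,
				HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE);
	msg->any.lnk_conn.pfs_clid = *pfs_clid;
	snprintf(msg->any.lnk_conn.label, sizeof(msg->any.lnk_conn.label),
		 "%s", label);
	hammer2_msg_write(iocom, msg, NULL, NULL, statep);
}
#endif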

void
hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg)
{
	h2span_connect_t *conn;
	h2span_relay_t *relay;
	char *alloc = NULL;

	pthread_mutex_lock(&cluster_mtx);

	/*
	 * On transaction start we allocate a new h2span_connect and
	 * acknowledge the request, leaving the transaction open.
	 * We then relay priority-selected SPANs.
	 */
	if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
		state->func = hammer2_lnk_conn;

		fprintf(stderr, "LNK_CONN(%08x): %s/%s\n",
			(uint32_t)msg->any.head.msgid,
			hammer2_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
					    &alloc),
			msg->any.lnk_conn.label);
		free(alloc);

		conn = hammer2_alloc(sizeof(*conn));

		RB_INIT(&conn->tree);
		conn->state = state;
		state->any.conn = conn;
		TAILQ_INSERT_TAIL(&connq, conn, entry);

		hammer2_msg_result(state->iocom, msg, 0);

		/*
		 * Span-synchronize all nodes with the new connection
		 */
		hammer2_relay_scan(conn, NULL);
	}

	/*
	 * On transaction terminate we clean out our h2span_connect
	 * and acknowledge the request, closing the transaction.
	 */
	if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
		fprintf(stderr, "LNK_CONN: Terminated\n");
		conn = state->any.conn;
		assert(conn);

		/*
		 * Clean out all relays.  This requires terminating each
		 * relay transaction.
		 */
		while ((relay = RB_ROOT(&conn->tree)) != NULL) {
			hammer2_relay_delete(relay);
		}

		/*
		 * Clean out conn
		 */
		conn->state = NULL;
		msg->state->any.conn = NULL;
		TAILQ_REMOVE(&connq, conn, entry);
		hammer2_free(conn);

		hammer2_msg_reply(state->iocom, msg, 0);
		/* state invalid after reply */
	}
	pthread_mutex_unlock(&cluster_mtx);
}

void
hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
{
	h2span_cluster_t dummy_cls;
	h2span_node_t dummy_node;
	h2span_cluster_t *cls;
	h2span_node_t *node;
	h2span_link_t *slink;
	h2span_relay_t *relay;
	char *alloc = NULL;

	pthread_mutex_lock(&cluster_mtx);

	/*
	 * On transaction start we initialize the tracking infrastructure
	 */
	if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
		state->func = hammer2_lnk_span;

		msg->any.lnk_span.label[sizeof(msg->any.lnk_span.label)-1] = 0;

		fprintf(stderr, "LNK_SPAN: %s/%s dist=%d\n",
			hammer2_uuid_to_str(&msg->any.lnk_span.pfs_clid,
					    &alloc),
			msg->any.lnk_span.label,
			msg->any.lnk_span.dist);
		free(alloc);

		/*
		 * Find the cluster
		 */
		dummy_cls.pfs_clid = msg->any.lnk_span.pfs_clid;
		cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
		if (cls == NULL) {
			cls = hammer2_alloc(sizeof(*cls));
			cls->pfs_clid = msg->any.lnk_span.pfs_clid;
			RB_INIT(&cls->tree);
			RB_INSERT(h2span_cluster_tree, &cluster_tree, cls);
		}

		/*
		 * Find the node
		 */
		dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
		node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
		if (node == NULL) {
			node = hammer2_alloc(sizeof(*node));
			node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
			node->cls = cls;
			RB_INIT(&node->tree);
			RB_INSERT(h2span_node_tree, &cls->tree, node);
			snprintf(node->label, sizeof(node->label),
				 "%s", msg->any.lnk_span.label);
		}

		/*
		 * Create the link
		 */
		assert(state->any.link == NULL);
		slink = hammer2_alloc(sizeof(*slink));
		TAILQ_INIT(&slink->relayq);
		slink->node = node;
		slink->dist = msg->any.lnk_span.dist;
		slink->state = state;
		state->any.link = slink;
		RB_INSERT(h2span_link_tree, &node->tree, slink);

		hammer2_relay_scan(NULL, node);
	}

	/*
	 * On transaction terminate we remove the tracking infrastructure.
	 */
	if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
		slink = state->any.link;
		assert(slink != NULL);
		node = slink->node;
		cls = node->cls;

		/*
		 * Clean out all relays.  This requires terminating each
		 * relay transaction.
		 */
		while ((relay = TAILQ_FIRST(&slink->relayq)) != NULL) {
			hammer2_relay_delete(relay);
		}

		/*
		 * Clean out the topology
		 */
		RB_REMOVE(h2span_link_tree, &node->tree, slink);
		if (RB_EMPTY(&node->tree)) {
			RB_REMOVE(h2span_node_tree, &cls->tree, node);
			if (RB_EMPTY(&cls->tree)) {
				RB_REMOVE(h2span_cluster_tree,
					  &cluster_tree, cls);
				hammer2_free(cls);
			}
			node->cls = NULL;
			hammer2_free(node);
			node = NULL;
		}
		state->any.link = NULL;
		slink->state = NULL;
		slink->node = NULL;
		hammer2_free(slink);

		/*
		 * We have to terminate the transaction
		 */
		hammer2_state_reply(state, 0);
		/* state invalid after reply */

		/*
		 * If the node still exists issue any required updates.  If
		 * it doesn't then all related relays have already been
		 * removed and there's nothing left to do.
		 */
		if (node)
			hammer2_relay_scan(NULL, node);
	}

	pthread_mutex_unlock(&cluster_mtx);
}

/*
 * Messages received on relay SPANs.  These are open transactions so it is
 * in fact possible for the other end to close the transaction.
 *
 * XXX MPRACE on state structure
 */
static void
hammer2_lnk_relay(hammer2_state_t *state, hammer2_msg_t *msg)
{
	h2span_relay_t *relay;

	if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
		pthread_mutex_lock(&cluster_mtx);
		if ((relay = state->any.relay) != NULL) {
			hammer2_relay_delete(relay);
		} else {
			hammer2_state_reply(state, 0);
		}
		pthread_mutex_unlock(&cluster_mtx);
	}
}

/*
 * Update relay transactions for SPANs.
 *
 * Called with cluster_mtx held.
 */
static void hammer2_relay_scan_specific(h2span_node_t *node,
					h2span_connect_t *conn);

static void
hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node)
{
	h2span_cluster_t *cls;

	if (node) {
		/*
		 * Iterate specific node
		 */
		TAILQ_FOREACH(conn, &connq, entry)
			hammer2_relay_scan_specific(node, conn);
	} else {
		/*
		 * Full iteration.
		 *
		 * Iterate cluster ids, nodes, and either a specific connection
		 * or all connections.
		 */
		RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
			/*
			 * Iterate node ids
			 */
			RB_FOREACH(node, h2span_node_tree, &cls->tree) {
				/*
				 * Synchronize the node's link (received SPANs)
				 * with each connection's relays.
				 */
				if (conn) {
					hammer2_relay_scan_specific(node, conn);
				} else {
					TAILQ_FOREACH(conn, &connq, entry) {
					    hammer2_relay_scan_specific(node,
									conn);
					}
					assert(conn == NULL);
				}
			}
		}
	}
}

/*
 * Update the relay'd SPANs for this (node, conn).
 *
 * Iterate links and adjust relays to match.  We currently propagate up to
 * the top two links per node (see the count initialization below).
 *
 * The hammer2_relay_scan_cmp() function locates the first relay element
 * for any given node.  The relay elements will be sub-sorted by dist.
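 *
 * The scan below is effectively a merge of two sequences sorted the same
 * way (the node's link tree and this connection's relay tree): matching
 * pairs advance both cursors, missing relays are constructed, and any
 * relays left over for this node are deleted as excess.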
 */
struct relay_scan_info {
	h2span_node_t *node;
	h2span_relay_t *relay;
};

static int
hammer2_relay_scan_cmp(h2span_relay_t *relay, void *arg)
{
	struct relay_scan_info *info = arg;

	if ((intptr_t)relay->link->node < (intptr_t)info->node)
		return(-1);
	if ((intptr_t)relay->link->node > (intptr_t)info->node)
		return(1);
	return(0);
}

static int
hammer2_relay_scan_callback(h2span_relay_t *relay, void *arg)
{
	struct relay_scan_info *info = arg;

	info->relay = relay;
	return(-1);
}

static void
hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn)
{
	struct relay_scan_info info;
	h2span_relay_t *relay;
	h2span_relay_t *next_relay;
	h2span_link_t *slink;
	int count = 2;

	info.node = node;
	info.relay = NULL;

	/*
	 * Locate the first related relay for the connection.  relay will
	 * be NULL if there were none.
	 */
	RB_SCAN(h2span_relay_tree, &conn->tree,
		hammer2_relay_scan_cmp, hammer2_relay_scan_callback, &info);
	relay = info.relay;
	info.relay = NULL;
	if (relay)
		assert(relay->link->node == node);

	if (DebugOpt > 8)
		fprintf(stderr, "relay scan for connection %p\n", conn);

	/*
	 * Iterate the node's links (received SPANs) in distance order,
	 * lowest (best) dist first.
	 */
	RB_FOREACH(slink, h2span_link_tree, &node->tree) {
		/*
		 * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION.
		 *
		 * Track relays while iterating the best links and construct
		 * missing relays when necessary.
		 *
		 * (If some prior better link was removed it would have also
		 *  removed the relay, so the relay can only match exactly or
		 *  be worse.)
		 */
		if (relay && relay->link == slink) {
			/*
			 * Match, get the next relay to match against the
			 * next slink.
			 */
			relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
			if (--count == 0)
				break;
		} else if (slink->dist > HAMMER2_SPAN_MAXDIST) {
			/*
			 * No match but span distance is too great,
			 * do not relay.  This prevents endless closed
			 * loops with ever-incrementing distances when
			 * the seed span is lost in the graph.
			 */
			/* no code needed */
		} else {
			/*
			 * No match, distance is ok, construct a new relay.
			 */
			hammer2_msg_t *msg;

			assert(relay == NULL ||
			       relay->link->dist <= slink->dist);
			relay = hammer2_alloc(sizeof(*relay));
			relay->conn = conn;
			relay->link = slink;

			RB_INSERT(h2span_relay_tree, &conn->tree, relay);
			TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);

			msg = hammer2_msg_alloc(conn->state->iocom, 0,
						HAMMER2_LNK_SPAN |
						HAMMER2_MSGF_CREATE);
			msg->any.lnk_span = slink->state->msg->any.lnk_span;
			++msg->any.lnk_span.dist; /* XXX add weighting */

			hammer2_msg_write(conn->state->iocom, msg,
					  hammer2_lnk_relay, relay,
					  &relay->state);
			fprintf(stderr,
				"RELAY SPAN ON CLS=%p NODE=%p DIST=%d "
				"FD %d state %p\n",
				node->cls, node, slink->dist,
				conn->state->iocom->sock_fd, relay->state);

			/*
			 * Match (created new relay), get the next relay to
			 * match against the next slink.
			 */
			relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
			if (--count == 0)
				break;
		}
	}

	/*
	 * Any remaining relays belonging to this connection which match
	 * the node are in excess of the current aggregate spanning state
	 * and should be removed.
	 */
	while (relay && relay->link->node == node) {
		next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
		hammer2_relay_delete(relay);
		relay = next_relay;
	}
}

static
void
hammer2_relay_delete(h2span_relay_t *relay)
{
	fprintf(stderr,
		"RELAY DELETE ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
		relay->link->node->cls, relay->link->node,
		relay->link->dist,
		relay->conn->state->iocom->sock_fd, relay->state);

	RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
	TAILQ_REMOVE(&relay->link->relayq, relay, entry);

	if (relay->state) {
		relay->state->any.relay = NULL;
		hammer2_state_reply(relay->state, 0);
		/* state invalid after reply */
		relay->state = NULL;
	}
	relay->conn = NULL;
	relay->link = NULL;
	hammer2_free(relay);
}

/*
 * Dumps the spanning tree
 */
void
shell_tree(hammer2_iocom_t *iocom, char *cmdbuf __unused)
{
	h2span_cluster_t *cls;
	h2span_node_t *node;
	h2span_link_t *slink;
	char *uustr = NULL;

	pthread_mutex_lock(&cluster_mtx);
	RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
		iocom_printf(iocom, "Cluster %s\n",
			     hammer2_uuid_to_str(&cls->pfs_clid, &uustr));
		RB_FOREACH(node, h2span_node_tree, &cls->tree) {
			iocom_printf(iocom, "    Node %s (%s)\n",
				 hammer2_uuid_to_str(&node->pfs_fsid, &uustr),
				 node->label);
			RB_FOREACH(slink, h2span_link_tree, &node->tree) {
				iocom_printf(iocom, "\tLink dist=%d via %d\n",
					     slink->dist,
					     slink->state->iocom->sock_fd);
			}
		}
	}
	pthread_mutex_unlock(&cluster_mtx);
	if (uustr)
		free(uustr);
#if 0
	TAILQ_FOREACH(conn, &connq, entry) {
	}
#endif
}