5e7eb27ba704f3e05e0f0852ac5d9fe34adb833e
[dragonfly.git] / lib / libdmsg / msg_lnk.c
1 /*
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS
36  *
37  * This code supports the LNK_SPAN protocol.  Essentially all PFS's
38  * clients and services rendezvous with the userland hammer2 service and
39  * open LNK_SPAN transactions using a message header linkid of 0,
40  * registering any PFS's they have connectivity to with us.
41  *
42  * --
43  *
44  * Each registration maintains its own open LNK_SPAN message transaction.
45  * The SPANs are collected, aggregated, and retransmitted over available
46  * connections through the maintenance of additional LNK_SPAN message
47  * transactions on each link.
48  *
49  * The msgid for each active LNK_SPAN transaction we receive allows us to
50  * send a message to the target PFS (which might be one of many belonging
51  * to the same cluster), by specifying that msgid as the linkid in any
52  * message we send to the target PFS.
53  *
54  * Similarly the msgid we allocate for any LNK_SPAN transaction we transmit
55  * (and remember we will maintain multiple open LNK_SPAN transactions on
56  * each connection representing the topology span, so every node sees every
57  * other node as a separate open transaction).  So, similarly the msgid for
58  * these active transactions which we initiated can be used by the other
59  * end to route messages through us to another node, ultimately winding up
60  * at the identified hammer2 PFS.  We have to adjust the spanid in the message
61  * header at each hop to be representative of the outgoing LNK_SPAN we
62  * are forwarding the message through.
63  *
64  * --
65  *
66  * If we were to retransmit every LNK_SPAN transaction we receive it would
67  * create a huge mess, so we have to aggregate all received LNK_SPAN
68  * transactions, sort them by the fsid (the cluster) and sub-sort them by
69  * the pfs_fsid (individual nodes in the cluster), and only retransmit
70  * (create outgoing transactions) for a subset of the nearest distance-hops
71  * for each individual node.
72  *
73  * The higher level protocols can then issue transactions to the nodes making
74  * up a cluster to perform all actions required.
75  *
76  * --
77  *
78  * Since this is a large topology and a spanning tree protocol, links can
79  * go up and down all the time.  Any time a link goes down its transaction
80  * is closed.  The transaction has to be closed on both ends before we can
81  * delete (and potentially reuse) the related spanid.  The LNK_SPAN being
82  * closed may have been propagated out to other connections and those related
83  * LNK_SPANs are also closed.  Ultimately all routes via the lost LNK_SPAN
84  * go away, ultimately reaching all sources and all targets.
85  *
86  * Any messages in-transit using a route that goes away will be thrown away.
87  * Open transactions are only tracked at the two end-points.  When a link
88  * failure propagates to an end-point the related open transactions lose
89  * their spanid and are automatically aborted.
90  *
91  * It is important to note that internal route nodes cannot just associate
92  * a lost LNK_SPAN transaction with another route to the same destination.
93  * Message transactions MUST be serialized and MUST be ordered.  All messages
94  * for a transaction must run over the same route.  So if the route used by
95  * an active transaction is lost, the related messages will be fully aborted
96  * and the higher protocol levels will retry as appropriate.
97  *
98  * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation
99  * back to the originator.  Only the originator keeps track of a message.
100  * Routers just pass it through.  If a route is lost during transit the
101  * message is simply thrown away.
102  *
103  * It is also important to note that several paths to the same PFS can be
104  * propagated along the same link, which allows concurrency and even
105  * redundancy over several network interfaces or via different routes through
106  * the topology.  Any given transaction will use only a single route but busy
107  * servers will often have hundreds of transactions active simultaneously,
108  * so having multiple active paths through the network topology for A<->B
109  * will improve performance.
110  *
111  * --
112  *
113  * Most protocols consolidate operations rather than simply relaying them.
114  * This is particularly true of LEAF protocols (such as strict HAMMER2
115  * clients), of which there can be millions connecting into the cluster at
116  * various points.  The SPAN protocol is not used for these LEAF elements.
117  *
118  * Instead the primary service they connect to implements a proxy for the
119  * client protocols so the core topology only has to propagate a couple of
120  * LNK_SPANs and not millions.  LNK_SPANs are meant to be used only for
121  * core master nodes and satellite slaves and cache nodes.
122  */
123
124 #include "dmsg_local.h"
125
126 /*
127  * Maximum spanning tree distance.  This has the practical effect of
128  * stopping tail-chasing closed loops when a feeder span is lost.
129  */
130 #define DMSG_SPAN_MAXDIST       16
131
132 /*
133  * RED-BLACK TREE DEFINITIONS
134  *
135  * We need to track:
136  *
137  * (1) shared fsid's (a cluster).
138  * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions.
139  *
140  * We need to collect all active LNK_SPANs, aggregate them, and create our own
141  * outgoing LNK_SPAN transactions on each of our connections representing
142  * the aggregated state.
143  *
144  * h2span_conn          - list of iocom connections who wish to receive SPAN
145  *                        propagation from other connections.  Might contain
146  *                        a filter string.  Only iocom's with an open
147  *                        LNK_CONN transactions are applicable for SPAN
148  *                        propagation.
149  *
150  * h2span_relay         - List of links relayed (via SPAN).  Essentially
151  *                        each relay structure represents a LNK_SPAN
152  *                        transaction that we initiated, versus h2span_link
153  *                        which is a LNK_SPAN transaction that we received.
154  *
155  * --
156  *
157  * h2span_cluster       - Organizes the shared fsid's.  One structure for
158  *                        each cluster.
159  *
160  * h2span_node          - Organizes the nodes in a cluster.  One structure
161  *                        for each unique {cluster,node}, aka {fsid, pfs_fsid}.
162  *
163  * h2span_link          - Organizes all incoming and outgoing LNK_SPAN message
164  *                        transactions related to a node.
165  *
166  *                        One h2span_link structure for each incoming LNK_SPAN
167  *                        transaction.  Links selected for propagation back
168  *                        out are also where the outgoing LNK_SPAN messages
169  *                        are indexed into (so we can propagate changes).
170  *
171  *                        The h2span_link's use a red-black tree to sort the
172  *                        distance hop metric for the incoming LNK_SPAN.  We
173  *                        then select the top N for outgoing.  When the
174  *                        topology changes the top N may also change and cause
175  *                        new outgoing LNK_SPAN transactions to be opened
176  *                        and less desirable ones to be closed, causing
177  *                        transactional aborts within the message flow in
178  *                        the process.
179  *
180  * Also note            - All outgoing LNK_SPAN message transactions are also
181  *                        entered into a red-black tree for use by the routing
182  *                        function.  This is handled by msg.c in the state
183  *                        code, not here.
184  */
185
/*
 * Structures that are referenced by pointer before their definition.
 */
struct h2span_link;
struct h2span_relay;

/*
 * Red-black tree head types, one per sorted collection.
 */
RB_HEAD(h2span_cluster_tree, h2span_cluster);
RB_HEAD(h2span_node_tree, h2span_node);
RB_HEAD(h2span_link_tree, h2span_link);
RB_HEAD(h2span_relay_tree, h2span_relay);

/*
 * Tail-queue head types, one per unsorted list.
 */
TAILQ_HEAD(h2span_media_queue, h2span_media);
TAILQ_HEAD(h2span_conn_queue, h2span_conn);
TAILQ_HEAD(h2span_relay_queue, h2span_relay);
196
197 /*
198  * This represents a media
199  */
200 struct h2span_media {
201         TAILQ_ENTRY(h2span_media) entry;
202         uuid_t  mediaid;
203         int     refs;
204         struct h2span_media_config {
205                 dmsg_vol_data_t         copy_run;
206                 dmsg_vol_data_t         copy_pend;
207                 pthread_t               thread;
208                 pthread_cond_t          cond;
209                 int                     ctl;
210                 int                     fd;
211                 dmsg_iocom_t            iocom;
212                 pthread_t               iocom_thread;
213                 enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
214         } config[DMSG_COPYID_COUNT];
215 };
216
217 typedef struct h2span_media_config h2span_media_config_t;
218
219 #define H2CONFCTL_STOP          0x00000001
220 #define H2CONFCTL_UPDATE        0x00000002
221
222 /*
223  * Received LNK_CONN transaction enables SPAN protocol over connection.
224  * (may contain filter).  Typically one for each mount and several may
225  * share the same media.
226  */
227 struct h2span_conn {
228         TAILQ_ENTRY(h2span_conn) entry;
229         struct h2span_relay_tree tree;
230         struct h2span_media *media;
231         dmsg_state_t *state;
232 };
233
234 /*
235  * All received LNK_SPANs are organized by cluster (pfs_clid),
236  * node (pfs_fsid), and link (received LNK_SPAN transaction).
237  */
238 struct h2span_cluster {
239         RB_ENTRY(h2span_cluster) rbnode;
240         struct h2span_node_tree tree;
241         uuid_t  pfs_clid;               /* shared fsid */
242         uint8_t peer_type;
243         char    cl_label[128];          /* cluster label (typ PEER_BLOCK) */
244         int     refs;                   /* prevents destruction */
245 };
246
247 struct h2span_node {
248         RB_ENTRY(h2span_node) rbnode;
249         struct h2span_link_tree tree;
250         struct h2span_cluster *cls;
251         uint8_t pfs_type;
252         uuid_t  pfs_fsid;               /* unique fsid */
253         char    fs_label[128];          /* fs label (typ PEER_HAMMER2) */
254 };
255
256 struct h2span_link {
257         RB_ENTRY(h2span_link) rbnode;
258         dmsg_state_t    *state;         /* state<->link */
259         struct h2span_node *node;       /* related node */
260         int32_t dist;
261         struct h2span_relay_queue relayq; /* relay out */
262         struct dmsg_router *router;     /* route out this link */
263 };
264
265 /*
266  * Any LNK_SPAN transactions we receive which are relayed out other
267  * connections utilize this structure to track the LNK_SPAN transaction
268  * we initiate on the other connections, if selected for relay.
269  *
270  * In many respects this is the core of the protocol... actually figuring
271  * out what LNK_SPANs to relay.  The spanid used for relaying is the
272  * address of the 'state' structure, which is why h2span_relay has to
273  * be entered into a RB-TREE based at h2span_conn (so we can look
274  * up the spanid to validate it).
275  *
276  * NOTE: Messages can be received via the LNK_SPAN transaction the
277  *       relay maintains, and can be replied via relay->router, but
278  *       messages are NOT initiated via a relay.  Messages are initiated
279  *       via incoming links (h2span_link's).
280  *
281  *       relay->link represents the link being relayed, NOT the LNK_SPAN
282  *       transaction the relay is holding open.
283  */
284 struct h2span_relay {
285         RB_ENTRY(h2span_relay) rbnode;  /* from h2span_conn */
286         TAILQ_ENTRY(h2span_relay) entry; /* from link */
287         struct h2span_conn *conn;
288         dmsg_state_t    *state;         /* transmitted LNK_SPAN */
289         struct h2span_link *link;       /* LNK_SPAN being relayed */
290         struct dmsg_router      *router;/* route out this relay */
291 };
292
293
/*
 * Short type names for the tracking structures defined in this file.
 */
typedef struct h2span_media     h2span_media_t;
typedef struct h2span_conn      h2span_conn_t;
typedef struct h2span_cluster   h2span_cluster_t;
typedef struct h2span_node      h2span_node_t;
typedef struct h2span_link      h2span_link_t;
typedef struct h2span_relay     h2span_relay_t;
300
/*
 * Force NUL termination of a fixed-size character array.  The argument
 * must be a real array, not a pointer, because its size is taken with
 * sizeof.
 */
#define dmsg_termstr(array)     _dmsg_termstr((array), sizeof(array))

/*
 * Store a NUL in the last byte of the size-byte buffer at base.  Any
 * string already terminated earlier in the buffer is left untouched.
 *
 * A zero size is ignored; without the check the store would land on
 * base[-1], outside the buffer.
 */
static __inline
void
_dmsg_termstr(char *base, size_t size)
{
        if (size > 0)
                base[size - 1] = 0;
}
309
310 /*
311  * Cluster peer_type, uuid, AND label must match for a match
312  */
313 static
314 int
315 h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
316 {
317         int r;
318
319         if (cls1->peer_type < cls2->peer_type)
320                 return(-1);
321         if (cls1->peer_type > cls2->peer_type)
322                 return(1);
323         r = uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL);
324         if (r == 0)
325                 r = strcmp(cls1->cl_label, cls2->cl_label);
326
327         return r;
328 }
329
330 /*
331  * Match against the uuid.  Currently we never match against the label.
332  */
333 static
334 int
335 h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
336 {
337         int r;
338
339         r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL);
340         return (r);
341 }
342
343 /*
344  * Sort/subsort must match h2span_relay_cmp() under any given node
345  * to make the aggregation algorithm easier, so the best links are
346  * in the same sorted order as the best relays.
347  *
348  * NOTE: We cannot use link*->state->msgid because this msgid is created
349  *       by each remote host and thus might wind up being the same.
350  */
351 static
352 int
353 h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
354 {
355         if (link1->dist < link2->dist)
356                 return(-1);
357         if (link1->dist > link2->dist)
358                 return(1);
359 #if 1
360         if ((uintptr_t)link1->state < (uintptr_t)link2->state)
361                 return(-1);
362         if ((uintptr_t)link1->state > (uintptr_t)link2->state)
363                 return(1);
364 #else
365         if (link1->state->msgid < link2->state->msgid)
366                 return(-1);
367         if (link1->state->msgid > link2->state->msgid)
368                 return(1);
369 #endif
370         return(0);
371 }
372
373 /*
374  * Relay entries are sorted by node, subsorted by distance and link
375  * address (so we can match up the conn->tree relay topology with
376  * a node's link topology).
377  */
378 static
379 int
380 h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
381 {
382         h2span_link_t *link1 = relay1->link;
383         h2span_link_t *link2 = relay2->link;
384
385         if ((intptr_t)link1->node < (intptr_t)link2->node)
386                 return(-1);
387         if ((intptr_t)link1->node > (intptr_t)link2->node)
388                 return(1);
389         if (link1->dist < link2->dist)
390                 return(-1);
391         if (link1->dist > link2->dist)
392                 return(1);
393 #if 1
394         if ((uintptr_t)link1->state < (uintptr_t)link2->state)
395                 return(-1);
396         if ((uintptr_t)link1->state > (uintptr_t)link2->state)
397                 return(1);
398 #else
399         if (link1->state->msgid < link2->state->msgid)
400                 return(-1);
401         if (link1->state->msgid > link2->state->msgid)
402                 return(1);
403 #endif
404         return(0);
405 }
406
/*
 * Declare and instantiate the file-local red-black tree functions for
 * each tree type, using the comparison function defined for it above.
 */
RB_PROTOTYPE_STATIC(h2span_cluster_tree, h2span_cluster, rbnode,
                    h2span_cluster_cmp);
RB_GENERATE_STATIC(h2span_cluster_tree, h2span_cluster, rbnode,
                   h2span_cluster_cmp);

RB_PROTOTYPE_STATIC(h2span_node_tree, h2span_node, rbnode,
                    h2span_node_cmp);
RB_GENERATE_STATIC(h2span_node_tree, h2span_node, rbnode,
                   h2span_node_cmp);

RB_PROTOTYPE_STATIC(h2span_link_tree, h2span_link, rbnode,
                    h2span_link_cmp);
RB_GENERATE_STATIC(h2span_link_tree, h2span_link, rbnode,
                   h2span_link_cmp);

RB_PROTOTYPE_STATIC(h2span_relay_tree, h2span_relay, rbnode,
                    h2span_relay_cmp);
RB_GENERATE_STATIC(h2span_relay_tree, h2span_relay, rbnode,
                   h2span_relay_cmp);
424
425 /*
426  * Global mutex protects cluster_tree lookups, connq, mediaq.
427  */
428 static pthread_mutex_t cluster_mtx;
429 static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
430 static struct h2span_conn_queue connq = TAILQ_HEAD_INITIALIZER(connq);
431 static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq);
432
433 static void dmsg_lnk_span(dmsg_msg_t *msg);
434 static void dmsg_lnk_conn(dmsg_msg_t *msg);
435 static void dmsg_lnk_relay(dmsg_msg_t *msg);
436 static void dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node);
437 static void dmsg_relay_delete(h2span_relay_t *relay);
438
439 static void *dmsg_volconf_thread(void *info);
440 static void dmsg_volconf_stop(h2span_media_config_t *conf);
441 static void dmsg_volconf_start(h2span_media_config_t *conf,
442                                 const char *hostname);
443
444 void
445 dmsg_msg_lnk_signal(dmsg_router_t *router __unused)
446 {
447         pthread_mutex_lock(&cluster_mtx);
448         dmsg_relay_scan(NULL, NULL);
449         pthread_mutex_unlock(&cluster_mtx);
450 }
451
452 /*
453  * Receive a DMSG_PROTO_LNK message.  This only called for
454  * one-way and opening-transactions since state->func will be assigned
455  * in all other cases.
456  */
457 void
458 dmsg_msg_lnk(dmsg_msg_t *msg)
459 {
460         switch(msg->any.head.cmd & DMSGF_BASECMDMASK) {
461         case DMSG_LNK_CONN:
462                 dmsg_lnk_conn(msg);
463                 break;
464         case DMSG_LNK_SPAN:
465                 dmsg_lnk_span(msg);
466                 break;
467         default:
468                 fprintf(stderr,
469                         "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
470                 dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
471                 /* state invalid after reply */
472                 break;
473         }
474 }
475
476 void
477 dmsg_lnk_conn(dmsg_msg_t *msg)
478 {
479         dmsg_state_t *state = msg->state;
480         h2span_media_t *media;
481         h2span_media_config_t *conf;
482         h2span_conn_t *conn;
483         h2span_relay_t *relay;
484         char *alloc = NULL;
485         int i;
486
487         pthread_mutex_lock(&cluster_mtx);
488
489         switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
490         case DMSG_LNK_CONN | DMSGF_CREATE:
491         case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
492                 /*
493                  * On transaction start we allocate a new h2span_conn and
494                  * acknowledge the request, leaving the transaction open.
495                  * We then relay priority-selected SPANs.
496                  */
497                 fprintf(stderr, "LNK_CONN(%08x): %s/%s/%s\n",
498                         (uint32_t)msg->any.head.msgid,
499                         dmsg_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
500                                             &alloc),
501                         msg->any.lnk_conn.cl_label,
502                         msg->any.lnk_conn.fs_label);
503                 free(alloc);
504
505                 conn = dmsg_alloc(sizeof(*conn));
506
507                 RB_INIT(&conn->tree);
508                 conn->state = state;
509                 state->func = dmsg_lnk_conn;
510                 state->any.conn = conn;
511                 TAILQ_INSERT_TAIL(&connq, conn, entry);
512
513                 /*
514                  * Set up media
515                  */
516                 TAILQ_FOREACH(media, &mediaq, entry) {
517                         if (uuid_compare(&msg->any.lnk_conn.mediaid,
518                                          &media->mediaid, NULL) == 0) {
519                                 break;
520                         }
521                 }
522                 if (media == NULL) {
523                         media = dmsg_alloc(sizeof(*media));
524                         media->mediaid = msg->any.lnk_conn.mediaid;
525                         TAILQ_INSERT_TAIL(&mediaq, media, entry);
526                 }
527                 conn->media = media;
528                 ++media->refs;
529
530                 if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
531                         dmsg_msg_result(msg, 0);
532                         dmsg_router_signal(msg->router);
533                         break;
534                 }
535                 /* FALL THROUGH */
536         case DMSG_LNK_CONN | DMSGF_DELETE:
537         case DMSG_LNK_ERROR | DMSGF_DELETE:
538 deleteconn:
539                 /*
540                  * On transaction terminate we clean out our h2span_conn
541                  * and acknowledge the request, closing the transaction.
542                  */
543                 fprintf(stderr, "LNK_CONN: Terminated\n");
544                 conn = state->any.conn;
545                 assert(conn);
546
547                 /*
548                  * Clean out the media structure. If refs drops to zero we
549                  * also clean out the media config threads.  These threads
550                  * maintain span connections to other hammer2 service daemons.
551                  */
552                 media = conn->media;
553                 if (--media->refs == 0) {
554                         fprintf(stderr, "Shutting down media spans\n");
555                         for (i = 0; i < DMSG_COPYID_COUNT; ++i) {
556                                 conf = &media->config[i];
557
558                                 if (conf->thread == NULL)
559                                         continue;
560                                 conf->ctl = H2CONFCTL_STOP;
561                                 pthread_cond_signal(&conf->cond);
562                         }
563                         for (i = 0; i < DMSG_COPYID_COUNT; ++i) {
564                                 conf = &media->config[i];
565
566                                 if (conf->thread == NULL)
567                                         continue;
568                                 pthread_mutex_unlock(&cluster_mtx);
569                                 pthread_join(conf->thread, NULL);
570                                 pthread_mutex_lock(&cluster_mtx);
571                                 conf->thread = NULL;
572                                 pthread_cond_destroy(&conf->cond);
573                         }
574                         fprintf(stderr, "Media shutdown complete\n");
575                         TAILQ_REMOVE(&mediaq, media, entry);
576                         dmsg_free(media);
577                 }
578
579                 /*
580                  * Clean out all relays.  This requires terminating each
581                  * relay transaction.
582                  */
583                 while ((relay = RB_ROOT(&conn->tree)) != NULL) {
584                         dmsg_relay_delete(relay);
585                 }
586
587                 /*
588                  * Clean out conn
589                  */
590                 conn->media = NULL;
591                 conn->state = NULL;
592                 msg->state->any.conn = NULL;
593                 TAILQ_REMOVE(&connq, conn, entry);
594                 dmsg_free(conn);
595
596                 dmsg_msg_reply(msg, 0);
597                 /* state invalid after reply */
598                 break;
599         case DMSG_LNK_VOLCONF:
600                 /*
601                  * One-way volume-configuration message is transmitted
602                  * over the open LNK_CONN transaction.
603                  */
604                 fprintf(stderr, "RECEIVED VOLCONF\n");
605                 if (msg->any.lnk_volconf.index < 0 ||
606                     msg->any.lnk_volconf.index >= DMSG_COPYID_COUNT) {
607                         fprintf(stderr, "VOLCONF: ILLEGAL INDEX %d\n",
608                                 msg->any.lnk_volconf.index);
609                         break;
610                 }
611                 if (msg->any.lnk_volconf.copy.path[sizeof(msg->any.lnk_volconf.copy.path) - 1] != 0 ||
612                     msg->any.lnk_volconf.copy.path[0] == 0) {
613                         fprintf(stderr, "VOLCONF: ILLEGAL PATH %d\n",
614                                 msg->any.lnk_volconf.index);
615                         break;
616                 }
617                 conn = msg->state->any.conn;
618                 if (conn == NULL) {
619                         fprintf(stderr, "VOLCONF: LNK_CONN is missing\n");
620                         break;
621                 }
622                 conf = &conn->media->config[msg->any.lnk_volconf.index];
623                 conf->copy_pend = msg->any.lnk_volconf.copy;
624                 conf->ctl |= H2CONFCTL_UPDATE;
625                 if (conf->thread == NULL) {
626                         fprintf(stderr, "VOLCONF THREAD STARTED\n");
627                         pthread_cond_init(&conf->cond, NULL);
628                         pthread_create(&conf->thread, NULL,
629                                        dmsg_volconf_thread, (void *)conf);
630                 }
631                 pthread_cond_signal(&conf->cond);
632                 break;
633         default:
634                 /*
635                  * Failsafe
636                  */
637                 if (msg->any.head.cmd & DMSGF_DELETE)
638                         goto deleteconn;
639                 dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
640                 break;
641         }
642         pthread_mutex_unlock(&cluster_mtx);
643 }
644
645 void
646 dmsg_lnk_span(dmsg_msg_t *msg)
647 {
648         dmsg_state_t *state = msg->state;
649         h2span_cluster_t dummy_cls;
650         h2span_node_t dummy_node;
651         h2span_cluster_t *cls;
652         h2span_node_t *node;
653         h2span_link_t *slink;
654         h2span_relay_t *relay;
655         char *alloc = NULL;
656
657         assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
658
659         pthread_mutex_lock(&cluster_mtx);
660
661         /*
662          * On transaction start we initialize the tracking infrastructure
663          */
664         if (msg->any.head.cmd & DMSGF_CREATE) {
665                 assert(state->func == NULL);
666                 state->func = dmsg_lnk_span;
667
668                 dmsg_termstr(msg->any.lnk_span.cl_label);
669                 dmsg_termstr(msg->any.lnk_span.fs_label);
670
671                 /*
672                  * Find the cluster
673                  */
674                 dummy_cls.pfs_clid = msg->any.lnk_span.pfs_clid;
675                 dummy_cls.peer_type = msg->any.lnk_span.peer_type;
676                 bcopy(msg->any.lnk_span.cl_label,
677                       dummy_cls.cl_label,
678                       sizeof(dummy_cls.cl_label));
679                 cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
680                 if (cls == NULL) {
681                         cls = dmsg_alloc(sizeof(*cls));
682                         cls->pfs_clid = msg->any.lnk_span.pfs_clid;
683                         cls->peer_type = msg->any.lnk_span.peer_type;
684                         bcopy(msg->any.lnk_span.cl_label,
685                               cls->cl_label,
686                               sizeof(cls->cl_label));
687                         RB_INIT(&cls->tree);
688                         RB_INSERT(h2span_cluster_tree, &cluster_tree, cls);
689                 }
690
691                 /*
692                  * Find the node
693                  */
694                 dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
695                 node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
696                 if (node == NULL) {
697                         node = dmsg_alloc(sizeof(*node));
698                         node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
699                         bcopy(msg->any.lnk_span.fs_label,
700                               node->fs_label,
701                               sizeof(node->fs_label));
702                         node->cls = cls;
703                         RB_INIT(&node->tree);
704                         RB_INSERT(h2span_node_tree, &cls->tree, node);
705                 }
706
707                 /*
708                  * Create the link
709                  */
710                 assert(state->any.link == NULL);
711                 slink = dmsg_alloc(sizeof(*slink));
712                 TAILQ_INIT(&slink->relayq);
713                 slink->node = node;
714                 slink->dist = msg->any.lnk_span.dist;
715                 slink->state = state;
716                 state->any.link = slink;
717
718                 /*
719                  * Embedded router structure in link for message forwarding.
720                  *
721                  * The spanning id for the router is the message id of
722                  * the SPAN link it is embedded in, allowing messages to
723                  * be routed via &slink->router.
724                  */
725                 slink->router = dmsg_router_alloc();
726                 slink->router->iocom = state->iocom;
727                 slink->router->link = slink;
728                 slink->router->target = state->msgid;
729                 dmsg_router_connect(slink->router);
730
731                 RB_INSERT(h2span_link_tree, &node->tree, slink);
732
733                 fprintf(stderr,
734                         "LNK_SPAN(thr %p): %p %s cl=%s fs=%s dist=%d\n",
735                         msg->router->iocom,
736                         slink,
737                         dmsg_uuid_to_str(&msg->any.lnk_span.pfs_clid, &alloc),
738                         msg->any.lnk_span.cl_label,
739                         msg->any.lnk_span.fs_label,
740                         msg->any.lnk_span.dist);
741                 free(alloc);
742 #if 0
743                 dmsg_relay_scan(NULL, node);
744 #endif
745                 dmsg_router_signal(msg->router);
746         }
747
748         /*
749          * On transaction terminate we remove the tracking infrastructure.
750          */
751         if (msg->any.head.cmd & DMSGF_DELETE) {
752                 slink = state->any.link;
753                 assert(slink != NULL);
754                 node = slink->node;
755                 cls = node->cls;
756
757                 fprintf(stderr, "LNK_DELE(thr %p): %p %s cl=%s fs=%s dist=%d\n",
758                         msg->router->iocom,
759                         slink,
760                         dmsg_uuid_to_str(&cls->pfs_clid, &alloc),
761                         state->msg->any.lnk_span.cl_label,
762                         state->msg->any.lnk_span.fs_label,
763                         state->msg->any.lnk_span.dist);
764                 free(alloc);
765
766                 /*
767                  * Remove the router from consideration
768                  */
769                 dmsg_router_disconnect(&slink->router);
770
771                 /*
772                  * Clean out all relays.  This requires terminating each
773                  * relay transaction.
774                  */
775                 while ((relay = TAILQ_FIRST(&slink->relayq)) != NULL) {
776                         dmsg_relay_delete(relay);
777                 }
778
779                 /*
780                  * Clean out the topology
781                  */
782                 RB_REMOVE(h2span_link_tree, &node->tree, slink);
783                 if (RB_EMPTY(&node->tree)) {
784                         RB_REMOVE(h2span_node_tree, &cls->tree, node);
785                         if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
786                                 RB_REMOVE(h2span_cluster_tree,
787                                           &cluster_tree, cls);
788                                 dmsg_free(cls);
789                         }
790                         node->cls = NULL;
791                         dmsg_free(node);
792                         node = NULL;
793                 }
794                 state->any.link = NULL;
795                 slink->state = NULL;
796                 slink->node = NULL;
797                 dmsg_free(slink);
798
799                 /*
800                  * We have to terminate the transaction
801                  */
802                 dmsg_state_reply(state, 0);
803                 /* state invalid after reply */
804
805                 /*
806                  * If the node still exists issue any required updates.  If
807                  * it doesn't then all related relays have already been
808                  * removed and there's nothing left to do.
809                  */
810 #if 0
811                 if (node)
812                         dmsg_relay_scan(NULL, node);
813 #endif
814                 if (node)
815                         dmsg_router_signal(msg->router);
816         }
817
818         pthread_mutex_unlock(&cluster_mtx);
819 }
820
821 /*
822  * Messages received on relay SPANs.  These are open transactions so it is
823  * in fact possible for the other end to close the transaction.
824  *
825  * XXX MPRACE on state structure
826  */
827 static void
828 dmsg_lnk_relay(dmsg_msg_t *msg)
829 {
830         dmsg_state_t *state = msg->state;
831         h2span_relay_t *relay;
832
833         assert(msg->any.head.cmd & DMSGF_REPLY);
834
835         if (msg->any.head.cmd & DMSGF_DELETE) {
836                 pthread_mutex_lock(&cluster_mtx);
837                 if ((relay = state->any.relay) != NULL) {
838                         dmsg_relay_delete(relay);
839                 } else {
840                         dmsg_state_reply(state, 0);
841                 }
842                 pthread_mutex_unlock(&cluster_mtx);
843         }
844 }
845
846 /*
847  * Update relay transactions for SPANs.
848  *
849  * Called with cluster_mtx held.
850  */
851 static void dmsg_relay_scan_specific(h2span_node_t *node,
852                                         h2span_conn_t *conn);
853
854 static void
855 dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node)
856 {
857         h2span_cluster_t *cls;
858
859         if (node) {
860                 /*
861                  * Iterate specific node
862                  */
863                 TAILQ_FOREACH(conn, &connq, entry)
864                         dmsg_relay_scan_specific(node, conn);
865         } else {
866                 /*
867                  * Full iteration.
868                  *
869                  * Iterate cluster ids, nodes, and either a specific connection
870                  * or all connections.
871                  */
872                 RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
873                         /*
874                          * Iterate node ids
875                          */
876                         RB_FOREACH(node, h2span_node_tree, &cls->tree) {
877                                 /*
878                                  * Synchronize the node's link (received SPANs)
879                                  * with each connection's relays.
880                                  */
881                                 if (conn) {
882                                         dmsg_relay_scan_specific(node, conn);
883                                 } else {
884                                         TAILQ_FOREACH(conn, &connq, entry) {
885                                             dmsg_relay_scan_specific(node,
886                                                                         conn);
887                                         }
888                                         assert(conn == NULL);
889                                 }
890                         }
891                 }
892         }
893 }
894
895 /*
896  * Update the relay'd SPANs for this (node, conn).
897  *
 * Iterate links and adjust relays to match.  At most the best two links
 * (lowest dist first) are propagated per node, see the count limit in
 * dmsg_relay_scan_specific().
900  *
901  * The dmsg_relay_scan_cmp() function locates the first relay element
902  * for any given node.  The relay elements will be sub-sorted by dist.
903  */
/*
 * Argument block passed through RB_SCAN to dmsg_relay_scan_cmp() and
 * dmsg_relay_scan_callback().
 */
struct relay_scan_info {
        h2span_node_t *node;    /* in: node the comparator matches against */
        h2span_relay_t *relay;  /* out: relay stored by the scan callback */
};
908
909 static int
910 dmsg_relay_scan_cmp(h2span_relay_t *relay, void *arg)
911 {
912         struct relay_scan_info *info = arg;
913
914         if ((intptr_t)relay->link->node < (intptr_t)info->node)
915                 return(-1);
916         if ((intptr_t)relay->link->node > (intptr_t)info->node)
917                 return(1);
918         return(0);
919 }
920
921 static int
922 dmsg_relay_scan_callback(h2span_relay_t *relay, void *arg)
923 {
924         struct relay_scan_info *info = arg;
925
926         info->relay = relay;
927         return(-1);
928 }
929
930 static void
931 dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
932 {
933         struct relay_scan_info info;
934         h2span_relay_t *relay;
935         h2span_relay_t *next_relay;
936         h2span_link_t *slink;
937         dmsg_lnk_conn_t *lconn;
938         dmsg_lnk_span_t *lspan;
939         dmsg_msg_t *msg;
940         int count = 2;
941
942         info.node = node;
943         info.relay = NULL;
944
945         /*
946          * Locate the first related relay for the node on this connection.
947          * relay will be NULL if there were none.
948          */
949         RB_SCAN(h2span_relay_tree, &conn->tree,
950                 dmsg_relay_scan_cmp, dmsg_relay_scan_callback, &info);
951         relay = info.relay;
952         info.relay = NULL;
953         if (relay)
954                 assert(relay->link->node == node);
955
956         if (DMsgDebugOpt > 8)
957                 fprintf(stderr, "relay scan for connection %p\n", conn);
958
959         /*
960          * Iterate the node's links (received SPANs) in distance order,
961          * lowest (best) dist first.
962          *
963          * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION.
964          *
965          * Track relays while iterating the best links and construct
966          * missing relays when necessary.
967          *
968          * (If some prior better link was removed it would have also
969          *  removed the relay, so the relay can only match exactly or
970          *  be worse).
971          */
972         RB_FOREACH(slink, h2span_link_tree, &node->tree) {
973                 /*
974                  * Match, relay already in-place, get the next
975                  * relay to match against the next slink.
976                  */
977                 if (relay && relay->link == slink) {
978                         relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
979                         if (--count == 0)
980                                 break;
981                         continue;
982                 }
983
984                 /*
985                  * We might want this SLINK, if it passes our filters.
986                  *
987                  * The spanning tree can cause closed loops so we have
988                  * to limit slink->dist.
989                  */
990                 if (slink->dist > DMSG_SPAN_MAXDIST)
991                         break;
992
993                 /*
994                  * Don't bother transmitting a LNK_SPAN out the same
995                  * connection it came in on.  Trivial optimization.
996                  */
997                 if (slink->state->iocom == conn->state->iocom)
998                         break;
999
1000                 /*
1001                  * NOTE ON FILTERS: The protocol spec allows non-requested
1002                  * SPANs to be transmitted, the other end is expected to
1003                  * leave their transactions open but otherwise ignore them.
1004                  *
1005                  * Don't bother transmitting if the remote connection
1006                  * is not accepting this SPAN's peer_type.
1007                  */
1008                 lspan = &slink->state->msg->any.lnk_span;
1009                 lconn = &conn->state->msg->any.lnk_conn;
1010                 if (((1LLU << lspan->peer_type) & lconn->peer_mask) == 0)
1011                         break;
1012
1013                 /*
1014                  * Do not give pure clients visibility to other pure clients
1015                  */
1016                 if (lconn->pfs_type == DMSG_PFSTYPE_CLIENT &&
1017                     lspan->pfs_type == DMSG_PFSTYPE_CLIENT) {
1018                         break;
1019                 }
1020
1021                 /*
1022                  * Connection filter, if cluster uuid is not NULL it must
1023                  * match the span cluster uuid.  Only applies when the
1024                  * peer_type matches.
1025                  */
1026                 if (lspan->peer_type == lconn->peer_type &&
1027                     !uuid_is_nil(&lconn->pfs_clid, NULL) &&
1028                     uuid_compare(&slink->node->cls->pfs_clid,
1029                                  &lconn->pfs_clid, NULL)) {
1030                         break;
1031                 }
1032
1033                 /*
1034                  * Connection filter, if cluster label is not empty it must
1035                  * match the span cluster label.  Only applies when the
1036                  * peer_type matches.
1037                  */
1038                 if (lspan->peer_type == lconn->peer_type &&
1039                     lconn->cl_label[0] &&
1040                     strcmp(lconn->cl_label, slink->node->cls->cl_label)) {
1041                         break;
1042                 }
1043
1044                 /*
1045                  * NOTE! fs_uuid differentiates nodes within the same cluster
1046                  *       so we obviously don't want to match those.  Similarly
1047                  *       for fs_label.
1048                  */
1049
1050                 /*
1051                  * Ok, we've accepted this SPAN for relaying.
1052                  */
1053                 assert(relay == NULL ||
1054                        relay->link->node != slink->node ||
1055                        relay->link->dist >= slink->dist);
1056                 relay = dmsg_alloc(sizeof(*relay));
1057                 relay->conn = conn;
1058                 relay->link = slink;
1059
1060                 msg = dmsg_msg_alloc(conn->state->iocom->router, 0,
1061                                         DMSG_LNK_SPAN |
1062                                         DMSGF_CREATE,
1063                                         dmsg_lnk_relay, relay);
1064                 relay->state = msg->state;
1065                 relay->router = dmsg_router_alloc();
1066                 relay->router->iocom = relay->state->iocom;
1067                 relay->router->relay = relay;
1068                 relay->router->target = relay->state->msgid;
1069
1070                 msg->any.lnk_span = slink->state->msg->any.lnk_span;
1071                 msg->any.lnk_span.dist = slink->dist + 1;
1072
1073                 dmsg_router_connect(relay->router);
1074
1075                 RB_INSERT(h2span_relay_tree, &conn->tree, relay);
1076                 TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
1077
1078                 dmsg_msg_write(msg);
1079
1080                 fprintf(stderr,
1081                         "RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d "
1082                         "FD %d state %p\n",
1083                         slink,
1084                         relay,
1085                         node->cls, node, slink->dist,
1086                         conn->state->iocom->sock_fd, relay->state);
1087
1088                 /*
1089                  * Match (created new relay), get the next relay to
1090                  * match against the next slink.
1091                  */
1092                 relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
1093                 if (--count == 0)
1094                         break;
1095         }
1096
1097         /*
1098          * Any remaining relay's belonging to this connection which match
1099          * the node are in excess of the current aggregate spanning state
1100          * and should be removed.
1101          */
1102         while (relay && relay->link->node == node) {
1103                 next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
1104                 dmsg_relay_delete(relay);
1105                 relay = next_relay;
1106         }
1107 }
1108
1109 static
1110 void
1111 dmsg_relay_delete(h2span_relay_t *relay)
1112 {
1113         fprintf(stderr,
1114                 "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
1115                 relay->link,
1116                 relay,
1117                 relay->link->node->cls, relay->link->node,
1118                 relay->link->dist,
1119                 relay->conn->state->iocom->sock_fd, relay->state);
1120
1121         dmsg_router_disconnect(&relay->router);
1122
1123         RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
1124         TAILQ_REMOVE(&relay->link->relayq, relay, entry);
1125
1126         if (relay->state) {
1127                 relay->state->any.relay = NULL;
1128                 dmsg_state_reply(relay->state, 0);
1129                 /* state invalid after reply */
1130                 relay->state = NULL;
1131         }
1132         relay->conn = NULL;
1133         relay->link = NULL;
1134         dmsg_free(relay);
1135 }
1136
1137 static void *
1138 dmsg_volconf_thread(void *info)
1139 {
1140         h2span_media_config_t *conf = info;
1141
1142         pthread_mutex_lock(&cluster_mtx);
1143         while ((conf->ctl & H2CONFCTL_STOP) == 0) {
1144                 if (conf->ctl & H2CONFCTL_UPDATE) {
1145                         fprintf(stderr, "VOLCONF UPDATE\n");
1146                         conf->ctl &= ~H2CONFCTL_UPDATE;
1147                         if (bcmp(&conf->copy_run, &conf->copy_pend,
1148                                  sizeof(conf->copy_run)) == 0) {
1149                                 fprintf(stderr, "VOLCONF: no changes\n");
1150                                 continue;
1151                         }
1152                         /*
1153                          * XXX TODO - auto reconnect on lookup failure or
1154                          *              connect failure or stream failure.
1155                          */
1156
1157                         pthread_mutex_unlock(&cluster_mtx);
1158                         dmsg_volconf_stop(conf);
1159                         conf->copy_run = conf->copy_pend;
1160                         if (conf->copy_run.copyid != 0 &&
1161                             strncmp(conf->copy_run.path, "span:", 5) == 0) {
1162                                 dmsg_volconf_start(conf,
1163                                                       conf->copy_run.path + 5);
1164                         }
1165                         pthread_mutex_lock(&cluster_mtx);
1166                         fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
1167                 }
1168                 if (conf->state == H2MC_CONNECT) {
1169                         dmsg_volconf_start(conf, conf->copy_run.path + 5);
1170                         pthread_mutex_unlock(&cluster_mtx);
1171                         sleep(5);
1172                         pthread_mutex_lock(&cluster_mtx);
1173                 } else {
1174                         pthread_cond_wait(&conf->cond, &cluster_mtx);
1175                 }
1176         }
1177         pthread_mutex_unlock(&cluster_mtx);
1178         dmsg_volconf_stop(conf);
1179         return(NULL);
1180 }
1181
1182 static
1183 void
1184 dmsg_volconf_stop(h2span_media_config_t *conf)
1185 {
1186         switch(conf->state) {
1187         case H2MC_STOPPED:
1188                 break;
1189         case H2MC_CONNECT:
1190                 conf->state = H2MC_STOPPED;
1191                 break;
1192         case H2MC_RUNNING:
1193                 shutdown(conf->fd, SHUT_WR);
1194                 pthread_join(conf->iocom_thread, NULL);
1195                 conf->iocom_thread = NULL;
1196                 break;
1197         }
1198 }
1199
1200 static
1201 void
1202 dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname)
1203 {
1204         dmsg_master_service_info_t *info;
1205
1206         switch(conf->state) {
1207         case H2MC_STOPPED:
1208         case H2MC_CONNECT:
1209                 conf->fd = dmsg_connect(hostname);
1210                 if (conf->fd < 0) {
1211                         fprintf(stderr, "Unable to connect to %s\n", hostname);
1212                         conf->state = H2MC_CONNECT;
1213                 } else {
1214                         info = malloc(sizeof(*info));
1215                         bzero(info, sizeof(*info));
1216                         info->fd = conf->fd;
1217                         info->detachme = 0;
1218                         conf->state = H2MC_RUNNING;
1219                         pthread_create(&conf->iocom_thread, NULL,
1220                                        dmsg_master_service, info);
1221                 }
1222                 break;
1223         case H2MC_RUNNING:
1224                 break;
1225         }
1226 }
1227
1228 /************************************************************************
1229  *                      ROUTER AND MESSAGING HANDLES                    *
1230  ************************************************************************
1231  *
1232  * Basically the idea here is to provide a stable data structure which
1233  * can be localized to the caller for higher level protocols to work with.
 * Depending on the context, these dmsg_handle's can be pooled by use-case
 * and remain persistent through a client (or mount point's) life.
1236  */
1237
1238 #if 0
1239 /*
1240  * Obtain a stable handle on a cluster given its uuid.  This ties directly
1241  * into the global cluster topology, creating the structure if necessary
1242  * (even if the uuid does not exist or does not exist yet), and preventing
1243  * the structure from getting ripped out from under us while we hold a
1244  * pointer to it.
1245  */
1246 h2span_cluster_t *
1247 dmsg_cluster_get(uuid_t *pfs_clid)
1248 {
1249         h2span_cluster_t dummy_cls;
1250         h2span_cluster_t *cls;
1251
1252         dummy_cls.pfs_clid = *pfs_clid;
1253         pthread_mutex_lock(&cluster_mtx);
1254         cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
1255         if (cls)
1256                 ++cls->refs;
1257         pthread_mutex_unlock(&cluster_mtx);
1258         return (cls);
1259 }
1260
1261 void
1262 dmsg_cluster_put(h2span_cluster_t *cls)
1263 {
1264         pthread_mutex_lock(&cluster_mtx);
1265         assert(cls->refs > 0);
1266         --cls->refs;
1267         if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
1268                 RB_REMOVE(h2span_cluster_tree,
1269                           &cluster_tree, cls);
1270                 dmsg_free(cls);
1271         }
1272         pthread_mutex_unlock(&cluster_mtx);
1273 }
1274
/*
 * Obtain a stable handle to a specific cluster node given its uuid.
 * This handle does NOT lock in the route to the node and is typically
 * used as part of the dmsg_handle_*() API to obtain a set of
 * stable nodes.
 *
 * NOTE(review): unimplemented placeholder.  The body is empty and the
 * function sits inside an #if 0 region, so it is never compiled.
 */
h2span_node_t *
dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
{
}
1285
1286 #endif
1287
1288 #if 0
/*
 * Acquire a persistent router structure given the cluster and node ids.
 * Messages can be transacted via this structure while held.  If the route
 * is lost messages will return failure.
 *
 * NOTE(review): unimplemented placeholder.  The body is empty and the
 * function sits inside an #if 0 region, so it is never compiled.
 */
dmsg_router_t *
dmsg_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid)
{
}
1298
/*
 * Release previously acquired router.
 *
 * NOTE(review): unimplemented placeholder.  The body is empty and the
 * function sits inside an #if 0 region, so it is never compiled.
 */
void
dmsg_router_put(dmsg_router_t *router)
{
}
1306 #endif
1307
1308 /*
1309  * Dumps the spanning tree
1310  */
1311 void
1312 dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused)
1313 {
1314         h2span_cluster_t *cls;
1315         h2span_node_t *node;
1316         h2span_link_t *slink;
1317         char *uustr = NULL;
1318
1319         pthread_mutex_lock(&cluster_mtx);
1320         RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
1321                 dmsg_router_printf(router, "Cluster %s (%s)\n",
1322                                    dmsg_uuid_to_str(&cls->pfs_clid, &uustr),
1323                                    cls->cl_label);
1324                 RB_FOREACH(node, h2span_node_tree, &cls->tree) {
1325                         dmsg_router_printf(router, "    Node %s (%s)\n",
1326                                 dmsg_uuid_to_str(&node->pfs_fsid, &uustr),
1327                                 node->fs_label);
1328                         RB_FOREACH(slink, h2span_link_tree, &node->tree) {
1329                                 dmsg_router_printf(router,
1330                                             "\tLink dist=%d via %d\n",
1331                                             slink->dist,
1332                                             slink->state->iocom->sock_fd);
1333                         }
1334                 }
1335         }
1336         pthread_mutex_unlock(&cluster_mtx);
1337         if (uustr)
1338                 free(uustr);
1339 #if 0
1340         TAILQ_FOREACH(conn, &connq, entry) {
1341         }
1342 #endif
1343 }