67cc7a9670be58ea4cb05dbe9054b6e89aed8fee
[dragonfly.git] / lib / libdmsg / msg_lnk.c
1 /*
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS
36  *
37  * This code supports the LNK_SPAN protocol.  Essentially all PFS's
38  * clients and services rendezvous with the userland hammer2 service and
39  * open LNK_SPAN transactions using a message header linkid of 0,
40  * registering any PFS's they have connectivity to with us.
41  *
42  * --
43  *
44  * Each registration maintains its own open LNK_SPAN message transaction.
45  * The SPANs are collected, aggregated, and retransmitted over available
46  * connections through the maintenance of additional LNK_SPAN message
47  * transactions on each link.
48  *
49  * The msgid for each active LNK_SPAN transaction we receive allows us to
50  * send a message to the target PFS (which might be one of many belonging
51  * to the same cluster), by specifying that msgid as the linkid in any
52  * message we send to the target PFS.
53  *
54  * Similarly the msgid we allocate for any LNK_SPAN transaction we transmit
55  * (and remember we will maintain multiple open LNK_SPAN transactions on
56  * each connection representing the topology span, so every node sees every
57  * other node as a separate open transaction).  So, similarly the msgid for
58  * these active transactions which we initiated can be used by the other
59  * end to route messages through us to another node, ultimately winding up
60  * at the identified hammer2 PFS.  We have to adjust the spanid in the message
61  * header at each hop to be representative of the outgoing LNK_SPAN we
62  * are forwarding the message through.
63  *
64  * --
65  *
66  * If we were to retransmit every LNK_SPAN transaction we receive it would
67  * create a huge mess, so we have to aggregate all received LNK_SPAN
68  * transactions, sort them by the fsid (the cluster) and sub-sort them by
69  * the pfs_fsid (individual nodes in the cluster), and only retransmit
70  * (create outgoing transactions) for a subset of the nearest distance-hops
71  * for each individual node.
72  *
73  * The higher level protocols can then issue transactions to the nodes making
74  * up a cluster to perform all actions required.
75  *
76  * --
77  *
78  * Since this is a large topology and a spanning tree protocol, links can
79  * go up and down all the time.  Any time a link goes down its transaction
80  * is closed.  The transaction has to be closed on both ends before we can
81  * delete (and potentially reuse) the related spanid.  The LNK_SPAN being
82  * closed may have been propagated out to other connections and those related
83  * LNK_SPANs are also closed.  Ultimately all routes via the lost LNK_SPAN
84  * go away, ultimately reaching all sources and all targets.
85  *
86  * Any messages in-transit using a route that goes away will be thrown away.
87  * Open transactions are only tracked at the two end-points.  When a link
88  * failure propagates to an end-point the related open transactions lose
89  * their spanid and are automatically aborted.
90  *
91  * It is important to note that internal route nodes cannot just associate
92  * a lost LNK_SPAN transaction with another route to the same destination.
93  * Message transactions MUST be serialized and MUST be ordered.  All messages
94  * for a transaction must run over the same route.  So if the route used by
95  * an active transaction is lost, the related messages will be fully aborted
96  * and the higher protocol levels will retry as appropriate.
97  *
98  * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation
99  * back to the originator.  Only the originator keeps track of a message.
100  * Routers just pass it through.  If a route is lost during transit the
101  * message is simply thrown away.
102  *
103  * It is also important to note that several paths to the same PFS can be
104  * propagated along the same link, which allows concurrency and even
105  * redundancy over several network interfaces or via different routes through
106  * the topology.  Any given transaction will use only a single route but busy
107  * servers will often have hundreds of transactions active simultaneously,
108  * so having multiple active paths through the network topology for A<->B
109  * will improve performance.
110  *
111  * --
112  *
113  * Most protocols consolidate operations rather than simply relaying them.
114  * This is particularly true of LEAF protocols (such as strict HAMMER2
115  * clients), of which there can be millions connecting into the cluster at
116  * various points.  The SPAN protocol is not used for these LEAF elements.
117  *
118  * Instead the primary service they connect to implements a proxy for the
119  * client protocols so the core topology only has to propagate a couple of
120  * LNK_SPANs and not millions.  LNK_SPANs are meant to be used only for
121  * core master nodes and satellite slaves and cache nodes.
122  */
123
124 #include "dmsg_local.h"
125
126 /*
127  * Maximum spanning tree distance.  This has the practical effect of
128  * stopping tail-chasing closed loops when a feeder span is lost.
129  */
130 #define DMSG_SPAN_MAXDIST       16
131
132 /*
133  * RED-BLACK TREE DEFINITIONS
134  *
135  * We need to track:
136  *
137  * (1) shared fsid's (a cluster).
138  * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions.
139  *
140  * We need to aggregate all active LNK_SPANs and create our own
141  * outgoing LNK_SPAN transactions on each of our connections representing
142  * the aggregated state.
143  *
144  * h2span_conn          - list of iocom connections who wish to receive SPAN
145  *                        propagation from other connections.  Might contain
146  *                        a filter string.  Only iocom's with an open
147  *                        LNK_CONN transaction are applicable for SPAN
148  *                        propagation.
149  *
150  * h2span_relay         - List of links relayed (via SPAN).  Essentially
151  *                        each relay structure represents a LNK_SPAN
152  *                        transaction that we initiated, versus h2span_link
153  *                        which is a LNK_SPAN transaction that we received.
154  *
155  * --
156  *
157  * h2span_cluster       - Organizes the shared fsid's.  One structure for
158  *                        each cluster.
159  *
160  * h2span_node          - Organizes the nodes in a cluster.  One structure
161  *                        for each unique {cluster,node}, aka {fsid, pfs_fsid}.
162  *
163  * h2span_link          - Organizes all incoming and outgoing LNK_SPAN message
164  *                        transactions related to a node.
165  *
166  *                        One h2span_link structure for each incoming LNK_SPAN
167  *                        transaction.  Links selected for propagation back
168  *                        out are also where the outgoing LNK_SPAN messages
169  *                        are indexed into (so we can propagate changes).
170  *
171  *                        The h2span_link's use a red-black tree to sort the
172  *                        distance hop metric for the incoming LNK_SPAN.  We
173  *                        then select the top N for outgoing.  When the
174  *                        topology changes the top N may also change and cause
175  *                        new outgoing LNK_SPAN transactions to be opened
176  *                        and less desirable ones to be closed, causing
177  *                        transactional aborts within the message flow in
178  *                        the process.
179  *
180  * Also note            - All outgoing LNK_SPAN message transactions are also
181  *                        entered into a red-black tree for use by the routing
182  *                        function.  This is handled by msg.c in the state
183  *                        code, not here.
184  */
185
186 struct h2span_link;
187 struct h2span_relay;
188 TAILQ_HEAD(h2span_media_queue, h2span_media);
189 TAILQ_HEAD(h2span_conn_queue, h2span_conn);
190 TAILQ_HEAD(h2span_relay_queue, h2span_relay);
191
192 RB_HEAD(h2span_cluster_tree, h2span_cluster);
193 RB_HEAD(h2span_node_tree, h2span_node);
194 RB_HEAD(h2span_link_tree, h2span_link);
195 RB_HEAD(h2span_relay_tree, h2span_relay);
196
197 /*
198  * This represents a media
199  */
200 struct h2span_media {
201         TAILQ_ENTRY(h2span_media) entry;
202         uuid_t  mediaid;
203         int     refs;
204         struct h2span_media_config {
205                 dmsg_vol_data_t         copy_run;
206                 dmsg_vol_data_t         copy_pend;
207                 pthread_t               thread;
208                 pthread_cond_t          cond;
209                 int                     ctl;
210                 int                     fd;
211                 dmsg_iocom_t            iocom;
212                 pthread_t               iocom_thread;
213                 enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
214         } config[DMSG_COPYID_COUNT];
215 };
216
217 typedef struct h2span_media_config h2span_media_config_t;
218
219 #define H2CONFCTL_STOP          0x00000001
220 #define H2CONFCTL_UPDATE        0x00000002
221
222 /*
223  * Received LNK_CONN transaction enables SPAN protocol over connection.
224  * (may contain filter).  Typically one for each mount and several may
225  * share the same media.
226  */
227 struct h2span_conn {
228         TAILQ_ENTRY(h2span_conn) entry;
229         struct h2span_relay_tree tree;
230         struct h2span_media *media;
231         dmsg_state_t *state;
232 };
233
234 /*
235  * All received LNK_SPANs are organized by cluster (pfs_clid),
236  * node (pfs_fsid), and link (received LNK_SPAN transaction).
237  */
238 struct h2span_cluster {
239         RB_ENTRY(h2span_cluster) rbnode;
240         struct h2span_node_tree tree;
241         uuid_t  pfs_clid;               /* shared fsid */
242         int     refs;                   /* prevents destruction */
243 };
244
245 struct h2span_node {
246         RB_ENTRY(h2span_node) rbnode;
247         struct h2span_link_tree tree;
248         struct h2span_cluster *cls;
249         uint8_t peer_type;
250         uuid_t  pfs_fsid;               /* unique fsid */
251         char    label[64];
252 };
253
254 struct h2span_link {
255         RB_ENTRY(h2span_link) rbnode;
256         dmsg_state_t    *state;         /* state<->link */
257         struct h2span_node *node;       /* related node */
258         int32_t dist;
259         struct h2span_relay_queue relayq; /* relay out */
260         struct dmsg_router *router;     /* route out this link */
261 };
262
263 /*
264  * Any LNK_SPAN transactions we receive which are relayed out other
265  * connections utilize this structure to track the LNK_SPAN transaction
266  * we initiate on the other connections, if selected for relay.
267  *
268  * In many respects this is the core of the protocol... actually figuring
269  * out what LNK_SPANs to relay.  The spanid used for relaying is the
270  * address of the 'state' structure, which is why h2span_relay has to
271  * be entered into a RB-TREE based at h2span_conn (so we can look
272  * up the spanid to validate it).
273  *
274  * NOTE: Messages can be received via the LNK_SPAN transaction the
275  *       relay maintains, and can be replied via relay->router, but
276  *       messages are NOT initiated via a relay.  Messages are initiated
277  *       via incoming links (h2span_link's).
278  *
279  *       relay->link represents the link being relayed, NOT the LNK_SPAN
280  *       transaction the relay is holding open.
281  */
282 struct h2span_relay {
283         RB_ENTRY(h2span_relay) rbnode;  /* from h2span_conn */
284         TAILQ_ENTRY(h2span_relay) entry; /* from link */
285         struct h2span_conn *conn;
286         dmsg_state_t    *state;         /* transmitted LNK_SPAN */
287         struct h2span_link *link;       /* LNK_SPAN being relayed */
288         struct dmsg_router      *router;/* route out this relay */
289 };
290
291
292 typedef struct h2span_media h2span_media_t;
293 typedef struct h2span_conn h2span_conn_t;
294 typedef struct h2span_cluster h2span_cluster_t;
295 typedef struct h2span_node h2span_node_t;
296 typedef struct h2span_link h2span_link_t;
297 typedef struct h2span_relay h2span_relay_t;
298
299 static
300 int
301 h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
302 {
303         return(uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL));
304 }
305
306 static
307 int
308 h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
309 {
310         int r;
311
312         if (node1->peer_type < node2->peer_type)
313                 return(-1);
314         if (node1->peer_type > node2->peer_type)
315                 return(1);
316         r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL);
317         if (r == 0 && node1->peer_type == DMSG_PEER_BLOCK)
318                 r = strcmp(node1->label, node2->label);
319         return (r);
320 }
321
322 /*
323  * Sort/subsort must match h2span_relay_cmp() under any given node
324  * to make the aggregation algorithm easier, so the best links are
325  * in the same sorted order as the best relays.
326  *
327  * NOTE: We cannot use link*->state->msgid because this msgid is created
328  *       by each remote host and thus might wind up being the same.
329  */
330 static
331 int
332 h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
333 {
334         if (link1->dist < link2->dist)
335                 return(-1);
336         if (link1->dist > link2->dist)
337                 return(1);
338 #if 1
339         if ((uintptr_t)link1->state < (uintptr_t)link2->state)
340                 return(-1);
341         if ((uintptr_t)link1->state > (uintptr_t)link2->state)
342                 return(1);
343 #else
344         if (link1->state->msgid < link2->state->msgid)
345                 return(-1);
346         if (link1->state->msgid > link2->state->msgid)
347                 return(1);
348 #endif
349         return(0);
350 }
351
352 /*
353  * Relay entries are sorted by node, subsorted by distance and link
354  * address (so we can match up the conn->tree relay topology with
355  * a node's link topology).
356  */
357 static
358 int
359 h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
360 {
361         h2span_link_t *link1 = relay1->link;
362         h2span_link_t *link2 = relay2->link;
363
364         if ((intptr_t)link1->node < (intptr_t)link2->node)
365                 return(-1);
366         if ((intptr_t)link1->node > (intptr_t)link2->node)
367                 return(1);
368         if (link1->dist < link2->dist)
369                 return(-1);
370         if (link1->dist > link2->dist)
371                 return(1);
372 #if 1
373         if ((uintptr_t)link1->state < (uintptr_t)link2->state)
374                 return(-1);
375         if ((uintptr_t)link1->state > (uintptr_t)link2->state)
376                 return(1);
377 #else
378         if (link1->state->msgid < link2->state->msgid)
379                 return(-1);
380         if (link1->state->msgid > link2->state->msgid)
381                 return(1);
382 #endif
383         return(0);
384 }
385
386 RB_PROTOTYPE_STATIC(h2span_cluster_tree, h2span_cluster,
387              rbnode, h2span_cluster_cmp);
388 RB_PROTOTYPE_STATIC(h2span_node_tree, h2span_node,
389              rbnode, h2span_node_cmp);
390 RB_PROTOTYPE_STATIC(h2span_link_tree, h2span_link,
391              rbnode, h2span_link_cmp);
392 RB_PROTOTYPE_STATIC(h2span_relay_tree, h2span_relay,
393              rbnode, h2span_relay_cmp);
394
395 RB_GENERATE_STATIC(h2span_cluster_tree, h2span_cluster,
396              rbnode, h2span_cluster_cmp);
397 RB_GENERATE_STATIC(h2span_node_tree, h2span_node,
398              rbnode, h2span_node_cmp);
399 RB_GENERATE_STATIC(h2span_link_tree, h2span_link,
400              rbnode, h2span_link_cmp);
401 RB_GENERATE_STATIC(h2span_relay_tree, h2span_relay,
402              rbnode, h2span_relay_cmp);
403
404 /*
405  * Global mutex protects cluster_tree lookups, connq, mediaq.
406  */
407 static pthread_mutex_t cluster_mtx;
408 static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
409 static struct h2span_conn_queue connq = TAILQ_HEAD_INITIALIZER(connq);
410 static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq);
411
412 static void dmsg_lnk_span(dmsg_msg_t *msg);
413 static void dmsg_lnk_conn(dmsg_msg_t *msg);
414 static void dmsg_lnk_relay(dmsg_msg_t *msg);
415 static void dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node);
416 static void dmsg_relay_delete(h2span_relay_t *relay);
417
418 static void *dmsg_volconf_thread(void *info);
419 static void dmsg_volconf_stop(h2span_media_config_t *conf);
420 static void dmsg_volconf_start(h2span_media_config_t *conf,
421                                 const char *hostname);
422
423 void
424 dmsg_msg_lnk_signal(dmsg_router_t *router __unused)
425 {
426         pthread_mutex_lock(&cluster_mtx);
427         dmsg_relay_scan(NULL, NULL);
428         pthread_mutex_unlock(&cluster_mtx);
429 }
430
431 /*
432  * Receive a DMSG_PROTO_LNK message.  This only called for
433  * one-way and opening-transactions since state->func will be assigned
434  * in all other cases.
435  */
436 void
437 dmsg_msg_lnk(dmsg_msg_t *msg)
438 {
439         switch(msg->any.head.cmd & DMSGF_BASECMDMASK) {
440         case DMSG_LNK_CONN:
441                 dmsg_lnk_conn(msg);
442                 break;
443         case DMSG_LNK_SPAN:
444                 dmsg_lnk_span(msg);
445                 break;
446         default:
447                 fprintf(stderr,
448                         "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
449                 dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
450                 /* state invalid after reply */
451                 break;
452         }
453 }
454
455 void
456 dmsg_lnk_conn(dmsg_msg_t *msg)
457 {
458         dmsg_state_t *state = msg->state;
459         h2span_media_t *media;
460         h2span_media_config_t *conf;
461         h2span_conn_t *conn;
462         h2span_relay_t *relay;
463         char *alloc = NULL;
464         int i;
465
466         pthread_mutex_lock(&cluster_mtx);
467
468         switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
469         case DMSG_LNK_CONN | DMSGF_CREATE:
470         case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
471                 /*
472                  * On transaction start we allocate a new h2span_conn and
473                  * acknowledge the request, leaving the transaction open.
474                  * We then relay priority-selected SPANs.
475                  */
476                 fprintf(stderr, "LNK_CONN(%08x): %s/%s\n",
477                         (uint32_t)msg->any.head.msgid,
478                         dmsg_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
479                                             &alloc),
480                         msg->any.lnk_conn.label);
481                 free(alloc);
482
483                 conn = dmsg_alloc(sizeof(*conn));
484
485                 RB_INIT(&conn->tree);
486                 conn->state = state;
487                 state->func = dmsg_lnk_conn;
488                 state->any.conn = conn;
489                 TAILQ_INSERT_TAIL(&connq, conn, entry);
490
491                 /*
492                  * Set up media
493                  */
494                 TAILQ_FOREACH(media, &mediaq, entry) {
495                         if (uuid_compare(&msg->any.lnk_conn.mediaid,
496                                          &media->mediaid, NULL) == 0) {
497                                 break;
498                         }
499                 }
500                 if (media == NULL) {
501                         media = dmsg_alloc(sizeof(*media));
502                         media->mediaid = msg->any.lnk_conn.mediaid;
503                         TAILQ_INSERT_TAIL(&mediaq, media, entry);
504                 }
505                 conn->media = media;
506                 ++media->refs;
507
508                 if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
509                         dmsg_msg_result(msg, 0);
510                         dmsg_router_signal(msg->router);
511                         break;
512                 }
513                 /* FALL THROUGH */
514         case DMSG_LNK_CONN | DMSGF_DELETE:
515         case DMSG_LNK_ERROR | DMSGF_DELETE:
516 deleteconn:
517                 /*
518                  * On transaction terminate we clean out our h2span_conn
519                  * and acknowledge the request, closing the transaction.
520                  */
521                 fprintf(stderr, "LNK_CONN: Terminated\n");
522                 conn = state->any.conn;
523                 assert(conn);
524
525                 /*
526                  * Clean out the media structure. If refs drops to zero we
527                  * also clean out the media config threads.  These threads
528                  * maintain span connections to other hammer2 service daemons.
529                  */
530                 media = conn->media;
531                 if (--media->refs == 0) {
532                         fprintf(stderr, "Shutting down media spans\n");
533                         for (i = 0; i < DMSG_COPYID_COUNT; ++i) {
534                                 conf = &media->config[i];
535
536                                 if (conf->thread == NULL)
537                                         continue;
538                                 conf->ctl = H2CONFCTL_STOP;
539                                 pthread_cond_signal(&conf->cond);
540                         }
541                         for (i = 0; i < DMSG_COPYID_COUNT; ++i) {
542                                 conf = &media->config[i];
543
544                                 if (conf->thread == NULL)
545                                         continue;
546                                 pthread_mutex_unlock(&cluster_mtx);
547                                 pthread_join(conf->thread, NULL);
548                                 pthread_mutex_lock(&cluster_mtx);
549                                 conf->thread = NULL;
550                                 pthread_cond_destroy(&conf->cond);
551                         }
552                         fprintf(stderr, "Media shutdown complete\n");
553                         TAILQ_REMOVE(&mediaq, media, entry);
554                         dmsg_free(media);
555                 }
556
557                 /*
558                  * Clean out all relays.  This requires terminating each
559                  * relay transaction.
560                  */
561                 while ((relay = RB_ROOT(&conn->tree)) != NULL) {
562                         dmsg_relay_delete(relay);
563                 }
564
565                 /*
566                  * Clean out conn
567                  */
568                 conn->media = NULL;
569                 conn->state = NULL;
570                 msg->state->any.conn = NULL;
571                 TAILQ_REMOVE(&connq, conn, entry);
572                 dmsg_free(conn);
573
574                 dmsg_msg_reply(msg, 0);
575                 /* state invalid after reply */
576                 break;
577         case DMSG_LNK_VOLCONF:
578                 /*
579                  * One-way volume-configuration message is transmitted
580                  * over the open LNK_CONN transaction.
581                  */
582                 fprintf(stderr, "RECEIVED VOLCONF\n");
583                 if (msg->any.lnk_volconf.index < 0 ||
584                     msg->any.lnk_volconf.index >= DMSG_COPYID_COUNT) {
585                         fprintf(stderr, "VOLCONF: ILLEGAL INDEX %d\n",
586                                 msg->any.lnk_volconf.index);
587                         break;
588                 }
589                 if (msg->any.lnk_volconf.copy.path[sizeof(msg->any.lnk_volconf.copy.path) - 1] != 0 ||
590                     msg->any.lnk_volconf.copy.path[0] == 0) {
591                         fprintf(stderr, "VOLCONF: ILLEGAL PATH %d\n",
592                                 msg->any.lnk_volconf.index);
593                         break;
594                 }
595                 conn = msg->state->any.conn;
596                 if (conn == NULL) {
597                         fprintf(stderr, "VOLCONF: LNK_CONN is missing\n");
598                         break;
599                 }
600                 conf = &conn->media->config[msg->any.lnk_volconf.index];
601                 conf->copy_pend = msg->any.lnk_volconf.copy;
602                 conf->ctl |= H2CONFCTL_UPDATE;
603                 if (conf->thread == NULL) {
604                         fprintf(stderr, "VOLCONF THREAD STARTED\n");
605                         pthread_cond_init(&conf->cond, NULL);
606                         pthread_create(&conf->thread, NULL,
607                                        dmsg_volconf_thread, (void *)conf);
608                 }
609                 pthread_cond_signal(&conf->cond);
610                 break;
611         default:
612                 /*
613                  * Failsafe
614                  */
615                 if (msg->any.head.cmd & DMSGF_DELETE)
616                         goto deleteconn;
617                 dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
618                 break;
619         }
620         pthread_mutex_unlock(&cluster_mtx);
621 }
622
623 void
624 dmsg_lnk_span(dmsg_msg_t *msg)
625 {
626         dmsg_state_t *state = msg->state;
627         h2span_cluster_t dummy_cls;
628         h2span_node_t dummy_node;
629         h2span_cluster_t *cls;
630         h2span_node_t *node;
631         h2span_link_t *slink;
632         h2span_relay_t *relay;
633         char *alloc = NULL;
634
635         assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
636
637         pthread_mutex_lock(&cluster_mtx);
638
639         /*
640          * On transaction start we initialize the tracking infrastructure
641          */
642         if (msg->any.head.cmd & DMSGF_CREATE) {
643                 assert(state->func == NULL);
644                 state->func = dmsg_lnk_span;
645
646                 msg->any.lnk_span.label[sizeof(msg->any.lnk_span.label)-1] = 0;
647
648                 /*
649                  * Find the cluster
650                  */
651                 dummy_cls.pfs_clid = msg->any.lnk_span.pfs_clid;
652                 cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
653                 if (cls == NULL) {
654                         cls = dmsg_alloc(sizeof(*cls));
655                         cls->pfs_clid = msg->any.lnk_span.pfs_clid;
656                         RB_INIT(&cls->tree);
657                         RB_INSERT(h2span_cluster_tree, &cluster_tree, cls);
658                 }
659
660                 /*
661                  * Find the node
662                  */
663                 dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
664                 dummy_node.peer_type = msg->any.lnk_span.peer_type;
665                 snprintf(dummy_node.label, sizeof(dummy_node.label),
666                          "%s", msg->any.lnk_span.label);
667                 node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
668                 if (node == NULL) {
669                         node = dmsg_alloc(sizeof(*node));
670                         node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
671                         node->peer_type = msg->any.lnk_span.peer_type;
672                         snprintf(node->label, sizeof(node->label),
673                                  "%s", msg->any.lnk_span.label);
674                         node->cls = cls;
675                         RB_INIT(&node->tree);
676                         RB_INSERT(h2span_node_tree, &cls->tree, node);
677                 }
678
679                 /*
680                  * Create the link
681                  */
682                 assert(state->any.link == NULL);
683                 slink = dmsg_alloc(sizeof(*slink));
684                 TAILQ_INIT(&slink->relayq);
685                 slink->node = node;
686                 slink->dist = msg->any.lnk_span.dist;
687                 slink->state = state;
688                 state->any.link = slink;
689
690                 /*
691                  * Embedded router structure in link for message forwarding.
692                  *
693                  * The spanning id for the router is the message id of
694                  * the SPAN link it is embedded in, allowing messages to
695                  * be routed via &slink->router.
696                  */
697                 slink->router = dmsg_router_alloc();
698                 slink->router->iocom = state->iocom;
699                 slink->router->link = slink;
700                 slink->router->target = state->msgid;
701                 dmsg_router_connect(slink->router);
702
703                 RB_INSERT(h2span_link_tree, &node->tree, slink);
704
705                 fprintf(stderr, "LNK_SPAN(thr %p): %p %s/%s dist=%d\n",
706                         msg->router->iocom,
707                         slink,
708                         dmsg_uuid_to_str(&msg->any.lnk_span.pfs_clid,
709                                             &alloc),
710                         msg->any.lnk_span.label,
711                         msg->any.lnk_span.dist);
712                 free(alloc);
713 #if 0
714                 dmsg_relay_scan(NULL, node);
715 #endif
716                 dmsg_router_signal(msg->router);
717         }
718
719         /*
720          * On transaction terminate we remove the tracking infrastructure.
721          */
722         if (msg->any.head.cmd & DMSGF_DELETE) {
723                 slink = state->any.link;
724                 assert(slink != NULL);
725                 node = slink->node;
726                 cls = node->cls;
727
728                 fprintf(stderr, "LNK_DELE(thr %p): %p %s/%s dist=%d\n",
729                         msg->router->iocom,
730                         slink,
731                         dmsg_uuid_to_str(&cls->pfs_clid, &alloc),
732                         state->msg->any.lnk_span.label,
733                         state->msg->any.lnk_span.dist);
734                 free(alloc);
735
736                 /*
737                  * Remove the router from consideration
738                  */
739                 dmsg_router_disconnect(&slink->router);
740
741                 /*
742                  * Clean out all relays.  This requires terminating each
743                  * relay transaction.
744                  */
745                 while ((relay = TAILQ_FIRST(&slink->relayq)) != NULL) {
746                         dmsg_relay_delete(relay);
747                 }
748
749                 /*
750                  * Clean out the topology
751                  */
752                 RB_REMOVE(h2span_link_tree, &node->tree, slink);
753                 if (RB_EMPTY(&node->tree)) {
754                         RB_REMOVE(h2span_node_tree, &cls->tree, node);
755                         if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
756                                 RB_REMOVE(h2span_cluster_tree,
757                                           &cluster_tree, cls);
758                                 dmsg_free(cls);
759                         }
760                         node->cls = NULL;
761                         dmsg_free(node);
762                         node = NULL;
763                 }
764                 state->any.link = NULL;
765                 slink->state = NULL;
766                 slink->node = NULL;
767                 dmsg_free(slink);
768
769                 /*
770                  * We have to terminate the transaction
771                  */
772                 dmsg_state_reply(state, 0);
773                 /* state invalid after reply */
774
775                 /*
776                  * If the node still exists issue any required updates.  If
777                  * it doesn't then all related relays have already been
778                  * removed and there's nothing left to do.
779                  */
780 #if 0
781                 if (node)
782                         dmsg_relay_scan(NULL, node);
783 #endif
784                 if (node)
785                         dmsg_router_signal(msg->router);
786         }
787
788         pthread_mutex_unlock(&cluster_mtx);
789 }
790
791 /*
792  * Messages received on relay SPANs.  These are open transactions so it is
793  * in fact possible for the other end to close the transaction.
794  *
795  * XXX MPRACE on state structure
796  */
797 static void
798 dmsg_lnk_relay(dmsg_msg_t *msg)
799 {
800         dmsg_state_t *state = msg->state;
801         h2span_relay_t *relay;
802
803         assert(msg->any.head.cmd & DMSGF_REPLY);
804
805         if (msg->any.head.cmd & DMSGF_DELETE) {
806                 pthread_mutex_lock(&cluster_mtx);
807                 if ((relay = state->any.relay) != NULL) {
808                         dmsg_relay_delete(relay);
809                 } else {
810                         dmsg_state_reply(state, 0);
811                 }
812                 pthread_mutex_unlock(&cluster_mtx);
813         }
814 }
815
816 /*
817  * Update relay transactions for SPANs.
818  *
819  * Called with cluster_mtx held.
820  */
821 static void dmsg_relay_scan_specific(h2span_node_t *node,
822                                         h2span_conn_t *conn);
823
824 static void
825 dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node)
826 {
827         h2span_cluster_t *cls;
828
829         if (node) {
830                 /*
831                  * Iterate specific node
832                  */
833                 TAILQ_FOREACH(conn, &connq, entry)
834                         dmsg_relay_scan_specific(node, conn);
835         } else {
836                 /*
837                  * Full iteration.
838                  *
839                  * Iterate cluster ids, nodes, and either a specific connection
840                  * or all connections.
841                  */
842                 RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
843                         /*
844                          * Iterate node ids
845                          */
846                         RB_FOREACH(node, h2span_node_tree, &cls->tree) {
847                                 /*
848                                  * Synchronize the node's link (received SPANs)
849                                  * with each connection's relays.
850                                  */
851                                 if (conn) {
852                                         dmsg_relay_scan_specific(node, conn);
853                                 } else {
854                                         TAILQ_FOREACH(conn, &connq, entry) {
855                                             dmsg_relay_scan_specific(node,
856                                                                         conn);
857                                         }
858                                         assert(conn == NULL);
859                                 }
860                         }
861                 }
862         }
863 }
864
865 /*
866  * Update the relay'd SPANs for this (node, conn).
867  *
868  * Iterate links and adjust relays to match.  We only propagate the top link
869  * for now (XXX we want to propagate the top two).
870  *
871  * The dmsg_relay_scan_cmp() function locates the first relay element
872  * for any given node.  The relay elements will be sub-sorted by dist.
873  */
/* Argument/result bundle for the RB_SCAN cmp/callback pair below. */
struct relay_scan_info {
	h2span_node_t *node;	/* key: node whose relays are being scanned */
	h2span_relay_t *relay;	/* result: first relay for node, or NULL */
};
878
879 static int
880 dmsg_relay_scan_cmp(h2span_relay_t *relay, void *arg)
881 {
882         struct relay_scan_info *info = arg;
883
884         if ((intptr_t)relay->link->node < (intptr_t)info->node)
885                 return(-1);
886         if ((intptr_t)relay->link->node > (intptr_t)info->node)
887                 return(1);
888         return(0);
889 }
890
891 static int
892 dmsg_relay_scan_callback(h2span_relay_t *relay, void *arg)
893 {
894         struct relay_scan_info *info = arg;
895
896         info->relay = relay;
897         return(-1);
898 }
899
/*
 * Synchronize the relays on (conn) against the best links known for
 * (node), creating missing relay transactions and deleting stale ones.
 * Caller must hold cluster_mtx.
 */
static void
dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
{
	struct relay_scan_info info;
	h2span_relay_t *relay;
	h2span_relay_t *next_relay;
	h2span_link_t *slink;
	dmsg_lnk_conn_t *lconn;
	dmsg_msg_t *msg;
	int count = 2;		/* propagate at most this many best links */
	uint8_t peer_type;

	info.node = node;
	info.relay = NULL;

	/*
	 * Locate the first related relay for the node on this connection.
	 * relay will be NULL if there were none.
	 */
	RB_SCAN(h2span_relay_tree, &conn->tree,
		dmsg_relay_scan_cmp, dmsg_relay_scan_callback, &info);
	relay = info.relay;
	info.relay = NULL;
	if (relay)
		assert(relay->link->node == node);

	if (DMsgDebugOpt > 8)
		fprintf(stderr, "relay scan for connection %p\n", conn);

	/*
	 * Iterate the node's links (received SPANs) in distance order,
	 * lowest (best) dist first.
	 *
	 * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION.
	 *
	 * Track relays while iterating the best links and construct
	 * missing relays when necessary.
	 *
	 * (If some prior better link was removed it would have also
	 *  removed the relay, so the relay can only match exactly or
	 *  be worse).
	 */
	RB_FOREACH(slink, h2span_link_tree, &node->tree) {
		/*
		 * Match, relay already in-place, get the next
		 * relay to match against the next slink.
		 */
		if (relay && relay->link == slink) {
			relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
			if (--count == 0)
				break;
			continue;
		}

		/*
		 * We might want this SLINK, if it passes our filters.
		 *
		 * The spanning tree can cause closed loops so we have
		 * to limit slink->dist.
		 *
		 * Links are sorted by dist, so once past the limit no
		 * later link can qualify either.
		 */
		if (slink->dist > DMSG_SPAN_MAXDIST)
			break;

		/*
		 * Don't bother transmitting a LNK_SPAN out the same
		 * connection it came in on.  Trivial optimization.
		 *
		 * NOTE(review): this and the two filters below reject with
		 * 'break', abandoning the remaining (worse-dist) links
		 * entirely rather than skipping just this one ('continue').
		 * A later link from a different iocom/peer_type could
		 * otherwise still qualify — confirm this is intentional.
		 */
		if (slink->state->iocom == conn->state->iocom)
			break;

		/*
		 * NOTE ON FILTERS: The protocol spec allows non-requested
		 * SPANs to be transmitted, the other end is expected to
		 * leave their transactions open but otherwise ignore them.
		 *
		 * Don't bother transmitting if the remote connection
		 * is not accepting this SPAN's peer_type.
		 */
		peer_type = slink->state->msg->any.lnk_span.peer_type;
		lconn = &conn->state->msg->any.lnk_conn;
		if (((1LLU << peer_type) & lconn->peer_mask) == 0)
			break;

		/*
		 * Filter based on pfs_clid or label (XXX).  This typically
		 * reduces the amount of SPAN traffic that a mount end-point
		 * sees by only passing along SPANs related to the cluster id
		 * (that is, it will see all PFS's associated with the
		 * particular cluster it represents).
		 */
		if (peer_type == lconn->peer_type &&
		    peer_type == DMSG_PEER_HAMMER2) {
			if (!uuid_is_nil(&slink->node->cls->pfs_clid, NULL) &&
			    uuid_compare(&slink->node->cls->pfs_clid,
					 &lconn->pfs_clid, NULL) != 0) {
				break;
			}
		}

		/*
		 * Ok, we've accepted this SPAN for relaying.
		 */
		assert(relay == NULL ||
		       relay->link->node != slink->node ||
		       relay->link->dist >= slink->dist);
		relay = dmsg_alloc(sizeof(*relay));
		relay->conn = conn;
		relay->link = slink;

		/* Open a new SPAN transaction toward the remote end */
		msg = dmsg_msg_alloc(conn->state->iocom->router, 0,
					DMSG_LNK_SPAN |
					DMSGF_CREATE,
					dmsg_lnk_relay, relay);
		relay->state = msg->state;
		relay->router = dmsg_router_alloc();
		relay->router->iocom = relay->state->iocom;
		relay->router->relay = relay;
		relay->router->target = relay->state->msgid;

		/* Re-broadcast the SPAN payload with dist bumped by one hop */
		msg->any.lnk_span = slink->state->msg->any.lnk_span;
		msg->any.lnk_span.dist = slink->dist + 1;

		dmsg_router_connect(relay->router);

		/* Index by connection and queue on the originating link */
		RB_INSERT(h2span_relay_tree, &conn->tree, relay);
		TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);

		dmsg_msg_write(msg);

		fprintf(stderr,
			"RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d "
			"FD %d state %p\n",
			slink,
			relay,
			node->cls, node, slink->dist,
			conn->state->iocom->sock_fd, relay->state);

		/*
		 * Match (created new relay), get the next relay to
		 * match against the next slink.
		 */
		relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
		if (--count == 0)
			break;
	}

	/*
	 * Any remaining relay's belonging to this connection which match
	 * the node are in excess of the current aggregate spanning state
	 * and should be removed.
	 */
	while (relay && relay->link->node == node) {
		next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
		dmsg_relay_delete(relay);
		relay = next_relay;
	}
}
1057
1058 static
1059 void
1060 dmsg_relay_delete(h2span_relay_t *relay)
1061 {
1062         fprintf(stderr,
1063                 "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
1064                 relay->link,
1065                 relay,
1066                 relay->link->node->cls, relay->link->node,
1067                 relay->link->dist,
1068                 relay->conn->state->iocom->sock_fd, relay->state);
1069
1070         dmsg_router_disconnect(&relay->router);
1071
1072         RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
1073         TAILQ_REMOVE(&relay->link->relayq, relay, entry);
1074
1075         if (relay->state) {
1076                 relay->state->any.relay = NULL;
1077                 dmsg_state_reply(relay->state, 0);
1078                 /* state invalid after reply */
1079                 relay->state = NULL;
1080         }
1081         relay->conn = NULL;
1082         relay->link = NULL;
1083         dmsg_free(relay);
1084 }
1085
/*
 * Per-media-config worker thread: applies pending volume configuration
 * updates and retries failed connections until told to stop.
 *
 * Runs with cluster_mtx held except while actually (dis)connecting or
 * sleeping between reconnect attempts.
 */
static void *
dmsg_volconf_thread(void *info)
{
	h2span_media_config_t *conf = info;

	pthread_mutex_lock(&cluster_mtx);
	while ((conf->ctl & H2CONFCTL_STOP) == 0) {
		if (conf->ctl & H2CONFCTL_UPDATE) {
			fprintf(stderr, "VOLCONF UPDATE\n");
			conf->ctl &= ~H2CONFCTL_UPDATE;
			/* Skip no-op updates where pending == running config */
			if (bcmp(&conf->copy_run, &conf->copy_pend,
				 sizeof(conf->copy_run)) == 0) {
				fprintf(stderr, "VOLCONF: no changes\n");
				continue;
			}
			/*
			 * XXX TODO - auto reconnect on lookup failure or
			 *		connect failure or stream failure.
			 */

			/* Drop the mutex around the blocking stop/start */
			pthread_mutex_unlock(&cluster_mtx);
			dmsg_volconf_stop(conf);
			conf->copy_run = conf->copy_pend;
			if (conf->copy_run.copyid != 0 &&
			    strncmp(conf->copy_run.path, "span:", 5) == 0) {
				/* "span:<hostname>" - connect to the host */
				dmsg_volconf_start(conf,
						      conf->copy_run.path + 5);
			}
			pthread_mutex_lock(&cluster_mtx);
			fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
		}
		if (conf->state == H2MC_CONNECT) {
			/*
			 * Retry a failed connect, then back off 5 seconds.
			 * NOTE(review): this call is made with cluster_mtx
			 * held, while the update path above calls it with
			 * the mutex released — confirm the intended locking
			 * contract for dmsg_volconf_start().
			 */
			dmsg_volconf_start(conf, conf->copy_run.path + 5);
			pthread_mutex_unlock(&cluster_mtx);
			sleep(5);
			pthread_mutex_lock(&cluster_mtx);
		} else {
			/* Idle until an update or stop request arrives */
			pthread_cond_wait(&conf->cond, &cluster_mtx);
		}
	}
	pthread_mutex_unlock(&cluster_mtx);
	dmsg_volconf_stop(conf);
	return(NULL);
}
1130
1131 static
1132 void
1133 dmsg_volconf_stop(h2span_media_config_t *conf)
1134 {
1135         switch(conf->state) {
1136         case H2MC_STOPPED:
1137                 break;
1138         case H2MC_CONNECT:
1139                 conf->state = H2MC_STOPPED;
1140                 break;
1141         case H2MC_RUNNING:
1142                 shutdown(conf->fd, SHUT_WR);
1143                 pthread_join(conf->iocom_thread, NULL);
1144                 conf->iocom_thread = NULL;
1145                 break;
1146         }
1147 }
1148
1149 static
1150 void
1151 dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname)
1152 {
1153         dmsg_master_service_info_t *info;
1154
1155         switch(conf->state) {
1156         case H2MC_STOPPED:
1157         case H2MC_CONNECT:
1158                 conf->fd = dmsg_connect(hostname);
1159                 if (conf->fd < 0) {
1160                         fprintf(stderr, "Unable to connect to %s\n", hostname);
1161                         conf->state = H2MC_CONNECT;
1162                 } else {
1163                         info = malloc(sizeof(*info));
1164                         bzero(info, sizeof(*info));
1165                         info->fd = conf->fd;
1166                         info->detachme = 0;
1167                         conf->state = H2MC_RUNNING;
1168                         pthread_create(&conf->iocom_thread, NULL,
1169                                        dmsg_master_service, info);
1170                 }
1171                 break;
1172         case H2MC_RUNNING:
1173                 break;
1174         }
1175 }
1176
1177 /************************************************************************
1178  *                      ROUTER AND MESSAGING HANDLES                    *
1179  ************************************************************************
1180  *
1181  * Basically the idea here is to provide a stable data structure which
1182  * can be localized to the caller for higher level protocols to work with.
1183  * Depends on the context, these dmsg_handle's can be pooled by use-case
1184  * and remain persistent through a client (or mount point's) life.
1185  */
1186
1187 #if 0
1188 /*
1189  * Obtain a stable handle on a cluster given its uuid.  This ties directly
1190  * into the global cluster topology, creating the structure if necessary
1191  * (even if the uuid does not exist or does not exist yet), and preventing
1192  * the structure from getting ripped out from under us while we hold a
1193  * pointer to it.
1194  */
h2span_cluster_t *
dmsg_cluster_get(uuid_t *pfs_clid)
{
	h2span_cluster_t dummy_cls;
	h2span_cluster_t *cls;

	/* Stack dummy keyed only by uuid, used to probe the cluster tree */
	dummy_cls.pfs_clid = *pfs_clid;
	pthread_mutex_lock(&cluster_mtx);
	cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
	if (cls)
		++cls->refs;	/* hold; released via dmsg_cluster_put() */
	pthread_mutex_unlock(&cluster_mtx);
	return (cls);
}
1209
/* Release a cluster reference obtained via dmsg_cluster_get(). */
void
dmsg_cluster_put(h2span_cluster_t *cls)
{
	pthread_mutex_lock(&cluster_mtx);
	assert(cls->refs > 0);
	--cls->refs;
	/* Last reference and no nodes remain: tear the cluster down */
	if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
		RB_REMOVE(h2span_cluster_tree,
			  &cluster_tree, cls);
		dmsg_free(cls);
	}
	pthread_mutex_unlock(&cluster_mtx);
}
1223
1224 /*
1225  * Obtain a stable handle to a specific cluster node given its uuid.
1226  * This handle does NOT lock in the route to the node and is typically
1227  * used as part of the dmsg_handle_*() API to obtain a set of
1228  * stable nodes.
1229  */
1230 h2span_node_t *
1231 dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
1232 {
1233 }
1234
1235 #endif
1236
1237 #if 0
1238 /*
1239  * Acquire a persistent router structure given the cluster and node ids.
1240  * Messages can be transacted via this structure while held.  If the route
1241  * is lost messages will return failure.
1242  */
1243 dmsg_router_t *
1244 dmsg_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid)
1245 {
1246 }
1247
1248 /*
1249  * Release previously acquired router.
1250  */
1251 void
1252 dmsg_router_put(dmsg_router_t *router)
1253 {
1254 }
1255 #endif
1256
1257 /*
1258  * Dumps the spanning tree
1259  */
1260 void
1261 dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused)
1262 {
1263         h2span_cluster_t *cls;
1264         h2span_node_t *node;
1265         h2span_link_t *slink;
1266         char *uustr = NULL;
1267
1268         pthread_mutex_lock(&cluster_mtx);
1269         RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
1270                 dmsg_router_printf(router, "Cluster %s\n",
1271                                    dmsg_uuid_to_str(&cls->pfs_clid, &uustr));
1272                 RB_FOREACH(node, h2span_node_tree, &cls->tree) {
1273                         dmsg_router_printf(router, "    Node %s (%s)\n",
1274                                 dmsg_uuid_to_str(&node->pfs_fsid, &uustr),
1275                                 node->label);
1276                         RB_FOREACH(slink, h2span_link_tree, &node->tree) {
1277                                 dmsg_router_printf(router,
1278                                             "\tLink dist=%d via %d\n",
1279                                             slink->dist,
1280                                             slink->state->iocom->sock_fd);
1281                         }
1282                 }
1283         }
1284         pthread_mutex_unlock(&cluster_mtx);
1285         if (uustr)
1286                 free(uustr);
1287 #if 0
1288         TAILQ_FOREACH(conn, &connq, entry) {
1289         }
1290 #endif
1291 }