hammer2 - reformulate cluster messaging interface
[dragonfly.git] / lib / libdmsg / msg_lnk.c
CommitLineData
8c280d5d
MD
1/*
2 * Copyright (c) 2012 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 */
34/*
35 * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS
36 *
37 * This code supports the LNK_SPAN protocol. Essentially all PFS's
38 * clients and services rendezvous with the userland hammer2 service and
39 * open LNK_SPAN transactions using a message header linkid of 0,
40 * registering any PFS's they have connectivity to with us.
41 *
42 * --
43 *
44 * Each registration maintains its own open LNK_SPAN message transaction.
45 * The SPANs are collected, aggregated, and retransmitted over available
46 * connections through the maintenance of additional LNK_SPAN message
47 * transactions on each link.
48 *
49 * The msgid for each active LNK_SPAN transaction we receive allows us to
50 * send a message to the target PFS (which might be one of many belonging
51 * to the same cluster), by specifying that msgid as the linkid in any
52 * message we send to the target PFS.
53 *
54 * Similarly the msgid we allocate for any LNK_SPAN transaction we transmit
55 * (and remember we will maintain multiple open LNK_SPAN transactions on
56 * each connection representing the topology span, so every node sees every
57 * other node as a separate open transaction). So, similarly the msgid for
58 * these active transactions which we initiated can be used by the other
59 * end to route messages through us to another node, ultimately winding up
60 * at the identified hammer2 PFS. We have to adjust the spanid in the message
61 * header at each hop to be representative of the outgoing LNK_SPAN we
62 * are forwarding the message through.
63 *
64 * --
65 *
66 * If we were to retransmit every LNK_SPAN transaction we receive it would
67 * create a huge mess, so we have to aggregate all received LNK_SPAN
68 * transactions, sort them by the fsid (the cluster) and sub-sort them by
69 * the pfs_fsid (individual nodes in the cluster), and only retransmit
7dc0f844 70 * (create outgoing transactions) for a subset of the nearest distance-hops
8c280d5d
MD
71 * for each individual node.
72 *
73 * The higher level protocols can then issue transactions to the nodes making
74 * up a cluster to perform all actions required.
75 *
76 * --
77 *
78 * Since this is a large topology and a spanning tree protocol, links can
79 * go up and down all the time. Any time a link goes down its transaction
80 * is closed. The transaction has to be closed on both ends before we can
81 * delete (and potentially reuse) the related spanid. The LNK_SPAN being
82 * closed may have been propagated out to other connections and those related
83 * LNK_SPANs are also closed. Ultimately all routes via the lost LNK_SPAN
84 * go away, ultimately reaching all sources and all targets.
85 *
86 * Any messages in-transit using a route that goes away will be thrown away.
87 * Open transactions are only tracked at the two end-points. When a link
88 * failure propagates to an end-point the related open transactions lose
89 * their spanid and are automatically aborted.
90 *
91 * It is important to note that internal route nodes cannot just associate
92 * a lost LNK_SPAN transaction with another route to the same destination.
93 * Message transactions MUST be serialized and MUST be ordered. All messages
94 * for a transaction must run over the same route. So if the route used by
95 * an active transaction is lost, the related messages will be fully aborted
96 * and the higher protocol levels will retry as appropriate.
97 *
29ead430
MD
98 * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation
99 * back to the originator. Only the originator keeps tracks of a message.
100 * Routers just pass it through. If a route is lost during transit the
101 * message is simply thrown away.
102 *
8c280d5d
MD
103 * It is also important to note that several paths to the same PFS can be
104 * propagated along the same link, which allows concurrency and even
105 * redundancy over several network interfaces or via different routes through
106 * the topology. Any given transaction will use only a single route but busy
107 * servers will often have hundreds of transactions active simultaneously,
108 * so having multiple active paths through the network topology for A<->B
109 * will improve performance.
110 *
111 * --
112 *
113 * Most protocols consolidate operations rather than simply relaying them.
114 * This is particularly true of LEAF protocols (such as strict HAMMER2
115 * clients), of which there can be millions connecting into the cluster at
116 * various points. The SPAN protocol is not used for these LEAF elements.
117 *
118 * Instead the primary service they connect to implements a proxy for the
119 * client protocols so the core topology only has to propagate a couple of
120 * LNK_SPANs and not millions. LNK_SPANs are meant to be used only for
121 * core master nodes and satellite slaves and cache nodes.
122 */
123
0c3a8cd0 124#include "dmsg_local.h"
8c280d5d 125
cf715800
MD
126/*
127 * Maximum spanning tree distance. This has the practical effect of
128 * stopping tail-chasing closed loops when a feeder span is lost.
129 */
0c3a8cd0 130#define DMSG_SPAN_MAXDIST 16
cf715800 131
8c280d5d
MD
132/*
133 * RED-BLACK TREE DEFINITIONS
134 *
7dc0f844 135 * We need to track:
8c280d5d
MD
136 *
137 * (1) shared fsid's (a cluster).
138 * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions.
139 *
140 * We need to aggregate all active LNK_SPANs and create our own
141 * outgoing LNK_SPAN transactions on each of our connections representing
142 * the aggregated state.
143 *
2063f4d7 144 * h2span_conn - list of iocom connections who wish to receive SPAN
8c280d5d
MD
145 * propagation from other connections. Might contain
146 * a filter string. Only iocom's with an open
147 * LNK_CONN transactions are applicable for SPAN
148 * propagation.
149 *
150 * h2span_relay - List of links relayed (via SPAN). Essentially
151 * each relay structure represents a LNK_SPAN
152 * transaction that we initiated, verses h2span_link
153 * which is a LNK_SPAN transaction that we received.
154 *
155 * --
156 *
157 * h2span_cluster - Organizes the shared fsid's. One structure for
158 * each cluster.
159 *
160 * h2span_node - Organizes the nodes in a cluster. One structure
161 * for each unique {cluster,node}, aka {fsid, pfs_fsid}.
162 *
163 * h2span_link - Organizes all incoming and outgoing LNK_SPAN message
164 * transactions related to a node.
165 *
166 * One h2span_link structure for each incoming LNK_SPAN
167 * transaction. Links selected for propagation back
168 * out are also where the outgoing LNK_SPAN messages
169 * are indexed into (so we can propagate changes).
170 *
171 * The h2span_link's use a red-black tree to sort the
7dc0f844 172 * distance hop metric for the incoming LNK_SPAN. We
8c280d5d
MD
173 * then select the top N for outgoing. When the
174 * topology changes the top N may also change and cause
175 * new outgoing LNK_SPAN transactions to be opened
176 * and less desirable ones to be closed, causing
177 * transactional aborts within the message flow in
178 * the process.
179 *
180 * Also note - All outgoing LNK_SPAN message transactions are also
181 * entered into a red-black tree for use by the routing
182 * function. This is handled by msg.c in the state
183 * code, not here.
184 */
185
186struct h2span_link;
187struct h2span_relay;
1a34728c 188TAILQ_HEAD(h2span_media_queue, h2span_media);
2063f4d7 189TAILQ_HEAD(h2span_conn_queue, h2span_conn);
8c280d5d
MD
190TAILQ_HEAD(h2span_relay_queue, h2span_relay);
191
192RB_HEAD(h2span_cluster_tree, h2span_cluster);
193RB_HEAD(h2span_node_tree, h2span_node);
194RB_HEAD(h2span_link_tree, h2span_link);
195RB_HEAD(h2span_relay_tree, h2span_relay);
196
1a34728c
MD
197/*
198 * This represents a media
199 */
200struct h2span_media {
201 TAILQ_ENTRY(h2span_media) entry;
202 uuid_t mediaid;
203 int refs;
204 struct h2span_media_config {
5bc5bca2
MD
205 dmsg_vol_data_t copy_run;
206 dmsg_vol_data_t copy_pend;
1a34728c
MD
207 pthread_t thread;
208 pthread_cond_t cond;
209 int ctl;
210 int fd;
0c3a8cd0 211 dmsg_iocom_t iocom;
1a34728c
MD
212 pthread_t iocom_thread;
213 enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
0c3a8cd0 214 } config[DMSG_COPYID_COUNT];
1a34728c
MD
215};
216
217typedef struct h2span_media_config h2span_media_config_t;
218
219#define H2CONFCTL_STOP 0x00000001
220#define H2CONFCTL_UPDATE 0x00000002
221
8c280d5d
MD
222/*
223 * Received LNK_CONN transaction enables SPAN protocol over connection.
1a34728c
MD
224 * (may contain filter). Typically one for each mount and several may
225 * share the same media.
8c280d5d 226 */
2063f4d7
MD
227struct h2span_conn {
228 TAILQ_ENTRY(h2span_conn) entry;
8c280d5d 229 struct h2span_relay_tree tree;
1a34728c 230 struct h2span_media *media;
0c3a8cd0 231 dmsg_state_t *state;
8c280d5d
MD
232};
233
234/*
235 * All received LNK_SPANs are organized by cluster (pfs_clid),
236 * node (pfs_fsid), and link (received LNK_SPAN transaction).
237 */
238struct h2span_cluster {
239 RB_ENTRY(h2span_cluster) rbnode;
240 struct h2span_node_tree tree;
241 uuid_t pfs_clid; /* shared fsid */
ddfbb283
MD
242 uint8_t peer_type;
243 char cl_label[128]; /* cluster label (typ PEER_BLOCK) */
90e8cd1d 244 int refs; /* prevents destruction */
8c280d5d
MD
245};
246
7dc0f844 247struct h2span_node {
8c280d5d
MD
248 RB_ENTRY(h2span_node) rbnode;
249 struct h2span_link_tree tree;
250 struct h2span_cluster *cls;
ddfbb283 251 uint8_t pfs_type;
8c280d5d 252 uuid_t pfs_fsid; /* unique fsid */
ddfbb283 253 char fs_label[128]; /* fs label (typ PEER_HAMMER2) */
8c280d5d
MD
254};
255
256struct h2span_link {
257 RB_ENTRY(h2span_link) rbnode;
0c3a8cd0 258 dmsg_state_t *state; /* state<->link */
8c280d5d 259 struct h2span_node *node; /* related node */
7dc0f844 260 int32_t dist;
8c280d5d 261 struct h2span_relay_queue relayq; /* relay out */
0c3a8cd0 262 struct dmsg_router *router; /* route out this link */
8c280d5d
MD
263};
264
265/*
266 * Any LNK_SPAN transactions we receive which are relayed out other
267 * connections utilize this structure to track the LNK_SPAN transaction
268 * we initiate on the other connections, if selected for relay.
269 *
270 * In many respects this is the core of the protocol... actually figuring
271 * out what LNK_SPANs to relay. The spanid used for relaying is the
272 * address of the 'state' structure, which is why h2span_relay has to
2063f4d7 273 * be entered into a RB-TREE based at h2span_conn (so we can look
8c280d5d 274 * up the spanid to validate it).
90e8cd1d
MD
275 *
276 * NOTE: Messages can be received via the LNK_SPAN transaction the
277 * relay maintains, and can be replied via relay->router, but
278 * messages are NOT initiated via a relay. Messages are initiated
279 * via incoming links (h2span_link's).
280 *
281 * relay->link represents the link being relayed, NOT the LNK_SPAN
282 * transaction the relay is holding open.
8c280d5d
MD
283 */
284struct h2span_relay {
2063f4d7 285 RB_ENTRY(h2span_relay) rbnode; /* from h2span_conn */
8c280d5d 286 TAILQ_ENTRY(h2span_relay) entry; /* from link */
2063f4d7 287 struct h2span_conn *conn;
0c3a8cd0 288 dmsg_state_t *state; /* transmitted LNK_SPAN */
90e8cd1d 289 struct h2span_link *link; /* LNK_SPAN being relayed */
0c3a8cd0 290 struct dmsg_router *router;/* route out this relay */
8c280d5d
MD
291};
292
293
1a34728c 294typedef struct h2span_media h2span_media_t;
2063f4d7 295typedef struct h2span_conn h2span_conn_t;
8c280d5d
MD
296typedef struct h2span_cluster h2span_cluster_t;
297typedef struct h2span_node h2span_node_t;
298typedef struct h2span_link h2span_link_t;
299typedef struct h2span_relay h2span_relay_t;
300
ddfbb283
MD
301#define dmsg_termstr(array) _dmsg_termstr((array), sizeof(array))
302
303static __inline
304void
305_dmsg_termstr(char *base, size_t size)
306{
307 base[size-1] = 0;
308}
309
310/*
311 * Cluster peer_type, uuid, AND label must match for a match
312 */
8c280d5d
MD
313static
314int
315h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
316{
ddfbb283
MD
317 int r;
318
319 if (cls1->peer_type < cls2->peer_type)
320 return(-1);
321 if (cls1->peer_type > cls2->peer_type)
322 return(1);
323 r = uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL);
324 if (r == 0)
325 r = strcmp(cls1->cl_label, cls2->cl_label);
326
327 return r;
8c280d5d
MD
328}
329
ddfbb283
MD
330/*
331 * Match against the uuid. Currently we never match against the label.
332 */
8c280d5d
MD
333static
334int
335h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
336{
185ace93
MD
337 int r;
338
185ace93 339 r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL);
185ace93 340 return (r);
8c280d5d
MD
341}
342
cf715800 343/*
10c86c4e
MD
344 * Sort/subsort must match h2span_relay_cmp() under any given node
345 * to make the aggregation algorithm easier, so the best links are
346 * in the same sorted order as the best relays.
347 *
348 * NOTE: We cannot use link*->state->msgid because this msgid is created
349 * by each remote host and thus might wind up being the same.
cf715800 350 */
8c280d5d
MD
351static
352int
353h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
354{
7dc0f844 355 if (link1->dist < link2->dist)
8c280d5d 356 return(-1);
7dc0f844 357 if (link1->dist > link2->dist)
8c280d5d 358 return(1);
10c86c4e
MD
359#if 1
360 if ((uintptr_t)link1->state < (uintptr_t)link2->state)
361 return(-1);
362 if ((uintptr_t)link1->state > (uintptr_t)link2->state)
363 return(1);
364#else
29ead430 365 if (link1->state->msgid < link2->state->msgid)
8c280d5d 366 return(-1);
29ead430 367 if (link1->state->msgid > link2->state->msgid)
8c280d5d 368 return(1);
10c86c4e 369#endif
8c280d5d
MD
370 return(0);
371}
372
7dc0f844
MD
373/*
374 * Relay entries are sorted by node, subsorted by distance and link
375 * address (so we can match up the conn->tree relay topology with
376 * a node's link topology).
377 */
8c280d5d
MD
378static
379int
380h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
381{
29ead430
MD
382 h2span_link_t *link1 = relay1->link;
383 h2span_link_t *link2 = relay2->link;
384
385 if ((intptr_t)link1->node < (intptr_t)link2->node)
7dc0f844 386 return(-1);
29ead430 387 if ((intptr_t)link1->node > (intptr_t)link2->node)
7dc0f844 388 return(1);
29ead430 389 if (link1->dist < link2->dist)
8c280d5d 390 return(-1);
29ead430 391 if (link1->dist > link2->dist)
7dc0f844 392 return(1);
10c86c4e
MD
393#if 1
394 if ((uintptr_t)link1->state < (uintptr_t)link2->state)
395 return(-1);
396 if ((uintptr_t)link1->state > (uintptr_t)link2->state)
397 return(1);
398#else
29ead430 399 if (link1->state->msgid < link2->state->msgid)
7dc0f844 400 return(-1);
29ead430 401 if (link1->state->msgid > link2->state->msgid)
8c280d5d 402 return(1);
10c86c4e 403#endif
8c280d5d
MD
404 return(0);
405}
406
407RB_PROTOTYPE_STATIC(h2span_cluster_tree, h2span_cluster,
408 rbnode, h2span_cluster_cmp);
409RB_PROTOTYPE_STATIC(h2span_node_tree, h2span_node,
410 rbnode, h2span_node_cmp);
411RB_PROTOTYPE_STATIC(h2span_link_tree, h2span_link,
412 rbnode, h2span_link_cmp);
413RB_PROTOTYPE_STATIC(h2span_relay_tree, h2span_relay,
414 rbnode, h2span_relay_cmp);
415
416RB_GENERATE_STATIC(h2span_cluster_tree, h2span_cluster,
417 rbnode, h2span_cluster_cmp);
418RB_GENERATE_STATIC(h2span_node_tree, h2span_node,
419 rbnode, h2span_node_cmp);
420RB_GENERATE_STATIC(h2span_link_tree, h2span_link,
421 rbnode, h2span_link_cmp);
422RB_GENERATE_STATIC(h2span_relay_tree, h2span_relay,
423 rbnode, h2span_relay_cmp);
424
425/*
1a34728c 426 * Global mutex protects cluster_tree lookups, connq, mediaq.
8c280d5d
MD
427 */
428static pthread_mutex_t cluster_mtx;
429static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
2063f4d7 430static struct h2span_conn_queue connq = TAILQ_HEAD_INITIALIZER(connq);
1a34728c 431static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq);
8c280d5d 432
0c3a8cd0
MD
433static void dmsg_lnk_span(dmsg_msg_t *msg);
434static void dmsg_lnk_conn(dmsg_msg_t *msg);
435static void dmsg_lnk_relay(dmsg_msg_t *msg);
436static void dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node);
437static void dmsg_relay_delete(h2span_relay_t *relay);
8c280d5d 438
0c3a8cd0
MD
439static void *dmsg_volconf_thread(void *info);
440static void dmsg_volconf_stop(h2span_media_config_t *conf);
441static void dmsg_volconf_start(h2span_media_config_t *conf,
1a34728c
MD
442 const char *hostname);
443
29ead430 444void
0c3a8cd0 445dmsg_msg_lnk_signal(dmsg_router_t *router __unused)
29ead430
MD
446{
447 pthread_mutex_lock(&cluster_mtx);
0c3a8cd0 448 dmsg_relay_scan(NULL, NULL);
29ead430
MD
449 pthread_mutex_unlock(&cluster_mtx);
450}
451
8c280d5d 452/*
0c3a8cd0 453 * Receive a DMSG_PROTO_LNK message. This only called for
8c280d5d
MD
454 * one-way and opening-transactions since state->func will be assigned
455 * in all other cases.
456 */
457void
0c3a8cd0 458dmsg_msg_lnk(dmsg_msg_t *msg)
8c280d5d 459{
5bc5bca2
MD
460 switch(msg->any.head.cmd & DMSGF_BASECMDMASK) {
461 case DMSG_LNK_CONN:
0c3a8cd0 462 dmsg_lnk_conn(msg);
8c280d5d 463 break;
5bc5bca2 464 case DMSG_LNK_SPAN:
0c3a8cd0 465 dmsg_lnk_span(msg);
8c280d5d
MD
466 break;
467 default:
468 fprintf(stderr,
469 "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
0c3a8cd0 470 dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
8c280d5d
MD
471 /* state invalid after reply */
472 break;
473 }
474}
475
476void
0c3a8cd0 477dmsg_lnk_conn(dmsg_msg_t *msg)
8c280d5d 478{
0c3a8cd0 479 dmsg_state_t *state = msg->state;
1a34728c
MD
480 h2span_media_t *media;
481 h2span_media_config_t *conf;
2063f4d7 482 h2span_conn_t *conn;
8c280d5d
MD
483 h2span_relay_t *relay;
484 char *alloc = NULL;
1a34728c 485 int i;
8c280d5d
MD
486
487 pthread_mutex_lock(&cluster_mtx);
488
5bc5bca2
MD
489 switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
490 case DMSG_LNK_CONN | DMSGF_CREATE:
491 case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
1a34728c 492 /*
2063f4d7 493 * On transaction start we allocate a new h2span_conn and
1a34728c
MD
494 * acknowledge the request, leaving the transaction open.
495 * We then relay priority-selected SPANs.
496 */
ddfbb283 497 fprintf(stderr, "LNK_CONN(%08x): %s/%s/%s\n",
81666e1b 498 (uint32_t)msg->any.head.msgid,
0c3a8cd0 499 dmsg_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
8c280d5d 500 &alloc),
ddfbb283
MD
501 msg->any.lnk_conn.cl_label,
502 msg->any.lnk_conn.fs_label);
8c280d5d
MD
503 free(alloc);
504
0c3a8cd0 505 conn = dmsg_alloc(sizeof(*conn));
8c280d5d
MD
506
507 RB_INIT(&conn->tree);
508 conn->state = state;
0c3a8cd0 509 state->func = dmsg_lnk_conn;
8c280d5d
MD
510 state->any.conn = conn;
511 TAILQ_INSERT_TAIL(&connq, conn, entry);
512
02454b3e 513 /*
1a34728c 514 * Set up media
02454b3e 515 */
1a34728c
MD
516 TAILQ_FOREACH(media, &mediaq, entry) {
517 if (uuid_compare(&msg->any.lnk_conn.mediaid,
518 &media->mediaid, NULL) == 0) {
519 break;
520 }
521 }
522 if (media == NULL) {
0c3a8cd0 523 media = dmsg_alloc(sizeof(*media));
1a34728c
MD
524 media->mediaid = msg->any.lnk_conn.mediaid;
525 TAILQ_INSERT_TAIL(&mediaq, media, entry);
526 }
527 conn->media = media;
528 ++media->refs;
8c280d5d 529
5bc5bca2 530 if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
0c3a8cd0
MD
531 dmsg_msg_result(msg, 0);
532 dmsg_router_signal(msg->router);
1a34728c
MD
533 break;
534 }
535 /* FALL THROUGH */
5bc5bca2
MD
536 case DMSG_LNK_CONN | DMSGF_DELETE:
537 case DMSG_LNK_ERROR | DMSGF_DELETE:
1a34728c
MD
538deleteconn:
539 /*
2063f4d7 540 * On transaction terminate we clean out our h2span_conn
1a34728c
MD
541 * and acknowledge the request, closing the transaction.
542 */
8c280d5d
MD
543 fprintf(stderr, "LNK_CONN: Terminated\n");
544 conn = state->any.conn;
545 assert(conn);
7dc0f844 546
1a34728c
MD
547 /*
548 * Clean out the media structure. If refs drops to zero we
549 * also clean out the media config threads. These threads
550 * maintain span connections to other hammer2 service daemons.
551 */
552 media = conn->media;
553 if (--media->refs == 0) {
554 fprintf(stderr, "Shutting down media spans\n");
0c3a8cd0 555 for (i = 0; i < DMSG_COPYID_COUNT; ++i) {
1a34728c
MD
556 conf = &media->config[i];
557
558 if (conf->thread == NULL)
559 continue;
560 conf->ctl = H2CONFCTL_STOP;
561 pthread_cond_signal(&conf->cond);
562 }
0c3a8cd0 563 for (i = 0; i < DMSG_COPYID_COUNT; ++i) {
1a34728c
MD
564 conf = &media->config[i];
565
566 if (conf->thread == NULL)
567 continue;
568 pthread_mutex_unlock(&cluster_mtx);
569 pthread_join(conf->thread, NULL);
570 pthread_mutex_lock(&cluster_mtx);
571 conf->thread = NULL;
572 pthread_cond_destroy(&conf->cond);
573 }
574 fprintf(stderr, "Media shutdown complete\n");
575 TAILQ_REMOVE(&mediaq, media, entry);
0c3a8cd0 576 dmsg_free(media);
1a34728c
MD
577 }
578
7dc0f844
MD
579 /*
580 * Clean out all relays. This requires terminating each
581 * relay transaction.
582 */
8c280d5d 583 while ((relay = RB_ROOT(&conn->tree)) != NULL) {
0c3a8cd0 584 dmsg_relay_delete(relay);
8c280d5d
MD
585 }
586
587 /*
588 * Clean out conn
589 */
1a34728c 590 conn->media = NULL;
8c280d5d
MD
591 conn->state = NULL;
592 msg->state->any.conn = NULL;
593 TAILQ_REMOVE(&connq, conn, entry);
0c3a8cd0 594 dmsg_free(conn);
8c280d5d 595
0c3a8cd0 596 dmsg_msg_reply(msg, 0);
8c280d5d 597 /* state invalid after reply */
1a34728c 598 break;
5bc5bca2 599 case DMSG_LNK_VOLCONF:
1a34728c
MD
600 /*
601 * One-way volume-configuration message is transmitted
602 * over the open LNK_CONN transaction.
603 */
604 fprintf(stderr, "RECEIVED VOLCONF\n");
605 if (msg->any.lnk_volconf.index < 0 ||
0c3a8cd0 606 msg->any.lnk_volconf.index >= DMSG_COPYID_COUNT) {
1a34728c
MD
607 fprintf(stderr, "VOLCONF: ILLEGAL INDEX %d\n",
608 msg->any.lnk_volconf.index);
609 break;
610 }
611 if (msg->any.lnk_volconf.copy.path[sizeof(msg->any.lnk_volconf.copy.path) - 1] != 0 ||
612 msg->any.lnk_volconf.copy.path[0] == 0) {
613 fprintf(stderr, "VOLCONF: ILLEGAL PATH %d\n",
614 msg->any.lnk_volconf.index);
615 break;
616 }
617 conn = msg->state->any.conn;
618 if (conn == NULL) {
619 fprintf(stderr, "VOLCONF: LNK_CONN is missing\n");
620 break;
621 }
622 conf = &conn->media->config[msg->any.lnk_volconf.index];
623 conf->copy_pend = msg->any.lnk_volconf.copy;
624 conf->ctl |= H2CONFCTL_UPDATE;
625 if (conf->thread == NULL) {
626 fprintf(stderr, "VOLCONF THREAD STARTED\n");
627 pthread_cond_init(&conf->cond, NULL);
628 pthread_create(&conf->thread, NULL,
0c3a8cd0 629 dmsg_volconf_thread, (void *)conf);
1a34728c
MD
630 }
631 pthread_cond_signal(&conf->cond);
632 break;
633 default:
634 /*
635 * Failsafe
636 */
5bc5bca2 637 if (msg->any.head.cmd & DMSGF_DELETE)
1a34728c 638 goto deleteconn;
0c3a8cd0 639 dmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
1a34728c 640 break;
8c280d5d
MD
641 }
642 pthread_mutex_unlock(&cluster_mtx);
643}
644
645void
0c3a8cd0 646dmsg_lnk_span(dmsg_msg_t *msg)
8c280d5d 647{
0c3a8cd0 648 dmsg_state_t *state = msg->state;
8c280d5d
MD
649 h2span_cluster_t dummy_cls;
650 h2span_node_t dummy_node;
651 h2span_cluster_t *cls;
652 h2span_node_t *node;
653 h2span_link_t *slink;
654 h2span_relay_t *relay;
655 char *alloc = NULL;
656
5bc5bca2 657 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
29ead430 658
8c280d5d
MD
659 pthread_mutex_lock(&cluster_mtx);
660
661 /*
662 * On transaction start we initialize the tracking infrastructure
663 */
5bc5bca2 664 if (msg->any.head.cmd & DMSGF_CREATE) {
29ead430 665 assert(state->func == NULL);
0c3a8cd0 666 state->func = dmsg_lnk_span;
8c280d5d 667
ddfbb283
MD
668 dmsg_termstr(msg->any.lnk_span.cl_label);
669 dmsg_termstr(msg->any.lnk_span.fs_label);
81666e1b 670
8c280d5d
MD
671 /*
672 * Find the cluster
673 */
674 dummy_cls.pfs_clid = msg->any.lnk_span.pfs_clid;
ddfbb283
MD
675 dummy_cls.peer_type = msg->any.lnk_span.peer_type;
676 bcopy(msg->any.lnk_span.cl_label,
677 dummy_cls.cl_label,
678 sizeof(dummy_cls.cl_label));
8c280d5d
MD
679 cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
680 if (cls == NULL) {
0c3a8cd0 681 cls = dmsg_alloc(sizeof(*cls));
8c280d5d 682 cls->pfs_clid = msg->any.lnk_span.pfs_clid;
ddfbb283
MD
683 cls->peer_type = msg->any.lnk_span.peer_type;
684 bcopy(msg->any.lnk_span.cl_label,
685 cls->cl_label,
686 sizeof(cls->cl_label));
8c280d5d
MD
687 RB_INIT(&cls->tree);
688 RB_INSERT(h2span_cluster_tree, &cluster_tree, cls);
689 }
690
691 /*
692 * Find the node
693 */
694 dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
695 node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
696 if (node == NULL) {
0c3a8cd0 697 node = dmsg_alloc(sizeof(*node));
8c280d5d 698 node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
ddfbb283
MD
699 bcopy(msg->any.lnk_span.fs_label,
700 node->fs_label,
701 sizeof(node->fs_label));
8c280d5d
MD
702 node->cls = cls;
703 RB_INIT(&node->tree);
704 RB_INSERT(h2span_node_tree, &cls->tree, node);
705 }
706
707 /*
708 * Create the link
709 */
710 assert(state->any.link == NULL);
0c3a8cd0 711 slink = dmsg_alloc(sizeof(*slink));
7dc0f844 712 TAILQ_INIT(&slink->relayq);
8c280d5d 713 slink->node = node;
7dc0f844 714 slink->dist = msg->any.lnk_span.dist;
8c280d5d
MD
715 slink->state = state;
716 state->any.link = slink;
29ead430
MD
717
718 /*
719 * Embedded router structure in link for message forwarding.
90e8cd1d
MD
720 *
721 * The spanning id for the router is the message id of
722 * the SPAN link it is embedded in, allowing messages to
723 * be routed via &slink->router.
29ead430 724 */
0c3a8cd0 725 slink->router = dmsg_router_alloc();
90e8cd1d
MD
726 slink->router->iocom = state->iocom;
727 slink->router->link = slink;
10c86c4e 728 slink->router->target = state->msgid;
0c3a8cd0 729 dmsg_router_connect(slink->router);
29ead430 730
8c280d5d
MD
731 RB_INSERT(h2span_link_tree, &node->tree, slink);
732
ddfbb283
MD
733 fprintf(stderr,
734 "LNK_SPAN(thr %p): %p %s cl=%s fs=%s dist=%d\n",
29ead430
MD
735 msg->router->iocom,
736 slink,
ddfbb283
MD
737 dmsg_uuid_to_str(&msg->any.lnk_span.pfs_clid, &alloc),
738 msg->any.lnk_span.cl_label,
739 msg->any.lnk_span.fs_label,
29ead430
MD
740 msg->any.lnk_span.dist);
741 free(alloc);
29ead430 742#if 0
0c3a8cd0 743 dmsg_relay_scan(NULL, node);
29ead430 744#endif
0c3a8cd0 745 dmsg_router_signal(msg->router);
8c280d5d
MD
746 }
747
748 /*
749 * On transaction terminate we remove the tracking infrastructure.
750 */
5bc5bca2 751 if (msg->any.head.cmd & DMSGF_DELETE) {
8c280d5d
MD
752 slink = state->any.link;
753 assert(slink != NULL);
754 node = slink->node;
755 cls = node->cls;
756
ddfbb283 757 fprintf(stderr, "LNK_DELE(thr %p): %p %s cl=%s fs=%s dist=%d\n",
29ead430
MD
758 msg->router->iocom,
759 slink,
0c3a8cd0 760 dmsg_uuid_to_str(&cls->pfs_clid, &alloc),
ddfbb283
MD
761 state->msg->any.lnk_span.cl_label,
762 state->msg->any.lnk_span.fs_label,
29ead430
MD
763 state->msg->any.lnk_span.dist);
764 free(alloc);
765
90e8cd1d
MD
766 /*
767 * Remove the router from consideration
768 */
0c3a8cd0 769 dmsg_router_disconnect(&slink->router);
90e8cd1d 770
8c280d5d 771 /*
7dc0f844
MD
772 * Clean out all relays. This requires terminating each
773 * relay transaction.
8c280d5d
MD
774 */
775 while ((relay = TAILQ_FIRST(&slink->relayq)) != NULL) {
0c3a8cd0 776 dmsg_relay_delete(relay);
8c280d5d
MD
777 }
778
779 /*
780 * Clean out the topology
781 */
782 RB_REMOVE(h2span_link_tree, &node->tree, slink);
783 if (RB_EMPTY(&node->tree)) {
784 RB_REMOVE(h2span_node_tree, &cls->tree, node);
90e8cd1d 785 if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
8c280d5d
MD
786 RB_REMOVE(h2span_cluster_tree,
787 &cluster_tree, cls);
0c3a8cd0 788 dmsg_free(cls);
8c280d5d
MD
789 }
790 node->cls = NULL;
0c3a8cd0 791 dmsg_free(node);
7dc0f844 792 node = NULL;
8c280d5d
MD
793 }
794 state->any.link = NULL;
795 slink->state = NULL;
796 slink->node = NULL;
0c3a8cd0 797 dmsg_free(slink);
7dc0f844
MD
798
799 /*
800 * We have to terminate the transaction
801 */
0c3a8cd0 802 dmsg_state_reply(state, 0);
7dc0f844
MD
803 /* state invalid after reply */
804
805 /*
806 * If the node still exists issue any required updates. If
807 * it doesn't then all related relays have already been
808 * removed and there's nothing left to do.
809 */
29ead430 810#if 0
7dc0f844 811 if (node)
0c3a8cd0 812 dmsg_relay_scan(NULL, node);
29ead430
MD
813#endif
814 if (node)
0c3a8cd0 815 dmsg_router_signal(msg->router);
8c280d5d
MD
816 }
817
818 pthread_mutex_unlock(&cluster_mtx);
819}
820
821/*
7dc0f844
MD
822 * Messages received on relay SPANs. These are open transactions so it is
823 * in fact possible for the other end to close the transaction.
824 *
825 * XXX MPRACE on state structure
826 */
827static void
0c3a8cd0 828dmsg_lnk_relay(dmsg_msg_t *msg)
7dc0f844 829{
0c3a8cd0 830 dmsg_state_t *state = msg->state;
7dc0f844
MD
831 h2span_relay_t *relay;
832
5bc5bca2 833 assert(msg->any.head.cmd & DMSGF_REPLY);
29ead430 834
5bc5bca2 835 if (msg->any.head.cmd & DMSGF_DELETE) {
7dc0f844
MD
836 pthread_mutex_lock(&cluster_mtx);
837 if ((relay = state->any.relay) != NULL) {
0c3a8cd0 838 dmsg_relay_delete(relay);
7dc0f844 839 } else {
0c3a8cd0 840 dmsg_state_reply(state, 0);
7dc0f844
MD
841 }
842 pthread_mutex_unlock(&cluster_mtx);
843 }
844}
845
846/*
847 * Update relay transactions for SPANs.
848 *
849 * Called with cluster_mtx held.
850 */
0c3a8cd0 851static void dmsg_relay_scan_specific(h2span_node_t *node,
2063f4d7 852 h2span_conn_t *conn);
7dc0f844
MD
853
854static void
0c3a8cd0 855dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node)
7dc0f844
MD
856{
857 h2span_cluster_t *cls;
7dc0f844
MD
858
859 if (node) {
860 /*
861 * Iterate specific node
862 */
863 TAILQ_FOREACH(conn, &connq, entry)
0c3a8cd0 864 dmsg_relay_scan_specific(node, conn);
7dc0f844
MD
865 } else {
866 /*
02454b3e 867 * Full iteration.
7dc0f844 868 *
02454b3e
MD
869 * Iterate cluster ids, nodes, and either a specific connection
870 * or all connections.
7dc0f844 871 */
7dc0f844
MD
872 RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
873 /*
874 * Iterate node ids
875 */
876 RB_FOREACH(node, h2span_node_tree, &cls->tree) {
877 /*
878 * Synchronize the node's link (received SPANs)
879 * with each connection's relays.
880 */
02454b3e 881 if (conn) {
0c3a8cd0 882 dmsg_relay_scan_specific(node, conn);
02454b3e
MD
883 } else {
884 TAILQ_FOREACH(conn, &connq, entry) {
0c3a8cd0 885 dmsg_relay_scan_specific(node,
02454b3e
MD
886 conn);
887 }
888 assert(conn == NULL);
889 }
7dc0f844
MD
890 }
891 }
892 }
893}
894
895/*
896 * Update the relay'd SPANs for this (node, conn).
897 *
898 * Iterate links and adjust relays to match. We only propagate the top link
899 * for now (XXX we want to propagate the top two).
900 *
0c3a8cd0 901 * The dmsg_relay_scan_cmp() function locates the first relay element
7dc0f844 902 * for any given node. The relay elements will be sub-sorted by dist.
8c280d5d 903 */
7dc0f844
MD
904struct relay_scan_info {
905 h2span_node_t *node;
906 h2span_relay_t *relay;
907};
908
909static int
0c3a8cd0 910dmsg_relay_scan_cmp(h2span_relay_t *relay, void *arg)
7dc0f844
MD
911{
912 struct relay_scan_info *info = arg;
913
914 if ((intptr_t)relay->link->node < (intptr_t)info->node)
915 return(-1);
916 if ((intptr_t)relay->link->node > (intptr_t)info->node)
917 return(1);
918 return(0);
919}
920
921static int
0c3a8cd0 922dmsg_relay_scan_callback(h2span_relay_t *relay, void *arg)
7dc0f844
MD
923{
924 struct relay_scan_info *info = arg;
925
926 info->relay = relay;
927 return(-1);
928}
929
8c280d5d 930static void
0c3a8cd0 931dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
8c280d5d 932{
7dc0f844
MD
933 struct relay_scan_info info;
934 h2span_relay_t *relay;
935 h2span_relay_t *next_relay;
936 h2span_link_t *slink;
5bc5bca2 937 dmsg_lnk_conn_t *lconn;
ddfbb283 938 dmsg_lnk_span_t *lspan;
0c3a8cd0 939 dmsg_msg_t *msg;
7dc0f844
MD
940 int count = 2;
941
942 info.node = node;
943 info.relay = NULL;
944
945 /*
29ead430
MD
946 * Locate the first related relay for the node on this connection.
947 * relay will be NULL if there were none.
7dc0f844
MD
948 */
949 RB_SCAN(h2span_relay_tree, &conn->tree,
0c3a8cd0 950 dmsg_relay_scan_cmp, dmsg_relay_scan_callback, &info);
7dc0f844 951 relay = info.relay;
cf715800
MD
952 info.relay = NULL;
953 if (relay)
954 assert(relay->link->node == node);
7dc0f844 955
0c3a8cd0 956 if (DMsgDebugOpt > 8)
81666e1b 957 fprintf(stderr, "relay scan for connection %p\n", conn);
7dc0f844
MD
958
959 /*
960 * Iterate the node's links (received SPANs) in distance order,
961 * lowest (best) dist first.
2063f4d7
MD
962 *
963 * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION.
964 *
965 * Track relays while iterating the best links and construct
966 * missing relays when necessary.
967 *
968 * (If some prior better link was removed it would have also
969 * removed the relay, so the relay can only match exactly or
970 * be worse).
7dc0f844
MD
971 */
972 RB_FOREACH(slink, h2span_link_tree, &node->tree) {
29ead430 973 /*
2063f4d7
MD
974 * Match, relay already in-place, get the next
975 * relay to match against the next slink.
7dc0f844 976 */
cf715800 977 if (relay && relay->link == slink) {
cf715800
MD
978 relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
979 if (--count == 0)
980 break;
2063f4d7
MD
981 continue;
982 }
983
984 /*
985 * We might want this SLINK, if it passes our filters.
986 *
987 * The spanning tree can cause closed loops so we have
988 * to limit slink->dist.
989 */
0c3a8cd0 990 if (slink->dist > DMSG_SPAN_MAXDIST)
29ead430 991 break;
2063f4d7
MD
992
993 /*
994 * Don't bother transmitting a LNK_SPAN out the same
995 * connection it came in on. Trivial optimization.
996 */
997 if (slink->state->iocom == conn->state->iocom)
c1963fb2 998 break;
7dc0f844 999
2063f4d7
MD
1000 /*
1001 * NOTE ON FILTERS: The protocol spec allows non-requested
1002 * SPANs to be transmitted, the other end is expected to
1003 * leave their transactions open but otherwise ignore them.
1004 *
1005 * Don't bother transmitting if the remote connection
1006 * is not accepting this SPAN's peer_type.
1007 */
ddfbb283 1008 lspan = &slink->state->msg->any.lnk_span;
2063f4d7 1009 lconn = &conn->state->msg->any.lnk_conn;
ddfbb283 1010 if (((1LLU << lspan->peer_type) & lconn->peer_mask) == 0)
2063f4d7
MD
1011 break;
1012
1013 /*
ddfbb283 1014 * Do not give pure clients visibility to other pure clients
2063f4d7 1015 */
ddfbb283
MD
1016 if (lconn->pfs_type == DMSG_PFSTYPE_CLIENT &&
1017 lspan->pfs_type == DMSG_PFSTYPE_CLIENT) {
1018 break;
1019 }
1020
1021 /*
1022 * Connection filter, if cluster uuid is not NULL it must
1023 * match the span cluster uuid. Only applies when the
1024 * peer_type matches.
1025 */
1026 if (lspan->peer_type == lconn->peer_type &&
1027 !uuid_is_nil(&lconn->pfs_clid, NULL) &&
1028 uuid_compare(&slink->node->cls->pfs_clid,
1029 &lconn->pfs_clid, NULL)) {
1030 break;
7dc0f844 1031 }
2063f4d7 1032
ddfbb283
MD
1033 /*
1034 * Connection filter, if cluster label is not empty it must
1035 * match the span cluster label. Only applies when the
1036 * peer_type matches.
1037 */
1038 if (lspan->peer_type == lconn->peer_type &&
1039 lconn->cl_label[0] &&
1040 strcmp(lconn->cl_label, slink->node->cls->cl_label)) {
1041 break;
1042 }
1043
1044 /*
1045 * NOTE! fs_uuid differentiates nodes within the same cluster
1046 * so we obviously don't want to match those. Similarly
1047 * for fs_label.
1048 */
1049
2063f4d7
MD
1050 /*
1051 * Ok, we've accepted this SPAN for relaying.
1052 */
1053 assert(relay == NULL ||
1054 relay->link->node != slink->node ||
1055 relay->link->dist >= slink->dist);
0c3a8cd0 1056 relay = dmsg_alloc(sizeof(*relay));
2063f4d7
MD
1057 relay->conn = conn;
1058 relay->link = slink;
1059
0c3a8cd0 1060 msg = dmsg_msg_alloc(conn->state->iocom->router, 0,
5bc5bca2
MD
1061 DMSG_LNK_SPAN |
1062 DMSGF_CREATE,
0c3a8cd0 1063 dmsg_lnk_relay, relay);
2063f4d7 1064 relay->state = msg->state;
0c3a8cd0 1065 relay->router = dmsg_router_alloc();
2063f4d7
MD
1066 relay->router->iocom = relay->state->iocom;
1067 relay->router->relay = relay;
1068 relay->router->target = relay->state->msgid;
1069
1070 msg->any.lnk_span = slink->state->msg->any.lnk_span;
1071 msg->any.lnk_span.dist = slink->dist + 1;
1072
0c3a8cd0 1073 dmsg_router_connect(relay->router);
2063f4d7
MD
1074
1075 RB_INSERT(h2span_relay_tree, &conn->tree, relay);
1076 TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
1077
0c3a8cd0 1078 dmsg_msg_write(msg);
2063f4d7
MD
1079
1080 fprintf(stderr,
1081 "RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d "
1082 "FD %d state %p\n",
1083 slink,
1084 relay,
1085 node->cls, node, slink->dist,
1086 conn->state->iocom->sock_fd, relay->state);
1087
1088 /*
1089 * Match (created new relay), get the next relay to
1090 * match against the next slink.
1091 */
1092 relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
1093 if (--count == 0)
1094 break;
7dc0f844
MD
1095 }
1096
1097 /*
1098 * Any remaining relay's belonging to this connection which match
1099 * the node are in excess of the current aggregate spanning state
1100 * and should be removed.
1101 */
1102 while (relay && relay->link->node == node) {
1103 next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
0c3a8cd0 1104 dmsg_relay_delete(relay);
7dc0f844
MD
1105 relay = next_relay;
1106 }
1107}
1108
1109static
1110void
0c3a8cd0 1111dmsg_relay_delete(h2span_relay_t *relay)
7dc0f844 1112{
81666e1b 1113 fprintf(stderr,
29ead430
MD
1114 "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
1115 relay->link,
1116 relay,
7dc0f844 1117 relay->link->node->cls, relay->link->node,
cf715800 1118 relay->link->dist,
7dc0f844 1119 relay->conn->state->iocom->sock_fd, relay->state);
7dc0f844 1120
0c3a8cd0 1121 dmsg_router_disconnect(&relay->router);
90e8cd1d 1122
7dc0f844
MD
1123 RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
1124 TAILQ_REMOVE(&relay->link->relayq, relay, entry);
1125
1126 if (relay->state) {
1127 relay->state->any.relay = NULL;
0c3a8cd0 1128 dmsg_state_reply(relay->state, 0);
7dc0f844
MD
1129 /* state invalid after reply */
1130 relay->state = NULL;
1131 }
1132 relay->conn = NULL;
1133 relay->link = NULL;
0c3a8cd0 1134 dmsg_free(relay);
8c280d5d 1135}
81666e1b 1136
1a34728c 1137static void *
0c3a8cd0 1138dmsg_volconf_thread(void *info)
1a34728c
MD
1139{
1140 h2span_media_config_t *conf = info;
1141
1142 pthread_mutex_lock(&cluster_mtx);
1143 while ((conf->ctl & H2CONFCTL_STOP) == 0) {
1144 if (conf->ctl & H2CONFCTL_UPDATE) {
1145 fprintf(stderr, "VOLCONF UPDATE\n");
1146 conf->ctl &= ~H2CONFCTL_UPDATE;
1147 if (bcmp(&conf->copy_run, &conf->copy_pend,
1148 sizeof(conf->copy_run)) == 0) {
1149 fprintf(stderr, "VOLCONF: no changes\n");
1150 continue;
1151 }
1152 /*
1153 * XXX TODO - auto reconnect on lookup failure or
1154 * connect failure or stream failure.
1155 */
1156
1157 pthread_mutex_unlock(&cluster_mtx);
0c3a8cd0 1158 dmsg_volconf_stop(conf);
1a34728c
MD
1159 conf->copy_run = conf->copy_pend;
1160 if (conf->copy_run.copyid != 0 &&
1161 strncmp(conf->copy_run.path, "span:", 5) == 0) {
0c3a8cd0 1162 dmsg_volconf_start(conf,
1a34728c
MD
1163 conf->copy_run.path + 5);
1164 }
1165 pthread_mutex_lock(&cluster_mtx);
1166 fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
1167 }
1168 if (conf->state == H2MC_CONNECT) {
0c3a8cd0 1169 dmsg_volconf_start(conf, conf->copy_run.path + 5);
1a34728c
MD
1170 pthread_mutex_unlock(&cluster_mtx);
1171 sleep(5);
1172 pthread_mutex_lock(&cluster_mtx);
1173 } else {
1174 pthread_cond_wait(&conf->cond, &cluster_mtx);
1175 }
1176 }
1177 pthread_mutex_unlock(&cluster_mtx);
0c3a8cd0 1178 dmsg_volconf_stop(conf);
1a34728c
MD
1179 return(NULL);
1180}
1181
1182static
1183void
0c3a8cd0 1184dmsg_volconf_stop(h2span_media_config_t *conf)
1a34728c
MD
1185{
1186 switch(conf->state) {
1187 case H2MC_STOPPED:
1188 break;
1189 case H2MC_CONNECT:
1190 conf->state = H2MC_STOPPED;
1191 break;
1192 case H2MC_RUNNING:
1193 shutdown(conf->fd, SHUT_WR);
1194 pthread_join(conf->iocom_thread, NULL);
1195 conf->iocom_thread = NULL;
1196 break;
1197 }
1198}
1199
1200static
1201void
0c3a8cd0 1202dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname)
1a34728c 1203{
0c3a8cd0 1204 dmsg_master_service_info_t *info;
e1648a68 1205
1a34728c
MD
1206 switch(conf->state) {
1207 case H2MC_STOPPED:
1208 case H2MC_CONNECT:
0c3a8cd0 1209 conf->fd = dmsg_connect(hostname);
1a34728c
MD
1210 if (conf->fd < 0) {
1211 fprintf(stderr, "Unable to connect to %s\n", hostname);
1212 conf->state = H2MC_CONNECT;
1213 } else {
e1648a68
MD
1214 info = malloc(sizeof(*info));
1215 bzero(info, sizeof(*info));
1216 info->fd = conf->fd;
1217 info->detachme = 0;
1a34728c 1218 conf->state = H2MC_RUNNING;
e1648a68 1219 pthread_create(&conf->iocom_thread, NULL,
0c3a8cd0 1220 dmsg_master_service, info);
1a34728c
MD
1221 }
1222 break;
1223 case H2MC_RUNNING:
1224 break;
1225 }
1226}
1227
/************************************************************************
 *			ROUTER AND MESSAGING HANDLES			*
 ************************************************************************
 *
 * Basically the idea here is to provide a stable data structure which
 * can be localized to the caller for higher level protocols to work with.
 * Depending on the context, these dmsg_handle's can be pooled by use-case
 * and remain persistent through a client's (or mount point's) life.
 */
1237
#if 0
/*
 * Obtain a stable handle on a cluster given its uuid.  This ties directly
 * into the global cluster topology, creating the structure if necessary
 * (even if the uuid does not exist or does not exist yet), and preventing
 * the structure from getting ripped out from under us while we hold a
 * pointer to it.
 */
h2span_cluster_t *
dmsg_cluster_get(uuid_t *pfs_clid)
{
	h2span_cluster_t dummy_cls;
	h2span_cluster_t *cls;

	dummy_cls.pfs_clid = *pfs_clid;
	pthread_mutex_lock(&cluster_mtx);
	cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
	if (cls)
		++cls->refs;
	pthread_mutex_unlock(&cluster_mtx);
	return (cls);
}

/*
 * Release a cluster handle, destroying the cluster structure once the
 * last reference is gone and no nodes remain in its tree.
 */
void
dmsg_cluster_put(h2span_cluster_t *cls)
{
	pthread_mutex_lock(&cluster_mtx);
	assert(cls->refs > 0);
	--cls->refs;
	if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
		RB_REMOVE(h2span_cluster_tree,
			  &cluster_tree, cls);
		dmsg_free(cls);
	}
	pthread_mutex_unlock(&cluster_mtx);
}

/*
 * Obtain a stable handle to a specific cluster node given its uuid.
 * This handle does NOT lock in the route to the node and is typically
 * used as part of the dmsg_handle_*() API to obtain a set of
 * stable nodes.
 *
 * XXX not yet implemented (compiled out).
 */
h2span_node_t *
dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
{
}

#endif
29ead430
MD
1287
#if 0
/*
 * Acquire a persistent router structure given the cluster and node ids.
 * Messages can be transacted via this structure while held.  If the route
 * is lost messages will return failure.
 *
 * XXX not yet implemented (compiled out).
 */
dmsg_router_t *
dmsg_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid)
{
}

/*
 * Release previously acquired router.
 *
 * XXX not yet implemented (compiled out).
 */
void
dmsg_router_put(dmsg_router_t *router)
{
}
#endif
1307
81666e1b
MD
1308/*
1309 * Dumps the spanning tree
1310 */
1311void
0c3a8cd0 1312dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused)
81666e1b
MD
1313{
1314 h2span_cluster_t *cls;
1315 h2span_node_t *node;
1316 h2span_link_t *slink;
1317 char *uustr = NULL;
1318
1319 pthread_mutex_lock(&cluster_mtx);
1320 RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
ddfbb283
MD
1321 dmsg_router_printf(router, "Cluster %s (%s)\n",
1322 dmsg_uuid_to_str(&cls->pfs_clid, &uustr),
1323 cls->cl_label);
81666e1b 1324 RB_FOREACH(node, h2span_node_tree, &cls->tree) {
0c3a8cd0
MD
1325 dmsg_router_printf(router, " Node %s (%s)\n",
1326 dmsg_uuid_to_str(&node->pfs_fsid, &uustr),
ddfbb283 1327 node->fs_label);
81666e1b 1328 RB_FOREACH(slink, h2span_link_tree, &node->tree) {
0c3a8cd0
MD
1329 dmsg_router_printf(router,
1330 "\tLink dist=%d via %d\n",
1331 slink->dist,
1332 slink->state->iocom->sock_fd);
81666e1b
MD
1333 }
1334 }
1335 }
1336 pthread_mutex_unlock(&cluster_mtx);
1337 if (uustr)
1338 free(uustr);
1339#if 0
1340 TAILQ_FOREACH(conn, &connq, entry) {
1341 }
1342#endif
1343}