hammer2 - Refactor frontend part 14/many
[dragonfly.git] / sys / vfs / hammer2 / hammer2_thread.c
CommitLineData
5ceaaa82
MD
1/*
2 * Copyright (c) 2015 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 */
34/*
35 * This module implements various PFS-based helper threads.
36 */
37#include "hammer2.h"
38
2ed4fece 39#define HAMMER2_THREAD_DEBUG 1
0cc33e20 40
2ed4fece 41static int hammer2_sync_slaves(hammer2_thread_t *thr,
8db69c9f 42 hammer2_cluster_t *cparent, int *errors);
2ed4fece 43static void hammer2_update_pfs_status(hammer2_thread_t *thr,
b93cc2e0 44 hammer2_cluster_t *cparent);
2ed4fece 45static int hammer2_sync_insert(hammer2_thread_t *thr,
8db69c9f 46 hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
0cc33e20 47 hammer2_tid_t modify_tid,
a6cf1052 48 int i, int *errors);
2ed4fece 49static int hammer2_sync_destroy(hammer2_thread_t *thr,
8db69c9f
MD
50 hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
51 int i, int *errors);
2ed4fece 52static int hammer2_sync_replace(hammer2_thread_t *thr,
8db69c9f 53 hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
0cc33e20 54 hammer2_tid_t modify_tid,
a6cf1052 55 int i, int *errors);
b93cc2e0 56
2ed4fece
MD
57/****************************************************************************
58 * HAMMER2 THREAD API *
59 ****************************************************************************/
5ceaaa82 60/*
2ed4fece 61 * Initialize the suspplied thread structure, starting the specified
5ceaaa82
MD
62 * thread.
63 */
64void
2ed4fece
MD
65hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
66 const char *id, int clindex, int repidx,
67 void (*func)(void *arg))
5ceaaa82 68{
2ed4fece 69 lockinit(&thr->lk, "h2thr", 0, 0);
5ceaaa82 70 thr->pmp = pmp;
c7916d0b 71 thr->clindex = clindex;
2ed4fece 72 thr->repidx = repidx;
c847e838
MD
73 TAILQ_INIT(&thr->xopq);
74 if (repidx >= 0) {
75 lwkt_create(func, thr, &thr->td, NULL, 0, -1,
76 "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
77 } else {
78 lwkt_create(func, thr, &thr->td, NULL, 0, -1,
79 "%s-%s", id, pmp->pfs_names[clindex]);
80 }
5ceaaa82
MD
81}
82
83/*
2ed4fece 84 * Terminate a thread. This function will silently return if the thread
5ceaaa82
MD
85 * was never initialized or has already been deleted.
86 *
87 * This is accomplished by setting the STOP flag and waiting for the td
88 * structure to become NULL.
89 */
90void
2ed4fece 91hammer2_thr_delete(hammer2_thread_t *thr)
5ceaaa82
MD
92{
93 if (thr->td == NULL)
94 return;
95 lockmgr(&thr->lk, LK_EXCLUSIVE);
2ed4fece 96 atomic_set_int(&thr->flags, HAMMER2_THREAD_STOP);
5ceaaa82
MD
97 wakeup(&thr->flags);
98 while (thr->td) {
99 lksleep(thr, &thr->lk, 0, "h2thr", hz);
100 }
101 lockmgr(&thr->lk, LK_RELEASE);
102 thr->pmp = NULL;
103 lockuninit(&thr->lk);
104}
105
b8ba9690
MD
106/*
107 * Asynchronous remaster request. Ask the synchronization thread to
108 * start over soon (as if it were frozen and unfrozen, but without waiting).
109 * The thread always recalculates mastership relationships when restarting.
110 */
5ceaaa82 111void
2ed4fece 112hammer2_thr_remaster(hammer2_thread_t *thr)
5ceaaa82
MD
113{
114 if (thr->td == NULL)
115 return;
116 lockmgr(&thr->lk, LK_EXCLUSIVE);
2ed4fece 117 atomic_set_int(&thr->flags, HAMMER2_THREAD_REMASTER);
5ceaaa82
MD
118 wakeup(&thr->flags);
119 lockmgr(&thr->lk, LK_RELEASE);
120}
121
122void
2ed4fece
MD
123hammer2_thr_freeze_async(hammer2_thread_t *thr)
124{
125 atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
126 wakeup(&thr->flags);
127}
128
129void
130hammer2_thr_freeze(hammer2_thread_t *thr)
5ceaaa82
MD
131{
132 if (thr->td == NULL)
133 return;
134 lockmgr(&thr->lk, LK_EXCLUSIVE);
2ed4fece 135 atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
5ceaaa82 136 wakeup(&thr->flags);
2ed4fece 137 while ((thr->flags & HAMMER2_THREAD_FROZEN) == 0) {
5ceaaa82
MD
138 lksleep(thr, &thr->lk, 0, "h2frz", hz);
139 }
140 lockmgr(&thr->lk, LK_RELEASE);
141}
142
143void
2ed4fece 144hammer2_thr_unfreeze(hammer2_thread_t *thr)
5ceaaa82
MD
145{
146 if (thr->td == NULL)
147 return;
148 lockmgr(&thr->lk, LK_EXCLUSIVE);
2ed4fece 149 atomic_clear_int(&thr->flags, HAMMER2_THREAD_FROZEN);
5ceaaa82
MD
150 wakeup(&thr->flags);
151 lockmgr(&thr->lk, LK_RELEASE);
152}
153
2ed4fece
MD
/****************************************************************************
 *			    HAMMER2 SYNC THREADS			    *
 ****************************************************************************/
/*
 * Primary management thread for an element of a node.  A thread will exist
 * for each element requiring management.
 *
 * No management threads are needed for the SPMP or for any PMP with only
 * a single MASTER.
 *
 * On the SPMP - handles bulkfree and dedup operations
 * On a PFS    - handles remastering and synchronization
 *
 * The thread loops until HAMMER2_THREAD_STOP is set, processing FREEZE/
 * FROZEN/REMASTER flag requests and performing a synchronization scan of
 * the PFS on each pass, then sleeping for up to 5 seconds (or until
 * explicitly woken via thr->flags).
 */
void
hammer2_primary_sync_thread(void *arg)
{
	hammer2_thread_t *thr = arg;
	hammer2_cluster_t *cparent;
	hammer2_chain_t *chain;
	hammer2_pfs_t *pmp;
	int errors[HAMMER2_MAXCLUSTER];	/* per-cluster-index error codes */
	int error;

	pmp = thr->pmp;

	lockmgr(&thr->lk, LK_EXCLUSIVE);
	while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
		/*
		 * Handle freeze request: acknowledge by transitioning
		 * FREEZE -> FROZEN.
		 */
		if (thr->flags & HAMMER2_THREAD_FREEZE) {
			atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
			atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
		}

		/*
		 * Force idle if frozen until unfrozen or stopped.
		 */
		if (thr->flags & HAMMER2_THREAD_FROZEN) {
			lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
			continue;
		}

		/*
		 * Reset state on REMASTER request
		 */
		if (thr->flags & HAMMER2_THREAD_REMASTER) {
			atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
			/* reset state */
		}

		/*
		 * Synchronization scan.  Take a snapshot of the root
		 * cluster, update the reported PFS status, then run the
		 * slave synchronizer.
		 */
		hammer2_trans_init(pmp, 0);
		hammer2_inode_lock(pmp->iroot, HAMMER2_RESOLVE_ALWAYS);
		cparent = hammer2_inode_cluster(pmp->iroot,
						HAMMER2_RESOLVE_ALWAYS);
		hammer2_update_pfs_status(thr, cparent);
		hammer2_inode_unlock(pmp->iroot, NULL);
		bzero(errors, sizeof(errors));
		/* NOTE(review): unconditional debug print on every pass */
		kprintf("sync_slaves clindex %d\n", thr->clindex);

		/*
		 * We are the syncer, not a normal frontend operator,
		 * so force cparent good to prime the scan.
		 */
		hammer2_cluster_forcegood(cparent);
		error = hammer2_sync_slaves(thr, cparent, errors);
		if (error)
			kprintf("hammer2_sync_slaves: error %d\n", error);
		chain = cparent->array[thr->clindex].chain;

		/*
		 * Retain chain for our node and release the cluster.
		 */
		hammer2_chain_ref(chain);
		hammer2_chain_lock(chain, HAMMER2_RESOLVE_ALWAYS);
		hammer2_cluster_unlock(cparent);
		hammer2_cluster_drop(cparent);

		/*
		 * Flush the chain.
		 */
		hammer2_flush(chain, 1);
		hammer2_chain_unlock(chain);
		hammer2_chain_drop(chain);

		hammer2_trans_done(pmp);

		/*
		 * Wait for event, or 5-second poll.
		 */
		lksleep(&thr->flags, &thr->lk, 0, "h2idle", hz * 5);
	}
	/* Signal hammer2_thr_delete() that we are gone. */
	thr->td = NULL;
	wakeup(thr);
	lockmgr(&thr->lk, LK_RELEASE);
	/* thr structure can go invalid after this point */
}
b93cc2e0
MD
254
255/*
256 * Given a locked cluster created from pmp->iroot, update the PFS's
257 * reporting status.
258 */
259static
260void
2ed4fece 261hammer2_update_pfs_status(hammer2_thread_t *thr, hammer2_cluster_t *cparent)
b93cc2e0 262{
8db69c9f 263 hammer2_pfs_t *pmp = thr->pmp;
b93cc2e0
MD
264 uint32_t flags;
265
266 flags = cparent->flags & HAMMER2_CLUSTER_ZFLAGS;
8db69c9f 267 if (pmp->flags == flags)
b93cc2e0 268 return;
8db69c9f 269 pmp->flags = flags;
b93cc2e0
MD
270
271 kprintf("pfs %p", pmp);
272 if (flags & HAMMER2_CLUSTER_MSYNCED)
273 kprintf(" masters-all-good");
274 if (flags & HAMMER2_CLUSTER_SSYNCED)
275 kprintf(" slaves-all-good");
276
277 if (flags & HAMMER2_CLUSTER_WRHARD)
278 kprintf(" quorum/rw");
279 else if (flags & HAMMER2_CLUSTER_RDHARD)
280 kprintf(" quorum/ro");
281
282 if (flags & HAMMER2_CLUSTER_UNHARD)
283 kprintf(" out-of-sync-masters");
284 else if (flags & HAMMER2_CLUSTER_NOHARD)
285 kprintf(" no-masters-visible");
286
287 if (flags & HAMMER2_CLUSTER_WRSOFT)
288 kprintf(" soft/rw");
289 else if (flags & HAMMER2_CLUSTER_RDSOFT)
290 kprintf(" soft/ro");
291
292 if (flags & HAMMER2_CLUSTER_UNSOFT)
293 kprintf(" out-of-sync-slaves");
294 else if (flags & HAMMER2_CLUSTER_NOSOFT)
295 kprintf(" no-slaves-visible");
296 kprintf("\n");
297}
298
0057d3b8
MD
/*
 * Debug helper: dump the chains of cparent and cluster side by side, one
 * line per cluster index, printing each chain's bref.key and marking
 * invalid elements with "(I)".  No-op unless (hammer2_debug & 1) is set.
 */
static
void
dumpcluster(const char *label,
	    hammer2_cluster_t *cparent, hammer2_cluster_t *cluster)
{
	hammer2_chain_t *chain;
	int i;

	if ((hammer2_debug & 1) == 0)
		return;

	kprintf("%s\t", label);
	KKASSERT(cparent->nchains == cluster->nchains);
	for (i = 0; i < cparent->nchains; ++i) {
		if (i)
			kprintf("\t");
		kprintf("%d ", i);
		/* parent-side element: key + invalid marker */
		if ((chain = cparent->array[i].chain) != NULL) {
			kprintf("%016jx%s ",
				chain->bref.key,
				((cparent->array[i].flags &
				  HAMMER2_CITEM_INVALID) ? "(I)" : " ")
			);
		} else {
			kprintf(" NULL %s ", " ");
		}
		/* cluster-side element: key + invalid marker */
		if ((chain = cluster->array[i].chain) != NULL) {
			kprintf("%016jx%s ",
				chain->bref.key,
				((cluster->array[i].flags &
				  HAMMER2_CITEM_INVALID) ? "(I)" : " ")
			);
		} else {
			kprintf(" NULL %s ", " ");
		}
		kprintf("\n");
	}
}
337
/*
 * Synchronize the slave node this thread is responsible for
 * (thr->clindex) against the cluster focus, by a full topology scan of
 * cparent.  Missing slave elements are inserted, excess elements are
 * destroyed, and mismatched elements are replaced; inode elements are
 * recursed into (depth-limited), with their modify_tid fixed up on the
 * way back out of the recursion.
 *
 * cparent must be locked on entry; it is unlocked/relocked around the
 * recursion.  Per-index errors accumulate in errors[]; returns 0 on
 * success or the last error encountered.
 *
 * TODO - have cparent use a shared lock normally instead of exclusive,
 *	  (needs to be upgraded for slave adjustments).
 */
static
int
hammer2_sync_slaves(hammer2_thread_t *thr, hammer2_cluster_t *cparent,
		    int *errors)
{
	hammer2_pfs_t *pmp;
	hammer2_cluster_t *cluster;
	hammer2_cluster_t *scluster;
	hammer2_chain_t *focus;
	hammer2_chain_t *chain;
	hammer2_key_t key_next;
	int error;
	int nerror;
	int idx;
	int n;
	int nowork;
	int dorecursion;

	pmp = thr->pmp;
	idx = thr->clindex;	/* cluster node we are responsible for */

	/*
	 * Nothing to do if all slaves are synchronized.
	 * Nothing to do if cluster not authoritatively readable.
	 */
	if (pmp->flags & HAMMER2_CLUSTER_SSYNCED)
		return(0);
	if ((pmp->flags & HAMMER2_CLUSTER_RDHARD) == 0)
		return(HAMMER2_ERROR_INCOMPLETE);

	error = 0;

	/*
	 * XXX snapshot the source to provide a stable source to copy.
	 */

	/*
	 * Update all local slaves (remote slaves are handled by the sync
	 * threads on their respective hosts).
	 *
	 * Do a full topology scan, insert/delete elements on slaves as
	 * needed.  cparent must be ref'd so we can unlock and relock it
	 * on the recursion.
	 *
	 * ALLNODES - Allows clusters with a NULL focus to be returned if
	 *	      elements remain on other nodes.
	 */
	hammer2_cluster_ref(cparent);
	cluster = hammer2_cluster_lookup(cparent, &key_next,
					 HAMMER2_KEY_MIN, HAMMER2_KEY_MAX,
					 HAMMER2_LOOKUP_NODATA |
					 HAMMER2_LOOKUP_NOLOCK |
					 HAMMER2_LOOKUP_NODIRECT |
					 HAMMER2_LOOKUP_ALLNODES);
	dumpcluster("lookup", cparent, cluster);

	/*
	 * Scan elements
	 */
	while (cluster) {
		/*
		 * nowork is adjusted during the loop,
		 * dorecursion is calculated here.
		 */
		nowork = 1;
		focus = cluster->focus;
		if (focus && focus->bref.type == HAMMER2_BREF_TYPE_INODE)
			dorecursion = 1;
		else
			dorecursion = 0;

		/* debug tracing hard-wired to cluster index 3 */
		if (idx == 3 && (hammer2_debug & 1) && focus)
			kprintf("scan3 focus %d.%016jx %d.%016jx\n",
				(cparent ? cparent->focus->bref.type : 0xFF),
				(cparent ? cparent->focus->bref.key : (uintmax_t)-1LLU),
				focus->bref.type, focus->bref.key);
repeat1:
		/*
		 * Synchronize chains to focus
		 */
		if (idx >= cluster->nchains)
			goto skip1;
		chain = cluster->array[idx].chain;
		if (idx == 3 && (hammer2_debug & 1) && chain)
			kprintf("scan3 slave %d.%016jx %d.%016jx\n",
				((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.type : 0xFF),
				((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.key : (uintmax_t)-1LLU),
				cluster->array[idx].chain->bref.type,
				cluster->array[idx].chain->bref.key);
		if (idx == 3 && (hammer2_debug & 1) && chain == NULL)
			kprintf("scan3 slave %d.%16jx NULL\n",
				((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.type : 0xFF),
				((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.key : (uintmax_t)-1LLU)
			);

		/*
		 * Disable recursion for this index and loop up
		 * if a chain error is detected.
		 *
		 * A NULL chain is ok, it simply indicates that
		 * the slave reached the end of its scan, but we
		 * might have stuff from the master that still
		 * needs to be copied in.
		 */
		if (chain && chain->error) {
			kprintf("chain error index %d: %d\n",
				idx, chain->error);
			errors[idx] = chain->error;
			error = chain->error;
			cluster->array[idx].flags |= HAMMER2_CITEM_INVALID;
			goto skip1;
		}

		/*
		 * Skip if the slave already has the record (everything
		 * matches including the modify_tid).  Note that the
		 * mirror_tid does not have to match, mirror_tid is
		 * a per-block-device entity.
		 */
		if (chain &&
		    (cluster->array[idx].flags & HAMMER2_CITEM_INVALID) == 0) {
			goto skip1;
		}

		/*
		 * Invalid element needs to be updated.
		 */
		nowork = 0;

		/*
		 * Otherwise adjust the slave.  Compare the focus to
		 * the chain.  Note that focus and chain can
		 * independently be NULL.
		 */
		KKASSERT(cluster->focus == focus);
		if (focus) {
			if (chain)
				n = hammer2_chain_cmp(focus, chain);
			else
				n = -1;	/* end-of-scan on slave */
		} else {
			if (chain)
				n = 1;	/* end-of-scan on focus */
			else
				n = 0;	/* end-of-scan on both */
		}

		if (n < 0) {
			/*
			 * slave chain missing, create missing chain.
			 *
			 * If we are going to recurse we have to set
			 * the initial modify_tid to 0 until the
			 * sub-tree is completely synchronized.
			 * Setting (n = 0) in this situation forces
			 * the replacement call to run on the way
			 * back up after the sub-tree has
			 * synchronized.
			 */
			if (dorecursion) {
				nerror = hammer2_sync_insert(
						thr, cparent, cluster,
						0,
						idx, errors);
				if (nerror == 0)
					n = 0;
			} else {
				nerror = hammer2_sync_insert(
						thr, cparent, cluster,
						focus->bref.modify_tid,
						idx, errors);
			}
		} else if (n > 0) {
			/*
			 * excess slave chain, destroy
			 */
			nerror = hammer2_sync_destroy(thr,
						      cparent, cluster,
						      idx, errors);
			hammer2_cluster_next_single_chain(
				cparent, cluster,
				&key_next,
				HAMMER2_KEY_MIN,
				HAMMER2_KEY_MAX,
				idx,
				HAMMER2_LOOKUP_NODATA |
				HAMMER2_LOOKUP_NOLOCK |
				HAMMER2_LOOKUP_NODIRECT |
				HAMMER2_LOOKUP_ALLNODES);
			/*
			 * Re-execute same index, there might be more
			 * items to delete before this slave catches
			 * up to the focus.
			 */
			goto repeat1;
		} else {
			/*
			 * Key matched but INVALID was set which likely
			 * means that modify_tid is out of sync.
			 *
			 * If we are going to recurse we have to do
			 * a partial replacement of the parent to
			 * ensure that the block array is compatible.
			 * For example, the current slave inode might
			 * be flagged DIRECTDATA when the focus is not.
			 * We must set modify_tid to 0 for now and
			 * will fix it when recursion is complete.
			 *
			 * If we are not going to recurse we can do
			 * a normal replacement.
			 *
			 * focus && chain can both be NULL on a match.
			 */
			if (dorecursion) {
				nerror = hammer2_sync_replace(
						thr, cparent, cluster,
						0,
						idx, errors);
			} else if (focus) {
				nerror = hammer2_sync_replace(
						thr, cparent, cluster,
						focus->bref.modify_tid,
						idx, errors);
			} else {
				nerror = 0;
			}
		}
		if (nerror)
			error = nerror;
		/* finished primary synchronization of chains */

skip1:
#if 0
		/*
		 * Operation may have modified cparent, we must replace
		 * iroot->cluster if we are at the top level.
		 */
		if (thr->depth == 0)
			hammer2_inode_repoint_one(pmp->iroot, cparent, idx);
#endif
		KKASSERT(cluster->focus == focus);

		/*
		 * If no work to do this iteration, skip any recursion.
		 */
		if (nowork)
			goto skip2;

		/*
		 * EXECUTE RECURSION (skip if no recursion)
		 *
		 * Indirect blocks are absorbed by the iteration so we only
		 * have to recurse on inodes.
		 *
		 * Do not resolve scluster, it represents the iteration
		 * parent and while it is logically in-sync the physical
		 * elements might not match due to the presence of indirect
		 * blocks and such.
		 */
		if (dorecursion == 0)
			goto skip2;
		if (thr->depth > 20) {
			/* hard depth limit guards against runaway stacks */
			kprintf("depth limit reached\n");
			nerror = HAMMER2_ERROR_DEPTH;
		} else {
			hammer2_cluster_unlock(cparent);
			scluster = hammer2_cluster_copy(cluster);
			hammer2_cluster_lock(scluster, HAMMER2_RESOLVE_ALWAYS);
			++thr->depth;
			nerror = hammer2_sync_slaves(thr, scluster, errors);
			--thr->depth;
			hammer2_cluster_unlock(scluster);
			hammer2_cluster_drop(scluster);
			/* XXX modify_tid on scluster */
			/* flush needs to not update modify_tid */
			hammer2_cluster_lock(cparent, HAMMER2_RESOLVE_ALWAYS);
		}
		if (nerror)
			goto skip2;

		/*
		 * Fixup parent nodes on the way back up from the recursion
		 * if no error occurred.  The modify_tid for these nodes
		 * would have been set to 0 and must be set to their final
		 * value.
		 */
		chain = cluster->array[idx].chain;
		if (chain == NULL || chain->error)
			goto skip2;
		/*
		 * should not be set but must fixup parents.
		if ((cluster->array[idx].flags & HAMMER2_CITEM_INVALID) == 0)
			goto skip2;
		 */

		/*
		 * At this point we have to have key-matched non-NULL
		 * elements.
		 */
		n = hammer2_chain_cmp(focus, chain);
		if (n != 0) {
			kprintf("hammer2_sync_slaves: illegal "
				"post-recursion state %d\n", n);
			goto skip2;
		}

		/*
		 * Update modify_tid on the way back up.
		 */
		nerror = hammer2_sync_replace(
				thr, cparent, cluster,
				focus->bref.modify_tid,
				idx, errors);
		if (nerror)
			error = nerror;

#if 0
		/*
		 * Operation may modify cparent, must replace
		 * iroot->cluster if we are at the top level.
		 */
		if (thr->depth == 0)
			hammer2_inode_repoint_one(pmp->iroot, cparent, idx);
#endif

skip2:
		/*
		 * Iterate.
		 */
		dumpcluster("adjust", cparent, cluster);
		cluster = hammer2_cluster_next(cparent, cluster,
					       &key_next,
					       HAMMER2_KEY_MIN,
					       HAMMER2_KEY_MAX,
					       HAMMER2_LOOKUP_NODATA |
					       HAMMER2_LOOKUP_NOLOCK |
					       HAMMER2_LOOKUP_NODIRECT |
					       HAMMER2_LOOKUP_ALLNODES);
		dumpcluster("nextcl", cparent, cluster);
	}
	hammer2_cluster_drop(cparent);
	if (cluster)
		hammer2_cluster_drop(cluster);

	return error;
}
688
/*
 * cparent is locked exclusively, with an extra ref, cluster is not locked.
 *
 * Insert a copy of the focus element into slave index i of the cluster,
 * stamping the new chain with the supplied modify_tid.  On return the
 * new chain (referenced, unlocked) is entered into cluster->array[i]
 * and the INVALID flag is cleared.  Always returns 0.
 */
static
int
hammer2_sync_insert(hammer2_thread_t *thr,
		    hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
		    hammer2_tid_t modify_tid, int i, int *errors)
{
	hammer2_chain_t *focus;
	hammer2_chain_t *chain;
	hammer2_key_t dummy;

	focus = cluster->focus;
#if HAMMER2_THREAD_DEBUG
	if (hammer2_debug & 1)
	kprintf("insert rec par=%p/%d.%016jx slave %d %d.%016jx mod=%016jx\n",
		cparent->array[i].chain,
		cparent->array[i].chain->bref.type,
		cparent->array[i].chain->bref.key,
		i, focus->bref.type, focus->bref.key, modify_tid);
#endif

	/*
	 * We have to do a lookup to position ourselves at the correct
	 * parent when inserting a record into a new slave because the
	 * cluster iteration for this slave might not be pointing to the
	 * right place.  Our expectation is that the record will not be
	 * found.
	 */
	hammer2_cluster_unlock_except(cparent, i);
	chain = hammer2_chain_lookup(&cparent->array[i].chain, &dummy,
				     focus->bref.key, focus->bref.key,
				     &cparent->array[i].cache_index,
				     HAMMER2_LOOKUP_NODIRECT);
	if (cparent->focus_index == i)
		cparent->focus = cparent->array[i].chain;
	KKASSERT(chain == NULL);

	/*
	 * Create the missing chain.
	 *
	 * Have to be careful to avoid deadlocks.
	 * (focus is locked before or after the create depending on
	 * whether its cluster index orders before or after i)
	 */
	chain = NULL;
	if (cluster->focus_index < i)
		hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
	hammer2_chain_create(&cparent->array[i].chain,
			     &chain, thr->pmp,
			     focus->bref.key, focus->bref.keybits,
			     focus->bref.type, focus->bytes,
			     0);
	if (cluster->focus_index > i)
		hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
	if (cparent->focus_index == i)
		cparent->focus = cparent->array[i].chain;
	hammer2_chain_modify(chain, HAMMER2_MODIFY_KEEPMODIFY);

	/*
	 * Copy focus to new chain
	 */

	/* type already set */
	chain->bref.methods = focus->bref.methods;
	/* keybits already set */
	chain->bref.vradix = focus->bref.vradix;
	/* mirror_tid set by flush */
	chain->bref.modify_tid = modify_tid;
	chain->bref.flags = focus->bref.flags;
	/* key already present */
	/* check code will be recalculated */

	/*
	 * Copy data body.
	 *
	 * NOTE(review): the non-DIRECTDATA inode path does not call
	 *		 hammer2_chain_setcheck() here, unlike the
	 *		 equivalent path in hammer2_sync_replace() —
	 *		 confirm whether that is intentional.
	 */
	switch(chain->bref.type) {
	case HAMMER2_BREF_TYPE_INODE:
		if ((focus->data->ipdata.meta.op_flags &
		     HAMMER2_OPFLAG_DIRECTDATA) == 0) {
			bcopy(focus->data, chain->data,
			      offsetof(hammer2_inode_data_t, u));
			break;
		}
		/* fall through */
	case HAMMER2_BREF_TYPE_DATA:
		bcopy(focus->data, chain->data, chain->bytes);
		hammer2_chain_setcheck(chain, chain->data);
		break;
	default:
		KKASSERT(0);
		break;
	}

	hammer2_chain_unlock(focus);
	hammer2_chain_unlock(chain);	/* unlock, leave ref */

	/*
	 * Avoid ordering deadlock when relocking cparent.
	 */
	if (i == 0) {
		hammer2_cluster_lock_except(cparent, i, HAMMER2_RESOLVE_ALWAYS);
	} else {
		hammer2_chain_unlock(cparent->array[i].chain);
		hammer2_cluster_lock(cparent, HAMMER2_RESOLVE_ALWAYS);
	}

	/*
	 * Enter item into (unlocked) cluster.
	 *
	 * Must clear invalid for iteration to work properly.
	 */
	if (cluster->array[i].chain)
		hammer2_chain_drop(cluster->array[i].chain);
	cluster->array[i].chain = chain;
	cluster->array[i].flags &= ~HAMMER2_CITEM_INVALID;

	return 0;
}
807
/*
 * cparent is locked exclusively, with an extra ref, cluster is not locked.
 *
 * Permanently delete the slave chain at cluster index i (an element
 * which exists on the slave but not on the focus).  Always returns 0.
 */
static
int
hammer2_sync_destroy(hammer2_thread_t *thr,
		     hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
		     int i, int *errors)
{
	hammer2_chain_t *chain;

	chain = cluster->array[i].chain;
#if HAMMER2_THREAD_DEBUG
	if (hammer2_debug & 1)
		kprintf("destroy rec %p/%p slave %d %d.%016jx\n",
			cparent, cluster,
			i, chain->bref.type, chain->bref.key);
#endif
	/*
	 * Try to avoid unnecessary I/O.
	 *
	 * XXX accounting not propagated up properly.  We might have to do
	 *     a RESOLVE_MAYBE here and pass 0 for the flags.
	 */
	hammer2_chain_lock(chain, HAMMER2_RESOLVE_NEVER);
	hammer2_chain_delete(cparent->array[i].chain, chain,
			     HAMMER2_DELETE_NOSTATS |
			     HAMMER2_DELETE_PERMANENT);
	hammer2_chain_unlock(chain);

	/*
	 * The element is not valid in that it doesn't match the other
	 * elements, but we have to mark it valid here to allow the
	 * cluster_next() call to advance this index to the next element.
	 */
	cluster->array[i].flags &= ~HAMMER2_CITEM_INVALID;

	return 0;
}
847
/*
 * cparent is locked exclusively, with an extra ref, cluster is not locked.
 * Replace element [i] in the cluster.
 *
 * The slave chain's bref fields and data body are overwritten from the
 * focus and stamped with the supplied modify_tid.  Always returns 0.
 */
static
int
hammer2_sync_replace(hammer2_thread_t *thr,
		     hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
		     hammer2_tid_t modify_tid, int i, int *errors)
{
	hammer2_chain_t *focus;
	hammer2_chain_t *chain;
	int nradix;
	uint8_t otype;		/* chain's type prior to replacement */

	focus = cluster->focus;
	chain = cluster->array[i].chain;
#if HAMMER2_THREAD_DEBUG
	if (hammer2_debug & 1)
		kprintf("replace rec %p/%p slave %d %d.%016jx mod=%016jx\n",
			cparent, cluster,
			i, focus->bref.type, focus->bref.key, modify_tid);
#endif
	/* lock order depends on cluster index to avoid deadlock */
	if (cluster->focus_index < i)
		hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
	hammer2_chain_lock(chain, HAMMER2_RESOLVE_ALWAYS);
	if (cluster->focus_index >= i)
		hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
	if (chain->bytes != focus->bytes) {
		/* XXX what if compressed? */
		/*
		 * NOTE(review): nradix is derived from chain->bytes (the
		 *		 current size) rather than focus->bytes
		 *		 (the target size) — confirm the resize
		 *		 target is intended.
		 */
		nradix = hammer2_getradix(chain->bytes);
		hammer2_chain_resize(NULL, cparent->array[i].chain, chain,
				     nradix, 0);
	}
	hammer2_chain_modify(chain, HAMMER2_MODIFY_KEEPMODIFY);
	otype = chain->bref.type;
	chain->bref.type = focus->bref.type;
	chain->bref.methods = focus->bref.methods;
	chain->bref.keybits = focus->bref.keybits;
	chain->bref.vradix = focus->bref.vradix;
	/* mirror_tid updated by flush */
	chain->bref.modify_tid = modify_tid;
	chain->bref.flags = focus->bref.flags;
	/* key already present */
	/* check code will be recalculated */
	chain->error = 0;

	/*
	 * Copy data body.
	 */
	switch(chain->bref.type) {
	case HAMMER2_BREF_TYPE_INODE:
		if ((focus->data->ipdata.meta.op_flags &
		     HAMMER2_OPFLAG_DIRECTDATA) == 0) {
			/*
			 * If DIRECTDATA is transitioning to 0 or the old
			 * chain is not an inode we have to initialize
			 * the block table.
			 */
			if (otype != HAMMER2_BREF_TYPE_INODE ||
			    (chain->data->ipdata.meta.op_flags &
			     HAMMER2_OPFLAG_DIRECTDATA)) {
				kprintf("chain inode trans away from dd\n");
				bzero(&chain->data->ipdata.u,
				      sizeof(chain->data->ipdata.u));
			}
			bcopy(focus->data, chain->data,
			      offsetof(hammer2_inode_data_t, u));
			/* XXX setcheck on inode should not be needed */
			hammer2_chain_setcheck(chain, chain->data);
			break;
		}
		/* fall through */
	case HAMMER2_BREF_TYPE_DATA:
		bcopy(focus->data, chain->data, chain->bytes);
		hammer2_chain_setcheck(chain, chain->data);
		break;
	default:
		KKASSERT(0);
		break;
	}

	hammer2_chain_unlock(focus);
	hammer2_chain_unlock(chain);

	/*
	 * Must clear invalid for iteration to work properly.
	 */
	cluster->array[i].flags &= ~HAMMER2_CITEM_INVALID;

	return 0;
}
2ed4fece
MD
940
941/****************************************************************************
942 * HAMMER2 XOPS THREADS *
943 ****************************************************************************/
944
945void
946hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
947{
948 hammer2_mtx_init(&xgrp->mtx, "h2xopq");
c603b86b 949 hammer2_mtx_init(&xgrp->mtx2, "h2xopio");
c847e838
MD
950}
951
952/*
953 * Allocate a XOP request.
954 *
955 * Once allocated a XOP request can be started, collected, and retired,
956 * and can be retired early if desired.
957 *
958 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
959 */
960hammer2_xop_t *
c603b86b 961hammer2_xop_alloc(hammer2_inode_t *ip)
c847e838
MD
962{
963 hammer2_xop_t *xop;
964
965 xop = objcache_get(cache_xops, M_WAITOK);
12ff971c 966 KKASSERT(xop->head.cluster.array[0].chain == NULL);
c847e838 967 xop->head.ip = ip;
c603b86b 968 xop->head.func = NULL;
c847e838
MD
969 xop->head.state = 0;
970 xop->head.error = 0;
971 xop->head.lkey = 0;
972 xop->head.nkey = 0;
973
974 xop->head.cluster.nchains = ip->cluster.nchains;
975 xop->head.cluster.pmp = ip->pmp;
976 xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;
977
978 /*
979 * run_mask - Active thread (or frontend) associated with XOP
980 */
981 xop->head.run_mask = HAMMER2_XOPMASK_VOP;
982
983 hammer2_inode_ref(ip);
984
985 return xop;
986}
987
e12ae3a5
MD
988void
989hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
990{
991 xop->name = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
992 xop->name_len = name_len;
993 bcopy(name, xop->name, name_len);
994}
995
996void
997hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
998{
999 xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
1000 xop->name2_len = name_len;
1001 bcopy(name, xop->name2, name_len);
1002}
1003
1004
/*
 * Attach a second inode to the XOP.  The XOP gains a ref on the inode
 * which is dropped by hammer2_xop_retire().
 */
void
hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
{
	xop->ip2 = ip2;
	hammer2_inode_ref(ip2);
}
1011
/*
 * Attach a third inode to the XOP.  The XOP gains a ref on the inode
 * which is dropped by hammer2_xop_retire().
 */
void
hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
{
	xop->ip3 = ip3;
	hammer2_inode_ref(ip3);
}
1018
c603b86b
MD
/*
 * Reset a XOP's per-request state for reuse, reinstalling the frontend
 * bit in the run mask.  Mirrors the field initialization performed by
 * hammer2_xop_alloc() (state, error, keys, run_mask).
 */
void
hammer2_xop_reinit(hammer2_xop_head_t *xop)
{
	xop->state = 0;
	xop->error = 0;
	xop->lkey = 0;
	xop->nkey = 0;
	xop->run_mask = HAMMER2_XOPMASK_VOP;
}
1028
c847e838
MD
1029/*
1030 * A mounted PFS needs Xops threads to support frontend operations.
1031 */
1032void
1033hammer2_xop_helper_create(hammer2_pfs_t *pmp)
1034{
1035 int i;
1036 int j;
1037
c847e838
MD
1038 for (i = 0; i < pmp->pfs_nmasters; ++i) {
1039 for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
1040 if (pmp->xop_groups[j].thrs[i].td)
1041 continue;
1042 hammer2_thr_create(&pmp->xop_groups[j].thrs[i], pmp,
1043 "h2xop", i, j,
1044 hammer2_primary_xops_thread);
1045 }
1046 }
1047}
1048
/*
 * Tear down all live Xops helper threads for the PFS.  Counterpart of
 * hammer2_xop_helper_create(); slots without a thread are skipped.
 */
void
hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
{
	int i;
	int j;

	for (i = 0; i < pmp->pfs_nmasters; ++i) {
		for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
			if (pmp->xop_groups[j].thrs[i].td)
				hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
		}
	}
}
1062
1063
1064
1065
1066/*
1067 * Start a XOP request, queueing it to all nodes in the cluster to
1068 * execute the cluster op.
1069 *
1070 * XXX optimize single-target case.
1071 */
1072void
c603b86b 1073hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_func_t func)
c847e838
MD
1074{
1075 hammer2_xop_group_t *xgrp;
1076 hammer2_thread_t *thr;
1077 hammer2_pfs_t *pmp;
1078 int g;
1079 int i;
1080
1081 pmp = xop->ip->pmp;
1082
1083 g = pmp->xop_iterator++;
1084 g = g & HAMMER2_XOPGROUPS_MASK;
1085 xgrp = &pmp->xop_groups[g];
c603b86b 1086 xop->func = func;
c847e838
MD
1087 xop->xgrp = xgrp;
1088
1089 for (i = 0; i < xop->ip->cluster.nchains; ++i) {
1090 thr = &xgrp->thrs[i];
1091 if (thr->td) {
1092 lockmgr(&thr->lk, LK_EXCLUSIVE);
1093 if (thr->td &&
1094 (thr->flags & HAMMER2_THREAD_STOP) == 0) {
1095 atomic_set_int(&xop->run_mask, 1U << i);
1096 TAILQ_INSERT_TAIL(&thr->xopq, xop,
1097 collect[i].entry);
1098 }
1099 lockmgr(&thr->lk, LK_RELEASE);
1100 wakeup(&thr->flags);
1101 }
1102 }
1103}
1104
1105/*
1106 * Retire a XOP. Used by both the VOP frontend and by the XOP backend.
1107 */
1108void
1109hammer2_xop_retire(hammer2_xop_head_t *xop, uint32_t mask)
1110{
1111 hammer2_xop_group_t *xgrp;
1112 hammer2_chain_t *chain;
1113 int i;
1114
1115 xgrp = xop->xgrp;
1116
1117 /*
1118 * Remove the frontend or remove a backend feeder. When removing
1119 * the frontend we must wakeup any backend feeders who are waiting
1120 * for FIFO space.
1121 *
1122 * XXX optimize wakeup.
1123 */
1124 KKASSERT(xop->run_mask & mask);
1125 if (atomic_fetchadd_int(&xop->run_mask, -mask) != mask) {
1126 if (mask == HAMMER2_XOPMASK_VOP)
1127 wakeup(xop);
1128 return;
1129 }
1130
1131 /*
1132 * Cleanup the collection cluster.
1133 */
1134 for (i = 0; i < xop->cluster.nchains; ++i) {
1135 xop->cluster.array[i].flags = 0;
1136 chain = xop->cluster.array[i].chain;
1137 if (chain) {
1138 xop->cluster.array[i].chain = NULL;
1139 hammer2_chain_unlock(chain);
1140 hammer2_chain_drop(chain);
1141 }
1142 }
1143
1144 /*
1145 * Cleanup the fifos, use check_counter to optimize the loop.
1146 */
1147 mask = xop->chk_mask;
1148 for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
1149 hammer2_xop_fifo_t *fifo = &xop->collect[i];
1150 while (fifo->ri != fifo->wi) {
1151 chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
1152 if (chain) {
1153 hammer2_chain_unlock(chain);
1154 hammer2_chain_drop(chain);
1155 }
1156 ++fifo->ri;
1157 if (fifo->wi - fifo->ri < HAMMER2_XOPFIFO / 2)
1158 wakeup(xop); /* XXX optimize */
1159 }
1160 mask &= ~(1U << i);
1161 }
1162
1163 /*
1164 * The inode is only held at this point, simply drop it.
1165 */
1166 if (xop->ip) {
1167 hammer2_inode_drop(xop->ip);
1168 xop->ip = NULL;
1169 }
e12ae3a5
MD
1170 if (xop->ip2) {
1171 hammer2_inode_drop(xop->ip2);
1172 xop->ip2 = NULL;
1173 }
1174 if (xop->ip3) {
1175 hammer2_inode_drop(xop->ip3);
1176 xop->ip3 = NULL;
1177 }
12ff971c
MD
1178 if (xop->name) {
1179 kfree(xop->name, M_HAMMER2);
1180 xop->name = NULL;
1181 xop->name_len = 0;
1182 }
e12ae3a5
MD
1183 if (xop->name2) {
1184 kfree(xop->name2, M_HAMMER2);
1185 xop->name2 = NULL;
1186 xop->name2_len = 0;
1187 }
c847e838
MD
1188
1189 objcache_put(cache_xops, xop);
1190}
1191
1192/*
1193 * (Backend) Returns non-zero if the frontend is still attached.
1194 */
1195int
1196hammer2_xop_active(hammer2_xop_head_t *xop)
1197{
1198 if (xop->run_mask & HAMMER2_XOPMASK_VOP)
1199 return 1;
1200 else
1201 return 0;
1202}
1203
1204/*
1205 * (Backend) Feed chain data through the cluster validator and back to
1206 * the frontend. Chains are fed from multiple nodes concurrently
1207 * and pipelined via per-node FIFOs in the XOP.
1208 *
1209 * No xop lock is needed because we are only manipulating fields under
1210 * our direct control.
1211 *
1212 * Returns 0 on success and a hammer error code if sync is permanently
c603b86b
MD
1213 * lost. The caller retains a ref on the chain but by convention
1214 * the lock is typically inherited by the xop (caller loses lock).
1215 *
1216 * Returns non-zero on error. In this situation the caller retains a
1217 * ref on the chain but loses the lock (we unlock here).
1218 *
1219 * WARNING! The chain is moving between two different threads, it must
1220 * be locked SHARED, not exclusive.
c847e838
MD
1221 */
1222int
1223hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
1224 int clindex, int error)
1225{
1226 hammer2_xop_fifo_t *fifo;
1227
1228 /*
1229 * Multi-threaded entry into the XOP collector. We own the
1230 * fifo->wi for our clindex.
1231 */
1232 fifo = &xop->collect[clindex];
1233
1234 while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
1235 tsleep_interlock(xop, 0);
1236 if (hammer2_xop_active(xop) == 0) {
1237 error = EINTR;
1238 goto done;
1239 }
1240 if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
1241 tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
1242 }
1243 }
1244 if (chain)
1245 hammer2_chain_ref(chain);
1246 fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
1247 fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
1248 cpu_sfence();
1249 ++fifo->wi;
1250 atomic_set_int(&xop->chk_mask, 1U << clindex);
1251 atomic_add_int(&xop->check_counter, 1);
1252 wakeup(&xop->check_counter); /* XXX optimize */
1253 error = 0;
c603b86b
MD
1254
1255 /*
1256 * Cleanup. If an error occurred we eat the lock. If no error
1257 * occurred the fifo inherits the lock and gains an additional ref.
1258 *
1259 * The caller's ref remains in both cases.
1260 */
c847e838 1261done:
c603b86b
MD
1262 if (error && chain)
1263 hammer2_chain_unlock(chain);
c847e838
MD
1264 return error;
1265}
1266
1267/*
1268 * (Frontend) collect a response from a running cluster op.
1269 *
1270 * Responses are fed from all appropriate nodes concurrently
1271 * and collected into a cohesive response >= nkey. lkey is
1272 * then set to nkey and nkey is advanced prior to return.
1273 * The caller may depend on xop->lkey reflecting the current
1274 * key of the returned response.
1275 *
1276 * The collector will return the instant quorum or other requirements
1277 * are met, even if some nodes get behind or become non-responsive.
1278 *
1279 * HAMMER2_XOP_COLLECT_NOWAIT - Used to 'poll' a completed collection,
1280 * usually called synchronously from the
1281 * node XOPs for the strategy code to
1282 * fake the frontend collection and complete
1283 * the BIO as soon as possible.
1284 *
1285 * HAMMER2_XOP_SYNCHRONIZER - Reqeuest synchronization with a particular
1286 * cluster index, prevents looping when that
1287 * index is out of sync so caller can act on
1288 * the out of sync element. ESRCH and EDEADLK
1289 * can be returned if this flag is specified.
1290 *
1291 * Returns 0 on success plus a filled out xop->cluster structure.
1292 * Return ENOENT on normal termination.
1293 * Otherwise return an error.
1294 */
1295int
c603b86b 1296hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
c847e838
MD
1297{
1298 hammer2_xop_fifo_t *fifo;
1299 hammer2_chain_t *chain;
1300 hammer2_key_t lokey;
1301 int error;
1302 int keynull;
1303 int adv; /* advance the element */
1304 int i;
1305 uint32_t check_counter;
1306
1307loop:
1308 /*
1309 * First loop tries to advance pieces of the cluster which
1310 * are out of sync.
1311 */
1312 lokey = HAMMER2_KEY_MAX;
1313 keynull = HAMMER2_CHECK_NULL;
1314 check_counter = xop->check_counter;
1315 cpu_lfence();
1316
1317 for (i = 0; i < xop->cluster.nchains; ++i) {
1318 chain = xop->cluster.array[i].chain;
1319 if (chain == NULL) {
1320 adv = 1;
1321 } else if (chain->bref.key < xop->nkey) {
1322 adv = 1;
1323 } else {
1324 keynull &= ~HAMMER2_CHECK_NULL;
1325 if (lokey > chain->bref.key)
1326 lokey = chain->bref.key;
1327 adv = 0;
1328 }
1329 if (adv == 0)
1330 continue;
1331
1332 /*
1333 * Advance element if possible, advanced element may be NULL.
1334 */
1335 if (chain) {
1336 hammer2_chain_unlock(chain);
1337 hammer2_chain_drop(chain);
1338 }
1339 fifo = &xop->collect[i];
1340 if (fifo->ri != fifo->wi) {
1341 cpu_lfence();
1342 chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
1343 ++fifo->ri;
1344 xop->cluster.array[i].chain = chain;
1345 if (chain == NULL) {
1346 xop->cluster.array[i].flags |=
1347 HAMMER2_CITEM_NULL;
1348 }
1349 if (fifo->wi - fifo->ri < HAMMER2_XOPFIFO / 2)
1350 wakeup(xop); /* XXX optimize */
1351 --i; /* loop on same index */
1352 } else {
1353 /*
1354 * Retain CITEM_NULL flag. If set just repeat EOF.
1355 * If not, the NULL,0 combination indicates an
1356 * operation in-progress.
1357 */
1358 xop->cluster.array[i].chain = NULL;
1359 /* retain any CITEM_NULL setting */
1360 }
1361 }
1362
1363 /*
1364 * Determine whether the lowest collected key meets clustering
1365 * requirements. Returns:
1366 *
1367 * 0 - key valid, cluster can be returned.
1368 *
1369 * ENOENT - normal end of scan, return ENOENT.
1370 *
1371 * ESRCH - sufficient elements collected, quorum agreement
1372 * that lokey is not a valid element and should be
1373 * skipped.
1374 *
1375 * EDEADLK - sufficient elements collected, no quorum agreement
1376 * (and no agreement possible). In this situation a
1377 * repair is needed, for now we loop.
1378 *
1379 * EINPROGRESS - insufficient elements collected to resolve, wait
1380 * for event and loop.
1381 */
1382 error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
1383 if (error == EINPROGRESS) {
1384 if (xop->check_counter == check_counter) {
c603b86b
MD
1385 if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
1386 goto done;
c847e838
MD
1387 tsleep_interlock(&xop->check_counter, 0);
1388 cpu_lfence();
1389 if (xop->check_counter == check_counter) {
1390 tsleep(&xop->check_counter, PINTERLOCKED,
1391 "h2coll", hz*60);
1392 }
1393 }
1394 goto loop;
1395 }
1396 if (error == ESRCH) {
1397 if (lokey != HAMMER2_KEY_MAX) {
1398 xop->nkey = lokey + 1;
1399 goto loop;
1400 }
1401 error = ENOENT;
1402 }
1403 if (error == EDEADLK) {
1404 kprintf("hammer2: no quorum possible lkey %016jx\n",
1405 lokey);
1406 if (lokey != HAMMER2_KEY_MAX) {
1407 xop->nkey = lokey + 1;
1408 goto loop;
1409 }
1410 error = ENOENT;
1411 }
1412 if (lokey == HAMMER2_KEY_MAX)
1413 xop->nkey = lokey;
1414 else
1415 xop->nkey = lokey + 1;
c603b86b 1416done:
c847e838 1417 return error;
2ed4fece
MD
1418}
1419
1420/*
1421 * Primary management thread for xops support. Each node has several such
1422 * threads which replicate front-end operations on cluster nodes.
1423 *
1424 * XOPS thread node operations, allowing the function to focus on a single
1425 * node in the cluster after validating the operation with the cluster.
1426 * This is primarily what prevents dead or stalled nodes from stalling
1427 * the front-end.
1428 */
1429void
1430hammer2_primary_xops_thread(void *arg)
1431{
1432 hammer2_thread_t *thr = arg;
1433 hammer2_pfs_t *pmp;
c847e838 1434 hammer2_xop_head_t *xop;
2ed4fece 1435 hammer2_xop_group_t *xgrp;
c847e838 1436 uint32_t mask;
2ed4fece
MD
1437
1438 pmp = thr->pmp;
1439 xgrp = &pmp->xop_groups[thr->repidx];
c847e838 1440 mask = 1U << thr->clindex;
2ed4fece
MD
1441
1442 lockmgr(&thr->lk, LK_EXCLUSIVE);
1443 while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
1444 /*
1445 * Handle freeze request
1446 */
1447 if (thr->flags & HAMMER2_THREAD_FREEZE) {
1448 atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
1449 atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
1450 }
1451
1452 /*
1453 * Force idle if frozen until unfrozen or stopped.
1454 */
1455 if (thr->flags & HAMMER2_THREAD_FROZEN) {
1456 lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
1457 continue;
1458 }
1459
1460 /*
1461 * Reset state on REMASTER request
1462 */
1463 if (thr->flags & HAMMER2_THREAD_REMASTER) {
1464 atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
1465 /* reset state */
1466 }
1467
1468 /*
c847e838
MD
1469 * Process requests. Each request can be multi-queued.
1470 *
1471 * If we get behind and the frontend VOP is no longer active,
1472 * we retire the request without processing it. The callback
1473 * may also abort processing if the frontend VOP becomes
1474 * inactive.
2ed4fece 1475 */
c847e838
MD
1476 while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
1477 TAILQ_REMOVE(&thr->xopq, xop,
1478 collect[thr->clindex].entry);
1479 if (hammer2_xop_active(xop)) {
1480 lockmgr(&thr->lk, LK_RELEASE);
1481 xop->func((hammer2_xop_t *)xop, thr->clindex);
1482 hammer2_xop_retire(xop, mask);
1483 lockmgr(&thr->lk, LK_EXCLUSIVE);
1484 } else {
1485 hammer2_xop_feed(xop, NULL, thr->clindex,
1486 ECONNABORTED);
1487 hammer2_xop_retire(xop, mask);
2ed4fece 1488 }
2ed4fece
MD
1489 }
1490
1491 /*
1492 * Wait for event.
1493 */
1494 lksleep(&thr->flags, &thr->lk, 0, "h2idle", 0);
1495 }
c847e838
MD
1496
1497 /*
1498 * Cleanup / termination
1499 */
1500 while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
1501 kprintf("hammer2_thread: aborting xop %p\n", xop->func);
1502 TAILQ_REMOVE(&thr->xopq, xop,
1503 collect[thr->clindex].entry);
1504 hammer2_xop_retire(xop, mask);
1505 }
1506
2ed4fece
MD
1507 thr->td = NULL;
1508 wakeup(thr);
1509 lockmgr(&thr->lk, LK_RELEASE);
1510 /* thr structure can go invalid after this point */
1511}