2 * Copyright (c) 2015 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * This module implements various PFS-based helper threads.
39 #define HAMMER2_THREAD_DEBUG 1
41 static int hammer2_sync_slaves(hammer2_thread_t *thr,
42 hammer2_cluster_t *cparent, int *errors);
43 static void hammer2_update_pfs_status(hammer2_thread_t *thr,
44 hammer2_cluster_t *cparent);
45 static int hammer2_sync_insert(hammer2_thread_t *thr,
46 hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
47 hammer2_tid_t modify_tid,
49 static int hammer2_sync_destroy(hammer2_thread_t *thr,
50 hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
52 static int hammer2_sync_replace(hammer2_thread_t *thr,
53 hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
54 hammer2_tid_t modify_tid,
57 /****************************************************************************
58 * HAMMER2 THREAD API *
59 ****************************************************************************/
/*
 * NOTE(review): embedded original line numbers are non-contiguous; parts
 * of this function (return type, braces, repidx selector) are not visible
 * in this excerpt.  Comments describe only what the visible lines show.
 */
61 * Initialize the supplied thread structure, starting the specified
65 hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
66 const char *id, int clindex, int repidx,
67 void (*func)(void *arg))
/* Set up per-thread lock, cluster index, and the XOP request queue. */
69 lockinit(&thr->lk, "h2thr", 0, 0);
71 thr->clindex = clindex;
73 TAILQ_INIT(&thr->xopq);
/*
 * Spawn the kernel thread.  Two name formats appear; presumably repidx
 * selects the numbered form -- the selecting condition is not visible here.
 */
75 lwkt_create(func, thr, &thr->td, NULL, 0, -1,
76 "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
78 lwkt_create(func, thr, &thr->td, NULL, 0, -1,
79 "%s-%s", id, pmp->pfs_names[clindex]);
84 * Terminate a thread. This function will silently return if the thread
85 * was never initialized or has already been deleted.
87 * This is accomplished by setting the STOP flag and waiting for the td
88 * structure to become NULL.
91 hammer2_thr_delete(hammer2_thread_t *thr)
/* Set STOP under the thread lock so the change is serialized against the
 * thread's own lock-protected main loop. */
95 lockmgr(&thr->lk, LK_EXCLUSIVE);
96 atomic_set_int(&thr->flags, HAMMER2_THREAD_STOP);
/* Wait (re-checking every hz ticks) for the thread to exit; the loop
 * condition testing thr->td is not visible in this excerpt. */
99 lksleep(thr, &thr->lk, 0, "h2thr", hz);
101 lockmgr(&thr->lk, LK_RELEASE);
103 lockuninit(&thr->lk);
107 * Asynchronous remaster request. Ask the synchronization thread to
108 * start over soon (as if it were frozen and unfrozen, but without waiting).
109 * The thread always recalculates mastership relationships when restarting.
112 hammer2_thr_remaster(hammer2_thread_t *thr)
/* Flag-only request; the wakeup of the target thread is presumably done
 * on a line not visible in this excerpt. */
116 lockmgr(&thr->lk, LK_EXCLUSIVE);
117 atomic_set_int(&thr->flags, HAMMER2_THREAD_REMASTER);
119 lockmgr(&thr->lk, LK_RELEASE);
/*
 * Asynchronous freeze request: set FREEZE and return immediately without
 * waiting for the thread to acknowledge (contrast hammer2_thr_freeze).
 */
123 hammer2_thr_freeze_async(hammer2_thread_t *thr)
125 atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
/*
 * Synchronous freeze: request FREEZE and block until the target thread
 * acknowledges by setting FROZEN.
 */
130 hammer2_thr_freeze(hammer2_thread_t *thr)
134 lockmgr(&thr->lk, LK_EXCLUSIVE);
135 atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
/* Poll-sleep until the thread marks itself FROZEN. */
137 while ((thr->flags & HAMMER2_THREAD_FROZEN) == 0) {
138 lksleep(thr, &thr->lk, 0, "h2frz", hz);
140 lockmgr(&thr->lk, LK_RELEASE);
/*
 * Unfreeze a previously frozen thread by clearing FROZEN; the wakeup of
 * the target thread presumably occurs on a line not visible here.
 */
144 hammer2_thr_unfreeze(hammer2_thread_t *thr)
148 lockmgr(&thr->lk, LK_EXCLUSIVE);
149 atomic_clear_int(&thr->flags, HAMMER2_THREAD_FROZEN);
151 lockmgr(&thr->lk, LK_RELEASE);
154 /****************************************************************************
155 * HAMMER2 SYNC THREADS *
156 ****************************************************************************/
158 * Primary management thread for an element of a node. A thread will exist
159 * for each element requiring management.
161 * No management threads are needed for the SPMP or for any PMP with only
164 * On the SPMP - handles bulkfree and dedup operations
165 * On a PFS - handles remastering and synchronization
/*
 * NOTE(review): this excerpt is non-contiguous; variable declarations for
 * pmp/error and several braces are not visible below.
 */
168 hammer2_primary_sync_thread(void *arg)
170 hammer2_thread_t *thr = arg;
171 hammer2_cluster_t *cparent;
172 hammer2_chain_t *chain;
174 int errors[HAMMER2_MAXCLUSTER];
/* Main loop: runs until STOP is requested, holding thr->lk between waits. */
179 lockmgr(&thr->lk, LK_EXCLUSIVE);
180 while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
182 * Handle freeze request
184 if (thr->flags & HAMMER2_THREAD_FREEZE) {
185 atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
186 atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
190 * Force idle if frozen until unfrozen or stopped.
192 if (thr->flags & HAMMER2_THREAD_FROZEN) {
193 lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
198 * Reset state on REMASTER request
200 if (thr->flags & HAMMER2_THREAD_REMASTER) {
201 atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
206 * Synchronization scan.
208 hammer2_trans_init(&thr->trans, pmp, HAMMER2_TRANS_KEEPMODIFY);
209 hammer2_inode_lock(pmp->iroot, HAMMER2_RESOLVE_ALWAYS);
210 cparent = hammer2_inode_cluster(pmp->iroot,
211 HAMMER2_RESOLVE_ALWAYS);
212 hammer2_update_pfs_status(thr, cparent);
213 hammer2_inode_unlock(pmp->iroot, NULL);
214 bzero(errors, sizeof(errors));
215 kprintf("sync_slaves clindex %d\n", thr->clindex);
218 * We are the syncer, not a normal frontend operator,
219 * so force cparent good to prime the scan.
221 hammer2_cluster_forcegood(cparent);
222 error = hammer2_sync_slaves(thr, cparent, errors);
224 kprintf("hammer2_sync_slaves: error %d\n", error);
225 chain = cparent->array[thr->clindex].chain;
228 * Retain chain for our node and release the cluster.
230 hammer2_chain_ref(chain);
231 hammer2_chain_lock(chain, HAMMER2_RESOLVE_ALWAYS);
232 hammer2_cluster_unlock(cparent);
233 hammer2_cluster_drop(cparent);
/* Flush our node's chain, then finish the transaction. */
238 hammer2_flush(&thr->trans, chain, 1);
239 hammer2_chain_unlock(chain);
240 hammer2_chain_drop(chain);
242 hammer2_trans_done(&thr->trans);
245 * Wait for event, or 5-second poll.
247 lksleep(&thr->flags, &thr->lk, 0, "h2idle", hz * 5);
251 lockmgr(&thr->lk, LK_RELEASE);
252 /* thr structure can go invalid after this point */
256 * Given a locked cluster created from pmp->iroot, update the PFS's
/*
 * Recompute the PFS status flags from the cluster's current state and
 * kprintf a human-readable summary when they change.  The assignment of
 * the new flags back into pmp is not visible in this excerpt.
 */
261 hammer2_update_pfs_status(hammer2_thread_t *thr, hammer2_cluster_t *cparent)
263 hammer2_pfs_t *pmp = thr->pmp;
266 flags = cparent->flags & HAMMER2_CLUSTER_ZFLAGS;
/* No change -> nothing to report. */
267 if (pmp->flags == flags)
271 kprintf("pfs %p", pmp);
272 if (flags & HAMMER2_CLUSTER_MSYNCED)
273 kprintf(" masters-all-good");
274 if (flags & HAMMER2_CLUSTER_SSYNCED)
275 kprintf(" slaves-all-good");
277 if (flags & HAMMER2_CLUSTER_WRHARD)
278 kprintf(" quorum/rw");
279 else if (flags & HAMMER2_CLUSTER_RDHARD)
280 kprintf(" quorum/ro");
282 if (flags & HAMMER2_CLUSTER_UNHARD)
283 kprintf(" out-of-sync-masters");
284 else if (flags & HAMMER2_CLUSTER_NOHARD)
285 kprintf(" no-masters-visible");
/* WRSOFT/RDSOFT message bodies are not visible in this excerpt. */
287 if (flags & HAMMER2_CLUSTER_WRSOFT)
289 else if (flags & HAMMER2_CLUSTER_RDSOFT)
292 if (flags & HAMMER2_CLUSTER_UNSOFT)
293 kprintf(" out-of-sync-slaves");
294 else if (flags & HAMMER2_CLUSTER_NOSOFT)
295 kprintf(" no-slaves-visible");
/*
 * Debug helper: dump the per-index chain pointers and INVALID flags of a
 * (cparent, cluster) pair.  Only active when (hammer2_debug & 1) is set.
 */
301 dumpcluster(const char *label,
302 hammer2_cluster_t *cparent, hammer2_cluster_t *cluster)
304 hammer2_chain_t *chain;
307 if ((hammer2_debug & 1) == 0)
310 kprintf("%s\t", label);
311 KKASSERT(cparent->nchains == cluster->nchains);
312 for (i = 0; i < cparent->nchains; ++i) {
/* Print the parent-side element (or NULL placeholder). */
316 if ((chain = cparent->array[i].chain) != NULL) {
319 ((cparent->array[i].flags &
320 HAMMER2_CITEM_INVALID) ? "(I)" : " ")
323 kprintf(" NULL %s ", " ");
/* Print the child-side element (or NULL placeholder). */
325 if ((chain = cluster->array[i].chain) != NULL) {
328 ((cluster->array[i].flags &
329 HAMMER2_CITEM_INVALID) ? "(I)" : " ")
332 kprintf(" NULL %s ", " ");
/*
 * Synchronize this thread's slave node (thr->clindex) against the cluster
 * focus by scanning the topology under cparent and inserting, destroying,
 * or replacing slave chains as needed, recursing on inodes.
 *
 * NOTE(review): this excerpt is non-contiguous; loop headers, several
 * conditionals, and variable declarations (pmp, error, nerror, n, nowork,
 * dorecursion, idx) are not visible below.
 */
339 * TODO - have cparent use a shared lock normally instead of exclusive,
340 * (needs to be upgraded for slave adjustments).
344 hammer2_sync_slaves(hammer2_thread_t *thr, hammer2_cluster_t *cparent,
348 hammer2_cluster_t *cluster;
349 hammer2_cluster_t *scluster;
350 hammer2_chain_t *focus;
351 hammer2_chain_t *chain;
352 hammer2_key_t key_next;
361 idx = thr->clindex; /* cluster node we are responsible for */
364 * Nothing to do if all slaves are synchronized.
365 * Nothing to do if cluster not authoritatively readable.
367 if (pmp->flags & HAMMER2_CLUSTER_SSYNCED)
369 if ((pmp->flags & HAMMER2_CLUSTER_RDHARD) == 0)
370 return(HAMMER2_ERROR_INCOMPLETE);
375 * XXX snapshot the source to provide a stable source to copy.
379 * Update all local slaves (remote slaves are handled by the sync
380 * threads on their respective hosts).
382 * Do a full topology scan, insert/delete elements on slaves as
383 * needed. cparent must be ref'd so we can unlock and relock it
386 * ALLNODES - Allows clusters with a NULL focus to be returned if
387 * elements remain on other nodes.
389 hammer2_cluster_ref(cparent);
390 cluster = hammer2_cluster_lookup(cparent, &key_next,
391 HAMMER2_KEY_MIN, HAMMER2_KEY_MAX,
392 HAMMER2_LOOKUP_NODATA |
393 HAMMER2_LOOKUP_NOLOCK |
394 HAMMER2_LOOKUP_NODIRECT |
395 HAMMER2_LOOKUP_ALLNODES);
396 dumpcluster("lookup", cparent, cluster);
403 * nowork is adjusted during the loop,
404 * dorecursion is calculated here.
407 focus = cluster->focus;
/* Recursion only happens on inodes (indirect blocks are absorbed). */
408 if (focus && focus->bref.type == HAMMER2_BREF_TYPE_INODE)
/* Debug-only trace for cluster index 3. */
413 if (idx == 3 && (hammer2_debug & 1) && focus)
414 kprintf("scan3 focus %d.%016jx %d.%016jx\n",
415 (cparent ? cparent->focus->bref.type : 0xFF),
416 (cparent ? cparent->focus->bref.key : (uintmax_t)-1LLU),
417 focus->bref.type, focus->bref.key);
420 * Synchronize chains to focus
422 if (idx >= cluster->nchains)
424 chain = cluster->array[idx].chain;
425 if (idx == 3 && (hammer2_debug & 1) && chain)
426 kprintf("scan3 slave %d.%016jx %d.%016jx\n",
427 ((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.type : 0xFF),
428 ((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.key : (uintmax_t)-1LLU),
429 cluster->array[idx].chain->bref.type,
430 cluster->array[idx].chain->bref.key);
431 if (idx == 3 && (hammer2_debug & 1) && chain == NULL)
432 kprintf("scan3 slave %d.%16jx NULL\n",
433 ((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.type : 0xFF),
434 ((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.key : (uintmax_t)-1LLU)
438 * Disable recursion for this index and loop up
439 * if a chain error is detected.
441 * A NULL chain is ok, it simply indicates that
442 * the slave reached the end of its scan, but we
443 * might have stuff from the master that still
444 * needs to be copied in.
446 if (chain && chain->error) {
447 kprintf("chain error index %d: %d\n",
449 errors[idx] = chain->error;
450 error = chain->error;
451 cluster->array[idx].flags |= HAMMER2_CITEM_INVALID;
456 * Skip if the slave already has the record (everything
457 * matches including the modify_tid). Note that the
458 * mirror_tid does not have to match, mirror_tid is
459 * a per-block-device entity.
462 (cluster->array[idx].flags & HAMMER2_CITEM_INVALID) == 0) {
467 * Invalid element needs to be updated.
472 * Otherwise adjust the slave. Compare the focus to
473 * the chain. Note that focus and chain can
474 * independently be NULL.
476 KKASSERT(cluster->focus == focus);
/* n < 0: slave behind, n > 0: slave ahead, n == 0: keys match. */
479 n = hammer2_chain_cmp(focus, chain);
481 n = -1; /* end-of-scan on slave */
484 n = 1; /* end-of-scan on focus */
486 n = 0; /* end-of-scan on both */
491 * slave chain missing, create missing chain.
493 * If we are going to recurse we have to set
494 * the initial modify_tid to 0 until the
495 * sub-tree is completely synchronized.
496 * Setting (n = 0) in this situation forces
497 * the replacement call to run on the way
498 * back up after the sub-tree has
502 nerror = hammer2_sync_insert(
503 thr, cparent, cluster,
509 nerror = hammer2_sync_insert(
510 thr, cparent, cluster,
511 focus->bref.modify_tid,
516 * excess slave chain, destroy
518 nerror = hammer2_sync_destroy(thr,
521 hammer2_cluster_next_single_chain(
527 HAMMER2_LOOKUP_NODATA |
528 HAMMER2_LOOKUP_NOLOCK |
529 HAMMER2_LOOKUP_NODIRECT |
530 HAMMER2_LOOKUP_ALLNODES);
532 * Re-execute same index, there might be more
533 * items to delete before this slave catches
539 * Key matched but INVALID was set which likely
540 * means that modify_tid is out of sync.
542 * If we are going to recurse we have to do
543 * a partial replacement of the parent to
544 * ensure that the block array is compatible.
545 * For example, the current slave inode might
546 * be flagged DIRECTDATA when the focus is not.
547 * We must set modify_tid to 0 for now and
548 * will fix it when recursion is complete.
550 * If we are not going to recurse we can do
551 * a normal replacement.
553 * focus && chain can both be NULL on a match.
556 nerror = hammer2_sync_replace(
557 thr, cparent, cluster,
561 nerror = hammer2_sync_replace(
562 thr, cparent, cluster,
563 focus->bref.modify_tid,
571 /* finished primary synchronization of chains */
576 * Operation may have modified cparent, we must replace
577 * iroot->cluster if we are at the top level.
580 hammer2_inode_repoint_one(pmp->iroot, cparent, idx);
582 KKASSERT(cluster->focus == focus);
585 * If no work to do this iteration, skip any recursion.
591 * EXECUTE RECURSION (skip if no recursion)
593 * Indirect blocks are absorbed by the iteration so we only
594 * have to recurse on inodes.
596 * Do not resolve scluster, it represents the iteration
597 * parent and while it is logically in-sync the physical
598 * elements might not match due to the presence of indirect
601 if (dorecursion == 0)
/* Hard depth cap to prevent runaway recursion / kernel stack overflow. */
603 if (thr->depth > 20) {
604 kprintf("depth limit reached\n");
605 nerror = HAMMER2_ERROR_DEPTH;
607 hammer2_cluster_unlock(cparent);
608 scluster = hammer2_cluster_copy(cluster);
609 hammer2_cluster_lock(scluster, HAMMER2_RESOLVE_ALWAYS);
611 nerror = hammer2_sync_slaves(thr, scluster, errors);
613 hammer2_cluster_unlock(scluster);
614 hammer2_cluster_drop(scluster);
615 /* XXX modify_tid on scluster */
616 /* flush needs to not update modify_tid */
617 hammer2_cluster_lock(cparent, HAMMER2_RESOLVE_ALWAYS);
623 * Fixup parent nodes on the way back up from the recursion
624 * if no error occurred. The modify_tid for these nodes
625 * would have been set to 0 and must be set to their final
628 chain = cluster->array[idx].chain;
629 if (chain == NULL || chain->error)
632 * should not be set but must fixup parents.
633 if ((cluster->array[idx].flags & HAMMER2_CITEM_INVALID) == 0)
638 * At this point we have to have key-matched non-NULL
641 n = hammer2_chain_cmp(focus, chain);
643 kprintf("hammer2_sync_slaves: illegal "
644 "post-recursion state %d\n", n);
649 * Update modify_tid on the way back up.
651 nerror = hammer2_sync_replace(
652 thr, cparent, cluster,
653 focus->bref.modify_tid,
660 * Operation may modify cparent, must replace
661 * iroot->cluster if we are at the top level.
664 hammer2_inode_repoint_one(pmp->iroot, cparent, idx);
671 dumpcluster("adjust", cparent, cluster);
672 cluster = hammer2_cluster_next(cparent, cluster,
676 HAMMER2_LOOKUP_NODATA |
677 HAMMER2_LOOKUP_NOLOCK |
678 HAMMER2_LOOKUP_NODIRECT |
679 HAMMER2_LOOKUP_ALLNODES);
680 dumpcluster("nextcl", cparent, cluster);
682 hammer2_cluster_drop(cparent);
684 hammer2_cluster_drop(cluster);
/*
 * Create a missing chain on slave node [i], copying key/type/metadata and
 * (for inode/data brefs) content from the cluster focus, then re-enter the
 * new chain into the cluster.
 *
 * NOTE(review): excerpt is non-contiguous; several braces, the dummy key
 * declaration, and parts of the relock logic are not visible below.
 */
690 * cparent is locked exclusively, with an extra ref, cluster is not locked.
694 hammer2_sync_insert(hammer2_thread_t *thr,
695 hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
696 hammer2_tid_t modify_tid, int i, int *errors)
698 hammer2_chain_t *focus;
699 hammer2_chain_t *chain;
702 focus = cluster->focus;
703 #if HAMMER2_THREAD_DEBUG
704 if (hammer2_debug & 1)
705 kprintf("insert rec par=%p/%d.%016jx slave %d %d.%016jx mod=%016jx\n",
706 cparent->array[i].chain,
707 cparent->array[i].chain->bref.type,
708 cparent->array[i].chain->bref.key,
709 i, focus->bref.type, focus->bref.key, modify_tid);
713 * We have to do a lookup to position ourselves at the correct
714 * parent when inserting a record into a new slave because the
715 * cluster iteration for this slave might not be pointing to the
716 * right place. Our expectation is that the record will not be
719 hammer2_cluster_unlock_except(cparent, i);
720 chain = hammer2_chain_lookup(&cparent->array[i].chain, &dummy,
721 focus->bref.key, focus->bref.key,
722 &cparent->array[i].cache_index,
723 HAMMER2_LOOKUP_NODIRECT);
724 if (cparent->focus_index == i)
725 cparent->focus = cparent->array[i].chain;
/* The record must not already exist on the slave. */
726 KKASSERT(chain == NULL);
729 * Create the missing chain.
731 * Have to be careful to avoid deadlocks.
/* Lock ordering: lock the focus before or after the create depending on
 * whether its index precedes ours, to keep a consistent lock order. */
734 if (cluster->focus_index < i)
735 hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
736 hammer2_chain_create(&thr->trans, &cparent->array[i].chain,
738 focus->bref.key, focus->bref.keybits,
739 focus->bref.type, focus->bytes,
741 if (cluster->focus_index > i)
742 hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
743 if (cparent->focus_index == i)
744 cparent->focus = cparent->array[i].chain;
745 hammer2_chain_modify(&thr->trans, chain, 0);
748 * Copy focus to new chain
751 /* type already set */
752 chain->bref.methods = focus->bref.methods;
753 /* keybits already set */
754 chain->bref.vradix = focus->bref.vradix;
755 /* mirror_tid set by flush */
756 chain->bref.modify_tid = modify_tid;
757 chain->bref.flags = focus->bref.flags;
758 /* key already present */
759 /* check code will be recalculated */
764 switch(chain->bref.type) {
765 case HAMMER2_BREF_TYPE_INODE:
/* Non-DIRECTDATA inodes: copy metadata only, not the embedded u area. */
766 if ((focus->data->ipdata.meta.op_flags &
767 HAMMER2_OPFLAG_DIRECTDATA) == 0) {
768 bcopy(focus->data, chain->data,
769 offsetof(hammer2_inode_data_t, u));
773 case HAMMER2_BREF_TYPE_DATA:
774 bcopy(focus->data, chain->data, chain->bytes);
775 hammer2_chain_setcheck(chain, chain->data);
782 hammer2_chain_unlock(focus);
783 hammer2_chain_unlock(chain); /* unlock, leave ref */
786 * Avoid ordering deadlock when relocking cparent.
789 hammer2_cluster_lock_except(cparent, i, HAMMER2_RESOLVE_ALWAYS);
791 hammer2_chain_unlock(cparent->array[i].chain);
792 hammer2_cluster_lock(cparent, HAMMER2_RESOLVE_ALWAYS);
796 * Enter item into (unlocked) cluster.
798 * Must clear invalid for iteration to work properly.
800 if (cluster->array[i].chain)
801 hammer2_chain_drop(cluster->array[i].chain);
802 cluster->array[i].chain = chain;
803 cluster->array[i].flags &= ~HAMMER2_CITEM_INVALID;
/*
 * Permanently delete an excess chain from slave node [i] (a record the
 * slave has but the focus does not).
 */
809 * cparent is locked exclusively, with an extra ref, cluster is not locked.
813 hammer2_sync_destroy(hammer2_thread_t *thr,
814 hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
817 hammer2_chain_t *chain;
819 chain = cluster->array[i].chain;
820 #if HAMMER2_THREAD_DEBUG
821 if (hammer2_debug & 1)
822 kprintf("destroy rec %p/%p slave %d %d.%016jx\n",
824 i, chain->bref.type, chain->bref.key);
827 * Try to avoid unnecessary I/O.
829 * XXX accounting not propagated up properly. We might have to do
830 * a RESOLVE_MAYBE here and pass 0 for the flags.
832 hammer2_chain_lock(chain, HAMMER2_RESOLVE_NEVER);
833 hammer2_chain_delete(&thr->trans, cparent->array[i].chain, chain,
834 HAMMER2_DELETE_NOSTATS |
835 HAMMER2_DELETE_PERMANENT);
836 hammer2_chain_unlock(chain);
839 * The element is not valid in that it doesn't match the other
840 * elements, but we have to mark it valid here to allow the
841 * cluster_next() call to advance this index to the next element.
843 cluster->array[i].flags &= ~HAMMER2_CITEM_INVALID;
/*
 * Overwrite an out-of-sync chain on slave node [i] with the content and
 * metadata of the cluster focus, resizing if the byte counts differ.
 *
 * NOTE(review): excerpt is non-contiguous; declarations for nradix/otype
 * and some braces are not visible below.
 */
849 * cparent is locked exclusively, with an extra ref, cluster is not locked.
850 * Replace element [i] in the cluster.
854 hammer2_sync_replace(hammer2_thread_t *thr,
855 hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
856 hammer2_tid_t modify_tid, int i, int *errors)
858 hammer2_chain_t *focus;
859 hammer2_chain_t *chain;
863 focus = cluster->focus;
864 chain = cluster->array[i].chain;
865 #if HAMMER2_THREAD_DEBUG
866 if (hammer2_debug & 1)
867 kprintf("replace rec %p/%p slave %d %d.%016jx mod=%016jx\n",
869 i, focus->bref.type, focus->bref.key, modify_tid);
/* Lock focus and chain in index order to avoid lock-order deadlocks. */
871 if (cluster->focus_index < i)
872 hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
873 hammer2_chain_lock(chain, HAMMER2_RESOLVE_ALWAYS);
874 if (cluster->focus_index >= i)
875 hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
876 if (chain->bytes != focus->bytes) {
877 /* XXX what if compressed? */
878 nradix = hammer2_getradix(chain->bytes);
879 hammer2_chain_resize(&thr->trans, NULL,
880 cparent->array[i].chain, chain,
883 hammer2_chain_modify(&thr->trans, chain, 0);
884 otype = chain->bref.type;
885 chain->bref.type = focus->bref.type;
886 chain->bref.methods = focus->bref.methods;
887 chain->bref.keybits = focus->bref.keybits;
888 chain->bref.vradix = focus->bref.vradix;
889 /* mirror_tid updated by flush */
890 chain->bref.modify_tid = modify_tid;
891 chain->bref.flags = focus->bref.flags;
892 /* key already present */
893 /* check code will be recalculated */
899 switch(chain->bref.type) {
900 case HAMMER2_BREF_TYPE_INODE:
901 if ((focus->data->ipdata.meta.op_flags &
902 HAMMER2_OPFLAG_DIRECTDATA) == 0) {
904 * If DIRECTDATA is transitioning to 0 or the old
905 * chain is not an inode we have to initialize
908 if (otype != HAMMER2_BREF_TYPE_INODE ||
909 (chain->data->ipdata.meta.op_flags &
910 HAMMER2_OPFLAG_DIRECTDATA)) {
911 kprintf("chain inode trans away from dd\n");
912 bzero(&chain->data->ipdata.u,
913 sizeof(chain->data->ipdata.u));
915 bcopy(focus->data, chain->data,
916 offsetof(hammer2_inode_data_t, u));
917 /* XXX setcheck on inode should not be needed */
918 hammer2_chain_setcheck(chain, chain->data);
922 case HAMMER2_BREF_TYPE_DATA:
923 bcopy(focus->data, chain->data, chain->bytes);
924 hammer2_chain_setcheck(chain, chain->data);
931 hammer2_chain_unlock(focus);
932 hammer2_chain_unlock(chain);
935 * Must clear invalid for iteration to work properly.
937 cluster->array[i].flags &= ~HAMMER2_CITEM_INVALID;
942 /****************************************************************************
943 * HAMMER2 XOPS THREADS *
944 ****************************************************************************/
/* Initialize the mutex protecting a XOP group. */
947 hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
949 hammer2_mtx_init(&xgrp->mtx, "h2xopq");
953 * Allocate a XOP request.
955 * Once allocated a XOP request can be started, collected, and retired,
956 * and can be retired early if desired.
958 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
961 hammer2_xop_alloc(hammer2_inode_t *ip, hammer2_xop_func_t func)
/* Allocation may block (M_WAITOK) and so cannot fail with NULL. */
965 xop = objcache_get(cache_xops, M_WAITOK);
967 xop->head.func = func;
/* Seed the collection cluster from the inode's cluster. */
973 xop->head.cluster.nchains = ip->cluster.nchains;
974 xop->head.cluster.pmp = ip->pmp;
975 xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;
978 * run_mask - Active thread (or frontend) associated with XOP
980 xop->head.run_mask = HAMMER2_XOPMASK_VOP;
/* The XOP holds a ref on the inode until retired. */
982 hammer2_inode_ref(ip);
988 * A mounted PFS needs Xops threads to support frontend operations.
991 hammer2_xop_helper_create(hammer2_pfs_t *pmp)
996 kprintf("XOP_HELPER_CREATE: %d\n", pmp->pfs_nmasters);
/* One helper thread per (master, xop-group) pair; skip already-running. */
997 for (i = 0; i < pmp->pfs_nmasters; ++i) {
998 for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
999 if (pmp->xop_groups[j].thrs[i].td)
1001 hammer2_thr_create(&pmp->xop_groups[j].thrs[i], pmp,
1003 hammer2_primary_xops_thread);
/* Tear down all XOP helper threads created by hammer2_xop_helper_create(). */
1009 hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
1014 for (i = 0; i < pmp->pfs_nmasters; ++i) {
1015 for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
1016 if (pmp->xop_groups[j].thrs[i].td)
1017 hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
1026 * Start a XOP request, queueing it to all nodes in the cluster to
1027 * execute the cluster op.
1029 * XXX optimize single-target case.
1032 hammer2_xop_start(hammer2_xop_head_t *xop)
1034 hammer2_xop_group_t *xgrp;
1035 hammer2_thread_t *thr;
/* Round-robin group selection across HAMMER2_XOPGROUPS. */
1042 g = pmp->xop_iterator++;
1043 g = g & HAMMER2_XOPGROUPS_MASK;
1044 xgrp = &pmp->xop_groups[g];
/* Queue the XOP to each live node thread and mark it in run_mask. */
1047 for (i = 0; i < xop->ip->cluster.nchains; ++i) {
1048 thr = &xgrp->thrs[i];
1050 lockmgr(&thr->lk, LK_EXCLUSIVE);
1052 (thr->flags & HAMMER2_THREAD_STOP) == 0) {
1053 atomic_set_int(&xop->run_mask, 1U << i);
1054 TAILQ_INSERT_TAIL(&thr->xopq, xop,
1057 lockmgr(&thr->lk, LK_RELEASE);
1058 wakeup(&thr->flags);
1064 * Retire a XOP. Used by both the VOP frontend and by the XOP backend.
/*
 * NOTE(review): excerpt is non-contiguous; the early-return path when
 * other run_mask bits remain set is only partially visible.
 */
1067 hammer2_xop_retire(hammer2_xop_head_t *xop, uint32_t mask)
1069 hammer2_xop_group_t *xgrp;
1070 hammer2_chain_t *chain;
1076 * Remove the frontend or remove a backend feeder. When removing
1077 * the frontend we must wakeup any backend feeders who are waiting
1080 * XXX optimize wakeup.
1082 KKASSERT(xop->run_mask & mask);
/* Last participant out proceeds to teardown; others return early. */
1083 if (atomic_fetchadd_int(&xop->run_mask, -mask) != mask) {
1084 if (mask == HAMMER2_XOPMASK_VOP)
1090 * Cleanup the collection cluster.
1092 for (i = 0; i < xop->cluster.nchains; ++i) {
1093 xop->cluster.array[i].flags = 0;
1094 chain = xop->cluster.array[i].chain;
1096 xop->cluster.array[i].chain = NULL;
1097 hammer2_chain_unlock(chain);
1098 hammer2_chain_drop(chain);
1103 * Cleanup the fifos, use check_counter to optimize the loop.
1105 mask = xop->chk_mask;
1106 for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
1107 hammer2_xop_fifo_t *fifo = &xop->collect[i];
1108 while (fifo->ri != fifo->wi) {
1109 chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
1111 hammer2_chain_unlock(chain);
1112 hammer2_chain_drop(chain);
/* Wake feeders blocked on a full fifo once it drains below half. */
1115 if (fifo->wi - fifo->ri < HAMMER2_XOPFIFO / 2)
1116 wakeup(xop); /* XXX optimize */
1122 * The inode is only held at this point, simply drop it.
1125 hammer2_inode_drop(xop->ip);
1129 objcache_put(cache_xops, xop);
1133 * (Backend) Returns non-zero if the frontend is still attached.
1136 hammer2_xop_active(hammer2_xop_head_t *xop)
/* The VOP bit in run_mask indicates the frontend has not yet retired. */
1138 if (xop->run_mask & HAMMER2_XOPMASK_VOP)
1145 * (Backend) Feed chain data through the cluster validator and back to
1146 * the frontend. Chains are fed from multiple nodes concurrently
1147 * and pipelined via per-node FIFOs in the XOP.
1149 * No xop lock is needed because we are only manipulating fields under
1150 * our direct control.
1152 * Returns 0 on success and a hammer error code if sync is permanently
1156 hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
1157 int clindex, int error)
1159 hammer2_xop_fifo_t *fifo;
1162 * Multi-threaded entry into the XOP collector. We own the
1163 * fifo->wi for our clindex.
1165 fifo = &xop->collect[clindex];
/* Block while the fifo is full; abort if the frontend detached. */
1167 while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
1168 tsleep_interlock(xop, 0);
1169 if (hammer2_xop_active(xop) == 0) {
1173 if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
1174 tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
/* Publish the (chain, error) pair at the write index. */
1178 hammer2_chain_ref(chain);
1179 fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
1180 fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
/* Signal the frontend collector. */
1183 atomic_set_int(&xop->chk_mask, 1U << clindex);
1184 atomic_add_int(&xop->check_counter, 1);
1185 wakeup(&xop->check_counter); /* XXX optimize */
1192 * (Frontend) collect a response from a running cluster op.
1194 * Responses are fed from all appropriate nodes concurrently
1195 * and collected into a cohesive response >= nkey. lkey is
1196 * then set to nkey and nkey is advanced prior to return.
1197 * The caller may depend on xop->lkey reflecting the current
1198 * key of the returned response.
1200 * The collector will return the instant quorum or other requirements
1201 * are met, even if some nodes get behind or become non-responsive.
1203 * HAMMER2_XOP_COLLECT_NOWAIT - Used to 'poll' a completed collection,
1204 * usually called synchronously from the
1205 * node XOPs for the strategy code to
1206 * fake the frontend collection and complete
1207 * the BIO as soon as possible.
1209 * HAMMER2_XOP_SYNCHRONIZER - Request synchronization with a particular
1210 * cluster index, prevents looping when that
1211 * index is out of sync so caller can act on
1212 * the out of sync element. ESRCH and EDEADLK
1213 * can be returned if this flag is specified.
1215 * Returns 0 on success plus a filled out xop->cluster structure.
1216 * Return ENOENT on normal termination.
1217 * Otherwise return an error.
/*
 * NOTE(review): excerpt is non-contiguous; the outer retry loop and some
 * branch structure are not visible below.
 */
1220 hammer2_xop_collect(hammer2_xop_head_t *xop)
1222 hammer2_xop_fifo_t *fifo;
1223 hammer2_chain_t *chain;
1224 hammer2_key_t lokey;
1227 int adv; /* advance the element */
1229 uint32_t check_counter;
1233 * First loop tries to advance pieces of the cluster which
1236 lokey = HAMMER2_KEY_MAX;
1237 keynull = HAMMER2_CHECK_NULL;
/* Snapshot the counter to detect feeder activity while we sleep. */
1238 check_counter = xop->check_counter;
1241 for (i = 0; i < xop->cluster.nchains; ++i) {
1242 chain = xop->cluster.array[i].chain;
1243 if (chain == NULL) {
/* Key already consumed -> this element must be advanced. */
1245 } else if (chain->bref.key < xop->nkey) {
1248 keynull &= ~HAMMER2_CHECK_NULL;
1249 if (lokey > chain->bref.key)
1250 lokey = chain->bref.key;
1257 * Advance element if possible, advanced element may be NULL.
1260 hammer2_chain_unlock(chain);
1261 hammer2_chain_drop(chain);
1263 fifo = &xop->collect[i];
1264 if (fifo->ri != fifo->wi) {
1266 chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
1268 xop->cluster.array[i].chain = chain;
1269 if (chain == NULL) {
1270 xop->cluster.array[i].flags |=
/* Wake feeders once the fifo has drained below half full. */
1273 if (fifo->wi - fifo->ri < HAMMER2_XOPFIFO / 2)
1274 wakeup(xop); /* XXX optimize */
1275 --i; /* loop on same index */
1278 * Retain CITEM_NULL flag. If set just repeat EOF.
1279 * If not, the NULL,0 combination indicates an
1280 * operation in-progress.
1282 xop->cluster.array[i].chain = NULL;
1283 /* retain any CITEM_NULL setting */
1288 * Determine whether the lowest collected key meets clustering
1289 * requirements. Returns:
1291 * 0 - key valid, cluster can be returned.
1293 * ENOENT - normal end of scan, return ENOENT.
1295 * ESRCH - sufficient elements collected, quorum agreement
1296 * that lokey is not a valid element and should be
1299 * EDEADLK - sufficient elements collected, no quorum agreement
1300 * (and no agreement possible). In this situation a
1301 * repair is needed, for now we loop.
1303 * EINPROGRESS - insufficient elements collected to resolve, wait
1304 * for event and loop.
1306 error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
1307 if (error == EINPROGRESS) {
/* Interlocked sleep: only block if no feed arrived since the snapshot. */
1308 if (xop->check_counter == check_counter) {
1309 tsleep_interlock(&xop->check_counter, 0);
1311 if (xop->check_counter == check_counter) {
1312 tsleep(&xop->check_counter, PINTERLOCKED,
1318 if (error == ESRCH) {
1319 if (lokey != HAMMER2_KEY_MAX) {
1320 xop->nkey = lokey + 1;
1325 if (error == EDEADLK) {
1326 kprintf("hammer2: no quorum possible lkey %016jx\n",
1328 if (lokey != HAMMER2_KEY_MAX) {
1329 xop->nkey = lokey + 1;
1334 if (lokey == HAMMER2_KEY_MAX)
1337 xop->nkey = lokey + 1;
1343 * Primary management thread for xops support. Each node has several such
1344 * threads which replicate front-end operations on cluster nodes.
1346 * XOPS thread node operations, allowing the function to focus on a single
1347 * node in the cluster after validating the operation with the cluster.
1348 * This is primarily what prevents dead or stalled nodes from stalling
1352 hammer2_primary_xops_thread(void *arg)
1354 hammer2_thread_t *thr = arg;
1356 hammer2_xop_head_t *xop;
1357 hammer2_xop_group_t *xgrp;
/* This thread serves one (group, cluster-index) slot; mask identifies it
 * in each XOP's run_mask. */
1361 xgrp = &pmp->xop_groups[thr->repidx];
1362 mask = 1U << thr->clindex;
1364 lockmgr(&thr->lk, LK_EXCLUSIVE);
1365 while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
1367 * Handle freeze request
1369 if (thr->flags & HAMMER2_THREAD_FREEZE) {
1370 atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
1371 atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
1375 * Force idle if frozen until unfrozen or stopped.
1377 if (thr->flags & HAMMER2_THREAD_FROZEN) {
1378 lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
1383 * Reset state on REMASTER request
1385 if (thr->flags & HAMMER2_THREAD_REMASTER) {
1386 atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
1391 * Process requests. Each request can be multi-queued.
1393 * If we get behind and the frontend VOP is no longer active,
1394 * we retire the request without processing it. The callback
1395 * may also abort processing if the frontend VOP becomes
1398 while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
1399 TAILQ_REMOVE(&thr->xopq, xop,
1400 collect[thr->clindex].entry);
1401 if (hammer2_xop_active(xop)) {
/* Drop thr->lk while running the (possibly blocking) callback. */
1402 lockmgr(&thr->lk, LK_RELEASE);
1403 xop->func((hammer2_xop_t *)xop, thr->clindex);
1404 hammer2_xop_retire(xop, mask);
1405 lockmgr(&thr->lk, LK_EXCLUSIVE);
/* Frontend already gone: feed a terminator and retire unprocessed. */
1407 hammer2_xop_feed(xop, NULL, thr->clindex,
1409 hammer2_xop_retire(xop, mask);
/* Idle until hammer2_xop_start() wakes us. */
1416 lksleep(&thr->flags, &thr->lk, 0, "h2idle", 0);
1420 * Cleanup / termination
1422 while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
1423 kprintf("hammer2_thread: aborting xop %p\n", xop->func);
1424 TAILQ_REMOVE(&thr->xopq, xop,
1425 collect[thr->clindex].entry);
1426 hammer2_xop_retire(xop, mask);
1431 lockmgr(&thr->lk, LK_RELEASE);
1432 /* thr structure can go invalid after this point */