2 * Copyright (c) 2015 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * This module implements various PFS-based helper threads.
/*
 * Deferral bookkeeping for the sync threads.  Inodes whose synchronization
 * must be postponed are pushed LIFO onto a hammer2_deferred_list (see
 * hammer2_sync_slaves()).
 *
 * NOTE(review): the 'ip' field used elsewhere (defer->ip) and the list's
 * 'count' field (list->count) are not visible here -- this chunk appears
 * truncated; confirm against the full source file.
 */
39 typedef struct hammer2_deferred_ip {
40 struct hammer2_deferred_ip *next;
42 } hammer2_deferred_ip_t;
44 typedef struct hammer2_deferred_list {
45 hammer2_deferred_ip_t *base;
47 } hammer2_deferred_list_t;
/* Compile-time toggle for the kprintf debug output in the sync helpers. */
50 #define HAMMER2_THREAD_DEBUG 1
/*
 * Forward declarations for the static synchronization helpers defined
 * later in this file.
 */
52 static int hammer2_sync_slaves(hammer2_thread_t *thr, hammer2_inode_t *ip,
53 hammer2_deferred_list_t *list);
55 static void hammer2_update_pfs_status(hammer2_thread_t *thr, uint32_t flags);
/*
 * NOTE(review): the next two lines look like a stray fragment of a call
 * site (cf. the hammer2_sync_insert() calls inside hammer2_sync_slaves())
 * spliced into this declaration block -- the chunk is garbled here;
 * verify against the full source.
 */
56 nerror = hammer2_sync_insert(
58 focus->bref.modify_tid,
61 static int hammer2_sync_insert(hammer2_thread_t *thr,
62 hammer2_chain_t **parentp, hammer2_chain_t **chainp,
63 hammer2_tid_t modify_tid, int idx,
64 hammer2_chain_t *focus);
65 static int hammer2_sync_destroy(hammer2_thread_t *thr,
66 hammer2_chain_t **parentp, hammer2_chain_t **chainp,
67 hammer2_tid_t mtid, int idx);
68 static int hammer2_sync_replace(hammer2_thread_t *thr,
69 hammer2_chain_t *parent, hammer2_chain_t *chain,
70 hammer2_tid_t mtid, int idx,
71 hammer2_chain_t *focus);
73 /****************************************************************************
74 * HAMMER2 THREAD API *
75 ****************************************************************************/
77 * Initialize the supplied thread structure, starting the specified
/*
 * hammer2_thr_create() - initialize thr's lock and xop queue and spawn
 * the backing LWKT kernel thread running func(thr).
 *
 * NOTE(review): two lwkt_create() calls appear below with different name
 * formats (with and without the .%02d repidx suffix); presumably they are
 * selected by a (repidx >= 0) style conditional that is missing from this
 * truncated view -- confirm against the full source.
 */
81 hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
82 const char *id, int clindex, int repidx,
83 void (*func)(void *arg))
85 lockinit(&thr->lk, "h2thr", 0, 0);
87 thr->clindex = clindex;
89 TAILQ_INIT(&thr->xopq);
91 lwkt_create(func, thr, &thr->td, NULL, 0, -1,
92 "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
94 lwkt_create(func, thr, &thr->td, NULL, 0, -1,
95 "%s-%s", id, pmp->pfs_names[clindex]);
100 * Terminate a thread. This function will silently return if the thread
101 * was never initialized or has already been deleted.
103 * This is accomplished by setting the STOP flag and waiting for the td
104 * structure to become NULL.
/*
 * NOTE(review): the early-return check for an uninitialized thread, the
 * wakeup() after setting STOP, and the loop bounding the lksleep() wait
 * are not visible in this truncated view -- confirm upstream.
 */
107 hammer2_thr_delete(hammer2_thread_t *thr)
111 lockmgr(&thr->lk, LK_EXCLUSIVE);
112 atomic_set_int(&thr->flags, HAMMER2_THREAD_STOP);
115 lksleep(thr, &thr->lk, 0, "h2thr", hz);
117 lockmgr(&thr->lk, LK_RELEASE);
119 lockuninit(&thr->lk);
123 * Asynchronous remaster request. Ask the synchronization thread to
124 * start over soon (as if it were frozen and unfrozen, but without waiting).
125 * The thread always recalculates mastership relationships when restarting.
128 hammer2_thr_remaster(hammer2_thread_t *thr)
132 lockmgr(&thr->lk, LK_EXCLUSIVE);
133 atomic_set_int(&thr->flags, HAMMER2_THREAD_REMASTER);
/* presumably a wakeup(thr) follows here -- missing from this view */
135 lockmgr(&thr->lk, LK_RELEASE);
/*
 * Request a freeze without waiting for the thread to acknowledge it
 * (contrast hammer2_thr_freeze() below, which waits for FROZEN).
 */
139 hammer2_thr_freeze_async(hammer2_thread_t *thr)
141 atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
/*
 * Synchronous freeze: set the FREEZE request flag and sleep until the
 * target thread acknowledges by setting FROZEN on itself.
 */
146 hammer2_thr_freeze(hammer2_thread_t *thr)
150 lockmgr(&thr->lk, LK_EXCLUSIVE);
151 atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
153 while ((thr->flags & HAMMER2_THREAD_FROZEN) == 0) {
154 lksleep(thr, &thr->lk, 0, "h2frz", hz);
156 lockmgr(&thr->lk, LK_RELEASE);
/*
 * Unfreeze a previously frozen thread by clearing FROZEN.
 * NOTE(review): a wakeup() of the target is presumably issued between the
 * clear and the release -- missing from this truncated view.
 */
160 hammer2_thr_unfreeze(hammer2_thread_t *thr)
164 lockmgr(&thr->lk, LK_EXCLUSIVE);
165 atomic_clear_int(&thr->flags, HAMMER2_THREAD_FROZEN);
167 lockmgr(&thr->lk, LK_RELEASE);
/*
 * Helper used by long-running scans: non-zero when the thread has a
 * pending STOP, REMASTER, or FREEZE request and should abort its loop.
 * NOTE(review): the return statements are missing from this view.
 */
172 hammer2_thr_break(hammer2_thread_t *thr)
174 if (thr->flags & (HAMMER2_THREAD_STOP |
175 HAMMER2_THREAD_REMASTER |
176 HAMMER2_THREAD_FREEZE)) {
182 /****************************************************************************
183 * HAMMER2 SYNC THREADS *
184 ****************************************************************************/
186 * Primary management thread for an element of a node. A thread will exist
187 * for each element requiring management.
189 * No management threads are needed for the SPMP or for any PMP with only
192 * On the SPMP - handles bulkfree and dedup operations
193 * On a PFS - handles remastering and synchronization
/*
 * Main loop: honor FREEZE/FROZEN/REMASTER requests, then run a
 * synchronization scan from pmp->iroot, draining the LIFO deferral list
 * produced by hammer2_sync_slaves().  Sleeps up to 5 seconds between
 * passes.  NOTE(review): many connective lines (pmp/error/didbreak
 * declarations, wakeups, some closing braces) are missing from this
 * truncated view.
 */
196 hammer2_primary_sync_thread(void *arg)
198 hammer2_thread_t *thr = arg;
200 hammer2_deferred_list_t list;
201 hammer2_deferred_ip_t *defer;
205 bzero(&list, sizeof(list));
207 lockmgr(&thr->lk, LK_EXCLUSIVE);
208 while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
210 * Handle freeze request
212 if (thr->flags & HAMMER2_THREAD_FREEZE) {
213 atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
214 atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
218 * Force idle if frozen until unfrozen or stopped.
220 if (thr->flags & HAMMER2_THREAD_FROZEN) {
221 lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
226 * Reset state on REMASTER request
228 if (thr->flags & HAMMER2_THREAD_REMASTER) {
229 atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
234 * Synchronization scan.
236 kprintf("sync_slaves pfs %s clindex %d\n",
237 pmp->pfs_names[thr->clindex], thr->clindex);
238 hammer2_trans_init(pmp, 0);
240 hammer2_inode_ref(pmp->iroot);
244 /* XXX lock synchronize pmp->modify_tid */
245 error = hammer2_sync_slaves(thr, pmp->iroot, &list);
/* Drain deferrals bottom-up (LIFO); each pass may add more. */
248 while ((defer = list.base) != NULL) {
249 hammer2_inode_t *nip;
252 error = hammer2_sync_slaves(thr, nip, &list);
253 if (error && error != EAGAIN)
255 if (hammer2_thr_break(thr)) {
261 * If no additional defers occurred we can
262 * remove this one, otherwise keep it on
263 * the list and retry once the additional
264 * defers have completed.
266 if (defer == list.base) {
268 list.base = defer->next;
269 kfree(defer, M_HAMMER2);
270 defer = NULL; /* safety */
271 hammer2_inode_drop(nip);
276 * If the thread is being remastered, frozen, or
277 * stopped, clean up any left-over deferals.
279 if (didbreak || (error && error != EAGAIN)) {
280 kprintf("didbreak\n");
281 while ((defer = list.base) != NULL) {
283 hammer2_inode_drop(defer->ip);
284 list.base = defer->next;
285 kfree(defer, M_HAMMER2);
287 if (error == 0 || error == EAGAIN)
293 hammer2_inode_drop(pmp->iroot);
294 hammer2_trans_done(pmp);
297 kprintf("hammer2_sync_slaves: error %d\n", error);
300 * Wait for event, or 5-second poll.
302 lksleep(&thr->flags, &thr->lk, 0, "h2idle", hz * 5);
306 lockmgr(&thr->lk, LK_RELEASE);
307 /* thr structure can go invalid after this point */
312 * Given a locked cluster created from pmp->iroot, update the PFS's
/*
 * Cache the masked cluster flags in pmp->cluster_flags and, when they
 * change, print a one-line human-readable status summary via kprintf.
 * Early-returns when the flags are unchanged.
 */
317 hammer2_update_pfs_status(hammer2_thread_t *thr, uint32_t flags)
319 hammer2_pfs_t *pmp = thr->pmp;
321 flags &= HAMMER2_CLUSTER_ZFLAGS;
322 if (pmp->cluster_flags == flags)
324 pmp->cluster_flags = flags;
326 kprintf("pfs %p", pmp);
327 if (flags & HAMMER2_CLUSTER_MSYNCED)
328 kprintf(" masters-all-good");
329 if (flags & HAMMER2_CLUSTER_SSYNCED)
330 kprintf(" slaves-all-good");
332 if (flags & HAMMER2_CLUSTER_WRHARD)
333 kprintf(" quorum/rw");
334 else if (flags & HAMMER2_CLUSTER_RDHARD)
335 kprintf(" quorum/ro");
337 if (flags & HAMMER2_CLUSTER_UNHARD)
338 kprintf(" out-of-sync-masters");
339 else if (flags & HAMMER2_CLUSTER_NOHARD)
340 kprintf(" no-masters-visible");
/* NOTE(review): the WRSOFT/RDSOFT kprintf bodies are missing here. */
342 if (flags & HAMMER2_CLUSTER_WRSOFT)
344 else if (flags & HAMMER2_CLUSTER_RDSOFT)
347 if (flags & HAMMER2_CLUSTER_UNSOFT)
348 kprintf(" out-of-sync-slaves");
349 else if (flags & HAMMER2_CLUSTER_NOSOFT)
350 kprintf(" no-slaves-visible");
/*
 * Debug-only dump of a parent/child cluster pair, one line per cluster
 * slot, flagging entries carrying HAMMER2_CITEM_INVALID with "(I)".
 * Gated on (hammer2_debug & 1).
 */
358 dumpcluster(const char *label,
359 hammer2_cluster_t *cparent, hammer2_cluster_t *cluster)
361 hammer2_chain_t *chain;
364 if ((hammer2_debug & 1) == 0)
367 kprintf("%s\t", label);
368 KKASSERT(cparent->nchains == cluster->nchains);
369 for (i = 0; i < cparent->nchains; ++i) {
373 if ((chain = cparent->array[i].chain) != NULL) {
376 ((cparent->array[i].flags &
377 HAMMER2_CITEM_INVALID) ? "(I)" : " ")
380 kprintf(" NULL %s ", " ");
382 if ((chain = cluster->array[i].chain) != NULL) {
385 ((cluster->array[i].flags &
386 HAMMER2_CITEM_INVALID) ? "(I)" : " ")
389 kprintf(" NULL %s ", " ");
397 * Each out of sync node sync-thread must issue an all-nodes XOP scan of
398 * the inode. This creates a multiplication effect since the XOP scan itself
399 * issues to all nodes. However, this is the only way we can safely
400 * synchronize nodes which might have disparate I/O bandwidths and the only
401 * way we can safely deal with stalled nodes.
/*
 * Synchronize our cluster element (thr->clindex) for one inode against
 * the quorum view produced by a scanall XOP that excludes our own index.
 * Walks our local chain iteration in lockstep with the XOP collection,
 * issuing hammer2_sync_destroy/replace/insert as the comparison dictates,
 * and deferring inode recursions onto 'list' (LIFO) for the caller.
 *
 * NOTE(review): numerous lines are missing from this truncated view
 * (variable declarations such as pmp/error/nerror/n/dodefer/needrescan/
 * wantupdate, loop/brace structure, and some call arguments) -- the
 * comments below describe only what the visible lines establish.
 */
405 hammer2_sync_slaves(hammer2_thread_t *thr, hammer2_inode_t *ip,
406 hammer2_deferred_list_t *list)
408 hammer2_xop_scanall_t *xop;
409 hammer2_chain_t *parent;
410 hammer2_chain_t *chain;
412 hammer2_key_t key_next;
413 hammer2_tid_t sync_tid;
414 int cache_index = -1;
423 idx = thr->clindex; /* cluster node we are responsible for */
427 if (ip->cluster.focus == NULL)
428 return (EINPROGRESS);
429 sync_tid = ip->cluster.focus->bref.modify_tid;
433 * Nothing to do if all slaves are synchronized.
434 * Nothing to do if cluster not authoritatively readable.
436 if (pmp->cluster_flags & HAMMER2_CLUSTER_SSYNCED)
438 if ((pmp->cluster_flags & HAMMER2_CLUSTER_RDHARD) == 0)
439 return(HAMMER2_ERROR_INCOMPLETE);
445 * The inode is left unlocked during the scan. Issue a XOP
446 * that does *not* include our cluster index to iterate
447 * properly synchronized elements and resolve our cluster index
450 hammer2_inode_lock(ip, HAMMER2_RESOLVE_SHARED);
451 xop = hammer2_xop_alloc(ip, HAMMER2_XOP_MODIFYING);
452 xop->key_beg = HAMMER2_KEY_MIN;
453 xop->key_end = HAMMER2_KEY_MAX;
454 hammer2_xop_start_except(&xop->head, hammer2_xop_scanall, idx);
455 parent = hammer2_inode_chain(ip, idx,
456 HAMMER2_RESOLVE_ALWAYS |
457 HAMMER2_RESOLVE_SHARED);
458 if (parent->bref.modify_tid != sync_tid)
461 hammer2_inode_unlock(ip);
463 chain = hammer2_chain_lookup(&parent, &key_next,
464 HAMMER2_KEY_MIN, HAMMER2_KEY_MAX,
466 HAMMER2_LOOKUP_SHARED |
467 HAMMER2_LOOKUP_NODIRECT |
468 HAMMER2_LOOKUP_NODATA);
469 error = hammer2_xop_collect(&xop->head, 0);
470 kprintf("XOP_INITIAL xop=%p clindex %d on %s\n", xop, thr->clindex,
471 pmp->pfs_names[thr->clindex]);
475 * We are done if our scan is done and the XOP scan is done.
476 * We are done if the XOP scan failed (that is, we don't
477 * have authoritative data to synchronize with).
479 int advance_local = 0;
482 hammer2_chain_t *focus;
484 kprintf("loop xop=%p chain[1]=%p lockcnt=%d\n",
485 xop, xop->head.cluster.array[1].chain,
486 (xop->head.cluster.array[1].chain ?
487 xop->head.cluster.array[1].chain->lockcnt : -1)
490 if (chain == NULL && error == ENOENT)
492 if (error && error != ENOENT)
498 if (chain && error == ENOENT) {
500 * If we have local chains but the XOP scan is done,
501 * the chains need to be deleted.
505 } else if (chain == NULL) {
507 * If our local scan is done but the XOP scan is not,
508 * we need to create the missing chain(s).
511 focus = xop->head.cluster.focus;
514 * Otherwise compare to determine the action
517 focus = xop->head.cluster.focus;
518 n = hammer2_chain_cmp(chain, focus);
522 * Take action based on comparison results.
526 * Delete extranious local data. This will
527 * automatically advance the chain.
529 nerror = hammer2_sync_destroy(thr, &parent, &chain,
531 } else if (n == 0 && chain->bref.modify_tid !=
532 focus->bref.modify_tid) {
534 * Matching key but local data or meta-data requires
535 * updating. If we will recurse, we still need to
536 * update to compatible content first but we do not
537 * synchronize modify_tid until the entire recursion
538 * has completed successfully.
540 if (focus->bref.type == HAMMER2_BREF_TYPE_INODE) {
541 nerror = hammer2_sync_replace(
547 nerror = hammer2_sync_replace(
549 focus->bref.modify_tid,
554 * 100% match, advance both
561 * Insert missing local data.
563 * If we will recurse, we still need to update to
564 * compatible content first but we do not synchronize
565 * modify_tid until the entire recursion has
566 * completed successfully.
568 if (focus->bref.type == HAMMER2_BREF_TYPE_INODE) {
569 nerror = hammer2_sync_insert(
570 thr, &parent, &chain,
575 nerror = hammer2_sync_insert(
576 thr, &parent, &chain,
577 focus->bref.modify_tid,
585 * We cannot recurse depth-first because the XOP is still
586 * running in node threads for this scan. Create a placemarker
587 * by obtaining and record the hammer2_inode.
589 * We excluded our node from the XOP so we must temporarily
590 * add it to xop->head.cluster so it is properly incorporated
593 * The deferral is pushed onto a LIFO list for bottom-up
596 if (error == 0 && dodefer) {
597 hammer2_inode_t *nip;
598 hammer2_deferred_ip_t *defer;
600 KKASSERT(focus->bref.type == HAMMER2_BREF_TYPE_INODE);
602 defer = kmalloc(sizeof(*defer), M_HAMMER2,
604 KKASSERT(xop->head.cluster.array[idx].chain == NULL);
605 xop->head.cluster.array[idx].flags =
606 HAMMER2_CITEM_INVALID;
607 xop->head.cluster.array[idx].chain = chain;
608 nip = hammer2_inode_get(pmp, ip,
609 &xop->head.cluster, idx);
610 xop->head.cluster.array[idx].chain = NULL;
612 hammer2_inode_ref(nip);
613 hammer2_inode_unlock(nip);
615 defer->next = list->base;
623 * If at least one deferral was added and the deferral
624 * list has grown too large, stop adding more. This
625 * will trigger an EAGAIN return.
627 if (needrescan && list->count > 1000)
631 * Advancements for iteration.
634 error = hammer2_xop_collect(&xop->head, 0);
637 chain = hammer2_chain_next(&parent, chain, &key_next,
638 key_next, HAMMER2_KEY_MAX,
640 HAMMER2_LOOKUP_SHARED |
641 HAMMER2_LOOKUP_NODIRECT |
642 HAMMER2_LOOKUP_NODATA);
645 hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
647 hammer2_chain_unlock(chain);
648 hammer2_chain_drop(chain);
651 hammer2_chain_unlock(parent);
652 hammer2_chain_drop(parent);
656 * If we added deferrals we want the caller to synchronize them
657 * and then call us again.
659 * NOTE: In this situation we do not yet want to synchronize our
660 * inode, setting the error code also has that effect.
662 if (error == 0 && needrescan)
666 * If no error occurred and work was performed, synchronize the
667 * inode meta-data itself.
669 * XXX inode lock was lost
671 if (error == 0 && wantupdate) {
672 hammer2_xop_ipcluster_t *xop2;
673 hammer2_chain_t *focus;
675 xop2 = hammer2_xop_alloc(ip, HAMMER2_XOP_MODIFYING);
676 hammer2_xop_start_except(&xop2->head, hammer2_xop_ipcluster,
678 error = hammer2_xop_collect(&xop2->head, 0);
680 focus = xop2->head.cluster.focus;
681 kprintf("syncthr: update inode %p (%s)\n",
684 (char *)focus->data->ipdata.filename : "?"));
685 chain = hammer2_inode_chain_and_parent(ip, idx,
687 HAMMER2_RESOLVE_ALWAYS |
688 HAMMER2_RESOLVE_SHARED);
690 KKASSERT(parent != NULL);
691 nerror = hammer2_sync_replace(
695 hammer2_chain_unlock(chain);
696 hammer2_chain_drop(chain);
697 hammer2_chain_unlock(parent);
698 hammer2_chain_drop(parent);
701 hammer2_xop_retire(&xop2->head, HAMMER2_XOPMASK_VOP);
708 * Create a missing chain by copying the focus from another device.
710 * On entry *parentp and focus are both locked shared. The chain will be
711 * created and returned in *chainp also locked shared.
/*
 * NOTE(review): several lines are missing from this truncated view
 * (exclusive relock of *chainp, the mtid/focus arguments to
 * hammer2_chain_create(), inode meta copy, default switch case, and the
 * final return) -- comments below cover only what is visible.
 */
715 hammer2_sync_insert(hammer2_thread_t *thr,
716 hammer2_chain_t **parentp, hammer2_chain_t **chainp,
717 hammer2_tid_t mtid, int idx, hammer2_chain_t *focus)
719 hammer2_chain_t *chain;
721 #if HAMMER2_THREAD_DEBUG
722 if (hammer2_debug & 1)
723 kprintf("insert rec par=%p/%d.%016jx slave %d %d.%016jx mod=%016jx\n",
725 (*parentp)->bref.type,
726 (*parentp)->bref.key,
728 focus->bref.type, focus->bref.key, mtid);
732 * Create the missing chain. Exclusive locks are needed.
734 * Have to be careful to avoid deadlocks.
737 hammer2_chain_unlock(*chainp);
738 hammer2_chain_unlock(*parentp);
739 hammer2_chain_lock(*parentp, HAMMER2_RESOLVE_ALWAYS);
740 /* reissue lookup? */
743 hammer2_chain_create(parentp, &chain, thr->pmp,
744 focus->bref.key, focus->bref.keybits,
745 focus->bref.type, focus->bytes,
747 hammer2_chain_modify(chain, mtid, 0);
750 * Copy focus to new chain
753 /* type already set */
754 chain->bref.methods = focus->bref.methods;
755 /* keybits already set */
756 chain->bref.vradix = focus->bref.vradix;
757 /* mirror_tid set by flush */
758 KKASSERT(chain->bref.modify_tid == mtid);
759 chain->bref.flags = focus->bref.flags;
760 /* key already present */
761 /* check code will be recalculated */
/* Copy the payload appropriate to the bref type. */
766 switch(chain->bref.type) {
767 case HAMMER2_BREF_TYPE_INODE:
768 if ((focus->data->ipdata.meta.op_flags &
769 HAMMER2_OPFLAG_DIRECTDATA) == 0) {
770 bcopy(focus->data, chain->data,
771 offsetof(hammer2_inode_data_t, u));
775 case HAMMER2_BREF_TYPE_DATA:
776 bcopy(focus->data, chain->data, chain->bytes);
777 hammer2_chain_setcheck(chain, chain->data);
/* Hand the new chain back to the caller, shared-locked. */
784 hammer2_chain_unlock(chain); /* unlock, leave ref */
786 hammer2_chain_drop(*chainp);
787 *chainp = chain; /* will be returned locked */
790 * Avoid ordering deadlock when relocking.
792 hammer2_chain_unlock(*parentp);
793 hammer2_chain_lock(*parentp, HAMMER2_RESOLVE_SHARED |
794 HAMMER2_RESOLVE_ALWAYS);
795 hammer2_chain_lock(chain, HAMMER2_RESOLVE_SHARED |
796 HAMMER2_RESOLVE_ALWAYS);
802 * Destroy an extranious chain.
804 * Both *parentp and *chainp are locked shared.
806 * On return, *chainp will be adjusted to point to the next element in the
807 * iteration and locked shared.
/*
 * Relocks exclusively, permanently deletes *chainp from *parentp at
 * mtid, then relocks shared and re-looks-up from the deleted key so the
 * caller's iteration resumes at the next element.
 * NOTE(review): the 'parent' local is passed to the final lookup but its
 * assignment from *parentp is not visible in this truncated view.
 */
811 hammer2_sync_destroy(hammer2_thread_t *thr,
812 hammer2_chain_t **parentp, hammer2_chain_t **chainp,
813 hammer2_tid_t mtid, int idx)
815 hammer2_chain_t *chain;
816 hammer2_chain_t *parent;
817 hammer2_key_t key_next;
818 hammer2_key_t save_key;
819 int cache_index = -1;
823 #if HAMMER2_THREAD_DEBUG
824 if (hammer2_debug & 1)
825 kprintf("destroy rec %p/%p slave %d %d.%016jx\n",
827 idx, chain->bref.type, chain->bref.key);
830 save_key = chain->bref.key;
831 if (save_key != HAMMER2_KEY_MAX)
835 * Try to avoid unnecessary I/O.
837 * XXX accounting not propagated up properly. We might have to do
838 * a RESOLVE_MAYBE here and pass 0 for the flags.
840 hammer2_chain_unlock(chain); /* relock exclusive */
841 hammer2_chain_unlock(*parentp);
842 hammer2_chain_lock(*parentp, HAMMER2_RESOLVE_ALWAYS);
843 hammer2_chain_lock(chain, HAMMER2_RESOLVE_NEVER);
845 hammer2_chain_delete(*parentp, chain, mtid, HAMMER2_DELETE_PERMANENT);
846 hammer2_chain_unlock(chain);
847 hammer2_chain_drop(chain);
848 chain = NULL; /* safety */
850 hammer2_chain_unlock(*parentp); /* relock shared */
851 hammer2_chain_lock(*parentp, HAMMER2_RESOLVE_SHARED |
852 HAMMER2_RESOLVE_ALWAYS);
853 *chainp = hammer2_chain_lookup(&parent, &key_next,
854 save_key, HAMMER2_KEY_MAX,
856 HAMMER2_LOOKUP_SHARED |
857 HAMMER2_LOOKUP_NODIRECT |
858 HAMMER2_LOOKUP_NODATA);
863 * cparent is locked exclusively, with an extra ref, cluster is not locked.
864 * Replace element [i] in the cluster.
/*
 * Overwrite an out-of-date local chain with the focus element's bref
 * fields and payload, resizing if the byte counts differ, then return
 * the chain relocked shared.  NOTE(review): declarations (nradix, otype),
 * the resize arguments, default switch case, and the return statement
 * are missing from this truncated view.
 */
868 hammer2_sync_replace(hammer2_thread_t *thr,
869 hammer2_chain_t *parent, hammer2_chain_t *chain,
870 hammer2_tid_t mtid, int idx,
871 hammer2_chain_t *focus)
876 #if HAMMER2_THREAD_DEBUG
877 if (hammer2_debug & 1)
878 kprintf("replace rec %p slave %d %d.%016jx mod=%016jx\n",
881 focus->bref.type, focus->bref.key, mtid);
883 hammer2_chain_unlock(chain);
884 hammer2_chain_lock(chain, HAMMER2_RESOLVE_ALWAYS)
885 if (chain->bytes != focus->bytes) {
886 /* XXX what if compressed? */
887 nradix = hammer2_getradix(chain->bytes);
888 hammer2_chain_resize(NULL, parent, chain,
891 hammer2_chain_modify(chain, mtid, 0);
892 otype = chain->bref.type;
893 chain->bref.type = focus->bref.type;
894 chain->bref.methods = focus->bref.methods;
895 chain->bref.keybits = focus->bref.keybits;
896 chain->bref.vradix = focus->bref.vradix;
897 /* mirror_tid updated by flush */
898 KKASSERT(chain->bref.modify_tid == mtid);
899 chain->bref.flags = focus->bref.flags;
900 /* key already present */
901 /* check code will be recalculated */
/* Copy payload by bref type, mirroring hammer2_sync_insert(). */
907 switch(chain->bref.type) {
908 case HAMMER2_BREF_TYPE_INODE:
909 if ((focus->data->ipdata.meta.op_flags &
910 HAMMER2_OPFLAG_DIRECTDATA) == 0) {
912 * If DIRECTDATA is transitioning to 0 or the old
913 * chain is not an inode we have to initialize
916 if (otype != HAMMER2_BREF_TYPE_INODE ||
917 (chain->data->ipdata.meta.op_flags &
918 HAMMER2_OPFLAG_DIRECTDATA)) {
919 kprintf("chain inode trans away from dd\n");
920 bzero(&chain->data->ipdata.u,
921 sizeof(chain->data->ipdata.u));
923 bcopy(focus->data, chain->data,
924 offsetof(hammer2_inode_data_t, u));
925 /* XXX setcheck on inode should not be needed */
926 hammer2_chain_setcheck(chain, chain->data);
930 case HAMMER2_BREF_TYPE_DATA:
931 bcopy(focus->data, chain->data, chain->bytes);
932 hammer2_chain_setcheck(chain, chain->data);
/* Downgrade back to a shared lock for the caller. */
939 hammer2_chain_unlock(chain);
940 hammer2_chain_lock(chain, HAMMER2_RESOLVE_SHARED |
941 HAMMER2_RESOLVE_MAYBE);
946 /****************************************************************************
947 * HAMMER2 XOPS THREADS *
948 ****************************************************************************/
/* Initialize the two mutexes protecting a XOP group's queues/IO. */
951 hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
953 hammer2_mtx_init(&xgrp->mtx, "h2xopq");
954 hammer2_mtx_init(&xgrp->mtx2, "h2xopio");
958 * Allocate a XOP request.
960 * Once allocated a XOP request can be started, collected, and retired,
961 * and can be retired early if desired.
963 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
/*
 * Obtain a XOP head from the objcache, reset its fields, attach and ref
 * the target inode's cluster state.  NOTE(review): the return statement
 * and assignment of xop->ip are missing from this truncated view.
 */
966 hammer2_xop_alloc(hammer2_inode_t *ip, int flags)
970 xop = objcache_get(cache_xops, M_WAITOK);
971 KKASSERT(xop->head.cluster.array[0].chain == NULL);
973 xop->head.func = NULL;
976 xop->head.collect_key = 0;
977 if (flags & HAMMER2_XOP_MODIFYING)
978 xop->head.mtid = hammer2_trans_sub(ip->pmp);
982 xop->head.cluster.nchains = ip->cluster.nchains;
983 xop->head.cluster.pmp = ip->pmp;
984 xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;
987 * run_mask - Active thread (or frontend) associated with XOP
989 xop->head.run_mask = HAMMER2_XOPMASK_VOP;
991 hammer2_inode_ref(ip);
/* Attach a copied name (NUL-padded allocation) to the XOP. */
997 hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
999 xop->name = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
1000 xop->name_len = name_len;
1001 bcopy(name, xop->name, name_len);
/* Attach a copied secondary name (e.g. rename target) to the XOP. */
1005 hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
1007 xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
1008 xop->name2_len = name_len;
1009 bcopy(name, xop->name2, name_len);
/* Ref and attach a second inode to the XOP (assignment line missing here). */
1014 hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
1017 hammer2_inode_ref(ip2);
/* Ref and attach a third inode to the XOP (assignment line missing here). */
1021 hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
1024 hammer2_inode_ref(ip3);
/* Reset collection state so an allocated XOP can be restarted. */
1028 hammer2_xop_reinit(hammer2_xop_head_t *xop)
1032 xop->collect_key = 0;
1033 xop->run_mask = HAMMER2_XOPMASK_VOP;
1037 * A mounted PFS needs Xops threads to support frontend operations.
/*
 * Lazily spawn one XOP helper thread per (cluster node, group) pair,
 * skipping slots already running.  Serialized by pmp->lock.
 */
1040 hammer2_xop_helper_create(hammer2_pfs_t *pmp)
1045 lockmgr(&pmp->lock, LK_EXCLUSIVE);
1046 pmp->has_xop_threads = 1;
1048 for (i = 0; i < pmp->iroot->cluster.nchains; ++i) {
1049 for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
1050 if (pmp->xop_groups[j].thrs[i].td)
1052 hammer2_thr_create(&pmp->xop_groups[j].thrs[i], pmp,
1054 hammer2_primary_xops_thread);
1057 lockmgr(&pmp->lock, LK_RELEASE);
/*
 * Tear down all live XOP helper threads for the PFS.
 * NOTE(review): iterates pfs_nmasters here vs. cluster.nchains in
 * hammer2_xop_helper_create() -- confirm intentional in full source.
 */
1061 hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
1066 for (i = 0; i < pmp->pfs_nmasters; ++i) {
1067 for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
1068 if (pmp->xop_groups[j].thrs[i].td)
1069 hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
1075 * Start a XOP request, queueing it to all nodes in the cluster to
1076 * execute the cluster op.
1078 * XXX optimize single-target case.
/*
 * Round-robin a XOP group, then enqueue the request on every live node
 * thread except 'notidx', setting the node's bit in run_mask/chk_mask
 * and waking the thread.  NOTE(review): declarations (pmp, g, i) and
 * part of the enqueue condition are missing from this truncated view.
 */
1081 hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_func_t func,
1084 hammer2_xop_group_t *xgrp;
1085 hammer2_thread_t *thr;
1091 if (pmp->has_xop_threads == 0)
1092 hammer2_xop_helper_create(pmp);
1094 g = pmp->xop_iterator++;
1095 g = g & HAMMER2_XOPGROUPS_MASK;
1096 xgrp = &pmp->xop_groups[g];
1100 /* XXX do cluster_resolve or cluster_check here, only start
1101 * synchronized elements */
1103 for (i = 0; i < xop->ip->cluster.nchains; ++i) {
1104 thr = &xgrp->thrs[i];
1105 if (thr->td && i != notidx) {
1106 lockmgr(&thr->lk, LK_EXCLUSIVE);
1108 (thr->flags & HAMMER2_THREAD_STOP) == 0) {
1109 atomic_set_int(&xop->run_mask, 1U << i);
1110 atomic_set_int(&xop->chk_mask, 1U << i);
1111 TAILQ_INSERT_TAIL(&thr->xopq, xop,
1114 lockmgr(&thr->lk, LK_RELEASE);
1115 wakeup(&thr->flags);
/* Convenience wrapper: start a XOP on all nodes (no exclusion). */
1121 hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_func_t func)
1123 hammer2_xop_start_except(xop, func, -1);
1127 * Retire a XOP. Used by both the VOP frontend and by the XOP backend.
/*
 * Drop 'mask' from run_mask; the last participant out tears down the
 * collection cluster, drains the per-node FIFOs, drops the attached
 * inodes and names, and returns the XOP to the objcache.
 * NOTE(review): the early return after the fetchadd, fifo->ri advance,
 * and surrounding braces are missing from this truncated view.
 */
1130 hammer2_xop_retire(hammer2_xop_head_t *xop, uint32_t mask)
1132 hammer2_xop_group_t *xgrp;
1133 hammer2_chain_t *chain;
1139 * Remove the frontend or remove a backend feeder. When removing
1140 * the frontend we must wakeup any backend feeders who are waiting
1143 * XXX optimize wakeup.
1145 KKASSERT(xop->run_mask & mask);
1146 if (atomic_fetchadd_int(&xop->run_mask, -mask) != mask) {
1147 if (mask == HAMMER2_XOPMASK_VOP)
1153 * Cleanup the collection cluster.
1155 for (i = 0; i < xop->cluster.nchains; ++i) {
1156 xop->cluster.array[i].flags = 0;
1157 chain = xop->cluster.array[i].chain;
1159 xop->cluster.array[i].chain = NULL;
1160 hammer2_chain_unlock(chain);
1161 hammer2_chain_drop(chain);
1166 * Cleanup the fifos, use check_counter to optimize the loop.
1168 mask = xop->chk_mask;
1169 for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
1170 hammer2_xop_fifo_t *fifo = &xop->collect[i];
1171 while (fifo->ri != fifo->wi) {
1172 chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
1174 hammer2_chain_unlock(chain);
1175 hammer2_chain_drop(chain);
1178 if (fifo->wi - fifo->ri < HAMMER2_XOPFIFO / 2)
1179 wakeup(xop); /* XXX optimize */
1185 * The inode is only held at this point, simply drop it.
1188 hammer2_inode_drop(xop->ip);
1192 hammer2_inode_drop(xop->ip2);
1196 hammer2_inode_drop(xop->ip3);
1200 kfree(xop->name, M_HAMMER2);
1205 kfree(xop->name2, M_HAMMER2);
1210 objcache_put(cache_xops, xop);
1214 * (Backend) Returns non-zero if the frontend is still attached.
/* Test for the frontend's VOP bit in run_mask (returns missing here). */
1217 hammer2_xop_active(hammer2_xop_head_t *xop)
1219 if (xop->run_mask & HAMMER2_XOPMASK_VOP)
1226 * (Backend) Feed chain data through the cluster validator and back to
1227 * the frontend. Chains are fed from multiple nodes concurrently
1228 * and pipelined via per-node FIFOs in the XOP.
1230 * No xop lock is needed because we are only manipulating fields under
1231 * our direct control.
1233 * Returns 0 on success and a hammer error code if sync is permanently
1234 * lost. The caller retains a ref on the chain but by convention
1235 * the lock is typically inherited by the xop (caller loses lock).
1237 * Returns non-zero on error. In this situation the caller retains a
1238 * ref on the chain but loses the lock (we unlock here).
1240 * WARNING! The chain is moving between two different threads, it must
1241 * be locked SHARED to retain its data mapping, not exclusive.
1242 * When multiple operations are in progress at once, chains fed
1243 * back to the frontend for collection can wind up being locked
1244 * in different orders, only a shared lock can prevent a deadlock.
1246 * Exclusive locks may only be used by a XOP backend node thread
1247 * temporarily, with no direct or indirect dependencies (aka
1248 * blocking/waiting) on other nodes.
/*
 * NOTE(review): the wi advance, the EINTR-style error assignment when
 * the frontend detaches, and the return are missing from this view.
 */
1251 hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
1252 int clindex, int error)
1254 hammer2_xop_fifo_t *fifo;
1257 * Multi-threaded entry into the XOP collector. We own the
1258 * fifo->wi for our clindex.
1260 fifo = &xop->collect[clindex];
/* Block while our FIFO is full, unless the frontend has gone away. */
1262 while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
1263 tsleep_interlock(xop, 0);
1264 if (hammer2_xop_active(xop) == 0) {
1268 if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
1269 tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
1273 hammer2_chain_ref(chain);
1274 fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
1275 fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
1278 atomic_add_int(&xop->check_counter, 1);
1279 wakeup(&xop->check_counter); /* XXX optimize */
1283 * Cleanup. If an error occurred we eat the lock. If no error
1284 * occurred the fifo inherits the lock and gains an additional ref.
1286 * The caller's ref remains in both cases.
1290 hammer2_chain_unlock(chain);
1295 * (Frontend) collect a response from a running cluster op.
1297 * Responses are fed from all appropriate nodes concurrently
1298 * and collected into a cohesive response >= collect_key.
1300 * The collector will return the instant quorum or other requirements
1301 * are met, even if some nodes get behind or become non-responsive.
1303 * HAMMER2_XOP_COLLECT_NOWAIT - Used to 'poll' a completed collection,
1304 * usually called synchronously from the
1305 * node XOPs for the strategy code to
1306 * fake the frontend collection and complete
1307 * the BIO as soon as possible.
1309 * HAMMER2_XOP_SYNCHRONIZER - Request synchronization with a particular
1310 * cluster index, prevents looping when that
1311 * index is out of sync so caller can act on
1312 * the out of sync element. ESRCH and EDEADLK
1313 * can be returned if this flag is specified.
1315 * Returns 0 on success plus a filled out xop->cluster structure.
1316 * Return ENOENT on normal termination.
1317 * Otherwise return an error.
/*
 * NOTE(review): the outer loop construct, several brace closings, the
 * fifo->ri advance, and the final return paths are missing from this
 * truncated view -- comments describe only the visible logic.
 */
1320 hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
1322 hammer2_xop_fifo_t *fifo;
1323 hammer2_chain_t *chain;
1324 hammer2_key_t lokey;
1327 int adv; /* advance the element */
1329 uint32_t check_counter;
1333 * First loop tries to advance pieces of the cluster which
/* Track the lowest uncollected key across all cluster slots. */
1336 lokey = HAMMER2_KEY_MAX;
1337 keynull = HAMMER2_CHECK_NULL;
1338 check_counter = xop->check_counter;
1341 for (i = 0; i < xop->cluster.nchains; ++i) {
1342 chain = xop->cluster.array[i].chain;
1343 if (chain == NULL) {
1345 } else if (chain->bref.key < xop->collect_key) {
1348 keynull &= ~HAMMER2_CHECK_NULL;
1349 if (lokey > chain->bref.key)
1350 lokey = chain->bref.key;
1357 * Advance element if possible, advanced element may be NULL.
1360 hammer2_chain_unlock(chain);
1361 hammer2_chain_drop(chain);
1363 fifo = &xop->collect[i];
1364 if (fifo->ri != fifo->wi) {
1366 chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
1368 xop->cluster.array[i].chain = chain;
1369 if (chain == NULL) {
1371 xop->cluster.array[i].flags |=
1374 if (fifo->wi - fifo->ri < HAMMER2_XOPFIFO / 2)
1375 wakeup(xop); /* XXX optimize */
1376 --i; /* loop on same index */
1379 * Retain CITEM_NULL flag. If set just repeat EOF.
1380 * If not, the NULL,0 combination indicates an
1381 * operation in-progress.
1383 xop->cluster.array[i].chain = NULL;
1384 /* retain any CITEM_NULL setting */
1389 * Determine whether the lowest collected key meets clustering
1390 * requirements. Returns:
1392 * 0 - key valid, cluster can be returned.
1394 * ENOENT - normal end of scan, return ENOENT.
1396 * ESRCH - sufficient elements collected, quorum agreement
1397 * that lokey is not a valid element and should be
1400 * EDEADLK - sufficient elements collected, no quorum agreement
1401 * (and no agreement possible). In this situation a
1402 * repair is needed, for now we loop.
1404 * EINPROGRESS - insufficient elements collected to resolve, wait
1405 * for event and loop.
1407 if ((flags & HAMMER2_XOP_COLLECT_WAITALL) &&
1408 xop->run_mask != HAMMER2_XOPMASK_VOP) {
1409 error = EINPROGRESS;
1411 error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
1413 if (error == EINPROGRESS) {
1414 if (xop->check_counter == check_counter) {
1415 if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
1417 tsleep_interlock(&xop->check_counter, 0);
1419 if (xop->check_counter == check_counter) {
1420 tsleep(&xop->check_counter, PINTERLOCKED,
1426 if (error == ESRCH) {
1427 if (lokey != HAMMER2_KEY_MAX) {
1428 xop->collect_key = lokey + 1;
1433 if (error == EDEADLK) {
1434 kprintf("hammer2: no quorum possible lokey %016jx\n",
1436 if (lokey != HAMMER2_KEY_MAX) {
1437 xop->collect_key = lokey + 1;
1442 if (lokey == HAMMER2_KEY_MAX)
1443 xop->collect_key = lokey;
1445 xop->collect_key = lokey + 1;
1451 * Primary management thread for xops support. Each node has several such
1452 * threads which replicate front-end operations on cluster nodes.
1454 * XOPS thread node operations, allowing the function to focus on a single
1455 * node in the cluster after validating the operation with the cluster.
1456 * This is primarily what prevents dead or stalled nodes from stalling
/*
 * Per-(node,group) worker: honors FREEZE/FROZEN/REMASTER like the sync
 * thread, drains thr->xopq calling each XOP's func() for this clindex
 * (dropping the thread lock across the call), and aborts queued XOPs on
 * shutdown.  NOTE(review): pmp/mask declarations, some braces, and the
 * inactive-XOP feed arguments are partially missing from this view.
 */
1460 hammer2_primary_xops_thread(void *arg)
1462 hammer2_thread_t *thr = arg;
1464 hammer2_xop_head_t *xop;
1465 hammer2_xop_group_t *xgrp;
1469 xgrp = &pmp->xop_groups[thr->repidx];
1470 mask = 1U << thr->clindex;
1472 lockmgr(&thr->lk, LK_EXCLUSIVE);
1473 while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
1475 * Handle freeze request
1477 if (thr->flags & HAMMER2_THREAD_FREEZE) {
1478 atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
1479 atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
1483 * Force idle if frozen until unfrozen or stopped.
1485 if (thr->flags & HAMMER2_THREAD_FROZEN) {
1486 lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
1491 * Reset state on REMASTER request
1493 if (thr->flags & HAMMER2_THREAD_REMASTER) {
1494 atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
1499 * Process requests. Each request can be multi-queued.
1501 * If we get behind and the frontend VOP is no longer active,
1502 * we retire the request without processing it. The callback
1503 * may also abort processing if the frontend VOP becomes
1506 while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
1507 TAILQ_REMOVE(&thr->xopq, xop,
1508 collect[thr->clindex].entry);
1509 if (hammer2_xop_active(xop)) {
1510 lockmgr(&thr->lk, LK_RELEASE);
1511 xop->func((hammer2_xop_t *)xop, thr->clindex);
1512 hammer2_xop_retire(xop, mask);
1513 lockmgr(&thr->lk, LK_EXCLUSIVE);
1515 hammer2_xop_feed(xop, NULL, thr->clindex,
1517 hammer2_xop_retire(xop, mask);
1524 lksleep(&thr->flags, &thr->lk, 0, "h2idle", 0);
1528 * Cleanup / termination
1530 while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
1531 kprintf("hammer2_thread: aborting xop %p\n", xop->func);
1532 TAILQ_REMOVE(&thr->xopq, xop,
1533 collect[thr->clindex].entry);
1534 hammer2_xop_retire(xop, mask);
1539 lockmgr(&thr->lk, LK_RELEASE);
1540 /* thr structure can go invalid after this point */