hammer2 - Refactor frontend part 9/many
[dragonfly.git] / sys / vfs / hammer2 / hammer2_thread.c
/*
 * Copyright (c) 2015 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module implements various PFS-based helper threads.
 */
#include "hammer2.h"

#define HAMMER2_THREAD_DEBUG 1

static int hammer2_sync_slaves(hammer2_thread_t *thr,
                        hammer2_cluster_t *cparent, int *errors);
static void hammer2_update_pfs_status(hammer2_thread_t *thr,
                        hammer2_cluster_t *cparent);
static int hammer2_sync_insert(hammer2_thread_t *thr,
                        hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
                        hammer2_tid_t modify_tid,
                        int i, int *errors);
static int hammer2_sync_destroy(hammer2_thread_t *thr,
                        hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
                        int i, int *errors);
static int hammer2_sync_replace(hammer2_thread_t *thr,
                        hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
                        hammer2_tid_t modify_tid,
                        int i, int *errors);

/****************************************************************************
 *                          HAMMER2 THREAD API                              *
 ****************************************************************************/
/*
 * Initialize the supplied thread structure, starting the specified
 * thread.
 */
void
hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
                   const char *id, int clindex, int repidx,
                   void (*func)(void *arg))
{
        lockinit(&thr->lk, "h2thr", 0, 0);
        thr->pmp = pmp;
        thr->clindex = clindex;
        thr->repidx = repidx;
        TAILQ_INIT(&thr->xopq);
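        /*
         * A non-negative repidx identifies the XOP group this thread will
         * service and is appended to the thread name; threads that are
         * not tied to an XOP group are created with a negative repidx.
         */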
        if (repidx >= 0) {
                lwkt_create(func, thr, &thr->td, NULL, 0, -1,
                            "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
        } else {
                lwkt_create(func, thr, &thr->td, NULL, 0, -1,
                            "%s-%s", id, pmp->pfs_names[clindex]);
        }
}

/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        lockmgr(&thr->lk, LK_EXCLUSIVE);
        atomic_set_int(&thr->flags, HAMMER2_THREAD_STOP);
        wakeup(&thr->flags);
        while (thr->td) {
                lksleep(thr, &thr->lk, 0, "h2thr", hz);
        }
        lockmgr(&thr->lk, LK_RELEASE);
        thr->pmp = NULL;
        lockuninit(&thr->lk);
}

/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        lockmgr(&thr->lk, LK_EXCLUSIVE);
        atomic_set_int(&thr->flags, HAMMER2_THREAD_REMASTER);
        wakeup(&thr->flags);
        lockmgr(&thr->lk, LK_RELEASE);
}

void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
        atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
        wakeup(&thr->flags);
}

void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        lockmgr(&thr->lk, LK_EXCLUSIVE);
        atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
        wakeup(&thr->flags);
        while ((thr->flags & HAMMER2_THREAD_FROZEN) == 0) {
                lksleep(thr, &thr->lk, 0, "h2frz", hz);
        }
        lockmgr(&thr->lk, LK_RELEASE);
}

void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        lockmgr(&thr->lk, LK_EXCLUSIVE);
        atomic_clear_int(&thr->flags, HAMMER2_THREAD_FROZEN);
        wakeup(&thr->flags);
        lockmgr(&thr->lk, LK_RELEASE);
}

/****************************************************************************
 *                          HAMMER2 SYNC THREADS                            *
 ****************************************************************************/
/*
 * Primary management thread for an element of a node.  A thread will exist
 * for each element requiring management.
 *
 * No management threads are needed for the SPMP or for any PMP with only
 * a single MASTER.
 *
 * On the SPMP - handles bulkfree and dedup operations
 * On a PFS    - handles remastering and synchronization
 */
void
hammer2_primary_sync_thread(void *arg)
{
        hammer2_thread_t *thr = arg;
        hammer2_cluster_t *cparent;
        hammer2_chain_t *chain;
        hammer2_pfs_t *pmp;
        int errors[HAMMER2_MAXCLUSTER];
        int error;

        pmp = thr->pmp;

        lockmgr(&thr->lk, LK_EXCLUSIVE);
        while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
                /*
                 * Handle freeze request
                 */
                if (thr->flags & HAMMER2_THREAD_FREEZE) {
                        atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
                        atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
                }

                /*
                 * Force idle if frozen until unfrozen or stopped.
                 */
                if (thr->flags & HAMMER2_THREAD_FROZEN) {
                        lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
                        continue;
                }

                /*
                 * Reset state on REMASTER request
                 */
                if (thr->flags & HAMMER2_THREAD_REMASTER) {
                        atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
                        /* reset state */
                }

                /*
                 * Synchronization scan.
                 */
                hammer2_trans_init(&thr->trans, pmp, HAMMER2_TRANS_KEEPMODIFY);
                hammer2_inode_lock(pmp->iroot, HAMMER2_RESOLVE_ALWAYS);
                cparent = hammer2_inode_cluster(pmp->iroot,
                                                HAMMER2_RESOLVE_ALWAYS);
                hammer2_update_pfs_status(thr, cparent);
                hammer2_inode_unlock(pmp->iroot, NULL);
                bzero(errors, sizeof(errors));
                kprintf("sync_slaves clindex %d\n", thr->clindex);

                /*
                 * We are the syncer, not a normal frontend operator,
                 * so force cparent good to prime the scan.
                 */
                hammer2_cluster_forcegood(cparent);
                error = hammer2_sync_slaves(thr, cparent, errors);
                if (error)
                        kprintf("hammer2_sync_slaves: error %d\n", error);
                chain = cparent->array[thr->clindex].chain;

                /*
                 * Retain chain for our node and release the cluster.
                 */
                hammer2_chain_ref(chain);
                hammer2_chain_lock(chain, HAMMER2_RESOLVE_ALWAYS);
                hammer2_cluster_unlock(cparent);
                hammer2_cluster_drop(cparent);

                /*
                 * Flush the chain.
                 */
                hammer2_flush(&thr->trans, chain, 1);
                hammer2_chain_unlock(chain);
                hammer2_chain_drop(chain);

                hammer2_trans_done(&thr->trans);

                /*
                 * Wait for event, or 5-second poll.
                 */
                lksleep(&thr->flags, &thr->lk, 0, "h2idle", hz * 5);
        }
        thr->td = NULL;
        wakeup(thr);
        lockmgr(&thr->lk, LK_RELEASE);
        /* thr structure can go invalid after this point */
}

/*
 * Given a locked cluster created from pmp->iroot, update the PFS's
 * reporting status.
 */
static
void
hammer2_update_pfs_status(hammer2_thread_t *thr, hammer2_cluster_t *cparent)
{
        hammer2_pfs_t *pmp = thr->pmp;
        uint32_t flags;

        flags = cparent->flags & HAMMER2_CLUSTER_ZFLAGS;
        if (pmp->flags == flags)
                return;
        pmp->flags = flags;

        kprintf("pfs %p", pmp);
        if (flags & HAMMER2_CLUSTER_MSYNCED)
                kprintf(" masters-all-good");
        if (flags & HAMMER2_CLUSTER_SSYNCED)
                kprintf(" slaves-all-good");

        if (flags & HAMMER2_CLUSTER_WRHARD)
                kprintf(" quorum/rw");
        else if (flags & HAMMER2_CLUSTER_RDHARD)
                kprintf(" quorum/ro");

        if (flags & HAMMER2_CLUSTER_UNHARD)
                kprintf(" out-of-sync-masters");
        else if (flags & HAMMER2_CLUSTER_NOHARD)
                kprintf(" no-masters-visible");

        if (flags & HAMMER2_CLUSTER_WRSOFT)
                kprintf(" soft/rw");
        else if (flags & HAMMER2_CLUSTER_RDSOFT)
                kprintf(" soft/ro");

        if (flags & HAMMER2_CLUSTER_UNSOFT)
                kprintf(" out-of-sync-slaves");
        else if (flags & HAMMER2_CLUSTER_NOSOFT)
                kprintf(" no-slaves-visible");
        kprintf("\n");
}

static
void
dumpcluster(const char *label,
            hammer2_cluster_t *cparent, hammer2_cluster_t *cluster)
{
        hammer2_chain_t *chain;
        int i;

        if ((hammer2_debug & 1) == 0)
                return;

        kprintf("%s\t", label);
        KKASSERT(cparent->nchains == cluster->nchains);
        for (i = 0; i < cparent->nchains; ++i) {
                if (i)
                        kprintf("\t");
                kprintf("%d ", i);
                if ((chain = cparent->array[i].chain) != NULL) {
                        kprintf("%016jx%s ",
                                chain->bref.key,
                                ((cparent->array[i].flags &
                                  HAMMER2_CITEM_INVALID) ? "(I)" : "   ")
                        );
                } else {
                        kprintf("      NULL      %s ", "   ");
                }
                if ((chain = cluster->array[i].chain) != NULL) {
                        kprintf("%016jx%s ",
                                chain->bref.key,
                                ((cluster->array[i].flags &
                                  HAMMER2_CITEM_INVALID) ? "(I)" : "   ")
                        );
                } else {
                        kprintf("      NULL      %s ", "   ");
                }
                kprintf("\n");
        }
}

/*
 * TODO - have cparent use a shared lock normally instead of an exclusive
 *        one (it needs to be upgraded for slave adjustments).
 */
static
int
hammer2_sync_slaves(hammer2_thread_t *thr, hammer2_cluster_t *cparent,
                    int *errors)
{
        hammer2_pfs_t *pmp;
        hammer2_cluster_t *cluster;
        hammer2_cluster_t *scluster;
        hammer2_chain_t *focus;
        hammer2_chain_t *chain;
        hammer2_key_t key_next;
        int error;
        int nerror;
        int idx;
        int n;
        int nowork;
        int dorecursion;

        pmp = thr->pmp;
        idx = thr->clindex;     /* cluster node we are responsible for */

        /*
         * Nothing to do if all slaves are synchronized.
         * Nothing to do if cluster not authoritatively readable.
         */
        if (pmp->flags & HAMMER2_CLUSTER_SSYNCED)
                return(0);
        if ((pmp->flags & HAMMER2_CLUSTER_RDHARD) == 0)
                return(HAMMER2_ERROR_INCOMPLETE);

        error = 0;

        /*
         * XXX snapshot the source to provide a stable source to copy.
         */

        /*
         * Update all local slaves (remote slaves are handled by the sync
         * threads on their respective hosts).
         *
         * Do a full topology scan, insert/delete elements on slaves as
         * needed.  cparent must be ref'd so we can unlock and relock it
         * on the recursion.
         *
         * ALLNODES - Allows clusters with a NULL focus to be returned if
         *            elements remain on other nodes.
         */
        hammer2_cluster_ref(cparent);
        cluster = hammer2_cluster_lookup(cparent, &key_next,
                                         HAMMER2_KEY_MIN, HAMMER2_KEY_MAX,
                                         HAMMER2_LOOKUP_NODATA |
                                         HAMMER2_LOOKUP_NOLOCK |
                                         HAMMER2_LOOKUP_NODIRECT |
                                         HAMMER2_LOOKUP_ALLNODES);
        dumpcluster("lookup", cparent, cluster);

        /*
         * Scan elements
         */
        while (cluster) {
                /*
                 * nowork is adjusted during the loop,
                 * dorecursion is calculated here.
                 */
                nowork = 1;
                focus = cluster->focus;
                if (focus && focus->bref.type == HAMMER2_BREF_TYPE_INODE)
                        dorecursion = 1;
                else
                        dorecursion = 0;

                if (idx == 3 && (hammer2_debug & 1) && focus)
                        kprintf("scan3 focus %d.%016jx %d.%016jx\n",
                            (cparent ? cparent->focus->bref.type : 0xFF),
                            (cparent ? cparent->focus->bref.key : (uintmax_t)-1LLU),
                            focus->bref.type, focus->bref.key);
repeat1:
                /*
                 * Synchronize chains to focus
                 */
                if (idx >= cluster->nchains)
                        goto skip1;
                chain = cluster->array[idx].chain;
                if (idx == 3 && (hammer2_debug & 1) && chain)
                        kprintf("scan3 slave %d.%016jx %d.%016jx\n",
                            ((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.type : 0xFF),
                            ((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.key : (uintmax_t)-1LLU),
                            cluster->array[idx].chain->bref.type,
                            cluster->array[idx].chain->bref.key);
                if (idx == 3 && (hammer2_debug & 1) && chain == NULL)
                        kprintf("scan3 slave %d.%16jx NULL\n",
                            ((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.type : 0xFF),
                            ((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.key : (uintmax_t)-1LLU)
                        );

                /*
                 * Disable recursion for this index and loop up
                 * if a chain error is detected.
                 *
                 * A NULL chain is ok, it simply indicates that
                 * the slave reached the end of its scan, but we
                 * might have stuff from the master that still
                 * needs to be copied in.
                 */
                if (chain && chain->error) {
                        kprintf("chain error index %d: %d\n",
                                idx, chain->error);
                        errors[idx] = chain->error;
                        error = chain->error;
                        cluster->array[idx].flags |= HAMMER2_CITEM_INVALID;
                        goto skip1;
                }

                /*
                 * Skip if the slave already has the record (everything
                 * matches including the modify_tid).  Note that the
                 * mirror_tid does not have to match, mirror_tid is
                 * a per-block-device entity.
                 */
                if (chain &&
                    (cluster->array[idx].flags & HAMMER2_CITEM_INVALID) == 0) {
                        goto skip1;
                }

                /*
                 * Invalid element needs to be updated.
                 */
                nowork = 0;

                /*
                 * Otherwise adjust the slave.  Compare the focus to
                 * the chain.  Note that focus and chain can
                 * independently be NULL.
                 */
                KKASSERT(cluster->focus == focus);
                if (focus) {
                        if (chain)
                                n = hammer2_chain_cmp(focus, chain);
                        else
                                n = -1; /* end-of-scan on slave */
                } else {
                        if (chain)
                                n = 1;  /* end-of-scan on focus */
                        else
                                n = 0;  /* end-of-scan on both */
                }

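                /*
                 * n < 0  - the slave is missing a record that the focus
                 *          has, insert it.
                 * n > 0  - the slave has a record that the focus does not,
                 *          destroy it.
                 * n == 0 - keys match (or both scans ended), replace the
                 *          slave's copy in place if needed.
                 */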
                if (n < 0) {
                        /*
                         * slave chain missing, create missing chain.
                         *
                         * If we are going to recurse we have to set
                         * the initial modify_tid to 0 until the
                         * sub-tree is completely synchronized.
                         * Setting (n = 0) in this situation forces
                         * the replacement call to run on the way
                         * back up after the sub-tree has
                         * synchronized.
                         */
                        if (dorecursion) {
                                nerror = hammer2_sync_insert(
                                                thr, cparent, cluster,
                                                0,
                                                idx, errors);
                                if (nerror == 0)
                                        n = 0;
                        } else {
                                nerror = hammer2_sync_insert(
                                                thr, cparent, cluster,
                                                focus->bref.modify_tid,
                                                idx, errors);
                        }
                } else if (n > 0) {
                        /*
                         * excess slave chain, destroy
                         */
                        nerror = hammer2_sync_destroy(thr,
                                                      cparent, cluster,
                                                      idx, errors);
                        hammer2_cluster_next_single_chain(
                                cparent, cluster,
                                &key_next,
                                HAMMER2_KEY_MIN,
                                HAMMER2_KEY_MAX,
                                idx,
                                HAMMER2_LOOKUP_NODATA |
                                HAMMER2_LOOKUP_NOLOCK |
                                HAMMER2_LOOKUP_NODIRECT |
                                HAMMER2_LOOKUP_ALLNODES);
                        /*
                         * Re-execute same index, there might be more
                         * items to delete before this slave catches
                         * up to the focus.
                         */
                        goto repeat1;
                } else {
                        /*
                         * Key matched but INVALID was set which likely
                         * means that modify_tid is out of sync.
                         *
                         * If we are going to recurse we have to do
                         * a partial replacement of the parent to
                         * ensure that the block array is compatible.
                         * For example, the current slave inode might
                         * be flagged DIRECTDATA when the focus is not.
                         * We must set modify_tid to 0 for now and
                         * will fix it when recursion is complete.
                         *
                         * If we are not going to recurse we can do
                         * a normal replacement.
                         *
                         * focus && chain can both be NULL on a match.
                         */
                        if (dorecursion) {
                                nerror = hammer2_sync_replace(
                                                thr, cparent, cluster,
                                                0,
                                                idx, errors);
                        } else if (focus) {
                                nerror = hammer2_sync_replace(
                                                thr, cparent, cluster,
                                                focus->bref.modify_tid,
                                                idx, errors);
                        } else {
                                nerror = 0;
                        }
                }
                if (nerror)
                        error = nerror;
                /* finished primary synchronization of chains */

skip1:
#if 0
                /*
                 * Operation may have modified cparent, we must replace
                 * iroot->cluster if we are at the top level.
                 */
                if (thr->depth == 0)
                        hammer2_inode_repoint_one(pmp->iroot, cparent, idx);
#endif
                KKASSERT(cluster->focus == focus);

                /*
                 * If no work to do this iteration, skip any recursion.
                 */
                if (nowork)
                        goto skip2;

                /*
                 * EXECUTE RECURSION (skip if no recursion)
                 *
                 * Indirect blocks are absorbed by the iteration so we only
                 * have to recurse on inodes.
                 *
                 * Do not resolve scluster, it represents the iteration
                 * parent and while it is logically in-sync the physical
                 * elements might not match due to the presence of indirect
                 * blocks and such.
                 */
                if (dorecursion == 0)
                        goto skip2;
                if (thr->depth > 20) {
                        kprintf("depth limit reached\n");
                        nerror = HAMMER2_ERROR_DEPTH;
                } else {
                        hammer2_cluster_unlock(cparent);
                        scluster = hammer2_cluster_copy(cluster);
                        hammer2_cluster_lock(scluster, HAMMER2_RESOLVE_ALWAYS);
                        ++thr->depth;
                        nerror = hammer2_sync_slaves(thr, scluster, errors);
                        --thr->depth;
                        hammer2_cluster_unlock(scluster);
                        hammer2_cluster_drop(scluster);
                        /* XXX modify_tid on scluster */
                        /* flush needs to not update modify_tid */
                        hammer2_cluster_lock(cparent, HAMMER2_RESOLVE_ALWAYS);
                }
                if (nerror)
                        goto skip2;

                /*
                 * Fixup parent nodes on the way back up from the recursion
                 * if no error occurred.  The modify_tid for these nodes
                 * would have been set to 0 and must be set to their final
                 * value.
                 */
                chain = cluster->array[idx].chain;
                if (chain == NULL || chain->error)
                        goto skip2;
                /*
                 * should not be set but must fixup parents.
                if ((cluster->array[idx].flags & HAMMER2_CITEM_INVALID) == 0)
                        goto skip2;
                 */

                /*
                 * At this point we have to have key-matched non-NULL
                 * elements.
                 */
                n = hammer2_chain_cmp(focus, chain);
                if (n != 0) {
                        kprintf("hammer2_sync_slaves: illegal "
                                "post-recursion state %d\n", n);
                        goto skip2;
                }

                /*
                 * Update modify_tid on the way back up.
                 */
                nerror = hammer2_sync_replace(
                                thr, cparent, cluster,
                                focus->bref.modify_tid,
                                idx, errors);
                if (nerror)
                        error = nerror;

#if 0
                /*
                 * Operation may modify cparent, must replace
                 * iroot->cluster if we are at the top level.
                 */
                if (thr->depth == 0)
                        hammer2_inode_repoint_one(pmp->iroot, cparent, idx);
#endif

skip2:
                /*
                 * Iterate.
                 */
                dumpcluster("adjust", cparent, cluster);
                cluster = hammer2_cluster_next(cparent, cluster,
                                               &key_next,
                                               HAMMER2_KEY_MIN,
                                               HAMMER2_KEY_MAX,
                                               HAMMER2_LOOKUP_NODATA |
                                               HAMMER2_LOOKUP_NOLOCK |
                                               HAMMER2_LOOKUP_NODIRECT |
                                               HAMMER2_LOOKUP_ALLNODES);
                dumpcluster("nextcl", cparent, cluster);
        }
        hammer2_cluster_drop(cparent);
        if (cluster)
                hammer2_cluster_drop(cluster);

        return error;
}

/*
 * cparent is locked exclusively, with an extra ref, cluster is not locked.
 */
static
int
hammer2_sync_insert(hammer2_thread_t *thr,
                    hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
                    hammer2_tid_t modify_tid, int i, int *errors)
{
        hammer2_chain_t *focus;
        hammer2_chain_t *chain;
        hammer2_key_t dummy;

        focus = cluster->focus;
#if HAMMER2_THREAD_DEBUG
        if (hammer2_debug & 1)
        kprintf("insert rec par=%p/%d.%016jx slave %d %d.%016jx mod=%016jx\n",
                cparent->array[i].chain,
                cparent->array[i].chain->bref.type,
                cparent->array[i].chain->bref.key,
                i, focus->bref.type, focus->bref.key, modify_tid);
#endif

        /*
         * We have to do a lookup to position ourselves at the correct
         * parent when inserting a record into a new slave because the
         * cluster iteration for this slave might not be pointing to the
         * right place.  Our expectation is that the record will not be
         * found.
         */
        hammer2_cluster_unlock_except(cparent, i);
        chain = hammer2_chain_lookup(&cparent->array[i].chain, &dummy,
                                     focus->bref.key, focus->bref.key,
                                     &cparent->array[i].cache_index,
                                     HAMMER2_LOOKUP_NODIRECT);
        if (cparent->focus_index == i)
                cparent->focus = cparent->array[i].chain;
        KKASSERT(chain == NULL);

        /*
         * Create the missing chain.
         *
         * Have to be careful to avoid deadlocks.
         */
        chain = NULL;
        if (cluster->focus_index < i)
                hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
        hammer2_chain_create(&thr->trans, &cparent->array[i].chain,
                             &chain, thr->pmp,
                             focus->bref.key, focus->bref.keybits,
                             focus->bref.type, focus->bytes,
                             0);
        if (cluster->focus_index > i)
                hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
        if (cparent->focus_index == i)
                cparent->focus = cparent->array[i].chain;
        hammer2_chain_modify(&thr->trans, chain, 0);

        /*
         * Copy focus to new chain
         */

        /* type already set */
        chain->bref.methods = focus->bref.methods;
        /* keybits already set */
        chain->bref.vradix = focus->bref.vradix;
        /* mirror_tid set by flush */
        chain->bref.modify_tid = modify_tid;
        chain->bref.flags = focus->bref.flags;
        /* key already present */
        /* check code will be recalculated */

        /*
         * Copy data body.
         */
        switch(chain->bref.type) {
        case HAMMER2_BREF_TYPE_INODE:
                if ((focus->data->ipdata.meta.op_flags &
                     HAMMER2_OPFLAG_DIRECTDATA) == 0) {
                        bcopy(focus->data, chain->data,
                              offsetof(hammer2_inode_data_t, u));
                        break;
                }
                /* fall through */
        case HAMMER2_BREF_TYPE_DATA:
                bcopy(focus->data, chain->data, chain->bytes);
                hammer2_chain_setcheck(chain, chain->data);
                break;
        default:
                KKASSERT(0);
                break;
        }

        hammer2_chain_unlock(focus);
        hammer2_chain_unlock(chain);            /* unlock, leave ref */

        /*
         * Avoid ordering deadlock when relocking cparent.
         */
        if (i == 0) {
                hammer2_cluster_lock_except(cparent, i, HAMMER2_RESOLVE_ALWAYS);
        } else {
                hammer2_chain_unlock(cparent->array[i].chain);
                hammer2_cluster_lock(cparent, HAMMER2_RESOLVE_ALWAYS);
        }

        /*
         * Enter item into (unlocked) cluster.
         *
         * Must clear invalid for iteration to work properly.
         */
        if (cluster->array[i].chain)
                hammer2_chain_drop(cluster->array[i].chain);
        cluster->array[i].chain = chain;
        cluster->array[i].flags &= ~HAMMER2_CITEM_INVALID;

        return 0;
}

/*
 * cparent is locked exclusively, with an extra ref, cluster is not locked.
 */
static
int
hammer2_sync_destroy(hammer2_thread_t *thr,
                     hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
                     int i, int *errors)
{
        hammer2_chain_t *chain;

        chain = cluster->array[i].chain;
#if HAMMER2_THREAD_DEBUG
        if (hammer2_debug & 1)
        kprintf("destroy rec %p/%p slave %d %d.%016jx\n",
                cparent, cluster,
                i, chain->bref.type, chain->bref.key);
#endif
        /*
         * Try to avoid unnecessary I/O.
         *
         * XXX accounting not propagated up properly.  We might have to do
         *     a RESOLVE_MAYBE here and pass 0 for the flags.
         */
        hammer2_chain_lock(chain, HAMMER2_RESOLVE_NEVER);
        hammer2_chain_delete(&thr->trans, cparent->array[i].chain, chain,
                             HAMMER2_DELETE_NOSTATS |
                             HAMMER2_DELETE_PERMANENT);
        hammer2_chain_unlock(chain);

        /*
         * The element is not valid in that it doesn't match the other
         * elements, but we have to mark it valid here to allow the
         * cluster_next() call to advance this index to the next element.
         */
        cluster->array[i].flags &= ~HAMMER2_CITEM_INVALID;

        return 0;
}

/*
 * cparent is locked exclusively, with an extra ref, cluster is not locked.
 * Replace element [i] in the cluster.
 */
static
int
hammer2_sync_replace(hammer2_thread_t *thr,
                     hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
                     hammer2_tid_t modify_tid, int i, int *errors)
{
        hammer2_chain_t *focus;
        hammer2_chain_t *chain;
        int nradix;
        uint8_t otype;

        focus = cluster->focus;
        chain = cluster->array[i].chain;
#if HAMMER2_THREAD_DEBUG
        if (hammer2_debug & 1)
        kprintf("replace rec %p/%p slave %d %d.%016jx mod=%016jx\n",
                cparent, cluster,
                i, focus->bref.type, focus->bref.key, modify_tid);
#endif
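        /*
         * Lock the focus and the target chain in ascending cluster-index
         * order (lower index first) to keep chain lock ordering consistent.
         */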
        if (cluster->focus_index < i)
                hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
        hammer2_chain_lock(chain, HAMMER2_RESOLVE_ALWAYS);
        if (cluster->focus_index >= i)
                hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
        if (chain->bytes != focus->bytes) {
                /* XXX what if compressed? */
                nradix = hammer2_getradix(chain->bytes);
                hammer2_chain_resize(&thr->trans, NULL,
                                     cparent->array[i].chain, chain,
                                     nradix, 0);
        }
        hammer2_chain_modify(&thr->trans, chain, 0);
        otype = chain->bref.type;
        chain->bref.type = focus->bref.type;
        chain->bref.methods = focus->bref.methods;
        chain->bref.keybits = focus->bref.keybits;
        chain->bref.vradix = focus->bref.vradix;
        /* mirror_tid updated by flush */
        chain->bref.modify_tid = modify_tid;
        chain->bref.flags = focus->bref.flags;
        /* key already present */
        /* check code will be recalculated */
        chain->error = 0;

        /*
         * Copy data body.
         */
        switch(chain->bref.type) {
        case HAMMER2_BREF_TYPE_INODE:
                if ((focus->data->ipdata.meta.op_flags &
                     HAMMER2_OPFLAG_DIRECTDATA) == 0) {
                        /*
                         * If DIRECTDATA is transitioning to 0 or the old
                         * chain is not an inode we have to initialize
                         * the block table.
                         */
                        if (otype != HAMMER2_BREF_TYPE_INODE ||
                            (chain->data->ipdata.meta.op_flags &
                             HAMMER2_OPFLAG_DIRECTDATA)) {
                                kprintf("chain inode trans away from dd\n");
                                bzero(&chain->data->ipdata.u,
                                      sizeof(chain->data->ipdata.u));
                        }
                        bcopy(focus->data, chain->data,
                              offsetof(hammer2_inode_data_t, u));
                        /* XXX setcheck on inode should not be needed */
                        hammer2_chain_setcheck(chain, chain->data);
                        break;
                }
                /* fall through */
        case HAMMER2_BREF_TYPE_DATA:
                bcopy(focus->data, chain->data, chain->bytes);
                hammer2_chain_setcheck(chain, chain->data);
                break;
        default:
                KKASSERT(0);
                break;
        }

        hammer2_chain_unlock(focus);
        hammer2_chain_unlock(chain);

        /*
         * Must clear invalid for iteration to work properly.
         */
        cluster->array[i].flags &= ~HAMMER2_CITEM_INVALID;

        return 0;
}

/****************************************************************************
 *                          HAMMER2 XOPS THREADS                            *
 ****************************************************************************/

void
hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
{
        hammer2_mtx_init(&xgrp->mtx, "h2xopq");
}

/*
 * Allocate a XOP request.
 *
 * Once allocated a XOP request can be started, collected, and retired,
 * and can be retired early if desired.
 *
 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
 */
hammer2_xop_t *
hammer2_xop_alloc(hammer2_inode_t *ip, hammer2_xop_func_t func)
{
        hammer2_xop_t *xop;

        xop = objcache_get(cache_xops, M_WAITOK);
        xop->head.ip = ip;
        xop->head.func = func;
        xop->head.state = 0;
        xop->head.error = 0;
        xop->head.lkey = 0;
        xop->head.nkey = 0;

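        /*
         * The embedded cluster collects the per-node results for this XOP
         * and is dimensioned to match the inode's cluster.
         */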
        xop->head.cluster.nchains = ip->cluster.nchains;
        xop->head.cluster.pmp = ip->pmp;
        xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;

        /*
         * run_mask - Active thread (or frontend) associated with XOP
         */
        xop->head.run_mask = HAMMER2_XOPMASK_VOP;

        hammer2_inode_ref(ip);

        return xop;
}

/*
 * A mounted PFS needs Xops threads to support frontend operations.
 */
void
hammer2_xop_helper_create(hammer2_pfs_t *pmp)
{
        int i;
        int j;

        kprintf("XOP_HELPER_CREATE: %d\n", pmp->pfs_nmasters);
        for (i = 0; i < pmp->pfs_nmasters; ++i) {
                for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
                        if (pmp->xop_groups[j].thrs[i].td)
                                continue;
                        hammer2_thr_create(&pmp->xop_groups[j].thrs[i], pmp,
                                           "h2xop", i, j,
                                           hammer2_primary_xops_thread);
                }
        }
}

void
hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
{
        int i;
        int j;

        for (i = 0; i < pmp->pfs_nmasters; ++i) {
                for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
                        if (pmp->xop_groups[j].thrs[i].td)
                                hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
                }
        }
}

/*
 * Start a XOP request, queueing it to all nodes in the cluster to
 * execute the cluster op.
 *
 * XXX optimize single-target case.
 */
void
hammer2_xop_start(hammer2_xop_head_t *xop)
{
        hammer2_xop_group_t *xgrp;
        hammer2_thread_t *thr;
        hammer2_pfs_t *pmp;
        int g;
        int i;

        pmp = xop->ip->pmp;

        g = pmp->xop_iterator++;
        g = g & HAMMER2_XOPGROUPS_MASK;
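        /*
         * XOP groups are assigned round-robin via the per-PFS xop_iterator;
         * masking keeps the group index within HAMMER2_XOPGROUPS.
         */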
        xgrp = &pmp->xop_groups[g];
        xop->xgrp = xgrp;

        for (i = 0; i < xop->ip->cluster.nchains; ++i) {
                thr = &xgrp->thrs[i];
                if (thr->td) {
                        lockmgr(&thr->lk, LK_EXCLUSIVE);
                        if (thr->td &&
                            (thr->flags & HAMMER2_THREAD_STOP) == 0) {
                                atomic_set_int(&xop->run_mask, 1U << i);
                                TAILQ_INSERT_TAIL(&thr->xopq, xop,
                                                  collect[i].entry);
                        }
                        lockmgr(&thr->lk, LK_RELEASE);
                        wakeup(&thr->flags);
                }
        }
}

/*
 * Retire a XOP.  Used by both the VOP frontend and by the XOP backend.
 */
void
hammer2_xop_retire(hammer2_xop_head_t *xop, uint32_t mask)
{
        hammer2_xop_group_t *xgrp;
        hammer2_chain_t *chain;
        int i;

        xgrp = xop->xgrp;

        /*
         * Remove the frontend or remove a backend feeder.  When removing
         * the frontend we must wakeup any backend feeders who are waiting
         * for FIFO space.
         *
         * XXX optimize wakeup.
         */
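        /*
         * run_mask carries one bit per backend node plus XOPMASK_VOP for
         * the frontend.  Only the caller which clears the last remaining
         * bit falls through and tears the XOP down below.
         */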
        KKASSERT(xop->run_mask & mask);
        if (atomic_fetchadd_int(&xop->run_mask, -mask) != mask) {
                if (mask == HAMMER2_XOPMASK_VOP)
                        wakeup(xop);
                return;
        }

        /*
         * Cleanup the collection cluster.
         */
        for (i = 0; i < xop->cluster.nchains; ++i) {
                xop->cluster.array[i].flags = 0;
                chain = xop->cluster.array[i].chain;
                if (chain) {
                        xop->cluster.array[i].chain = NULL;
                        hammer2_chain_unlock(chain);
                        hammer2_chain_drop(chain);
                }
        }

        /*
         * Cleanup the fifos, use chk_mask to optimize the loop.
         */
        mask = xop->chk_mask;
        for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
                hammer2_xop_fifo_t *fifo = &xop->collect[i];
                while (fifo->ri != fifo->wi) {
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        if (chain) {
                                hammer2_chain_unlock(chain);
                                hammer2_chain_drop(chain);
                        }
                        ++fifo->ri;
                        if (fifo->wi - fifo->ri < HAMMER2_XOPFIFO / 2)
                                wakeup(xop);    /* XXX optimize */
                }
                mask &= ~(1U << i);
        }

        /*
         * The inode is only held at this point, simply drop it.
         */
        if (xop->ip) {
                hammer2_inode_drop(xop->ip);
                xop->ip = NULL;
        }

        objcache_put(cache_xops, xop);
}

/*
 * (Backend) Returns non-zero if the frontend is still attached.
 */
int
hammer2_xop_active(hammer2_xop_head_t *xop)
{
        if (xop->run_mask & HAMMER2_XOPMASK_VOP)
                return 1;
        else
                return 0;
}

/*
 * (Backend) Feed chain data through the cluster validator and back to
 * the frontend.  Chains are fed from multiple nodes concurrently
 * and pipelined via per-node FIFOs in the XOP.
 *
 * No xop lock is needed because we are only manipulating fields under
 * our direct control.
 *
 * Returns 0 on success and a hammer error code if sync is permanently
 * lost.
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
                 int clindex, int error)
{
        hammer2_xop_fifo_t *fifo;

        /*
         * Multi-threaded entry into the XOP collector.  We own the
         * fifo->wi for our clindex.
         */
        fifo = &xop->collect[clindex];

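        /*
         * The FIFO is full when wi has advanced a full HAMMER2_XOPFIFO
         * entries past ri.  Block (interlocked against the frontend's
         * wakeup) until space opens up or the frontend detaches.
         */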
        while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                tsleep_interlock(xop, 0);
                if (hammer2_xop_active(xop) == 0) {
                        error = EINTR;
                        goto done;
                }
                if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                        tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
                }
        }
        if (chain)
                hammer2_chain_ref(chain);
        fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
        fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
        cpu_sfence();
        ++fifo->wi;
        atomic_set_int(&xop->chk_mask, 1U << clindex);
        atomic_add_int(&xop->check_counter, 1);
        wakeup(&xop->check_counter);    /* XXX optimize */
        error = 0;
done:
        return error;
}

/*
 * (Frontend) collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= nkey.  lkey is
 * then set to nkey and nkey is advanced prior to return.
 * The caller may depend on xop->lkey reflecting the current
 * key of the returned response.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT   - Used to 'poll' a completed collection,
 *                                usually called synchronously from the
 *                                node XOPs for the strategy code to
 *                                fake the frontend collection and complete
 *                                the BIO as soon as possible.
 *
 * HAMMER2_XOP_SYNCHRONIZER     - Request synchronization with a particular
 *                                cluster index, prevents looping when that
 *                                index is out of sync so caller can act on
 *                                the out of sync element.  ESRCH and EDEADLK
 *                                can be returned if this flag is specified.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Return ENOENT on normal termination.
 * Otherwise return an error.
 */
int
hammer2_xop_collect(hammer2_xop_head_t *xop)
{
        hammer2_xop_fifo_t *fifo;
        hammer2_chain_t *chain;
        hammer2_key_t lokey;
        int error;
        int keynull;
        int adv;                /* advance the element */
        int i;
        uint32_t check_counter;

loop:
        /*
         * First loop tries to advance pieces of the cluster which
         * are out of sync.
         */
        lokey = HAMMER2_KEY_MAX;
        keynull = HAMMER2_CHECK_NULL;
        check_counter = xop->check_counter;
        cpu_lfence();

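        /*
         * The check_counter snapshot taken above lets the EINPROGRESS
         * path below sleep only if no feeder has pushed new data since
         * this pass began.
         */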
        for (i = 0; i < xop->cluster.nchains; ++i) {
                chain = xop->cluster.array[i].chain;
                if (chain == NULL) {
                        adv = 1;
                } else if (chain->bref.key < xop->nkey) {
                        adv = 1;
                } else {
                        keynull &= ~HAMMER2_CHECK_NULL;
                        if (lokey > chain->bref.key)
                                lokey = chain->bref.key;
                        adv = 0;
                }
                if (adv == 0)
                        continue;

                /*
                 * Advance element if possible, advanced element may be NULL.
                 */
                if (chain) {
                        hammer2_chain_unlock(chain);
                        hammer2_chain_drop(chain);
                }
                fifo = &xop->collect[i];
                if (fifo->ri != fifo->wi) {
                        cpu_lfence();
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        ++fifo->ri;
                        xop->cluster.array[i].chain = chain;
                        if (chain == NULL) {
                                xop->cluster.array[i].flags |=
                                                        HAMMER2_CITEM_NULL;
                        }
                        if (fifo->wi - fifo->ri < HAMMER2_XOPFIFO / 2)
                                wakeup(xop);    /* XXX optimize */
                        --i;            /* loop on same index */
                } else {
                        /*
                         * Retain CITEM_NULL flag.  If set just repeat EOF.
                         * If not, the NULL,0 combination indicates an
                         * operation in-progress.
                         */
                        xop->cluster.array[i].chain = NULL;
                        /* retain any CITEM_NULL setting */
                }
        }

        /*
         * Determine whether the lowest collected key meets clustering
         * requirements.  Returns:
         *
         * 0             - key valid, cluster can be returned.
         *
         * ENOENT        - normal end of scan, return ENOENT.
         *
         * ESRCH         - sufficient elements collected, quorum agreement
         *                 that lokey is not a valid element and should be
         *                 skipped.
         *
         * EDEADLK       - sufficient elements collected, no quorum agreement
         *                 (and no agreement possible).  In this situation a
         *                 repair is needed, for now we loop.
         *
         * EINPROGRESS   - insufficient elements collected to resolve, wait
         *                 for event and loop.
         */
        error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
        if (error == EINPROGRESS) {
                if (xop->check_counter == check_counter) {
                        tsleep_interlock(&xop->check_counter, 0);
                        cpu_lfence();
                        if (xop->check_counter == check_counter) {
                                tsleep(&xop->check_counter, PINTERLOCKED,
                                        "h2coll", hz*60);
                        }
                }
                goto loop;
        }
        if (error == ESRCH) {
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->nkey = lokey + 1;
                        goto loop;
                }
                error = ENOENT;
        }
        if (error == EDEADLK) {
                kprintf("hammer2: no quorum possible lkey %016jx\n",
                        lokey);
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->nkey = lokey + 1;
                        goto loop;
                }
                error = ENOENT;
        }
        if (lokey == HAMMER2_KEY_MAX)
                xop->nkey = lokey;
        else
                xop->nkey = lokey + 1;

        return error;
}

/*
 * Primary management thread for xops support.  Each node has several such
 * threads which replicate front-end operations on cluster nodes.
 *
 * XOPS threads are responsible for the node-level operations, allowing the
 * implementing function to focus on a single node in the cluster after the
 * operation has been validated against the cluster.  This is primarily what
 * prevents dead or stalled nodes from stalling the front-end.
 */
void
hammer2_primary_xops_thread(void *arg)
{
        hammer2_thread_t *thr = arg;
        hammer2_pfs_t *pmp;
        hammer2_xop_head_t *xop;
        hammer2_xop_group_t *xgrp;
        uint32_t mask;

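        /*
         * thr->repidx selects the XOP group this thread services; mask is
         * this node's bit in each XOP's run_mask and is passed to
         * hammer2_xop_retire() when we are done with an XOP.
         */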
        pmp = thr->pmp;
        xgrp = &pmp->xop_groups[thr->repidx];
        mask = 1U << thr->clindex;

        lockmgr(&thr->lk, LK_EXCLUSIVE);
        while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
                /*
                 * Handle freeze request
                 */
                if (thr->flags & HAMMER2_THREAD_FREEZE) {
                        atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
                        atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
                }

                /*
                 * Force idle if frozen until unfrozen or stopped.
                 */
                if (thr->flags & HAMMER2_THREAD_FROZEN) {
                        lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
                        continue;
                }

                /*
                 * Reset state on REMASTER request
                 */
                if (thr->flags & HAMMER2_THREAD_REMASTER) {
                        atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
                        /* reset state */
                }

                /*
                 * Process requests.  Each request can be multi-queued.
                 *
                 * If we get behind and the frontend VOP is no longer active,
                 * we retire the request without processing it.  The callback
                 * may also abort processing if the frontend VOP becomes
                 * inactive.
                 */
                while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
                        TAILQ_REMOVE(&thr->xopq, xop,
                                     collect[thr->clindex].entry);
                        if (hammer2_xop_active(xop)) {
                                lockmgr(&thr->lk, LK_RELEASE);
                                xop->func((hammer2_xop_t *)xop, thr->clindex);
                                hammer2_xop_retire(xop, mask);
                                lockmgr(&thr->lk, LK_EXCLUSIVE);
                        } else {
                                hammer2_xop_feed(xop, NULL, thr->clindex,
                                                 ECONNABORTED);
                                hammer2_xop_retire(xop, mask);
                        }
                }

                /*
                 * Wait for event.
                 */
                lksleep(&thr->flags, &thr->lk, 0, "h2idle", 0);
        }

        /*
         * Cleanup / termination
         */
        while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
                kprintf("hammer2_thread: aborting xop %p\n", xop->func);
                TAILQ_REMOVE(&thr->xopq, xop,
                             collect[thr->clindex].entry);
                hammer2_xop_retire(xop, mask);
        }

        thr->td = NULL;
        wakeup(thr);
        lockmgr(&thr->lk, LK_RELEASE);
        /* thr structure can go invalid after this point */
}