hammer2 - Refactor frontend part 17
[dragonfly.git] / sys / vfs / hammer2 / hammer2_thread.c
1 /*
2  * Copyright (c) 2015 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * This module implements various PFS-based helper threads.
36  */
37 #include "hammer2.h"
38
/*
 * One deferred inode synchronization request.  Entries are pushed LIFO
 * onto a hammer2_deferred_list by the sync scan and consumed bottom-up
 * by hammer2_primary_sync_thread().
 */
typedef struct hammer2_deferred_ip {
	struct hammer2_deferred_ip *next;	/* next entry (LIFO stack) */
	hammer2_inode_t *ip;			/* referenced inode to sync */
} hammer2_deferred_ip_t;
43
/*
 * Head of the deferral list built during a synchronization scan.
 * count caps the amount of deferred work per pass (see the
 * list->count > 1000 check in hammer2_sync_slaves()).
 */
typedef struct hammer2_deferred_list {
	hammer2_deferred_ip_t	*base;		/* LIFO stack of deferrals */
	int			count;		/* number of entries queued */
} hammer2_deferred_list_t;
48
49
50 #define HAMMER2_THREAD_DEBUG 1
51
52 static int hammer2_sync_slaves(hammer2_thread_t *thr, hammer2_inode_t *ip,
53                                 hammer2_deferred_list_t *list);
54 #if 0
55 static void hammer2_update_pfs_status(hammer2_thread_t *thr, uint32_t flags);
56                                 nerror = hammer2_sync_insert(
57                                                 thr, &parent, &chain,
58                                                 focus->bref.modify_tid,
59                                                 idx, focus);
60 #endif
61 static int hammer2_sync_insert(hammer2_thread_t *thr,
62                         hammer2_chain_t **parentp, hammer2_chain_t **chainp,
63                         hammer2_tid_t modify_tid, int idx,
64                         hammer2_chain_t *focus);
65 static int hammer2_sync_destroy(hammer2_thread_t *thr,
66                         hammer2_chain_t **parentp, hammer2_chain_t **chainp,
67                         int idx);
68 static int hammer2_sync_replace(hammer2_thread_t *thr,
69                         hammer2_chain_t *parent, hammer2_chain_t *chain,
70                         hammer2_tid_t modify_tid, int idx,
71                         hammer2_chain_t *focus);
72
/****************************************************************************
 *                          HAMMER2 THREAD API                              *
 ****************************************************************************/
/*
 * Initialize the supplied thread structure, starting the specified
 * thread.
 *
 * thr     - caller-supplied thread structure to initialize
 * pmp     - PFS this thread services
 * id      - short name prefix used to build the kernel thread name
 * clindex - cluster node index this thread is responsible for
 * repidx  - replication index appended to the thread name, or negative
 *           for a singleton thread
 * func    - thread entry point; receives thr as its argument
 */
void
hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
		   const char *id, int clindex, int repidx,
		   void (*func)(void *arg))
{
	lockinit(&thr->lk, "h2thr", 0, 0);
	thr->pmp = pmp;
	thr->clindex = clindex;
	thr->repidx = repidx;
	TAILQ_INIT(&thr->xopq);

	/* thread name includes the replication index when one is in use */
	if (repidx >= 0) {
		lwkt_create(func, thr, &thr->td, NULL, 0, -1,
			    "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
	} else {
		lwkt_create(func, thr, &thr->td, NULL, 0, -1,
			    "%s-%s", id, pmp->pfs_names[clindex]);
	}
}
98
/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	lockmgr(&thr->lk, LK_EXCLUSIVE);
	atomic_set_int(&thr->flags, HAMMER2_THREAD_STOP);
	wakeup(&thr->flags);
	while (thr->td) {
		/* exiting thread does wakeup(thr) after clearing td */
		lksleep(thr, &thr->lk, 0, "h2thr", hz);
	}
	lockmgr(&thr->lk, LK_RELEASE);
	thr->pmp = NULL;
	lockuninit(&thr->lk);
}
121
/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 *
 * Does not wait for the thread to acknowledge the request.  Silently
 * returns if the thread was never started.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	lockmgr(&thr->lk, LK_EXCLUSIVE);
	atomic_set_int(&thr->flags, HAMMER2_THREAD_REMASTER);
	wakeup(&thr->flags);
	lockmgr(&thr->lk, LK_RELEASE);
}
137
/*
 * Asynchronous freeze request: set the FREEZE flag and wake the thread
 * without waiting for it to enter the FROZEN state (compare the
 * synchronous hammer2_thr_freeze() below).  Note that unlike the
 * synchronous variant this does not take thr->lk or check thr->td.
 */
void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
	atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
	wakeup(&thr->flags);
}
144
/*
 * Synchronous freeze: request a freeze and block until the thread
 * acknowledges by setting the FROZEN flag.  Silently returns if the
 * thread was never started.
 */
void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	lockmgr(&thr->lk, LK_EXCLUSIVE);
	atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
	wakeup(&thr->flags);
	while ((thr->flags & HAMMER2_THREAD_FROZEN) == 0) {
		/* FROZEN is set by the target thread's main loop */
		lksleep(thr, &thr->lk, 0, "h2frz", hz);
	}
	lockmgr(&thr->lk, LK_RELEASE);
}
158
/*
 * Unfreeze a thread previously frozen via hammer2_thr_freeze*().
 * Clears the FROZEN flag and wakes the thread; does not wait for the
 * thread to resume.  Silently returns if the thread was never started.
 */
void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	lockmgr(&thr->lk, LK_EXCLUSIVE);
	atomic_clear_int(&thr->flags, HAMMER2_THREAD_FROZEN);
	wakeup(&thr->flags);
	lockmgr(&thr->lk, LK_RELEASE);
}
169
170 static
171 int
172 hammer2_thr_break(hammer2_thread_t *thr)
173 {
174         if (thr->flags & (HAMMER2_THREAD_STOP |
175                           HAMMER2_THREAD_REMASTER |
176                           HAMMER2_THREAD_FREEZE)) {
177                 return 1;
178         }
179         return 0;
180 }
181
182 /****************************************************************************
183  *                          HAMMER2 SYNC THREADS                            *
184  ****************************************************************************/
185 /*
186  * Primary management thread for an element of a node.  A thread will exist
187  * for each element requiring management.
188  *
189  * No management threads are needed for the SPMP or for any PMP with only
190  * a single MASTER.
191  *
192  * On the SPMP - handles bulkfree and dedup operations
193  * On a PFS    - handles remastering and synchronization
194  */
195 void
196 hammer2_primary_sync_thread(void *arg)
197 {
198         hammer2_thread_t *thr = arg;
199         hammer2_pfs_t *pmp;
200         hammer2_deferred_list_t list;
201         hammer2_deferred_ip_t *defer;
202         int error;
203
204         pmp = thr->pmp;
205         bzero(&list, sizeof(list));
206
207         lockmgr(&thr->lk, LK_EXCLUSIVE);
208         while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
209                 /*
210                  * Handle freeze request
211                  */
212                 if (thr->flags & HAMMER2_THREAD_FREEZE) {
213                         atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
214                         atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
215                 }
216
217                 /*
218                  * Force idle if frozen until unfrozen or stopped.
219                  */
220                 if (thr->flags & HAMMER2_THREAD_FROZEN) {
221                         lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
222                         continue;
223                 }
224
225                 /*
226                  * Reset state on REMASTER request
227                  */
228                 if (thr->flags & HAMMER2_THREAD_REMASTER) {
229                         atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
230                         /* reset state */
231                 }
232
233                 /*
234                  * Synchronization scan.
235                  */
236                 kprintf("sync_slaves clindex %d\n", thr->clindex);
237                 hammer2_trans_init(pmp, 0);
238
239                 hammer2_inode_ref(pmp->iroot);
240                 for (;;) {
241                         int didbreak = 0;
242                         error = hammer2_sync_slaves(thr, pmp->iroot, &list);
243                         if (error != EAGAIN)
244                                 break;
245                         while ((defer = list.base) != NULL) {
246                                 hammer2_inode_t *nip;
247
248                                 nip = defer->ip;
249                                 error = hammer2_sync_slaves(thr, nip, &list);
250                                 if (error && error != EAGAIN)
251                                         break;
252                                 if (hammer2_thr_break(thr)) {
253                                         didbreak = 1;
254                                         break;
255                                 }
256
257                                 /*
258                                  * If no additional defers occurred we can
259                                  * remove this one, otherwrise keep it on
260                                  * the list and retry once the additional
261                                  * defers have completed.
262                                  */
263                                 if (defer == list.base) {
264                                         list.base = defer->next;
265                                         kfree(defer, M_HAMMER2);
266                                         defer = NULL;   /* safety */
267                                         hammer2_inode_drop(nip);
268                                 }
269                         }
270
271                         /*
272                          * If the thread is being remastered, frozen, or
273                          * stopped, clean up any left-over deferals.
274                          */
275                         if (didbreak) {
276                                 kprintf("didbreak\n");
277                                 while ((defer = list.base) != NULL) {
278                                         hammer2_inode_drop(defer->ip);
279                                         list.base = defer->next;
280                                         kfree(defer, M_HAMMER2);
281                                 }
282                                 error = EINPROGRESS;
283                                 break;
284                         }
285                 }
286
287                 hammer2_inode_drop(pmp->iroot);
288                 hammer2_trans_done(pmp);
289
290                 if (error)
291                         kprintf("hammer2_sync_slaves: error %d\n", error);
292
293                 /*
294                  * Wait for event, or 5-second poll.
295                  */
296                 lksleep(&thr->flags, &thr->lk, 0, "h2idle", hz * 5);
297         }
298         thr->td = NULL;
299         wakeup(thr);
300         lockmgr(&thr->lk, LK_RELEASE);
301         /* thr structure can go invalid after this point */
302 }
303
#if 0
/*
 * Given a locked cluster created from pmp->iroot, update the PFS's
 * reporting status.
 *
 * NOTE(review): currently compiled out (#if 0), along with its forward
 * declaration above; retained for reference.
 */
static
void
hammer2_update_pfs_status(hammer2_thread_t *thr, uint32_t flags)
{
	hammer2_pfs_t *pmp = thr->pmp;

	/* only report when the cluster status flags actually change */
	flags &= HAMMER2_CLUSTER_ZFLAGS;
	if (pmp->cluster_flags == flags)
		return;
	pmp->cluster_flags = flags;

	kprintf("pfs %p", pmp);
	if (flags & HAMMER2_CLUSTER_MSYNCED)
		kprintf(" masters-all-good");
	if (flags & HAMMER2_CLUSTER_SSYNCED)
		kprintf(" slaves-all-good");

	if (flags & HAMMER2_CLUSTER_WRHARD)
		kprintf(" quorum/rw");
	else if (flags & HAMMER2_CLUSTER_RDHARD)
		kprintf(" quorum/ro");

	if (flags & HAMMER2_CLUSTER_UNHARD)
		kprintf(" out-of-sync-masters");
	else if (flags & HAMMER2_CLUSTER_NOHARD)
		kprintf(" no-masters-visible");

	if (flags & HAMMER2_CLUSTER_WRSOFT)
		kprintf(" soft/rw");
	else if (flags & HAMMER2_CLUSTER_RDSOFT)
		kprintf(" soft/ro");

	if (flags & HAMMER2_CLUSTER_UNSOFT)
		kprintf(" out-of-sync-slaves");
	else if (flags & HAMMER2_CLUSTER_NOSOFT)
		kprintf(" no-slaves-visible");
	kprintf("\n");
}
#endif
348
#if 0
/*
 * Debug helper: print the chains of cparent and cluster side-by-side,
 * one line per cluster index, flagging invalid cluster items with "(I)".
 * Emits output only when (hammer2_debug & 1) is set.
 *
 * NOTE(review): currently compiled out (#if 0).
 */
static
void
dumpcluster(const char *label,
	    hammer2_cluster_t *cparent, hammer2_cluster_t *cluster)
{
	hammer2_chain_t *chain;
	int i;

	if ((hammer2_debug & 1) == 0)
		return;

	kprintf("%s\t", label);
	KKASSERT(cparent->nchains == cluster->nchains);
	for (i = 0; i < cparent->nchains; ++i) {
		if (i)
			kprintf("\t");
		kprintf("%d ", i);
		if ((chain = cparent->array[i].chain) != NULL) {
			kprintf("%016jx%s ",
				chain->bref.key,
				((cparent->array[i].flags &
				  HAMMER2_CITEM_INVALID) ? "(I)" : "   ")
			);
		} else {
			kprintf("      NULL      %s ", "   ");
		}
		if ((chain = cluster->array[i].chain) != NULL) {
			kprintf("%016jx%s ",
				chain->bref.key,
				((cluster->array[i].flags &
				  HAMMER2_CITEM_INVALID) ? "(I)" : "   ")
			);
		} else {
			kprintf("      NULL      %s ", "   ");
		}
		kprintf("\n");
	}
}
#endif
389
/*
 * Each out of sync node sync-thread must issue an all-nodes XOP scan of
 * the inode.  This creates a multiplication effect since the XOP scan itself
 * issues to all nodes.  However, this is the only way we can safely
 * synchronize nodes which might have disparate I/O bandwidths and the only
 * way we can safely deal with stalled nodes.
 *
 * Merge-iterates our local chain (cluster index thr->clindex) against the
 * authoritative XOP scan of the other nodes, inserting, replacing, or
 * destroying local chains as needed.  Inode-type differences are not
 * recursed into directly; they are pushed onto *list as deferrals for the
 * caller (hammer2_primary_sync_thread) to process bottom-up.
 *
 * Returns 0 on success, EAGAIN if deferrals were queued, or another errno
 * on failure.
 */
static
int
hammer2_sync_slaves(hammer2_thread_t *thr, hammer2_inode_t *ip,
		    hammer2_deferred_list_t *list)
{
	hammer2_xop_scanall_t *xop;
	hammer2_chain_t *parent;
	hammer2_chain_t *chain;
	hammer2_pfs_t *pmp;
	hammer2_key_t key_next;
	int cache_index = -1;
	int needrescan;		/* set when a deferral is queued */
	int didwork;		/* set when any modification was made */
	int error;
	int nerror;		/* NOTE(review): assigned by the sync helpers
				 * below but never folded into error or
				 * otherwise checked — confirm intended */
	int idx;
	int n;

	pmp = ip->pmp;
	idx = thr->clindex;	/* cluster node we are responsible for */
	needrescan = 0;
	didwork = 0;

#if 0
	/*
	 * Nothing to do if all slaves are synchronized.
	 * Nothing to do if cluster not authoritatively readable.
	 */
	if (pmp->cluster_flags & HAMMER2_CLUSTER_SSYNCED)
		return(0);
	if ((pmp->cluster_flags & HAMMER2_CLUSTER_RDHARD) == 0)
		return(HAMMER2_ERROR_INCOMPLETE);
#endif

	error = 0;

	/*
	 * The inode is left unlocked during the scan.  Issue a XOP
	 * that does *not* include our cluster index to iterate
	 * properly synchronized elements and resolve our cluster index
	 * against it.
	 */
	hammer2_inode_lock(ip, HAMMER2_RESOLVE_SHARED);
	xop = &hammer2_xop_alloc(ip)->xop_scanall;
	xop->key_beg = HAMMER2_KEY_MIN;
	xop->key_end = HAMMER2_KEY_MAX;
	hammer2_xop_start_except(&xop->head, hammer2_xop_scanall, idx);
	parent = hammer2_inode_chain(ip, idx,
				     HAMMER2_RESOLVE_ALWAYS |
				     HAMMER2_RESOLVE_SHARED);

	hammer2_inode_unlock(ip);

	/* prime both iterators: our local chain and the XOP collection */
	chain = hammer2_chain_lookup(&parent, &key_next,
				     HAMMER2_KEY_MIN, HAMMER2_KEY_MAX,
				     &cache_index,
				     HAMMER2_LOOKUP_SHARED |
				     HAMMER2_LOOKUP_NODIRECT |
				     HAMMER2_LOOKUP_NODATA);
	error = hammer2_xop_collect(&xop->head, 0);

	for (;;) {
		/*
		 * We are done if our scan is done and the XOP scan is done.
		 * We are done if the XOP scan failed (that is, we don't
		 * have authoritative data to synchronize with).
		 */
		int advance_local = 0;
		int advance_xop = 0;
		int dodefer = 0;
		hammer2_chain_t *focus;

		if (chain == NULL && error == ENOENT)
			break;
		if (error && error != ENOENT)
			break;

		/*
		 * Compare: n < 0 means local key sorts first (extra local
		 * data), n > 0 means the XOP focus sorts first (missing
		 * local data), n == 0 means matching keys.
		 */
		if (chain && error == ENOENT) {
			/*
			 * If we have local chains but the XOP scan is done,
			 * the chains need to be deleted.
			 */
			n = -1;
			focus = NULL;
		} else if (chain == NULL) {
			/*
			 * If our local scan is done but the XOP scan is not,
			 * we need to create the missing chain(s).
			 */
			n = 1;
			focus = xop->head.cluster.focus;
		} else {
			/*
			 * Otherwise compare to determine the action
			 * needed.
			 */
			focus = xop->head.cluster.focus;
			n = hammer2_chain_cmp(chain, focus);
		}

		/*
		 * Take action based on comparison results.
		 */
		if (n < 0) {
			/*
			 * Delete extraneous local data.  This will
			 * automatically advance the chain.
			 */
			nerror = hammer2_sync_destroy(thr, &parent, &chain,
						      idx);
			didwork = 1;
		} else if (n == 0 && chain->bref.modify_tid !=
				     focus->bref.modify_tid) {
			/*
			 * Matching key but local data or meta-data requires
			 * updating.  If we will recurse, we still need to
			 * update to compatible content first but we do not
			 * synchronize modify_tid until the entire recursion
			 * has completed successfully.
			 */
			if (focus->bref.type == HAMMER2_BREF_TYPE_INODE) {
				/* modify_tid 0: defer final sync to caller */
				nerror = hammer2_sync_replace(
						thr, parent, chain,
						0,
						idx, focus);
				dodefer = 1;
			} else {
				nerror = hammer2_sync_replace(
						thr, parent, chain,
						focus->bref.modify_tid,
						idx, focus);
			}
			didwork = 1;
		} else if (n == 0) {
			/*
			 * 100% match, advance both
			 */
			advance_local = 1;
			advance_xop = 1;
			nerror = 0;
		} else if (n > 0) {
			/*
			 * Insert missing local data.
			 *
			 * If we will recurse, we still need to update to
			 * compatible content first but we do not synchronize
			 * modify_tid until the entire recursion has
			 * completed successfully.
			 */
			if (focus->bref.type == HAMMER2_BREF_TYPE_INODE) {
				nerror = hammer2_sync_insert(
						thr, &parent, &chain,
						0,
						idx, focus);
				dodefer = 1;
			} else {
				nerror = hammer2_sync_insert(
						thr, &parent, &chain,
						focus->bref.modify_tid,
						idx, focus);
			}
			advance_local = 1;
			advance_xop = 1;
			didwork = 1;
		}

		/*
		 * We cannot recurse depth-first because the XOP is still
		 * running in node threads for this scan.  Create a placemarker
		 * by obtaining and record the hammer2_inode.
		 *
		 * We excluded our node from the XOP so we must temporarily
		 * add it to xop->head.cluster so it is properly incorporated
		 * into the inode.
		 *
		 * The deferral is pushed onto a LIFO list for bottom-up
		 * synchronization.
		 */
		if (error == 0 && dodefer) {
			hammer2_inode_t *nip;
			hammer2_deferred_ip_t *defer;

			KKASSERT(focus->bref.type == HAMMER2_BREF_TYPE_INODE);

			defer = kmalloc(sizeof(*defer), M_HAMMER2,
					M_WAITOK | M_ZERO);
			KKASSERT(xop->head.cluster.array[idx].chain == NULL);
			/* temporarily splice our chain into the cluster */
			xop->head.cluster.array[idx].flags =
							HAMMER2_CITEM_INVALID;
			xop->head.cluster.array[idx].chain = chain;
			nip = hammer2_inode_get(pmp, ip, &xop->head.cluster);
			xop->head.cluster.array[idx].chain = NULL;

			kprintf("DEFER INODE %p->%p\n", ip, nip);
			hammer2_inode_ref(nip);	/* ref held by the deferral */
			hammer2_inode_unlock(nip);

			defer->next = list->base;
			defer->ip = nip;
			list->base = defer;
			++list->count;
			needrescan = 1;
		}

		/*
		 * If at least one deferral was added and the deferral
		 * list has grown too large, stop adding more.  This
		 * will trigger an EAGAIN return.
		 */
		if (needrescan && list->count > 1000)
			break;

		/*
		 * Advancements for iteration.
		 */
		if (advance_xop) {
			error = hammer2_xop_collect(&xop->head, 0);
		}
		if (advance_local) {
			chain = hammer2_chain_next(&parent, chain, &key_next,
						   key_next, HAMMER2_KEY_MAX,
						   &cache_index,
						   HAMMER2_LOOKUP_SHARED |
						   HAMMER2_LOOKUP_NODIRECT |
						   HAMMER2_LOOKUP_NODATA);
		}
	}
	hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
	if (chain) {
		hammer2_chain_unlock(chain);
		hammer2_chain_drop(chain);
	}
	if (parent) {
		hammer2_chain_unlock(parent);
		hammer2_chain_drop(parent);
	}

	/*
	 * If we added deferrals we want the caller to synchronize them
	 * and then call us again.
	 *
	 * NOTE: In this situation we do not yet want to synchronize our
	 *       inode, setting the error code also has that effect.
	 */
	if (error == 0 && needrescan)
		error = EAGAIN;

	/*
	 * If no error occurred and work was performed, synchronize the
	 * inode meta-data itself.
	 *
	 * XXX inode lock was lost
	 */
	if (error == 0 && didwork) {
		hammer2_xop_ipcluster_t *xop2;
		hammer2_chain_t *focus;

		xop2 = &hammer2_xop_alloc(ip)->xop_ipcluster;
		hammer2_xop_start_except(&xop2->head, hammer2_xop_ipcluster,
					 idx);
		error = hammer2_xop_collect(&xop2->head, 0);
		if (error == 0) {
			focus = xop2->head.cluster.focus;
			kprintf("syncthr: update inode\n");
			chain = hammer2_inode_chain_and_parent(ip, idx,
						    &parent,
						    HAMMER2_RESOLVE_ALWAYS |
						    HAMMER2_RESOLVE_SHARED);

			KKASSERT(parent != NULL);
			nerror = hammer2_sync_replace(
					thr, parent, chain,
					focus->bref.modify_tid,
					idx, xop2->head.cluster.focus);
			hammer2_chain_unlock(chain);
			hammer2_chain_drop(chain);
			hammer2_chain_unlock(parent);
			hammer2_chain_drop(parent);
			/* XXX */
		}
		hammer2_xop_retire(&xop2->head, HAMMER2_XOPMASK_VOP);
	}

	return error;
}
685
/*
 * Create a missing chain by copying the focus from another device.
 *
 * On entry *parentp and focus are both locked shared.  The chain will be
 * created and returned in *chainp also locked shared.
 *
 * NOTE(review): *chainp is never actually assigned in this function even
 * though the comment above says the chain is returned in it — the new
 * chain is only held in the local variable.  Confirm against callers.
 *
 * NOTE(review): focus is unlocked before returning; the non-DIRECTDATA
 * inode path breaks out before hammer2_chain_setcheck() is reached —
 * confirm the check code is recalculated elsewhere for that case.
 *
 * Always returns 0.
 */
static
int
hammer2_sync_insert(hammer2_thread_t *thr,
		    hammer2_chain_t **parentp, hammer2_chain_t **chainp,
		    hammer2_tid_t modify_tid, int idx,
		    hammer2_chain_t *focus)
{
	hammer2_chain_t *chain;

#if HAMMER2_THREAD_DEBUG
	if (hammer2_debug & 1)
	kprintf("insert rec par=%p/%d.%016jx slave %d %d.%016jx mod=%016jx\n",
		*parentp,
		(*parentp)->bref.type,
		(*parentp)->bref.key,
		idx,
		focus->bref.type, focus->bref.key, modify_tid);
#endif

	/* upgrade parent from shared to a fully-resolved exclusive lock */
	hammer2_chain_unlock(*parentp);
	hammer2_chain_lock(*parentp, HAMMER2_RESOLVE_ALWAYS);
	/* reissue lookup? */

	/*
	 * Create the missing chain.
	 *
	 * Have to be careful to avoid deadlocks.
	 */
	chain = NULL;
	hammer2_chain_create(parentp, &chain, thr->pmp,
			     focus->bref.key, focus->bref.keybits,
			     focus->bref.type, focus->bytes,
			     0);
	hammer2_chain_modify(chain, HAMMER2_MODIFY_KEEPMODIFY);

	/*
	 * Copy focus to new chain
	 */

	/* type already set */
	chain->bref.methods = focus->bref.methods;
	/* keybits already set */
	chain->bref.vradix = focus->bref.vradix;
	/* mirror_tid set by flush */
	chain->bref.modify_tid = modify_tid;
	chain->bref.flags = focus->bref.flags;
	/* key already present */
	/* check code will be recalculated */

	/*
	 * Copy data body.
	 */
	switch(chain->bref.type) {
	case HAMMER2_BREF_TYPE_INODE:
		if ((focus->data->ipdata.meta.op_flags &
		     HAMMER2_OPFLAG_DIRECTDATA) == 0) {
			/* indirect inode: copy meta-data only, not the u area */
			bcopy(focus->data, chain->data,
			      offsetof(hammer2_inode_data_t, u));
			break;
		}
		/* fall through */
	case HAMMER2_BREF_TYPE_DATA:
		bcopy(focus->data, chain->data, chain->bytes);
		hammer2_chain_setcheck(chain, chain->data);
		break;
	default:
		KKASSERT(0);
		break;
	}

	hammer2_chain_unlock(focus);
	hammer2_chain_unlock(chain);		/* unlock, leave ref */

	/*
	 * Avoid ordering deadlock when relocking cparent.
	 */
	hammer2_chain_unlock(*parentp);
	hammer2_chain_lock(*parentp, HAMMER2_RESOLVE_SHARED |
				     HAMMER2_RESOLVE_ALWAYS);
	hammer2_chain_lock(chain, HAMMER2_RESOLVE_SHARED);

	return 0;
}
775
776 /*
 * Destroy an extraneous chain.
778  *
779  * Both *parentp and *chainp are locked shared.
780  *
781  * On return, *chainp will be adjusted to point to the next element in the
782  * iteration and locked shared.
783  */
784 static
785 int
786 hammer2_sync_destroy(hammer2_thread_t *thr,
787                      hammer2_chain_t **parentp, hammer2_chain_t **chainp,
788                      int idx)
789 {
790         hammer2_chain_t *chain;
791         hammer2_chain_t *parent;
792         hammer2_key_t key_next;
793         hammer2_key_t save_key;
794         int cache_index = -1;
795
796         chain = *chainp;
797
798 #if HAMMER2_THREAD_DEBUG
799         if (hammer2_debug & 1)
800         kprintf("destroy rec %p/%p slave %d %d.%016jx\n",
801                 *parentp, chain,
802                 idx, chain->bref.type, chain->bref.key);
803 #endif
804
805         save_key = chain->bref.key;
806         if (save_key != HAMMER2_KEY_MAX)
807                 ++save_key;
808
809         /*
810          * Try to avoid unnecessary I/O.
811          *
812          * XXX accounting not propagated up properly.  We might have to do
813          *     a RESOLVE_MAYBE here and pass 0 for the flags.
814          */
815         hammer2_chain_unlock(chain);    /* relock exclusive */
816         hammer2_chain_unlock(*parentp);
817         hammer2_chain_lock(*parentp, HAMMER2_RESOLVE_ALWAYS);
818         hammer2_chain_lock(chain, HAMMER2_RESOLVE_NEVER);
819
820         hammer2_chain_delete(*parentp, chain, HAMMER2_DELETE_PERMANENT);
821         hammer2_chain_unlock(chain);
822         hammer2_chain_drop(chain);
823         chain = NULL;                   /* safety */
824
825         hammer2_chain_unlock(*parentp); /* relock shared */
826         hammer2_chain_lock(*parentp, HAMMER2_RESOLVE_SHARED |
827                                      HAMMER2_RESOLVE_ALWAYS);
828         *chainp = hammer2_chain_lookup(&parent, &key_next,
829                                      save_key, HAMMER2_KEY_MAX,
830                                      &cache_index,
831                                      HAMMER2_LOOKUP_SHARED |
832                                      HAMMER2_LOOKUP_NODIRECT |
833                                      HAMMER2_LOOKUP_NODATA);
834         return 0;
835 }
836
837 /*
838  * cparent is locked exclusively, with an extra ref, cluster is not locked.
839  * Replace element [i] in the cluster.
840  */
841 static
842 int
843 hammer2_sync_replace(hammer2_thread_t *thr,
844                      hammer2_chain_t *parent, hammer2_chain_t *chain,
845                      hammer2_tid_t modify_tid, int idx,
846                      hammer2_chain_t *focus)
847 {
848         int nradix;
849         uint8_t otype;
850
851 #if HAMMER2_THREAD_DEBUG
852         if (hammer2_debug & 1)
853         kprintf("replace rec %p slave %d %d.%016jx mod=%016jx\n",
854                 chain,
855                 idx,
856                 focus->bref.type, focus->bref.key, modify_tid);
857 #endif
858         hammer2_chain_unlock(chain);
859         hammer2_chain_lock(chain, HAMMER2_RESOLVE_ALWAYS);
860         if (chain->bytes != focus->bytes) {
861                 /* XXX what if compressed? */
862                 nradix = hammer2_getradix(chain->bytes);
863                 hammer2_chain_resize(NULL, parent, chain, nradix, 0);
864         }
865         hammer2_chain_modify(chain, HAMMER2_MODIFY_KEEPMODIFY);
866         otype = chain->bref.type;
867         chain->bref.type = focus->bref.type;
868         chain->bref.methods = focus->bref.methods;
869         chain->bref.keybits = focus->bref.keybits;
870         chain->bref.vradix = focus->bref.vradix;
871         /* mirror_tid updated by flush */
872         chain->bref.modify_tid = modify_tid;
873         chain->bref.flags = focus->bref.flags;
874         /* key already present */
875         /* check code will be recalculated */
876         chain->error = 0;
877
878         /*
879          * Copy data body.
880          */
881         switch(chain->bref.type) {
882         case HAMMER2_BREF_TYPE_INODE:
883                 if ((focus->data->ipdata.meta.op_flags &
884                      HAMMER2_OPFLAG_DIRECTDATA) == 0) {
885                         /*
886                          * If DIRECTDATA is transitioning to 0 or the old
887                          * chain is not an inode we have to initialize
888                          * the block table.
889                          */
890                         if (otype != HAMMER2_BREF_TYPE_INODE ||
891                             (chain->data->ipdata.meta.op_flags &
892                              HAMMER2_OPFLAG_DIRECTDATA)) {
893                                 kprintf("chain inode trans away from dd\n");
894                                 bzero(&chain->data->ipdata.u,
895                                       sizeof(chain->data->ipdata.u));
896                         }
897                         bcopy(focus->data, chain->data,
898                               offsetof(hammer2_inode_data_t, u));
899                         /* XXX setcheck on inode should not be needed */
900                         hammer2_chain_setcheck(chain, chain->data);
901                         break;
902                 }
903                 /* fall through */
904         case HAMMER2_BREF_TYPE_DATA:
905                 bcopy(focus->data, chain->data, chain->bytes);
906                 hammer2_chain_setcheck(chain, chain->data);
907                 break;
908         default:
909                 KKASSERT(0);
910                 break;
911         }
912
913         hammer2_chain_unlock(chain);
914         hammer2_chain_lock(chain, HAMMER2_RESOLVE_SHARED |
915                                   HAMMER2_RESOLVE_MAYBE);
916
917         return 0;
918 }
919
920 /****************************************************************************
921  *                          HAMMER2 XOPS THREADS                            *
922  ****************************************************************************/
923
924 void
925 hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
926 {
927         hammer2_mtx_init(&xgrp->mtx, "h2xopq");
928         hammer2_mtx_init(&xgrp->mtx2, "h2xopio");
929 }
930
931 /*
932  * Allocate a XOP request.
933  *
934  * Once allocated a XOP request can be started, collected, and retired,
935  * and can be retired early if desired.
936  *
937  * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
938  */
939 hammer2_xop_t *
940 hammer2_xop_alloc(hammer2_inode_t *ip)
941 {
942         hammer2_xop_t *xop;
943
944         xop = objcache_get(cache_xops, M_WAITOK);
945         KKASSERT(xop->head.cluster.array[0].chain == NULL);
946         xop->head.ip = ip;
947         xop->head.func = NULL;
948         xop->head.state = 0;
949         xop->head.error = 0;
950         xop->head.collect_key = 0;
951
952         xop->head.cluster.nchains = ip->cluster.nchains;
953         xop->head.cluster.pmp = ip->pmp;
954         xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;
955
956         /*
957          * run_mask - Active thread (or frontend) associated with XOP
958          */
959         xop->head.run_mask = HAMMER2_XOPMASK_VOP;
960
961         hammer2_inode_ref(ip);
962
963         return xop;
964 }
965
966 void
967 hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
968 {
969         xop->name = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
970         xop->name_len = name_len;
971         bcopy(name, xop->name, name_len);
972 }
973
974 void
975 hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
976 {
977         xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
978         xop->name2_len = name_len;
979         bcopy(name, xop->name2, name_len);
980 }
981
982
983 void
984 hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
985 {
986         xop->ip2 = ip2;
987         hammer2_inode_ref(ip2);
988 }
989
990 void
991 hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
992 {
993         xop->ip3 = ip3;
994         hammer2_inode_ref(ip3);
995 }
996
997 void
998 hammer2_xop_reinit(hammer2_xop_head_t *xop)
999 {
1000         xop->state = 0;
1001         xop->error = 0;
1002         xop->collect_key = 0;
1003         xop->run_mask = HAMMER2_XOPMASK_VOP;
1004 }
1005
1006 /*
1007  * A mounted PFS needs Xops threads to support frontend operations.
1008  */
1009 void
1010 hammer2_xop_helper_create(hammer2_pfs_t *pmp)
1011 {
1012         int i;
1013         int j;
1014
1015         for (i = 0; i < pmp->pfs_nmasters; ++i) {
1016                 for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
1017                         if (pmp->xop_groups[j].thrs[i].td)
1018                                 continue;
1019                         hammer2_thr_create(&pmp->xop_groups[j].thrs[i], pmp,
1020                                            "h2xop", i, j,
1021                                            hammer2_primary_xops_thread);
1022                 }
1023         }
1024 }
1025
1026 void
1027 hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
1028 {
1029         int i;
1030         int j;
1031
1032         for (i = 0; i < pmp->pfs_nmasters; ++i) {
1033                 for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
1034                         if (pmp->xop_groups[j].thrs[i].td)
1035                                 hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
1036                 }
1037         }
1038 }
1039
1040
1041
1042
1043 /*
1044  * Start a XOP request, queueing it to all nodes in the cluster to
1045  * execute the cluster op.
1046  *
1047  * XXX optimize single-target case.
1048  */
1049 void
1050 hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_func_t func,
1051                          int notidx)
1052 {
1053         hammer2_xop_group_t *xgrp;
1054         hammer2_thread_t *thr;
1055         hammer2_pfs_t *pmp;
1056         int g;
1057         int i;
1058
1059         pmp = xop->ip->pmp;
1060
1061         g = pmp->xop_iterator++;
1062         g = g & HAMMER2_XOPGROUPS_MASK;
1063         xgrp = &pmp->xop_groups[g];
1064         xop->func = func;
1065         xop->xgrp = xgrp;
1066
1067         for (i = 0; i < xop->ip->cluster.nchains; ++i) {
1068                 thr = &xgrp->thrs[i];
1069                 if (thr->td && i != notidx) {
1070                         lockmgr(&thr->lk, LK_EXCLUSIVE);
1071                         if (thr->td &&
1072                             (thr->flags & HAMMER2_THREAD_STOP) == 0) {
1073                                 atomic_set_int(&xop->run_mask, 1U << i);
1074                                 TAILQ_INSERT_TAIL(&thr->xopq, xop,
1075                                                   collect[i].entry);
1076                         }
1077                         lockmgr(&thr->lk, LK_RELEASE);
1078                         wakeup(&thr->flags);
1079                 }
1080         }
1081 }
1082
/*
 * Convenience wrapper: queue the XOP to every node in the cluster
 * (no exclusions).
 */
void
hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_func_t func)
{
        hammer2_xop_start_except(xop, func, -1);
}
1088
1089 /*
1090  * Retire a XOP.  Used by both the VOP frontend and by the XOP backend.
1091  */
1092 void
1093 hammer2_xop_retire(hammer2_xop_head_t *xop, uint32_t mask)
1094 {
1095         hammer2_xop_group_t *xgrp;
1096         hammer2_chain_t *chain;
1097         int i;
1098
1099         xgrp = xop->xgrp;
1100
1101         /*
1102          * Remove the frontend or remove a backend feeder.  When removing
1103          * the frontend we must wakeup any backend feeders who are waiting
1104          * for FIFO space.
1105          *
1106          * XXX optimize wakeup.
1107          */
1108         KKASSERT(xop->run_mask & mask);
1109         if (atomic_fetchadd_int(&xop->run_mask, -mask) != mask) {
1110                 if (mask == HAMMER2_XOPMASK_VOP)
1111                         wakeup(xop);
1112                 return;
1113         }
1114
1115         /*
1116          * Cleanup the collection cluster.
1117          */
1118         for (i = 0; i < xop->cluster.nchains; ++i) {
1119                 xop->cluster.array[i].flags = 0;
1120                 chain = xop->cluster.array[i].chain;
1121                 if (chain) {
1122                         xop->cluster.array[i].chain = NULL;
1123                         hammer2_chain_unlock(chain);
1124                         hammer2_chain_drop(chain);
1125                 }
1126         }
1127
1128         /*
1129          * Cleanup the fifos, use check_counter to optimize the loop.
1130          */
1131         mask = xop->chk_mask;
1132         for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
1133                 hammer2_xop_fifo_t *fifo = &xop->collect[i];
1134                 while (fifo->ri != fifo->wi) {
1135                         chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
1136                         if (chain) {
1137                                 hammer2_chain_unlock(chain);
1138                                 hammer2_chain_drop(chain);
1139                         }
1140                         ++fifo->ri;
1141                         if (fifo->wi - fifo->ri < HAMMER2_XOPFIFO / 2)
1142                                 wakeup(xop);    /* XXX optimize */
1143                 }
1144                 mask &= ~(1U << i);
1145         }
1146
1147         /*
1148          * The inode is only held at this point, simply drop it.
1149          */
1150         if (xop->ip) {
1151                 hammer2_inode_drop(xop->ip);
1152                 xop->ip = NULL;
1153         }
1154         if (xop->ip2) {
1155                 hammer2_inode_drop(xop->ip2);
1156                 xop->ip2 = NULL;
1157         }
1158         if (xop->ip3) {
1159                 hammer2_inode_drop(xop->ip3);
1160                 xop->ip3 = NULL;
1161         }
1162         if (xop->name) {
1163                 kfree(xop->name, M_HAMMER2);
1164                 xop->name = NULL;
1165                 xop->name_len = 0;
1166         }
1167         if (xop->name2) {
1168                 kfree(xop->name2, M_HAMMER2);
1169                 xop->name2 = NULL;
1170                 xop->name2_len = 0;
1171         }
1172
1173         objcache_put(cache_xops, xop);
1174 }
1175
1176 /*
1177  * (Backend) Returns non-zero if the frontend is still attached.
1178  */
1179 int
1180 hammer2_xop_active(hammer2_xop_head_t *xop)
1181 {
1182         if (xop->run_mask & HAMMER2_XOPMASK_VOP)
1183                 return 1;
1184         else
1185                 return 0;
1186 }
1187
1188 /*
1189  * (Backend) Feed chain data through the cluster validator and back to
1190  * the frontend.  Chains are fed from multiple nodes concurrently
1191  * and pipelined via per-node FIFOs in the XOP.
1192  *
1193  * No xop lock is needed because we are only manipulating fields under
1194  * our direct control.
1195  *
1196  * Returns 0 on success and a hammer error code if sync is permanently
1197  * lost.  The caller retains a ref on the chain but by convention
1198  * the lock is typically inherited by the xop (caller loses lock).
1199  *
1200  * Returns non-zero on error.  In this situation the caller retains a
1201  * ref on the chain but loses the lock (we unlock here).
1202  *
1203  * WARNING!  The chain is moving between two different threads, it must
1204  *           be locked SHARED to retain its data mapping, not exclusive.
1205  *           When multiple operations are in progress at once, chains fed
1206  *           back to the frontend for collection can wind up being locked
1207  *           in different orders, only a shared lock can prevent a deadlock.
1208  *
1209  *           Exclusive locks may only be used by a XOP backend node thread
1210  *           temporarily, with no direct or indirect dependencies (aka
1211  *           blocking/waiting) on other nodes.
1212  */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
                 int clindex, int error)
{
        hammer2_xop_fifo_t *fifo;

        /*
         * Multi-threaded entry into the XOP collector.  We own the
         * fifo->wi for our clindex.
         */
        fifo = &xop->collect[clindex];

        /*
         * Wait for FIFO space.  If the frontend detaches while we wait,
         * abort with EINTR instead of blocking forever.
         */
        while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                tsleep_interlock(xop, 0);
                if (hammer2_xop_active(xop) == 0) {
                        error = EINTR;
                        goto done;
                }
                if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                        tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
                }
        }
        /* The fifo takes its own ref on the chain (caller keeps its ref). */
        if (chain)
                hammer2_chain_ref(chain);
        fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
        fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
        /* publish the array/errors entries before advancing wi */
        cpu_sfence();
        ++fifo->wi;
        atomic_set_int(&xop->chk_mask, 1U << clindex);
        atomic_add_int(&xop->check_counter, 1);
        wakeup(&xop->check_counter);    /* XXX optimize */
        error = 0;

        /*
         * Cleanup.  If an error occurred we eat the lock.  If no error
         * occurred the fifo inherits the lock and gains an additional ref.
         *
         * The caller's ref remains in both cases.
         */
done:
        if (error && chain)
                hammer2_chain_unlock(chain);
        return error;
}
1257
1258 /*
1259  * (Frontend) collect a response from a running cluster op.
1260  *
1261  * Responses are fed from all appropriate nodes concurrently
1262  * and collected into a cohesive response >= collect_key.
1263  *
1264  * The collector will return the instant quorum or other requirements
1265  * are met, even if some nodes get behind or become non-responsive.
1266  *
1267  * HAMMER2_XOP_COLLECT_NOWAIT   - Used to 'poll' a completed collection,
1268  *                                usually called synchronously from the
1269  *                                node XOPs for the strategy code to
1270  *                                fake the frontend collection and complete
1271  *                                the BIO as soon as possible.
1272  *
 * HAMMER2_XOP_SYNCHRONIZER     - Request synchronization with a particular
1274  *                                cluster index, prevents looping when that
1275  *                                index is out of sync so caller can act on
1276  *                                the out of sync element.  ESRCH and EDEADLK
1277  *                                can be returned if this flag is specified.
1278  *
1279  * Returns 0 on success plus a filled out xop->cluster structure.
1280  * Return ENOENT on normal termination.
1281  * Otherwise return an error.
1282  */
int
hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
{
        hammer2_xop_fifo_t *fifo;
        hammer2_chain_t *chain;
        hammer2_key_t lokey;
        int error;
        int keynull;
        int adv;                /* advance the element */
        int i;
        uint32_t check_counter;

loop:
        /*
         * First loop tries to advance pieces of the cluster which
         * are out of sync.
         */
        lokey = HAMMER2_KEY_MAX;
        keynull = HAMMER2_CHECK_NULL;
        /* snapshot; a feeder bumping this re-triggers the loop below */
        check_counter = xop->check_counter;
        cpu_lfence();

        for (i = 0; i < xop->cluster.nchains; ++i) {
                chain = xop->cluster.array[i].chain;
                if (chain == NULL) {
                        adv = 1;
                } else if (chain->bref.key < xop->collect_key) {
                        /* behind the collection point, must advance */
                        adv = 1;
                } else {
                        /* element is in range; track the lowest key seen */
                        keynull &= ~HAMMER2_CHECK_NULL;
                        if (lokey > chain->bref.key)
                                lokey = chain->bref.key;
                        adv = 0;
                }
                if (adv == 0)
                        continue;

                /*
                 * Advance element if possible, advanced element may be NULL.
                 */
                if (chain) {
                        hammer2_chain_unlock(chain);
                        hammer2_chain_drop(chain);
                }
                fifo = &xop->collect[i];
                if (fifo->ri != fifo->wi) {
                        /* lfence pairs with the feeder's sfence on wi */
                        cpu_lfence();
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        ++fifo->ri;
                        xop->cluster.array[i].chain = chain;
                        if (chain == NULL) {
                                xop->cluster.array[i].flags |=
                                                        HAMMER2_CITEM_NULL;
                        }
                        if (fifo->wi - fifo->ri < HAMMER2_XOPFIFO / 2)
                                wakeup(xop);    /* XXX optimize */
                        --i;            /* loop on same index */
                } else {
                        /*
                         * Retain CITEM_NULL flag.  If set just repeat EOF.
                         * If not, the NULL,0 combination indicates an
                         * operation in-progress.
                         */
                        xop->cluster.array[i].chain = NULL;
                        /* retain any CITEM_NULL setting */
                }
        }

        /*
         * Determine whether the lowest collected key meets clustering
         * requirements.  Returns:
         *
         * 0             - key valid, cluster can be returned.
         *
         * ENOENT        - normal end of scan, return ENOENT.
         *
         * ESRCH         - sufficient elements collected, quorum agreement
         *                 that lokey is not a valid element and should be
         *                 skipped.
         *
         * EDEADLK       - sufficient elements collected, no quorum agreement
         *                 (and no agreement possible).  In this situation a
         *                 repair is needed, for now we loop.
         *
         * EINPROGRESS   - insufficient elements collected to resolve, wait
         *                 for event and loop.
         */
        error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
        if (error == EINPROGRESS) {
                if (xop->check_counter == check_counter) {
                        /* nothing new fed since the snapshot; block */
                        if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
                                goto done;
                        tsleep_interlock(&xop->check_counter, 0);
                        cpu_lfence();
                        if (xop->check_counter == check_counter) {
                                tsleep(&xop->check_counter, PINTERLOCKED,
                                        "h2coll", hz*60);
                        }
                }
                goto loop;
        }
        if (error == ESRCH) {
                if (lokey != HAMMER2_KEY_MAX) {
                        /* skip the bad key and retry the collection */
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = ENOENT;
        }
        if (error == EDEADLK) {
                kprintf("hammer2: no quorum possible lokey %016jx\n",
                        lokey);
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = ENOENT;
        }
        /* advance the collection point past the key being returned */
        if (lokey == HAMMER2_KEY_MAX)
                xop->collect_key = lokey;
        else
                xop->collect_key = lokey + 1;
done:
        return error;
}
1407
1408 /*
1409  * Primary management thread for xops support.  Each node has several such
1410  * threads which replicate front-end operations on cluster nodes.
1411  *
1412  * XOPS thread node operations, allowing the function to focus on a single
1413  * node in the cluster after validating the operation with the cluster.
1414  * This is primarily what prevents dead or stalled nodes from stalling
1415  * the front-end.
1416  */
void
hammer2_primary_xops_thread(void *arg)
{
        hammer2_thread_t *thr = arg;
        hammer2_pfs_t *pmp;
        hammer2_xop_head_t *xop;
        hammer2_xop_group_t *xgrp;      /* NOTE(review): currently unused */
        uint32_t mask;

        pmp = thr->pmp;
        xgrp = &pmp->xop_groups[thr->repidx];
        mask = 1U << thr->clindex;      /* our bit in each xop's run_mask */

        lockmgr(&thr->lk, LK_EXCLUSIVE);
        while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
                /*
                 * Handle freeze request
                 */
                if (thr->flags & HAMMER2_THREAD_FREEZE) {
                        atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
                        atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
                }

                /*
                 * Force idle if frozen until unfrozen or stopped.
                 */
                if (thr->flags & HAMMER2_THREAD_FROZEN) {
                        lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
                        continue;
                }

                /*
                 * Reset state on REMASTER request
                 */
                if (thr->flags & HAMMER2_THREAD_REMASTER) {
                        atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
                        /* reset state */
                }

                /*
                 * Process requests.  Each request can be multi-queued.
                 *
                 * If we get behind and the frontend VOP is no longer active,
                 * we retire the request without processing it.  The callback
                 * may also abort processing if the frontend VOP becomes
                 * inactive.
                 */
                while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
                        TAILQ_REMOVE(&thr->xopq, xop,
                                     collect[thr->clindex].entry);
                        if (hammer2_xop_active(xop)) {
                                /* drop thr->lk while the callback runs */
                                lockmgr(&thr->lk, LK_RELEASE);
                                xop->func((hammer2_xop_t *)xop, thr->clindex);
                                hammer2_xop_retire(xop, mask);
                                lockmgr(&thr->lk, LK_EXCLUSIVE);
                        } else {
                                /* frontend detached; feed an abort code */
                                hammer2_xop_feed(xop, NULL, thr->clindex,
                                                 ECONNABORTED);
                                hammer2_xop_retire(xop, mask);
                        }
                }

                /*
                 * Wait for event.
                 */
                lksleep(&thr->flags, &thr->lk, 0, "h2idle", 0);
        }

        /*
         * Cleanup / termination: retire anything still queued to us.
         */
        while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
                kprintf("hammer2_thread: aborting xop %p\n", xop->func);
                TAILQ_REMOVE(&thr->xopq, xop,
                             collect[thr->clindex].entry);
                hammer2_xop_retire(xop, mask);
        }

        thr->td = NULL;
        wakeup(thr);
        lockmgr(&thr->lk, LK_RELEASE);
        /* thr structure can go invalid after this point */
}