1d0a11093093d1e94af7cd937573b975da669d39
[dragonfly.git] / sys / vfs / hammer2 / hammer2_thread.c
1 /*
2  * Copyright (c) 2015 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * This module implements various PFS-based helper threads.
36  */
37 #include "hammer2.h"
38
/*
 * A single deferred inode awaiting synchronization.  Entries are pushed
 * LIFO onto a hammer2_deferred_list by the sync scan and processed
 * bottom-up by the caller.
 */
typedef struct hammer2_deferred_ip {
	struct hammer2_deferred_ip *next;	/* next entry (LIFO chain) */
	hammer2_inode_t *ip;			/* referenced inode; caller drops */
} hammer2_deferred_ip_t;

/*
 * Head of the deferral list.  count tracks the number of entries so the
 * scan can cap list growth (see hammer2_sync_slaves).
 */
typedef struct hammer2_deferred_list {
	hammer2_deferred_ip_t	*base;		/* top of LIFO stack */
	int			count;		/* number of entries on list */
} hammer2_deferred_list_t;
48
49
50 #define HAMMER2_THREAD_DEBUG 1
51
52 static int hammer2_sync_slaves(hammer2_thread_t *thr, hammer2_inode_t *ip,
53                                 hammer2_deferred_list_t *list);
54 #if 0
55 static void hammer2_update_pfs_status(hammer2_thread_t *thr, uint32_t flags);
56                                 nerror = hammer2_sync_insert(
57                                                 thr, &parent, &chain,
58                                                 focus->bref.modify_tid,
59                                                 idx, focus);
60 #endif
61 static int hammer2_sync_insert(hammer2_thread_t *thr,
62                         hammer2_chain_t **parentp, hammer2_chain_t **chainp,
63                         hammer2_tid_t modify_tid, int idx,
64                         hammer2_chain_t *focus);
65 static int hammer2_sync_destroy(hammer2_thread_t *thr,
66                         hammer2_chain_t **parentp, hammer2_chain_t **chainp,
67                         hammer2_tid_t mtid, int idx);
68 static int hammer2_sync_replace(hammer2_thread_t *thr,
69                         hammer2_chain_t *parent, hammer2_chain_t *chain,
70                         hammer2_tid_t mtid, int idx,
71                         hammer2_chain_t *focus);
72
73 /****************************************************************************
74  *                          HAMMER2 THREAD API                              *
75  ****************************************************************************/
76 /*
77  * Initialize the suspplied thread structure, starting the specified
78  * thread.
79  */
80 void
81 hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
82                    const char *id, int clindex, int repidx,
83                    void (*func)(void *arg))
84 {
85         lockinit(&thr->lk, "h2thr", 0, 0);
86         thr->pmp = pmp;
87         thr->clindex = clindex;
88         thr->repidx = repidx;
89         TAILQ_INIT(&thr->xopq);
90         if (repidx >= 0) {
91                 lwkt_create(func, thr, &thr->td, NULL, 0, -1,
92                             "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
93         } else {
94                 lwkt_create(func, thr, &thr->td, NULL, 0, -1,
95                             "%s-%s", id, pmp->pfs_names[clindex]);
96         }
97 }
98
/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	lockmgr(&thr->lk, LK_EXCLUSIVE);
	atomic_set_int(&thr->flags, HAMMER2_THREAD_STOP);
	wakeup(&thr->flags);
	/*
	 * The thread clears td and wakes us on thr just before it exits
	 * (see hammer2_primary_sync_thread).  Poll at 1hz as a backstop.
	 */
	while (thr->td) {
		lksleep(thr, &thr->lk, 0, "h2thr", hz);
	}
	lockmgr(&thr->lk, LK_RELEASE);
	thr->pmp = NULL;	/* structure is now inert */
	lockuninit(&thr->lk);
}
121
/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
	/* no-op if the thread was never started or already exited */
	if (thr->td == NULL)
		return;
	lockmgr(&thr->lk, LK_EXCLUSIVE);
	atomic_set_int(&thr->flags, HAMMER2_THREAD_REMASTER);
	wakeup(&thr->flags);	/* kick the thread out of its idle sleep */
	lockmgr(&thr->lk, LK_RELEASE);
}
137
/*
 * Asynchronous freeze request.  Sets the FREEZE flag and wakes the thread
 * but does not wait for it to reach the FROZEN state (contrast with
 * hammer2_thr_freeze).  Intentionally lockless; the flag set is atomic.
 */
void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
	atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
	wakeup(&thr->flags);
}
144
/*
 * Synchronous freeze request.  Sets the FREEZE flag and blocks until the
 * thread acknowledges by transitioning FREEZE -> FROZEN (see the main
 * loop in hammer2_primary_sync_thread).  Silently returns if the thread
 * was never initialized.
 */
void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	lockmgr(&thr->lk, LK_EXCLUSIVE);
	atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
	wakeup(&thr->flags);
	/* wait for the thread itself to assert FROZEN; poll at 1hz */
	while ((thr->flags & HAMMER2_THREAD_FROZEN) == 0) {
		lksleep(thr, &thr->lk, 0, "h2frz", hz);
	}
	lockmgr(&thr->lk, LK_RELEASE);
}
158
/*
 * Unfreeze a previously frozen thread.  Clears the FROZEN flag and wakes
 * the thread so it resumes its main loop.  Silently returns if the
 * thread was never initialized.
 */
void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	lockmgr(&thr->lk, LK_EXCLUSIVE);
	atomic_clear_int(&thr->flags, HAMMER2_THREAD_FROZEN);
	wakeup(&thr->flags);
	lockmgr(&thr->lk, LK_RELEASE);
}
169
170 static
171 int
172 hammer2_thr_break(hammer2_thread_t *thr)
173 {
174         if (thr->flags & (HAMMER2_THREAD_STOP |
175                           HAMMER2_THREAD_REMASTER |
176                           HAMMER2_THREAD_FREEZE)) {
177                 return 1;
178         }
179         return 0;
180 }
181
182 /****************************************************************************
183  *                          HAMMER2 SYNC THREADS                            *
184  ****************************************************************************/
/*
 * Primary management thread for an element of a node.  A thread will exist
 * for each element requiring management.
 *
 * No management threads are needed for the SPMP or for any PMP with only
 * a single MASTER.
 *
 * On the SPMP - handles bulkfree and dedup operations
 * On a PFS    - handles remastering and synchronization
 *
 * The thread holds thr->lk across its main loop, dropping into lksleep
 * when idle so the thr_* control functions can interlock against it.
 */
void
hammer2_primary_sync_thread(void *arg)
{
	hammer2_thread_t *thr = arg;
	hammer2_pfs_t *pmp;
	hammer2_deferred_list_t list;	/* LIFO of deferred inodes */
	hammer2_deferred_ip_t *defer;
	int error;

	pmp = thr->pmp;
	bzero(&list, sizeof(list));

	lockmgr(&thr->lk, LK_EXCLUSIVE);
	while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
		/*
		 * Handle freeze request: acknowledge by converting
		 * FREEZE into FROZEN (hammer2_thr_freeze waits for this).
		 */
		if (thr->flags & HAMMER2_THREAD_FREEZE) {
			atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
			atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
		}

		/*
		 * Force idle if frozen until unfrozen or stopped.
		 */
		if (thr->flags & HAMMER2_THREAD_FROZEN) {
			lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
			continue;
		}

		/*
		 * Reset state on REMASTER request
		 */
		if (thr->flags & HAMMER2_THREAD_REMASTER) {
			atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
			/* reset state */
		}

		/*
		 * Synchronization scan.
		 */
		kprintf("sync_slaves pfs %s clindex %d\n",
			pmp->pfs_names[thr->clindex], thr->clindex);
		hammer2_trans_init(pmp, 0);

		hammer2_inode_ref(pmp->iroot);

		for (;;) {
			int didbreak = 0;
			/* XXX lock synchronize pmp->modify_tid */
			error = hammer2_sync_slaves(thr, pmp->iroot, &list);
			if (error != EAGAIN)
				break;
			/*
			 * EAGAIN means deferrals were queued; drain the
			 * LIFO list (bottom-up synchronization).
			 */
			while ((defer = list.base) != NULL) {
				hammer2_inode_t *nip;

				nip = defer->ip;
				error = hammer2_sync_slaves(thr, nip, &list);
				if (error && error != EAGAIN)
					break;
				if (hammer2_thr_break(thr)) {
					didbreak = 1;
					break;
				}

				/*
				 * If no additional defers occurred we can
				 * remove this one, otherwise keep it on
				 * the list and retry once the additional
				 * defers have completed.
				 */
				if (defer == list.base) {
					--list.count;
					list.base = defer->next;
					kfree(defer, M_HAMMER2);
					defer = NULL;	/* safety */
					hammer2_inode_drop(nip);
				}
			}

			/*
			 * If the thread is being remastered, frozen, or
			 * stopped, clean up any left-over deferrals.
			 */
			if (didbreak || (error && error != EAGAIN)) {
				kprintf("didbreak\n");
				while ((defer = list.base) != NULL) {
					--list.count;
					hammer2_inode_drop(defer->ip);
					list.base = defer->next;
					kfree(defer, M_HAMMER2);
				}
				if (error == 0 || error == EAGAIN)
					error = EINPROGRESS;
				break;
			}
		}

		hammer2_inode_drop(pmp->iroot);
		hammer2_trans_done(pmp);

		if (error)
			kprintf("hammer2_sync_slaves: error %d\n", error);

		/*
		 * Wait for event, or 5-second poll.
		 */
		lksleep(&thr->flags, &thr->lk, 0, "h2idle", hz * 5);
	}
	/* signal hammer2_thr_delete() that we are gone */
	thr->td = NULL;
	wakeup(thr);
	lockmgr(&thr->lk, LK_RELEASE);
	/* thr structure can go invalid after this point */
}
309
#if 0
/*
 * Given a locked cluster created from pmp->iroot, update the PFS's
 * reporting status.
 *
 * NOTE: Currently compiled out (#if 0); retained for reference.
 */
static
void
hammer2_update_pfs_status(hammer2_thread_t *thr, uint32_t flags)
{
	hammer2_pfs_t *pmp = thr->pmp;

	flags &= HAMMER2_CLUSTER_ZFLAGS;
	/* avoid log spam when nothing changed */
	if (pmp->cluster_flags == flags)
		return;
	pmp->cluster_flags = flags;

	kprintf("pfs %p", pmp);
	if (flags & HAMMER2_CLUSTER_MSYNCED)
		kprintf(" masters-all-good");
	if (flags & HAMMER2_CLUSTER_SSYNCED)
		kprintf(" slaves-all-good");

	if (flags & HAMMER2_CLUSTER_WRHARD)
		kprintf(" quorum/rw");
	else if (flags & HAMMER2_CLUSTER_RDHARD)
		kprintf(" quorum/ro");

	if (flags & HAMMER2_CLUSTER_UNHARD)
		kprintf(" out-of-sync-masters");
	else if (flags & HAMMER2_CLUSTER_NOHARD)
		kprintf(" no-masters-visible");

	if (flags & HAMMER2_CLUSTER_WRSOFT)
		kprintf(" soft/rw");
	else if (flags & HAMMER2_CLUSTER_RDSOFT)
		kprintf(" soft/ro");

	if (flags & HAMMER2_CLUSTER_UNSOFT)
		kprintf(" out-of-sync-slaves");
	else if (flags & HAMMER2_CLUSTER_NOSOFT)
		kprintf(" no-slaves-visible");
	kprintf("\n");
}
#endif
354
#if 0
/*
 * Debug helper: dump the per-node chain keys of a parent/child cluster
 * pair side by side, flagging invalid cluster items with "(I)".
 * Only emits output when (hammer2_debug & 1) is set.
 *
 * NOTE: Currently compiled out (#if 0); retained for reference.
 */
static
void
dumpcluster(const char *label,
	    hammer2_cluster_t *cparent, hammer2_cluster_t *cluster)
{
	hammer2_chain_t *chain;
	int i;

	if ((hammer2_debug & 1) == 0)
		return;

	kprintf("%s\t", label);
	KKASSERT(cparent->nchains == cluster->nchains);
	for (i = 0; i < cparent->nchains; ++i) {
		if (i)
			kprintf("\t");
		kprintf("%d ", i);
		/* parent-side chain for this node, or NULL placeholder */
		if ((chain = cparent->array[i].chain) != NULL) {
			kprintf("%016jx%s ",
				chain->bref.key,
				((cparent->array[i].flags &
				  HAMMER2_CITEM_INVALID) ? "(I)" : "   ")
			);
		} else {
			kprintf("      NULL      %s ", "   ");
		}
		/* child-side chain for this node, or NULL placeholder */
		if ((chain = cluster->array[i].chain) != NULL) {
			kprintf("%016jx%s ",
				chain->bref.key,
				((cluster->array[i].flags &
				  HAMMER2_CITEM_INVALID) ? "(I)" : "   ")
			);
		} else {
			kprintf("      NULL      %s ", "   ");
		}
		kprintf("\n");
	}
}
#endif
395
/*
 * Each out of sync node sync-thread must issue an all-nodes XOP scan of
 * the inode.  This creates a multiplication effect since the XOP scan itself
 * issues to all nodes.  However, this is the only way we can safely
 * synchronize nodes which might have disparate I/O bandwidths and the only
 * way we can safely deal with stalled nodes.
 *
 * Walks the authoritative XOP scan and the local chain iteration in
 * parallel, inserting, replacing, or destroying local chains to match.
 * Inode-type mismatches are pushed onto *list for bottom-up deferral.
 *
 * Returns 0 on success, EAGAIN if deferrals were queued (caller must
 * process them and call again), EINPROGRESS if no focus is available,
 * or another errno on failure.
 */
static
int
hammer2_sync_slaves(hammer2_thread_t *thr, hammer2_inode_t *ip,
		    hammer2_deferred_list_t *list)
{
	hammer2_xop_scanall_t *xop;
	hammer2_chain_t *parent;
	hammer2_chain_t *chain;
	hammer2_pfs_t *pmp;
	hammer2_key_t key_next;
	hammer2_tid_t sync_tid;
	int cache_index = -1;
	int needrescan;		/* set when any deferral was queued */
	int wantupdate;		/* set when local inode meta-data is stale */
	int error;
	int nerror;
	int idx;
	int n;			/* comparison result: <0, 0, >0 */

	pmp = ip->pmp;
	idx = thr->clindex;	/* cluster node we are responsible for */
	needrescan = 0;
	wantupdate = 0;

	if (ip->cluster.focus == NULL)
		return (EINPROGRESS);
	sync_tid = ip->cluster.focus->bref.modify_tid;

#if 0
	/*
	 * Nothing to do if all slaves are synchronized.
	 * Nothing to do if cluster not authoritatively readable.
	 */
	if (pmp->cluster_flags & HAMMER2_CLUSTER_SSYNCED)
		return(0);
	if ((pmp->cluster_flags & HAMMER2_CLUSTER_RDHARD) == 0)
		return(HAMMER2_ERROR_INCOMPLETE);
#endif

	error = 0;

	/*
	 * The inode is left unlocked during the scan.  Issue a XOP
	 * that does *not* include our cluster index to iterate
	 * properly synchronized elements and resolve our cluster index
	 * against it.
	 */
	hammer2_inode_lock(ip, HAMMER2_RESOLVE_SHARED);
	xop = hammer2_xop_alloc(ip, HAMMER2_XOP_MODIFYING);
	xop->key_beg = HAMMER2_KEY_MIN;
	xop->key_end = HAMMER2_KEY_MAX;
	hammer2_xop_start_except(&xop->head, hammer2_xop_scanall, idx);
	parent = hammer2_inode_chain(ip, idx,
				     HAMMER2_RESOLVE_ALWAYS |
				     HAMMER2_RESOLVE_SHARED);
	/* stale local inode meta-data triggers a final update pass below */
	if (parent->bref.modify_tid != sync_tid)
		wantupdate = 1;

	hammer2_inode_unlock(ip);

	/* start the local iteration alongside the XOP iteration */
	chain = hammer2_chain_lookup(&parent, &key_next,
				     HAMMER2_KEY_MIN, HAMMER2_KEY_MAX,
				     &cache_index,
				     HAMMER2_LOOKUP_SHARED |
				     HAMMER2_LOOKUP_NODIRECT |
				     HAMMER2_LOOKUP_NODATA);
	error = hammer2_xop_collect(&xop->head, 0);
	kprintf("XOP_INITIAL xop=%p clindex %d on %s\n", xop, thr->clindex,
		pmp->pfs_names[thr->clindex]);

	for (;;) {
		/*
		 * We are done if our scan is done and the XOP scan is done.
		 * We are done if the XOP scan failed (that is, we don't
		 * have authoritative data to synchronize with).
		 */
		int advance_local = 0;
		int advance_xop = 0;
		int dodefer = 0;	/* 1=replace-defer, 2=insert-defer */
		hammer2_chain_t *focus;

		kprintf("loop xop=%p chain[1]=%p lockcnt=%d\n",
			xop, xop->head.cluster.array[1].chain,
			(xop->head.cluster.array[1].chain ?
			    xop->head.cluster.array[1].chain->lockcnt : -1)
			);

		if (chain == NULL && error == ENOENT)
			break;
		if (error && error != ENOENT)
			break;

		/*
		 * Compare
		 */
		if (chain && error == ENOENT) {
			/*
			 * If we have local chains but the XOP scan is done,
			 * the chains need to be deleted.
			 */
			n = -1;
			focus = NULL;
		} else if (chain == NULL) {
			/*
			 * If our local scan is done but the XOP scan is not,
			 * we need to create the missing chain(s).
			 */
			n = 1;
			focus = xop->head.cluster.focus;
		} else {
			/*
			 * Otherwise compare to determine the action
			 * needed.
			 */
			focus = xop->head.cluster.focus;
			n = hammer2_chain_cmp(chain, focus);
		}

		/*
		 * Take action based on comparison results.
		 */
		if (n < 0) {
			/*
			 * Delete extraneous local data.  This will
			 * automatically advance the chain.
			 */
			nerror = hammer2_sync_destroy(thr, &parent, &chain,
						      0, idx);
		} else if (n == 0 && chain->bref.modify_tid !=
				     focus->bref.modify_tid) {
			/*
			 * Matching key but local data or meta-data requires
			 * updating.  If we will recurse, we still need to
			 * update to compatible content first but we do not
			 * synchronize modify_tid until the entire recursion
			 * has completed successfully.
			 */
			if (focus->bref.type == HAMMER2_BREF_TYPE_INODE) {
				nerror = hammer2_sync_replace(
						thr, parent, chain,
						0,
						idx, focus);
				dodefer = 1;
			} else {
				nerror = hammer2_sync_replace(
						thr, parent, chain,
						focus->bref.modify_tid,
						idx, focus);
			}
		} else if (n == 0) {
			/*
			 * 100% match, advance both
			 */
			advance_local = 1;
			advance_xop = 1;
			nerror = 0;
		} else if (n > 0) {
			/*
			 * Insert missing local data.
			 *
			 * If we will recurse, we still need to update to
			 * compatible content first but we do not synchronize
			 * modify_tid until the entire recursion has
			 * completed successfully.
			 */
			if (focus->bref.type == HAMMER2_BREF_TYPE_INODE) {
				nerror = hammer2_sync_insert(
						thr, &parent, &chain,
						0,
						idx, focus);
				dodefer = 2;
			} else {
				nerror = hammer2_sync_insert(
						thr, &parent, &chain,
						focus->bref.modify_tid,
						idx, focus);
			}
			advance_local = 1;
			advance_xop = 1;
		}

		/*
		 * We cannot recurse depth-first because the XOP is still
		 * running in node threads for this scan.  Create a placemarker
		 * by obtaining and record the hammer2_inode.
		 *
		 * We excluded our node from the XOP so we must temporarily
		 * add it to xop->head.cluster so it is properly incorporated
		 * into the inode.
		 *
		 * The deferral is pushed onto a LIFO list for bottom-up
		 * synchronization.
		 */
		if (error == 0 && dodefer) {
			hammer2_inode_t *nip;
			hammer2_deferred_ip_t *defer;

			KKASSERT(focus->bref.type == HAMMER2_BREF_TYPE_INODE);

			defer = kmalloc(sizeof(*defer), M_HAMMER2,
					M_WAITOK | M_ZERO);
			KKASSERT(xop->head.cluster.array[idx].chain == NULL);
			/* temporarily splice our chain into the cluster */
			xop->head.cluster.array[idx].flags =
							HAMMER2_CITEM_INVALID;
			xop->head.cluster.array[idx].chain = chain;
			nip = hammer2_inode_get(pmp, ip,
						&xop->head.cluster, idx);
			xop->head.cluster.array[idx].chain = NULL;

			hammer2_inode_ref(nip);	/* ref held by the deferral */
			hammer2_inode_unlock(nip);

			defer->next = list->base;
			defer->ip = nip;
			list->base = defer;
			++list->count;
			needrescan = 1;
		}

		/*
		 * If at least one deferral was added and the deferral
		 * list has grown too large, stop adding more.  This
		 * will trigger an EAGAIN return.
		 */
		if (needrescan && list->count > 1000)
			break;

		/*
		 * Advancements for iteration.
		 */
		if (advance_xop) {
			error = hammer2_xop_collect(&xop->head, 0);
		}
		if (advance_local) {
			chain = hammer2_chain_next(&parent, chain, &key_next,
						   key_next, HAMMER2_KEY_MAX,
						   &cache_index,
						   HAMMER2_LOOKUP_SHARED |
						   HAMMER2_LOOKUP_NODIRECT |
						   HAMMER2_LOOKUP_NODATA);
		}
	}
	hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
	if (chain) {
		hammer2_chain_unlock(chain);
		hammer2_chain_drop(chain);
	}
	if (parent) {
		hammer2_chain_unlock(parent);
		hammer2_chain_drop(parent);
	}

	/*
	 * If we added deferrals we want the caller to synchronize them
	 * and then call us again.
	 *
	 * NOTE: In this situation we do not yet want to synchronize our
	 *       inode, setting the error code also has that effect.
	 */
	if (error == 0 && needrescan)
		error = EAGAIN;

	/*
	 * If no error occurred and work was performed, synchronize the
	 * inode meta-data itself.
	 *
	 * XXX inode lock was lost
	 */
	if (error == 0 && wantupdate) {
		hammer2_xop_ipcluster_t *xop2;
		hammer2_chain_t *focus;

		xop2 = hammer2_xop_alloc(ip, HAMMER2_XOP_MODIFYING);
		hammer2_xop_start_except(&xop2->head, hammer2_xop_ipcluster,
					 idx);
		error = hammer2_xop_collect(&xop2->head, 0);
		if (error == 0) {
			focus = xop2->head.cluster.focus;
			kprintf("syncthr: update inode %p (%s)\n",
				focus,
				(focus ?
				 (char *)focus->data->ipdata.filename : "?"));
			chain = hammer2_inode_chain_and_parent(ip, idx,
						    &parent,
						    HAMMER2_RESOLVE_ALWAYS |
						    HAMMER2_RESOLVE_SHARED);

			KKASSERT(parent != NULL);
			nerror = hammer2_sync_replace(
					thr, parent, chain,
					sync_tid,
					idx, focus);
			hammer2_chain_unlock(chain);
			hammer2_chain_drop(chain);
			hammer2_chain_unlock(parent);
			hammer2_chain_drop(parent);
			/* XXX */
		}
		hammer2_xop_retire(&xop2->head, HAMMER2_XOPMASK_VOP);
	}

	return error;
}
706
/*
 * Create a missing chain by copying the focus from another device.
 *
 * On entry *parentp and focus are both locked shared.  The chain will be
 * created and returned in *chainp also locked shared.
 *
 * Returns 0.  Exclusive locks are taken temporarily to perform the
 * creation; care is taken to re-acquire shared locks in parent-then-child
 * order to avoid lock-order deadlocks.
 */
static
int
hammer2_sync_insert(hammer2_thread_t *thr,
		    hammer2_chain_t **parentp, hammer2_chain_t **chainp,
		    hammer2_tid_t mtid, int idx, hammer2_chain_t *focus)
{
	hammer2_chain_t *chain;

#if HAMMER2_THREAD_DEBUG
	if (hammer2_debug & 1)
	kprintf("insert rec par=%p/%d.%016jx slave %d %d.%016jx mod=%016jx\n",
		*parentp,
		(*parentp)->bref.type,
		(*parentp)->bref.key,
		idx,
		focus->bref.type, focus->bref.key, mtid);
#endif

	/*
	 * Create the missing chain.  Exclusive locks are needed.
	 *
	 * Have to be careful to avoid deadlocks.
	 */
	if (*chainp)
		hammer2_chain_unlock(*chainp);
	hammer2_chain_unlock(*parentp);
	hammer2_chain_lock(*parentp, HAMMER2_RESOLVE_ALWAYS);
	/* reissue lookup? */

	chain = NULL;
	hammer2_chain_create(parentp, &chain, thr->pmp,
			     focus->bref.key, focus->bref.keybits,
			     focus->bref.type, focus->bytes,
			     mtid, 0);
	hammer2_chain_modify(chain, mtid, 0);

	/*
	 * Copy focus to new chain
	 */

	/* type already set */
	chain->bref.methods = focus->bref.methods;
	/* keybits already set */
	chain->bref.vradix = focus->bref.vradix;
	/* mirror_tid set by flush */
	KKASSERT(chain->bref.modify_tid == mtid);
	chain->bref.flags = focus->bref.flags;
	/* key already present */
	/* check code will be recalculated */

	/*
	 * Copy data body.
	 */
	switch(chain->bref.type) {
	case HAMMER2_BREF_TYPE_INODE:
		/* indirect inodes copy meta-data only, not the data area */
		if ((focus->data->ipdata.meta.op_flags &
		     HAMMER2_OPFLAG_DIRECTDATA) == 0) {
			bcopy(focus->data, chain->data,
			      offsetof(hammer2_inode_data_t, u));
			break;
		}
		/* fall through - DIRECTDATA inodes copy in full */
	case HAMMER2_BREF_TYPE_DATA:
		bcopy(focus->data, chain->data, chain->bytes);
		hammer2_chain_setcheck(chain, chain->data);
		break;
	default:
		KKASSERT(0);
		break;
	}

	hammer2_chain_unlock(chain);		/* unlock, leave ref */
	if (*chainp)
		hammer2_chain_drop(*chainp);	/* replace caller's chain */
	*chainp = chain;			/* will be returned locked */

	/*
	 * Avoid ordering deadlock when relocking.
	 */
	hammer2_chain_unlock(*parentp);
	hammer2_chain_lock(*parentp, HAMMER2_RESOLVE_SHARED |
				     HAMMER2_RESOLVE_ALWAYS);
	hammer2_chain_lock(chain, HAMMER2_RESOLVE_SHARED |
				  HAMMER2_RESOLVE_ALWAYS);

	return 0;
}
800
801 /*
802  * Destroy an extranious chain.
803  *
804  * Both *parentp and *chainp are locked shared.
805  *
806  * On return, *chainp will be adjusted to point to the next element in the
807  * iteration and locked shared.
808  */
809 static
810 int
811 hammer2_sync_destroy(hammer2_thread_t *thr,
812                      hammer2_chain_t **parentp, hammer2_chain_t **chainp,
813                      hammer2_tid_t mtid, int idx)
814 {
815         hammer2_chain_t *chain;
816         hammer2_chain_t *parent;
817         hammer2_key_t key_next;
818         hammer2_key_t save_key;
819         int cache_index = -1;
820
821         chain = *chainp;
822
823 #if HAMMER2_THREAD_DEBUG
824         if (hammer2_debug & 1)
825         kprintf("destroy rec %p/%p slave %d %d.%016jx\n",
826                 *parentp, chain,
827                 idx, chain->bref.type, chain->bref.key);
828 #endif
829
830         save_key = chain->bref.key;
831         if (save_key != HAMMER2_KEY_MAX)
832                 ++save_key;
833
834         /*
835          * Try to avoid unnecessary I/O.
836          *
837          * XXX accounting not propagated up properly.  We might have to do
838          *     a RESOLVE_MAYBE here and pass 0 for the flags.
839          */
840         hammer2_chain_unlock(chain);    /* relock exclusive */
841         hammer2_chain_unlock(*parentp);
842         hammer2_chain_lock(*parentp, HAMMER2_RESOLVE_ALWAYS);
843         hammer2_chain_lock(chain, HAMMER2_RESOLVE_NEVER);
844
845         hammer2_chain_delete(*parentp, chain, mtid, HAMMER2_DELETE_PERMANENT);
846         hammer2_chain_unlock(chain);
847         hammer2_chain_drop(chain);
848         chain = NULL;                   /* safety */
849
850         hammer2_chain_unlock(*parentp); /* relock shared */
851         hammer2_chain_lock(*parentp, HAMMER2_RESOLVE_SHARED |
852                                      HAMMER2_RESOLVE_ALWAYS);
853         *chainp = hammer2_chain_lookup(&parent, &key_next,
854                                      save_key, HAMMER2_KEY_MAX,
855                                      &cache_index,
856                                      HAMMER2_LOOKUP_SHARED |
857                                      HAMMER2_LOOKUP_NODIRECT |
858                                      HAMMER2_LOOKUP_NODATA);
859         return 0;
860 }
861
862 /*
863  * cparent is locked exclusively, with an extra ref, cluster is not locked.
864  * Replace element [i] in the cluster.
865  */
866 static
867 int
868 hammer2_sync_replace(hammer2_thread_t *thr,
869                      hammer2_chain_t *parent, hammer2_chain_t *chain,
870                      hammer2_tid_t mtid, int idx,
871                      hammer2_chain_t *focus)
872 {
873         int nradix;
874         uint8_t otype;
875
876 #if HAMMER2_THREAD_DEBUG
877         if (hammer2_debug & 1)
878         kprintf("replace rec %p slave %d %d.%016jx mod=%016jx\n",
879                 chain,
880                 idx,
881                 focus->bref.type, focus->bref.key, mtid);
882 #endif
883         hammer2_chain_unlock(chain);
884         hammer2_chain_lock(chain, HAMMER2_RESOLVE_ALWAYS);
885         if (chain->bytes != focus->bytes) {
886                 /* XXX what if compressed? */
887                 nradix = hammer2_getradix(chain->bytes);
888                 hammer2_chain_resize(NULL, parent, chain,
889                                      mtid, nradix, 0);
890         }
891         hammer2_chain_modify(chain, mtid, 0);
892         otype = chain->bref.type;
893         chain->bref.type = focus->bref.type;
894         chain->bref.methods = focus->bref.methods;
895         chain->bref.keybits = focus->bref.keybits;
896         chain->bref.vradix = focus->bref.vradix;
897         /* mirror_tid updated by flush */
898         KKASSERT(chain->bref.modify_tid == mtid);
899         chain->bref.flags = focus->bref.flags;
900         /* key already present */
901         /* check code will be recalculated */
902         chain->error = 0;
903
904         /*
905          * Copy data body.
906          */
907         switch(chain->bref.type) {
908         case HAMMER2_BREF_TYPE_INODE:
909                 if ((focus->data->ipdata.meta.op_flags &
910                      HAMMER2_OPFLAG_DIRECTDATA) == 0) {
911                         /*
912                          * If DIRECTDATA is transitioning to 0 or the old
913                          * chain is not an inode we have to initialize
914                          * the block table.
915                          */
916                         if (otype != HAMMER2_BREF_TYPE_INODE ||
917                             (chain->data->ipdata.meta.op_flags &
918                              HAMMER2_OPFLAG_DIRECTDATA)) {
919                                 kprintf("chain inode trans away from dd\n");
920                                 bzero(&chain->data->ipdata.u,
921                                       sizeof(chain->data->ipdata.u));
922                         }
923                         bcopy(focus->data, chain->data,
924                               offsetof(hammer2_inode_data_t, u));
925                         /* XXX setcheck on inode should not be needed */
926                         hammer2_chain_setcheck(chain, chain->data);
927                         break;
928                 }
929                 /* fall through */
930         case HAMMER2_BREF_TYPE_DATA:
931                 bcopy(focus->data, chain->data, chain->bytes);
932                 hammer2_chain_setcheck(chain, chain->data);
933                 break;
934         default:
935                 KKASSERT(0);
936                 break;
937         }
938
939         hammer2_chain_unlock(chain);
940         hammer2_chain_lock(chain, HAMMER2_RESOLVE_SHARED |
941                                   HAMMER2_RESOLVE_MAYBE);
942
943         return 0;
944 }
945
946 /****************************************************************************
947  *                          HAMMER2 XOPS THREADS                            *
948  ****************************************************************************/
949
950 void
951 hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
952 {
953         hammer2_mtx_init(&xgrp->mtx, "h2xopq");
954         hammer2_mtx_init(&xgrp->mtx2, "h2xopio");
955 }
956
957 /*
958  * Allocate a XOP request.
959  *
960  * Once allocated a XOP request can be started, collected, and retired,
961  * and can be retired early if desired.
962  *
963  * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
964  */
965 void *
966 hammer2_xop_alloc(hammer2_inode_t *ip, int flags)
967 {
968         hammer2_xop_t *xop;
969
970         xop = objcache_get(cache_xops, M_WAITOK);
971         KKASSERT(xop->head.cluster.array[0].chain == NULL);
972         xop->head.ip = ip;
973         xop->head.func = NULL;
974         xop->head.state = 0;
975         xop->head.error = 0;
976         xop->head.collect_key = 0;
977         if (flags & HAMMER2_XOP_MODIFYING)
978                 xop->head.mtid = hammer2_trans_sub(ip->pmp);
979         else
980                 xop->head.mtid = 0;
981
982         xop->head.cluster.nchains = ip->cluster.nchains;
983         xop->head.cluster.pmp = ip->pmp;
984         xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;
985
986         /*
987          * run_mask - Active thread (or frontend) associated with XOP
988          */
989         xop->head.run_mask = HAMMER2_XOPMASK_VOP;
990
991         hammer2_inode_ref(ip);
992
993         return xop;
994 }
995
996 void
997 hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
998 {
999         xop->name = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
1000         xop->name_len = name_len;
1001         bcopy(name, xop->name, name_len);
1002 }
1003
1004 void
1005 hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
1006 {
1007         xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
1008         xop->name2_len = name_len;
1009         bcopy(name, xop->name2, name_len);
1010 }
1011
1012
1013 void
1014 hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
1015 {
1016         xop->ip2 = ip2;
1017         hammer2_inode_ref(ip2);
1018 }
1019
1020 void
1021 hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
1022 {
1023         xop->ip3 = ip3;
1024         hammer2_inode_ref(ip3);
1025 }
1026
1027 void
1028 hammer2_xop_reinit(hammer2_xop_head_t *xop)
1029 {
1030         xop->state = 0;
1031         xop->error = 0;
1032         xop->collect_key = 0;
1033         xop->run_mask = HAMMER2_XOPMASK_VOP;
1034 }
1035
1036 /*
1037  * A mounted PFS needs Xops threads to support frontend operations.
1038  */
1039 void
1040 hammer2_xop_helper_create(hammer2_pfs_t *pmp)
1041 {
1042         int i;
1043         int j;
1044
1045         lockmgr(&pmp->lock, LK_EXCLUSIVE);
1046         pmp->has_xop_threads = 1;
1047
1048         for (i = 0; i < pmp->iroot->cluster.nchains; ++i) {
1049                 for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
1050                         if (pmp->xop_groups[j].thrs[i].td)
1051                                 continue;
1052                         hammer2_thr_create(&pmp->xop_groups[j].thrs[i], pmp,
1053                                            "h2xop", i, j,
1054                                            hammer2_primary_xops_thread);
1055                 }
1056         }
1057         lockmgr(&pmp->lock, LK_RELEASE);
1058 }
1059
1060 void
1061 hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
1062 {
1063         int i;
1064         int j;
1065
1066         for (i = 0; i < pmp->pfs_nmasters; ++i) {
1067                 for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
1068                         if (pmp->xop_groups[j].thrs[i].td)
1069                                 hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
1070                 }
1071         }
1072 }
1073
/*
 * Start a XOP request, queueing it to all nodes in the cluster to
 * execute the cluster op.
 *
 * notidx is a cluster index to exclude from the dispatch (-1 for none).
 * A xop group is chosen round-robin via pmp->xop_iterator.
 *
 * XXX optimize single-target case.
 */
void
hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_func_t func,
                         int notidx)
{
        hammer2_xop_group_t *xgrp;
        hammer2_thread_t *thr;
        hammer2_pfs_t *pmp;
        int g;
        int i;

        pmp = xop->ip->pmp;
        /* lazily spin up the backend threads on first use */
        if (pmp->has_xop_threads == 0)
                hammer2_xop_helper_create(pmp);

        /* round-robin group selection */
        g = pmp->xop_iterator++;
        g = g & HAMMER2_XOPGROUPS_MASK;
        xgrp = &pmp->xop_groups[g];
        xop->func = func;
        xop->xgrp = xgrp;

        /* XXX do cluster_resolve or cluster_check here, only start
         * synchronized elements */

        for (i = 0; i < xop->ip->cluster.nchains; ++i) {
                thr = &xgrp->thrs[i];
                /* unlocked pre-check; re-checked under thr->lk below */
                if (thr->td && i != notidx) {
                        lockmgr(&thr->lk, LK_EXCLUSIVE);
                        if (thr->td &&
                            (thr->flags & HAMMER2_THREAD_STOP) == 0) {
                                /*
                                 * Mark this cluster index as both running
                                 * (run_mask) and queued (chk_mask) before
                                 * insertion so retire/collect see it.
                                 */
                                atomic_set_int(&xop->run_mask, 1U << i);
                                atomic_set_int(&xop->chk_mask, 1U << i);
                                TAILQ_INSERT_TAIL(&thr->xopq, xop,
                                                  collect[i].entry);
                        }
                        lockmgr(&thr->lk, LK_RELEASE);
                        wakeup(&thr->flags);
                }
        }
}
1119
/*
 * Start a XOP request on all nodes in the cluster (no index excluded).
 */
void
hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_func_t func)
{
        hammer2_xop_start_except(xop, func, -1);
}
1125
/*
 * Retire a XOP.  Used by both the VOP frontend and by the XOP backend.
 *
 * mask identifies the participant being removed: HAMMER2_XOPMASK_VOP for
 * the frontend, or (1U << clindex) for a backend feeder.  The last
 * participant to retire performs the full teardown and returns the XOP
 * to the objcache.
 */
void
hammer2_xop_retire(hammer2_xop_head_t *xop, uint32_t mask)
{
        hammer2_xop_group_t *xgrp;
        hammer2_chain_t *chain;
        int i;

        xgrp = xop->xgrp;

        /*
         * Remove the frontend or remove a backend feeder.  When removing
         * the frontend we must wakeup any backend feeders who are waiting
         * for FIFO space.
         *
         * XXX optimize wakeup.
         */
        KKASSERT(xop->run_mask & mask);
        /* atomically subtract our bit; non-last participants return early */
        if (atomic_fetchadd_int(&xop->run_mask, -mask) != mask) {
                if (mask == HAMMER2_XOPMASK_VOP)
                        wakeup(xop);
                return;
        }

        /*
         * We are the last participant.  Cleanup the collection cluster.
         */
        for (i = 0; i < xop->cluster.nchains; ++i) {
                xop->cluster.array[i].flags = 0;
                chain = xop->cluster.array[i].chain;
                if (chain) {
                        xop->cluster.array[i].chain = NULL;
                        hammer2_chain_unlock(chain);
                        hammer2_chain_drop(chain);
                }
        }

        /*
         * Cleanup the fifos, use chk_mask to limit the loop to indices
         * that were actually queued.
         */
        mask = xop->chk_mask;
        for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
                hammer2_xop_fifo_t *fifo = &xop->collect[i];
                while (fifo->ri != fifo->wi) {
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        if (chain) {
                                hammer2_chain_unlock(chain);
                                hammer2_chain_drop(chain);
                        }
                        ++fifo->ri;
                        if (fifo->wi - fifo->ri < HAMMER2_XOPFIFO / 2)
                                wakeup(xop);    /* XXX optimize */
                }
                mask &= ~(1U << i);
        }

        /*
         * The inodes are only held at this point, simply drop them.
         */
        if (xop->ip) {
                hammer2_inode_drop(xop->ip);
                xop->ip = NULL;
        }
        if (xop->ip2) {
                hammer2_inode_drop(xop->ip2);
                xop->ip2 = NULL;
        }
        if (xop->ip3) {
                hammer2_inode_drop(xop->ip3);
                xop->ip3 = NULL;
        }
        /* free any name copies made by xop_setname/setname2 */
        if (xop->name) {
                kfree(xop->name, M_HAMMER2);
                xop->name = NULL;
                xop->name_len = 0;
        }
        if (xop->name2) {
                kfree(xop->name2, M_HAMMER2);
                xop->name2 = NULL;
                xop->name2_len = 0;
        }

        objcache_put(cache_xops, xop);
}
1212
1213 /*
1214  * (Backend) Returns non-zero if the frontend is still attached.
1215  */
1216 int
1217 hammer2_xop_active(hammer2_xop_head_t *xop)
1218 {
1219         if (xop->run_mask & HAMMER2_XOPMASK_VOP)
1220                 return 1;
1221         else
1222                 return 0;
1223 }
1224
/*
 * (Backend) Feed chain data through the cluster validator and back to
 * the frontend.  Chains are fed from multiple nodes concurrently
 * and pipelined via per-node FIFOs in the XOP.
 *
 * No xop lock is needed because we are only manipulating fields under
 * our direct control.
 *
 * Returns 0 on success and a hammer error code if sync is permanently
 * lost.  The caller retains a ref on the chain but by convention
 * the lock is typically inherited by the xop (caller loses lock).
 *
 * Returns non-zero on error.  In this situation the caller retains a
 * ref on the chain but loses the lock (we unlock here).
 *
 * WARNING!  The chain is moving between two different threads, it must
 *           be locked SHARED to retain its data mapping, not exclusive.
 *           When multiple operations are in progress at once, chains fed
 *           back to the frontend for collection can wind up being locked
 *           in different orders, only a shared lock can prevent a deadlock.
 *
 *           Exclusive locks may only be used by a XOP backend node thread
 *           temporarily, with no direct or indirect dependencies (aka
 *           blocking/waiting) on other nodes.
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
                 int clindex, int error)
{
        hammer2_xop_fifo_t *fifo;

        /*
         * Multi-threaded entry into the XOP collector.  We own the
         * fifo->wi for our clindex.
         */
        fifo = &xop->collect[clindex];

        /*
         * Block while the FIFO is full.  The interlocked sleep closes
         * the race against the frontend consuming entries (and waking
         * us) between the check and the tsleep.  Abort with EINTR if
         * the frontend has detached.
         */
        while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                tsleep_interlock(xop, 0);
                if (hammer2_xop_active(xop) == 0) {
                        error = EINTR;
                        goto done;
                }
                if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                        tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
                }
        }
        /* the fifo gains its own ref; chain may be NULL for EOF/error feeds */
        if (chain)
                hammer2_chain_ref(chain);
        fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
        fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
        /* ensure entry is visible before the index advance */
        cpu_sfence();
        ++fifo->wi;
        atomic_add_int(&xop->check_counter, 1);
        wakeup(&xop->check_counter);    /* XXX optimize */
        error = 0;

        /*
         * Cleanup.  If an error occurred we eat the lock.  If no error
         * occurred the fifo inherits the lock and gains an additional ref.
         *
         * The caller's ref remains in both cases.
         */
done:
        if (error && chain)
                hammer2_chain_unlock(chain);
        return error;
}
1293
/*
 * (Frontend) collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= collect_key.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT   - Used to 'poll' a completed collection,
 *                                usually called synchronously from the
 *                                node XOPs for the strategy code to
 *                                fake the frontend collection and complete
 *                                the BIO as soon as possible.
 *
 * HAMMER2_XOP_SYNCHRONIZER     - Request synchronization with a particular
 *                                cluster index, prevents looping when that
 *                                index is out of sync so caller can act on
 *                                the out of sync element.  ESRCH and EDEADLK
 *                                can be returned if this flag is specified.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Return ENOENT on normal termination.
 * Otherwise return an error.
 */
int
hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
{
        hammer2_xop_fifo_t *fifo;
        hammer2_chain_t *chain;
        hammer2_key_t lokey;
        int error;
        int keynull;
        int adv;                /* advance the element */
        int i;
        uint32_t check_counter;

loop:
        /*
         * First loop tries to advance pieces of the cluster which
         * are out of sync.
         */
        lokey = HAMMER2_KEY_MAX;
        keynull = HAMMER2_CHECK_NULL;
        /* snapshot the feed counter before scanning so we can detect
         * new feeds that arrive while we are deciding to sleep */
        check_counter = xop->check_counter;
        cpu_lfence();

        for (i = 0; i < xop->cluster.nchains; ++i) {
                chain = xop->cluster.array[i].chain;
                if (chain == NULL) {
                        adv = 1;
                } else if (chain->bref.key < xop->collect_key) {
                        /* element is behind the collection key, advance it */
                        adv = 1;
                } else {
                        /* element is current; track the lowest key seen */
                        keynull &= ~HAMMER2_CHECK_NULL;
                        if (lokey > chain->bref.key)
                                lokey = chain->bref.key;
                        adv = 0;
                }
                if (adv == 0)
                        continue;

                /*
                 * Advance element if possible, advanced element may be NULL.
                 */
                if (chain) {
                        hammer2_chain_unlock(chain);
                        hammer2_chain_drop(chain);
                }
                fifo = &xop->collect[i];
                if (fifo->ri != fifo->wi) {
                        /* pair with the producer's cpu_sfence() */
                        cpu_lfence();
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        ++fifo->ri;
                        xop->cluster.array[i].chain = chain;
                        if (chain == NULL) {
                                /* XXX */
                                xop->cluster.array[i].flags |=
                                                        HAMMER2_CITEM_NULL;
                        }
                        /* wake feeders once the fifo has drained halfway */
                        if (fifo->wi - fifo->ri < HAMMER2_XOPFIFO / 2)
                                wakeup(xop);    /* XXX optimize */
                        --i;            /* loop on same index */
                } else {
                        /*
                         * Retain CITEM_NULL flag.  If set just repeat EOF.
                         * If not, the NULL,0 combination indicates an
                         * operation in-progress.
                         */
                        xop->cluster.array[i].chain = NULL;
                        /* retain any CITEM_NULL setting */
                }
        }

        /*
         * Determine whether the lowest collected key meets clustering
         * requirements.  Returns:
         *
         * 0             - key valid, cluster can be returned.
         *
         * ENOENT        - normal end of scan, return ENOENT.
         *
         * ESRCH         - sufficient elements collected, quorum agreement
         *                 that lokey is not a valid element and should be
         *                 skipped.
         *
         * EDEADLK       - sufficient elements collected, no quorum agreement
         *                 (and no agreement possible).  In this situation a
         *                 repair is needed, for now we loop.
         *
         * EINPROGRESS   - insufficient elements collected to resolve, wait
         *                 for event and loop.
         */
        if ((flags & HAMMER2_XOP_COLLECT_WAITALL) &&
            xop->run_mask != HAMMER2_XOPMASK_VOP) {
                error = EINPROGRESS;
        } else {
                error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
        }
        if (error == EINPROGRESS) {
                if (xop->check_counter == check_counter) {
                        if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
                                goto done;
                        /* interlocked sleep closes the race vs new feeds */
                        tsleep_interlock(&xop->check_counter, 0);
                        cpu_lfence();
                        if (xop->check_counter == check_counter) {
                                tsleep(&xop->check_counter, PINTERLOCKED,
                                        "h2coll", hz*60);
                        }
                }
                goto loop;
        }
        if (error == ESRCH) {
                if (lokey != HAMMER2_KEY_MAX) {
                        /* skip the disagreed-upon key and retry */
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = ENOENT;
        }
        if (error == EDEADLK) {
                kprintf("hammer2: no quorum possible lokey %016jx\n",
                        lokey);
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = ENOENT;
        }
        /* advance collect_key past the returned element (avoid overflow) */
        if (lokey == HAMMER2_KEY_MAX)
                xop->collect_key = lokey;
        else
                xop->collect_key = lokey + 1;
done:
        return error;
}
1449
/*
 * Primary management thread for xops support.  Each node has several such
 * threads which replicate front-end operations on cluster nodes.
 *
 * XOPS thread node operations, allowing the function to focus on a single
 * node in the cluster after validating the operation with the cluster.
 * This is primarily what prevents dead or stalled nodes from stalling
 * the front-end.
 *
 * arg is the hammer2_thread_t for this backend (see hammer2_thr_create).
 * Runs until HAMMER2_THREAD_STOP is set, then drains its queue and
 * self-terminates.
 */
void
hammer2_primary_xops_thread(void *arg)
{
        hammer2_thread_t *thr = arg;
        hammer2_pfs_t *pmp;
        hammer2_xop_head_t *xop;
        hammer2_xop_group_t *xgrp;
        uint32_t mask;

        pmp = thr->pmp;
        xgrp = &pmp->xop_groups[thr->repidx];
        mask = 1U << thr->clindex;      /* our bit in xop->run_mask */

        /* thr->lk is held except while running a xop callback or sleeping */
        lockmgr(&thr->lk, LK_EXCLUSIVE);
        while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
                /*
                 * Handle freeze request
                 */
                if (thr->flags & HAMMER2_THREAD_FREEZE) {
                        atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
                        atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
                }

                /*
                 * Force idle if frozen until unfrozen or stopped.
                 */
                if (thr->flags & HAMMER2_THREAD_FROZEN) {
                        lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
                        continue;
                }

                /*
                 * Reset state on REMASTER request
                 */
                if (thr->flags & HAMMER2_THREAD_REMASTER) {
                        atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
                        /* reset state */
                }

                /*
                 * Process requests.  Each request can be multi-queued.
                 *
                 * If we get behind and the frontend VOP is no longer active,
                 * we retire the request without processing it.  The callback
                 * may also abort processing if the frontend VOP becomes
                 * inactive.
                 */
                while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
                        TAILQ_REMOVE(&thr->xopq, xop,
                                     collect[thr->clindex].entry);
                        if (hammer2_xop_active(xop)) {
                                /* drop thr->lk across the callback */
                                lockmgr(&thr->lk, LK_RELEASE);
                                xop->func((hammer2_xop_t *)xop, thr->clindex);
                                hammer2_xop_retire(xop, mask);
                                lockmgr(&thr->lk, LK_EXCLUSIVE);
                        } else {
                                /* frontend detached: feed abort and retire */
                                hammer2_xop_feed(xop, NULL, thr->clindex,
                                                 ECONNABORTED);
                                hammer2_xop_retire(xop, mask);
                        }
                }

                /*
                 * Wait for event.
                 */
                lksleep(&thr->flags, &thr->lk, 0, "h2idle", 0);
        }

        /*
         * Cleanup / termination: abort anything still queued.
         */
        while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
                kprintf("hammer2_thread: aborting xop %p\n", xop->func);
                TAILQ_REMOVE(&thr->xopq, xop,
                             collect[thr->clindex].entry);
                hammer2_xop_retire(xop, mask);
        }

        thr->td = NULL;
        wakeup(thr);
        lockmgr(&thr->lk, LK_RELEASE);
        /* thr structure can go invalid after this point */
}