hammer2 - Refactor frontend part 7/many
[dragonfly.git] / sys / vfs / hammer2 / hammer2_thread.c
1 /*
2  * Copyright (c) 2015 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * This module implements various PFS-based helper threads.
36  */
37 #include "hammer2.h"
38
39 #define HAMMER2_THREAD_DEBUG 1
40
41 static int hammer2_sync_slaves(hammer2_thread_t *thr,
42                         hammer2_cluster_t *cparent, int *errors);
43 static void hammer2_update_pfs_status(hammer2_thread_t *thr,
44                         hammer2_cluster_t *cparent);
45 static int hammer2_sync_insert(hammer2_thread_t *thr,
46                         hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
47                         hammer2_tid_t modify_tid,
48                         int i, int *errors);
49 static int hammer2_sync_destroy(hammer2_thread_t *thr,
50                         hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
51                         int i, int *errors);
52 static int hammer2_sync_replace(hammer2_thread_t *thr,
53                         hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
54                         hammer2_tid_t modify_tid,
55                         int i, int *errors);
56
57 /****************************************************************************
58  *                          HAMMER2 THREAD API                              *
59  ****************************************************************************/
60 /*
61  * Initialize the suspplied thread structure, starting the specified
62  * thread.
63  */
64 void
65 hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
66                    const char *id, int clindex, int repidx,
67                    void (*func)(void *arg))
68 {
69         lockinit(&thr->lk, "h2thr", 0, 0);
70         thr->pmp = pmp;
71         thr->clindex = clindex;
72         thr->repidx = repidx;
73         lwkt_create(func, thr, &thr->td, NULL, 0, -1,
74                     "%s-%s", id, pmp->pfs_names[clindex]);
75 }
76
77 /*
78  * Terminate a thread.  This function will silently return if the thread
79  * was never initialized or has already been deleted.
80  *
81  * This is accomplished by setting the STOP flag and waiting for the td
82  * structure to become NULL.
83  */
84 void
85 hammer2_thr_delete(hammer2_thread_t *thr)
86 {
87         if (thr->td == NULL)
88                 return;
89         lockmgr(&thr->lk, LK_EXCLUSIVE);
90         atomic_set_int(&thr->flags, HAMMER2_THREAD_STOP);
91         wakeup(&thr->flags);
92         while (thr->td) {
93                 lksleep(thr, &thr->lk, 0, "h2thr", hz);
94         }
95         lockmgr(&thr->lk, LK_RELEASE);
96         thr->pmp = NULL;
97         lockuninit(&thr->lk);
98 }
99
100 /*
101  * Asynchronous remaster request.  Ask the synchronization thread to
102  * start over soon (as if it were frozen and unfrozen, but without waiting).
103  * The thread always recalculates mastership relationships when restarting.
104  */
105 void
106 hammer2_thr_remaster(hammer2_thread_t *thr)
107 {
108         if (thr->td == NULL)
109                 return;
110         lockmgr(&thr->lk, LK_EXCLUSIVE);
111         atomic_set_int(&thr->flags, HAMMER2_THREAD_REMASTER);
112         wakeup(&thr->flags);
113         lockmgr(&thr->lk, LK_RELEASE);
114 }
115
116 void
117 hammer2_thr_freeze_async(hammer2_thread_t *thr)
118 {
119         atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
120         wakeup(&thr->flags);
121 }
122
123 void
124 hammer2_thr_freeze(hammer2_thread_t *thr)
125 {
126         if (thr->td == NULL)
127                 return;
128         lockmgr(&thr->lk, LK_EXCLUSIVE);
129         atomic_set_int(&thr->flags, HAMMER2_THREAD_FREEZE);
130         wakeup(&thr->flags);
131         while ((thr->flags & HAMMER2_THREAD_FROZEN) == 0) {
132                 lksleep(thr, &thr->lk, 0, "h2frz", hz);
133         }
134         lockmgr(&thr->lk, LK_RELEASE);
135 }
136
137 void
138 hammer2_thr_unfreeze(hammer2_thread_t *thr)
139 {
140         if (thr->td == NULL)
141                 return;
142         lockmgr(&thr->lk, LK_EXCLUSIVE);
143         atomic_clear_int(&thr->flags, HAMMER2_THREAD_FROZEN);
144         wakeup(&thr->flags);
145         lockmgr(&thr->lk, LK_RELEASE);
146 }
147
148 /****************************************************************************
149  *                          HAMMER2 SYNC THREADS                            *
150  ****************************************************************************/
151 /*
152  * Primary management thread for an element of a node.  A thread will exist
153  * for each element requiring management.
154  *
155  * No management threads are needed for the SPMP or for any PMP with only
156  * a single MASTER.
157  *
158  * On the SPMP - handles bulkfree and dedup operations
159  * On a PFS    - handles remastering and synchronization
160  */
161 void
162 hammer2_primary_sync_thread(void *arg)
163 {
164         hammer2_thread_t *thr = arg;
165         hammer2_cluster_t *cparent;
166         hammer2_chain_t *chain;
167         hammer2_pfs_t *pmp;
168         int errors[HAMMER2_MAXCLUSTER];
169         int error;
170
171         pmp = thr->pmp;
172
173         lockmgr(&thr->lk, LK_EXCLUSIVE);
174         while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
175                 /*
176                  * Handle freeze request
177                  */
178                 if (thr->flags & HAMMER2_THREAD_FREEZE) {
179                         atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
180                         atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
181                 }
182
183                 /*
184                  * Force idle if frozen until unfrozen or stopped.
185                  */
186                 if (thr->flags & HAMMER2_THREAD_FROZEN) {
187                         lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
188                         continue;
189                 }
190
191                 /*
192                  * Reset state on REMASTER request
193                  */
194                 if (thr->flags & HAMMER2_THREAD_REMASTER) {
195                         atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
196                         /* reset state */
197                 }
198
199                 /*
200                  * Synchronization scan.
201                  */
202                 hammer2_trans_init(&thr->trans, pmp, HAMMER2_TRANS_KEEPMODIFY);
203                 hammer2_inode_lock(pmp->iroot, HAMMER2_RESOLVE_ALWAYS);
204                 cparent = hammer2_inode_cluster(pmp->iroot,
205                                                 HAMMER2_RESOLVE_ALWAYS);
206                 hammer2_update_pfs_status(thr, cparent);
207                 hammer2_inode_unlock(pmp->iroot, NULL);
208                 bzero(errors, sizeof(errors));
209                 kprintf("sync_slaves clindex %d\n", thr->clindex);
210
211                 /*
212                  * We are the syncer, not a normal frontend operator,
213                  * so force cparent good to prime the scan.
214                  */
215                 hammer2_cluster_forcegood(cparent);
216                 error = hammer2_sync_slaves(thr, cparent, errors);
217                 if (error)
218                         kprintf("hammer2_sync_slaves: error %d\n", error);
219                 chain = cparent->array[thr->clindex].chain;
220
221                 /*
222                  * Retain chain for our node and release the cluster.
223                  */
224                 hammer2_chain_ref(chain);
225                 hammer2_chain_lock(chain, HAMMER2_RESOLVE_ALWAYS);
226                 hammer2_cluster_unlock(cparent);
227                 hammer2_cluster_drop(cparent);
228
229                 /*
230                  * Flush the chain.
231                  */
232                 hammer2_flush(&thr->trans, chain, 1);
233                 hammer2_chain_unlock(chain);
234                 hammer2_chain_drop(chain);
235
236                 hammer2_trans_done(&thr->trans);
237
238                 /*
239                  * Wait for event, or 5-second poll.
240                  */
241                 lksleep(&thr->flags, &thr->lk, 0, "h2idle", hz * 5);
242         }
243         thr->td = NULL;
244         wakeup(thr);
245         lockmgr(&thr->lk, LK_RELEASE);
246         /* thr structure can go invalid after this point */
247 }
248
249 /*
250  * Given a locked cluster created from pmp->iroot, update the PFS's
251  * reporting status.
252  */
253 static
254 void
255 hammer2_update_pfs_status(hammer2_thread_t *thr, hammer2_cluster_t *cparent)
256 {
257         hammer2_pfs_t *pmp = thr->pmp;
258         uint32_t flags;
259
260         flags = cparent->flags & HAMMER2_CLUSTER_ZFLAGS;
261         if (pmp->flags == flags)
262                 return;
263         pmp->flags = flags;
264
265         kprintf("pfs %p", pmp);
266         if (flags & HAMMER2_CLUSTER_MSYNCED)
267                 kprintf(" masters-all-good");
268         if (flags & HAMMER2_CLUSTER_SSYNCED)
269                 kprintf(" slaves-all-good");
270
271         if (flags & HAMMER2_CLUSTER_WRHARD)
272                 kprintf(" quorum/rw");
273         else if (flags & HAMMER2_CLUSTER_RDHARD)
274                 kprintf(" quorum/ro");
275
276         if (flags & HAMMER2_CLUSTER_UNHARD)
277                 kprintf(" out-of-sync-masters");
278         else if (flags & HAMMER2_CLUSTER_NOHARD)
279                 kprintf(" no-masters-visible");
280
281         if (flags & HAMMER2_CLUSTER_WRSOFT)
282                 kprintf(" soft/rw");
283         else if (flags & HAMMER2_CLUSTER_RDSOFT)
284                 kprintf(" soft/ro");
285
286         if (flags & HAMMER2_CLUSTER_UNSOFT)
287                 kprintf(" out-of-sync-slaves");
288         else if (flags & HAMMER2_CLUSTER_NOSOFT)
289                 kprintf(" no-slaves-visible");
290         kprintf("\n");
291 }
292
293 static
294 void
295 dumpcluster(const char *label,
296             hammer2_cluster_t *cparent, hammer2_cluster_t *cluster)
297 {
298         hammer2_chain_t *chain;
299         int i;
300
301         if ((hammer2_debug & 1) == 0)
302                 return;
303
304         kprintf("%s\t", label);
305         KKASSERT(cparent->nchains == cluster->nchains);
306         for (i = 0; i < cparent->nchains; ++i) {
307                 if (i)
308                         kprintf("\t");
309                 kprintf("%d ", i);
310                 if ((chain = cparent->array[i].chain) != NULL) {
311                         kprintf("%016jx%s ",
312                                 chain->bref.key,
313                                 ((cparent->array[i].flags &
314                                   HAMMER2_CITEM_INVALID) ? "(I)" : "   ")
315                         );
316                 } else {
317                         kprintf("      NULL      %s ", "   ");
318                 }
319                 if ((chain = cluster->array[i].chain) != NULL) {
320                         kprintf("%016jx%s ",
321                                 chain->bref.key,
322                                 ((cluster->array[i].flags &
323                                   HAMMER2_CITEM_INVALID) ? "(I)" : "   ")
324                         );
325                 } else {
326                         kprintf("      NULL      %s ", "   ");
327                 }
328                 kprintf("\n");
329         }
330 }
331
332 /*
333  * TODO - have cparent use a shared lock normally instead of exclusive,
334  *        (needs to be upgraded for slave adjustments).
335  */
336 static
337 int
338 hammer2_sync_slaves(hammer2_thread_t *thr, hammer2_cluster_t *cparent,
339                     int *errors)
340 {
341         hammer2_pfs_t *pmp;
342         hammer2_cluster_t *cluster;
343         hammer2_cluster_t *scluster;
344         hammer2_chain_t *focus;
345         hammer2_chain_t *chain;
346         hammer2_key_t key_next;
347         int error;
348         int nerror;
349         int idx;
350         int n;
351         int nowork;
352         int dorecursion;
353
354         pmp = thr->pmp;
355         idx = thr->clindex;     /* cluster node we are responsible for */
356
357         /*
358          * Nothing to do if all slaves are synchronized.
359          * Nothing to do if cluster not authoritatively readable.
360          */
361         if (pmp->flags & HAMMER2_CLUSTER_SSYNCED)
362                 return(0);
363         if ((pmp->flags & HAMMER2_CLUSTER_RDHARD) == 0)
364                 return(HAMMER2_ERROR_INCOMPLETE);
365
366         error = 0;
367
368         /*
369          * XXX snapshot the source to provide a stable source to copy.
370          */
371
372         /*
373          * Update all local slaves (remote slaves are handled by the sync
374          * threads on their respective hosts).
375          *
376          * Do a full topology scan, insert/delete elements on slaves as
377          * needed.  cparent must be ref'd so we can unlock and relock it
378          * on the recursion.
379          *
380          * ALLNODES - Allows clusters with a NULL focus to be returned if
381          *            elements remain on other nodes.
382          */
383         hammer2_cluster_ref(cparent);
384         cluster = hammer2_cluster_lookup(cparent, &key_next,
385                                          HAMMER2_KEY_MIN, HAMMER2_KEY_MAX,
386                                          HAMMER2_LOOKUP_NODATA |
387                                          HAMMER2_LOOKUP_NOLOCK |
388                                          HAMMER2_LOOKUP_NODIRECT |
389                                          HAMMER2_LOOKUP_ALLNODES);
390         dumpcluster("lookup", cparent, cluster);
391
392         /*
393          * Scan elements
394          */
395         while (cluster) {
396                 /*
397                  * nowork is adjusted during the loop,
398                  * dorecursion is calculated here.
399                  */
400                 nowork = 1;
401                 focus = cluster->focus;
402                 if (focus && focus->bref.type == HAMMER2_BREF_TYPE_INODE)
403                         dorecursion = 1;
404                 else
405                         dorecursion = 0;
406
407                 if (idx == 3 && (hammer2_debug & 1) && focus)
408                         kprintf("scan3 focus %d.%016jx %d.%016jx\n",
409                             (cparent ? cparent->focus->bref.type : 0xFF),
410                             (cparent ? cparent->focus->bref.key : (uintmax_t)-1LLU),
411                             focus->bref.type, focus->bref.key);
412 repeat1:
413                 /*
414                  * Synchronize chains to focus
415                  */
416                 if (idx >= cluster->nchains)
417                         goto skip1;
418                 chain = cluster->array[idx].chain;
419                 if (idx == 3 && (hammer2_debug & 1) && chain)
420                         kprintf("scan3 slave %d.%016jx %d.%016jx\n",
421                             ((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.type : 0xFF),
422                             ((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.key : (uintmax_t)-1LLU),
423                             cluster->array[idx].chain->bref.type,
424                             cluster->array[idx].chain->bref.key);
425                 if (idx == 3 && (hammer2_debug & 1) && chain == NULL)
426                         kprintf("scan3 slave %d.%16jx NULL\n",
427                             ((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.type : 0xFF),
428                             ((cparent && cparent->array[idx].chain) ? cparent->array[idx].chain->bref.key : (uintmax_t)-1LLU)
429                         );
430
431                 /*
432                  * Disable recursion for this index and loop up
433                  * if a chain error is detected.
434                  *
435                  * A NULL chain is ok, it simply indicates that
436                  * the slave reached the end of its scan, but we
437                  * might have stuff from the master that still
438                  * needs to be copied in.
439                  */
440                 if (chain && chain->error) {
441                         kprintf("chain error index %d: %d\n",
442                                 idx, chain->error);
443                         errors[idx] = chain->error;
444                         error = chain->error;
445                         cluster->array[idx].flags |= HAMMER2_CITEM_INVALID;
446                         goto skip1;
447                 }
448
449                 /*
450                  * Skip if the slave already has the record (everything
451                  * matches including the modify_tid).  Note that the
452                  * mirror_tid does not have to match, mirror_tid is
453                  * a per-block-device entity.
454                  */
455                 if (chain &&
456                     (cluster->array[idx].flags & HAMMER2_CITEM_INVALID) == 0) {
457                         goto skip1;
458                 }
459
460                 /*
461                  * Invalid element needs to be updated.
462                  */
463                 nowork = 0;
464
465                 /*
466                  * Otherwise adjust the slave.  Compare the focus to
467                  * the chain.  Note that focus and chain can
468                  * independently be NULL.
469                  */
470                 KKASSERT(cluster->focus == focus);
471                 if (focus) {
472                         if (chain)
473                                 n = hammer2_chain_cmp(focus, chain);
474                         else
475                                 n = -1; /* end-of-scan on slave */
476                 } else {
477                         if (chain)
478                                 n = 1;  /* end-of-scan on focus */
479                         else
480                                 n = 0;  /* end-of-scan on both */
481                 }
482
483                 if (n < 0) {
484                         /*
485                          * slave chain missing, create missing chain.
486                          *
487                          * If we are going to recurse we have to set
488                          * the initial modify_tid to 0 until the
489                          * sub-tree is completely synchronized.
490                          * Setting (n = 0) in this situation forces
491                          * the replacement call to run on the way
492                          * back up after the sub-tree has
493                          * synchronized.
494                          */
495                         if (dorecursion) {
496                                 nerror = hammer2_sync_insert(
497                                                 thr, cparent, cluster,
498                                                 0,
499                                                 idx, errors);
500                                 if (nerror == 0)
501                                         n = 0;
502                         } else {
503                                 nerror = hammer2_sync_insert(
504                                                 thr, cparent, cluster,
505                                                 focus->bref.modify_tid,
506                                                 idx, errors);
507                         }
508                 } else if (n > 0) {
509                         /*
510                          * excess slave chain, destroy
511                          */
512                         nerror = hammer2_sync_destroy(thr,
513                                                       cparent, cluster,
514                                                       idx, errors);
515                         hammer2_cluster_next_single_chain(
516                                 cparent, cluster,
517                                 &key_next,
518                                 HAMMER2_KEY_MIN,
519                                 HAMMER2_KEY_MAX,
520                                 idx,
521                                 HAMMER2_LOOKUP_NODATA |
522                                 HAMMER2_LOOKUP_NOLOCK |
523                                 HAMMER2_LOOKUP_NODIRECT |
524                                 HAMMER2_LOOKUP_ALLNODES);
525                         /*
526                          * Re-execute same index, there might be more
527                          * items to delete before this slave catches
528                          * up to the focus.
529                          */
530                         goto repeat1;
531                 } else {
532                         /*
533                          * Key matched but INVALID was set which likely
534                          * means that modify_tid is out of sync.
535                          *
536                          * If we are going to recurse we have to do
537                          * a partial replacement of the parent to
538                          * ensure that the block array is compatible.
539                          * For example, the current slave inode might
540                          * be flagged DIRECTDATA when the focus is not.
541                          * We must set modify_tid to 0 for now and
542                          * will fix it when recursion is complete.
543                          *
544                          * If we are not going to recurse we can do
545                          * a normal replacement.
546                          *
547                          * focus && chain can both be NULL on a match.
548                          */
549                         if (dorecursion) {
550                                 nerror = hammer2_sync_replace(
551                                                 thr, cparent, cluster,
552                                                 0,
553                                                 idx, errors);
554                         } else if (focus) {
555                                 nerror = hammer2_sync_replace(
556                                                 thr, cparent, cluster,
557                                                 focus->bref.modify_tid,
558                                                 idx, errors);
559                         } else {
560                                 nerror = 0;
561                         }
562                 }
563                 if (nerror)
564                         error = nerror;
565                 /* finished primary synchronization of chains */
566
567 skip1:
568 #if 0
569                 /*
570                  * Operation may have modified cparent, we must replace
571                  * iroot->cluster if we are at the top level.
572                  */
573                 if (thr->depth == 0)
574                         hammer2_inode_repoint_one(pmp->iroot, cparent, idx);
575 #endif
576                 KKASSERT(cluster->focus == focus);
577
578                 /*
579                  * If no work to do this iteration, skip any recursion.
580                  */
581                 if (nowork)
582                         goto skip2;
583
584                 /*
585                  * EXECUTE RECURSION (skip if no recursion)
586                  *
587                  * Indirect blocks are absorbed by the iteration so we only
588                  * have to recurse on inodes.
589                  *
590                  * Do not resolve scluster, it represents the iteration
591                  * parent and while it is logically in-sync the physical
592                  * elements might not match due to the presence of indirect
593                  * blocks and such.
594                  */
595                 if (dorecursion == 0)
596                         goto skip2;
597                 if (thr->depth > 20) {
598                         kprintf("depth limit reached\n");
599                         nerror = HAMMER2_ERROR_DEPTH;
600                 } else {
601                         hammer2_cluster_unlock(cparent);
602                         scluster = hammer2_cluster_copy(cluster);
603                         hammer2_cluster_lock(scluster, HAMMER2_RESOLVE_ALWAYS);
604                         ++thr->depth;
605                         nerror = hammer2_sync_slaves(thr, scluster, errors);
606                         --thr->depth;
607                         hammer2_cluster_unlock(scluster);
608                         hammer2_cluster_drop(scluster);
609                         /* XXX modify_tid on scluster */
610                         /* flush needs to not update modify_tid */
611                         hammer2_cluster_lock(cparent, HAMMER2_RESOLVE_ALWAYS);
612                 }
613                 if (nerror)
614                         goto skip2;
615
616                 /*
617                  * Fixup parent nodes on the way back up from the recursion
618                  * if no error occurred.  The modify_tid for these nodes
619                  * would have been set to 0 and must be set to their final
620                  * value.
621                  */
622                 chain = cluster->array[idx].chain;
623                 if (chain == NULL || chain->error)
624                         goto skip2;
625                 /*
626                  * should not be set but must fixup parents.
627                 if ((cluster->array[idx].flags & HAMMER2_CITEM_INVALID) == 0)
628                         goto skip2;
629                 */
630
631                 /*
632                  * At this point we have to have key-matched non-NULL
633                  * elements.
634                  */
635                 n = hammer2_chain_cmp(focus, chain);
636                 if (n != 0) {
637                         kprintf("hammer2_sync_slaves: illegal "
638                                 "post-recursion state %d\n", n);
639                         goto skip2;
640                 }
641
642                 /*
643                  * Update modify_tid on the way back up.
644                  */
645                 nerror = hammer2_sync_replace(
646                                 thr, cparent, cluster,
647                                 focus->bref.modify_tid,
648                                 idx, errors);
649                 if (nerror)
650                         error = nerror;
651
652 #if 0
653                 /*
654                  * Operation may modify cparent, must replace
655                  * iroot->cluster if we are at the top level.
656                  */
657                 if (thr->depth == 0)
658                         hammer2_inode_repoint_one(pmp->iroot, cparent, idx);
659 #endif
660
661 skip2:
662                 /*
663                  * Iterate.
664                  */
665                 dumpcluster("adjust", cparent, cluster);
666                 cluster = hammer2_cluster_next(cparent, cluster,
667                                                &key_next,
668                                                HAMMER2_KEY_MIN,
669                                                HAMMER2_KEY_MAX,
670                                                HAMMER2_LOOKUP_NODATA |
671                                                HAMMER2_LOOKUP_NOLOCK |
672                                                HAMMER2_LOOKUP_NODIRECT |
673                                                HAMMER2_LOOKUP_ALLNODES);
674                 dumpcluster("nextcl", cparent, cluster);
675         }
676         hammer2_cluster_drop(cparent);
677         if (cluster)
678                 hammer2_cluster_drop(cluster);
679
680         return error;
681 }
682
683 /*
684  * cparent is locked exclusively, with an extra ref, cluster is not locked.
685  */
686 static
687 int
688 hammer2_sync_insert(hammer2_thread_t *thr,
689                     hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
690                     hammer2_tid_t modify_tid, int i, int *errors)
691 {
692         hammer2_chain_t *focus;
693         hammer2_chain_t *chain;
694         hammer2_key_t dummy;
695
696         focus = cluster->focus;
697 #if HAMMER2_THREAD_DEBUG
698         if (hammer2_debug & 1)
699         kprintf("insert rec par=%p/%d.%016jx slave %d %d.%016jx mod=%016jx\n",
700                 cparent->array[i].chain, 
701                 cparent->array[i].chain->bref.type,
702                 cparent->array[i].chain->bref.key,
703                 i, focus->bref.type, focus->bref.key, modify_tid);
704 #endif
705
706         /*
707          * We have to do a lookup to position ourselves at the correct
708          * parent when inserting a record into a new slave because the
709          * cluster iteration for this slave might not be pointing to the
710          * right place.  Our expectation is that the record will not be
711          * found.
712          */
713         hammer2_cluster_unlock_except(cparent, i);
714         chain = hammer2_chain_lookup(&cparent->array[i].chain, &dummy,
715                                      focus->bref.key, focus->bref.key,
716                                      &cparent->array[i].cache_index,
717                                      HAMMER2_LOOKUP_NODIRECT);
718         if (cparent->focus_index == i)
719                 cparent->focus = cparent->array[i].chain;
720         KKASSERT(chain == NULL);
721
722         /*
723          * Create the missing chain.
724          *
725          * Have to be careful to avoid deadlocks.
726          */
727         chain = NULL;
728         if (cluster->focus_index < i)
729                 hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
730         hammer2_chain_create(&thr->trans, &cparent->array[i].chain,
731                              &chain, thr->pmp,
732                              focus->bref.key, focus->bref.keybits,
733                              focus->bref.type, focus->bytes,
734                              0);
735         if (cluster->focus_index > i)
736                 hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
737         if (cparent->focus_index == i)
738                 cparent->focus = cparent->array[i].chain;
739         hammer2_chain_modify(&thr->trans, chain, 0);
740
741         /*
742          * Copy focus to new chain
743          */
744
745         /* type already set */
746         chain->bref.methods = focus->bref.methods;
747         /* keybits already set */
748         chain->bref.vradix = focus->bref.vradix;
749         /* mirror_tid set by flush */
750         chain->bref.modify_tid = modify_tid;
751         chain->bref.flags = focus->bref.flags;
752         /* key already present */
753         /* check code will be recalculated */
754
755         /*
756          * Copy data body.
757          */
758         switch(chain->bref.type) {
759         case HAMMER2_BREF_TYPE_INODE:
760                 if ((focus->data->ipdata.meta.op_flags &
761                      HAMMER2_OPFLAG_DIRECTDATA) == 0) {
762                         bcopy(focus->data, chain->data,
763                               offsetof(hammer2_inode_data_t, u));
764                         break;
765                 }
766                 /* fall through */
767         case HAMMER2_BREF_TYPE_DATA:
768                 bcopy(focus->data, chain->data, chain->bytes);
769                 hammer2_chain_setcheck(chain, chain->data);
770                 break;
771         default:
772                 KKASSERT(0);
773                 break;
774         }
775
776         hammer2_chain_unlock(focus);
777         hammer2_chain_unlock(chain);            /* unlock, leave ref */
778
779         /*
780          * Avoid ordering deadlock when relocking cparent.
781          */
782         if (i == 0) {
783                 hammer2_cluster_lock_except(cparent, i, HAMMER2_RESOLVE_ALWAYS);
784         } else {
785                 hammer2_chain_unlock(cparent->array[i].chain);
786                 hammer2_cluster_lock(cparent, HAMMER2_RESOLVE_ALWAYS);
787         }
788
789         /*
790          * Enter item into (unlocked) cluster.
791          *
792          * Must clear invalid for iteration to work properly.
793          */
794         if (cluster->array[i].chain)
795                 hammer2_chain_drop(cluster->array[i].chain);
796         cluster->array[i].chain = chain;
797         cluster->array[i].flags &= ~HAMMER2_CITEM_INVALID;
798
799         return 0;
800 }
801
802 /*
803  * cparent is locked exclusively, with an extra ref, cluster is not locked.
804  */
805 static
806 int
807 hammer2_sync_destroy(hammer2_thread_t *thr,
808                      hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
809                      int i, int *errors)
810 {
811         hammer2_chain_t *chain;
812
813         chain = cluster->array[i].chain;
814 #if HAMMER2_THREAD_DEBUG
815         if (hammer2_debug & 1)
816         kprintf("destroy rec %p/%p slave %d %d.%016jx\n",
817                 cparent, cluster,
818                 i, chain->bref.type, chain->bref.key);
819 #endif
820         /*
821          * Try to avoid unnecessary I/O.
822          *
823          * XXX accounting not propagated up properly.  We might have to do
824          *     a RESOLVE_MAYBE here and pass 0 for the flags.
825          */
826         hammer2_chain_lock(chain, HAMMER2_RESOLVE_NEVER);
827         hammer2_chain_delete(&thr->trans, cparent->array[i].chain, chain,
828                              HAMMER2_DELETE_NOSTATS |
829                              HAMMER2_DELETE_PERMANENT);
830         hammer2_chain_unlock(chain);
831
832         /*
833          * The element is not valid in that it doesn't match the other
834          * elements, but we have to mark it valid here to allow the
835          * cluster_next() call to advance this index to the next element.
836          */
837         cluster->array[i].flags &= ~HAMMER2_CITEM_INVALID;
838
839         return 0;
840 }
841
842 /*
843  * cparent is locked exclusively, with an extra ref, cluster is not locked.
844  * Replace element [i] in the cluster.
845  */
846 static
847 int
848 hammer2_sync_replace(hammer2_thread_t *thr,
849                      hammer2_cluster_t *cparent, hammer2_cluster_t *cluster,
850                      hammer2_tid_t modify_tid, int i, int *errors)
851 {
852         hammer2_chain_t *focus;
853         hammer2_chain_t *chain;
854         int nradix;
855         uint8_t otype;
856
857         focus = cluster->focus;
858         chain = cluster->array[i].chain;
859 #if HAMMER2_THREAD_DEBUG
860         if (hammer2_debug & 1)
861         kprintf("replace rec %p/%p slave %d %d.%016jx mod=%016jx\n",
862                 cparent, cluster,
863                 i, focus->bref.type, focus->bref.key, modify_tid);
864 #endif
865         if (cluster->focus_index < i)
866                 hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
867         hammer2_chain_lock(chain, HAMMER2_RESOLVE_ALWAYS);
868         if (cluster->focus_index >= i)
869                 hammer2_chain_lock(focus, HAMMER2_RESOLVE_ALWAYS);
870         if (chain->bytes != focus->bytes) {
871                 /* XXX what if compressed? */
872                 nradix = hammer2_getradix(chain->bytes);
873                 hammer2_chain_resize(&thr->trans, NULL,
874                                      cparent->array[i].chain, chain,
875                                      nradix, 0);
876         }
877         hammer2_chain_modify(&thr->trans, chain, 0);
878         otype = chain->bref.type;
879         chain->bref.type = focus->bref.type;
880         chain->bref.methods = focus->bref.methods;
881         chain->bref.keybits = focus->bref.keybits;
882         chain->bref.vradix = focus->bref.vradix;
883         /* mirror_tid updated by flush */
884         chain->bref.modify_tid = modify_tid;
885         chain->bref.flags = focus->bref.flags;
886         /* key already present */
887         /* check code will be recalculated */
888         chain->error = 0;
889
890         /*
891          * Copy data body.
892          */
893         switch(chain->bref.type) {
894         case HAMMER2_BREF_TYPE_INODE:
895                 if ((focus->data->ipdata.meta.op_flags &
896                      HAMMER2_OPFLAG_DIRECTDATA) == 0) {
897                         /*
898                          * If DIRECTDATA is transitioning to 0 or the old
899                          * chain is not an inode we have to initialize
900                          * the block table.
901                          */
902                         if (otype != HAMMER2_BREF_TYPE_INODE ||
903                             (chain->data->ipdata.meta.op_flags &
904                              HAMMER2_OPFLAG_DIRECTDATA)) {
905                                 kprintf("chain inode trans away from dd\n");
906                                 bzero(&chain->data->ipdata.u,
907                                       sizeof(chain->data->ipdata.u));
908                         }
909                         bcopy(focus->data, chain->data,
910                               offsetof(hammer2_inode_data_t, u));
911                         /* XXX setcheck on inode should not be needed */
912                         hammer2_chain_setcheck(chain, chain->data);
913                         break;
914                 }
915                 /* fall through */
916         case HAMMER2_BREF_TYPE_DATA:
917                 bcopy(focus->data, chain->data, chain->bytes);
918                 hammer2_chain_setcheck(chain, chain->data);
919                 break;
920         default:
921                 KKASSERT(0);
922                 break;
923         }
924
925         hammer2_chain_unlock(focus);
926         hammer2_chain_unlock(chain);
927
928         /*
929          * Must clear invalid for iteration to work properly.
930          */
931         cluster->array[i].flags &= ~HAMMER2_CITEM_INVALID;
932
933         return 0;
934 }
935
936 /****************************************************************************
937  *                          HAMMER2 XOPS THREADS                            *
938  ****************************************************************************/
939
940 void
941 hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
942 {
943         hammer2_mtx_init(&xgrp->mtx, "h2xopq");
944         xgrp->xop_tailp = &xgrp->marker.next;
945         xgrp->marker.refs = 0x7FFFFFFF;
946 }
947
948 /*
949  * Primary management thread for xops support.  Each node has several such
950  * threads which replicate front-end operations on cluster nodes.
951  *
952  * XOPS thread node operations, allowing the function to focus on a single
953  * node in the cluster after validating the operation with the cluster.
954  * This is primarily what prevents dead or stalled nodes from stalling
955  * the front-end.
956  */
957 void
958 hammer2_primary_xops_thread(void *arg)
959 {
960         hammer2_thread_t *thr = arg;
961         hammer2_pfs_t *pmp;
962         hammer2_xop_t *xop;
963         hammer2_xop_t *prev;
964         hammer2_xop_group_t *xgrp;
965
966         pmp = thr->pmp;
967         xgrp = &pmp->xop_groups[thr->repidx];
968         prev = &xgrp->marker;
969
970         lockmgr(&thr->lk, LK_EXCLUSIVE);
971         while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
972                 /*
973                  * Handle freeze request
974                  */
975                 if (thr->flags & HAMMER2_THREAD_FREEZE) {
976                         atomic_set_int(&thr->flags, HAMMER2_THREAD_FROZEN);
977                         atomic_clear_int(&thr->flags, HAMMER2_THREAD_FREEZE);
978                 }
979
980                 /*
981                  * Force idle if frozen until unfrozen or stopped.
982                  */
983                 if (thr->flags & HAMMER2_THREAD_FROZEN) {
984                         lksleep(&thr->flags, &thr->lk, 0, "frozen", 0);
985                         continue;
986                 }
987
988                 /*
989                  * Reset state on REMASTER request
990                  */
991                 if (thr->flags & HAMMER2_THREAD_REMASTER) {
992                         atomic_clear_int(&thr->flags, HAMMER2_THREAD_REMASTER);
993                         /* reset state */
994                 }
995
996                 /*
997                  * Process requests.  All requests are persistent until the
998                  * last thread has processed it.
999                  */
1000                 kprintf("xops_slave clindex %d\n", thr->clindex);
1001
1002                 while ((xop = prev->next) != NULL) {
1003                         if (atomic_fetchadd_int(&prev->refs, -1) == 1) {
1004                                 KKASSERT(prev == xgrp->marker.next);
1005                                 xgrp->marker.next = xop;
1006                                 objcache_put(cache_xops, prev);
1007                         }
1008                         xop->func(thr, xop);
1009                         prev = xop;
1010                 }
1011
1012                 /*
1013                  * Wait for event.
1014                  */
1015                 lksleep(&thr->flags, &thr->lk, 0, "h2idle", 0);
1016         }
1017         thr->td = NULL;
1018         wakeup(thr);
1019         lockmgr(&thr->lk, LK_RELEASE);
1020         /* thr structure can go invalid after this point */
1021 }