/* sys/vfs/hammer2/hammer2_admin.c */
/*
 * Copyright (c) 2015 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module implements the hammer2 helper thread API, including
 * the frontend/backend XOP API.
 */
#include "hammer2.h"

/*
 * Signal that the thread has work.
 */
void
hammer2_thr_signal(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if (oflags & HAMMER2_THREAD_WAITING) {
                        if (atomic_cmpset_int(&thr->flags, oflags,
                                  (oflags | flags) & ~HAMMER2_THREAD_WAITING)) {
                                wakeup(&thr->flags);
                                break;
                        }
                } else {
                        if (atomic_cmpset_int(&thr->flags, oflags,
                                              oflags | flags)) {
                                break;
                        }
                }
        }
}

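/*
 * The waiter side of this protocol (see hammer2_primary_xops_thread below)
 * sets HAMMER2_THREAD_WAITING under a tsleep interlock before sleeping on
 * &thr->flags, so a signal cannot be lost between the flag test and the
 * tsleep.  A minimal sketch of the waiter pattern, for illustration only;
 * WORK_PENDING_FLAGS and the "h2wait" wmesg are placeholders, not real
 * definitions:
 */
#if 0
        for (;;) {
                flags = thr->flags;
                cpu_ccfence();
                if (flags & WORK_PENDING_FLAGS)         /* placeholder bits */
                        break;
                nflags = flags | HAMMER2_THREAD_WAITING;
                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, flags, nflags))
                        tsleep(&thr->flags, PINTERLOCKED, "h2wait", hz);
        }
#endif
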
/*
 * Return status to waiting client(s)
 */
void
hammer2_thr_return(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                nflags = (oflags | flags) & ~HAMMER2_THREAD_CLIENTWAIT;

                if (oflags & HAMMER2_THREAD_CLIENTWAIT) {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                                wakeup(thr);
                                break;
                        }
                } else {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags))
                                break;
                }
        }
}

/*
 * Wait until the bits in flags are set.
 */
void
hammer2_thr_wait(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if ((oflags & flags) == flags)
                        break;
                nflags = oflags | HAMMER2_THREAD_CLIENTWAIT;
                tsleep_interlock(thr, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        tsleep(thr, PINTERLOCKED, "h2twait", hz*60);
                }
        }
}

/*
 * Wait until the bits in flags are clear.
 */
void
hammer2_thr_wait_neg(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if ((oflags & flags) == 0)
                        break;
                nflags = oflags | HAMMER2_THREAD_CLIENTWAIT;
                tsleep_interlock(thr, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        tsleep(thr, PINTERLOCKED, "h2twait", hz*60);
                }
        }
}

/*
 * Initialize the supplied thread structure, starting the specified
 * thread.
 */
void
hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
                   const char *id, int clindex, int repidx,
                   void (*func)(void *arg))
{
        thr->pmp = pmp;
        thr->clindex = clindex;
        thr->repidx = repidx;
        TAILQ_INIT(&thr->xopq);
        if (repidx >= 0) {
                lwkt_create(func, thr, &thr->td, NULL, 0, repidx % ncpus,
                            "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
        } else {
                lwkt_create(func, thr, &thr->td, NULL, 0, -1,
                            "%s-%s", id, pmp->pfs_names[clindex]);
        }
}

/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);
        hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);
        thr->pmp = NULL;
        KKASSERT(TAILQ_EMPTY(&thr->xopq));
}

/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER);
}

void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
        hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
}

void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
        hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
}

void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_UNFREEZE);
        hammer2_thr_wait_neg(thr, HAMMER2_THREAD_FROZEN);
}

int
hammer2_thr_break(hammer2_thread_t *thr)
{
        if (thr->flags & (HAMMER2_THREAD_STOP |
                          HAMMER2_THREAD_REMASTER |
                          HAMMER2_THREAD_FREEZE)) {
                return 1;
        }
        return 0;
}

/****************************************************************************
 *                          HAMMER2 XOPS API                                *
 ****************************************************************************/

void
hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
{
        /* no extra fields in structure at the moment */
}

/*
 * Allocate a XOP request.
 *
 * Once allocated a XOP request can be started, collected, and retired,
 * and can be retired early if desired.
 *
 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
 */
void *
hammer2_xop_alloc(hammer2_inode_t *ip, int flags)
{
        hammer2_xop_t *xop;

        xop = objcache_get(cache_xops, M_WAITOK);
        KKASSERT(xop->head.cluster.array[0].chain == NULL);

        xop->head.ip1 = ip;
        xop->head.func = NULL;
        xop->head.flags = flags;
        xop->head.state = 0;
        xop->head.error = 0;
        xop->head.collect_key = 0;
        xop->head.check_counter = 0;
        if (flags & HAMMER2_XOP_MODIFYING)
                xop->head.mtid = hammer2_trans_sub(ip->pmp);
        else
                xop->head.mtid = 0;

        xop->head.cluster.nchains = ip->cluster.nchains;
        xop->head.cluster.pmp = ip->pmp;
        xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;

        /*
         * run_mask - Active thread (or frontend) associated with XOP
         */
        xop->head.run_mask = HAMMER2_XOPMASK_VOP;

        hammer2_inode_ref(ip);

        return xop;
}

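/*
 * Illustrative sketch of the typical frontend life cycle of a XOP (not
 * part of the build).  The nresolve xop type and backend function shown
 * here are just one example taken from the VOP code; dip, name and
 * name_len stand in for the caller's directory inode and name.  Other
 * frontends follow the same alloc/setname/start/collect/retire pattern.
 */
#if 0
        hammer2_xop_nresolve_t *xop;
        int error;

        xop = hammer2_xop_alloc(dip, 0);
        hammer2_xop_setname(&xop->head, name, name_len);
        hammer2_xop_start(&xop->head, hammer2_xop_nresolve);

        error = hammer2_xop_collect(&xop->head, 0);
        /* ... use xop->head.cluster on success ... */
        hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
#endif
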
void
hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
        xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name1_len = name_len;
        bcopy(name, xop->name1, name_len);
}

void
hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
        xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name2_len = name_len;
        bcopy(name, xop->name2, name_len);
}

size_t
hammer2_xop_setname_inum(hammer2_xop_head_t *xop, hammer2_key_t inum)
{
        const size_t name_len = 18;

        xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name1_len = name_len;
        ksnprintf(xop->name1, name_len + 1, "0x%016jx", (intmax_t)inum);

        return name_len;
}

void
hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
{
        xop->ip2 = ip2;
        hammer2_inode_ref(ip2);
}

void
hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
{
        xop->ip3 = ip3;
        hammer2_inode_ref(ip3);
}

void
hammer2_xop_reinit(hammer2_xop_head_t *xop)
{
        xop->state = 0;
        xop->error = 0;
        xop->collect_key = 0;
        xop->run_mask = HAMMER2_XOPMASK_VOP;
}

/*
 * A mounted PFS needs Xops threads to support frontend operations.
 */
void
hammer2_xop_helper_create(hammer2_pfs_t *pmp)
{
        int i;
        int j;

        lockmgr(&pmp->lock, LK_EXCLUSIVE);
        pmp->has_xop_threads = 1;

        for (i = 0; i < pmp->iroot->cluster.nchains; ++i) {
                for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
                        if (pmp->xop_groups[j].thrs[i].td)
                                continue;
                        hammer2_thr_create(&pmp->xop_groups[j].thrs[i], pmp,
                                           "h2xop", i, j,
                                           hammer2_primary_xops_thread);
                }
        }
        lockmgr(&pmp->lock, LK_RELEASE);
}

void
hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
{
        int i;
        int j;

        for (i = 0; i < pmp->pfs_nmasters; ++i) {
                for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
                        if (pmp->xop_groups[j].thrs[i].td)
                                hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
                }
        }
}

/*
 * Start a XOP request, queueing it to all nodes in the cluster to
 * execute the cluster op.
 *
 * XXX optimize single-target case.
 */
void
hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_func_t func,
                         int notidx)
{
        hammer2_inode_t *ip1;
        hammer2_pfs_t *pmp;
        hammer2_thread_t *thr;
        int i;
        int ng;
        int nchains;

        ip1 = xop->ip1;
        pmp = ip1->pmp;
        if (pmp->has_xop_threads == 0)
                hammer2_xop_helper_create(pmp);

        /*
         * The intent of the XOP sequencer is to ensure that ops on the
         * same inode execute in the same order.  This is necessary when
         * issuing modifying operations to multiple targets because some
         * targets might get behind and the frontend is allowed to complete
         * the moment a quorum of targets succeed.
         *
         * Strategy operations must be segregated from non-strategy
         * operations to avoid a deadlock.  For example, if a vfsync and a
         * bread/bwrite were queued to the same worker thread, the locked
         * buffer in the strategy operation could deadlock the vfsync's
         * buffer list scan.  Strategy ops are therefore hashed into the
         * upper half of the xop groups and all other ops into the lower
         * half.
         *
         * TODO - RENAME fails here because it is potentially modifying
         *        three different inodes.
         */
        if (xop->flags & HAMMER2_XOP_STRATEGY) {
                hammer2_xop_strategy_t *xopst;

                xopst = &((hammer2_xop_t *)xop)->xop_strategy;
                ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)) ^
                           hammer2_icrc32(&xopst->lbase, sizeof(xopst->lbase)));
                ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
                ng += HAMMER2_XOPGROUPS / 2;
        } else {
                ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)));
                ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
        }
        xop->func = func;

        /*
         * The instant xop is queued another thread can pick it off.  In the
         * case of asynchronous ops, another thread might even finish and
         * deallocate it.
         */
        hammer2_spin_ex(&pmp->xop_spin);
        nchains = ip1->cluster.nchains;
        for (i = 0; i < nchains; ++i) {
                /*
                 * XXX ip1->cluster.array* not stable here.  This temporary
                 *     hack fixes basic issues in target XOPs which need to
                 *     obtain a starting chain from the inode but does not
                 *     address possible races against inode updates which
                 *     might NULL-out a chain.
                 */
                if (i != notidx && ip1->cluster.array[i].chain) {
                        thr = &pmp->xop_groups[ng].thrs[i];
                        atomic_set_int(&xop->run_mask, 1U << i);
                        atomic_set_int(&xop->chk_mask, 1U << i);
                        TAILQ_INSERT_TAIL(&thr->xopq, xop, collect[i].entry);
                }
        }
        hammer2_spin_unex(&pmp->xop_spin);
        /* xop can become invalid at this point */

        /*
         * Each thread has its own xopq
         */
        for (i = 0; i < nchains; ++i) {
                if (i != notidx) {
                        thr = &pmp->xop_groups[ng].thrs[i];
                        hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
                }
        }
}

void
hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_func_t func)
{
        hammer2_xop_start_except(xop, func, -1);
}

/*
 * Retire a XOP.  Used by both the VOP frontend and by the XOP backend.
 */
void
hammer2_xop_retire(hammer2_xop_head_t *xop, uint32_t mask)
{
        hammer2_chain_t *chain;
        uint32_t nmask;
        int i;

        /*
         * Remove the frontend collector or remove a backend feeder.
         * When removing the frontend we must wakeup any backend feeders
         * who are waiting for FIFO space.
         *
         * XXX optimize wakeup.
         */
        KKASSERT(xop->run_mask & mask);
        nmask = atomic_fetchadd_int(&xop->run_mask, -mask);
        if ((nmask & ~HAMMER2_XOPMASK_FIFOW) != mask) {
                if (mask == HAMMER2_XOPMASK_VOP) {
                        if (nmask & HAMMER2_XOPMASK_FIFOW)
                                wakeup(xop);
                }
                return;
        }
        /* else nobody else left, we can ignore FIFOW */

        /*
         * All collectors are gone, we can cleanup and dispose of the XOP.
         * Note that this can wind up being a frontend OR a backend.
         * Pending chains are locked shared and not owned by any thread.
         */
#if 0
        /*
         * Cache the terminating cluster.
         */
        hammer2_inode_t *ip;
        if ((ip = xop->ip1) != NULL) {
                hammer2_cluster_t *tmpclu;

                tmpclu = hammer2_cluster_copy(&xop->cluster);
                hammer2_spin_ex(&ip->cluster_spin);
                tmpclu = atomic_swap_ptr((volatile void **)&ip->cluster_cache,
                                         tmpclu);
                hammer2_spin_unex(&ip->cluster_spin);
                if (tmpclu)
                        hammer2_cluster_drop(tmpclu);
        }
#endif

        /*
         * Cleanup the collection cluster.
         */
        for (i = 0; i < xop->cluster.nchains; ++i) {
                xop->cluster.array[i].flags = 0;
                chain = xop->cluster.array[i].chain;
                if (chain) {
                        xop->cluster.array[i].chain = NULL;
                        hammer2_chain_drop_unhold(chain);
                }
        }

        /*
         * Cleanup the fifos, using chk_mask to limit the loop to feeders
         * that were actually attached.  Since we are the only entity left
         * on this xop we don't have to worry about fifo flow control, and
         * one lfence() will do the job.
         */
        cpu_lfence();
        mask = xop->chk_mask;
        for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
                hammer2_xop_fifo_t *fifo = &xop->collect[i];
                while (fifo->ri != fifo->wi) {
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        if (chain)
                                hammer2_chain_drop_unhold(chain);
                        ++fifo->ri;
                }
                mask &= ~(1U << i);
        }

        /*
         * The inode is only held at this point, simply drop it.
         */
        if (xop->ip1) {
                hammer2_inode_drop(xop->ip1);
                xop->ip1 = NULL;
        }
        if (xop->ip2) {
                hammer2_inode_drop(xop->ip2);
                xop->ip2 = NULL;
        }
        if (xop->ip3) {
                hammer2_inode_drop(xop->ip3);
                xop->ip3 = NULL;
        }
        if (xop->name1) {
                kfree(xop->name1, M_HAMMER2);
                xop->name1 = NULL;
                xop->name1_len = 0;
        }
        if (xop->name2) {
                kfree(xop->name2, M_HAMMER2);
                xop->name2 = NULL;
                xop->name2_len = 0;
        }

        objcache_put(cache_xops, xop);
}

/*
 * (Backend) Returns non-zero if the frontend is still attached.
 */
int
hammer2_xop_active(hammer2_xop_head_t *xop)
{
        if (xop->run_mask & HAMMER2_XOPMASK_VOP)
                return 1;
        else
                return 0;
}

/*
 * (Backend) Feed chain data through the cluster validator and back to
 * the frontend.  Chains are fed from multiple nodes concurrently
 * and pipelined via per-node FIFOs in the XOP.
 *
 * The chain must be locked (either shared or exclusive).  The caller may
 * unlock and drop the chain on return.  This function will add an extra
 * ref and hold the chain's data for the pass-back.
 *
 * No xop lock is needed because we are only manipulating fields under
 * our direct control.
 *
 * Returns 0 on success, or a hammer error code if synchronization with
 * the frontend is permanently lost.  In either case the caller retains
 * its own ref and lock on the chain; on success the fifo takes an
 * additional ref+hold of its own for the pass-back.
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
                 int clindex, int error)
{
        hammer2_xop_fifo_t *fifo;
        uint32_t mask;

        /*
         * Early termination (typically of xop_readdir)
         */
        if (hammer2_xop_active(xop) == 0) {
                error = EINTR;
                goto done;
        }

        /*
         * Multi-threaded entry into the XOP collector.  We own the
         * fifo->wi for our clindex.
         */
        fifo = &xop->collect[clindex];

        if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO)
                lwkt_yield();
        while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                atomic_set_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
                mask = xop->run_mask;
                if ((mask & HAMMER2_XOPMASK_VOP) == 0) {
                        error = EINTR;
                        goto done;
                }
                tsleep_interlock(xop, 0);
                if (atomic_cmpset_int(&xop->run_mask, mask,
                                      mask | HAMMER2_XOPMASK_FIFOW)) {
                        if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                                tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
                        }
                }
                /* retry */
        }
        atomic_clear_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
        if (chain)
                hammer2_chain_ref_hold(chain);
        if (error == 0 && chain)
                error = chain->error;
        fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
        fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
        cpu_sfence();
        ++fifo->wi;
        if (atomic_fetchadd_int(&xop->check_counter, HAMMER2_XOP_CHKINC) &
            HAMMER2_XOP_CHKWAIT) {
                atomic_clear_int(&xop->check_counter, HAMMER2_XOP_CHKWAIT);
                wakeup(&xop->check_counter);
        }
        error = 0;

        /*
         * Cleanup.  If an error occurred nothing was consumed; if no error
         * occurred the fifo took its own ref+hold on the chain above.
         *
         * The caller's ref and lock remain in both cases.
         */
done:
        return error;
}

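/*
 * Illustrative sketch of a backend feeder (not part of the build).  A
 * real backend function, e.g. in hammer2_xop.c, looks up the chains for
 * its cluster index, feeds each one to the frontend, then feeds a NULL
 * chain with the final error code to terminate its per-node stream.
 * The example_lookup_first/next helpers are schematic placeholders for
 * the actual chain lookup/iteration calls.
 */
#if 0
static void
hammer2_xop_example_backend(hammer2_xop_t *arg, int clindex)
{
        hammer2_xop_head_t *head = &arg->head;
        hammer2_chain_t *chain;
        int error = 0;

        for (chain = example_lookup_first(head, clindex);   /* schematic */
             chain;
             chain = example_lookup_next(head, clindex, chain)) {
                error = hammer2_xop_feed(head, chain, clindex, chain->error);
                hammer2_chain_unlock(chain);
                hammer2_chain_drop(chain);
                if (error)
                        break;
        }
        /* terminate the per-node stream, passing the final error code */
        hammer2_xop_feed(head, NULL, clindex, error);
}
#endif
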
/*
 * (Frontend) collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= collect_key.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT   - Used to 'poll' a completed collection,
 *                                usually called synchronously from the
 *                                node XOPs for the strategy code to
 *                                fake the frontend collection and complete
 *                                the BIO as soon as possible.
 *
 * HAMMER2_XOP_SYNCHRONIZER     - Request synchronization with a particular
 *                                cluster index, prevents looping when that
 *                                index is out of sync so caller can act on
 *                                the out of sync element.  ESRCH and EDEADLK
 *                                can be returned if this flag is specified.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Return ENOENT on normal termination.
 * Otherwise return an error.
 */
int
hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
{
        hammer2_xop_fifo_t *fifo;
        hammer2_chain_t *chain;
        hammer2_key_t lokey;
        int error;
        int keynull;
        int adv;                /* advance the element */
        int i;
        uint32_t check_counter;

loop:
        /*
         * First loop tries to advance pieces of the cluster which
         * are out of sync.
         */
        lokey = HAMMER2_KEY_MAX;
        keynull = HAMMER2_CHECK_NULL;
        check_counter = xop->check_counter;
        cpu_lfence();

        for (i = 0; i < xop->cluster.nchains; ++i) {
                chain = xop->cluster.array[i].chain;
                if (chain == NULL) {
                        adv = 1;
                } else if (chain->bref.key < xop->collect_key) {
                        adv = 1;
                } else {
                        keynull &= ~HAMMER2_CHECK_NULL;
                        if (lokey > chain->bref.key)
                                lokey = chain->bref.key;
                        adv = 0;
                }
                if (adv == 0)
                        continue;

                /*
                 * Advance element if possible, advanced element may be NULL.
                 */
                if (chain)
                        hammer2_chain_drop_unhold(chain);

                fifo = &xop->collect[i];
                if (fifo->ri != fifo->wi) {
                        cpu_lfence();
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        error = fifo->errors[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        ++fifo->ri;
                        xop->cluster.array[i].chain = chain;
                        xop->cluster.array[i].error = error;
                        if (chain == NULL) {
                                /* XXX */
                                xop->cluster.array[i].flags |=
                                                        HAMMER2_CITEM_NULL;
                        }
                        if (fifo->wi - fifo->ri <= HAMMER2_XOPFIFO / 2) {
                                if (fifo->flags & HAMMER2_XOP_FIFO_STALL) {
                                        atomic_clear_int(&fifo->flags,
                                                    HAMMER2_XOP_FIFO_STALL);
                                        wakeup(xop);
                                        lwkt_yield();
                                }
                        }
                        --i;            /* loop on same index */
                } else {
                        /*
                         * Retain CITEM_NULL flag.  If set just repeat EOF.
                         * If not, the NULL,0 combination indicates an
                         * operation in-progress.
                         */
                        xop->cluster.array[i].chain = NULL;
                        /* retain any CITEM_NULL setting */
                }
        }

        /*
         * Determine whether the lowest collected key meets clustering
         * requirements.  Returns:
         *
         * 0             - key valid, cluster can be returned.
         *
         * ENOENT        - normal end of scan, return ENOENT.
         *
         * ESRCH         - sufficient elements collected, quorum agreement
         *                 that lokey is not a valid element and should be
         *                 skipped.
         *
         * EDEADLK       - sufficient elements collected, no quorum agreement
         *                 (and no agreement possible).  In this situation a
         *                 repair is needed, for now we loop.
         *
         * EINPROGRESS   - insufficient elements collected to resolve, wait
         *                 for event and loop.
         */
        if ((flags & HAMMER2_XOP_COLLECT_WAITALL) &&
            xop->run_mask != HAMMER2_XOPMASK_VOP) {
                error = EINPROGRESS;
        } else {
                error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
        }
        if (error == EINPROGRESS) {
                if ((flags & HAMMER2_XOP_COLLECT_NOWAIT) == 0)
                        tsleep_interlock(&xop->check_counter, 0);
                if (atomic_cmpset_int(&xop->check_counter,
                                      check_counter,
                                      check_counter | HAMMER2_XOP_CHKWAIT)) {
                        if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
                                goto done;
                        tsleep(&xop->check_counter, PINTERLOCKED, "h2coll", hz*60);
                }
                goto loop;
        }
        if (error == ESRCH) {
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = ENOENT;
        }
        if (error == EDEADLK) {
                kprintf("hammer2: no quorum possible lokey %016jx\n",
                        (intmax_t)lokey);
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = ENOENT;
        }
        if (lokey == HAMMER2_KEY_MAX)
                xop->collect_key = lokey;
        else
                xop->collect_key = lokey + 1;
done:
        return error;
}

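/*
 * Illustrative sketch of a frontend iteration loop (not part of the
 * build), e.g. a readdir/scan style consumer: collect elements until the
 * backend streams terminate with ENOENT, then retire the XOP.  The
 * process_focus() step is a caller-specific placeholder for whatever is
 * done with the collected cluster.
 */
#if 0
        for (;;) {
                error = hammer2_xop_collect(&xop->head, 0);
                if (error)
                        break;
                /* xop->head.cluster now represents the collected element */
                process_focus(&xop->head.cluster);      /* caller-specific */
        }
        if (error == ENOENT)
                error = 0;              /* normal termination of the scan */
        hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
#endif
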
/*
 * N x M processing threads are available to handle XOPs, N per cluster
 * index x M cluster nodes.  All the threads for any given cluster index
 * share and pull from the same xopq.
 *
 * Locate and return the next runnable xop, or NULL if no xops are
 * present or none of the xops are currently runnable (for various reasons).
 * The xop is left on the queue and serves to block other dependent xops
 * from being run.
 *
 * Dependent xops will not be returned.
 *
 * Sets HAMMER2_XOP_FIFO_RUN on the returned xop or returns NULL.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
#define XOP_HASH_SIZE   16
#define XOP_HASH_MASK   (XOP_HASH_SIZE - 1)

static __inline
int
xop_testhash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
        uint32_t mask;
        int hv;

        hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
        mask = 1U << (hv & 31);
        hv >>= 5;

        return ((int)(hash[hv & XOP_HASH_MASK] & mask));
}

static __inline
void
xop_sethash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
        uint32_t mask;
        int hv;

        hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
        mask = 1U << (hv & 31);
        hv >>= 5;

        hash[hv & XOP_HASH_MASK] |= mask;
}

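/*
 * The hash[] array used by hammer2_xop_next() below is a small
 * Bloom-filter-style occupancy map (XOP_HASH_SIZE x 32 = 512 bits),
 * rebuilt from scratch on every scan.  As the queue is walked, each
 * candidate xop marks the inodes it involves; a later xop that hits an
 * already-marked inode is conservatively considered dependent on the
 * earlier one and is skipped.  Hash collisions can only create false
 * dependencies, never hide real ones.
 */
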
static
hammer2_xop_head_t *
hammer2_xop_next(hammer2_thread_t *thr)
{
        hammer2_pfs_t *pmp = thr->pmp;
        int clindex = thr->clindex;
        uint32_t hash[XOP_HASH_SIZE] = { 0 };
        hammer2_xop_head_t *xop;

        hammer2_spin_ex(&pmp->xop_spin);
        TAILQ_FOREACH(xop, &thr->xopq, collect[clindex].entry) {
                /*
                 * Check dependency
                 */
                if (xop_testhash(thr, xop->ip1, hash) ||
                    (xop->ip2 && xop_testhash(thr, xop->ip2, hash)) ||
                    (xop->ip3 && xop_testhash(thr, xop->ip3, hash))) {
                        continue;
                }
                xop_sethash(thr, xop->ip1, hash);
                if (xop->ip2)
                        xop_sethash(thr, xop->ip2, hash);
                if (xop->ip3)
                        xop_sethash(thr, xop->ip3, hash);

                /*
                 * Check already running
                 */
                if (xop->collect[clindex].flags & HAMMER2_XOP_FIFO_RUN)
                        continue;

                /*
                 * Found a good one, return it.
                 */
                atomic_set_int(&xop->collect[clindex].flags,
                               HAMMER2_XOP_FIFO_RUN);
                break;
        }
        hammer2_spin_unex(&pmp->xop_spin);

        return xop;
}

/*
 * Remove the completed XOP from the queue, clear HAMMER2_XOP_FIFO_RUN.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
static
void
hammer2_xop_dequeue(hammer2_thread_t *thr, hammer2_xop_head_t *xop)
{
        hammer2_pfs_t *pmp = thr->pmp;
        int clindex = thr->clindex;

        hammer2_spin_ex(&pmp->xop_spin);
        TAILQ_REMOVE(&thr->xopq, xop, collect[clindex].entry);
        atomic_clear_int(&xop->collect[clindex].flags,
                         HAMMER2_XOP_FIFO_RUN);
        hammer2_spin_unex(&pmp->xop_spin);
        if (TAILQ_FIRST(&thr->xopq))
                hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
}

/*
 * Primary management thread for xops support.  Each node has several such
 * threads which replicate front-end operations on cluster nodes.
 *
 * Each XOPS thread executes the node-local portion of an operation,
 * allowing the xop function to focus on a single node in the cluster
 * after the operation has been validated against the cluster as a whole.
 * This is primarily what prevents dead or stalled nodes from stalling
 * the front-end.
 */
void
hammer2_primary_xops_thread(void *arg)
{
        hammer2_thread_t *thr = arg;
        hammer2_pfs_t *pmp;
        hammer2_xop_head_t *xop;
        uint32_t mask;
        uint32_t flags;
        uint32_t nflags;
        hammer2_xop_func_t last_func = NULL;

        pmp = thr->pmp;
        /*xgrp = &pmp->xop_groups[thr->repidx]; not needed */
        mask = 1U << thr->clindex;

        for (;;) {
                flags = thr->flags;

                /*
                 * Handle stop request
                 */
                if (flags & HAMMER2_THREAD_STOP)
                        break;

                /*
                 * Handle freeze request
                 */
                if (flags & HAMMER2_THREAD_FREEZE) {
                        nflags = (flags & ~(HAMMER2_THREAD_FREEZE |
                                            HAMMER2_THREAD_CLIENTWAIT)) |
                                 HAMMER2_THREAD_FROZEN;
                        if (!atomic_cmpset_int(&thr->flags, flags, nflags))
                                continue;
                        if (flags & HAMMER2_THREAD_CLIENTWAIT)
                                wakeup(&thr->flags);
                        flags = nflags;
                        /* fall through */
                }

                if (flags & HAMMER2_THREAD_UNFREEZE) {
                        nflags = flags & ~(HAMMER2_THREAD_UNFREEZE |
                                           HAMMER2_THREAD_FROZEN |
                                           HAMMER2_THREAD_CLIENTWAIT);
                        if (!atomic_cmpset_int(&thr->flags, flags, nflags))
                                continue;
                        if (flags & HAMMER2_THREAD_CLIENTWAIT)
                                wakeup(&thr->flags);
                        flags = nflags;
                        /* fall through */
                }

                /*
                 * Force idle if frozen until unfrozen or stopped.
                 */
                if (flags & HAMMER2_THREAD_FROZEN) {
                        nflags = flags | HAMMER2_THREAD_WAITING;
                        tsleep_interlock(&thr->flags, 0);
                        if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
                                tsleep(&thr->flags, PINTERLOCKED, "frozen", 0);
                                atomic_clear_int(&thr->flags,
                                                 HAMMER2_THREAD_WAITING);
                        }
                        continue;
                }

                /*
                 * Reset state on REMASTER request
                 */
                if (flags & HAMMER2_THREAD_REMASTER) {
                        nflags = flags & ~HAMMER2_THREAD_REMASTER;
                        if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
                                /* reset state here */
                        }
                        continue;
                }

                /*
                 * Process requests.  Each request can be multi-queued.
                 *
                 * If we get behind and the frontend VOP is no longer active,
                 * we retire the request without processing it.  The callback
                 * may also abort processing if the frontend VOP becomes
                 * inactive.
                 */
                if (flags & HAMMER2_THREAD_XOPQ) {
                        nflags = flags & ~HAMMER2_THREAD_XOPQ;
                        if (!atomic_cmpset_int(&thr->flags, flags, nflags))
                                continue;
                        flags = nflags;
                        /* fall through */
                }
                while ((xop = hammer2_xop_next(thr)) != NULL) {
                        if (hammer2_xop_active(xop)) {
                                last_func = xop->func;
                                xop->func((hammer2_xop_t *)xop, thr->clindex);
                                hammer2_xop_dequeue(thr, xop);
                                hammer2_xop_retire(xop, mask);
                        } else {
                                last_func = xop->func;
                                hammer2_xop_feed(xop, NULL, thr->clindex,
                                                 ECONNABORTED);
                                hammer2_xop_dequeue(thr, xop);
                                hammer2_xop_retire(xop, mask);
                        }
                }

                /*
                 * Wait for event, interlock using THREAD_WAITING and
                 * THREAD_SIGNAL.
                 *
                 * For robustness poll on a 30-second interval, but nominally
                 * expect to be woken up.
                 */
                nflags = flags | HAMMER2_THREAD_WAITING;

                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
                        tsleep(&thr->flags, PINTERLOCKED, "h2idle", hz*30);
                        atomic_clear_int(&thr->flags, HAMMER2_THREAD_WAITING);
                }
        }

#if 0
        /*
         * Cleanup / termination
         */
        while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
                kprintf("hammer2_thread: aborting xop %p\n", xop->func);
                TAILQ_REMOVE(&thr->xopq, xop,
                             collect[thr->clindex].entry);
                hammer2_xop_retire(xop, mask);
        }
#endif
        thr->td = NULL;
        hammer2_thr_return(thr, HAMMER2_THREAD_STOPPED);
        /* thr structure can go invalid after this point */
        wakeup(thr);
}