Fix hangs with processes stuck sleeping on btalloc on i386.
[freebsd.git] / sys / cddl / contrib / opensolaris / uts / common / fs / zfs / txg.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Portions Copyright 2011 Martin Matuska <mm@FreeBSD.org>
24  * Copyright (c) 2012, 2017 by Delphix. All rights reserved.
25  */
26
27 #include <sys/zfs_context.h>
28 #include <sys/txg_impl.h>
29 #include <sys/dmu_impl.h>
30 #include <sys/dmu_tx.h>
31 #include <sys/dsl_pool.h>
32 #include <sys/dsl_scan.h>
33 #include <sys/zil.h>
34 #include <sys/callb.h>
35
36 /*
37  * ZFS Transaction Groups
38  * ----------------------
39  *
40  * ZFS transaction groups are, as the name implies, groups of transactions
41  * that act on persistent state. ZFS asserts consistency at the granularity of
42  * these transaction groups. Each successive transaction group (txg) is
43  * assigned a 64-bit consecutive identifier. There are three active
44  * transaction group states: open, quiescing, or syncing. At any given time,
45  * there may be an active txg associated with each state; each active txg may
46  * either be processing, or blocked waiting to enter the next state. There may
47  * be up to three active txgs, and there is always a txg in the open state
48  * (though it may be blocked waiting to enter the quiescing state). In broad
49  * strokes, transactions -- operations that change in-memory structures -- are
50  * accepted into the txg in the open state, and are completed while the txg is
51  * in the open or quiescing states. The accumulated changes are written to
52  * disk in the syncing state.
53  *
54  * Open
55  *
56  * When a new txg becomes active, it first enters the open state. New
57  * transactions -- updates to in-memory structures -- are assigned to the
58  * currently open txg. There is always a txg in the open state so that ZFS can
59  * accept new changes (though the txg may refuse new changes if it has hit
60  * some limit). ZFS advances the open txg to the next state for a variety of
61  * reasons such as it hitting a time or size threshold, or the execution of an
62  * administrative action that must be completed in the syncing state.
63  *
64  * Quiescing
65  *
66  * After a txg exits the open state, it enters the quiescing state. The
67  * quiescing state is intended to provide a buffer between accepting new
68  * transactions in the open state and writing them out to stable storage in
69  * the syncing state. While quiescing, transactions can continue their
70  * operation without delaying either of the other states. Typically, a txg is
71  * in the quiescing state very briefly since the operations are bounded by
72  * software latencies rather than, say, slower I/O latencies. After all
73  * transactions complete, the txg is ready to enter the next state.
74  *
75  * Syncing
76  *
77  * In the syncing state, the in-memory state built up during the open and (to
78  * a lesser degree) the quiescing states is written to stable storage. The
79  * process of writing out modified data can, in turn modify more data. For
80  * example when we write new blocks, we need to allocate space for them; those
81  * allocations modify metadata (space maps)... which themselves must be
82  * written to stable storage. During the sync state, ZFS iterates, writing out
83  * data until it converges and all in-memory changes have been written out.
84  * The first such pass is the largest as it encompasses all the modified user
85  * data (as opposed to filesystem metadata). Subsequent passes typically have
86  * far less data to write as they consist exclusively of filesystem metadata.
87  *
88  * To ensure convergence, after a certain number of passes ZFS begins
89  * overwriting locations on stable storage that had been allocated earlier in
90  * the syncing state (and subsequently freed). ZFS usually allocates new
91  * blocks to optimize for large, continuous, writes. For the syncing state to
92  * converge however it must complete a pass where no new blocks are allocated
93  * since each allocation requires a modification of persistent metadata.
94  * Further, to hasten convergence, after a prescribed number of passes, ZFS
95  * also defers frees, and stops compressing.
96  *
97  * In addition to writing out user data, we must also execute synctasks during
98  * the syncing context. A synctask is the mechanism by which some
99  * administrative activities work such as creating and destroying snapshots or
100  * datasets. Note that when a synctask is initiated it enters the open txg,
101  * and ZFS then pushes that txg as quickly as possible to completion of the
102  * syncing state in order to reduce the latency of the administrative
103  * activity. To complete the syncing state, ZFS writes out a new uberblock,
104  * the root of the tree of blocks that comprise all state stored on the ZFS
105  * pool. Finally, if there is a quiesced txg waiting, we signal that it can
106  * now transition to the syncing state.
107  */
108
109 static void txg_sync_thread(void *arg);
110 static void txg_quiesce_thread(void *arg);
111
112 int zfs_txg_timeout = 5;        /* max seconds worth of delta per txg */
113
114 SYSCTL_DECL(_vfs_zfs);
115 SYSCTL_NODE(_vfs_zfs, OID_AUTO, txg, CTLFLAG_RW | CTLFLAG_MPSAFE, 0,
116     "ZFS TXG");
117 SYSCTL_INT(_vfs_zfs_txg, OID_AUTO, timeout, CTLFLAG_RWTUN, &zfs_txg_timeout, 0,
118     "Maximum seconds worth of delta per txg");
119
120 /*
121  * Prepare the txg subsystem.
122  */
123 void
124 txg_init(dsl_pool_t *dp, uint64_t txg)
125 {
126         tx_state_t *tx = &dp->dp_tx;
127         int c;
128         bzero(tx, sizeof (tx_state_t));
129
130         tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
131
132         for (c = 0; c < max_ncpus; c++) {
133                 int i;
134
135                 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
136                 mutex_init(&tx->tx_cpu[c].tc_open_lock, NULL, MUTEX_DEFAULT,
137                     NULL);
138                 for (i = 0; i < TXG_SIZE; i++) {
139                         cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
140                             NULL);
141                         list_create(&tx->tx_cpu[c].tc_callbacks[i],
142                             sizeof (dmu_tx_callback_t),
143                             offsetof(dmu_tx_callback_t, dcb_node));
144                 }
145         }
146
147         mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
148
149         cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
150         cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
151         cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
152         cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
153         cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
154
155         tx->tx_open_txg = txg;
156 }
157
158 /*
159  * Close down the txg subsystem.
160  */
161 void
162 txg_fini(dsl_pool_t *dp)
163 {
164         tx_state_t *tx = &dp->dp_tx;
165         int c;
166
167         ASSERT0(tx->tx_threads);
168
169         mutex_destroy(&tx->tx_sync_lock);
170
171         cv_destroy(&tx->tx_sync_more_cv);
172         cv_destroy(&tx->tx_sync_done_cv);
173         cv_destroy(&tx->tx_quiesce_more_cv);
174         cv_destroy(&tx->tx_quiesce_done_cv);
175         cv_destroy(&tx->tx_exit_cv);
176
177         for (c = 0; c < max_ncpus; c++) {
178                 int i;
179
180                 mutex_destroy(&tx->tx_cpu[c].tc_open_lock);
181                 mutex_destroy(&tx->tx_cpu[c].tc_lock);
182                 for (i = 0; i < TXG_SIZE; i++) {
183                         cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
184                         list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
185                 }
186         }
187
188         if (tx->tx_commit_cb_taskq != NULL)
189                 taskq_destroy(tx->tx_commit_cb_taskq);
190
191         kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
192
193         bzero(tx, sizeof (tx_state_t));
194 }
195
196 /*
197  * Start syncing transaction groups.
198  */
199 void
200 txg_sync_start(dsl_pool_t *dp)
201 {
202         tx_state_t *tx = &dp->dp_tx;
203
204         mutex_enter(&tx->tx_sync_lock);
205
206         dprintf("pool %p\n", dp);
207
208         ASSERT0(tx->tx_threads);
209
210         tx->tx_threads = 2;
211
212         tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
213             dp, 0, spa_proc(dp->dp_spa), TS_RUN, minclsyspri);
214
215         /*
216          * The sync thread can need a larger-than-default stack size on
217          * 32-bit x86.  This is due in part to nested pools and
218          * scrub_visitbp() recursion.
219          */
220         tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
221             dp, 0, spa_proc(dp->dp_spa), TS_RUN, minclsyspri);
222
223         mutex_exit(&tx->tx_sync_lock);
224 }
225
226 static void
227 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
228 {
229         CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
230         mutex_enter(&tx->tx_sync_lock);
231 }
232
233 static void
234 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
235 {
236         ASSERT(*tpp != NULL);
237         *tpp = NULL;
238         tx->tx_threads--;
239         cv_broadcast(&tx->tx_exit_cv);
240         CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
241         thread_exit();
242 }
243
244 static void
245 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, clock_t time)
246 {
247         CALLB_CPR_SAFE_BEGIN(cpr);
248
249         if (time)
250                 (void) cv_timedwait(cv, &tx->tx_sync_lock, time);
251         else
252                 cv_wait(cv, &tx->tx_sync_lock);
253
254         CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
255 }
256
257 /*
258  * Stop syncing transaction groups.
259  */
260 void
261 txg_sync_stop(dsl_pool_t *dp)
262 {
263         tx_state_t *tx = &dp->dp_tx;
264
265         dprintf("pool %p\n", dp);
266         /*
267          * Finish off any work in progress.
268          */
269         ASSERT3U(tx->tx_threads, ==, 2);
270
271         /*
272          * We need to ensure that we've vacated the deferred space_maps.
273          */
274         txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);
275
276         /*
277          * Wake all sync threads and wait for them to die.
278          */
279         mutex_enter(&tx->tx_sync_lock);
280
281         ASSERT3U(tx->tx_threads, ==, 2);
282
283         tx->tx_exiting = 1;
284
285         cv_broadcast(&tx->tx_quiesce_more_cv);
286         cv_broadcast(&tx->tx_quiesce_done_cv);
287         cv_broadcast(&tx->tx_sync_more_cv);
288
289         while (tx->tx_threads != 0)
290                 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
291
292         tx->tx_exiting = 0;
293
294         mutex_exit(&tx->tx_sync_lock);
295 }
296
297 uint64_t
298 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
299 {
300         tx_state_t *tx = &dp->dp_tx;
301         tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
302         uint64_t txg;
303
304         mutex_enter(&tc->tc_open_lock);
305         txg = tx->tx_open_txg;
306
307         mutex_enter(&tc->tc_lock);
308         tc->tc_count[txg & TXG_MASK]++;
309         mutex_exit(&tc->tc_lock);
310
311         th->th_cpu = tc;
312         th->th_txg = txg;
313
314         return (txg);
315 }
316
317 void
318 txg_rele_to_quiesce(txg_handle_t *th)
319 {
320         tx_cpu_t *tc = th->th_cpu;
321
322         ASSERT(!MUTEX_HELD(&tc->tc_lock));
323         mutex_exit(&tc->tc_open_lock);
324 }
325
326 void
327 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
328 {
329         tx_cpu_t *tc = th->th_cpu;
330         int g = th->th_txg & TXG_MASK;
331
332         mutex_enter(&tc->tc_lock);
333         list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
334         mutex_exit(&tc->tc_lock);
335 }
336
337 void
338 txg_rele_to_sync(txg_handle_t *th)
339 {
340         tx_cpu_t *tc = th->th_cpu;
341         int g = th->th_txg & TXG_MASK;
342
343         mutex_enter(&tc->tc_lock);
344         ASSERT(tc->tc_count[g] != 0);
345         if (--tc->tc_count[g] == 0)
346                 cv_broadcast(&tc->tc_cv[g]);
347         mutex_exit(&tc->tc_lock);
348
349         th->th_cpu = NULL;      /* defensive */
350 }
351
352 /*
353  * Blocks until all transactions in the group are committed.
354  *
355  * On return, the transaction group has reached a stable state in which it can
356  * then be passed off to the syncing context.
357  */
358 static __noinline void
359 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
360 {
361         tx_state_t *tx = &dp->dp_tx;
362         int g = txg & TXG_MASK;
363         int c;
364
365         /*
366          * Grab all tc_open_locks so nobody else can get into this txg.
367          */
368         for (c = 0; c < max_ncpus; c++)
369                 mutex_enter(&tx->tx_cpu[c].tc_open_lock);
370
371         ASSERT(txg == tx->tx_open_txg);
372         tx->tx_open_txg++;
373         tx->tx_open_time = gethrtime();
374
375         DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
376         DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);
377
378         /*
379          * Now that we've incremented tx_open_txg, we can let threads
380          * enter the next transaction group.
381          */
382         for (c = 0; c < max_ncpus; c++)
383                 mutex_exit(&tx->tx_cpu[c].tc_open_lock);
384
385         /*
386          * Quiesce the transaction group by waiting for everyone to txg_exit().
387          */
388         for (c = 0; c < max_ncpus; c++) {
389                 tx_cpu_t *tc = &tx->tx_cpu[c];
390                 mutex_enter(&tc->tc_lock);
391                 while (tc->tc_count[g] != 0)
392                         cv_wait(&tc->tc_cv[g], &tc->tc_lock);
393                 mutex_exit(&tc->tc_lock);
394         }
395 }
396
397 static void
398 txg_do_callbacks(void *arg)
399 {
400         list_t *cb_list = arg;
401
402         dmu_tx_do_callbacks(cb_list, 0);
403
404         list_destroy(cb_list);
405
406         kmem_free(cb_list, sizeof (list_t));
407 }
408
409 /*
410  * Dispatch the commit callbacks registered on this txg to worker threads.
411  *
412  * If no callbacks are registered for a given TXG, nothing happens.
413  * This function creates a taskq for the associated pool, if needed.
414  */
415 static void
416 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
417 {
418         int c;
419         tx_state_t *tx = &dp->dp_tx;
420         list_t *cb_list;
421
422         for (c = 0; c < max_ncpus; c++) {
423                 tx_cpu_t *tc = &tx->tx_cpu[c];
424                 /*
425                  * No need to lock tx_cpu_t at this point, since this can
426                  * only be called once a txg has been synced.
427                  */
428
429                 int g = txg & TXG_MASK;
430
431                 if (list_is_empty(&tc->tc_callbacks[g]))
432                         continue;
433
434                 if (tx->tx_commit_cb_taskq == NULL) {
435                         /*
436                          * Commit callback taskq hasn't been created yet.
437                          */
438                         tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
439                             max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
440                             TASKQ_PREPOPULATE);
441                 }
442
443                 cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
444                 list_create(cb_list, sizeof (dmu_tx_callback_t),
445                     offsetof(dmu_tx_callback_t, dcb_node));
446
447                 list_move_tail(cb_list, &tc->tc_callbacks[g]);
448
449                 (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
450                     txg_do_callbacks, cb_list, TQ_SLEEP);
451         }
452 }
453
454 static boolean_t
455 txg_is_syncing(dsl_pool_t *dp)
456 {
457         tx_state_t *tx = &dp->dp_tx;
458         ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
459         return (tx->tx_syncing_txg != 0);
460 }
461
462 static boolean_t
463 txg_is_quiescing(dsl_pool_t *dp)
464 {
465         tx_state_t *tx = &dp->dp_tx;
466         ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
467         return (tx->tx_quiescing_txg != 0);
468 }
469
470 static boolean_t
471 txg_has_quiesced_to_sync(dsl_pool_t *dp)
472 {
473         tx_state_t *tx = &dp->dp_tx;
474         ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
475         return (tx->tx_quiesced_txg != 0);
476 }
477
478 static void
479 txg_sync_thread(void *arg)
480 {
481         dsl_pool_t *dp = arg;
482         spa_t *spa = dp->dp_spa;
483         tx_state_t *tx = &dp->dp_tx;
484         callb_cpr_t cpr;
485         uint64_t start, delta;
486
487         txg_thread_enter(tx, &cpr);
488
489         start = delta = 0;
490         for (;;) {
491                 uint64_t timeout = zfs_txg_timeout * hz;
492                 uint64_t timer;
493                 uint64_t txg;
494                 uint64_t dirty_min_bytes =
495                     zfs_dirty_data_max * zfs_dirty_data_sync_pct / 100;
496
497                 /*
498                  * We sync when we're scanning, there's someone waiting
499                  * on us, or the quiesce thread has handed off a txg to
500                  * us, or we have reached our timeout.
501                  */
502                 timer = (delta >= timeout ? 0 : timeout - delta);
503                 while (!dsl_scan_active(dp->dp_scan) &&
504                     !tx->tx_exiting && timer > 0 &&
505                     tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
506                     !txg_has_quiesced_to_sync(dp) &&
507                     dp->dp_dirty_total < dirty_min_bytes) {
508                         dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
509                             tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
510                         txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
511                         delta = ddi_get_lbolt() - start;
512                         timer = (delta > timeout ? 0 : timeout - delta);
513                 }
514
515                 /*
516                  * Wait until the quiesce thread hands off a txg to us,
517                  * prompting it to do so if necessary.
518                  */
519                 while (!tx->tx_exiting && !txg_has_quiesced_to_sync(dp)) {
520                         if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
521                                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
522                         cv_broadcast(&tx->tx_quiesce_more_cv);
523                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
524                 }
525
526                 if (tx->tx_exiting)
527                         txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
528
529                 /*
530                  * Consume the quiesced txg which has been handed off to
531                  * us.  This may cause the quiescing thread to now be
532                  * able to quiesce another txg, so we must signal it.
533                  */
534                 ASSERT(tx->tx_quiesced_txg != 0);
535                 txg = tx->tx_quiesced_txg;
536                 tx->tx_quiesced_txg = 0;
537                 tx->tx_syncing_txg = txg;
538                 DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
539                 cv_broadcast(&tx->tx_quiesce_more_cv);
540
541                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
542                     txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
543                 mutex_exit(&tx->tx_sync_lock);
544
545                 start = ddi_get_lbolt();
546                 spa_sync(spa, txg);
547                 delta = ddi_get_lbolt() - start;
548
549                 mutex_enter(&tx->tx_sync_lock);
550                 tx->tx_synced_txg = txg;
551                 tx->tx_syncing_txg = 0;
552                 DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
553                 cv_broadcast(&tx->tx_sync_done_cv);
554
555                 /*
556                  * Dispatch commit callbacks to worker threads.
557                  */
558                 txg_dispatch_callbacks(dp, txg);
559         }
560 }
561
562 static void
563 txg_quiesce_thread(void *arg)
564 {
565         dsl_pool_t *dp = arg;
566         tx_state_t *tx = &dp->dp_tx;
567         callb_cpr_t cpr;
568
569         txg_thread_enter(tx, &cpr);
570
571         for (;;) {
572                 uint64_t txg;
573
574                 /*
575                  * We quiesce when there's someone waiting on us.
576                  * However, we can only have one txg in "quiescing" or
577                  * "quiesced, waiting to sync" state.  So we wait until
578                  * the "quiesced, waiting to sync" txg has been consumed
579                  * by the sync thread.
580                  */
581                 while (!tx->tx_exiting &&
582                     (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
583                     txg_has_quiesced_to_sync(dp)))
584                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
585
586                 if (tx->tx_exiting)
587                         txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
588
589                 txg = tx->tx_open_txg;
590                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
591                     txg, tx->tx_quiesce_txg_waiting,
592                     tx->tx_sync_txg_waiting);
593                 tx->tx_quiescing_txg = txg;
594
595                 mutex_exit(&tx->tx_sync_lock);
596                 txg_quiesce(dp, txg);
597                 mutex_enter(&tx->tx_sync_lock);
598
599                 /*
600                  * Hand this txg off to the sync thread.
601                  */
602                 dprintf("quiesce done, handing off txg %llu\n", txg);
603                 tx->tx_quiescing_txg = 0;
604                 tx->tx_quiesced_txg = txg;
605                 DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
606                 cv_broadcast(&tx->tx_sync_more_cv);
607                 cv_broadcast(&tx->tx_quiesce_done_cv);
608         }
609 }
610
611 /*
612  * Delay this thread by delay nanoseconds if we are still in the open
613  * transaction group and there is already a waiting txg quiesing or quiesced.
614  * Abort the delay if this txg stalls or enters the quiesing state.
615  */
616 void
617 txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
618 {
619         tx_state_t *tx = &dp->dp_tx;
620         hrtime_t start = gethrtime();
621
622         /* don't delay if this txg could transition to quiescing immediately */
623         if (tx->tx_open_txg > txg ||
624             tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
625                 return;
626
627         mutex_enter(&tx->tx_sync_lock);
628         if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
629                 mutex_exit(&tx->tx_sync_lock);
630                 return;
631         }
632
633         while (gethrtime() - start < delay &&
634             tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) {
635                 (void) cv_timedwait_hires(&tx->tx_quiesce_more_cv,
636                     &tx->tx_sync_lock, delay, resolution, 0);
637         }
638
639         mutex_exit(&tx->tx_sync_lock);
640 }
641
642 static boolean_t
643 txg_wait_synced_impl(dsl_pool_t *dp, uint64_t txg, boolean_t wait_sig)
644 {
645         tx_state_t *tx = &dp->dp_tx;
646
647         ASSERT(!dsl_pool_config_held(dp));
648
649         mutex_enter(&tx->tx_sync_lock);
650         ASSERT3U(tx->tx_threads, ==, 2);
651         if (txg == 0)
652                 txg = tx->tx_open_txg + TXG_DEFER_SIZE;
653         if (tx->tx_sync_txg_waiting < txg)
654                 tx->tx_sync_txg_waiting = txg;
655         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
656             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
657         while (tx->tx_synced_txg < txg) {
658                 dprintf("broadcasting sync more "
659                     "tx_synced=%llu waiting=%llu dp=%p\n",
660                     tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
661                 cv_broadcast(&tx->tx_sync_more_cv);
662                 if (wait_sig) {
663                         /*
664                          * Condition wait here but stop if the thread receives a
665                          * signal. The caller may call txg_wait_synced*() again
666                          * to resume waiting for this txg.
667                          */
668 #ifdef __FreeBSD__
669                         /*
670                          * FreeBSD returns EINTR or ERESTART if there is
671                          * a pending signal, zero if the conditional variable
672                          * is signaled.  illumos returns zero in the former case
673                          * and >0 in the latter.
674                          */
675                         if (cv_wait_sig(&tx->tx_sync_done_cv,
676                             &tx->tx_sync_lock) != 0) {
677 #else
678                         if (cv_wait_sig(&tx->tx_sync_done_cv,
679                             &tx->tx_sync_lock) == 0) {
680 #endif
681
682                                 mutex_exit(&tx->tx_sync_lock);
683                                 return (B_TRUE);
684                         }
685                 } else {
686                         cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
687                 }
688         }
689         mutex_exit(&tx->tx_sync_lock);
690         return (B_FALSE);
691 }
692
693 void
694 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
695 {
696         VERIFY0(txg_wait_synced_impl(dp, txg, B_FALSE));
697 }
698
699 /*
700  * Similar to a txg_wait_synced but it can be interrupted from a signal.
701  * Returns B_TRUE if the thread was signaled while waiting.
702  */
703 boolean_t
704 txg_wait_synced_sig(dsl_pool_t *dp, uint64_t txg)
705 {
706         return (txg_wait_synced_impl(dp, txg, B_TRUE));
707 }
708
709 void
710 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
711 {
712         tx_state_t *tx = &dp->dp_tx;
713
714         ASSERT(!dsl_pool_config_held(dp));
715
716         mutex_enter(&tx->tx_sync_lock);
717         ASSERT3U(tx->tx_threads, ==, 2);
718         if (txg == 0)
719                 txg = tx->tx_open_txg + 1;
720         if (tx->tx_quiesce_txg_waiting < txg)
721                 tx->tx_quiesce_txg_waiting = txg;
722         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
723             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
724         while (tx->tx_open_txg < txg) {
725                 cv_broadcast(&tx->tx_quiesce_more_cv);
726                 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
727         }
728         mutex_exit(&tx->tx_sync_lock);
729 }
730
731 /*
732  * If there isn't a txg syncing or in the pipeline, push another txg through
733  * the pipeline by queiscing the open txg.
734  */
735 void
736 txg_kick(dsl_pool_t *dp)
737 {
738         tx_state_t *tx = &dp->dp_tx;
739
740         ASSERT(!dsl_pool_config_held(dp));
741
742         mutex_enter(&tx->tx_sync_lock);
743         if (!txg_is_syncing(dp) &&
744             !txg_is_quiescing(dp) &&
745             tx->tx_quiesce_txg_waiting <= tx->tx_open_txg &&
746             tx->tx_sync_txg_waiting <= tx->tx_synced_txg &&
747             tx->tx_quiesced_txg <= tx->tx_synced_txg) {
748                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg + 1;
749                 cv_broadcast(&tx->tx_quiesce_more_cv);
750         }
751         mutex_exit(&tx->tx_sync_lock);
752 }
753
754 boolean_t
755 txg_stalled(dsl_pool_t *dp)
756 {
757         tx_state_t *tx = &dp->dp_tx;
758         return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
759 }
760
761 boolean_t
762 txg_sync_waiting(dsl_pool_t *dp)
763 {
764         tx_state_t *tx = &dp->dp_tx;
765
766         return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
767             tx->tx_quiesced_txg != 0);
768 }
769
770 /*
771  * Verify that this txg is active (open, quiescing, syncing).  Non-active
772  * txg's should not be manipulated.
773  */
774 void
775 txg_verify(spa_t *spa, uint64_t txg)
776 {
777         dsl_pool_t *dp = spa_get_dsl(spa);
778         if (txg <= TXG_INITIAL || txg == ZILTEST_TXG)
779                 return;
780         ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
781         ASSERT3U(txg, >=, dp->dp_tx.tx_synced_txg);
782         ASSERT3U(txg, >=, dp->dp_tx.tx_open_txg - TXG_CONCURRENT_STATES);
783 }
784
785 /*
786  * Per-txg object lists.
787  */
788 void
789 txg_list_create(txg_list_t *tl, spa_t *spa, size_t offset)
790 {
791         int t;
792
793         mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
794
795         tl->tl_offset = offset;
796         tl->tl_spa = spa;
797
798         for (t = 0; t < TXG_SIZE; t++)
799                 tl->tl_head[t] = NULL;
800 }
801
802 void
803 txg_list_destroy(txg_list_t *tl)
804 {
805         int t;
806
807         for (t = 0; t < TXG_SIZE; t++)
808                 ASSERT(txg_list_empty(tl, t));
809
810         mutex_destroy(&tl->tl_lock);
811 }
812
813 boolean_t
814 txg_list_empty(txg_list_t *tl, uint64_t txg)
815 {
816         txg_verify(tl->tl_spa, txg);
817         return (tl->tl_head[txg & TXG_MASK] == NULL);
818 }
819
820 /*
821  * Returns true if all txg lists are empty.
822  *
823  * Warning: this is inherently racy (an item could be added immediately
824  * after this function returns). We don't bother with the lock because
825  * it wouldn't change the semantics.
826  */
827 boolean_t
828 txg_all_lists_empty(txg_list_t *tl)
829 {
830         for (int i = 0; i < TXG_SIZE; i++) {
831                 if (!txg_list_empty(tl, i)) {
832                         return (B_FALSE);
833                 }
834         }
835         return (B_TRUE);
836 }
837
838 /*
839  * Add an entry to the list (unless it's already on the list).
840  * Returns B_TRUE if it was actually added.
841  */
842 boolean_t
843 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
844 {
845         int t = txg & TXG_MASK;
846         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
847         boolean_t add;
848
849         txg_verify(tl->tl_spa, txg);
850         mutex_enter(&tl->tl_lock);
851         add = (tn->tn_member[t] == 0);
852         if (add) {
853                 tn->tn_member[t] = 1;
854                 tn->tn_next[t] = tl->tl_head[t];
855                 tl->tl_head[t] = tn;
856         }
857         mutex_exit(&tl->tl_lock);
858
859         return (add);
860 }
861
862 /*
863  * Add an entry to the end of the list, unless it's already on the list.
864  * (walks list to find end)
865  * Returns B_TRUE if it was actually added.
866  */
867 boolean_t
868 txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
869 {
870         int t = txg & TXG_MASK;
871         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
872         boolean_t add;
873
874         txg_verify(tl->tl_spa, txg);
875         mutex_enter(&tl->tl_lock);
876         add = (tn->tn_member[t] == 0);
877         if (add) {
878                 txg_node_t **tp;
879
880                 for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
881                         continue;
882
883                 tn->tn_member[t] = 1;
884                 tn->tn_next[t] = NULL;
885                 *tp = tn;
886         }
887         mutex_exit(&tl->tl_lock);
888
889         return (add);
890 }
891
892 /*
893  * Remove the head of the list and return it.
894  */
895 void *
896 txg_list_remove(txg_list_t *tl, uint64_t txg)
897 {
898         int t = txg & TXG_MASK;
899         txg_node_t *tn;
900         void *p = NULL;
901
902         txg_verify(tl->tl_spa, txg);
903         mutex_enter(&tl->tl_lock);
904         if ((tn = tl->tl_head[t]) != NULL) {
905                 ASSERT(tn->tn_member[t]);
906                 ASSERT(tn->tn_next[t] == NULL || tn->tn_next[t]->tn_member[t]);
907                 p = (char *)tn - tl->tl_offset;
908                 tl->tl_head[t] = tn->tn_next[t];
909                 tn->tn_next[t] = NULL;
910                 tn->tn_member[t] = 0;
911         }
912         mutex_exit(&tl->tl_lock);
913
914         return (p);
915 }
916
917 /*
918  * Remove a specific item from the list and return it.
919  */
920 void *
921 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
922 {
923         int t = txg & TXG_MASK;
924         txg_node_t *tn, **tp;
925
926         txg_verify(tl->tl_spa, txg);
927         mutex_enter(&tl->tl_lock);
928
929         for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
930                 if ((char *)tn - tl->tl_offset == p) {
931                         *tp = tn->tn_next[t];
932                         tn->tn_next[t] = NULL;
933                         tn->tn_member[t] = 0;
934                         mutex_exit(&tl->tl_lock);
935                         return (p);
936                 }
937         }
938
939         mutex_exit(&tl->tl_lock);
940
941         return (NULL);
942 }
943
944 boolean_t
945 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
946 {
947         int t = txg & TXG_MASK;
948         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
949
950         txg_verify(tl->tl_spa, txg);
951         return (tn->tn_member[t] != 0);
952 }
953
954 /*
955  * Walk a txg list -- only safe if you know it's not changing.
956  */
957 void *
958 txg_list_head(txg_list_t *tl, uint64_t txg)
959 {
960         int t = txg & TXG_MASK;
961         txg_node_t *tn = tl->tl_head[t];
962
963         txg_verify(tl->tl_spa, txg);
964         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
965 }
966
967 void *
968 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
969 {
970         int t = txg & TXG_MASK;
971         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
972
973         txg_verify(tl->tl_spa, txg);
974         tn = tn->tn_next[t];
975
976         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
977 }