dm vdo: fix how dm_kcopyd_client_create() failure is checked
[linux.git] / drivers / md / dm-vdo / slab-depot.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5
6 #include "slab-depot.h"
7
8 #include <linux/atomic.h>
9 #include <linux/bio.h>
10 #include <linux/err.h>
11 #include <linux/log2.h>
12 #include <linux/min_heap.h>
13 #include <linux/minmax.h>
14
15 #include "logger.h"
16 #include "memory-alloc.h"
17 #include "numeric.h"
18 #include "permassert.h"
19 #include "string-utils.h"
20
21 #include "action-manager.h"
22 #include "admin-state.h"
23 #include "completion.h"
24 #include "constants.h"
25 #include "data-vio.h"
26 #include "encodings.h"
27 #include "io-submitter.h"
28 #include "physical-zone.h"
29 #include "priority-table.h"
30 #include "recovery-journal.h"
31 #include "repair.h"
32 #include "status-codes.h"
33 #include "types.h"
34 #include "vdo.h"
35 #include "vio.h"
36 #include "wait-queue.h"
37
38 static const u64 BYTES_PER_WORD = sizeof(u64);
39 static const bool NORMAL_OPERATION = true;
40
41 /**
42  * get_lock() - Get the lock object for a slab journal block by sequence number.
43  * @journal: vdo_slab journal to retrieve from.
44  * @sequence_number: Sequence number of the block.
45  *
46  * Return: The lock object for the given sequence number.
47  */
48 static inline struct journal_lock * __must_check get_lock(struct slab_journal *journal,
49                                                           sequence_number_t sequence_number)
50 {
51         return &journal->locks[sequence_number % journal->size];
52 }
53
54 static bool is_slab_open(struct vdo_slab *slab)
55 {
56         return (!vdo_is_state_quiescing(&slab->state) &&
57                 !vdo_is_state_quiescent(&slab->state));
58 }
59
60 /**
61  * must_make_entries_to_flush() - Check whether there are entry waiters which should delay a flush.
62  * @journal: The journal to check.
63  *
64  * Return: true if there are no entry waiters, or if the slab is unrecovered.
65  */
66 static inline bool __must_check must_make_entries_to_flush(struct slab_journal *journal)
67 {
68         return ((journal->slab->status != VDO_SLAB_REBUILDING) &&
69                 vdo_waitq_has_waiters(&journal->entry_waiters));
70 }
71
72 /**
73  * is_reaping() - Check whether a reap is currently in progress.
74  * @journal: The journal which may be reaping.
75  *
76  * Return: true if the journal is reaping.
77  */
78 static inline bool __must_check is_reaping(struct slab_journal *journal)
79 {
80         return (journal->head != journal->unreapable);
81 }
82
83 /**
84  * initialize_tail_block() - Initialize tail block as a new block.
85  * @journal: The journal whose tail block is being initialized.
86  */
87 static void initialize_tail_block(struct slab_journal *journal)
88 {
89         struct slab_journal_block_header *header = &journal->tail_header;
90
91         header->sequence_number = journal->tail;
92         header->entry_count = 0;
93         header->has_block_map_increments = false;
94 }
95
96 /**
97  * initialize_journal_state() - Set all journal fields appropriately to start journaling.
98  * @journal: The journal to be reset, based on its tail sequence number.
99  */
100 static void initialize_journal_state(struct slab_journal *journal)
101 {
102         journal->unreapable = journal->head;
103         journal->reap_lock = get_lock(journal, journal->unreapable);
104         journal->next_commit = journal->tail;
105         journal->summarized = journal->last_summarized = journal->tail;
106         initialize_tail_block(journal);
107 }
108
109 /**
110  * block_is_full() - Check whether a journal block is full.
111  * @journal: The slab journal for the block.
112  *
113  * Return: true if the tail block is full.
114  */
115 static bool __must_check block_is_full(struct slab_journal *journal)
116 {
117         journal_entry_count_t count = journal->tail_header.entry_count;
118
119         return (journal->tail_header.has_block_map_increments ?
120                 (journal->full_entries_per_block == count) :
121                 (journal->entries_per_block == count));
122 }
123
124 static void add_entries(struct slab_journal *journal);
125 static void update_tail_block_location(struct slab_journal *journal);
126 static void release_journal_locks(struct vdo_waiter *waiter, void *context);
127
128 /**
129  * is_slab_journal_blank() - Check whether a slab's journal is blank.
130  *
131  * A slab journal is blank if it has never had any entries recorded in it.
132  *
133  * Return: true if the slab's journal has never been modified.
134  */
135 static bool is_slab_journal_blank(const struct vdo_slab *slab)
136 {
137         return ((slab->journal.tail == 1) &&
138                 (slab->journal.tail_header.entry_count == 0));
139 }
140
141 /**
142  * mark_slab_journal_dirty() - Put a slab journal on the dirty ring of its allocator in the correct
143  *                             order.
144  * @journal: The journal to be marked dirty.
145  * @lock: The recovery journal lock held by the slab journal.
146  */
147 static void mark_slab_journal_dirty(struct slab_journal *journal, sequence_number_t lock)
148 {
149         struct slab_journal *dirty_journal;
150         struct list_head *dirty_list = &journal->slab->allocator->dirty_slab_journals;
151
152         ASSERT_LOG_ONLY(journal->recovery_lock == 0, "slab journal was clean");
153
154         journal->recovery_lock = lock;
155         list_for_each_entry_reverse(dirty_journal, dirty_list, dirty_entry) {
156                 if (dirty_journal->recovery_lock <= journal->recovery_lock)
157                         break;
158         }
159
160         list_move_tail(&journal->dirty_entry, dirty_journal->dirty_entry.next);
161 }
162
163 static void mark_slab_journal_clean(struct slab_journal *journal)
164 {
165         journal->recovery_lock = 0;
166         list_del_init(&journal->dirty_entry);
167 }
168
169 static void check_if_slab_drained(struct vdo_slab *slab)
170 {
171         bool read_only;
172         struct slab_journal *journal = &slab->journal;
173         const struct admin_state_code *code;
174
175         if (!vdo_is_state_draining(&slab->state) ||
176             must_make_entries_to_flush(journal) ||
177             is_reaping(journal) ||
178             journal->waiting_to_commit ||
179             !list_empty(&journal->uncommitted_blocks) ||
180             journal->updating_slab_summary ||
181             (slab->active_count > 0))
182                 return;
183
184         /* When not suspending or recovering, the slab must be clean. */
185         code = vdo_get_admin_state_code(&slab->state);
186         read_only = vdo_is_read_only(slab->allocator->depot->vdo);
187         if (!read_only &&
188             vdo_waitq_has_waiters(&slab->dirty_blocks) &&
189             (code != VDO_ADMIN_STATE_SUSPENDING) &&
190             (code != VDO_ADMIN_STATE_RECOVERING))
191                 return;
192
193         vdo_finish_draining_with_result(&slab->state,
194                                         (read_only ? VDO_READ_ONLY : VDO_SUCCESS));
195 }
196
197 /* FULLNESS HINT COMPUTATION */
198
199 /**
200  * compute_fullness_hint() - Translate a slab's free block count into a 'fullness hint' that can be
201  *                           stored in a slab_summary_entry's 7 bits that are dedicated to its free
202  *                           count.
203  * @depot: The depot whose summary being updated.
204  * @free_blocks: The number of free blocks.
205  *
206  * Note: the number of free blocks must be strictly less than 2^23 blocks, even though
207  * theoretically slabs could contain precisely 2^23 blocks; there is an assumption that at least
208  * one block is used by metadata. This assumption is necessary; otherwise, the fullness hint might
209  * overflow. The fullness hint formula is roughly (fullness >> 16) & 0x7f, but (2^23 >> 16) & 0x7f
210  * is 0, which would make it impossible to distinguish completely full from completely empty.
211  *
212  * Return: A fullness hint, which can be stored in 7 bits.
213  */
214 static u8 __must_check compute_fullness_hint(struct slab_depot *depot,
215                                              block_count_t free_blocks)
216 {
217         block_count_t hint;
218
219         ASSERT_LOG_ONLY((free_blocks < (1 << 23)), "free blocks must be less than 2^23");
220
221         if (free_blocks == 0)
222                 return 0;
223
224         hint = free_blocks >> depot->hint_shift;
225         return ((hint == 0) ? 1 : hint);
226 }
227
228 /**
229  * check_summary_drain_complete() - Check whether an allocators summary has finished draining.
230  */
231 static void check_summary_drain_complete(struct block_allocator *allocator)
232 {
233         if (!vdo_is_state_draining(&allocator->summary_state) ||
234             (allocator->summary_write_count > 0))
235                 return;
236
237         vdo_finish_operation(&allocator->summary_state,
238                              (vdo_is_read_only(allocator->depot->vdo) ?
239                               VDO_READ_ONLY : VDO_SUCCESS));
240 }
241
242 /**
243  * notify_summary_waiters() - Wake all the waiters in a given queue.
244  * @allocator: The block allocator summary which owns the queue.
245  * @queue: The queue to notify.
246  */
247 static void notify_summary_waiters(struct block_allocator *allocator,
248                                    struct vdo_wait_queue *queue)
249 {
250         int result = (vdo_is_read_only(allocator->depot->vdo) ?
251                       VDO_READ_ONLY : VDO_SUCCESS);
252
253         vdo_waitq_notify_all_waiters(queue, NULL, &result);
254 }
255
256 static void launch_write(struct slab_summary_block *summary_block);
257
258 /**
259  * finish_updating_slab_summary_block() - Finish processing a block which attempted to write,
260  *                                        whether or not the attempt succeeded.
261  * @block: The block.
262  */
263 static void finish_updating_slab_summary_block(struct slab_summary_block *block)
264 {
265         notify_summary_waiters(block->allocator, &block->current_update_waiters);
266         block->writing = false;
267         block->allocator->summary_write_count--;
268         if (vdo_waitq_has_waiters(&block->next_update_waiters))
269                 launch_write(block);
270         else
271                 check_summary_drain_complete(block->allocator);
272 }
273
274 /**
275  * finish_update() - This is the callback for a successful summary block write.
276  * @completion: The write vio.
277  */
278 static void finish_update(struct vdo_completion *completion)
279 {
280         struct slab_summary_block *block =
281                 container_of(as_vio(completion), struct slab_summary_block, vio);
282
283         atomic64_inc(&block->allocator->depot->summary_statistics.blocks_written);
284         finish_updating_slab_summary_block(block);
285 }
286
287 /**
288  * handle_write_error() - Handle an error writing a slab summary block.
289  * @completion: The write VIO.
290  */
291 static void handle_write_error(struct vdo_completion *completion)
292 {
293         struct slab_summary_block *block =
294                 container_of(as_vio(completion), struct slab_summary_block, vio);
295
296         vio_record_metadata_io_error(as_vio(completion));
297         vdo_enter_read_only_mode(completion->vdo, completion->result);
298         finish_updating_slab_summary_block(block);
299 }
300
301 static void write_slab_summary_endio(struct bio *bio)
302 {
303         struct vio *vio = bio->bi_private;
304         struct slab_summary_block *block =
305                 container_of(vio, struct slab_summary_block, vio);
306
307         continue_vio_after_io(vio, finish_update, block->allocator->thread_id);
308 }
309
310 /**
311  * launch_write() - Write a slab summary block unless it is currently out for writing.
312  * @block: The block that needs to be committed.
313  */
314 static void launch_write(struct slab_summary_block *block)
315 {
316         struct block_allocator *allocator = block->allocator;
317         struct slab_depot *depot = allocator->depot;
318         physical_block_number_t pbn;
319
320         if (block->writing)
321                 return;
322
323         allocator->summary_write_count++;
324         vdo_waitq_transfer_all_waiters(&block->next_update_waiters,
325                                        &block->current_update_waiters);
326         block->writing = true;
327
328         if (vdo_is_read_only(depot->vdo)) {
329                 finish_updating_slab_summary_block(block);
330                 return;
331         }
332
333         memcpy(block->outgoing_entries, block->entries, VDO_BLOCK_SIZE);
334
335         /*
336          * Flush before writing to ensure that the slab journal tail blocks and reference updates
337          * covered by this summary update are stable (VDO-2332).
338          */
339         pbn = (depot->summary_origin +
340                (VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE * allocator->zone_number) +
341                block->index);
342         vdo_submit_metadata_vio(&block->vio, pbn, write_slab_summary_endio,
343                                 handle_write_error, REQ_OP_WRITE | REQ_PREFLUSH);
344 }
345
346 /**
347  * update_slab_summary_entry() - Update the entry for a slab.
348  * @slab: The slab whose entry is to be updated
349  * @waiter: The waiter that is updating the summary.
350  * @tail_block_offset: The offset of the slab journal's tail block.
351  * @load_ref_counts: Whether the reference counts must be loaded from disk on the vdo load.
352  * @is_clean: Whether the slab is clean.
353  * @free_blocks: The number of free blocks.
354  */
355 static void update_slab_summary_entry(struct vdo_slab *slab, struct vdo_waiter *waiter,
356                                       tail_block_offset_t tail_block_offset,
357                                       bool load_ref_counts, bool is_clean,
358                                       block_count_t free_blocks)
359 {
360         u8 index = slab->slab_number / VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK;
361         struct block_allocator *allocator = slab->allocator;
362         struct slab_summary_block *block = &allocator->summary_blocks[index];
363         int result;
364         struct slab_summary_entry *entry;
365
366         if (vdo_is_read_only(block->vio.completion.vdo)) {
367                 result = VDO_READ_ONLY;
368                 waiter->callback(waiter, &result);
369                 return;
370         }
371
372         if (vdo_is_state_draining(&allocator->summary_state) ||
373             vdo_is_state_quiescent(&allocator->summary_state)) {
374                 result = VDO_INVALID_ADMIN_STATE;
375                 waiter->callback(waiter, &result);
376                 return;
377         }
378
379         entry = &allocator->summary_entries[slab->slab_number];
380         *entry = (struct slab_summary_entry) {
381                 .tail_block_offset = tail_block_offset,
382                 .load_ref_counts = (entry->load_ref_counts || load_ref_counts),
383                 .is_dirty = !is_clean,
384                 .fullness_hint = compute_fullness_hint(allocator->depot, free_blocks),
385         };
386         vdo_waitq_enqueue_waiter(&block->next_update_waiters, waiter);
387         launch_write(block);
388 }
389
390 /**
391  * finish_reaping() - Actually advance the head of the journal now that any necessary flushes are
392  *                    complete.
393  * @journal: The journal to be reaped.
394  */
395 static void finish_reaping(struct slab_journal *journal)
396 {
397         journal->head = journal->unreapable;
398         add_entries(journal);
399         check_if_slab_drained(journal->slab);
400 }
401
402 static void reap_slab_journal(struct slab_journal *journal);
403
404 /**
405  * complete_reaping() - Finish reaping now that we have flushed the lower layer and then try
406  *                      reaping again in case we deferred reaping due to an outstanding vio.
407  * @completion: The flush vio.
408  */
409 static void complete_reaping(struct vdo_completion *completion)
410 {
411         struct slab_journal *journal = completion->parent;
412
413         return_vio_to_pool(journal->slab->allocator->vio_pool,
414                            vio_as_pooled_vio(as_vio(uds_forget(completion))));
415         finish_reaping(journal);
416         reap_slab_journal(journal);
417 }
418
419 /**
420  * handle_flush_error() - Handle an error flushing the lower layer.
421  * @completion: The flush vio.
422  */
423 static void handle_flush_error(struct vdo_completion *completion)
424 {
425         vio_record_metadata_io_error(as_vio(completion));
426         vdo_enter_read_only_mode(completion->vdo, completion->result);
427         complete_reaping(completion);
428 }
429
430 static void flush_endio(struct bio *bio)
431 {
432         struct vio *vio = bio->bi_private;
433         struct slab_journal *journal = vio->completion.parent;
434
435         continue_vio_after_io(vio, complete_reaping,
436                               journal->slab->allocator->thread_id);
437 }
438
439 /**
440  * flush_for_reaping() - A waiter callback for getting a vio with which to flush the lower layer
441  *                       prior to reaping.
442  * @waiter: The journal as a flush waiter.
443  * @context: The newly acquired flush vio.
444  */
445 static void flush_for_reaping(struct vdo_waiter *waiter, void *context)
446 {
447         struct slab_journal *journal =
448                 container_of(waiter, struct slab_journal, flush_waiter);
449         struct pooled_vio *pooled = context;
450         struct vio *vio = &pooled->vio;
451
452         vio->completion.parent = journal;
453         vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
454 }
455
456 /**
457  * reap_slab_journal() - Conduct a reap on a slab journal to reclaim unreferenced blocks.
458  * @journal: The slab journal.
459  */
460 static void reap_slab_journal(struct slab_journal *journal)
461 {
462         bool reaped = false;
463
464         if (is_reaping(journal)) {
465                 /* We already have a reap in progress so wait for it to finish. */
466                 return;
467         }
468
469         if ((journal->slab->status != VDO_SLAB_REBUILT) ||
470             !vdo_is_state_normal(&journal->slab->state) ||
471             vdo_is_read_only(journal->slab->allocator->depot->vdo)) {
472                 /*
473                  * We must not reap in the first two cases, and there's no point in read-only mode.
474                  */
475                 return;
476         }
477
478         /*
479          * Start reclaiming blocks only when the journal head has no references. Then stop when a
480          * block is referenced or reap reaches the most recently written block, referenced by the
481          * slab summary, which has the sequence number just before the tail.
482          */
483         while ((journal->unreapable < journal->tail) && (journal->reap_lock->count == 0)) {
484                 reaped = true;
485                 journal->unreapable++;
486                 journal->reap_lock++;
487                 if (journal->reap_lock == &journal->locks[journal->size])
488                         journal->reap_lock = &journal->locks[0];
489         }
490
491         if (!reaped)
492                 return;
493
494         /*
495          * It is never safe to reap a slab journal block without first issuing a flush, regardless
496          * of whether a user flush has been received or not. In the absence of the flush, the
497          * reference block write which released the locks allowing the slab journal to reap may not
498          * be persisted. Although slab summary writes will eventually issue flushes, multiple slab
499          * journal block writes can be issued while previous slab summary updates have not yet been
500          * made. Even though those slab journal block writes will be ignored if the slab summary
501          * update is not persisted, they may still overwrite the to-be-reaped slab journal block
502          * resulting in a loss of reference count updates (VDO-2912).
503          */
504         journal->flush_waiter.callback = flush_for_reaping;
505         acquire_vio_from_pool(journal->slab->allocator->vio_pool,
506                               &journal->flush_waiter);
507 }
508
509 /**
510  * adjust_slab_journal_block_reference() - Adjust the reference count for a slab journal block.
511  * @journal: The slab journal.
512  * @sequence_number: The journal sequence number of the referenced block.
513  * @adjustment: Amount to adjust the reference counter.
514  *
515  * Note that when the adjustment is negative, the slab journal will be reaped.
516  */
517 static void adjust_slab_journal_block_reference(struct slab_journal *journal,
518                                                 sequence_number_t sequence_number,
519                                                 int adjustment)
520 {
521         struct journal_lock *lock;
522
523         if (sequence_number == 0)
524                 return;
525
526         if (journal->slab->status == VDO_SLAB_REPLAYING) {
527                 /* Locks should not be used during offline replay. */
528                 return;
529         }
530
531         ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero");
532         lock = get_lock(journal, sequence_number);
533         if (adjustment < 0) {
534                 ASSERT_LOG_ONLY((-adjustment <= lock->count),
535                                 "adjustment %d of lock count %u for slab journal block %llu must not underflow",
536                                 adjustment, lock->count,
537                                 (unsigned long long) sequence_number);
538         }
539
540         lock->count += adjustment;
541         if (lock->count == 0)
542                 reap_slab_journal(journal);
543 }
544
545 /**
546  * release_journal_locks() - Callback invoked after a slab summary update completes.
547  * @waiter: The slab summary waiter that has just been notified.
548  * @context: The result code of the update.
549  *
550  * Registered in the constructor on behalf of update_tail_block_location().
551  *
552  * Implements waiter_callback_fn.
553  */
554 static void release_journal_locks(struct vdo_waiter *waiter, void *context)
555 {
556         sequence_number_t first, i;
557         struct slab_journal *journal =
558                 container_of(waiter, struct slab_journal, slab_summary_waiter);
559         int result = *((int *) context);
560
561         if (result != VDO_SUCCESS) {
562                 if (result != VDO_READ_ONLY) {
563                         /*
564                          * Don't bother logging what might be lots of errors if we are already in
565                          * read-only mode.
566                          */
567                         uds_log_error_strerror(result, "failed slab summary update %llu",
568                                                (unsigned long long) journal->summarized);
569                 }
570
571                 journal->updating_slab_summary = false;
572                 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
573                 check_if_slab_drained(journal->slab);
574                 return;
575         }
576
577         if (journal->partial_write_in_progress && (journal->summarized == journal->tail)) {
578                 journal->partial_write_in_progress = false;
579                 add_entries(journal);
580         }
581
582         first = journal->last_summarized;
583         journal->last_summarized = journal->summarized;
584         for (i = journal->summarized - 1; i >= first; i--) {
585                 /*
586                  * Release the lock the summarized block held on the recovery journal. (During
587                  * replay, recovery_start will always be 0.)
588                  */
589                 if (journal->recovery_journal != NULL) {
590                         zone_count_t zone_number = journal->slab->allocator->zone_number;
591                         struct journal_lock *lock = get_lock(journal, i);
592
593                         vdo_release_recovery_journal_block_reference(journal->recovery_journal,
594                                                                      lock->recovery_start,
595                                                                      VDO_ZONE_TYPE_PHYSICAL,
596                                                                      zone_number);
597                 }
598
599                 /*
600                  * Release our own lock against reaping for blocks that are committed. (This
601                  * function will not change locks during replay.)
602                  */
603                 adjust_slab_journal_block_reference(journal, i, -1);
604         }
605
606         journal->updating_slab_summary = false;
607
608         reap_slab_journal(journal);
609
610         /* Check if the slab summary needs to be updated again. */
611         update_tail_block_location(journal);
612 }
613
614 /**
615  * update_tail_block_location() - Update the tail block location in the slab summary, if necessary.
616  * @journal: The slab journal that is updating its tail block location.
617  */
618 static void update_tail_block_location(struct slab_journal *journal)
619 {
620         block_count_t free_block_count;
621         struct vdo_slab *slab = journal->slab;
622
623         if (journal->updating_slab_summary ||
624             vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
625             (journal->last_summarized >= journal->next_commit)) {
626                 check_if_slab_drained(slab);
627                 return;
628         }
629
630         if (slab->status != VDO_SLAB_REBUILT) {
631                 u8 hint = slab->allocator->summary_entries[slab->slab_number].fullness_hint;
632
633                 free_block_count = ((block_count_t) hint) << slab->allocator->depot->hint_shift;
634         } else {
635                 free_block_count = slab->free_blocks;
636         }
637
638         journal->summarized = journal->next_commit;
639         journal->updating_slab_summary = true;
640
641         /*
642          * Update slab summary as dirty.
643          * vdo_slab journal can only reap past sequence number 1 when all the ref counts for this
644          * slab have been written to the layer. Therefore, indicate that the ref counts must be
645          * loaded when the journal head has reaped past sequence number 1.
646          */
647         update_slab_summary_entry(slab, &journal->slab_summary_waiter,
648                                   journal->summarized % journal->size,
649                                   (journal->head > 1), false, free_block_count);
650 }
651
652 /**
653  * reopen_slab_journal() - Reopen a slab's journal by emptying it and then adding pending entries.
654  */
655 static void reopen_slab_journal(struct vdo_slab *slab)
656 {
657         struct slab_journal *journal = &slab->journal;
658         sequence_number_t block;
659
660         ASSERT_LOG_ONLY(journal->tail_header.entry_count == 0,
661                         "vdo_slab journal's active block empty before reopening");
662         journal->head = journal->tail;
663         initialize_journal_state(journal);
664
665         /* Ensure no locks are spuriously held on an empty journal. */
666         for (block = 1; block <= journal->size; block++) {
667                 ASSERT_LOG_ONLY((get_lock(journal, block)->count == 0),
668                                 "Scrubbed journal's block %llu is not locked",
669                                 (unsigned long long) block);
670         }
671
672         add_entries(journal);
673 }
674
675 static sequence_number_t get_committing_sequence_number(const struct pooled_vio *vio)
676 {
677         const struct packed_slab_journal_block *block =
678                 (const struct packed_slab_journal_block *) vio->vio.data;
679
680         return __le64_to_cpu(block->header.sequence_number);
681 }
682
683 /**
684  * complete_write() - Handle post-commit processing.
685  * @completion: The write vio as a completion.
686  *
687  * This is the callback registered by write_slab_journal_block().
688  */
689 static void complete_write(struct vdo_completion *completion)
690 {
691         int result = completion->result;
692         struct pooled_vio *pooled = vio_as_pooled_vio(as_vio(completion));
693         struct slab_journal *journal = completion->parent;
694         sequence_number_t committed = get_committing_sequence_number(pooled);
695
696         list_del_init(&pooled->list_entry);
697         return_vio_to_pool(journal->slab->allocator->vio_pool, uds_forget(pooled));
698
699         if (result != VDO_SUCCESS) {
700                 vio_record_metadata_io_error(as_vio(completion));
701                 uds_log_error_strerror(result, "cannot write slab journal block %llu",
702                                        (unsigned long long) committed);
703                 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
704                 check_if_slab_drained(journal->slab);
705                 return;
706         }
707
708         WRITE_ONCE(journal->events->blocks_written, journal->events->blocks_written + 1);
709
710         if (list_empty(&journal->uncommitted_blocks)) {
711                 /* If no blocks are outstanding, then the commit point is at the tail. */
712                 journal->next_commit = journal->tail;
713         } else {
714                 /* The commit point is always the beginning of the oldest incomplete block. */
715                 pooled = container_of(journal->uncommitted_blocks.next,
716                                       struct pooled_vio, list_entry);
717                 journal->next_commit = get_committing_sequence_number(pooled);
718         }
719
720         update_tail_block_location(journal);
721 }
722
723 static void write_slab_journal_endio(struct bio *bio)
724 {
725         struct vio *vio = bio->bi_private;
726         struct slab_journal *journal = vio->completion.parent;
727
728         continue_vio_after_io(vio, complete_write, journal->slab->allocator->thread_id);
729 }
730
731 /**
732  * write_slab_journal_block() - Write a slab journal block.
733  * @waiter: The vio pool waiter which was just notified.
734  * @context: The vio pool entry for the write.
735  *
736  * Callback from acquire_vio_from_pool() registered in commit_tail().
737  */
738 static void write_slab_journal_block(struct vdo_waiter *waiter, void *context)
739 {
740         struct pooled_vio *pooled = context;
741         struct vio *vio = &pooled->vio;
742         struct slab_journal *journal =
743                 container_of(waiter, struct slab_journal, resource_waiter);
744         struct slab_journal_block_header *header = &journal->tail_header;
745         int unused_entries = journal->entries_per_block - header->entry_count;
746         physical_block_number_t block_number;
747         const struct admin_state_code *operation;
748
749         header->head = journal->head;
750         list_add_tail(&pooled->list_entry, &journal->uncommitted_blocks);
751         vdo_pack_slab_journal_block_header(header, &journal->block->header);
752
753         /* Copy the tail block into the vio. */
754         memcpy(pooled->vio.data, journal->block, VDO_BLOCK_SIZE);
755
756         ASSERT_LOG_ONLY(unused_entries >= 0, "vdo_slab journal block is not overfull");
757         if (unused_entries > 0) {
758                 /*
759                  * Release the per-entry locks for any unused entries in the block we are about to
760                  * write.
761                  */
762                 adjust_slab_journal_block_reference(journal, header->sequence_number,
763                                                     -unused_entries);
764                 journal->partial_write_in_progress = !block_is_full(journal);
765         }
766
767         block_number = journal->slab->journal_origin +
768                 (header->sequence_number % journal->size);
769         vio->completion.parent = journal;
770
771         /*
772          * This block won't be read in recovery until the slab summary is updated to refer to it.
773          * The slab summary update does a flush which is sufficient to protect us from VDO-2331.
774          */
775         vdo_submit_metadata_vio(uds_forget(vio), block_number, write_slab_journal_endio,
776                                 complete_write, REQ_OP_WRITE);
777
778         /* Since the write is submitted, the tail block structure can be reused. */
779         journal->tail++;
780         initialize_tail_block(journal);
781         journal->waiting_to_commit = false;
782
783         operation = vdo_get_admin_state_code(&journal->slab->state);
784         if (operation == VDO_ADMIN_STATE_WAITING_FOR_RECOVERY) {
785                 vdo_finish_operation(&journal->slab->state,
786                                      (vdo_is_read_only(journal->slab->allocator->depot->vdo) ?
787                                       VDO_READ_ONLY : VDO_SUCCESS));
788                 return;
789         }
790
791         add_entries(journal);
792 }
793
794 /**
795  * commit_tail() - Commit the tail block of the slab journal.
796  * @journal: The journal whose tail block should be committed.
797  */
798 static void commit_tail(struct slab_journal *journal)
799 {
800         if ((journal->tail_header.entry_count == 0) && must_make_entries_to_flush(journal)) {
801                 /*
802                  * There are no entries at the moment, but there are some waiters, so defer
803                  * initiating the flush until those entries are ready to write.
804                  */
805                 return;
806         }
807
808         if (vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
809             journal->waiting_to_commit ||
810             (journal->tail_header.entry_count == 0)) {
811                 /*
812                  * There is nothing to do since the tail block is empty, or writing, or the journal
813                  * is in read-only mode.
814                  */
815                 return;
816         }
817
818         /*
819          * Since we are about to commit the tail block, this journal no longer needs to be on the
820          * ring of journals which the recovery journal might ask to commit.
821          */
822         mark_slab_journal_clean(journal);
823
824         journal->waiting_to_commit = true;
825
826         journal->resource_waiter.callback = write_slab_journal_block;
827         acquire_vio_from_pool(journal->slab->allocator->vio_pool,
828                               &journal->resource_waiter);
829 }
830
831 /**
832  * encode_slab_journal_entry() - Encode a slab journal entry.
833  * @tail_header: The unpacked header for the block.
834  * @payload: The journal block payload to hold the entry.
835  * @sbn: The slab block number of the entry to encode.
836  * @operation: The type of the entry.
837  * @increment: True if this is an increment.
838  *
839  * Exposed for unit tests.
840  */
841 static void encode_slab_journal_entry(struct slab_journal_block_header *tail_header,
842                                       slab_journal_payload *payload,
843                                       slab_block_number sbn,
844                                       enum journal_operation operation,
845                                       bool increment)
846 {
847         journal_entry_count_t entry_number = tail_header->entry_count++;
848
849         if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
850                 if (!tail_header->has_block_map_increments) {
851                         memset(payload->full_entries.entry_types, 0,
852                                VDO_SLAB_JOURNAL_ENTRY_TYPES_SIZE);
853                         tail_header->has_block_map_increments = true;
854                 }
855
856                 payload->full_entries.entry_types[entry_number / 8] |=
857                         ((u8)1 << (entry_number % 8));
858         }
859
860         vdo_pack_slab_journal_entry(&payload->entries[entry_number], sbn, increment);
861 }
862
863 /**
864  * expand_journal_point() - Convert a recovery journal journal_point which refers to both an
865  *                          increment and a decrement to a single point which refers to one or the
866  *                          other.
867  * @recovery_point: The journal point to convert.
868  * @increment: Whether the current entry is an increment.
869  *
870  * Return: The expanded journal point
871  *
872  * Because each data_vio has but a single recovery journal point, but may need to make both
873  * increment and decrement entries in the same slab journal. In order to distinguish the two
874  * entries, the entry count of the expanded journal point is twice the actual recovery journal
875  * entry count for increments, and one more than that for decrements.
876  */
877 static struct journal_point expand_journal_point(struct journal_point recovery_point,
878                                                  bool increment)
879 {
880         recovery_point.entry_count *= 2;
881         if (!increment)
882                 recovery_point.entry_count++;
883
884         return recovery_point;
885 }
886
887 /**
888  * add_entry() - Actually add an entry to the slab journal, potentially firing off a write if a
889  *               block becomes full.
890  * @journal: The slab journal to append to.
891  * @pbn: The pbn being adjusted.
892  * @operation: The type of entry to make.
893  * @increment: True if this is an increment.
894  * @recovery_point: The expanded recovery point.
895  *
896  * This function is synchronous.
897  */
898 static void add_entry(struct slab_journal *journal, physical_block_number_t pbn,
899                       enum journal_operation operation, bool increment,
900                       struct journal_point recovery_point)
901 {
902         struct packed_slab_journal_block *block = journal->block;
903         int result;
904
905         result = ASSERT(vdo_before_journal_point(&journal->tail_header.recovery_point,
906                                                  &recovery_point),
907                         "recovery journal point is monotonically increasing, recovery point: %llu.%u, block recovery point: %llu.%u",
908                         (unsigned long long) recovery_point.sequence_number,
909                         recovery_point.entry_count,
910                         (unsigned long long) journal->tail_header.recovery_point.sequence_number,
911                         journal->tail_header.recovery_point.entry_count);
912         if (result != VDO_SUCCESS) {
913                 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
914                 return;
915         }
916
917         if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
918                 result = ASSERT((journal->tail_header.entry_count <
919                                  journal->full_entries_per_block),
920                                 "block has room for full entries");
921                 if (result != VDO_SUCCESS) {
922                         vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo,
923                                                  result);
924                         return;
925                 }
926         }
927
928         encode_slab_journal_entry(&journal->tail_header, &block->payload,
929                                   pbn - journal->slab->start, operation, increment);
930         journal->tail_header.recovery_point = recovery_point;
931         if (block_is_full(journal))
932                 commit_tail(journal);
933 }
934
935 static inline block_count_t journal_length(const struct slab_journal *journal)
936 {
937         return journal->tail - journal->head;
938 }
939
940 /**
941  * vdo_attempt_replay_into_slab() - Replay a recovery journal entry into a slab's journal.
942  * @slab: The slab to play into.
943  * @pbn: The PBN for the entry.
944  * @operation: The type of entry to add.
945  * @increment: True if this entry is an increment.
946  * @recovery_point: The recovery journal point corresponding to this entry.
947  * @parent: The completion to notify when there is space to add the entry if the entry could not be
948  *          added immediately.
949  *
950  * Return: true if the entry was added immediately.
951  */
952 bool vdo_attempt_replay_into_slab(struct vdo_slab *slab, physical_block_number_t pbn,
953                                   enum journal_operation operation, bool increment,
954                                   struct journal_point *recovery_point,
955                                   struct vdo_completion *parent)
956 {
957         struct slab_journal *journal = &slab->journal;
958         struct slab_journal_block_header *header = &journal->tail_header;
959         struct journal_point expanded = expand_journal_point(*recovery_point, increment);
960
961         /* Only accept entries after the current recovery point. */
962         if (!vdo_before_journal_point(&journal->tail_header.recovery_point, &expanded))
963                 return true;
964
965         if ((header->entry_count >= journal->full_entries_per_block) &&
966             (header->has_block_map_increments || (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING))) {
967                 /*
968                  * The tail block does not have room for the entry we are attempting to add so
969                  * commit the tail block now.
970                  */
971                 commit_tail(journal);
972         }
973
974         if (journal->waiting_to_commit) {
975                 vdo_start_operation_with_waiter(&journal->slab->state,
976                                                 VDO_ADMIN_STATE_WAITING_FOR_RECOVERY,
977                                                 parent, NULL);
978                 return false;
979         }
980
981         if (journal_length(journal) >= journal->size) {
982                 /*
983                  * We must have reaped the current head before the crash, since the blocked
984                  * threshold keeps us from having more entries than fit in a slab journal; hence we
985                  * can just advance the head (and unreapable block), as needed.
986                  */
987                 journal->head++;
988                 journal->unreapable++;
989         }
990
991         if (journal->slab->status == VDO_SLAB_REBUILT)
992                 journal->slab->status = VDO_SLAB_REPLAYING;
993
994         add_entry(journal, pbn, operation, increment, expanded);
995         return true;
996 }
997
998 /**
999  * requires_reaping() - Check whether the journal must be reaped before adding new entries.
1000  * @journal: The journal to check.
1001  *
1002  * Return: true if the journal must be reaped.
1003  */
1004 static bool requires_reaping(const struct slab_journal *journal)
1005 {
1006         return (journal_length(journal) >= journal->blocking_threshold);
1007 }
1008
1009 /** finish_summary_update() - A waiter callback that resets the writing state of a slab. */
1010 static void finish_summary_update(struct vdo_waiter *waiter, void *context)
1011 {
1012         struct vdo_slab *slab = container_of(waiter, struct vdo_slab, summary_waiter);
1013         int result = *((int *) context);
1014
1015         slab->active_count--;
1016
1017         if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) {
1018                 uds_log_error_strerror(result, "failed to update slab summary");
1019                 vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1020         }
1021
1022         check_if_slab_drained(slab);
1023 }
1024
1025 static void write_reference_block(struct vdo_waiter *waiter, void *context);
1026
1027 /**
1028  * launch_reference_block_write() - Launch the write of a dirty reference block by first acquiring
1029  *                                  a VIO for it from the pool.
1030  * @waiter: The waiter of the block which is starting to write.
1031  * @context: The parent slab of the block.
1032  *
1033  * This can be asynchronous since the writer will have to wait if all VIOs in the pool are
1034  * currently in use.
1035  */
1036 static void launch_reference_block_write(struct vdo_waiter *waiter, void *context)
1037 {
1038         struct vdo_slab *slab = context;
1039
1040         if (vdo_is_read_only(slab->allocator->depot->vdo))
1041                 return;
1042
1043         slab->active_count++;
1044         container_of(waiter, struct reference_block, waiter)->is_writing = true;
1045         waiter->callback = write_reference_block;
1046         acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
1047 }
1048
1049 static void save_dirty_reference_blocks(struct vdo_slab *slab)
1050 {
1051         vdo_waitq_notify_all_waiters(&slab->dirty_blocks,
1052                                      launch_reference_block_write, slab);
1053         check_if_slab_drained(slab);
1054 }
1055
1056 /**
1057  * finish_reference_block_write() - After a reference block has written, clean it, release its
1058  *                                  locks, and return its VIO to the pool.
1059  * @completion: The VIO that just finished writing.
1060  */
1061 static void finish_reference_block_write(struct vdo_completion *completion)
1062 {
1063         struct vio *vio = as_vio(completion);
1064         struct pooled_vio *pooled = vio_as_pooled_vio(vio);
1065         struct reference_block *block = completion->parent;
1066         struct vdo_slab *slab = block->slab;
1067         tail_block_offset_t offset;
1068
1069         slab->active_count--;
1070
1071         /* Release the slab journal lock. */
1072         adjust_slab_journal_block_reference(&slab->journal,
1073                                             block->slab_journal_lock_to_release, -1);
1074         return_vio_to_pool(slab->allocator->vio_pool, pooled);
1075
1076         /*
1077          * We can't clear the is_writing flag earlier as releasing the slab journal lock may cause
1078          * us to be dirtied again, but we don't want to double enqueue.
1079          */
1080         block->is_writing = false;
1081
1082         if (vdo_is_read_only(completion->vdo)) {
1083                 check_if_slab_drained(slab);
1084                 return;
1085         }
1086
1087         /* Re-queue the block if it was re-dirtied while it was writing. */
1088         if (block->is_dirty) {
1089                 vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
1090                 if (vdo_is_state_draining(&slab->state)) {
1091                         /* We must be saving, and this block will otherwise not be relaunched. */
1092                         save_dirty_reference_blocks(slab);
1093                 }
1094
1095                 return;
1096         }
1097
1098         /*
1099          * Mark the slab as clean in the slab summary if there are no dirty or writing blocks
1100          * and no summary update in progress.
1101          */
1102         if ((slab->active_count > 0) || vdo_waitq_has_waiters(&slab->dirty_blocks)) {
1103                 check_if_slab_drained(slab);
1104                 return;
1105         }
1106
1107         offset = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
1108         slab->active_count++;
1109         slab->summary_waiter.callback = finish_summary_update;
1110         update_slab_summary_entry(slab, &slab->summary_waiter, offset,
1111                                   true, true, slab->free_blocks);
1112 }
1113
1114 /**
1115  * get_reference_counters_for_block() - Find the reference counters for a given block.
1116  * @block: The reference_block in question.
1117  *
1118  * Return: A pointer to the reference counters for this block.
1119  */
1120 static vdo_refcount_t * __must_check get_reference_counters_for_block(struct reference_block *block)
1121 {
1122         size_t block_index = block - block->slab->reference_blocks;
1123
1124         return &block->slab->counters[block_index * COUNTS_PER_BLOCK];
1125 }
1126
1127 /**
1128  * pack_reference_block() - Copy data from a reference block to a buffer ready to be written out.
1129  * @block: The block to copy.
1130  * @buffer: The char buffer to fill with the packed block.
1131  */
1132 static void pack_reference_block(struct reference_block *block, void *buffer)
1133 {
1134         struct packed_reference_block *packed = buffer;
1135         vdo_refcount_t *counters = get_reference_counters_for_block(block);
1136         sector_count_t i;
1137         struct packed_journal_point commit_point;
1138
1139         vdo_pack_journal_point(&block->slab->slab_journal_point, &commit_point);
1140
1141         for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
1142                 packed->sectors[i].commit_point = commit_point;
1143                 memcpy(packed->sectors[i].counts, counters + (i * COUNTS_PER_SECTOR),
1144                        (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
1145         }
1146 }
1147
1148 static void write_reference_block_endio(struct bio *bio)
1149 {
1150         struct vio *vio = bio->bi_private;
1151         struct reference_block *block = vio->completion.parent;
1152         thread_id_t thread_id = block->slab->allocator->thread_id;
1153
1154         continue_vio_after_io(vio, finish_reference_block_write, thread_id);
1155 }
1156
1157 /**
1158  * handle_io_error() - Handle an I/O error reading or writing a reference count block.
1159  * @completion: The VIO doing the I/O as a completion.
1160  */
1161 static void handle_io_error(struct vdo_completion *completion)
1162 {
1163         int result = completion->result;
1164         struct vio *vio = as_vio(completion);
1165         struct vdo_slab *slab = ((struct reference_block *) completion->parent)->slab;
1166
1167         vio_record_metadata_io_error(vio);
1168         return_vio_to_pool(slab->allocator->vio_pool, vio_as_pooled_vio(vio));
1169         slab->active_count--;
1170         vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1171         check_if_slab_drained(slab);
1172 }
1173
1174 /**
1175  * write_reference_block() - After a dirty block waiter has gotten a VIO from the VIO pool, copy
1176  *                           its counters and associated data into the VIO, and launch the write.
1177  * @waiter: The waiter of the dirty block.
1178  * @context: The VIO returned by the pool.
1179  */
1180 static void write_reference_block(struct vdo_waiter *waiter, void *context)
1181 {
1182         size_t block_offset;
1183         physical_block_number_t pbn;
1184         struct pooled_vio *pooled = context;
1185         struct vdo_completion *completion = &pooled->vio.completion;
1186         struct reference_block *block = container_of(waiter, struct reference_block,
1187                                                      waiter);
1188
1189         pack_reference_block(block, pooled->vio.data);
1190         block_offset = (block - block->slab->reference_blocks);
1191         pbn = (block->slab->ref_counts_origin + block_offset);
1192         block->slab_journal_lock_to_release = block->slab_journal_lock;
1193         completion->parent = block;
1194
1195         /*
1196          * Mark the block as clean, since we won't be committing any updates that happen after this
1197          * moment. As long as VIO order is preserved, two VIOs updating this block at once will not
1198          * cause complications.
1199          */
1200         block->is_dirty = false;
1201
1202         /*
1203          * Flush before writing to ensure that the recovery journal and slab journal entries which
1204          * cover this reference update are stable (VDO-2331).
1205          */
1206         WRITE_ONCE(block->slab->allocator->ref_counts_statistics.blocks_written,
1207                    block->slab->allocator->ref_counts_statistics.blocks_written + 1);
1208
1209         completion->callback_thread_id = ((struct block_allocator *) pooled->context)->thread_id;
1210         vdo_submit_metadata_vio(&pooled->vio, pbn, write_reference_block_endio,
1211                                 handle_io_error, REQ_OP_WRITE | REQ_PREFLUSH);
1212 }
1213
1214 static void reclaim_journal_space(struct slab_journal *journal)
1215 {
1216         block_count_t length = journal_length(journal);
1217         struct vdo_slab *slab = journal->slab;
1218         block_count_t write_count = vdo_waitq_num_waiters(&slab->dirty_blocks);
1219         block_count_t written;
1220
1221         if ((length < journal->flushing_threshold) || (write_count == 0))
1222                 return;
1223
1224         /* The slab journal is over the first threshold, schedule some reference block writes. */
1225         WRITE_ONCE(journal->events->flush_count, journal->events->flush_count + 1);
1226         if (length < journal->flushing_deadline) {
1227                 /* Schedule more writes the closer to the deadline we get. */
1228                 write_count /= journal->flushing_deadline - length + 1;
1229                 write_count = max_t(block_count_t, write_count, 1);
1230         }
1231
1232         for (written = 0; written < write_count; written++) {
1233                 vdo_waitq_notify_next_waiter(&slab->dirty_blocks,
1234                                              launch_reference_block_write, slab);
1235         }
1236 }
1237
1238 /**
1239  * reference_count_to_status() - Convert a reference count to a reference status.
1240  * @count: The count to convert.
1241  *
1242  * Return: The appropriate reference status.
1243  */
1244 static enum reference_status __must_check reference_count_to_status(vdo_refcount_t count)
1245 {
1246         if (count == EMPTY_REFERENCE_COUNT)
1247                 return RS_FREE;
1248         else if (count == 1)
1249                 return RS_SINGLE;
1250         else if (count == PROVISIONAL_REFERENCE_COUNT)
1251                 return RS_PROVISIONAL;
1252         else
1253                 return RS_SHARED;
1254 }
1255
1256 /**
1257  * dirty_block() - Mark a reference count block as dirty, potentially adding it to the dirty queue
1258  *                 if it wasn't already dirty.
1259  * @block: The reference block to mark as dirty.
1260  */
1261 static void dirty_block(struct reference_block *block)
1262 {
1263         if (block->is_dirty)
1264                 return;
1265
1266         block->is_dirty = true;
1267         if (!block->is_writing)
1268                 vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
1269 }
1270
1271 /**
1272  * get_reference_block() - Get the reference block that covers the given block index.
1273  */
1274 static struct reference_block * __must_check get_reference_block(struct vdo_slab *slab,
1275                                                                  slab_block_number index)
1276 {
1277         return &slab->reference_blocks[index / COUNTS_PER_BLOCK];
1278 }
1279
1280 /**
1281  * slab_block_number_from_pbn() - Determine the index within the slab of a particular physical
1282  *                                block number.
1283  * @slab: The slab.
1284  * @physical_block_number: The physical block number.
1285  * @slab_block_number_ptr: A pointer to the slab block number.
1286  *
1287  * Return: VDO_SUCCESS or an error code.
1288  */
1289 static int __must_check slab_block_number_from_pbn(struct vdo_slab *slab,
1290                                                    physical_block_number_t pbn,
1291                                                    slab_block_number *slab_block_number_ptr)
1292 {
1293         u64 slab_block_number;
1294
1295         if (pbn < slab->start)
1296                 return VDO_OUT_OF_RANGE;
1297
1298         slab_block_number = pbn - slab->start;
1299         if (slab_block_number >= slab->allocator->depot->slab_config.data_blocks)
1300                 return VDO_OUT_OF_RANGE;
1301
1302         *slab_block_number_ptr = slab_block_number;
1303         return VDO_SUCCESS;
1304 }
1305
1306 /**
1307  * get_reference_counter() - Get the reference counter that covers the given physical block number.
1308  * @slab: The slab to query.
1309  * @pbn: The physical block number.
1310  * @counter_ptr: A pointer to the reference counter.
1311  */
1312 static int __must_check get_reference_counter(struct vdo_slab *slab,
1313                                               physical_block_number_t pbn,
1314                                               vdo_refcount_t **counter_ptr)
1315 {
1316         slab_block_number index;
1317         int result = slab_block_number_from_pbn(slab, pbn, &index);
1318
1319         if (result != VDO_SUCCESS)
1320                 return result;
1321
1322         *counter_ptr = &slab->counters[index];
1323
1324         return VDO_SUCCESS;
1325 }
1326
1327 static unsigned int calculate_slab_priority(struct vdo_slab *slab)
1328 {
1329         block_count_t free_blocks = slab->free_blocks;
1330         unsigned int unopened_slab_priority = slab->allocator->unopened_slab_priority;
1331         unsigned int priority;
1332
1333         /*
1334          * Wholly full slabs must be the only ones with lowest priority, 0.
1335          *
1336          * Slabs that have never been opened (empty, newly initialized, and never been written to)
1337          * have lower priority than previously opened slabs that have a significant number of free
1338          * blocks. This ranking causes VDO to avoid writing physical blocks for the first time
1339          * unless there are very few free blocks that have been previously written to.
1340          *
1341          * Since VDO doesn't discard blocks currently, reusing previously written blocks makes VDO
1342          * a better client of any underlying storage that is thinly-provisioned (though discarding
1343          * would be better).
1344          *
1345          * For all other slabs, the priority is derived from the logarithm of the number of free
1346          * blocks. Slabs with the same order of magnitude of free blocks have the same priority.
1347          * With 2^23 blocks, the priority will range from 1 to 25. The reserved
1348          * unopened_slab_priority divides the range and is skipped by the logarithmic mapping.
1349          */
1350
1351         if (free_blocks == 0)
1352                 return 0;
1353
1354         if (is_slab_journal_blank(slab))
1355                 return unopened_slab_priority;
1356
1357         priority = (1 + ilog2(free_blocks));
1358         return ((priority < unopened_slab_priority) ? priority : priority + 1);
1359 }
1360
1361 /*
1362  * Slabs are essentially prioritized by an approximation of the number of free blocks in the slab
1363  * so slabs with lots of free blocks with be opened for allocation before slabs that have few free
1364  * blocks.
1365  */
1366 static void prioritize_slab(struct vdo_slab *slab)
1367 {
1368         ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
1369                         "a slab must not already be on a ring when prioritizing");
1370         slab->priority = calculate_slab_priority(slab);
1371         vdo_priority_table_enqueue(slab->allocator->prioritized_slabs,
1372                                    slab->priority, &slab->allocq_entry);
1373 }
1374
1375 /**
1376  * adjust_free_block_count() - Adjust the free block count and (if needed) reprioritize the slab.
1377  * @increment: should be true if the free block count went up.
1378  */
1379 static void adjust_free_block_count(struct vdo_slab *slab, bool increment)
1380 {
1381         struct block_allocator *allocator = slab->allocator;
1382
1383         WRITE_ONCE(allocator->allocated_blocks,
1384                    allocator->allocated_blocks + (increment ? -1 : 1));
1385
1386         /* The open slab doesn't need to be reprioritized until it is closed. */
1387         if (slab == allocator->open_slab)
1388                 return;
1389
1390         /* Don't bother adjusting the priority table if unneeded. */
1391         if (slab->priority == calculate_slab_priority(slab))
1392                 return;
1393
1394         /*
1395          * Reprioritize the slab to reflect the new free block count by removing it from the table
1396          * and re-enqueuing it with the new priority.
1397          */
1398         vdo_priority_table_remove(allocator->prioritized_slabs, &slab->allocq_entry);
1399         prioritize_slab(slab);
1400 }
1401
1402 /**
1403  * increment_for_data() - Increment the reference count for a data block.
1404  * @slab: The slab which owns the block.
1405  * @block: The reference block which contains the block being updated.
1406  * @block_number: The block to update.
1407  * @old_status: The reference status of the data block before this increment.
1408  * @lock: The pbn_lock associated with this increment (may be NULL).
1409  * @counter_ptr: A pointer to the count for the data block (in, out).
1410  * @adjust_block_count: Whether to update the allocator's free block count.
1411  *
1412  * Return: VDO_SUCCESS or an error.
1413  */
1414 static int increment_for_data(struct vdo_slab *slab, struct reference_block *block,
1415                               slab_block_number block_number,
1416                               enum reference_status old_status,
1417                               struct pbn_lock *lock, vdo_refcount_t *counter_ptr,
1418                               bool adjust_block_count)
1419 {
1420         switch (old_status) {
1421         case RS_FREE:
1422                 *counter_ptr = 1;
1423                 block->allocated_count++;
1424                 slab->free_blocks--;
1425                 if (adjust_block_count)
1426                         adjust_free_block_count(slab, false);
1427
1428                 break;
1429
1430         case RS_PROVISIONAL:
1431                 *counter_ptr = 1;
1432                 break;
1433
1434         default:
1435                 /* Single or shared */
1436                 if (*counter_ptr >= MAXIMUM_REFERENCE_COUNT) {
1437                         return uds_log_error_strerror(VDO_REF_COUNT_INVALID,
1438                                                       "Incrementing a block already having 254 references (slab %u, offset %u)",
1439                                                       slab->slab_number, block_number);
1440                 }
1441                 (*counter_ptr)++;
1442         }
1443
1444         if (lock != NULL)
1445                 vdo_unassign_pbn_lock_provisional_reference(lock);
1446         return VDO_SUCCESS;
1447 }
1448
1449 /**
1450  * decrement_for_data() - Decrement the reference count for a data block.
1451  * @slab: The slab which owns the block.
1452  * @block: The reference block which contains the block being updated.
1453  * @block_number: The block to update.
1454  * @old_status: The reference status of the data block before this decrement.
1455  * @updater: The reference updater doing this operation in case we need to look up the pbn lock.
1456  * @lock: The pbn_lock associated with the block being decremented (may be NULL).
1457  * @counter_ptr: A pointer to the count for the data block (in, out).
1458  * @adjust_block_count: Whether to update the allocator's free block count.
1459  *
1460  * Return: VDO_SUCCESS or an error.
1461  */
1462 static int decrement_for_data(struct vdo_slab *slab, struct reference_block *block,
1463                               slab_block_number block_number,
1464                               enum reference_status old_status,
1465                               struct reference_updater *updater,
1466                               vdo_refcount_t *counter_ptr, bool adjust_block_count)
1467 {
1468         switch (old_status) {
1469         case RS_FREE:
1470                 return uds_log_error_strerror(VDO_REF_COUNT_INVALID,
1471                                               "Decrementing free block at offset %u in slab %u",
1472                                               block_number, slab->slab_number);
1473
1474         case RS_PROVISIONAL:
1475         case RS_SINGLE:
1476                 if (updater->zpbn.zone != NULL) {
1477                         struct pbn_lock *lock = vdo_get_physical_zone_pbn_lock(updater->zpbn.zone,
1478                                                                                updater->zpbn.pbn);
1479
1480                         if (lock != NULL) {
1481                                 /*
1482                                  * There is a read lock on this block, so the block must not become
1483                                  * unreferenced.
1484                                  */
1485                                 *counter_ptr = PROVISIONAL_REFERENCE_COUNT;
1486                                 vdo_assign_pbn_lock_provisional_reference(lock);
1487                                 break;
1488                         }
1489                 }
1490
1491                 *counter_ptr = EMPTY_REFERENCE_COUNT;
1492                 block->allocated_count--;
1493                 slab->free_blocks++;
1494                 if (adjust_block_count)
1495                         adjust_free_block_count(slab, true);
1496
1497                 break;
1498
1499         default:
1500                 /* Shared */
1501                 (*counter_ptr)--;
1502         }
1503
1504         return VDO_SUCCESS;
1505 }
1506
1507 /**
1508  * increment_for_block_map() - Increment the reference count for a block map page.
1509  * @slab: The slab which owns the block.
1510  * @block: The reference block which contains the block being updated.
1511  * @block_number: The block to update.
1512  * @old_status: The reference status of the block before this increment.
1513  * @lock: The pbn_lock associated with this increment (may be NULL).
1514  * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1515  * @counter_ptr: A pointer to the count for the block (in, out).
1516  * @adjust_block_count: Whether to update the allocator's free block count.
1517  *
1518  * All block map increments should be from provisional to MAXIMUM_REFERENCE_COUNT. Since block map
1519  * blocks never dedupe they should never be adjusted from any other state. The adjustment always
1520  * results in MAXIMUM_REFERENCE_COUNT as this value is used to prevent dedupe against block map
1521  * blocks.
1522  *
1523  * Return: VDO_SUCCESS or an error.
1524  */
1525 static int increment_for_block_map(struct vdo_slab *slab, struct reference_block *block,
1526                                    slab_block_number block_number,
1527                                    enum reference_status old_status,
1528                                    struct pbn_lock *lock, bool normal_operation,
1529                                    vdo_refcount_t *counter_ptr, bool adjust_block_count)
1530 {
1531         switch (old_status) {
1532         case RS_FREE:
1533                 if (normal_operation) {
1534                         return uds_log_error_strerror(VDO_REF_COUNT_INVALID,
1535                                                       "Incrementing unallocated block map block (slab %u, offset %u)",
1536                                                       slab->slab_number, block_number);
1537                 }
1538
1539                 *counter_ptr = MAXIMUM_REFERENCE_COUNT;
1540                 block->allocated_count++;
1541                 slab->free_blocks--;
1542                 if (adjust_block_count)
1543                         adjust_free_block_count(slab, false);
1544
1545                 return VDO_SUCCESS;
1546
1547         case RS_PROVISIONAL:
1548                 if (!normal_operation)
1549                         return uds_log_error_strerror(VDO_REF_COUNT_INVALID,
1550                                                       "Block map block had provisional reference during replay (slab %u, offset %u)",
1551                                                       slab->slab_number, block_number);
1552
1553                 *counter_ptr = MAXIMUM_REFERENCE_COUNT;
1554                 if (lock != NULL)
1555                         vdo_unassign_pbn_lock_provisional_reference(lock);
1556                 return VDO_SUCCESS;
1557
1558         default:
1559                 return uds_log_error_strerror(VDO_REF_COUNT_INVALID,
1560                                               "Incrementing a block map block which is already referenced %u times (slab %u, offset %u)",
1561                                               *counter_ptr, slab->slab_number,
1562                                               block_number);
1563         }
1564 }
1565
1566 static bool __must_check is_valid_journal_point(const struct journal_point *point)
1567 {
1568         return ((point != NULL) && (point->sequence_number > 0));
1569 }
1570
1571 /**
1572  * update_reference_count() - Update the reference count of a block.
1573  * @slab: The slab which owns the block.
1574  * @block: The reference block which contains the block being updated.
1575  * @block_number: The block to update.
1576  * @slab_journal_point: The slab journal point at which this update is journaled.
1577  * @updater: The reference updater.
1578  * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1579  * @adjust_block_count: Whether to update the slab's free block count.
1580  * @provisional_decrement_ptr: A pointer which will be set to true if this update was a decrement
1581  *                             of a provisional reference.
1582  *
1583  * Return: VDO_SUCCESS or an error.
1584  */
1585 static int update_reference_count(struct vdo_slab *slab, struct reference_block *block,
1586                                   slab_block_number block_number,
1587                                   const struct journal_point *slab_journal_point,
1588                                   struct reference_updater *updater,
1589                                   bool normal_operation, bool adjust_block_count,
1590                                   bool *provisional_decrement_ptr)
1591 {
1592         vdo_refcount_t *counter_ptr = &slab->counters[block_number];
1593         enum reference_status old_status = reference_count_to_status(*counter_ptr);
1594         int result;
1595
1596         if (!updater->increment) {
1597                 result = decrement_for_data(slab, block, block_number, old_status,
1598                                             updater, counter_ptr, adjust_block_count);
1599                 if ((result == VDO_SUCCESS) && (old_status == RS_PROVISIONAL)) {
1600                         if (provisional_decrement_ptr != NULL)
1601                                 *provisional_decrement_ptr = true;
1602                         return VDO_SUCCESS;
1603                 }
1604         } else if (updater->operation == VDO_JOURNAL_DATA_REMAPPING) {
1605                 result = increment_for_data(slab, block, block_number, old_status,
1606                                             updater->lock, counter_ptr, adjust_block_count);
1607         } else {
1608                 result = increment_for_block_map(slab, block, block_number, old_status,
1609                                                  updater->lock, normal_operation,
1610                                                  counter_ptr, adjust_block_count);
1611         }
1612
1613         if (result != VDO_SUCCESS)
1614                 return result;
1615
1616         if (is_valid_journal_point(slab_journal_point))
1617                 slab->slab_journal_point = *slab_journal_point;
1618
1619         return VDO_SUCCESS;
1620 }
1621
1622 static int __must_check adjust_reference_count(struct vdo_slab *slab,
1623                                                struct reference_updater *updater,
1624                                                const struct journal_point *slab_journal_point)
1625 {
1626         slab_block_number block_number;
1627         int result;
1628         struct reference_block *block;
1629         bool provisional_decrement = false;
1630
1631         if (!is_slab_open(slab))
1632                 return VDO_INVALID_ADMIN_STATE;
1633
1634         result = slab_block_number_from_pbn(slab, updater->zpbn.pbn, &block_number);
1635         if (result != VDO_SUCCESS)
1636                 return result;
1637
1638         block = get_reference_block(slab, block_number);
1639         result = update_reference_count(slab, block, block_number, slab_journal_point,
1640                                         updater, NORMAL_OPERATION, true,
1641                                         &provisional_decrement);
1642         if ((result != VDO_SUCCESS) || provisional_decrement)
1643                 return result;
1644
1645         if (block->is_dirty && (block->slab_journal_lock > 0)) {
1646                 sequence_number_t entry_lock = slab_journal_point->sequence_number;
1647                 /*
1648                  * This block is already dirty and a slab journal entry has been made for it since
1649                  * the last time it was clean. We must release the per-entry slab journal lock for
1650                  * the entry associated with the update we are now doing.
1651                  */
1652                 result = ASSERT(is_valid_journal_point(slab_journal_point),
1653                                 "Reference count adjustments need slab journal points.");
1654                 if (result != VDO_SUCCESS)
1655                         return result;
1656
1657                 adjust_slab_journal_block_reference(&slab->journal, entry_lock, -1);
1658                 return VDO_SUCCESS;
1659         }
1660
1661         /*
1662          * This may be the first time we are applying an update for which there is a slab journal
1663          * entry to this block since the block was cleaned. Therefore, we convert the per-entry
1664          * slab journal lock to an uncommitted reference block lock, if there is a per-entry lock.
1665          */
1666         if (is_valid_journal_point(slab_journal_point))
1667                 block->slab_journal_lock = slab_journal_point->sequence_number;
1668         else
1669                 block->slab_journal_lock = 0;
1670
1671         dirty_block(block);
1672         return VDO_SUCCESS;
1673 }
1674
1675 /**
1676  * add_entry_from_waiter() - Add an entry to the slab journal.
1677  * @waiter: The vio which should make an entry now.
1678  * @context: The slab journal to make an entry in.
1679  *
1680  * This callback is invoked by add_entries() once it has determined that we are ready to make
1681  * another entry in the slab journal. Implements waiter_callback_fn.
1682  */
1683 static void add_entry_from_waiter(struct vdo_waiter *waiter, void *context)
1684 {
1685         int result;
1686         struct reference_updater *updater =
1687                 container_of(waiter, struct reference_updater, waiter);
1688         struct data_vio *data_vio = data_vio_from_reference_updater(updater);
1689         struct slab_journal *journal = context;
1690         struct slab_journal_block_header *header = &journal->tail_header;
1691         struct journal_point slab_journal_point = {
1692                 .sequence_number = header->sequence_number,
1693                 .entry_count = header->entry_count,
1694         };
1695         sequence_number_t recovery_block = data_vio->recovery_journal_point.sequence_number;
1696
1697         if (header->entry_count == 0) {
1698                 /*
1699                  * This is the first entry in the current tail block, so get a lock on the recovery
1700                  * journal which we will hold until this tail block is committed.
1701                  */
1702                 get_lock(journal, header->sequence_number)->recovery_start = recovery_block;
1703                 if (journal->recovery_journal != NULL) {
1704                         zone_count_t zone_number = journal->slab->allocator->zone_number;
1705
1706                         vdo_acquire_recovery_journal_block_reference(journal->recovery_journal,
1707                                                                      recovery_block,
1708                                                                      VDO_ZONE_TYPE_PHYSICAL,
1709                                                                      zone_number);
1710                 }
1711
1712                 mark_slab_journal_dirty(journal, recovery_block);
1713                 reclaim_journal_space(journal);
1714         }
1715
1716         add_entry(journal, updater->zpbn.pbn, updater->operation, updater->increment,
1717                   expand_journal_point(data_vio->recovery_journal_point,
1718                                        updater->increment));
1719
1720         if (journal->slab->status != VDO_SLAB_REBUILT) {
1721                 /*
1722                  * If the slab is unrecovered, scrubbing will take care of the count since the
1723                  * update is now recorded in the journal.
1724                  */
1725                 adjust_slab_journal_block_reference(journal,
1726                                                     slab_journal_point.sequence_number, -1);
1727                 result = VDO_SUCCESS;
1728         } else {
1729                 /* Now that an entry has been made in the slab journal, update the counter. */
1730                 result = adjust_reference_count(journal->slab, updater,
1731                                                 &slab_journal_point);
1732         }
1733
1734         if (updater->increment)
1735                 continue_data_vio_with_error(data_vio, result);
1736         else
1737                 vdo_continue_completion(&data_vio->decrement_completion, result);
1738 }
1739
1740 /**
1741  * is_next_entry_a_block_map_increment() - Check whether the next entry to be made is a block map
1742  *                                         increment.
1743  * @journal: The journal.
1744  *
1745  * Return: true if the first entry waiter's operation is a block map increment.
1746  */
1747 static inline bool is_next_entry_a_block_map_increment(struct slab_journal *journal)
1748 {
1749         struct vdo_waiter *waiter = vdo_waitq_get_first_waiter(&journal->entry_waiters);
1750         struct reference_updater *updater = container_of(waiter,
1751                                                          struct reference_updater,
1752                                                          waiter);
1753
1754         return (updater->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING);
1755 }
1756
1757 /**
1758  * add_entries() - Add as many entries as possible from the queue of vios waiting to make entries.
1759  * @journal: The journal to which entries may be added.
1760  *
1761  * By processing the queue in order, we ensure that slab journal entries are made in the same order
1762  * as recovery journal entries for the same increment or decrement.
1763  */
1764 static void add_entries(struct slab_journal *journal)
1765 {
1766         if (journal->adding_entries) {
1767                 /* Protect against re-entrancy. */
1768                 return;
1769         }
1770
1771         journal->adding_entries = true;
1772         while (vdo_waitq_has_waiters(&journal->entry_waiters)) {
1773                 struct slab_journal_block_header *header = &journal->tail_header;
1774
1775                 if (journal->partial_write_in_progress ||
1776                     (journal->slab->status == VDO_SLAB_REBUILDING)) {
1777                         /*
1778                          * Don't add entries while rebuilding or while a partial write is
1779                          * outstanding (VDO-2399).
1780                          */
1781                         break;
1782                 }
1783
1784                 if (journal->waiting_to_commit) {
1785                         /*
1786                          * If we are waiting for resources to write the tail block, and the tail
1787                          * block is full, we can't make another entry.
1788                          */
1789                         WRITE_ONCE(journal->events->tail_busy_count,
1790                                    journal->events->tail_busy_count + 1);
1791                         break;
1792                 } else if (is_next_entry_a_block_map_increment(journal) &&
1793                            (header->entry_count >= journal->full_entries_per_block)) {
1794                         /*
1795                          * The tail block does not have room for a block map increment, so commit
1796                          * it now.
1797                          */
1798                         commit_tail(journal);
1799                         if (journal->waiting_to_commit) {
1800                                 WRITE_ONCE(journal->events->tail_busy_count,
1801                                            journal->events->tail_busy_count + 1);
1802                                 break;
1803                         }
1804                 }
1805
1806                 /* If the slab is over the blocking threshold, make the vio wait. */
1807                 if (requires_reaping(journal)) {
1808                         WRITE_ONCE(journal->events->blocked_count,
1809                                    journal->events->blocked_count + 1);
1810                         save_dirty_reference_blocks(journal->slab);
1811                         break;
1812                 }
1813
1814                 if (header->entry_count == 0) {
1815                         struct journal_lock *lock =
1816                                 get_lock(journal, header->sequence_number);
1817
1818                         /*
1819                          * Check if the on disk slab journal is full. Because of the blocking and
1820                          * scrubbing thresholds, this should never happen.
1821                          */
1822                         if (lock->count > 0) {
1823                                 ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail,
1824                                                 "New block has locks, but journal is not full");
1825
1826                                 /*
1827                                  * The blocking threshold must let the journal fill up if the new
1828                                  * block has locks; if the blocking threshold is smaller than the
1829                                  * journal size, the new block cannot possibly have locks already.
1830                                  */
1831                                 ASSERT_LOG_ONLY((journal->blocking_threshold >= journal->size),
1832                                                 "New block can have locks already iff blocking threshold is at the end of the journal");
1833
1834                                 WRITE_ONCE(journal->events->disk_full_count,
1835                                            journal->events->disk_full_count + 1);
1836                                 save_dirty_reference_blocks(journal->slab);
1837                                 break;
1838                         }
1839
1840                         /*
1841                          * Don't allow the new block to be reaped until all of the reference count
1842                          * blocks are written and the journal block has been fully committed as
1843                          * well.
1844                          */
1845                         lock->count = journal->entries_per_block + 1;
1846
1847                         if (header->sequence_number == 1) {
1848                                 struct vdo_slab *slab = journal->slab;
1849                                 block_count_t i;
1850
1851                                 /*
1852                                  * This is the first entry in this slab journal, ever. Dirty all of
1853                                  * the reference count blocks. Each will acquire a lock on the tail
1854                                  * block so that the journal won't be reaped until the reference
1855                                  * counts are initialized. The lock acquisition must be done by the
1856                                  * ref_counts since here we don't know how many reference blocks
1857                                  * the ref_counts has.
1858                                  */
1859                                 for (i = 0; i < slab->reference_block_count; i++) {
1860                                         slab->reference_blocks[i].slab_journal_lock = 1;
1861                                         dirty_block(&slab->reference_blocks[i]);
1862                                 }
1863
1864                                 adjust_slab_journal_block_reference(journal, 1,
1865                                                                     slab->reference_block_count);
1866                         }
1867                 }
1868
1869                 vdo_waitq_notify_next_waiter(&journal->entry_waiters,
1870                                              add_entry_from_waiter, journal);
1871         }
1872
1873         journal->adding_entries = false;
1874
1875         /* If there are no waiters, and we are flushing or saving, commit the tail block. */
1876         if (vdo_is_state_draining(&journal->slab->state) &&
1877             !vdo_is_state_suspending(&journal->slab->state) &&
1878             !vdo_waitq_has_waiters(&journal->entry_waiters))
1879                 commit_tail(journal);
1880 }
1881
1882 /**
1883  * reset_search_cursor() - Reset the free block search back to the first reference counter in the
1884  *                         first reference block of a slab.
1885  */
1886 static void reset_search_cursor(struct vdo_slab *slab)
1887 {
1888         struct search_cursor *cursor = &slab->search_cursor;
1889
1890         cursor->block = cursor->first_block;
1891         cursor->index = 0;
1892         /* Unit tests have slabs with only one reference block (and it's a runt). */
1893         cursor->end_index = min_t(u32, COUNTS_PER_BLOCK, slab->block_count);
1894 }
1895
1896 /**
1897  * advance_search_cursor() - Advance the search cursor to the start of the next reference block in
1898  *                           a slab,
1899  *
1900  * Wraps around to the first reference block if the current block is the last reference block.
1901  *
1902  * Return: true unless the cursor was at the last reference block.
1903  */
1904 static bool advance_search_cursor(struct vdo_slab *slab)
1905 {
1906         struct search_cursor *cursor = &slab->search_cursor;
1907
1908         /*
1909          * If we just finished searching the last reference block, then wrap back around to the
1910          * start of the array.
1911          */
1912         if (cursor->block == cursor->last_block) {
1913                 reset_search_cursor(slab);
1914                 return false;
1915         }
1916
1917         /* We're not already at the end, so advance to cursor to the next block. */
1918         cursor->block++;
1919         cursor->index = cursor->end_index;
1920
1921         if (cursor->block == cursor->last_block) {
1922                 /* The last reference block will usually be a runt. */
1923                 cursor->end_index = slab->block_count;
1924         } else {
1925                 cursor->end_index += COUNTS_PER_BLOCK;
1926         }
1927
1928         return true;
1929 }
1930
1931 /**
1932  * vdo_adjust_reference_count_for_rebuild() - Adjust the reference count of a block during rebuild.
1933  *
1934  * Return: VDO_SUCCESS or an error.
1935  */
1936 int vdo_adjust_reference_count_for_rebuild(struct slab_depot *depot,
1937                                            physical_block_number_t pbn,
1938                                            enum journal_operation operation)
1939 {
1940         int result;
1941         slab_block_number block_number;
1942         struct reference_block *block;
1943         struct vdo_slab *slab = vdo_get_slab(depot, pbn);
1944         struct reference_updater updater = {
1945                 .operation = operation,
1946                 .increment = true,
1947         };
1948
1949         result = slab_block_number_from_pbn(slab, pbn, &block_number);
1950         if (result != VDO_SUCCESS)
1951                 return result;
1952
1953         block = get_reference_block(slab, block_number);
1954         result = update_reference_count(slab, block, block_number, NULL,
1955                                         &updater, !NORMAL_OPERATION, false, NULL);
1956         if (result != VDO_SUCCESS)
1957                 return result;
1958
1959         dirty_block(block);
1960         return VDO_SUCCESS;
1961 }
1962
1963 /**
1964  * replay_reference_count_change() - Replay the reference count adjustment from a slab journal
1965  *                                   entry into the reference count for a block.
1966  * @slab: The slab.
1967  * @entry_point: The slab journal point for the entry.
1968  * @entry: The slab journal entry being replayed.
1969  *
1970  * The adjustment will be ignored if it was already recorded in the reference count.
1971  *
1972  * Return: VDO_SUCCESS or an error code.
1973  */
1974 static int replay_reference_count_change(struct vdo_slab *slab,
1975                                          const struct journal_point *entry_point,
1976                                          struct slab_journal_entry entry)
1977 {
1978         int result;
1979         struct reference_block *block = get_reference_block(slab, entry.sbn);
1980         sector_count_t sector = (entry.sbn % COUNTS_PER_BLOCK) / COUNTS_PER_SECTOR;
1981         struct reference_updater updater = {
1982                 .operation = entry.operation,
1983                 .increment = entry.increment,
1984         };
1985
1986         if (!vdo_before_journal_point(&block->commit_points[sector], entry_point)) {
1987                 /* This entry is already reflected in the existing counts, so do nothing. */
1988                 return VDO_SUCCESS;
1989         }
1990
1991         /* This entry is not yet counted in the reference counts. */
1992         result = update_reference_count(slab, block, entry.sbn, entry_point,
1993                                         &updater, !NORMAL_OPERATION, false, NULL);
1994         if (result != VDO_SUCCESS)
1995                 return result;
1996
1997         dirty_block(block);
1998         return VDO_SUCCESS;
1999 }
2000
2001 /**
2002  * find_zero_byte_in_word() - Find the array index of the first zero byte in word-sized range of
2003  *                            reference counters.
2004  * @word_ptr: A pointer to the eight counter bytes to check.
2005  * @start_index: The array index corresponding to word_ptr[0].
2006  * @fail_index: The array index to return if no zero byte is found.
2007  *
2008  * The search does no bounds checking; the function relies on the array being sufficiently padded.
2009  *
2010  * Return: The array index of the first zero byte in the word, or the value passed as fail_index if
2011  *         no zero byte was found.
2012  */
2013 static inline slab_block_number find_zero_byte_in_word(const u8 *word_ptr,
2014                                                        slab_block_number start_index,
2015                                                        slab_block_number fail_index)
2016 {
2017         u64 word = get_unaligned_le64(word_ptr);
2018
2019         /* This looks like a loop, but GCC will unroll the eight iterations for us. */
2020         unsigned int offset;
2021
2022         for (offset = 0; offset < BYTES_PER_WORD; offset++) {
2023                 /* Assumes little-endian byte order, which we have on X86. */
2024                 if ((word & 0xFF) == 0)
2025                         return (start_index + offset);
2026                 word >>= 8;
2027         }
2028
2029         return fail_index;
2030 }
2031
2032 /**
2033  * vdo_find_free_block() - Find the first block with a reference count of zero in the specified
2034  *                         range of reference counter indexes.
2035  * @slab: The slab counters to scan.
2036  * @index_ptr: A pointer to hold the array index of the free block.
2037  *
2038  * Exposed for unit testing.
2039  *
2040  * Return: true if a free block was found in the specified range.
2041  */
2042 static bool find_free_block(const struct vdo_slab *slab, slab_block_number *index_ptr)
2043 {
2044         slab_block_number zero_index;
2045         slab_block_number next_index = slab->search_cursor.index;
2046         slab_block_number end_index = slab->search_cursor.end_index;
2047         u8 *next_counter = &slab->counters[next_index];
2048         u8 *end_counter = &slab->counters[end_index];
2049
2050         /*
2051          * Search every byte of the first unaligned word. (Array is padded so reading past end is
2052          * safe.)
2053          */
2054         zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2055         if (zero_index < end_index) {
2056                 *index_ptr = zero_index;
2057                 return true;
2058         }
2059
2060         /*
2061          * On architectures where unaligned word access is expensive, this would be a good place to
2062          * advance to an alignment boundary.
2063          */
2064         next_index += BYTES_PER_WORD;
2065         next_counter += BYTES_PER_WORD;
2066
2067         /*
2068          * Now we're word-aligned; check an word at a time until we find a word containing a zero.
2069          * (Array is padded so reading past end is safe.)
2070          */
2071         while (next_counter < end_counter) {
2072                 /*
2073                  * The following code is currently an exact copy of the code preceding the loop,
2074                  * but if you try to merge them by using a do loop, it runs slower because a jump
2075                  * instruction gets added at the start of the iteration.
2076                  */
2077                 zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2078                 if (zero_index < end_index) {
2079                         *index_ptr = zero_index;
2080                         return true;
2081                 }
2082
2083                 next_index += BYTES_PER_WORD;
2084                 next_counter += BYTES_PER_WORD;
2085         }
2086
2087         return false;
2088 }
2089
2090 /**
2091  * search_current_reference_block() - Search the reference block currently saved in the search
2092  *                                    cursor for a reference count of zero, starting at the saved
2093  *                                    counter index.
2094  * @slab: The slab to search.
2095  * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2096  *
2097  * Return: true if an unreferenced counter was found.
2098  */
2099 static bool search_current_reference_block(const struct vdo_slab *slab,
2100                                            slab_block_number *free_index_ptr)
2101 {
2102         /* Don't bother searching if the current block is known to be full. */
2103         return ((slab->search_cursor.block->allocated_count < COUNTS_PER_BLOCK) &&
2104                 find_free_block(slab, free_index_ptr));
2105 }
2106
2107 /**
2108  * search_reference_blocks() - Search each reference block for a reference count of zero.
2109  * @slab: The slab to search.
2110  * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2111  *
2112  * Searches each reference block for a reference count of zero, starting at the reference block and
2113  * counter index saved in the search cursor and searching up to the end of the last reference
2114  * block. The search does not wrap.
2115  *
2116  * Return: true if an unreferenced counter was found.
2117  */
2118 static bool search_reference_blocks(struct vdo_slab *slab,
2119                                     slab_block_number *free_index_ptr)
2120 {
2121         /* Start searching at the saved search position in the current block. */
2122         if (search_current_reference_block(slab, free_index_ptr))
2123                 return true;
2124
2125         /* Search each reference block up to the end of the slab. */
2126         while (advance_search_cursor(slab)) {
2127                 if (search_current_reference_block(slab, free_index_ptr))
2128                         return true;
2129         }
2130
2131         return false;
2132 }
2133
2134 /**
2135  * make_provisional_reference() - Do the bookkeeping for making a provisional reference.
2136  */
2137 static void make_provisional_reference(struct vdo_slab *slab,
2138                                        slab_block_number block_number)
2139 {
2140         struct reference_block *block = get_reference_block(slab, block_number);
2141
2142         /*
2143          * Make the initial transition from an unreferenced block to a
2144          * provisionally allocated block.
2145          */
2146         slab->counters[block_number] = PROVISIONAL_REFERENCE_COUNT;
2147
2148         /* Account for the allocation. */
2149         block->allocated_count++;
2150         slab->free_blocks--;
2151 }
2152
2153 /**
2154  * dirty_all_reference_blocks() - Mark all reference count blocks in a slab as dirty.
2155  */
2156 static void dirty_all_reference_blocks(struct vdo_slab *slab)
2157 {
2158         block_count_t i;
2159
2160         for (i = 0; i < slab->reference_block_count; i++)
2161                 dirty_block(&slab->reference_blocks[i]);
2162 }
2163
2164 /**
2165  * clear_provisional_references() - Clear the provisional reference counts from a reference block.
2166  * @block: The block to clear.
2167  */
2168 static void clear_provisional_references(struct reference_block *block)
2169 {
2170         vdo_refcount_t *counters = get_reference_counters_for_block(block);
2171         block_count_t j;
2172
2173         for (j = 0; j < COUNTS_PER_BLOCK; j++) {
2174                 if (counters[j] == PROVISIONAL_REFERENCE_COUNT) {
2175                         counters[j] = EMPTY_REFERENCE_COUNT;
2176                         block->allocated_count--;
2177                 }
2178         }
2179 }
2180
2181 static inline bool journal_points_equal(struct journal_point first,
2182                                         struct journal_point second)
2183 {
2184         return ((first.sequence_number == second.sequence_number) &&
2185                 (first.entry_count == second.entry_count));
2186 }
2187
2188 /**
2189  * unpack_reference_block() - Unpack reference counts blocks into the internal memory structure.
2190  * @packed: The written reference block to be unpacked.
2191  * @block: The internal reference block to be loaded.
2192  */
2193 static void unpack_reference_block(struct packed_reference_block *packed,
2194                                    struct reference_block *block)
2195 {
2196         block_count_t index;
2197         sector_count_t i;
2198         struct vdo_slab *slab = block->slab;
2199         vdo_refcount_t *counters = get_reference_counters_for_block(block);
2200
2201         for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
2202                 struct packed_reference_sector *sector = &packed->sectors[i];
2203
2204                 vdo_unpack_journal_point(&sector->commit_point, &block->commit_points[i]);
2205                 memcpy(counters + (i * COUNTS_PER_SECTOR), sector->counts,
2206                        (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
2207                 /* The slab_journal_point must be the latest point found in any sector. */
2208                 if (vdo_before_journal_point(&slab->slab_journal_point,
2209                                              &block->commit_points[i]))
2210                         slab->slab_journal_point = block->commit_points[i];
2211
2212                 if ((i > 0) &&
2213                     !journal_points_equal(block->commit_points[0],
2214                                           block->commit_points[i])) {
2215                         size_t block_index = block - block->slab->reference_blocks;
2216
2217                         uds_log_warning("Torn write detected in sector %u of reference block %zu of slab %u",
2218                                         i, block_index, block->slab->slab_number);
2219                 }
2220         }
2221
2222         block->allocated_count = 0;
2223         for (index = 0; index < COUNTS_PER_BLOCK; index++) {
2224                 if (counters[index] != EMPTY_REFERENCE_COUNT)
2225                         block->allocated_count++;
2226         }
2227 }
2228
2229 /**
2230  * finish_reference_block_load() - After a reference block has been read, unpack it.
2231  * @completion: The VIO that just finished reading.
2232  */
2233 static void finish_reference_block_load(struct vdo_completion *completion)
2234 {
2235         struct vio *vio = as_vio(completion);
2236         struct pooled_vio *pooled = vio_as_pooled_vio(vio);
2237         struct reference_block *block = completion->parent;
2238         struct vdo_slab *slab = block->slab;
2239
2240         unpack_reference_block((struct packed_reference_block *) vio->data, block);
2241         return_vio_to_pool(slab->allocator->vio_pool, pooled);
2242         slab->active_count--;
2243         clear_provisional_references(block);
2244
2245         slab->free_blocks -= block->allocated_count;
2246         check_if_slab_drained(slab);
2247 }
2248
2249 static void load_reference_block_endio(struct bio *bio)
2250 {
2251         struct vio *vio = bio->bi_private;
2252         struct reference_block *block = vio->completion.parent;
2253
2254         continue_vio_after_io(vio, finish_reference_block_load,
2255                               block->slab->allocator->thread_id);
2256 }
2257
2258 /**
2259  * load_reference_block() - After a block waiter has gotten a VIO from the VIO pool, load the
2260  *                          block.
2261  * @waiter: The waiter of the block to load.
2262  * @context: The VIO returned by the pool.
2263  */
2264 static void load_reference_block(struct vdo_waiter *waiter, void *context)
2265 {
2266         struct pooled_vio *pooled = context;
2267         struct vio *vio = &pooled->vio;
2268         struct reference_block *block =
2269                 container_of(waiter, struct reference_block, waiter);
2270         size_t block_offset = (block - block->slab->reference_blocks);
2271
2272         vio->completion.parent = block;
2273         vdo_submit_metadata_vio(vio, block->slab->ref_counts_origin + block_offset,
2274                                 load_reference_block_endio, handle_io_error,
2275                                 REQ_OP_READ);
2276 }
2277
2278 /**
2279  * load_reference_blocks() - Load a slab's reference blocks from the underlying storage into a
2280  *                           pre-allocated reference counter.
2281  */
2282 static void load_reference_blocks(struct vdo_slab *slab)
2283 {
2284         block_count_t i;
2285
2286         slab->free_blocks = slab->block_count;
2287         slab->active_count = slab->reference_block_count;
2288         for (i = 0; i < slab->reference_block_count; i++) {
2289                 struct vdo_waiter *waiter = &slab->reference_blocks[i].waiter;
2290
2291                 waiter->callback = load_reference_block;
2292                 acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
2293         }
2294 }
2295
2296 /**
2297  * drain_slab() - Drain all reference count I/O.
2298  *
2299  * Depending upon the type of drain being performed (as recorded in the ref_count's vdo_slab), the
2300  * reference blocks may be loaded from disk or dirty reference blocks may be written out.
2301  */
2302 static void drain_slab(struct vdo_slab *slab)
2303 {
2304         bool save;
2305         bool load;
2306         const struct admin_state_code *state = vdo_get_admin_state_code(&slab->state);
2307
2308         if (state == VDO_ADMIN_STATE_SUSPENDING)
2309                 return;
2310
2311         if ((state != VDO_ADMIN_STATE_REBUILDING) &&
2312             (state != VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING))
2313                 commit_tail(&slab->journal);
2314
2315         if ((state == VDO_ADMIN_STATE_RECOVERING) || (slab->counters == NULL))
2316                 return;
2317
2318         save = false;
2319         load = slab->allocator->summary_entries[slab->slab_number].load_ref_counts;
2320         if (state == VDO_ADMIN_STATE_SCRUBBING) {
2321                 if (load) {
2322                         load_reference_blocks(slab);
2323                         return;
2324                 }
2325         } else if (state == VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING) {
2326                 if (!load) {
2327                         /* These reference counts were never written, so mark them all dirty. */
2328                         dirty_all_reference_blocks(slab);
2329                 }
2330                 save = true;
2331         } else if (state == VDO_ADMIN_STATE_REBUILDING) {
2332                 /*
2333                  * Write out the counters if the slab has written them before, or it has any
2334                  * non-zero reference counts, or there are any slab journal blocks.
2335                  */
2336                 block_count_t data_blocks = slab->allocator->depot->slab_config.data_blocks;
2337
2338                 if (load || (slab->free_blocks != data_blocks) ||
2339                     !is_slab_journal_blank(slab)) {
2340                         dirty_all_reference_blocks(slab);
2341                         save = true;
2342                 }
2343         } else if (state == VDO_ADMIN_STATE_SAVING) {
2344                 save = (slab->status == VDO_SLAB_REBUILT);
2345         } else {
2346                 vdo_finish_draining_with_result(&slab->state, VDO_SUCCESS);
2347                 return;
2348         }
2349
2350         if (save)
2351                 save_dirty_reference_blocks(slab);
2352 }
2353
2354 static int allocate_slab_counters(struct vdo_slab *slab)
2355 {
2356         int result;
2357         size_t index, bytes;
2358
2359         result = ASSERT(slab->reference_blocks == NULL,
2360                         "vdo_slab %u doesn't allocate refcounts twice",
2361                         slab->slab_number);
2362         if (result != VDO_SUCCESS)
2363                 return result;
2364
2365         result = uds_allocate(slab->reference_block_count, struct reference_block,
2366                               __func__, &slab->reference_blocks);
2367         if (result != VDO_SUCCESS)
2368                 return result;
2369
2370         /*
2371          * Allocate such that the runt slab has a full-length memory array, plus a little padding
2372          * so we can word-search even at the very end.
2373          */
2374         bytes = (slab->reference_block_count * COUNTS_PER_BLOCK) + (2 * BYTES_PER_WORD);
2375         result = uds_allocate(bytes, vdo_refcount_t, "ref counts array",
2376                               &slab->counters);
2377         if (result != UDS_SUCCESS) {
2378                 uds_free(uds_forget(slab->reference_blocks));
2379                 return result;
2380         }
2381
2382         slab->search_cursor.first_block = slab->reference_blocks;
2383         slab->search_cursor.last_block = &slab->reference_blocks[slab->reference_block_count - 1];
2384         reset_search_cursor(slab);
2385
2386         for (index = 0; index < slab->reference_block_count; index++) {
2387                 slab->reference_blocks[index] = (struct reference_block) {
2388                         .slab = slab,
2389                 };
2390         }
2391
2392         return VDO_SUCCESS;
2393 }
2394
2395 static int allocate_counters_if_clean(struct vdo_slab *slab)
2396 {
2397         if (vdo_is_state_clean_load(&slab->state))
2398                 return allocate_slab_counters(slab);
2399
2400         return VDO_SUCCESS;
2401 }
2402
2403 static void finish_loading_journal(struct vdo_completion *completion)
2404 {
2405         struct vio *vio = as_vio(completion);
2406         struct slab_journal *journal = completion->parent;
2407         struct vdo_slab *slab = journal->slab;
2408         struct packed_slab_journal_block *block = (struct packed_slab_journal_block *) vio->data;
2409         struct slab_journal_block_header header;
2410
2411         vdo_unpack_slab_journal_block_header(&block->header, &header);
2412
2413         /* FIXME: should it be an error if the following conditional fails? */
2414         if ((header.metadata_type == VDO_METADATA_SLAB_JOURNAL) &&
2415             (header.nonce == slab->allocator->nonce)) {
2416                 journal->tail = header.sequence_number + 1;
2417
2418                 /*
2419                  * If the slab is clean, this implies the slab journal is empty, so advance the
2420                  * head appropriately.
2421                  */
2422                 journal->head = (slab->allocator->summary_entries[slab->slab_number].is_dirty ?
2423                                  header.head : journal->tail);
2424                 journal->tail_header = header;
2425                 initialize_journal_state(journal);
2426         }
2427
2428         return_vio_to_pool(slab->allocator->vio_pool, vio_as_pooled_vio(vio));
2429         vdo_finish_loading_with_result(&slab->state, allocate_counters_if_clean(slab));
2430 }
2431
2432 static void read_slab_journal_tail_endio(struct bio *bio)
2433 {
2434         struct vio *vio = bio->bi_private;
2435         struct slab_journal *journal = vio->completion.parent;
2436
2437         continue_vio_after_io(vio, finish_loading_journal,
2438                               journal->slab->allocator->thread_id);
2439 }
2440
2441 static void handle_load_error(struct vdo_completion *completion)
2442 {
2443         int result = completion->result;
2444         struct slab_journal *journal = completion->parent;
2445         struct vio *vio = as_vio(completion);
2446
2447         vio_record_metadata_io_error(vio);
2448         return_vio_to_pool(journal->slab->allocator->vio_pool, vio_as_pooled_vio(vio));
2449         vdo_finish_loading_with_result(&journal->slab->state, result);
2450 }
2451
2452 /**
2453  * read_slab_journal_tail() - Read the slab journal tail block by using a vio acquired from the vio
2454  *                            pool.
2455  * @waiter: The vio pool waiter which has just been notified.
2456  * @context: The vio pool entry given to the waiter.
2457  *
2458  * This is the success callback from acquire_vio_from_pool() when loading a slab journal.
2459  */
2460 static void read_slab_journal_tail(struct vdo_waiter *waiter, void *context)
2461 {
2462         struct slab_journal *journal =
2463                 container_of(waiter, struct slab_journal, resource_waiter);
2464         struct vdo_slab *slab = journal->slab;
2465         struct pooled_vio *pooled = context;
2466         struct vio *vio = &pooled->vio;
2467         tail_block_offset_t last_commit_point =
2468                 slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2469
2470         /*
2471          * Slab summary keeps the commit point offset, so the tail block is the block before that.
2472          * Calculation supports small journals in unit tests.
2473          */
2474         tail_block_offset_t tail_block = ((last_commit_point == 0) ?
2475                                           (tail_block_offset_t)(journal->size - 1) :
2476                                           (last_commit_point - 1));
2477
2478         vio->completion.parent = journal;
2479         vio->completion.callback_thread_id = slab->allocator->thread_id;
2480         vdo_submit_metadata_vio(vio, slab->journal_origin + tail_block,
2481                                 read_slab_journal_tail_endio, handle_load_error,
2482                                 REQ_OP_READ);
2483 }
2484
2485 /**
2486  * load_slab_journal() - Load a slab's journal by reading the journal's tail.
2487  */
2488 static void load_slab_journal(struct vdo_slab *slab)
2489 {
2490         struct slab_journal *journal = &slab->journal;
2491         tail_block_offset_t last_commit_point;
2492
2493         last_commit_point = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2494         if ((last_commit_point == 0) &&
2495             !slab->allocator->summary_entries[slab->slab_number].load_ref_counts) {
2496                 /*
2497                  * This slab claims that it has a tail block at (journal->size - 1), but a head of
2498                  * 1. This is impossible, due to the scrubbing threshold, on a real system, so
2499                  * don't bother reading the (bogus) data off disk.
2500                  */
2501                 ASSERT_LOG_ONLY(((journal->size < 16) ||
2502                                  (journal->scrubbing_threshold < (journal->size - 1))),
2503                                 "Scrubbing threshold protects against reads of unwritten slab journal blocks");
2504                 vdo_finish_loading_with_result(&slab->state,
2505                                                allocate_counters_if_clean(slab));
2506                 return;
2507         }
2508
2509         journal->resource_waiter.callback = read_slab_journal_tail;
2510         acquire_vio_from_pool(slab->allocator->vio_pool, &journal->resource_waiter);
2511 }
2512
2513 static void register_slab_for_scrubbing(struct vdo_slab *slab, bool high_priority)
2514 {
2515         struct slab_scrubber *scrubber = &slab->allocator->scrubber;
2516
2517         ASSERT_LOG_ONLY((slab->status != VDO_SLAB_REBUILT),
2518                         "slab to be scrubbed is unrecovered");
2519
2520         if (slab->status != VDO_SLAB_REQUIRES_SCRUBBING)
2521                 return;
2522
2523         list_del_init(&slab->allocq_entry);
2524         if (!slab->was_queued_for_scrubbing) {
2525                 WRITE_ONCE(scrubber->slab_count, scrubber->slab_count + 1);
2526                 slab->was_queued_for_scrubbing = true;
2527         }
2528
2529         if (high_priority) {
2530                 slab->status = VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING;
2531                 list_add_tail(&slab->allocq_entry, &scrubber->high_priority_slabs);
2532                 return;
2533         }
2534
2535         list_add_tail(&slab->allocq_entry, &scrubber->slabs);
2536 }
2537
2538 /* Queue a slab for allocation or scrubbing. */
2539 static void queue_slab(struct vdo_slab *slab)
2540 {
2541         struct block_allocator *allocator = slab->allocator;
2542         block_count_t free_blocks;
2543         int result;
2544
2545         ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
2546                         "a requeued slab must not already be on a ring");
2547
2548         if (vdo_is_read_only(allocator->depot->vdo))
2549                 return;
2550
2551         free_blocks = slab->free_blocks;
2552         result = ASSERT((free_blocks <= allocator->depot->slab_config.data_blocks),
2553                         "rebuilt slab %u must have a valid free block count (has %llu, expected maximum %llu)",
2554                         slab->slab_number, (unsigned long long) free_blocks,
2555                         (unsigned long long) allocator->depot->slab_config.data_blocks);
2556         if (result != VDO_SUCCESS) {
2557                 vdo_enter_read_only_mode(allocator->depot->vdo, result);
2558                 return;
2559         }
2560
2561         if (slab->status != VDO_SLAB_REBUILT) {
2562                 register_slab_for_scrubbing(slab, false);
2563                 return;
2564         }
2565
2566         if (!vdo_is_state_resuming(&slab->state)) {
2567                 /*
2568                  * If the slab is resuming, we've already accounted for it here, so don't do it
2569                  * again.
2570                  * FIXME: under what situation would the slab be resuming here?
2571                  */
2572                 WRITE_ONCE(allocator->allocated_blocks,
2573                            allocator->allocated_blocks - free_blocks);
2574                 if (!is_slab_journal_blank(slab)) {
2575                         WRITE_ONCE(allocator->statistics.slabs_opened,
2576                                    allocator->statistics.slabs_opened + 1);
2577                 }
2578         }
2579
2580         if (allocator->depot->vdo->suspend_type == VDO_ADMIN_STATE_SAVING)
2581                 reopen_slab_journal(slab);
2582
2583         prioritize_slab(slab);
2584 }
2585
2586 /**
2587  * initiate_slab_action() - Initiate a slab action.
2588  *
2589  * Implements vdo_admin_initiator_fn.
2590  */
2591 static void initiate_slab_action(struct admin_state *state)
2592 {
2593         struct vdo_slab *slab = container_of(state, struct vdo_slab, state);
2594
2595         if (vdo_is_state_draining(state)) {
2596                 const struct admin_state_code *operation = vdo_get_admin_state_code(state);
2597
2598                 if (operation == VDO_ADMIN_STATE_SCRUBBING)
2599                         slab->status = VDO_SLAB_REBUILDING;
2600
2601                 drain_slab(slab);
2602                 check_if_slab_drained(slab);
2603                 return;
2604         }
2605
2606         if (vdo_is_state_loading(state)) {
2607                 load_slab_journal(slab);
2608                 return;
2609         }
2610
2611         if (vdo_is_state_resuming(state)) {
2612                 queue_slab(slab);
2613                 vdo_finish_resuming(state);
2614                 return;
2615         }
2616
2617         vdo_finish_operation(state, VDO_INVALID_ADMIN_STATE);
2618 }
2619
2620 /**
2621  * get_next_slab() - Get the next slab to scrub.
2622  * @scrubber: The slab scrubber.
2623  *
2624  * Return: The next slab to scrub or NULL if there are none.
2625  */
2626 static struct vdo_slab *get_next_slab(struct slab_scrubber *scrubber)
2627 {
2628         struct vdo_slab *slab;
2629
2630         slab = list_first_entry_or_null(&scrubber->high_priority_slabs,
2631                                         struct vdo_slab, allocq_entry);
2632         if (slab != NULL)
2633                 return slab;
2634
2635         return list_first_entry_or_null(&scrubber->slabs, struct vdo_slab,
2636                                         allocq_entry);
2637 }
2638
2639 /**
2640  * has_slabs_to_scrub() - Check whether a scrubber has slabs to scrub.
2641  * @scrubber: The scrubber to check.
2642  *
2643  * Return: true if the scrubber has slabs to scrub.
2644  */
2645 static bool __must_check has_slabs_to_scrub(struct slab_scrubber *scrubber)
2646 {
2647         return (get_next_slab(scrubber) != NULL);
2648 }
2649
2650 /**
2651  * uninitialize_scrubber_vio() - Clean up the slab_scrubber's vio.
2652  * @scrubber: The scrubber.
2653  */
2654 static void uninitialize_scrubber_vio(struct slab_scrubber *scrubber)
2655 {
2656         uds_free(uds_forget(scrubber->vio.data));
2657         free_vio_components(&scrubber->vio);
2658 }
2659
2660 /**
2661  * finish_scrubbing() - Stop scrubbing, either because there are no more slabs to scrub or because
2662  *                      there's been an error.
2663  * @scrubber: The scrubber.
2664  */
2665 static void finish_scrubbing(struct slab_scrubber *scrubber, int result)
2666 {
2667         bool notify = vdo_waitq_has_waiters(&scrubber->waiters);
2668         bool done = !has_slabs_to_scrub(scrubber);
2669         struct block_allocator *allocator =
2670                 container_of(scrubber, struct block_allocator, scrubber);
2671
2672         if (done)
2673                 uninitialize_scrubber_vio(scrubber);
2674
2675         if (scrubber->high_priority_only) {
2676                 scrubber->high_priority_only = false;
2677                 vdo_fail_completion(uds_forget(scrubber->vio.completion.parent), result);
2678         } else if (done && (atomic_add_return(-1, &allocator->depot->zones_to_scrub) == 0)) {
2679                 /* All of our slabs were scrubbed, and we're the last allocator to finish. */
2680                 enum vdo_state prior_state =
2681                         atomic_cmpxchg(&allocator->depot->vdo->state, VDO_RECOVERING,
2682                                        VDO_DIRTY);
2683
2684                 /*
2685                  * To be safe, even if the CAS failed, ensure anything that follows is ordered with
2686                  * respect to whatever state change did happen.
2687                  */
2688                 smp_mb__after_atomic();
2689
2690                 /*
2691                  * We must check the VDO state here and not the depot's read_only_notifier since
2692                  * the compare-swap-above could have failed due to a read-only entry which our own
2693                  * thread does not yet know about.
2694                  */
2695                 if (prior_state == VDO_DIRTY)
2696                         uds_log_info("VDO commencing normal operation");
2697                 else if (prior_state == VDO_RECOVERING)
2698                         uds_log_info("Exiting recovery mode");
2699         }
2700
2701         /*
2702          * Note that the scrubber has stopped, and inform anyone who might be waiting for that to
2703          * happen.
2704          */
2705         if (!vdo_finish_draining(&scrubber->admin_state))
2706                 WRITE_ONCE(scrubber->admin_state.current_state,
2707                            VDO_ADMIN_STATE_SUSPENDED);
2708
2709         /*
2710          * We can't notify waiters until after we've finished draining or they'll just requeue.
2711          * Fortunately if there were waiters, we can't have been freed yet.
2712          */
2713         if (notify)
2714                 vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
2715 }
2716
2717 static void scrub_next_slab(struct slab_scrubber *scrubber);
2718
2719 /**
2720  * slab_scrubbed() - Notify the scrubber that a slab has been scrubbed.
2721  * @completion: The slab rebuild completion.
2722  *
2723  * This callback is registered in apply_journal_entries().
2724  */
2725 static void slab_scrubbed(struct vdo_completion *completion)
2726 {
2727         struct slab_scrubber *scrubber =
2728                 container_of(as_vio(completion), struct slab_scrubber, vio);
2729         struct vdo_slab *slab = scrubber->slab;
2730
2731         slab->status = VDO_SLAB_REBUILT;
2732         queue_slab(slab);
2733         reopen_slab_journal(slab);
2734         WRITE_ONCE(scrubber->slab_count, scrubber->slab_count - 1);
2735         scrub_next_slab(scrubber);
2736 }
2737
2738 /**
2739  * abort_scrubbing() - Abort scrubbing due to an error.
2740  * @scrubber: The slab scrubber.
2741  * @result: The error.
2742  */
2743 static void abort_scrubbing(struct slab_scrubber *scrubber, int result)
2744 {
2745         vdo_enter_read_only_mode(scrubber->vio.completion.vdo, result);
2746         finish_scrubbing(scrubber, result);
2747 }
2748
2749 /**
2750  * handle_scrubber_error() - Handle errors while rebuilding a slab.
2751  * @completion: The slab rebuild completion.
2752  */
2753 static void handle_scrubber_error(struct vdo_completion *completion)
2754 {
2755         struct vio *vio = as_vio(completion);
2756
2757         vio_record_metadata_io_error(vio);
2758         abort_scrubbing(container_of(vio, struct slab_scrubber, vio),
2759                         completion->result);
2760 }
2761
2762 /**
2763  * apply_block_entries() - Apply all the entries in a block to the reference counts.
2764  * @block: A block with entries to apply.
2765  * @entry_count: The number of entries to apply.
2766  * @block_number: The sequence number of the block.
2767  * @slab: The slab to apply the entries to.
2768  *
2769  * Return: VDO_SUCCESS or an error code.
2770  */
2771 static int apply_block_entries(struct packed_slab_journal_block *block,
2772                                journal_entry_count_t entry_count,
2773                                sequence_number_t block_number, struct vdo_slab *slab)
2774 {
2775         struct journal_point entry_point = {
2776                 .sequence_number = block_number,
2777                 .entry_count = 0,
2778         };
2779         int result;
2780         slab_block_number max_sbn = slab->end - slab->start;
2781
2782         while (entry_point.entry_count < entry_count) {
2783                 struct slab_journal_entry entry =
2784                         vdo_decode_slab_journal_entry(block, entry_point.entry_count);
2785
2786                 if (entry.sbn > max_sbn) {
2787                         /* This entry is out of bounds. */
2788                         return uds_log_error_strerror(VDO_CORRUPT_JOURNAL,
2789                                                       "vdo_slab journal entry (%llu, %u) had invalid offset %u in slab (size %u blocks)",
2790                                                       (unsigned long long) block_number,
2791                                                       entry_point.entry_count,
2792                                                       entry.sbn, max_sbn);
2793                 }
2794
2795                 result = replay_reference_count_change(slab, &entry_point, entry);
2796                 if (result != VDO_SUCCESS) {
2797                         uds_log_error_strerror(result,
2798                                                "vdo_slab journal entry (%llu, %u) (%s of offset %u) could not be applied in slab %u",
2799                                                (unsigned long long) block_number,
2800                                                entry_point.entry_count,
2801                                                vdo_get_journal_operation_name(entry.operation),
2802                                                entry.sbn, slab->slab_number);
2803                         return result;
2804                 }
2805                 entry_point.entry_count++;
2806         }
2807
2808         return VDO_SUCCESS;
2809 }
2810
2811 /**
2812  * apply_journal_entries() - Find the relevant vio of the slab journal and apply all valid entries.
2813  * @completion: The metadata read vio completion.
2814  *
2815  * This is a callback registered in start_scrubbing().
2816  */
2817 static void apply_journal_entries(struct vdo_completion *completion)
2818 {
2819         int result;
2820         struct slab_scrubber *scrubber
2821                 = container_of(as_vio(completion), struct slab_scrubber, vio);
2822         struct vdo_slab *slab = scrubber->slab;
2823         struct slab_journal *journal = &slab->journal;
2824
2825         /* Find the boundaries of the useful part of the journal. */
2826         sequence_number_t tail = journal->tail;
2827         tail_block_offset_t end_index = (tail - 1) % journal->size;
2828         char *end_data = scrubber->vio.data + (end_index * VDO_BLOCK_SIZE);
2829         struct packed_slab_journal_block *end_block =
2830                 (struct packed_slab_journal_block *) end_data;
2831
2832         sequence_number_t head = __le64_to_cpu(end_block->header.head);
2833         tail_block_offset_t head_index = head % journal->size;
2834         block_count_t index = head_index;
2835
2836         struct journal_point ref_counts_point = slab->slab_journal_point;
2837         struct journal_point last_entry_applied = ref_counts_point;
2838         sequence_number_t sequence;
2839
2840         for (sequence = head; sequence < tail; sequence++) {
2841                 char *block_data = scrubber->vio.data + (index * VDO_BLOCK_SIZE);
2842                 struct packed_slab_journal_block *block =
2843                         (struct packed_slab_journal_block *) block_data;
2844                 struct slab_journal_block_header header;
2845
2846                 vdo_unpack_slab_journal_block_header(&block->header, &header);
2847
2848                 if ((header.nonce != slab->allocator->nonce) ||
2849                     (header.metadata_type != VDO_METADATA_SLAB_JOURNAL) ||
2850                     (header.sequence_number != sequence) ||
2851                     (header.entry_count > journal->entries_per_block) ||
2852                     (header.has_block_map_increments &&
2853                      (header.entry_count > journal->full_entries_per_block))) {
2854                         /* The block is not what we expect it to be. */
2855                         uds_log_error("vdo_slab journal block for slab %u was invalid",
2856                                       slab->slab_number);
2857                         abort_scrubbing(scrubber, VDO_CORRUPT_JOURNAL);
2858                         return;
2859                 }
2860
2861                 result = apply_block_entries(block, header.entry_count, sequence, slab);
2862                 if (result != VDO_SUCCESS) {
2863                         abort_scrubbing(scrubber, result);
2864                         return;
2865                 }
2866
2867                 last_entry_applied.sequence_number = sequence;
2868                 last_entry_applied.entry_count = header.entry_count - 1;
2869                 index++;
2870                 if (index == journal->size)
2871                         index = 0;
2872         }
2873
2874         /*
2875          * At the end of rebuild, the reference counters should be accurate to the end of the
2876          * journal we just applied.
2877          */
2878         result = ASSERT(!vdo_before_journal_point(&last_entry_applied,
2879                                                   &ref_counts_point),
2880                         "Refcounts are not more accurate than the slab journal");
2881         if (result != VDO_SUCCESS) {
2882                 abort_scrubbing(scrubber, result);
2883                 return;
2884         }
2885
2886         /* Save out the rebuilt reference blocks. */
2887         vdo_prepare_completion(completion, slab_scrubbed, handle_scrubber_error,
2888                                slab->allocator->thread_id, completion->parent);
2889         vdo_start_operation_with_waiter(&slab->state,
2890                                         VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING,
2891                                         completion, initiate_slab_action);
2892 }
2893
2894 static void read_slab_journal_endio(struct bio *bio)
2895 {
2896         struct vio *vio = bio->bi_private;
2897         struct slab_scrubber *scrubber = container_of(vio, struct slab_scrubber, vio);
2898
2899         continue_vio_after_io(bio->bi_private, apply_journal_entries,
2900                               scrubber->slab->allocator->thread_id);
2901 }
2902
2903 /**
2904  * start_scrubbing() - Read the current slab's journal from disk now that it has been flushed.
2905  * @completion: The scrubber's vio completion.
2906  *
2907  * This callback is registered in scrub_next_slab().
2908  */
2909 static void start_scrubbing(struct vdo_completion *completion)
2910 {
2911         struct slab_scrubber *scrubber =
2912                 container_of(as_vio(completion), struct slab_scrubber, vio);
2913         struct vdo_slab *slab = scrubber->slab;
2914
2915         if (!slab->allocator->summary_entries[slab->slab_number].is_dirty) {
2916                 slab_scrubbed(completion);
2917                 return;
2918         }
2919
2920         vdo_submit_metadata_vio(&scrubber->vio, slab->journal_origin,
2921                                 read_slab_journal_endio, handle_scrubber_error,
2922                                 REQ_OP_READ);
2923 }
2924
2925 /**
2926  * scrub_next_slab() - Scrub the next slab if there is one.
2927  * @scrubber: The scrubber.
2928  */
2929 static void scrub_next_slab(struct slab_scrubber *scrubber)
2930 {
2931         struct vdo_completion *completion = &scrubber->vio.completion;
2932         struct vdo_slab *slab;
2933
2934         /*
2935          * Note: this notify call is always safe only because scrubbing can only be started when
2936          * the VDO is quiescent.
2937          */
2938         vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
2939
2940         if (vdo_is_read_only(completion->vdo)) {
2941                 finish_scrubbing(scrubber, VDO_READ_ONLY);
2942                 return;
2943         }
2944
2945         slab = get_next_slab(scrubber);
2946         if ((slab == NULL) ||
2947             (scrubber->high_priority_only && list_empty(&scrubber->high_priority_slabs))) {
2948                 finish_scrubbing(scrubber, VDO_SUCCESS);
2949                 return;
2950         }
2951
2952         if (vdo_finish_draining(&scrubber->admin_state))
2953                 return;
2954
2955         list_del_init(&slab->allocq_entry);
2956         scrubber->slab = slab;
2957         vdo_prepare_completion(completion, start_scrubbing, handle_scrubber_error,
2958                                slab->allocator->thread_id, completion->parent);
2959         vdo_start_operation_with_waiter(&slab->state, VDO_ADMIN_STATE_SCRUBBING,
2960                                         completion, initiate_slab_action);
2961 }
2962
2963 /**
2964  * scrub_slabs() - Scrub all of an allocator's slabs that are eligible for scrubbing.
2965  * @allocator: The block_allocator to scrub.
2966  * @parent: The completion to notify when scrubbing is done, implies high_priority, may be NULL.
2967  */
2968 static void scrub_slabs(struct block_allocator *allocator, struct vdo_completion *parent)
2969 {
2970         struct slab_scrubber *scrubber = &allocator->scrubber;
2971
2972         scrubber->vio.completion.parent = parent;
2973         scrubber->high_priority_only = (parent != NULL);
2974         if (!has_slabs_to_scrub(scrubber)) {
2975                 finish_scrubbing(scrubber, VDO_SUCCESS);
2976                 return;
2977         }
2978
2979         if (scrubber->high_priority_only &&
2980             vdo_is_priority_table_empty(allocator->prioritized_slabs) &&
2981             list_empty(&scrubber->high_priority_slabs))
2982                 register_slab_for_scrubbing(get_next_slab(scrubber), true);
2983
2984         vdo_resume_if_quiescent(&scrubber->admin_state);
2985         scrub_next_slab(scrubber);
2986 }
2987
2988 static inline void assert_on_allocator_thread(thread_id_t thread_id,
2989                                               const char *function_name)
2990 {
2991         ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == thread_id),
2992                         "%s called on correct thread", function_name);
2993 }
2994
2995 static void register_slab_with_allocator(struct block_allocator *allocator,
2996                                          struct vdo_slab *slab)
2997 {
2998         allocator->slab_count++;
2999         allocator->last_slab = slab->slab_number;
3000 }
3001
3002 /**
3003  * get_depot_slab_iterator() - Return a slab_iterator over the slabs in a slab_depot.
3004  * @depot: The depot over which to iterate.
3005  * @start: The number of the slab to start iterating from.
3006  * @end: The number of the last slab which may be returned.
3007  * @stride: The difference in slab number between successive slabs.
3008  *
3009  * Iteration always occurs from higher to lower numbered slabs.
3010  *
3011  * Return: An initialized iterator structure.
3012  */
3013 static struct slab_iterator get_depot_slab_iterator(struct slab_depot *depot,
3014                                                     slab_count_t start, slab_count_t end,
3015                                                     slab_count_t stride)
3016 {
3017         struct vdo_slab **slabs = depot->slabs;
3018
3019         return (struct slab_iterator) {
3020                 .slabs = slabs,
3021                 .next = (((slabs == NULL) || (start < end)) ? NULL : slabs[start]),
3022                 .end = end,
3023                 .stride = stride,
3024         };
3025 }
3026
3027 static struct slab_iterator get_slab_iterator(const struct block_allocator *allocator)
3028 {
3029         return get_depot_slab_iterator(allocator->depot, allocator->last_slab,
3030                                        allocator->zone_number,
3031                                        allocator->depot->zone_count);
3032 }
3033
3034 /**
3035  * next_slab() - Get the next slab from a slab_iterator and advance the iterator
3036  * @iterator: The slab_iterator.
3037  *
3038  * Return: The next slab or NULL if the iterator is exhausted.
3039  */
3040 static struct vdo_slab *next_slab(struct slab_iterator *iterator)
3041 {
3042         struct vdo_slab *slab = iterator->next;
3043
3044         if ((slab == NULL) || (slab->slab_number < iterator->end + iterator->stride))
3045                 iterator->next = NULL;
3046         else
3047                 iterator->next = iterator->slabs[slab->slab_number - iterator->stride];
3048
3049         return slab;
3050 }
3051
3052 /**
3053  * abort_waiter() - Abort vios waiting to make journal entries when read-only.
3054  *
3055  * This callback is invoked on all vios waiting to make slab journal entries after the VDO has gone
3056  * into read-only mode. Implements waiter_callback_fn.
3057  */
3058 static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused)
3059 {
3060         struct reference_updater *updater =
3061                 container_of(waiter, struct reference_updater, waiter);
3062         struct data_vio *data_vio = data_vio_from_reference_updater(updater);
3063
3064         if (updater->increment) {
3065                 continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
3066                 return;
3067         }
3068
3069         vdo_continue_completion(&data_vio->decrement_completion, VDO_READ_ONLY);
3070 }
3071
3072 /* Implements vdo_read_only_notification_fn. */
3073 static void notify_block_allocator_of_read_only_mode(void *listener,
3074                                                      struct vdo_completion *parent)
3075 {
3076         struct block_allocator *allocator = listener;
3077         struct slab_iterator iterator;
3078
3079         assert_on_allocator_thread(allocator->thread_id, __func__);
3080         iterator = get_slab_iterator(allocator);
3081         while (iterator.next != NULL) {
3082                 struct vdo_slab *slab = next_slab(&iterator);
3083
3084                 vdo_waitq_notify_all_waiters(&slab->journal.entry_waiters,
3085                                              abort_waiter, &slab->journal);
3086                 check_if_slab_drained(slab);
3087         }
3088
3089         vdo_finish_completion(parent);
3090 }
3091
3092 /**
3093  * vdo_acquire_provisional_reference() - Acquire a provisional reference on behalf of a PBN lock if
3094  *                                       the block it locks is unreferenced.
3095  * @slab: The slab which contains the block.
3096  * @pbn: The physical block to reference.
3097  * @lock: The lock.
3098  *
3099  * Return: VDO_SUCCESS or an error.
3100  */
3101 int vdo_acquire_provisional_reference(struct vdo_slab *slab, physical_block_number_t pbn,
3102                                       struct pbn_lock *lock)
3103 {
3104         slab_block_number block_number;
3105         int result;
3106
3107         if (vdo_pbn_lock_has_provisional_reference(lock))
3108                 return VDO_SUCCESS;
3109
3110         if (!is_slab_open(slab))
3111                 return VDO_INVALID_ADMIN_STATE;
3112
3113         result = slab_block_number_from_pbn(slab, pbn, &block_number);
3114         if (result != VDO_SUCCESS)
3115                 return result;
3116
3117         if (slab->counters[block_number] == EMPTY_REFERENCE_COUNT) {
3118                 make_provisional_reference(slab, block_number);
3119                 if (lock != NULL)
3120                         vdo_assign_pbn_lock_provisional_reference(lock);
3121         }
3122
3123         if (vdo_pbn_lock_has_provisional_reference(lock))
3124                 adjust_free_block_count(slab, false);
3125
3126         return VDO_SUCCESS;
3127 }
3128
3129 static int __must_check allocate_slab_block(struct vdo_slab *slab,
3130                                             physical_block_number_t *block_number_ptr)
3131 {
3132         slab_block_number free_index;
3133
3134         if (!is_slab_open(slab))
3135                 return VDO_INVALID_ADMIN_STATE;
3136
3137         if (!search_reference_blocks(slab, &free_index))
3138                 return VDO_NO_SPACE;
3139
3140         ASSERT_LOG_ONLY((slab->counters[free_index] == EMPTY_REFERENCE_COUNT),
3141                         "free block must have ref count of zero");
3142         make_provisional_reference(slab, free_index);
3143         adjust_free_block_count(slab, false);
3144
3145         /*
3146          * Update the search hint so the next search will start at the array index just past the
3147          * free block we just found.
3148          */
3149         slab->search_cursor.index = (free_index + 1);
3150
3151         *block_number_ptr = slab->start + free_index;
3152         return VDO_SUCCESS;
3153 }
3154
3155 /**
3156  * open_slab() - Prepare a slab to be allocated from.
3157  * @slab: The slab.
3158  */
3159 static void open_slab(struct vdo_slab *slab)
3160 {
3161         reset_search_cursor(slab);
3162         if (is_slab_journal_blank(slab)) {
3163                 WRITE_ONCE(slab->allocator->statistics.slabs_opened,
3164                            slab->allocator->statistics.slabs_opened + 1);
3165                 dirty_all_reference_blocks(slab);
3166         } else {
3167                 WRITE_ONCE(slab->allocator->statistics.slabs_reopened,
3168                            slab->allocator->statistics.slabs_reopened + 1);
3169         }
3170
3171         slab->allocator->open_slab = slab;
3172 }
3173
3174
3175 /*
3176  * The block allocated will have a provisional reference and the reference must be either confirmed
3177  * with a subsequent increment or vacated with a subsequent decrement via
3178  * vdo_release_block_reference().
3179  */
3180 int vdo_allocate_block(struct block_allocator *allocator,
3181                        physical_block_number_t *block_number_ptr)
3182 {
3183         int result;
3184
3185         if (allocator->open_slab != NULL) {
3186                 /* Try to allocate the next block in the currently open slab. */
3187                 result = allocate_slab_block(allocator->open_slab, block_number_ptr);
3188                 if ((result == VDO_SUCCESS) || (result != VDO_NO_SPACE))
3189                         return result;
3190
3191                 /* Put the exhausted open slab back into the priority table. */
3192                 prioritize_slab(allocator->open_slab);
3193         }
3194
3195         /* Remove the highest priority slab from the priority table and make it the open slab. */
3196         open_slab(list_entry(vdo_priority_table_dequeue(allocator->prioritized_slabs),
3197                              struct vdo_slab, allocq_entry));
3198
3199         /*
3200          * Try allocating again. If we're out of space immediately after opening a slab, then every
3201          * slab must be fully allocated.
3202          */
3203         return allocate_slab_block(allocator->open_slab, block_number_ptr);
3204 }
3205
3206 /**
3207  * vdo_enqueue_clean_slab_waiter() - Wait for a clean slab.
3208  * @allocator: The block_allocator on which to wait.
3209  * @waiter: The waiter.
3210  *
3211  * Return: VDO_SUCCESS if the waiter was queued, VDO_NO_SPACE if there are no slabs to scrub, and
3212  *         some other error otherwise.
3213  */
3214 int vdo_enqueue_clean_slab_waiter(struct block_allocator *allocator,
3215                                   struct vdo_waiter *waiter)
3216 {
3217         if (vdo_is_read_only(allocator->depot->vdo))
3218                 return VDO_READ_ONLY;
3219
3220         if (vdo_is_state_quiescent(&allocator->scrubber.admin_state))
3221                 return VDO_NO_SPACE;
3222
3223         vdo_waitq_enqueue_waiter(&allocator->scrubber.waiters, waiter);
3224         return VDO_SUCCESS;
3225 }
3226
3227 /**
3228  * vdo_modify_reference_count() - Modify the reference count of a block by first making a slab
3229  *                                journal entry and then updating the reference counter.
3230  *
3231  * @data_vio: The data_vio for which to add the entry.
3232  * @updater: Which of the data_vio's reference updaters is being submitted.
3233  */
3234 void vdo_modify_reference_count(struct vdo_completion *completion,
3235                                 struct reference_updater *updater)
3236 {
3237         struct vdo_slab *slab = vdo_get_slab(completion->vdo->depot, updater->zpbn.pbn);
3238
3239         if (!is_slab_open(slab)) {
3240                 vdo_continue_completion(completion, VDO_INVALID_ADMIN_STATE);
3241                 return;
3242         }
3243
3244         if (vdo_is_read_only(completion->vdo)) {
3245                 vdo_continue_completion(completion, VDO_READ_ONLY);
3246                 return;
3247         }
3248
3249         vdo_waitq_enqueue_waiter(&slab->journal.entry_waiters, &updater->waiter);
3250         if ((slab->status != VDO_SLAB_REBUILT) && requires_reaping(&slab->journal))
3251                 register_slab_for_scrubbing(slab, true);
3252
3253         add_entries(&slab->journal);
3254 }
3255
3256 /* Release an unused provisional reference. */
3257 int vdo_release_block_reference(struct block_allocator *allocator,
3258                                 physical_block_number_t pbn)
3259 {
3260         struct reference_updater updater;
3261
3262         if (pbn == VDO_ZERO_BLOCK)
3263                 return VDO_SUCCESS;
3264
3265         updater = (struct reference_updater) {
3266                 .operation = VDO_JOURNAL_DATA_REMAPPING,
3267                 .increment = false,
3268                 .zpbn = {
3269                         .pbn = pbn,
3270                 },
3271         };
3272
3273         return adjust_reference_count(vdo_get_slab(allocator->depot, pbn),
3274                                       &updater, NULL);
3275 }
3276
3277 /*
3278  * This is a min_heap callback function orders slab_status structures using the 'is_clean' field as
3279  * the primary key and the 'emptiness' field as the secondary key.
3280  *
3281  * Slabs need to be pushed onto the rings in the same order they are to be popped off. Popping
3282  * should always get the most empty first, so pushing should be from most empty to least empty.
3283  * Thus, the ordering is reversed from the usual sense since min_heap returns smaller elements
3284  * before larger ones.
3285  */
3286 static bool slab_status_is_less_than(const void *item1, const void *item2)
3287 {
3288         const struct slab_status *info1 = item1;
3289         const struct slab_status *info2 = item2;
3290
3291         if (info1->is_clean != info2->is_clean)
3292                 return info1->is_clean;
3293         if (info1->emptiness != info2->emptiness)
3294                 return info1->emptiness > info2->emptiness;
3295         return info1->slab_number < info2->slab_number;
3296 }
3297
3298 static void swap_slab_statuses(void *item1, void *item2)
3299 {
3300         struct slab_status *info1 = item1;
3301         struct slab_status *info2 = item2;
3302
3303         swap(*info1, *info2);
3304 }
3305
3306 static const struct min_heap_callbacks slab_status_min_heap = {
3307         .elem_size = sizeof(struct slab_status),
3308         .less = slab_status_is_less_than,
3309         .swp = swap_slab_statuses,
3310 };
3311
3312 /* Inform the slab actor that a action has finished on some slab; used by apply_to_slabs(). */
3313 static void slab_action_callback(struct vdo_completion *completion)
3314 {
3315         struct block_allocator *allocator = vdo_as_block_allocator(completion);
3316         struct slab_actor *actor = &allocator->slab_actor;
3317
3318         if (--actor->slab_action_count == 0) {
3319                 actor->callback(completion);
3320                 return;
3321         }
3322
3323         vdo_reset_completion(completion);
3324 }
3325
3326 /* Preserve the error from part of an action and continue. */
3327 static void handle_operation_error(struct vdo_completion *completion)
3328 {
3329         struct block_allocator *allocator = vdo_as_block_allocator(completion);
3330
3331         if (allocator->state.waiter != NULL)
3332                 vdo_set_completion_result(allocator->state.waiter, completion->result);
3333         completion->callback(completion);
3334 }
3335
3336 /* Perform an action on each of an allocator's slabs in parallel. */
3337 static void apply_to_slabs(struct block_allocator *allocator, vdo_action_fn callback)
3338 {
3339         struct slab_iterator iterator;
3340
3341         vdo_prepare_completion(&allocator->completion, slab_action_callback,
3342                                handle_operation_error, allocator->thread_id, NULL);
3343         allocator->completion.requeue = false;
3344
3345         /*
3346          * Since we are going to dequeue all of the slabs, the open slab will become invalid, so
3347          * clear it.
3348          */
3349         allocator->open_slab = NULL;
3350
3351         /* Ensure that we don't finish before we're done starting. */
3352         allocator->slab_actor = (struct slab_actor) {
3353                 .slab_action_count = 1,
3354                 .callback = callback,
3355         };
3356
3357         iterator = get_slab_iterator(allocator);
3358         while (iterator.next != NULL) {
3359                 const struct admin_state_code *operation =
3360                         vdo_get_admin_state_code(&allocator->state);
3361                 struct vdo_slab *slab = next_slab(&iterator);
3362
3363                 list_del_init(&slab->allocq_entry);
3364                 allocator->slab_actor.slab_action_count++;
3365                 vdo_start_operation_with_waiter(&slab->state, operation,
3366                                                 &allocator->completion,
3367                                                 initiate_slab_action);
3368         }
3369
3370         slab_action_callback(&allocator->completion);
3371 }
3372
3373 static void finish_loading_allocator(struct vdo_completion *completion)
3374 {
3375         struct block_allocator *allocator = vdo_as_block_allocator(completion);
3376         const struct admin_state_code *operation =
3377                 vdo_get_admin_state_code(&allocator->state);
3378
3379         if (allocator->eraser != NULL)
3380                 dm_kcopyd_client_destroy(uds_forget(allocator->eraser));
3381
3382         if (operation == VDO_ADMIN_STATE_LOADING_FOR_RECOVERY) {
3383                 void *context =
3384                         vdo_get_current_action_context(allocator->depot->action_manager);
3385
3386                 vdo_replay_into_slab_journals(allocator, context);
3387                 return;
3388         }
3389
3390         vdo_finish_loading(&allocator->state);
3391 }
3392
3393 static void erase_next_slab_journal(struct block_allocator *allocator);
3394
3395 static void copy_callback(int read_err, unsigned long write_err, void *context)
3396 {
3397         struct block_allocator *allocator = context;
3398         int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO);
3399
3400         if (result != VDO_SUCCESS) {
3401                 vdo_fail_completion(&allocator->completion, result);
3402                 return;
3403         }
3404
3405         erase_next_slab_journal(allocator);
3406 }
3407
3408 /* erase_next_slab_journal() - Erase the next slab journal. */
3409 static void erase_next_slab_journal(struct block_allocator *allocator)
3410 {
3411         struct vdo_slab *slab;
3412         physical_block_number_t pbn;
3413         struct dm_io_region regions[1];
3414         struct slab_depot *depot = allocator->depot;
3415         block_count_t blocks = depot->slab_config.slab_journal_blocks;
3416
3417         if (allocator->slabs_to_erase.next == NULL) {
3418                 vdo_finish_completion(&allocator->completion);
3419                 return;
3420         }
3421
3422         slab = next_slab(&allocator->slabs_to_erase);
3423         pbn = slab->journal_origin - depot->vdo->geometry.bio_offset;
3424         regions[0] = (struct dm_io_region) {
3425                 .bdev = vdo_get_backing_device(depot->vdo),
3426                 .sector = pbn * VDO_SECTORS_PER_BLOCK,
3427                 .count = blocks * VDO_SECTORS_PER_BLOCK,
3428         };
3429         dm_kcopyd_zero(allocator->eraser, 1, regions, 0, copy_callback, allocator);
3430 }
3431
3432 /* Implements vdo_admin_initiator_fn. */
3433 static void initiate_load(struct admin_state *state)
3434 {
3435         struct block_allocator *allocator =
3436                 container_of(state, struct block_allocator, state);
3437         const struct admin_state_code *operation = vdo_get_admin_state_code(state);
3438
3439         if (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD) {
3440                 /*
3441                  * Must requeue because the kcopyd client cannot be freed in the same stack frame
3442                  * as the kcopyd callback, lest it deadlock.
3443                  */
3444                 vdo_prepare_completion_for_requeue(&allocator->completion,
3445                                                    finish_loading_allocator,
3446                                                    handle_operation_error,
3447                                                    allocator->thread_id, NULL);
3448                 allocator->eraser = dm_kcopyd_client_create(NULL);
3449                 if (IS_ERR(allocator->eraser)) {
3450                         vdo_fail_completion(&allocator->completion,
3451                                             PTR_ERR(allocator->eraser));
3452                         allocator->eraser = NULL;
3453                         return;
3454                 }
3455                 allocator->slabs_to_erase = get_slab_iterator(allocator);
3456
3457                 erase_next_slab_journal(allocator);
3458                 return;
3459         }
3460
3461         apply_to_slabs(allocator, finish_loading_allocator);
3462 }
3463
3464 /**
3465  * vdo_notify_slab_journals_are_recovered() - Inform a block allocator that its slab journals have
3466  *                                            been recovered from the recovery journal.
3467  * @completion The allocator completion
3468  */
3469 void vdo_notify_slab_journals_are_recovered(struct vdo_completion *completion)
3470 {
3471         struct block_allocator *allocator = vdo_as_block_allocator(completion);
3472
3473         vdo_finish_loading_with_result(&allocator->state, completion->result);
3474 }
3475
3476 static int get_slab_statuses(struct block_allocator *allocator,
3477                              struct slab_status **statuses_ptr)
3478 {
3479         int result;
3480         struct slab_status *statuses;
3481         struct slab_iterator iterator = get_slab_iterator(allocator);
3482
3483         result = uds_allocate(allocator->slab_count, struct slab_status, __func__,
3484                               &statuses);
3485         if (result != VDO_SUCCESS)
3486                 return result;
3487
3488         *statuses_ptr = statuses;
3489
3490         while (iterator.next != NULL)  {
3491                 slab_count_t slab_number = next_slab(&iterator)->slab_number;
3492
3493                 *statuses++ = (struct slab_status) {
3494                         .slab_number = slab_number,
3495                         .is_clean = !allocator->summary_entries[slab_number].is_dirty,
3496                         .emptiness = allocator->summary_entries[slab_number].fullness_hint,
3497                 };
3498         }
3499
3500         return VDO_SUCCESS;
3501 }
3502
3503 /* Prepare slabs for allocation or scrubbing. */
3504 static int __must_check vdo_prepare_slabs_for_allocation(struct block_allocator *allocator)
3505 {
3506         struct slab_status current_slab_status;
3507         struct min_heap heap;
3508         int result;
3509         struct slab_status *slab_statuses;
3510         struct slab_depot *depot = allocator->depot;
3511
3512         WRITE_ONCE(allocator->allocated_blocks,
3513                    allocator->slab_count * depot->slab_config.data_blocks);
3514         result = get_slab_statuses(allocator, &slab_statuses);
3515         if (result != VDO_SUCCESS)
3516                 return result;
3517
3518         /* Sort the slabs by cleanliness, then by emptiness hint. */
3519         heap = (struct min_heap) {
3520                 .data = slab_statuses,
3521                 .nr = allocator->slab_count,
3522                 .size = allocator->slab_count,
3523         };
3524         min_heapify_all(&heap, &slab_status_min_heap);
3525
3526         while (heap.nr > 0) {
3527                 bool high_priority;
3528                 struct vdo_slab *slab;
3529                 struct slab_journal *journal;
3530
3531                 current_slab_status = slab_statuses[0];
3532                 min_heap_pop(&heap, &slab_status_min_heap);
3533                 slab = depot->slabs[current_slab_status.slab_number];
3534
3535                 if ((depot->load_type == VDO_SLAB_DEPOT_REBUILD_LOAD) ||
3536                     (!allocator->summary_entries[slab->slab_number].load_ref_counts &&
3537                      current_slab_status.is_clean)) {
3538                         queue_slab(slab);
3539                         continue;
3540                 }
3541
3542                 slab->status = VDO_SLAB_REQUIRES_SCRUBBING;
3543                 journal = &slab->journal;
3544                 high_priority = ((current_slab_status.is_clean &&
3545                                  (depot->load_type == VDO_SLAB_DEPOT_NORMAL_LOAD)) ||
3546                                  (journal_length(journal) >= journal->scrubbing_threshold));
3547                 register_slab_for_scrubbing(slab, high_priority);
3548         }
3549
3550         uds_free(slab_statuses);
3551         return VDO_SUCCESS;
3552 }
3553
3554 static const char *status_to_string(enum slab_rebuild_status status)
3555 {
3556         switch (status) {
3557         case VDO_SLAB_REBUILT:
3558                 return "REBUILT";
3559         case VDO_SLAB_REQUIRES_SCRUBBING:
3560                 return "SCRUBBING";
3561         case VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING:
3562                 return "PRIORITY_SCRUBBING";
3563         case VDO_SLAB_REBUILDING:
3564                 return "REBUILDING";
3565         case VDO_SLAB_REPLAYING:
3566                 return "REPLAYING";
3567         default:
3568                 return "UNKNOWN";
3569         }
3570 }
3571
3572 void vdo_dump_block_allocator(const struct block_allocator *allocator)
3573 {
3574         unsigned int pause_counter = 0;
3575         struct slab_iterator iterator = get_slab_iterator(allocator);
3576         const struct slab_scrubber *scrubber = &allocator->scrubber;
3577
3578         uds_log_info("block_allocator zone %u", allocator->zone_number);
3579         while (iterator.next != NULL) {
3580                 struct vdo_slab *slab = next_slab(&iterator);
3581                 struct slab_journal *journal = &slab->journal;
3582
3583                 if (slab->reference_blocks != NULL) {
3584                         /* Terse because there are a lot of slabs to dump and syslog is lossy. */
3585                         uds_log_info("slab %u: P%u, %llu free", slab->slab_number,
3586                                      slab->priority,
3587                                      (unsigned long long) slab->free_blocks);
3588                 } else {
3589                         uds_log_info("slab %u: status %s", slab->slab_number,
3590                                      status_to_string(slab->status));
3591                 }
3592
3593                 uds_log_info("  slab journal: entry_waiters=%zu waiting_to_commit=%s updating_slab_summary=%s head=%llu unreapable=%llu tail=%llu next_commit=%llu summarized=%llu last_summarized=%llu recovery_lock=%llu dirty=%s",
3594                              vdo_waitq_num_waiters(&journal->entry_waiters),
3595                              uds_bool_to_string(journal->waiting_to_commit),
3596                              uds_bool_to_string(journal->updating_slab_summary),
3597                              (unsigned long long) journal->head,
3598                              (unsigned long long) journal->unreapable,
3599                              (unsigned long long) journal->tail,
3600                              (unsigned long long) journal->next_commit,
3601                              (unsigned long long) journal->summarized,
3602                              (unsigned long long) journal->last_summarized,
3603                              (unsigned long long) journal->recovery_lock,
3604                              uds_bool_to_string(journal->recovery_lock != 0));
3605                 /*
3606                  * Given the frequency with which the locks are just a tiny bit off, it might be
3607                  * worth dumping all the locks, but that might be too much logging.
3608                  */
3609
3610                 if (slab->counters != NULL) {
3611                         /* Terse because there are a lot of slabs to dump and syslog is lossy. */
3612                         uds_log_info("  slab: free=%u/%u blocks=%u dirty=%zu active=%zu journal@(%llu,%u)",
3613                                      slab->free_blocks, slab->block_count,
3614                                      slab->reference_block_count,
3615                                      vdo_waitq_num_waiters(&slab->dirty_blocks),
3616                                      slab->active_count,
3617                                      (unsigned long long) slab->slab_journal_point.sequence_number,
3618                                      slab->slab_journal_point.entry_count);
3619                 } else {
3620                         uds_log_info("  no counters");
3621                 }
3622
3623                 /*
3624                  * Wait for a while after each batch of 32 slabs dumped, an arbitrary number,
3625                  * allowing the kernel log a chance to be flushed instead of being overrun.
3626                  */
3627                 if (pause_counter++ == 31) {
3628                         pause_counter = 0;
3629                         uds_pause_for_logger();
3630                 }
3631         }
3632
3633         uds_log_info("slab_scrubber slab_count %u waiters %zu %s%s",
3634                      READ_ONCE(scrubber->slab_count),
3635                      vdo_waitq_num_waiters(&scrubber->waiters),
3636                      vdo_get_admin_state_code(&scrubber->admin_state)->name,
3637                      scrubber->high_priority_only ? ", high_priority_only " : "");
3638 }
3639
3640 static void free_slab(struct vdo_slab *slab)
3641 {
3642         if (slab == NULL)
3643                 return;
3644
3645         list_del(&slab->allocq_entry);
3646         uds_free(uds_forget(slab->journal.block));
3647         uds_free(uds_forget(slab->journal.locks));
3648         uds_free(uds_forget(slab->counters));
3649         uds_free(uds_forget(slab->reference_blocks));
3650         uds_free(slab);
3651 }
3652
3653 static int initialize_slab_journal(struct vdo_slab *slab)
3654 {
3655         struct slab_journal *journal = &slab->journal;
3656         const struct slab_config *slab_config = &slab->allocator->depot->slab_config;
3657         int result;
3658
3659         result = uds_allocate(slab_config->slab_journal_blocks, struct journal_lock,
3660                               __func__, &journal->locks);
3661         if (result != VDO_SUCCESS)
3662                 return result;
3663
3664         result = uds_allocate(VDO_BLOCK_SIZE, char, "struct packed_slab_journal_block",
3665                               (char **) &journal->block);
3666         if (result != VDO_SUCCESS)
3667                 return result;
3668
3669         journal->slab = slab;
3670         journal->size = slab_config->slab_journal_blocks;
3671         journal->flushing_threshold = slab_config->slab_journal_flushing_threshold;
3672         journal->blocking_threshold = slab_config->slab_journal_blocking_threshold;
3673         journal->scrubbing_threshold = slab_config->slab_journal_scrubbing_threshold;
3674         journal->entries_per_block = VDO_SLAB_JOURNAL_ENTRIES_PER_BLOCK;
3675         journal->full_entries_per_block = VDO_SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK;
3676         journal->events = &slab->allocator->slab_journal_statistics;
3677         journal->recovery_journal = slab->allocator->depot->vdo->recovery_journal;
3678         journal->tail = 1;
3679         journal->head = 1;
3680
3681         journal->flushing_deadline = journal->flushing_threshold;
3682         /*
3683          * Set there to be some time between the deadline and the blocking threshold, so that
3684          * hopefully all are done before blocking.
3685          */
3686         if ((journal->blocking_threshold - journal->flushing_threshold) > 5)
3687                 journal->flushing_deadline = journal->blocking_threshold - 5;
3688
3689         journal->slab_summary_waiter.callback = release_journal_locks;
3690
3691         INIT_LIST_HEAD(&journal->dirty_entry);
3692         INIT_LIST_HEAD(&journal->uncommitted_blocks);
3693
3694         journal->tail_header.nonce = slab->allocator->nonce;
3695         journal->tail_header.metadata_type = VDO_METADATA_SLAB_JOURNAL;
3696         initialize_journal_state(journal);
3697         return VDO_SUCCESS;
3698 }
3699
3700 /**
3701  * make_slab() - Construct a new, empty slab.
3702  * @slab_origin: The physical block number within the block allocator partition of the first block
3703  *               in the slab.
3704  * @allocator: The block allocator to which the slab belongs.
3705  * @slab_number: The slab number of the slab.
3706  * @is_new: true if this slab is being allocated as part of a resize.
3707  * @slab_ptr: A pointer to receive the new slab.
3708  *
3709  * Return: VDO_SUCCESS or an error code.
3710  */
3711 static int __must_check make_slab(physical_block_number_t slab_origin,
3712                                   struct block_allocator *allocator,
3713                                   slab_count_t slab_number, bool is_new,
3714                                   struct vdo_slab **slab_ptr)
3715 {
3716         const struct slab_config *slab_config = &allocator->depot->slab_config;
3717         struct vdo_slab *slab;
3718         int result;
3719
3720         result = uds_allocate(1, struct vdo_slab, __func__, &slab);
3721         if (result != VDO_SUCCESS)
3722                 return result;
3723
3724         *slab = (struct vdo_slab) {
3725                 .allocator = allocator,
3726                 .start = slab_origin,
3727                 .end = slab_origin + slab_config->slab_blocks,
3728                 .slab_number = slab_number,
3729                 .ref_counts_origin = slab_origin + slab_config->data_blocks,
3730                 .journal_origin =
3731                         vdo_get_slab_journal_start_block(slab_config, slab_origin),
3732                 .block_count = slab_config->data_blocks,
3733                 .free_blocks = slab_config->data_blocks,
3734                 .reference_block_count =
3735                         vdo_get_saved_reference_count_size(slab_config->data_blocks),
3736         };
3737         INIT_LIST_HEAD(&slab->allocq_entry);
3738
3739         result = initialize_slab_journal(slab);
3740         if (result != VDO_SUCCESS) {
3741                 free_slab(slab);
3742                 return result;
3743         }
3744
3745         if (is_new) {
3746                 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NEW);
3747                 result = allocate_slab_counters(slab);
3748                 if (result != VDO_SUCCESS) {
3749                         free_slab(slab);
3750                         return result;
3751                 }
3752         } else {
3753                 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
3754         }
3755
3756         *slab_ptr = slab;
3757         return VDO_SUCCESS;
3758 }
3759
3760 /**
3761  * allocate_slabs() - Allocate a new slab pointer array.
3762  * @depot: The depot.
3763  * @slab_count: The number of slabs the depot should have in the new array.
3764  *
3765  * Any existing slab pointers will be copied into the new array, and slabs will be allocated as
3766  * needed. The newly allocated slabs will not be distributed for use by the block allocators.
3767  *
3768  * Return: VDO_SUCCESS or an error code.
3769  */
3770 static int allocate_slabs(struct slab_depot *depot, slab_count_t slab_count)
3771 {
3772         block_count_t slab_size;
3773         bool resizing = false;
3774         physical_block_number_t slab_origin;
3775         int result;
3776
3777         result = uds_allocate(slab_count, struct vdo_slab *,
3778                               "slab pointer array", &depot->new_slabs);
3779         if (result != VDO_SUCCESS)
3780                 return result;
3781
3782         if (depot->slabs != NULL) {
3783                 memcpy(depot->new_slabs, depot->slabs,
3784                        depot->slab_count * sizeof(struct vdo_slab *));
3785                 resizing = true;
3786         }
3787
3788         slab_size = depot->slab_config.slab_blocks;
3789         slab_origin = depot->first_block + (depot->slab_count * slab_size);
3790
3791         for (depot->new_slab_count = depot->slab_count;
3792              depot->new_slab_count < slab_count;
3793              depot->new_slab_count++, slab_origin += slab_size) {
3794                 struct block_allocator *allocator =
3795                         &depot->allocators[depot->new_slab_count % depot->zone_count];
3796                 struct vdo_slab **slab_ptr = &depot->new_slabs[depot->new_slab_count];
3797
3798                 result = make_slab(slab_origin, allocator, depot->new_slab_count,
3799                                    resizing, slab_ptr);
3800                 if (result != VDO_SUCCESS)
3801                         return result;
3802         }
3803
3804         return VDO_SUCCESS;
3805 }
3806
3807 /**
3808  * vdo_abandon_new_slabs() - Abandon any new slabs in this depot, freeing them as needed.
3809  * @depot: The depot.
3810  */
3811 void vdo_abandon_new_slabs(struct slab_depot *depot)
3812 {
3813         slab_count_t i;
3814
3815         if (depot->new_slabs == NULL)
3816                 return;
3817
3818         for (i = depot->slab_count; i < depot->new_slab_count; i++)
3819                 free_slab(uds_forget(depot->new_slabs[i]));
3820         depot->new_slab_count = 0;
3821         depot->new_size = 0;
3822         uds_free(uds_forget(depot->new_slabs));
3823 }
3824
3825 /**
3826  * get_allocator_thread_id() - Get the ID of the thread on which a given allocator operates.
3827  *
3828  * Implements vdo_zone_thread_getter_fn.
3829  */
3830 static thread_id_t get_allocator_thread_id(void *context, zone_count_t zone_number)
3831 {
3832         return ((struct slab_depot *) context)->allocators[zone_number].thread_id;
3833 }
3834
3835 /**
3836  * release_recovery_journal_lock() - Request the slab journal to release the recovery journal lock
3837  *                                   it may hold on a specified recovery journal block.
3838  * @journal: The slab journal.
3839  * @recovery_lock: The sequence number of the recovery journal block whose locks should be
3840  *                 released.
3841  *
3842  * Return: true if the journal does hold a lock on the specified block (which it will release).
3843  */
3844 static bool __must_check release_recovery_journal_lock(struct slab_journal *journal,
3845                                                        sequence_number_t recovery_lock)
3846 {
3847         if (recovery_lock > journal->recovery_lock) {
3848                 ASSERT_LOG_ONLY((recovery_lock < journal->recovery_lock),
3849                                 "slab journal recovery lock is not older than the recovery journal head");
3850                 return false;
3851         }
3852
3853         if ((recovery_lock < journal->recovery_lock) ||
3854             vdo_is_read_only(journal->slab->allocator->depot->vdo))
3855                 return false;
3856
3857         /* All locks are held by the block which is in progress; write it. */
3858         commit_tail(journal);
3859         return true;
3860 }
3861
3862 /*
3863  * Request a commit of all dirty tail blocks which are locking the recovery journal block the depot
3864  * is seeking to release.
3865  *
3866  * Implements vdo_zone_action_fn.
3867  */
3868 static void release_tail_block_locks(void *context, zone_count_t zone_number,
3869                                      struct vdo_completion *parent)
3870 {
3871         struct slab_journal *journal, *tmp;
3872         struct slab_depot *depot = context;
3873         struct list_head *list = &depot->allocators[zone_number].dirty_slab_journals;
3874
3875         list_for_each_entry_safe(journal, tmp, list, dirty_entry) {
3876                 if (!release_recovery_journal_lock(journal,
3877                                                    depot->active_release_request))
3878                         break;
3879         }
3880
3881         vdo_finish_completion(parent);
3882 }
3883
3884 /**
3885  * prepare_for_tail_block_commit() - Prepare to commit oldest tail blocks.
3886  *
3887  * Implements vdo_action_preamble_fn.
3888  */
3889 static void prepare_for_tail_block_commit(void *context, struct vdo_completion *parent)
3890 {
3891         struct slab_depot *depot = context;
3892
3893         depot->active_release_request = depot->new_release_request;
3894         vdo_finish_completion(parent);
3895 }
3896
3897 /**
3898  * schedule_tail_block_commit() - Schedule a tail block commit if necessary.
3899  *
3900  * This method should not be called directly. Rather, call vdo_schedule_default_action() on the
3901  * depot's action manager.
3902  *
3903  * Implements vdo_action_scheduler_fn.
3904  */
3905 static bool schedule_tail_block_commit(void *context)
3906 {
3907         struct slab_depot *depot = context;
3908
3909         if (depot->new_release_request == depot->active_release_request)
3910                 return false;
3911
3912         return vdo_schedule_action(depot->action_manager,
3913                                    prepare_for_tail_block_commit,
3914                                    release_tail_block_locks,
3915                                    NULL, NULL);
3916 }
3917
3918 /**
3919  * initialize_slab_scrubber() - Initialize an allocator's slab scrubber.
3920  * @allocator: The allocator being initialized
3921  *
3922  * Return: VDO_SUCCESS or an error.
3923  */
3924 static int initialize_slab_scrubber(struct block_allocator *allocator)
3925 {
3926         struct slab_scrubber *scrubber = &allocator->scrubber;
3927         block_count_t slab_journal_size =
3928                 allocator->depot->slab_config.slab_journal_blocks;
3929         char *journal_data;
3930         int result;
3931
3932         result = uds_allocate(VDO_BLOCK_SIZE * slab_journal_size,
3933                               char, __func__, &journal_data);
3934         if (result != VDO_SUCCESS)
3935                 return result;
3936
3937         result = allocate_vio_components(allocator->completion.vdo,
3938                                          VIO_TYPE_SLAB_JOURNAL,
3939                                          VIO_PRIORITY_METADATA,
3940                                          allocator, slab_journal_size,
3941                                          journal_data, &scrubber->vio);
3942         if (result != VDO_SUCCESS) {
3943                 uds_free(journal_data);
3944                 return result;
3945         }
3946
3947         INIT_LIST_HEAD(&scrubber->high_priority_slabs);
3948         INIT_LIST_HEAD(&scrubber->slabs);
3949         vdo_set_admin_state_code(&scrubber->admin_state, VDO_ADMIN_STATE_SUSPENDED);
3950         return VDO_SUCCESS;
3951 }
3952
3953 /**
3954  * initialize_slab_summary_block() - Initialize a slab_summary_block.
3955  * @allocator: The allocator which owns the block.
3956  * @index: The index of this block in its zone's summary.
3957  *
3958  * Return: VDO_SUCCESS or an error.
3959  */
3960 static int __must_check initialize_slab_summary_block(struct block_allocator *allocator,
3961                                                       block_count_t index)
3962 {
3963         struct slab_summary_block *block = &allocator->summary_blocks[index];
3964         int result;
3965
3966         result = uds_allocate(VDO_BLOCK_SIZE, char, __func__, &block->outgoing_entries);
3967         if (result != VDO_SUCCESS)
3968                 return result;
3969
3970         result = allocate_vio_components(allocator->depot->vdo, VIO_TYPE_SLAB_SUMMARY,
3971                                          VIO_PRIORITY_METADATA, NULL, 1,
3972                                          block->outgoing_entries, &block->vio);
3973         if (result != VDO_SUCCESS)
3974                 return result;
3975
3976         block->allocator = allocator;
3977         block->entries = &allocator->summary_entries[VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK * index];
3978         block->index = index;
3979         return VDO_SUCCESS;
3980 }
3981
3982 static int __must_check initialize_block_allocator(struct slab_depot *depot,
3983                                                    zone_count_t zone)
3984 {
3985         int result;
3986         block_count_t i;
3987         struct block_allocator *allocator = &depot->allocators[zone];
3988         struct vdo *vdo = depot->vdo;
3989         block_count_t max_free_blocks = depot->slab_config.data_blocks;
3990         unsigned int max_priority = (2 + ilog2(max_free_blocks));
3991
3992         *allocator = (struct block_allocator) {
3993                 .depot = depot,
3994                 .zone_number = zone,
3995                 .thread_id = vdo->thread_config.physical_threads[zone],
3996                 .nonce = vdo->states.vdo.nonce,
3997         };
3998
3999         INIT_LIST_HEAD(&allocator->dirty_slab_journals);
4000         vdo_set_admin_state_code(&allocator->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
4001         result = vdo_register_read_only_listener(vdo, allocator,
4002                                                  notify_block_allocator_of_read_only_mode,
4003                                                  allocator->thread_id);
4004         if (result != VDO_SUCCESS)
4005                 return result;
4006
4007         vdo_initialize_completion(&allocator->completion, vdo, VDO_BLOCK_ALLOCATOR_COMPLETION);
4008         result = make_vio_pool(vdo, BLOCK_ALLOCATOR_VIO_POOL_SIZE, allocator->thread_id,
4009                                VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
4010                                allocator, &allocator->vio_pool);
4011         if (result != VDO_SUCCESS)
4012                 return result;
4013
4014         result = initialize_slab_scrubber(allocator);
4015         if (result != VDO_SUCCESS)
4016                 return result;
4017
4018         result = vdo_make_priority_table(max_priority, &allocator->prioritized_slabs);
4019         if (result != VDO_SUCCESS)
4020                 return result;
4021
4022         result = uds_allocate(VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE,
4023                               struct slab_summary_block, __func__,
4024                               &allocator->summary_blocks);
4025         if (result != VDO_SUCCESS)
4026                 return result;
4027
4028         vdo_set_admin_state_code(&allocator->summary_state,
4029                                  VDO_ADMIN_STATE_NORMAL_OPERATION);
4030         allocator->summary_entries = depot->summary_entries + (MAX_VDO_SLABS * zone);
4031
4032         /* Initialize each summary block. */
4033         for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4034                 result = initialize_slab_summary_block(allocator, i);
4035                 if (result != VDO_SUCCESS)
4036                         return result;
4037         }
4038
4039         /*
4040          * Performing well atop thin provisioned storage requires either that VDO discards freed
4041          * blocks, or that the block allocator try to use slabs that already have allocated blocks
4042          * in preference to slabs that have never been opened. For reasons we have not been able to
4043          * fully understand, some SSD machines have been have been very sensitive (50% reduction in
4044          * test throughput) to very slight differences in the timing and locality of block
4045          * allocation. Assigning a low priority to unopened slabs (max_priority/2, say) would be
4046          * ideal for the story, but anything less than a very high threshold (max_priority - 1)
4047          * hurts on these machines.
4048          *
4049          * This sets the free block threshold for preferring to open an unopened slab to the binary
4050          * floor of 3/4ths the total number of data blocks in a slab, which will generally evaluate
4051          * to about half the slab size.
4052          */
4053         allocator->unopened_slab_priority = (1 + ilog2((max_free_blocks * 3) / 4));
4054
4055         return VDO_SUCCESS;
4056 }
4057
4058 static int allocate_components(struct slab_depot *depot,
4059                                struct partition *summary_partition)
4060 {
4061         int result;
4062         zone_count_t zone;
4063         slab_count_t slab_count;
4064         u8 hint;
4065         u32 i;
4066         const struct thread_config *thread_config = &depot->vdo->thread_config;
4067
4068         result = vdo_make_action_manager(depot->zone_count, get_allocator_thread_id,
4069                                          thread_config->journal_thread, depot,
4070                                          schedule_tail_block_commit,
4071                                          depot->vdo, &depot->action_manager);
4072         if (result != VDO_SUCCESS)
4073                 return result;
4074
4075         depot->origin = depot->first_block;
4076
4077         /* block size must be a multiple of entry size */
4078         BUILD_BUG_ON((VDO_BLOCK_SIZE % sizeof(struct slab_summary_entry)) != 0);
4079
4080         depot->summary_origin = summary_partition->offset;
4081         depot->hint_shift = vdo_get_slab_summary_hint_shift(depot->slab_size_shift);
4082         result = uds_allocate(MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES,
4083                               struct slab_summary_entry, __func__,
4084                               &depot->summary_entries);
4085         if (result != VDO_SUCCESS)
4086                 return result;
4087
4088
4089         /* Initialize all the entries. */
4090         hint = compute_fullness_hint(depot, depot->slab_config.data_blocks);
4091         for (i = 0; i < MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES; i++) {
4092                 /*
4093                  * This default tail block offset must be reflected in
4094                  * slabJournal.c::read_slab_journal_tail().
4095                  */
4096                 depot->summary_entries[i] = (struct slab_summary_entry) {
4097                         .tail_block_offset = 0,
4098                         .fullness_hint = hint,
4099                         .load_ref_counts = false,
4100                         .is_dirty = false,
4101                 };
4102         }
4103
4104         if (result != VDO_SUCCESS)
4105                 return result;
4106
4107         slab_count = vdo_compute_slab_count(depot->first_block, depot->last_block,
4108                                             depot->slab_size_shift);
4109         if (thread_config->physical_zone_count > slab_count) {
4110                 return uds_log_error_strerror(VDO_BAD_CONFIGURATION,
4111                                               "%u physical zones exceeds slab count %u",
4112                                               thread_config->physical_zone_count,
4113                                               slab_count);
4114         }
4115
4116         /* Initialize the block allocators. */
4117         for (zone = 0; zone < depot->zone_count; zone++) {
4118                 result = initialize_block_allocator(depot, zone);
4119                 if (result != VDO_SUCCESS)
4120                         return result;
4121         }
4122
4123         /* Allocate slabs. */
4124         result = allocate_slabs(depot, slab_count);
4125         if (result != VDO_SUCCESS)
4126                 return result;
4127
4128         /* Use the new slabs. */
4129         for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4130                 struct vdo_slab *slab = depot->new_slabs[i];
4131
4132                 register_slab_with_allocator(slab->allocator, slab);
4133                 WRITE_ONCE(depot->slab_count, depot->slab_count + 1);
4134         }
4135
4136         depot->slabs = depot->new_slabs;
4137         depot->new_slabs = NULL;
4138         depot->new_slab_count = 0;
4139
4140         return VDO_SUCCESS;
4141 }
4142
4143 /**
4144  * vdo_decode_slab_depot() - Make a slab depot and configure it with the state read from the super
4145  *                           block.
4146  * @state: The slab depot state from the super block.
4147  * @vdo: The VDO which will own the depot.
4148  * @summary_partition: The partition which holds the slab summary.
4149  * @depot_ptr: A pointer to hold the depot.
4150  *
4151  * Return: A success or error code.
4152  */
4153 int vdo_decode_slab_depot(struct slab_depot_state_2_0 state, struct vdo *vdo,
4154                           struct partition *summary_partition,
4155                           struct slab_depot **depot_ptr)
4156 {
4157         unsigned int slab_size_shift;
4158         struct slab_depot *depot;
4159         int result;
4160
4161         /*
4162          * Calculate the bit shift for efficiently mapping block numbers to slabs. Using a shift
4163          * requires that the slab size be a power of two.
4164          */
4165         block_count_t slab_size = state.slab_config.slab_blocks;
4166
4167         if (!is_power_of_2(slab_size)) {
4168                 return uds_log_error_strerror(UDS_INVALID_ARGUMENT,
4169                                               "slab size must be a power of two");
4170         }
4171         slab_size_shift = ilog2(slab_size);
4172
4173         result = uds_allocate_extended(struct slab_depot,
4174                                        vdo->thread_config.physical_zone_count,
4175                                        struct block_allocator, __func__, &depot);
4176         if (result != VDO_SUCCESS)
4177                 return result;
4178
4179         depot->vdo = vdo;
4180         depot->old_zone_count = state.zone_count;
4181         depot->zone_count = vdo->thread_config.physical_zone_count;
4182         depot->slab_config = state.slab_config;
4183         depot->first_block = state.first_block;
4184         depot->last_block = state.last_block;
4185         depot->slab_size_shift = slab_size_shift;
4186
4187         result = allocate_components(depot, summary_partition);
4188         if (result != VDO_SUCCESS) {
4189                 vdo_free_slab_depot(depot);
4190                 return result;
4191         }
4192
4193         *depot_ptr = depot;
4194         return VDO_SUCCESS;
4195 }
4196
4197 static void uninitialize_allocator_summary(struct block_allocator *allocator)
4198 {
4199         block_count_t i;
4200
4201         if (allocator->summary_blocks == NULL)
4202                 return;
4203
4204         for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4205                 free_vio_components(&allocator->summary_blocks[i].vio);
4206                 uds_free(uds_forget(allocator->summary_blocks[i].outgoing_entries));
4207         }
4208
4209         uds_free(uds_forget(allocator->summary_blocks));
4210 }
4211
4212 /**
4213  * vdo_free_slab_depot() - Destroy a slab depot.
4214  * @depot: The depot to destroy.
4215  */
4216 void vdo_free_slab_depot(struct slab_depot *depot)
4217 {
4218         zone_count_t zone = 0;
4219
4220         if (depot == NULL)
4221                 return;
4222
4223         vdo_abandon_new_slabs(depot);
4224
4225         for (zone = 0; zone < depot->zone_count; zone++) {
4226                 struct block_allocator *allocator = &depot->allocators[zone];
4227
4228                 if (allocator->eraser != NULL)
4229                         dm_kcopyd_client_destroy(uds_forget(allocator->eraser));
4230
4231                 uninitialize_allocator_summary(allocator);
4232                 uninitialize_scrubber_vio(&allocator->scrubber);
4233                 free_vio_pool(uds_forget(allocator->vio_pool));
4234                 vdo_free_priority_table(uds_forget(allocator->prioritized_slabs));
4235         }
4236
4237         if (depot->slabs != NULL) {
4238                 slab_count_t i;
4239
4240                 for (i = 0; i < depot->slab_count; i++)
4241                         free_slab(uds_forget(depot->slabs[i]));
4242         }
4243
4244         uds_free(uds_forget(depot->slabs));
4245         uds_free(uds_forget(depot->action_manager));
4246         uds_free(uds_forget(depot->summary_entries));
4247         uds_free(depot);
4248 }
4249
4250 /**
4251  * vdo_record_slab_depot() - Record the state of a slab depot for encoding into the super block.
4252  * @depot: The depot to encode.
4253  *
4254  * Return: The depot state.
4255  */
4256 struct slab_depot_state_2_0 vdo_record_slab_depot(const struct slab_depot *depot)
4257 {
4258         /*
4259          * If this depot is currently using 0 zones, it must have been synchronously loaded by a
4260          * tool and is now being saved. We did not load and combine the slab summary, so we still
4261          * need to do that next time we load with the old zone count rather than 0.
4262          */
4263         struct slab_depot_state_2_0 state;
4264         zone_count_t zones_to_record = depot->zone_count;
4265
4266         if (depot->zone_count == 0)
4267                 zones_to_record = depot->old_zone_count;
4268
4269         state = (struct slab_depot_state_2_0) {
4270                 .slab_config = depot->slab_config,
4271                 .first_block = depot->first_block,
4272                 .last_block = depot->last_block,
4273                 .zone_count = zones_to_record,
4274         };
4275
4276         return state;
4277 }
4278
4279 /**
4280  * vdo_allocate_reference_counters() - Allocate the reference counters for all slabs in the depot.
4281  *
4282  * Context: This method may be called only before entering normal operation from the load thread.
4283  *
4284  * Return: VDO_SUCCESS or an error.
4285  */
4286 int vdo_allocate_reference_counters(struct slab_depot *depot)
4287 {
4288         struct slab_iterator iterator =
4289                 get_depot_slab_iterator(depot, depot->slab_count - 1, 0, 1);
4290
4291         while (iterator.next != NULL) {
4292                 int result = allocate_slab_counters(next_slab(&iterator));
4293
4294                 if (result != VDO_SUCCESS)
4295                         return result;
4296         }
4297
4298         return VDO_SUCCESS;
4299 }
4300
4301 /**
4302  * get_slab_number() - Get the number of the slab that contains a specified block.
4303  * @depot: The slab depot.
4304  * @pbn: The physical block number.
4305  * @slab_number_ptr: A pointer to hold the slab number.
4306  *
4307  * Return: VDO_SUCCESS or an error.
4308  */
4309 static int __must_check get_slab_number(const struct slab_depot *depot,
4310                                         physical_block_number_t pbn,
4311                                         slab_count_t *slab_number_ptr)
4312 {
4313         slab_count_t slab_number;
4314
4315         if (pbn < depot->first_block)
4316                 return VDO_OUT_OF_RANGE;
4317
4318         slab_number = (pbn - depot->first_block) >> depot->slab_size_shift;
4319         if (slab_number >= depot->slab_count)
4320                 return VDO_OUT_OF_RANGE;
4321
4322         *slab_number_ptr = slab_number;
4323         return VDO_SUCCESS;
4324 }
4325
4326 /**
4327  * vdo_get_slab() - Get the slab object for the slab that contains a specified block.
4328  * @depot: The slab depot.
4329  * @pbn: The physical block number.
4330  *
4331  * Will put the VDO in read-only mode if the PBN is not a valid data block nor the zero block.
4332  *
4333  * Return: The slab containing the block, or NULL if the block number is the zero block or
4334  * otherwise out of range.
4335  */
4336 struct vdo_slab *vdo_get_slab(const struct slab_depot *depot,
4337                               physical_block_number_t pbn)
4338 {
4339         slab_count_t slab_number;
4340         int result;
4341
4342         if (pbn == VDO_ZERO_BLOCK)
4343                 return NULL;
4344
4345         result = get_slab_number(depot, pbn, &slab_number);
4346         if (result != VDO_SUCCESS) {
4347                 vdo_enter_read_only_mode(depot->vdo, result);
4348                 return NULL;
4349         }
4350
4351         return depot->slabs[slab_number];
4352 }
4353
4354 /**
4355  * vdo_get_increment_limit() - Determine how many new references a block can acquire.
4356  * @depot: The slab depot.
4357  * @pbn: The physical block number that is being queried.
4358  *
4359  * Context: This method must be called from the physical zone thread of the PBN.
4360  *
4361  * Return: The number of available references.
4362  */
4363 u8 vdo_get_increment_limit(struct slab_depot *depot, physical_block_number_t pbn)
4364 {
4365         struct vdo_slab *slab = vdo_get_slab(depot, pbn);
4366         vdo_refcount_t *counter_ptr = NULL;
4367         int result;
4368
4369         if ((slab == NULL) || (slab->status != VDO_SLAB_REBUILT))
4370                 return 0;
4371
4372         result = get_reference_counter(slab, pbn, &counter_ptr);
4373         if (result != VDO_SUCCESS)
4374                 return 0;
4375
4376         if (*counter_ptr == PROVISIONAL_REFERENCE_COUNT)
4377                 return (MAXIMUM_REFERENCE_COUNT - 1);
4378
4379         return (MAXIMUM_REFERENCE_COUNT - *counter_ptr);
4380 }
4381
4382 /**
4383  * vdo_is_physical_data_block() - Determine whether the given PBN refers to a data block.
4384  * @depot: The depot.
4385  * @pbn: The physical block number to ask about.
4386  *
4387  * Return: True if the PBN corresponds to a data block.
4388  */
4389 bool vdo_is_physical_data_block(const struct slab_depot *depot,
4390                                 physical_block_number_t pbn)
4391 {
4392         slab_count_t slab_number;
4393         slab_block_number sbn;
4394
4395         return ((pbn == VDO_ZERO_BLOCK) ||
4396                 ((get_slab_number(depot, pbn, &slab_number) == VDO_SUCCESS) &&
4397                  (slab_block_number_from_pbn(depot->slabs[slab_number], pbn, &sbn) ==
4398                   VDO_SUCCESS)));
4399 }
4400
4401 /**
4402  * vdo_get_slab_depot_allocated_blocks() - Get the total number of data blocks allocated across all
4403  * the slabs in the depot.
4404  * @depot: The slab depot.
4405  *
4406  * This is the total number of blocks with a non-zero reference count.
4407  *
4408  * Context: This may be called from any thread.
4409  *
4410  * Return: The total number of blocks with a non-zero reference count.
4411  */
4412 block_count_t vdo_get_slab_depot_allocated_blocks(const struct slab_depot *depot)
4413 {
4414         block_count_t total = 0;
4415         zone_count_t zone;
4416
4417         for (zone = 0; zone < depot->zone_count; zone++) {
4418                 /* The allocators are responsible for thread safety. */
4419                 total += READ_ONCE(depot->allocators[zone].allocated_blocks);
4420         }
4421
4422         return total;
4423 }
4424
4425 /**
4426  * vdo_get_slab_depot_data_blocks() - Get the total number of data blocks in all the slabs in the
4427  *                                    depot.
4428  * @depot: The slab depot.
4429  *
4430  * Context: This may be called from any thread.
4431  *
4432  * Return: The total number of data blocks in all slabs.
4433  */
4434 block_count_t vdo_get_slab_depot_data_blocks(const struct slab_depot *depot)
4435 {
4436         return (READ_ONCE(depot->slab_count) * depot->slab_config.data_blocks);
4437 }
4438
4439 /**
4440  * finish_combining_zones() - Clean up after saving out the combined slab summary.
4441  * @completion: The vio which was used to write the summary data.
4442  */
4443 static void finish_combining_zones(struct vdo_completion *completion)
4444 {
4445         int result = completion->result;
4446         struct vdo_completion *parent = completion->parent;
4447
4448         free_vio(as_vio(uds_forget(completion)));
4449         vdo_fail_completion(parent, result);
4450 }
4451
4452 static void handle_combining_error(struct vdo_completion *completion)
4453 {
4454         vio_record_metadata_io_error(as_vio(completion));
4455         finish_combining_zones(completion);
4456 }
4457
4458 static void write_summary_endio(struct bio *bio)
4459 {
4460         struct vio *vio = bio->bi_private;
4461         struct vdo *vdo = vio->completion.vdo;
4462
4463         continue_vio_after_io(vio, finish_combining_zones,
4464                               vdo->thread_config.admin_thread);
4465 }
4466
4467 /**
4468  * combine_summaries() - Treating the current entries buffer as the on-disk value of all zones,
4469  *                       update every zone to the correct values for every slab.
4470  * @depot: The depot whose summary entries should be combined.
4471  */
4472 static void combine_summaries(struct slab_depot *depot)
4473 {
4474         /*
4475          * Combine all the old summary data into the portion of the buffer corresponding to the
4476          * first zone.
4477          */
4478         zone_count_t zone = 0;
4479         struct slab_summary_entry *entries = depot->summary_entries;
4480
4481         if (depot->old_zone_count > 1) {
4482                 slab_count_t entry_number;
4483
4484                 for (entry_number = 0; entry_number < MAX_VDO_SLABS; entry_number++) {
4485                         if (zone != 0) {
4486                                 memcpy(entries + entry_number,
4487                                        entries + (zone * MAX_VDO_SLABS) + entry_number,
4488                                        sizeof(struct slab_summary_entry));
4489                         }
4490
4491                         zone++;
4492                         if (zone == depot->old_zone_count)
4493                                 zone = 0;
4494                 }
4495         }
4496
4497         /* Copy the combined data to each zones's region of the buffer. */
4498         for (zone = 1; zone < MAX_VDO_PHYSICAL_ZONES; zone++) {
4499                 memcpy(entries + (zone * MAX_VDO_SLABS), entries,
4500                        MAX_VDO_SLABS * sizeof(struct slab_summary_entry));
4501         }
4502 }
4503
4504 /**
4505  * finish_loading_summary() - Finish loading slab summary data.
4506  * @completion: The vio which was used to read the summary data.
4507  *
4508  * Combines the slab summary data from all the previously written zones and copies the combined
4509  * summary to each partition's data region. Then writes the combined summary back out to disk. This
4510  * callback is registered in load_summary_endio().
4511  */
4512 static void finish_loading_summary(struct vdo_completion *completion)
4513 {
4514         struct slab_depot *depot = completion->vdo->depot;
4515
4516         /* Combine the summary from each zone so each zone is correct for all slabs. */
4517         combine_summaries(depot);
4518
4519         /* Write the combined summary back out. */
4520         vdo_submit_metadata_vio(as_vio(completion), depot->summary_origin,
4521                                 write_summary_endio, handle_combining_error,
4522                                 REQ_OP_WRITE);
4523 }
4524
4525 static void load_summary_endio(struct bio *bio)
4526 {
4527         struct vio *vio = bio->bi_private;
4528         struct vdo *vdo = vio->completion.vdo;
4529
4530         continue_vio_after_io(vio, finish_loading_summary,
4531                               vdo->thread_config.admin_thread);
4532 }
4533
4534 /**
4535  * load_slab_summary() - The preamble of a load operation.
4536  *
4537  * Implements vdo_action_preamble_fn.
4538  */
4539 static void load_slab_summary(void *context, struct vdo_completion *parent)
4540 {
4541         int result;
4542         struct vio *vio;
4543         struct slab_depot *depot = context;
4544         const struct admin_state_code *operation =
4545                 vdo_get_current_manager_operation(depot->action_manager);
4546
4547         result = create_multi_block_metadata_vio(depot->vdo, VIO_TYPE_SLAB_SUMMARY,
4548                                                  VIO_PRIORITY_METADATA, parent,
4549                                                  VDO_SLAB_SUMMARY_BLOCKS,
4550                                                  (char *) depot->summary_entries, &vio);
4551         if (result != VDO_SUCCESS) {
4552                 vdo_fail_completion(parent, result);
4553                 return;
4554         }
4555
4556         if ((operation == VDO_ADMIN_STATE_FORMATTING) ||
4557             (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD)) {
4558                 finish_loading_summary(&vio->completion);
4559                 return;
4560         }
4561
4562         vdo_submit_metadata_vio(vio, depot->summary_origin, load_summary_endio,
4563                                 handle_combining_error, REQ_OP_READ);
4564 }
4565
4566 /* Implements vdo_zone_action_fn. */
4567 static void load_allocator(void *context, zone_count_t zone_number,
4568                            struct vdo_completion *parent)
4569 {
4570         struct slab_depot *depot = context;
4571
4572         vdo_start_loading(&depot->allocators[zone_number].state,
4573                           vdo_get_current_manager_operation(depot->action_manager),
4574                           parent, initiate_load);
4575 }
4576
4577 /**
4578  * vdo_load_slab_depot() - Asynchronously load any slab depot state that isn't included in the
4579  *                         super_block component.
4580  * @depot: The depot to load.
4581  * @operation: The type of load to perform.
4582  * @parent: The completion to notify when the load is complete.
4583  * @context: Additional context for the load operation; may be NULL.
4584  *
4585  * This method may be called only before entering normal operation from the load thread.
4586  */
4587 void vdo_load_slab_depot(struct slab_depot *depot,
4588                          const struct admin_state_code *operation,
4589                          struct vdo_completion *parent, void *context)
4590 {
4591         if (!vdo_assert_load_operation(operation, parent))
4592                 return;
4593
4594         vdo_schedule_operation_with_context(depot->action_manager, operation,
4595                                             load_slab_summary, load_allocator,
4596                                             NULL, context, parent);
4597 }
4598
4599 /* Implements vdo_zone_action_fn. */
4600 static void prepare_to_allocate(void *context, zone_count_t zone_number,
4601                                 struct vdo_completion *parent)
4602 {
4603         struct slab_depot *depot = context;
4604         struct block_allocator *allocator = &depot->allocators[zone_number];
4605         int result;
4606
4607         result = vdo_prepare_slabs_for_allocation(allocator);
4608         if (result != VDO_SUCCESS) {
4609                 vdo_fail_completion(parent, result);
4610                 return;
4611         }
4612
4613         scrub_slabs(allocator, parent);
4614 }
4615
4616 /**
4617  * vdo_prepare_slab_depot_to_allocate() - Prepare the slab depot to come online and start
4618  *                                        allocating blocks.
4619  * @depot: The depot to prepare.
4620  * @load_type: The load type.
4621  * @parent: The completion to notify when the operation is complete.
4622  *
4623  * This method may be called only before entering normal operation from the load thread. It must be
4624  * called before allocation may proceed.
4625  */
4626 void vdo_prepare_slab_depot_to_allocate(struct slab_depot *depot,
4627                                         enum slab_depot_load_type load_type,
4628                                         struct vdo_completion *parent)
4629 {
4630         depot->load_type = load_type;
4631         atomic_set(&depot->zones_to_scrub, depot->zone_count);
4632         vdo_schedule_action(depot->action_manager, NULL,
4633                             prepare_to_allocate, NULL, parent);
4634 }
4635
4636 /**
4637  * vdo_update_slab_depot_size() - Update the slab depot to reflect its new size in memory.
4638  * @depot: The depot to update.
4639  *
4640  * This size is saved to disk as part of the super block.
4641  */
4642 void vdo_update_slab_depot_size(struct slab_depot *depot)
4643 {
4644         depot->last_block = depot->new_last_block;
4645 }
4646
4647 /**
4648  * vdo_prepare_to_grow_slab_depot() - Allocate new memory needed for a resize of a slab depot to
4649  *                                    the given size.
4650  * @depot: The depot to prepare to resize.
4651  * @partition: The new depot partition
4652  *
4653  * Return: VDO_SUCCESS or an error.
4654  */
4655 int vdo_prepare_to_grow_slab_depot(struct slab_depot *depot,
4656                                    const struct partition *partition)
4657 {
4658         struct slab_depot_state_2_0 new_state;
4659         int result;
4660         slab_count_t new_slab_count;
4661
4662         if ((partition->count >> depot->slab_size_shift) <= depot->slab_count)
4663                 return VDO_INCREMENT_TOO_SMALL;
4664
4665         /* Generate the depot configuration for the new block count. */
4666         ASSERT_LOG_ONLY(depot->first_block == partition->offset,
4667                         "New slab depot partition doesn't change origin");
4668         result = vdo_configure_slab_depot(partition, depot->slab_config,
4669                                           depot->zone_count, &new_state);
4670         if (result != VDO_SUCCESS)
4671                 return result;
4672
4673         new_slab_count = vdo_compute_slab_count(depot->first_block,
4674                                                 new_state.last_block,
4675                                                 depot->slab_size_shift);
4676         if (new_slab_count <= depot->slab_count)
4677                 return uds_log_error_strerror(VDO_INCREMENT_TOO_SMALL,
4678                                               "Depot can only grow");
4679         if (new_slab_count == depot->new_slab_count) {
4680                 /* Check it out, we've already got all the new slabs allocated! */
4681                 return VDO_SUCCESS;
4682         }
4683
4684         vdo_abandon_new_slabs(depot);
4685         result = allocate_slabs(depot, new_slab_count);
4686         if (result != VDO_SUCCESS) {
4687                 vdo_abandon_new_slabs(depot);
4688                 return result;
4689         }
4690
4691         depot->new_size = partition->count;
4692         depot->old_last_block = depot->last_block;
4693         depot->new_last_block = new_state.last_block;
4694
4695         return VDO_SUCCESS;
4696 }
4697
4698 /**
4699  * finish_registration() - Finish registering new slabs now that all of the allocators have
4700  *                         received their new slabs.
4701  *
4702  * Implements vdo_action_conclusion_fn.
4703  */
4704 static int finish_registration(void *context)
4705 {
4706         struct slab_depot *depot = context;
4707
4708         WRITE_ONCE(depot->slab_count, depot->new_slab_count);
4709         uds_free(depot->slabs);
4710         depot->slabs = depot->new_slabs;
4711         depot->new_slabs = NULL;
4712         depot->new_slab_count = 0;
4713         return VDO_SUCCESS;
4714 }
4715
4716 /* Implements vdo_zone_action_fn. */
4717 static void register_new_slabs(void *context, zone_count_t zone_number,
4718                                struct vdo_completion *parent)
4719 {
4720         struct slab_depot *depot = context;
4721         struct block_allocator *allocator = &depot->allocators[zone_number];
4722         slab_count_t i;
4723
4724         for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4725                 struct vdo_slab *slab = depot->new_slabs[i];
4726
4727                 if (slab->allocator == allocator)
4728                         register_slab_with_allocator(allocator, slab);
4729         }
4730
4731         vdo_finish_completion(parent);
4732 }
4733
4734 /**
4735  * vdo_use_new_slabs() - Use the new slabs allocated for resize.
4736  * @depot: The depot.
4737  * @parent: The object to notify when complete.
4738  */
4739 void vdo_use_new_slabs(struct slab_depot *depot, struct vdo_completion *parent)
4740 {
4741         ASSERT_LOG_ONLY(depot->new_slabs != NULL, "Must have new slabs to use");
4742         vdo_schedule_operation(depot->action_manager,
4743                                VDO_ADMIN_STATE_SUSPENDED_OPERATION,
4744                                NULL, register_new_slabs,
4745                                finish_registration, parent);
4746 }
4747
4748 /**
4749  * stop_scrubbing() - Tell the scrubber to stop scrubbing after it finishes the slab it is
4750  *                    currently working on.
4751  * @scrubber: The scrubber to stop.
4752  * @parent: The completion to notify when scrubbing has stopped.
4753  */
4754 static void stop_scrubbing(struct block_allocator *allocator)
4755 {
4756         struct slab_scrubber *scrubber = &allocator->scrubber;
4757
4758         if (vdo_is_state_quiescent(&scrubber->admin_state)) {
4759                 vdo_finish_completion(&allocator->completion);
4760         } else {
4761                 vdo_start_draining(&scrubber->admin_state,
4762                                    VDO_ADMIN_STATE_SUSPENDING,
4763                                    &allocator->completion, NULL);
4764         }
4765 }
4766
4767 /* Implements vdo_admin_initiator_fn. */
4768 static void initiate_summary_drain(struct admin_state *state)
4769 {
4770         check_summary_drain_complete(container_of(state, struct block_allocator,
4771                                                   summary_state));
4772 }
4773
4774 static void do_drain_step(struct vdo_completion *completion)
4775 {
4776         struct block_allocator *allocator = vdo_as_block_allocator(completion);
4777
4778         vdo_prepare_completion_for_requeue(&allocator->completion, do_drain_step,
4779                                            handle_operation_error, allocator->thread_id,
4780                                            NULL);
4781         switch (++allocator->drain_step) {
4782         case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
4783                 stop_scrubbing(allocator);
4784                 return;
4785
4786         case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
4787                 apply_to_slabs(allocator, do_drain_step);
4788                 return;
4789
4790         case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
4791                 vdo_start_draining(&allocator->summary_state,
4792                                    vdo_get_admin_state_code(&allocator->state),
4793                                    completion, initiate_summary_drain);
4794                 return;
4795
4796         case VDO_DRAIN_ALLOCATOR_STEP_FINISHED:
4797                 ASSERT_LOG_ONLY(!is_vio_pool_busy(allocator->vio_pool),
4798                                 "vio pool not busy");
4799                 vdo_finish_draining_with_result(&allocator->state, completion->result);
4800                 return;
4801
4802         default:
4803                 vdo_finish_draining_with_result(&allocator->state, UDS_BAD_STATE);
4804         }
4805 }
4806
4807 /* Implements vdo_admin_initiator_fn. */
4808 static void initiate_drain(struct admin_state *state)
4809 {
4810         struct block_allocator *allocator =
4811                 container_of(state, struct block_allocator, state);
4812
4813         allocator->drain_step = VDO_DRAIN_ALLOCATOR_START;
4814         do_drain_step(&allocator->completion);
4815 }
4816
4817 /*
4818  * Drain all allocator I/O. Depending upon the type of drain, some or all dirty metadata may be
4819  * written to disk. The type of drain will be determined from the state of the allocator's depot.
4820  *
4821  * Implements vdo_zone_action_fn.
4822  */
4823 static void drain_allocator(void *context, zone_count_t zone_number,
4824                             struct vdo_completion *parent)
4825 {
4826         struct slab_depot *depot = context;
4827
4828         vdo_start_draining(&depot->allocators[zone_number].state,
4829                            vdo_get_current_manager_operation(depot->action_manager),
4830                            parent, initiate_drain);
4831 }
4832
4833 /**
4834  * vdo_drain_slab_depot() - Drain all slab depot I/O.
4835  * @depot: The depot to drain.
4836  * @operation: The drain operation (flush, rebuild, suspend, or save).
4837  * @parent: The completion to finish when the drain is complete.
4838  *
4839  * If saving, or flushing, all dirty depot metadata will be written out. If saving or suspending,
4840  * the depot will be left in a suspended state.
4841  */
4842 void vdo_drain_slab_depot(struct slab_depot *depot,
4843                           const struct admin_state_code *operation,
4844                           struct vdo_completion *parent)
4845 {
4846         vdo_schedule_operation(depot->action_manager, operation,
4847                                NULL, drain_allocator, NULL, parent);
4848 }
4849
4850 /**
4851  * resume_scrubbing() - Tell the scrubber to resume scrubbing if it has been stopped.
4852  * @allocator: The allocator being resumed.
4853  */
4854 static void resume_scrubbing(struct block_allocator *allocator)
4855 {
4856         int result;
4857         struct slab_scrubber *scrubber = &allocator->scrubber;
4858
4859         if (!has_slabs_to_scrub(scrubber)) {
4860                 vdo_finish_completion(&allocator->completion);
4861                 return;
4862         }
4863
4864         result = vdo_resume_if_quiescent(&scrubber->admin_state);
4865         if (result != VDO_SUCCESS) {
4866                 vdo_fail_completion(&allocator->completion, result);
4867                 return;
4868         }
4869
4870         scrub_next_slab(scrubber);
4871         vdo_finish_completion(&allocator->completion);
4872 }
4873
4874 static void do_resume_step(struct vdo_completion *completion)
4875 {
4876         struct block_allocator *allocator = vdo_as_block_allocator(completion);
4877
4878         vdo_prepare_completion_for_requeue(&allocator->completion, do_resume_step,
4879                                            handle_operation_error,
4880                                            allocator->thread_id, NULL);
4881         switch (--allocator->drain_step) {
4882         case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
4883                 vdo_fail_completion(completion,
4884                                     vdo_resume_if_quiescent(&allocator->summary_state));
4885                 return;
4886
4887         case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
4888                 apply_to_slabs(allocator, do_resume_step);
4889                 return;
4890
4891         case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
4892                 resume_scrubbing(allocator);
4893                 return;
4894
4895         case VDO_DRAIN_ALLOCATOR_START:
4896                 vdo_finish_resuming_with_result(&allocator->state, completion->result);
4897                 return;
4898
4899         default:
4900                 vdo_finish_resuming_with_result(&allocator->state, UDS_BAD_STATE);
4901         }
4902 }
4903
4904 /* Implements vdo_admin_initiator_fn. */
4905 static void initiate_resume(struct admin_state *state)
4906 {
4907         struct block_allocator *allocator =
4908                 container_of(state, struct block_allocator, state);
4909
4910         allocator->drain_step = VDO_DRAIN_ALLOCATOR_STEP_FINISHED;
4911         do_resume_step(&allocator->completion);
4912 }
4913
4914 /* Implements vdo_zone_action_fn. */
4915 static void resume_allocator(void *context, zone_count_t zone_number,
4916                              struct vdo_completion *parent)
4917 {
4918         struct slab_depot *depot = context;
4919
4920         vdo_start_resuming(&depot->allocators[zone_number].state,
4921                            vdo_get_current_manager_operation(depot->action_manager),
4922                            parent, initiate_resume);
4923 }
4924
4925 /**
4926  * vdo_resume_slab_depot() - Resume a suspended slab depot.
4927  * @depot: The depot to resume.
4928  * @parent: The completion to finish when the depot has resumed.
4929  */
4930 void vdo_resume_slab_depot(struct slab_depot *depot, struct vdo_completion *parent)
4931 {
4932         if (vdo_is_read_only(depot->vdo)) {
4933                 vdo_continue_completion(parent, VDO_READ_ONLY);
4934                 return;
4935         }
4936
4937         vdo_schedule_operation(depot->action_manager, VDO_ADMIN_STATE_RESUMING,
4938                                NULL, resume_allocator, NULL, parent);
4939 }
4940
4941 /**
4942  * vdo_commit_oldest_slab_journal_tail_blocks() - Commit all dirty tail blocks which are locking a
4943  *                                                given recovery journal block.
4944  * @depot: The depot.
4945  * @recovery_block_number: The sequence number of the recovery journal block whose locks should be
4946  *                         released.
4947  *
4948  * Context: This method must be called from the journal zone thread.
4949  */
4950 void vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot *depot,
4951                                                 sequence_number_t recovery_block_number)
4952 {
4953         if (depot == NULL)
4954                 return;
4955
4956         depot->new_release_request = recovery_block_number;
4957         vdo_schedule_default_action(depot->action_manager);
4958 }
4959
4960 /* Implements vdo_zone_action_fn. */
4961 static void scrub_all_unrecovered_slabs(void *context, zone_count_t zone_number,
4962                                         struct vdo_completion *parent)
4963 {
4964         struct slab_depot *depot = context;
4965
4966         scrub_slabs(&depot->allocators[zone_number], NULL);
4967         vdo_launch_completion(parent);
4968 }
4969
4970 /**
4971  * vdo_scrub_all_unrecovered_slabs() - Scrub all unrecovered slabs.
4972  * @depot: The depot to scrub.
4973  * @parent: The object to notify when scrubbing has been launched for all zones.
4974  */
4975 void vdo_scrub_all_unrecovered_slabs(struct slab_depot *depot,
4976                                      struct vdo_completion *parent)
4977 {
4978         vdo_schedule_action(depot->action_manager, NULL,
4979                             scrub_all_unrecovered_slabs,
4980                             NULL, parent);
4981 }
4982
4983 /**
4984  * get_block_allocator_statistics() - Get the total of the statistics from all the block allocators
4985  *                                    in the depot.
4986  * @depot: The slab depot.
4987  *
4988  * Return: The statistics from all block allocators in the depot.
4989  */
4990 static struct block_allocator_statistics __must_check
4991 get_block_allocator_statistics(const struct slab_depot *depot)
4992 {
4993         struct block_allocator_statistics totals;
4994         zone_count_t zone;
4995
4996         memset(&totals, 0, sizeof(totals));
4997
4998         for (zone = 0; zone < depot->zone_count; zone++) {
4999                 const struct block_allocator *allocator = &depot->allocators[zone];
5000                 const struct block_allocator_statistics *stats = &allocator->statistics;
5001
5002                 totals.slab_count += allocator->slab_count;
5003                 totals.slabs_opened += READ_ONCE(stats->slabs_opened);
5004                 totals.slabs_reopened += READ_ONCE(stats->slabs_reopened);
5005         }
5006
5007         return totals;
5008 }
5009
5010 /**
5011  * get_ref_counts_statistics() - Get the cumulative ref_counts statistics for the depot.
5012  * @depot: The slab depot.
5013  *
5014  * Return: The cumulative statistics for all ref_counts in the depot.
5015  */
5016 static struct ref_counts_statistics __must_check
5017 get_ref_counts_statistics(const struct slab_depot *depot)
5018 {
5019         struct ref_counts_statistics totals;
5020         zone_count_t zone;
5021
5022         memset(&totals, 0, sizeof(totals));
5023
5024         for (zone = 0; zone < depot->zone_count; zone++) {
5025                 totals.blocks_written +=
5026                         READ_ONCE(depot->allocators[zone].ref_counts_statistics.blocks_written);
5027         }
5028
5029         return totals;
5030 }
5031
5032 /**
5033  * get_depot_slab_journal_statistics() - Get the aggregated slab journal statistics for the depot.
5034  * @depot: The slab depot.
5035  *
5036  * Return: The aggregated statistics for all slab journals in the depot.
5037  */
5038 static struct slab_journal_statistics __must_check
5039 get_slab_journal_statistics(const struct slab_depot *depot)
5040 {
5041         struct slab_journal_statistics totals;
5042         zone_count_t zone;
5043
5044         memset(&totals, 0, sizeof(totals));
5045
5046         for (zone = 0; zone < depot->zone_count; zone++) {
5047                 const struct slab_journal_statistics *stats =
5048                         &depot->allocators[zone].slab_journal_statistics;
5049
5050                 totals.disk_full_count += READ_ONCE(stats->disk_full_count);
5051                 totals.flush_count += READ_ONCE(stats->flush_count);
5052                 totals.blocked_count += READ_ONCE(stats->blocked_count);
5053                 totals.blocks_written += READ_ONCE(stats->blocks_written);
5054                 totals.tail_busy_count += READ_ONCE(stats->tail_busy_count);
5055         }
5056
5057         return totals;
5058 }
5059
5060 /**
5061  * vdo_get_slab_depot_statistics() - Get all the vdo_statistics fields that are properties of the
5062  *                                   slab depot.
5063  * @depot: The slab depot.
5064  * @stats: The vdo statistics structure to partially fill.
5065  */
5066 void vdo_get_slab_depot_statistics(const struct slab_depot *depot,
5067                                    struct vdo_statistics *stats)
5068 {
5069         slab_count_t slab_count = READ_ONCE(depot->slab_count);
5070         slab_count_t unrecovered = 0;
5071         zone_count_t zone;
5072
5073         for (zone = 0; zone < depot->zone_count; zone++) {
5074                 /* The allocators are responsible for thread safety. */
5075                 unrecovered += READ_ONCE(depot->allocators[zone].scrubber.slab_count);
5076         }
5077
5078         stats->recovery_percentage = (slab_count - unrecovered) * 100 / slab_count;
5079         stats->allocator = get_block_allocator_statistics(depot);
5080         stats->ref_counts = get_ref_counts_statistics(depot);
5081         stats->slab_journal = get_slab_journal_statistics(depot);
5082         stats->slab_summary = (struct slab_summary_statistics) {
5083                 .blocks_written = atomic64_read(&depot->summary_statistics.blocks_written),
5084         };
5085 }
5086
5087 /**
5088  * vdo_dump_slab_depot() - Dump the slab depot, in a thread-unsafe fashion.
5089  * @depot: The slab depot.
5090  */
5091 void vdo_dump_slab_depot(const struct slab_depot *depot)
5092 {
5093         uds_log_info("vdo slab depot");
5094         uds_log_info("  zone_count=%u old_zone_count=%u slabCount=%u active_release_request=%llu new_release_request=%llu",
5095                      (unsigned int) depot->zone_count,
5096                      (unsigned int) depot->old_zone_count, READ_ONCE(depot->slab_count),
5097                      (unsigned long long) depot->active_release_request,
5098                      (unsigned long long) depot->new_release_request);
5099 }