// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "slab-depot.h"

#include <linux/atomic.h>
#include <linux/bio.h>
#include <linux/err.h>
#include <linux/log2.h>
#include <linux/min_heap.h>
#include <linux/minmax.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"
#include "string-utils.h"

#include "action-manager.h"
#include "admin-state.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "encodings.h"
#include "io-submitter.h"
#include "physical-zone.h"
#include "priority-table.h"
#include "recovery-journal.h"
#include "status-codes.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"

static const u64 BYTES_PER_WORD = sizeof(u64);
static const bool NORMAL_OPERATION = true;

/**
 * get_lock() - Get the lock object for a slab journal block by sequence number.
 * @journal: vdo_slab journal to retrieve from.
 * @sequence_number: Sequence number of the block.
 *
 * Return: The lock object for the given sequence number.
 */
static inline struct journal_lock * __must_check get_lock(struct slab_journal *journal,
							   sequence_number_t sequence_number)
{
	return &journal->locks[sequence_number % journal->size];
}

static bool is_slab_open(struct vdo_slab *slab)
{
	return (!vdo_is_state_quiescing(&slab->state) &&
		!vdo_is_state_quiescent(&slab->state));
}

/**
 * must_make_entries_to_flush() - Check whether there are entry waiters which should delay a flush.
 * @journal: The journal to check.
 *
 * Return: true if there are entry waiters and the slab is not rebuilding.
 */
static inline bool __must_check must_make_entries_to_flush(struct slab_journal *journal)
{
	return ((journal->slab->status != VDO_SLAB_REBUILDING) &&
		vdo_waitq_has_waiters(&journal->entry_waiters));
}

/**
 * is_reaping() - Check whether a reap is currently in progress.
 * @journal: The journal which may be reaping.
 *
 * Return: true if the journal is reaping.
 */
static inline bool __must_check is_reaping(struct slab_journal *journal)
{
	return (journal->head != journal->unreapable);
}

/**
 * initialize_tail_block() - Initialize tail block as a new block.
 * @journal: The journal whose tail block is being initialized.
 */
static void initialize_tail_block(struct slab_journal *journal)
{
	struct slab_journal_block_header *header = &journal->tail_header;

	header->sequence_number = journal->tail;
	header->entry_count = 0;
	header->has_block_map_increments = false;
}

/**
 * initialize_journal_state() - Set all journal fields appropriately to start journaling.
 * @journal: The journal to be reset, based on its tail sequence number.
 */
static void initialize_journal_state(struct slab_journal *journal)
{
	journal->unreapable = journal->head;
	journal->reap_lock = get_lock(journal, journal->unreapable);
	journal->next_commit = journal->tail;
	journal->summarized = journal->last_summarized = journal->tail;
	initialize_tail_block(journal);
}

/**
 * block_is_full() - Check whether a journal block is full.
 * @journal: The slab journal for the block.
 *
 * Return: true if the tail block is full.
 */
static bool __must_check block_is_full(struct slab_journal *journal)
{
	journal_entry_count_t count = journal->tail_header.entry_count;

	return (journal->tail_header.has_block_map_increments ?
		(journal->full_entries_per_block == count) :
		(journal->entries_per_block == count));
}

static void add_entries(struct slab_journal *journal);
static void update_tail_block_location(struct slab_journal *journal);
static void release_journal_locks(struct vdo_waiter *waiter, void *context);

/**
 * is_slab_journal_blank() - Check whether a slab's journal is blank.
 * @slab: The slab to check.
 *
 * A slab journal is blank if it has never had any entries recorded in it.
 *
 * Return: true if the slab's journal has never been modified.
 */
static bool is_slab_journal_blank(const struct vdo_slab *slab)
{
	return ((slab->journal.tail == 1) &&
		(slab->journal.tail_header.entry_count == 0));
}

/**
 * mark_slab_journal_dirty() - Put a slab journal on the dirty ring of its allocator in the correct
 *                             order.
 * @journal: The journal to be marked dirty.
 * @lock: The recovery journal lock held by the slab journal.
 */
static void mark_slab_journal_dirty(struct slab_journal *journal, sequence_number_t lock)
{
	struct slab_journal *dirty_journal;
	struct list_head *dirty_list = &journal->slab->allocator->dirty_slab_journals;

	ASSERT_LOG_ONLY(journal->recovery_lock == 0, "slab journal was clean");

	journal->recovery_lock = lock;
	list_for_each_entry_reverse(dirty_journal, dirty_list, dirty_entry) {
		if (dirty_journal->recovery_lock <= journal->recovery_lock)
			break;
	}

	list_move_tail(&journal->dirty_entry, dirty_journal->dirty_entry.next);
}

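/*
 * For example (illustrative values): if the dirty ring already holds journals with recovery
 * locks 3, 5, and 9 in ascending order, a journal being dirtied with lock 7 is inserted after
 * the entry holding 5, so the ring stays sorted by recovery lock.
 */
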
static void mark_slab_journal_clean(struct slab_journal *journal)
{
	journal->recovery_lock = 0;
	list_del_init(&journal->dirty_entry);
}

static void check_if_slab_drained(struct vdo_slab *slab)
{
	bool read_only;
	struct slab_journal *journal = &slab->journal;
	const struct admin_state_code *code;

	if (!vdo_is_state_draining(&slab->state) ||
	    must_make_entries_to_flush(journal) ||
	    is_reaping(journal) ||
	    journal->waiting_to_commit ||
	    !list_empty(&journal->uncommitted_blocks) ||
	    journal->updating_slab_summary ||
	    (slab->active_count > 0))
		return;

	/* When not suspending or recovering, the slab must be clean. */
	code = vdo_get_admin_state_code(&slab->state);
	read_only = vdo_is_read_only(slab->allocator->depot->vdo);
	if (!read_only &&
	    vdo_waitq_has_waiters(&slab->dirty_blocks) &&
	    (code != VDO_ADMIN_STATE_SUSPENDING) &&
	    (code != VDO_ADMIN_STATE_RECOVERING))
		return;

	vdo_finish_draining_with_result(&slab->state,
					(read_only ? VDO_READ_ONLY : VDO_SUCCESS));
}

/* FULLNESS HINT COMPUTATION */

/**
 * compute_fullness_hint() - Translate a slab's free block count into a 'fullness hint' that can be
 *                           stored in a slab_summary_entry's 7 bits that are dedicated to its free
 *                           count.
 * @depot: The depot whose summary is being updated.
 * @free_blocks: The number of free blocks.
 *
 * Note: the number of free blocks must be strictly less than 2^23 blocks, even though
 * theoretically slabs could contain precisely 2^23 blocks; there is an assumption that at least
 * one block is used by metadata. This assumption is necessary; otherwise, the fullness hint might
 * overflow. The fullness hint formula is roughly (fullness >> 16) & 0x7f, but (2^23 >> 16) & 0x7f
 * is 0, which would make it impossible to distinguish completely full from completely empty.
 *
 * Return: A fullness hint, which can be stored in 7 bits.
 */
static u8 __must_check compute_fullness_hint(struct slab_depot *depot,
					     block_count_t free_blocks)
{
	block_count_t hint;

	ASSERT_LOG_ONLY((free_blocks < (1 << 23)), "free blocks must be less than 2^23");

	if (free_blocks == 0)
		return 0;

	hint = free_blocks >> depot->hint_shift;
	return ((hint == 0) ? 1 : hint);
}

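/*
 * Illustrative example (assuming a hint_shift of 16): a slab with 300,000 free blocks yields a
 * hint of 300000 >> 16 = 4, while a slab with only 10 free blocks would shift to 0 and is
 * reported as 1 so that it remains distinguishable from a completely full slab (hint 0).
 */
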
/**
 * check_summary_drain_complete() - Check whether an allocator's summary has finished draining.
 */
static void check_summary_drain_complete(struct block_allocator *allocator)
{
	if (!vdo_is_state_draining(&allocator->summary_state) ||
	    (allocator->summary_write_count > 0))
		return;

	vdo_finish_operation(&allocator->summary_state,
			     (vdo_is_read_only(allocator->depot->vdo) ?
			      VDO_READ_ONLY : VDO_SUCCESS));
}

/**
 * notify_summary_waiters() - Wake all the waiters in a given queue.
 * @allocator: The block allocator summary which owns the queue.
 * @queue: The queue to notify.
 */
static void notify_summary_waiters(struct block_allocator *allocator,
				   struct vdo_wait_queue *queue)
{
	int result = (vdo_is_read_only(allocator->depot->vdo) ?
		      VDO_READ_ONLY : VDO_SUCCESS);

	vdo_waitq_notify_all_waiters(queue, NULL, &result);
}

static void launch_write(struct slab_summary_block *summary_block);

/**
 * finish_updating_slab_summary_block() - Finish processing a block which attempted to write,
 *                                        whether or not the attempt succeeded.
 * @block: The block.
 */
static void finish_updating_slab_summary_block(struct slab_summary_block *block)
{
	notify_summary_waiters(block->allocator, &block->current_update_waiters);
	block->writing = false;
	block->allocator->summary_write_count--;
	if (vdo_waitq_has_waiters(&block->next_update_waiters))
		launch_write(block);
	else
		check_summary_drain_complete(block->allocator);
}

/**
 * finish_update() - This is the callback for a successful summary block write.
 * @completion: The write vio.
 */
static void finish_update(struct vdo_completion *completion)
{
	struct slab_summary_block *block =
		container_of(as_vio(completion), struct slab_summary_block, vio);

	atomic64_inc(&block->allocator->depot->summary_statistics.blocks_written);
	finish_updating_slab_summary_block(block);
}

/**
 * handle_write_error() - Handle an error writing a slab summary block.
 * @completion: The write vio.
 */
static void handle_write_error(struct vdo_completion *completion)
{
	struct slab_summary_block *block =
		container_of(as_vio(completion), struct slab_summary_block, vio);

	vio_record_metadata_io_error(as_vio(completion));
	vdo_enter_read_only_mode(completion->vdo, completion->result);
	finish_updating_slab_summary_block(block);
}

static void write_slab_summary_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct slab_summary_block *block =
		container_of(vio, struct slab_summary_block, vio);

	continue_vio_after_io(vio, finish_update, block->allocator->thread_id);
}

/**
 * launch_write() - Write a slab summary block unless it is currently out for writing.
 * @block: The block that needs to be committed.
 */
static void launch_write(struct slab_summary_block *block)
{
	struct block_allocator *allocator = block->allocator;
	struct slab_depot *depot = allocator->depot;
	physical_block_number_t pbn;

	if (block->writing)
		return;

	allocator->summary_write_count++;
	vdo_waitq_transfer_all_waiters(&block->next_update_waiters,
				       &block->current_update_waiters);
	block->writing = true;

	if (vdo_is_read_only(depot->vdo)) {
		finish_updating_slab_summary_block(block);
		return;
	}

	memcpy(block->outgoing_entries, block->entries, VDO_BLOCK_SIZE);

	/*
	 * Flush before writing to ensure that the slab journal tail blocks and reference updates
	 * covered by this summary update are stable (VDO-2332).
	 */
	pbn = (depot->summary_origin +
	       (VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE * allocator->zone_number) +
	       block->index);
	vdo_submit_metadata_vio(&block->vio, pbn, write_slab_summary_endio,
				handle_write_error, REQ_OP_WRITE | REQ_PREFLUSH);
}

/**
 * update_slab_summary_entry() - Update the entry for a slab.
 * @slab: The slab whose entry is to be updated.
 * @waiter: The waiter that is updating the summary.
 * @tail_block_offset: The offset of the slab journal's tail block.
 * @load_ref_counts: Whether the reference counts must be loaded from disk on the vdo load.
 * @is_clean: Whether the slab is clean.
 * @free_blocks: The number of free blocks.
 */
static void update_slab_summary_entry(struct vdo_slab *slab, struct vdo_waiter *waiter,
				      tail_block_offset_t tail_block_offset,
				      bool load_ref_counts, bool is_clean,
				      block_count_t free_blocks)
{
	u8 index = slab->slab_number / VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK;
	struct block_allocator *allocator = slab->allocator;
	struct slab_summary_block *block = &allocator->summary_blocks[index];
	int result;
	struct slab_summary_entry *entry;

	if (vdo_is_read_only(block->vio.completion.vdo)) {
		result = VDO_READ_ONLY;
		waiter->callback(waiter, &result);
		return;
	}

	if (vdo_is_state_draining(&allocator->summary_state) ||
	    vdo_is_state_quiescent(&allocator->summary_state)) {
		result = VDO_INVALID_ADMIN_STATE;
		waiter->callback(waiter, &result);
		return;
	}

	entry = &allocator->summary_entries[slab->slab_number];
	*entry = (struct slab_summary_entry) {
		.tail_block_offset = tail_block_offset,
		.load_ref_counts = (entry->load_ref_counts || load_ref_counts),
		.is_dirty = !is_clean,
		.fullness_hint = compute_fullness_hint(allocator->depot, free_blocks),
	};

	vdo_waitq_enqueue_waiter(&block->next_update_waiters, waiter);
	launch_write(block);
}

/**
 * finish_reaping() - Actually advance the head of the journal now that any necessary flushes are
 *                    complete.
 * @journal: The journal to be reaped.
 */
static void finish_reaping(struct slab_journal *journal)
{
	journal->head = journal->unreapable;
	add_entries(journal);
	check_if_slab_drained(journal->slab);
}

static void reap_slab_journal(struct slab_journal *journal);

/**
 * complete_reaping() - Finish reaping now that we have flushed the lower layer and then try
 *                      reaping again in case we deferred reaping due to an outstanding vio.
 * @completion: The flush vio.
 */
static void complete_reaping(struct vdo_completion *completion)
{
	struct slab_journal *journal = completion->parent;

	return_vio_to_pool(journal->slab->allocator->vio_pool,
			   vio_as_pooled_vio(as_vio(uds_forget(completion))));
	finish_reaping(journal);
	reap_slab_journal(journal);
}

/**
 * handle_flush_error() - Handle an error flushing the lower layer.
 * @completion: The flush vio.
 */
static void handle_flush_error(struct vdo_completion *completion)
{
	vio_record_metadata_io_error(as_vio(completion));
	vdo_enter_read_only_mode(completion->vdo, completion->result);
	complete_reaping(completion);
}

static void flush_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct slab_journal *journal = vio->completion.parent;

	continue_vio_after_io(vio, complete_reaping,
			      journal->slab->allocator->thread_id);
}

/**
 * flush_for_reaping() - A waiter callback for getting a vio with which to flush the lower layer
 *                       prior to reaping.
 * @waiter: The journal as a flush waiter.
 * @context: The newly acquired flush vio.
 */
static void flush_for_reaping(struct vdo_waiter *waiter, void *context)
{
	struct slab_journal *journal =
		container_of(waiter, struct slab_journal, flush_waiter);
	struct pooled_vio *pooled = context;
	struct vio *vio = &pooled->vio;

	vio->completion.parent = journal;
	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
}

/**
 * reap_slab_journal() - Conduct a reap on a slab journal to reclaim unreferenced blocks.
 * @journal: The slab journal.
 */
static void reap_slab_journal(struct slab_journal *journal)
{
	bool reaped = false;

	if (is_reaping(journal)) {
		/* We already have a reap in progress so wait for it to finish. */
		return;
	}

	if ((journal->slab->status != VDO_SLAB_REBUILT) ||
	    !vdo_is_state_normal(&journal->slab->state) ||
	    vdo_is_read_only(journal->slab->allocator->depot->vdo)) {
		/*
		 * We must not reap in the first two cases, and there's no point in read-only mode.
		 */
		return;
	}

	/*
	 * Start reclaiming blocks only when the journal head has no references. Then stop when a
	 * block is referenced or reap reaches the most recently written block, referenced by the
	 * slab summary, which has the sequence number just before the tail.
	 */
	while ((journal->unreapable < journal->tail) && (journal->reap_lock->count == 0)) {
		reaped = true;
		journal->unreapable++;
		journal->reap_lock++;
		if (journal->reap_lock == &journal->locks[journal->size])
			journal->reap_lock = &journal->locks[0];
	}

	if (!reaped)
		return;

	/*
	 * It is never safe to reap a slab journal block without first issuing a flush, regardless
	 * of whether a user flush has been received or not. In the absence of the flush, the
	 * reference block write which released the locks allowing the slab journal to reap may not
	 * be persisted. Although slab summary writes will eventually issue flushes, multiple slab
	 * journal block writes can be issued while previous slab summary updates have not yet been
	 * made. Even though those slab journal block writes will be ignored if the slab summary
	 * update is not persisted, they may still overwrite the to-be-reaped slab journal block
	 * resulting in a loss of reference count updates (VDO-2912).
	 */
	journal->flush_waiter.callback = flush_for_reaping;
	acquire_vio_from_pool(journal->slab->allocator->vio_pool,
			      &journal->flush_waiter);
}

/**
 * adjust_slab_journal_block_reference() - Adjust the reference count for a slab journal block.
 * @journal: The slab journal.
 * @sequence_number: The journal sequence number of the referenced block.
 * @adjustment: Amount to adjust the reference counter.
 *
 * Note that when the adjustment is negative, the slab journal will be reaped.
 */
static void adjust_slab_journal_block_reference(struct slab_journal *journal,
						sequence_number_t sequence_number,
						int adjustment)
{
	struct journal_lock *lock;

	if (sequence_number == 0)
		return;

	if (journal->slab->status == VDO_SLAB_REPLAYING) {
		/* Locks should not be used during offline replay. */
		return;
	}

	ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero");
	lock = get_lock(journal, sequence_number);
	if (adjustment < 0) {
		ASSERT_LOG_ONLY((-adjustment <= lock->count),
				"adjustment %d of lock count %u for slab journal block %llu must not underflow",
				adjustment, lock->count,
				(unsigned long long) sequence_number);
	}

	lock->count += adjustment;
	if (lock->count == 0)
		reap_slab_journal(journal);
}

/**
 * release_journal_locks() - Callback invoked after a slab summary update completes.
 * @waiter: The slab summary waiter that has just been notified.
 * @context: The result code of the update.
 *
 * Registered in the constructor on behalf of update_tail_block_location().
 *
 * Implements waiter_callback_fn.
 */
static void release_journal_locks(struct vdo_waiter *waiter, void *context)
{
	sequence_number_t first, i;
	struct slab_journal *journal =
		container_of(waiter, struct slab_journal, slab_summary_waiter);
	int result = *((int *) context);

	if (result != VDO_SUCCESS) {
		if (result != VDO_READ_ONLY) {
			/*
			 * Don't bother logging what might be lots of errors if we are already in
			 * read-only mode.
			 */
			uds_log_error_strerror(result, "failed slab summary update %llu",
					       (unsigned long long) journal->summarized);
		}

		journal->updating_slab_summary = false;
		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
		check_if_slab_drained(journal->slab);
		return;
	}

	if (journal->partial_write_in_progress && (journal->summarized == journal->tail)) {
		journal->partial_write_in_progress = false;
		add_entries(journal);
	}

	first = journal->last_summarized;
	journal->last_summarized = journal->summarized;
	for (i = journal->summarized - 1; i >= first; i--) {
		/*
		 * Release the lock the summarized block held on the recovery journal. (During
		 * replay, recovery_start will always be 0.)
		 */
		if (journal->recovery_journal != NULL) {
			zone_count_t zone_number = journal->slab->allocator->zone_number;
			struct journal_lock *lock = get_lock(journal, i);

			vdo_release_recovery_journal_block_reference(journal->recovery_journal,
								     lock->recovery_start,
								     VDO_ZONE_TYPE_PHYSICAL,
								     zone_number);
		}

		/*
		 * Release our own lock against reaping for blocks that are committed. (This
		 * function will not change locks during replay.)
		 */
		adjust_slab_journal_block_reference(journal, i, -1);
	}

	journal->updating_slab_summary = false;

	reap_slab_journal(journal);

	/* Check if the slab summary needs to be updated again. */
	update_tail_block_location(journal);
}

/**
 * update_tail_block_location() - Update the tail block location in the slab summary, if necessary.
 * @journal: The slab journal that is updating its tail block location.
 */
static void update_tail_block_location(struct slab_journal *journal)
{
	block_count_t free_block_count;
	struct vdo_slab *slab = journal->slab;

	if (journal->updating_slab_summary ||
	    vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
	    (journal->last_summarized >= journal->next_commit)) {
		check_if_slab_drained(slab);
		return;
	}

	if (slab->status != VDO_SLAB_REBUILT) {
		u8 hint = slab->allocator->summary_entries[slab->slab_number].fullness_hint;

		free_block_count = ((block_count_t) hint) << slab->allocator->depot->hint_shift;
	} else {
		free_block_count = slab->free_blocks;
	}

	journal->summarized = journal->next_commit;
	journal->updating_slab_summary = true;

	/*
	 * Update slab summary as dirty.
	 * vdo_slab journal can only reap past sequence number 1 when all the ref counts for this
	 * slab have been written to the layer. Therefore, indicate that the ref counts must be
	 * loaded when the journal head has reaped past sequence number 1.
	 */
	update_slab_summary_entry(slab, &journal->slab_summary_waiter,
				  journal->summarized % journal->size,
				  (journal->head > 1), false, free_block_count);
}

/**
 * reopen_slab_journal() - Reopen a slab's journal by emptying it and then adding pending entries.
 */
static void reopen_slab_journal(struct vdo_slab *slab)
{
	struct slab_journal *journal = &slab->journal;
	sequence_number_t block;

	ASSERT_LOG_ONLY(journal->tail_header.entry_count == 0,
			"vdo_slab journal's active block empty before reopening");
	journal->head = journal->tail;
	initialize_journal_state(journal);

	/* Ensure no locks are spuriously held on an empty journal. */
	for (block = 1; block <= journal->size; block++) {
		ASSERT_LOG_ONLY((get_lock(journal, block)->count == 0),
				"Scrubbed journal's block %llu is not locked",
				(unsigned long long) block);
	}

	add_entries(journal);
}

static sequence_number_t get_committing_sequence_number(const struct pooled_vio *vio)
{
	const struct packed_slab_journal_block *block =
		(const struct packed_slab_journal_block *) vio->vio.data;

	return __le64_to_cpu(block->header.sequence_number);
}

/**
 * complete_write() - Handle post-commit processing.
 * @completion: The write vio as a completion.
 *
 * This is the callback registered by write_slab_journal_block().
 */
static void complete_write(struct vdo_completion *completion)
{
	int result = completion->result;
	struct pooled_vio *pooled = vio_as_pooled_vio(as_vio(completion));
	struct slab_journal *journal = completion->parent;
	sequence_number_t committed = get_committing_sequence_number(pooled);

	list_del_init(&pooled->list_entry);
	return_vio_to_pool(journal->slab->allocator->vio_pool, uds_forget(pooled));

	if (result != VDO_SUCCESS) {
		vio_record_metadata_io_error(as_vio(completion));
		uds_log_error_strerror(result, "cannot write slab journal block %llu",
				       (unsigned long long) committed);
		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
		check_if_slab_drained(journal->slab);
		return;
	}

	WRITE_ONCE(journal->events->blocks_written, journal->events->blocks_written + 1);

	if (list_empty(&journal->uncommitted_blocks)) {
		/* If no blocks are outstanding, then the commit point is at the tail. */
		journal->next_commit = journal->tail;
	} else {
		/* The commit point is always the beginning of the oldest incomplete block. */
		pooled = container_of(journal->uncommitted_blocks.next,
				      struct pooled_vio, list_entry);
		journal->next_commit = get_committing_sequence_number(pooled);
	}

	update_tail_block_location(journal);
}

static void write_slab_journal_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct slab_journal *journal = vio->completion.parent;

	continue_vio_after_io(vio, complete_write, journal->slab->allocator->thread_id);
}

/**
 * write_slab_journal_block() - Write a slab journal block.
 * @waiter: The vio pool waiter which was just notified.
 * @context: The vio pool entry for the write.
 *
 * Callback from acquire_vio_from_pool() registered in commit_tail().
 */
static void write_slab_journal_block(struct vdo_waiter *waiter, void *context)
{
	struct pooled_vio *pooled = context;
	struct vio *vio = &pooled->vio;
	struct slab_journal *journal =
		container_of(waiter, struct slab_journal, resource_waiter);
	struct slab_journal_block_header *header = &journal->tail_header;
	int unused_entries = journal->entries_per_block - header->entry_count;
	physical_block_number_t block_number;
	const struct admin_state_code *operation;

	header->head = journal->head;
	list_add_tail(&pooled->list_entry, &journal->uncommitted_blocks);
	vdo_pack_slab_journal_block_header(header, &journal->block->header);

	/* Copy the tail block into the vio. */
	memcpy(pooled->vio.data, journal->block, VDO_BLOCK_SIZE);

	ASSERT_LOG_ONLY(unused_entries >= 0, "vdo_slab journal block is not overfull");
	if (unused_entries > 0) {
		/*
		 * Release the per-entry locks for any unused entries in the block we are about to
		 * write.
		 */
		adjust_slab_journal_block_reference(journal, header->sequence_number,
						    -unused_entries);
		journal->partial_write_in_progress = !block_is_full(journal);
	}

	block_number = journal->slab->journal_origin +
		(header->sequence_number % journal->size);
	vio->completion.parent = journal;

	/*
	 * This block won't be read in recovery until the slab summary is updated to refer to it.
	 * The slab summary update does a flush which is sufficient to protect us from VDO-2331.
	 */
	vdo_submit_metadata_vio(uds_forget(vio), block_number, write_slab_journal_endio,
				complete_write, REQ_OP_WRITE);

	/* Since the write is submitted, the tail block structure can be reused. */
	journal->tail++;
	initialize_tail_block(journal);
	journal->waiting_to_commit = false;

	operation = vdo_get_admin_state_code(&journal->slab->state);
	if (operation == VDO_ADMIN_STATE_WAITING_FOR_RECOVERY) {
		vdo_finish_operation(&journal->slab->state,
				     (vdo_is_read_only(journal->slab->allocator->depot->vdo) ?
				      VDO_READ_ONLY : VDO_SUCCESS));
		return;
	}

	add_entries(journal);
}

/**
 * commit_tail() - Commit the tail block of the slab journal.
 * @journal: The journal whose tail block should be committed.
 */
static void commit_tail(struct slab_journal *journal)
{
	if ((journal->tail_header.entry_count == 0) && must_make_entries_to_flush(journal)) {
		/*
		 * There are no entries at the moment, but there are some waiters, so defer
		 * initiating the flush until those entries are ready to write.
		 */
		return;
	}

	if (vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
	    journal->waiting_to_commit ||
	    (journal->tail_header.entry_count == 0)) {
		/*
		 * There is nothing to do since the tail block is empty, or writing, or the journal
		 * is in read-only mode.
		 */
		return;
	}

	/*
	 * Since we are about to commit the tail block, this journal no longer needs to be on the
	 * ring of journals which the recovery journal might ask to commit.
	 */
	mark_slab_journal_clean(journal);

	journal->waiting_to_commit = true;

	journal->resource_waiter.callback = write_slab_journal_block;
	acquire_vio_from_pool(journal->slab->allocator->vio_pool,
			      &journal->resource_waiter);
}

/**
 * encode_slab_journal_entry() - Encode a slab journal entry.
 * @tail_header: The unpacked header for the block.
 * @payload: The journal block payload to hold the entry.
 * @sbn: The slab block number of the entry to encode.
 * @operation: The type of the entry.
 * @increment: True if this is an increment.
 *
 * Exposed for unit tests.
 */
static void encode_slab_journal_entry(struct slab_journal_block_header *tail_header,
				      slab_journal_payload *payload,
				      slab_block_number sbn,
				      enum journal_operation operation,
				      bool increment)
{
	journal_entry_count_t entry_number = tail_header->entry_count++;

	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
		if (!tail_header->has_block_map_increments) {
			memset(payload->full_entries.entry_types, 0,
			       VDO_SLAB_JOURNAL_ENTRY_TYPES_SIZE);
			tail_header->has_block_map_increments = true;
		}

		payload->full_entries.entry_types[entry_number / 8] |=
			((u8)1 << (entry_number % 8));
	}

	vdo_pack_slab_journal_entry(&payload->entries[entry_number], sbn, increment);
}

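/*
 * For example, a block map entry landing in slot 10 of the tail block sets bit 2 of
 * entry_types[1] (10 / 8 == 1, 10 % 8 == 2), so that slot is recognized as a block map
 * increment when the block is decoded.
 */
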
/**
 * expand_journal_point() - Convert a recovery journal journal_point which refers to both an
 *                          increment and a decrement to a single point which refers to one or the
 *                          other.
 * @recovery_point: The journal point to convert.
 * @increment: Whether the current entry is an increment.
 *
 * Return: The expanded journal point.
 *
 * Each data_vio has only a single recovery journal point, but may need to make both an increment
 * and a decrement entry in the same slab journal. In order to distinguish the two entries, the
 * entry count of the expanded journal point is twice the actual recovery journal entry count for
 * increments, and one more than that for decrements.
 */
static struct journal_point expand_journal_point(struct journal_point recovery_point,
						 bool increment)
{
	recovery_point.entry_count *= 2;
	if (!increment)
		recovery_point.entry_count++;

	return recovery_point;
}

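/*
 * For example, a recovery journal point {sequence 17, entry 3} expands to {17, 6} for an
 * increment and {17, 7} for the corresponding decrement, keeping the two slab journal entries
 * made for a single data_vio distinct and ordered.
 */
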
/**
 * add_entry() - Actually add an entry to the slab journal, potentially firing off a write if a
 *               block becomes full.
 * @journal: The slab journal to append to.
 * @pbn: The pbn being adjusted.
 * @operation: The type of entry to make.
 * @increment: True if this is an increment.
 * @recovery_point: The expanded recovery point.
 *
 * This function is synchronous.
 */
static void add_entry(struct slab_journal *journal, physical_block_number_t pbn,
		      enum journal_operation operation, bool increment,
		      struct journal_point recovery_point)
{
	struct packed_slab_journal_block *block = journal->block;
	int result;

	result = ASSERT(vdo_before_journal_point(&journal->tail_header.recovery_point,
						 &recovery_point),
			"recovery journal point is monotonically increasing, recovery point: %llu.%u, block recovery point: %llu.%u",
			(unsigned long long) recovery_point.sequence_number,
			recovery_point.entry_count,
			(unsigned long long) journal->tail_header.recovery_point.sequence_number,
			journal->tail_header.recovery_point.entry_count);
	if (result != VDO_SUCCESS) {
		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
		return;
	}

	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
		result = ASSERT((journal->tail_header.entry_count <
				 journal->full_entries_per_block),
				"block has room for full entries");
		if (result != VDO_SUCCESS) {
			vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo,
						 result);
			return;
		}
	}

	encode_slab_journal_entry(&journal->tail_header, &block->payload,
				  pbn - journal->slab->start, operation, increment);
	journal->tail_header.recovery_point = recovery_point;
	if (block_is_full(journal))
		commit_tail(journal);
}

static inline block_count_t journal_length(const struct slab_journal *journal)
{
	return journal->tail - journal->head;
}

/**
 * vdo_attempt_replay_into_slab() - Replay a recovery journal entry into a slab's journal.
 * @slab: The slab to play into.
 * @pbn: The PBN for the entry.
 * @operation: The type of entry to add.
 * @increment: True if this entry is an increment.
 * @recovery_point: The recovery journal point corresponding to this entry.
 * @parent: The completion to notify when there is space to add the entry if the entry could not be
 *          added immediately.
 *
 * Return: true if the entry was added immediately.
 */
bool vdo_attempt_replay_into_slab(struct vdo_slab *slab, physical_block_number_t pbn,
				  enum journal_operation operation, bool increment,
				  struct journal_point *recovery_point,
				  struct vdo_completion *parent)
{
	struct slab_journal *journal = &slab->journal;
	struct slab_journal_block_header *header = &journal->tail_header;
	struct journal_point expanded = expand_journal_point(*recovery_point, increment);

	/* Only accept entries after the current recovery point. */
	if (!vdo_before_journal_point(&journal->tail_header.recovery_point, &expanded))
		return true;

	if ((header->entry_count >= journal->full_entries_per_block) &&
	    (header->has_block_map_increments || (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING))) {
		/*
		 * The tail block does not have room for the entry we are attempting to add so
		 * commit the tail block now.
		 */
		commit_tail(journal);
	}

	if (journal->waiting_to_commit) {
		vdo_start_operation_with_waiter(&journal->slab->state,
						VDO_ADMIN_STATE_WAITING_FOR_RECOVERY,
						parent, NULL);
		return false;
	}

	if (journal_length(journal) >= journal->size) {
		/*
		 * We must have reaped the current head before the crash, since the blocked
		 * threshold keeps us from having more entries than fit in a slab journal; hence we
		 * can just advance the head (and unreapable block), as needed.
		 */
		journal->head++;
		journal->unreapable++;
	}

	if (journal->slab->status == VDO_SLAB_REBUILT)
		journal->slab->status = VDO_SLAB_REPLAYING;

	add_entry(journal, pbn, operation, increment, expanded);
	return true;
}

/**
 * requires_reaping() - Check whether the journal must be reaped before adding new entries.
 * @journal: The journal to check.
 *
 * Return: true if the journal must be reaped.
 */
static bool requires_reaping(const struct slab_journal *journal)
{
	return (journal_length(journal) >= journal->blocking_threshold);
}

/** finish_summary_update() - A waiter callback that resets the writing state of a slab. */
static void finish_summary_update(struct vdo_waiter *waiter, void *context)
{
	struct vdo_slab *slab = container_of(waiter, struct vdo_slab, summary_waiter);
	int result = *((int *) context);

	slab->active_count--;

	if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) {
		uds_log_error_strerror(result, "failed to update slab summary");
		vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
	}

	check_if_slab_drained(slab);
}

static void write_reference_block(struct vdo_waiter *waiter, void *context);

/**
 * launch_reference_block_write() - Launch the write of a dirty reference block by first acquiring
 *                                  a VIO for it from the pool.
 * @waiter: The waiter of the block which is starting to write.
 * @context: The parent slab of the block.
 *
 * This can be asynchronous since the writer will have to wait if all VIOs in the pool are
 * currently in use.
 */
static void launch_reference_block_write(struct vdo_waiter *waiter, void *context)
{
	struct vdo_slab *slab = context;

	if (vdo_is_read_only(slab->allocator->depot->vdo))
		return;

	slab->active_count++;
	container_of(waiter, struct reference_block, waiter)->is_writing = true;
	waiter->callback = write_reference_block;
	acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
}

static void save_dirty_reference_blocks(struct vdo_slab *slab)
{
	vdo_waitq_notify_all_waiters(&slab->dirty_blocks,
				     launch_reference_block_write, slab);
	check_if_slab_drained(slab);
}

/**
 * finish_reference_block_write() - After a reference block has written, clean it, release its
 *                                  locks, and return its VIO to the pool.
 * @completion: The VIO that just finished writing.
 */
static void finish_reference_block_write(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
	struct reference_block *block = completion->parent;
	struct vdo_slab *slab = block->slab;
	tail_block_offset_t offset;

	slab->active_count--;

	/* Release the slab journal lock. */
	adjust_slab_journal_block_reference(&slab->journal,
					    block->slab_journal_lock_to_release, -1);
	return_vio_to_pool(slab->allocator->vio_pool, pooled);

	/*
	 * We can't clear the is_writing flag earlier as releasing the slab journal lock may cause
	 * us to be dirtied again, but we don't want to double enqueue.
	 */
	block->is_writing = false;

	if (vdo_is_read_only(completion->vdo)) {
		check_if_slab_drained(slab);
		return;
	}

	/* Re-queue the block if it was re-dirtied while it was writing. */
	if (block->is_dirty) {
		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
		if (vdo_is_state_draining(&slab->state)) {
			/* We must be saving, and this block will otherwise not be relaunched. */
			save_dirty_reference_blocks(slab);
		}

		return;
	}

	/*
	 * Mark the slab as clean in the slab summary if there are no dirty or writing blocks
	 * and no summary update in progress.
	 */
	if ((slab->active_count > 0) || vdo_waitq_has_waiters(&slab->dirty_blocks)) {
		check_if_slab_drained(slab);
		return;
	}

	offset = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
	slab->active_count++;
	slab->summary_waiter.callback = finish_summary_update;
	update_slab_summary_entry(slab, &slab->summary_waiter, offset,
				  true, true, slab->free_blocks);
}

/**
 * get_reference_counters_for_block() - Find the reference counters for a given block.
 * @block: The reference_block in question.
 *
 * Return: A pointer to the reference counters for this block.
 */
static vdo_refcount_t * __must_check get_reference_counters_for_block(struct reference_block *block)
{
	size_t block_index = block - block->slab->reference_blocks;

	return &block->slab->counters[block_index * COUNTS_PER_BLOCK];
}

/**
 * pack_reference_block() - Copy data from a reference block to a buffer ready to be written out.
 * @block: The block to copy.
 * @buffer: The char buffer to fill with the packed block.
 */
static void pack_reference_block(struct reference_block *block, void *buffer)
{
	struct packed_reference_block *packed = buffer;
	vdo_refcount_t *counters = get_reference_counters_for_block(block);
	sector_count_t i;
	struct packed_journal_point commit_point;

	vdo_pack_journal_point(&block->slab->slab_journal_point, &commit_point);

	for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
		packed->sectors[i].commit_point = commit_point;
		memcpy(packed->sectors[i].counts, counters + (i * COUNTS_PER_SECTOR),
		       (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
	}
}

static void write_reference_block_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct reference_block *block = vio->completion.parent;
	thread_id_t thread_id = block->slab->allocator->thread_id;

	continue_vio_after_io(vio, finish_reference_block_write, thread_id);
}

/**
 * handle_io_error() - Handle an I/O error reading or writing a reference count block.
 * @completion: The VIO doing the I/O as a completion.
 */
static void handle_io_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct vio *vio = as_vio(completion);
	struct vdo_slab *slab = ((struct reference_block *) completion->parent)->slab;

	vio_record_metadata_io_error(vio);
	return_vio_to_pool(slab->allocator->vio_pool, vio_as_pooled_vio(vio));
	slab->active_count--;
	vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
	check_if_slab_drained(slab);
}

/**
 * write_reference_block() - After a dirty block waiter has gotten a VIO from the VIO pool, copy
 *                           its counters and associated data into the VIO, and launch the write.
 * @waiter: The waiter of the dirty block.
 * @context: The VIO returned by the pool.
 */
static void write_reference_block(struct vdo_waiter *waiter, void *context)
{
	size_t block_offset;
	physical_block_number_t pbn;
	struct pooled_vio *pooled = context;
	struct vdo_completion *completion = &pooled->vio.completion;
	struct reference_block *block = container_of(waiter, struct reference_block,
						     waiter);

	pack_reference_block(block, pooled->vio.data);
	block_offset = (block - block->slab->reference_blocks);
	pbn = (block->slab->ref_counts_origin + block_offset);
	block->slab_journal_lock_to_release = block->slab_journal_lock;
	completion->parent = block;

	/*
	 * Mark the block as clean, since we won't be committing any updates that happen after this
	 * moment. As long as VIO order is preserved, two VIOs updating this block at once will not
	 * cause complications.
	 */
	block->is_dirty = false;

	/*
	 * Flush before writing to ensure that the recovery journal and slab journal entries which
	 * cover this reference update are stable (VDO-2331).
	 */
	WRITE_ONCE(block->slab->allocator->ref_counts_statistics.blocks_written,
		   block->slab->allocator->ref_counts_statistics.blocks_written + 1);

	completion->callback_thread_id = ((struct block_allocator *) pooled->context)->thread_id;
	vdo_submit_metadata_vio(&pooled->vio, pbn, write_reference_block_endio,
				handle_io_error, REQ_OP_WRITE | REQ_PREFLUSH);
}

static void reclaim_journal_space(struct slab_journal *journal)
{
	block_count_t length = journal_length(journal);
	struct vdo_slab *slab = journal->slab;
	block_count_t write_count = vdo_waitq_num_waiters(&slab->dirty_blocks);
	block_count_t written;

	if ((length < journal->flushing_threshold) || (write_count == 0))
		return;

	/* The slab journal is over the first threshold, schedule some reference block writes. */
	WRITE_ONCE(journal->events->flush_count, journal->events->flush_count + 1);
	if (length < journal->flushing_deadline) {
		/* Schedule more writes the closer to the deadline we get. */
		write_count /= journal->flushing_deadline - length + 1;
		write_count = max_t(block_count_t, write_count, 1);
	}

	for (written = 0; written < write_count; written++) {
		vdo_waitq_notify_next_waiter(&slab->dirty_blocks,
					     launch_reference_block_write, slab);
	}
}

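/*
 * Illustrative example (hypothetical thresholds): with flushing_threshold = 205 and
 * flushing_deadline = 215, a journal of length 205 with 20 dirty blocks schedules
 * 20 / (215 - 205 + 1) = 1 write, at length 214 it schedules 20 / 2 = 10, and at or past the
 * deadline all 20 are written.
 */
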
/**
 * reference_count_to_status() - Convert a reference count to a reference status.
 * @count: The count to convert.
 *
 * Return: The appropriate reference status.
 */
static enum reference_status __must_check reference_count_to_status(vdo_refcount_t count)
{
	if (count == EMPTY_REFERENCE_COUNT)
		return RS_FREE;
	else if (count == 1)
		return RS_SINGLE;
	else if (count == PROVISIONAL_REFERENCE_COUNT)
		return RS_PROVISIONAL;
	else
		return RS_SHARED;
}

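/*
 * Assuming the usual reference count constants (EMPTY_REFERENCE_COUNT == 0,
 * PROVISIONAL_REFERENCE_COUNT == 255), a count of 0 maps to RS_FREE, 1 to RS_SINGLE, 255 to
 * RS_PROVISIONAL, and everything else (2 through MAXIMUM_REFERENCE_COUNT) to RS_SHARED.
 */
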
/**
 * dirty_block() - Mark a reference count block as dirty, potentially adding it to the dirty queue
 *                 if it wasn't already dirty.
 * @block: The reference block to mark as dirty.
 */
static void dirty_block(struct reference_block *block)
{
	if (block->is_dirty)
		return;

	block->is_dirty = true;
	if (!block->is_writing)
		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
}

/**
 * get_reference_block() - Get the reference block that covers the given block index.
 */
static struct reference_block * __must_check get_reference_block(struct vdo_slab *slab,
								  slab_block_number index)
{
	return &slab->reference_blocks[index / COUNTS_PER_BLOCK];
}

/**
 * slab_block_number_from_pbn() - Determine the index within the slab of a particular physical
 *                                block number.
 * @slab: The slab.
 * @pbn: The physical block number.
 * @slab_block_number_ptr: A pointer to the slab block number.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check slab_block_number_from_pbn(struct vdo_slab *slab,
						   physical_block_number_t pbn,
						   slab_block_number *slab_block_number_ptr)
{
	u64 slab_block_number;

	if (pbn < slab->start)
		return VDO_OUT_OF_RANGE;

	slab_block_number = pbn - slab->start;
	if (slab_block_number >= slab->allocator->depot->slab_config.data_blocks)
		return VDO_OUT_OF_RANGE;

	*slab_block_number_ptr = slab_block_number;
	return VDO_SUCCESS;
}

/**
 * get_reference_counter() - Get the reference counter that covers the given physical block number.
 * @slab: The slab to query.
 * @pbn: The physical block number.
 * @counter_ptr: A pointer to the reference counter.
 */
static int __must_check get_reference_counter(struct vdo_slab *slab,
					      physical_block_number_t pbn,
					      vdo_refcount_t **counter_ptr)
{
	slab_block_number index;
	int result = slab_block_number_from_pbn(slab, pbn, &index);

	if (result != VDO_SUCCESS)
		return result;

	*counter_ptr = &slab->counters[index];
	return VDO_SUCCESS;
}

static unsigned int calculate_slab_priority(struct vdo_slab *slab)
{
	block_count_t free_blocks = slab->free_blocks;
	unsigned int unopened_slab_priority = slab->allocator->unopened_slab_priority;
	unsigned int priority;

	/*
	 * Wholly full slabs must be the only ones with lowest priority, 0.
	 *
	 * Slabs that have never been opened (empty, newly initialized, and never been written to)
	 * have lower priority than previously opened slabs that have a significant number of free
	 * blocks. This ranking causes VDO to avoid writing physical blocks for the first time
	 * unless there are very few free blocks that have been previously written to.
	 *
	 * Since VDO doesn't discard blocks currently, reusing previously written blocks makes VDO
	 * a better client of any underlying storage that is thinly-provisioned (though discarding
	 * would be better).
	 *
	 * For all other slabs, the priority is derived from the logarithm of the number of free
	 * blocks. Slabs with the same order of magnitude of free blocks have the same priority.
	 * With 2^23 blocks, the priority will range from 1 to 25. The reserved
	 * unopened_slab_priority divides the range and is skipped by the logarithmic mapping.
	 */

	if (free_blocks == 0)
		return 0;

	if (is_slab_journal_blank(slab))
		return unopened_slab_priority;

	priority = (1 + ilog2(free_blocks));
	return ((priority < unopened_slab_priority) ? priority : priority + 1);
}

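/*
 * Worked example (an unopened_slab_priority of 7 is purely illustrative): a previously opened
 * slab with 4096 free blocks computes priority = 1 + ilog2(4096) = 13; since 13 >= 7, it is
 * bumped to 14 so that priority 7 stays reserved for slabs which have never been opened.
 */
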
/*
 * Slabs are essentially prioritized by an approximation of the number of free blocks in the slab
 * so slabs with lots of free blocks will be opened for allocation before slabs that have few free
 * blocks.
 */
static void prioritize_slab(struct vdo_slab *slab)
{
	ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
			"a slab must not already be on a ring when prioritizing");
	slab->priority = calculate_slab_priority(slab);
	vdo_priority_table_enqueue(slab->allocator->prioritized_slabs,
				   slab->priority, &slab->allocq_entry);
}

/**
 * adjust_free_block_count() - Adjust the free block count and (if needed) reprioritize the slab.
 * @slab: The slab.
 * @increment: Should be true if the free block count went up.
 */
static void adjust_free_block_count(struct vdo_slab *slab, bool increment)
{
	struct block_allocator *allocator = slab->allocator;

	WRITE_ONCE(allocator->allocated_blocks,
		   allocator->allocated_blocks + (increment ? -1 : 1));

	/* The open slab doesn't need to be reprioritized until it is closed. */
	if (slab == allocator->open_slab)
		return;

	/* Don't bother adjusting the priority table if unneeded. */
	if (slab->priority == calculate_slab_priority(slab))
		return;

	/*
	 * Reprioritize the slab to reflect the new free block count by removing it from the table
	 * and re-enqueuing it with the new priority.
	 */
	vdo_priority_table_remove(allocator->prioritized_slabs, &slab->allocq_entry);
	prioritize_slab(slab);
}

/**
 * increment_for_data() - Increment the reference count for a data block.
 * @slab: The slab which owns the block.
 * @block: The reference block which contains the block being updated.
 * @block_number: The block to update.
 * @old_status: The reference status of the data block before this increment.
 * @lock: The pbn_lock associated with this increment (may be NULL).
 * @counter_ptr: A pointer to the count for the data block (in, out).
 * @adjust_block_count: Whether to update the allocator's free block count.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int increment_for_data(struct vdo_slab *slab, struct reference_block *block,
			      slab_block_number block_number,
			      enum reference_status old_status,
			      struct pbn_lock *lock, vdo_refcount_t *counter_ptr,
			      bool adjust_block_count)
{
	switch (old_status) {
	case RS_FREE:
		*counter_ptr = 1;
		block->allocated_count++;
		slab->free_blocks--;
		if (adjust_block_count)
			adjust_free_block_count(slab, false);

		break;

	case RS_PROVISIONAL:
		*counter_ptr = 1;
		break;

	default:
		/* Single or shared */
		if (*counter_ptr >= MAXIMUM_REFERENCE_COUNT) {
			return uds_log_error_strerror(VDO_REF_COUNT_INVALID,
						      "Incrementing a block already having 254 references (slab %u, offset %u)",
						      slab->slab_number, block_number);
		}
		(*counter_ptr)++;
	}

	if (lock != NULL)
		vdo_unassign_pbn_lock_provisional_reference(lock);
	return VDO_SUCCESS;
}

/**
 * decrement_for_data() - Decrement the reference count for a data block.
 * @slab: The slab which owns the block.
 * @block: The reference block which contains the block being updated.
 * @block_number: The block to update.
 * @old_status: The reference status of the data block before this decrement.
 * @updater: The reference updater doing this operation in case we need to look up the pbn lock.
 * @counter_ptr: A pointer to the count for the data block (in, out).
 * @adjust_block_count: Whether to update the allocator's free block count.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int decrement_for_data(struct vdo_slab *slab, struct reference_block *block,
			      slab_block_number block_number,
			      enum reference_status old_status,
			      struct reference_updater *updater,
			      vdo_refcount_t *counter_ptr, bool adjust_block_count)
{
	switch (old_status) {
	case RS_FREE:
		return uds_log_error_strerror(VDO_REF_COUNT_INVALID,
					      "Decrementing free block at offset %u in slab %u",
					      block_number, slab->slab_number);

	case RS_PROVISIONAL:
	case RS_SINGLE:
		if (updater->zpbn.zone != NULL) {
			struct pbn_lock *lock = vdo_get_physical_zone_pbn_lock(updater->zpbn.zone,
									       updater->zpbn.pbn);

			if (lock != NULL) {
				/*
				 * There is a read lock on this block, so the block must not become
				 * unreferenced.
				 */
				*counter_ptr = PROVISIONAL_REFERENCE_COUNT;
				vdo_assign_pbn_lock_provisional_reference(lock);
				break;
			}
		}

		*counter_ptr = EMPTY_REFERENCE_COUNT;
		block->allocated_count--;
		slab->free_blocks++;
		if (adjust_block_count)
			adjust_free_block_count(slab, true);

		break;

	default:
		/* Shared */
		(*counter_ptr)--;
	}

	return VDO_SUCCESS;
}

/**
 * increment_for_block_map() - Increment the reference count for a block map page.
 * @slab: The slab which owns the block.
 * @block: The reference block which contains the block being updated.
 * @block_number: The block to update.
 * @old_status: The reference status of the block before this increment.
 * @lock: The pbn_lock associated with this increment (may be NULL).
 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
 * @counter_ptr: A pointer to the count for the block (in, out).
 * @adjust_block_count: Whether to update the allocator's free block count.
 *
 * All block map increments should be from provisional to MAXIMUM_REFERENCE_COUNT. Since block map
 * blocks never dedupe they should never be adjusted from any other state. The adjustment always
 * results in MAXIMUM_REFERENCE_COUNT as this value is used to prevent dedupe against block map
 * blocks.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int increment_for_block_map(struct vdo_slab *slab, struct reference_block *block,
				   slab_block_number block_number,
				   enum reference_status old_status,
				   struct pbn_lock *lock, bool normal_operation,
				   vdo_refcount_t *counter_ptr, bool adjust_block_count)
{
	switch (old_status) {
	case RS_FREE:
		if (normal_operation) {
			return uds_log_error_strerror(VDO_REF_COUNT_INVALID,
						      "Incrementing unallocated block map block (slab %u, offset %u)",
						      slab->slab_number, block_number);
		}

		*counter_ptr = MAXIMUM_REFERENCE_COUNT;
		block->allocated_count++;
		slab->free_blocks--;
		if (adjust_block_count)
			adjust_free_block_count(slab, false);

		return VDO_SUCCESS;

	case RS_PROVISIONAL:
		if (!normal_operation)
			return uds_log_error_strerror(VDO_REF_COUNT_INVALID,
						      "Block map block had provisional reference during replay (slab %u, offset %u)",
						      slab->slab_number, block_number);

		*counter_ptr = MAXIMUM_REFERENCE_COUNT;
		if (lock != NULL)
			vdo_unassign_pbn_lock_provisional_reference(lock);
		return VDO_SUCCESS;

	default:
		return uds_log_error_strerror(VDO_REF_COUNT_INVALID,
					      "Incrementing a block map block which is already referenced %u times (slab %u, offset %u)",
					      *counter_ptr, slab->slab_number,
					      block_number);
	}
}

static bool __must_check is_valid_journal_point(const struct journal_point *point)
{
	return ((point != NULL) && (point->sequence_number > 0));
}

/**
 * update_reference_count() - Update the reference count of a block.
 * @slab: The slab which owns the block.
 * @block: The reference block which contains the block being updated.
 * @block_number: The block to update.
 * @slab_journal_point: The slab journal point at which this update is journaled.
 * @updater: The reference updater.
 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
 * @adjust_block_count: Whether to update the slab's free block count.
 * @provisional_decrement_ptr: A pointer which will be set to true if this update was a decrement
 *                             of a provisional reference.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int update_reference_count(struct vdo_slab *slab, struct reference_block *block,
				  slab_block_number block_number,
				  const struct journal_point *slab_journal_point,
				  struct reference_updater *updater,
				  bool normal_operation, bool adjust_block_count,
				  bool *provisional_decrement_ptr)
{
	vdo_refcount_t *counter_ptr = &slab->counters[block_number];
	enum reference_status old_status = reference_count_to_status(*counter_ptr);
	int result;

	if (!updater->increment) {
		result = decrement_for_data(slab, block, block_number, old_status,
					    updater, counter_ptr, adjust_block_count);
		if ((result == VDO_SUCCESS) && (old_status == RS_PROVISIONAL)) {
			if (provisional_decrement_ptr != NULL)
				*provisional_decrement_ptr = true;
			return VDO_SUCCESS;
		}
	} else if (updater->operation == VDO_JOURNAL_DATA_REMAPPING) {
		result = increment_for_data(slab, block, block_number, old_status,
					    updater->lock, counter_ptr, adjust_block_count);
	} else {
		result = increment_for_block_map(slab, block, block_number, old_status,
						 updater->lock, normal_operation,
						 counter_ptr, adjust_block_count);
	}

	if (result != VDO_SUCCESS)
		return result;

	if (is_valid_journal_point(slab_journal_point))
		slab->slab_journal_point = *slab_journal_point;

	return VDO_SUCCESS;
}

static int __must_check adjust_reference_count(struct vdo_slab *slab,
					       struct reference_updater *updater,
					       const struct journal_point *slab_journal_point)
{
	slab_block_number block_number;
	int result;
	struct reference_block *block;
	bool provisional_decrement = false;

	if (!is_slab_open(slab))
		return VDO_INVALID_ADMIN_STATE;

	result = slab_block_number_from_pbn(slab, updater->zpbn.pbn, &block_number);
	if (result != VDO_SUCCESS)
		return result;

	block = get_reference_block(slab, block_number);
	result = update_reference_count(slab, block, block_number, slab_journal_point,
					updater, NORMAL_OPERATION, true,
					&provisional_decrement);
	if ((result != VDO_SUCCESS) || provisional_decrement)
		return result;

	if (block->is_dirty && (block->slab_journal_lock > 0)) {
		sequence_number_t entry_lock = slab_journal_point->sequence_number;
		/*
		 * This block is already dirty and a slab journal entry has been made for it since
		 * the last time it was clean. We must release the per-entry slab journal lock for
		 * the entry associated with the update we are now doing.
		 */
		result = ASSERT(is_valid_journal_point(slab_journal_point),
				"Reference count adjustments need slab journal points.");
		if (result != VDO_SUCCESS)
			return result;

		adjust_slab_journal_block_reference(&slab->journal, entry_lock, -1);
		return VDO_SUCCESS;
	}

	/*
	 * This may be the first time we are applying an update for which there is a slab journal
	 * entry to this block since the block was cleaned. Therefore, we convert the per-entry
	 * slab journal lock to an uncommitted reference block lock, if there is a per-entry lock.
	 */
	if (is_valid_journal_point(slab_journal_point))
		block->slab_journal_lock = slab_journal_point->sequence_number;
	else
		block->slab_journal_lock = 0;

	dirty_block(block);
	return VDO_SUCCESS;
}

/**
 * add_entry_from_waiter() - Add an entry to the slab journal.
 * @waiter: The vio which should make an entry now.
 * @context: The slab journal to make an entry in.
 *
 * This callback is invoked by add_entries() once it has determined that we are ready to make
 * another entry in the slab journal. Implements waiter_callback_fn.
 */
static void add_entry_from_waiter(struct vdo_waiter *waiter, void *context)
{
	int result;
	struct reference_updater *updater =
		container_of(waiter, struct reference_updater, waiter);
	struct data_vio *data_vio = data_vio_from_reference_updater(updater);
	struct slab_journal *journal = context;
	struct slab_journal_block_header *header = &journal->tail_header;
	struct journal_point slab_journal_point = {
		.sequence_number = header->sequence_number,
		.entry_count = header->entry_count,
	};
	sequence_number_t recovery_block = data_vio->recovery_journal_point.sequence_number;

	if (header->entry_count == 0) {
		/*
		 * This is the first entry in the current tail block, so get a lock on the recovery
		 * journal which we will hold until this tail block is committed.
		 */
		get_lock(journal, header->sequence_number)->recovery_start = recovery_block;
		if (journal->recovery_journal != NULL) {
			zone_count_t zone_number = journal->slab->allocator->zone_number;

			vdo_acquire_recovery_journal_block_reference(journal->recovery_journal,
								     recovery_block,
								     VDO_ZONE_TYPE_PHYSICAL,
								     zone_number);
		}

		mark_slab_journal_dirty(journal, recovery_block);
		reclaim_journal_space(journal);
	}

	add_entry(journal, updater->zpbn.pbn, updater->operation, updater->increment,
		  expand_journal_point(data_vio->recovery_journal_point,
				       updater->increment));

	if (journal->slab->status != VDO_SLAB_REBUILT) {
		/*
		 * If the slab is unrecovered, scrubbing will take care of the count since the
		 * update is now recorded in the journal.
		 */
		adjust_slab_journal_block_reference(journal,
						    slab_journal_point.sequence_number, -1);
		result = VDO_SUCCESS;
	} else {
		/* Now that an entry has been made in the slab journal, update the counter. */
		result = adjust_reference_count(journal->slab, updater,
						&slab_journal_point);
	}

	if (updater->increment)
		continue_data_vio_with_error(data_vio, result);
	else
		vdo_continue_completion(&data_vio->decrement_completion, result);
}

1741 * is_next_entry_a_block_map_increment() - Check whether the next entry to be made is a block map
1743 * @journal: The journal.
1745 * Return: true if the first entry waiter's operation is a block map increment.
1747 static inline bool is_next_entry_a_block_map_increment(struct slab_journal *journal)
1749 struct vdo_waiter *waiter = vdo_waitq_get_first_waiter(&journal->entry_waiters);
1750 struct reference_updater *updater = container_of(waiter,
1751 struct reference_updater,
1754 return (updater->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING);
1758 * add_entries() - Add as many entries as possible from the queue of vios waiting to make entries.
1759 * @journal: The journal to which entries may be added.
1761 * By processing the queue in order, we ensure that slab journal entries are made in the same order
1762 * as recovery journal entries for the same increment or decrement.
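 *
 * Roughly, the loop stops early (leaving the remaining waiters queued) if the slab is
 * rebuilding, a partial write is outstanding, the tail block cannot yet be committed, or the
 * journal has reached its blocking threshold.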
1764 static void add_entries(struct slab_journal *journal)
1766 if (journal->adding_entries) {
1767 /* Protect against re-entrancy. */
1771 journal->adding_entries = true;
1772 while (vdo_waitq_has_waiters(&journal->entry_waiters)) {
1773 struct slab_journal_block_header *header = &journal->tail_header;
1775 if (journal->partial_write_in_progress ||
1776 (journal->slab->status == VDO_SLAB_REBUILDING)) {
1778 * Don't add entries while rebuilding or while a partial write is
1779 * outstanding (VDO-2399).
1784 if (journal->waiting_to_commit) {
1786 * If we are waiting for resources to write the tail block, and the tail
1787 * block is full, we can't make another entry.
1789 WRITE_ONCE(journal->events->tail_busy_count,
1790 journal->events->tail_busy_count + 1);
1792 } else if (is_next_entry_a_block_map_increment(journal) &&
1793 (header->entry_count >= journal->full_entries_per_block)) {
1795 * The tail block does not have room for a block map increment, so commit
1798 commit_tail(journal);
1799 if (journal->waiting_to_commit) {
1800 WRITE_ONCE(journal->events->tail_busy_count,
1801 journal->events->tail_busy_count + 1);
1806 /* If the slab is over the blocking threshold, make the vio wait. */
1807 if (requires_reaping(journal)) {
1808 WRITE_ONCE(journal->events->blocked_count,
1809 journal->events->blocked_count + 1);
1810 save_dirty_reference_blocks(journal->slab);
1814 if (header->entry_count == 0) {
1815 struct journal_lock *lock =
1816 get_lock(journal, header->sequence_number);
1819 * Check if the on disk slab journal is full. Because of the blocking and
1820 * scrubbing thresholds, this should never happen.
1822 if (lock->count > 0) {
1823 ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail,
1824 "New block has locks, but journal is not full");
1827 * The blocking threshold must let the journal fill up if the new
1828 * block has locks; if the blocking threshold is smaller than the
1829 * journal size, the new block cannot possibly have locks already.
1831 ASSERT_LOG_ONLY((journal->blocking_threshold >= journal->size),
1832 "New block can have locks already iff blocking threshold is at the end of the journal");
1834 WRITE_ONCE(journal->events->disk_full_count,
1835 journal->events->disk_full_count + 1);
1836 save_dirty_reference_blocks(journal->slab);
1841 * Don't allow the new block to be reaped until all of the reference count
1842 * blocks are written and the journal block has been fully committed as
1845 lock->count = journal->entries_per_block + 1;
1847 if (header->sequence_number == 1) {
1848 struct vdo_slab *slab = journal->slab;
1852 * This is the first entry in this slab journal, ever. Dirty all of
1853 * the reference count blocks. Each will acquire a lock on the tail
1854 * block so that the journal won't be reaped until the reference
1855 * counts are initialized. The lock acquisition must be done by the
1856 * ref_counts since here we don't know how many reference blocks
1857 * the ref_counts has.
1859 for (i = 0; i < slab->reference_block_count; i++) {
1860 slab->reference_blocks[i].slab_journal_lock = 1;
1861 dirty_block(&slab->reference_blocks[i]);
1864 adjust_slab_journal_block_reference(journal, 1,
1865 slab->reference_block_count);
1869 vdo_waitq_notify_next_waiter(&journal->entry_waiters,
1870 add_entry_from_waiter, journal);
1873 journal->adding_entries = false;
1875 /* If there are no waiters, and we are flushing or saving, commit the tail block. */
1876 if (vdo_is_state_draining(&journal->slab->state) &&
1877 !vdo_is_state_suspending(&journal->slab->state) &&
1878 !vdo_waitq_has_waiters(&journal->entry_waiters))
1879 commit_tail(journal);
1883 * reset_search_cursor() - Reset the free block search back to the first reference counter in the
1884 * first reference block of a slab.
1886 static void reset_search_cursor(struct vdo_slab *slab)
1888 struct search_cursor *cursor = &slab->search_cursor;
1890 cursor->block = cursor->first_block;
1892 /* Unit tests have slabs with only one reference block (and it's a runt). */
1893 cursor->end_index = min_t(u32, COUNTS_PER_BLOCK, slab->block_count);
1897 * advance_search_cursor() - Advance the search cursor to the start of the next reference block in
1900 * Wraps around to the first reference block if the current block is the last reference block.
1902 * Return: true unless the cursor was at the last reference block.
1904 static bool advance_search_cursor(struct vdo_slab *slab)
1906 struct search_cursor *cursor = &slab->search_cursor;
1909 * If we just finished searching the last reference block, then wrap back around to the
1910 * start of the array.
1912 if (cursor->block == cursor->last_block) {
1913 reset_search_cursor(slab);
1917 /* We're not already at the end, so advance the cursor to the next block. */
1919 cursor->index = cursor->end_index;
1921 if (cursor->block == cursor->last_block) {
1922 /* The last reference block will usually be a runt. */
1923 cursor->end_index = slab->block_count;
1925 cursor->end_index += COUNTS_PER_BLOCK;
1932 * vdo_adjust_reference_count_for_rebuild() - Adjust the reference count of a block during rebuild.
1934 * Return: VDO_SUCCESS or an error.
1936 int vdo_adjust_reference_count_for_rebuild(struct slab_depot *depot,
1937 physical_block_number_t pbn,
1938 enum journal_operation operation)
1941 slab_block_number block_number;
1942 struct reference_block *block;
1943 struct vdo_slab *slab = vdo_get_slab(depot, pbn);
1944 struct reference_updater updater = {
1945 .operation = operation,
1949 result = slab_block_number_from_pbn(slab, pbn, &block_number);
1950 if (result != VDO_SUCCESS)
1953 block = get_reference_block(slab, block_number);
1954 result = update_reference_count(slab, block, block_number, NULL,
1955 &updater, !NORMAL_OPERATION, false, NULL);
1956 if (result != VDO_SUCCESS)
1964 * replay_reference_count_change() - Replay the reference count adjustment from a slab journal
1965 * entry into the reference count for a block.
1967 * @entry_point: The slab journal point for the entry.
1968 * @entry: The slab journal entry being replayed.
1970 * The adjustment will be ignored if it was already recorded in the reference count.
1972 * Return: VDO_SUCCESS or an error code.
1974 static int replay_reference_count_change(struct vdo_slab *slab,
1975 const struct journal_point *entry_point,
1976 struct slab_journal_entry entry)
1979 struct reference_block *block = get_reference_block(slab, entry.sbn);
1980 sector_count_t sector = (entry.sbn % COUNTS_PER_BLOCK) / COUNTS_PER_SECTOR;
1981 struct reference_updater updater = {
1982 .operation = entry.operation,
1983 .increment = entry.increment,
1986 if (!vdo_before_journal_point(&block->commit_points[sector], entry_point)) {
1987 /* This entry is already reflected in the existing counts, so do nothing. */
1991 /* This entry is not yet counted in the reference counts. */
1992 result = update_reference_count(slab, block, entry.sbn, entry_point,
1993 &updater, !NORMAL_OPERATION, false, NULL);
1994 if (result != VDO_SUCCESS)
2002 * find_zero_byte_in_word() - Find the array index of the first zero byte in a word-sized range of
2003 * reference counters.
2004 * @word_ptr: A pointer to the eight counter bytes to check.
2005 * @start_index: The array index corresponding to word_ptr[0].
2006 * @fail_index: The array index to return if no zero byte is found.
2008 * The search does no bounds checking; the function relies on the array being sufficiently padded.
2010 * Return: The array index of the first zero byte in the word, or the value passed as fail_index if
2011 * no zero byte was found.
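 *
 * For example, if the eight counters at word_ptr are {3, 1, 0, 7, ...} and start_index is 16,
 * the zero at byte offset 2 yields 18.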
2013 static inline slab_block_number find_zero_byte_in_word(const u8 *word_ptr,
2014 slab_block_number start_index,
2015 slab_block_number fail_index)
2017 u64 word = get_unaligned_le64(word_ptr);
2019 /* This looks like a loop, but GCC will unroll the eight iterations for us. */
2020 unsigned int offset;
2022 for (offset = 0; offset < BYTES_PER_WORD; offset++) {
2023 /* Assumes little-endian byte order, which we have on X86. */
2024 if ((word & 0xFF) == 0)
2025 return (start_index + offset);
2033 * find_free_block() - Find the first block with a reference count of zero in the specified
2034 * range of reference counter indexes.
2035 * @slab: The slab counters to scan.
2036 * @index_ptr: A pointer to hold the array index of the free block.
2038 * Exposed for unit testing.
2040 * Return: true if a free block was found in the specified range.
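 *
 * The counters are read a word at a time, relying on the padding added by
 * allocate_slab_counters() so that reads just past the last counter are safe.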
2042 static bool find_free_block(const struct vdo_slab *slab, slab_block_number *index_ptr)
2044 slab_block_number zero_index;
2045 slab_block_number next_index = slab->search_cursor.index;
2046 slab_block_number end_index = slab->search_cursor.end_index;
2047 u8 *next_counter = &slab->counters[next_index];
2048 u8 *end_counter = &slab->counters[end_index];
2051 * Search every byte of the first unaligned word. (Array is padded so reading past end is
2054 zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2055 if (zero_index < end_index) {
2056 *index_ptr = zero_index;
2061 * On architectures where unaligned word access is expensive, this would be a good place to
2062 * advance to an alignment boundary.
2064 next_index += BYTES_PER_WORD;
2065 next_counter += BYTES_PER_WORD;
2068 * Now we're word-aligned; check a word at a time until we find a word containing a zero.
2069 * (Array is padded so reading past end is safe.)
2071 while (next_counter < end_counter) {
2073 * The following code is currently an exact copy of the code preceding the loop,
2074 * but if you try to merge them by using a do loop, it runs slower because a jump
2075 * instruction gets added at the start of the iteration.
2077 zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2078 if (zero_index < end_index) {
2079 *index_ptr = zero_index;
2083 next_index += BYTES_PER_WORD;
2084 next_counter += BYTES_PER_WORD;
2091 * search_current_reference_block() - Search the reference block currently saved in the search
2092 * cursor for a reference count of zero, starting at the saved
2094 * @slab: The slab to search.
2095 * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2097 * Return: true if an unreferenced counter was found.
2099 static bool search_current_reference_block(const struct vdo_slab *slab,
2100 slab_block_number *free_index_ptr)
2102 /* Don't bother searching if the current block is known to be full. */
2103 return ((slab->search_cursor.block->allocated_count < COUNTS_PER_BLOCK) &&
2104 find_free_block(slab, free_index_ptr));
2108 * search_reference_blocks() - Search each reference block for a reference count of zero.
2109 * @slab: The slab to search.
2110 * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2112 * Searches each reference block for a reference count of zero, starting at the reference block and
2113 * counter index saved in the search cursor and searching up to the end of the last reference
2114 * block. The search does not wrap.
2116 * Return: true if an unreferenced counter was found.
2118 static bool search_reference_blocks(struct vdo_slab *slab,
2119 slab_block_number *free_index_ptr)
2121 /* Start searching at the saved search position in the current block. */
2122 if (search_current_reference_block(slab, free_index_ptr))
2125 /* Search each reference block up to the end of the slab. */
2126 while (advance_search_cursor(slab)) {
2127 if (search_current_reference_block(slab, free_index_ptr))
2135 * make_provisional_reference() - Do the bookkeeping for making a provisional reference.
2137 static void make_provisional_reference(struct vdo_slab *slab,
2138 slab_block_number block_number)
2140 struct reference_block *block = get_reference_block(slab, block_number);
2143 * Make the initial transition from an unreferenced block to a
2144 * provisionally allocated block.
2146 slab->counters[block_number] = PROVISIONAL_REFERENCE_COUNT;
2148 /* Account for the allocation. */
2149 block->allocated_count++;
2150 slab->free_blocks--;
2154 * dirty_all_reference_blocks() - Mark all reference count blocks in a slab as dirty.
2156 static void dirty_all_reference_blocks(struct vdo_slab *slab)
2160 for (i = 0; i < slab->reference_block_count; i++)
2161 dirty_block(&slab->reference_blocks[i]);
2165 * clear_provisional_references() - Clear the provisional reference counts from a reference block.
2166 * @block: The block to clear.
2168 static void clear_provisional_references(struct reference_block *block)
2170 vdo_refcount_t *counters = get_reference_counters_for_block(block);
2173 for (j = 0; j < COUNTS_PER_BLOCK; j++) {
2174 if (counters[j] == PROVISIONAL_REFERENCE_COUNT) {
2175 counters[j] = EMPTY_REFERENCE_COUNT;
2176 block->allocated_count--;
2181 static inline bool journal_points_equal(struct journal_point first,
2182 struct journal_point second)
2184 return ((first.sequence_number == second.sequence_number) &&
2185 (first.entry_count == second.entry_count));
2189 * unpack_reference_block() - Unpack a packed reference block into the internal memory structure.
2190 * @packed: The written reference block to be unpacked.
2191 * @block: The internal reference block to be loaded.
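 *
 * A torn write is reported if the sectors of the block do not all record the same commit
 * point.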
2193 static void unpack_reference_block(struct packed_reference_block *packed,
2194 struct reference_block *block)
2196 block_count_t index;
2198 struct vdo_slab *slab = block->slab;
2199 vdo_refcount_t *counters = get_reference_counters_for_block(block);
2201 for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
2202 struct packed_reference_sector *sector = &packed->sectors[i];
2204 vdo_unpack_journal_point(&sector->commit_point, &block->commit_points[i]);
2205 memcpy(counters + (i * COUNTS_PER_SECTOR), sector->counts,
2206 (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
2207 /* The slab_journal_point must be the latest point found in any sector. */
2208 if (vdo_before_journal_point(&slab->slab_journal_point,
2209 &block->commit_points[i]))
2210 slab->slab_journal_point = block->commit_points[i];
2213 !journal_points_equal(block->commit_points[0],
2214 block->commit_points[i])) {
2215 size_t block_index = block - block->slab->reference_blocks;
2217 uds_log_warning("Torn write detected in sector %u of reference block %zu of slab %u",
2218 i, block_index, block->slab->slab_number);
2222 block->allocated_count = 0;
2223 for (index = 0; index < COUNTS_PER_BLOCK; index++) {
2224 if (counters[index] != EMPTY_REFERENCE_COUNT)
2225 block->allocated_count++;
2230 * finish_reference_block_load() - After a reference block has been read, unpack it.
2231 * @completion: The VIO that just finished reading.
2233 static void finish_reference_block_load(struct vdo_completion *completion)
2235 struct vio *vio = as_vio(completion);
2236 struct pooled_vio *pooled = vio_as_pooled_vio(vio);
2237 struct reference_block *block = completion->parent;
2238 struct vdo_slab *slab = block->slab;
2240 unpack_reference_block((struct packed_reference_block *) vio->data, block);
2241 return_vio_to_pool(slab->allocator->vio_pool, pooled);
2242 slab->active_count--;
2243 clear_provisional_references(block);
2245 slab->free_blocks -= block->allocated_count;
2246 check_if_slab_drained(slab);
2249 static void load_reference_block_endio(struct bio *bio)
2251 struct vio *vio = bio->bi_private;
2252 struct reference_block *block = vio->completion.parent;
2254 continue_vio_after_io(vio, finish_reference_block_load,
2255 block->slab->allocator->thread_id);
2259 * load_reference_block() - After a block waiter has gotten a VIO from the VIO pool, load the
2261 * @waiter: The waiter of the block to load.
2262 * @context: The VIO returned by the pool.
2264 static void load_reference_block(struct vdo_waiter *waiter, void *context)
2266 struct pooled_vio *pooled = context;
2267 struct vio *vio = &pooled->vio;
2268 struct reference_block *block =
2269 container_of(waiter, struct reference_block, waiter);
2270 size_t block_offset = (block - block->slab->reference_blocks);
2272 vio->completion.parent = block;
2273 vdo_submit_metadata_vio(vio, block->slab->ref_counts_origin + block_offset,
2274 load_reference_block_endio, handle_io_error,
2279 * load_reference_blocks() - Load a slab's reference blocks from the underlying storage into a
2280 * pre-allocated reference counter.
2282 static void load_reference_blocks(struct vdo_slab *slab)
2286 slab->free_blocks = slab->block_count;
2287 slab->active_count = slab->reference_block_count;
2288 for (i = 0; i < slab->reference_block_count; i++) {
2289 struct vdo_waiter *waiter = &slab->reference_blocks[i].waiter;
2291 waiter->callback = load_reference_block;
2292 acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
2297 * drain_slab() - Drain all reference count I/O.
2299 * Depending upon the type of drain being performed (as recorded in the slab's admin state), the
2300 * reference blocks may be loaded from disk or dirty reference blocks may be written out.
2302 static void drain_slab(struct vdo_slab *slab)
2306 const struct admin_state_code *state = vdo_get_admin_state_code(&slab->state);
2308 if (state == VDO_ADMIN_STATE_SUSPENDING)
2311 if ((state != VDO_ADMIN_STATE_REBUILDING) &&
2312 (state != VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING))
2313 commit_tail(&slab->journal);
2315 if ((state == VDO_ADMIN_STATE_RECOVERING) || (slab->counters == NULL))
2319 load = slab->allocator->summary_entries[slab->slab_number].load_ref_counts;
2320 if (state == VDO_ADMIN_STATE_SCRUBBING) {
2322 load_reference_blocks(slab);
2325 } else if (state == VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING) {
2327 /* These reference counts were never written, so mark them all dirty. */
2328 dirty_all_reference_blocks(slab);
2331 } else if (state == VDO_ADMIN_STATE_REBUILDING) {
2333 * Write out the counters if the slab has written them before, or it has any
2334 * non-zero reference counts, or there are any slab journal blocks.
2336 block_count_t data_blocks = slab->allocator->depot->slab_config.data_blocks;
2338 if (load || (slab->free_blocks != data_blocks) ||
2339 !is_slab_journal_blank(slab)) {
2340 dirty_all_reference_blocks(slab);
2343 } else if (state == VDO_ADMIN_STATE_SAVING) {
2344 save = (slab->status == VDO_SLAB_REBUILT);
2346 vdo_finish_draining_with_result(&slab->state, VDO_SUCCESS);
2351 save_dirty_reference_blocks(slab);
2354 static int allocate_slab_counters(struct vdo_slab *slab)
2357 size_t index, bytes;
2359 result = ASSERT(slab->reference_blocks == NULL,
2360 "vdo_slab %u doesn't allocate refcounts twice",
2362 if (result != VDO_SUCCESS)
2365 result = uds_allocate(slab->reference_block_count, struct reference_block,
2366 __func__, &slab->reference_blocks);
2367 if (result != VDO_SUCCESS)
2371 * Allocate such that the runt slab has a full-length memory array, plus a little padding
2372 * so we can word-search even at the very end.
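 * (The extra words let find_free_block() read a whole word at, and just past, the last
 * counter without running off the end of the allocation.)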
2374 bytes = (slab->reference_block_count * COUNTS_PER_BLOCK) + (2 * BYTES_PER_WORD);
2375 result = uds_allocate(bytes, vdo_refcount_t, "ref counts array",
2377 if (result != UDS_SUCCESS) {
2378 uds_free(uds_forget(slab->reference_blocks));
2382 slab->search_cursor.first_block = slab->reference_blocks;
2383 slab->search_cursor.last_block = &slab->reference_blocks[slab->reference_block_count - 1];
2384 reset_search_cursor(slab);
2386 for (index = 0; index < slab->reference_block_count; index++) {
2387 slab->reference_blocks[index] = (struct reference_block) {
2395 static int allocate_counters_if_clean(struct vdo_slab *slab)
2397 if (vdo_is_state_clean_load(&slab->state))
2398 return allocate_slab_counters(slab);
2403 static void finish_loading_journal(struct vdo_completion *completion)
2405 struct vio *vio = as_vio(completion);
2406 struct slab_journal *journal = completion->parent;
2407 struct vdo_slab *slab = journal->slab;
2408 struct packed_slab_journal_block *block = (struct packed_slab_journal_block *) vio->data;
2409 struct slab_journal_block_header header;
2411 vdo_unpack_slab_journal_block_header(&block->header, &header);
2413 /* FIXME: should it be an error if the following conditional fails? */
2414 if ((header.metadata_type == VDO_METADATA_SLAB_JOURNAL) &&
2415 (header.nonce == slab->allocator->nonce)) {
2416 journal->tail = header.sequence_number + 1;
2419 * If the slab is clean, this implies the slab journal is empty, so advance the
2420 * head appropriately.
2422 journal->head = (slab->allocator->summary_entries[slab->slab_number].is_dirty ?
2423 header.head : journal->tail);
2424 journal->tail_header = header;
2425 initialize_journal_state(journal);
2428 return_vio_to_pool(slab->allocator->vio_pool, vio_as_pooled_vio(vio));
2429 vdo_finish_loading_with_result(&slab->state, allocate_counters_if_clean(slab));
2432 static void read_slab_journal_tail_endio(struct bio *bio)
2434 struct vio *vio = bio->bi_private;
2435 struct slab_journal *journal = vio->completion.parent;
2437 continue_vio_after_io(vio, finish_loading_journal,
2438 journal->slab->allocator->thread_id);
2441 static void handle_load_error(struct vdo_completion *completion)
2443 int result = completion->result;
2444 struct slab_journal *journal = completion->parent;
2445 struct vio *vio = as_vio(completion);
2447 vio_record_metadata_io_error(vio);
2448 return_vio_to_pool(journal->slab->allocator->vio_pool, vio_as_pooled_vio(vio));
2449 vdo_finish_loading_with_result(&journal->slab->state, result);
2453 * read_slab_journal_tail() - Read the slab journal tail block by using a vio acquired from the vio
2455 * @waiter: The vio pool waiter which has just been notified.
2456 * @context: The vio pool entry given to the waiter.
2458 * This is the success callback from acquire_vio_from_pool() when loading a slab journal.
2460 static void read_slab_journal_tail(struct vdo_waiter *waiter, void *context)
2462 struct slab_journal *journal =
2463 container_of(waiter, struct slab_journal, resource_waiter);
2464 struct vdo_slab *slab = journal->slab;
2465 struct pooled_vio *pooled = context;
2466 struct vio *vio = &pooled->vio;
2467 tail_block_offset_t last_commit_point =
2468 slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2471 * Slab summary keeps the commit point offset, so the tail block is the block before that.
2472 * Calculation supports small journals in unit tests.
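 * For example, a stored offset of 5 means the tail block is at offset 4; a stored offset of 0
 * wraps around to the last block of the journal.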
2474 tail_block_offset_t tail_block = ((last_commit_point == 0) ?
2475 (tail_block_offset_t)(journal->size - 1) :
2476 (last_commit_point - 1));
2478 vio->completion.parent = journal;
2479 vio->completion.callback_thread_id = slab->allocator->thread_id;
2480 vdo_submit_metadata_vio(vio, slab->journal_origin + tail_block,
2481 read_slab_journal_tail_endio, handle_load_error,
2486 * load_slab_journal() - Load a slab's journal by reading the journal's tail.
2488 static void load_slab_journal(struct vdo_slab *slab)
2490 struct slab_journal *journal = &slab->journal;
2491 tail_block_offset_t last_commit_point;
2493 last_commit_point = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2494 if ((last_commit_point == 0) &&
2495 !slab->allocator->summary_entries[slab->slab_number].load_ref_counts) {
2497 * This slab claims that it has a tail block at (journal->size - 1), but a head of
2498 * 1. This is impossible, due to the scrubbing threshold, on a real system, so
2499 * don't bother reading the (bogus) data off disk.
2501 ASSERT_LOG_ONLY(((journal->size < 16) ||
2502 (journal->scrubbing_threshold < (journal->size - 1))),
2503 "Scrubbing threshold protects against reads of unwritten slab journal blocks");
2504 vdo_finish_loading_with_result(&slab->state,
2505 allocate_counters_if_clean(slab));
2509 journal->resource_waiter.callback = read_slab_journal_tail;
2510 acquire_vio_from_pool(slab->allocator->vio_pool, &journal->resource_waiter);
2513 static void register_slab_for_scrubbing(struct vdo_slab *slab, bool high_priority)
2515 struct slab_scrubber *scrubber = &slab->allocator->scrubber;
2517 ASSERT_LOG_ONLY((slab->status != VDO_SLAB_REBUILT),
2518 "slab to be scrubbed is unrecovered");
2520 if (slab->status != VDO_SLAB_REQUIRES_SCRUBBING)
2523 list_del_init(&slab->allocq_entry);
2524 if (!slab->was_queued_for_scrubbing) {
2525 WRITE_ONCE(scrubber->slab_count, scrubber->slab_count + 1);
2526 slab->was_queued_for_scrubbing = true;
2529 if (high_priority) {
2530 slab->status = VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING;
2531 list_add_tail(&slab->allocq_entry, &scrubber->high_priority_slabs);
2535 list_add_tail(&slab->allocq_entry, &scrubber->slabs);
2538 /* Queue a slab for allocation or scrubbing. */
2539 static void queue_slab(struct vdo_slab *slab)
2541 struct block_allocator *allocator = slab->allocator;
2542 block_count_t free_blocks;
2545 ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
2546 "a requeued slab must not already be on a ring");
2548 if (vdo_is_read_only(allocator->depot->vdo))
2551 free_blocks = slab->free_blocks;
2552 result = ASSERT((free_blocks <= allocator->depot->slab_config.data_blocks),
2553 "rebuilt slab %u must have a valid free block count (has %llu, expected maximum %llu)",
2554 slab->slab_number, (unsigned long long) free_blocks,
2555 (unsigned long long) allocator->depot->slab_config.data_blocks);
2556 if (result != VDO_SUCCESS) {
2557 vdo_enter_read_only_mode(allocator->depot->vdo, result);
2561 if (slab->status != VDO_SLAB_REBUILT) {
2562 register_slab_for_scrubbing(slab, false);
2566 if (!vdo_is_state_resuming(&slab->state)) {
2568 * If the slab is resuming, we've already accounted for it here, so don't do it
2570 * FIXME: under what situation would the slab be resuming here?
2572 WRITE_ONCE(allocator->allocated_blocks,
2573 allocator->allocated_blocks - free_blocks);
2574 if (!is_slab_journal_blank(slab)) {
2575 WRITE_ONCE(allocator->statistics.slabs_opened,
2576 allocator->statistics.slabs_opened + 1);
2580 if (allocator->depot->vdo->suspend_type == VDO_ADMIN_STATE_SAVING)
2581 reopen_slab_journal(slab);
2583 prioritize_slab(slab);
2587 * initiate_slab_action() - Initiate a slab action.
2589 * Implements vdo_admin_initiator_fn.
2591 static void initiate_slab_action(struct admin_state *state)
2593 struct vdo_slab *slab = container_of(state, struct vdo_slab, state);
2595 if (vdo_is_state_draining(state)) {
2596 const struct admin_state_code *operation = vdo_get_admin_state_code(state);
2598 if (operation == VDO_ADMIN_STATE_SCRUBBING)
2599 slab->status = VDO_SLAB_REBUILDING;
2602 check_if_slab_drained(slab);
2606 if (vdo_is_state_loading(state)) {
2607 load_slab_journal(slab);
2611 if (vdo_is_state_resuming(state)) {
2613 vdo_finish_resuming(state);
2617 vdo_finish_operation(state, VDO_INVALID_ADMIN_STATE);
2621 * get_next_slab() - Get the next slab to scrub.
2622 * @scrubber: The slab scrubber.
2624 * Return: The next slab to scrub or NULL if there are none.
2626 static struct vdo_slab *get_next_slab(struct slab_scrubber *scrubber)
2628 struct vdo_slab *slab;
2630 slab = list_first_entry_or_null(&scrubber->high_priority_slabs,
2631 struct vdo_slab, allocq_entry);
2635 return list_first_entry_or_null(&scrubber->slabs, struct vdo_slab,
2640 * has_slabs_to_scrub() - Check whether a scrubber has slabs to scrub.
2641 * @scrubber: The scrubber to check.
2643 * Return: true if the scrubber has slabs to scrub.
2645 static bool __must_check has_slabs_to_scrub(struct slab_scrubber *scrubber)
2647 return (get_next_slab(scrubber) != NULL);
2651 * uninitialize_scrubber_vio() - Clean up the slab_scrubber's vio.
2652 * @scrubber: The scrubber.
2654 static void uninitialize_scrubber_vio(struct slab_scrubber *scrubber)
2656 uds_free(uds_forget(scrubber->vio.data));
2657 free_vio_components(&scrubber->vio);
2661 * finish_scrubbing() - Stop scrubbing, either because there are no more slabs to scrub or because
2662 * there's been an error.
2663 * @scrubber: The scrubber.
2665 static void finish_scrubbing(struct slab_scrubber *scrubber, int result)
2667 bool notify = vdo_waitq_has_waiters(&scrubber->waiters);
2668 bool done = !has_slabs_to_scrub(scrubber);
2669 struct block_allocator *allocator =
2670 container_of(scrubber, struct block_allocator, scrubber);
2673 uninitialize_scrubber_vio(scrubber);
2675 if (scrubber->high_priority_only) {
2676 scrubber->high_priority_only = false;
2677 vdo_fail_completion(uds_forget(scrubber->vio.completion.parent), result);
2678 } else if (done && (atomic_add_return(-1, &allocator->depot->zones_to_scrub) == 0)) {
2679 /* All of our slabs were scrubbed, and we're the last allocator to finish. */
2680 enum vdo_state prior_state =
2681 atomic_cmpxchg(&allocator->depot->vdo->state, VDO_RECOVERING,
2685 * To be safe, even if the CAS failed, ensure anything that follows is ordered with
2686 * respect to whatever state change did happen.
2688 smp_mb__after_atomic();
2691 * We must check the VDO state here and not the depot's read_only_notifier since
2692 * the compare-swap-above could have failed due to a read-only entry which our own
2693 * thread does not yet know about.
2695 if (prior_state == VDO_DIRTY)
2696 uds_log_info("VDO commencing normal operation");
2697 else if (prior_state == VDO_RECOVERING)
2698 uds_log_info("Exiting recovery mode");
2702 * Note that the scrubber has stopped, and inform anyone who might be waiting for that to
2705 if (!vdo_finish_draining(&scrubber->admin_state))
2706 WRITE_ONCE(scrubber->admin_state.current_state,
2707 VDO_ADMIN_STATE_SUSPENDED);
2710 * We can't notify waiters until after we've finished draining or they'll just requeue.
2711 * Fortunately if there were waiters, we can't have been freed yet.
2714 vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
2717 static void scrub_next_slab(struct slab_scrubber *scrubber);
2720 * slab_scrubbed() - Notify the scrubber that a slab has been scrubbed.
2721 * @completion: The slab rebuild completion.
2723 * This callback is registered in apply_journal_entries().
2725 static void slab_scrubbed(struct vdo_completion *completion)
2727 struct slab_scrubber *scrubber =
2728 container_of(as_vio(completion), struct slab_scrubber, vio);
2729 struct vdo_slab *slab = scrubber->slab;
2731 slab->status = VDO_SLAB_REBUILT;
2733 reopen_slab_journal(slab);
2734 WRITE_ONCE(scrubber->slab_count, scrubber->slab_count - 1);
2735 scrub_next_slab(scrubber);
2739 * abort_scrubbing() - Abort scrubbing due to an error.
2740 * @scrubber: The slab scrubber.
2741 * @result: The error.
2743 static void abort_scrubbing(struct slab_scrubber *scrubber, int result)
2745 vdo_enter_read_only_mode(scrubber->vio.completion.vdo, result);
2746 finish_scrubbing(scrubber, result);
2750 * handle_scrubber_error() - Handle errors while rebuilding a slab.
2751 * @completion: The slab rebuild completion.
2753 static void handle_scrubber_error(struct vdo_completion *completion)
2755 struct vio *vio = as_vio(completion);
2757 vio_record_metadata_io_error(vio);
2758 abort_scrubbing(container_of(vio, struct slab_scrubber, vio),
2759 completion->result);
2763 * apply_block_entries() - Apply all the entries in a block to the reference counts.
2764 * @block: A block with entries to apply.
2765 * @entry_count: The number of entries to apply.
2766 * @block_number: The sequence number of the block.
2767 * @slab: The slab to apply the entries to.
2769 * Return: VDO_SUCCESS or an error code.
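 *
 * An entry whose offset lies beyond the slab's data blocks is treated as journal corruption
 * and aborts the replay.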
2771 static int apply_block_entries(struct packed_slab_journal_block *block,
2772 journal_entry_count_t entry_count,
2773 sequence_number_t block_number, struct vdo_slab *slab)
2775 struct journal_point entry_point = {
2776 .sequence_number = block_number,
2780 slab_block_number max_sbn = slab->end - slab->start;
2782 while (entry_point.entry_count < entry_count) {
2783 struct slab_journal_entry entry =
2784 vdo_decode_slab_journal_entry(block, entry_point.entry_count);
2786 if (entry.sbn > max_sbn) {
2787 /* This entry is out of bounds. */
2788 return uds_log_error_strerror(VDO_CORRUPT_JOURNAL,
2789 "vdo_slab journal entry (%llu, %u) had invalid offset %u in slab (size %u blocks)",
2790 (unsigned long long) block_number,
2791 entry_point.entry_count,
2792 entry.sbn, max_sbn);
2795 result = replay_reference_count_change(slab, &entry_point, entry);
2796 if (result != VDO_SUCCESS) {
2797 uds_log_error_strerror(result,
2798 "vdo_slab journal entry (%llu, %u) (%s of offset %u) could not be applied in slab %u",
2799 (unsigned long long) block_number,
2800 entry_point.entry_count,
2801 vdo_get_journal_operation_name(entry.operation),
2802 entry.sbn, slab->slab_number);
2805 entry_point.entry_count++;
2812 * apply_journal_entries() - Find the relevant vio of the slab journal and apply all valid entries.
2813 * @completion: The metadata read vio completion.
2815 * This is a callback registered in start_scrubbing().
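 *
 * For example, with a journal of 8 blocks and a tail of 11, the tail block is at index
 * (11 - 1) % 8 = 2, and replay starts from the head recorded in that block's header.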
2817 static void apply_journal_entries(struct vdo_completion *completion)
2820 struct slab_scrubber *scrubber
2821 = container_of(as_vio(completion), struct slab_scrubber, vio);
2822 struct vdo_slab *slab = scrubber->slab;
2823 struct slab_journal *journal = &slab->journal;
2825 /* Find the boundaries of the useful part of the journal. */
2826 sequence_number_t tail = journal->tail;
2827 tail_block_offset_t end_index = (tail - 1) % journal->size;
2828 char *end_data = scrubber->vio.data + (end_index * VDO_BLOCK_SIZE);
2829 struct packed_slab_journal_block *end_block =
2830 (struct packed_slab_journal_block *) end_data;
2832 sequence_number_t head = __le64_to_cpu(end_block->header.head);
2833 tail_block_offset_t head_index = head % journal->size;
2834 block_count_t index = head_index;
2836 struct journal_point ref_counts_point = slab->slab_journal_point;
2837 struct journal_point last_entry_applied = ref_counts_point;
2838 sequence_number_t sequence;
2840 for (sequence = head; sequence < tail; sequence++) {
2841 char *block_data = scrubber->vio.data + (index * VDO_BLOCK_SIZE);
2842 struct packed_slab_journal_block *block =
2843 (struct packed_slab_journal_block *) block_data;
2844 struct slab_journal_block_header header;
2846 vdo_unpack_slab_journal_block_header(&block->header, &header);
2848 if ((header.nonce != slab->allocator->nonce) ||
2849 (header.metadata_type != VDO_METADATA_SLAB_JOURNAL) ||
2850 (header.sequence_number != sequence) ||
2851 (header.entry_count > journal->entries_per_block) ||
2852 (header.has_block_map_increments &&
2853 (header.entry_count > journal->full_entries_per_block))) {
2854 /* The block is not what we expect it to be. */
2855 uds_log_error("vdo_slab journal block for slab %u was invalid",
2857 abort_scrubbing(scrubber, VDO_CORRUPT_JOURNAL);
2861 result = apply_block_entries(block, header.entry_count, sequence, slab);
2862 if (result != VDO_SUCCESS) {
2863 abort_scrubbing(scrubber, result);
2867 last_entry_applied.sequence_number = sequence;
2868 last_entry_applied.entry_count = header.entry_count - 1;
2870 if (index == journal->size)
2875 * At the end of rebuild, the reference counters should be accurate to the end of the
2876 * journal we just applied.
2878 result = ASSERT(!vdo_before_journal_point(&last_entry_applied,
2880 "Refcounts are not more accurate than the slab journal");
2881 if (result != VDO_SUCCESS) {
2882 abort_scrubbing(scrubber, result);
2886 /* Save out the rebuilt reference blocks. */
2887 vdo_prepare_completion(completion, slab_scrubbed, handle_scrubber_error,
2888 slab->allocator->thread_id, completion->parent);
2889 vdo_start_operation_with_waiter(&slab->state,
2890 VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING,
2891 completion, initiate_slab_action);
2894 static void read_slab_journal_endio(struct bio *bio)
2896 struct vio *vio = bio->bi_private;
2897 struct slab_scrubber *scrubber = container_of(vio, struct slab_scrubber, vio);
2899 continue_vio_after_io(vio, apply_journal_entries,
2900 scrubber->slab->allocator->thread_id);
2904 * start_scrubbing() - Read the current slab's journal from disk now that it has been flushed.
2905 * @completion: The scrubber's vio completion.
2907 * This callback is registered in scrub_next_slab().
2909 static void start_scrubbing(struct vdo_completion *completion)
2911 struct slab_scrubber *scrubber =
2912 container_of(as_vio(completion), struct slab_scrubber, vio);
2913 struct vdo_slab *slab = scrubber->slab;
2915 if (!slab->allocator->summary_entries[slab->slab_number].is_dirty) {
2916 slab_scrubbed(completion);
2920 vdo_submit_metadata_vio(&scrubber->vio, slab->journal_origin,
2921 read_slab_journal_endio, handle_scrubber_error,
2926 * scrub_next_slab() - Scrub the next slab if there is one.
2927 * @scrubber: The scrubber.
2929 static void scrub_next_slab(struct slab_scrubber *scrubber)
2931 struct vdo_completion *completion = &scrubber->vio.completion;
2932 struct vdo_slab *slab;
2935 * Note: this notify call is only safe because scrubbing can only be started when
2936 * the VDO is quiescent.
2938 vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
2940 if (vdo_is_read_only(completion->vdo)) {
2941 finish_scrubbing(scrubber, VDO_READ_ONLY);
2945 slab = get_next_slab(scrubber);
2946 if ((slab == NULL) ||
2947 (scrubber->high_priority_only && list_empty(&scrubber->high_priority_slabs))) {
2948 finish_scrubbing(scrubber, VDO_SUCCESS);
2952 if (vdo_finish_draining(&scrubber->admin_state))
2955 list_del_init(&slab->allocq_entry);
2956 scrubber->slab = slab;
2957 vdo_prepare_completion(completion, start_scrubbing, handle_scrubber_error,
2958 slab->allocator->thread_id, completion->parent);
2959 vdo_start_operation_with_waiter(&slab->state, VDO_ADMIN_STATE_SCRUBBING,
2960 completion, initiate_slab_action);
2964 * scrub_slabs() - Scrub all of an allocator's slabs that are eligible for scrubbing.
2965 * @allocator: The block_allocator to scrub.
2966 * @parent: The completion to notify when scrubbing is done, implies high_priority, may be NULL.
2968 static void scrub_slabs(struct block_allocator *allocator, struct vdo_completion *parent)
2970 struct slab_scrubber *scrubber = &allocator->scrubber;
2972 scrubber->vio.completion.parent = parent;
2973 scrubber->high_priority_only = (parent != NULL);
2974 if (!has_slabs_to_scrub(scrubber)) {
2975 finish_scrubbing(scrubber, VDO_SUCCESS);
2979 if (scrubber->high_priority_only &&
2980 vdo_is_priority_table_empty(allocator->prioritized_slabs) &&
2981 list_empty(&scrubber->high_priority_slabs))
2982 register_slab_for_scrubbing(get_next_slab(scrubber), true);
2984 vdo_resume_if_quiescent(&scrubber->admin_state);
2985 scrub_next_slab(scrubber);
2988 static inline void assert_on_allocator_thread(thread_id_t thread_id,
2989 const char *function_name)
2991 ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == thread_id),
2992 "%s called on correct thread", function_name);
2995 static void register_slab_with_allocator(struct block_allocator *allocator,
2996 struct vdo_slab *slab)
2998 allocator->slab_count++;
2999 allocator->last_slab = slab->slab_number;
3003 * get_depot_slab_iterator() - Return a slab_iterator over the slabs in a slab_depot.
3004 * @depot: The depot over which to iterate.
3005 * @start: The number of the slab to start iterating from.
3006 * @end: The number of the last slab which may be returned.
3007 * @stride: The difference in slab number between successive slabs.
3009 * Iteration always occurs from higher to lower numbered slabs.
3011 * Return: An initialized iterator structure.
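 *
 * For example, in a depot with 4 zones, the allocator for zone 1 whose last slab is number 9
 * iterates over slabs 9, 5, and 1.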
3013 static struct slab_iterator get_depot_slab_iterator(struct slab_depot *depot,
3014 slab_count_t start, slab_count_t end,
3015 slab_count_t stride)
3017 struct vdo_slab **slabs = depot->slabs;
3019 return (struct slab_iterator) {
3021 .next = (((slabs == NULL) || (start < end)) ? NULL : slabs[start]),
3027 static struct slab_iterator get_slab_iterator(const struct block_allocator *allocator)
3029 return get_depot_slab_iterator(allocator->depot, allocator->last_slab,
3030 allocator->zone_number,
3031 allocator->depot->zone_count);
3035 * next_slab() - Get the next slab from a slab_iterator and advance the iterator
3036 * @iterator: The slab_iterator.
3038 * Return: The next slab or NULL if the iterator is exhausted.
3040 static struct vdo_slab *next_slab(struct slab_iterator *iterator)
3042 struct vdo_slab *slab = iterator->next;
3044 if ((slab == NULL) || (slab->slab_number < iterator->end + iterator->stride))
3045 iterator->next = NULL;
3047 iterator->next = iterator->slabs[slab->slab_number - iterator->stride];
3053 * abort_waiter() - Abort vios waiting to make journal entries when read-only.
3055 * This callback is invoked on all vios waiting to make slab journal entries after the VDO has gone
3056 * into read-only mode. Implements waiter_callback_fn.
3058 static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused)
3060 struct reference_updater *updater =
3061 container_of(waiter, struct reference_updater, waiter);
3062 struct data_vio *data_vio = data_vio_from_reference_updater(updater);
3064 if (updater->increment) {
3065 continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
3069 vdo_continue_completion(&data_vio->decrement_completion, VDO_READ_ONLY);
3072 /* Implements vdo_read_only_notification_fn. */
3073 static void notify_block_allocator_of_read_only_mode(void *listener,
3074 struct vdo_completion *parent)
3076 struct block_allocator *allocator = listener;
3077 struct slab_iterator iterator;
3079 assert_on_allocator_thread(allocator->thread_id, __func__);
3080 iterator = get_slab_iterator(allocator);
3081 while (iterator.next != NULL) {
3082 struct vdo_slab *slab = next_slab(&iterator);
3084 vdo_waitq_notify_all_waiters(&slab->journal.entry_waiters,
3085 abort_waiter, &slab->journal);
3086 check_if_slab_drained(slab);
3089 vdo_finish_completion(parent);
3093 * vdo_acquire_provisional_reference() - Acquire a provisional reference on behalf of a PBN lock if
3094 * the block it locks is unreferenced.
3095 * @slab: The slab which contains the block.
3096 * @pbn: The physical block to reference.
3099 * Return: VDO_SUCCESS or an error.
3101 int vdo_acquire_provisional_reference(struct vdo_slab *slab, physical_block_number_t pbn,
3102 struct pbn_lock *lock)
3104 slab_block_number block_number;
3107 if (vdo_pbn_lock_has_provisional_reference(lock))
3110 if (!is_slab_open(slab))
3111 return VDO_INVALID_ADMIN_STATE;
3113 result = slab_block_number_from_pbn(slab, pbn, &block_number);
3114 if (result != VDO_SUCCESS)
3117 if (slab->counters[block_number] == EMPTY_REFERENCE_COUNT) {
3118 make_provisional_reference(slab, block_number);
3120 vdo_assign_pbn_lock_provisional_reference(lock);
3123 if (vdo_pbn_lock_has_provisional_reference(lock))
3124 adjust_free_block_count(slab, false);
3129 static int __must_check allocate_slab_block(struct vdo_slab *slab,
3130 physical_block_number_t *block_number_ptr)
3132 slab_block_number free_index;
3134 if (!is_slab_open(slab))
3135 return VDO_INVALID_ADMIN_STATE;
3137 if (!search_reference_blocks(slab, &free_index))
3138 return VDO_NO_SPACE;
3140 ASSERT_LOG_ONLY((slab->counters[free_index] == EMPTY_REFERENCE_COUNT),
3141 "free block must have ref count of zero");
3142 make_provisional_reference(slab, free_index);
3143 adjust_free_block_count(slab, false);
3146 * Update the search hint so the next search will start at the array index just past the
3147 * free block we just found.
3149 slab->search_cursor.index = (free_index + 1);
3151 *block_number_ptr = slab->start + free_index;
3156 * open_slab() - Prepare a slab to be allocated from.
3159 static void open_slab(struct vdo_slab *slab)
3161 reset_search_cursor(slab);
3162 if (is_slab_journal_blank(slab)) {
3163 WRITE_ONCE(slab->allocator->statistics.slabs_opened,
3164 slab->allocator->statistics.slabs_opened + 1);
3165 dirty_all_reference_blocks(slab);
3167 WRITE_ONCE(slab->allocator->statistics.slabs_reopened,
3168 slab->allocator->statistics.slabs_reopened + 1);
3171 slab->allocator->open_slab = slab;
3176 * The block allocated will have a provisional reference and the reference must be either confirmed
3177 * with a subsequent increment or vacated with a subsequent decrement via
3178 * vdo_release_block_reference().
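 *
 * A minimal usage sketch (illustrative only; "allocator" is whichever block_allocator the
 * caller holds):
 *
 *	physical_block_number_t pbn;
 *
 *	if (vdo_allocate_block(allocator, &pbn) == VDO_SUCCESS)
 *		vdo_release_block_reference(allocator, pbn);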
3180 int vdo_allocate_block(struct block_allocator *allocator,
3181 physical_block_number_t *block_number_ptr)
3185 if (allocator->open_slab != NULL) {
3186 /* Try to allocate the next block in the currently open slab. */
3187 result = allocate_slab_block(allocator->open_slab, block_number_ptr);
3188 if ((result == VDO_SUCCESS) || (result != VDO_NO_SPACE))
3191 /* Put the exhausted open slab back into the priority table. */
3192 prioritize_slab(allocator->open_slab);
3195 /* Remove the highest priority slab from the priority table and make it the open slab. */
3196 open_slab(list_entry(vdo_priority_table_dequeue(allocator->prioritized_slabs),
3197 struct vdo_slab, allocq_entry));
3200 * Try allocating again. If we're out of space immediately after opening a slab, then every
3201 * slab must be fully allocated.
3203 return allocate_slab_block(allocator->open_slab, block_number_ptr);
3207 * vdo_enqueue_clean_slab_waiter() - Wait for a clean slab.
3208 * @allocator: The block_allocator on which to wait.
3209 * @waiter: The waiter.
3211 * Return: VDO_SUCCESS if the waiter was queued, VDO_NO_SPACE if there are no slabs to scrub, and
3212 * some other error otherwise.
3214 int vdo_enqueue_clean_slab_waiter(struct block_allocator *allocator,
3215 struct vdo_waiter *waiter)
3217 if (vdo_is_read_only(allocator->depot->vdo))
3218 return VDO_READ_ONLY;
3220 if (vdo_is_state_quiescent(&allocator->scrubber.admin_state))
3221 return VDO_NO_SPACE;
3223 vdo_waitq_enqueue_waiter(&allocator->scrubber.waiters, waiter);
3228 * vdo_modify_reference_count() - Modify the reference count of a block by first making a slab
3229 * journal entry and then updating the reference counter.
3231 * @data_vio: The data_vio for which to add the entry.
3232 * @updater: Which of the data_vio's reference updaters is being submitted.
3234 void vdo_modify_reference_count(struct vdo_completion *completion,
3235 struct reference_updater *updater)
3237 struct vdo_slab *slab = vdo_get_slab(completion->vdo->depot, updater->zpbn.pbn);
3239 if (!is_slab_open(slab)) {
3240 vdo_continue_completion(completion, VDO_INVALID_ADMIN_STATE);
3244 if (vdo_is_read_only(completion->vdo)) {
3245 vdo_continue_completion(completion, VDO_READ_ONLY);
3249 vdo_waitq_enqueue_waiter(&slab->journal.entry_waiters, &updater->waiter);
3250 if ((slab->status != VDO_SLAB_REBUILT) && requires_reaping(&slab->journal))
3251 register_slab_for_scrubbing(slab, true);
3253 add_entries(&slab->journal);
3256 /* Release an unused provisional reference. */
3257 int vdo_release_block_reference(struct block_allocator *allocator,
3258 physical_block_number_t pbn)
3260 struct reference_updater updater;
3262 if (pbn == VDO_ZERO_BLOCK)
3265 updater = (struct reference_updater) {
3266 .operation = VDO_JOURNAL_DATA_REMAPPING,
3273 return adjust_reference_count(vdo_get_slab(allocator->depot, pbn),
3278 * This is a min_heap callback function that orders slab_status structures using the 'is_clean' field as
3279 * the primary key and the 'emptiness' field as the secondary key.
3281 * Slabs need to be pushed onto the rings in the same order they are to be popped off. Popping
3282 * should always get the most empty first, so pushing should be from most empty to least empty.
3283 * Thus, the ordering is reversed from the usual sense since min_heap returns smaller elements
3284 * before larger ones.
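 *
 * For example, clean slabs pop before dirty ones, and among clean slabs the one with the
 * larger emptiness hint pops first.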
3286 static bool slab_status_is_less_than(const void *item1, const void *item2)
3288 const struct slab_status *info1 = item1;
3289 const struct slab_status *info2 = item2;
3291 if (info1->is_clean != info2->is_clean)
3292 return info1->is_clean;
3293 if (info1->emptiness != info2->emptiness)
3294 return info1->emptiness > info2->emptiness;
3295 return info1->slab_number < info2->slab_number;
3298 static void swap_slab_statuses(void *item1, void *item2)
3300 struct slab_status *info1 = item1;
3301 struct slab_status *info2 = item2;
3303 swap(*info1, *info2);
3306 static const struct min_heap_callbacks slab_status_min_heap = {
3307 .elem_size = sizeof(struct slab_status),
3308 .less = slab_status_is_less_than,
3309 .swp = swap_slab_statuses,
3312 /* Inform the slab actor that an action has finished on some slab; used by apply_to_slabs(). */
3313 static void slab_action_callback(struct vdo_completion *completion)
3315 struct block_allocator *allocator = vdo_as_block_allocator(completion);
3316 struct slab_actor *actor = &allocator->slab_actor;
3318 if (--actor->slab_action_count == 0) {
3319 actor->callback(completion);
3323 vdo_reset_completion(completion);
3326 /* Preserve the error from part of an action and continue. */
3327 static void handle_operation_error(struct vdo_completion *completion)
3329 struct block_allocator *allocator = vdo_as_block_allocator(completion);
3331 if (allocator->state.waiter != NULL)
3332 vdo_set_completion_result(allocator->state.waiter, completion->result);
3333 completion->callback(completion);
3336 /* Perform an action on each of an allocator's slabs in parallel. */
3337 static void apply_to_slabs(struct block_allocator *allocator, vdo_action_fn callback)
3339 struct slab_iterator iterator;
3341 vdo_prepare_completion(&allocator->completion, slab_action_callback,
3342 handle_operation_error, allocator->thread_id, NULL);
3343 allocator->completion.requeue = false;
3346 * Since we are going to dequeue all of the slabs, the open slab will become invalid, so
3349 allocator->open_slab = NULL;
3351 /* Ensure that we don't finish before we're done starting. */
3352 allocator->slab_actor = (struct slab_actor) {
3353 .slab_action_count = 1,
3354 .callback = callback,
3357 iterator = get_slab_iterator(allocator);
3358 while (iterator.next != NULL) {
3359 const struct admin_state_code *operation =
3360 vdo_get_admin_state_code(&allocator->state);
3361 struct vdo_slab *slab = next_slab(&iterator);
3363 list_del_init(&slab->allocq_entry);
3364 allocator->slab_actor.slab_action_count++;
3365 vdo_start_operation_with_waiter(&slab->state, operation,
3366 &allocator->completion,
3367 initiate_slab_action);
3370 slab_action_callback(&allocator->completion);
3373 static void finish_loading_allocator(struct vdo_completion *completion)
3375 struct block_allocator *allocator = vdo_as_block_allocator(completion);
3376 const struct admin_state_code *operation =
3377 vdo_get_admin_state_code(&allocator->state);
3379 if (allocator->eraser != NULL)
3380 dm_kcopyd_client_destroy(uds_forget(allocator->eraser));
3382 if (operation == VDO_ADMIN_STATE_LOADING_FOR_RECOVERY) {
3384 vdo_get_current_action_context(allocator->depot->action_manager);
3386 vdo_replay_into_slab_journals(allocator, context);
3390 vdo_finish_loading(&allocator->state);
3393 static void erase_next_slab_journal(struct block_allocator *allocator);
3395 static void copy_callback(int read_err, unsigned long write_err, void *context)
3397 struct block_allocator *allocator = context;
3398 int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO);
3400 if (result != VDO_SUCCESS) {
3401 vdo_fail_completion(&allocator->completion, result);
3405 erase_next_slab_journal(allocator);
3408 /* erase_next_slab_journal() - Erase the next slab journal. */
3409 static void erase_next_slab_journal(struct block_allocator *allocator)
3411 struct vdo_slab *slab;
3412 physical_block_number_t pbn;
3413 struct dm_io_region regions[1];
3414 struct slab_depot *depot = allocator->depot;
3415 block_count_t blocks = depot->slab_config.slab_journal_blocks;
3417 if (allocator->slabs_to_erase.next == NULL) {
3418 vdo_finish_completion(&allocator->completion);
3422 slab = next_slab(&allocator->slabs_to_erase);
3423 pbn = slab->journal_origin - depot->vdo->geometry.bio_offset;
3424 regions[0] = (struct dm_io_region) {
3425 .bdev = vdo_get_backing_device(depot->vdo),
3426 .sector = pbn * VDO_SECTORS_PER_BLOCK,
3427 .count = blocks * VDO_SECTORS_PER_BLOCK,
3429 dm_kcopyd_zero(allocator->eraser, 1, regions, 0, copy_callback, allocator);
3432 /* Implements vdo_admin_initiator_fn. */
3433 static void initiate_load(struct admin_state *state)
3435 struct block_allocator *allocator =
3436 container_of(state, struct block_allocator, state);
3437 const struct admin_state_code *operation = vdo_get_admin_state_code(state);
3439 if (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD) {
3441 * Must requeue because the kcopyd client cannot be freed in the same stack frame
3442 * as the kcopyd callback, lest it deadlock.
3444 vdo_prepare_completion_for_requeue(&allocator->completion,
3445 finish_loading_allocator,
3446 handle_operation_error,
3447 allocator->thread_id, NULL);
3448 allocator->eraser = dm_kcopyd_client_create(NULL);
3449 if (IS_ERR(allocator->eraser)) {
3450 vdo_fail_completion(&allocator->completion,
3451 PTR_ERR(allocator->eraser));
3452 allocator->eraser = NULL;
3455 allocator->slabs_to_erase = get_slab_iterator(allocator);
3457 erase_next_slab_journal(allocator);
3461 apply_to_slabs(allocator, finish_loading_allocator);
3465 * vdo_notify_slab_journals_are_recovered() - Inform a block allocator that its slab journals have
3466 * been recovered from the recovery journal.
3467 * @completion: The allocator completion.
3469 void vdo_notify_slab_journals_are_recovered(struct vdo_completion *completion)
3471 struct block_allocator *allocator = vdo_as_block_allocator(completion);
3473 vdo_finish_loading_with_result(&allocator->state, completion->result);
3476 static int get_slab_statuses(struct block_allocator *allocator,
3477 struct slab_status **statuses_ptr)
3480 struct slab_status *statuses;
3481 struct slab_iterator iterator = get_slab_iterator(allocator);
3483 result = uds_allocate(allocator->slab_count, struct slab_status, __func__,
3485 if (result != VDO_SUCCESS)
3488 *statuses_ptr = statuses;
3490 while (iterator.next != NULL) {
3491 slab_count_t slab_number = next_slab(&iterator)->slab_number;
3493 *statuses++ = (struct slab_status) {
3494 .slab_number = slab_number,
3495 .is_clean = !allocator->summary_entries[slab_number].is_dirty,
3496 .emptiness = allocator->summary_entries[slab_number].fullness_hint,
3503 /* Prepare slabs for allocation or scrubbing. */
3504 static int __must_check vdo_prepare_slabs_for_allocation(struct block_allocator *allocator)
3506 struct slab_status current_slab_status;
3507 struct min_heap heap;
3509 struct slab_status *slab_statuses;
3510 struct slab_depot *depot = allocator->depot;
3512 WRITE_ONCE(allocator->allocated_blocks,
3513 allocator->slab_count * depot->slab_config.data_blocks);
3514 result = get_slab_statuses(allocator, &slab_statuses);
3515 if (result != VDO_SUCCESS)
3518 /* Sort the slabs by cleanliness, then by emptiness hint. */
3519 heap = (struct min_heap) {
3520 .data = slab_statuses,
3521 .nr = allocator->slab_count,
3522 .size = allocator->slab_count,
3524 min_heapify_all(&heap, &slab_status_min_heap);
3526 while (heap.nr > 0) {
3528 struct vdo_slab *slab;
3529 struct slab_journal *journal;
3531 current_slab_status = slab_statuses[0];
3532 min_heap_pop(&heap, &slab_status_min_heap);
3533 slab = depot->slabs[current_slab_status.slab_number];
3535 if ((depot->load_type == VDO_SLAB_DEPOT_REBUILD_LOAD) ||
3536 (!allocator->summary_entries[slab->slab_number].load_ref_counts &&
3537 current_slab_status.is_clean)) {
3542 slab->status = VDO_SLAB_REQUIRES_SCRUBBING;
3543 journal = &slab->journal;
3544 high_priority = ((current_slab_status.is_clean &&
3545 (depot->load_type == VDO_SLAB_DEPOT_NORMAL_LOAD)) ||
3546 (journal_length(journal) >= journal->scrubbing_threshold));
3547 register_slab_for_scrubbing(slab, high_priority);
3550 uds_free(slab_statuses);
3554 static const char *status_to_string(enum slab_rebuild_status status)
3555 {
3556 switch (status) {
3557 case VDO_SLAB_REBUILT:
3558 return "REBUILT";
3559 case VDO_SLAB_REQUIRES_SCRUBBING:
3560 return "SCRUBBING";
3561 case VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING:
3562 return "PRIORITY_SCRUBBING";
3563 case VDO_SLAB_REBUILDING:
3564 return "REBUILDING";
3565 case VDO_SLAB_REPLAYING:
3566 return "REPLAYING";
3567 default:
3568 return "UNKNOWN";
3569 }
3570 }
3572 void vdo_dump_block_allocator(const struct block_allocator *allocator)
3574 unsigned int pause_counter = 0;
3575 struct slab_iterator iterator = get_slab_iterator(allocator);
3576 const struct slab_scrubber *scrubber = &allocator->scrubber;
3578 uds_log_info("block_allocator zone %u", allocator->zone_number);
3579 while (iterator.next != NULL) {
3580 struct vdo_slab *slab = next_slab(&iterator);
3581 struct slab_journal *journal = &slab->journal;
3583 if (slab->reference_blocks != NULL) {
3584 /* Terse because there are a lot of slabs to dump and syslog is lossy. */
3585 uds_log_info("slab %u: P%u, %llu free", slab->slab_number,
3587 (unsigned long long) slab->free_blocks);
3589 uds_log_info("slab %u: status %s", slab->slab_number,
3590 status_to_string(slab->status));
3593 uds_log_info(" slab journal: entry_waiters=%zu waiting_to_commit=%s updating_slab_summary=%s head=%llu unreapable=%llu tail=%llu next_commit=%llu summarized=%llu last_summarized=%llu recovery_lock=%llu dirty=%s",
3594 vdo_waitq_num_waiters(&journal->entry_waiters),
3595 uds_bool_to_string(journal->waiting_to_commit),
3596 uds_bool_to_string(journal->updating_slab_summary),
3597 (unsigned long long) journal->head,
3598 (unsigned long long) journal->unreapable,
3599 (unsigned long long) journal->tail,
3600 (unsigned long long) journal->next_commit,
3601 (unsigned long long) journal->summarized,
3602 (unsigned long long) journal->last_summarized,
3603 (unsigned long long) journal->recovery_lock,
3604 uds_bool_to_string(journal->recovery_lock != 0));
3606 * Given the frequency with which the locks are just a tiny bit off, it might be
3607 * worth dumping all the locks, but that might be too much logging.
3610 if (slab->counters != NULL) {
3611 /* Terse because there are a lot of slabs to dump and syslog is lossy. */
3612 uds_log_info(" slab: free=%u/%u blocks=%u dirty=%zu active=%zu journal@(%llu,%u)",
3613 slab->free_blocks, slab->block_count,
3614 slab->reference_block_count,
3615 vdo_waitq_num_waiters(&slab->dirty_blocks),
3616 slab->active_count,
3617 (unsigned long long) slab->slab_journal_point.sequence_number,
3618 slab->slab_journal_point.entry_count);
3619 } else {
3620 uds_log_info("  no counters");
3621 }
3623 /*
3624  * Wait for a while after each batch of 32 slabs dumped, an arbitrary number,
3625  * allowing the kernel log a chance to be flushed instead of being overrun.
3626  */
3627 if (pause_counter++ == 31) {
3628 pause_counter = 0;
3629 uds_pause_for_logger();
3630 }
3631 }
3633 uds_log_info("slab_scrubber slab_count %u waiters %zu %s%s",
3634 READ_ONCE(scrubber->slab_count),
3635 vdo_waitq_num_waiters(&scrubber->waiters),
3636 vdo_get_admin_state_code(&scrubber->admin_state)->name,
3637 scrubber->high_priority_only ? ", high_priority_only " : "");
3640 static void free_slab(struct vdo_slab *slab)
3641 {
3642 if (slab == NULL)
3643 return;
3645 list_del(&slab->allocq_entry);
3646 uds_free(uds_forget(slab->journal.block));
3647 uds_free(uds_forget(slab->journal.locks));
3648 uds_free(uds_forget(slab->counters));
3649 uds_free(uds_forget(slab->reference_blocks));
3650 uds_free(slab);
3651 }
3653 static int initialize_slab_journal(struct vdo_slab *slab)
3655 struct slab_journal *journal = &slab->journal;
3656 const struct slab_config *slab_config = &slab->allocator->depot->slab_config;
3659 result = uds_allocate(slab_config->slab_journal_blocks, struct journal_lock,
3660 __func__, &journal->locks);
3661 if (result != VDO_SUCCESS)
3664 result = uds_allocate(VDO_BLOCK_SIZE, char, "struct packed_slab_journal_block",
3665 (char **) &journal->block);
3666 if (result != VDO_SUCCESS)
3669 journal->slab = slab;
3670 journal->size = slab_config->slab_journal_blocks;
3671 journal->flushing_threshold = slab_config->slab_journal_flushing_threshold;
3672 journal->blocking_threshold = slab_config->slab_journal_blocking_threshold;
3673 journal->scrubbing_threshold = slab_config->slab_journal_scrubbing_threshold;
3674 journal->entries_per_block = VDO_SLAB_JOURNAL_ENTRIES_PER_BLOCK;
3675 journal->full_entries_per_block = VDO_SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK;
3676 journal->events = &slab->allocator->slab_journal_statistics;
3677 journal->recovery_journal = slab->allocator->depot->vdo->recovery_journal;
3681 journal->flushing_deadline = journal->flushing_threshold;
3682 /*
3683  * Leave some time between the flushing deadline and the blocking threshold, so that
3684  * hopefully all the flushes complete before the journal must block.
3685  */
3686 if ((journal->blocking_threshold - journal->flushing_threshold) > 5)
3687 journal->flushing_deadline = journal->blocking_threshold - 5;
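/*
 * Illustrative arithmetic with hypothetical thresholds: flushing at 20 and blocking at 30
 * gives a gap of 10 (> 5), so the deadline becomes 30 - 5 = 25; flushing at 10 and blocking
 * at 13 gives a gap of only 3, so the deadline stays at the flushing threshold of 10.
 */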
3689 journal->slab_summary_waiter.callback = release_journal_locks;
3691 INIT_LIST_HEAD(&journal->dirty_entry);
3692 INIT_LIST_HEAD(&journal->uncommitted_blocks);
3694 journal->tail_header.nonce = slab->allocator->nonce;
3695 journal->tail_header.metadata_type = VDO_METADATA_SLAB_JOURNAL;
3696 initialize_journal_state(journal);
3701 * make_slab() - Construct a new, empty slab.
3702 * @slab_origin: The physical block number within the block allocator partition of the first block
3703 * in the slab.
3704 * @allocator: The block allocator to which the slab belongs.
3705 * @slab_number: The slab number of the slab.
3706 * @is_new: true if this slab is being allocated as part of a resize.
3707 * @slab_ptr: A pointer to receive the new slab.
3709 * Return: VDO_SUCCESS or an error code.
3711 static int __must_check make_slab(physical_block_number_t slab_origin,
3712 struct block_allocator *allocator,
3713 slab_count_t slab_number, bool is_new,
3714 struct vdo_slab **slab_ptr)
3716 const struct slab_config *slab_config = &allocator->depot->slab_config;
3717 struct vdo_slab *slab;
3720 result = uds_allocate(1, struct vdo_slab, __func__, &slab);
3721 if (result != VDO_SUCCESS)
3724 *slab = (struct vdo_slab) {
3725 .allocator = allocator,
3726 .start = slab_origin,
3727 .end = slab_origin + slab_config->slab_blocks,
3728 .slab_number = slab_number,
3729 .ref_counts_origin = slab_origin + slab_config->data_blocks,
3730 .journal_origin =
3731 vdo_get_slab_journal_start_block(slab_config, slab_origin),
3732 .block_count = slab_config->data_blocks,
3733 .free_blocks = slab_config->data_blocks,
3734 .reference_block_count =
3735 vdo_get_saved_reference_count_size(slab_config->data_blocks),
3737 INIT_LIST_HEAD(&slab->allocq_entry);
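/*
 * Sketch of the layout this establishes, using hypothetical numbers: for a slab of 8192
 * blocks with 8100 data blocks, the data occupies [start, start + 8100), the saved reference
 * counts begin at ref_counts_origin = start + 8100, and the slab journal occupies the
 * remaining blocks up to end = start + 8192.
 */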
3739 result = initialize_slab_journal(slab);
3740 if (result != VDO_SUCCESS) {
3741 free_slab(slab);
3742 return result;
3743 }
3745 if (is_new) {
3746 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NEW);
3747 result = allocate_slab_counters(slab);
3748 if (result != VDO_SUCCESS) {
3749 free_slab(slab);
3750 return result;
3751 }
3752 } else {
3753 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
3754 }
3756 *slab_ptr = slab;
3757 return VDO_SUCCESS;
3758 }
3761 * allocate_slabs() - Allocate a new slab pointer array.
3762 * @depot: The depot.
3763 * @slab_count: The number of slabs the depot should have in the new array.
3765 * Any existing slab pointers will be copied into the new array, and slabs will be allocated as
3766 * needed. The newly allocated slabs will not be distributed for use by the block allocators.
3768 * Return: VDO_SUCCESS or an error code.
3770 static int allocate_slabs(struct slab_depot *depot, slab_count_t slab_count)
3772 block_count_t slab_size;
3773 bool resizing = false;
3774 physical_block_number_t slab_origin;
3777 result = uds_allocate(slab_count, struct vdo_slab *,
3778 "slab pointer array", &depot->new_slabs);
3779 if (result != VDO_SUCCESS)
3782 if (depot->slabs != NULL) {
3783 memcpy(depot->new_slabs, depot->slabs,
3784 depot->slab_count * sizeof(struct vdo_slab *));
3785 resizing = true;
3786 }
3788 slab_size = depot->slab_config.slab_blocks;
3789 slab_origin = depot->first_block + (depot->slab_count * slab_size);
3791 for (depot->new_slab_count = depot->slab_count;
3792 depot->new_slab_count < slab_count;
3793 depot->new_slab_count++, slab_origin += slab_size) {
3794 struct block_allocator *allocator =
3795 &depot->allocators[depot->new_slab_count % depot->zone_count];
3796 struct vdo_slab **slab_ptr = &depot->new_slabs[depot->new_slab_count];
3798 result = make_slab(slab_origin, allocator, depot->new_slab_count,
3799 resizing, slab_ptr);
3800 if (result != VDO_SUCCESS)
3808 * vdo_abandon_new_slabs() - Abandon any new slabs in this depot, freeing them as needed.
3809 * @depot: The depot.
3811 void vdo_abandon_new_slabs(struct slab_depot *depot)
3812 {
3813 slab_count_t i;
3815 if (depot->new_slabs == NULL)
3816 return;
3818 for (i = depot->slab_count; i < depot->new_slab_count; i++)
3819 free_slab(uds_forget(depot->new_slabs[i]));
3820 depot->new_slab_count = 0;
3821 depot->new_size = 0;
3822 uds_free(uds_forget(depot->new_slabs));
3826 * get_allocator_thread_id() - Get the ID of the thread on which a given allocator operates.
3828 * Implements vdo_zone_thread_getter_fn.
3830 static thread_id_t get_allocator_thread_id(void *context, zone_count_t zone_number)
3832 return ((struct slab_depot *) context)->allocators[zone_number].thread_id;
3836 * release_recovery_journal_lock() - Request the slab journal to release the recovery journal lock
3837 * it may hold on a specified recovery journal block.
3838 * @journal: The slab journal.
3839 * @recovery_lock: The sequence number of the recovery journal block whose locks should be
3840 * released.
3842 * Return: true if the journal does hold a lock on the specified block (which it will release).
3844 static bool __must_check release_recovery_journal_lock(struct slab_journal *journal,
3845 sequence_number_t recovery_lock)
3847 if (recovery_lock > journal->recovery_lock) {
3848 ASSERT_LOG_ONLY((recovery_lock < journal->recovery_lock),
3849 "slab journal recovery lock is not older than the recovery journal head");
3850 return false;
3851 }
3853 if ((recovery_lock < journal->recovery_lock) ||
3854 vdo_is_read_only(journal->slab->allocator->depot->vdo))
3855 return false;
3857 /* All locks are held by the block which is in progress; write it. */
3858 commit_tail(journal);
3859 return true;
3860 }
3863 * Request a commit of all dirty tail blocks which are locking the recovery journal block the depot
3864 * is seeking to release.
3866 * Implements vdo_zone_action_fn.
3868 static void release_tail_block_locks(void *context, zone_count_t zone_number,
3869 struct vdo_completion *parent)
3871 struct slab_journal *journal, *tmp;
3872 struct slab_depot *depot = context;
3873 struct list_head *list = &depot->allocators[zone_number].dirty_slab_journals;
3875 list_for_each_entry_safe(journal, tmp, list, dirty_entry) {
3876 if (!release_recovery_journal_lock(journal,
3877 depot->active_release_request))
3878 break;
3879 }
3881 vdo_finish_completion(parent);
3885 * prepare_for_tail_block_commit() - Prepare to commit oldest tail blocks.
3887 * Implements vdo_action_preamble_fn.
3889 static void prepare_for_tail_block_commit(void *context, struct vdo_completion *parent)
3891 struct slab_depot *depot = context;
3893 depot->active_release_request = depot->new_release_request;
3894 vdo_finish_completion(parent);
3898 * schedule_tail_block_commit() - Schedule a tail block commit if necessary.
3900 * This method should not be called directly. Rather, call vdo_schedule_default_action() on the
3901 * depot's action manager.
3903 * Implements vdo_action_scheduler_fn.
3905 static bool schedule_tail_block_commit(void *context)
3907 struct slab_depot *depot = context;
3909 if (depot->new_release_request == depot->active_release_request)
3910 return false;
3912 return vdo_schedule_action(depot->action_manager,
3913 prepare_for_tail_block_commit,
3914 release_tail_block_locks,
3915 NULL, NULL);
3916 }
3919 * initialize_slab_scrubber() - Initialize an allocator's slab scrubber.
3920 * @allocator: The allocator being initialized
3922 * Return: VDO_SUCCESS or an error.
3924 static int initialize_slab_scrubber(struct block_allocator *allocator)
3926 struct slab_scrubber *scrubber = &allocator->scrubber;
3927 block_count_t slab_journal_size =
3928 allocator->depot->slab_config.slab_journal_blocks;
3932 result = uds_allocate(VDO_BLOCK_SIZE * slab_journal_size,
3933 char, __func__, &journal_data);
3934 if (result != VDO_SUCCESS)
3937 result = allocate_vio_components(allocator->completion.vdo,
3938 VIO_TYPE_SLAB_JOURNAL,
3939 VIO_PRIORITY_METADATA,
3940 allocator, slab_journal_size,
3941 journal_data, &scrubber->vio);
3942 if (result != VDO_SUCCESS) {
3943 uds_free(journal_data);
3947 INIT_LIST_HEAD(&scrubber->high_priority_slabs);
3948 INIT_LIST_HEAD(&scrubber->slabs);
3949 vdo_set_admin_state_code(&scrubber->admin_state, VDO_ADMIN_STATE_SUSPENDED);
3954 * initialize_slab_summary_block() - Initialize a slab_summary_block.
3955 * @allocator: The allocator which owns the block.
3956 * @index: The index of this block in its zone's summary.
3958 * Return: VDO_SUCCESS or an error.
3960 static int __must_check initialize_slab_summary_block(struct block_allocator *allocator,
3961 block_count_t index)
3963 struct slab_summary_block *block = &allocator->summary_blocks[index];
3966 result = uds_allocate(VDO_BLOCK_SIZE, char, __func__, &block->outgoing_entries);
3967 if (result != VDO_SUCCESS)
3970 result = allocate_vio_components(allocator->depot->vdo, VIO_TYPE_SLAB_SUMMARY,
3971 VIO_PRIORITY_METADATA, NULL, 1,
3972 block->outgoing_entries, &block->vio);
3973 if (result != VDO_SUCCESS)
3976 block->allocator = allocator;
3977 block->entries = &allocator->summary_entries[VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK * index];
3978 block->index = index;
3982 static int __must_check initialize_block_allocator(struct slab_depot *depot,
3983 zone_count_t zone)
3984 {
3985 int result;
3986 block_count_t i;
3987 struct block_allocator *allocator = &depot->allocators[zone];
3988 struct vdo *vdo = depot->vdo;
3989 block_count_t max_free_blocks = depot->slab_config.data_blocks;
3990 unsigned int max_priority = (2 + ilog2(max_free_blocks));
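/*
 * For example, a hypothetical slab with 32768 data blocks gives ilog2(32768) = 15, so
 * vdo_make_priority_table() below is called with max_priority = 17.
 */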
3992 *allocator = (struct block_allocator) {
3993 .depot = depot,
3994 .zone_number = zone,
3995 .thread_id = vdo->thread_config.physical_threads[zone],
3996 .nonce = vdo->states.vdo.nonce,
3997 };
3999 INIT_LIST_HEAD(&allocator->dirty_slab_journals);
4000 vdo_set_admin_state_code(&allocator->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
4001 result = vdo_register_read_only_listener(vdo, allocator,
4002 notify_block_allocator_of_read_only_mode,
4003 allocator->thread_id);
4004 if (result != VDO_SUCCESS)
4007 vdo_initialize_completion(&allocator->completion, vdo, VDO_BLOCK_ALLOCATOR_COMPLETION);
4008 result = make_vio_pool(vdo, BLOCK_ALLOCATOR_VIO_POOL_SIZE, allocator->thread_id,
4009 VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
4010 allocator, &allocator->vio_pool);
4011 if (result != VDO_SUCCESS)
4014 result = initialize_slab_scrubber(allocator);
4015 if (result != VDO_SUCCESS)
4018 result = vdo_make_priority_table(max_priority, &allocator->prioritized_slabs);
4019 if (result != VDO_SUCCESS)
4022 result = uds_allocate(VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE,
4023 struct slab_summary_block, __func__,
4024 &allocator->summary_blocks);
4025 if (result != VDO_SUCCESS)
4028 vdo_set_admin_state_code(&allocator->summary_state,
4029 VDO_ADMIN_STATE_NORMAL_OPERATION);
4030 allocator->summary_entries = depot->summary_entries + (MAX_VDO_SLABS * zone);
4032 /* Initialize each summary block. */
4033 for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4034 result = initialize_slab_summary_block(allocator, i);
4035 if (result != VDO_SUCCESS)
4040 * Performing well atop thin provisioned storage requires either that VDO discards freed
4041 * blocks, or that the block allocator try to use slabs that already have allocated blocks
4042 * in preference to slabs that have never been opened. For reasons we have not been able to
4043 * fully understand, some SSD machines have been very sensitive (50% reduction in
4044 * test throughput) to very slight differences in the timing and locality of block
4045 * allocation. Assigning a low priority to unopened slabs (max_priority/2, say) would be
4046 * ideal for the story, but anything less than a very high threshold (max_priority - 1)
4047 * hurts on these machines.
4049 * This sets the free block threshold for preferring to open an unopened slab to the binary
4050 * floor of 3/4ths the total number of data blocks in a slab, which will generally evaluate
4051 * to about half the slab size.
4053 allocator->unopened_slab_priority = (1 + ilog2((max_free_blocks * 3) / 4));
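/*
 * Continuing the hypothetical 32768-data-block example: (32768 * 3) / 4 = 24576 and
 * ilog2(24576) = 14, so unopened_slab_priority is 15 and the corresponding free-block
 * threshold is 2^14 = 16384 blocks, roughly half the slab, as described above.
 */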
4058 static int allocate_components(struct slab_depot *depot,
4059 struct partition *summary_partition)
4063 slab_count_t slab_count;
4066 const struct thread_config *thread_config = &depot->vdo->thread_config;
4068 result = vdo_make_action_manager(depot->zone_count, get_allocator_thread_id,
4069 thread_config->journal_thread, depot,
4070 schedule_tail_block_commit,
4071 depot->vdo, &depot->action_manager);
4072 if (result != VDO_SUCCESS)
4075 depot->origin = depot->first_block;
4077 /* block size must be a multiple of entry size */
4078 BUILD_BUG_ON((VDO_BLOCK_SIZE % sizeof(struct slab_summary_entry)) != 0);
4080 depot->summary_origin = summary_partition->offset;
4081 depot->hint_shift = vdo_get_slab_summary_hint_shift(depot->slab_size_shift);
4082 result = uds_allocate(MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES,
4083 struct slab_summary_entry, __func__,
4084 &depot->summary_entries);
4085 if (result != VDO_SUCCESS)
4089 /* Initialize all the entries. */
4090 hint = compute_fullness_hint(depot, depot->slab_config.data_blocks);
4091 for (i = 0; i < MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES; i++) {
4093 * This default tail block offset must be reflected in
4094 * slabJournal.c::read_slab_journal_tail().
4096 depot->summary_entries[i] = (struct slab_summary_entry) {
4097 .tail_block_offset = 0,
4098 .fullness_hint = hint,
4099 .load_ref_counts = false,
4104 if (result != VDO_SUCCESS)
4107 slab_count = vdo_compute_slab_count(depot->first_block, depot->last_block,
4108 depot->slab_size_shift);
4109 if (thread_config->physical_zone_count > slab_count) {
4110 return uds_log_error_strerror(VDO_BAD_CONFIGURATION,
4111 "%u physical zones exceeds slab count %u",
4112 thread_config->physical_zone_count,
4113 slab_count);
4114 }
4116 /* Initialize the block allocators. */
4117 for (zone = 0; zone < depot->zone_count; zone++) {
4118 result = initialize_block_allocator(depot, zone);
4119 if (result != VDO_SUCCESS)
4123 /* Allocate slabs. */
4124 result = allocate_slabs(depot, slab_count);
4125 if (result != VDO_SUCCESS)
4128 /* Use the new slabs. */
4129 for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4130 struct vdo_slab *slab = depot->new_slabs[i];
4132 register_slab_with_allocator(slab->allocator, slab);
4133 WRITE_ONCE(depot->slab_count, depot->slab_count + 1);
4136 depot->slabs = depot->new_slabs;
4137 depot->new_slabs = NULL;
4138 depot->new_slab_count = 0;
4144 * vdo_decode_slab_depot() - Make a slab depot and configure it with the state read from the super
4145 * block.
4146 * @state: The slab depot state from the super block.
4147 * @vdo: The VDO which will own the depot.
4148 * @summary_partition: The partition which holds the slab summary.
4149 * @depot_ptr: A pointer to hold the depot.
4151 * Return: A success or error code.
4153 int vdo_decode_slab_depot(struct slab_depot_state_2_0 state, struct vdo *vdo,
4154 struct partition *summary_partition,
4155 struct slab_depot **depot_ptr)
4157 unsigned int slab_size_shift;
4158 struct slab_depot *depot;
4159 int result;
4161 /*
4162  * Calculate the bit shift for efficiently mapping block numbers to slabs. Using a shift
4163  * requires that the slab size be a power of two.
4164  */
4165 block_count_t slab_size = state.slab_config.slab_blocks;
4167 if (!is_power_of_2(slab_size)) {
4168 return uds_log_error_strerror(UDS_INVALID_ARGUMENT,
4169 "slab size must be a power of two");
4171 slab_size_shift = ilog2(slab_size);
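/*
 * For example, a hypothetical slab size of 8192 blocks gives slab_size_shift = 13, so
 * get_slab_number() below can map a PBN to its slab with (pbn - first_block) >> 13 instead
 * of a division.
 */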
4173 result = uds_allocate_extended(struct slab_depot,
4174 vdo->thread_config.physical_zone_count,
4175 struct block_allocator, __func__, &depot);
4176 if (result != VDO_SUCCESS)
4177 return result;
4179 depot->vdo = vdo;
4180 depot->old_zone_count = state.zone_count;
4181 depot->zone_count = vdo->thread_config.physical_zone_count;
4182 depot->slab_config = state.slab_config;
4183 depot->first_block = state.first_block;
4184 depot->last_block = state.last_block;
4185 depot->slab_size_shift = slab_size_shift;
4187 result = allocate_components(depot, summary_partition);
4188 if (result != VDO_SUCCESS) {
4189 vdo_free_slab_depot(depot);
4190 return result;
4191 }
4193 *depot_ptr = depot;
4194 return VDO_SUCCESS;
4195 }
4197 static void uninitialize_allocator_summary(struct block_allocator *allocator)
4201 if (allocator->summary_blocks == NULL)
4204 for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4205 free_vio_components(&allocator->summary_blocks[i].vio);
4206 uds_free(uds_forget(allocator->summary_blocks[i].outgoing_entries));
4209 uds_free(uds_forget(allocator->summary_blocks));
4213 * vdo_free_slab_depot() - Destroy a slab depot.
4214 * @depot: The depot to destroy.
4216 void vdo_free_slab_depot(struct slab_depot *depot)
4218 zone_count_t zone = 0;
4220 if (depot == NULL)
4221 return;
4223 vdo_abandon_new_slabs(depot);
4225 for (zone = 0; zone < depot->zone_count; zone++) {
4226 struct block_allocator *allocator = &depot->allocators[zone];
4228 if (allocator->eraser != NULL)
4229 dm_kcopyd_client_destroy(uds_forget(allocator->eraser));
4231 uninitialize_allocator_summary(allocator);
4232 uninitialize_scrubber_vio(&allocator->scrubber);
4233 free_vio_pool(uds_forget(allocator->vio_pool));
4234 vdo_free_priority_table(uds_forget(allocator->prioritized_slabs));
4237 if (depot->slabs != NULL) {
4240 for (i = 0; i < depot->slab_count; i++)
4241 free_slab(uds_forget(depot->slabs[i]));
4244 uds_free(uds_forget(depot->slabs));
4245 uds_free(uds_forget(depot->action_manager));
4246 uds_free(uds_forget(depot->summary_entries));
4247 uds_free(depot);
4248 }
4251 * vdo_record_slab_depot() - Record the state of a slab depot for encoding into the super block.
4252 * @depot: The depot to encode.
4254 * Return: The depot state.
4256 struct slab_depot_state_2_0 vdo_record_slab_depot(const struct slab_depot *depot)
4259 * If this depot is currently using 0 zones, it must have been synchronously loaded by a
4260 * tool and is now being saved. We did not load and combine the slab summary, so we still
4261 * need to do that next time we load with the old zone count rather than 0.
4263 struct slab_depot_state_2_0 state;
4264 zone_count_t zones_to_record = depot->zone_count;
4266 if (depot->zone_count == 0)
4267 zones_to_record = depot->old_zone_count;
4269 state = (struct slab_depot_state_2_0) {
4270 .slab_config = depot->slab_config,
4271 .first_block = depot->first_block,
4272 .last_block = depot->last_block,
4273 .zone_count = zones_to_record,
4280 * vdo_allocate_reference_counters() - Allocate the reference counters for all slabs in the depot.
4282 * Context: This method may be called only before entering normal operation from the load thread.
4284 * Return: VDO_SUCCESS or an error.
4286 int vdo_allocate_reference_counters(struct slab_depot *depot)
4288 struct slab_iterator iterator =
4289 get_depot_slab_iterator(depot, depot->slab_count - 1, 0, 1);
4291 while (iterator.next != NULL) {
4292 int result = allocate_slab_counters(next_slab(&iterator));
4294 if (result != VDO_SUCCESS)
4302 * get_slab_number() - Get the number of the slab that contains a specified block.
4303 * @depot: The slab depot.
4304 * @pbn: The physical block number.
4305 * @slab_number_ptr: A pointer to hold the slab number.
4307 * Return: VDO_SUCCESS or an error.
4309 static int __must_check get_slab_number(const struct slab_depot *depot,
4310 physical_block_number_t pbn,
4311 slab_count_t *slab_number_ptr)
4313 slab_count_t slab_number;
4315 if (pbn < depot->first_block)
4316 return VDO_OUT_OF_RANGE;
4318 slab_number = (pbn - depot->first_block) >> depot->slab_size_shift;
4319 if (slab_number >= depot->slab_count)
4320 return VDO_OUT_OF_RANGE;
4322 *slab_number_ptr = slab_number;
4327 * vdo_get_slab() - Get the slab object for the slab that contains a specified block.
4328 * @depot: The slab depot.
4329 * @pbn: The physical block number.
4331 * Will put the VDO in read-only mode if the PBN is not a valid data block nor the zero block.
4333 * Return: The slab containing the block, or NULL if the block number is the zero block or
4334 * otherwise out of range.
4336 struct vdo_slab *vdo_get_slab(const struct slab_depot *depot,
4337 physical_block_number_t pbn)
4339 slab_count_t slab_number;
4340 int result;
4342 if (pbn == VDO_ZERO_BLOCK)
4343 return NULL;
4345 result = get_slab_number(depot, pbn, &slab_number);
4346 if (result != VDO_SUCCESS) {
4347 vdo_enter_read_only_mode(depot->vdo, result);
4348 return NULL;
4349 }
4351 return depot->slabs[slab_number];
4355 * vdo_get_increment_limit() - Determine how many new references a block can acquire.
4356 * @depot: The slab depot.
4357 * @pbn: The physical block number that is being queried.
4359 * Context: This method must be called from the physical zone thread of the PBN.
4361 * Return: The number of available references.
4363 u8 vdo_get_increment_limit(struct slab_depot *depot, physical_block_number_t pbn)
4365 struct vdo_slab *slab = vdo_get_slab(depot, pbn);
4366 vdo_refcount_t *counter_ptr = NULL;
4367 int result;
4369 if ((slab == NULL) || (slab->status != VDO_SLAB_REBUILT))
4370 return 0;
4372 result = get_reference_counter(slab, pbn, &counter_ptr);
4373 if (result != VDO_SUCCESS)
4374 return 0;
4376 if (*counter_ptr == PROVISIONAL_REFERENCE_COUNT)
4377 return (MAXIMUM_REFERENCE_COUNT - 1);
4379 return (MAXIMUM_REFERENCE_COUNT - *counter_ptr);
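/*
 * To illustrate the logic above: a provisionally referenced block may gain
 * MAXIMUM_REFERENCE_COUNT - 1 references, a block whose stored counter is, say, 3 may gain
 * MAXIMUM_REFERENCE_COUNT - 3 more, and a block in a slab that has not been rebuilt (or an
 * out-of-range block) reports a limit of 0.
 */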
4383 * vdo_is_physical_data_block() - Determine whether the given PBN refers to a data block.
4384 * @depot: The depot.
4385 * @pbn: The physical block number to ask about.
4387 * Return: True if the PBN corresponds to a data block.
4389 bool vdo_is_physical_data_block(const struct slab_depot *depot,
4390 physical_block_number_t pbn)
4392 slab_count_t slab_number;
4393 slab_block_number sbn;
4395 return ((pbn == VDO_ZERO_BLOCK) ||
4396 ((get_slab_number(depot, pbn, &slab_number) == VDO_SUCCESS) &&
4397 (slab_block_number_from_pbn(depot->slabs[slab_number], pbn, &sbn) ==
4398 VDO_SUCCESS)));
4399 }
4402 * vdo_get_slab_depot_allocated_blocks() - Get the total number of data blocks allocated across all
4403 * the slabs in the depot.
4404 * @depot: The slab depot.
4406 * This is the total number of blocks with a non-zero reference count.
4408 * Context: This may be called from any thread.
4410 * Return: The total number of blocks with a non-zero reference count.
4412 block_count_t vdo_get_slab_depot_allocated_blocks(const struct slab_depot *depot)
4414 block_count_t total = 0;
4417 for (zone = 0; zone < depot->zone_count; zone++) {
4418 /* The allocators are responsible for thread safety. */
4419 total += READ_ONCE(depot->allocators[zone].allocated_blocks);
4426 * vdo_get_slab_depot_data_blocks() - Get the total number of data blocks in all the slabs in the
4427 * depot.
4428 * @depot: The slab depot.
4430 * Context: This may be called from any thread.
4432 * Return: The total number of data blocks in all slabs.
4434 block_count_t vdo_get_slab_depot_data_blocks(const struct slab_depot *depot)
4436 return (READ_ONCE(depot->slab_count) * depot->slab_config.data_blocks);
4440 * finish_combining_zones() - Clean up after saving out the combined slab summary.
4441 * @completion: The vio which was used to write the summary data.
4443 static void finish_combining_zones(struct vdo_completion *completion)
4445 int result = completion->result;
4446 struct vdo_completion *parent = completion->parent;
4448 free_vio(as_vio(uds_forget(completion)));
4449 vdo_fail_completion(parent, result);
4452 static void handle_combining_error(struct vdo_completion *completion)
4454 vio_record_metadata_io_error(as_vio(completion));
4455 finish_combining_zones(completion);
4458 static void write_summary_endio(struct bio *bio)
4460 struct vio *vio = bio->bi_private;
4461 struct vdo *vdo = vio->completion.vdo;
4463 continue_vio_after_io(vio, finish_combining_zones,
4464 vdo->thread_config.admin_thread);
4468 * combine_summaries() - Treating the current entries buffer as the on-disk value of all zones,
4469 * update every zone to the correct values for every slab.
4470 * @depot: The depot whose summary entries should be combined.
4472 static void combine_summaries(struct slab_depot *depot)
4474 /*
4475  * Combine all the old summary data into the portion of the buffer corresponding to the
4476  * first zone.
4477  */
4478 zone_count_t zone = 0;
4479 struct slab_summary_entry *entries = depot->summary_entries;
4481 if (depot->old_zone_count > 1) {
4482 slab_count_t entry_number;
4484 for (entry_number = 0; entry_number < MAX_VDO_SLABS; entry_number++) {
4485 if (zone != 0) {
4486 memcpy(entries + entry_number,
4487 entries + (zone * MAX_VDO_SLABS) + entry_number,
4488 sizeof(struct slab_summary_entry));
4489 }
4491 zone++;
4492 if (zone == depot->old_zone_count)
4493 zone = 0;
4497 /* Copy the combined data to each zone's region of the buffer. */
4498 for (zone = 1; zone < MAX_VDO_PHYSICAL_ZONES; zone++) {
4499 memcpy(entries + (zone * MAX_VDO_SLABS), entries,
4500 MAX_VDO_SLABS * sizeof(struct slab_summary_entry));
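/*
 * Walk-through, assuming a hypothetical previous layout with old_zone_count = 2: slabs are
 * assigned to zones round-robin, so the authoritative entry for slab 0 comes from zone 0's
 * region, the entry for slab 1 from zone 1's region (offset MAX_VDO_SLABS + 1), slab 2 from
 * zone 0 again, and so on. The combined result is then copied to every zone's region so the
 * summary reads correctly regardless of the zone count used on the next load.
 */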
4505 * finish_loading_summary() - Finish loading slab summary data.
4506 * @completion: The vio which was used to read the summary data.
4508 * Combines the slab summary data from all the previously written zones and copies the combined
4509 * summary to each partition's data region. Then writes the combined summary back out to disk. This
4510 * callback is registered in load_summary_endio().
4512 static void finish_loading_summary(struct vdo_completion *completion)
4514 struct slab_depot *depot = completion->vdo->depot;
4516 /* Combine the summary from each zone so each zone is correct for all slabs. */
4517 combine_summaries(depot);
4519 /* Write the combined summary back out. */
4520 vdo_submit_metadata_vio(as_vio(completion), depot->summary_origin,
4521 write_summary_endio, handle_combining_error,
4522 REQ_OP_WRITE);
4523 }
4525 static void load_summary_endio(struct bio *bio)
4527 struct vio *vio = bio->bi_private;
4528 struct vdo *vdo = vio->completion.vdo;
4530 continue_vio_after_io(vio, finish_loading_summary,
4531 vdo->thread_config.admin_thread);
4535 * load_slab_summary() - The preamble of a load operation.
4537 * Implements vdo_action_preamble_fn.
4539 static void load_slab_summary(void *context, struct vdo_completion *parent)
4540 {
4541 int result;
4542 struct vio *vio;
4543 struct slab_depot *depot = context;
4544 const struct admin_state_code *operation =
4545 vdo_get_current_manager_operation(depot->action_manager);
4547 result = create_multi_block_metadata_vio(depot->vdo, VIO_TYPE_SLAB_SUMMARY,
4548 VIO_PRIORITY_METADATA, parent,
4549 VDO_SLAB_SUMMARY_BLOCKS,
4550 (char *) depot->summary_entries, &vio);
4551 if (result != VDO_SUCCESS) {
4552 vdo_fail_completion(parent, result);
4556 if ((operation == VDO_ADMIN_STATE_FORMATTING) ||
4557 (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD)) {
4558 finish_loading_summary(&vio->completion);
4559 return;
4560 }
4562 vdo_submit_metadata_vio(vio, depot->summary_origin, load_summary_endio,
4563 handle_combining_error, REQ_OP_READ);
4566 /* Implements vdo_zone_action_fn. */
4567 static void load_allocator(void *context, zone_count_t zone_number,
4568 struct vdo_completion *parent)
4570 struct slab_depot *depot = context;
4572 vdo_start_loading(&depot->allocators[zone_number].state,
4573 vdo_get_current_manager_operation(depot->action_manager),
4574 parent, initiate_load);
4578 * vdo_load_slab_depot() - Asynchronously load any slab depot state that isn't included in the
4579 * super_block component.
4580 * @depot: The depot to load.
4581 * @operation: The type of load to perform.
4582 * @parent: The completion to notify when the load is complete.
4583 * @context: Additional context for the load operation; may be NULL.
4585 * This method may be called only before entering normal operation from the load thread.
4587 void vdo_load_slab_depot(struct slab_depot *depot,
4588 const struct admin_state_code *operation,
4589 struct vdo_completion *parent, void *context)
4591 if (!vdo_assert_load_operation(operation, parent))
4594 vdo_schedule_operation_with_context(depot->action_manager, operation,
4595 load_slab_summary, load_allocator,
4596 NULL, context, parent);
4599 /* Implements vdo_zone_action_fn. */
4600 static void prepare_to_allocate(void *context, zone_count_t zone_number,
4601 struct vdo_completion *parent)
4603 struct slab_depot *depot = context;
4604 struct block_allocator *allocator = &depot->allocators[zone_number];
4607 result = vdo_prepare_slabs_for_allocation(allocator);
4608 if (result != VDO_SUCCESS) {
4609 vdo_fail_completion(parent, result);
4613 scrub_slabs(allocator, parent);
4617 * vdo_prepare_slab_depot_to_allocate() - Prepare the slab depot to come online and start
4618 * allocating blocks.
4619 * @depot: The depot to prepare.
4620 * @load_type: The load type.
4621 * @parent: The completion to notify when the operation is complete.
4623 * This method may be called only before entering normal operation from the load thread. It must be
4624 * called before allocation may proceed.
4626 void vdo_prepare_slab_depot_to_allocate(struct slab_depot *depot,
4627 enum slab_depot_load_type load_type,
4628 struct vdo_completion *parent)
4630 depot->load_type = load_type;
4631 atomic_set(&depot->zones_to_scrub, depot->zone_count);
4632 vdo_schedule_action(depot->action_manager, NULL,
4633 prepare_to_allocate, NULL, parent);
4637 * vdo_update_slab_depot_size() - Update the slab depot to reflect its new size in memory.
4638 * @depot: The depot to update.
4640 * This size is saved to disk as part of the super block.
4642 void vdo_update_slab_depot_size(struct slab_depot *depot)
4644 depot->last_block = depot->new_last_block;
4648 * vdo_prepare_to_grow_slab_depot() - Allocate new memory needed for a resize of a slab depot to
4649 * the given size.
4650 * @depot: The depot to prepare to resize.
4651 * @partition: The new depot partition
4653 * Return: VDO_SUCCESS or an error.
4655 int vdo_prepare_to_grow_slab_depot(struct slab_depot *depot,
4656 const struct partition *partition)
4658 struct slab_depot_state_2_0 new_state;
4660 slab_count_t new_slab_count;
4662 if ((partition->count >> depot->slab_size_shift) <= depot->slab_count)
4663 return VDO_INCREMENT_TOO_SMALL;
4665 /* Generate the depot configuration for the new block count. */
4666 ASSERT_LOG_ONLY(depot->first_block == partition->offset,
4667 "New slab depot partition doesn't change origin");
4668 result = vdo_configure_slab_depot(partition, depot->slab_config,
4669 depot->zone_count, &new_state);
4670 if (result != VDO_SUCCESS)
4673 new_slab_count = vdo_compute_slab_count(depot->first_block,
4674 new_state.last_block,
4675 depot->slab_size_shift);
4676 if (new_slab_count <= depot->slab_count)
4677 return uds_log_error_strerror(VDO_INCREMENT_TOO_SMALL,
4678 "Depot can only grow");
4679 if (new_slab_count == depot->new_slab_count) {
4680 /* All the new slabs were already allocated by an earlier prepare; nothing more to do. */
4681 return VDO_SUCCESS;
4682 }
4684 vdo_abandon_new_slabs(depot);
4685 result = allocate_slabs(depot, new_slab_count);
4686 if (result != VDO_SUCCESS) {
4687 vdo_abandon_new_slabs(depot);
4691 depot->new_size = partition->count;
4692 depot->old_last_block = depot->last_block;
4693 depot->new_last_block = new_state.last_block;
4699 * finish_registration() - Finish registering new slabs now that all of the allocators have
4700 * received their new slabs.
4702 * Implements vdo_action_conclusion_fn.
4704 static int finish_registration(void *context)
4706 struct slab_depot *depot = context;
4708 WRITE_ONCE(depot->slab_count, depot->new_slab_count);
4709 uds_free(depot->slabs);
4710 depot->slabs = depot->new_slabs;
4711 depot->new_slabs = NULL;
4712 depot->new_slab_count = 0;
4716 /* Implements vdo_zone_action_fn. */
4717 static void register_new_slabs(void *context, zone_count_t zone_number,
4718 struct vdo_completion *parent)
4720 struct slab_depot *depot = context;
4721 struct block_allocator *allocator = &depot->allocators[zone_number];
4724 for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4725 struct vdo_slab *slab = depot->new_slabs[i];
4727 if (slab->allocator == allocator)
4728 register_slab_with_allocator(allocator, slab);
4731 vdo_finish_completion(parent);
4735 * vdo_use_new_slabs() - Use the new slabs allocated for resize.
4736 * @depot: The depot.
4737 * @parent: The object to notify when complete.
4739 void vdo_use_new_slabs(struct slab_depot *depot, struct vdo_completion *parent)
4741 ASSERT_LOG_ONLY(depot->new_slabs != NULL, "Must have new slabs to use");
4742 vdo_schedule_operation(depot->action_manager,
4743 VDO_ADMIN_STATE_SUSPENDED_OPERATION,
4744 NULL, register_new_slabs,
4745 finish_registration, parent);
4749 * stop_scrubbing() - Tell the scrubber to stop scrubbing after it finishes the slab it is
4750 * currently working on.
4751 * @allocator: The allocator owning the scrubber to stop; its completion is notified when
4752 * scrubbing has stopped.
4754 static void stop_scrubbing(struct block_allocator *allocator)
4756 struct slab_scrubber *scrubber = &allocator->scrubber;
4758 if (vdo_is_state_quiescent(&scrubber->admin_state)) {
4759 vdo_finish_completion(&allocator->completion);
4760 } else {
4761 vdo_start_draining(&scrubber->admin_state,
4762 VDO_ADMIN_STATE_SUSPENDING,
4763 &allocator->completion, NULL);
4767 /* Implements vdo_admin_initiator_fn. */
4768 static void initiate_summary_drain(struct admin_state *state)
4770 check_summary_drain_complete(container_of(state, struct block_allocator,
4771 summary_state));
4772 }
4774 static void do_drain_step(struct vdo_completion *completion)
4776 struct block_allocator *allocator = vdo_as_block_allocator(completion);
4778 vdo_prepare_completion_for_requeue(&allocator->completion, do_drain_step,
4779 handle_operation_error, allocator->thread_id,
4780 NULL);
4781 switch (++allocator->drain_step) {
4782 case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
4783 stop_scrubbing(allocator);
4786 case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
4787 apply_to_slabs(allocator, do_drain_step);
4790 case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
4791 vdo_start_draining(&allocator->summary_state,
4792 vdo_get_admin_state_code(&allocator->state),
4793 completion, initiate_summary_drain);
4796 case VDO_DRAIN_ALLOCATOR_STEP_FINISHED:
4797 ASSERT_LOG_ONLY(!is_vio_pool_busy(allocator->vio_pool),
4798 "vio pool not busy");
4799 vdo_finish_draining_with_result(&allocator->state, completion->result);
4800 return;
4802 default:
4803 vdo_finish_draining_with_result(&allocator->state, UDS_BAD_STATE);
4804 }
4805 }
4807 /* Implements vdo_admin_initiator_fn. */
4808 static void initiate_drain(struct admin_state *state)
4810 struct block_allocator *allocator =
4811 container_of(state, struct block_allocator, state);
4813 allocator->drain_step = VDO_DRAIN_ALLOCATOR_START;
4814 do_drain_step(&allocator->completion);
4818 * Drain all allocator I/O. Depending upon the type of drain, some or all dirty metadata may be
4819 * written to disk. The type of drain will be determined from the state of the allocator's depot.
4821 * Implements vdo_zone_action_fn.
4823 static void drain_allocator(void *context, zone_count_t zone_number,
4824 struct vdo_completion *parent)
4826 struct slab_depot *depot = context;
4828 vdo_start_draining(&depot->allocators[zone_number].state,
4829 vdo_get_current_manager_operation(depot->action_manager),
4830 parent, initiate_drain);
4834 * vdo_drain_slab_depot() - Drain all slab depot I/O.
4835 * @depot: The depot to drain.
4836 * @operation: The drain operation (flush, rebuild, suspend, or save).
4837 * @parent: The completion to finish when the drain is complete.
4839 * If saving, or flushing, all dirty depot metadata will be written out. If saving or suspending,
4840 * the depot will be left in a suspended state.
4842 void vdo_drain_slab_depot(struct slab_depot *depot,
4843 const struct admin_state_code *operation,
4844 struct vdo_completion *parent)
4846 vdo_schedule_operation(depot->action_manager, operation,
4847 NULL, drain_allocator, NULL, parent);
4851 * resume_scrubbing() - Tell the scrubber to resume scrubbing if it has been stopped.
4852 * @allocator: The allocator being resumed.
4854 static void resume_scrubbing(struct block_allocator *allocator)
4857 struct slab_scrubber *scrubber = &allocator->scrubber;
4859 if (!has_slabs_to_scrub(scrubber)) {
4860 vdo_finish_completion(&allocator->completion);
4864 result = vdo_resume_if_quiescent(&scrubber->admin_state);
4865 if (result != VDO_SUCCESS) {
4866 vdo_fail_completion(&allocator->completion, result);
4870 scrub_next_slab(scrubber);
4871 vdo_finish_completion(&allocator->completion);
4874 static void do_resume_step(struct vdo_completion *completion)
4876 struct block_allocator *allocator = vdo_as_block_allocator(completion);
4878 vdo_prepare_completion_for_requeue(&allocator->completion, do_resume_step,
4879 handle_operation_error,
4880 allocator->thread_id, NULL);
4881 switch (--allocator->drain_step) {
4882 case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
4883 vdo_fail_completion(completion,
4884 vdo_resume_if_quiescent(&allocator->summary_state));
4887 case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
4888 apply_to_slabs(allocator, do_resume_step);
4891 case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
4892 resume_scrubbing(allocator);
4895 case VDO_DRAIN_ALLOCATOR_START:
4896 vdo_finish_resuming_with_result(&allocator->state, completion->result);
4897 return;
4899 default:
4900 vdo_finish_resuming_with_result(&allocator->state, UDS_BAD_STATE);
4901 }
4902 }
4904 /* Implements vdo_admin_initiator_fn. */
4905 static void initiate_resume(struct admin_state *state)
4907 struct block_allocator *allocator =
4908 container_of(state, struct block_allocator, state);
4910 allocator->drain_step = VDO_DRAIN_ALLOCATOR_STEP_FINISHED;
4911 do_resume_step(&allocator->completion);
4914 /* Implements vdo_zone_action_fn. */
4915 static void resume_allocator(void *context, zone_count_t zone_number,
4916 struct vdo_completion *parent)
4918 struct slab_depot *depot = context;
4920 vdo_start_resuming(&depot->allocators[zone_number].state,
4921 vdo_get_current_manager_operation(depot->action_manager),
4922 parent, initiate_resume);
4926 * vdo_resume_slab_depot() - Resume a suspended slab depot.
4927 * @depot: The depot to resume.
4928 * @parent: The completion to finish when the depot has resumed.
4930 void vdo_resume_slab_depot(struct slab_depot *depot, struct vdo_completion *parent)
4932 if (vdo_is_read_only(depot->vdo)) {
4933 vdo_continue_completion(parent, VDO_READ_ONLY);
4937 vdo_schedule_operation(depot->action_manager, VDO_ADMIN_STATE_RESUMING,
4938 NULL, resume_allocator, NULL, parent);
4942 * vdo_commit_oldest_slab_journal_tail_blocks() - Commit all dirty tail blocks which are locking a
4943 * given recovery journal block.
4944 * @depot: The depot.
4945 * @recovery_block_number: The sequence number of the recovery journal block whose locks should be
4946 * released.
4948 * Context: This method must be called from the journal zone thread.
4950 void vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot *depot,
4951 sequence_number_t recovery_block_number)
4956 depot->new_release_request = recovery_block_number;
4957 vdo_schedule_default_action(depot->action_manager);
4960 /* Implements vdo_zone_action_fn. */
4961 static void scrub_all_unrecovered_slabs(void *context, zone_count_t zone_number,
4962 struct vdo_completion *parent)
4964 struct slab_depot *depot = context;
4966 scrub_slabs(&depot->allocators[zone_number], NULL);
4967 vdo_launch_completion(parent);
4971 * vdo_scrub_all_unrecovered_slabs() - Scrub all unrecovered slabs.
4972 * @depot: The depot to scrub.
4973 * @parent: The object to notify when scrubbing has been launched for all zones.
4975 void vdo_scrub_all_unrecovered_slabs(struct slab_depot *depot,
4976 struct vdo_completion *parent)
4978 vdo_schedule_action(depot->action_manager, NULL,
4979 scrub_all_unrecovered_slabs,
4980 NULL, parent);
4981 }
4984 * get_block_allocator_statistics() - Get the total of the statistics from all the block allocators
4985 * in the depot.
4986 * @depot: The slab depot.
4988 * Return: The statistics from all block allocators in the depot.
4990 static struct block_allocator_statistics __must_check
4991 get_block_allocator_statistics(const struct slab_depot *depot)
4993 struct block_allocator_statistics totals;
4996 memset(&totals, 0, sizeof(totals));
4998 for (zone = 0; zone < depot->zone_count; zone++) {
4999 const struct block_allocator *allocator = &depot->allocators[zone];
5000 const struct block_allocator_statistics *stats = &allocator->statistics;
5002 totals.slab_count += allocator->slab_count;
5003 totals.slabs_opened += READ_ONCE(stats->slabs_opened);
5004 totals.slabs_reopened += READ_ONCE(stats->slabs_reopened);
5011 * get_ref_counts_statistics() - Get the cumulative ref_counts statistics for the depot.
5012 * @depot: The slab depot.
5014 * Return: The cumulative statistics for all ref_counts in the depot.
5016 static struct ref_counts_statistics __must_check
5017 get_ref_counts_statistics(const struct slab_depot *depot)
5019 struct ref_counts_statistics totals;
5022 memset(&totals, 0, sizeof(totals));
5024 for (zone = 0; zone < depot->zone_count; zone++) {
5025 totals.blocks_written +=
5026 READ_ONCE(depot->allocators[zone].ref_counts_statistics.blocks_written);
5033 * get_depot_slab_journal_statistics() - Get the aggregated slab journal statistics for the depot.
5034 * @depot: The slab depot.
5036 * Return: The aggregated statistics for all slab journals in the depot.
5038 static struct slab_journal_statistics __must_check
5039 get_slab_journal_statistics(const struct slab_depot *depot)
5041 struct slab_journal_statistics totals;
5044 memset(&totals, 0, sizeof(totals));
5046 for (zone = 0; zone < depot->zone_count; zone++) {
5047 const struct slab_journal_statistics *stats =
5048 &depot->allocators[zone].slab_journal_statistics;
5050 totals.disk_full_count += READ_ONCE(stats->disk_full_count);
5051 totals.flush_count += READ_ONCE(stats->flush_count);
5052 totals.blocked_count += READ_ONCE(stats->blocked_count);
5053 totals.blocks_written += READ_ONCE(stats->blocks_written);
5054 totals.tail_busy_count += READ_ONCE(stats->tail_busy_count);
5061 * vdo_get_slab_depot_statistics() - Get all the vdo_statistics fields that are properties of the
5062 * slab depot.
5063 * @depot: The slab depot.
5064 * @stats: The vdo statistics structure to partially fill.
5066 void vdo_get_slab_depot_statistics(const struct slab_depot *depot,
5067 struct vdo_statistics *stats)
5069 slab_count_t slab_count = READ_ONCE(depot->slab_count);
5070 slab_count_t unrecovered = 0;
5073 for (zone = 0; zone < depot->zone_count; zone++) {
5074 /* The allocators are responsible for thread safety. */
5075 unrecovered += READ_ONCE(depot->allocators[zone].scrubber.slab_count);
5078 stats->recovery_percentage = (slab_count - unrecovered) * 100 / slab_count;
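/* For example, if 3 of 100 slabs are still awaiting scrubbing, this reports 97%. */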
5079 stats->allocator = get_block_allocator_statistics(depot);
5080 stats->ref_counts = get_ref_counts_statistics(depot);
5081 stats->slab_journal = get_slab_journal_statistics(depot);
5082 stats->slab_summary = (struct slab_summary_statistics) {
5083 .blocks_written = atomic64_read(&depot->summary_statistics.blocks_written),
5088 * vdo_dump_slab_depot() - Dump the slab depot, in a thread-unsafe fashion.
5089 * @depot: The slab depot.
5091 void vdo_dump_slab_depot(const struct slab_depot *depot)
5093 uds_log_info("vdo slab depot");
5094 uds_log_info(" zone_count=%u old_zone_count=%u slabCount=%u active_release_request=%llu new_release_request=%llu",
5095 (unsigned int) depot->zone_count,
5096 (unsigned int) depot->old_zone_count, READ_ONCE(depot->slab_count),
5097 (unsigned long long) depot->active_release_request,
5098 (unsigned long long) depot->new_release_request);