diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index b27e2b2ceb..303cc742b5 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -113,7 +113,7 @@ future<> view_update_generator::start() { auto& [t, sstables] = *it; try { inject_failure("view_update_generator_move_staging_sstable"); - t->move_sstables_from_staging(sstables).get(); + t->remove_sstables_from_staging(sstables).get(); } catch (...) { // Move from staging will be retried upon restart. vug_logger.warn("Moving some sstable from staging failed: {}. Ignoring...", std::current_exception()); diff --git a/replica/database.hh b/replica/database.hh index 2cab6c8633..fb59a196a9 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -502,7 +502,7 @@ public: future<> add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy = sstables::offstrategy::no); future<> add_sstables_and_update_cache(const std::vector& ssts); - future<> move_sstables_from_staging(std::vector); + future<> remove_sstables_from_staging(std::vector); sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f, io_error_handler_gen error_handler_gen); sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f); diff --git a/replica/table.cc b/replica/table.cc index a80079be02..a7ee5281b8 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -392,12 +392,13 @@ table::do_add_sstable(lw_shared_ptr sstables, sstables::s if (belongs_to_other_shard(sstable->get_shards_for_this_sstable())) { on_internal_error(tlogger, format("Attempted to load the shared SSTable {} at table", sstable->get_filename())); } + if (sstable->requires_view_building()) { + on_internal_error(tlogger, format("Attempted to load staging SSTable {} at table", 
sstable->get_filename())); + } // allow in-progress reads to continue using old list auto new_sstables = make_lw_shared(*sstables); new_sstables->insert(sstable); - if (sstable->requires_view_building()) { - _sstables_staging.emplace(sstable->generation(), sstable); - } else if (backlog_tracker) { + if (backlog_tracker) { add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sstable); } // update sstable set last in case either updating @@ -438,6 +439,28 @@ void table::enable_off_strategy_trigger() { future<> table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) { + if (sst->requires_view_building()) { + auto [it, inserted] = _sstables_staging.emplace(sst->generation(), sst); + if (!inserted) { + on_internal_error(tlogger, format("could not add staging sstable: generation {} already exists", sst->generation())); + } + + // clone staging sstables so their content may be compacted while + // views are built. When done, the hard-linked copy in the staging + // subdirectory will be simply unlinked. + // + // Note that after restart, we don't know whether we already cloned + // the staging sstable or we might have restarted right after sealing it + // and before cloning here, so we might be resurrecting an sstable + // in the base directory in this rare corner case. + try { + sst = co_await sst->clone_at(dir(), calculate_generation_for_new_table()); + } catch (...) { + _sstables_staging.erase(it); + throw; + } + } + auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1); co_return co_await get_row_cache().invalidate(row_cache::external_updater([this, sst, offstrategy] () noexcept { // FIXME: this is not really noexcept, but we need to provide strong exception guarantees. 
@@ -2276,21 +2299,18 @@ table::make_reader_v2_excluding_sstables(schema_ptr s, return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr); } -future<> table::move_sstables_from_staging(std::vector sstables) { +future<> table::remove_sstables_from_staging(std::vector sstables) { auto units = co_await get_units(_sstable_deletion_sem, 1); - auto dirs_to_sync = std::set({dir()}); - auto main_sstables = _main_sstables->all(); + std::set dirs_to_sync; + for (auto sst : sstables) { dirs_to_sync.emplace(sst->get_dir()); + tlogger.debug("Removing sstable {} from staging", sst->get_filename()); try { - co_await sst->move_to_new_dir(dir(), sst->generation(), false); + co_await sst->unlink(); _sstables_staging.erase(sst->generation()); - // Maintenance SSTables being moved from staging shouldn't be added to tracker because they're off-strategy - if (main_sstables->contains(sst)) { - add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sst); - } } catch (...) 
{ - tlogger.warn("Failed to move sstable {} from staging: {}", sst->get_filename(), std::current_exception()); + tlogger.warn("Failed to remove sstable {} from staging: {}", sst->get_filename(), std::current_exception()); throw; } } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 755067e68f..eecc8a2350 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2143,6 +2143,17 @@ future<> sstable::move_to_quarantine(bool do_sync_dirs) { co_await move_to_new_dir(std::move(new_dir), generation(), do_sync_dirs); } +future sstable::clone_at(const sstring& new_dir, std::optional opt_gen) { + if (fs::canonical(fs::path(new_dir)) == fs::canonical(fs::path(_dir))) { + on_internal_error(sstlog, format("Cannot clone sstable {} into same dir", get_filename())); + } + auto gen = opt_gen.value_or(_generation); + co_await create_links(new_dir, gen); + auto cloned_sst = _manager.make_sstable(_schema, new_dir, gen, _version, _format); + co_await cloned_sst->load(co_await get_open_info()); + co_return cloned_sst; +} + flat_mutation_reader_v2 sstable::make_reader( schema_ptr schema, diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 82b789c98c..1b19d94ac1 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -201,6 +201,12 @@ public: // will move it into a quarantine_dir subdirectory of its current directory. future<> move_to_quarantine(bool do_sync_dirs = true); + // Clone the sstable at a new directory. + // Hard-links all sstable components into the new dir under opt_gen + // (or this sstable's own generation when opt_gen is empty) and returns a new, shared + // sstable object for the clone. + future clone_at(const sstring& new_dir, std::optional opt_gen); + generation_type generation() const { return _generation; }