diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 303cc742b5..07bece8808 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -113,7 +113,7 @@ future<> view_update_generator::start() { auto& [t, sstables] = *it; try { inject_failure("view_update_generator_move_staging_sstable"); - t->remove_sstables_from_staging(sstables).get(); + t->move_sstables_from_staging(sstables).get(); } catch (...) { // Move from staging will be retried upon restart. vug_logger.warn("Moving some sstable from staging failed: {}. Ignoring...", std::current_exception()); @@ -173,13 +173,13 @@ void view_update_generator::setup_metrics() { void view_update_generator::discover_staging_sstables() { for (auto& x : _db.get_column_families()) { auto t = x.second->shared_from_this(); - const auto& sstables = t->get_staging_sstables(); - _sstables_with_tables[t].reserve(_sstables_with_tables[t].size() + sstables.size()); - for (auto& sst : sstables | boost::adaptors::map_values) { - _sstables_with_tables[t].push_back(sst); - // we're at early stage here, no need to kick _pending_sstables (the - // bulding fiber is not running), neither we can wait on the semaphore - _registration_sem.consume(1); + for (auto sstables = t->get_sstables(); sstables::shared_sstable sst : *sstables) { + if (sst->requires_view_building()) { + _sstables_with_tables[t].push_back(std::move(sst)); + // we're at early stage here, no need to kick _pending_sstables (the + // building fiber is not running), neither we can wait on the semaphore + _registration_sem.consume(1); + } } } } diff --git a/replica/database.hh b/replica/database.hh index e1c0e88116..f642ee2c61 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -510,7 +510,7 @@ public: future<> add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy = sstables::offstrategy::no); future<> add_sstables_and_update_cache(const std::vector& ssts); - future<> 
remove_sstables_from_staging(std::vector); + future<> move_sstables_from_staging(std::vector); sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f, io_error_handler_gen error_handler_gen); sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f); @@ -909,9 +909,6 @@ public: size_t sstables_count() const; std::vector sstable_count_per_level() const; int64_t get_unleveled_sstables() const; - const auto& get_staging_sstables() const { - return _sstables_staging; - } void start_compaction(); void trigger_compaction(); diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 3aefc0c61f..a5c0c47e7d 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -546,12 +546,9 @@ future<> distributed_loader::populate_keyspace(distributed& d try { co_await ks.make_directory_for_column_family(cfname, uuid); - // Populate the table base directory first so we can clone - // staging sstables into it later when populating the table - // from the staging_dir. - co_await distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes); co_await distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname, allow_offstrategy_compaction::no); co_await distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, allow_offstrategy_compaction::no, must_exist::no); + co_await distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes); } catch (...) 
{ std::exception_ptr eptr = std::current_exception(); std::string msg = diff --git a/replica/table.cc b/replica/table.cc index df70d2f607..7c6622e065 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -392,13 +392,12 @@ table::do_add_sstable(lw_shared_ptr sstables, sstables::s if (belongs_to_other_shard(sstable->get_shards_for_this_sstable())) { on_internal_error(tlogger, format("Attempted to load the shared SSTable {} at table", sstable->get_filename())); } - if (sstable->requires_view_building()) { - on_internal_error(tlogger, format("Attempted to load staging SSTable {} at table", sstable->get_filename())); - } // allow in-progress reads to continue using old list auto new_sstables = make_lw_shared(*sstables); new_sstables->insert(sstable); - if (backlog_tracker) { + if (sstable->requires_view_building()) { + _sstables_staging.emplace(sstable->generation(), sstable); + } else if (backlog_tracker) { add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sstable); } // update sstable set last in case either updating @@ -439,28 +438,6 @@ void table::enable_off_strategy_trigger() { future<> table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) { - if (sst->requires_view_building()) { - auto [it, inserted] = _sstables_staging.emplace(sst->generation(), sst); - if (!inserted) { - on_internal_error(tlogger, format("could not add staging sstable: generation {} already exists", sst->generation())); - } - - // clone staging sstables so their content may be compacted while - // views are built. When done, the hard-linked copy in the staging - // subsirectory will be simply unlinked. - // - // Note that after restart, we don't know whether we already cloned - // the staging sstable or we might have restarted right after sealing it - // and before cloning here, so we might be resurrecting an sstable - // in the base directory in this rare corner case. 
- try { - sst = co_await sst->clone_at(dir(), calculate_generation_for_new_table()); - } catch (...) { - _sstables_staging.erase(it); - throw; - } - } - auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1); co_return co_await get_row_cache().invalidate(row_cache::external_updater([this, sst, offstrategy] () noexcept { // FIXME: this is not really noexcept, but we need to provide strong exception guarantees. @@ -2295,18 +2272,21 @@ table::make_reader_v2_excluding_sstables(schema_ptr s, return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr); } -future<> table::remove_sstables_from_staging(std::vector sstables) { +future<> table::move_sstables_from_staging(std::vector sstables) { auto units = co_await get_units(_sstable_deletion_sem, 1); - std::set dirs_to_sync; - + auto dirs_to_sync = std::set({dir()}); + auto main_sstables = _main_sstables->all(); for (auto sst : sstables) { dirs_to_sync.emplace(sst->get_dir()); - tlogger.debug("Removing sstable {} from staging", sst->get_filename()); try { - co_await sst->unlink(); + co_await sst->move_to_new_dir(dir(), sst->generation(), false); _sstables_staging.erase(sst->generation()); + // Maintenance SSTables being moved from staging shouldn't be added to tracker because they're off-strategy + if (main_sstables->contains(sst)) { + add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sst); + } } catch (...) 
{ - tlogger.warn("Failed to remove sstable {} from staging: {}", sst->get_filename(), std::current_exception()); + tlogger.warn("Failed to move sstable {} from staging: {}", sst->get_filename(), std::current_exception()); throw; } } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index eecc8a2350..755067e68f 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2143,17 +2143,6 @@ future<> sstable::move_to_quarantine(bool do_sync_dirs) { co_await move_to_new_dir(std::move(new_dir), generation(), do_sync_dirs); } -future sstable::clone_at(const sstring& new_dir, std::optional opt_gen) { - if (fs::canonical(fs::path(new_dir)) == fs::canonical(fs::path(_dir))) { - on_internal_error(sstlog, format("Cannot clone sstable {} into same dir", get_filename())); - } - auto gen = opt_gen.value_or(_generation); - co_await create_links(new_dir, gen); - auto cloned_sst = _manager.make_sstable(_schema, new_dir, gen, _version, _format); - co_await cloned_sst->load(co_await get_open_info()); - co_return cloned_sst; -} - flat_mutation_reader_v2 sstable::make_reader( schema_ptr schema, diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 1b19d94ac1..82b789c98c 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -201,12 +201,6 @@ public: // will move it into a quarantine_dir subdirectory of its current directory. future<> move_to_quarantine(bool do_sync_dirs = true); - // Clone the sstable at a new directory. - // hardlink all sstable components in the new dir - // with the same generation and return a new, shared - // sstable object for the clone. - future clone_at(const sstring& new_dir, std::optional opt_gen); - generation_type generation() const { return _generation; }