Revert 'Compact staging sstables'

This patch reverts the following patches merged in
78750c2e1a "Merge 'Compact staging sstables' from Benny Halevy"

> 597e415c38 "table: clone staging sstables into table dir"
> ce5bd505dc "view_update_generator: discover_staging_sstables: reindent"
> 59874b2837 "table: add get_staging_sstables"
> 7536dd7f00 "distributed_loader: populate table directory first"

The feature causes regressions seen with e.g.
https://jenkins.scylladb.com/view/master/job/scylla-master/job/dtest-daily-release/41/testReport/materialized_views_test/TestMaterializedViews/Run_Dtest_Parallel_Cloud_Machines___FullDtest___full_split011___test_base_replica_repair/
```
AssertionError: Expected [[0, 0, 'a', 3.0]] from SELECT * FROM t_by_v WHERE v = 0, but got []
```

Where views aren't updated properly.
Apparently since `table::stream_view_replica_updates`
doesn't exclude the staging sstables anymore and
since they are cloned to the base table as new sstables
it seems to the view builder that no view updates are
required since there's no changes comparing to the base table.

Reopens #9559

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes #10890
This commit is contained in:
Benny Halevy
2022-06-27 08:13:33 +03:00
committed by Botond Dénes
parent 8bccd5e9c5
commit 81fa1ce9a1
6 changed files with 22 additions and 65 deletions

View File

@@ -113,7 +113,7 @@ future<> view_update_generator::start() {
auto& [t, sstables] = *it;
try {
inject_failure("view_update_generator_move_staging_sstable");
t->remove_sstables_from_staging(sstables).get();
t->move_sstables_from_staging(sstables).get();
} catch (...) {
// Move from staging will be retried upon restart.
vug_logger.warn("Moving some sstable from staging failed: {}. Ignoring...", std::current_exception());
@@ -173,13 +173,13 @@ void view_update_generator::setup_metrics() {
void view_update_generator::discover_staging_sstables() {
for (auto& x : _db.get_column_families()) {
auto t = x.second->shared_from_this();
const auto& sstables = t->get_staging_sstables();
_sstables_with_tables[t].reserve(_sstables_with_tables[t].size() + sstables.size());
for (auto& sst : sstables | boost::adaptors::map_values) {
_sstables_with_tables[t].push_back(sst);
// we're at early stage here, no need to kick _pending_sstables (the
// bulding fiber is not running), neither we can wait on the semaphore
_registration_sem.consume(1);
for (auto sstables = t->get_sstables(); sstables::shared_sstable sst : *sstables) {
if (sst->requires_view_building()) {
_sstables_with_tables[t].push_back(std::move(sst));
// we're at early stage here, no need to kick _pending_sstables (the
// bulding fiber is not running), neither we can wait on the semaphore
_registration_sem.consume(1);
}
}
}
}

View File

@@ -510,7 +510,7 @@ public:
future<> add_sstable_and_update_cache(sstables::shared_sstable sst,
sstables::offstrategy offstrategy = sstables::offstrategy::no);
future<> add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts);
future<> remove_sstables_from_staging(std::vector<sstables::shared_sstable>);
future<> move_sstables_from_staging(std::vector<sstables::shared_sstable>);
sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f,
io_error_handler_gen error_handler_gen);
sstables::shared_sstable make_sstable(sstring dir, sstables::generation_type generation, sstables::sstable_version_types v, sstables::sstable_format_types f);
@@ -909,9 +909,6 @@ public:
size_t sstables_count() const;
std::vector<uint64_t> sstable_count_per_level() const;
int64_t get_unleveled_sstables() const;
const auto& get_staging_sstables() const {
return _sstables_staging;
}
void start_compaction();
void trigger_compaction();

View File

@@ -546,12 +546,9 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
try {
co_await ks.make_directory_for_column_family(cfname, uuid);
// Populate the table base directory first so we can clone
// staging sstables into it later when populating the table
// from the staging_dir.
co_await distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes);
co_await distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname, allow_offstrategy_compaction::no);
co_await distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, allow_offstrategy_compaction::no, must_exist::no);
co_await distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes);
} catch (...) {
std::exception_ptr eptr = std::current_exception();
std::string msg =

View File

@@ -392,13 +392,12 @@ table::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::s
if (belongs_to_other_shard(sstable->get_shards_for_this_sstable())) {
on_internal_error(tlogger, format("Attempted to load the shared SSTable {} at table", sstable->get_filename()));
}
if (sstable->requires_view_building()) {
on_internal_error(tlogger, format("Attempted to load staging SSTable {} at table", sstable->get_filename()));
}
// allow in-progress reads to continue using old list
auto new_sstables = make_lw_shared<sstables::sstable_set>(*sstables);
new_sstables->insert(sstable);
if (backlog_tracker) {
if (sstable->requires_view_building()) {
_sstables_staging.emplace(sstable->generation(), sstable);
} else if (backlog_tracker) {
add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sstable);
}
// update sstable set last in case either updating
@@ -439,28 +438,6 @@ void table::enable_off_strategy_trigger() {
future<>
table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy) {
if (sst->requires_view_building()) {
auto [it, inserted] = _sstables_staging.emplace(sst->generation(), sst);
if (!inserted) {
on_internal_error(tlogger, format("could not add staging sstable: generation {} already exists", sst->generation()));
}
// clone staging sstables so their content may be compacted while
// views are built. When done, the hard-linked copy in the staging
// subsirectory will be simply unlinked.
//
// Note that after restart, we don't know whether we already cloned
// the staging sstable or we might have restarted right after sealing it
// and before cloning here, so we might be resurrecting an sstable
// in the base directory in this rare corner case.
try {
sst = co_await sst->clone_at(dir(), calculate_generation_for_new_table());
} catch (...) {
_sstables_staging.erase(it);
throw;
}
}
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
co_return co_await get_row_cache().invalidate(row_cache::external_updater([this, sst, offstrategy] () noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
@@ -2295,18 +2272,21 @@ table::make_reader_v2_excluding_sstables(schema_ptr s,
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
}
future<> table::remove_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {
future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {
auto units = co_await get_units(_sstable_deletion_sem, 1);
std::set<sstring> dirs_to_sync;
auto dirs_to_sync = std::set<sstring>({dir()});
auto main_sstables = _main_sstables->all();
for (auto sst : sstables) {
dirs_to_sync.emplace(sst->get_dir());
tlogger.debug("Removing sstable {} from staging", sst->get_filename());
try {
co_await sst->unlink();
co_await sst->move_to_new_dir(dir(), sst->generation(), false);
_sstables_staging.erase(sst->generation());
// Maintenance SSTables being moved from staging shouldn't be added to tracker because they're off-strategy
if (main_sstables->contains(sst)) {
add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sst);
}
} catch (...) {
tlogger.warn("Failed to remove sstable {} from staging: {}", sst->get_filename(), std::current_exception());
tlogger.warn("Failed to move sstable {} from staging: {}", sst->get_filename(), std::current_exception());
throw;
}
}

View File

@@ -2143,17 +2143,6 @@ future<> sstable::move_to_quarantine(bool do_sync_dirs) {
co_await move_to_new_dir(std::move(new_dir), generation(), do_sync_dirs);
}
future<shared_sstable> sstable::clone_at(const sstring& new_dir, std::optional<generation_type> opt_gen) {
if (fs::canonical(fs::path(new_dir)) == fs::canonical(fs::path(_dir))) {
on_internal_error(sstlog, format("Cannot clone sstable {} into same dir", get_filename()));
}
auto gen = opt_gen.value_or(_generation);
co_await create_links(new_dir, gen);
auto cloned_sst = _manager.make_sstable(_schema, new_dir, gen, _version, _format);
co_await cloned_sst->load(co_await get_open_info());
co_return cloned_sst;
}
flat_mutation_reader_v2
sstable::make_reader(
schema_ptr schema,

View File

@@ -201,12 +201,6 @@ public:
// will move it into a quarantine_dir subdirectory of its current directory.
future<> move_to_quarantine(bool do_sync_dirs = true);
// Clone the sstable at a new directory.
// hardlink all sstable components in the new dir
// with the same generation and return a new, shared
// sstable object for the clone.
future<shared_sstable> clone_at(const sstring& new_dir, std::optional<generation_type> opt_gen);
generation_type generation() const {
return _generation;
}