table: Don't remove a SSTable from the backlog tracker if not previously added

After 7f1a215, a sstable is only added to backlog tracker if
sstable::shared() returns false.

sstable::shared() can return true for a sstable that is actually owned
by more than one shard, but it can also incorrectly return true for
a sstable which wasn't made explicitly unshared through set_unshared().
Recent work of mine is getting rid of set_unshared(), because a
sstable has enough knowledge to determine whether or not it's shared.

The problem starts with a streaming sstable that never had set_unshared()
called on it: it won't be added to the backlog tracker, but it can
eventually be removed from the tracker when that sstable is compacted.
Also, it could happen that a shared sstable, which was resharded, will
be removed from the tracker even though it wasn't previously added.

When those problems happen, the backlog tracker will have an incorrect
account of total bytes, which leads it to produce incorrect
backlogs that can potentially go negative.

These problems are fixed by making every add / removal go through
functions which take into account sstable::shared().

Fixes #6227.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20200512220226.134481-2-raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2020-05-12 19:02:26 -03:00
committed by Avi Kivity
parent fb6976f1b9
commit 077b4ee97d
2 changed files with 13 additions and 4 deletions

View File

@@ -587,6 +587,7 @@ private:
// Strong exception guarantees.
void add_sstable(sstables::shared_sstable sstable, const std::vector<unsigned>& shards_for_the_sstable);
// Helpers through which sstables are added to / removed from a backlog tracker.
// The removal helper only forwards to the tracker when the sstable is not shared;
// NOTE(review): the add helper is expected to apply the symmetric check - confirm
// against its definition.
static void add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
static void remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable);
// returns an empty pointer if sstable doesn't belong to current shard.
future<sstables::shared_sstable> open_sstable(sstables::foreign_sstable_open_info info, sstring dir,
int64_t generation, sstables::sstable_version_types v, sstables::sstable_format_types f);

View File

@@ -682,6 +682,13 @@ inline void table::add_sstable_to_backlog_tracker(compaction_backlog_tracker& tr
}
}
// Removes `sstable` from `tracker`, unless the sstable is shared.
// A shared sstable is accounted for by resharding's own backlog tracker,
// so there is nothing to remove from `tracker` in that case.
inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) {
    if (sstable->is_shared()) {
        return;
    }
    tracker.remove_sstable(std::move(sstable));
}
void table::add_sstable(sstables::shared_sstable sstable, const std::vector<unsigned>& shards_for_the_sstable) {
// allow in-progress reads to continue using old list
auto new_sstables = make_lw_shared(*_sstables);
@@ -1255,10 +1262,11 @@ future<> table::replace_ancestors_needed_rewrite(std::unordered_set<uint64_t> an
}
return rebuild_sstable_list_with_deletion_sem(new_sstables, old_sstables).then([this, new_sstables, old_sstables] {
for (auto& sst : new_sstables) {
_compaction_strategy.get_backlog_tracker().add_sstable(sst);
add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sst);
}
for (auto& sst : old_sstables) {
_compaction_strategy.get_backlog_tracker().remove_sstable(sst);
remove_sstable_from_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sst);
_sstables_need_rewrite.erase(sst->generation());
}
});
@@ -1929,7 +1937,7 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
rebuild_statistics();
return parallel_for_each(p->remove, [this](sstables::shared_sstable s) {
_compaction_strategy.get_backlog_tracker().remove_sstable(s);
remove_sstable_from_backlog_tracker(_compaction_strategy.get_backlog_tracker(), s);
return sstables::delete_atomically({s});
}).then([p] {
return make_ready_future<db::replay_position>(p->rp);
@@ -2492,7 +2500,7 @@ future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable>
return sst->move_to_new_dir(dir(), sst->generation(), false).then_wrapped([this, sst, &dirs_to_sync] (future<> f) {
if (!f.failed()) {
_sstables_staging.erase(sst->generation());
_compaction_strategy.get_backlog_tracker().add_sstable(sst);
add_sstable_to_backlog_tracker(_compaction_strategy.get_backlog_tracker(), sst);
return make_ready_future<>();
} else {
auto ep = f.get_exception();