compaction: Make SSTable cleanup more efficient by fast forwarding to next owned range

Today, SSTable cleanup skips to the next partition, one at a time, when it finds
that the current partition is no longer owned by this node.

That's very inefficient because when a cluster is growing in size, existing
nodes lose multiple sequential tokens in its owned ranges. Another inefficiency
comes from fetching index pages spanning all unowned tokens, which was described
in #14317.

To solve both problems, cleanup will now use multi range reader, to guarantee
that it will only process the owned data and as a result skip unowned data.
This results in cleanup scanning an owned range and then fast forwarding to the
next one, until it's done with them all. This reduces significantly the amount
of data in the index caching, as index will only be invoked at each range
boundary instead.

Without further ado,

before:

... 2GB to 1GB (~50% of original) in 26248ms = 81MB/s. ~9443072 total partitions merged to 4750028.

after:

... 2GB to 1GB (~50% of original) in 17424ms = 123MB/s. ~9443072 total partitions merged to 4750028.

Fixes #12998.
Fixes #14317.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2023-07-01 06:23:09 -03:00
parent 1fefe597e6
commit 8d58ff1be6
2 changed files with 46 additions and 24 deletions

View File

@@ -50,7 +50,7 @@
#include "utils/utf8.hh"
#include "utils/fmt-compat.hh"
#include "utils/error_injection.hh"
#include "readers/filtering.hh"
#include "readers/multi_range.hh"
#include "readers/compacting.hh"
#include "tombstone_gc.hh"
#include "keys.hh"
@@ -610,16 +610,6 @@ protected:
bool enable_garbage_collected_sstable_writer() const noexcept {
return _contains_multi_fragment_runs && _max_sstable_size != std::numeric_limits<uint64_t>::max();
}
flat_mutation_reader_v2::filter make_partition_filter() const {
return [this] (const dht::decorated_key& dk) {
if (!_owned_ranges_checker->belongs_to_current_node(dk.token())) {
log_trace("Token {} does not belong to this node, skipping", dk.token());
return false;
}
return true;
};
}
public:
compaction& operator=(const compaction&) = delete;
compaction(const compaction&) = delete;
@@ -639,22 +629,47 @@ private:
streamed_mutation::forwarding fwd,
mutation_reader::forwarding) const = 0;
// Make a filtering reader if needed
// FIXME: the sstable reader itself should be pass the owned ranges
// so it can skip over the disowned ranges efficiently using the index.
// Ref https://github.com/scylladb/scylladb/issues/12998
flat_mutation_reader_v2 setup_sstable_reader() const {
auto reader = make_sstable_reader(_schema,
_permit,
query::full_partition_range,
_schema->full_slice(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no);
if (!_owned_ranges_checker) {
return reader;
return make_sstable_reader(_schema,
_permit,
query::full_partition_range,
_schema->full_slice(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no);
}
return make_filtering_reader(std::move(reader), make_partition_filter());
auto source = mutation_source([this] (schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
log_trace("Creating sstable set reader with range {}", range);
return make_sstable_reader(std::move(s),
std::move(permit),
range,
slice,
std::move(trace_state),
fwd,
fwd_mr);
});
auto owned_range_generator = [this] () -> std::optional<dht::partition_range> {
auto r = _owned_ranges_checker->next_owned_range();
if (r == nullptr) {
return std::nullopt;
}
log_trace("Skipping to the next owned range {}", *r);
return dht::to_partition_range(*r);
};
return make_flat_multi_range_reader(_schema, _permit, std::move(source),
std::move(owned_range_generator),
_schema->full_slice(),
tracing::trace_state_ptr());
}
virtual sstables::sstable_set make_sstable_set_for_input() const {

View File

@@ -39,6 +39,13 @@ public:
return _it != _sorted_owned_ranges.end() && _it->contains(t, dht::token_comparator());
}
const dht::token_range* next_owned_range() const noexcept {
if (_it == _sorted_owned_ranges.end()) {
return nullptr;
}
return &*_it++;
}
static flat_mutation_reader_v2::filter make_partition_filter(const dht::token_range_vector& sorted_owned_ranges);
};