compaction: Add support to split large partitions

Adds support for splitting large partitions during compaction.

Large partitions introduce many problems, like memory overhead and
breaks incremental compaction promise. We want to split large
partitions across fixed-size fragments. We'll allow a partition
to exceed size limit by 10%, as we don't want to unnecessarily split
partitions that just crossed the limit boundary.

To avoid having to open a minimal of 2 fragments in a read, partition
tombstone will be replicated to every fragment storing the
partition.

The splitting isn't enabled by default, and can be used by
strategies that are run aware like ICS. LCS still cannot support
it as it's still using physical level metadata, not run id.

An incremental reader for sstable runs will follow soon.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2022-08-06 00:14:47 -03:00
parent 4bc24acf81
commit e2ccafbe38
3 changed files with 136 additions and 17 deletions

View File

@@ -277,6 +277,18 @@ class compacted_fragments_writer {
stop_func_t _stop_compaction_writer;
std::optional<utils::observer<>> _stop_request_observer;
bool _unclosed_partition = false;
struct partition_state {
dht::decorated_key_opt dk;
// Partition tombstone is saved for the purpose of replicating it to every fragment storing a partition pL.
// Then when reading from the SSTable run, we won't unnecessarily have to open >= 2 fragments, the one which
// contains the tombstone and another one(s) that has the partition slice being queried.
::tombstone tombstone;
// Used to determine whether any active tombstones need closing at EOS.
::tombstone current_emitted_tombstone;
// Track last emitted clustering row, which will be used to close active tombstone if splitting partition
position_in_partition last_pos = position_in_partition::before_all_clustered_rows();
bool is_splitting_partition = false;
} _current_partition;
private:
inline void maybe_abort_compaction();
@@ -286,6 +298,13 @@ private:
consume_end_of_stream();
});
}
void stop_current_writer();
bool can_split_large_partition() const;
void track_last_position(position_in_partition_view pos);
void split_large_partition();
void do_consume_new_partition(const dht::decorated_key& dk);
stop_iteration do_consume_end_of_partition();
public:
explicit compacted_fragments_writer(compaction& c, creator_func_t cpw, stop_func_t scw)
: _c(c)
@@ -304,7 +323,7 @@ public:
void consume_new_partition(const dht::decorated_key& dk);
void consume(tombstone t) { _compaction_writer->writer.consume(t); }
void consume(tombstone t);
stop_iteration consume(static_row&& sr, tombstone, bool) {
maybe_abort_compaction();
return _compaction_writer->writer.consume(std::move(sr));
@@ -312,17 +331,11 @@ public:
stop_iteration consume(static_row&& sr) {
return consume(std::move(sr), tombstone{}, bool{});
}
stop_iteration consume(clustering_row&& cr, row_tombstone, bool) {
maybe_abort_compaction();
return _compaction_writer->writer.consume(std::move(cr));
}
stop_iteration consume(clustering_row&& cr, row_tombstone, bool);
stop_iteration consume(clustering_row&& cr) {
return consume(std::move(cr), row_tombstone{}, bool{});
}
stop_iteration consume(range_tombstone_change&& rtc) {
maybe_abort_compaction();
return _compaction_writer->writer.consume(std::move(rtc));
}
stop_iteration consume(range_tombstone_change&& rtc);
stop_iteration consume_end_of_partition();
void consume_end_of_stream();
@@ -448,6 +461,7 @@ protected:
uint64_t _estimated_partitions = 0;
db::replay_position _rp;
encoding_stats_collector _stats_collector;
bool _can_split_large_partition = false;
bool _contains_multi_fragment_runs = false;
mutation_source_metadata _ms_metadata = {};
compaction_sstable_replacer_fn _replacer;
@@ -479,6 +493,7 @@ protected:
, _type(descriptor.options.type())
, _max_sstable_size(descriptor.max_sstable_bytes)
, _sstable_level(descriptor.level)
, _can_split_large_partition(descriptor.can_split_large_partition)
, _replacer(std::move(descriptor.replacer))
, _run_identifier(descriptor.run_identifier)
, _io_priority(descriptor.io_priority)
@@ -867,7 +882,51 @@ void compacted_fragments_writer::maybe_abort_compaction() {
}
}
void compacted_fragments_writer::consume_new_partition(const dht::decorated_key& dk) {
void compacted_fragments_writer::stop_current_writer() {
// stop sstable writer being currently used.
_stop_compaction_writer(&*_compaction_writer);
_compaction_writer = std::nullopt;
}
bool compacted_fragments_writer::can_split_large_partition() const {
return _c._can_split_large_partition;
}
void compacted_fragments_writer::track_last_position(position_in_partition_view pos) {
if (can_split_large_partition()) {
_current_partition.last_pos = pos;
}
}
void compacted_fragments_writer::split_large_partition() {
// Closes the active range tombstone if needed, before emitting partition end.
// after_key(last_pos) is used for both closing and re-opening the active tombstone, which
// will result in current fragment storing an inclusive end bound for last pos, and the
// next fragment storing an exclusive start bound for last pos. This is very important
// for not losing information on the range tombstone.
auto after_last_pos = position_in_partition::after_key(_current_partition.last_pos.key());
if (_current_partition.current_emitted_tombstone) {
auto rtc = range_tombstone_change(after_last_pos, tombstone{});
_c.log_debug("Closing active tombstone {} with {} for partition {}", _current_partition.current_emitted_tombstone, rtc, *_current_partition.dk);
_compaction_writer->writer.consume(std::move(rtc));
}
_c.log_debug("Splitting large partition {} in order to respect SSTable size limit of {}", *_current_partition.dk, pretty_printed_data_size(_c._max_sstable_size));
// Close partition in current writer, and open it again in a new writer.
do_consume_end_of_partition();
stop_current_writer();
do_consume_new_partition(*_current_partition.dk);
// Replicate partition tombstone to every fragment, allowing the SSTable run reader
// to open a single fragment during the read.
if (_current_partition.tombstone) {
consume(_current_partition.tombstone);
}
if (_current_partition.current_emitted_tombstone) {
_compaction_writer->writer.consume(range_tombstone_change(after_last_pos, _current_partition.current_emitted_tombstone));
}
_current_partition.is_splitting_partition = false;
}
void compacted_fragments_writer::do_consume_new_partition(const dht::decorated_key& dk) {
maybe_abort_compaction();
if (!_compaction_writer) {
_compaction_writer = _create_compaction_writer(dk);
@@ -875,17 +934,55 @@ void compacted_fragments_writer::consume_new_partition(const dht::decorated_key&
_c.on_new_partition();
_compaction_writer->writer.consume_new_partition(dk);
_c._cdata.total_keys_written++;
_unclosed_partition = true;
}
stop_iteration compacted_fragments_writer::consume_end_of_partition() {
auto ret = _compaction_writer->writer.consume_end_of_partition();
stop_iteration compacted_fragments_writer::do_consume_end_of_partition() {
_unclosed_partition = false;
return _compaction_writer->writer.consume_end_of_partition();
}
void compacted_fragments_writer::consume_new_partition(const dht::decorated_key& dk) {
_current_partition = {
.dk = dk,
.tombstone = tombstone(),
.current_emitted_tombstone = tombstone(),
.last_pos = position_in_partition(position_in_partition::partition_start_tag_t()),
.is_splitting_partition = false
};
do_consume_new_partition(dk);
_c._cdata.total_keys_written++;
}
void compacted_fragments_writer::consume(tombstone t) {
_current_partition.tombstone = t;
_compaction_writer->writer.consume(t);
}
stop_iteration compacted_fragments_writer::consume(clustering_row&& cr, row_tombstone, bool) {
maybe_abort_compaction();
if (_current_partition.is_splitting_partition) [[unlikely]] {
split_large_partition();
}
track_last_position(cr.position());
auto ret = _compaction_writer->writer.consume(std::move(cr));
if (can_split_large_partition() && ret == stop_iteration::yes) [[unlikely]] {
_current_partition.is_splitting_partition = true;
}
return stop_iteration::no;
}
stop_iteration compacted_fragments_writer::consume(range_tombstone_change&& rtc) {
maybe_abort_compaction();
_current_partition.current_emitted_tombstone = rtc.tombstone();
track_last_position(rtc.position());
return _compaction_writer->writer.consume(std::move(rtc));
}
stop_iteration compacted_fragments_writer::consume_end_of_partition() {
auto ret = do_consume_end_of_partition();
if (ret == stop_iteration::yes) {
// stop sstable writer being currently used.
_stop_compaction_writer(&*_compaction_writer);
_compaction_writer = std::nullopt;
stop_current_writer();
}
return ret;
}

View File

@@ -153,6 +153,8 @@ struct compaction_descriptor {
int level;
// Threshold size for sstable(s) to be created.
uint64_t max_sstable_bytes;
// Can split large partitions at clustering boundary.
bool can_split_large_partition = false;
// Run identifier of output sstables.
sstables::run_id run_identifier;
// The options passed down to the compaction code.

View File

@@ -1284,7 +1284,27 @@ stop_iteration writer::consume(clustering_row&& cr) {
ensure_tombstone_is_written();
ensure_static_row_is_written_if_needed();
write_clustered(cr);
return stop_iteration::no;
auto can_split_partition_at_clustering_boundary = [this] {
// will allow size limit to be exceeded for 10%, so we won't perform unnecessary split
// of a partition which crossed the limit by a small margin.
uint64_t size_threshold = [this] {
const uint64_t max_size = std::numeric_limits<uint64_t>::max();
if (_cfg.max_sstable_size == max_size) {
return max_size;
}
uint64_t threshold_goal = _cfg.max_sstable_size * 1.1;
// handle overflow.
return threshold_goal < _cfg.max_sstable_size ? max_size : threshold_goal;
}();
// Check there are enough promoted index entries, meaning that current fragment won't
// unnecessarily cut the current partition in the middle.
bool has_enough_promoted_index_entries = _pi_write_m.promoted_index_size >= 2;
return get_data_offset() > size_threshold && has_enough_promoted_index_entries;
};
return stop_iteration(can_split_partition_at_clustering_boundary());
}
// Write clustering prefix along with its bound kind and, if not full, its size