From e2ccafbe38be8515523644a43534f596ee1db66e Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sat, 6 Aug 2022 00:14:47 -0300 Subject: [PATCH] compaction: Add support to split large partitions Adds support for splitting large partitions during compaction. Large partitions introduce many problems, like memory overhead and breaks incremental compaction promise. We want to split large partitions across fixed-size fragments. We'll allow a partition to exceed size limit by 10%, as we don't want to unnecessarily split partitions that just crossed the limit boundary. To avoid having to open a minimal of 2 fragments in a read, partition tombstone will be replicated to every fragment storing the partition. The splitting isn't enabled by default, and can be used by strategies that are run aware like ICS. LCS still cannot support it as it's still using physical level metadata, not run id. An incremental reader for sstable runs will follow soon. Signed-off-by: Raphael S. Carvalho --- compaction/compaction.cc | 129 ++++++++++++++++++++++++---- compaction/compaction_descriptor.hh | 2 + sstables/mx/writer.cc | 22 ++++- 3 files changed, 136 insertions(+), 17 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index a31f2838f1..9420f645c3 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -277,6 +277,18 @@ class compacted_fragments_writer { stop_func_t _stop_compaction_writer; std::optional> _stop_request_observer; bool _unclosed_partition = false; + struct partition_state { + dht::decorated_key_opt dk; + // Partition tombstone is saved for the purpose of replicating it to every fragment storing a partition pL. + // Then when reading from the SSTable run, we won't unnecessarily have to open >= 2 fragments, the one which + // contains the tombstone and another one(s) that has the partition slice being queried. + ::tombstone tombstone; + // Used to determine whether any active tombstones need closing at EOS. + ::tombstone current_emitted_tombstone; + // Track last emitted clustering row, which will be used to close active tombstone if splitting partition + position_in_partition last_pos = position_in_partition::before_all_clustered_rows(); + bool is_splitting_partition = false; + } _current_partition; private: inline void maybe_abort_compaction(); @@ -286,6 +298,13 @@ private: consume_end_of_stream(); }); } + + void stop_current_writer(); + bool can_split_large_partition() const; + void track_last_position(position_in_partition_view pos); + void split_large_partition(); + void do_consume_new_partition(const dht::decorated_key& dk); + stop_iteration do_consume_end_of_partition(); public: explicit compacted_fragments_writer(compaction& c, creator_func_t cpw, stop_func_t scw) : _c(c) @@ -304,7 +323,7 @@ public: void consume_new_partition(const dht::decorated_key& dk); - void consume(tombstone t) { _compaction_writer->writer.consume(t); } + void consume(tombstone t); stop_iteration consume(static_row&& sr, tombstone, bool) { maybe_abort_compaction(); return _compaction_writer->writer.consume(std::move(sr)); @@ -312,17 +331,11 @@ public: stop_iteration consume(static_row&& sr) { return consume(std::move(sr), tombstone{}, bool{}); } - stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { - maybe_abort_compaction(); - return _compaction_writer->writer.consume(std::move(cr)); - } + stop_iteration consume(clustering_row&& cr, row_tombstone, bool); stop_iteration consume(clustering_row&& cr) { return consume(std::move(cr), row_tombstone{}, bool{}); } - stop_iteration consume(range_tombstone_change&& rtc) { - maybe_abort_compaction(); - return _compaction_writer->writer.consume(std::move(rtc)); - } + stop_iteration consume(range_tombstone_change&& rtc); stop_iteration consume_end_of_partition(); void consume_end_of_stream(); @@ -448,6 +461,7 @@ protected: uint64_t _estimated_partitions = 0; db::replay_position _rp; encoding_stats_collector _stats_collector; + bool _can_split_large_partition = false; bool _contains_multi_fragment_runs = false; mutation_source_metadata _ms_metadata = {}; compaction_sstable_replacer_fn _replacer; @@ -479,6 +493,7 @@ protected: , _type(descriptor.options.type()) , _max_sstable_size(descriptor.max_sstable_bytes) , _sstable_level(descriptor.level) + , _can_split_large_partition(descriptor.can_split_large_partition) , _replacer(std::move(descriptor.replacer)) , _run_identifier(descriptor.run_identifier) , _io_priority(descriptor.io_priority) @@ -867,7 +882,51 @@ void compacted_fragments_writer::maybe_abort_compaction() { } } -void compacted_fragments_writer::consume_new_partition(const dht::decorated_key& dk) { +void compacted_fragments_writer::stop_current_writer() { + // stop sstable writer being currently used. + _stop_compaction_writer(&*_compaction_writer); + _compaction_writer = std::nullopt; +} + +bool compacted_fragments_writer::can_split_large_partition() const { + return _c._can_split_large_partition; +} + +void compacted_fragments_writer::track_last_position(position_in_partition_view pos) { + if (can_split_large_partition()) { + _current_partition.last_pos = pos; + } +} + +void compacted_fragments_writer::split_large_partition() { + // Closes the active range tombstone if needed, before emitting partition end. + // after_key(last_pos) is used for both closing and re-opening the active tombstone, which + // will result in current fragment storing an inclusive end bound for last pos, and the + // next fragment storing an exclusive start bound for last pos. This is very important + // for not losing information on the range tombstone. + auto after_last_pos = position_in_partition::after_key(_current_partition.last_pos.key()); + if (_current_partition.current_emitted_tombstone) { + auto rtc = range_tombstone_change(after_last_pos, tombstone{}); + _c.log_debug("Closing active tombstone {} with {} for partition {}", _current_partition.current_emitted_tombstone, rtc, *_current_partition.dk); + _compaction_writer->writer.consume(std::move(rtc)); + } + _c.log_debug("Splitting large partition {} in order to respect SSTable size limit of {}", *_current_partition.dk, pretty_printed_data_size(_c._max_sstable_size)); + // Close partition in current writer, and open it again in a new writer. + do_consume_end_of_partition(); + stop_current_writer(); + do_consume_new_partition(*_current_partition.dk); + // Replicate partition tombstone to every fragment, allowing the SSTable run reader + // to open a single fragment during the read. + if (_current_partition.tombstone) { + consume(_current_partition.tombstone); + } + if (_current_partition.current_emitted_tombstone) { + _compaction_writer->writer.consume(range_tombstone_change(after_last_pos, _current_partition.current_emitted_tombstone)); + } + _current_partition.is_splitting_partition = false; +} + +void compacted_fragments_writer::do_consume_new_partition(const dht::decorated_key& dk) { maybe_abort_compaction(); if (!_compaction_writer) { _compaction_writer = _create_compaction_writer(dk); @@ -875,17 +934,55 @@ void compacted_fragments_writer::consume_new_partition(const dht::decorated_key& _c.on_new_partition(); _compaction_writer->writer.consume_new_partition(dk); - _c._cdata.total_keys_written++; _unclosed_partition = true; } -stop_iteration compacted_fragments_writer::consume_end_of_partition() { - auto ret = _compaction_writer->writer.consume_end_of_partition(); +stop_iteration compacted_fragments_writer::do_consume_end_of_partition() { _unclosed_partition = false; + return _compaction_writer->writer.consume_end_of_partition(); +} + +void compacted_fragments_writer::consume_new_partition(const dht::decorated_key& dk) { + _current_partition = { + .dk = dk, + .tombstone = tombstone(), + .current_emitted_tombstone = tombstone(), + .last_pos = position_in_partition(position_in_partition::partition_start_tag_t()), + .is_splitting_partition = false + }; + do_consume_new_partition(dk); + _c._cdata.total_keys_written++; +} + +void compacted_fragments_writer::consume(tombstone t) { + _current_partition.tombstone = t; + _compaction_writer->writer.consume(t); +} + +stop_iteration compacted_fragments_writer::consume(clustering_row&& cr, row_tombstone, bool) { + maybe_abort_compaction(); + if (_current_partition.is_splitting_partition) [[unlikely]] { + split_large_partition(); + } + track_last_position(cr.position()); + auto ret = _compaction_writer->writer.consume(std::move(cr)); + if (can_split_large_partition() && ret == stop_iteration::yes) [[unlikely]] { + _current_partition.is_splitting_partition = true; + } + return stop_iteration::no; +} + +stop_iteration compacted_fragments_writer::consume(range_tombstone_change&& rtc) { + maybe_abort_compaction(); + _current_partition.current_emitted_tombstone = rtc.tombstone(); + track_last_position(rtc.position()); + return _compaction_writer->writer.consume(std::move(rtc)); +} + +stop_iteration compacted_fragments_writer::consume_end_of_partition() { + auto ret = do_consume_end_of_partition(); if (ret == stop_iteration::yes) { - // stop sstable writer being currently used. - _stop_compaction_writer(&*_compaction_writer); - _compaction_writer = std::nullopt; + stop_current_writer(); } return ret; } diff --git a/compaction/compaction_descriptor.hh b/compaction/compaction_descriptor.hh index d474d9675a..28727a6102 100644 --- a/compaction/compaction_descriptor.hh +++ b/compaction/compaction_descriptor.hh @@ -153,6 +153,8 @@ struct compaction_descriptor { int level; // Threshold size for sstable(s) to be created. uint64_t max_sstable_bytes; + // Can split large partitions at clustering boundary. + bool can_split_large_partition = false; // Run identifier of output sstables. sstables::run_id run_identifier; // The options passed down to the compaction code. diff --git a/sstables/mx/writer.cc b/sstables/mx/writer.cc index d7a02e9096..aa67511d5f 100644 --- a/sstables/mx/writer.cc +++ b/sstables/mx/writer.cc @@ -1284,7 +1284,27 @@ stop_iteration writer::consume(clustering_row&& cr) { ensure_tombstone_is_written(); ensure_static_row_is_written_if_needed(); write_clustered(cr); - return stop_iteration::no; + + auto can_split_partition_at_clustering_boundary = [this] { + // will allow size limit to be exceeded for 10%, so we won't perform unnecessary split + // of a partition which crossed the limit by a small margin. + uint64_t size_threshold = [this] { + const uint64_t max_size = std::numeric_limits::max(); + if (_cfg.max_sstable_size == max_size) { + return max_size; + } + uint64_t threshold_goal = _cfg.max_sstable_size * 1.1; + // handle overflow. + return threshold_goal < _cfg.max_sstable_size ? max_size : threshold_goal; + }(); + // Check there are enough promoted index entries, meaning that current fragment won't + // unnecessarily cut the current partition in the middle. + bool has_enough_promoted_index_entries = _pi_write_m.promoted_index_size >= 2; + + return get_data_offset() > size_threshold && has_enough_promoted_index_entries; + }; + + return stop_iteration(can_split_partition_at_clustering_boundary()); } // Write clustering prefix along with its bound kind and, if not full, its size