mutation_compactor: prepare for sstable compaction
The compact_mutation code is going to be shared between queries and sstable compaction. There are some differences, though: queries don't provide _max_purgeable, and sstable compaction doesn't need any limits. Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
@@ -35,6 +35,11 @@ enum class emit_only_live_rows {
|
||||
yes,
|
||||
};
|
||||
|
||||
// Compile-time switch passed as a template argument to compact_mutation.
// `yes` selects the sstable-compaction behaviour; `no` selects the query
// behaviour.
enum class compact_for_sstables {
|
||||
no,
|
||||
yes,
|
||||
};
|
||||
|
||||
/*
|
||||
template<typename T>
|
||||
concept bool CompactedMutationsConsumer() {
|
||||
@@ -55,7 +60,7 @@ concept bool CompactedMutationsConsumer() {
|
||||
// emit_only_live_rows::yes will cause compact_for_query to emit only live
|
||||
// static and clustering rows. It doesn't affect the way range tombstones are
|
||||
// emitted.
|
||||
template<emit_only_live_rows OnlyLive, typename Consumer>
|
||||
template<emit_only_live_rows OnlyLive, compact_for_sstables SSTableCompaction, typename Consumer>
|
||||
class compact_mutation {
|
||||
const schema& _schema;
|
||||
gc_clock::time_point _query_time;
|
||||
@@ -63,9 +68,9 @@ class compact_mutation {
|
||||
std::function<api::timestamp_type(const dht::decorated_key&)> _get_max_purgeable;
|
||||
api::timestamp_type _max_purgeable = api::max_timestamp;
|
||||
const query::partition_slice& _slice;
|
||||
uint32_t _row_limit;
|
||||
uint32_t _partition_limit;
|
||||
uint32_t _partition_row_limit;
|
||||
uint32_t _row_limit{};
|
||||
uint32_t _partition_limit{};
|
||||
uint32_t _partition_row_limit{};
|
||||
|
||||
Consumer _consumer;
|
||||
tombstone _partition_tombstone;
|
||||
@@ -82,6 +87,9 @@ private:
|
||||
static constexpr bool only_live() {
|
||||
return OnlyLive == emit_only_live_rows::yes;
|
||||
}
|
||||
static constexpr bool sstable_compaction() {
|
||||
return SSTableCompaction == compact_for_sstables::yes;
|
||||
}
|
||||
|
||||
void partition_is_not_empty() {
|
||||
if (_empty_partition) {
|
||||
@@ -95,7 +103,7 @@ private:
|
||||
}
|
||||
|
||||
bool can_purge_tombstone(const tombstone& t) {
|
||||
return t.timestamp < _max_purgeable && t.deletion_time < _gc_before;
|
||||
return (!sstable_compaction() || t.timestamp < _max_purgeable) && t.deletion_time < _gc_before;
|
||||
};
|
||||
public:
|
||||
compact_mutation(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint32_t limit,
|
||||
@@ -108,7 +116,22 @@ public:
|
||||
, _partition_limit(partition_limit)
|
||||
, _partition_row_limit(_slice.options.contains(query::partition_slice::option::distinct) ? 1 : slice.partition_row_limit())
|
||||
, _consumer(std::move(consumer))
|
||||
{ }
|
||||
{
|
||||
static_assert(!sstable_compaction(), "This constructor cannot be used for sstable compaction.");
|
||||
}
|
||||
|
||||
compact_mutation(const schema& s, gc_clock::time_point compaction_time, Consumer consumer,
|
||||
std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable)
|
||||
: _schema(s)
|
||||
, _query_time(compaction_time)
|
||||
, _gc_before(_query_time - s.gc_grace_seconds())
|
||||
, _get_max_purgeable(std::move(get_max_purgeable))
|
||||
, _slice(query::full_slice)
|
||||
, _consumer(std::move(consumer))
|
||||
{
|
||||
static_assert(sstable_compaction(), "This constructor can only be used for sstable compaction.");
|
||||
static_assert(!only_live(), "SSTable compaction cannot be run with emit_only_live_rows::yes.");
|
||||
}
|
||||
|
||||
void consume_new_partition(const dht::decorated_key& dk) {
|
||||
auto& pk = dk.key();
|
||||
@@ -120,7 +143,7 @@ public:
|
||||
_current_tombstone = { };
|
||||
_partition_tombstone = { };
|
||||
_current_partition_limit = std::min(_row_limit, _partition_row_limit);
|
||||
if (_get_max_purgeable) {
|
||||
if (sstable_compaction()) {
|
||||
_max_purgeable = _get_max_purgeable(dk);
|
||||
}
|
||||
}
|
||||
@@ -161,7 +184,7 @@ public:
|
||||
}
|
||||
} else if (!only_live()) {
|
||||
if (is_live) {
|
||||
if (_rows_in_current_partition == _current_partition_limit) {
|
||||
if (!sstable_compaction() && _rows_in_current_partition == _current_partition_limit) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
_rows_in_current_partition++;
|
||||
@@ -204,7 +227,9 @@ public:
|
||||
_row_limit -= _rows_in_current_partition;
|
||||
_partition_limit -= 1;
|
||||
_consumer.consume_end_of_partition();
|
||||
return _row_limit && _partition_limit ? stop_iteration::no : stop_iteration::yes;
|
||||
if (!sstable_compaction()) {
|
||||
return _row_limit && _partition_limit ? stop_iteration::no : stop_iteration::yes;
|
||||
}
|
||||
}
|
||||
return stop_iteration::no;
|
||||
}
|
||||
@@ -213,3 +238,13 @@ public:
|
||||
return _consumer.consume_end_of_stream();
|
||||
}
|
||||
};
|
||||
|
||||
// Query flavour of compact_mutation: fixes the sstable-compaction switch to
// compact_for_sstables::no and inherits the base class constructors.
template<emit_only_live_rows only_live, typename Consumer>
|
||||
struct compact_for_query : compact_mutation<only_live, compact_for_sstables::no, Consumer> {
|
||||
using compact_mutation<only_live, compact_for_sstables::no, Consumer>::compact_mutation;
|
||||
};
|
||||
|
||||
// SSTable-compaction flavour of compact_mutation: fixes the template
// arguments to emit_only_live_rows::no and compact_for_sstables::yes and
// inherits the base class constructors.
template<typename Consumer>
|
||||
struct compact_for_compaction : compact_mutation<emit_only_live_rows::no, compact_for_sstables::yes, Consumer> {
|
||||
using compact_mutation<emit_only_live_rows::no, compact_for_sstables::yes, Consumer>::compact_mutation;
|
||||
};
|
||||
@@ -1799,7 +1799,7 @@ future<data_query_result> data_query(schema_ptr s, const mutation_source& source
|
||||
auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);
|
||||
|
||||
auto qrb = query_result_builder(*s, builder);
|
||||
auto cfq = compact_mutation<emit_only_live_rows::yes, query_result_builder>(*s, query_time, slice, row_limit, partition_limit, std::move(qrb));
|
||||
auto cfq = compact_for_query<emit_only_live_rows::yes, query_result_builder>(*s, query_time, slice, row_limit, partition_limit, std::move(qrb));
|
||||
|
||||
auto reader = source(s, range, query::clustering_key_filtering_context::create(s, slice), service::get_local_sstable_query_read_priority());
|
||||
return consume_flattened(std::move(reader), std::move(cfq), is_reversed);
|
||||
@@ -1875,7 +1875,7 @@ mutation_query(schema_ptr s,
|
||||
auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);
|
||||
|
||||
auto rrb = reconcilable_result_builder(*s, slice);
|
||||
auto cfq = compact_mutation<emit_only_live_rows::no, reconcilable_result_builder>(*s, query_time, slice, row_limit, partition_limit, std::move(rrb));
|
||||
auto cfq = compact_for_query<emit_only_live_rows::no, reconcilable_result_builder>(*s, query_time, slice, row_limit, partition_limit, std::move(rrb));
|
||||
|
||||
auto reader = source(s, range, query::clustering_key_filtering_context::create(s, slice), service::get_local_sstable_query_read_priority());
|
||||
return consume_flattened(std::move(reader), std::move(cfq), is_reversed);
|
||||
|
||||
Reference in New Issue
Block a user