replica: make_[multishard_]streaming_reader(): make compaction_time mandatory

Now that all users have opted in unconditionally, there is no point in
keeping this optional. Make it mandatory to make sure there are no
opt-out by mistake.
The global override via enable_compacting_data_for_streaming_and_repair
config item still remains, allowing compaction to be force turned-off.
This commit is contained in:
Botond Dénes
2023-07-19 10:20:58 -04:00
parent fdaf908967
commit b599f15b26
3 changed files with 16 additions and 16 deletions

View File

@@ -1536,10 +1536,10 @@ class streaming_reader_lifecycle_policy
};
distributed<replica::database>& _db;
table_id _table_id;
std::optional<gc_clock::time_point> _compaction_time;
gc_clock::time_point _compaction_time;
std::vector<reader_context> _contexts;
public:
streaming_reader_lifecycle_policy(distributed<replica::database>& db, table_id table_id, std::optional<gc_clock::time_point> compaction_time)
streaming_reader_lifecycle_policy(distributed<replica::database>& db, table_id table_id, gc_clock::time_point compaction_time)
: _db(db)
, _table_id(table_id)
, _compaction_time(compaction_time)
@@ -2911,7 +2911,7 @@ void database::unplug_view_update_generator() noexcept {
flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::database>& db,
schema_ptr schema, reader_permit permit,
std::function<std::optional<dht::partition_range>()> range_generator,
std::optional<gc_clock::time_point> compaction_time) {
gc_clock::time_point compaction_time) {
auto& table = db.local().find_column_family(schema);
auto erm = table.get_effective_replication_map();
@@ -2932,7 +2932,7 @@ flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::da
}
flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::database>& db,
schema_ptr schema, reader_permit permit, const dht::partition_range& range, std::optional<gc_clock::time_point> compaction_time)
schema_ptr schema, reader_permit permit, const dht::partition_range& range, gc_clock::time_point compaction_time)
{
const auto table_id = schema->id();
const auto& full_slice = schema->full_slice();

View File

@@ -691,21 +691,21 @@ public:
// When compaction_time is engaged, the reader's output will be compacted, with the provided query time.
// This compaction doesn't do tombstone garbage collection.
flat_mutation_reader_v2 make_streaming_reader(schema_ptr schema, reader_permit permit,
const dht::partition_range_vector& ranges, std::optional<gc_clock::time_point> compaction_time) const;
const dht::partition_range_vector& ranges, gc_clock::time_point compaction_time) const;
// Single range overload.
flat_mutation_reader_v2 make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
const query::partition_slice& slice,
mutation_reader::forwarding fwd_mr,
std::optional<gc_clock::time_point> compaction_time) const;
gc_clock::time_point compaction_time) const;
flat_mutation_reader_v2 make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range, std::optional<gc_clock::time_point> compaction_time) {
flat_mutation_reader_v2 make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range, gc_clock::time_point compaction_time) {
return make_streaming_reader(schema, std::move(permit), range, schema->full_slice(), mutation_reader::forwarding::no, compaction_time);
}
// Stream reader from the given sstables
flat_mutation_reader_v2 make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
lw_shared_ptr<sstables::sstable_set> sstables, std::optional<gc_clock::time_point> compaction_time) const;
lw_shared_ptr<sstables::sstable_set> sstables, gc_clock::time_point compaction_time) const;
// Make a reader which reads only from the row-cache.
// The reader doens't populate the cache, it reads only what is in the cache
@@ -1786,9 +1786,9 @@ future<> start_large_data_handler(sharded<replica::database>& db);
// Opt-in for compacting the output by passing `compaction_time`, see
// make_streaming_reader() for more details.
flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::database>& db, schema_ptr schema, reader_permit permit,
std::function<std::optional<dht::partition_range>()> range_generator, std::optional<gc_clock::time_point> compaction_time);
std::function<std::optional<dht::partition_range>()> range_generator, gc_clock::time_point compaction_time);
flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::database>& db,
schema_ptr schema, reader_permit permit, const dht::partition_range& range, std::optional<gc_clock::time_point> compaction_time);
schema_ptr schema, reader_permit permit, const dht::partition_range& range, gc_clock::time_point compaction_time);
bool is_internal_keyspace(std::string_view name);

View File

@@ -292,13 +292,13 @@ sstables::shared_sstable table::make_streaming_staging_sstable() {
return make_streaming_sstable_for_write(sstables::staging_dir);
}
static flat_mutation_reader_v2 maybe_compact_for_streaming(flat_mutation_reader_v2 underlying, const compaction_manager& cm, std::optional<gc_clock::time_point> compaction_time, bool compaction_enabled) {
if (!compaction_time || !compaction_enabled) {
static flat_mutation_reader_v2 maybe_compact_for_streaming(flat_mutation_reader_v2 underlying, const compaction_manager& cm, gc_clock::time_point compaction_time, bool compaction_enabled) {
if (!compaction_enabled) {
return underlying;
}
return make_compacting_reader(
std::move(underlying),
*compaction_time,
compaction_time,
[] (const dht::decorated_key&) { return api::min_timestamp; }, // disable tombstone purging
cm.get_tombstone_gc_state(),
streamed_mutation::forwarding::no);
@@ -307,7 +307,7 @@ static flat_mutation_reader_v2 maybe_compact_for_streaming(flat_mutation_reader_
flat_mutation_reader_v2
table::make_streaming_reader(schema_ptr s, reader_permit permit,
const dht::partition_range_vector& ranges,
std::optional<gc_clock::time_point> compaction_time) const {
gc_clock::time_point compaction_time) const {
auto& slice = s->full_slice();
auto source = mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice,
@@ -328,7 +328,7 @@ table::make_streaming_reader(schema_ptr s, reader_permit permit,
}
flat_mutation_reader_v2 table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
const query::partition_slice& slice, mutation_reader::forwarding fwd_mr, std::optional<gc_clock::time_point> compaction_time) const {
const query::partition_slice& slice, mutation_reader::forwarding fwd_mr, gc_clock::time_point compaction_time) const {
auto trace_state = tracing::trace_state_ptr();
const auto fwd = streamed_mutation::forwarding::no;
@@ -345,7 +345,7 @@ flat_mutation_reader_v2 table::make_streaming_reader(schema_ptr schema, reader_p
}
flat_mutation_reader_v2 table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
lw_shared_ptr<sstables::sstable_set> sstables, std::optional<gc_clock::time_point> compaction_time) const {
lw_shared_ptr<sstables::sstable_set> sstables, gc_clock::time_point compaction_time) const {
auto& slice = schema->full_slice();
auto trace_state = tracing::trace_state_ptr();
const auto fwd = streamed_mutation::forwarding::no;