replica/table: add optional compacting to make_multishard_streaming_reader()

Doing to make_multishard_streaming_reader() what the previous commit did
to make_streaming_reader(). In fact, the new compaction_time parameter
is simply forwarded to the make_streaming_reader() on the shard readers.

Call sites are updated, but none opt in just yet.
This commit is contained in:
Botond Dénes
2023-07-19 02:57:58 -04:00
parent 42b0dd5558
commit 2f8d77e97b
4 changed files with 20 additions and 12 deletions

View File

@@ -299,14 +299,14 @@ flat_mutation_reader_v2 repair_reader::make_reader(
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
}
return std::optional<dht::partition_range>();
});
}, {});
}
case read_strategy::multishard_filter: {
// We can't have two permits with count resource for 1 repair.
// So we release the one on _permit so the only one is the one the
// shard reader will obtain.
_permit.release_base_resources();
return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range),
return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, {}),
[&remote_sharder, remote_shard](const dht::decorated_key& k) {
return remote_sharder.shard_of(k.token()) == remote_shard;
});

View File

@@ -1536,9 +1536,14 @@ class streaming_reader_lifecycle_policy
};
distributed<replica::database>& _db;
table_id _table_id;
std::optional<gc_clock::time_point> _compaction_time;
std::vector<reader_context> _contexts;
public:
streaming_reader_lifecycle_policy(distributed<replica::database>& db, table_id table_id) : _db(db), _table_id(table_id), _contexts(smp::count) {
streaming_reader_lifecycle_policy(distributed<replica::database>& db, table_id table_id, std::optional<gc_clock::time_point> compaction_time)
: _db(db)
, _table_id(table_id)
, _compaction_time(compaction_time)
, _contexts(smp::count) {
}
virtual flat_mutation_reader_v2 create_reader(
schema_ptr schema,
@@ -1554,7 +1559,7 @@ public:
_contexts[shard].read_operation = make_foreign(std::make_unique<utils::phased_barrier::operation>(cf.read_in_progress()));
_contexts[shard].semaphore = &cf.streaming_read_concurrency_semaphore();
return cf.make_streaming_reader(std::move(schema), std::move(permit), *_contexts[shard].range, slice, fwd_mr, {});
return cf.make_streaming_reader(std::move(schema), std::move(permit), *_contexts[shard].range, slice, fwd_mr, _compaction_time);
}
virtual const dht::partition_range* get_read_range() const override {
const auto shard = this_shard_id();
@@ -2905,11 +2910,12 @@ void database::unplug_view_update_generator() noexcept {
flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::database>& db,
schema_ptr schema, reader_permit permit,
std::function<std::optional<dht::partition_range>()> range_generator) {
std::function<std::optional<dht::partition_range>()> range_generator,
std::optional<gc_clock::time_point> compaction_time) {
auto& table = db.local().find_column_family(schema);
auto erm = table.get_effective_replication_map();
auto ms = mutation_source([&db, erm] (schema_ptr s,
auto ms = mutation_source([&db, erm, compaction_time] (schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
@@ -2917,7 +2923,7 @@ flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::da
streamed_mutation::forwarding,
mutation_reader::forwarding fwd_mr) {
auto table_id = s->id();
return make_multishard_combining_reader_v2(make_shared<replica::streaming_reader_lifecycle_policy>(db, table_id),
return make_multishard_combining_reader_v2(seastar::make_shared<replica::streaming_reader_lifecycle_policy>(db, table_id, compaction_time),
std::move(s), erm, std::move(permit), pr, ps, std::move(trace_state), fwd_mr);
});
auto&& full_slice = schema->full_slice();
@@ -2926,13 +2932,13 @@ flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::da
}
flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::database>& db,
schema_ptr schema, reader_permit permit, const dht::partition_range& range)
schema_ptr schema, reader_permit permit, const dht::partition_range& range, std::optional<gc_clock::time_point> compaction_time)
{
const auto table_id = schema->id();
const auto& full_slice = schema->full_slice();
auto erm = db.local().find_column_family(schema).get_effective_replication_map();
return make_multishard_combining_reader_v2(
make_shared<replica::streaming_reader_lifecycle_policy>(db, table_id),
seastar::make_shared<replica::streaming_reader_lifecycle_policy>(db, table_id, compaction_time),
std::move(schema),
std::move(erm),
std::move(permit),

View File

@@ -1783,10 +1783,12 @@ future<> start_large_data_handler(sharded<replica::database>& db);
//
// Shard readers are created via `table::make_streaming_reader()`.
// Range generator must generate disjoint, monotonically increasing ranges.
// Opt-in for compacting the output by passing `compaction_time`, see
// make_streaming_reader() for more details.
flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::database>& db, schema_ptr schema, reader_permit permit,
std::function<std::optional<dht::partition_range>()> range_generator);
std::function<std::optional<dht::partition_range>()> range_generator, std::optional<gc_clock::time_point> compaction_time);
flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::database>& db,
schema_ptr schema, reader_permit permit, const dht::partition_range& range);
schema_ptr schema, reader_permit permit, const dht::partition_range& range, std::optional<gc_clock::time_point> compaction_time);
bool is_internal_keyspace(std::string_view name);

View File

@@ -2310,7 +2310,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
return dht::to_partition_range(*next);
}
return std::nullopt;
});
}, gc_clock::now());
auto close_tested_reader = deferred_close(tested_reader);
auto reader_factory = [db = &env.db()] (