database: make_streaming_reader(): require permit

As a preparation for up-front admission, add a permit parameter to
`make_streaming_reader()`, which will be the admitted permit once we
switch to up-front admission. For now it has to be a non-admitted
permit.
A nice side-effect of this patch is that now permits will have a
use-case specific description, instead of the generic "streaming" one.
This commit is contained in:
Botond Dénes
2021-03-30 16:50:25 +03:00
parent 3ec149222d
commit 999169e535
6 changed files with 23 additions and 22 deletions

View File

@@ -2311,7 +2311,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
}
virtual flat_mutation_reader create_reader(
schema_ptr schema,
reader_permit,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -2324,7 +2324,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
_contexts[shard].read_operation = make_foreign(std::make_unique<utils::phased_barrier::operation>(cf.read_in_progress()));
_contexts[shard].semaphore = &cf.streaming_read_concurrency_semaphore();
return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr);
return cf.make_streaming_reader(std::move(schema), std::move(permit), *_contexts[shard].range, slice, fwd_mr);
}
virtual future<> destroy_reader(stopped_reader reader) noexcept override {
auto ctx = std::move(_contexts[this_shard_id()]);

View File

@@ -670,20 +670,20 @@ public:
// reader and a _bounded_ amount of writes which arrive later.
// - Does not populate the cache
// Requires ranges to be sorted and disjoint.
flat_mutation_reader make_streaming_reader(schema_ptr schema,
flat_mutation_reader make_streaming_reader(schema_ptr schema, reader_permit permit,
const dht::partition_range_vector& ranges) const;
// Single range overload.
flat_mutation_reader make_streaming_reader(schema_ptr schema, const dht::partition_range& range,
flat_mutation_reader make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
const query::partition_slice& slice,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no) const;
flat_mutation_reader make_streaming_reader(schema_ptr schema, const dht::partition_range& range) {
return make_streaming_reader(schema, range, schema->full_slice());
flat_mutation_reader make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range) {
return make_streaming_reader(std::move(schema), std::move(permit), range, schema->full_slice());
}
// Stream reader from the given sstables
flat_mutation_reader make_streaming_reader(schema_ptr schema, const dht::partition_range& range,
flat_mutation_reader make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
lw_shared_ptr<sstables::sstable_set> sstables) const;
sstables::shared_sstable make_streaming_sstable_for_write(std::optional<sstring> subdir = {});

View File

@@ -458,14 +458,14 @@ public:
if (local_reader) {
auto ms = mutation_source([&cf] (
schema_ptr s,
reader_permit,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding fwd_mr) {
return cf.make_streaming_reader(std::move(s), pr, ps, fwd_mr);
return cf.make_streaming_reader(std::move(s), std::move(permit), pr, ps, fwd_mr);
});
std::tie(_reader, _reader_handle) = make_manually_paused_evictable_reader(
std::move(ms),

View File

@@ -3260,7 +3260,8 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name,
size_t num_partitions_processed = 0;
size_t num_bytes_read = 0;
nr_sst_current += sst_processed.size();
auto reader = table.make_streaming_reader(s, full_partition_range, sst_set);
auto permit = co_await _db.local().obtain_reader_permit(table, "storage_service::load_and_stream()", db::no_timeout);
auto reader = table.make_streaming_reader(s, std::move(permit), full_partition_range, sst_set);
std::exception_ptr eptr;
bool failed = false;
try {

View File

@@ -87,20 +87,20 @@ struct send_info {
dht::token_range_vector ranges;
dht::partition_range_vector prs;
flat_mutation_reader reader;
send_info(database& db_, netw::messaging_service& ms_, utils::UUID plan_id_, utils::UUID cf_id_,
send_info(database& db_, netw::messaging_service& ms_, utils::UUID plan_id_, table& tbl_, reader_permit permit_,
dht::token_range_vector ranges_, netw::messaging_service::msg_addr id_,
uint32_t dst_cpu_id_, stream_reason reason_)
: db(db_)
, ms(ms_)
, plan_id(plan_id_)
, cf_id(cf_id_)
, cf_id(tbl_.schema()->id())
, id(id_)
, dst_cpu_id(dst_cpu_id_)
, reason(reason_)
, cf(db.find_column_family(cf_id))
, cf(tbl_)
, ranges(std::move(ranges_))
, prs(dht::to_partition_ranges(ranges))
, reader(cf.make_streaming_reader(cf.schema(), prs)) {
, reader(cf.make_streaming_reader(cf.schema(), std::move(permit_), prs)) {
}
future<bool> has_relevant_range_on_this_shard() {
return do_with(false, ranges.begin(), [this] (bool& found_relevant_range, dht::token_range_vector::iterator& ranges_it) {
@@ -221,8 +221,10 @@ future<> stream_transfer_task::execute() {
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
sort_and_merge_ranges();
auto reason = session->get_reason();
return session->get_db().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason] (database& db) {
auto si = make_lw_shared<send_info>(db, stream_session::ms(), plan_id, cf_id, std::move(ranges), id, dst_cpu_id, reason);
return session->get_db().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason] (database& db) mutable {
auto& tbl = db.find_column_family(cf_id);
return db.obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout).then([&db, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason] (reader_permit permit) mutable {
auto si = make_lw_shared<send_info>(db, stream_session::ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason);
return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) {
if (!has_relevant_range_on_this_shard) {
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}: ignore ranges on shard={}",
@@ -233,6 +235,7 @@ future<> stream_transfer_task::execute() {
}).finally([si] {
return si->reader.close();
});
});
}).then([this, plan_id, cf_id, id] {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
return session->ms().send_stream_mutation_done(id, plan_id, _ranges,

View File

@@ -236,9 +236,8 @@ sstables::shared_sstable table::make_streaming_sstable_for_write(std::optional<s
}
flat_mutation_reader
table::make_streaming_reader(schema_ptr s,
table::make_streaming_reader(schema_ptr s, reader_permit permit,
const dht::partition_range_vector& ranges) const {
auto permit = _config.streaming_read_concurrency_semaphore->make_permit(s.get(), "stream-ranges");
auto& slice = s->full_slice();
auto& pc = service::get_local_streaming_priority();
@@ -256,9 +255,8 @@ table::make_streaming_reader(schema_ptr s,
return make_flat_multi_range_reader(s, std::move(permit), std::move(source), ranges, slice, pc, nullptr, mutation_reader::forwarding::no);
}
flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range,
flat_mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
const query::partition_slice& slice, mutation_reader::forwarding fwd_mr) const {
auto permit = _config.streaming_read_concurrency_semaphore->make_permit(schema.get(), "stream-range");
const auto& pc = service::get_local_streaming_priority();
auto trace_state = tracing::trace_state_ptr();
const auto fwd = streamed_mutation::forwarding::no;
@@ -272,9 +270,8 @@ flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::
return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr);
}
flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range,
flat_mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
lw_shared_ptr<sstables::sstable_set> sstables) const {
auto permit = _config.streaming_read_concurrency_semaphore->make_permit(schema.get(), "load-and-stream");
auto& slice = schema->full_slice();
const auto& pc = service::get_local_streaming_priority();
auto trace_state = tracing::trace_state_ptr();