From 999169e535b1b5c18e3a9aa2ebb9f926aef0cb49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 30 Mar 2021 16:50:25 +0300 Subject: [PATCH] database: make_streaming_reader(): require permit As a preparation for up-front admission, add a permit parameter to `make_streaming_reader()`, which will be the admitted permit once we switch to up-front admission. For now it has to be a non-admitted permit. A nice side-effect of this patch is that now permits will have a use-case specific description, instead of the generic "streaming" one. --- database.cc | 4 ++-- database.hh | 10 +++++----- repair/row_level.cc | 4 ++-- service/storage_service.cc | 3 ++- streaming/stream_transfer_task.cc | 15 +++++++++------ table.cc | 9 +++------ 6 files changed, 23 insertions(+), 22 deletions(-) diff --git a/database.cc b/database.cc index 2f466850b9..fe29360348 100644 --- a/database.cc +++ b/database.cc @@ -2311,7 +2311,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed& db, } virtual flat_mutation_reader create_reader( schema_ptr schema, - reader_permit, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -2324,7 +2324,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed& db, _contexts[shard].read_operation = make_foreign(std::make_unique(cf.read_in_progress())); _contexts[shard].semaphore = &cf.streaming_read_concurrency_semaphore(); - return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr); + return cf.make_streaming_reader(std::move(schema), std::move(permit), *_contexts[shard].range, slice, fwd_mr); } virtual future<> destroy_reader(stopped_reader reader) noexcept override { auto ctx = std::move(_contexts[this_shard_id()]); diff --git a/database.hh b/database.hh index 5f6d67c6b8..90492b445e 100644 --- a/database.hh +++ b/database.hh @@ -670,20 +670,20 @@ public: // reader and a _bounded_ amount of writes which arrive later. // - Does not populate the cache // Requires ranges to be sorted and disjoint. - flat_mutation_reader make_streaming_reader(schema_ptr schema, + flat_mutation_reader make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range_vector& ranges) const; // Single range overload. - flat_mutation_reader make_streaming_reader(schema_ptr schema, const dht::partition_range& range, + flat_mutation_reader make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no) const; - flat_mutation_reader make_streaming_reader(schema_ptr schema, const dht::partition_range& range) { - return make_streaming_reader(schema, range, schema->full_slice()); + flat_mutation_reader make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range) { + return make_streaming_reader(std::move(schema), std::move(permit), range, schema->full_slice()); } // Stream reader from the given sstables - flat_mutation_reader make_streaming_reader(schema_ptr schema, const dht::partition_range& range, + flat_mutation_reader make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range, lw_shared_ptr sstables) const; sstables::shared_sstable make_streaming_sstable_for_write(std::optional subdir = {}); diff --git a/repair/row_level.cc b/repair/row_level.cc index c3693945d7..44490fdc48 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -458,14 +458,14 @@ public: if (local_reader) { auto ms = mutation_source([&cf] ( schema_ptr s, - reader_permit, + reader_permit permit, const dht::partition_range& pr, const query::partition_slice& ps, const io_priority_class& pc, tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding fwd_mr) { - return cf.make_streaming_reader(std::move(s), pr, ps, fwd_mr); + return cf.make_streaming_reader(std::move(s), std::move(permit), pr, ps, fwd_mr); }); std::tie(_reader, _reader_handle) = make_manually_paused_evictable_reader( std::move(ms), diff --git a/service/storage_service.cc b/service/storage_service.cc index bb2c7a30e1..4f27641455 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3260,7 +3260,8 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name, size_t num_partitions_processed = 0; size_t num_bytes_read = 0; nr_sst_current += sst_processed.size(); - auto reader = table.make_streaming_reader(s, full_partition_range, sst_set); + auto permit = co_await _db.local().obtain_reader_permit(table, "storage_service::load_and_stream()", db::no_timeout); + auto reader = table.make_streaming_reader(s, std::move(permit), full_partition_range, sst_set); std::exception_ptr eptr; bool failed = false; try { diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index d216bdbffd..ec7520a1c2 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -87,20 +87,20 @@ struct send_info { dht::token_range_vector ranges; dht::partition_range_vector prs; flat_mutation_reader reader; - send_info(database& db_, netw::messaging_service& ms_, utils::UUID plan_id_, utils::UUID cf_id_, + send_info(database& db_, netw::messaging_service& ms_, utils::UUID plan_id_, table& tbl_, reader_permit permit_, dht::token_range_vector ranges_, netw::messaging_service::msg_addr id_, uint32_t dst_cpu_id_, stream_reason reason_) : db(db_) , ms(ms_) , plan_id(plan_id_) - , cf_id(cf_id_) + , cf_id(tbl_.schema()->id()) , id(id_) , dst_cpu_id(dst_cpu_id_) , reason(reason_) - , cf(db.find_column_family(cf_id)) + , cf(tbl_) , ranges(std::move(ranges_)) , prs(dht::to_partition_ranges(ranges)) - , reader(cf.make_streaming_reader(cf.schema(), prs)) { + , reader(cf.make_streaming_reader(cf.schema(), std::move(permit_), prs)) { } future has_relevant_range_on_this_shard() { return do_with(false, ranges.begin(), [this] (bool& found_relevant_range, dht::token_range_vector::iterator& ranges_it) { @@ -221,8 +221,10 @@ future<> stream_transfer_task::execute() { sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id); sort_and_merge_ranges(); auto reason = session->get_reason(); - return session->get_db().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason] (database& db) { - auto si = make_lw_shared(db, stream_session::ms(), plan_id, cf_id, std::move(ranges), id, dst_cpu_id, reason); + return session->get_db().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason] (database& db) mutable { + auto& tbl = db.find_column_family(cf_id); + return db.obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout).then([&db, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason] (reader_permit permit) mutable { + auto si = make_lw_shared(db, stream_session::ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason); return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) { if (!has_relevant_range_on_this_shard) { sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}: ignore ranges on shard={}", @@ -233,6 +235,7 @@ future<> stream_transfer_task::execute() { }).finally([si] { return si->reader.close(); }); + }); }).then([this, plan_id, cf_id, id] { sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id); return session->ms().send_stream_mutation_done(id, plan_id, _ranges, diff --git a/table.cc b/table.cc index c47426aa1c..6cec63b235 100644 --- a/table.cc +++ b/table.cc @@ -236,9 +236,8 @@ sstables::shared_sstable table::make_streaming_sstable_for_write(std::optionalmake_permit(s.get(), "stream-ranges"); auto& slice = s->full_slice(); auto& pc = service::get_local_streaming_priority(); @@ -256,9 +255,8 @@ table::make_streaming_reader(schema_ptr s, return make_flat_multi_range_reader(s, std::move(permit), std::move(source), ranges, slice, pc, nullptr, mutation_reader::forwarding::no); } -flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range, +flat_mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, mutation_reader::forwarding fwd_mr) const { - auto permit = _config.streaming_read_concurrency_semaphore->make_permit(schema.get(), "stream-range"); const auto& pc = service::get_local_streaming_priority(); auto trace_state = tracing::trace_state_ptr(); const auto fwd = streamed_mutation::forwarding::no; @@ -272,9 +270,8 @@ flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht:: return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr); } -flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range, +flat_mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range, lw_shared_ptr sstables) const { - auto permit = _config.streaming_read_concurrency_semaphore->make_permit(schema.get(), "load-and-stream"); auto& slice = schema->full_slice(); const auto& pc = service::get_local_streaming_priority(); auto trace_state = tracing::trace_state_ptr();