multishard_mutation_query: pass a valid permit to shard mutation sources

In preparation of a valid permit being required to be passed to all
mutation sources, create a permit before creating the shard readers and
pass it to the mutation source when doing so. The permit is also
persisted in the `shard_mutation_querier` object when saving the reader,
which is another forward looking change, to allow the querier-cache to
use it to obtain the semaphore the read is actually registered with.
This commit is contained in:
Botond Dénes
2020-04-14 17:20:09 +03:00
parent bad53c4245
commit d5ebd763ff
3 changed files with 19 additions and 15 deletions

View File

@@ -98,17 +98,21 @@ class read_context : public reader_lifecycle_policy {
struct reader_meta {
struct remote_parts {
reader_concurrency_semaphore& semaphore;
reader_permit permit;
std::unique_ptr<const dht::partition_range> range;
std::unique_ptr<const query::partition_slice> slice;
utils::phased_barrier::operation read_operation;
explicit remote_parts(reader_concurrency_semaphore& semaphore)
: permit(semaphore.make_permit()) {
}
remote_parts(
reader_concurrency_semaphore& semaphore,
reader_permit permit,
std::unique_ptr<const dht::partition_range> range = nullptr,
std::unique_ptr<const query::partition_slice> slice = nullptr,
utils::phased_barrier::operation read_operation = {})
: semaphore(semaphore)
: permit(std::move(permit))
, range(std::move(range))
, slice(std::move(slice))
, read_operation(std::move(read_operation)) {
@@ -236,7 +240,7 @@ public:
virtual void destroy_reader(shard_id shard, future<stopped_reader> reader_fut) noexcept override;
virtual reader_concurrency_semaphore& semaphore() override {
return _readers[this_shard_id()].rparts->semaphore;
return *_readers[this_shard_id()].rparts->permit.semaphore();
}
future<> lookup_readers();
@@ -300,7 +304,7 @@ flat_mutation_reader read_context::create_reader(
rm.rparts->read_operation = table.read_in_progress();
rm.state = reader_state::used;
return table.as_mutation_source().make_reader(std::move(schema), no_reader_permit(), *rm.rparts->range, *rm.rparts->slice, pc,
return table.as_mutation_source().make_reader(std::move(schema), rm.rparts->permit, *rm.rparts->range, *rm.rparts->slice, pc,
std::move(trace_state));
}
@@ -445,7 +449,7 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las
return _db.invoke_on(shard, [this, shard, query_uuid = _cmd.query_uuid, query_ranges = _ranges, rm = std::exchange(_readers[shard], {}),
&last_pkey, &last_ckey, gts = tracing::global_trace_state_ptr(_trace_state)] (database& db) mutable {
try {
flat_mutation_reader_opt reader = try_resume(rm.rparts->semaphore, std::move(*rm.handle));
flat_mutation_reader_opt reader = try_resume(*rm.rparts->permit.semaphore(), std::move(*rm.handle));
if (!reader) {
return;
@@ -474,6 +478,7 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las
std::move(rm.rparts->range),
std::move(rm.rparts->slice),
std::move(*reader),
std::move(rm.rparts->permit),
last_pkey,
last_ckey);
@@ -518,7 +523,7 @@ future<> read_context::lookup_readers() {
auto handle = pause(semaphore, std::move(q).reader());
return reader_meta(
reader_state::successful_lookup,
reader_meta::remote_parts(semaphore, std::move(q).reader_range(), std::move(q).reader_slice(), table.read_in_progress()),
reader_meta::remote_parts(q.permit(), std::move(q).reader_range(), std::move(q).reader_slice(), table.read_in_progress()),
std::move(handle));
}).then([this, shard] (reader_meta rm) {
_readers[shard] = std::move(rm);

View File

@@ -238,6 +238,7 @@ class shard_mutation_querier {
std::unique_ptr<const dht::partition_range> _reader_range;
std::unique_ptr<const query::partition_slice> _reader_slice;
flat_mutation_reader _reader;
reader_permit _permit;
dht::decorated_key _nominal_pkey;
std::optional<clustering_key_prefix> _nominal_ckey;
@@ -247,12 +248,14 @@ public:
std::unique_ptr<const dht::partition_range> reader_range,
std::unique_ptr<const query::partition_slice> reader_slice,
flat_mutation_reader reader,
reader_permit permit,
dht::decorated_key nominal_pkey,
std::optional<clustering_key_prefix> nominal_ckey)
: _query_ranges(std::move(query_ranges))
, _reader_range(std::move(reader_range))
, _reader_slice(std::move(reader_slice))
, _reader(std::move(reader))
, _permit(std::move(permit))
, _nominal_pkey(std::move(nominal_pkey))
, _nominal_ckey(std::move(nominal_ckey)) {
}
@@ -269,6 +272,10 @@ public:
return _reader.schema();
}
reader_permit& permit() {
return _permit;
}
position_view current_position() const {
return {&_nominal_pkey, _nominal_ckey ? &*_nominal_ckey : nullptr};
}

View File

@@ -111,14 +111,6 @@ private:
template <typename Querier>
Querier make_querier(const dht::partition_range& range) {
if constexpr (std::is_same_v<Querier, query::shard_mutation_querier>) {
return Querier(_mutation_source,
_s.schema(),
range,
_s.schema()->full_slice(),
service::get_local_sstable_query_read_priority(),
nullptr);
}
return Querier(_mutation_source,
_s.schema(),
_sem.make_permit(),