Merge "mutation_reader: multishard_combining_reader clean-up close path" from Botond
"
The close path of the multishard combining reader is riddled with
workarounds the fact that the flat mutation reader couldn't wait on
futures when destroyed. Now that we have a close() method that can do
just that, all these workarounds can be removed.
Even more workarounds can be found in tests, where resources like the
reader concurrency semaphore are created separately for each tested
multishard reader and then destroyed after it doesn't need it, so we had
to come up with all sorts of creative and ugly workarounds to keep
these alive until background cleanup is finished.
This series fixes all this. Now, after calling close on the multishard
reader, all resources it used, including the life-cycle policy, the
semaphores created by it can be safely destroyed. This greatly
simplifies the handling of the multishard reader, and makes it much
easier to reason about life-cycle dependencies.
Tests: unit(dev, release:v2, debug:v2,
mutation_reader_test:debug -t test_multishard,
multishard_mutation_query_test:debug,
multishard_combining_reader_as_mutation_source:debug)
"
* 'multishard-combining-reader-close-cleanup/v3' of https://github.com/denesb/scylla:
mutation_reader: reader_lifecycle_policy: remove convenience methods
mutation_reader: multishard_combining_reader: store shard_reader via unique ptr
test/lib/reader_lifecycle_policy: destroy_reader: cleanup context
test/lib/reader_lifecycle_policy: get rid of lifecycle workarounds
test/lib/reader_lifecycle_policy: destroy_reader(): stop the semaphore
test/lib/reader_lifecycle_policy: use a more robust eviction mechanism
reader_concurrency_semaphore: wait for all permits to be destroyed in stop()
test/lib/reader_lifcecycle_policy: fix indentation
mutation_reader: reader_lifecycle_policy::destroy_reader(): require to be called on native shard
reader_lifecycle_policy implementations: fix indentation
mutation_reader: reader_lifecycle_policy::destroy_reader(): de-futurize reader parameter
mutation_reader: shard_reader::close(): wait on the remote reader
multishard_mutation_query: destroy remote parts in the foreground
mutation_reader: shard_reader::close(): close _reader
mutation_reader: reader_lifcecycle_policy::destroy_reader(): remove out-of-date comment
This commit is contained in:
28
database.cc
28
database.cc
@@ -2057,12 +2057,12 @@ database::stop() {
|
||||
}).then([this] {
|
||||
return _system_sstables_manager->close();
|
||||
}).finally([this] {
|
||||
return when_all_succeed(
|
||||
_read_concurrency_sem.stop(),
|
||||
_streaming_concurrency_sem.stop(),
|
||||
_compaction_concurrency_sem.stop(),
|
||||
_system_read_concurrency_sem.stop()).discard_result().finally([this] {
|
||||
return _querier_cache.stop();
|
||||
return _querier_cache.stop().finally([this] {
|
||||
return when_all_succeed(
|
||||
_read_concurrency_sem.stop(),
|
||||
_streaming_concurrency_sem.stop(),
|
||||
_compaction_concurrency_sem.stop(),
|
||||
_system_read_concurrency_sem.stop()).discard_result();
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -2308,15 +2308,13 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
|
||||
return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr);
|
||||
}
|
||||
virtual future<> destroy_reader(shard_id shard, future<stopped_reader> reader_fut) noexcept override {
|
||||
return reader_fut.then([this, zis = shared_from_this(), shard] (stopped_reader&& reader) mutable {
|
||||
return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), handle = std::move(reader.handle)] () mutable {
|
||||
auto reader_opt = ctx.semaphore->unregister_inactive_read(std::move(*handle));
|
||||
return reader_opt ? reader_opt->close() : make_ready_future<>();
|
||||
});
|
||||
}).handle_exception([shard] (std::exception_ptr e) {
|
||||
dblog.warn("Failed to destroy shard reader of streaming multishard reader on shard {}: {}", shard, e);
|
||||
});
|
||||
virtual future<> destroy_reader(stopped_reader reader) noexcept override {
|
||||
auto ctx = std::move(_contexts[this_shard_id()]);
|
||||
auto reader_opt = ctx.semaphore->unregister_inactive_read(std::move(reader.handle));
|
||||
if (!reader_opt) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return reader_opt->close().finally([ctx = std::move(ctx)] {});
|
||||
}
|
||||
virtual reader_concurrency_semaphore& semaphore() override {
|
||||
const auto shard = this_shard_id();
|
||||
|
||||
@@ -105,34 +105,43 @@ class read_context : public reader_lifecycle_policy {
|
||||
std::unique_ptr<const dht::partition_range> range;
|
||||
std::unique_ptr<const query::partition_slice> slice;
|
||||
utils::phased_barrier::operation read_operation;
|
||||
std::optional<reader_concurrency_semaphore::inactive_read_handle> handle;
|
||||
std::optional<flat_mutation_reader::tracked_buffer> buffer;
|
||||
|
||||
remote_parts(
|
||||
reader_permit permit,
|
||||
std::unique_ptr<const dht::partition_range> range = nullptr,
|
||||
std::unique_ptr<const query::partition_slice> slice = nullptr,
|
||||
utils::phased_barrier::operation read_operation = {})
|
||||
utils::phased_barrier::operation read_operation = {},
|
||||
std::optional<reader_concurrency_semaphore::inactive_read_handle> handle = {})
|
||||
: permit(std::move(permit))
|
||||
, range(std::move(range))
|
||||
, slice(std::move(slice))
|
||||
, read_operation(std::move(read_operation)) {
|
||||
, read_operation(std::move(read_operation))
|
||||
, handle(std::move(handle)) {
|
||||
}
|
||||
};
|
||||
|
||||
reader_state state = reader_state::inexistent;
|
||||
foreign_unique_ptr<remote_parts> rparts;
|
||||
foreign_unique_ptr<reader_concurrency_semaphore::inactive_read_handle> handle;
|
||||
std::optional<flat_mutation_reader::tracked_buffer> buffer;
|
||||
std::optional<flat_mutation_reader::tracked_buffer> dismantled_buffer;
|
||||
|
||||
reader_meta() = default;
|
||||
|
||||
// Remote constructor.
|
||||
reader_meta(reader_state s, std::optional<remote_parts> rp = {}, reader_concurrency_semaphore::inactive_read_handle h = {})
|
||||
: state(s)
|
||||
, handle(make_foreign(std::make_unique<reader_concurrency_semaphore::inactive_read_handle>(std::move(h)))) {
|
||||
reader_meta(reader_state s, std::optional<remote_parts> rp = {})
|
||||
: state(s) {
|
||||
if (rp) {
|
||||
rparts = make_foreign(std::make_unique<remote_parts>(std::move(*rp)));
|
||||
}
|
||||
}
|
||||
|
||||
flat_mutation_reader::tracked_buffer& get_dismantled_buffer(const reader_permit& permit) {
|
||||
if (!dismantled_buffer) {
|
||||
dismantled_buffer.emplace(permit);
|
||||
}
|
||||
return *dismantled_buffer;
|
||||
}
|
||||
};
|
||||
|
||||
struct dismantle_buffer_stats {
|
||||
@@ -203,8 +212,6 @@ class read_context : public reader_lifecycle_policy {
|
||||
std::vector<reader_meta> _readers;
|
||||
std::vector<reader_concurrency_semaphore*> _semaphores;
|
||||
|
||||
gate _dismantling_gate;
|
||||
|
||||
static std::string_view reader_state_to_string(reader_state rs);
|
||||
|
||||
dismantle_buffer_stats dismantle_combined_buffer(flat_mutation_reader::tracked_buffer combined_buffer, const dht::decorated_key& pkey);
|
||||
@@ -247,7 +254,7 @@ public:
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr) override;
|
||||
|
||||
virtual future<> destroy_reader(shard_id shard, future<stopped_reader> reader_fut) noexcept override;
|
||||
virtual future<> destroy_reader(stopped_reader reader) noexcept override;
|
||||
|
||||
virtual reader_concurrency_semaphore& semaphore() override {
|
||||
const auto shard = this_shard_id();
|
||||
@@ -302,7 +309,7 @@ flat_mutation_reader read_context::create_reader(
|
||||
|
||||
// The reader is either in inexistent or successful lookup state.
|
||||
if (rm.state == reader_state::successful_lookup) {
|
||||
if (auto reader_opt = try_resume(std::move(*rm.handle))) {
|
||||
if (auto reader_opt = semaphore().unregister_inactive_read(std::move(*rm.rparts->handle))) {
|
||||
rm.state = reader_state::used;
|
||||
return std::move(*reader_opt);
|
||||
}
|
||||
@@ -323,47 +330,37 @@ flat_mutation_reader read_context::create_reader(
|
||||
std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);
|
||||
}
|
||||
|
||||
future<> read_context::destroy_reader(shard_id shard, future<stopped_reader> reader_fut) noexcept {
|
||||
// Future is waited on indirectly in `stop()` (via `_dismantling_gate`).
|
||||
return with_gate(_dismantling_gate, [this, shard, reader_fut = std::move(reader_fut)] () mutable {
|
||||
return reader_fut.then_wrapped([this, shard] (future<stopped_reader>&& reader_fut) {
|
||||
auto& rm = _readers[shard];
|
||||
future<> read_context::destroy_reader(stopped_reader reader) noexcept {
|
||||
auto& rm = _readers[this_shard_id()];
|
||||
|
||||
if (reader_fut.failed()) {
|
||||
mmq_log.debug("Failed to stop reader on shard {}: {}", shard, reader_fut.get_exception());
|
||||
++_db.local().get_stats().multishard_query_failed_reader_stops;
|
||||
rm.state = reader_state::inexistent;
|
||||
return;
|
||||
}
|
||||
|
||||
auto reader = reader_fut.get0();
|
||||
if (rm.state == reader_state::used) {
|
||||
rm.state = reader_state::saving;
|
||||
rm.handle = std::move(reader.handle);
|
||||
rm.buffer = std::move(reader.unconsumed_fragments);
|
||||
} else {
|
||||
mmq_log.warn(
|
||||
"Unexpected request to dismantle reader in state `{}` for shard {}."
|
||||
" Reader was not created nor is in the process of being created.",
|
||||
reader_state_to_string(rm.state),
|
||||
shard);
|
||||
}
|
||||
});
|
||||
});
|
||||
if (rm.state == reader_state::used) {
|
||||
rm.state = reader_state::saving;
|
||||
rm.rparts->handle = std::move(reader.handle);
|
||||
rm.rparts->buffer = std::move(reader.unconsumed_fragments);
|
||||
} else {
|
||||
mmq_log.warn(
|
||||
"Unexpected request to dismantle reader in state `{}`."
|
||||
" Reader was not created nor is in the process of being created.",
|
||||
reader_state_to_string(rm.state));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> read_context::stop() {
|
||||
auto gate_fut = _dismantling_gate.is_closed() ? make_ready_future<>() : _dismantling_gate.close();
|
||||
return gate_fut.then([this] {
|
||||
return parallel_for_each(smp::all_cpus(), [this] (unsigned shard) {
|
||||
if (_readers[shard].handle && *_readers[shard].handle) {
|
||||
return _db.invoke_on(shard, [rm = std::move(_readers[shard])] (database& db) mutable {
|
||||
auto reader_opt = rm.rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle));
|
||||
return reader_opt ? reader_opt->close() : make_ready_future<>();
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
return parallel_for_each(smp::all_cpus(), [this] (unsigned shard) {
|
||||
if (_readers[shard].rparts) {
|
||||
return _db.invoke_on(shard, [&rparts_fptr = _readers[shard].rparts] (database& db) mutable {
|
||||
auto rparts = rparts_fptr.release();
|
||||
if (rparts->handle) {
|
||||
auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rparts->handle));
|
||||
if (reader_opt) {
|
||||
return reader_opt->close().then([rparts = std::move(rparts)] { });
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -392,7 +389,7 @@ read_context::dismantle_buffer_stats read_context::dismantle_combined_buffer(fla
|
||||
continue;
|
||||
}
|
||||
|
||||
auto& shard_buffer = *_readers[shard].buffer;
|
||||
auto& shard_buffer = _readers[shard].get_dismantled_buffer(_permit);
|
||||
for (auto& smf : tmp_buffer) {
|
||||
stats.add(smf);
|
||||
shard_buffer.emplace_front(std::move(smf));
|
||||
@@ -406,7 +403,7 @@ read_context::dismantle_buffer_stats read_context::dismantle_combined_buffer(fla
|
||||
}
|
||||
|
||||
const auto shard = sharder.shard_of(pkey.token());
|
||||
auto& shard_buffer = *_readers[shard].buffer;
|
||||
auto& shard_buffer = _readers[shard].get_dismantled_buffer(_permit);
|
||||
for (auto& smf : tmp_buffer) {
|
||||
stats.add(smf);
|
||||
shard_buffer.emplace_front(std::move(smf));
|
||||
@@ -434,7 +431,7 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de
|
||||
return stats;
|
||||
}
|
||||
|
||||
auto& shard_buffer = *_readers[shard].buffer;
|
||||
auto& shard_buffer = _readers[shard].get_dismantled_buffer(_permit);
|
||||
|
||||
for (auto& rt : compaction_state.range_tombstones | boost::adaptors::reversed) {
|
||||
stats.add(*_schema, rt);
|
||||
@@ -458,22 +455,32 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las
|
||||
&last_pkey, &last_ckey, gts = tracing::global_trace_state_ptr(_trace_state)] (database& db) mutable {
|
||||
try {
|
||||
auto rparts = rm.rparts.release(); // avoid another round-trip when destroying rparts
|
||||
flat_mutation_reader_opt reader = rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle));
|
||||
flat_mutation_reader_opt reader = rparts->permit.semaphore().unregister_inactive_read(std::move(*rparts->handle));
|
||||
|
||||
if (!reader) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto& buffer = *rm.buffer;
|
||||
const auto fragments = buffer.size();
|
||||
size_t fragments = 0;
|
||||
const auto size_before = reader->buffer_size();
|
||||
const auto& schema = *reader->schema();
|
||||
|
||||
auto rit = std::reverse_iterator(buffer.cend());
|
||||
auto rend = std::reverse_iterator(buffer.cbegin());
|
||||
auto& schema = *reader->schema();
|
||||
for (;rit != rend; ++rit) {
|
||||
// Copy the fragment, the buffer is on another shard.
|
||||
reader->unpop_mutation_fragment(mutation_fragment(schema, rparts->permit, *rit));
|
||||
if (rparts->buffer) {
|
||||
fragments += rparts->buffer->size();
|
||||
auto rit = std::reverse_iterator(rparts->buffer->end());
|
||||
auto rend = std::reverse_iterator(rparts->buffer->begin());
|
||||
for (; rit != rend; ++rit) {
|
||||
reader->unpop_mutation_fragment(std::move(*rit));
|
||||
}
|
||||
}
|
||||
if (rm.dismantled_buffer) {
|
||||
fragments += rm.dismantled_buffer->size();
|
||||
auto rit = std::reverse_iterator(rm.dismantled_buffer->cend());
|
||||
auto rend = std::reverse_iterator(rm.dismantled_buffer->cbegin());
|
||||
for (; rit != rend; ++rit) {
|
||||
// Copy the fragment, the buffer is on another shard.
|
||||
reader->unpop_mutation_fragment(mutation_fragment(schema, rparts->permit, *rit));
|
||||
}
|
||||
}
|
||||
|
||||
const auto size_after = reader->buffer_size();
|
||||
@@ -538,11 +545,11 @@ future<> read_context::lookup_readers() {
|
||||
reinterpret_cast<uintptr_t>(&semaphore)));
|
||||
}
|
||||
|
||||
auto handle = pause(semaphore, std::move(q).reader());
|
||||
auto handle = semaphore.register_inactive_read(std::move(q).reader());
|
||||
return reader_meta(
|
||||
reader_state::successful_lookup,
|
||||
reader_meta::remote_parts(q.permit(), std::move(q).reader_range(), std::move(q).reader_slice(), table.read_in_progress()),
|
||||
std::move(handle));
|
||||
reader_meta::remote_parts(q.permit(), std::move(q).reader_range(), std::move(q).reader_slice(), table.read_in_progress(),
|
||||
std::move(handle)));
|
||||
}).then([this, shard] (reader_meta rm) {
|
||||
_readers[shard] = std::move(rm);
|
||||
});
|
||||
@@ -555,33 +562,23 @@ future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsu
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
return _dismantling_gate.close().then([this, unconsumed_buffer = std::move(unconsumed_buffer), compaction_state = std::move(compaction_state),
|
||||
last_ckey = std::move(last_ckey)] () mutable {
|
||||
auto last_pkey = compaction_state.partition_start.key();
|
||||
auto last_pkey = compaction_state.partition_start.key();
|
||||
|
||||
// Ensure all readers have engaged reader_meta::buffer member.
|
||||
for (auto& rm : _readers) {
|
||||
if (!rm.buffer) {
|
||||
rm.buffer.emplace(_permit);
|
||||
const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey);
|
||||
tracing::trace(_trace_state, "Dismantled combined buffer: {}", cb_stats);
|
||||
|
||||
const auto cs_stats = dismantle_compaction_state(std::move(compaction_state));
|
||||
tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats);
|
||||
|
||||
return do_with(std::move(last_pkey), std::move(last_ckey), [this] (const dht::decorated_key& last_pkey,
|
||||
const std::optional<clustering_key_prefix>& last_ckey) {
|
||||
return parallel_for_each(boost::irange(0u, smp::count), [this, &last_pkey, &last_ckey] (shard_id shard) {
|
||||
auto& rm = _readers[shard];
|
||||
if (rm.state == reader_state::successful_lookup || rm.state == reader_state::saving) {
|
||||
return save_reader(shard, last_pkey, last_ckey);
|
||||
}
|
||||
}
|
||||
|
||||
const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey);
|
||||
tracing::trace(_trace_state, "Dismantled combined buffer: {}", cb_stats);
|
||||
|
||||
const auto cs_stats = dismantle_compaction_state(std::move(compaction_state));
|
||||
tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats);
|
||||
|
||||
return do_with(std::move(last_pkey), std::move(last_ckey), [this] (const dht::decorated_key& last_pkey,
|
||||
const std::optional<clustering_key_prefix>& last_ckey) {
|
||||
return parallel_for_each(boost::irange(0u, smp::count), [this, &last_pkey, &last_ckey] (shard_id shard) {
|
||||
auto& rm = _readers[shard];
|
||||
if (rm.state == reader_state::successful_lookup || rm.state == reader_state::saving) {
|
||||
return save_reader(shard, last_pkey, last_ckey);
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1571,7 +1571,7 @@ namespace {
|
||||
// Although it implements the flat_mutation_reader:impl interface it cannot be
|
||||
// wrapped into a flat_mutation_reader, as it needs to be managed by a shared
|
||||
// pointer.
|
||||
class shard_reader : public enable_lw_shared_from_this<shard_reader>, public flat_mutation_reader::impl {
|
||||
class shard_reader : public flat_mutation_reader::impl {
|
||||
private:
|
||||
shared_ptr<reader_lifecycle_policy> _lifecycle_policy;
|
||||
const unsigned _shard;
|
||||
@@ -1634,29 +1634,35 @@ future<> shard_reader::close() noexcept {
|
||||
// Nothing to do if there was no reader created, nor is there a background
|
||||
// read ahead in progress which will create one.
|
||||
if (!_reader && !_read_ahead) {
|
||||
return make_ready_future<>();
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto f = _read_ahead ? *std::exchange(_read_ahead, std::nullopt) : make_ready_future<>();
|
||||
try {
|
||||
if (_read_ahead) {
|
||||
co_await *std::exchange(_read_ahead, std::nullopt);
|
||||
}
|
||||
|
||||
// TODO: return future upstream as part of close()
|
||||
return _lifecycle_policy->destroy_reader(_shard, f.then([this] {
|
||||
return smp::submit_to(_shard, [this] {
|
||||
auto ret = std::tuple(
|
||||
make_foreign(std::make_unique<reader_concurrency_semaphore::inactive_read_handle>(std::move(*_reader).inactive_read_handle())),
|
||||
make_foreign(std::make_unique<const flat_mutation_reader::tracked_buffer>(_reader->detach_buffer())));
|
||||
_reader.reset();
|
||||
return ret;
|
||||
}).then([this] (std::tuple<foreign_ptr<std::unique_ptr<reader_concurrency_semaphore::inactive_read_handle>>,
|
||||
foreign_ptr<std::unique_ptr<const flat_mutation_reader::tracked_buffer>>> remains) {
|
||||
auto&& [irh, remote_buffer] = remains;
|
||||
auto buffer = detach_buffer();
|
||||
for (const auto& mf : *remote_buffer) {
|
||||
buffer.emplace_back(*_schema, _permit, mf); // we are copying from the remote shard.
|
||||
}
|
||||
return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer)};
|
||||
co_await smp::submit_to(_shard, [this] {
|
||||
auto irh = std::move(*_reader).inactive_read_handle();
|
||||
return with_closeable(flat_mutation_reader(_reader.release()), [this] (flat_mutation_reader& reader) mutable {
|
||||
auto permit = reader.permit();
|
||||
const auto& schema = *reader.schema();
|
||||
|
||||
auto unconsumed_fragments = reader.detach_buffer();
|
||||
auto rit = std::reverse_iterator(buffer().cend());
|
||||
auto rend = std::reverse_iterator(buffer().cbegin());
|
||||
for (; rit != rend; ++rit) {
|
||||
unconsumed_fragments.emplace_front(schema, permit, *rit); // we are copying from the remote shard.
|
||||
}
|
||||
|
||||
return unconsumed_fragments;
|
||||
}).then([this, irh = std::move(irh)] (flat_mutation_reader::tracked_buffer&& buf) mutable {
|
||||
return _lifecycle_policy->destroy_reader({std::move(irh), std::move(buf)});
|
||||
});
|
||||
});
|
||||
}).finally([zis = shared_from_this()] {}));
|
||||
} catch (...) {
|
||||
mrlog.error("shard_reader::close(): failed to stop reader on shard {}: {}", _shard, std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
@@ -1701,7 +1707,7 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
});
|
||||
}
|
||||
|
||||
return fill_buf_fut.then([this, zis = shared_from_this()] (remote_fill_buffer_result res) mutable {
|
||||
return fill_buf_fut.then([this] (remote_fill_buffer_result res) mutable {
|
||||
_end_of_stream = res.end_of_stream;
|
||||
for (const auto& mf : *res.buffer) {
|
||||
push_mutation_fragment(mutation_fragment(*_schema, _permit, mf));
|
||||
@@ -1782,7 +1788,7 @@ class multishard_combining_reader : public flat_mutation_reader::impl {
|
||||
};
|
||||
|
||||
const dht::sharder& _sharder;
|
||||
std::vector<lw_shared_ptr<shard_reader>> _shard_readers;
|
||||
std::vector<std::unique_ptr<shard_reader>> _shard_readers;
|
||||
// Contains the position of each shard with token granularity, organized
|
||||
// into a min-heap. Used to select the shard with the smallest token each
|
||||
// time a shard reader produces a new partition.
|
||||
@@ -1911,7 +1917,7 @@ multishard_combining_reader::multishard_combining_reader(
|
||||
|
||||
_shard_readers.reserve(_sharder.shard_count());
|
||||
for (unsigned i = 0; i < _sharder.shard_count(); ++i) {
|
||||
_shard_readers.emplace_back(make_lw_shared<shard_reader>(_schema, _permit, lifecycle_policy, i, pr, ps, pc, trace_state, fwd_mr));
|
||||
_shard_readers.emplace_back(std::make_unique<shard_reader>(_schema, _permit, lifecycle_policy, i, pr, ps, pc, trace_state, fwd_mr));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1946,7 +1952,7 @@ future<> multishard_combining_reader::fast_forward_to(const dht::partition_range
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
on_partition_range_change(pr);
|
||||
return parallel_for_each(_shard_readers, [&pr, timeout] (lw_shared_ptr<shard_reader>& sr) {
|
||||
return parallel_for_each(_shard_readers, [&pr, timeout] (std::unique_ptr<shard_reader>& sr) {
|
||||
return sr->fast_forward_to(pr, timeout);
|
||||
});
|
||||
}
|
||||
@@ -1956,27 +1962,11 @@ future<> multishard_combining_reader::fast_forward_to(position_range pr, db::tim
|
||||
}
|
||||
|
||||
future<> multishard_combining_reader::close() noexcept {
|
||||
auto shard_readers = std::move(_shard_readers);
|
||||
return parallel_for_each(shard_readers, [] (lw_shared_ptr<shard_reader>& sr) {
|
||||
return parallel_for_each(_shard_readers, [] (std::unique_ptr<shard_reader>& sr) {
|
||||
return sr->close();
|
||||
});
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore::inactive_read_handle
|
||||
reader_lifecycle_policy::pause(reader_concurrency_semaphore& sem, flat_mutation_reader reader) {
|
||||
return sem.register_inactive_read(std::move(reader));
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore::inactive_read_handle
|
||||
reader_lifecycle_policy::pause(flat_mutation_reader reader) {
|
||||
return pause(semaphore(), std::move(reader));
|
||||
}
|
||||
|
||||
flat_mutation_reader_opt
|
||||
reader_lifecycle_policy::try_resume(reader_concurrency_semaphore::inactive_read_handle irh) {
|
||||
return semaphore().unregister_inactive_read(std::move(irh));
|
||||
}
|
||||
|
||||
flat_mutation_reader make_multishard_combining_reader(
|
||||
shared_ptr<reader_lifecycle_policy> lifecycle_policy,
|
||||
schema_ptr schema,
|
||||
|
||||
@@ -490,15 +490,10 @@ std::pair<flat_mutation_reader, evictable_reader_handle> make_manually_paused_ev
|
||||
class reader_lifecycle_policy {
|
||||
public:
|
||||
struct stopped_reader {
|
||||
foreign_ptr<std::unique_ptr<reader_concurrency_semaphore::inactive_read_handle>> handle;
|
||||
reader_concurrency_semaphore::inactive_read_handle handle;
|
||||
flat_mutation_reader::tracked_buffer unconsumed_fragments;
|
||||
};
|
||||
|
||||
protected:
|
||||
// Helpers for implementations, who might wish to provide the semaphore in
|
||||
// other ways than through the official `semaphore()` override.
|
||||
static reader_concurrency_semaphore::inactive_read_handle pause(reader_concurrency_semaphore& sem, flat_mutation_reader reader);
|
||||
|
||||
public:
|
||||
/// Create an appropriate reader on the shard it is called on.
|
||||
///
|
||||
@@ -516,21 +511,16 @@ public:
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr) = 0;
|
||||
|
||||
/// Wait on the shard reader to stop then destroy it.
|
||||
/// Destroy the shard reader.
|
||||
///
|
||||
/// Will be called when the multishard reader is being destroyed. It will be
|
||||
/// called for each of the shard readers. The future resolves when the
|
||||
/// reader is stopped, that is it, finishes all background and/or pending
|
||||
/// work.
|
||||
/// called for each of the shard readers.
|
||||
/// This method is expected to do a proper cleanup, that is, leave any gates,
|
||||
/// release any locks or whatever is appropriate for the shard reader.
|
||||
///
|
||||
/// The multishard reader couldn't wait on any future returned from this
|
||||
/// method (as it will be called from the destructor) so waiting on
|
||||
/// all the readers being cleaned up is up to the implementation.
|
||||
///
|
||||
/// This method has to be called on the shard the reader lives on.
|
||||
/// This method will be called from a destructor so it cannot throw.
|
||||
virtual future<> destroy_reader(shard_id shard, future<stopped_reader> reader) noexcept = 0;
|
||||
virtual future<> destroy_reader(stopped_reader reader) noexcept = 0;
|
||||
|
||||
/// Get the relevant semaphore for this read.
|
||||
///
|
||||
@@ -543,26 +533,6 @@ public:
|
||||
///
|
||||
/// This method will be called on the shard where the relevant reader lives.
|
||||
virtual reader_concurrency_semaphore& semaphore() = 0;
|
||||
|
||||
/// Pause the reader.
|
||||
///
|
||||
/// The purpose of pausing a reader is making it evictable while it is
|
||||
/// otherwise inactive. This allows freeing up resources that are in-demand
|
||||
/// by evicting these paused readers. Most notably, this allows freeing up
|
||||
/// reader permits when the node is overloaded with reads.
|
||||
/// This is just a helper method, it uses the semaphore returned by
|
||||
/// `semaphore()` for the actual pausing.
|
||||
/// \see semaphore()
|
||||
reader_concurrency_semaphore::inactive_read_handle pause(flat_mutation_reader reader);
|
||||
|
||||
/// Try to resume the reader.
|
||||
///
|
||||
/// The optional returned will be disengaged when resuming fails. This can
|
||||
/// happen if the reader was evicted while paused.
|
||||
/// This is just a helper method, it uses the semaphore returned by
|
||||
/// `semaphore()` for the actual pausing.
|
||||
/// \see semaphore()
|
||||
flat_mutation_reader_opt try_resume(reader_concurrency_semaphore::inactive_read_handle irh);
|
||||
};
|
||||
|
||||
/// Make a multishard_combining_reader.
|
||||
|
||||
@@ -87,13 +87,17 @@ public:
|
||||
: _semaphore(semaphore)
|
||||
, _schema(schema)
|
||||
, _op_name_view(op_name)
|
||||
{ }
|
||||
{
|
||||
_semaphore.on_permit_created(*this);
|
||||
}
|
||||
impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name)
|
||||
: _semaphore(semaphore)
|
||||
, _schema(schema)
|
||||
, _op_name(std::move(op_name))
|
||||
, _op_name_view(_op_name)
|
||||
{ }
|
||||
{
|
||||
_semaphore.on_permit_created(*this);
|
||||
}
|
||||
~impl() {
|
||||
if (_resources) {
|
||||
on_internal_error_noexcept(rcslog, format("reader_permit::impl::~impl(): permit {} detected a leak of {{count={}, memory={}}} resources",
|
||||
@@ -102,6 +106,8 @@ public:
|
||||
_resources.memory));
|
||||
signal(_resources);
|
||||
}
|
||||
|
||||
_semaphore.on_permit_destroyed(*this);
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore& semaphore() {
|
||||
@@ -167,13 +173,11 @@ struct reader_concurrency_semaphore::permit_list {
|
||||
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name)
|
||||
: _impl(::seastar::make_shared<reader_permit::impl>(semaphore, schema, op_name))
|
||||
{
|
||||
semaphore._permit_list->permits.push_back(*_impl);
|
||||
}
|
||||
|
||||
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name)
|
||||
: _impl(::seastar::make_shared<reader_permit::impl>(semaphore, schema, std::move(op_name)))
|
||||
{
|
||||
semaphore._permit_list->permits.push_back(*_impl);
|
||||
}
|
||||
|
||||
void reader_permit::on_waiting() {
|
||||
@@ -504,6 +508,7 @@ future<> reader_concurrency_semaphore::stop() noexcept {
|
||||
_stopped = true;
|
||||
clear_inactive_reads();
|
||||
co_await _close_readers_gate.close();
|
||||
co_await _permit_gate.close();
|
||||
broken(std::make_exception_ptr(stopped_exception()));
|
||||
co_return;
|
||||
}
|
||||
@@ -600,6 +605,16 @@ future<reader_permit::resource_units> reader_concurrency_semaphore::do_wait_admi
|
||||
return fut;
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) noexcept {
|
||||
_permit_list->permits.push_back(permit);
|
||||
_permit_gate.enter();
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::on_permit_destroyed(reader_permit::impl& permit) noexcept {
|
||||
permit.unlink();
|
||||
_permit_gate.leave();
|
||||
}
|
||||
|
||||
reader_permit reader_concurrency_semaphore::make_permit(const schema* const schema, const char* const op_name) {
|
||||
return reader_permit(*this, schema, std::string_view(op_name));
|
||||
}
|
||||
|
||||
@@ -173,6 +173,7 @@ private:
|
||||
std::unique_ptr<permit_list> _permit_list;
|
||||
bool _stopped = false;
|
||||
gate _close_readers_gate;
|
||||
gate _permit_gate;
|
||||
|
||||
private:
|
||||
[[nodiscard]] flat_mutation_reader detach_inactive_reader(inactive_read&, evict_reason reason) noexcept;
|
||||
@@ -186,6 +187,9 @@ private:
|
||||
void evict_readers_in_background();
|
||||
future<reader_permit::resource_units> do_wait_admission(reader_permit permit, size_t memory, db::timeout_clock::time_point timeout);
|
||||
|
||||
void on_permit_created(reader_permit::impl&) noexcept;
|
||||
void on_permit_destroyed(reader_permit::impl&) noexcept;
|
||||
|
||||
std::runtime_error stopped_exception();
|
||||
|
||||
// closes reader in the background.
|
||||
|
||||
@@ -51,8 +51,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
|
||||
|
||||
// It has to be a container that does not invalidate pointers
|
||||
std::list<dummy_sharder> keep_alive_sharder;
|
||||
test_reader_lifecycle_policy::operations_gate operations_gate;
|
||||
test_reader_lifecycle_policy::semaphore_registry semaphore_registry;
|
||||
|
||||
do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
|
||||
auto make_populate = [&] (bool evict_paused_readers, bool single_fragment_buffer) {
|
||||
@@ -109,7 +107,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
|
||||
return reader;
|
||||
};
|
||||
|
||||
auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate, semaphore_registry, evict_paused_readers);
|
||||
auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), evict_paused_readers);
|
||||
auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s,
|
||||
tests::make_permit(), range, slice, pc, trace_state, fwd_mr);
|
||||
if (fwd_sm == streamed_mutation::forwarding::yes) {
|
||||
@@ -129,6 +127,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
|
||||
testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)");
|
||||
run_mutation_source_tests(make_populate(true, true));
|
||||
|
||||
return operations_gate.close();
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -1781,9 +1781,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) {
|
||||
return;
|
||||
}
|
||||
|
||||
test_reader_lifecycle_policy::operations_gate operations_gate;
|
||||
test_reader_lifecycle_policy::semaphore_registry semaphore_registry;
|
||||
|
||||
do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
|
||||
std::vector<std::atomic<bool>> shards_touched(smp::count);
|
||||
simple_schema s;
|
||||
@@ -1799,7 +1796,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) {
|
||||
};
|
||||
|
||||
assert_that(make_multishard_combining_reader(
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate, semaphore_registry),
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
|
||||
s.schema(),
|
||||
tests::make_permit(),
|
||||
query::full_partition_range,
|
||||
@@ -1811,7 +1808,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) {
|
||||
BOOST_REQUIRE(shards_touched.at(i));
|
||||
}
|
||||
|
||||
return operations_gate.close();
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -2010,8 +2007,7 @@ struct multishard_reader_for_read_ahead {
|
||||
std::unique_ptr<dht::partition_range> pr;
|
||||
};
|
||||
|
||||
multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(simple_schema& s, test_reader_lifecycle_policy::operations_gate& operations_gate,
|
||||
test_reader_lifecycle_policy::semaphore_registry& semaphore_registry) {
|
||||
multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(simple_schema& s) {
|
||||
auto remote_controls = std::vector<foreign_ptr<std::unique_ptr<puppet_reader::control>>>();
|
||||
remote_controls.reserve(smp::count);
|
||||
for (unsigned i = 0; i < smp::count; ++i) {
|
||||
@@ -2068,7 +2064,7 @@ multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(s
|
||||
dht::ring_position::ending_at(pkeys_by_tokens.rbegin()->first)));
|
||||
|
||||
auto sharder = std::make_unique<dummy_sharder>(s.schema()->get_sharder(), std::move(pkeys_by_tokens));
|
||||
auto reader = make_multishard_combining_reader_for_tests(*sharder, seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate, semaphore_registry),
|
||||
auto reader = make_multishard_combining_reader_for_tests(*sharder, seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
|
||||
s.schema(), tests::make_permit(), *pr, s.schema()->full_slice(), service::get_local_sstable_query_read_priority());
|
||||
|
||||
return {std::move(reader), std::move(sharder), std::move(remote_controls), std::move(pr)};
|
||||
@@ -2082,8 +2078,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) {
|
||||
}
|
||||
|
||||
auto no_shards = smp::count - 1;
|
||||
test_reader_lifecycle_policy::operations_gate operations_gate;
|
||||
test_reader_lifecycle_policy::semaphore_registry semaphore_registry;
|
||||
|
||||
do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
|
||||
std::vector<std::atomic<bool>> shards_touched(smp::count);
|
||||
@@ -2102,7 +2096,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) {
|
||||
|
||||
assert_that(make_multishard_combining_reader_for_tests(
|
||||
*sharder,
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate, semaphore_registry),
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
|
||||
s.schema(),
|
||||
tests::make_permit(),
|
||||
query::full_partition_range,
|
||||
@@ -2115,7 +2109,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) {
|
||||
}
|
||||
BOOST_REQUIRE(!shards_touched[no_shards]);
|
||||
|
||||
return operations_gate.close();
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -2126,9 +2120,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed
|
||||
return;
|
||||
}
|
||||
|
||||
test_reader_lifecycle_policy::operations_gate operations_gate;
|
||||
test_reader_lifecycle_policy::semaphore_registry semaphore_registry;
|
||||
|
||||
do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
|
||||
std::vector<std::atomic<bool>> shards_touched(smp::count);
|
||||
simple_schema s;
|
||||
@@ -2171,7 +2162,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed
|
||||
inclusive_end ? "inclusive" : "exclusive");
|
||||
|
||||
assert_that(make_multishard_combining_reader(
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate, semaphore_registry),
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
|
||||
s.schema(),
|
||||
tests::make_permit(),
|
||||
pr,
|
||||
@@ -2184,7 +2175,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed
|
||||
BOOST_CHECK(shards_touched[i] == expected_shards_touched[i]);
|
||||
}
|
||||
|
||||
return operations_gate.close();
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -2218,13 +2209,10 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending
|
||||
return;
|
||||
}
|
||||
|
||||
test_reader_lifecycle_policy::operations_gate operations_gate;
|
||||
test_reader_lifecycle_policy::semaphore_registry semaphore_registry;
|
||||
|
||||
do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
|
||||
auto s = simple_schema();
|
||||
|
||||
auto reader_sharder_remote_controls__ = prepare_multishard_reader_for_read_ahead_test(s, operations_gate, semaphore_registry);
|
||||
auto reader_sharder_remote_controls__ = prepare_multishard_reader_for_read_ahead_test(s);
|
||||
auto&& reader = reader_sharder_remote_controls__.reader;
|
||||
auto&& sharder = reader_sharder_remote_controls__.sharder;
|
||||
auto&& remote_controls = reader_sharder_remote_controls__.remote_controls;
|
||||
@@ -2269,7 +2257,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending
|
||||
std::logical_and<bool>()).get0();
|
||||
}));
|
||||
|
||||
return operations_gate.close();
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -2279,13 +2267,10 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_fast_forwarded_with_pe
|
||||
return;
|
||||
}
|
||||
|
||||
test_reader_lifecycle_policy::operations_gate operations_gate;
|
||||
test_reader_lifecycle_policy::semaphore_registry semaphore_registry;
|
||||
|
||||
do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
|
||||
auto s = simple_schema();
|
||||
|
||||
auto reader_sharder_remote_controls_pr = prepare_multishard_reader_for_read_ahead_test(s, operations_gate, semaphore_registry);
|
||||
auto reader_sharder_remote_controls_pr = prepare_multishard_reader_for_read_ahead_test(s);
|
||||
auto&& reader = reader_sharder_remote_controls_pr.reader;
|
||||
auto&& sharder = reader_sharder_remote_controls_pr.sharder;
|
||||
auto&& remote_controls = reader_sharder_remote_controls_pr.remote_controls;
|
||||
@@ -2348,14 +2333,11 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_fast_forwarded_with_pe
|
||||
std::logical_and<bool>()).get0();
|
||||
}));
|
||||
|
||||
return operations_gate.close();
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) {
|
||||
test_reader_lifecycle_policy::operations_gate operations_gate;
|
||||
test_reader_lifecycle_policy::semaphore_registry semaphore_registry;
|
||||
|
||||
do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
|
||||
env.execute_cql("CREATE KEYSPACE multishard_combining_reader_next_partition_ks"
|
||||
" WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get();
|
||||
@@ -2407,7 +2389,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) {
|
||||
return reader;
|
||||
};
|
||||
auto reader = make_multishard_combining_reader(
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate, semaphore_registry),
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
|
||||
schema,
|
||||
tests::make_permit(),
|
||||
query::full_partition_range,
|
||||
@@ -2428,7 +2410,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) {
|
||||
}
|
||||
assertions.produces_end_of_stream();
|
||||
|
||||
return operations_gate.close();
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -2516,10 +2498,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic
|
||||
BOOST_REQUIRE(mf.as_clustering_row().key().equal(*s.schema(), ckey));
|
||||
}
|
||||
|
||||
test_reader_lifecycle_policy::operations_gate operations_gate;
|
||||
test_reader_lifecycle_policy::semaphore_registry semaphore_registry;
|
||||
|
||||
do_with_cql_env_thread([=, &operations_gate, &semaphore_registry, s = std::move(s)] (cql_test_env& env) mutable -> future<> {
|
||||
do_with_cql_env_thread([=, s = std::move(s)] (cql_test_env& env) mutable -> future<> {
|
||||
auto factory = [=, gs = global_simple_schema(s)] (
|
||||
schema_ptr,
|
||||
const dht::partition_range& range,
|
||||
@@ -2545,7 +2524,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic
|
||||
BOOST_REQUIRE(mut_opt);
|
||||
|
||||
assert_that(make_multishard_combining_reader(
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate, semaphore_registry, true),
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), true),
|
||||
s.schema(),
|
||||
tests::make_permit(),
|
||||
query::full_partition_range,
|
||||
@@ -2553,7 +2532,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic
|
||||
service::get_local_sstable_query_read_priority()))
|
||||
.produces_partition(*mut_opt);
|
||||
|
||||
return operations_gate.close();
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -2567,9 +2546,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
|
||||
return;
|
||||
}
|
||||
|
||||
test_reader_lifecycle_policy::operations_gate operations_gate;
|
||||
test_reader_lifecycle_policy::semaphore_registry semaphore_registry;
|
||||
|
||||
do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
|
||||
env.execute_cql("CREATE KEYSPACE multishard_streaming_reader_ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get();
|
||||
env.execute_cql("CREATE TABLE multishard_streaming_reader_ks.test (pk int, v int, PRIMARY KEY(pk));").get();
|
||||
@@ -2613,7 +2589,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
|
||||
streamed_mutation::forwarding::no, fwd_mr);
|
||||
};
|
||||
auto reference_reader = make_filtering_reader(
|
||||
make_multishard_combining_reader(seastar::make_shared<test_reader_lifecycle_policy>(std::move(reader_factory), operations_gate, semaphore_registry),
|
||||
make_multishard_combining_reader(seastar::make_shared<test_reader_lifecycle_policy>(std::move(reader_factory)),
|
||||
schema, tests::make_permit(), partition_range, schema->full_slice(), service::get_local_sstable_query_read_priority()),
|
||||
[&remote_partitioner] (const dht::decorated_key& pkey) {
|
||||
return remote_partitioner.shard_of(pkey.token()) == 0;
|
||||
@@ -2638,7 +2614,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
|
||||
assert_that(tested_muts[i]).is_equal_to(reference_muts[i]);
|
||||
}
|
||||
|
||||
return operations_gate.close();
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
|
||||
|
||||
@@ -126,8 +126,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
|
||||
|
||||
auto permit = semaphore.make_permit(s.schema().get(), get_name());
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
auto permit = semaphore.make_permit(s.schema().get(), get_name());
|
||||
|
||||
std::optional<reader_permit::resource_units> residue_units;
|
||||
|
||||
@@ -552,3 +552,28 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) {
|
||||
testlog.info("With max-lines=4: {}", semaphore.dump_diagnostics(4));
|
||||
testlog.info("With no max-lines: {}", semaphore.dump_diagnostics(0));
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_waits_on_permits) {
|
||||
BOOST_TEST_MESSAGE("0 permits");
|
||||
{
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
|
||||
// Test will fail by timing out.
|
||||
semaphore.stop().get();
|
||||
}
|
||||
|
||||
BOOST_TEST_MESSAGE("1 permit");
|
||||
{
|
||||
auto semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, get_name());
|
||||
auto permit = std::make_unique<reader_permit>(semaphore->make_permit(nullptr, "permit1"));
|
||||
|
||||
// Test will fail via use-after-free
|
||||
auto f = semaphore->stop().then([semaphore = std::move(semaphore)] { });
|
||||
|
||||
later().get();
|
||||
BOOST_REQUIRE(!f.available());
|
||||
permit.reset();
|
||||
|
||||
// Test will fail by timing out.
|
||||
f.get();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,68 +27,6 @@
|
||||
class test_reader_lifecycle_policy
|
||||
: public reader_lifecycle_policy
|
||||
, public enable_shared_from_this<test_reader_lifecycle_policy> {
|
||||
public:
|
||||
class operations_gate {
|
||||
public:
|
||||
class operation {
|
||||
gate* _g = nullptr;
|
||||
|
||||
private:
|
||||
void leave() {
|
||||
if (_g) {
|
||||
_g->leave();
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
operation() = default;
|
||||
explicit operation(gate& g) : _g(&g) { _g->enter(); }
|
||||
operation(const operation&) = delete;
|
||||
operation(operation&& o) : _g(std::exchange(o._g, nullptr)) { }
|
||||
~operation() { leave(); }
|
||||
operation& operator=(const operation&) = delete;
|
||||
operation& operator=(operation&& o) {
|
||||
leave();
|
||||
_g = std::exchange(o._g, nullptr);
|
||||
return *this;
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
std::vector<gate> _gates;
|
||||
|
||||
public:
|
||||
operations_gate()
|
||||
: _gates(smp::count) {
|
||||
}
|
||||
|
||||
operation enter() {
|
||||
return operation(_gates[this_shard_id()]);
|
||||
}
|
||||
|
||||
future<> close() {
|
||||
return parallel_for_each(boost::irange(smp::count), [this] (shard_id shard) {
|
||||
return smp::submit_to(shard, [this, shard] {
|
||||
return _gates[shard].close();
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
class semaphore_registry {
|
||||
std::vector< // 1 per shard
|
||||
std::list<reader_concurrency_semaphore>> _semaphores;
|
||||
public:
|
||||
semaphore_registry() : _semaphores(smp::count) { }
|
||||
semaphore_registry(semaphore_registry&&) = delete;
|
||||
semaphore_registry(const semaphore_registry&) = delete;
|
||||
template <typename... Arg>
|
||||
reader_concurrency_semaphore& create_semaphore(Arg&&... arg) {
|
||||
return _semaphores[this_shard_id()].emplace_back(std::forward<Arg>(arg)...);
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
using factory_function = std::function<flat_mutation_reader(
|
||||
schema_ptr,
|
||||
const dht::partition_range&,
|
||||
@@ -98,10 +36,7 @@ private:
|
||||
mutation_reader::forwarding)>;
|
||||
|
||||
struct reader_context {
|
||||
reader_concurrency_semaphore* semaphore = nullptr;
|
||||
operations_gate::operation op;
|
||||
std::optional<reader_permit> permit;
|
||||
std::optional<future<reader_permit::resource_units>> wait_future;
|
||||
std::optional<reader_concurrency_semaphore> semaphore;
|
||||
std::optional<const dht::partition_range> range;
|
||||
std::optional<const query::partition_slice> slice;
|
||||
|
||||
@@ -111,17 +46,13 @@ private:
|
||||
};
|
||||
|
||||
factory_function _factory_function;
|
||||
operations_gate& _operation_gate;
|
||||
semaphore_registry& _semaphore_registry;
|
||||
std::vector<foreign_ptr<std::unique_ptr<reader_context>>> _contexts;
|
||||
std::vector<future<>> _destroy_futures;
|
||||
bool _evict_paused_readers = false;
|
||||
|
||||
public:
|
||||
explicit test_reader_lifecycle_policy(factory_function f, operations_gate& g, semaphore_registry& semaphore_registry, bool evict_paused_readers = false)
|
||||
explicit test_reader_lifecycle_policy(factory_function f, bool evict_paused_readers = false)
|
||||
: _factory_function(std::move(f))
|
||||
, _operation_gate(g)
|
||||
, _semaphore_registry(semaphore_registry)
|
||||
, _contexts(smp::count)
|
||||
, _evict_paused_readers(evict_paused_readers) {
|
||||
}
|
||||
@@ -140,27 +71,17 @@ public:
|
||||
} else {
|
||||
_contexts[shard] = make_foreign(std::make_unique<reader_context>(range, slice));
|
||||
}
|
||||
_contexts[shard]->op = _operation_gate.enter();
|
||||
return _factory_function(std::move(schema), *_contexts[shard]->range, *_contexts[shard]->slice, pc, std::move(trace_state), fwd_mr);
|
||||
}
|
||||
virtual future<> destroy_reader(shard_id shard, future<stopped_reader> reader) noexcept override {
|
||||
// waited via _operation_gate
|
||||
return reader.then([shard, this] (stopped_reader&& reader) {
|
||||
return smp::submit_to(shard, [handle = std::move(reader.handle), ctx = &*_contexts[shard]] () mutable {
|
||||
auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(*handle));
|
||||
auto ret = reader_opt ? reader_opt->close() : make_ready_future<>();
|
||||
ctx->semaphore->broken();
|
||||
if (ctx->wait_future) {
|
||||
ret = ret.then([ctx = std::move(ctx)] () mutable {
|
||||
return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future<reader_permit::resource_units> f) mutable {
|
||||
f.ignore_ready_future();
|
||||
ctx->permit.reset(); // make sure it's destroyed before the semaphore
|
||||
});
|
||||
});
|
||||
}
|
||||
return std::move(ret);
|
||||
virtual future<> destroy_reader(stopped_reader reader) noexcept override {
|
||||
auto& ctx = _contexts[this_shard_id()];
|
||||
auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(reader.handle));
|
||||
auto ret = reader_opt ? reader_opt->close() : make_ready_future<>();
|
||||
return ret.finally([&ctx] {
|
||||
return ctx->semaphore->stop().finally([&ctx] {
|
||||
ctx.release();
|
||||
});
|
||||
}).finally([zis = shared_from_this()] {});
|
||||
});
|
||||
}
|
||||
virtual reader_concurrency_semaphore& semaphore() override {
|
||||
const auto shard = this_shard_id();
|
||||
@@ -170,14 +91,10 @@ public:
|
||||
return *_contexts[shard]->semaphore;
|
||||
}
|
||||
if (_evict_paused_readers) {
|
||||
_contexts[shard]->semaphore = &_semaphore_registry.create_semaphore(0, std::numeric_limits<ssize_t>::max(), format("reader_concurrency_semaphore @shard_id={}", shard));
|
||||
_contexts[shard]->permit = _contexts[shard]->semaphore->make_permit(nullptr, "tests::reader_lifecycle_policy");
|
||||
// Add a waiter, so that all registered inactive reads are
|
||||
// immediately evicted.
|
||||
// We don't care about the returned future.
|
||||
_contexts[shard]->wait_future = _contexts[shard]->permit->wait_admission(1, db::no_timeout);
|
||||
// Create with no memory, so all inactive reads are immediately evicted.
|
||||
_contexts[shard]->semaphore.emplace(1, 0, format("reader_concurrency_semaphore @shard_id={}", shard));
|
||||
} else {
|
||||
_contexts[shard]->semaphore = &_semaphore_registry.create_semaphore(reader_concurrency_semaphore::no_limits{});
|
||||
_contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::no_limits{});
|
||||
}
|
||||
return *_contexts[shard]->semaphore;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user