From 98e5f0429ba163bcfbdf749a23461f746453e181 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 27 May 2021 13:20:25 +0300 Subject: [PATCH 01/15] mutation_reader: reader_lifcecycle_policy::destroy_reader(): remove out-of-date comment About the multishard reader not being able to wait on returned future. It can now via the `close()` method. --- mutation_reader.hh | 4 ---- 1 file changed, 4 deletions(-) diff --git a/mutation_reader.hh b/mutation_reader.hh index b64605544f..3e7a689768 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -473,10 +473,6 @@ public: /// This method is expected to do a proper cleanup, that is, leave any gates, /// release any locks or whatever is appropriate for the shard reader. /// - /// The multishard reader couldn't wait on any future returned from this - /// method (as it will be called from the destructor) so waiting on - /// all the readers being cleaned up is up to the implementation. - /// /// This method will be called from a destructor so it cannot throw. virtual future<> destroy_reader(shard_id shard, future reader) noexcept = 0; From 7552cc73cf8355976d521e68484e6c319fa7ec50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 11 Jun 2021 15:39:01 +0300 Subject: [PATCH 02/15] mutation_reader: shard_reader::close(): close _reader The reason we got away without closing _reader so far is that it is an `std::unique_ptr` which is a `flat_mutation_reader::impl` instance, without the `flat_mutation_reader` wrapper, which contains the validations for close. --- mutation_reader.cc | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index ba8d46d931..8688b964a3 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1642,11 +1642,13 @@ future<> shard_reader::close() noexcept { // TODO: return future upstream as part of close() return _lifecycle_policy->destroy_reader(_shard, f.then([this] { return smp::submit_to(_shard, [this] { - auto ret = std::tuple( - make_foreign(std::make_unique(std::move(*_reader).inactive_read_handle())), - make_foreign(std::make_unique(_reader->detach_buffer()))); - _reader.reset(); - return ret; + auto irh = std::move(*_reader).inactive_read_handle(); + return with_closeable(flat_mutation_reader(_reader.release()), [irh = std::move(irh)] (flat_mutation_reader& reader) mutable { + auto ret = std::tuple( + make_foreign(std::make_unique(std::move(irh))), + make_foreign(std::make_unique(reader.detach_buffer()))); + return std::move(ret); + }); }).then([this] (std::tuple>, foreign_ptr>> remains) { auto&& [irh, remote_buffer] = remains; From ab8d2a04a57a6edcd82099dafa024c9d8e734991 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 14 Jun 2021 10:51:02 +0300 Subject: [PATCH 03/15] multishard_mutation_query: destroy remote parts in the foreground Currently the foreign fields of the reader meta are destroyed in the background via the foreign pointer's destructor (with one exception). This makes the already complicated life-cycle of these parts and their dependencies even harder to reason about, especially in tests, where even things like semaphores live only within the test. This patch makes sure to destroy all these remote fields in the foreground in either `save_reader()` or `stop()`, ensuring that once `stop()` returns, everything is cleaned up. --- multishard_mutation_query.cc | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index d2ee88fd20..e6494d8993 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -356,10 +356,17 @@ future<> read_context::stop() { auto gate_fut = _dismantling_gate.is_closed() ? make_ready_future<>() : _dismantling_gate.close(); return gate_fut.then([this] { return parallel_for_each(smp::all_cpus(), [this] (unsigned shard) { - if (_readers[shard].handle && *_readers[shard].handle) { + if (_readers[shard].rparts) { return _db.invoke_on(shard, [rm = std::move(_readers[shard])] (database& db) mutable { - auto reader_opt = rm.rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle)); - return reader_opt ? reader_opt->close() : make_ready_future<>(); + auto rparts = rm.rparts.release(); + auto irh = rm.handle.release(); + if (*irh) { + auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle)); + if (reader_opt) { + return reader_opt->close().then([rparts = std::move(rparts)] { }); + } + } + return make_ready_future<>(); }); } return make_ready_future<>(); @@ -458,7 +465,8 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las &last_pkey, &last_ckey, gts = tracing::global_trace_state_ptr(_trace_state)] (database& db) mutable { try { auto rparts = rm.rparts.release(); // avoid another round-trip when destroying rparts - flat_mutation_reader_opt reader = rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle)); + auto irh = rm.handle.release(); + flat_mutation_reader_opt reader = rparts->permit.semaphore().unregister_inactive_read(std::move(*irh)); if (!reader) { return make_ready_future<>(); From 13d7806b622919b76011d2b24f4ad180a041cac7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 27 May 2021 13:18:08 +0300 Subject: [PATCH 04/15] mutation_reader: shard_reader::close(): wait on the remote reader We now have a future<> returning close() method so we don't need to do the cleanup of the remote reader in the background, detaching it from the shard-reader under destruction. We can now wait for the cleanup properly before the shard reader is destroyed and just pass the stopped reader to reader_lifecycle_policy::destroy_reader(). This patch does the first part -- moving the cleanup to the foreground, the API change of said method will come in the next patch. --- mutation_reader.cc | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index 8688b964a3..70084826b1 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1634,14 +1634,15 @@ future<> shard_reader::close() noexcept { // Nothing to do if there was no reader created, nor is there a background // read ahead in progress which will create one. if (!_reader && !_read_ahead) { - return make_ready_future<>(); + co_return; } - auto f = _read_ahead ? *std::exchange(_read_ahead, std::nullopt) : make_ready_future<>(); + try { + if (_read_ahead) { + co_await *std::exchange(_read_ahead, std::nullopt); + } - // TODO: return future upstream as part of close() - return _lifecycle_policy->destroy_reader(_shard, f.then([this] { - return smp::submit_to(_shard, [this] { + auto&& [irh, remote_buffer] = co_await smp::submit_to(_shard, [this] () { auto irh = std::move(*_reader).inactive_read_handle(); return with_closeable(flat_mutation_reader(_reader.release()), [irh = std::move(irh)] (flat_mutation_reader& reader) mutable { auto ret = std::tuple( @@ -1649,16 +1650,15 @@ future<> shard_reader::close() noexcept { make_foreign(std::make_unique(reader.detach_buffer()))); return std::move(ret); }); - }).then([this] (std::tuple>, - foreign_ptr>> remains) { - auto&& [irh, remote_buffer] = remains; - auto buffer = detach_buffer(); - for (const auto& mf : *remote_buffer) { - buffer.emplace_back(*_schema, _permit, mf); // we are copying from the remote shard. - } - return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer)}; }); - }).finally([zis = shared_from_this()] {})); + auto buffer = detach_buffer(); + for (const auto& mf : *remote_buffer) { + buffer.emplace_back(*_schema, _permit, mf); // we are copying from the remote shard. + } + co_await _lifecycle_policy->destroy_reader(_shard, make_ready_future(reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer)})); + } catch (...) { + mrlog.error("shard_reader::close(): failed to stop reader on shard {}: {}", _shard, std::current_exception()); + } } future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) { @@ -1958,8 +1958,7 @@ future<> multishard_combining_reader::fast_forward_to(position_range pr, db::tim } future<> multishard_combining_reader::close() noexcept { - auto shard_readers = std::move(_shard_readers); - return parallel_for_each(shard_readers, [] (lw_shared_ptr& sr) { + return parallel_for_each(_shard_readers, [] (lw_shared_ptr& sr) { return sr->close(); }); } From a7e59d3e2c6f725fa2912b858b216911c777e055 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 27 May 2021 13:29:54 +0300 Subject: [PATCH 05/15] mutation_reader: reader_lifecycle_policy::destroy_reader(): de-futurize reader parameter The shard reader is now able to wait on the stopped reader and pass the already stopped reader to `destroy_reader()`, so we can de-futurize the reader parameter of said method. The shard reader was already patched to pass a ready future so adjusting the call-site is trivial. The most prominent implementation, the multishard mutation query, can now also drop its `_dismantling_gate` which was put in place so it can wait on the background stopping if readers. A consequence of this move is that handling errors that might happen during the stopping of the reader is now handled in the shard reader, not all lifecycle policy implementations. --- database.cc | 6 +----- multishard_mutation_query.cc | 26 +++----------------------- mutation_reader.cc | 2 +- mutation_reader.hh | 8 +++----- test/lib/reader_lifecycle_policy.hh | 4 +--- 5 files changed, 9 insertions(+), 37 deletions(-) diff --git a/database.cc b/database.cc index 141c811be0..1a3004539d 100644 --- a/database.cc +++ b/database.cc @@ -2308,15 +2308,11 @@ flat_mutation_reader make_multishard_streaming_reader(distributed& db, return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr); } - virtual future<> destroy_reader(shard_id shard, future reader_fut) noexcept override { - return reader_fut.then([this, zis = shared_from_this(), shard] (stopped_reader&& reader) mutable { + virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept override { return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), handle = std::move(reader.handle)] () mutable { auto reader_opt = ctx.semaphore->unregister_inactive_read(std::move(*handle)); return reader_opt ? reader_opt->close() : make_ready_future<>(); }); - }).handle_exception([shard] (std::exception_ptr e) { - dblog.warn("Failed to destroy shard reader of streaming multishard reader on shard {}: {}", shard, e); - }); } virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index e6494d8993..f5dbf63a68 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -203,8 +203,6 @@ class read_context : public reader_lifecycle_policy { std::vector _readers; std::vector _semaphores; - gate _dismantling_gate; - static std::string_view reader_state_to_string(reader_state rs); dismantle_buffer_stats dismantle_combined_buffer(flat_mutation_reader::tracked_buffer combined_buffer, const dht::decorated_key& pkey); @@ -247,7 +245,7 @@ public: tracing::trace_state_ptr trace_state, mutation_reader::forwarding fwd_mr) override; - virtual future<> destroy_reader(shard_id shard, future reader_fut) noexcept override; + virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept override; virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); @@ -323,20 +321,9 @@ flat_mutation_reader read_context::create_reader( std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr); } -future<> read_context::destroy_reader(shard_id shard, future reader_fut) noexcept { - // Future is waited on indirectly in `stop()` (via `_dismantling_gate`). - return with_gate(_dismantling_gate, [this, shard, reader_fut = std::move(reader_fut)] () mutable { - return reader_fut.then_wrapped([this, shard] (future&& reader_fut) { +future<> read_context::destroy_reader(shard_id shard, stopped_reader reader) noexcept { auto& rm = _readers[shard]; - if (reader_fut.failed()) { - mmq_log.debug("Failed to stop reader on shard {}: {}", shard, reader_fut.get_exception()); - ++_db.local().get_stats().multishard_query_failed_reader_stops; - rm.state = reader_state::inexistent; - return; - } - - auto reader = reader_fut.get0(); if (rm.state == reader_state::used) { rm.state = reader_state::saving; rm.handle = std::move(reader.handle); @@ -348,13 +335,10 @@ future<> read_context::destroy_reader(shard_id shard, future rea reader_state_to_string(rm.state), shard); } - }); - }); + return make_ready_future<>(); } future<> read_context::stop() { - auto gate_fut = _dismantling_gate.is_closed() ? make_ready_future<>() : _dismantling_gate.close(); - return gate_fut.then([this] { return parallel_for_each(smp::all_cpus(), [this] (unsigned shard) { if (_readers[shard].rparts) { return _db.invoke_on(shard, [rm = std::move(_readers[shard])] (database& db) mutable { @@ -371,7 +355,6 @@ future<> read_context::stop() { } return make_ready_future<>(); }); - }); } read_context::dismantle_buffer_stats read_context::dismantle_combined_buffer(flat_mutation_reader::tracked_buffer combined_buffer, @@ -563,8 +546,6 @@ future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsu return make_ready_future<>(); } - return _dismantling_gate.close().then([this, unconsumed_buffer = std::move(unconsumed_buffer), compaction_state = std::move(compaction_state), - last_ckey = std::move(last_ckey)] () mutable { auto last_pkey = compaction_state.partition_start.key(); // Ensure all readers have engaged reader_meta::buffer member. @@ -591,7 +572,6 @@ future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsu return make_ready_future<>(); }); }); - }); } namespace { diff --git a/mutation_reader.cc b/mutation_reader.cc index 70084826b1..c9b70f84a3 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1655,7 +1655,7 @@ future<> shard_reader::close() noexcept { for (const auto& mf : *remote_buffer) { buffer.emplace_back(*_schema, _permit, mf); // we are copying from the remote shard. } - co_await _lifecycle_policy->destroy_reader(_shard, make_ready_future(reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer)})); + co_await _lifecycle_policy->destroy_reader(_shard, reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer)}); } catch (...) { mrlog.error("shard_reader::close(): failed to stop reader on shard {}: {}", _shard, std::current_exception()); } diff --git a/mutation_reader.hh b/mutation_reader.hh index 3e7a689768..56413726eb 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -464,17 +464,15 @@ public: tracing::trace_state_ptr trace_state, mutation_reader::forwarding fwd_mr) = 0; - /// Wait on the shard reader to stop then destroy it. + /// Destroy the shard reader. /// /// Will be called when the multishard reader is being destroyed. It will be - /// called for each of the shard readers. The future resolves when the - /// reader is stopped, that is it, finishes all background and/or pending - /// work. + /// called for each of the shard readers. /// This method is expected to do a proper cleanup, that is, leave any gates, /// release any locks or whatever is appropriate for the shard reader. /// /// This method will be called from a destructor so it cannot throw. - virtual future<> destroy_reader(shard_id shard, future reader) noexcept = 0; + virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept = 0; /// Get the relevant semaphore for this read. /// diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index 4108677d43..2dfb54a03b 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -143,9 +143,8 @@ public: _contexts[shard]->op = _operation_gate.enter(); return _factory_function(std::move(schema), *_contexts[shard]->range, *_contexts[shard]->slice, pc, std::move(trace_state), fwd_mr); } - virtual future<> destroy_reader(shard_id shard, future reader) noexcept override { + virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept override { // waited via _operation_gate - return reader.then([shard, this] (stopped_reader&& reader) { return smp::submit_to(shard, [handle = std::move(reader.handle), ctx = &*_contexts[shard]] () mutable { auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(*handle)); auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); @@ -160,7 +159,6 @@ public: } return std::move(ret); }); - }).finally([zis = shared_from_this()] {}); } virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); From 4ecf061c9012c74da60c4e1ea710cccd7a6dc807 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 27 May 2021 13:39:08 +0300 Subject: [PATCH 06/15] reader_lifecycle_policy implementations: fix indentation Left broken from the previous patch. --- database.cc | 8 +-- multishard_mutation_query.cc | 92 ++++++++++++++--------------- test/lib/reader_lifecycle_policy.hh | 28 ++++----- 3 files changed, 64 insertions(+), 64 deletions(-) diff --git a/database.cc b/database.cc index 1a3004539d..f621d86e2d 100644 --- a/database.cc +++ b/database.cc @@ -2309,10 +2309,10 @@ flat_mutation_reader make_multishard_streaming_reader(distributed& db, return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr); } virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept override { - return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), handle = std::move(reader.handle)] () mutable { - auto reader_opt = ctx.semaphore->unregister_inactive_read(std::move(*handle)); - return reader_opt ? reader_opt->close() : make_ready_future<>(); - }); + return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), handle = std::move(reader.handle)] () mutable { + auto reader_opt = ctx.semaphore->unregister_inactive_read(std::move(*handle)); + return reader_opt ? reader_opt->close() : make_ready_future<>(); + }); } virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index f5dbf63a68..fa377d1e00 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -322,39 +322,39 @@ flat_mutation_reader read_context::create_reader( } future<> read_context::destroy_reader(shard_id shard, stopped_reader reader) noexcept { - auto& rm = _readers[shard]; + auto& rm = _readers[shard]; - if (rm.state == reader_state::used) { - rm.state = reader_state::saving; - rm.handle = std::move(reader.handle); - rm.buffer = std::move(reader.unconsumed_fragments); - } else { - mmq_log.warn( - "Unexpected request to dismantle reader in state `{}` for shard {}." - " Reader was not created nor is in the process of being created.", - reader_state_to_string(rm.state), - shard); - } + if (rm.state == reader_state::used) { + rm.state = reader_state::saving; + rm.handle = std::move(reader.handle); + rm.buffer = std::move(reader.unconsumed_fragments); + } else { + mmq_log.warn( + "Unexpected request to dismantle reader in state `{}` for shard {}." + " Reader was not created nor is in the process of being created.", + reader_state_to_string(rm.state), + shard); + } return make_ready_future<>(); } future<> read_context::stop() { - return parallel_for_each(smp::all_cpus(), [this] (unsigned shard) { - if (_readers[shard].rparts) { - return _db.invoke_on(shard, [rm = std::move(_readers[shard])] (database& db) mutable { - auto rparts = rm.rparts.release(); - auto irh = rm.handle.release(); - if (*irh) { - auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle)); - if (reader_opt) { - return reader_opt->close().then([rparts = std::move(rparts)] { }); - } + return parallel_for_each(smp::all_cpus(), [this] (unsigned shard) { + if (_readers[shard].rparts) { + return _db.invoke_on(shard, [rm = std::move(_readers[shard])] (database& db) mutable { + auto rparts = rm.rparts.release(); + auto irh = rm.handle.release(); + if (*irh) { + auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle)); + if (reader_opt) { + return reader_opt->close().then([rparts = std::move(rparts)] { }); } - return make_ready_future<>(); - }); - } - return make_ready_future<>(); - }); + } + return make_ready_future<>(); + }); + } + return make_ready_future<>(); + }); } read_context::dismantle_buffer_stats read_context::dismantle_combined_buffer(flat_mutation_reader::tracked_buffer combined_buffer, @@ -546,32 +546,32 @@ future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsu return make_ready_future<>(); } - auto last_pkey = compaction_state.partition_start.key(); + auto last_pkey = compaction_state.partition_start.key(); - // Ensure all readers have engaged reader_meta::buffer member. - for (auto& rm : _readers) { - if (!rm.buffer) { - rm.buffer.emplace(_permit); - } + // Ensure all readers have engaged reader_meta::buffer member. + for (auto& rm : _readers) { + if (!rm.buffer) { + rm.buffer.emplace(_permit); } + } - const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey); - tracing::trace(_trace_state, "Dismantled combined buffer: {}", cb_stats); + const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey); + tracing::trace(_trace_state, "Dismantled combined buffer: {}", cb_stats); - const auto cs_stats = dismantle_compaction_state(std::move(compaction_state)); - tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats); + const auto cs_stats = dismantle_compaction_state(std::move(compaction_state)); + tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats); - return do_with(std::move(last_pkey), std::move(last_ckey), [this] (const dht::decorated_key& last_pkey, - const std::optional& last_ckey) { - return parallel_for_each(boost::irange(0u, smp::count), [this, &last_pkey, &last_ckey] (shard_id shard) { - auto& rm = _readers[shard]; - if (rm.state == reader_state::successful_lookup || rm.state == reader_state::saving) { - return save_reader(shard, last_pkey, last_ckey); - } + return do_with(std::move(last_pkey), std::move(last_ckey), [this] (const dht::decorated_key& last_pkey, + const std::optional& last_ckey) { + return parallel_for_each(boost::irange(0u, smp::count), [this, &last_pkey, &last_ckey] (shard_id shard) { + auto& rm = _readers[shard]; + if (rm.state == reader_state::successful_lookup || rm.state == reader_state::saving) { + return save_reader(shard, last_pkey, last_ckey); + } - return make_ready_future<>(); - }); + return make_ready_future<>(); }); + }); } namespace { diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index 2dfb54a03b..5943ef70de 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -145,20 +145,20 @@ public: } virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept override { // waited via _operation_gate - return smp::submit_to(shard, [handle = std::move(reader.handle), ctx = &*_contexts[shard]] () mutable { - auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(*handle)); - auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); - ctx->semaphore->broken(); - if (ctx->wait_future) { - ret = ret.then([ctx = std::move(ctx)] () mutable { - return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future f) mutable { - f.ignore_ready_future(); - ctx->permit.reset(); // make sure it's destroyed before the semaphore - }); - }); - } - return std::move(ret); - }); + return smp::submit_to(shard, [handle = std::move(reader.handle), ctx = &*_contexts[shard]] () mutable { + auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(*handle)); + auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); + ctx->semaphore->broken(); + if (ctx->wait_future) { + ret = ret.then([ctx = std::move(ctx)] () mutable { + return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future f) mutable { + f.ignore_ready_future(); + ctx->permit.reset(); // make sure it's destroyed before the semaphore + }); + }); + } + return std::move(ret); + }); } virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); From 8c7447effd1e9415901b96ec096f7f2856e85b29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 27 May 2021 15:27:10 +0300 Subject: [PATCH 07/15] mutation_reader: reader_lifecycle_policy::destroy_reader(): require to be called on native shard Currently shard_reader::close() (its caller) goes to the remote shard, copies back all fragments left there to the local shard, then calls `destroy_reader()`, which in the case of the multishard mutation query copies it all back to the native shard. This was required before because `shard_reader::stop()` (`close()`'s) predecessor) couldn't wait on `smp::submit_to()`. But close can, so we can get rid of all this back-and-forth and just call `destroy_reader()` on the shard the reader lives on, just like we do with `create_reader()`. --- database.cc | 12 ++-- multishard_mutation_query.cc | 95 ++++++++++++++++------------- mutation_reader.cc | 26 ++++---- mutation_reader.hh | 5 +- test/lib/reader_lifecycle_policy.hh | 7 +-- 5 files changed, 80 insertions(+), 65 deletions(-) diff --git a/database.cc b/database.cc index f621d86e2d..8e8341a84a 100644 --- a/database.cc +++ b/database.cc @@ -2308,11 +2308,13 @@ flat_mutation_reader make_multishard_streaming_reader(distributed& db, return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr); } - virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept override { - return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), handle = std::move(reader.handle)] () mutable { - auto reader_opt = ctx.semaphore->unregister_inactive_read(std::move(*handle)); - return reader_opt ? reader_opt->close() : make_ready_future<>(); - }); + virtual future<> destroy_reader(stopped_reader reader) noexcept override { + auto ctx = std::move(_contexts[this_shard_id()]); + auto reader_opt = ctx.semaphore->unregister_inactive_read(std::move(reader.handle)); + if (!reader_opt) { + return make_ready_future<>(); + } + return reader_opt->close().finally([ctx = std::move(ctx)] {}); } virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index fa377d1e00..ff2a7043aa 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -105,34 +105,43 @@ class read_context : public reader_lifecycle_policy { std::unique_ptr range; std::unique_ptr slice; utils::phased_barrier::operation read_operation; + std::optional handle; + std::optional buffer; remote_parts( reader_permit permit, std::unique_ptr range = nullptr, std::unique_ptr slice = nullptr, - utils::phased_barrier::operation read_operation = {}) + utils::phased_barrier::operation read_operation = {}, + std::optional handle = {}) : permit(std::move(permit)) , range(std::move(range)) , slice(std::move(slice)) - , read_operation(std::move(read_operation)) { + , read_operation(std::move(read_operation)) + , handle(std::move(handle)) { } }; reader_state state = reader_state::inexistent; foreign_unique_ptr rparts; - foreign_unique_ptr handle; - std::optional buffer; + std::optional dismantled_buffer; reader_meta() = default; // Remote constructor. - reader_meta(reader_state s, std::optional rp = {}, reader_concurrency_semaphore::inactive_read_handle h = {}) - : state(s) - , handle(make_foreign(std::make_unique(std::move(h)))) { + reader_meta(reader_state s, std::optional rp = {}) + : state(s) { if (rp) { rparts = make_foreign(std::make_unique(std::move(*rp))); } } + + flat_mutation_reader::tracked_buffer& get_dismantled_buffer(const reader_permit& permit) { + if (!dismantled_buffer) { + dismantled_buffer.emplace(permit); + } + return *dismantled_buffer; + } }; struct dismantle_buffer_stats { @@ -245,7 +254,7 @@ public: tracing::trace_state_ptr trace_state, mutation_reader::forwarding fwd_mr) override; - virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept override; + virtual future<> destroy_reader(stopped_reader reader) noexcept override; virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); @@ -300,7 +309,7 @@ flat_mutation_reader read_context::create_reader( // The reader is either in inexistent or successful lookup state. if (rm.state == reader_state::successful_lookup) { - if (auto reader_opt = try_resume(std::move(*rm.handle))) { + if (auto reader_opt = try_resume(std::move(*rm.rparts->handle))) { rm.state = reader_state::used; return std::move(*reader_opt); } @@ -321,19 +330,18 @@ flat_mutation_reader read_context::create_reader( std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr); } -future<> read_context::destroy_reader(shard_id shard, stopped_reader reader) noexcept { - auto& rm = _readers[shard]; +future<> read_context::destroy_reader(stopped_reader reader) noexcept { + auto& rm = _readers[this_shard_id()]; if (rm.state == reader_state::used) { rm.state = reader_state::saving; - rm.handle = std::move(reader.handle); - rm.buffer = std::move(reader.unconsumed_fragments); + rm.rparts->handle = std::move(reader.handle); + rm.rparts->buffer = std::move(reader.unconsumed_fragments); } else { mmq_log.warn( - "Unexpected request to dismantle reader in state `{}` for shard {}." + "Unexpected request to dismantle reader in state `{}`." " Reader was not created nor is in the process of being created.", - reader_state_to_string(rm.state), - shard); + reader_state_to_string(rm.state)); } return make_ready_future<>(); } @@ -341,11 +349,10 @@ future<> read_context::destroy_reader(shard_id shard, stopped_reader reader) noe future<> read_context::stop() { return parallel_for_each(smp::all_cpus(), [this] (unsigned shard) { if (_readers[shard].rparts) { - return _db.invoke_on(shard, [rm = std::move(_readers[shard])] (database& db) mutable { - auto rparts = rm.rparts.release(); - auto irh = rm.handle.release(); - if (*irh) { - auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle)); + return _db.invoke_on(shard, [&rparts_fptr = _readers[shard].rparts] (database& db) mutable { + auto rparts = rparts_fptr.release(); + if (rparts->handle) { + auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rparts->handle)); if (reader_opt) { return reader_opt->close().then([rparts = std::move(rparts)] { }); } @@ -382,7 +389,7 @@ read_context::dismantle_buffer_stats read_context::dismantle_combined_buffer(fla continue; } - auto& shard_buffer = *_readers[shard].buffer; + auto& shard_buffer = _readers[shard].get_dismantled_buffer(_permit); for (auto& smf : tmp_buffer) { stats.add(smf); shard_buffer.emplace_front(std::move(smf)); @@ -396,7 +403,7 @@ read_context::dismantle_buffer_stats read_context::dismantle_combined_buffer(fla } const auto shard = sharder.shard_of(pkey.token()); - auto& shard_buffer = *_readers[shard].buffer; + auto& shard_buffer = _readers[shard].get_dismantled_buffer(_permit); for (auto& smf : tmp_buffer) { stats.add(smf); shard_buffer.emplace_front(std::move(smf)); @@ -424,7 +431,7 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de return stats; } - auto& shard_buffer = *_readers[shard].buffer; + auto& shard_buffer = _readers[shard].get_dismantled_buffer(_permit); for (auto& rt : compaction_state.range_tombstones | boost::adaptors::reversed) { stats.add(*_schema, rt); @@ -448,23 +455,32 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las &last_pkey, &last_ckey, gts = tracing::global_trace_state_ptr(_trace_state)] (database& db) mutable { try { auto rparts = rm.rparts.release(); // avoid another round-trip when destroying rparts - auto irh = rm.handle.release(); - flat_mutation_reader_opt reader = rparts->permit.semaphore().unregister_inactive_read(std::move(*irh)); + flat_mutation_reader_opt reader = rparts->permit.semaphore().unregister_inactive_read(std::move(*rparts->handle)); if (!reader) { return make_ready_future<>(); } - auto& buffer = *rm.buffer; - const auto fragments = buffer.size(); + size_t fragments = 0; const auto size_before = reader->buffer_size(); + const auto& schema = *reader->schema(); - auto rit = std::reverse_iterator(buffer.cend()); - auto rend = std::reverse_iterator(buffer.cbegin()); - auto& schema = *reader->schema(); - for (;rit != rend; ++rit) { - // Copy the fragment, the buffer is on another shard. - reader->unpop_mutation_fragment(mutation_fragment(schema, rparts->permit, *rit)); + if (rparts->buffer) { + fragments += rparts->buffer->size(); + auto rit = std::reverse_iterator(rparts->buffer->end()); + auto rend = std::reverse_iterator(rparts->buffer->begin()); + for (; rit != rend; ++rit) { + reader->unpop_mutation_fragment(std::move(*rit)); + } + } + if (rm.dismantled_buffer) { + fragments += rm.dismantled_buffer->size(); + auto rit = std::reverse_iterator(rm.dismantled_buffer->cend()); + auto rend = std::reverse_iterator(rm.dismantled_buffer->cbegin()); + for (; rit != rend; ++rit) { + // Copy the fragment, the buffer is on another shard. + reader->unpop_mutation_fragment(mutation_fragment(schema, rparts->permit, *rit)); + } } const auto size_after = reader->buffer_size(); @@ -532,8 +548,8 @@ future<> read_context::lookup_readers() { auto handle = pause(semaphore, std::move(q).reader()); return reader_meta( reader_state::successful_lookup, - reader_meta::remote_parts(q.permit(), std::move(q).reader_range(), std::move(q).reader_slice(), table.read_in_progress()), - std::move(handle)); + reader_meta::remote_parts(q.permit(), std::move(q).reader_range(), std::move(q).reader_slice(), table.read_in_progress(), + std::move(handle))); }).then([this, shard] (reader_meta rm) { _readers[shard] = std::move(rm); }); @@ -548,13 +564,6 @@ future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsu auto last_pkey = compaction_state.partition_start.key(); - // Ensure all readers have engaged reader_meta::buffer member. - for (auto& rm : _readers) { - if (!rm.buffer) { - rm.buffer.emplace(_permit); - } - } - const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey); tracing::trace(_trace_state, "Dismantled combined buffer: {}", cb_stats); diff --git a/mutation_reader.cc b/mutation_reader.cc index c9b70f84a3..b135f071b7 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1642,20 +1642,24 @@ future<> shard_reader::close() noexcept { co_await *std::exchange(_read_ahead, std::nullopt); } - auto&& [irh, remote_buffer] = co_await smp::submit_to(_shard, [this] () { + co_await smp::submit_to(_shard, [this] { auto irh = std::move(*_reader).inactive_read_handle(); - return with_closeable(flat_mutation_reader(_reader.release()), [irh = std::move(irh)] (flat_mutation_reader& reader) mutable { - auto ret = std::tuple( - make_foreign(std::make_unique(std::move(irh))), - make_foreign(std::make_unique(reader.detach_buffer()))); - return std::move(ret); + return with_closeable(flat_mutation_reader(_reader.release()), [this] (flat_mutation_reader& reader) mutable { + auto permit = reader.permit(); + const auto& schema = *reader.schema(); + + auto unconsumed_fragments = reader.detach_buffer(); + auto rit = std::reverse_iterator(buffer().cend()); + auto rend = std::reverse_iterator(buffer().cbegin()); + for (; rit != rend; ++rit) { + unconsumed_fragments.emplace_front(schema, permit, *rit); // we are copying from the remote shard. + } + + return unconsumed_fragments; + }).then([this, irh = std::move(irh)] (flat_mutation_reader::tracked_buffer&& buf) mutable { + return _lifecycle_policy->destroy_reader({std::move(irh), std::move(buf)}); }); }); - auto buffer = detach_buffer(); - for (const auto& mf : *remote_buffer) { - buffer.emplace_back(*_schema, _permit, mf); // we are copying from the remote shard. - } - co_await _lifecycle_policy->destroy_reader(_shard, reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer)}); } catch (...) { mrlog.error("shard_reader::close(): failed to stop reader on shard {}: {}", _shard, std::current_exception()); } diff --git a/mutation_reader.hh b/mutation_reader.hh index 56413726eb..335d8a42b9 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -438,7 +438,7 @@ std::pair make_manually_paused_ev class reader_lifecycle_policy { public: struct stopped_reader { - foreign_ptr> handle; + reader_concurrency_semaphore::inactive_read_handle handle; flat_mutation_reader::tracked_buffer unconsumed_fragments; }; @@ -471,8 +471,9 @@ public: /// This method is expected to do a proper cleanup, that is, leave any gates, /// release any locks or whatever is appropriate for the shard reader. /// + /// This method has to be called on the shard the reader lives on. /// This method will be called from a destructor so it cannot throw. - virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept = 0; + virtual future<> destroy_reader(stopped_reader reader) noexcept = 0; /// Get the relevant semaphore for this read. /// diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index 5943ef70de..df0eddb05b 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -143,10 +143,10 @@ public: _contexts[shard]->op = _operation_gate.enter(); return _factory_function(std::move(schema), *_contexts[shard]->range, *_contexts[shard]->slice, pc, std::move(trace_state), fwd_mr); } - virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept override { + virtual future<> destroy_reader(stopped_reader reader) noexcept override { // waited via _operation_gate - return smp::submit_to(shard, [handle = std::move(reader.handle), ctx = &*_contexts[shard]] () mutable { - auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(*handle)); + auto ctx = &*_contexts[this_shard_id()]; + auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(reader.handle)); auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); ctx->semaphore->broken(); if (ctx->wait_future) { @@ -158,7 +158,6 @@ public: }); } return std::move(ret); - }); } virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); From a10a6e253ea695f3819454c8d2c5bfe82c5c9d85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 27 May 2021 15:42:17 +0300 Subject: [PATCH 08/15] test/lib/reader_lifcecycle_policy: fix indentation Left broken from the previous patch. --- test/lib/reader_lifecycle_policy.hh | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index df0eddb05b..ea6c912fb1 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -147,17 +147,17 @@ public: // waited via _operation_gate auto ctx = &*_contexts[this_shard_id()]; auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(reader.handle)); - auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); - ctx->semaphore->broken(); - if (ctx->wait_future) { - ret = ret.then([ctx = std::move(ctx)] () mutable { - return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future f) mutable { - f.ignore_ready_future(); - ctx->permit.reset(); // make sure it's destroyed before the semaphore - }); - }); - } - return std::move(ret); + auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); + ctx->semaphore->broken(); + if (ctx->wait_future) { + ret = ret.then([ctx = std::move(ctx)] () mutable { + return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future f) mutable { + f.ignore_ready_future(); + ctx->permit.reset(); // make sure it's destroyed before the semaphore + }); + }); + } + return std::move(ret); } virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); From 578a092e4a206c0d8db6b4c4437d7699610bc7ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 14 Jun 2021 12:43:50 +0300 Subject: [PATCH 09/15] reader_concurrency_semaphore: wait for all permits to be destroyed in stop() To prevent use-after-free resulting from any permit out-living the semaphore. --- database.cc | 12 ++++----- reader_concurrency_semaphore.cc | 23 +++++++++++++--- reader_concurrency_semaphore.hh | 4 +++ .../reader_concurrency_semaphore_test.cc | 27 ++++++++++++++++++- 4 files changed, 55 insertions(+), 11 deletions(-) diff --git a/database.cc b/database.cc index 8e8341a84a..ff0c51ddc4 100644 --- a/database.cc +++ b/database.cc @@ -2057,12 +2057,12 @@ database::stop() { }).then([this] { return _system_sstables_manager->close(); }).finally([this] { - return when_all_succeed( - _read_concurrency_sem.stop(), - _streaming_concurrency_sem.stop(), - _compaction_concurrency_sem.stop(), - _system_read_concurrency_sem.stop()).discard_result().finally([this] { - return _querier_cache.stop(); + return _querier_cache.stop().finally([this] { + return when_all_succeed( + _read_concurrency_sem.stop(), + _streaming_concurrency_sem.stop(), + _compaction_concurrency_sem.stop(), + _system_read_concurrency_sem.stop()).discard_result(); }); }); } diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index b7e72f673b..a68c9b6f5d 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -87,13 +87,17 @@ public: : _semaphore(semaphore) , _schema(schema) , _op_name_view(op_name) - { } + { + _semaphore.on_permit_created(*this); + } impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name) : _semaphore(semaphore) , _schema(schema) , _op_name(std::move(op_name)) , _op_name_view(_op_name) - { } + { + _semaphore.on_permit_created(*this); + } ~impl() { if (_resources) { on_internal_error_noexcept(rcslog, format("reader_permit::impl::~impl(): permit {} detected a leak of {{count={}, memory={}}} resources", @@ -102,6 +106,8 @@ public: _resources.memory)); signal(_resources); } + + _semaphore.on_permit_destroyed(*this); } reader_concurrency_semaphore& semaphore() { @@ -167,13 +173,11 @@ struct reader_concurrency_semaphore::permit_list { reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name) : _impl(::seastar::make_shared(semaphore, schema, op_name)) { - semaphore._permit_list->permits.push_back(*_impl); } reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name) : _impl(::seastar::make_shared(semaphore, schema, std::move(op_name))) { - semaphore._permit_list->permits.push_back(*_impl); } void reader_permit::on_waiting() { @@ -504,6 +508,7 @@ future<> reader_concurrency_semaphore::stop() noexcept { _stopped = true; clear_inactive_reads(); co_await _close_readers_gate.close(); + co_await _permit_gate.close(); broken(std::make_exception_ptr(stopped_exception())); co_return; } @@ -600,6 +605,16 @@ future reader_concurrency_semaphore::do_wait_admi return fut; } +void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) noexcept { + _permit_list->permits.push_back(permit); + _permit_gate.enter(); +} + +void reader_concurrency_semaphore::on_permit_destroyed(reader_permit::impl& permit) noexcept { + permit.unlink(); + _permit_gate.leave(); +} + reader_permit reader_concurrency_semaphore::make_permit(const schema* const schema, const char* const op_name) { return reader_permit(*this, schema, std::string_view(op_name)); } diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index f184c6ec23..79adc26e37 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -173,6 +173,7 @@ private: std::unique_ptr _permit_list; bool _stopped = false; gate _close_readers_gate; + gate _permit_gate; private: [[nodiscard]] flat_mutation_reader detach_inactive_reader(inactive_read&, evict_reason reason) noexcept; @@ -186,6 +187,9 @@ private: void evict_readers_in_background(); future do_wait_admission(reader_permit permit, size_t memory, db::timeout_clock::time_point timeout); + void on_permit_created(reader_permit::impl&) noexcept; + void on_permit_destroyed(reader_permit::impl&) noexcept; + std::runtime_error stopped_exception(); // closes reader in the background. diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index e5be3597c5..a706a8983f 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -126,8 +126,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024}; reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name()); - auto permit = semaphore.make_permit(s.schema().get(), get_name()); auto stop_sem = deferred_stop(semaphore); + auto permit = semaphore.make_permit(s.schema().get(), get_name()); std::optional residue_units; @@ -552,3 +552,28 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) { testlog.info("With max-lines=4: {}", semaphore.dump_diagnostics(4)); testlog.info("With no max-lines: {}", semaphore.dump_diagnostics(0)); } + +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_waits_on_permits) { + BOOST_TEST_MESSAGE("0 permits"); + { + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); + // Test will fail by timing out. + semaphore.stop().get(); + } + + BOOST_TEST_MESSAGE("1 permit"); + { + auto semaphore = std::make_unique(reader_concurrency_semaphore::no_limits{}, get_name()); + auto permit = std::make_unique(semaphore->make_permit(nullptr, "permit1")); + + // Test will fail via use-after-free + auto f = semaphore->stop().then([semaphore = std::move(semaphore)] { }); + + later().get(); + BOOST_REQUIRE(!f.available()); + permit.reset(); + + // Test will fail by timing out. + f.get(); + } +} From c09c62a0fb51e32e80d9270ed6657b7ec7e2c377 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 20 May 2021 18:31:15 +0300 Subject: [PATCH 10/15] test/lib/reader_lifecycle_policy: use a more robust eviction mechanism The test reader lifecycle policy has a mode in which it wants to ensure all inactive readers are evicted, so tests can stress reader recreation logic. For this it currently employs a trick of creating a waiter on the semaphore. I don't even know how this even works (or if it even does) but it sure complicates the lifecycle policy code a lot. So switch to the much more reliable and simple method of creating the semaphore with a single count and no memory. This ensures that all inactive reads are immediately evicted, while still allows a single read to be admitted at all times. --- test/lib/reader_lifecycle_policy.hh | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index ea6c912fb1..37ad604149 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -100,8 +100,6 @@ private: struct reader_context { reader_concurrency_semaphore* semaphore = nullptr; operations_gate::operation op; - std::optional permit; - std::optional> wait_future; std::optional range; std::optional slice; @@ -149,14 +147,6 @@ public: auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(reader.handle)); auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); ctx->semaphore->broken(); - if (ctx->wait_future) { - ret = ret.then([ctx = std::move(ctx)] () mutable { - return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future f) mutable { - f.ignore_ready_future(); - ctx->permit.reset(); // make sure it's destroyed before the semaphore - }); - }); - } return std::move(ret); } virtual reader_concurrency_semaphore& semaphore() override { @@ -167,12 +157,8 @@ public: return *_contexts[shard]->semaphore; } if (_evict_paused_readers) { - _contexts[shard]->semaphore = &_semaphore_registry.create_semaphore(0, std::numeric_limits::max(), format("reader_concurrency_semaphore @shard_id={}", shard)); - _contexts[shard]->permit = _contexts[shard]->semaphore->make_permit(nullptr, "tests::reader_lifecycle_policy"); - // Add a waiter, so that all registered inactive reads are - // immediately evicted. - // We don't care about the returned future. - _contexts[shard]->wait_future = _contexts[shard]->permit->wait_admission(1, db::no_timeout); + // Create with no memory, so all inactive reads are immediately evicted. + _contexts[shard]->semaphore = &_semaphore_registry.create_semaphore(1, 0, format("reader_concurrency_semaphore @shard_id={}", shard)); } else { _contexts[shard]->semaphore = &_semaphore_registry.create_semaphore(reader_concurrency_semaphore::no_limits{}); } From 5a271e42a52dd5a6d6bd0e7494792faf9ce1c442 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 14 Jun 2021 15:40:51 +0300 Subject: [PATCH 11/15] test/lib/reader_lifecycle_policy: destroy_reader(): stop the semaphore So that when this method returns the semaphore is safe to destroy. This in turn will enable us to get rid of all the machinery we have in place to deal with the semaphore having to out-live the lifecycle policy without a clear time as to when it can be safe to destroy. --- test/lib/reader_lifecycle_policy.hh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index 37ad604149..b52bd1e4bc 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -146,8 +146,9 @@ public: auto ctx = &*_contexts[this_shard_id()]; auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(reader.handle)); auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); - ctx->semaphore->broken(); - return std::move(ret); + return ret.finally([&ctx] { + return ctx->semaphore->stop(); + }); } virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); From d2ddaced4e65dfaedad60914b66fd5fb63fa85c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 27 May 2021 15:42:54 +0300 Subject: [PATCH 12/15] test/lib/reader_lifecycle_policy: get rid of lifecycle workarounds The lifecycle of the reader lifecycle policy and all the resources the reads use is now enclosed in that of the multishard reader thanks to its close() method. We can now remove all the workarounds we had in place to keep different resources as long as background reader cleanup finishes. --- ...ombining_reader_as_mutation_source_test.cc | 6 +- test/boost/mutation_reader_test.cc | 62 +++++---------- test/lib/reader_lifecycle_policy.hh | 77 +------------------ 3 files changed, 25 insertions(+), 120 deletions(-) diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc index 12513c69d1..728e11f2b0 100644 --- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc +++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc @@ -51,8 +51,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { // It has to be a container that does not invalidate pointers std::list keep_alive_sharder; - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { auto make_populate = [&] (bool evict_paused_readers, bool single_fragment_buffer) { @@ -109,7 +107,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { return reader; }; - auto lifecycle_policy = seastar::make_shared(std::move(factory), operations_gate, semaphore_registry, evict_paused_readers); + auto lifecycle_policy = seastar::make_shared(std::move(factory), evict_paused_readers); auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s, tests::make_permit(), range, slice, pc, trace_state, fwd_mr); if (fwd_sm == streamed_mutation::forwarding::yes) { @@ -129,6 +127,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { testlog.info("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)"); run_mutation_source_tests(make_populate(true, true)); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index d02ec0a754..1d43aa6d2a 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -1781,9 +1781,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { std::vector> shards_touched(smp::count); simple_schema s; @@ -1799,7 +1796,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { }; assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + seastar::make_shared(std::move(factory)), s.schema(), tests::make_permit(), query::full_partition_range, @@ -1811,7 +1808,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) { BOOST_REQUIRE(shards_touched.at(i)); } - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2010,8 +2007,7 @@ struct multishard_reader_for_read_ahead { std::unique_ptr pr; }; -multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(simple_schema& s, test_reader_lifecycle_policy::operations_gate& operations_gate, - test_reader_lifecycle_policy::semaphore_registry& semaphore_registry) { +multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(simple_schema& s) { auto remote_controls = std::vector>>(); remote_controls.reserve(smp::count); for (unsigned i = 0; i < smp::count; ++i) { @@ -2068,7 +2064,7 @@ multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(s dht::ring_position::ending_at(pkeys_by_tokens.rbegin()->first))); auto sharder = std::make_unique(s.schema()->get_sharder(), std::move(pkeys_by_tokens)); - auto reader = make_multishard_combining_reader_for_tests(*sharder, seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + auto reader = make_multishard_combining_reader_for_tests(*sharder, seastar::make_shared(std::move(factory)), s.schema(), tests::make_permit(), *pr, s.schema()->full_slice(), service::get_local_sstable_query_read_priority()); return {std::move(reader), std::move(sharder), std::move(remote_controls), std::move(pr)}; @@ -2082,8 +2078,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) { } auto no_shards = smp::count - 1; - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { std::vector> shards_touched(smp::count); @@ -2102,7 +2096,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) { assert_that(make_multishard_combining_reader_for_tests( *sharder, - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + seastar::make_shared(std::move(factory)), s.schema(), tests::make_permit(), query::full_partition_range, @@ -2115,7 +2109,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) { } BOOST_REQUIRE(!shards_touched[no_shards]); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2126,9 +2120,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { std::vector> shards_touched(smp::count); simple_schema s; @@ -2171,7 +2162,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed inclusive_end ? "inclusive" : "exclusive"); assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + seastar::make_shared(std::move(factory)), s.schema(), tests::make_permit(), pr, @@ -2184,7 +2175,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed BOOST_CHECK(shards_touched[i] == expected_shards_touched[i]); } - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2218,13 +2209,10 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { auto s = simple_schema(); - auto reader_sharder_remote_controls__ = prepare_multishard_reader_for_read_ahead_test(s, operations_gate, semaphore_registry); + auto reader_sharder_remote_controls__ = prepare_multishard_reader_for_read_ahead_test(s); auto&& reader = reader_sharder_remote_controls__.reader; auto&& sharder = reader_sharder_remote_controls__.sharder; auto&& remote_controls = reader_sharder_remote_controls__.remote_controls; @@ -2269,7 +2257,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending std::logical_and()).get0(); })); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2279,13 +2267,10 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_fast_forwarded_with_pe return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { auto s = simple_schema(); - auto reader_sharder_remote_controls_pr = prepare_multishard_reader_for_read_ahead_test(s, operations_gate, semaphore_registry); + auto reader_sharder_remote_controls_pr = prepare_multishard_reader_for_read_ahead_test(s); auto&& reader = reader_sharder_remote_controls_pr.reader; auto&& sharder = reader_sharder_remote_controls_pr.sharder; auto&& remote_controls = reader_sharder_remote_controls_pr.remote_controls; @@ -2348,14 +2333,11 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_fast_forwarded_with_pe std::logical_and()).get0(); })); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { env.execute_cql("CREATE KEYSPACE multishard_combining_reader_next_partition_ks" " WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get(); @@ -2407,7 +2389,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { return reader; }; auto reader = make_multishard_combining_reader( - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry), + seastar::make_shared(std::move(factory)), schema, tests::make_permit(), query::full_partition_range, @@ -2428,7 +2410,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { } assertions.produces_end_of_stream(); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2516,10 +2498,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic BOOST_REQUIRE(mf.as_clustering_row().key().equal(*s.schema(), ckey)); } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - - do_with_cql_env_thread([=, &operations_gate, &semaphore_registry, s = std::move(s)] (cql_test_env& env) mutable -> future<> { + do_with_cql_env_thread([=, s = std::move(s)] (cql_test_env& env) mutable -> future<> { auto factory = [=, gs = global_simple_schema(s)] ( schema_ptr, const dht::partition_range& range, @@ -2545,7 +2524,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic BOOST_REQUIRE(mut_opt); assert_that(make_multishard_combining_reader( - seastar::make_shared(std::move(factory), operations_gate, semaphore_registry, true), + seastar::make_shared(std::move(factory), true), s.schema(), tests::make_permit(), query::full_partition_range, @@ -2553,7 +2532,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic service::get_local_sstable_query_read_priority())) .produces_partition(*mut_opt); - return operations_gate.close(); + return make_ready_future<>(); }).get(); } @@ -2567,9 +2546,6 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { return; } - test_reader_lifecycle_policy::operations_gate operations_gate; - test_reader_lifecycle_policy::semaphore_registry semaphore_registry; - do_with_cql_env_thread([&] (cql_test_env& env) -> future<> { env.execute_cql("CREATE KEYSPACE multishard_streaming_reader_ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};").get(); env.execute_cql("CREATE TABLE multishard_streaming_reader_ks.test (pk int, v int, PRIMARY KEY(pk));").get(); @@ -2613,7 +2589,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { streamed_mutation::forwarding::no, fwd_mr); }; auto reference_reader = make_filtering_reader( - make_multishard_combining_reader(seastar::make_shared(std::move(reader_factory), operations_gate, semaphore_registry), + make_multishard_combining_reader(seastar::make_shared(std::move(reader_factory)), schema, tests::make_permit(), partition_range, schema->full_slice(), service::get_local_sstable_query_read_priority()), [&remote_partitioner] (const dht::decorated_key& pkey) { return remote_partitioner.shard_of(pkey.token()) == 0; @@ -2638,7 +2614,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { assert_that(tested_muts[i]).is_equal_to(reference_muts[i]); } - return operations_gate.close(); + return make_ready_future<>(); }).get(); } diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index b52bd1e4bc..cdcfd7ca75 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -27,68 +27,6 @@ class test_reader_lifecycle_policy : public reader_lifecycle_policy , public enable_shared_from_this { -public: - class operations_gate { - public: - class operation { - gate* _g = nullptr; - - private: - void leave() { - if (_g) { - _g->leave(); - } - } - - public: - operation() = default; - explicit operation(gate& g) : _g(&g) { _g->enter(); } - operation(const operation&) = delete; - operation(operation&& o) : _g(std::exchange(o._g, nullptr)) { } - ~operation() { leave(); } - operation& operator=(const operation&) = delete; - operation& operator=(operation&& o) { - leave(); - _g = std::exchange(o._g, nullptr); - return *this; - } - }; - - private: - std::vector _gates; - - public: - operations_gate() - : _gates(smp::count) { - } - - operation enter() { - return operation(_gates[this_shard_id()]); - } - - future<> close() { - return parallel_for_each(boost::irange(smp::count), [this] (shard_id shard) { - return smp::submit_to(shard, [this, shard] { - return _gates[shard].close(); - }); - }); - } - }; - - class semaphore_registry { - std::vector< // 1 per shard - std::list> _semaphores; - public: - semaphore_registry() : _semaphores(smp::count) { } - semaphore_registry(semaphore_registry&&) = delete; - semaphore_registry(const semaphore_registry&) = delete; - template - reader_concurrency_semaphore& create_semaphore(Arg&&... arg) { - return _semaphores[this_shard_id()].emplace_back(std::forward(arg)...); - } - }; - -private: using factory_function = std::function; struct reader_context { - reader_concurrency_semaphore* semaphore = nullptr; - operations_gate::operation op; + std::optional semaphore; std::optional range; std::optional slice; @@ -109,17 +46,13 @@ private: }; factory_function _factory_function; - operations_gate& _operation_gate; - semaphore_registry& _semaphore_registry; std::vector>> _contexts; std::vector> _destroy_futures; bool _evict_paused_readers = false; public: - explicit test_reader_lifecycle_policy(factory_function f, operations_gate& g, semaphore_registry& semaphore_registry, bool evict_paused_readers = false) + explicit test_reader_lifecycle_policy(factory_function f, bool evict_paused_readers = false) : _factory_function(std::move(f)) - , _operation_gate(g) - , _semaphore_registry(semaphore_registry) , _contexts(smp::count) , _evict_paused_readers(evict_paused_readers) { } @@ -138,11 +71,9 @@ public: } else { _contexts[shard] = make_foreign(std::make_unique(range, slice)); } - _contexts[shard]->op = _operation_gate.enter(); return _factory_function(std::move(schema), *_contexts[shard]->range, *_contexts[shard]->slice, pc, std::move(trace_state), fwd_mr); } virtual future<> destroy_reader(stopped_reader reader) noexcept override { - // waited via _operation_gate auto ctx = &*_contexts[this_shard_id()]; auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(reader.handle)); auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); @@ -159,9 +90,9 @@ public: } if (_evict_paused_readers) { // Create with no memory, so all inactive reads are immediately evicted. - _contexts[shard]->semaphore = &_semaphore_registry.create_semaphore(1, 0, format("reader_concurrency_semaphore @shard_id={}", shard)); + _contexts[shard]->semaphore.emplace(1, 0, format("reader_concurrency_semaphore @shard_id={}", shard)); } else { - _contexts[shard]->semaphore = &_semaphore_registry.create_semaphore(reader_concurrency_semaphore::no_limits{}); + _contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::no_limits{}); } return *_contexts[shard]->semaphore; } From a69db31b5c6f90ee79ab6df43ea5e2f2f0165216 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 15 Jun 2021 12:38:18 +0300 Subject: [PATCH 13/15] test/lib/reader_lifecycle_policy: destroy_reader: cleanup context Now that we don't rely on any external machinery to keep the relevant parts of the context alive until needed as its life-cycle is effectively enclosed in that of the life-cycle policy itself, we can cleanup the context in `destroy_reader()` itself, avoiding a background trip back to this shard. --- test/lib/reader_lifecycle_policy.hh | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index cdcfd7ca75..37daf31b16 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -74,11 +74,13 @@ public: return _factory_function(std::move(schema), *_contexts[shard]->range, *_contexts[shard]->slice, pc, std::move(trace_state), fwd_mr); } virtual future<> destroy_reader(stopped_reader reader) noexcept override { - auto ctx = &*_contexts[this_shard_id()]; + auto& ctx = _contexts[this_shard_id()]; auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(reader.handle)); auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); return ret.finally([&ctx] { - return ctx->semaphore->stop(); + return ctx->semaphore->stop().finally([&ctx] { + ctx.release(); + }); }); } virtual reader_concurrency_semaphore& semaphore() override { From 63f08391641bf4b2a4b411d3388d772f39b83311 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 3 Jun 2021 16:23:20 +0300 Subject: [PATCH 14/15] mutation_reader: multishard_combining_reader: store shard_reader via unique ptr No need for a shared pointer anymore, as we don't have to potentially keep the shard reader alive after the multishard reader is destroyed, we now do proper cleanup in close(). We still need a pointer as the shard reader is un-movable but is stored in a vector which requires movable values. --- mutation_reader.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index b135f071b7..52f0cd8b45 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1571,7 +1571,7 @@ namespace { // Although it implements the flat_mutation_reader:impl interface it cannot be // wrapped into a flat_mutation_reader, as it needs to be managed by a shared // pointer. -class shard_reader : public enable_lw_shared_from_this, public flat_mutation_reader::impl { +class shard_reader : public flat_mutation_reader::impl { private: shared_ptr _lifecycle_policy; const unsigned _shard; @@ -1707,7 +1707,7 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) { }); } - return fill_buf_fut.then([this, zis = shared_from_this()] (remote_fill_buffer_result res) mutable { + return fill_buf_fut.then([this] (remote_fill_buffer_result res) mutable { _end_of_stream = res.end_of_stream; for (const auto& mf : *res.buffer) { push_mutation_fragment(mutation_fragment(*_schema, _permit, mf)); @@ -1788,7 +1788,7 @@ class multishard_combining_reader : public flat_mutation_reader::impl { }; const dht::sharder& _sharder; - std::vector> _shard_readers; + std::vector> _shard_readers; // Contains the position of each shard with token granularity, organized // into a min-heap. Used to select the shard with the smallest token each // time a shard reader produces a new partition. @@ -1917,7 +1917,7 @@ multishard_combining_reader::multishard_combining_reader( _shard_readers.reserve(_sharder.shard_count()); for (unsigned i = 0; i < _sharder.shard_count(); ++i) { - _shard_readers.emplace_back(make_lw_shared(_schema, _permit, lifecycle_policy, i, pr, ps, pc, trace_state, fwd_mr)); + _shard_readers.emplace_back(std::make_unique(_schema, _permit, lifecycle_policy, i, pr, ps, pc, trace_state, fwd_mr)); } } @@ -1952,7 +1952,7 @@ future<> multishard_combining_reader::fast_forward_to(const dht::partition_range clear_buffer(); _end_of_stream = false; on_partition_range_change(pr); - return parallel_for_each(_shard_readers, [&pr, timeout] (lw_shared_ptr& sr) { + return parallel_for_each(_shard_readers, [&pr, timeout] (std::unique_ptr& sr) { return sr->fast_forward_to(pr, timeout); }); } @@ -1962,7 +1962,7 @@ future<> multishard_combining_reader::fast_forward_to(position_range pr, db::tim } future<> multishard_combining_reader::close() noexcept { - return parallel_for_each(_shard_readers, [] (lw_shared_ptr& sr) { + return parallel_for_each(_shard_readers, [] (std::unique_ptr& sr) { return sr->close(); }); } From 28c2b54875b6ca107267b0510183ec6ef73cdc8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 3 Jun 2021 16:48:10 +0300 Subject: [PATCH 15/15] mutation_reader: reader_lifecycle_policy: remove convenience methods These convenience methods are not used as much anymore and they are not even really necessary as the register/unregister inactive read API got streamlined a lot to the point where all of these "convenience methods" are just one-liners, which we can just inline into their few callers without loosing readability. --- multishard_mutation_query.cc | 4 ++-- mutation_reader.cc | 15 --------------- mutation_reader.hh | 25 ------------------------- 3 files changed, 2 insertions(+), 42 deletions(-) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index ff2a7043aa..1d35ee1885 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -309,7 +309,7 @@ flat_mutation_reader read_context::create_reader( // The reader is either in inexistent or successful lookup state. if (rm.state == reader_state::successful_lookup) { - if (auto reader_opt = try_resume(std::move(*rm.rparts->handle))) { + if (auto reader_opt = semaphore().unregister_inactive_read(std::move(*rm.rparts->handle))) { rm.state = reader_state::used; return std::move(*reader_opt); } @@ -545,7 +545,7 @@ future<> read_context::lookup_readers() { reinterpret_cast(&semaphore))); } - auto handle = pause(semaphore, std::move(q).reader()); + auto handle = semaphore.register_inactive_read(std::move(q).reader()); return reader_meta( reader_state::successful_lookup, reader_meta::remote_parts(q.permit(), std::move(q).reader_range(), std::move(q).reader_slice(), table.read_in_progress(), diff --git a/mutation_reader.cc b/mutation_reader.cc index 52f0cd8b45..4ce59ed3a2 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1967,21 +1967,6 @@ future<> multishard_combining_reader::close() noexcept { }); } -reader_concurrency_semaphore::inactive_read_handle -reader_lifecycle_policy::pause(reader_concurrency_semaphore& sem, flat_mutation_reader reader) { - return sem.register_inactive_read(std::move(reader)); -} - -reader_concurrency_semaphore::inactive_read_handle -reader_lifecycle_policy::pause(flat_mutation_reader reader) { - return pause(semaphore(), std::move(reader)); -} - -flat_mutation_reader_opt -reader_lifecycle_policy::try_resume(reader_concurrency_semaphore::inactive_read_handle irh) { - return semaphore().unregister_inactive_read(std::move(irh)); -} - flat_mutation_reader make_multishard_combining_reader( shared_ptr lifecycle_policy, schema_ptr schema, diff --git a/mutation_reader.hh b/mutation_reader.hh index 335d8a42b9..48b79083ec 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -442,11 +442,6 @@ public: flat_mutation_reader::tracked_buffer unconsumed_fragments; }; -protected: - // Helpers for implementations, who might wish to provide the semaphore in - // other ways than through the official `semaphore()` override. - static reader_concurrency_semaphore::inactive_read_handle pause(reader_concurrency_semaphore& sem, flat_mutation_reader reader); - public: /// Create an appropriate reader on the shard it is called on. /// @@ -486,26 +481,6 @@ public: /// /// This method will be called on the shard where the relevant reader lives. virtual reader_concurrency_semaphore& semaphore() = 0; - - /// Pause the reader. - /// - /// The purpose of pausing a reader is making it evictable while it is - /// otherwise inactive. This allows freeing up resources that are in-demand - /// by evicting these paused readers. Most notably, this allows freeing up - /// reader permits when the node is overloaded with reads. - /// This is just a helper method, it uses the semaphore returned by - /// `semaphore()` for the actual pausing. - /// \see semaphore() - reader_concurrency_semaphore::inactive_read_handle pause(flat_mutation_reader reader); - - /// Try to resume the reader. - /// - /// The optional returned will be disengaged when resuming fails. This can - /// happen if the reader was evicted while paused. - /// This is just a helper method, it uses the semaphore returned by - /// `semaphore()` for the actual pausing. - /// \see semaphore() - flat_mutation_reader_opt try_resume(reader_concurrency_semaphore::inactive_read_handle irh); }; /// Make a multishard_combining_reader.