shard_reader: use semaphore directly to pause-resume

The shard reader relies on the `reader_lifecycle_policy` for pausing and
resuming the remote reader. The lifecycle policy's API was designed to
be as general as possible, allowing for any implementation of
pause/resume. However, in practice, we have a single implementation of
pause/resume: registering/unregistering the reader with the relevant
`reader_concurrency_semaphore`, and we don't expect any new
implementations to appear in the future.
Thus, the generic API of the lifecycle policy is needlessly abstract,
making its implementations needlessly complex. We can instead make this
very concrete and have the lifecycle policy just return the relevant
semaphore, removing the need for every implementor of the lifecycle
policy interface to have a duplicate implementation of the very same
logic.

For now just emulate the old interface inside shard reader. We will
overhaul the shard reader after some further changes to minimize noise.
This commit is contained in:
Botond Dénes
2019-01-22 13:19:20 +02:00
parent fae5a2a8c8
commit 57d1f6589c
5 changed files with 158 additions and 154 deletions

View File

@@ -1862,19 +1862,9 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
class streaming_reader_lifecycle_policy
: public reader_lifecycle_policy
, public enable_shared_from_this<streaming_reader_lifecycle_policy> {
struct inactive_read : public reader_concurrency_semaphore::inactive_read {
foreign_unique_ptr<flat_mutation_reader> reader;
explicit inactive_read(foreign_unique_ptr<flat_mutation_reader> reader)
: reader(std::move(reader)) {
}
virtual void evict() override {
reader.reset();
}
};
struct reader_context {
std::unique_ptr<const dht::partition_range> range;
foreign_unique_ptr<utils::phased_barrier::operation> read_operation;
std::optional<reader_concurrency_semaphore::inactive_read_handle> pause_handle_opt;
reader_concurrency_semaphore* semaphore;
};
distributed<database>& _db;
@@ -1908,25 +1898,14 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
virtual void destroy_reader(shard_id shard, future<paused_or_stopped_reader> reader_fut) noexcept override {
reader_fut.then([this, zis = shared_from_this(), shard] (paused_or_stopped_reader&& reader) mutable {
return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), reader = std::move(reader.remote_reader)] () mutable {
reader.release();
if (auto maybe_paused_reader = std::get_if<paused_or_stopped_reader::paused_reader>(&reader)) {
ctx.semaphore->unregister_inactive_read(std::move(**maybe_paused_reader));
}
});
});
}
virtual future<> pause(foreign_unique_ptr<flat_mutation_reader> reader) override {
const auto shard = reader.get_owner_shard();
return _db.invoke_on(shard, [sem = _contexts[shard].semaphore, reader = std::move(reader)] (database& db) mutable {
return sem->register_inactive_read(std::make_unique<inactive_read>(std::move(reader)));
}).then([this, zis = shared_from_this(), shard] (reader_concurrency_semaphore::inactive_read_handle handle) {
_contexts[shard].pause_handle_opt = handle;
});
}
virtual future<foreign_unique_ptr<flat_mutation_reader>> try_resume(shard_id shard) override {
return _db.invoke_on(shard, [sem = _contexts[shard].semaphore, handle = *_contexts[shard].pause_handle_opt] (database& db) mutable {
if (auto ir_ptr = sem->unregister_inactive_read(handle)) {
return std::move(static_cast<inactive_read&>(*ir_ptr).reader);
}
return foreign_unique_ptr<flat_mutation_reader>{};
});
virtual reader_concurrency_semaphore& semaphore() override {
return *_contexts[engine().cpu_id()].semaphore;
}
};
auto ms = mutation_source([&db, &partitioner] (schema_ptr s,

View File

@@ -85,19 +85,9 @@ class read_context : public reader_lifecycle_policy {
reader_concurrency_semaphore* semaphore;
};
struct paused_reader {
shard_id shard;
reader_concurrency_semaphore::inactive_read_handle handle;
foreign_unique_ptr<reader_concurrency_semaphore::inactive_read_handle> handle;
bool has_pending_next_partition;
};
struct inactive_read : public reader_concurrency_semaphore::inactive_read {
foreign_unique_ptr<flat_mutation_reader> reader;
explicit inactive_read(foreign_unique_ptr<flat_mutation_reader> reader)
: reader(std::move(reader)) {
}
virtual void evict() override {
reader.reset();
}
};
using inexistent_state = std::monostate;
struct successful_lookup_state {
@@ -121,32 +111,23 @@ class read_context : public reader_lifecycle_policy {
std::variant<foreign_unique_ptr<flat_mutation_reader>, paused_reader> reader;
circular_buffer<mutation_fragment> buffer;
};
struct paused_state {
foreign_unique_ptr<reader_params> params;
foreign_unique_ptr<utils::phased_barrier::operation> read_operation;
reader_concurrency_semaphore::inactive_read_handle handle;
};
struct evicted_state {
};
// ( ) (O)
// | ^
// | |
// +--- inexistent ---+
// | |
// (1) | (3) | (3)
// | | +------ evicted -> (O)
// successful_lookup | | ^
// | | | | (7) |
// | | | +-------+ | (8)
// | | (4) | | | |
// | +----------> used paused
// | | | (6) ^ |
// (2) | | +-------+ |
// | (5) | | (5)
// | | |
// | | |
// | dismantling <------+
// (1) | (3) |
// | |
// successful_lookup |
// | | |
// | | |
// | | (4) |
// | +---------> used
// (2) | |
// | (5) |
// | |
// | dismantling
// | |
// | (2) |
// | |
@@ -159,15 +140,10 @@ class read_context : public reader_lifecycle_policy {
// 3) do_make_remote_reader()
// 4) make_remote_reader()
// 5) dismantle_reader()
// 6) pause_reader()
// 7) try_resume() - success
// 8) try_resume() - failure
using reader_state = std::variant<
inexistent_state,
successful_lookup_state,
used_state,
paused_state,
evicted_state,
dismantling_state,
ready_to_save_state>;
@@ -240,10 +216,6 @@ class read_context : public reader_lifecycle_policy {
gate _dismantling_gate;
reader_concurrency_semaphore& semaphore() {
return *_semaphores[engine().cpu_id()];
}
static std::string_view reader_state_to_string(const reader_state& rs);
static future<bundled_remote_reader> do_make_remote_reader(
@@ -304,8 +276,9 @@ public:
dismantle_reader(shard, std::move(reader_fut));
}
virtual future<> pause(foreign_unique_ptr<flat_mutation_reader> reader) override;
virtual future<foreign_unique_ptr<flat_mutation_reader>> try_resume(shard_id shard) override;
virtual reader_concurrency_semaphore& semaphore() override {
return *_semaphores[engine().cpu_id()];
}
future<> lookup_readers();
@@ -321,16 +294,12 @@ std::string_view read_context::reader_state_to_string(const std::variant<
inexistent_state,
successful_lookup_state,
used_state,
paused_state,
evicted_state,
dismantling_state,
ready_to_save_state>& rs) {
static const std::array<const char*, 7> reader_state_names{
"inexistent",
"successful_lookup",
"used",
"paused",
"evicted",
"dismantling",
"ready_to_save",
};
@@ -373,16 +342,15 @@ future<foreign_unique_ptr<flat_mutation_reader>> read_context::make_remote_reade
mutation_reader::forwarding) {
auto& rs = _readers[shard];
if (!std::holds_alternative<successful_lookup_state>(rs) && !std::holds_alternative<inexistent_state>(rs)
&& !std::holds_alternative<evicted_state>(rs)) {
if (!std::holds_alternative<used_state>(rs) && !std::holds_alternative<successful_lookup_state>(rs) && !std::holds_alternative<inexistent_state>(rs)) {
auto msg = format("Unexpected request to create reader for shard {}."
" The reader is expected to be in either `successful_lookup`, `inexistent` or `evicted` state,"
" The reader is expected to be in either `used`, `successful_lookup` or `inexistent` state,"
" but is in `{}` state instead.", shard, reader_state_to_string(rs));
mmq_log.warn(msg.c_str());
throw std::logic_error(msg.c_str());
}
// The reader is either in inexistent, evicted or successful lookup state.
// The reader is either in used, inexistent or successful lookup state.
if (auto current_state = std::get_if<successful_lookup_state>(&rs)) {
auto reader = std::move(current_state->reader);
rs = used_state{std::move(current_state->params), std::move(current_state->read_operation)};
@@ -413,16 +381,17 @@ void read_context::dismantle_reader(shard_id shard, future<paused_or_stopped_rea
if (auto* maybe_used_state = std::get_if<used_state>(&rs)) {
auto read_operation = std::move(maybe_used_state->read_operation);
auto params = std::move(maybe_used_state->params);
rs = dismantling_state{std::move(params), std::move(read_operation), std::move(reader.remote_reader),
std::move(reader.unconsumed_fragments)};
} else if (auto* maybe_paused_state = std::get_if<paused_state>(&rs)) {
auto read_operation = std::move(maybe_paused_state->read_operation);
auto params = std::move(maybe_paused_state->params);
auto handle = maybe_paused_state->handle;
rs = dismantling_state{std::move(params), std::move(read_operation), paused_reader{shard, handle, reader.has_pending_next_partition},
std::move(reader.unconsumed_fragments)};
// Do nothing for evicted readers.
} else if (!std::holds_alternative<evicted_state>(rs)) {
if (auto maybe_paused_reader = std::get_if<paused_or_stopped_reader::paused_reader>(&reader.remote_reader)) {
rs = dismantling_state{std::move(params), std::move(read_operation),
paused_reader{std::move(*maybe_paused_reader), reader.has_pending_next_partition}, std::move(reader.unconsumed_fragments)};
} else {
rs = dismantling_state{
std::move(params),
std::move(read_operation),
std::get<paused_or_stopped_reader::stopped_reader>(std::move(reader.remote_reader)),
std::move(reader.unconsumed_fragments)};
}
} else {
mmq_log.warn(
"Unexpected request to dismantle reader in state `{}` for shard {}."
" Reader was not created nor is in the process of being created.",
@@ -448,7 +417,7 @@ future<> read_context::stop() {
} else {
// We cannot use semaphore() here, as this can be already destroyed.
auto& table = db.find_column_family(schema);
table.read_concurrency_semaphore().unregister_inactive_read(std::get<paused_reader>(reader).handle);
table.read_concurrency_semaphore().unregister_inactive_read(std::move(*std::get<paused_reader>(reader).handle));
}
params.release();
read_operation.release();
@@ -553,7 +522,7 @@ future<> read_context::save_reader(ready_to_save_state& current_state, const dht
auto* maybe_stopped_reader = std::get_if<foreign_unique_ptr<flat_mutation_reader>>(&current_state.reader);
const auto shard = maybe_stopped_reader
? maybe_stopped_reader->get_owner_shard()
: std::get<paused_reader>(current_state.reader).shard;
: std::get<paused_reader>(current_state.reader).handle.get_owner_shard();
return _db.invoke_on(shard, [this, shard, query_uuid = _cmd.query_uuid, query_ranges = _ranges, &current_state,
&last_pkey, &last_ckey, gts = tracing::global_trace_state_ptr(_trace_state)] (database& db) mutable {
@@ -563,8 +532,8 @@ future<> read_context::save_reader(ready_to_save_state& current_state, const dht
flat_mutation_reader_opt reader;
if (auto* maybe_paused_reader = std::get_if<paused_reader>(&current_state.reader)) {
if (auto inactive_read_ptr = semaphore().unregister_inactive_read(maybe_paused_reader->handle)) {
reader = std::move(*static_cast<inactive_read&>(*inactive_read_ptr).reader);
if (auto reader_opt = try_resume(std::move(*maybe_paused_reader->handle))) {
reader = std::move(*reader_opt);
if (maybe_paused_reader->has_pending_next_partition) {
reader->next_partition();
}
@@ -619,33 +588,6 @@ future<> read_context::save_reader(ready_to_save_state& current_state, const dht
});
}
future<> read_context::pause(foreign_unique_ptr<flat_mutation_reader> reader) {
const auto shard = reader.get_owner_shard();
return _db.invoke_on(shard, [this, reader = std::move(reader)] (database& db) mutable {
return semaphore().register_inactive_read(std::make_unique<inactive_read>(std::move(reader)));
}).then([this, shard] (reader_concurrency_semaphore::inactive_read_handle handle) {
auto& current_state = std::get<used_state>(_readers[shard]);
_readers[shard] = paused_state{std::move(current_state.params), std::move(current_state.read_operation), handle};
});
}
future<foreign_unique_ptr<flat_mutation_reader>> read_context::try_resume(shard_id shard) {
return _db.invoke_on(shard, [this, handle = std::get<paused_state>(_readers[shard]).handle] (database& db) mutable {
if (auto inactive_read_ptr = semaphore().unregister_inactive_read(handle)) {
return std::move(static_cast<inactive_read&>(*inactive_read_ptr).reader);
}
return foreign_unique_ptr<flat_mutation_reader>();
}).then([this, shard] (foreign_unique_ptr<flat_mutation_reader> reader) {
if (reader) {
auto& current_state = std::get<paused_state>(_readers[shard]);
_readers[shard] = used_state{std::move(current_state.params), std::move(current_state.read_operation)};
} else {
_readers[shard] = evicted_state{};
}
return std::move(reader);
});
}
future<> read_context::lookup_readers() {
if (_cmd.query_uuid == utils::UUID{} || _cmd.is_first_page) {
return make_ready_future<>();

View File

@@ -895,6 +895,7 @@ private:
tracing::global_trace_state_ptr _trace_state;
const mutation_reader::forwarding _fwd_mr;
foreign_ptr<std::unique_ptr<flat_mutation_reader>> _reader;
foreign_ptr<std::unique_ptr<reader_concurrency_semaphore::inactive_read_handle>> _irh;
bool _pending_next_partition = false;
bool _stopped = false;
bool _drop_partition_start = false;
@@ -917,6 +918,8 @@ private:
future<> resume();
future<> do_fill_buffer(db::timeout_clock::time_point timeout);
void process_remote_buffer(const circular_buffer<mutation_fragment>& buffer, bool end_of_stream);
future<foreign_ptr<std::unique_ptr<flat_mutation_reader>>> try_resume();
future<> do_pause();
public:
shard_reader(
@@ -987,7 +990,11 @@ void shard_reader::stop() noexcept {
}();
_lifecycle_policy->destroy_reader(_shard, f.then([this] {
return reader_lifecycle_policy::paused_or_stopped_reader{std::move(_reader), detach_buffer(), _pending_next_partition};
if (_reader) {
return reader_lifecycle_policy::paused_or_stopped_reader{std::move(_reader), detach_buffer(), _pending_next_partition};
} else {
return reader_lifecycle_policy::paused_or_stopped_reader{std::move(_irh), detach_buffer(), _pending_next_partition};
}
}).finally([zis = shared_from_this()] {}));
}
@@ -1094,7 +1101,7 @@ future<> shard_reader::resume() {
if (_stopped) {
return make_ready_future<>();
}
return _lifecycle_policy->try_resume(_shard).then([this] (foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader) mutable {
return try_resume().then([this] (foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader) mutable {
if (reader) {
_reader = std::move(reader);
return make_ready_future<>();
@@ -1157,6 +1164,24 @@ void shard_reader::process_remote_buffer(const circular_buffer<mutation_fragment
}
}
// Attempts to resume the paused reader.
//
// Takes the inactive read handle out of `_irh` and, on `_shard`, asks the
// lifecycle policy to resume it. Resolves to a foreign pointer to the resumed
// reader, or to a null foreign pointer when the policy returned no reader.
future<foreign_ptr<std::unique_ptr<flat_mutation_reader>>> shard_reader::try_resume() {
    return smp::submit_to(_shard, [this, handle = std::exchange(_irh, {})] () mutable {
        auto resumed = _lifecycle_policy->try_resume(std::move(*handle));
        if (!resumed) {
            // Nothing to resume, hand back a null pointer.
            return foreign_ptr<std::unique_ptr<flat_mutation_reader>>{};
        }
        return make_foreign(std::make_unique<flat_mutation_reader>(std::move(*resumed)));
    });
}
// Pauses the reader.
//
// Takes the reader out of `_reader` and, on `_shard`, hands it to the
// lifecycle policy's `pause()`. The resulting inactive read handle is wrapped
// in a foreign pointer and stored in `_irh` once the remote call completes.
future<> shard_reader::do_pause() {
    auto pause_remotely = [this, active_reader = std::exchange(_reader, {})] () mutable {
        auto handle = _lifecycle_policy->pause(std::move(*active_reader));
        return make_foreign(std::make_unique<reader_concurrency_semaphore::inactive_read_handle>(std::move(handle)));
    };
    return smp::submit_to(_shard, std::move(pause_remotely)).then(
            [this] (foreign_ptr<std::unique_ptr<reader_concurrency_semaphore::inactive_read_handle>> remote_handle) {
        _irh = std::move(remote_handle);
    });
}
future<> shard_reader::fill_buffer(db::timeout_clock::time_point timeout) {
if (_read_ahead) {
return *std::exchange(_read_ahead, std::nullopt);
@@ -1258,7 +1283,7 @@ void shard_reader::pause() {
return make_ready_future<>();
}
return _lifecycle_policy->pause(std::move(_reader));
return do_pause();
}).finally([zis = shared_from_this()] {});
}
@@ -1455,6 +1480,49 @@ future<> multishard_combining_reader::fast_forward_to(position_range pr, db::tim
return make_exception_future<>(std::bad_function_call());
}
namespace {
/// An inactive read that holds on to a paused shard reader.
///
/// Instances are registered with a `reader_concurrency_semaphore`; the held
/// reader is dropped when `evict()` is called.
class inactive_shard_read final : public reader_concurrency_semaphore::inactive_read {
    // Disengaged once `evict()` has run.
    flat_mutation_reader_opt _reader;
public:
    // Explicit, so that a reader never silently converts into an inactive read.
    explicit inactive_shard_read(flat_mutation_reader reader)
        : _reader(std::move(reader)) {
    }
    /// Moves the held reader out. Callable on rvalues only.
    ///
    /// The reader is dereferenced unconditionally, so this must not be called
    /// after `evict()`.
    flat_mutation_reader reader() && {
        return std::move(*_reader);
    }
    /// Drops the held reader.
    virtual void evict() override {
        _reader = {};
    }
};
}
// Wraps `reader` in an `inactive_shard_read` and registers it with `sem` as an
// inactive read. Returns the handle produced by the registration.
reader_concurrency_semaphore::inactive_read_handle
reader_lifecycle_policy::pause(reader_concurrency_semaphore& sem, flat_mutation_reader reader) {
    auto parked_read = std::make_unique<inactive_shard_read>(std::move(reader));
    return sem.register_inactive_read(std::move(parked_read));
}
// Unregisters the inactive read identified by `irh` from `sem`.
//
// Returns the reader held by the inactive read, or a disengaged optional when
// the semaphore has no inactive read to hand back for this handle.
flat_mutation_reader_opt
reader_lifecycle_policy::try_resume(reader_concurrency_semaphore& sem, reader_concurrency_semaphore::inactive_read_handle irh) {
    if (auto unregistered = sem.unregister_inactive_read(std::move(irh))) {
        // Reads registered through pause() are always inactive_shard_read.
        auto& shard_read = static_cast<inactive_shard_read&>(*unregistered);
        return std::move(shard_read).reader();
    }
    return {};
}
// Convenience overload: pauses `reader` using the semaphore returned by
// `semaphore()`.
reader_concurrency_semaphore::inactive_read_handle
reader_lifecycle_policy::pause(flat_mutation_reader reader) {
    auto& sem = semaphore();
    return pause(sem, std::move(reader));
}
// Convenience overload: tries to resume the read identified by `irh` using the
// semaphore returned by `semaphore()`.
flat_mutation_reader_opt
reader_lifecycle_policy::try_resume(reader_concurrency_semaphore::inactive_read_handle irh) {
    auto& sem = semaphore();
    return try_resume(sem, std::move(irh));
}
flat_mutation_reader make_multishard_combining_reader(
shared_ptr<reader_lifecycle_policy> lifecycle_policy,
const dht::i_partitioner& partitioner,

View File

@@ -404,13 +404,21 @@ flat_mutation_reader make_foreign_reader(schema_ptr schema,
class reader_lifecycle_policy {
public:
struct paused_or_stopped_reader {
// Null when the reader is paused.
foreign_ptr<std::unique_ptr<flat_mutation_reader>> remote_reader;
using paused_reader = foreign_ptr<std::unique_ptr<reader_concurrency_semaphore::inactive_read_handle>>;
using stopped_reader = foreign_ptr<std::unique_ptr<flat_mutation_reader>>;
std::variant<paused_reader, stopped_reader> remote_reader;
circular_buffer<mutation_fragment> unconsumed_fragments;
// Only set for paused readers.
bool has_pending_next_partition;
};
protected:
// Helpers for implementations, who might wish to provide the semaphore in
// other ways than through the official `semaphore()` override.
static reader_concurrency_semaphore::inactive_read_handle pause(reader_concurrency_semaphore& sem, flat_mutation_reader reader);
static flat_mutation_reader_opt try_resume(reader_concurrency_semaphore& sem, reader_concurrency_semaphore::inactive_read_handle irh);
public:
/// Create an appropriate reader on the specified shard.
///
@@ -443,19 +451,37 @@ public:
/// This method will be called from a destructor so it cannot throw.
virtual void destroy_reader(shard_id shard, future<paused_or_stopped_reader> reader) noexcept = 0;
/// Get the relevant semaphore for this read.
///
/// Paused readers are registered with the semaphore as inactive
/// readers. The semaphore can then evict these readers when resources are
/// in demand.
/// The multishard reader will pause and resume readers via the `pause()`
/// and `try_resume()` helper methods. Clients can resume any paused readers
/// after the multishard reader is destroyed via the same helper methods.
///
/// This method will be called on the shard where the relevant reader lives.
virtual reader_concurrency_semaphore& semaphore() = 0;
/// Pause the reader.
///
/// The purpose of pausing a reader is making it evictable while it is
/// otherwise inactive. This allows freeing up resources that are in-demand
/// by evicting these paused readers. Most notably, this allows freeing up
/// reader permits when the node is overloaded with reads.
virtual future<> pause(foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader) = 0;
/// This is just a helper method, it uses the semaphore returned by
/// `semaphore()` for the actual pausing.
/// \see semaphore()
reader_concurrency_semaphore::inactive_read_handle pause(flat_mutation_reader reader);
/// Try to resume the reader.
///
/// The pointer returned will be null when resuming fails. This can happen
/// if the reader was evicted while paused.
virtual future<foreign_ptr<std::unique_ptr<flat_mutation_reader>>> try_resume(shard_id shard) = 0;
/// The optional returned will be disengaged when resuming fails. This can
/// happen if the reader was evicted while paused.
/// This is just a helper method, it uses the semaphore returned by
/// `semaphore()` for the actual resuming.
/// \see semaphore()
flat_mutation_reader_opt try_resume(reader_concurrency_semaphore::inactive_read_handle irh);
};
/// Make a multishard_combining_reader.

View File

@@ -1523,8 +1523,6 @@ public:
enum class operation {
none,
create,
pause,
try_resume,
};
using delay_function = std::function<future<>()>;
@@ -1548,8 +1546,8 @@ private:
const query::partition_slice slice;
};
struct reader_context {
foreign_ptr<std::unique_ptr<reader_concurrency_semaphore>> semaphore;
std::unique_ptr<reader_params> params;
foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader;
operation operation_in_progress = operation::none;
};
@@ -1565,10 +1563,6 @@ private:
return "none";
case operation::create:
return "create";
case operation::pause:
return "pause";
case operation::try_resume:
return "try_resume";
}
return "unknown";
}
@@ -1606,32 +1600,27 @@ public:
virtual void destroy_reader(shard_id shard, future<paused_or_stopped_reader> reader) noexcept override {
reader.then([shard, this] (paused_or_stopped_reader&& reader) {
return smp::submit_to(shard, [reader = std::move(reader.remote_reader), ctx = std::move(_contexts[shard])] () mutable {
reader.release();
if (auto* maybe_paused_reader = std::get_if<paused_or_stopped_reader::paused_reader>(&reader)) {
ctx.semaphore->unregister_inactive_read(std::move(**maybe_paused_reader));
} else {
std::get<paused_or_stopped_reader::stopped_reader>(reader).release();
}
});
}).finally([zis = shared_from_this()] {});
}
virtual future<> pause(foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader) override {
const auto shard = reader.get_owner_shard();
set_current_operation(shard, operation::pause);
return _delay().then([this, shard, reader = std::move(reader)] () mutable {
_contexts[shard].reader = std::move(reader);
}).finally([this, zis = shared_from_this(), shard] {
_contexts[shard].operation_in_progress = operation::none;
});
}
virtual future<foreign_ptr<std::unique_ptr<flat_mutation_reader>>> try_resume(shard_id shard) override {
set_current_operation(shard, operation::try_resume);
return _delay().then([this, shard] {
virtual reader_concurrency_semaphore& semaphore() override {
const auto shard = engine().cpu_id();
if (!_contexts[shard].semaphore) {
if (_evict_paused_readers) {
_contexts[shard].reader.reset();
_contexts[shard].semaphore = make_foreign(std::make_unique<reader_concurrency_semaphore>(0, std::numeric_limits<ssize_t>::max()));
// Add a waiter, so that all registered inactive reads are
// immediately evicted.
_contexts[shard].semaphore->wait_admission(1);
} else {
_contexts[shard].semaphore = make_foreign(std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}));
}
return std::move(_contexts[shard].reader);
}).finally([this, zis = shared_from_this(), shard] {
_contexts[shard].operation_in_progress = operation::none;
});
}
return *_contexts[shard].semaphore;
}
};