restricted_mutation_reader: don't pass timeouts through the config structure
This patch enables passing a timeout to the restricted_mutation_reader through the read path interface -- using fill_buffer and friends. This will serve as a basis for having per-request timeouts. The config structure still has a timeout, but so far it is only used to pass the value to the query interface. Once that value starts coming from the storage proxy layer (next patch), we will remove it. The query callers are patched so that we pass the timeout down. We patch the callers in database.cc, but leave the streaming ones alone. That can be safely done because the default for the query path is now no_timeout, and that is what the streaming code wants. So there is no need to complicate the interface to allow for passing a timeout that we intend to disable. Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
15
database.cc
15
database.cc
@@ -2946,18 +2946,19 @@ future<lw_shared_ptr<query::result>>
|
||||
column_family::query(schema_ptr s, const query::read_command& cmd, query::result_request request,
|
||||
const dht::partition_range_vector& partition_ranges,
|
||||
tracing::trace_state_ptr trace_state, query::result_memory_limiter& memory_limiter,
|
||||
uint64_t max_size) {
|
||||
uint64_t max_size, db::timeout_clock::time_point timeout) {
|
||||
utils::latency_counter lc;
|
||||
_stats.reads.set_latency(lc);
|
||||
auto f = request == query::result_request::only_digest
|
||||
? memory_limiter.new_digest_read(max_size) : memory_limiter.new_data_read(max_size);
|
||||
return f.then([this, lc, s = std::move(s), &cmd, request, &partition_ranges, trace_state = std::move(trace_state)] (query::result_memory_accounter accounter) mutable {
|
||||
return f.then([this, lc, s = std::move(s), &cmd, request, &partition_ranges, trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) mutable {
|
||||
auto qs_ptr = std::make_unique<query_state>(std::move(s), cmd, request, partition_ranges, std::move(accounter));
|
||||
auto& qs = *qs_ptr;
|
||||
return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state)] {
|
||||
return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state), timeout] {
|
||||
auto&& range = *qs.current_partition_range++;
|
||||
return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.remaining_rows(),
|
||||
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, trace_state);
|
||||
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, trace_state,
|
||||
timeout);
|
||||
}).then([qs_ptr = std::move(qs_ptr), &qs] {
|
||||
return make_ready_future<lw_shared_ptr<query::result>>(
|
||||
make_lw_shared<query::result>(qs.builder.build()));
|
||||
@@ -3005,7 +3006,8 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_requ
|
||||
column_family& cf = find_column_family(cmd.cf_id);
|
||||
return data_query_stage(&cf, std::move(s), seastar::cref(cmd), request, seastar::cref(ranges),
|
||||
std::move(trace_state), seastar::ref(get_result_memory_limiter()),
|
||||
max_result_size).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) {
|
||||
max_result_size,
|
||||
cf.read_request_timeout()).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) {
|
||||
if (f.failed()) {
|
||||
++s->total_reads_failed;
|
||||
return make_exception_future<lw_shared_ptr<query::result>, cache_temperature>(f.get_exception());
|
||||
@@ -3023,7 +3025,8 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
|
||||
query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_state) {
|
||||
column_family& cf = find_column_family(cmd.cf_id);
|
||||
return mutation_query(std::move(s), cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.partition_limit,
|
||||
cmd.timestamp, std::move(accounter), std::move(trace_state)).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) {
|
||||
cmd.timestamp, std::move(accounter), std::move(trace_state),
|
||||
cf.read_request_timeout()).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate()] (auto f) {
|
||||
if (f.failed()) {
|
||||
++s->total_reads_failed;
|
||||
return make_exception_future<reconcilable_result, cache_temperature>(f.get_exception());
|
||||
|
||||
19
database.hh
19
database.hh
@@ -641,7 +641,8 @@ public:
|
||||
const dht::partition_range_vector& ranges,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
query::result_memory_limiter& memory_limiter,
|
||||
uint64_t max_result_size);
|
||||
uint64_t max_result_size,
|
||||
db::timeout_clock::time_point timeout = db::no_timeout);
|
||||
|
||||
void start();
|
||||
future<> stop();
|
||||
@@ -838,6 +839,14 @@ public:
|
||||
friend class column_family_test;
|
||||
|
||||
friend class distributed_loader;
|
||||
|
||||
db::timeout_clock::time_point read_request_timeout() const {
|
||||
auto timeout = _config.read_concurrency_config.timeout;
|
||||
if (timeout.count() == 0) {
|
||||
return db::no_timeout;
|
||||
}
|
||||
return db::timeout_clock::now() + timeout;
|
||||
}
|
||||
};
|
||||
|
||||
using sstable_reader_factory_type = std::function<flat_mutation_reader(sstables::shared_sstable&, const dht::partition_range& pr)>;
|
||||
@@ -1072,10 +1081,10 @@ private:
|
||||
seastar::thread_scheduling_group _background_writer_scheduling_group;
|
||||
flush_cpu_controller _memtable_cpu_controller;
|
||||
|
||||
semaphore _read_concurrency_sem{max_memory_concurrent_reads()};
|
||||
semaphore _streaming_concurrency_sem{max_memory_streaming_concurrent_reads()};
|
||||
db::timeout_semaphore _read_concurrency_sem{max_memory_concurrent_reads()};
|
||||
db::timeout_semaphore _streaming_concurrency_sem{max_memory_streaming_concurrent_reads()};
|
||||
restricted_mutation_reader_config _read_concurrency_config;
|
||||
semaphore _system_read_concurrency_sem{max_memory_system_concurrent_reads()};
|
||||
db::timeout_semaphore _system_read_concurrency_sem{max_memory_system_concurrent_reads()};
|
||||
restricted_mutation_reader_config _system_read_concurrency_config;
|
||||
|
||||
semaphore _sstable_load_concurrency_sem{max_concurrent_sstable_loads()};
|
||||
@@ -1246,7 +1255,7 @@ public:
|
||||
std::unordered_set<sstring> get_initial_tokens();
|
||||
std::experimental::optional<gms::inet_address> get_replace_address();
|
||||
bool is_replacing();
|
||||
semaphore& system_keyspace_read_concurrency_sem() {
|
||||
db::timeout_semaphore& system_keyspace_read_concurrency_sem() {
|
||||
return _system_read_concurrency_sem;
|
||||
}
|
||||
semaphore& sstable_load_concurrency_sem() {
|
||||
|
||||
@@ -646,13 +646,13 @@ mutation_reader make_empty_reader() {
|
||||
// operations.
|
||||
class tracking_file_impl : public file_impl {
|
||||
file _tracked_file;
|
||||
semaphore* _semaphore;
|
||||
db::timeout_semaphore* _semaphore;
|
||||
|
||||
// Shouldn't be called if semaphore is NULL.
|
||||
temporary_buffer<uint8_t> make_tracked_buf(temporary_buffer<uint8_t> buf) {
|
||||
return seastar::temporary_buffer<uint8_t>(buf.get_write(),
|
||||
buf.size(),
|
||||
make_deleter(buf.release(), std::bind(&semaphore::signal, _semaphore, buf.size())));
|
||||
make_deleter(buf.release(), std::bind(&db::timeout_semaphore::signal, _semaphore, buf.size())));
|
||||
}
|
||||
|
||||
public:
|
||||
@@ -756,9 +756,9 @@ class restricting_mutation_reader : public flat_mutation_reader::impl {
|
||||
|
||||
static const std::size_t new_reader_base_cost{16 * 1024};
|
||||
|
||||
future<> create_reader() {
|
||||
auto f = _config.timeout.count() != 0
|
||||
? _config.resources_sem->wait(_config.timeout, new_reader_base_cost)
|
||||
future<> create_reader(db::timeout_clock::time_point timeout) {
|
||||
auto f = timeout != db::no_timeout
|
||||
? _config.resources_sem->wait(timeout, new_reader_base_cost)
|
||||
: _config.resources_sem->wait(new_reader_base_cost);
|
||||
|
||||
return f.then([this] {
|
||||
@@ -780,11 +780,11 @@ class restricting_mutation_reader : public flat_mutation_reader::impl {
|
||||
fn(reader);
|
||||
}
|
||||
)
|
||||
decltype(auto) with_reader(Function fn) {
|
||||
decltype(auto) with_reader(Function fn, db::timeout_clock::time_point timeout) {
|
||||
if (auto* reader = boost::get<flat_mutation_reader>(&_reader_or_mutation_source)) {
|
||||
return fn(*reader);
|
||||
}
|
||||
return create_reader().then([this, fn = std::move(fn)] () mutable {
|
||||
return create_reader(timeout).then([this, fn = std::move(fn)] () mutable {
|
||||
return fn(boost::get<flat_mutation_reader>(_reader_or_mutation_source));
|
||||
});
|
||||
}
|
||||
@@ -823,7 +823,7 @@ public:
|
||||
push_mutation_fragment(reader.pop_mutation_fragment());
|
||||
}
|
||||
});
|
||||
});
|
||||
}, timeout);
|
||||
}
|
||||
virtual void next_partition() override {
|
||||
clear_buffer_to_next_partition();
|
||||
@@ -840,14 +840,14 @@ public:
|
||||
_end_of_stream = false;
|
||||
return with_reader([&pr, timeout] (flat_mutation_reader& reader) {
|
||||
return reader.fast_forward_to(pr, timeout);
|
||||
});
|
||||
}, timeout);
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
|
||||
forward_buffer_to(pr.start());
|
||||
_end_of_stream = false;
|
||||
return with_reader([pr = std::move(pr), timeout] (flat_mutation_reader& reader) mutable {
|
||||
return reader.fast_forward_to(std::move(pr), timeout);
|
||||
});
|
||||
}, timeout);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -478,9 +478,9 @@ mutation_source make_empty_mutation_source();
|
||||
snapshot_source make_empty_snapshot_source();
|
||||
|
||||
struct restricted_mutation_reader_config {
|
||||
semaphore* resources_sem = nullptr;
|
||||
db::timeout_semaphore* resources_sem = nullptr;
|
||||
uint64_t* active_reads = nullptr;
|
||||
std::chrono::nanoseconds timeout = {};
|
||||
db::timeout_clock::time_point::duration timeout = {};
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max();
|
||||
std::function<void ()> raise_queue_overloaded_exception = default_raise_queue_overloaded_exception;
|
||||
|
||||
|
||||
@@ -23,12 +23,13 @@
|
||||
|
||||
#include <core/file.hh>
|
||||
#include <core/semaphore.hh>
|
||||
#include "db/timeout_clock.hh"
|
||||
|
||||
class reader_resource_tracker {
|
||||
seastar::semaphore* _sem = nullptr;
|
||||
db::timeout_semaphore* _sem = nullptr;
|
||||
public:
|
||||
reader_resource_tracker() = default;
|
||||
explicit reader_resource_tracker(seastar::semaphore* sem)
|
||||
explicit reader_resource_tracker(db::timeout_semaphore* sem)
|
||||
: _sem(sem) {
|
||||
}
|
||||
|
||||
@@ -38,7 +39,7 @@ public:
|
||||
|
||||
file track(file f) const;
|
||||
|
||||
semaphore* get_semaphore() const {
|
||||
db::timeout_semaphore* get_semaphore() const {
|
||||
return _sem;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -758,7 +758,7 @@ class tracking_reader : public flat_mutation_reader::impl {
|
||||
std::size_t _call_count{0};
|
||||
std::size_t _ff_count{0};
|
||||
public:
|
||||
tracking_reader(semaphore* resources_sem, schema_ptr schema, lw_shared_ptr<sstables::sstable> sst)
|
||||
tracking_reader(db::timeout_semaphore* resources_sem, schema_ptr schema, lw_shared_ptr<sstables::sstable> sst)
|
||||
: impl(schema)
|
||||
, _reader(sst->read_range_rows_flat(
|
||||
schema,
|
||||
@@ -813,12 +813,16 @@ public:
|
||||
class reader_wrapper {
|
||||
flat_mutation_reader _reader;
|
||||
tracking_reader* _tracker{nullptr};
|
||||
|
||||
db::timeout_clock::time_point _timeout;
|
||||
public:
|
||||
reader_wrapper(
|
||||
const restricted_mutation_reader_config& config,
|
||||
schema_ptr schema,
|
||||
lw_shared_ptr<sstables::sstable> sst) : _reader(make_empty_flat_reader(schema)) {
|
||||
lw_shared_ptr<sstables::sstable> sst,
|
||||
db::timeout_clock::duration timeout_duration = {})
|
||||
: _reader(make_empty_flat_reader(schema))
|
||||
, _timeout(db::timeout_clock::now() + timeout_duration)
|
||||
{
|
||||
auto ms = mutation_source([this, &config, sst=std::move(sst)] (schema_ptr schema, const dht::partition_range&) {
|
||||
auto tracker_ptr = std::make_unique<tracking_reader>(config.resources_sem, std::move(schema), std::move(sst));
|
||||
_tracker = tracker_ptr.get();
|
||||
@@ -832,7 +836,7 @@ public:
|
||||
while (!_reader.is_buffer_empty()) {
|
||||
_reader.pop_mutation_fragment();
|
||||
}
|
||||
return _reader.fill_buffer(db::no_timeout);
|
||||
return _reader.fill_buffer(_timeout);
|
||||
}
|
||||
|
||||
future<> fast_forward_to(const dht::partition_range& pr) {
|
||||
@@ -853,15 +857,13 @@ public:
|
||||
};
|
||||
|
||||
struct restriction_data {
|
||||
std::unique_ptr<semaphore> reader_semaphore;
|
||||
std::unique_ptr<db::timeout_semaphore> reader_semaphore;
|
||||
restricted_mutation_reader_config config;
|
||||
|
||||
restriction_data(std::size_t units,
|
||||
std::chrono::nanoseconds timeout = {},
|
||||
std::size_t max_queue_length = std::numeric_limits<std::size_t>::max())
|
||||
: reader_semaphore(std::make_unique<semaphore>(units)) {
|
||||
: reader_semaphore(std::make_unique<db::timeout_semaphore>(units)) {
|
||||
config.resources_sem = reader_semaphore.get();
|
||||
config.timeout = timeout;
|
||||
config.max_queue_length = max_queue_length;
|
||||
}
|
||||
};
|
||||
@@ -1033,24 +1035,25 @@ SEASTAR_TEST_CASE(restricted_reader_reading) {
|
||||
SEASTAR_TEST_CASE(restricted_reader_timeout) {
|
||||
return async([&] {
|
||||
storage_service_for_tests ssft;
|
||||
restriction_data rd(new_reader_base_cost, std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds{10}));
|
||||
|
||||
restriction_data rd(new_reader_base_cost);
|
||||
{
|
||||
simple_schema s;
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
auto sst = create_sstable(s, tmp->path);
|
||||
|
||||
auto reader1 = reader_wrapper(rd.config, s.schema(), sst);
|
||||
auto timeout = std::chrono::duration_cast<db::timeout_clock::time_point::duration>(std::chrono::milliseconds{10});
|
||||
auto reader1 = reader_wrapper(rd.config, s.schema(), sst, timeout);
|
||||
|
||||
reader1().get();
|
||||
|
||||
auto reader2 = reader_wrapper(rd.config, s.schema(), sst);
|
||||
auto reader2 = reader_wrapper(rd.config, s.schema(), sst, timeout);
|
||||
auto read_fut = reader2();
|
||||
|
||||
seastar::sleep(std::chrono::milliseconds(20)).get();
|
||||
|
||||
// The read should have timed out.
|
||||
BOOST_REQUIRE(read_fut.failed());
|
||||
BOOST_REQUIRE_THROW(std::rethrow_exception(read_fut.get_exception()), semaphore_timed_out);
|
||||
BOOST_REQUIRE_THROW(std::rethrow_exception(read_fut.get_exception()), timed_out_error);
|
||||
}
|
||||
|
||||
// All units should have been deposited back.
|
||||
@@ -1061,7 +1064,7 @@ SEASTAR_TEST_CASE(restricted_reader_timeout) {
|
||||
SEASTAR_TEST_CASE(restricted_reader_max_queue_length) {
|
||||
return async([&] {
|
||||
storage_service_for_tests ssft;
|
||||
restriction_data rd(new_reader_base_cost, {}, 1);
|
||||
restriction_data rd(new_reader_base_cost, 1);
|
||||
|
||||
{
|
||||
simple_schema s;
|
||||
|
||||
Reference in New Issue
Block a user