query: add result size limiter

This patch introduces an infrastructure for limiting result size.

There is a shard-local limit which makes sure that all results combined
do not use more than 10% of the shard memory.
There is also an individual limit which restricts a result to 1 MB.

In order to avoid sending tiny results there is minimum guaranteed size
(4 kB), which the query needs to reserve before it starts producing the
result.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
Paweł Dziepak
2016-11-17 16:05:06 +00:00
parent 43fe3439ca
commit ee89d80d5c
2 changed files with 158 additions and 0 deletions

View File

@@ -33,6 +33,162 @@ namespace stdx = std::experimental;
namespace query {
// result_memory_limiter, result_memory_accounter and result_memory_tracker
// form an infrastructure for limiting size of query results.
//
// result_memory_limiter is a shard-local object which ensures that all results
// combined do not use more than 10% of the shard memory.
//
// result_memory_accounter is used by result producers. It updates the
// shard-local limit and also keeps track of the individual maximum result
// size limit, which is 1 MB.
//
// result_memory_tracker is just an object that makes sure the
// result_memory_limiter is notified when memory is released (but not sooner).
class result_memory_accounter;
// Shard-local budget shared by all query results that are being built.
// The budget is kept in a semaphore whose initial count is one tenth of
// memory::stats().total_memory().
class result_memory_limiter {
    semaphore _memory_limiter;

public:
    // Amount new_read() reserves before handing out an accounter.
    static constexpr size_t minimum_result_size = 4 * 1024;
    // Size above which an individual result is asked to stop growing.
    static constexpr size_t maximum_result_size = 1 * 1024 * 1024;

    result_memory_limiter()
        : _memory_limiter(memory::stats().total_memory() / 10)
    { }

    result_memory_limiter(const result_memory_limiter&) = delete;
    result_memory_limiter(result_memory_limiter&&) = delete;

    // Reserves minimum_result_size from the budget and, once that is
    // available, yields a fresh memory accounter tied to this limiter.
    future<result_memory_accounter> new_read();

    // Per-shard check only: answers "stop" when the semaphore has no units
    // left.
    stop_iteration check() const {
        const auto exhausted = _memory_limiter.current() <= 0;
        return stop_iteration(exhausted);
    }

    // Takes n bytes out of the budget, then reports whether the result may
    // keep growing as far as the per-shard limit is concerned.
    stop_iteration update_and_check(size_t n) {
        _memory_limiter.consume(n);
        return stop_iteration(_memory_limiter.current() <= 0);
    }

    // Gives n bytes back to the budget.
    void release(size_t n) noexcept {
        _memory_limiter.signal(n);
    }

    // Direct access to the underlying semaphore.
    semaphore& sem() noexcept {
        return _memory_limiter;
    }
};
// Holds the semaphore units that were blocked for a finished result,
// together with the number of bytes that result reported as used.
class result_memory_tracker {
    semaphore_units<> _units;
    size_t _used_memory;

private:
    // Placeholder semaphore backing default-constructed trackers, which
    // hold zero units of it.
    static thread_local semaphore _dummy;

public:
    // Empty tracker: zero units of the placeholder semaphore, zero bytes used.
    result_memory_tracker() noexcept
        : result_memory_tracker(_dummy, 0, 0)
    { }

    // Tracker owning `blocked` units of `sem` and recording `used` bytes.
    result_memory_tracker(semaphore& sem, size_t blocked, size_t used) noexcept
        : _units(sem, blocked)
        , _used_memory(used)
    { }

    // Number of bytes recorded at construction.
    size_t used_memory() const {
        return _used_memory;
    }
};
// Per-result bookkeeping used while a result is being produced. It counts
// the bytes the result uses, the bytes it holds from the shard-local
// result_memory_limiter, and compares the running total against
// result_memory_limiter::maximum_result_size.
class result_memory_accounter {
    // Limiter charged for this result. Null when no limiter is attached
    // (default-constructed, moved-from, or after done()).
    result_memory_limiter* _limiter = nullptr;
    // Bytes currently held from the limiter on behalf of this result.
    size_t _blocked_bytes = 0;
    // Bytes consumed through this accounter.
    size_t _used_memory = 0;
    // _used_memory plus the amount a foreign accounter had already used
    // when this accounter was created from it.
    size_t _total_used_memory = 0;

private:
    friend class result_memory_limiter;

    // Used by result_memory_limiter::new_read(), which reserves
    // minimum_result_size from the limiter before constructing the
    // accounter; that reservation is recorded here as already blocked.
    explicit result_memory_accounter(result_memory_limiter& limiter) noexcept
        : _limiter(&limiter)
        , _blocked_bytes(result_memory_limiter::minimum_result_size)
    { }

public:
    result_memory_accounter() = default;

    // For results produced on multiple shards (range queries).
    // foreign_accounter is the accounter, possibly living on another shard,
    // that is used for merging the result. The new accounter starts its
    // total at foreign_accounter.used_memory(), so the part produced on
    // this shard is limited in such a way that the merged result does not
    // exceed the individual limit. Nothing is blocked in the limiter up
    // front.
    result_memory_accounter(result_memory_limiter& limiter, const result_memory_accounter& foreign_accounter) noexcept
        : _limiter(&limiter)
        , _total_used_memory(foreign_accounter.used_memory())
    { }

    // Takes over other's counters and limiter; other is left detached so
    // its destructor releases nothing.
    result_memory_accounter(result_memory_accounter&& other) noexcept
        : _limiter(std::exchange(other._limiter, nullptr))
        , _blocked_bytes(other._blocked_bytes)
        , _used_memory(other._used_memory)
        , _total_used_memory(other._total_used_memory)
    { }

    // Destroys the current state (releasing any blocked bytes) and then
    // move-constructs from other in place.
    result_memory_accounter& operator=(result_memory_accounter&& other) noexcept {
        if (this == &other) {
            return *this;
        }
        this->~result_memory_accounter();
        new (this) result_memory_accounter(std::move(other));
        return *this;
    }

    // Returns the blocked bytes to the limiter if one is still attached.
    ~result_memory_accounter() {
        if (!_limiter) {
            return;
        }
        _limiter->release(_blocked_bytes);
    }

    // Bytes consumed through this accounter.
    size_t used_memory() const {
        return _used_memory;
    }

    // Accounts for n more bytes of result. The answer is
    // stop_iteration::yes when the result cannot grow any further, taking
    // both the individual and the per-shard limit into account.
    stop_iteration update_and_check(size_t n) {
        _used_memory += n;
        _total_used_memory += n;
        stop_iteration stop(_total_used_memory > result_memory_limiter::maximum_result_size);
        if (!_limiter || _used_memory <= _blocked_bytes) {
            // Either no limiter is attached or the bytes already blocked
            // still cover this result.
            return stop;
        }
        // Block only what the existing reservation does not cover.
        const auto to_block = std::min(_used_memory - _blocked_bytes, n);
        _blocked_bytes += to_block;
        stop = _limiter->update_and_check(to_block) || stop;
        return stop;
    }

    // Reports whether the result may still grow, without consuming
    // anything. The per-shard limiter is consulted only when the individual
    // limit has not been exceeded and the bytes used have reached the
    // amount already blocked.
    stop_iteration check() const {
        stop_iteration stop { _total_used_memory > result_memory_limiter::maximum_result_size };
        if (!stop) {
            if (_limiter && _used_memory >= _blocked_bytes) {
                return _limiter->check();
            }
        }
        return stop;
    }

    // Accounts for n more bytes of result, ignoring the verdict.
    void update(size_t n) {
        update_and_check(n);
    }

    // Finishes accounting. With a limiter attached, detaches from it and
    // hands the blocked bytes over to a result_memory_tracker built on the
    // limiter's semaphore; otherwise returns an empty tracker.
    result_memory_tracker done() && {
        result_memory_limiter* const limiter = std::exchange(_limiter, nullptr);
        if (!limiter) {
            return { };
        }
        auto& sem = limiter->sem();
        return result_memory_tracker(sem, _blocked_bytes, _used_memory);
    }
};
// Waits until minimum_result_size can be taken from the shard-local budget,
// then produces an accounter bound to this limiter.
inline future<result_memory_accounter> result_memory_limiter::new_read() {
    auto make_accounter = [this] {
        return result_memory_accounter(*this);
    };
    return _memory_limiter.wait(minimum_result_size).then(std::move(make_accounter));
}
enum class result_request {
only_result,
only_digest,

View File

@@ -32,6 +32,8 @@
namespace query {
// Definition of result_memory_tracker's static thread-local placeholder
// semaphore; it is created with an initial count of 0.
thread_local semaphore result_memory_tracker::_dummy { 0 };
// Partition range that is open-ended on both sides.
const partition_range full_partition_range = partition_range::make_open_ended_both_sides();
// Partition slice whose only clustering range is open-ended on both sides;
// the remaining three constructor arguments are left empty.
const query::partition_slice full_slice = query::partition_slice({ query::clustering_range::make_open_ended_both_sides() }, { }, { }, { });