diff --git a/query-result.hh b/query-result.hh
index 1ed234ff50..8efde28ec9 100644
--- a/query-result.hh
+++ b/query-result.hh
@@ -33,6 +33,183 @@ namespace stdx = std::experimental;
 
 namespace query {
 
+// result_memory_limiter, result_memory_accounter and result_memory_tracker
+// form an infrastructure for limiting size of query results.
+//
+// result_memory_limiter is a shard-local object which ensures that all results
+// combined do not use more than 10% of the shard memory.
+//
+// result_memory_accounter is used by result producers, updates the shard-local
+// limits as well as keeps track of the individual maximum result size limit
+// which is 1 MB.
+//
+// result_memory_tracker is just an object that makes sure the
+// result_memory_limiter is notified when memory is released (but not sooner).
+
+class result_memory_accounter;
+
+class result_memory_limiter {
+    semaphore _memory_limiter;
+public:
+    static constexpr size_t minimum_result_size = 4 * 1024;
+    static constexpr size_t maximum_result_size = 1 * 1024 * 1024;
+public:
+    result_memory_limiter()
+        : _memory_limiter(memory::stats().total_memory() / 10)
+    { }
+
+    result_memory_limiter(const result_memory_limiter&) = delete;
+    result_memory_limiter(result_memory_limiter&&) = delete;
+
+    // Reserves minimum_result_size and creates new memory accounter.
+    future<result_memory_accounter> new_read();
+
+    // Checks whether the result can grow any more, takes into account only
+    // the per shard limit.
+    stop_iteration check() const {
+        return stop_iteration(_memory_limiter.current() <= 0);
+    }
+
+    // Consumes n bytes from memory limiter and checks whether the result
+    // can grow any more (considering just the per-shard limit).
+    stop_iteration update_and_check(size_t n) {
+        _memory_limiter.consume(n);
+        return check();
+    }
+
+    // Signals n bytes back to the memory limiter.
+    void release(size_t n) noexcept {
+        _memory_limiter.signal(n);
+    }
+
+    // Gives access to the underlying semaphore (used by
+    // result_memory_accounter::done() to build a result_memory_tracker).
+    semaphore& sem() noexcept { return _memory_limiter; }
+};
+
+
+// Holds the semaphore units blocked for a finished result together with the
+// number of bytes the result actually used.
+class result_memory_tracker {
+    semaphore_units<> _units;
+    size_t _used_memory;
+private:
+    static thread_local semaphore _dummy;
+public:
+    // An empty tracker: zero units taken from the thread-local _dummy semaphore.
+    result_memory_tracker() noexcept : _units(_dummy, 0), _used_memory(0) { }
+    result_memory_tracker(semaphore& sem, size_t blocked, size_t used) noexcept
+        : _units(sem, blocked), _used_memory(used) { }
+    size_t used_memory() const { return _used_memory; }
+};
+
+class result_memory_accounter {
+    result_memory_limiter* _limiter = nullptr;
+    size_t _blocked_bytes = 0;
+    size_t _used_memory = 0;
+    size_t _total_used_memory = 0;
+private:
+    // Used by result_memory_limiter::new_read(), which has already waited for
+    // minimum_result_size on the limiter before constructing the accounter.
+    explicit result_memory_accounter(result_memory_limiter& limiter) noexcept
+        : _limiter(&limiter)
+        , _blocked_bytes(result_memory_limiter::minimum_result_size)
+    { }
+    friend class result_memory_limiter;
+public:
+    // An accounter without a limiter enforces only the individual result
+    // size limit.
+    result_memory_accounter() = default;
+
+    // This constructor is used in cases when a result is produced on multiple
+    // shards (range queries). foreign_accounter is an accounter that, possibly,
+    // exists on the other shard and is used for merging the result. This
+    // accounter will learn how big the total result already is and limit the
+    // part produced on this shard so that after merging the final result
+    // does not exceed the individual limit.
+    result_memory_accounter(result_memory_limiter& limiter, const result_memory_accounter& foreign_accounter) noexcept
+        : _limiter(&limiter)
+        , _total_used_memory(foreign_accounter.used_memory())
+    { }
+
+    // Takes over the limiter pointer; other is left detached so that its
+    // destructor does not release the blocked bytes a second time.
+    result_memory_accounter(result_memory_accounter&& other) noexcept
+        : _limiter(std::exchange(other._limiter, nullptr))
+        , _blocked_bytes(other._blocked_bytes)
+        , _used_memory(other._used_memory)
+        , _total_used_memory(other._total_used_memory)
+    { }
+
+    // Destroys the current state (releasing any bytes it has blocked) and
+    // move-constructs from other in place.
+    result_memory_accounter& operator=(result_memory_accounter&& other) noexcept {
+        if (this != &other) {
+            this->~result_memory_accounter();
+            new (this) result_memory_accounter(std::move(other));
+        }
+        return *this;
+    }
+
+    // Releases the blocked bytes if the accounter is still attached to a
+    // limiter (it is detached by a move or by done()).
+    ~result_memory_accounter() {
+        if (_limiter) {
+            _limiter->release(_blocked_bytes);
+        }
+    }
+
+    size_t used_memory() const { return _used_memory; }
+
+    // Consume n more bytes for the result. Returns stop_iteration::yes if
+    // the result cannot grow any more (taking into account both individual
+    // and per-shard limits).
+    stop_iteration update_and_check(size_t n) {
+        _used_memory += n;
+        _total_used_memory += n;
+        auto stop = stop_iteration(_total_used_memory > result_memory_limiter::maximum_result_size);
+        // Only the part that exceeds what is already blocked has to be taken
+        // from the per-shard limiter.
+        if (_limiter && _used_memory > _blocked_bytes) {
+            auto to_block = std::min(_used_memory - _blocked_bytes, n);
+            _blocked_bytes += to_block;
+            stop = _limiter->update_and_check(to_block) || stop;
+        }
+        return stop;
+    }
+
+    // Checks whether the result can grow any more.
+    stop_iteration check() const {
+        stop_iteration stop { _total_used_memory > result_memory_limiter::maximum_result_size };
+        if (!stop && _used_memory >= _blocked_bytes && _limiter) {
+            return _limiter->check();
+        }
+        return stop;
+    }
+
+    // Consume n more bytes for the result.
+    void update(size_t n) {
+        update_and_check(n);
+    }
+
+    // Finishes accounting: detaches from the limiter and hands the blocked
+    // bytes over to a result_memory_tracker. Without a limiter an empty
+    // tracker is returned.
+    result_memory_tracker done() && {
+        if (!_limiter) {
+            return { };
+        }
+        auto& sem = std::exchange(_limiter, nullptr)->sem();
+        return result_memory_tracker(sem, _blocked_bytes, _used_memory);
+    }
+};
+
+inline future<result_memory_accounter> result_memory_limiter::new_read() {
+    return _memory_limiter.wait(minimum_result_size).then([this] {
+        return result_memory_accounter(*this);
+    });
+}
+
 enum class result_request {
     only_result,
     only_digest,
diff --git a/query.cc b/query.cc
index 3c83aa11ae..3f18bf0c31 100644
--- a/query.cc
+++ b/query.cc
@@ -32,6 +32,9 @@
 
 namespace query {
 
+// Backs default-constructed result_memory_tracker objects, which hold zero units of it.
+thread_local semaphore result_memory_tracker::_dummy { 0 };
+
 const partition_range full_partition_range = partition_range::make_open_ended_both_sides();
 const query::partition_slice full_slice = query::partition_slice({ query::clustering_range::make_open_ended_both_sides() }, { }, { }, { });
 