db: add rate_limiter

Introduces the rate_limiter, a replica-side data structure meant for
tracking the frequence with which each partition is being accessed
(separately for reads and writes) and deciding whether the request
should be accepted and processed further or rejected.

The limiter is implemented as a statically allocated hashmap which keeps
track of the frequency with which partitions are accessed. Its entries
are incremented when an operation is admitted and are decayed
exponentially over time.

If a partition is detected to be accessed more than its limit allows,
requests are rejected with a probability calculated in such a way that,
on average, the number of accepted requests is kept at the limit.

The structure currently weights a bit above 1MB and each shard is meant
to keep a separate instance. All operations are O(1), including the
periodic timer.
This commit is contained in:
Piotr Dulikowski
2021-12-02 14:25:43 +01:00
parent 2162bb9f3b
commit 0fe8b55427
7 changed files with 690 additions and 0 deletions

View File

@@ -450,6 +450,7 @@ set(scylla_sources
db/large_data_handler.cc
db/legacy_schema_migrator.cc
db/marshal/type_parser.cc
db/rate_limiter.cc
db/schema_tables.cc
db/size_estimates_virtual_reader.cc
db/snapshot-ctl.cc

View File

@@ -505,6 +505,7 @@ scylla_tests = set([
'test/boost/group0_test',
'test/boost/exception_container_test',
'test/boost/result_utils_test',
'test/boost/rate_limiter_test',
'test/boost/expr_test',
'test/manual/ec2_snitch_test',
'test/manual/enormous_table_scan_test',
@@ -887,6 +888,7 @@ scylla_core = (['replica/database.cc',
'db/view/row_locking.cc',
'db/sstables-format-selector.cc',
'db/snapshot-ctl.cc',
'db/rate_limiter.cc',
'index/secondary_index_manager.cc',
'index/secondary_index.cc',
'utils/UUID_gen.cc',
@@ -1130,6 +1132,7 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/group0_state_machine.idl.hh',
'idl/forward_request.idl.hh',
'idl/replica_exception.idl.hh',
'idl/per_partition_rate_limit_info.idl.hh',
]
rusts = [
@@ -1281,6 +1284,7 @@ deps['test/boost/linearizing_input_stream_test'] = [
"test/lib/log.cc",
]
deps['test/boost/expr_test'] = ['test/boost/expr_test.cc'] + scylla_core
deps['test/boost/rate_limiter_test'] = ['test/boost/rate_limiter_test.cc', 'db/rate_limiter.cc']
deps['test/boost/duration_test'] += ['test/lib/exception_utils.cc']
deps['test/boost/schema_loader_test'] += ['tools/schema_loader.cc']

View File

@@ -0,0 +1,49 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <cstdint>
#include <variant>
#include <seastar/util/bool_class.hh>
namespace db {
using allow_per_partition_rate_limit = seastar::bool_class<class allow_per_partition_rate_limit_tag>;
namespace per_partition_rate_limit {
// Tells the replica to account the operation (increase the corresponding counter)
// and accept it regardless from the value of the counter.
//
// Used when the coordinator IS a replica (correct node and shard).
struct account_only {};
// Tells the replica to account the operation and decide whether to reject
// or not, based on the random variable sent by the coordinator.
//
// Used when the coordinator IS NOT a replica (wrong node or shard).
struct account_and_enforce {
// A random 32-bit number generated by the coordinator.
// Replicas are supposed to use it in order to decide whether
// to accept or reject.
uint32_t random_variable;
inline double get_random_variable_as_double() const {
return double(random_variable) / double(1LL << 32);
}
};
// std::monostate -> do not count to the rate limit and never reject
// account_and_enforce -> account to the rate limit and optionally reject
using info = std::variant<std::monostate, account_only, account_and_enforce>;
} // namespace per_partition_rate_limit
} // namespace db

305
db/rate_limiter.cc Normal file
View File

@@ -0,0 +1,305 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <cmath>
#include <numbers>
#include <array>
#include <random>
#include <variant>
#include <chrono>
#include <seastar/core/metrics.hh>
#include "utils/small_vector.hh"
#include "utils/murmur_hash.hh"
#include "db/rate_limiter.hh"
// The rate limiter keeps a hashmap of counters differentiated by operation type
// (e.g. read or write) and the partition token. On each operation,
// the corresponding counter is increased by 1.
//
// The counters are decremented via two mechanisms:
//
// 1. Every `time_window_duration`, all counters are halved.
// 2. Within a time window, on every `bucket_size` operations all counters
// are decremented by 1.
//
// The mechanism 1) makes sure that we do not forget about very frequent
// operations too quick and makes it possible to reject in a probabilistic
// manner (this is described in more detail in design notes).
//
// The mechanism 2) protects the internal hashmap from being flooded with
// counters with low values. This causes the rate limiter to underestimate
// the counter values by the current number of "buckets" within this
// time window. This strategy is also known as "lossy counting".
//
// Both mechanisms 1) and 2) are implemented in a lazy manner.
namespace db {
static constexpr size_t hash_bits = 16;
static constexpr size_t entry_count = 1 << hash_bits;
static constexpr size_t bucket_size = 10000;
void rate_limiter_base::on_timer() noexcept {
_time_window_history.pop_back();
_time_window_history.insert(_time_window_history.begin(), time_window_entry {
.entries_active = _current_entries_in_time_window,
.lossy_counting_decrease = _current_bucket,
});
_current_bucket = 0;
_current_ops_in_bucket = 0;
_current_entries_in_time_window = 0;
_current_time_window = (_current_time_window + 1) % (1 << time_window_bits);
// Because time window ids are 12 bit numbers and we increase the current
// time window number by 1 every second, it wraps around every 4096
// seconds (more than an hour). Because of this, some very old entry
// updated last 4096 seconds may accidentally become valid again.
//
// In order to prevent this, we make sure to update the entries
// more frequently. We do this by refreshing all the entries within half
// of the wraparound period (2048 seconds).
//
// Instead of clearing everything at once, we divide this operation
// into many small steps and perform them during time window change.
//
// All of this should make sure that each entry's time window is not
// older than 2048 seconds from the current generation.
constexpr size_t period = 1 << (time_window_bits - 1);
constexpr size_t entries_per_step = entry_count / period;
const size_t begin = _current_time_window * entries_per_step;
for (size_t i = 0; i < entries_per_step; i++) {
entry_refresh(_entries[(begin + i) % entry_count]);
}
}
rate_limiter_base::entry* rate_limiter_base::get_entry(uint32_t label, uint64_t token) noexcept {
// We need to either find the existing entry for this (label, token) combination
// or otherwise find an invalid entry which we can initialize and use.
//
// We start by looking at the entry corresponding to the computed hash,
// if it's occupied by another (label, token) try other entries using
// the quadratic probing strategy.
//
// We limit ourselves to 32 attempts - if no suitable entry is found
// then we return nullptr and admit the operation unconditionally.
// Because we use quadratic probing and entries can be deleted (lazily),
// a situation can occur where an entry A suddenly becomes inaccessible
// because another entry B which is earlier on the probe chain is deleted.
// One of the following will happen:
//
// 1. Either we will allocate a new entry over B and A becomes accessible
// again,
// 2. Or we will allocate a new entry for the same operation/partition as A
// and A will eventually expire.
//
// In the worst case, A might be a "hot" entry and be actively rate limited
// and the described situation will cause a large number of operations
// to be admitted. Fortunately, this will move the entry earlier in the
// probe chain, so this situation will happen a limited number of times (if
// any at all) for a single "hot" entry.
size_t hash = compute_hash(label, token);
static constexpr size_t max_probes = 32;
for (size_t i = 0; i < max_probes; i++) {
// Quadratic probing - every iteration jumps further than the previous one
hash = (hash + i) % entry_count;
entry& b = _entries[hash];
++_metrics.probe_count;
entry_refresh(b);
if (entry_is_empty(b)) {
++_metrics.allocations_on_empty;
b.token = token;
b.label = label;
b.op_count = _current_bucket;
return &b;
} else if (b.token == token && b.label == label) {
++_metrics.successful_lookups;
return &b;
}
}
++_metrics.failed_allocations;
return nullptr;
}
size_t rate_limiter_base::compute_hash(uint32_t label, uint64_t token) noexcept {
// The map key is a tuple (token, key) + salt
// The key is hashed with murmur hash for good hash quality
static constexpr size_t key_length = sizeof(token) + sizeof(label) + sizeof(_salt);
std::array<uint8_t, key_length> key;
uint8_t* ptr = key.data();
memcpy(ptr, &token, sizeof(token));
ptr += sizeof(token);
memcpy(ptr, &label, sizeof(label));
ptr += sizeof(label);
memcpy(ptr, &_salt, sizeof(_salt));
std::array<uint64_t, 2> out;
utils::murmur_hash::hash3_x64_128(key.data(), key_length, 0, out);
return out[0];
}
void rate_limiter_base::entry_refresh(rate_limiter_base::entry& b) noexcept {
uint32_t window_delta = _current_time_window - b.time_window;
if (window_delta == 0) {
// The entry is fresh, it was allocated in this time window
return;
}
if (window_delta < _time_window_history.size()) {
// The entry is not that old so we have to apply the effects
// of lossy counting and halving on time window switch
--_time_window_history[window_delta - 1].entries_active;
while (window_delta > 0) {
if (b.op_count > _time_window_history[window_delta - 1].lossy_counting_decrease) {
b.op_count -= _time_window_history[window_delta - 1].lossy_counting_decrease;
} else {
b.op_count = 0;
}
b.op_count /= 2;
--window_delta;
}
} else {
// The entry is very old and the op_count can be safely decreased to zero
b.op_count = 0;
}
++_current_entries_in_time_window;
b.time_window = _current_time_window;
}
bool rate_limiter_base::entry_is_empty(const rate_limiter_base::entry& b) noexcept {
return b.op_count <= _current_bucket;
}
void rate_limiter_base::register_metrics() {
namespace sm = seastar::metrics;
_metric_group.add_group("per_partition_rate_limiter", {
// TODO: Most of the following metrics are pretty low-level and not useful for users,
// perhaps they should be hidden behind a configuration flag
sm::make_counter("allocations", _metrics.allocations_on_empty,
sm::description("Number of times a entry was allocated over an empty/expired entry.")),
sm::make_counter("successful_lookups", _metrics.successful_lookups,
sm::description("Number of times a lookup returned an already allocated entry.")),
sm::make_counter("failed_allocations", _metrics.failed_allocations,
sm::description("Number of times the rate limiter gave up trying to allocate.")),
sm::make_counter("probe_count", _metrics.probe_count,
sm::description("Number of probes made during lookups.")),
sm::make_gauge("load_factor", [&] {
uint32_t occupied_entry_count = _current_entries_in_time_window;
for (const auto& twe : _time_window_history) {
occupied_entry_count += twe.entries_active;
}
return double(occupied_entry_count) / double(entry_count);
},
sm::description("Current load factor of the hash table (upper bound, may be overestimated).")),
});
}
rate_limiter_base::rate_limiter_base()
: _salt(std::random_device{}())
, _entries(entry_count)
, _time_window_history(op_count_bits - 1) {
register_metrics();
}
uint64_t rate_limiter_base::increase_and_get_counter(label& l, uint64_t token) noexcept {
// Assign a label if not done yet
if (l._label == 0) {
l._label = _next_label++;
}
entry* b = get_entry(l._label, token);
if (!b) {
// We failed to allocate a entry for this partition. This means that
// we won't track hit count for this partition during this time window.
// Assume that it's OK to admit the operation.
return 0;
}
// Protect from wrap-around
b->op_count = std::min<uint32_t>((1 << op_count_bits) - 1, b->op_count + 1);
++_current_ops_in_bucket;
if (_current_ops_in_bucket >= bucket_size) {
// Every `bucket_size` operations, virtually decrement all entries
// by one. We implement it by always subtracting the `_current_bucket`
// when comparing the count in the entry with the limit.
++_current_bucket;
_current_ops_in_bucket -= bucket_size;
}
return b->op_count - _current_bucket;
}
rate_limiter_base::can_proceed rate_limiter_base::account_operation(
label& l, uint64_t token, uint64_t limit,
const db::per_partition_rate_limit::info& rate_limit_info) noexcept {
if (std::holds_alternative<std::monostate>(rate_limit_info)) {
// Rate limiting turned off
return can_proceed::yes;
}
const uint64_t count = increase_and_get_counter(l, token);
if (auto* info = std::get_if<db::per_partition_rate_limit::account_and_enforce>(&rate_limit_info)) {
// On each time window change we halve the entry counts, therefore
// a partition with X ops/s will stabilize at 2X hits at the end
// of each time window.
if (count <= 2 * limit) {
return can_proceed::yes;
} else {
// As mentioned before, assuming a fixed operation rate, the operation
// count in a entry will oscillate between X at the beginning of the
// time window and 2X at the end. In order to only accept `limit`
// operations within a time window, we need to reject with probability
// P_c(x), where P_c(x) is a function such that, integrated over [X, 2X]
// will be equal to `limit`. `P_c(x) = limit / (x * ln 2)` satisfies
// this criterion.
//
// All replicas get the same value for the random variable X, with an
// expectation that all replicas' counters oscillate between the same
// values. Because of that, most of the time replicas will agree
// and either all accept or reject.
if (info->get_random_variable_as_double() * double(count) * std::numbers::ln2 < double(limit)) {
return can_proceed::yes;
} else {
return can_proceed::no;
}
}
} else {
return can_proceed::yes;
}
}
template class generic_rate_limiter<seastar::lowres_clock>;
}

178
db/rate_limiter.hh Normal file
View File

@@ -0,0 +1,178 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <cstdint>
#include <cstddef>
#include <chrono>
#include <limits>
#include <concepts>
#include <vector>
#include <optional>
#include <random>
#include <seastar/core/future.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/util/bool_class.hh>
#include "utils/chunked_vector.hh"
#include "db/per_partition_rate_limit_info.hh"
// A data structure used to implement per-partition rate limiting. It accounts
// operations and enforces limits when it is detected that the operation rate
// is too high.
namespace db {
class rate_limiter_base {
public:
static constexpr size_t op_count_bits = 20;
static constexpr size_t time_window_bits = 12;
private:
struct metrics {
uint64_t allocations_on_empty = 0;
uint64_t successful_lookups = 0;
uint64_t failed_allocations = 0;
uint64_t probe_count = 0;
};
// Represents a piece of the hashmap storage.
struct entry {
public:
// The partition key token of the operation which allocated this entry.
uint64_t token = 0;
// The label of the operation which allocated this entry.
// Labels are used to differentiate operations which should be counted
// separately, e.g. reads and writes to the same table or writes
// to two different tables.
uint32_t label = 0;
// The number of operations counted for given token/label.
// It is virtually decremented on each window change, so the real
// operation count is actually `op_count - _current_bucket`.
// If the number drops to zero or below, the entry is considered
// "expired" and may be overwritten by another operation.
uint32_t op_count : op_count_bits = 0;
// ID of the time window in which the entry was allocated.
uint32_t time_window : time_window_bits = 0;
};
struct time_window_entry {
// How many entries are there active within this time window?
uint32_t entries_active = 0;
// By how much should the counter should be decreased within
// this time window?
uint32_t lossy_counting_decrease = 0;
};
public:
struct can_proceed_tag{};
using can_proceed = seastar::bool_class<can_proceed_tag>;
// Identifies a type of operation which is counted separately from other
// operations. For example, reads and writes for given table should have
// separate labels.
struct label {
private:
// The current ID used to identify the label in the rate limiter.
// It is assigned on first use.
uint32_t _label = 0;
friend class rate_limiter_base;
};
private:
uint32_t _current_bucket = 0;
uint32_t _current_ops_in_bucket = 0;
uint32_t _current_entries_in_time_window = 0;
uint32_t _next_label = 1;
uint32_t _current_time_window = 0;
const uint32_t _salt;
utils::chunked_vector<entry> _entries;
std::vector<time_window_entry> _time_window_history;
metrics _metrics;
seastar::metrics::metric_groups _metric_group;
private:
entry* get_entry(uint32_t label, uint64_t token) noexcept;
size_t compute_hash(uint32_t label, uint64_t token) noexcept;
void entry_refresh(entry& b) noexcept;
bool entry_is_empty(const entry& b) noexcept;
void register_metrics();
protected:
void on_timer() noexcept;
public:
rate_limiter_base();
rate_limiter_base(const rate_limiter_base&) = delete;
rate_limiter_base(rate_limiter_base&&) = delete;
rate_limiter_base& operator=(const rate_limiter_base&) = delete;
rate_limiter_base& operator=(rate_limiter_base&&) = delete;
// (For testing purposes only)
// Increments the counter for given (label, token) and returns
// the new value of the counter.
uint64_t increase_and_get_counter(label& l, uint64_t token) noexcept;
// Increments the counter for given (label, token).
// If the counter indicates that the partition is over the limit,
// returns can_proceed::no with some probability.
//
// The `random_variable` parameter should be a value from range [0, 1).
// It is used as the source of randomness - the function chooses a threshold
// and accepts if and only if `random_variable` is below it.
//
// The probability is calculated in such a way that statistically
// only `limit` operations per second are admitted.
can_proceed account_operation(label& l, uint64_t token, uint64_t limit,
const db::per_partition_rate_limit::info& rate_limit_info) noexcept;
};
template<typename ClockType>
class generic_rate_limiter : public rate_limiter_base {
private:
seastar::timer<ClockType> _timer;
public:
generic_rate_limiter()
: rate_limiter_base() {
// Rate limiting is more accurate when the rate limiter timers
// on all nodes are synchronized. Assume that the nodes' clocks
// are synchronized and schedule the first tick on the beginning
// of the closest second.
const auto period = std::chrono::seconds(1);
const auto now = std::chrono::system_clock::now();
const auto initial_delay = period - now.time_since_epoch() % period;
_timer.set_callback([this] { on_timer(); });
_timer.arm(ClockType::now() + initial_delay, period);
}
};
extern template class generic_rate_limiter<seastar::lowres_clock>;
using rate_limiter = generic_rate_limiter<seastar::lowres_clock>;
}

View File

@@ -0,0 +1,23 @@
/*
* Copyright 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace db {
namespace per_partition_rate_limit {
struct account_only {};
struct account_and_enforce {
uint32_t random_variable;
};
// using info = std::variant<std::monostate, account_only, account_and_enforce>;
} // namespace per_partition_rate_limit
} // namespace db

View File

@@ -0,0 +1,130 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <chrono>
#include <cstdint>
#include <seastar/core/manual_clock.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/util/later.hh>
#include <seastar/testing/test_case.hh>
#include "db/rate_limiter.hh"
using namespace seastar;
using test_rate_limiter = db::generic_rate_limiter<seastar::manual_clock>;
future<> step_seconds(int seconds) {
for (int i = 0; i < seconds; i++) {
// The rate limiter's timer executes periodically every second
// and we want the timer to run `seconds` times.
// Because `manual_clock::advance` executes each timer only once
// even if they reschedule, we cannot just advance by requested
// number of seconds - instead, we must advance multiple times
// by one second.
manual_clock::advance(std::chrono::seconds(1));
co_await yield();
}
}
SEASTAR_TEST_CASE(test_rate_limiter_no_rejections_on_sequential) {
const uint64_t token_count = 1000 * 1000;
const uint64_t limit = 1;
test_rate_limiter::label lbl;
test_rate_limiter limiter;
for (uint64_t token = 0; token < token_count; token++) {
BOOST_REQUIRE_LE(limiter.increase_and_get_counter(lbl, token), 1);
co_await maybe_yield();
}
}
SEASTAR_TEST_CASE(test_rate_limiter_partition_label_separation) {
const uint64_t token_count = 30;
const uint64_t repeat_count = 10;
std::vector<test_rate_limiter::label> labels{3};
test_rate_limiter limiter;
for (uint64_t i = 0; i < repeat_count; i++) {
for (uint64_t token = 0; token < token_count; token++) {
for (auto& l : labels) {
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(l, token), i + 1);
co_await maybe_yield();
}
}
}
}
SEASTAR_TEST_CASE(test_rate_limiter_halving_over_time) {
test_rate_limiter::label lbl;
test_rate_limiter limiter;
for (int i = 0; i < 16; i++) {
limiter.increase_and_get_counter(lbl, 0);
}
// Should be cut in half
co_await step_seconds(1);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), (16 / 2) + 1);
// Should decrease four times (9 -> 2)
co_await step_seconds(2);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), (9 / 4) + 1);
// Should be reset
co_await step_seconds(10);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 1);
}
SEASTAR_TEST_CASE(test_rate_limiter_time_window_wraparound_handling) {
test_rate_limiter::label lbl;
test_rate_limiter limiter;
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 1);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 2);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 3);
// Advance far into the future so that the time window wraps around
co_await step_seconds(1 << test_rate_limiter::time_window_bits);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 1);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 2);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 3);
// TODO: Workaround for seastar#1072. Calling `manual_clock::advance`
// multiple times and then quitting the test immediately causes
// the test framework to hang. I didn't have the time to debug it, but I
// suspect there are some pending tasks which need to finish before exiting
// from the main test task.
co_await seastar::sleep(std::chrono::seconds(1));
}
SEASTAR_TEST_CASE(test_rate_limiter_account_operation) {
const uint64_t limit = 1;
const int ops_per_loop = 1000;
test_rate_limiter::label lbl;
test_rate_limiter limiter;
// We use UINT_MAX as the random parameter so that we get rejected quickly
db::per_partition_rate_limit::account_and_enforce info {
.random_variable = UINT32_MAX,
};
bool encountered_rejection = false;
for (int i = 0; i < ops_per_loop; i++) {
if (limiter.account_operation(lbl, 0, limit, info) == test_rate_limiter::can_proceed::no) {
encountered_rejection = true;
break;
}
co_await maybe_yield();
}
BOOST_REQUIRE(encountered_rejection);
}