diff --git a/configure.py b/configure.py
index 89cee8e6fc..312e93bd71 100755
--- a/configure.py
+++ b/configure.py
@@ -601,6 +601,7 @@ scylla_core = (['database.cc',
'db/consistency_level.cc',
'db/system_keyspace.cc',
'db/system_distributed_keyspace.cc',
+ 'db/size_estimates_virtual_reader.cc',
'db/schema_tables.cc',
'db/cql_type_parser.cc',
'db/legacy_schema_migrator.cc',
diff --git a/db/size_estimates_virtual_reader.cc b/db/size_estimates_virtual_reader.cc
new file mode 100644
index 0000000000..52920fbe33
--- /dev/null
+++ b/db/size_estimates_virtual_reader.cc
@@ -0,0 +1,328 @@
+/*
+ * Copyright (C) 2019 ScyllaDB
+ *
+ * Modified by ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include
+#include
+#include
+#include
+
+#include "clustering_bounds_comparator.hh"
+#include "database_fwd.hh"
+#include "db/system_keyspace.hh"
+#include "dht/i_partitioner.hh"
+#include "partition_range_compat.hh"
+#include "range.hh"
+#include "service/storage_service.hh"
+#include "mutation_fragment.hh"
+#include "sstables/sstables.hh"
+#include "db/timeout_clock.hh"
+#include "database.hh"
+
+#include "db/size_estimates_virtual_reader.hh"
+
+namespace db {
+
+namespace size_estimates {
+
+struct virtual_row { // One (table name, token range) pair; it only holds references, so both referents must outlive it.
+ const bytes& cf_name;
+ const token_range& tokens;
+ clustering_key_prefix as_key() const { // clustering key built from (cf_name, tokens.start, tokens.end), in that order
+ return clustering_key_prefix::from_exploded(std::vector{cf_name, tokens.start, tokens.end});
+ }
+};
+
+struct virtual_row_comparator { // Less-than between clustering key prefixes and virtual rows, accepted in either argument order.
+ schema_ptr _schema;
+ virtual_row_comparator(schema_ptr schema) : _schema(schema) { }
+ bool operator()(const clustering_key_prefix& key1, const clustering_key_prefix& key2) { // the two overloads below funnel into this one
+ return clustering_key_prefix::prefix_equality_less_compare(*_schema)(key1, key2);
+ }
+ bool operator()(const virtual_row& row, const clustering_key_prefix& key) { // the row is converted with as_key() before comparing
+ return operator()(row.as_key(), key);
+ }
+ bool operator()(const clustering_key_prefix& key, const virtual_row& row) { // mirror of the overload above
+ return operator()(key, row.as_key());
+ }
+};
+
+// Iterates over the cartesian product of cf_names and token ranges; the range index advances fastest.
+class virtual_row_iterator : public std::iterator { // NOTE(review): the template arguments of std::iterator (and of the members below) are missing in this listing - confirm against the real file
+ std::reference_wrapper> _cf_names;
+ std::reference_wrapper> _ranges;
+ size_t _cf_names_idx = 0; // index into _cf_names (outer dimension)
+ size_t _ranges_idx = 0; // index into _ranges (inner dimension)
+public:
+ struct end_iterator_tag {}; // selects the past-the-end constructor
+ virtual_row_iterator(const std::vector& cf_names, const std::vector& ranges) // begin iterator: both indices stay at 0
+ : _cf_names(std::ref(cf_names))
+ , _ranges(std::ref(ranges))
+ { }
+ virtual_row_iterator(const std::vector& cf_names, const std::vector& ranges, end_iterator_tag) // end iterator: both indices equal the container sizes
+ : _cf_names(std::ref(cf_names))
+ , _ranges(std::ref(ranges))
+ , _cf_names_idx(cf_names.size())
+ , _ranges_idx(ranges.size())
+ {
+ if (cf_names.empty() || ranges.empty()) {
+ // The product of an empty range with any range is an empty range.
+ // In this case we want the end iterator to be equal to the begin iterator,
+ // which has _ranges_idx = _cf_names_idx = 0.
+ _ranges_idx = _cf_names_idx = 0;
+ }
+ }
+ virtual_row_iterator& operator++() {
+ if (++_ranges_idx == _ranges.get().size() && ++_cf_names_idx < _cf_names.get().size()) { // wrap to the next cf only while one remains; after the last cf both indices rest at their sizes, i.e. the end state
+ _ranges_idx = 0;
+ }
+ return *this;
+ }
+ virtual_row_iterator operator++(int) { // post-increment: returns a copy taken before advancing
+ virtual_row_iterator i(*this);
+ ++(*this);
+ return i;
+ }
+ const value_type operator*() const { // built from the current cf name and the current range; no bounds check is done here
+ return { _cf_names.get()[_cf_names_idx], _ranges.get()[_ranges_idx] };
+ }
+ bool operator==(const virtual_row_iterator& i) const { // only the two indices are compared, not the referenced vectors
+ return _cf_names_idx == i._cf_names_idx
+ && _ranges_idx == i._ranges_idx;
+ }
+ bool operator!=(const virtual_row_iterator& i) const {
+ return !(*this == i);
+ }
+};
+
+/**
+ * Returns the non-system keyspaces selected by the partition_range and owned by this shard, ordered by ring position.
+ */
+static std::vector get_keyspaces(const schema& s, const database& db, dht::partition_range range) {
+ struct keyspace_less_comparator { // orders keyspace names by the ring position of the name used as a partition key
+ const schema& _s;
+ keyspace_less_comparator(const schema& s) : _s(s) { }
+ dht::ring_position as_ring_position(const sstring& ks) { // keyspace name -> partition key -> decorated key
+ auto pkey = partition_key::from_single_value(_s, utf8_type->decompose(ks));
+ return dht::global_partitioner().decorate_key(_s, std::move(pkey));
+ }
+ bool operator()(const sstring& ks1, const sstring& ks2) { // name vs name: used for sorting
+ return as_ring_position(ks1).less_compare(_s, as_ring_position(ks2));
+ }
+ bool operator()(const sstring& ks, const dht::ring_position& rp) { // name vs ring position: passed to range.slice() below
+ return as_ring_position(ks).less_compare(_s, rp);
+ }
+ bool operator()(const dht::ring_position& rp, const sstring& ks) {
+ return rp.less_compare(_s, as_ring_position(ks));
+ }
+ };
+ auto keyspaces = db.get_non_system_keyspaces();
+ auto cmp = keyspace_less_comparator(s);
+ boost::sort(keyspaces, cmp); // sorted first, then sliced with the same comparator
+ return boost::copy_range>(
+ range.slice(keyspaces, std::move(cmp)) | boost::adaptors::filtered([&s] (const auto& ks) {
+ // If this is a range query, results are divided between shards by the partition key (keyspace_name).
+ return shard_of(dht::global_partitioner().get_token(s,
+ partition_key::from_single_value(s, utf8_type->decompose(ks))))
+ == engine().cpu_id(); // keep only keyspaces whose token maps to the current shard
+ })
+ );
+}
+
+/**
+ * Makes a wrapping range of ring_position from a nonwrapping range of token, used to select sstables.
+ */
+static dht::partition_range as_ring_position_range(dht::token_range& r) {
+ std::optional::bound> start_bound, end_bound; // left empty when r has no bound on that side
+ if (r.start()) {
+ start_bound = {{ dht::ring_position(r.start()->value(), dht::ring_position::token_bound::start), r.start()->is_inclusive() }}; // inclusiveness is copied from r
+ }
+ if (r.end()) {
+ end_bound = {{ dht::ring_position(r.end()->value(), dht::ring_position::token_bound::end), r.end()->is_inclusive() }};
+ }
+ return dht::partition_range(std::move(start_bound), std::move(end_bound), r.is_singular()); // singularity is carried over from r as well
+}
+
+/**
+ * Builds a range_estimates for the specified range, considering the sstables associated with `cf`.
+ */
+static system_keyspace::range_estimates estimate(const column_family& cf, const token_range& r) {
+ int64_t count{0}; // summed estimated keys across the selected sstables
+ utils::estimated_histogram hist{0}; // merged partition-size histogram of those sstables
+ auto from_bytes = [] (auto& b) { // text-encoded token -> dht token
+ return dht::global_partitioner().from_sstring(utf8_type->to_string(b));
+ };
+ dht::token_range_vector ranges;
+ ::compat::unwrap_into(
+ wrapping_range({{ from_bytes(r.start), false }}, {{ from_bytes(r.end) }}), // the start bound is passed as non-inclusive
+ dht::token_comparator(),
+ [&] (auto&& rng) { ranges.push_back(std::move(rng)); }); // collects every range produced by unwrap_into
+ for (auto&& r : ranges) { // note: this `r` shadows the parameter `r` inside the loop
+ auto rp_range = as_ring_position_range(r);
+ for (auto&& sstable : cf.select_sstables(rp_range)) {
+ count += sstable->estimated_keys_for_range(r);
+ hist.merge(sstable->get_stats_metadata().estimated_partition_size);
+ }
+ }
+ return {cf.schema(), r.start, r.end, count, count > 0 ? hist.mean() : 0}; // `r` is the parameter again here; the mean is reported as 0 when no keys were counted
+}
+
+future> get_local_ranges() { // Resolves to the local node's primary ranges as text-encoded {start, end} pairs, sorted by start.
+ auto& ss = service::get_local_storage_service();
+ return ss.get_local_tokens().then([&ss] (auto&& tokens) {
+ auto ranges = ss.get_token_metadata().get_primary_ranges_for(std::move(tokens));
+ std::vector local_ranges;
+ auto to_bytes = [](const std::optional& b) { // serializes a bound's token as utf8 text; the bound must be present
+ assert(b);
+ return utf8_type->decompose(dht::global_partitioner().to_sstring(b->value()));
+ };
+ // We merge the range starting at the minimum token with the one ending at the maximum token to be compatible with how
+ // Cassandra shows its size estimates table: all entries there are text, with no notion of token ranges from the CQL point of view.
+ auto left_inf = boost::find_if(ranges, [] (auto&& r) { // must have an end bound, since to_bytes() asserts on it below
+ return r.end() && (!r.start() || r.start()->value() == dht::minimum_token());
+ });
+ auto right_inf = boost::find_if(ranges, [] (auto&& r) { // tests the END bound against the maximum token; must have a start bound
+ return r.start() && (!r.end() || r.end()->value() == dht::maximum_token());
+ });
+ if (left_inf != right_inf && left_inf != ranges.end() && right_inf != ranges.end()) {
+ local_ranges.push_back(token_range{to_bytes(right_inf->start()), to_bytes(left_inf->end())});
+ const bool left_is_first = left_inf < right_inf; // erase the later element first: a vector erase invalidates iterators at or after the erased position
+ ranges.erase(left_is_first ? right_inf : left_inf);
+ ranges.erase(left_is_first ? left_inf : right_inf);
+ }
+ for (auto&& r : ranges) { // every remaining range is expected to have both bounds (to_bytes asserts otherwise)
+ local_ranges.push_back(token_range{to_bytes(r.start()), to_bytes(r.end())});
+ }
+ boost::sort(local_ranges, [] (auto&& tr1, auto&& tr2) { // ordered by the text form of the start token
+ return utf8_type->less(tr1.start, tr2.start);
+ });
+ return local_ranges;
+ });
+}
+
+size_estimates_mutation_reader::size_estimates_mutation_reader(schema_ptr schema, const dht::partition_range& prange, const query::partition_slice& slice, streamed_mutation::forwarding fwd)
+ : impl(schema) // takes a copy, because `schema` is moved into _schema on the next line
+ , _schema(std::move(schema))
+ , _prange(&prange) // only the address is stored; the caller's range must stay alive
+ , _slice(slice) // _slice is a reference member, so the caller's slice must stay alive too
+ , _fwd(fwd)
+ { }
+
+future<> size_estimates_mutation_reader::get_next_partition() { // Builds the reader for the next keyspace, or marks end of stream when none is left.
+ auto& db = service::get_local_storage_proxy().get_db().local();
+ if (!_keyspaces) { // first call, or first call after a partition-range fast-forward reset _keyspaces
+ _keyspaces = get_keyspaces(*_schema, db, *_prange);
+ _current_partition = _keyspaces->begin();
+ }
+ if (_current_partition == _keyspaces->end()) {
+ _end_of_stream = true;
+ return make_ready_future<>();
+ }
+ return get_local_ranges().then([&db, this] (auto&& ranges) {
+ auto estimates = this->estimates_for_current_keyspace(db, std::move(ranges));
+ auto mutations = db::system_keyspace::make_size_estimates_mutation(*_current_partition, std::move(estimates));
+ ++_current_partition; // advance only after the current keyspace's mutation has been built
+ std::vector ms;
+ ms.emplace_back(std::move(mutations));
+ _partition_reader = flat_mutation_reader_from_mutations(std::move(ms), _fwd); // one keyspace (one partition) per inner reader
+ });
+}
+
+future<> size_estimates_mutation_reader::fill_buffer(db::timeout_clock::time_point timeout) { // Loops until the buffer is full or the stream has ended.
+ return do_until([this, timeout] { return is_end_of_stream() || is_buffer_full(); }, [this, timeout] {
+ if (!_partition_reader) { // no active inner reader: prepare the next keyspace (or flag end of stream)
+ return get_next_partition();
+ }
+ return _partition_reader->consume_pausable([this] (mutation_fragment mf) {
+ push_mutation_fragment(std::move(mf));
+ return stop_iteration(is_buffer_full()); // pause consumption as soon as our own buffer is full
+ }, timeout).then([this] {
+ if (_partition_reader->is_end_of_stream() && _partition_reader->is_buffer_empty()) { // fully drained: drop it so the next iteration moves on
+ _partition_reader = std::nullopt;
+ }
+ });
+ });
+}
+
+void size_estimates_mutation_reader::next_partition() { // Skips the rest of the current partition.
+ clear_buffer_to_next_partition();
+ if (is_buffer_empty()) { // nothing buffered after the clear: the inner reader is dropped as well
+ _partition_reader = std::nullopt;
+ }
+}
+
+future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { // Restarts the reader over the partition range `pr`.
+ clear_buffer();
+ _prange = &pr; // only the address is kept; it is dereferenced later by get_next_partition()
+ _keyspaces = std::nullopt; // makes get_next_partition() recompute the keyspace list for the new range
+ _partition_reader = std::nullopt;
+ _end_of_stream = false;
+ return make_ready_future<>();
+}
+
+future<> size_estimates_mutation_reader::fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) { // Forwards within the current partition.
+ forward_buffer_to(pr.start());
+ _end_of_stream = false;
+ if (_partition_reader) { // delegate to the inner reader when one is active
+ return _partition_reader->fast_forward_to(std::move(pr), timeout);
+ }
+ return make_ready_future<>();
+}
+
+size_t size_estimates_mutation_reader::buffer_size() const { // Own buffer size plus the inner reader's buffer size, when there is one.
+ if (_partition_reader) {
+ return flat_mutation_reader::impl::buffer_size() + _partition_reader->buffer_size();
+ }
+ return flat_mutation_reader::impl::buffer_size();
+}
+
+std::vector
+size_estimates_mutation_reader::estimates_for_current_keyspace(const database& db, std::vector local_ranges) const {
+ // For each specified range, estimate (crudely) mean partition size and partitions count.
+ auto pkey = partition_key::from_single_value(*_schema, utf8_type->decompose(*_current_partition)); // the keyspace name is the partition key
+ auto cfs = db.find_keyspace(*_current_partition).metadata()->cf_meta_data();
+ auto cf_names = boost::copy_range>(cfs | boost::adaptors::transformed([] (auto&& cf) {
+ return utf8_type->decompose(cf.first); // cf.first is used as the table name
+ }));
+ boost::sort(cf_names, [] (auto&& n1, auto&& n2) { // table names are sorted with utf8_type->less before the rows are enumerated
+ return utf8_type->less(n1, n2);
+ });
+ std::vector estimates;
+ for (auto& range : _slice.row_ranges(*_schema, pkey)) {
+ auto rows = boost::make_iterator_range(
+ virtual_row_iterator(cf_names, local_ranges),
+ virtual_row_iterator(cf_names, local_ranges, virtual_row_iterator::end_iterator_tag()));
+ auto rows_to_estimate = range.slice(rows, virtual_row_comparator(_schema)); // keep only the rows selected by this clustering range
+ for (auto&& r : rows_to_estimate) {
+ auto& cf = db.find_column_family(*_current_partition, utf8_type->to_string(r.cf_name));
+ estimates.push_back(estimate(cf, r.tokens));
+ if (estimates.size() >= _slice.partition_row_limit()) { // stop early once the slice's per-partition row limit is reached
+ return estimates;
+ }
+ }
+ }
+ return estimates;
+}
+
+} // namespace size_estimates
+
+} // namespace db
diff --git a/db/size_estimates_virtual_reader.hh b/db/size_estimates_virtual_reader.hh
index 5a5ec47c8c..4bf738dba9 100644
--- a/db/size_estimates_virtual_reader.hh
+++ b/db/size_estimates_virtual_reader.hh
@@ -21,33 +21,18 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
-#include
-#include
-#include
-#include
-
-#include "clustering_bounds_comparator.hh"
-#include "database_fwd.hh"
-#include "db/system_keyspace.hh"
-#include "dht/i_partitioner.hh"
#include "mutation_reader.hh"
-#include "partition_range_compat.hh"
-#include "range.hh"
-#include "service/storage_service.hh"
-#include "mutation_fragment.hh"
-#include "sstables/sstables.hh"
-#include "db/timeout_clock.hh"
-#include "database.hh"
namespace db {
namespace size_estimates {
+struct token_range { // A token range whose bounds are kept as serialized bytes rather than dht tokens.
+ bytes start; // get_local_ranges() fills this with the utf8-serialized text form of the start token
+ bytes end; // likewise for the end token
+};
+
class size_estimates_mutation_reader final : public flat_mutation_reader::impl {
- struct token_range {
- bytes start;
- bytes end;
- };
schema_ptr _schema;
const dht::partition_range* _prange;
const query::partition_slice& _slice;
@@ -57,267 +42,18 @@ class size_estimates_mutation_reader final : public flat_mutation_reader::impl {
streamed_mutation::forwarding _fwd;
flat_mutation_reader_opt _partition_reader;
public:
- size_estimates_mutation_reader(schema_ptr schema, const dht::partition_range& prange, const query::partition_slice& slice, streamed_mutation::forwarding fwd)
- : impl(schema)
- , _schema(std::move(schema))
- , _prange(&prange)
- , _slice(slice)
- , _fwd(fwd)
- { }
+ size_estimates_mutation_reader(schema_ptr, const dht::partition_range&, const query::partition_slice&, streamed_mutation::forwarding);
+ virtual future<> fill_buffer(db::timeout_clock::time_point) override;
+ virtual void next_partition() override;
+ virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override;
+ virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override;
+ virtual size_t buffer_size() const override;
private:
- future<> get_next_partition() {
- // For each specified range, estimate (crudely) mean partition size and partitions count.
- auto& db = service::get_local_storage_proxy().get_db().local();
- if (!_keyspaces) {
- _keyspaces = get_keyspaces(*_schema, db, *_prange);
- _current_partition = _keyspaces->begin();
- }
- if (_current_partition == _keyspaces->end()) {
- _end_of_stream = true;
- return make_ready_future<>();
- }
- return get_local_ranges().then([&db, this] (auto&& ranges) {
- auto estimates = this->estimates_for_current_keyspace(db, std::move(ranges));
- auto mutations = db::system_keyspace::make_size_estimates_mutation(*_current_partition, std::move(estimates));
- ++_current_partition;
- std::vector ms;
- ms.emplace_back(std::move(mutations));
- _partition_reader = flat_mutation_reader_from_mutations(std::move(ms), _fwd);
- });
- }
-public:
- virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
- return do_until([this, timeout] { return is_end_of_stream() || is_buffer_full(); }, [this, timeout] {
- if (!_partition_reader) {
- return get_next_partition();
- }
- return _partition_reader->consume_pausable([this] (mutation_fragment mf) {
- push_mutation_fragment(std::move(mf));
- return stop_iteration(is_buffer_full());
- }, timeout).then([this] {
- if (_partition_reader->is_end_of_stream() && _partition_reader->is_buffer_empty()) {
- _partition_reader = std::nullopt;
- }
- });
- });
- }
- virtual void next_partition() override {
- clear_buffer_to_next_partition();
- if (is_buffer_empty()) {
- _partition_reader = std::nullopt;
- }
- }
- virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
- clear_buffer();
- _prange = ≺
- _keyspaces = std::nullopt;
- _partition_reader = std::nullopt;
- _end_of_stream = false;
- return make_ready_future<>();
- }
- virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
- forward_buffer_to(pr.start());
- _end_of_stream = false;
- if (_partition_reader) {
- return _partition_reader->fast_forward_to(std::move(pr), timeout);
- }
- return make_ready_future<>();
- }
- virtual size_t buffer_size() const override {
- if (_partition_reader) {
- return flat_mutation_reader::impl::buffer_size() + _partition_reader->buffer_size();
- }
- return flat_mutation_reader::impl::buffer_size();
- }
- /**
- * Returns the primary ranges for the local node.
- * Used for testing as well.
- */
- static future> get_local_ranges() {
- auto& ss = service::get_local_storage_service();
- return ss.get_local_tokens().then([&ss] (auto&& tokens) {
- auto ranges = ss.get_token_metadata().get_primary_ranges_for(std::move(tokens));
- std::vector local_ranges;
- auto to_bytes = [](const std::optional& b) {
- assert(b);
- return utf8_type->decompose(dht::global_partitioner().to_sstring(b->value()));
- };
- // We merge the ranges to be compatible with how Cassandra shows it's size estimates table.
- // All queries will be on that table, where all entries are text and there's no notion of
- // token ranges form the CQL point of view.
- auto left_inf = boost::find_if(ranges, [] (auto&& r) {
- return !r.start() || r.start()->value() == dht::minimum_token();
- });
- auto right_inf = boost::find_if(ranges, [] (auto&& r) {
- return !r.end() || r.start()->value() == dht::maximum_token();
- });
- if (left_inf != right_inf && left_inf != ranges.end() && right_inf != ranges.end()) {
- local_ranges.push_back(token_range{to_bytes(right_inf->start()), to_bytes(left_inf->end())});
- ranges.erase(left_inf);
- ranges.erase(right_inf);
- }
- for (auto&& r : ranges) {
- local_ranges.push_back(token_range{to_bytes(r.start()), to_bytes(r.end())});
- }
- boost::sort(local_ranges, [] (auto&& tr1, auto&& tr2) {
- return utf8_type->less(tr1.start, tr2.start);
- });
- return local_ranges;
- });
- }
-private:
- struct virtual_row {
- const bytes& cf_name;
- const token_range& tokens;
- clustering_key_prefix as_key() const {
- return clustering_key_prefix::from_exploded(std::vector{cf_name, tokens.start, tokens.end});
- }
- };
- struct virtual_row_comparator {
- schema_ptr _schema;
- virtual_row_comparator(schema_ptr schema) : _schema(schema) { }
- bool operator()(const clustering_key_prefix& key1, const clustering_key_prefix& key2) {
- return clustering_key_prefix::prefix_equality_less_compare(*_schema)(key1, key2);
- }
- bool operator()(const virtual_row& row, const clustering_key_prefix& key) {
- return operator()(row.as_key(), key);
- }
- bool operator()(const clustering_key_prefix& key, const virtual_row& row) {
- return operator()(key, row.as_key());
- }
- };
- class virtual_row_iterator : public std::iterator {
- std::reference_wrapper> _cf_names;
- std::reference_wrapper> _ranges;
- size_t _cf_names_idx = 0;
- size_t _ranges_idx = 0;
- public:
- struct end_iterator_tag {};
- virtual_row_iterator(const std::vector& cf_names, const std::vector& ranges)
- : _cf_names(std::ref(cf_names))
- , _ranges(std::ref(ranges))
- { }
- virtual_row_iterator(const std::vector& cf_names, const std::vector& ranges, end_iterator_tag)
- : _cf_names(std::ref(cf_names))
- , _ranges(std::ref(ranges))
- , _cf_names_idx(cf_names.size())
- , _ranges_idx(ranges.size())
- { }
- virtual_row_iterator& operator++() {
- if (++_ranges_idx == _ranges.get().size() && ++_cf_names_idx < _cf_names.get().size()) {
- _ranges_idx = 0;
- }
- return *this;
- }
- virtual_row_iterator operator++(int) {
- virtual_row_iterator i(*this);
- ++(*this);
- return i;
- }
- const value_type operator*() const {
- return { _cf_names.get()[_cf_names_idx], _ranges.get()[_ranges_idx] };
- }
- bool operator==(const virtual_row_iterator& i) const {
- return _cf_names_idx == i._cf_names_idx
- && _ranges_idx == i._ranges_idx;
- }
- bool operator!=(const virtual_row_iterator& i) const {
- return !(*this == i);
- }
- };
+ future<> get_next_partition();
std::vector
- estimates_for_current_keyspace(const database& db, std::vector local_ranges) const {
- auto pkey = partition_key::from_single_value(*_schema, utf8_type->decompose(*_current_partition));
- auto cfs = db.find_keyspace(*_current_partition).metadata()->cf_meta_data();
- auto cf_names = boost::copy_range>(cfs | boost::adaptors::transformed([] (auto&& cf) {
- return utf8_type->decompose(cf.first);
- }));
- boost::sort(cf_names, [] (auto&& n1, auto&& n2) {
- return utf8_type->less(n1, n2);
- });
- std::vector estimates;
- for (auto& range : _slice.row_ranges(*_schema, pkey)) {
- auto rows = boost::make_iterator_range(
- virtual_row_iterator(cf_names, local_ranges),
- virtual_row_iterator(cf_names, local_ranges, virtual_row_iterator::end_iterator_tag()));
- auto rows_to_estimate = range.slice(rows, virtual_row_comparator(_schema));
- for (auto&& r : rows_to_estimate) {
- auto& cf = db.find_column_family(*_current_partition, utf8_type->to_string(r.cf_name));
- estimates.push_back(estimate(cf, r.tokens));
- if (estimates.size() >= _slice.partition_row_limit()) {
- return estimates;
- }
- }
- }
- return estimates;
- }
-
- /**
- * Returns the keyspaces, ordered by name, as selected by the partition_range.
- */
- static ks_range get_keyspaces(const schema& s, const database& db, dht::partition_range range) {
- struct keyspace_less_comparator {
- const schema& _s;
- keyspace_less_comparator(const schema& s) : _s(s) { }
- dht::ring_position as_ring_position(const sstring& ks) {
- auto pkey = partition_key::from_single_value(_s, utf8_type->decompose(ks));
- return dht::global_partitioner().decorate_key(_s, std::move(pkey));
- }
- bool operator()(const sstring& ks1, const sstring& ks2) {
- return as_ring_position(ks1).less_compare(_s, as_ring_position(ks2));
- }
- bool operator()(const sstring& ks, const dht::ring_position& rp) {
- return as_ring_position(ks).less_compare(_s, rp);
- }
- bool operator()(const dht::ring_position& rp, const sstring& ks) {
- return rp.less_compare(_s, as_ring_position(ks));
- }
- };
- auto keyspaces = db.get_non_system_keyspaces();
- auto cmp = keyspace_less_comparator(s);
- boost::sort(keyspaces, cmp);
- return boost::copy_range(range.slice(keyspaces, std::move(cmp)));
- }
-
- /**
- * Makes a wrapping range of ring_position from a nonwrapping range of token, used to select sstables.
- */
- static dht::partition_range as_ring_position_range(dht::token_range& r) {
- std::optional::bound> start_bound, end_bound;
- if (r.start()) {
- start_bound = {{ dht::ring_position(r.start()->value(), dht::ring_position::token_bound::start), r.start()->is_inclusive() }};
- }
- if (r.end()) {
- end_bound = {{ dht::ring_position(r.end()->value(), dht::ring_position::token_bound::end), r.end()->is_inclusive() }};
- }
- return dht::partition_range(std::move(start_bound), std::move(end_bound), r.is_singular());
- }
-
- /**
- * Add a new range_estimates for the specified range, considering the sstables associated with `cf`.
- */
- static system_keyspace::range_estimates estimate(const column_family& cf, const token_range& r) {
- int64_t count{0};
- utils::estimated_histogram hist{0};
- auto from_bytes = [] (auto& b) {
- return dht::global_partitioner().from_sstring(utf8_type->to_string(b));
- };
- dht::token_range_vector ranges;
- ::compat::unwrap_into(
- wrapping_range({{ from_bytes(r.start), false }}, {{ from_bytes(r.end) }}),
- dht::token_comparator(),
- [&] (auto&& rng) { ranges.push_back(std::move(rng)); });
- for (auto&& r : ranges) {
- auto rp_range = as_ring_position_range(r);
- for (auto&& sstable : cf.select_sstables(rp_range)) {
- count += sstable->estimated_keys_for_range(r);
- hist.merge(sstable->get_stats_metadata().estimated_partition_size);
- }
- }
- return {cf.schema(), r.start, r.end, count, count > 0 ? hist.mean() : 0};
- }
+ estimates_for_current_keyspace(const database&, std::vector local_ranges) const;
};
struct virtual_reader {
@@ -332,6 +68,12 @@ struct virtual_reader {
}
};
+/**
+ * Returns the primary ranges for the local node.
+ * Used for testing as well.
+ */
+future> get_local_ranges();
+
} // namespace size_estimates
} // namespace db
diff --git a/tests/virtual_reader_test.cc b/tests/virtual_reader_test.cc
index 50e572b83e..ae108f9635 100644
--- a/tests/virtual_reader_test.cc
+++ b/tests/virtual_reader_test.cc
@@ -45,180 +45,143 @@
using namespace std::literals::chrono_literals;
SEASTAR_TEST_CASE(test_query_size_estimates_virtual_table) {
- return do_with_cql_env([] (auto& e) {
- auto ranges = db::size_estimates::size_estimates_mutation_reader::get_local_ranges().get0();
+ return do_with_cql_env_thread([] (cql_test_env& e) { // the body waits on each future with .get()/.get0() instead of chaining .then()
+ auto ranges = db::size_estimates::get_local_ranges().get0();
auto start_token1 = utf8_type->to_string(ranges[3].start);
auto start_token2 = utf8_type->to_string(ranges[5].start);
auto end_token1 = utf8_type->to_string(ranges[3].end);
auto end_token2 = utf8_type->to_string(ranges[55].end);
- auto &qp = e.local_qp();
- return e.execute_cql("create table cf1(pk text PRIMARY KEY, v int);").discard_result().then([&e] {
- return e.execute_cql("create table cf2(pk text PRIMARY KEY, v int);").discard_result();
- }).then([&qp] {
- return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks';").then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 512);
- });
- }).then([&qp] {
- return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' limit 100;").then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 100);
- });
- }).then([&qp] {
- return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1';").then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 256);
- });
- }).then([&qp] {
- return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name > 'cf1';").then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 256);
- });
- }).then([&qp] {
- return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name >= 'cf1';").then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 512);
- });
- }).then([&qp] {
- return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name < 'cf2';").then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 256);
- });
- }).then([&qp] {
- return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name <= 'cf2';").then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 512);
- });
- }).then([&qp] {
- return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name in ('cf1', 'cf2');").then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 512);
- });
- }).then([&qp] {
- return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name >= 'cf1' and table_name <= 'cf1';").then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 256);
- });
- }).then([&qp] {
- return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name >= 'cf1' and table_name <= 'cf2';").then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 512);
- });
- }).then([&qp] {
- return qp.execute_internal("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name > 'cf1' and table_name < 'cf2';").then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 0);
- });
- }).then([&qp, start_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start = '%s';", start_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 1);
- });
- }).then([&qp, start_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start >= '%s';", start_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 253);
- });
- }).then([&qp, start_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start > '%s';", start_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 252);
- });
- }).then([&qp, start_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start <= '%s';", start_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 4);
- });
- }).then([&qp, start_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start < '%s';", start_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 3);
- });
- }).then([&qp, start_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start >= '%s' and range_start <= '%s';", start_token1, start_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 1);
- });
- }).then([&qp, start_token1, start_token2] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start >= '%s' and range_start <= '%s';", start_token1, start_token2)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 3);
- });
- }).then([&qp, start_token1, start_token2] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start > '%s' and range_start < '%s';", start_token1, start_token2)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 1);
- });
- }).then([&qp, start_token1, start_token2] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start in ('%s', '%s');", start_token1, start_token2)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 2);
- });
- }).then([&qp, start_token1, start_token2] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start > '%s' and range_start <= '%s';", start_token1, start_token2)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 2);
- });
- }).then([&qp, start_token1, start_token2] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start >= '%s' and range_start < '%s';", start_token1, start_token2)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 2);
- });
- }).then([&qp, start_token1, end_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start = '%s' and range_end = '%s';", start_token1, end_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 1);
- });
- }).then([&qp, start_token1, end_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start = '%s' and range_end >= '%s';", start_token1, end_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 1);
- });
- }).then([&qp, start_token1, end_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start = '%s' and range_end > '%s';", start_token1, end_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 0);
- });
- }).then([&qp, start_token1, end_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start = '%s' and range_end <= '%s';", start_token1, end_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 1);
- });
- }).then([&qp, start_token1, end_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start = '%s' and range_end < '%s';", start_token1, end_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 0);
- });
- }).then([&qp, start_token1, end_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start = '%s' and range_end >= '%s' and range_end <= '%s';", start_token1, end_token1, end_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 1);
- });
- }).then([&qp, start_token1, end_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and table_name = 'cf1' and range_start = '%s' and range_end > '%s' and range_end < '%s';", start_token1, end_token1, end_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 0);
- });
- }).then([&qp, start_token1, end_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and (table_name, range_start, range_end) = ('cf1', '%s', '%s');", start_token1, end_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 1);
- });
- }).then([&qp, start_token1, end_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and (table_name, range_start, range_end) >= ('cf1', '%s', '%s') and (table_name) <= ('cf2');", start_token1, end_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 509);
- });
- }).then([&qp, start_token1, start_token2, end_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and (table_name, range_start, range_end) >= ('cf1', '%s', '%s') "
- "and (table_name, range_start) <= ('cf2', '%s');", start_token1, end_token1, start_token2)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 259);
- });
- }).then([&qp, start_token1] {
- return qp.execute_internal(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
- "and (table_name, range_start) < ('cf2', '%s');", start_token1)).then([](auto rs) {
- BOOST_REQUIRE_EQUAL(rs->size(), 259);
- });
- }).discard_result();
+
+ // Unrestricted scan (no keyspace filter): the result is discarded, the query only has to complete without timing out.
+ e.execute_cql("select * from system.size_estimates;").discard_result().get();
+
+ auto rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks';").get0();
+ assert_that(rs).is_rows().with_size(0); // cf1/cf2 are only created below, so no rows are expected for 'ks' yet
+
+ e.execute_cql("create table cf1(pk text PRIMARY KEY, v int);").discard_result().get();
+ e.execute_cql("create table cf2(pk text PRIMARY KEY, v int);").discard_result().get();
+
+ rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks';").get0();
+ assert_that(rs).is_rows().with_size(512); // 2 tables x 256 rows per table (see the table_name = 'cf1' check below)
+
+ rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' limit 100;").get0();
+ assert_that(rs).is_rows().with_size(100);
+
+ rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name = 'cf1';").get0();
+ assert_that(rs).is_rows().with_size(256);
+
+ rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name > 'cf1';").get0();
+ assert_that(rs).is_rows().with_size(256);
+
+ rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name >= 'cf1';").get0();
+ assert_that(rs).is_rows().with_size(512);
+
+ rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name < 'cf2';").get0();
+ assert_that(rs).is_rows().with_size(256);
+
+ rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name <= 'cf2';").get0();
+ assert_that(rs).is_rows().with_size(512);
+
+ rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name in ('cf1', 'cf2');").get0();
+ assert_that(rs).is_rows().with_size(512);
+
+ rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name >= 'cf1' and table_name <= 'cf1';").get0();
+ assert_that(rs).is_rows().with_size(256);
+
+ rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name >= 'cf1' and table_name <= 'cf2';").get0();
+ assert_that(rs).is_rows().with_size(512);
+
+ rs = e.execute_cql("select * from system.size_estimates where keyspace_name = 'ks' and table_name > 'cf1' and table_name < 'cf2';").get0();
+ assert_that(rs).is_rows().with_size(0);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start = '%s';", start_token1)).get0();
+ assert_that(rs).is_rows().with_size(1);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start >= '%s';", start_token1)).get0();
+ assert_that(rs).is_rows().with_size(253); // 256 - 3: the three rows that sort before start_token1 (ranges[3]) are excluded
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start > '%s';", start_token1)).get0();
+ assert_that(rs).is_rows().with_size(252);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start <= '%s';", start_token1)).get0();
+ assert_that(rs).is_rows().with_size(4); // the 3 rows before start_token1 plus the matching row itself
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start < '%s';", start_token1)).get0();
+ assert_that(rs).is_rows().with_size(3);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start >= '%s' and range_start <= '%s';", start_token1, start_token1)).get0();
+ assert_that(rs).is_rows().with_size(1);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start >= '%s' and range_start <= '%s';", start_token1, start_token2)).get0();
+ assert_that(rs).is_rows().with_size(3); // both bounds inclusive: the rows for ranges[3], ranges[4] and ranges[5]
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start > '%s' and range_start < '%s';", start_token1, start_token2)).get0();
+ assert_that(rs).is_rows().with_size(1);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start in ('%s', '%s');", start_token1, start_token2)).get0();
+ assert_that(rs).is_rows().with_size(2);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start > '%s' and range_start <= '%s';", start_token1, start_token2)).get0();
+ assert_that(rs).is_rows().with_size(2);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start >= '%s' and range_start < '%s';", start_token1, start_token2)).get0();
+ assert_that(rs).is_rows().with_size(2);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start = '%s' and range_end = '%s';", start_token1, end_token1)).get0();
+ assert_that(rs).is_rows().with_size(1);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start = '%s' and range_end >= '%s';", start_token1, end_token1)).get0();
+ assert_that(rs).is_rows().with_size(1);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start = '%s' and range_end > '%s';", start_token1, end_token1)).get0();
+ assert_that(rs).is_rows().with_size(0);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start = '%s' and range_end <= '%s';", start_token1, end_token1)).get0();
+ assert_that(rs).is_rows().with_size(1);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start = '%s' and range_end < '%s';", start_token1, end_token1)).get0();
+ assert_that(rs).is_rows().with_size(0);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start = '%s' and range_end >= '%s' and range_end <= '%s';", start_token1, end_token1, end_token1)).get0();
+ assert_that(rs).is_rows().with_size(1);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and table_name = 'cf1' and range_start = '%s' and range_end > '%s' and range_end < '%s';", start_token1, end_token1, end_token1)).get0();
+ assert_that(rs).is_rows().with_size(0);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and (table_name, range_start, range_end) = ('cf1', '%s', '%s');", start_token1, end_token1)).get0();
+ assert_that(rs).is_rows().with_size(1);
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and (table_name, range_start, range_end) >= ('cf1', '%s', '%s') and (table_name) <= ('cf2');", start_token1, end_token1)).get0();
+ assert_that(rs).is_rows().with_size(509); // 253 cf1 rows from start_token1 onwards + all 256 cf2 rows
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and (table_name, range_start, range_end) >= ('cf1', '%s', '%s') "
+ "and (table_name, range_start) <= ('cf2', '%s');", start_token1, end_token1, start_token2)).get0();
+ assert_that(rs).is_rows().with_size(259); // 253 cf1 rows + the 6 cf2 rows with range_start <= start_token2 (ranges[5])
+
+ rs = e.execute_cql(sprint("select * from system.size_estimates where keyspace_name = 'ks' "
+ "and (table_name, range_start) < ('cf2', '%s');", start_token1)).get0();
+ assert_that(rs).is_rows().with_size(259); // all 256 cf1 rows + the 3 cf2 rows that sort before start_token1
});
}