From b02b441c2e971f6f1ff5f998ca0409170d88682b Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 8 Oct 2020 14:53:00 +0200 Subject: [PATCH 01/10] sstables: move sstable_set implementations to a separate module All the implementations were kept in sstables/compaction_strategy.cc which is quite large even without them. `sstable_set` already had its own header file, now it gets its own implementation file. The declarations of implementation classes and interfaces (`sstable_set_impl`, `bag_sstable_set`, and so on) were also exposed in a header file, sstable_set_impl.hh, for the purposes of potential unit testing. --- configure.py | 1 + sstables/compaction_strategy.cc | 390 +------------------------------- sstables/sstable_set.cc | 389 +++++++++++++++++++++++++++++++ sstables/sstable_set_impl.hh | 98 ++++++++ 4 files changed, 490 insertions(+), 388 deletions(-) create mode 100644 sstables/sstable_set.cc create mode 100644 sstables/sstable_set_impl.hh diff --git a/configure.py b/configure.py index ba35c14420..9a8af7d252 100755 --- a/configure.py +++ b/configure.py @@ -575,6 +575,7 @@ scylla_core = (['database.cc', 'sstables/mp_row_consumer.cc', 'sstables/sstables.cc', 'sstables/sstables_manager.cc', + 'sstables/sstable_set.cc', 'sstables/mx/writer.cc', 'sstables/kl/writer.cc', 'sstables/sstable_version.cc', diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 599af7e5ae..63a51b7395 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -47,10 +47,8 @@ #include "compaction_strategy_impl.hh" #include "schema.hh" #include "sstable_set.hh" -#include "compatible_ring_position.hh" #include #include -#include #include #include "size_tiered_compaction_strategy.hh" #include "date_tiered_compaction_strategy.hh" @@ -64,384 +62,6 @@ logging::logger leveled_manifest::logger("LeveledManifest"); namespace sstables { -extern logging::logger clogger; - -void sstable_run::insert(shared_sstable sst) { - _all.insert(std::move(sst)); -} - -void sstable_run::erase(shared_sstable sst) { - _all.erase(sst); -} - -uint64_t sstable_run::data_size() const { - return boost::accumulate(_all | boost::adaptors::transformed(std::mem_fn(&sstable::data_size)), uint64_t(0)); -} - -std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) { - os << "Run = {\n"; - if (run.all().empty()) { - os << " Identifier: not found\n"; - } else { - os << format(" Identifier: {}\n", (*run.all().begin())->run_identifier()); - } - - auto frags = boost::copy_range>(run.all()); - boost::sort(frags, [] (const shared_sstable& x, const shared_sstable& y) { - return x->get_first_decorated_key().token() < y->get_first_decorated_key().token(); - }); - os << " Fragments = {\n"; - for (auto& frag : frags) { - os << format(" {}={}:{}\n", frag->generation(), frag->get_first_decorated_key().token(), frag->get_last_decorated_key().token()); - } - os << " }\n}\n"; - return os; -} - -class incremental_selector_impl { -public: - virtual ~incremental_selector_impl() {} - virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view&) = 0; -}; - -class sstable_set_impl { -public: - virtual ~sstable_set_impl() {} - virtual std::unique_ptr clone() const = 0; - virtual std::vector select(const dht::partition_range& range) const = 0; - virtual void insert(shared_sstable sst) = 0; - virtual void erase(shared_sstable sst) = 0; - virtual std::unique_ptr make_incremental_selector() const = 0; -}; - -sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s, lw_shared_ptr all) - : _impl(std::move(impl)) - , _schema(std::move(s)) - , _all(std::move(all)) { -} - -sstable_set::sstable_set(const sstable_set& x) - : _impl(x._impl->clone()) - , _schema(x._schema) - , _all(make_lw_shared(*x._all)) - , _all_runs(x._all_runs) { -} - -sstable_set::sstable_set(sstable_set&&) noexcept = default; - -sstable_set& -sstable_set::operator=(const sstable_set& x) { - if (this != &x) { - auto tmp = sstable_set(x); - *this = std::move(tmp); - } - return *this; -} - -sstable_set& -sstable_set::operator=(sstable_set&&) noexcept = default; - -std::vector -sstable_set::select(const dht::partition_range& range) const { - return _impl->select(range); -} - -std::vector -sstable_set::select_sstable_runs(const std::vector& sstables) const { - auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); - return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { - return _all_runs.at(run_id); - })); -} - -void -sstable_set::insert(shared_sstable sst) { - _impl->insert(sst); - try { - _all->insert(sst); - try { - _all_runs[sst->run_identifier()].insert(sst); - } catch (...) { - _all->erase(sst); - throw; - } - } catch (...) { - _impl->erase(sst); - throw; - } -} - -void -sstable_set::erase(shared_sstable sst) { - _impl->erase(sst); - _all->erase(sst); - _all_runs[sst->run_identifier()].erase(sst); -} - -sstable_set::~sstable_set() = default; - -sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl, const schema& s) - : _impl(std::move(impl)) - , _cmp(s) { -} - -sstable_set::incremental_selector::~incremental_selector() = default; - -sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default; - -sstable_set::incremental_selector::selection -sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const { - if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) { - std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos); - _current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); }); - } - return {_current_sstables, _current_next_position}; -} - -sstable_set::incremental_selector -sstable_set::make_incremental_selector() const { - return incremental_selector(_impl->make_incremental_selector(), *_schema); -} - -// default sstable_set, not specialized for anything -class bag_sstable_set : public sstable_set_impl { - // erasing is slow, but select() is fast - std::vector _sstables; -public: - virtual std::unique_ptr clone() const override { - return std::make_unique(*this); - } - virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override { - return _sstables; - } - virtual void insert(shared_sstable sst) override { - _sstables.push_back(std::move(sst)); - } - virtual void erase(shared_sstable sst) override { - auto it = boost::range::find(_sstables, sst); - if (it != _sstables.end()){ - _sstables.erase(it); - } - } - virtual std::unique_ptr make_incremental_selector() const override; - class incremental_selector; -}; - -class bag_sstable_set::incremental_selector : public incremental_selector_impl { - const std::vector& _sstables; -public: - incremental_selector(const std::vector& sstables) - : _sstables(sstables) { - } - virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view&) override { - return std::make_tuple(dht::partition_range::make_open_ended_both_sides(), _sstables, dht::ring_position_view::max()); - } -}; - -std::unique_ptr bag_sstable_set::make_incremental_selector() const { - return std::make_unique(_sstables); -} - -// specialized when sstables are partitioned in the token range space -// e.g. leveled compaction strategy -class partitioned_sstable_set : public sstable_set_impl { - using value_set = std::unordered_set; - using interval_map_type = boost::icl::interval_map; - using interval_type = interval_map_type::interval_type; - using map_iterator = interval_map_type::const_iterator; -private: - schema_ptr _schema; - std::vector _unleveled_sstables; - interval_map_type _leveled_sstables; - // Change counter on interval map for leveled sstables which is used by - // incremental selector to determine whether or not to invalidate iterators. - uint64_t _leveled_sstables_change_cnt = 0; - bool _use_level_metadata = false; -private: - static interval_type make_interval(const schema& s, const dht::partition_range& range) { - return interval_type::closed( - compatible_ring_position_or_view(s, dht::ring_position_view(range.start()->value())), - compatible_ring_position_or_view(s, dht::ring_position_view(range.end()->value()))); - } - interval_type make_interval(const dht::partition_range& range) const { - return make_interval(*_schema, range); - } - static interval_type make_interval(const schema_ptr& s, const sstable& sst) { - return interval_type::closed( - compatible_ring_position_or_view(s, dht::ring_position(sst.get_first_decorated_key())), - compatible_ring_position_or_view(s, dht::ring_position(sst.get_last_decorated_key()))); - } - interval_type make_interval(const sstable& sst) { - return make_interval(_schema, sst); - } - interval_type singular(const dht::ring_position& rp) const { - // We should use the view here, since this is used for queries. - auto rpv = dht::ring_position_view(rp); - auto crp = compatible_ring_position_or_view(*_schema, std::move(rpv)); - return interval_type::closed(crp, crp); - } - std::pair query(const dht::partition_range& range) const { - if (range.start() && range.end()) { - return _leveled_sstables.equal_range(make_interval(range)); - } - else if (range.start() && !range.end()) { - auto start = singular(range.start()->value()); - return { _leveled_sstables.lower_bound(start), _leveled_sstables.end() }; - } else if (!range.start() && range.end()) { - auto end = singular(range.end()->value()); - return { _leveled_sstables.begin(), _leveled_sstables.upper_bound(end) }; - } else { - return { _leveled_sstables.begin(), _leveled_sstables.end() }; - } - } - // SSTables are stored separately to avoid interval map's fragmentation issue when level 0 falls behind. - bool store_as_unleveled(const shared_sstable& sst) const { - return _use_level_metadata && sst->get_sstable_level() == 0; - } -public: - static dht::ring_position to_ring_position(const compatible_ring_position_or_view& crp) { - // Ring position views, representing bounds of sstable intervals are - // guaranteed to have key() != nullptr; - const auto& pos = crp.position(); - return dht::ring_position(pos.token(), *pos.key()); - } - static dht::partition_range to_partition_range(const interval_type& i) { - return dht::partition_range::make( - {to_ring_position(i.lower()), boost::icl::is_left_closed(i.bounds())}, - {to_ring_position(i.upper()), boost::icl::is_right_closed(i.bounds())}); - } - static dht::partition_range to_partition_range(const dht::ring_position_view& pos, const interval_type& i) { - auto lower_bound = [&] { - if (pos.key()) { - return dht::partition_range::bound(dht::ring_position(pos.token(), *pos.key()), - pos.is_after_key() == dht::ring_position_view::after_key::no); - } else { - return dht::partition_range::bound(dht::ring_position(pos.token(), pos.get_token_bound()), true); - } - }(); - auto upper_bound = dht::partition_range::bound(to_ring_position(i.lower()), !boost::icl::is_left_closed(i.bounds())); - return dht::partition_range::make(std::move(lower_bound), std::move(upper_bound)); - } - explicit partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true) - : _schema(std::move(schema)) - , _use_level_metadata(use_level_metadata) { - } - virtual std::unique_ptr clone() const override { - return std::make_unique(*this); - } - virtual std::vector select(const dht::partition_range& range) const override { - auto ipair = query(range); - auto b = std::move(ipair.first); - auto e = std::move(ipair.second); - value_set result; - while (b != e) { - boost::copy(b++->second, std::inserter(result, result.end())); - } - auto r = _unleveled_sstables; - r.insert(r.end(), result.begin(), result.end()); - return r; - } - virtual void insert(shared_sstable sst) override { - if (store_as_unleveled(sst)) { - _unleveled_sstables.push_back(std::move(sst)); - } else { - _leveled_sstables_change_cnt++; - _leveled_sstables.add({make_interval(*sst), value_set({sst})}); - } - } - virtual void erase(shared_sstable sst) override { - if (store_as_unleveled(sst)) { - _unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end()); - } else { - _leveled_sstables_change_cnt++; - _leveled_sstables.subtract({make_interval(*sst), value_set({sst})}); - } - } - virtual std::unique_ptr make_incremental_selector() const override; - class incremental_selector; -}; - -class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { - schema_ptr _schema; - const std::vector& _unleveled_sstables; - const interval_map_type& _leveled_sstables; - const uint64_t& _leveled_sstables_change_cnt; - uint64_t _last_known_leveled_sstables_change_cnt; - map_iterator _it; - // Only to back the dht::ring_position_view returned from select(). - dht::ring_position _next_position; -private: - dht::ring_position_view next_position(map_iterator it) { - if (it == _leveled_sstables.end()) { - _next_position = dht::ring_position::max(); - return dht::ring_position_view::max(); - } else { - _next_position = partitioned_sstable_set::to_ring_position(it->first.lower()); - return dht::ring_position_view(_next_position, dht::ring_position_view::after_key(!boost::icl::is_left_closed(it->first.bounds()))); - } - } - static bool is_before_interval(const compatible_ring_position_or_view& crp, const interval_type& interval) { - if (boost::icl::is_left_closed(interval.bounds())) { - return crp < interval.lower(); - } else { - return crp <= interval.lower(); - } - } - void maybe_invalidate_iterator(const compatible_ring_position_or_view& crp) { - if (_last_known_leveled_sstables_change_cnt != _leveled_sstables_change_cnt) { - _it = _leveled_sstables.lower_bound(interval_type::closed(crp, crp)); - _last_known_leveled_sstables_change_cnt = _leveled_sstables_change_cnt; - } - } -public: - incremental_selector(schema_ptr schema, const std::vector& unleveled_sstables, const interval_map_type& leveled_sstables, - const uint64_t& leveled_sstables_change_cnt) - : _schema(std::move(schema)) - , _unleveled_sstables(unleveled_sstables) - , _leveled_sstables(leveled_sstables) - , _leveled_sstables_change_cnt(leveled_sstables_change_cnt) - , _last_known_leveled_sstables_change_cnt(leveled_sstables_change_cnt) - , _it(leveled_sstables.begin()) - , _next_position(dht::ring_position::min()) { - } - virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view& pos) override { - auto crp = compatible_ring_position_or_view(*_schema, pos); - auto ssts = _unleveled_sstables; - using namespace dht; - - maybe_invalidate_iterator(crp); - - while (_it != _leveled_sstables.end()) { - if (boost::icl::contains(_it->first, crp)) { - ssts.insert(ssts.end(), _it->second.begin(), _it->second.end()); - return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it))); - } - // We don't want to skip current interval if pos lies before it. - if (is_before_interval(crp, _it->first)) { - return std::make_tuple(partitioned_sstable_set::to_partition_range(pos, _it->first), std::move(ssts), next_position(_it)); - } - _it++; - } - return std::make_tuple(partition_range::make_open_ended_both_sides(), std::move(ssts), ring_position_view::max()); - } -}; - -std::unique_ptr partitioned_sstable_set::make_incremental_selector() const { - return std::make_unique(_schema, _unleveled_sstables, _leveled_sstables, _leveled_sstables_change_cnt); -} - -std::unique_ptr compaction_strategy_impl::make_sstable_set(schema_ptr schema) const { - return std::make_unique(); -} - -std::unique_ptr leveled_compaction_strategy::make_sstable_set(schema_ptr schema) const { - return std::make_unique(std::move(schema)); -} - -sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata) { - return sstables::sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); -} - compaction_descriptor compaction_strategy_impl::get_major_compaction_job(column_family& cf, std::vector candidates) { return compaction_descriptor(std::move(candidates), cf.get_sstable_set(), service::get_local_compaction_priority()); } @@ -550,6 +170,8 @@ void size_tiered_backlog_tracker::remove_sstable(sstables::shared_sstable sst) { namespace sstables { +extern logging::logger clogger; + // The backlog for TWCS is just the sum of the individual backlogs in each time window. // We'll keep various SizeTiered backlog tracker objects-- one per window for the static SSTables. // We then scan the current compacting and in-progress writes and matching them to existing time @@ -1031,14 +653,6 @@ bool compaction_strategy::use_clustering_key_filter() const { return _compaction_strategy_impl->use_clustering_key_filter(); } -sstable_set -compaction_strategy::make_sstable_set(schema_ptr schema) const { - return sstable_set( - _compaction_strategy_impl->make_sstable_set(schema), - schema, - make_lw_shared()); -} - compaction_backlog_tracker& compaction_strategy::get_backlog_tracker() { return _compaction_strategy_impl->get_backlog_tracker(); } diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc new file mode 100644 index 0000000000..255f7c41a4 --- /dev/null +++ b/sstables/sstable_set.cc @@ -0,0 +1,389 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include + +#include "compatible_ring_position.hh" +#include "compaction_strategy_impl.hh" +#include "leveled_compaction_strategy.hh" + +#include "sstable_set_impl.hh" + +namespace sstables { + +void sstable_run::insert(shared_sstable sst) { + _all.insert(std::move(sst)); +} + +void sstable_run::erase(shared_sstable sst) { + _all.erase(sst); +} + +uint64_t sstable_run::data_size() const { + return boost::accumulate(_all | boost::adaptors::transformed(std::mem_fn(&sstable::data_size)), uint64_t(0)); +} + +std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) { + os << "Run = {\n"; + if (run.all().empty()) { + os << " Identifier: not found\n"; + } else { + os << format(" Identifier: {}\n", (*run.all().begin())->run_identifier()); + } + + auto frags = boost::copy_range>(run.all()); + boost::sort(frags, [] (const shared_sstable& x, const shared_sstable& y) { + return x->get_first_decorated_key().token() < y->get_first_decorated_key().token(); + }); + os << " Fragments = {\n"; + for (auto& frag : frags) { + os << format(" {}={}:{}\n", frag->generation(), frag->get_first_decorated_key().token(), frag->get_last_decorated_key().token()); + } + os << " }\n}\n"; + return os; +} + +sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s, lw_shared_ptr all) + : _impl(std::move(impl)) + , _schema(std::move(s)) + , _all(std::move(all)) { +} + +sstable_set::sstable_set(const sstable_set& x) + : _impl(x._impl->clone()) + , _schema(x._schema) + , _all(make_lw_shared(*x._all)) + , _all_runs(x._all_runs) { +} + +sstable_set::sstable_set(sstable_set&&) noexcept = default; + +sstable_set& +sstable_set::operator=(const sstable_set& x) { + if (this != &x) { + auto tmp = sstable_set(x); + *this = std::move(tmp); + } + return *this; +} + +sstable_set& +sstable_set::operator=(sstable_set&&) noexcept = default; + +std::vector +sstable_set::select(const dht::partition_range& range) const { + return _impl->select(range); +} + +std::vector +sstable_set::select_sstable_runs(const std::vector& sstables) const { + auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); + return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { + return _all_runs.at(run_id); + })); +} + +void +sstable_set::insert(shared_sstable sst) { + _impl->insert(sst); + try { + _all->insert(sst); + try { + _all_runs[sst->run_identifier()].insert(sst); + } catch (...) { + _all->erase(sst); + throw; + } + } catch (...) { + _impl->erase(sst); + throw; + } +} + +void +sstable_set::erase(shared_sstable sst) { + _impl->erase(sst); + _all->erase(sst); + _all_runs[sst->run_identifier()].erase(sst); +} + +sstable_set::~sstable_set() = default; + +sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl, const schema& s) + : _impl(std::move(impl)) + , _cmp(s) { +} + +sstable_set::incremental_selector::~incremental_selector() = default; + +sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default; + +sstable_set::incremental_selector::selection +sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const { + if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) { + std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos); + _current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); }); + } + return {_current_sstables, _current_next_position}; +} + +sstable_set::incremental_selector +sstable_set::make_incremental_selector() const { + return incremental_selector(_impl->make_incremental_selector(), *_schema); +} + +std::unique_ptr bag_sstable_set::clone() const { + return std::make_unique(*this); +} + +std::vector bag_sstable_set::select(const dht::partition_range& range) const { + return _sstables; +} + +void bag_sstable_set::insert(shared_sstable sst) { + _sstables.push_back(std::move(sst)); +} + +void bag_sstable_set::erase(shared_sstable sst) { + auto it = boost::range::find(_sstables, sst); + if (it != _sstables.end()){ + _sstables.erase(it); + } +} + +class bag_sstable_set::incremental_selector : public incremental_selector_impl { + const std::vector& _sstables; +public: + incremental_selector(const std::vector& sstables) + : _sstables(sstables) { + } + virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view&) override { + return std::make_tuple(dht::partition_range::make_open_ended_both_sides(), _sstables, dht::ring_position_view::max()); + } +}; + +std::unique_ptr bag_sstable_set::make_incremental_selector() const { + return std::make_unique(_sstables); +} + +partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const schema& s, const dht::partition_range& range) { + return interval_type::closed( + compatible_ring_position_or_view(s, dht::ring_position_view(range.start()->value())), + compatible_ring_position_or_view(s, dht::ring_position_view(range.end()->value()))); +} + +partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const dht::partition_range& range) const { + return make_interval(*_schema, range); +} + +partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const schema_ptr& s, const sstable& sst) { + return interval_type::closed( + compatible_ring_position_or_view(s, dht::ring_position(sst.get_first_decorated_key())), + compatible_ring_position_or_view(s, dht::ring_position(sst.get_last_decorated_key()))); +} + +partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const sstable& sst) { + return make_interval(_schema, sst); +} + +partitioned_sstable_set::interval_type partitioned_sstable_set::singular(const dht::ring_position& rp) const { + // We should use the view here, since this is used for queries. + auto rpv = dht::ring_position_view(rp); + auto crp = compatible_ring_position_or_view(*_schema, std::move(rpv)); + return interval_type::closed(crp, crp); +} + +std::pair +partitioned_sstable_set::query(const dht::partition_range& range) const { + if (range.start() && range.end()) { + return _leveled_sstables.equal_range(make_interval(range)); + } + else if (range.start() && !range.end()) { + auto start = singular(range.start()->value()); + return { _leveled_sstables.lower_bound(start), _leveled_sstables.end() }; + } else if (!range.start() && range.end()) { + auto end = singular(range.end()->value()); + return { _leveled_sstables.begin(), _leveled_sstables.upper_bound(end) }; + } else { + return { _leveled_sstables.begin(), _leveled_sstables.end() }; + } +} + +bool partitioned_sstable_set::store_as_unleveled(const shared_sstable& sst) const { + return _use_level_metadata && sst->get_sstable_level() == 0; +} + +dht::ring_position partitioned_sstable_set::to_ring_position(const compatible_ring_position_or_view& crp) { + // Ring position views, representing bounds of sstable intervals are + // guaranteed to have key() != nullptr; + const auto& pos = crp.position(); + return dht::ring_position(pos.token(), *pos.key()); +} + +dht::partition_range partitioned_sstable_set::to_partition_range(const interval_type& i) { + return dht::partition_range::make( + {to_ring_position(i.lower()), boost::icl::is_left_closed(i.bounds())}, + {to_ring_position(i.upper()), boost::icl::is_right_closed(i.bounds())}); +} + +dht::partition_range partitioned_sstable_set::to_partition_range(const dht::ring_position_view& pos, const interval_type& i) { + auto lower_bound = [&] { + if (pos.key()) { + return dht::partition_range::bound(dht::ring_position(pos.token(), *pos.key()), + pos.is_after_key() == dht::ring_position_view::after_key::no); + } else { + return dht::partition_range::bound(dht::ring_position(pos.token(), pos.get_token_bound()), true); + } + }(); + auto upper_bound = dht::partition_range::bound(to_ring_position(i.lower()), !boost::icl::is_left_closed(i.bounds())); + return dht::partition_range::make(std::move(lower_bound), std::move(upper_bound)); +} + +partitioned_sstable_set::partitioned_sstable_set(schema_ptr schema, bool use_level_metadata) + : _schema(std::move(schema)) + , _use_level_metadata(use_level_metadata) { +} + +std::unique_ptr partitioned_sstable_set::clone() const { + return std::make_unique(*this); +} + +std::vector partitioned_sstable_set::select(const dht::partition_range& range) const { + auto ipair = query(range); + auto b = std::move(ipair.first); + auto e = std::move(ipair.second); + value_set result; + while (b != e) { + boost::copy(b++->second, std::inserter(result, result.end())); + } + auto r = _unleveled_sstables; + r.insert(r.end(), result.begin(), result.end()); + return r; +} + +void partitioned_sstable_set::insert(shared_sstable sst) { + if (store_as_unleveled(sst)) { + _unleveled_sstables.push_back(std::move(sst)); + } else { + _leveled_sstables_change_cnt++; + _leveled_sstables.add({make_interval(*sst), value_set({sst})}); + } +} + +void partitioned_sstable_set::erase(shared_sstable sst) { + if (store_as_unleveled(sst)) { + _unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end()); + } else { + _leveled_sstables_change_cnt++; + _leveled_sstables.subtract({make_interval(*sst), value_set({sst})}); + } +} + +class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { + schema_ptr _schema; + const std::vector& _unleveled_sstables; + const interval_map_type& _leveled_sstables; + const uint64_t& _leveled_sstables_change_cnt; + uint64_t _last_known_leveled_sstables_change_cnt; + map_iterator _it; + // Only to back the dht::ring_position_view returned from select(). + dht::ring_position _next_position; +private: + dht::ring_position_view next_position(map_iterator it) { + if (it == _leveled_sstables.end()) { + _next_position = dht::ring_position::max(); + return dht::ring_position_view::max(); + } else { + _next_position = partitioned_sstable_set::to_ring_position(it->first.lower()); + return dht::ring_position_view(_next_position, dht::ring_position_view::after_key(!boost::icl::is_left_closed(it->first.bounds()))); + } + } + static bool is_before_interval(const compatible_ring_position_or_view& crp, const interval_type& interval) { + if (boost::icl::is_left_closed(interval.bounds())) { + return crp < interval.lower(); + } else { + return crp <= interval.lower(); + } + } + void maybe_invalidate_iterator(const compatible_ring_position_or_view& crp) { + if (_last_known_leveled_sstables_change_cnt != _leveled_sstables_change_cnt) { + _it = _leveled_sstables.lower_bound(interval_type::closed(crp, crp)); + _last_known_leveled_sstables_change_cnt = _leveled_sstables_change_cnt; + } + } +public: + incremental_selector(schema_ptr schema, const std::vector& unleveled_sstables, const interval_map_type& leveled_sstables, + const uint64_t& leveled_sstables_change_cnt) + : _schema(std::move(schema)) + , _unleveled_sstables(unleveled_sstables) + , _leveled_sstables(leveled_sstables) + , _leveled_sstables_change_cnt(leveled_sstables_change_cnt) + , _last_known_leveled_sstables_change_cnt(leveled_sstables_change_cnt) + , _it(leveled_sstables.begin()) + , _next_position(dht::ring_position::min()) { + } + virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view& pos) override { + auto crp = compatible_ring_position_or_view(*_schema, pos); + auto ssts = _unleveled_sstables; + using namespace dht; + + maybe_invalidate_iterator(crp); + + while (_it != _leveled_sstables.end()) { + if (boost::icl::contains(_it->first, crp)) { + ssts.insert(ssts.end(), _it->second.begin(), _it->second.end()); + return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it))); + } + // We don't want to skip current interval if pos lies before it. + if (is_before_interval(crp, _it->first)) { + return std::make_tuple(partitioned_sstable_set::to_partition_range(pos, _it->first), std::move(ssts), next_position(_it)); + } + _it++; + } + return std::make_tuple(partition_range::make_open_ended_both_sides(), std::move(ssts), ring_position_view::max()); + } +}; + +std::unique_ptr partitioned_sstable_set::make_incremental_selector() const { + return std::make_unique(_schema, _unleveled_sstables, _leveled_sstables, _leveled_sstables_change_cnt); +} + +std::unique_ptr compaction_strategy_impl::make_sstable_set(schema_ptr schema) const { + return std::make_unique(); +} + +std::unique_ptr leveled_compaction_strategy::make_sstable_set(schema_ptr schema) const { + return std::make_unique(std::move(schema)); +} + +sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata) { + return sstables::sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); +} + +sstable_set +compaction_strategy::make_sstable_set(schema_ptr schema) const { + return sstable_set( + _compaction_strategy_impl->make_sstable_set(schema), + schema, + make_lw_shared()); +} + +} // namespace sstables diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh new file mode 100644 index 0000000000..e0113a5a06 --- /dev/null +++ b/sstables/sstable_set_impl.hh @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "compatible_ring_position.hh" +#include "sstable_set.hh" + +namespace sstables { + +class incremental_selector_impl { +public: + virtual ~incremental_selector_impl() {} + virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view&) = 0; +}; + +class sstable_set_impl { +public: + virtual ~sstable_set_impl() {} + virtual std::unique_ptr clone() const = 0; + virtual std::vector select(const dht::partition_range& range) const = 0; + virtual void insert(shared_sstable sst) = 0; + virtual void erase(shared_sstable sst) = 0; + virtual std::unique_ptr make_incremental_selector() const = 0; +}; + +// default sstable_set, not specialized for anything +class bag_sstable_set : public sstable_set_impl { + // erasing is slow, but select() is fast + std::vector _sstables; +public: + virtual std::unique_ptr clone() const override; + virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; + virtual void insert(shared_sstable sst) override; + virtual void erase(shared_sstable sst) override; + virtual std::unique_ptr make_incremental_selector() const override; + class incremental_selector; +}; + +// specialized when sstables are partitioned in the token range space +// e.g. leveled compaction strategy +class partitioned_sstable_set : public sstable_set_impl { + using value_set = std::unordered_set; + using interval_map_type = boost::icl::interval_map; + using interval_type = interval_map_type::interval_type; + using map_iterator = interval_map_type::const_iterator; +private: + schema_ptr _schema; + std::vector _unleveled_sstables; + interval_map_type _leveled_sstables; + // Change counter on interval map for leveled sstables which is used by + // incremental selector to determine whether or not to invalidate iterators. + uint64_t _leveled_sstables_change_cnt = 0; + bool _use_level_metadata = false; +private: + static interval_type make_interval(const schema& s, const dht::partition_range& range); + interval_type make_interval(const dht::partition_range& range) const; + static interval_type make_interval(const schema_ptr& s, const sstable& sst); + interval_type make_interval(const sstable& sst); + interval_type singular(const dht::ring_position& rp) const; + std::pair query(const dht::partition_range& range) const; + // SSTables are stored separately to avoid interval map's fragmentation issue when level 0 falls behind. + bool store_as_unleveled(const shared_sstable& sst) const; +public: + static dht::ring_position to_ring_position(const compatible_ring_position_or_view& crp); + static dht::partition_range to_partition_range(const interval_type& i); + static dht::partition_range to_partition_range(const dht::ring_position_view& pos, const interval_type& i); + explicit partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true); + + virtual std::unique_ptr clone() const override; + virtual std::vector select(const dht::partition_range& range) const override; + virtual void insert(shared_sstable sst) override; + virtual void erase(shared_sstable sst) override; + virtual std::unique_ptr make_incremental_selector() const override; + class incremental_selector; +}; + +} // namespace sstables From 708093884cbc735fc8bfeef7a9ef4949f215c218 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 19 Nov 2020 13:45:00 +0100 Subject: [PATCH 02/10] mutation_reader: move mutation_reader::forwarding to flat_mutation_reader.hh Files which need this definition won't have to include mutation_reader.hh, only flat_mutation_reader.hh (so the inclusions are in total smaller; mutation_reader.hh includes flat_mutation_reader.hh). --- flat_mutation_reader.hh | 17 ++++++++++++++++- mutation_reader.hh | 15 --------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 8cb3ded3f8..bd79aa581b 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -310,7 +310,7 @@ private: friend class optimized_optional; void do_upgrade_schema(const schema_ptr&); public: - // Documented in mutation_reader::forwarding in mutation_reader.hh. + // Documented in mutation_reader::forwarding. class partition_range_forwarding_tag; using partition_range_forwarding = bool_class; @@ -508,6 +508,21 @@ flat_mutation_reader make_flat_mutation_reader(Args &&... args) { return flat_mutation_reader(std::make_unique(std::forward(args)...)); } +namespace mutation_reader { + // mutation_reader::forwarding determines whether fast_forward_to() may + // be used on the mutation reader to change the partition range being + // read. Enabling forwarding also changes read policy: forwarding::no + // means we will stop reading from disk at the end of the given range, + // but with forwarding::yes we may read ahead, anticipating the user to + // make a small skip with fast_forward_to() and continuing to read. + // + // Note that mutation_reader::forwarding is similarly name but different + // from streamed_mutation::forwarding - the former is about skipping to + // a different partition range, while the latter is about skipping + // inside a large partition. + using forwarding = flat_mutation_reader::partition_range_forwarding; +} + // Consumes mutation fragments until StopCondition is true. // The consumer will stop iff StopCondition returns true, in particular // reaching the end of stream alone won't stop the reader. diff --git a/mutation_reader.hh b/mutation_reader.hh index 519ba30f32..8222611a40 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -31,21 +31,6 @@ #include "flat_mutation_reader.hh" #include "reader_concurrency_semaphore.hh" -namespace mutation_reader { - // mutation_reader::forwarding determines whether fast_forward_to() may - // be used on the mutation reader to change the partition range being - // read. Enabling forwarding also changes read policy: forwarding::no - // means we will stop reading from disk at the end of the given range, - // but with forwarding::yes we may read ahead, anticipating the user to - // make a small skip with fast_forward_to() and continuing to read. - // - // Note that mutation_reader::forwarding is similarly name but different - // from streamed_mutation::forwarding - the former is about skipping to - // a different partition range, while the latter is about skipping - // inside a large partition. - using forwarding = flat_mutation_reader::partition_range_forwarding; -} - class reader_selector { protected: schema_ptr _s; From 40d8bfa394aa941507df20d1232396bc13c55a4f Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 8 Oct 2020 16:48:38 +0200 Subject: [PATCH 03/10] sstables: move sstable reader creation functions to `sstable_set` Lower level functions such as `create_single_key_sstable_reader` were made methods of `sstable_set`. The motivation is that each concrete sstable_set may decide to use a better sstable reading algorithm specific to the data structures used by this sstable_set. For this it needs to access the set's internals. A nice side effect is that we moved some code out of table.cc and database.hh which are huge files. --- database.hh | 44 ----- db/view/view.cc | 3 +- db/view/view_builder.hh | 1 - db/view/view_update_generator.cc | 2 +- sstables/compaction.cc | 9 +- sstables/sstable_set.cc | 264 +++++++++++++++++++++++++++- sstables/sstable_set.hh | 63 ++++++- table.cc | 262 +-------------------------- test/boost/mutation_reader_test.cc | 3 +- test/boost/sstable_datafile_test.cc | 3 +- 10 files changed, 334 insertions(+), 320 deletions(-) diff --git a/database.hh b/database.hh index 08158aa34c..5fd98b20da 100644 --- a/database.hh +++ b/database.hh @@ -1001,50 +1001,6 @@ public: friend class distributed_loader; }; -using sstable_reader_factory_type = std::function; - -// Filters out mutation that doesn't belong to current shard. -flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); - -/// Read a range from the passed-in sstables. -/// -/// The reader is unrestricted, but will account its resource usage on the -/// semaphore belonging to the passed-in permit. -flat_mutation_reader make_range_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); - -/// Read a range from the passed-in sstables. -/// -/// The reader is restricted, that is it will wait for admission on the semaphore -/// belonging to the passed-in permit, before starting to read. -flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); - class user_types_metadata; class keyspace_metadata final { diff --git a/db/view/view.cc b/db/view/view.cc index 2b59b0fc1e..bfdf14c5a7 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1430,10 +1430,9 @@ view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID bas void view_builder::initialize_reader_at_current_token(build_step& step) { step.pslice = make_partition_slice(*step.base->schema()); step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max()); - step.reader = make_local_shard_sstable_reader( + step.reader = step.base->get_sstable_set().make_local_shard_sstable_reader( step.base->schema(), _permit, - make_lw_shared(step.base->get_sstable_set()), step.prange, step.pslice, default_priority_class(), diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index 0e4d997934..1364571806 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -28,7 +28,6 @@ #include "query-request.hh" #include "service/migration_listener.hh" #include "service/migration_manager.hh" -#include "sstables/sstable_set.hh" #include "utils/exponential_backoff_retry.hh" #include "utils/serialized_action.hh" #include "utils/UUID.hh" diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index a43c98097e..703d8ce454 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -82,7 +82,7 @@ future<> view_update_generator::start() { tracing::trace_state_ptr ts, streamed_mutation::forwarding fwd_ms, mutation_reader::forwarding fwd_mr) { - return ::make_restricted_range_sstable_reader(s, std::move(permit), std::move(ssts), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); + return make_restricted_range_sstable_reader(std::move(ssts), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); }); auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader( std::move(ms), diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 8eff060c0c..8f795aa0a2 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -822,9 +822,8 @@ public: } flat_mutation_reader make_sstable_reader() const override { - return ::make_local_shard_sstable_reader(_schema, + return _compacting->make_local_shard_sstable_reader(_schema, _permit, - _compacting, query::full_partition_range, _schema->full_slice(), _io_priority, @@ -869,9 +868,8 @@ public: } flat_mutation_reader make_sstable_reader() const override { - return ::make_local_shard_sstable_reader(_schema, + return _compacting->make_local_shard_sstable_reader(_schema, _permit, - _compacting, query::full_partition_range, _schema->full_slice(), _io_priority, @@ -1341,9 +1339,8 @@ public: // Use reader that makes sure no non-local mutation will not be filtered out. flat_mutation_reader make_sstable_reader() const override { - return ::make_range_sstable_reader(_schema, + return _compacting->make_range_sstable_reader(_schema, _permit, - _compacting, query::full_partition_range, _schema->full_slice(), _io_priority, diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index 255f7c41a4..3d12d2df3f 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -375,7 +375,7 @@ std::unique_ptr leveled_compaction_strategy::make_sstable_set( } sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata) { - return sstables::sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); + return sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); } sstable_set @@ -386,4 +386,266 @@ compaction_strategy::make_sstable_set(schema_ptr schema) const { make_lw_shared()); } +using sstable_reader_factory_type = std::function; + +static logging::logger irclogger("incremental_reader_selector"); + +// Incremental selector implementation for combined_mutation_reader that +// selects readers on-demand as the read progresses through the token +// range. +class incremental_reader_selector : public reader_selector { + const dht::partition_range* _pr; + lw_shared_ptr _sstables; + tracing::trace_state_ptr _trace_state; + std::optional _selector; + std::unordered_set _read_sstable_gens; + sstable_reader_factory_type _fn; + + flat_mutation_reader create_reader(shared_sstable sst) { + tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); })); + return _fn(sst, *_pr); + } + +public: + explicit incremental_reader_selector(schema_ptr s, + lw_shared_ptr sstables, + const dht::partition_range& pr, + tracing::trace_state_ptr trace_state, + sstable_reader_factory_type fn) + : reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min()) + , _pr(&pr) + , _sstables(std::move(sstables)) + , _trace_state(std::move(trace_state)) + , _selector(_sstables->make_incremental_selector()) + , _fn(std::move(fn)) { + + irclogger.trace("{}: created for range: {} with {} sstables", + fmt::ptr(this), + *_pr, + _sstables->all()->size()); + } + + incremental_reader_selector(const incremental_reader_selector&) = delete; + incremental_reader_selector& operator=(const incremental_reader_selector&) = delete; + + incremental_reader_selector(incremental_reader_selector&&) = delete; + incremental_reader_selector& operator=(incremental_reader_selector&&) = delete; + + virtual std::vector create_new_readers(const std::optional& pos) override { + irclogger.trace("{}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos)); + + auto readers = std::vector(); + + do { + auto selection = _selector->select(_selector_position); + _selector_position = selection.next_position; + + irclogger.trace("{}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(), + _selector_position); + + readers = boost::copy_range>(selection.sstables + | boost::adaptors::filtered([this] (auto& sst) { return _read_sstable_gens.emplace(sst->generation()).second; }) + | boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); })); + } while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0)); + + irclogger.trace("{}: created {} new readers", fmt::ptr(this), readers.size()); + + // prevents sstable_set::incremental_selector::_current_sstables from holding reference to + // sstables when done selecting. + if (_selector_position.is_max()) { + _selector.reset(); + } + + return readers; + } + + virtual std::vector fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { + _pr = ≺ + + auto pos = dht::ring_position_view::for_range_start(*_pr); + if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) { + return create_new_readers(pos); + } + + return {}; + } +}; + +// Filter out sstables for reader using bloom filter +static std::vector +filter_sstable_for_reader_by_pk(std::vector&& sstables, column_family& cf, const schema_ptr& schema, + const dht::partition_range& pr, const key& key) { + const dht::ring_position& pr_key = pr.start()->value(); + auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const shared_sstable& sst) { + return cmp(pr_key, sst->get_first_decorated_key()) < 0 || + cmp(pr_key, sst->get_last_decorated_key()) > 0 || + !sst->filter_has_key(key); + }; + sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end()); + return sstables; +} + +// Filter out sstables for reader using sstable metadata that keeps track +// of a range for each clustering component. +static std::vector +filter_sstable_for_reader_by_ck(std::vector&& sstables, column_family& cf, const schema_ptr& schema, + const query::partition_slice& slice) { + // no clustering filtering is applied if schema defines no clustering key or + // compaction strategy thinks it will not benefit from such an optimization, + // or the partition_slice includes static columns. + if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) { + return sstables; + } + + ::cf_stats* stats = cf.cf_stats(); + stats->clustering_filter_count++; + stats->sstables_checked_by_clustering_filter += sstables.size(); + + auto ck_filtering_all_ranges = slice.get_all_ranges(); + // fast path to include all sstables if only one full range was specified. + // For example, this happens if query only specifies a partition key. + if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) { + stats->clustering_filter_fast_path_count++; + stats->surviving_sstables_after_clustering_filter += sstables.size(); + return sstables; + } + + auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const shared_sstable& sst) { + return sst->may_contain_rows(ranges); + }); + sstables.erase(skipped, sstables.end()); + stats->surviving_sstables_after_clustering_filter += sstables.size(); + + return sstables; +} + +flat_mutation_reader +sstable_set::create_single_key_sstable_reader( + column_family* cf, + schema_ptr schema, + reader_permit permit, + utils::estimated_histogram& sstable_histogram, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) const +{ + auto& pk = pr.start()->value().key(); + auto key = key::from_partition_key(*schema, *pk); + auto selected_sstables = filter_sstable_for_reader_by_pk(select(pr), *cf, schema, pr, key); + auto num_sstables = selected_sstables.size(); + if (!num_sstables) { + return make_empty_flat_reader(schema, permit); + } + auto readers = boost::copy_range>( + filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice) + | boost::adaptors::transformed([&] (const shared_sstable& sstable) { + tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); })); + return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd); + }) + ); + + // If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition + // we want to emit partition_start/end if no rows were found, + // to prevent https://github.com/scylladb/scylla/issues/3552. + // + // Use `flat_mutation_reader_from_mutations` with an empty mutation to emit + // the partition_start/end pair and append it to the list of readers passed + // to make_combined_reader to ensure partition_start/end are emitted even if + // all sstables actually containing the partition were filtered. + auto num_readers = readers.size(); + if (num_readers != num_sstables) { + readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pk)}, slice, fwd)); + } + sstable_histogram.add(num_readers); + return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr); +} + +flat_mutation_reader +sstable_set::make_range_sstable_reader( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + read_monitor_generator& monitor_generator) const +{ + auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] + (shared_sstable& sst, const dht::partition_range& pr) mutable { + return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst)); + }; + return make_combined_reader(s, std::move(permit), std::make_unique(s, + shared_from_this(), + pr, + std::move(trace_state), + std::move(reader_factory_fn)), + fwd, + fwd_mr); +} + +flat_mutation_reader +sstable_set::make_local_shard_sstable_reader( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + read_monitor_generator& monitor_generator) const +{ + auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] + (shared_sstable& sst, const dht::partition_range& pr) mutable { + flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc, + trace_state, fwd, fwd_mr, monitor_generator(sst)); + if (sst->is_shared()) { + auto filter = [&s = *s](const dht::decorated_key& dk) -> bool { + return dht::shard_of(s, dk.token()) == this_shard_id(); + }; + reader = make_filtering_reader(std::move(reader), std::move(filter)); + } + return reader; + }; + return make_combined_reader(s, std::move(permit), std::make_unique(s, + shared_from_this(), + pr, + std::move(trace_state), + std::move(reader_factory_fn)), + fwd, + fwd_mr); +} + +flat_mutation_reader make_restricted_range_sstable_reader( + lw_shared_ptr sstables, + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + read_monitor_generator& monitor_generator) +{ + auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] ( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) { + return sstables->make_range_sstable_reader(std::move(s), std::move(permit), pr, slice, pc, + std::move(trace_state), fwd, fwd_mr, monitor_generator); + }); + return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); +} + } // namespace sstables diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index 28a953e7bc..69d9d006cb 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -21,11 +21,17 @@ #pragma once +#include "flat_mutation_reader.hh" +#include "sstables/progress_monitor.hh" #include "shared_sstable.hh" #include "dht/i_partitioner.hh" #include #include +namespace utils { +class estimated_histogram; +} + namespace sstables { class sstable_set_impl; @@ -43,7 +49,7 @@ public: const sstable_list& all() const { return _all; } }; -class sstable_set { +class sstable_set : public enable_lw_shared_from_this { std::unique_ptr _impl; schema_ptr _schema; // used to support column_family::get_sstable(), which wants to return an sstable_list @@ -99,8 +105,63 @@ public: selection select(const dht::ring_position_view& pos) const; }; incremental_selector make_incremental_selector() const; + + flat_mutation_reader create_single_key_sstable_reader( + column_family*, + schema_ptr, + reader_permit, + utils::estimated_histogram&, + const dht::partition_range&, // must be singular + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding) const; + + /// Read a range from the sstable set. + /// + /// The reader is unrestricted, but will account its resource usage on the + /// semaphore belonging to the passed-in permit. + flat_mutation_reader make_range_sstable_reader( + schema_ptr, + reader_permit, + const dht::partition_range&, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding, + read_monitor_generator& rmg = default_read_monitor_generator()) const; + + // Filters out mutations that don't belong to the current shard. + flat_mutation_reader make_local_shard_sstable_reader( + schema_ptr, + reader_permit, + const dht::partition_range&, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding, + read_monitor_generator& rmg = default_read_monitor_generator()) const; }; +/// Read a range from the passed-in sstables. +/// +/// The reader is restricted, that is it will wait for admission on the semaphore +/// belonging to the passed-in permit, before starting to read. +flat_mutation_reader make_restricted_range_sstable_reader( + lw_shared_ptr sstables, + schema_ptr, + reader_permit, + const dht::partition_range&, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding, + read_monitor_generator& rmg = default_read_monitor_generator()); + sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata = true); std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run); diff --git a/table.cc b/table.cc index 0be1ff8146..97bc41f173 100644 --- a/table.cc +++ b/table.cc @@ -25,10 +25,7 @@ #include "service/priority_manager.hh" #include "db/schema_tables.hh" #include "cell_locking.hh" -#include "mutation_fragment.hh" -#include "mutation_partition.hh" #include "utils/logalloc.hh" -#include "sstables/progress_monitor.hh" #include "checked-file-impl.hh" #include "view_info.hh" #include "db/data_listeners.hh" @@ -52,229 +49,6 @@ static seastar::metrics::label keyspace_label("ks"); using namespace std::chrono_literals; -// Filter out sstables for reader using bloom filter -static std::vector -filter_sstable_for_reader_by_pk(std::vector&& sstables, column_family& cf, const schema_ptr& schema, - const dht::partition_range& pr, const sstables::key& key) { - const dht::ring_position& pr_key = pr.start()->value(); - auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const sstables::shared_sstable& sst) { - return cmp(pr_key, sst->get_first_decorated_key()) < 0 || - cmp(pr_key, sst->get_last_decorated_key()) > 0 || - !sst->filter_has_key(key); - }; - sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end()); - return sstables; -} - -// Filter out sstables for reader using sstable metadata that keeps track -// of a range for each clustering component. -static std::vector -filter_sstable_for_reader_by_ck(std::vector&& sstables, column_family& cf, const schema_ptr& schema, - const query::partition_slice& slice) { - // no clustering filtering is applied if schema defines no clustering key or - // compaction strategy thinks it will not benefit from such an optimization, - // or the partition_slice includes static columns. - if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) { - return sstables; - } - - ::cf_stats* stats = cf.cf_stats(); - stats->clustering_filter_count++; - stats->sstables_checked_by_clustering_filter += sstables.size(); - - auto ck_filtering_all_ranges = slice.get_all_ranges(); - // fast path to include all sstables if only one full range was specified. - // For example, this happens if query only specifies a partition key. - if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) { - stats->clustering_filter_fast_path_count++; - stats->surviving_sstables_after_clustering_filter += sstables.size(); - return sstables; - } - - auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const sstables::shared_sstable& sst) { - return sst->may_contain_rows(ranges); - }); - sstables.erase(skipped, sstables.end()); - stats->surviving_sstables_after_clustering_filter += sstables.size(); - - return sstables; -} - -// Incremental selector implementation for combined_mutation_reader that -// selects readers on-demand as the read progresses through the token -// range. -class incremental_reader_selector : public reader_selector { - const dht::partition_range* _pr; - lw_shared_ptr _sstables; - tracing::trace_state_ptr _trace_state; - std::optional _selector; - std::unordered_set _read_sstable_gens; - sstable_reader_factory_type _fn; - - flat_mutation_reader create_reader(sstables::shared_sstable sst) { - tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); })); - return _fn(sst, *_pr); - } - -public: - explicit incremental_reader_selector(schema_ptr s, - lw_shared_ptr sstables, - const dht::partition_range& pr, - tracing::trace_state_ptr trace_state, - sstable_reader_factory_type fn) - : reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min()) - , _pr(&pr) - , _sstables(std::move(sstables)) - , _trace_state(std::move(trace_state)) - , _selector(_sstables->make_incremental_selector()) - , _fn(std::move(fn)) { - - tlogger.trace("incremental_reader_selector {}: created for range: {} with {} sstables", - fmt::ptr(this), - *_pr, - _sstables->all()->size()); - } - - incremental_reader_selector(const incremental_reader_selector&) = delete; - incremental_reader_selector& operator=(const incremental_reader_selector&) = delete; - - incremental_reader_selector(incremental_reader_selector&&) = delete; - incremental_reader_selector& operator=(incremental_reader_selector&&) = delete; - - virtual std::vector create_new_readers(const std::optional& pos) override { - tlogger.trace("incremental_reader_selector {}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos)); - - auto readers = std::vector(); - - do { - auto selection = _selector->select(_selector_position); - _selector_position = selection.next_position; - - tlogger.trace("incremental_reader_selector {}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(), - _selector_position); - - readers = boost::copy_range>(selection.sstables - | boost::adaptors::filtered([this] (auto& sst) { return _read_sstable_gens.emplace(sst->generation()).second; }) - | boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); })); - } while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0)); - - tlogger.trace("incremental_reader_selector {}: created {} new readers", fmt::ptr(this), readers.size()); - - // prevents sstable_set::incremental_selector::_current_sstables from holding reference to - // sstables when done selecting. - if (_selector_position.is_max()) { - _selector.reset(); - } - - return readers; - } - - virtual std::vector fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { - _pr = ≺ - - auto pos = dht::ring_position_view::for_range_start(*_pr); - if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) { - return create_new_readers(pos); - } - - return {}; - } -}; - -static flat_mutation_reader -create_single_key_sstable_reader(column_family* cf, - schema_ptr schema, - reader_permit permit, - lw_shared_ptr sstables, - utils::estimated_histogram& sstable_histogram, - const dht::partition_range& pr, // must be singular - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) -{ - auto& pk = pr.start()->value().key(); - auto key = sstables::key::from_partition_key(*schema, *pk); - auto selected_sstables = filter_sstable_for_reader_by_pk(sstables->select(pr), *cf, schema, pr, key); - auto num_sstables = selected_sstables.size(); - if (!num_sstables) { - return make_empty_flat_reader(schema, permit); - } - auto readers = boost::copy_range>( - filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice) - | boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) { - tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); })); - return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd); - }) - ); - - // If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition - // we want to emit partition_start/end if no rows were found, - // to prevent https://github.com/scylladb/scylla/issues/3552. - // - // Use `flat_mutation_reader_from_mutations` with an empty mutation to emit - // the partition_start/end pair and append it to the list of readers passed - // to make_combined_reader to ensure partition_start/end are emitted even if - // all sstables actually containing the partition were filtered. - auto num_readers = readers.size(); - if (num_readers != num_sstables) { - readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pk)}, slice, fwd)); - } - sstable_histogram.add(num_readers); - return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr); -} - -flat_mutation_reader make_range_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator) -{ - auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] - (sstables::shared_sstable& sst, const dht::partition_range& pr) mutable { - return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst)); - }; - return make_combined_reader(s, std::move(permit), std::make_unique(s, - std::move(sstables), - pr, - std::move(trace_state), - std::move(reader_factory_fn)), - fwd, - fwd_mr); -} - -flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator) -{ - auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] ( - schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) { - return make_range_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc, - std::move(trace_state), fwd, fwd_mr, monitor_generator); - }); - return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); -} - flat_mutation_reader table::make_sstable_reader(schema_ptr s, reader_permit permit, @@ -315,7 +89,7 @@ table::make_sstable_reader(schema_ptr s, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), std::move(sstables), + return sstables->create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), _stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } else { @@ -328,7 +102,7 @@ table::make_sstable_reader(schema_ptr s, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return make_local_shard_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc, + return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } @@ -519,38 +293,6 @@ static bool belongs_to_other_shard(const std::vector& shards) { return shards.size() != size_t(belongs_to_current_shard(shards)); } -flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator) -{ - auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] - (sstables::shared_sstable& sst, const dht::partition_range& pr) mutable { - flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc, - trace_state, fwd, fwd_mr, monitor_generator(sst)); - if (sst->is_shared()) { - auto filter = [&s = *s](const dht::decorated_key& dk) -> bool { - return dht::shard_of(s, dk.token()) == this_shard_id(); - }; - reader = make_filtering_reader(std::move(reader), std::move(filter)); - } - return reader; - }; - return make_combined_reader(s, std::move(permit), std::make_unique(s, - std::move(sstables), - pr, - std::move(trace_state), - std::move(reader_factory_fn)), - fwd, - fwd_mr); -} - sstables::shared_sstable table::make_sstable(sstring dir, int64_t generation, sstables::sstable_version_types v, sstables::sstable_format_types f, io_error_handler_gen error_handler_gen) { return get_sstables_manager().make_sstable(_schema, dir, generation, v, f, gc_clock::now(), error_handler_gen); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index a74d413431..b69bc3c053 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -646,10 +646,9 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) { auto list_reader = make_combined_reader(s.schema(), tests::make_permit(), std::move(sstable_mutation_readers)); - auto incremental_reader = make_local_shard_sstable_reader( + auto incremental_reader = sstable_set->make_local_shard_sstable_reader( s.schema(), tests::make_permit(), - sstable_set, query::full_partition_range, s.schema()->full_slice(), seastar::default_priority_class(), diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 472291072c..be6697850f 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -6083,9 +6083,8 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) { for (auto&& sst : all) { compacting->insert(std::move(sst)); } - auto reader = ::make_range_sstable_reader(s, + auto reader = compacting->make_range_sstable_reader(s, tests::make_permit(), - compacting, query::full_partition_range, s->full_slice(), service::get_local_compaction_priority(), From 68663d0de0c5029e8b958f08848ecd4e00a018e0 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 8 Oct 2020 17:15:38 +0200 Subject: [PATCH 04/10] sstables: pass ring_position to create_single_key_sstable_reader instead of partition_range. It would be best to pass `partition_key` or `decorated_key` here. However, the implementation of this function needs a `partition_range` to pass into `sstable_set::select`, and `partition_range` must be constructed from `ring_position`s. We could create the `ring_position` internally from the key but that would involve a copy which we want to avoid. --- sstables/sstable_set.cc | 17 ++++++++--------- sstables/sstable_set.hh | 2 +- table.cc | 3 ++- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index 3d12d2df3f..e96ed4a4b3 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -474,11 +474,10 @@ public: // Filter out sstables for reader using bloom filter static std::vector filter_sstable_for_reader_by_pk(std::vector&& sstables, column_family& cf, const schema_ptr& schema, - const dht::partition_range& pr, const key& key) { - const dht::ring_position& pr_key = pr.start()->value(); + const dht::ring_position& pos, const key& key) { auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const shared_sstable& sst) { - return cmp(pr_key, sst->get_first_decorated_key()) < 0 || - cmp(pr_key, sst->get_last_decorated_key()) > 0 || + return cmp(pos, sst->get_first_decorated_key()) < 0 || + cmp(pos, sst->get_last_decorated_key()) > 0 || !sst->filter_has_key(key); }; sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end()); @@ -525,16 +524,16 @@ sstable_set::create_single_key_sstable_reader( schema_ptr schema, reader_permit permit, utils::estimated_histogram& sstable_histogram, - const dht::partition_range& pr, + const dht::ring_position& pos, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) const { - auto& pk = pr.start()->value().key(); + auto& pk = pos.key(); auto key = key::from_partition_key(*schema, *pk); - auto selected_sstables = filter_sstable_for_reader_by_pk(select(pr), *cf, schema, pr, key); + auto selected_sstables = filter_sstable_for_reader_by_pk(select({pos}), *cf, schema, pos, key); auto num_sstables = selected_sstables.size(); if (!num_sstables) { return make_empty_flat_reader(schema, permit); @@ -542,8 +541,8 @@ sstable_set::create_single_key_sstable_reader( auto readers = boost::copy_range>( filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice) | boost::adaptors::transformed([&] (const shared_sstable& sstable) { - tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); })); - return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd); + tracing::trace(trace_state, "Reading key {} from sstable {}", pos, seastar::value_of([&sstable] { return sstable->get_filename(); })); + return sstable->read_row_flat(schema, permit, pos, slice, pc, trace_state, fwd); }) ); diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index 69d9d006cb..f0f5a86c73 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -111,7 +111,7 @@ public: schema_ptr, reader_permit, utils::estimated_histogram&, - const dht::partition_range&, // must be singular + const dht::ring_position&, // must contain a key const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, diff --git a/table.cc b/table.cc index 97bc41f173..703879a93e 100644 --- a/table.cc +++ b/table.cc @@ -89,8 +89,9 @@ table::make_sstable_reader(schema_ptr s, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { + assert(pr.is_singular() && pr.start()->value().has_key()); return sstables->create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), - _stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); + _stats.estimated_sstable_per_read, pr.start()->value(), slice, pc, std::move(trace_state), fwd, fwd_mr); }); } else { return mutation_source([sstables=std::move(sstables)] ( From 6c8b0af5055cbd528bd8d7d6af574c894ec99b41 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 12 Oct 2020 13:11:07 +0200 Subject: [PATCH 05/10] sstable_set: refactor filter_sstable_for_reader_by_pk Introduce a `make_pk_filter` function, which given a ring position, returns a boolean function (a filter) that given a sstable, tells whether the sstable may contain rows with the given position. The logic has been extracted from `filter_sstable_for_reader_by_pk`. --- sstables/sstable_set.cc | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index e96ed4a4b3..4e9b9f0d17 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -471,16 +471,27 @@ public: } }; +// The returned function uses the bloom filter to check whether the given sstable +// may have a partition given by the ring position `pos`. +// +// Returning `false` means the sstable doesn't have such a partition. +// Returning `true` means it may, i.e. we don't know whether or not it does. +// +// Assumes the given `pos` and `schema` are alive during the function's lifetime. +static std::predicate auto +make_pk_filter(const dht::ring_position& pos, const schema& schema) { + return [&pos, key = key::from_partition_key(schema, *pos.key()), cmp = dht::ring_position_comparator(schema)] (const sstable& sst) { + return cmp(pos, sst.get_first_decorated_key()) >= 0 && + cmp(pos, sst.get_last_decorated_key()) <= 0 && + sst.filter_has_key(key); + }; +} + // Filter out sstables for reader using bloom filter static std::vector -filter_sstable_for_reader_by_pk(std::vector&& sstables, column_family& cf, const schema_ptr& schema, - const dht::ring_position& pos, const key& key) { - auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const shared_sstable& sst) { - return cmp(pos, sst->get_first_decorated_key()) < 0 || - cmp(pos, sst->get_last_decorated_key()) > 0 || - !sst->filter_has_key(key); - }; - sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end()); +filter_sstable_for_reader_by_pk(std::vector&& sstables, const schema& schema, const dht::ring_position& pos) { + auto filter = [_filter = make_pk_filter(pos, schema)] (const shared_sstable& sst) { return !_filter(*sst); }; + sstables.erase(boost::remove_if(sstables, filter), sstables.end()); return sstables; } @@ -531,9 +542,7 @@ sstable_set::create_single_key_sstable_reader( streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) const { - auto& pk = pos.key(); - auto key = key::from_partition_key(*schema, *pk); - auto selected_sstables = filter_sstable_for_reader_by_pk(select({pos}), *cf, schema, pos, key); + auto selected_sstables = filter_sstable_for_reader_by_pk(select({pos}), *schema, pos); auto num_sstables = selected_sstables.size(); if (!num_sstables) { return make_empty_flat_reader(schema, permit); @@ -556,7 +565,7 @@ sstable_set::create_single_key_sstable_reader( // all sstables actually containing the partition were filtered. auto num_readers = readers.size(); if (num_readers != num_sstables) { - readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pk)}, slice, fwd)); + readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pos.key())}, slice, fwd)); } sstable_histogram.add(num_readers); return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr); From d158921966ee8d58c1c5bdc2cc7173c5dab7cf5d Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 23 Nov 2020 14:00:38 +0100 Subject: [PATCH 06/10] sstables: add `may_have_partition_tombstones` method For sstable versions greater or equal than md, the `min_max_column_names` sstable metadata gives a range of position-in-partitions such that all clustering rows stored in this sstable have positions in this range. Partition tombstones in this context are understood as covering the entire range of clustering keys; thus, if the sstable contains at least one partition tombstone, the sstable position range is set to be the range of all clustered rows. Therefore, by checking that the position range is *not* the range of all clustered rows we know that the sstable cannot have any partition tombstones. Closes #7678 --- position_in_partition.hh | 6 +++++ sstables/sstables.hh | 6 +++++ test/boost/sstable_datafile_test.cc | 40 +++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+) diff --git a/position_in_partition.hh b/position_in_partition.hh index 7dbbd987cc..6deeaa2c6e 100644 --- a/position_in_partition.hh +++ b/position_in_partition.hh @@ -593,6 +593,7 @@ public: position_in_partition&& end() && { return std::move(_end); } bool contains(const schema& s, position_in_partition_view pos) const; bool overlaps(const schema& s, position_in_partition_view start, position_in_partition_view end) const; + bool is_all_clustered_rows(const schema&) const; friend std::ostream& operator<<(std::ostream&, const position_range&); }; @@ -610,3 +611,8 @@ bool position_range::overlaps(const schema& s, position_in_partition_view start, position_in_partition::less_compare less(s); return !less(end, _start) && less(start, _end); } + +inline +bool position_range::is_all_clustered_rows(const schema& s) const { + return _start.is_before_all_clustered_rows(s) && _end.is_after_all_clustered_rows(s); +} diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 4a56abba33..b070179d63 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -797,6 +797,12 @@ public: // Return true if this sstable possibly stores clustering row(s) specified by ranges. bool may_contain_rows(const query::clustering_row_ranges& ranges) const; + // false => there are no partition tombstones, true => we don't know + bool may_have_partition_tombstones() const { + return !has_correct_min_max_column_names() + || _position_range.is_all_clustered_rows(*_schema); + } + // Allow the test cases from sstable_test.cc to test private methods. We use // a placeholder to avoid cluttering this class too much. The sstable_test class // will then re-export as public every method it needs. diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 472291072c..c0f46baf4b 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -6685,3 +6685,43 @@ SEASTAR_TEST_CASE(test_zero_estimated_partitions) { return make_ready_future<>(); }); } + +SEASTAR_TEST_CASE(test_may_have_partition_tombstones) { + return test_env::do_with_async([] (test_env& env) { + storage_service_for_tests ssft; + simple_schema ss; + auto s = ss.schema(); + auto pks = ss.make_pkeys(2); + + auto tmp = tmpdir(); + unsigned gen = 0; + for (auto version : all_sstable_versions) { + if (version < sstable_version_types::md) { + continue; + } + + auto mut1 = mutation(s, pks[0]); + auto mut2 = mutation(s, pks[1]); + mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp()); + mut1.partition().apply_delete(*s, ss.make_ckey(1), ss.new_tombstone()); + ss.add_row(mut1, ss.make_ckey(2), "val"); + ss.delete_range(mut1, query::clustering_range::make({ss.make_ckey(3)}, {ss.make_ckey(5)})); + ss.add_row(mut2, ss.make_ckey(6), "val"); + + auto sst_gen = [&env, s, &tmp, &gen, version] () { + return env.make_sstable(s, tmp.path().string(), ++gen, version, big); + }; + + { + auto sst = make_sstable_containing(sst_gen, {mut1, mut2}); + sst->load().get(); + BOOST_REQUIRE(!sst->may_have_partition_tombstones()); + } + + mut2.partition().apply(ss.new_tombstone()); + auto sst = make_sstable_containing(sst_gen, {mut1, mut2}); + sst->load().get(); + BOOST_REQUIRE(sst->may_have_partition_tombstones()); + } + }); +} From 1b2155eb1d3752c257e033cc10a284866266ce12 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 24 Nov 2020 15:28:12 +0800 Subject: [PATCH 07/10] repair: Use same description for the same metric In commit 9b28162f88121938219955f05cced490840764ed (repair: Use label for node ops metrics), we switched to use label for different node operations. We should use the same description for the same metric name. Fixes #7681 Closes #7682 --- repair/repair.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 747cd3da19..e7b5c2f399 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -60,17 +60,17 @@ public: auto ops_label_type = sm::label("ops"); _metrics.add_group("node_ops", { sm::make_gauge("finished_percentage", [this] { return bootstrap_finished_percentage(); }, - sm::description("Number of finished percentage for bootstrap operation on this shard."), {ops_label_type("bootstrap")}), + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("bootstrap")}), sm::make_gauge("finished_percentage", [this] { return replace_finished_percentage(); }, - sm::description("Number of finished percentage for replace operation on this shard."), {ops_label_type("replace")}), + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("replace")}), sm::make_gauge("finished_percentage", [this] { return rebuild_finished_percentage(); }, - sm::description("Number of finished percentage for rebuild operation on this shard."), {ops_label_type("rebuild")}), + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("rebuild")}), sm::make_gauge("finished_percentage", [this] { return decommission_finished_percentage(); }, - sm::description("Number of finished percentage for decommission operation on this shard."), {ops_label_type("decommission")}), + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("decommission")}), sm::make_gauge("finished_percentage", [this] { return removenode_finished_percentage(); }, - sm::description("Number of finished percentage for removenode operation on this shard."), {ops_label_type("removenode")}), + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("removenode")}), sm::make_gauge("finished_percentage", [this] { return repair_finished_percentage(); }, - sm::description("Number of finished percentage for repair operation on this shard."), {ops_label_type("repair")}), + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("repair")}), }); } uint64_t bootstrap_total_ranges{0}; From e3a886738b59f80801a95673418e17d4a7652ce8 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 23 Nov 2020 09:15:22 +0200 Subject: [PATCH 08/10] raft: test: fix snapshot correctness check Snapshot index cannot be used to check snapshot correctness since some entries may not be command and thus do not affect snapshot value. Lest use applied entries count instead. --- test/raft/replication_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc index 73199ba253..0d2706705d 100644 --- a/test/raft/replication_test.cc +++ b/test/raft/replication_test.cc @@ -588,7 +588,7 @@ future run_test(test_case test) { for (auto& s : persisted_snapshots) { auto& [snp, val] = s.second; auto digest = val.value.get_value(); - expected = sm_value_for(snp.idx).get_value(); + expected = sm_value_for(val.idx).get_value(); if (digest != expected) { tlogger.debug("Persisted snapshot {} doesn't match {} != {}", snp.id, digest, expected); fail = -1; From 6130fb8b3993a59719fcec8fffd2783979826c57 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 19 Nov 2020 15:50:21 +0200 Subject: [PATCH 09/10] raft: commit a dummy entry after leader change After a node becomes leader it needs to do two things: send an append message to establish its leadership and commit one entry to make sure all previous entries with smaller terms are committed as well. --- raft/fsm.cc | 5 +++++ test/boost/raft_fsm_test.cc | 6 ++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/raft/fsm.cc b/raft/fsm.cc index f3b338191f..68ac47e2b4 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -98,6 +98,11 @@ void fsm::become_leader() { _log_limiter_semaphore->sem.consume(_log.non_snapshoted_length()); _tracker->set_configuration(_current_config.servers, _log.next_idx()); _last_election_time = _clock.now(); + // a new leader needs to commit at lease one entry to make sure that + // all existing entries in its log are commited as well. Also it should + // send append entries rpc as soon as possible to establish its leqdership + // (3.4). Do both of those by commiting a dummy entry. + add_entry(log_entry::dummy()); replicate(); } diff --git a/test/boost/raft_fsm_test.cc b/test/boost/raft_fsm_test.cc index f2b8ec1158..d9cf85a81a 100644 --- a/test/boost/raft_fsm_test.cc +++ b/test/boost/raft_fsm_test.cc @@ -69,7 +69,8 @@ BOOST_AUTO_TEST_CASE(test_election_single_node) { BOOST_CHECK(output.term); BOOST_CHECK(output.vote); BOOST_CHECK(output.messages.empty()); - BOOST_CHECK(output.log_entries.empty()); + // A new leader applies one dummy entry + BOOST_CHECK(output.log_entries.size() == 1 && std::holds_alternative(output.log_entries[0]->data)); BOOST_CHECK(output.committed.empty()); // The leader does not become candidate simply because // a timeout has elapsed, i.e. there are no spurious @@ -81,7 +82,8 @@ BOOST_AUTO_TEST_CASE(test_election_single_node) { BOOST_CHECK(!output.vote); BOOST_CHECK(output.messages.empty()); BOOST_CHECK(output.log_entries.empty()); - BOOST_CHECK(output.committed.empty()); + // Dummy entry is now commited + BOOST_CHECK(output.committed.size() == 1 && std::holds_alternative(output.committed[0]->data)); } // Test that adding an entry to a single-node cluster From 51d1d20687e26e1a27dcc8b7e968903497f55b28 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 19 Nov 2020 17:08:45 +0200 Subject: [PATCH 10/10] raft: test: replication works on leader change without adding an entry Check that a newly elected leader commits all the entries in its log without waiting for more entries to be submitted. --- test/raft/replication_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc index 0d2706705d..d6ad6574e7 100644 --- a/test/raft/replication_test.cc +++ b/test/raft/replication_test.cc @@ -617,6 +617,8 @@ int main(int argc, char* argv[]) { {.name = "non_empty_leader_log", .nodes = 2, .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3}}}}, .updates = {entries{4}}}, + {.name = "non_empty_leader_log_no_new_entries", .nodes = 2, .total_values = 4, + .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3}}}}}, // 1 nodes, 12 client entries {.name = "simple_1_auto_12", .nodes = 1, .initial_states = {}, .updates = {entries{12}}},