Merge "raft: fix replication if existing log on leader" from Gleb
* scylla-dev/add_dummy_v2: raft: test: replication works on leader change without adding an entry raft: commit a dummy entry after leader change raft: test: fix snapshot correctness check sstables: add `may_have_partition_tombstones` method
This commit is contained in:
@@ -575,6 +575,7 @@ scylla_core = (['database.cc',
|
||||
'sstables/mp_row_consumer.cc',
|
||||
'sstables/sstables.cc',
|
||||
'sstables/sstables_manager.cc',
|
||||
'sstables/sstable_set.cc',
|
||||
'sstables/mx/writer.cc',
|
||||
'sstables/kl/writer.cc',
|
||||
'sstables/sstable_version.cc',
|
||||
|
||||
44
database.hh
44
database.hh
@@ -1001,50 +1001,6 @@ public:
|
||||
friend class distributed_loader;
|
||||
};
|
||||
|
||||
using sstable_reader_factory_type = std::function<flat_mutation_reader(sstables::shared_sstable&, const dht::partition_range& pr)>;
|
||||
|
||||
// Filters out mutation that doesn't belong to current shard.
|
||||
flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
|
||||
|
||||
/// Read a range from the passed-in sstables.
|
||||
///
|
||||
/// The reader is unrestricted, but will account its resource usage on the
|
||||
/// semaphore belonging to the passed-in permit.
|
||||
flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
|
||||
|
||||
/// Read a range from the passed-in sstables.
|
||||
///
|
||||
/// The reader is restricted, that is it will wait for admission on the semaphore
|
||||
/// belonging to the passed-in permit, before starting to read.
|
||||
flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
|
||||
|
||||
class user_types_metadata;
|
||||
|
||||
class keyspace_metadata final {
|
||||
|
||||
@@ -1438,10 +1438,9 @@ view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID bas
|
||||
void view_builder::initialize_reader_at_current_token(build_step& step) {
|
||||
step.pslice = make_partition_slice(*step.base->schema());
|
||||
step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max());
|
||||
step.reader = make_local_shard_sstable_reader(
|
||||
step.reader = step.base->get_sstable_set().make_local_shard_sstable_reader(
|
||||
step.base->schema(),
|
||||
_permit,
|
||||
make_lw_shared<sstables::sstable_set>(step.base->get_sstable_set()),
|
||||
step.prange,
|
||||
step.pslice,
|
||||
default_priority_class(),
|
||||
|
||||
@@ -28,7 +28,6 @@
|
||||
#include "query-request.hh"
|
||||
#include "service/migration_listener.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "sstables/sstable_set.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include "utils/serialized_action.hh"
|
||||
#include "utils/UUID.hh"
|
||||
|
||||
@@ -82,7 +82,7 @@ future<> view_update_generator::start() {
|
||||
tracing::trace_state_ptr ts,
|
||||
streamed_mutation::forwarding fwd_ms,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return ::make_restricted_range_sstable_reader(s, std::move(permit), std::move(ssts), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
|
||||
return make_restricted_range_sstable_reader(std::move(ssts), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
|
||||
});
|
||||
auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader(
|
||||
std::move(ms),
|
||||
|
||||
@@ -310,7 +310,7 @@ private:
|
||||
friend class optimized_optional<flat_mutation_reader>;
|
||||
void do_upgrade_schema(const schema_ptr&);
|
||||
public:
|
||||
// Documented in mutation_reader::forwarding in mutation_reader.hh.
|
||||
// Documented in mutation_reader::forwarding.
|
||||
class partition_range_forwarding_tag;
|
||||
using partition_range_forwarding = bool_class<partition_range_forwarding_tag>;
|
||||
|
||||
@@ -519,6 +519,21 @@ flat_mutation_reader make_flat_mutation_reader(Args &&... args) {
|
||||
return flat_mutation_reader(std::make_unique<Impl>(std::forward<Args>(args)...));
|
||||
}
|
||||
|
||||
namespace mutation_reader {
|
||||
// mutation_reader::forwarding determines whether fast_forward_to() may
|
||||
// be used on the mutation reader to change the partition range being
|
||||
// read. Enabling forwarding also changes read policy: forwarding::no
|
||||
// means we will stop reading from disk at the end of the given range,
|
||||
// but with forwarding::yes we may read ahead, anticipating the user to
|
||||
// make a small skip with fast_forward_to() and continuing to read.
|
||||
//
|
||||
// Note that mutation_reader::forwarding is similarly name but different
|
||||
// from streamed_mutation::forwarding - the former is about skipping to
|
||||
// a different partition range, while the latter is about skipping
|
||||
// inside a large partition.
|
||||
using forwarding = flat_mutation_reader::partition_range_forwarding;
|
||||
}
|
||||
|
||||
// Consumes mutation fragments until StopCondition is true.
|
||||
// The consumer will stop iff StopCondition returns true, in particular
|
||||
// reaching the end of stream alone won't stop the reader.
|
||||
|
||||
@@ -31,21 +31,6 @@
|
||||
#include "flat_mutation_reader.hh"
|
||||
#include "reader_concurrency_semaphore.hh"
|
||||
|
||||
namespace mutation_reader {
|
||||
// mutation_reader::forwarding determines whether fast_forward_to() may
|
||||
// be used on the mutation reader to change the partition range being
|
||||
// read. Enabling forwarding also changes read policy: forwarding::no
|
||||
// means we will stop reading from disk at the end of the given range,
|
||||
// but with forwarding::yes we may read ahead, anticipating the user to
|
||||
// make a small skip with fast_forward_to() and continuing to read.
|
||||
//
|
||||
// Note that mutation_reader::forwarding is similarly name but different
|
||||
// from streamed_mutation::forwarding - the former is about skipping to
|
||||
// a different partition range, while the latter is about skipping
|
||||
// inside a large partition.
|
||||
using forwarding = flat_mutation_reader::partition_range_forwarding;
|
||||
}
|
||||
|
||||
class reader_selector {
|
||||
protected:
|
||||
schema_ptr _s;
|
||||
|
||||
@@ -593,6 +593,7 @@ public:
|
||||
position_in_partition&& end() && { return std::move(_end); }
|
||||
bool contains(const schema& s, position_in_partition_view pos) const;
|
||||
bool overlaps(const schema& s, position_in_partition_view start, position_in_partition_view end) const;
|
||||
bool is_all_clustered_rows(const schema&) const;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream&, const position_range&);
|
||||
};
|
||||
@@ -610,3 +611,8 @@ bool position_range::overlaps(const schema& s, position_in_partition_view start,
|
||||
position_in_partition::less_compare less(s);
|
||||
return !less(end, _start) && less(start, _end);
|
||||
}
|
||||
|
||||
inline
|
||||
bool position_range::is_all_clustered_rows(const schema& s) const {
|
||||
return _start.is_before_all_clustered_rows(s) && _end.is_after_all_clustered_rows(s);
|
||||
}
|
||||
|
||||
@@ -98,6 +98,11 @@ void fsm::become_leader() {
|
||||
_log_limiter_semaphore->sem.consume(_log.non_snapshoted_length());
|
||||
_tracker->set_configuration(_current_config.servers, _log.next_idx());
|
||||
_last_election_time = _clock.now();
|
||||
// a new leader needs to commit at lease one entry to make sure that
|
||||
// all existing entries in its log are commited as well. Also it should
|
||||
// send append entries rpc as soon as possible to establish its leqdership
|
||||
// (3.4). Do both of those by commiting a dummy entry.
|
||||
add_entry(log_entry::dummy());
|
||||
replicate();
|
||||
}
|
||||
|
||||
|
||||
@@ -60,17 +60,17 @@ public:
|
||||
auto ops_label_type = sm::label("ops");
|
||||
_metrics.add_group("node_ops", {
|
||||
sm::make_gauge("finished_percentage", [this] { return bootstrap_finished_percentage(); },
|
||||
sm::description("Number of finished percentage for bootstrap operation on this shard."), {ops_label_type("bootstrap")}),
|
||||
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("bootstrap")}),
|
||||
sm::make_gauge("finished_percentage", [this] { return replace_finished_percentage(); },
|
||||
sm::description("Number of finished percentage for replace operation on this shard."), {ops_label_type("replace")}),
|
||||
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("replace")}),
|
||||
sm::make_gauge("finished_percentage", [this] { return rebuild_finished_percentage(); },
|
||||
sm::description("Number of finished percentage for rebuild operation on this shard."), {ops_label_type("rebuild")}),
|
||||
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("rebuild")}),
|
||||
sm::make_gauge("finished_percentage", [this] { return decommission_finished_percentage(); },
|
||||
sm::description("Number of finished percentage for decommission operation on this shard."), {ops_label_type("decommission")}),
|
||||
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("decommission")}),
|
||||
sm::make_gauge("finished_percentage", [this] { return removenode_finished_percentage(); },
|
||||
sm::description("Number of finished percentage for removenode operation on this shard."), {ops_label_type("removenode")}),
|
||||
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("removenode")}),
|
||||
sm::make_gauge("finished_percentage", [this] { return repair_finished_percentage(); },
|
||||
sm::description("Number of finished percentage for repair operation on this shard."), {ops_label_type("repair")}),
|
||||
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("repair")}),
|
||||
});
|
||||
}
|
||||
uint64_t bootstrap_total_ranges{0};
|
||||
|
||||
@@ -822,9 +822,8 @@ public:
|
||||
}
|
||||
|
||||
flat_mutation_reader make_sstable_reader() const override {
|
||||
return ::make_local_shard_sstable_reader(_schema,
|
||||
return _compacting->make_local_shard_sstable_reader(_schema,
|
||||
_permit,
|
||||
_compacting,
|
||||
query::full_partition_range,
|
||||
_schema->full_slice(),
|
||||
_io_priority,
|
||||
@@ -869,9 +868,8 @@ public:
|
||||
}
|
||||
|
||||
flat_mutation_reader make_sstable_reader() const override {
|
||||
return ::make_local_shard_sstable_reader(_schema,
|
||||
return _compacting->make_local_shard_sstable_reader(_schema,
|
||||
_permit,
|
||||
_compacting,
|
||||
query::full_partition_range,
|
||||
_schema->full_slice(),
|
||||
_io_priority,
|
||||
@@ -1341,9 +1339,8 @@ public:
|
||||
|
||||
// Use reader that makes sure no non-local mutation will not be filtered out.
|
||||
flat_mutation_reader make_sstable_reader() const override {
|
||||
return ::make_range_sstable_reader(_schema,
|
||||
return _compacting->make_range_sstable_reader(_schema,
|
||||
_permit,
|
||||
_compacting,
|
||||
query::full_partition_range,
|
||||
_schema->full_slice(),
|
||||
_io_priority,
|
||||
|
||||
@@ -47,10 +47,8 @@
|
||||
#include "compaction_strategy_impl.hh"
|
||||
#include "schema.hh"
|
||||
#include "sstable_set.hh"
|
||||
#include "compatible_ring_position.hh"
|
||||
#include <boost/range/algorithm/find.hpp>
|
||||
#include <boost/range/adaptors.hpp>
|
||||
#include <boost/icl/interval_map.hpp>
|
||||
#include <boost/algorithm/cxx11/any_of.hpp>
|
||||
#include "size_tiered_compaction_strategy.hh"
|
||||
#include "date_tiered_compaction_strategy.hh"
|
||||
@@ -64,384 +62,6 @@ logging::logger leveled_manifest::logger("LeveledManifest");
|
||||
|
||||
namespace sstables {
|
||||
|
||||
extern logging::logger clogger;
|
||||
|
||||
void sstable_run::insert(shared_sstable sst) {
|
||||
_all.insert(std::move(sst));
|
||||
}
|
||||
|
||||
void sstable_run::erase(shared_sstable sst) {
|
||||
_all.erase(sst);
|
||||
}
|
||||
|
||||
uint64_t sstable_run::data_size() const {
|
||||
return boost::accumulate(_all | boost::adaptors::transformed(std::mem_fn(&sstable::data_size)), uint64_t(0));
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) {
|
||||
os << "Run = {\n";
|
||||
if (run.all().empty()) {
|
||||
os << " Identifier: not found\n";
|
||||
} else {
|
||||
os << format(" Identifier: {}\n", (*run.all().begin())->run_identifier());
|
||||
}
|
||||
|
||||
auto frags = boost::copy_range<std::vector<shared_sstable>>(run.all());
|
||||
boost::sort(frags, [] (const shared_sstable& x, const shared_sstable& y) {
|
||||
return x->get_first_decorated_key().token() < y->get_first_decorated_key().token();
|
||||
});
|
||||
os << " Fragments = {\n";
|
||||
for (auto& frag : frags) {
|
||||
os << format(" {}={}:{}\n", frag->generation(), frag->get_first_decorated_key().token(), frag->get_last_decorated_key().token());
|
||||
}
|
||||
os << " }\n}\n";
|
||||
return os;
|
||||
}
|
||||
|
||||
class incremental_selector_impl {
|
||||
public:
|
||||
virtual ~incremental_selector_impl() {}
|
||||
virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_view> select(const dht::ring_position_view&) = 0;
|
||||
};
|
||||
|
||||
class sstable_set_impl {
|
||||
public:
|
||||
virtual ~sstable_set_impl() {}
|
||||
virtual std::unique_ptr<sstable_set_impl> clone() const = 0;
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range) const = 0;
|
||||
virtual void insert(shared_sstable sst) = 0;
|
||||
virtual void erase(shared_sstable sst) = 0;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const = 0;
|
||||
};
|
||||
|
||||
sstable_set::sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr s, lw_shared_ptr<sstable_list> all)
|
||||
: _impl(std::move(impl))
|
||||
, _schema(std::move(s))
|
||||
, _all(std::move(all)) {
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(const sstable_set& x)
|
||||
: _impl(x._impl->clone())
|
||||
, _schema(x._schema)
|
||||
, _all(make_lw_shared<sstable_list>(*x._all))
|
||||
, _all_runs(x._all_runs) {
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(sstable_set&&) noexcept = default;
|
||||
|
||||
sstable_set&
|
||||
sstable_set::operator=(const sstable_set& x) {
|
||||
if (this != &x) {
|
||||
auto tmp = sstable_set(x);
|
||||
*this = std::move(tmp);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
sstable_set&
|
||||
sstable_set::operator=(sstable_set&&) noexcept = default;
|
||||
|
||||
std::vector<shared_sstable>
|
||||
sstable_set::select(const dht::partition_range& range) const {
|
||||
return _impl->select(range);
|
||||
}
|
||||
|
||||
std::vector<sstable_run>
|
||||
sstable_set::select_sstable_runs(const std::vector<shared_sstable>& sstables) const {
|
||||
auto run_ids = boost::copy_range<std::unordered_set<utils::UUID>>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier)));
|
||||
return boost::copy_range<std::vector<sstable_run>>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) {
|
||||
return _all_runs.at(run_id);
|
||||
}));
|
||||
}
|
||||
|
||||
void
|
||||
sstable_set::insert(shared_sstable sst) {
|
||||
_impl->insert(sst);
|
||||
try {
|
||||
_all->insert(sst);
|
||||
try {
|
||||
_all_runs[sst->run_identifier()].insert(sst);
|
||||
} catch (...) {
|
||||
_all->erase(sst);
|
||||
throw;
|
||||
}
|
||||
} catch (...) {
|
||||
_impl->erase(sst);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
sstable_set::erase(shared_sstable sst) {
|
||||
_impl->erase(sst);
|
||||
_all->erase(sst);
|
||||
_all_runs[sst->run_identifier()].erase(sst);
|
||||
}
|
||||
|
||||
sstable_set::~sstable_set() = default;
|
||||
|
||||
sstable_set::incremental_selector::incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s)
|
||||
: _impl(std::move(impl))
|
||||
, _cmp(s) {
|
||||
}
|
||||
|
||||
sstable_set::incremental_selector::~incremental_selector() = default;
|
||||
|
||||
sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default;
|
||||
|
||||
sstable_set::incremental_selector::selection
|
||||
sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const {
|
||||
if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) {
|
||||
std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos);
|
||||
_current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); });
|
||||
}
|
||||
return {_current_sstables, _current_next_position};
|
||||
}
|
||||
|
||||
sstable_set::incremental_selector
|
||||
sstable_set::make_incremental_selector() const {
|
||||
return incremental_selector(_impl->make_incremental_selector(), *_schema);
|
||||
}
|
||||
|
||||
// default sstable_set, not specialized for anything
|
||||
class bag_sstable_set : public sstable_set_impl {
|
||||
// erasing is slow, but select() is fast
|
||||
std::vector<shared_sstable> _sstables;
|
||||
public:
|
||||
virtual std::unique_ptr<sstable_set_impl> clone() const override {
|
||||
return std::make_unique<bag_sstable_set>(*this);
|
||||
}
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range = query::full_partition_range) const override {
|
||||
return _sstables;
|
||||
}
|
||||
virtual void insert(shared_sstable sst) override {
|
||||
_sstables.push_back(std::move(sst));
|
||||
}
|
||||
virtual void erase(shared_sstable sst) override {
|
||||
auto it = boost::range::find(_sstables, sst);
|
||||
if (it != _sstables.end()){
|
||||
_sstables.erase(it);
|
||||
}
|
||||
}
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
class incremental_selector;
|
||||
};
|
||||
|
||||
class bag_sstable_set::incremental_selector : public incremental_selector_impl {
|
||||
const std::vector<shared_sstable>& _sstables;
|
||||
public:
|
||||
incremental_selector(const std::vector<shared_sstable>& sstables)
|
||||
: _sstables(sstables) {
|
||||
}
|
||||
virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_view> select(const dht::ring_position_view&) override {
|
||||
return std::make_tuple(dht::partition_range::make_open_ended_both_sides(), _sstables, dht::ring_position_view::max());
|
||||
}
|
||||
};
|
||||
|
||||
std::unique_ptr<incremental_selector_impl> bag_sstable_set::make_incremental_selector() const {
|
||||
return std::make_unique<incremental_selector>(_sstables);
|
||||
}
|
||||
|
||||
// specialized when sstables are partitioned in the token range space
|
||||
// e.g. leveled compaction strategy
|
||||
class partitioned_sstable_set : public sstable_set_impl {
|
||||
using value_set = std::unordered_set<shared_sstable>;
|
||||
using interval_map_type = boost::icl::interval_map<compatible_ring_position_or_view, value_set>;
|
||||
using interval_type = interval_map_type::interval_type;
|
||||
using map_iterator = interval_map_type::const_iterator;
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
std::vector<shared_sstable> _unleveled_sstables;
|
||||
interval_map_type _leveled_sstables;
|
||||
// Change counter on interval map for leveled sstables which is used by
|
||||
// incremental selector to determine whether or not to invalidate iterators.
|
||||
uint64_t _leveled_sstables_change_cnt = 0;
|
||||
bool _use_level_metadata = false;
|
||||
private:
|
||||
static interval_type make_interval(const schema& s, const dht::partition_range& range) {
|
||||
return interval_type::closed(
|
||||
compatible_ring_position_or_view(s, dht::ring_position_view(range.start()->value())),
|
||||
compatible_ring_position_or_view(s, dht::ring_position_view(range.end()->value())));
|
||||
}
|
||||
interval_type make_interval(const dht::partition_range& range) const {
|
||||
return make_interval(*_schema, range);
|
||||
}
|
||||
static interval_type make_interval(const schema_ptr& s, const sstable& sst) {
|
||||
return interval_type::closed(
|
||||
compatible_ring_position_or_view(s, dht::ring_position(sst.get_first_decorated_key())),
|
||||
compatible_ring_position_or_view(s, dht::ring_position(sst.get_last_decorated_key())));
|
||||
}
|
||||
interval_type make_interval(const sstable& sst) {
|
||||
return make_interval(_schema, sst);
|
||||
}
|
||||
interval_type singular(const dht::ring_position& rp) const {
|
||||
// We should use the view here, since this is used for queries.
|
||||
auto rpv = dht::ring_position_view(rp);
|
||||
auto crp = compatible_ring_position_or_view(*_schema, std::move(rpv));
|
||||
return interval_type::closed(crp, crp);
|
||||
}
|
||||
std::pair<map_iterator, map_iterator> query(const dht::partition_range& range) const {
|
||||
if (range.start() && range.end()) {
|
||||
return _leveled_sstables.equal_range(make_interval(range));
|
||||
}
|
||||
else if (range.start() && !range.end()) {
|
||||
auto start = singular(range.start()->value());
|
||||
return { _leveled_sstables.lower_bound(start), _leveled_sstables.end() };
|
||||
} else if (!range.start() && range.end()) {
|
||||
auto end = singular(range.end()->value());
|
||||
return { _leveled_sstables.begin(), _leveled_sstables.upper_bound(end) };
|
||||
} else {
|
||||
return { _leveled_sstables.begin(), _leveled_sstables.end() };
|
||||
}
|
||||
}
|
||||
// SSTables are stored separately to avoid interval map's fragmentation issue when level 0 falls behind.
|
||||
bool store_as_unleveled(const shared_sstable& sst) const {
|
||||
return _use_level_metadata && sst->get_sstable_level() == 0;
|
||||
}
|
||||
public:
|
||||
static dht::ring_position to_ring_position(const compatible_ring_position_or_view& crp) {
|
||||
// Ring position views, representing bounds of sstable intervals are
|
||||
// guaranteed to have key() != nullptr;
|
||||
const auto& pos = crp.position();
|
||||
return dht::ring_position(pos.token(), *pos.key());
|
||||
}
|
||||
static dht::partition_range to_partition_range(const interval_type& i) {
|
||||
return dht::partition_range::make(
|
||||
{to_ring_position(i.lower()), boost::icl::is_left_closed(i.bounds())},
|
||||
{to_ring_position(i.upper()), boost::icl::is_right_closed(i.bounds())});
|
||||
}
|
||||
static dht::partition_range to_partition_range(const dht::ring_position_view& pos, const interval_type& i) {
|
||||
auto lower_bound = [&] {
|
||||
if (pos.key()) {
|
||||
return dht::partition_range::bound(dht::ring_position(pos.token(), *pos.key()),
|
||||
pos.is_after_key() == dht::ring_position_view::after_key::no);
|
||||
} else {
|
||||
return dht::partition_range::bound(dht::ring_position(pos.token(), pos.get_token_bound()), true);
|
||||
}
|
||||
}();
|
||||
auto upper_bound = dht::partition_range::bound(to_ring_position(i.lower()), !boost::icl::is_left_closed(i.bounds()));
|
||||
return dht::partition_range::make(std::move(lower_bound), std::move(upper_bound));
|
||||
}
|
||||
explicit partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true)
|
||||
: _schema(std::move(schema))
|
||||
, _use_level_metadata(use_level_metadata) {
|
||||
}
|
||||
virtual std::unique_ptr<sstable_set_impl> clone() const override {
|
||||
return std::make_unique<partitioned_sstable_set>(*this);
|
||||
}
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range) const override {
|
||||
auto ipair = query(range);
|
||||
auto b = std::move(ipair.first);
|
||||
auto e = std::move(ipair.second);
|
||||
value_set result;
|
||||
while (b != e) {
|
||||
boost::copy(b++->second, std::inserter(result, result.end()));
|
||||
}
|
||||
auto r = _unleveled_sstables;
|
||||
r.insert(r.end(), result.begin(), result.end());
|
||||
return r;
|
||||
}
|
||||
virtual void insert(shared_sstable sst) override {
|
||||
if (store_as_unleveled(sst)) {
|
||||
_unleveled_sstables.push_back(std::move(sst));
|
||||
} else {
|
||||
_leveled_sstables_change_cnt++;
|
||||
_leveled_sstables.add({make_interval(*sst), value_set({sst})});
|
||||
}
|
||||
}
|
||||
virtual void erase(shared_sstable sst) override {
|
||||
if (store_as_unleveled(sst)) {
|
||||
_unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end());
|
||||
} else {
|
||||
_leveled_sstables_change_cnt++;
|
||||
_leveled_sstables.subtract({make_interval(*sst), value_set({sst})});
|
||||
}
|
||||
}
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
class incremental_selector;
|
||||
};
|
||||
|
||||
class partitioned_sstable_set::incremental_selector : public incremental_selector_impl {
|
||||
schema_ptr _schema;
|
||||
const std::vector<shared_sstable>& _unleveled_sstables;
|
||||
const interval_map_type& _leveled_sstables;
|
||||
const uint64_t& _leveled_sstables_change_cnt;
|
||||
uint64_t _last_known_leveled_sstables_change_cnt;
|
||||
map_iterator _it;
|
||||
// Only to back the dht::ring_position_view returned from select().
|
||||
dht::ring_position _next_position;
|
||||
private:
|
||||
dht::ring_position_view next_position(map_iterator it) {
|
||||
if (it == _leveled_sstables.end()) {
|
||||
_next_position = dht::ring_position::max();
|
||||
return dht::ring_position_view::max();
|
||||
} else {
|
||||
_next_position = partitioned_sstable_set::to_ring_position(it->first.lower());
|
||||
return dht::ring_position_view(_next_position, dht::ring_position_view::after_key(!boost::icl::is_left_closed(it->first.bounds())));
|
||||
}
|
||||
}
|
||||
static bool is_before_interval(const compatible_ring_position_or_view& crp, const interval_type& interval) {
|
||||
if (boost::icl::is_left_closed(interval.bounds())) {
|
||||
return crp < interval.lower();
|
||||
} else {
|
||||
return crp <= interval.lower();
|
||||
}
|
||||
}
|
||||
void maybe_invalidate_iterator(const compatible_ring_position_or_view& crp) {
|
||||
if (_last_known_leveled_sstables_change_cnt != _leveled_sstables_change_cnt) {
|
||||
_it = _leveled_sstables.lower_bound(interval_type::closed(crp, crp));
|
||||
_last_known_leveled_sstables_change_cnt = _leveled_sstables_change_cnt;
|
||||
}
|
||||
}
|
||||
public:
|
||||
incremental_selector(schema_ptr schema, const std::vector<shared_sstable>& unleveled_sstables, const interval_map_type& leveled_sstables,
|
||||
const uint64_t& leveled_sstables_change_cnt)
|
||||
: _schema(std::move(schema))
|
||||
, _unleveled_sstables(unleveled_sstables)
|
||||
, _leveled_sstables(leveled_sstables)
|
||||
, _leveled_sstables_change_cnt(leveled_sstables_change_cnt)
|
||||
, _last_known_leveled_sstables_change_cnt(leveled_sstables_change_cnt)
|
||||
, _it(leveled_sstables.begin())
|
||||
, _next_position(dht::ring_position::min()) {
|
||||
}
|
||||
virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_view> select(const dht::ring_position_view& pos) override {
|
||||
auto crp = compatible_ring_position_or_view(*_schema, pos);
|
||||
auto ssts = _unleveled_sstables;
|
||||
using namespace dht;
|
||||
|
||||
maybe_invalidate_iterator(crp);
|
||||
|
||||
while (_it != _leveled_sstables.end()) {
|
||||
if (boost::icl::contains(_it->first, crp)) {
|
||||
ssts.insert(ssts.end(), _it->second.begin(), _it->second.end());
|
||||
return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it)));
|
||||
}
|
||||
// We don't want to skip current interval if pos lies before it.
|
||||
if (is_before_interval(crp, _it->first)) {
|
||||
return std::make_tuple(partitioned_sstable_set::to_partition_range(pos, _it->first), std::move(ssts), next_position(_it));
|
||||
}
|
||||
_it++;
|
||||
}
|
||||
return std::make_tuple(partition_range::make_open_ended_both_sides(), std::move(ssts), ring_position_view::max());
|
||||
}
|
||||
};
|
||||
|
||||
std::unique_ptr<incremental_selector_impl> partitioned_sstable_set::make_incremental_selector() const {
|
||||
return std::make_unique<incremental_selector>(_schema, _unleveled_sstables, _leveled_sstables, _leveled_sstables_change_cnt);
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> compaction_strategy_impl::make_sstable_set(schema_ptr schema) const {
|
||||
return std::make_unique<bag_sstable_set>();
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> leveled_compaction_strategy::make_sstable_set(schema_ptr schema) const {
|
||||
return std::make_unique<partitioned_sstable_set>(std::move(schema));
|
||||
}
|
||||
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata) {
|
||||
return sstables::sstable_set(std::make_unique<partitioned_sstable_set>(schema, use_level_metadata), schema, std::move(all));
|
||||
}
|
||||
|
||||
compaction_descriptor compaction_strategy_impl::get_major_compaction_job(column_family& cf, std::vector<sstables::shared_sstable> candidates) {
|
||||
return compaction_descriptor(std::move(candidates), cf.get_sstable_set(), service::get_local_compaction_priority());
|
||||
}
|
||||
@@ -550,6 +170,8 @@ void size_tiered_backlog_tracker::remove_sstable(sstables::shared_sstable sst) {
|
||||
|
||||
namespace sstables {
|
||||
|
||||
extern logging::logger clogger;
|
||||
|
||||
// The backlog for TWCS is just the sum of the individual backlogs in each time window.
|
||||
// We'll keep various SizeTiered backlog tracker objects-- one per window for the static SSTables.
|
||||
// We then scan the current compacting and in-progress writes and matching them to existing time
|
||||
@@ -1031,14 +653,6 @@ bool compaction_strategy::use_clustering_key_filter() const {
|
||||
return _compaction_strategy_impl->use_clustering_key_filter();
|
||||
}
|
||||
|
||||
sstable_set
|
||||
compaction_strategy::make_sstable_set(schema_ptr schema) const {
|
||||
return sstable_set(
|
||||
_compaction_strategy_impl->make_sstable_set(schema),
|
||||
schema,
|
||||
make_lw_shared<sstable_list>());
|
||||
}
|
||||
|
||||
compaction_backlog_tracker& compaction_strategy::get_backlog_tracker() {
|
||||
return _compaction_strategy_impl->get_backlog_tracker();
|
||||
}
|
||||
|
||||
659
sstables/sstable_set.cc
Normal file
659
sstables/sstable_set.cc
Normal file
@@ -0,0 +1,659 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <boost/icl/interval_map.hpp>
|
||||
|
||||
#include "compatible_ring_position.hh"
|
||||
#include "compaction_strategy_impl.hh"
|
||||
#include "leveled_compaction_strategy.hh"
|
||||
|
||||
#include "sstable_set_impl.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
void sstable_run::insert(shared_sstable sst) {
|
||||
_all.insert(std::move(sst));
|
||||
}
|
||||
|
||||
void sstable_run::erase(shared_sstable sst) {
|
||||
_all.erase(sst);
|
||||
}
|
||||
|
||||
uint64_t sstable_run::data_size() const {
|
||||
return boost::accumulate(_all | boost::adaptors::transformed(std::mem_fn(&sstable::data_size)), uint64_t(0));
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) {
|
||||
os << "Run = {\n";
|
||||
if (run.all().empty()) {
|
||||
os << " Identifier: not found\n";
|
||||
} else {
|
||||
os << format(" Identifier: {}\n", (*run.all().begin())->run_identifier());
|
||||
}
|
||||
|
||||
auto frags = boost::copy_range<std::vector<shared_sstable>>(run.all());
|
||||
boost::sort(frags, [] (const shared_sstable& x, const shared_sstable& y) {
|
||||
return x->get_first_decorated_key().token() < y->get_first_decorated_key().token();
|
||||
});
|
||||
os << " Fragments = {\n";
|
||||
for (auto& frag : frags) {
|
||||
os << format(" {}={}:{}\n", frag->generation(), frag->get_first_decorated_key().token(), frag->get_last_decorated_key().token());
|
||||
}
|
||||
os << " }\n}\n";
|
||||
return os;
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr s, lw_shared_ptr<sstable_list> all)
|
||||
: _impl(std::move(impl))
|
||||
, _schema(std::move(s))
|
||||
, _all(std::move(all)) {
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(const sstable_set& x)
|
||||
: _impl(x._impl->clone())
|
||||
, _schema(x._schema)
|
||||
, _all(make_lw_shared<sstable_list>(*x._all))
|
||||
, _all_runs(x._all_runs) {
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(sstable_set&&) noexcept = default;
|
||||
|
||||
sstable_set&
|
||||
sstable_set::operator=(const sstable_set& x) {
|
||||
if (this != &x) {
|
||||
auto tmp = sstable_set(x);
|
||||
*this = std::move(tmp);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
sstable_set&
|
||||
sstable_set::operator=(sstable_set&&) noexcept = default;
|
||||
|
||||
std::vector<shared_sstable>
|
||||
sstable_set::select(const dht::partition_range& range) const {
|
||||
return _impl->select(range);
|
||||
}
|
||||
|
||||
std::vector<sstable_run>
|
||||
sstable_set::select_sstable_runs(const std::vector<shared_sstable>& sstables) const {
|
||||
auto run_ids = boost::copy_range<std::unordered_set<utils::UUID>>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier)));
|
||||
return boost::copy_range<std::vector<sstable_run>>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) {
|
||||
return _all_runs.at(run_id);
|
||||
}));
|
||||
}
|
||||
|
||||
void
|
||||
sstable_set::insert(shared_sstable sst) {
|
||||
_impl->insert(sst);
|
||||
try {
|
||||
_all->insert(sst);
|
||||
try {
|
||||
_all_runs[sst->run_identifier()].insert(sst);
|
||||
} catch (...) {
|
||||
_all->erase(sst);
|
||||
throw;
|
||||
}
|
||||
} catch (...) {
|
||||
_impl->erase(sst);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
sstable_set::erase(shared_sstable sst) {
|
||||
_impl->erase(sst);
|
||||
_all->erase(sst);
|
||||
_all_runs[sst->run_identifier()].erase(sst);
|
||||
}
|
||||
|
||||
sstable_set::~sstable_set() = default;
|
||||
|
||||
sstable_set::incremental_selector::incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s)
|
||||
: _impl(std::move(impl))
|
||||
, _cmp(s) {
|
||||
}
|
||||
|
||||
sstable_set::incremental_selector::~incremental_selector() = default;
|
||||
|
||||
sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default;
|
||||
|
||||
sstable_set::incremental_selector::selection
|
||||
sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const {
|
||||
if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) {
|
||||
std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos);
|
||||
_current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); });
|
||||
}
|
||||
return {_current_sstables, _current_next_position};
|
||||
}
|
||||
|
||||
sstable_set::incremental_selector
|
||||
sstable_set::make_incremental_selector() const {
|
||||
return incremental_selector(_impl->make_incremental_selector(), *_schema);
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> bag_sstable_set::clone() const {
|
||||
return std::make_unique<bag_sstable_set>(*this);
|
||||
}
|
||||
|
||||
std::vector<shared_sstable> bag_sstable_set::select(const dht::partition_range& range) const {
|
||||
return _sstables;
|
||||
}
|
||||
|
||||
void bag_sstable_set::insert(shared_sstable sst) {
|
||||
_sstables.push_back(std::move(sst));
|
||||
}
|
||||
|
||||
void bag_sstable_set::erase(shared_sstable sst) {
|
||||
auto it = boost::range::find(_sstables, sst);
|
||||
if (it != _sstables.end()){
|
||||
_sstables.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
class bag_sstable_set::incremental_selector : public incremental_selector_impl {
|
||||
const std::vector<shared_sstable>& _sstables;
|
||||
public:
|
||||
incremental_selector(const std::vector<shared_sstable>& sstables)
|
||||
: _sstables(sstables) {
|
||||
}
|
||||
virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_view> select(const dht::ring_position_view&) override {
|
||||
return std::make_tuple(dht::partition_range::make_open_ended_both_sides(), _sstables, dht::ring_position_view::max());
|
||||
}
|
||||
};
|
||||
|
||||
std::unique_ptr<incremental_selector_impl> bag_sstable_set::make_incremental_selector() const {
|
||||
return std::make_unique<incremental_selector>(_sstables);
|
||||
}
|
||||
|
||||
partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const schema& s, const dht::partition_range& range) {
|
||||
return interval_type::closed(
|
||||
compatible_ring_position_or_view(s, dht::ring_position_view(range.start()->value())),
|
||||
compatible_ring_position_or_view(s, dht::ring_position_view(range.end()->value())));
|
||||
}
|
||||
|
||||
partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const dht::partition_range& range) const {
|
||||
return make_interval(*_schema, range);
|
||||
}
|
||||
|
||||
partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const schema_ptr& s, const sstable& sst) {
|
||||
return interval_type::closed(
|
||||
compatible_ring_position_or_view(s, dht::ring_position(sst.get_first_decorated_key())),
|
||||
compatible_ring_position_or_view(s, dht::ring_position(sst.get_last_decorated_key())));
|
||||
}
|
||||
|
||||
partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const sstable& sst) {
|
||||
return make_interval(_schema, sst);
|
||||
}
|
||||
|
||||
partitioned_sstable_set::interval_type partitioned_sstable_set::singular(const dht::ring_position& rp) const {
|
||||
// We should use the view here, since this is used for queries.
|
||||
auto rpv = dht::ring_position_view(rp);
|
||||
auto crp = compatible_ring_position_or_view(*_schema, std::move(rpv));
|
||||
return interval_type::closed(crp, crp);
|
||||
}
|
||||
|
||||
std::pair<partitioned_sstable_set::map_iterator, partitioned_sstable_set::map_iterator>
|
||||
partitioned_sstable_set::query(const dht::partition_range& range) const {
|
||||
if (range.start() && range.end()) {
|
||||
return _leveled_sstables.equal_range(make_interval(range));
|
||||
}
|
||||
else if (range.start() && !range.end()) {
|
||||
auto start = singular(range.start()->value());
|
||||
return { _leveled_sstables.lower_bound(start), _leveled_sstables.end() };
|
||||
} else if (!range.start() && range.end()) {
|
||||
auto end = singular(range.end()->value());
|
||||
return { _leveled_sstables.begin(), _leveled_sstables.upper_bound(end) };
|
||||
} else {
|
||||
return { _leveled_sstables.begin(), _leveled_sstables.end() };
|
||||
}
|
||||
}
|
||||
|
||||
bool partitioned_sstable_set::store_as_unleveled(const shared_sstable& sst) const {
|
||||
return _use_level_metadata && sst->get_sstable_level() == 0;
|
||||
}
|
||||
|
||||
dht::ring_position partitioned_sstable_set::to_ring_position(const compatible_ring_position_or_view& crp) {
|
||||
// Ring position views, representing bounds of sstable intervals are
|
||||
// guaranteed to have key() != nullptr;
|
||||
const auto& pos = crp.position();
|
||||
return dht::ring_position(pos.token(), *pos.key());
|
||||
}
|
||||
|
||||
dht::partition_range partitioned_sstable_set::to_partition_range(const interval_type& i) {
|
||||
return dht::partition_range::make(
|
||||
{to_ring_position(i.lower()), boost::icl::is_left_closed(i.bounds())},
|
||||
{to_ring_position(i.upper()), boost::icl::is_right_closed(i.bounds())});
|
||||
}
|
||||
|
||||
dht::partition_range partitioned_sstable_set::to_partition_range(const dht::ring_position_view& pos, const interval_type& i) {
|
||||
auto lower_bound = [&] {
|
||||
if (pos.key()) {
|
||||
return dht::partition_range::bound(dht::ring_position(pos.token(), *pos.key()),
|
||||
pos.is_after_key() == dht::ring_position_view::after_key::no);
|
||||
} else {
|
||||
return dht::partition_range::bound(dht::ring_position(pos.token(), pos.get_token_bound()), true);
|
||||
}
|
||||
}();
|
||||
auto upper_bound = dht::partition_range::bound(to_ring_position(i.lower()), !boost::icl::is_left_closed(i.bounds()));
|
||||
return dht::partition_range::make(std::move(lower_bound), std::move(upper_bound));
|
||||
}
|
||||
|
||||
partitioned_sstable_set::partitioned_sstable_set(schema_ptr schema, bool use_level_metadata)
|
||||
: _schema(std::move(schema))
|
||||
, _use_level_metadata(use_level_metadata) {
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> partitioned_sstable_set::clone() const {
|
||||
return std::make_unique<partitioned_sstable_set>(*this);
|
||||
}
|
||||
|
||||
std::vector<shared_sstable> partitioned_sstable_set::select(const dht::partition_range& range) const {
|
||||
auto ipair = query(range);
|
||||
auto b = std::move(ipair.first);
|
||||
auto e = std::move(ipair.second);
|
||||
value_set result;
|
||||
while (b != e) {
|
||||
boost::copy(b++->second, std::inserter(result, result.end()));
|
||||
}
|
||||
auto r = _unleveled_sstables;
|
||||
r.insert(r.end(), result.begin(), result.end());
|
||||
return r;
|
||||
}
|
||||
|
||||
void partitioned_sstable_set::insert(shared_sstable sst) {
|
||||
if (store_as_unleveled(sst)) {
|
||||
_unleveled_sstables.push_back(std::move(sst));
|
||||
} else {
|
||||
_leveled_sstables_change_cnt++;
|
||||
_leveled_sstables.add({make_interval(*sst), value_set({sst})});
|
||||
}
|
||||
}
|
||||
|
||||
void partitioned_sstable_set::erase(shared_sstable sst) {
|
||||
if (store_as_unleveled(sst)) {
|
||||
_unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end());
|
||||
} else {
|
||||
_leveled_sstables_change_cnt++;
|
||||
_leveled_sstables.subtract({make_interval(*sst), value_set({sst})});
|
||||
}
|
||||
}
|
||||
|
||||
class partitioned_sstable_set::incremental_selector : public incremental_selector_impl {
|
||||
schema_ptr _schema;
|
||||
const std::vector<shared_sstable>& _unleveled_sstables;
|
||||
const interval_map_type& _leveled_sstables;
|
||||
const uint64_t& _leveled_sstables_change_cnt;
|
||||
uint64_t _last_known_leveled_sstables_change_cnt;
|
||||
map_iterator _it;
|
||||
// Only to back the dht::ring_position_view returned from select().
|
||||
dht::ring_position _next_position;
|
||||
private:
|
||||
dht::ring_position_view next_position(map_iterator it) {
|
||||
if (it == _leveled_sstables.end()) {
|
||||
_next_position = dht::ring_position::max();
|
||||
return dht::ring_position_view::max();
|
||||
} else {
|
||||
_next_position = partitioned_sstable_set::to_ring_position(it->first.lower());
|
||||
return dht::ring_position_view(_next_position, dht::ring_position_view::after_key(!boost::icl::is_left_closed(it->first.bounds())));
|
||||
}
|
||||
}
|
||||
static bool is_before_interval(const compatible_ring_position_or_view& crp, const interval_type& interval) {
|
||||
if (boost::icl::is_left_closed(interval.bounds())) {
|
||||
return crp < interval.lower();
|
||||
} else {
|
||||
return crp <= interval.lower();
|
||||
}
|
||||
}
|
||||
void maybe_invalidate_iterator(const compatible_ring_position_or_view& crp) {
|
||||
if (_last_known_leveled_sstables_change_cnt != _leveled_sstables_change_cnt) {
|
||||
_it = _leveled_sstables.lower_bound(interval_type::closed(crp, crp));
|
||||
_last_known_leveled_sstables_change_cnt = _leveled_sstables_change_cnt;
|
||||
}
|
||||
}
|
||||
public:
|
||||
incremental_selector(schema_ptr schema, const std::vector<shared_sstable>& unleveled_sstables, const interval_map_type& leveled_sstables,
|
||||
const uint64_t& leveled_sstables_change_cnt)
|
||||
: _schema(std::move(schema))
|
||||
, _unleveled_sstables(unleveled_sstables)
|
||||
, _leveled_sstables(leveled_sstables)
|
||||
, _leveled_sstables_change_cnt(leveled_sstables_change_cnt)
|
||||
, _last_known_leveled_sstables_change_cnt(leveled_sstables_change_cnt)
|
||||
, _it(leveled_sstables.begin())
|
||||
, _next_position(dht::ring_position::min()) {
|
||||
}
|
||||
virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_view> select(const dht::ring_position_view& pos) override {
|
||||
auto crp = compatible_ring_position_or_view(*_schema, pos);
|
||||
auto ssts = _unleveled_sstables;
|
||||
using namespace dht;
|
||||
|
||||
maybe_invalidate_iterator(crp);
|
||||
|
||||
while (_it != _leveled_sstables.end()) {
|
||||
if (boost::icl::contains(_it->first, crp)) {
|
||||
ssts.insert(ssts.end(), _it->second.begin(), _it->second.end());
|
||||
return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it)));
|
||||
}
|
||||
// We don't want to skip current interval if pos lies before it.
|
||||
if (is_before_interval(crp, _it->first)) {
|
||||
return std::make_tuple(partitioned_sstable_set::to_partition_range(pos, _it->first), std::move(ssts), next_position(_it));
|
||||
}
|
||||
_it++;
|
||||
}
|
||||
return std::make_tuple(partition_range::make_open_ended_both_sides(), std::move(ssts), ring_position_view::max());
|
||||
}
|
||||
};
|
||||
|
||||
std::unique_ptr<incremental_selector_impl> partitioned_sstable_set::make_incremental_selector() const {
|
||||
return std::make_unique<incremental_selector>(_schema, _unleveled_sstables, _leveled_sstables, _leveled_sstables_change_cnt);
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> compaction_strategy_impl::make_sstable_set(schema_ptr schema) const {
|
||||
return std::make_unique<bag_sstable_set>();
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> leveled_compaction_strategy::make_sstable_set(schema_ptr schema) const {
|
||||
return std::make_unique<partitioned_sstable_set>(std::move(schema));
|
||||
}
|
||||
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata) {
|
||||
return sstable_set(std::make_unique<partitioned_sstable_set>(schema, use_level_metadata), schema, std::move(all));
|
||||
}
|
||||
|
||||
sstable_set
|
||||
compaction_strategy::make_sstable_set(schema_ptr schema) const {
|
||||
return sstable_set(
|
||||
_compaction_strategy_impl->make_sstable_set(schema),
|
||||
schema,
|
||||
make_lw_shared<sstable_list>());
|
||||
}
|
||||
|
||||
using sstable_reader_factory_type = std::function<flat_mutation_reader(shared_sstable&, const dht::partition_range& pr)>;
|
||||
|
||||
static logging::logger irclogger("incremental_reader_selector");
|
||||
|
||||
// Incremental selector implementation for combined_mutation_reader that
|
||||
// selects readers on-demand as the read progresses through the token
|
||||
// range.
|
||||
class incremental_reader_selector : public reader_selector {
|
||||
const dht::partition_range* _pr;
|
||||
lw_shared_ptr<const sstable_set> _sstables;
|
||||
tracing::trace_state_ptr _trace_state;
|
||||
std::optional<sstable_set::incremental_selector> _selector;
|
||||
std::unordered_set<int64_t> _read_sstable_gens;
|
||||
sstable_reader_factory_type _fn;
|
||||
|
||||
flat_mutation_reader create_reader(shared_sstable sst) {
|
||||
tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
|
||||
return _fn(sst, *_pr);
|
||||
}
|
||||
|
||||
public:
|
||||
explicit incremental_reader_selector(schema_ptr s,
|
||||
lw_shared_ptr<const sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
sstable_reader_factory_type fn)
|
||||
: reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min())
|
||||
, _pr(&pr)
|
||||
, _sstables(std::move(sstables))
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _selector(_sstables->make_incremental_selector())
|
||||
, _fn(std::move(fn)) {
|
||||
|
||||
irclogger.trace("{}: created for range: {} with {} sstables",
|
||||
fmt::ptr(this),
|
||||
*_pr,
|
||||
_sstables->all()->size());
|
||||
}
|
||||
|
||||
incremental_reader_selector(const incremental_reader_selector&) = delete;
|
||||
incremental_reader_selector& operator=(const incremental_reader_selector&) = delete;
|
||||
|
||||
incremental_reader_selector(incremental_reader_selector&&) = delete;
|
||||
incremental_reader_selector& operator=(incremental_reader_selector&&) = delete;
|
||||
|
||||
virtual std::vector<flat_mutation_reader> create_new_readers(const std::optional<dht::ring_position_view>& pos) override {
|
||||
irclogger.trace("{}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos));
|
||||
|
||||
auto readers = std::vector<flat_mutation_reader>();
|
||||
|
||||
do {
|
||||
auto selection = _selector->select(_selector_position);
|
||||
_selector_position = selection.next_position;
|
||||
|
||||
irclogger.trace("{}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(),
|
||||
_selector_position);
|
||||
|
||||
readers = boost::copy_range<std::vector<flat_mutation_reader>>(selection.sstables
|
||||
| boost::adaptors::filtered([this] (auto& sst) { return _read_sstable_gens.emplace(sst->generation()).second; })
|
||||
| boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); }));
|
||||
} while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0));
|
||||
|
||||
irclogger.trace("{}: created {} new readers", fmt::ptr(this), readers.size());
|
||||
|
||||
// prevents sstable_set::incremental_selector::_current_sstables from holding reference to
|
||||
// sstables when done selecting.
|
||||
if (_selector_position.is_max()) {
|
||||
_selector.reset();
|
||||
}
|
||||
|
||||
return readers;
|
||||
}
|
||||
|
||||
virtual std::vector<flat_mutation_reader> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
|
||||
_pr = ≺
|
||||
|
||||
auto pos = dht::ring_position_view::for_range_start(*_pr);
|
||||
if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) {
|
||||
return create_new_readers(pos);
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
};
|
||||
|
||||
// The returned function uses the bloom filter to check whether the given sstable
|
||||
// may have a partition given by the ring position `pos`.
|
||||
//
|
||||
// Returning `false` means the sstable doesn't have such a partition.
|
||||
// Returning `true` means it may, i.e. we don't know whether or not it does.
|
||||
//
|
||||
// Assumes the given `pos` and `schema` are alive during the function's lifetime.
|
||||
static std::predicate<const sstable&> auto
|
||||
make_pk_filter(const dht::ring_position& pos, const schema& schema) {
|
||||
return [&pos, key = key::from_partition_key(schema, *pos.key()), cmp = dht::ring_position_comparator(schema)] (const sstable& sst) {
|
||||
return cmp(pos, sst.get_first_decorated_key()) >= 0 &&
|
||||
cmp(pos, sst.get_last_decorated_key()) <= 0 &&
|
||||
sst.filter_has_key(key);
|
||||
};
|
||||
}
|
||||
|
||||
// Filter out sstables for reader using bloom filter
|
||||
static std::vector<shared_sstable>
|
||||
filter_sstable_for_reader_by_pk(std::vector<shared_sstable>&& sstables, const schema& schema, const dht::ring_position& pos) {
|
||||
auto filter = [_filter = make_pk_filter(pos, schema)] (const shared_sstable& sst) { return !_filter(*sst); };
|
||||
sstables.erase(boost::remove_if(sstables, filter), sstables.end());
|
||||
return sstables;
|
||||
}
|
||||
|
||||
// Filter out sstables for reader using sstable metadata that keeps track
|
||||
// of a range for each clustering component.
|
||||
static std::vector<shared_sstable>
|
||||
filter_sstable_for_reader_by_ck(std::vector<shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
|
||||
const query::partition_slice& slice) {
|
||||
// no clustering filtering is applied if schema defines no clustering key or
|
||||
// compaction strategy thinks it will not benefit from such an optimization,
|
||||
// or the partition_slice includes static columns.
|
||||
if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) {
|
||||
return sstables;
|
||||
}
|
||||
|
||||
::cf_stats* stats = cf.cf_stats();
|
||||
stats->clustering_filter_count++;
|
||||
stats->sstables_checked_by_clustering_filter += sstables.size();
|
||||
|
||||
auto ck_filtering_all_ranges = slice.get_all_ranges();
|
||||
// fast path to include all sstables if only one full range was specified.
|
||||
// For example, this happens if query only specifies a partition key.
|
||||
if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) {
|
||||
stats->clustering_filter_fast_path_count++;
|
||||
stats->surviving_sstables_after_clustering_filter += sstables.size();
|
||||
return sstables;
|
||||
}
|
||||
|
||||
auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const shared_sstable& sst) {
|
||||
return sst->may_contain_rows(ranges);
|
||||
});
|
||||
sstables.erase(skipped, sstables.end());
|
||||
stats->surviving_sstables_after_clustering_filter += sstables.size();
|
||||
|
||||
return sstables;
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
sstable_set::create_single_key_sstable_reader(
|
||||
column_family* cf,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
utils::estimated_histogram& sstable_histogram,
|
||||
const dht::ring_position& pos,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const
|
||||
{
|
||||
auto selected_sstables = filter_sstable_for_reader_by_pk(select({pos}), *schema, pos);
|
||||
auto num_sstables = selected_sstables.size();
|
||||
if (!num_sstables) {
|
||||
return make_empty_flat_reader(schema, permit);
|
||||
}
|
||||
auto readers = boost::copy_range<std::vector<flat_mutation_reader>>(
|
||||
filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice)
|
||||
| boost::adaptors::transformed([&] (const shared_sstable& sstable) {
|
||||
tracing::trace(trace_state, "Reading key {} from sstable {}", pos, seastar::value_of([&sstable] { return sstable->get_filename(); }));
|
||||
return sstable->read_row_flat(schema, permit, pos, slice, pc, trace_state, fwd);
|
||||
})
|
||||
);
|
||||
|
||||
// If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition
|
||||
// we want to emit partition_start/end if no rows were found,
|
||||
// to prevent https://github.com/scylladb/scylla/issues/3552.
|
||||
//
|
||||
// Use `flat_mutation_reader_from_mutations` with an empty mutation to emit
|
||||
// the partition_start/end pair and append it to the list of readers passed
|
||||
// to make_combined_reader to ensure partition_start/end are emitted even if
|
||||
// all sstables actually containing the partition were filtered.
|
||||
auto num_readers = readers.size();
|
||||
if (num_readers != num_sstables) {
|
||||
readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pos.key())}, slice, fwd));
|
||||
}
|
||||
sstable_histogram.add(num_readers);
|
||||
return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
sstable_set::make_range_sstable_reader(
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
read_monitor_generator& monitor_generator) const
|
||||
{
|
||||
auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator]
|
||||
(shared_sstable& sst, const dht::partition_range& pr) mutable {
|
||||
return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst));
|
||||
};
|
||||
return make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
|
||||
shared_from_this(),
|
||||
pr,
|
||||
std::move(trace_state),
|
||||
std::move(reader_factory_fn)),
|
||||
fwd,
|
||||
fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
sstable_set::make_local_shard_sstable_reader(
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
read_monitor_generator& monitor_generator) const
|
||||
{
|
||||
auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator]
|
||||
(shared_sstable& sst, const dht::partition_range& pr) mutable {
|
||||
flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc,
|
||||
trace_state, fwd, fwd_mr, monitor_generator(sst));
|
||||
if (sst->is_shared()) {
|
||||
auto filter = [&s = *s](const dht::decorated_key& dk) -> bool {
|
||||
return dht::shard_of(s, dk.token()) == this_shard_id();
|
||||
};
|
||||
reader = make_filtering_reader(std::move(reader), std::move(filter));
|
||||
}
|
||||
return reader;
|
||||
};
|
||||
return make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
|
||||
shared_from_this(),
|
||||
pr,
|
||||
std::move(trace_state),
|
||||
std::move(reader_factory_fn)),
|
||||
fwd,
|
||||
fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader make_restricted_range_sstable_reader(
|
||||
lw_shared_ptr<sstable_set> sstables,
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
read_monitor_generator& monitor_generator)
|
||||
{
|
||||
auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return sstables->make_range_sstable_reader(std::move(s), std::move(permit), pr, slice, pc,
|
||||
std::move(trace_state), fwd, fwd_mr, monitor_generator);
|
||||
});
|
||||
return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
} // namespace sstables
|
||||
@@ -21,11 +21,17 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "flat_mutation_reader.hh"
|
||||
#include "sstables/progress_monitor.hh"
|
||||
#include "shared_sstable.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <vector>
|
||||
|
||||
namespace utils {
|
||||
class estimated_histogram;
|
||||
}
|
||||
|
||||
namespace sstables {
|
||||
|
||||
class sstable_set_impl;
|
||||
@@ -43,7 +49,7 @@ public:
|
||||
const sstable_list& all() const { return _all; }
|
||||
};
|
||||
|
||||
class sstable_set {
|
||||
class sstable_set : public enable_lw_shared_from_this<sstable_set> {
|
||||
std::unique_ptr<sstable_set_impl> _impl;
|
||||
schema_ptr _schema;
|
||||
// used to support column_family::get_sstable(), which wants to return an sstable_list
|
||||
@@ -99,8 +105,63 @@ public:
|
||||
selection select(const dht::ring_position_view& pos) const;
|
||||
};
|
||||
incremental_selector make_incremental_selector() const;
|
||||
|
||||
flat_mutation_reader create_single_key_sstable_reader(
|
||||
column_family*,
|
||||
schema_ptr,
|
||||
reader_permit,
|
||||
utils::estimated_histogram&,
|
||||
const dht::ring_position&, // must contain a key
|
||||
const query::partition_slice&,
|
||||
const io_priority_class&,
|
||||
tracing::trace_state_ptr,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding) const;
|
||||
|
||||
/// Read a range from the sstable set.
|
||||
///
|
||||
/// The reader is unrestricted, but will account its resource usage on the
|
||||
/// semaphore belonging to the passed-in permit.
|
||||
flat_mutation_reader make_range_sstable_reader(
|
||||
schema_ptr,
|
||||
reader_permit,
|
||||
const dht::partition_range&,
|
||||
const query::partition_slice&,
|
||||
const io_priority_class&,
|
||||
tracing::trace_state_ptr,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding,
|
||||
read_monitor_generator& rmg = default_read_monitor_generator()) const;
|
||||
|
||||
// Filters out mutations that don't belong to the current shard.
|
||||
flat_mutation_reader make_local_shard_sstable_reader(
|
||||
schema_ptr,
|
||||
reader_permit,
|
||||
const dht::partition_range&,
|
||||
const query::partition_slice&,
|
||||
const io_priority_class&,
|
||||
tracing::trace_state_ptr,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding,
|
||||
read_monitor_generator& rmg = default_read_monitor_generator()) const;
|
||||
};
|
||||
|
||||
/// Read a range from the passed-in sstables.
|
||||
///
|
||||
/// The reader is restricted, that is it will wait for admission on the semaphore
|
||||
/// belonging to the passed-in permit, before starting to read.
|
||||
flat_mutation_reader make_restricted_range_sstable_reader(
|
||||
lw_shared_ptr<sstable_set> sstables,
|
||||
schema_ptr,
|
||||
reader_permit,
|
||||
const dht::partition_range&,
|
||||
const query::partition_slice&,
|
||||
const io_priority_class&,
|
||||
tracing::trace_state_ptr,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding,
|
||||
read_monitor_generator& rmg = default_read_monitor_generator());
|
||||
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata = true);
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run);
|
||||
|
||||
98
sstables/sstable_set_impl.hh
Normal file
98
sstables/sstable_set_impl.hh
Normal file
@@ -0,0 +1,98 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/icl/interval_map.hpp>
|
||||
|
||||
#include "compatible_ring_position.hh"
|
||||
#include "sstable_set.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
class incremental_selector_impl {
|
||||
public:
|
||||
virtual ~incremental_selector_impl() {}
|
||||
virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_view> select(const dht::ring_position_view&) = 0;
|
||||
};
|
||||
|
||||
class sstable_set_impl {
|
||||
public:
|
||||
virtual ~sstable_set_impl() {}
|
||||
virtual std::unique_ptr<sstable_set_impl> clone() const = 0;
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range) const = 0;
|
||||
virtual void insert(shared_sstable sst) = 0;
|
||||
virtual void erase(shared_sstable sst) = 0;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const = 0;
|
||||
};
|
||||
|
||||
// default sstable_set, not specialized for anything
|
||||
class bag_sstable_set : public sstable_set_impl {
|
||||
// erasing is slow, but select() is fast
|
||||
std::vector<shared_sstable> _sstables;
|
||||
public:
|
||||
virtual std::unique_ptr<sstable_set_impl> clone() const override;
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range = query::full_partition_range) const override;
|
||||
virtual void insert(shared_sstable sst) override;
|
||||
virtual void erase(shared_sstable sst) override;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
class incremental_selector;
|
||||
};
|
||||
|
||||
// specialized when sstables are partitioned in the token range space
|
||||
// e.g. leveled compaction strategy
|
||||
class partitioned_sstable_set : public sstable_set_impl {
|
||||
using value_set = std::unordered_set<shared_sstable>;
|
||||
using interval_map_type = boost::icl::interval_map<compatible_ring_position_or_view, value_set>;
|
||||
using interval_type = interval_map_type::interval_type;
|
||||
using map_iterator = interval_map_type::const_iterator;
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
std::vector<shared_sstable> _unleveled_sstables;
|
||||
interval_map_type _leveled_sstables;
|
||||
// Change counter on interval map for leveled sstables which is used by
|
||||
// incremental selector to determine whether or not to invalidate iterators.
|
||||
uint64_t _leveled_sstables_change_cnt = 0;
|
||||
bool _use_level_metadata = false;
|
||||
private:
|
||||
static interval_type make_interval(const schema& s, const dht::partition_range& range);
|
||||
interval_type make_interval(const dht::partition_range& range) const;
|
||||
static interval_type make_interval(const schema_ptr& s, const sstable& sst);
|
||||
interval_type make_interval(const sstable& sst);
|
||||
interval_type singular(const dht::ring_position& rp) const;
|
||||
std::pair<map_iterator, map_iterator> query(const dht::partition_range& range) const;
|
||||
// SSTables are stored separately to avoid interval map's fragmentation issue when level 0 falls behind.
|
||||
bool store_as_unleveled(const shared_sstable& sst) const;
|
||||
public:
|
||||
static dht::ring_position to_ring_position(const compatible_ring_position_or_view& crp);
|
||||
static dht::partition_range to_partition_range(const interval_type& i);
|
||||
static dht::partition_range to_partition_range(const dht::ring_position_view& pos, const interval_type& i);
|
||||
explicit partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true);
|
||||
|
||||
virtual std::unique_ptr<sstable_set_impl> clone() const override;
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range) const override;
|
||||
virtual void insert(shared_sstable sst) override;
|
||||
virtual void erase(shared_sstable sst) override;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
class incremental_selector;
|
||||
};
|
||||
|
||||
} // namespace sstables
|
||||
@@ -797,6 +797,12 @@ public:
|
||||
// Return true if this sstable possibly stores clustering row(s) specified by ranges.
|
||||
bool may_contain_rows(const query::clustering_row_ranges& ranges) const;
|
||||
|
||||
// false => there are no partition tombstones, true => we don't know
|
||||
bool may_have_partition_tombstones() const {
|
||||
return !has_correct_min_max_column_names()
|
||||
|| _position_range.is_all_clustered_rows(*_schema);
|
||||
}
|
||||
|
||||
// Allow the test cases from sstable_test.cc to test private methods. We use
|
||||
// a placeholder to avoid cluttering this class too much. The sstable_test class
|
||||
// will then re-export as public every method it needs.
|
||||
|
||||
265
table.cc
265
table.cc
@@ -25,10 +25,7 @@
|
||||
#include "service/priority_manager.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
#include "cell_locking.hh"
|
||||
#include "mutation_fragment.hh"
|
||||
#include "mutation_partition.hh"
|
||||
#include "utils/logalloc.hh"
|
||||
#include "sstables/progress_monitor.hh"
|
||||
#include "checked-file-impl.hh"
|
||||
#include "view_info.hh"
|
||||
#include "db/data_listeners.hh"
|
||||
@@ -52,229 +49,6 @@ static seastar::metrics::label keyspace_label("ks");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
// Filter out sstables for reader using bloom filter
|
||||
static std::vector<sstables::shared_sstable>
|
||||
filter_sstable_for_reader_by_pk(std::vector<sstables::shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
|
||||
const dht::partition_range& pr, const sstables::key& key) {
|
||||
const dht::ring_position& pr_key = pr.start()->value();
|
||||
auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const sstables::shared_sstable& sst) {
|
||||
return cmp(pr_key, sst->get_first_decorated_key()) < 0 ||
|
||||
cmp(pr_key, sst->get_last_decorated_key()) > 0 ||
|
||||
!sst->filter_has_key(key);
|
||||
};
|
||||
sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end());
|
||||
return sstables;
|
||||
}
|
||||
|
||||
// Filter out sstables for reader using sstable metadata that keeps track
|
||||
// of a range for each clustering component.
|
||||
static std::vector<sstables::shared_sstable>
|
||||
filter_sstable_for_reader_by_ck(std::vector<sstables::shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
|
||||
const query::partition_slice& slice) {
|
||||
// no clustering filtering is applied if schema defines no clustering key or
|
||||
// compaction strategy thinks it will not benefit from such an optimization,
|
||||
// or the partition_slice includes static columns.
|
||||
if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) {
|
||||
return sstables;
|
||||
}
|
||||
|
||||
::cf_stats* stats = cf.cf_stats();
|
||||
stats->clustering_filter_count++;
|
||||
stats->sstables_checked_by_clustering_filter += sstables.size();
|
||||
|
||||
auto ck_filtering_all_ranges = slice.get_all_ranges();
|
||||
// fast path to include all sstables if only one full range was specified.
|
||||
// For example, this happens if query only specifies a partition key.
|
||||
if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) {
|
||||
stats->clustering_filter_fast_path_count++;
|
||||
stats->surviving_sstables_after_clustering_filter += sstables.size();
|
||||
return sstables;
|
||||
}
|
||||
|
||||
auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const sstables::shared_sstable& sst) {
|
||||
return sst->may_contain_rows(ranges);
|
||||
});
|
||||
sstables.erase(skipped, sstables.end());
|
||||
stats->surviving_sstables_after_clustering_filter += sstables.size();
|
||||
|
||||
return sstables;
|
||||
}
|
||||
|
||||
// Incremental selector implementation for combined_mutation_reader that
|
||||
// selects readers on-demand as the read progresses through the token
|
||||
// range.
|
||||
class incremental_reader_selector : public reader_selector {
|
||||
const dht::partition_range* _pr;
|
||||
lw_shared_ptr<sstables::sstable_set> _sstables;
|
||||
tracing::trace_state_ptr _trace_state;
|
||||
std::optional<sstables::sstable_set::incremental_selector> _selector;
|
||||
std::unordered_set<int64_t> _read_sstable_gens;
|
||||
sstable_reader_factory_type _fn;
|
||||
|
||||
flat_mutation_reader create_reader(sstables::shared_sstable sst) {
|
||||
tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
|
||||
return _fn(sst, *_pr);
|
||||
}
|
||||
|
||||
public:
|
||||
explicit incremental_reader_selector(schema_ptr s,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
sstable_reader_factory_type fn)
|
||||
: reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min())
|
||||
, _pr(&pr)
|
||||
, _sstables(std::move(sstables))
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _selector(_sstables->make_incremental_selector())
|
||||
, _fn(std::move(fn)) {
|
||||
|
||||
tlogger.trace("incremental_reader_selector {}: created for range: {} with {} sstables",
|
||||
fmt::ptr(this),
|
||||
*_pr,
|
||||
_sstables->all()->size());
|
||||
}
|
||||
|
||||
incremental_reader_selector(const incremental_reader_selector&) = delete;
|
||||
incremental_reader_selector& operator=(const incremental_reader_selector&) = delete;
|
||||
|
||||
incremental_reader_selector(incremental_reader_selector&&) = delete;
|
||||
incremental_reader_selector& operator=(incremental_reader_selector&&) = delete;
|
||||
|
||||
virtual std::vector<flat_mutation_reader> create_new_readers(const std::optional<dht::ring_position_view>& pos) override {
|
||||
tlogger.trace("incremental_reader_selector {}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos));
|
||||
|
||||
auto readers = std::vector<flat_mutation_reader>();
|
||||
|
||||
do {
|
||||
auto selection = _selector->select(_selector_position);
|
||||
_selector_position = selection.next_position;
|
||||
|
||||
tlogger.trace("incremental_reader_selector {}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(),
|
||||
_selector_position);
|
||||
|
||||
readers = boost::copy_range<std::vector<flat_mutation_reader>>(selection.sstables
|
||||
| boost::adaptors::filtered([this] (auto& sst) { return _read_sstable_gens.emplace(sst->generation()).second; })
|
||||
| boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); }));
|
||||
} while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0));
|
||||
|
||||
tlogger.trace("incremental_reader_selector {}: created {} new readers", fmt::ptr(this), readers.size());
|
||||
|
||||
// prevents sstable_set::incremental_selector::_current_sstables from holding reference to
|
||||
// sstables when done selecting.
|
||||
if (_selector_position.is_max()) {
|
||||
_selector.reset();
|
||||
}
|
||||
|
||||
return readers;
|
||||
}
|
||||
|
||||
virtual std::vector<flat_mutation_reader> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
|
||||
_pr = ≺
|
||||
|
||||
auto pos = dht::ring_position_view::for_range_start(*_pr);
|
||||
if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) {
|
||||
return create_new_readers(pos);
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
};
|
||||
|
||||
static flat_mutation_reader
|
||||
create_single_key_sstable_reader(column_family* cf,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
utils::estimated_histogram& sstable_histogram,
|
||||
const dht::partition_range& pr, // must be singular
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
{
|
||||
auto& pk = pr.start()->value().key();
|
||||
auto key = sstables::key::from_partition_key(*schema, *pk);
|
||||
auto selected_sstables = filter_sstable_for_reader_by_pk(sstables->select(pr), *cf, schema, pr, key);
|
||||
auto num_sstables = selected_sstables.size();
|
||||
if (!num_sstables) {
|
||||
return make_empty_flat_reader(schema, permit);
|
||||
}
|
||||
auto readers = boost::copy_range<std::vector<flat_mutation_reader>>(
|
||||
filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice)
|
||||
| boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) {
|
||||
tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
|
||||
return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd);
|
||||
})
|
||||
);
|
||||
|
||||
// If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition
|
||||
// we want to emit partition_start/end if no rows were found,
|
||||
// to prevent https://github.com/scylladb/scylla/issues/3552.
|
||||
//
|
||||
// Use `flat_mutation_reader_from_mutations` with an empty mutation to emit
|
||||
// the partition_start/end pair and append it to the list of readers passed
|
||||
// to make_combined_reader to ensure partition_start/end are emitted even if
|
||||
// all sstables actually containing the partition were filtered.
|
||||
auto num_readers = readers.size();
|
||||
if (num_readers != num_sstables) {
|
||||
readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pk)}, slice, fwd));
|
||||
}
|
||||
sstable_histogram.add(num_readers);
|
||||
return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator)
|
||||
{
|
||||
auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator]
|
||||
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
|
||||
return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst));
|
||||
};
|
||||
return make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
|
||||
std::move(sstables),
|
||||
pr,
|
||||
std::move(trace_state),
|
||||
std::move(reader_factory_fn)),
|
||||
fwd,
|
||||
fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator)
|
||||
{
|
||||
auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_range_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc,
|
||||
std::move(trace_state), fwd, fwd_mr, monitor_generator);
|
||||
});
|
||||
return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
table::make_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
@@ -315,8 +89,9 @@ table::make_sstable_reader(schema_ptr s,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(permit), std::move(sstables),
|
||||
_stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
assert(pr.is_singular() && pr.start()->value().has_key());
|
||||
return sstables->create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(permit),
|
||||
_stats.estimated_sstable_per_read, pr.start()->value(), slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
} else {
|
||||
return mutation_source([sstables=std::move(sstables)] (
|
||||
@@ -328,7 +103,7 @@ table::make_sstable_reader(schema_ptr s,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_local_shard_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc,
|
||||
return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, pc,
|
||||
std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
}
|
||||
@@ -519,38 +294,6 @@ static bool belongs_to_other_shard(const std::vector<shard_id>& shards) {
|
||||
return shards.size() != size_t(belongs_to_current_shard(shards));
|
||||
}
|
||||
|
||||
flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator)
|
||||
{
|
||||
auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator]
|
||||
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
|
||||
flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc,
|
||||
trace_state, fwd, fwd_mr, monitor_generator(sst));
|
||||
if (sst->is_shared()) {
|
||||
auto filter = [&s = *s](const dht::decorated_key& dk) -> bool {
|
||||
return dht::shard_of(s, dk.token()) == this_shard_id();
|
||||
};
|
||||
reader = make_filtering_reader(std::move(reader), std::move(filter));
|
||||
}
|
||||
return reader;
|
||||
};
|
||||
return make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
|
||||
std::move(sstables),
|
||||
pr,
|
||||
std::move(trace_state),
|
||||
std::move(reader_factory_fn)),
|
||||
fwd,
|
||||
fwd_mr);
|
||||
}
|
||||
|
||||
sstables::shared_sstable table::make_sstable(sstring dir, int64_t generation, sstables::sstable_version_types v, sstables::sstable_format_types f,
|
||||
io_error_handler_gen error_handler_gen) {
|
||||
return get_sstables_manager().make_sstable(_schema, dir, generation, v, f, gc_clock::now(), error_handler_gen);
|
||||
|
||||
@@ -646,10 +646,9 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) {
|
||||
auto list_reader = make_combined_reader(s.schema(), tests::make_permit(),
|
||||
std::move(sstable_mutation_readers));
|
||||
|
||||
auto incremental_reader = make_local_shard_sstable_reader(
|
||||
auto incremental_reader = sstable_set->make_local_shard_sstable_reader(
|
||||
s.schema(),
|
||||
tests::make_permit(),
|
||||
sstable_set,
|
||||
query::full_partition_range,
|
||||
s.schema()->full_slice(),
|
||||
seastar::default_priority_class(),
|
||||
|
||||
@@ -69,7 +69,8 @@ BOOST_AUTO_TEST_CASE(test_election_single_node) {
|
||||
BOOST_CHECK(output.term);
|
||||
BOOST_CHECK(output.vote);
|
||||
BOOST_CHECK(output.messages.empty());
|
||||
BOOST_CHECK(output.log_entries.empty());
|
||||
// A new leader applies one dummy entry
|
||||
BOOST_CHECK(output.log_entries.size() == 1 && std::holds_alternative<raft::log_entry::dummy>(output.log_entries[0]->data));
|
||||
BOOST_CHECK(output.committed.empty());
|
||||
// The leader does not become candidate simply because
|
||||
// a timeout has elapsed, i.e. there are no spurious
|
||||
@@ -81,7 +82,8 @@ BOOST_AUTO_TEST_CASE(test_election_single_node) {
|
||||
BOOST_CHECK(!output.vote);
|
||||
BOOST_CHECK(output.messages.empty());
|
||||
BOOST_CHECK(output.log_entries.empty());
|
||||
BOOST_CHECK(output.committed.empty());
|
||||
// Dummy entry is now commited
|
||||
BOOST_CHECK(output.committed.size() == 1 && std::holds_alternative<raft::log_entry::dummy>(output.committed[0]->data));
|
||||
}
|
||||
|
||||
// Test that adding an entry to a single-node cluster
|
||||
|
||||
@@ -6083,9 +6083,8 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) {
|
||||
for (auto&& sst : all) {
|
||||
compacting->insert(std::move(sst));
|
||||
}
|
||||
auto reader = ::make_range_sstable_reader(s,
|
||||
auto reader = compacting->make_range_sstable_reader(s,
|
||||
tests::make_permit(),
|
||||
compacting,
|
||||
query::full_partition_range,
|
||||
s->full_slice(),
|
||||
service::get_local_compaction_priority(),
|
||||
@@ -6685,3 +6684,43 @@ SEASTAR_TEST_CASE(test_zero_estimated_partitions) {
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_may_have_partition_tombstones) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
storage_service_for_tests ssft;
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
auto pks = ss.make_pkeys(2);
|
||||
|
||||
auto tmp = tmpdir();
|
||||
unsigned gen = 0;
|
||||
for (auto version : all_sstable_versions) {
|
||||
if (version < sstable_version_types::md) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto mut1 = mutation(s, pks[0]);
|
||||
auto mut2 = mutation(s, pks[1]);
|
||||
mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp());
|
||||
mut1.partition().apply_delete(*s, ss.make_ckey(1), ss.new_tombstone());
|
||||
ss.add_row(mut1, ss.make_ckey(2), "val");
|
||||
ss.delete_range(mut1, query::clustering_range::make({ss.make_ckey(3)}, {ss.make_ckey(5)}));
|
||||
ss.add_row(mut2, ss.make_ckey(6), "val");
|
||||
|
||||
auto sst_gen = [&env, s, &tmp, &gen, version] () {
|
||||
return env.make_sstable(s, tmp.path().string(), ++gen, version, big);
|
||||
};
|
||||
|
||||
{
|
||||
auto sst = make_sstable_containing(sst_gen, {mut1, mut2});
|
||||
sst->load().get();
|
||||
BOOST_REQUIRE(!sst->may_have_partition_tombstones());
|
||||
}
|
||||
|
||||
mut2.partition().apply(ss.new_tombstone());
|
||||
auto sst = make_sstable_containing(sst_gen, {mut1, mut2});
|
||||
sst->load().get();
|
||||
BOOST_REQUIRE(sst->may_have_partition_tombstones());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -588,7 +588,7 @@ future<int> run_test(test_case test) {
|
||||
for (auto& s : persisted_snapshots) {
|
||||
auto& [snp, val] = s.second;
|
||||
auto digest = val.value.get_value();
|
||||
expected = sm_value_for(snp.idx).get_value();
|
||||
expected = sm_value_for(val.idx).get_value();
|
||||
if (digest != expected) {
|
||||
tlogger.debug("Persisted snapshot {} doesn't match {} != {}", snp.id, digest, expected);
|
||||
fail = -1;
|
||||
@@ -617,6 +617,8 @@ int main(int argc, char* argv[]) {
|
||||
{.name = "non_empty_leader_log", .nodes = 2,
|
||||
.initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3}}}},
|
||||
.updates = {entries{4}}},
|
||||
{.name = "non_empty_leader_log_no_new_entries", .nodes = 2, .total_values = 4,
|
||||
.initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3}}}}},
|
||||
// 1 nodes, 12 client entries
|
||||
{.name = "simple_1_auto_12", .nodes = 1,
|
||||
.initial_states = {}, .updates = {entries{12}}},
|
||||
|
||||
Reference in New Issue
Block a user