mutation_reader: make readers return streamed_mutations

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
Paweł Dziepak
2016-05-25 00:21:04 +01:00
parent 52a0b405f8
commit 737eb73499
19 changed files with 206 additions and 112 deletions

View File

@@ -163,8 +163,8 @@ logalloc::occupancy_stats column_family::occupancy() const {
}
static
bool belongs_to_current_shard(const mutation& m) {
return dht::shard_of(m.token()) == engine().cpu_id();
bool belongs_to_current_shard(const streamed_mutation& m) {
return dht::shard_of(m.decorated_key().token()) == engine().cpu_id();
}
class range_sstable_reader final : public mutation_reader::impl {
@@ -201,7 +201,7 @@ public:
range_sstable_reader(range_sstable_reader&&) = delete; // reader takes reference to member fields
virtual future<mutation_opt> operator()() override {
virtual future<streamed_mutation_opt> operator()() override {
return _reader();
}
};
@@ -229,9 +229,9 @@ public:
, _ck_filtering(ck_filtering)
{ }
virtual future<mutation_opt> operator()() override {
virtual future<streamed_mutation_opt> operator()() override {
if (_done) {
return make_ready_future<mutation_opt>();
return make_ready_future<streamed_mutation_opt>();
}
return parallel_for_each(*_sstables | boost::adaptors::map_values,
[this](const lw_shared_ptr<sstables::sstable>& sstable) {
@@ -241,7 +241,10 @@ public:
});
}).then([this] {
_done = true;
return std::move(_m);
if (!_m) {
return streamed_mutation_opt();
}
return streamed_mutation_opt(streamed_mutation_from_mutation(std::move(*_m)));
});
}
};
@@ -286,7 +289,9 @@ future<column_family::const_mutation_partition_ptr>
column_family::find_partition(schema_ptr s, const dht::decorated_key& key) const {
return do_with(query::partition_range::make_singular(key), [s = std::move(s), this] (auto& range) {
return do_with(this->make_reader(s, range), [] (mutation_reader& reader) {
return reader().then([] (mutation_opt&& mo) -> std::unique_ptr<const mutation_partition> {
return reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([] (mutation_opt&& mo) -> std::unique_ptr<const mutation_partition> {
if (!mo) {
return {};
}
@@ -385,7 +390,9 @@ column_family::for_all_partitions(schema_ptr s, Func&& func) const {
return do_with(iteration_state(std::move(s), *this, std::move(func)), [] (iteration_state& is) {
return do_until([&is] { return is.done(); }, [&is] {
return is.reader().then([&is](mutation_opt&& mo) {
return is.reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([&is](mutation_opt&& mo) {
if (!mo) {
is.empty = true;
} else {

View File

@@ -122,7 +122,7 @@ class key_from_mutation_reader final : public key_reader::impl {
public:
key_from_mutation_reader(mutation_reader&& reader) : _reader(std::move(reader)) { }
virtual future<dht::decorated_key_opt> operator()() override {
return _reader().then([] (mutation_opt&& mo) {
return _reader().then([] (streamed_mutation_opt&& mo) {
if (mo) {
return dht::decorated_key_opt(std::move(mo->decorated_key()));
} else {

View File

@@ -161,7 +161,7 @@ public:
, _ck_filtering(ck_filtering)
{ }
virtual future<mutation_opt> operator()() override {
virtual future<streamed_mutation_opt> operator()() override {
if (_delegate_range) {
return _delegate();
}
@@ -181,13 +181,13 @@ public:
managed_bytes::linearization_context_guard lcg;
update_iterators();
if (_i == _end) {
return make_ready_future<mutation_opt>(stdx::nullopt);
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
}
partition_entry& e = *_i;
++_i;
_last = e.key();
_memtable->upgrade_entry(e);
return make_ready_future<mutation_opt>(e.read(_schema, _ck_filtering));
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(e.read(_schema, _ck_filtering)));
}
};

View File

@@ -28,11 +28,18 @@
namespace stdx = std::experimental;
template<typename T>
T move_and_clear(T& obj) {
T x = std::move(obj);
obj = T();
return x;
}
// Combines multiple mutation_readers into one.
class combined_reader final : public mutation_reader::impl {
std::vector<mutation_reader> _readers;
struct mutation_and_reader {
mutation m;
streamed_mutation m;
mutation_reader* read;
};
std::vector<mutation_and_reader> _ptables;
@@ -42,7 +49,7 @@ class combined_reader final : public mutation_reader::impl {
// order of comparison is inverted, because heaps produce greatest value first
return b.m.decorated_key().less_compare(*s, a.m.decorated_key());
}
mutation_opt _current;
std::vector<streamed_mutation> _current;
bool _inited = false;
private:
// Produces next mutation or disengaged optional if there are no more.
@@ -50,23 +57,26 @@ private:
// Entry conditions:
// - either _ptables is empty or_ptables.back() is the next item to be consumed.
// - the _ptables heap is in invalid state (if not empty), waiting for pop_back or push_heap.
future<mutation_opt> next() {
future<streamed_mutation_opt> next() {
if (_ptables.empty()) {
return make_ready_future<mutation_opt>(move_and_disengage(_current));
if (_current.empty()) {
return make_ready_future<streamed_mutation_opt>();
}
return make_ready_future<streamed_mutation_opt>(merge_mutations(move_and_clear(_current)));
};
auto& candidate = _ptables.back();
mutation& m = candidate.m;
streamed_mutation& m = candidate.m;
if (_current && !_current->decorated_key().equal(*m.schema(), m.decorated_key())) {
if (!_current.empty() && !_current.back().decorated_key().equal(*m.schema(), m.decorated_key())) {
// key has changed, so emit accumulated mutation
return make_ready_future<mutation_opt>(move_and_disengage(_current));
return make_ready_future<streamed_mutation_opt>(merge_mutations(move_and_clear(_current)));
}
apply(_current, std::move(m));
_current.emplace_back(std::move(m));
return (*candidate.read)().then([this] (mutation_opt&& more) {
return (*candidate.read)().then([this] (streamed_mutation_opt&& more) {
// Restore heap to valid state
if (!more) {
_ptables.pop_back();
@@ -84,10 +94,10 @@ public:
: _readers(std::move(readers))
{ }
virtual future<mutation_opt> operator()() override {
virtual future<streamed_mutation_opt> operator()() override {
if (!_inited) {
return parallel_for_each(_readers, [this] (mutation_reader& reader) {
return reader().then([this, &reader](mutation_opt&& m) {
return reader().then([this, &reader](streamed_mutation_opt&& m) {
if (m) {
_ptables.push_back({std::move(*m), &reader});
}
@@ -119,50 +129,63 @@ make_combined_reader(mutation_reader&& a, mutation_reader&& b) {
}
class reader_returning final : public mutation_reader::impl {
mutation _m;
streamed_mutation _m;
bool _done = false;
public:
reader_returning(mutation m) : _m(std::move(m)) {
reader_returning(streamed_mutation m) : _m(std::move(m)) {
}
virtual future<mutation_opt> operator()() override {
virtual future<streamed_mutation_opt> operator()() override {
if (_done) {
return make_ready_future<mutation_opt>();
return make_ready_future<streamed_mutation_opt>();
} else {
_done = true;
return make_ready_future<mutation_opt>(std::move(_m));
return make_ready_future<streamed_mutation_opt>(std::move(_m));
}
}
};
mutation_reader make_reader_returning(mutation m) {
return make_mutation_reader<reader_returning>(streamed_mutation_from_mutation(std::move(m)));
}
mutation_reader make_reader_returning(streamed_mutation m) {
return make_mutation_reader<reader_returning>(std::move(m));
}
class reader_returning_many final : public mutation_reader::impl {
std::vector<mutation> _m;
std::vector<streamed_mutation> _m;
bool _done = false;
public:
reader_returning_many(std::vector<mutation> m) : _m(std::move(m)) {
reader_returning_many(std::vector<streamed_mutation> m) : _m(std::move(m)) {
boost::range::reverse(_m);
}
virtual future<mutation_opt> operator()() override {
virtual future<streamed_mutation_opt> operator()() override {
if (_m.empty()) {
return make_ready_future<mutation_opt>();
return make_ready_future<streamed_mutation_opt>();
}
auto m = std::move(_m.back());
_m.pop_back();
return make_ready_future<mutation_opt>(std::move(m));
return make_ready_future<streamed_mutation_opt>(std::move(m));
}
};
mutation_reader make_reader_returning_many(std::vector<mutation> mutations) {
std::vector<streamed_mutation> streamed_mutations;
streamed_mutations.reserve(mutations.size());
for (auto& m : mutations) {
streamed_mutations.emplace_back(streamed_mutation_from_mutation(std::move(m)));
}
return make_mutation_reader<reader_returning_many>(std::move(streamed_mutations));
}
mutation_reader make_reader_returning_many(std::vector<streamed_mutation> mutations) {
return make_mutation_reader<reader_returning_many>(std::move(mutations));
}
class empty_reader final : public mutation_reader::impl {
public:
virtual future<mutation_opt> operator()() override {
return make_ready_future<mutation_opt>();
virtual future<streamed_mutation_opt> operator()() override {
return make_ready_future<streamed_mutation_opt>();
}
};

View File

@@ -44,12 +44,12 @@ public:
class impl {
public:
virtual ~impl() {}
virtual future<mutation_opt> operator()() = 0;
virtual future<streamed_mutation_opt> operator()() = 0;
};
private:
class null_impl final : public impl {
public:
virtual future<mutation_opt> operator()() override { throw std::bad_function_call(); }
virtual future<streamed_mutation_opt> operator()() override { throw std::bad_function_call(); }
};
private:
std::unique_ptr<impl> _impl;
@@ -60,7 +60,7 @@ public:
mutation_reader(const mutation_reader&) = delete;
mutation_reader& operator=(mutation_reader&&) = default;
mutation_reader& operator=(const mutation_reader&) = delete;
future<mutation_opt> operator()() { return _impl->operator()(); }
future<streamed_mutation_opt> operator()() { return _impl->operator()(); }
};
// Impl: derived from mutation_reader::impl; Args/args: arguments for Impl's constructor
@@ -78,22 +78,32 @@ mutation_reader make_combined_reader(std::vector<mutation_reader>);
mutation_reader make_combined_reader(mutation_reader&& a, mutation_reader&& b);
// reads from the input readers, in order
mutation_reader make_reader_returning(mutation);
mutation_reader make_reader_returning(streamed_mutation);
mutation_reader make_reader_returning_many(std::vector<mutation>);
mutation_reader make_reader_returning_many(std::vector<streamed_mutation>);
mutation_reader make_empty_reader();
/*
template<typename T>
concept bool StreamedMutationFilter() {
return requires(T t, const streamed_mutation& sm) {
{ t(sm) } -> bool;
};
}
*/
template <typename MutationFilter>
class filtering_reader : public mutation_reader::impl {
mutation_reader _rd;
MutationFilter _filter;
mutation_opt _current;
static_assert(std::is_same<bool, std::result_of_t<MutationFilter(const mutation&)>>::value, "bad MutationFilter signature");
streamed_mutation_opt _current;
static_assert(std::is_same<bool, std::result_of_t<MutationFilter(const streamed_mutation&)>>::value, "bad MutationFilter signature");
public:
filtering_reader(mutation_reader rd, MutationFilter&& filter)
: _rd(std::move(rd)), _filter(std::forward<MutationFilter>(filter)) {
}
virtual future<mutation_opt> operator()() override {\
virtual future<streamed_mutation_opt> operator()() override {\
return repeat([this] {
return _rd().then([this] (mutation_opt&& mo) mutable {
return _rd().then([this] (streamed_mutation_opt&& mo) mutable {
if (!mo) {
_current = std::move(mo);
return stop_iteration::yes;
@@ -106,7 +116,7 @@ public:
}
});
}).then([this] {
return make_ready_future<mutation_opt>(std::move(_current));
return make_ready_future<streamed_mutation_opt>(std::move(_current));
});
};
};
@@ -133,7 +143,9 @@ future<> consume(mutation_reader& reader, Consumer consumer) {
return do_with(std::move(consumer), [&reader] (Consumer& c) -> future<> {
return repeat([&reader, &c] () {
return reader().then([&c] (mutation_opt&& mo) -> future<stop_iteration> {
return reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([&c] (mutation_opt&& mo) -> future<stop_iteration> {
if (!mo) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}

View File

@@ -334,7 +334,9 @@ static future<partition_checksum> checksum_range_shard(database &db,
return do_with(std::move(reader), partition_checksum(),
[] (auto& reader, auto& checksum) {
return repeat([&reader, &checksum] () {
return reader().then([&checksum] (auto mopt) {
return reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([&checksum] (auto mopt) {
if (mopt) {
checksum.add(partition_checksum(*mopt));
return stop_iteration::no;

View File

@@ -189,13 +189,16 @@ public:
, _delegate(std::move(delegate))
{ }
virtual future<mutation_opt> operator()() override {
return _delegate().then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) {
virtual future<streamed_mutation_opt> operator()() override {
return _delegate().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) -> streamed_mutation_opt {
if (mo) {
_cache.populate(*mo);
mo->upgrade(_schema);
return streamed_mutation_from_mutation(std::move(*mo));
}
return std::move(mo);
return { };
});
}
};
@@ -258,18 +261,18 @@ public:
just_cache_scanning_reader(schema_ptr s, row_cache& cache, const query::partition_range& range)
: _schema(std::move(s)), _cache(cache), _range(range)
{ }
virtual future<mutation_opt> operator()() override {
virtual future<streamed_mutation_opt> operator()() override {
return _cache._read_section(_cache._tracker.region(), [this] {
return with_linearized_managed_bytes([&] {
update_iterators();
if (_it == _end) {
return make_ready_future<mutation_opt>();
return make_ready_future<streamed_mutation_opt>();
}
auto& ce = *_it;
++_it;
_last = ce.key();
_cache.upgrade_entry(ce);
return make_ready_future<mutation_opt>(ce.read(_schema));
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(ce.read(_schema)));
});
});
}
@@ -280,7 +283,7 @@ class scanning_and_populating_reader final : public mutation_reader::impl {
schema_ptr _schema;
mutation_reader _primary;
bool _secondary_only = false;
mutation_opt _next_primary;
streamed_mutation_opt _next_primary;
mutation_source& _underlying;
mutation_reader _secondary;
utils::phased_barrier::phase_type _secondary_phase;
@@ -302,7 +305,7 @@ public:
_keys(_underlying_keys(range, pc)),
_pc(pc)
{ }
virtual future<mutation_opt> operator()() override {
virtual future<streamed_mutation_opt> operator()() override {
// FIXME: store in cache information whether the immediate successor
// of the current entry is present. As long as it is consulting
// index_reader is not necessary.
@@ -310,9 +313,9 @@ public:
return next_secondary();
}
return next_key().then([this] (dht::decorated_key_opt dk) mutable {
return _primary().then([this, dk = std::move(dk)] (mutation_opt&& mo) {
return _primary().then([this, dk = std::move(dk)] (streamed_mutation_opt&& mo) {
if (!mo && !dk) {
return make_ready_future<mutation_opt>();
return make_ready_future<streamed_mutation_opt>();
}
if (mo) {
auto cmp = dk ? dk->tri_compare(*_schema, mo->decorated_key()) : 0;
@@ -321,7 +324,7 @@ public:
_next_key = std::move(dk);
}
_cache.on_hit();
return make_ready_future<mutation_opt>(std::move(mo));
return make_ready_future<streamed_mutation_opt>(std::move(mo));
}
}
_next_primary = std::move(mo);
@@ -342,7 +345,7 @@ public:
});
}
private:
future<mutation_opt> next_secondary() {
future<streamed_mutation_opt> next_secondary() {
if (_secondary_phase != _cache._populate_phaser.phase()) {
assert(_last_secondary_key);
auto cmp = dht::ring_position_comparator(*_schema);
@@ -350,7 +353,9 @@ private:
_secondary_phase = _cache._populate_phaser.phase();
_secondary = _underlying(_cache._schema, _range, query::no_clustering_key_filtering, _pc);
}
return _secondary().then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) {
return _secondary().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) -> streamed_mutation_opt {
if (!mo && _next_primary) {
auto cmp = dht::ring_position_comparator(*_schema);
_range = _original_range.split_after(_next_primary->decorated_key(), cmp);
@@ -359,13 +364,14 @@ private:
_cache.on_hit();
return std::move(_next_primary);
}
_cache.on_miss();
if (mo) {
_cache.populate(*mo);
mo->upgrade(_schema);
_last_secondary_key = mo->decorated_key();
return streamed_mutation_from_mutation(std::move(*mo));
}
_cache.on_miss();
return std::move(mo);
return { };
});
}
future<dht::decorated_key_opt> next_key() {
@@ -392,9 +398,9 @@ private:
mutation_reader _underlying;
query::clustering_key_filtering_context _ck_filtering;
future<mutation_opt> filter(mutation_opt&& mut) {
future<streamed_mutation_opt> filter(mutation_opt&& mut) {
if (!mut) {
return make_ready_future<mutation_opt>();
return make_ready_future<streamed_mutation_opt>();
}
const query::clustering_row_ranges& ck_ranges = _ck_filtering.get_ranges(mut->key());
@@ -402,7 +408,7 @@ private:
if (!filtered_partition.empty()) {
mut->partition() = std::move(filtered_partition);
return make_ready_future<mutation_opt>(std::move(mut));
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(std::move(*mut)));
}
return operator()();
@@ -412,8 +418,10 @@ public:
slicing_reader(mutation_reader&& reader, query::clustering_key_filtering_context ck_filtering)
: _underlying(std::move(reader)), _ck_filtering(std::move(ck_filtering)) {}
virtual future<mutation_opt> operator()() override {
return _underlying().then([this] (mutation_opt&& mut) { return filter(std::move(mut)); });
virtual future<streamed_mutation_opt> operator()() override {
return _underlying().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([this] (mutation_opt&& mut) { return filter(std::move(mut)); });
}
};

View File

@@ -35,7 +35,12 @@ public:
: _sst(sst)
, _smr(sst->read_range_rows(std::move(s), pr, ck_filtering, pc)) {
}
virtual future<mutation_opt> operator()() override {
return _smr.read();
virtual future<streamed_mutation_opt> operator()() override {
return _smr.read().then([] (auto mopt) -> streamed_mutation_opt {
if (!mopt) {
return { };
}
return streamed_mutation_from_mutation(std::move(*mopt));
});
}
};

View File

@@ -73,11 +73,16 @@ public:
: _sst(std::move(sst))
, _reader(_sst->read_rows(schema, service::get_local_compaction_priority()))
{}
virtual future<mutation_opt> operator()() override {
virtual future<streamed_mutation_opt> operator()() override {
return _reader.read().handle_exception([sst = _sst] (auto ep) {
logger.error("Compaction found an exception when reading sstable {} : {}",
sst->get_filename(), ep);
return make_exception_future<mutation_opt>(ep);
}).then([] (auto mo) -> streamed_mutation_opt {
if (!mo) {
return { };
}
return streamed_mutation_from_mutation(std::move(*mo));
});
}
};
@@ -201,10 +206,12 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
, _cleanup(cleanup)
{ }
virtual future<mutation_opt> operator()() override {
return _reader().then([this] (mutation_opt m) {
virtual future<streamed_mutation_opt> operator()() override {
return _reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([this] (mutation_opt m) {
if (!bool(m)) {
return make_ready_future<mutation_opt>(std::move(m));
return make_ready_future<streamed_mutation_opt>();
}
// Filter out mutation that doesn't belong to current shard.
if (dht::shard_of(m->token()) != engine().cpu_id()) {
@@ -216,7 +223,7 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
auto max_purgeable = get_max_purgeable_timestamp(_schema, _not_compacted_sstables, m->decorated_key());
m->partition().compact_for_compaction(*_schema, max_purgeable, _now);
if (!m->partition().empty()) {
return make_ready_future<mutation_opt>(std::move(m));
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(std::move(*m)));
}
return operator()();
});
@@ -249,7 +256,9 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
// Compaction manager will catch this exception and re-schedule the compaction.
throw compaction_stop_exception(info->ks, info->cf, info->stop_requested);
}
return reader().then([output_writer, info] (auto mopt) {
return reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([output_writer, info] (auto mopt) {
if (mopt) {
info->total_keys_written++;
return output_writer->write(std::move(*mopt)).then([] {
@@ -264,9 +273,12 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
struct queue_reader final : public ::mutation_reader::impl {
lw_shared_ptr<seastar::pipe_reader<mutation>> pr;
queue_reader(lw_shared_ptr<seastar::pipe_reader<mutation>> pr) : pr(std::move(pr)) {}
virtual future<mutation_opt> operator()() override {
virtual future<streamed_mutation_opt> operator()() override {
return pr->read().then([] (std::experimental::optional<mutation> m) mutable {
return make_ready_future<mutation_opt>(std::move(m));
if (!m) {
return streamed_mutation_opt();
}
return streamed_mutation_opt(streamed_mutation_from_mutation(std::move(*m)));
});
}
};

View File

@@ -1412,7 +1412,7 @@ void sstable::do_write_components(::mutation_reader mr,
// Iterate through CQL partitions, then CQL rows, then CQL columns.
// Each mt.all_partitions() entry is a set of clustered rows sharing the same partition key.
while (get_offset() < max_sstable_size) {
mutation_opt mut = mr().get0();
mutation_opt mut = mr().then([] (auto sm) { return mutation_from_streamed_mutation(std::move(sm)); }).get0();
if (!mut) {
break;
}

View File

@@ -109,7 +109,9 @@ future<> send_mutations(auto si) {
return do_with(cf.make_reader(cf.schema(), si->pr, query::no_clustering_key_filtering, priority), [si] (auto& reader) {
return repeat([si, &reader] {
return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] {
return reader().then([si] (auto mopt) {
return reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([si] (auto mopt) {
if (mopt && si->db.column_family_exists(si->cf_id)) {
si->mutations_nr++;
auto fm = frozen_mutation(*mopt);

View File

@@ -34,7 +34,9 @@ public:
{ }
reader_assertions& produces(mutation m) {
_reader().then([this, m = std::move(m)] (mutation_opt&& mo) mutable {
_reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([this, m = std::move(m)] (mutation_opt&& mo) mutable {
BOOST_REQUIRE(bool(mo));
assert_that(*mo).is_equal_to(m);
}).get0();
@@ -42,7 +44,9 @@ public:
}
mutation_assertion next_mutation() {
return _reader().then([] (mutation_opt&& mo) mutable {
return _reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([] (mutation_opt&& mo) mutable {
BOOST_REQUIRE(bool(mo));
return mutation_assertion(std::move(*mo));
}).get0();
@@ -57,7 +61,9 @@ public:
}
reader_assertions& produces_end_of_stream() {
_reader().then([this] (mutation_opt&& mo) mutable {
_reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([this] (mutation_opt&& mo) mutable {
if (bool(mo)) {
BOOST_FAIL(sprint("Expected end of stream, got %s", *mo));
}

View File

@@ -139,7 +139,7 @@ SEASTAR_TEST_CASE(test_filtering) {
// All pass
assert_that(make_filtering_reader(make_reader_returning_many({m1, m2, m3, m4}),
[] (const mutation& m) { return true; }))
[] (const streamed_mutation& m) { return true; }))
.produces(m1)
.produces(m2)
.produces(m3)
@@ -148,47 +148,47 @@ SEASTAR_TEST_CASE(test_filtering) {
// None pass
assert_that(make_filtering_reader(make_reader_returning_many({m1, m2, m3, m4}),
[] (const mutation& m) { return false; }))
[] (const streamed_mutation& m) { return false; }))
.produces_end_of_stream();
// Trim front
assert_that(make_filtering_reader(make_reader_returning_many({m1, m2, m3, m4}),
[&] (const mutation& m) { return !m.key().equal(*s, m1.key()); }))
[&] (const streamed_mutation& m) { return !m.key().equal(*s, m1.key()); }))
.produces(m2)
.produces(m3)
.produces(m4)
.produces_end_of_stream();
assert_that(make_filtering_reader(make_reader_returning_many({m1, m2, m3, m4}),
[&] (const mutation& m) { return !m.key().equal(*s, m1.key()) && !m.key().equal(*s, m2.key()); }))
[&] (const streamed_mutation& m) { return !m.key().equal(*s, m1.key()) && !m.key().equal(*s, m2.key()); }))
.produces(m3)
.produces(m4)
.produces_end_of_stream();
// Trim back
assert_that(make_filtering_reader(make_reader_returning_many({m1, m2, m3, m4}),
[&] (const mutation& m) { return !m.key().equal(*s, m4.key()); }))
[&] (const streamed_mutation& m) { return !m.key().equal(*s, m4.key()); }))
.produces(m1)
.produces(m2)
.produces(m3)
.produces_end_of_stream();
assert_that(make_filtering_reader(make_reader_returning_many({m1, m2, m3, m4}),
[&] (const mutation& m) { return !m.key().equal(*s, m4.key()) && !m.key().equal(*s, m3.key()); }))
[&] (const streamed_mutation& m) { return !m.key().equal(*s, m4.key()) && !m.key().equal(*s, m3.key()); }))
.produces(m1)
.produces(m2)
.produces_end_of_stream();
// Trim middle
assert_that(make_filtering_reader(make_reader_returning_many({m1, m2, m3, m4}),
[&] (const mutation& m) { return !m.key().equal(*s, m3.key()); }))
[&] (const streamed_mutation& m) { return !m.key().equal(*s, m3.key()); }))
.produces(m1)
.produces(m2)
.produces(m4)
.produces_end_of_stream();
assert_that(make_filtering_reader(make_reader_returning_many({m1, m2, m3, m4}),
[&] (const mutation& m) { return !m.key().equal(*s, m2.key()) && !m.key().equal(*s, m3.key()); }))
[&] (const streamed_mutation& m) { return !m.key().equal(*s, m2.key()) && !m.key().equal(*s, m3.key()); }))
.produces(m1)
.produces(m4)
.produces_end_of_stream();

View File

@@ -64,7 +64,7 @@ static atomic_cell make_atomic_cell(bytes value) {
static mutation_partition get_partition(memtable& mt, const partition_key& key) {
auto dk = dht::global_partitioner().decorate_key(*mt.schema(), key);
auto reader = mt.make_reader(mt.schema(), query::partition_range::make_singular(dk));
auto mo = reader().get0();
auto mo = mutation_from_streamed_mutation(reader().get0()).get0();
BOOST_REQUIRE(bool(mo));
return std::move(mo->partition());
}

View File

@@ -193,7 +193,7 @@ int main(int argc, char** argv) {
for (auto&& key : keys) {
auto range = query::partition_range::make_singular(key);
auto reader = cache.make_reader(s, range);
auto mo = reader().get0();
auto mo = mutation_from_streamed_mutation(reader().get0()).get0();
assert(mo);
assert(mo->partition().live_row_count(*s) ==
row_count + 1 /* one row was already in cache before update()*/);

View File

@@ -434,7 +434,7 @@ SEASTAR_TEST_CASE(test_update_failure) {
// verify that there are no stale partitions
auto reader = cache.make_reader(s, query::partition_range::make_open_ended_both_sides());
for (int i = 0; i < partition_count; i++) {
auto mopt = reader().get0();
auto mopt = mutation_from_streamed_mutation(reader().get0()).get0();
if (!mopt) {
break;
}
@@ -498,7 +498,7 @@ private:
, _reader(std::move(r))
{}
virtual future<mutation_opt> operator()() override {
virtual future<streamed_mutation_opt> operator()() override {
return _reader().finally([this] () {
return _throttle.enter();
});

View File

@@ -1085,7 +1085,9 @@ SEASTAR_TEST_CASE(compact) {
// nadav - deleted partition
return open_sstable("tests/sstables/tests-temporary", generation).then([s] (shared_sstable sst) {
auto reader = make_lw_shared(sstable_reader(sst, s)); // reader holds sst and s alive.
return (*reader)().then([reader, s] (mutation_opt m) {
return (*reader)().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([reader, s] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("jerry")))));
BOOST_REQUIRE(!m->partition().partition_tombstone());
@@ -1097,6 +1099,8 @@ SEASTAR_TEST_CASE(compact) {
BOOST_REQUIRE(cells.cell_at(s->get_column_definition("age")->id).as_atomic_cell().value() == bytes({0,0,0,40}));
BOOST_REQUIRE(cells.cell_at(s->get_column_definition("height")->id).as_atomic_cell().value() == bytes({0,0,0,(char)170}));
return (*reader)();
}).then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([reader, s] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("tom")))));
@@ -1109,6 +1113,8 @@ SEASTAR_TEST_CASE(compact) {
BOOST_REQUIRE(cells.cell_at(s->get_column_definition("age")->id).as_atomic_cell().value() == bytes({0,0,0,20}));
BOOST_REQUIRE(cells.cell_at(s->get_column_definition("height")->id).as_atomic_cell().value() == bytes({0,0,0,(char)180}));
return (*reader)();
}).then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([reader, s] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("john")))));
@@ -1121,6 +1127,8 @@ SEASTAR_TEST_CASE(compact) {
BOOST_REQUIRE(cells.cell_at(s->get_column_definition("age")->id).as_atomic_cell().value() == bytes({0,0,0,20}));
BOOST_REQUIRE(cells.find_cell(s->get_column_definition("height")->id) == nullptr);
return (*reader)();
}).then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([reader, s] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("nadav")))));
@@ -1128,7 +1136,7 @@ SEASTAR_TEST_CASE(compact) {
auto &rows = m->partition().clustered_rows();
BOOST_REQUIRE(rows.size() == 0);
return (*reader)();
}).then([reader] (mutation_opt m) {
}).then([reader] (streamed_mutation_opt m) {
BOOST_REQUIRE(!m);
});
});
@@ -1271,7 +1279,7 @@ static future<> check_compacted_sstables(unsigned long generation, std::vector<u
return do_with(std::move(reader), [generations, s, keys] (::mutation_reader& reader) {
return do_for_each(*generations, [&reader, s, keys] (unsigned long generation) mutable {
return reader().then([generation, keys] (mutation_opt m) {
return reader().then([generation, keys] (streamed_mutation_opt m) {
BOOST_REQUIRE(m);
keys->push_back(m->key());
});
@@ -1574,7 +1582,7 @@ SEASTAR_TEST_CASE(datafile_generation_47) {
return reusable_sst("tests/sstables/tests-temporary", 47).then([s] (auto sstp) mutable {
auto reader = make_lw_shared(sstable_reader(sstp, s));
return repeat([reader] {
return (*reader)().then([] (mutation_opt m) {
return (*reader)().then([] (streamed_mutation_opt m) {
if (!m) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
@@ -2215,17 +2223,17 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
auto sst = (*sstables)[0];
BOOST_REQUIRE(sst->generation() == 1);
auto reader = make_lw_shared(sstable_reader(sst, s));
return (*reader)().then([s, reader] (mutation_opt m) {
return (*reader)().then([s, reader] (streamed_mutation_opt m) {
BOOST_REQUIRE(m);
auto beta = partition_key::from_exploded(*s, {to_bytes("beta")});
BOOST_REQUIRE(m->key().equal(*s, beta));
return (*reader)();
}).then([s, reader] (mutation_opt m) {
}).then([s, reader] (streamed_mutation_opt m) {
BOOST_REQUIRE(m);
auto alpha = partition_key::from_exploded(*s, {to_bytes("alpha")});
BOOST_REQUIRE(m->key().equal(*s, alpha));
return (*reader)();
}).then([reader] (mutation_opt m) {
}).then([reader] (streamed_mutation_opt m) {
BOOST_REQUIRE(!m);
});
}).then([s, sstables, tomb] {
@@ -2233,13 +2241,13 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
auto sst = (*sstables)[1];
BOOST_REQUIRE(sst->generation() == 2);
auto reader = make_lw_shared(sstable_reader(sst, s));
return (*reader)().then([s, reader, tomb] (mutation_opt m) {
return (*reader)().then([s, reader, tomb] (streamed_mutation_opt m) {
BOOST_REQUIRE(m);
auto alpha = partition_key::from_exploded(*s, {to_bytes("alpha")});
BOOST_REQUIRE(m->key().equal(*s, alpha));
BOOST_REQUIRE(m->partition().partition_tombstone() == tomb);
BOOST_REQUIRE(m->partition_tombstone() == tomb);
return (*reader)();
}).then([reader] (mutation_opt m) {
}).then([reader] (streamed_mutation_opt m) {
BOOST_REQUIRE(!m);
});
}).then([s, tmp, sstables] {
@@ -2253,13 +2261,13 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
return sstables::compact_sstables(*sstables, *cf, create, std::numeric_limits<uint64_t>::max(), 0).then([s, tmp, sstables, cf, cm] (auto) {
return open_sstable(tmp->path, 3).then([s] (shared_sstable sst) {
auto reader = make_lw_shared(sstable_reader(sst, s)); // reader holds sst and s alive.
return (*reader)().then([s, reader] (mutation_opt m) {
return (*reader)().then([s, reader] (streamed_mutation_opt m) {
BOOST_REQUIRE(m);
auto beta = partition_key::from_exploded(*s, {to_bytes("beta")});
BOOST_REQUIRE(m->key().equal(*s, beta));
BOOST_REQUIRE(!m->partition().partition_tombstone());
BOOST_REQUIRE(!m->partition_tombstone());
return (*reader)();
}).then([reader] (mutation_opt m) {
}).then([reader] (streamed_mutation_opt m) {
BOOST_REQUIRE(!m);
});
});
@@ -2296,7 +2304,9 @@ SEASTAR_TEST_CASE(check_multi_schema) {
auto f = sst->load();
return f.then([sst, s] {
auto reader = make_lw_shared(sstable_reader(sst, s));
return (*reader)().then([reader, s] (mutation_opt m) {
return (*reader)().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([reader, s] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, 0)));
auto& rows = m->partition().clustered_rows();
@@ -2307,7 +2317,7 @@ SEASTAR_TEST_CASE(check_multi_schema) {
BOOST_REQUIRE_EQUAL(cells.size(), 1);
BOOST_REQUIRE_EQUAL(cells.cell_at(s->get_column_definition("e")->id).as_atomic_cell().value(), int32_type->decompose(5));
return (*reader)();
}).then([reader, s] (mutation_opt m) {
}).then([reader, s] (streamed_mutation_opt m) {
BOOST_REQUIRE(!m);
});
});
@@ -2356,12 +2366,12 @@ SEASTAR_TEST_CASE(sstable_rewrite) {
auto newsst = (*new_tables)[0];
BOOST_REQUIRE(newsst->generation() == 52);
auto reader = make_lw_shared(sstable_reader(newsst, s));
return (*reader)().then([s, reader, key] (mutation_opt m) {
return (*reader)().then([s, reader, key] (streamed_mutation_opt m) {
BOOST_REQUIRE(m);
auto pkey = partition_key::from_exploded(*s, {to_bytes(key)});
BOOST_REQUIRE(m->key().equal(*s, pkey));
return (*reader)();
}).then([reader] (mutation_opt m) {
}).then([reader] (streamed_mutation_opt m) {
BOOST_REQUIRE(!m);
});
}).then([cm, cf] {});

View File

@@ -481,7 +481,9 @@ SEASTAR_TEST_CASE(broken_ranges_collection) {
auto s = peers_schema();
auto reader = make_lw_shared<::mutation_reader>(as_mutation_reader(sstp, sstp->read_rows(s)));
return repeat([s, reader] {
return (*reader)().then([s, reader] (mutation_opt mut) {
return (*reader)().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([s, reader] (mutation_opt mut) {
auto key_equal = [s, &mut] (sstring ip) {
return mut->key().equal(*s, partition_key::from_deeply_exploded(*s, { net::ipv4_address(ip) }));
};

View File

@@ -585,8 +585,13 @@ struct test_mutation_reader final : public ::mutation_reader::impl {
public:
test_mutation_reader(sstables::shared_sstable sst, sstables::mutation_reader rd)
: _sst(std::move(sst)), _rd(std::move(rd)) {}
virtual future<mutation_opt> operator()() override {
return _rd.read();
virtual future<streamed_mutation_opt> operator()() override {
return _rd.read().then([] (auto m) -> streamed_mutation_opt {
if (!m) {
return { };
}
return streamed_mutation_from_mutation(std::move(*m));
});
}
};