Migrate materalized views to flat_mutation_reader

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
Piotr Jastrzebski
2018-01-16 14:18:58 +01:00
parent b99dd17dcd
commit 4c74b8c7e7
4 changed files with 31 additions and 27 deletions

View File

@@ -4203,11 +4203,11 @@ std::vector<view_ptr> column_family::affected_views(const schema_ptr& base, cons
future<> column_family::generate_and_propagate_view_updates(const schema_ptr& base,
std::vector<view_ptr>&& views,
mutation&& m,
streamed_mutation_opt existings) const {
flat_mutation_reader_opt existings) const {
auto base_token = m.token();
return db::view::generate_view_updates(base,
std::move(views),
streamed_mutation_from_mutation(std::move(m)),
flat_mutation_reader_from_mutations({std::move(m)}),
std::move(existings)).then([base_token = std::move(base_token)] (auto&& updates) {
db::view::mutate_MV(std::move(base_token), std::move(updates));
});
@@ -4247,15 +4247,12 @@ future<> column_family::push_view_replica_updates(const schema_ptr& s, const fro
std::move(slice),
std::move(m),
[base, views = std::move(views), this] (auto& pk, auto& slice, auto& m) mutable {
auto reader = this->as_mutation_source()(
auto reader = this->as_mutation_source().make_flat_mutation_reader(
base,
pk,
slice,
service::get_local_sstable_query_read_priority());
auto f = reader();
return f.then([&m, reader = std::move(reader), base = std::move(base), views = std::move(views), this] (auto&& existing) mutable {
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(existing));
});
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader));
});
}

View File

@@ -783,7 +783,7 @@ private:
future<> generate_and_propagate_view_updates(const schema_ptr& base,
std::vector<view_ptr>&& views,
mutation&& m,
streamed_mutation_opt existings) const;
flat_mutation_reader_opt existings) const;
// One does not need to wait on this future if all we are interested in, is
// initiating the write. The writes initiated here will eventually

View File

@@ -467,19 +467,20 @@ void view_updates::generate_update(
class view_update_builder {
schema_ptr _schema; // The base schema
std::vector<view_updates> _view_updates;
streamed_mutation _updates;
streamed_mutation_opt _existings;
flat_mutation_reader _updates;
flat_mutation_reader_opt _existings;
range_tombstone_accumulator _update_tombstone_tracker;
range_tombstone_accumulator _existing_tombstone_tracker;
mutation_fragment_opt _update;
mutation_fragment_opt _existing;
gc_clock::time_point _now;
partition_key _key = partition_key::make_empty();
public:
view_update_builder(schema_ptr s,
std::vector<view_updates>&& views_to_update,
streamed_mutation&& updates,
streamed_mutation_opt&& existings)
flat_mutation_reader&& updates,
flat_mutation_reader_opt&& existings)
: _schema(std::move(s))
, _view_updates(std::move(views_to_update))
, _updates(std::move(updates))
@@ -487,10 +488,6 @@ public:
, _update_tombstone_tracker(*_schema, false)
, _existing_tombstone_tracker(*_schema, false)
, _now(gc_clock::now()) {
_update_tombstone_tracker.set_partition_tombstone(_updates.partition_tombstone());
if (_existings) {
_existing_tombstone_tracker.set_partition_tombstone(_existings->partition_tombstone());
}
}
future<std::vector<mutation>> build();
@@ -532,8 +529,17 @@ private:
future<std::vector<mutation>> view_update_builder::build() {
return advance_all().then([this] (auto&& ignored) {
return repeat([this] {
return this->on_results();
assert(_update && _update->is_partition_start());
_key = std::move(std::move(_update)->as_partition_start().key().key());
_update_tombstone_tracker.set_partition_tombstone(_update->as_partition_start().partition_tombstone());
if (_existing && _existing->is_partition_start()) {
_existing_tombstone_tracker.set_partition_tombstone(_existing->as_partition_start().partition_tombstone());
}
}).then([this] {
return advance_all().then([this] (auto&& ignored) {
return repeat([this] {
return this->on_results();
});
});
}).then([this] {
std::vector<mutation> mutations;
@@ -563,7 +569,7 @@ void view_update_builder::generate_update(clustering_row&& update, stdx::optiona
update.cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before);
for (auto&& v : _view_updates) {
v.generate_update(_updates.key(), update, existing, _now);
v.generate_update(_key, update, existing, _now);
}
}
@@ -574,7 +580,7 @@ static void apply_tracked_tombstones(range_tombstone_accumulator& tracker, clust
}
future<stop_iteration> view_update_builder::on_results() {
if (_update && _existing) {
if (_update && !_update->is_end_of_partition() && _existing && !_existing->is_end_of_partition()) {
int cmp = position_in_partition::tri_compare(*_schema)(_update->position(), _existing->position());
if (cmp < 0) {
// We have an update where there was nothing before
@@ -616,7 +622,7 @@ future<stop_iteration> view_update_builder::on_results() {
_existing_tombstone_tracker.apply(std::move(*_existing).as_range_tombstone());
_update_tombstone_tracker.apply(std::move(*_update).as_range_tombstone());
} else if (_update->is_clustering_row()) {
assert(!_existing->is_range_tombstone());
assert(_existing->is_clustering_row());
apply_tracked_tombstones(_update_tombstone_tracker, _update->as_mutable_clustering_row());
apply_tracked_tombstones(_existing_tombstone_tracker, _existing->as_mutable_clustering_row());
generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() });
@@ -625,7 +631,7 @@ future<stop_iteration> view_update_builder::on_results() {
}
auto tombstone = _update_tombstone_tracker.current_tombstone();
if (tombstone && _existing) {
if (tombstone && _existing && !_existing->is_end_of_partition()) {
// We don't care if it's a range tombstone, as we're only looking for existing entries that get deleted
if (_existing->is_clustering_row()) {
auto& existing = _existing->as_clustering_row();
@@ -636,7 +642,7 @@ future<stop_iteration> view_update_builder::on_results() {
}
// If we have updates and it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it
if (_update) {
if (_update && !_update->is_end_of_partition()) {
if (_update->is_clustering_row()) {
generate_update(std::move(*_update).as_clustering_row(), { });
}
@@ -649,8 +655,8 @@ future<stop_iteration> view_update_builder::on_results() {
future<std::vector<mutation>> generate_view_updates(
const schema_ptr& base,
std::vector<view_ptr>&& views_to_update,
streamed_mutation&& updates,
streamed_mutation_opt&& existings) {
flat_mutation_reader&& updates,
flat_mutation_reader_opt&& existings) {
auto vs = boost::copy_range<std::vector<view_updates>>(views_to_update | boost::adaptors::transformed([&] (auto&& v) {
return view_updates(std::move(v), base);
}));

View File

@@ -26,6 +26,7 @@
#include "query-request.hh"
#include "schema.hh"
#include "streamed_mutation.hh"
#include "flat_mutation_reader.hh"
#include "stdx.hh"
namespace db {
@@ -82,8 +83,8 @@ bool clustering_prefix_matches(const schema& base, const partition_key& key, con
future<std::vector<mutation>> generate_view_updates(
const schema_ptr& base,
std::vector<view_ptr>&& views_to_update,
streamed_mutation&& updates,
streamed_mutation_opt&& existings);
flat_mutation_reader&& updates,
flat_mutation_reader_opt&& existings);
query::clustering_row_ranges calculate_affected_clustering_ranges(
const schema& base,