view: Add view_update_builder class

This patch adds the view_update_builder class, which is responsible
for calculating the mutations to apply to a column family's
materialized views, given a streamed_mutation representing an update
to the base table and a streamed_mutation representing the
pre-existing rows which the update covers.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2016-12-29 11:34:00 +00:00
parent 2ab9ba995a
commit d5a61a8c48
2 changed files with 194 additions and 0 deletions

View File

@@ -422,6 +422,193 @@ void view_updates::generate_update(
}
}
class view_update_builder {
schema_ptr _schema; // The base schema
std::vector<view_updates> _view_updates;
streamed_mutation _updates;
streamed_mutation _existings;
range_tombstone_accumulator _update_tombstone_tracker;
range_tombstone_accumulator _existing_tombstone_tracker;
mutation_fragment_opt _update;
mutation_fragment_opt _existing;
gc_clock::time_point _now;
public:
view_update_builder(schema_ptr s,
std::vector<view_updates>&& views_to_update,
streamed_mutation&& updates,
streamed_mutation&& existings)
: _schema(std::move(s))
, _view_updates(std::move(views_to_update))
, _updates(std::move(updates))
, _existings(std::move(existings))
, _update_tombstone_tracker(*_schema, false)
, _existing_tombstone_tracker(*_schema, false)
, _now(gc_clock::now()) {
_update_tombstone_tracker.set_partition_tombstone(_updates.partition_tombstone());
_existing_tombstone_tracker.set_partition_tombstone(_existings.partition_tombstone());
}
future<std::vector<mutation>> build();
private:
void generate_update(clustering_row&& update, stdx::optional<clustering_row>&& existing);
future<stop_iteration> on_results();
future<stop_iteration> advance_all() {
return when_all(_updates(), _existings()).then([this] (auto&& fragments) mutable {
_update = std::move(std::get<mutation_fragment_opt>(std::get<0>(fragments).get()));
_existing = std::move(std::get<mutation_fragment_opt>(std::get<1>(fragments).get()));
return stop_iteration::no;
});
}
future<stop_iteration> advance_updates() {
return _updates().then([this] (auto&& update) mutable {
_update = std::move(update);
return stop_iteration::no;
});
}
future<stop_iteration> advance_existings() {
return _existings().then([this] (auto&& existing) mutable {
_existing = std::move(existing);
return stop_iteration::no;
});
}
future<stop_iteration> stop() const {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
};
future<std::vector<mutation>> view_update_builder::build() {
return advance_all().then([this] (auto&& ignored) {
return repeat([this] {
return this->on_results();
});
}).then([this] {
std::vector<mutation> mutations;
for (auto&& update : _view_updates) {
std::move(update).move_to(mutations);
}
return mutations;
});
}
void view_update_builder::generate_update(clustering_row&& update, stdx::optional<clustering_row>&& existing) {
// If we have no update at all, we shouldn't get there.
if (update.empty()) {
throw std::logic_error("Empty materialized view updated");
}
auto gc_before = _now - _schema->gc_grace_seconds();
// We allow existing to be disengaged, which we treat the same as an empty row.
if (existing) {
existing->marker().compact_and_expire(tombstone(), _now, always_gc, gc_before);
existing->cells().compact_and_expire(*_schema, column_kind::regular_column, tombstone(), _now, always_gc, gc_before);
update.apply(*_schema, *existing);
}
update.marker().compact_and_expire(tombstone(), _now, always_gc, gc_before);
update.cells().compact_and_expire(*_schema, column_kind::regular_column, tombstone(), _now, always_gc, gc_before);
for (auto&& v : _view_updates) {
v.generate_update(_updates.key(), update, existing, _now);
}
}
static void apply_tracked_tombstones(range_tombstone_accumulator& tracker, clustering_row& row) {
for (auto&& rt : tracker.range_tombstones_for_row(row.key())) {
row.apply(rt.tomb);
}
}
future<stop_iteration> view_update_builder::on_results() {
if (_update && _existing) {
int cmp = position_in_partition::tri_compare(*_schema)(_update->position(), _existing->position());
if (cmp < 0) {
// We have an update where there was nothing before
if (_update->is_range_tombstone()) {
_update_tombstone_tracker.apply(std::move(_update->as_range_tombstone()));
} else {
auto& update = _update->as_clustering_row();
apply_tracked_tombstones(_update_tombstone_tracker, update);
auto tombstone = _existing_tombstone_tracker.current_tombstone();
auto existing = tombstone
? stdx::optional<clustering_row>(stdx::in_place, update.key(), std::move(tombstone), row_marker(), ::row())
: stdx::nullopt;
generate_update(std::move(update), std::move(existing));
}
return advance_updates();
}
if (cmp > 0) {
// We have something existing but no update (which will happen either because it's a range tombstone marker in
// existing, or because we've fetched the existing row due to some partition/range deletion in the updates)
if (_existing->is_range_tombstone()) {
_existing_tombstone_tracker.apply(std::move(_existing->as_range_tombstone()));
} else {
auto& existing = _existing->as_clustering_row();
apply_tracked_tombstones(_existing_tombstone_tracker, existing);
auto tombstone = _update_tombstone_tracker.current_tombstone();
// The way we build the read command used for existing rows, we should always have a non-empty
// tombstone, since we wouldn't have read the existing row otherwise. We don't assert that in case the
// read method ever changes.
if (tombstone) {
auto update = clustering_row(existing.key(), std::move(tombstone), row_marker(), ::row());
generate_update(std::move(update), { std::move(existing) });
}
}
return advance_existings();
}
// We're updating a row that had pre-existing data
if (_update->is_range_tombstone()) {
assert(_existing->is_range_tombstone());
_existing_tombstone_tracker.apply(std::move(_existing->as_range_tombstone()));
_update_tombstone_tracker.apply(std::move(_update->as_range_tombstone()));
} else {
assert(!_existing->is_range_tombstone());
apply_tracked_tombstones(_update_tombstone_tracker, _update->as_clustering_row());
apply_tracked_tombstones(_existing_tombstone_tracker, _existing->as_clustering_row());
generate_update(std::move(_update->as_clustering_row()), { std::move(_existing->as_clustering_row()) });
}
return advance_all();
}
auto tombstone = _update_tombstone_tracker.current_tombstone();
if (tombstone && _existing) {
// We don't care if it's a range tombstone, as we're only looking for existing entries that get deleted
if (!_existing->is_range_tombstone()) {
auto& existing = _existing->as_clustering_row();
auto update = clustering_row(existing.key(), std::move(tombstone), row_marker(), ::row());
generate_update(std::move(update), { std::move(existing) });
}
return advance_existings();
}
// If we have updates and it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it
if (_update && !_update->is_range_tombstone()) {
generate_update(std::move(_update->as_clustering_row()), { });
return advance_updates();
}
return stop();
}
future<std::vector<mutation>> generate_view_updates(
const schema_ptr& base,
std::vector<lw_shared_ptr<view>>&& views_to_update,
streamed_mutation&& updates,
streamed_mutation&& existings) {
auto vs = boost::copy_range<std::vector<view_updates>>(views_to_update | boost::adaptors::transformed([&] (auto&& v) {
return view_updates(std::move(v), base);
}));
auto builder = std::make_unique<view_update_builder>(base, std::move(vs), std::move(updates), std::move(existings));
auto f = builder->build();
return f.finally([builder = std::move(builder)] { });
}
} // namespace view
} // namespace db

View File

@@ -25,6 +25,7 @@
#include "gc_clock.hh"
#include "query-request.hh"
#include "schema.hh"
#include "streamed_mutation.hh"
#include "stdx.hh"
namespace cql3 {
@@ -114,6 +115,12 @@ private:
void set_base_non_pk_column_in_view_pk(const ::schema& base);
};
future<std::vector<mutation>> generate_view_updates(
const schema_ptr& base,
std::vector<lw_shared_ptr<view>>&& views_to_update,
streamed_mutation&& updates,
streamed_mutation&& existings);
}
}