db/view: migrate view_update_builder to v2
To avoid noise, the interface is left as v1 and inbound readers are converted in the constructor.
This commit is contained in:
@@ -875,7 +875,7 @@ future<> view_update_builder::close() noexcept {
|
||||
}
|
||||
|
||||
future<stop_iteration> view_update_builder::advance_all() {
|
||||
auto existings_f = _existings ? (*_existings)() : make_ready_future<optimized_optional<mutation_fragment>>();
|
||||
auto existings_f = _existings ? (*_existings)() : make_ready_future<mutation_fragment_v2_opt>();
|
||||
return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable {
|
||||
_update = std::move(std::get<0>(fragments).get0());
|
||||
_existing = std::move(std::get<1>(fragments).get0());
|
||||
@@ -910,11 +910,11 @@ future<utils::chunked_vector<frozen_mutation_and_schema>> view_update_builder::b
|
||||
bool do_advance_existings = false;
|
||||
if (_update && _update->is_partition_start()) {
|
||||
_key = std::move(std::move(_update)->as_partition_start().key().key());
|
||||
_update_tombstone_tracker.set_partition_tombstone(_update->as_partition_start().partition_tombstone());
|
||||
_update_partition_tombstone = _update->as_partition_start().partition_tombstone();
|
||||
do_advance_updates = true;
|
||||
}
|
||||
if (_existing && _existing->is_partition_start()) {
|
||||
_existing_tombstone_tracker.set_partition_tombstone(_existing->as_partition_start().partition_tombstone());
|
||||
_existing_partition_tombstone = _existing->as_partition_start().partition_tombstone();
|
||||
do_advance_existings = true;
|
||||
}
|
||||
if (do_advance_updates) {
|
||||
@@ -960,10 +960,6 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional
|
||||
}
|
||||
}
|
||||
|
||||
static void apply_tracked_tombstones(range_tombstone_accumulator& tracker, clustering_row& row) {
|
||||
row.apply(tracker.tombstone_for_row(row.key()));
|
||||
}
|
||||
|
||||
future<stop_iteration> view_update_builder::on_results() {
|
||||
constexpr size_t max_rows_for_view_updates = 100;
|
||||
size_t rows_for_view_updates = std::accumulate(_view_updates.begin(), _view_updates.end(), 0, [] (size_t acc, const view_updates& vu) {
|
||||
@@ -975,12 +971,12 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
auto cmp = position_in_partition::tri_compare(*_schema)(_update->position(), _existing->position());
|
||||
if (cmp < 0) {
|
||||
// We have an update where there was nothing before
|
||||
if (_update->is_range_tombstone()) {
|
||||
_update_tombstone_tracker.apply(std::move(_update->as_range_tombstone()));
|
||||
if (_update->is_range_tombstone_change()) {
|
||||
_update_current_tombstone = _update->as_range_tombstone_change().tombstone();
|
||||
} else if (_update->is_clustering_row()) {
|
||||
auto update = std::move(*_update).as_clustering_row();
|
||||
apply_tracked_tombstones(_update_tombstone_tracker, update);
|
||||
auto tombstone = _existing_tombstone_tracker.current_tombstone();
|
||||
update.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
|
||||
auto tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone);
|
||||
auto existing = tombstone
|
||||
? std::optional<clustering_row>(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row())
|
||||
: std::nullopt;
|
||||
@@ -991,12 +987,12 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
if (cmp > 0) {
|
||||
// We have something existing but no update (which will happen either because it's a range tombstone marker in
|
||||
// existing, or because we've fetched the existing row due to some partition/range deletion in the updates)
|
||||
if (_existing->is_range_tombstone()) {
|
||||
_existing_tombstone_tracker.apply(std::move(_existing->as_range_tombstone()));
|
||||
if (_existing->is_range_tombstone_change()) {
|
||||
_existing_current_tombstone = _existing->as_range_tombstone_change().tombstone();
|
||||
} else if (_existing->is_clustering_row()) {
|
||||
auto existing = std::move(*_existing).as_clustering_row();
|
||||
apply_tracked_tombstones(_existing_tombstone_tracker, existing);
|
||||
auto tombstone = _update_tombstone_tracker.current_tombstone();
|
||||
existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
|
||||
auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone);
|
||||
// The way we build the read command used for existing rows, we should always have a non-empty
|
||||
// tombstone, since we wouldn't have read the existing row otherwise. We don't assert that in case the
|
||||
// read method ever changes.
|
||||
@@ -1008,24 +1004,24 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
return stop_updates ? stop () : advance_existings();
|
||||
}
|
||||
// We're updating a row that had pre-existing data
|
||||
if (_update->is_range_tombstone()) {
|
||||
assert(_existing->is_range_tombstone());
|
||||
_existing_tombstone_tracker.apply(std::move(*_existing).as_range_tombstone());
|
||||
_update_tombstone_tracker.apply(std::move(*_update).as_range_tombstone());
|
||||
if (_update->is_range_tombstone_change()) {
|
||||
assert(_existing->is_range_tombstone_change());
|
||||
_existing_current_tombstone = std::move(*_existing).as_range_tombstone_change().tombstone();
|
||||
_update_current_tombstone = std::move(*_update).as_range_tombstone_change().tombstone();
|
||||
} else if (_update->is_clustering_row()) {
|
||||
assert(_existing->is_clustering_row());
|
||||
_update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
|
||||
apply_tracked_tombstones(_update_tombstone_tracker, cr);
|
||||
cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
|
||||
});
|
||||
_existing->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
|
||||
apply_tracked_tombstones(_existing_tombstone_tracker, cr);
|
||||
cr.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
|
||||
});
|
||||
generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() });
|
||||
}
|
||||
return stop_updates ? stop() : advance_all();
|
||||
}
|
||||
|
||||
auto tombstone = _update_tombstone_tracker.current_tombstone();
|
||||
auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone);
|
||||
if (tombstone && _existing && !_existing->is_end_of_partition()) {
|
||||
// We don't care if it's a range tombstone, as we're only looking for existing entries that get deleted
|
||||
if (_existing->is_clustering_row()) {
|
||||
@@ -1040,9 +1036,9 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
if (_update && !_update->is_end_of_partition()) {
|
||||
if (_update->is_clustering_row()) {
|
||||
_update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
|
||||
apply_tracked_tombstones(_update_tombstone_tracker, cr);
|
||||
cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
|
||||
});
|
||||
auto existing_tombstone = _existing_tombstone_tracker.current_tombstone();
|
||||
auto existing_tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone);
|
||||
auto existing = existing_tombstone
|
||||
? std::optional<clustering_row>(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())
|
||||
: std::nullopt;
|
||||
|
||||
@@ -13,6 +13,8 @@
|
||||
#include "query-request.hh"
|
||||
#include "schema_fwd.hh"
|
||||
#include "readers/flat_mutation_reader.hh"
|
||||
#include "readers/flat_mutation_reader_v2.hh"
|
||||
#include "readers/conversion.hh"
|
||||
#include "frozen_mutation.hh"
|
||||
|
||||
class frozen_mutation_and_schema;
|
||||
@@ -163,12 +165,14 @@ private:
|
||||
class view_update_builder {
|
||||
schema_ptr _schema; // The base schema
|
||||
std::vector<view_updates> _view_updates;
|
||||
flat_mutation_reader _updates;
|
||||
flat_mutation_reader_opt _existings;
|
||||
range_tombstone_accumulator _update_tombstone_tracker;
|
||||
range_tombstone_accumulator _existing_tombstone_tracker;
|
||||
mutation_fragment_opt _update;
|
||||
mutation_fragment_opt _existing;
|
||||
flat_mutation_reader_v2 _updates;
|
||||
flat_mutation_reader_v2_opt _existings;
|
||||
tombstone _update_partition_tombstone;
|
||||
tombstone _update_current_tombstone;
|
||||
tombstone _existing_partition_tombstone;
|
||||
tombstone _existing_current_tombstone;
|
||||
mutation_fragment_v2_opt _update;
|
||||
mutation_fragment_v2_opt _existing;
|
||||
gc_clock::time_point _now;
|
||||
partition_key _key = partition_key::make_empty();
|
||||
public:
|
||||
@@ -180,10 +184,8 @@ public:
|
||||
gc_clock::time_point now)
|
||||
: _schema(std::move(s))
|
||||
, _view_updates(std::move(views_to_update))
|
||||
, _updates(std::move(updates))
|
||||
, _existings(std::move(existings))
|
||||
, _update_tombstone_tracker(*_schema)
|
||||
, _existing_tombstone_tracker(*_schema)
|
||||
, _updates(upgrade_to_v2(std::move(updates)))
|
||||
, _existings(existings ? upgrade_to_v2(std::move(*existings)) : flat_mutation_reader_v2_opt{})
|
||||
, _now(now) {
|
||||
}
|
||||
view_update_builder(view_update_builder&& other) noexcept = default;
|
||||
|
||||
Reference in New Issue
Block a user