db,view: move view_update_builder to the header
The builder is going to be used directly by the callers, which requires making its definition public. No semantic changes were intended.
This commit is contained in:
147
db/view/view.cc
147
db/view/view.cc
@@ -328,49 +328,20 @@ bool matches_view_filter(const schema& base, const view_info& view, const partit
|
||||
});
|
||||
}
|
||||
|
||||
class view_updates final {
|
||||
view_ptr _view;
|
||||
const view_info& _view_info;
|
||||
schema_ptr _base;
|
||||
base_info_ptr _base_info;
|
||||
std::unordered_map<partition_key, mutation_partition, partition_key::hashing, partition_key::equality> _updates;
|
||||
public:
|
||||
explicit view_updates(view_and_base vab)
|
||||
: _view(std::move(vab.view))
|
||||
, _view_info(*_view->view_info())
|
||||
, _base(vab.base->base_schema())
|
||||
, _base_info(vab.base)
|
||||
, _updates(8, partition_key::hashing(*_view), partition_key::equality(*_view)) {
|
||||
}
|
||||
void view_updates::move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations) && {
|
||||
std::transform(_updates.begin(), _updates.end(), std::back_inserter(mutations), [&, this] (auto&& m) {
|
||||
auto mut = mutation(_view, dht::decorate_key(*_view, std::move(m.first)), std::move(m.second));
|
||||
return frozen_mutation_and_schema{freeze(mut), std::move(_view)};
|
||||
});
|
||||
}
|
||||
|
||||
void move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations) && {
|
||||
std::transform(_updates.begin(), _updates.end(), std::back_inserter(mutations), [&, this] (auto&& m) {
|
||||
auto mut = mutation(_view, dht::decorate_key(*_view, std::move(m.first)), std::move(m.second));
|
||||
return frozen_mutation_and_schema{freeze(mut), std::move(_view)};
|
||||
});
|
||||
mutation_partition& view_updates::partition_for(partition_key&& key) {
|
||||
auto it = _updates.find(key);
|
||||
if (it != _updates.end()) {
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void generate_update(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing, gc_clock::time_point now);
|
||||
private:
|
||||
mutation_partition& partition_for(partition_key&& key) {
|
||||
auto it = _updates.find(key);
|
||||
if (it != _updates.end()) {
|
||||
return it->second;
|
||||
}
|
||||
return _updates.emplace(std::move(key), mutation_partition(_view)).first->second;
|
||||
}
|
||||
row_marker compute_row_marker(const clustering_row& base_row) const;
|
||||
deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update);
|
||||
bool can_skip_view_updates(const clustering_row& update, const clustering_row& existing) const;
|
||||
void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now);
|
||||
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now);
|
||||
void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) {
|
||||
create_entry(base_key, update, now);
|
||||
delete_old_entry(base_key, existing, update, now);
|
||||
}
|
||||
};
|
||||
return _updates.emplace(std::move(key), mutation_partition(_view)).first->second;
|
||||
}
|
||||
|
||||
row_marker view_updates::compute_row_marker(const clustering_row& base_row) const {
|
||||
/*
|
||||
@@ -831,73 +802,39 @@ void view_updates::generate_update(
|
||||
}
|
||||
}
|
||||
|
||||
class view_update_builder {
|
||||
schema_ptr _schema; // The base schema
|
||||
std::vector<view_updates> _view_updates;
|
||||
flat_mutation_reader _updates;
|
||||
flat_mutation_reader_opt _existings;
|
||||
range_tombstone_accumulator _update_tombstone_tracker;
|
||||
range_tombstone_accumulator _existing_tombstone_tracker;
|
||||
mutation_fragment_opt _update;
|
||||
mutation_fragment_opt _existing;
|
||||
gc_clock::time_point _now;
|
||||
partition_key _key = partition_key::make_empty();
|
||||
public:
|
||||
future<> view_update_builder::close() noexcept {
|
||||
return when_all_succeed(_updates.close(), _existings->close()).discard_result();
|
||||
}
|
||||
|
||||
view_update_builder(schema_ptr s,
|
||||
std::vector<view_updates>&& views_to_update,
|
||||
flat_mutation_reader&& updates,
|
||||
flat_mutation_reader_opt&& existings,
|
||||
gc_clock::time_point now)
|
||||
: _schema(std::move(s))
|
||||
, _view_updates(std::move(views_to_update))
|
||||
, _updates(std::move(updates))
|
||||
, _existings(std::move(existings))
|
||||
, _update_tombstone_tracker(*_schema, false)
|
||||
, _existing_tombstone_tracker(*_schema, false)
|
||||
, _now(now) {
|
||||
future<stop_iteration> view_update_builder::advance_all() {
|
||||
auto existings_f = _existings ? (*_existings)(db::no_timeout) : make_ready_future<optimized_optional<mutation_fragment>>();
|
||||
return when_all(_updates(db::no_timeout), std::move(existings_f)).then([this] (auto&& fragments) mutable {
|
||||
_update = std::move(std::get<0>(fragments).get0());
|
||||
_existing = std::move(std::get<1>(fragments).get0());
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
future<stop_iteration> view_update_builder::advance_updates() {
|
||||
return _updates(db::no_timeout).then([this] (auto&& update) mutable {
|
||||
_update = std::move(update);
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
future<stop_iteration> view_update_builder::advance_existings() {
|
||||
if (!_existings) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
return (*_existings)(db::no_timeout).then([this] (auto&& existing) mutable {
|
||||
_existing = std::move(existing);
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<frozen_mutation_and_schema>> build();
|
||||
|
||||
future<> close() noexcept {
|
||||
return when_all_succeed(_updates.close(), _existings->close()).discard_result();
|
||||
}
|
||||
|
||||
private:
|
||||
void generate_update(clustering_row&& update, std::optional<clustering_row>&& existing);
|
||||
future<stop_iteration> on_results();
|
||||
|
||||
future<stop_iteration> advance_all() {
|
||||
auto existings_f = _existings ? (*_existings)(db::no_timeout) : make_ready_future<optimized_optional<mutation_fragment>>();
|
||||
return when_all(_updates(db::no_timeout), std::move(existings_f)).then([this] (auto&& fragments) mutable {
|
||||
_update = std::move(std::get<0>(fragments).get0());
|
||||
_existing = std::move(std::get<1>(fragments).get0());
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
future<stop_iteration> advance_updates() {
|
||||
return _updates(db::no_timeout).then([this] (auto&& update) mutable {
|
||||
_update = std::move(update);
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
future<stop_iteration> advance_existings() {
|
||||
if (!_existings) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
return (*_existings)(db::no_timeout).then([this] (auto&& existing) mutable {
|
||||
_existing = std::move(existing);
|
||||
return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
future<stop_iteration> stop() const {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
};
|
||||
future<stop_iteration> view_update_builder::stop() const {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<frozen_mutation_and_schema>> view_update_builder::build() {
|
||||
return advance_all().then([this] (auto&& ignored) {
|
||||
|
||||
@@ -134,6 +134,82 @@ bool matches_view_filter(const schema& base, const view_info& view, const partit
|
||||
|
||||
bool clustering_prefix_matches(const schema& base, const partition_key& key, const clustering_key_prefix& ck, gc_clock::time_point now);
|
||||
|
||||
class view_updates final {
|
||||
view_ptr _view;
|
||||
const view_info& _view_info;
|
||||
schema_ptr _base;
|
||||
base_info_ptr _base_info;
|
||||
std::unordered_map<partition_key, mutation_partition, partition_key::hashing, partition_key::equality> _updates;
|
||||
public:
|
||||
explicit view_updates(view_and_base vab)
|
||||
: _view(std::move(vab.view))
|
||||
, _view_info(*_view->view_info())
|
||||
, _base(vab.base->base_schema())
|
||||
, _base_info(vab.base)
|
||||
, _updates(8, partition_key::hashing(*_view), partition_key::equality(*_view)) {
|
||||
}
|
||||
|
||||
void move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations) &&;
|
||||
|
||||
void generate_update(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing, gc_clock::time_point now);
|
||||
private:
|
||||
mutation_partition& partition_for(partition_key&& key);
|
||||
row_marker compute_row_marker(const clustering_row& base_row) const;
|
||||
deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update);
|
||||
bool can_skip_view_updates(const clustering_row& update, const clustering_row& existing) const;
|
||||
void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now);
|
||||
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now);
|
||||
void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) {
|
||||
create_entry(base_key, update, now);
|
||||
delete_old_entry(base_key, existing, update, now);
|
||||
}
|
||||
};
|
||||
|
||||
class view_update_builder {
|
||||
schema_ptr _schema; // The base schema
|
||||
std::vector<view_updates> _view_updates;
|
||||
flat_mutation_reader _updates;
|
||||
flat_mutation_reader_opt _existings;
|
||||
range_tombstone_accumulator _update_tombstone_tracker;
|
||||
range_tombstone_accumulator _existing_tombstone_tracker;
|
||||
mutation_fragment_opt _update;
|
||||
mutation_fragment_opt _existing;
|
||||
gc_clock::time_point _now;
|
||||
partition_key _key = partition_key::make_empty();
|
||||
public:
|
||||
|
||||
view_update_builder(schema_ptr s,
|
||||
std::vector<view_updates>&& views_to_update,
|
||||
flat_mutation_reader&& updates,
|
||||
flat_mutation_reader_opt&& existings,
|
||||
gc_clock::time_point now)
|
||||
: _schema(std::move(s))
|
||||
, _view_updates(std::move(views_to_update))
|
||||
, _updates(std::move(updates))
|
||||
, _existings(std::move(existings))
|
||||
, _update_tombstone_tracker(*_schema, false)
|
||||
, _existing_tombstone_tracker(*_schema, false)
|
||||
, _now(now) {
|
||||
}
|
||||
view_update_builder(view_update_builder&& other) noexcept = default;
|
||||
|
||||
future<utils::chunked_vector<frozen_mutation_and_schema>> build();
|
||||
|
||||
future<> close() noexcept;
|
||||
|
||||
private:
|
||||
void generate_update(clustering_row&& update, std::optional<clustering_row>&& existing);
|
||||
future<stop_iteration> on_results();
|
||||
|
||||
future<stop_iteration> advance_all();
|
||||
future<stop_iteration> advance_updates();
|
||||
future<stop_iteration> advance_existings();
|
||||
|
||||
future<stop_iteration> stop() const;
|
||||
};
|
||||
|
||||
future<utils::chunked_vector<frozen_mutation_and_schema>> generate_view_updates(
|
||||
const schema_ptr& base,
|
||||
std::vector<view_and_base>&& views_to_update,
|
||||
|
||||
Reference in New Issue
Block a user