diff --git a/db/view/view.cc b/db/view/view.cc index fc46feb89b..084bebd5b3 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -328,49 +328,20 @@ bool matches_view_filter(const schema& base, const view_info& view, const partit }); } -class view_updates final { - view_ptr _view; - const view_info& _view_info; - schema_ptr _base; - base_info_ptr _base_info; - std::unordered_map _updates; -public: - explicit view_updates(view_and_base vab) - : _view(std::move(vab.view)) - , _view_info(*_view->view_info()) - , _base(vab.base->base_schema()) - , _base_info(vab.base) - , _updates(8, partition_key::hashing(*_view), partition_key::equality(*_view)) { - } +void view_updates::move_to(utils::chunked_vector& mutations) && { + std::transform(_updates.begin(), _updates.end(), std::back_inserter(mutations), [&, this] (auto&& m) { + auto mut = mutation(_view, dht::decorate_key(*_view, std::move(m.first)), std::move(m.second)); + return frozen_mutation_and_schema{freeze(mut), std::move(_view)}; + }); +} - void move_to(utils::chunked_vector& mutations) && { - std::transform(_updates.begin(), _updates.end(), std::back_inserter(mutations), [&, this] (auto&& m) { - auto mut = mutation(_view, dht::decorate_key(*_view, std::move(m.first)), std::move(m.second)); - return frozen_mutation_and_schema{freeze(mut), std::move(_view)}; - }); +mutation_partition& view_updates::partition_for(partition_key&& key) { + auto it = _updates.find(key); + if (it != _updates.end()) { + return it->second; } - - void generate_update(const partition_key& base_key, const clustering_row& update, const std::optional& existing, gc_clock::time_point now); -private: - mutation_partition& partition_for(partition_key&& key) { - auto it = _updates.find(key); - if (it != _updates.end()) { - return it->second; - } - return _updates.emplace(std::move(key), mutation_partition(_view)).first->second; - } - row_marker compute_row_marker(const clustering_row& base_row) const; - deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update); - bool can_skip_view_updates(const clustering_row& update, const clustering_row& existing) const; - void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now); - void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now); - void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now); - void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now); - void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) { - create_entry(base_key, update, now); - delete_old_entry(base_key, existing, update, now); - } -}; + return _updates.emplace(std::move(key), mutation_partition(_view)).first->second; +} row_marker view_updates::compute_row_marker(const clustering_row& base_row) const { /* @@ -831,73 +802,39 @@ void view_updates::generate_update( } } -class view_update_builder { - schema_ptr _schema; // The base schema - std::vector _view_updates; - flat_mutation_reader _updates; - flat_mutation_reader_opt _existings; - range_tombstone_accumulator _update_tombstone_tracker; - range_tombstone_accumulator _existing_tombstone_tracker; - mutation_fragment_opt _update; - mutation_fragment_opt _existing; - gc_clock::time_point _now; - partition_key _key = partition_key::make_empty(); -public: +future<> view_update_builder::close() noexcept { + return when_all_succeed(_updates.close(), _existings->close()).discard_result(); +} - view_update_builder(schema_ptr s, - std::vector&& views_to_update, - flat_mutation_reader&& updates, - flat_mutation_reader_opt&& existings, - gc_clock::time_point now) - : _schema(std::move(s)) - , _view_updates(std::move(views_to_update)) - , _updates(std::move(updates)) - , _existings(std::move(existings)) - , _update_tombstone_tracker(*_schema, false) - , _existing_tombstone_tracker(*_schema, false) - , _now(now) { +future view_update_builder::advance_all() { + auto existings_f = _existings ? (*_existings)(db::no_timeout) : make_ready_future>(); + return when_all(_updates(db::no_timeout), std::move(existings_f)).then([this] (auto&& fragments) mutable { + _update = std::move(std::get<0>(fragments).get0()); + _existing = std::move(std::get<1>(fragments).get0()); + return stop_iteration::no; + }); +} + +future view_update_builder::advance_updates() { + return _updates(db::no_timeout).then([this] (auto&& update) mutable { + _update = std::move(update); + return stop_iteration::no; + }); +} + +future view_update_builder::advance_existings() { + if (!_existings) { + return make_ready_future(stop_iteration::no); } + return (*_existings)(db::no_timeout).then([this] (auto&& existing) mutable { + _existing = std::move(existing); + return stop_iteration::no; + }); +} - future> build(); - - future<> close() noexcept { - return when_all_succeed(_updates.close(), _existings->close()).discard_result(); - } - -private: - void generate_update(clustering_row&& update, std::optional&& existing); - future on_results(); - - future advance_all() { - auto existings_f = _existings ? (*_existings)(db::no_timeout) : make_ready_future>(); - return when_all(_updates(db::no_timeout), std::move(existings_f)).then([this] (auto&& fragments) mutable { - _update = std::move(std::get<0>(fragments).get0()); - _existing = std::move(std::get<1>(fragments).get0()); - return stop_iteration::no; - }); - } - - future advance_updates() { - return _updates(db::no_timeout).then([this] (auto&& update) mutable { - _update = std::move(update); - return stop_iteration::no; - }); - } - - future advance_existings() { - if (!_existings) { - return make_ready_future(stop_iteration::no); - } - return (*_existings)(db::no_timeout).then([this] (auto&& existing) mutable { - _existing = std::move(existing); - return stop_iteration::no; - }); - } - - future stop() const { - return make_ready_future(stop_iteration::yes); - } -}; +future view_update_builder::stop() const { + return make_ready_future(stop_iteration::yes); +} future> view_update_builder::build() { return advance_all().then([this] (auto&& ignored) { diff --git a/db/view/view.hh b/db/view/view.hh index bd70d55bce..eedc2adf4c 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -134,6 +134,82 @@ bool matches_view_filter(const schema& base, const view_info& view, const partit bool clustering_prefix_matches(const schema& base, const partition_key& key, const clustering_key_prefix& ck, gc_clock::time_point now); +class view_updates final { + view_ptr _view; + const view_info& _view_info; + schema_ptr _base; + base_info_ptr _base_info; + std::unordered_map _updates; +public: + explicit view_updates(view_and_base vab) + : _view(std::move(vab.view)) + , _view_info(*_view->view_info()) + , _base(vab.base->base_schema()) + , _base_info(vab.base) + , _updates(8, partition_key::hashing(*_view), partition_key::equality(*_view)) { + } + + void move_to(utils::chunked_vector& mutations) &&; + + void generate_update(const partition_key& base_key, const clustering_row& update, const std::optional& existing, gc_clock::time_point now); +private: + mutation_partition& partition_for(partition_key&& key); + row_marker compute_row_marker(const clustering_row& base_row) const; + deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update); + bool can_skip_view_updates(const clustering_row& update, const clustering_row& existing) const; + void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now); + void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now); + void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now); + void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now); + void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) { + create_entry(base_key, update, now); + delete_old_entry(base_key, existing, update, now); + } +}; + +class view_update_builder { + schema_ptr _schema; // The base schema + std::vector _view_updates; + flat_mutation_reader _updates; + flat_mutation_reader_opt _existings; + range_tombstone_accumulator _update_tombstone_tracker; + range_tombstone_accumulator _existing_tombstone_tracker; + mutation_fragment_opt _update; + mutation_fragment_opt _existing; + gc_clock::time_point _now; + partition_key _key = partition_key::make_empty(); +public: + + view_update_builder(schema_ptr s, + std::vector&& views_to_update, + flat_mutation_reader&& updates, + flat_mutation_reader_opt&& existings, + gc_clock::time_point now) + : _schema(std::move(s)) + , _view_updates(std::move(views_to_update)) + , _updates(std::move(updates)) + , _existings(std::move(existings)) + , _update_tombstone_tracker(*_schema, false) + , _existing_tombstone_tracker(*_schema, false) + , _now(now) { + } + view_update_builder(view_update_builder&& other) noexcept = default; + + future> build(); + + future<> close() noexcept; + +private: + void generate_update(clustering_row&& update, std::optional&& existing); + future on_results(); + + future advance_all(); + future advance_updates(); + future advance_existings(); + + future stop() const; +}; + future> generate_view_updates( const schema_ptr& base, std::vector&& views_to_update,