diff --git a/db/view/view.cc b/db/view/view.cc
index 9f9ad1ae4a..ef6bcdeeed 100644
--- a/db/view/view.cc
+++ b/db/view/view.cc
@@ -39,22 +39,33 @@
* along with Scylla. If not, see .
*/
-#include
#include
+#include
+#include
+#include
+#include
+#include
#include
#include
+#include
+
+#include "database.hh"
#include "clustering_bounds_comparator.hh"
#include "cql3/statements/select_statement.hh"
#include "cql3/util.hh"
#include "db/view/view.hh"
+#include "db/view/view_builder.hh"
#include "gms/inet_address.hh"
#include "keys.hh"
#include "locator/network_topology_strategy.hh"
+#include "service/migration_manager.hh"
#include "service/storage_service.hh"
#include "view_info.hh"
+using namespace std::chrono_literals;
+
static logging::logger vlogger("view");
view_info::view_info(const schema& schema, const raw_view_info& raw_view_info)
@@ -904,6 +915,261 @@ future<> mutate_MV(const dht::token& base_token,
return f.finally([fs = std::move(fs)] { });
}
+view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_dist_ks)
+ : _db(db)
+ , _sys_dist_ks(sys_dist_ks) {
+}
+
+future<> view_builder::start() {
+ return seastar::async([this] {
+ auto built = system_keyspace::load_built_views().get0();
+ auto in_progress = system_keyspace::load_view_build_progress().get0();
+ calculate_shard_build_step(std::move(built), std::move(in_progress)).get();
+ _current_step = _base_to_build_step.begin();
+ _build_step.trigger();
+ });
+}
+
+future<> view_builder::stop() {
+ vlogger.info("Stopping view builder");
+ return _build_step.join();
+}
+
+static query::partition_slice make_partition_slice(const schema& s) {
+ query::partition_slice::option_set opts;
+ opts.set(query::partition_slice::option::send_partition_key);
+ opts.set(query::partition_slice::option::send_clustering_key);
+ opts.set(query::partition_slice::option::send_timestamp);
+ opts.set(query::partition_slice::option::send_ttl);
+ return query::partition_slice(
+ {query::full_clustering_range},
+ { },
+ boost::copy_range>(s.regular_columns()
+ | boost::adaptors::transformed(std::mem_fn(&column_definition::id))),
+ std::move(opts));
+}
+
+view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID base_id) {
+ auto it = _base_to_build_step.find(base_id);
+ if (it == _base_to_build_step.end()) {
+ auto base = _db.find_column_family(base_id).shared_from_this();
+ auto p = _base_to_build_step.emplace(base_id, build_step{base, make_partition_slice(*base->schema())});
+ // Iterators could have been invalidated if there was rehashing, so just reset the cursor.
+ _current_step = p.first;
+ it = p.first;
+ }
+ return it->second;
+}
+
+void view_builder::initialize_reader_at_current_token(build_step& step) {
+ step.pslice = make_partition_slice(*step.base->schema());
+ step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max());
+ step.reader = make_local_shard_sstable_reader(
+ step.base->schema(),
+ make_lw_shared(sstables::sstable_set(step.base->get_sstable_set())),
+ step.prange,
+ step.pslice,
+ default_priority_class(),
+ no_resource_tracking(),
+ nullptr,
+ streamed_mutation::forwarding::no,
+ mutation_reader::forwarding::no);
+}
+
+void view_builder::load_view_status(view_builder::view_build_status status, std::unordered_set& loaded_views) {
+ if (!status.next_token) {
+ // No progress was made on this view, so we'll treat it as new.
+ return;
+ }
+ vlogger.info0("Resuming to build view {}.{} at {}", status.view->ks_name(), status.view->cf_name(), *status.next_token);
+ loaded_views.insert(status.view->id());
+ if (status.first_token == *status.next_token) {
+ // Completed, so nothing to do for this shard. Consider the view
+ // as loaded and not as a new view.
+ _built_views.emplace(status.view->id());
+ return;
+ }
+ get_or_create_build_step(status.view->view_info()->base_id()).build_status.emplace_back(std::move(status));
+}
+
+void view_builder::reshard(
+ std::vector> view_build_status_per_shard,
+ std::unordered_set& loaded_views) {
+ // We must reshard. We aim for a simple algorithm, a step above not starting from scratch.
+ // Shards build entries at different paces, so both first and last tokens will differ. We
+ // want to be conservative when selecting the range that has been built. To do that, we
+ // select the intersection of all the previous shard's ranges for each view.
+ struct view_ptr_hash {
+ std::size_t operator()(const view_ptr& v) const noexcept {
+ return std::hash()(v->id());
+ }
+ };
+ struct view_ptr_equals {
+ bool operator()(const view_ptr& v1, const view_ptr& v2) const noexcept {
+ return v1->id() == v2->id();
+ }
+ };
+ std::unordered_map>, view_ptr_hash, view_ptr_equals> my_status;
+ for (auto& shard_status : view_build_status_per_shard) {
+ for (auto& [view, first_token, next_token] : shard_status ) {
+ // We start from an open-ended range, which we'll try to restrict.
+ auto& my_range = my_status.emplace(
+ std::move(view),
+ nonwrapping_range::make_open_ended_both_sides()).first->second;
+ if (!next_token || !my_range) {
+ // A previous shard made no progress, so for this view we'll start over.
+ my_range = stdx::nullopt;
+ continue;
+ }
+ if (first_token == *next_token) {
+ // Completed, so don't consider this shard's progress. We know that if the view
+ // is marked as in-progress, then at least one shard will have a non-full range.
+ continue;
+ }
+ wrapping_range other_range(first_token, *next_token);
+ if (other_range.is_wrap_around(dht::token_comparator())) {
+ // The intersection of a wrapping range with a non-wrapping range may yield more
+ // multiple non-contiguous ranges. To avoid the complexity of dealing with more
+ // than one range, we'll just take one of the intersections.
+ auto [bottom_range, top_range] = other_range.unwrap();
+ if (auto bottom_int = my_range->intersection(nonwrapping_range(std::move(bottom_range)), dht::token_comparator())) {
+ my_range = std::move(bottom_int);
+ } else {
+ my_range = my_range->intersection(nonwrapping_range(std::move(top_range)), dht::token_comparator());
+ }
+ } else {
+ my_range = my_range->intersection(nonwrapping_range(std::move(other_range)), dht::token_comparator());
+ }
+ }
+ }
+ view_builder::base_to_build_step_type build_step;
+ for (auto& [view, opt_range] : my_status) {
+ if (!opt_range) {
+ continue; // Treat it as a new table.
+ }
+ auto start_bound = opt_range->start() ? std::move(opt_range->start()->value()) : dht::minimum_token();
+ auto end_bound = opt_range->end() ? std::move(opt_range->end()->value()) : dht::minimum_token();
+ auto s = view_build_status{std::move(view), std::move(start_bound), std::move(end_bound)};
+ load_view_status(std::move(s), loaded_views);
+ }
+}
+
+future<> view_builder::calculate_shard_build_step(
+ std::vector built,
+ std::vector in_progress) {
+ // Shard 0 makes cleanup changes to the system tables, but none that could conflict
+ // with the other shards; everyone is thus able to proceed independently.
+ auto bookkeeping_ops = std::make_unique>>();
+ auto base_table_exists = [&, this] (const view_ptr& view) {
+ // This is a safety check in case this node missed a create MV statement
+ // but got a drop table for the base, and another node didn't get the
+ // drop notification and sent us the view schema.
+ try {
+ _db.find_schema(view->view_info()->base_id());
+ return true;
+ } catch (const no_such_column_family&) {
+ return false;
+ }
+ };
+ auto maybe_fetch_view = [&, this] (system_keyspace::view_name& name) {
+ try {
+ auto s = _db.find_schema(name.first, name.second);
+ if (s->is_view()) {
+ auto view = view_ptr(std::move(s));
+ if (base_table_exists(view)) {
+ return view;
+ }
+ }
+ // The view was dropped and a table was re-created with the same name,
+ // but the write to the view-related system tables didn't make it.
+ } catch (const no_such_column_family&) {
+ // Fall-through
+ }
+ if (engine().cpu_id() == 0) {
+ bookkeeping_ops->push_back(_sys_dist_ks.remove_view(name.first, name.second));
+ bookkeeping_ops->push_back(system_keyspace::remove_built_view(name.first, name.second));
+ bookkeeping_ops->push_back(
+ system_keyspace::remove_view_build_progress_across_all_shards(
+ std::move(name.first),
+ std::move(name.second)));
+ }
+ return view_ptr(nullptr);
+ };
+
+ auto built_views = boost::copy_range>(built
+ | boost::adaptors::transformed(maybe_fetch_view)
+ | boost::adaptors::filtered([] (const view_ptr& v) { return bool(v); })
+ | boost::adaptors::transformed([] (const view_ptr& v) { return v->id(); }));
+
+ std::vector> view_build_status_per_shard;
+ for (auto& [view_name, first_token, next_token_opt, cpu_id] : in_progress) {
+ if (auto view = maybe_fetch_view(view_name)) {
+ if (built_views.find(view->id()) != built_views.end()) {
+ if (engine().cpu_id() == 0) {
+ auto f = _sys_dist_ks.finish_view_build(std::move(view_name.first), std::move(view_name.second)).then([view = std::move(view)] {
+ system_keyspace::remove_view_build_progress_across_all_shards(view->cf_name(), view->ks_name());
+ });
+ bookkeeping_ops->push_back(std::move(f));
+ }
+ continue;
+ }
+ view_build_status_per_shard.resize(std::max(view_build_status_per_shard.size(), size_t(cpu_id + 1)));
+ view_build_status_per_shard[cpu_id].emplace_back(view_build_status{
+ std::move(view),
+ std::move(first_token),
+ std::move(next_token_opt)});
+ }
+ }
+
+ std::unordered_set loaded_views;
+ if (view_build_status_per_shard.size() != smp::count) {
+ reshard(std::move(view_build_status_per_shard), loaded_views);
+ } else if (!view_build_status_per_shard.empty()) {
+ for (auto& status : view_build_status_per_shard[engine().cpu_id()]) {
+ load_view_status(std::move(status), loaded_views);
+ }
+ }
+
+ for (auto& [_, build_step] : _base_to_build_step) {
+ boost::sort(build_step.build_status, [] (view_build_status s1, view_build_status s2) {
+ return *s1.next_token < *s2.next_token;
+ });
+ if (!build_step.build_status.empty()) {
+ build_step.current_key = dht::decorated_key{*build_step.build_status.front().next_token, partition_key::make_empty()};
+ }
+ }
+
+ auto all_views = _db.get_views();
+ auto is_new = [&] (const view_ptr& v) {
+ return base_table_exists(v) && loaded_views.find(v->id()) == loaded_views.end()
+ && built_views.find(v->id()) == built_views.end();
+ };
+ for (auto&& view : all_views | boost::adaptors::filtered(is_new)) {
+ bookkeeping_ops->push_back(add_new_view(view, get_or_create_build_step(view->view_info()->base_id())));
+ }
+
+ for (auto& [_, build_step] : _base_to_build_step) {
+ initialize_reader_at_current_token(build_step);
+ }
+
+ auto f = seastar::when_all_succeed(bookkeeping_ops->begin(), bookkeeping_ops->end());
+ return f.handle_exception([bookkeeping_ops = std::move(bookkeeping_ops)] (std::exception_ptr ep) {
+ vlogger.error("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep);
+ });
+}
+
+future<> view_builder::add_new_view(view_ptr view, build_step& step) {
+ vlogger.info0("Building view {}.{}, starting at token {}", view->ks_name(), view->cf_name(), step.current_token());
+ step.build_status.emplace(step.build_status.begin(), view_build_status{view, step.current_token(), std::nullopt});
+ return when_all_succeed(
+ system_keyspace::register_view_for_building(view->ks_name(), view->cf_name(), step.current_token()),
+ _sys_dist_ks.start_view_build(view->ks_name(), view->cf_name()));
+}
+
+future<> view_builder::do_build_step() {
+ return make_ready_future<>();
+}
+
} // namespace view
} // namespace db
diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh
new file mode 100644
index 0000000000..f75365f1ed
--- /dev/null
+++ b/db/view/view_builder.hh
@@ -0,0 +1,162 @@
+/*
+ * Copyright (C) 2018 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see .
+ */
+
+#pragma once
+
+#include "database_fwd.hh"
+#include "db/system_keyspace.hh"
+#include "db/system_distributed_keyspace.hh"
+#include "dht/i_partitioner.hh"
+#include "keys.hh"
+#include "query-request.hh"
+#include "service/migration_listener.hh"
+#include "service/migration_manager.hh"
+#include "sstables/sstable_set.hh"
+#include "utils/exponential_backoff_retry.hh"
+#include "utils/serialized_action.hh"
+#include "utils/UUID.hh"
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+namespace db::view {
+
+/**
+ * The view_builder is a sharded service responsible for building all defined materialized views.
+ * This process entails walking over the existing data in a given base table, and using it to
+ * calculate and insert the respective entries for one or more views.
+ *
+ * We employ a flat_mutation_reader for each base table for which we're building views.
+ *
+ * View building is necessarily a sharded process. That means that on restart, if the number of shards
+ * has changed, we need to calculate the most conservative token range that has been built, and build
+ * the remainder.
+ *
+ * Interaction with the system tables:
+ * - When we start building a view, we add an entry to the scylla_views_builds_in_progress
+ * system table. If the node restarts at this point, we'll consider these newly inserted
+ * views as having made no progress, and we'll treat them as new views;
+ * - When we finish a build step, we update the progress of the views that we built during
+ * this step by writing the next token to the scylla_views_builds_in_progress table. If
+ * the node restarts here, we'll start building the views at the token in the next_token column.
+ * - When we finish building a view, we mark it as completed in the built views system table, and
+ * remove it from the in-progress system table. Under failure, the following can happen:
+ * * When we fail to mark the view as built, we'll redo the last step upon node reboot;
+ * * When we fail to delete the in-progress record, upon reboot we'll remove this record.
+ * A view is marked as completed only when all shards have finished their share of the work, that is,
+ * if a view is not built, then all shards will still have an entry in the in-progress system table,
+ * - A view that a shard finished building, but not all other shards, remains in the in-progress system
+ * table, with first_token == next_token.
+ * Interaction with the distributed system table (view_build_status):
+ * - When we start building a view, we mark the view build as being in-progress;
+ * - When we finish building a view, we mark the view as being built. Upon failure,
+ * we ensure that if the view is in the in-progress system table, then it may not
+ * have been written to this table. We don't load the built views from this table
+ * when starting. When starting, the following happens:
+ * * If the view is in the system.built_views table and not the in-progress
+ * system table, then it will be in view_build_status;
+ * * If the view is in the system.built_views table and not in this one, it
+ * will still be in the in-progress system table - we detect this and mark
+ * it as built in this table too, keeping the invariant;
+ * * If the view is in this table but not in system.built_views, then it will
+ * also be in the in-progress system table - we don't detect this and will
+ * redo the missing step, for simplicity.
+ */
+class view_builder final {
+ /**
+ * Keeps track of the build progress for a particular view.
+ * When the view is built, next_token == first_token.
+ */
+ struct view_build_status final {
+ view_ptr view;
+ dht::token first_token;
+ std::optional next_token;
+ };
+
+ /**
+ * Keeps track of the build progress for all the views of a particular
+ * base table. Each execution of the build step comprises a query of
+ * the base table for the selected range.
+ *
+ * We pin the set of sstables that potentially contain data that should be added to a
+ * view (they are pinned by the flat_mutation_reader). Adding a view v' overwrites the
+ * set of pinned sstables, regardless of there being another view v'' being built. The
+ * new set will potentially contain new data already in v'', written as part of the write
+ * path. We assume this case is rare and optimize for fewer disk space in detriment of
+ * network bandwidth.
+ */
+ struct build_step final {
+ // Ensure we pin the column_family. It may happen that all views are removed,
+ // and that the base table is too before we can detect it.
+ lw_shared_ptr base;
+ query::partition_slice pslice;
+ dht::partition_range prange;
+ flat_mutation_reader reader{nullptr};
+ dht::decorated_key current_key{dht::minimum_token(), partition_key::make_empty()};
+ std::vector build_status;
+
+ const dht::token& current_token() const {
+ return current_key.token();
+ }
+ };
+
+ using base_to_build_step_type = std::unordered_map;
+
+ database& _db;
+ db::system_distributed_keyspace& _sys_dist_ks;
+ base_to_build_step_type _base_to_build_step;
+ base_to_build_step_type::iterator _current_step = _base_to_build_step.end();
+ serialized_action _build_step{std::bind(&view_builder::do_build_step, this)};
+
+public:
+ view_builder(database&, db::system_distributed_keyspace&);
+ view_builder(view_builder&&) = delete;
+
+ /**
+ * Loads the state stored in the system tables to resume building the existing views.
+ * Requires that all views have been loaded from the system tables and are accessible
+ * through the database, and that the commitlog has been replayed.
+ */
+ future<> start();
+
+ /**
+ * Stops the view building process.
+ */
+ future<> stop();
+
+private:
+ build_step& get_or_create_build_step(utils::UUID);
+ void initialize_reader_at_current_token(build_step&);
+ void load_view_status(view_build_status, std::unordered_set&);
+ void reshard(std::vector>, std::unordered_set&);
+ future<> calculate_shard_build_step(std::vector, std::vector);
+ future<> add_new_view(view_ptr, build_step&);
+ future<> do_build_step();
+};
+
+}
\ No newline at end of file
diff --git a/main.cc b/main.cc
index 3aca8d2f54..1fe7c2c419 100644
--- a/main.cc
+++ b/main.cc
@@ -40,6 +40,7 @@
#include "db/commitlog/commitlog.hh"
#include "db/hints/manager.hh"
#include "db/commitlog/commitlog_replayer.hh"
+#include "db/view/view_builder.hh"
#include "utils/runtime.hh"
#include "utils/file_lock.hh"
#include "log.hh"
@@ -691,6 +692,11 @@ int main(int ac, char** av) {
proxy.invoke_on_all([] (service::storage_proxy& local_proxy) { local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this()); }).get();
}
+ static sharded view_builder;
+ supervisor::notify("starting the view builder");
+ view_builder.start(std::ref(db), std::ref(sys_dist_ks)).get();
+ view_builder.invoke_on_all(&db::view::view_builder::start).get();
+
supervisor::notify("starting native transport");
service::get_local_storage_service().start_native_transport().get();
if (start_thrift) {
@@ -721,6 +727,10 @@ int main(int ac, char** av) {
return service::get_local_storage_service().drain_on_shutdown();
});
+ engine().at_exit([] {
+ return view_builder.stop();
+ });
+
engine().at_exit([&db] {
return db.invoke_on_all([](auto& db) {
return db.get_compaction_manager().stop();