diff --git a/db/view/view.cc b/db/view/view.cc
index 2d6332ab46..cbd99bc519 100644
--- a/db/view/view.cc
+++ b/db/view/view.cc
@@ -39,6 +39,7 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
+#include
#include
#include
#include
@@ -60,6 +61,8 @@
#include "gms/inet_address.hh"
#include "keys.hh"
#include "locator/network_topology_strategy.hh"
+#include "mutation.hh"
+#include "mutation_partition.hh"
#include "service/migration_manager.hh"
#include "service/storage_service.hh"
#include "view_info.hh"
@@ -1235,6 +1238,7 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name
}
for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) {
if (it->view->cf_name() == view_name) {
+ _built_views.erase(it->view->id());
step.build_status.erase(it);
return;
}
@@ -1254,9 +1258,232 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name
}
+// Runs the view build loop inside seastar::async(): for as long as _base_to_build_step has
+// entries and no abort was requested through _as, it executes one build step for the entry
+// _current_step points at, and then moves on to the next entry, wrapping around at the end of
+// the container. The returned future is the one produced by seastar::async().
future<> view_builder::do_build_step() {
- return make_ready_future<>();
+ return seastar::async([this] {
+ // Retry helper used to wait after a failed step (see the catch-all handler below).
+ exponential_backoff_retry r(1s, 1min);
+ while (!_base_to_build_step.empty() && !_as.abort_requested()) {
+ // Take one unit of _sem; `units` is local to this iteration of the loop.
+ auto units = get_units(_sem, 1).get0();
+ try {
+ execute(_current_step->second, exponential_backoff_retry(1s, 1min));
+ // The step went through: reset the retry helper.
+ r.reset();
+ } catch (const abort_requested_exception&) {
+ // An abort was requested while executing the step: leave the loop.
+ return;
+ } catch (...) {
+ // Any other failure is logged; after waiting on r.retry() the step's reader is
+ // re-initialized at the step's current token. Note that the code below still
+ // advances _current_step afterwards, exactly as it does after a successful step.
+ auto base = _current_step->second.base->schema();
+ vlogger.warn("Error executing build step for base {}.{}: {}", base->ks_name(), base->cf_name(), std::current_exception());
+ r.retry(_as).get();
+ initialize_reader_at_current_token(_current_step->second);
+ }
+ // A step with no entries left in build_status is dropped; erase() hands back the
+ // iterator to the following entry. Otherwise simply advance to the next entry.
+ if (_current_step->second.build_status.empty()) {
+ _current_step = _base_to_build_step.erase(_current_step);
+ } else {
+ ++_current_step;
+ }
+ // Wrap around once the end of the container is reached.
+ if (_current_step == _base_to_build_step.end()) {
+ _current_step = _base_to_build_step.begin();
+ }
+ }
+ });
+}
+
+// Consumer used by view_builder::execute() for a single build step. It receives the partitions
+// and rows read from the step's reader, queues the clustering rows of the current partition and
+// hands them to populate_views() at the end of the partition. It also moves the views that have
+// finished building out of the step's build_status.
+// Called in the context of a seastar::thread.
+class view_builder::consumer {
+public:
+    // The views found to be fully built while consuming. Unless release() is called first,
+    // the destructor puts every view back into step.build_status, so the views are not lost
+    // when the owner is destroyed before it has dealt with them.
+    struct built_views {
+        build_step& step;
+        std::vector<view_build_status> views;
+
+        built_views(build_step& step)
+                : step(step) {
+        }
+
+        built_views(built_views&& other) noexcept
+                : step(other.step)
+                , views(std::move(other.views)) {
+            // The destructor acts on whatever is left in `views`, so make sure the moved-from
+            // object has nothing left to put back.
+            other.views.clear();
+        }
+
+        ~built_views() {
+            for (auto&& status : views) {
+                vlogger.debug("Putting view {} back into the build step", status.view->cf_name());
+                // Use step.current_token(), which may have wrapped around and become < first_token.
+                step.build_status.emplace_back(view_build_status{std::move(status.view), step.current_token(), step.current_token()});
+            }
+        }
+
+        // Drops the tracked views, so that the destructor doesn't put them back.
+        void release() {
+            views.clear();
+        }
+    };
+
+private:
+    view_builder& _builder;
+    build_step& _step;
+    // Views that completed during this step; handed to the caller by consume_end_of_stream().
+    built_views _built_views;
+    // Views for which the current partition has to be processed.
+    std::vector<view_ptr> _views_to_build;
+    // Clustering rows of the current partition, queued until consume_end_of_partition().
+    std::deque<mutation_fragment> _fragments;
+
+public:
+    consumer(view_builder& builder, build_step& step)
+            : _builder(builder)
+            , _step(step)
+            , _built_views{step} {
+        // A non-empty current key means the step already has a position; select the views
+        // for that partition right away.
+        if (!step.current_key.key().is_empty(*_step.reader.schema())) {
+            load_views_to_build();
+        }
+    }
+
+    // Walks the step's build_status and, for every view whose next_token is not ahead of the
+    // step's current token, selects the view if partition_key_matches() accepts the current
+    // key, and updates the view's next_token to the current key's token.
+    void load_views_to_build() {
+        for (auto&& vs : _step.build_status) {
+            if (_step.current_token() >= vs.next_token) {
+                if (partition_key_matches(*_step.reader.schema(), *vs.view->view_info(), _step.current_key)) {
+                    _views_to_build.push_back(vs.view);
+                }
+                // next_token is left unset only while it is still unset and the current token
+                // equals first_token. Presumably this keeps check_for_built_views() from
+                // treating a view that has only just started as complete - TODO confirm.
+                if (vs.next_token || _step.current_token() != vs.first_token) {
+                    vs.next_token = _step.current_key.token();
+                }
+            } else {
+                // NOTE(review): stopping at the first view that is ahead of the current token
+                // assumes build_status is ordered by next_token - confirm against the code
+                // that fills it in.
+                break;
+            }
+        }
+    }
+
+    // Moves the views that have finished building from the step's build_status into _built_views.
+    void check_for_built_views() {
+        for (auto it = _step.build_status.begin(); it != _step.build_status.end();) {
+            // A view starts being built at token t1. Due to resharding, that may not necessarily be a
+            // shard-owned token. We finish building the view when the next_token to build is just before
+            // (or at) the first token, but the shard-owned current token is after (or at) the first token.
+            // In the system tables, we set first_token = next_token to signal the completion of the build
+            // process in case of a restart.
+            if (it->next_token && *it->next_token <= it->first_token && _step.current_token() >= it->first_token) {
+                _built_views.views.push_back(std::move(*it));
+                it = _step.build_status.erase(it);
+            } else {
+                ++it;
+            }
+        }
+    }
+
+    // Records dk as the step's current key, retires the views that are now complete and
+    // recomputes the set of views to build for this partition. Asks to stop when no view
+    // needs the partition.
+    stop_iteration consume_new_partition(const dht::decorated_key& dk) {
+        // dk is a const reference, so this is a copy (std::move() could not change that).
+        _step.current_key = dk;
+        check_for_built_views();
+        _views_to_build.clear();
+        load_views_to_build();
+        return stop_iteration(_views_to_build.empty());
+    }
+
+    // Partition tombstones, static rows and range tombstones are not queued by this consumer;
+    // only clustering rows are (see below).
+    stop_iteration consume(tombstone) {
+        return stop_iteration::no;
+    }
+
+    stop_iteration consume(static_row&&, tombstone, bool) {
+        return stop_iteration::no;
+    }
+
+    // Queues the row for the current partition, or asks to stop if there is no view to build
+    // for this partition or an abort was requested.
+    stop_iteration consume(clustering_row&& cr, row_tombstone, bool) {
+        if (_views_to_build.empty() || _builder._as.abort_requested()) {
+            return stop_iteration::yes;
+        }
+
+        _fragments.push_back(std::move(cr));
+        return stop_iteration::no;
+    }
+
+    stop_iteration consume(range_tombstone&&) {
+        return stop_iteration::no;
+    }
+
+    // Feeds the rows queued for the partition, preceded by a partition_start fragment, to
+    // populate_views() and waits for it to complete. Asks to stop once the step has no views
+    // left in build_status.
+    stop_iteration consume_end_of_partition() {
+        // Bails out through the abort source if an abort was requested in the meantime.
+        _builder._as.check();
+        if (!_fragments.empty()) {
+            _fragments.push_front(partition_start(_step.current_key, tombstone()));
+            _step.base->populate_views(
+                    _views_to_build,
+                    _step.current_token(),
+                    make_flat_mutation_reader_from_fragments(_step.base->schema(), std::move(_fragments))).get();
+            // _fragments was moved from; bring it back to a known empty state.
+            _fragments.clear();
+        }
+        return stop_iteration(_step.build_status.empty());
+    }
+
+    // Finishes the step and returns the views that completed while consuming. If the reader
+    // is exhausted, the step wraps around: the current key and every view's next_token are
+    // reset to the minimum token, the reader is re-initialized there and completion is
+    // checked once more.
+    built_views consume_end_of_stream() {
+        if (vlogger.is_enabled(log_level::debug)) {
+            auto view_names = boost::copy_range<std::vector<sstring>>(
+                    _views_to_build | boost::adaptors::transformed([](const auto& v) {
+                        return v->cf_name();
+                    }));
+            vlogger.debug("Completed build step for base {}.{}, at token {}; views={}", _step.base->schema()->ks_name(),
+                    _step.base->schema()->cf_name(), _step.current_token(), view_names);
+        }
+        if (_step.reader.is_end_of_stream() && _step.reader.is_buffer_empty()) {
+            _step.current_key = {dht::minimum_token(), partition_key::make_empty()};
+            for (auto&& vs : _step.build_status) {
+                vs.next_token = dht::minimum_token();
+            }
+            _builder.initialize_reader_at_current_token(_step);
+            check_for_built_views();
+        }
+        // _built_views is a member, so the move has to be spelled out.
+        return std::move(_built_views);
+    }
+};
+
+// Executes one build step for `step`: consumes from the step's reader through a
+// view_builder::consumer wrapped by compact_for_query() (given batch_size and
+// query::max_partitions as limits), and then records the outcome in the system tables:
+// views that completed go through maybe_mark_view_as_built(), the remaining ones get their
+// build progress updated. Failures of these bookkeeping updates are logged and ignored.
+// The `r` parameter is not referenced in this function.
+// Called in the context of a seastar::thread.
+void view_builder::execute(build_step& step, exponential_backoff_retry r) {
+    auto consumer = compact_for_query(
+            *step.reader.schema(),
+            gc_clock::now(),
+            step.pslice,
+            batch_size,
+            query::max_partitions,
+            view_builder::consumer{*this, step});
+    consumer.consume_new_partition(step.current_key); // Initialize the state in case we're resuming a partition
+    auto built = step.reader.consume_in_thread(std::move(consumer));
+
+    // If an abort was requested, leave before touching the bookkeeping. `built` has not been
+    // released at this point, so its destructor puts the views back into step.build_status.
+    _as.check();
+
+    std::vector<future<>> bookkeeping_ops;
+    bookkeeping_ops.reserve(built.views.size() + step.build_status.size());
+    for (auto& [view, first_token, _] : built.views) {
+        bookkeeping_ops.push_back(maybe_mark_view_as_built(view, first_token));
+    }
+    // The completed views are accounted for from here on; don't put them back.
+    built.release();
+    for (auto& [view, _, next_token] : step.build_status) {
+        // Only views that have a next_token have progress to record.
+        if (next_token) {
+            bookkeeping_ops.push_back(
+                    system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), *next_token));
+        }
+    }
+    seastar::when_all_succeed(bookkeeping_ops.begin(), bookkeeping_ops.end()).handle_exception([] (std::exception_ptr ep) {
+        vlogger.error("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep);
+    }).get();
+}
+
+// Records that this shard has finished building `view` and asks every shard whether it has
+// finished as well.
+//  - If all shards have: each shard drops the view from its _built_views, and shard 0 (provided
+//    it still had the entry) marks the view as built in the system keyspace, finishes the view
+//    build in the distributed system keyspace and then removes the per-shard build progress.
+//  - Otherwise: stores next_token as this shard's build progress for the view.
+future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_token) {
+    _built_views.emplace(view->id());
+    vlogger.debug("Shard finished building view {}.{}", view->ks_name(), view->cf_name());
+    return container().map_reduce0(
+            // Per shard: has this shard registered the view as built?
+            [view_id = view->id()] (view_builder& builder) {
+                return builder._built_views.count(view_id) != 0;
+            },
+            true,
+            // The view is built only if every shard says so.
+            [] (bool result, bool shard_complete) {
+                return result && shard_complete;
+            }).then([this, view, next_token = std::move(next_token)] (bool built) {
+        if (built) {
+            return container().invoke_on_all([view_id = view->id()] (view_builder& builder) {
+                // Every shard forgets the view; only shard 0, and only if it actually had the
+                // entry, goes on to update the system tables.
+                if (builder._built_views.erase(view_id) == 0 || engine().cpu_id() != 0) {
+                    return make_ready_future<>();
+                }
+                auto view = builder._db.find_schema(view_id);
+                vlogger.info("Finished building view {}.{}", view->ks_name(), view->cf_name());
+                return seastar::when_all_succeed(
+                        system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()),
+                        builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name())).then([view] {
+                    return system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name());
+                });
+            });
+        }
+        return system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), next_token);
+    });
}
} // namespace view
} // namespace db
-
diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh
index 15b7ff344c..959a914384 100644
--- a/db/view/view_builder.hh
+++ b/db/view/view_builder.hh
@@ -53,6 +53,14 @@ namespace db::view {
*
* We employ a flat_mutation_reader for each base table for which we're building views.
*
+ * We aim to be resource-conscious. On a given shard, at any given moment, we consume at most
+ * from one reader. We also strive for fairness, in that each build step inserts entries for
+ * the views of a different base. Each build step reads and generates updates for batch_size rows.
+ *
+ * We lack a controller, which could potentially allow us to go faster (to execute multiple steps at
+ * the same time, or consume more rows per batch), and also which would apply backpressure, so we
+ * could, for example, delay executing a build step.
+ *
* View building is necessarily a sharded process. That means that on restart, if the number of shards
* has changed, we need to calculate the most conservative token range that has been built, and build
* the remainder.
@@ -87,7 +95,7 @@ namespace db::view {
* also be in the in-progress system table - we don't detect this and will
* redo the missing step, for simplicity.
*/
-class view_builder final : public service::migration_listener::only_view_notifications {
+class view_builder final : public service::migration_listener::only_view_notifications, public seastar::peering_sharded_service {
/**
* Keeps track of the build progress for a particular view.
* When the view is built, next_token == first_token.
@@ -138,6 +146,11 @@ class view_builder final : public service::migration_listener::only_view_notific
// the algorithms. Also synchronizes an operation wrt. a call to stop().
seastar::semaphore _sem{1};
seastar::abort_source _as;
+ // Used to coordinate between shards the conclusion of the build process for a particular view.
+ std::unordered_set _built_views;
+
+public:
+ static constexpr size_t batch_size = 128;
public:
view_builder(database&, db::system_distributed_keyspace&, service::migration_manager&);
@@ -167,6 +180,10 @@ private:
future<> calculate_shard_build_step(std::vector, std::vector);
future<> add_new_view(view_ptr, build_step&);
future<> do_build_step();
+ void execute(build_step&, exponential_backoff_retry);
+ future<> maybe_mark_view_as_built(view_ptr, dht::token);
+
+ struct consumer;
};
}
\ No newline at end of file
diff --git a/keys.hh b/keys.hh
index 81f112c7e1..9fce736d59 100644
--- a/keys.hh
+++ b/keys.hh
@@ -304,6 +304,10 @@ public:
return get_compound_type(s)->end(_bytes);
}
+    // Tells whether there is nothing to iterate over for this key under schema s, that is,
+    // whether the begin(s) and end(s) iterators coincide.
+    bool is_empty(const schema& s) const {
+        auto first = begin(s);
+        auto last = end(s);
+        return first == last;
+    }
+
// Returns a range of bytes_view
auto components() const {
return TopLevelView::compound::element_type::components(representation());