From 5f822e3928f9861b7040700e2214adca5744a3cd Mon Sep 17 00:00:00 2001
From: Duarte Nunes
Date: Mon, 12 Mar 2018 21:01:15 +0000
Subject: [PATCH] db/view/view_builder: Actually build views

This patch adds the missing view building code to the eponymous class.

We consume from the reader associated with each base table until all
its views are built. If the reader reaches the end and there are
incomplete views, then a view was added while others were being built.
In such cases, when the view is added we restart the reader at the
beginning of the current token, rather than at the beginning of the
token range. Then, when we exhaust the reader, we simply create a new
one for the whole token range, and resume building the pending views.

We aim to be resource-conscious. On a given shard, at any given
moment, we consume at most from one reader. We also strive for
fairness, in that each build step inserts entries for the views of a
different base. Each build step reads and generates updates for
batch_size rows.

We lack a controller, which could potentially allow us to go faster
(to execute multiple steps at the same time, or consume more rows per
batch), and also which would apply backpressure, so we could, for
example, delay executing a build step.

Signed-off-by: Duarte Nunes
---
 db/view/view.cc         | 231 +++++++++++++++++++++++++++++++++++++++-
 db/view/view_builder.hh |  19 +++-
 keys.hh                 |   4 +
 3 files changed, 251 insertions(+), 3 deletions(-)

diff --git a/db/view/view.cc b/db/view/view.cc
index 2d6332ab46..cbd99bc519 100644
--- a/db/view/view.cc
+++ b/db/view/view.cc
@@ -39,6 +39,7 @@
  * along with Scylla.  If not, see <http://www.gnu.org/licenses/>.
*/
 
+#include
 #include
 #include
 #include
@@ -60,6 +61,8 @@
 #include "gms/inet_address.hh"
 #include "keys.hh"
 #include "locator/network_topology_strategy.hh"
+#include "mutation.hh"
+#include "mutation_partition.hh"
 #include "service/migration_manager.hh"
 #include "service/storage_service.hh"
 #include "view_info.hh"
@@ -1235,6 +1238,7 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name
                 }
                 for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) {
                     if (it->view->cf_name() == view_name) {
+                        _built_views.erase(it->view->id()); // Forget this shard's "built" mark for the dropped view.
                         step.build_status.erase(it);
                         return;
                     }
@@ -1254,9 +1258,232 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name
 }
 
 future<> view_builder::do_build_step() {
-    return make_ready_future<>();
+    return seastar::async([this] { // Runs in a seastar::thread, so the .get() calls below may block.
+        exponential_backoff_retry r(1s, 1min); // Backoff between failed build steps; reset after every successful one.
+        while (!_base_to_build_step.empty() && !_as.abort_requested()) {
+            auto units = get_units(_sem, 1).get0(); // Held until the end of this iteration.
+            try {
+                execute(_current_step->second, exponential_backoff_retry(1s, 1min));
+                r.reset();
+            } catch (const abort_requested_exception&) {
+                return; // Abort requested: stop building.
+            } catch (...) {
+                auto base = _current_step->second.base->schema();
+                vlogger.warn("Error executing build step for base {}.{}: {}", base->ks_name(), base->cf_name(), std::current_exception());
+                r.retry(_as).get(); // Back off before the step is attempted again.
+                initialize_reader_at_current_token(_current_step->second); // Re-initialize the step's reader at the current token.
+            }
+            if (_current_step->second.build_status.empty()) { // No views left to build for this base.
+                _current_step = _base_to_build_step.erase(_current_step);
+            } else {
+                ++_current_step; // Round-robin over the bases being built.
+            }
+            if (_current_step == _base_to_build_step.end()) { // Wrap around to the first base.
+                _current_step = _base_to_build_step.begin();
+            }
+        }
+    });
+}
+
+// Called in the context of a seastar::thread.
+class view_builder::consumer { // Consumes the base-table fragments of one build step and populates the views being built.
+public:
+    struct built_views { // Views that finished building; put back into the step on destruction unless release()d.
+        build_step& step;
+        std::vector<view_build_status> views;
+
+        built_views(build_step& step)
+                : step(step) {
+        }
+
+        built_views(built_views&& other)
+                : step(other.step)
+                , views(std::move(other.views)) {
+        }
+
+        ~built_views() {
+            for (auto&& status : views) {
+                vlogger.debug("Putting view {}.{} back into the build status", status.view->ks_name(), status.view->cf_name());
+                // Use step.current_token(), which may have wrapped around and become < first_token.
+                step.build_status.emplace_back(view_build_status{std::move(status.view), step.current_token(), step.current_token()});
+            }
+        }
+
+        void release() { // After this, the destructor no longer re-queues the views.
+            views.clear();
+        }
+    };
+
+private:
+    view_builder& _builder;
+    build_step& _step;
+    built_views _built_views;
+    std::vector<view_ptr> _views_to_build; // Views to update for the current partition.
+    std::deque<mutation_fragment> _fragments; // Rows buffered for the current partition.
+
+public:
+    consumer(view_builder& builder, build_step& step)
+            : _builder(builder)
+            , _step(step)
+            , _built_views{step} {
+        if (!step.current_key.key().is_empty(*_step.reader.schema())) { // Resuming in the middle of a partition.
+            load_views_to_build();
+        }
+    }
+
+    void load_views_to_build() { // Collects the views to update for the current partition and advances their next_token.
+        for (auto&& vs : _step.build_status) {
+            if (_step.current_token() >= vs.next_token) {
+                if (partition_key_matches(*_step.reader.schema(), *vs.view->view_info(), _step.current_key)) {
+                    _views_to_build.push_back(vs.view);
+                }
+                if (vs.next_token || _step.current_token() != vs.first_token) {
+                    vs.next_token = _step.current_key.token();
+                }
+            } else {
+                break;
+            }
+        }
+    }
+
+    void check_for_built_views() { // Moves the views that completed their build from the step into _built_views.
+        for (auto it = _step.build_status.begin(); it != _step.build_status.end();) {
+            // A view starts being built at token t1. Due to resharding, that may not necessarily be a
+            // shard-owned token. We finish building the view when the next_token to build is just before
+            // (or at) the first token, but the shard-owned current token is after (or at) the first token.
+            // In the system tables, we set first_token = next_token to signal the completion of the build
+            // process in case of a restart.
+            if (it->next_token && *it->next_token <= it->first_token && _step.current_token() >= it->first_token) {
+                _built_views.views.push_back(std::move(*it));
+                it = _step.build_status.erase(it);
+            } else {
+                ++it;
+            }
+        }
+    }
+
+    stop_iteration consume_new_partition(const dht::decorated_key& dk) {
+        _step.current_key = dk; // dk is a const reference, so this is a copy.
+        check_for_built_views();
+        _views_to_build.clear();
+        load_views_to_build();
+        return stop_iteration(_views_to_build.empty());
+    }
+
+    stop_iteration consume(tombstone) { // Partition tombstones are ignored.
+        return stop_iteration::no;
+    }
+
+    stop_iteration consume(static_row&&, tombstone, bool) { // Static rows are ignored.
+        return stop_iteration::no;
+    }
+
+    stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { // Buffers the row until the end of the partition.
+        if (_views_to_build.empty() || _builder._as.abort_requested()) {
+            return stop_iteration::yes;
+        }
+
+        _fragments.push_back(std::move(cr));
+        return stop_iteration::no;
+    }
+
+    stop_iteration consume(range_tombstone&&) { // Range tombstones are ignored.
+        return stop_iteration::no;
+    }
+
+    stop_iteration consume_end_of_partition() { // Pushes the buffered rows to the views; throws if an abort was requested.
+        _builder._as.check();
+        if (!_fragments.empty()) {
+            _fragments.push_front(partition_start(_step.current_key, tombstone()));
+            _step.base->populate_views(
+                    _views_to_build,
+                    _step.current_token(),
+                    make_flat_mutation_reader_from_fragments(_step.base->schema(), std::move(_fragments))).get();
+            _fragments.clear();
+        }
+        return stop_iteration(_step.build_status.empty());
+    }
+
+    built_views consume_end_of_stream() {
+        if (vlogger.is_enabled(log_level::debug)) {
+            auto view_names = boost::copy_range<std::vector<sstring>>(
+                    _views_to_build | boost::adaptors::transformed([](auto v) {
+                        return v->cf_name();
+                    }));
+            vlogger.debug("Completed build step for base {}.{}, at token {}; views={}", _step.base->schema()->ks_name(),
+                    _step.base->schema()->cf_name(), _step.current_token(), view_names);
+        }
+        if (_step.reader.is_end_of_stream() && _step.reader.is_buffer_empty()) { // Reader exhausted: start over from the minimum token.
+            _step.current_key = {dht::minimum_token(), partition_key::make_empty()};
+            for (auto&& vs : _step.build_status) {
+                vs.next_token = dht::minimum_token();
+            }
+            _builder.initialize_reader_at_current_token(_step);
+            check_for_built_views();
+        }
+        return std::move(_built_views);
+    }
+};
+
+// Called in the context of a seastar::thread.
+void view_builder::execute(build_step& step, exponential_backoff_retry r) {
+    auto consumer = compact_for_query(
+            *step.reader.schema(),
+            gc_clock::now(),
+            step.pslice,
+            batch_size,
+            query::max_partitions,
+            view_builder::consumer{*this, step});
+    consumer.consume_new_partition(step.current_key); // Initialize the state in case we're resuming a partition
+    auto built = step.reader.consume_in_thread(std::move(consumer));
+
+    _as.check();
+
+    std::vector<future<>> bookkeeping_ops;
+    bookkeeping_ops.reserve(built.views.size() + step.build_status.size());
+    for (auto& [view, first_token, _] : built.views) {
+        bookkeeping_ops.push_back(maybe_mark_view_as_built(view, first_token));
+    }
+    built.release(); // The built views were handed over above; don't put them back into the step.
+    for (auto& [view, _, next_token] : step.build_status) {
+        if (next_token) {
+            bookkeeping_ops.push_back(
+                    system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), *next_token));
+        }
+    }
+    seastar::when_all_succeed(bookkeeping_ops.begin(), bookkeeping_ops.end()).handle_exception([] (std::exception_ptr ep) {
+        vlogger.error("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep);
+    }).get();
+}
+
+future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_token) {
+    _built_views.emplace(view->id()); // Record that this shard is done with the view.
+    vlogger.debug("Shard finished building view {}.{}", view->ks_name(), view->cf_name());
+    return container().map_reduce0(
+            [view_id = view->id()] (view_builder& builder) {
+                return builder._built_views.count(view_id);
+            },
+            true,
+            [] (bool result, bool shard_complete) {
+                return result && shard_complete; // True only if every shard has finished the view.
+            }).then([this, view, next_token = std::move(next_token)] (bool built) {
+        if (built) {
+            return container().invoke_on_all([view_id = view->id()] (view_builder& builder) {
+                if (builder._built_views.erase(view_id) == 0 || engine().cpu_id() != 0) { // Only shard 0 updates the system tables.
+                    return make_ready_future<>();
+                }
+                auto view = builder._db.find_schema(view_id);
+                vlogger.info("Finished building view {}.{}", view->ks_name(), view->cf_name());
+                return seastar::when_all_succeed(
+                        system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()),
+                        builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name())).then([view] {
+                    return system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name());
+                });
+            });
+        }
+        return system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), next_token);
+    });
 }
 
 } // namespace view
 } // namespace db
-
diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh
index 15b7ff344c..959a914384 100644
--- a/db/view/view_builder.hh
+++ b/db/view/view_builder.hh
@@ -53,6 +53,14 @@ namespace db::view {
  *
  * We employ a flat_mutation_reader for each base table for which we're building views.
  *
+ * We aim to be resource-conscious. On a given shard, at any given moment, we consume at most
+ * from one reader. We also strive for fairness, in that each build step inserts entries for
+ * the views of a different base. Each build step reads and generates updates for batch_size rows.
+ *
+ * We lack a controller, which could potentially allow us to go faster (to execute multiple steps at
+ * the same time, or consume more rows per batch), and also which would apply backpressure, so we
+ * could, for example, delay executing a build step.
+ *
  * View building is necessarily a sharded process. That means that on restart, if the number of shards
  * has changed, we need to calculate the most conservative token range that has been built, and build
  * the remainder.
@@ -87,7 +95,7 @@ namespace db::view {
  * also be in the in-progress system table - we don't detect this and will
  * redo the missing step, for simplicity.
*/
-class view_builder final : public service::migration_listener::only_view_notifications {
+class view_builder final : public service::migration_listener::only_view_notifications, public seastar::peering_sharded_service<view_builder> {
     /**
      * Keeps track of the build progress for a particular view.
      * When the view is built, next_token == first_token.
@@ -138,6 +146,11 @@ class view_builder final : public service::migration_listener::only_view_notific
     // the algorithms. Also synchronizes an operation wrt. a call to stop().
     seastar::semaphore _sem{1};
     seastar::abort_source _as;
+    // Used to coordinate between shards the conclusion of the build process for a particular view.
+    std::unordered_set<utils::UUID> _built_views;
+
+public:
+    static constexpr size_t batch_size = 128; // Number of rows read and processed by a single build step.
 
 public:
     view_builder(database&, db::system_distributed_keyspace&, service::migration_manager&);
@@ -167,6 +180,10 @@ private:
     future<> calculate_shard_build_step(std::vector, std::vector);
     future<> add_new_view(view_ptr, build_step&);
     future<> do_build_step();
+    void execute(build_step&, exponential_backoff_retry); // Runs one build step; must be called from a seastar::thread.
+    future<> maybe_mark_view_as_built(view_ptr, dht::token); // Marks the view built once every shard has finished it.
+
+    class consumer; // Defined in view.cc; declared with the same class-key as its definition.
 };
 
 }
\ No newline at end of file
diff --git a/keys.hh b/keys.hh
index 81f112c7e1..9fce736d59 100644
--- a/keys.hh
+++ b/keys.hh
@@ -304,6 +304,10 @@ public:
         return get_compound_type(s)->end(_bytes);
     }
 
+    bool is_empty(const schema& s) const { // True if the key has no components under schema s.
+        return begin(s) == end(s);
+    }
+
     // Returns a range of bytes_view
     auto components() const {
         return TopLevelView::compound::element_type::components(representation());