diff --git a/db/view/view.cc b/db/view/view.cc index 75b8109c48..613a44b101 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1435,7 +1435,8 @@ view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID bas return it->second; } -void view_builder::initialize_reader_at_current_token(build_step& step) { +future<> view_builder::initialize_reader_at_current_token(build_step& step) { + return step.reader.close().then([this, &step] { step.pslice = make_partition_slice(*step.base->schema()); step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max()); step.reader = step.base->get_sstable_set().make_local_shard_sstable_reader( @@ -1447,6 +1448,7 @@ void view_builder::initialize_reader_at_current_token(build_step& step) { nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); + }); } void view_builder::load_view_status(view_builder::view_build_status status, std::unordered_set& loaded_views) { @@ -1633,13 +1635,12 @@ future<> view_builder::calculate_shard_build_step(view_builder_init_state& vbi) vbi.bookkeeping_ops.push_back(add_new_view(view, get_or_create_build_step(view->view_info()->base_id()))); } - for (auto& [_, build_step] : _base_to_build_step) { - initialize_reader_at_current_token(build_step); - } - - auto f = seastar::when_all_succeed(vbi.bookkeeping_ops.begin(), vbi.bookkeeping_ops.end()); - return f.handle_exception([this] (std::exception_ptr ep) { - vlogger.warn("Failed to update materialized view bookkeeping while synchronizing view builds on all shards ({}), continuing anyway.", ep); + return parallel_for_each(_base_to_build_step, [this] (auto& p) { + return initialize_reader_at_current_token(p.second); + }).then([this, &vbi] { + return seastar::when_all_succeed(vbi.bookkeeping_ops.begin(), vbi.bookkeeping_ops.end()).handle_exception([this] (std::exception_ptr ep) { + vlogger.warn("Failed to update materialized view bookkeeping while synchronizing view builds on all shards ({}), continuing anyway.", ep); + }); }); } @@ -1677,7 +1678,7 @@ void view_builder::on_create_view(const sstring& ks_name, const sstring& view_na // being built to receive duplicate updates, but it simplifies things as we don't have // to keep around a list of new views to build the next time the reader crosses a token // threshold. - initialize_reader_at_current_token(step); + return initialize_reader_at_current_token(step).then([this, view, &step] () mutable { return add_new_view(view, step).then_wrapped([this, view] (future<>&& f) { if (f.failed()) { vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), f.get_exception()); @@ -1685,6 +1686,7 @@ void view_builder::on_create_view(const sstring& ks_name, const sstring& view_na // Waited on indirectly in stop(). (void)_build_step.trigger(); }); + }); }); }).handle_exception_type([] (no_such_column_family&) { }); } @@ -1760,7 +1762,7 @@ future<> view_builder::do_build_step() { auto base = _current_step->second.base->schema(); vlogger.warn("Error executing build step for base {}.{}: {}", base->ks_name(), base->cf_name(), std::current_exception()); r.retry(_as).get(); - initialize_reader_at_current_token(_current_step->second); + initialize_reader_at_current_token(_current_step->second).get(); } if (_current_step->second.build_status.empty()) { auto base = _current_step->second.base->schema(); @@ -1933,6 +1935,7 @@ public: return stop_iteration(_step.build_status.empty()); } + // Must be called in a seastar thread. built_views consume_end_of_stream() { inject_failure("view_builder_consume_end_of_stream"); if (vlogger.is_enabled(log_level::debug)) { @@ -1948,7 +1951,7 @@ public: for (auto&& vs : _step.build_status) { vs.next_token = dht::minimum_token(); } - _builder.initialize_reader_at_current_token(_step); + _builder.initialize_reader_at_current_token(_step).get(); check_for_built_views(); } return std::move(_built_views); diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index a74150f625..f4d3d39f44 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -216,7 +216,7 @@ public: private: build_step& get_or_create_build_step(utils::UUID); - void initialize_reader_at_current_token(build_step&); + future<> initialize_reader_at_current_token(build_step&); void load_view_status(view_build_status, std::unordered_set&); void reshard(std::vector>, std::unordered_set&); void setup_shard_build_step(view_builder_init_state& vbi, std::vector, std::vector);