view_builder: initialize_reader_at_current_token: close reader before reassigning it
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -1435,7 +1435,8 @@ view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID bas
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void view_builder::initialize_reader_at_current_token(build_step& step) {
|
||||
future<> view_builder::initialize_reader_at_current_token(build_step& step) {
|
||||
return step.reader.close().then([this, &step] {
|
||||
step.pslice = make_partition_slice(*step.base->schema());
|
||||
step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max());
|
||||
step.reader = step.base->get_sstable_set().make_local_shard_sstable_reader(
|
||||
@@ -1447,6 +1448,7 @@ void view_builder::initialize_reader_at_current_token(build_step& step) {
|
||||
nullptr,
|
||||
streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding::no);
|
||||
});
|
||||
}
|
||||
|
||||
void view_builder::load_view_status(view_builder::view_build_status status, std::unordered_set<utils::UUID>& loaded_views) {
|
||||
@@ -1633,14 +1635,13 @@ future<> view_builder::calculate_shard_build_step(view_builder_init_state& vbi)
|
||||
vbi.bookkeeping_ops.push_back(add_new_view(view, get_or_create_build_step(view->view_info()->base_id())));
|
||||
}
|
||||
|
||||
for (auto& [_, build_step] : _base_to_build_step) {
|
||||
initialize_reader_at_current_token(build_step);
|
||||
}
|
||||
|
||||
auto f = seastar::when_all_succeed(vbi.bookkeeping_ops.begin(), vbi.bookkeeping_ops.end());
|
||||
return f.handle_exception([this] (std::exception_ptr ep) {
|
||||
return parallel_for_each(_base_to_build_step, [this] (auto& p) {
|
||||
return initialize_reader_at_current_token(p.second);
|
||||
}).then([this, &vbi] {
|
||||
return seastar::when_all_succeed(vbi.bookkeeping_ops.begin(), vbi.bookkeeping_ops.end()).handle_exception([this] (std::exception_ptr ep) {
|
||||
vlogger.warn("Failed to update materialized view bookkeeping while synchronizing view builds on all shards ({}), continuing anyway.", ep);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> view_builder::add_new_view(view_ptr view, build_step& step) {
|
||||
@@ -1677,7 +1678,7 @@ void view_builder::on_create_view(const sstring& ks_name, const sstring& view_na
|
||||
// being built to receive duplicate updates, but it simplifies things as we don't have
|
||||
// to keep around a list of new views to build the next time the reader crosses a token
|
||||
// threshold.
|
||||
initialize_reader_at_current_token(step);
|
||||
return initialize_reader_at_current_token(step).then([this, view, &step] () mutable {
|
||||
return add_new_view(view, step).then_wrapped([this, view] (future<>&& f) {
|
||||
if (f.failed()) {
|
||||
vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), f.get_exception());
|
||||
@@ -1686,6 +1687,7 @@ void view_builder::on_create_view(const sstring& ks_name, const sstring& view_na
|
||||
(void)_build_step.trigger();
|
||||
});
|
||||
});
|
||||
});
|
||||
}).handle_exception_type([] (no_such_column_family&) { });
|
||||
}
|
||||
|
||||
@@ -1760,7 +1762,7 @@ future<> view_builder::do_build_step() {
|
||||
auto base = _current_step->second.base->schema();
|
||||
vlogger.warn("Error executing build step for base {}.{}: {}", base->ks_name(), base->cf_name(), std::current_exception());
|
||||
r.retry(_as).get();
|
||||
initialize_reader_at_current_token(_current_step->second);
|
||||
initialize_reader_at_current_token(_current_step->second).get();
|
||||
}
|
||||
if (_current_step->second.build_status.empty()) {
|
||||
auto base = _current_step->second.base->schema();
|
||||
@@ -1933,6 +1935,7 @@ public:
|
||||
return stop_iteration(_step.build_status.empty());
|
||||
}
|
||||
|
||||
// Must be called in a seastar thread.
|
||||
built_views consume_end_of_stream() {
|
||||
inject_failure("view_builder_consume_end_of_stream");
|
||||
if (vlogger.is_enabled(log_level::debug)) {
|
||||
@@ -1948,7 +1951,7 @@ public:
|
||||
for (auto&& vs : _step.build_status) {
|
||||
vs.next_token = dht::minimum_token();
|
||||
}
|
||||
_builder.initialize_reader_at_current_token(_step);
|
||||
_builder.initialize_reader_at_current_token(_step).get();
|
||||
check_for_built_views();
|
||||
}
|
||||
return std::move(_built_views);
|
||||
|
||||
@@ -216,7 +216,7 @@ public:
|
||||
|
||||
private:
|
||||
build_step& get_or_create_build_step(utils::UUID);
|
||||
void initialize_reader_at_current_token(build_step&);
|
||||
future<> initialize_reader_at_current_token(build_step&);
|
||||
void load_view_status(view_build_status, std::unordered_set<utils::UUID>&);
|
||||
void reshard(std::vector<std::vector<view_build_status>>, std::unordered_set<utils::UUID>&);
|
||||
void setup_shard_build_step(view_builder_init_state& vbi, std::vector<system_keyspace::view_name>, std::vector<system_keyspace::view_build_progress>);
|
||||
|
||||
Reference in New Issue
Block a user