From 84a69b6adb3df64657956404884c3880102e84b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 1 Nov 2022 12:10:14 +0200 Subject: [PATCH] db/view/view_update_check: check_needs_view_update_path(): filter out non-member hosts We currently don't clean up the system_distributed.view_build_status table after removed nodes. This can cause false-positive check for whether view update generation is needed for streaming. The proper fix is to clean up this table, but that will be more involved, and even when done, it might not be immediate. So until then and to be on the safe side, filter out entries belonging to unknown hosts from said table. Fixes: #11905 Refs: #11836 Closes #11860 --- db/view/view.cc | 18 +++++++++++------- db/view/view_update_checks.hh | 8 ++++++-- replica/distributed_loader.cc | 2 +- streaming/consumer.cc | 2 +- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index bc681ff764..6822746668 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2523,24 +2523,28 @@ update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog bac return std::max(backlog, _max.load(std::memory_order_relaxed)); } -future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const sstring& ks_name, const sstring& cf_name) { - return sys_dist_ks.view_status(ks_name, cf_name).then([] (std::unordered_map&& view_statuses) { - return boost::algorithm::any_of(view_statuses | boost::adaptors::map_values, [] (const sstring& view_status) { - return view_status == "STARTED"; +future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const sstring& ks_name, + const sstring& cf_name) { + using view_statuses_type = std::unordered_map; + return sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) { + return boost::algorithm::any_of(view_statuses, [&tm] (const view_statuses_type::value_type& 
view_status) { + // Only consider status of known hosts. + return view_status.second == "STARTED" && tm.get_endpoint_for_host_id(view_status.first); }); }); } -future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const replica::table& t, streaming::stream_reason reason) { +future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t, + streaming::stream_reason reason) { if (is_internal_keyspace(t.schema()->ks_name())) { return make_ready_future(false); } if (reason == streaming::stream_reason::repair && !t.views().empty()) { return make_ready_future(true); } - return do_with(t.views(), [&sys_dist_ks] (auto& views) { + return do_with(t.views(), [&sys_dist_ks, &tm] (auto& views) { return map_reduce(views, - [&sys_dist_ks] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, view->ks_name(), view->cf_name()); }, + [&sys_dist_ks, &tm] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, tm, view->ks_name(), view->cf_name()); }, false, std::logical_or()); }); diff --git a/db/view/view_update_checks.hh b/db/view/view_update_checks.hh index deffeeb034..77b7113c0d 100644 --- a/db/view/view_update_checks.hh +++ b/db/view/view_update_checks.hh @@ -22,9 +22,13 @@ class system_distributed_keyspace; } +namespace locator { +class token_metadata; +} + namespace db::view { -future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const sstring& ks_name, const sstring& cf_name); -future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const replica::table& t, streaming::stream_reason reason); +future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t, + streaming::stream_reason reason); } diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 48df5598d9..4782dc6814 100644 --- 
a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -354,7 +354,7 @@ distributed_loader::process_upload_dir(distributed& db, distr return make_sstable(*global_table, upload, gen); }, [] (const sstables::shared_sstable&) { return true; }).get(); - const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), *global_table, streaming::stream_reason::repair).get0(); + const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get0(); auto datadir = upload.parent_path(); if (use_view_update_path) { diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 8f8cc0ff8f..30b74ebd13 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -29,7 +29,7 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin std::exception_ptr ex; try { auto cf = db.local().find_column_family(reader.schema()).shared_from_this(); - auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), *cf, reason); + auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason); //FIXME: for better estimations this should be transmitted from remote auto metadata = mutation_source_metadata{}; auto& cs = cf->get_compaction_strategy();