diff --git a/api/column_family.cc b/api/column_family.cc index 8bdee82980..bb01719edb 100644 --- a/api/column_family.cc +++ b/api/column_family.cc @@ -892,11 +892,11 @@ void set_column_family(http_context& ctx, routes& r, sharded req) { + cf::get_built_indexes.set(r, [&ctx, &sys_ks](std::unique_ptr req) { auto ks_cf = parse_fully_qualified_cf_name(req->param["name"]); auto&& ks = std::get<0>(ks_cf); auto&& cf_name = std::get<1>(ks_cf); - return db::system_keyspace::load_view_build_progress().then([ks, cf_name, &ctx](const std::vector& vb) mutable { + return sys_ks.local().load_view_build_progress().then([ks, cf_name, &ctx](const std::vector& vb) mutable { std::set vp; for (auto b : vb) { if (b.view.first == ks) { diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 02759d851b..cec6d8ffce 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -3069,7 +3069,7 @@ mutation system_keyspace::make_size_estimates_mutation(const sstring& ks, std::v future<> system_keyspace::register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token) { sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, generation_number, cpu_id, first_token) VALUES (?, ?, ?, ?, ?)", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS); - return qctx->execute_cql( + return execute_cql( std::move(req), std::move(ks_name), std::move(view_name), @@ -3081,7 +3081,7 @@ future<> system_keyspace::register_view_for_building(sstring ks_name, sstring vi future<> system_keyspace::update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) { sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, next_token, cpu_id) VALUES (?, ?, ?, ?)", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS); - return qctx->execute_cql( + return execute_cql( std::move(req), std::move(ks_name), std::move(view_name), @@ -3090,14 +3090,14 @@ future<> system_keyspace::update_view_build_progress(sstring ks_name, sstring vi } future<> system_keyspace::remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) { - return qctx->execute_cql( + return execute_cql( format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS), std::move(ks_name), std::move(view_name)).discard_result(); } future<> system_keyspace::remove_view_build_progress(sstring ks_name, sstring view_name) { - return qctx->execute_cql( + return execute_cql( format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ? AND cpu_id = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS), std::move(ks_name), std::move(view_name), @@ -3105,21 +3105,21 @@ future<> system_keyspace::remove_view_build_progress(sstring ks_name, sstring vi } future<> system_keyspace::mark_view_as_built(sstring ks_name, sstring view_name) { - return qctx->execute_cql( + return execute_cql( format("INSERT INTO system.{} (keyspace_name, view_name) VALUES (?, ?)", v3::BUILT_VIEWS), std::move(ks_name), std::move(view_name)).discard_result(); } future<> system_keyspace::remove_built_view(sstring ks_name, sstring view_name) { - return qctx->execute_cql( + return execute_cql( format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", v3::BUILT_VIEWS), std::move(ks_name), std::move(view_name)).discard_result(); } future> system_keyspace::load_built_views() { - return qctx->execute_cql(format("SELECT * FROM system.{}", v3::BUILT_VIEWS)).then([] (::shared_ptr cql_result) { + return execute_cql(format("SELECT * FROM system.{}", v3::BUILT_VIEWS)).then([] (::shared_ptr cql_result) { return boost::copy_range>(*cql_result | boost::adaptors::transformed([] (const cql3::untyped_result_set::row& row) { auto ks_name = row.get_as("keyspace_name"); @@ -3130,7 +3130,7 @@ future> system_keyspace::load_built_view } future> system_keyspace::load_view_build_progress() { - return qctx->execute_cql(format("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.{}", + return execute_cql(format("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.{}", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS)).then([] (::shared_ptr cql_result) { std::vector progress; for (auto& row : *cql_result) { diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index d9bd7c6c4f..cf71e9d3d3 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -391,14 +391,14 @@ public: */ static mutation make_size_estimates_mutation(const sstring& ks, std::vector estimates); - static future<> register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token); - static future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token); - static future<> remove_view_build_progress(sstring ks_name, sstring view_name); - static future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name); - static future<> mark_view_as_built(sstring ks_name, sstring view_name); - static future<> remove_built_view(sstring ks_name, sstring view_name); - static future> load_built_views(); - static future> load_view_build_progress(); + future<> register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token); + future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token); + future<> remove_view_build_progress(sstring ks_name, sstring view_name); + future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name); + future<> mark_view_as_built(sstring ks_name, sstring view_name); + future<> remove_built_view(sstring ks_name, sstring view_name); + future> load_built_views(); + future> load_view_build_progress(); // Paxos related functions static future load_paxos_state(partition_key_view key, schema_ptr s, gc_clock::time_point now, diff --git a/db/view/view.cc b/db/view/view.cc index 93e95d8d29..2a402febfa 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1784,8 +1784,8 @@ future<> view_builder::start(service::migration_manager& mm) { while (!mm.have_schema_agreement()) { seastar::sleep_abortable(500ms, _as).get(); } - auto built = system_keyspace::load_built_views().get0(); - auto in_progress = system_keyspace::load_view_build_progress().get0(); + auto built = _sys_ks.load_built_views().get0(); + auto in_progress = _sys_ks.load_view_build_progress().get0(); setup_shard_build_step(vbi, std::move(built), std::move(in_progress)); }).then_wrapped([this] (future<>&& f) { // All shards need to arrive at the same decisions on whether or not to @@ -2001,9 +2001,9 @@ void view_builder::setup_shard_build_step( } if (this_shard_id() == 0) { vbi.bookkeeping_ops.push_back(_sys_dist_ks.remove_view(name.first, name.second)); - vbi.bookkeeping_ops.push_back(system_keyspace::remove_built_view(name.first, name.second)); + vbi.bookkeeping_ops.push_back(_sys_ks.remove_built_view(name.first, name.second)); vbi.bookkeeping_ops.push_back( - system_keyspace::remove_view_build_progress_across_all_shards( + _sys_ks.remove_view_build_progress_across_all_shards( std::move(name.first), std::move(name.second))); } @@ -2019,8 +2019,8 @@ void view_builder::setup_shard_build_step( if (auto view = maybe_fetch_view(view_name)) { if (vbi.built_views.contains(view->id())) { if (this_shard_id() == 0) { - auto f = _sys_dist_ks.finish_view_build(std::move(view_name.first), std::move(view_name.second)).then([view = std::move(view)] { - return system_keyspace::remove_view_build_progress_across_all_shards(view->cf_name(), view->ks_name()); + auto f = _sys_dist_ks.finish_view_build(std::move(view_name.first), std::move(view_name.second)).then([this, view = std::move(view)] { + return _sys_ks.remove_view_build_progress_across_all_shards(view->cf_name(), view->ks_name()); }); vbi.bookkeeping_ops.push_back(std::move(f)); } @@ -2102,7 +2102,7 @@ future<> view_builder::add_new_view(view_ptr view, build_step& step) { auto f = this_shard_id() == 0 ? _sys_dist_ks.start_view_build(view->ks_name(), view->cf_name()) : make_ready_future<>(); return when_all_succeed( std::move(f), - system_keyspace::register_view_for_building(view->ks_name(), view->cf_name(), step.current_token())).discard_result(); + _sys_ks.register_view_for_building(view->ks_name(), view->cf_name(), step.current_token())).discard_result(); } static future<> flush_base(lw_shared_ptr base, abort_source& as) { @@ -2184,11 +2184,11 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name // current shard, since shard 0 may have already processed the notification, and this // shard may since have updated the system table if the drop happened concurrently // with the build. - return system_keyspace::remove_view_build_progress(ks_name, view_name); + return _sys_ks.remove_view_build_progress(ks_name, view_name); } return when_all_succeed( - system_keyspace::remove_view_build_progress(ks_name, view_name), - system_keyspace::remove_built_view(ks_name, view_name), + _sys_ks.remove_view_build_progress(ks_name, view_name), + _sys_ks.remove_built_view(ks_name, view_name), _sys_dist_ks.remove_view(ks_name, view_name)) .discard_result() .handle_exception([ks_name, view_name] (std::exception_ptr ep) { @@ -2455,7 +2455,7 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) { for (auto& [view, _, next_token] : step.build_status) { if (next_token) { bookkeeping_ops.push_back( - system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), *next_token)); + _sys_ks.update_view_build_progress(view->ks_name(), view->cf_name(), *next_token)); } } seastar::when_all_succeed(bookkeeping_ops.begin(), bookkeeping_ops.end()).handle_exception([this] (std::exception_ptr ep) { @@ -2483,11 +2483,11 @@ future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_t auto view = builder._db.find_schema(view_id); vlogger.info("Finished building view {}.{}", view->ks_name(), view->cf_name()); return seastar::when_all_succeed( - system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()), - builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name())).then_unpack([view] { + builder._sys_ks.mark_view_as_built(view->ks_name(), view->cf_name()), + builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name())).then_unpack([&builder, view] { // The view is built, so shard 0 can remove the entry in the build progress system table on // behalf of all shards. It is guaranteed to have a higher timestamp than the per-shard entries. - return system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name()); + return builder._sys_ks.remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name()); }).then([&builder, view] { auto it = builder._build_notifiers.find(std::pair(view->ks_name(), view->cf_name())); if (it != builder._build_notifiers.end()) { @@ -2496,7 +2496,7 @@ future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_t }); }); } - return system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), next_token); + return _sys_ks.update_view_build_progress(view->ks_name(), view->cf_name(), next_token); }); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 16a2a8dbc0..69cd5cdba3 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -621,8 +621,8 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi future<> storage_service::mark_existing_views_as_built(sharded& sys_dist_ks) { assert(this_shard_id() == 0); auto views = _db.local().get_views(); - co_await coroutine::parallel_for_each(views, [&sys_dist_ks] (view_ptr& view) -> future<> { - co_await db::system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()); + co_await coroutine::parallel_for_each(views, [this, &sys_dist_ks] (view_ptr& view) -> future<> { + co_await _sys_ks.local().mark_view_as_built(view->ks_name(), view->cf_name()); co_await sys_dist_ks.local().finish_view_build(view->ks_name(), view->cf_name()); }); } diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 0f52619c19..65f689c6a1 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -72,7 +72,7 @@ SEASTAR_TEST_CASE(test_builder_with_large_partition) { "primary key (v, c, p)").get(); f.get(); - auto built = db::system_keyspace::load_built_views().get0(); + auto built = e.get_system_keyspace().load_built_views().get0(); BOOST_REQUIRE_EQUAL(built.size(), 1); BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf")); @@ -107,7 +107,7 @@ SEASTAR_TEST_CASE(test_builder_with_large_partition_2) { "primary key (p, c)").get(); f.get(); - auto built = db::system_keyspace::load_built_views().get0(); + auto built = e.get_system_keyspace().load_built_views().get0(); BOOST_REQUIRE_EQUAL(built.size(), 1); BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf")); @@ -132,7 +132,7 @@ SEASTAR_TEST_CASE(test_builder_with_multiple_partitions) { "primary key (v, c, p)").get(); f.get(); - auto built = db::system_keyspace::load_built_views().get0(); + auto built = e.get_system_keyspace().load_built_views().get0(); BOOST_REQUIRE_EQUAL(built.size(), 1); BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf")); @@ -156,7 +156,7 @@ SEASTAR_TEST_CASE(test_builder_with_multiple_partitions_of_batch_size_rows) { "primary key (v, c, p)").get(); f.get(); - auto built = db::system_keyspace::load_built_views().get0(); + auto built = e.get_system_keyspace().load_built_views().get0(); BOOST_REQUIRE_EQUAL(built.size(), 1); BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf")); @@ -189,7 +189,7 @@ SEASTAR_TEST_CASE(test_builder_view_added_during_ongoing_build) { f2.get(); f1.get(); - auto built = db::system_keyspace::load_built_views().get0(); + auto built = e.get_system_keyspace().load_built_views().get0(); BOOST_REQUIRE_EQUAL(built.size(), 2); auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0(); @@ -247,7 +247,7 @@ SEASTAR_TEST_CASE(test_builder_across_tokens_with_large_partitions) { f2.get(); f1.get(); - auto built = db::system_keyspace::load_built_views().get0(); + auto built = e.get_system_keyspace().load_built_views().get0(); BOOST_REQUIRE_EQUAL(built.size(), 2); auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0(); @@ -290,7 +290,7 @@ SEASTAR_TEST_CASE(test_builder_across_tokens_with_small_partitions) { f2.get(); f1.get(); - auto built = db::system_keyspace::load_built_views().get0(); + auto built = e.get_system_keyspace().load_built_views().get0(); BOOST_REQUIRE_EQUAL(built.size(), 2); auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0(); @@ -320,7 +320,7 @@ SEASTAR_TEST_CASE(test_builder_with_tombstones) { "primary key ((v, p), c1, c2)").get(); f.get(); - auto built = db::system_keyspace::load_built_views().get0(); + auto built = e.get_system_keyspace().load_built_views().get0(); BOOST_REQUIRE_EQUAL(built.size(), 1); auto msg = e.execute_cql("select * from vcf").get0(); @@ -865,6 +865,6 @@ SEASTAR_TEST_CASE(test_load_view_build_progress_with_values_missing) { return do_with_cql_env_thread([] (cql_test_env& e) { cquery_nofail(e, format("INSERT INTO system.{} (keyspace_name, view_name, cpu_id) VALUES ('ks', 'v', {})", db::system_keyspace::v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS, this_shard_id())); - BOOST_REQUIRE(db::system_keyspace::load_view_build_progress().get0().empty()); + BOOST_REQUIRE(e.get_system_keyspace().load_view_build_progress().get0().empty()); }); } diff --git a/test/boost/virtual_reader_test.cc b/test/boost/virtual_reader_test.cc index 9868e6a697..5b07ba565d 100644 --- a/test/boost/virtual_reader_test.cc +++ b/test/boost/virtual_reader_test.cc @@ -177,16 +177,16 @@ SEASTAR_TEST_CASE(test_query_view_built_progress_virtual_table) { auto rand = [] { return dht::token::get_random_token(); }; auto next_token = rand(); auto next_token_str = next_token.to_sstring(); - db::system_keyspace::register_view_for_building("ks", "v1", rand()).get(); - db::system_keyspace::register_view_for_building("ks", "v2", rand()).get(); - db::system_keyspace::register_view_for_building("ks", "v3", rand()).get(); - db::system_keyspace::update_view_build_progress("ks", "v3", next_token).get(); - db::system_keyspace::register_view_for_building("ks", "v4", rand()).get(); - db::system_keyspace::update_view_build_progress("ks", "v4", next_token).get(); - db::system_keyspace::register_view_for_building("ks", "v5", rand()).get(); - db::system_keyspace::register_view_for_building("ks", "v6", rand()).get(); - db::system_keyspace::remove_view_build_progress_across_all_shards("ks", "v5").get(); - db::system_keyspace::remove_view_build_progress_across_all_shards("ks", "v6").get(); + e.get_system_keyspace().register_view_for_building("ks", "v1", rand()).get(); + e.get_system_keyspace().register_view_for_building("ks", "v2", rand()).get(); + e.get_system_keyspace().register_view_for_building("ks", "v3", rand()).get(); + e.get_system_keyspace().update_view_build_progress("ks", "v3", next_token).get(); + e.get_system_keyspace().register_view_for_building("ks", "v4", rand()).get(); + e.get_system_keyspace().update_view_build_progress("ks", "v4", next_token).get(); + e.get_system_keyspace().register_view_for_building("ks", "v5", rand()).get(); + e.get_system_keyspace().register_view_for_building("ks", "v6", rand()).get(); + e.get_system_keyspace().remove_view_build_progress_across_all_shards("ks", "v5").get(); + e.get_system_keyspace().remove_view_build_progress_across_all_shards("ks", "v6").get(); auto rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks'").get0(); assert_that(rs).is_rows().with_rows_ignore_order({ { {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v1"))}, {int32_type->decompose(0)}, { } },