system_keysace: De-static calls that update view-building tables
There's a bunch of them used by mainly view_builder and also by the API and storage_service. All use global qctx to make its job, now when the callers have main-local sharded<system_keysace> references they can be made non-static. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -892,11 +892,11 @@ void set_column_family(http_context& ctx, routes& r, sharded<db::system_keyspace
|
||||
});
|
||||
});
|
||||
|
||||
cf::get_built_indexes.set(r, [&ctx](std::unique_ptr<request> req) {
|
||||
cf::get_built_indexes.set(r, [&ctx, &sys_ks](std::unique_ptr<request> req) {
|
||||
auto ks_cf = parse_fully_qualified_cf_name(req->param["name"]);
|
||||
auto&& ks = std::get<0>(ks_cf);
|
||||
auto&& cf_name = std::get<1>(ks_cf);
|
||||
return db::system_keyspace::load_view_build_progress().then([ks, cf_name, &ctx](const std::vector<db::system_keyspace_view_build_progress>& vb) mutable {
|
||||
return sys_ks.local().load_view_build_progress().then([ks, cf_name, &ctx](const std::vector<db::system_keyspace_view_build_progress>& vb) mutable {
|
||||
std::set<sstring> vp;
|
||||
for (auto b : vb) {
|
||||
if (b.view.first == ks) {
|
||||
|
||||
@@ -3069,7 +3069,7 @@ mutation system_keyspace::make_size_estimates_mutation(const sstring& ks, std::v
|
||||
future<> system_keyspace::register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token) {
|
||||
sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, generation_number, cpu_id, first_token) VALUES (?, ?, ?, ?, ?)",
|
||||
v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
|
||||
return qctx->execute_cql(
|
||||
return execute_cql(
|
||||
std::move(req),
|
||||
std::move(ks_name),
|
||||
std::move(view_name),
|
||||
@@ -3081,7 +3081,7 @@ future<> system_keyspace::register_view_for_building(sstring ks_name, sstring vi
|
||||
future<> system_keyspace::update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) {
|
||||
sstring req = format("INSERT INTO system.{} (keyspace_name, view_name, next_token, cpu_id) VALUES (?, ?, ?, ?)",
|
||||
v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS);
|
||||
return qctx->execute_cql(
|
||||
return execute_cql(
|
||||
std::move(req),
|
||||
std::move(ks_name),
|
||||
std::move(view_name),
|
||||
@@ -3090,14 +3090,14 @@ future<> system_keyspace::update_view_build_progress(sstring ks_name, sstring vi
|
||||
}
|
||||
|
||||
future<> system_keyspace::remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name) {
|
||||
return qctx->execute_cql(
|
||||
return execute_cql(
|
||||
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
|
||||
std::move(ks_name),
|
||||
std::move(view_name)).discard_result();
|
||||
}
|
||||
|
||||
future<> system_keyspace::remove_view_build_progress(sstring ks_name, sstring view_name) {
|
||||
return qctx->execute_cql(
|
||||
return execute_cql(
|
||||
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ? AND cpu_id = ?", v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
|
||||
std::move(ks_name),
|
||||
std::move(view_name),
|
||||
@@ -3105,21 +3105,21 @@ future<> system_keyspace::remove_view_build_progress(sstring ks_name, sstring vi
|
||||
}
|
||||
|
||||
future<> system_keyspace::mark_view_as_built(sstring ks_name, sstring view_name) {
|
||||
return qctx->execute_cql(
|
||||
return execute_cql(
|
||||
format("INSERT INTO system.{} (keyspace_name, view_name) VALUES (?, ?)", v3::BUILT_VIEWS),
|
||||
std::move(ks_name),
|
||||
std::move(view_name)).discard_result();
|
||||
}
|
||||
|
||||
future<> system_keyspace::remove_built_view(sstring ks_name, sstring view_name) {
|
||||
return qctx->execute_cql(
|
||||
return execute_cql(
|
||||
format("DELETE FROM system.{} WHERE keyspace_name = ? AND view_name = ?", v3::BUILT_VIEWS),
|
||||
std::move(ks_name),
|
||||
std::move(view_name)).discard_result();
|
||||
}
|
||||
|
||||
future<std::vector<system_keyspace::view_name>> system_keyspace::load_built_views() {
|
||||
return qctx->execute_cql(format("SELECT * FROM system.{}", v3::BUILT_VIEWS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
return execute_cql(format("SELECT * FROM system.{}", v3::BUILT_VIEWS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
return boost::copy_range<std::vector<view_name>>(*cql_result
|
||||
| boost::adaptors::transformed([] (const cql3::untyped_result_set::row& row) {
|
||||
auto ks_name = row.get_as<sstring>("keyspace_name");
|
||||
@@ -3130,7 +3130,7 @@ future<std::vector<system_keyspace::view_name>> system_keyspace::load_built_view
|
||||
}
|
||||
|
||||
future<std::vector<system_keyspace::view_build_progress>> system_keyspace::load_view_build_progress() {
|
||||
return qctx->execute_cql(format("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.{}",
|
||||
return execute_cql(format("SELECT keyspace_name, view_name, first_token, next_token, cpu_id FROM system.{}",
|
||||
v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS)).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
std::vector<view_build_progress> progress;
|
||||
for (auto& row : *cql_result) {
|
||||
|
||||
@@ -391,14 +391,14 @@ public:
|
||||
*/
|
||||
static mutation make_size_estimates_mutation(const sstring& ks, std::vector<range_estimates> estimates);
|
||||
|
||||
static future<> register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token);
|
||||
static future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token);
|
||||
static future<> remove_view_build_progress(sstring ks_name, sstring view_name);
|
||||
static future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name);
|
||||
static future<> mark_view_as_built(sstring ks_name, sstring view_name);
|
||||
static future<> remove_built_view(sstring ks_name, sstring view_name);
|
||||
static future<std::vector<view_name>> load_built_views();
|
||||
static future<std::vector<view_build_progress>> load_view_build_progress();
|
||||
future<> register_view_for_building(sstring ks_name, sstring view_name, const dht::token& token);
|
||||
future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token);
|
||||
future<> remove_view_build_progress(sstring ks_name, sstring view_name);
|
||||
future<> remove_view_build_progress_across_all_shards(sstring ks_name, sstring view_name);
|
||||
future<> mark_view_as_built(sstring ks_name, sstring view_name);
|
||||
future<> remove_built_view(sstring ks_name, sstring view_name);
|
||||
future<std::vector<view_name>> load_built_views();
|
||||
future<std::vector<view_build_progress>> load_view_build_progress();
|
||||
|
||||
// Paxos related functions
|
||||
static future<service::paxos::paxos_state> load_paxos_state(partition_key_view key, schema_ptr s, gc_clock::time_point now,
|
||||
|
||||
@@ -1784,8 +1784,8 @@ future<> view_builder::start(service::migration_manager& mm) {
|
||||
while (!mm.have_schema_agreement()) {
|
||||
seastar::sleep_abortable(500ms, _as).get();
|
||||
}
|
||||
auto built = system_keyspace::load_built_views().get0();
|
||||
auto in_progress = system_keyspace::load_view_build_progress().get0();
|
||||
auto built = _sys_ks.load_built_views().get0();
|
||||
auto in_progress = _sys_ks.load_view_build_progress().get0();
|
||||
setup_shard_build_step(vbi, std::move(built), std::move(in_progress));
|
||||
}).then_wrapped([this] (future<>&& f) {
|
||||
// All shards need to arrive at the same decisions on whether or not to
|
||||
@@ -2001,9 +2001,9 @@ void view_builder::setup_shard_build_step(
|
||||
}
|
||||
if (this_shard_id() == 0) {
|
||||
vbi.bookkeeping_ops.push_back(_sys_dist_ks.remove_view(name.first, name.second));
|
||||
vbi.bookkeeping_ops.push_back(system_keyspace::remove_built_view(name.first, name.second));
|
||||
vbi.bookkeeping_ops.push_back(_sys_ks.remove_built_view(name.first, name.second));
|
||||
vbi.bookkeeping_ops.push_back(
|
||||
system_keyspace::remove_view_build_progress_across_all_shards(
|
||||
_sys_ks.remove_view_build_progress_across_all_shards(
|
||||
std::move(name.first),
|
||||
std::move(name.second)));
|
||||
}
|
||||
@@ -2019,8 +2019,8 @@ void view_builder::setup_shard_build_step(
|
||||
if (auto view = maybe_fetch_view(view_name)) {
|
||||
if (vbi.built_views.contains(view->id())) {
|
||||
if (this_shard_id() == 0) {
|
||||
auto f = _sys_dist_ks.finish_view_build(std::move(view_name.first), std::move(view_name.second)).then([view = std::move(view)] {
|
||||
return system_keyspace::remove_view_build_progress_across_all_shards(view->cf_name(), view->ks_name());
|
||||
auto f = _sys_dist_ks.finish_view_build(std::move(view_name.first), std::move(view_name.second)).then([this, view = std::move(view)] {
|
||||
return _sys_ks.remove_view_build_progress_across_all_shards(view->cf_name(), view->ks_name());
|
||||
});
|
||||
vbi.bookkeeping_ops.push_back(std::move(f));
|
||||
}
|
||||
@@ -2102,7 +2102,7 @@ future<> view_builder::add_new_view(view_ptr view, build_step& step) {
|
||||
auto f = this_shard_id() == 0 ? _sys_dist_ks.start_view_build(view->ks_name(), view->cf_name()) : make_ready_future<>();
|
||||
return when_all_succeed(
|
||||
std::move(f),
|
||||
system_keyspace::register_view_for_building(view->ks_name(), view->cf_name(), step.current_token())).discard_result();
|
||||
_sys_ks.register_view_for_building(view->ks_name(), view->cf_name(), step.current_token())).discard_result();
|
||||
}
|
||||
|
||||
static future<> flush_base(lw_shared_ptr<replica::column_family> base, abort_source& as) {
|
||||
@@ -2184,11 +2184,11 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name
|
||||
// current shard, since shard 0 may have already processed the notification, and this
|
||||
// shard may since have updated the system table if the drop happened concurrently
|
||||
// with the build.
|
||||
return system_keyspace::remove_view_build_progress(ks_name, view_name);
|
||||
return _sys_ks.remove_view_build_progress(ks_name, view_name);
|
||||
}
|
||||
return when_all_succeed(
|
||||
system_keyspace::remove_view_build_progress(ks_name, view_name),
|
||||
system_keyspace::remove_built_view(ks_name, view_name),
|
||||
_sys_ks.remove_view_build_progress(ks_name, view_name),
|
||||
_sys_ks.remove_built_view(ks_name, view_name),
|
||||
_sys_dist_ks.remove_view(ks_name, view_name))
|
||||
.discard_result()
|
||||
.handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
||||
@@ -2455,7 +2455,7 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) {
|
||||
for (auto& [view, _, next_token] : step.build_status) {
|
||||
if (next_token) {
|
||||
bookkeeping_ops.push_back(
|
||||
system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), *next_token));
|
||||
_sys_ks.update_view_build_progress(view->ks_name(), view->cf_name(), *next_token));
|
||||
}
|
||||
}
|
||||
seastar::when_all_succeed(bookkeeping_ops.begin(), bookkeeping_ops.end()).handle_exception([this] (std::exception_ptr ep) {
|
||||
@@ -2483,11 +2483,11 @@ future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_t
|
||||
auto view = builder._db.find_schema(view_id);
|
||||
vlogger.info("Finished building view {}.{}", view->ks_name(), view->cf_name());
|
||||
return seastar::when_all_succeed(
|
||||
system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()),
|
||||
builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name())).then_unpack([view] {
|
||||
builder._sys_ks.mark_view_as_built(view->ks_name(), view->cf_name()),
|
||||
builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name())).then_unpack([&builder, view] {
|
||||
// The view is built, so shard 0 can remove the entry in the build progress system table on
|
||||
// behalf of all shards. It is guaranteed to have a higher timestamp than the per-shard entries.
|
||||
return system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name());
|
||||
return builder._sys_ks.remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name());
|
||||
}).then([&builder, view] {
|
||||
auto it = builder._build_notifiers.find(std::pair(view->ks_name(), view->cf_name()));
|
||||
if (it != builder._build_notifiers.end()) {
|
||||
@@ -2496,7 +2496,7 @@ future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_t
|
||||
});
|
||||
});
|
||||
}
|
||||
return system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), next_token);
|
||||
return _sys_ks.update_view_build_progress(view->ks_name(), view->cf_name(), next_token);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -621,8 +621,8 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
|
||||
future<> storage_service::mark_existing_views_as_built(sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
assert(this_shard_id() == 0);
|
||||
auto views = _db.local().get_views();
|
||||
co_await coroutine::parallel_for_each(views, [&sys_dist_ks] (view_ptr& view) -> future<> {
|
||||
co_await db::system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name());
|
||||
co_await coroutine::parallel_for_each(views, [this, &sys_dist_ks] (view_ptr& view) -> future<> {
|
||||
co_await _sys_ks.local().mark_view_as_built(view->ks_name(), view->cf_name());
|
||||
co_await sys_dist_ks.local().finish_view_build(view->ks_name(), view->cf_name());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -72,7 +72,7 @@ SEASTAR_TEST_CASE(test_builder_with_large_partition) {
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
f.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
auto built = e.get_system_keyspace().load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf"));
|
||||
|
||||
@@ -107,7 +107,7 @@ SEASTAR_TEST_CASE(test_builder_with_large_partition_2) {
|
||||
"primary key (p, c)").get();
|
||||
|
||||
f.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
auto built = e.get_system_keyspace().load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf"));
|
||||
|
||||
@@ -132,7 +132,7 @@ SEASTAR_TEST_CASE(test_builder_with_multiple_partitions) {
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
f.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
auto built = e.get_system_keyspace().load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf"));
|
||||
|
||||
@@ -156,7 +156,7 @@ SEASTAR_TEST_CASE(test_builder_with_multiple_partitions_of_batch_size_rows) {
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
f.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
auto built = e.get_system_keyspace().load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(built[0].second, sstring("vcf"));
|
||||
|
||||
@@ -189,7 +189,7 @@ SEASTAR_TEST_CASE(test_builder_view_added_during_ongoing_build) {
|
||||
|
||||
f2.get();
|
||||
f1.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
auto built = e.get_system_keyspace().load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 2);
|
||||
|
||||
auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0();
|
||||
@@ -247,7 +247,7 @@ SEASTAR_TEST_CASE(test_builder_across_tokens_with_large_partitions) {
|
||||
|
||||
f2.get();
|
||||
f1.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
auto built = e.get_system_keyspace().load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 2);
|
||||
|
||||
auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0();
|
||||
@@ -290,7 +290,7 @@ SEASTAR_TEST_CASE(test_builder_across_tokens_with_small_partitions) {
|
||||
f2.get();
|
||||
f1.get();
|
||||
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
auto built = e.get_system_keyspace().load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 2);
|
||||
|
||||
auto msg = e.execute_cql("select count(*) from vcf1 where v = 0").get0();
|
||||
@@ -320,7 +320,7 @@ SEASTAR_TEST_CASE(test_builder_with_tombstones) {
|
||||
"primary key ((v, p), c1, c2)").get();
|
||||
|
||||
f.get();
|
||||
auto built = db::system_keyspace::load_built_views().get0();
|
||||
auto built = e.get_system_keyspace().load_built_views().get0();
|
||||
BOOST_REQUIRE_EQUAL(built.size(), 1);
|
||||
|
||||
auto msg = e.execute_cql("select * from vcf").get0();
|
||||
@@ -865,6 +865,6 @@ SEASTAR_TEST_CASE(test_load_view_build_progress_with_values_missing) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
cquery_nofail(e, format("INSERT INTO system.{} (keyspace_name, view_name, cpu_id) VALUES ('ks', 'v', {})",
|
||||
db::system_keyspace::v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS, this_shard_id()));
|
||||
BOOST_REQUIRE(db::system_keyspace::load_view_build_progress().get0().empty());
|
||||
BOOST_REQUIRE(e.get_system_keyspace().load_view_build_progress().get0().empty());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -177,16 +177,16 @@ SEASTAR_TEST_CASE(test_query_view_built_progress_virtual_table) {
|
||||
auto rand = [] { return dht::token::get_random_token(); };
|
||||
auto next_token = rand();
|
||||
auto next_token_str = next_token.to_sstring();
|
||||
db::system_keyspace::register_view_for_building("ks", "v1", rand()).get();
|
||||
db::system_keyspace::register_view_for_building("ks", "v2", rand()).get();
|
||||
db::system_keyspace::register_view_for_building("ks", "v3", rand()).get();
|
||||
db::system_keyspace::update_view_build_progress("ks", "v3", next_token).get();
|
||||
db::system_keyspace::register_view_for_building("ks", "v4", rand()).get();
|
||||
db::system_keyspace::update_view_build_progress("ks", "v4", next_token).get();
|
||||
db::system_keyspace::register_view_for_building("ks", "v5", rand()).get();
|
||||
db::system_keyspace::register_view_for_building("ks", "v6", rand()).get();
|
||||
db::system_keyspace::remove_view_build_progress_across_all_shards("ks", "v5").get();
|
||||
db::system_keyspace::remove_view_build_progress_across_all_shards("ks", "v6").get();
|
||||
e.get_system_keyspace().register_view_for_building("ks", "v1", rand()).get();
|
||||
e.get_system_keyspace().register_view_for_building("ks", "v2", rand()).get();
|
||||
e.get_system_keyspace().register_view_for_building("ks", "v3", rand()).get();
|
||||
e.get_system_keyspace().update_view_build_progress("ks", "v3", next_token).get();
|
||||
e.get_system_keyspace().register_view_for_building("ks", "v4", rand()).get();
|
||||
e.get_system_keyspace().update_view_build_progress("ks", "v4", next_token).get();
|
||||
e.get_system_keyspace().register_view_for_building("ks", "v5", rand()).get();
|
||||
e.get_system_keyspace().register_view_for_building("ks", "v6", rand()).get();
|
||||
e.get_system_keyspace().remove_view_build_progress_across_all_shards("ks", "v5").get();
|
||||
e.get_system_keyspace().remove_view_build_progress_across_all_shards("ks", "v6").get();
|
||||
auto rs = e.execute_cql("select * from system.views_builds_in_progress where keyspace_name = 'ks'").get0();
|
||||
assert_that(rs).is_rows().with_rows_ignore_order({
|
||||
{ {utf8_type->decompose(sstring("ks"))}, {utf8_type->decompose(sstring("v1"))}, {int32_type->decompose(0)}, { } },
|
||||
|
||||
Reference in New Issue
Block a user