Merge 'db/system_keyspace: remove the dependency on storage_proxy' from Botond Dénes

The `system_keyspace` has several methods to query the tables in it. These currently require a storage proxy parameter, because the read has to go through storage-proxy. This PR uses the observation that all these reads are really local-replica reads and they only actually need a relatively small code snippet from storage proxy. These small code snippets are exported into standalone function in a new header (`replica/query.hh`). Then the system keyspace code is patched to use these new standalone functions instead of their equivalent in storage proxy. This allows us to replace the storage proxy dependency with a much more reasonable dependency on `replica::database`.

This PR patches the system keyspace code and the signatures of the affected methods as well as their immediate callers. Indirect callers are only patched to the extent it was needed to avoid introducing new includes (some had only a forward-declaration of storage proxy and so couldn't get database from it). There are a lot of opportunities left to free other methods or maybe even entire subsystems from storage proxy dependency, but this is not pursued in this PR, instead being left for follow-ups.

This PR was conceived to help us break the storage proxy -> storage service -> system tables -> storage proxy dependency loop, which become a major roadblock in migrating from IP -> host_id. After this PR, system keyspace still indirectly depends on storage proxy, because it still uses `cql3::query_processor` in some places. This will be addressed in another PR.

Refs: #11870

Closes #13869

* github.com:scylladb/scylladb:
  db/system_keyspace: remove dependency on storage_proxy
  db/system_keyspace: replace storage_proxy::query*() with  replica:: equivalent
  replica: add query.hh
This commit is contained in:
Pavel Emelyanov
2023-05-18 10:53:26 +03:00
10 changed files with 142 additions and 63 deletions

View File

@@ -757,8 +757,9 @@ redact_columns_for_missing_features(mutation m, schema_features features) {
future<table_schema_version> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features, noncopyable_function<bool(std::string_view)> accept_keyspace)
{
auto map = [&proxy, features, accept_keyspace = std::move(accept_keyspace)] (sstring table) mutable -> future<std::vector<mutation>> {
auto rs = co_await db::system_keyspace::query_mutations(proxy, NAME, table);
auto s = proxy.local().get_db().local().find_schema(NAME, table);
auto& db = proxy.local().get_db();
auto rs = co_await db::system_keyspace::query_mutations(db, NAME, table);
auto s = db.local().find_schema(NAME, table);
std::vector<mutation> mutations;
for (auto&& p : rs->partitions()) {
auto mut = p.mut().unfreeze(s);
@@ -802,8 +803,9 @@ future<table_schema_version> calculate_schema_digest(distributed<service::storag
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features features)
{
auto map = [&proxy, features] (sstring table) -> future<std::vector<canonical_mutation>> {
auto rs = co_await db::system_keyspace::query_mutations(proxy, NAME, table);
auto s = proxy.local().get_db().local().find_schema(NAME, table);
auto& db = proxy.local().get_db();
auto rs = co_await db::system_keyspace::query_mutations(db, NAME, table);
auto s = db.local().find_schema(NAME, table);
std::vector<canonical_mutation> results;
for (auto&& p : rs->partitions()) {
auto mut = p.mut().unfreeze(s);
@@ -878,7 +880,7 @@ read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, s
auto schema = proxy.local().get_db().local().find_schema(NAME, schema_table_name);
auto keyspace_key = dht::decorate_key(*schema,
partition_key::from_singular(*schema, keyspace_name));
auto rs = co_await db::system_keyspace::query(proxy, NAME, schema_table_name, keyspace_key);
auto rs = co_await db::system_keyspace::query(proxy.local().get_db(), NAME, schema_table_name, keyspace_key);
co_return schema_result_value_type{keyspace_name, std::move(rs)};
}
@@ -990,7 +992,7 @@ future<> recalculate_schema_version(sharded<db::system_keyspace>& sys_ks, distri
future<std::vector<sstring>>
static read_table_names_of_keyspace(distributed<service::storage_proxy>& proxy, const sstring& keyspace_name, schema_ptr schema_table) {
auto pkey = dht::decorate_key(*schema_table, partition_key::from_singular(*schema_table, keyspace_name));
auto&& rs = co_await db::system_keyspace::query(proxy, schema_table->ks_name(), schema_table->cf_name(), pkey);
auto&& rs = co_await db::system_keyspace::query(proxy.local().get_db(), schema_table->ks_name(), schema_table->cf_name(), pkey);
co_return boost::copy_range<std::vector<sstring>>(rs->rows() | boost::adaptors::transformed([schema_table] (const query::result_set_row& row) {
const sstring name = schema_table->clustering_key_columns().begin()->name_as_text();
return row.get_nonnull<sstring>(name);
@@ -1321,7 +1323,7 @@ future<lw_shared_ptr<query::result_set>> extract_scylla_specific_keyspace_info(d
auto&& row = rs->row(0);
auto keyspace_name = row.get_nonnull<sstring>("keyspace_name");
auto keyspace_key = dht::decorate_key(*scylla_keyspaces(), partition_key::from_singular(*scylla_keyspaces(), keyspace_name));
scylla_specific_rs = co_await db::system_keyspace::query(proxy, NAME, SCYLLA_KEYSPACES, keyspace_key);
scylla_specific_rs = co_await db::system_keyspace::query(proxy.local().get_db(), NAME, SCYLLA_KEYSPACES, keyspace_key);
}
co_return scylla_specific_rs;
}

View File

@@ -19,7 +19,6 @@
#include <seastar/json/json_elements.hh>
#include "system_keyspace.hh"
#include "types/types.hh"
#include "service/storage_proxy.hh"
#include "service/client_state.hh"
#include "service/query_state.hh"
#include "cql3/query_options.hh"
@@ -69,6 +68,7 @@
#include "sstables/generation_type.hh"
#include "cdc/generation.hh"
#include "replica/tablets.hh"
#include "replica/query.hh"
using days = std::chrono::duration<int, std::ratio<24 * 3600>>;
@@ -3007,52 +3007,39 @@ locator::endpoint_dc_rack system_keyspace::local_dc_rack() const {
}
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
system_keyspace::query_mutations(distributed<service::storage_proxy>& proxy, const sstring& ks_name, const sstring& cf_name) {
replica::database& db = proxy.local().get_db().local();
schema_ptr schema = db.find_schema(ks_name, cf_name);
auto slice = partition_slice_builder(*schema).build();
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max);
return proxy.local().query_mutations_locally(std::move(schema), std::move(cmd), query::full_partition_range, db::no_timeout)
.then([] (rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature> rr_ht) { return std::get<0>(std::move(rr_ht)); });
system_keyspace::query_mutations(distributed<replica::database>& db, const sstring& ks_name, const sstring& cf_name) {
schema_ptr schema = db.local().find_schema(ks_name, cf_name);
return replica::query_mutations(db, schema, query::full_partition_range, schema->full_slice(), db::no_timeout);
}
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
system_keyspace::query_mutations(distributed<service::storage_proxy>& proxy, const sstring& ks_name, const sstring& cf_name, const dht::partition_range& partition_range, query::clustering_range row_range) {
auto& db = proxy.local().get_db().local();
auto schema = db.find_schema(ks_name, cf_name);
auto slice = partition_slice_builder(*schema)
system_keyspace::query_mutations(distributed<replica::database>& db, const sstring& ks_name, const sstring& cf_name, const dht::partition_range& partition_range, query::clustering_range row_range) {
auto schema = db.local().find_schema(ks_name, cf_name);
auto slice_ptr = std::make_unique<query::partition_slice>(partition_slice_builder(*schema)
.with_range(std::move(row_range))
.build();
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max);
return proxy.local().query_mutations_locally(std::move(schema), std::move(cmd), partition_range, db::no_timeout)
.then([] (rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature> rr_ht) { return std::get<0>(std::move(rr_ht)); });
.build());
return replica::query_mutations(db, std::move(schema), partition_range, *slice_ptr, db::no_timeout).finally([slice_ptr = std::move(slice_ptr)] { });
}
future<lw_shared_ptr<query::result_set>>
system_keyspace::query(distributed<service::storage_proxy>& proxy, const sstring& ks_name, const sstring& cf_name) {
replica::database& db = proxy.local().get_db().local();
schema_ptr schema = db.find_schema(ks_name, cf_name);
auto slice = partition_slice_builder(*schema).build();
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max);
return proxy.local().query(schema, cmd, {query::full_partition_range}, db::consistency_level::ONE,
{db::no_timeout, empty_service_permit(), service::client_state::for_internal_calls(), nullptr}).then([schema, cmd] (auto&& qr) {
return make_lw_shared<query::result_set>(query::result_set::from_raw_result(schema, cmd->slice, *qr.query_result));
system_keyspace::query(distributed<replica::database>& db, const sstring& ks_name, const sstring& cf_name) {
schema_ptr schema = db.local().find_schema(ks_name, cf_name);
return replica::query_data(db, schema, query::full_partition_range, schema->full_slice(), db::no_timeout).then([schema] (auto&& qr) {
return make_lw_shared<query::result_set>(query::result_set::from_raw_result(schema, schema->full_slice(), *qr));
});
}
future<lw_shared_ptr<query::result_set>>
system_keyspace::query(distributed<service::storage_proxy>& proxy, const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key, query::clustering_range row_range)
system_keyspace::query(distributed<replica::database>& db, const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key, query::clustering_range row_range)
{
auto&& db = proxy.local().get_db().local();
auto schema = db.find_schema(ks_name, cf_name);
auto slice = partition_slice_builder(*schema)
auto schema = db.local().find_schema(ks_name, cf_name);
auto pr_ptr = std::make_unique<dht::partition_range>(dht::partition_range::make_singular(key));
auto slice_ptr = std::make_unique<query::partition_slice>(partition_slice_builder(*schema)
.with_range(std::move(row_range))
.build();
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max);
return proxy.local().query(schema, cmd, {dht::partition_range::make_singular(key)}, db::consistency_level::ONE,
{db::no_timeout, empty_service_permit(), service::client_state::for_internal_calls(), nullptr}).then([schema, cmd] (auto&& qr) {
return make_lw_shared<query::result_set>(query::result_set::from_raw_result(schema, cmd->slice, *qr.query_result));
.build());
return replica::query_data(db, schema, *pr_ptr, *slice_ptr, db::no_timeout).then(
[schema, pr_ptr = std::move(pr_ptr), slice_ptr = std::move(slice_ptr)] (auto&& qr) {
return make_lw_shared<query::result_set>(query::result_set::from_raw_result(schema, schema->full_slice(), *qr));
});
}
@@ -3480,9 +3467,9 @@ mutation system_keyspace::make_group0_history_state_id_mutation(
return m;
}
future<mutation> system_keyspace::get_group0_history(distributed<service::storage_proxy>& sp) {
future<mutation> system_keyspace::get_group0_history(distributed<replica::database>& db) {
auto s = group0_history();
auto rs = co_await db::system_keyspace::query_mutations(sp, db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY);
auto rs = co_await db::system_keyspace::query_mutations(db, db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY);
assert(rs);
auto& ps = rs->partitions();
for (auto& p: ps) {

View File

@@ -32,7 +32,6 @@ namespace sstables {
namespace service {
class storage_proxy;
class storage_service;
class raft_group_registry;
struct topology;
@@ -280,12 +279,12 @@ public:
/// overloads
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
static query_mutations(distributed<service::storage_proxy>& proxy,
static query_mutations(distributed<replica::database>& db,
const sstring& ks_name,
const sstring& cf_name);
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
static query_mutations(distributed<service::storage_proxy>& proxy,
static query_mutations(distributed<replica::database>& db,
const sstring& ks_name,
const sstring& cf_name,
const dht::partition_range& partition_range,
@@ -293,14 +292,14 @@ public:
// Returns all data from given system table.
// Intended to be used by code which is not performance critical.
static future<lw_shared_ptr<query::result_set>> query(distributed<service::storage_proxy>& proxy,
static future<lw_shared_ptr<query::result_set>> query(distributed<replica::database>& db,
const sstring& ks_name,
const sstring& cf_name);
// Returns a slice of given system table.
// Intended to be used by code which is not performance critical.
static future<lw_shared_ptr<query::result_set>> query(
distributed<service::storage_proxy>& proxy,
distributed<replica::database>& db,
const sstring& ks_name,
const sstring& cf_name,
const dht::decorated_key& key,
@@ -474,7 +473,7 @@ public:
// Obtain the contents of the group 0 history table in mutation form.
// Assumes that the history table exists, i.e. Raft experimental feature is enabled.
static future<mutation> get_group0_history(distributed<service::storage_proxy>&);
static future<mutation> get_group0_history(distributed<replica::database>&);
future<> sstables_registry_create_entry(sstring location, utils::UUID uuid, sstring status, sstables::entry_descriptor desc);
future<utils::UUID> sstables_registry_lookup_entry(sstring location, sstables::generation_type gen);

View File

@@ -48,6 +48,7 @@
#include "service/storage_proxy.hh"
#include "db/operation_type.hh"
#include "db/view/view_update_generator.hh"
#include "multishard_mutation_query.hh"
#include "utils/human_readable.hh"
#include "utils/fb_utilities.hh"
@@ -777,7 +778,7 @@ static future<>
do_parse_schema_tables(distributed<service::storage_proxy>& proxy, const sstring cf_name, std::function<future<> (db::schema_tables::schema_result_value_type&)> func) {
using namespace db::schema_tables;
auto rs = co_await db::system_keyspace::query(proxy, db::schema_tables::NAME, cf_name);
auto rs = co_await db::system_keyspace::query(proxy.local().get_db(), db::schema_tables::NAME, cf_name);
auto names = std::set<sstring>();
for (auto& r : rs->rows()) {
auto keyspace_name = r.template get_nonnull<sstring>("keyspace_name");
@@ -2878,3 +2879,54 @@ const timeout_config infinite_timeout_config = {
// not really infinite, but long enough
1h, 1h, 1h, 1h, 1h, 1h, 1h,
};
namespace replica {
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> query_mutations(
sharded<database>& db,
schema_ptr s,
const dht::partition_range& pr,
const query::partition_slice& ps,
db::timeout_clock::time_point timeout) {
auto max_size = db.local().get_unlimited_query_max_result_size();
auto max_res_size = query::max_result_size(max_size.soft_limit, max_size.hard_limit, query::result_memory_limiter::maximum_result_size);
auto cmd = query::read_command(s->id(), s->version(), ps, max_res_size, query::tombstone_limit::max);
if (pr.is_singular()) {
unsigned shard = dht::shard_of(*s, pr.start()->value().token());
co_return co_await db.invoke_on(shard, [gs = global_schema_ptr(s), &cmd, &pr, timeout] (replica::database& db) mutable {
return db.query_mutations(gs, cmd, pr, {}, timeout).then([] (std::tuple<reconcilable_result, cache_temperature>&& res) {
return make_foreign(make_lw_shared<reconcilable_result>(std::move(std::get<0>(res))));
});
});
} else {
auto prs = dht::partition_range_vector{pr};
auto&& [res, _] = co_await query_mutations_on_all_shards(db, std::move(s), cmd, prs, {}, timeout);
co_return std::move(res);
}
}
future<foreign_ptr<lw_shared_ptr<query::result>>> query_data(
sharded<database>& db,
schema_ptr s,
const dht::partition_range& pr,
const query::partition_slice& ps,
db::timeout_clock::time_point timeout) {
auto max_size = db.local().get_unlimited_query_max_result_size();
auto max_res_size = query::max_result_size(max_size.soft_limit, max_size.hard_limit, query::result_memory_limiter::maximum_result_size);
auto cmd = query::read_command(s->id(), s->version(), ps, max_res_size, query::tombstone_limit::max);
auto prs = dht::partition_range_vector{pr};
auto opts = query::result_options::only_result();
if (pr.is_singular()) {
unsigned shard = dht::shard_of(*s, pr.start()->value().token());
co_return co_await db.invoke_on(shard, [gs = global_schema_ptr(s), &cmd, opts, &prs, timeout] (replica::database& db) mutable {
return db.query(gs, cmd, opts, prs, {}, timeout).then([] (std::tuple<lw_shared_ptr<query::result>, cache_temperature>&& res) {
return make_foreign(std::move(std::get<0>(res)));
});
});
} else {
auto&& [res, _] = co_await query_data_on_all_shards(db, std::move(s), cmd, prs, opts, {}, timeout);
co_return std::move(res);
}
}
} // namespace replica

40
replica/query.hh Normal file
View File

@@ -0,0 +1,40 @@
/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
/*
* Utilities for executing queries on the local replica.
*
* Allows for bypassing storage proxy entirely when querying local (system) tables.
*/
#include "replica/database.hh"
namespace replica {
/// Reads the specified range and slice of the given table, from the local replica.
///
/// There is no paging or limits applied to the result, make sure the result is
/// sufficiently small.
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> query_mutations(
sharded<database>& db,
schema_ptr s,
const dht::partition_range& pr,
const query::partition_slice& ps,
db::timeout_clock::time_point timeout);
/// Reads the specified range and slice of the given table, from the local replica.
///
/// A variant of query_mutations() which returns query result, instead of mutations (only live data).
future<foreign_ptr<lw_shared_ptr<query::result>>> query_data(
sharded<database>& db,
schema_ptr s,
const dht::partition_range& pr,
const query::partition_slice& ps,
db::timeout_clock::time_point timeout);
} // namespace replica

View File

@@ -199,9 +199,9 @@ future<tablet_metadata> read_tablet_metadata(cql3::query_processor& qp) {
co_return std::move(tm);
}
future<std::vector<canonical_mutation>> read_tablet_mutations(seastar::sharded<service::storage_proxy>& proxy) {
future<std::vector<canonical_mutation>> read_tablet_mutations(seastar::sharded<replica::database>& db) {
auto s = db::system_keyspace::tablets();
auto rs = co_await db::system_keyspace::query_mutations(proxy, db::system_keyspace::NAME, db::system_keyspace::TABLETS);
auto rs = co_await db::system_keyspace::query_mutations(db, db::system_keyspace::NAME, db::system_keyspace::TABLETS);
std::vector<canonical_mutation> result;
result.reserve(rs->partitions().size());
for (auto& p: rs->partitions()) {

View File

@@ -26,10 +26,6 @@ class query_processor;
}
namespace service {
class storage_proxy;
}
namespace replica {
schema_ptr make_tablets_schema();
@@ -60,6 +56,6 @@ future<> save_tablet_metadata(replica::database&, const locator::tablet_metadata
future<locator::tablet_metadata> read_tablet_metadata(cql3::query_processor&);
/// Reads tablet metadata from system.tablets in the form of mutations.
future<std::vector<canonical_mutation>> read_tablet_mutations(seastar::sharded<service::storage_proxy>&);
future<std::vector<canonical_mutation>> read_tablet_mutations(seastar::sharded<database>&);
} // namespace replica

View File

@@ -140,6 +140,7 @@ void migration_manager::init_messaging_service()
auto features = self._feat.cluster_schema_features();
auto& proxy = self._storage_proxy.container();
auto& db = proxy.local().get_db();
auto cm = co_await db::schema_tables::convert_schema_to_mutations(proxy, features);
if (options->group0_snapshot_transfer) {
// if `group0_snapshot_transfer` is `true`, the sender must also understand canonical mutations
@@ -148,15 +149,15 @@ void migration_manager::init_messaging_service()
on_internal_error(mlogger,
"migration request handler: group0 snapshot transfer requested, but canonical mutations not supported");
}
cm.emplace_back(co_await db::system_keyspace::get_group0_history(proxy));
for (auto&& m : co_await replica::read_tablet_mutations(proxy)) {
cm.emplace_back(co_await db::system_keyspace::get_group0_history(db));
for (auto&& m : co_await replica::read_tablet_mutations(db)) {
cm.emplace_back(std::move(m));
}
}
if (cm_retval_supported) {
co_return rpc::tuple(std::vector<frozen_mutation>{}, std::move(cm));
}
auto fm = boost::copy_range<std::vector<frozen_mutation>>(cm | boost::adaptors::transformed([&db = proxy.local().get_db().local()] (const canonical_mutation& cm) {
auto fm = boost::copy_range<std::vector<frozen_mutation>>(cm | boost::adaptors::transformed([&db = db.local()] (const canonical_mutation& cm) {
return cm.to_mutation(db.find_column_family(cm.column_family_id()).schema());
}));
co_return rpc::tuple(std::move(fm), std::move(cm));

View File

@@ -5003,6 +5003,8 @@ void storage_service::init_messaging_service(sharded<service::storage_proxy>& pr
co_return raft_topology_snapshot{};
}
auto& db = proxy.local().get_db();
std::vector<canonical_mutation> topology_mutations;
std::optional<cdc::generation_id_v2> curr_cdc_gen_id;
{
@@ -5010,7 +5012,7 @@ void storage_service::init_messaging_service(sharded<service::storage_proxy>& pr
// might be useful if multiple nodes are trying to pull concurrently.
auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex();
auto rs = co_await db::system_keyspace::query_mutations(
proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
db, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
topology_mutations.reserve(rs->partitions().size());
boost::range::transform(
@@ -5040,7 +5042,7 @@ void storage_service::init_messaging_service(sharded<service::storage_proxy>& pr
auto key = dht::decorate_key(*s, partition_key::from_singular(*s, curr_cdc_gen_id->id));
auto partition_range = dht::partition_range::make_singular(key);
auto rs = co_await db::system_keyspace::query_mutations(
proxy, db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3, partition_range);
db, db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3, partition_range);
if (rs->partitions().size() != 1) {
on_internal_error(slogger, ::format(
"pull_raft_topology_snapshot: expected a single partition in CDC generation query,"

View File

@@ -124,7 +124,7 @@ static future<> test_basic_operations(app_template& app) {
std::vector<canonical_mutation> muts;
auto time_to_read_muts = duration_in_seconds([&] {
muts = replica::read_tablet_mutations(e.local_qp().proxy().container()).get0();
muts = replica::read_tablet_mutations(e.local_qp().proxy().get_db()).get0();
});
testlog.info("Read mutations in {:.6f} [ms]", time_to_read_muts.count() * 1000);