diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 13d549484a..8450871f66 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -757,8 +757,9 @@ redact_columns_for_missing_features(mutation m, schema_features features) { future calculate_schema_digest(distributed& proxy, schema_features features, noncopyable_function accept_keyspace) { auto map = [&proxy, features, accept_keyspace = std::move(accept_keyspace)] (sstring table) mutable -> future> { - auto rs = co_await db::system_keyspace::query_mutations(proxy, NAME, table); - auto s = proxy.local().get_db().local().find_schema(NAME, table); + auto& db = proxy.local().get_db(); + auto rs = co_await db::system_keyspace::query_mutations(db, NAME, table); + auto s = db.local().find_schema(NAME, table); std::vector mutations; for (auto&& p : rs->partitions()) { auto mut = p.mut().unfreeze(s); @@ -802,8 +803,9 @@ future calculate_schema_digest(distributed> convert_schema_to_mutations(distributed& proxy, schema_features features) { auto map = [&proxy, features] (sstring table) -> future> { - auto rs = co_await db::system_keyspace::query_mutations(proxy, NAME, table); - auto s = proxy.local().get_db().local().find_schema(NAME, table); + auto& db = proxy.local().get_db(); + auto rs = co_await db::system_keyspace::query_mutations(db, NAME, table); + auto s = db.local().find_schema(NAME, table); std::vector results; for (auto&& p : rs->partitions()) { auto mut = p.mut().unfreeze(s); @@ -878,7 +880,7 @@ read_schema_partition_for_keyspace(distributed& proxy, s auto schema = proxy.local().get_db().local().find_schema(NAME, schema_table_name); auto keyspace_key = dht::decorate_key(*schema, partition_key::from_singular(*schema, keyspace_name)); - auto rs = co_await db::system_keyspace::query(proxy, NAME, schema_table_name, keyspace_key); + auto rs = co_await db::system_keyspace::query(proxy.local().get_db(), NAME, schema_table_name, keyspace_key); co_return schema_result_value_type{keyspace_name, std::move(rs)}; } @@ -990,7 +992,7 @@ future<> recalculate_schema_version(sharded& sys_ks, distri future> static read_table_names_of_keyspace(distributed& proxy, const sstring& keyspace_name, schema_ptr schema_table) { auto pkey = dht::decorate_key(*schema_table, partition_key::from_singular(*schema_table, keyspace_name)); - auto&& rs = co_await db::system_keyspace::query(proxy, schema_table->ks_name(), schema_table->cf_name(), pkey); + auto&& rs = co_await db::system_keyspace::query(proxy.local().get_db(), schema_table->ks_name(), schema_table->cf_name(), pkey); co_return boost::copy_range>(rs->rows() | boost::adaptors::transformed([schema_table] (const query::result_set_row& row) { const sstring name = schema_table->clustering_key_columns().begin()->name_as_text(); return row.get_nonnull(name); @@ -1321,7 +1323,7 @@ future> extract_scylla_specific_keyspace_info(d auto&& row = rs->row(0); auto keyspace_name = row.get_nonnull("keyspace_name"); auto keyspace_key = dht::decorate_key(*scylla_keyspaces(), partition_key::from_singular(*scylla_keyspaces(), keyspace_name)); - scylla_specific_rs = co_await db::system_keyspace::query(proxy, NAME, SCYLLA_KEYSPACES, keyspace_key); + scylla_specific_rs = co_await db::system_keyspace::query(proxy.local().get_db(), NAME, SCYLLA_KEYSPACES, keyspace_key); } co_return scylla_specific_rs; } diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index bf443568eb..eb5c2c3e74 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -19,7 +19,6 @@ #include #include "system_keyspace.hh" #include "types/types.hh" -#include "service/storage_proxy.hh" #include "service/client_state.hh" #include "service/query_state.hh" #include "cql3/query_options.hh" @@ -69,6 +68,7 @@ #include "sstables/generation_type.hh" #include "cdc/generation.hh" #include "replica/tablets.hh" +#include "replica/query.hh" using days = std::chrono::duration>; @@ -3007,52 +3007,39 @@ locator::endpoint_dc_rack system_keyspace::local_dc_rack() const { } future>> -system_keyspace::query_mutations(distributed& proxy, const sstring& ks_name, const sstring& cf_name) { - replica::database& db = proxy.local().get_db().local(); - schema_ptr schema = db.find_schema(ks_name, cf_name); - auto slice = partition_slice_builder(*schema).build(); - auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max); - return proxy.local().query_mutations_locally(std::move(schema), std::move(cmd), query::full_partition_range, db::no_timeout) - .then([] (rpc::tuple>, cache_temperature> rr_ht) { return std::get<0>(std::move(rr_ht)); }); +system_keyspace::query_mutations(distributed& db, const sstring& ks_name, const sstring& cf_name) { + schema_ptr schema = db.local().find_schema(ks_name, cf_name); + return replica::query_mutations(db, schema, query::full_partition_range, schema->full_slice(), db::no_timeout); } future>> -system_keyspace::query_mutations(distributed& proxy, const sstring& ks_name, const sstring& cf_name, const dht::partition_range& partition_range, query::clustering_range row_range) { - auto& db = proxy.local().get_db().local(); - auto schema = db.find_schema(ks_name, cf_name); - auto slice = partition_slice_builder(*schema) +system_keyspace::query_mutations(distributed& db, const sstring& ks_name, const sstring& cf_name, const dht::partition_range& partition_range, query::clustering_range row_range) { + auto schema = db.local().find_schema(ks_name, cf_name); + auto slice_ptr = std::make_unique(partition_slice_builder(*schema) .with_range(std::move(row_range)) - .build(); - auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max); - return proxy.local().query_mutations_locally(std::move(schema), std::move(cmd), partition_range, db::no_timeout) - .then([] (rpc::tuple>, cache_temperature> rr_ht) { return std::get<0>(std::move(rr_ht)); }); + .build()); + return replica::query_mutations(db, std::move(schema), partition_range, *slice_ptr, db::no_timeout).finally([slice_ptr = std::move(slice_ptr)] { }); } future> -system_keyspace::query(distributed& proxy, const sstring& ks_name, const sstring& cf_name) { - replica::database& db = proxy.local().get_db().local(); - schema_ptr schema = db.find_schema(ks_name, cf_name); - auto slice = partition_slice_builder(*schema).build(); - auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max); - return proxy.local().query(schema, cmd, {query::full_partition_range}, db::consistency_level::ONE, - {db::no_timeout, empty_service_permit(), service::client_state::for_internal_calls(), nullptr}).then([schema, cmd] (auto&& qr) { - return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *qr.query_result)); +system_keyspace::query(distributed& db, const sstring& ks_name, const sstring& cf_name) { + schema_ptr schema = db.local().find_schema(ks_name, cf_name); + return replica::query_data(db, schema, query::full_partition_range, schema->full_slice(), db::no_timeout).then([schema] (auto&& qr) { + return make_lw_shared(query::result_set::from_raw_result(schema, schema->full_slice(), *qr)); }); } future> -system_keyspace::query(distributed& proxy, const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key, query::clustering_range row_range) +system_keyspace::query(distributed& db, const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key, query::clustering_range row_range) { - auto&& db = proxy.local().get_db().local(); - auto schema = db.find_schema(ks_name, cf_name); - auto slice = partition_slice_builder(*schema) + auto schema = db.local().find_schema(ks_name, cf_name); + auto pr_ptr = std::make_unique(dht::partition_range::make_singular(key)); + auto slice_ptr = std::make_unique(partition_slice_builder(*schema) .with_range(std::move(row_range)) - .build(); - auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max); - - return proxy.local().query(schema, cmd, {dht::partition_range::make_singular(key)}, db::consistency_level::ONE, - {db::no_timeout, empty_service_permit(), service::client_state::for_internal_calls(), nullptr}).then([schema, cmd] (auto&& qr) { - return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *qr.query_result)); + .build()); + return replica::query_data(db, schema, *pr_ptr, *slice_ptr, db::no_timeout).then( + [schema, pr_ptr = std::move(pr_ptr), slice_ptr = std::move(slice_ptr)] (auto&& qr) { + return make_lw_shared(query::result_set::from_raw_result(schema, schema->full_slice(), *qr)); }); } @@ -3480,9 +3467,9 @@ mutation system_keyspace::make_group0_history_state_id_mutation( return m; } -future system_keyspace::get_group0_history(distributed& sp) { +future system_keyspace::get_group0_history(distributed& db) { auto s = group0_history(); - auto rs = co_await db::system_keyspace::query_mutations(sp, db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY); + auto rs = co_await db::system_keyspace::query_mutations(db, db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY); assert(rs); auto& ps = rs->partitions(); for (auto& p: ps) { diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 14ad99e9af..fd30d2f569 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -32,7 +32,6 @@ namespace sstables { namespace service { -class storage_proxy; class storage_service; class raft_group_registry; struct topology; @@ -280,12 +279,12 @@ public: /// overloads future>> - static query_mutations(distributed& proxy, + static query_mutations(distributed& db, const sstring& ks_name, const sstring& cf_name); future>> - static query_mutations(distributed& proxy, + static query_mutations(distributed& db, const sstring& ks_name, const sstring& cf_name, const dht::partition_range& partition_range, @@ -293,14 +292,14 @@ public: // Returns all data from given system table. // Intended to be used by code which is not performance critical. - static future> query(distributed& proxy, + static future> query(distributed& db, const sstring& ks_name, const sstring& cf_name); // Returns a slice of given system table. // Intended to be used by code which is not performance critical. static future> query( - distributed& proxy, + distributed& db, const sstring& ks_name, const sstring& cf_name, const dht::decorated_key& key, @@ -474,7 +473,7 @@ public: // Obtain the contents of the group 0 history table in mutation form. // Assumes that the history table exists, i.e. Raft experimental feature is enabled. - static future get_group0_history(distributed&); + static future get_group0_history(distributed&); future<> sstables_registry_create_entry(sstring location, utils::UUID uuid, sstring status, sstables::entry_descriptor desc); future sstables_registry_lookup_entry(sstring location, sstables::generation_type gen); diff --git a/replica/database.cc b/replica/database.cc index 802a8b57f9..09aa2440b5 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -48,6 +48,7 @@ #include "service/storage_proxy.hh" #include "db/operation_type.hh" #include "db/view/view_update_generator.hh" +#include "multishard_mutation_query.hh" #include "utils/human_readable.hh" #include "utils/fb_utilities.hh" @@ -777,7 +778,7 @@ static future<> do_parse_schema_tables(distributed& proxy, const sstring cf_name, std::function (db::schema_tables::schema_result_value_type&)> func) { using namespace db::schema_tables; - auto rs = co_await db::system_keyspace::query(proxy, db::schema_tables::NAME, cf_name); + auto rs = co_await db::system_keyspace::query(proxy.local().get_db(), db::schema_tables::NAME, cf_name); auto names = std::set(); for (auto& r : rs->rows()) { auto keyspace_name = r.template get_nonnull("keyspace_name"); @@ -2878,3 +2879,54 @@ const timeout_config infinite_timeout_config = { // not really infinite, but long enough 1h, 1h, 1h, 1h, 1h, 1h, 1h, }; + +namespace replica { + +future>> query_mutations( + sharded& db, + schema_ptr s, + const dht::partition_range& pr, + const query::partition_slice& ps, + db::timeout_clock::time_point timeout) { + auto max_size = db.local().get_unlimited_query_max_result_size(); + auto max_res_size = query::max_result_size(max_size.soft_limit, max_size.hard_limit, query::result_memory_limiter::maximum_result_size); + auto cmd = query::read_command(s->id(), s->version(), ps, max_res_size, query::tombstone_limit::max); + if (pr.is_singular()) { + unsigned shard = dht::shard_of(*s, pr.start()->value().token()); + co_return co_await db.invoke_on(shard, [gs = global_schema_ptr(s), &cmd, &pr, timeout] (replica::database& db) mutable { + return db.query_mutations(gs, cmd, pr, {}, timeout).then([] (std::tuple&& res) { + return make_foreign(make_lw_shared(std::move(std::get<0>(res)))); + }); + }); + } else { + auto prs = dht::partition_range_vector{pr}; + auto&& [res, _] = co_await query_mutations_on_all_shards(db, std::move(s), cmd, prs, {}, timeout); + co_return std::move(res); + } +} + +future>> query_data( + sharded& db, + schema_ptr s, + const dht::partition_range& pr, + const query::partition_slice& ps, + db::timeout_clock::time_point timeout) { + auto max_size = db.local().get_unlimited_query_max_result_size(); + auto max_res_size = query::max_result_size(max_size.soft_limit, max_size.hard_limit, query::result_memory_limiter::maximum_result_size); + auto cmd = query::read_command(s->id(), s->version(), ps, max_res_size, query::tombstone_limit::max); + auto prs = dht::partition_range_vector{pr}; + auto opts = query::result_options::only_result(); + if (pr.is_singular()) { + unsigned shard = dht::shard_of(*s, pr.start()->value().token()); + co_return co_await db.invoke_on(shard, [gs = global_schema_ptr(s), &cmd, opts, &prs, timeout] (replica::database& db) mutable { + return db.query(gs, cmd, opts, prs, {}, timeout).then([] (std::tuple, cache_temperature>&& res) { + return make_foreign(std::move(std::get<0>(res))); + }); + }); + } else { + auto&& [res, _] = co_await query_data_on_all_shards(db, std::move(s), cmd, prs, opts, {}, timeout); + co_return std::move(res); + } +} + +} // namespace replica diff --git a/replica/query.hh b/replica/query.hh new file mode 100644 index 0000000000..c203ef477e --- /dev/null +++ b/replica/query.hh @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2018-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +/* + * Utilities for executing queries on the local replica. + * + * Allows for bypassing storage proxy entirely when querying local (system) tables. + */ + +#include "replica/database.hh" + +namespace replica { + +/// Reads the specified range and slice of the given table, from the local replica. +/// +/// There is no paging or limits applied to the result, make sure the result is +/// sufficiently small. +future>> query_mutations( + sharded& db, + schema_ptr s, + const dht::partition_range& pr, + const query::partition_slice& ps, + db::timeout_clock::time_point timeout); + +/// Reads the specified range and slice of the given table, from the local replica. +/// +/// A variant of query_mutations() which returns query result, instead of mutations (only live data). +future>> query_data( + sharded& db, + schema_ptr s, + const dht::partition_range& pr, + const query::partition_slice& ps, + db::timeout_clock::time_point timeout); + +} // namespace replica diff --git a/replica/tablets.cc b/replica/tablets.cc index ec5655a414..15754bbbce 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -199,9 +199,9 @@ future read_tablet_metadata(cql3::query_processor& qp) { co_return std::move(tm); } -future> read_tablet_mutations(seastar::sharded& proxy) { +future> read_tablet_mutations(seastar::sharded& db) { auto s = db::system_keyspace::tablets(); - auto rs = co_await db::system_keyspace::query_mutations(proxy, db::system_keyspace::NAME, db::system_keyspace::TABLETS); + auto rs = co_await db::system_keyspace::query_mutations(db, db::system_keyspace::NAME, db::system_keyspace::TABLETS); std::vector result; result.reserve(rs->partitions().size()); for (auto& p: rs->partitions()) { diff --git a/replica/tablets.hh b/replica/tablets.hh index 70026688be..de00c332ef 100644 --- a/replica/tablets.hh +++ b/replica/tablets.hh @@ -26,10 +26,6 @@ class query_processor; } -namespace service { -class storage_proxy; -} - namespace replica { schema_ptr make_tablets_schema(); @@ -60,6 +56,6 @@ future<> save_tablet_metadata(replica::database&, const locator::tablet_metadata future read_tablet_metadata(cql3::query_processor&); /// Reads tablet metadata from system.tablets in the form of mutations. -future> read_tablet_mutations(seastar::sharded&); +future> read_tablet_mutations(seastar::sharded&); } // namespace replica diff --git a/service/migration_manager.cc b/service/migration_manager.cc index ddcde78a0a..c18b516d45 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -140,6 +140,7 @@ void migration_manager::init_messaging_service() auto features = self._feat.cluster_schema_features(); auto& proxy = self._storage_proxy.container(); + auto& db = proxy.local().get_db(); auto cm = co_await db::schema_tables::convert_schema_to_mutations(proxy, features); if (options->group0_snapshot_transfer) { // if `group0_snapshot_transfer` is `true`, the sender must also understand canonical mutations @@ -148,15 +149,15 @@ void migration_manager::init_messaging_service() on_internal_error(mlogger, "migration request handler: group0 snapshot transfer requested, but canonical mutations not supported"); } - cm.emplace_back(co_await db::system_keyspace::get_group0_history(proxy)); - for (auto&& m : co_await replica::read_tablet_mutations(proxy)) { + cm.emplace_back(co_await db::system_keyspace::get_group0_history(db)); + for (auto&& m : co_await replica::read_tablet_mutations(db)) { cm.emplace_back(std::move(m)); } } if (cm_retval_supported) { co_return rpc::tuple(std::vector{}, std::move(cm)); } - auto fm = boost::copy_range>(cm | boost::adaptors::transformed([&db = proxy.local().get_db().local()] (const canonical_mutation& cm) { + auto fm = boost::copy_range>(cm | boost::adaptors::transformed([&db = db.local()] (const canonical_mutation& cm) { return cm.to_mutation(db.find_column_family(cm.column_family_id()).schema()); })); co_return rpc::tuple(std::move(fm), std::move(cm)); diff --git a/service/storage_service.cc b/service/storage_service.cc index 9fddebda12..63a8b61f95 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5003,6 +5003,8 @@ void storage_service::init_messaging_service(sharded& pr co_return raft_topology_snapshot{}; } + auto& db = proxy.local().get_db(); + std::vector topology_mutations; std::optional curr_cdc_gen_id; { @@ -5010,7 +5012,7 @@ void storage_service::init_messaging_service(sharded& pr // might be useful if multiple nodes are trying to pull concurrently. auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex(); auto rs = co_await db::system_keyspace::query_mutations( - proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); + db, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); topology_mutations.reserve(rs->partitions().size()); boost::range::transform( @@ -5040,7 +5042,7 @@ void storage_service::init_messaging_service(sharded& pr auto key = dht::decorate_key(*s, partition_key::from_singular(*s, curr_cdc_gen_id->id)); auto partition_range = dht::partition_range::make_singular(key); auto rs = co_await db::system_keyspace::query_mutations( - proxy, db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3, partition_range); + db, db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3, partition_range); if (rs->partitions().size() != 1) { on_internal_error(slogger, ::format( "pull_raft_topology_snapshot: expected a single partition in CDC generation query," diff --git a/test/perf/perf_tablets.cc b/test/perf/perf_tablets.cc index 7da83e2da5..19bdd2d5b1 100644 --- a/test/perf/perf_tablets.cc +++ b/test/perf/perf_tablets.cc @@ -124,7 +124,7 @@ static future<> test_basic_operations(app_template& app) { std::vector muts; auto time_to_read_muts = duration_in_seconds([&] { - muts = replica::read_tablet_mutations(e.local_qp().proxy().container()).get0(); + muts = replica::read_tablet_mutations(e.local_qp().proxy().get_db()).get0(); }); testlog.info("Read mutations in {:.6f} [ms]", time_to_read_muts.count() * 1000);