Merge 'Avoid using qctx in schema_tables' column-mapping queries' from Pavel Emelyanov
There are three methods in system_keyspace namespace that run queries over `system.scylla_table_schema_history` table. For that they use qctx which's not nice. Fortunately, all the callers already have the system_keyspace& local variable or argument they can pass to those methods. Since the accessed table belongs to system keyspace, the latter declares the querying methods as "friends" to let them get private `query_processor& _qp` member Closes #14876 * github.com:scylladb/scylladb: schema_tables: Extract query_processor from system_keyspace for querying schema_tables: Add system_keyspace& argument to ..._column_mapping() calls migration_manager: Add system_keyspace argument to get_schema_mapping()
This commit is contained in:
@@ -1576,9 +1576,9 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
|
||||
store_column_mapping(proxy, altered.old_schema.get(), true),
|
||||
store_column_mapping(proxy, altered.new_schema.get(), false));
|
||||
});
|
||||
co_await max_concurrent_for_each(tables_diff.dropped, max_concurrent, [] (schema_diff::dropped_schema& dropped) -> future<> {
|
||||
co_await max_concurrent_for_each(tables_diff.dropped, max_concurrent, [&sys_ks] (schema_diff::dropped_schema& dropped) -> future<> {
|
||||
schema_ptr s = dropped.schema.get();
|
||||
co_await drop_column_mapping(s->id(), s->version());
|
||||
co_await drop_column_mapping(sys_ks.local(), s->id(), s->version());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3728,8 +3728,8 @@ future<schema_mutations> read_table_mutations(distributed<service::storage_proxy
|
||||
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
|
||||
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
|
||||
|
||||
future<column_mapping> get_column_mapping(::table_id table_id, table_schema_version version) {
|
||||
shared_ptr<cql3::untyped_result_set> results = co_await qctx->qp().execute_internal(
|
||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) {
|
||||
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
|
||||
GET_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
{table_id.uuid(), version.uuid()},
|
||||
@@ -3769,8 +3769,8 @@ future<column_mapping> get_column_mapping(::table_id table_id, table_schema_vers
|
||||
co_return std::move(cm);
|
||||
}
|
||||
|
||||
future<bool> column_mapping_exists(table_id table_id, table_schema_version version) {
|
||||
shared_ptr<cql3::untyped_result_set> results = co_await qctx->qp().execute_internal(
|
||||
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
|
||||
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
|
||||
GET_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
{table_id.uuid(), version.uuid()},
|
||||
@@ -3779,11 +3779,11 @@ future<bool> column_mapping_exists(table_id table_id, table_schema_version versi
|
||||
co_return !results->empty();
|
||||
}
|
||||
|
||||
future<> drop_column_mapping(table_id table_id, table_schema_version version) {
|
||||
future<> drop_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
|
||||
const static sstring DEL_COLUMN_MAPPING_QUERY =
|
||||
format("DELETE FROM system.{} WHERE cf_id = ? and schema_version = ?",
|
||||
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
|
||||
co_await qctx->qp().execute_internal(
|
||||
co_await sys_ks._qp.execute_internal(
|
||||
DEL_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
{table_id.uuid(), version.uuid()},
|
||||
|
||||
@@ -299,11 +299,11 @@ std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const ss
|
||||
/// overwriting an existing column mapping to garbage collect obsolete entries.
|
||||
future<> store_column_mapping(distributed<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl);
|
||||
/// Query column mapping for a given version of the table locally.
|
||||
future<column_mapping> get_column_mapping(table_id table_id, table_schema_version version);
|
||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
/// Check that column mapping exists for a given version of the table
|
||||
future<bool> column_mapping_exists(table_id table_id, table_schema_version version);
|
||||
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
/// Delete matching column mapping entries from the `system.scylla_table_schema_history` table
|
||||
future<> drop_column_mapping(table_id table_id, table_schema_version version);
|
||||
future<> drop_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
|
||||
} // namespace schema_tables
|
||||
} // namespace db
|
||||
|
||||
@@ -78,6 +78,13 @@ namespace db {
|
||||
|
||||
sstring system_keyspace_name();
|
||||
|
||||
class system_keyspace;
|
||||
namespace schema_tables {
|
||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version);
|
||||
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
future<> drop_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
}
|
||||
|
||||
class config;
|
||||
struct local_cache;
|
||||
|
||||
@@ -509,6 +516,10 @@ public:
|
||||
future<::shared_ptr<cql3::untyped_result_set>> execute_cql(sstring req, Args&&... args) {
|
||||
return execute_cql(req, { data_value(std::forward<Args>(args))... });
|
||||
}
|
||||
|
||||
friend future<column_mapping> db::schema_tables::get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version);
|
||||
friend future<bool> db::schema_tables::column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
friend future<> db::schema_tables::drop_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
}; // class system_keyspace
|
||||
|
||||
} // namespace db
|
||||
|
||||
@@ -835,7 +835,7 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
|
||||
auto s = t.second;
|
||||
// Recreate missing column mapping entries in case
|
||||
// we failed to persist them for some reason after a schema change
|
||||
bool cm_exists = co_await db::schema_tables::column_mapping_exists(s->id(), s->version());
|
||||
bool cm_exists = co_await db::schema_tables::column_mapping_exists(sys_ks.local(), s->id(), s->version());
|
||||
if (cm_exists) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
@@ -1197,12 +1197,12 @@ future<> migration_manager::sync_schema(const replica::database& db, const std::
|
||||
});
|
||||
}
|
||||
|
||||
future<column_mapping> get_column_mapping(table_id table_id, table_schema_version v) {
|
||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version v) {
|
||||
schema_ptr s = local_schema_registry().get_or_null(v);
|
||||
if (s) {
|
||||
return make_ready_future<column_mapping>(s->get_column_mapping());
|
||||
}
|
||||
return db::schema_tables::get_column_mapping(table_id, v);
|
||||
return db::schema_tables::get_column_mapping(sys_ks, table_id, v);
|
||||
}
|
||||
|
||||
future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
|
||||
@@ -244,6 +244,6 @@ public:
|
||||
void set_concurrent_ddl_retries(size_t);
|
||||
};
|
||||
|
||||
future<column_mapping> get_column_mapping(table_id, table_schema_version v);
|
||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id, table_schema_version v);
|
||||
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, db::system_keys
|
||||
prv, tr_state, timeout);
|
||||
});
|
||||
});
|
||||
return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, schema] (auto t) mutable {
|
||||
return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, schema, &sys_ks] (auto t) mutable {
|
||||
if (utils::get_local_injector().enter("paxos_error_after_save_promise")) {
|
||||
return make_exception_future<prepare_response>(utils::injected_error("injected_error_after_save_promise"));
|
||||
}
|
||||
@@ -105,7 +105,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, db::system_keys
|
||||
auto ex = f2.get_exception();
|
||||
logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex));
|
||||
}
|
||||
auto upgrade_if_needed = [schema = std::move(schema)] (std::optional<proposal> p) {
|
||||
auto upgrade_if_needed = [schema = std::move(schema), &sys_ks] (std::optional<proposal> p) {
|
||||
if (!p || p->update.schema_version() == schema->version()) {
|
||||
return make_ready_future<std::optional<proposal>>(std::move(p));
|
||||
}
|
||||
@@ -117,7 +117,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, db::system_keys
|
||||
// for that version and upgrade the mutation with it.
|
||||
logger.debug("Stored mutation references outdated schema version. "
|
||||
"Trying to upgrade the accepted proposal mutation to the most recent schema version.");
|
||||
return service::get_column_mapping(p->update.column_family_id(), p->update.schema_version()).then([schema, p = std::move(p)] (const column_mapping& cm) {
|
||||
return service::get_column_mapping(sys_ks, p->update.column_family_id(), p->update.schema_version()).then([schema, p = std::move(p)] (const column_mapping& cm) {
|
||||
return make_ready_future<std::optional<proposal>>(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm))));
|
||||
});
|
||||
};
|
||||
|
||||
@@ -30,14 +30,14 @@ SEASTAR_TEST_CASE(test_column_mapping_persistence) {
|
||||
|
||||
// Check that stored column mapping is correctly serialized and deserialized
|
||||
column_mapping cm;
|
||||
BOOST_REQUIRE_NO_THROW(cm = db::schema_tables::get_column_mapping(table_id, v1).get0());
|
||||
BOOST_REQUIRE_NO_THROW(cm = db::schema_tables::get_column_mapping(e.get_system_keyspace().local(), table_id, v1).get0());
|
||||
BOOST_REQUIRE_EQUAL(orig_cm, cm);
|
||||
|
||||
// Alter the test table and check that new column mapping is also inserted for the new schema version
|
||||
cquery_nofail(e, "alter table test ADD dummy int");
|
||||
auto altered_schema = e.local_db().find_schema("ks", "test");
|
||||
column_mapping altered_cm;
|
||||
BOOST_REQUIRE_NO_THROW(altered_cm = db::schema_tables::get_column_mapping(table_id, altered_schema->version()).get0());
|
||||
BOOST_REQUIRE_NO_THROW(altered_cm = db::schema_tables::get_column_mapping(e.get_system_keyspace().local(), table_id, altered_schema->version()).get0());
|
||||
BOOST_REQUIRE_EQUAL(altered_schema->get_column_mapping(), altered_cm);
|
||||
});
|
||||
}
|
||||
@@ -82,4 +82,4 @@ SEASTAR_TEST_CASE(test_column_mapping_ttl_check) {
|
||||
int32_t ttl_val = value_cast<int32_t>(int32_type->deserialize(*row[0]));
|
||||
BOOST_REQUIRE(ttl_val > 0);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user