api, storage_service: Recalculate table digests on relocal_schema api call
Currently, the API call recalculates only per-node schema version. To workaround issues like #4485 we want to recalculate per-table digests. One way to do that is to restart the node, but that's slow and has impact on availability. Use like this: curl -X POST http://127.0.0.1:10000/storage_service/relocal_schema Fixes #15380 Closes #15381
This commit is contained in:
committed by
Botond Dénes
parent
0a5d9532f9
commit
c27d212f4b
@@ -1954,7 +1954,7 @@
|
||||
"operations":[
|
||||
{
|
||||
"method":"POST",
|
||||
"summary":"Reset local schema",
|
||||
"summary":"Forces this node to recalculate versions of schema objects.",
|
||||
"type":"void",
|
||||
"nickname":"reset_local_schema",
|
||||
"produces":[
|
||||
|
||||
@@ -1014,13 +1014,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
return make_ready_future<json::json_return_type>(res);
|
||||
});
|
||||
|
||||
ss::reset_local_schema.set(r, [&ctx, &sys_ks](std::unique_ptr<http::request> req) {
|
||||
ss::reset_local_schema.set(r, [&ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
// FIXME: We should truncate schema tables if more than one node in the cluster.
|
||||
auto& fs = ctx.sp.local().features();
|
||||
apilog.info("reset_local_schema");
|
||||
return db::schema_tables::recalculate_schema_version(sys_ks, ctx.sp, fs).then([] {
|
||||
return make_ready_future<json::json_return_type>(json_void());
|
||||
});
|
||||
co_await ss.local().reload_schema();
|
||||
co_return json_void();
|
||||
});
|
||||
|
||||
ss::set_trace_probability.set(r, [](std::unique_ptr<http::request> req) {
|
||||
|
||||
@@ -2460,6 +2460,12 @@ future<> database::flush_table_on_all_shards(sharded<database>& sharded_db, tabl
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id) {
|
||||
return sharded_db.invoke_on_all([id] (replica::database& db) {
|
||||
return db.find_column_family(id).get_row_cache().invalidate(row_cache::external_updater([] {}));
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::flush_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::string_view table_name) {
|
||||
return flush_table_on_all_shards(sharded_db, sharded_db.local().find_uuid(ks_name, table_name));
|
||||
}
|
||||
@@ -2477,6 +2483,13 @@ future<> database::flush_keyspace_on_all_shards(sharded<database>& sharded_db, s
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name) {
|
||||
auto& ks = sharded_db.local().find_keyspace(ks_name);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [&] (auto& pair) {
|
||||
return drop_cache_for_table_on_all_shards(sharded_db, pair.second->id());
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring table_name, sstring tag, db::snapshot_ctl::snap_views snap_views, bool skip_flush) {
|
||||
if (!skip_flush) {
|
||||
co_await flush_table_on_all_shards(sharded_db, ks_name, table_name);
|
||||
|
||||
@@ -1734,6 +1734,9 @@ public:
|
||||
// flush all tables in a keyspace on all shards.
|
||||
static future<> flush_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
|
||||
|
||||
static future<> drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id);
|
||||
static future<> drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
|
||||
|
||||
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring table_name, sstring tag, db::snapshot_ctl::snap_views, bool skip_flush);
|
||||
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_ctl::snap_views, bool skip_flush);
|
||||
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush);
|
||||
|
||||
@@ -5175,6 +5175,16 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::reload_schema() {
|
||||
// Flush memtables and clear cache so that we use the same state we would after node restart
|
||||
// to rule out potential discrepancies which could stem from merging with memtable/cache readers.
|
||||
co_await replica::database::flush_keyspace_on_all_shards(_db, db::schema_tables::v3::NAME);
|
||||
co_await replica::database::drop_cache_for_keyspace_on_all_shards(_db, db::schema_tables::v3::NAME);
|
||||
co_await _migration_manager.invoke_on(0, [] (auto& mm) {
|
||||
return mm.reload_schema();
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::drain() {
|
||||
return run_with_api_lock(sstring("drain"), [] (storage_service& ss) {
|
||||
if (ss._operation_mode == mode::DRAINED) {
|
||||
|
||||
@@ -687,6 +687,9 @@ public:
|
||||
*/
|
||||
future<> drain();
|
||||
|
||||
// Recalculates schema digests on this node from contents of tables on disk.
|
||||
future<> reload_schema();
|
||||
|
||||
future<std::map<gms::inet_address, float>> get_ownership();
|
||||
|
||||
future<std::map<gms::inet_address, float>> effective_ownership(sstring keyspace_name);
|
||||
|
||||
Reference in New Issue
Block a user