Merge 'Allow "global" snapshot using topology coordinator + add tablet metadata to manifest' from Calle Wilund

Refs: SCYLLADB-193

Adds a "snapshot_table" topology operation and associated data structure/table columns to support dispatching a snapshot operation as a topo coordinator op.

Logic is similar, and thus broken out and semi-shared with, truncation.

Also adds optional tablet metadata to manifest, listing all tablets present in a given snapshot, as well as
tablet sstable ownership, repair status, and token ranges.

As per description in SCYLLADB-193, the alternative snapshot mechanism is in
a separate namespace under 'tablets', which while dubious is the desired destination.

The API is accessed via `nodetool cluster snapshot`, which more or less mirrors `nodetool snapshot`, but using topo op.

TTL is added to message propagation as a separate patch here, since it is not (yet) used from API (or nodetool).
Requires a syntax for both API and command line.

Closes scylladb/scylladb#28525

* github.com:scylladb/scylladb:
  topology::snapshot: Add expiry (ttl) to RPC/topo op
  test_snapshot_with_tablets: Extend test to check manifest content
  table::manifest: Add tablet info to manifest.json
  test::test_snapshot_with_tablets: Add small test for topo coordinated snapshot
  scylla-nodetool: Add "cluster snapshot" command
  api::storage_service: Add tablets/snapshots command for cluster level snapshot
  db::snapshot-ctl: Add method to do snapshot using topo coordinator
  storage_proxy: Add snapshot_keyspace method
  topology_coordinator: Add handler for snapshot_tables
  storage_proxy: Add handler for SNAPSHOT_WITH_TABLETS
  messaging_service: Add SNAPSHOT_WITH_TABLETS verb
  feature_service: Add SNAPSHOT_AS_TOPOLOGY_OPERATION feature
  topology_mutation: Add setter for snapshot part of row
  system_keyspace::topology_requests_entry: Add snapshot info to table
  topology_state_machine: Add snapshot_tables operation
  topology_coordinator: Break out logic from handle_truncate_table
  storage_proxy: Break out logic from request_truncate_with_tablets
  test/object_store: Remove create_ks_and_cf() helper
  test/object_store: Replace create_ks_and_cf() usage with standard methods
  test/object_store: Shift indentation right for test cases
This commit is contained in:
Botond Dénes
2026-02-25 10:17:53 +02:00
25 changed files with 676 additions and 65 deletions

View File

@@ -3085,6 +3085,48 @@
}
]
},
{
"path":"/storage_service/tablets/snapshots",
"operations":[
{
"method":"POST",
"summary":"Takes the snapshot for the given keyspaces/tables. A snapshot name must be specified.",
"type":"void",
"nickname":"take_cluster_snapshot",
"produces":[
"application/json"
],
"parameters":[
{
"name":"tag",
"description":"the tag given to the snapshot",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"keyspace",
"description":"Keyspace(s) to snapshot. Multiple keyspaces can be provided using a comma-separated list. If omitted, snapshot all keyspaces.",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"table",
"description":"Table(s) to snapshot. Multiple tables (in a single keyspace) can be provided using a comma-separated list. If omitted, snapshot all tables in the given keyspace(s).",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/quiesce_topology",
"operations":[

View File

@@ -2025,6 +2025,8 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto tag = req->get_query_param("tag");
auto column_families = split(req->get_query_param("cf"), ",");
auto sfopt = req->get_query_param("sf");
auto tcopt = req->get_query_param("tc");
db::snapshot_options opts = {
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
};
@@ -2049,6 +2051,27 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
}
});
ss::take_cluster_snapshot.set(r, [&snap_ctl](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
apilog.info("take_cluster_snapshot: {}", req->get_query_params());
auto tag = req->get_query_param("tag");
auto column_families = split(req->get_query_param("table"), ",");
// Note: not published/active. Retain as internal option, but...
auto sfopt = req->get_query_param("skip_flush");
db::snapshot_options opts = {
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
};
std::vector<sstring> keynames = split(req->get_query_param("keyspace"), ",");
try {
co_await snap_ctl.local().take_cluster_column_family_snapshot(keynames, column_families, tag, opts);
co_return json_void();
} catch (...) {
apilog.error("take_cluster_snapshot failed: {}", std::current_exception());
throw;
}
});
ss::del_snapshot.set(r, [&snap_ctl](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
apilog.info("del_snapshot: {}", req->get_query_params());
auto tag = req->get_query_param("tag");

View File

@@ -21,14 +21,16 @@
#include "replica/database.hh"
#include "replica/global_table_ptr.hh"
#include "sstables/sstables_manager.hh"
#include "service/storage_proxy.hh"
logging::logger snap_log("snapshots");
namespace db {
snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg)
snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, sharded<service::storage_proxy>& sp, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg)
: _config(std::move(cfg))
, _db(db)
, _sp(sp)
, _ops("snapshot_ctl")
, _task_manager_module(make_shared<snapshot::task_manager_module>(tm))
, _storage_manager(sstm)
@@ -104,6 +106,45 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
});
}
future<> snapshot_ctl::take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
if (tag.empty()) {
throw std::invalid_argument("You must supply a snapshot name.");
}
if (ks_names.size() != 1 && !tables.empty()) {
throw std::invalid_argument("Cannot name tables when doing multiple keyspaces snapshot");
}
if (ks_names.empty()) {
std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(ks_names));
}
return run_snapshot_modify_operation([this, ks_names = std::move(ks_names), tables = std::move(tables), tag = std::move(tag), opts] () mutable {
return do_take_cluster_column_family_snapshot(std::move(ks_names), std::move(tables), std::move(tag), opts);
});
}
future<> snapshot_ctl::do_take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
if (tables.empty()) {
co_await coroutine::parallel_for_each(ks_names, [tag, this] (const auto& ks_name) {
return check_snapshot_not_exist(ks_name, tag);
});
co_await _sp.local().snapshot_keyspace(
ks_names | std::views::transform([&](auto& ks) { return std::make_pair(ks, sstring{}); })
| std::ranges::to<std::unordered_multimap>(),
tag, opts
);
co_return;
};
auto ks = ks_names[0];
co_await check_snapshot_not_exist(ks, tag, tables);
co_await _sp.local().snapshot_keyspace(
tables | std::views::transform([&](auto& cf) { return std::make_pair(ks, cf); })
| std::ranges::to<std::unordered_multimap>(),
tag, opts
);
}
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
co_await check_snapshot_not_exist(ks_name, tag, tables);
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
@@ -185,4 +226,4 @@ future<int64_t> snapshot_ctl::true_snapshots_size(sstring ks, sstring cf) {
}));
}
}
}

View File

@@ -24,6 +24,7 @@
using namespace seastar;
namespace sstables { class storage_manager; }
namespace service { class storage_proxy; }
namespace db {
@@ -63,7 +64,7 @@ public:
using db_snapshot_details = std::vector<table_snapshot_details_ext>;
snapshot_ctl(sharded<replica::database>& db, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg);
snapshot_ctl(sharded<replica::database>& db, sharded<service::storage_proxy>&, tasks::task_manager& tm, sstables::storage_manager& sstm, config cfg);
future<> stop();
@@ -95,6 +96,17 @@ public:
*/
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
/**
* Takes the snapshot of multiple tables or a whole keyspace, or all keyspaces,
* using global, clusterwide topology coordinated op.
* A snapshot name must be specified.
*
* @param ks_names the keyspaces to snapshot
* @param tables optional - a vector of tables names to snapshot
* @param tag the tag given to the snapshot; may not be null or empty
*/
future<> take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
/**
* Remove the snapshot with the given name from the given keyspaces.
* If no tag is specified we will remove all snapshots.
@@ -111,6 +123,7 @@ public:
private:
config _config;
sharded<replica::database>& _db;
sharded<service::storage_proxy>& _sp;
seastar::rwlock _lock;
seastar::named_gate _ops;
shared_ptr<snapshot::task_manager_module> _task_manager_module;
@@ -133,6 +146,7 @@ private:
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
future<> do_take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
};
}
}

View File

@@ -335,6 +335,10 @@ schema_ptr system_keyspace::topology_requests() {
.with_column("truncate_table_id", uuid_type)
.with_column("new_keyspace_rf_change_ks_name", utf8_type)
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false))
.with_column("snapshot_table_ids", set_type_impl::get_instance(uuid_type, false))
.with_column("snapshot_tag", utf8_type)
.with_column("snapshot_expiry", timestamp_type)
.with_column("snapshot_skip_flush", boolean_type)
.set_comment("Topology request tracking")
.with_hash_version()
.build();
@@ -3581,6 +3585,18 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
entry.new_keyspace_rf_change_ks_name = row.get_as<sstring>("new_keyspace_rf_change_ks_name");
entry.new_keyspace_rf_change_data = row.get_map<sstring,sstring>("new_keyspace_rf_change_data");
}
if (row.has("snapshot_table_ids")) {
entry.snapshot_tag = row.get_as<sstring>("snapshot_tag");
entry.snapshot_skip_flush = row.get_as<bool>("snapshot_skip_flush");
entry.snapshot_table_ids = row.get_set<utils::UUID>("snapshot_table_ids")
| std::views::transform([](auto& uuid) { return table_id(uuid); })
| std::ranges::to<std::unordered_set>()
;
;
if (row.has("snapshot_expiry")) {
entry.snapshot_expiry = row.get_as<db_clock::time_point>("snapshot_expiry");
}
}
return entry;
}

View File

@@ -417,6 +417,10 @@ public:
std::optional<sstring> new_keyspace_rf_change_ks_name;
// The KS options to be used when executing the scheduled ALTER KS statement
std::optional<std::unordered_map<sstring, sstring>> new_keyspace_rf_change_data;
std::optional<std::unordered_set<table_id>> snapshot_table_ids;
std::optional<sstring> snapshot_tag;
std::optional<db_clock::time_point> snapshot_expiry;
bool snapshot_skip_flush;
};
using topology_requests_entries = std::unordered_map<utils::UUID, system_keyspace::topology_requests_entry>;

View File

@@ -111,6 +111,7 @@ public:
gms::feature large_collection_detection { *this, "LARGE_COLLECTION_DETECTION"sv };
gms::feature range_tombstone_and_dead_rows_detection { *this, "RANGE_TOMBSTONE_AND_DEAD_ROWS_DETECTION"sv };
gms::feature truncate_as_topology_operation { *this, "TRUNCATE_AS_TOPOLOGY_OPERATION"sv };
gms::feature snapshot_as_topology_operation { *this, "SNAPSHOT_AS_TOPOLOGY_OPERATION"sv };
gms::feature secondary_indexes_on_static_columns { *this, "SECONDARY_INDEXES_ON_STATIC_COLUMNS"sv };
gms::feature tablets { *this, "TABLETS"sv };
gms::feature table_digest_insensitive_to_expiry { *this, "TABLE_DIGEST_INSENSITIVE_TO_EXPIRY"sv };

View File

@@ -35,6 +35,7 @@ verb [[with_client_info, with_timeout]] read_mutation_data (query::read_command
verb [[with_client_info, with_timeout]] read_digest (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]]) -> query::result_digest, api::timestamp_type [[version 1.2.0]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]], std::optional<full_position> [[version 5.2.0]];
verb [[with_timeout]] truncate (sstring, sstring);
verb [[]] truncate_with_tablets (sstring ks_name, sstring cf_name, service::frozen_topology_guard frozen_guard);
verb [[]] snapshot_with_tablets (utils::chunked_vector<table_id> table_ids, sstring tag, gc_clock::time_point created_at, bool, std::optional<gc_clock::time_point> expiry, service::frozen_topology_guard frozen_guard);
verb [[with_client_info, with_timeout]] paxos_prepare (query::read_command cmd [[ref]], partition_key key [[ref]], utils::UUID ballot, bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info> trace_info [[ref]], service::fencing_token fence [[version 2025.4]]) -> service::paxos::prepare_response [[unique_ptr]];
verb [[with_client_info, with_timeout]] paxos_accept (service::paxos::proposal proposal [[ref]], std::optional<tracing::trace_info> trace_info [[ref]], service::fencing_token fence [[version 2025.4]]) -> bool;
verb [[with_client_info, with_timeout, one_way]] paxos_learn (service::paxos::proposal decision [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[ref]], host_id_vector_replica_set forward_id [[ref, version 6.3.0]], locator::host_id reply_to_id [[version 6.3.0]], service::fencing_token fence [[version 2025.4]]);

View File

@@ -2111,7 +2111,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
db::snapshot_ctl::config snap_cfg = {
.backup_sched_group = dbcfg.streaming_scheduling_group,
};
snapshot_ctl.start(std::ref(db), std::ref(task_manager), std::ref(sstm), snap_cfg).get();
snapshot_ctl.start(std::ref(db), std::ref(proxy), std::ref(task_manager), std::ref(sstm), snap_cfg).get();
auto stop_snapshot_ctl = defer_verbose_shutdown("snapshots", [&snapshot_ctl] {
snapshot_ctl.stop().get();
});

View File

@@ -728,6 +728,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::TABLE_LOAD_STATS_V1:
case messaging_verb::TABLE_LOAD_STATS:
case messaging_verb::WORK_ON_VIEW_BUILDING_TASKS:
case messaging_verb::SNAPSHOT_WITH_TABLETS:
return 1;
case messaging_verb::CLIENT_ID:
case messaging_verb::MUTATION:

View File

@@ -210,7 +210,8 @@ enum class messaging_verb : int32_t {
REPAIR_UPDATE_REPAIRED_AT_FOR_MERGE = 81,
WORK_ON_VIEW_BUILDING_TASKS = 82,
NOTIFY_BANNED = 83,
LAST = 84,
SNAPSHOT_WITH_TABLETS = 84,
LAST = 85,
};
} // namespace netw

View File

@@ -3304,6 +3304,13 @@ db::replay_position table::highest_flushed_replay_position() const {
return _highest_flushed_rp;
}
struct snapshot_tablet_info {
size_t id;
dht::token first_token, last_token;
db_clock::time_point repair_time;
int64_t repaired_at;
};
struct manifest_json : public json::json_base {
struct info : public json::json_base {
json::json_element<sstring> version;
@@ -3436,6 +3443,7 @@ struct manifest_json : public json::json_base {
json::json_element<uint64_t> index_size;
json::json_element<int64_t> first_token;
json::json_element<int64_t> last_token;
json::json_element<uint64_t> tablet_id;
sstable_info() {
register_params();
@@ -3448,6 +3456,9 @@ struct manifest_json : public json::json_base {
index_size = e.index_size;
first_token = e.first_token;
last_token = e.last_token;
if (e.tablet_id) {
tablet_id = *e.tablet_id;
}
}
sstable_info(const sstable_info& e) {
register_params();
@@ -3457,6 +3468,7 @@ struct manifest_json : public json::json_base {
index_size = e.index_size;
first_token = e.first_token;
last_token = e.last_token;
tablet_id = e.tablet_id;
}
sstable_info(sstable_info&& e) {
register_params();
@@ -3466,6 +3478,7 @@ struct manifest_json : public json::json_base {
index_size = e.index_size;
first_token = e.first_token;
last_token = e.last_token;
tablet_id = e.tablet_id;
}
sstable_info& operator=(sstable_info&& e) {
id = e.id;
@@ -3474,6 +3487,7 @@ struct manifest_json : public json::json_base {
index_size = e.index_size;
first_token = e.first_token;
last_token = e.last_token;
tablet_id = e.tablet_id;
return *this;
}
private:
@@ -3484,6 +3498,51 @@ struct manifest_json : public json::json_base {
add(&index_size, "index_size");
add(&first_token, "first_token");
add(&last_token, "last_token");
add(&tablet_id, "tablet_id");
}
};
struct tablet_info : public json::json_base {
json::json_element<uint64_t> id;
json::json_element<int64_t> first_token;
json::json_element<int64_t> last_token;
json::json_element<time_t> repair_time;
json::json_element<int64_t> repaired_at;
tablet_info() {
register_params();
}
tablet_info(const snapshot_tablet_info& e) {
register_params();
id = e.id;
first_token = dht::token::to_int64(e.first_token);
last_token = dht::token::to_int64(e.last_token);
repair_time = db_clock::to_time_t(e.repair_time);
repaired_at = e.repaired_at;
}
tablet_info(const tablet_info& e) {
register_params();
id = e.id;
first_token = e.first_token;
last_token = e.last_token;
repair_time = e.repair_time;
repaired_at = e.repaired_at;
}
tablet_info& operator=(tablet_info&& e) {
id = e.id;
first_token = e.first_token;
last_token = e.last_token;
repair_time = e.repair_time;
repaired_at = e.repaired_at;
return *this;
}
private:
void register_params() {
add(&id, "id");
add(&first_token, "first_token");
add(&last_token, "last_token");
add(&repair_time, "repair_time");
add(&repaired_at, "repaired_at");
}
};
@@ -3492,6 +3551,7 @@ struct manifest_json : public json::json_base {
json::json_element<snapshot_info> snapshot;
json::json_element<table_info> table;
json::json_chunked_list<sstable_info> sstables;
json::json_chunked_list<tablet_info> tablets;
manifest_json() {
register_params();
@@ -3503,6 +3563,7 @@ struct manifest_json : public json::json_base {
snapshot = std::move(e.snapshot);
table = std::move(e.table);
sstables = std::move(e.sstables);
tablets = std::move(e.tablets);
}
manifest_json& operator=(manifest_json&& e) {
if (this != &e) {
@@ -3511,6 +3572,7 @@ struct manifest_json : public json::json_base {
snapshot = std::move(e.snapshot);
table = std::move(e.table);
sstables = std::move(e.sstables);
tablets = std::move(e.tablets);
}
return *this;
}
@@ -3521,6 +3583,7 @@ private:
add(&snapshot, "snapshot");
add(&table, "table");
add(&sstables, "sstables");
add(&tablets, "tablets");
}
};
@@ -3534,7 +3597,7 @@ public:
using snapshot_sstable_set = foreign_ptr<std::unique_ptr<utils::chunked_vector<sstables::sstable_snapshot_metadata>>>;
static future<> write_manifest(const locator::topology& topology, snapshot_writer& writer, std::vector<snapshot_sstable_set> sstable_sets, sstring name, db::snapshot_options opts, schema_ptr schema, std::optional<int64_t> tablet_count) {
static future<> write_manifest(const locator::topology& topology, snapshot_writer& writer, std::vector<snapshot_sstable_set> sstable_sets, std::vector<snapshot_tablet_info> tablets, sstring name, db::snapshot_options opts, schema_ptr schema, std::optional<int64_t> tablet_count) {
manifest_json manifest;
manifest_json::info info;
@@ -3570,6 +3633,11 @@ static future<> write_manifest(const locator::topology& topology, snapshot_write
manifest.sstables.push(manifest_json::sstable_info(md));
}
}
for (const auto& sti : tablets) {
manifest.tablets.push(manifest_json::tablet_info(sti));
}
auto streamer = json::stream_object(std::move(manifest));
auto out = co_await writer.stream_for("manifest.json");
std::exception_ptr ex;
@@ -3694,11 +3762,32 @@ future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, c
tlogger.debug("snapshot {}: seal_snapshot", name);
const auto& topology = sharded_db.local().get_token_metadata().get_topology();
std::optional<int64_t> min_tablet_count;
std::vector<snapshot_tablet_info> tablets;
if (t.uses_tablets()) {
SCYLLA_ASSERT(!tablet_counts.empty());
min_tablet_count = *std::ranges::min_element(tablet_counts);
auto erm = t.get_effective_replication_map();
auto& tm = erm->get_token_metadata().tablets().get_tablet_map(s->id());
for (auto& ssts : sstable_sets) {
for (auto& sst : *ssts) {
auto tok = sst.first_token;
auto tid = tm.get_tablet_id(dht::token::from_int64(tok));
sst.tablet_id = tid.id;
if (std::none_of(tablets.begin(), tablets.end(), [tid](auto& sti) { return sti.id == tid.id; })) {
auto& tinfo = tm.get_tablet_info(tid);
tablets.emplace_back(snapshot_tablet_info{
.id = tid.id,
.first_token = tm.get_first_token(tid),
.last_token = tm.get_last_token(tid),
.repair_time = tinfo.repair_time,
.repaired_at = tinfo.sstables_repaired_at,
});
}
}
}
}
co_await write_manifest(topology, *writer, std::move(sstable_sets), name, std::move(opts), s, min_tablet_count).handle_exception([&] (std::exception_ptr ptr) {
co_await write_manifest(topology, *writer, std::move(sstable_sets), std::move(tablets), name, std::move(opts), s, min_tablet_count).handle_exception([&] (std::exception_ptr ptr) {
tlogger.error("Failed to seal snapshot in {}: {}.", name, ptr);
ex = std::move(ptr);
});

View File

@@ -242,7 +242,10 @@ class storage_proxy::remote {
const db::view::view_building_state_machine& _vb_state_machine;
abort_source _group0_as;
// These two could probably share, but nice to
// have named separations...
seastar::named_gate _truncate_gate;
seastar::named_gate _snapshot_gate;
netw::connection_drop_slot_t _connection_dropped;
netw::connection_drop_registration_t _condrop_registration;
@@ -254,6 +257,7 @@ public:
sharded<paxos::paxos_store>& paxos_store, raft_group0_client& group0_client, topology_state_machine& tsm, const db::view::view_building_state_machine& vbsm)
: _sp(sp), _ms(ms), _gossiper(g), _mm(mm), _sys_ks(sys_ks), _paxos_store(paxos_store), _group0_client(group0_client), _topology_state_machine(tsm), _vb_state_machine(vbsm)
, _truncate_gate("storage_proxy::remote::truncate_gate")
, _snapshot_gate("storage_proxy::remote::snapshot_gate")
, _connection_dropped(std::bind_front(&remote::connection_dropped, this))
, _condrop_registration(_ms.when_connection_drops(_connection_dropped))
{
@@ -268,6 +272,7 @@ public:
ser::storage_proxy_rpc_verbs::register_read_digest(&_ms, std::bind_front(&remote::handle_read_digest, this));
ser::storage_proxy_rpc_verbs::register_truncate(&_ms, std::bind_front(&remote::handle_truncate, this));
ser::storage_proxy_rpc_verbs::register_truncate_with_tablets(&_ms, std::bind_front(&remote::handle_truncate_with_tablets, this));
ser::storage_proxy_rpc_verbs::register_snapshot_with_tablets(&_ms, std::bind_front(&remote::handle_snapshot_with_tablets, this));
// Register PAXOS verb handlers
ser::storage_proxy_rpc_verbs::register_paxos_prepare(&_ms, std::bind_front(&remote::handle_paxos_prepare, this));
ser::storage_proxy_rpc_verbs::register_paxos_accept(&_ms, std::bind_front(&remote::handle_paxos_accept, this));
@@ -282,6 +287,7 @@ public:
future<> stop() {
_group0_as.request_abort();
co_await _truncate_gate.close();
co_await _snapshot_gate.close();
co_await ser::storage_proxy_rpc_verbs::unregister(&_ms);
_stopped = true;
}
@@ -474,6 +480,12 @@ public:
}
}
future<> snapshot_with_tablets(const std::vector<std::pair<sstring, sstring>>& ks_cf_names, sstring tag, const db::snapshot_options& opts) {
co_await seastar::with_gate(_snapshot_gate, [&] () -> future<> {
co_await request_snapshot_with_tablets(ks_cf_names, tag, opts);
});
}
future<> send_truncate_blocking(sstring keyspace, sstring cfname, std::chrono::milliseconds timeout_in_ms) {
auto s = _sp.local_db().find_schema(keyspace, cfname);
auto erm_ptr = s->table().get_effective_replication_map();
@@ -971,6 +983,18 @@ private:
co_await replica::database::truncate_table_on_all_shards(_sp._db, _sys_ks, ksname, cfname);
}
future<> handle_snapshot_with_tablets(utils::chunked_vector<table_id> ids, sstring tag, gc_clock::time_point ts, bool skip_flush, std::optional<gc_clock::time_point> expiry, service::frozen_topology_guard frozen_guard) {
topology_guard guard(frozen_guard);
db::snapshot_options opts {
.skip_flush = skip_flush,
.created_at = ts,
.expires_at = expiry
};
co_await coroutine::parallel_for_each(ids, [&] (const table_id& id) -> future<> {
co_await replica::database::snapshot_table_on_all_shards(_sp._db, id, tag, opts);
});
}
future<foreign_ptr<std::unique_ptr<service::paxos::prepare_response>>>
handle_paxos_prepare(
const rpc::client_info& cinfo, rpc::opt_time_point timeout,
@@ -1113,11 +1137,15 @@ private:
}
}
future<> request_truncate_with_tablets(sstring ks_name, sstring cf_name) {
using begin_op_func = std::function<std::string()>;
using can_replace_op_with_ongoing_func = std::function<bool(const db::system_keyspace::topology_requests_entry&, const service::global_topology_request&)>;
using create_op_mutations_func = std::function<global_topology_request(topology_request_tracking_mutation_builder&)>;
future<> do_topology_request(std::string_view reason, begin_op_func begin, can_replace_op_with_ongoing_func can_replace_op, create_op_mutations_func create_mutations, std::string_view origin) {
if (this_shard_id() != 0) {
// group0 is only set on shard 0
co_return co_await _sp.container().invoke_on(0, [&] (storage_proxy& sp) {
return sp.remote().request_truncate_with_tablets(ks_name, cf_name);
return sp.remote().do_topology_request(reason, begin, can_replace_op, create_mutations, origin);
});
}
@@ -1126,10 +1154,10 @@ private:
while (true) {
group0_guard guard = co_await _group0_client.start_operation(_group0_as, raft_timeout{});
const table_id table_id = _sp.local_db().find_uuid(ks_name, cf_name);
auto desc = begin();
if (!_sp._features.topology_global_request_queue) {
// Check if we already have a truncate queued for the same table. This can happen when a truncate has timed out
// Check if we already have a similar op for the same table. This can happen when a, say, truncate has timed out
// and the client retried by issuing the same truncate again. In this case, instead of failing the request with
// an "Another global topology request is ongoing" error, we can wait for the already queued request to complete.
// Note that we can not do this for a truncate which the topology coordinator has already started processing,
@@ -1138,21 +1166,15 @@ private:
utils::UUID ongoing_global_request_id = _topology_state_machine._topology.global_request_id.value();
const auto topology_requests_entry = co_await _sys_ks.local().get_topology_request_entry(ongoing_global_request_id);
auto global_request = std::get<service::global_topology_request>(topology_requests_entry.request_type);
if (global_request == global_topology_request::truncate_table) {
std::optional<topology::transition_state>& tstate = _topology_state_machine._topology.tstate;
if (!tstate || *tstate != topology::transition_state::truncate_table) {
if (topology_requests_entry.truncate_table_id == table_id) {
global_request_id = ongoing_global_request_id;
slogger.info("Ongoing TRUNCATE for table {}.{} (global request ID {}) detected; waiting for it to complete",
ks_name, cf_name, global_request_id);
break;
}
}
if (can_replace_op(topology_requests_entry, global_request)) {
global_request_id = ongoing_global_request_id;
slogger.info("Ongoing {} (global request ID {}) detected; waiting for it to complete", desc, global_request_id);
break;
}
slogger.warn("Another global topology request ({}) is ongoing during attempt to TRUNCATE table {}.{}",
global_request, ks_name, cf_name);
throw exceptions::invalid_request_exception(::format("Another global topology request is ongoing during attempt to TRUNCATE table {}.{}, please retry.",
ks_name, cf_name));
slogger.warn("Another global topology request ({}) is ongoing during attempt to {}", global_request, desc);
throw exceptions::invalid_request_exception(::format("Another global topology request is ongoing during attempt to {}, please retry.", desc));
}
}
@@ -1160,28 +1182,24 @@ private:
topology_mutation_builder builder(guard.write_timestamp());
topology_request_tracking_mutation_builder trbuilder(global_request_id, _sp._features.topology_requests_type_column);
trbuilder.set_truncate_table_data(table_id)
.set("done", false)
.set("start_time", db_clock::now());
auto req = create_mutations(trbuilder);
if (!_sp._features.topology_global_request_queue) {
builder.set_global_topology_request(global_topology_request::truncate_table)
builder.set_global_topology_request(req)
.set_global_topology_request_id(global_request_id);
} else {
builder.queue_global_topology_request_id(global_request_id);
trbuilder.set("request_type", global_topology_request::truncate_table);
trbuilder.set("request_type", req);
}
slogger.info("Creating TRUNCATE global topology request for table {}.{}", ks_name, cf_name);
topology_change change{{builder.build(), trbuilder.build()}};
sstring reason = "Truncating table";
group0_command g0_cmd = _group0_client.prepare_command(std::move(change), guard, reason);
try {
co_await _group0_client.add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
break;
} catch (group0_concurrent_modification&) {
slogger.debug("request_truncate_with_tablets: concurrent modification, retrying");
slogger.debug("{}: concurrent modification, retrying", origin);
}
}
@@ -1191,6 +1209,74 @@ private:
throw std::runtime_error(error);
}
}
future<> request_truncate_with_tablets(sstring ks_name, sstring cf_name) {
table_id id;
co_await do_topology_request("Truncating table"
, [&] {
id = _sp.local_db().find_uuid(ks_name, cf_name);
return fmt::format("TRUNCATE table {}.{}", ks_name, cf_name);
}
, [&](const db::system_keyspace::topology_requests_entry& entry, const service::global_topology_request& global_request) {
if (global_request == global_topology_request::truncate_table) {
const std::optional<topology::transition_state>& tstate = _topology_state_machine._topology.tstate;
if (!tstate || *tstate != topology::transition_state::truncate_table) {
return entry.truncate_table_id == id;
}
}
return false;
}
, [&](topology_request_tracking_mutation_builder& trbuilder) {
trbuilder.set_truncate_table_data(id)
.set("done", false)
.set("start_time", db_clock::now());
slogger.info("Creating TRUNCATE global topology request for table {}.{}", ks_name, cf_name);
return global_topology_request::truncate_table;
}
, "request_truncate_with_tablets"
);
}
future<> request_snapshot_with_tablets(const std::vector<std::pair<sstring, sstring>> ks_cf_names, sstring tag, const db::snapshot_options& opts) {
std::unordered_set<table_id> ids;
co_await do_topology_request("Snapshot table"
, [&] {
auto& db = _sp.local_db();
for (auto& [ks_name, cf_name] : ks_cf_names) {
if (cf_name.empty()) {
auto& ks = db.find_keyspace(ks_name);
auto id_range = ks.metadata()->cf_meta_data() | std::views::values | std::views::transform(std::mem_fn(&schema::id));
ids.insert(id_range.begin(), id_range.end());
} else {
ids.insert(db.find_uuid(ks_name, cf_name));
}
}
return fmt::format("SNAPSHOT tables {}", ks_cf_names);
}
, [&](const db::system_keyspace::topology_requests_entry& entry, const service::global_topology_request& global_request) {
if (global_request == global_topology_request::snapshot_tables) {
const std::optional<topology::transition_state>& tstate = _topology_state_machine._topology.tstate;
if (!tstate || *tstate != topology::transition_state::snapshot_tables) {
return entry.snapshot_table_ids == ids && entry.snapshot_tag == tag;
}
}
return false;
}
, [&](topology_request_tracking_mutation_builder& trbuilder) {
trbuilder.set_snapshot_tables_data(ids, tag, opts.skip_flush)
.set("done", false)
.set("start_time", db_clock::now());
if (opts.expires_at) {
trbuilder.set("snapshot_expiry", db_clock::from_time_t(gc_clock::to_time_t(*opts.expires_at)));
}
slogger.info("Creating SNAPSHOT global topology request for tables {}", ks_cf_names);
return global_topology_request::snapshot_tables;
}
, "request_snapshot_with_tablets"
);
}
};
using namespace exceptions;
@@ -7070,6 +7156,29 @@ future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname, std:
}
}
future<> storage_proxy::snapshot_keyspace(std::unordered_multimap<sstring, sstring> ks_tables, sstring tag, const db::snapshot_options& opts) {
if (!features().snapshot_as_topology_operation) {
throw std::runtime_error("Cannot do cluster wide snapshot. Feature 'snapshot_as_topology_operation' is not available in cluster");
}
for (auto& [ksname, _] : ks_tables) {
const replica::keyspace& ks = local_db().find_keyspace(ksname);
if (ks.get_replication_strategy().is_local()) {
throw std::invalid_argument(fmt::format("Keyspace {} uses local replication", ksname));
}
if (!ks.uses_tablets()) {
throw std::invalid_argument(fmt::format("Keyspace {} does not use tablets", ksname));
}
}
slogger.debug("Starting a blocking snapshot operation on keyspaces {}", ks_tables);
auto table_pairs = ks_tables | std::views::transform([](auto& p) { return std::pair<sstring, sstring>(p.first, p.second); })
| std::ranges::to<std::vector>()
;
co_await remote().snapshot_with_tablets(table_pairs, tag, opts);
}
db::system_keyspace& storage_proxy::system_keyspace() {
return remote().system_keyspace();
}

View File

@@ -72,6 +72,7 @@ class feature_service;
namespace db {
class system_keyspace;
struct snapshot_options;
namespace view {
struct view_building_state_machine;
@@ -787,6 +788,11 @@ public:
*/
future<> truncate_blocking(sstring keyspace, sstring cfname, std::chrono::milliseconds timeout_in_ms);
/**
* Performs snapshot on keyspace/tables. To snapshot all tables in a keyspace, put "ks: ''" in map
*/
future<> snapshot_keyspace(std::unordered_multimap<sstring, sstring> ks_tables, sstring tag, const db::snapshot_options& opts);
/*
* Executes data query on the whole cluster.
*

View File

@@ -834,6 +834,8 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
[[fallthrough]];
case topology::transition_state::truncate_table:
[[fallthrough]];
case topology::transition_state::snapshot_tables:
[[fallthrough]];
case topology::transition_state::rollback_to_normal:
return read_new_t::no;
case topology::transition_state::write_both_read_new:

View File

@@ -1158,6 +1158,17 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
co_await update_topology_state(std::move(guard), std::move(updates), "no-op request completed");
}
break;
case global_topology_request::snapshot_tables: {
rtlogger.info("SNAPSHOT TABLES requested");
topology_mutation_builder builder(guard.write_timestamp());
builder.set_transition_state(topology::transition_state::snapshot_tables)
.set_global_topology_request(req)
.set_global_topology_request_id(req_id)
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
.set_session(session_id(req_id));
co_await update_topology_state(std::move(guard), {builder.build()}, "SNAPSHOT TABLES requested");
}
break;
}
}
@@ -2225,7 +2236,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
}
}
future<> handle_truncate_table(group0_guard guard) {
using get_table_ids_func = std::function<std::unordered_set<table_id>(const db::system_keyspace::topology_requests_entry&)>;
using send_rpc_func = std::function<future<>(locator::host_id, const service::frozen_topology_guard&)>;
using desc_func = std::function<std::string()>;
future<> handle_topology_ordered_op(group0_guard guard, get_table_ids_func get_table_ids, send_rpc_func send_rpc, desc_func desc, std::string_view what) {
// Execute a barrier to make sure the nodes we are performing truncate on see the session
// and are able to create a topology_guard using the frozen_guard we are sending over RPC
// TODO: Exclude nodes which don't contain replicas of the table we are truncating
@@ -2237,46 +2252,44 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
// handler performed the truncate and cleared the session, but crashed before finalizing the request
if (_topo_sm._topology.session) {
const auto topology_requests_entry = co_await _sys_ks.get_topology_request_entry(global_request_id);
const table_id& table_id = topology_requests_entry.truncate_table_id;
lw_shared_ptr<replica::table> table = _db.get_tables_metadata().get_table_if_exists(table_id);
if (table) {
const sstring& ks_name = table->schema()->ks_name();
const sstring& cf_name = table->schema()->cf_name();
rtlogger.info("Performing TRUNCATE TABLE for {}.{}", ks_name, cf_name);
std::unordered_set<table_id> tables;
try {
tables = get_table_ids(topology_requests_entry);
} catch (std::exception& e) {
error = e.what();
}
if (!tables.empty()) {
// Collect the IDs of the hosts with replicas, but ignore excluded nodes
std::unordered_set<locator::host_id> replica_hosts;
const locator::tablet_map& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id);
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) {
for (const locator::tablet_replica& replica: tinfo.replicas) {
if (!_topo_sm._topology.excluded_tablet_nodes.contains(raft::server_id(replica.host.uuid()))) {
replica_hosts.insert(replica.host);
for (auto table_id : tables) {
const locator::tablet_map& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id);
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) {
for (const locator::tablet_replica& replica: tinfo.replicas) {
if (!_topo_sm._topology.excluded_tablet_nodes.contains(raft::server_id(replica.host.uuid()))) {
replica_hosts.insert(replica.host);
}
}
}
return make_ready_future<>();
});
return make_ready_future<>();
});
}
// Release the guard to avoid blocking group0 for long periods of time while invoking RPCs
release_guard(std::move(guard));
co_await utils::get_local_injector().inject("truncate_table_wait", utils::wait_for_message(std::chrono::minutes(2)));
co_await utils::get_local_injector().inject(fmt::format("{}_table_wait", what), utils::wait_for_message(std::chrono::minutes(2)));
// Check if all the nodes with replicas are alive
for (const locator::host_id& replica_host: replica_hosts) {
if (!_gossiper.is_alive(replica_host)) {
throw std::runtime_error(::format("Cannot perform TRUNCATE on table {}.{} because host {} is down", ks_name, cf_name, replica_host));
throw std::runtime_error(::format("Cannot perform {} because host {} is down", desc(), replica_host));
}
}
// Send the RPC to all replicas
const service::frozen_topology_guard frozen_guard { _topo_sm._topology.session };
co_await coroutine::parallel_for_each(replica_hosts, [&] (const locator::host_id& host_id) -> future<> {
co_await ser::storage_proxy_rpc_verbs::send_truncate_with_tablets(&_messaging, host_id, ks_name, cf_name, frozen_guard);
co_await send_rpc(host_id, frozen_guard);
});
} else {
error = ::format("Cannot TRUNCATE table with UUID {} because it does not exist.", table_id);
}
// Clear the session and save the error message
@@ -2296,15 +2309,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
}
try {
co_await update_topology_state(std::move(guard), std::move(updates), "Clear truncate session");
co_await update_topology_state(std::move(guard), std::move(updates), fmt::format("Clear {} session", what));
break;
} catch (group0_concurrent_modification&) {
}
}
}
utils::get_local_injector().inject("truncate_crash_after_session_clear", [] {
rtlogger.info("truncate_crash_after_session_clear hit, killing the node");
utils::get_local_injector().inject(fmt::format("{}_crash_after_session_clear", what), [what] {
rtlogger.info("{}_crash_after_session_clear hit, killing the node", what);
_exit(1);
});
@@ -2330,13 +2343,76 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
.build());
try {
co_await update_topology_state(std::move(guard), std::move(updates), "Truncate has completed");
co_await update_topology_state(std::move(guard), std::move(updates), fmt::format("{}{} has completed", ::toupper(what[0]), what.substr(1)));
break;
} catch (group0_concurrent_modification&) {
}
}
}
future<> handle_truncate_table(group0_guard guard) {
std::string ks_name, cf_name;
co_await handle_topology_ordered_op(std::move(guard)
, [&](const db::system_keyspace::topology_requests_entry& topology_requests_entry) {
const table_id& id = topology_requests_entry.truncate_table_id;
lw_shared_ptr<replica::table> table = _db.get_tables_metadata().get_table_if_exists(id);
if (table) {
ks_name = table->schema()->ks_name();
cf_name = table->schema()->cf_name();
rtlogger.info("Performing TRUNCATE TABLE for {}.{}", ks_name, cf_name);
return std::unordered_set<table_id>({ id });
}
throw std::invalid_argument(fmt::format("Cannot TRUNCATE table with UUID {} because it does not exist.", id));
}
, [&](locator::host_id host_id, const service::frozen_topology_guard& frozen_guard) {
return ser::storage_proxy_rpc_verbs::send_truncate_with_tablets(&_messaging, host_id, ks_name, cf_name, frozen_guard);
}
, [&] {
return fmt::format("TRUNCATE on table {}.{}", ks_name, cf_name);
}
, "truncate"
);
}
future<> handle_snapshot_tables(group0_guard guard) {
utils::chunked_vector<table_id> ids;
sstring tag;
bool skip_flush;
gc_clock::time_point t;
std::optional<gc_clock::time_point> expiry;
co_await handle_topology_ordered_op(std::move(guard)
, [&](const db::system_keyspace::topology_requests_entry& topology_requests_entry) {
tag = *topology_requests_entry.snapshot_tag;
skip_flush = topology_requests_entry.snapshot_skip_flush;
t = gc_clock::from_time_t(db_clock::to_time_t(topology_requests_entry.start_time));
if (topology_requests_entry.snapshot_expiry) {
expiry = gc_clock::from_time_t(db_clock::to_time_t(*topology_requests_entry.snapshot_expiry));
}
for (auto& id : *topology_requests_entry.snapshot_table_ids) {
lw_shared_ptr<replica::table> table = _db.get_tables_metadata().get_table_if_exists(id);
if (!table) {
throw std::invalid_argument(fmt::format("Cannot SNAPSHOT table with UUID {} because it does not exist.", id));
}
ids.emplace_back(id);
}
rtlogger.info("Performing SNAPSHOT TABLES for {}", ids);
return *topology_requests_entry.snapshot_table_ids;
}
, [&](locator::host_id host_id, const service::frozen_topology_guard& frozen_guard) {
return ser::storage_proxy_rpc_verbs::send_snapshot_with_tablets(&_messaging, host_id, ids, tag, t, skip_flush, expiry, frozen_guard);
}
, [&] {
return fmt::format("SNAPSHOT on tables {}", ids);
}
, "snapshot"
);
}
// This function must not release and reacquire the guard, callers rely
// on the fact that the block which calls this is atomic.
// FIXME: Don't take the ownership of the guard to make the above guarantee explicit.
@@ -3213,6 +3289,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
case topology::transition_state::truncate_table:
co_await handle_truncate_table(std::move(guard));
break;
case topology::transition_state::snapshot_tables:
co_await handle_snapshot_tables(std::move(guard));
break;
}
co_return true;
};

View File

@@ -361,6 +361,16 @@ topology_request_tracking_mutation_builder& topology_request_tracking_mutation_b
return *this;
}
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set_snapshot_tables_data(const std::unordered_set<table_id>& table_ids, const sstring& tag, bool skip_flush) {
auto uuids = table_ids | std::views::transform(std::mem_fn(&table_id::uuid));
apply_atomic("snapshot_table_ids",
make_set_value(schema().get_column_definition("snapshot_table_ids")->type,
set_type_impl::native_type(uuids.begin(), uuids.end())));
apply_atomic("snapshot_tag", tag);
apply_atomic("snapshot_skip_flush", skip_flush);
return *this;
}
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set_new_keyspace_rf_change_data(
const sstring& ks_name, const std::map<sstring, sstring>& rf_per_dc) {
apply_atomic("new_keyspace_rf_change_ks_name", ks_name);

View File

@@ -158,6 +158,7 @@ public:
topology_request_tracking_mutation_builder& done(std::optional<sstring> error = std::nullopt);
topology_request_tracking_mutation_builder& set_truncate_table_data(const table_id& table_id);
topology_request_tracking_mutation_builder& set_new_keyspace_rf_change_data(const sstring& ks_name, const std::map<sstring, sstring>& rf_per_dc);
topology_request_tracking_mutation_builder& set_snapshot_tables_data(const std::unordered_set<table_id>&, const sstring& tag, bool);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};

View File

@@ -145,6 +145,7 @@ static std::unordered_map<topology::transition_state, sstring> transition_state_
{topology::transition_state::left_token_ring, "left token ring"},
{topology::transition_state::rollback_to_normal, "rollback to normal"},
{topology::transition_state::truncate_table, "truncate table"},
{topology::transition_state::snapshot_tables, "snapshot tables"},
{topology::transition_state::lock, "lock"},
};
@@ -207,6 +208,7 @@ static std::unordered_map<global_topology_request, sstring> global_topology_requ
{global_topology_request::cleanup, "cleanup"},
{global_topology_request::keyspace_rf_change, "keyspace_rf_change"},
{global_topology_request::truncate_table, "truncate_table"},
{global_topology_request::snapshot_tables, "snapshot_tables"},
{global_topology_request::noop_request, "noop_request"},
};

View File

@@ -87,6 +87,7 @@ enum class global_topology_request: uint16_t {
// Used to synchronize API calls with topology coordinator.
// Ensures that all later requests and tablet scheduler will see prior updates to group0.
noop_request,
snapshot_tables,
};
struct ring_slice {
@@ -131,6 +132,7 @@ struct topology {
rollback_to_normal,
truncate_table,
lock,
snapshot_tables,
};
std::optional<transition_state> tstate;

View File

@@ -58,6 +58,7 @@ struct sstable_snapshot_metadata {
uint64_t index_size;
int64_t first_token;
int64_t last_token;
std::optional<size_t> tablet_id;
};
class storage_manager : public peering_sharded_service<storage_manager> {

View File

@@ -979,7 +979,7 @@ SEASTAR_TEST_CASE(clear_nonexistent_snapshot) {
SEASTAR_TEST_CASE(test_snapshot_ctl_details) {
return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
sharded<db::snapshot_ctl> sc;
sc.start(std::ref(e.db()), std::ref(e.get_task_manager()), std::ref(e.get_sstorage_manager()), db::snapshot_ctl::config{}).get();
sc.start(std::ref(e.db()), std::ref(e.get_storage_proxy()), std::ref(e.get_task_manager()), std::ref(e.get_sstorage_manager()), db::snapshot_ctl::config{}).get();
auto stop_sc = deferred_stop(sc);
auto& cf = e.local_db().find_column_family("ks", "cf");
@@ -1027,7 +1027,7 @@ SEASTAR_TEST_CASE(test_snapshot_ctl_details) {
SEASTAR_TEST_CASE(test_snapshot_ctl_true_snapshots_size) {
return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
sharded<db::snapshot_ctl> sc;
sc.start(std::ref(e.db()), std::ref(e.get_task_manager()), std::ref(e.get_sstorage_manager()), db::snapshot_ctl::config{}).get();
sc.start(std::ref(e.db()), std::ref(e.get_storage_proxy()), std::ref(e.get_task_manager()), std::ref(e.get_sstorage_manager()), db::snapshot_ctl::config{}).get();
auto stop_sc = deferred_stop(sc);
auto& cf = e.local_db().find_column_family("ks", "cf");

View File

@@ -0,0 +1,97 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import itertools
import logging
import os
import json
from test.cqlpy import nodetool
from test.pylib.manager_client import ManagerClient
from test.cluster.object_store.test_backup import create_cluster, topo
from test.cluster.util import new_test_keyspace, new_test_table, unique_name
from cassandra import ConsistencyLevel
import pytest
logger = logging.getLogger(__name__)
async def get_snapshot_path(manager:ManagerClient, server, keyspace:str, table:str, snapshot_name:str):
"""Gets snapshot path files for server and snapshot"""
workdir = await manager.server_get_workdir(server.server_id)
data_path = os.path.join(workdir, 'data', keyspace)
cf_dirs = os.listdir(data_path)
# Assumes that there is only one column family directory for table under the keyspace.
for cf_dir in cf_dirs:
if cf_dir.startswith(table):
snapshot_path = os.path.join(data_path, cf_dir, 'snapshots', snapshot_name)
return snapshot_path
raise RuntimeError(f"No column family directories found in {data_path} for {table}")
async def get_snapshot_files(manager:ManagerClient, server, keyspace:str, table:str, snapshot_name:str):
"""Gets TOC files from server"""
snapshot_path = await get_snapshot_path(manager, server, keyspace, table, snapshot_name)
return [
f.name for f in os.scandir(snapshot_path)
if f.is_file() and f.name.endswith('TOC.txt')
]
async def get_snapshot_manifest(manager:ManagerClient, server, keyspace:str, table:str, snapshot_name:str):
"""Gets TOC files from server"""
snapshot_path = await get_snapshot_path(manager, server, keyspace, table, snapshot_name)
with open(os.path.join(snapshot_path, 'manifest.json'), encoding='utf-8') as f:
return json.load(f)
async def prepare_write_workload(cql, table_name, flush=True, n: int = None):
"""write some data"""
keys = list(range(n if n else 100))
c1_values = ['value1']
c2_values = ['value2']
statement = cql.prepare(f"INSERT INTO {table_name} (key, c1, c2) VALUES (?, ?, ?)")
statement.consistency_level = ConsistencyLevel.ALL
await asyncio.gather(*[cql.run_async(statement, params) for params in
list(map(lambda x, y, z: [x, y, z], keys,
itertools.cycle(c1_values),
itertools.cycle(c2_values)))]
)
if flush:
nodetool.flush(cql, table_name)
@pytest.mark.asyncio
async def test_snapshot_on_all_nodes(manager: ManagerClient):
"""
Tests that a topology operation snapshot is done on all nodes,
not just the initiator.
"""
topology = topo(rf = 3, nodes = 3, racks = 3, dcs = 1)
servers, _ = await create_cluster(topology, True, manager, logger)
snapshot_name = unique_name('snap_')
async with new_test_keyspace(manager, f"WITH REPLICATION = {{ 'replication_factor' : {topology.rf} }} AND tablets = {{'initial': 20 }}") as ks:
async with new_test_table(manager, ks, "key int, c1 text, c2 text, PRIMARY KEY (key)", "") as tbl:
cf = tbl.split('.')[1]
await prepare_write_workload(manager.get_cql(), tbl, flush=False)
await manager.api.take_cluster_snapshot(servers[0].ip_addr, ks, tag=snapshot_name, tables=[cf])
try:
# Collect snapshot files from each server
for s in servers:
files = await get_snapshot_files(manager, s, ks, cf, snapshot_name)
assert len(files) > 0
manifest = await get_snapshot_manifest(manager, s, ks, cf, snapshot_name)
assert len(manifest['tablets'])
tablets = { t['id']: t for t in manifest['tablets'] }
for sst in manifest['sstables']:
assert sst['tablet_id'] is not None
assert tablets[sst['tablet_id']]
finally:
#todo: clear snapshot
pass

View File

@@ -391,11 +391,20 @@ class ScyllaRESTAPIClient:
params['scope'] = scope
return await self.client.post_json(f"/storage_service/restore", host=node_ip, params=params, json=sstables)
async def take_snapshot(self, node_ip: str, ks: str, tag: str) -> None:
async def take_snapshot(self, node_ip: str, ks: str, tag: str, tables: list[str] = None) -> None:
"""Take keyspace snapshot"""
params = { 'kn': ks, 'tag': tag }
if tables:
params['cf'] = ','.join(tables)
await self.client.post(f"/storage_service/snapshots", host=node_ip, params=params)
async def take_cluster_snapshot(self, node_ip: str, ks: str, tag: str, tables: list[str] = None) -> None:
"""Take keyspace snapshot"""
params = { 'keyspace': ks, 'tag': tag }
if tables:
params['table'] = ','.join(tables)
await self.client.post(f"/storage_service/tablets/snapshots", host=node_ip, params=params)
async def cleanup_keyspace(self, node_ip: str, ks: str) -> None:
"""Cleanup keyspace"""
await self.client.post(f"/storage_service/keyspace_cleanup/{ks}", host=node_ip)

View File

@@ -2392,6 +2392,49 @@ void snapshot_operation(scylla_rest_client& client, const bpo::variables_map& vm
fmt::print(std::cout, "Snapshot directory: {}\n", params["tag"]);
}
void cluster_snapshot_operation(scylla_rest_client& client, const bpo::variables_map& vm) {
std::unordered_map<sstring, sstring> params;
sstring kn_msg;
std::vector<sstring> kt_list;
if (vm.contains("keyspaces")) {
kt_list = vm["keyspaces"].as<std::vector<sstring>>();
}
if (kt_list.size() != 1 && vm.contains("table")) {
throw std::invalid_argument("when specifying the table for the snapshot, you must specify one and only one keyspace");
}
if (kt_list.empty()) {
kn_msg = "all keyspaces";
} else {
params["keyspace"] = fmt::to_string(fmt::join(kt_list.begin(), kt_list.end(), ","));
if (vm.contains("table")) {
params["table"] = vm["table"].as<sstring>();
}
}
if (vm.contains("tag")) {
params["tag"] = vm["tag"].as<sstring>();
} else {
params["tag"] = fmt::to_string(db_clock::now().time_since_epoch().count());
}
client.post("/storage_service/tablets/snapshots", params);
if (kn_msg.empty()) {
kn_msg = params["keyspace"];
}
fmt::print(std::cout, "Requested cluster snapshot(s) for [{}] with snapshot name [{}] and options {{skipFlush={}}}\n",
kn_msg,
params["tag"],
params["skip_flush"]
);
fmt::print(std::cout, "Snapshot directory: {}\n", params["tag"]);
}
void sstableinfo_operation(scylla_rest_client& client, const bpo::variables_map& vm) {
std::vector<keyspace_and_table> requests;
if (vm.contains("table")) {
@@ -3888,6 +3931,20 @@ For more information, see: {}
{
},
},
{
"snapshot",
"Take a cluster-wide snapshot of specified keyspaces or a snapshot of the specified table(s)",
fmt::format(R"(
For more information, see: {}
)", doc_link("operating-scylla/nodetool-commands/snapshot.html")),
{
typed_option<sstring>("table", "The table(s) to snapshot, multiple ones can be joined with ','"),
typed_option<sstring>("tag,t", "The name of the snapshot"),
},
{
typed_option<std::vector<sstring>>("keyspaces", "The keyspaces to snapshot", -1),
},
},
}
},
{
@@ -3897,6 +3954,9 @@ For more information, see: {}
},
{
"cleanup", { cluster_cleanup_operation }
},
{
"snapshot", { cluster_snapshot_operation }
}
}
}