Merge 'truncate: make TRUNCATE TABLE safe with tablets' from Ferenc Szili

Currently truncating a table works by issuing an RPC to all the nodes which call `database::truncate_table_on_all_shards()`, which makes sure that older writes are dropped.

It works with tablets, but is not safe. A concurrent replication process may bring back old data.

This change makes TRUNCATE TABLE a topology operation, so that it is mutually exclusive with other processes in the system which could interfere with it. More specifically, it makes TRUNCATE a global topology request.

Backporting is not needed.

Fixes #16411

Closes scylladb/scylladb#19789

* github.com:scylladb/scylladb:
  docs: docs: topology-over-raft: Document truncate_table request
  storage_proxy: fix indentation and remove empty catch/rethrow
  test: add tests for truncate with tablets
  storage_proxy: use new TRUNCATE for tablets
  truncate: make TRUNCATE a global topology operation
  storage_service: move logic of wait_for_topology_request_completion()
  RPC: add truncate_with_tablets RPC with frozen_topology_guard
  feature_service: added cluster feature for system.topology schema change
  system.topology_requests: change schema
  storage_proxy: propagate group0 client and TSM dependency
This commit is contained in:
Tomasz Grabiec
2024-12-10 17:50:50 +01:00
18 changed files with 514 additions and 38 deletions

View File

@@ -283,6 +283,7 @@ schema_ptr system_keyspace::topology_requests() {
.with_column("done", boolean_type)
.with_column("error", utf8_type)
.with_column("end_time", timestamp_type)
.with_column("truncate_table_id", uuid_type)
.set_comment("Topology request tracking")
.with_hash_version()
.build();
@@ -3388,6 +3389,9 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
if (row.has("end_time")) {
entry.end_time = row.get_as<db_clock::time_point>("end_time");
}
if (row.has("truncate_table_id")) {
entry.truncate_table_id = table_id(row.get_as<utils::UUID>("truncate_table_id"));
}
return entry;
}

View File

@@ -406,6 +406,7 @@ public:
sstring error;
db_clock::time_point end_time;
db_clock::time_point ts;
table_id truncate_table_id;
};
using topology_requests_entries = std::unordered_map<utils::UUID, system_keyspace::topology_requests_entry>;

View File

@@ -155,8 +155,20 @@ transition. An example of this is the `rebuilding` state which does not change
the topology but requires streaming data.
Separately from the per-node requests, there is also a 'global' request field
for operations that don't affect any specific node but the entire cluster,
such as `check_and_repair_cdc_streams`.
for operations that don't affect any specific node but the entire cluster. These
are the currently supported global topology operations:
- `new_cdc_generation` aka `check_and_repair_cdc_streams`
- `cleanup`
- `keyspace_rf_change`
- `truncate_table` Truncate for keyspaces with tablets is implemented as a topology
request in order to serialize it with other topology operations (migration, repair)
and avoid issues with data resurrection when truncate is executed during tablet
migrations. Truncate has only one implicit transition stage. When the topology
coordinator executes the truncate request, it issues truncate RPCs to nodes which
contain replicas of the table being truncated. It uses [sessions](#topology-guards)
to make sure that no stale RPCs are executed outside of the scope of the request.
# Load balancing

View File

@@ -116,6 +116,7 @@ public:
gms::feature collection_indexing { *this, "COLLECTION_INDEXING"sv };
gms::feature large_collection_detection { *this, "LARGE_COLLECTION_DETECTION"sv };
gms::feature range_tombstone_and_dead_rows_detection { *this, "RANGE_TOMBSTONE_AND_DEAD_ROWS_DETECTION"sv };
gms::feature truncate_as_topology_operation { *this, "TRUNCATE_AS_TOPOLOGY_OPERATION"sv };
gms::feature secondary_indexes_on_static_columns { *this, "SECONDARY_INDEXES_ON_STATIC_COLUMNS"sv };
gms::feature tablets { *this, "TABLETS"sv };
gms::feature uuid_sstable_identifiers { *this, "UUID_SSTABLE_IDENTIFIERS"sv };

View File

@@ -32,6 +32,7 @@ verb [[with_client_info, with_timeout]] read_data (query::read_command cmd [[ref
verb [[with_client_info, with_timeout]] read_mutation_data (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, service::fencing_token fence [[version 5.4.0]]) -> reconcilable_result [[lw_shared_ptr]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]];
verb [[with_client_info, with_timeout]] read_digest (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]]) -> query::result_digest, api::timestamp_type [[version 1.2.0]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]], std::optional<full_position> [[version 5.2.0]];
verb [[with_timeout]] truncate (sstring, sstring);
verb [[]] truncate_with_tablets (sstring ks_name, sstring cf_name, service::frozen_topology_guard frozen_guard);
verb [[with_client_info, with_timeout]] paxos_prepare (query::read_command cmd [[ref]], partition_key key [[ref]], utils::UUID ballot, bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info> trace_info [[ref]]) -> service::paxos::prepare_response [[unique_ptr]];
verb [[with_client_info, with_timeout]] paxos_accept (service::paxos::proposal proposal [[ref]], std::optional<tracing::trace_info> trace_info [[ref]]) -> bool;
verb [[with_client_info, with_timeout, one_way]] paxos_learn (service::paxos::proposal decision [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[ref]], host_id_vector_replica_set forward_id [[ref, version 6.3.0]], locator::host_id reply_to_id [[version 6.3.0]]);

View File

@@ -1811,7 +1811,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
static seastar::sharded<memory_threshold_guard> mtg;
mtg.start(cfg->large_memory_allocation_warning_threshold()).get();
supervisor::notify("initializing storage proxy RPC verbs");
proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(messaging), std::ref(gossiper), std::ref(mm), std::ref(sys_ks)).get();
proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(messaging), std::ref(gossiper), std::ref(mm), std::ref(sys_ks), std::ref(group0_client), std::ref(tsm)).get();
auto stop_proxy_handlers = defer_verbose_shutdown("storage proxy RPC verbs", [&proxy] {
proxy.invoke_on_all(&service::storage_proxy::stop_remote).get();
});

View File

@@ -45,6 +45,7 @@
#include "serializer.hh"
#include "db/per_partition_rate_limit_info.hh"
#include "service/topology_state_machine.hh"
#include "service/topology_guard.hh"
#include "service/raft/join_node.hh"
#include "idl/consistency_level.dist.hh"
#include "idl/tracing.dist.hh"
@@ -643,6 +644,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::READ_DIGEST:
case messaging_verb::DEFINITIONS_UPDATE:
case messaging_verb::TRUNCATE:
case messaging_verb::TRUNCATE_WITH_TABLETS:
case messaging_verb::MIGRATION_REQUEST:
case messaging_verb::SCHEMA_CHECK:
case messaging_verb::COUNTER_MUTATION:

View File

@@ -203,7 +203,8 @@ enum class messaging_verb : int32_t {
JOIN_NODE_QUERY = 73,
TASKS_GET_CHILDREN = 74,
TABLET_REPAIR = 75,
LAST = 76,
TRUNCATE_WITH_TABLETS = 76,
LAST = 77,
};
} // namespace netw

View File

@@ -59,6 +59,7 @@
#include "service/migration_manager.hh"
#include "service/client_state.hh"
#include "service/paxos/proposal.hh"
#include "service/topology_mutation.hh"
#include "locator/token_metadata.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/parallel_for_each.hh>
@@ -238,6 +239,10 @@ class storage_proxy::remote {
const gms::gossiper& _gossiper;
migration_manager& _mm;
sharded<db::system_keyspace>& _sys_ks;
raft_group0_client& _group0_client;
topology_state_machine& _topology_state_machine;
abort_source _group0_as;
future<> _truncate_table_fiber = make_ready_future<>();
netw::connection_drop_slot_t _connection_dropped;
netw::connection_drop_registration_t _condrop_registration;
@@ -245,8 +250,9 @@ class storage_proxy::remote {
bool _stopped{false};
public:
remote(storage_proxy& sp, netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded<db::system_keyspace>& sys_ks)
: _sp(sp), _ms(ms), _gossiper(g), _mm(mm), _sys_ks(sys_ks)
remote(storage_proxy& sp, netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded<db::system_keyspace>& sys_ks,
raft_group0_client& group0_client, topology_state_machine& tsm)
: _sp(sp), _ms(ms), _gossiper(g), _mm(mm), _sys_ks(sys_ks), _group0_client(group0_client), _topology_state_machine(tsm)
, _connection_dropped(std::bind_front(&remote::connection_dropped, this))
, _condrop_registration(_ms.when_connection_drops(_connection_dropped))
{
@@ -260,6 +266,7 @@ public:
ser::storage_proxy_rpc_verbs::register_read_mutation_data(&_ms, std::bind_front(&remote::handle_read_mutation_data, this));
ser::storage_proxy_rpc_verbs::register_read_digest(&_ms, std::bind_front(&remote::handle_read_digest, this));
ser::storage_proxy_rpc_verbs::register_truncate(&_ms, std::bind_front(&remote::handle_truncate, this));
ser::storage_proxy_rpc_verbs::register_truncate_with_tablets(&_ms, std::bind_front(&remote::handle_truncate_with_tablets, this));
// Register PAXOS verb handlers
ser::storage_proxy_rpc_verbs::register_paxos_prepare(&_ms, std::bind_front(&remote::handle_paxos_prepare, this));
ser::storage_proxy_rpc_verbs::register_paxos_accept(&_ms, std::bind_front(&remote::handle_paxos_accept, this));
@@ -272,6 +279,8 @@ public:
// Must call before destroying the `remote` object.
//
// Shuts down in a fixed order: abort group0 work, drain the background truncate
// fiber, then unregister the RPC verbs.
future<> stop() {
// `_group0_as` is the abort source handed to the group0 client calls made by
// request_truncate_with_tablets(), so this aborts those calls.
_group0_as.request_abort();
// Wait for the fiber started by truncate_with_tablets() (a ready future if none was started).
co_await std::move(_truncate_table_fiber);
co_await ser::storage_proxy_rpc_verbs::unregister(&_ms);
_stopped = true;
}
@@ -445,6 +454,25 @@ public:
return ser::storage_proxy_rpc_verbs::send_paxos_prune(&_ms, addr, timeout, schema_id, key, ballot, tracing::make_trace_info(tr_state));
}
// Runs TRUNCATE for a table through request_truncate_with_tablets() and waits for the
// outcome for at most `timeout_in_ms`.
//
// Only one such request can be in flight per `remote` instance: while the fiber started by
// an earlier call has not resolved, a new call fails with std::runtime_error.
// A timeout only ends the wait; the fiber itself keeps running and is awaited in stop().
// Failures of the request are rethrown to the caller through the broken semaphore.
future<> truncate_with_tablets(sstring ks_name, sstring cf_name, std::chrono::milliseconds timeout_in_ms) {
    const bool previous_truncate_running = !_truncate_table_fiber.available();
    if (previous_truncate_running) {
        throw std::runtime_error("Another TRUNCATE TABLE is ongoing, please retry.");
    }

    // A semaphore with zero units serves as a one-shot completion signal that can be
    // waited on with a timeout; it is shared with the fiber so it outlives this call.
    auto completion = make_lw_shared<seastar::semaphore>(0);
    auto publish_outcome = [completion] (auto f) {
        if (!f.failed()) {
            completion->signal(1);
            return;
        }
        completion->broken(f.get_exception());
    };
    _truncate_table_fiber = request_truncate_with_tablets(ks_name, cf_name).then_wrapped(std::move(publish_outcome));

    try {
        co_await completion->wait(timeout_in_ms, 1);
    } catch (seastar::semaphore_timed_out&) {
        throw std::runtime_error(format("Timeout during TRUNCATE TABLE of {}.{}", ks_name, cf_name));
    }
}
future<> send_truncate_blocking(sstring keyspace, sstring cfname, std::chrono::milliseconds timeout_in_ms) {
if (!_gossiper.get_unreachable_token_owners().empty()) {
slogger.info("Cannot perform truncate, some hosts are down");
@@ -467,12 +495,10 @@ public:
co_await coroutine::parallel_for_each(all_endpoints, [&] (auto ep) {
return send_truncate(ep, timeout, keyspace, cfname);
});
} catch (rpc::timeout_error& e) {
slogger.trace("Truncation of {} timed out: {}", cfname, e.what());
throw;
} catch (...) {
throw;
}
} catch (rpc::timeout_error& e) {
slogger.trace("Truncation of {} timed out: {}", cfname, e.what());
throw;
}
}
private:
@@ -922,7 +948,12 @@ private:
}
future<> handle_truncate(rpc::opt_time_point timeout, sstring ksname, sstring cfname) {
return replica::database::truncate_table_on_all_shards(_sp._db, _sys_ks, ksname, cfname);
co_await replica::database::truncate_table_on_all_shards(_sp._db, _sys_ks, ksname, cfname);
}
// Handler for the truncate_with_tablets RPC verb: truncates `ksname`.`cfname` on all
// shards of this node.
future<> handle_truncate_with_tablets(sstring ksname, sstring cfname, service::frozen_topology_guard frozen_guard) {
    // Constructed before the truncate starts and kept alive until the coroutine finishes.
    // NOTE(review): presumably this fails the RPC when the sender's session is no longer
    // valid on this node - confirm against topology_guard.
    topology_guard session_guard(frozen_guard);

    auto& db = _sp._db;
    co_await replica::database::truncate_table_on_all_shards(db, _sys_ks, ksname, cfname);
}
future<foreign_ptr<std::unique_ptr<service::paxos::prepare_response>>>
@@ -1035,6 +1066,58 @@ private:
cf->drop_hit_rate(*id);
}
}
// Submits a `truncate_table` global topology request for `ks_name`.`cf_name` and waits
// until the request is marked done.
//
// On shards other than 0 the call is forwarded to shard 0. On shard 0 it:
//  1. starts a group0 operation and refuses to proceed (invalid_request_exception) if a
//     global topology request is already set;
//  2. writes two mutations in one group0 command: the global request (with its id and a
//     session derived from the same id) and a request-tracking row carrying the table id;
//  3. retries from step 1 when the write loses a race (group0_concurrent_modification);
//  4. waits for the request to complete and throws std::runtime_error if it finished with
//     a non-empty error string.
future<> request_truncate_with_tablets(sstring ks_name, sstring cf_name) {
if (this_shard_id() != 0) {
// group0 is only set on shard 0
co_return co_await _sp.container().invoke_on(0, [&] (storage_proxy& sp) {
return sp.remote().request_truncate_with_tablets(ks_name, cf_name);
});
}
// Create the global topology request
utils::UUID global_request_id;
while (true) {
group0_guard guard = co_await _group0_client.start_operation(_group0_as, raft_timeout{});
if (_topology_state_machine._topology.global_request.has_value()) {
throw exceptions::invalid_request_exception("Another global topology request is ongoing, please retry.");
}
// The new group0 state id doubles as the request id and, below, as the session id.
global_request_id = guard.new_group0_state_id();
// Resolved on every attempt, while the guard is held.
// NOTE(review): presumably throws if the table does not exist - confirm against find_uuid().
const table_id table_id = _sp.local_db().find_uuid(ks_name, cf_name);
std::vector<canonical_mutation> updates;
updates.emplace_back(topology_mutation_builder(guard.write_timestamp())
.set_global_topology_request(global_topology_request::truncate_table)
.set_global_topology_request_id(global_request_id)
.set_session(session_id(global_request_id))
.build());
updates.emplace_back(topology_request_tracking_mutation_builder(global_request_id)
.set_truncate_table_data(table_id)
.set("done", false)
.set("start_time", db_clock::now())
.build());
topology_change change{std::move(updates)};
sstring reason = "Truncating table";
group0_command g0_cmd = _group0_client.prepare_command(std::move(change), guard, reason);
try {
co_await _group0_client.add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
break;
} catch (group0_concurrent_modification&) {
slogger.debug("request_truncate_with_tablets: concurrent modification, retrying");
}
}
// Wait for the topology request to complete
// NOTE(review): unlike the group0 calls above, this wait is not given `_group0_as`;
// confirm that stop() cannot be held up here.
sstring error = co_await _topology_state_machine.wait_for_request_completion(_sys_ks.local(), global_request_id, true);
if (!error.empty()) {
throw std::runtime_error(error);
}
}
};
using namespace exceptions;
@@ -6603,15 +6686,19 @@ db::hints::manager& storage_proxy::hints_manager_for(db::write_type type) {
future<> storage_proxy::truncate_blocking(sstring keyspace, sstring cfname, std::chrono::milliseconds timeout_in_ms) {
slogger.debug("Starting a blocking truncate operation on keyspace {}, CF {}", keyspace, cfname);
if (local_db().find_keyspace(keyspace).get_replication_strategy().get_type() == locator::replication_strategy_type::local) {
return replica::database::truncate_table_on_all_shards(_db, remote().system_keyspace().container(), keyspace, cfname);
const replica::keyspace& ks = local_db().find_keyspace(keyspace);
if (ks.get_replication_strategy().get_type() == locator::replication_strategy_type::local) {
co_await replica::database::truncate_table_on_all_shards(_db, remote().system_keyspace().container(), keyspace, cfname);
} else if (ks.uses_tablets() && features().truncate_as_topology_operation) {
co_await remote().truncate_with_tablets(std::move(keyspace), std::move(cfname), timeout_in_ms);
} else {
co_await remote().send_truncate_blocking(std::move(keyspace), std::move(cfname), timeout_in_ms);
}
return remote().send_truncate_blocking(std::move(keyspace), std::move(cfname), timeout_in_ms);
}
void storage_proxy::start_remote(netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded<db::system_keyspace>& sys_ks) {
_remote = std::make_unique<struct remote>(*this, ms, g, mm, sys_ks);
void storage_proxy::start_remote(netw::messaging_service& ms, gms::gossiper& g, migration_manager& mm, sharded<db::system_keyspace>& sys_ks,
raft_group0_client& group0_client, topology_state_machine& tsm) {
_remote = std::make_unique<struct remote>(*this, ms, g, mm, sys_ks, group0_client, tsm);
}
future<> storage_proxy::stop_remote() {

View File

@@ -38,6 +38,7 @@
#include "replica/exceptions.hh"
#include "locator/host_id.hh"
#include "dht/token_range_endpoints.hh"
#include "service/storage_service.hh"
class reconcilable_result;
class frozen_mutation_and_schema;
@@ -475,7 +476,8 @@ private:
public:
storage_proxy(distributed<replica::database>& db, config cfg, db::view::node_update_backlog& max_view_update_backlog,
scheduling_group_key stats_key, gms::feature_service& feat, const locator::shared_token_metadata& stm, locator::effective_replication_map_factory& erm_factory);
scheduling_group_key stats_key, gms::feature_service& feat, const locator::shared_token_metadata& stm,
locator::effective_replication_map_factory& erm_factory);
~storage_proxy();
const distributed<replica::database>& get_db() const {
@@ -510,7 +512,7 @@ public:
}
// Start/stop the remote part of `storage_proxy` that is required for performing distributed queries.
void start_remote(netw::messaging_service&, gms::gossiper&, migration_manager&, sharded<db::system_keyspace>& sys_ks);
void start_remote(netw::messaging_service&, gms::gossiper&, migration_manager&, sharded<db::system_keyspace>& sys_ks, raft_group0_client&, topology_state_machine&);
future<> stop_remote();
gms::inet_address my_address() const noexcept;

View File

@@ -4689,22 +4689,7 @@ future<> storage_service::do_cluster_cleanup() {
}
future<sstring> storage_service::wait_for_topology_request_completion(utils::UUID id, bool require_entry) {
rtlogger.debug("Start waiting for topology request completion (request id {})", id);
while (true) {
auto c = _topology_state_machine.reload_count;
auto [done, error] = co_await _sys_ks.local().get_topology_request_state(id, require_entry);
if (done) {
rtlogger.debug("Request with id {} is completed with status: {}", id, error.empty() ? sstring("success") : error);
co_return error;
}
if (c == _topology_state_machine.reload_count) {
// wait only if the state was not reloaded while we were preempted
rtlogger.debug("Waiting for a topology event while waiting for topology request completion (request id {})", id);
co_await _topology_state_machine.event.when();
}
}
co_return sstring();
co_return co_await _topology_state_machine.wait_for_request_completion(_sys_ks.local(), id, require_entry);
}
future<> storage_service::wait_for_topology_not_busy() {
@@ -6055,6 +6040,10 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
throw std::runtime_error(fmt::format("Cannot stream within the same node using regular migration, tablet: {}, shard {} -> {}",
tablet, leaving_replica->shard, trinfo->pending_replica->shard));
}
co_await utils::get_local_injector().inject("migration_streaming_wait", [] (auto& handler) {
rtlogger.info("migration_streaming_wait: start");
return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
});
auto& table = _db.local().find_column_family(tablet.table);
std::vector<sstring> tables = {table.schema()->cf_name()};
auto my_id = tm->get_my_id();

View File

@@ -54,6 +54,9 @@
#include "idl/join_node.dist.hh"
#include "idl/storage_service.dist.hh"
#include "replica/exceptions.hh"
#include "service/paxos/prepare_response.hh"
#include "idl/storage_proxy.dist.hh"
#include "service/topology_coordinator.hh"
@@ -920,6 +923,131 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
}
break;
case global_topology_request::truncate_table: {
    // Handles the `truncate_table` global topology request:
    //  1. barrier, so that every node sees the session;
    //  2. if the session is still set: send the truncate RPC to all hosts with replicas,
    //     then clear the session (recording an error if the table no longer exists);
    //  3. barrier, so that no truncate RPC can run after this point;
    //  4. remove the global request and mark the tracking entry as done.
    //
    // Values that are used across suspension points are held by value rather than by
    // reference into state (topology, schema, token metadata) that can be replaced
    // while this coroutine is suspended, in particular once the guard is released.

    // Execute a barrier to make sure the nodes we are performing truncate on see the session
    // and are able to create a topology_guard using the frozen_guard we are sending over RPC
    // TODO: Exclude nodes which don't contain replicas of the table we are truncating
    guard = co_await global_tablet_token_metadata_barrier(std::move(guard));

    // Copied: a reference into _topo_sm._topology would not be stable across the co_awaits below.
    const utils::UUID global_request_id = _topo_sm._topology.global_request_id.value();
    std::optional<sstring> error;
    // We should perform TRUNCATE only if the session is still valid. It could be cleared if a previous truncate
    // handler performed the truncate and cleared the session, but crashed before finalizing the request
    if (_topo_sm._topology.session) {
        const auto topology_requests_entry = co_await _sys_ks.get_topology_request_entry(global_request_id, true);
        const table_id& table_id = topology_requests_entry.truncate_table_id;
        lw_shared_ptr<replica::table> table = _db.get_tables_metadata().get_table_if_exists(table_id);

        if (table) {
            // Copied: the names are used after the guard is released, when the table's
            // current schema object may be replaced by a concurrent schema change.
            const sstring ks_name = table->schema()->ks_name();
            const sstring cf_name = table->schema()->cf_name();

            rtlogger.info("Performing TRUNCATE TABLE global topology request for {}.{}", ks_name, cf_name);

            // Collect the IDs of the hosts with replicas, but ignore excluded nodes
            std::unordered_set<locator::host_id> replica_hosts;
            const std::unordered_set<raft::server_id> excluded_nodes = _topo_sm._topology.get_excluded_nodes();
            // Keep the token metadata alive for as long as its tablet map is being iterated.
            const auto tmptr = get_token_metadata_ptr();
            const locator::tablet_map& tmap = tmptr->tablets().get_tablet_map(table_id);
            co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) {
                for (const locator::tablet_replica& replica: tinfo.replicas) {
                    if (!excluded_nodes.contains(raft::server_id(replica.host.uuid()))) {
                        replica_hosts.insert(replica.host);
                    }
                }
                return make_ready_future<>();
            });

            // Release the guard to avoid blocking group0 for long periods of time while invoking RPCs
            release_guard(std::move(guard));

            co_await utils::get_local_injector().inject("truncate_table_wait", [] (auto& handler) {
                rtlogger.info("truncate_table_wait: start");
                return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
            });

            // Check if all the nodes with replicas are alive
            for (const locator::host_id& replica_host: replica_hosts) {
                if (!_gossiper.is_alive(replica_host)) {
                    throw std::runtime_error(::format("Cannot perform TRUNCATE on table {}.{} because host {} is down", ks_name, cf_name, replica_host));
                }
            }

            // Send the RPC to all replicas
            const service::frozen_topology_guard frozen_guard { _topo_sm._topology.session };
            co_await coroutine::parallel_for_each(replica_hosts, [&] (const locator::host_id& host_id) -> future<> {
                co_await ser::storage_proxy_rpc_verbs::send_truncate_with_tablets(&_messaging, host_id, ks_name, cf_name, frozen_guard);
            });
        } else {
            error = ::format("Table with UUID {} does not exist.", table_id);
        }

        // Clear the session and save the error message
        while (true) {
            // The guard is empty here if it was released above or consumed by a failed attempt.
            if (!guard) {
                guard = co_await start_operation();
            }

            std::vector<canonical_mutation> updates;
            updates.push_back(topology_mutation_builder(guard.write_timestamp())
                    .del_session()
                    .build());
            if (error) {
                updates.push_back(topology_request_tracking_mutation_builder(global_request_id)
                        .set("error", *error)
                        .build());
            }

            sstring reason = "Clear truncate session";
            topology_change change{std::move(updates)};
            group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
            try {
                co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
                break;
            } catch (group0_concurrent_modification&) {
                rtlogger.info("handle_global_request(): concurrent modification, retrying");
            }
        }
    }

    utils::get_local_injector().inject("truncate_crash_after_session_clear", [] {
        rtlogger.info("truncate_crash_after_session_clear hit, killing the node");
        _exit(1);
    });

    // Execute a barrier to ensure the TRUNCATE RPC can't run on any nodes after this point
    if (!guard) {
        guard = co_await start_operation();
    }
    guard = co_await global_tablet_token_metadata_barrier(std::move(guard));

    // Finalize the request
    while (true) {
        if (!guard) {
            guard = co_await start_operation();
        }
        std::vector<canonical_mutation> updates;
        updates.push_back(topology_mutation_builder(guard.write_timestamp())
                .del_global_topology_request()
                .del_global_topology_request_id()
                .build());
        updates.push_back(topology_request_tracking_mutation_builder(global_request_id)
                .set("end_time", db_clock::now())
                .set("done", true)
                .build());

        sstring reason = "Truncate has completed";
        topology_change change{std::move(updates)};
        group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
        try {
            co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
            break;
        } catch (group0_concurrent_modification&) {
            rtlogger.info("handle_global_request(): concurrent modification, retrying");
        }
    }
    break;
}
}
}

View File

@@ -313,6 +313,11 @@ topology_request_tracking_mutation_builder& topology_request_tracking_mutation_b
return set("done", true);
}
// Stores the UUID of the table targeted by a truncate request in the
// "truncate_table_id" cell of the mutation being built. Returns the builder for chaining.
topology_request_tracking_mutation_builder& topology_request_tracking_mutation_builder::set_truncate_table_data(const table_id& table_id) {
    const auto target_uuid = table_id.uuid();
    apply_atomic("truncate_table_id", target_uuid);
    return *this;
}
template class topology_mutation_builder_base<topology_mutation_builder>;
template class topology_mutation_builder_base<topology_node_mutation_builder>;
template class topology_mutation_builder_base<topology_request_tracking_mutation_builder>;

View File

@@ -151,6 +151,7 @@ public:
using builder_base::del;
topology_request_tracking_mutation_builder& set(const char* cell, topology_request value);
topology_request_tracking_mutation_builder& done(std::optional<sstring> error = std::nullopt);
topology_request_tracking_mutation_builder& set_truncate_table_data(const table_id& table_id);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};

View File

@@ -9,6 +9,7 @@
#include "topology_state_machine.hh"
#include "utils/log.hh"
#include "db/system_keyspace.hh"
#include <boost/range/adaptor/map.hpp>
@@ -212,6 +213,7 @@ static std::unordered_map<global_topology_request, sstring> global_topology_requ
{global_topology_request::new_cdc_generation, "new_cdc_generation"},
{global_topology_request::cleanup, "cleanup"},
{global_topology_request::keyspace_rf_change, "keyspace_rf_change"},
{global_topology_request::truncate_table, "truncate_table"},
};
global_topology_request global_topology_request_from_string(const sstring& s) {
@@ -260,6 +262,25 @@ future<> topology_state_machine::await_not_busy() {
}
}
// Waits until the topology request `id` is reported as done by
// sys_ks.get_topology_request_state() and returns its error string
// (empty on success). `require_entry` is passed through to that query unchanged.
future<sstring> topology_state_machine::wait_for_request_completion(db::system_keyspace& sys_ks, utils::UUID id, bool require_entry) {
    tsmlogger.debug("Start waiting for topology request completion (request id {})", id);
    for (;;) {
        // Snapshot taken before suspending on the query, to detect a reload that happens meanwhile.
        const auto reloads_before_query = reload_count;
        auto [done, error] = co_await sys_ks.get_topology_request_state(id, require_entry);
        if (done) {
            tsmlogger.debug("Request with id {} is completed with status: {}", id, error.empty() ? sstring("success") : error);
            co_return error;
        }
        if (reloads_before_query != reload_count) {
            // The state was reloaded while we were preempted: query again instead of waiting.
            continue;
        }
        tsmlogger.debug("Waiting for a topology event while waiting for topology request completion (request id {})", id);
        co_await event.when();
    }
    // Not reached: the loop above is only left through co_return.
    co_return sstring();
}
}
auto fmt::formatter<service::cleanup_status>::format(service::cleanup_status status,

View File

@@ -22,6 +22,10 @@
#include "service/session.hh"
#include "mutation/canonical_mutation.hh"
namespace db {
class system_keyspace;
}
namespace service {
enum class node_state: uint16_t {
@@ -72,6 +76,7 @@ enum class global_topology_request: uint16_t {
new_cdc_generation,
cleanup,
keyspace_rf_change,
truncate_table,
};
struct ring_slice {
@@ -231,6 +236,7 @@ struct topology_state_machine {
size_t reload_count = 0;
future<> await_not_busy();
future<sstring> wait_for_request_completion(db::system_keyspace& sys_ks, utils::UUID id, bool require_entry);
};
// Raft leader uses this command to drive bootstrap process on other nodes

View File

@@ -930,7 +930,7 @@ private:
});
if (cfg_in.need_remote_proxy) {
_proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(_ms), std::ref(_gossiper), std::ref(_mm), std::ref(_sys_ks)).get();
_proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(_ms), std::ref(_gossiper), std::ref(_mm), std::ref(_sys_ks), std::ref(group0_client), std::ref(_topology_state_machine)).get();
}
auto stop_proxy_remote = defer([this, need = cfg_in.need_remote_proxy] {
if (need) {

View File

@@ -0,0 +1,215 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
from cassandra.query import SimpleStatement, ConsistencyLevel
from cassandra.protocol import InvalidRequest
from test.pylib.manager_client import ManagerClient
from test.topology.conftest import skip_mode
from test.topology.util import get_topology_coordinator
from test.pylib.tablets import get_all_tablet_replicas
from test.pylib.util import wait_for_cql_and_get_hosts
import time
import pytest
import logging
import asyncio
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_truncate_while_migration(manager: ManagerClient):
    """TRUNCATE issued around a tablet migration must leave the table empty.

    A second node is added to a one-node cluster; once its log shows that the
    `migration_streaming_wait` injection was reached, the injection is released,
    the table is truncated, and after streaming is reported successful the table
    is expected to contain no rows.
    """
    logger.info('Bootstrapping cluster')
    node_config = {
        'enable_tablets': True,
        'error_injections_at_startup': ['migration_streaming_wait'],
    }

    servers = [await manager.server_add(config=node_config)]

    cql = manager.get_cql()

    # Keyspace with tablets (2 initial tablets), one table, 1024 rows
    await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}")
    await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);')
    inserts = [cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({pk}, {pk});') for pk in range(1024)]
    await asyncio.gather(*inserts)

    # Adding a node makes the tablet load balancer migrate one tablet to it
    servers.append(await manager.server_add(config=node_config))

    # Wait until the new node reports that streaming reached the injection, then release it
    new_node = servers[1]
    new_node_log = await manager.server_open_log(new_node.server_id)
    await new_node_log.wait_for('migration_streaming_wait: start')
    await manager.api.message_injection(new_node.ip_addr, 'migration_streaming_wait')

    # Truncate while the tablet is being streamed
    await cql.run_async('TRUNCATE TABLE test.test')

    # Let the migration finish
    await new_node_log.wait_for('raft_topology - Streaming for tablet migration of.*successful')

    # No rows may be left
    count_query = SimpleStatement('SELECT COUNT(*) FROM test.test', consistency_level=ConsistencyLevel.ALL)
    rows = await cql.run_async(count_query)
    assert rows[0].count == 0
async def get_raft_leader_and_log(manager: ManagerClient, servers):
    """Find the topology coordinator among `servers` and open its log.

    Returns a tuple ``(server, log)`` where `server` is the entry of `servers`
    whose host id equals the one returned by `get_topology_coordinator()`, and
    `log` is the result of `manager.server_open_log()` for that server.

    Raises RuntimeError if none of `servers` has the coordinator's host id.
    """
    raft_leader_host_id = await get_topology_coordinator(manager)
    for s in servers:
        if raft_leader_host_id == await manager.get_host_id(s.server_id):
            raft_leader = s
            break
    else:
        # Without this branch the code below would fail with an UnboundLocalError
        # that says nothing about the actual problem.
        raise RuntimeError(f'Topology coordinator {raft_leader_host_id} is not one of the given servers')
    raft_leader_log = await manager.server_open_log(raft_leader.server_id)
    return (raft_leader, raft_leader_log)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_truncate_with_concurrent_drop(manager: ManagerClient):
    """TRUNCATE must fail with an error when the table is dropped while it is paused.

    The topology coordinator is held at the `truncate_table_wait` injection,
    the table is dropped from another node, the injection is released, and the
    pending TRUNCATE is expected to raise InvalidRequest.
    """
    logger.info('Bootstrapping cluster')
    node_config = {
        'enable_tablets': True,
        'error_injections_at_startup': ['truncate_table_wait'],
    }

    servers = [await manager.server_add(config=node_config) for _ in range(3)]

    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    # Keyspace with tablets (2 initial tablets), one table, 1024 rows
    await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}")
    await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);')
    inserts = [cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({pk}, {pk});') for pk in range(1024)]
    await asyncio.gather(*inserts)

    (raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, servers)

    # TRUNCATE and DROP are sent to the two nodes that are not the raft leader,
    # taken in index order (first one truncates, second one drops)
    leader_pos = next((pos for pos, srv in enumerate(servers) if raft_leader == srv), None)
    assert leader_pos is not None, 'Unable to determine raft leader'
    trunc_host, drop_host = (hosts[pos] for pos in range(3) if pos != leader_pos)

    # Start a TRUNCATE in the background
    trunc_future = cql.run_async('TRUNCATE TABLE test.test', host=trunc_host)
    # Wait for the topology coordinator to reach the point where it is about to start sending the truncate RPCs
    await raft_leader_log.wait_for('truncate_table_wait: start')
    # Drop the table while TRUNCATE is held at the injection
    await cql.run_async('DROP TABLE test.test', host=drop_host)
    # Let TRUNCATE continue in the topology coordinator
    await manager.api.message_injection(raft_leader.ip_addr, 'truncate_table_wait')

    # The pending TRUNCATE must now report an error
    with pytest.raises(InvalidRequest, match='unconfigured table test'):
        await trunc_future
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_truncate_while_node_restart(manager: ManagerClient):
    """TRUNCATE must complete when a replica node is down at the start and comes back.

    A non-leader node holding a tablet replica is stopped, TRUNCATE is started
    in the background, the node is started again, and the table is then
    expected to be empty when read at consistency level ALL.
    """
    logger.info('Bootstrapping cluster')
    cfg = { 'enable_tablets': True }
    servers = []
    servers.append(await manager.server_add(config=cfg))
    servers.append(await manager.server_add(config=cfg))
    servers.append(await manager.server_add(config=cfg))

    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    # Create a keyspace with tablets and initial_tablets == 2, then insert data
    await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}")
    await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);')
    keys = range(1024)
    await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys])

    (raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, servers)

    # Decide which node to restart; select a node with a replica but not the raft leader
    tablet_replicas = await get_all_tablet_replicas(manager, raft_leader, 'test', 'test')
    # Only the first replica of each tablet is considered; its first field is
    # compared against the host IDs returned by manager.get_host_id() below.
    replica_hosts = [tr.replicas[0][0] for tr in tablet_replicas]
    restart_node = None
    for s in servers:
        if s != raft_leader:
            host_id = await manager.get_host_id(s.server_id)
            if host_id in replica_hosts:
                restart_node = s
                break
    # Fail with a clear message instead of a NameError if no candidate was found
    assert restart_node is not None, 'No non-leader node holds a tablet replica of test.test'

    # Send TRUNCATE through a node that stays up; a statement pinned to the
    # stopped node could fail for reasons unrelated to the truncate logic.
    # Assumes hosts[i] corresponds to servers[i], as the other tests in this
    # file do when they pick hosts by server index.
    trunc_host = next(h for s, h in zip(servers, hosts) if s != restart_node)

    # Shutdown the node containing a replica
    await manager.server_stop_gracefully(restart_node.server_id)
    # Start truncating in the background
    trunc_future = cql.run_async('TRUNCATE TABLE test.test', host=trunc_host)
    # Restart the node
    await manager.server_start(restart_node.server_id)
    # Wait for truncate to complete
    await trunc_future
    # Check if truncate was successful
    row = await cql.run_async(SimpleStatement('SELECT COUNT(*) FROM test.test', consistency_level=ConsistencyLevel.ALL))
    assert row[0].count == 0
@pytest.mark.xfail(reason="issue #21719")
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_truncate_with_coordinator_crash(manager: ManagerClient):
    """TRUNCATE must still finish after the raft leader is stopped mid-operation.

    The 'truncate_crash_after_session_clear' injection is enabled on the raft
    leader; once its log reports the injection was hit, the leader is stopped
    and started again, and the table is expected to end up empty.
    """
    logger.info('Bootstrapping cluster')
    cfg = {'enable_tablets': True}
    # Two nodes, added one after another
    servers = [await manager.server_add(config=cfg) for _ in range(2)]

    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    # Keyspace with tablets (initial_tablets == 2), one table, 1024 rows
    await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}")
    await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);')
    inserts = [cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in range(1024)]
    await asyncio.gather(*inserts)

    raft_leader, raft_leader_log = await get_raft_leader_and_log(manager, servers)

    # Issue TRUNCATE through the other node when servers[0] is the leader
    trunc_host = hosts[1] if raft_leader == servers[0] else hosts[0]

    # Arm the injection that crashes the raft leader after truncate cleared the session ID
    await manager.api.enable_injection(raft_leader.ip_addr, 'truncate_crash_after_session_clear', one_shot=False)
    # Kick off TRUNCATE without waiting for it
    trunc_future = cql.run_async('TRUNCATE TABLE test.test', host=trunc_host)
    # Block until the leader's log shows the injection fired, then stop the node
    await raft_leader_log.wait_for('truncate_crash_after_session_clear hit, killing the node')
    await manager.server_stop(raft_leader.server_id)
    # Bring the stopped node back
    await manager.server_start(raft_leader.server_id)
    # TRUNCATE is expected to finish once the node is back
    await trunc_future

    # No rows may remain on any replica
    count_stmt = SimpleStatement('SELECT COUNT(*) FROM test.test', consistency_level=ConsistencyLevel.ALL)
    rows = await cql.run_async(count_stmt)
    assert rows[0].count == 0