storage_proxy: wait on already running truncate for the same table
Currently, we can not have more than one global topology operation at the same time. This means that we can not have concurrent truncate operations because truncate is implemented as a global topology operation. Truncate excludes with other topology operations, and has to wait for those to complete before truncate starts executing. This can lead to truncate timeouts. In these cases the client retries the truncate operation, which will check for ongoing global topology operations, and will fail with an "Another global topology request is ongoing, please retry." error. This can be avoided by truncate checking if we have a truncate for the same table already queued. In this case, we can wait for the ongoing truncate to complete instead of immediatelly failing the operation, and provide a better user experience.
This commit is contained in:
@@ -1074,14 +1074,33 @@ private:
|
||||
while (true) {
|
||||
group0_guard guard = co_await _group0_client.start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
if (_topology_state_machine._topology.global_request.has_value()) {
|
||||
const table_id table_id = _sp.local_db().find_uuid(ks_name, cf_name);
|
||||
|
||||
// Check if we already have a truncate queued for the same table. This can happen when a truncate has timed out
|
||||
// and the client retried by issuing the same truncate again. In this case, instead of failing the request with
|
||||
// an "Another global topology request is ongoing" error, we can wait for the already queued request to complete.
|
||||
// Note that we can not do this for a truncate which the topology coordinator has already started processing,
|
||||
// only for a truncate which is still waiting.
|
||||
std::optional<global_topology_request>& global_request = _topology_state_machine._topology.global_request;
|
||||
if (global_request.has_value()) {
|
||||
if (*global_request == global_topology_request::truncate_table) {
|
||||
std::optional<topology::transition_state>& tstate = _topology_state_machine._topology.tstate;
|
||||
if (!tstate || *tstate != topology::transition_state::truncate_table) {
|
||||
const utils::UUID& ongoing_global_request_id = *_topology_state_machine._topology.global_request_id;
|
||||
const auto topology_requests_entry = co_await _sys_ks.local().get_topology_request_entry(ongoing_global_request_id, true);
|
||||
if (topology_requests_entry.truncate_table_id == table_id) {
|
||||
global_request_id = ongoing_global_request_id;
|
||||
slogger.info("Ongoing TRUNCATE for table {}.{} (global request ID {}) detected; waiting for it to complete",
|
||||
ks_name, cf_name, global_request_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
throw exceptions::invalid_request_exception("Another global topology request is ongoing, please retry.");
|
||||
}
|
||||
|
||||
global_request_id = guard.new_group0_state_id();
|
||||
|
||||
const table_id table_id = _sp.local_db().find_uuid(ks_name, cf_name);
|
||||
|
||||
std::vector<canonical_mutation> updates;
|
||||
updates.emplace_back(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_global_topology_request(global_topology_request::truncate_table)
|
||||
|
||||
Reference in New Issue
Block a user