Merge 'transport: improve memory accounting for big responses and slow network' from Marcin Maliszkiewicz

After obtaining the CQL response, check if its actual size exceeds the initially acquired memory permit. If so, acquire additional semaphore units and adopt them into the permit, ensuring accurate memory accounting for large responses.

Additionally, move the permit into a .then() continuation so that the semaphore units are kept alive until write_message finishes, preventing premature release of memory permit. This is especially important with slow networks and big responses when buffers can accumulate and deplete a node's memory.

Fixes: SCYLLADB-1453
Related https://scylladb.atlassian.net/browse/SCYLLADB-740

Backport: all supported versions

Closes scylladb/scylladb#29288

* github.com:scylladb/scylladb:
  transport: add per-service-level pending response memory metric
  transport: hold memory permit until response write completes
  transport: account for response size exceeding initial memory estimate

(cherry picked from commit 86417d49de)

Closes scylladb/scylladb#29410

Closes scylladb/scylladb#29455

Closes scylladb/scylladb#29590
This commit is contained in:
Marcin Maliszkiewicz
2026-04-22 16:37:09 +02:00
committed by Botond Dénes
parent e434057784
commit ca90676b2b
3 changed files with 42 additions and 6 deletions

View File

@@ -18,6 +18,13 @@ class service_permit {
friend service_permit empty_service_permit();
public:
size_t count() const { return _permit ? _permit->count() : 0; };
// Merge additional semaphore units into this permit.
// Used to grow the permit after the actual resource cost is known.
void adopt(seastar::semaphore_units<>&& units) {
if (_permit) {
_permit->adopt(std::move(units));
}
}
};
inline service_permit make_service_permit(seastar::semaphore_units<>&& permit) {

View File

@@ -240,6 +240,12 @@ void cql_sg_stats::register_metrics()
);
}
transport_metrics.emplace_back(
sm::make_gauge("cql_pending_response_memory", [this] { return _pending_response_memory; },
sm::description("Holds the total memory in bytes consumed by responses waiting to be sent."),
{{"scheduling_group_name", cur_sg_name}}).set_skip_when_empty()
);
new_metrics.add_group("transport", std::move(transport_metrics));
_metrics = std::exchange(new_metrics, {});
}
@@ -817,6 +823,8 @@ future<> cql_server::connection::process_request() {
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
try {
auto& sg_stats = _server.get_cql_sg_stats();
size_t pending_response_size = 0;
if (response_f.failed()) {
const auto message = format("request processing failed, error [{}]", response_f.get_exception());
clogger.error("{}: {}", _client_state.get_remote_address(), message);
@@ -824,9 +832,22 @@ future<> cql_server::connection::process_request() {
message,
tracing::trace_state_ptr()));
} else {
write_response(response_f.get(), std::move(mem_permit), _compression);
auto response = response_f.get();
// Account for response body size exceeding the initial estimate.
auto resp_size = response->size();
auto permit_size = mem_permit.count();
if (resp_size > permit_size) {
auto extra = resp_size - permit_size;
auto extra_units = consume_units(_server._memory_available, extra);
mem_permit.adopt(std::move(extra_units));
}
pending_response_size = resp_size;
sg_stats._pending_response_memory += pending_response_size;
write_response(std::move(response), _compression);
}
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size] {
sg_stats._pending_response_memory -= pending_response_size;
});
} catch (...) {
clogger.error("{}: request processing failed: {}",
_client_state.get_remote_address(), std::current_exception());
@@ -1667,9 +1688,9 @@ cql_server::connection::make_schema_change_event(const event::schema_change& eve
return response;
}
void cql_server::connection::write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, service_permit permit, cql_compression compression)
void cql_server::connection::write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, cql_compression compression)
{
_ready_to_respond = _ready_to_respond.then([this, compression, response = std::move(response), permit = std::move(permit)] () mutable {
_ready_to_respond = _ready_to_respond.then([this, compression, response = std::move(response)] () mutable {
auto message = response->make_message(_version, compression);
message.on_delete([response = std::move(response)] { });
return _write_buf.write(std::move(message)).then([this] {

View File

@@ -135,6 +135,10 @@ struct cql_sg_stats {
request_kind_stats& get_cql_opcode_stats(cql_binary_opcode op) { return _cql_requests_stats[static_cast<uint8_t>(op)]; }
void register_metrics();
void rename_metrics();
// Track total memory consumed by responses waiting to be sent.
// Incremented when a response is queued, decremented when the write completes.
int64_t _pending_response_memory = 0;
private:
bool _use_metrics = false;
seastar::metrics::metric_groups _metrics;
@@ -200,8 +204,11 @@ public:
service::endpoint_lifecycle_subscriber* get_lifecycle_listener() const noexcept;
service::migration_listener* get_migration_listener() const noexcept;
qos::qos_configuration_change_subscriber* get_qos_configuration_listener() const noexcept;
cql_sg_stats& get_cql_sg_stats() {
return scheduling_group_get_specific<cql_sg_stats>(_stats_key);
}
cql_sg_stats::request_kind_stats& get_cql_opcode_stats(cql_binary_opcode op) {
return scheduling_group_get_specific<cql_sg_stats>(_stats_key).get_cql_opcode_stats(op);
return get_cql_sg_stats().get_cql_opcode_stats(op);
}
future<utils::chunked_vector<client_data>> get_client_data();
@@ -330,7 +337,8 @@ private:
process_on_shard(shard_id shard, uint16_t stream, fragmented_temporary_buffer::istream is, service::client_state& cs,
tracing::trace_state_ptr trace_state, cql3::dialect dialect, cql3::computed_function_values&& cached_vals, Process process_fn);
void write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, service_permit permit = empty_service_permit(), cql_compression compression = cql_compression::none);
void write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, cql_compression compression = cql_compression::none);
friend event_notifier;
};