From c9ee67c85cbde9d2f566cb7a9b00b8c30ea834b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 9 Apr 2026 13:36:31 +0300 Subject: [PATCH] Merge 'transport: improve memory accounting for big responses and slow network' from Marcin Maliszkiewicz After obtaining the CQL response, check if its actual size exceeds the initially acquired memory permit. If so, acquire additional semaphore units and adopt them into the permit, ensuring accurate memory accounting for large responses. Additionally, move the permit into a .then() continuation so that the semaphore units are kept alive until write_message finishes, preventing premature release of memory permit. This is especially important with slow networks and big responses when buffers can accumulate and deplete a node's memory. Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1306 Related https://scylladb.atlassian.net/browse/SCYLLADB-740 Backport: all supported versions Closes scylladb/scylladb#29288 * github.com:scylladb/scylladb: transport: add per-service-level pending response memory metric transport: hold memory permit until response write completes transport: account for response size exceeding initial memory estimate (cherry picked from commit 86417d49de1ad5b29d92f340ac12e613607135ee) Closes scylladb/scylladb#29410 Closes scylladb/scylladb#29455 --- service_permit.hh | 7 +++++++ transport/server.cc | 29 +++++++++++++++++++++++++---- transport/server.hh | 11 +++++++++-- 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/service_permit.hh b/service_permit.hh index 0934d9e86a..5aeec14987 100644 --- a/service_permit.hh +++ b/service_permit.hh @@ -18,6 +18,13 @@ class service_permit { friend service_permit empty_service_permit(); public: size_t count() const { return _permit ? _permit->count() : 0; }; + // Merge additional semaphore units into this permit. + // Used to grow the permit after the actual resource cost is known. + void adopt(seastar::semaphore_units<>&& units) { + if (_permit) { + _permit->adopt(std::move(units)); + } + } }; inline service_permit make_service_permit(seastar::semaphore_units<>&& permit) { diff --git a/transport/server.cc b/transport/server.cc index 3ade50cc61..bace5529f0 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -243,6 +243,12 @@ void cql_sg_stats::register_metrics() ); } + transport_metrics.emplace_back( + sm::make_gauge("cql_pending_response_memory", [this] { return _pending_response_memory; }, + sm::description("Holds the total memory in bytes consumed by responses waiting to be sent."), + {{"scheduling_group_name", cur_sg_name}}).set_skip_when_empty() + ); + new_metrics.add_group("transport", std::move(transport_metrics)); _metrics = std::exchange(new_metrics, {}); } @@ -831,6 +837,8 @@ future<> cql_server::connection::process_request() { future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future>> response_f) mutable { try { + auto& sg_stats = _server.get_cql_sg_stats(); + size_t pending_response_size = 0; if (response_f.failed()) { const auto message = format("request processing failed, error [{}]", response_f.get_exception()); clogger.error("{}: {}", _client_state.get_remote_address(), message); @@ -838,9 +846,22 @@ future<> cql_server::connection::process_request() { message, tracing::trace_state_ptr())); } else { - write_response(response_f.get(), std::move(mem_permit), _compression); + auto response = response_f.get(); + // Account for response body size exceeding the initial estimate. + auto resp_size = response->size(); + auto permit_size = mem_permit.count(); + if (resp_size > permit_size) { + auto extra = resp_size - permit_size; + auto extra_units = consume_units(_server._memory_available, extra); + mem_permit.adopt(std::move(extra_units)); + } + pending_response_size = resp_size; + sg_stats._pending_response_memory += pending_response_size; + write_response(std::move(response), _compression); } - _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {}); + _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size] { + sg_stats._pending_response_memory -= pending_response_size; + }); } catch (...) { clogger.error("{}: request processing failed: {}", _client_state.get_remote_address(), std::current_exception()); @@ -1754,9 +1775,9 @@ cql_server::connection::make_schema_change_event(const event::schema_change& eve return response; } -void cql_server::connection::write_response(foreign_ptr>&& response, service_permit permit, cql_compression compression) +void cql_server::connection::write_response(foreign_ptr>&& response, cql_compression compression) { - _ready_to_respond = _ready_to_respond.then([this, compression, response = std::move(response), permit = std::move(permit)] () mutable { + _ready_to_respond = _ready_to_respond.then([this, compression, response = std::move(response)] () mutable { utils::result_with_exception_ptr> message = response->make_message(_version, compression); if (!message) [[unlikely]] { return make_exception_future<>(std::move(message).assume_error()); diff --git a/transport/server.hh b/transport/server.hh index 249a952575..1de459779b 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -138,6 +138,10 @@ struct cql_sg_stats { request_kind_stats& get_cql_opcode_stats(cql_binary_opcode op) { return _cql_requests_stats[static_cast(op)]; } void register_metrics(); void rename_metrics(); + + // Track total memory consumed by responses waiting to be sent. + // Incremented when a response is queued, decremented when the write completes. + int64_t _pending_response_memory = 0; private: bool _use_metrics = false; seastar::metrics::metric_groups _metrics; @@ -229,8 +233,11 @@ public: service::endpoint_lifecycle_subscriber* get_lifecycle_listener() const noexcept; service::migration_listener* get_migration_listener() const noexcept; qos::qos_configuration_change_subscriber* get_qos_configuration_listener() const noexcept; + cql_sg_stats& get_cql_sg_stats() { + return scheduling_group_get_specific(_stats_key); + } cql_sg_stats::request_kind_stats& get_cql_opcode_stats(cql_binary_opcode op) { - return scheduling_group_get_specific(_stats_key).get_cql_opcode_stats(op); + return get_cql_sg_stats().get_cql_opcode_stats(op); } future> get_client_data(); @@ -358,7 +365,7 @@ private: process_on_shard(shard_id shard, uint16_t stream, fragmented_temporary_buffer::istream is, service::client_state& cs, tracing::trace_state_ptr trace_state, cql3::dialect dialect, cql3::computed_function_values&& cached_vals, Process process_fn); - void write_response(foreign_ptr>&& response, service_permit permit = empty_service_permit(), cql_compression compression = cql_compression::none); + void write_response(foreign_ptr>&& response, cql_compression compression = cql_compression::none); friend event_notifier; };