transport: introduce service_permit class and use it instead of semaphore_units
service_permit is a new class that allows sharing a permit among different parts of request processing many of which can complete at different times.
This commit is contained in:
40
service_permit.hh
Normal file
40
service_permit.hh
Normal file
@@ -0,0 +1,40 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/semaphore.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
class service_permit {
|
||||
seastar::lw_shared_ptr<seastar::semaphore_units<>> _permit;
|
||||
service_permit(seastar::semaphore_units<>&& u) : _permit(seastar::make_lw_shared(std::move(u))) {}
|
||||
friend service_permit make_service_permit(seastar::semaphore_units<>&& permit);
|
||||
friend service_permit empty_service_permit();
|
||||
};
|
||||
|
||||
inline service_permit make_service_permit(seastar::semaphore_units<>&& permit) {
|
||||
return service_permit(std::move(permit));
|
||||
}
|
||||
|
||||
inline service_permit empty_service_permit() {
|
||||
return make_service_permit(seastar::semaphore_units<>());
|
||||
}
|
||||
@@ -585,7 +585,7 @@ future<> cql_server::connection::process_request() {
|
||||
}
|
||||
|
||||
return fut.then([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (semaphore_units<> mem_permit) {
|
||||
return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = std::move(mem_permit)] (fragmented_temporary_buffer buf) mutable {
|
||||
return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit))] (fragmented_temporary_buffer buf) mutable {
|
||||
|
||||
++_server._requests_served;
|
||||
++_server._requests_serving;
|
||||
@@ -1223,7 +1223,7 @@ cql_server::connection::make_schema_change_event(const event::schema_change& eve
|
||||
return response;
|
||||
}
|
||||
|
||||
void cql_server::connection::write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, semaphore_units<> permit, cql_compression compression)
|
||||
void cql_server::connection::write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, service_permit permit, cql_compression compression)
|
||||
{
|
||||
_ready_to_respond = _ready_to_respond.then([this, compression, response = std::move(response), permit = std::move(permit)] () mutable {
|
||||
auto message = response->make_message(_version, compression);
|
||||
|
||||
@@ -37,6 +37,7 @@
|
||||
#include <seastar/net/tls.hh>
|
||||
#include <seastar/core/metrics_registration.hh>
|
||||
#include "utils/fragmented_temporary_buffer.hh"
|
||||
#include "service_permit.hh"
|
||||
|
||||
namespace scollectd {
|
||||
|
||||
@@ -220,7 +221,7 @@ private:
|
||||
std::unique_ptr<cql_server::response> make_auth_success(int16_t, bytes, const tracing::trace_state_ptr& tr_state);
|
||||
std::unique_ptr<cql_server::response> make_auth_challenge(int16_t, bytes, const tracing::trace_state_ptr& tr_state);
|
||||
|
||||
void write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, semaphore_units<> permit = semaphore_units<>(), cql_compression compression = cql_compression::none);
|
||||
void write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, service_permit permit = empty_service_permit(), cql_compression compression = cql_compression::none);
|
||||
|
||||
void init_cql_serialization_format();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user