messaging_service: Rename stop() to shutdown()

On today's stop() the messaging service is not really stopped as other
services still (may) use it and have registered handlers in it. Inside
the .stop() only the rpc servers are brought down, so the better name
for this method would be shutdown().

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2020-07-24 10:50:28 +03:00
parent e6fb2b58fc
commit 1c8ea817cd
3 changed files with 14 additions and 14 deletions

View File

@@ -455,8 +455,8 @@ future<> messaging_service::stop_client() {
});
}
future<> messaging_service::stop() {
_stopping = true;
future<> messaging_service::shutdown() {
_shutting_down = true;
return when_all(stop_nontls_server(), stop_tls_server(), stop_client()).discard_result();
}
@@ -642,7 +642,7 @@ void messaging_service::cache_preferred_ip(gms::inet_address ep, gms::inet_addre
}
shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::get_rpc_client(messaging_verb verb, msg_addr id) {
assert(!_stopping);
assert(!_shutting_down);
auto idx = get_rpc_client_idx(verb);
auto it = _clients[idx].find(id);
@@ -732,7 +732,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
}
bool messaging_service::remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only) {
if (_stopping) {
if (_shutting_down) {
// if messaging service is in a processed of been stopped no need to
// stop and remove connection here since they are being stopped already
// and we'll just interfere
@@ -783,7 +783,7 @@ rpc::sink<int32_t> messaging_service::make_sink_for_stream_mutation_fragments(rp
future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>>
messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) {
using value_type = std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>;
if (is_stopping()) {
if (is_shutting_down()) {
return make_exception_future<value_type>(rpc::closed_error());
}
auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id);
@@ -823,7 +823,7 @@ do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shared_ptr<mes
future<std::tuple<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>>>
messaging_service::make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) {
auto verb = messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM;
if (is_stopping()) {
if (is_shutting_down()) {
return make_exception_future<std::tuple<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>>>(rpc::closed_error());
}
auto rpc_client = get_rpc_client(verb, id);
@@ -845,7 +845,7 @@ future<> messaging_service::unregister_repair_get_row_diff_with_rpc_stream() {
future<std::tuple<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_stream_cmd>>>
messaging_service::make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) {
auto verb = messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM;
if (is_stopping()) {
if (is_shutting_down()) {
return make_exception_future<std::tuple<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_stream_cmd>>>(rpc::closed_error());
}
auto rpc_client = get_rpc_client(verb, id);
@@ -867,7 +867,7 @@ future<> messaging_service::unregister_repair_put_row_diff_with_rpc_stream() {
future<std::tuple<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd>>>
messaging_service::make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) {
auto verb = messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM;
if (is_stopping()) {
if (is_shutting_down()) {
return make_exception_future<std::tuple<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd>>>(rpc::closed_error());
}
auto rpc_client = get_rpc_client(verb, id);
@@ -889,7 +889,7 @@ future<> messaging_service::unregister_repair_get_full_row_hashes_with_rpc_strea
template <typename MsgIn, typename... MsgOut>
auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
if (ms->is_stopping()) {
if (ms->is_shutting_down()) {
using futurator = futurize<std::result_of_t<decltype(rpc_handler)(rpc_protocol::client&, MsgOut...)>>;
return futurator::make_exception_future(rpc::closed_error());
}
@@ -918,7 +918,7 @@ auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOu
template <typename MsgIn, typename Timeout, typename... MsgOut>
auto send_message_timeout(messaging_service* ms, messaging_verb verb, msg_addr id, Timeout timeout, MsgOut&&... msg) {
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
if (ms->is_stopping()) {
if (ms->is_shutting_down()) {
using futurator = futurize<std::result_of_t<decltype(rpc_handler)(rpc_protocol::client&, MsgOut...)>>;
return futurator::make_exception_future(rpc::closed_error());
}

View File

@@ -263,7 +263,7 @@ private:
std::array<std::unique_ptr<rpc_protocol_server_wrapper>, 2> _server_tls;
std::vector<clients_map> _clients;
uint64_t _dropped_messages[static_cast<int32_t>(messaging_verb::LAST)] = {};
bool _stopping = false;
bool _shutting_down = false;
std::list<std::function<void(gms::inet_address ep)>> _connection_drop_notifiers;
memory_config _mcfg;
scheduling_config _scheduling_config;
@@ -286,9 +286,9 @@ public:
future<> start_listen();
uint16_t port();
gms::inet_address listen_address();
future<> stop();
future<> shutdown();
static rpc::no_wait_type no_wait();
bool is_stopping() { return _stopping; }
bool is_shutting_down() { return _shutting_down; }
gms::inet_address get_preferred_ip(gms::inet_address ep);
future<> init_local_preferred_ip_cache();
void cache_preferred_ip(gms::inet_address ep, gms::inet_address ip);

View File

@@ -2040,7 +2040,7 @@ future<> storage_service::do_stop_ms() {
}
_ms_stopped = true;
return netw::get_messaging_service().invoke_on_all([] (auto& ms) {
return ms.stop();
return ms.shutdown();
}).then([] {
slogger.info("messaging_service stopped");
});