messaging_service: add canonical_support to schema pull and push verbs

The verbs are:
* DEFINITIONS_UPDATE (push)
* MIGRATION_REQUEST (pull)

Support was added in a backward-compatible way. The push verb, sends
both the old frozen mutation parameter, and the new optional canonical
mutation parameter. It is expected that new nodes will use the latter,
while old nodes will fall-back to the former. The pull verb has a new
optional `options` parameter, which for now contains a single flag:
`remote_supports_canonical_mutation_retval`. This flag, if set, means
that the remote node supports the new canonical mutation return value,
thus the old frozen mutations return value can be left empty.
This commit is contained in:
Botond Dénes
2019-08-26 17:41:04 +03:00
parent d9a8ff15d8
commit 7adc764b6e
5 changed files with 85 additions and 20 deletions

View File

@@ -797,6 +797,7 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/consistency_level.idl.hh',
'idl/cache_temperature.idl.hh',
'idl/view.idl.hh',
'idl/messaging_service.idl.hh',
]
headers = find_headers('.', excluded_dirs=['idl', 'build', 'seastar', '.git'])

View File

@@ -0,0 +1,28 @@
/*
* Copyright 2019 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
namespace netw {
struct schema_pull_options {
bool remote_supports_canonical_mutation_retval;
};
} // namespace netw

View File

@@ -60,6 +60,7 @@
#include "idl/cache_temperature.dist.hh"
#include "idl/view.dist.hh"
#include "idl/mutation.dist.hh"
#include "idl/messaging_service.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
#include "idl/consistency_level.dist.impl.hh"
@@ -80,6 +81,7 @@
#include "idl/query.dist.impl.hh"
#include "idl/cache_temperature.dist.impl.hh"
#include "idl/mutation.dist.impl.hh"
#include "idl/messaging_service.dist.impl.hh"
#include <seastar/rpc/lz4_compressor.hh>
#include <seastar/rpc/lz4_fragmented_compressor.hh>
#include <seastar/rpc/multi_algo_compressor_factory.hh>
@@ -948,24 +950,28 @@ future<> messaging_service::send_gossip_digest_ack2(msg_addr id, gossip_digest_a
return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(msg));
}
void messaging_service::register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm)>&& func) {
void messaging_service::register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm,
rpc::optional<std::vector<canonical_mutation>> cm)>&& func) {
register_handler(this, netw::messaging_verb::DEFINITIONS_UPDATE, std::move(func));
}
void messaging_service::unregister_definitions_update() {
_rpc->unregister_handler(netw::messaging_verb::DEFINITIONS_UPDATE);
}
future<> messaging_service::send_definitions_update(msg_addr id, std::vector<frozen_mutation> fm) {
return send_message_oneway(this, messaging_verb::DEFINITIONS_UPDATE, std::move(id), std::move(fm));
future<> messaging_service::send_definitions_update(msg_addr id, std::vector<frozen_mutation> fm, std::vector<canonical_mutation> cm) {
return send_message_oneway(this, messaging_verb::DEFINITIONS_UPDATE, std::move(id), std::move(fm), std::move(cm));
}
void messaging_service::register_migration_request(std::function<future<std::vector<frozen_mutation>> (const rpc::client_info&)>&& func) {
void messaging_service::register_migration_request(std::function<future<std::vector<frozen_mutation>, std::vector<canonical_mutation>>
(const rpc::client_info&, rpc::optional<schema_pull_options>)>&& func) {
register_handler(this, netw::messaging_verb::MIGRATION_REQUEST, std::move(func));
}
void messaging_service::unregister_migration_request() {
_rpc->unregister_handler(netw::messaging_verb::MIGRATION_REQUEST);
}
future<std::vector<frozen_mutation>> messaging_service::send_migration_request(msg_addr id) {
return send_message<std::vector<frozen_mutation>>(this, messaging_verb::MIGRATION_REQUEST, std::move(id));
future<std::vector<frozen_mutation>, rpc::optional<std::vector<canonical_mutation>>> messaging_service::send_migration_request(msg_addr id,
schema_pull_options options) {
return send_message<future<std::vector<frozen_mutation>, rpc::optional<std::vector<canonical_mutation>>>>(this, messaging_verb::MIGRATION_REQUEST,
std::move(id), options);
}
void messaging_service::register_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,

View File

@@ -153,6 +153,10 @@ namespace netw {
struct serializer {};
struct schema_pull_options {
bool remote_supports_canonical_mutation_retval = true;
};
class messaging_service : public seastar::async_sharded_service<messaging_service> {
public:
struct rpc_protocol_wrapper;
@@ -380,14 +384,17 @@ public:
future<> send_gossip_digest_ack2(msg_addr id, gms::gossip_digest_ack2 msg);
// Wrapper for DEFINITIONS_UPDATE
void register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm)>&& func);
void register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm,
rpc::optional<std::vector<canonical_mutation>> cm)>&& func);
void unregister_definitions_update();
future<> send_definitions_update(msg_addr id, std::vector<frozen_mutation> fm);
future<> send_definitions_update(msg_addr id, std::vector<frozen_mutation> fm, std::vector<canonical_mutation> cm);
// Wrapper for MIGRATION_REQUEST
void register_migration_request(std::function<future<std::vector<frozen_mutation>> (const rpc::client_info&)>&& func);
void register_migration_request(std::function<future<std::vector<frozen_mutation>, std::vector<canonical_mutation>> (
const rpc::client_info&, rpc::optional<schema_pull_options>)>&& func);
void unregister_migration_request();
future<std::vector<frozen_mutation>> send_migration_request(msg_addr id);
future<std::vector<frozen_mutation>, rpc::optional<std::vector<canonical_mutation>>> send_migration_request(msg_addr id,
schema_pull_options options);
// FIXME: response_id_type is an alias in service::storage_proxy::response_id_type
using response_id_type = uint64_t;

View File

@@ -95,12 +95,20 @@ void migration_manager::init_messaging_service()
_feature_listeners.push_back(ss.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema));
auto& ms = netw::get_local_messaging_service();
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> m) {
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm, rpc::optional<std::vector<canonical_mutation>> cm) {
auto src = netw::messaging_service::get_source(cinfo);
auto f = make_ready_future<>();
if (cm) {
f = do_with(std::move(*cm), get_local_shared_storage_proxy(), [src] (const std::vector<canonical_mutation>& mutations, shared_ptr<storage_proxy>& p) {
return service::get_local_migration_manager().merge_schema_from(src, mutations);
});
} else {
f = do_with(std::move(fm), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
return service::get_local_migration_manager().merge_schema_from(src, mutations);
});
}
// Start a new fiber.
(void)do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
return service::get_local_migration_manager().merge_schema_from(src, mutations);
}).then_wrapped([src] (auto&& f) {
(void)f.then_wrapped([src] (auto&& f) {
if (f.failed()) {
mlogger.error("Failed to update definitions from {}: {}", src, f.get_exception());
} else {
@@ -109,19 +117,27 @@ void migration_manager::init_messaging_service()
});
return netw::messaging_service::no_wait();
});
ms.register_migration_request([this] (const rpc::client_info& cinfo) {
ms.register_migration_request([this] (const rpc::client_info& cinfo, rpc::optional<netw::schema_pull_options> options) {
using frozen_mutations = std::vector<frozen_mutation>;
using canonical_mutations = std::vector<canonical_mutation>;
const auto cm_retval_supported = options && options->remote_supports_canonical_mutation_retval;
auto src = netw::messaging_service::get_source(cinfo);
if (!has_compatible_schema_tables_version(src.addr)) {
mlogger.debug("Ignoring schema request from incompatible node: {}", src);
return make_ready_future<std::vector<frozen_mutation>>(std::vector<frozen_mutation>());
return make_ready_future<frozen_mutations, canonical_mutations>(frozen_mutations{}, canonical_mutations{});
}
auto features = get_local_storage_service().cluster_schema_features();
auto& proxy = get_storage_proxy();
return db::schema_tables::convert_schema_to_mutations(proxy, features).then([&proxy] (std::vector<canonical_mutation>&& schema_mutations) {
return db::schema_tables::convert_schema_to_mutations(proxy, features).then([&proxy, cm_retval_supported] (std::vector<canonical_mutation>&& cm) {
const auto& db = proxy.local().get_db().local();
return boost::copy_range<std::vector<frozen_mutation>>(schema_mutations | boost::adaptors::transformed([&db] (const canonical_mutation& cm) {
if (cm_retval_supported) {
return make_ready_future<frozen_mutations, canonical_mutations>(frozen_mutations{}, std::move(cm));
}
auto fm = boost::copy_range<std::vector<frozen_mutation>>(cm | boost::adaptors::transformed([&db] (const canonical_mutation& cm) {
return cm.to_mutation(db.find_column_family(cm.column_family_id()).schema());
}));
return make_ready_future<frozen_mutations, canonical_mutations>(std::move(fm), std::move(cm));
}).finally([p = get_local_shared_storage_proxy()] {
// keep local proxy alive
});
@@ -254,7 +270,13 @@ future<> migration_manager::do_merge_schema_from(netw::messaging_service::msg_ad
{
auto& ms = netw::get_local_messaging_service();
mlogger.info("Pulling schema from {}", id);
return ms.send_migration_request(std::move(id)).then([this, id] (std::vector<frozen_mutation> mutations) {
return ms.send_migration_request(std::move(id), netw::schema_pull_options{}).then([this, id] (std::vector<frozen_mutation> mutations,
rpc::optional<std::vector<canonical_mutation>> canonical_mutations) {
if (canonical_mutations) {
return do_with(std::move(*canonical_mutations), [this, id] (std::vector<canonical_mutation>& mutations) {
return this->merge_schema_from(id, mutations);
});
}
return do_with(std::move(mutations), [this, id] (auto&& mutations) {
return this->merge_schema_from(id, mutations);
});
@@ -880,7 +902,8 @@ future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoi
{
netw::messaging_service::msg_addr id{endpoint, 0};
auto fm = std::vector<frozen_mutation>(schema.begin(), schema.end());
return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm));
auto cm = std::vector<canonical_mutation>(schema.begin(), schema.end());
return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm), std::move(cm));
}
// Returns a future on the local application of the schema