diff --git a/configure.py b/configure.py
index 8eba6a728f..ece836c33c 100755
--- a/configure.py
+++ b/configure.py
@@ -797,6 +797,7 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/consistency_level.idl.hh',
'idl/cache_temperature.idl.hh',
'idl/view.idl.hh',
+ 'idl/messaging_service.idl.hh',
]
headers = find_headers('.', excluded_dirs=['idl', 'build', 'seastar', '.git'])
diff --git a/idl/messaging_service.idl.hh b/idl/messaging_service.idl.hh
new file mode 100644
index 0000000000..fcd8d82094
--- /dev/null
+++ b/idl/messaging_service.idl.hh
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2019 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see .
+ */
+
+namespace netw {
+
+struct schema_pull_options {
+ bool remote_supports_canonical_mutation_retval;
+};
+
+} // namespace netw
diff --git a/message/messaging_service.cc b/message/messaging_service.cc
index 7421e1ea00..1c3b02e737 100644
--- a/message/messaging_service.cc
+++ b/message/messaging_service.cc
@@ -60,6 +60,7 @@
#include "idl/cache_temperature.dist.hh"
#include "idl/view.dist.hh"
#include "idl/mutation.dist.hh"
+#include "idl/messaging_service.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
#include "idl/consistency_level.dist.impl.hh"
@@ -80,6 +81,7 @@
#include "idl/query.dist.impl.hh"
#include "idl/cache_temperature.dist.impl.hh"
#include "idl/mutation.dist.impl.hh"
+#include "idl/messaging_service.dist.impl.hh"
#include
#include
#include
@@ -948,24 +950,28 @@ future<> messaging_service::send_gossip_digest_ack2(msg_addr id, gossip_digest_a
return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(msg));
}
-void messaging_service::register_definitions_update(std::function fm)>&& func) {
+void messaging_service::register_definitions_update(std::function fm,
+ rpc::optional> cm)>&& func) {
register_handler(this, netw::messaging_verb::DEFINITIONS_UPDATE, std::move(func));
}
void messaging_service::unregister_definitions_update() {
_rpc->unregister_handler(netw::messaging_verb::DEFINITIONS_UPDATE);
}
-future<> messaging_service::send_definitions_update(msg_addr id, std::vector fm) {
- return send_message_oneway(this, messaging_verb::DEFINITIONS_UPDATE, std::move(id), std::move(fm));
+future<> messaging_service::send_definitions_update(msg_addr id, std::vector fm, std::vector cm) {
+ return send_message_oneway(this, messaging_verb::DEFINITIONS_UPDATE, std::move(id), std::move(fm), std::move(cm));
}
-void messaging_service::register_migration_request(std::function> (const rpc::client_info&)>&& func) {
+void messaging_service::register_migration_request(std::function, std::vector>
+ (const rpc::client_info&, rpc::optional)>&& func) {
register_handler(this, netw::messaging_verb::MIGRATION_REQUEST, std::move(func));
}
void messaging_service::unregister_migration_request() {
_rpc->unregister_handler(netw::messaging_verb::MIGRATION_REQUEST);
}
-future> messaging_service::send_migration_request(msg_addr id) {
- return send_message>(this, messaging_verb::MIGRATION_REQUEST, std::move(id));
+future, rpc::optional>> messaging_service::send_migration_request(msg_addr id,
+ schema_pull_options options) {
+ return send_message, rpc::optional>>>(this, messaging_verb::MIGRATION_REQUEST,
+ std::move(id), options);
}
void messaging_service::register_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector forward,
diff --git a/message/messaging_service.hh b/message/messaging_service.hh
index 46b657f22e..74f6a45ecc 100644
--- a/message/messaging_service.hh
+++ b/message/messaging_service.hh
@@ -153,6 +153,10 @@ namespace netw {
struct serializer {};
+struct schema_pull_options {
+ bool remote_supports_canonical_mutation_retval = true;
+};
+
class messaging_service : public seastar::async_sharded_service {
public:
struct rpc_protocol_wrapper;
@@ -380,14 +384,17 @@ public:
future<> send_gossip_digest_ack2(msg_addr id, gms::gossip_digest_ack2 msg);
// Wrapper for DEFINITIONS_UPDATE
- void register_definitions_update(std::function fm)>&& func);
+ void register_definitions_update(std::function fm,
+ rpc::optional> cm)>&& func);
void unregister_definitions_update();
- future<> send_definitions_update(msg_addr id, std::vector fm);
+ future<> send_definitions_update(msg_addr id, std::vector fm, std::vector cm);
// Wrapper for MIGRATION_REQUEST
- void register_migration_request(std::function> (const rpc::client_info&)>&& func);
+ void register_migration_request(std::function, std::vector> (
+ const rpc::client_info&, rpc::optional)>&& func);
void unregister_migration_request();
- future> send_migration_request(msg_addr id);
+ future, rpc::optional>> send_migration_request(msg_addr id,
+ schema_pull_options options);
// FIXME: response_id_type is an alias in service::storage_proxy::response_id_type
using response_id_type = uint64_t;
diff --git a/service/migration_manager.cc b/service/migration_manager.cc
index 60499ac455..1ad3991312 100644
--- a/service/migration_manager.cc
+++ b/service/migration_manager.cc
@@ -95,12 +95,20 @@ void migration_manager::init_messaging_service()
_feature_listeners.push_back(ss.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema));
auto& ms = netw::get_local_messaging_service();
- ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector m) {
+ ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector fm, rpc::optional> cm) {
auto src = netw::messaging_service::get_source(cinfo);
+ auto f = make_ready_future<>();
+ if (cm) {
+ f = do_with(std::move(*cm), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) {
+ return service::get_local_migration_manager().merge_schema_from(src, mutations);
+ });
+ } else {
+ f = do_with(std::move(fm), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) {
+ return service::get_local_migration_manager().merge_schema_from(src, mutations);
+ });
+ }
// Start a new fiber.
- (void)do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) {
- return service::get_local_migration_manager().merge_schema_from(src, mutations);
- }).then_wrapped([src] (auto&& f) {
+ (void)f.then_wrapped([src] (auto&& f) {
if (f.failed()) {
mlogger.error("Failed to update definitions from {}: {}", src, f.get_exception());
} else {
@@ -109,19 +117,27 @@ void migration_manager::init_messaging_service()
});
return netw::messaging_service::no_wait();
});
- ms.register_migration_request([this] (const rpc::client_info& cinfo) {
+ ms.register_migration_request([this] (const rpc::client_info& cinfo, rpc::optional options) {
+ using frozen_mutations = std::vector;
+ using canonical_mutations = std::vector;
+ const auto cm_retval_supported = options && options->remote_supports_canonical_mutation_retval;
+
auto src = netw::messaging_service::get_source(cinfo);
if (!has_compatible_schema_tables_version(src.addr)) {
mlogger.debug("Ignoring schema request from incompatible node: {}", src);
- return make_ready_future>(std::vector());
+ return make_ready_future(frozen_mutations{}, canonical_mutations{});
}
auto features = get_local_storage_service().cluster_schema_features();
auto& proxy = get_storage_proxy();
- return db::schema_tables::convert_schema_to_mutations(proxy, features).then([&proxy] (std::vector&& schema_mutations) {
+ return db::schema_tables::convert_schema_to_mutations(proxy, features).then([&proxy, cm_retval_supported] (std::vector&& cm) {
const auto& db = proxy.local().get_db().local();
- return boost::copy_range>(schema_mutations | boost::adaptors::transformed([&db] (const canonical_mutation& cm) {
+ if (cm_retval_supported) {
+ return make_ready_future(frozen_mutations{}, std::move(cm));
+ }
+ auto fm = boost::copy_range>(cm | boost::adaptors::transformed([&db] (const canonical_mutation& cm) {
return cm.to_mutation(db.find_column_family(cm.column_family_id()).schema());
}));
+ return make_ready_future(std::move(fm), std::move(cm));
}).finally([p = get_local_shared_storage_proxy()] {
// keep local proxy alive
});
@@ -254,7 +270,13 @@ future<> migration_manager::do_merge_schema_from(netw::messaging_service::msg_ad
{
auto& ms = netw::get_local_messaging_service();
mlogger.info("Pulling schema from {}", id);
- return ms.send_migration_request(std::move(id)).then([this, id] (std::vector mutations) {
+ return ms.send_migration_request(std::move(id), netw::schema_pull_options{}).then([this, id] (std::vector mutations,
+ rpc::optional> canonical_mutations) {
+ if (canonical_mutations) {
+ return do_with(std::move(*canonical_mutations), [this, id] (std::vector& mutations) {
+ return this->merge_schema_from(id, mutations);
+ });
+ }
return do_with(std::move(mutations), [this, id] (auto&& mutations) {
return this->merge_schema_from(id, mutations);
});
@@ -880,7 +902,8 @@ future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoi
{
netw::messaging_service::msg_addr id{endpoint, 0};
auto fm = std::vector(schema.begin(), schema.end());
- return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm));
+ auto cm = std::vector(schema.begin(), schema.end());
+ return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm), std::move(cm));
}
// Returns a future on the local application of the schema