Merge "Silence schema pull errors during upgrade from 1.7 to 2.0" from Tomasz

"Old and new nodes will advertise different schema version because
of different format of schema tables. This will result in attempts
to sync the schema by each of the node.

Currently this will result in scary error messages in logs about
sync failing due to not being able to find schema of given version.
It's benign, but may scare users. It the future incompatibilities
could result in more subtle errors. Better to inhibit it completely."

* 'tgrabiec/fix-schema-pull-errors-during-upgrade' of github.com:cloudius-systems/seastar-dev:
  migration_manager: Give empty response to schema pulls from incompatible nodes
  migration_manager: Don't pull schema from incompatible nodes
  service: Advertise schema tables format version through gossip

(cherry picked from commit 91221e020b)
This commit is contained in:
Avi Kivity
2017-07-10 14:04:04 +03:00
parent e02d4935ee
commit d475a44b01
8 changed files with 29 additions and 13 deletions

View File

@@ -82,6 +82,8 @@ namespace schema_tables {
logging::logger slogger("schema_tables");
const sstring version = "3";
struct push_back_and_return {
std::vector<mutation> muts;

View File

@@ -83,6 +83,10 @@ schema_ptr views();
using namespace v3;
// Change on non-backwards compatible changes of schema mutations.
// Replication of schema between nodes with different version is inhibited.
extern const sstring version;
extern std::vector<const char*> ALL;
std::vector<schema_ptr> all_tables();

View File

@@ -59,8 +59,8 @@ enum class application_state {
TOKENS,
SUPPORTED_FEATURES,
CACHE_HITRATES,
SCHEMA_TABLES_VERSION,
// pad to allow adding new states to existing cluster
X3,
X4,
X5,
X6,

View File

@@ -835,7 +835,7 @@ future<> messaging_service::send_definitions_update(msg_addr id, std::vector<fro
return send_message_oneway(this, messaging_verb::DEFINITIONS_UPDATE, std::move(id), std::move(fm));
}
void messaging_service::register_migration_request(std::function<future<std::vector<frozen_mutation>> ()>&& func) {
void messaging_service::register_migration_request(std::function<future<std::vector<frozen_mutation>> (const rpc::client_info&)>&& func) {
register_handler(this, netw::messaging_verb::MIGRATION_REQUEST, std::move(func));
}
void messaging_service::unregister_migration_request() {

View File

@@ -288,7 +288,7 @@ public:
future<> send_definitions_update(msg_addr id, std::vector<frozen_mutation> fm);
// Wrapper for MIGRATION_REQUEST
void register_migration_request(std::function<future<std::vector<frozen_mutation>> ()>&& func);
void register_migration_request(std::function<future<std::vector<frozen_mutation>> (const rpc::client_info&)>&& func);
void unregister_migration_request();
future<std::vector<frozen_mutation>> send_migration_request(msg_addr id);

View File

@@ -86,7 +86,12 @@ void migration_manager::init_messaging_service()
});
return netw::messaging_service::no_wait();
});
ms.register_migration_request([this] () {
ms.register_migration_request([this] (const rpc::client_info& cinfo) {
auto src = netw::messaging_service::get_source(cinfo);
if (!has_compatible_schema_tables_version(src.addr)) {
mlogger.debug("Ignoring schema request from incompatible node: {}", src);
return make_ready_future<std::vector<frozen_mutation>>(std::vector<frozen_mutation>());
}
return db::schema_tables::convert_schema_to_mutations(get_storage_proxy()).finally([p = get_local_shared_storage_proxy()] {
// keep local proxy alive
});
@@ -220,15 +225,18 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
});
}
bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoint)
{
/*
* Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema)
* Don't request schema from fat clients
*/
auto& ms = netw::get_local_messaging_service();
return ms.knows_version(endpoint)
&& ms.get_raw_version(endpoint) == netw::messaging_service::current_version
bool migration_manager::has_compatible_schema_tables_version(const gms::inet_address& endpoint) {
auto& gossiper = gms::get_local_gossiper();
auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint);
if (!ep_state) {
return false;
}
auto&& version_opt = ep_state->get_application_state(gms::application_state::SCHEMA_TABLES_VERSION);
return version_opt && version_opt->value == db::schema_tables::version;
}
bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoint) {
return has_compatible_schema_tables_version(endpoint)
&& !gms::get_local_gossiper().is_gossip_only_member(endpoint);
}

View File

@@ -94,6 +94,7 @@ public:
future<> notify_drop_view(const view_ptr& view);
bool should_pull_schema_from(const gms::inet_address& endpoint);
bool has_compatible_schema_tables_version(const gms::inet_address& endpoint);
future<> announce_keyspace_update(lw_shared_ptr<keyspace_metadata> ksm, bool announce_locally = false);

View File

@@ -301,6 +301,7 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
app_states.emplace(gms::application_state::RELEASE_VERSION, value_factory.release_version());
app_states.emplace(gms::application_state::SUPPORTED_FEATURES, value_factory.supported_features(features));
app_states.emplace(gms::application_state::CACHE_HITRATES, value_factory.cache_hitrates(""));
app_states.emplace(gms::application_state::SCHEMA_TABLES_VERSION, versioned_value(db::schema_tables::version));
slogger.info("Starting up server gossip");
auto& gossiper = gms::get_local_gossiper();