Merge "Fixes for schema synchronization" from Tomek

"Writes may start to be rejected by replicas after issuing alter table
 which doesn't affect columns. This affects all versions with alter table
 support.

 Fixes #1258"
This commit is contained in:
Pekka Enberg
2016-05-12 09:43:25 +03:00
5 changed files with 154 additions and 12 deletions

View File

@@ -162,6 +162,7 @@ modes = {
scylla_tests = [
'tests/mutation_test',
'tests/schema_registry_test',
'tests/canonical_mutation_test',
'tests/range_test',
'tests/types_test',

View File

@@ -203,12 +203,15 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
return make_ready_future<>();
case schema_registry_entry::sync_state::SYNCING:
return _synced_future;
case schema_registry_entry::sync_state::NOT_SYNCED:
case schema_registry_entry::sync_state::NOT_SYNCED: {
logger.debug("Syncing {}", _version);
_synced_promise = {};
do_with(std::move(syncer), [] (auto& syncer) {
auto f = do_with(std::move(syncer), [] (auto& syncer) {
return syncer();
}).then_wrapped([this, self = shared_from_this()] (auto&& f) {
});
_synced_future = _synced_promise.get_future();
_sync_state = schema_registry_entry::sync_state::SYNCING;
f.then_wrapped([this, self = shared_from_this()] (auto&& f) {
if (_sync_state != sync_state::SYNCING) {
return;
}
@@ -222,9 +225,8 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
_synced_promise.set_value();
}
});
_synced_future = _synced_promise.get_future();
_sync_state = schema_registry_entry::sync_state::SYNCING;
return _synced_future;
}
default:
assert(0);
}

View File

@@ -725,20 +725,28 @@ public static class MigrationsSerializer implements IVersionedSerializer<Collect
//
// The endpoint is the node from which 's' originated.
//
// FIXME: Avoid the sync if the source was/is synced by schema_tables::merge_schema().
static future<> maybe_sync(const schema_ptr& s, net::messaging_service::msg_addr endpoint) {
if (s->is_synced()) {
return make_ready_future<>();
}
// Serialize schema sync by always doing it on shard 0.
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint] {
schema_ptr s = gs.get();
schema_registry_entry& e = *s->registry_entry();
return e.maybe_sync([endpoint, s] {
return s->registry_entry()->maybe_sync([s, endpoint] {
auto merge = [gs = global_schema_ptr(s), endpoint] {
schema_ptr s = gs.get();
logger.debug("Syncing schema of {}.{} (v={}) with {}", s->ks_name(), s->cf_name(), s->version(), endpoint);
return get_local_migration_manager().merge_schema_from(endpoint);
});
};
// Serialize schema sync by always doing it on shard 0.
if (engine().cpu_id() == 0) {
return merge();
} else {
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint, merge] {
schema_ptr s = gs.get();
schema_registry_entry& e = *s->registry_entry();
return e.maybe_sync(merge);
});
}
});
}

View File

@@ -32,6 +32,7 @@ boost_tests = [
'types_test',
'keys_test',
'mutation_test',
'schema_registry_test',
'range_test',
'mutation_reader_test',
'cql_query_test',

View File

@@ -0,0 +1,130 @@
/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#define BOOST_TEST_DYN_LINK
#include <seastar/core/thread.hh>
#include "tests/test-utils.hh"
#include "schema_registry.hh"
#include "schema_builder.hh"
#include "mutation_source_test.hh"
#include "disk-error-handler.hh"
thread_local disk_error_signal_type commit_error;
thread_local disk_error_signal_type general_disk_error;
static bytes random_column_name() {
return to_bytes(to_hex(make_blob(32)));
}
static schema_ptr random_schema() {
return schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column(random_column_name(), bytes_type)
.build();
}
SEASTAR_TEST_CASE(test_async_loading) {
return seastar::async([] {
auto s1 = random_schema();
auto s2 = random_schema();
auto s1_loaded = local_schema_registry().get_or_load(s1->version(), [s1] (table_schema_version) {
return make_ready_future<frozen_schema>(frozen_schema(s1));
}).get0();
BOOST_REQUIRE(s1_loaded);
BOOST_REQUIRE(s1_loaded->version() == s1->version());
auto s1_later = local_schema_registry().get_or_null(s1->version());
BOOST_REQUIRE(s1_later);
auto s2_loaded = local_schema_registry().get_or_load(s2->version(), [s2] (table_schema_version) {
return later().then([s2] { return frozen_schema(s2); });
}).get0();
BOOST_REQUIRE(s2_loaded);
BOOST_REQUIRE(s2_loaded->version() == s2->version());
auto s2_later = local_schema_registry().get_or_null(s2_loaded->version());
BOOST_REQUIRE(s2_later);
});
}
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_doesnt_defer) {
return seastar::async([] {
auto s = random_schema();
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
BOOST_REQUIRE(!s->is_synced());
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
BOOST_REQUIRE(s->is_synced());
});
}
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_defers) {
return seastar::async([] {
auto s = random_schema();
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
BOOST_REQUIRE(!s->is_synced());
s->registry_entry()->maybe_sync([] { return later(); }).get();
BOOST_REQUIRE(s->is_synced());
});
}
SEASTAR_TEST_CASE(test_failed_sync_can_be_retried) {
return seastar::async([] {
auto s = random_schema();
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
BOOST_REQUIRE(!s->is_synced());
promise<> fail_sync;
auto f1 = s->registry_entry()->maybe_sync([&fail_sync] () mutable {
return fail_sync.get_future().then([] {
throw std::runtime_error("sync failed");
});
});
// concurrent maybe_sync should attach the the current one
auto f2 = s->registry_entry()->maybe_sync([] { return make_ready_future<>(); });
fail_sync.set_value();
try {
f1.get();
BOOST_FAIL("Should have failed");
} catch (...) {
// expected
}
try {
f2.get();
BOOST_FAIL("Should have failed");
} catch (...) {
// expected
}
BOOST_REQUIRE(!s->is_synced());
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
BOOST_REQUIRE(s->is_synced());
});
}