diff --git a/configure.py b/configure.py index c3f42037b0..a0a038b6d5 100755 --- a/configure.py +++ b/configure.py @@ -162,6 +162,7 @@ modes = { scylla_tests = [ 'tests/mutation_test', + 'tests/schema_registry_test', 'tests/canonical_mutation_test', 'tests/range_test', 'tests/types_test', diff --git a/schema_registry.cc b/schema_registry.cc index 01efbf1b92..aa1c16ecf5 100644 --- a/schema_registry.cc +++ b/schema_registry.cc @@ -203,12 +203,15 @@ future<> schema_registry_entry::maybe_sync(std::function()> syncer) { return make_ready_future<>(); case schema_registry_entry::sync_state::SYNCING: return _synced_future; - case schema_registry_entry::sync_state::NOT_SYNCED: + case schema_registry_entry::sync_state::NOT_SYNCED: { logger.debug("Syncing {}", _version); _synced_promise = {}; - do_with(std::move(syncer), [] (auto& syncer) { + auto f = do_with(std::move(syncer), [] (auto& syncer) { return syncer(); - }).then_wrapped([this, self = shared_from_this()] (auto&& f) { + }); + _synced_future = _synced_promise.get_future(); + _sync_state = schema_registry_entry::sync_state::SYNCING; + f.then_wrapped([this, self = shared_from_this()] (auto&& f) { if (_sync_state != sync_state::SYNCING) { return; } @@ -222,9 +225,8 @@ future<> schema_registry_entry::maybe_sync(std::function()> syncer) { _synced_promise.set_value(); } }); - _synced_future = _synced_promise.get_future(); - _sync_state = schema_registry_entry::sync_state::SYNCING; return _synced_future; + } default: assert(0); } diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 7c13d212ca..06d06820d2 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -725,20 +725,28 @@ public static class MigrationsSerializer implements IVersionedSerializer maybe_sync(const schema_ptr& s, net::messaging_service::msg_addr endpoint) { if (s->is_synced()) { return make_ready_future<>(); } - // Serialize schema sync by always doing it on shard 0. - return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint] { - schema_ptr s = gs.get(); - schema_registry_entry& e = *s->registry_entry(); - return e.maybe_sync([endpoint, s] { + return s->registry_entry()->maybe_sync([s, endpoint] { + auto merge = [gs = global_schema_ptr(s), endpoint] { + schema_ptr s = gs.get(); logger.debug("Syncing schema of {}.{} (v={}) with {}", s->ks_name(), s->cf_name(), s->version(), endpoint); return get_local_migration_manager().merge_schema_from(endpoint); - }); + }; + + // Serialize schema sync by always doing it on shard 0. + if (engine().cpu_id() == 0) { + return merge(); + } else { + return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint, merge] { + schema_ptr s = gs.get(); + schema_registry_entry& e = *s->registry_entry(); + return e.maybe_sync(merge); + }); + } }); } diff --git a/test.py b/test.py index f9fe81519d..103a66d414 100755 --- a/test.py +++ b/test.py @@ -32,6 +32,7 @@ boost_tests = [ 'types_test', 'keys_test', 'mutation_test', + 'schema_registry_test', 'range_test', 'mutation_reader_test', 'cql_query_test', diff --git a/tests/schema_registry_test.cc b/tests/schema_registry_test.cc new file mode 100644 index 0000000000..8649180593 --- /dev/null +++ b/tests/schema_registry_test.cc @@ -0,0 +1,130 @@ +/* + * Copyright (C) 2016 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#define BOOST_TEST_DYN_LINK + +#include + +#include "tests/test-utils.hh" +#include "schema_registry.hh" +#include "schema_builder.hh" +#include "mutation_source_test.hh" + +#include "disk-error-handler.hh" + +thread_local disk_error_signal_type commit_error; +thread_local disk_error_signal_type general_disk_error; + +static bytes random_column_name() { + return to_bytes(to_hex(make_blob(32))); +} + +static schema_ptr random_schema() { + return schema_builder("ks", "cf") + .with_column("pk", bytes_type, column_kind::partition_key) + .with_column(random_column_name(), bytes_type) + .build(); +} + +SEASTAR_TEST_CASE(test_async_loading) { + return seastar::async([] { + auto s1 = random_schema(); + auto s2 = random_schema(); + + auto s1_loaded = local_schema_registry().get_or_load(s1->version(), [s1] (table_schema_version) { + return make_ready_future(frozen_schema(s1)); + }).get0(); + + BOOST_REQUIRE(s1_loaded); + BOOST_REQUIRE(s1_loaded->version() == s1->version()); + auto s1_later = local_schema_registry().get_or_null(s1->version()); + BOOST_REQUIRE(s1_later); + + auto s2_loaded = local_schema_registry().get_or_load(s2->version(), [s2] (table_schema_version) { + return later().then([s2] { return frozen_schema(s2); }); + }).get0(); + + BOOST_REQUIRE(s2_loaded); + BOOST_REQUIRE(s2_loaded->version() == s2->version()); + auto s2_later = local_schema_registry().get_or_null(s2_loaded->version()); + BOOST_REQUIRE(s2_later); + }); +} + +SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_doesnt_defer) { + return seastar::async([] { + auto s = random_schema(); + s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); }); + BOOST_REQUIRE(!s->is_synced()); + s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get(); + BOOST_REQUIRE(s->is_synced()); + }); +} + +SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_defers) { + return seastar::async([] { + auto s = random_schema(); + s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); }); + BOOST_REQUIRE(!s->is_synced()); + s->registry_entry()->maybe_sync([] { return later(); }).get(); + BOOST_REQUIRE(s->is_synced()); + }); +} + +SEASTAR_TEST_CASE(test_failed_sync_can_be_retried) { + return seastar::async([] { + auto s = random_schema(); + s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); }); + BOOST_REQUIRE(!s->is_synced()); + + promise<> fail_sync; + + auto f1 = s->registry_entry()->maybe_sync([&fail_sync] () mutable { + return fail_sync.get_future().then([] { + throw std::runtime_error("sync failed"); + }); + }); + + // concurrent maybe_sync should attach the the current one + auto f2 = s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }); + + fail_sync.set_value(); + + try { + f1.get(); + BOOST_FAIL("Should have failed"); + } catch (...) { + // expected + } + + try { + f2.get(); + BOOST_FAIL("Should have failed"); + } catch (...) { + // expected + } + + BOOST_REQUIRE(!s->is_synced()); + + s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get(); + BOOST_REQUIRE(s->is_synced()); + }); +}