From fb5658ede1fa69a9ce90c042de4cf775d5c9bc5e Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 9 Dec 2015 18:50:45 +0100 Subject: [PATCH] schema_registry: Track synced state of schema We need to track which schema version were synced with on current node to avoid triggering the sync on every mutation. We need to sync before mutating to be able to apply the incoming mutation using current node's schema, possibly applying irreverdible transformations to it to make it conform. --- schema.cc | 4 ++++ schema.hh | 5 ++++ schema_registry.cc | 58 +++++++++++++++++++++++++++++++++++++++++++--- schema_registry.hh | 11 +++++++++ 4 files changed, 75 insertions(+), 3 deletions(-) diff --git a/schema.cc b/schema.cc index 0103910825..60e1f6a216 100644 --- a/schema.cc +++ b/schema.cc @@ -824,3 +824,7 @@ schema::position(const column_definition& column) const { } return clustering_key_size(); } + +bool schema::is_synced() const { + return _registry_entry && _registry_entry->is_synced(); +} diff --git a/schema.hh b/schema.hh index 71bd495deb..930c7b20d3 100644 --- a/schema.hh +++ b/schema.hh @@ -570,6 +570,11 @@ public: friend class schema_registry_entry; // May be called from different shard schema_registry_entry* registry_entry() const noexcept; + // Returns true iff this schema version was synced with on current node. + // Schema version is said to be synced with when its mutations were merged + // into current node's schema, so that current node's schema is at least as + // recent as this version. + bool is_synced() const; }; bool operator==(const schema&, const schema&); diff --git a/schema_registry.cc b/schema_registry.cc index f9d6b5b71d..01efbf1b92 100644 --- a/schema_registry.cc +++ b/schema_registry.cc @@ -47,6 +47,7 @@ schema_registry_entry::schema_registry_entry(table_schema_version v, schema_regi : _state(state::INITIAL) , _version(v) , _registry(r) + , _sync_state(sync_state::NOT_SYNCED) { } schema_ptr schema_registry::learn(const schema_ptr& s) { @@ -196,6 +197,51 @@ frozen_schema schema_registry_entry::frozen() const { return *_frozen_schema; } +future<> schema_registry_entry::maybe_sync(std::function()> syncer) { + switch (_sync_state) { + case schema_registry_entry::sync_state::SYNCED: + return make_ready_future<>(); + case schema_registry_entry::sync_state::SYNCING: + return _synced_future; + case schema_registry_entry::sync_state::NOT_SYNCED: + logger.debug("Syncing {}", _version); + _synced_promise = {}; + do_with(std::move(syncer), [] (auto& syncer) { + return syncer(); + }).then_wrapped([this, self = shared_from_this()] (auto&& f) { + if (_sync_state != sync_state::SYNCING) { + return; + } + if (f.failed()) { + logger.debug("Syncing of {} failed", _version); + _sync_state = schema_registry_entry::sync_state::NOT_SYNCED; + _synced_promise.set_exception(f.get_exception()); + } else { + logger.debug("Synced {}", _version); + _sync_state = schema_registry_entry::sync_state::SYNCED; + _synced_promise.set_value(); + } + }); + _synced_future = _synced_promise.get_future(); + _sync_state = schema_registry_entry::sync_state::SYNCING; + return _synced_future; + default: + assert(0); + } +} + +bool schema_registry_entry::is_synced() const { + return _sync_state == sync_state::SYNCED; +} + +void schema_registry_entry::mark_synced() { + if (_sync_state == sync_state::SYNCING) { + _synced_promise.set_value(); + } + _sync_state = sync_state::SYNCED; + logger.debug("Marked {} as synced", _version); +} + schema_registry& local_schema_registry() { return registry; } @@ -220,9 +266,15 @@ schema_ptr global_schema_ptr::get() const { // 'e' points to a foreign entry, but we know it won't be evicted // because _ptr is preventing this. const schema_registry_entry& e = *_ptr->registry_entry(); - schema_ptr s = local_schema_registry().get_or_load(e.version(), [&e] (table_schema_version) { - return e.frozen(); - }); + schema_ptr s = local_schema_registry().get_or_null(e.version()); + if (!s) { + s = local_schema_registry().get_or_load(e.version(), [&e](table_schema_version) { + return e.frozen(); + }); + if (e.is_synced()) { + s->registry_entry()->mark_synced(); + } + } return s; } } diff --git a/schema_registry.hh b/schema_registry.hh index 387dc94f6d..14af01ee58 100644 --- a/schema_registry.hh +++ b/schema_registry.hh @@ -72,6 +72,11 @@ class schema_registry_entry : public enable_lw_shared_from_this _synced_promise; // valid when _sync_state == SYNCING + shared_future<> _synced_future; // valid when _sync_state == SYNCING + friend class schema_registry; public: schema_registry_entry(table_schema_version v, schema_registry& r); @@ -82,6 +87,12 @@ public: future start_loading(async_schema_loader); schema_ptr get_schema(); // call only when state >= LOADED // Can be called from other shards + bool is_synced() const; + // Initiates asynchronous schema sync or returns ready future when is already synced. + future<> maybe_sync(std::function()> sync); + // Marks this schema version as synced. Syncing cannot be in progress. + void mark_synced(); + // Can be called from other shards frozen_schema frozen() const; // Can be called from other shards table_schema_version version() const { return _version; }