schema_registry: Track synced state of schema
We need to track which schema version were synced with on current node to avoid triggering the sync on every mutation. We need to sync before mutating to be able to apply the incoming mutation using current node's schema, possibly applying irreverdible transformations to it to make it conform.
This commit is contained in:
@@ -824,3 +824,7 @@ schema::position(const column_definition& column) const {
|
||||
}
|
||||
return clustering_key_size();
|
||||
}
|
||||
|
||||
bool schema::is_synced() const {
|
||||
return _registry_entry && _registry_entry->is_synced();
|
||||
}
|
||||
|
||||
@@ -570,6 +570,11 @@ public:
|
||||
friend class schema_registry_entry;
|
||||
// May be called from different shard
|
||||
schema_registry_entry* registry_entry() const noexcept;
|
||||
// Returns true iff this schema version was synced with on current node.
|
||||
// Schema version is said to be synced with when its mutations were merged
|
||||
// into current node's schema, so that current node's schema is at least as
|
||||
// recent as this version.
|
||||
bool is_synced() const;
|
||||
};
|
||||
|
||||
bool operator==(const schema&, const schema&);
|
||||
|
||||
@@ -47,6 +47,7 @@ schema_registry_entry::schema_registry_entry(table_schema_version v, schema_regi
|
||||
: _state(state::INITIAL)
|
||||
, _version(v)
|
||||
, _registry(r)
|
||||
, _sync_state(sync_state::NOT_SYNCED)
|
||||
{ }
|
||||
|
||||
schema_ptr schema_registry::learn(const schema_ptr& s) {
|
||||
@@ -196,6 +197,51 @@ frozen_schema schema_registry_entry::frozen() const {
|
||||
return *_frozen_schema;
|
||||
}
|
||||
|
||||
future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
|
||||
switch (_sync_state) {
|
||||
case schema_registry_entry::sync_state::SYNCED:
|
||||
return make_ready_future<>();
|
||||
case schema_registry_entry::sync_state::SYNCING:
|
||||
return _synced_future;
|
||||
case schema_registry_entry::sync_state::NOT_SYNCED:
|
||||
logger.debug("Syncing {}", _version);
|
||||
_synced_promise = {};
|
||||
do_with(std::move(syncer), [] (auto& syncer) {
|
||||
return syncer();
|
||||
}).then_wrapped([this, self = shared_from_this()] (auto&& f) {
|
||||
if (_sync_state != sync_state::SYNCING) {
|
||||
return;
|
||||
}
|
||||
if (f.failed()) {
|
||||
logger.debug("Syncing of {} failed", _version);
|
||||
_sync_state = schema_registry_entry::sync_state::NOT_SYNCED;
|
||||
_synced_promise.set_exception(f.get_exception());
|
||||
} else {
|
||||
logger.debug("Synced {}", _version);
|
||||
_sync_state = schema_registry_entry::sync_state::SYNCED;
|
||||
_synced_promise.set_value();
|
||||
}
|
||||
});
|
||||
_synced_future = _synced_promise.get_future();
|
||||
_sync_state = schema_registry_entry::sync_state::SYNCING;
|
||||
return _synced_future;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
|
||||
bool schema_registry_entry::is_synced() const {
|
||||
return _sync_state == sync_state::SYNCED;
|
||||
}
|
||||
|
||||
void schema_registry_entry::mark_synced() {
|
||||
if (_sync_state == sync_state::SYNCING) {
|
||||
_synced_promise.set_value();
|
||||
}
|
||||
_sync_state = sync_state::SYNCED;
|
||||
logger.debug("Marked {} as synced", _version);
|
||||
}
|
||||
|
||||
schema_registry& local_schema_registry() {
|
||||
return registry;
|
||||
}
|
||||
@@ -220,9 +266,15 @@ schema_ptr global_schema_ptr::get() const {
|
||||
// 'e' points to a foreign entry, but we know it won't be evicted
|
||||
// because _ptr is preventing this.
|
||||
const schema_registry_entry& e = *_ptr->registry_entry();
|
||||
schema_ptr s = local_schema_registry().get_or_load(e.version(), [&e] (table_schema_version) {
|
||||
return e.frozen();
|
||||
});
|
||||
schema_ptr s = local_schema_registry().get_or_null(e.version());
|
||||
if (!s) {
|
||||
s = local_schema_registry().get_or_load(e.version(), [&e](table_schema_version) {
|
||||
return e.frozen();
|
||||
});
|
||||
if (e.is_synced()) {
|
||||
s->registry_entry()->mark_synced();
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,6 +72,11 @@ class schema_registry_entry : public enable_lw_shared_from_this<schema_registry_
|
||||
// This is != nullptr when there is an alive schema_ptr associated with this entry.
|
||||
const ::schema* _schema = nullptr;
|
||||
|
||||
enum class sync_state { NOT_SYNCED, SYNCING, SYNCED };
|
||||
sync_state _sync_state;
|
||||
promise<> _synced_promise; // valid when _sync_state == SYNCING
|
||||
shared_future<> _synced_future; // valid when _sync_state == SYNCING
|
||||
|
||||
friend class schema_registry;
|
||||
public:
|
||||
schema_registry_entry(table_schema_version v, schema_registry& r);
|
||||
@@ -82,6 +87,12 @@ public:
|
||||
future<schema_ptr> start_loading(async_schema_loader);
|
||||
schema_ptr get_schema(); // call only when state >= LOADED
|
||||
// Can be called from other shards
|
||||
bool is_synced() const;
|
||||
// Initiates asynchronous schema sync or returns ready future when is already synced.
|
||||
future<> maybe_sync(std::function<future<>()> sync);
|
||||
// Marks this schema version as synced. Syncing cannot be in progress.
|
||||
void mark_synced();
|
||||
// Can be called from other shards
|
||||
frozen_schema frozen() const;
|
||||
// Can be called from other shards
|
||||
table_schema_version version() const { return _version; }
|
||||
|
||||
Reference in New Issue
Block a user