schema, everywhere: define and use table_schema_version as a strong type

Define table_schema_version as a distinct tagged_uuid class,
So it can be differentiated from other uuid-class types,
in particular table_id.

Added reversed(table_schema_version) for convenience
and uniformity since the same logic is currently open coded
in several places.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-08-04 17:27:46 +03:00
parent 257d74bb34
commit 2b017ce285
28 changed files with 95 additions and 92 deletions

View File

@@ -506,7 +506,7 @@ future<> manager::end_point_hints_manager::flush_current_hints() noexcept {
class no_column_mapping : public std::out_of_range {
public:
no_column_mapping(const utils::UUID& id) : std::out_of_range(format("column mapping for CF {} is missing", id)) {}
no_column_mapping(const table_schema_version& id) : std::out_of_range(format("column mapping for CF schema_version {} is missing", id)) {}
};
future<> manager::end_point_hints_manager::sender::flush_maybe() noexcept {

View File

@@ -731,7 +731,7 @@ redact_columns_for_missing_features(mutation m, schema_features features) {
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
* will be converted into UUID which would act as content-based version of the schema.
*/
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features, noncopyable_function<bool(std::string_view)> accept_keyspace)
future<table_schema_version> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features, noncopyable_function<bool(std::string_view)> accept_keyspace)
{
auto map = [&proxy, features, accept_keyspace = std::move(accept_keyspace)] (sstring table) mutable -> future<std::vector<mutation>> {
auto rs = co_await db::system_keyspace::query_mutations(proxy, NAME, table);
@@ -771,7 +771,7 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
}
}
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features)
future<table_schema_version> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features)
{
return calculate_schema_digest(proxy, features, std::not_fn(&is_system_keyspace));
}
@@ -1073,7 +1073,7 @@ future<> store_column_mapping(distributed<service::storage_proxy>& proxy, schema
const auto ts = api::new_timestamp();
for (const auto& cdef : boost::range::join(s->static_columns(), s->regular_columns())) {
mutation m(history_tbl, pk);
auto ckey = clustering_key::from_exploded(*history_tbl, {uuid_type->decompose(s->version()),
auto ckey = clustering_key::from_exploded(*history_tbl, {uuid_type->decompose(s->version().uuid()),
utf8_type->decompose(cdef.name_as_text())});
fill_column_info(*s, ckey, cdef, ts, ttl, m);
muts.emplace_back(std::move(m));
@@ -2324,7 +2324,7 @@ mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type times
auto pkey = partition_key::from_singular(*s, table->ks_name());
auto ckey = clustering_key::from_singular(*s, table->cf_name());
mutation m(scylla_tables(), pkey);
m.set_clustered_cell(ckey, "version", utils::UUID(table->version()), timestamp);
m.set_clustered_cell(ckey, "version", table->version().uuid(), timestamp);
// Since 4.0, we stopped using cdc column in scylla tables. Extensions are
// used instead. Since we stopped reading this column in commit 861c7b5, we
// can now keep it always empty.
@@ -3455,7 +3455,7 @@ table_schema_version schema_mutations::digest() const {
const db::schema_features no_features;
db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies, no_features);
db::schema_tables::feed_hash_for_schema_digest(h, _columns, no_features);
return utils::UUID_gen::get_name_UUID(h.finalize());
return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize()));
}
future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy,
@@ -3475,7 +3475,7 @@ future<column_mapping> get_column_mapping(::table_id table_id, table_schema_vers
shared_ptr<cql3::untyped_result_set> results = co_await qctx->qp().execute_internal(
GET_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE,
{table_id.uuid(), version},
{table_id.uuid(), version.uuid()},
cql3::query_processor::cache_internal::no
);
if (results->empty()) {
@@ -3516,7 +3516,7 @@ future<bool> column_mapping_exists(table_id table_id, table_schema_version versi
shared_ptr<cql3::untyped_result_set> results = co_await qctx->qp().execute_internal(
GET_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE,
{table_id.uuid(), version},
{table_id.uuid(), version.uuid()},
cql3::query_processor::cache_internal::yes
);
co_return !results->empty();
@@ -3529,7 +3529,7 @@ future<> drop_column_mapping(table_id table_id, table_schema_version version) {
co_await qctx->qp().execute_internal(
DEL_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE,
{table_id.uuid(), version},
{table_id.uuid(), version.uuid()},
cql3::query_processor::cache_internal::no);
}

View File

@@ -170,9 +170,9 @@ future<> save_system_schema(cql3::query_processor& qp, const sstring & ks);
// saves/creates "system_schema" keyspace
future<> save_system_keyspace_schema(cql3::query_processor& qp);
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features, noncopyable_function<bool(std::string_view)> accept_keyspace);
future<table_schema_version> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features, noncopyable_function<bool(std::string_view)> accept_keyspace);
// Calculates schema digest for all non-system keyspaces
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features);
future<table_schema_version> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features);
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features);
std::vector<mutation> adjust_schema_for_schema_features(std::vector<mutation> schema, schema_features features);

View File

@@ -89,7 +89,7 @@ table_schema_version system_keyspace::generate_schema_version(::table_id table_i
md5_hasher h;
feed_hash(h, table_id);
feed_hash(h, version_sequence_number + offset);
return utils::UUID_gen::get_name_UUID(h.finalize());
return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize()));
}
// Currently, the type variables (uuid_type, etc.) are thread-local reference-
@@ -1628,9 +1628,9 @@ future<std::optional<sstring>> system_keyspace::get_scylla_local_param(const sst
return get_scylla_local_param_as<sstring>(key);
}
future<> system_keyspace::update_schema_version(utils::UUID version) {
future<> system_keyspace::update_schema_version(table_schema_version version) {
sstring req = format("INSERT INTO system.{} (key, schema_version) VALUES (?, ?)", LOCAL);
return execute_cql(req, sstring(LOCAL), version).discard_result();
return execute_cql(req, sstring(LOCAL), version.uuid()).discard_result();
}
/**

View File

@@ -223,7 +223,7 @@ public:
static table_schema_version generate_schema_version(table_id table_id, uint16_t offset = 0);
future<> setup(sharded<netw::messaging_service>& ms);
future<> update_schema_version(utils::UUID version);
future<> update_schema_version(table_schema_version version);
/*
* Save tokens used by this node in the LOCAL table.

View File

@@ -14,7 +14,6 @@
#include "counters.hh"
#include "partition_builder.hh"
#include "mutation_partition_serializer.hh"
#include "utils/UUID.hh"
#include "utils/data_input.hh"
#include "query-result-set.hh"
#include "idl/mutation.dist.hh"
@@ -41,7 +40,7 @@ frozen_mutation::column_family_id() const {
return mutation_view().table_id();
}
utils::UUID
table_schema_version
frozen_mutation::schema_version() const {
return mutation_view().schema_version();
}

View File

@@ -167,7 +167,7 @@ public:
frozen_mutation& operator=(const frozen_mutation&) = default;
const bytes_ostream& representation() const { return _bytes; }
table_id column_family_id() const;
utils::UUID schema_version() const; // FIXME: Should replace column_family_id()
table_schema_version schema_version() const; // FIXME: Should replace column_family_id()
partition_key_view key() const;
dht::decorated_key decorated_key(const schema& s) const;
mutation_partition_view partition() const;

View File

@@ -126,7 +126,7 @@ public:
return versioned_value(to_sstring(load));
}
static versioned_value schema(const utils::UUID &new_version) {
static versioned_value schema(const table_schema_version& new_version) {
return versioned_value(new_version.to_sstring());
}

View File

@@ -28,7 +28,7 @@ class schema_mutations {
};
class schema stub [[writable]] {
utils::UUID version;
table_schema_version version;
schema_mutations mutations;
};

View File

@@ -124,7 +124,7 @@ class mutation_partition stub [[writable]] {
class mutation stub [[writable]] {
::table_id table_id;
utils::UUID schema_version;
table_schema_version schema_version;
partition_key key;
mutation_partition partition;
};
@@ -141,7 +141,7 @@ class column_mapping {
class canonical_mutation stub [[writable]] {
::table_id table_id;
utils::UUID schema_version;
table_schema_version schema_version;
partition_key key;
column_mapping mapping;
mutation_partition partition;

View File

@@ -48,7 +48,7 @@ struct max_result_size {
class read_command {
table_id cf_id;
utils::UUID schema_version;
table_schema_version schema_version;
query::partition_slice slice;
uint32_t row_limit_low_bits;
std::chrono::time_point<gc_clock, gc_clock::duration> timestamp;

View File

@@ -34,4 +34,4 @@ verb [[with_timeout]] truncate (sstring, sstring);
verb [[with_client_info, with_timeout]] paxos_prepare (query::read_command cmd, partition_key key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info> trace_info) -> service::paxos::prepare_response [[unique_ptr]];
verb [[with_client_info, with_timeout]] paxos_accept (service::paxos::proposal proposal [[ref]], std::optional<tracing::trace_info> trace_info) -> bool;
verb [[with_client_info, with_timeout, one_way]] paxos_learn (service::paxos::proposal decision, inet_address_vector_replica_set forward, gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info>);
verb [[with_client_info, with_timeout, one_way]] paxos_prune (utils::UUID schema_id, partition_key key [[ref]], utils::UUID ballot, std::optional<tracing::trace_info> trace_info);
verb [[with_client_info, with_timeout, one_way]] paxos_prune (table_schema_version schema_id, partition_key key [[ref]], utils::UUID ballot, std::optional<tracing::trace_info> trace_info);

View File

@@ -19,3 +19,7 @@ class UUID final {
class table_id final {
utils::UUID uuid();
};
class table_schema_version final {
utils::UUID uuid();
};

View File

@@ -823,14 +823,14 @@ rpc::sink<int32_t> messaging_service::make_sink_for_stream_mutation_fragments(rp
}
future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>>
messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) {
messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, utils::UUID plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) {
using value_type = std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>;
if (is_shutting_down()) {
return make_exception_future<value_type>(rpc::closed_error());
}
auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id);
return rpc_client->make_stream_sink<netw::serializer, frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> sink) mutable {
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (utils::UUID, utils::UUID, table_id, uint64_t, streaming::stream_reason, rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (utils::UUID, table_schema_version, table_id, uint64_t, streaming::stream_reason, rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, sink).then_wrapped([sink, rpc_client] (future<rpc::source<int32_t>> source) mutable {
return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable {
return make_ready_future<value_type>(value_type(std::move(sink), source.get0()));
@@ -839,7 +839,7 @@ messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUI
});
}
void messaging_service::register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func) {
void messaging_service::register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, UUID plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION_FRAGMENTS, std::move(func));
}
@@ -1090,14 +1090,14 @@ future<frozen_schema> messaging_service::send_get_schema_version(msg_addr dst, t
return send_message<frozen_schema>(this, messaging_verb::GET_SCHEMA_VERSION, dst, static_cast<unsigned>(dst.cpu_id), v);
}
void messaging_service::register_schema_check(std::function<future<utils::UUID>()>&& func) {
void messaging_service::register_schema_check(std::function<future<table_schema_version>()>&& func) {
register_handler(this, netw::messaging_verb::SCHEMA_CHECK, std::move(func));
}
future<> messaging_service::unregister_schema_check() {
return unregister_handler(netw::messaging_verb::SCHEMA_CHECK);
}
future<utils::UUID> messaging_service::send_schema_check(msg_addr dst) {
return send_message<utils::UUID>(this, netw::messaging_verb::SCHEMA_CHECK, dst);
future<table_schema_version> messaging_service::send_schema_check(msg_addr dst) {
return send_message<table_schema_version>(this, netw::messaging_verb::SCHEMA_CHECK, dst);
}
// Wrapper for REPLICATION_FINISHED

View File

@@ -95,8 +95,6 @@ class node_ops_cmd_response;
class node_ops_cmd_request;
enum class row_level_diff_detect_algorithm : uint8_t;
using table_schema_version = utils::UUID;
namespace streaming {
enum class stream_reason : uint8_t;
@@ -350,10 +348,10 @@ public:
// Wrapper for STREAM_MUTATION_FRAGMENTS
// The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, other status code value are reserved for future use.
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func);
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, UUID plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func);
future<> unregister_stream_mutation_fragments();
rpc::sink<int32_t> make_sink_for_stream_mutation_fragments(rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>>& source);
future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id);
future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>> make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, utils::UUID plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id);
// Wrapper for REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM
future<std::tuple<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>>> make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id);
@@ -486,9 +484,9 @@ public:
future<frozen_schema> send_get_schema_version(msg_addr, table_schema_version);
// Wrapper for SCHEMA_CHECK
void register_schema_check(std::function<future<utils::UUID>()>&& func);
void register_schema_check(std::function<future<table_schema_version>()>&& func);
future<> unregister_schema_check();
future<utils::UUID> send_schema_check(msg_addr);
future<table_schema_version> send_schema_check(msg_addr);
// Wrapper for REPLICATION_FINISHED verb
void register_replication_finished(std::function<future<> (inet_address from)>&& func);

View File

@@ -120,7 +120,7 @@ bool string_pair_eq::operator()(spair lhs, spair rhs) const {
return lhs == rhs;
}
utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});
table_schema_version database::empty_version = table_schema_version(utils::UUID_gen::get_name_UUID(bytes{}));
namespace {
@@ -751,14 +751,14 @@ database::~database() {
_user_types->deactivate();
}
void database::update_version(const utils::UUID& version) {
void database::update_version(const table_schema_version& version) {
if (_version.get() != version) {
_schema_change_count++;
}
_version.set(version);
}
const utils::UUID& database::get_version() const {
const table_schema_version& database::get_version() const {
return _version.get();
}

View File

@@ -14,7 +14,6 @@
#include <seastar/core/sstring.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/execution_stage.hh>
#include "utils/UUID.hh"
#include "utils/hash.hh"
#include "db_clock.hh"
#include "gc_clock.hh"
@@ -1329,7 +1328,7 @@ private:
ks_cf_to_uuid_t _ks_cf_to_uuid;
std::unique_ptr<db::commitlog> _commitlog;
std::unique_ptr<db::commitlog> _schema_commitlog;
utils::updateable_value_source<utils::UUID> _version;
utils::updateable_value_source<table_schema_version> _version;
uint32_t _schema_change_count = 0;
// compaction_manager object is referenced by all column families of a database.
compaction_manager& _compaction_manager;
@@ -1408,7 +1407,7 @@ private:
future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>&, locator::effective_replication_map_factory& erm_factory, bool is_bootstrap, system_keyspace system);
void remove(const table&) noexcept;
public:
static utils::UUID empty_version;
static table_schema_version empty_version;
query::result_memory_limiter& get_result_memory_limiter() {
return _result_memory_limiter;
@@ -1444,10 +1443,10 @@ public:
cache_tracker& row_cache_tracker() { return _row_cache_tracker; }
future<> drop_caches() const;
void update_version(const utils::UUID& version);
void update_version(const table_schema_version& version);
const utils::UUID& get_version() const;
utils::observable<utils::UUID>& observable_schema_version() const { return _version.as_observable(); }
const table_schema_version& get_version() const;
utils::observable<table_schema_version>& observable_schema_version() const { return _version.as_observable(); }
db::commitlog* commitlog() const {
return _commitlog.get();

View File

@@ -431,7 +431,7 @@ schema::schema(const schema& o)
schema::schema(reversed_tag, const schema& o)
: schema(o, [] (schema& s) {
s._raw._version = utils::UUID_gen::negate(s._raw._version);
s._raw._version = reversed(s._raw._version);
for (auto& col : s._raw._columns) {
if (col.kind == column_kind::clustering_key) {
col.type = reversed(col.type);
@@ -1232,7 +1232,7 @@ schema_ptr schema_builder::build() {
if (_version) {
new_raw._version = *_version;
} else {
new_raw._version = utils::UUID_gen::get_time_UUID();
new_raw._version = table_schema_version(utils::UUID_gen::get_time_UUID());
}
if (new_raw._is_counter) {
@@ -1670,7 +1670,7 @@ schema_ptr schema::make_reversed() const {
}
schema_ptr schema::get_reversed() const {
return local_schema_registry().get_or_load(utils::UUID_gen::negate(_raw._version), [this] (table_schema_version) {
return local_schema_registry().get_or_load(reversed(_raw._version), [this] (table_schema_version) {
return frozen_schema(make_reversed());
});
}

View File

@@ -22,7 +22,6 @@
#include "types.hh"
#include "compound.hh"
#include "gc_clock.hh"
#include "utils/UUID.hh"
#include "compress.hh"
#include "compaction/compaction_strategy_type.hh"
#include "caching_options.hh"
@@ -109,20 +108,6 @@ private:
bitset _mask;
};
// Cluster-wide identifier of schema version of particular table.
//
// The version changes the value not only on structural changes but also
// temporal. For example, schemas with the same set of columns but created at
// different times should have different versions. This allows nodes to detect
// if the version they see was already synchronized with or not even if it has
// the same structure as the past versions.
//
// Schema changes merged in any order should result in the same final version.
//
// When table_schema_version changes, schema_tables::calculate_schema_digest() should
// also change when schema mutations are applied.
using table_schema_version = utils::UUID;
class schema_registry_entry;
class schema_builder;

View File

@@ -11,6 +11,7 @@
#include <seastar/core/shared_ptr.hh>
#include "utils/UUID.hh"
#include "utils/UUID_gen.hh"
using column_count_type = uint32_t;
@@ -23,3 +24,21 @@ class schema_extension;
using schema_ptr = seastar::lw_shared_ptr<const schema>;
using table_id = utils::tagged_uuid<struct table_id_tag>;
// Cluster-wide identifier of schema version of particular table.
//
// The version changes the value not only on structural changes but also
// temporal. For example, schemas with the same set of columns but created at
// different times should have different versions. This allows nodes to detect
// if the version they see was already synchronized with or not even if it has
// the same structure as the past versions.
//
// Schema changes merged in any order should result in the same final version.
//
// When table_schema_version changes, schema_tables::calculate_schema_digest() should
// also change when schema mutations are applied.
using table_schema_version = utils::tagged_uuid<struct table_schema_version_tag>;
inline table_schema_version reversed(table_schema_version v) noexcept {
return table_schema_version(utils::UUID_gen::negate(v.uuid()));
}

View File

@@ -56,7 +56,7 @@ table_schema_version schema_mutations::digest() const {
auto&& row = rs.row(0);
auto val = row.get<utils::UUID>("version");
if (val) {
return *val;
return table_schema_version(*val);
}
}
}
@@ -90,7 +90,7 @@ table_schema_version schema_mutations::digest() const {
if (_scylla_tables) {
db::schema_tables::feed_hash_for_schema_digest(h, *_scylla_tables, sf);
}
return utils::UUID_gen::get_name_UUID(h.finalize());
return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize()));
}
std::optional<sstring> schema_mutations::partitioner() const {

View File

@@ -142,7 +142,7 @@ void migration_manager::init_messaging_service()
co_return rpc::tuple(std::move(fm), std::move(cm));
}, std::ref(*this)));
_messaging.register_schema_check([this] {
return make_ready_future<utils::UUID>(_storage_proxy.get_db().local().get_version());
return make_ready_future<table_schema_version>(_storage_proxy.get_db().local().get_version());
});
_messaging.register_get_schema_version([this] (unsigned shard, table_schema_version v) {
// FIXME: should this get an smp_service_group? Probably one separate from reads and writes.
@@ -179,7 +179,7 @@ void migration_manager::schedule_schema_pull(const gms::inet_address& endpoint,
if (endpoint != utils::fb_utilities::get_broadcast_address() && value) {
// FIXME: discarded future
(void)maybe_schedule_schema_pull(utils::UUID{value->value}, endpoint).handle_exception([endpoint] (auto ep) {
(void)maybe_schedule_schema_pull(table_schema_version(utils::UUID{value->value}), endpoint).handle_exception([endpoint] (auto ep) {
mlogger.warn("Fail to pull schema from {}: {}", endpoint, ep);
});
}
@@ -205,7 +205,7 @@ bool migration_manager::have_schema_agreement() {
mlogger.debug("Schema state not yet available for {}.", endpoint);
return false;
}
utils::UUID remote_version{schema->value};
auto remote_version = table_schema_version(utils::UUID{schema->value});
if (our_version != remote_version) {
mlogger.debug("Schema mismatch for {} ({} != {}).", endpoint, our_version, remote_version);
return false;
@@ -220,7 +220,7 @@ bool migration_manager::have_schema_agreement() {
* If versions differ this node sends request with local migration list to the endpoint
* and expecting to receive a list of migrations to apply locally.
*/
future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_version, const gms::inet_address& endpoint)
future<> migration_manager::maybe_schedule_schema_pull(const table_schema_version& their_version, const gms::inet_address& endpoint)
{
auto& proxy = _storage_proxy;
auto& db = proxy.get_db().local();
@@ -251,7 +251,7 @@ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_
mlogger.debug("application_state::SCHEMA does not exist for {}, not submitting migration task", endpoint);
return make_ready_future<>();
}
utils::UUID current_version{value->value};
auto current_version = table_schema_version(utils::UUID{value->value});
if (db.get_version() == current_version) {
mlogger.debug("not submitting migration task for {} because our versions match", endpoint);
return make_ready_future<>();
@@ -934,7 +934,7 @@ future<group0_guard> migration_manager::start_group0_operation() {
*
* @param version The schema version to announce
*/
void migration_manager::passive_announce(utils::UUID version) {
void migration_manager::passive_announce(table_schema_version version) {
_schema_version_to_publish = version;
(void)_schema_push.trigger().handle_exception([version = std::move(version)] (std::exception_ptr ex) {
mlogger.warn("Passive announcing of version {} failed: {}. Ignored.", version);
@@ -1102,10 +1102,10 @@ future<schema_ptr> migration_manager::get_schema_for_write(table_schema_version
}
future<> migration_manager::sync_schema(const replica::database& db, const std::vector<gms::inet_address>& nodes) {
using schema_and_hosts = std::unordered_map<utils::UUID, std::vector<gms::inet_address>>;
return do_with(schema_and_hosts(), db.get_version(), [this, &nodes] (schema_and_hosts& schema_map, utils::UUID& my_version) {
using schema_and_hosts = std::unordered_map<table_schema_version, std::vector<gms::inet_address>>;
return do_with(schema_and_hosts(), db.get_version(), [this, &nodes] (schema_and_hosts& schema_map, table_schema_version& my_version) {
return parallel_for_each(nodes, [this, &schema_map, &my_version] (const gms::inet_address& node) {
return _messaging.send_schema_check(netw::msg_addr(node)).then([node, &schema_map, &my_version] (utils::UUID remote_version) {
return _messaging.send_schema_check(netw::msg_addr(node)).then([node, &schema_map, &my_version] (table_schema_version remote_version) {
if (my_version != remote_version) {
schema_map[remote_version].emplace_back(node);
}

View File

@@ -20,7 +20,7 @@
#include "gms/feature.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "message/msg_addr.hh"
#include "utils/UUID.hh"
#include "schema_fwd.hh"
#include "utils/serialized_action.hh"
#include "service/raft/raft_group_registry.hh"
#include "service/raft/raft_group0_client.hh"
@@ -71,7 +71,7 @@ private:
service::raft_group0_client& _group0_client;
sharded<db::system_keyspace>& _sys_ks;
serialized_action _schema_push;
utils::UUID _schema_version_to_publish;
table_schema_version _schema_version_to_publish;
friend class group0_state_machine; // needed for access to _messaging
size_t _concurrent_ddl_retries;
@@ -163,7 +163,7 @@ public:
// The future resolves after the change is applied locally.
future<> announce(std::vector<mutation> schema, group0_guard, std::string_view description = "");
void passive_announce(utils::UUID version);
void passive_announce(table_schema_version version);
future<> drain();
future<> stop();
@@ -190,7 +190,7 @@ private:
void schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state);
future<> maybe_schedule_schema_pull(const utils::UUID& their_version, const gms::inet_address& endpoint);
future<> maybe_schedule_schema_pull(const table_schema_version& their_version, const gms::inet_address& endpoint);
future<> announce_with_raft(std::vector<mutation> schema, group0_guard, std::string_view description);
future<> announce_without_raft(std::vector<mutation> schema);

View File

@@ -318,7 +318,7 @@ public:
future<> send_paxos_prune(
netw::msg_addr addr, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state,
utils::UUID schema_id, const partition_key& key, utils::UUID ballot) {
table_schema_version schema_id, const partition_key& key, utils::UUID ballot) {
return ser::storage_proxy_rpc_verbs::send_paxos_prune(&_ms, addr, timeout, schema_id, key, ballot, tracing::make_trace_info(tr_state));
}
@@ -393,7 +393,7 @@ private:
future<rpc::no_wait_type> handle_write(
netw::messaging_service::msg_addr src_addr, rpc::opt_time_point t,
utils::UUID schema_version, auto in, inet_address_vector_replica_set forward, gms::inet_address reply_to,
auto schema_version, auto in, inet_address_vector_replica_set forward, gms::inet_address reply_to,
unsigned shard, storage_proxy::response_id_type response_id, std::optional<tracing::trace_info> trace_info,
auto&& apply_fn, auto&& forward_fn) {
tracing::trace_state_ptr trace_state_ptr;
@@ -499,7 +499,7 @@ private:
auto src_addr = netw::messaging_service::get_source(cinfo);
auto rate_limit_info = rate_limit_info_opt.value_or(std::monostate());
utils::UUID schema_version = in.schema_version();
auto schema_version = in.schema_version();
return handle_write(src_addr, t, schema_version, std::move(in), std::move(forward), reply_to, shard, response_id,
trace_info ? *trace_info : std::nullopt,
/* apply_fn */ [smp_grp, rate_limit_info] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s, const frozen_mutation& m,
@@ -520,7 +520,7 @@ private:
tracing::trace_state_ptr trace_state_ptr;
auto src_addr = netw::messaging_service::get_source(cinfo);
utils::UUID schema_version = decision.update.schema_version();
auto schema_version = decision.update.schema_version();
return handle_write(src_addr, t, schema_version, std::move(decision), std::move(forward), reply_to, shard,
response_id, trace_info,
/* apply_fn */ [] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s,
@@ -758,7 +758,7 @@ private:
future<rpc::no_wait_type> handle_paxos_prune(
const rpc::client_info& cinfo, rpc::opt_time_point timeout,
utils::UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
table_schema_version schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
static thread_local uint16_t pruning = 0;
static constexpr uint16_t pruning_limit = 1000; // since PRUNE verb is one way replica side has its own queue limit
auto src_addr = netw::messaging_service::get_source(cinfo);

View File

@@ -428,7 +428,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
auto advertise = gms::advertise_myself(!replacing_a_node_with_same_ip);
co_await _gossiper.start_gossiping(generation_number, app_states, advertise);
auto schema_change_announce = _db.local().observable_schema_version().observe([this] (utils::UUID schema_version) mutable {
auto schema_change_announce = _db.local().observable_schema_version().observe([this] (table_schema_version schema_version) mutable {
_migration_manager.local().passive_announce(std::move(schema_version));
});
_listeners.emplace_back(make_lw_shared(std::move(schema_change_announce)));
@@ -1747,9 +1747,9 @@ future<std::unordered_map<sstring, std::vector<sstring>>> storage_service::descr
return std::move(f0).then_wrapped([host] (auto f) {
if (f.failed()) {
f.ignore_ready_future();
return std::pair<gms::inet_address, std::optional<utils::UUID>>(host, std::nullopt);
return std::pair<gms::inet_address, std::optional<table_schema_version>>(host, std::nullopt);
}
return std::pair<gms::inet_address, std::optional<utils::UUID>>(host, f.get0());
return std::pair<gms::inet_address, std::optional<table_schema_version>>(host, f.get0());
});
}, std::move(results), [] (auto results, auto host_and_version) {
auto version = host_and_version.second ? host_and_version.second->to_sstring() : UNREACHABLE;

View File

@@ -13,7 +13,6 @@
#include "schema_fwd.hh"
#include "sstables/types.hh"
#include "utils/UUID.hh"
#include "db/marshal/type_parser.hh"
namespace sstables {
@@ -61,7 +60,7 @@ private:
const sstable_enabled_features& features,
bool is_static);
utils::UUID schema_uuid;
table_schema_version schema_uuid;
std::vector<column_info> regular_schema_columns_from_sstable;
std::vector<column_info> static_schema_columns_from_sstable;
column_values_fixed_lengths clustering_column_value_fix_lengths;

View File

@@ -74,7 +74,7 @@ void stream_manager::init_messaging_service_handler() {
return make_ready_future<>();
});
});
ms.register_stream_mutation_fragments([this] (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>> source) {
ms.register_stream_mutation_fragments([this] (const rpc::client_info& cinfo, UUID plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>> source) {
auto from = netw::messaging_service::get_source(cinfo);
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
sslog.trace("Got stream_mutation_fragments from {} reason {}", from, int(reason));

View File

@@ -680,7 +680,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
if (regenerate) {
std::cout << format(" utils::UUID(\"{}\"),", actual) << "\n";
} else {
BOOST_REQUIRE_EQUAL(actual, expected);
BOOST_REQUIRE_EQUAL(actual.uuid(), expected);
}
};
@@ -689,7 +689,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
if (regenerate) {
std::cout << format(" utils::UUID(\"{}\"),", actual) << "\n";
} else {
BOOST_REQUIRE_EQUAL(actual, expected);
BOOST_REQUIRE_EQUAL(actual.uuid(), expected);
}
};
@@ -893,7 +893,7 @@ SEASTAR_TEST_CASE(test_schema_make_reversed) {
testlog.info(" reversed_schema->version(): {}", reversed_schema->version());
BOOST_REQUIRE(schema->version() != reversed_schema->version());
BOOST_REQUIRE(utils::UUID_gen::negate(schema->version()) == reversed_schema->version());
BOOST_REQUIRE(reversed(schema->version()) == reversed_schema->version());
auto re_reversed_schema = reversed_schema->make_reversed();
testlog.info("re_reversed_schema->version(): {}", re_reversed_schema->version());