lwt: store column_mappings for each table schema version upon a DDL change

This patch introduces a new system table: `system.scylla_table_schema_history`,
which is used to keep track of column mappings for obsolete table
schema versions (i.e. schema becomes obsolete when it's being changed
by means of `CREATE TABLE` or `ALTER TABLE` DDL operations).

It is populated automatically when a new schema version is being
pulled from a remote in get_schema_definition() at migration_manager.cc
and also when schema change is being propagated to system schema tables
in do_merge_schema() at schema_tables.cc.

The data referring to the most recent table schema version is always
present. Other entries are garbage-collected when the corresponding
table schema version is obsoleted (they will be updated with a TTL equal
to `DEFAULT_GC_GRACE_SECONDS` on `ALTER TABLE`).

In case we failed to persist column mapping after a schema change,
missing entries will be recreated on node boot.

Later, the information from this table is used in `paxos_state::learn`
callback in case we have a mismatch between the most recent schema
version and the one that is stored inside the `frozen_mutation`
for the accepted proposal.

Such a situation may arise under the following circumstances:
 1. The previous LWT operation crashed on the "accept" stage,
    leaving behind a stale accepted proposal, which waits to be
    repaired.
 2. The table affected by LWT operation is being altered, so that
    schema version is now different. Stored proposal now references
    obsolete schema.
 3. LWT query is retried, so that Scylla tries to repair the
    unfinished Paxos round and apply the mutation in the learn stage.

When such a mismatch happened prior to this patch, the stored
`frozen_mutation` could be applied only if we were lucky enough
that the column_mapping in the mutation was "compatible" with the new
table schema.

It wouldn't work if, for example, the columns are reordered, or
some columns, which are referenced by an LWT query, are dropped.

With this patch we try to look up the column mapping for
the obsolete schema version, then upgrade the stored mutation
using the obtained column mapping and apply the upgraded mutation instead.

In case we don't find a column_mapping we just return an error
from the learn stage.

Tests: unit(dev, debug), dtests(paxos_tests.py:TestPaxos.schema_mismatch_*_test)

Signed-off-by: Pavel Solodovnikov <pa.solodovnikov@scylladb.com>
This commit is contained in:
Pavel Solodovnikov
2020-09-15 05:39:56 +03:00
parent e02301890b
commit 055fd3d8ad
7 changed files with 244 additions and 26 deletions

View File

@@ -629,9 +629,18 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
});
}).then([&proxy, this] {
return do_parse_schema_tables(proxy, db::schema_tables::TABLES, [this, &proxy] (schema_result_value_type &v) {
return create_tables_from_tables_partition(proxy, v.second).then([this] (std::map<sstring, schema_ptr> tables) {
return parallel_for_each(tables.begin(), tables.end(), [this] (auto& t) {
return this->add_column_family_and_make_directory(t.second);
return create_tables_from_tables_partition(proxy, v.second).then([this, &proxy] (std::map<sstring, schema_ptr> tables) {
return parallel_for_each(tables.begin(), tables.end(), [this, &proxy] (auto& t) {
return this->add_column_family_and_make_directory(t.second).then([&proxy, s = t.second] {
// Recreate missing column mapping entries in case
// we failed to persist them for some reason after a schema change
return db::schema_tables::column_mapping_exists(s->id(), s->version()).then([&proxy, s] (bool cm_exists) {
if (cm_exists) {
return make_ready_future<>();
}
return db::schema_tables::store_column_mapping(proxy, s, false);
});
});
});
});
});

View File

@@ -90,6 +90,14 @@
#include "index/target_parser.hh"
#include "lua.hh"
#include "db/query_context.hh"
#include "serializer.hh"
#include "idl/mutation.dist.hh"
#include "serializer_impl.hh"
#include "idl/mutation.dist.impl.hh"
#include "db/system_keyspace.hh"
#include "cql3/untyped_result_set.hh"
using namespace db::system_keyspace;
using namespace std::chrono_literals;
@@ -614,6 +622,38 @@ schema_ptr aggregates() {
return schema;
}
// Returns the schema of the `scylla_table_schema_history` table, which is created
// in the system keyspace (db::system_keyspace::NAME).
//
// The schema object is built lazily by the lambda below and cached in a
// function-local `static thread_local`, so every call on the same thread
// returns the same schema_ptr.
//
// Layout, as declared below:
//   partition key:   cf_id (uuid)
//   clustering key:  schema_version (uuid), column_name (utf8)
//   regular columns: clustering_order, column_name_bytes, kind, position, type
schema_ptr scylla_table_schema_history() {
static thread_local auto s = [] {
schema_builder builder(make_lw_shared(schema(
// table id, keyspace name, table name
generate_legacy_id(db::system_keyspace::NAME, SCYLLA_TABLE_SCHEMA_HISTORY), db::system_keyspace::NAME, SCYLLA_TABLE_SCHEMA_HISTORY,
// partition key
{{"cf_id", uuid_type}},
// clustering key
{{"schema_version", uuid_type}, {"column_name", utf8_type}},
// regular columns
// mirrors the structure of the "columns" table which is essentially
// needed to represent a column mapping in serialized form
{
{"clustering_order", utf8_type},
{"column_name_bytes", bytes_type},
{"kind", utf8_type},
{"position", int32_type},
{"type", utf8_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"Scylla specific table to store a history of column mappings "
"for each table schema version upon an CREATE TABLE/ALTER TABLE operations"
)));
// The schema version is derived from the table's own uuid.
builder.with_version(generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::no);
}();
return s;
}
}
#if 0
@@ -906,6 +946,7 @@ static void fill_column_info(const schema& table,
const clustering_key& ckey,
const column_definition& column,
api::timestamp_type timestamp,
ttl_opt ttl,
mutation& m) {
auto order = "NONE";
if (column.is_clustering_key()) {
@@ -923,11 +964,38 @@ static void fill_column_info(const schema& table,
pos = table.position(column);
}
m.set_clustered_cell(ckey, "column_name_bytes", data_value(column.name()), timestamp);
m.set_clustered_cell(ckey, "kind", serialize_kind(column.kind), timestamp);
m.set_clustered_cell(ckey, "position", pos, timestamp);
m.set_clustered_cell(ckey, "clustering_order", sstring(order), timestamp);
m.set_clustered_cell(ckey, "type", type->as_cql3_type().to_string(), timestamp);
m.set_clustered_cell(ckey, "column_name_bytes", data_value(column.name()), timestamp, ttl);
m.set_clustered_cell(ckey, "kind", serialize_kind(column.kind), timestamp, ttl);
m.set_clustered_cell(ckey, "position", pos, timestamp, ttl);
m.set_clustered_cell(ckey, "clustering_order", sstring(order), timestamp, ttl);
m.set_clustered_cell(ckey, "type", type->as_cql3_type().to_string(), timestamp, ttl);
}
// Persists the column mapping of schema `s` into the schema history table.
//
// One mutation is produced per static and per regular column of `s`. Each is keyed by
// the table id (partition key) and by the schema version plus the column name
// (clustering key). The mutations are applied locally through the storage proxy.
//
// When `with_ttl` is true every cell is written with a TTL of DEFAULT_GC_GRACE_SECONDS;
// otherwise the cells carry no TTL.
//
// Schemas whose keyspace name starts with the system keyspace name are ignored and
// a ready future is returned immediately.
future<> store_column_mapping(distributed<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl) {
    // Skip "system*" tables -- only user-related tables are relevant
    const auto keyspace_name = static_cast<std::string_view>(s->ks_name());
    if (keyspace_name.starts_with(db::system_keyspace::NAME)) {
        return make_ready_future<>();
    }

    schema_ptr history_schema = scylla_table_schema_history();
    partition_key table_pk = partition_key::from_exploded(*history_schema, {uuid_type->decompose(s->id())});

    // TTL is requested by the caller; without it the cells never expire on their own.
    ttl_opt cell_ttl = with_ttl ? ttl_opt(gc_clock::duration(DEFAULT_GC_GRACE_SECONDS)) : ttl_opt();

    // Use one timestamp for all mutations for the ease of debugging
    const auto write_ts = api::new_timestamp();

    std::vector<mutation> column_rows;
    const auto append_row = [&] (const column_definition& col) {
        mutation row(history_schema, table_pk);
        auto row_key = clustering_key::from_exploded(*history_schema, {
                uuid_type->decompose(s->version()),
                utf8_type->decompose(col.name_as_text())});
        fill_column_info(*s, row_key, col, write_ts, cell_ttl, row);
        column_rows.emplace_back(std::move(row));
    };

    // Static columns first, regular columns afterwards.
    for (const auto& col : s->static_columns()) {
        append_row(col);
    }
    for (const auto& col : s->regular_columns()) {
        append_row(col);
    }

    return proxy.local().mutate_locally(std::move(column_rows), tracing::trace_state_ptr());
}
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
@@ -1057,8 +1125,13 @@ struct schema_diff {
}};
};
// Describes one altered table or view: its schema before the change
// paired with its schema after the change.
struct altered_schema {
// Schema as it was before the alteration.
global_schema_ptr old_schema;
// Schema resulting from the alteration.
global_schema_ptr new_schema;
};
std::vector<global_schema_ptr> created;
std::vector<global_schema_ptr> altered;
std::vector<altered_schema> altered;
std::vector<dropped_schema> dropped;
size_t size() const {
@@ -1084,9 +1157,10 @@ static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy
d.created.emplace_back(s);
}
for (auto&& key : diff.entries_differing) {
auto s_before = create_schema(std::move(before.at(key)));
auto s = create_schema(std::move(after.at(key)));
slogger.info("Altering {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
d.altered.emplace_back(s);
d.altered.emplace_back(s_before, s);
}
return d;
}
@@ -1136,8 +1210,8 @@ static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
}
std::vector<bool> columns_changed;
columns_changed.reserve(tables_diff.altered.size() + views_diff.altered.size());
for (auto&& gs : boost::range::join(tables_diff.altered, views_diff.altered)) {
columns_changed.push_back(db.update_column_family(gs));
for (auto&& altered : boost::range::join(tables_diff.altered, views_diff.altered)) {
columns_changed.push_back(db.update_column_family(altered.new_schema));
}
auto it = columns_changed.begin();
@@ -1152,10 +1226,35 @@ static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
notify(tables_diff.created, [&] (auto&& gs) { return db.get_notifier().create_column_family(gs); });
notify(views_diff.created, [&] (auto&& gs) { return db.get_notifier().create_view(view_ptr(gs)); });
// Table altering is notified first, in case new base columns appear
notify(tables_diff.altered, [&] (auto&& gs) { return db.get_notifier().update_column_family(gs, *it++); });
notify(views_diff.altered, [&] (auto&& gs) { return db.get_notifier().update_view(view_ptr(gs), *it++); });
notify(tables_diff.altered, [&] (auto&& altered) { return db.get_notifier().update_column_family(altered.new_schema, *it++); });
notify(views_diff.altered, [&] (auto&& altered) { return db.get_notifier().update_view(view_ptr(altered.new_schema), *it++); });
});
}).get();
// Insert column_mapping into history table for altered and created tables.
//
// Entries for new tables are inserted without TTL, which means that the most
// recent schema version should always be available.
//
// For altered tables we both insert a new column mapping without TTL and
// overwrite the previous version entries with TTL to expire them eventually.
//
// Drop column mapping entries for dropped tables since these will not be TTLed automatically
// and will stay there forever if we don't clean them up manually
when_all_succeed(
parallel_for_each(tables_diff.created, [&proxy] (global_schema_ptr& gs) {
return store_column_mapping(proxy, gs.get(), false);
}),
parallel_for_each(tables_diff.altered, [&proxy] (schema_diff::altered_schema& altered) {
return when_all_succeed(
store_column_mapping(proxy, altered.old_schema.get(), true),
store_column_mapping(proxy, altered.new_schema.get(), false)).discard_result();
}),
parallel_for_each(tables_diff.dropped, [&proxy] (schema_diff::dropped_schema& dropped) {
schema_ptr s = dropped.schema.get();
return drop_column_mapping(s->id(), s->version());
})
).get();
}
static std::vector<const query::result_set_row*> collect_rows(const std::set<sstring>& keys, const schema_result& result) {
@@ -2396,7 +2495,7 @@ static void add_column_to_schema_mutation(schema_ptr table,
{
auto ckey = clustering_key::from_exploded(*m.schema(), {utf8_type->decompose(table->cf_name()),
utf8_type->decompose(column.name_as_text())});
fill_column_info(*table, ckey, column, timestamp, m);
fill_column_info(*table, ckey, column, timestamp, std::nullopt, m);
}
static void add_computed_column_to_schema_mutation(schema_ptr table,
@@ -2963,5 +3062,73 @@ future<schema_mutations> read_table_mutations(distributed<service::storage_proxy
} // namespace legacy
// CQL text used by get_column_mapping() and column_mapping_exists(): selects all stored
// column rows for a single (cf_id, schema_version) pair from the schema history table.
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
// Loads the column mapping stored for (`table_id`, `version`) from the schema history table.
//
// The returned future resolves to a `column_mapping` that lists the stored static columns
// first, followed by the stored regular columns; rows of any other kind are ignored.
// If no rows are stored for the requested version the future fails with
// `std::runtime_error`.
future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_version version) {
    return cql3::get_local_query_processor().execute_internal(
        GET_COLUMN_MAPPING_QUERY,
        db::consistency_level::LOCAL_ONE,
        infinite_timeout_config,
        {table_id, version}
    ).then([version] (shared_ptr<cql3::untyped_result_set> rows) {
        if (rows->empty()) {
            // If we don't have a stored column_mapping for an obsolete schema version
            // then it means it's way too old and been cleaned up already.
            // Fail the whole learn stage in this case.
            return make_exception_future<column_mapping>(std::runtime_error(
                    format("Failed to look up column mapping for schema version {}",
                        version)));
        }

        std::vector<column_definition> statics;
        std::vector<column_definition> regulars;
        for (const auto& row : *rows) {
            const auto col_kind = deserialize_kind(row.get_as<sstring>("kind"));
            auto col_type = cql_type_parser::parse("" /*unused*/, row.get_as<sstring>("type"));
            auto raw_name = row.get_blob("column_name_bytes");
            const column_id col_pos = row.get_as<int32_t>("position");

            // The stored ordering is compared case-insensitively against "DESC".
            auto ordering = row.get_as<sstring>("clustering_order");
            std::transform(ordering.begin(), ordering.end(), ordering.begin(), ::toupper);
            if (ordering == "DESC") {
                col_type = reversed_type_impl::get_instance(col_type);
            }

            if (col_kind == column_kind::static_column) {
                statics.emplace_back(raw_name, col_type, col_kind, col_pos);
            } else if (col_kind == column_kind::regular_column) {
                regulars.emplace_back(raw_name, col_type, col_kind, col_pos);
            }
        }

        // Flatten into mapping entries: statics first, regulars afterwards.
        std::vector<column_mapping_entry> entries;
        const auto append_entry = [&entries] (const column_definition& def) {
            entries.emplace_back(column_mapping_entry{def.name(), def.type});
        };
        for (const column_definition& def : statics) {
            append_entry(def);
        }
        for (const column_definition& def : regulars) {
            append_entry(def);
        }

        return make_ready_future<column_mapping>(column_mapping(std::move(entries), statics.size()));
    });
}
// Resolves to true when at least one column mapping row is stored for
// (`table_id`, `version`) in the schema history table, and to false otherwise.
future<bool> column_mapping_exists(utils::UUID table_id, table_schema_version version) {
    auto rows_fut = cql3::get_local_query_processor().execute_internal(
        GET_COLUMN_MAPPING_QUERY,
        db::consistency_level::LOCAL_ONE,
        infinite_timeout_config,
        {table_id, version}
    );
    return rows_fut.then([] (shared_ptr<cql3::untyped_result_set> rows) {
        const bool found = !rows->empty();
        return found;
    });
}
// Deletes the rows stored for (`table_id`, `version`) from the schema history table.
// The result of the DELETE statement is discarded.
future<> drop_column_mapping(utils::UUID table_id, table_schema_version version) {
    // Built once and reused by every subsequent call.
    static const sstring delete_query = format(
        "DELETE FROM system.{} WHERE cf_id = ? and schema_version = ?",
        db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);

    auto&& query_processor = cql3::get_local_query_processor();
    return query_processor.execute_internal(
        delete_query,
        db::consistency_level::LOCAL_ONE,
        infinite_timeout_config,
        {table_id, version}
    ).discard_result();
}
} // namespace schema_tables
} // namespace schema
} // namespace schema

View File

@@ -109,6 +109,7 @@ static constexpr auto AGGREGATES = "aggregates";
static constexpr auto INDEXES = "indexes";
static constexpr auto VIEW_VIRTUAL_COLUMNS = "view_virtual_columns"; // Scylla specific
static constexpr auto COMPUTED_COLUMNS = "computed_columns"; // Scylla specific
static constexpr auto SCYLLA_TABLE_SCHEMA_HISTORY = "scylla_table_schema_history"; // Scylla specific;
schema_ptr columns();
schema_ptr view_virtual_columns();
@@ -118,6 +119,8 @@ schema_ptr tables();
schema_ptr scylla_tables(schema_features features = schema_features::full());
schema_ptr views();
schema_ptr computed_columns();
// Belongs to the "system" keyspace
schema_ptr scylla_table_schema_history();
}
@@ -247,5 +250,17 @@ std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const ss
return std::nullopt;
}
/// Stores the column mapping for the table being created or altered in the system table
/// which holds a history of schema versions alongside with their column mappings.
/// Can be used to insert entries with TTL (equal to DEFAULT_GC_GRACE_SECONDS) in case we are
/// overwriting an existing column mapping to garbage collect obsolete entries.
future<> store_column_mapping(distributed<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl);
/// Query column mapping for a given version of the table locally.
future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_version version);
/// Check that column mapping exists for a given version of the table
future<bool> column_mapping_exists(utils::UUID table_id, table_schema_version version);
/// Delete matching column mapping entries from the `system.scylla_table_schema_history` table
future<> drop_column_mapping(utils::UUID table_id, table_schema_version version);
} // namespace schema_tables
} // namespace db

View File

@@ -1711,7 +1711,8 @@ std::vector<schema_ptr> all_tables() {
peers(), peer_events(), range_xfers(),
compactions_in_progress(), compaction_history(),
sstable_activity(), clients(), size_estimates(), large_partitions(), large_rows(), large_cells(),
scylla_local(), v3::views_builds_in_progress(), v3::built_views(),
scylla_local(), db::schema_tables::scylla_table_schema_history(),
v3::views_builds_in_progress(), v3::built_views(),
v3::scylla_views_builds_in_progress(),
v3::truncated(),
v3::cdc_local(),

View File

@@ -1110,7 +1110,17 @@ static future<> maybe_sync(const schema_ptr& s, netw::messaging_service::msg_add
future<schema_ptr> get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms) {
return local_schema_registry().get_or_load(v, [&ms, dst] (table_schema_version v) {
mlogger.debug("Requesting schema {} from {}", v, dst);
return ms.send_get_schema_version(dst, v);
return ms.send_get_schema_version(dst, v).then([] (frozen_schema s) {
auto& proxy = get_storage_proxy();
// Since the latest schema version is always present in the schema registry
// we only happen to query already outdated schema version, which is
// referenced by the incoming request.
// That means the column mapping for the schema should always be inserted
// with TTL (refresh TTL in case column mapping already existed prior to that).
return db::schema_tables::store_column_mapping(proxy, s.unfreeze(db::schema_ctxt(proxy)), true).then([s] {
return s;
});
});
});
}
@@ -1145,4 +1155,12 @@ future<> migration_manager::sync_schema(const database& db, const std::vector<gm
});
}
// Resolves the column mapping for schema version `v` of table `table_id`.
//
// The local schema registry is consulted first; when it holds the requested version
// the mapping is taken from that schema. Otherwise the lookup is delegated to
// db::schema_tables::get_column_mapping().
future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_version v) {
    if (schema_ptr cached = local_schema_registry().get_or_null(v)) {
        return make_ready_future<column_mapping>(cached->get_column_mapping());
    }
    return db::schema_tables::get_column_mapping(table_id, v);
}
}

View File

@@ -208,4 +208,6 @@ future<schema_ptr> get_schema_for_read(table_schema_version, netw::msg_addr from
// Intended to be used in the write path, which relies on synchronized schema.
future<schema_ptr> get_schema_for_write(table_schema_version, netw::msg_addr from, netw::messaging_service& ms);
future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_version v);
}

View File

@@ -32,6 +32,9 @@
#include "utils/error_injection.hh"
#include "db/schema_tables.hh"
#include "service/migration_manager.hh"
namespace service::paxos {
logging::logger paxos_state::logger("paxos");
@@ -195,20 +198,23 @@ future<> paxos_state::learn(schema_ptr schema, proposal decision, clock_type::ti
logger.debug("Committing decision {}", decision);
tracing::trace(tr_state, "Committing decision {}", decision);
schema_ptr s = schema;
// In case current schema is not the same as the schema in the decision
// try to look it up in the local schema_registry cache and upgrade
// try to look it up first in the local schema_registry cache and upgrade
// the mutation using schema from the cache.
//
// If there's no schema in the cache, then retrieve persisted column mapping
// for that version and upgrade the mutation with it.
if (decision.update.schema_version() != schema->version()) {
logger.debug("Stored mutation references outdated schema version. "
"Trying to upgrade the accepted proposal mutation to the most recent schema version.");
schema_ptr cached_schema = local_schema_registry().get_or_null(decision.update.schema_version());
if (cached_schema) {
logger.debug("Found the desired schema version in the local schema_registry.");
s = cached_schema;
}
return service::get_column_mapping(decision.update.column_family_id(), decision.update.schema_version())
.then([schema, tr_state, timeout, &decision] (const column_mapping& cm) {
return do_with(decision.update.unfreeze_upgrading(schema, cm), [tr_state, timeout] (const mutation& upgraded) {
return get_local_storage_proxy().mutate_locally(upgraded, tr_state, db::commitlog::force_sync::yes, timeout);
});
});
}
return get_local_storage_proxy().mutate_locally(s, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
return get_local_storage_proxy().mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
});
} else {
logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision);