schema_tables: don't delete version cell from scylla_tables mutations from group 0

As explained in the previous commit, we use the new
`committed_by_group0` flag attached to each row of a `scylla_tables`
mutation to decide whether the `version` cell needs to be deleted or
not.

The rest of #13957 is solved by pre-existing code -- if the `version`
column is present in the mutation, we don't calculate a hash for
`schema::version()`, but take the value from the column:

```
table_schema_version schema_mutations::digest(db::schema_features sf)
const {
    if (_scylla_tables) {
        auto rs = query::result_set(*_scylla_tables);
        if (!rs.empty()) {
            auto&& row = rs.row(0);
            auto val = row.get<utils::UUID>("version");
            if (val) {
                return table_schema_version(*val);
            }
        }
    }

    ...
```

The issue will therefore be fixed once we enable
`GROUP0_SCHEMA_VERSIONING`.
This commit is contained in:
Kamil Braun
2023-08-31 16:38:51 +02:00
parent defcf9915c
commit 522540da40

View File

@@ -1122,14 +1122,27 @@ void feed_hash_for_schema_digest(hasher& h, const mutation& m, schema_features f
}
}
// Applies deletion of the "version" column to a system_schema.scylla_tables mutation.
static void delete_schema_version(mutation& m) {
// Applies deletion of the "version" column to system_schema.scylla_tables mutation rows
// which weren't committed by group 0.
static void maybe_delete_schema_version(mutation& m) {
if (m.column_family_id() != scylla_tables()->id()) {
return;
}
const column_definition& origin_col = *m.schema()->get_column_definition(to_bytes("committed_by_group0"));
const column_definition& version_col = *m.schema()->get_column_definition(to_bytes("version"));
for (auto&& row : m.partition().clustered_rows()) {
auto&& cells = row.row().cells();
if (auto&& origin_cell = cells.find_cell(origin_col.id); origin_cell) {
auto&& ac = origin_cell->as_atomic_cell(origin_col);
if (ac.is_live()) {
auto dv = origin_col.type->deserialize(managed_bytes_view(ac.value()));
auto committed_by_group0 = value_cast<bool>(dv);
if (committed_by_group0) {
// Don't delete "version" for this entry.
continue;
}
}
}
auto&& cell = cells.find_cell(version_col.id);
api::timestamp_type t = api::new_timestamp();
if (cell) {
@@ -1268,8 +1281,9 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, shar
keyspaces.emplace(std::move(keyspace_name));
column_families.emplace(mutation.column_family_id());
// We must force recalculation of schema version after the merge, since the resulting
// schema may be a mix of the old and new schemas.
delete_schema_version(mutation);
// schema may be a mix of the old and new schemas, with the exception of entries
// that originate from group 0.
maybe_delete_schema_version(mutation);
}
if (reload) {