diff --git a/alternator/error.hh b/alternator/error.hh index 8b67be908a..a063d59399 100644 --- a/alternator/error.hh +++ b/alternator/error.hh @@ -88,6 +88,9 @@ public: static api_error table_not_found(std::string msg) { return api_error("TableNotFoundException", std::move(msg)); } + static api_error limit_exceeded(std::string msg) { + return api_error("LimitExceededException", std::move(msg)); + } static api_error internal(std::string msg) { return api_error("InternalServerError", std::move(msg), http::reply::status_type::internal_server_error); } diff --git a/alternator/executor.cc b/alternator/executor.cc index 7dcd4bc0d6..5bd89e93a2 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -7,6 +7,7 @@ */ #include +#include #include "alternator/executor.hh" #include "alternator/consumed_capacity.hh" #include "auth/permission.hh" @@ -55,6 +56,9 @@ #include "utils/error_injection.hh" #include "db/schema_tables.hh" #include "utils/rjson.hh" +#include "alternator/extract_from_attrs.hh" +#include "types/types.hh" +#include "db/system_keyspace.hh" using namespace std::chrono_literals; @@ -215,7 +219,7 @@ static void validate_table_name(const std::string& name) { // instead of each component individually as DynamoDB does. // The view_name() function assumes the table_name has already been validated // but validates the legality of index_name and the combination of both. -static std::string view_name(const std::string& table_name, std::string_view index_name, const std::string& delim = ":") { +static std::string view_name(std::string_view table_name, std::string_view index_name, const std::string& delim = ":") { if (index_name.length() < 3) { throw api_error::validation("IndexName must be at least 3 characters long"); } @@ -223,7 +227,7 @@ static std::string view_name(const std::string& table_name, std::string_view ind throw api_error::validation( fmt::format("IndexName '{}' must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", index_name)); } - std::string ret = table_name + delim + std::string(index_name); + std::string ret = std::string(table_name) + delim + std::string(index_name); if (ret.length() > max_table_name_length) { throw api_error::validation( fmt::format("The total length of TableName ('{}') and IndexName ('{}') cannot exceed {} characters", @@ -232,7 +236,7 @@ static std::string view_name(const std::string& table_name, std::string_view ind return ret; } -static std::string lsi_name(const std::string& table_name, std::string_view index_name) { +static std::string lsi_name(std::string_view table_name, std::string_view index_name) { return view_name(table_name, index_name, "!:"); } @@ -469,7 +473,90 @@ static rjson::value generate_arn_for_index(const schema& schema, std::string_vie schema.ks_name(), schema.cf_name(), index_name)); } -static rjson::value fill_table_description(schema_ptr schema, table_status tbl_status, service::storage_proxy const& proxy) +// The following function checks if a given view has finished building. +// We need this for describe_table() to know if a view is still backfilling, +// or active. +// +// Currently we don't have in view_ptr the knowledge whether a view finished +// building long ago - so checking this involves a somewhat inefficient, but +// still node-local, process: +// We need a table that can accurately tell that all nodes have finished +// building this view. system.built_views is not good enough because it only +// knows the view building status in the current node. In recent versions, +// after PR #19745, we have a local table system.view_build_status_v2 with +// global information, replacing the old system_distributed.view_build_status. +// In theory, there can be a period during upgrading an old cluster when this +// table is not yet available. However, since the IndexStatus is a new feature +// too, it is acceptable that it doesn't yet work in the middle of the update. +static future is_view_built( + view_ptr view, + service::storage_proxy& proxy, + service::client_state& client_state, + tracing::trace_state_ptr trace_state, + service_permit permit) { + auto schema = proxy.data_dictionary().find_table( + "system", db::system_keyspace::VIEW_BUILD_STATUS_V2).schema(); + // The table system.view_build_status_v2 has "keyspace_name" and + // "view_name" as the partition key, and each clustering row has + // "host_id" as clustering key and a string "status". We need to + // read a single partition: + partition_key pk = partition_key::from_exploded(*schema, + {utf8_type->decompose(view->ks_name()), + utf8_type->decompose(view->cf_name())}); + dht::partition_range_vector partition_ranges{ + dht::partition_range(dht::decorate_key(*schema, pk))}; + auto selection = cql3::selection::selection::wildcard(schema); // only for get_query_options()! + auto partition_slice = query::partition_slice( + {query::clustering_range::make_open_ended_both_sides()}, + {}, // static columns + {schema->get_column_definition("status")->id}, // regular columns + selection->get_query_options()); + auto command = ::make_lw_shared( + schema->id(), schema->version(), partition_slice, + proxy.get_max_result_size(partition_slice), + query::tombstone_limit(proxy.get_tombstone_limit())); + service::storage_proxy::coordinator_query_result qr = + co_await proxy.query( + schema, std::move(command), std::move(partition_ranges), + db::consistency_level::LOCAL_ONE, + service::storage_proxy::coordinator_query_options( + executor::default_timeout(), std::move(permit), client_state, trace_state)); + query::result_set rs = query::result_set::from_raw_result( + schema, partition_slice, *qr.query_result); + std::unordered_map statuses; + for (auto&& r : rs.rows()) { + auto host_id = r.get("host_id"); + auto status = r.get("status"); + if (host_id && status) { + statuses.emplace(locator::host_id(*host_id), *status); + } + } + // A view is considered "built" if all nodes reported SUCCESS in having + // built this view. Note that we need this "SUCCESS" for all nodes in the + // cluster - even those that are temporarily down (their success is known + // by this node, even if they are down). Conversely, we don't care what is + // the recorded status for any node which is no longer in the cluster - it + // is possible we forgot to erase the status of nodes that left the + // cluster, but here we just ignore them and look at the nodes actually + // in the topology. + bool all_built = true; + auto token_metadata = proxy.get_token_metadata_ptr(); + token_metadata->get_topology().for_each_node( + [&] (const locator::node& node) { + // Note: we could skip nodes in DCs which have no replication of + // this view. However, in practice even those nodes would run + // the view building (and just see empty content) so we don't + // need to bother with this skipping. + auto it = statuses.find(node.host_id()); + if (it == statuses.end() || it->second != "SUCCESS") { + all_built = false; + } + }); + co_return all_built; + +} + +static future fill_table_description(schema_ptr schema, table_status tbl_status, service::storage_proxy& proxy, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) { rjson::value table_description = rjson::empty_object(); auto tags_ptr = db::get_tags_of_table(schema); @@ -548,7 +635,22 @@ static rjson::value fill_table_description(schema_ptr schema, table_status tbl_s // FIXME: we have to get ProjectionType from the schema when it is added rjson::add(view_entry, "Projection", std::move(projection)); // Local secondary indexes are marked by an extra '!' sign occurring before the ':' delimiter - rjson::value& index_array = (delim_it > 1 && cf_name[delim_it-1] == '!') ? lsi_array : gsi_array; + bool is_lsi = (delim_it > 1 && cf_name[delim_it-1] == '!'); + // Add IndexStatus and Backfilling flags, but only for GSIs - + // LSIs can only be created with the table itself and do not + // have a status. Alternator schema operations are synchronous + // so only two combinations of these flags are possible: ACTIVE + // (for a built view) or CREATING+Backfilling (if view building + // is in progress). + if (!is_lsi) { + if (co_await is_view_built(vptr, proxy, client_state, trace_state, permit)) { + rjson::add(view_entry, "IndexStatus", "ACTIVE"); + } else { + rjson::add(view_entry, "IndexStatus", "CREATING"); + rjson::add(view_entry, "Backfilling", rjson::value(true)); + } + } + rjson::value& index_array = is_lsi ? lsi_array : gsi_array; rjson::push_back(index_array, std::move(view_entry)); } if (!lsi_array.Empty()) { @@ -572,7 +674,7 @@ static rjson::value fill_table_description(schema_ptr schema, table_status tbl_s executor::supplement_table_stream_info(table_description, *schema, proxy); // FIXME: still missing some response fields (issue #5026) - return table_description; + co_return table_description; } bool is_alternator_keyspace(const sstring& ks_name) { @@ -591,11 +693,11 @@ future executor::describe_table(client_state& cli tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); - rjson::value table_description = fill_table_description(schema, table_status::active, _proxy); + rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit); rjson::value response = rjson::empty_object(); rjson::add(response, "Table", std::move(table_description)); elogger.trace("returning {}", response); - return make_ready_future(make_jsonable(std::move(response))); + co_return make_jsonable(std::move(response)); } // Check CQL's Role-Based Access Control (RBAC) permission_to_check (MODIFY, @@ -656,7 +758,7 @@ future executor::delete_table(client_state& clien auto& p = _proxy.container(); schema_ptr schema = get_table(_proxy, request); - rjson::value table_description = fill_table_description(schema, table_status::deleting, _proxy); + rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit); co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::DROP); co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> { // FIXME: the following needs to be in a loop. If mm.announce() below @@ -704,7 +806,7 @@ future executor::delete_table(client_state& clien co_return make_jsonable(std::move(response)); } -static data_type parse_key_type(const std::string& type) { +static data_type parse_key_type(std::string_view type) { // Note that keys are only allowed to be string, blob or number (S/B/N). // The other types: boolean and various lists or sets - are not allowed. if (type.length() == 1) { @@ -719,7 +821,7 @@ static data_type parse_key_type(const std::string& type) { } -static void add_column(schema_builder& builder, const std::string& name, const rjson::value& attribute_definitions, column_kind kind) { +static void add_column(schema_builder& builder, const std::string& name, const rjson::value& attribute_definitions, column_kind kind, bool computed_column=false) { // FIXME: Currently, the column name ATTRS_COLUMN_NAME is not allowed // because we use it for our untyped attribute map, and we can't have a // second column with the same name. We should fix this, by renaming @@ -731,7 +833,16 @@ static void add_column(schema_builder& builder, const std::string& name, const r const rjson::value& attribute_info = *it; if (attribute_info["AttributeName"].GetString() == name) { auto type = attribute_info["AttributeType"].GetString(); - builder.with_column(to_bytes(name), parse_key_type(type), kind); + data_type dt = parse_key_type(type); + if (computed_column) { + // Computed column for GSI (doesn't choose a real column as-is + // but rather extracts a single value from the ":attrs" map) + alternator_type at = type_info_from_string(type).atype; + builder.with_computed_column(to_bytes(name), dt, kind, + std::make_unique(to_bytes(name), at)); + } else { + builder.with_column(to_bytes(name), dt, kind); + } return; } } @@ -1072,6 +1183,87 @@ static std::unordered_set validate_attribute_definitions(const rjso return seen_attribute_names; } +// The following "extract_from_attrs_column_computation" implementation is +// what allows Alternator GSIs to use in a materialized view's key a member +// from the ":attrs" map instead of a real column in the schema: + +const bytes extract_from_attrs_column_computation::MAP_NAME = executor::ATTRS_COLUMN_NAME; + +column_computation_ptr extract_from_attrs_column_computation::clone() const { + return std::make_unique(*this); +} + +// Serialize the *definition* of this column computation into a JSON +// string with a unique "type" string - TYPE_NAME - which then causes +// column_computation::deserialize() to create an object from this class. +bytes extract_from_attrs_column_computation::serialize() const { + rjson::value ret = rjson::empty_object(); + rjson::add(ret, "type", TYPE_NAME); + rjson::add(ret, "attr_name", rjson::from_string(to_string_view(_attr_name))); + rjson::add(ret, "desired_type", represent_type(_desired_type).ident); + return to_bytes(rjson::print(ret)); +} + +// Construct an extract_from_attrs_column_computation object based on the +// saved output of serialize(). Calls on_internal_error() if the string +// doesn't match the expected output format of serialize(). "type" is not +// checked - we assume the caller (column_computation::deserialize()) won't +// call this constructor if "type" doesn't match. +extract_from_attrs_column_computation::extract_from_attrs_column_computation(const rjson::value &v) { + const rjson::value* attr_name = rjson::find(v, "attr_name"); + if (attr_name->IsString()) { + _attr_name = bytes(to_bytes_view(rjson::to_string_view(*attr_name))); + const rjson::value* desired_type = rjson::find(v, "desired_type"); + if (desired_type->IsString()) { + _desired_type = type_info_from_string(rjson::to_string_view(*desired_type)).atype; + switch (_desired_type) { + case alternator_type::S: + case alternator_type::B: + case alternator_type::N: + // We're done + return; + default: + // Fall through to on_internal_error below. + break; + } + } + } + on_internal_error(elogger, format("Improperly formatted alternator::extract_from_attrs_column_computation computed column definition: {}", v)); +} + +regular_column_transformation::result extract_from_attrs_column_computation::compute_value( + const schema& schema, + const partition_key& key, + const db::view::clustering_or_static_row& row) const +{ + const column_definition* attrs_col = schema.get_column_definition(MAP_NAME); + if (!attrs_col || !attrs_col->is_regular() || !attrs_col->is_multi_cell()) { + on_internal_error(elogger, "extract_from_attrs_column_computation::compute_value() on a table without an attrs map"); + } + // Look for the desired attribute _attr_name in the attrs_col map in row: + const atomic_cell_or_collection* attrs = row.cells().find_cell(attrs_col->id); + if (!attrs) { + return regular_column_transformation::result(); + } + collection_mutation_view cmv = attrs->as_collection_mutation(); + return cmv.with_deserialized(*attrs_col->type, [this] (const collection_mutation_view_description& cmvd) { + for (auto&& [key, cell] : cmvd.cells) { + if (key == _attr_name) { + return regular_column_transformation::result(cell, + std::bind(serialized_value_if_type, std::placeholders::_1, _desired_type)); + } + } + return regular_column_transformation::result(); + }); +} + +// extract_from_attrs_column_computation needs the whole row to compute +// value, it cann't use just the partition key. +bytes extract_from_attrs_column_computation::compute_value(const schema&, const partition_key&) const { + on_internal_error(elogger, "extract_from_attrs_column_computation::compute_value called without row"); +} + + static future create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization) { SCYLLA_ASSERT(this_shard_id() == 0); @@ -1110,67 +1302,15 @@ static future create_table_on_shard0(service::cli schema_ptr partial_schema = builder.build(); - // Parse GlobalSecondaryIndexes parameters before creating the base - // table, so if we have a parse errors we can fail without creating + // Parse Local/GlobalSecondaryIndexes parameters before creating the + // base table, so if we have a parse errors we can fail without creating // any table. - const rjson::value* gsi = rjson::find(request, "GlobalSecondaryIndexes"); std::vector view_builders; std::unordered_set index_names; - if (gsi) { - if (!gsi->IsArray()) { - co_return api_error::validation("GlobalSecondaryIndexes must be an array."); - } - for (const rjson::value& g : gsi->GetArray()) { - const rjson::value* index_name_v = rjson::find(g, "IndexName"); - if (!index_name_v || !index_name_v->IsString()) { - co_return api_error::validation("GlobalSecondaryIndexes IndexName must be a string."); - } - std::string_view index_name = rjson::to_string_view(*index_name_v); - auto [it, added] = index_names.emplace(index_name); - if (!added) { - co_return api_error::validation(fmt::format("Duplicate IndexName '{}', ", index_name)); - } - std::string vname(view_name(table_name, index_name)); - elogger.trace("Adding GSI {}", index_name); - // FIXME: read and handle "Projection" parameter. This will - // require the MV code to copy just parts of the attrs map. - schema_builder view_builder(keyspace_name, vname); - auto [view_hash_key, view_range_key] = parse_key_schema(g); - if (partial_schema->get_column_definition(to_bytes(view_hash_key)) == nullptr) { - // A column that exists in a global secondary index is upgraded from being a map entry - // to having a regular column definition in the base schema - add_column(builder, view_hash_key, attribute_definitions, column_kind::regular_column); - } - add_column(view_builder, view_hash_key, attribute_definitions, column_kind::partition_key); - unused_attribute_definitions.erase(view_hash_key); - if (!view_range_key.empty()) { - if (partial_schema->get_column_definition(to_bytes(view_range_key)) == nullptr) { - // A column that exists in a global secondary index is upgraded from being a map entry - // to having a regular column definition in the base schema - if (partial_schema->get_column_definition(to_bytes(view_hash_key)) == nullptr) { - // FIXME: this is alternator limitation only, because Scylla's materialized views - // we use underneath do not allow more than 1 base regular column to be part of the MV key - elogger.warn("Only 1 regular column from the base table should be used in the GSI key in order to ensure correct liveness management without assumptions"); - } - add_column(builder, view_range_key, attribute_definitions, column_kind::regular_column); - } - add_column(view_builder, view_range_key, attribute_definitions, column_kind::clustering_key); - unused_attribute_definitions.erase(view_range_key); - } - // Base key columns which aren't part of the index's key need to - // be added to the view nonetheless, as (additional) clustering - // key(s). - if (hash_key != view_hash_key && hash_key != view_range_key) { - add_column(view_builder, hash_key, attribute_definitions, column_kind::clustering_key); - } - if (!range_key.empty() && range_key != view_hash_key && range_key != view_range_key) { - add_column(view_builder, range_key, attribute_definitions, column_kind::clustering_key); - } - // GSIs have no tags: - view_builder.add_extension(db::tags_extension::NAME, ::make_shared()); - view_builders.emplace_back(std::move(view_builder)); - } - } + // Remember the attributes used for LSI keys. Since LSI must be created + // with the table, we make these attributes real schema columns, and need + // to remember this below if the same attributes are used as GSI keys. + std::unordered_set lsi_range_keys; const rjson::value* lsi = rjson::find(request, "LocalSecondaryIndexes"); if (lsi) { @@ -1228,9 +1368,68 @@ static future create_table_on_shard0(service::cli std::map tags_map = {{db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY, "true"}}; view_builder.add_extension(db::tags_extension::NAME, ::make_shared(tags_map)); view_builders.emplace_back(std::move(view_builder)); + lsi_range_keys.emplace(view_range_key); } } + const rjson::value* gsi = rjson::find(request, "GlobalSecondaryIndexes"); + if (gsi) { + if (!gsi->IsArray()) { + co_return api_error::validation("GlobalSecondaryIndexes must be an array."); + } + for (const rjson::value& g : gsi->GetArray()) { + const rjson::value* index_name_v = rjson::find(g, "IndexName"); + if (!index_name_v || !index_name_v->IsString()) { + co_return api_error::validation("GlobalSecondaryIndexes IndexName must be a string."); + } + std::string_view index_name = rjson::to_string_view(*index_name_v); + auto [it, added] = index_names.emplace(index_name); + if (!added) { + co_return api_error::validation(fmt::format("Duplicate IndexName '{}', ", index_name)); + } + std::string vname(view_name(table_name, index_name)); + elogger.trace("Adding GSI {}", index_name); + // FIXME: read and handle "Projection" parameter. This will + // require the MV code to copy just parts of the attrs map. + schema_builder view_builder(keyspace_name, vname); + auto [view_hash_key, view_range_key] = parse_key_schema(g); + + // If an attribute is already a real column in the base table + // (i.e., a key attribute) or we already made it a real column + // as an LSI key above, we can use it directly as a view key. + // Otherwise, we need to add it as a "computed column", which + // extracts and deserializes the attribute from the ":attrs" map. + bool view_hash_key_real_column = + partial_schema->get_column_definition(to_bytes(view_hash_key)) || + lsi_range_keys.contains(view_hash_key); + add_column(view_builder, view_hash_key, attribute_definitions, column_kind::partition_key, !view_hash_key_real_column); + unused_attribute_definitions.erase(view_hash_key); + if (!view_range_key.empty()) { + bool view_range_key_real_column = + partial_schema->get_column_definition(to_bytes(view_range_key)) || + lsi_range_keys.contains(view_range_key); + add_column(view_builder, view_range_key, attribute_definitions, column_kind::clustering_key, !view_range_key_real_column); + if (!partial_schema->get_column_definition(to_bytes(view_range_key)) && + !partial_schema->get_column_definition(to_bytes(view_hash_key))) { + // FIXME: This warning should go away. See issue #6714 + elogger.warn("Only 1 regular column from the base table should be used in the GSI key in order to ensure correct liveness management without assumptions"); + } + unused_attribute_definitions.erase(view_range_key); + } + // Base key columns which aren't part of the index's key need to + // be added to the view nonetheless, as (additional) clustering + // key(s). + if (hash_key != view_hash_key && hash_key != view_range_key) { + add_column(view_builder, hash_key, attribute_definitions, column_kind::clustering_key); + } + if (!range_key.empty() && range_key != view_hash_key && range_key != view_range_key) { + add_column(view_builder, range_key, attribute_definitions, column_kind::clustering_key); + } + // GSIs have no tags: + view_builder.add_extension(db::tags_extension::NAME, ::make_shared()); + view_builders.emplace_back(std::move(view_builder)); + } + } if (!unused_attribute_definitions.empty()) { co_return api_error::validation(fmt::format( "AttributeDefinitions defines spurious attributes not used by any KeySchema: {}", @@ -1371,12 +1570,37 @@ future executor::create_table(client_state& clien }); } +// When UpdateTable adds a GSI, the type of its key columns must be specified +// in a AttributeDefinitions. If one of these key columns are *already* key +// columns of the base table or any of its prior GSIs or LSIs, the type +// given in AttributeDefinitions must match the type of the existing key - +// otherise Alternator will not know which type to enforce in new writes. +// This function checks for such conflicts. It assumes that the structure of +// the given attribute_definitions was already validated (with +// validate_attribute_definitions()). +// This function should be called multiple times - once for the base schema +// and once for each of its views (existing GSIs and LSIs on this table). + static void check_attribute_definitions_conflicts(const rjson::value& attribute_definitions, const schema& schema) { + for (auto& def : schema.primary_key_columns()) { + std::string def_type = type_to_string(def.type); + for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) { + const rjson::value& attribute_info = *it; + if (attribute_info["AttributeName"].GetString() == def.name_as_text()) { + auto type = attribute_info["AttributeType"].GetString(); + if (type != def_type) { + throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", def.name_as_text(), type, def_type)); + } + break; + } + } + } +} + future executor::update_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) { _stats.api_operations.update_table++; elogger.trace("Updating table {}", request); static const std::vector unsupported = { - "GlobalSecondaryIndexUpdates", "ProvisionedThroughput", "ReplicaUpdates", "SSESpecification", @@ -1388,11 +1612,14 @@ future executor::update_table(client_state& clien } } + bool empty_request = true; + if (rjson::find(request, "BillingMode")) { + empty_request = false; verify_billing_mode(request); } - co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), client_state_other_shard = client_state.move_to_other_shard()] + co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request] (service::migration_manager& mm) mutable -> future { // FIXME: the following needs to be in a loop. If mm.announce() below // fails, we need to retry the whole thing. @@ -1412,6 +1639,7 @@ future executor::update_table(client_state& clien rjson::value* stream_specification = rjson::find(request, "StreamSpecification"); if (stream_specification && stream_specification->IsObject()) { + empty_request = false; add_stream_options(*stream_specification, builder, p.local()); // Alternator Streams doesn't yet work when the table uses tablets (#16317) auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled"); @@ -1423,8 +1651,162 @@ future executor::update_table(client_state& clien } auto schema = builder.build(); + std::vector new_views; + std::vector dropped_views; + + rjson::value* gsi_updates = rjson::find(request, "GlobalSecondaryIndexUpdates"); + if (gsi_updates) { + if (!gsi_updates->IsArray()) { + co_return api_error::validation("GlobalSecondaryIndexUpdates must be an array"); + } + if (gsi_updates->Size() > 1) { + // Although UpdateTable takes an array of operations and could + // support multiple Create and/or Delete operations in one + // command, DynamoDB doesn't actually allows this, and throws + // a LimitExceededException if this is attempted. + co_return api_error::limit_exceeded("GlobalSecondaryIndexUpdates only allows one index creation or deletion"); + } + if (gsi_updates->Size() == 1) { + empty_request = false; + if (!(*gsi_updates)[0].IsObject() || (*gsi_updates)[0].MemberCount() != 1) { + co_return api_error::validation("GlobalSecondaryIndexUpdates array must contain one object with a Create, Delete or Update operation"); + } + auto it = (*gsi_updates)[0].MemberBegin(); + const std::string_view op = rjson::to_string_view(it->name); + if (!it->value.IsObject()) { + co_return api_error::validation("GlobalSecondaryIndexUpdates entries must be objects"); + } + const rjson::value* index_name_v = rjson::find(it->value, "IndexName"); + if (!index_name_v || !index_name_v->IsString()) { + co_return api_error::validation("GlobalSecondaryIndexUpdates operation must have IndexName"); + } + std::string_view index_name = rjson::to_string_view(*index_name_v); + std::string_view table_name = schema->cf_name(); + std::string_view keyspace_name = schema->ks_name(); + std::string vname(view_name(table_name, index_name)); + if (op == "Create") { + const rjson::value* attribute_definitions = rjson::find(request, "AttributeDefinitions"); + if (!attribute_definitions) { + co_return api_error::validation("GlobalSecondaryIndexUpdates Create needs AttributeDefinitions"); + } + std::unordered_set unused_attribute_definitions = + validate_attribute_definitions(*attribute_definitions); + check_attribute_definitions_conflicts(*attribute_definitions, *schema); + for (auto& view : p.local().data_dictionary().find_column_family(tab).views()) { + check_attribute_definitions_conflicts(*attribute_definitions, *view); + } + + if (p.local().data_dictionary().has_schema(keyspace_name, vname)) { + // Surprisingly, DynamoDB uses validation error here, not resource_in_use + co_return api_error::validation(fmt::format( + "GSI {} already exists in table {}", index_name, table_name)); + } + if (p.local().data_dictionary().has_schema(keyspace_name, lsi_name(table_name, index_name))) { + co_return api_error::validation(fmt::format( + "LSI {} already exists in table {}, can't use same name for GSI", index_name, table_name)); + } + + elogger.trace("Adding GSI {}", index_name); + // FIXME: read and handle "Projection" parameter. This will + // require the MV code to copy just parts of the attrs map. + schema_builder view_builder(keyspace_name, vname); + auto [view_hash_key, view_range_key] = parse_key_schema(it->value); + // If an attribute is already a real column in the base + // table (i.e., a key attribute in the base table or LSI), + // we can use it directly as a view key. Otherwise, we + // need to add it as a "computed column", which extracts + // and deserializes the attribute from the ":attrs" map. + bool view_hash_key_real_column = + schema->get_column_definition(to_bytes(view_hash_key)); + add_column(view_builder, view_hash_key, *attribute_definitions, column_kind::partition_key, !view_hash_key_real_column); + unused_attribute_definitions.erase(view_hash_key); + if (!view_range_key.empty()) { + bool view_range_key_real_column = + schema->get_column_definition(to_bytes(view_range_key)); + add_column(view_builder, view_range_key, *attribute_definitions, column_kind::clustering_key, !view_range_key_real_column); + if (!schema->get_column_definition(to_bytes(view_range_key)) && + !schema->get_column_definition(to_bytes(view_hash_key))) { + // FIXME: This warning should go away. See issue #6714 + elogger.warn("Only 1 regular column from the base table should be used in the GSI key in order to ensure correct liveness management without assumptions"); + } + unused_attribute_definitions.erase(view_range_key); + } + // Surprisingly, although DynamoDB checks for unused + // AttributeDefinitions in CreateTable, it does not + // check it in UpdateTable. We decided to check anyway. + if (!unused_attribute_definitions.empty()) { + co_return api_error::validation(fmt::format( + "AttributeDefinitions defines spurious attributes not used by any KeySchema: {}", + unused_attribute_definitions)); + } + // Base key columns which aren't part of the index's key need to + // be added to the view nonetheless, as (additional) clustering + // key(s). + for (auto& def : schema->primary_key_columns()) { + if (def.name_as_text() != view_hash_key && def.name_as_text() != view_range_key) { + view_builder.with_column(def.name(), def.type, column_kind::clustering_key); + } + } + // GSIs have no tags: + view_builder.add_extension(db::tags_extension::NAME, ::make_shared()); + // Note below we don't need to add virtual columns, as all + // base columns were copied to view. TODO: reconsider the need + // for virtual columns when we support Projection. + for (const column_definition& regular_cdef : schema->regular_columns()) { + if (!view_builder.has_column(*cql3::to_identifier(regular_cdef))) { + view_builder.with_column(regular_cdef.name(), regular_cdef.type, column_kind::regular_column); + } + } + const bool include_all_columns = true; + view_builder.with_view_info(*schema, include_all_columns, ""/*where clause*/); + new_views.emplace_back(view_builder.build()); + } else if (op == "Delete") { + elogger.trace("Deleting GSI {}", index_name); + if (!p.local().data_dictionary().has_schema(keyspace_name, vname)) { + co_return api_error::resource_not_found(fmt::format("No GSI {} in table {}", index_name, table_name)); + } + dropped_views.emplace_back(vname); + } else if (op == "Update") { + co_return api_error::validation("GlobalSecondaryIndexUpdates Update not yet supported"); + } else { + co_return api_error::validation(fmt::format("GlobalSecondaryIndexUpdates supports a Create, Delete or Update operation, saw '{}'", op)); + } + } + } + + if (empty_request) { + co_return api_error::validation("UpdateTable requires one of GlobalSecondaryIndexUpdates, StreamSpecification or BillingMode to be specified"); + } + co_await verify_permission(enforce_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER); - auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, std::vector(), group0_guard.write_timestamp()); + auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, std::vector(), group0_guard.write_timestamp()); + for (view_ptr view : new_views) { + auto m2 = co_await service::prepare_new_view_announcement(p.local(), view, group0_guard.write_timestamp()); + std::move(m2.begin(), m2.end(), std::back_inserter(m)); + } + for (const std::string& view_name : dropped_views) { + auto m2 = co_await service::prepare_view_drop_announcement(p.local(), schema->ks_name(), view_name, group0_guard.write_timestamp()); + std::move(m2.begin(), m2.end(), std::back_inserter(m)); + } + // If a role is allowed to create a GSI, we should give it permissions + // to read the GSI it just created. This is known as "auto-grant". + // Also, when we delete a GSI we should revoke any permissions set on + // it - so if it's ever created again the old permissions wouldn't be + // remembered for the new GSI. This is known as "auto-revoke" + if (client_state_other_shard.get().user() && (!new_views.empty() || !dropped_views.empty())) { + service::group0_batch mc(std::move(group0_guard)); + mc.add_mutations(std::move(m)); + for (view_ptr view : new_views) { + auto resource = auth::make_data_resource(view->ks_name(), view->cf_name()); + co_await auth::grant_applicable_permissions( + *client_state_other_shard.get().get_auth_service(), *client_state_other_shard.get().user(), resource, mc); + } + for (const auto& view_name : dropped_views) { + auto resource = auth::make_data_resource(schema->ks_name(), view_name); + co_await auth::revoke_all(*client_state_other_shard.get().get_auth_service(), resource, mc); + } + std::tie(m, group0_guard) = co_await std::move(mc).extract(); + } co_await mm.announce(std::move(m), std::move(group0_guard), format("alternator-executor: update {} table", tab->cf_name())); @@ -1546,7 +1928,7 @@ public: struct delete_item {}; struct put_item {}; put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item); - put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item); + put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map key_attributes); // put_or_delete_item doesn't keep a reference to schema (so it can be // moved between shards for LWT) so it needs to be given again to build(): mutation build(schema_ptr schema, api::timestamp_type ts) const; @@ -1578,7 +1960,75 @@ static inline const column_definition* find_attribute(const schema& schema, cons return cdef; } -put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item) + +// Get a list of all attributes that serve as a key attributes for any of the +// GSIs or LSIs of this table, and the declared type for each (can be only +// "S", "B", or "N"). The implementation below will also list the base table's +// key columns (they are the views' clustering keys). +std::unordered_map si_key_attributes(data_dictionary::table t) { + std::unordered_map ret; + for (const view_ptr& v : t.views()) { + for (const column_definition& cdef : v->partition_key_columns()) { + ret[cdef.name()] = type_to_string(cdef.type); + } + for (const column_definition& cdef : v->clustering_key_columns()) { + ret[cdef.name()] = type_to_string(cdef.type); + } + } + return ret; +} + +// When an attribute is a key (hash or sort) of one of the GSIs on a table, +// DynamoDB refuses an update to that attribute with an unsuitable value. +// Unsuitable values are: +// 1. An empty string (those are normally allowed as values, but not allowed +// as keys, including GSI keys). +// 2. A value with a type different than that declared for the GSI key. +// Normally non-key attributes can take values of any type (DynamoDB is +// schema-less), but as soon as an attribute is used as a GSI key, it +// must be set only to the specific type declared for that key. +// (Note that a missing value for an GSI key attribute is fine - the update +// will happen on the base table, but won't reach the view table. In this +// case, this function simply won't be called for this attribute.) +// +// This function checks if the given attribute update is an update to some +// GSI's key, and if the value is unsuitable, a api_error::validation is +// thrown. The checking here is similar to the checking done in +// get_key_from_typed_value() for the base table's key columns. +// +// validate_value_if_gsi_key() should only be called after validate_value() +// already validated that the value itself has a valid form. +static inline void validate_value_if_gsi_key( + std::unordered_map key_attributes, + const bytes& attribute, + const rjson::value& value) { + if (key_attributes.empty()) { + return; + } + auto it = key_attributes.find(attribute); + if (it == key_attributes.end()) { + // Given attribute is not a key column with a fixed type, so no + // more validation to do. + return; + } + const std::string& expected_type = it->second; + // We assume that validate_value() was previously called on this value, + // so value is known to be of the proper format (an object with one + // member, whose key and value are strings) + std::string_view value_type = rjson::to_string_view(value.MemberBegin()->name); + if (expected_type != value_type) { + throw api_error::validation(fmt::format( + "Type mismatch: expected type {} for GSI key attribute {}, got type {}", + expected_type, to_string_view(attribute), value_type)); + } + std::string_view value_content = rjson::to_string_view(value.MemberBegin()->value); + if (value_content.empty()) { + throw api_error::validation(fmt::format( + "GSI key attribute {} cannot be set to an empty string", to_string_view(attribute))); + } +} + +put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map key_attributes) : _pk(pk_from_json(item, schema)), _ck(ck_from_json(item, schema)) { _cells = std::vector(); _cells->reserve(item.MemberCount()); @@ -1588,6 +2038,9 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche const column_definition* cdef = find_attribute(*schema, column_name); _length_in_bytes += column_name.size(); if (!cdef) { + // This attribute may be a key column of one of the GSI, in which + // case there are some limitations on the value + validate_value_if_gsi_key(key_attributes, column_name, it->value); bytes value = serialize_item(it->value); if (value.size()) { // ScyllaDB uses one extra byte compared to DynamoDB for the bytes length @@ -1595,7 +2048,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche } _cells->push_back({std::move(column_name), serialize_item(it->value)}); } else if (!cdef->is_primary_key()) { - // Fixed-type regular column can be used for GSI key + // Fixed-type regular column can be used for LSI key bytes value = get_key_from_typed_value(it->value, *cdef); _cells->push_back({std::move(column_name), value}); @@ -1954,7 +2407,8 @@ public: parsed::condition_expression _condition_expression; put_item_operation(service::storage_proxy& proxy, rjson::value&& request) : rmw_operation(proxy, std::move(request)) - , _mutation_builder(rjson::get(_request, "Item"), schema(), put_or_delete_item::put_item{}) { + , _mutation_builder(rjson::get(_request, "Item"), schema(), put_or_delete_item::put_item{}, + si_key_attributes(proxy.data_dictionary().find_table(schema()->ks_name(), schema()->cf_name()))) { _pk = _mutation_builder.pk(); _ck = _mutation_builder.ck(); if (_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::ALL_OLD) { @@ -2315,7 +2769,8 @@ future executor::batch_write_item(client_state& c const rjson::value& put_request = r->value; const rjson::value& item = put_request["Item"]; mutation_builders.emplace_back(schema, put_or_delete_item( - item, schema, put_or_delete_item::put_item{})); + item, schema, put_or_delete_item::put_item{}, + si_key_attributes(_proxy.data_dictionary().find_table(schema->ks_name(), schema->cf_name())))); auto mut_key = std::make_pair(mutation_builders.back().second.pk(), mutation_builders.back().second.ck()); if (used_keys.contains(mut_key)) { co_return api_error::validation("Provided list of item keys contains duplicates"); @@ -2859,6 +3314,10 @@ public: // them by top-level attribute, and detects forbidden overlaps/conflicts. attribute_path_map _update_expression; + // Saved list of GSI keys in the table being updated, used for + // validate_value_if_gsi_key() + std::unordered_map _key_attributes; + parsed::condition_expression _condition_expression; update_item_operation(service::storage_proxy& proxy, rjson::value&& request); @@ -2950,6 +3409,9 @@ update_item_operation::update_item_operation(service::storage_proxy& proxy, rjso if (expression_attribute_values) { _consumed_capacity._total_bytes += estimate_value_size(*expression_attribute_values); } + + _key_attributes = si_key_attributes(proxy.data_dictionary().find_table( + _schema->ks_name(), _schema->cf_name())); } // These are the cases where update_item_operation::apply() needs to use @@ -3247,6 +3709,9 @@ update_item_operation::apply(std::unique_ptr previous_item, api::t bytes column_value = get_key_from_typed_value(json_value, *cdef); row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, ts, column_value)); } else { + // This attribute may be a key column of one of the GSIs, in which + // case there are some limitations on the value. + validate_value_if_gsi_key(_key_attributes, column_name, json_value); attrs_collector.put(std::move(column_name), serialize_item(json_value), ts); } }; diff --git a/alternator/extract_from_attrs.hh b/alternator/extract_from_attrs.hh new file mode 100644 index 0000000000..52966a382e --- /dev/null +++ b/alternator/extract_from_attrs.hh @@ -0,0 +1,73 @@ +/* + * Copyright 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include +#include + +#include "utils/rjson.hh" +#include "serialization.hh" +#include "column_computation.hh" +#include "db/view/regular_column_transformation.hh" + +namespace alternator { + +// An implementation of a "column_computation" which extracts a specific +// non-key attribute from the big map (":attrs") of all non-key attributes, +// and deserializes it if it has the desired type. GSI will use this computed +// column as a materialized-view key when the view key attribute isn't a +// full-fledged CQL column but rather stored in ":attrs". +class extract_from_attrs_column_computation : public regular_column_transformation { + // The name of the CQL column name holding the attribute map. It is a + // constant defined in executor.cc (as ":attrs"), so doesn't need + // to be specified when constructing the column computation. + static const bytes MAP_NAME; + // The top-level attribute name to extract from the ":attrs" map. + bytes _attr_name; + // The type we expect for the value stored in the attribute. If the type + // matches the expected type, it is decoded from the serialized format + // we store in the map's values) into the raw CQL type value that we use + // for keys, and returned by compute_value(). Only the types "S" (string), + // "B" (bytes) and "N" (number) are allowed as keys in DynamoDB, and + // therefore in desired_type. + alternator_type _desired_type; +public: + virtual column_computation_ptr clone() const override; + // TYPE_NAME is a unique string that distinguishes this class from other + // column_computation subclasses. column_computation::deserialize() will + // construct an object of this subclass if it sees a "type" TYPE_NAME. + static inline const std::string TYPE_NAME = "alternator_extract_from_attrs"; + // Serialize the *definition* of this column computation into a JSON + // string with a unique "type" string - TYPE_NAME - which then causes + // column_computation::deserialize() to create an object from this class. + virtual bytes serialize() const override; + // Construct this object based on the previous output of serialize(). + // Calls on_internal_error() if the string doesn't match the output format + // of serialize(). "type" is not checked column_computation::deserialize() + // won't call this constructor if "type" doesn't match. + extract_from_attrs_column_computation(const rjson::value &v); + extract_from_attrs_column_computation(bytes_view attr_name, alternator_type desired_type) + : _attr_name(attr_name), _desired_type(desired_type) + {} + // Implement regular_column_transformation's compute_value() that + // accepts the full row: + result compute_value(const schema& schema, const partition_key& key, + const db::view::clustering_or_static_row& row) const override; + // But do not implement column_computation's compute_value() that + // accepts only a partition key - that's not enough so our implementation + // of this function does on_internal_error(). + bytes compute_value(const schema& schema, const partition_key& key) const override; + // This computed column does depend on a non-primary key column, so + // its result may change in the update and we need to compute it + // before and after the update. + virtual bool depends_on_non_primary_key_column() const override { + return true; + } +}; +} // namespace alternator diff --git a/alternator/serialization.cc b/alternator/serialization.cc index 1ab60feed7..dae6998a53 100644 --- a/alternator/serialization.cc +++ b/alternator/serialization.cc @@ -245,6 +245,27 @@ rjson::value deserialize_item(bytes_view bv) { return deserialized; } +// This function takes a bytes_view created earlier by serialize_item(), and +// if has the type "expected_type", the function returns the value as a +// raw Scylla type. If the type doesn't match, returns an unset optional. +// This function only supports the key types S (string), B (bytes) and N +// (number) - serialize_item() serializes those types as a single-byte type +// followed by the serialized raw Scylla type, so all this function needs to +// do is to remove the first byte. This makes this function much more +// efficient than deserialize_item() above because it avoids transformation +// to/from JSON. +std::optional serialized_value_if_type(bytes_view bv, alternator_type expected_type) { + if (bv.empty() || alternator_type(bv[0]) != expected_type) { + return std::nullopt; + } + // Currently, serialize_item() for types in alternator_type (notably S, B + // and N) are nothing more than Scylla's raw format for these types + // preceded by a type byte. So we just need to skip that byte and we are + // left by exactly what we need to return. + bv.remove_prefix(1); + return bytes(bv); +} + std::string type_to_string(data_type type) { static thread_local std::unordered_map types = { {utf8_type, "S"}, diff --git a/alternator/serialization.hh b/alternator/serialization.hh index 90d842cd22..ff4e94f007 100644 --- a/alternator/serialization.hh +++ b/alternator/serialization.hh @@ -43,6 +43,7 @@ type_representation represent_type(alternator_type atype); bytes serialize_item(const rjson::value& item); rjson::value deserialize_item(bytes_view bv); +std::optional serialized_value_if_type(bytes_view bv, alternator_type expected_type); std::string type_to_string(data_type type); diff --git a/configure.py b/configure.py index 85f635606b..cf6616fde9 100755 --- a/configure.py +++ b/configure.py @@ -1573,7 +1573,7 @@ deps['test/boost/linearizing_input_stream_test'] = [ "test/boost/linearizing_input_stream_test.cc", "test/lib/log.cc", ] -deps['test/boost/expr_test'] = ['test/boost/expr_test.cc', 'test/lib/expr_test_utils.cc'] + scylla_core +deps['test/boost/expr_test'] = ['test/boost/expr_test.cc', 'test/lib/expr_test_utils.cc'] + scylla_core + alternator deps['test/boost/rate_limiter_test'] = ['test/boost/rate_limiter_test.cc', 'db/rate_limiter.cc'] deps['test/boost/exceptions_optimized_test'] = ['test/boost/exceptions_optimized_test.cc', 'utils/exceptions.cc'] deps['test/boost/exceptions_fallback_test'] = ['test/boost/exceptions_fallback_test.cc', 'utils/exceptions.cc'] @@ -1590,8 +1590,8 @@ deps['test/raft/many_test'] = ['test/raft/many_test.cc', 'test/raft/replication. deps['test/raft/fsm_test'] = ['test/raft/fsm_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_raft_dependencies deps['test/raft/etcd_test'] = ['test/raft/etcd_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc'] + scylla_raft_dependencies deps['test/raft/raft_sys_table_storage_test'] = ['test/raft/raft_sys_table_storage_test.cc'] + \ - scylla_core + scylla_tests_generic_dependencies -deps['test/boost/address_map_test'] = ['test/boost/address_map_test.cc'] + scylla_core + scylla_core + alternator + scylla_tests_generic_dependencies +deps['test/boost/address_map_test'] = ['test/boost/address_map_test.cc'] + scylla_core + alternator deps['test/raft/discovery_test'] = ['test/raft/discovery_test.cc', 'test/raft/helpers.cc', 'test/lib/log.cc', diff --git a/db/view/regular_column_transformation.hh b/db/view/regular_column_transformation.hh new file mode 100644 index 0000000000..33f8de3a6c --- /dev/null +++ b/db/view/regular_column_transformation.hh @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include "column_computation.hh" +#include "mutation/atomic_cell.hh" +#include "timestamp.hh" +#include + +class row_marker; + +// In a basic column_computation defined in column_computation.hh, the +// compute_value() method is only based on the partition key, and it must +// return a value. That API has very limited applications - basically the +// only thing we can implement with it is token_column_computation which +// we used to create the token column in secondary indexes. +// The regular_column_transformation base class here is more powerful, but +// still is not a completely general computation: Its compute_value() virtual +// method can transform the value read from a single cell of a regular column +// into a new cell stored in a structure regular_column_transformation::result. +// +// In more details, the assumptions of regular_column_transformation is: +// 1. compute_value() computes the value based on a *single* column in a +// row passed to compute_value(). +// This assumption means that the value or deletion of the value always +// has a single known timestamp (and the value can't be half-missing) +// and single TTL information. That would not have been possible if we +// allowed the computation to depend on multiple columns. +// 2. compute_value() computes the value based on a *regular* column in the +// base table. This means that an update can modify this value (unlike a +// base-table key column that can't change in an update), so the view +// update code needs to compute the value before and after the update, +// and potentially delete and create view rows. +// 3. compute_value() returns a column_computation::result which includes +// a value and its liveness information (timestamp and ttl/expiry) or +// is missing a value. + +class regular_column_transformation : public column_computation { +public: + struct result { + // We can use "bytes" instead of "managed_bytes" here because we know + // that a column_computation is only used for generating a key value, + // and that is limited to 64K. This limitation is enforced below - + // we never linearize a cell's value if its size is more than 64K. + std::optional _value; + + // _ttl and _expiry are only defined if _value is set. + // The default values below are used when the source cell does not + // expire, and are the same values that row_marker uses for a non- + // expiring marker. This is useful when creating a row_marker from + // get_ttl() and get_expiry(). + gc_clock::duration _ttl { 0 }; + gc_clock::time_point _expiry { gc_clock::duration(0) }; + + // _ts may be set even if _value is missing, which can remember the + // timestamp of a tombstone. Note that the current view-update code + // that uses this class doesn't use _ts when _value is missing. + api::timestamp_type _ts = api::missing_timestamp; + + api::timestamp_type get_ts() const { + return _ts; + } + + bool has_value() const { + return _value.has_value(); + } + + // Should only be called if has_value() is true: + const bytes& get_value() const { + return *_value; + } + gc_clock::duration get_ttl() const { + return _ttl; + } + gc_clock::time_point get_expiry() const { + return _expiry; + } + + // A missing computation result + result() { } + + // Construct a computation result by copying a given atomic_cell - + // including its value, timestamp, and ttl - or deletion timestamp. + // The second parameter is an optional transformation function f - + // taking a bytes and returning an optional - that transforms + // the value of the cell but keeps its other liveness information. + // If f returns a nullopt, it causes the view row should be deleted. + template + requires std::invocable && std::convertible_to, std::optional> + result(atomic_cell_view cell, Func f = {}) { + _ts = cell.timestamp(); + if (cell.is_live()) { + // If the cell is larger than what a key can hold (64KB), + // return a missing value. This lets us skip this item during + // view building and avoid hanging the view build as described + // in #8627. But it doesn't prevent later inserting such a item + // to the base table, nor does it implement front-end specific + // limits (such as Alternator's 1K or 2K limits - see #10347). + // Those stricter limits should be validated in the base-table + // write code, not here - deep inside the view update code. + // Note also we assume that f() doesn't grow the value further. + if (cell.value().size() >= 65536) { + return; + } + _value = f(to_bytes(cell.value())); + if (_value) { + if (cell.is_live_and_has_ttl()) { + _ttl = cell.ttl(); + _expiry = cell.expiry(); + } + } + } + } + }; + + virtual ~regular_column_transformation() = default; + virtual result compute_value( + const schema& schema, + const partition_key& key, + const db::view::clustering_or_static_row& row) const = 0; + }; diff --git a/db/view/view.cc b/db/view/view.cc index 566063efa7..069d49ab29 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -37,6 +37,7 @@ #include "db/view/view_builder.hh" #include "db/view/view_updating_consumer.hh" #include "db/view/view_update_generator.hh" +#include "db/view/regular_column_transformation.hh" #include "db/system_keyspace_view_types.hh" #include "db/system_keyspace.hh" #include "db/system_distributed_keyspace.hh" @@ -508,79 +509,6 @@ size_t view_updates::op_count() const { return _op_count; } -row_marker view_updates::compute_row_marker(const clustering_or_static_row& base_row) const { - /* - * We need to compute both the timestamp and expiration for view rows. - * - * Below there are several distinct cases depending on how many new key - * columns the view has - i.e., how many of the view's key columns were - * regular columns in the base. base_regular_columns_in_view_pk.size(): - * - * Zero new key columns: - * The view rows key is composed only from base key columns, and those - * cannot be changed in an update, so the view row remains alive as - * long as the base row is alive. We need to return the same row - * marker as the base for the view - to keep an empty view row alive - * for as long as an empty base row exists. - * Note that in this case, if there are *unselected* base columns, we - * may need to keep an empty view row alive even without a row marker - * because the base row (which has additional columns) is still alive. - * For that we have the "virtual columns" feature: In the zero new - * key columns case, we put unselected columns in the view as empty - * columns, to keep the view row alive. - * - * One new key column: - * In this case, there is a regular base column that is part of the - * view key. This regular column can be added or deleted in an update, - * or its expiration be set, and those can cause the view row - - * including its row marker - to need to appear or disappear as well. - * So the liveness of cell of this one column determines the liveness - * of the view row and the row marker that we return. - * - * Two or more new key columns: - * This case is explicitly NOT supported in CQL - one cannot create a - * view with more than one base-regular columns in its key. In general - * picking one liveness (timestamp and expiration) is not possible - * if there are multiple regular base columns in the view key, as - * those can have different liveness. - * However, we do allow this case for Alternator - we need to allow - * the case of two (but not more) because the DynamoDB API allows - * creating a GSI whose two key columns (hash and range key) were - * regular columns. - * We can support this case in Alternator because it doesn't use - * expiration (the "TTL" it does support is different), and doesn't - * support user-defined timestamps. But, the two columns can still - * have different timestamps - this happens if an update modifies - * just one of them. In this case the timestamp of the view update - * (and that of the row marker we return) is the later of these two - * updated columns. - */ - const auto& col_ids = base_row.is_clustering_row() - ? _base_info->base_regular_columns_in_view_pk() - : _base_info->base_static_columns_in_view_pk(); - if (!col_ids.empty()) { - auto& def = _base->column_at(base_row.column_kind(), col_ids[0]); - // Note: multi-cell columns can't be part of the primary key. - auto cell = base_row.cells().cell_at(col_ids[0]).as_atomic_cell(def); - auto ts = cell.timestamp(); - if (col_ids.size() > 1){ - // As explained above, this case only happens in Alternator, - // and we may need to pick a higher ts: - auto& second_def = _base->column_at(base_row.column_kind(), col_ids[1]); - auto second_cell = base_row.cells().cell_at(col_ids[1]).as_atomic_cell(second_def); - auto second_ts = second_cell.timestamp(); - ts = std::max(ts, second_ts); - // Alternator isn't supposed to have TTL or more than two col_ids! - if (col_ids.size() != 2 || cell.is_live_and_has_ttl() || second_cell.is_live_and_has_ttl()) [[unlikely]] { - utils::on_internal_error(format("Unexpected col_ids length {} or has TTL", col_ids.size())); - } - } - return cell.is_live_and_has_ttl() ? row_marker(ts, cell.ttl(), cell.expiry()) : row_marker(ts); - } - - return base_row.marker(); -} - namespace { // The following struct is identical to view_key_with_action, except the key // is stored as a managed_bytes_view instead of bytes. @@ -656,8 +584,8 @@ public: return {_update.key()->get_component(_base, base_col->position())}; default: if (base_col->kind != _update.column_kind()) { - on_internal_error(vlogger, format("Tried to get a {} column from a {} row update, which is impossible", - to_sstring(base_col->kind), _update.is_clustering_row() ? "clustering" : "static")); + on_internal_error(vlogger, format("Tried to get a {} column {} from a {} row update, which is impossible", + to_sstring(base_col->kind), base_col->name_as_text(), _update.is_clustering_row() ? "clustering" : "static")); } auto& c = _update.cells().cell_at(base_col->id); auto value_view = base_col->is_atomic() ? c.as_atomic_cell(cdef).value() : c.as_collection_mutation().data; @@ -678,6 +606,22 @@ private: return handle_collection_column_computation(collection_computation); } + // TODO: we already calculated this computation in updatable_view_key_cols, + // so perhaps we should pass it here and not re-compute it. But this will + // mean computed columns will only work for view key columns (currently + // we assume that anyway) + if (auto* c = dynamic_cast(&computation)) { + regular_column_transformation::result after = + c->compute_value(_base, _base_key, _update); + if (after.has_value()) { + return {managed_bytes_view(linearized_values.emplace_back(after.get_value()))}; + } + // We only get to this function when we know the _update row + // exists and call it to read its key columns, so we don't expect + // to see a missing value for any of those columns + on_internal_error(vlogger, fmt::format("unexpected call to handle_computed_column {} missing in update", cdef.name_as_text())); + } + auto computed_value = computation.compute_value(_base, _base_key); return {managed_bytes_view(linearized_values.emplace_back(std::move(computed_value)))}; } @@ -729,7 +673,6 @@ view_updates::get_view_rows(const partition_key& base_key, const clustering_or_s if (partition.partition_tombstone() && partition.partition_tombstone() == row_delete_tomb.tomb()) { return; } - ret.push_back({&partition.clustered_row(*_view, std::move(ckey)), action}); }; @@ -936,13 +879,12 @@ static void add_cells_to_view(const schema& base, const schema& view, column_kin * Creates a view entry corresponding to the provided base row. * This method checks that the base row does match the view filter before applying anything. */ -void view_updates::create_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now) { +void view_updates::create_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now, row_marker update_marker) { if (!matches_view_filter(db, *_base, _view_info, base_key, update, now)) { return; } auto view_rows = get_view_rows(base_key, update, std::nullopt, {}); - auto update_marker = compute_row_marker(update); const auto kind = update.column_kind(); for (const auto& [r, action]: view_rows) { if (auto rm = std::get_if(&action)) { @@ -960,48 +902,28 @@ void view_updates::create_entry(data_dictionary::database db, const partition_ke * Deletes the view entry corresponding to the provided base row. * This method checks that the base row does match the view filter before bothering. */ -void view_updates::delete_old_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now) { +void view_updates::delete_old_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now, api::timestamp_type deletion_ts) { // Before deleting an old entry, make sure it was matching the view filter // (otherwise there is nothing to delete) if (matches_view_filter(db, *_base, _view_info, base_key, existing, now)) { - do_delete_old_entry(base_key, existing, update, now); + do_delete_old_entry(base_key, existing, update, now, deletion_ts); } } -void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now) { +void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now, api::timestamp_type deletion_ts) { auto view_rows = get_view_rows(base_key, existing, std::nullopt, update.tomb()); const auto kind = existing.column_kind(); for (const auto& [r, action] : view_rows) { const auto& col_ids = existing.is_clustering_row() ? _base_info->base_regular_columns_in_view_pk() : _base_info->base_static_columns_in_view_pk(); - if (_view_info.has_computed_column_depending_on_base_non_primary_key()) { - if (auto ts_tag = std::get_if(&action)) { - r->apply(ts_tag->into_shadowable_tombstone(now)); - } - } else if (!col_ids.empty()) { - // We delete the old row using a shadowable row tombstone, making sure that - // the tombstone deletes everything in the row (or it might still show up). - // Note: multi-cell columns can't be part of the primary key. - auto& def = _base->column_at(kind, col_ids[0]); - auto cell = existing.cells().cell_at(col_ids[0]).as_atomic_cell(def); - auto ts = cell.timestamp(); - if (col_ids.size() > 1) { - // This is the Alternator-only support for two regular base - // columns that become view key columns. See explanation in - // view_updates::compute_row_marker(). - auto& second_def = _base->column_at(kind, col_ids[1]); - auto second_cell = existing.cells().cell_at(col_ids[1]).as_atomic_cell(second_def); - auto second_ts = second_cell.timestamp(); - ts = std::max(ts, second_ts); - // Alternator isn't supposed to have more than two col_ids! - if (col_ids.size() != 2) [[unlikely]] { - utils::on_internal_error(format("Unexpected col_ids length {}", col_ids.size())); - } - } - if (cell.is_live()) { - r->apply(shadowable_tombstone(ts, now)); - } + if (!col_ids.empty() || _view_info.has_computed_column_depending_on_base_non_primary_key()) { + // The view key could have been modified because it contains or + // depends on a non-primary-key. The fact that this function was + // called instead of update_entry() means the caller knows it + // wants to delete the old row (with the given deletion_ts) and + // will create a different one. So let's honor this. + r->apply(shadowable_tombstone(deletion_ts, now)); } else { // "update" caused the base row to have been deleted, and !col_id // means view row is the same - so it needs to be deleted as well @@ -1102,15 +1024,15 @@ bool view_updates::can_skip_view_updates(const clustering_or_static_row& update, * This method checks that the base row (before and after) matches the view filter before * applying anything. */ -void view_updates::update_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now) { +void view_updates::update_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now, row_marker update_marker) { // While we know update and existing correspond to the same view entry, // they may not match the view filter. if (!matches_view_filter(db, *_base, _view_info, base_key, existing, now)) { - create_entry(db, base_key, update, now); + create_entry(db, base_key, update, now, update_marker); return; } if (!matches_view_filter(db, *_base, _view_info, base_key, update, now)) { - do_delete_old_entry(base_key, existing, update, now); + do_delete_old_entry(base_key, existing, update, now, update_marker.timestamp()); return; } @@ -1119,7 +1041,7 @@ void view_updates::update_entry(data_dictionary::database db, const partition_ke } auto view_rows = get_view_rows(base_key, update, std::nullopt, {}); - auto update_marker = compute_row_marker(update); + const auto kind = update.column_kind(); for (const auto& [r, action] : view_rows) { if (auto rm = std::get_if(&action)) { @@ -1135,6 +1057,8 @@ void view_updates::update_entry(data_dictionary::database db, const partition_ke _op_count += view_rows.size(); } +// Note: despite the general-sounding name of this function, it is used +// just for the case of collection indexing. void view_updates::update_entry_for_computed_column( const partition_key& base_key, const clustering_or_static_row& update, @@ -1157,30 +1081,72 @@ void view_updates::update_entry_for_computed_column( } } +// view_updates::generate_update() is the main function for taking an update +// to a base table row - consisting of existing and updated versions of row - +// and creating from it zero or more updates to a given materialized view. +// These view updates may consist of updating an existing view row, deleting +// an old view row, and/or creating a new view row. +// There are several distinct cases depending on how many of the view's key +// columns are "new key columns", i.e., were regular key columns in the base +// or are a computed column based on a regular column (these computed columns +// are used by, for example, Alternator's GSI): +// +// Zero new key columns: +// The view rows key is composed only from base key columns, and those can't +// be changed in an update, so the view row remains alive as long as the +// base row is alive. The row marker for the view needs to be set to the +// same row marker in the base - to keep an empty view row alive for as long +// as an empty base row exists. +// Note that in this case, if there are *unselected* base columns, we may +// need to keep an empty view row alive even without a row marker because +// the base row (which has additional columns) is still alive. For that we +// have the "virtual columns" feature: In the zero new key columns case, we +// put unselected columns in the view as empty columns, to keep the view +// row alive. +// +// One new key column: +// In this case, there is a regular base column that is part of the view +// key. This regular column can be added or deleted in an update, or its +// expiration be set, and those can cause the view row - including its row +// marker - to need to appear or disappear as well. So the liveness of cell +// of this one column determines the liveness of the view row and the row +// marker that we set for it. +// +// Two or more new key columns: +// This case is explicitly NOT supported in CQL - one cannot create a view +// with more than one base-regular columns in its key. In general picking +// one liveness (timestamp and expiration) is not possible if there are +// multiple regular base columns in the view key, asthose can have different +// liveness. +// However, we do allow this case for Alternator - we need to allow the case +// of two (but not more) because the DynamoDB API allows creating a GSI +// whose two key columns (hash and range key) were regular columns. We can +// support this case in Alternator because it doesn't use expiration (the +// "TTL" it does support is different), and doesn't support user-defined +// timestamps. But, the two columns can still have different timestamps - +// this happens if an update modifies just one of them. In this case the +// timestamp of the view update (and that of the row marker) is the later +// of these two updated columns. void view_updates::generate_update( data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const std::optional& existing, gc_clock::time_point now) { - - // Note that the base PK columns in update and existing are the same, since we're intrinsically dealing - // with the same base row. So we have to check 3 things: - // 1) that the clustering key doesn't have a null, which can happen for compact tables. If that's the case, - // there is no corresponding entries. - // 2) if there is a column not part of the base PK in the view PK, whether it is changed by the update. - // 3) whether the update actually matches the view SELECT filter - + // FIXME: The following if() is old code which may be related to COMPACT + // STORAGE. If this is a real case, refer to a test that demonstrates it. + // If it's not a real case, remove this if(). if (update.is_clustering_row()) { if (!update.key()->is_full(*_base)) { return; } } - - if (_view_info.has_computed_column_depending_on_base_non_primary_key()) { - return update_entry_for_computed_column(base_key, update, existing, now); - } - if (!_base_info->has_base_non_pk_columns_in_view_pk) { + // If the view key depends on any regular column in the base, the update + // may change the view key and may require deleting an old view row and + // inserting a new row. The other case, which we'll handle here first, + // is easier and require just modifying one view row. + if (!_base_info->has_base_non_pk_columns_in_view_pk && + !_view_info.has_computed_column_depending_on_base_non_primary_key()) { if (update.is_static_row()) { // TODO: support static rows in views with pk only including columns from base pk return; @@ -1188,85 +1154,186 @@ void view_updates::generate_update( // The view key is necessarily the same pre and post update. if (existing && existing->is_live(*_base)) { if (update.is_live(*_base)) { - update_entry(db, base_key, update, *existing, now); + update_entry(db, base_key, update, *existing, now, update.marker()); } else { - delete_old_entry(db, base_key, *existing, update, now); + delete_old_entry(db, base_key, *existing, update, now, api::missing_timestamp); } } else if (update.is_live(*_base)) { - create_entry(db, base_key, update, now); + create_entry(db, base_key, update, now, update.marker()); } return; } - const auto& col_ids = update.is_clustering_row() - ? _base_info->base_regular_columns_in_view_pk() - : _base_info->base_static_columns_in_view_pk(); - - // The view has a non-primary-key column from the base table as its primary key. - // That means it's either a regular or static column. If we are currently - // processing an update which does not correspond to the column's kind, - // just stop here. - if (col_ids.empty()) { + // Find the view key columns that may be changed by an update. + // This case is interesting because a change to the view key means that + // we may need to delete an old view row and/or create a new view row. + // The columns we look for are view key columns that are neither base key + // columns nor computed columns based just on key columns. In other words, + // we look here for columns which were regular columns or static columns + // in the base table, or computed columns based on regular columns. + struct updatable_view_key_col { + column_id view_col_id; + regular_column_transformation::result before; + regular_column_transformation::result after; + }; + std::vector updatable_view_key_cols; + for (const column_definition& view_col : _view->primary_key_columns()) { + if (view_col.is_computed()) { + const column_computation& computation = view_col.get_computation(); + if (computation.depends_on_non_primary_key_column()) { + // Column is a computed column that does not depend just on + // the base key, so it may change in the update. + if (auto* c = dynamic_cast(&computation)) { + updatable_view_key_cols.emplace_back(view_col.id, + existing ? c->compute_value(*_base, base_key, *existing) : regular_column_transformation::result(), + c->compute_value(*_base, base_key, update)); + } else { + // The only other column_computation we have which has + // depends_on_non_primary_key_column is + // collection_column_computation, and we have a special + // function to handle that case: + return update_entry_for_computed_column(base_key, update, existing, now); + } + } + } else { + const column_definition* base_col = _base->get_column_definition(view_col.name()); + if (!base_col) { + on_internal_error(vlogger, fmt::format("Column {} in view {}.{} was not found in the base table {}.{}", + view_col.name(), _view->ks_name(), _view->cf_name(), _base->ks_name(), _base->cf_name())); + } + // If the view key column was also a base primary key column, then + // it can't possibly change in this update. But the column was not + // not a primary key column - i.e., a regular column or static + // column, the update might have changed it and we need to list it + // on updatable_view_key_cols. + // We check base_col->kind == update.column_kind() instead of just + // !base_col->is_primary_key() because when update is a static row + // we know it can't possibly update a regular column (and vice + // versa). + if (base_col->kind == update.column_kind()) { + // This is view key, so we know it is atomic + std::optional after; + auto afterp = update.cells().find_cell(base_col->id); + if (afterp) { + after = afterp->as_atomic_cell(*base_col); + } + std::optional before; + if (existing) { + auto beforep = existing->cells().find_cell(base_col->id); + if (beforep) { + before = beforep->as_atomic_cell(*base_col); + } + } + updatable_view_key_cols.emplace_back(view_col.id, + before ? regular_column_transformation::result(*before) : regular_column_transformation::result(), + after ? regular_column_transformation::result(*after) : regular_column_transformation::result()); + } + } + } + // If we reached here, the view has a non-primary-key column from the base + // table as its primary key. That means it's either a regular or static + // column. If we are currently processing an update which does not + // correspond to the column's kind, updatable_view_key_cols will be empty + // and we can just stop here. + if (updatable_view_key_cols.empty()) { return; } - const auto kind = update.column_kind(); - - // If one of the key columns is missing, set has_new_row = false - // meaning that after the update there will be no view row. - // If one of the key columns is missing in the existing value, - // set has_old_row = false meaning we don't have an old row to - // delete. + // Use updatable_view_key_cols - the before and after values of the + // view key columns that may have changed - to determine if the update + // changes an existing view row, deletes an old row or creates a new row. bool has_old_row = true; bool has_new_row = true; - bool same_row = true; - for (auto col_id : col_ids) { - auto* after = update.cells().find_cell(col_id); - auto& cdef = _base->column_at(kind, col_id); - if (existing) { - auto* before = existing->cells().find_cell(col_id); - // Note that this cell is necessarily atomic, because col_ids are - // view key columns, and keys must be atomic. - if (before && before->as_atomic_cell(cdef).is_live()) { - if (after && after->as_atomic_cell(cdef).is_live()) { - // We need to compare just the values of the keys, not - // metadata like the timestamp. This is because below, - // if the old and new view row have the same key, we need - // to be sure to reach the update_entry() case. - auto cmp = compare_unsigned(before->as_atomic_cell(cdef).value(), after->as_atomic_cell(cdef).value()); - if (cmp != 0) { - same_row = false; - } + bool same_row = true; // undefined if either has_old_row or has_new_row are false + for (const auto& u : updatable_view_key_cols) { + if (u.before.has_value()) { + if (u.after.has_value()) { + if (compare_unsigned(u.before.get_value(), u.after.get_value()) != 0) { + same_row = false; } } else { - has_old_row = false; + has_new_row = false; } } else { has_old_row = false; - } - if (!after || !after->as_atomic_cell(cdef).is_live()) { - has_new_row = false; + if (!u.after.has_value()) { + has_new_row = false; + } } } + + // If has_new_row, calculate a row marker for this view row - i.e., a + // timestamp and ttl - based on those of the updatable view key column + // (or, in an Alternator-only extension, more than one). + row_marker new_row_rm; // only set if has_new_row + if (has_new_row) { + // Note: + // 1. By reaching here we know that updatable_view_key_cols has at + // least one member (in CQL, it's always one, in Alternator it + // may be two). + // 2. Because has_new_row, we know all elements in that array have + // after.has_value() true, so we can use after.get_ts() et al. + api::timestamp_type new_row_ts = updatable_view_key_cols[0].after.get_ts(); + // This is the Alternator-only support for *two* regular base columns + // that become view key columns. The timestamp we use is the *maximum* + // of the two key columns, as explained in pull-request #17172. + if (updatable_view_key_cols.size() > 1) { + auto second_ts = updatable_view_key_cols[1].after.get_ts(); + new_row_ts = std::max(new_row_ts, second_ts); + // Alternator isn't supposed to have more than two updatable view key columns! + if (updatable_view_key_cols.size() != 2) [[unlikely]] { + utils::on_internal_error(format("Unexpected updatable_view_key_col length {}", updatable_view_key_cols.size())); + } + } + // We assume that either updatable_view_key_cols has just one column + // (the only situation allowed in CQL) or if there is more then one + // they have the same expiry information (in Alternator, there is + // never a CQL TTL set). + new_row_rm = row_marker(new_row_ts, updatable_view_key_cols[0].after.get_ttl(), updatable_view_key_cols[0].after.get_expiry()); + } + if (has_old_row) { + // As explained in #19977, when there is one updatable_view_key_cols + // (the only case allowed in CQL) the deletion timestamp is before's + // timestamp. As explained in #17119, if there are two of them (only + // possible in Alternator), we take the maximum. + // Note: + // 1. By reaching here we know that updatable_view_key_cols has at + // least one member (in CQL, it's always one, in Alternator it + // may be two). + // 2. Because has_old_row, we know all elements in that array have + // before.has_value() true, so we can use before.get_ts(). + auto old_row_ts = updatable_view_key_cols[0].before.get_ts(); + if (updatable_view_key_cols.size() > 1) { + // This is the Alternator-only support for two regular base + // columns that become view key columns. See explanation in + // view_updates::compute_row_marker(). + auto second_ts = updatable_view_key_cols[1].before.get_ts(); + old_row_ts = std::max(old_row_ts, second_ts); + // Alternator isn't supposed to have more than two updatable view key columns! + if (updatable_view_key_cols.size() != 2) [[unlikely]] { + utils::on_internal_error(format("Unexpected updatable_view_key_col length {}", updatable_view_key_cols.size())); + } + } if (has_new_row) { if (same_row) { - update_entry(db, base_key, update, *existing, now); + update_entry(db, base_key, update, *existing, now, new_row_rm); } else { - // This code doesn't work if the old and new view row have the - // same key, because if they do we get both data and tombstone - // for the same timestamp (now) and the tombstone wins. This - // is why we need the "same_row" case above - it's not just a - // performance optimization. - delete_old_entry(db, base_key, *existing, update, now); - create_entry(db, base_key, update, now); + // The following code doesn't work if the old and new view row + // have the same key, because if they do we can get both data + // and tombstone for the same timestamp and the tombstone + // wins. This is why we need the "same_row" case above - it's + // not just a performance optimization. + delete_old_entry(db, base_key, *existing, update, now, old_row_ts); + create_entry(db, base_key, update, now, new_row_rm); } } else { - delete_old_entry(db, base_key, *existing, update, now); + delete_old_entry(db, base_key, *existing, update, now, old_row_ts); } } else if (has_new_row) { - create_entry(db, base_key, update, now); + create_entry(db, base_key, update, now, new_row_rm); } + } bool view_updates::is_partition_key_permutation_of_base_partition_key() const { diff --git a/db/view/view.hh b/db/view/view.hh index 59a17d5174..34f1862afc 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -240,10 +240,10 @@ private: }; std::vector get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional& existing, row_tombstone update_tomb); bool can_skip_view_updates(const clustering_or_static_row& update, const clustering_or_static_row& existing) const; - void create_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now); - void delete_old_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now); - void do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now); - void update_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now); + void create_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now, row_marker update_marker); + void delete_old_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now, api::timestamp_type deletion_ts); + void do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now, api::timestamp_type deletion_ts); + void update_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now, row_marker update_marker); void update_entry_for_computed_column(const partition_key& base_key, const clustering_or_static_row& update, const std::optional& existing, gc_clock::time_point now); }; diff --git a/docs/alternator/compatibility.md b/docs/alternator/compatibility.md index 3c1d3f0d1c..3965aa3784 100644 --- a/docs/alternator/compatibility.md +++ b/docs/alternator/compatibility.md @@ -272,12 +272,6 @@ behave the same in Alternator. However, there are a few features which we have not implemented yet. Unimplemented features return an error when used, so they should be easy to detect. Here is a list of these unimplemented features: -* Currently in Alternator, a GSI (Global Secondary Index) can only be added - to a table at table creation time. DynamoDB allows adding a GSI (but not an - LSI) to an existing table using an UpdateTable operation, and similarly it - allows removing a GSI from a table. - - * GSI (Global Secondary Index) and LSI (Local Secondary Index) may be configured to project only a subset of the base-table attributes to the index. This option is not yet respected by Alternator - all attributes @@ -378,3 +372,14 @@ they should be easy to detect. Here is a list of these unimplemented features: that can be used to forbid table deletion. This table option was added to DynamoDB in March 2023. + +* Alternator does not support the table option WarmThroughput that can be + used to check or guarantee that the database has "warmed" to handle a + particular throughput. This table option was added to DynamoDB in + November 2024. + + +* Alternator does not support the table option MultiRegionConsistency + that can be used to achieve consistent reads on global (multi-region) tables. + This table option was added as a preview to DynamoDB in December 2024. + diff --git a/mutation/mutation_partition.cc b/mutation/mutation_partition.cc index 9c891f7664..68183e0c9a 100644 --- a/mutation/mutation_partition.cc +++ b/mutation/mutation_partition.cc @@ -1591,7 +1591,7 @@ void row::apply_monotonically(const schema& our_schema, const schema& their_sche // we erase the live cells according to the shadowable_tombstone rules. static bool dead_marker_shadows_row(const schema& s, column_kind kind, const row_marker& marker) { return s.is_view() - && s.view_info()->has_base_non_pk_columns_in_view_pk() + && (s.view_info()->has_base_non_pk_columns_in_view_pk() || s.view_info()->has_computed_column_depending_on_base_non_primary_key()) && !marker.is_live() && kind == column_kind::regular_column; // not applicable to static rows } diff --git a/schema/CMakeLists.txt b/schema/CMakeLists.txt index 14688f7b7a..ae8c4b84b7 100644 --- a/schema/CMakeLists.txt +++ b/schema/CMakeLists.txt @@ -13,7 +13,9 @@ target_link_libraries(schema idl Seastar::seastar xxHash::xxhash - absl::headers) + absl::headers + PRIVATE + alternator) check_headers(check-headers schema GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh) diff --git a/schema/schema.cc b/schema/schema.cc index 3073681f19..39ed944239 100644 --- a/schema/schema.cc +++ b/schema/schema.cc @@ -36,6 +36,7 @@ #include "index/target_parser.hh" #include "utils/hashing.hh" #include "utils/hashers.hh" +#include "alternator/extract_from_attrs.hh" #include @@ -2090,6 +2091,9 @@ column_computation_ptr column_computation::deserialize(bytes_view raw) { } } } + if (type == alternator::extract_from_attrs_column_computation::TYPE_NAME) { + return std::make_unique(parsed); + } throw std::runtime_error(format("Incorrect column computation type {} found when parsing {}", *type_json, parsed)); } diff --git a/test/alternator/test_cql_rbac.py b/test/alternator/test_cql_rbac.py index b7a7906206..097b09708a 100644 --- a/test/alternator/test_cql_rbac.py +++ b/test/alternator/test_cql_rbac.py @@ -655,7 +655,6 @@ def test_rbac_updatetable(dynamodb, cql): # to create a completely separate and independent table. # Below we also have two additional tests for auto-grant when creating a GSI, # and auto-revoke when deleting it. -@pytest.mark.xfail(reason="#11567") def test_rbac_updatetable_gsi(dynamodb, cql): schema = { 'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ], @@ -713,7 +712,6 @@ def test_rbac_updatetable_gsi(dynamodb, cql): # Test the "autogrant" feature of UpdateTable's GSI Create feature: If a role # is allowed to create a GSI, this role is automatically given full (SELECT) # permissions to the newly-created GSI. -@pytest.mark.xfail(reason="#11567") def test_rbac_updatetable_gsi_autogrant(dynamodb, cql): schema = { 'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ], @@ -760,7 +758,6 @@ def test_rbac_updatetable_gsi_autogrant(dynamodb, cql): # on the deleted GSI are revoked. If we forgot to do this revocation, it is # possible for role1 to create a GSI, delete it, and then role2 creates a # different GSI with the same name and role1 might be able to read it. -@pytest.mark.xfail(reason="#11567") def test_rbac_updatetable_gsi_autorevoke(dynamodb, cql): schema = { 'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ], diff --git a/test/alternator/test_gsi.py b/test/alternator/test_gsi.py index cfbbc769eb..42534e3b99 100644 --- a/test/alternator/test_gsi.py +++ b/test/alternator/test_gsi.py @@ -15,7 +15,7 @@ import pytest import time from botocore.exceptions import ClientError -from test.alternator.util import create_test_table, random_string, random_bytes, full_scan, full_query, multiset, list_tables, new_test_table +from .util import create_test_table, random_string, random_bytes, full_scan, full_query, multiset, list_tables, new_test_table, wait_for_gsi # GSIs only support eventually consistent reads, so tests that involve # writing to a table and then expect to read something from it cannot be @@ -261,8 +261,16 @@ def test_gsi_describe(test_table_gsi_1): # backfilled might be in other states, but that case is tested in different # tests in test_gsi_updatetable.py. # Reproduces #11471. -@pytest.mark.xfail(reason="issue #11471") def test_gsi_describe_indexstatus(test_table_gsi_1): + # In DynamoDB, a GSI created together with the table is always immediately + # ACTIVE, but this is not always true in Alternator: Although a new table + # is completely empty and its "view building" phase has nothing to do, + # this "nothing" can still take a short while (especially in debug builds) + # and in the mean time the test might see the CREATING state and be flaky. + # So let's wait_for_gsi() just to be sure the view building is over. + # Note that this makes the explicit IndexStatus check below redundant, + # because wait_for_gsi() already does it.. + wait_for_gsi(test_table_gsi_1, 'hello') desc = test_table_gsi_1.meta.client.describe_table(TableName=test_table_gsi_1.name) gsis = desc['Table']['GlobalSecondaryIndexes'] assert len(gsis) == 1 @@ -607,6 +615,122 @@ def test_gsi_wrong_type_attribute_batch(test_table_gsi_2): for p in [p1, p2, p3]: assert not 'Item' in test_table_gsi_2.get_item(Key={'p': p}, ConsistentRead=True) +# Test when a table has a GSI, if the indexed attribute is a partition key +# in the GSI and its value is 2048 bytes, the update operation is rejected, +# and is added to neither base table nor index. DynamoDB limits partition +# keys to that length (see test_limits.py::test_limit_partition_key_len_2048) +# so wants to limit the GSI keys as well. +# Note that in test_gsi_updatetable.py we have a similar test for when adding +# a pre-existing table. In that case we can't reject the base-table update +# because the oversized attribute is already there - but can just drop this +# item from the GSI. +@pytest.mark.xfail(reason="issue #10347: key length limits not enforced") +def test_gsi_limit_partition_key_len_2048(test_table_gsi_2): + # A value for 'x' (the GSI's partition key) of length 2048 is fine: + p = random_string() + x = 'a'*2048 + test_table_gsi_2.put_item(Item={'p': p, 'x': x}) + assert_index_query(test_table_gsi_2, 'hello', [{'p': p, 'x': x}], + KeyConditions={ + 'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}}) + # PutItem with oversized for 'x' is rejected, item isn't created even + # in the base table. + p = random_string() + x = 'a'*2049 + with pytest.raises(ClientError, match='ValidationException.*2048'): + test_table_gsi_2.put_item(Item={'p': p, 'x': x}) + assert not 'Item' in test_table_gsi_2.get_item(Key={'p': p}, ConsistentRead=True) + +# This is a variant of the above test, where we don't insist that the +# partition key length limit must be exactly 2048 bytes as in DynamoDB, +# but that it be *at least* 2408. I.e., we verify that 2048-byte values +# are allowed for GSI partition keys, while very long keys that surpass +# Scylla's low-level key-length limit (64 KB) are forbidden with an +# appropriate error message and not an "internal server error". This test +# should pass even if Alternator decides to adopt a different key length +# limits from DynamoDB. We do have to adopt *some* limit because the +# internal Scylla implementation has a 64 KB limit on key lengths. +@pytest.mark.xfail(reason="issue #10347: key length limits not enforced") +def test_gsi_limit_partition_key_len(test_table_gsi_2): + # A value for 'x' (the GSI's partition key) of length 2048 is fine: + p = random_string() + x = 'a'*2048 + test_table_gsi_2.put_item(Item={'p': p, 'x': x}) + assert_index_query(test_table_gsi_2, 'hello', [{'p': p, 'x': x}], + KeyConditions={ + 'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}}) + # Attribute, that is a GSI partition key, of length 64 KB + 1 is forbidden: + # it obviously exceeds DynamoDB's limit (2048 bytes), but also exceeds + # Scylla's internal limit on key length (64 KB - 1). We except to get a + # reasonable error on request validation - not some "internal server error". + # We actually used to get this "internal server error" for 64 KB - 2 + # (this is probably related to issue #16772). + p = random_string() + x = 'a'*65536 + with pytest.raises(ClientError, match='ValidationException.*limit'): + test_table_gsi_2.put_item(Item={'p': p, 'x': x}) + assert not 'Item' in test_table_gsi_2.get_item(Key={'p': p}, ConsistentRead=True) + +# Test when a table has a GSI, if the indexed attribute is a partition key +# in the GSI and its value is 1024 bytes, the update operation is rejected, +# and is added to neither base table nor index. DynamoDB limits partition +# keys to that length (see test_limits.py::test_limit_partition_key_len_1024) +# so wants to limit the GSI keys as well. +# Note that in test_gsi_updatetable.py we have a similar test for when adding +# a pre-existing table. In that case we can't reject the base-table update +# because the oversized attribute is already there - but can just drop this +# item from the GSI. +@pytest.mark.xfail(reason="issue #10347: key length limits not enforced") +def test_gsi_limit_sort_key_len_1024(test_table_gsi_5): + # A value for 'x' (the GSI's partition key) of length 1024 is fine: + p = random_string() + c = random_string() + x = 'a'*1024 + test_table_gsi_5.put_item(Item={'p': p, 'c': c, 'x': x}) + assert_index_query(test_table_gsi_5, 'hello', [{'p': p, 'c': c, 'x': x}], + KeyConditions={ + 'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'}, + 'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}}) + # PutItem with oversized for 'x' is rejected, item isn't created even + # in the base table. + p = random_string() + x = 'a'*1025 + with pytest.raises(ClientError, match='ValidationException.*1024'): + test_table_gsi_5.put_item(Item={'p': p, 'c': c, 'x': x}) + assert not 'Item' in test_table_gsi_5.get_item(Key={'p': p, 'c': c}, ConsistentRead=True) + +# This is a variant of the above test, where we don't insist that the +# partition key length limit must be exactly 1024 bytes as in DynamoDB, +# but that it be *at least* 1024. I.e., we verify that 1024-byte values +# are allowed for GSI partition keys, while very long keys that surpass +# Scylla's low-level key-length limit (64 KB) are forbidden with an +# appropriate error message and not an "internal server error". This test +# should pass even if Alternator decides to adopt a different key length +# limits from DynamoDB. We do have to adopt *some* limit because the +# internal Scylla implementation has a 64 KB limit on key lengths. +@pytest.mark.xfail(reason="issue #10347: key length limits not enforced") +def test_gsi_limit_sort_key_len(test_table_gsi_5): + # A value for 'x' (the GSI's partition key) of length 1024 is fine: + p = random_string() + c = random_string() + x = 'a'*1024 + test_table_gsi_5.put_item(Item={'p': p, 'c': c, 'x': x}) + assert_index_query(test_table_gsi_5, 'hello', [{'p': p, 'c': c, 'x': x}], + KeyConditions={ + 'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'}, + 'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}}) + # Attribute, that is a GSI partition key, of length 64 KB + 1 is forbidden: + # it obviously exceeds DynamoDB's limit (1024 bytes), but also exceeds + # Scylla's internal limit on key length (64 KB - 1). We except to get a + # reasonable error on request validation - not some "internal server error". + # We actually used to get this "internal server error" for 64 KB - 2 + # (this is probably related to issue #16772). + p = random_string() + x = 'a'*65536 + with pytest.raises(ClientError, match='ValidationException.*limit'): + test_table_gsi_5.put_item(Item={'p': p, 'c': c, 'x': x}) + assert not 'Item' in test_table_gsi_5.get_item(Key={'p': p, 'c': c}, ConsistentRead=True) + # A third scenario of GSI. Index has a hash key and a sort key, both are # non-key attributes from the base table. This scenario may be very # difficult to implement in Alternator because Scylla's materialized-views diff --git a/test/alternator/test_gsi_updatetable.py b/test/alternator/test_gsi_updatetable.py index d38bc5c336..a693fbab96 100644 --- a/test/alternator/test_gsi_updatetable.py +++ b/test/alternator/test_gsi_updatetable.py @@ -12,59 +12,9 @@ import pytest import time from botocore.exceptions import ClientError from .util import random_string, full_scan, full_query, multiset, \ - new_test_table + new_test_table, wait_for_gsi, wait_for_gsi_gone from .test_gsi import assert_index_query -# UpdateTable for creating a GSI is an asynchronous operation. The table's -# TableStatus changes from ACTIVE to UPDATING for a short while, and then -# goes back to ACTIVE, but the new GSI's IndexStatus appears as CREATING, -# until eventually (in Amazon DynamoDB - it tests a *long* time...) it -# becomes ACTIVE. During the CREATING phase, at some point the Backfilling -# attribute also appears, until it eventually disappears. We need to wait -# until all three markers indicate completion. -# Unfortunately, while boto3 has a client.get_waiter('table_exists') to -# wait for a table to exists, there is no such function to wait for an -# index to come up, so we need to code it ourselves. -def wait_for_gsi(table, gsi_name): - start_time = time.time() - # The timeout needs to be long because on Amazon DynamoDB, even on a - # a tiny table, it sometimes takes minutes. - while time.time() < start_time + 600: - desc = table.meta.client.describe_table(TableName=table.name) - table_status = desc['Table']['TableStatus'] - if table_status != 'ACTIVE': - time.sleep(0.1) - continue - index_desc = [x for x in desc['Table']['GlobalSecondaryIndexes'] if x['IndexName'] == gsi_name] - assert len(index_desc) == 1 - index_status = index_desc[0]['IndexStatus'] - if index_status != 'ACTIVE': - time.sleep(0.1) - continue - # When the index is ACTIVE, this must be after backfilling completed - assert not 'Backfilling' in index_desc[0] - return - raise AssertionError("wait_for_gsi did not complete") - -# Similarly to how wait_for_gsi() waits for a GSI to finish adding, -# this function waits for a GSI to be finally deleted. -def wait_for_gsi_gone(table, gsi_name): - start_time = time.time() - while time.time() < start_time + 600: - desc = table.meta.client.describe_table(TableName=table.name) - table_status = desc['Table']['TableStatus'] - if table_status != 'ACTIVE': - time.sleep(0.1) - continue - if 'GlobalSecondaryIndexes' in desc['Table']: - index_desc = [x for x in desc['Table']['GlobalSecondaryIndexes'] if x['IndexName'] == gsi_name] - if len(index_desc) != 0: - index_status = index_desc[0]['IndexStatus'] - time.sleep(0.1) - continue - return - raise AssertionError("wait_for_gsi_gone did not complete") - # All tests in test_gsi.py involved creating a new table with a GSI up-front. # This test will be about creating a base table *without* a GSI, putting data # in it, and then adding a GSI with the UpdateTable operation. This starts @@ -73,7 +23,6 @@ def wait_for_gsi_gone(table, gsi_name): # the wrong type are silently ignored and not added to the index. We also # check that after adding the GSI, it is no longer possible to add more # items with wrong types to the base table. -@pytest.mark.xfail(reason="issue #11567") def test_gsi_backfill(dynamodb): # First create, and fill, a table without GSI. The items in items1 # will have the appropriate string type for 'x' and will later get @@ -141,7 +90,6 @@ def test_gsi_backfill(dynamodb): # check that the new GSI works. In Alternator's implementation, the LSI key # column will become a real column in the schema, and the GSI needs to use # that instead of the usual computed column. -@pytest.mark.xfail(reason="issue #11567") def test_gsi_backfill_with_lsi(dynamodb): # First create, and fill, a table with an LSI but without GSI. with new_test_table(dynamodb, @@ -208,7 +156,6 @@ def test_gsi_backfill_with_lsi(dynamodb): # checked the case of a new GSI key being a real column because it was an # LSI key. In this test the GSI key is a real column because it was a # key column of the base table itself. -@pytest.mark.xfail(reason="issue #11567") def test_gsi_backfill_with_real_column(dynamodb): with new_test_table(dynamodb, KeySchema=[ @@ -237,7 +184,6 @@ def test_gsi_backfill_with_real_column(dynamodb): assert multiset(items) == multiset(full_scan(table, ConsistentRead=False, IndexName='gsi')) # Test deleting an existing GSI using UpdateTable -@pytest.mark.xfail(reason="issue #11567") def test_gsi_delete(dynamodb): with new_test_table(dynamodb, KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ], @@ -285,7 +231,6 @@ def test_gsi_delete(dynamodb): # still enforced because it is still an LSI key. In Alternator's # implementation this happens because the LSI key column was - and remains - # a real column in the schema. -@pytest.mark.xfail(reason="issue #11567") def test_gsi_delete_with_lsi(dynamodb): # A table whose non-key column "x" serves as a range key in an LSI, # and partition key in a GSI. @@ -315,6 +260,10 @@ def test_gsi_delete_with_lsi(dynamodb): 'Projection': { 'ProjectionType': 'ALL' } } ]) as table: + # We shouldn't need to wait for a GSI created together with the + # table, but let's do it anyway to work around bug #9059 (which + # isn't what this test is trying to reproduce). + wait_for_gsi(table, 'gsi') items = [{'p': random_string(), 'c': random_string(), 'x': random_string()} for i in range(10)] with table.batch_writer() as batch: for item in items: @@ -362,7 +311,6 @@ def test_gsi_delete_with_lsi(dynamodb): # operation on a table set up by CreateTable. In this test we try several # of these operations in sequence, to check we can add more than one GSI, # delete a GSI that we just added, recreate a GSI that we just deleted, etc. -@pytest.mark.xfail(reason="issue #11567") def test_gsi_creates_and_deletes(dynamodb): schema = { 'KeySchema': [ { 'AttributeName': 'p', 'KeyType': 'HASH' } ], @@ -491,7 +439,6 @@ def test_gsi_backfill_empty_string(dynamodb): # happens during the table creation, and one here where the second GSI is # added after the table already exists with the first GSI. # Reproduces #13870. -@pytest.mark.xfail(reason="issue #11567") def test_gsi_key_type_conflict_on_update(dynamodb): with new_test_table(dynamodb, KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }], @@ -532,7 +479,6 @@ def table1(dynamodb): yield table # An empty update_table() call, without any parameters changed, is not allowed. -@pytest.mark.xfail(reason="issue #11567") def test_updatetable_empty(dynamodb, table1): with pytest.raises(ClientError, match='ValidationException.*UpdateTable'): dynamodb.meta.client.update_table(TableName=table1.name) @@ -543,7 +489,6 @@ def test_updatetable_empty(dynamodb, table1): GlobalSecondaryIndexUpdates=[]) # Test various invalid cases of UpdateTable's GlobalSecondaryIndexUpdates. -@pytest.mark.xfail(reason="issue #11567") def test_gsi_updatetable_errors(dynamodb, table1): client = dynamodb.meta.client @@ -637,7 +582,6 @@ def test_gsi_updatetable_errors(dynamodb, table1): # In Alternator, we decided to detect this case anyway - it can help users # notice problems (see #19784). So because we differ from DynamoDB on this, # this test is marked scylla_only. -@pytest.mark.xfail(reason="issue #11567") def test_gsi_updatetable_spurious_attribute_definitions(table1, scylla_only): with pytest.raises(ClientError, match='ValidationException.*AttributeDefinitions'): table1.meta.client.update_table(TableName=table1.name, @@ -654,9 +598,132 @@ def test_gsi_updatetable_spurious_attribute_definitions(table1, scylla_only): # Check that attempting to delete a GSI that doesn't exist results in # the expected ResourceNotFoundException. -@pytest.mark.xfail(reason="issue #11567") def test_updatetable_delete_missing_gsi(dynamodb, table1): with pytest.raises(ClientError, match='ResourceNotFoundException'): dynamodb.meta.client.update_table(TableName=table1.name, GlobalSecondaryIndexUpdates=[{ 'Delete': { 'IndexName': 'nonexistent' } }]) + +# Whereas DynamoDB allows attribute values to reach a generous length (they +# are only limited by the item's size limit, 400 KB), an attribute which is +# a *key* has much stricter limits - 2048 bytes for a partition key, 1024 +# bytes for a sort key. This means that if a table has a GSI or LSI and +# one of the attributes serves as a key in that GSI and LSI, DynamoDB +# limits its length. In the tests test_gsi.py::test_gsi_limit_* we verified +# that attempts to write an oversized value to an attribute which is a +# GSI key are rejected. Here we test what happens when adding a GSI to +# a table with pre-existing data, which already includes items with oversized +# values for the key attribute. These items can't be "rejected" - they +# are already in the base table - but should be skipped while filling the +# GSI. What we don't want to happen is to see the view building hang, +# as described in issue #8627 and #10347. +# The first test here, test_gsi_backfill_oversized_key(), doesn't check the +# specific limits of 2048 and 1024 bytes, it only checks that an item with +# a 65 KB attribute (above Scylla's internal limitations for keys) are +# cleanly skipped and don't cause view build hangs. The following test +# test_gsi_backfill_key_limits will check the specific limits. +def test_gsi_backfill_oversized_key(dynamodb): + # First create, and fill, a table without GSI: + with new_test_table(dynamodb, + KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, + { 'AttributeName': 'c', 'KeyType': 'RANGE' } ], + AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, + { 'AttributeName': 'c', 'AttributeType': 'S' } ]) as table: + p1 = random_string() + p2 = random_string() + c = random_string() + # Create two items, one has a small "x" attribute, the other has + # a 65 KB "x" attribute. + table.put_item(Item={'p': p1, 'c': c, 'x': 'hello'}) + table.put_item(Item={'p': p2, 'c': c, 'x': 'a'*66500}) + # Now use UpdateTable to create two GSIs. In one of them "x" will be + # the partition key, and in the other "x" will be a sort key. + # DynamoDB limits the number of indexes that can be added in one + # UpdateTable command to just one, so we need to do it in two separate + # commands and wait for each to complete. + dynamodb.meta.client.update_table(TableName=table.name, + AttributeDefinitions=[{ 'AttributeName': 'x', 'AttributeType': 'S' }], + GlobalSecondaryIndexUpdates=[ + { 'Create': { 'IndexName': 'index1', + 'KeySchema': [{ 'AttributeName': 'x', 'KeyType': 'HASH' }], + 'Projection': { 'ProjectionType': 'ALL' }} + } + ]) + wait_for_gsi(table, 'index1') + dynamodb.meta.client.update_table(TableName=table.name, + AttributeDefinitions=[{ 'AttributeName': 'x', 'AttributeType': 'S' }, + { 'AttributeName': 'c', 'AttributeType': 'S' }], + GlobalSecondaryIndexUpdates=[ + { 'Create': { 'IndexName': 'index2', + 'KeySchema': [{ 'AttributeName': 'c', 'KeyType': 'HASH' }, + { 'AttributeName': 'x', 'KeyType': 'RANGE' }], + 'Projection': { 'ProjectionType': 'ALL' }} + } + ]) + wait_for_gsi(table, 'index2') + # Verify that the items with the oversized x are missing from both + # GSIs, so only the one item with x = hello should appear in both. + # Note that we don't need to retry the reads here (i.e., use the + # assert_index_scan() or assert_index_query() functions) because after + # we waited for backfilling to complete, we know all the pre-existing + # data is already in the index. + assert [{'p': p1, 'c': c, 'x': 'hello'}] == full_scan(table, ConsistentRead=False, IndexName='index1') + assert [{'p': p1, 'c': c, 'x': 'hello'}] == full_scan(table, ConsistentRead=False, IndexName='index2') + +# The previous test, test_gsi_backfill_oversized_key(), checked that a +# grossly oversized GSI key attribute (over Scylla's internal key limit +# of 64 KB) doesn't hang the view building process. This test verifies +# more specifically that DynamoDB's documented limits - 2048 bytes for +# a GSI partition key and 1024 for a GSI sort key - are implemented. An +# item that has an attribute longer than that should simply be skipped +# during view building. +# Reproduces issue #10347. +@pytest.mark.xfail(reason="issue #10347: key length limits not enforced") +def test_gsi_backfill_key_limits(dynamodb): + # First create, and fill, a table without GSI: + with new_test_table(dynamodb, + KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, + { 'AttributeName': 'c', 'KeyType': 'RANGE' } ], + AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, + { 'AttributeName': 'c', 'AttributeType': 'S' } ]) as table: + # Create four items, with 'x' attribute sizes of 1024, 1025, 2048 + # and 2049. Only one item (1024) has x suitable for a sort key, + # and three (1024, 1025 and 2048) have length suitable for a partition + # key. The unsuitable items will be missing from the indexes. + lengths = [1024, 1025, 2048, 2049] + p = [random_string() for length in lengths] + x = ['a'*length for length in lengths] + c = random_string() + for i in range(len(lengths)): + table.put_item(Item={'p': p[i], 'c': c, 'x': x[i]}) + # Now use UpdateTable to create two GSIs. In one of them "x" will be + # the partition key, and in the other "x" will be a sort key. + # DynamoDB limits the number of indexes that can be added in one + # UpdateTable command to just one, so we need to do it in two separate + # commands and wait for each to complete. + dynamodb.meta.client.update_table(TableName=table.name, + AttributeDefinitions=[{ 'AttributeName': 'x', 'AttributeType': 'S' }], + GlobalSecondaryIndexUpdates=[ + { 'Create': { 'IndexName': 'index1', + 'KeySchema': [{ 'AttributeName': 'x', 'KeyType': 'HASH' }], + 'Projection': { 'ProjectionType': 'ALL' }} + } + ]) + wait_for_gsi(table, 'index1') + dynamodb.meta.client.update_table(TableName=table.name, + AttributeDefinitions=[{ 'AttributeName': 'x', 'AttributeType': 'S' }, + { 'AttributeName': 'c', 'AttributeType': 'S' }], + GlobalSecondaryIndexUpdates=[ + { 'Create': { 'IndexName': 'index2', + 'KeySchema': [{ 'AttributeName': 'c', 'KeyType': 'HASH' }, + { 'AttributeName': 'x', 'KeyType': 'RANGE' }], + 'Projection': { 'ProjectionType': 'ALL' }} + } + ]) + wait_for_gsi(table, 'index2') + # Verify that the items with the oversized x are missing from both + # GSIs. For index1 (x is a partition key, limited to 2048 bytes) + # items 0,1,2 should appear, for index2 (x is a sort key, limited + # to 1024 bytes), only item 0 should appear. + assert multiset([{'p': p[i], 'c': c, 'x': x[i]} for i in range(3)]) == multiset(full_scan(table, ConsistentRead=False, IndexName='index1')) + assert [{'p': p[0], 'c': c, 'x': x[0]}] == full_scan(table, ConsistentRead=False, IndexName='index2') diff --git a/test/alternator/util.py b/test/alternator/util.py index 2ea2aa470e..dedfc9bdab 100644 --- a/test/alternator/util.py +++ b/test/alternator/util.py @@ -245,3 +245,53 @@ def scylla_inject_error(rest_api, err, one_shot=False): def scylla_log(optional_rest_api, message, level): if optional_rest_api: requests.post(f'{optional_rest_api}/system/log?message={requests.utils.quote(message)}&level={level}') + +# UpdateTable for creating a GSI is an asynchronous operation. The table's +# TableStatus changes from ACTIVE to UPDATING for a short while, and then +# goes back to ACTIVE, but the new GSI's IndexStatus appears as CREATING, +# until eventually (in Amazon DynamoDB - it tests a *long* time...) it +# becomes ACTIVE. During the CREATING phase, at some point the Backfilling +# attribute also appears, until it eventually disappears. We need to wait +# until all three markers indicate completion. +# Unfortunately, while boto3 has a client.get_waiter('table_exists') to +# wait for a table to exists, there is no such function to wait for an +# index to come up, so we need to code it ourselves. +def wait_for_gsi(table, gsi_name): + start_time = time.time() + # The timeout needs to be long because on Amazon DynamoDB, even on a + # a tiny table, it sometimes takes minutes. + while time.time() < start_time + 600: + desc = table.meta.client.describe_table(TableName=table.name) + table_status = desc['Table']['TableStatus'] + if table_status != 'ACTIVE': + time.sleep(0.1) + continue + index_desc = [x for x in desc['Table']['GlobalSecondaryIndexes'] if x['IndexName'] == gsi_name] + assert len(index_desc) == 1 + index_status = index_desc[0]['IndexStatus'] + if index_status != 'ACTIVE': + time.sleep(0.1) + continue + # When the index is ACTIVE, this must be after backfilling completed + assert not 'Backfilling' in index_desc[0] + return + raise AssertionError("wait_for_gsi did not complete") + +# Similarly to how wait_for_gsi() waits for a GSI to finish adding, +# this function waits for a GSI to be finally deleted. +def wait_for_gsi_gone(table, gsi_name): + start_time = time.time() + while time.time() < start_time + 600: + desc = table.meta.client.describe_table(TableName=table.name) + table_status = desc['Table']['TableStatus'] + if table_status != 'ACTIVE': + time.sleep(0.1) + continue + if 'GlobalSecondaryIndexes' in desc['Table']: + index_desc = [x for x in desc['Table']['GlobalSecondaryIndexes'] if x['IndexName'] == gsi_name] + if len(index_desc) != 0: + index_status = index_desc[0]['IndexStatus'] + time.sleep(0.1) + continue + return + raise AssertionError("wait_for_gsi_gone did not complete")