diff --git a/alternator/streams.cc b/alternator/streams.cc index 81eb8e5e5e..a989aaa016 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -167,46 +167,8 @@ static schema_ptr get_schema_from_arn(service::storage_proxy& proxy, const strea } } -// ShardId. Must be between 28 and 65 characters inclusive. -// UUID is 36 bytes as string (including dashes). -// Prepend a version/type marker (`S`) -> 37 -class stream_shard_id : public utils::UUID { -public: - using UUID = utils::UUID; - static constexpr char marker = 'S'; - - stream_shard_id() = default; - stream_shard_id(const UUID& uuid) - : UUID(uuid) - {} - stream_shard_id(const table_id& tid) - : UUID(tid.uuid()) - {} - stream_shard_id(std::string_view v) - : UUID(v.substr(1)) - { - if (v[0] != marker) { - throw std::invalid_argument(std::string(v)); - } - } - friend std::ostream& operator<<(std::ostream& os, const stream_shard_id& arn) { - const UUID& uuid = arn; - return os << marker << uuid; - } - friend std::istream& operator>>(std::istream& is, stream_shard_id& arn) { - std::string s; - is >> s; - arn = stream_shard_id(s); - return is; - } -}; - } // namespace alternator -template -struct rapidjson::internal::TypeHelper - : public from_string_helper -{}; template struct rapidjson::internal::TypeHelper : public from_string_helper @@ -218,7 +180,8 @@ future alternator::executor::list_str _stats.api_operations.list_streams++; auto limit = rjson::get_opt(request, "Limit").value_or(100); - auto streams_start = rjson::get_opt(request, "ExclusiveStartStreamArn"); + auto streams_start = rjson::get_opt(request, "ExclusiveStartStreamArn"); + auto table = find_table(_proxy, request); auto db = _proxy.data_dictionary(); @@ -244,34 +207,34 @@ future alternator::executor::list_str cfs = db.get_tables(); } - // # 12601 (maybe?) - sort the set of tables on ID. This should ensure we never - // generate duplicates in a paged listing here. Can obviously miss things if they - // are added between paged calls and end up with a "smaller" UUID/ARN, but that - // is to be expected. + // We need to sort the tables to ensure a stable order for paging. + // We sort by keyspace and table name, which will also allow us to skip to + // the right position by ExclusiveStartStreamArn. + auto cmp = [](std::string_view ks1, std::string_view cf1, std::string_view ks2, std::string_view cf2) { + return ks1 == ks2 ? cf1 < cf2 : ks1 < ks2; + }; if (std::cmp_less(limit, cfs.size()) || streams_start) { - std::sort(cfs.begin(), cfs.end(), [](const data_dictionary::table& t1, const data_dictionary::table& t2) { - return t1.schema()->id().uuid() < t2.schema()->id().uuid(); - }); + std::sort(cfs.begin(), cfs.end(), + [&cmp](const data_dictionary::table& t1, const data_dictionary::table& t2) { + return cmp(t1.schema()->ks_name(), t1.schema()->cf_name(), + t2.schema()->ks_name(), t2.schema()->cf_name()); + }); } auto i = cfs.begin(); auto e = cfs.end(); if (streams_start) { - i = std::find_if(i, e, [&](const data_dictionary::table& t) { - return t.schema()->id().uuid() == streams_start - && cdc::get_base_table(db.real_database(), *t.schema()) - && is_alternator_keyspace(t.schema()->ks_name()) - ; - }); - if (i != e) { - ++i; - } + i = std::upper_bound(i, e, *streams_start, + [&cmp](const stream_arn& arn, const data_dictionary::table& t) { + return cmp(arn.keyspace_name(), arn.table_name(), + t.schema()->ks_name(), t.schema()->cf_name()); + }); } auto ret = rjson::empty_object(); auto streams = rjson::empty_array(); - std::optional last; + std::optional last; for (;limit > 0 && i != e; ++i) { auto s = i->schema(); @@ -282,8 +245,9 @@ future alternator::executor::list_str } if (cdc::is_log_for_some_table(db.real_database(), ks_name, cf_name)) { rjson::value new_entry = rjson::empty_object(); - last = i->schema()->id(); + auto arn = stream_arn{ i->schema(), cdc::get_base_table(db.real_database(), *i->schema()) }; + last = std::string(arn.unparsed()); rjson::add(new_entry, "StreamArn", arn); rjson::add(new_entry, "StreamLabel", rjson::from_string(stream_label(*s))); rjson::add(new_entry, "TableName", rjson::from_string(cdc::base_name(s->cf_name()))); @@ -294,8 +258,12 @@ future alternator::executor::list_str rjson::add(ret, "Streams", std::move(streams)); - if (last) { - rjson::add(ret, "LastEvaluatedStreamArn", *last); + // Only emit LastEvaluatedStreamArn when we stopped because we hit the + // limit (limit == 0), meaning there may be more streams to list. + // If we exhausted all tables naturally (limit > 0), there are no more + // streams, so we must not emit a cookie. + if (last && limit == 0) { + rjson::add(ret, "LastEvaluatedStreamArn", rjson::from_string(*last)); } return make_ready_future(rjson::print(std::move(ret))); } diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index bfaf675bda..1eaa4b60f5 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -179,6 +179,136 @@ def test_list_streams_paged(dynamodb, dynamodbstreams): break streams = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=streams['LastEvaluatedStreamArn']) +# The previous test (test_list_streams_paged) verifies that paging in +# ListStreams works by passing the cookie returned as LastEvaluatedStreamArn +# from one call, into ExclusiveStartStreamArn given to the next call. But that +# test did not check what the value of this "ExclusiveStartStreamArn" looks +# like. The DynamoDB documentation says that it should be "The stream ARN of +# the item where the operation stopped", so in this test we check that it +# indeed is the last returned ARN - and not some opaque cookie of a different +# form. +# This test also verifies that the final page has no LastEvaluatedStreamArn, +# something which test_list_streams_paged also did not check. +def test_list_streams_paged_cookie(dynamodb, dynamodbstreams): + with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table1: + with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table2: + wait_for_active_stream(dynamodbstreams, table1) + wait_for_active_stream(dynamodbstreams, table2) + # Page through all streams one at a time, checking on every page + # that LastEvaluatedStreamArn equals the StreamArn of the last + # returned stream. The DynamoDB documentation says it should be + # "The stream ARN of the item where the operation stopped" - i.e., + # the actual ARN, not an opaque cookie of a different form. + # We also verify that the final page has no LastEvaluatedStreamArn, + # indicating the end of the list. + seen_arns = [] + response = dynamodbstreams.list_streams(Limit=1) + while True: + assert 'Streams' in response + # All but the last page must return exactly one stream because + # we used Limit=1, but the last page may return 0 or 1 stream. + if 'LastEvaluatedStreamArn' not in response: + # Reached the last page. + assert len(response['Streams']) <= 1 + if response['Streams']: + seen_arns.append(response['Streams'][0]['StreamArn']) + break + assert len(response['Streams']) == 1 + seen_arns.append(response['Streams'][0]['StreamArn']) + # The cookie must equal the StreamArn of the last returned item. + assert response['LastEvaluatedStreamArn'] == response['Streams'][-1]['StreamArn'] + response = dynamodbstreams.list_streams( + Limit=1, ExclusiveStartStreamArn=response['LastEvaluatedStreamArn']) + # For completeness, validate that both test tables were listed in + # the result, and there were no duplicates. + assert len(seen_arns) == len(set(seen_arns)) + for table in [table1, table2]: + table_arns = [s['StreamArn'] for s in + dynamodbstreams.list_streams(TableName=table.name)['Streams']] + assert any(arn in seen_arns for arn in table_arns) + +# If the last page of ListStreams results is not full because there are no +# more streams to list, it shouldn't have LastEvaluatedStreamArn: It's silly +# to return one and then force the user to retrieve another empty page). +# Note that we focus on the case of the last page not being full because if +# the last page *is* full (list Limit streams), the implementation may not be +# aware that there are no more streams to list. +def test_list_streams_unfull_last_page_no_cookie(dynamodb, dynamodbstreams): + with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table: + wait_for_active_stream(dynamodbstreams, table) + # With a very high Limit the page is definitely not full, so + # LastEvaluatedStreamArn must be absent. Limit=100 is not very + # high, but DynamoDB doesn't allow any higher and it is very + # unlikely we'll ever run this test with 100 live streams so + # we can expect the page to not be full. + response = dynamodbstreams.list_streams(Limit=100) + assert 'Streams' in response + assert len(response['Streams']) >= 1 + assert 'LastEvaluatedStreamArn' not in response + +# Test what happens if we do a paging ListStreams, a page returns as +# LastEvaluatedStreamArn an ARN of some table's stream, but then we delete +# that table before retrieving the next page. The paging should be able to +# continue without error, and see the rest of the streams that we haven't +# seen yet. +def test_list_streams_paged_resume_on_deleted_table(dynamodb, dynamodbstreams): + with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table1: + with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table2: + wait_for_active_stream(dynamodbstreams, table1) + wait_for_active_stream(dynamodbstreams, table2) + # Page through the streams one at a time (Limit=1), waiting until + # we get either table1 or table2. + # Set found_table to the table found, other_table to the one not + # yet found, and last_arn to LastEvaluatedStreamArn where we stopped. + response = dynamodbstreams.list_streams(Limit=1) + while True: + assert 'Streams' in response + assert len(response['Streams']) == 1 + last_arn = response['LastEvaluatedStreamArn'] + if response['Streams'][0]['TableName'] == table1.name: + found_table, other_table = table1, table2 + break + if response['Streams'][0]['TableName'] == table2.name: + found_table, other_table = table2, table1 + break + response = dynamodbstreams.list_streams( + Limit=1, ExclusiveStartStreamArn=last_arn) + # Delete found_table, the one which last_arn that we are holding + # refers to. + found_table.delete() + found_table.meta.client.get_waiter('table_not_exists').wait(TableName=found_table.name) + # Try to continue the paging, with last_arn that now refers to the + # already deleted table. We expect to be able to continue without + # error, and see other_table which we haven't seen yet (and not + # see found_table again, of course). Other than that, it's not clear + # what we expect to see - it would be nice not to have the list + # restarted from scratch, but this test doesn't rule this + # implementation out (and I'm not sure we should rule it out). + response = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=last_arn) + found_both = False + while True: + assert 'Streams' in response + if response['Streams']: + assert len(response['Streams']) == 1 + assert response['Streams'][0]['TableName'] != found_table.name + if response['Streams'][0]['TableName'] == other_table.name: + found_both = True + if 'LastEvaluatedStreamArn' not in response: + break + response = dynamodbstreams.list_streams( + Limit=1, ExclusiveStartStreamArn=response['LastEvaluatedStreamArn']) + assert found_both + # When we'll go out of scope on create_stream_test_table that + # create found_table - which we deleted - it will expect this + # table to exist and fail when it doesn't. So let's recreate the table. + # It doesn't matter which schema we use, it just needs to exist. + found_table = dynamodb.create_table(TableName=found_table.name, + KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ], + AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ], + BillingMode='PAY_PER_REQUEST') + found_table.meta.client.get_waiter('table_exists').wait(TableName=found_table.name) + + # ListStreams with paging should be able correctly return a full list of # pre-existing streams even if additional tables were added between pages # and caused Scylla's hash table of tables to be reorganized. @@ -294,6 +424,36 @@ def test_describe_nonexistent_stream(dynamodb, dynamodbstreams): with pytest.raises(ClientError, match='ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'): dynamodbstreams.describe_stream(StreamArn='sdfadfsdfnlfkajakfgjalksfgklasjklasdjfklasdfasdfgasf') +# test_describe_nonexistent_stream checked the case where a StreamArn is +# completely bogus. Here we want to check well-formed names that point to +# table names that do not exist. We should return ResourceNotFoundException +# in this case, not ValidationException. +# To reduce our assumptions of what valid ARNs look like, we don't hardcode +# any ARN formats here. Instead, we create a stream, get its real ARN, and +# then modify it to point to a non-existent table. +def test_describe_stream_nonexistent_table(dynamodb, dynamodbstreams): + with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table: + streams = dynamodbstreams.list_streams(TableName=table.name) + arn = streams['Streams'][0]['StreamArn'] + # Replace the table name in the ARN with one that doesn't exist. + # In both DynamoDB and Alternator, the table name appears in the ARN + # at least once, and replacing it yields a well-formed but non-existent + # ARN. + # In Alternator, the table name appears twice - in both keyspace and + # CDC log name parts - so we want to try both replacements to verify + # that both incorrect keyspace name and incorrect table names are + # handled. + nonexistent = table.name + '_nonexistent_' + random_string() + count = arn.count(table.name) + assert count >= 1 # sanity check that the name appeared in the ARN at all + for i in range(count): + # Replace only the i-th occurrence of table.name. + parts = arn.split(table.name) + modified_arn = table.name.join(parts[:i+1]) + nonexistent + table.name.join(parts[i+1:]) + assert modified_arn != arn + with pytest.raises(ClientError, match='ResourceNotFoundException'): + dynamodbstreams.describe_stream(StreamArn=modified_arn) + def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams): with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table: streams = dynamodbstreams.list_streams(TableName=table.name)