From 1ac910c2ab877b9f2bfce7fa8269420e03ed1a51 Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Tue, 14 Apr 2026 19:43:19 +0300 Subject: [PATCH 1/5] alternator: fix ListStreams to return real ARN as LastEvaluatedStreamArn Alternator Streams' "ListStreams" does paging by returning a "cookie" LastEvaluatedStreamArn from one request, that the user passes to the next request as ExclusiveStartStreamArn. In the past, Alternator's stream ARNs were UUIDs, but we recently changed them to match DynamoDB's ARN format which the KCL library requires. However, we didn't change ListStream's cookie format, and it remained UUIDs. This, however, goes against the documentation of DynamoDB, which states that LastEvaluatedStreamArn should be "the stream ARN of the item where the operation stopped". It shouldn't be some weird opaque cookie. So in this patch we add a test that confirms that indeed, in DynamoDB the LastEvaluatedStreamARN is really the last returned ARN and not an opaque cookie. The new test passes on DynamoDB, and fails on Alternator before the simple fix that this patch then does. Fixes SCYLLADB-539. --- alternator/streams.cc | 13 +++++---- test/alternator/test_streams.py | 48 +++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/alternator/streams.cc b/alternator/streams.cc index bb150856a1..0939867384 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -218,7 +218,8 @@ future alternator::executor::list_str _stats.api_operations.list_streams++; auto limit = rjson::get_opt(request, "Limit").value_or(100); - auto streams_start = rjson::get_opt(request, "ExclusiveStartStreamArn"); + auto streams_start = rjson::get_opt(request, "ExclusiveStartStreamArn"); + auto table = find_table(_proxy, request); auto db = _proxy.data_dictionary(); @@ -259,7 +260,8 @@ future alternator::executor::list_str if (streams_start) { i = std::find_if(i, e, [&](const data_dictionary::table& t) { - return t.schema()->id().uuid() == streams_start + return t.schema()->ks_name() == streams_start->keyspace_name() + && t.schema()->cf_name() == streams_start->table_name() && cdc::get_base_table(db.real_database(), *t.schema()) && is_alternator_keyspace(t.schema()->ks_name()) ; @@ -271,7 +273,7 @@ future alternator::executor::list_str auto ret = rjson::empty_object(); auto streams = rjson::empty_array(); - std::optional last; + std::optional last; for (;limit > 0 && i != e; ++i) { auto s = i->schema(); @@ -282,8 +284,9 @@ future alternator::executor::list_str } if (cdc::is_log_for_some_table(db.real_database(), ks_name, cf_name)) { rjson::value new_entry = rjson::empty_object(); - last = i->schema()->id(); + auto arn = stream_arn{ i->schema(), cdc::get_base_table(db.real_database(), *i->schema()) }; + last = std::string(arn.unparsed()); rjson::add(new_entry, "StreamArn", arn); rjson::add(new_entry, "StreamLabel", rjson::from_string(stream_label(*s))); rjson::add(new_entry, "TableName", rjson::from_string(cdc::base_name(s->cf_name()))); @@ -295,7 +298,7 @@ future alternator::executor::list_str rjson::add(ret, "Streams", std::move(streams)); if (last) { - rjson::add(ret, "LastEvaluatedStreamArn", *last); + rjson::add(ret, "LastEvaluatedStreamArn", rjson::from_string(*last)); } return make_ready_future(rjson::print(std::move(ret))); } diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index bfaf675bda..2c8b1e4108 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -179,6 +179,54 @@ def test_list_streams_paged(dynamodb, dynamodbstreams): break streams = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=streams['LastEvaluatedStreamArn']) +# The previous test (test_list_streams_paged) verifies that paging in +# ListStreams works by passing the cookie returned as LastEvaluatedStreamArn +# from one call, into ExclusiveStartStreamArn given to the next call. But that +# test did not check what the value of this "ExclusiveStartStreamArn" looks +# like. The DynamoDB documentation says that it should be "The stream ARN of +# the item where the operation stopped", so in this test we check that it +# indeed is the last returned ARN - and not some opaque cookie of a different +# form. +# This test also verifies that the final page has no LastEvaluatedStreamArn, +# something which test_list_streams_paged also did not check. +def test_list_streams_paged_cookie(dynamodb, dynamodbstreams): + with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table1: + with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table2: + wait_for_active_stream(dynamodbstreams, table1) + wait_for_active_stream(dynamodbstreams, table2) + # Page through all streams one at a time, checking on every page + # that LastEvaluatedStreamArn equals the StreamArn of the last + # returned stream. The DynamoDB documentation says it should be + # "The stream ARN of the item where the operation stopped" - i.e., + # the actual ARN, not an opaque cookie of a different form. + # We also verify that the final page has no LastEvaluatedStreamArn, + # indicating the end of the list. + seen_arns = [] + response = dynamodbstreams.list_streams(Limit=1) + while True: + assert 'Streams' in response + # All but the last page must return exactly one stream because + # we used Limit=1, but the last page may return 0 or 1 stream. + if 'LastEvaluatedStreamArn' not in response: + # Reached the last page. + assert len(response['Streams']) <= 1 + if response['Streams']: + seen_arns.append(response['Streams'][0]['StreamArn']) + break + assert len(response['Streams']) == 1 + seen_arns.append(response['Streams'][0]['StreamArn']) + # The cookie must equal the StreamArn of the last returned item. + assert response['LastEvaluatedStreamArn'] == response['Streams'][-1]['StreamArn'] + response = dynamodbstreams.list_streams( + Limit=1, ExclusiveStartStreamArn=response['LastEvaluatedStreamArn']) + # For completeness, validate that both test tables were listed in + # the result, and there were no duplicates. + assert len(seen_arns) == len(set(seen_arns)) + for table in [table1, table2]: + table_arns = [s['StreamArn'] for s in + dynamodbstreams.list_streams(TableName=table.name)['Streams']] + assert any(arn in seen_arns for arn in table_arns) + # ListStreams with paging should be able correctly return a full list of # pre-existing streams even if additional tables were added between pages # and caused Scylla's hash table of tables to be reorganized. From 68b783103e1fa8088d876e9b2c4545e2f78b4caa Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Tue, 14 Apr 2026 21:10:43 +0300 Subject: [PATCH 2/5] alternator: remove dead code stream_shard_id The class "stream_shard_id" was used in the past (with the old name stream_arn) for representing stream ARNs. It was renamed "stream_shard_id" under the mistaken believe that it will be used to represent DynamoDB Streams "shards" - but it wasn't used for that either (we have a separate "struct shard_id" in the code). So this class is now dead code and can be removed. Signed-off-by: Nadav Har'El --- alternator/streams.cc | 38 -------------------------------------- 1 file changed, 38 deletions(-) diff --git a/alternator/streams.cc b/alternator/streams.cc index 0939867384..929ffa3554 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -167,46 +167,8 @@ static schema_ptr get_schema_from_arn(service::storage_proxy& proxy, const strea } } -// ShardId. Must be between 28 and 65 characters inclusive. -// UUID is 36 bytes as string (including dashes). -// Prepend a version/type marker (`S`) -> 37 -class stream_shard_id : public utils::UUID { -public: - using UUID = utils::UUID; - static constexpr char marker = 'S'; - - stream_shard_id() = default; - stream_shard_id(const UUID& uuid) - : UUID(uuid) - {} - stream_shard_id(const table_id& tid) - : UUID(tid.uuid()) - {} - stream_shard_id(std::string_view v) - : UUID(v.substr(1)) - { - if (v[0] != marker) { - throw std::invalid_argument(std::string(v)); - } - } - friend std::ostream& operator<<(std::ostream& os, const stream_shard_id& arn) { - const UUID& uuid = arn; - return os << marker << uuid; - } - friend std::istream& operator>>(std::istream& is, stream_shard_id& arn) { - std::string s; - is >> s; - arn = stream_shard_id(s); - return is; - } -}; - } // namespace alternator -template -struct rapidjson::internal::TypeHelper - : public from_string_helper -{}; template struct rapidjson::internal::TypeHelper : public from_string_helper From 02d474fca826d036940d68738dc3ccf47e6804de Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Tue, 14 Apr 2026 21:29:19 +0300 Subject: [PATCH 3/5] alternator: ListStreams: on last page, avoid LastEvaluatedStreamArn When ListStreams is on its last page and ran out streams to list, it shouldn't return a paging cookie (LastEvaluatedStreamArn) at all. Before this patch it does, and forces the user to make another call just to get another empty page, which is silly. This patch includes a fix and a reproducer test (that, as usual, passes on DynamoDB and fails on Alternator before the patch and succeeds after). Signed-off-by: Nadav Har'El --- alternator/streams.cc | 6 +++++- test/alternator/test_streams.py | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/alternator/streams.cc b/alternator/streams.cc index 929ffa3554..47010494b4 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -259,7 +259,11 @@ future alternator::executor::list_str rjson::add(ret, "Streams", std::move(streams)); - if (last) { + // Only emit LastEvaluatedStreamArn when we stopped because we hit the + // limit (limit == 0), meaning there may be more streams to list. + // If we exhausted all tables naturally (limit > 0), there are no more + // streams, so we must not emit a cookie. + if (last && limit == 0) { rjson::add(ret, "LastEvaluatedStreamArn", rjson::from_string(*last)); } return make_ready_future(rjson::print(std::move(ret))); diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index 2c8b1e4108..049fee8ab3 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -227,6 +227,25 @@ def test_list_streams_paged_cookie(dynamodb, dynamodbstreams): dynamodbstreams.list_streams(TableName=table.name)['Streams']] assert any(arn in seen_arns for arn in table_arns) +# If the last page of ListStreams results is not full because there are no +# more streams to list, it shouldn't have LastEvaluatedStreamArn: It's silly +# to return one and then force the user to retrieve another empty page). +# Note that we focus on the case of the last page not being full because if +# the last page *is* full (list Limit streams), the implementation may not be +# aware that there are no more streams to list. +def test_list_streams_unfull_last_page_no_cookie(dynamodb, dynamodbstreams): + with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table: + wait_for_active_stream(dynamodbstreams, table) + # With a very high Limit the page is definitely not full, so + # LastEvaluatedStreamArn must be absent. Limit=100 is not very + # high, but DynamoDB doesn't allow any higher and it is very + # unlikely we'll ever run this test with 100 live streams so + # we can expect the page to not be full. + response = dynamodbstreams.list_streams(Limit=100) + assert 'Streams' in response + assert len(response['Streams']) >= 1 + assert 'LastEvaluatedStreamArn' not in response + # ListStreams with paging should be able correctly return a full list of # pre-existing streams even if additional tables were added between pages # and caused Scylla's hash table of tables to be reorganized. From 930fb4c330165ea94e9930c140aaf58ca4f16c5a Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Tue, 14 Apr 2026 21:55:32 +0300 Subject: [PATCH 4/5] test/alternator: test DescribeStream on non-existent table We already had a test for DescribeStream being called on a bogus ARN returns a ValidationException. But if the stream is more legitimate- looking but refers to a non-existent table (e.g., an ARN taken in the past from a table that no longer exists), we should return ResourceNotFoundException. In this patch we add a test that verifies we indeed do this correctly. Moreover, Alternator's current stream ARNs include both a keyspace name and a table name, and either one being incorrect should lead to ResourceNotFoundException, and indeed the new test validates that it works as expected - there is no bug here (AI guessed we have a bug in the missing *keyspace* case, but this guess was wrong). --- test/alternator/test_streams.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index 049fee8ab3..8db16ed77d 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -361,6 +361,36 @@ def test_describe_nonexistent_stream(dynamodb, dynamodbstreams): with pytest.raises(ClientError, match='ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'): dynamodbstreams.describe_stream(StreamArn='sdfadfsdfnlfkajakfgjalksfgklasjklasdjfklasdfasdfgasf') +# test_describe_nonexistent_stream checked the case where a StreamArn is +# completely bogus. Here we want to check well-formed names that point to +# table names that do not exist. We should return ResourceNotFoundException +# in this case, not ValidationException. +# To reduce our assumptions of what valid ARNs look like, we don't hardcode +# any ARN formats here. Instead, we create a stream, get its real ARN, and +# then modify it to point to a non-existent table. +def test_describe_stream_nonexistent_table(dynamodb, dynamodbstreams): + with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table: + streams = dynamodbstreams.list_streams(TableName=table.name) + arn = streams['Streams'][0]['StreamArn'] + # Replace the table name in the ARN with one that doesn't exist. + # In both DynamoDB and Alternator, the table name appears in the ARN + # at least once, and replacing it yields a well-formed but non-existent + # ARN. + # In Alternator, the table name appears twice - in both keyspace and + # CDC log name parts - so we want to try both replacements to verify + # that both incorrect keyspace name and incorrect table names are + # handled. + nonexistent = table.name + '_nonexistent_' + random_string() + count = arn.count(table.name) + assert count >= 1 # sanity check that the name appeared in the ARN at all + for i in range(count): + # Replace only the i-th occurrence of table.name. + parts = arn.split(table.name) + modified_arn = table.name.join(parts[:i+1]) + nonexistent + table.name.join(parts[i+1:]) + assert modified_arn != arn + with pytest.raises(ClientError, match='ResourceNotFoundException'): + dynamodbstreams.describe_stream(StreamArn=modified_arn) + def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams): with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table: streams = dynamodbstreams.list_streams(TableName=table.name) From 0d05e3b4a46e01570ce966ecd00d448d0236e34d Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Wed, 15 Apr 2026 12:28:33 +0300 Subject: [PATCH 5/5] alternator: fix ListStreams paging if table is deleted during paging Currently, ListStreams paging works by looking in the list of tables for ExclusiveStartStreamArn and starting there. But it's possible that during the paging process, one of the tables got deleted and ExclusiveStartStreamArn no longer points to an existing table. In the current implementation this caused the paging to stop (think it reached the end). The solution is simple: ListStreams will now sort the list of tables by name (it anyway needs to be sorted by something to be consistent across pages), and will look with std::upper_bound for the first table *after* the ExclusiveStartStreamArn - we don't need to find that table name itself. The patch also includes a test reproducing this bug. As usual, the test passes on DynamoDB, fails on Alternator before this patch, and passes with the patch. Signed-off-by: Nadav Har'El --- alternator/streams.cc | 33 +++++++++-------- test/alternator/test_streams.py | 63 +++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 17 deletions(-) diff --git a/alternator/streams.cc b/alternator/streams.cc index 47010494b4..eb4d5fa3b2 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -207,30 +207,29 @@ future alternator::executor::list_str cfs = db.get_tables(); } - // # 12601 (maybe?) - sort the set of tables on ID. This should ensure we never - // generate duplicates in a paged listing here. Can obviously miss things if they - // are added between paged calls and end up with a "smaller" UUID/ARN, but that - // is to be expected. + // We need to sort the tables to ensure a stable order for paging. + // We sort by keyspace and table name, which will also allow us to skip to + // the right position by ExclusiveStartStreamArn. + auto cmp = [](std::string_view ks1, std::string_view cf1, std::string_view ks2, std::string_view cf2) { + return ks1 == ks2 ? cf1 < cf2 : ks1 < ks2; + }; if (std::cmp_less(limit, cfs.size()) || streams_start) { - std::sort(cfs.begin(), cfs.end(), [](const data_dictionary::table& t1, const data_dictionary::table& t2) { - return t1.schema()->id().uuid() < t2.schema()->id().uuid(); - }); + std::sort(cfs.begin(), cfs.end(), + [&cmp](const data_dictionary::table& t1, const data_dictionary::table& t2) { + return cmp(t1.schema()->ks_name(), t1.schema()->cf_name(), + t2.schema()->ks_name(), t2.schema()->cf_name()); + }); } auto i = cfs.begin(); auto e = cfs.end(); if (streams_start) { - i = std::find_if(i, e, [&](const data_dictionary::table& t) { - return t.schema()->ks_name() == streams_start->keyspace_name() - && t.schema()->cf_name() == streams_start->table_name() - && cdc::get_base_table(db.real_database(), *t.schema()) - && is_alternator_keyspace(t.schema()->ks_name()) - ; - }); - if (i != e) { - ++i; - } + i = std::upper_bound(i, e, *streams_start, + [&cmp](const stream_arn& arn, const data_dictionary::table& t) { + return cmp(arn.keyspace_name(), arn.table_name(), + t.schema()->ks_name(), t.schema()->cf_name()); + }); } auto ret = rjson::empty_object(); diff --git a/test/alternator/test_streams.py b/test/alternator/test_streams.py index 8db16ed77d..1eaa4b60f5 100644 --- a/test/alternator/test_streams.py +++ b/test/alternator/test_streams.py @@ -246,6 +246,69 @@ def test_list_streams_unfull_last_page_no_cookie(dynamodb, dynamodbstreams): assert len(response['Streams']) >= 1 assert 'LastEvaluatedStreamArn' not in response +# Test what happens if we do a paging ListStreams, a page returns as +# LastEvaluatedStreamArn an ARN of some table's stream, but then we delete +# that table before retrieving the next page. The paging should be able to +# continue without error, and see the rest of the streams that we haven't +# seen yet. +def test_list_streams_paged_resume_on_deleted_table(dynamodb, dynamodbstreams): + with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table1: + with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table2: + wait_for_active_stream(dynamodbstreams, table1) + wait_for_active_stream(dynamodbstreams, table2) + # Page through the streams one at a time (Limit=1), waiting until + # we get either table1 or table2. + # Set found_table to the table found, other_table to the one not + # yet found, and last_arn to LastEvaluatedStreamArn where we stopped. + response = dynamodbstreams.list_streams(Limit=1) + while True: + assert 'Streams' in response + assert len(response['Streams']) == 1 + last_arn = response['LastEvaluatedStreamArn'] + if response['Streams'][0]['TableName'] == table1.name: + found_table, other_table = table1, table2 + break + if response['Streams'][0]['TableName'] == table2.name: + found_table, other_table = table2, table1 + break + response = dynamodbstreams.list_streams( + Limit=1, ExclusiveStartStreamArn=last_arn) + # Delete found_table, the one which last_arn that we are holding + # refers to. + found_table.delete() + found_table.meta.client.get_waiter('table_not_exists').wait(TableName=found_table.name) + # Try to continue the paging, with last_arn that now refers to the + # already deleted table. We expect to be able to continue without + # error, and see other_table which we haven't seen yet (and not + # see found_table again, of course). Other than that, it's not clear + # what we expect to see - it would be nice not to have the list + # restarted from scratch, but this test doesn't rule this + # implementation out (and I'm not sure we should rule it out). + response = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=last_arn) + found_both = False + while True: + assert 'Streams' in response + if response['Streams']: + assert len(response['Streams']) == 1 + assert response['Streams'][0]['TableName'] != found_table.name + if response['Streams'][0]['TableName'] == other_table.name: + found_both = True + if 'LastEvaluatedStreamArn' not in response: + break + response = dynamodbstreams.list_streams( + Limit=1, ExclusiveStartStreamArn=response['LastEvaluatedStreamArn']) + assert found_both + # When we'll go out of scope on create_stream_test_table that + # create found_table - which we deleted - it will expect this + # table to exist and fail when it doesn't. So let's recreate the table. + # It doesn't matter which schema we use, it just needs to exist. + found_table = dynamodb.create_table(TableName=found_table.name, + KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' } ], + AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ], + BillingMode='PAY_PER_REQUEST') + found_table.meta.client.get_waiter('table_exists').wait(TableName=found_table.name) + + # ListStreams with paging should be able correctly return a full list of # pre-existing streams even if additional tables were added between pages # and caused Scylla's hash table of tables to be reorganized.