diff --git a/alternator/streams.cc b/alternator/streams.cc index 0d6103398a..db5450b9d9 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -364,41 +364,48 @@ using namespace std::string_literals; * Downside is that we get a _lot_ of shards. And with actual * generations, we'll get hubba-hubba loads of them. * - * Improvement ideas: - * 1.) Prune expired streams that cannot have records (check ttls) - * 2.) Group cdc streams into N per shards (by token range) + * We would like to group cdc streams into N per shards + * (by token range/ ID set). * - * The latter however makes it impossible to do a simple + * This however makes it impossible to do a simple * range query with limit (we need to do staggered querying, because * of how GetRecords work). - * The results of a select from cdc log on more than one cdc stream, - * where timestamp < T will typically look like: * - * ... - * .... - * ... - * ... - * ... - * ... + * We can do "paged" queries (i.e. with the get_records result limit and + * continuing on "next" iterator) across cdc log PK:s (id:s), if we + * somehow track paging state. * - * Where tsN < timestamp and tsNp < timestamp, but range [ts0-tsN] and [ts0p-tsNp] - * overlap. + * This is however problematic because while this can be encoded + * in shard iterators (long), they _cannot_ be handled in sequence + * numbers (short), and we need to be able to start a new iterator + * using just a sequence number as watershed. * - * Now, if we have limit=X, we might end on a row inside shard 0. - * For our next query, we would in the simple case just say "timestamp > ts3 and timestamp < T", - * but of course this would then miss rows in shard 1 and above. + * It also breaks per-shard sort order, where it is assumed that + * records in a shard are presented "in order". * - * However if we instead set the conditions to - * - * "(token(stream_id) = token(shard 0) and timestamp > ts3 and timestamp < T) - * or (token(stream_id) > token(shard 0) and timestamp < T)" - * - * we can get the correct data. Question is however how well this will perform, if at - * all. + * To deal with both problems we would need to do merging of + * cdc streams. But this becomes difficult with actual cql paging etc, + * we would potentially have way to many/way to few cql rows for + * whatever get_records query limit we have. And waste a lot of + * memory and cycles sorting "junk". * * For now, go simple, but maybe consider if this latter approach would work * and perform. * + * Parents: + * DynamoDB has the concept of optional "parent shard", where parent + * is the data set where data for some key range was recorded before + * sharding was changed. + * + * While probably not critical, it is meant as an indicator as to + * which shards should be read before other shards. + * + * In scylla, this is sort of akin to an ID having corresponding ID/ID:s + * that cover the token range it represents. Because ID:s are per + * vnode shard however, this relation can be somewhat ambigous. + * We still provide some semblance of this by finding the ID in + * older generation that has token start < current ID token start. + * This will be a partial overlap, but it is the best we can do. */ static std::chrono::seconds confidence_interval(const database& db) { @@ -497,28 +504,73 @@ future executor::describe_stream(client_state& cl std::optional last; + // i is now at the youngest generation we include. make a mark of it. + auto first = i; + + // if we're a paged query, skip to the generation where we left of. + if (shard_start) { + i = topologies.find(shard_start->time); + } + + // for parent-child stuff we need id:s to be sorted by token + // (see explanation above) since we want to find closest + // token boundary when determining parent. + // #7346 - we processed and searched children/parents in + // stored order, which is not neccesarily token order, + // so the finding of "closest" token boundary (using upper bound) + // could give somewhat weird results. + static auto cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) { + return id1.token() < id2.token(); + }; + + // need a prev even if we are skipping stuff + if (i != first) { + prev = std::prev(i); + std::stable_sort(prev->second.streams.begin(), prev->second.streams.end(), cmp); + } + for (; limit > 0 && i != e; prev = i, ++i) { auto& [ts, sv] = *i; last = std::nullopt; - for (auto& id : sv.streams()) { - if (shard_start && shard_start->id != id) { - continue; - } - if (shard_start && shard_start->id == id) { - shard_start = std::nullopt; - continue; + auto lo = sv.streams.begin(); + auto end = sv.streams.end(); + + // first sort by index. (see above) + std::stable_sort(lo, end, cmp); + + if (shard_start) { + // find next shard position + lo = std::upper_bound(lo, end, shard_start->id.token(), [](const dht::token& t, const cdc::stream_id& id) { + return t < id.token(); + }); + shard_start = std::nullopt; + } + + auto expired = [&]() -> std::optional { + auto j = std::next(i); + if (j == e) { + return std::nullopt; } + // add this so we sort of match potential + // sequence numbers in get_records result. + return j->first + confidence_interval(db); + }(); + + while (lo != end) { + auto& id = *lo++; auto shard = rjson::empty_object(); if (prev != e) { - auto token = id.token(); - auto& pids = prev->second.streams(); - auto pid = std::upper_bound(pids.begin(), pids.end(), token, [](const dht::token& t, const cdc::stream_id& id) { + auto& pids = prev->second.streams; + auto pid = std::upper_bound(pids.begin(), pids.end(), id.token(), [](const dht::token& t, const cdc::stream_id& id) { return t < id.token(); }); + if (pid != pids.begin()) { + pid = std::prev(pid); + } if (pid != pids.end()) { rjson::set(shard, "ParentShardId", shard_id(prev->first, *pid)); } @@ -526,11 +578,10 @@ future executor::describe_stream(client_state& cl last.emplace(ts, id); rjson::set(shard, "ShardId", *last); - auto range = rjson::empty_object(); rjson::set(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch().count()))); - if (sv.expired() && *sv.expired() < threshold) { - rjson::set(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::max_time_UUID((*sv.expired() + confidence_interval(db)).time_since_epoch().count()))); + if (expired) { + rjson::set(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(expired->time_since_epoch().count()))); } rjson::set(shard, "SequenceNumberRange", std::move(range)); diff --git a/cdc/generation.hh b/cdc/generation.hh index 0c573bfe0a..652d5be5aa 100644 --- a/cdc/generation.hh +++ b/cdc/generation.hh @@ -124,24 +124,16 @@ public: * I.e. the stream ids at a given time. */ class streams_version { - std::vector _streams; - db_clock::time_point _timestamp; - std::optional _expired; public: - streams_version(std::vector streams, db_clock::time_point ts, std::optional expired) - : _streams(std::move(streams)) - , _timestamp(ts) - , _expired(std::move(expired)) + std::vector streams; + db_clock::time_point timestamp; + std::optional expired; + + streams_version(std::vector s, db_clock::time_point ts, std::optional exp) + : streams(std::move(s)) + , timestamp(ts) + , expired(std::move(exp)) {} - const db_clock::time_point& timestamp() const { - return _timestamp; - } - const std::optional& expired() const { - return _expired; - } - const std::vector& streams() const { - return _streams; - } }; /* Should be called when we're restarting and we noticed that we didn't save any streams timestamp in our local tables,