alternator::streams: Improve paging and fix parent-child calculation
Fixes #7345 Fixes #7346 Do a more efficient collection skip when doing paging, instead of iterating the full sets. Ensure some semblance of sanity in the parent-child relationship between shards by ensuring token order sorting and finding the apparent previous ID coverting the approximate range of new gen. Fix endsequencenumber generation by looking at whether we are last gen or not, instead of the (not filled in) 'expired' column.
This commit is contained in:
@@ -364,41 +364,48 @@ using namespace std::string_literals;
|
||||
* Downside is that we get a _lot_ of shards. And with actual
|
||||
* generations, we'll get hubba-hubba loads of them.
|
||||
*
|
||||
* Improvement ideas:
|
||||
* 1.) Prune expired streams that cannot have records (check ttls)
|
||||
* 2.) Group cdc streams into N per shards (by token range)
|
||||
* We would like to group cdc streams into N per shards
|
||||
* (by token range/ ID set).
|
||||
*
|
||||
* The latter however makes it impossible to do a simple
|
||||
* This however makes it impossible to do a simple
|
||||
* range query with limit (we need to do staggered querying, because
|
||||
* of how GetRecords work).
|
||||
* The results of a select from cdc log on more than one cdc stream,
|
||||
* where timestamp < T will typically look like:
|
||||
*
|
||||
* <shard 0> <ts0> ...
|
||||
* ....
|
||||
* <shard 0> <tsN> ...
|
||||
* <shard 1> <ts0p> ...
|
||||
* ...
|
||||
* <shard 1> <tsNp> ...
|
||||
* We can do "paged" queries (i.e. with the get_records result limit and
|
||||
* continuing on "next" iterator) across cdc log PK:s (id:s), if we
|
||||
* somehow track paging state.
|
||||
*
|
||||
* Where tsN < timestamp and tsNp < timestamp, but range [ts0-tsN] and [ts0p-tsNp]
|
||||
* overlap.
|
||||
* This is however problematic because while this can be encoded
|
||||
* in shard iterators (long), they _cannot_ be handled in sequence
|
||||
* numbers (short), and we need to be able to start a new iterator
|
||||
* using just a sequence number as watershed.
|
||||
*
|
||||
* Now, if we have limit=X, we might end on a row inside shard 0.
|
||||
* For our next query, we would in the simple case just say "timestamp > ts3 and timestamp < T",
|
||||
* but of course this would then miss rows in shard 1 and above.
|
||||
* It also breaks per-shard sort order, where it is assumed that
|
||||
* records in a shard are presented "in order".
|
||||
*
|
||||
* However if we instead set the conditions to
|
||||
*
|
||||
* "(token(stream_id) = token(shard 0) and timestamp > ts3 and timestamp < T)
|
||||
* or (token(stream_id) > token(shard 0) and timestamp < T)"
|
||||
*
|
||||
* we can get the correct data. Question is however how well this will perform, if at
|
||||
* all.
|
||||
* To deal with both problems we would need to do merging of
|
||||
* cdc streams. But this becomes difficult with actual cql paging etc,
|
||||
* we would potentially have way to many/way to few cql rows for
|
||||
* whatever get_records query limit we have. And waste a lot of
|
||||
* memory and cycles sorting "junk".
|
||||
*
|
||||
* For now, go simple, but maybe consider if this latter approach would work
|
||||
* and perform.
|
||||
*
|
||||
* Parents:
|
||||
* DynamoDB has the concept of optional "parent shard", where parent
|
||||
* is the data set where data for some key range was recorded before
|
||||
* sharding was changed.
|
||||
*
|
||||
* While probably not critical, it is meant as an indicator as to
|
||||
* which shards should be read before other shards.
|
||||
*
|
||||
* In scylla, this is sort of akin to an ID having corresponding ID/ID:s
|
||||
* that cover the token range it represents. Because ID:s are per
|
||||
* vnode shard however, this relation can be somewhat ambigous.
|
||||
* We still provide some semblance of this by finding the ID in
|
||||
* older generation that has token start < current ID token start.
|
||||
* This will be a partial overlap, but it is the best we can do.
|
||||
*/
|
||||
|
||||
static std::chrono::seconds confidence_interval(const database& db) {
|
||||
@@ -497,28 +504,73 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
|
||||
|
||||
std::optional<shard_id> last;
|
||||
|
||||
// i is now at the youngest generation we include. make a mark of it.
|
||||
auto first = i;
|
||||
|
||||
// if we're a paged query, skip to the generation where we left of.
|
||||
if (shard_start) {
|
||||
i = topologies.find(shard_start->time);
|
||||
}
|
||||
|
||||
// for parent-child stuff we need id:s to be sorted by token
|
||||
// (see explanation above) since we want to find closest
|
||||
// token boundary when determining parent.
|
||||
// #7346 - we processed and searched children/parents in
|
||||
// stored order, which is not neccesarily token order,
|
||||
// so the finding of "closest" token boundary (using upper bound)
|
||||
// could give somewhat weird results.
|
||||
static auto cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
|
||||
return id1.token() < id2.token();
|
||||
};
|
||||
|
||||
// need a prev even if we are skipping stuff
|
||||
if (i != first) {
|
||||
prev = std::prev(i);
|
||||
std::stable_sort(prev->second.streams.begin(), prev->second.streams.end(), cmp);
|
||||
}
|
||||
|
||||
for (; limit > 0 && i != e; prev = i, ++i) {
|
||||
auto& [ts, sv] = *i;
|
||||
|
||||
last = std::nullopt;
|
||||
|
||||
for (auto& id : sv.streams()) {
|
||||
if (shard_start && shard_start->id != id) {
|
||||
continue;
|
||||
}
|
||||
if (shard_start && shard_start->id == id) {
|
||||
shard_start = std::nullopt;
|
||||
continue;
|
||||
auto lo = sv.streams.begin();
|
||||
auto end = sv.streams.end();
|
||||
|
||||
// first sort by index. (see above)
|
||||
std::stable_sort(lo, end, cmp);
|
||||
|
||||
if (shard_start) {
|
||||
// find next shard position
|
||||
lo = std::upper_bound(lo, end, shard_start->id.token(), [](const dht::token& t, const cdc::stream_id& id) {
|
||||
return t < id.token();
|
||||
});
|
||||
shard_start = std::nullopt;
|
||||
}
|
||||
|
||||
auto expired = [&]() -> std::optional<db_clock::time_point> {
|
||||
auto j = std::next(i);
|
||||
if (j == e) {
|
||||
return std::nullopt;
|
||||
}
|
||||
// add this so we sort of match potential
|
||||
// sequence numbers in get_records result.
|
||||
return j->first + confidence_interval(db);
|
||||
}();
|
||||
|
||||
while (lo != end) {
|
||||
auto& id = *lo++;
|
||||
|
||||
auto shard = rjson::empty_object();
|
||||
|
||||
if (prev != e) {
|
||||
auto token = id.token();
|
||||
auto& pids = prev->second.streams();
|
||||
auto pid = std::upper_bound(pids.begin(), pids.end(), token, [](const dht::token& t, const cdc::stream_id& id) {
|
||||
auto& pids = prev->second.streams;
|
||||
auto pid = std::upper_bound(pids.begin(), pids.end(), id.token(), [](const dht::token& t, const cdc::stream_id& id) {
|
||||
return t < id.token();
|
||||
});
|
||||
if (pid != pids.begin()) {
|
||||
pid = std::prev(pid);
|
||||
}
|
||||
if (pid != pids.end()) {
|
||||
rjson::set(shard, "ParentShardId", shard_id(prev->first, *pid));
|
||||
}
|
||||
@@ -526,11 +578,10 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
|
||||
|
||||
last.emplace(ts, id);
|
||||
rjson::set(shard, "ShardId", *last);
|
||||
|
||||
auto range = rjson::empty_object();
|
||||
rjson::set(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch().count())));
|
||||
if (sv.expired() && *sv.expired() < threshold) {
|
||||
rjson::set(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::max_time_UUID((*sv.expired() + confidence_interval(db)).time_since_epoch().count())));
|
||||
if (expired) {
|
||||
rjson::set(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(expired->time_since_epoch().count())));
|
||||
}
|
||||
|
||||
rjson::set(shard, "SequenceNumberRange", std::move(range));
|
||||
|
||||
@@ -124,24 +124,16 @@ public:
|
||||
* I.e. the stream ids at a given time.
|
||||
*/
|
||||
class streams_version {
|
||||
std::vector<stream_id> _streams;
|
||||
db_clock::time_point _timestamp;
|
||||
std::optional<db_clock::time_point> _expired;
|
||||
public:
|
||||
streams_version(std::vector<stream_id> streams, db_clock::time_point ts, std::optional<db_clock::time_point> expired)
|
||||
: _streams(std::move(streams))
|
||||
, _timestamp(ts)
|
||||
, _expired(std::move(expired))
|
||||
std::vector<stream_id> streams;
|
||||
db_clock::time_point timestamp;
|
||||
std::optional<db_clock::time_point> expired;
|
||||
|
||||
streams_version(std::vector<stream_id> s, db_clock::time_point ts, std::optional<db_clock::time_point> exp)
|
||||
: streams(std::move(s))
|
||||
, timestamp(ts)
|
||||
, expired(std::move(exp))
|
||||
{}
|
||||
const db_clock::time_point& timestamp() const {
|
||||
return _timestamp;
|
||||
}
|
||||
const std::optional<db_clock::time_point>& expired() const {
|
||||
return _expired;
|
||||
}
|
||||
const std::vector<stream_id>& streams() const {
|
||||
return _streams;
|
||||
}
|
||||
};
|
||||
|
||||
/* Should be called when we're restarting and we noticed that we didn't save any streams timestamp in our local tables,
|
||||
|
||||
Reference in New Issue
Block a user