alternator::streams: Use end-of-record info in get_records
Fixes #7496 Since cdc log now has an end-of-batch/record marker that tells us explicitly that we've read the last row of a change, we can use this instead of timestamp checks + limit extra to ensure we have complete records. Note that this does not try to fulfill user query limit exact. To do this we would need to add a loop and potentially re-query if quried rows are not enough. But that is a separate exercise, and superbly suited for coroutines!
This commit is contained in:
@@ -851,6 +851,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
|
||||
static const bytes timestamp_column_name = cdc::log_meta_column_name_bytes("time");
|
||||
static const bytes op_column_name = cdc::log_meta_column_name_bytes("operation");
|
||||
static const bytes eor_column_name = cdc::log_meta_column_name_bytes("end_of_batch");
|
||||
|
||||
auto key_names = boost::copy_range<std::unordered_set<std::string>>(
|
||||
boost::range::join(std::move(base->partition_key_columns()), std::move(base->clustering_key_columns()))
|
||||
@@ -874,7 +875,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
std::transform(cks.begin(), cks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
|
||||
|
||||
auto regular_columns = boost::copy_range<query::column_id_vector>(schema->regular_columns()
|
||||
| boost::adaptors::filtered([](const column_definition& cdef) { return cdef.name() == op_column_name || !cdc::is_cdc_metacolumn_name(cdef.name_as_text()); })
|
||||
| boost::adaptors::filtered([](const column_definition& cdef) { return cdef.name() == op_column_name || cdef.name() == eor_column_name || !cdc::is_cdc_metacolumn_name(cdef.name_as_text()); })
|
||||
| boost::adaptors::transformed([&] (const column_definition& cdef) { columns.emplace_back(&cdef); return cdef.id; })
|
||||
);
|
||||
|
||||
@@ -907,6 +908,11 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
return cdef->name->name() == timestamp_column_name;
|
||||
})
|
||||
);
|
||||
auto eor_index = std::distance(metadata.get_names().begin(),
|
||||
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
|
||||
return cdef->name->name() == eor_column_name;
|
||||
})
|
||||
);
|
||||
|
||||
std::optional<utils::UUID> timestamp;
|
||||
auto dynamodb = rjson::empty_object();
|
||||
@@ -932,15 +938,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
for (auto& row : result_set->rows()) {
|
||||
auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index])));
|
||||
auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index]));
|
||||
|
||||
if (timestamp && timestamp != ts) {
|
||||
maybe_add_record();
|
||||
if (limit == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
timestamp = ts;
|
||||
auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false;
|
||||
|
||||
if (!dynamodb.HasMember("Keys")) {
|
||||
auto keys = rjson::empty_object();
|
||||
@@ -993,9 +991,13 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
rjson::set(record, "eventName", "REMOVE");
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (limit > 0 && timestamp) {
|
||||
maybe_add_record();
|
||||
if (eor) {
|
||||
maybe_add_record();
|
||||
timestamp = ts;
|
||||
if (limit == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto ret = rjson::empty_object();
|
||||
|
||||
Reference in New Issue
Block a user