cdc: Ensure columns removed from log table are registered as dropped
If we are redefining the log table, we need to ensure any dropped
columns are registered in "dropped_columns" table, otherwise clients will not
be able to read data older than now.
Includes unit test.
Should probably be backported to all CDC enabled versions.
Fixes #10473
Closes #10474
(cherry picked from commit 78350a7e1b)
This commit is contained in:
committed by
Kamil Braun
parent
e480c5bf4d
commit
b0233cb7c5
20
cdc/log.cc
20
cdc/log.cc
@@ -59,7 +59,7 @@ using namespace std::chrono_literals;
|
||||
logging::logger cdc_log("cdc");
|
||||
|
||||
namespace cdc {
|
||||
static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {});
|
||||
static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {}, schema_ptr = nullptr);
|
||||
}
|
||||
|
||||
static constexpr auto cdc_group_name = "cdc";
|
||||
@@ -206,7 +206,7 @@ public:
|
||||
return;
|
||||
}
|
||||
|
||||
auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt);
|
||||
auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt, log_schema);
|
||||
|
||||
auto log_mut = log_schema
|
||||
? db::schema_tables::make_update_table_mutations(db, keyspace.metadata(), log_schema, new_log_schema, timestamp, false)
|
||||
@@ -484,7 +484,7 @@ bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) {
|
||||
return to_bytes(cdc_deleted_elements_column_prefix) + column_name;
|
||||
}
|
||||
|
||||
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid) {
|
||||
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid, schema_ptr old) {
|
||||
schema_builder b(s.ks_name(), log_name(s.cf_name()));
|
||||
b.with_partitioner("com.scylladb.dht.CDCPartitioner");
|
||||
b.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
@@ -571,6 +571,20 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
|
||||
b.set_uuid(*uuid);
|
||||
}
|
||||
|
||||
/**
|
||||
* #10473 - if we are redefining the log table, we need to ensure any dropped
|
||||
* columns are registered in "dropped_columns" table, otherwise clients will not
|
||||
* be able to read data older than now.
|
||||
*/
|
||||
if (old) {
|
||||
// not super efficient, but we don't do this often.
|
||||
for (auto& col : old->all_columns()) {
|
||||
if (!b.has_column({col.name(), col.name_as_text() })) {
|
||||
b.without_column(col.name_as_text(), col.type, api::new_timestamp());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return b.build();
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ from cassandra.cluster import ConsistencyLevel
|
||||
from cassandra.query import SimpleStatement
|
||||
|
||||
from util import new_test_table
|
||||
from nodetool import flush
|
||||
|
||||
def test_cdc_log_entries_use_cdc_streams(scylla_only, cql, test_keyspace):
|
||||
'''Test that the stream IDs chosen for CDC log entries come from the CDC generation
|
||||
@@ -31,3 +32,16 @@ def test_cdc_log_entries_use_cdc_streams(scylla_only, cql, test_keyspace):
|
||||
|
||||
assert(log_stream_ids.issubset(stream_ids))
|
||||
|
||||
|
||||
# Test for #10473 - reading logs (from sstable) after dropping
|
||||
# column in base.
|
||||
def test_cdc_alter_table_drop_column(scylla_only, cql, test_keyspace):
|
||||
schema = "pk int primary key, v int"
|
||||
extra = " with cdc = {'enabled': true}"
|
||||
with new_test_table(cql, test_keyspace, schema, extra) as table:
|
||||
cql.execute(f"insert into {table} (pk, v) values (0, 0)")
|
||||
cql.execute(f"insert into {table} (pk, v) values (1, null)")
|
||||
flush(cql, table)
|
||||
flush(cql, table + "_scylla_cdc_log")
|
||||
cql.execute(f"alter table {table} drop v")
|
||||
cql.execute(f"select * from {table}_scylla_cdc_log")
|
||||
|
||||
Reference in New Issue
Block a user