schema: get cdc options from schema extensions
Removes logic responsible for setting cdc_options from dedicated column in scylla_tables, and uses the "cdc" schema extension instead.
This commit is contained in:
@@ -284,10 +284,6 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_
|
||||
builder.set_compressor_params(compression_parameters(*compression_options));
|
||||
}
|
||||
|
||||
auto cdc_options = get_cdc_options(schema_extensions);
|
||||
if (cdc_options) {
|
||||
builder.set_cdc_options(cdc::options(*cdc_options));
|
||||
}
|
||||
#if 0
|
||||
CachingOptions cachingOptions = getCachingOptions();
|
||||
if (cachingOptions != null)
|
||||
|
||||
@@ -60,7 +60,6 @@
|
||||
#include "mutation_query.hh"
|
||||
#include "system_keyspace.hh"
|
||||
#include "cql3/cql3_type.hh"
|
||||
#include "cdc/log.hh"
|
||||
#include "cql3/functions/functions.hh"
|
||||
#include "cql3/util.hh"
|
||||
#include "types/list.hh"
|
||||
@@ -2330,10 +2329,6 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
|
||||
} else {
|
||||
builder.with_version(sm.digest());
|
||||
}
|
||||
if (auto map = sm.cdc_options()) {
|
||||
cdc::options cdc_options(*map);
|
||||
builder.set_cdc_options(std::move(cdc_options));
|
||||
}
|
||||
|
||||
if (is_system_keyspace(ks_name) && is_extra_durable(cf_name)) {
|
||||
builder.set_wait_for_sync_to_commitlog(true);
|
||||
|
||||
15
schema.cc
15
schema.cc
@@ -37,6 +37,7 @@
|
||||
#include "database.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "cdc/cdc_extension.hh"
|
||||
|
||||
constexpr int32_t schema::NAME_LENGTH;
|
||||
|
||||
@@ -448,7 +449,7 @@ bool operator==(const schema& x, const schema& y)
|
||||
&& x._raw._compaction_strategy == y._raw._compaction_strategy
|
||||
&& x._raw._compaction_strategy_options == y._raw._compaction_strategy_options
|
||||
&& x._raw._compaction_enabled == y._raw._compaction_enabled
|
||||
&& x._raw._cdc_options == y._raw._cdc_options
|
||||
&& x.cdc_options() == y.cdc_options()
|
||||
&& x._raw._caching_options == y._raw._caching_options
|
||||
&& x._raw._dropped_columns == y._raw._dropped_columns
|
||||
&& x._raw._collections == y._raw._collections
|
||||
@@ -603,7 +604,7 @@ std::ostream& operator<<(std::ostream& os, const schema& s) {
|
||||
os << ",bloomFilterFpChance=" << s._raw._bloom_filter_fp_chance;
|
||||
os << ",memtableFlushPeriod=" << s._raw._memtable_flush_period;
|
||||
os << ",caching=" << s._raw._caching_options.to_sstring();
|
||||
os << ",cdc=" << s._raw._cdc_options.to_sstring();
|
||||
os << ",cdc=" << s.cdc_options().to_sstring();
|
||||
os << ",defaultTimeToLive=" << s._raw._default_time_to_live.count();
|
||||
os << ",minIndexInterval=" << s._raw._min_index_interval;
|
||||
os << ",maxIndexInterval=" << s._raw._max_index_interval;
|
||||
@@ -1113,6 +1114,16 @@ schema_ptr schema_builder::build() {
|
||||
return make_lw_shared<schema>(schema(new_raw, _view_info));
|
||||
}
|
||||
|
||||
const cdc::options& schema::cdc_options() const {
|
||||
static const cdc::options default_cdc_options;
|
||||
const auto& schema_extensions = _raw._extensions;
|
||||
|
||||
if (auto it = schema_extensions.find(cdc::cdc_extension::NAME); it != schema_extensions.end()) {
|
||||
return dynamic_pointer_cast<cdc::cdc_extension>(it->second)->get_options();
|
||||
}
|
||||
return default_cdc_options;
|
||||
}
|
||||
|
||||
schema_ptr schema_builder::build(compact_storage cp) {
|
||||
return with(cp).build();
|
||||
}
|
||||
|
||||
@@ -627,7 +627,6 @@ private:
|
||||
std::map<sstring, sstring> _compaction_strategy_options;
|
||||
bool _compaction_enabled = true;
|
||||
caching_options _caching_options;
|
||||
cdc::options _cdc_options;
|
||||
table_schema_version _version;
|
||||
std::unordered_map<sstring, dropped_column> _dropped_columns;
|
||||
std::map<bytes, data_type> _collections;
|
||||
@@ -803,9 +802,7 @@ public:
|
||||
return _raw._compaction_enabled;
|
||||
}
|
||||
|
||||
const cdc::options& cdc_options() const {
|
||||
return _raw._cdc_options;
|
||||
}
|
||||
const cdc::options& cdc_options() const;
|
||||
|
||||
const ::speculative_retry& speculative_retry() const {
|
||||
return _raw._speculative_retry;
|
||||
|
||||
@@ -142,15 +142,6 @@ public:
|
||||
return _raw._compaction_enabled;
|
||||
}
|
||||
|
||||
schema_builder& set_cdc_options(cdc::options options) {
|
||||
_raw._cdc_options = std::move(options);
|
||||
return *this;
|
||||
}
|
||||
|
||||
const cdc::options& cdc_options() const {
|
||||
return _raw._cdc_options;
|
||||
}
|
||||
|
||||
schema_builder& set_min_index_interval(int32_t t) {
|
||||
_raw._min_index_interval = t;
|
||||
return *this;
|
||||
|
||||
@@ -105,19 +105,6 @@ table_schema_version schema_mutations::digest() const {
|
||||
return utils::UUID_gen::get_name_UUID(h.finalize());
|
||||
}
|
||||
|
||||
std::optional<std::map<sstring, sstring>> schema_mutations::cdc_options() const {
|
||||
if (_scylla_tables) {
|
||||
auto rs = query::result_set(*_scylla_tables);
|
||||
if (!rs.empty()) {
|
||||
auto map = db::schema_tables::get_map<sstring, sstring>(rs.row(0), "cdc");
|
||||
if (map && !map->empty()) {
|
||||
return map;
|
||||
}
|
||||
}
|
||||
}
|
||||
return { };
|
||||
}
|
||||
|
||||
static mutation_opt compact(const mutation_opt& m) {
|
||||
if (!m) {
|
||||
return m;
|
||||
|
||||
@@ -138,7 +138,6 @@ public:
|
||||
bool is_view() const;
|
||||
|
||||
table_schema_version digest() const;
|
||||
std::optional<std::map<sstring, sstring>> cdc_options() const;
|
||||
|
||||
bool operator==(const schema_mutations&) const;
|
||||
bool operator!=(const schema_mutations&) const;
|
||||
|
||||
Reference in New Issue
Block a user