db: introduce upload directory for sstable migration

This change is intended to make migration process safer and easier.
All column families will now have a directory called upload.
With this feature, users may choose to copy migrated sstables to upload
directory of respective column families, and call 'nodetool refresh'.
That's supposed to be the preferred option from now on.

For each sstable in upload directory, refresh will do the following:
1) Mutate sstable level to 0.
2) Create hard links to its components in column family dir, using
a new generation. We make it safe by creating a hard link to temporary
TOC first.
3) Remove all of its components in upload directory.

This new code runs after refresh checked for new sstables in the column
family directory. Otherwise, we could have a generation conflict.
Unlike the first step, this new step runs with sstable write enabled.
It's easier here because we know exactly which sstables are new.

After that, refresh will load new sstables found in column family
and upload directories.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2016-05-18 21:50:24 -03:00
parent 70b793e4d3
commit e5f0314afd
4 changed files with 78 additions and 1 deletions

View File

@@ -783,6 +783,51 @@ column_family::stop() {
});
}
future<std::vector<sstables::entry_descriptor>> column_family::flush_upload_dir() {
struct work {
sstable_list sstables;
std::unordered_map<int64_t, sstables::entry_descriptor> descriptors;
std::vector<sstables::entry_descriptor> flushed;
};
return do_with(work(), [this] (work& work) {
return lister::scan_dir(_config.datadir + "/upload/", { directory_entry_type::regular },
[this, &work] (directory_entry de) {
auto comps = sstables::entry_descriptor::make_descriptor(de.name);
if (comps.component != sstables::sstable::component_type::TOC) {
return make_ready_future<>();
}
auto sst = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(),
_config.datadir + "/upload", comps.generation,
comps.version, comps.format);
work.sstables.emplace(comps.generation, std::move(sst));
work.descriptors.emplace(comps.generation, std::move(comps));
return make_ready_future<>();
}, &manifest_json_filter).then([this, &work] {
work.flushed.reserve(work.descriptors.size());
return do_for_each(work.sstables, [this, &work] (auto& pair) {
auto gen = this->calculate_generation_for_new_table();
auto& sst = pair.second;
auto&& comps = std::move(work.descriptors.at(pair.first));
comps.generation = gen;
work.flushed.push_back(std::move(comps));
// Read toc content as it will be needed for moving and deleting a sstable.
return sst->read_toc().then([&sst] {
return sst->mutate_sstable_level(0);
}).then([this, &sst, gen] {
return sst->create_links(_config.datadir, gen);
}).then([&sst] {
return sstables::remove_by_toc_name(sst->toc_filename());
});
});
}).then([&work] {
return make_ready_future<std::vector<sstables::entry_descriptor>>(std::move(work.flushed));
});
});
}
future<std::vector<sstables::entry_descriptor>>
column_family::reshuffle_sstables(std::set<int64_t> all_generations, int64_t start) {
@@ -1667,7 +1712,11 @@ keyspace::column_family_directory(const sstring& name, utils::UUID uuid) const {
future<>
keyspace::make_directory_for_column_family(const sstring& name, utils::UUID uuid) {
return io_check(touch_directory, column_family_directory(name, uuid));
auto cfdir = column_family_directory(name, uuid);
return seastar::async([cfdir = std::move(cfdir)] {
io_check(touch_directory, cfdir).get();
io_check(touch_directory, cfdir + "/upload").get();
});
}
no_such_keyspace::no_such_keyspace(const sstring& ks_name)

View File

@@ -490,6 +490,17 @@ public:
return std::chrono::steady_clock::now() - _sstable_writes_disabled_at;
}
// This function will iterate through upload directory in column family,
// and will do the following for each sstable found:
// 1) Mutate sstable level to 0.
// 2) Create hard links to its components in column family dir.
// 3) Remove all of its components in upload directory.
// At the end, it's expected that upload dir is empty and all of its
// previous content was moved to column family dir.
//
// Return a vector containing descriptor of sstables to be loaded.
future<std::vector<sstables::entry_descriptor>> flush_upload_dir();
// Make sure the generation numbers are sequential, starting from "start".
// Generations before "start" are left untouched.
//

View File

@@ -2603,6 +2603,20 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) {
}
return make_ready_future<std::vector<sstables::entry_descriptor>>(std::move(new_tables));
});
}).then([this, ks_name, cf_name] (std::vector<sstables::entry_descriptor> new_tables) {
auto shard = std::hash<sstring>()(cf_name) % smp::count;
return _db.invoke_on(shard, [ks_name, cf_name] (database& db) {
auto& cf = db.find_column_family(ks_name, cf_name);
return cf.flush_upload_dir();
}).then([new_tables = std::move(new_tables), ks_name, cf_name] (std::vector<sstables::entry_descriptor> new_tables_from_upload) mutable {
if (new_tables_from_upload.empty()) {
logger.info("No new SSTables were found for {}.{} in upload directory", ks_name, cf_name);
} else {
// merge new sstables found in both column family and upload directories.
new_tables.insert(new_tables.end(), new_tables_from_upload.begin(), new_tables_from_upload.end());
}
return make_ready_future<std::vector<sstables::entry_descriptor>>(std::move(new_tables));
});
}).then([this, ks_name, cf_name] (std::vector<sstables::entry_descriptor> new_tables) {
return _db.invoke_on_all([ks_name = std::move(ks_name), cf_name = std::move(cf_name), new_tables = std::move(new_tables)] (database& db) {
auto& cf = db.find_column_family(ks_name, cf_name);

View File

@@ -640,4 +640,7 @@ public:
// futures complete (with an atomic_deletion_cancelled exception).
void cancel_atomic_deletions();
// Read toc content and delete all components found in it.
future<> remove_by_toc_name(sstring sstable_toc_name);
}