From e5f0314afd261bb2880f8f62b1f898de959d23a1 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 18 May 2016 21:50:24 -0300 Subject: [PATCH] db: introduce upload directory for sstable migration This change is intended to make migration process safer and easier. All column families will now have a directory called upload. With this feature, users may choose to copy migrated sstables to upload directory of respective column families, and call 'nodetool refresh'. That's supposed to be the preferred option from now on. For each sstable in upload directory, refresh will do the following: 1) Mutate sstable level to 0. 2) Create hard links to its components in column family dir, using a new generation. We make it safe by creating a hard link to temporary TOC first. 3) Remove all of its components in upload directory. This new code runs after refresh checked for new sstables in the column family directory. Otherwise, we could have a generation conflict. Unlike the first step, this new step runs with sstable write enabled. It's easier here because we know exactly which sstables are new. After that, refresh will load new sstables found in column family and upload directories. Signed-off-by: Raphael S. Carvalho --- database.cc | 51 +++++++++++++++++++++++++++++++++++++- database.hh | 11 ++++++++ service/storage_service.cc | 14 +++++++++++ sstables/sstables.hh | 3 +++ 4 files changed, 78 insertions(+), 1 deletion(-) diff --git a/database.cc b/database.cc index 15abc6755f..ad16e6ebb0 100644 --- a/database.cc +++ b/database.cc @@ -783,6 +783,51 @@ column_family::stop() { }); } +future> column_family::flush_upload_dir() { + struct work { + sstable_list sstables; + std::unordered_map descriptors; + std::vector flushed; + }; + + return do_with(work(), [this] (work& work) { + return lister::scan_dir(_config.datadir + "/upload/", { directory_entry_type::regular }, + [this, &work] (directory_entry de) { + auto comps = sstables::entry_descriptor::make_descriptor(de.name); + if (comps.component != sstables::sstable::component_type::TOC) { + return make_ready_future<>(); + } + auto sst = make_lw_shared(_schema->ks_name(), _schema->cf_name(), + _config.datadir + "/upload", comps.generation, + comps.version, comps.format); + work.sstables.emplace(comps.generation, std::move(sst)); + work.descriptors.emplace(comps.generation, std::move(comps)); + return make_ready_future<>(); + }, &manifest_json_filter).then([this, &work] { + work.flushed.reserve(work.descriptors.size()); + + return do_for_each(work.sstables, [this, &work] (auto& pair) { + auto gen = this->calculate_generation_for_new_table(); + auto& sst = pair.second; + + auto&& comps = std::move(work.descriptors.at(pair.first)); + comps.generation = gen; + work.flushed.push_back(std::move(comps)); + + // Read toc content as it will be needed for moving and deleting a sstable. + return sst->read_toc().then([&sst] { + return sst->mutate_sstable_level(0); + }).then([this, &sst, gen] { + return sst->create_links(_config.datadir, gen); + }).then([&sst] { + return sstables::remove_by_toc_name(sst->toc_filename()); + }); + }); + }).then([&work] { + return make_ready_future>(std::move(work.flushed)); + }); + }); +} future> column_family::reshuffle_sstables(std::set all_generations, int64_t start) { @@ -1667,7 +1712,11 @@ keyspace::column_family_directory(const sstring& name, utils::UUID uuid) const { future<> keyspace::make_directory_for_column_family(const sstring& name, utils::UUID uuid) { - return io_check(touch_directory, column_family_directory(name, uuid)); + auto cfdir = column_family_directory(name, uuid); + return seastar::async([cfdir = std::move(cfdir)] { + io_check(touch_directory, cfdir).get(); + io_check(touch_directory, cfdir + "/upload").get(); + }); } no_such_keyspace::no_such_keyspace(const sstring& ks_name) diff --git a/database.hh b/database.hh index d7ea03b6a8..60fdbccf02 100644 --- a/database.hh +++ b/database.hh @@ -490,6 +490,17 @@ public: return std::chrono::steady_clock::now() - _sstable_writes_disabled_at; } + // This function will iterate through upload directory in column family, + // and will do the following for each sstable found: + // 1) Mutate sstable level to 0. + // 2) Create hard links to its components in column family dir. + // 3) Remove all of its components in upload directory. + // At the end, it's expected that upload dir is empty and all of its + // previous content was moved to column family dir. + // + // Return a vector containing descriptor of sstables to be loaded. + future> flush_upload_dir(); + // Make sure the generation numbers are sequential, starting from "start". // Generations before "start" are left untouched. // diff --git a/service/storage_service.cc b/service/storage_service.cc index 909cd5c802..da87656f2c 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2603,6 +2603,20 @@ future<> storage_service::load_new_sstables(sstring ks_name, sstring cf_name) { } return make_ready_future>(std::move(new_tables)); }); + }).then([this, ks_name, cf_name] (std::vector new_tables) { + auto shard = std::hash()(cf_name) % smp::count; + return _db.invoke_on(shard, [ks_name, cf_name] (database& db) { + auto& cf = db.find_column_family(ks_name, cf_name); + return cf.flush_upload_dir(); + }).then([new_tables = std::move(new_tables), ks_name, cf_name] (std::vector new_tables_from_upload) mutable { + if (new_tables_from_upload.empty()) { + logger.info("No new SSTables were found for {}.{} in upload directory", ks_name, cf_name); + } else { + // merge new sstables found in both column family and upload directories. + new_tables.insert(new_tables.end(), new_tables_from_upload.begin(), new_tables_from_upload.end()); + } + return make_ready_future>(std::move(new_tables)); + }); }).then([this, ks_name, cf_name] (std::vector new_tables) { return _db.invoke_on_all([ks_name = std::move(ks_name), cf_name = std::move(cf_name), new_tables = std::move(new_tables)] (database& db) { auto& cf = db.find_column_family(ks_name, cf_name); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 6c990bb6ea..753acb79d2 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -640,4 +640,7 @@ public: // futures complete (with an atomic_deletion_cancelled exception). void cancel_atomic_deletions(); +// Read toc content and delete all components found in it. +future<> remove_by_toc_name(sstring sstable_toc_name); + }