LCOV - code coverage report
Current view: top level - src/leveldb/db - db_impl.cc (source / functions) Hit Total Coverage
Test: total_coverage.info Lines: 500 760 65.8 %
Date: 2015-10-12 22:39:14 Functions: 33 59 55.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
       2             : // Use of this source code is governed by a BSD-style license that can be
       3             : // found in the LICENSE file. See the AUTHORS file for names of contributors.
       4             : 
       5             : #include "db/db_impl.h"
       6             : 
       7             : #include <algorithm>
       8             : #include <set>
       9             : #include <string>
      10             : #include <stdint.h>
      11             : #include <stdio.h>
      12             : #include <vector>
      13             : #include "db/builder.h"
      14             : #include "db/db_iter.h"
      15             : #include "db/dbformat.h"
      16             : #include "db/filename.h"
      17             : #include "db/log_reader.h"
      18             : #include "db/log_writer.h"
      19             : #include "db/memtable.h"
      20             : #include "db/table_cache.h"
      21             : #include "db/version_set.h"
      22             : #include "db/write_batch_internal.h"
      23             : #include "leveldb/db.h"
      24             : #include "leveldb/env.h"
      25             : #include "leveldb/status.h"
      26             : #include "leveldb/table.h"
      27             : #include "leveldb/table_builder.h"
      28             : #include "port/port.h"
      29             : #include "table/block.h"
      30             : #include "table/merger.h"
      31             : #include "table/two_level_iterator.h"
      32             : #include "util/coding.h"
      33             : #include "util/logging.h"
      34             : #include "util/mutexlock.h"
      35             : 
      36             : namespace leveldb {
      37             : 
      38             : const int kNumNonTableCacheFiles = 10;
      39             : 
      40             : // Information kept for every waiting writer
      41        1102 : struct DBImpl::Writer {
      42             :   Status status;
      43             :   WriteBatch* batch;
      44             :   bool sync;
      45             :   bool done;
      46             :   port::CondVar cv;
      47             : 
      48        1102 :   explicit Writer(port::Mutex* mu) : cv(mu) { }
      49             : };
      50             : 
      51           7 : struct DBImpl::CompactionState {
      52             :   Compaction* const compaction;
      53             : 
      54             :   // Sequence numbers < smallest_snapshot are not significant since we
      55             :   // will never have to service a snapshot below smallest_snapshot.
      56             :   // Therefore if we have seen a sequence number S <= smallest_snapshot,
      57             :   // we can drop all entries for the same key with sequence numbers < S.
      58             :   SequenceNumber smallest_snapshot;
      59             : 
      60             :   // Files produced by compaction
      61          77 :   struct Output {
      62             :     uint64_t number;
      63             :     uint64_t file_size;
      64             :     InternalKey smallest, largest;
      65             :   };
      66             :   std::vector<Output> outputs;
      67             : 
      68             :   // State kept for output being generated
      69             :   WritableFile* outfile;
      70             :   TableBuilder* builder;
      71             : 
      72             :   uint64_t total_bytes;
      73             : 
      74        3080 :   Output* current_output() { return &outputs[outputs.size()-1]; }
      75             : 
      76           0 :   explicit CompactionState(Compaction* c)
      77             :       : compaction(c),
      78             :         outfile(NULL),
      79             :         builder(NULL),
      80          14 :         total_bytes(0) {
      81           0 :   }
      82             : };
      83             : 
      84             : // Fix user-supplied options to be reasonable
      85             : template <class T,class V>
      86             : static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
      87         732 :   if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
      88         732 :   if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
      89             : }
      90         244 : Options SanitizeOptions(const std::string& dbname,
      91             :                         const InternalKeyComparator* icmp,
      92             :                         const InternalFilterPolicy* ipolicy,
      93             :                         const Options& src) {
      94         244 :   Options result = src;
      95         244 :   result.comparator = icmp;
      96         244 :   result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
      97             :   ClipToRange(&result.max_open_files,    64 + kNumNonTableCacheFiles, 50000);
      98             :   ClipToRange(&result.write_buffer_size, 64<<10,                      1<<30);
      99             :   ClipToRange(&result.block_size,        1<<10,                       4<<20);
     100         244 :   if (result.info_log == NULL) {
     101             :     // Open a log file in the same directory as the db
     102         488 :     src.env->CreateDir(dbname);  // In case it does not exist
     103         976 :     src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
     104         488 :     Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
     105         244 :     if (!s.ok()) {
     106             :       // No place suitable for logging
     107           0 :       result.info_log = NULL;
     108             :     }
     109             :   }
     110         244 :   if (result.block_cache == NULL) {
     111           0 :     result.block_cache = NewLRUCache(8 << 20);
     112             :   }
     113         244 :   return result;
     114             : }
     115             : 
     116         244 : DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
     117             :     : env_(raw_options.env),
     118             :       internal_comparator_(raw_options.comparator),
     119             :       internal_filter_policy_(raw_options.filter_policy),
     120             :       options_(SanitizeOptions(dbname, &internal_comparator_,
     121         244 :                                &internal_filter_policy_, raw_options)),
     122         244 :       owns_info_log_(options_.info_log != raw_options.info_log),
     123         244 :       owns_cache_(options_.block_cache != raw_options.block_cache),
     124             :       dbname_(dbname),
     125             :       db_lock_(NULL),
     126             :       shutting_down_(NULL),
     127             :       bg_cv_(&mutex_),
     128         244 :       mem_(new MemTable(internal_comparator_)),
     129             :       imm_(NULL),
     130             :       logfile_(NULL),
     131             :       logfile_number_(0),
     132             :       log_(NULL),
     133             :       seed_(0),
     134         244 :       tmp_batch_(new WriteBatch),
     135             :       bg_compaction_scheduled_(false),
     136        5124 :       manual_compaction_(NULL) {
     137         244 :   mem_->Ref();
     138         244 :   has_imm_.Release_Store(NULL);
     139             : 
     140             :   // Reserve ten files or so for other uses and give the rest to TableCache.
     141         244 :   const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
     142         244 :   table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
     143             : 
     144             :   versions_ = new VersionSet(dbname_, &options_, table_cache_,
     145         244 :                              &internal_comparator_);
     146         244 : }
     147             : 
     148        2440 : DBImpl::~DBImpl() {
     149             :   // Wait for background work to finish
     150         244 :   mutex_.Lock();
     151         244 :   shutting_down_.Release_Store(this);  // Any non-NULL value is ok
     152         244 :   while (bg_compaction_scheduled_) {
     153           0 :     bg_cv_.Wait();
     154             :   }
     155         244 :   mutex_.Unlock();
     156             : 
     157         244 :   if (db_lock_ != NULL) {
     158         488 :     env_->UnlockFile(db_lock_);
     159             :   }
     160             : 
     161         244 :   delete versions_;
     162         244 :   if (mem_ != NULL) mem_->Unref();
     163         244 :   if (imm_ != NULL) imm_->Unref();
     164         244 :   delete tmp_batch_;
     165         244 :   delete log_;
     166         244 :   delete logfile_;
     167         244 :   delete table_cache_;
     168             : 
     169         244 :   if (owns_info_log_) {
     170         244 :     delete options_.info_log;
     171             :   }
     172         244 :   if (owns_cache_) {
     173           0 :     delete options_.block_cache;
     174             :   }
     175         488 : }
     176             : 
     177         129 : Status DBImpl::NewDB() {
     178         129 :   VersionEdit new_db;
     179         258 :   new_db.SetComparatorName(user_comparator()->Name());
     180         129 :   new_db.SetLogNumber(0);
     181         129 :   new_db.SetNextFile(2);
     182         129 :   new_db.SetLastSequence(0);
     183             : 
     184         129 :   const std::string manifest = DescriptorFileName(dbname_, 1);
     185             :   WritableFile* file;
     186         129 :   Status s = env_->NewWritableFile(manifest, &file);
     187         129 :   if (!s.ok()) {
     188             :     return s;
     189             :   }
     190             :   {
     191         129 :     log::Writer log(file);
     192             :     std::string record;
     193         129 :     new_db.EncodeTo(&record);
     194         258 :     s = log.AddRecord(record);
     195         129 :     if (s.ok()) {
     196         258 :       s = file->Close();
     197         129 :     }
     198             :   }
     199         129 :   delete file;
     200         129 :   if (s.ok()) {
     201             :     // Make "CURRENT" file that points to the new manifest file.
     202         258 :     s = SetCurrentFile(env_, dbname_, 1);
     203             :   } else {
     204           0 :     env_->DeleteFile(manifest);
     205             :   }
     206         129 :   return s;
     207             : }
     208             : 
     209         283 : void DBImpl::MaybeIgnoreError(Status* s) const {
     210         283 :   if (s->ok() || options_.paranoid_checks) {
     211             :     // No change needed
     212             :   } else {
     213           0 :     Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
     214           0 :     *s = Status::OK();
     215             :   }
     216         283 : }
     217             : 
     218         251 : void DBImpl::DeleteObsoleteFiles() {
     219         502 :   if (!bg_error_.ok()) {
     220             :     // After a background error, we don't know whether a new version may
     221             :     // or may not have been committed, so we cannot safely garbage collect.
     222           0 :     return;
     223             :   }
     224             : 
     225             :   // Make a set of all of the live files
     226         251 :   std::set<uint64_t> live = pending_outputs_;
     227         251 :   versions_->AddLiveFiles(&live);
     228             : 
     229         251 :   std::vector<std::string> filenames;
     230         502 :   env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
     231             :   uint64_t number;
     232             :   FileType type;
     233        4918 :   for (size_t i = 0; i < filenames.size(); i++) {
     234        4416 :     if (ParseFileName(filenames[i], &number, &type)) {
     235        1810 :       bool keep = true;
     236        1810 :       switch (type) {
     237             :         case kLogFile:
     238         481 :           keep = ((number >= versions_->LogNumber()) ||
     239         481 :                   (number == versions_->PrevLogNumber()));
     240         366 :           break;
     241             :         case kDescriptorFile:
     242             :           // Keep my manifest file, and any newer incarnations'
     243             :           // (in case there is a race that allows other incarnations)
     244         495 :           keep = (number >= versions_->ManifestFileNumber());
     245         495 :           break;
     246             :         case kTableFile:
     247         534 :           keep = (live.find(number) != live.end());
     248         178 :           break;
     249             :         case kTempFile:
     250             :           // Any temp files that are currently being written to must
     251             :           // be recorded in pending_outputs_, which is inserted into "live"
     252           0 :           keep = (live.find(number) != live.end());
     253           0 :           break;
     254             :         case kCurrentFile:
     255             :         case kDBLockFile:
     256             :         case kInfoLogFile:
     257             :           keep = true;
     258             :           break;
     259             :       }
     260             : 
     261        1810 :       if (!keep) {
     262         374 :         if (type == kTableFile) {
     263          15 :           table_cache_->Evict(number);
     264             :         }
     265             :         Log(options_.info_log, "Delete type=%d #%lld\n",
     266             :             int(type),
     267         374 :             static_cast<unsigned long long>(number));
     268        2244 :         env_->DeleteFile(dbname_ + "/" + filenames[i]);
     269             :       }
     270             :     }
     271             :   }
     272             : }
     273             : 
     274         244 : Status DBImpl::Recover(VersionEdit* edit) {
     275         244 :   mutex_.AssertHeld();
     276             : 
     277             :   // Ignore error from CreateDir since the creation of the DB is
     278             :   // committed only when the descriptor is created, and this directory
     279             :   // may already exist from a previous failed creation attempt.
     280         488 :   env_->CreateDir(dbname_);
     281         244 :   assert(db_lock_ == NULL);
     282         488 :   Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
     283         244 :   if (!s.ok()) {
     284             :     return s;
     285             :   }
     286             : 
     287         488 :   if (!env_->FileExists(CurrentFileName(dbname_))) {
     288         129 :     if (options_.create_if_missing) {
     289         258 :       s = NewDB();
     290         129 :       if (!s.ok()) {
     291             :         return s;
     292             :       }
     293             :     } else {
     294             :       return Status::InvalidArgument(
     295           0 :           dbname_, "does not exist (create_if_missing is false)");
     296             :     }
     297             :   } else {
     298         115 :     if (options_.error_if_exists) {
     299             :       return Status::InvalidArgument(
     300           0 :           dbname_, "exists (error_if_exists is true)");
     301             :     }
     302             :   }
     303             : 
     304         488 :   s = versions_->Recover();
     305         244 :   if (s.ok()) {
     306         244 :     SequenceNumber max_sequence(0);
     307             : 
     308             :     // Recover from all newer log files than the ones named in the
     309             :     // descriptor (new log files may have been added by the previous
     310             :     // incarnation without registering them in the descriptor).
     311             :     //
     312             :     // Note that PrevLogNumber() is no longer used, but we pay
     313             :     // attention to it in case we are recovering a database
     314             :     // produced by an older version of leveldb.
     315         244 :     const uint64_t min_log = versions_->LogNumber();
     316         244 :     const uint64_t prev_log = versions_->PrevLogNumber();
     317             :     std::vector<std::string> filenames;
     318         488 :     s = env_->GetChildren(dbname_, &filenames);
     319         244 :     if (!s.ok()) {
     320           0 :       return s;
     321             :     }
     322             :     std::set<uint64_t> expected;
     323         244 :     versions_->AddLiveFiles(&expected);
     324             :     uint64_t number;
     325             :     FileType type;
     326             :     std::vector<uint64_t> logs;
     327        3536 :     for (size_t i = 0; i < filenames.size(); i++) {
     328        3048 :       if (ParseFileName(filenames[i], &number, &type)) {
     329             :         expected.erase(number);
     330        1140 :         if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
     331         115 :           logs.push_back(number);
     332             :       }
     333             :     }
     334         244 :     if (!expected.empty()) {
     335             :       char buf[50];
     336             :       snprintf(buf, sizeof(buf), "%d missing files; e.g.",
     337           0 :                static_cast<int>(expected.size()));
     338           0 :       return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
     339             :     }
     340             : 
     341             :     // Recover in the order in which the logs were generated
     342             :     std::sort(logs.begin(), logs.end());
     343         718 :     for (size_t i = 0; i < logs.size(); i++) {
     344         345 :       s = RecoverLogFile(logs[i], edit, &max_sequence);
     345             : 
     346             :       // The previous incarnation may not have written any MANIFEST
     347             :       // records after allocating this log number.  So we manually
     348             :       // update the file number allocation counter in VersionSet.
     349         230 :       versions_->MarkFileNumberUsed(logs[i]);
     350             :     }
     351             : 
     352         244 :     if (s.ok()) {
     353         244 :       if (versions_->LastSequence() < max_sequence) {
     354         115 :         versions_->SetLastSequence(max_sequence);
     355             :       }
     356         244 :     }
     357             :   }
     358             : 
     359             :   return s;
     360             : }
     361             : 
     362         115 : Status DBImpl::RecoverLogFile(uint64_t log_number,
     363             :                               VersionEdit* edit,
     364             :                               SequenceNumber* max_sequence) {
     365         345 :   struct LogReporter : public log::Reader::Reporter {
     366             :     Env* env;
     367             :     Logger* info_log;
     368             :     const char* fname;
     369             :     Status* status;  // NULL if options_.paranoid_checks==false
     370           0 :     virtual void Corruption(size_t bytes, const Status& s) {
     371             :       Log(info_log, "%s%s: dropping %d bytes; %s",
     372           0 :           (this->status == NULL ? "(ignoring error) " : ""),
     373           0 :           fname, static_cast<int>(bytes), s.ToString().c_str());
     374           0 :       if (this->status != NULL && this->status->ok()) *this->status = s;
     375           0 :     }
     376             :   };
     377             : 
     378         115 :   mutex_.AssertHeld();
     379             : 
     380             :   // Open the log file
     381         115 :   std::string fname = LogFileName(dbname_, log_number);
     382             :   SequentialFile* file;
     383         115 :   Status status = env_->NewSequentialFile(fname, &file);
     384         115 :   if (!status.ok()) {
     385           0 :     MaybeIgnoreError(&status);
     386             :     return status;
     387             :   }
     388             : 
     389             :   // Create the log reader.
     390             :   LogReporter reporter;
     391         115 :   reporter.env = env_;
     392         115 :   reporter.info_log = options_.info_log;
     393         115 :   reporter.fname = fname.c_str();
     394         115 :   reporter.status = (options_.paranoid_checks ? &status : NULL);
     395             :   // We intentionally make log::Reader do checksumming even if
     396             :   // paranoid_checks==false so that corruptions cause entire commits
     397             :   // to be skipped instead of propagating bad information (like overly
     398             :   // large sequence numbers).
     399             :   log::Reader reader(file, &reporter, true/*checksum*/,
     400         230 :                      0/*initial_offset*/);
     401             :   Log(options_.info_log, "Recovering log #%llu",
     402         115 :       (unsigned long long) log_number);
     403             : 
     404             :   // Read all the records and add to a memtable
     405             :   std::string scratch;
     406             :   Slice record;
     407         230 :   WriteBatch batch;
     408             :   MemTable* mem = NULL;
     409         681 :   while (reader.ReadRecord(&record, &scratch) &&
     410             :          status.ok()) {
     411         283 :     if (record.size() < 12) {
     412             :       reporter.Corruption(
     413           0 :           record.size(), Status::Corruption("log record too small"));
     414           0 :       continue;
     415             :     }
     416         283 :     WriteBatchInternal::SetContents(&batch, record);
     417             : 
     418         283 :     if (mem == NULL) {
     419         115 :       mem = new MemTable(internal_comparator_);
     420         115 :       mem->Ref();
     421             :     }
     422         566 :     status = WriteBatchInternal::InsertInto(&batch, mem);
     423         283 :     MaybeIgnoreError(&status);
     424         283 :     if (!status.ok()) {
     425             :       break;
     426             :     }
     427             :     const SequenceNumber last_seq =
     428         566 :         WriteBatchInternal::Sequence(&batch) +
     429         566 :         WriteBatchInternal::Count(&batch) - 1;
     430         283 :     if (last_seq > *max_sequence) {
     431         283 :       *max_sequence = last_seq;
     432             :     }
     433             : 
     434         283 :     if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
     435           0 :       status = WriteLevel0Table(mem, edit, NULL);
     436           0 :       if (!status.ok()) {
     437             :         // Reflect errors immediately so that conditions like full
     438             :         // file-systems cause the DB::Open() to fail.
     439             :         break;
     440             :       }
     441           0 :       mem->Unref();
     442           0 :       mem = NULL;
     443             :     }
     444             :   }
     445             : 
     446         115 :   if (status.ok() && mem != NULL) {
     447         230 :     status = WriteLevel0Table(mem, edit, NULL);
     448             :     // Reflect errors immediately so that conditions like full
     449             :     // file-systems cause the DB::Open() to fail.
     450             :   }
     451             : 
     452         115 :   if (mem != NULL) mem->Unref();
     453         115 :   delete file;
     454             :   return status;
     455             : }
     456             : 
     457         115 : Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
     458             :                                 Version* base) {
     459         115 :   mutex_.AssertHeld();
     460         115 :   const uint64_t start_micros = env_->NowMicros();
     461             :   FileMetaData meta;
     462         230 :   meta.number = versions_->NewFileNumber();
     463         115 :   pending_outputs_.insert(meta.number);
     464         115 :   Iterator* iter = mem->NewIterator();
     465             :   Log(options_.info_log, "Level-0 table #%llu: started",
     466         115 :       (unsigned long long) meta.number);
     467             : 
     468             :   Status s;
     469             :   {
     470         115 :     mutex_.Unlock();
     471         230 :     s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
     472         115 :     mutex_.Lock();
     473             :   }
     474             : 
     475             :   Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
     476             :       (unsigned long long) meta.number,
     477             :       (unsigned long long) meta.file_size,
     478         345 :       s.ToString().c_str());
     479         115 :   delete iter;
     480         115 :   pending_outputs_.erase(meta.number);
     481             : 
     482             : 
     483             :   // Note that if file_size is zero, the file has been deleted and
     484             :   // should not be added to the manifest.
     485         115 :   int level = 0;
     486         115 :   if (s.ok() && meta.file_size > 0) {
     487         115 :     const Slice min_user_key = meta.smallest.user_key();
     488         115 :     const Slice max_user_key = meta.largest.user_key();
     489         115 :     if (base != NULL) {
     490           0 :       level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
     491             :     }
     492             :     edit->AddFile(level, meta.number, meta.file_size,
     493         115 :                   meta.smallest, meta.largest);
     494             :   }
     495             : 
     496             :   CompactionStats stats;
     497         115 :   stats.micros = env_->NowMicros() - start_micros;
     498         115 :   stats.bytes_written = meta.file_size;
     499         115 :   stats_[level].Add(stats);
     500         115 :   return s;
     501             : }
     502             : 
     503           0 : void DBImpl::CompactMemTable() {
     504           0 :   mutex_.AssertHeld();
     505           0 :   assert(imm_ != NULL);
     506             : 
     507             :   // Save the contents of the memtable as a new Table
     508           0 :   VersionEdit edit;
     509           0 :   Version* base = versions_->current();
     510           0 :   base->Ref();
     511           0 :   Status s = WriteLevel0Table(imm_, &edit, base);
     512           0 :   base->Unref();
     513             : 
     514           0 :   if (s.ok() && shutting_down_.Acquire_Load()) {
     515           0 :     s = Status::IOError("Deleting DB during memtable compaction");
     516             :   }
     517             : 
     518             :   // Replace immutable memtable with the generated Table
     519           0 :   if (s.ok()) {
     520           0 :     edit.SetPrevLogNumber(0);
     521           0 :     edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
     522           0 :     s = versions_->LogAndApply(&edit, &mutex_);
     523             :   }
     524             : 
     525           0 :   if (s.ok()) {
     526             :     // Commit to the new state
     527           0 :     imm_->Unref();
     528           0 :     imm_ = NULL;
     529           0 :     has_imm_.Release_Store(NULL);
     530           0 :     DeleteObsoleteFiles();
     531             :   } else {
     532           0 :     RecordBackgroundError(s);
     533           0 :   }
     534           0 : }
     535             : 
     536           0 : void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
     537           0 :   int max_level_with_files = 1;
     538             :   {
     539           0 :     MutexLock l(&mutex_);
     540           0 :     Version* base = versions_->current();
     541           0 :     for (int level = 1; level < config::kNumLevels; level++) {
     542           0 :       if (base->OverlapInLevel(level, begin, end)) {
     543           0 :         max_level_with_files = level;
     544             :       }
     545             :     }
     546             :   }
     547           0 :   TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
     548           0 :   for (int level = 0; level < max_level_with_files; level++) {
     549           0 :     TEST_CompactRange(level, begin, end);
     550             :   }
     551           0 : }
     552             : 
     553           0 : void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
     554           0 :   assert(level >= 0);
     555           0 :   assert(level + 1 < config::kNumLevels);
     556             : 
     557             :   InternalKey begin_storage, end_storage;
     558             : 
     559             :   ManualCompaction manual;
     560           0 :   manual.level = level;
     561           0 :   manual.done = false;
     562           0 :   if (begin == NULL) {
     563           0 :     manual.begin = NULL;
     564             :   } else {
     565           0 :     begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
     566           0 :     manual.begin = &begin_storage;
     567             :   }
     568           0 :   if (end == NULL) {
     569           0 :     manual.end = NULL;
     570             :   } else {
     571           0 :     end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
     572           0 :     manual.end = &end_storage;
     573             :   }
     574             : 
     575           0 :   MutexLock l(&mutex_);
     576           0 :   while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
     577           0 :     if (manual_compaction_ == NULL) {  // Idle
     578           0 :       manual_compaction_ = &manual;
     579           0 :       MaybeScheduleCompaction();
     580             :     } else {  // Running either my compaction or another compaction.
     581           0 :       bg_cv_.Wait();
     582             :     }
     583             :   }
     584           0 :   if (manual_compaction_ == &manual) {
     585             :     // Cancel my manual compaction since we aborted early for some reason.
     586           0 :     manual_compaction_ = NULL;
     587             :   }
     588           0 : }
     589             : 
     590           0 : Status DBImpl::TEST_CompactMemTable() {
     591             :   // NULL batch means just wait for earlier writes to be done
     592           0 :   Status s = Write(WriteOptions(), NULL);
     593           0 :   if (s.ok()) {
     594             :     // Wait until the compaction completes
     595           0 :     MutexLock l(&mutex_);
     596           0 :     while (imm_ != NULL && bg_error_.ok()) {
     597           0 :       bg_cv_.Wait();
     598             :     }
     599           0 :     if (imm_ != NULL) {
     600           0 :       s = bg_error_;
     601             :     }
     602             :   }
     603           0 :   return s;
     604             : }
     605             : 
     606           0 : void DBImpl::RecordBackgroundError(const Status& s) {
     607           0 :   mutex_.AssertHeld();
     608           0 :   if (bg_error_.ok()) {
     609           0 :     bg_error_ = s;
     610           0 :     bg_cv_.SignalAll();
     611             :   }
     612           0 : }
     613             : 
     614         258 : void DBImpl::MaybeScheduleCompaction() {
     615         258 :   mutex_.AssertHeld();
     616         258 :   if (bg_compaction_scheduled_) {
     617             :     // Already scheduled
     618         516 :   } else if (shutting_down_.Acquire_Load()) {
     619             :     // DB is being deleted; no more background compactions
     620         516 :   } else if (!bg_error_.ok()) {
     621             :     // Already got an error; no more changes
     622         774 :   } else if (imm_ == NULL &&
     623         516 :              manual_compaction_ == NULL &&
     624         258 :              !versions_->NeedsCompaction()) {
     625             :     // No work to be done
     626             :   } else {
     627           7 :     bg_compaction_scheduled_ = true;
     628           7 :     env_->Schedule(&DBImpl::BGWork, this);
     629             :   }
     630         258 : }
     631             : 
     632           7 : void DBImpl::BGWork(void* db) {
     633           7 :   reinterpret_cast<DBImpl*>(db)->BackgroundCall();
     634           7 : }
     635             : 
     636           7 : void DBImpl::BackgroundCall() {
     637           7 :   MutexLock l(&mutex_);
     638           7 :   assert(bg_compaction_scheduled_);
     639          14 :   if (shutting_down_.Acquire_Load()) {
     640             :     // No more background work when shutting down.
     641          14 :   } else if (!bg_error_.ok()) {
     642             :     // No more background work after a background error.
     643             :   } else {
     644           7 :     BackgroundCompaction();
     645             :   }
     646             : 
     647           7 :   bg_compaction_scheduled_ = false;
     648             : 
     649             :   // Previous compaction may have produced too many files in a level,
     650             :   // so reschedule another compaction if needed.
     651           7 :   MaybeScheduleCompaction();
     652           7 :   bg_cv_.SignalAll();
     653           7 : }
     654             : 
     655           7 : void DBImpl::BackgroundCompaction() {
     656           7 :   mutex_.AssertHeld();
     657             : 
     658           7 :   if (imm_ != NULL) {
     659           0 :     CompactMemTable();
     660           7 :     return;
     661             :   }
     662             : 
     663             :   Compaction* c;
     664           7 :   bool is_manual = (manual_compaction_ != NULL);
     665             :   InternalKey manual_end;
     666           7 :   if (is_manual) {
     667           0 :     ManualCompaction* m = manual_compaction_;
     668           0 :     c = versions_->CompactRange(m->level, m->begin, m->end);
     669           0 :     m->done = (c == NULL);
     670           0 :     if (c != NULL) {
     671           0 :       manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
     672             :     }
     673             :     Log(options_.info_log,
     674             :         "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
     675             :         m->level,
     676           0 :         (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
     677           0 :         (m->end ? m->end->DebugString().c_str() : "(end)"),
     678           0 :         (m->done ? "(end)" : manual_end.DebugString().c_str()));
     679             :   } else {
     680           7 :     c = versions_->PickCompaction();
     681             :   }
     682             : 
     683             :   Status status;
     684           7 :   if (c == NULL) {
     685             :     // Nothing to do
     686           7 :   } else if (!is_manual && c->IsTrivialMove()) {
     687             :     // Move file to next level
     688           0 :     assert(c->num_input_files(0) == 1);
     689           0 :     FileMetaData* f = c->input(0, 0);
     690           0 :     c->edit()->DeleteFile(c->level(), f->number);
     691           0 :     c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
     692           0 :                        f->smallest, f->largest);
     693           0 :     status = versions_->LogAndApply(c->edit(), &mutex_);
     694           0 :     if (!status.ok()) {
     695           0 :       RecordBackgroundError(status);
     696             :     }
     697             :     VersionSet::LevelSummaryStorage tmp;
     698             :     Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
     699             :         static_cast<unsigned long long>(f->number),
     700           0 :         c->level() + 1,
     701             :         static_cast<unsigned long long>(f->file_size),
     702             :         status.ToString().c_str(),
     703           0 :         versions_->LevelSummary(&tmp));
     704             :   } else {
     705          14 :     CompactionState* compact = new CompactionState(c);
     706          14 :     status = DoCompactionWork(compact);
     707           7 :     if (!status.ok()) {
     708           0 :       RecordBackgroundError(status);
     709             :     }
     710           7 :     CleanupCompaction(compact);
     711           7 :     c->ReleaseInputs();
     712           7 :     DeleteObsoleteFiles();
     713             :   }
     714           7 :   delete c;
     715             : 
     716           7 :   if (status.ok()) {
     717             :     // Done
     718           0 :   } else if (shutting_down_.Acquire_Load()) {
     719             :     // Ignore compaction errors found during shutting down
     720             :   } else {
     721             :     Log(options_.info_log,
     722           0 :         "Compaction error: %s", status.ToString().c_str());
     723             :   }
     724             : 
     725           7 :   if (is_manual) {
     726           0 :     ManualCompaction* m = manual_compaction_;
     727           0 :     if (!status.ok()) {
     728           0 :       m->done = true;
     729             :     }
     730           0 :     if (!m->done) {
     731             :       // We only compacted part of the requested range.  Update *m
     732             :       // to the range that is left to be compacted.
     733           0 :       m->tmp_storage = manual_end;
     734           0 :       m->begin = &m->tmp_storage;
     735             :     }
     736           0 :     manual_compaction_ = NULL;
     737             :   }
     738             : }
     739             : 
     740           7 : void DBImpl::CleanupCompaction(CompactionState* compact) {
     741           7 :   mutex_.AssertHeld();
     742           7 :   if (compact->builder != NULL) {
     743             :     // May happen if we get a shutdown call in the middle of compaction
     744           0 :     compact->builder->Abandon();
     745           0 :     delete compact->builder;
     746             :   } else {
     747           7 :     assert(compact->outfile == NULL);
     748             :   }
     749           7 :   delete compact->outfile;
     750          21 :   for (size_t i = 0; i < compact->outputs.size(); i++) {
     751          14 :     const CompactionState::Output& out = compact->outputs[i];
     752           7 :     pending_outputs_.erase(out.number);
     753             :   }
     754          14 :   delete compact;
     755           7 : }
     756             : 
     757           7 : Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
     758           7 :   assert(compact != NULL);
     759           7 :   assert(compact->builder == NULL);
     760             :   uint64_t file_number;
     761             :   {
     762           7 :     mutex_.Lock();
     763          14 :     file_number = versions_->NewFileNumber();
     764           7 :     pending_outputs_.insert(file_number);
     765             :     CompactionState::Output out;
     766           7 :     out.number = file_number;
     767             :     out.smallest.Clear();
     768             :     out.largest.Clear();
     769           7 :     compact->outputs.push_back(out);
     770           7 :     mutex_.Unlock();
     771             :   }
     772             : 
     773             :   // Make the output file
     774           7 :   std::string fname = TableFileName(dbname_, file_number);
     775           7 :   Status s = env_->NewWritableFile(fname, &compact->outfile);
     776           7 :   if (s.ok()) {
     777           7 :     compact->builder = new TableBuilder(options_, compact->outfile);
     778             :   }
     779           7 :   return s;
     780             : }
     781             : 
     782           7 : Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
     783             :                                           Iterator* input) {
     784           7 :   assert(compact != NULL);
     785           7 :   assert(compact->outfile != NULL);
     786           7 :   assert(compact->builder != NULL);
     787             : 
     788           7 :   const uint64_t output_number = compact->current_output()->number;
     789           7 :   assert(output_number != 0);
     790             : 
     791             :   // Check for iterator errors
     792           7 :   Status s = input->status();
     793           7 :   const uint64_t current_entries = compact->builder->NumEntries();
     794           7 :   if (s.ok()) {
     795          14 :     s = compact->builder->Finish();
     796             :   } else {
     797           0 :     compact->builder->Abandon();
     798             :   }
     799           7 :   const uint64_t current_bytes = compact->builder->FileSize();
     800           7 :   compact->current_output()->file_size = current_bytes;
     801           7 :   compact->total_bytes += current_bytes;
     802           7 :   delete compact->builder;
     803           7 :   compact->builder = NULL;
     804             : 
     805             :   // Finish and check for file errors
     806           7 :   if (s.ok()) {
     807          14 :     s = compact->outfile->Sync();
     808             :   }
     809           7 :   if (s.ok()) {
     810          14 :     s = compact->outfile->Close();
     811             :   }
     812           7 :   delete compact->outfile;
     813           7 :   compact->outfile = NULL;
     814             : 
     815           7 :   if (s.ok() && current_entries > 0) {
     816             :     // Verify that the table is usable
     817             :     Iterator* iter = table_cache_->NewIterator(ReadOptions(),
     818             :                                                output_number,
     819           7 :                                                current_bytes);
     820          14 :     s = iter->status();
     821           7 :     delete iter;
     822           7 :     if (s.ok()) {
     823             :       Log(options_.info_log,
     824             :           "Generated table #%llu: %lld keys, %lld bytes",
     825             :           (unsigned long long) output_number,
     826             :           (unsigned long long) current_entries,
     827           7 :           (unsigned long long) current_bytes);
     828             :     }
     829             :   }
     830           7 :   return s;
     831             : }
     832             : 
     833             : 
     834           7 : Status DBImpl::InstallCompactionResults(CompactionState* compact) {
     835           7 :   mutex_.AssertHeld();
     836             :   Log(options_.info_log,  "Compacted %d@%d + %d@%d files => %lld bytes",
     837             :       compact->compaction->num_input_files(0),
     838             :       compact->compaction->level(),
     839             :       compact->compaction->num_input_files(1),
     840           7 :       compact->compaction->level() + 1,
     841          28 :       static_cast<long long>(compact->total_bytes));
     842             : 
     843             :   // Add compaction outputs
     844          14 :   compact->compaction->AddInputDeletions(compact->compaction->edit());
     845           7 :   const int level = compact->compaction->level();
     846          28 :   for (size_t i = 0; i < compact->outputs.size(); i++) {
     847          14 :     const CompactionState::Output& out = compact->outputs[i];
     848             :     compact->compaction->edit()->AddFile(
     849             :         level + 1,
     850          14 :         out.number, out.file_size, out.smallest, out.largest);
     851             :   }
     852          14 :   return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
     853             : }
     854             : 
     855           7 : Status DBImpl::DoCompactionWork(CompactionState* compact) {
     856           7 :   const uint64_t start_micros = env_->NowMicros();
     857           7 :   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
     858             : 
     859             :   Log(options_.info_log,  "Compacting %d@%d + %d@%d files",
     860             :       compact->compaction->num_input_files(0),
     861             :       compact->compaction->level(),
     862             :       compact->compaction->num_input_files(1),
     863          21 :       compact->compaction->level() + 1);
     864             : 
     865           7 :   assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
     866           7 :   assert(compact->builder == NULL);
     867           7 :   assert(compact->outfile == NULL);
     868          14 :   if (snapshots_.empty()) {
     869           7 :     compact->smallest_snapshot = versions_->LastSequence();
     870             :   } else {
     871           0 :     compact->smallest_snapshot = snapshots_.oldest()->number_;
     872             :   }
     873             : 
     874             :   // Release mutex while we're actually doing the compaction work
     875           7 :   mutex_.Unlock();
     876             : 
     877           7 :   Iterator* input = versions_->MakeInputIterator(compact->compaction);
     878           7 :   input->SeekToFirst();
     879             :   Status status;
     880             :   ParsedInternalKey ikey;
     881             :   std::string current_user_key;
     882           7 :   bool has_current_user_key = false;
     883           7 :   SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
     884        3094 :   for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
     885             :     // Prioritize immutable compaction work
     886        3080 :     if (has_imm_.NoBarrier_Load() != NULL) {
     887           0 :       const uint64_t imm_start = env_->NowMicros();
     888           0 :       mutex_.Lock();
     889           0 :       if (imm_ != NULL) {
     890           0 :         CompactMemTable();
     891           0 :         bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
     892             :       }
     893           0 :       mutex_.Unlock();
     894           0 :       imm_micros += (env_->NowMicros() - imm_start);
     895             :     }
     896             : 
     897        1540 :     Slice key = input->key();
     898        1540 :     if (compact->compaction->ShouldStopBefore(key) &&
     899           0 :         compact->builder != NULL) {
     900           0 :       status = FinishCompactionOutputFile(compact, input);
     901           0 :       if (!status.ok()) {
     902             :         break;
     903             :       }
     904             :     }
     905             : 
     906             :     // Handle key/value, add to state, etc.
     907        1540 :     bool drop = false;
     908        1540 :     if (!ParseInternalKey(key, &ikey)) {
     909             :       // Do not hide error keys
     910             :       current_user_key.clear();
     911             :       has_current_user_key = false;
     912             :       last_sequence_for_key = kMaxSequenceNumber;
     913             :     } else {
     914        4620 :       if (!has_current_user_key ||
     915             :           user_comparator()->Compare(ikey.user_key,
     916        6139 :                                      Slice(current_user_key)) != 0) {
     917             :         // First occurrence of this user key
     918        1519 :         current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
     919             :         has_current_user_key = true;
     920             :         last_sequence_for_key = kMaxSequenceNumber;
     921             :       }
     922             : 
     923        1540 :       if (last_sequence_for_key <= compact->smallest_snapshot) {
     924             :         // Hidden by an newer entry for same user key
     925             :         drop = true;    // (A)
     926        3038 :       } else if (ikey.type == kTypeDeletion &&
     927        1519 :                  ikey.sequence <= compact->smallest_snapshot &&
     928           0 :                  compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
     929             :         // For this user key:
     930             :         // (1) there is no data in higher levels
     931             :         // (2) data in lower levels will have larger sequence numbers
     932             :         // (3) data in layers that are being compacted here and have
     933             :         //     smaller sequence numbers will be dropped in the next
     934             :         //     few iterations of this loop (by rule (A) above).
     935             :         // Therefore this deletion marker is obsolete and can be dropped.
     936           0 :         drop = true;
     937             :       }
     938             : 
     939        1540 :       last_sequence_for_key = ikey.sequence;
     940             :     }
     941             : #if 0
     942             :     Log(options_.info_log,
     943             :         "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
     944             :         "%d smallest_snapshot: %d",
     945             :         ikey.user_key.ToString().c_str(),
     946             :         (int)ikey.sequence, ikey.type, kTypeValue, drop,
     947             :         compact->compaction->IsBaseLevelForKey(ikey.user_key),
     948             :         (int)last_sequence_for_key, (int)compact->smallest_snapshot);
     949             : #endif
     950             : 
     951        1540 :     if (!drop) {
     952             :       // Open output file if necessary
     953        1519 :       if (compact->builder == NULL) {
     954          14 :         status = OpenCompactionOutputFile(compact);
     955           7 :         if (!status.ok()) {
     956             :           break;
     957             :         }
     958             :       }
     959        1519 :       if (compact->builder->NumEntries() == 0) {
     960           7 :         compact->current_output()->smallest.DecodeFrom(key);
     961             :       }
     962        1519 :       compact->current_output()->largest.DecodeFrom(key);
     963        1519 :       compact->builder->Add(key, input->value());
     964             : 
     965             :       // Close output file if it is big enough
     966        3038 :       if (compact->builder->FileSize() >=
     967        1519 :           compact->compaction->MaxOutputFileSize()) {
     968           0 :         status = FinishCompactionOutputFile(compact, input);
     969           0 :         if (!status.ok()) {
     970             :           break;
     971             :         }
     972             :       }
     973             :     }
     974             : 
     975        1540 :     input->Next();
     976             :   }
     977             : 
     978          14 :   if (status.ok() && shutting_down_.Acquire_Load()) {
     979           0 :     status = Status::IOError("Deleting DB during compaction");
     980             :   }
     981           7 :   if (status.ok() && compact->builder != NULL) {
     982          14 :     status = FinishCompactionOutputFile(compact, input);
     983             :   }
     984           7 :   if (status.ok()) {
     985          14 :     status = input->status();
     986             :   }
     987           7 :   delete input;
     988           7 :   input = NULL;
     989             : 
     990             :   CompactionStats stats;
     991           7 :   stats.micros = env_->NowMicros() - start_micros - imm_micros;
     992          21 :   for (int which = 0; which < 2; which++) {
     993          50 :     for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
     994          36 :       stats.bytes_read += compact->compaction->input(which, i)->file_size;
     995             :     }
     996             :   }
     997          21 :   for (size_t i = 0; i < compact->outputs.size(); i++) {
     998          14 :     stats.bytes_written += compact->outputs[i].file_size;
     999             :   }
    1000             : 
    1001           7 :   mutex_.Lock();
    1002           7 :   stats_[compact->compaction->level() + 1].Add(stats);
    1003             : 
    1004           7 :   if (status.ok()) {
    1005          14 :     status = InstallCompactionResults(compact);
    1006             :   }
    1007           7 :   if (!status.ok()) {
    1008           0 :     RecordBackgroundError(status);
    1009             :   }
    1010             :   VersionSet::LevelSummaryStorage tmp;
    1011             :   Log(options_.info_log,
    1012           7 :       "compacted to: %s", versions_->LevelSummary(&tmp));
    1013           7 :   return status;
    1014             : }
    1015             : 
    1016             : namespace {
    1017             : struct IterState {
    1018             :   port::Mutex* mu;
    1019             :   Version* version;
    1020             :   MemTable* mem;
    1021             :   MemTable* imm;
    1022             : };
    1023             : 
    1024         159 : static void CleanupIteratorState(void* arg1, void* arg2) {
    1025         159 :   IterState* state = reinterpret_cast<IterState*>(arg1);
    1026         159 :   state->mu->Lock();
    1027         159 :   state->mem->Unref();
    1028         159 :   if (state->imm != NULL) state->imm->Unref();
    1029         159 :   state->version->Unref();
    1030         159 :   state->mu->Unlock();
    1031         159 :   delete state;
    1032         159 : }
    1033             : }  // namespace
    1034             : 
    1035         159 : Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
    1036             :                                       SequenceNumber* latest_snapshot,
    1037             :                                       uint32_t* seed) {
    1038         159 :   IterState* cleanup = new IterState;
    1039         159 :   mutex_.Lock();
    1040         159 :   *latest_snapshot = versions_->LastSequence();
    1041             : 
    1042             :   // Collect together all needed child iterators
    1043             :   std::vector<Iterator*> list;
    1044         318 :   list.push_back(mem_->NewIterator());
    1045         159 :   mem_->Ref();
    1046         159 :   if (imm_ != NULL) {
    1047           0 :     list.push_back(imm_->NewIterator());
    1048           0 :     imm_->Ref();
    1049             :   }
    1050         159 :   versions_->current()->AddIterators(options, &list);
    1051             :   Iterator* internal_iter =
    1052         318 :       NewMergingIterator(&internal_comparator_, &list[0], list.size());
    1053         159 :   versions_->current()->Ref();
    1054             : 
    1055         159 :   cleanup->mu = &mutex_;
    1056         159 :   cleanup->mem = mem_;
    1057         159 :   cleanup->imm = imm_;
    1058         159 :   cleanup->version = versions_->current();
    1059         159 :   internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
    1060             : 
    1061         159 :   *seed = ++seed_;
    1062         159 :   mutex_.Unlock();
    1063         318 :   return internal_iter;
    1064             : }
    1065             : 
    1066           0 : Iterator* DBImpl::TEST_NewInternalIterator() {
    1067             :   SequenceNumber ignored;
    1068             :   uint32_t ignored_seed;
    1069           0 :   return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
    1070             : }
    1071             : 
    1072           0 : int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
    1073           0 :   MutexLock l(&mutex_);
    1074           0 :   return versions_->MaxNextLevelOverlappingBytes();
    1075             : }
    1076             : 
    1077       36658 : Status DBImpl::Get(const ReadOptions& options,
    1078             :                    const Slice& key,
    1079             :                    std::string* value) {
    1080             :   Status s;
    1081       36658 :   MutexLock l(&mutex_);
    1082             :   SequenceNumber snapshot;
    1083       36658 :   if (options.snapshot != NULL) {
    1084           0 :     snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
    1085             :   } else {
    1086       36658 :     snapshot = versions_->LastSequence();
    1087             :   }
    1088             : 
    1089       36658 :   MemTable* mem = mem_;
    1090       36658 :   MemTable* imm = imm_;
    1091       36658 :   Version* current = versions_->current();
    1092       36658 :   mem->Ref();
    1093       36658 :   if (imm != NULL) imm->Ref();
    1094       36658 :   current->Ref();
    1095             : 
    1096       36658 :   bool have_stat_update = false;
    1097             :   Version::GetStats stats;
    1098             : 
    1099             :   // Unlock while reading from files and memtables
    1100             :   {
    1101       36658 :     mutex_.Unlock();
    1102             :     // First look in the memtable, then in the immutable memtable (if any).
    1103       36658 :     LookupKey lkey(key, snapshot);
    1104       36658 :     if (mem->Get(lkey, value, &s)) {
    1105             :       // Done
    1106       36651 :     } else if (imm != NULL && imm->Get(lkey, value, &s)) {
    1107             :       // Done
    1108             :     } else {
    1109       73302 :       s = current->Get(options, lkey, value, &stats);
    1110       36651 :       have_stat_update = true;
    1111             :     }
    1112       36658 :     mutex_.Lock();
    1113             :   }
    1114             : 
    1115       36658 :   if (have_stat_update && current->UpdateStats(stats)) {
    1116           7 :     MaybeScheduleCompaction();
    1117             :   }
    1118       36658 :   mem->Unref();
    1119       36658 :   if (imm != NULL) imm->Unref();
    1120       36658 :   current->Unref();
    1121       36658 :   return s;
    1122             : }
    1123             : 
    1124         159 : Iterator* DBImpl::NewIterator(const ReadOptions& options) {
    1125             :   SequenceNumber latest_snapshot;
    1126             :   uint32_t seed;
    1127         159 :   Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
    1128             :   return NewDBIterator(
    1129             :       this, user_comparator(), iter,
    1130         159 :       (options.snapshot != NULL
    1131             :        ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
    1132             :        : latest_snapshot),
    1133         477 :       seed);
    1134             : }
    1135             : 
    1136          57 : void DBImpl::RecordReadSample(Slice key) {
    1137          57 :   MutexLock l(&mutex_);
    1138          57 :   if (versions_->current()->RecordReadSample(key)) {
    1139           0 :     MaybeScheduleCompaction();
    1140             :   }
    1141          57 : }
    1142             : 
    1143           0 : const Snapshot* DBImpl::GetSnapshot() {
    1144           0 :   MutexLock l(&mutex_);
    1145           0 :   return snapshots_.New(versions_->LastSequence());
    1146             : }
    1147             : 
    1148           0 : void DBImpl::ReleaseSnapshot(const Snapshot* s) {
    1149           0 :   MutexLock l(&mutex_);
    1150           0 :   snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
    1151           0 : }
    1152             : 
    1153             : // Convenience methods
    1154           0 : Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
    1155           0 :   return DB::Put(o, key, val);
    1156             : }
    1157             : 
    1158           0 : Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
    1159           0 :   return DB::Delete(options, key);
    1160             : }
    1161             : 
    1162         551 : Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
    1163         551 :   Writer w(&mutex_);
    1164         551 :   w.batch = my_batch;
    1165         551 :   w.sync = options.sync;
    1166         551 :   w.done = false;
    1167             : 
    1168         551 :   MutexLock l(&mutex_);
    1169        1102 :   writers_.push_back(&w);
    1170        1653 :   while (!w.done && &w != writers_.front()) {
    1171           0 :     w.cv.Wait();
    1172             :   }
    1173         551 :   if (w.done) {
    1174             :     return w.status;
    1175             :   }
    1176             : 
    1177             :   // May temporarily unlock and wait.
    1178         551 :   Status status = MakeRoomForWrite(my_batch == NULL);
    1179         551 :   uint64_t last_sequence = versions_->LastSequence();
    1180         551 :   Writer* last_writer = &w;
    1181         551 :   if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    1182         551 :     WriteBatch* updates = BuildBatchGroup(&last_writer);
    1183         551 :     WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    1184         551 :     last_sequence += WriteBatchInternal::Count(updates);
    1185             : 
    1186             :     // Add to log and apply to memtable.  We can release the lock
    1187             :     // during this phase since &w is currently responsible for logging
    1188             :     // and protects against concurrent loggers and concurrent writes
    1189             :     // into mem_.
    1190             :     {
    1191         551 :       mutex_.Unlock();
    1192        1102 :       status = log_->AddRecord(WriteBatchInternal::Contents(updates));
    1193         551 :       bool sync_error = false;
    1194         551 :       if (status.ok() && options.sync) {
    1195         310 :         status = logfile_->Sync();
    1196         155 :         if (!status.ok()) {
    1197           0 :           sync_error = true;
    1198             :         }
    1199             :       }
    1200         551 :       if (status.ok()) {
    1201        1102 :         status = WriteBatchInternal::InsertInto(updates, mem_);
    1202             :       }
    1203         551 :       mutex_.Lock();
    1204         551 :       if (sync_error) {
    1205             :         // The state of the log file is indeterminate: the log record we
    1206             :         // just added may or may not show up when the DB is re-opened.
    1207             :         // So we force the DB into a mode where all future writes fail.
    1208           0 :         RecordBackgroundError(status);
    1209             :       }
    1210             :     }
    1211         551 :     if (updates == tmp_batch_) tmp_batch_->Clear();
    1212             : 
    1213         551 :     versions_->SetLastSequence(last_sequence);
    1214             :   }
    1215             : 
    1216             :   while (true) {
    1217        1102 :     Writer* ready = writers_.front();
    1218         551 :     writers_.pop_front();
    1219         551 :     if (ready != &w) {
    1220           0 :       ready->status = status;
    1221           0 :       ready->done = true;
    1222           0 :       ready->cv.Signal();
    1223             :     }
    1224         551 :     if (ready == last_writer) break;
    1225             :   }
    1226             : 
    1227             :   // Notify new head of write queue
    1228        1102 :   if (!writers_.empty()) {
    1229           0 :     writers_.front()->cv.Signal();
    1230             :   }
    1231             : 
    1232         551 :   return status;
    1233             : }
    1234             : 
    1235             : // REQUIRES: Writer list must be non-empty
    1236             : // REQUIRES: First writer must have a non-NULL batch
    1237         551 : WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
    1238        1102 :   assert(!writers_.empty());
    1239        1102 :   Writer* first = writers_.front();
    1240         551 :   WriteBatch* result = first->batch;
    1241         551 :   assert(result != NULL);
    1242             : 
    1243        1102 :   size_t size = WriteBatchInternal::ByteSize(first->batch);
    1244             : 
    1245             :   // Allow the group to grow up to a maximum size, but if the
    1246             :   // original write is small, limit the growth so we do not slow
    1247             :   // down the small write too much.
    1248         551 :   size_t max_size = 1 << 20;
    1249         551 :   if (size <= (128<<10)) {
    1250         550 :     max_size = size + (128<<10);
    1251             :   }
    1252             : 
    1253         551 :   *last_writer = first;
    1254         551 :   std::deque<Writer*>::iterator iter = writers_.begin();
    1255             :   ++iter;  // Advance past "first"
    1256        1102 :   for (; iter != writers_.end(); ++iter) {
    1257           0 :     Writer* w = *iter;
    1258           0 :     if (w->sync && !first->sync) {
    1259             :       // Do not include a sync write into a batch handled by a non-sync write.
    1260             :       break;
    1261             :     }
    1262             : 
    1263           0 :     if (w->batch != NULL) {
    1264           0 :       size += WriteBatchInternal::ByteSize(w->batch);
    1265           0 :       if (size > max_size) {
    1266             :         // Do not make batch too big
    1267             :         break;
    1268             :       }
    1269             : 
    1270             :       // Append to *result
    1271           0 :       if (result == first->batch) {
    1272             :         // Switch to temporary batch instead of disturbing caller's batch
    1273           0 :         result = tmp_batch_;
    1274           0 :         assert(WriteBatchInternal::Count(result) == 0);
    1275           0 :         WriteBatchInternal::Append(result, first->batch);
    1276             :       }
    1277           0 :       WriteBatchInternal::Append(result, w->batch);
    1278             :     }
    1279           0 :     *last_writer = w;
    1280             :   }
    1281         551 :   return result;
    1282             : }
    1283             : 
    1284             : // REQUIRES: mutex_ is held
    1285             : // REQUIRES: this thread is currently at the front of the writer queue
    1286         551 : Status DBImpl::MakeRoomForWrite(bool force) {
    1287         551 :   mutex_.AssertHeld();
    1288        1102 :   assert(!writers_.empty());
    1289         551 :   bool allow_delay = !force;
    1290             :   Status s;
    1291             :   while (true) {
    1292        1102 :     if (!bg_error_.ok()) {
    1293             :       // Yield previous error
    1294           0 :       s = bg_error_;
    1295             :       break;
    1296         551 :     } else if (
    1297        1102 :         allow_delay &&
    1298         551 :         versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
    1299             :       // We are getting close to hitting a hard limit on the number of
    1300             :       // L0 files.  Rather than delaying a single write by several
    1301             :       // seconds when we hit the hard limit, start delaying each
    1302             :       // individual write by 1ms to reduce latency variance.  Also,
    1303             :       // this delay hands over some CPU to the compaction thread in
    1304             :       // case it is sharing the same core as the writer.
    1305           0 :       mutex_.Unlock();
    1306           0 :       env_->SleepForMicroseconds(1000);
    1307           0 :       allow_delay = false;  // Do not delay a single write more than once
    1308           0 :       mutex_.Lock();
    1309        1102 :     } else if (!force &&
    1310         551 :                (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
    1311             :       // There is room in current memtable
    1312             :       break;
    1313           0 :     } else if (imm_ != NULL) {
    1314             :       // We have filled up the current memtable, but the previous
    1315             :       // one is still being compacted, so we wait.
    1316           0 :       Log(options_.info_log, "Current memtable full; waiting...\n");
    1317           0 :       bg_cv_.Wait();
    1318           0 :     } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
    1319             :       // There are too many level-0 files.
    1320           0 :       Log(options_.info_log, "Too many L0 files; waiting...\n");
    1321           0 :       bg_cv_.Wait();
    1322             :     } else {
    1323             :       // Attempt to switch to a new memtable and trigger compaction of old
    1324           0 :       assert(versions_->PrevLogNumber() == 0);
    1325           0 :       uint64_t new_log_number = versions_->NewFileNumber();
    1326           0 :       WritableFile* lfile = NULL;
    1327           0 :       s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
    1328           0 :       if (!s.ok()) {
    1329             :         // Avoid chewing through file number space in a tight loop.
    1330           0 :         versions_->ReuseFileNumber(new_log_number);
    1331           0 :         break;
    1332             :       }
    1333           0 :       delete log_;
    1334           0 :       delete logfile_;
    1335           0 :       logfile_ = lfile;
    1336           0 :       logfile_number_ = new_log_number;
    1337           0 :       log_ = new log::Writer(lfile);
    1338           0 :       imm_ = mem_;
    1339           0 :       has_imm_.Release_Store(imm_);
    1340           0 :       mem_ = new MemTable(internal_comparator_);
    1341           0 :       mem_->Ref();
    1342           0 :       force = false;   // Do not force another compaction if have room
    1343           0 :       MaybeScheduleCompaction();
    1344             :     }
    1345             :   }
    1346         551 :   return s;
    1347             : }
    1348             : 
    1349           0 : bool DBImpl::GetProperty(const Slice& property, std::string* value) {
    1350             :   value->clear();
    1351             : 
    1352           0 :   MutexLock l(&mutex_);
    1353           0 :   Slice in = property;
    1354             :   Slice prefix("leveldb.");
    1355           0 :   if (!in.starts_with(prefix)) return false;
    1356           0 :   in.remove_prefix(prefix.size());
    1357             : 
    1358           0 :   if (in.starts_with("num-files-at-level")) {
    1359           0 :     in.remove_prefix(strlen("num-files-at-level"));
    1360             :     uint64_t level;
    1361           0 :     bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
    1362           0 :     if (!ok || level >= config::kNumLevels) {
    1363             :       return false;
    1364             :     } else {
    1365             :       char buf[100];
    1366             :       snprintf(buf, sizeof(buf), "%d",
    1367           0 :                versions_->NumLevelFiles(static_cast<int>(level)));
    1368             :       *value = buf;
    1369             :       return true;
    1370             :     }
    1371           0 :   } else if (in == "stats") {
    1372             :     char buf[200];
    1373             :     snprintf(buf, sizeof(buf),
    1374             :              "                               Compactions\n"
    1375             :              "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
    1376             :              "--------------------------------------------------\n"
    1377             :              );
    1378           0 :     value->append(buf);
    1379           0 :     for (int level = 0; level < config::kNumLevels; level++) {
    1380           0 :       int files = versions_->NumLevelFiles(level);
    1381           0 :       if (stats_[level].micros > 0 || files > 0) {
    1382             :         snprintf(
    1383             :             buf, sizeof(buf),
    1384             :             "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
    1385             :             level,
    1386             :             files,
    1387           0 :             versions_->NumLevelBytes(level) / 1048576.0,
    1388             :             stats_[level].micros / 1e6,
    1389             :             stats_[level].bytes_read / 1048576.0,
    1390           0 :             stats_[level].bytes_written / 1048576.0);
    1391           0 :         value->append(buf);
    1392             :       }
    1393             :     }
    1394             :     return true;
    1395           0 :   } else if (in == "sstables") {
    1396           0 :     *value = versions_->current()->DebugString();
    1397           0 :     return true;
    1398             :   }
    1399             : 
    1400             :   return false;
    1401             : }
    1402             : 
    1403           0 : void DBImpl::GetApproximateSizes(
    1404             :     const Range* range, int n,
    1405             :     uint64_t* sizes) {
    1406             :   // TODO(opt): better implementation
    1407             :   Version* v;
    1408             :   {
    1409           0 :     MutexLock l(&mutex_);
    1410           0 :     versions_->current()->Ref();
    1411           0 :     v = versions_->current();
    1412             :   }
    1413             : 
    1414           0 :   for (int i = 0; i < n; i++) {
    1415             :     // Convert user_key into a corresponding internal key.
    1416           0 :     InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    1417           0 :     InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    1418           0 :     uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    1419           0 :     uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    1420           0 :     sizes[i] = (limit >= start ? limit - start : 0);
    1421             :   }
    1422             : 
    1423             :   {
    1424           0 :     MutexLock l(&mutex_);
    1425           0 :     v->Unref();
    1426             :   }
    1427           0 : }
    1428             : 
    1429             : // Default implementations of convenience methods that subclasses of DB
    1430             : // can call if they wish
    1431           0 : Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
    1432           0 :   WriteBatch batch;
    1433           0 :   batch.Put(key, value);
    1434           0 :   return Write(opt, &batch);
    1435             : }
    1436             : 
    1437           0 : Status DB::Delete(const WriteOptions& opt, const Slice& key) {
    1438           0 :   WriteBatch batch;
    1439           0 :   batch.Delete(key);
    1440           0 :   return Write(opt, &batch);
    1441             : }
    1442             : 
    1443         244 : DB::~DB() { }
    1444             : 
    1445         244 : Status DB::Open(const Options& options, const std::string& dbname,
    1446             :                 DB** dbptr) {
    1447         244 :   *dbptr = NULL;
    1448             : 
    1449         244 :   DBImpl* impl = new DBImpl(options, dbname);
    1450         244 :   impl->mutex_.Lock();
    1451         244 :   VersionEdit edit;
    1452         244 :   Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
    1453         244 :   if (s.ok()) {
    1454         488 :     uint64_t new_log_number = impl->versions_->NewFileNumber();
    1455             :     WritableFile* lfile;
    1456         732 :     s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
    1457         488 :                                      &lfile);
    1458         244 :     if (s.ok()) {
    1459         244 :       edit.SetLogNumber(new_log_number);
    1460         244 :       impl->logfile_ = lfile;
    1461         244 :       impl->logfile_number_ = new_log_number;
    1462         244 :       impl->log_ = new log::Writer(lfile);
    1463         488 :       s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
    1464             :     }
    1465         244 :     if (s.ok()) {
    1466         244 :       impl->DeleteObsoleteFiles();
    1467         244 :       impl->MaybeScheduleCompaction();
    1468             :     }
    1469             :   }
    1470         244 :   impl->mutex_.Unlock();
    1471         244 :   if (s.ok()) {
    1472         244 :     *dbptr = impl;
    1473             :   } else {
    1474           0 :     delete impl;
    1475             :   }
    1476         244 :   return s;
    1477             : }
    1478             : 
    1479         244 : Snapshot::~Snapshot() {
    1480         244 : }
    1481             : 
    1482           3 : Status DestroyDB(const std::string& dbname, const Options& options) {
    1483           3 :   Env* env = options.env;
    1484             :   std::vector<std::string> filenames;
    1485             :   // Ignore error in case directory does not exist
    1486           6 :   env->GetChildren(dbname, &filenames);
    1487           3 :   if (filenames.empty()) {
    1488             :     return Status::OK();
    1489             :   }
    1490             : 
    1491             :   FileLock* lock;
    1492           3 :   const std::string lockname = LockFileName(dbname);
    1493           3 :   Status result = env->LockFile(lockname, &lock);
    1494           3 :   if (result.ok()) {
    1495             :     uint64_t number;
    1496             :     FileType type;
    1497          45 :     for (size_t i = 0; i < filenames.size(); i++) {
    1498          57 :       if (ParseFileName(filenames[i], &number, &type) &&
    1499          15 :           type != kDBLockFile) {  // Lock file will be deleted at end
    1500          60 :         Status del = env->DeleteFile(dbname + "/" + filenames[i]);
    1501          24 :         if (result.ok() && !del.ok()) {
    1502           0 :           result = del;
    1503             :         }
    1504             :       }
    1505             :     }
    1506           6 :     env->UnlockFile(lock);  // Ignore error since state is already gone
    1507           6 :     env->DeleteFile(lockname);
    1508           6 :     env->DeleteDir(dbname);  // Ignore error in case dir contains other files
    1509             :   }
    1510           3 :   return result;
    1511             : }
    1512             : 
    1513             : }  // namespace leveldb

Generated by: LCOV version 1.11