LCOV - code coverage report
Current view: top level - src/leveldb/util - env_posix.cc (source / functions) Hit Total Coverage
Test: total_coverage.info Lines: 195 267 73.0 %
Date: 2015-10-12 22:39:14 Functions: 45 59 76.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
       2             : // Use of this source code is governed by a BSD-style license that can be
       3             : // found in the LICENSE file. See the AUTHORS file for names of contributors.
       4             : #if !defined(LEVELDB_PLATFORM_WINDOWS)
       5             : 
       6             : #include <dirent.h>
       7             : #include <errno.h>
       8             : #include <fcntl.h>
       9             : #include <pthread.h>
      10             : #include <stdio.h>
      11             : #include <stdlib.h>
      12             : #include <string.h>
      13             : #include <sys/mman.h>
      14             : #include <sys/stat.h>
      15             : #include <sys/time.h>
      16             : #include <sys/types.h>
      17             : #include <time.h>
      18             : #include <unistd.h>
      19             : #include <deque>
      20             : #include <set>
      21             : #include "leveldb/env.h"
      22             : #include "leveldb/slice.h"
      23             : #include "port/port.h"
      24             : #include "util/logging.h"
      25             : #include "util/mutexlock.h"
      26             : #include "util/posix_logger.h"
      27             : 
      28             : namespace leveldb {
      29             : 
      30             : namespace {
      31             : 
      32         461 : static Status IOError(const std::string& context, int err_number) {
      33         922 :   return Status::IOError(context, strerror(err_number));
      34             : }
      35             : 
      36             : class PosixSequentialFile: public SequentialFile {
      37             :  private:
      38             :   std::string filename_;
      39             :   FILE* file_;
      40             : 
      41             :  public:
      42         499 :   PosixSequentialFile(const std::string& fname, FILE* f)
      43         998 :       : filename_(fname), file_(f) { }
      44        1497 :   virtual ~PosixSequentialFile() { fclose(file_); }
      45             : 
      46         691 :   virtual Status Read(size_t n, Slice* result, char* scratch) {
      47             :     Status s;
      48        1382 :     size_t r = fread_unlocked(scratch, 1, n, file_);
      49         691 :     *result = Slice(scratch, r);
      50         691 :     if (r < n) {
      51         691 :       if (feof(file_)) {
      52             :         // We leave status as ok if we hit the end of the file
      53             :       } else {
      54             :         // A partial read with an error: return a non-ok status
      55           0 :         s = IOError(filename_, errno);
      56             :       }
      57             :     }
      58         691 :     return s;
      59             :   }
      60             : 
      61           0 :   virtual Status Skip(uint64_t n) {
      62           0 :     if (fseek(file_, n, SEEK_CUR)) {
      63           0 :       return IOError(filename_, errno);
      64             :     }
      65             :     return Status::OK();
      66             :   }
      67             : };
      68             : 
      69             : // pread() based random-access
      70             : class PosixRandomAccessFile: public RandomAccessFile {
      71             :  private:
      72             :   std::string filename_;
      73             :   int fd_;
      74             : 
      75             :  public:
      76           0 :   PosixRandomAccessFile(const std::string& fname, int fd)
      77           0 :       : filename_(fname), fd_(fd) { }
      78           0 :   virtual ~PosixRandomAccessFile() { close(fd_); }
      79             : 
      80           0 :   virtual Status Read(uint64_t offset, size_t n, Slice* result,
      81             :                       char* scratch) const {
      82             :     Status s;
      83           0 :     ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
      84           0 :     *result = Slice(scratch, (r < 0) ? 0 : r);
      85           0 :     if (r < 0) {
      86             :       // An error: return a non-ok status
      87           0 :       s = IOError(filename_, errno);
      88             :     }
      89           0 :     return s;
      90             :   }
      91             : };
      92             : 
      93             : // Helper class to limit mmap file usage so that we do not end up
      94             : // running out virtual memory or running into kernel performance
      95             : // problems for very large databases.
      96           0 : class MmapLimiter {
      97             :  public:
      98             :   // Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes.
      99          95 :   MmapLimiter() {
     100          95 :     SetAllowed(sizeof(void*) >= 8 ? 1000 : 0);
     101             :   }
     102             : 
     103             :   // If another mmap slot is available, acquire it and return true.
     104             :   // Else return false.
     105         160 :   bool Acquire() {
     106         160 :     if (GetAllowed() <= 0) {
     107             :       return false;
     108             :     }
     109         160 :     MutexLock l(&mu_);
     110         160 :     intptr_t x = GetAllowed();
     111         160 :     if (x <= 0) {
     112             :       return false;
     113             :     } else {
     114         160 :       SetAllowed(x - 1);
     115         160 :       return true;
     116             :     }
     117             :   }
     118             : 
     119             :   // Release a slot acquired by a previous call to Acquire() that returned true.
     120         160 :   void Release() {
     121         160 :     MutexLock l(&mu_);
     122         160 :     SetAllowed(GetAllowed() + 1);
     123         160 :   }
     124             : 
     125             :  private:
     126             :   port::Mutex mu_;
     127             :   port::AtomicPointer allowed_;
     128             : 
     129             :   intptr_t GetAllowed() const {
     130         960 :     return reinterpret_cast<intptr_t>(allowed_.Acquire_Load());
     131             :   }
     132             : 
     133             :   // REQUIRES: mu_ must be held
     134             :   void SetAllowed(intptr_t v) {
     135         415 :     allowed_.Release_Store(reinterpret_cast<void*>(v));
     136             :   }
     137             : 
     138             :   MmapLimiter(const MmapLimiter&);
     139             :   void operator=(const MmapLimiter&);
     140             : };
     141             : 
     142             : // mmap() based random-access
     143             : class PosixMmapReadableFile: public RandomAccessFile {
     144             :  private:
     145             :   std::string filename_;
     146             :   void* mmapped_region_;
     147             :   size_t length_;
     148             :   MmapLimiter* limiter_;
     149             : 
     150             :  public:
     151             :   // base[0,length-1] contains the mmapped contents of the file.
     152         160 :   PosixMmapReadableFile(const std::string& fname, void* base, size_t length,
     153             :                         MmapLimiter* limiter)
     154             :       : filename_(fname), mmapped_region_(base), length_(length),
     155         320 :         limiter_(limiter) {
     156         160 :   }
     157             : 
     158         640 :   virtual ~PosixMmapReadableFile() {
     159         160 :     munmap(mmapped_region_, length_);
     160         160 :     limiter_->Release();
     161         320 :   }
     162             : 
     163       13056 :   virtual Status Read(uint64_t offset, size_t n, Slice* result,
     164             :                       char* scratch) const {
     165             :     Status s;
     166       13056 :     if (offset + n > length_) {
     167           0 :       *result = Slice();
     168           0 :       s = IOError(filename_, EINVAL);
     169             :     } else {
     170       13056 :       *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
     171             :     }
     172       13056 :     return s;
     173             :   }
     174             : };
     175             : 
     176             : class PosixWritableFile : public WritableFile {
     177             :  private:
     178             :   std::string filename_;
     179             :   FILE* file_;
     180             : 
     181             :  public:
     182         852 :   PosixWritableFile(const std::string& fname, FILE* f)
     183        1704 :       : filename_(fname), file_(f) { }
     184             : 
     185        3408 :   ~PosixWritableFile() {
     186         852 :     if (file_ != NULL) {
     187             :       // Ignoring any potential errors
     188         384 :       fclose(file_);
     189             :     }
     190        1704 :   }
     191             : 
     192        4053 :   virtual Status Append(const Slice& data) {
     193        4053 :     size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
     194        4053 :     if (r != data.size()) {
     195           0 :       return IOError(filename_, errno);
     196             :     }
     197             :     return Status::OK();
     198             :   }
     199             : 
     200         468 :   virtual Status Close() {
     201             :     Status result;
     202         468 :     if (fclose(file_) != 0) {
     203           0 :       result = IOError(filename_, errno);
     204             :     }
     205         468 :     file_ = NULL;
     206         468 :     return result;
     207             :   }
     208             : 
     209        1465 :   virtual Status Flush() {
     210        1465 :     if (fflush_unlocked(file_) != 0) {
     211           0 :       return IOError(filename_, errno);
     212             :     }
     213             :     return Status::OK();
     214             :   }
     215             : 
     216         720 :   Status SyncDirIfManifest() {
     217        1440 :     const char* f = filename_.c_str();
     218         720 :     const char* sep = strrchr(f, '/');
     219             :     Slice basename;
     220             :     std::string dir;
     221         720 :     if (sep == NULL) {
     222             :       dir = ".";
     223           0 :       basename = f;
     224             :     } else {
     225        2160 :       dir = std::string(f, sep - f);
     226        1440 :       basename = sep + 1;
     227             :     }
     228             :     Status s;
     229         720 :     if (basename.starts_with("MANIFEST")) {
     230         398 :       int fd = open(dir.c_str(), O_RDONLY);
     231         199 :       if (fd < 0) {
     232           0 :         s = IOError(dir, errno);
     233             :       } else {
     234         199 :         if (fsync(fd) < 0) {
     235           0 :           s = IOError(dir, errno);
     236             :         }
     237         199 :         close(fd);
     238             :       }
     239             :     }
     240         720 :     return s;
     241             :   }
     242             : 
     243         720 :   virtual Status Sync() {
     244             :     // Ensure new files referred to by the manifest are in the filesystem.
     245         720 :     Status s = SyncDirIfManifest();
     246         720 :     if (!s.ok()) {
     247             :       return s;
     248             :     }
     249        1440 :     if (fflush_unlocked(file_) != 0 ||
     250         720 :         fdatasync(fileno(file_)) != 0) {
     251           0 :       s = Status::IOError(filename_, strerror(errno));
     252             :     }
     253             :     return s;
     254             :   }
     255             : };
     256             : 
     257         390 : static int LockOrUnlock(int fd, bool lock) {
     258         390 :   errno = 0;
     259             :   struct flock f;
     260             :   memset(&f, 0, sizeof(f));
     261         390 :   f.l_type = (lock ? F_WRLCK : F_UNLCK);
     262             :   f.l_whence = SEEK_SET;
     263             :   f.l_start = 0;
     264             :   f.l_len = 0;        // Lock/unlock entire file
     265         390 :   return fcntl(fd, F_SETLK, &f);
     266             : }
     267             : 
     268         975 : class PosixFileLock : public FileLock {
     269             :  public:
     270             :   int fd_;
     271             :   std::string name_;
     272             : };
     273             : 
     274             : // Set of locked files.  We keep a separate set instead of just
     275             : // relying on fcntrl(F_SETLK) since fcntl(F_SETLK) does not provide
     276             : // any protection against multiple uses from the same process.
     277          95 : class PosixLockTable {
     278             :  private:
     279             :   port::Mutex mu_;
     280             :   std::set<std::string> locked_files_;
     281             :  public:
     282         195 :   bool Insert(const std::string& fname) {
     283         195 :     MutexLock l(&mu_);
     284         585 :     return locked_files_.insert(fname).second;
     285             :   }
     286         195 :   void Remove(const std::string& fname) {
     287         195 :     MutexLock l(&mu_);
     288         195 :     locked_files_.erase(fname);
     289         195 :   }
     290             : };
     291             : 
     292             : class PosixEnv : public Env {
     293             :  public:
     294             :   PosixEnv();
     295           0 :   virtual ~PosixEnv() {
     296           0 :     char msg[] = "Destroying Env::Default()\n";
     297           0 :     fwrite(msg, 1, sizeof(msg), stderr);
     298           0 :     abort();
     299           0 :   }
     300             : 
     301         499 :   virtual Status NewSequentialFile(const std::string& fname,
     302             :                                    SequentialFile** result) {
     303         499 :     FILE* f = fopen(fname.c_str(), "r");
     304         499 :     if (f == NULL) {
     305           0 :       *result = NULL;
     306           0 :       return IOError(fname, errno);
     307             :     } else {
     308         499 :       *result = new PosixSequentialFile(fname, f);
     309             :       return Status::OK();
     310             :     }
     311             :   }
     312             : 
     313         160 :   virtual Status NewRandomAccessFile(const std::string& fname,
     314             :                                      RandomAccessFile** result) {
     315         160 :     *result = NULL;
     316             :     Status s;
     317         320 :     int fd = open(fname.c_str(), O_RDONLY);
     318         160 :     if (fd < 0) {
     319           0 :       s = IOError(fname, errno);
     320         160 :     } else if (mmap_limit_.Acquire()) {
     321             :       uint64_t size;
     322         320 :       s = GetFileSize(fname, &size);
     323         160 :       if (s.ok()) {
     324         160 :         void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
     325         160 :         if (base != MAP_FAILED) {
     326         160 :           *result = new PosixMmapReadableFile(fname, base, size, &mmap_limit_);
     327             :         } else {
     328           0 :           s = IOError(fname, errno);
     329             :         }
     330             :       }
     331         160 :       close(fd);
     332         160 :       if (!s.ok()) {
     333           0 :         mmap_limit_.Release();
     334             :       }
     335             :     } else {
     336           0 :       *result = new PosixRandomAccessFile(fname, fd);
     337             :     }
     338         160 :     return s;
     339             :   }
     340             : 
     341         852 :   virtual Status NewWritableFile(const std::string& fname,
     342             :                                  WritableFile** result) {
     343             :     Status s;
     344         852 :     FILE* f = fopen(fname.c_str(), "w");
     345         852 :     if (f == NULL) {
     346           0 :       *result = NULL;
     347           0 :       s = IOError(fname, errno);
     348             :     } else {
     349         852 :       *result = new PosixWritableFile(fname, f);
     350             :     }
     351         852 :     return s;
     352             :   }
     353             : 
     354         192 :   virtual bool FileExists(const std::string& fname) {
     355         192 :     return access(fname.c_str(), F_OK) == 0;
     356             :   }
     357             : 
     358         394 :   virtual Status GetChildren(const std::string& dir,
     359             :                              std::vector<std::string>* result) {
     360             :     result->clear();
     361         394 :     DIR* d = opendir(dir.c_str());
     362         394 :     if (d == NULL) {
     363           0 :       return IOError(dir, errno);
     364             :     }
     365             :     struct dirent* entry;
     366        3835 :     while ((entry = readdir(d)) != NULL) {
     367       10323 :       result->push_back(entry->d_name);
     368             :     }
     369         394 :     closedir(d);
     370             :     return Status::OK();
     371             :   }
     372             : 
     373         337 :   virtual Status DeleteFile(const std::string& fname) {
     374             :     Status result;
     375         337 :     if (unlink(fname.c_str()) != 0) {
     376           0 :       result = IOError(fname, errno);
     377             :     }
     378         337 :     return result;
     379             :   }
     380             : 
     381         384 :   virtual Status CreateDir(const std::string& name) {
     382             :     Status result;
     383         384 :     if (mkdir(name.c_str(), 0755) != 0) {
     384         768 :       result = IOError(name, errno);
     385             :     }
     386         384 :     return result;
     387             :   }
     388             : 
     389           3 :   virtual Status DeleteDir(const std::string& name) {
     390             :     Status result;
     391           3 :     if (rmdir(name.c_str()) != 0) {
     392           0 :       result = IOError(name, errno);
     393             :     }
     394           3 :     return result;
     395             :   }
     396             : 
     397         160 :   virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
     398             :     Status s;
     399             :     struct stat sbuf;
     400         320 :     if (stat(fname.c_str(), &sbuf) != 0) {
     401           0 :       *size = 0;
     402           0 :       s = IOError(fname, errno);
     403             :     } else {
     404         160 :       *size = sbuf.st_size;
     405             :     }
     406         160 :     return s;
     407             :   }
     408             : 
     409         461 :   virtual Status RenameFile(const std::string& src, const std::string& target) {
     410             :     Status result;
     411         922 :     if (rename(src.c_str(), target.c_str()) != 0) {
     412         154 :       result = IOError(src, errno);
     413             :     }
     414         461 :     return result;
     415             :   }
     416             : 
     417         195 :   virtual Status LockFile(const std::string& fname, FileLock** lock) {
     418         195 :     *lock = NULL;
     419             :     Status result;
     420         390 :     int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
     421         195 :     if (fd < 0) {
     422           0 :       result = IOError(fname, errno);
     423         195 :     } else if (!locks_.Insert(fname)) {
     424           0 :       close(fd);
     425           0 :       result = Status::IOError("lock " + fname, "already held by process");
     426         195 :     } else if (LockOrUnlock(fd, true) == -1) {
     427           0 :       result = IOError("lock " + fname, errno);
     428           0 :       close(fd);
     429           0 :       locks_.Remove(fname);
     430             :     } else {
     431         390 :       PosixFileLock* my_lock = new PosixFileLock;
     432         195 :       my_lock->fd_ = fd;
     433         195 :       my_lock->name_ = fname;
     434         195 :       *lock = my_lock;
     435             :     }
     436         195 :     return result;
     437             :   }
     438             : 
     439         195 :   virtual Status UnlockFile(FileLock* lock) {
     440         195 :     PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
     441             :     Status result;
     442         195 :     if (LockOrUnlock(my_lock->fd_, false) == -1) {
     443           0 :       result = IOError("unlock", errno);
     444             :     }
     445         195 :     locks_.Remove(my_lock->name_);
     446         195 :     close(my_lock->fd_);
     447         195 :     delete my_lock;
     448         195 :     return result;
     449             :   }
     450             : 
     451             :   virtual void Schedule(void (*function)(void*), void* arg);
     452             : 
     453             :   virtual void StartThread(void (*function)(void* arg), void* arg);
     454             : 
     455           0 :   virtual Status GetTestDirectory(std::string* result) {
     456           0 :     const char* env = getenv("TEST_TMPDIR");
     457           0 :     if (env && env[0] != '\0') {
     458             :       *result = env;
     459             :     } else {
     460             :       char buf[100];
     461           0 :       snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
     462             :       *result = buf;
     463             :     }
     464             :     // Directory may already exist
     465           0 :     CreateDir(*result);
     466           0 :     return Status::OK();
     467             :   }
     468             : 
     469         695 :   static uint64_t gettid() {
     470         695 :     pthread_t tid = pthread_self();
     471         695 :     uint64_t thread_id = 0;
     472        1390 :     memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
     473         695 :     return thread_id;
     474             :   }
     475             : 
     476         192 :   virtual Status NewLogger(const std::string& fname, Logger** result) {
     477         192 :     FILE* f = fopen(fname.c_str(), "w");
     478         192 :     if (f == NULL) {
     479           0 :       *result = NULL;
     480           0 :       return IOError(fname, errno);
     481             :     } else {
     482         384 :       *result = new PosixLogger(f, &PosixEnv::gettid);
     483             :       return Status::OK();
     484             :     }
     485             :   }
     486             : 
     487         244 :   virtual uint64_t NowMicros() {
     488             :     struct timeval tv;
     489         244 :     gettimeofday(&tv, NULL);
     490         244 :     return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
     491             :   }
     492             : 
     493           0 :   virtual void SleepForMicroseconds(int micros) {
     494           0 :     usleep(micros);
     495           0 :   }
     496             : 
     497             :  private:
     498         239 :   void PthreadCall(const char* label, int result) {
     499         239 :     if (result != 0) {
     500           0 :       fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
     501           0 :       abort();
     502             :     }
     503         239 :   }
     504             : 
     505             :   // BGThread() is the body of the background thread
     506             :   void BGThread();
     507           7 :   static void* BGThreadWrapper(void* arg) {
     508           7 :     reinterpret_cast<PosixEnv*>(arg)->BGThread();
     509             :     return NULL;
     510             :   }
     511             : 
     512             :   pthread_mutex_t mu_;
     513             :   pthread_cond_t bgsignal_;
     514             :   pthread_t bgthread_;
     515             :   bool started_bgthread_;
     516             : 
     517             :   // Entry per Schedule() call
     518             :   struct BGItem { void* arg; void (*function)(void*); };
     519             :   typedef std::deque<BGItem> BGQueue;
     520             :   BGQueue queue_;
     521             : 
     522             :   PosixLockTable locks_;
     523             :   MmapLimiter mmap_limit_;
     524             : };
     525             : 
     526         380 : PosixEnv::PosixEnv() : started_bgthread_(false) {
     527          95 :   PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
     528          95 :   PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
     529          95 : }
     530             : 
     531           7 : void PosixEnv::Schedule(void (*function)(void*), void* arg) {
     532           7 :   PthreadCall("lock", pthread_mutex_lock(&mu_));
     533             : 
     534             :   // Start background thread if necessary
     535           7 :   if (!started_bgthread_) {
     536           7 :     started_bgthread_ = true;
     537             :     PthreadCall(
     538             :         "create thread",
     539           7 :         pthread_create(&bgthread_, NULL,  &PosixEnv::BGThreadWrapper, this));
     540             :   }
     541             : 
     542             :   // If the queue is currently empty, the background thread may currently be
     543             :   // waiting.
     544          14 :   if (queue_.empty()) {
     545           7 :     PthreadCall("signal", pthread_cond_signal(&bgsignal_));
     546             :   }
     547             : 
     548             :   // Add to priority queue
     549          14 :   queue_.push_back(BGItem());
     550          14 :   queue_.back().function = function;
     551          14 :   queue_.back().arg = arg;
     552             : 
     553           7 :   PthreadCall("unlock", pthread_mutex_unlock(&mu_));
     554           7 : }
     555             : 
     556           7 : void PosixEnv::BGThread() {
     557             :   while (true) {
     558             :     // Wait until there is an item that is ready to run
     559          14 :     PthreadCall("lock", pthread_mutex_lock(&mu_));
     560          42 :     while (queue_.empty()) {
     561           7 :       PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
     562             :     }
     563             : 
     564          14 :     void (*function)(void*) = queue_.front().function;
     565          14 :     void* arg = queue_.front().arg;
     566           7 :     queue_.pop_front();
     567             : 
     568           7 :     PthreadCall("unlock", pthread_mutex_unlock(&mu_));
     569           7 :     (*function)(arg);
     570           7 :   }
     571             : }
     572             : 
     573             : namespace {
     574             : struct StartThreadState {
     575             :   void (*user_function)(void*);
     576             :   void* arg;
     577             : };
     578             : }
     579           0 : static void* StartThreadWrapper(void* arg) {
     580           0 :   StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
     581           0 :   state->user_function(state->arg);
     582           0 :   delete state;
     583           0 :   return NULL;
     584             : }
     585             : 
     586           0 : void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
     587             :   pthread_t t;
     588           0 :   StartThreadState* state = new StartThreadState;
     589           0 :   state->user_function = function;
     590           0 :   state->arg = arg;
     591             :   PthreadCall("start thread",
     592           0 :               pthread_create(&t, NULL,  &StartThreadWrapper, state));
     593           0 : }
     594             : 
     595             : }  // namespace
     596             : 
     597             : static pthread_once_t once = PTHREAD_ONCE_INIT;
     598             : static Env* default_env;
     599          95 : static void InitDefaultEnv() { default_env = new PosixEnv; }
     600             : 
     601         700 : Env* Env::Default() {
     602         700 :   pthread_once(&once, InitDefaultEnv);
     603         700 :   return default_env;
     604             : }
     605             : 
     606             : }  // namespace leveldb
     607             : 
     608             : #endif

Generated by: LCOV version 1.11