Line data Source code
1 : // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 : // Use of this source code is governed by a BSD-style license that can be
3 : // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 : #if !defined(LEVELDB_PLATFORM_WINDOWS)
5 :
6 : #include <dirent.h>
7 : #include <errno.h>
8 : #include <fcntl.h>
9 : #include <pthread.h>
10 : #include <stdio.h>
11 : #include <stdlib.h>
12 : #include <string.h>
13 : #include <sys/mman.h>
14 : #include <sys/stat.h>
15 : #include <sys/time.h>
16 : #include <sys/types.h>
17 : #include <time.h>
18 : #include <unistd.h>
19 : #include <deque>
20 : #include <set>
21 : #include "leveldb/env.h"
22 : #include "leveldb/slice.h"
23 : #include "port/port.h"
24 : #include "util/logging.h"
25 : #include "util/mutexlock.h"
26 : #include "util/posix_logger.h"
27 :
28 : namespace leveldb {
29 :
30 : namespace {
31 :
32 461 : static Status IOError(const std::string& context, int err_number) {
33 922 : return Status::IOError(context, strerror(err_number));
34 : }
35 :
36 : class PosixSequentialFile: public SequentialFile {
37 : private:
38 : std::string filename_;
39 : FILE* file_;
40 :
41 : public:
42 499 : PosixSequentialFile(const std::string& fname, FILE* f)
43 998 : : filename_(fname), file_(f) { }
44 1497 : virtual ~PosixSequentialFile() { fclose(file_); }
45 :
46 691 : virtual Status Read(size_t n, Slice* result, char* scratch) {
47 : Status s;
48 1382 : size_t r = fread_unlocked(scratch, 1, n, file_);
49 691 : *result = Slice(scratch, r);
50 691 : if (r < n) {
51 691 : if (feof(file_)) {
52 : // We leave status as ok if we hit the end of the file
53 : } else {
54 : // A partial read with an error: return a non-ok status
55 0 : s = IOError(filename_, errno);
56 : }
57 : }
58 691 : return s;
59 : }
60 :
61 0 : virtual Status Skip(uint64_t n) {
62 0 : if (fseek(file_, n, SEEK_CUR)) {
63 0 : return IOError(filename_, errno);
64 : }
65 : return Status::OK();
66 : }
67 : };
68 :
69 : // pread() based random-access
70 : class PosixRandomAccessFile: public RandomAccessFile {
71 : private:
72 : std::string filename_;
73 : int fd_;
74 :
75 : public:
76 0 : PosixRandomAccessFile(const std::string& fname, int fd)
77 0 : : filename_(fname), fd_(fd) { }
78 0 : virtual ~PosixRandomAccessFile() { close(fd_); }
79 :
80 0 : virtual Status Read(uint64_t offset, size_t n, Slice* result,
81 : char* scratch) const {
82 : Status s;
83 0 : ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
84 0 : *result = Slice(scratch, (r < 0) ? 0 : r);
85 0 : if (r < 0) {
86 : // An error: return a non-ok status
87 0 : s = IOError(filename_, errno);
88 : }
89 0 : return s;
90 : }
91 : };
92 :
93 : // Helper class to limit mmap file usage so that we do not end up
94 : // running out virtual memory or running into kernel performance
95 : // problems for very large databases.
96 0 : class MmapLimiter {
97 : public:
98 : // Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes.
99 95 : MmapLimiter() {
100 95 : SetAllowed(sizeof(void*) >= 8 ? 1000 : 0);
101 : }
102 :
103 : // If another mmap slot is available, acquire it and return true.
104 : // Else return false.
105 160 : bool Acquire() {
106 160 : if (GetAllowed() <= 0) {
107 : return false;
108 : }
109 160 : MutexLock l(&mu_);
110 160 : intptr_t x = GetAllowed();
111 160 : if (x <= 0) {
112 : return false;
113 : } else {
114 160 : SetAllowed(x - 1);
115 160 : return true;
116 : }
117 : }
118 :
119 : // Release a slot acquired by a previous call to Acquire() that returned true.
120 160 : void Release() {
121 160 : MutexLock l(&mu_);
122 160 : SetAllowed(GetAllowed() + 1);
123 160 : }
124 :
125 : private:
126 : port::Mutex mu_;
127 : port::AtomicPointer allowed_;
128 :
129 : intptr_t GetAllowed() const {
130 960 : return reinterpret_cast<intptr_t>(allowed_.Acquire_Load());
131 : }
132 :
133 : // REQUIRES: mu_ must be held
134 : void SetAllowed(intptr_t v) {
135 415 : allowed_.Release_Store(reinterpret_cast<void*>(v));
136 : }
137 :
138 : MmapLimiter(const MmapLimiter&);
139 : void operator=(const MmapLimiter&);
140 : };
141 :
142 : // mmap() based random-access
143 : class PosixMmapReadableFile: public RandomAccessFile {
144 : private:
145 : std::string filename_;
146 : void* mmapped_region_;
147 : size_t length_;
148 : MmapLimiter* limiter_;
149 :
150 : public:
151 : // base[0,length-1] contains the mmapped contents of the file.
152 160 : PosixMmapReadableFile(const std::string& fname, void* base, size_t length,
153 : MmapLimiter* limiter)
154 : : filename_(fname), mmapped_region_(base), length_(length),
155 320 : limiter_(limiter) {
156 160 : }
157 :
158 640 : virtual ~PosixMmapReadableFile() {
159 160 : munmap(mmapped_region_, length_);
160 160 : limiter_->Release();
161 320 : }
162 :
163 13056 : virtual Status Read(uint64_t offset, size_t n, Slice* result,
164 : char* scratch) const {
165 : Status s;
166 13056 : if (offset + n > length_) {
167 0 : *result = Slice();
168 0 : s = IOError(filename_, EINVAL);
169 : } else {
170 13056 : *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
171 : }
172 13056 : return s;
173 : }
174 : };
175 :
176 : class PosixWritableFile : public WritableFile {
177 : private:
178 : std::string filename_;
179 : FILE* file_;
180 :
181 : public:
182 852 : PosixWritableFile(const std::string& fname, FILE* f)
183 1704 : : filename_(fname), file_(f) { }
184 :
185 3408 : ~PosixWritableFile() {
186 852 : if (file_ != NULL) {
187 : // Ignoring any potential errors
188 384 : fclose(file_);
189 : }
190 1704 : }
191 :
192 4053 : virtual Status Append(const Slice& data) {
193 4053 : size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
194 4053 : if (r != data.size()) {
195 0 : return IOError(filename_, errno);
196 : }
197 : return Status::OK();
198 : }
199 :
200 468 : virtual Status Close() {
201 : Status result;
202 468 : if (fclose(file_) != 0) {
203 0 : result = IOError(filename_, errno);
204 : }
205 468 : file_ = NULL;
206 468 : return result;
207 : }
208 :
209 1465 : virtual Status Flush() {
210 1465 : if (fflush_unlocked(file_) != 0) {
211 0 : return IOError(filename_, errno);
212 : }
213 : return Status::OK();
214 : }
215 :
216 720 : Status SyncDirIfManifest() {
217 1440 : const char* f = filename_.c_str();
218 720 : const char* sep = strrchr(f, '/');
219 : Slice basename;
220 : std::string dir;
221 720 : if (sep == NULL) {
222 : dir = ".";
223 0 : basename = f;
224 : } else {
225 2160 : dir = std::string(f, sep - f);
226 1440 : basename = sep + 1;
227 : }
228 : Status s;
229 720 : if (basename.starts_with("MANIFEST")) {
230 398 : int fd = open(dir.c_str(), O_RDONLY);
231 199 : if (fd < 0) {
232 0 : s = IOError(dir, errno);
233 : } else {
234 199 : if (fsync(fd) < 0) {
235 0 : s = IOError(dir, errno);
236 : }
237 199 : close(fd);
238 : }
239 : }
240 720 : return s;
241 : }
242 :
243 720 : virtual Status Sync() {
244 : // Ensure new files referred to by the manifest are in the filesystem.
245 720 : Status s = SyncDirIfManifest();
246 720 : if (!s.ok()) {
247 : return s;
248 : }
249 1440 : if (fflush_unlocked(file_) != 0 ||
250 720 : fdatasync(fileno(file_)) != 0) {
251 0 : s = Status::IOError(filename_, strerror(errno));
252 : }
253 : return s;
254 : }
255 : };
256 :
257 390 : static int LockOrUnlock(int fd, bool lock) {
258 390 : errno = 0;
259 : struct flock f;
260 : memset(&f, 0, sizeof(f));
261 390 : f.l_type = (lock ? F_WRLCK : F_UNLCK);
262 : f.l_whence = SEEK_SET;
263 : f.l_start = 0;
264 : f.l_len = 0; // Lock/unlock entire file
265 390 : return fcntl(fd, F_SETLK, &f);
266 : }
267 :
268 975 : class PosixFileLock : public FileLock {
269 : public:
270 : int fd_;
271 : std::string name_;
272 : };
273 :
274 : // Set of locked files. We keep a separate set instead of just
275 : // relying on fcntrl(F_SETLK) since fcntl(F_SETLK) does not provide
276 : // any protection against multiple uses from the same process.
277 95 : class PosixLockTable {
278 : private:
279 : port::Mutex mu_;
280 : std::set<std::string> locked_files_;
281 : public:
282 195 : bool Insert(const std::string& fname) {
283 195 : MutexLock l(&mu_);
284 585 : return locked_files_.insert(fname).second;
285 : }
286 195 : void Remove(const std::string& fname) {
287 195 : MutexLock l(&mu_);
288 195 : locked_files_.erase(fname);
289 195 : }
290 : };
291 :
292 : class PosixEnv : public Env {
293 : public:
294 : PosixEnv();
295 0 : virtual ~PosixEnv() {
296 0 : char msg[] = "Destroying Env::Default()\n";
297 0 : fwrite(msg, 1, sizeof(msg), stderr);
298 0 : abort();
299 0 : }
300 :
301 499 : virtual Status NewSequentialFile(const std::string& fname,
302 : SequentialFile** result) {
303 499 : FILE* f = fopen(fname.c_str(), "r");
304 499 : if (f == NULL) {
305 0 : *result = NULL;
306 0 : return IOError(fname, errno);
307 : } else {
308 499 : *result = new PosixSequentialFile(fname, f);
309 : return Status::OK();
310 : }
311 : }
312 :
313 160 : virtual Status NewRandomAccessFile(const std::string& fname,
314 : RandomAccessFile** result) {
315 160 : *result = NULL;
316 : Status s;
317 320 : int fd = open(fname.c_str(), O_RDONLY);
318 160 : if (fd < 0) {
319 0 : s = IOError(fname, errno);
320 160 : } else if (mmap_limit_.Acquire()) {
321 : uint64_t size;
322 320 : s = GetFileSize(fname, &size);
323 160 : if (s.ok()) {
324 160 : void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
325 160 : if (base != MAP_FAILED) {
326 160 : *result = new PosixMmapReadableFile(fname, base, size, &mmap_limit_);
327 : } else {
328 0 : s = IOError(fname, errno);
329 : }
330 : }
331 160 : close(fd);
332 160 : if (!s.ok()) {
333 0 : mmap_limit_.Release();
334 : }
335 : } else {
336 0 : *result = new PosixRandomAccessFile(fname, fd);
337 : }
338 160 : return s;
339 : }
340 :
341 852 : virtual Status NewWritableFile(const std::string& fname,
342 : WritableFile** result) {
343 : Status s;
344 852 : FILE* f = fopen(fname.c_str(), "w");
345 852 : if (f == NULL) {
346 0 : *result = NULL;
347 0 : s = IOError(fname, errno);
348 : } else {
349 852 : *result = new PosixWritableFile(fname, f);
350 : }
351 852 : return s;
352 : }
353 :
354 192 : virtual bool FileExists(const std::string& fname) {
355 192 : return access(fname.c_str(), F_OK) == 0;
356 : }
357 :
358 394 : virtual Status GetChildren(const std::string& dir,
359 : std::vector<std::string>* result) {
360 : result->clear();
361 394 : DIR* d = opendir(dir.c_str());
362 394 : if (d == NULL) {
363 0 : return IOError(dir, errno);
364 : }
365 : struct dirent* entry;
366 3835 : while ((entry = readdir(d)) != NULL) {
367 10323 : result->push_back(entry->d_name);
368 : }
369 394 : closedir(d);
370 : return Status::OK();
371 : }
372 :
373 337 : virtual Status DeleteFile(const std::string& fname) {
374 : Status result;
375 337 : if (unlink(fname.c_str()) != 0) {
376 0 : result = IOError(fname, errno);
377 : }
378 337 : return result;
379 : }
380 :
381 384 : virtual Status CreateDir(const std::string& name) {
382 : Status result;
383 384 : if (mkdir(name.c_str(), 0755) != 0) {
384 768 : result = IOError(name, errno);
385 : }
386 384 : return result;
387 : }
388 :
389 3 : virtual Status DeleteDir(const std::string& name) {
390 : Status result;
391 3 : if (rmdir(name.c_str()) != 0) {
392 0 : result = IOError(name, errno);
393 : }
394 3 : return result;
395 : }
396 :
397 160 : virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
398 : Status s;
399 : struct stat sbuf;
400 320 : if (stat(fname.c_str(), &sbuf) != 0) {
401 0 : *size = 0;
402 0 : s = IOError(fname, errno);
403 : } else {
404 160 : *size = sbuf.st_size;
405 : }
406 160 : return s;
407 : }
408 :
409 461 : virtual Status RenameFile(const std::string& src, const std::string& target) {
410 : Status result;
411 922 : if (rename(src.c_str(), target.c_str()) != 0) {
412 154 : result = IOError(src, errno);
413 : }
414 461 : return result;
415 : }
416 :
417 195 : virtual Status LockFile(const std::string& fname, FileLock** lock) {
418 195 : *lock = NULL;
419 : Status result;
420 390 : int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
421 195 : if (fd < 0) {
422 0 : result = IOError(fname, errno);
423 195 : } else if (!locks_.Insert(fname)) {
424 0 : close(fd);
425 0 : result = Status::IOError("lock " + fname, "already held by process");
426 195 : } else if (LockOrUnlock(fd, true) == -1) {
427 0 : result = IOError("lock " + fname, errno);
428 0 : close(fd);
429 0 : locks_.Remove(fname);
430 : } else {
431 390 : PosixFileLock* my_lock = new PosixFileLock;
432 195 : my_lock->fd_ = fd;
433 195 : my_lock->name_ = fname;
434 195 : *lock = my_lock;
435 : }
436 195 : return result;
437 : }
438 :
439 195 : virtual Status UnlockFile(FileLock* lock) {
440 195 : PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
441 : Status result;
442 195 : if (LockOrUnlock(my_lock->fd_, false) == -1) {
443 0 : result = IOError("unlock", errno);
444 : }
445 195 : locks_.Remove(my_lock->name_);
446 195 : close(my_lock->fd_);
447 195 : delete my_lock;
448 195 : return result;
449 : }
450 :
451 : virtual void Schedule(void (*function)(void*), void* arg);
452 :
453 : virtual void StartThread(void (*function)(void* arg), void* arg);
454 :
455 0 : virtual Status GetTestDirectory(std::string* result) {
456 0 : const char* env = getenv("TEST_TMPDIR");
457 0 : if (env && env[0] != '\0') {
458 : *result = env;
459 : } else {
460 : char buf[100];
461 0 : snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
462 : *result = buf;
463 : }
464 : // Directory may already exist
465 0 : CreateDir(*result);
466 0 : return Status::OK();
467 : }
468 :
469 695 : static uint64_t gettid() {
470 695 : pthread_t tid = pthread_self();
471 695 : uint64_t thread_id = 0;
472 1390 : memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
473 695 : return thread_id;
474 : }
475 :
476 192 : virtual Status NewLogger(const std::string& fname, Logger** result) {
477 192 : FILE* f = fopen(fname.c_str(), "w");
478 192 : if (f == NULL) {
479 0 : *result = NULL;
480 0 : return IOError(fname, errno);
481 : } else {
482 384 : *result = new PosixLogger(f, &PosixEnv::gettid);
483 : return Status::OK();
484 : }
485 : }
486 :
487 244 : virtual uint64_t NowMicros() {
488 : struct timeval tv;
489 244 : gettimeofday(&tv, NULL);
490 244 : return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
491 : }
492 :
493 0 : virtual void SleepForMicroseconds(int micros) {
494 0 : usleep(micros);
495 0 : }
496 :
497 : private:
498 239 : void PthreadCall(const char* label, int result) {
499 239 : if (result != 0) {
500 0 : fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
501 0 : abort();
502 : }
503 239 : }
504 :
505 : // BGThread() is the body of the background thread
506 : void BGThread();
507 7 : static void* BGThreadWrapper(void* arg) {
508 7 : reinterpret_cast<PosixEnv*>(arg)->BGThread();
509 : return NULL;
510 : }
511 :
512 : pthread_mutex_t mu_;
513 : pthread_cond_t bgsignal_;
514 : pthread_t bgthread_;
515 : bool started_bgthread_;
516 :
517 : // Entry per Schedule() call
518 : struct BGItem { void* arg; void (*function)(void*); };
519 : typedef std::deque<BGItem> BGQueue;
520 : BGQueue queue_;
521 :
522 : PosixLockTable locks_;
523 : MmapLimiter mmap_limit_;
524 : };
525 :
526 380 : PosixEnv::PosixEnv() : started_bgthread_(false) {
527 95 : PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
528 95 : PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
529 95 : }
530 :
531 7 : void PosixEnv::Schedule(void (*function)(void*), void* arg) {
532 7 : PthreadCall("lock", pthread_mutex_lock(&mu_));
533 :
534 : // Start background thread if necessary
535 7 : if (!started_bgthread_) {
536 7 : started_bgthread_ = true;
537 : PthreadCall(
538 : "create thread",
539 7 : pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
540 : }
541 :
542 : // If the queue is currently empty, the background thread may currently be
543 : // waiting.
544 14 : if (queue_.empty()) {
545 7 : PthreadCall("signal", pthread_cond_signal(&bgsignal_));
546 : }
547 :
548 : // Add to priority queue
549 14 : queue_.push_back(BGItem());
550 14 : queue_.back().function = function;
551 14 : queue_.back().arg = arg;
552 :
553 7 : PthreadCall("unlock", pthread_mutex_unlock(&mu_));
554 7 : }
555 :
556 7 : void PosixEnv::BGThread() {
557 : while (true) {
558 : // Wait until there is an item that is ready to run
559 14 : PthreadCall("lock", pthread_mutex_lock(&mu_));
560 42 : while (queue_.empty()) {
561 7 : PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
562 : }
563 :
564 14 : void (*function)(void*) = queue_.front().function;
565 14 : void* arg = queue_.front().arg;
566 7 : queue_.pop_front();
567 :
568 7 : PthreadCall("unlock", pthread_mutex_unlock(&mu_));
569 7 : (*function)(arg);
570 7 : }
571 : }
572 :
573 : namespace {
574 : struct StartThreadState {
575 : void (*user_function)(void*);
576 : void* arg;
577 : };
578 : }
579 0 : static void* StartThreadWrapper(void* arg) {
580 0 : StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
581 0 : state->user_function(state->arg);
582 0 : delete state;
583 0 : return NULL;
584 : }
585 :
586 0 : void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
587 : pthread_t t;
588 0 : StartThreadState* state = new StartThreadState;
589 0 : state->user_function = function;
590 0 : state->arg = arg;
591 : PthreadCall("start thread",
592 0 : pthread_create(&t, NULL, &StartThreadWrapper, state));
593 0 : }
594 :
595 : } // namespace
596 :
597 : static pthread_once_t once = PTHREAD_ONCE_INIT;
598 : static Env* default_env;
599 95 : static void InitDefaultEnv() { default_env = new PosixEnv; }
600 :
601 700 : Env* Env::Default() {
602 700 : pthread_once(&once, InitDefaultEnv);
603 700 : return default_env;
604 : }
605 :
606 : } // namespace leveldb
607 :
608 : #endif
|