Line data Source code
1 : // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 : // Use of this source code is governed by a BSD-style license that can be
3 : // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 : //
5 : // WriteBatch::rep_ :=
6 : // sequence: fixed64
7 : // count: fixed32
8 : // data: record[count]
9 : // record :=
10 : // kTypeValue varstring varstring |
11 : // kTypeDeletion varstring
12 : // varstring :=
13 : // len: varint32
14 : // data: uint8[len]
15 :
16 : #include "leveldb/write_batch.h"
17 :
18 : #include "leveldb/db.h"
19 : #include "db/dbformat.h"
20 : #include "db/memtable.h"
21 : #include "db/write_batch_internal.h"
22 : #include "util/coding.h"
23 :
24 : namespace leveldb {
25 :
26 : // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
27 : static const size_t kHeader = 12;
28 :
29 910 : WriteBatch::WriteBatch() {
30 910 : Clear();
31 910 : }
32 :
33 1820 : WriteBatch::~WriteBatch() { }
34 :
35 834 : WriteBatch::Handler::~Handler() { }
36 :
37 910 : void WriteBatch::Clear() {
38 910 : rep_.clear();
39 910 : rep_.resize(kHeader);
40 910 : }
41 :
42 834 : Status WriteBatch::Iterate(Handler* handler) const {
43 834 : Slice input(rep_);
44 834 : if (input.size() < kHeader) {
45 0 : return Status::Corruption("malformed WriteBatch (too small)");
46 : }
47 :
48 834 : input.remove_prefix(kHeader);
49 : Slice key, value;
50 834 : int found = 0;
51 30217 : while (!input.empty()) {
52 28549 : found++;
53 28549 : char tag = input[0];
54 28549 : input.remove_prefix(1);
55 28549 : switch (tag) {
56 : case kTypeValue:
57 56472 : if (GetLengthPrefixedSlice(&input, &key) &&
58 28236 : GetLengthPrefixedSlice(&input, &value)) {
59 28236 : handler->Put(key, value);
60 : } else {
61 0 : return Status::Corruption("bad WriteBatch Put");
62 : }
63 : break;
64 : case kTypeDeletion:
65 313 : if (GetLengthPrefixedSlice(&input, &key)) {
66 313 : handler->Delete(key);
67 : } else {
68 0 : return Status::Corruption("bad WriteBatch Delete");
69 : }
70 : break;
71 : default:
72 0 : return Status::Corruption("unknown WriteBatch tag");
73 : }
74 : }
75 834 : if (found != WriteBatchInternal::Count(this)) {
76 0 : return Status::Corruption("WriteBatch has wrong count");
77 : } else {
78 : return Status::OK();
79 : }
80 : }
81 :
82 834 : int WriteBatchInternal::Count(const WriteBatch* b) {
83 40497 : return DecodeFixed32(b->rep_.data() + 8);
84 : }
85 :
86 11831 : void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
87 23662 : EncodeFixed32(&b->rep_[8], n);
88 11831 : }
89 :
90 283 : SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
91 2517 : return SequenceNumber(DecodeFixed64(b->rep_.data()));
92 : }
93 :
94 551 : void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
95 1102 : EncodeFixed64(&b->rep_[0], seq);
96 551 : }
97 :
98 11518 : void WriteBatch::Put(const Slice& key, const Slice& value) {
99 11518 : WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
100 11518 : rep_.push_back(static_cast<char>(kTypeValue));
101 11518 : PutLengthPrefixedSlice(&rep_, key);
102 11518 : PutLengthPrefixedSlice(&rep_, value);
103 11518 : }
104 :
105 313 : void WriteBatch::Delete(const Slice& key) {
106 313 : WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
107 313 : rep_.push_back(static_cast<char>(kTypeDeletion));
108 313 : PutLengthPrefixedSlice(&rep_, key);
109 313 : }
110 :
111 : namespace {
112 3336 : class MemTableInserter : public WriteBatch::Handler {
113 : public:
114 : SequenceNumber sequence_;
115 : MemTable* mem_;
116 :
117 28236 : virtual void Put(const Slice& key, const Slice& value) {
118 28236 : mem_->Add(sequence_, kTypeValue, key, value);
119 28236 : sequence_++;
120 28236 : }
121 313 : virtual void Delete(const Slice& key) {
122 313 : mem_->Add(sequence_, kTypeDeletion, key, Slice());
123 313 : sequence_++;
124 313 : }
125 : };
126 : } // namespace
127 :
128 834 : Status WriteBatchInternal::InsertInto(const WriteBatch* b,
129 : MemTable* memtable) {
130 : MemTableInserter inserter;
131 834 : inserter.sequence_ = WriteBatchInternal::Sequence(b);
132 834 : inserter.mem_ = memtable;
133 1668 : return b->Iterate(&inserter);
134 : }
135 :
136 283 : void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
137 283 : assert(contents.size() >= kHeader);
138 283 : b->rep_.assign(contents.data(), contents.size());
139 283 : }
140 :
141 0 : void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
142 0 : SetCount(dst, Count(dst) + Count(src));
143 0 : assert(src->rep_.size() >= kHeader);
144 0 : dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
145 0 : }
146 :
147 : } // namespace leveldb
|