Line data Source code
1 : // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 : // Use of this source code is governed by a BSD-style license that can be
3 : // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 :
5 : #include "leveldb/table_builder.h"
6 :
7 : #include <assert.h>
8 : #include "leveldb/comparator.h"
9 : #include "leveldb/env.h"
10 : #include "leveldb/filter_policy.h"
11 : #include "leveldb/options.h"
12 : #include "table/block_builder.h"
13 : #include "table/filter_block.h"
14 : #include "table/format.h"
15 : #include "util/coding.h"
16 : #include "util/crc32c.h"
17 :
18 : namespace leveldb {
19 :
20 488 : struct TableBuilder::Rep {
21 : Options options;
22 : Options index_block_options;
23 : WritableFile* file;
24 : uint64_t offset;
25 : Status status;
26 : BlockBuilder data_block;
27 : BlockBuilder index_block;
28 : std::string last_key;
29 : int64_t num_entries;
30 : bool closed; // Either Finish() or Abandon() has been called.
31 : FilterBlockBuilder* filter_block;
32 :
33 : // We do not emit the index entry for a block until we have seen the
34 : // first key for the next data block. This allows us to use shorter
35 : // keys in the index block. For example, consider a block boundary
36 : // between the keys "the quick brown fox" and "the who". We can use
37 : // "the r" as the key for the index block entry since it is >= all
38 : // entries in the first block and < all entries in subsequent
39 : // blocks.
40 : //
41 : // Invariant: r->pending_index_entry is true only if data_block is empty.
42 : bool pending_index_entry;
43 : BlockHandle pending_handle; // Handle to add to index block
44 :
45 : std::string compressed_output;
46 :
47 122 : Rep(const Options& opt, WritableFile* f)
48 : : options(opt),
49 : index_block_options(opt),
50 : file(f),
51 : offset(0),
52 : data_block(&options),
53 : index_block(&index_block_options),
54 : num_entries(0),
55 : closed(false),
56 122 : filter_block(opt.filter_policy == NULL ? NULL
57 122 : : new FilterBlockBuilder(opt.filter_policy)),
58 732 : pending_index_entry(false) {
59 122 : index_block_options.block_restart_interval = 1;
60 122 : }
61 : };
62 :
63 122 : TableBuilder::TableBuilder(const Options& options, WritableFile* file)
64 122 : : rep_(new Rep(options, file)) {
65 122 : if (rep_->filter_block != NULL) {
66 122 : rep_->filter_block->StartBlock(0);
67 : }
68 122 : }
69 :
70 122 : TableBuilder::~TableBuilder() {
71 122 : assert(rep_->closed); // Catch errors where caller forgot to call Finish()
72 122 : delete rep_->filter_block;
73 122 : delete rep_;
74 122 : }
75 :
76 0 : Status TableBuilder::ChangeOptions(const Options& options) {
77 : // Note: if more fields are added to Options, update
78 : // this function to catch changes that should not be allowed to
79 : // change in the middle of building a Table.
80 0 : if (options.comparator != rep_->options.comparator) {
81 0 : return Status::InvalidArgument("changing comparator while building table");
82 : }
83 :
84 : // Note that any live BlockBuilders point to rep_->options and therefore
85 : // will automatically pick up the updated options.
86 0 : rep_->options = options;
87 0 : rep_->index_block_options = options;
88 0 : rep_->index_block_options.block_restart_interval = 1;
89 : return Status::OK();
90 : }
91 :
92 18237 : void TableBuilder::Add(const Slice& key, const Slice& value) {
93 18237 : Rep* r = rep_;
94 18237 : assert(!r->closed);
95 36474 : if (!ok()) return;
96 18237 : if (r->num_entries > 0) {
97 36230 : assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
98 : }
99 :
100 18237 : if (r->pending_index_entry) {
101 790 : assert(r->data_block.empty());
102 395 : r->options.comparator->FindShortestSeparator(&r->last_key, key);
103 : std::string handle_encoding;
104 395 : r->pending_handle.EncodeTo(&handle_encoding);
105 790 : r->index_block.Add(r->last_key, Slice(handle_encoding));
106 395 : r->pending_index_entry = false;
107 : }
108 :
109 18237 : if (r->filter_block != NULL) {
110 18237 : r->filter_block->AddKey(key);
111 : }
112 :
113 18237 : r->last_key.assign(key.data(), key.size());
114 18237 : r->num_entries++;
115 18237 : r->data_block.Add(key, value);
116 :
117 18237 : const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
118 18237 : if (estimated_block_size >= r->options.block_size) {
119 395 : Flush();
120 : }
121 : }
122 :
123 517 : void TableBuilder::Flush() {
124 517 : Rep* r = rep_;
125 517 : assert(!r->closed);
126 517 : if (!ok()) return;
127 1034 : if (r->data_block.empty()) return;
128 517 : assert(!r->pending_index_entry);
129 517 : WriteBlock(&r->data_block, &r->pending_handle);
130 517 : if (ok()) {
131 517 : r->pending_index_entry = true;
132 1034 : r->status = r->file->Flush();
133 : }
134 517 : if (r->filter_block != NULL) {
135 517 : r->filter_block->StartBlock(r->offset);
136 : }
137 : }
138 :
139 761 : void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
140 : // File format contains a sequence of blocks where each block has:
141 : // block_data: uint8[n]
142 : // type: uint8
143 : // crc: uint32
144 761 : assert(ok());
145 761 : Rep* r = rep_;
146 761 : Slice raw = block->Finish();
147 :
148 : Slice block_contents;
149 761 : CompressionType type = r->options.compression;
150 : // TODO(postrelease): Support more compression options: zlib?
151 761 : switch (type) {
152 : case kNoCompression:
153 761 : block_contents = raw;
154 761 : break;
155 :
156 : case kSnappyCompression: {
157 0 : std::string* compressed = &r->compressed_output;
158 0 : if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
159 : compressed->size() < raw.size() - (raw.size() / 8u)) {
160 : block_contents = *compressed;
161 : } else {
162 : // Snappy not supported, or compressed less than 12.5%, so just
163 : // store uncompressed form
164 0 : block_contents = raw;
165 0 : type = kNoCompression;
166 : }
167 : break;
168 : }
169 : }
170 761 : WriteRawBlock(block_contents, type, handle);
171 761 : r->compressed_output.clear();
172 761 : block->Reset();
173 761 : }
174 :
175 883 : void TableBuilder::WriteRawBlock(const Slice& block_contents,
176 : CompressionType type,
177 : BlockHandle* handle) {
178 883 : Rep* r = rep_;
179 883 : handle->set_offset(r->offset);
180 883 : handle->set_size(block_contents.size());
181 1766 : r->status = r->file->Append(block_contents);
182 1766 : if (r->status.ok()) {
183 : char trailer[kBlockTrailerSize];
184 883 : trailer[0] = type;
185 1766 : uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
186 883 : crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
187 883 : EncodeFixed32(trailer+1, crc32c::Mask(crc));
188 2649 : r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
189 1766 : if (r->status.ok()) {
190 883 : r->offset += block_contents.size() + kBlockTrailerSize;
191 : }
192 : }
193 883 : }
194 :
195 0 : Status TableBuilder::status() const {
196 20520 : return rep_->status;
197 : }
198 :
199 122 : Status TableBuilder::Finish() {
200 122 : Rep* r = rep_;
201 122 : Flush();
202 122 : assert(!r->closed);
203 122 : r->closed = true;
204 :
205 : BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
206 :
207 : // Write filter block
208 122 : if (ok() && r->filter_block != NULL) {
209 122 : WriteRawBlock(r->filter_block->Finish(), kNoCompression,
210 122 : &filter_block_handle);
211 : }
212 :
213 : // Write metaindex block
214 122 : if (ok()) {
215 122 : BlockBuilder meta_index_block(&r->options);
216 122 : if (r->filter_block != NULL) {
217 : // Add mapping from "filter.Name" to location of filter data
218 244 : std::string key = "filter.";
219 122 : key.append(r->options.filter_policy->Name());
220 : std::string handle_encoding;
221 122 : filter_block_handle.EncodeTo(&handle_encoding);
222 122 : meta_index_block.Add(key, handle_encoding);
223 : }
224 :
225 : // TODO(postrelease): Add stats and other meta blocks
226 122 : WriteBlock(&meta_index_block, &metaindex_block_handle);
227 : }
228 :
229 : // Write index block
230 122 : if (ok()) {
231 122 : if (r->pending_index_entry) {
232 122 : r->options.comparator->FindShortSuccessor(&r->last_key);
233 : std::string handle_encoding;
234 122 : r->pending_handle.EncodeTo(&handle_encoding);
235 244 : r->index_block.Add(r->last_key, Slice(handle_encoding));
236 122 : r->pending_index_entry = false;
237 : }
238 122 : WriteBlock(&r->index_block, &index_block_handle);
239 : }
240 :
241 : // Write footer
242 122 : if (ok()) {
243 : Footer footer;
244 122 : footer.set_metaindex_handle(metaindex_block_handle);
245 122 : footer.set_index_handle(index_block_handle);
246 : std::string footer_encoding;
247 122 : footer.EncodeTo(&footer_encoding);
248 366 : r->status = r->file->Append(footer_encoding);
249 244 : if (r->status.ok()) {
250 122 : r->offset += footer_encoding.size();
251 : }
252 : }
253 244 : return r->status;
254 : }
255 :
256 0 : void TableBuilder::Abandon() {
257 0 : Rep* r = rep_;
258 0 : assert(!r->closed);
259 0 : r->closed = true;
260 0 : }
261 :
262 1526 : uint64_t TableBuilder::NumEntries() const {
263 1526 : return rep_->num_entries;
264 : }
265 :
266 1641 : uint64_t TableBuilder::FileSize() const {
267 1641 : return rep_->offset;
268 : }
269 :
270 : } // namespace leveldb
|