diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index 4b9fe316c6..3cb8d5a4d5 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -77,6 +77,20 @@ assert UINT32_MAX == 2**32-1 assert _MAX_VALUE % 2 == 1 +def hashindex_variant(fn): + """peek into an index file and find out what it is""" + with open(fn, 'rb') as f: + hh = f.read(18) # len(HashHeader) + magic = hh[0:8] + if magic == b'BORG_IDX': + key_size = hh[16] + value_size = hh[17] + return f'k{key_size}_v{value_size}' + if magic == b'12345678': # used by unit tests + return 'k32_v16' # just return the current variant + raise ValueError(f'unknown hashindex format, magic: {magic!r}') + + @cython.internal cdef class IndexBase: cdef HashIndex *index @@ -196,8 +210,115 @@ cdef class FuseVersionsIndex(IndexBase): return hashindex_get(self.index, key) != NULL +NSIndexEntry = namedtuple('NSIndexEntry', 'segment offset size') + + cdef class NSIndex(IndexBase): + value_size = 16 + + def __getitem__(self, key): + assert len(key) == self.key_size + data = hashindex_get(self.index, key) + if not data: + raise KeyError(key) + cdef uint32_t segment = _le32toh(data[0]) + assert segment <= _MAX_VALUE, "maximum number of segments reached" + return NSIndexEntry(segment, _le32toh(data[1]), _le32toh(data[2])) + + def __setitem__(self, key, value): + assert len(key) == self.key_size + cdef uint32_t[4] data + cdef uint32_t segment = value[0] + assert segment <= _MAX_VALUE, "maximum number of segments reached" + data[0] = _htole32(segment) + data[1] = _htole32(value[1]) + data[2] = _htole32(value[2]) + data[3] = 0 # init flags to all cleared + if not hashindex_set(self.index, key, data): + raise Exception('hashindex_set failed') + + def __contains__(self, key): + cdef uint32_t segment + assert len(key) == self.key_size + data = hashindex_get(self.index, key) + if data != NULL: + segment = _le32toh(data[0]) + assert segment <= _MAX_VALUE, "maximum number of segments reached" + return data != NULL + + def 
iteritems(self, marker=None, mask=0, value=0): + """iterate over all items or optionally only over items having specific flag values""" + cdef const unsigned char *key + assert isinstance(mask, int) + assert isinstance(value, int) + iter = NSKeyIterator(self.key_size, mask, value) + iter.idx = self + iter.index = self.index + if marker: + key = hashindex_get(self.index, marker) + if marker is None: + raise IndexError + iter.key = key - self.key_size + return iter + + def flags(self, key, mask=0xFFFFFFFF, value=None): + """query and optionally set flags""" + assert len(key) == self.key_size + assert isinstance(mask, int) + data = hashindex_get(self.index, key) + if not data: + raise KeyError(key) + flags = _le32toh(data[3]) + if isinstance(value, int): + new_flags = flags & ~mask # clear masked bits + new_flags |= value & mask # set value bits + data[3] = _htole32(new_flags) + return flags & mask # always return previous flags value + + +cdef class NSKeyIterator: + cdef NSIndex idx + cdef HashIndex *index + cdef const unsigned char *key + cdef int key_size + cdef int exhausted + cdef int flag_mask + cdef int flag_value + + def __cinit__(self, key_size, mask, value): + self.key = NULL + self.key_size = key_size + # note: mask and value both default to 0, so they will match all entries + self.flag_mask = _htole32(mask) + self.flag_value = _htole32(value) + self.exhausted = 0 + + def __iter__(self): + return self + + def __next__(self): + cdef uint32_t *value + if self.exhausted: + raise StopIteration + while True: + self.key = hashindex_next_key(self.index, self.key) + if not self.key: + self.exhausted = 1 + raise StopIteration + value = (self.key + self.key_size) + if value[3] & self.flag_mask == self.flag_value: + # we found a matching entry! 
+ break + + cdef uint32_t segment = _le32toh(value[0]) + assert segment <= _MAX_VALUE, "maximum number of segments reached" + return ((self.key)[:self.key_size], + NSIndexEntry(segment, _le32toh(value[1]), _le32toh(value[2]))) + + +cdef class NSIndex1(IndexBase): # legacy borg 1.x + value_size = 8 def __getitem__(self, key): @@ -230,7 +351,7 @@ cdef class NSIndex(IndexBase): def iteritems(self, marker=None): cdef const unsigned char *key - iter = NSKeyIterator(self.key_size) + iter = NSKeyIterator1(self.key_size) iter.idx = self iter.index = self.index if marker: @@ -241,8 +362,8 @@ cdef class NSIndex(IndexBase): return iter -cdef class NSKeyIterator: - cdef NSIndex idx +cdef class NSKeyIterator1: # legacy borg 1.x + cdef NSIndex1 idx cdef HashIndex *index cdef const unsigned char *key cdef int key_size diff --git a/src/borg/repository.py b/src/borg/repository.py index 3fcc72aadd..1020a1eabe 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -13,7 +13,7 @@ from itertools import islice from .constants import * # NOQA -from .hashindex import NSIndex +from .hashindex import NSIndexEntry, NSIndex, NSIndex1, hashindex_variant from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size from .helpers import Location from .helpers import ProgressIndicatorPercent @@ -52,6 +52,18 @@ FreeSpace = partial(defaultdict, int) +def header_size(tag): + if tag == TAG_PUT2: + size = LoggedIO.HEADER_ID_SIZE + LoggedIO.ENTRY_HASH_SIZE + elif tag == TAG_PUT or tag == TAG_DELETE: + size = LoggedIO.HEADER_ID_SIZE + elif tag == TAG_COMMIT: + size = LoggedIO.header_fmt.size + else: + raise ValueError(f"unsupported tag: {tag!r}") + return size + + class Repository: """ Filesystem based transactional key value store @@ -525,10 +537,14 @@ def open_index(self, transaction_id, auto_recover=True): if transaction_id is None: return NSIndex() index_path = os.path.join(self.path, 'index.%d' % transaction_id) + variant = 
hashindex_variant(index_path) integrity_data = self._read_integrity(transaction_id, 'index') try: with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd: - return NSIndex.read(fd) + if variant == 'k32_v16': + return NSIndex.read(fd) + if variant == 'k32_v8': # legacy + return NSIndex1.read(fd) except (ValueError, OSError, FileIntegrityError) as exc: logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc) os.unlink(index_path) @@ -798,14 +814,14 @@ def complete_xfer(intermediate=True): if tag == TAG_COMMIT: continue in_index = self.index.get(key) - is_index_object = in_index == (segment, offset) + is_index_object = in_index and (in_index.segment, in_index.offset) == (segment, offset) if tag in (TAG_PUT2, TAG_PUT) and is_index_object: try: new_segment, offset = self.io.write_put(key, data, raise_full=True) except LoggedIO.SegmentFull: complete_xfer() new_segment, offset = self.io.write_put(key, data) - self.index[key] = new_segment, offset + self.index[key] = NSIndexEntry(new_segment, offset, len(data)) segments.setdefault(new_segment, 0) segments[new_segment] += 1 segments[segment] -= 1 @@ -821,10 +837,7 @@ def complete_xfer(intermediate=True): # do not remove entry with empty shadowed_segments list here, # it is needed for shadowed_put_exists code (see below)! pass - if tag == TAG_PUT2: - self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE - elif tag == TAG_PUT: - self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE + self.storage_quota_use -= header_size(tag) + len(data) elif tag == TAG_DELETE and not in_index: # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag, # therefore we do not drop the delete, but write it to a current segment. 
@@ -919,27 +932,26 @@ def _update_index(self, segment, objects, report=None): if tag in (TAG_PUT2, TAG_PUT): try: # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space - s, _ = self.index[key] - self.compact[s] += size - self.segments[s] -= 1 + in_index = self.index[key] + self.compact[in_index.segment] += header_size(tag) + size + self.segments[in_index.segment] -= 1 except KeyError: pass - self.index[key] = segment, offset + self.index[key] = NSIndexEntry(segment, offset, size) self.segments[segment] += 1 - self.storage_quota_use += size # note: size already includes the put_header_fmt overhead + self.storage_quota_use += header_size(tag) + size elif tag == TAG_DELETE: try: # if the deleted PUT is not in the index, there is nothing to clean up - s, offset = self.index.pop(key) + in_index = self.index.pop(key) except KeyError: pass else: - if self.io.segment_exists(s): + if self.io.segment_exists(in_index.segment): # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment # is already gone, then it was already compacted. 
- self.segments[s] -= 1 - size = self.io.read(s, offset, key, read_data=False) - self.compact[s] += size + self.segments[in_index.segment] -= 1 + self.compact[in_index.segment] += header_size(tag) + in_index.size elif tag == TAG_COMMIT: continue else: @@ -968,12 +980,13 @@ def _rebuild_sparse(self, segment): self.compact[segment] = 0 for tag, key, offset, size in self.io.iter_objects(segment, read_data=False): if tag in (TAG_PUT2, TAG_PUT): - if self.index.get(key, (-1, -1)) != (segment, offset): + in_index = self.index.get(key) + if not in_index or (in_index.segment, in_index.offset) != (segment, offset): # This PUT is superseded later - self.compact[segment] += size + self.compact[segment] += header_size(tag) + size elif tag == TAG_DELETE: # The outcome of the DELETE has been recorded in the PUT branch already - self.compact[segment] += size + self.compact[segment] += header_size(tag) + size def check(self, repair=False, save_space=False, max_duration=0): """Check repository consistency @@ -1169,7 +1182,7 @@ def scan(self, limit=None, marker=None): self.index = self.open_index(transaction_id) at_start = marker is None # smallest valid seg is 0, smallest valid offs is 8 - start_segment, start_offset = (0, 0) if at_start else self.index[marker] + start_segment, start_offset, _ = (0, 0, 0) if at_start else self.index[marker] result = [] for segment, filename in self.io.segment_iterator(start_segment): obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False, include_data=False) @@ -1186,19 +1199,21 @@ def scan(self, limit=None, marker=None): # also, for the next segment, we need to start at offset 0. 
start_offset = 0 continue - if tag in (TAG_PUT2, TAG_PUT) and (segment, offset) == self.index.get(id): - # we have found an existing and current object - result.append(id) - if len(result) == limit: - return result + if tag in (TAG_PUT2, TAG_PUT): + in_index = self.index.get(id) + if in_index and (in_index.segment, in_index.offset) == (segment, offset): + # we have found an existing and current object + result.append(id) + if len(result) == limit: + return result return result def get(self, id): if not self.index: self.index = self.open_index(self.get_transaction_id()) try: - segment, offset = self.index[id] - return self.io.read(segment, offset, id) + in_index = NSIndexEntry(*((self.index[id] + (None, ))[:3])) # legacy: index entries have no size element + return self.io.read(in_index.segment, in_index.offset, id, expected_size=in_index.size) except KeyError: raise self.ObjectNotFound(id, self.path) from None @@ -1215,7 +1230,7 @@ def put(self, id, data, wait=True): if not self._active_txn: self.prepare_txn(self.get_transaction_id()) try: - segment, offset = self.index[id] + in_index = self.index[id] except KeyError: pass else: @@ -1223,12 +1238,12 @@ def put(self, id, data, wait=True): # we do not want to update the shadow_index here, because # we know already that we will PUT to this id, so it will # be in the repo index (and we won't need it in the shadow_index). 
- self._delete(id, segment, offset, update_shadow_index=False) + self._delete(id, in_index.segment, in_index.offset, in_index.size, update_shadow_index=False) segment, offset = self.io.write_put(id, data) - self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE + self.storage_quota_use += header_size(TAG_PUT2) + len(data) self.segments.setdefault(segment, 0) self.segments[segment] += 1 - self.index[id] = segment, offset + self.index[id] = NSIndexEntry(segment, offset, len(data)) if self.storage_quota and self.storage_quota_use > self.storage_quota: self.transaction_doomed = self.StorageQuotaExceeded( format_file_size(self.storage_quota), format_file_size(self.storage_quota_use)) @@ -1243,22 +1258,21 @@ def delete(self, id, wait=True): if not self._active_txn: self.prepare_txn(self.get_transaction_id()) try: - segment, offset = self.index.pop(id) + in_index = self.index.pop(id) except KeyError: raise self.ObjectNotFound(id, self.path) from None # if we get here, there is an object with this id in the repo, # we write a DEL here that shadows the respective PUT. # after the delete, the object is not in the repo index any more, # for the compaction code, we need to update the shadow_index in this case. 
- self._delete(id, segment, offset, update_shadow_index=True) + self._delete(id, in_index.segment, in_index.offset, in_index.size, update_shadow_index=True) - def _delete(self, id, segment, offset, *, update_shadow_index): + def _delete(self, id, segment, offset, size, *, update_shadow_index): # common code used by put and delete if update_shadow_index: self.shadow_index.setdefault(id, []).append(segment) self.segments[segment] -= 1 - size = self.io.read(segment, offset, id, read_data=False) - self.compact[segment] += size + self.compact[segment] += header_size(TAG_PUT2) + size segment, size = self.io.write_delete(id) self.compact[segment] += size self.segments.setdefault(segment, 0) @@ -1448,6 +1462,9 @@ def clean_old(): del self.fds[k] clean_old() + if self._write_fd is not None: + # without this, we have a test failure now + self._write_fd.sync() try: ts, fd = self.fds[segment] except KeyError: @@ -1515,7 +1532,8 @@ def iter_objects(self, segment, offset=0, include_data=False, read_data=True): if include_data: yield tag, key, offset, data else: - yield tag, key, offset, size + yield tag, key, offset, size - header_size(tag) # corresponds to len(data) + assert size >= 0 offset += size # we must get the fd via get_fd() here again as we yielded to our caller and it might # have triggered closing of the fd we had before (e.g. by calling io.read() for @@ -1580,7 +1598,7 @@ def entry_hash(self, *data): h.update(d) return h.digest() - def read(self, segment, offset, id, read_data=True): + def read(self, segment, offset, id, read_data=True, *, expected_size=None): """ Read entry from *segment* at *offset* with *id*. If read_data is False the size of the entry is returned instead. 
@@ -1596,7 +1614,11 @@ def read(self, segment, offset, id, read_data=True): if id != key: raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format( segment, offset)) - return data if read_data else size + data_size_from_header = size - header_size(tag) + if expected_size is not None and expected_size != data_size_from_header: + raise IntegrityError(f'size from repository index: {expected_size} != ' + f'size from entry header: {data_size_from_header}') + return data if read_data else data_size_from_header def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True): """ diff --git a/src/borg/selftest.py b/src/borg/selftest.py index 006e85a56b..00356cd0ed 100644 --- a/src/borg/selftest.py +++ b/src/borg/selftest.py @@ -33,7 +33,7 @@ ChunkerTestCase, ] -SELFTEST_COUNT = 36 +SELFTEST_COUNT = 37 class SelfTestResult(TestResult): diff --git a/src/borg/testsuite/hashindex.py b/src/borg/testsuite/hashindex.py index 7a3b53e33c..b05ff77fdf 100644 --- a/src/borg/testsuite/hashindex.py +++ b/src/borg/testsuite/hashindex.py @@ -87,8 +87,8 @@ def _generic_test(self, cls, make_value, sha): del idx def test_nsindex(self): - self._generic_test(NSIndex, lambda x: (x, x), - '85f72b036c692c8266e4f51ccf0cff2147204282b5e316ae508d30a448d88fef') + self._generic_test(NSIndex, lambda x: (x, x, x), + '7d70671d0b7e9d2f51b2691ecf35184b9f8ecc1202cceb2748c905c8fc04c256') def test_chunkindex(self): self._generic_test(ChunkIndex, lambda x: (x, x), @@ -102,7 +102,7 @@ def test_resize(self): initial_size = os.path.getsize(filepath) self.assert_equal(len(idx), 0) for x in range(n): - idx[H(x)] = x, x + idx[H(x)] = x, x, x, x idx.write(filepath) assert initial_size < os.path.getsize(filepath) for x in range(n): @@ -114,7 +114,7 @@ def test_resize(self): def test_iteritems(self): idx = NSIndex() for x in range(100): - idx[H(x)] = x, x + idx[H(x)] = x, x, x, x iterator = idx.iteritems() all = list(iterator) self.assert_equal(len(all), 100) 
@@ -153,6 +153,70 @@ def test_chunkindex_summarize(self): assert chunks == 1 + 2 + 3 assert unique_chunks == 3 + def test_flags(self): + idx = NSIndex() + key = H(0) + self.assert_raises(KeyError, idx.flags, key, 0) + idx[key] = 0, 0, 0 # create entry + # check bit 0 and 1, should be both 0 after entry creation + self.assert_equal(idx.flags(key, mask=3), 0) + # set bit 0 + idx.flags(key, mask=1, value=1) + self.assert_equal(idx.flags(key, mask=1), 1) + # set bit 1 + idx.flags(key, mask=2, value=2) + self.assert_equal(idx.flags(key, mask=2), 2) + # check both bit 0 and 1, both should be set + self.assert_equal(idx.flags(key, mask=3), 3) + # clear bit 1 + idx.flags(key, mask=2, value=0) + self.assert_equal(idx.flags(key, mask=2), 0) + # clear bit 0 + idx.flags(key, mask=1, value=0) + self.assert_equal(idx.flags(key, mask=1), 0) + # check both bit 0 and 1, both should be cleared + self.assert_equal(idx.flags(key, mask=3), 0) + + def test_flags_iteritems(self): + idx = NSIndex() + keys_flagged0 = {H(i) for i in (1, 2, 3, 42)} + keys_flagged1 = {H(i) for i in (11, 12, 13, 142)} + keys_flagged2 = {H(i) for i in (21, 22, 23, 242)} + keys_flagged3 = {H(i) for i in (31, 32, 33, 342)} + for key in keys_flagged0: + idx[key] = 0, 0, 0 # create entry + idx.flags(key, mask=3, value=0) # not really necessary, unflagged is default + for key in keys_flagged1: + idx[key] = 0, 0, 0 # create entry + idx.flags(key, mask=3, value=1) + for key in keys_flagged2: + idx[key] = 0, 0, 0 # create entry + idx.flags(key, mask=3, value=2) + for key in keys_flagged3: + idx[key] = 0, 0, 0 # create entry + idx.flags(key, mask=3, value=3) + # check if we can iterate over all items + k_all = {k for k, v in idx.iteritems()} + self.assert_equal(k_all, keys_flagged0 | keys_flagged1 | keys_flagged2 | keys_flagged3) + # check if we can iterate over the flagged0 items + k0 = {k for k, v in idx.iteritems(mask=3, value=0)} + self.assert_equal(k0, keys_flagged0) + # check if we can iterate over the flagged1 
items + k1 = {k for k, v in idx.iteritems(mask=3, value=1)} + self.assert_equal(k1, keys_flagged1) + # check if we can iterate over the flagged2 items + k1 = {k for k, v in idx.iteritems(mask=3, value=2)} + self.assert_equal(k1, keys_flagged2) + # check if we can iterate over the flagged3 items + k1 = {k for k, v in idx.iteritems(mask=3, value=3)} + self.assert_equal(k1, keys_flagged3) + # check if we can iterate over the flagged1 + flagged3 items + k1 = {k for k, v in idx.iteritems(mask=1, value=1)} + self.assert_equal(k1, keys_flagged1 | keys_flagged3) + # check if we can iterate over the flagged0 + flagged2 items + k1 = {k for k, v in idx.iteritems(mask=1, value=0)} + self.assert_equal(k1, keys_flagged0 | keys_flagged2) + class HashIndexExtraTestCase(BaseTestCase): """These tests are separate because they should not become part of the selftest. @@ -514,9 +578,9 @@ class NSIndexTestCase(BaseTestCase): def test_nsindex_segment_limit(self): idx = NSIndex() with self.assert_raises(AssertionError): - idx[H(1)] = NSIndex.MAX_VALUE + 1, 0 + idx[H(1)] = NSIndex.MAX_VALUE + 1, 0, 0, 0 assert H(1) not in idx - idx[H(2)] = NSIndex.MAX_VALUE, 0 + idx[H(2)] = NSIndex.MAX_VALUE, 0, 0, 0 assert H(2) in idx @@ -531,38 +595,38 @@ def test_bug_4829(self): from struct import pack - def HH(x, y): - # make some 32byte long thing that depends on x and y. + def HH(x, y, z): + # make some 32byte long thing that depends on x, y, z. # same x will mean a collision in the hashtable as bucket index is computed from # first 4 bytes. giving a specific x targets bucket index x. # y is to create different keys and does not go into the bucket index calculation. # so, same x + different y --> collision - return pack('