Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add db method to determine recovered records using sequence number
  • Loading branch information
sydp committed May 16, 2024
commit 9e6f0ee5b209c10dc99bcf0512eccefb84de582d
12 changes: 9 additions & 3 deletions dfindexeddb/leveldb/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class PhysicalRecord(utils.FromDecoderMixin):
@classmethod
def FromDecoder(
cls, decoder: utils.LevelDBDecoder, base_offset: int = 0
) -> PhysicalRecord:
) -> Optional[PhysicalRecord]:
"""Decodes a PhysicalRecord from the current position of a LevelDBDecoder.

Args:
Expand All @@ -161,11 +161,13 @@ def FromDecoder(
read from.

Returns:
A PhysicalRecord.
A PhysicalRecord or None if the parsed header is 0.
"""
offset, checksum = decoder.DecodeUint32()
_, length = decoder.DecodeUint16()
_, record_type_byte = decoder.DecodeUint8()
if checksum == 0 or length == 0 or record_type_byte == 0:
return None
try:
record_type = definitions.LogFilePhysicalRecordType(record_type_byte)
except ValueError as error:
Expand Down Expand Up @@ -206,7 +208,11 @@ def GetPhysicalRecords(self) -> Generator[PhysicalRecord, None, None]:
buffer_length = len(self.data)

while buffer.tell() + PhysicalRecord.PHYSICAL_HEADER_LENGTH < buffer_length:
yield PhysicalRecord.FromStream(buffer, base_offset=self.offset)
record = PhysicalRecord.FromStream(buffer, base_offset=self.offset)
if record:
yield record
else:
return

@classmethod
def FromStream(cls, stream: BinaryIO) -> Optional[Block]:
Expand Down
32 changes: 30 additions & 2 deletions dfindexeddb/leveldb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
"""A module for records from LevelDB files."""
from __future__ import annotations
from collections import defaultdict
import dataclasses
import pathlib
import re
Expand Down Expand Up @@ -297,6 +298,34 @@ def _RecordsByManifest(self) -> Generator[LevelDBRecord, None, None]:
record.recovered = True
yield record

def _RecordsBySequenceNumber(self) -> Generator[LevelDBRecord, None, None]:
  """Yields LevelDBRecords, using sequence numbers to flag recovered records.

  Every file in the folder is parsed and the resulting records are grouped by
  record key. Within each group the records are ordered by
  (sequence number, offset); the last record in that order is treated as the
  active one (recovered is False) and every earlier record for the same key
  is marked as recovered (recovered is True).

  Yields:
    LevelDBRecords with the recovered attribute set.
  """
  records_by_key = defaultdict(list)

  for filename in self.foldername.iterdir():
    for leveldb_record in LevelDBRecord.FromFile(filename):
      if leveldb_record:
        records_by_key[leveldb_record.record.key].append(leveldb_record)

  for key_records in records_by_key.values():
    ordered_records = sorted(
        key_records,
        key=lambda x: (x.record.sequence_number, x.record.offset))
    # enumerate() indices stop at len - 1, so the active (last) record must be
    # identified with len - 1; comparing against len would never match and
    # would mark every version of the key as recovered.
    last_index = len(ordered_records) - 1
    for index, record in enumerate(ordered_records):
      record.recovered = index != last_index
      yield record

def GetRecords(
self,
use_manifest: bool = False
Expand All @@ -313,5 +342,4 @@ def GetRecords(
if use_manifest:
yield from self._RecordsByManifest()
else:
for filename in self.foldername.iterdir():
yield from LevelDBRecord.FromFile(filename)
yield from self._RecordsBySequenceNumber()