Skip to content

Commit bf3987f

Browse files
authored
Ensure external streams checkpoint on commit (#2652)
* Fix checkpoint persistence for external streams * Restore checkpoint pragma after stream sessions
1 parent c32a197 commit bf3987f

3 files changed

Lines changed: 133 additions & 2 deletions

File tree

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
using System;
2+
using System.IO;
3+
using FluentAssertions;
4+
using LiteDB;
5+
using LiteDB.Tests;
6+
using Xunit;
7+
8+
namespace LiteDB.Tests.Issues
9+
{
10+
public class IssueCheckpointFlush_Tests
11+
{
12+
private class Entity
13+
{
14+
public int Id { get; set; }
15+
16+
public string Value { get; set; } = string.Empty;
17+
}
18+
19+
[Fact]
20+
public void CommittedChangesAreLostWhenClosingExternalStreamWithoutCheckpoint()
21+
{
22+
using var tempFile = new TempFile();
23+
24+
using (var createStream = new FileStream(tempFile.Filename, FileMode.Create, FileAccess.ReadWrite, FileShare.ReadWrite))
25+
{
26+
using var createDb = new LiteDatabase(createStream);
27+
var collection = createDb.GetCollection<Entity>("entities");
28+
29+
collection.Upsert(new Entity { Id = 1, Value = "initial" });
30+
31+
createDb.Commit();
32+
createStream.Flush(true);
33+
}
34+
35+
var updateStream = new FileStream(tempFile.Filename, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite);
36+
var updateDb = new LiteDatabase(updateStream);
37+
var updateCollection = updateDb.GetCollection<Entity>("entities");
38+
39+
updateCollection.Upsert(new Entity { Id = 1, Value = "updated" });
40+
41+
updateDb.Commit();
42+
updateStream.Flush(true);
43+
updateStream.Dispose();
44+
updateDb = null;
45+
46+
GC.Collect();
47+
GC.WaitForPendingFinalizers();
48+
49+
using (var verifyStream = new FileStream(tempFile.Filename, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite))
50+
using (var verifyDb = new LiteDatabase(verifyStream))
51+
{
52+
var document = verifyDb.GetCollection<Entity>("entities").FindById(1);
53+
54+
document.Should().NotBeNull();
55+
document!.Value.Should().Be("updated");
56+
}
57+
}
58+
59+
[Fact]
60+
public void StreamConstructorRestoresCheckpointSizeAfterDisposal()
61+
{
62+
using var tempFile = new TempFile();
63+
64+
using (var fileDb = new LiteDatabase(tempFile.Filename))
65+
{
66+
fileDb.CheckpointSize.Should().Be(1000);
67+
}
68+
69+
using (var stream = new FileStream(tempFile.Filename, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite))
70+
using (var streamDb = new LiteDatabase(stream))
71+
{
72+
streamDb.CheckpointSize.Should().Be(1);
73+
}
74+
75+
using (var reopened = new LiteDatabase(tempFile.Filename))
76+
{
77+
reopened.CheckpointSize.Should().Be(1000);
78+
}
79+
}
80+
81+
[Fact]
82+
public void StreamConstructorAllowsReadOnlyStreams()
83+
{
84+
using var tempFile = new TempFile();
85+
86+
using (var setup = new LiteDatabase(tempFile.Filename))
87+
{
88+
var collection = setup.GetCollection<Entity>("entities");
89+
90+
collection.Insert(new Entity { Id = 1, Value = "initial" });
91+
92+
setup.Checkpoint();
93+
}
94+
95+
using var readOnlyStream = new FileStream(tempFile.Filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
96+
using var readOnlyDb = new LiteDatabase(readOnlyStream);
97+
98+
var document = readOnlyDb.GetCollection<Entity>("entities").FindById(1);
99+
100+
document.Should().NotBeNull();
101+
document!.Value.Should().Be("initial");
102+
}
103+
}
104+
}

LiteDB/Client/Database/LiteDatabase.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public partial class LiteDatabase : ILiteDatabase
1919
private readonly ILiteEngine _engine;
2020
private readonly BsonMapper _mapper;
2121
private readonly bool _disposeOnClose;
22+
private readonly int? _checkpointOverride;
2223

2324
/// <summary>
2425
/// Get current instance of BsonMapper used in this database instance (can be BsonMapper.Global)
@@ -66,6 +67,27 @@ public LiteDatabase(Stream stream, BsonMapper mapper = null, Stream logStream =
6667
_engine = new LiteEngine(settings);
6768
_mapper = mapper ?? BsonMapper.Global;
6869
_disposeOnClose = true;
70+
71+
if (logStream == null && stream is not MemoryStream)
72+
{
73+
if (!stream.CanWrite)
74+
{
75+
// Read-only streams cannot participate in eager checkpointing because the process
76+
// writes pages back to the underlying data stream immediately.
77+
}
78+
else
79+
{
80+
// Without a dedicated log stream the WAL lives purely in memory; force
81+
// checkpointing to ensure commits reach the underlying data stream.
82+
var originalCheckpointSize = _engine.Pragma(Pragmas.CHECKPOINT);
83+
84+
if (originalCheckpointSize != 1)
85+
{
86+
_engine.Pragma(Pragmas.CHECKPOINT, 1);
87+
_checkpointOverride = originalCheckpointSize;
88+
}
89+
}
90+
}
6991
}
7092

7193
/// <summary>
@@ -373,6 +395,11 @@ protected virtual void Dispose(bool disposing)
373395
{
374396
if (disposing && _disposeOnClose)
375397
{
398+
if (_checkpointOverride.HasValue)
399+
{
400+
_engine.Pragma(Pragmas.CHECKPOINT, _checkpointOverride.Value);
401+
}
402+
376403
_engine.Dispose();
377404
}
378405
}

LiteDB/Engine/Engine/Transaction.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ private void CommitAndReleaseTransaction(TransactionService transaction)
112112
_monitor.ReleaseTransaction(transaction);
113113

114114
// try checkpoint when finish transaction and log file are bigger than checkpoint pragma value (in pages)
115-
if (_header.Pragmas.Checkpoint > 0 &&
116-
_disk.GetFileLength(FileOrigin.Log) > (_header.Pragmas.Checkpoint * PAGE_SIZE))
115+
if (_header.Pragmas.Checkpoint > 0 &&
116+
_disk.GetFileLength(FileOrigin.Log) >= (_header.Pragmas.Checkpoint * PAGE_SIZE))
117117
{
118118
_walIndex.TryCheckpoint();
119119
}

0 commit comments

Comments
 (0)