Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions spec/clustering_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,52 @@ require "../src/lavinmq/clustering/controller"

alias IndexTree = LavinMQ::MQTT::TopicTree(String)

# Fills `msg_store` with 1000-byte messages until it has rolled over to at
# least a second segment.
#
# The number of messages is the configured segment size divided by the body
# size, plus 10 extra. The final expectation fails the calling spec if the
# store still has only one segment afterwards.
private def populate_msg_store(msg_store)
  body_size = 1000_u64
  body = "x" * body_size.to_i
  properties = LavinMQ::AMQP::Properties.new
  message_count = (LavinMQ::Config.instance.segment_size // body_size) + 10
  message_count.times do
    # A fresh IO is created for every message, just like a fresh timestamp
    msg_store.push LavinMQ::Message.new(Time.utc.to_unix_ms, "exchange", "rk", properties, body_size, IO::Memory.new(body))
  end
  msg_store.@segments.size.should be > 1
end

# Runs a fake follower in its own isolated execution context. The follower
# connects to `tcp_server`, performs the clustering handshake and then goes
# through two rounds of full sync without ever requesting a file.
#
# * `tcp_server` - server the replicator under test listens on; the follower
#   connects to its local port on localhost.
# * `replicator` - provides the password sent during authentication.
# * `wg` - optional WaitGroup; `done` is called on it after the handshake has
#   been flushed, right before the follower starts reading the file list.
#
# Returns the execution context so the caller can `wait` for the follower to
# finish. The socket is always closed, even if the sync raises.
private def do_full_sync(tcp_server, replicator, wg : WaitGroup? = nil) : Fiber::ExecutionContext::Isolated
  Fiber::ExecutionContext::Isolated.new("test-follower") do
    client_io = TCPSocket.new("localhost", tcp_server.local_address.port)
    begin
      # Handshake and authentication: start marker, then the password
      # prefixed with its length as a single byte.
      client_io.write LavinMQ::Clustering::Start
      client_io.write_bytes replicator.password.bytesize.to_u8, IO::ByteFormat::LittleEndian
      client_io.write replicator.password.to_slice
      # NOTE(review): presumably the server's authentication response - confirm
      client_io.read_byte
      # NOTE(review): presumably the follower id - confirm against the server
      client_io.write_bytes 2i32, IO::ByteFormat::LittleEndian
      client_io.flush
      # Signal that full sync is about to start
      wg.try &.done
      sha1_size = Digest::SHA1.new.digest_size
      client_lz4 = Compress::LZ4::Reader.new(client_io)
      # Do the full sync two times without requesting files (everything is up
      # to date).
      # NOTE(review): presumably the server sends its file list twice per
      # full sync - confirm against the follower implementation.
      2.times do
        # Drain one file list. Each entry is a filename length (Int32), the
        # filename itself and a SHA1 digest; a zero length ends the list.
        # Names and digests are skipped since no file is ever requested.
        loop do
          filename_len = client_lz4.read_bytes Int32, IO::ByteFormat::LittleEndian
          break if filename_len.zero?
          client_lz4.skip filename_len
          client_lz4.skip sha1_size
        end
        # 0 means "requesting files done". The byte format is spelled out to
        # match the other writes; zero encodes the same in any byte order.
        client_io.write_bytes 0i32, IO::ByteFormat::LittleEndian
        client_io.flush
      end
    ensure
      client_io.close
    end
  end
end

describe LavinMQ::Clustering::Client, tags: "etcd" do
add_etcd_around_each

Expand Down Expand Up @@ -378,4 +424,49 @@ describe LavinMQ::Clustering::Client, tags: "etcd" do
end
end
end

# Specs covering a follower's full sync against a replicator whose message
# store is closed either before the sync starts or while it is running.
describe "full sync when message store is closed" do
# The store is populated and closed first; a follower then runs a full sync
# and the spec waits for it to finish.
it "succeeds when message store is already closed before sync" do
msg_dir = File.join(LavinMQ::Config.instance.data_dir, "sync_after_close_test")
FileUtils.mkdir_p(msg_dir)
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0)
msg_store = LavinMQ::MessageStore.new(msg_dir, replicator)
populate_msg_store(msg_store)

msg_store.close
Fiber.yield # let the spawned close fiber run so MFiles are unmapped

# Port 0 lets the OS pick a free port; the follower reads it back from the server
tcp_server = TCPServer.new("localhost", 0)
spawn(replicator.listen(tcp_server), name: "repli server spec")

# Blocks until the follower's execution context has finished
do_full_sync(tcp_server, replicator).wait
ensure
# Clean up; `try` is used because the locals may be nil if setup raised early
replicator.try &.close
tcp_server.try &.close
FileUtils.rm_rf msg_dir if msg_dir
end

# The follower starts its full sync first; the store is closed while the
# follower is still running, and the spec then waits for the follower to finish.
it "is not aborted when message store is closed concurrently" do
msg_dir = File.join(LavinMQ::Config.instance.data_dir, "sync_close_concurrent_test")
FileUtils.mkdir_p(msg_dir)
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0)
msg_store = LavinMQ::MessageStore.new(msg_dir, replicator)
populate_msg_store(msg_store)

tcp_server = TCPServer.new("localhost", 0)
spawn(replicator.listen(tcp_server), name: "repli server spec")

# The follower calls `done` on this WaitGroup once its handshake is flushed
wg = WaitGroup.new(1)
follower_ctx = do_full_sync(tcp_server, replicator, wg)

wg.wait # suspend until negotiation is done and files_with_hash has started
msg_store.close # concurrent close — simulates a corrupt segment triggering close
Fiber.yield # let the spawned close fiber run: wg.wait → segment.close (munmap)
follower_ctx.wait
ensure
# Clean up; `try` is used because the locals may be nil if setup raised early
replicator.try &.close
tcp_server.try &.close
FileUtils.rm_rf msg_dir if msg_dir
end
end
end
151 changes: 149 additions & 2 deletions spec/message_store_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,65 @@ require "file_utils"
require "time"
require "../src/lavinmq/message_store"

# Replicator mock to "record" files being registered.
#
# Implements `LavinMQ::Clustering::Replicator` so it can be handed to a
# `LavinMQ::MessageStore` in place of a real clustering server. It only
# remembers which paths were registered (and through which overload) and
# which paths were deleted, so specs can assert on how the store talks to its
# replicator. Every other method is a no-op or returns an empty value.
class SpyReplicator
  include LavinMQ::Clustering::Replicator

  # Registered path => overload used by the most recent registration
  # (:path, :file or :mfile). A later registration of the same path
  # overwrites the earlier entry.
  getter registered_files = Hash(String, Symbol).new
  # Every path passed to `delete_file`
  getter deleted_files = Set(String).new

  # Records a registration made with only a path.
  def register_file(path : String)
    @registered_files[path] = :path
  end

  # Records a registration made with a `File`.
  def register_file(file : File)
    @registered_files[file.path] = :file
  end

  # Records a registration made with an `MFile`.
  def register_file(mfile : MFile)
    @registered_files[mfile.path] = :mfile
  end

  # Not recorded.
  def replace_file(path : String)
  end

  # Not recorded.
  def append(path : String, obj)
  end

  # Records the deleted path. The WaitGroup is left untouched, so it is never
  # incremented or signalled by this spy.
  def delete_file(path : String, wg : WaitGroup)
    @deleted_files << path
  end

  # The spy never has any followers.
  def followers : Array(LavinMQ::Clustering::Follower)
    Array(LavinMQ::Clustering::Follower).new
  end

  # The spy never has any followers.
  def syncing_followers : Array(LavinMQ::Clustering::Follower)
    Array(LavinMQ::Clustering::Follower).new
  end

  # The spy never has any followers.
  def all_followers : Array(LavinMQ::Clustering::Follower)
    Array(LavinMQ::Clustering::Follower).new
  end

  def close
  end

  def listen(server : TCPServer)
  end

  def clear
  end

  def password : String
    ""
  end

  # Yields immediately, without taking any lock.
  def wait_for_sync(& : -> Nil) : Nil
    yield
  end
end

def mktmpdir(&)
path = File.tempname
Dir.mkdir_p(path)
Expand All @@ -13,9 +72,9 @@ def mktmpdir(&)
end
end

def with_store(*, durable = true, &)
def with_store(*, replicator = nil, durable = true, &)
mktmpdir do |dir|
store = LavinMQ::MessageStore.new(dir, nil, durable: durable)
store = LavinMQ::MessageStore.new(dir, replicator, durable: durable)
begin
yield store, dir
ensure
Expand Down Expand Up @@ -284,4 +343,92 @@ describe LavinMQ::MessageStore do
end
end
end

# Specs for how MessageStore reports its files to the replicator. A
# SpyReplicator is passed to the store and its recorded registrations and
# deletions are inspected afterwards.
describe "replication" do
  it "registers the initial segment file" do
    mktmpdir do |dir|
      spy = SpyReplicator.new
      LavinMQ::MessageStore.new(dir, spy).close
      registered = spy.registered_files.keys.map { |path| File.basename(path) }
      registered.should contain("msgs.0000000001")
    end
  end

  it "registers existing segment files on startup" do
    mktmpdir do |dir|
      # Pre-existing segment file holding only four bytes
      File.write(File.join(dir, "msgs.0000000001"), "\x04\x00\x00\x00")
      spy = SpyReplicator.new
      LavinMQ::MessageStore.new(dir, spy).close
      registered = spy.registered_files.keys.map { |path| File.basename(path) }
      registered.should contain("msgs.0000000001")
    end
  end

  it "registers ack files on startup" do
    mktmpdir do |dir|
      # Segment file plus an empty ack file with the same number
      File.write(File.join(dir, "msgs.0000000001"), "\x04\x00\x00\x00")
      File.write(File.join(dir, "acks.0000000001"), "")
      spy = SpyReplicator.new
      LavinMQ::MessageStore.new(dir, spy).close
      registered = spy.registered_files.keys.map { |path| File.basename(path) }
      registered.should contain("acks.0000000001")
    end
  end

  it "does not register orphaned ack files" do
    mktmpdir do |dir|
      # Ack file number 2 has no segment file with the same number
      File.write(File.join(dir, "msgs.0000000001"), "\x04\x00\x00\x00")
      File.write(File.join(dir, "acks.0000000002"), "")
      spy = SpyReplicator.new
      LavinMQ::MessageStore.new(dir, spy).close
      registered = spy.registered_files.keys.map { |path| File.basename(path) }
      registered.should_not contain("acks.0000000002")
    end
  end

  it "deletes orphaned ack files from the replicator on startup" do
    mktmpdir do |dir|
      File.write(File.join(dir, "msgs.0000000001"), "\x04\x00\x00\x00")
      orphaned_acks = File.join(dir, "acks.0000000002")
      File.write(orphaned_acks, "")
      spy = SpyReplicator.new
      LavinMQ::MessageStore.new(dir, spy).close
      spy.deleted_files.should contain(orphaned_acks)
    end
  end

  it "deletes fully-acked segments from the replicator on startup" do
    mktmpdir do |dir|
      body_size = LavinMQ::Config.instance.segment_size.to_u64 - (LavinMQ::BytesMessage::MIN_BYTESIZE + 5)
      message = LavinMQ::Message.new(RoughTime.unix_ms, "e", "k", AMQ::Protocol::Properties.new, body_size, IO::Memory.new("a" * body_size))

      # First run without a replicator, only to leave files on disk
      seed_store = LavinMQ::MessageStore.new(dir, nil, durable: true)
      2.times { seed_store.push(message) }
      seed_store.close
      wait_for { seed_store.closed }

      # NOTE(review): presumably 4 is the position of the only message in
      # segment 1, marking that segment fully acked - confirm
      File.open(File.join(dir, "acks.0000000001"), "w") { |acks| acks.write_bytes(4u32) }

      # Second run on the same directory, now observed by the spy
      spy = SpyReplicator.new
      LavinMQ::MessageStore.new(dir, spy, durable: true).close

      deleted = spy.deleted_files.map { |path| File.basename(path) }
      deleted.should contain("msgs.0000000001")
    end
  end

  it "re-registers files without an MFile reference when closed" do
    mktmpdir do |dir|
      spy = SpyReplicator.new
      LavinMQ::MessageStore.new(dir, spy).close
      # Give the fiber spawned by close a chance to run
      sleep 1.millisecond
      by_path_only = spy.registered_files.select { |_, kind| kind == :path }
      registered = by_path_only.keys.map { |path| File.basename(path) }
      registered.should contain("msgs.0000000001")
    end
  end
end
end
2 changes: 2 additions & 0 deletions src/lavinmq/clustering/replicator.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ require "../mfile"
module LavinMQ
module Clustering
module Replicator
abstract def register_file(path : String)
abstract def register_file(file : File)
abstract def register_file(mfile : MFile)
abstract def replace_file(path : String) # only non mfiles are ever replaced
Expand All @@ -15,6 +16,7 @@ module LavinMQ
abstract def listen(server : TCPServer)
abstract def clear
abstract def password : String
abstract def wait_for_sync(& : -> Nil) : Nil
end
end
end
14 changes: 14 additions & 0 deletions src/lavinmq/clustering/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,22 @@ module LavinMQ
@checksums.clear
end

# Registers a file by path only: the data-dir-stripped path is stored in
# @files with a nil value, and any checksum stored for it is removed.
def register_file(path : String)
  key = strip_datadir(path)
  @files[key] = nil
  @checksums.delete(key)
end

# Registers a regular `File`: only its data-dir-stripped path is kept (with a
# nil value in @files), and any checksum stored for it is removed.
def register_file(file : File)
  key = strip_datadir(file.path)
  @files[key] = nil
  @checksums.delete(key)
end

# Registers an `MFile`: the MFile itself is stored in @files under its
# data-dir-stripped path, and any checksum stored for it is removed.
def register_file(mfile : MFile)
  key = strip_datadir(mfile.path)
  @files[key] = mfile
  @checksums.delete(key)
end

def replace_file(path : String) # only non mfiles are ever replaced
Expand Down Expand Up @@ -126,6 +134,12 @@ module LavinMQ
end
end

# Yields to the given block while holding @sync_lock.
def wait_for_sync(& : -> Nil) : Nil
  @sync_lock.synchronize { yield }
end

def followers : Array(Follower)
@lock.synchronize do
@followers.select(&.synced?).dup # for thread safety
Expand Down
26 changes: 20 additions & 6 deletions src/lavinmq/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,11 @@ module LavinMQ
wg = WaitGroup.new
replicator.delete_file(file.path, wg)
spawn(name: "wait for file deletion is replicated") do
wg.wait
ensure
file.close
replicator.wait_for_sync do
wg.wait
ensure
file.close
end
end
else
file.close
Expand All @@ -263,13 +265,25 @@ module LavinMQ
@empty.close
# To make sure that all replication actions for the segments
# have finished wait for a delete action of a nonexistent file
if (replicator = @replicator) && !replicator.all_followers.empty?
if replicator = @replicator
wg = WaitGroup.new
replicator.delete_file(File.join(@msg_dir, "nonexistent"), wg)
spawn(name: "wait for file deletion is replicated") do
wg.wait
@segments.each_value &.close
@acks.each_value &.close
# Wait for any full sync to be done before we close mfiles, else we
# may get a SEGFAULT or an aborted sync because MFile closed is raised.
replicator.wait_for_sync do
# "Re-register" the files with path only so replicator won't
# use closed mfiles.
@segments.each_value do |segment|
replicator.register_file segment.path
segment.close
end
@acks.each_value do |ackfile|
replicator.register_file ackfile.path
ackfile.close
end
end
end
else
@segments.each_value &.close
Expand Down
Loading