Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions spec/stream_reader_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,32 @@ describe LavinMQ::AMQP::StreamReader do
end
end
end
it "should include x-stream-offset header" do
with_amqp_server do |s|
with_channel(s) do |ch|
x = ch.exchange("streams", "direct")
q = ch.queue("", args: AMQP::Client::Arguments.new({
"x-queue-type" => "stream",
}))
q.bind(x.name, q.name)
3.times do |i|
x.publish_confirm("test message #{i}", q.name)
end

iq = s.vhosts["/"].queues[q.name].as(LavinMQ::AMQP::Stream)
stream = iq.reader "first"

count = 0
stream.each do |env|
headers = env.message.properties.headers
headers.should_not be_nil
headers.not_nil!["x-stream-offset"].should eq (count + 1).to_i64
count += 1
end
count.should eq 3
end
end
end
it "should read over multiple segments" do
with_amqp_server do |s|
with_channel(s) do |ch|
Expand Down
3 changes: 2 additions & 1 deletion src/lavinmq/amqp/stream/stream_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,14 @@ module LavinMQ::AMQP
end
end

def read(segment : UInt32, position : UInt32) : Envelope?
def read(segment : UInt32, position : UInt32, offset : Int64) : Envelope?
return if @closed
rfile = @segments[segment]
return if position == rfile.size
begin
msg = BytesMessage.from_bytes(rfile.to_slice + position)
sp = SegmentPosition.new(segment, position, msg.bytesize.to_u32)
msg.properties.headers = add_offset_header(msg.properties.headers, offset)
Envelope.new(sp, msg, redelivered: false)
rescue ex
puts "read segment=#{segment} position=#{position}"
Expand Down
5 changes: 3 additions & 2 deletions src/lavinmq/amqp/stream/stream_reader.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ module LavinMQ::AMQP
def each(&)
stream = @stream
store = stream.stream_msg_store
_, segment, position = store.find_offset(@start_offset)
offset, segment, position = store.find_offset(@start_offset)
loop do
break if store.closed
env = store.read(segment, position)
env = store.read(segment, position, offset)
if env
position += env.segment_position.bytesize
offset += 1
else
# try read from new segment
s = store.next_segment_id(segment) || break
Expand Down
Loading