Skip to content

Commit c4d774f

Browse files
Chore // Uses buffering to get around final JSON chunk being arbitrarily split (#5)
* uses buffering * adds docs
1 parent 437639c commit c4d774f

File tree

1 file changed

+40
-71
lines changed

1 file changed

+40
-71
lines changed

lib/revelry_ai/stream.ex

Lines changed: 40 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,110 +1,79 @@
11
defmodule RevelryAI.Stream do
2-
@moduledoc false
3-
4-
@doc """
2+
@moduledoc """
53
Creates a readable stream of information that another process can consume.
4+
"""
65

6+
@doc """
77
This is used for endpoints that support returning text as it is generated,
88
rather than waiting for the entire generation output to be ready.
9+
10+
Note: We use a buffer to store the response until we have a complete chunk. While RevelryAI usually returns the entire response at once, this is not guaranteed if
11+
streaming across a network boundary.
912
"""
1013
def new(start_fun) do
1114
Stream.resource(
12-
start_fun,
13-
fn
14-
{:error, %HTTPoison.Error{} = error} ->
15-
{
16-
[
17-
%{
18-
"status" => :error,
19-
"reason" => error.reason
20-
}
21-
],
22-
error
23-
}
15+
fn ->
16+
case start_fun.() do
17+
{:ok, %HTTPoison.AsyncResponse{id: id} = res} ->
18+
{res, id, ""}
2419

25-
%HTTPoison.Error{} = error ->
26-
{:halt, error}
20+
{:error, %HTTPoison.Error{} = error} ->
21+
{:error, error}
2722

28-
res ->
29-
{res, id} =
30-
case res do
31-
{:ok, %HTTPoison.AsyncResponse{id: id} = res} ->
32-
{res, id}
23+
%HTTPoison.AsyncResponse{id: id} = res ->
24+
{res, id, ""}
25+
end
26+
end,
27+
fn
28+
{:error, %HTTPoison.Error{} = error} ->
29+
{[%{"status" => :error, "reason" => error.reason}], {:halt, error}}
3330

34-
%HTTPoison.AsyncResponse{id: id} = res ->
35-
{res, id}
36-
end
31+
{:halt, _} = halt ->
32+
halt
3733

34+
{res, id, buffer} ->
3835
receive do
3936
%HTTPoison.AsyncStatus{id: ^id, code: code} ->
4037
HTTPoison.stream_next(res)
4138

42-
case code do
43-
200 ->
44-
{[], res}
45-
46-
_ ->
47-
{
48-
[
49-
%{
50-
"status" => :error,
51-
"code" => code,
52-
"choices" => []
53-
}
54-
],
55-
res
56-
}
39+
if code == 200 do
40+
{[], {res, id, buffer}}
41+
else
42+
{[%{"status" => :error, "code" => code, "choices" => []}], {:halt, res}}
5743
end
5844

59-
%HTTPoison.AsyncHeaders{id: ^id, headers: _headers} ->
45+
%HTTPoison.AsyncHeaders{id: ^id} ->
6046
HTTPoison.stream_next(res)
61-
{[], res}
47+
{[], {res, id, buffer}}
6248

6349
%HTTPoison.AsyncChunk{chunk: chunk} ->
64-
{events, _} = ServerSentEvents.parse(chunk)
50+
combined_chunk = buffer <> chunk
6551

66-
data =
67-
Enum.flat_map(events, &parse_events/1)
52+
{events, rest_buffer} = ServerSentEvents.parse(combined_chunk)
53+
data = Enum.flat_map(events, &parse_events/1)
6854

6955
HTTPoison.stream_next(res)
70-
{data, res}
56+
{data, {res, id, rest_buffer}}
7157

7258
%HTTPoison.AsyncEnd{} ->
7359
{:halt, res}
7460
end
7561
end,
76-
fn %{id: id} ->
77-
:hackney.stop_async(id)
62+
fn
63+
%{id: id} -> :hackney.stop_async(id)
7864
end
7965
)
8066
end
8167

82-
defp parse_events(%{data: content}) do
83-
decoded = decode_json_content(content)
84-
[decoded]
85-
end
86-
87-
defp parse_events([%{data: content}]) do
88-
decoded = decode_json_content(content)
89-
[decoded]
90-
end
91-
92-
defp parse_events(_) do
93-
[]
94-
end
68+
defp parse_events(%{data: content}), do: [decode_json_content(content)]
69+
defp parse_events([%{data: content}]), do: [decode_json_content(content)]
70+
defp parse_events(_), do: []
9571

9672
defp decode_json_content(data) do
9773
case Jason.decode(data) do
98-
{:ok, %{"content" => content}} ->
99-
content
100-
101-
{:ok, map} when is_map(map) ->
102-
# For response_body events or other structured data
103-
map
104-
105-
{:error, _error} ->
106-
# Not JSON or doesn't have content field, return as is
107-
data
74+
{:ok, %{"content" => content}} -> content
75+
{:ok, map} when is_map(map) -> map
76+
{:error, _} -> data
10877
end
10978
end
11079
end

0 commit comments

Comments
 (0)