Skip to content

Commit 247293b

Browse files
authored
feat(vlm): add streaming response handling for OpenAI VLM (volcengine#740)
Add support for handling SSE streaming responses from APIs that force streaming format even when stream=False is requested. Changes: - Add _extract_content_and_usage() for sync responses - Add _extract_content_and_usage_async() for async responses - Add _handle_response() and _handle_response_async() for response handling - Add _finalize_response() to eliminate duplicate post-processing logic - Add _process_streaming_chunks() to reduce code duplication - Add _extract_content_from_chunk() and _extract_usage_from_chunk() helpers - Add response type detection with basic type filtering (str/list/dict/bytes) - Update all completion methods to use new handlers - Remove redundant _update_token_usage_from_response calls - Add warning log for empty responses - Add comprehensive unit tests Refinements: - Add choices attribute check to _iterator detection (avoid false positives) - Add docstring warning that streaming response is consumed - Add comment explaining async version doesn't reuse _process_streaming_chunks - Fix test assertions to use correct get_token_usage_summary() method
1 parent cd87c0a commit 247293b

File tree

2 files changed

+558
-13
lines changed

2 files changed

+558
-13
lines changed

openviking/models/vlm/backends/openai_vlm.py

Lines changed: 177 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,179 @@ def get_async_client(self):
4848
self._async_client = openai.AsyncOpenAI(**client_kwargs)
4949
return self._async_client
5050

51-
def _update_token_usage_from_response(self, response):
52-
if hasattr(response, "usage") and response.usage:
53-
prompt_tokens = response.usage.prompt_tokens
54-
completion_tokens = response.usage.completion_tokens
51+
def _is_streaming_response(self, response):
52+
"""Check if response is a streaming response.
53+
54+
Streaming responses are iterators that yield chunks, while non-streaming
55+
responses have a choices attribute directly.
56+
"""
57+
# Check for async streaming first to avoid false positives
58+
if hasattr(response, "__aiter__"):
59+
return False # Async responses handled separately
60+
# Streaming responses: iterators but not strings/lists/dicts with choices
61+
if hasattr(response, "__iter__") and not hasattr(response, "choices"):
62+
# Exclude basic iterable types that might slip through
63+
if isinstance(response, (str, bytes, list, dict)):
64+
return False
65+
return True
66+
# Some streaming responses might have _iterator attribute
67+
if hasattr(response, "_iterator") and not hasattr(response, "choices"):
68+
return True
69+
return False
70+
71+
def _is_async_streaming_response(self, response):
72+
"""Check if response is an async streaming response."""
73+
if hasattr(response, "__aiter__") and not hasattr(response, "choices"):
74+
# Exclude basic types that should never be treated as streaming
75+
if isinstance(response, (str, bytes, list, dict)):
76+
return False
77+
return True
78+
if hasattr(response, "_iterator") and not hasattr(response, "choices"):
79+
return True
80+
return False
81+
82+
def _extract_content_from_chunk(self, chunk):
83+
"""Extract content string from a single chunk."""
84+
try:
85+
choices = getattr(chunk, "choices", None)
86+
if not choices:
87+
return None
88+
delta = getattr(choices[0], "delta", None)
89+
if not delta:
90+
return None
91+
return getattr(delta, "content", None)
92+
except (AttributeError, IndexError):
93+
return None
94+
95+
def _extract_usage_from_chunk(self, chunk):
96+
"""Extract token usage from a chunk. Returns (prompt_tokens, completion_tokens)."""
97+
usage = getattr(chunk, "usage", None)
98+
if not usage:
99+
return 0, 0
100+
prompt_tokens = getattr(usage, "prompt_tokens", 0) or 0
101+
completion_tokens = getattr(usage, "completion_tokens", 0) or 0
102+
return prompt_tokens, completion_tokens
103+
104+
def _process_streaming_chunks(self, chunks):
105+
"""Process streaming chunks and extract content and token usage.
106+
107+
WARNING: This method consumes the iterator. Do not use the response
108+
object after calling this method as it will be exhausted.
109+
110+
Returns (content, prompt_tokens, completion_tokens).
111+
"""
112+
content_parts = []
113+
prompt_tokens = 0
114+
completion_tokens = 0
115+
116+
for chunk in chunks:
117+
content = self._extract_content_from_chunk(chunk)
118+
if content:
119+
content_parts.append(content)
120+
121+
pt, ct = self._extract_usage_from_chunk(chunk)
122+
if pt > 0:
123+
prompt_tokens = pt
124+
if ct > 0:
125+
completion_tokens = ct
126+
127+
return "".join(content_parts), prompt_tokens, completion_tokens
128+
129+
def _extract_content_and_usage(self, response):
    """Extract content from response, handling both streaming and non-streaming.

    Returns (content, prompt_tokens, completion_tokens, is_streaming).

    The non-streaming path is defensive: some gateways return a response
    object with an empty ``choices`` list or a message without ``content``,
    which previously raised IndexError/AttributeError here. Those malformed
    responses now degrade to empty content (and the empty-response warning
    in _finalize_response fires) instead of crashing.
    """
    logger.debug(f"[OpenAIVLM] Response type: {type(response)}")

    if self._is_streaming_response(response):
        content, prompt_tokens, completion_tokens = self._process_streaming_chunks(response)
        return content, prompt_tokens, completion_tokens, True

    # Non-streaming response — mirror the getattr-based defensive style of
    # the chunk helpers rather than indexing blindly.
    choices = getattr(response, "choices", None)
    if choices:
        message = getattr(choices[0], "message", None)
        content = (getattr(message, "content", None) if message else None) or ""
    else:
        content = ""
    usage = getattr(response, "usage", None)
    prompt_tokens = getattr(usage, "prompt_tokens", 0) or 0
    completion_tokens = getattr(usage, "completion_tokens", 0) or 0
    return content, prompt_tokens, completion_tokens, False
146+
147+
async def _extract_content_and_usage_async(self, response):
    """Extract content from async response, handling both streaming and non-streaming.

    Returns (content, prompt_tokens, completion_tokens, is_streaming).

    The non-streaming path is defensive (same as the sync variant): an
    empty ``choices`` list or a message without ``content`` yields empty
    content instead of raising IndexError/AttributeError.
    """
    logger.debug(f"[OpenAIVLM] Async response type: {type(response)}")

    if self._is_async_streaming_response(response):
        # Note: This logic mirrors _process_streaming_chunks but uses
        # async for to handle async iterators. Python's async for and
        # sync for cannot be unified in a single method.
        content_parts = []
        prompt_tokens = 0
        completion_tokens = 0

        async for chunk in response:
            content = self._extract_content_from_chunk(chunk)
            if content:
                content_parts.append(content)
            # Keep the last positive usage values seen (usage usually
            # arrives once, in a trailer chunk).
            pt, ct = self._extract_usage_from_chunk(chunk)
            if pt > 0:
                prompt_tokens = pt
            if ct > 0:
                completion_tokens = ct

        return "".join(content_parts), prompt_tokens, completion_tokens, True

    # Non-streaming response — defensive extraction, consistent with the
    # sync variant and the chunk helpers.
    choices = getattr(response, "choices", None)
    if choices:
        message = getattr(choices[0], "message", None)
        content = (getattr(message, "content", None) if message else None) or ""
    else:
        content = ""
    usage = getattr(response, "usage", None)
    prompt_tokens = getattr(usage, "prompt_tokens", 0) or 0
    completion_tokens = getattr(usage, "completion_tokens", 0) or 0
    return content, prompt_tokens, completion_tokens, False
181+
182+
def _finalize_response(
183+
self, content, prompt_tokens, completion_tokens, is_streaming, operation_name="completion"
184+
):
185+
"""Finalize response: log warnings and update token usage.
186+
187+
Common post-processing for both sync and async responses.
188+
"""
189+
if not content:
190+
logger.warning(
191+
f"[OpenAIVLM] Empty {operation_name} response received (streaming={is_streaming})"
192+
)
193+
194+
if prompt_tokens > 0 or completion_tokens > 0:
55195
self.update_token_usage(
56196
model_name=self.model or "gpt-4o-mini",
57197
provider=self.provider,
58198
prompt_tokens=prompt_tokens,
59199
completion_tokens=completion_tokens,
60200
)
61-
return
201+
202+
return content
203+
204+
def _handle_response(self, response, operation_name="completion"):
205+
"""Handle response extraction and token usage update."""
206+
content, prompt_tokens, completion_tokens, is_streaming = self._extract_content_and_usage(
207+
response
208+
)
209+
return self._finalize_response(
210+
content, prompt_tokens, completion_tokens, is_streaming, operation_name
211+
)
212+
213+
async def _handle_response_async(self, response, operation_name="completion"):
214+
"""Handle async response extraction and token usage update."""
215+
(
216+
content,
217+
prompt_tokens,
218+
completion_tokens,
219+
is_streaming,
220+
) = await self._extract_content_and_usage_async(response)
221+
return self._finalize_response(
222+
content, prompt_tokens, completion_tokens, is_streaming, operation_name
223+
)
62224

63225
def get_completion(self, prompt: str, thinking: bool = False) -> str:
64226
"""Get text completion"""
@@ -72,8 +234,8 @@ def get_completion(self, prompt: str, thinking: bool = False) -> str:
72234
kwargs["max_tokens"] = self.max_tokens
73235

74236
response = client.chat.completions.create(**kwargs)
75-
self._update_token_usage_from_response(response)
76-
return self._clean_response(response.choices[0].message.content or "")
237+
content = self._handle_response(response, operation_name="text completion")
238+
return self._clean_response(content)
77239

78240
async def get_completion_async(
79241
self, prompt: str, thinking: bool = False, max_retries: int = 0
@@ -92,8 +254,10 @@ async def get_completion_async(
92254
for attempt in range(max_retries + 1):
93255
try:
94256
response = await client.chat.completions.create(**kwargs)
95-
self._update_token_usage_from_response(response)
96-
return self._clean_response(response.choices[0].message.content or "")
257+
content = await self._handle_response_async(
258+
response, operation_name="text completion"
259+
)
260+
return self._clean_response(content)
97261
except Exception as e:
98262
last_error = e
99263
if attempt < max_retries:
@@ -179,8 +343,8 @@ def get_vision_completion(
179343
kwargs["max_tokens"] = self.max_tokens
180344

181345
response = client.chat.completions.create(**kwargs)
182-
self._update_token_usage_from_response(response)
183-
return self._clean_response(response.choices[0].message.content or "")
346+
content = self._handle_response(response, operation_name="vision completion")
347+
return self._clean_response(content)
184348

185349
async def get_vision_completion_async(
186350
self,
@@ -205,5 +369,5 @@ async def get_vision_completion_async(
205369
kwargs["max_tokens"] = self.max_tokens
206370

207371
response = await client.chat.completions.create(**kwargs)
208-
self._update_token_usage_from_response(response)
209-
return self._clean_response(response.choices[0].message.content or "")
372+
content = await self._handle_response_async(response, operation_name="vision completion")
373+
return self._clean_response(content)

0 commit comments

Comments
 (0)