Skip to content

refactor: extract streaming chunk processor to eliminate code duplication#782

Merged
MervinPraison merged 1 commit into
mainfrom
claude/issue-754-20250709_155935
Jul 10, 2025
Merged

refactor: extract streaming chunk processor to eliminate code duplication#782
MervinPraison merged 1 commit into
mainfrom
claude/issue-754-20250709_155935

Conversation

@MervinPraison

@MervinPraison MervinPraison commented Jul 9, 2025

Copy link
Copy Markdown
Owner

User description

Addresses issue #754 by extracting the duplicated streaming chunk processing logic into a single helper method.

Changes

  • Add _process_stream_delta() helper method to consolidate duplicate logic
  • Refactor 4 duplicated streaming sections (sync/async, verbose/non-verbose)
  • Eliminates ~50 lines of duplicated code
  • Maintains 100% backward compatibility with no API changes

Generated with Claude Code


PR Type

Enhancement


Description

  • Extract duplicated streaming chunk processing logic into helper method

  • Refactor 4 streaming sections (sync/async, verbose/non-verbose modes)

  • Eliminate ~50 lines of duplicated code

  • Maintain 100% backward compatibility


Changes diagram

flowchart LR
  A["Duplicated streaming logic"] --> B["_process_stream_delta() helper"]
  B --> C["Sync verbose streaming"]
  B --> D["Sync non-verbose streaming"]
  B --> E["Async verbose streaming"]
  B --> F["Async non-verbose streaming"]
Loading

Changes walkthrough πŸ“

Relevant files
Enhancement
llm.py
Extract streaming chunk processor helper methodΒ  Β  Β  Β  Β  Β  Β  Β  Β  Β 

src/praisonai-agents/praisonaiagents/llm/llm.py

  • Add _process_stream_delta() helper method to consolidate duplicate
    logic
  • Replace 4 duplicated streaming sections with helper method calls
  • Remove ~50 lines of duplicated code for processing chunks and tool
    calls
  • Maintain identical functionality across sync/async and
    verbose/non-verbose modes
  • +45/-62Β 

    Need help?
  • Type /help how to ... in the comments thread for any questions about Qodo Merge usage.
  • Check out the documentation for more information.
  • Summary by CodeRabbit

    • Refactor
      • Improved internal code structure for handling streaming LLM responses, resulting in more maintainable and consistent behavior. No changes to user-facing features or functionality.

    …tion
    
    - Add _process_stream_delta() helper method to consolidate duplicate logic
    - Refactor 4 duplicated streaming sections (sync/async, verbose/non-verbose)
    - Eliminates ~50 lines of duplicated code
    - Maintains 100% backward compatibility with no API changes
    - Addresses original issue #754 about streaming code duplication
    
    Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
    @coderabbitai

    coderabbitai Bot commented Jul 9, 2025

    Copy link
    Copy Markdown
    Contributor

    Walkthrough

    A new private method, _process_stream_delta, was added to the LLM class to centralize the logic for processing streaming delta chunks during LLM completions. Existing synchronous and asynchronous streaming response methods were refactored to use this helper, removing duplicated code for accumulating response text and tool calls.

    Changes

    File(s) Change Summary
    src/praisonai-agents/praisonaiagents/llm/llm.py Added _process_stream_delta method to encapsulate streaming delta processing; refactored streaming loops in get_response and get_response_async to use this helper, removing duplicated logic.

    Poem

    In code where deltas used to roam,
    Now a helper calls them home.
    No more copy, paste, repeat,
    Streamlined logicβ€”oh, so neat!
    Rabbits cheer with every hop,
    For tidy code that just won't stop. πŸ‡βœ¨


    πŸ“œ Recent review details

    Configuration used: CodeRabbit UI
    Review profile: CHILL
    Plan: Pro

    πŸ“₯ Commits

    Reviewing files that changed from the base of the PR and between 892d46c and 53dc2be.

    πŸ“’ Files selected for processing (1)
    • src/praisonai-agents/praisonaiagents/llm/llm.py (5 hunks)
    🧰 Additional context used
    🧠 Learnings (2)
    πŸ““ Common learnings
    Learnt from: CR
    PR: MervinPraison/PraisonAI#0
    File: src/praisonai-ts/.cursorrules:0-0
    Timestamp: 2025-06-30T10:05:51.843Z
    Learning: Applies to src/praisonai-ts/src/llm/llm.ts : The 'LLM' class in 'llm.ts' should wrap 'aisdk.generateText' calls for generating text responses.
    
    Learnt from: CR
    PR: MervinPraison/PraisonAI#0
    File: src/praisonai-ts/.cursorrules:0-0
    Timestamp: 2025-06-30T10:05:51.843Z
    Learning: Applies to src/praisonai-ts/src/llm/llm.ts : Replace all references to 'LLM' or 'litellm' with 'aisdk' usage for large language model calls in Node.js/TypeScript code.
    
    Learnt from: CR
    PR: MervinPraison/PraisonAI#0
    File: src/praisonai-agents/CLAUDE.md:0-0
    Timestamp: 2025-06-30T10:06:17.673Z
    Learning: Use the unified LLM wrapper in `praisonaiagents/llm/` for integrating with multiple LLM providers.
    
    src/praisonai-agents/praisonaiagents/llm/llm.py (4)
    Learnt from: CR
    PR: MervinPraison/PraisonAI#0
    File: src/praisonai-ts/.cursorrules:0-0
    Timestamp: 2025-06-30T10:05:51.843Z
    Learning: Applies to src/praisonai-ts/src/llm/llm.ts : The 'LLM' class in 'llm.ts' should wrap 'aisdk.generateText' calls for generating text responses.
    
    Learnt from: CR
    PR: MervinPraison/PraisonAI#0
    File: src/praisonai-ts/.cursorrules:0-0
    Timestamp: 2025-06-30T10:05:51.843Z
    Learning: Applies to src/praisonai-ts/src/llm/llm.ts : Replace all references to 'LLM' or 'litellm' with 'aisdk' usage for large language model calls in Node.js/TypeScript code.
    
    Learnt from: CR
    PR: MervinPraison/PraisonAI#0
    File: src/praisonai-agents/CLAUDE.md:0-0
    Timestamp: 2025-06-30T10:06:17.673Z
    Learning: Use the unified LLM wrapper in `praisonaiagents/llm/` for integrating with multiple LLM providers.
    
    Learnt from: CR
    PR: MervinPraison/PraisonAI#0
    File: src/praisonai-ts/.windsurfrules:0-0
    Timestamp: 2025-06-30T10:06:44.129Z
    Learning: Applies to src/praisonai-ts/src/{llm,agent,agents,task}/**/*.ts : Replace all references to 'LLM' or 'litellm' with 'aisdk' usage in TypeScript code.
    
    ⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
    • GitHub Check: Cursor BugBot
    • GitHub Check: quick-test
    • GitHub Check: Run tests and collect coverage
    • GitHub Check: test-core (3.11)
    πŸ”‡ Additional comments (5)
    src/praisonai-agents/praisonaiagents/llm/llm.py (5)

    232-264: Excellent refactoring - well-designed helper method.

    The new _process_stream_delta method successfully centralizes the streaming chunk processing logic with proper parameter handling and clear documentation. The method correctly:

    • Accumulates content from delta.content
    • Processes tool calls when provider supports streaming tools
    • Handles tool call structure with proper index-based accumulation
    • Returns a tuple for clean unpacking in calling code

    687-689: Clean integration of the helper method.

    The refactored code properly calls _process_stream_delta with correct parameters and unpacks the tuple return value. This eliminates the duplicated streaming logic while maintaining identical functionality.


    705-707: Consistent application of the refactoring.

    The non-verbose streaming section correctly uses the same helper method, ensuring consistency across both verbose and non-verbose scenarios.


    1351-1353: Proper async implementation.

    The async version correctly calls the same _process_stream_delta helper method, maintaining consistency between sync and async implementations while eliminating code duplication.


    1370-1372: Complete coverage of async streaming scenarios.

    The async non-verbose streaming section properly uses the helper method, ensuring all four streaming scenarios (sync/async Γ— verbose/non-verbose) are consistently refactored.

    ✨ Finishing Touches
    • πŸ“ Generate Docstrings

    Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

    ❀️ Share
    πŸͺ§ Tips

    Chat

    There are 3 ways to chat with CodeRabbit:

    • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
      • I pushed a fix in commit <commit_id>, please review it.
      • Explain this complex logic.
      • Open a follow-up GitHub issue for this discussion.
    • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
      • @coderabbitai explain this code block.
      • @coderabbitai modularize this function.
    • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
      • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
      • @coderabbitai read src/utils.ts and explain its main purpose.
      • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
      • @coderabbitai help me debug CodeRabbit configuration file.

    Support

    Need help? Create a ticket on our support page for assistance with any issues or questions.

    Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

    CodeRabbit Commands (Invoked using PR comments)

    • @coderabbitai pause to pause the reviews on a PR.
    • @coderabbitai resume to resume the paused reviews.
    • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
    • @coderabbitai full review to do a full review from scratch and review all the files again.
    • @coderabbitai summary to regenerate the summary of the PR.
    • @coderabbitai generate docstrings to generate docstrings for this PR.
    • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
    • @coderabbitai resolve resolve all the CodeRabbit review comments.
    • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
    • @coderabbitai help to get help.

    Other keywords and placeholders

    • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
    • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
    • Add @coderabbitai anywhere in the PR title to generate the title automatically.

    CodeRabbit Configuration File (.coderabbit.yaml)

    • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
    • Please see the configuration documentation for more information.
    • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

    Documentation and Community

    • Visit our Documentation for detailed information on how to use CodeRabbit.
    • Join our Discord Community to get help, request features, and share feedback.
    • Follow us on X/Twitter for updates and announcements.

    @gemini-code-assist gemini-code-assist Bot left a comment

    Copy link
    Copy Markdown
    Contributor

    Choose a reason for hiding this comment

    The reason will be displayed to describe this comment to others. Learn more.

    Summary of Changes

    Hello @MervinPraison, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

    This pull request significantly improves the maintainability and readability of the LLM streaming logic by centralizing a previously duplicated code block. By extracting the common streaming delta processing into a dedicated helper method, I've reduced code redundancy and made future modifications or debugging of this core functionality much simpler, without impacting any external interfaces.

    Highlights

    • Code Refactoring: I've extracted the common logic for processing streaming delta chunks into a new private helper method, _process_stream_delta. This consolidates the code responsible for accumulating response text and parsing tool calls during streaming.
    • Duplication Elimination: This pull request addresses issue #754 by removing approximately 50 lines of duplicated code across four different streaming sections within the get_response and get_response_async methods. These sections previously contained identical logic for handling stream deltas in both verbose and non-verbose modes, for both synchronous and asynchronous calls.
    • Backward Compatibility: The changes are purely internal refactoring and maintain 100% backward compatibility with no changes to the public API.
    Using Gemini Code Assist

    The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

    Invoking Gemini

    You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

    Feature Command Description
    Code Review /gemini review Performs a code review for the current pull request in its current state.
    Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
    Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
    Help /gemini help Displays a list of available commands.

    Customization

    To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

    Limitations & Feedback

    Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with πŸ‘ and πŸ‘Ž on @gemini-code-assist comments to provide feedback.

    You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

    Footnotes

    1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩

    @qodo-code-review

    Copy link
    Copy Markdown

    PR Reviewer Guide πŸ”

    Here are some key observations to aid the review process:

    🎫 Ticket compliance analysis πŸ”Ά

    754 - Partially compliant

    Compliant requirements:

    β€’ Refactor code duplication in synchronous streaming around line 587 in llm.py
    β€’ Refactor code duplication in asynchronous streaming around line 1365 in llm.py
    β€’ Extract duplicated logic for processing chunks and capturing tool calls into helper functions
    β€’ Improve maintainability by consolidating nearly identical logic between verbose and non-verbose streaming modes

    Non-compliant requirements:

    β€’ Create a single process_chunk() function for sync streaming to reduce duplication
    β€’ Create an async process_chunk() function for async streaming to eliminate duplicated logic

    Requires further human verification:

    β€’ Verify that the refactored streaming functionality maintains identical behavior across all modes
    β€’ Test that tool call processing works correctly in all streaming scenarios

    ⏱️ Estimated effort to review: 2 πŸ”΅πŸ”΅βšͺβšͺβšͺ
    πŸ§ͺΒ No relevant tests
    πŸ”’Β No security concerns identified
    ⚑ Recommended focus areas for review

    Implementation Mismatch

    The helper method is named _process_stream_delta instead of the suggested process_chunk() from the ticket. Also, the implementation uses a single synchronous helper for both sync and async contexts rather than separate sync/async versions as suggested.

    def _process_stream_delta(self, delta, response_text: str, tool_calls: List[Dict], formatted_tools: Optional[List] = None) -> tuple:
        """
        Process a streaming delta chunk to extract content and tool calls.
    
        Args:
            delta: The delta object from a streaming chunk
            response_text: The accumulated response text so far
            tool_calls: The accumulated tool calls list so far
            formatted_tools: Optional list of formatted tools for tool call support check
    
        Returns:
            tuple: (updated_response_text, updated_tool_calls)
        """
        # Process content
        if delta.content:
            response_text += delta.content
    
        # Capture tool calls from streaming chunks if provider supports it
        if formatted_tools and self._supports_streaming_tools() and hasattr(delta, 'tool_calls') and delta.tool_calls:
            for tc in delta.tool_calls:
                if tc.index >= len(tool_calls):
                    tool_calls.append({
                        "id": tc.id,
                        "type": "function",
                        "function": {"name": "", "arguments": ""}
                    })
                if tc.function.name:
                    tool_calls[tc.index]["function"]["name"] = tc.function.name
                if tc.function.arguments:
                    tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments
    
        return response_text, tool_calls
    Logic Change

    The refactored code processes content accumulation inside the helper method but still checks delta.content after the helper call for display updates, which could lead to inconsistent behavior if the helper method logic changes.

    response_text, tool_calls = self._process_stream_delta(
        delta, response_text, tool_calls, formatted_tools
    )
    if delta.content:
        live.update(display_generating(response_text, current_time))

    @MervinPraison

    Copy link
    Copy Markdown
    Owner Author

    @claude review this again, making sure nothing is breaking, no features removed, everything exactly the same in regards to the usage and the response.

    @claude

    claude Bot commented Jul 9, 2025

    Copy link
    Copy Markdown
    Contributor

    Claude encountered an error β€”β€” View job


    Claude Code is reviewing the PR...

    Todo List

    • Read the modified file to understand the refactoring
    • Verify the extracted helper method logic
    • Check all 4 streaming sections are properly refactored
    • Confirm no public API changes
    • Look for potential edge cases or behavioral differences

    @qodo-code-review

    Copy link
    Copy Markdown

    PR Code Suggestions ✨

    No code suggestions found for the PR.

    @gemini-code-assist gemini-code-assist Bot left a comment

    Copy link
    Copy Markdown
    Contributor

    Choose a reason for hiding this comment

    The reason will be displayed to describe this comment to others. Learn more.

    Code Review

    This pull request refactors duplicated streaming logic into a new helper method, _process_stream_delta, improving code maintainability. Suggestions have been provided to enhance type safety and clarify the behavior of the new method.


    return any(endpoint in base_url or endpoint in api_base for endpoint in ollama_endpoints)

    def _process_stream_delta(self, delta, response_text: str, tool_calls: List[Dict], formatted_tools: Optional[List] = None) -> tuple:

    Copy link
    Copy Markdown
    Contributor

    Choose a reason for hiding this comment

    The reason will be displayed to describe this comment to others. Learn more.

    medium

    Consider adding specific type hints for the delta parameter and the return tuple in the _process_stream_delta method. Using delta: Any and Tuple[str, List[Dict]] can improve code clarity and enable static type checking.

    Suggested change
    def _process_stream_delta(self, delta, response_text: str, tool_calls: List[Dict], formatted_tools: Optional[List] = None) -> tuple:
    def _process_stream_delta(self, delta: Any, response_text: str, tool_calls: List[Dict], formatted_tools: Optional[List] = None) -> Tuple[str, List[Dict]]:

    Comment on lines +250 to +261
    if formatted_tools and self._supports_streaming_tools() and hasattr(delta, 'tool_calls') and delta.tool_calls:
    for tc in delta.tool_calls:
    if tc.index >= len(tool_calls):
    tool_calls.append({
    "id": tc.id,
    "type": "function",
    "function": {"name": "", "arguments": ""}
    })
    if tc.function.name:
    tool_calls[tc.index]["function"]["name"] = tc.function.name
    if tc.function.arguments:
    tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments

    Copy link
    Copy Markdown
    Contributor

    Choose a reason for hiding this comment

    The reason will be displayed to describe this comment to others. Learn more.

    medium

    The _process_stream_delta method modifies the tool_calls list in place, which might be unexpected for callers. To avoid this side effect, consider working on a copy of the tool_calls list within the function or documenting the mutation clearly in the docstring.

    @claude

    claude Bot commented Jul 9, 2025

    Copy link
    Copy Markdown
    Contributor

    Claude encountered an error β€”β€” View job


    I'll analyze this and get back to you.

    @claude

    claude Bot commented Jul 9, 2025

    Copy link
    Copy Markdown
    Contributor

    Claude finished @MervinPraison's task β€”β€” View job


    Code Review Complete βœ…

    I've thoroughly reviewed this refactoring PR and can confirm that nothing is breaking, no features are removed, and everything remains exactly the same in terms of usage and response.

    Review Summary

    βœ… Refactoring is Safe and Correct

    • The _process_stream_delta() helper method correctly extracts the duplicated logic
    • All 4 streaming sections are properly refactored:
      • Sync verbose streaming (lines 687-689)
      • Sync non-verbose streaming (lines 705-707)
      • Async verbose streaming (lines 1351-1353)
      • Async non-verbose streaming (lines 1370-1372)

    βœ… No Behavioral Changes

    • The helper method preserves the exact same logic:
      • Content accumulation: if delta.content: response_text += delta.content
      • Tool call processing with identical conditions and logic
      • Same return values and side effects
    • Display updates in verbose mode still work correctly (checking delta.content after helper call)

    βœ… No Public API Changes

    • The helper method is private (prefixed with _)
    • All public methods maintain their exact signatures
    • No changes to return types or parameters

    βœ… Edge Cases Handled Correctly

    • All safety checks preserved (hasattr, if delta.content, etc.)
    • List mutation behavior identical (tool_calls modified in-place)
    • String immutability handled properly (response_text returned)

    βœ… Other Streaming Code Unaffected

    • Ollama-specific follow-up streaming (lines 880, 892) correctly left unchanged
    • Simple streaming in response() method (lines 1932, 1944) correctly left unchanged
    • These don't involve tool calls and weren't part of the duplication

    Conclusion

    This is an excellent refactoring that successfully eliminates ~50 lines of duplicated code while maintaining 100% backward compatibility. The code is now more maintainable without any risk of breaking existing functionality.


    @MervinPraison MervinPraison merged commit 898948e into main Jul 10, 2025
    14 checks passed
    Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

    Projects

    None yet

    Development

    Successfully merging this pull request may close these issues.

    1 participant