Inworld websocket improvements #4533
@coderabbitai review

✅ Actions performed: Review triggered.
📝 Walkthrough

Shared WebSocket-based Inworld TTS pool added: manages multiple connections and per-connection contexts with waiter-based capacity signaling, outbound/inbound loops, context lifecycle (create/send/flush/close), idle eviction, and integration into the TTS public API.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant TTS
    participant Pool as ConnectionPool
    participant Conn as InworldConnection
    participant Inworld as InworldAPI
    Client->>TTS: synthesize(text)
    TTS->>Pool: acquire_context()
    alt capacity available
        Pool->>Conn: assign existing connection/context
    else create or wait
        Pool-->>TTS: wait for capacity
        Pool->>Conn: create new connection if under max_connections
    end
    Pool-->>TTS: return context_id & connection
    loop chunked text (<=1000 chars)
        TTS->>Conn: send_text(chunk)
        Conn->>Inworld: WebSocket send
        Inworld-->>Conn: audio frames/events
        Conn-->>TTS: deliver frames/events to stream
    end
    TTS->>Conn: flush_context()
    Conn->>Inworld: flush message
    Inworld-->>Conn: final frames
    TTS->>Conn: close_context()
    Conn->>Pool: release context capacity
    Pool->>Pool: signal waiting acquirers / evict idle
```
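For readers who prefer code to diagrams, a minimal caller-side sketch of the same flow, assuming the pool/connection interface shown in the diagram (`acquire_context`, `send_text`, `flush_context`, `close_context`); the actual plugin signatures may differ:

```python
async def synthesize_once(pool, text: str) -> None:
    """Sketch of the diagrammed flow: acquire a context, stream chunks, flush, close."""
    context_id, connection = await pool.acquire_context()  # may wait for capacity
    try:
        # Inworld limits a text message to 1000 characters, so chunk the input.
        for i in range(0, len(text), 1000):
            connection.send_text(context_id, text[i : i + 1000])
        connection.flush_context(context_id)  # ask the server to emit remaining audio
    finally:
        connection.close_context(context_id)  # releases context capacity back to the pool
```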
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py`:
- Around line 102-108: The stale-context cleanup uses created_at so long-lived
streams get evicted immediately after close_context and their waiter futures
stay unresolved; add a close_started_at timestamp to _ContextInfo, set it in
close_context, update the cleanup logic to base eviction age on close_started_at
when present (fallback to created_at), and when evicting a context ensure any
waiter (tts._ContextInfo.waiter) is resolved (e.g., set_result(None) if not
done) so upstream timeouts don't occur; also make sure the existing
contextClosed handling still resolves the waiter if it arrives before cleanup.
- Around line 522-543: Before checking capacity and creating a new connection,
prune closed entries from self._connections so closed connections don't count
toward self._max_connections; iterate self._connections, remove or filter out
conn where conn._closed is True, then perform the existing capacity loop and the
len(self._connections) < self._max_connections check. Update references in this
block that create _InworldConnection, call conn.acquire_context, and use
notify_capacity_available so behavior is unchanged except closed connections are
excluded.
- Around line 212-257: The acquire_context loops have a lost-wakeup race because
they call self._context_available.clear() before awaiting
self._context_available.wait(); move the clear() call to after wait() returns so
a set() that happens between the capacity check and the wait won't be lost.
Update both _InworldConnection.acquire_context and
_ConnectionPool.acquire_context: remove/relocate the pre-wait clear() and call
clear() only after asyncio.Event.wait() completes (or equivalently clear
immediately after the awaited wait_for returns) while preserving the timeout
logic and exception handling. (A minimal sketch of this ordering follows the list.)
- Around line 365-372: When status.code != 0 in the response-handling block, you
must release the stuck creation context instead of leaving it in self._contexts
in CREATING state; after setting ctx.waiter exception
(ctx.waiter.set_exception(error)) call the cleanup path (e.g., invoke
close_context(ctx.id) or transition ctx.state to ContextState.CLOSING and remove
it from self._contexts) and then signal capacity (call the existing
_signal_capacity() or equivalent) so the pool no longer counts the failed
context toward capacity. Ensure you reference the same symbols used in the file:
ctx, ctx.waiter, self._contexts, close_context (or ContextState.CLOSING) and
_signal_capacity when implementing the fix.
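A minimal, self-contained sketch of the clear-after-wait ordering recommended in the third item above; it is not the plugin's actual code, though `_context_available` mirrors the event name used in the review:

```python
import asyncio


class CapacitySignal:
    """Clear the event only after wait() returns so a set() fired between the
    capacity check and the wait cannot be lost."""

    def __init__(self) -> None:
        self._context_available = asyncio.Event()

    def notify_capacity_available(self) -> None:
        self._context_available.set()

    async def wait_for_capacity(self, timeout: float) -> None:
        # No clear() before the wait: a notification that already arrived must wake us.
        await asyncio.wait_for(self._context_available.wait(), timeout=timeout)
        # Clear only after waking, so the next waiter blocks until a fresh notification.
        self._context_available.clear()
```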
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings
Files:
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
🧬 Code graph analysis (1)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py (2)
livekit-agents/livekit/agents/utils/misc.py (2)
  - shortuuid (21-22)
  - is_given (25-26)
livekit-agents/livekit/agents/_exceptions.py (1)
  - APIError (14-42)
@davidzhao made small updates in response to coderabbit review. re-tested both the normal agent convo flow and connection pool performance and looks good.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py`:
- Around line 1037-1039: The generic exception handler logs the error and raises
APIConnectionError but does not call close_context, which can leak context_id in
the pool; update the except Exception as e block (where logger.error and raise
APIConnectionError() from e occur) to call close_context(context_id) before
raising (ideally in a finally-like cleanup or immediately after logging), and
ensure close_context is itself protected (e.g., swallow/log any errors from
close_context) so the original exception is re-raised as APIConnectionError. (A cleanup sketch follows this list.)
- Around line 548-561: The new connection is appended to self._connections
before calling _InworldConnection.acquire_context, so if acquire_context raises
the pool retains a failed connection; either defer appending until after ctx
acquisition succeeds or catch exceptions around acquire_context, remove the conn
from self._connections and close it. Specifically, create the _InworldConnection
with session/_ws_url/_authorization, then call await
conn.acquire_context(emitter, opts, remaining_timeout) inside a try block; on
success append conn to self._connections and log; on exception ensure you call
conn.close()/cleanup, do not append (or remove) and re-raise or return the
error. Ensure notify_capacity_available usage remains unchanged.
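A sketch of the cleanup shape described in the first item above (referenced there); `APIConnectionError` is the exception named in the review, while the synthesis body and logger wiring here are placeholders:

```python
import logging

from livekit.agents import APIConnectionError  # exception type referenced in the review

logger = logging.getLogger(__name__)


async def _run_synthesis(connection, context_id: str) -> None:
    """Illustrative shape only: always release the pool context, even on failure."""
    try:
        ...  # hypothetical synthesis body: send text chunks, await audio frames
    except Exception as e:
        logger.error("Inworld synthesis failed: %s", e)
        raise APIConnectionError() from e
    finally:
        try:
            connection.close_context(context_id)  # return capacity to the pool
        except Exception:
            logger.warning("failed to close context %s", context_id, exc_info=True)
```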
🧹 Nitpick comments (5)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py (5)
159-160: Consider bounding the outbound queue to prevent memory growth.

The `_outbound_queue` is unbounded. If the WebSocket send rate is slower than the producer rate (e.g., during network issues), the queue can grow indefinitely. Consider setting a reasonable `maxsize` (e.g., 1000) to apply backpressure.

♻️ Suggested change

```diff
- self._outbound_queue: asyncio.Queue[_OutboundMessage] = asyncio.Queue()
+ self._outbound_queue: asyncio.Queue[_OutboundMessage] = asyncio.Queue(maxsize=1000)
```
833-838: `prewarm` doesn't actually establish a connection.

The current implementation only creates the pool object but doesn't establish any WebSocket connection until the first `acquire_context` call. If the intent is to reduce latency on first synthesis, consider having `prewarm` actually create and connect at least one connection.

♻️ Suggested enhancement

```diff
 async def _prewarm_impl(self) -> None:
-    # Just ensure the pool is created - first acquire will establish a connection
-    await self._get_pool()
+    pool = await self._get_pool()
+    # Create one connection to reduce latency on first synthesis
+    conn = _InworldConnection(
+        session=self._ensure_session(),
+        ws_url=self._ws_url,
+        authorization=self._authorization,
+        on_capacity_available=pool.notify_capacity_available,
+    )
+    await conn.connect()
+    async with pool._pool_lock:
+        pool._connections.append(conn)
```
1028-1028: Consider extracting the magic number `60` to a named constant.

The `+ 60` buffer on the waiter timeout is not immediately clear. Consider defining a constant like `SYNTHESIS_TIMEOUT_BUFFER = 60.0` to document its purpose.
1015-1019: `_mark_started()` is called for every chunk; consider calling once.

`_mark_started()` is called inside the loop for each 1000-char chunk. If this method has side effects or is not idempotent, it may cause unexpected behavior. Consider moving it outside the inner loop or guarding with a flag.

♻️ Suggested fix

```diff
 async def _send_task() -> None:
+    started = False
     async for ev in sent_tokenizer_stream:
         text = ev.token
         # Chunk to stay within Inworld's 1000 char limit
         for i in range(0, len(text), 1000):
             connection.send_text(context_id, text[i : i + 1000])
-            self._mark_started()
+            if not started:
+                self._mark_started()
+                started = True
     connection.flush_context(context_id)
     connection.close_context(context_id)
```
235-235: Use `asyncio.get_running_loop()` instead of `get_event_loop()` in async context.

`asyncio.get_event_loop()` emits a DeprecationWarning in Python 3.10+ when called from a coroutine. Since `acquire_context()` is an async function, use `asyncio.get_running_loop().create_future()` instead.

♻️ Suggested fix

```diff
- waiter: asyncio.Future[None] = asyncio.get_event_loop().create_future()
+ waiter: asyncio.Future[None] = asyncio.get_running_loop().create_future()
```
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings
Files:
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: type-check (3.9)
- GitHub Check: type-check (3.13)
- GitHub Check: unit-tests
🔇 Additional comments (6)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py (6)
21-25: LGTM! The new imports (`time`, `field`, `Enum`) are appropriate for the connection pool infrastructure.

94-135: LGTM! The context state tracking and message types are well-structured. The addition of `close_started_at` properly addresses the stale-context cleanup concern from prior review.

424-440: LGTM on stale context cleanup implementation. The cleanup logic now correctly uses `close_started_at` for timing (falling back to `created_at`), and resolves the waiter before eviction. This addresses the prior review feedback.

588-615: LGTM! The idle connection cleanup logic properly keeps at least one connection, checks periodically, and closes connections outside the lock to avoid blocking the pool.

759-770: LGTM! The lazy pool initialization with proper locking and closed-pool detection is well implemented (see the sketch after this list).

855-862: LGTM! The shutdown sequence properly closes streams first, then the pool, ensuring clean resource cleanup.
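A minimal sketch of the lazy, lock-guarded pool initialization praised in the `759-770` comment above; `_get_pool` follows the review's naming, while the pool class here is just a placeholder:

```python
import asyncio
from typing import Optional


class _ConnectionPool:  # placeholder standing in for the pool described in the review
    pass


class InworldTTSSketch:
    def __init__(self) -> None:
        self._pool: Optional[_ConnectionPool] = None
        self._pool_init_lock = asyncio.Lock()
        self._closed = False

    async def _get_pool(self) -> _ConnectionPool:
        """Create the shared pool on first use; later calls reuse the same instance."""
        if self._closed:
            raise RuntimeError("TTS instance is closed")  # closed-pool detection
        async with self._pool_init_lock:
            # Re-check under the lock so concurrent first callers create only one pool.
            if self._pool is None:
                self._pool = _ConnectionPool()
            return self._pool
```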
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py`:
- Around line 442-448: When a connection errors, in addition to failing each
context (ctx.waiter) and setting self._closed, also wake any local waiters and
the pool capacity event so callers can retry/create new connections: after
clearing self._contexts and setting self._closed in _handle_connection_error, if
the instance has a pending waiter attribute (e.g., self._waiter or similar) and
it is not done, set_exception(error) on it, and if the pool-level
asyncio.Event/condition named _capacity_available exists, set/notify it (e.g.,
call its set() or notify_all) so pool callers blocked on capacity are unblocked
to observe the closed connection and create a replacement.
- Around line 225-259: The acquire_context flow can enqueue a create on a dead
connection; before creating the _ContextInfo and after waking from
self._context_available.wait(), check self._closed (and/or any WS-closed flag)
and if true raise an immediate connection-closed error instead of proceeding;
specifically update the block guarded by self._acquire_lock (around has_capacity
/ ctx creation) to bail early when self._closed, and add the same self._closed
guard right after the wait/clear so waiters fail fast rather than enqueuing
_CreateContextMsg or returning a waiter on a closed connection.
- Around line 530-567: Holding self._pool_lock while awaiting
conn.acquire_context causes head-of-line blocking; instead, inside the lock
select or construct the candidate connection(s) but do not call await while
locked: pick an existing conn or instantiate _InworldConnection (with session,
ws_url, authorization, on_capacity_available=self.notify_capacity_available) and
store it in a local variable, then release the lock and call await
conn.acquire_context(emitter, opts, remaining_timeout) outside the lock; after
the await succeeds, re-acquire the lock to append a newly created connection to
self._connections (and set self._cleanup_task if needed) or to prune closed
connections as before, and ensure you close and clean up the conn on exception. (A sketch of this lock scoping follows the list.)
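A sketch of the lock scoping suggested in the third item above: bookkeeping happens under `_pool_lock`, while the slow `acquire_context` await runs with the lock released. The helper `_pick_or_create_connection` is hypothetical; `aclose` and the attribute names follow the review:

```python
async def acquire_context(self, emitter, opts, remaining_timeout: float):
    """Method-body sketch for the pool: never await context acquisition while locked."""
    async with self._pool_lock:
        # Bookkeeping only: prune closed connections and pick or build a candidate.
        self._connections = [c for c in self._connections if not c._closed]
        # Hypothetical helper: returns an existing or brand-new connection, no awaits inside.
        conn = self._pick_or_create_connection()
        is_new = conn not in self._connections

    try:
        # Slow path runs with the lock released, so other acquirers are not blocked.
        context_id = await conn.acquire_context(emitter, opts, remaining_timeout)
    except Exception:
        if is_new:
            await conn.aclose()  # don't keep a half-initialized connection around
        raise

    if is_new:
        async with self._pool_lock:
            self._connections.append(conn)
    return context_id, conn
```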
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings
Files:
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
🧬 Code graph analysis (1)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py (3)
livekit-plugins/livekit-plugins-cartesia/livekit/plugins/cartesia/tts.py (2)
  - _TTSOptions (63-82)
  - _ensure_session (223-227)
livekit-agents/livekit/agents/utils/misc.py (2)
  - shortuuid (21-22)
  - is_given (25-26)
livekit-agents/livekit/agents/_exceptions.py (3)
  - APITimeoutError (91-95)
  - APIConnectionError (84-88)
  - APIError (14-42)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py (1)
1-25: Fix Ruff formatting to pass CI.

The pipeline indicates that the Ruff formatting check failed. Run `ruff format livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py` to fix code style issues before merging.
🤖 Fix all issues with AI agents
In `@livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py`:
- Around line 621-648: The idle-cleanup loop in _cleanup_idle_connections can
remove every connection because the check uses the original self._connections
length; change the logic so you compute the remaining pool size accounting for
already-selected closures (e.g., remaining = len(self._connections) -
len(connections_to_close)) and only append a conn to connections_to_close if
remaining > 1 (or remaining >= 1 depending on intent) to guarantee at least one
connection remains; keep the rest of the removal and aclose behavior the same
and reference _cleanup_idle_connections, self._connections,
connections_to_close, conn.is_idle, conn.last_activity, and self._idle_timeout
when making the change. (A minimal sketch of this accounting follows below.)
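A minimal sketch of the corrected accounting described above; `is_idle`, `last_activity`, and the idle timeout are the names used in the review, the rest is illustrative:

```python
import time


def select_idle_connections(connections: list, idle_timeout: float) -> list:
    """Pick idle connections to close while guaranteeing at least one survives."""
    now = time.monotonic()
    to_close: list = []
    for conn in connections:
        remaining = len(connections) - len(to_close)
        if remaining <= 1:
            break  # never drain the pool completely
        if conn.is_idle and now - conn.last_activity > idle_timeout:
            to_close.append(conn)
    return to_close
```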
🧹 Nitpick comments (1)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py (1)
273-296: Consider removing dead `QueueFull` exception handling or adding queue bounds.

The outbound queue (line 160) is created without a `maxsize`, making it unbounded. The `except asyncio.QueueFull` handlers will never execute since `put_nowait()` only raises `QueueFull` for bounded queues. Either:

- Remove the try/except and use `put_nowait()` directly (if unbounded is intentional), or
- Add a `maxsize` to the queue if you want backpressure protection.

Option 1: Remove dead code (if unbounded is intentional)

```diff
 def send_text(self, context_id: str, text: str) -> None:
     """Queue text to be sent to a context."""
-    try:
-        self._outbound_queue.put_nowait(_SendTextMsg(context_id=context_id, text=text))
-    except asyncio.QueueFull:
-        logger.warning("Outbound queue full, dropping text")
+    self._outbound_queue.put_nowait(_SendTextMsg(context_id=context_id, text=text))

 def flush_context(self, context_id: str) -> None:
     """Queue a flush message for a context."""
-    try:
-        self._outbound_queue.put_nowait(_FlushContextMsg(context_id=context_id))
-    except asyncio.QueueFull:
-        logger.warning("Outbound queue full, dropping flush")
+    self._outbound_queue.put_nowait(_FlushContextMsg(context_id=context_id))
```
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings
Files:
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
🧬 Code graph analysis (1)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py (2)
livekit-agents/livekit/agents/_exceptions.py (3)
  - APIConnectionError (84-88)
  - APITimeoutError (91-95)
  - APIError (14-42)
livekit-agents/livekit/agents/utils/misc.py (1)
  - shortuuid (21-22)
🪛 GitHub Actions: CI
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
[error] 1-1: Ruff formatting check failed. 1 file would be reformatted. Run 'ruff format' (or 'uv run ruff format --fix .') to fix code style issues.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: unit-tests
🔇 Additional comments (12)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py (12)
94-109: LGTM! The `_ContextState` enum and `_ContextInfo` dataclass properly track context lifecycle. The `close_started_at` field correctly addresses the previous concern about stale-context cleanup using the wrong timestamp.

112-135: LGTM! Clean message type definitions for the outbound queue.

137-196: LGTM! The `_InworldConnection` class is well-structured with clear capacity management (`MAX_CONTEXTS = 5`), proper synchronization primitives, and useful properties for pool management.

197-271: LGTM! The connection establishment and context acquisition logic correctly addresses previous concerns:
- Closed-state guards at lines 228-229, 235-236, and 268-269 prevent hanging waiters.
- The `clear()` after `wait()` pattern at lines 263-265 fixes the lost-wakeup race.
- Atomic capacity check with `_acquire_lock` ensures thread-safe context creation.

298-353: LGTM! The send loop correctly processes all message types and propagates errors to `_handle_connection_error` on failure.

355-434: LGTM! The receive loop correctly handles all message types, including proper cleanup and capacity signaling on errors (lines 388-392) and context closures (lines 423-427). The previous concern about stuck contexts on server errors is addressed.

436-452: LGTM! The stale context cleanup correctly uses `close_started_at` (falling back to `created_at`) for eviction timing, and properly resolves waiters before eviction. This addresses the previous concern about long-lived streams being evicted prematurely.

454-467: LGTM! The error handler correctly fails all waiters, clears contexts, and signals both local waiters and the pool to enable recovery. This addresses the previous concern about waiters timing out when connections die (see the sketch after this list).

469-491: LGTM! Clean shutdown implementation that properly cancels tasks, fails waiters, and closes the WebSocket.

690-803: LGTM! The TTS class correctly integrates the connection pool with new configuration parameters and lazy initialization via `_get_pool()`. The docstrings clearly document the new `max_connections` and `idle_connection_timeout` parameters.

866-895: LGTM! The `prewarm()` implementation correctly initializes the pool lazily, and `aclose()` properly cleans up pool resources.

1030-1077: LGTM! The stream implementation correctly:
- Acquires contexts from the connection pool (lines 1030-1035)
- Chunks text to stay within Inworld's 1000-character limit (lines 1048-1050)
- Calls `close_context()` on all error paths (lines 1063, 1066, 1072), addressing the previous concern about context leaks
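A trimmed sketch of the error-handling behavior described in the `454-467` comment above (referenced there); in the real class the contexts map holds richer state, so treat this as a schematic, not the actual implementation:

```python
import asyncio
from typing import Callable, Dict


class _InworldConnectionSketch:
    def __init__(self, on_capacity_available: Callable[[], None]) -> None:
        self._contexts: Dict[str, asyncio.Future] = {}  # context_id -> waiter
        self._closed = False
        self._context_available = asyncio.Event()
        self._on_capacity_available = on_capacity_available

    def _handle_connection_error(self, error: Exception) -> None:
        # Fail every pending waiter so callers don't hang until their timeout.
        for waiter in self._contexts.values():
            if not waiter.done():
                waiter.set_exception(error)
        self._contexts.clear()
        self._closed = True
        # Wake local waiters and the pool so blocked acquirers can retry elsewhere.
        self._context_available.set()
        self._on_capacity_available()
```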
davidzhao left a comment:
nice work!
Inworld has a maximum of 20 connections and 5 contexts per websocket connection, so the plugin has been updated to support this.
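A hypothetical usage sketch of the new pool options; `max_connections` and `idle_connection_timeout` are the parameter names documented per the review above, while the import path, the timeout value, and the omission of credentials (e.g., an API key read from the environment) are assumptions:

```python
from livekit.plugins import inworld

# Inworld allows at most 20 WebSocket connections, each serving up to 5 contexts,
# so cap the pool accordingly; the idle timeout value here is arbitrary.
tts = inworld.TTS(
    max_connections=20,
    idle_connection_timeout=300.0,
)
```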
Summary by CodeRabbit
- New Features
- Performance Improvements
- Breaking Changes