diff --git a/server/src/flowforge_server/api/routes/tasks.py b/server/src/flowforge_server/api/routes/tasks.py index 4110d95..b12f4f3 100644 --- a/server/src/flowforge_server/api/routes/tasks.py +++ b/server/src/flowforge_server/api/routes/tasks.py @@ -2,7 +2,7 @@ import uuid -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Depends, HTTPException, Query, Request from pydantic import BaseModel, Field from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession @@ -10,6 +10,7 @@ from flowforge_server.api.deps import TenantWithDevFallback from flowforge_server.db import get_session from flowforge_server.db.models import Task, TaskPriority, TaskStatus +from flowforge_server.services.container import get_task_service router = APIRouter(prefix="/tasks", tags=["tasks"]) @@ -185,6 +186,7 @@ async def get_task_board( @router.post("", response_model=TaskResponse, status_code=201) async def create_task( + request: Request, data: CreateTaskRequest, tenant: TenantWithDevFallback, session: AsyncSession = Depends(get_session), @@ -219,6 +221,13 @@ async def create_task( await session.commit() await session.refresh(task) + # Task automation hooks + try: + task_service = get_task_service(request) + await task_service.on_task_created(session, task) + except Exception: + pass # Automation errors should not block task creation + return TaskResponse(**task.to_dict()) @@ -244,6 +253,7 @@ async def get_task( @router.patch("/{task_id}", response_model=TaskResponse) async def update_task( task_id: str, + request: Request, data: UpdateTaskRequest, tenant: TenantWithDevFallback, session: AsyncSession = Depends(get_session), @@ -259,6 +269,7 @@ async def update_task( raise HTTPException(status_code=404, detail="Task not found") update_data = data.model_dump(exclude_unset=True) + previous_values = {k: getattr(task, k) for k in update_data} # Convert UUID strings to UUID objects for field in ("assignee_user_id", "assignee_agent_id", "function_id", "run_id"): @@ -277,6 +288,13 @@ async def update_task( await session.commit() await session.refresh(task) + # Task automation hooks + try: + task_service = get_task_service(request) + await task_service.on_task_updated(session, task, update_data, previous_values) + except Exception: + pass # Automation errors should not block task update + return TaskResponse(**task.to_dict()) diff --git a/server/src/flowforge_server/db/models/task.py b/server/src/flowforge_server/db/models/task.py index 4877af1..eef8e17 100644 --- a/server/src/flowforge_server/db/models/task.py +++ b/server/src/flowforge_server/db/models/task.py @@ -156,13 +156,13 @@ class Task(Base, TimestampMixin): # Relationships tenant: Mapped["Tenant"] = relationship("Tenant", back_populates="tasks") assignee_user: Mapped["User | None"] = relationship( - "User", foreign_keys=[assignee_user_id] + "User", foreign_keys=[assignee_user_id], lazy="selectin" ) assignee_agent: Mapped["Agent | None"] = relationship( - "Agent", foreign_keys=[assignee_agent_id] + "Agent", foreign_keys=[assignee_agent_id], lazy="selectin" ) created_by: Mapped["User | None"] = relationship( - "User", foreign_keys=[created_by_user_id] + "User", foreign_keys=[created_by_user_id], lazy="selectin" ) parent_task: Mapped["Task | None"] = relationship( "Task", remote_side=[id], foreign_keys=[parent_task_id] diff --git a/server/src/flowforge_server/services/builtin_tools.py b/server/src/flowforge_server/services/builtin_tools.py index 3e29538..6e982d2 100644 --- a/server/src/flowforge_server/services/builtin_tools.py +++ b/server/src/flowforge_server/services/builtin_tools.py @@ -13,6 +13,7 @@ from urllib.parse import urlparse import httpx +from sqlalchemy import select from flowforge_server.services.network_utils import create_ssrf_safe_client, validate_webhook_url @@ -27,6 +28,17 @@ class BuiltinToolDefinition: approval_timeout: str | None = None +@dataclass +class ToolContext: + """Execution context for tools that need database access.""" + session: Any # AsyncSession + run: Any # Run model + tenant_id: Any # uuid.UUID + + +CONTEXT_AWARE_BUILTINS = {"update_task", "comment_on_task"} + + # ============================================================================= # BUILT-IN TOOL DEFINITIONS # ============================================================================= @@ -139,6 +151,32 @@ class BuiltinToolDefinition: requires_approval=True, approval_timeout="24h", # Give users more time to respond to questions ), + + # Task Management Tools + BuiltinToolDefinition( + name="update_task", + description="Update the status or priority of a task. Use to report progress on your assigned task.", + parameters={ + "type": "object", + "properties": { + "task_id": {"type": "string", "description": "Task UUID. If omitted, updates the task linked to the current run."}, + "status": {"type": "string", "enum": ["todo", "in_progress", "in_review", "done", "blocked", "cancelled"], "description": "New task status"}, + "priority": {"type": "string", "enum": ["urgent", "high", "medium", "low", "none"], "description": "New task priority"}, + }, + }, + ), + BuiltinToolDefinition( + name="comment_on_task", + description="Post a comment on a task. Use to report progress, findings, or ask questions.", + parameters={ + "type": "object", + "properties": { + "task_id": {"type": "string", "description": "Task UUID. If omitted, comments on the task linked to the current run."}, + "content": {"type": "string", "description": "Comment text (markdown supported)"}, + }, + "required": ["content"], + }, + ), ] @@ -396,3 +434,112 @@ def get_builtin_tool_definitions() -> list[BuiltinToolDefinition]: def get_builtin_tool_names() -> list[str]: """Get names of all built-in tools.""" return list(TOOL_EXECUTORS.keys()) + + +# ============================================================================= +# CONTEXT-AWARE TOOL IMPLEMENTATIONS +# ============================================================================= + +async def execute_context_builtin_tool( + name: str, + arguments: dict[str, Any], + context: ToolContext, +) -> dict[str, Any]: + """Execute a built-in tool that requires database/execution context.""" + if name == "update_task": + return await _execute_update_task(arguments, context) + elif name == "comment_on_task": + return await _execute_comment_on_task(arguments, context) + return {"error": f"Unknown context-aware tool: {name}"} + + +async def _execute_update_task( + arguments: dict[str, Any], + context: ToolContext, +) -> dict[str, Any]: + """Update a task's status or priority.""" + from flowforge_server.db.models.task import Task + + session = context.session + task_id = arguments.get("task_id") + + if not task_id: + # Find task linked to current run + result = await session.execute( + select(Task).where(Task.run_id == context.run.id) + ) + task = result.scalar_one_or_none() + if not task: + return {"error": "No task linked to the current run. Provide task_id explicitly."} + else: + result = await session.execute( + select(Task).where( + Task.id == uuid_mod.UUID(task_id), + Task.tenant_id == context.tenant_id, + ) + ) + task = result.scalar_one_or_none() + if not task: + return {"error": f"Task {task_id} not found"} + + updated_fields = [] + if "status" in arguments: + task.status = arguments["status"] + updated_fields.append(f"status={arguments['status']}") + if "priority" in arguments: + task.priority = arguments["priority"] + updated_fields.append(f"priority={arguments['priority']}") + + if not updated_fields: + return {"error": "No fields to update. Provide status or priority."} + + await session.commit() + return {"success": True, "task_id": str(task.id), "identifier": task.identifier, "updated": updated_fields} + + +async def _execute_comment_on_task( + arguments: dict[str, Any], + context: ToolContext, +) -> dict[str, Any]: + """Post a comment on a task.""" + from flowforge_server.db.models.comment import Comment + from flowforge_server.db.models.task import Task + + session = context.session + task_id = arguments.get("task_id") + content = arguments.get("content", "") + + if not content: + return {"error": "Comment content is required"} + + if not task_id: + result = await session.execute( + select(Task).where(Task.run_id == context.run.id) + ) + task = result.scalar_one_or_none() + if not task: + return {"error": "No task linked to the current run. Provide task_id explicitly."} + task_id = str(task.id) + + # Determine agent_id from the run's function/agent context + agent_id = None + if hasattr(context.run, 'function') and context.run.function: + # Try to find the agent assigned to the task + result = await session.execute( + select(Task).where(Task.id == uuid_mod.UUID(task_id)) + ) + linked_task = result.scalar_one_or_none() + if linked_task and linked_task.assignee_agent_id: + agent_id = linked_task.assignee_agent_id + + comment = Comment( + tenant_id=context.tenant_id, + task_id=uuid_mod.UUID(task_id) if isinstance(task_id, str) else task_id, + content=content, + comment_type="comment", + author_agent_id=agent_id, + ) + session.add(comment) + await session.commit() + + return {"success": True, "task_id": task_id, "comment_id": str(comment.id)} diff --git a/server/src/flowforge_server/services/container.py b/server/src/flowforge_server/services/container.py index 99be283..98071e7 100644 --- a/server/src/flowforge_server/services/container.py +++ b/server/src/flowforge_server/services/container.py @@ -14,6 +14,7 @@ if TYPE_CHECKING: from flowforge_server.queue import FairQueue from flowforge_server.services.ai import AIService + from flowforge_server.services.task_service import TaskService from flowforge_server.stream import RedisEventStream from flowforge_server.stream.pubsub import RunEventPubSub @@ -36,6 +37,7 @@ class ServiceContainer: job_queue: FairQueue event_stream: RedisEventStream pubsub: RunEventPubSub + task_service: TaskService # Optional flag to track initialization _initialized: bool = field(default=False, repr=False) @@ -88,17 +90,22 @@ def create_service_container() -> ServiceContainer: from flowforge_server.queue import FairQueue from flowforge_server.services.ai import AIService from flowforge_server.services.providers import get_provider_registry + from flowforge_server.services.task_service import TaskService from flowforge_server.stream import RedisEventStream from flowforge_server.stream.pubsub import RunEventPubSub ai_service = AIService() ai_service.provider_registry = get_provider_registry() + event_stream = RedisEventStream() + task_service = TaskService(event_stream=event_stream) + return ServiceContainer( ai_service=ai_service, job_queue=FairQueue(), - event_stream=RedisEventStream(), + event_stream=event_stream, pubsub=RunEventPubSub(), + task_service=task_service, ) @@ -143,3 +150,8 @@ def get_event_stream(request: Request) -> RedisEventStream: def get_pubsub(request: Request) -> RunEventPubSub: """Get the pubsub from the request.""" return get_services(request).pubsub + + +def get_task_service(request: Request) -> TaskService: + """Get the task service from the request.""" + return get_services(request).task_service diff --git a/server/src/flowforge_server/services/executor.py b/server/src/flowforge_server/services/executor.py index 3b909b7..00b7cf3 100644 --- a/server/src/flowforge_server/services/executor.py +++ b/server/src/flowforge_server/services/executor.py @@ -29,6 +29,7 @@ if TYPE_CHECKING: from flowforge_server.services.inline_executor import InlineExecutor + from flowforge_server.stream import RedisEventStream class Executor: @@ -67,6 +68,7 @@ def __init__( self._http_client: httpx.AsyncClient | None = None self._ai_service: AIService | None = None self._inline_executor: InlineExecutor | None = None + self._event_stream: RedisEventStream | None = None async def _get_http_client(self) -> httpx.AsyncClient: """Get or create HTTP client.""" @@ -98,6 +100,12 @@ def _get_inline_executor(self) -> "InlineExecutor": self._inline_executor = InlineExecutor(self._get_ai_service()) return self._inline_executor + def _get_event_stream(self) -> "RedisEventStream": + if self._event_stream is None: + from flowforge_server.stream import RedisEventStream + self._event_stream = RedisEventStream() + return self._event_stream + async def start(self) -> None: """Start the executor service.""" print(f"[Executor] Starting {self.worker_id} with concurrency={self.concurrency}...") @@ -398,6 +406,9 @@ async def _handle_result( print(f"[Executor] Run {run.id} completed") + # Sync linked task status + await self._sync_task_on_run_end(session, run, success=True) + elif status == "step_complete": # Step completed, process and continue step_id = result.get("step_id") @@ -488,10 +499,27 @@ async def _handle_result( print(f"[Executor] Run {run.id} permanently failed") + # Sync linked task status + await self._sync_task_on_run_end(session, run, success=False) + else: print(f"[Executor] Unknown result status: {status}") await self.queue.complete(job.id) + async def _sync_task_on_run_end( + self, session: AsyncSession, run: Run, *, success: bool + ) -> None: + """Update linked task when run completes or fails.""" + try: + from flowforge_server.services.task_service import TaskService + task_service = TaskService(event_stream=self._get_event_stream()) + if success: + await task_service.on_run_completed(session, run) + else: + await task_service.on_run_failed(session, run) + except Exception as e: + print(f"[Executor] Task sync failed for run {run.id}: {e}") + def _sanitize_output(self, value: Any) -> Any: """Recursively strip null bytes from strings so PostgreSQL JSONB accepts the value.""" if isinstance(value, str): diff --git a/server/src/flowforge_server/services/inline_executor.py b/server/src/flowforge_server/services/inline_executor.py index 9d4a5cc..dc5e337 100644 --- a/server/src/flowforge_server/services/inline_executor.py +++ b/server/src/flowforge_server/services/inline_executor.py @@ -28,7 +28,13 @@ ) from flowforge_server.logging import Loggers from flowforge_server.services.ai import AIService, ToolCall -from flowforge_server.services.builtin_tools import execute_builtin_tool, get_builtin_tool_names +from flowforge_server.services.builtin_tools import ( + CONTEXT_AWARE_BUILTINS, + ToolContext, + execute_builtin_tool, + execute_context_builtin_tool, + get_builtin_tool_names, +) from flowforge_server.services.credentials import ( CredentialResolutionError, resolve_dict_placeholders, @@ -580,7 +586,7 @@ def to_dict(self): # Execute the tool with potentially modified arguments try: tool_result = await self._execute_tool( - session, tool_info, tool_arguments + session, tool_info, tool_arguments, run=run ) result_str = str(tool_result) except Exception as e: @@ -797,8 +803,17 @@ async def _execute_tool( session: AsyncSession, tool_info: dict[str, Any], arguments: dict[str, Any], + *, + run: Any = None, ) -> Any: """Execute a tool with the given arguments.""" + # Context-aware builtins need database access + if tool_info["is_builtin"] and tool_info["name"] in CONTEXT_AWARE_BUILTINS: + if run: + context = ToolContext(session=session, run=run, tenant_id=run.tenant_id) + return await execute_context_builtin_tool(tool_info["name"], arguments, context) + else: + return {"error": f"Tool '{tool_info['name']}' requires an execution context (run)"} if tool_info["is_builtin"]: return await execute_builtin_tool(tool_info["name"], arguments) elif tool_info.get("webhook_url"): diff --git a/server/src/flowforge_server/services/task_service.py b/server/src/flowforge_server/services/task_service.py new file mode 100644 index 0000000..b476f2f --- /dev/null +++ b/server/src/flowforge_server/services/task_service.py @@ -0,0 +1,628 @@ +"""Task automation service — bridges tasks with the event/execution pipeline.""" + +import uuid +from datetime import datetime +from typing import Any + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from flowforge_server.db.models import Event, Function, Run, RunStatus +from flowforge_server.db.models.agent import Agent +from flowforge_server.db.models.comment import Comment +from flowforge_server.db.models.notification import Notification +from flowforge_server.db.models.task import Task, TaskStatus +from flowforge_server.logging import Loggers +from flowforge_server.stream import RedisEventStream, StreamMessage + +log = Loggers.api() + + +class TaskService: + """Bridges task CRUD with the event/function/agent execution pipeline.""" + + MAX_TASK_EVENT_DEPTH = 3 + + def __init__(self, event_stream: RedisEventStream) -> None: + self.event_stream = event_stream + + async def on_task_created( + self, + session: AsyncSession, + task: Task, + *, + _depth: int = 0, + ) -> None: + """ + Handle a newly created task. + + Emits a ``task/created`` event and, if the task is already assigned + to an agent with a linked function, kicks off agent execution. + """ + await self._emit_task_event( + session, + task.tenant_id, + "task/created", + { + "task_id": str(task.id), + "identifier": task.identifier, + "title": task.title, + "description": task.description, + "status": task.status, + "priority": task.priority, + "assignee_agent_id": str(task.assignee_agent_id) if task.assignee_agent_id else None, + "assignee_user_id": str(task.assignee_user_id) if task.assignee_user_id else None, + "function_id": str(task.function_id) if task.function_id else None, + "metadata": task.task_metadata, + }, + _depth=_depth, + ) + + if task.assignee_agent_id and task.function_id: + await self._maybe_trigger_agent_execution(session, task, _depth=_depth) + + async def on_task_updated( + self, + session: AsyncSession, + task: Task, + changes: dict[str, Any], + previous_values: dict[str, Any], + *, + _depth: int = 0, + ) -> None: + """ + Handle task field changes. + + Emits targeted events for assignment and status transitions, creates + notifications for affected users, and triggers agent execution when + appropriate. + """ + agent_assigned = ( + "assignee_agent_id" in changes + and changes["assignee_agent_id"] is not None + ) + user_assigned = ( + "assignee_user_id" in changes + and changes["assignee_user_id"] is not None + ) + status_changed = "status" in changes + + if agent_assigned: + await self._emit_task_event( + session, + task.tenant_id, + "task/assigned", + { + "task_id": str(task.id), + "identifier": task.identifier, + "title": task.title, + "assignee_type": "agent", + "assignee_agent_id": str(task.assignee_agent_id), + "previous_assignee_agent_id": str(previous_values.get("assignee_agent_id")) + if previous_values.get("assignee_agent_id") else None, + }, + _depth=_depth, + ) + + if user_assigned: + await self._emit_task_event( + session, + task.tenant_id, + "task/assigned", + { + "task_id": str(task.id), + "identifier": task.identifier, + "title": task.title, + "assignee_type": "user", + "assignee_user_id": str(task.assignee_user_id), + "previous_assignee_user_id": str(previous_values.get("assignee_user_id")) + if previous_values.get("assignee_user_id") else None, + }, + _depth=_depth, + ) + self._create_notification( + session, + user_id=task.assignee_user_id, + notification_type="task_assigned", + title=f"You've been assigned to {task.identifier}", + body=task.title, + resource_type="task", + resource_id=str(task.id), + data={"task_id": str(task.id), "identifier": task.identifier}, + ) + + if status_changed: + await self._emit_task_event( + session, + task.tenant_id, + "task/status_changed", + { + "task_id": str(task.id), + "identifier": task.identifier, + "title": task.title, + "status": task.status, + "previous_status": previous_values.get("status"), + }, + _depth=_depth, + ) + + if task.status == TaskStatus.DONE.value: + await self._emit_task_event( + session, + task.tenant_id, + "task/completed", + { + "task_id": str(task.id), + "identifier": task.identifier, + "title": task.title, + }, + _depth=_depth, + ) + if task.created_by_user_id: + self._create_notification( + session, + user_id=task.created_by_user_id, + notification_type="task_completed", + title=f"{task.identifier} is done", + body=task.title, + resource_type="task", + resource_id=str(task.id), + data={"task_id": str(task.id), "identifier": task.identifier}, + ) + + # If agent was newly assigned and a function is linked, trigger execution + if agent_assigned and task.function_id: + await self._maybe_trigger_agent_execution(session, task, _depth=_depth) + + async def on_run_completed(self, session: AsyncSession, run: Run) -> None: + """ + Update linked task when a run finishes successfully. + + Sets task status to DONE, posts a system comment with the output + summary, and notifies the task creator. + """ + result = await session.execute( + select(Task).where(Task.run_id == run.id) + ) + task = result.scalar_one_or_none() + if task is None: + return + + try: + task.status = TaskStatus.DONE.value + + output_summary = _summarise_run_output(run) + self._create_system_comment( + session, + task_id=task.id, + content=f"Run completed successfully. {output_summary}", + comment_type="system", + agent_id=None, + run_id=run.id, + ) + + if task.created_by_user_id: + self._create_notification( + session, + user_id=task.created_by_user_id, + notification_type="task_completed", + title=f"{task.identifier} completed", + body=f"The run for '{task.title}' finished successfully.", + resource_type="task", + resource_id=str(task.id), + data={ + "task_id": str(task.id), + "identifier": task.identifier, + "run_id": str(run.id), + }, + ) + + await session.commit() + except Exception as exc: + log.warning( + "task_service.on_run_completed.error", + task_id=str(task.id), + run_id=str(run.id), + error=str(exc), + ) + await session.rollback() + + async def on_run_failed(self, session: AsyncSession, run: Run) -> None: + """ + Update linked task when a run fails. + + Sets task status to BLOCKED, posts a system comment with the error, + and notifies the task creator. + """ + result = await session.execute( + select(Task).where(Task.run_id == run.id) + ) + task = result.scalar_one_or_none() + if task is None: + return + + try: + task.status = TaskStatus.BLOCKED.value + + error_summary = _summarise_run_error(run) + self._create_system_comment( + session, + task_id=task.id, + content=f"Run failed. {error_summary}", + comment_type="system", + agent_id=None, + run_id=run.id, + ) + + if task.created_by_user_id: + self._create_notification( + session, + user_id=task.created_by_user_id, + notification_type="run_failed", + title=f"{task.identifier} is blocked", + body=f"The run for '{task.title}' failed: {error_summary}", + resource_type="task", + resource_id=str(task.id), + data={ + "task_id": str(task.id), + "identifier": task.identifier, + "run_id": str(run.id), + "error": run.error, + }, + ) + + await session.commit() + except Exception as exc: + log.warning( + "task_service.on_run_failed.error", + task_id=str(task.id), + run_id=str(run.id), + error=str(exc), + ) + await session.rollback() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + async def _maybe_trigger_agent_execution( + self, + session: AsyncSession, + task: Task, + *, + _depth: int = 0, + ) -> None: + """Resolve agent and function, then delegate to _trigger_agent_execution.""" + try: + agent_result = await session.execute( + select(Agent).where(Agent.id == task.assignee_agent_id) + ) + agent = agent_result.scalar_one_or_none() + + function_result = await session.execute( + select(Function).where(Function.id == task.function_id) + ) + function = function_result.scalar_one_or_none() + + if agent is None: + log.warning( + "task_service.trigger_agent_execution.agent_not_found", + task_id=str(task.id), + assignee_agent_id=str(task.assignee_agent_id), + ) + return + + if function is None: + log.warning( + "task_service.trigger_agent_execution.function_not_found", + task_id=str(task.id), + function_id=str(task.function_id), + ) + return + + await self._trigger_agent_execution(session, task, agent, function) + except Exception as exc: + log.warning( + "task_service.maybe_trigger_agent_execution.error", + task_id=str(task.id), + error=str(exc), + ) + + async def _trigger_agent_execution( + self, + session: AsyncSession, + task: Task, + agent: Agent, + function: Function, + ) -> None: + """ + Create a Run for the task and publish it to the stream for the Runner. + + Skips execution if an existing non-terminal run is already linked. + """ + # Guard: skip if a live run already exists for this task + if task.run_id is not None: + existing_result = await session.execute( + select(Run).where(Run.id == task.run_id) + ) + existing_run = existing_result.scalar_one_or_none() + if existing_run is not None and not existing_run.is_terminal: + log.info( + "task_service.trigger_agent_execution.skipped_active_run", + task_id=str(task.id), + run_id=str(task.run_id), + run_status=existing_run.status, + ) + return + + try: + now = datetime.utcnow() + event_id = str(uuid.uuid4()) + max_attempts = getattr(function, "retries", 2) + 1 + + run = Run( + tenant_id=task.tenant_id, + function_id=function.id, + status=RunStatus.PENDING, + trigger_type="event", + trigger_data={ + "event": { + "name": "task/assigned", + "data": { + "task_id": str(task.id), + "identifier": task.identifier, + "title": task.title, + "description": task.description, + "metadata": task.task_metadata, + }, + } + }, + attempt=1, + max_attempts=max_attempts, + ) + session.add(run) + await session.flush() # populate run.id + + task.run_id = run.id + task.status = TaskStatus.IN_PROGRESS.value + + self._create_system_comment( + session, + task_id=task.id, + content=f"Agent '{agent.name}' started working on this task", + comment_type="system", + agent_id=agent.id, + run_id=run.id, + ) + + await session.commit() + + # Publish to stream so the Runner picks it up + message = StreamMessage( + id=str(run.id), + event_name="task/assigned", + event_id=event_id, + event_data={ + "task_id": str(task.id), + "identifier": task.identifier, + "title": task.title, + "description": task.description, + "metadata": task.task_metadata, + }, + tenant_id=str(task.tenant_id), + timestamp=now, + run_id=str(run.id), + ) + await self.event_stream.publish(message) + + log.info( + "task_service.trigger_agent_execution.run_created", + task_id=str(task.id), + run_id=str(run.id), + agent_id=str(agent.id), + function_id=str(function.id), + ) + except Exception as exc: + log.warning( + "task_service.trigger_agent_execution.error", + task_id=str(task.id), + agent_id=str(agent.id), + function_id=str(function.id), + error=str(exc), + ) + await session.rollback() + + async def _emit_task_event( + self, + session: AsyncSession, + tenant_id: uuid.UUID, + event_name: str, + event_data: dict[str, Any], + *, + _depth: int = 0, + ) -> None: + """ + Persist an Event record and publish it to the stream. + + Depth tracking prevents circular automation chains from running + indefinitely. + """ + if _depth >= self.MAX_TASK_EVENT_DEPTH: + log.warning( + "task_service.emit_task_event.depth_limit_reached", + event_name=event_name, + depth=_depth, + ) + return + + enriched_data = { + **event_data, + "_task_event_depth": _depth + 1, + "_source": "automation", + } + + try: + event_id = str(uuid.uuid4()) + now = datetime.utcnow() + + event = Event( + tenant_id=tenant_id, + event_id=event_id, + name=event_name, + data=enriched_data, + timestamp=now, + received_at=now, + processed=False, + ) + session.add(event) + await session.flush() + + # Check for registered functions that match this event + functions_result = await session.execute( + select(Function).where( + Function.tenant_id == tenant_id, + Function.trigger_type == "event", + Function.trigger_value == event_name, + Function.is_active == True, # noqa: E712 + ) + ) + matching_functions = list(functions_result.scalars().all()) + + run_id: str | None = None + + if matching_functions: + # Create a pending run for the first matching function. + # Additional functions would each need their own run — callers + # creating more targeted automation should wire that up explicitly. + function = matching_functions[0] + max_attempts = getattr(function, "retries", 2) + 1 + run = Run( + tenant_id=tenant_id, + function_id=function.id, + event_id=event.id, + status=RunStatus.PENDING, + trigger_type="event", + trigger_data={"event": {"name": event_name, "data": enriched_data}}, + attempt=1, + max_attempts=max_attempts, + ) + session.add(run) + await session.flush() + run_id = str(run.id) + + event.processed = True + await session.commit() + + message = StreamMessage( + id=str(event.id), + event_name=event_name, + event_id=event_id, + event_data=enriched_data, + tenant_id=str(tenant_id), + timestamp=now, + run_id=run_id, + ) + await self.event_stream.publish(message) + + log.info( + "task_service.emit_task_event", + event_name=event_name, + event_id=event_id, + tenant_id=str(tenant_id), + depth=_depth, + matched_functions=len(matching_functions), + ) + except Exception as exc: + log.warning( + "task_service.emit_task_event.error", + event_name=event_name, + tenant_id=str(tenant_id), + error=str(exc), + ) + await session.rollback() + + def _create_notification( + self, + session: AsyncSession, + user_id: uuid.UUID, + notification_type: str, + title: str, + body: str | None, + resource_type: str | None, + resource_id: str | None, + data: dict[str, Any], + ) -> Notification: + """ + Build and stage a Notification record. + + Does NOT commit — caller is responsible for the transaction boundary. + """ + notification = Notification( + user_id=user_id, + notification_type=notification_type, + title=title, + body=body, + resource_type=resource_type, + resource_id=resource_id, + data=data, + is_read=False, + is_archived=False, + ) + session.add(notification) + return notification + + def _create_system_comment( + self, + session: AsyncSession, + task_id: uuid.UUID, + content: str, + comment_type: str, + agent_id: uuid.UUID | None, + run_id: uuid.UUID | None, + ) -> Comment: + """ + Build and stage a Comment record. + + Does NOT commit — caller is responsible for the transaction boundary. + """ + comment = Comment( + task_id=task_id, + run_id=run_id, + content=content, + comment_type=comment_type, + author_agent_id=agent_id, + author_user_id=None, + mentions=[], + reactions={}, + ) + session.add(comment) + return comment + + +# ------------------------------------------------------------------ +# Private module-level helpers +# ------------------------------------------------------------------ + + +def _summarise_run_output(run: Run) -> str: + """Return a short human-readable summary of a completed run's output.""" + if not run.output: + return "" + if isinstance(run.output, dict): + result = run.output.get("result") or run.output.get("output") or run.output.get("message") + if result and isinstance(result, str): + return result[:200] + return "" + return str(run.output)[:200] + + +def _summarise_run_error(run: Run) -> str: + """Return a short human-readable summary of a failed run's error.""" + if not run.error: + return "An unknown error occurred." + if isinstance(run.error, dict): + message = run.error.get("message") or run.error.get("error") or run.error.get("detail") + if message and isinstance(message, str): + return message[:300] + return str(run.error)[:300] + return str(run.error)[:300]