feat: wire tasks into event/agent execution pipeline#16
Conversation
- Add TaskService to bridge task CRUD with event/execution pipeline - Task lifecycle changes emit events (task/created, task/assigned, etc.) - Assigning a task to an agent with a function auto-triggers execution - Run completion/failure auto-updates linked task status + posts comments - Add update_task and comment_on_task built-in tools for agent use - Create notifications on task assignment and completion - Circular trigger prevention via depth counter (max 3) - Fix task model lazy-loading bug (add selectin to assignee relationships)
There was a problem hiding this comment.
Pull request overview
Wires tasks into the event/function/agent execution pipeline so task lifecycle changes can trigger automations, and run outcomes can update tasks.
Changes:
- Added
TaskServiceto emit task lifecycle events and auto-create/trigger runs when tasks are assigned to agents with linked functions. - Added context-aware built-in tools (
update_task,comment_on_task) and InlineExecutor support for executing them with DB/run context. - Synced task status/comments/notifications on run completion/failure; fixed task relationship loading behavior.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 18 comments.
Show a summary per file
| File | Description |
|---|---|
server/src/flowforge_server/services/task_service.py |
New service for emitting task events, triggering runs, and syncing tasks from run outcomes. |
server/src/flowforge_server/services/inline_executor.py |
Adds context-aware builtin execution path (requires run + DB session). |
server/src/flowforge_server/services/executor.py |
Calls task sync on run completion/failure. |
server/src/flowforge_server/services/container.py |
Registers TaskService in the service container and exposes a FastAPI dep getter. |
server/src/flowforge_server/services/builtin_tools.py |
Adds task-management builtins + context-aware executor plumbing. |
server/src/flowforge_server/db/models/task.py |
Adjusts relationship loading strategy to avoid task-creation 500s. |
server/src/flowforge_server/api/routes/tasks.py |
Invokes task automation hooks on create/update. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| event.processed = True | ||
| await session.commit() | ||
|
|
||
| message = StreamMessage( | ||
| id=str(event.id), |
There was a problem hiding this comment.
In _emit_task_event, event.processed is set and the DB transaction is committed before publishing to the event stream. If publish() fails, the exception handler rolls back, but the commit has already persisted processed=True, so the event will appear processed even though it never made it to the stream. Align with api/routes/events.py by publishing first, then setting processed=True and committing only after a successful publish (or keep processed=False on failure).
| await session.commit() | ||
|
|
||
| # Publish to stream so the Runner picks it up | ||
| message = StreamMessage( | ||
| id=str(run.id), |
There was a problem hiding this comment.
_trigger_agent_execution commits the new Run + task.run_id/task.status changes before publishing the StreamMessage. If stream publish fails after the commit, the task will remain in_progress with a pending run that the Runner never receives. Consider publishing before commit (flush to get IDs), or on publish failure mark the run/task as failed/blocked and persist that state.
| agent_assigned = ( | ||
| "assignee_agent_id" in changes |
There was a problem hiding this comment.
agent_assigned is treated as true whenever assignee_agent_id is present in changes and non-null, even if the value didn't actually change. This can emit duplicate task/assigned events and (more importantly) re-trigger execution on no-op PATCH requests. Compare against previous_values['assignee_agent_id'] (and consider handling unassignments when the new value is None).
| agent_assigned = ( | |
| "assignee_agent_id" in changes | |
| agent_assignee_changed = ( | |
| "assignee_agent_id" in changes | |
| and changes["assignee_agent_id"] != previous_values.get("assignee_agent_id") | |
| ) | |
| agent_assigned = ( | |
| agent_assignee_changed |
| ) | ||
| user_assigned = ( | ||
| "assignee_user_id" in changes | ||
| and changes["assignee_user_id"] is not None | ||
| ) | ||
| status_changed = "status" in changes |
There was a problem hiding this comment.
user_assigned / status_changed are inferred purely from the presence of keys in changes, which means no-op updates (e.g., PATCH with the current status) can emit duplicate events/notifications. Use previous_values to confirm the value actually changed before emitting task/assigned, task/status_changed, and task/completed.
| ) | |
| user_assigned = ( | |
| "assignee_user_id" in changes | |
| and changes["assignee_user_id"] is not None | |
| ) | |
| status_changed = "status" in changes | |
| and changes["assignee_agent_id"] != previous_values.get("assignee_agent_id") | |
| ) | |
| user_assigned = ( | |
| "assignee_user_id" in changes | |
| and changes["assignee_user_id"] is not None | |
| and changes["assignee_user_id"] != previous_values.get("assignee_user_id") | |
| ) | |
| status_changed = ( | |
| "status" in changes | |
| and changes["status"] != previous_values.get("status") | |
| ) |
| if task.assignee_agent_id and task.function_id: | ||
| await self._maybe_trigger_agent_execution(session, task, _depth=_depth) | ||
|
|
There was a problem hiding this comment.
When a task is created with assignee_agent_id already set, on_task_created emits task/created but never emits task/assigned. That means automations listening for task/assigned won't fire for tasks that start life already assigned. Consider emitting task/assigned as part of creation when an assignee is present (mirroring the update path).
| session = context.session | ||
| task_id = arguments.get("task_id") | ||
| content = arguments.get("content", "") | ||
|
|
||
| if not content: |
There was a problem hiding this comment.
When task_id is provided, the tool never verifies that the task exists and belongs to context.tenant_id before inserting the comment. Consider always loading the task with tenant scoping and returning a not-found error if it doesn't match, to prevent cross-tenant writes.
| self._create_notification( | ||
| session, | ||
| user_id=task.assignee_user_id, | ||
| notification_type="task_assigned", | ||
| title=f"You've been assigned to {task.identifier}", |
There was a problem hiding this comment.
The notification is staged after _emit_task_event(...) has already committed, and on_task_updated doesn’t perform another commit on the user-assignment-only path. This means the Notification record may never be persisted. Ensure there’s a commit after staging notifications (or stage them before a commit boundary that will definitely run).
| if task.created_by_user_id: | ||
| self._create_notification( | ||
| session, | ||
| user_id=task.created_by_user_id, | ||
| notification_type="task_completed", | ||
| title=f"{task.identifier} is done", |
There was a problem hiding this comment.
Completion notifications are staged after event emission commits, and on_task_updated doesn’t commit afterward, so this notification can be dropped. Add a commit after creating notifications (or restructure so notifications are created before a commit boundary).
| try: | ||
| task_service = get_task_service(request) | ||
| await task_service.on_task_created(session, task) | ||
| except Exception: | ||
| pass # Automation errors should not block task creation |
There was a problem hiding this comment.
Automation hook exceptions are silently swallowed. This makes it very hard to debug production automation failures (and can mask data consistency issues). At minimum, log the exception (with task_id/tenant_id) before continuing so task creation remains non-blocking but failures are observable.
| try: | ||
| task_service = get_task_service(request) | ||
| await task_service.on_task_updated(session, task, update_data, previous_values) | ||
| except Exception: | ||
| pass # Automation errors should not block task update |
There was a problem hiding this comment.
Automation hook exceptions are silently swallowed here as well. Please log the exception with context (task_id, fields changed) so automation failures can be diagnosed without impacting the task update API response.
Summary
Transforms the passive Kanban board into an automation layer by wiring tasks into the event/function/agent pipeline:
task/created,task/assigned,task/status_changed,task/completed,task/blocked) that can trigger functionsupdate_taskandcomment_on_tasklet agents interact with their assigned task during executionTest plan