Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion server/src/flowforge_server/api/routes/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

import uuid

from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from pydantic import BaseModel, Field
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from flowforge_server.api.deps import TenantWithDevFallback
from flowforge_server.db import get_session
from flowforge_server.db.models import Task, TaskPriority, TaskStatus
from flowforge_server.services.container import get_task_service

router = APIRouter(prefix="/tasks", tags=["tasks"])

Expand Down Expand Up @@ -185,6 +186,7 @@ async def get_task_board(

@router.post("", response_model=TaskResponse, status_code=201)
async def create_task(
request: Request,
data: CreateTaskRequest,
tenant: TenantWithDevFallback,
session: AsyncSession = Depends(get_session),
Expand Down Expand Up @@ -219,6 +221,13 @@ async def create_task(
await session.commit()
await session.refresh(task)

# Task automation hooks
try:
task_service = get_task_service(request)
await task_service.on_task_created(session, task)
except Exception:
pass # Automation errors should not block task creation
Comment on lines +225 to +229
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automation hook exceptions are silently swallowed. This makes it very hard to debug production automation failures (and can mask data consistency issues). At minimum, log the exception (with task_id/tenant_id) before continuing so task creation remains non-blocking but failures are observable.

Copilot uses AI. Check for mistakes.

return TaskResponse(**task.to_dict())


Expand All @@ -244,6 +253,7 @@ async def get_task(
@router.patch("/{task_id}", response_model=TaskResponse)
async def update_task(
task_id: str,
request: Request,
data: UpdateTaskRequest,
tenant: TenantWithDevFallback,
session: AsyncSession = Depends(get_session),
Expand All @@ -259,6 +269,7 @@ async def update_task(
raise HTTPException(status_code=404, detail="Task not found")

update_data = data.model_dump(exclude_unset=True)
previous_values = {k: getattr(task, k) for k in update_data}

Comment on lines 271 to 273
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previous_values = {k: getattr(task, k) ...} assumes request field names match ORM attributes. For metadata, the mapped attribute is task_metadata (the DB column is named metadata), so this will capture the wrong value and any downstream change-detection will be incorrect. Consider translating metadata -> task_metadata in update_data (and previous_values) before applying updates.

Copilot uses AI. Check for mistakes.
# Convert UUID strings to UUID objects
for field in ("assignee_user_id", "assignee_agent_id", "function_id", "run_id"):
Expand All @@ -277,6 +288,13 @@ async def update_task(
await session.commit()
await session.refresh(task)

# Task automation hooks
try:
task_service = get_task_service(request)
await task_service.on_task_updated(session, task, update_data, previous_values)
except Exception:
pass # Automation errors should not block task update
Comment on lines +292 to +296
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automation hook exceptions are silently swallowed here as well. Please log the exception with context (task_id, fields changed) so automation failures can be diagnosed without impacting the task update API response.

Copilot uses AI. Check for mistakes.

return TaskResponse(**task.to_dict())


Expand Down
6 changes: 3 additions & 3 deletions server/src/flowforge_server/db/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,13 @@ class Task(Base, TimestampMixin):
# Relationships
tenant: Mapped["Tenant"] = relationship("Tenant", back_populates="tasks")
assignee_user: Mapped["User | None"] = relationship(
"User", foreign_keys=[assignee_user_id]
"User", foreign_keys=[assignee_user_id], lazy="selectin"
)
assignee_agent: Mapped["Agent | None"] = relationship(
"Agent", foreign_keys=[assignee_agent_id]
"Agent", foreign_keys=[assignee_agent_id], lazy="selectin"
)
created_by: Mapped["User | None"] = relationship(
"User", foreign_keys=[created_by_user_id]
"User", foreign_keys=[created_by_user_id], lazy="selectin"
)
parent_task: Mapped["Task | None"] = relationship(
"Task", remote_side=[id], foreign_keys=[parent_task_id]
Expand Down
147 changes: 147 additions & 0 deletions server/src/flowforge_server/services/builtin_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from urllib.parse import urlparse

import httpx
from sqlalchemy import select

from flowforge_server.services.network_utils import create_ssrf_safe_client, validate_webhook_url

Expand All @@ -27,6 +28,17 @@ class BuiltinToolDefinition:
approval_timeout: str | None = None


@dataclass
class ToolContext:
    """Execution context for tools that need database access.

    Fields are annotated as ``Any`` to avoid importing ORM/model modules at
    module import time; the comments note the expected runtime types.
    """
    session: Any  # AsyncSession: active async DB session for this tool call
    run: Any  # Run model: the run this tool invocation belongs to
    tenant_id: Any  # uuid.UUID: tenant owning the run; used to scope DB queries


# Built-in tools that can only execute with a ToolContext (they touch the DB).
CONTEXT_AWARE_BUILTINS = {"update_task", "comment_on_task"}


# =============================================================================
# BUILT-IN TOOL DEFINITIONS
# =============================================================================
Expand Down Expand Up @@ -139,6 +151,32 @@ class BuiltinToolDefinition:
requires_approval=True,
approval_timeout="24h", # Give users more time to respond to questions
),

# Task Management Tools
BuiltinToolDefinition(
name="update_task",
description="Update the status or priority of a task. Use to report progress on your assigned task.",
parameters={
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "Task UUID. If omitted, updates the task linked to the current run."},
"status": {"type": "string", "enum": ["todo", "in_progress", "in_review", "done", "blocked", "cancelled"], "description": "New task status"},
"priority": {"type": "string", "enum": ["urgent", "high", "medium", "low", "none"], "description": "New task priority"},
},
},
),
BuiltinToolDefinition(
name="comment_on_task",
description="Post a comment on a task. Use to report progress, findings, or ask questions.",
parameters={
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "Task UUID. If omitted, comments on the task linked to the current run."},
"content": {"type": "string", "description": "Comment text (markdown supported)"},
},
"required": ["content"],
},
),
]


Expand Down Expand Up @@ -396,3 +434,112 @@ def get_builtin_tool_definitions() -> list[BuiltinToolDefinition]:
def get_builtin_tool_names() -> list[str]:
    """Return the name of every registered built-in tool, in registration order."""
    return [tool_name for tool_name in TOOL_EXECUTORS]


# =============================================================================
# CONTEXT-AWARE TOOL IMPLEMENTATIONS
# =============================================================================

async def execute_context_builtin_tool(
    name: str,
    arguments: dict[str, Any],
    context: ToolContext,
) -> dict[str, Any]:
    """Execute a built-in tool that requires database/execution context.

    Dispatches by tool name; unknown names yield an error payload rather than
    raising, so callers can surface the failure to the model/tool caller.
    """
    handlers = {
        "update_task": _execute_update_task,
        "comment_on_task": _execute_comment_on_task,
    }
    handler = handlers.get(name)
    if handler is None:
        return {"error": f"Unknown context-aware tool: {name}"}
    return await handler(arguments, context)


async def _execute_update_task(
arguments: dict[str, Any],
context: ToolContext,
) -> dict[str, Any]:
"""Update a task's status or priority."""
from flowforge_server.db.models.task import Task

session = context.session
task_id = arguments.get("task_id")

if not task_id:
# Find task linked to current run
result = await session.execute(
select(Task).where(Task.run_id == context.run.id)
)
task = result.scalar_one_or_none()
if not task:
return {"error": "No task linked to the current run. Provide task_id explicitly."}
else:
result = await session.execute(
select(Task).where(
Task.id == uuid_mod.UUID(task_id),
Task.tenant_id == context.tenant_id,
)
)
task = result.scalar_one_or_none()
if not task:
return {"error": f"Task {task_id} not found"}

updated_fields = []
if "status" in arguments:
task.status = arguments["status"]
updated_fields.append(f"status={arguments['status']}")
if "priority" in arguments:
task.priority = arguments["priority"]
Comment on lines +486 to +490
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update_task assigns task.status / task.priority directly from tool arguments without validation. Even with an enum in the tool schema, callers can still send invalid values and the DB columns are String. Validate against TaskStatus/TaskPriority (and ignore/return an error for invalid values) before persisting.

Copilot uses AI. Check for mistakes.
updated_fields.append(f"priority={arguments['priority']}")

if not updated_fields:
return {"error": "No fields to update. Provide status or priority."}

await session.commit()
return {"success": True, "task_id": str(task.id), "identifier": task.identifier, "updated": updated_fields}
Comment on lines +493 to +497
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update_task commits task changes directly, bypassing the task automation hooks (TaskService.on_task_updated) that emit lifecycle events and notifications. If agents are expected to update task status/priority during execution, consider routing these updates through TaskService (or reusing the same logic) so task/status_changed (and related notifications) still fire consistently.

Copilot uses AI. Check for mistakes.


async def _execute_comment_on_task(
arguments: dict[str, Any],
context: ToolContext,
) -> dict[str, Any]:
"""Post a comment on a task."""
from flowforge_server.db.models.comment import Comment
from flowforge_server.db.models.task import Task

session = context.session
task_id = arguments.get("task_id")
content = arguments.get("content", "")

if not content:
Comment on lines +508 to +512
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When task_id is provided, the tool never verifies that the task exists and belongs to context.tenant_id before inserting the comment. Consider always loading the task with tenant scoping and returning a not-found error if it doesn't match, to prevent cross-tenant writes.

Copilot uses AI. Check for mistakes.
return {"error": "Comment content is required"}

if not task_id:
result = await session.execute(
select(Task).where(Task.run_id == context.run.id)
)
task = result.scalar_one_or_none()
if not task:
return {"error": "No task linked to the current run. Provide task_id explicitly."}
task_id = str(task.id)

# Determine agent_id from the run's function/agent context
agent_id = None
if hasattr(context.run, 'function') and context.run.function:
# Try to find the agent assigned to the task
result = await session.execute(
select(Task).where(Task.id == uuid_mod.UUID(task_id))
Comment on lines +527 to +529
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment_on_task looks up linked_task by Task.id only when determining agent_id. This should be tenant-scoped (add Task.tenant_id == context.tenant_id) to avoid cross-tenant reads/attribution if a task UUID from another tenant is provided.

Suggested change
# Try to find the agent assigned to the task
result = await session.execute(
select(Task).where(Task.id == uuid_mod.UUID(task_id))
# Try to find the agent assigned to the task within the current tenant
result = await session.execute(
select(Task).where(
Task.id == uuid_mod.UUID(task_id),
Task.tenant_id == context.tenant_id,
)

Copilot uses AI. Check for mistakes.
)
linked_task = result.scalar_one_or_none()
if linked_task and linked_task.assignee_agent_id:
agent_id = linked_task.assignee_agent_id

comment = Comment(
tenant_id=context.tenant_id,
task_id=uuid_mod.UUID(task_id) if isinstance(task_id, str) else task_id,
Comment on lines +515 to +537
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment model does not have a tenant_id column/field (see db/models/comment.py), so passing tenant_id=... here will raise at runtime when this tool is invoked. Remove tenant_id from the Comment(...) constructor and ensure tenant scoping by validating the task belongs to context.tenant_id before inserting the comment.

Suggested change
if not task_id:
result = await session.execute(
select(Task).where(Task.run_id == context.run.id)
)
task = result.scalar_one_or_none()
if not task:
return {"error": "No task linked to the current run. Provide task_id explicitly."}
task_id = str(task.id)
# Determine agent_id from the run's function/agent context
agent_id = None
if hasattr(context.run, 'function') and context.run.function:
# Try to find the agent assigned to the task
result = await session.execute(
select(Task).where(Task.id == uuid_mod.UUID(task_id))
)
linked_task = result.scalar_one_or_none()
if linked_task and linked_task.assignee_agent_id:
agent_id = linked_task.assignee_agent_id
comment = Comment(
tenant_id=context.tenant_id,
task_id=uuid_mod.UUID(task_id) if isinstance(task_id, str) else task_id,
linked_task = None
if not task_id:
result = await session.execute(
select(Task).where(
Task.run_id == context.run.id,
Task.tenant_id == context.tenant_id,
)
)
linked_task = result.scalar_one_or_none()
if not linked_task:
return {"error": "No task linked to the current run. Provide task_id explicitly."}
else:
try:
task_uuid = uuid_mod.UUID(task_id) if isinstance(task_id, str) else task_id
except (ValueError, TypeError, AttributeError):
return {"error": "Invalid task_id"}
result = await session.execute(
select(Task).where(
Task.id == task_uuid,
Task.tenant_id == context.tenant_id,
)
)
linked_task = result.scalar_one_or_none()
if not linked_task:
return {"error": "Task not found"}
task_id = str(linked_task.id)
# Determine agent_id from the run's function/agent context
agent_id = None
if hasattr(context.run, 'function') and context.run.function:
if linked_task.assignee_agent_id:
agent_id = linked_task.assignee_agent_id
comment = Comment(
task_id=linked_task.id,

Copilot uses AI. Check for mistakes.
content=content,
comment_type="comment",
author_agent_id=agent_id,
)
session.add(comment)
await session.commit()

return {"success": True, "task_id": task_id, "comment_id": str(comment.id)}
14 changes: 13 additions & 1 deletion server/src/flowforge_server/services/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
if TYPE_CHECKING:
from flowforge_server.queue import FairQueue
from flowforge_server.services.ai import AIService
from flowforge_server.services.task_service import TaskService
from flowforge_server.stream import RedisEventStream
from flowforge_server.stream.pubsub import RunEventPubSub

Expand All @@ -36,6 +37,7 @@ class ServiceContainer:
job_queue: FairQueue
event_stream: RedisEventStream
pubsub: RunEventPubSub
task_service: TaskService

# Optional flag to track initialization
_initialized: bool = field(default=False, repr=False)
Expand Down Expand Up @@ -88,17 +90,22 @@ def create_service_container() -> ServiceContainer:
from flowforge_server.queue import FairQueue
from flowforge_server.services.ai import AIService
from flowforge_server.services.providers import get_provider_registry
from flowforge_server.services.task_service import TaskService
from flowforge_server.stream import RedisEventStream
from flowforge_server.stream.pubsub import RunEventPubSub

ai_service = AIService()
ai_service.provider_registry = get_provider_registry()

event_stream = RedisEventStream()
task_service = TaskService(event_stream=event_stream)

return ServiceContainer(
ai_service=ai_service,
job_queue=FairQueue(),
event_stream=RedisEventStream(),
event_stream=event_stream,
pubsub=RunEventPubSub(),
task_service=task_service,
)


Expand Down Expand Up @@ -143,3 +150,8 @@ def get_event_stream(request: Request) -> RedisEventStream:
def get_pubsub(request: Request) -> RunEventPubSub:
    """Return the run-event pub/sub held by the app's service container."""
    container = get_services(request)
    return container.pubsub


def get_task_service(request: Request) -> TaskService:
    """Return the TaskService held by the app's service container."""
    container = get_services(request)
    return container.task_service
28 changes: 28 additions & 0 deletions server/src/flowforge_server/services/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

if TYPE_CHECKING:
from flowforge_server.services.inline_executor import InlineExecutor
from flowforge_server.stream import RedisEventStream


class Executor:
Expand Down Expand Up @@ -67,6 +68,7 @@ def __init__(
self._http_client: httpx.AsyncClient | None = None
self._ai_service: AIService | None = None
self._inline_executor: InlineExecutor | None = None
self._event_stream: RedisEventStream | None = None

async def _get_http_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client."""
Expand Down Expand Up @@ -98,6 +100,12 @@ def _get_inline_executor(self) -> "InlineExecutor":
self._inline_executor = InlineExecutor(self._get_ai_service())
return self._inline_executor

def _get_event_stream(self) -> "RedisEventStream":
if self._event_stream is None:
from flowforge_server.stream import RedisEventStream
self._event_stream = RedisEventStream()
return self._event_stream

async def start(self) -> None:
"""Start the executor service."""
print(f"[Executor] Starting {self.worker_id} with concurrency={self.concurrency}...")
Expand Down Expand Up @@ -398,6 +406,9 @@ async def _handle_result(

print(f"[Executor] Run {run.id} completed")

# Sync linked task status
await self._sync_task_on_run_end(session, run, success=True)

elif status == "step_complete":
# Step completed, process and continue
step_id = result.get("step_id")
Expand Down Expand Up @@ -488,10 +499,27 @@ async def _handle_result(

print(f"[Executor] Run {run.id} permanently failed")

# Sync linked task status
await self._sync_task_on_run_end(session, run, success=False)

else:
print(f"[Executor] Unknown result status: {status}")
await self.queue.complete(job.id)

async def _sync_task_on_run_end(
    self, session: AsyncSession, run: Run, *, success: bool
) -> None:
    """Best-effort sync of the task linked to a finished run; never raises."""
    try:
        # Deferred import: TaskService is only needed once a run actually ends.
        from flowforge_server.services.task_service import TaskService
        service = TaskService(event_stream=self._get_event_stream())
        hook = service.on_run_completed if success else service.on_run_failed
        await hook(session, run)
    except Exception as e:
        # Task sync must never take down result handling; log and continue.
        print(f"[Executor] Task sync failed for run {run.id}: {e}")

def _sanitize_output(self, value: Any) -> Any:
"""Recursively strip null bytes from strings so PostgreSQL JSONB accepts the value."""
if isinstance(value, str):
Expand Down
Loading
Loading