-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathsafety.py
More file actions
60 lines (42 loc) · 1.63 KB
/
safety.py
File metadata and controls
60 lines (42 loc) · 1.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
"""Safety port — the ★① pipeline that gates every SQL execution.
Airport-security model: SQL passes a *line* of layers; each returns pass /
block / needs-confirmation / rewrite. New checks (v1.5 AST validation, function
blocklist, metadata enrichment) are "one class + slot it in the line" with zero
``run_sql`` changes.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Protocol, Sequence, runtime_checkable
class Verdict(str, Enum):
    """Outcome a safety layer reports for a SQL statement.

    The ``str`` mix-in makes each member compare equal to its plain string
    value, e.g. ``Verdict.PASS == "pass"``.
    """

    # The layer lets the SQL through.
    PASS = "pass"

    # The layer rejects the SQL.
    BLOCK = "block"

    # Ask the user before proceeding.
    CONFIRM = "confirm"

    # The layer rewrote the SQL (e.g. attached a LIMIT).
    REWRITE = "rewrite"
@dataclass
class SafetyDecision:
    """Result of checking one piece of SQL: a verdict plus its explanation.

    Only ``verdict`` and ``sql`` are required; every other field defaults to
    the empty string.
    """

    # Outcome of the check.
    verdict: Verdict

    # SQL the decision refers to; possibly rewritten.
    sql: str

    # Explanation accompanying the verdict.
    reason: str = ""

    # Which layer decided.
    layer: str = ""

    # Populated when the verdict is CONFIRM.
    confirm_prompt: str = ""
@dataclass
class SafetyContext:
    """Settings a layer reads while checking SQL (timeout, row cap).

    The set of fields is expected to grow over versions.
    """

    # Timeout, in seconds.
    timeout_seconds: int = 30

    # Row cap.
    row_limit: int = 1000

    # Free-form dict; each instance gets its own fresh one by default.
    # NOTE(review): presumably holds settings without a dedicated field - confirm.
    extras: dict = field(default_factory=dict)
@runtime_checkable
class SafetyLayerPort(Protocol):
    """A single check in the safety line.

    Meant to be pure: SQL goes in, a decision comes out.  Because the protocol
    is runtime-checkable, ``isinstance`` can be used against it; such a check
    only verifies that the members exist, not their signatures.
    """

    def check(self, sql: str, ctx: SafetyContext) -> SafetyDecision:
        """Check *sql* with the settings in *ctx* and return the decision."""
        ...

    @property
    def name(self) -> str:
        """Name of this layer."""
        ...
@runtime_checkable
class SafetyPipelinePort(Protocol):
    """Runs the layers in order; the first non-PASS result short-circuits.

    Because the protocol is runtime-checkable, ``isinstance`` can be used
    against it; such a check only verifies that the members exist, not their
    signatures.
    """

    @property
    def layers(self) -> Sequence[SafetyLayerPort]:
        """The layers that make up this pipeline."""
        ...

    def evaluate(self, sql: str, ctx: SafetyContext) -> SafetyDecision:
        """Evaluate *sql* with the settings in *ctx* and return the decision."""
        ...