Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions schema/enums/regions.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
"Laayoune-Sakia El Hamra",
"Marrakesh-Safi",
"Oriental",
"Rabat-Sale-Kenitra",
"Rabat-Sale-Kenitra",
"Souss-Massa",
"Tanger-Tetouan-Al Hoceima,
"Tanger-Tetouan-Al Hoceima"
]
}
143 changes: 108 additions & 35 deletions scripts/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,21 @@
import concurrent.futures
from pathlib import Path
from datetime import datetime
from jsonschema import validate, RefResolver, ValidationError
from urllib.parse import urljoin
from referencing import Registry, Resource
from referencing.jsonschema import SchemaRegistry
from jsonschema import validate, ValidationError


def load_json_file(file_path):
"""Load and parse a JSON file."""
try:
with open(file_path, 'r') as f:
return json.load(f)
content = f.read().strip()
if not content:
print(f"Warning: {file_path} is empty")
return {"empty": True}
return json.loads(content)
except json.JSONDecodeError as e:
print(f"Error parsing {file_path}: {e}")
return None
Expand All @@ -35,68 +42,136 @@ def get_schema_for_data_folder(data_folder_name, schema_dir):
return None


def validate_file(data_file, schema_file, registry=None):
    """Validate a single JSON file against its schema.

    Args:
        data_file: Path of the JSON document to check.
        schema_file: Path of the JSON Schema to validate against.
        registry: Optional ``referencing`` registry holding the schemas that
            ``$ref`` lookups may need. When omitted, a registry containing
            only ``schema_file`` is used.

    Returns:
        ``(True, None)`` when the document is valid, otherwise
        ``(False, message)`` describing the load or validation failure.
    """
    data = load_json_file(data_file)
    if data is None:
        return False, f"Failed to load data file: {data_file}"

    # load_json_file reports an empty file with an {"empty": True} marker.
    if isinstance(data, dict) and data.get("empty") is True:
        return False, f"File is empty: {data_file}"

    schema = load_json_file(schema_file)
    if schema is None:
        return False, f"Failed to load schema file: {schema_file}"

    # The same marker for the schema would otherwise reach
    # Resource.from_contents and abort with an uncaught exception.
    if schema == {"empty": True}:
        return False, f"Schema file is empty: {schema_file}"

    schema_uri = f"file://{os.path.abspath(schema_file)}"

    # Compare against None: a registry is a Mapping, so an empty one is falsy
    # and a plain truthiness test would silently discard it.
    base_registry = SchemaRegistry() if registry is None else registry

    try:
        # Registries are immutable; with_resource returns a copy that also
        # knows the schema under its own file URI.
        scoped_registry = base_registry.with_resource(
            schema_uri, Resource.from_contents(schema)
        )
        validate(instance=data, schema=schema, registry=scoped_registry)
    except ValidationError as e:
        return False, str(e)
    return True, None


def build_schema_registry(schema_dir):
    """Build a schema registry with all available schemas for reference resolution.

    Schemas are collected from ``base.json``, ``components/*.json``,
    ``enums/*.json`` and the remaining top-level ``*.json`` files of
    ``schema_dir``. Each one is registered under up to three URIs so that
    differently written ``$ref`` values find it:

    1. ``file://<absolute schema_dir>/<relative path>``
    2. the bare relative path (``base.json``, ``components/x.json``, ...)
    3. ``file://<absolute file path>``, when it differs from (1)

    Args:
        schema_dir: ``pathlib.Path`` of the directory holding the schemas.

    Returns:
        The populated registry.
    """
    registry = SchemaRegistry()
    base_dir = f"file://{os.path.abspath(schema_dir)}/"

    # Gather every schema file first; all of them are registered the same way.
    schema_files = []
    base_schema_path = schema_dir / "base.json"
    if base_schema_path.exists():
        schema_files.append(base_schema_path)
    for subdir_name in ("components", "enums"):
        subdir = schema_dir / subdir_name
        if subdir.exists():
            schema_files.extend(subdir.glob("*.json"))
    schema_files.extend(
        path for path in schema_dir.glob("*.json")
        if path.name != "base.json"  # base.json already added
    )

    for file_path in schema_files:
        schema = load_json_file(file_path)
        # Skip files that failed to load, and the {"empty": True} marker that
        # load_json_file returns for an empty file (it is not a schema).
        if not schema or schema == {"empty": True}:
            continue

        # One Resource per schema, shared by all of its URIs.
        resource = Resource.from_contents(schema)
        relative_path_str = file_path.relative_to(schema_dir).as_posix()

        uris = [f"{base_dir}{relative_path_str}", relative_path_str]
        abs_uri = f"file://{os.path.abspath(file_path)}"
        if abs_uri not in uris:
            uris.append(abs_uri)

        for uri in uris:
            registry = registry.with_resource(uri, resource)

    return registry


def process_data_folder(data_folder_path, schema_file, schema_store, parallel=False):
def process_data_folder(data_folder_path, schema_file, registry, parallel=False):
"""Process all JSON files in a data folder."""
results = []
json_files = [f for f in data_folder_path.glob("*.json")]
Expand All @@ -105,7 +180,7 @@ def process_data_folder(data_folder_path, schema_file, schema_store, parallel=Fa
if parallel and len(json_files) > 1:
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = {
executor.submit(validate_file, data_file, schema_file, schema_store): data_file
executor.submit(validate_file, data_file, schema_file, registry): data_file
for data_file in json_files
}

Expand All @@ -116,7 +191,7 @@ def process_data_folder(data_folder_path, schema_file, schema_store, parallel=Fa
else:
# Sequential processing
for data_file in json_files:
is_valid, error_msg = validate_file(data_file, schema_file, schema_store)
is_valid, error_msg = validate_file(data_file, schema_file, registry)
results.append((data_file, is_valid, error_msg))

return results
Expand Down Expand Up @@ -182,19 +257,17 @@ def main():
if not data_dir.exists():
print(f"Error: Data directory not found at {data_dir}")
return 1

# Check if schema directory exists
# Check if schema directory exists
if not schema_dir.exists():
print(f"Error: Schema directory not found at {schema_dir}")
return 1

# Build schema store for reference resolution
schema_store = build_schema_store(schema_dir)
# Build schema registry for reference resolution
registry = build_schema_registry(schema_dir)

# Collect all validation results
all_results = []

# Process each data folder
# Process each data folder
for data_folder_path in data_dir.iterdir():
if not data_folder_path.is_dir():
continue
Expand All @@ -211,7 +284,7 @@ def main():
folder_results = process_data_folder(
data_folder_path,
schema_file,
schema_store,
registry,
parallel=args.parallel
)

Expand Down