feat: added generators

This commit is contained in:
Andrei Zavgorodnii
2025-10-14 20:27:23 +02:00
committed by Andrei Zavgorodnii
parent 3dda3f2b79
commit 5763547cda
39 changed files with 2347 additions and 2359 deletions
+57
View File
@@ -0,0 +1,57 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
.venv/
venv/
ENV/
env/
.pytest_cache/
# Go
*.exe
*.exe~
*.dll
*.so
*.dylib
*.test
*.out
go.work
go.sum
# TypeScript / Node
node_modules/
dist/
*.tsbuildinfo
npm-debug.log*
yarn-debug.log*
yarn-error.log*
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
+22
View File
@@ -0,0 +1,22 @@
MIT License
Copyright (c) 2024 Pentagi Taxonomy Contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+165
View File
@@ -0,0 +1,165 @@
# None of these targets produce a file of the same name, so mark them phony
.PHONY: help all \
        validate validate-all \
        generate generate-python generate-go generate-typescript \
        test test-all test-python \
        clean clean-all \
        bump-version
# Default target (first rule in the file): print an overview of the targets.
# The table rows go through a single printf format so the columns stay aligned
# no matter how long a target name is.
help:
	@echo "Pentagi Taxonomy Makefile"
	@echo ""
	@echo "Available targets:"
	@printf '  %-30s - %s\n' \
		"all VERSION=N" "Run full cycle: validate → generate → test" \
		"validate VERSION=N" "Validate YAML schema for version N" \
		"validate-all" "Validate all version schemas" \
		"generate VERSION=N" "Generate code for all languages (version N)" \
		"generate-python VERSION=N" "Generate only Python code (version N)" \
		"generate-go VERSION=N" "Generate only Go code (version N)" \
		"generate-typescript VERSION=N" "Generate only TypeScript code (version N)" \
		"test VERSION=N" "Run Python tests for version N" \
		"test-all" "Run Python tests for all versions" \
		"clean VERSION=N" "Remove generated files for version N" \
		"clean-all" "Remove all generated files" \
		"bump-version" "Create new major version directory"
	@echo ""
	@echo "Examples:"
	@echo "  make all VERSION=2        # Full cycle for v2"
	@echo "  make generate VERSION=2"
	@echo "  make test VERSION=1"
	@echo "  make validate-all"
# Full pipeline for one schema version (VERSION=N): validate, then generate,
# then test. Each stage is a sub-make, so a failing stage stops the pipeline.
all:
	@test -n "$(VERSION)" || { \
		echo "ERROR: VERSION is required. Usage: make all VERSION=N"; \
		exit 1; \
	}
	@echo "Running full cycle for version $(VERSION)..."
	@echo ""
	@$(MAKE) validate VERSION=$(VERSION)
	@echo ""
	@$(MAKE) generate VERSION=$(VERSION)
	@echo ""
	@$(MAKE) test VERSION=$(VERSION)
	@echo ""
	@echo "✓ Full cycle complete for version $(VERSION)!"
# Run the schema validator on vN/entities.yml (requires VERSION=N)
validate:
	@test -n "$(VERSION)" || { \
		echo "ERROR: VERSION is required. Usage: make validate VERSION=N"; \
		exit 1; \
	}
	@echo "Validating schema for version $(VERSION)..."
	@python codegen/shared/validator.py v$(VERSION)/entities.yml
# Run the schema validator on every v*/ directory that contains an
# entities.yml, stopping at the first schema that fails
validate-all:
	@echo "Validating all schemas..."
	@for dir in v*/; do \
		[ -f "$$dir/entities.yml" ] || continue; \
		echo "Validating $$dir..."; \
		python codegen/shared/validator.py "$$dir/entities.yml" || exit 1; \
	done
	@echo "✓ All schemas validated successfully"
# Generate code for every supported language for one version (VERSION=N).
# The rule has no recipe of its own; the per-language prerequisites do the work.
generate: \
        generate-python \
        generate-go \
        generate-typescript
# Run the Python code generator for one version (requires VERSION=N)
generate-python:
	@test -n "$(VERSION)" || { \
		echo "ERROR: VERSION is required. Usage: make generate-python VERSION=N"; \
		exit 1; \
	}
	@python codegen/python/generate.py $(VERSION)
# Run the Go code generator for one version (requires VERSION=N)
generate-go:
	@test -n "$(VERSION)" || { \
		echo "ERROR: VERSION is required. Usage: make generate-go VERSION=N"; \
		exit 1; \
	}
	@python codegen/go/generate.py $(VERSION)
# Run the TypeScript code generator for one version (requires VERSION=N)
generate-typescript:
	@test -n "$(VERSION)" || { \
		echo "ERROR: VERSION is required. Usage: make generate-typescript VERSION=N"; \
		exit 1; \
	}
	@python codegen/typescript/generate.py $(VERSION)
# Run the tests for one version (VERSION=N). This rule has no recipe of its
# own: it delegates to test-python, the only per-language test target defined
# in this Makefile.
test: test-python
# Run `make test` for every v*/ directory that contains an entities.yml,
# stopping at the first version whose tests fail
test-all:
	@echo "Running tests for all versions..."
	@for dir in v*/; do \
		[ -f "$$dir/entities.yml" ] || continue; \
		version=$${dir%/}; \
		version=$${version#v}; \
		echo "Testing version $$version..."; \
		$(MAKE) test VERSION=$$version || exit 1; \
	done
	@echo "✓ All version tests passed"
# Run pytest inside vN/python for one version (requires VERSION=N)
test-python:
	@test -n "$(VERSION)" || { \
		echo "ERROR: VERSION is required. Usage: make test-python VERSION=N"; \
		exit 1; \
	}
	@echo "Testing Python package for version $(VERSION)..."
	@cd v$(VERSION)/python && python -m pytest tests/ -v
# Delete the generated output directories of one version (requires VERSION=N).
# The VERSION guard runs first so the rm paths can never start with a bare "v/".
clean:
	@test -n "$(VERSION)" || { \
		echo "ERROR: VERSION is required. Usage: make clean VERSION=N"; \
		exit 1; \
	}
	@echo "Cleaning generated files for version $(VERSION)..."
	@rm -rf \
		v$(VERSION)/python/pentagi_taxonomy \
		v$(VERSION)/go/entities \
		v$(VERSION)/typescript/src \
		v$(VERSION)/typescript/dist \
		v$(VERSION)/typescript/node_modules
	@echo "✓ Cleaned version $(VERSION)"
# Run `make clean` for every v*/ directory that contains an entities.yml.
# A failing sub-make aborts the loop (same convention as test-all), so the
# success message is only printed when every version was actually cleaned.
clean-all:
	@echo "Cleaning all generated files..."
	@for dir in v*/; do \
		if [ -f "$$dir/entities.yml" ]; then \
			version=$$(basename "$$dir" | sed 's/^v//'); \
			echo "Cleaning version $$version..."; \
			$(MAKE) clean VERSION=$$version || exit 1; \
		fi \
	done
	@echo "✓ All generated files cleaned"
# Create the next major version: read N from version.yml, copy vN/entities.yml
# to v(N+1)/, rewrite its version field, and point version.yml at N+1.
# The whole recipe is one shell invocation running under `set -e`, so any
# failing step aborts before version.yml is overwritten.
bump-version:
	@echo "Reading current version from version.yml..."
	@set -e; \
	current=$$(awk '/^version:/ {print $$2; exit}' version.yml); \
	case "$$current" in \
		''|*[!0-9]*) \
			echo "ERROR: could not read a numeric 'version:' from version.yml"; \
			exit 1;; \
	esac; \
	if [ ! -f "v$$current/entities.yml" ]; then \
		echo "ERROR: v$$current/entities.yml not found; nothing to copy"; \
		exit 1; \
	fi; \
	new=$$((current + 1)); \
	echo "Current version: $$current"; \
	echo "New version: $$new"; \
	echo ""; \
	echo "Creating v$$new/ directory..."; \
	mkdir -p "v$$new"; \
	echo "Copying entities.yml from v$$current/ to v$$new/..."; \
	cp "v$$current/entities.yml" "v$$new/entities.yml"; \
	echo "Updating version field in v$$new/entities.yml..."; \
	sed -i.bak "s/^version: $$current/version: $$new/" "v$$new/entities.yml"; \
	rm -f "v$$new/entities.yml.bak"; \
	echo "Updating version.yml to version $$new..."; \
	echo "# Current global taxonomy version" > version.yml; \
	echo "version: $$new" >> version.yml; \
	echo ""; \
	echo "✓ Created new version v$$new"; \
	echo ""; \
	echo "Next steps:"; \
	echo "  1. Edit v$$new/entities.yml with your changes"; \
	echo "  2. Run: make validate VERSION=$$new"; \
	echo "  3. Run: make generate VERSION=$$new"; \
	echo "  4. Commit the new version"
+419
View File
@@ -0,0 +1,419 @@
# Pentagi Taxonomy
A versioned, multi-language entity taxonomy for penetration testing and security assessment tools. Define your entity schema once in YAML, and automatically generate type-safe code for Python, Go, and TypeScript.
## Overview
Pentagi Taxonomy is a code generation framework that helps you maintain consistent entity definitions across multiple programming languages. It's designed for penetration testing tools but can be adapted for any domain that requires versioned entity schemas.
**Key Features:**
- 🔄 **Single Source of Truth**: Define entities, fields, and relationships once in YAML
- 🛡️ **Type Safety**: Generate Pydantic models (Python), structs with validation (Go), and Zod schemas (TypeScript)
- 📦 **Multi-Version Support**: Maintain multiple schema versions simultaneously
- ✅ **Built-in Validation**: Comprehensive schema validation with field constraints
- 🚀 **Easy to Use**: Simple Makefile-based workflow
## Project Structure
```
pentagi-taxonomy/
├── version.yml # Current global taxonomy version
├── v1/ # Version 1 of the taxonomy
│ ├── entities.yml # Entity definitions (nodes, edges, relationships)
│ ├── python/ # Generated Python package
│ ├── go/ # Generated Go package
│ └── typescript/ # Generated TypeScript package
├── codegen/ # Code generation infrastructure
│ ├── python/
│ │ └── generate.py # Python code generator
│ ├── go/
│ │ └── generate.py # Go code generator
│ ├── typescript/
│ │ └── generate.py # TypeScript code generator
│ ├── shared/
│ │ └── validator.py # Schema validation logic
│ └── templates/ # Jinja2 templates for each language
│ ├── python/
│ ├── go/
│ └── typescript/
└── Makefile # Convenience commands for common tasks
```
## How It Works
1. **Define Schema**: Create or modify `entities.yml` in a version directory (e.g., `v1/entities.yml`)
2. **Validate**: Run validation to ensure schema correctness
3. **Generate Code**: Generate type-safe code for Python, Go, and TypeScript
4. **Use**: Import and use the generated packages in your projects
### Entity Schema Format
The `entities.yml` file defines three main sections:
#### 1. Nodes (Entities)
Nodes represent primary entities in your domain:
```yaml
nodes:
Target:
description: "A target system being assessed"
fields:
uuid:
type: string
description: "Unique identifier"
hostname:
type: string
description: "DNS hostname if known"
ip_address:
type: string
description: "IP address"
regex: "^(?:[0-9]{1,3}\\.){3}[0-9]{1,3}$"
status:
type: string
description: "Current status"
enum: [active, inactive, scanning]
risk_score:
type: float
description: "Risk score"
min: 0.0
max: 10.0
```
#### 2. Edges (Relationships)
Edges represent relationships between nodes:
```yaml
edges:
HAS_PORT:
description: "A target has a port"
fields:
timestamp:
type: timestamp
description: "When association was established"
AFFECTS:
description: "A vulnerability affects a target"
fields:
timestamp:
type: timestamp
description: "When identified"
impact:
type: string
enum: [direct, indirect]
```
#### 3. Relationships
Define which nodes can be connected by which edges:
```yaml
relationships:
- source: Target
target: Port
edges: [HAS_PORT]
- source: Vulnerability
target: Target
edges: [AFFECTS]
```
### Supported Field Types
- `string` - Text values
- `int` - Integer numbers
- `float` - Floating-point numbers
- `boolean` - True/false values
- `timestamp` - Unix timestamps (float)
- Arrays: Add `[]` suffix (e.g., `string[]`, `int[]`)
### Field Constraints
- `enum: [value1, value2]` - Restrict to enumerated values (string only)
- `regex: "pattern"` - Validate against regex pattern (string only)
- `min: value` - Minimum value (numeric types only)
- `max: value` - Maximum value (numeric types only)
- `description: "text"` - Field documentation
## Quick Start
### Prerequisites
- Python 3.8+ (for code generation)
- Make (for convenience commands)
- Language-specific tools for using generated code:
- Python: `pip`
- Go: Go 1.19+
- TypeScript: Node.js 16+
### Installation
1. Clone the repository:
```bash
git clone https://github.com/vxcontrol/pentagi-taxonomy.git
cd pentagi-taxonomy
```
2. Install code generation dependencies:
```bash
pip install -r codegen/requirements.txt
```
### Basic Usage
Generate code for the latest version (v1):
```bash
# Full workflow: validate → generate → test
make all VERSION=1
# Or step by step:
make validate VERSION=1 # Validate schema
make generate VERSION=1 # Generate code for all languages
make test VERSION=1 # Run tests
```
Generate code for a specific language:
```bash
make generate-python VERSION=1
make generate-go VERSION=1
make generate-typescript VERSION=1
```
## Makefile Commands
| Command | Description | Example |
|---------|-------------|---------|
| `make help` | Show all available commands | `make help` |
| `make all VERSION=N` | Run full cycle (validate → generate → test) | `make all VERSION=1` |
| `make validate VERSION=N` | Validate schema for version N | `make validate VERSION=1` |
| `make validate-all` | Validate all version schemas | `make validate-all` |
| `make generate VERSION=N` | Generate code for all languages | `make generate VERSION=1` |
| `make generate-python VERSION=N` | Generate only Python code | `make generate-python VERSION=1` |
| `make generate-go VERSION=N` | Generate only Go code | `make generate-go VERSION=1` |
| `make generate-typescript VERSION=N` | Generate only TypeScript code | `make generate-typescript VERSION=1` |
| `make test VERSION=N` | Run Python tests | `make test VERSION=1` |
| `make test-all` | Run tests for all versions | `make test-all` |
| `make clean VERSION=N` | Remove generated files for version N | `make clean VERSION=1` |
| `make clean-all` | Remove all generated files | `make clean-all` |
| `make bump-version` | Create new major version | `make bump-version` |
## Using Generated Code
### Python
```python
from pentagi_taxonomy import TAXONOMY_VERSION
from pentagi_taxonomy.nodes import Target, Port, Vulnerability
from pentagi_taxonomy.edges import HasPort, Affects
# Create entities with type checking and validation
target = Target(
uuid="target-123",
hostname="example.com",
ip_address="192.168.1.1",
status="active",
risk_score=7.5
)
vulnerability = Vulnerability(
uuid="vuln-456",
title="SQL Injection",
severity="critical",
cvss_score=9.8,
exploitable=True
)
# Validation happens automatically
print(f"Using taxonomy version: {TAXONOMY_VERSION}")
print(target.model_dump_json())
```
Install the Python package:
```bash
# Development mode
cd v1/python
pip install -e .
# Or from Git
pip install git+https://github.com/vxcontrol/pentagi-taxonomy.git@f08fc9160ab46feb21408a5e641c22c6cda48e45#subdirectory=v1/python
```
### Go
```go
package main
import (
"fmt"
"github.com/vxcontrol/pentagi-taxonomy/v1/go/entities"
)
func main() {
target := entities.Target{
UUID: "target-123",
Hostname: "example.com",
IPAddress: "192.168.1.1",
Status: "active",
RiskScore: 7.5,
}
// Validate the entity
if err := target.Validate(); err != nil {
panic(err)
}
fmt.Printf("Target: %+v\n", target)
}
```
## TypeScript Installation with gitpkg
### gitpkg URL Format
```
https://gitpkg.now.sh/<owner>/<repo>/<path/to/package>?<branch-or-tag>&<custom-scripts>
```
**Components:**
- `<owner>/<repo>` - GitHub repository (e.g., `vxcontrol/pentagi-taxonomy`)
- `<path/to/package>` - Subdirectory path containing `package.json` (e.g., `v1/typescript`)
- `?<branch-or-tag>` - Git reference: branch name (`main`) or tag (`v1.1.0`)
- `&<custom-scripts>` - Optional custom npm scripts to run after installation
### Automatic Build Configuration
Since gitpkg fetches source code (not compiled JavaScript), we use the `scripts.postinstall` parameter to automatically build the package after installation:
```
scripts.postinstall=npm%20install%20--ignore-scripts%20%26%26%20npm%20run%20build
```
This eliminates manual build steps - the package compiles automatically after `npm install`.
### Installation Examples
```bash
# Install from specific tag (pinned version) with automatic build
npm install 'https://gitpkg.now.sh/vxcontrol/pentagi-taxonomy/v1/typescript?v1.1.0&scripts.postinstall=npm%20install%20--ignore-scripts%20%26%26%20npm%20run%20build'
```
### Using Multiple Versions with Aliases
Add to your `package.json` with npm aliases to use multiple versions simultaneously:
```json
{
"dependencies": {
"@pentagi/taxonomy-v1": "https://gitpkg.now.sh/vxcontrol/pentagi-taxonomy/v1/typescript?main&scripts.postinstall=npm%20install%20--ignore-scripts%20%26%26%20npm%20run%20build"
}
}
```
Then import from aliased packages:
```typescript
import { TargetSchema as TargetV1 } from '@pentagi/taxonomy-v1';
```
### Updating Packages
When the taxonomy is updated on GitHub, refresh your installation:
```bash
# Clear npm cache (gitpkg caches packages)
npm cache clean --force
# Remove node_modules and reinstall
rm -rf node_modules package-lock.json
npm install
```
The `scripts.postinstall` parameter automatically rebuilds the packages after installation.
## Version Management
### Creating a New Version
To create a new major version of the taxonomy:
```bash
make bump-version
```
This will:
1. Read the current version from `version.yml`
2. Create a new version directory (e.g., `v3/`)
3. Copy the previous version's `entities.yml` as a starting point
4. Update the version number in the new `entities.yml`
5. Update `version.yml` to point to the new version
After creating a new version:
1. Edit `vN/entities.yml` with your changes
2. Validate: `make validate VERSION=N`
3. Generate code: `make generate VERSION=N`
4. Test: `make test VERSION=N`
5. Commit the new version
### Version Compatibility
Each version is independent and can be used simultaneously. This allows:
- Gradual migration between versions
- Supporting multiple API versions
- Maintaining backward compatibility
## Development Workflow
### Adding a New Entity
1. Edit `vN/entities.yml` to add your node definition
2. Validate the schema:
```bash
make validate VERSION=N
```
3. Generate code:
```bash
make generate VERSION=N
```
4. Review generated code in `vN/python/`, `vN/go/`, and `vN/typescript/`
5. Run tests:
```bash
make test VERSION=N
```
### Modifying Existing Entities
For **backward-compatible changes** (adding optional fields):
- Modify the current version's `entities.yml`
- Regenerate code
For **breaking changes** (removing fields, changing types):
- Create a new major version with `make bump-version`
- Make changes in the new version
- Keep the old version for backward compatibility
### Adding Custom Validation
You can extend the generated code with custom validation:
**Python**: Subclass the generated models and add Pydantic validators
**Go**: Add methods to the generated structs
**TypeScript**: Use Zod's refinement methods
## Testing
The Python packages include basic tests. Run them with:
```bash
# Test specific version
make test VERSION=1
# Test all versions
make test-all
# Or manually
cd v1/python
pytest tests/ -v
```
+206
View File
@@ -0,0 +1,206 @@
#!/usr/bin/env python
"""
Go code generator for pentagi-taxonomy.
Generates Go structs with validation tags from YAML entity definitions using Jinja2 templates.
"""
import sys
import re
from pathlib import Path
from typing import Any, Dict, List
from jinja2 import Environment, FileSystemLoader
# Add shared validator to path
sys.path.insert(0, str(Path(__file__).parent.parent / "shared"))
from validator import validate_schema, SchemaValidationError
def to_pascal_case(snake_case: str) -> str:
    """Return the PascalCase form of an underscore-separated identifier.

    The input is split on ``_``; each piece goes through ``str.capitalize``
    (first character upper-cased, the rest lower-cased) and the pieces are
    concatenated without a separator.
    """
    segments = snake_case.split('_')
    capitalized = [segment.capitalize() for segment in segments]
    return ''.join(capitalized)
def go_type_from_yaml(yaml_type: str) -> str:
    """Map a schema type name to the Go type used in generated structs.

    Scalar types map to pointer types; unknown names fall back to ``*string``.
    A trailing ``[]`` marks an array: the element type is resolved recursively
    and wrapped as ``*[]<element type>``.
    """
    if not yaml_type.endswith("[]"):
        scalar_types = {
            "string": "*string",
            "int": "*int",
            "float": "*float64",
            "boolean": "*bool",
            "timestamp": "*float64",
        }
        return scalar_types.get(yaml_type, "*string")

    element_type = go_type_from_yaml(yaml_type[:-2])
    return f"*[]{element_type}"
def is_ipv4_regex(regex: str) -> bool:
    """Tell whether ``regex`` contains one of the known IPv4 patterns.

    Doubled backslashes in the input are collapsed to single ones first, then
    the two recognised pattern spellings are searched for as plain substrings.
    """
    normalized = regex.replace("\\\\", "\\")
    known_patterns = (
        r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$",
        r"^(?:[0-9]+\.){3}[0-9]+$",
    )
    for candidate in known_patterns:
        if candidate in normalized:
            return True
    return False
def validation_tag_from_field(field_def: Dict[str, Any]) -> str:
    """Build the validator/v10 ``validate:"..."`` struct tag for a field.

    Args:
        field_def: Field definition from the schema; the keys ``enum``,
            ``regex``, ``min`` and ``max`` are consulted.

    Returns:
        A tag of the form ``validate:"omitempty,<rule>,..."``, or an empty
        string when the field carries no constraint that maps to a rule.
    """
    tags = ["omitempty"]
    has_enum = "enum" in field_def

    # Enum validation
    if has_enum:
        options = []
        for value in field_def["enum"]:
            # YAML may parse enum entries as ints/bools; stringify so the join
            # below cannot raise TypeError.
            option = str(value)
            # oneof options are whitespace-separated, so an option that itself
            # contains whitespace has to be wrapped in single quotes.
            if any(ch.isspace() for ch in option):
                option = f"'{option}'"
            options.append(option)
        tags.append("oneof=" + " ".join(options))

    # Regex validation (map to built-in validators where possible).
    # Skipped when an enum is present; patterns that match none of the
    # heuristics below produce no rule at all.
    if "regex" in field_def and not has_enum:
        regex = field_def["regex"]
        if is_ipv4_regex(regex):
            tags.append("ipv4")
        elif "email" in regex.lower():
            tags.append("email")
        elif regex.startswith("^https?://"):
            tags.append("url")

    # Min/max validation
    if "min" in field_def:
        tags.append(f"min={field_def['min']}")
    if "max" in field_def:
        tags.append(f"max={field_def['max']}")

    # Only emit a tag when something besides "omitempty" was collected
    if len(tags) > 1:
        return f'validate:"{",".join(tags)}"'
    return ""
def prepare_field_for_go(field_name: str, field_def: Dict[str, Any]) -> Dict[str, Any]:
    """Assemble the template data for one Go struct field.

    Returns a dict with the keys ``go_name`` (PascalCase field name),
    ``go_type``, ``struct_tag`` (backtick-quoted, always holding a
    ``json:"<name>,omitempty"`` tag and, when one exists, the validate tag)
    and ``description``.
    """
    tag_parts = [f'json:"{field_name},omitempty"']
    validate_tag = validation_tag_from_field(field_def)
    if validate_tag:
        tag_parts.append(validate_tag)

    return {
        "go_name": to_pascal_case(field_name),
        "go_type": go_type_from_yaml(field_def.get("type", "string")),
        "struct_tag": "`" + " ".join(tag_parts) + "`",
        "description": field_def.get("description", ""),
    }
def prepare_nodes_data(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Convert the schema's ``nodes`` section into template-ready data.

    Returns a mapping from node name to a dict holding the node's
    ``description`` and its ``fields``, each field prepared for Go output.
    """
    return {
        node_name: {
            "description": node_def.get("description", ""),
            "fields": {
                field_name: prepare_field_for_go(field_name, field_def)
                for field_name, field_def in node_def.get("fields", {}).items()
            },
        }
        for node_name, node_def in schema.get("nodes", {}).items()
    }
def prepare_edges_data(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Convert the schema's ``edges`` section into template-ready data.

    Returns a mapping from the edge name as written in the schema to a dict
    holding ``class_name`` (the name converted from SCREAMING_SNAKE_CASE to
    PascalCase), ``description`` and the prepared ``fields``.
    """
    prepared = {}
    for edge_name, edge_def in schema.get("edges", {}).items():
        name_parts = [part.capitalize() for part in edge_name.split('_')]
        prepared[edge_name] = {
            "class_name": ''.join(name_parts),
            "description": edge_def.get("description", ""),
            "fields": {
                field_name: prepare_field_for_go(field_name, field_def)
                for field_name, field_def in edge_def.get("fields", {}).items()
            },
        }
    return prepared
def main():
    """Generate the Go package for one taxonomy version.

    Expects exactly one command-line argument, the version ``N``. Validates
    ``vN/entities.yml`` and renders ``go.mod``, ``entities/entities.go`` and
    ``entities/validators.go`` under ``vN/go/`` from the Jinja2 templates in
    ``codegen/templates/go``. Exits with status 1 on a usage error or when
    schema validation fails.
    """
    if len(sys.argv) != 2:
        print("Usage: python generate.py <version>")
        print("Example: python generate.py 2")
        sys.exit(1)

    version_arg = sys.argv[1]

    # Determine paths. Three levels above this file is the directory that
    # holds both the vN/ version directories and codegen/.
    repo_root = Path(__file__).parent.parent.parent
    version_dir = repo_root / f"v{version_arg}"
    schema_path = version_dir / "entities.yml"
    output_dir = version_dir / "go"
    entities_dir = output_dir / "entities"
    templates_dir = repo_root / "codegen" / "templates" / "go"

    print(f"Generating Go code for version {version_arg}...")
    print(f" Schema: {schema_path}")
    print(f" Output: {output_dir}")

    # Validate schema
    try:
        schema = validate_schema(schema_path)
    except SchemaValidationError as e:
        print(f"✗ Schema validation failed: {e}", file=sys.stderr)
        sys.exit(1)

    # Prefer the version recorded in the schema. The CLI argument is parsed
    # only when it is really needed, so a non-numeric argument cannot raise
    # ValueError while the schema already supplies the version.
    if "version" in schema:
        schema_version = schema["version"]
    else:
        schema_version = int(version_arg)

    # Create output directories
    entities_dir.mkdir(parents=True, exist_ok=True)

    # Setup Jinja2 environment
    env = Environment(loader=FileSystemLoader(templates_dir), trim_blocks=True, lstrip_blocks=True)

    # Prepare data for templates
    nodes_data = prepare_nodes_data(schema)
    edges_data = prepare_edges_data(schema)

    # Generate files. Go source files must be UTF-8, so the encoding is pinned
    # instead of relying on the platform's locale default.
    print(" Generating go.mod...")
    go_mod = env.get_template("go.mod.j2").render(version=schema_version, org="vxcontrol")
    (output_dir / "go.mod").write_text(go_mod, encoding="utf-8")

    print(" Generating entities/entities.go...")
    entities_content = env.get_template("entities.go.j2").render(nodes=nodes_data, edges=edges_data)
    (entities_dir / "entities.go").write_text(entities_content, encoding="utf-8")

    print(" Generating entities/validators.go...")
    validators_content = env.get_template("validators.go.j2").render(nodes=nodes_data, edges=edges_data)
    (entities_dir / "validators.go").write_text(validators_content, encoding="utf-8")

    print("✓ Go code generation complete!")
    print(f" Generated {len(nodes_data)} node structs")
    print(f" Generated {len(edges_data)} edge structs")


if __name__ == "__main__":
    main()
+182
View File
@@ -0,0 +1,182 @@
#!/usr/bin/env python
"""
Python code generator for pentagi-taxonomy.
Generates Pydantic models from YAML entity definitions using Jinja2 templates.
"""
import sys
import yaml
from pathlib import Path
from typing import Any, Dict, List
from jinja2 import Environment, FileSystemLoader
# Add shared validator to path
sys.path.insert(0, str(Path(__file__).parent.parent / "shared"))
from validator import validate_schema, SchemaValidationError
def python_type_from_yaml(yaml_type: str) -> str:
    """Map a schema type name to the Python type hint used in generated models.

    Unknown scalar names fall back to ``str``. A trailing ``[]`` marks an
    array: the element type is resolved recursively and wrapped as
    ``list[<element type>]``.
    """
    if not yaml_type.endswith("[]"):
        scalar_types = {
            "string": "str",
            "int": "int",
            "float": "float",
            "boolean": "bool",
            "timestamp": "float",
        }
        return scalar_types.get(yaml_type, "str")

    element_type = python_type_from_yaml(yaml_type[:-2])
    return f"list[{element_type}]"
def prepare_field_for_pydantic(field_name: str, field_def: Dict[str, Any]) -> Dict[str, Any]:
    """Assemble the template data for one Pydantic model field.

    Args:
        field_name: Name of the field in the schema (not used for the result).
        field_def: Field definition; ``type``, ``description``, ``min``,
            ``max`` and ``enum`` are consulted.

    Returns:
        A dict with ``python_type`` (the type hint as source text),
        ``field_args`` (the comma-separated argument list for ``Field(...)``,
        always starting with the default ``None``) and ``description``.
    """
    yaml_type = field_def.get("type", "string")
    description = field_def.get("description", "")

    if "enum" in field_def:
        # Enum values become a Literal type.
        members = ", ".join(repr(value) for value in field_def["enum"])
        python_type = f"Literal[{members}]"
        # For an array field the enum constrains each element, so keep the
        # list wrapper instead of collapsing the field to a single Literal.
        if yaml_type.endswith("[]"):
            python_type = f"list[{python_type}]"
    else:
        python_type = python_type_from_yaml(yaml_type)

    # Build Field() arguments: positional default first, then keyword options
    field_parts = ["None"]
    if description:
        field_parts.append(f"description={repr(description)}")

    # Add validation constraints
    if "min" in field_def:
        field_parts.append(f"ge={field_def['min']}")
    if "max" in field_def:
        field_parts.append(f"le={field_def['max']}")

    return {
        "python_type": python_type,
        "field_args": ", ".join(field_parts),
        "description": description,
    }
def prepare_nodes_data(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Convert the schema's ``nodes`` section into template-ready data.

    Returns a mapping from node name to a dict holding the node's
    ``description`` and its ``fields``, each field prepared for Pydantic.
    """
    return {
        node_name: {
            "description": node_def.get("description", ""),
            "fields": {
                field_name: prepare_field_for_pydantic(field_name, field_def)
                for field_name, field_def in node_def.get("fields", {}).items()
            },
        }
        for node_name, node_def in schema.get("nodes", {}).items()
    }
def prepare_edges_data(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Convert the schema's ``edges`` section into template-ready data.

    Returns a mapping from the edge name as written in the schema to a dict
    holding ``class_name`` (the name converted from SCREAMING_SNAKE_CASE to
    PascalCase), ``description`` and the prepared ``fields``.
    """
    prepared = {}
    for edge_name, edge_def in schema.get("edges", {}).items():
        name_parts = [part.capitalize() for part in edge_name.split('_')]
        prepared[edge_name] = {
            "class_name": ''.join(name_parts),
            "description": edge_def.get("description", ""),
            "fields": {
                field_name: prepare_field_for_pydantic(field_name, field_def)
                for field_name, field_def in edge_def.get("fields", {}).items()
            },
        }
    return prepared
def main():
    """Generate the Python package for one taxonomy version.

    Expects exactly one command-line argument, the version ``N``. Validates
    ``vN/entities.yml`` and renders ``__init__.py``, ``nodes.py``,
    ``edges.py`` and ``entity_map.py`` into ``vN/python/pentagi_taxonomy``
    from the Jinja2 templates in ``codegen/templates/python``. Exits with
    status 1 on a usage error or when schema validation fails.
    """
    if len(sys.argv) != 2:
        print("Usage: python generate.py <version>")
        print("Example: python generate.py 2")
        sys.exit(1)

    version_arg = sys.argv[1]

    # Determine paths. Three levels above this file is the directory that
    # holds both the vN/ version directories and codegen/.
    repo_root = Path(__file__).parent.parent.parent
    version_dir = repo_root / f"v{version_arg}"
    schema_path = version_dir / "entities.yml"
    output_dir = version_dir / "python" / "pentagi_taxonomy"
    templates_dir = repo_root / "codegen" / "templates" / "python"

    print(f"Generating Python code for version {version_arg}...")
    print(f" Schema: {schema_path}")
    print(f" Output: {output_dir}")

    # Validate schema
    try:
        schema = validate_schema(schema_path)
    except SchemaValidationError as e:
        print(f"✗ Schema validation failed: {e}", file=sys.stderr)
        sys.exit(1)

    # The generated package reports the version stored in the schema file
    # itself. The CLI argument is parsed only when it is really needed, so a
    # non-numeric argument cannot raise ValueError while the schema already
    # supplies the version.
    if "version" in schema:
        schema_version = schema["version"]
    else:
        schema_version = int(version_arg)

    # Create output directory
    output_dir.mkdir(parents=True, exist_ok=True)

    # Setup Jinja2 environment
    env = Environment(loader=FileSystemLoader(templates_dir), trim_blocks=True, lstrip_blocks=True)

    # Prepare data for templates
    nodes_data = prepare_nodes_data(schema)
    edges_data = prepare_edges_data(schema)

    # Generate files. The encoding is pinned to UTF-8 so the output does not
    # depend on the platform's locale default.
    print(" Generating __init__.py...")
    init_content = env.get_template("__init__.py.j2").render(version=schema_version)
    (output_dir / "__init__.py").write_text(init_content, encoding="utf-8")

    print(" Generating nodes.py...")
    nodes_content = env.get_template("nodes.py.j2").render(nodes=nodes_data)
    (output_dir / "nodes.py").write_text(nodes_content, encoding="utf-8")

    print(" Generating edges.py...")
    edges_content = env.get_template("edges.py.j2").render(edges=edges_data)
    (output_dir / "edges.py").write_text(edges_content, encoding="utf-8")

    print(" Generating entity_map.py...")
    entity_map_content = env.get_template("entity_map.py.j2").render(
        nodes=nodes_data,
        edges=edges_data,
        relationships=schema.get("relationships", []),
    )
    (output_dir / "entity_map.py").write_text(entity_map_content, encoding="utf-8")

    print("✓ Python code generation complete!")
    print(f" Generated {len(nodes_data)} node models")
    print(f" Generated {len(edges_data)} edge models")


if __name__ == "__main__":
    main()
+3
View File
@@ -0,0 +1,3 @@
PyYAML>=6.0
Jinja2>=3.1.0
+238
View File
@@ -0,0 +1,238 @@
"""
Shared schema validation logic for pentagi-taxonomy.
Used by all code generators to ensure consistent validation.
"""
import yaml
from typing import Any, Dict, List, Set
from pathlib import Path
SUPPORTED_PRIMITIVE_TYPES = {"string", "int", "float", "boolean", "timestamp"}
class SchemaValidationError(Exception):
    """Signals that a taxonomy schema failed validation.

    Carries no state beyond the message passed to ``Exception``.
    """
def validate_field_type(field_name: str, field_def: Dict[str, Any], entity_type: str) -> None:
    """Validate that a field declares a supported type.

    Args:
        field_name: Name of the field, used in error messages.
        field_def: The field's definition mapping from the schema.
        entity_type: Description of the owning entity, used in error messages.

    Raises:
        SchemaValidationError: If the definition is not a mapping, has no
            ``type``, the ``type`` is not a string, or it names neither a
            supported primitive nor an array (``<primitive>[]``) of one.
    """
    # An empty YAML entry ("field:") parses to None; report it as a schema
    # error instead of letting the membership test below raise TypeError.
    if not isinstance(field_def, dict):
        raise SchemaValidationError(
            f"Field '{field_name}' in {entity_type} must be a mapping with a 'type' property"
        )

    if "type" not in field_def:
        raise SchemaValidationError(
            f"Field '{field_name}' in {entity_type} must have a 'type' property"
        )

    field_type = field_def["type"]
    # Sorted so the message is the same on every run (set order is arbitrary)
    supported = ', '.join(sorted(SUPPORTED_PRIMITIVE_TYPES))

    # YAML can yield ints, lists, etc. here; the string checks below need a str
    if not isinstance(field_type, str):
        raise SchemaValidationError(
            f"Field '{field_name}' in {entity_type} has unsupported type '{field_type}'. "
            f"Supported types: {supported} and arrays of these (e.g., 'string[]')"
        )

    # Check for array types
    if field_type.endswith("[]"):
        base_type = field_type[:-2]
        if base_type not in SUPPORTED_PRIMITIVE_TYPES:
            raise SchemaValidationError(
                f"Field '{field_name}' in {entity_type} has unsupported array base type '{base_type}'. "
                f"Supported types: {supported}"
            )
    elif field_type not in SUPPORTED_PRIMITIVE_TYPES:
        raise SchemaValidationError(
            f"Field '{field_name}' in {entity_type} has unsupported type '{field_type}'. "
            f"Supported types: {supported} and arrays of these (e.g., 'string[]')"
        )
def validate_field_constraints(field_name: str, field_def: Dict[str, Any], entity_type: str) -> None:
    """Validate that a field's constraints fit its declared type.

    Rules enforced:
      * ``enum`` and ``regex`` require a ``string`` base type; ``enum`` must
        be a non-empty list.
      * ``min``/``max`` require an ``int``, ``float`` or ``timestamp`` base
        type, and ``min`` must not exceed ``max`` when both are present.

    The base type of an array type (``<type>[]``) is its element type.

    Raises:
        SchemaValidationError: If any rule above is violated, or if ``min``
            and ``max`` cannot be compared with each other.
    """
    field_type = field_def.get("type", "")
    # Drop exactly one trailing "[]" marker. (str.rstrip("[]") would strip any
    # run of '[' / ']' characters, which is a character set, not a suffix.)
    base_type = field_type[:-2] if field_type.endswith("[]") else field_type

    # Validate enum constraints
    if "enum" in field_def:
        if base_type != "string":
            raise SchemaValidationError(
                f"Field '{field_name}' in {entity_type} has 'enum' constraint but type is '{field_type}'. "
                f"Enum constraints are only valid for 'string' type."
            )
        enum_values = field_def["enum"]
        if not isinstance(enum_values, list) or len(enum_values) == 0:
            raise SchemaValidationError(
                f"Field '{field_name}' in {entity_type} has invalid 'enum' constraint. "
                f"Must be a non-empty list."
            )

    # Validate regex constraints
    if "regex" in field_def:
        if base_type != "string":
            raise SchemaValidationError(
                f"Field '{field_name}' in {entity_type} has 'regex' constraint but type is '{field_type}'. "
                f"Regex constraints are only valid for 'string' type."
            )

    # Validate min/max constraints
    if "min" in field_def or "max" in field_def:
        if base_type not in ["int", "float", "timestamp"]:
            raise SchemaValidationError(
                f"Field '{field_name}' in {entity_type} has min/max constraint but type is '{field_type}'. "
                f"Min/max constraints are only valid for numeric types (int, float, timestamp)."
            )

        # Validate min <= max
        if "min" in field_def and "max" in field_def:
            min_val = field_def["min"]
            max_val = field_def["max"]
            try:
                inverted = min_val > max_val
            except TypeError:
                # e.g. a number compared with a string: report it as a schema
                # problem rather than leaking a raw TypeError to the caller
                raise SchemaValidationError(
                    f"Field '{field_name}' in {entity_type} has min ({min_val!r}) and max ({max_val!r}) "
                    f"values that cannot be compared. Both must be numbers."
                ) from None
            if inverted:
                raise SchemaValidationError(
                    f"Field '{field_name}' in {entity_type} has min ({min_val}) > max ({max_val}). "
                    f"Min must be less than or equal to max."
                )
def validate_relationships(schema: Dict[str, Any]) -> None:
    """Validate that every relationship references defined nodes and edge types.

    Args:
        schema: Parsed schema mapping. Reads the ``relationships``, ``nodes``
            and ``edges`` keys; a schema without ``relationships`` is accepted.

    Raises:
        SchemaValidationError: If ``relationships`` is not a list, an entry is
            not a mapping, a required key is missing, ``edges`` is not a
            non-empty list, or a source/target/edge name is not defined.
    """
    if "relationships" not in schema:
        return

    relationships = schema["relationships"]
    # An empty `relationships:` key loads as None; reject it (and any other
    # non-list) here instead of crashing inside enumerate().
    if not isinstance(relationships, list):
        raise SchemaValidationError("'relationships' section must be a list")

    nodes = set(schema.get("nodes") or {})
    edges = set(schema.get("edges") or {})

    for i, rel in enumerate(relationships):
        # A bare string would otherwise be substring-tested by the `in` checks below.
        if not isinstance(rel, dict):
            raise SchemaValidationError(
                f"Relationship #{i+1} must be a dictionary"
            )

        for required in ("source", "target", "edges"):
            if required not in rel:
                raise SchemaValidationError(
                    f"Relationship #{i+1} missing '{required}' field"
                )

        source = rel["source"]
        target = rel["target"]
        rel_edges = rel["edges"]

        # The isinstance guards keep unhashable values (lists, dicts) from
        # raising TypeError during set membership tests.
        if not isinstance(source, str) or source not in nodes:
            raise SchemaValidationError(
                f"Relationship #{i+1} references undefined source node '{source}'"
            )
        if not isinstance(target, str) or target not in nodes:
            raise SchemaValidationError(
                f"Relationship #{i+1} references undefined target node '{target}'"
            )
        if not isinstance(rel_edges, list) or len(rel_edges) == 0:
            raise SchemaValidationError(
                f"Relationship #{i+1} must have non-empty 'edges' list"
            )
        for edge in rel_edges:
            if not isinstance(edge, str) or edge not in edges:
                raise SchemaValidationError(
                    f"Relationship #{i+1} references undefined edge type '{edge}'"
                )
def _validate_entity_section(schema: Dict[str, Any], section: str, label: str) -> None:
    """Validate one entity section ('nodes' or 'edges') of the schema, if present.

    ``label`` is the singular lowercase name ('node' / 'edge') used in messages.
    """
    if section not in schema:
        return

    entities = schema[section]
    if not isinstance(entities, dict):
        raise SchemaValidationError(f"'{section}' section must be a dictionary")

    title = label.capitalize()
    for name, definition in entities.items():
        if not isinstance(definition, dict):
            raise SchemaValidationError(f"{title} '{name}' definition must be a dictionary")
        if "fields" not in definition:
            raise SchemaValidationError(f"{title} '{name}' must have 'fields' property")
        fields = definition["fields"]
        if not isinstance(fields, dict):
            raise SchemaValidationError(f"{title} '{name}' 'fields' must be a dictionary")

        owner = f"{label} '{name}'"
        for field_name, field_def in fields.items():
            if not isinstance(field_def, dict):
                raise SchemaValidationError(
                    f"Field '{field_name}' in {owner} must be a dictionary"
                )
            validate_field_type(field_name, field_def, owner)
            validate_field_constraints(field_name, field_def, owner)


def validate_schema(schema_path: Path) -> Dict[str, Any]:
    """
    Validate a YAML schema file.

    Checks, in order: the file exists and parses as YAML, the top level is a
    mapping with an integer ``version``, every node and edge definition has a
    well-formed ``fields`` mapping, and all relationships reference defined
    nodes and edges.

    Args:
        schema_path: Path to entities.yml file

    Returns:
        Parsed schema dictionary

    Raises:
        SchemaValidationError: If schema is invalid or cannot be read
    """
    if not schema_path.exists():
        raise SchemaValidationError(f"Schema file not found: {schema_path}")

    try:
        # Explicit UTF-8: the platform default encoding would make parsing of
        # non-ASCII descriptions depend on the machine running the generator.
        with open(schema_path, 'r', encoding='utf-8') as f:
            schema = yaml.safe_load(f)
    except yaml.YAMLError as e:
        raise SchemaValidationError(f"Invalid YAML syntax: {e}") from e
    except (OSError, UnicodeDecodeError) as e:
        # e.g. the path is a directory, is unreadable, or is not valid UTF-8.
        raise SchemaValidationError(f"Cannot read schema file {schema_path}: {e}") from e

    if not isinstance(schema, dict):
        raise SchemaValidationError("Schema must be a dictionary")

    # Validate version field
    if "version" not in schema:
        raise SchemaValidationError("Schema must have a 'version' field")
    version = schema["version"]
    # bool is a subclass of int, so `version: true` must be rejected explicitly.
    if isinstance(version, bool) or not isinstance(version, int):
        raise SchemaValidationError("Schema 'version' must be an integer")

    # Validate nodes and edges sections (same structure, different labels)
    _validate_entity_section(schema, "nodes", "node")
    _validate_entity_section(schema, "edges", "edge")

    # Validate relationships
    validate_relationships(schema)

    return schema
if __name__ == "__main__":
    # Command-line entry point: validate a single schema file and print a short
    # summary on success, or the validation error on stderr with exit status 1.
    import sys

    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("Usage: python validator.py <path-to-entities.yml>")
        sys.exit(1)

    try:
        validated = validate_schema(Path(cli_args[0]))
        node_count = len(validated.get('nodes', {}))
        edge_count = len(validated.get('edges', {}))
        relationship_count = len(validated.get('relationships', []))
        print(f"✓ Schema validation passed for version {validated['version']}")
        print(f" - {node_count} node types")
        print(f" - {edge_count} edge types")
        print(f" - {relationship_count} relationships")
    except SchemaValidationError as err:
        print(f"✗ Schema validation failed: {err}", file=sys.stderr)
        sys.exit(1)
+41
View File
@@ -0,0 +1,41 @@
// Auto-generated entity definitions for pentagi-taxonomy.
// DO NOT EDIT - this file is generated from entities.yml
package entities
{# Emits one Go struct per node type, keyed by the node name as written in the schema.
   Each field entry is only interpolated here: go_name, go_type and struct_tag are
   expected to be precomputed by the Go generator (not visible in this template).
   NOTE(review): descriptions are placed in `//` comments, so a multi-line
   description would spill onto an uncommented line - confirm the generator
   guarantees single-line descriptions. #}
{% for node_name, node_def in nodes.items() %}
{% if node_def.description %}
// {{ node_name }} {{ node_def.description }}
{% else %}
// {{ node_name }} entity
{% endif %}
type {{ node_name }} struct {
{% for field_name, field_def in node_def.fields.items() %}
{% if field_def.description %}
{{ field_def.go_name }} {{ field_def.go_type }} {{ field_def.struct_tag }} // {{ field_def.description }}
{% else %}
{{ field_def.go_name }} {{ field_def.go_type }} {{ field_def.struct_tag }}
{% endif %}
{% endfor %}
}
{% endfor %}
{# Emits one Go struct per edge type. Unlike nodes, the struct is named after
   edge_def.class_name rather than the raw edge key. #}
{% for edge_name, edge_def in edges.items() %}
{% if edge_def.description %}
// {{ edge_def.class_name }} {{ edge_def.description }}
{% else %}
// {{ edge_def.class_name }} edge
{% endif %}
type {{ edge_def.class_name }} struct {
{% for field_name, field_def in edge_def.fields.items() %}
{% if field_def.description %}
{{ field_def.go_name }} {{ field_def.go_type }} {{ field_def.struct_tag }} // {{ field_def.description }}
{% else %}
{{ field_def.go_name }} {{ field_def.go_type }} {{ field_def.struct_tag }}
{% endif %}
{% endfor %}
}
{% endfor %}
+8
View File
@@ -0,0 +1,8 @@
module github.com/{{ org }}/pentagi-taxonomy/v{{ version }}/go
go 1.21
require (
github.com/go-playground/validator/v10 v10.22.0
)
+34
View File
@@ -0,0 +1,34 @@
// Auto-generated validator helpers for pentagi-taxonomy.
// DO NOT EDIT - this file is generated from entities.yml
package entities
import (
"github.com/go-playground/validator/v10"
)
// Validator is the shared validator instance for all entities
var Validator *validator.Validate
func init() {
Validator = validator.New()
// Register custom validators for complex regex patterns here
// Example: Validator.RegisterValidation("cve_id", cveIDValidator)
}
{# One Validate() method per node struct; each delegates to the shared
   package-level Validator, which checks the struct's `validate` tags. #}
{% for node_name in nodes.keys() %}
// Validate validates a {{ node_name }} entity
func (e *{{ node_name }}) Validate() error {
return Validator.Struct(e)
}
{% endfor %}
{# Same for edges, using the generated class_name as the receiver type.
   NOTE(review): this loop opens with `{%-` (strips preceding whitespace) while
   the node loop uses plain `{%` - confirm the asymmetry is intentional. #}
{%- for edge_name, edge_def in edges.items() %}
// Validate validates a {{ edge_def.class_name }} edge
func (e *{{ edge_def.class_name }}) Validate() error {
return Validator.Struct(e)
}
{% endfor %}
+18
View File
@@ -0,0 +1,18 @@
"""
Auto-generated Pentagi Taxonomy package.
DO NOT EDIT - this file is generated from entities.yml
"""
# Version constant (read from version.yml during generation)
TAXONOMY_VERSION: int = {{ version }}
# Re-export entity types for convenience
from .entity_map import ENTITY_TYPES, EDGE_TYPES, EDGE_TYPE_MAP
__all__ = [
'TAXONOMY_VERSION',
'ENTITY_TYPES',
'EDGE_TYPES',
'EDGE_TYPE_MAP',
]
+19
View File
@@ -0,0 +1,19 @@
"""
Auto-generated edge models for pentagi-taxonomy.
DO NOT EDIT - this file is generated from entities.yml
"""
from pydantic import BaseModel, Field
from typing import Literal
{# Emits one Pydantic model per edge type; every field is optional (`| None`).
   python_type and field_args are interpolated as-is and are expected to be
   precomputed by the Python generator (not visible in this template).
   Block tags are kept at column 0 so only the class-body lines carry indentation. #}
{% for edge_name, edge_def in edges.items() %}


class {{ edge_def.class_name }}(BaseModel):
{% if edge_def.description %}
    """{{ edge_def.description }}"""
{% endif %}
{% for field_name, field_def in edge_def.fields.items() %}
    {{ field_name }}: {{ field_def.python_type }} | None = Field({{ field_def.field_args }})
{% endfor %}
{# The validator allows `fields: {}`; without a docstring the class would have
   no body at all, which is a SyntaxError - emit an explicit `pass`. #}
{% if not edge_def.description and not edge_def.fields %}
    pass
{% endif %}
{% endfor %}
+30
View File
@@ -0,0 +1,30 @@
"""
Auto-generated entity mappings for pentagi-taxonomy.
DO NOT EDIT - this file is generated from entities.yml
"""
{# Imports are guarded so an empty nodes/edges mapping does not render a
   dangling `from .nodes import` line. #}
{% if nodes %}
from .nodes import {{ nodes.keys() | join(', ') }}
{% endif %}
{% if edges %}
from .edges import {{ edges.values() | map(attribute='class_name') | join(', ') }}
{% endif %}
{# Node name (as written in the schema) -> generated model class. #}
ENTITY_TYPES = {
{% for node_name in nodes.keys() %}
'{{ node_name }}': {{ node_name }},
{% endfor %}
}
{# Edge key as written in the schema -> generated model class (class_name). #}
EDGE_TYPES = {
{% for edge_name, edge_def in edges.items() %}
'{{ edge_name }}': {{ edge_def.class_name }},
{% endfor %}
}
{# (source node, target node) -> list of edge keys allowed between them.
   NOTE(review): two relationships with the same (source, target) pair render
   duplicate dict keys, so the later entry silently replaces the earlier one -
   confirm the schema never repeats a pair. #}
EDGE_TYPE_MAP = {
{% for rel in relationships %}
('{{ rel.source }}', '{{ rel.target }}'): [{% for edge in rel.edges %}'{{ edge }}'{% if not loop.last %}, {% endif %}{% endfor %}],
{% endfor %}
}
+19
View File
@@ -0,0 +1,19 @@
"""
Auto-generated node models for pentagi-taxonomy.
DO NOT EDIT - this file is generated from entities.yml
"""
from pydantic import BaseModel, Field
from typing import Literal
{# Emits one Pydantic model per node type; every field is optional (`| None`).
   python_type and field_args are interpolated as-is and are expected to be
   precomputed by the Python generator (not visible in this template).
   Block tags are kept at column 0 so only the class-body lines carry indentation. #}
{% for node_name, node_def in nodes.items() %}


class {{ node_name }}(BaseModel):
{% if node_def.description %}
    """{{ node_def.description }}"""
{% endif %}
{% for field_name, field_def in node_def.fields.items() %}
    {{ field_name }}: {{ field_def.python_type }} | None = Field({{ field_def.field_args }})
{% endfor %}
{# The validator allows `fields: {}`; without a docstring the class would have
   no body at all, which is a SyntaxError - emit an explicit `pass`. #}
{% if not node_def.description and not node_def.fields %}
    pass
{% endif %}
{% endfor %}
+9
View File
@@ -0,0 +1,9 @@
/**
* Auto-generated index for pentagi-taxonomy.
* DO NOT EDIT - this file is generated from entities.yml
*/
export * from './schemas';
export const TAXONOMY_VERSION = {{ version }};
@@ -0,0 +1,24 @@
{
"name": "pentagi-taxonomy",
"version": "{{ version }}.0.0",
"description": "Pentagi taxonomy entities v{{ version }}",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc",
"prepublishOnly": "npm run build"
},
"dependencies": {
"zod": "^3.22.4"
},
"devDependencies": {
"typescript": "^5.3.0"
},
"keywords": [
"pentagi",
"taxonomy",
"graphiti",
"validation"
]
}
@@ -0,0 +1,41 @@
/**
* Auto-generated Zod schemas for pentagi-taxonomy.
* DO NOT EDIT - this file is generated from entities.yml
*/
import { z } from 'zod';
{# For every node: a Zod object schema named <Node>Schema plus the inferred
   TypeScript type <Node>. Each field's zod_schema string is interpolated as-is
   and then made optional, so every generated property is optional.
   NOTE(review): descriptions go into `//` comments; a multi-line description
   would spill onto an uncommented line - confirm descriptions are single-line. #}
{% for node_name, node_def in nodes.items() %}
{% if node_def.description %}
// {{ node_def.description }}
{% endif %}
export const {{ node_name }}Schema = z.object({
{% for field_name, field_def in node_def.fields.items() %}
{% if field_def.description %}
// {{ field_def.description }}
{% endif %}
{{ field_name }}: {{ field_def.zod_schema }}.optional(),
{% endfor %}
});
export type {{ node_name }} = z.infer<typeof {{ node_name }}Schema>;
{% endfor %}
{# Same shape for edges, named after edge_def.class_name instead of the raw edge key. #}
{% for edge_name, edge_def in edges.items() %}
{% if edge_def.description %}
// {{ edge_def.description }}
{% endif %}
export const {{ edge_def.class_name }}Schema = z.object({
{% for field_name, field_def in edge_def.fields.items() %}
{% if field_def.description %}
// {{ field_def.description }}
{% endif %}
{{ field_name }}: {{ field_def.zod_schema }}.optional(),
{% endfor %}
});
export type {{ edge_def.class_name }} = z.infer<typeof {{ edge_def.class_name }}Schema>;
{% endfor %}
@@ -0,0 +1,17 @@
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"lib": ["ES2020"],
"declaration": true,
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}
+187
View File
@@ -0,0 +1,187 @@
#!/usr/bin/env python
"""
TypeScript code generator for pentagi-taxonomy.
Generates Zod schemas and TypeScript types from YAML entity definitions using Jinja2 templates.
"""
import sys
import json
from pathlib import Path
from typing import Any, Dict, List
from jinja2 import Environment, FileSystemLoader
# Add shared validator to path
sys.path.insert(0, str(Path(__file__).parent.parent / "shared"))
from validator import validate_schema, SchemaValidationError
def ts_type_from_yaml(yaml_type: str) -> str:
    """Map a schema type name to the corresponding TypeScript type.

    Array types (``T[]``) are resolved recursively on their element type.
    Numeric schema types, including ``timestamp``, become ``number``; any
    name that is not recognised falls back to ``string``.
    """
    array_suffix = "[]"
    if yaml_type.endswith(array_suffix):
        element_type = yaml_type[: -len(array_suffix)]
        return ts_type_from_yaml(element_type) + array_suffix

    if yaml_type in ("int", "float", "timestamp"):
        return "number"
    if yaml_type == "boolean":
        return "boolean"
    # Covers "string" itself as well as every unknown type name.
    return "string"
def _escape_js_regex_literal(pattern: str) -> str:
    """Make a regex source safe to place between the slashes of a JS ``/.../`` literal."""
    if pattern == "":
        # `//` would start a line comment; `(?:)` is the empty-pattern literal.
        return "(?:)"
    escaped_chars = []
    after_backslash = False
    for ch in pattern:
        if after_backslash:
            # Already part of an escape sequence (including `\/`): keep verbatim.
            escaped_chars.append(ch)
            after_backslash = False
        elif ch == "\\":
            escaped_chars.append(ch)
            after_backslash = True
        elif ch == "/":
            # A bare slash would terminate the literal early.
            escaped_chars.append("\\/")
        else:
            escaped_chars.append(ch)
    return "".join(escaped_chars)


def zod_schema_from_field(field_def: Dict[str, Any]) -> str:
    """Generate the Zod schema expression (as TypeScript source text) for a field.

    Args:
        field_def: Field definition mapping. Reads ``type`` (defaults to
            ``"string"``), ``enum``, ``regex``, ``min`` and ``max``.

    Returns:
        A Zod expression such as ``z.number().int().min(1).max(65535)``.
        For array types the constraints are applied to the element schema,
        which is then wrapped in ``z.array(...)``. An ``enum`` takes
        precedence over ``regex``. Unknown type names fall back to
        ``z.string()``.
    """
    yaml_type = field_def.get("type", "string")

    # Handle arrays: same definition with the element type substituted.
    if yaml_type.endswith("[]"):
        element_def = {**field_def, "type": yaml_type[:-2]}
        return f"z.array({zod_schema_from_field(element_def)})"

    # Base type mapping
    type_map = {
        "string": "z.string()",
        "int": "z.number().int()",
        "float": "z.number()",
        "boolean": "z.boolean()",
        "timestamp": "z.number()",
    }
    schema = type_map.get(yaml_type, "z.string()")

    # Apply enum constraint (replaces the base schema entirely)
    if "enum" in field_def:
        enum_values = field_def["enum"]
        enum_str = ", ".join(json.dumps(v) for v in enum_values)
        schema = f"z.enum([{enum_str}])"

    # Apply regex constraint; the pattern is emitted as a JS regex literal, so
    # unescaped forward slashes must be escaped to keep the output parseable.
    if "regex" in field_def and "enum" not in field_def:
        regex = _escape_js_regex_literal(field_def["regex"])
        schema = f"z.string().regex(/{regex}/)"

    # Apply min/max constraints
    if "min" in field_def:
        schema += f".min({field_def['min']})"
    if "max" in field_def:
        schema += f".max({field_def['max']})"

    return schema
def prepare_field_for_zod(field_name: str, field_def: Dict[str, Any]) -> Dict[str, Any]:
    """Build the template context for one field.

    Returns a mapping with the field's Zod expression under ``zod_schema`` and
    its description (empty string when absent) under ``description``.
    ``field_name`` is accepted for call-site symmetry but is not used.
    """
    zod_expression = zod_schema_from_field(field_def)
    description = field_def.get("description", "")
    return dict(zod_schema=zod_expression, description=description)
def prepare_nodes_data(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Build the template context for all node types.

    Returns a mapping of node name to ``{"description", "fields"}``, where
    ``fields`` maps each field name to its prepared Zod context. A schema
    without a ``nodes`` section yields an empty mapping.
    """
    return {
        name: {
            "description": definition.get("description", ""),
            "fields": {
                fname: prepare_field_for_zod(fname, fdef)
                for fname, fdef in definition.get("fields", {}).items()
            },
        }
        for name, definition in schema.get("nodes", {}).items()
    }
def prepare_edges_data(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Build the template context for all edge types.

    Returns a mapping of edge key to ``{"class_name", "description", "fields"}``.
    ``class_name`` is the edge key converted from SCREAMING_SNAKE_CASE to
    PascalCase (``HAS_PORT`` -> ``HasPort``).
    """
    prepared: Dict[str, Any] = {}
    for key, definition in schema.get("edges", {}).items():
        pascal_name = "".join(map(str.capitalize, key.split("_")))
        field_context = {
            fname: prepare_field_for_zod(fname, fdef)
            for fname, fdef in definition.get("fields", {}).items()
        }
        prepared[key] = dict(
            class_name=pascal_name,
            description=definition.get("description", ""),
            fields=field_context,
        )
    return prepared
def main():
    """Generate the TypeScript package for one taxonomy version.

    Expects exactly one command-line argument, the version (e.g. ``2``).
    Validates ``v<version>/entities.yml`` and renders ``package.json``,
    ``tsconfig.json``, ``src/schemas.ts`` and ``src/index.ts`` into
    ``v<version>/typescript``. Exits with status 1 on a usage error or when
    schema validation fails.
    """
    if len(sys.argv) != 2:
        print("Usage: python generate.py <version>")
        print("Example: python generate.py 2")
        sys.exit(1)

    version_arg = sys.argv[1]

    # Determine paths. Three levels up from this file is the directory that
    # holds both the versioned schema directories (v<N>/) and codegen/.
    repo_root = Path(__file__).parent.parent.parent
    version_dir = repo_root / f"v{version_arg}"
    schema_path = version_dir / "entities.yml"
    output_dir = version_dir / "typescript"
    src_dir = output_dir / "src"
    templates_dir = repo_root / "codegen" / "templates" / "typescript"

    print(f"Generating TypeScript code for version {version_arg}...")
    print(f" Schema: {schema_path}")
    print(f" Output: {output_dir}")

    # Validate schema
    try:
        schema = validate_schema(schema_path)
    except SchemaValidationError as e:
        print(f"✗ Schema validation failed: {e}", file=sys.stderr)
        sys.exit(1)

    # validate_schema guarantees an integer 'version', so no fallback to the
    # CLI argument is needed (int(version_arg) could raise for non-numeric input).
    schema_version = schema["version"]

    # Create output directories
    src_dir.mkdir(parents=True, exist_ok=True)

    # Setup Jinja2 environment
    env = Environment(loader=FileSystemLoader(templates_dir), trim_blocks=True, lstrip_blocks=True)

    # Prepare data for templates
    nodes_data = prepare_nodes_data(schema)
    edges_data = prepare_edges_data(schema)

    # Generate files. Output is always written as UTF-8 so that non-ASCII
    # descriptions do not depend on the platform's default encoding.
    print(" Generating package.json...")
    package_template = env.get_template("package.json.j2")
    package_json = package_template.render(version=schema_version)
    (output_dir / "package.json").write_text(package_json, encoding="utf-8")

    print(" Generating tsconfig.json...")
    tsconfig_template = env.get_template("tsconfig.json.j2")
    tsconfig = tsconfig_template.render()
    (output_dir / "tsconfig.json").write_text(tsconfig, encoding="utf-8")

    print(" Generating src/schemas.ts...")
    schemas_template = env.get_template("schemas.ts.j2")
    schemas_content = schemas_template.render(nodes=nodes_data, edges=edges_data)
    (src_dir / "schemas.ts").write_text(schemas_content, encoding="utf-8")

    print(" Generating src/index.ts...")
    index_template = env.get_template("index.ts.j2")
    index_content = index_template.render(version=schema_version)
    (src_dir / "index.ts").write_text(index_content, encoding="utf-8")

    print("✓ TypeScript code generation complete!")
    print(f" Generated {len(nodes_data)} node schemas")
    print(f" Generated {len(edges_data)} edge schemas")


if __name__ == "__main__":
    main()
-2359
View File
File diff suppressed because it is too large Load Diff
+92
View File
@@ -0,0 +1,92 @@
# Schema version (must match parent version directory)
version: 1
nodes:
Target:
description: "A target system being assessed during penetration testing"
fields:
version:
type: int
description: "Taxonomy schema version (auto-injected by Graphiti fork)"
entity_uuid:
type: string
description: "Unique identifier"
hostname:
type: string
description: "DNS hostname if known"
ip_address:
type: string
description: "IP address of the target"
regex: "^(?:[0-9]{1,3}\\.){3}[0-9]{1,3}$"
target_type:
type: string
description: "Classification of target"
enum: [host, web_service, api]
risk_score:
type: float
description: "Calculated risk score"
min: 0.0
max: 10.0
status:
type: string
description: "Current status"
enum: [active, inactive]
Port:
description: "A network port on a target system"
fields:
version:
type: int
description: "Taxonomy schema version (auto-injected by Graphiti fork)"
entity_uuid:
type: string
description: "Unique identifier"
port_number:
type: int
description: "Port number"
min: 1
max: 65535
protocol:
type: string
description: "Network protocol"
enum: [tcp, udp]
state:
type: string
description: "Port state"
enum: [open, closed, filtered]
edges:
HAS_PORT:
description: "A target has a port"
fields:
version:
type: int
description: "Taxonomy schema version (auto-injected by Graphiti fork)"
timestamp:
type: timestamp
description: "When association was established"
DISCOVERED:
description: "An action discovered an entity"
fields:
version:
type: int
description: "Taxonomy schema version (auto-injected by Graphiti fork)"
timestamp:
type: timestamp
description: "Discovery timestamp"
confidence:
type: float
description: "Confidence score"
min: 0.0
max: 1.0
method:
type: string
description: "Discovery method"
enum: [active, passive]
relationships:
- source: Target
target: Port
edges: [HAS_PORT]
+51
View File
@@ -0,0 +1,51 @@
# Pentagi Taxonomy Go Package (v1)
Auto-generated Go package containing structs with validation for pentesting entities.
## Installation
```bash
go get github.com/vxcontrol/pentagi-taxonomy/v1/go/entities
```
## Usage
```go
package main
import (
"fmt"
"github.com/vxcontrol/pentagi-taxonomy/v1/go/entities"
)
func main() {
hostname := "example.com"
ipAddr := "192.168.1.1"
targetType := "host"
riskScore := 7.5
target := entities.Target{
Hostname: &hostname,
IpAddress: &ipAddr,
TargetType: &targetType,
RiskScore: &riskScore,
}
// Validate the entity
if err := target.Validate(); err != nil {
fmt.Printf("Validation error: %v\n", err)
return
}
fmt.Println("Target is valid!")
}
```
## Development
Run tests:
```bash
go test ./... -v
```
+40
View File
@@ -0,0 +1,40 @@
// Auto-generated entity definitions for pentagi-taxonomy.
// DO NOT EDIT - this file is generated from entities.yml
package entities
// Target A target system being assessed during penetration testing
type Target struct {
Version *int `json:"version,omitempty"` // Taxonomy schema version (auto-injected by Graphiti fork)
EntityUuid *string `json:"entity_uuid,omitempty"` // Unique identifier
Hostname *string `json:"hostname,omitempty"` // DNS hostname if known
IpAddress *string `json:"ip_address,omitempty" validate:"omitempty,ipv4"` // IP address of the target
TargetType *string `json:"target_type,omitempty" validate:"omitempty,oneof=host web_service api"` // Classification of target
RiskScore *float64 `json:"risk_score,omitempty" validate:"omitempty,min=0.0,max=10.0"` // Calculated risk score
Status *string `json:"status,omitempty" validate:"omitempty,oneof=active inactive"` // Current status
}
// Port A network port on a target system
type Port struct {
Version *int `json:"version,omitempty"` // Taxonomy schema version (auto-injected by Graphiti fork)
EntityUuid *string `json:"entity_uuid,omitempty"` // Unique identifier
PortNumber *int `json:"port_number,omitempty" validate:"omitempty,min=1,max=65535"` // Port number
Protocol *string `json:"protocol,omitempty" validate:"omitempty,oneof=tcp udp"` // Network protocol
State *string `json:"state,omitempty" validate:"omitempty,oneof=open closed filtered"` // Port state
}
// HasPort A target has a port
type HasPort struct {
Version *int `json:"version,omitempty"` // Taxonomy schema version (auto-injected by Graphiti fork)
Timestamp *float64 `json:"timestamp,omitempty"` // When association was established
}
// Discovered An action discovered an entity
type Discovered struct {
Version *int `json:"version,omitempty"` // Taxonomy schema version (auto-injected by Graphiti fork)
Timestamp *float64 `json:"timestamp,omitempty"` // Discovery timestamp
Confidence *float64 `json:"confidence,omitempty" validate:"omitempty,min=0.0,max=1.0"` // Confidence score
Method *string `json:"method,omitempty" validate:"omitempty,oneof=active passive"` // Discovery method
}
+39
View File
@@ -0,0 +1,39 @@
// Auto-generated validator helpers for pentagi-taxonomy.
// DO NOT EDIT - this file is generated from entities.yml
package entities
import (
"github.com/go-playground/validator/v10"
)
// Validator is the shared validator instance for all entities
var Validator *validator.Validate
func init() {
Validator = validator.New()
// Register custom validators for complex regex patterns here
// Example: Validator.RegisterValidation("cve_id", cveIDValidator)
}
// Validate validates a Target entity
func (e *Target) Validate() error {
return Validator.Struct(e)
}
// Validate validates a Port entity
func (e *Port) Validate() error {
return Validator.Struct(e)
}
// Validate validates a HasPort edge
func (e *HasPort) Validate() error {
return Validator.Struct(e)
}
// Validate validates a Discovered edge
func (e *Discovered) Validate() error {
return Validator.Struct(e)
}
+7
View File
@@ -0,0 +1,7 @@
module github.com/vxcontrol/pentagi-taxonomy/v1/go
go 1.21
require (
github.com/go-playground/validator/v10 v10.22.0
)
+46
View File
@@ -0,0 +1,46 @@
# Pentagi Taxonomy Python Package (v1)
Auto-generated Python package containing Pydantic models for pentesting entities.
## Installation
Install directly from GitHub:
```bash
pip install git+https://github.com/vxcontrol/pentagi-taxonomy.git#subdirectory=v1/python
```
## Usage
```python
from pentagi_taxonomy import TAXONOMY_VERSION
from pentagi_taxonomy.nodes import Target, Port
from pentagi_taxonomy.edges import HasPort
# Create a target
target = Target(
hostname="example.com",
ip_address="192.168.1.1",
target_type="host",
risk_score=7.5
)
# Create a port
port = Port(
port_number=443,
protocol="tcp",
state="open"
)
print(f"Using taxonomy version: {TAXONOMY_VERSION}")
```
## Development
Run tests:
```bash
pip install -e .[dev]
pytest tests/
```
+17
View File
@@ -0,0 +1,17 @@
"""
Auto-generated Pentagi Taxonomy package.
DO NOT EDIT - this file is generated from entities.yml
"""
# Version constant (read from version.yml during generation)
TAXONOMY_VERSION: int = 1
# Re-export entity types for convenience
from .entity_map import ENTITY_TYPES, EDGE_TYPES, EDGE_TYPE_MAP
__all__ = [
'TAXONOMY_VERSION',
'ENTITY_TYPES',
'EDGE_TYPES',
'EDGE_TYPE_MAP',
]
+20
View File
@@ -0,0 +1,20 @@
"""
Auto-generated edge models for pentagi-taxonomy.
DO NOT EDIT - this file is generated from entities.yml
"""
from pydantic import BaseModel, Field
from typing import Literal
class HasPort(BaseModel):
"""A target has a port"""
version: int | None = Field(None, description='Taxonomy schema version (auto-injected by Graphiti fork)')
timestamp: float | None = Field(None, description='When association was established')
class Discovered(BaseModel):
"""An action discovered an entity"""
version: int | None = Field(None, description='Taxonomy schema version (auto-injected by Graphiti fork)')
timestamp: float | None = Field(None, description='Discovery timestamp')
confidence: float | None = Field(None, description='Confidence score', ge=0.0, le=1.0)
method: Literal['active', 'passive'] | None = Field(None, description='Discovery method')
+21
View File
@@ -0,0 +1,21 @@
"""
Auto-generated entity mappings for pentagi-taxonomy.
DO NOT EDIT - this file is generated from entities.yml
"""
from .nodes import Target, Port
from .edges import HasPort, Discovered
ENTITY_TYPES = {
'Target': Target,
'Port': Port,
}
EDGE_TYPES = {
'HAS_PORT': HasPort,
'DISCOVERED': Discovered,
}
EDGE_TYPE_MAP = {
('Target', 'Port'): ['HAS_PORT'],
}
+26
View File
@@ -0,0 +1,26 @@
"""
Auto-generated node models for pentagi-taxonomy.
DO NOT EDIT - this file is generated from entities.yml
"""
from pydantic import BaseModel, Field
from typing import Literal
class Target(BaseModel):
"""A target system being assessed during penetration testing"""
version: int | None = Field(None, description='Taxonomy schema version (auto-injected by Graphiti fork)')
entity_uuid: str | None = Field(None, description='Unique identifier')
hostname: str | None = Field(None, description='DNS hostname if known')
ip_address: str | None = Field(None, description='IP address of the target')
target_type: Literal['host', 'web_service', 'api'] | None = Field(None, description='Classification of target')
risk_score: float | None = Field(None, description='Calculated risk score', ge=0.0, le=10.0)
status: Literal['active', 'inactive'] | None = Field(None, description='Current status')
class Port(BaseModel):
"""A network port on a target system"""
version: int | None = Field(None, description='Taxonomy schema version (auto-injected by Graphiti fork)')
entity_uuid: str | None = Field(None, description='Unique identifier')
port_number: int | None = Field(None, description='Port number', ge=1, le=65535)
protocol: Literal['tcp', 'udp'] | None = Field(None, description='Network protocol')
state: Literal['open', 'closed', 'filtered'] | None = Field(None, description='Port state')
+23
View File
@@ -0,0 +1,23 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "pentagi-taxonomy"
version = "1.0.0"
description = "Pentagi taxonomy entities v1"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"pydantic>=2.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
]
[tool.setuptools.packages.find]
where = ["."]
include = ["pentagi_taxonomy*"]
+52
View File
@@ -0,0 +1,52 @@
"""
Tests for generated Python code (v1).
"""
import pytest
from pydantic import ValidationError
from pentagi_taxonomy import TAXONOMY_VERSION, ENTITY_TYPES, EDGE_TYPES
from pentagi_taxonomy.nodes import Target, Port
from pentagi_taxonomy.edges import HasPort, Discovered
def test_taxonomy_version():
    """The package-level TAXONOMY_VERSION constant equals 1 for the v1 package."""
    expected_version = 1
    assert TAXONOMY_VERSION == expected_version
def test_entity_types_exported():
    """Test that node and edge type mappings are exported and point at the model classes."""
    assert 'Target' in ENTITY_TYPES
    assert 'Port' in ENTITY_TYPES
    assert ENTITY_TYPES['Target'] == Target
    assert ENTITY_TYPES['Port'] == Port

    # Edge mappings are keyed by the schema's edge name, not the class name.
    assert 'HAS_PORT' in EDGE_TYPES
    assert 'DISCOVERED' in EDGE_TYPES
    assert EDGE_TYPES['HAS_PORT'] == HasPort
    assert EDGE_TYPES['DISCOVERED'] == Discovered
def test_target_validation():
    """Test Target validation: valid values are accepted, constraint violations are rejected."""
    target = Target(
        hostname="example.com",
        ip_address="192.168.1.1",
        target_type="host",
        risk_score=5.0
    )
    assert target.hostname == "example.com"
    assert target.target_type == "host"
    assert target.risk_score == 5.0

    # target_type is restricted to the schema's enum values.
    with pytest.raises(ValidationError):
        Target(target_type="mainframe")

    # risk_score is bounded to the closed range [0.0, 10.0].
    with pytest.raises(ValidationError):
        Target(risk_score=10.5)
    with pytest.raises(ValidationError):
        Target(risk_score=-0.1)
def test_port_validation():
    """Test Port validation: valid values are accepted, constraint violations are rejected."""
    port = Port(
        port_number=443,
        protocol="tcp",
        state="open"
    )
    assert port.port_number == 443
    assert port.protocol == "tcp"
    assert port.state == "open"

    # port_number is bounded to the closed range [1, 65535].
    with pytest.raises(ValidationError):
        Port(port_number=0)
    with pytest.raises(ValidationError):
        Port(port_number=65536)

    # protocol and state are restricted to the schema's enum values.
    with pytest.raises(ValidationError):
        Port(protocol="icmp")
    with pytest.raises(ValidationError):
        Port(state="listening")
def test_all_fields_optional():
    """Models can be built with no arguments; unset fields default to None."""
    empty_target = Target()
    assert empty_target.version is None

    empty_port = Port()
    assert empty_port.port_number is None
+56
View File
@@ -0,0 +1,56 @@
# Pentagi Taxonomy TypeScript Package (v1)
Auto-generated TypeScript package with Zod schemas for pentesting entities.
## Installation
Install via gitpkg (with automatic build):
```bash
npm install 'https://gitpkg.now.sh/vxcontrol/pentagi-taxonomy/v1/typescript?main&scripts.postinstall=npm%20install%20--ignore-scripts%20%26%26%20npm%20run%20build'
```
Or add to package.json with an alias:
```json
{
"dependencies": {
"@pentagi/taxonomy-v1": "https://gitpkg.now.sh/vxcontrol/pentagi-taxonomy/v1/typescript?main&scripts.postinstall=npm%20install%20--ignore-scripts%20%26%26%20npm%20run%20build"
}
}
```
## Usage
```typescript
import { TargetSchema, Target, PortSchema } from 'pentagi-taxonomy';
import { TAXONOMY_VERSION } from 'pentagi-taxonomy';
// Create and validate a target
const target: Target = {
hostname: 'example.com',
ip_address: '192.168.1.1',
target_type: 'host',
risk_score: 7.5
};
// Runtime validation
const result = TargetSchema.safeParse(target);
if (result.success) {
console.log('Valid target:', result.data);
} else {
console.error('Validation errors:', result.error.errors);
}
console.log(`Using taxonomy version: ${TAXONOMY_VERSION}`);
```
## Development
Build the package:
```bash
npm install
npm run build
```
+23
View File
@@ -0,0 +1,23 @@
{
"name": "pentagi-taxonomy",
"version": "1.0.0",
"description": "Pentagi taxonomy entities v1",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc",
"prepublishOnly": "npm run build"
},
"dependencies": {
"zod": "^3.22.4"
},
"devDependencies": {
"typescript": "^5.3.0"
},
"keywords": [
"pentagi",
"taxonomy",
"graphiti",
"validation"
]
}
+8
View File
@@ -0,0 +1,8 @@
/**
* Auto-generated index for pentagi-taxonomy.
* DO NOT EDIT - this file is generated from entities.yml
*/
export * from './schemas';
export const TAXONOMY_VERSION = 1;
+68
View File
@@ -0,0 +1,68 @@
/**
* Auto-generated Zod schemas for pentagi-taxonomy.
* DO NOT EDIT - this file is generated from entities.yml
*/
import { z } from 'zod';
// A target system being assessed during penetration testing
export const TargetSchema = z.object({
// Taxonomy schema version (auto-injected by Graphiti fork)
version: z.number().int().optional(),
// Unique identifier
entity_uuid: z.string().optional(),
// DNS hostname if known
hostname: z.string().optional(),
// IP address of the target
ip_address: z.string().regex(/^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$/).optional(),
// Classification of target
target_type: z.enum(["host", "web_service", "api"]).optional(),
// Calculated risk score
risk_score: z.number().min(0.0).max(10.0).optional(),
// Current status
status: z.enum(["active", "inactive"]).optional(),
});
export type Target = z.infer<typeof TargetSchema>;
// A network port on a target system
export const PortSchema = z.object({
// Taxonomy schema version (auto-injected by Graphiti fork)
version: z.number().int().optional(),
// Unique identifier
entity_uuid: z.string().optional(),
// Port number
port_number: z.number().int().min(1).max(65535).optional(),
// Network protocol
protocol: z.enum(["tcp", "udp"]).optional(),
// Port state
state: z.enum(["open", "closed", "filtered"]).optional(),
});
export type Port = z.infer<typeof PortSchema>;
// A target has a port
export const HasPortSchema = z.object({
// Taxonomy schema version (auto-injected by Graphiti fork)
version: z.number().int().optional(),
// When association was established
timestamp: z.number().optional(),
});
export type HasPort = z.infer<typeof HasPortSchema>;
// An action discovered an entity
export const DiscoveredSchema = z.object({
// Taxonomy schema version (auto-injected by Graphiti fork)
version: z.number().int().optional(),
// Discovery timestamp
timestamp: z.number().optional(),
// Confidence score
confidence: z.number().min(0.0).max(1.0).optional(),
// Discovery method
method: z.enum(["active", "passive"]).optional(),
});
export type Discovered = z.infer<typeof DiscoveredSchema>;
+16
View File
@@ -0,0 +1,16 @@
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"lib": ["ES2020"],
"declaration": true,
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}
+3
View File
@@ -0,0 +1,3 @@
# Current global taxonomy version
version: 1