mirror of
https://github.com/Heretek-AI/heretek-skills.git
synced 2026-07-01 19:54:03 -04:00
f23a235aa2
Remove old triad-*, curiosity-*, governance-*, and Swarm-era skills. Add 14 new heretek-* skills covering agent dev, API, backend, frontend, Docker, monitoring, security, state, testing, debugging, migration, NATS, contributing, and memory systems. Also remove data/*.db files, docs/, and legacy .env.example.
| name | description |
|---|---|
| heretek-security | Security practices for Heretek Swarm. Use when implementing authentication, working with secrets, configuring NATS mTLS, or reviewing code for security issues. Covers zero-trust architecture, input validation, and audit trails. |
Heretek Swarm Security
Security Architecture
Zero-Trust Model
- All inter-agent messages authenticated
- No implicit trust between services
- Least privilege access principles
- Defense in depth with multiple layers
Security Layers
- Network - mTLS everywhere
- Authentication - API keys, JWTs
- Authorization - Role-based access
- Input Validation - Sanitize all inputs
- Audit Trails - Log all operations
- Secrets Management - SOPS encryption
Authentication
API Keys
# backend/heretek_swarm/security/api_keys.py
from fastapi import APIRouter, Depends
from heretek_swarm.security import verify_api_key

router = APIRouter()

@router.get("/agents")
async def get_agents(api_key: str = Depends(verify_api_key)):
    # Reaching this point means verify_api_key accepted the key
    return await list_agents()
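The `verify_api_key` dependency is imported above but its body is not shown. As a rough sketch of what such a check might do, assuming keys are stored as SHA-256 digests and compared in constant time, the following uses only the standard library. `KNOWN_KEY_HASHES`, `hash_api_key`, and `is_valid_api_key` are hypothetical names for illustration, not part of the Heretek codebase.

```python
import hashlib
import hmac

def hash_api_key(api_key: str) -> str:
    """Return the hex SHA-256 digest of an API key (store this, not the raw key)."""
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()

# Hypothetical store of hashed keys; a real service would load these from a database.
KNOWN_KEY_HASHES = {hash_api_key("sk-example-key")}

def is_valid_api_key(candidate: str) -> bool:
    """Check a presented key against every stored digest in constant time."""
    candidate_hash = hash_api_key(candidate)
    valid = False
    for stored in KNOWN_KEY_HASHES:
        # compare_digest avoids leaking how many characters matched via timing
        if hmac.compare_digest(candidate_hash, stored):
            valid = True
    return valid
```

A FastAPI dependency could call `is_valid_api_key` and raise a 401 when it returns `False`.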
JWT Tokens
# backend/heretek_swarm/security/jwt.py
from datetime import timedelta
from heretek_swarm.security import create_token, verify_token

# Create token
token = create_token(
    user_id="user123",
    roles=["admin"],
    expires_in=timedelta(hours=1)
)

# Verify token
payload = verify_token(token)
Agent Authentication
# All agent messages must be authenticated
from dataclasses import dataclass
from datetime import datetime

@dataclass
class AgentMessage:
    content: str
    sender: str
    recipient: str
    signature: str  # HMAC signature
    timestamp: datetime

    def verify(self) -> bool:
        """Verify message signature."""
        # verify_hmac and get_agent_key are not defined in this snippet
        return verify_hmac(
            self.content,
            self.signature,
            get_agent_key(self.sender)
        )
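The message class relies on an HMAC helper whose body is not shown. The sketch below shows one way signing and verification could work with the standard library. Note that it is a variation on the snippet above: it signs the sender, recipient, and timestamp along with the content, so that none of those fields can be altered without invalidating the signature. `sign_message` and `verify_signature` are hypothetical names, and the length-prefixed canonical form is an assumption.

```python
import hashlib
import hmac

def sign_message(content: str, sender: str, recipient: str,
                 timestamp: str, key: bytes) -> str:
    """Return a hex HMAC-SHA256 over all four fields."""
    # Length-prefix each field so ("ab", "c") and ("a", "bc") sign differently
    fields = [content, sender, recipient, timestamp]
    canonical = "".join(f"{len(f)}:{f}" for f in fields).encode("utf-8")
    return hmac.new(key, canonical, hashlib.sha256).hexdigest()

def verify_signature(content: str, sender: str, recipient: str,
                     timestamp: str, signature: str, key: bytes) -> bool:
    """Recompute the HMAC and compare it to the presented one in constant time."""
    expected = sign_message(content, sender, recipient, timestamp, key)
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))
```

The timestamp is passed as a string (for example an ISO 8601 value) so that sender and receiver serialize it identically.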
NATS mTLS
Certificate Generation
# Generate certificates
cd certs
./generate.sh
# Files created:
# - ca.pem (Certificate Authority)
# - server.pem (Server certificate)
# - server.key (Server private key)
# - client.pem (Client certificate)
# - client.key (Client private key)
Configuration
# docker-compose.yml
# --tlsverify enables TLS and requires client certificates (mutual TLS)
services:
  nats:
    command: >
      --tlsverify
      --tlscert=/certs/server.pem
      --tlskey=/certs/server.key
      --tlscacert=/certs/ca.pem
Client Connection
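# backend/heretek_swarm/gateway/nats_client.py
import ssl

import nats

async def connect_nats():
    # nats-py takes an ssl.SSLContext for the tls argument, not a dict
    ssl_ctx = ssl.create_default_context(
        purpose=ssl.Purpose.SERVER_AUTH,
        cafile="/certs/ca.pem"
    )
    # Present the client certificate so the server can verify this client
    ssl_ctx.load_cert_chain(
        certfile="/certs/client.pem",
        keyfile="/certs/client.key"
    )
    nc = await nats.connect("nats://nats:4222", tls=ssl_ctx)
    return nc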
Input Validation
Pydantic Models
# Pydantic v2 syntax (pattern= and field_validator)
from pydantic import BaseModel, Field, field_validator

class AgentMessage(BaseModel):
    content: str = Field(..., max_length=10000)
    recipient: str = Field(..., pattern=r'^[a-z_]+$')

    @field_validator('content')
    @classmethod
    def sanitize_content(cls, v: str) -> str:
        # Remove potentially dangerous content
        return sanitize_input(v)
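The validator above calls `sanitize_input`, which is not defined in this document. One possible shape for it, assuming the goal is to normalize Unicode and strip control characters, is sketched below. It is not an HTML or SQL escaper; output encoding and parameterized queries are still needed where the value is used.

```python
import unicodedata

def sanitize_input(text: str) -> str:
    """Normalize Unicode and drop control characters other than newline and tab."""
    # NFKC folds visually equivalent forms (e.g. fullwidth letters) into one
    normalized = unicodedata.normalize("NFKC", text)
    kept = [
        ch for ch in normalized
        if ch in ("\n", "\t") or unicodedata.category(ch) != "Cc"
    ]
    return "".join(kept).strip()
```

Normalizing before validation means later pattern checks see one canonical spelling of each character.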
FastAPI Validation
from fastapi import HTTPException, Query

@router.get("/search")
async def search(
    query: str = Query(..., min_length=1, max_length=100),
    limit: int = Query(10, ge=1, le=100)
):
    # Query validated
    return await search_memories(query, limit)
Agent Message Validation
# backend/heretek_swarm/security/zero_trust.py
from datetime import datetime, timezone

class ZeroTrustValidator:
    def validate_message(self, message: AgentMessage) -> bool:
        # Validate signature
        if not message.verify():
            raise InvalidSignature()
        # Validate timestamp (within 5 minutes); assumes timezone-aware UTC timestamps
        age = (datetime.now(timezone.utc) - message.timestamp).total_seconds()
        if abs(age) > 300:
            raise MessageExpired()
        # Validate sender exists
        if not agent_exists(message.sender):
            raise UnknownAgent()
        return True
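A timestamp window alone still lets an attacker resend a captured message within those five minutes. The sketch below adds replay detection by remembering signatures seen inside the window. `ReplayGuard` is a hypothetical helper, not part of the validator above, and the in-memory dictionary is an assumption; a multi-process deployment would need a shared store.

```python
from datetime import datetime, timedelta, timezone

class ReplayGuard:
    """Reject messages that are stale or whose signature was already seen."""

    def __init__(self, max_age: timedelta = timedelta(minutes=5)):
        self.max_age = max_age
        self._seen = {}  # signature -> message timestamp

    def accept(self, signature: str, timestamp: datetime, now: datetime) -> bool:
        # Forget entries too old to pass the freshness check anyway
        cutoff = now - self.max_age
        self._seen = {s: t for s, t in self._seen.items() if t >= cutoff}
        # Freshness: allow clock skew in either direction up to max_age
        if abs(now - timestamp) > self.max_age:
            return False
        # Replay: the same signature must not be accepted twice
        if signature in self._seen:
            return False
        self._seen[signature] = timestamp
        return True
```

Passing `now` explicitly keeps the check deterministic in tests; callers would normally supply `datetime.now(timezone.utc)`.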
Secrets Management
SOPS Encryption
# Encrypt secrets
sops --encrypt secrets/decrypted.env > secrets/encrypted.env
# Decrypt secrets
sops --decrypt secrets/encrypted.env > secrets/decrypted.env
# Edit encrypted file
sops secrets/encrypted.env
Environment Variables
# Never commit secrets
# Use .env.example as template
cp .env.example .env
# Load secrets in Docker
docker compose --env-file secrets/decrypted.env up
Secret Rotation
# Rotate API keys
./scripts/rotate_keys.sh
# Rotate database passwords
./scripts/rotate_db_password.sh
# Rotate NATS certificates
./scripts/rotate_nats_certs.sh
Audit Trails
Structured Logging
from datetime import datetime, timezone

import structlog

logger = structlog.get_logger(__name__)

# Log security events
logger.info(
    "agent_message_sent",
    sender="explorer",
    recipient="coordinator",
    message_type="status",
    timestamp=datetime.now(timezone.utc).isoformat()
)

# Log authentication attempts (log only a key prefix, never the full key)
logger.warning(
    "authentication_failed",
    api_key_prefix="sk-...",
    ip_address="192.168.1.100",
    reason="invalid_key"
)
Audit Database
-- Audit table
CREATE TABLE audit_log (
    id SERIAL PRIMARY KEY,
    timestamp TIMESTAMPTZ NOT NULL,
    event_type VARCHAR(50) NOT NULL,
    actor VARCHAR(100) NOT NULL,
    action VARCHAR(100) NOT NULL,
    resource VARCHAR(200),
    details JSONB,
    ip_address INET,
    user_agent TEXT
);

-- Index for fast queries
CREATE INDEX idx_audit_timestamp ON audit_log(timestamp);
CREATE INDEX idx_audit_actor ON audit_log(actor);
Compliance Logging
# Log all data access
@router.get("/memories/{memory_id}")
async def get_memory(memory_id: str, user: User = Depends(get_current_user)):
    memory = await get_memory_by_id(memory_id)
    # Audit log
    await audit_log(
        action="memory_accessed",
        actor=user.id,
        resource=memory_id,
        details={"tags": memory.tags}
    )
    return memory
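The `audit_log` helper is called above without a definition. As a sketch of the record it might assemble before inserting into the `audit_log` table, the function below builds a dictionary whose keys mirror the table's columns. `build_audit_record` and its default `event_type` of `"data_access"` are hypothetical; the database write itself is left out.

```python
import json
from datetime import datetime, timezone
from typing import Optional

def build_audit_record(action: str, actor: str,
                       resource: Optional[str] = None,
                       details: Optional[dict] = None,
                       event_type: str = "data_access",
                       ip_address: Optional[str] = None,
                       user_agent: Optional[str] = None) -> dict:
    """Assemble one audit row; keys match the audit_log table columns."""
    return {
        # Timezone-aware UTC, suitable for a TIMESTAMPTZ column
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event_type": event_type,
        "actor": actor,
        "action": action,
        "resource": resource,
        # Serialized with sorted keys so identical details compare equal
        "details": json.dumps(details or {}, sort_keys=True),
        "ip_address": ip_address,
        "user_agent": user_agent,
    }
```

Keeping record construction separate from the insert makes the audit format easy to unit test.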
Authorization
Role-Based Access
from enum import Enum

class Role(Enum):
    ADMIN = "admin"
    AGENT = "agent"
    USER = "user"

class Permission(Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

# Check permissions
# ROLE_PERMISSIONS (Role -> allowed Permission values) is not defined in this snippet
def check_permission(user: User, permission: Permission) -> bool:
    return permission in ROLE_PERMISSIONS[user.role]
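`check_permission` depends on a `ROLE_PERMISSIONS` mapping that the snippet does not define. A self-contained sketch of such a mapping follows. The specific grants per role are assumptions chosen for illustration, and `role_allows` is a hypothetical helper that takes a role directly instead of a `User` object.

```python
from enum import Enum

class Role(Enum):
    ADMIN = "admin"
    AGENT = "agent"
    USER = "user"

class Permission(Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

# Hypothetical grants: each role lists only what it needs (least privilege)
ROLE_PERMISSIONS = {
    Role.ADMIN: {Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN},
    Role.AGENT: {Permission.READ, Permission.WRITE},
    Role.USER: {Permission.READ},
}

def role_allows(role: Role, permission: Permission) -> bool:
    """Return True only if the permission is explicitly granted to the role."""
    # Unknown roles fall back to an empty set, i.e. deny by default
    return permission in ROLE_PERMISSIONS.get(role, set())
```

Using `.get(role, set())` denies by default, so a role missing from the table gets no access instead of raising `KeyError`.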
Agent Permissions
# Define agent capabilities
AGENT_CAPABILITIES = {
"explorer": ["read", "write", "search"],
"coder": ["read", "write", "execute"],
"sentinel": ["read", "monitor", "alert"],
"arbiter": ["read", "write", "decide"]
}
def check_agent_permission(agent: str, action: str) -> bool:
return action in AGENT_CAPABILITIES.get(agent, [])
Rate Limiting
Implementation
from fastapi import HTTPException
from heretek_swarm.security.rate_limiter import RateLimiter

# Global rate limiter
rate_limiter = RateLimiter(
    max_requests=1000,
    window_seconds=60
)

# Per-agent rate limiter
agent_rate_limiter = RateLimiter(
    max_requests=100,
    window_seconds=60,
    key_func=lambda: current_agent.id
)

@router.get("/agents")
async def get_agents():
    if not rate_limiter.allow():
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    return await list_agents()
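The internals of `RateLimiter` are not shown here. One common way to implement `allow()` is a sliding window over recent request times, sketched below with the standard library. `SlidingWindowRateLimiter`, the explicit `key` argument (standing in for `key_func`), and the optional `now` parameter are assumptions for illustration, not the project's actual class.

```python
import time
from collections import defaultdict, deque
from typing import Optional

class SlidingWindowRateLimiter:
    """Allow at most max_requests per key within any window_seconds span."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._hits = defaultdict(deque)  # key -> timestamps of accepted requests

    def allow(self, key: str = "global", now: Optional[float] = None) -> bool:
        current = time.monotonic() if now is None else now
        hits = self._hits[key]
        # Drop timestamps that have slid out of the window
        while hits and hits[0] <= current - self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(current)
        return True
```

Rejected requests are not recorded, so a client that keeps hammering the endpoint does not extend its own lockout.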
Rate Limit Headers
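from fastapi.responses import JSONResponse

@router.get("/agents")
async def get_agents():
    agents = await list_agents()
    # Header values below are illustrative; derive them from the rate limiter's state
    return JSONResponse(
        content=agents,
        headers={
            "X-RateLimit-Limit": "100",
            "X-RateLimit-Remaining": "95",
            "X-RateLimit-Reset": "1640995200"
        }
    )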
Security Testing
Static Analysis
# Bandit (Python security linter)
bandit -r backend/
# Safety (dependency vulnerabilities)
safety check
# Semgrep
semgrep --config=auto backend/
Dynamic Testing
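# OWASP ZAP (the target is passed with -t; --network host lets the container reach localhost on Linux)
docker run --network host -t ghcr.io/zaproxy/zaproxy:stable zap-full-scan.py -t http://localhost:8000
# Nmap
nmap -sV --script vuln localhost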
Penetration Testing
# SQL injection testing
sqlmap -u "http://localhost:8000/api/search?query=test" --batch
# XSS testing
# Manual testing with browser dev tools
Code Review Security Checklist
Input Validation
- All inputs validated with Pydantic
- SQL queries use parameterized queries
- File paths validated to prevent traversal
- User input sanitized before display
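For the path traversal item above, a minimal sketch of one way to check a user-supplied path is shown below. It resolves the path and confirms the result still lies inside the allowed base directory. `resolve_inside` is a hypothetical helper name.

```python
from pathlib import Path

def resolve_inside(base_dir: str, user_path: str) -> Path:
    """Resolve user_path under base_dir, rejecting anything that escapes it."""
    base = Path(base_dir).resolve()
    # resolve() collapses ".." segments and follows symlinks
    candidate = (base / user_path).resolve()
    if candidate != base and base not in candidate.parents:
        raise ValueError(f"path escapes base directory: {user_path!r}")
    return candidate
```

Absolute inputs such as `/etc/passwd` are rejected as well, because joining an absolute path onto `base` discards `base` and the containment check then fails.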
Authentication
- API keys verified on all endpoints
- JWT tokens validated properly
- Agent messages authenticated
- No hardcoded credentials
Authorization
- Role-based access control implemented
- Least privilege principle applied
- Sensitive operations logged
- Admin endpoints protected
Secrets
- No secrets in code
- Secrets encrypted at rest
- Secrets rotated regularly
- Access to secrets logged
Error Handling
- Errors don't leak sensitive info
- Stack traces not exposed
- Validation errors are generic and reveal no internal details
- Rate limiting implemented
Security Monitoring
Alert Rules
# prometheus/alerts.yml
groups:
  - name: security
    rules:
      - alert: HighFailedLogins
        expr: rate(failed_logins_total[5m]) > 10
        labels:
          severity: warning
      - alert: UnauthorizedAccess
        expr: increase(unauthorized_access_total[1h]) > 5
        labels:
          severity: critical
Dashboard
# Grafana dashboard for security metrics
- Failed login attempts
- Rate limit violations
- Unauthorized access attempts
- Audit log activity
- Certificate expiration
Incident Response
Security Incident Checklist
- Detect - Identify the incident
- Contain - Limit damage
- Eradicate - Remove threat
- Recover - Restore systems
- Learn - Post-mortem analysis
Emergency Procedures
# Revoke all API keys
./scripts/revoke_all_keys.sh
# Rotate all secrets
./scripts/rotate_all_secrets.sh
# Enable maintenance mode
./scripts/maintenance_mode.sh on
# Review audit logs
docker compose exec postgres psql -U postgres -c \
  "SELECT * FROM audit_log WHERE timestamp > now() - interval '1 hour' ORDER BY timestamp DESC;"
Gotchas
- Never commit secrets - Use SOPS encryption
- Always validate inputs - SQL injection, XSS, path traversal
- Authenticate all messages - Zero-trust model
- Log security events - Audit trails required
- Rotate secrets regularly - Automate rotation
- Monitor for anomalies - Set up alerts
- Test security regularly - Penetration testing
- Keep dependencies updated - Vulnerability scanning
- Use HTTPS everywhere - TLS termination
- Follow least privilege - Minimal access rights
Best Practices
- Implement defense in depth
- Use parameterized queries
- Validate all inputs
- Implement proper error handling
- Use secure defaults
- Keep security logs
- Rotate secrets regularly
- Monitor for suspicious activity
- Conduct regular security reviews
- Document security procedures
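As an illustration of the parameterized-query practice listed above, the sketch below uses the standard library's `sqlite3` module. The `memories` table and the `find_memories` function are hypothetical; the same placeholder approach applies to PostgreSQL drivers, though their placeholder syntax differs.

```python
import sqlite3

def find_memories(conn: sqlite3.Connection, query: str) -> list:
    """Search memories by substring without building SQL from user input."""
    pattern = f"%{query}%"
    # The ? placeholder sends the value separately from the SQL text,
    # so quotes inside `query` cannot change the statement's structure
    cursor = conn.execute(
        "SELECT id, content FROM memories WHERE content LIKE ? ORDER BY id",
        (pattern,),
    )
    return cursor.fetchall()
```

With this shape, an input such as `x' OR '1'='1` is treated as a literal search string and matches only rows that contain that text.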