An agent that makes decisions without being auditable is a liability.
When your agent processes a payment, modifies a database, or sends an email — and something goes wrong — you need to know exactly what happened. Which agent. Which tool. Which parameters. What context led to the decision. Who approved it (if human was involved). What the agent was thinking when it made the call.
This is the compliance layer. And it’s the hardest part of deploying agents in regulated environments.
The Three Compliance Frameworks
| Framework | What It Requires | Why Agents Are Hard |
|---|---|---|
| SOC 2 | Controls over security, availability, processing integrity | Agent decisions are non-deterministic — proving “processing integrity” is difficult |
| GDPR | Right to explanation, data minimization, right to deletion | Agents ingest and process data in opaque ways — explaining a decision requires tracing an LLM’s latent space |
| PCI DSS | Cardholder data protection, access control, audit trails | Agents may inadvertently log, cache, or embed payment data in conversation history |
Each framework imposes requirements on logging, explainability, data handling, and access control. This post covers how to meet them.
Pillar 1: Audit Trails — The Complete Record
An agent audit trail must capture everything: every input, every tool call, every output, and every decision point.
The Agent Audit Record
from dataclasses import dataclass, field, asdictfrom datetime import datetimefrom typing import Anyimport jsonimport uuid
@dataclassclass AgentAuditRecord: """Complete audit record for a single agent interaction."""
# Identity audit_id: str = field(default_factory=lambda: uuid.uuid4().hex) agent_id: str agent_version: str # Prompt version, steering file hash session_id: str user_id: str
# Input timestamp: datetime = field(default_factory=datetime.utcnow) raw_input: str # Original user message sanitized_input: str # PII-redacted version input_guardrail_results: list
# Decision trace (for explainability) system_prompt_used: str steering_files_loaded: list[str] mcp_servers_connected: list[str]
# Tool calls tool_calls: list[dict] = field(default_factory=list) # Each tool call record: # { # "tool": "query_database", # "params": {"sql": "SELECT..."}, # "params_redacted": {"sql": "SELECT..."}, # PII removed # "result_preview": "3 rows returned", # "result_redacted": [...], # "duration_ms": 234, # "success": True, # "guardrail_check": "passed", # }
# Output raw_output: str sanitized_output: str output_guardrail_results: list
# Compliance metadata data_retention_days: int = 90 contains_pii: bool contains_pci: bool retention_policy: str # "standard", "pii_retention", "pci_retention"
# Human-in-loop human_approvals: list[dict] = field(default_factory=list) # { # "tool": "deploy_production", # "approved_by": "user_abc", # "approved_at": "2026-06-04T12:00:00Z", # "reasoning_summary": "Agent explained X, Y, Z" # }
def to_storage_format(self) -> dict: """Convert to format suitable for encrypted storage.""" record = asdict(self) record["timestamp"] = self.timestamp.isoformat() return record
class AuditTrailLogger: """Secure audit trail logger with encryption and immutability."""
def __init__(self, storage_backend="s3", encryption_key=None): self.storage = AuditLogStorage(storage_backend) self.encryptor = FieldEncryptor(encryption_key) if encryption_key else None
async def log_interaction(self, record: AgentAuditRecord): # Encrypt sensitive fields if self.encryptor: record.raw_input = self.encryptor.encrypt(record.raw_input) record.raw_output = self.encryptor.encrypt(record.raw_output)
# Add hash chain (immutability) prev_hash = await self.storage.get_last_hash() record.prev_hash = prev_hash record.hash = self.compute_hash(record)
# Store await self.storage.append(f"agent/{record.agent_id}/{record.session_id}", record)
def compute_hash(self, record: AgentAuditRecord) -> str: content = json.dumps(record.to_storage_format(), sort_keys=True) if record.prev_hash: content = record.prev_hash + content return hashlib.sha256(content.encode()).hexdigest()
async def get_session_trail(self, session_id: str) -> list[AgentAuditRecord]: """Retrieve complete audit trail for a session.""" return await self.storage.query(session_id=session_id)What to Log (Minimum Viable Audit)
| Event | Log | Retention |
|---|---|---|
| Session start | Agent version, user, session ID | 90 days |
| User message | Raw and sanitized | 90 days (raw PII: 30 days) |
| Tool call | Tool name, params, result | 90 days |
| Tool call approval | Approver, timestamp, context | 7 years (for regulated) |
| Agent output | Raw and sanitized | 90 days |
| Guardrail trigger | Guardrail name, action taken | 1 year |
| Error | Error type, message, stack trace | 1 year |
| Session end | Summary, cost, duration | 90 days |
Pillar 2: Explainability — Why Did the Agent Do That?
SOC 2 processing integrity and GDPR right to explanation both require that you can explain an agent’s decision. This is fundamentally at odds with how LLMs work — they don’t have decision trees.
Approach: Capture the Reasoning Trace
class ReasoningCapturer: """Capture the agent's reasoning process for explainability."""
def __init__(self): self.reasoning_log = []
async def capture_trace(self, agent_run): """Wrap an agent run to capture reasoning."""
# Hook into agent events agent_run.on("thinking", self.capture_thinking) agent_run.on("tool_call", self.capture_tool_decision) agent_run.on("tool_result", self.capture_tool_result) agent_run.on("final_output", self.capture_final_output)
def capture_thinking(self, thought: str): self.reasoning_log.append({ "type": "thought", "timestamp": datetime.utcnow().isoformat(), "content": thought, })
def capture_tool_decision(self, tool: str, args: dict): self.reasoning_log.append({ "type": "tool_decision", "timestamp": datetime.utcnow().isoformat(), "tool": tool, "args": self.redact_pii(args), "reasoning_context": self.get_current_context(), })
def capture_tool_result(self, tool: str, result: Any): self.reasoning_log.append({ "type": "tool_result", "timestamp": datetime.utcnow().isoformat(), "tool": tool, "result_preview": str(result)[:200] + "...", })
def get_explainability_report(self) -> dict: """Generate a human-readable explanation of the agent's decisions.""" return { "chain_of_thought": [ entry for entry in self.reasoning_log if entry["type"] == "thought" ], "tool_calls": [ { "step": i, "tool": entry["tool"], "why": self.infer_tool_reasoning(entry), } for i, entry in enumerate(self.reasoning_log) if entry["type"] == "tool_decision" ], "summary": self.generate_summary(), }
def infer_tool_reasoning(self, entry: dict) -> str: """From the surrounding thoughts, infer why this tool was called.""" # Find the thoughts before this tool call idx = self.reasoning_log.index(entry) preceding_thoughts = [ e["content"] for e in self.reasoning_log[max(0, idx-3):idx] if e["type"] == "thought" ] return " — ".join(preceding_thoughts) if preceding_thoughts else "No reasoning captured"GDPR Right to Explanation Response
class GDPRCompliance: """Handle GDPR right to explanation requests."""
async def generate_explanation( self, user_id: str, session_id: str, audit_trail: AuditTrailLogger, ) -> dict: """Generate a human-readable explanation for GDPR Article 22 compliance."""
records = await audit_trail.get_session_trail(session_id)
explanation = { "request_date": datetime.utcnow().isoformat(), "data_subject": user_id, "automated_decision": { "made": True, "logic_involved": "Large Language Model with tool access", "significance": self.describe_significance(records), }, "decision_sequence": [], }
for record in records: step = { "input": record.sanitized_input[:500], "tools_used": [tc["tool"] for tc in record.tool_calls], "reasoning": self.extract_reasoning(record), "output_preview": record.sanitized_output[:500], } explanation["decision_sequence"].append(step)
return explanation
async def handle_deletion_request(self, user_id: str): """GDPR right to erasure.""" await self.anonymize_user_data(user_id) await self.audit_log.log_deletion(user_id)Pillar 3: Data Handling — PII, PCI, and Retention
Agents process user data. That data often includes PII (names, emails, addresses) and sometimes PCI (credit card numbers, CVV). How you handle this data determines your compliance posture.
PII Detection and Redaction
import refrom presidio_analyzer import AnalyzerEnginefrom presidio_anonymizer import AnonymizerEngine
class PIIHandler: """Detect, redact, and manage PII in agent interactions."""
def __init__(self): self.analyzer = AnalyzerEngine() self.anonymizer = AnonymizerEngine()
# Regex patterns for fast pre-filtering self.patterns = { "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', "phone": r'\b\+?\d{1,3}[-.\s]?\(?\d{1,4}\)?[-.\s]?\d{1,4}[-.\s]?\d{1,9}\b', "ssn": r'\b\d{3}-\d{2}-\d{4}\b', "credit_card": r'\b(?:\d{4}[-\s]?){3}\d{4}\b', }
async def process_input(self, text: str, context: str = "") -> ProcessedContent: """Analyze and redact PII from agent input."""
# 1. Fast regex detection found_patterns = {} for pattern_name, pattern in self.patterns.items(): matches = re.findall(pattern, text) if matches: found_patterns[pattern_name] = matches
# 2. NLP-based detection (catches context-sensitive PII) analyzer_results = self.analyzer.analyze(text=text, language='en')
# 3. Redact redacted = self.anonymizer.anonymize( text=text, analyzer_results=analyzer_results, )
return ProcessedContent( original=text, redacted=redacted.text, detected_pii=found_patterns, has_pii=len(found_patterns) > 0 or len(analyzer_results) > 0, pii_categories=list(set( list(found_patterns.keys()) + [r.entity_type for r in analyzer_results] )), )
async def handle_pci_data(self, text: str) -> PCIResult: """Special handling for PCI DSS cardholder data."""
credit_card_pattern = r'\b(?:\d{4}[-\s]?){3}\d{4}\b' if re.search(credit_card_pattern, text): # PCI DSS Requirement 3.4: Render PAN unreadable redacted = re.sub(credit_card_pattern, '****-****-****-****', text)
# Log PCI event (separate from standard audit) await self.pci_audit_log.log_pci_event( event_type="card_data_detected", action="redacted", timestamp=datetime.utcnow(), )
# Never store full PAN return PCIResult( original_redacted=True, safe_output=redacted, pci_audit_id=uuid.uuid4().hex, )
return PCIResult(original_redacted=False, safe_output=text)Data Retention Policies
class DataRetentionManager: """Manage data retention based on data classification."""
RETENTION_RULES = { "standard": { "conversation_logs": timedelta(days=90), "tool_call_logs": timedelta(days=90), "aggregate_metrics": timedelta(days=365), }, "pii": { "conversation_logs": timedelta(days=30), # Shorter for PII "tool_call_logs": timedelta(days=90), "pii_detection_log": timedelta(days=365), }, "pci": { "conversation_logs": timedelta(days=0), # Never store raw "tool_call_logs": timedelta(days=365), # Longer for audit "pci_event_log": timedelta(days=2555), # 7 years for PCI }, }
async def apply_retention_policy(self): """Apply retention policies to all stored data.""" for data_class, rules in self.RETENTION_RULES.items(): for log_type, retention in rules.items(): cutoff = datetime.utcnow() - retention await self.storage.delete_older_than( collection=f"{data_class}/{log_type}", cutoff=cutoff, )
async def handle_deletion_request(self, user_id: str): """GDPR Article 17: Right to erasure.""" # Anonymize user-specific audit records await self.storage.anonymize_field( collection="standard/conversation_logs", field="user_id", value=user_id, replacement="ANONYMIZED", ) # Delete PII-specific records await self.storage.delete( collection="pii/conversation_logs", where={"user_id": user_id}, )Pillar 4: SOC 2 Controls for Agents
SOC 2 requires controls across five trust service criteria. Here’s how they map to agent security:
SOC 2 Control Mapping
| SOC 2 Criterion | Agent Control | Implementation |
|---|---|---|
| CC6.1 Logical access | MCP server authentication | API key + JWT per server |
| CC6.6 Security incident detection | Injection detection guardrails | Pattern + LLM-based detection |
| CC7.2 Monitoring | Audit trail logging | Full interaction logging |
| CC8.1 Change management | Prompt versioning | Git-tracked, PR-reviewed prompts |
| A1.2 Processing integrity | Tool call validation | Parameter validation + context check |
| A1.3 Error handling | Structured error responses | Graceful degradation, audit logging |
SOC 2 Evidence Collection for Agents
class SOC2EvidenceCollector: """Generate evidence for SOC 2 audits."""
async def collect_controls_evidence(self, start_date, end_date): return { "cc6_1_access_control": { "control": "MCP servers require authentication", "evidence": await self.get_auth_logs(start_date, end_date), "pass_rate": await self.calc_auth_pass_rate(), "exceptions": await self.get_auth_failures(), }, "cc6_6_injection_prevention": { "control": "Input guardrails block injection attempts", "evidence": await self.get_guardrail_logs(), "blocked_count": await self.count_blocked_injections(), "false_positive_rate": await self.calc_false_positive_rate(), }, "cc7_2_monitoring": { "control": "All agent interactions are logged", "evidence": await self.get_audit_log_coverage(), "coverage_percentage": await self.calc_log_coverage(), }, "a1_2_processing_integrity": { "control": "Tool calls are validated before execution", "evidence": await self.get_tool_validation_logs(), "validation_pass_rate": await self.calc_validation_rate(), }, }Pillar 5: PCI Compliance for Payment Agents
If your agent processes payments, PCI DSS applies. The key requirements:
class PCIComplianceLayer: """PCI DSS compliance for agent payment processing."""
# PCI Requirement 3.4: Render PAN unreadable # PCI Requirement 6.5: Secure coding (parameterized queries, input validation) # PCI Requirement 7.2: Restrict access on need-to-know # PCI Requirement 10.2: Audit trails for all access
async def process_payment_tool_call(self, params: dict) -> PaymentResult: # PCI 3.4: Never log full card numbers if "card_number" in params: pan = params["card_number"] params["card_number"] = f"****-****-****-{pan[-4:]}" # Send full PAN directly to processor, never store payment_result = await payment_gateway.charge( card_number=pan, # Direct to processor amount=params["amount"], )
# PCI 10.2: Audit trail await self.pci_audit_log.append({ "event": "payment_processed", "card_last_four": pan[-4:] if "card_number" in params else None, "amount": params.get("amount"), "result": payment_result.status, "audit_id": uuid.uuid4().hex, "timestamp": datetime.utcnow().isoformat(), # NOT included: full PAN, CVV, expiry })
return payment_result
async def validate_pci_compliance(self): """Run PCI compliance checks.""" checks = { "no_pan_in_logs": await self.check_logs_for_pan(), "no_pan_in_memory": await self.check_conversation_history_for_pan(), "tool_access_limited": await self.verify_payment_tool_permissions(), "audit_trails_complete": await self.check_audit_trail_completeness(), "retention_policy_applied": await self.verify_retention(), } return checksProduction Implementation: The Compliance Layer
class AgentComplianceLayer: """Complete compliance layer for regulated agent deployments."""
def __init__(self): self.audit = AuditTrailLogger() self.pii = PIIHandler() self.reasoning = ReasoningCapturer() self.pci = PCIComplianceLayer() self.retention = DataRetentionManager() self.gdpr = GDPRCompliance()
async def process_interaction(self, agent, user_id: str, message: str) -> str: # 1. PII/PCI detection and redaction processed_input = await self.pii.process_input(message) if processed_input.has_pii and processed_input.pci_detected: pci_result = await self.pii.handle_pci_data(message) safe_message = pci_result.safe_output else: safe_message = processed_input.redacted
# 2. Create audit record audit_record = AgentAuditRecord( agent_id=agent.id, agent_version=get_prompt_version(), session_id=agent.session_id, user_id=user_id, raw_input=message, sanitized_input=safe_message, contains_pii=processed_input.has_pii, contains_pci=processed_input.pci_detected, )
# 3. Capture reasoning await self.reasoning.capture_trace(agent)
# 4. Execute with monitoring try: response = await agent.run(safe_message)
# 5. Sanitize output processed_output = await self.pii.process_input(response) safe_response = processed_output.redacted
# 6. Complete audit record audit_record.raw_output = response audit_record.sanitized_output = safe_response audit_record.tool_calls = agent.tool_call_history audit_record.reasoning_trace = self.reasoning.reasoning_log
# 7. Store await self.audit.log_interaction(audit_record)
return safe_response
except Exception as e: audit_record.error = str(e) await self.audit.log_interaction(audit_record) raiseProduction Checklist
- Every interaction has a complete audit record (input, tools, output)
- Audit records are encrypted at rest
- Audit records use hash chains for immutability
- PII is detected and redacted before storage
- PCI card data is never stored (truncated to last 4 digits)
- PCI events have separate, longer-retention audit logs
- Reasoning traces are captured for explainability
- GDPR right to explanation endpoint returns structured decision trace
- GDPR right to erasure anonymizes user data
- Data retention policies are applied automatically (30/90/365/2555 days)
- SOC 2 control evidence can be collected on demand
- Audit logs are accessible but append-only (no deletion)
Next in the Series
| Post | Topic |
|---|---|
| 1 | Prompt Injection & Defense |
| 2 | Tool Access Control |
| 3 | MCP Server Security |
| 4 | Agent Auditing & Compliance (this) |
| 5 | Production Security Patterns — coming next |
Series: Agent Security 2026 — Production Patterns. Post 4: Agent Auditing & Compliance — SOC2, GDPR, PCI for AI agents.
Advertisement
Advertisement