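"""
Benchmark script comparing three secrets-detection guardrails (LLM Guard,
Guardrails AI, and guardrails_genie's SecretsDetectionGuardrail) on a shared
W&B Weave dataset, scoring each with AccuracyMetric.
"""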
import asyncio
from typing import Any

import weave
from guardrails import Guard
from guardrails.hub import SecretsPresent
from llm_guard.input_scanners import Secrets
from llm_guard.util import configure_logger

from guardrails_genie.guardrails import GuardrailManager
from guardrails_genie.guardrails.base import Guardrail
from guardrails_genie.guardrails.secrets_detection import (
    SecretsDetectionGuardrail,
    SecretsDetectionResponse,
    SecretsDetectionSimpleResponse,
)
from guardrails_genie.metrics import AccuracyMetric

logger = configure_logger(log_level="ERROR")


class GuardrailsAISecretsDetector(Guardrail):
    """
    Detects secrets using the Guardrails AI `SecretsPresent` hub validator.

    Attributes:
        validator (Any): The validator used for detecting secrets.
    """

    validator: Any

    def __init__(self):
        """
        Initializes the GuardrailsAISecretsDetector with a validator.
        """
        # on_fail="fix" makes the validator return a redacted copy of the
        # input as `validated_output` when secrets are found.
        validator = Guard().use(SecretsPresent, on_fail="fix")
        super().__init__(validator=validator)

    def scan(self, text: str) -> dict:
        """
        Scans the given text for secrets.

        Args:
            text (str): The text to scan for secrets.

        Returns:
            dict: A dictionary containing the scan results.
        """
        response = self.validator.validate(text)
        if response.validation_summaries:
            summary = response.validation_summaries[0]
            # The first line of `failure_reason` is a header; each following
            # line describes one detected secret, so number them from 1.
            return {
                "has_secret": True,
                "detected_secrets": {
                    str(k): v
                    for k, v in enumerate(
                        summary.failure_reason.splitlines()[1:], start=1
                    )
                },
                "explanation": summary.failure_reason,
                "modified_prompt": response.validated_output,
                "risk_score": 1.0,
            }
        else:
            return {
                "has_secret": False,
                "detected_secrets": None,
                "explanation": "No secrets detected in the text.",
                "modified_prompt": response.validated_output,
                "risk_score": 0.0,
            }

    def guard(
        self,
        prompt: str,
        return_detected_secrets: bool = True,
        **kwargs,
    ) -> SecretsDetectionResponse | SecretsDetectionSimpleResponse:
        """
        Guards the given prompt by scanning for secrets.

        Args:
            prompt (str): The prompt to scan for secrets.
            return_detected_secrets (bool): Whether to include the detected
                secrets in the response.

        Returns:
            SecretsDetectionResponse | SecretsDetectionSimpleResponse: The
                response after scanning for secrets.
        """
        results = self.scan(prompt)
        if return_detected_secrets:
            return SecretsDetectionResponse(
                contains_secrets=results["has_secret"],
                detected_secrets=results["detected_secrets"],
                explanation=results["explanation"],
                redacted_text=results["modified_prompt"],
                risk_score=results["risk_score"],
            )
        else:
            return SecretsDetectionSimpleResponse(
                contains_secrets=results["has_secret"],
                explanation=results["explanation"],
                redacted_text=results["modified_prompt"],
                risk_score=results["risk_score"],
            )


class LLMGuardSecretsDetector(Guardrail):
    """
    Detects secrets using the LLM Guard `Secrets` input scanner.

    Attributes:
        validator (Any): The validator used for detecting secrets.
    """

    validator: Any

    def __init__(self):
        """
        Initializes the LLMGuardSecretsDetector with a validator.
        """
        # redact_mode="all" fully replaces every detected secret in the
        # sanitized prompt rather than masking it partially.
        validator = Secrets(redact_mode="all")
        super().__init__(validator=validator)

    def scan(self, text: str) -> dict:
        """
        Scans the given text for secrets.

        Args:
            text (str): The text to scan for secrets.

        Returns:
            dict: A dictionary containing the scan results.
        """
        # LLM Guard scanners return (sanitized_text, is_valid, risk_score).
        sanitized_prompt, is_valid, risk_score = self.validator.scan(text)
        if is_valid:
            return {
                "has_secret": False,
                "detected_secrets": None,
                "explanation": "No secrets detected in the text.",
                "modified_prompt": sanitized_prompt,
                "risk_score": risk_score,
            }
        else:
            return {
                "has_secret": True,
                "detected_secrets": {},
                "explanation": "This library does not return detected secrets.",
                "modified_prompt": sanitized_prompt,
                "risk_score": risk_score,
            }

    def guard(
        self,
        prompt: str,
        return_detected_secrets: bool = True,
        **kwargs,
    ) -> SecretsDetectionResponse | SecretsDetectionSimpleResponse:
        """
        Guards the given prompt by scanning for secrets.

        Args:
            prompt (str): The prompt to scan for secrets.
            return_detected_secrets (bool): Whether to include the detected
                secrets in the response.

        Returns:
            SecretsDetectionResponse | SecretsDetectionSimpleResponse: The
                response after scanning for secrets.
        """
        results = self.scan(prompt)
        if return_detected_secrets:
            return SecretsDetectionResponse(
                contains_secrets=results["has_secret"],
                detected_secrets=results["detected_secrets"],
                explanation=results["explanation"],
                redacted_text=results["modified_prompt"],
                risk_score=results["risk_score"],
            )
        else:
            return SecretsDetectionSimpleResponse(
                contains_secrets=results["has_secret"],
                explanation=results["explanation"],
                redacted_text=results["modified_prompt"],
                risk_score=results["risk_score"],
            )


def main():
    """
    Initializes the three secrets detectors and evaluates each one against
    the benchmark dataset.
    """
    weave.init("parambharat/secrets-detection")
    dataset = weave.ref("secrets-detection-benchmark:latest").get()

    llm_guard_guardrail = LLMGuardSecretsDetector()
    guardrails_ai_guardrail = GuardrailsAISecretsDetector()
    guardrails_genie_guardrail = SecretsDetectionGuardrail()
    all_guards = [
        llm_guard_guardrail,
        guardrails_ai_guardrail,
        guardrails_genie_guardrail,
    ]

    evaluation = weave.Evaluation(
        dataset=dataset.rows,
        scorers=[AccuracyMetric()],
    )
    # Evaluate each guardrail separately so each run appears under its own
    # display name in the Weave UI.
    for guard in all_guards:
        name = guard.__class__.__name__
        guardrail_manager = GuardrailManager(guardrails=[guard])
        results = asyncio.run(
            evaluation.evaluate(
                guardrail_manager,
                __weave={"display_name": name},
            )
        )
        print(results)


if __name__ == "__main__":
    main()
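# A minimal smoke test, separate from the benchmark above (a sketch assuming
# the same dependencies are installed; the key below is a fake value):
#
#   detector = GuardrailsAISecretsDetector()
#   response = detector.guard("my api key is sk-fake-example-key")
#   print(response.contains_secrets, response.redacted_text)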