""" | |
Helper methods for the Presidio Streamlit app | |
""" | |
from typing import List, Optional, Tuple | |
import logging | |
import streamlit as st | |
from presidio_analyzer import ( | |
AnalyzerEngine, | |
RecognizerResult, | |
RecognizerRegistry, | |
PatternRecognizer, | |
) | |
from presidio_anonymizer import AnonymizerEngine | |
from presidio_anonymizer.entities import OperatorConfig | |
logger = logging.getLogger("presidio-streamlit") | |


def nlp_engine_and_registry(
    model_family: str,
    model_path: str,
) -> Tuple[object, RecognizerRegistry]:
    """Create the NLP model and recognizer registry for the requested model."""
    registry = RecognizerRegistry()
    try:
        if model_family.lower() == "flair":
            from flair.models import SequenceTagger

            # Load the Flair sequence tagger and register it alongside the
            # predefined pattern/context recognizers.
            tagger = SequenceTagger.load(model_path)
            registry.load_predefined_recognizers()
            registry.add_recognizer_from_dict(
                {
                    "name": "flair_recognizer",
                    "supported_language": "en",
                    "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION"],
                    "model": model_path,
                    "package": "flair",
                }
            )
            return tagger, registry
        elif model_family.lower() == "huggingface":
            from transformers import pipeline

            # Build a HuggingFace NER pipeline and register it the same way.
            nlp = pipeline("ner", model=model_path, tokenizer=model_path)
            registry.load_predefined_recognizers()
            registry.add_recognizer_from_dict(
                {
                    "name": "huggingface_recognizer",
                    "supported_language": "en",
                    "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"],
                    "model": model_path,
                    "package": "transformers",
                }
            )
            return nlp, registry
        else:
            raise ValueError(f"Model family {model_family} not supported")
    except Exception as e:
        logger.error(f"Error loading model {model_path} for {model_family}: {e}")
        raise RuntimeError(
            f"Failed to load model: {e}. Ensure the model is downloaded and accessible."
        ) from e


def analyzer_engine(
    model_family: str,
    model_path: str,
) -> AnalyzerEngine:
    """Create the Analyzer Engine instance based on the requested model."""
    nlp_engine, registry = nlp_engine_and_registry(model_family, model_path)
    # The registry holds the recognizers registered above; AnalyzerEngine
    # uses its default NLP engine for tokenization and context.
    analyzer = AnalyzerEngine(registry=registry)
    return analyzer


def get_supported_entities(model_family: str, model_path: str) -> List[str]:
    """Return the entity types supported by the selected model."""
    if model_family.lower() == "huggingface":
        return ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"]
    # Flair models and the default case support the same core entities.
    return ["PERSON", "LOCATION", "ORGANIZATION"]


def analyze(
    analyzer: AnalyzerEngine,
    text: str,
    entities: List[str],
    language: str,
    score_threshold: float,
    return_decision_process: bool,
    allow_list: List[str],
    deny_list: List[str],
) -> List[RecognizerResult]:
    """Analyze text for PHI entities and apply the allow and deny lists."""
    try:
        results = analyzer.analyze(
            text=text,
            entities=entities,
            language=language,
            score_threshold=score_threshold,
            return_decision_process=return_decision_process,
        )
        # Post-filter the results: deny-listed terms are always kept,
        # allow-listed terms are dropped, everything else passes through.
        filtered_results = []
        for result in results:
            text_snippet = text[result.start:result.end].lower()
            in_deny_list = any(word.lower() in text_snippet for word in deny_list)
            in_allow_list = any(word.lower() in text_snippet for word in allow_list)
            if in_allow_list and not in_deny_list:
                continue
            filtered_results.append(result)
        return filtered_results
    except Exception as e:
        logger.error(f"Analysis error: {e}")
        raise


def anonymize(
    text: str,
    operator: str,
    analyze_results: List[RecognizerResult],
    mask_char: str = "*",
    number_of_chars: int = 15,
):
    """Anonymize detected PHI entities in the text.

    Returns the AnonymizerEngine result, whose ``text`` attribute holds the
    anonymized string and ``items`` the anonymized spans.
    """
    try:
        anonymizer = AnonymizerEngine()
        # Apply the selected operator to every entity type.
        operator_config = {"DEFAULT": OperatorConfig(operator, {})}
        if operator == "mask":
            operator_config["DEFAULT"] = OperatorConfig(
                operator,
                {
                    "masking_char": mask_char,
                    "chars_to_mask": number_of_chars,
                    "from_end": False,
                },
            )
        return anonymizer.anonymize(
            text=text,
            analyzer_results=analyze_results,
            operators=operator_config,
        )
    except Exception as e:
        logger.error(f"Anonymization error: {e}")
        raise
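

# Illustrative sketch (not called by the app): the anonymizer can be exercised
# without downloading any NER model by hand-building analyzer results. The
# sample sentence and offsets below are assumptions made only for this demo.
def _anonymize_example() -> None:
    sample = "John moved to Seattle."
    fake_results = [
        RecognizerResult(entity_type="PERSON", start=0, end=4, score=0.99),
        RecognizerResult(entity_type="LOCATION", start=14, end=21, score=0.99),
    ]
    # Mask the first four characters of each detected span.
    masked = anonymize(sample, "mask", fake_results, mask_char="*", number_of_chars=4)
    print(masked.text)  # **** moved to ****tle.
    # Replace each span with its entity-type placeholder.
    replaced = anonymize(sample, "replace", fake_results)
    print(replaced.text)  # <PERSON> moved to <LOCATION>.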


def create_ad_hoc_deny_list_recognizer(
    deny_list: Optional[List[str]] = None,
) -> Optional[PatternRecognizer]:
    """Create an ad-hoc recognizer that flags deny-list items as GENERIC_PII.

    The returned recognizer is meant to be passed to
    ``AnalyzerEngine.analyze(..., ad_hoc_recognizers=[...])``.
    """
    if not deny_list:
        return None
    try:
        deny_list_recognizer = PatternRecognizer(
            supported_entity="GENERIC_PII", deny_list=deny_list
        )
        return deny_list_recognizer
    except Exception as e:
        logger.error(f"Error creating deny list recognizer: {e}")
        raise
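

if __name__ == "__main__":
    # Minimal end-to-end sketch for manual testing. The model path below is an
    # assumption ("dslim/bert-base-NER" is only an example, not a requirement
    # of the app) and must be available locally or downloadable for this to run.
    demo_family, demo_model = "HuggingFace", "dslim/bert-base-NER"
    demo_analyzer = analyzer_engine(demo_family, demo_model)
    demo_text = "Jane Doe was admitted to Contoso Hospital on 2021-03-01."
    demo_results = analyze(
        analyzer=demo_analyzer,
        text=demo_text,
        entities=get_supported_entities(demo_family, demo_model),
        language="en",
        score_threshold=0.35,
        return_decision_process=False,
        allow_list=["Contoso Hospital"],  # never flagged
        deny_list=["Jane Doe"],  # always flagged
    )
    print(anonymize(demo_text, "replace", demo_results).text)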