import base64
import logging
import os
import tempfile
from datetime import datetime

import dotenv
import pandas as pd
import pytz
import streamlit as st
from streamlit_tags import st_tags
from PyPDF2 import PdfReader, PdfWriter
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, RecognizerRegistry, RecognizerResult
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

st.set_page_config(
    page_title="Presidio PHI De-identification",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={"About": "https://microsoft.github.io/presidio/"},
)
dotenv.load_dotenv()
logger = logging.getLogger("presidio-streamlit")


def get_timestamp_prefix() -> str:
    """🕒 Stamps time with Central swagger!"""
    # e.g. "0345PM_12-06-25" (12-hour clock, then day-month-year). The earlier
    # duplicate that called datetime.datetime.now() crashed under
    # `from datetime import datetime`; this single version is the fix.
    central = pytz.timezone("US/Central")
    return datetime.now(central).strftime("%I%M%p_%d-%m-%y").upper()


def nlp_engine_and_registry(model_family: str, model_path: str) -> tuple:
    """🤖 Sparks NLP models with a wink!"""
    registry = RecognizerRegistry()
    registry.load_predefined_recognizers()
    if model_family.lower() == "flair":
        from flair.models import SequenceTagger

        tagger = SequenceTagger.load(model_path)
        logger.info(f"Flair model loaded: {model_path}")
        return tagger, registry
    elif model_family.lower() == "huggingface":
        from transformers import pipeline

        nlp = pipeline("ner", model=model_path, tokenizer=model_path)
        logger.info(f"HuggingFace model loaded: {model_path}")
        return nlp, registry
    raise ValueError(f"Model family {model_family} unsupported")


def analyzer_engine(model_family: str, model_path: str) -> AnalyzerEngine:
    """🔍 Unleashes the PHI-hunting beast!"""
    # Caveat: the loaded Flair/HuggingFace model is discarded here, so only the
    # predefined pattern recognizers actually run (see the sketch below).
    _model, registry = nlp_engine_and_registry(model_family, model_path)
    return AnalyzerEngine(registry=registry)


def get_supported_entities(model_family: str, model_path: str) -> list[str]:
    """📋 Spills the beans on PHI targets!"""
    if model_family.lower() == "huggingface":
        return ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"]
    return ["PERSON", "LOCATION", "ORGANIZATION"]


# Feature Spotlight: 🕵️‍♂️ PHI Hunt Kicks Off!
# Models dive into PDFs, sniffing out sensitive bits with ninja vibes! 😎
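# A minimal sketch of wiring the loaded Flair tagger into the registry as a
# custom recognizer, which the app above does not yet do. The helper name and
# tag mapping are illustrative assumptions, not part of the original app, and
# flair is assumed to be installed.
def build_flair_recognizer(tagger):
    from flair.data import Sentence
    from presidio_analyzer import EntityRecognizer

    # Assumed mapping from Flair tag names to Presidio entity names.
    tag_map = {"PER": "PERSON", "LOC": "LOCATION", "ORG": "ORGANIZATION"}

    class FlairRecognizer(EntityRecognizer):
        def load(self):
            pass  # the tagger is loaded outside and captured by closure

        def analyze(self, text, entities, nlp_artifacts=None):
            sentence = Sentence(text)
            tagger.predict(sentence)
            return [
                RecognizerResult(
                    entity_type=tag_map.get(span.tag, span.tag),
                    start=span.start_position,
                    end=span.end_position,
                    score=span.score,
                )
                for span in sentence.get_spans("ner")
                if tag_map.get(span.tag, span.tag) in entities
            ]

    return FlairRecognizer(supported_entities=list(tag_map.values()))
    # Wire it in with: registry.add_recognizer(build_flair_recognizer(tagger))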
def analyze(analyzer: AnalyzerEngine, text: str, entities: list[str], language: str,
            score_threshold: float, return_decision_process: bool,
            allow_list: list[str], deny_list: list[str]) -> list:
    """🦸 Zaps PHI with eagle-eye precision!"""
    # Deny-list terms are surfaced via an ad-hoc recognizer instead of being
    # used to drop analyzer findings (the old filter discarded every finding
    # that lacked a deny-list word); allow-list terms are filtered out after.
    deny_recognizer = create_ad_hoc_deny_list_recognizer(deny_list)
    analyze_entities = (entities + ["GENERIC_PII"]) if deny_list else entities
    results = analyzer.analyze(
        text=text,
        entities=analyze_entities,
        language=language,
        score_threshold=score_threshold,
        return_decision_process=return_decision_process,
        ad_hoc_recognizers=[deny_recognizer] if deny_recognizer else None,
    )
    filtered = []
    for result in results:
        snippet = text[result.start:result.end].lower()
        if any(word.lower() in snippet for word in allow_list):
            continue
        filtered.append(result)
    return filtered


def anonymize(text: str, operator: str, analyze_results: list, mask_char: str = "*", number_of_chars: int = 15):
    """🕵️‍♀️ Hides PHI with a magician’s flair! Returns the anonymizer's EngineResult (use .text)."""
    anonymizer = AnonymizerEngine()
    config = {"DEFAULT": OperatorConfig(operator, {})}
    if operator == "mask":
        # The mask operator also requires from_end; mask from the start.
        config["DEFAULT"] = OperatorConfig(
            operator,
            {"masking_char": mask_char, "chars_to_mask": number_of_chars, "from_end": False},
        )
    return anonymizer.anonymize(text=text, analyzer_results=analyze_results, operators=config)


def create_ad_hoc_deny_list_recognizer(deny_list: list[str] = None) -> PatternRecognizer:
    """🚨 Sets traps for sneaky PHI rogues!"""
    if not deny_list:
        return None
    return PatternRecognizer(supported_entity="GENERIC_PII", deny_list=deny_list)


def save_pdf(pdf_input) -> str:
    """💾 Stashes PDFs in a temp vault!"""
    if pdf_input.size > 200 * 1024 * 1024:
        logger.error(f"Upload rejected: {pdf_input.name} exceeds 200MB")
        st.error("PDF exceeds 200MB limit")
        raise ValueError("PDF too big")
    # Use the platform default temp dir instead of hard-coding /tmp.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
        tmp.write(pdf_input.read())
    logger.info(f"Uploaded PDF to {tmp.name}, size: {pdf_input.size} bytes")
    return tmp.name
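# A quick, hedged illustration of the three operators on a toy PERSON finding
# spanning "John Smith" (expected outputs shown; never called by the app):
#   replace -> "<PERSON> visited Boston"
#   redact  -> " visited Boston"
#   mask    -> "********** visited Boston"
def _demo_anonymize() -> None:
    sample = "John Smith visited Boston"
    finding = [RecognizerResult(entity_type="PERSON", start=0, end=10, score=1.0)]
    for op in ("replace", "redact", "mask"):
        print(op, "->", anonymize(sample, op, finding).text)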
# Feature Spotlight: 📄 PDF Wizardry Unleashed!
# Uploads zip through, PHI vanishes, and out pops a safe PDF with timestamp pizzazz! ✨


def read_pdf(pdf_path: str) -> str:
    """📖 Gobbles PDF text like candy!"""
    reader = PdfReader(pdf_path)
    # Parenthesized so blank pages still contribute a newline; the original
    # `page.extract_text() or "" + "\n"` bound the "\n" to the right of `or`.
    text = "".join((page.extract_text() or "") + "\n" for page in reader.pages)
    logger.info(f"Extracted {len(text)} chars from {pdf_path}")
    return text


def create_pdf(text: str, input_path: str, output_filename: str) -> str:
    """🖨️ Spins a new PDF with PHI-proof charm!"""
    # Caveat: this copies the original pages verbatim, so the anonymized
    # `text` argument is never embedded in the output (see the sketch below).
    reader = PdfReader(input_path)
    writer = PdfWriter()
    for page in reader.pages:
        writer.add_page(page)
    with open(output_filename, "wb") as f:
        writer.write(f)
    logger.info(f"Created PDF: {output_filename}")
    return output_filename
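# A minimal sketch of actually rendering the anonymized text into a fresh PDF,
# assuming reportlab is available (it is not a dependency of this app); the
# layout is deliberately crude, one naively truncated line at a time.
def create_pdf_from_text(text: str, output_filename: str) -> str:
    from reportlab.lib.pagesizes import letter
    from reportlab.pdfgen import canvas

    pdf = canvas.Canvas(output_filename, pagesize=letter)
    _, height = letter
    y = height - 72  # one-inch top margin
    for line in text.splitlines():
        if y < 72:  # start a new page when the current one fills up
            pdf.showPage()
            y = height - 72
        pdf.drawString(72, y, line[:110])  # naive truncation, no reflow
        y -= 14
    pdf.save()
    return output_filename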
# Sidebar
st.sidebar.header("PHI De-identification with Presidio")
model_list = [
    ("flair/ner-english-large", "https://huggingface.co./flair/ner-english-large"),
    ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co./obi/deid_roberta_i2b2"),
    ("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co./StanfordAIMI/stanford-deidentifier-base"),
]
st_model = st.sidebar.selectbox("NER model", [m[0] for m in model_list], 0)
st.sidebar.markdown(f"[View model]({next(url for m, url in model_list if m == st_model)})")
st_model_package = st_model.split("/")[0]
# Strip the "HuggingFace/" prefix so the hub path is what gets loaded.
st_model = st_model if st_model_package.lower() != "huggingface" else "/".join(st_model.split("/")[1:])
analyzer_params = (st_model_package, st_model)
st.sidebar.warning("Models may snooze briefly on first load!")
st_operator = st.sidebar.selectbox("De-id approach", ["replace", "redact", "mask"], 0)
st_threshold = st.sidebar.slider("Threshold", 0.0, 1.0, 0.35)
st_return_decision_process = st.sidebar.checkbox("Show analysis", False)
with st.sidebar.expander("Allow/Deny lists"):
    st_allow_list = st_tags(label="Allowlist", text="Add word, hit enter")
    st_deny_list = st_tags(label="Denylist", text="Add word, hit enter")

# Main
col1, col2 = st.columns(2)
with col1:
    st.subheader("Input")
    uploaded_file = st.file_uploader("Upload PDF", type=["pdf"], help="Max 200MB")
    if uploaded_file:
        try:
            logger.info(f"Upload: {uploaded_file.name}, size: {uploaded_file.size} bytes")
            pdf_path = save_pdf(uploaded_file)
            text = read_pdf(pdf_path)
            # read_pdf appends a newline per page, so check for real content.
            if not text.strip():
                st.error("No text extracted")
                raise ValueError("Empty PDF")
            analyzer = analyzer_engine(*analyzer_params)
            st_analyze_results = analyze(
                analyzer=analyzer,
                text=text,
                entities=get_supported_entities(*analyzer_params),
                language="en",
                score_threshold=st_threshold,
                return_decision_process=st_return_decision_process,
                allow_list=st_allow_list,
                deny_list=st_deny_list,
            )
            phi_types = set(res.entity_type for res in st_analyze_results)
            if phi_types:
                st.success(f"Zapped PHI: {', '.join(phi_types)}")
            else:
                st.info("No PHI found")
            anonymized_result = anonymize(text=text, operator=st_operator, analyze_results=st_analyze_results)
            timestamp = get_timestamp_prefix()
            output_filename = f"{timestamp}_{uploaded_file.name}"
            create_pdf(anonymized_result.text, pdf_path, output_filename)
            with open(output_filename, "rb") as f:
                b64 = base64.b64encode(f.read()).decode()
            st.markdown(
                f'<a href="data:application/pdf;base64,{b64}" download="{output_filename}">Download de-identified PDF</a>',
                unsafe_allow_html=True,
            )
            with col2:
                st.subheader("Findings")
                if st_analyze_results:
                    df = pd.DataFrame([r.to_dict() for r in st_analyze_results])
                    df["text"] = [text[r.start:r.end] for r in st_analyze_results]
                    df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
                        {"entity_type": "Type", "text": "Text", "start": "Start", "end": "End", "score": "Confidence"},
                        axis=1,
                    )
                    if st_return_decision_process:
                        df_subset = pd.concat(
                            [df_subset, pd.DataFrame([r.analysis_explanation.to_dict() for r in st_analyze_results])],
                            axis=1,
                        )
                    st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
                else:
                    st.text("No findings")
            os.remove(pdf_path)
        except Exception as e:
            st.error(f"Oops: {str(e)}")
            logger.error(f"Error: {str(e)}")
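# To try this locally (assuming the script is saved as app.py and the selected
# model weights can be fetched from the Hugging Face hub):
#
#   streamlit run app.py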