Markit_v2 / src /ui /ui.py
AnseMin's picture
Modifying the UI
49c5606
import gradio as gr
import markdown
import threading
import time
import logging
from pathlib import Path
from src.core.converter import convert_file, set_cancellation_flag, is_conversion_in_progress
from src.parsers.parser_registry import ParserRegistry
# Import MarkItDown to check if it's available
try:
from markitdown import MarkItDown
HAS_MARKITDOWN = True
logging.info("MarkItDown is available for use")
except ImportError:
HAS_MARKITDOWN = False
logging.warning("MarkItDown is not available")
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Add a global variable to track cancellation state
conversion_cancelled = threading.Event()
# Pass the cancellation flag to the converter module
set_cancellation_flag(conversion_cancelled)
# Add a background thread to monitor cancellation
def monitor_cancellation():
"""Background thread to monitor cancellation and update UI if needed"""
logger.info("Starting cancellation monitor thread")
while is_conversion_in_progress():
if conversion_cancelled.is_set():
logger.info("Cancellation detected by monitor thread")
time.sleep(0.1) # Check every 100ms
logger.info("Cancellation monitor thread ending")
def validate_file_for_parser(file_path, parser_name):
"""Validate if the file type is supported by the selected parser."""
if not file_path:
return True, "" # No file selected yet
if "GOT-OCR" in parser_name:
file_ext = Path(file_path).suffix.lower()
if file_ext not in ['.jpg', '.jpeg', '.png']:
return False, "GOT-OCR only supports JPG and PNG formats."
return True, ""
def format_markdown_content(content):
if not content:
return content
# Convert the content to HTML using markdown library
html_content = markdown.markdown(str(content), extensions=['tables'])
return html_content
# Function to run conversion in a separate thread
def run_conversion_thread(file_path, parser_name, ocr_method_name, output_format):
"""Run the conversion in a separate thread and return the thread object"""
global conversion_cancelled
# Reset the cancellation flag
conversion_cancelled.clear()
# Create a container for the results
results = {"content": None, "download_file": None, "error": None}
def conversion_worker():
try:
content, download_file = convert_file(file_path, parser_name, ocr_method_name, output_format)
results["content"] = content
results["download_file"] = download_file
except Exception as e:
logger.error(f"Error during conversion: {str(e)}")
results["error"] = str(e)
# Create and start the thread
thread = threading.Thread(target=conversion_worker)
thread.daemon = True
thread.start()
return thread, results
def handle_convert(file_path, parser_name, ocr_method_name, output_format, is_cancelled):
"""Handle file conversion."""
global conversion_cancelled
# Check if we should cancel before starting
if is_cancelled:
logger.info("Conversion cancelled before starting")
return "Conversion cancelled.", None, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
# Validate file type for the selected parser
is_valid, error_msg = validate_file_for_parser(file_path, parser_name)
if not is_valid:
logger.error(f"File validation error: {error_msg}")
return f"Error: {error_msg}", None, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
logger.info("Starting conversion with cancellation flag cleared")
# Start the conversion in a separate thread
thread, results = run_conversion_thread(file_path, parser_name, ocr_method_name, output_format)
# Start the monitoring thread
monitor_thread = threading.Thread(target=monitor_cancellation)
monitor_thread.daemon = True
monitor_thread.start()
# Wait for the thread to complete or be cancelled
while thread.is_alive():
# Check if cancellation was requested
if conversion_cancelled.is_set():
logger.info("Cancellation detected, waiting for thread to finish")
# Give the thread a chance to clean up
thread.join(timeout=0.5)
if thread.is_alive():
logger.warning("Thread did not finish within timeout")
return "Conversion cancelled.", None, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
# Sleep briefly to avoid busy waiting
time.sleep(0.1)
# Thread has completed, check results
if results["error"]:
return f"Error: {results['error']}", None, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
content = results["content"]
download_file = results["download_file"]
# If conversion returned a cancellation message
if content == "Conversion cancelled.":
logger.info("Converter returned cancellation message")
return content, None, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
# Format the content and wrap it in the scrollable container
formatted_content = format_markdown_content(str(content))
html_output = f"<div class='output-container'>{formatted_content}</div>"
logger.info("Conversion completed successfully")
return html_output, download_file, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
def create_ui():
with gr.Blocks(css="""
/* Simple output container with only one scrollbar */
.output-container {
max-height: 420px;
overflow-y: auto;
border: 1px solid #ddd;
padding: 10px;
}
/* Hide any scrollbars from parent containers */
.gradio-container .prose {
overflow: visible;
}
.processing-controls {
display: flex;
justify-content: center;
gap: 10px;
margin-top: 10px;
}
/* Add margin above the provider/OCR options row */
.provider-options-row {
margin-top: 15px;
margin-bottom: 15px;
}
""") as demo:
# Simple title - no fancy HTML or CSS
gr.Markdown("## Markit: Document to Markdown Converter")
# State to track if cancellation is requested
cancel_requested = gr.State(False)
# State to store the conversion thread
conversion_thread = gr.State(None)
# State to store the output format (fixed to Markdown)
output_format_state = gr.State("Markdown")
# File input first
file_input = gr.File(label="Upload Document", type="filepath")
# Provider and OCR options below the file input
with gr.Row(elem_classes=["provider-options-row"]):
with gr.Column(scale=1):
parser_names = ParserRegistry.get_parser_names()
# Make MarkItDown the default parser if available
default_parser = next((p for p in parser_names if p == "MarkItDown"), parser_names[0] if parser_names else "PyPdfium")
provider_dropdown = gr.Dropdown(
label="Provider",
choices=parser_names,
value=default_parser,
interactive=True
)
with gr.Column(scale=1):
default_ocr_options = ParserRegistry.get_ocr_options(default_parser)
default_ocr = default_ocr_options[0] if default_ocr_options else "No OCR"
ocr_dropdown = gr.Dropdown(
label="OCR Options",
choices=default_ocr_options,
value=default_ocr,
interactive=True
)
# Simple output container with just one scrollbar
file_display = gr.HTML(
value="<div class='output-container'></div>",
label="Converted Content"
)
file_download = gr.File(label="Download File")
# Processing controls row
with gr.Row(elem_classes=["processing-controls"]):
convert_button = gr.Button("Convert", variant="primary")
cancel_button = gr.Button("Cancel", variant="stop", visible=False)
# Event handlers
provider_dropdown.change(
lambda p: gr.Dropdown(
choices=["Plain Text", "Formatted Text"] if "GOT-OCR" in p else ParserRegistry.get_ocr_options(p),
value="Plain Text" if "GOT-OCR" in p else (ParserRegistry.get_ocr_options(p)[0] if ParserRegistry.get_ocr_options(p) else None)
),
inputs=[provider_dropdown],
outputs=[ocr_dropdown]
)
# Reset cancel flag when starting conversion
def start_conversion():
global conversion_cancelled
conversion_cancelled.clear()
logger.info("Starting conversion with cancellation flag cleared")
return gr.update(visible=False), gr.update(visible=True), False
# Set cancel flag and terminate thread when cancel button is clicked
def request_cancellation(thread):
global conversion_cancelled
conversion_cancelled.set()
logger.info("Cancel button clicked, cancellation flag set")
# Try to join the thread with a timeout
if thread is not None:
logger.info(f"Attempting to join conversion thread: {thread}")
thread.join(timeout=0.5)
if thread.is_alive():
logger.warning("Thread did not finish within timeout")
# Add immediate feedback to the user
return gr.update(visible=True), gr.update(visible=False), True, None
# Start conversion sequence
convert_button.click(
fn=start_conversion,
inputs=[],
outputs=[convert_button, cancel_button, cancel_requested],
queue=False # Execute immediately
).then(
fn=handle_convert,
inputs=[file_input, provider_dropdown, ocr_dropdown, output_format_state, cancel_requested],
outputs=[file_display, file_download, convert_button, cancel_button, conversion_thread]
)
# Handle cancel button click
cancel_button.click(
fn=request_cancellation,
inputs=[conversion_thread],
outputs=[convert_button, cancel_button, cancel_requested, conversion_thread],
queue=False # Execute immediately
)
return demo
def launch_ui(server_name="0.0.0.0", server_port=7860, share=False):
demo = create_ui()
demo.launch(
server_name=server_name,
server_port=server_port,
root_path="",
show_error=True,
share=share
)