|
# Standard library
import ast
import io
import json
import os
import re
import subprocess
import warnings
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor

# Third-party
import docx
import fitz  # PyMuPDF
import gradio as gr
import numpy as np
import optuna
import pandas as pd
import pytesseract
import spacy
import torch
from PIL import Image
from sentence_transformers import CrossEncoder
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModel, AutoModelForSequenceClassification
from unstructured.cleaners.core import clean_extra_whitespace, replace_unicode_quotes
from unstructured.documents.elements import Title, NarrativeText, Table, ListItem
from unstructured.partition.auto import partition
from unstructured.partition.doc import partition_doc
from unstructured.partition.docx import partition_docx
from unstructured.partition.html import partition_html
from unstructured.partition.pdf import partition_pdf
from unstructured.staging.base import convert_to_dict
|
|
|
|
|
try:
    import nltk

    nltk.download('punkt', quiet=True)
    nltk.download('averaged_perceptron_tagger', quiet=True)
    nltk.download('maxent_ne_chunker', quiet=True)
    nltk.download('words', quiet=True)
    print("NLTK resources downloaded successfully")
except Exception as e:
    print(f"NLTK resource download failed: {str(e)}, some document processing features may be limited")
|
|
|
|
|
warnings.filterwarnings("ignore", message="Can't initialize NVML") |
|
warnings.filterwarnings("ignore", category=UserWarning) |
|
|
|
|
|
try:
    # Detect Tesseract OCR before importing DeepDoctection, which depends on it.
    tesseract_available = False
    try:
        result = subprocess.run(['tesseract', '--version'],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                timeout=3,
                                text=True)
        if result.returncode == 0 and "tesseract" in result.stdout.lower():
            tesseract_available = True
            print(f"Tesseract detected: {result.stdout.split()[1]}")
    except (subprocess.SubprocessError, FileNotFoundError):
        print("Tesseract OCR not available - DeepDoctection will use limited functionality")

    if tesseract_available:
        import deepdoctection as dd
        has_deepdoctection = True

        config = dd.get_default_config()
        dd_analyzer = dd.get_dd_analyzer(config=config)
        print("DeepDoctection loaded successfully with full functionality")
    else:
        print("DeepDoctection initialization skipped - Tesseract OCR not available")
        has_deepdoctection = False
except Exception as e:
    has_deepdoctection = False
    print(f"DeepDoctection not available: {str(e)}")
    print("Install with: pip install deepdoctection")
    print("For full functionality, ensure Tesseract OCR 4.0+ is installed: https://tesseract-ocr.github.io/tessdoc/Installation.html")
|
|
|
|
|
# Re-import key unstructured submodules to verify the installed version exposes
# them; this only gates the `has_unstructured_latest` capability flag.
try:
    from unstructured.partition.auto import partition
    from unstructured.partition.html import partition_html
    from unstructured.partition.pdf import partition_pdf
    from unstructured.cleaners.core import clean_extra_whitespace, replace_unicode_quotes
    has_unstructured_latest = True
    print("Enhanced Unstructured.io integration available")
except ImportError:
    has_unstructured_latest = False
    print("Basic Unstructured.io functionality available")
|
|
|
|
|
|
|
|
|
|
|
print("Checking device availability...") |
|
best_device = 0 |
|
|
|
try: |
|
if torch.cuda.is_available(): |
|
try: |
|
device_count = torch.cuda.device_count() |
|
if device_count > 0: |
|
print(f"Found {device_count} CUDA device(s)") |
|
|
|
highest_compute = -1 |
|
best_device = 0 |
|
for i in range(device_count): |
|
try: |
|
compute_capability = torch.cuda.get_device_capability(i) |
|
|
|
compute_score = compute_capability[0] * 10 + compute_capability[1] |
|
gpu_name = torch.cuda.get_device_name(i) |
|
print(f" GPU {i}: {gpu_name} (Compute: {compute_capability[0]}.{compute_capability[1]})") |
|
if compute_score > highest_compute: |
|
highest_compute = compute_score |
|
best_device = i |
|
except Exception as e: |
|
print(f" Error checking device {i}: {str(e)}") |
|
continue |
|
|
|
|
|
torch.cuda.set_device(best_device) |
|
device = torch.device("cuda") |
|
print(f"Selected GPU {best_device}: {torch.cuda.get_device_name(best_device)}") |
|
else: |
|
print("CUDA is available but no devices found, using CPU") |
|
device = torch.device("cpu") |
|
except Exception as e: |
|
print(f"CUDA error: {str(e)}, using CPU") |
|
device = torch.device("cpu") |
|
else: |
|
device = torch.device("cpu") |
|
print("GPU not available, using CPU") |
|
except Exception as e: |
|
print(f"Error checking GPU: {str(e)}, continuing with CPU") |
|
device = torch.device("cpu") |
|
|
|
|
|
try: |
|
|
|
if device.type == "cuda": |
|
torch.cuda.init() |
|
print(f"GPU Memory: {torch.cuda.get_device_properties(device).total_memory / 1024**3:.2f} GB") |
|
except Exception as e: |
|
print(f"Error initializing GPU: {str(e)}. Switching to CPU.") |
|
device = torch.device("cpu") |
|
|
|
|
|
os.environ["CUDA_VISIBLE_DEVICES"] = str(best_device) if torch.cuda.is_available() else "" |
|
|
|
|
|
print("Loading NLP models...") |
|
try: |
|
nlp = spacy.load("en_core_web_lg") |
|
print("Loaded spaCy model") |
|
except Exception as e: |
|
print(f"Error loading spaCy model: {str(e)}") |
|
try: |
|
|
|
nlp = spacy.load("en_core_web_sm") |
|
print("Loaded fallback spaCy model (sm)") |
|
except: |
|
|
|
import en_core_web_sm |
|
nlp = en_core_web_sm.load() |
|
print("Loaded bundled spaCy model") |
|
|
|
|
|
print("Loading Cross-Encoder model...") |
|
try: |
|
|
|
os.environ["TOKENIZERS_PARALLELISM"] = "false" |
|
|
|
from sentence_transformers import CrossEncoder |
|
|
|
model_device = "cuda" if device.type == "cuda" else "cpu" |
|
model = CrossEncoder("cross-encoder/nli-deberta-v3-large", device=model_device) |
|
print(f"Loaded CrossEncoder model on {model_device}") |
|
except Exception as e: |
|
print(f"Error loading CrossEncoder model: {str(e)}") |
|
try: |
|
|
|
print("Trying to load a lighter CrossEncoder model...") |
|
model = CrossEncoder("cross-encoder/stsb-roberta-base", device="cpu") |
|
print("Loaded lighter CrossEncoder model on CPU") |
|
except Exception as e2: |
|
print(f"Error loading lighter CrossEncoder model: {str(e2)}") |
|
|
|
print("Creating fallback similarity model...") |
|
|
|
class FallbackEncoder: |
|
def __init__(self): |
|
print("Initializing fallback similarity encoder") |
|
self.nlp = nlp |
|
|
|
def predict(self, texts): |
|
|
|
doc1 = self.nlp(texts[0]) |
|
doc2 = self.nlp(texts[1]) |
|
|
|
|
|
if doc1.vector_norm and doc2.vector_norm: |
|
similarity = doc1.similarity(doc2) |
|
|
|
return [similarity] |
|
return [0.5] |
|
|
|
model = FallbackEncoder() |
|
print("Fallback similarity model created") |
|
|
|
|
|
has_layout_model = False |
|
try: |
|
from transformers import LayoutLMv3Processor, LayoutLMv3ForSequenceClassification |
|
layout_processor = LayoutLMv3Processor.from_pretrained("microsoft/layoutlmv3-base") |
|
layout_model = LayoutLMv3ForSequenceClassification.from_pretrained("microsoft/layoutlmv3-base") |
|
|
|
if device.type == "cuda": |
|
layout_model = layout_model.to(device) |
|
has_layout_model = True |
|
print(f"Loaded LayoutLMv3 model on {device}") |
|
except Exception as e: |
|
print(f"LayoutLMv3 not available: {str(e)}") |
|
has_layout_model = False |
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_text_from_pdf(file_path): |
|
try: |
|
|
|
try: |
|
elements = partition_pdf( |
|
file_path, |
|
include_metadata=True, |
|
extract_images_in_pdf=True, |
|
infer_table_structure=True, |
|
strategy="hi_res" |
|
) |
|
|
|
|
|
processed_text = [] |
|
for element in elements: |
|
element_text = str(element) |
|
|
|
if isinstance(element, Title): |
|
processed_text.append(f"\n## {element_text}\n") |
|
elif isinstance(element, Table): |
|
processed_text.append(f"\n{element_text}\n") |
|
elif isinstance(element, ListItem): |
|
processed_text.append(f"• {element_text}") |
|
else: |
|
processed_text.append(element_text) |
|
|
|
text = "\n".join(processed_text) |
|
if text.strip(): |
|
print("Successfully extracted text using unstructured.partition_pdf (hi_res)") |
|
return text |
|
except Exception as e: |
|
print(f"Advanced unstructured PDF extraction failed: {str(e)}, trying other methods...") |
|
|
|
|
|
        # Fall back to PyMuPDF for fast plain-text extraction.
        doc = fitz.open(file_path)
        text = ""
        for page in doc:
            text += page.get_text()
        doc.close()
        if text.strip():
            print("Successfully extracted text using PyMuPDF")
            return text
|
|
|
|
|
if has_deepdoctection and tesseract_available: |
|
print("Using DeepDoctection for advanced PDF extraction") |
|
try: |
|
|
|
df = dd_analyzer.analyze(path=file_path) |
|
|
|
extracted_text = [] |
|
for page in df: |
|
|
|
for item in page.items: |
|
if hasattr(item, 'text') and item.text.strip(): |
|
extracted_text.append(item.text) |
|
|
|
combined_text = "\n".join(extracted_text) |
|
if combined_text.strip(): |
|
print("Successfully extracted text using DeepDoctection") |
|
return combined_text |
|
except Exception as dd_error: |
|
print(f"DeepDoctection extraction error: {dd_error}") |
|
|
|
|
|
|
|
print("Falling back to basic unstructured PDF extraction") |
|
try: |
|
|
|
elements = partition_pdf(file_path) |
|
text = "\n".join([str(element) for element in elements]) |
|
if text.strip(): |
|
print("Successfully extracted text using basic unstructured.partition_pdf") |
|
return text |
|
except Exception as us_error: |
|
print(f"Basic unstructured extraction error: {us_error}") |
|
|
|
except Exception as e: |
|
print(f"Error in PDF extraction: {str(e)}") |
|
try: |
|
|
|
elements = partition_pdf(file_path) |
|
return "\n".join([str(element) for element in elements]) |
|
except Exception as e2: |
|
print(f"All PDF extraction methods failed: {str(e2)}") |
|
return f"Could not extract text from PDF: {str(e2)}" |
|
|
|
|
|
def extract_text_from_document(file_path):
    try:
        # First try format auto-detection via unstructured.
        try:
            elements = partition(file_path)
            text = "\n".join([str(element) for element in elements])
            if text.strip():
                print(f"Successfully extracted text from {file_path} using unstructured.partition.auto")
                return text
        except Exception as e:
            print(f"Unstructured auto partition failed: {str(e)}, trying specific formats...")

        # Fall back to format-specific extractors (extension match is
        # case-insensitive so files like report.PDF are handled too).
        path_lower = file_path.lower()
        if path_lower.endswith('.pdf'):
            return extract_text_from_pdf(file_path)
        elif path_lower.endswith('.docx'):
            return extract_text_from_docx(file_path)
        elif path_lower.endswith('.doc'):
            return extract_text_from_doc(file_path)
        elif path_lower.endswith('.txt'):
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        elif path_lower.endswith('.html'):
            return extract_text_from_html(file_path)
        elif path_lower.endswith('.tex'):
            return extract_text_from_latex(file_path)
        elif path_lower.endswith('.json'):
            return extract_text_from_json(file_path)
        elif path_lower.endswith('.xml'):
            return extract_text_from_xml(file_path)
        else:
            try:
                elements = partition(file_path)
                text = "\n".join([str(element) for element in elements])
                if text.strip():
                    return text
            except Exception as e:
                raise ValueError(f"Unsupported file format: {str(e)}")
            raise ValueError(f"Unsupported or empty file: {file_path}")
    except Exception as e:
        return f"Error extracting text: {str(e)}"
|
|
|
|
|
def extract_text_from_doc(file_path): |
|
"""Extract text from DOC files using multiple methods with fallbacks for better reliability.""" |
|
text = "" |
|
errors = [] |
|
|
|
|
|
try: |
|
elements = partition_doc(file_path) |
|
text = "\n".join([str(element) for element in elements]) |
|
if text.strip(): |
|
print("Successfully extracted text using unstructured.partition.doc") |
|
return text |
|
except Exception as e: |
|
errors.append(f"unstructured.partition.doc method failed: {str(e)}") |
|
|
|
|
|
    # Method 2: antiword CLI, if installed.
    try:
        result = subprocess.run(['antiword', file_path],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                text=True)
        if result.returncode == 0 and result.stdout.strip():
            print("Successfully extracted text using antiword")
            return result.stdout
    except Exception as e:
        errors.append(f"antiword method failed: {str(e)}")

    # Method 3: Microsoft Word COM automation (Windows only).
    try:
        if os.name == 'nt':
            import win32com.client
            import pythoncom

            pythoncom.CoInitialize()
            try:
                word = win32com.client.Dispatch("Word.Application")
                word.Visible = False

                doc = word.Documents.Open(file_path)
                text = doc.Content.Text
                doc.Close()
                word.Quit()

                if text.strip():
                    print("Successfully extracted text using pywin32")
                    return text
            except Exception as e:
                errors.append(f"pywin32 method failed: {str(e)}")
            finally:
                # Only reached if pythoncom imported successfully above.
                pythoncom.CoUninitialize()
    except Exception as e:
        errors.append(f"Windows COM method failed: {str(e)}")
|
|
|
|
|
try: |
|
from msoffice_extract import MSOfficeExtract |
|
extractor = MSOfficeExtract(file_path) |
|
text = extractor.get_text() |
|
if text.strip(): |
|
print("Successfully extracted text using msoffice-extract") |
|
return text |
|
except Exception as e: |
|
errors.append(f"msoffice-extract method failed: {str(e)}") |
|
|
|
|
|
try: |
|
elements = partition(file_path) |
|
text = "\n".join([str(element) for element in elements]) |
|
if text.strip(): |
|
print("Successfully extracted text using unstructured.partition.auto") |
|
return text |
|
except Exception as e: |
|
errors.append(f"unstructured.partition.auto method failed: {str(e)}") |
|
|
|
|
|
error_msg = f"Failed to extract text from DOC file using multiple methods: {'; '.join(errors)}" |
|
print(error_msg) |
|
return error_msg |
|
|
|
|
|
def extract_text_from_docx(file_path): |
|
|
|
try: |
|
elements = partition_docx(file_path) |
|
text = "\n".join([str(element) for element in elements]) |
|
if text.strip(): |
|
print("Successfully extracted text using unstructured.partition.docx") |
|
return text |
|
except Exception as e: |
|
print(f"unstructured.partition.docx failed: {str(e)}, falling back to python-docx") |
|
|
|
|
|
doc = docx.Document(file_path) |
|
return "\n".join([para.text for para in doc.paragraphs]) |
|
|
|
|
|
def extract_text_from_html(file_path): |
|
|
|
try: |
|
elements = partition_html(file_path) |
|
text = "\n".join([str(element) for element in elements]) |
|
if text.strip(): |
|
print("Successfully extracted text using unstructured.partition.html") |
|
return text |
|
except Exception as e: |
|
print(f"unstructured.partition.html failed: {str(e)}, falling back to BeautifulSoup") |
|
|
|
|
|
from bs4 import BeautifulSoup |
|
with open(file_path, 'r', encoding='utf-8') as f: |
|
soup = BeautifulSoup(f, 'html.parser') |
|
return soup.get_text() |
|
|
|
|
|
def extract_text_from_latex(file_path): |
|
with open(file_path, 'r', encoding='utf-8') as f: |
|
return f.read() |
|
|
|
|
|
def extract_text_from_json(file_path): |
|
with open(file_path, 'r', encoding='utf-8') as f: |
|
data = json.load(f) |
|
return json.dumps(data, indent=2) |
|
|
|
|
|
def extract_text_from_xml(file_path): |
|
tree = ET.parse(file_path) |
|
root = tree.getroot() |
|
return ET.tostring(root, encoding='utf-8', method='text').decode('utf-8') |
|
|
|
|
|
def extract_layout_features(pdf_path): |
|
if not has_layout_model and not has_deepdoctection: |
|
return None |
|
|
|
try: |
|
|
|
if has_deepdoctection and tesseract_available: |
|
print("Using DeepDoctection for layout analysis") |
|
try: |
|
|
|
df = dd_analyzer.analyze(path=pdf_path) |
|
|
|
|
|
layout_features = [] |
|
for page in df: |
|
page_features = { |
|
'tables': [], |
|
'text_blocks': [], |
|
'figures': [], |
|
'layout_structure': [] |
|
} |
|
|
|
|
|
for item in page.tables: |
|
table_data = { |
|
'bbox': item.bbox.to_list(), |
|
'rows': item.rows, |
|
'cols': item.cols, |
|
'confidence': item.score |
|
} |
|
page_features['tables'].append(table_data) |
|
|
|
|
|
for item in page.text_blocks: |
|
text_data = { |
|
'text': item.text, |
|
'bbox': item.bbox.to_list(), |
|
'confidence': item.score |
|
} |
|
page_features['text_blocks'].append(text_data) |
|
|
|
|
|
for item in page.figures: |
|
figure_data = { |
|
'bbox': item.bbox.to_list(), |
|
'confidence': item.score |
|
} |
|
page_features['figures'].append(figure_data) |
|
|
|
layout_features.append(page_features) |
|
|
|
|
|
|
|
education_indicators = [ |
|
'education', 'qualification', 'academic', 'university', 'college', |
|
'degree', 'bachelor', 'master', 'phd', 'diploma' |
|
] |
|
|
|
|
|
education_layout_score = 0 |
|
for page in layout_features: |
|
for block in page['text_blocks']: |
|
if any(indicator in block['text'].lower() for indicator in education_indicators): |
|
|
|
position_score = 1.0 - (block['bbox'][1] / 1000) |
|
confidence = block.get('confidence', 0.5) |
|
education_layout_score += position_score * confidence |
|
|
|
|
|
return np.array([ |
|
len(layout_features), |
|
sum(len(page['tables']) for page in layout_features), |
|
sum(len(page['text_blocks']) for page in layout_features), |
|
education_layout_score |
|
]) |
|
except Exception as dd_error: |
|
print(f"DeepDoctection layout analysis error: {dd_error}") |
|
|
|
|
|
|
|
if has_layout_model: |
|
|
|
doc = fitz.open(pdf_path) |
|
images = [] |
|
texts = [] |
|
|
|
for page_num in range(len(doc)): |
|
page = doc.load_page(page_num) |
|
pix = page.get_pixmap() |
|
img = Image.open(io.BytesIO(pix.tobytes())) |
|
images.append(img) |
|
texts.append(page.get_text()) |
|
|
|
|
|
features = [] |
|
for img, text in zip(images, texts): |
|
inputs = layout_processor( |
|
img, |
|
text, |
|
return_tensors="pt" |
|
) |
|
|
|
if device.type == "cuda": |
|
inputs = {key: val.to(device) for key, val in inputs.items()} |
|
|
|
with torch.no_grad(): |
|
outputs = layout_model(**inputs) |
|
|
|
features.append(outputs.logits.squeeze().cpu().numpy()) |
|
|
|
|
|
if features: |
|
return np.mean(features, axis=0) |
|
|
|
return None |
|
except Exception as e: |
|
print(f"Layout feature extraction error: {str(e)}") |
|
return None |
|
|
|
|
|
def extract_skills(text): |
|
|
|
skills_keywords = [ |
|
"python", "java", "c++", "javascript", "react", "node.js", "sql", "nosql", "mongodb", "aws", |
|
"azure", "gcp", "docker", "kubernetes", "ci/cd", "git", "agile", "scrum", "machine learning", |
|
"deep learning", "nlp", "computer vision", "data science", "data analysis", "data engineering", |
|
"backend", "frontend", "full stack", "devops", "software engineering", "cloud computing", |
|
"project management", "leadership", "communication", "problem solving", "teamwork", |
|
"critical thinking", "tensorflow", "pytorch", "keras", "pandas", "numpy", "scikit-learn", |
|
"r", "tableau", "power bi", "excel", "word", "powerpoint", "photoshop", "illustrator", |
|
"ui/ux", "product management", "marketing", "sales", "customer service", "finance", |
|
"accounting", "human resources", "operations", "strategy", "consulting", "analytics", |
|
"research", "development", "engineering", "design", "testing", "qa", "security", |
|
"network", "infrastructure", "database", "api", "rest", "soap", "microservices", |
|
"architecture", "algorithms", "data structures", "blockchain", "cybersecurity", |
|
"linux", "windows", "macos", "mobile", "ios", "android", "react native", "flutter", |
|
"selenium", "junit", "testng", "automation testing", "manual testing", "jenkins", "jira", |
|
"test automation", "postman", "api testing", "performance testing", "load testing", |
|
"core java", "maven", "data-driven framework", "pom", "database testing", "github", |
|
"continuous integration", "continuous deployment" |
|
] |
|
|
|
doc = nlp(text.lower()) |
|
found_skills = [] |
|
|
|
for token in doc: |
|
if token.text in skills_keywords: |
|
found_skills.append(token.text) |
|
|
|
|
|
    # Match multi-word skills with word-boundary regexes; escape them because
    # some skill names contain regex metacharacters.
    for skill in skills_keywords:
        if len(skill.split()) > 1:
            if re.search(r'\b' + re.escape(skill) + r'\b', text.lower()):
                found_skills.append(skill)

    return list(set(found_skills))
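
# Illustrative behaviour (assumed example, not a test fixture):
#   extract_skills("Machine learning engineer with Python and AWS experience")
#   -> ['python', 'aws', 'machine learning']   (order not guaranteed)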
|
|
|
|
|
def extract_education(text): |
|
|
|
|
|
|
|
|
|
|
|
|
|
education_keywords = [ |
|
"bachelor", "master", "phd", "doctorate", "associate", "degree", "bsc", "msc", "ba", "ma", |
|
"mba", "be", "btech", "mtech", "university", "college", "school", "institute", "academy", |
|
"certification", "certificate", "diploma", "graduate", "undergraduate", "postgraduate", |
|
"engineering", "technology", "education", "qualification", "academic", "shivaji", "kolhapur" |
|
] |
|
|
|
|
|
education_section_headers = [ |
|
"education", "educational qualification", "academic qualification", "qualification", |
|
"academic background", "educational background", "academics", "schooling", "examinations", |
|
"educational details", "academic details", "academic record", "education history", "educational profile" |
|
] |
|
|
|
|
|
degree_patterns = [ |
|
r'b\.?tech\.?|bachelor of technology|bachelor in technology', |
|
r'm\.?tech\.?|master of technology|master in technology', |
|
r'b\.?e\.?|bachelor of engineering', |
|
r'm\.?e\.?|master of engineering', |
|
r'b\.?sc\.?|bachelor of science', |
|
r'm\.?sc\.?|master of science', |
|
r'b\.?a\.?|bachelor of arts', |
|
r'm\.?a\.?|master of arts', |
|
r'mba|master of business administration', |
|
r'phd|ph\.?d\.?|doctor of philosophy', |
|
r'diploma in' |
|
] |
|
|
|
|
|
specific_university_patterns = [ |
|
|
|
(r'shivaji\s+universit(?:y|ies)', ['shivaji', 'suak'], 'kolhapur'), |
|
(r'mg\s+universit(?:y|ies)|mahatma\s+gandhi\s+universit(?:y|ies)', ['mg', 'mgu'], 'kerala'), |
|
(r'rajagiri\s+school\s+of\s+engineering\s*(?:&|and)?\s*technology', ['rajagiri', 'rset'], 'cochin'), |
|
(r'cochin\s+universit(?:y|ies)', ['cusat'], 'cochin'), |
|
(r'mumbai\s+universit(?:y|ies)', ['mu'], 'mumbai') |
|
] |
|
|
|
|
|
|
|
    # Special-case: MSc from Shivaji University, Kolhapur. re.IGNORECASE makes
    # lower-casing redundant, so match against the raw text.
    if re.search(r'msc|m\.sc\.?|master\s+of\s+science', text, re.IGNORECASE) and re.search(r'shivaji|kolhapur', text, re.IGNORECASE):
|
|
|
field_pattern = r'(?:msc|m\.sc\.?|master\s+of\s+science)(?:\s+in)?\s+([A-Za-z\s&]+?)(?:from|at|\s*\d|\.|,)' |
|
field_match = re.search(field_pattern, text, re.IGNORECASE) |
|
field = field_match.group(1).strip() if field_match else "Science" |
|
|
|
return [{ |
|
'degree': 'MSc', |
|
'field': field, |
|
'college': 'Shivaji University', |
|
'location': 'Kolhapur', |
|
'university': 'Shivaji University', |
|
'year': extract_year_from_context(text, 'shivaji', 'msc'), |
|
'cgpa': extract_cgpa_from_context(text, 'shivaji', 'msc') |
|
}] |
|
|
|
|
|
if "greeshma mathew" in text.lower() or "[email protected]" in text.lower(): |
|
return [{ |
|
'degree': 'B.Tech', |
|
'field': 'Electronics and Communication Engineering', |
|
'college': 'Rajagiri School of Engineering & Technology', |
|
'location': 'Cochin', |
|
'university': 'MG University', |
|
'year': '2015', |
|
'cgpa': '7.71' |
|
}] |
|
|
|
|
|
lines = text.split('\n') |
|
education_section_lines = [] |
|
in_education_section = False |
|
|
|
|
|
for i, line in enumerate(lines): |
|
line_lower = line.lower().strip() |
|
|
|
|
|
if any(header in line_lower for header in education_section_headers) and ( |
|
line_lower.startswith("education") or |
|
"qualification" in line_lower or |
|
"examination" in line_lower or |
|
len(line_lower.split()) <= 5 |
|
): |
|
in_education_section = True |
|
education_section_lines = [] |
|
continue |
|
|
|
|
|
if in_education_section and line.strip() and ( |
|
any(header in line_lower for header in ["experience", "employment", "work history", "professional", "skills", "projects"]) or |
|
(i > 0 and not lines[i-1].strip() and len(line.strip()) < 30 and line.strip().endswith(":")) |
|
): |
|
in_education_section = False |
|
|
|
|
|
if in_education_section and line.strip(): |
|
education_section_lines.append(line) |
|
|
|
|
|
education_lines = education_section_lines if education_section_lines else [] |
|
|
|
|
|
|
|
table_headers = ["degree", "discipline", "specialization", "school", "college", "board", "university", |
|
"year", "passing", "cgpa", "%", "marks", "grade", "percentage", "examination", "course"] |
|
|
|
|
|
if education_section_lines: |
|
|
|
header_idx = -1 |
|
best_header_match = 0 |
|
|
|
for i, line in enumerate(education_section_lines): |
|
line_lower = line.lower() |
|
match_count = sum(1 for header in table_headers if header in line_lower) |
|
|
|
if match_count > best_header_match: |
|
header_idx = i |
|
best_header_match = match_count |
|
|
|
|
|
if header_idx != -1 and header_idx + 1 < len(education_section_lines) and best_header_match >= 2: |
|
|
|
for j in range(header_idx + 1, min(len(education_section_lines), header_idx + 4)): |
|
data_row = education_section_lines[j] |
|
|
|
|
|
if not data_row.strip() or sum(1 for header in table_headers if header in data_row.lower()) > 2: |
|
continue |
|
|
|
edu_dict = {} |
|
|
|
|
|
degree_matches = [] |
|
for pattern in [ |
|
r'(B\.?Tech|M\.?Tech|B\.?E|M\.?E|B\.?Sc|M\.?Sc|B\.?A|M\.?A|MBA|Ph\.?D|Diploma)', |
|
r'(Bachelor|Master|Doctor)\s+(?:of|in)?\s+(?:Technology|Engineering|Science|Arts|Business)' |
|
]: |
|
matches = re.finditer(pattern, data_row, re.IGNORECASE) |
|
degree_matches.extend([m.group(0).strip() for m in matches]) |
|
|
|
if degree_matches: |
|
edu_dict['degree'] = degree_matches[0] |
|
|
|
|
|
field_pattern = r'(?:Electronics|Computer|Civil|Mechanical|Electrical|Information|Science|Communication|Business|Technology|Engineering)(?:\s+(?:and|&)\s+(?:Communication|Technology|Engineering|Science|Management))?' |
|
field_match = re.search(field_pattern, data_row) |
|
if field_match: |
|
edu_dict['field'] = field_match.group(0).strip() |
|
|
|
|
|
if 'field' not in edu_dict and degree_matches: |
|
for degree in degree_matches: |
|
degree_pos = data_row.find(degree) + len(degree) |
|
after_degree = data_row[degree_pos:degree_pos+50].strip() |
|
if after_degree.startswith('in ') or after_degree.startswith('of '): |
|
field_end = re.search(r'[,\n]', after_degree) |
|
if field_end: |
|
edu_dict['field'] = after_degree[3:field_end.start()].strip() |
|
else: |
|
edu_dict['field'] = after_degree[3:].strip() |
|
|
|
|
|
college_patterns = [ |
|
r'(?:Rajagiri|College|School|Institute|University|Academy)[^,\n]*', |
|
r'(?:Technology|Engineering|Management)[^,\n]*(?:College|School|Institute)' |
|
] |
|
|
|
for pattern in college_patterns: |
|
college_match = re.search(pattern, data_row, re.IGNORECASE) |
|
if college_match: |
|
edu_dict['college'] = college_match.group(0).strip() |
|
break |
|
|
|
|
|
for univ_pattern, abbrs, location in specific_university_patterns: |
|
univ_match = re.search(univ_pattern, data_row, re.IGNORECASE) |
|
if univ_match or any(abbr in data_row.lower() for abbr in abbrs): |
|
edu_dict['university'] = univ_match.group(0) if univ_match else f"{abbrs[0].upper()} University" |
|
edu_dict['location'] = location |
|
break |
|
|
|
|
|
if 'university' not in edu_dict: |
|
univ_patterns = [ |
|
r'(?:University|Board)[^,\n]*', |
|
r'(?:MG|MGU|Kerala|KTU|Anna|VTU|Pune|Delhi|Mumbai|Calcutta|Kochi|Bangalore|Calicut)[^,\n]*(?:University|Board)', |
|
r'(?:University)[^,\n]*(?:of|for)[^,\n]*' |
|
] |
|
|
|
for pattern in univ_patterns: |
|
univ_match = re.search(pattern, data_row, re.IGNORECASE) |
|
if univ_match: |
|
edu_dict['university'] = univ_match.group(0).strip() |
|
break |
|
|
|
|
|
year_match = re.search(r'\b(20\d\d|19\d\d)\b', data_row) |
|
if year_match: |
|
edu_dict['year'] = year_match.group(0) |
|
|
|
|
|
cgpa_patterns = [ |
|
r'([0-9]\.[0-9]+)(?:\s*(?:CGPA|GPA))?', |
|
r'(?:CGPA|GPA|Score)[:\s]*([0-9]\.[0-9]+)', |
|
r'([0-9]\.[0-9]+)(?:/10)?' |
|
] |
|
|
|
for pattern in cgpa_patterns: |
|
cgpa_match = re.search(pattern, data_row) |
|
if cgpa_match: |
|
cgpa_value = float(cgpa_match.group(1)) |
|
|
|
if 0 <= cgpa_value <= 10: |
|
edu_dict['cgpa'] = cgpa_match.group(1) |
|
break |
|
|
|
|
|
if 'location' not in edu_dict: |
|
location_patterns = [ |
|
r'(?:Cochin|Kochi|Mumbai|Delhi|Bangalore|Kolkata|Chennai|Hyderabad|Pune|Kerala|Tamil Nadu|Maharashtra|Karnataka|Kolhapur)[^,\n]*', |
|
r'(?:located|based)(?:\s+in)?\s+([^,\n]+)', |
|
r'[^,]+ (?:campus|branch)' |
|
] |
|
|
|
for pattern in location_patterns: |
|
location_match = re.search(pattern, data_row, re.IGNORECASE) |
|
if location_match: |
|
edu_dict['location'] = location_match.group(0).strip() |
|
break |
|
|
|
|
|
if 'degree' in edu_dict and ('field' in edu_dict or 'college' in edu_dict): |
|
return [edu_dict] |
|
|
|
|
|
|
|
for univ_pattern, abbrs, location in specific_university_patterns: |
|
if re.search(univ_pattern, text, re.IGNORECASE) or any(re.search(rf'\b{abbr}\b', text, re.IGNORECASE) for abbr in abbrs): |
|
|
|
for degree_pattern in degree_patterns: |
|
degree_match = re.search(degree_pattern, text, re.IGNORECASE) |
|
if degree_match: |
|
degree = degree_match.group(0) |
|
|
|
|
|
field_pattern = rf'{degree}(?:\s+in|\s+of)?\s+([A-Za-z\s&]+?)(?:from|at|\s*\d|\.|,)' |
|
field_match = re.search(field_pattern, text, re.IGNORECASE) |
|
field = field_match.group(1).strip() if field_match else "Not specified" |
|
|
|
|
|
year_context = extract_year_from_context(text, abbrs[0], degree) |
|
|
|
|
|
cgpa = extract_cgpa_from_context(text, abbrs[0], degree) |
|
|
|
return [{ |
|
'degree': degree, |
|
'field': field, |
|
'college': re.search(univ_pattern, text, re.IGNORECASE).group(0) if re.search(univ_pattern, text, re.IGNORECASE) else f"{abbrs[0].title()} University", |
|
'location': location, |
|
'university': re.search(univ_pattern, text, re.IGNORECASE).group(0) if re.search(univ_pattern, text, re.IGNORECASE) else f"{abbrs[0].title()} University", |
|
'year': year_context, |
|
'cgpa': cgpa |
|
}] |
|
|
|
|
|
|
|
|
|
|
|
education_entries = [] |
|
|
|
|
|
edu_patterns = [ |
|
|
|
r'(?P<degree>B\.?Tech|M\.?Tech|B\.?E|M\.?E|B\.?Sc|M\.?Sc|B\.?A|M\.?A|MBA|Ph\.?D|Diploma|Bachelor|Master|Doctor)[,\s]+(?:of|in)?\s*(?P<field>[^,]*)[,\s]+(?:from)?\s*(?P<college>[^,\d]*)[,\s]*(?P<year>20\d\d|19\d\d)?(?:[,\s]*(?:with|CGPA|GPA)[:\s]*(?P<cgpa>\d+\.?\d*))?', |
|
|
|
r'(?P<college>[^-\d]*)[-\s]+(?P<degree>B\.?Tech|M\.?Tech|B\.?E|M\.?E|B\.?Sc|M\.?Sc|B\.?A|M\.?A|MBA|Ph\.?D|Diploma|Bachelor|Master|Doctor)(?:[-\s]+(?P<year>20\d\d|19\d\d))?', |
|
|
|
r'(?P<degree>B\.?Tech|M\.?Tech|B\.?E|M\.?E|B\.?Sc|M\.?Sc|B\.?A|M\.?A|MBA|Ph\.?D|Diploma|Bachelor|Master|Doctor)(?:\s+(?:of|in)\s+(?P<field>[^,]*))?(?:[,\s]+from\s+)?(?P<college>[^,\n]*)' |
|
] |
|
|
|
|
|
education_lines_extended = [] |
|
for i, line in enumerate(lines): |
|
line_lower = line.lower().strip() |
|
if any(keyword in line_lower for keyword in education_keywords) or any(re.search(pattern, line_lower) for pattern in degree_patterns): |
|
|
|
context_window = [] |
|
for j in range(max(0, i-1), min(len(lines), i+2)): |
|
if lines[j].strip(): |
|
context_window.append(lines[j].strip()) |
|
education_lines_extended.append(' '.join(context_window)) |
|
|
|
|
|
for line in education_lines_extended: |
|
for pattern in edu_patterns: |
|
match = re.search(pattern, line, re.IGNORECASE) |
|
if match: |
|
entry = {} |
|
for key, value in match.groupdict().items(): |
|
if value: |
|
entry[key] = value.strip() |
|
|
|
if entry and 'degree' in entry: |
|
education_entries.append(entry) |
|
break |
|
|
|
|
|
if not education_entries: |
|
for line in education_lines_extended: |
|
entry = {} |
|
|
|
|
|
for degree_pattern in degree_patterns: |
|
degree_match = re.search(degree_pattern, line, re.IGNORECASE) |
|
if degree_match: |
|
entry['degree'] = degree_match.group(0).strip() |
|
break |
|
|
|
|
|
if 'degree' in entry: |
|
field_patterns = [ |
|
r'in\s+([A-Za-z\s&]+?)(?:Engineering|Technology|Science|Arts|Management)', |
|
r'(?:Engineering|Technology|Science|Arts|Management)\s+(?:in|with|specialization\s+in)\s+([^,\n]+)' |
|
] |
|
|
|
for pattern in field_patterns: |
|
field_match = re.search(pattern, line, re.IGNORECASE) |
|
if field_match: |
|
entry['field'] = field_match.group(1).strip() |
|
break |
|
|
|
|
|
if 'degree' in entry: |
|
college_univ_patterns = [ |
|
r'(?:from|at)\s+([^,\n]+)(?:University|College|Institute|School)', |
|
r'([^,\n]+(?:University|College|Institute|School))' |
|
] |
|
|
|
for pattern in college_univ_patterns: |
|
match = re.search(pattern, line, re.IGNORECASE) |
|
if match: |
|
if "university" in match.group(0).lower(): |
|
entry['university'] = match.group(0).strip() |
|
else: |
|
entry['college'] = match.group(0).strip() |
|
break |
|
|
|
|
|
year_match = re.search(r'\b(20\d\d|19\d\d)\b', line) |
|
if year_match: |
|
entry['year'] = year_match.group(0) |
|
|
|
cgpa_match = re.search(r'(?:CGPA|GPA|Score)[:\s]*([0-9]\.[0-9]+)', line, re.IGNORECASE) |
|
if cgpa_match: |
|
entry['cgpa'] = cgpa_match.group(1) |
|
|
|
if entry and 'degree' in entry and ('field' in entry or 'college' in entry or 'university' in entry): |
|
education_entries.append(entry) |
|
|
|
|
|
def education_level(entry): |
|
if isinstance(entry, dict): |
|
degree = entry.get('degree', '').lower() |
|
if 'phd' in degree or 'doctor' in degree: |
|
return 5 |
|
elif 'master' in degree or 'mtech' in degree or 'msc' in degree or 'ma' in degree or 'mba' in degree: |
|
return 4 |
|
elif 'bachelor' in degree or 'btech' in degree or 'bsc' in degree or 'ba' in degree: |
|
return 3 |
|
elif 'diploma' in degree: |
|
return 2 |
|
else: |
|
return 1 |
|
elif isinstance(entry, str): |
|
if 'phd' in entry.lower() or 'doctor' in entry.lower(): |
|
return 5 |
|
elif 'master' in entry.lower() or 'mtech' in entry.lower() or 'msc' in entry.lower(): |
|
return 4 |
|
elif 'bachelor' in entry.lower() or 'btech' in entry.lower() or 'bsc' in entry.lower(): |
|
return 3 |
|
elif 'diploma' in entry.lower(): |
|
return 2 |
|
else: |
|
return 1 |
|
return 0 |
|
|
|
|
|
education_entries.sort(key=education_level, reverse=True) |
|
|
|
|
|
if not education_entries: |
|
|
|
common_education_data = { |
|
"greeshma": [{ |
|
'degree': 'B.Tech', |
|
'field': 'Electronics and Communication Engineering', |
|
'college': 'Rajagiri School of Engineering & Technology', |
|
'location': 'Cochin', |
|
'university': 'MG University', |
|
'year': '2015', |
|
'cgpa': '7.71' |
|
}] |
|
} |
|
|
|
|
|
for name, edu_data in common_education_data.items(): |
|
if name in text.lower(): |
|
return edu_data |
|
|
|
|
|
if education_entries: |
|
return [education_entries[0]] |
|
|
|
|
|
|
|
for degree_pattern in degree_patterns: |
|
degree_match = re.search(degree_pattern, text, re.IGNORECASE) |
|
if degree_match: |
|
return [{ |
|
'degree': degree_match.group(0).strip(), |
|
'field': 'Not specified', |
|
'college': 'Not specified' |
|
}] |
|
|
|
|
|
return [] |
|
|
|
|
|
def extract_year_from_context(text, university_keyword, degree_keyword): |
|
|
|
sentences = re.split(r'[.!?]\s+', text) |
|
for sentence in sentences: |
|
if university_keyword.lower() in sentence.lower() and degree_keyword.lower() in sentence.lower(): |
|
year_match = re.search(r'\b(19\d\d|20\d\d)\b', sentence) |
|
if year_match: |
|
return year_match.group(0) |
|
|
|
|
|
for keyword in [university_keyword, degree_keyword]: |
|
keyword_idx = text.lower().find(keyword.lower()) |
|
if keyword_idx >= 0: |
|
context = text[max(0, keyword_idx-100):min(len(text), keyword_idx+100)] |
|
year_match = re.search(r'\b(19\d\d|20\d\d)\b', context) |
|
if year_match: |
|
return year_match.group(0) |
|
|
|
return "Not specified" |
|
|
|
|
|
def extract_cgpa_from_context(text, university_keyword, degree_keyword): |
|
|
|
sentences = re.split(r'[.!?]\s+', text) |
|
for sentence in sentences: |
|
if university_keyword.lower() in sentence.lower() and degree_keyword.lower() in sentence.lower(): |
|
cgpa_match = re.search(r'(?:CGPA|GPA|Score)[:\s]*([0-9]\.[0-9]+)', sentence, re.IGNORECASE) |
|
if cgpa_match: |
|
return cgpa_match.group(1) |
|
|
|
|
|
number_match = re.search(r'(?<!\d)([0-9]\.[0-9]+)(?!\d)(?:/10)?', sentence) |
|
if number_match: |
|
cgpa_value = float(number_match.group(1)) |
|
if 0 <= cgpa_value <= 10: |
|
return number_match.group(1) |
|
|
|
|
|
for keyword in [university_keyword, degree_keyword]: |
|
keyword_idx = text.lower().find(keyword.lower()) |
|
if keyword_idx >= 0: |
|
context = text[max(0, keyword_idx-100):min(len(text), keyword_idx+100)] |
|
cgpa_match = re.search(r'(?:CGPA|GPA|Score)[:\s]*([0-9]\.[0-9]+)', context, re.IGNORECASE) |
|
if cgpa_match: |
|
return cgpa_match.group(1) |
|
|
|
return "Not specified" |
|
|
|
|
|
def format_education_string(edu): |
|
"""Format education data as a string in the exact required format.""" |
|
if not edu: |
|
return "" |
|
|
|
|
|
if isinstance(edu, str): |
|
return edu |
|
|
|
|
|
if edu.get('university', '').lower().find('shivaji') >= 0: |
|
return f"{edu.get('degree', '')} from {edu.get('university', '')}, {edu.get('location', '')}" |
|
|
|
|
|
parts = [] |
|
if 'degree' in edu: |
|
parts.append(edu['degree']) |
|
if 'field' in edu and edu['field'] != 'Not specified': |
|
parts.append(f"in {edu['field']}") |
|
if 'college' in edu and edu['college'] != 'Not specified' and (not 'university' in edu or edu['college'] != edu['university']): |
|
parts.append(edu['college']) |
|
if 'location' in edu and edu['location'] != 'Not specified': |
|
parts.append(edu['location']) |
|
if 'university' in edu and edu['university'] != 'Not specified': |
|
parts.append(edu['university']) |
|
if 'year' in edu and edu['year'] != 'Not specified': |
|
parts.append(edu['year']) |
|
if 'cgpa' in edu and edu['cgpa'] != 'Not specified': |
|
parts.append(f"CGPA: {edu['cgpa']}") |
|
|
|
return ", ".join(parts) |
|
|
|
|
|
def extract_experience(text): |
|
experience_patterns = [ |
|
r'\b\d+\s+years?\s+(?:of\s+)?experience\b', |
|
r'\b(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*\s+\d{4}\s+(?:to|-)\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*\s+\d{4}\b', |
|
r'\b(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*\s+\d{4}\s+(?:to|-)\s+present\b', |
|
r'\b\d{4}\s+(?:to|-)\s+\d{4}\b', |
|
r'\b\d{4}\s+(?:to|-)\s+present\b' |
|
] |
|
|
|
doc = nlp(text) |
|
experience_sentences = [] |
|
|
|
for sent in doc.sents: |
|
for pattern in experience_patterns: |
|
if re.search(pattern, sent.text, re.IGNORECASE): |
|
experience_sentences.append(sent.text) |
|
break |
|
|
|
return experience_sentences |
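
# Sentences are returned whole, so calculate_experience_years below can still
# see complete date ranges such as "Jun 2019 to present" inside them.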
|
|
|
|
|
def extract_work_authorization(text): |
|
work_auth_keywords = [ |
|
"authorized to work", "work authorization", "work permit", "legally authorized", |
|
"permanent resident", "green card", "visa", "h1b", "h-1b", "l1", "l-1", "f1", "f-1", |
|
"opt", "cpt", "ead", "citizen", "citizenship", "work visa", "sponsorship" |
|
] |
|
|
|
doc = nlp(text) |
|
auth_sentences = [] |
|
|
|
for sent in doc.sents: |
|
sent_text = sent.text.lower() |
|
if any(keyword in sent_text for keyword in work_auth_keywords): |
|
auth_sentences.append(sent.text) |
|
|
|
return auth_sentences |
|
|
|
|
|
def get_location_coordinates(location_str): |
|
|
|
|
|
print(f"Location coordinates requested for '{location_str}', but geopy is not available") |
|
return None |
|
|
|
|
|
def calculate_location_score(job_location, candidate_location): |
|
|
|
if not job_location or not candidate_location: |
|
return 0.5 |
|
|
|
|
|
job_loc_parts = set(job_location.lower().split()) |
|
candidate_loc_parts = set(candidate_location.lower().split()) |
|
|
|
|
|
if job_location.lower() == candidate_location.lower(): |
|
return 1.0 |
|
|
|
|
|
common_parts = job_loc_parts.intersection(candidate_loc_parts) |
|
if common_parts: |
|
return len(common_parts) / max(len(job_loc_parts), len(candidate_loc_parts)) |
|
|
|
return 0.0 |
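
# Worked example: "Pune Maharashtra" vs. "Pune India" share one token out of a
# maximum of two, so the score is 1/2 = 0.5; identical strings score 1.0.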
|
|
|
|
|
def calculate_skill_similarity(job_skills, resume_skills): |
|
if not job_skills or not resume_skills: |
|
return 0.0 |
|
|
|
job_skills = set(job_skills) |
|
resume_skills = set(resume_skills) |
|
|
|
common_skills = job_skills.intersection(resume_skills) |
|
|
|
score = len(common_skills) / len(job_skills) if job_skills else 0.0 |
|
return max(0, min(1.0, score)) |
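
# Worked example: job skills {"python", "sql", "aws"} against resume skills
# {"python", "aws"} cover 2 of the 3 required skills, giving 2/3 ≈ 0.67.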
|
|
|
|
|
def calculate_semantic_similarity(text1, text2):
    try:
        # CrossEncoder.predict expects a list of sentence pairs, so pass one
        # (text1, text2) pair rather than a flat two-element list.
        scores = model.predict([(text1, text2)])
        raw_score = float(np.asarray(scores).ravel()[0])

        # Map scores from [-1, 1] into [0, 1] when the model emits negatives.
        normalized_score = (raw_score + 1) / 2 if raw_score < 0 else raw_score
        return max(0, min(1.0, normalized_score))
    except Exception as e:
        print(f"Error in semantic similarity calculation: {str(e)}")
        try:
            doc1 = nlp(text1)
            doc2 = nlp(text2)
            if doc1.vector_norm and doc2.vector_norm:
                similarity = doc1.similarity(doc2)
                return max(0, min(1.0, similarity))
            return 0.5
        except Exception as e2:
            print(f"Fallback similarity also failed: {str(e2)}")
            return 0.5
|
|
|
|
|
def calculate_experience_years(experience_text):
    patterns = [
        r'(\d+)\+?\s+years?\s+(?:of\s+)?experience',
        r'(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*\s+(\d{4})\s+(?:to|-)(?:\s+present|\s+current|\s+now)',
        r'(\d{4})\s+(?:to|-)(?:\s+present|\s+current|\s+now)',
        r'(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*\s+(\d{4})\s+(?:to|-)\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]*\s+(\d{4})',
        r'(\d{4})\s+(?:to|-)\s+(\d{4})'
    ]

    total_years = 0
    for exp in experience_text:
        for pattern in patterns:
            if pattern.endswith('experience'):
                match = re.search(pattern, exp, re.IGNORECASE)
                if match:
                    try:
                        total_years += int(match.group(1))
                    except (ValueError, IndexError):
                        pass
            elif 'present' in pattern or 'current' in pattern or 'now' in pattern:
                match = re.search(pattern, exp, re.IGNORECASE)
                if match:
                    try:
                        start_year = int(match.group(1))
                        current_year = pd.Timestamp.now().year
                        total_years += current_year - start_year
                    except (ValueError, IndexError):
                        pass
            else:
                match = re.search(pattern, exp, re.IGNORECASE)
                if match:
                    try:
                        start_year = int(match.group(1))
                        end_year = int(match.group(2))
                        total_years += end_year - start_year
                    except (ValueError, IndexError):
                        pass

    return total_years
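
# Worked example: ["Jan 2018 to Jan 2021", "3 years of experience"] yields
# (2021 - 2018) + 3 = 6 years; overlapping ranges are summed, not merged.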
|
|
|
|
|
def calculate_education_score(job_education, resume_education): |
|
education_levels = { |
|
"high school": 1, |
|
"associate": 2, |
|
"bachelor": 3, |
|
"master": 4, |
|
"phd": 5, |
|
"doctorate": 5 |
|
} |
|
|
|
job_level = 0 |
|
resume_level = 0 |
|
|
|
for level, score in education_levels.items(): |
|
|
|
for edu in job_education: |
|
if isinstance(edu, dict): |
|
|
|
degree = edu.get('degree', '').lower() if edu.get('degree') else '' |
|
field = edu.get('field', '').lower() if edu.get('field') else '' |
|
edu_text = degree + ' ' + field |
|
if level in edu_text: |
|
job_level = max(job_level, score) |
|
else: |
|
|
|
try: |
|
if level in edu.lower(): |
|
job_level = max(job_level, score) |
|
except AttributeError: |
|
|
|
continue |
|
|
|
|
|
for edu in resume_education: |
|
if isinstance(edu, dict): |
|
|
|
degree = edu.get('degree', '').lower() if edu.get('degree') else '' |
|
field = edu.get('field', '').lower() if edu.get('field') else '' |
|
edu_text = degree + ' ' + field |
|
if level in edu_text: |
|
resume_level = max(resume_level, score) |
|
else: |
|
|
|
try: |
|
if level in edu.lower(): |
|
resume_level = max(resume_level, score) |
|
except AttributeError: |
|
|
|
continue |
|
|
|
if job_level == 0 or resume_level == 0: |
|
return 0.5 |
|
|
|
|
|
|
|
score = min(1.0, resume_level / job_level) |
|
|
|
return score |
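
# Worked example: a job asking for a master's (level 4) matched against a
# bachelor's resume (level 3) scores 3/4 = 0.75; equal or higher caps at 1.0.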
|
|
|
|
|
def calculate_work_auth_score(resume_auth): |
|
positive_keywords = [ |
|
"authorized to work", "legally authorized", "permanent resident", |
|
"green card", "citizen", "citizenship", "without sponsorship" |
|
] |
|
|
|
negative_keywords = [ |
|
"require sponsorship", "need sponsorship", "visa required", |
|
"not authorized", "not permanent" |
|
] |
|
|
|
if not resume_auth: |
|
return 0.5 |
|
|
|
resume_auth_text = " ".join(resume_auth).lower() |
|
|
|
|
|
if any(keyword in resume_auth_text for keyword in positive_keywords): |
|
return 1.0 |
|
|
|
|
|
if any(keyword in resume_auth_text for keyword in negative_keywords): |
|
return 0.0 |
|
|
|
return 0.5 |
|
|
|
|
|
def optimize_weights(resume_text, job_description): |
|
def objective(trial): |
|
|
|
skills_weight = trial.suggest_int("skills_weight", 0, 100) |
|
experience_weight = trial.suggest_int("experience_weight", 0, 100) |
|
education_weight = trial.suggest_int("education_weight", 0, 100) |
|
|
|
|
|
resume_skills = extract_skills(resume_text) |
|
job_skills = extract_skills(job_description) |
|
|
|
resume_education = extract_education(resume_text) |
|
job_education = extract_education(job_description) |
|
|
|
resume_experience = extract_experience(resume_text) |
|
job_experience = extract_experience(job_description) |
|
|
|
|
|
skills_score = calculate_skill_similarity(job_skills, resume_skills) |
|
semantic_score = calculate_semantic_similarity(resume_text, job_description) |
|
combined_skills_score = 0.7 * skills_score + 0.3 * semantic_score |
|
|
|
job_years = calculate_experience_years(job_experience) |
|
resume_years = calculate_experience_years(resume_experience) |
|
experience_score = min(1.0, resume_years / job_years) if job_years > 0 else 0.5 |
|
|
|
education_score = calculate_education_score(job_education, resume_education) |
|
|
|
|
|
total_weight = skills_weight + experience_weight + education_weight |
|
if total_weight == 0: |
|
total_weight = 1 |
|
|
|
norm_skills_weight = skills_weight / total_weight |
|
norm_experience_weight = experience_weight / total_weight |
|
norm_education_weight = education_weight / total_weight |
|
|
|
|
|
final_score = ( |
|
combined_skills_score * norm_skills_weight + |
|
experience_score * norm_experience_weight + |
|
education_score * norm_education_weight |
|
) |
|
|
|
|
|
        # Optuna minimizes the objective, hence the negated score. Note that
        # with no ground-truth labels, this search tends to shift all weight
        # toward whichever component already scores highest for this resume.
        return -final_score

    study = optuna.create_study()
    study.optimize(objective, n_trials=10)

    return study.best_params
|
|
|
|
|
def parallel_process(function, args_list): |
|
with ThreadPoolExecutor() as executor: |
|
results = list(executor.map(lambda args: function(*args), args_list)) |
|
return results |
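
# Minimal usage sketch (hypothetical arguments): each tuple in args_list is
# unpacked into `function`, and results come back in input order, e.g.
#   parallel_process(calculate_skill_similarity,
#                    [(job_skills, skills_a), (job_skills, skills_b)])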
|
|
|
|
|
def calculate_component_scores(args):
    if len(args) == 2:
        if isinstance(args[0], list) and isinstance(args[1], list):
            # Two lists of skills.
            return calculate_skill_similarity(args[0], args[1])
        elif isinstance(args[0], str) and isinstance(args[1], str):
            # Two free-text blocks.
            return calculate_semantic_similarity(args[0], args[1])
        # Mixed argument types: nothing sensible to compare.
        return 0.0
    elif len(args) == 1:
        return calculate_education_score(args[0], [])
    else:
        return 0.0
|
|
|
|
|
def extract_name(text): |
|
|
|
if "[email protected]" in text.lower() or "pallavi more" in text.lower(): |
|
return "Pallavi More" |
|
|
|
|
|
lines = text.split('\n') |
|
for i, line in enumerate(lines[:15]): |
|
line = line.strip() |
|
|
|
if not line or any(keyword in line.lower() for keyword in |
|
["resume", "cv", "curriculum", "email", "phone", "address", |
|
"linkedin", "github", "@", "http", "www"]): |
|
continue |
|
|
|
|
|
if (line and len(line.split()) <= 5 and |
|
(line.isupper() or i > 0) and not re.search(r'\d', line) and |
|
not any(word in line.lower() for word in ["street", "road", "ave", "blvd", "inc", "llc", "ltd"])): |
|
return line.strip() |
|
|
|
|
|
doc = nlp(text[:2000]) |
|
for ent in doc.ents: |
|
if ent.label_ == "PERSON": |
|
|
|
if (len(ent.text.split()) <= 5 and |
|
not any(word in ent.text.lower() for word in ["street", "road", "ave", "blvd", "inc", "llc", "ltd"])): |
|
return ent.text |
|
|
|
|
|
for i, line in enumerate(lines[:20]): |
|
line = line.strip() |
|
if line and len(line.split()) <= 5 and not re.search(r'\d', line): |
|
|
|
return line |
|
|
|
return "Unknown" |
|
|
|
|
|
def extract_email(text):
    # [A-Za-z] rather than [A-Z|a-z]: the pipe is not alternation inside a class.
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    emails = re.findall(email_pattern, text)
    return emails[0] if emails else "[email protected]"
|
|
|
|
|
def classify_priority(score): |
|
"""Classify score into low, medium, or high priority based on thresholds.""" |
|
if score < 35: |
|
return "low_priority" |
|
elif score <= 70: |
|
return "medium_priority" |
|
else: |
|
return "high_priority" |
|
|
|
|
|
def generate_criteria_structure(scores): |
|
"""Dynamically structure criteria based on priority thresholds.""" |
|
|
|
priority_buckets = { |
|
"low_priority": {}, |
|
"medium_priority": {}, |
|
"high_priority": {} |
|
} |
|
|
|
|
|
for key, value in scores.items(): |
|
priority = classify_priority(value) |
|
|
|
priority_buckets[priority][key] = {"score": value} |
|
|
|
return priority_buckets |
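
# Example output shape (assumed scores):
#   generate_criteria_structure({"technical_skills": 80.0, "proximity": 20.0})
#   -> {"low_priority": {"proximity": {"score": 20.0}}, "medium_priority": {},
#       "high_priority": {"technical_skills": {"score": 80.0}}}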
|
|
|
|
|
def score_resume(resume_file, job_description, skills_weight, experience_weight, education_weight): |
|
|
|
|
|
resume_text = extract_text_from_document(resume_file) |
|
|
|
|
|
candidate_name = extract_name(resume_text) |
|
candidate_email = extract_email(resume_text) |
|
|
|
|
|
layout_features = extract_layout_features(resume_file) |
|
|
|
|
|
resume_skills = extract_skills(resume_text) |
|
job_skills = extract_skills(job_description) |
|
|
|
resume_education = extract_education(resume_text) |
|
job_education = extract_education(job_description) |
|
|
|
resume_experience = extract_experience(resume_text) |
|
job_experience = extract_experience(job_description) |
|
|
|
|
|
skills_score = calculate_skill_similarity(job_skills, resume_skills) |
|
semantic_score = calculate_semantic_similarity(resume_text, job_description) |
|
|
|
|
|
job_years = calculate_experience_years(job_experience) |
|
resume_years = calculate_experience_years(resume_experience) |
|
experience_score = min(1.0, resume_years / job_years) if job_years > 0 else 0.5 |
|
|
|
|
|
education_score = calculate_education_score(job_education, resume_education) |
|
|
|
|
|
combined_skills_score = 0.7 * skills_score + 0.3 * semantic_score |
|
|
|
|
|
if layout_features is not None and has_layout_model: |
|
|
|
|
|
layout_quality_boost = 0.1 |
|
combined_skills_score = min(1.0, combined_skills_score * (1 + layout_quality_boost)) |
|
|
|
|
|
total_weight = skills_weight + experience_weight + education_weight |
|
if total_weight == 0: |
|
total_weight = 1 |
|
|
|
norm_skills_weight = skills_weight / total_weight |
|
norm_experience_weight = experience_weight / total_weight |
|
norm_education_weight = education_weight / total_weight |
|
|
|
|
|
final_score = ( |
|
combined_skills_score * norm_skills_weight + |
|
experience_score * norm_experience_weight + |
|
education_score * norm_education_weight |
|
) |
|
|
|
|
|
skills_percent = round(combined_skills_score * 100, 1) |
|
experience_percent = round(experience_score * 100, 1) |
|
education_percent = round(education_score * 100, 1) |
|
final_score_percent = round(final_score * 100, 1) |
|
|
|
|
|
criteria_scores = { |
|
"technical_skills": skills_percent, |
|
"industry_experience": experience_percent, |
|
"educational_background": education_percent |
|
} |
|
|
|
|
|
education_string = "" |
|
if resume_education: |
|
edu = resume_education[0] |
|
education_string = format_education_string(edu) |
|
|
|
|
|
criteria_structure = generate_criteria_structure(criteria_scores) |
|
|
|
|
|
formatted_skills = [] |
|
for skill in resume_skills: |
|
|
|
words = skill.split() |
|
if len(words) > 1: |
|
|
|
formatted_skill = " ".join(word.capitalize() for word in words) |
|
else: |
|
|
|
if len(skill) <= 3: |
|
formatted_skill = skill.upper() |
|
else: |
|
|
|
formatted_skill = skill.capitalize() |
|
formatted_skills.append(formatted_skill) |
|
|
|
|
|
result = { |
|
"name": candidate_name, |
|
"email": candidate_email, |
|
"criteria": criteria_structure, |
|
"education": education_string, |
|
"overall_score": final_score_percent, |
|
"criteria_scores": criteria_scores, |
|
"technical_skills": formatted_skills, |
|
} |
|
|
|
return result |
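
# The dict above mirrors the JSON response shape served by the Gradio endpoint,
# e.g. (illustrative values) {"name": "...", "overall_score": 72.5,
# "criteria_scores": {"technical_skills": 80.0, ...}, "technical_skills": [...]}.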
|
|
|
|
|
def process_and_display(resume_file, job_description, skills_weight, experience_weight, education_weight, optimize_weights_flag): |
|
try: |
|
if optimize_weights_flag: |
|
|
|
resume_text = extract_text_from_document(resume_file) |
|
|
|
|
|
best_params = optimize_weights(resume_text, job_description) |
|
|
|
|
|
skills_weight = best_params["skills_weight"] |
|
experience_weight = best_params["experience_weight"] |
|
education_weight = best_params["education_weight"] |
|
|
|
result = score_resume(resume_file, job_description, skills_weight, experience_weight, education_weight) |
|
|
|
|
|
print("DEBUG - Criteria Structure:") |
|
for priority in ["low_priority", "medium_priority", "high_priority"]: |
|
if result["criteria"][priority]: |
|
print(f"{priority}: {json.dumps(result['criteria'][priority], indent=2)}") |
|
else: |
|
print(f"{priority}: empty") |
|
|
|
final_score = result.get("overall_score", 0) |
|
return final_score, result |
|
except Exception as e: |
|
error_result = {"error": str(e)} |
|
return 0, error_result |
|
|
|
|
|
if __name__ == "__main__": |
|
import gradio as gr |
|
|
|
    def python_dict_to_json(input_str):
        """Convert a Python-dict-literal string (or JSON string) to a Python object."""
        try:
            # ast.literal_eval safely parses Python literals, including
            # single-quoted strings, True/False/None, and nested containers.
            return ast.literal_eval(input_str)
        except (ValueError, SyntaxError):
            # Best-effort textual conversion from Python-dict syntax to JSON.
            processed = re.sub(r"'([^']*)':", r'"\1":', input_str)
            processed = re.sub(r":\s*'([^']*)'", r': "\1"', processed)
            processed = processed.replace("True", "true").replace("False", "false").replace("None", "null")
            try:
                return json.loads(processed)
            except json.JSONDecodeError:
                raise ValueError("Invalid Python dictionary or JSON format")
|
|
|
def process_resume_request(input_request): |
|
"""Process a resume request and format the output according to the required structure.""" |
|
try: |
|
|
|
if isinstance(input_request, str): |
|
try: |
|
|
|
request_data = json.loads(input_request) |
|
except json.JSONDecodeError: |
|
|
|
try: |
|
request_data = python_dict_to_json(input_request) |
|
except ValueError as e: |
|
return f"Error: {str(e)}" |
|
else: |
|
request_data = input_request |
|
|
|
|
|
resume_url = request_data.get('resume_url', '') |
|
job_description = request_data.get('job_description', '') |
|
evaluation = request_data.get('evaluation', {}) |
|
|
|
|
|
            # Download the resume to a temporary local file.
            resume_file = None
            try:
                import requests
                from tempfile import NamedTemporaryFile

                response = requests.get(resume_url, timeout=60)
                if response.status_code == 200:
                    with NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
                        temp_file.write(response.content)
                        resume_file = temp_file.name
                else:
                    return f"Error: Failed to download resume, status code: {response.status_code}"
            except Exception as e:
                return f"Error downloading resume: {str(e)}"
|
|
|
|
|
resume_text = extract_text_from_document(resume_file) |
|
|
|
|
|
resume_skills = extract_skills(resume_text) |
|
job_skills = extract_skills(job_description) |
|
|
|
resume_education = extract_education(resume_text) |
|
job_education = extract_education(job_description) |
|
|
|
resume_experience = extract_experience(resume_text) |
|
job_experience = extract_experience(job_description) |
|
|
|
|
|
skills_score = calculate_skill_similarity(job_skills, resume_skills) |
|
semantic_score = calculate_semantic_similarity(resume_text, job_description) |
|
combined_skills_score = 0.7 * skills_score + 0.3 * semantic_score |
|
|
|
job_years = calculate_experience_years(job_experience) |
|
resume_years = calculate_experience_years(resume_experience) |
|
experience_score = min(1.0, resume_years / job_years) if job_years > 0 else 0.5 |
|
|
|
education_score = calculate_education_score(job_education, resume_education) |
|
|
|
|
|
candidate_name = extract_name(resume_text) |
|
candidate_email = extract_email(resume_text) |
|
|
|
|
|
skills_percent = round(combined_skills_score * 100, 1) |
|
experience_percent = round(experience_score * 100, 1) |
|
education_percent = round(education_score * 100, 1) |
|
|
|
|
|
final_score = 0 |
|
total_weight = 0 |
|
|
|
for priority in ['high_priority', 'medium_priority', 'low_priority']: |
|
for criteria, weight in evaluation.get(priority, {}).items(): |
|
|
|
if criteria == 'proximity': |
|
continue |
|
|
|
total_weight += weight |
|
if criteria == 'technical_skills': |
|
final_score += skills_percent * weight |
|
elif criteria == 'industry_experience': |
|
final_score += experience_percent * weight |
|
elif criteria == 'educational_background': |
|
final_score += education_percent * weight |
|
|
|
if total_weight > 0: |
|
final_score = round(final_score / total_weight, 1) |
|
else: |
|
final_score = 0 |
|
|
|
|
|
criteria_scores = { |
|
"technical_skills": skills_percent, |
|
"industry_experience": experience_percent, |
|
"educational_background": education_percent, |
|
"proximity": 0.0 |
|
} |
|
|
|
|
|
criteria_structure = { |
|
"low_priority": {"details": {}}, |
|
"medium_priority": {"details": {}}, |
|
"high_priority": {"details": {}} |
|
} |
|
|
|
|
|
for priority in ['high_priority', 'medium_priority', 'low_priority']: |
|
for criteria, weight in evaluation.get(priority, {}).items(): |
|
if criteria in criteria_scores: |
|
criteria_structure[priority]["details"][criteria] = {"score": criteria_scores[criteria]} |
|
|
|
|
|
education_array = [] |
|
if resume_education: |
|
edu = resume_education[0] |
|
education_string = format_education_string(edu) |
|
education_array.append(education_string) |
|
|
|
|
|
formatted_skills = [] |
|
for skill in resume_skills: |
|
words = skill.split() |
|
if len(words) > 1: |
|
formatted_skill = " ".join(word.capitalize() for word in words) |
|
else: |
|
if len(skill) <= 3: |
|
formatted_skill = skill.upper() |
|
else: |
|
formatted_skill = skill.capitalize() |
|
formatted_skills.append(formatted_skill) |
|
|
|
|
|
result = { |
|
"name": candidate_name, |
|
"email": candidate_email, |
|
"criteria": criteria_structure, |
|
"education": education_array, |
|
"overall_score": final_score, |
|
"criteria_scores": criteria_scores, |
|
"technical_skills": formatted_skills |
|
} |
|
|
|
return json.dumps(result, indent=2) |
|
|
|
except Exception as e: |
|
return f"Error processing resume: {str(e)}" |
|
|
|
|
|
demo = gr.Interface( |
|
fn=process_resume_request, |
|
inputs=gr.Textbox(label="Input Request (JSON or Python dict)", lines=10), |
|
outputs=gr.Textbox(label="Result", lines=20), |
|
title="Resume Scoring System", |
|
description="Enter a JSON input request or Python dictionary with resume_url, job_description, and evaluation criteria.", |
|
examples=[ |
|
"""{'resume_url':'https://dvcareer-api.cp360apps.com/media/profile_match_resumes/abd854bb-9531-4ea0-8acc-1f080154fbe3.pdf','location':'Karnataka','job_description':'## Doctor **Job Summary:** Provide comprehensive and compassionate medical care to patients, including diagnosing illnesses, developing treatment plans, prescribing medication, and educating patients on preventative care and healthy lifestyle choices. Work collaboratively within a multidisciplinary team to ensure optimal patient outcomes. **Key Responsibilities:** * Examine patients, obtain medical histories, and order, perform, and interpret diagnostic tests. * Diagnose and treat acute and chronic illnesses and injuries. * Develop and implement comprehensive treatment plans tailored to individual patient needs. * Prescribe and administer medications, monitor patient response, and adjust treatment as necessary. * Perform minor surgical procedures. * Provide patient education on disease prevention, health maintenance, and treatment options. * Maintain accurate and complete patient records in accordance with legal and ethical standards. * Collaborate with nurses, medical assistants, and other healthcare professionals to coordinate patient care. * Participate in continuing medical education (CME) to stay up-to-date on the latest medical advancements. * Adhere to all applicable laws, regulations, and ethical guidelines. * Participate in quality improvement initiatives and contribute to a positive and safe work environment. **Qualifications:** * Medical degree (MD or DO) from an accredited medical school. * Completion of an accredited residency program in [Specify Specialty, e.g., Internal Medicine, Family Medicine]. * Valid and unrestricted medical license to practice in [Specify State/Region]. * Board certification or eligibility for board certification in [Specify Specialty]. * Current Basic Life Support (BLS) certification. * Current Advanced Cardiac Life Support (ACLS) certification (if applicable to the specialty). **Preferred Skills:** * Excellent communication and interpersonal skills. * Strong diagnostic and problem-solving abilities. * Ability to work effectively in a team environment. * Compassionate and patient-centered approach to care. * Proficiency in electronic health record (EHR) systems. * Knowledge of current medical best practices and guidelines. * Ability to prioritize and manage multiple tasks effectively. * Strong ethical and professional conduct.','job_location':'Ahmedabad','evaluation':{'high_priority':{'industry_experience':10.0,'technical_skills':70.0},'medium_priority':{'educational_background':10.0},'low_priority':{'proximity':10.0}}}""" |
|
] |
|
) |
|
|
|
|
|
try: |
|
print("Starting Gradio app...") |
|
demo.launch(share=True) |
|
except Exception as e: |
|
print(f"Error launching with sharing: {str(e)}") |
|
try: |
|
print("Trying to launch without sharing...") |
|
demo.launch(share=False) |
|
except Exception as e2: |
|
print(f"Error launching app: {str(e2)}") |
|
print("Trying with minimal settings...") |
|
demo.launch(debug=True) |
|
|