import pandas as pd
import requests
from pydantic import BaseModel, Field
from typing import List, Tuple, Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import os
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends, Header, Request
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
import json
import tempfile
import shutil
import PyPDF2
from dotenv import load_dotenv
import pdfplumber
import re
from db import *
import time
import asyncio
from contextlib import asynccontextmanager
import logging
from sqlalchemy import create_engine, text  # used by get_engine() and the health check
from sqlalchemy.pool import NullPool
from cloud_config import *
import uuid
# Load environment variables
load_dotenv()

# Configure logging for Cloud Run
logging.basicConfig(
    level=getattr(logging, LOG_LEVEL),
    format=LOG_FORMAT
)
logger = logging.getLogger(__name__)

# Global variable to store the external API access token
access_token = None
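# Configuration expected at runtime (a summary based on the names referenced in this module):
#   .env / environment: OPENAI_API_KEY, API_KEY, login_url, email, password,
#                       smart_hiring_url, analyze_url, PORT
#   cloud_config:       LOG_LEVEL, LOG_FORMAT, EXTERNAL_API_TIMEOUT, MAX_JOBS_TO_ANALYZE
#   db / cloud_config:  DB_PARAMS (user/password/host/port/dbname)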
# Startup/shutdown events
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("Starting up Job Recommendation API...")
    # You can initialize connection pools here if needed
    yield
    # Shutdown
    logger.info("Shutting down Job Recommendation API...")
    # Close any open connections here

# Initialize FastAPI app with lifespan
app = FastAPI(
    title="Job Recommendation API",
    description="API for processing resumes and recommending jobs",
    lifespan=lifespan
)
# Add CORS middleware for cloud deployment
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure based on your needs
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Add request ID middleware for better tracing
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    request_id = f"{time.time()}-{request.client.host}"
    request.state.request_id = request_id

    # Log the request
    logger.info(f"Request ID: {request_id} - {request.method} {request.url.path}")

    try:
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response
    except Exception as e:
        logger.error(f"Request ID: {request_id} - Error: {str(e)}")
        raise
# Security configuration
API_KEY = os.getenv("API_KEY")
security = HTTPBearer()

def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """
    Verify the API key from the Authorization header
    """
    if not API_KEY:
        logger.error("API key not configured")
        raise HTTPException(
            status_code=500,
            detail="API key not configured",
        )
    if credentials.credentials != API_KEY:
        logger.warning("Invalid API key attempt")
        raise HTTPException(
            status_code=401,
            detail="Invalid API key",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return credentials.credentials
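# Example of how a client authenticates against this API (illustrative sketch only;
# /health is the path advertised by root() below, and 8080 is the default PORT):
#
#   import requests
#   resp = requests.get(
#       "http://localhost:8080/health",
#       headers={"Authorization": "Bearer <API_KEY>"},
#       timeout=30,
#   )
#   print(resp.json())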
# Initialize OpenAI client with error handling
try:
    llm = ChatOpenAI(
        model="gpt-4o-mini",
        temperature=0,
        api_key=os.getenv("OPENAI_API_KEY")
    )
    logger.info("OpenAI client initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize OpenAI client: {e}")
    raise
# Initialize database engine with connection pooling suitable for Cloud Run
def get_engine():
    """
    Get database engine with NullPool for Cloud Run
    """
    try:
        conn_string = f"postgresql://{DB_PARAMS['user']}:{DB_PARAMS['password']}@{DB_PARAMS['host']}:{DB_PARAMS['port']}/{DB_PARAMS['dbname']}"
        # Use NullPool for Cloud Run to avoid connection issues
        engine = create_engine(conn_string, poolclass=NullPool, pool_pre_ping=True)
        logger.info("Database engine created successfully")
        return engine
    except Exception as e:
        logger.error(f"Failed to create database engine: {e}")
        raise

# Initialize database engine
engine = get_engine()
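# DB_PARAMS is expected to look like this (keys inferred from the connection string above;
# values are placeholders):
#   DB_PARAMS = {
#       "user": "...", "password": "...",
#       "host": "...", "port": 5432, "dbname": "...",
#   }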
def get_access_token():
    """
    Get access token for the external API with better error handling
    """
    global access_token

    # If we already have a token, return it
    if access_token:
        return access_token

    try:
        login_url = str(os.getenv("login_url"))
        login_data = {
            "email": str(os.getenv("email")),
            "password": str(os.getenv("password"))
        }
        login_headers = {
            'accept': 'application/json',
            'Content-Type': 'application/json'
        }

        # Use the configured timeout to prevent hanging
        login_response = requests.post(login_url, headers=login_headers, json=login_data, timeout=EXTERNAL_API_TIMEOUT)

        if login_response.status_code == 200:
            login_result = login_response.json()
            access_token = login_result.get('data', {}).get('tokens', {}).get('accessToken')
            if access_token:
                logger.info("Successfully obtained access token")
                return access_token
            else:
                logger.error("Login successful but no access token found in response")
                return None
        else:
            logger.error(f"Login failed with status {login_response.status_code}: {login_response.text}")
            return None
    except requests.exceptions.Timeout:
        logger.error("Login request timed out")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error during login: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error getting access token: {e}")
        return None
def generate_smart_hiring_collateral(job_description_text: str) -> tuple[str, str]:
    """
    Generate collateral using the smart-hiring/generate endpoint
    Returns a tuple of (collateral, job_id)
    """
    def parse_criteria(response) -> str:
        """Extract smart_hiring_criteria from a successful API response, or return ''."""
        try:
            response_data = response.json()
            if response_data.get('success') and 'data' in response_data:
                smart_hiring_criteria = response_data['data'].get('smart_hiring_criteria', '')
                if smart_hiring_criteria:
                    logger.info("Successfully extracted smart hiring criteria")
                    return smart_hiring_criteria
                logger.warning("No smart_hiring_criteria found in response")
            else:
                logger.warning("Invalid response format from smart hiring API")
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse smart hiring response as JSON: {e}")
        return ""

    try:
        url = str(os.getenv("smart_hiring_url"))

        # Generate a unique job ID using UUID
        job_id = str(uuid.uuid4())

        # Prepare headers with authentication
        headers = {
            'accept': 'application/json',
            'Authorization': f'Bearer {get_access_token()}'
        }

        # Prepare payload
        payload = {
            'job_id': job_id,
            'job_description_text': job_description_text
        }

        # Make the API request with the configured timeout
        response = requests.post(url, headers=headers, data=payload, timeout=EXTERNAL_API_TIMEOUT)

        if response.status_code == 200:
            logger.info("Smart hiring collateral generated successfully")
            return parse_criteria(response), job_id
        elif response.status_code == 401:
            # Authentication failed: reset the cached token and retry once with a fresh one
            logger.warning("Authentication failed for smart hiring, getting fresh token...")
            global access_token
            access_token = None
            new_token = get_access_token()
            if new_token:
                headers['Authorization'] = f'Bearer {new_token}'
                response = requests.post(url, headers=headers, data=payload, timeout=EXTERNAL_API_TIMEOUT)
                if response.status_code == 200:
                    logger.info("Smart hiring collateral generated successfully with fresh token")
                    return parse_criteria(response), job_id
                logger.error(f"Smart hiring API call failed with status {response.status_code}")
                return "", job_id
            logger.error("Could not obtain fresh token for smart hiring")
            return "", job_id
        else:
            logger.error(f"Smart hiring API call failed with status {response.status_code}: {response.text}")
            return "", job_id
    except requests.exceptions.Timeout:
        logger.error(f"Smart hiring API request timed out after {EXTERNAL_API_TIMEOUT} seconds")
        return "", ""
    except Exception as e:
        logger.error(f"Exception occurred in smart hiring generation: {str(e)}")
        return "", ""
class structure(BaseModel):
    name: str = Field(description="Name of the candidate")
    location: str = Field(description="The location of the candidate. Extract city and state if possible.")
    skills: List[str] = Field(description="List of individual skills of the candidate")
    ideal_jobs: str = Field(description="List of ideal jobs for the candidate based on past experience.")
    email: str = Field(description="The email of the candidate")
    yoe: str = Field(description="Years of experience of the candidate.")
    experience: str = Field(description="A brief summary of the candidate's past experience.")
    industry: str = Field(description="The industry the candidate has experience in. (Tech, Legal, Finance/Accounting, Healthcare, Industrial, Logistics, Telecom, Admin, Other)")

class JobAnalysis(BaseModel):
    job_title: str
    company_name: str
    analysis: dict
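# Example of the parsed-resume payload the `structure` schema represents
# (field names from the model above; values are illustrative only):
#   {
#       "name": "Jane Doe",
#       "location": "Austin, TX",
#       "skills": ["Python", "SQL"],
#       "ideal_jobs": "Backend engineering roles",
#       "email": "jane@example.com",
#       "yoe": "6",
#       "experience": "Six years building data-heavy backend services.",
#       "industry": "Tech"
#   }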
def extract_text_from_pdf(pdf_file_path: str) -> str:
    """
    Extract text from PDF file using multiple methods for better accuracy
    """
    text = ""

    # Method 1: Try pdfplumber (better for complex layouts)
    try:
        with pdfplumber.open(pdf_file_path) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text + "\n"
        if text.strip():
            logger.info(f"Successfully extracted text using pdfplumber: {len(text)} characters")
            return text.strip()
    except Exception as e:
        logger.warning(f"pdfplumber failed: {e}")

    # Method 2: Try PyPDF2 (fallback)
    try:
        with open(pdf_file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            for page in pdf_reader.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text + "\n"
        if text.strip():
            logger.info(f"Successfully extracted text using PyPDF2: {len(text)} characters")
            return text.strip()
    except Exception as e:
        logger.error(f"PyPDF2 failed: {e}")

    # If both methods fail, return empty string
    logger.error("Failed to extract text from PDF")
    return ""
def extract_resume_info(resume_text: str) -> dict:
    """
    Extract structured information from resume using LLM
    """
    prompt = ChatPromptTemplate.from_template("""
    You are an expert resume parser. Extract the following information from the resume text provided and return it in a structured JSON format.

    Resume Text:
    {resume_text}

    Please extract and structure the information according to the following schema:
    - name: Full name of the candidate
    - location: City and state if available, otherwise general location
    - skills: List of technical skills, tools, technologies, programming languages, etc.
    - ideal_jobs: Based on their experience, what types of jobs would be ideal for this candidate
    - email: Email address of the candidate (if found in resume)
    - yoe: Years of experience (extract from work history)
    - experience: Brief summary of their work experience and background
    - industry: Categorize into one of these industries: Tech, Legal, Finance/Accounting, Healthcare, Industrial, Logistics, Telecom, Admin, Other

    Return ONLY a valid JSON object with these fields. Do not include any other text or explanations.
    """)

    try:
        str_llm = llm.with_structured_output(structure)
        chain = prompt | str_llm
        response = chain.invoke({"resume_text": resume_text})

        validated_data = {
            'name': response.name,
            'location': response.location,
            'email': response.email,
            'skills': response.skills,
            'ideal_jobs': response.ideal_jobs,
            'yoe': response.yoe,
            'experience': response.experience,
            'industry': response.industry
        }
        logger.info(f"Successfully extracted resume info for: {validated_data['name']}")
        return validated_data
    except Exception as e:
        logger.error(f"Failed to extract resume info: {e}")
        # Fall back to a neutral default profile if extraction fails
        return {
            'name': "Unknown",
            'location': "Unknown",
            'email': "",
            'skills': [],
            'ideal_jobs': "Software Engineer",
            'yoe': "0",
            'experience': "No experience listed",
            'industry': "Tech"
        }
def filter_jobs_by_industry(jobs_df: pd.DataFrame, target_industry: str) -> pd.DataFrame:
    """
    Filter jobs by industry
    """
    # Map the extracted industry to database industry values
    industry_mapping = {
        'Tech': ['technology', 'VC Tech'],
        'Legal': ['Legal'],
        'Finance/Accounting': ['finance/Accounting'],
        'Healthcare': ['healthcare'],
        'Industrial': ['industrial'],
        'Logistics': ['logistics'],
        'Telecom': ['telecom'],
        'Admin': ['admin'],
        'Other': ['Other']
    }

    # Fall back to the Tech database values (not the raw 'Tech' label) if the industry is unrecognized
    target_industries = industry_mapping.get(target_industry, industry_mapping['Tech'])

    # Filter jobs by industry (using database column name 'industry')
    filtered_jobs = jobs_df[jobs_df['industry'].isin(target_industries)]
    logger.info(f"Filtered {len(filtered_jobs)} jobs for industry: {target_industry}")
    return filtered_jobs
def filter_jobs_by_location(jobs_df: pd.DataFrame, candidate_location: str) -> pd.DataFrame:
    """
    Filter jobs by location matching the candidate's location
    """
    if not candidate_location or candidate_location.lower() in ['unknown', 'n/a', '']:
        logger.info(f"No location info provided, returning all {len(jobs_df)} jobs")
        return jobs_df  # Return all jobs if no location info

    # Clean and normalize candidate location
    candidate_location = candidate_location.lower().strip()
    logger.info(f"Filtering jobs for candidate location: {candidate_location}")

    # State full names mapped to abbreviations
    state_mapping = {
        'alabama': 'al', 'alaska': 'ak', 'arizona': 'az', 'arkansas': 'ar', 'california': 'ca',
        'colorado': 'co', 'connecticut': 'ct', 'delaware': 'de', 'district of columbia': 'dc', 'florida': 'fl', 'georgia': 'ga',
        'hawaii': 'hi', 'idaho': 'id', 'illinois': 'il', 'indiana': 'in', 'iowa': 'ia',
        'kansas': 'ks', 'kentucky': 'ky', 'louisiana': 'la', 'maine': 'me', 'maryland': 'md',
        'massachusetts': 'ma', 'michigan': 'mi', 'minnesota': 'mn', 'mississippi': 'ms', 'missouri': 'mo',
        'montana': 'mt', 'nebraska': 'ne', 'nevada': 'nv', 'new hampshire': 'nh', 'new jersey': 'nj',
        'new mexico': 'nm', 'new york': 'ny', 'north carolina': 'nc', 'north dakota': 'nd', 'ohio': 'oh',
        'oklahoma': 'ok', 'oregon': 'or', 'pennsylvania': 'pa', 'rhode island': 'ri', 'south carolina': 'sc',
        'south dakota': 'sd', 'tennessee': 'tn', 'texas': 'tx', 'utah': 'ut', 'vermont': 'vt',
        'virginia': 'va', 'washington': 'wa', 'west virginia': 'wv', 'wisconsin': 'wi', 'wyoming': 'wy'
    }

    # Create location patterns to match
    location_patterns = []

    # Add the original location
    location_patterns.append(candidate_location)

    # Add state variations; match abbreviations on word boundaries so short codes like
    # 'in' or 'or' are not picked up inside unrelated words
    for state_name, state_abbr in state_mapping.items():
        if state_name in candidate_location or re.search(rf'\b{state_abbr}\b', candidate_location):
            location_patterns.extend([state_name, state_abbr])

    # Add common city variations (extract city name before the first comma)
    city_match = re.search(r'^([^,]+)', candidate_location)
    if city_match:
        city_name = city_match.group(1).strip()
        location_patterns.append(city_name)

    # Add remote/anywhere patterns if location is remote
    if 'remote' in candidate_location or 'anywhere' in candidate_location:
        location_patterns.extend(['remote', 'anywhere', 'work from home', 'wfh'])

    logger.info(f"Location patterns to match: {location_patterns}")

    # Filter jobs by location
    matching_jobs = []
    for _, job_row in jobs_df.iterrows():
        job_location = str(job_row.get('job_location', '')).lower()

        # Check if any location pattern matches
        location_matches = any(pattern in job_location for pattern in location_patterns)

        # Also check for remote jobs if candidate location includes remote
        if 'remote' in candidate_location and any(remote_term in job_location for remote_term in ['remote', 'anywhere', 'work from home', 'wfh']):
            location_matches = True

        # Check for exact city/state matches
        if candidate_location in job_location or job_location in candidate_location:
            location_matches = True

        if location_matches:
            matching_jobs.append(job_row)

    # Fall back to the unfiltered set if nothing matched
    result_df = pd.DataFrame(matching_jobs) if matching_jobs else jobs_df
    logger.info(f"Found {len(matching_jobs)} jobs matching location out of {len(jobs_df)} total jobs")
    return result_df
def extract_experience_requirement(requirements_text: str) -> dict:
    """
    Extract experience requirements from job requirements text
    Returns a dictionary with min_years, max_years, and level
    """
    if not requirements_text or pd.isna(requirements_text):
        return {'min_years': 0, 'max_years': 999, 'level': 'any'}

    requirements_text = str(requirements_text).lower()

    # Common experience patterns
    experience_patterns = [
        # Specific year ranges
        r'(\d+)[\-\+]\s*(\d+)\s*years?\s*experience',
        r'(\d+)\s*to\s*(\d+)\s*years?\s*experience',
        r'(\d+)\s*-\s*(\d+)\s*years?\s*experience',
        # Minimum years
        r'(\d+)\+?\s*years?\s*experience',
        r'minimum\s*(\d+)\s*years?\s*experience',
        r'at\s*least\s*(\d+)\s*years?\s*experience',
        # Level-based patterns
        r'(entry\s*level|junior|associate)',
        r'(mid\s*level|intermediate|mid\s*senior)',
        r'(senior|lead|principal|staff)',
        r'(executive|director|vp|chief|c\s*level)',
        # Specific year mentions
        r'(\d+)\s*years?\s*in\s*the\s*field',
        r'(\d+)\s*years?\s*of\s*professional\s*experience',
        r'(\d+)\s*years?\s*of\s*relevant\s*experience'
    ]

    min_years = 0
    max_years = 999
    level = 'any'

    # Check for specific year ranges
    for pattern in experience_patterns[:3]:  # First 3 patterns are for ranges
        matches = re.findall(pattern, requirements_text)
        if matches:
            try:
                min_years = int(matches[0][0])
                max_years = int(matches[0][1])
                break
            except (ValueError, IndexError):
                continue

    # Check for minimum years if no range found
    if min_years == 0:
        for pattern in experience_patterns[3:6]:  # Minimum year patterns
            matches = re.findall(pattern, requirements_text)
            if matches:
                try:
                    min_years = int(matches[0])
                    break
                except (ValueError, IndexError):
                    continue

    # Check for level-based requirements
    for pattern in experience_patterns[6:10]:  # Level patterns
        matches = re.findall(pattern, requirements_text)
        if matches:
            level_match = matches[0].lower()
            if 'entry' in level_match or 'junior' in level_match or 'associate' in level_match:
                level = 'entry'
                if min_years == 0:
                    min_years = 0
                    max_years = 2
            elif 'mid' in level_match or 'intermediate' in level_match:
                level = 'mid'
                if min_years == 0:
                    min_years = 2
                    max_years = 5
            elif 'senior' in level_match or 'lead' in level_match or 'principal' in level_match or 'staff' in level_match:
                level = 'senior'
                if min_years == 0:
                    min_years = 5
                    max_years = 10
            elif 'executive' in level_match or 'director' in level_match or 'vp' in level_match or 'chief' in level_match:
                level = 'executive'
                if min_years == 0:
                    min_years = 10
                    max_years = 999
            break

    # Check for specific year mentions if still no match
    if min_years == 0:
        for pattern in experience_patterns[10:]:  # Specific year mention patterns
            matches = re.findall(pattern, requirements_text)
            if matches:
                try:
                    min_years = int(matches[0])
                    max_years = min_years + 2  # Add buffer
                    break
                except (ValueError, IndexError):
                    continue

    return {
        'min_years': min_years,
        'max_years': max_years,
        'level': level
    }
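# A couple of illustrative inputs and what this parser returns for them:
#   extract_experience_requirement("5+ years experience with Python")
#       -> {'min_years': 5, 'max_years': 999, 'level': 'any'}
#   extract_experience_requirement("Senior engineer, 3-5 years experience")
#       -> {'min_years': 3, 'max_years': 5, 'level': 'senior'}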
def filter_jobs_by_experience(jobs_df: pd.DataFrame, candidate_yoe: str) -> pd.DataFrame:
    """
    Filter jobs by experience level matching the candidate's years of experience
    """
    if not candidate_yoe or candidate_yoe.lower() in ['unknown', 'n/a', '']:
        logger.info(f"No experience info provided, returning all {len(jobs_df)} jobs")
        return jobs_df

    # Extract numeric years from candidate experience
    try:
        # Handle various formats like "5 years", "5+ years", "5-7 years", etc.
        yoe_match = re.search(r'(\d+(?:\.\d+)?)', str(candidate_yoe))
        if yoe_match:
            candidate_years = float(yoe_match.group(1))
        else:
            logger.warning(f"Could not extract years from: {candidate_yoe}")
            return jobs_df
    except (ValueError, TypeError):
        logger.error(f"Invalid experience format: {candidate_yoe}")
        return jobs_df

    logger.info(f"Filtering jobs for candidate with {candidate_years} years of experience")

    # Filter jobs by experience requirements
    matching_jobs = []
    for _, job_row in jobs_df.iterrows():
        requirements_text = str(job_row.get('requirements', ''))
        experience_req = extract_experience_requirement(requirements_text)

        # Check if candidate's experience matches the job requirements
        if (candidate_years >= experience_req['min_years'] and
                candidate_years <= experience_req['max_years']):
            matching_jobs.append(job_row)

    # Fall back to the unfiltered set if nothing matched
    result_df = pd.DataFrame(matching_jobs) if matching_jobs else jobs_df
    logger.info(f"Found {len(matching_jobs)} jobs matching experience out of {len(jobs_df)} total jobs")
    return result_df
def filter_jobs_by_priority(jobs_df: pd.DataFrame) -> pd.DataFrame:
    """
    Filter jobs to only include high priority jobs
    """
    if jobs_df.empty:
        logger.info("No jobs to filter by priority")
        return jobs_df

    # Filter jobs by priority - only include high priority jobs
    priority_filtered_jobs = jobs_df[jobs_df['priority'].str.lower() == 'high']
    logger.info(f"Found {len(priority_filtered_jobs)} high priority jobs out of {len(jobs_df)} total jobs")
    return priority_filtered_jobs
def create_job_description(job_row: pd.Series) -> str:
    """
    Create a comprehensive job description from job data
    """
    description_parts = []

    if pd.notna(job_row.get('company_blurb')):
        description_parts.append(f"Company: {job_row['company_blurb']}")
    if pd.notna(job_row.get('company_culture')):
        description_parts.append(f"Company Culture: {job_row['company_culture']}")
    if pd.notna(job_row.get('description')):
        description_parts.append(f"Description: {job_row['description']}")
    if pd.notna(job_row.get('requirements')):
        description_parts.append(f"Requirements: {job_row['requirements']}")
    if pd.notna(job_row.get('role_responsibilities')):
        description_parts.append(f"Role Responsibilities: {job_row['role_responsibilities']}")
    if pd.notna(job_row.get('job_location')):
        description_parts.append(f"Location: {job_row['job_location']}")

    return "\n\n".join(description_parts)

def create_jd_smart_hiring(job_row: pd.Series) -> str:
    """
    Create a smart hiring job description from job data
    """
    description_parts = []

    if pd.notna(job_row.get('description')):
        description_parts.append(f"Description: {job_row['description']}")
    if pd.notna(job_row.get('requirements')):
        description_parts.append(f"Requirements: {job_row['requirements']}")

    return "\n\n".join(description_parts)
def clean_analysis_result(analysis_result: dict) -> dict:
    """
    Clean up the analysis result to only include final_score and summary
    """
    if not isinstance(analysis_result, dict):
        return analysis_result

    # Remove user_context if present
    if 'user_context' in analysis_result:
        del analysis_result['user_context']

    # Clean up final_response if present
    if 'final_response' in analysis_result:
        try:
            # Handle both string and dict formats
            if isinstance(analysis_result['final_response'], str):
                final_response = json.loads(analysis_result['final_response'])
            else:
                final_response = analysis_result['final_response']

            # Extract and format the evaluation data
            if 'evaluation' in final_response and len(final_response['evaluation']) > 0:
                evaluation = final_response['evaluation'][0]

                # Create a minimal structure with only final_score and summary
                cleaned_response = {
                    'final_score': evaluation.get('final_score', 0),
                    'summary': {}
                }

                # Extract summary information
                if 'summary' in evaluation and len(evaluation['summary']) > 0:
                    summary = evaluation['summary'][0]
                    cleaned_response['summary'] = {
                        'strengths': summary.get('strengths', []),
                        'weaknesses': summary.get('weaknesses', []),
                        'opportunities': summary.get('opportunities', []),
                        'recommendations': summary.get('recommendations', [])
                    }

                analysis_result['final_response'] = cleaned_response
        except (json.JSONDecodeError, KeyError, IndexError) as e:
            logger.error(f"Error cleaning analysis result: {e}")
            # Keep original if cleaning fails

    return analysis_result
def sort_jobs_by_score(job_analyses: list) -> list:
    """
    Sort jobs by final_score in descending order (highest scores first)
    """
    def extract_score(job_analysis):
        try:
            analysis = job_analysis.get('analysis', {})
            if 'final_response' in analysis and isinstance(analysis['final_response'], dict):
                return analysis['final_response'].get('final_score', 0)
            return 0
        except Exception:
            return 0

    return sorted(job_analyses, key=extract_score, reverse=True)
async def analyze_job_fit_with_retry(job_description: str, resume_file_path: str, job_row: pd.Series = None, max_retries: int = 3) -> dict:
    """
    Analyze job-candidate fit with retry logic for resilience
    """
    for attempt in range(max_retries):
        try:
            result = analyze_job_fit(job_description, resume_file_path, job_row)

            if "error" not in result:
                return result

            # If authentication error and not last attempt, retry with a fresh token
            if "Authentication failed" in result.get("error", "") and attempt < max_retries - 1:
                logger.warning(f"Authentication failed, retrying... (attempt {attempt + 1}/{max_retries})")
                global access_token
                access_token = None  # Reset token to force refresh
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
                continue

            # If timeout error and not last attempt, retry after backing off
            if "timed out" in result.get("error", "").lower() and attempt < max_retries - 1:
                logger.warning(f"Request timed out, retrying... (attempt {attempt + 1}/{max_retries})")
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
                continue

            return result
        except Exception as e:
            logger.error(f"Attempt {attempt + 1}/{max_retries} failed: {str(e)}")
            if attempt == max_retries - 1:
                return {"error": f"Failed after {max_retries} attempts: {str(e)}"}
            await asyncio.sleep(2 ** attempt)

    # Defensive fallback (only reachable if max_retries < 1)
    return {"error": f"Failed after {max_retries} attempts"}
def analyze_job_fit(job_description: str, resume_file_path: str, job_row: pd.Series = None) -> dict:
    """
    Analyze job-candidate fit using the external API
    """
    url = str(os.getenv("analyze_url"))

    # Check if resume file exists
    if not os.path.exists(resume_file_path):
        logger.error(f"Resume file not found: {resume_file_path}")
        return {"error": f"Resume file not found: {resume_file_path}"}

    # Prepare headers with authentication
    headers = {
        'accept': 'application/json',
        'Authorization': f'Bearer {get_access_token()}'
    }

    # Prepare form data
    files = {
        'resume': (os.path.basename(resume_file_path), open(resume_file_path, 'rb'), 'application/pdf')
    }
    data = {
        'jd_text': job_description
    }

    # Generate collateral if job_row is provided
    if job_row is not None:
        try:
            job_description_text = create_jd_smart_hiring(job_row)
            if job_description_text:
                collateral, job_id = generate_smart_hiring_collateral(job_description_text)
                if collateral:
                    data['collateral'] = collateral
                    data['job_id'] = job_id
                    logger.info(f"Added collateral and job_id ({job_id}) to job fit analysis request")
                elif job_id:
                    # Even if collateral is empty, we can still use the job_id
                    data['job_id'] = job_id
                    logger.info(f"Added job_id ({job_id}) to job fit analysis request (no collateral)")
        except Exception as e:
            logger.warning(f"Failed to generate collateral: {e}")
            # Continue without collateral if generation fails

    try:
        # Make the API request with the configured timeout
        response = requests.post(url, headers=headers, files=files, data=data, timeout=EXTERNAL_API_TIMEOUT)

        # If we get an authentication error, try to get a fresh token and retry once
        if response.status_code == 401:
            logger.warning("Authentication failed, getting fresh token...")
            global access_token
            access_token = None  # Reset the token
            new_token = get_access_token()
            if new_token:
                headers['Authorization'] = f'Bearer {new_token}'
                # Close the previous file handle and reopen it for the retry
                files['resume'][1].close()
                files['resume'] = (os.path.basename(resume_file_path), open(resume_file_path, 'rb'), 'application/pdf')
                response = requests.post(url, headers=headers, files=files, data=data, timeout=EXTERNAL_API_TIMEOUT)
            else:
                # If we can't get a fresh token, return an error
                return {"error": "Authentication failed and could not obtain fresh token"}

        if response.status_code == 200:
            logger.info("Job fit analysis completed successfully")
            return response.json()
        elif response.status_code == 401:
            # If we still get 401 after a fresh token, return an error
            return {"error": "Authentication failed even with fresh token"}
        else:
            logger.error(f"API call failed with status {response.status_code}")
            return {"error": f"API call failed with status {response.status_code}", "details": response.text}
    except requests.exceptions.Timeout:
        logger.error(f"API request timed out after {EXTERNAL_API_TIMEOUT} seconds")
        return {"error": f"API request timed out after {EXTERNAL_API_TIMEOUT} seconds"}
    except Exception as e:
        logger.error(f"Exception occurred: {str(e)}")
        return {"error": f"Exception occurred: {str(e)}"}
    finally:
        # Ensure the resume file handle is closed
        if 'resume' in files:
            try:
                files['resume'][1].close()
            except Exception:
                pass
# NOTE: route path assumed for illustration; the original decorator is not shown in this file
@app.post("/recommend-jobs")
async def process_resume_and_recommend_jobs(
    resume: UploadFile = File(...),
    resume_text: str = Form(""),
    api_key: str = Depends(verify_api_key)
):
    """
    Process resume, extract information, filter jobs by industry, and analyze fit
    """
    request_start_time = time.time()

    try:
        logger.info(f"Processing resume: {resume.filename}")

        # Save uploaded file temporarily
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
            shutil.copyfileobj(resume.file, tmp_file)
            tmp_file_path = tmp_file.name

        try:
            # Extract text from PDF if no resume_text provided
            if not resume_text:
                resume_text = extract_text_from_pdf(tmp_file_path)
                if not resume_text:
                    logger.error("Could not extract text from PDF file")
                    return JSONResponse(
                        status_code=400,
                        content={"error": "Could not extract text from PDF file"}
                    )

            # Extract resume information using LLM
            resume_info = extract_resume_info(resume_text)

            # Load jobs data from PostgreSQL database
            try:
                jobs_df = pd.read_sql_table("jobs", con=engine)
                candidates_df = pd.read_sql_table("candidates", con=engine)
                submissions_df = pd.read_sql_table("candidate_submissions", con=engine)
                logger.info(f"Loaded {len(jobs_df)} jobs, {len(candidates_df)} candidates, {len(submissions_df)} submissions")
            except Exception as db_error:
                logger.error(f"Database error: {db_error}")
                return JSONResponse(
                    status_code=500,
                    content={"error": "Database connection error"}
                )

            # Filter jobs by industry
            filtered_jobs = filter_jobs_by_industry(jobs_df, resume_info['industry'])
            if filtered_jobs.empty:
                logger.warning(f"No jobs found for industry: {resume_info['industry']}")
                return JSONResponse(
                    status_code=404,
                    content={"message": f"No jobs found for industry: {resume_info['industry']}"}
                )

            # Filter jobs by location
            location_filtered_jobs = filter_jobs_by_location(filtered_jobs, resume_info['location'])

            # Filter jobs by experience level
            experience_filtered_jobs = filter_jobs_by_experience(location_filtered_jobs, resume_info['yoe'])

            # Filter jobs by priority
            priority_filtered_jobs = filter_jobs_by_priority(experience_filtered_jobs)

            # Use priority-filtered jobs if available, otherwise fall back to
            # experience-filtered, then location-filtered jobs
            if not priority_filtered_jobs.empty:
                jobs_to_analyze = priority_filtered_jobs
            elif not experience_filtered_jobs.empty:
                jobs_to_analyze = experience_filtered_jobs
            else:
                jobs_to_analyze = location_filtered_jobs

            # Create filtered_submission_df with job_ids from jobs_to_analyze
            job_ids_to_analyze = jobs_to_analyze['id'].tolist()
            filtered_submission_df = submissions_df[submissions_df['jobId'].isin(job_ids_to_analyze)]

            # Check if candidate email exists in candidates_df
            candidate_id = None
            if resume_info.get('email'):
                candidate_match = candidates_df[candidates_df['email'] == resume_info['email']]
                if not candidate_match.empty:
                    candidate_id = candidate_match.iloc[0]['id']
                    logger.info(f"Found existing candidate with ID: {candidate_id}")

            # Analyze job fit for each filtered job
            job_analyses = []

            # Use configured number of jobs to analyze
            for _, job_row in jobs_to_analyze.head(MAX_JOBS_TO_ANALYZE).iterrows():
                job_id = job_row.get('id')

                # Check if we have an existing submission for this candidate and job
                existing_submission = None
                if candidate_id and job_id:
                    submission_match = filtered_submission_df[
                        (filtered_submission_df['candidate_id'] == candidate_id) &
                        (filtered_submission_df['jobId'] == job_id)
                    ]
                    if not submission_match.empty:
                        existing_submission = submission_match.iloc[0]
                        logger.info(f"Found existing submission for job_id: {job_id}, candidate_id: {candidate_id}")

                if existing_submission is not None:
                    # Use existing fit score from submission instead of calling the API again
                    fit_score = existing_submission.get('fit_score', 0)
                    analysis_result = {
                        'final_response': {
                            'final_score': fit_score,
                            'summary': {
                                'strengths': [],
                                'weaknesses': [],
                                'opportunities': [],
                                'recommendations': []
                            }
                        },
                        'source': 'existing_submission'
                    }
                else:
                    # Call API for new analysis with retry logic
                    job_description = create_job_description(job_row)
                    analysis_result = await analyze_job_fit_with_retry(job_description, tmp_file_path, job_row)
                    analysis_result['source'] = 'api_call'

                # Clean up the analysis result
                cleaned_analysis = clean_analysis_result(analysis_result)

                job_analysis = JobAnalysis(
                    job_title=job_row.get('job_title', 'Unknown'),
                    company_name=job_row.get('company_name', 'Unknown'),
                    analysis=cleaned_analysis
                )
                job_analyses.append(job_analysis.dict())

            # Sort jobs by final_score in descending order (highest scores first)
            job_analyses = sort_jobs_by_score(job_analyses)

            # Count existing submissions vs API calls
            existing_submissions_count = sum(1 for analysis in job_analyses if analysis.get('analysis', {}).get('source') == 'existing_submission')
            api_calls_count = sum(1 for analysis in job_analyses if analysis.get('analysis', {}).get('source') == 'api_call')

            # Clean up temporary file
            os.unlink(tmp_file_path)

            # Calculate processing time
            processing_time = time.time() - request_start_time
            logger.info(f"Request completed in {processing_time:.2f} seconds")

            return {
                "resume_info": resume_info,
                "industry": resume_info['industry'],
                "location": resume_info['location'],
                "experience_years": resume_info['yoe'],
                "jobs_analyzed": len(job_analyses),
                "location_filtered": not location_filtered_jobs.empty,
                "experience_filtered": not experience_filtered_jobs.empty,
                "priority_filtered": not priority_filtered_jobs.empty,
                "existing_submissions_used": existing_submissions_count,
                "api_calls_made": api_calls_count,
                "candidate_found": candidate_id is not None,
                "processing_time_seconds": round(processing_time, 2),
                "job_analyses": job_analyses
            }
        except Exception as e:
            # Clean up temporary file in case of error
            if os.path.exists(tmp_file_path):
                os.unlink(tmp_file_path)
            raise e
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content={"error": f"Processing failed: {str(e)}"}
        )
# Health and root endpoints (route paths taken from the links returned by root() below)
@app.get("/health")
async def health_check(api_key: str = Depends(verify_api_key)):
    """
    Health check endpoint with database connectivity check
    """
    health_status = {
        "status": "healthy",
        "message": "Job Recommendation API is running",
        "timestamp": time.time()
    }

    # Check database connectivity
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        health_status["database"] = "connected"
    except Exception as e:
        logger.error(f"Database health check failed: {e}")
        health_status["database"] = "disconnected"
        health_status["status"] = "degraded"

    return health_status
@app.get("/")
async def root():
    """
    Root endpoint
    """
    return {
        "message": "Job Recommendation API",
        "version": "1.0.0",
        "docs": "/docs",
        "health": "/health"
    }
if __name__ == "__main__":
    import uvicorn
    port = int(os.getenv("PORT", 8080))
    logger.info(f"Starting server on port {port}")
    uvicorn.run(app, host="0.0.0.0", port=port)
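# Example request against a running instance (a sketch; the /recommend-jobs path mirrors
# the route assumed above, so adjust it if your deployment registers a different path):
#
#   import requests
#   with open("candidate_resume.pdf", "rb") as f:
#       resp = requests.post(
#           "http://localhost:8080/recommend-jobs",
#           headers={"Authorization": "Bearer <API_KEY>"},
#           files={"resume": ("candidate_resume.pdf", f, "application/pdf")},
#           timeout=300,
#       )
#   print(resp.json()["jobs_analyzed"])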