Spaces:
Sleeping
Sleeping
| import re | |
| from typing import List, Dict, Optional | |
| from pathlib import Path | |
| from collections import defaultdict | |
| from dataclasses import dataclass | |
| from docx import Document | |
| from sentence_transformers import SentenceTransformer | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| class DocumentChunk: | |
| chunk_id: int | |
| text: str | |
| embedding: List[float] | |
| metadata: Dict | |
| class DocumentChunker: | |
| def __init__(self): | |
| self.embed_model = SentenceTransformer("all-MiniLM-L6-v2") | |
| self.category_patterns = { | |
| "Project Summary": [r"\bsummary\b", r"\bproject overview\b"], | |
| "Contact Information": [r"\bcontact\b", r"\bemail\b", r"\bphone\b", r"\baddress\b"], | |
| "Problem/ Need": [r"\bproblem\b", r"\bneed\b", r"\bchallenge\b"], | |
| "Mission Statement": [r"\bmission\b", r"\bvision\b"], | |
| "Fit or Alignment to Grant": [r"\balignment\b", r"\bfit\b", r"\bgrant (focus|priority)\b"], | |
| "Goals/ Vision / Objectives": [r"\bgoals?\b", r"\bobjectives?\b", r"\bvision\b"], | |
| "Our Solution *PROGRAMS* and Approach": [r"\bsolution\b", r"\bprogram\b", r"\bapproach\b"], | |
| "Impact, Results, or Outcomes": [r"\bimpact\b", r"\bresults?\b", r"\boutcomes?\b"], | |
| "Beneficiaries": [r"\bbeneficiaries\b", r"\bwho we serve\b", r"\btarget audience\b"], | |
| "Differentiation with Competitors": [r"\bcompetitor\b", r"\bdifferent\b", r"\bvalue proposition\b"], | |
| "Plan and Timeline": [r"\btimeline\b", r"\bschedule\b", r"\bmilestone\b"], | |
| "Budget and Funding": [r"\bbudget\b", r"\bfunding\b", r"\bcost\b"], | |
| "Sustainability and Strategy": [r"\bsustainability\b", r"\bexit strategy\b"], | |
| "Organization's History": [r"\bhistory\b", r"\borganization background\b"], | |
| "Team Member Descriptions": [r"\bteam\b", r"\bstaff\b", r"\blived experience\b"], | |
| } | |
| self.patterns = { | |
| 'grant_application': { | |
| 'header_patterns': [ | |
| r'\*\*([^*]+)\*\*', | |
| r'^([A-Z][^a-z]*[A-Z])$', | |
| r'^([A-Z][A-Za-z\s]+)$', | |
| ], | |
| 'question_patterns': [ | |
| r'^.+\?$', | |
| r'^\*?Please .+', | |
| r'^How .+', | |
| r'^What .+', | |
| r'^Describe .+', | |
| ] | |
| } | |
| } | |
| def match_category(self, text: str, return_first: bool = True) -> Optional[str] or List[str]: | |
| lower_text = text.lower() | |
| match_scores = defaultdict(int) | |
| for category, patterns in self.category_patterns.items(): | |
| for pattern in patterns: | |
| matches = re.findall(pattern, lower_text) | |
| match_scores[category] += len(matches) | |
| if not match_scores: | |
| return None if return_first else [] | |
| sorted_categories = sorted(match_scores.items(), key=lambda x: -x[1]) | |
| return sorted_categories[0][0] if return_first else [cat for cat, _ in sorted_categories if match_scores[cat] > 0] | |
| def extract_text_from_docx(self, file_path: str) -> str: | |
| doc = Document(file_path) | |
| return '\n'.join([f"**{p.text}**" if any(r.bold for r in p.runs) else p.text for p in doc.paragraphs]) | |
| def detect_document_type(self, text: str) -> str: | |
| keywords = ['grant', 'funding', 'mission'] | |
| return 'grant_application' if sum(k in text.lower() for k in keywords) >= 2 else 'generic' | |
| def extract_headers(self, text: str, doc_type: str) -> List[Dict]: | |
| lines = text.split('\n') | |
| headers = [] | |
| patterns = self.patterns.get(doc_type, self.patterns['grant_application']) | |
| for i, line in enumerate(lines): | |
| line = line.strip("* ") | |
| if any(re.match(p, line, re.IGNORECASE) for p in patterns['question_patterns']): | |
| headers.append({'text': line, 'line_number': i, 'pattern_type': 'question'}) | |
| elif any(re.match(p, line) for p in patterns['header_patterns']): | |
| headers.append({'text': line, 'line_number': i, 'pattern_type': 'header'}) | |
| return headers | |
| def chunk_by_headers(self, text: str, headers: List[Dict], max_words=150) -> List[Dict]: | |
| lines = text.split('\n') | |
| chunks = [] | |
| if not headers: | |
| # fallback chunking | |
| words = text.split() | |
| for i in range(0, len(words), max_words): | |
| piece = ' '.join(words[i:i + max_words]) | |
| chunks.append({ | |
| 'chunk_id': len(chunks) + 1, | |
| 'header': '', | |
| 'questions': [], | |
| 'content': piece, | |
| 'pattern_type': 'auto' | |
| }) | |
| return chunks | |
| for i, header in enumerate(headers): | |
| start, end = header['line_number'], headers[i + 1]['line_number'] if i + 1 < len(headers) else len(lines) | |
| content_lines = lines[start + 1:end] | |
| questions = [l.strip() for l in content_lines if l.strip().endswith('?') and len(l.split()) <= 20] | |
| content = ' '.join([l.strip() for l in content_lines if l.strip() and l.strip() not in questions]) | |
| for j in range(0, len(content.split()), max_words): | |
| chunk_text = ' '.join(content.split()[j:j + max_words]) | |
| chunks.append({ | |
| 'chunk_id': len(chunks) + 1, | |
| 'header': header['text'] if header['pattern_type'] == 'header' else '', | |
| 'questions': questions if header['pattern_type'] == 'question' else [], | |
| 'content': chunk_text, | |
| 'pattern_type': header['pattern_type'], | |
| 'split_index': j // max_words | |
| }) | |
| return chunks | |
| def extract_topics_tfidf(self, text: str, max_features: int = 3) -> List[str]: | |
| clean = re.sub(r'[^\w\s]', ' ', text.lower()) | |
| vectorizer = TfidfVectorizer(max_features=max_features * 2, stop_words='english') | |
| tfidf = vectorizer.fit_transform([clean]) | |
| terms = vectorizer.get_feature_names_out() | |
| scores = tfidf.toarray()[0] | |
| top_terms = [term for term, score in sorted(zip(terms, scores), key=lambda x: -x[1]) if score > 0] | |
| return top_terms[:max_features] | |
| def calculate_confidence_score(self, chunk: Dict) -> float: | |
| score = 0.0 | |
| if chunk.get('header'): score += 0.3 | |
| if chunk.get('content') and len(chunk['content'].split()) > 20: score += 0.3 | |
| if chunk.get('questions'): score += 0.2 | |
| return min(score, 1.0) | |
| def process_document(self, file_path: str, title: Optional[str] = None) -> List[Dict]: | |
| file_path = Path(file_path) | |
| text = self.extract_text_from_docx(str(file_path)) if file_path.suffix == ".docx" else file_path.read_text() | |
| doc_type = self.detect_document_type(text) | |
| headers = self.extract_headers(text, doc_type) | |
| raw_chunks = self.chunk_by_headers(text, headers) | |
| final_chunks = [] | |
| for chunk in raw_chunks: | |
| full_text = f"{chunk['header']} {' '.join(chunk['questions'])} {chunk['content']}".strip() | |
| category = self.match_category(full_text, return_first=True) | |
| categories = self.match_category(full_text, return_first=False) | |
| embedding = self.embed_model.encode(full_text).tolist() | |
| topics = self.extract_topics_tfidf(full_text) | |
| confidence = self.calculate_confidence_score(chunk) | |
| final_chunks.append({ | |
| "chunk_id": chunk['chunk_id'], | |
| "text": full_text, | |
| "embedding": embedding, | |
| "metadata": { | |
| **chunk, | |
| "title": title or file_path.name, | |
| "category": category, | |
| "categories": categories, | |
| "topics": topics, | |
| "confidence_score": confidence | |
| } | |
| }) | |
| return final_chunks | |