import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from core.models import Document, DocumentType
import config

logger = logging.getLogger(__name__)


class DocumentStoreService:
    """File-backed document store: JSON metadata alongside plain-text content."""

    def __init__(self):
        self.config = config.config
        self.store_path = Path(self.config.DOCUMENT_STORE_PATH)
        self.store_path.mkdir(parents=True, exist_ok=True)

        # Metadata (JSON) and content (plain text) live in sibling subdirectories.
        self.metadata_path = self.store_path / "metadata"
        self.content_path = self.store_path / "content"
        self.metadata_path.mkdir(exist_ok=True)
        self.content_path.mkdir(exist_ok=True)

        # Small in-memory cache with FIFO eviction (see _add_to_cache).
        self._cache: Dict[str, Document] = {}
        self._cache_size_limit = 100

    @staticmethod
    def _document_from_metadata(metadata: Dict[str, Any], content: str) -> Document:
        """Rebuild a Document from its stored metadata and text content."""
        return Document(
            id=metadata["id"],
            filename=metadata["filename"],
            content=content,
            doc_type=DocumentType(metadata["doc_type"]),
            file_size=metadata["file_size"],
            created_at=datetime.fromisoformat(metadata["created_at"]),
            metadata=metadata.get("metadata", {}),
            tags=metadata.get("tags", []),
            summary=metadata.get("summary"),
            category=metadata.get("category"),
            language=metadata.get("language"),
        )

    async def store_document(self, document: Document) -> bool:
        """Store a document and its metadata"""
        try:
            # Persist metadata as JSON.
            metadata_file = self.metadata_path / f"{document.id}.json"
            metadata = {
                "id": document.id,
                "filename": document.filename,
                "doc_type": document.doc_type.value,
                "file_size": document.file_size,
                "created_at": document.created_at.isoformat(),
                "metadata": document.metadata,
                "tags": document.tags,
                "summary": document.summary,
                "category": document.category,
                "language": document.language,
                "content_length": len(document.content),
            }

            with open(metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            # Persist the raw text content separately.
            content_file = self.content_path / f"{document.id}.txt"
            with open(content_file, 'w', encoding='utf-8') as f:
                f.write(document.content)

            self._add_to_cache(document.id, document)

            logger.info(f"Stored document {document.id} ({document.filename})")
            return True

        except Exception as e:
            logger.error(f"Error storing document {document.id}: {e}")
            return False

    async def get_document(self, document_id: str) -> Optional[Document]:
        """Retrieve a document by ID"""
        try:
            # Serve from the in-memory cache when possible.
            if document_id in self._cache:
                return self._cache[document_id]

            metadata_file = self.metadata_path / f"{document_id}.json"
            content_file = self.content_path / f"{document_id}.txt"

            if not metadata_file.exists() or not content_file.exists():
                return None

            with open(metadata_file, 'r', encoding='utf-8') as f:
                metadata = json.load(f)

            with open(content_file, 'r', encoding='utf-8') as f:
                content = f.read()

            document = self._document_from_metadata(metadata, content)
            self._add_to_cache(document_id, document)
            return document

        except Exception as e:
            logger.error(f"Error retrieving document {document_id}: {e}")
            return None

    async def list_documents(self, limit: int = 50, offset: int = 0,
                             filters: Optional[Dict[str, Any]] = None) -> List[Document]:
        """List documents with pagination and filtering"""
        try:
            documents: List[Document] = []
            metadata_files = list(self.metadata_path.glob("*.json"))

            # Newest first, by file modification time.
            metadata_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)

            # Apply filters before pagination so offset/limit count only
            # matching documents; filtering after slicing would return short
            # pages and skip matches.
            matched = 0
            for metadata_file in metadata_files:
                if len(documents) >= limit:
                    break
                try:
                    with open(metadata_file, 'r', encoding='utf-8') as f:
                        metadata = json.load(f)

                    if filters and not self._apply_filters(metadata, filters):
                        continue

                    matched += 1
                    if matched <= offset:
                        continue

                    content_file = self.content_path / f"{metadata['id']}.txt"
                    if content_file.exists():
                        with open(content_file, 'r', encoding='utf-8') as f:
                            content = f.read()
                    else:
                        content = ""

                    documents.append(self._document_from_metadata(metadata, content))

                except Exception as e:
                    logger.warning(f"Error loading document metadata from {metadata_file}: {e}")
                    continue

            return documents

        except Exception as e:
            logger.error(f"Error listing documents: {e}")
            return []

    def _apply_filters(self, metadata: Dict[str, Any], filters: Dict[str, Any]) -> bool:
        """Return True if the document metadata satisfies every filter"""
        try:
            for key, value in filters.items():
                if key == "doc_type":
                    if metadata.get("doc_type") != value:
                        return False
                elif key == "filename_contains":
                    if value.lower() not in metadata.get("filename", "").lower():
                        return False
                elif key == "created_after":
                    doc_date = datetime.fromisoformat(metadata.get("created_at", ""))
                    if doc_date < value:
                        return False
                elif key == "created_before":
                    doc_date = datetime.fromisoformat(metadata.get("created_at", ""))
                    if doc_date > value:
                        return False
                elif key == "tags":
                    # Match if the document shares at least one required tag.
                    doc_tags = set(metadata.get("tags", []))
                    required_tags = set(value) if isinstance(value, list) else {value}
                    if not required_tags.intersection(doc_tags):
                        return False
                elif key == "category":
                    if metadata.get("category") != value:
                        return False
                elif key == "language":
                    if metadata.get("language") != value:
                        return False

            return True
        except Exception as e:
            # Fail open: a malformed filter or date should not hide documents.
            logger.error(f"Error applying filters: {e}")
            return True
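    # Example `filters` accepted by list_documents (illustrative values only;
    # the "doc_type" string must match your DocumentType enum's stored values):
    #   {"doc_type": "pdf"}                       -- exact type match
    #   {"filename_contains": "report"}           -- case-insensitive substring
    #   {"created_after": datetime(2024, 1, 1)}   -- compared as a datetime
    #   {"tags": ["finance", "q3"]}               -- any shared tag matches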
					
						
    async def update_document_metadata(self, document_id: str, updates: Dict[str, Any]) -> bool:
        """Update document metadata"""
        try:
            metadata_file = self.metadata_path / f"{document_id}.json"

            if not metadata_file.exists():
                logger.warning(f"Document {document_id} not found")
                return False

            with open(metadata_file, 'r', encoding='utf-8') as f:
                metadata = json.load(f)

            # Only these fields may be updated after ingestion.
            updatable = ("tags", "summary", "category", "language", "metadata")
            for key, value in updates.items():
                if key in updatable:
                    metadata[key] = value

            with open(metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            # Keep any cached copy consistent with what was written to disk,
            # applying the same whitelist used above.
            if document_id in self._cache:
                document = self._cache[document_id]
                for key, value in updates.items():
                    if key in updatable and hasattr(document, key):
                        setattr(document, key, value)

            logger.info(f"Updated metadata for document {document_id}")
            return True

        except Exception as e:
            logger.error(f"Error updating document metadata: {e}")
            return False

    async def delete_document(self, document_id: str) -> bool:
        """Delete a document and its metadata"""
        try:
            metadata_file = self.metadata_path / f"{document_id}.json"
            content_file = self.content_path / f"{document_id}.txt"

            if metadata_file.exists():
                metadata_file.unlink()
            if content_file.exists():
                content_file.unlink()

            # Evict any cached copy.
            if document_id in self._cache:
                del self._cache[document_id]

            logger.info(f"Deleted document {document_id}")
            return True

        except Exception as e:
            logger.error(f"Error deleting document {document_id}: {e}")
            return False

    async def search_documents(self, query: str, fields: Optional[List[str]] = None) -> List[Document]:
        """Simple case-insensitive substring search across documents"""
        if not fields:
            fields = ["filename", "content", "tags", "summary"]

        try:
            matching_documents = []
            query_lower = query.lower()

            # Brute-force scan over the first 1000 documents; this is a simple
            # linear search, not an index.
            all_documents = await self.list_documents(limit=1000)

            for document in all_documents:
                match_found = False

                for field in fields:
                    field_value = getattr(document, field, "")
                    if isinstance(field_value, list):
                        field_value = " ".join(field_value)
                    elif field_value is None:
                        field_value = ""

                    if query_lower in str(field_value).lower():
                        match_found = True
                        break

                if match_found:
                    matching_documents.append(document)

            logger.info(f"Found {len(matching_documents)} documents matching '{query}'")
            return matching_documents

        except Exception as e:
            logger.error(f"Error searching documents: {e}")
            return []

    def _add_to_cache(self, document_id: str, document: Document):
        """Add document to cache, evicting the oldest entry at the size limit"""
        try:
            if len(self._cache) >= self._cache_size_limit:
                # Dicts preserve insertion order, so this evicts the
                # oldest-inserted entry (FIFO, not LRU).
                oldest_key = next(iter(self._cache))
                del self._cache[oldest_key]

            self._cache[document_id] = document
        except Exception as e:
            logger.error(f"Error adding to cache: {e}")
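    # Note: a true LRU variant could store the cache in a
    # collections.OrderedDict, call move_to_end(document_id) on every cache
    # hit in get_document, and evict with popitem(last=False). Left as a
    # sketch; FIFO is what the code above implements.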
					
						
    async def get_stats(self) -> Dict[str, Any]:
        """Get statistics about the document store"""
        try:
            metadata_files = list(self.metadata_path.glob("*.json"))
            content_files = list(self.content_path.glob("*.txt"))

            # Total on-disk footprint of metadata plus content.
            total_size = 0
            for file_path in metadata_files + content_files:
                total_size += file_path.stat().st_size

            # Count documents per declared type.
            type_counts: Dict[str, int] = {}
            for metadata_file in metadata_files:
                try:
                    with open(metadata_file, 'r', encoding='utf-8') as f:
                        metadata = json.load(f)
                    doc_type = metadata.get("doc_type", "unknown")
                    type_counts[doc_type] = type_counts.get(doc_type, 0) + 1
                except Exception:
                    continue

            return {
                "total_documents": len(metadata_files),
                "total_size_bytes": total_size,
                "total_size_mb": round(total_size / (1024 * 1024), 2),
                "cache_size": len(self._cache),
                "document_types": type_counts,
                "storage_path": str(self.store_path),
                "metadata_files": len(metadata_files),
                "content_files": len(content_files),
            }
        except Exception as e:
            logger.error(f"Error getting document store stats: {e}")
            return {"error": str(e)}