Spaces:
Sleeping
Sleeping
| """Report metadata management.""" | |
| from typing import Dict, List, Any, Set | |
| from pathlib import Path | |
def _sorted_unique(values: Set[Any]) -> List[Any]:
    """Return ``values`` as a sorted list, tolerating mixed value types."""
    try:
        return sorted(values)
    except TypeError:
        # Mixed types (e.g. a year stored as int in one chunk and as str in
        # another) cannot be compared directly. Order by text form, then by
        # type name, so the result is still deterministic.
        return sorted(values, key=lambda value: (str(value), type(value).__name__))


def get_report_metadata(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Extract metadata from chunks.

    Collects the distinct "source", "filename" and "year" values found in
    each chunk's "metadata" dictionary.

    Args:
        chunks: List of chunk dictionaries. A chunk whose "metadata" entry is
            missing or None contributes nothing but is still counted.

    Returns:
        Empty dictionary when ``chunks`` is empty; otherwise a dictionary with
        the sorted unique "sources", "filenames" and "years", plus
        "total_chunks" (the number of chunks received).
    """
    if not chunks:
        return {}

    sources = set()
    filenames = set()
    years = set()

    for chunk in chunks:
        # "metadata" may be present but None; treat that like a missing entry.
        metadata = chunk.get("metadata") or {}
        if "source" in metadata:
            sources.add(metadata["source"])
        if "filename" in metadata:
            filenames.add(metadata["filename"])
        if "year" in metadata:
            years.add(metadata["year"])

    return {
        "sources": _sorted_unique(sources),
        "filenames": _sorted_unique(filenames),
        "years": _sorted_unique(years),
        "total_chunks": len(chunks),
    }
def get_available_sources() -> List[str]:
    """
    List the report source categories (kept for legacy compatibility).

    Returns:
        A new list of source category names on every call
    """
    # Hard-coded stand-in: the categories would typically come from the
    # original auditqa_old.reports module.
    categories = (
        "Consolidated",
        "Ministry, Department, Agency and Projects",
        "Local Government",
        "Value for Money",
        "Thematic",
        "Hospital",
        "Project",
    )
    return list(categories)
def get_source_subtypes() -> Dict[str, List[str]]:
    """
    Map each report source to its subtypes (placeholder data).

    Returns:
        A new dictionary on every call, keyed by source name, whose values
        are lists of subtype names
    """
    # Placeholder structure: the real mapping was originally imported from
    # auditqa_old.reports.new_files.
    placeholder = (
        ("Consolidated", ("Annual Consolidated OAG 2024", "Annual Consolidated OAG 2023")),
        ("Local Government", ("District Reports", "Municipal Reports")),
        ("Ministry, Department, Agency and Projects", ("Ministry Reports", "Agency Reports")),
        ("Value for Money", ("VFM Reports 2024", "VFM Reports 2023")),
        ("Thematic", ("Thematic Reports 2024", "Thematic Reports 2023")),
        ("Hospital", ("Hospital Reports 2024", "Hospital Reports 2023")),
        ("Project", ("Project Reports 2024", "Project Reports 2023")),
    )
    return {source: list(subtypes) for source, subtypes in placeholder}
def validate_report_filters(
    reports: List[str] = None,
    sources: str = None,
    subtype: List[str] = None,
    available_metadata: Dict[str, Any] = None
) -> Dict[str, Any]:
    """
    Check report filter parameters against the available metadata.

    An unknown source is an error and marks the result invalid. Unknown
    reports and subtypes only produce warnings. Both reports and subtypes are
    looked up in the "filenames" entry of ``available_metadata``.

    Args:
        reports: List of specific report filenames
        sources: Source category
        subtype: List of subtypes
        available_metadata: Metadata to validate against; its "sources" and
            "filenames" entries are read

    Returns:
        Dictionary with the keys "valid" (bool), "warnings" (list of
        messages) and "errors" (list of messages)
    """
    warnings = []
    errors = []
    outcome = {"valid": True, "warnings": warnings, "errors": errors}

    # Nothing to validate against: report that and leave the result valid.
    if not available_metadata:
        warnings.append("No metadata available for validation")
        return outcome

    known_sources = available_metadata.get("sources", [])
    known_filenames = available_metadata.get("filenames", [])

    # Source check: the only check that can invalidate the filters.
    if sources and sources not in known_sources:
        errors.append(f"Source '{sources}' not found in available sources")
        outcome["valid"] = False

    # Report check: unknown filenames are warnings only.
    warnings.extend(
        f"Report '{report}' not found in available reports"
        for report in (reports or [])
        if report not in known_filenames
    )

    # Subtype check: also matched against the known filenames.
    warnings.extend(
        f"Subtype '{sub}' not found in available reports"
        for sub in (subtype or [])
        if sub not in known_filenames
    )

    return outcome
def get_report_statistics(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Get statistics about reports in chunks.

    Args:
        chunks: List of chunk dictionaries. "content" and "metadata" entries
            that are missing or None are treated as empty.

    Returns:
        Empty dictionary when ``chunks`` is empty; otherwise a dictionary with
        "total_chunks", per-value chunk counts under "sources" and "years"
        (chunks lacking the value are counted under "Unknown"),
        "avg_chunk_length" and "total_content_length".
    """
    if not chunks:
        return {}

    source_counts: Dict[Any, int] = {}
    year_counts: Dict[Any, int] = {}
    total_length = 0

    for chunk in chunks:
        # A chunk may carry explicit None values; fall back to empty ones
        # instead of failing on len(None) or None.get(...).
        content = chunk.get("content") or ""
        metadata = chunk.get("metadata") or {}

        total_length += len(content)

        # Count by source
        source = metadata.get("source", "Unknown")
        source_counts[source] = source_counts.get(source, 0) + 1

        # Count by year
        year = metadata.get("year", "Unknown")
        year_counts[year] = year_counts.get(year, 0) + 1

    return {
        "total_chunks": len(chunks),
        "sources": source_counts,
        "years": year_counts,
        # chunks is non-empty here (see the early return), so no zero division.
        "avg_chunk_length": total_length / len(chunks),
        "total_content_length": total_length,
    }
def filter_chunks_by_metadata(
    chunks: List[Dict[str, Any]],
    source_filter: str = None,
    filename_filter: List[str] = None,
    year_filter: List[str] = None
) -> List[Dict[str, Any]]:
    """
    Keep only the chunks whose metadata matches every active filter.

    A filter that is None or empty is skipped. When no filter is active the
    input list itself is returned, not a copy.

    Args:
        chunks: List of chunk dictionaries
        source_filter: Required value of the metadata "source" entry
        filename_filter: Accepted values of the metadata "filename" entry
        year_filter: Accepted values of the metadata "year" entry

    Returns:
        Filtered list of chunks
    """
    # One row per criterion: metadata key, the caller's filter, and the test
    # applied to the chunk's metadata value. Applied in order, one pass each.
    criteria = (
        ("source", source_filter, lambda value: value == source_filter),
        ("filename", filename_filter, lambda value: value in filename_filter),
        ("year", year_filter, lambda value: value in year_filter),
    )

    selected = chunks
    for key, wanted, accepts in criteria:
        if not wanted:
            continue
        selected = [
            item for item in selected
            if accepts(item.get("metadata", {}).get(key))
        ]
    return selected