| """ | |
| Service for dark web content operations. | |
| """ | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy.future import select | |
| from sqlalchemy import func, or_, text | |
| from datetime import datetime | |
| from typing import List, Optional, Dict, Any, Union | |
| from src.models.dark_web_content import DarkWebContent, DarkWebMention, ContentType, ContentStatus | |
| from src.models.threat import Threat, ThreatCategory, ThreatSeverity, ThreatStatus | |
| from src.api.schemas import PaginationParams | |


async def create_content(
    db: AsyncSession,
    url: str,
    content: str,
    title: Optional[str] = None,
    content_type: ContentType = ContentType.OTHER,
    content_status: ContentStatus = ContentStatus.NEW,
    source_name: Optional[str] = None,
    source_type: Optional[str] = None,
    language: Optional[str] = None,
    relevance_score: float = 0.0,
    sentiment_score: float = 0.0,
    entity_data: Optional[str] = None,
) -> DarkWebContent:
    """
    Create a new dark web content entry.

    Args:
        db: Database session
        url: URL of the content
        content: Text content
        title: Title of the content
        content_type: Type of content
        content_status: Status of content
        source_name: Name of the source
        source_type: Type of source
        language: Language of the content
        relevance_score: Relevance score (0-1)
        sentiment_score: Sentiment score (-1 to 1)
        entity_data: JSON string of extracted entities

    Returns:
        DarkWebContent: Created content
    """
    # Extract the domain from the URL if possible; a malformed URL is ignored
    # rather than allowed to fail the whole insert
    domain = None
    if url:
        try:
            domain = urlparse(url).netloc or None
        except ValueError:
            domain = None
    db_content = DarkWebContent(
        url=url,
        domain=domain,
        title=title,
        content=content,
        content_type=content_type,
        content_status=content_status,
        source_name=source_name,
        source_type=source_type,
        language=language,
        scraped_at=datetime.utcnow(),
        relevance_score=relevance_score,
        sentiment_score=sentiment_score,
        entity_data=entity_data,
    )
    db.add(db_content)
    await db.commit()
    await db.refresh(db_content)
    return db_content
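

# Example (illustrative sketch): persisting a scraped page from a crawler
# pipeline. Assumes `session` is an AsyncSession obtained from the
# application's session factory, which is not defined in this module:
#
#     page = await create_content(
#         db=session,
#         url="http://example.onion/forum/thread/42",
#         content="...raw page text...",
#         title="Example forum thread",
#         content_type=ContentType.OTHER,
#         source_name="example_forum",
#     )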


async def get_content_by_id(db: AsyncSession, content_id: int) -> Optional[DarkWebContent]:
    """
    Get dark web content by ID.

    Args:
        db: Database session
        content_id: Content ID

    Returns:
        Optional[DarkWebContent]: Content or None if not found
    """
    result = await db.execute(select(DarkWebContent).filter(DarkWebContent.id == content_id))
    return result.scalars().first()


async def get_contents(
    db: AsyncSession,
    pagination: PaginationParams,
    content_type: Optional[List[ContentType]] = None,
    content_status: Optional[List[ContentStatus]] = None,
    source_name: Optional[str] = None,
    search_query: Optional[str] = None,
    from_date: Optional[datetime] = None,
    to_date: Optional[datetime] = None,
) -> List[DarkWebContent]:
    """
    Get dark web contents with filtering and pagination.

    Args:
        db: Database session
        pagination: Pagination parameters
        content_type: Filter by content type
        content_status: Filter by content status
        source_name: Filter by source name
        search_query: Search in title and content
        from_date: Filter by scraped_at >= from_date
        to_date: Filter by scraped_at <= to_date

    Returns:
        List[DarkWebContent]: List of dark web contents
    """
    query = select(DarkWebContent)

    # Apply filters
    if content_type:
        query = query.filter(DarkWebContent.content_type.in_(content_type))
    if content_status:
        query = query.filter(DarkWebContent.content_status.in_(content_status))
    if source_name:
        query = query.filter(DarkWebContent.source_name == source_name)
    if search_query:
        search_filter = or_(
            DarkWebContent.title.ilike(f"%{search_query}%"),
            DarkWebContent.content.ilike(f"%{search_query}%"),
        )
        query = query.filter(search_filter)
    if from_date:
        query = query.filter(DarkWebContent.scraped_at >= from_date)
    if to_date:
        query = query.filter(DarkWebContent.scraped_at <= to_date)

    # Order newest first, then apply offset/limit pagination
    query = query.order_by(DarkWebContent.scraped_at.desc())
    query = query.offset((pagination.page - 1) * pagination.size).limit(pagination.size)

    result = await db.execute(query)
    return result.scalars().all()
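

# Example (illustrative sketch): fetching the first page of new contents that
# match a search term. PaginationParams is assumed here to accept `page` and
# `size` keyword arguments, matching the attributes the function reads:
#
#     contents = await get_contents(
#         db=session,
#         pagination=PaginationParams(page=1, size=20),
#         content_status=[ContentStatus.NEW],
#         search_query="ransomware",
#     )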


async def count_contents(
    db: AsyncSession,
    content_type: Optional[List[ContentType]] = None,
    content_status: Optional[List[ContentStatus]] = None,
    source_name: Optional[str] = None,
    search_query: Optional[str] = None,
    from_date: Optional[datetime] = None,
    to_date: Optional[datetime] = None,
) -> int:
    """
    Count dark web contents with filtering.

    Args:
        db: Database session
        content_type: Filter by content type
        content_status: Filter by content status
        source_name: Filter by source name
        search_query: Search in title and content
        from_date: Filter by scraped_at >= from_date
        to_date: Filter by scraped_at <= to_date

    Returns:
        int: Count of dark web contents
    """
    query = select(func.count(DarkWebContent.id))

    # Apply the same filters as in get_contents so the count matches the list
    if content_type:
        query = query.filter(DarkWebContent.content_type.in_(content_type))
    if content_status:
        query = query.filter(DarkWebContent.content_status.in_(content_status))
    if source_name:
        query = query.filter(DarkWebContent.source_name == source_name)
    if search_query:
        search_filter = or_(
            DarkWebContent.title.ilike(f"%{search_query}%"),
            DarkWebContent.content.ilike(f"%{search_query}%"),
        )
        query = query.filter(search_filter)
    if from_date:
        query = query.filter(DarkWebContent.scraped_at >= from_date)
    if to_date:
        query = query.filter(DarkWebContent.scraped_at <= to_date)

    result = await db.execute(query)
    # scalar_one() guarantees an int: COUNT always returns exactly one row
    return result.scalar_one()
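

# Example (illustrative sketch): get_contents and count_contents take the same
# filters, so a paginated API response can pair one page of rows with the
# total match count:
#
#     items = await get_contents(db=session, pagination=params, source_name="example_forum")
#     total = await count_contents(db=session, source_name="example_forum")
#     pages = (total + params.size - 1) // params.size  # ceiling division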


async def create_mention(
    db: AsyncSession,
    content_id: int,
    keyword: str,
    keyword_category: Optional[str] = None,
    context: Optional[str] = None,
    snippet: Optional[str] = None,
    mention_type: Optional[str] = None,
    confidence: float = 0.0,
    is_verified: bool = False,
) -> DarkWebMention:
    """
    Create a new dark web mention.

    Args:
        db: Database session
        content_id: ID of the content where the mention was found
        keyword: Keyword that was mentioned
        keyword_category: Category of the keyword
        context: Text surrounding the mention
        snippet: Extract of text containing the mention
        mention_type: Type of mention
        confidence: Confidence score (0-1)
        is_verified: Whether the mention is verified

    Returns:
        DarkWebMention: Created mention
    """
    db_mention = DarkWebMention(
        content_id=content_id,
        keyword=keyword,
        keyword_category=keyword_category,
        context=context,
        snippet=snippet,
        mention_type=mention_type,
        confidence=confidence,
        is_verified=is_verified,
    )
    db.add(db_mention)
    await db.commit()
    await db.refresh(db_mention)
    return db_mention
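

# Example (illustrative sketch): recording a keyword hit found while scanning
# a stored page. `page` is assumed to be a DarkWebContent row created earlier:
#
#     mention = await create_mention(
#         db=session,
#         content_id=page.id,
#         keyword="acme-corp",
#         keyword_category="organization",
#         snippet="...selling acme-corp VPN creds...",
#         confidence=0.8,
#     )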


async def get_mention_by_id(db: AsyncSession, mention_id: int) -> Optional[DarkWebMention]:
    """
    Get dark web mention by ID.

    Args:
        db: Database session
        mention_id: Mention ID

    Returns:
        Optional[DarkWebMention]: Mention or None if not found
    """
    result = await db.execute(select(DarkWebMention).filter(DarkWebMention.id == mention_id))
    return result.scalars().first()


async def get_mentions(
    db: AsyncSession,
    pagination: PaginationParams,
    keyword: Optional[str] = None,
    content_id: Optional[int] = None,
    is_verified: Optional[bool] = None,
    from_date: Optional[datetime] = None,
    to_date: Optional[datetime] = None,
) -> List[DarkWebMention]:
    """
    Get dark web mentions with filtering and pagination.

    Args:
        db: Database session
        pagination: Pagination parameters
        keyword: Filter by keyword
        content_id: Filter by content ID
        is_verified: Filter by verification status
        from_date: Filter by created_at >= from_date
        to_date: Filter by created_at <= to_date

    Returns:
        List[DarkWebMention]: List of dark web mentions
    """
    query = select(DarkWebMention)

    # Apply filters (explicit None checks so 0/False are not silently skipped)
    if keyword:
        query = query.filter(DarkWebMention.keyword.ilike(f"%{keyword}%"))
    if content_id is not None:
        query = query.filter(DarkWebMention.content_id == content_id)
    if is_verified is not None:
        query = query.filter(DarkWebMention.is_verified == is_verified)
    if from_date:
        query = query.filter(DarkWebMention.created_at >= from_date)
    if to_date:
        query = query.filter(DarkWebMention.created_at <= to_date)

    # Order newest first, then apply offset/limit pagination
    query = query.order_by(DarkWebMention.created_at.desc())
    query = query.offset((pagination.page - 1) * pagination.size).limit(pagination.size)

    result = await db.execute(query)
    return result.scalars().all()
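

# Example (illustrative sketch): pulling unverified mentions of a keyword for
# analyst review, newest first:
#
#     pending = await get_mentions(
#         db=session,
#         pagination=PaginationParams(page=1, size=50),
#         keyword="acme-corp",
#         is_verified=False,
#     )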


async def create_threat_from_content(
    db: AsyncSession,
    content_id: int,
    title: str,
    description: str,
    severity: ThreatSeverity,
    category: ThreatCategory,
    confidence_score: float = 0.0,
) -> Threat:
    """
    Create a threat from dark web content.

    Args:
        db: Database session
        content_id: ID of the content
        title: Threat title
        description: Threat description
        severity: Threat severity
        category: Threat category
        confidence_score: Confidence score (0-1)

    Returns:
        Threat: Created threat

    Raises:
        ValueError: If no content exists with the given ID
    """
    # Get the content; fail fast if it does not exist
    content = await get_content_by_id(db, content_id)
    if not content:
        raise ValueError(f"Content with ID {content_id} not found")

    # Local import so this module and threat_service can depend on each other
    # without a circular import at load time
    from src.api.services.threat_service import create_threat

    threat = await create_threat(
        db=db,
        title=title,
        description=description,
        severity=severity,
        category=category,
        status=ThreatStatus.NEW,
        source_url=content.url,
        source_name=content.source_name,
        source_type=content.source_type,
        confidence_score=confidence_score,
    )
    return threat
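

# Example (illustrative sketch): escalating a stored page to a threat record.
# The specific ThreatSeverity/ThreatCategory members used below are
# assumptions about those enums, which are defined in src.models.threat:
#
#     threat = await create_threat_from_content(
#         db=session,
#         content_id=page.id,
#         title="Possible credential sale",
#         description="Forum post offering corporate VPN credentials.",
#         severity=ThreatSeverity.HIGH,         # assumed member name
#         category=ThreatCategory.DATA_BREACH,  # assumed member name
#         confidence_score=0.7,
#     )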