Spaces:
Sleeping
Sleeping
| import base64 | |
| import os | |
| from models.database import SessionLocal, Report | |
| from models.schemas import MeasurementMetadata | |
| import smtplib | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.base import MIMEBase | |
| from email import encoders | |
| from typing import Optional | |
| from datetime import datetime | |
REPORTS_DIR = "reports"


class PersistenceService:
    """Persist generated PDF reports and deliver them by e-mail.

    The service writes PDF files below ``REPORTS_DIR``, stores report rows
    through the ``SessionLocal`` session factory and the ``Report`` model
    imported from ``models.database``, and can mail a stored PDF as an
    attachment over SMTP.
    """

    def __init__(self):
        """Create the report output directory if it does not exist yet."""
        # exist_ok avoids the check-then-create race when two instances are
        # constructed at the same time.
        os.makedirs(REPORTS_DIR, exist_ok=True)

    def save_to_disk(self, pdf_bytes: bytes, metadata: MeasurementMetadata) -> str:
        """Write the PDF report to ``REPORTS_DIR`` and return its path.

        Args:
            pdf_bytes: Raw content of the PDF file.
            metadata: Object providing ``ship_id`` and ``timestamp``; both
                are used to build the file name.

        Returns:
            The path of the written file (``REPORTS_DIR`` joined with the
            generated file name).
        """
        # A path separator inside ship_id would let the file land outside
        # REPORTS_DIR, so separators are neutralised before building the name.
        safe_ship_id = str(metadata.ship_id).replace("/", "_").replace("\\", "_")
        stamp = metadata.timestamp.strftime("%Y%m%d%H%M%S")
        file_path = os.path.join(REPORTS_DIR, f"report_{safe_ship_id}_{stamp}.pdf")

        # The directory may have been removed since __init__ ran.
        os.makedirs(REPORTS_DIR, exist_ok=True)
        with open(file_path, "wb") as f:
            f.write(pdf_bytes)
        return file_path

    def save_to_db(
        self,
        file_path: str,
        metadata: MeasurementMetadata,
        draft_measurement: float,
        confidence_score: float,
        image_bytes: bytes,
    ):
        """Insert one report row into the database.

        Args:
            file_path: Location of the PDF, stored in ``pdf_path``.
            metadata: Provides ``ship_id``, ``timestamp``, ``latitude`` and
                ``longitude`` for the row.
            draft_measurement: Value stored in ``draft_measurement``.
            confidence_score: Value stored in ``confidence_score``.
            image_bytes: Raw image stored in ``image_bytes``.

        Raises:
            Exception: Whatever the session raises on add/commit; the
                transaction is rolled back before the error propagates.
        """
        db = SessionLocal()
        try:
            db.add(
                Report(
                    ship_id=metadata.ship_id,
                    timestamp=metadata.timestamp,
                    latitude=metadata.latitude,
                    longitude=metadata.longitude,
                    draft_measurement=draft_measurement,
                    confidence_score=confidence_score,
                    pdf_path=file_path,
                    image_bytes=image_bytes,
                )
            )
            db.commit()
        except Exception:
            db.rollback()
            raise
        finally:
            # Always release the session, even when the commit fails.
            db.close()

    def get_all_reports(
        self,
        skip: int = 0,
        limit: int = 10,
        search: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ):
        """Return a page of reports as plain dictionaries.

        Args:
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.
            search: When non-empty, keep only rows whose ``ship_id``
                contains this text.
            start_date: When given, keep only rows with
                ``timestamp >= start_date``.
            end_date: When given, keep only rows with
                ``timestamp <= end_date``.

        Returns:
            A list of dicts with the keys ``id``, ``ship_id``, ``timestamp``,
            ``latitude``, ``longitude``, ``draft_measurement``,
            ``confidence_score``, ``pdf_path`` and ``image_bytes``. A
            non-empty ``image_bytes`` value is returned as a Base64 string.
        """
        column_names = [
            "id", "ship_id", "timestamp", "latitude", "longitude",
            "draft_measurement", "confidence_score", "pdf_path", "image_bytes",
        ]

        db = SessionLocal()
        try:
            query = db.query(
                Report.id,
                Report.ship_id,
                Report.timestamp,
                Report.latitude,
                Report.longitude,
                Report.draft_measurement,
                Report.confidence_score,
                Report.pdf_path,
                Report.image_bytes,
            )
            if search:
                query = query.filter(Report.ship_id.contains(search))
            if start_date is not None:
                query = query.filter(Report.timestamp >= start_date)
            if end_date is not None:
                query = query.filter(Report.timestamp <= end_date)
            # OFFSET/LIMIT without an ORDER BY gives no guarantee that two
            # pages do not overlap; ordering by id makes paging stable.
            rows = query.order_by(Report.id).offset(skip).limit(limit).all()
        finally:
            db.close()

        reports_as_dicts = []
        for row in rows:
            report_dict = dict(zip(column_names, row))
            if report_dict["image_bytes"]:
                # Raw bytes are Base64-encoded into text; empty or missing
                # images are passed through unchanged.
                report_dict["image_bytes"] = base64.b64encode(
                    report_dict["image_bytes"]
                ).decode("utf-8")
            reports_as_dicts.append(report_dict)
        return reports_as_dicts

    def get_report_by_id(self, report_id: int):
        """Return the first ``Report`` whose id equals ``report_id``.

        The result of ``.first()`` is returned as-is after the session has
        been closed (presumably ``None`` when nothing matches - confirm
        against the ORM in use).
        """
        db = SessionLocal()
        try:
            return db.query(Report).filter(Report.id == report_id).first()
        finally:
            db.close()

    @staticmethod
    def _build_message(sender_email, recipient_email, attachment_name, payload):
        """Build the multipart message carrying ``payload`` as an attachment."""
        msg = MIMEMultipart()
        msg["From"] = sender_email
        msg["To"] = recipient_email
        msg["Subject"] = f"Ship Draft Report: {attachment_name}"

        part = MIMEBase("application", "octet-stream")
        part.set_payload(payload)
        encoders.encode_base64(part)
        # Passing filename as a keyword lets the email package quote and
        # encode it instead of relying on a hand-built header string.
        part.add_header("Content-Disposition", "attachment", filename=attachment_name)
        msg.attach(part)
        return msg

    def send_by_email(self, file_path: str, recipient_email: str):
        """Send the PDF report at ``file_path`` to ``recipient_email``.

        SMTP settings are read from the environment so that no credentials
        live in source control:

        * ``SMTP_SERVER`` (default ``smtp.gmail.com``)
        * ``SMTP_PORT`` (default ``587``)
        * ``SMTP_SENDER_EMAIL`` and ``SMTP_SENDER_PASSWORD`` (required)

        Delivery is best effort: configuration and SMTP failures are printed
        and not raised.

        Args:
            file_path: Path of the PDF to attach.
            recipient_email: Address the report is sent to.

        Raises:
            OSError: If ``file_path`` cannot be opened for reading.
        """
        smtp_server = os.environ.get("SMTP_SERVER", "smtp.gmail.com")
        sender_email = os.environ.get("SMTP_SENDER_EMAIL")
        sender_password = os.environ.get("SMTP_SENDER_PASSWORD")

        # Read the attachment first so a missing file is reported to the
        # caller regardless of the SMTP configuration.
        with open(file_path, "rb") as attachment:
            payload = attachment.read()

        if not sender_email or not sender_password:
            print(
                "Failed to send email: SMTP_SENDER_EMAIL and "
                "SMTP_SENDER_PASSWORD must be set"
            )
            return

        msg = self._build_message(
            sender_email, recipient_email, os.path.basename(file_path), payload
        )

        print("\n--- EMAIL SIMULATION ---")
        print(f"Sending email to {recipient_email} from {sender_email}")
        print(f"Attaching file: {file_path}")
        print("--- END EMAIL SIMULATION ---")

        # Broad catch is deliberate: sending is best effort and must not
        # break the caller; the failure is reported on stdout instead.
        try:
            smtp_port = int(os.environ.get("SMTP_PORT", "587"))
            # The context manager closes the connection even when
            # starttls/login/sendmail raises.
            with smtplib.SMTP(smtp_server, smtp_port) as server:
                server.starttls()
                server.login(sender_email, sender_password)
                server.sendmail(sender_email, recipient_email, msg.as_string())
            print("Email sent successfully!")
        except Exception as e:
            print(f"Failed to send email: {e}")