# draft-estimation/services/persistence_service.py
# Web file-viewer header residue kept for provenance:
#   PauloFN · commit message "first" · commit 6a6918c · 5.33 kB
import base64
import os
from models.database import SessionLocal, Report
from models.schemas import MeasurementMetadata
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from typing import Optional
from datetime import datetime
# Directory (relative to the process working directory) where PDF reports are written.
REPORTS_DIR = "reports"


class PersistenceService:
    """Persist generated reports: write PDFs to disk, store rows, e-mail files.

    Database access goes through ``SessionLocal`` / ``Report``; every method
    opens its own session and always closes it, even when the operation fails.
    """

    # Single source of truth for the columns returned by ``get_all_reports``:
    # used both to build the SELECT and to name the keys of each result dict.
    _REPORT_COLUMNS = (
        "id",
        "ship_id",
        "timestamp",
        "latitude",
        "longitude",
        "draft_measurement",
        "confidence_score",
        "pdf_path",
        "image_bytes",
    )

    def __init__(self):
        """Create the reports directory if it does not exist yet."""
        # exist_ok avoids the check-then-create race between two processes.
        os.makedirs(REPORTS_DIR, exist_ok=True)

    def save_to_disk(self, pdf_bytes: bytes, metadata: MeasurementMetadata) -> str:
        """Write the PDF report into ``REPORTS_DIR`` and return its path.

        The file is named ``report_<ship_id>_<YYYYmmddHHMMSS>.pdf`` from
        ``metadata.ship_id`` and ``metadata.timestamp``.

        Args:
            pdf_bytes: Raw content of the PDF file.
            metadata: Object providing ``ship_id`` and ``timestamp``.

        Returns:
            The path of the written file (``REPORTS_DIR`` joined with the name).

        Raises:
            OSError: If the file cannot be written.
        """
        # Path separators inside the ship id would otherwise point the file at
        # a sub-directory (or outside REPORTS_DIR), so they are neutralised.
        safe_ship_id = str(metadata.ship_id).replace("/", "_").replace("\\", "_")
        stamp = metadata.timestamp.strftime("%Y%m%d%H%M%S")
        file_path = os.path.join(REPORTS_DIR, f"report_{safe_ship_id}_{stamp}.pdf")
        with open(file_path, "wb") as f:
            f.write(pdf_bytes)
        return file_path

    def save_to_db(
        self,
        file_path: str,
        metadata: MeasurementMetadata,
        draft_measurement: float,
        confidence_score: float,
        image_bytes: bytes,
    ):
        """Insert one ``Report`` row describing a generated report.

        Args:
            file_path: Location of the PDF, stored in ``pdf_path``.
            metadata: Provides ``ship_id``, ``timestamp``, ``latitude`` and
                ``longitude`` for the row.
            draft_measurement: Stored as-is in ``draft_measurement``.
            confidence_score: Stored as-is in ``confidence_score``.
            image_bytes: Stored as-is in ``image_bytes``.

        Raises:
            Exception: Whatever the database layer raises; the transaction is
                rolled back and the session closed before it propagates.
        """
        db = SessionLocal()
        try:
            db_report = Report(
                ship_id=metadata.ship_id,
                timestamp=metadata.timestamp,
                latitude=metadata.latitude,
                longitude=metadata.longitude,
                draft_measurement=draft_measurement,
                confidence_score=confidence_score,
                pdf_path=file_path,
                image_bytes=image_bytes,
            )
            db.add(db_report)
            db.commit()
            db.refresh(db_report)
        except Exception:
            # Leave no half-open transaction behind on the connection.
            db.rollback()
            raise
        finally:
            db.close()

    def get_all_reports(
        self,
        skip: int = 0,
        limit: int = 10,
        search: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ):
        """Return one page of reports as dictionaries, optionally filtered.

        Args:
            skip: Number of rows to skip (OFFSET).
            limit: Maximum number of rows to return (LIMIT).
            search: If non-empty, keep only rows whose ``ship_id`` contains it.
            start_date: If given, keep only rows with ``timestamp >= start_date``.
            end_date: If given, keep only rows with ``timestamp <= end_date``.

        Returns:
            A list of dicts keyed by ``_REPORT_COLUMNS``. A non-empty
            ``image_bytes`` value is replaced by its Base64 text form; empty
            or missing values are returned unchanged.
        """
        db = SessionLocal()
        try:
            query = db.query(*(getattr(Report, name) for name in self._REPORT_COLUMNS))
            if search:
                query = query.filter(Report.ship_id.contains(search))
            if start_date is not None:
                query = query.filter(Report.timestamp >= start_date)
            if end_date is not None:
                query = query.filter(Report.timestamp <= end_date)
            # OFFSET/LIMIT without ORDER BY has no defined row order, which
            # makes pages overlap or skip rows; order by primary key instead.
            rows = query.order_by(Report.id).offset(skip).limit(limit).all()
        finally:
            db.close()

        reports_as_dicts = []
        for row in rows:
            report_dict = dict(zip(self._REPORT_COLUMNS, row))
            if report_dict["image_bytes"]:
                report_dict["image_bytes"] = base64.b64encode(
                    report_dict["image_bytes"]
                ).decode("utf-8")
            reports_as_dicts.append(report_dict)
        return reports_as_dicts

    def get_report_by_id(self, report_id: int):
        """Return the ``Report`` with the given id, or ``None`` if not found.

        The session is closed before returning, so the result is no longer
        attached to a session.
        """
        db = SessionLocal()
        try:
            return db.query(Report).filter(Report.id == report_id).first()
        finally:
            db.close()

    def send_by_email(self, file_path: str, recipient_email: str) -> bool:
        """Send the report at ``file_path`` to ``recipient_email`` as an attachment.

        SMTP settings are read from the environment so that no credentials
        live in the source tree:

        * ``SMTP_SENDER_EMAIL`` / ``SMTP_SENDER_PASSWORD`` - required login.
        * ``SMTP_SERVER`` - host, defaults to ``smtp.gmail.com``.
        * ``SMTP_PORT`` - port, defaults to ``587`` (STARTTLS is used).

        Delivery is best-effort: configuration and SMTP failures are printed
        and reported through the return value instead of being raised.

        Args:
            file_path: Path of the file to attach.
            recipient_email: Address the message is sent to.

        Returns:
            ``True`` if the message was handed to the SMTP server, ``False``
            otherwise.

        Raises:
            OSError: If SMTP is configured but ``file_path`` cannot be read.
        """
        # NOTE(review): earlier revisions embedded a real account password
        # here; that credential should be treated as leaked and rotated.
        smtp_server = os.environ.get("SMTP_SERVER", "smtp.gmail.com")
        sender_email = os.environ.get("SMTP_SENDER_EMAIL")
        sender_password = os.environ.get("SMTP_SENDER_PASSWORD")
        if not sender_email or not sender_password:
            print(
                f"Failed to send email to {recipient_email}: "
                "SMTP_SENDER_EMAIL and SMTP_SENDER_PASSWORD must be set"
            )
            return False
        try:
            smtp_port = int(os.environ.get("SMTP_PORT", "587"))
        except ValueError:
            print(f"Failed to send email to {recipient_email}: SMTP_PORT is not an integer")
            return False

        msg = self._build_message(file_path, sender_email, recipient_email)
        print(f"Sending email to {recipient_email} from {sender_email}")
        print(f"Attaching file: {file_path}")
        try:
            # The context manager closes the connection even when a step
            # fails; the timeout keeps an unreachable server from hanging us.
            with smtplib.SMTP(smtp_server, smtp_port, timeout=30) as server:
                server.starttls()
                server.login(sender_email, sender_password)
                server.sendmail(sender_email, recipient_email, msg.as_string())
        except Exception as e:  # best-effort by design: report, do not raise
            print(f"Failed to send email: {e}")
            return False
        print("Email sent successfully!")
        return True

    @staticmethod
    def _build_message(file_path: str, sender_email: str, recipient_email: str) -> MIMEMultipart:
        """Build the multipart message carrying ``file_path`` as an attachment."""
        filename = os.path.basename(file_path)
        msg = MIMEMultipart()
        msg["From"] = sender_email
        msg["To"] = recipient_email
        msg["Subject"] = f"Ship Draft Report: {filename}"
        part = MIMEBase("application", "octet-stream")
        with open(file_path, "rb") as attachment:
            part.set_payload(attachment.read())
        encoders.encode_base64(part)
        # Passing filename as a header parameter lets the email package quote
        # and encode it correctly instead of hand-formatting the header value.
        part.add_header("Content-Disposition", "attachment", filename=filename)
        msg.attach(part)
        return msg