|
|
import cv2 |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import io |
|
|
import base64 |
|
|
from typing import List, Dict, Any, Optional |
|
|
import torch |
|
|
from groq import Groq |
|
|
from config.settings import settings |
|
|
|
|
|
class ImageService: |
|
|
def __init__(self, groq_client: Groq): |
|
|
self.client = groq_client |
|
|
self.ocr_processor = None |
|
|
self.easy_ocr_reader = None |
|
|
self._initialize_ocr_models() |
|
|
|
|
|
def _initialize_ocr_models(self): |
|
|
"""Khởi tạo các model OCR với xử lý lỗi tốt hơn""" |
|
|
try: |
|
|
print("🔄 Đang khởi tạo OCR models...") |
|
|
|
|
|
|
|
|
try: |
|
|
import easyocr |
|
|
|
|
|
|
|
|
safe_languages = ['en', 'vi'] |
|
|
|
|
|
|
|
|
try: |
|
|
safe_languages.append('fr') |
|
|
safe_languages.append('es') |
|
|
safe_languages.append('de') |
|
|
except: |
|
|
print("⚠️ Một số ngôn ngữ châu Âu không khả dụng") |
|
|
|
|
|
|
|
|
try: |
|
|
safe_languages.append('ja') |
|
|
except: |
|
|
print("⚠️ Tiếng Nhật không khả dụng") |
|
|
|
|
|
|
|
|
try: |
|
|
safe_languages.append('ko') |
|
|
except: |
|
|
print("⚠️ Tiếng Hàn không khả dụng") |
|
|
|
|
|
|
|
|
try: |
|
|
safe_languages.append('ch_sim') |
|
|
except: |
|
|
print("⚠️ Tiếng Trung không khả dụng") |
|
|
|
|
|
print(f"🎯 Ngôn ngữ OCR được hỗ trợ: {safe_languages}") |
|
|
|
|
|
self.easy_ocr_reader = easyocr.Reader( |
|
|
safe_languages, |
|
|
gpu=torch.cuda.is_available() |
|
|
) |
|
|
print("✅ EasyOCR initialized successfully") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Lỗi khởi tạo EasyOCR: {e}") |
|
|
print("🔄 Thử khởi tạo chỉ với tiếng Anh...") |
|
|
try: |
|
|
self.easy_ocr_reader = easyocr.Reader(['en']) |
|
|
print("✅ EasyOCR initialized with English only") |
|
|
except Exception as e2: |
|
|
print(f"❌ Không thể khởi tạo EasyOCR: {e2}") |
|
|
self.easy_ocr_reader = None |
|
|
|
|
|
|
|
|
try: |
|
|
from manga_ocr import MangaOcr |
|
|
self.ocr_processor = MangaOcr() |
|
|
print("✅ MangaOCR initialized successfully") |
|
|
except ImportError: |
|
|
print("❌ MangaOCR not installed, installing...") |
|
|
import subprocess |
|
|
subprocess.run(["pip", "install", "manga-ocr"], check=True) |
|
|
from manga_ocr import MangaOcr |
|
|
self.ocr_processor = MangaOcr() |
|
|
print("✅ MangaOCR installed and initialized") |
|
|
except Exception as e: |
|
|
print(f"❌ Lỗi khởi tạo MangaOCR: {e}") |
|
|
self.ocr_processor = None |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Lỗi khởi tạo OCR models: {e}") |
|
|
|
|
|
def preprocess_image(self, image: np.ndarray) -> np.ndarray: |
|
|
"""Tiền xử lý ảnh để cải thiện OCR accuracy""" |
|
|
try: |
|
|
|
|
|
if image.dtype != np.uint8: |
|
|
image = image.astype(np.uint8) |
|
|
|
|
|
|
|
|
if len(image.shape) == 3: |
|
|
if image.shape[2] == 4: |
|
|
image = cv2.cvtColor(image, cv2.COLOR_RGBA2RGB) |
|
|
gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) |
|
|
else: |
|
|
gray = image |
|
|
|
|
|
|
|
|
height, width = gray.shape |
|
|
max_dimension = 1600 |
|
|
if max(height, width) > max_dimension: |
|
|
scale = max_dimension / max(height, width) |
|
|
new_width = int(width * scale) |
|
|
new_height = int(height * scale) |
|
|
gray = cv2.resize(gray, (new_width, new_height), interpolation=cv2.INTER_AREA) |
|
|
|
|
|
|
|
|
|
|
|
denoised = cv2.medianBlur(gray, 3) |
|
|
|
|
|
|
|
|
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) |
|
|
enhanced = clahe.apply(denoised) |
|
|
|
|
|
|
|
|
_, binary = cv2.threshold(enhanced, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) |
|
|
|
|
|
return binary |
|
|
|
|
|
except Exception as e: |
|
|
print(f"⚠️ Lỗi tiền xử lý ảnh: {e}") |
|
|
return image |
|
|
|
|
|
def extract_text_easyocr(self, image: np.ndarray) -> List[Dict[str, Any]]: |
|
|
"""Trích xuất text sử dụng EasyOCR với xử lý lỗi""" |
|
|
if self.easy_ocr_reader is None: |
|
|
print("❌ EasyOCR chưa được khởi tạo") |
|
|
return [] |
|
|
|
|
|
try: |
|
|
print("🔍 EasyOCR đang xử lý ảnh...") |
|
|
|
|
|
|
|
|
processed_image = self.preprocess_image(image) |
|
|
|
|
|
|
|
|
if processed_image.dtype != np.uint8: |
|
|
processed_image = processed_image.astype(np.uint8) |
|
|
|
|
|
|
|
|
results = self.easy_ocr_reader.readtext( |
|
|
processed_image, |
|
|
detail=1, |
|
|
paragraph=True, |
|
|
batch_size=4, |
|
|
contrast_ths=0.3, |
|
|
adjust_contrast=0.5, |
|
|
text_threshold=0.5, |
|
|
link_threshold=0.4 |
|
|
) |
|
|
|
|
|
|
|
|
extracted_texts = [] |
|
|
for bbox, text, confidence in results: |
|
|
clean_text = text.strip() |
|
|
if clean_text and len(clean_text) > 1: |
|
|
extracted_texts.append({ |
|
|
'text': clean_text, |
|
|
'confidence': float(confidence), |
|
|
'bbox': bbox, |
|
|
'source': 'easyocr' |
|
|
}) |
|
|
|
|
|
print(f"📊 EasyOCR tìm thấy {len(extracted_texts)} vùng văn bản") |
|
|
return extracted_texts |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Lỗi EasyOCR: {e}") |
|
|
return [] |
|
|
|
|
|
def extract_text_mangaocr(self, image: np.ndarray) -> List[Dict[str, Any]]: |
|
|
"""Trích xuất text sử dụng MangaOCR (tốt cho tiếng Việt)""" |
|
|
if self.ocr_processor is None: |
|
|
print("❌ MangaOCR chưa được khởi tạo") |
|
|
return [] |
|
|
|
|
|
try: |
|
|
print("🔍 MangaOCR đang xử lý ảnh...") |
|
|
|
|
|
|
|
|
if len(image.shape) == 3: |
|
|
pil_image = Image.fromarray(image) |
|
|
else: |
|
|
pil_image = Image.fromarray(image).convert('RGB') |
|
|
|
|
|
|
|
|
text = self.ocr_processor(pil_image) |
|
|
|
|
|
if text and len(text.strip()) > 1: |
|
|
return [{ |
|
|
'text': text.strip(), |
|
|
'confidence': 0.8, |
|
|
'bbox': None, |
|
|
'source': 'manga_ocr' |
|
|
}] |
|
|
else: |
|
|
return [] |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Lỗi MangaOCR: {e}") |
|
|
return [] |
|
|
|
|
|
def extract_text_pytesseract(self, image: np.ndarray) -> List[Dict[str, Any]]: |
|
|
"""Trích xuất text sử dụng pytesseract (fallback)""" |
|
|
try: |
|
|
import pytesseract |
|
|
|
|
|
|
|
|
processed_image = self.preprocess_image(image) |
|
|
|
|
|
|
|
|
custom_config = r'--oem 3 --psm 6 -l vie+eng' |
|
|
|
|
|
|
|
|
text = pytesseract.image_to_string(processed_image, config=custom_config) |
|
|
|
|
|
if text and len(text.strip()) > 1: |
|
|
return [{ |
|
|
'text': text.strip(), |
|
|
'confidence': 0.7, |
|
|
'bbox': None, |
|
|
'source': 'pytesseract' |
|
|
}] |
|
|
else: |
|
|
return [] |
|
|
|
|
|
except ImportError: |
|
|
print("⚠️ pytesseract chưa được cài đặt") |
|
|
return [] |
|
|
except Exception as e: |
|
|
print(f"❌ Lỗi pytesseract: {e}") |
|
|
return [] |
|
|
|
|
|
def merge_ocr_results(self, results_list: List[List]) -> str: |
|
|
"""Kết hợp và chọn lọc kết quả từ nhiều OCR engine""" |
|
|
all_texts = [] |
|
|
|
|
|
for results in results_list: |
|
|
for result in results: |
|
|
if result['confidence'] > 0.4: |
|
|
all_texts.append(result['text']) |
|
|
|
|
|
|
|
|
unique_texts = [] |
|
|
seen_texts = set() |
|
|
|
|
|
for text in all_texts: |
|
|
clean_text = text.strip() |
|
|
|
|
|
if (clean_text and |
|
|
len(clean_text) > 2 and |
|
|
any(c.isalnum() for c in clean_text) and |
|
|
clean_text not in seen_texts): |
|
|
unique_texts.append(clean_text) |
|
|
seen_texts.add(clean_text) |
|
|
|
|
|
if unique_texts: |
|
|
combined_text = "\n".join(unique_texts) |
|
|
print(f"✅ Văn bản trích xuất: {combined_text[:200]}...") |
|
|
return combined_text |
|
|
else: |
|
|
return "Không phát hiện được văn bản trong ảnh." |
|
|
|
|
|
def extract_text_from_image(self, image: np.ndarray) -> str: |
|
|
"""Trích xuất văn bản từ ảnh sử dụng nhiều OCR engine""" |
|
|
if image is None: |
|
|
return "Không có ảnh được tải lên." |
|
|
|
|
|
try: |
|
|
print("🔍 Đang trích xuất văn bản từ ảnh...") |
|
|
|
|
|
|
|
|
all_results = [] |
|
|
|
|
|
|
|
|
easyocr_results = self.extract_text_easyocr(image) |
|
|
all_results.append(easyocr_results) |
|
|
|
|
|
|
|
|
mangaocr_results = self.extract_text_mangaocr(image) |
|
|
all_results.append(mangaocr_results) |
|
|
|
|
|
|
|
|
pytesseract_results = self.extract_text_pytesseract(image) |
|
|
all_results.append(pytesseract_results) |
|
|
|
|
|
|
|
|
merged_text = self.merge_ocr_results(all_results) |
|
|
|
|
|
return merged_text |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Lỗi trích xuất văn bản: {e}") |
|
|
return f"Lỗi khi trích xuất văn bản: {str(e)}" |
|
|
|
|
|
def analyze_text_with_llm(self, extracted_text: str, user_description: str = "") -> str: |
|
|
"""Phân tích văn bản trích xuất được bằng LLM""" |
|
|
try: |
|
|
if not extracted_text or "Không phát hiện" in extracted_text: |
|
|
|
|
|
prompt = """ |
|
|
Tôi đã tải lên một hình ảnh nhưng không thể trích xuất được văn bản từ đó. |
|
|
Đây có thể là ảnh chụp, hình minh họa, hoặc ảnh không có chữ. |
|
|
|
|
|
Hãy: |
|
|
1. Mô tả tổng quan về loại hình ảnh này có thể là gì |
|
|
2. Đưa ra các phán đoán về nội dung dựa trên đặc điểm chung |
|
|
3. Gợi ý loại ảnh nào thường chứa văn bản để trích xuất |
|
|
""" |
|
|
else: |
|
|
if user_description: |
|
|
prompt = f""" |
|
|
NGƯỜI DÙNG MÔ TẢ: "{user_description}" |
|
|
|
|
|
VĂN BẢN TRÍCH XUẤT TỪ ẢNH: |
|
|
{extracted_text} |
|
|
|
|
|
Hãy phân tích bằng tiếng Việt: |
|
|
1. Tóm tắt nội dung chính của văn bản |
|
|
2. Giải thích ý nghĩa trong ngữ cảnh người dùng mô tả |
|
|
3. Đưa ra thông tin bổ sung hoặc gợi ý liên quan |
|
|
""" |
|
|
else: |
|
|
prompt = f""" |
|
|
VĂN BẢN TRÍCH XUẤT TỪ ẢNH: |
|
|
{extracted_text} |
|
|
|
|
|
Hãy phân tích bằng tiếng Việt: |
|
|
1. Loại văn bản này là gì? (tài liệu, quảng cáo, tin nhắn, v.v.) |
|
|
2. Nội dung chính và thông tin quan trọng |
|
|
3. Ngữ cảnh có thể và ý nghĩa |
|
|
4. Thông tin hữu ích khác từ văn bản |
|
|
""" |
|
|
|
|
|
|
|
|
completion = self.client.chat.completions.create( |
|
|
model=settings.DEFAULT_LLM_MODEL, |
|
|
messages=[ |
|
|
{ |
|
|
"role": "system", |
|
|
"content": "Bạn là chuyên gia phân tích hình ảnh và văn bản. Hãy trả lời bằng tiếng Việt tự nhiên, rõ ràng và hữu ích. Nếu không có văn bản, hãy mô tả ảnh một cách thông minh." |
|
|
}, |
|
|
{ |
|
|
"role": "user", |
|
|
"content": prompt |
|
|
} |
|
|
], |
|
|
max_tokens=800, |
|
|
temperature=0.7 |
|
|
) |
|
|
|
|
|
return completion.choices[0].message.content |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Lỗi phân tích với LLM: {e}") |
|
|
return f"Lỗi khi phân tích với AI: {str(e)}" |
|
|
|
|
|
def analyze_image_with_description(self, image, user_description: str = "") -> str: |
|
|
"""Phân tích ảnh hoàn chỉnh: OCR + LLM""" |
|
|
if image is None: |
|
|
return "❌ Vui lòng tải lên một hình ảnh để phân tích." |
|
|
|
|
|
try: |
|
|
|
|
|
extracted_text = self.extract_text_from_image(image) |
|
|
|
|
|
|
|
|
analysis_result = self.analyze_text_with_llm(extracted_text, user_description) |
|
|
|
|
|
|
|
|
result = f"""📊 **KẾT QUẢ PHÂN TÍCH HÌNH ẢNH** |
|
|
|
|
|
🔍 **Văn bản trích xuất được:** |
|
|
{extracted_text} |
|
|
|
|
|
🤖 **Phân tích AI:** |
|
|
{analysis_result} |
|
|
|
|
|
--- |
|
|
*Hệ thống sử dụng đa OCR engine (EasyOCR, MangaOCR) kết hợp AI* |
|
|
*Độ chính xác phụ thuộc vào chất lượng ảnh và độ phức tạp của văn bản*""" |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Lỗi phân tích ảnh: {e}") |
|
|
return f"❌ Lỗi trong quá trình phân tích: {str(e)}" |