|
|
from flask import Flask, render_template, Response, flash, redirect, url_for |
|
|
import cv2 |
|
|
import numpy as np |
|
|
from unstructured.partition.pdf import partition_pdf |
|
|
import json, base64, io, os |
|
|
from PIL import Image, ImageEnhance, ImageDraw |
|
|
from imutils.perspective import four_point_transform |
|
|
from dotenv import load_dotenv |
|
|
import pytesseract |
|
|
from transformers import BlipProcessor, BlipForConditionalGeneration |
|
|
|
|
|
# Load settings such as SECRET_KEY from a local .env file into the environment.
load_dotenv()

app = Flask(__name__)

# flash() stores its messages in the session, and Flask refuses to use the
# session without a secret key. When SECRET_KEY is not configured, fall back to
# a random per-process key so flashing still works (sessions are then simply
# invalidated on every restart).
app.secret_key = os.getenv("SECRET_KEY") or os.urandom(32)

# The original Windows install locations stay as defaults; the TESSERACT_CMD
# and POPPLER_PATH environment variables allow overriding them per machine
# without editing the source.
pytesseract.pytesseract.tesseract_cmd = os.getenv(
    "TESSERACT_CMD", r"C:\Program Files\Tesseract-OCR\tesseract.exe"
)
poppler_path = os.getenv("POPPLER_PATH", r"C:\poppler-23.11.0\Library\bin")
|
|
|
|
|
# Number of documents saved so far in this process; used as the numeric
# suffix of the generated file names.
count = 0

# Output layout: one root folder with a sub-folder per kind of artefact.
OUTPUT_FOLDER = "OUTPUTS"
IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE")
DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "DETECTED_IMAGE")
PDF_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_PDF")
JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON")

# Create the whole tree up front so later writes never hit a missing folder.
for path in (
    OUTPUT_FOLDER,
    IMAGE_FOLDER_PATH,
    DETECTED_IMAGE_FOLDER_PATH,
    PDF_FOLDER_PATH,
    JSON_FOLDER_PATH,
):
    os.makedirs(path, exist_ok=True)

# Capture device with index 0; read by the streaming generator.
camera = cv2.VideoCapture(0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect_document_contour(image):
    """Locate the largest four-cornered outline in a BGR frame.

    The frame is converted to grayscale, blurred with a 5x5 Gaussian and
    binarised with Otsu's threshold. Contours are then visited from the
    largest area to the smallest.

    Args:
        image: BGR image array.

    Returns:
        The polygonal approximation (tolerance: 2% of the perimeter) of the
        first contour whose area exceeds 1000 and whose approximation has
        exactly four vertices, or None when no contour qualifies.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    smoothed = cv2.GaussianBlur(grayscale, (5, 5), 0)
    _, binary = cv2.threshold(smoothed, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    candidates, _ = cv2.findContours(binary, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

    for candidate in sorted(candidates, key=cv2.contourArea, reverse=True):
        # Ignore small blobs; only sizeable regions can be the document.
        if cv2.contourArea(candidate) <= 1000:
            continue

        perimeter = cv2.arcLength(candidate, True)
        polygon = cv2.approxPolyDP(candidate, 0.02 * perimeter, True)
        if len(polygon) == 4:
            return polygon

    return None
|
|
|
|
|
def load_image(image_path):
    """Read an image file from disk with OpenCV.

    Args:
        image_path: Path of the file to read. Its extension (compared
            case-insensitively) must be one of .png, .jpg, .jpeg, .webp
            or .tiff.

    Returns:
        The image array produced by ``cv2.imread``.

    Raises:
        ValueError: If the extension is not supported, or if OpenCV could
            not decode the file.
    """
    ext = os.path.splitext(image_path)[1].lower()
    if ext not in ('.png', '.jpg', '.jpeg', '.webp', '.tiff'):
        raise ValueError(f"Unsupported image format: {ext}")

    image = cv2.imread(image_path)

    # cv2.imread reports failure by returning None instead of raising, so the
    # result has to be validated before anything else touches it. No preview
    # window is opened here: cv2.imshow without a following cv2.waitKey never
    # renders, and it raises on builds without GUI support.
    if image is None:
        raise ValueError(f"Failed to load image from {image_path}. The file may be corrupted or unreadable.")

    print(f"Image : {image}")
    return image
|
|
|
|
|
|
|
|
def upscale_image(image, scale=2):
    """Resize an image by a uniform factor using bicubic interpolation.

    Args:
        image: Image array to resize.
        scale: Positive multiplier applied to both width and height. May be
            fractional (e.g. 1.5).

    Returns:
        The resized image array.

    Raises:
        ValueError: If ``scale`` is not greater than zero.
    """
    if scale <= 0:
        raise ValueError(f"scale must be positive, got {scale}")

    # Passing fx/fy lets OpenCV derive the (integer) output size itself, so
    # fractional factors work; an explicit (width * scale, height * scale)
    # tuple would contain floats and be rejected by cv2.resize.
    upscaled_image = cv2.resize(image, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
    print(f"UPSCALE IMAGE : {upscaled_image}")
    return upscaled_image
|
|
|
|
|
|
|
|
def reduce_noise(image):
    """Denoise a colour image with OpenCV's non-local means filter.

    Args:
        image: Colour image array.

    Returns:
        The denoised image array.
    """
    # Arguments follow fastNlMeansDenoisingColored's positional order:
    # h, hColor, templateWindowSize, searchWindowSize.
    luminance_strength = 10
    colour_strength = 10
    template_window = 7
    search_window = 21
    return cv2.fastNlMeansDenoisingColored(
        image,
        None,
        luminance_strength,
        colour_strength,
        template_window,
        search_window,
    )
|
|
|
|
|
|
|
|
def sharpen_image(image):
    """Sharpen an image by convolving it with a 3x3 kernel.

    The kernel weights the centre pixel by 5 and its four direct neighbours
    by -1, so the weights sum to 1.

    Args:
        image: Image array to sharpen.

    Returns:
        The filtered image, with the same depth as the input (ddepth -1).
    """
    sharpen_kernel = np.array([
        [0, -1, 0],
        [-1, 5, -1],
        [0, -1, 0],
    ])
    return cv2.filter2D(image, -1, sharpen_kernel)
|
|
|
|
|
|
|
|
def enhance_image(image):
    """Boost the contrast of a BGR image by a factor of 1.5.

    The image is converted to RGB and wrapped in a PIL image so that
    ``ImageEnhance.Contrast`` can be applied, then converted back to a
    BGR array.

    Args:
        image: BGR image array.

    Returns:
        The contrast-enhanced image as a BGR array.
    """
    rgb_pixels = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    boosted = ImageEnhance.Contrast(Image.fromarray(rgb_pixels)).enhance(1.5)
    return cv2.cvtColor(np.array(boosted), cv2.COLOR_RGB2BGR)
|
|
|
|
|
|
|
|
def process_image(image_path, scale=2):
    """Run the full clean-up pipeline on an image file.

    Steps, in order: load from disk, upscale by ``scale``, denoise, sharpen,
    and boost contrast.

    Args:
        image_path: Path of the image file to process.
        scale: Upscaling factor forwarded to ``upscale_image``.

    Returns:
        The processed image as returned by ``enhance_image``.

    Raises:
        ValueError: Propagated from ``load_image`` when the file cannot be
            loaded.
    """
    image = load_image(image_path)
    upscaled_image = upscale_image(image, scale)
    denoised_image = reduce_noise(upscaled_image)
    sharpened_image = sharpen_image(denoised_image)
    final_image = enhance_image(sharpened_image)

    # No cv2.imshow preview here: this runs inside a web request, where no
    # cv2.waitKey loop exists to actually render the window, and imshow raises
    # on OpenCV builds without GUI support.
    print(f"FINAL IMAGE : {final_image}")
    return final_image
|
|
|
|
|
|
|
# BlipProcessor: converts an image into the tensor format the model consumes.
blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")

# BlipForConditionalGeneration: generates the image caption (text).
# Loaded once at import time and placed on the CPU.
blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
blip_model = blip_model.to("cpu")

print(f"BLIP Model: {blip_model}")
|
|
|
|
|
def get_blip_description(image: Image.Image) -> str:
    """Caption a PIL image with the module-level BLIP processor and model.

    Args:
        image: The picture to describe.

    Returns:
        The generated caption with special tokens stripped. Generation is
        capped at 100 new tokens.
    """
    model_inputs = blip_processor(image, return_tensors="pt").to("cpu")
    token_ids = blip_model.generate(**model_inputs, max_new_tokens=100)
    # generate() returns one sequence per input; only one image was passed.
    return blip_processor.decode(token_ids[0], skip_special_tokens=True)
|
|
|
|
|
|
|
|
def extract_images_from_pdf(pdf_path, output_json_path):
    """Extract the images embedded in a PDF, caption them, and write JSON.

    Two files are produced:

    1. ``output_json_path`` - every element returned by ``partition_pdf``
       (``hi_res`` strategy, image blocks embedded as base64 payloads).
    2. ``<output_json_path without extension>_sprites.json`` - one
       ``"Sprite N"`` entry per element that carries an ``image_base64``
       payload, holding the PDF file name, the base64 data, the PDF's
       directory and a BLIP-generated description.

    An ``extracted_images`` folder is also created next to
    ``output_json_path``.

    Args:
        pdf_path: Path of the PDF to analyse.
        output_json_path: Destination of the raw element dump.

    Returns:
        None.
    """
    elements = partition_pdf(
        filename=pdf_path,
        strategy="hi_res",
        extract_image_block_types=["Image"],
        extract_image_block_to_payload=True,
    )

    # Serialise once and keep working with the same dicts; reading the file
    # back immediately after writing it would only reproduce this data.
    file_elements = [element.to_dict() for element in elements]
    with open(output_json_path, "w") as f:
        json.dump(file_elements, f, indent=4)

    extracted_images_dir = os.path.join(os.path.dirname(output_json_path), "extracted_images")
    os.makedirs(extracted_images_dir, exist_ok=True)

    pdf_filename = os.path.basename(pdf_path)
    # The directory is stored with Windows-style separators.
    pdf_dir_path = os.path.dirname(pdf_path).replace("/", "\\")

    manipulated_json = {}
    for i, element in enumerate(file_elements):
        # Elements without metadata or without an image payload are skipped.
        image_b64 = element.get("metadata", {}).get("image_base64")
        if image_b64 is None:
            continue

        image_data = base64.b64decode(image_b64)
        image = Image.open(io.BytesIO(image_data)).convert("RGB")
        image.show(title=f"Extracted Image {i+1}")

        description = get_blip_description(image)

        # Sprites are numbered consecutively starting at 1.
        manipulated_json[f"Sprite {len(manipulated_json) + 1}"] = {
            "name": pdf_filename,
            "base64": image_b64,
            "file-path": pdf_dir_path,
            "description": description,
        }

    # Derive the sprite file name from the extension only. A plain
    # str.replace(".json", ...) would also rewrite ".json" inside directory
    # names and, when the suffix is absent, would leave the path unchanged so
    # the sprite file overwrote the element dump written above.
    json_root, _ = os.path.splitext(output_json_path)
    manipulated_json_path = f"{json_root}_sprites.json"
    with open(manipulated_json_path, "w") as sprite_file:
        json.dump(manipulated_json, sprite_file, indent=4)

    print(f"✅ Manipulated sprite JSON saved: {manipulated_json_path}")
|
|
|
|
|
# Factor applied to the frame dimensions before streaming (0.5 = half size).
scale = 0.5

# Latest camera frame: written by gen_frames(), read by capture_document().
# Stays None until the first frame has been grabbed.
display = None

# Module-level placeholder. The functions in this file assign their own local
# `contour`, so nothing shown here rebinds this value.
contour = None
|
|
|
|
|
def gen_frames():
    """Yield the camera feed as parts of a multipart JPEG stream.

    For every frame read from the module-level ``camera``:

    * the untouched frame is published in the global ``display`` so that the
      capture route works on clean pixels;
    * a copy is annotated with the detected document outline (green, 3 px),
      scaled by the module-level ``scale`` and JPEG-encoded;
    * the encoded bytes are yielded wrapped in a ``--frame`` part with an
      ``image/jpeg`` content type.

    The generator ends when the camera stops delivering frames.

    Yields:
        bytes: One multipart chunk per successfully encoded frame.
    """
    global display

    while True:
        success, frame = camera.read()
        if not success:
            break

        # Publish the raw frame and draw only on a separate preview copy.
        # Drawing on the shared frame would bake the green outline into the
        # saved scan and make detection run on an annotated image.
        display = frame
        preview = frame.copy()

        contour = detect_document_contour(frame)
        if contour is not None:
            cv2.drawContours(preview, [contour], -1, (0, 255, 0), 3)

        resized = cv2.resize(
            preview,
            (int(scale * preview.shape[1]), int(scale * preview.shape[0])),
        )

        # The browser is the display surface. A cv2.imshow window is not
        # opened because nothing here calls cv2.waitKey to render it.
        encoded, buffer = cv2.imencode('.jpg', resized)
        if not encoded:
            # Skip frames that failed to encode instead of streaming garbage.
            continue

        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
|
|
|
|
|
|
|
|
@app.route("/capture", methods=['POST'])
def capture_document():
    """Scan the document visible in the latest camera frame.

    Workflow:

    1. Take a copy of the latest frame stored in the global ``display``.
    2. Detect the document outline and perspective-warp the frame to it.
    3. Save the warped image, run it through ``process_image`` and store the
       result as a PDF.
    4. Extract and caption the images of that PDF into JSON files.

    File names carry the global ``count`` as a suffix; ``count`` is
    incremented only after a fully successful scan.

    Returns:
        A redirect to the index page. The outcome is reported through a
        flashed message (category "success" or "error").
    """
    global count

    if display is None:
        flash("❌ No frame captured!", "error")
        return redirect(url_for("index"))

    frame = display.copy()
    contour = detect_document_contour(frame)

    if contour is None:
        flash("❌ No document contour found!", "error")
        return redirect(url_for("index"))

    warped = four_point_transform(frame, contour.reshape(4, 2))

    image_path = os.path.join(IMAGE_FOLDER_PATH, f"scanned_colored_{count}.jpg")
    pdf_path = os.path.join(PDF_FOLDER_PATH, f"scanned_colored_{count}.pdf")
    json_path = os.path.join(JSON_FOLDER_PATH, f"scanned_{count}.json")

    # cv2.imwrite returns False on failure rather than raising. Report that
    # here instead of letting the later reload of the file fail.
    if not cv2.imwrite(image_path, warped):
        flash("❌ Could not save the scanned image!", "error")
        return redirect(url_for("index"))

    img = process_image(image_path)
    pil_img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    pil_img.save(pdf_path)

    extract_images_from_pdf(pdf_path, json_path)

    flash("✅ Document scanned and saved!", "success")
    count += 1
    return redirect(url_for("index"))
|
|
|
|
|
@app.route('/video_feed')
def video_feed():
    """Stream the camera feed as a multipart response.

    Each part produced by ``gen_frames`` replaces the previous one
    (``multipart/x-mixed-replace`` with boundary ``frame``).
    """
    stream = gen_frames()
    return Response(stream, mimetype='multipart/x-mixed-replace; boundary=frame')
|
|
|
|
|
@app.route('/')
def index():
    """Serve the video streaming home page."""
    page = render_template('live_streaming_index.html')
    return page
|
|
|
|
|
# Start Flask's built-in server only when this file is executed directly,
# listening on all interfaces at port 7860 with debug mode off.
if __name__ == '__main__':
    app.run(
        host="0.0.0.0",
        port=7860,
        debug=False,
    )