import logging
import os
import uuid
from datetime import datetime

import cv2
import numpy as np

from . import constants
from . import ocr_placeholder
from . import utils

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Names of the keypoints inside one mark group, in index order. Index i is
# treated further down as the (100 - 20 * i) cm position.
_MARK_NAMES = ["meter mark", "80cm mark", "60cm mark", "40cm mark", "20cm mark"]


def _build_segment_mask(segment_data, image_shape):
    """Rasterise the segmentation polygon into a uint8 mask (1 inside, 0 outside).

    When *segment_data* holds several polygons, only the one with the most
    points is drawn. Missing or empty data yields an all-zero mask so the
    caller reports "no water line" instead of crashing inside OpenCV.
    """
    mask = np.zeros(image_shape[:2], dtype=np.uint8)
    if segment_data is None or len(segment_data) == 0:
        return mask
    if len(segment_data) > 1:
        # max() returns the first longest entry, like np.argmax on the lengths.
        segment_data = max(segment_data, key=len)
    pts = np.array(segment_data, dtype=np.int32)
    if pts.size:
        cv2.fillPoly(mask, [pts], 1)
    return mask


def _last_valid_keypoint(mark_group):
    """Return ``(index, keypoint)`` of the last confident keypoint, or ``None``.

    A keypoint is confident when its third component is at least
    ``constants.CONF_THRESHOLD``.
    """
    for index in range(len(mark_group) - 1, -1, -1):
        keypoint = mark_group[index]
        if keypoint[2] >= constants.CONF_THRESHOLD:
            return index, keypoint
    return None


def _find_lowest_mark_group(pose_results):
    """Return the group whose last confident keypoint has the largest y, or ``None``."""
    lowest_mark_group = None
    max_y = -1
    for mark_group in pose_results:
        found = _last_valid_keypoint(mark_group)
        if found is None:
            continue
        y = found[1][1]
        if y > max_y:
            max_y = y
            lowest_mark_group = mark_group
    return lowest_mark_group


def _extract_meter_mark(image, mark_group, group_index):
    """Crop the region around the first keypoint of *mark_group*.

    The crop size is derived from the pixel distance between the first two
    keypoints of the group. When ``constants.SAVE_CROPPED_IMAGE`` is set, the
    crop is also written as a PNG into ``constants.CROP_OUTPUT_FOLDER`` next
    to this module.
    """
    x, y, _ = mark_group[0]
    twenty_cm_in_pixels = utils.calc_distance(mark_group[0], mark_group[1])
    square_size = utils.calc_sqr_size(constants.DEFAULT_SQUARE_SIZE_CM, twenty_cm_in_pixels)
    square_size *= 1.3
    half_size = int(square_size / 2)
    half_width = int(half_size * 1.2)  # the crop is wider than it is tall

    # shape[:2] also works for 2-D (grayscale) images.
    h, w = image.shape[:2]
    x1 = max(int(x - half_width), 0)
    y1 = max(int(y - half_size), 0)
    x2 = min(int(x + half_width), w)
    y2 = min(int(y + half_size), h)
    cropped_image = image[y1:y2, x1:x2]

    if constants.SAVE_CROPPED_IMAGE:
        if cropped_image.size:
            output_folder = os.path.join(os.path.dirname(__file__), constants.CROP_OUTPUT_FOLDER)
            os.makedirs(output_folder, exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            filename = f"{timestamp}_{group_index}_{uuid.uuid4()}.png"
            cv2.imwrite(os.path.join(output_folder, filename), cropped_image)
        else:
            # cv2.imwrite raises on an empty image; skip the debug dump instead.
            logging.warning(f"Empty crop for mark group {group_index}; nothing saved.")
    return cropped_image


def calculate_draft(pose_results, segment_data, original_image):
    """Calculates the draft measurement.

    Args:
        pose_results: Sequence of mark groups. Each group is a sequence of
            keypoints ``(x, y, confidence)`` ordered as meter mark, 80cm,
            60cm, 40cm, 20cm mark.
        segment_data: One or more polygons (sequences of points) outlining
            the segmented region; the polygon with the most points is used.
            NOTE(review): presumably this is the water segmentation - confirm
            against the caller.
        original_image: Image as a NumPy array, or anything ``np.array`` can
            convert.

    Returns:
        ``(final_draft, mid_results)`` on success, where ``mid_results`` is a
        dict of intermediate values and the cropped meter-mark image. On any
        failure the error is logged and the bare integer ``-1`` is returned,
        so callers must check for it before unpacking.
    """
    if not isinstance(original_image, np.ndarray):
        original_image = np.array(original_image)

    segment_mask = _build_segment_mask(segment_data, original_image.shape)

    # Crop (and optionally save) the meter mark of every confident group.
    for i, mark_group in enumerate(pose_results):
        if mark_group[0][2] >= constants.CONF_THRESHOLD:
            _extract_meter_mark(original_image, mark_group, i)

    lowest_mark_group = _find_lowest_mark_group(pose_results)
    if lowest_mark_group is None:
        logging.error("No lowest mark group found.")
        return -1
    logging.info(f"Lowest mark group found: {lowest_mark_group}")

    meter_mark_image = _extract_meter_mark(original_image, lowest_mark_group, -1)
    meter_value_str = ocr_placeholder.perform_ocr(meter_mark_image)
    try:
        meter_value = int(meter_value_str.replace('m', ''))
    except (AttributeError, ValueError):
        # Unreadable OCR output follows the same log-and-return convention
        # as every other failure in this function.
        logging.error(f"Could not parse meter value from OCR result: {meter_value_str!r}")
        return -1
    logging.info(f"Meter value from OCR: {meter_value}m")

    found = _last_valid_keypoint(lowest_mark_group)
    if found is None:
        logging.error("No last valid keypoint found.")
        return -1
    last_valid_keypoint_index, last_valid_keypoint = found
    if last_valid_keypoint_index < len(_MARK_NAMES):
        mark_name = _MARK_NAMES[last_valid_keypoint_index]
    else:
        # Do not crash on a log message when a group has extra keypoints.
        mark_name = f"keypoint {last_valid_keypoint_index}"
    logging.info(f"Last valid keypoint found: {mark_name} ({last_valid_keypoint_index}) at coordinates {last_valid_keypoint[:2]}")

    x, y, _ = last_valid_keypoint

    # Find the water line in the mask column under the keypoint. A negative
    # index would silently wrap to the other side of the image, and an index
    # >= width would raise IndexError, so reject both explicitly.
    column_index = int(x)
    if not 0 <= column_index < segment_mask.shape[1]:
        logging.error(f"Keypoint x-coordinate {x} is outside the image; cannot locate water line.")
        return -1
    water_rows = np.flatnonzero(segment_mask[:, column_index])
    if water_rows.size == 0:
        logging.error("No water line found.")
        return -1

    # The water line is modelled as the vertical run of mask pixels in the
    # keypoint's column.
    segment_start = (x, water_rows[0])
    segment_end = (x, water_rows[-1])
    pixel_distance = utils.distance_point_to_segment((x, y), segment_start, segment_end)
    logging.info(f"Pixel distance between keypoint and water line: {pixel_distance}")

    # Pixel scale: mean spacing of consecutive keypoints that are both confident.
    distances = [
        utils.calc_distance(current, following)
        for current, following in zip(lowest_mark_group, lowest_mark_group[1:])
        if current[2] >= constants.CONF_THRESHOLD and following[2] >= constants.CONF_THRESHOLD
    ]
    if not distances:
        logging.error("No valid consecutive keypoints found to calculate 20cm in pixels.")
        return -1
    twenty_cm_in_pixels = np.mean(distances)
    if not twenty_cm_in_pixels > 0:
        # Also catches NaN; prevents a division by zero below.
        logging.error(f"Invalid 20cm pixel scale: {twenty_cm_in_pixels}")
        return -1
    logging.info(f"20cm in pixels: {twenty_cm_in_pixels}")

    cm_distance = (pixel_distance / twenty_cm_in_pixels) * 20
    logging.info(f"Distance in cm between keypoint and water line: {cm_distance}")

    last_valid_keypoint_cm = (100 - (last_valid_keypoint_index * 20))
    logging.info(f"Last valid keypoint cm value: {last_valid_keypoint_cm}")

    # NOTE(review): the +5 cm offset is not explained in the source -
    # presumably the gap between the keypoint and the mark's reference edge;
    # TODO confirm.
    final_draft_cm = (last_valid_keypoint_cm + 5) - cm_distance
    logging.info(f"Final draft cm value: {final_draft_cm}")

    # NOTE(review): (meter_value - 1) suggests the group spans the metre below
    # the OCR'd meter mark - TODO confirm.
    final_draft = (meter_value - 1) + (final_draft_cm / 100)
    logging.info(f"Final calculated draft: {final_draft}")

    mid_results = {
        'meter_value': meter_value,
        'last_valid_keypoint_cm': last_valid_keypoint_cm,
        'cm_distance': cm_distance,
        'final_draft_cm': final_draft_cm,
        'extracted_mark_image': meter_mark_image,
    }
    return final_draft, mid_results