Spaces:
Running
Running
| import logging | |
| import numpy as np | |
| import cv2 | |
| import os | |
| from datetime import datetime | |
| import uuid | |
| from . import constants | |
| from . import utils | |
| from . import ocr_placeholder | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| def calculate_draft(pose_results, segment_data, original_image): | |
| # Ensure original_image is a NumPy array | |
| if not isinstance(original_image, np.ndarray): | |
| original_image = np.array(original_image) # Attempt to convert if not already | |
| # Create the segment mask internally | |
| mask = np.zeros(original_image.shape[:2], dtype=np.uint8) | |
| if len(segment_data)>1: | |
| r = [len(i) for i in segment_data] | |
| segment_data = segment_data[np.argmax(r)] | |
| pts = np.array(segment_data, dtype=np.int32) | |
| cv2.fillPoly(mask, [pts], 1) | |
| segment_mask = mask | |
| """ | |
| Calculates the draft measurement. | |
| """ | |
| mark_names = ["meter mark", "80cm mark", "60cm mark", "40cm mark", "20cm mark"] | |
| def find_lowest_mark_group(pose_results): | |
| lowest_mark_group = None | |
| max_y = -1 | |
| for mark_group in pose_results: | |
| last_valid_keypoint = None | |
| for keypoint in reversed(mark_group): | |
| if keypoint[2] >= constants.CONF_THRESHOLD: | |
| last_valid_keypoint = keypoint | |
| break | |
| if last_valid_keypoint is not None: | |
| _, y, _ = last_valid_keypoint | |
| if y > max_y: | |
| max_y = y | |
| lowest_mark_group = mark_group | |
| return lowest_mark_group | |
| def extract_meter_mark(image, mark_group, group_index): | |
| first_keypoint = mark_group[0] | |
| x, y, _ = first_keypoint | |
| twenty_cm_in_pixels = utils.calc_distance(mark_group[0], mark_group[1]) | |
| square_size = utils.calc_sqr_size(constants.DEFAULT_SQUARE_SIZE_CM, twenty_cm_in_pixels) | |
| square_size *= 1.2 | |
| half_size = int(square_size / 2) | |
| h, w, _ = image.shape | |
| x1 = int(x - half_size - (square_size * 0.2)) | |
| y1 = int(y - half_size) | |
| x2 = int(x + half_size) | |
| y2 = int(y + half_size) | |
| if x1 < 0: x1 = 0 | |
| if y1 < 0: y1 = 0 | |
| if x2 > w: x2 = w | |
| if y2 > h: y2 = h | |
| cropped_image = image[y1:y2, x1:x2] | |
| if constants.SAVE_CROPPED_IMAGE: | |
| output_folder = os.path.join(os.path.dirname(__file__), constants.CROP_OUTPUT_FOLDER) | |
| if not os.path.exists(output_folder): | |
| os.makedirs(output_folder) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") | |
| filename = f"{timestamp}_{group_index}_{uuid.uuid4()}.png" | |
| cv2.imwrite(os.path.join(output_folder, filename), cropped_image) | |
| return cropped_image | |
| # Crop and save meter marks for all groups with high confidence | |
| for i, mark_group in enumerate(pose_results): | |
| if mark_group[0][2] >= constants.CONF_THRESHOLD: | |
| extract_meter_mark(original_image, mark_group, i) | |
| lowest_mark_group = find_lowest_mark_group(pose_results) | |
| if lowest_mark_group is None: | |
| logging.error("No lowest mark group found.") | |
| return -1 | |
| logging.info(f"Lowest mark group found: {lowest_mark_group}") | |
| meter_mark_image = extract_meter_mark(original_image, lowest_mark_group, -1) | |
| meter_value_str = ocr_placeholder.perform_ocr(meter_mark_image) | |
| meter_value = int(meter_value_str.replace('m', '')) | |
| logging.info(f"Meter value from OCR: {meter_value}m") | |
| last_valid_keypoint = None | |
| last_valid_keypoint_index = -1 | |
| for i, keypoint in reversed(list(enumerate(lowest_mark_group))): | |
| if keypoint[2] >= constants.CONF_THRESHOLD: | |
| last_valid_keypoint = keypoint | |
| last_valid_keypoint_index = i | |
| break | |
| if last_valid_keypoint is None: | |
| logging.error("No last valid keypoint found.") | |
| return -1 | |
| logging.info(f"Last valid keypoint found: {mark_names[last_valid_keypoint_index]} ({last_valid_keypoint_index}) at coordinates {last_valid_keypoint[:2]}") | |
| x, y, _ = last_valid_keypoint | |
| # Find the water line in the segment mask | |
| column = segment_mask[:, int(x)] | |
| water_line_indices = np.where(column > 0) | |
| if len(water_line_indices[0]) > 0: | |
| water_line_top_y = water_line_indices[0][0] | |
| water_line_bottom_y = water_line_indices[0][-1] | |
| # Define the waterline segment as a vertical line in the column of the keypoint | |
| segment_start = (x, water_line_top_y) | |
| segment_end = (x, water_line_bottom_y) | |
| pixel_distance = utils.distance_point_to_segment((x, y), segment_start, segment_end) | |
| else: | |
| logging.error("No water line found.") | |
| return -1 | |
| logging.info(f"Pixel distance between keypoint and water line: {pixel_distance}") | |
| distances = [] | |
| for i in range(len(lowest_mark_group) - 1): | |
| if lowest_mark_group[i][2] >= constants.CONF_THRESHOLD and lowest_mark_group[i+1][2] >= constants.CONF_THRESHOLD: | |
| distances.append(utils.calc_distance(lowest_mark_group[i], lowest_mark_group[i+1])) | |
| if not distances: | |
| logging.error("No valid consecutive keypoints found to calculate 20cm in pixels.") | |
| return -1 | |
| twenty_cm_in_pixels = np.mean(distances) | |
| logging.info(f"20cm in pixels: {twenty_cm_in_pixels}") | |
| cm_distance = (pixel_distance / twenty_cm_in_pixels) * 20 | |
| logging.info(f"Distance in cm between keypoint and water line: {cm_distance}") | |
| last_valid_keypoint_cm = (100 - (last_valid_keypoint_index * 20)) | |
| logging.info(f"Last valid keypoint cm value: {last_valid_keypoint_cm}") | |
| final_draft_cm = (last_valid_keypoint_cm + 5) - cm_distance | |
| logging.info(f"Final draft cm value: {final_draft_cm}") | |
| final_draft = (meter_value - 1) + (final_draft_cm / 100) | |
| logging.info(f"Final calculated draft: {final_draft}") | |
| mid_results = { | |
| 'meter_value': meter_value, | |
| 'last_valid_keypoint_cm': last_valid_keypoint_cm, | |
| 'cm_distance': cm_distance, | |
| 'final_draft_cm': final_draft_cm, | |
| } | |
| return final_draft, mid_results | |