Spaces:
Sleeping
Sleeping
File size: 6,181 Bytes
6a6918c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
import logging
import numpy as np
import cv2
import os
from datetime import datetime
import uuid
from . import constants
from . import utils
from . import ocr_placeholder
# Configure the root logger when this module is imported: INFO level and a
# "timestamp - level - message" line format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def _last_valid_keypoint(mark_group):
    """Return ``(index, keypoint)`` of the last confident keypoint in a group.

    A keypoint is confident when its third element is at least
    ``constants.CONF_THRESHOLD``. Returns ``(-1, None)`` if none qualifies.
    """
    for index in range(len(mark_group) - 1, -1, -1):
        keypoint = mark_group[index]
        if keypoint[2] >= constants.CONF_THRESHOLD:
            return index, keypoint
    return -1, None


def _find_lowest_mark_group(pose_results):
    """Return the group whose last confident keypoint has the largest y, or None."""
    lowest_mark_group = None
    max_y = -1
    for mark_group in pose_results:
        _, keypoint = _last_valid_keypoint(mark_group)
        if keypoint is None:
            continue
        y = keypoint[1]
        if y > max_y:
            max_y = y
            lowest_mark_group = mark_group
    return lowest_mark_group


def _extract_meter_mark(image, mark_group, group_index):
    """Crop a square around the first keypoint of a group and optionally save it.

    The square size is derived from the pixel distance between the first two
    keypoints of the group. When ``constants.SAVE_CROPPED_IMAGE`` is set, the
    crop is written as a PNG into ``constants.CROP_OUTPUT_FOLDER`` (resolved
    relative to this module). Returns the cropped image, which may be empty
    when the computed window lies outside the image.
    """
    x, y, _ = mark_group[0]
    twenty_cm_in_pixels = utils.calc_distance(mark_group[0], mark_group[1])
    square_size = utils.calc_sqr_size(constants.DEFAULT_SQUARE_SIZE_CM, twenty_cm_in_pixels)
    square_size *= 1.2
    half_size = int(square_size / 2)

    # Use shape[:2] so 2-D (grayscale) images work as well as colour ones.
    h, w = image.shape[:2]

    # Clamp every bound into the image. A negative end index would otherwise
    # be interpreted as "count from the end" and yield the wrong region.
    x1 = min(max(int(x - half_size - (square_size * 0.2)), 0), w)
    y1 = min(max(int(y - half_size), 0), h)
    x2 = min(max(int(x + half_size), 0), w)
    y2 = min(max(int(y + half_size), 0), h)
    cropped_image = image[y1:y2, x1:x2]

    if constants.SAVE_CROPPED_IMAGE:
        if cropped_image.size == 0:
            # cv2.imwrite raises on an empty image, so skip the write.
            logging.warning(f"Empty crop for mark group {group_index}; nothing saved.")
        else:
            output_folder = os.path.join(os.path.dirname(__file__), constants.CROP_OUTPUT_FOLDER)
            # exist_ok avoids the check-then-create race of exists() + makedirs().
            os.makedirs(output_folder, exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            filename = f"{timestamp}_{group_index}_{uuid.uuid4()}.png"
            cv2.imwrite(os.path.join(output_folder, filename), cropped_image)
    return cropped_image


def calculate_draft(pose_results, segment_data, original_image):
    """Calculate the draft measurement from draft-mark keypoints and a water mask.

    Args:
        pose_results: Iterable of mark groups. Each group is a sequence of
            ``(x, y, confidence)`` keypoints; index 0 is treated as the meter
            mark and each following index as 20 cm lower (80, 60, 40, 20 cm).
        segment_data: Polygon point data for the segmented region. When it
            holds more than one entry, the entry with the most points is used.
            NOTE(review): assumed to be a list of polygons, each a list of
            (x, y) points -- confirm against the caller.
        original_image: Source image; converted to a NumPy array if needed.

    Returns:
        ``(final_draft, mid_results)`` on success, where ``final_draft`` is in
        metres and ``mid_results`` is a dict of intermediate values. Returns
        ``-1`` when the measurement cannot be made (no confident keypoints,
        unreadable OCR value, no water line in the keypoint's column, or no
        usable pixel scale).
    """
    # Ensure original_image is a NumPy array.
    if not isinstance(original_image, np.ndarray):
        original_image = np.array(original_image)

    # Rasterise the (largest) polygon into a binary mask.
    segment_mask = np.zeros(original_image.shape[:2], dtype=np.uint8)
    if len(segment_data) > 1:
        polygon_sizes = [len(polygon) for polygon in segment_data]
        segment_data = segment_data[np.argmax(polygon_sizes)]
    pts = np.array(segment_data, dtype=np.int32)
    if pts.size:
        cv2.fillPoly(segment_mask, [pts], 1)
    # With no polygon points the mask stays empty and the function reports
    # "No water line found." further down instead of failing inside OpenCV.

    mark_names = ["meter mark", "80cm mark", "60cm mark", "40cm mark", "20cm mark"]

    # Crop and save meter marks for all groups with high confidence. The crops
    # are only used for saving, so skip the work when saving is disabled.
    if constants.SAVE_CROPPED_IMAGE:
        for i, mark_group in enumerate(pose_results):
            if mark_group[0][2] >= constants.CONF_THRESHOLD:
                _extract_meter_mark(original_image, mark_group, i)

    lowest_mark_group = _find_lowest_mark_group(pose_results)
    if lowest_mark_group is None:
        logging.error("No lowest mark group found.")
        return -1
    logging.info(f"Lowest mark group found: {lowest_mark_group}")

    meter_mark_image = _extract_meter_mark(original_image, lowest_mark_group, -1)
    meter_value_str = ocr_placeholder.perform_ocr(meter_mark_image)
    try:
        meter_value = int(meter_value_str.replace('m', ''))
    except (AttributeError, ValueError):
        # Unreadable OCR output is a measurement failure, not a crash.
        logging.error(f"Could not parse meter value from OCR result: {meter_value_str!r}")
        return -1
    logging.info(f"Meter value from OCR: {meter_value}m")

    last_valid_keypoint_index, last_valid_keypoint = _last_valid_keypoint(lowest_mark_group)
    if last_valid_keypoint is None:
        logging.error("No last valid keypoint found.")
        return -1
    if last_valid_keypoint_index < len(mark_names):
        mark_name = mark_names[last_valid_keypoint_index]
    else:
        mark_name = f"mark {last_valid_keypoint_index}"
    logging.info(f"Last valid keypoint found: {mark_name} ({last_valid_keypoint_index}) at coordinates {last_valid_keypoint[:2]}")
    x, y, _ = last_valid_keypoint

    # Find the water line in the segment mask, in the keypoint's column.
    column_index = int(x)
    if not 0 <= column_index < segment_mask.shape[1]:
        # A negative index would silently wrap to the other side of the image
        # and an index >= width would raise IndexError.
        logging.error(f"Keypoint x-coordinate {x} is outside the image.")
        return -1
    water_rows = np.where(segment_mask[:, column_index] > 0)[0]
    if water_rows.size == 0:
        logging.error("No water line found.")
        return -1

    # The masked span in this column is treated as a vertical segment; the
    # distance from the keypoint to it is the pixel gap to the water.
    segment_start = (x, water_rows[0])
    segment_end = (x, water_rows[-1])
    pixel_distance = utils.distance_point_to_segment((x, y), segment_start, segment_end)
    logging.info(f"Pixel distance between keypoint and water line: {pixel_distance}")

    # Pixel scale: mean spacing between consecutive confident keypoints.
    distances = [
        utils.calc_distance(upper, lower)
        for upper, lower in zip(lowest_mark_group[:-1], lowest_mark_group[1:])
        if upper[2] >= constants.CONF_THRESHOLD and lower[2] >= constants.CONF_THRESHOLD
    ]
    if not distances:
        logging.error("No valid consecutive keypoints found to calculate 20cm in pixels.")
        return -1
    twenty_cm_in_pixels = np.mean(distances)
    logging.info(f"20cm in pixels: {twenty_cm_in_pixels}")
    if not twenty_cm_in_pixels > 0:
        # Coincident keypoints give a zero scale; dividing by it would
        # produce an inf/nan draft.
        logging.error("Invalid 20cm pixel scale; cannot convert pixels to cm.")
        return -1

    cm_distance = (pixel_distance / twenty_cm_in_pixels) * 20
    logging.info(f"Distance in cm between keypoint and water line: {cm_distance}")

    last_valid_keypoint_cm = 100 - (last_valid_keypoint_index * 20)
    logging.info(f"Last valid keypoint cm value: {last_valid_keypoint_cm}")

    # NOTE(review): the +5 cm offset is kept from the original code; its
    # physical meaning is not visible here -- confirm.
    final_draft_cm = (last_valid_keypoint_cm + 5) - cm_distance
    logging.info(f"Final draft cm value: {final_draft_cm}")

    final_draft = (meter_value - 1) + (final_draft_cm / 100)
    logging.info(f"Final calculated draft: {final_draft}")

    mid_results = {
        'meter_value': meter_value,
        'last_valid_keypoint_cm': last_valid_keypoint_cm,
        'cm_distance': cm_distance,
        'final_draft_cm': final_draft_cm,
    }
    return final_draft, mid_results
|