Spaces:
Runtime error
Runtime error
| import os | |
| import cv2 | |
| import numpy as np | |
| import time | |
| from tqdm import tqdm | |
| import random | |
| # from shapely.geometry import Point, Polygon | |
| from numpy.linalg import svd | |
| from collections import namedtuple | |
| from vis_common import get_logger | |
| from typing import Any, Dict, List, Optional, Type, Union | |
| logger = get_logger('v_utils') | |
| import pdb | |
| b = pdb.set_trace | |
# File extensions (without the leading dot) that list_all_files() searches
# for when the caller does not supply its own list.
IMAGE_EXTS = ['jpg', 'png', 'jpeg', 'JPG', 'PNG', 'JPEG']
# Twenty RGB colours with components in [0, 1]; label2color() scales them
# by 255 to build its default colour map.
# NOTE(review): the values look like matplotlib's "tab20" colormap - confirm.
PALETTE = [
    (0.12156862745098039, 0.4666666666666667, 0.7058823529411765),
    (0.6823529411764706, 0.7803921568627451, 0.9098039215686274),
    (1.0, 0.4980392156862745, 0.054901960784313725),
    (1.0, 0.7333333333333333, 0.47058823529411764),
    (0.17254901960784313, 0.6274509803921569, 0.17254901960784313),
    (0.596078431372549, 0.8745098039215686, 0.5411764705882353),
    (0.8392156862745098, 0.15294117647058825, 0.1568627450980392),
    (1.0, 0.596078431372549, 0.5882352941176471),
    (0.5803921568627451, 0.403921568627451, 0.7411764705882353),
    (0.7725490196078432, 0.6901960784313725, 0.8352941176470589),
    (0.5490196078431373, 0.33725490196078434, 0.29411764705882354),
    (0.7686274509803922, 0.611764705882353, 0.5803921568627451),
    (0.8901960784313725, 0.4666666666666667, 0.7607843137254902),
    (0.9686274509803922, 0.7137254901960784, 0.8235294117647058),
    (0.4980392156862745, 0.4980392156862745, 0.4980392156862745),
    (0.7803921568627451, 0.7803921568627451, 0.7803921568627451),
    (0.7372549019607844, 0.7411764705882353, 0.13333333333333333),
    (0.8588235294117647, 0.8588235294117647, 0.5529411764705883),
    (0.09019607843137255, 0.7450980392156863, 0.8117647058823529),
    (0.6196078431372549, 0.8549019607843137, 0.8980392156862745),
]
def check_file_in_paths(paths, filename):
    """
    Report whether `filename` exists under any directory in `paths`.

    Every candidate path is printed as it is probed, and the first hit is
    printed a second time before returning.

    :param paths: iterable of directory paths
    :param filename: file name (or relative path) to look for
    :return: True as soon as one candidate exists, False otherwise
    """
    for directory in paths:
        candidate = os.path.join(directory, filename)
        print(candidate)
        if not os.path.exists(candidate):
            continue
        print(candidate)
        return True
    return False
def clean_backslash(dir):
    """
    Strip every trailing '/' from a path string.

    Despite the name, it is forward slashes that are removed. Empty strings
    and strings made only of slashes yield '' instead of raising IndexError.

    :param dir: path string
    :return: the path without trailing slashes
    """
    return dir.rstrip('/')
def odgt2txt(odgt_file,
             txt_file,
             image_key='image',
             segment_key='segment'):
    """
    Convert an odgt file into a text list with one "<image> <segment>" line
    per item.

    :param odgt_file: path handed to io_utils.load_odgt
    :param txt_file: output text file, overwritten if present
    :param image_key: key of the image entry in each item
    :param segment_key: key of the segment entry in each item
    """
    import io_utils as io_uts
    odgt = io_uts.load_odgt(odgt_file)
    # The context manager closes the file even if an item lacks a key.
    with open(txt_file, 'w') as f:
        for item in odgt:
            f.write(f"{item[image_key]} {item[segment_key]}\n")
    print("done")
def single_thresh(args, mark_ignore=True):
    """
    Threshold a 255 / 128 / 0 style label image into a binary label.

    Pixels above 172 become 1. When `mark_ignore` is set, pixels in
    [70, 172] become 255. Everything else stays 0.

    :param args: (image_name, label_name, out_label_name) tuple of paths
    :param mark_ignore: whether to write the 255 ignore value for mid-range
        pixels
    :raises AssertionError: if the label aspect ratio matches the image
        neither directly nor after a 90 degree rotation
    """
    image_name, label_name, out_label_name = args
    image = cv2.imread(image_name, cv2.IMREAD_UNCHANGED)
    mask_org = cv2.imread(label_name, cv2.IMREAD_UNCHANGED)

    image_ratio = image.shape[0] / image.shape[1]
    if not (image_ratio == mask_org.shape[0] / mask_org.shape[1]):
        # The label may have been stored rotated by 90 degrees.
        if mask_org.shape[1] / mask_org.shape[0] == image_ratio:
            # The constant lives directly on cv2; `cv2.cv2` is not available
            # in current OpenCV builds.
            mask_org = cv2.rotate(mask_org, cv2.ROTATE_90_CLOCKWISE)
            print(image_name, label_name,
                  f"shape not match {mask_org.shape} vs {image.shape}")
        else:
            print(image_name, label_name, "shape not match even rotation")
            # Explicit raise so the check survives `python -O`.
            raise AssertionError("shape not match even rotation")

    if mask_org.ndim == 3:
        # Only the first channel of a multi-channel label is used.
        mask_org = mask_org[:, :, 0]

    mask = np.zeros_like(mask_org)
    mask[mask_org > 172] = 1
    if mark_ignore:
        ignore_region = np.logical_and(mask_org <= 172, mask_org >= 70)
        mask[ignore_region] = 255
    cv2.imwrite(out_label_name, np.uint8(mask))
def find_file_w_exts(filename, exts, w_dot=False):
    """
    Find the first existing file named `filename` plus one of `exts`.

    :param filename: path without its extension
    :param exts: extensions tried in order
    :param w_dot: when True a '.' is inserted between name and extension
    :return: (True, matching path) on success, otherwise (False, None)
    """
    joiner = '.' if w_dot else ''
    for suffix in exts:
        candidate = f"{filename}{joiner}{suffix}"
        if os.path.exists(candidate):
            return True, candidate
    return False, None
def seg_folder_to_txt(image_folder, label_folder, root,
                      output_file):
    """
    Write a text list pairing every image with its label file.

    For each image found under `image_folder`, the label is expected at
    "<label_folder>/<image basename>.png". Each output line is
    "<image> <label>" with `root` replaced by '.' in both paths.

    :param image_folder: folder searched (recursively) for jpg/png/jpeg files
    :param label_folder: folder holding the png labels
    :param root: path prefix replaced by '.' in the written lines
    :param output_file: text file to write, overwritten if present
    :raises AssertionError: if a label file is missing
    """
    exts = ['jpg', 'png', 'jpeg']
    image_files = list_all_files(image_folder, exts)
    # The context manager closes the file even when the assert fires.
    with open(output_file, 'w') as f:
        for image_file in tqdm(image_files):
            image_name = basename(image_file)
            label_file = f"{label_folder}/{image_name}.png"
            assert os.path.exists(label_file), f"{image_file} {label_file}"
            image_file = image_file.replace(root, '.')
            label_file = label_file.replace(root, '.')
            f.write(f"{image_file} {label_file}\n")
    print("done")
def wait_for_file(filename, step=5.0):
    """
    Block until `filename` exists, polling every `step` seconds.

    After the file is seen the function sleeps one more `step` before
    printing how long it waited.

    :param filename: path to wait for
    :param step: polling interval in seconds
    """
    count = 0.0
    # The original called os.path.exists() without an argument, which
    # raised TypeError on the first check.
    while not os.path.exists(filename):
        time.sleep(step)
        count += step
    time.sleep(step)
    print(f"found {filename} after {count}s")
def get_trimap_by_binary(img, eradius=20, dradius=20):
    """
    Build a trimap from a binary mask by marking the boundary band as 128.

    The band is every pixel that is set in the dilated mask but cleared in
    the eroded mask. All other pixels keep their value from `img`.

    :param img: binary mask image
    :param eradius: side length of the square erosion kernel
    :param dradius: side length of the square dilation kernel
    :return: copy of `img` with the uncertain band set to 128
    """
    # The original referenced an undefined `radius`. Each operation now
    # uses its own kernel size parameter.
    erode_kernel = np.ones((eradius, eradius), np.uint8)
    dilate_kernel = np.ones((dradius, dradius), np.uint8)
    erosion = cv2.erode(img, erode_kernel, iterations=1)
    dilation = cv2.dilate(img, dilate_kernel, iterations=1)
    trimap = img.copy()
    mask = np.logical_and(dilation > 0, erosion == 0)
    trimap[mask] = 128
    return trimap
def get_matting_trimap(segment, eradius = 30, dradius = 30):
    """
    Turn a segment mask into a 0 / 128 / 255 matting trimap.

    :param segment: segment mask
    :param eradius: side length of the square erosion kernel
    :param dradius: side length of the square dilation kernel
    :return: uint8 image, 255 inside the eroded segment, 128 in the rest of
        the dilated segment, 0 elsewhere
    """
    grown = cv2.dilate(segment, np.ones((dradius, dradius), np.uint8),
                       iterations=1)
    shrunk = cv2.erode(segment, np.ones((eradius, eradius), np.uint8),
                       iterations=1)
    trimap = np.zeros_like(segment, dtype=np.uint8)
    # Paint the wide band first so the sure-foreground core overrides it.
    trimap[grown > 0] = 128
    trimap[shrunk > 0] = 255
    return trimap
def get_trimap_by_thresh():
    """
    Placeholder: not implemented, always returns None.
    """
    return None
def Mat2EulerImage(mat: np.ndarray, Image):
    """
    Pack an array into an `Image` record.

    :param mat: 2-D or 3-D array; its raw bytes become the payload
    :param Image: callable accepting data / rows / cols / channel keywords
    :return: whatever `Image(...)` returns
    """
    rows, cols = mat.shape[0], mat.shape[1]
    if mat.ndim == 2:
        channel = 1
    else:
        channel = mat.shape[-1]
    return Image(data=mat.tobytes(), rows=rows, cols=cols, channel=channel)
def EulerImagetoMat(res, channel=1):
    """
    Rebuild a uint8 array from an euler thrift image record.

    The record is usually declared as
        struct Image {
            1: binary data,
            2: i32 rows,
            3: i32 cols,
            4: i32 channel
        }
    and `data` is read here as raw uint8 bytes.

    :param res: object exposing `data`, `rows` and `cols`
    :param channel: number of channels; values > 1 give a 3-D result
    :return: writable uint8 array of shape (rows, cols) or
        (rows, cols, channel)
    """
    # np.fromstring is deprecated for binary input. frombuffer gives a
    # read-only view, so copy to keep the result writable as before.
    flat = np.frombuffer(res.data, dtype=np.uint8).copy()
    if channel > 1:
        return flat.reshape((res.rows, res.cols, channel))
    return flat.reshape((res.rows, res.cols))
| """ | |
| Encode the name of an image whose path contains Chinese characters. | |
| """ | |
class NameCoder():
    """
    Callable that turns an image path into a flat pinyin name.

    The configured root directory is removed, '/' and ' ' become '_', and
    the result is passed to pinyin.get(..., format='strip').
    """

    def __init__(self, root_dir):
        self.root_dir = root_dir

    def __call__(self, name):
        import pinyin as py
        relative = name.replace(self.root_dir, '')
        flattened = relative.replace('/', '_').replace(' ', '_')
        return py.get(flattened, format='strip')
def basename(path):
    """
    Return the final path component without its extension.
    """
    stem, _ = os.path.splitext(os.path.basename(path))
    return stem
def ext(path):
    """
    Return the extension of the final path component, without the dot.
    """
    _, suffix = os.path.splitext(os.path.basename(path))
    return suffix[1:]
def get_cur_abs_path(some_file):
    """
    Return the absolute directory that contains `some_file`.
    """
    absolute = os.path.abspath(some_file)
    return os.path.dirname(absolute)
def list_all_files(directory, exts=None, recursive=True):
    """
    List files under `directory` whose names end with one of `exts`.

    The glob pattern is "*<ext>", so the match is on the name suffix; no
    '.' is inserted before the extension.

    :param directory: folder to search
    :param exts: suffixes to match; defaults to IMAGE_EXTS
    :param recursive: also descend into sub-folders when True
    :return: sorted list of matching paths
    """
    import glob
    if exts is None:
        exts = IMAGE_EXTS
    template = "%s/**/*%s" if recursive else "%s/*%s"
    matched = []
    for suffix in exts:
        matched.extend(glob.glob(template % (directory, suffix),
                                 recursive=recursive))
    return sorted(matched)
def list_all_folders(directory):
    """
    Return the immediate sub-folders of `directory`, as matched by the glob
    pattern "<directory>/*/".
    """
    import glob
    pattern = f"{directory}/*/"
    return glob.glob(pattern)
def list_all(folder, exts=None, recur=False):
    """
    List sub-folders when `exts` is None, otherwise list matching files.

    :param folder: folder to inspect
    :param exts: file suffixes; None switches to folder listing
    :param recur: passed to list_all_files as its `recursive` flag
    """
    if exts is not None:
        return list_all_files(folder, exts, recur)
    return list_all_folders(folder)
def split_path(folder):
    """
    Split a '/'-separated path into its non-empty components.
    """
    return list(filter(None, folder.split('/')))
def dump_image(pred, res_file, score=True, dim='CHW'):
    """
    Save a prediction as "<res_file>.png".

    :param pred: prediction array
    :param res_file: output path without the '.png' suffix
    :param score: when True the array is written through dump_prob2image,
        otherwise it is cast to uint8 and written directly
    :param dim: layout flag forwarded to dump_prob2image
    """
    if score:
        dump_prob2image(res_file, pred, dim=dim)
        return
    cv2.imwrite(res_file + '.png', np.uint8(pred))
def dump_prob2image(filename, array, dim='CHW'):
    """
    Write a probability map to "<filename>.png".

    With dim == 'CHW' the array is scaled by 255, cast to uint8 and
    transposed to height x width x channel; any other `dim` leaves the
    array untouched. Four or more channels are cut to the first three
    (with a printed warning); with exactly two channels only channel 1 is
    saved.

    :param filename: output path without the '.png' suffix
    :param array: probability map
    :param dim: layout flag, 'CHW' triggers the conversion described above
    """
    if dim == 'CHW':
        scaled = np.uint8(array * 255)
        array = np.transpose(scaled, (1, 2, 0))
    channels = array.shape[2]
    if channels >= 4:
        print('warning: only save the first 3 channels')
        array = array[:, :, :3]
    elif channels == 2:
        array = array[:, :, 1]
    cv2.imwrite(filename + '.png', array)
def load_image2prob(filename):
    """
    Load a png written by dump_prob2image back into a channel-first array
    scaled by 1/255.

    :param filename: path, '.png' is appended when missing
    :return: array transposed with axes (2, 0, 1) and divided by 255
    """
    path = filename if filename.endswith('.png') else filename + '.png'
    pixels = cv2.imread(path, cv2.IMREAD_UNCHANGED)
    return np.transpose(pixels, (2, 0, 1)) / 255
def mask2box(mask):
    """
    Bounding box of the positive pixels of a mask.

    :return: [top, left, bottom, right], i.e. [y0, x0, y1, x1], with both
        ends inclusive
    """
    rows, cols = np.where(mask > 0)
    top, bottom = np.min(rows), np.max(rows)
    left, right = np.min(cols), np.max(cols)
    return [top, left, bottom, right]
def dilate_mask(mask, kernel=20):
    """
    Dilate a mask once with a square kernel of ones.

    :param mask: mask, cast to uint8 before dilation
    :param kernel: side length of the square kernel
    :return: dilated mask
    """
    square = np.ones((kernel, kernel), np.uint8)
    return cv2.dilate(np.uint8(mask), square, iterations=1)
def erode_mask(mask, kernel=20):
    """
    Erode a mask once with a square kernel of ones.

    :param mask: mask, passed to OpenCV without any cast
    :param kernel: side length of the square kernel
    :return: eroded mask
    """
    square = np.ones((kernel, kernel), np.uint8)
    return cv2.erode(mask, square, iterations=1)
def pack_argument(args, arg_names):
    """
    Collect selected attributes of `args` into a keyword dictionary.

    args: object holding all arguments
    arg_names: list of attribute names to pick

    Attributes that are missing or falsy (None, 0, '', False, ...) are
    left out of the result.
    """
    picked = ((name, getattr(args, name, None)) for name in arg_names)
    return {name: value for name, value in picked if value}
def line_segment_cross(seg1, seg2):
    """
    Test whether two line segments cross.

    :param seg1: [start, end], each point indexable as (x, y)
    :param seg2: [start, end]
    :return:
        True if cross, false otherwise
    """
    def _ccw(p, q, r):
        # Orientation test on the triple (p, q, r); points are (x, y).
        return (r[1] - p[1]) * (q[0] - p[0]) > (q[1] - p[1]) * (r[0] - p[0])

    a, b = seg1[0], seg1[1]
    c, d = seg2[0], seg2[1]
    # The segments cross when each one separates the other's endpoints.
    if _ccw(a, c, d) == _ccw(b, c, d):
        return False
    return _ccw(a, b, c) != _ccw(a, b, d)
def pts_in_line(pts, lines, th=10):
    """
    Count the lines whose direction points at (or away from) a point.

    pts: [x, y]
    lines: [[x0, y0, x1, y1]]
    th: angle threshold in degrees

    For each line, the angle between (pts - start) and (end - start) is
    measured in both orientations; the line is counted when the smaller
    angle is below `th`.
    """
    hits = 0
    for segment in lines:
        px, py = pts
        x0, y0, x1, y1 = segment
        to_point = np.array([px - x0, py - y0])
        along = np.array([x1 - x0, y1 - y0])
        deviation = min(angle_diff(to_point, along),
                        angle_diff(-1 * to_point, along))
        if deviation < th:
            hits += 1
    return hits
def out_of_bound(pt, sz):
    """
    Tell whether point (x, y) lies outside an image of size (h, w).
    """
    x, y = pt
    h, w = sz
    return any((x < 0, y < 0, x >= w, y >= h))
def pts_in_mask(pts, mask, allow_out=True):
    """
    Flag the points that fall on positive pixels of a mask.

    pts: n x 2 array of (x, y) locations
    mask: 2-D mask indexed as mask[y, x]
    allow_out: accepted but not used by this function

    Returns a length-n boolean array; points outside the mask are False.
    """
    inside = np.zeros(pts.shape[0]) > 0
    for row, point in enumerate(pts):
        x, y = point
        if not out_of_bound(point, mask.shape) and mask[y, x] > 0:
            inside[row] = True
    return inside
def pts_in_poly(pts, poly, sz):
    """
    Flag the points that fall inside a polygon.

    pts: n x 2 array of (x, y) locations
    poly: polygon vertices as (x, y) pairs
    sz: (height, width) of the canvas the polygon is rasterised on

    Returns a length-n boolean array.
    """
    # Start from an empty canvas. The original started from all ones, so
    # every in-bounds point was reported as inside.
    mask = np.zeros(sz, dtype=np.uint8)
    # np.int0 was removed in NumPy 2.0. int32 is what fillPoly expects -
    # TODO confirm against the OpenCV version in use.
    cv2.fillPoly(mask,
                 pts=[np.int32(poly)],
                 color=(1,))
    return pts_in_mask(pts, mask)
def line_intersect_pt(lines: np.array, randsac=True):
    """
    Find the intersection point supported by the most lines.

    Every ordered pair of lines is intersected in homogeneous coordinates.
    The candidate is scored with pts_in_line and the best-scoring point is
    kept (later candidates win ties; a score of at least 2 is required).

    lines: n x 4, [start x, start y, end x, end y] per line
    randsac: accepted but not used by this function
    return: (intersect_pt, is_parallel) - (point, False) when a point was
        found, otherwise (None, True)
    raises ValueError: when fewer than two lines are given
    """
    if lines.shape[0] < 2:
        raise ValueError('not enough line')
    num = lines.shape[0]
    max_correct = 2
    best_vp = None
    for line_id0 in range(num):
        for i in range(num):
            if i == line_id0:
                continue
            lines_cur = lines[[line_id0, i], :]
            N = 2
            p1 = np.column_stack((lines_cur[:, :2], np.ones(N, dtype=np.float32)))
            p2 = np.column_stack((lines_cur[:, 2:], np.ones(N, dtype=np.float32)))
            # Homogeneous line through each pair of endpoints, then the
            # homogeneous intersection of the two lines.
            cross_p = np.cross(p1, p2)
            vp1 = np.cross(cross_p[0], cross_p[1])
            # Near-zero w means the pair is (almost) parallel. The sign of
            # w is arbitrary, so compare its magnitude; the original
            # skipped every valid intersection with negative w.
            if abs(vp1[2]) < 1e-5:
                continue
            vp1 /= vp1[2]
            correct = pts_in_line(vp1[:2], lines)
            if max_correct <= correct:
                best_vp = vp1[:2]
                max_correct = correct
    if best_vp is not None:
        return best_vp, False
    return None, True
def angle_diff(ba, bc, axis=None):
    """
    Angle in degrees between two vectors (or batches of vectors).

    :param ba: first vector(s)
    :param bc: second vector(s)
    :param axis: axis holding the vector components, forwarded to the norm
        and sum reductions
    :return: angle(s) in [0, 180]; NaN where a vector has zero length
    """
    norma = np.linalg.norm(ba, axis=axis)
    normb = np.linalg.norm(bc, axis=axis)
    dot_prod = np.sum(ba * bc, axis=axis)
    cosine_angle = dot_prod / (norma * normb)
    # Rounding can push the cosine slightly outside [-1, 1], which would
    # make arccos return NaN for (anti-)parallel vectors.
    cosine_angle = np.clip(cosine_angle, -1.0, 1.0)
    return np.arccos(cosine_angle) * 180.0 / np.pi
def on_right_side(rect, sz):
    """
    Check that every point of `rect` lies in the right half of the image.

    :param rect: iterable of points whose first element is x
    :param sz: (height, width)
    :return: True when all x coordinates are >= width // 2
    """
    _, width = sz
    middle = width // 2
    flags = [point[0] >= middle for point in rect]
    return all(flags)
def pts_angle(pts):
    """
    Angle in degrees at the middle point of three points.

    pts [3 x 2]: the angle is measured at pts[1] between the rays towards
    pts[0] and pts[2].
    """
    vertex = pts[1]
    return angle_diff(pts[0] - vertex, pts[2] - vertex)
def sample_points(mask, num_points=100):
    """
    Randomly pick distinct positive pixels from a mask.

    :param mask: array; pixels with value > 0 are candidates
    :param num_points: number of points requested
    :return: array of index rows as produced by np.argwhere. When fewer
        candidates than `num_points` exist, all of them are returned
        instead of raising ValueError.
    """
    indices = np.argwhere(mask > 0)
    available = indices.shape[0]
    if available == 0:
        return indices
    # Sampling without replacement cannot draw more points than exist.
    count = min(num_points, available)
    selected = np.random.choice(available, size=count, replace=False)
    return indices[selected]
def valid_para_ratio(pts, th=5):
    """
    Check that opposite sides of a quadrilateral have comparable lengths.

    pts: [4 x 2] corner points in order
    th: ratio bound; both side-length ratios must lie strictly between
        1 / th and th

    Returns True when both pairs of opposite sides pass, False otherwise.
    """
    opposite_sides = (((0, 1), (2, 3)), ((1, 2), (3, 0)))
    for (a, b), (c, d) in opposite_sides:
        ratio = line_len(pts[a], pts[b]) / line_len(pts[c], pts[d])
        if not (1.0 / th < ratio < th):
            return False
    return True
def line_len(pt0, pt1):
    """
    Euclidean distance between two points given as arrays.

    pt0, pt1: [1x2]
    """
    offset = pt0 - pt1
    return np.linalg.norm(offset)
def split_list(seq, part):
    """
    Split a sequence into consecutive chunks.

    The chunk length is int(len(seq) / part + 1) when part > 0, else 1, so
    at most `part` chunks are produced for part > 0.
    """
    chunk = int(len(seq) / part + 1) if part > 0 else 1
    starts = range(0, len(seq), chunk)
    return [seq[begin:begin + chunk] for begin in starts]
def find_portion(mask, portion_x, portion_y, th=0):
    """
    Locate a percentile position among the pixels of a mask above `th`.

    :param mask: 2-D array
    :param portion_x: percentile (0-100) taken over the x coordinates
    :param portion_y: percentile (0-100) taken over the y coordinates
    :param th: pixels strictly greater than this value are considered
    :return: (x, y) as ints
    :raises ValueError: if the mask has more than two dimensions
    """
    if mask.ndim > 2:
        raise ValueError(f"mask must be 2 dim, now {mask.ndim}")
    rows, cols = np.where(mask > th)
    x_at = np.percentile(cols, portion_x)
    y_at = np.percentile(rows, portion_y)
    return int(x_at), int(y_at)
def random_split(num, portion=0.1, max_num=1000):
    """
    Randomly split the indices 0..num-1 into train and val parts.

    num: length of list
    portion: fraction of indices put into val
    max_num: upper bound on the val size
    return:
        train, val index lists
    """
    val_count = int(min(portion * num, max_num))
    order = list(range(num))
    random.shuffle(order)
    train, val = order[val_count:], order[:val_count]
    return train, val
def shuffle_list(list_in):
    """
    Shuffle a list in place and return it.

    random.shuffle works in place and returns None, so the original always
    returned None. The same (now shuffled) list object is returned instead;
    callers that ignored the return value are unaffected.
    """
    random.shuffle(list_in)
    return list_in
def pick(lst, idx):
    """
    Return the elements of `lst` at the positions listed in `idx`, in order.
    """
    chosen = []
    for position in idx:
        chosen.append(lst[position])
    return chosen
def mkdir_if_need(folder):
    """
    Create `folder` (and missing parents) unless it already exists.

    exist_ok=True avoids the check-then-create race when several workers
    create the same folder at once.
    """
    os.makedirs(folder, exist_ok=True)
def mkdir_if_exists(path, image_name):
    """
    Create the folder that `image_name` would live in under `path`.

    The target is os.path.join(path, dirname(image_name)). It is created
    with its parents and nothing happens if it already exists.
    exist_ok=True avoids the check-then-create race between workers.
    """
    target_path = os.path.join(path, os.path.dirname(image_name))
    os.makedirs(target_path, exist_ok=True)
def mkdir(folder, image_name=None):
    """
    Create a folder, optionally the sub-folder implied by an image path.

    :param folder: base folder
    :param image_name: when given, the folder
        join(folder, dirname(image_name)) is created instead
    :return: None when `image_name` is given, otherwise `folder`
    """
    if image_name is not None:
        mkdir_if_exists(folder, image_name)
        return
    mkdir_if_need(folder)
    # The original had a second, unreachable `return folder` here.
    return folder
def save_image_w_pallete(segment, file_name):
    """
    Save a label map as a palettised image.

    :param segment: label map, cast to uint8
    :param file_name: output path handed to PIL's Image.save
    """
    import PIL.Image as Image
    palette = get_pallete(256)
    indexed = Image.fromarray(np.uint8(segment))
    indexed.putpalette(palette)
    indexed.save(file_name)
def get_max_size(out_size, max_len):
    """
    Shrink a (height, width) size so its longer side is at most `max_len`.

    :param out_size: (height, width)
    :param max_len: maximum allowed side length
    :return: (height, width); unchanged when already small enough,
        otherwise scaled down and truncated to uint32
    """
    height, width = out_size
    ratio = max(height, width) / max_len
    if not ratio > 1:
        return height, width
    height, width = np.uint32(np.array(out_size) / ratio)
    return height, width
def get_pallete(num_cls):
    """
    Build the colour map used to visualise a segmentation mask.

    The bits of each class index are spread over the three channels,
    three bits at a time, starting from the most significant bit of each
    channel.

    :param num_cls: the number of visualised classes
    :return: flat list of length num_cls * 3 with the three channel values
        of every class in turn
    """
    palette = [0] * (num_cls * 3)
    for index in range(num_cls):
        base = index * 3
        remaining = index
        shift = 7
        while remaining > 0:
            for channel in range(3):
                bit = (remaining >> channel) & 1
                palette[base + channel] |= bit << shift
            shift -= 1
            remaining >>= 3
    return palette
def color2label(label_color, color_map=None):
    """
    Convert a colour image to semantic ids.

    Each pixel's three channels are packed into one integer code
    (c0 * 256**2 + c1 * 256 + c2). The original packed with base 255, which
    made distinct colours such as (0, 1, 0) and (0, 0, 255) collide.

    :param label_color: height x width x 3 colour image
    :param color_map: {(c0, c1, c2): label_id}. If None, code 0 is treated
        as background and every other colour receives its position in the
        sorted list of codes present in the image (so ids start at 1 when
        a zero-coloured pixel is present, at 0 otherwise).
    :return: int32 label map; with a `color_map`, colours that are not
        listed keep their packed code
    """
    channels = np.int32(label_color)
    codes = (channels[:, :, 0] * (256 ** 2) +
             channels[:, :, 1] * 256 +
             channels[:, :, 2])

    if color_map is None:
        label = np.zeros_like(codes)
        for i, code in enumerate(np.unique(codes)):
            if code == 0:
                continue
            label[codes == code] = i
        return label

    # Always compare against the original codes so that an id written for
    # one colour is never re-mapped by a later entry of color_map.
    label = codes.copy()
    for rgb, i in color_map.items():
        cur_num = rgb[0] * (256 ** 2) + rgb[1] * 256 + rgb[2]
        label[codes == cur_num] = i
    return label
def flow2color(flow):
    """
    Visualise a 2-channel flow field as a colour image.

    Hue encodes the flow direction (angle in degrees halved), value encodes
    the magnitude normalised to 0-255, saturation is fixed at 255.

    :param flow: height x width x 2 flow field
    :return: (hsv float32 image, colour image from the HSV2BGR conversion)
    """
    assert flow.shape[2] == 2
    rows, cols = flow.shape[0], flow.shape[1]
    hsv = np.zeros((rows, cols, 3), dtype=np.float32)
    hsv[..., 1] = 255
    magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    hsv[..., 0] = angle * 180 / np.pi / 2
    hsv[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)
    rgb = cv2.cvtColor(np.uint8(hsv), cv2.COLOR_HSV2BGR)
    return hsv, rgb
def colorEncode(labelmap, colors, mode='RGB'):
    """
    Paint a label map with one colour per label.

    :param labelmap: 2-D label array; negative labels stay black
    :param colors: per-label colours, indexed as colors[label]
    :param mode: 'BGR' reverses the channel order of the result
    :return: height x width x 3 uint8 image
    """
    labels = labelmap.astype('int')
    rows, cols = labels.shape[0], labels.shape[1]
    painted = np.zeros((rows, cols, 3), dtype=np.uint8)
    for value in np.unique(labels):
        if value < 0:
            continue
        selector = (labels == value)[:, :, np.newaxis]
        swatch = np.tile(colors[value], (rows, cols, 1))
        painted += selector * swatch
    return painted[:, :, ::-1] if mode == 'BGR' else painted
def drawBoundingbox(image, boxes, colors=None):
    """
    Draw rectangles on an image in place.

    :param image: image drawn on (and returned)
    :param boxes: arrays laid out as top, left, bottom, right
    :param colors: one colour per box; defaults to [255, 255, 0] for all
    :return: the same image object
    """
    if colors is None:
        colors = [[255, 255, 0]] * len(boxes)
    for color, box in zip(colors, boxes):
        corners = box.astype(np.uint32)
        top_left = (corners[1], corners[0])
        bottom_right = (corners[3], corners[2])
        cv2.rectangle(image, top_left, bottom_right, color, 2)
    return image
def round2stride(length, stride):
    """
    Round `length` down to a multiple of `stride` (floor division).
    """
    whole_steps = length // stride
    return whole_steps * stride
def resize_rect(rect, sz_src, sz_tgt):
    """
    Rescale point coordinates from one image size to another, in place.

    :param rect: n x k x 2 array of (x, y) points; modified and returned
    :param sz_src: (height, width) the coordinates refer to
    :param sz_tgt: (height, width) to map the coordinates to
    :return: the same `rect` object; returned untouched when empty
    """
    if len(rect) == 0:
        return rect
    src_h, src_w = sz_src
    tgt_h, tgt_w = sz_tgt
    # Axis 0 holds x (scaled by the width ratio), axis 1 holds y.
    for axis, (target, source) in enumerate(((tgt_w, src_w), (tgt_h, src_h))):
        rect[:, :, axis] = np.int64(rect[:, :, axis] * target / source)
    return rect
def resize_lines(lines, sz_src, sz_tgt):
    """
    Rescale line endpoints from one image size to another.

    :param lines: array whose second axis has length 2 (enforced by the
        assert), e.g. n x 2 x 2 with each line as [start (x, y), end (x, y)]
    :param sz_src: (height, width) the coordinates refer to
    :param sz_tgt: (height, width) to map the coordinates to
    :return: n x 4 array, [start x, start y, end x, end y] per line
    """
    assert lines.shape[1] == 2
    endpoints = resize_rect(lines.reshape([-1, 2, 2]), sz_src, sz_tgt)
    return endpoints.reshape([-1, 4])
def resize_LShape(lShapes, sz_src, sz_tgt):
    """
    Rescale L-shape points from one image size to another.

    :param lShapes: array whose second axis has length 3 (enforced by the
        assert), e.g. n x 3 x 2 with three (x, y) points per shape
    :param sz_src: (height, width) the coordinates refer to
    :param sz_tgt: (height, width) to map the coordinates to
    :return: n x 6 array
    """
    assert lShapes.shape[1] == 3
    corners = resize_rect(lShapes.reshape([-1, 3, 2]), sz_src, sz_tgt)
    return corners.reshape([-1, 6])
def resize_to_fix_side(image, size=960, fix_type='height'):
    """
    Resize an image so one side equals `size`, keeping the aspect ratio.

    :param image: input image
    :param size: target length of the fixed side
    :param fix_type: "height" or "width", the side that is fixed
    :return: resized image
    :raises ValueError: for any other `fix_type`
    """
    if fix_type not in ("height", "width"):
        raise ValueError("fix type must in [height, widht]")
    if fix_type == "height":
        scale = size / image.shape[0]
        height, width = size, int(scale * image.shape[1])
    else:
        scale = size / image.shape[1]
        height, width = int(scale * image.shape[0]), size
    return cv2.resize(image, (width, height))
def resize_like(image, src, side="all", interpolation=None):
    """
    Resize `image` to match the size of `src`.

    :param image: image to resize
    :param src: reference image; only its first two dimensions are used
    :param side: "all" matches both sides exactly; "height" or "width"
        matches only that side through resize_to_fix_side (which keeps the
        aspect ratio and ignores `interpolation`)
    :param interpolation: OpenCV flag, cv2.INTER_CUBIC when None
    :return: resized image
    """
    ref_h, ref_w = src.shape[:2]
    if interpolation is None:
        interpolation = cv2.INTER_CUBIC
    if side == "all":
        return cv2.resize(image, (ref_w, ref_h),
                          interpolation=interpolation)
    size = ref_h if side == "height" else ref_w
    return resize_to_fix_side(image, size, fix_type=side)
def getmaxsize(shape, size=720, fixSide=False):
    """
    Compute the (width, height) that limits the longer side to `size`.

    input: [h, w, c] (only h and w are used)
    output: (w, h) - scaled so the longer side equals `size` when `fixSide`
        is set or the input is larger than `size`; otherwise the input size
    """
    height, width = shape[:2]
    scale = max(height, width) / size
    scaled_h, scaled_w = np.uint32(np.array(shape[:2]) / scale)
    if fixSide or scale > 1:
        return (scaled_w, scaled_h)
    return (shape[1], shape[0])
def resize2size(images, size, interpolations=None):
    """
    Resize every image of a list to the same size, in place.

    :param images: list of images; entries are replaced by the resized ones
    :param size: (width, height)
    :param interpolations: one OpenCV flag per image; a missing list or a
        None entry means cv2.INTER_LINEAR
    :return: the same `images` list
    """
    if interpolations is None:
        interpolations = [cv2.INTER_LINEAR] * len(images)
    for index, (picture, mode) in enumerate(zip(images, interpolations)):
        mode = cv2.INTER_LINEAR if mode is None else mode
        if picture is None:
            print(f"{index}_th image is None")
        images[index] = cv2.resize(picture, tuple(size), interpolation=mode)
    return images
def resize2maxsize(image,
                   size=720,
                   interpolation=None,
                   fixSide=False):
    """
    Constrain the maximum side length of an image.

    Args:
        image: input image; bool images are resized as uint8 and turned
            back into bool
        size: maximum side length
        interpolation: OpenCV flag, cv2.INTER_CUBIC when None
        fixSide: when True the longer side always becomes `size`, even if
            that means enlarging the image
    Returns:
        a copy of the image, resized when required
    """
    if interpolation is None:
        interpolation = cv2.INTER_CUBIC
    is_bool = image.dtype == bool
    result = image.copy()
    scale = max(image.shape[:2]) / size
    if result.dtype == 'bool':
        result = np.uint8(result)
    height, width = np.uint32(np.array(image.shape[:2]) / scale)
    if fixSide or scale > 1:
        result = cv2.resize(result, (width, height),
                            interpolation=interpolation)
    if is_bool:
        result = result > 0
    return result
def resize2minsize(image, size=256, interpolation=None):
    """
    Shrink an image so that its shorter side is at most `size`.

    The file defined this function twice; the second copy shadowed the
    first and lost the `size is None` guard. Both are merged here.

    :param image: input image; bool images are resized as uint8 and turned
        back into bool
    :param size: target length of the shorter side; None returns the input
        unchanged
    :param interpolation: OpenCV flag, cv2.INTER_CUBIC when None
    :return: the input itself when `size` is None, otherwise a copy that is
        resized only if the shorter side exceeds `size`
    """
    if size is None:
        return image
    if interpolation is None:
        interpolation = cv2.INTER_CUBIC
    height, width = image.shape[:2]
    scale = min(height, width) / size
    image_out = image.copy()
    if image_out.dtype == 'bool':
        image_out = np.uint8(image_out)
    if scale > 1:
        height, width = np.uint32(np.array(image.shape[:2]) / scale)
        image_out = cv2.resize(image_out, (width, height),
                               interpolation=interpolation)
    if image.dtype == bool:
        image_out = image_out > 0
    return image_out
def getimgsizeby(sz, size=960, fix_type='max', stride=1):
    """
    Compute the (height, width) obtained by fixing one side to `size`.

    :param sz: (height, width) of the source
    :param size: target length of the fixed side
    :param fix_type: which side is fixed - 'min' (shorter), 'max' (longer),
        'height' or 'width'
    :param stride: when > 1 both sides are rounded down to a multiple of it
    :return: (height, width)
    :raises ValueError: for an unknown `fix_type` (the original failed with
        an unbound local variable instead)
    """
    height, width = sz
    if fix_type == 'min':
        scale = min(height, width) / size
    elif fix_type == "max":
        scale = max(height, width) / size
    elif fix_type == 'height':
        scale = height / size
    elif fix_type == 'width':
        scale = width / size
    else:
        raise ValueError(
            f"fix_type must be one of min, max, height, width; got {fix_type!r}")
    height, width = np.uint32(np.float32(sz) / scale)
    if stride > 1:
        height = round2stride(height, stride)
        width = round2stride(width, stride)
    return height, width
def resize2fixSize(image, size=960, fix_type='max', interpolation=None):
    """
    Resize an image so that the side chosen by `fix_type` equals `size`.

    :param image: input image; bool images are resized as uint8 and turned
        back into bool
    :param size: target length of the fixed side
    :param fix_type: forwarded to getimgsizeby
    :param interpolation: OpenCV flag, cv2.INTER_CUBIC when None
    :return: resized copy
    """
    if interpolation is None:
        interpolation = cv2.INTER_CUBIC
    height, width = getimgsizeby(image.shape[:2], size, fix_type)
    is_bool = image.dtype == bool
    result = image.copy()
    if result.dtype == 'bool':
        result = np.uint8(result)
    result = cv2.resize(result, (width, height), interpolation=interpolation)
    return result > 0 if is_bool else result
def resize2range(image, max_size=720, min_size=480,
                 interpolation=None, stride=None):
    """
    Bring an image into a size range.

    If the longer side exceeds `max_size` the image is shrunk so it fits
    (sizes rounded down to `stride` when given). Otherwise, if the image
    can be enlarged towards `min_size` on its shorter side without the
    longer side passing `max_size`, it is enlarged; `stride` is not applied
    in that case.

    :param image: input image
    :param max_size: upper bound for the longer side
    :param min_size: wanted length of the shorter side
    :param interpolation: OpenCV flag, cv2.INTER_LINEAR when None
    :param stride: optional rounding multiple used when shrinking
    :return: a copy of the image, resized when required
    """
    if interpolation is None:
        interpolation = cv2.INTER_LINEAR
    height, width = image.shape[:2]
    scale_to_max = max_size / max(height, width)
    scale_to_min = min(min_size / min(height, width),
                       max_size / max(height, width))
    result = image.copy()
    original_hw = np.array(image.shape[:2])

    if scale_to_max < 1:
        height, width = np.uint32(original_hw * scale_to_max)
        if stride is not None:
            height = round2stride(height, stride)
            width = round2stride(width, stride)
        return cv2.resize(result, (width, height),
                          interpolation=interpolation)

    if scale_to_min > 1:
        height, width = np.uint32(original_hw * scale_to_min)
        return cv2.resize(result, (width, height),
                          interpolation=interpolation)
    return result
def resize2maxshape(image, shape,
                    interpolation=None,
                    with_scale=False,
                    mean_value=0):
    """
    Fit an image into a target shape, padding where the ratio differs.

    The image is scaled by the smaller of the two side ratios and pasted,
    centred along the padded axis, onto a canvas filled with `mean_value`.

    :param image: input image
    :param shape: target (height, width)
    :param interpolation: OpenCV flag, cv2.INTER_CUBIC when None
    :param with_scale: also return the scale and the paste box
    :param mean_value: fill value of the padding
    :return: the canvas, or (canvas, scale, [top, left, bottom, right])
        when `with_scale` is True
    """
    if interpolation is None:
        interpolation = cv2.INTER_CUBIC
    shape = list(shape)
    canvas_shape = shape if image.ndim == 2 else shape + [image.shape[-1]]
    canvas = np.zeros(canvas_shape) + mean_value

    src_h, src_w = image.shape[:2]
    rates = np.array([shape[0] / src_h, shape[1] / src_w])
    limiting_axis = np.argmin(rates)
    scale = rates[limiting_axis]
    resized = cv2.resize(image, (int(src_w * scale), int(src_h * scale)),
                         interpolation=interpolation)
    new_h, new_w = resized.shape[:2]

    if limiting_axis == 0:
        # Height is the limiting side: pad left and right.
        top, bottom = 0, new_h
        left = int((shape[1] - new_w) / 2)
        right = left + new_w
    else:
        # Width is the limiting side: pad top and bottom.
        top = int((shape[0] - new_h) / 2)
        bottom = top + new_h
        left, right = 0, new_w

    canvas[top:bottom, left:right] = resized
    placement = [top, left, bottom, right]
    if with_scale:
        return canvas, scale, placement
    return canvas
def bilinear_interpolation(x, y, points):
    '''Interpolate the value at (x, y) from four rectangle corners.

    `points` is a list of four (x, y, value) triplets in any order; they
    must form an axis-aligned rectangle that contains (x, y).

    >>> bilinear_interpolation(12, 5.5,
    ...                        [(10, 4, 100),
    ...                         (20, 4, 200),
    ...                         (10, 6, 150),
    ...                         (20, 6, 300)])
    165.0

    Raises ValueError when the points do not form a rectangle or when
    (x, y) lies outside it.
    '''
    # See formula at: http://en.wikipedia.org/wiki/Bilinear_interpolation
    corners = sorted(points)  # ordered by x, then by y
    (x1, y1, q11), (_x1, y2, q12), (x2, _y1, q21), (_x2, _y2, q22) = corners

    is_rectangle = x1 == _x1 and x2 == _x2 and y1 == _y1 and y2 == _y2
    if not is_rectangle:
        raise ValueError('points do not form a rectangle')
    inside = x1 <= x <= x2 and y1 <= y <= y2
    if not inside:
        raise ValueError('(x, y) not within the rectangle')

    weighted = (q11 * (x2 - x) * (y2 - y) +
                q21 * (x - x1) * (y2 - y) +
                q12 * (x2 - x) * (y - y1) +
                q22 * (x - x1) * (y - y1))
    area = (x2 - x1) * (y2 - y1) + 0.0
    return weighted / area
def dump_to_npy(arrays, file_path=None):
    """
    Dump a set of arrays to .npy files for local visualisation.

    arrays: dict mapping a name to an array; each entry is saved as
        "<name>.npy"
    file_path: folder to save into; the current directory when None (the
        original default of None crashed in os.path.join)
    """
    assert isinstance(arrays, dict)
    target_dir = '.' if file_path is None else file_path
    for k, v in arrays.items():
        np.save(os.path.join(target_dir, k + '.npy'), v)
def crop(image, box):
    """
    Cut a rectangular region out of an image.

    box: t, l, b, r (bottom and right are exclusive)
    """
    top, left, bottom, right = box
    return image[slice(top, bottom), slice(left, right)]
def padding_image(image_in,
                  image_size,
                  crop=None,
                  interpolation=cv2.INTER_NEAREST,
                  pad_val=0.):
    """Pad image to target image_size based on a given crop

    :param image_in: 2-D or 3-D image
    :param image_size: target (height, width); a list or a tuple
    :param crop: optional [top, left, bottom, right] given as fractions of
        the target size; the image is resized to fill that box. When None
        the image is pasted at the centre of the padded canvas.
    :param interpolation: OpenCV flag used when resizing into `crop`
    :param pad_val: fill value - one number for all channels or a list with
        one value per channel
    :return: `image_in` itself when it already covers the target size in
        both dimensions, otherwise the padded image
    """
    assert isinstance(pad_val, (int, float, list))
    # Accept tuples as well: the list concatenation below needs a list.
    image_size = list(image_size)
    if image_size[0] <= image_in.shape[0] and \
            image_size[1] <= image_in.shape[1]:
        return image_in

    image = image_in.copy()
    in_dim = np.ndim(image)
    if in_dim == 2:
        image = image[:, :, None]
    if isinstance(pad_val, (int, float)):
        pad_val = [pad_val] * image.shape[-1]
    assert len(pad_val) == image.shape[-1]

    dim = image.shape[2]
    image_pad = np.ones(image_size + [dim], dtype=image_in.dtype) * \
        np.array(pad_val)

    if crop is not None:
        h, w = image_size
        crop_cur = np.uint32([crop[0] * h, crop[1] * w,
                              crop[2] * h, crop[3] * w])
        image = cv2.resize(
            image,
            (int(crop_cur[3] - crop_cur[1]), int(crop_cur[2] - crop_cur[0])),
            interpolation=interpolation)
        if image.ndim == 2:
            # cv2.resize drops a singleton channel axis; restore it so the
            # assignment into the 3-D canvas below still works.
            image = image[:, :, None]
    else:
        h, w = image_in.shape[:2]
        # default crop is padding center
        hp, wp = image_pad.shape[:2]
        t, l = int((hp - h) / 2), int((wp - w) / 2)
        crop_cur = [t, l, t + h, l + w]

    image_pad[crop_cur[0]:crop_cur[2], crop_cur[1]:crop_cur[3], :] = image
    if in_dim == 2:
        # Remove only the channel axis that was added above; np.squeeze
        # would also drop a real axis of length 1.
        image_pad = image_pad[:, :, 0]
    return image_pad
def enlighting_v2(image, value=30):
    """
    Brighten an image by raising its HSV value channel.

    The boost is 60% of the gap between the mean of the value channel and
    255; the `value` argument is overwritten and has no effect. Pixels
    that would overflow are set to 255.

    :param image: image passed to the BGR2HSV conversion
    :param value: unused (kept for interface compatibility)
    :return: brightened image from the HSV2BGR conversion
    """
    hue, sat, val = cv2.split(cv2.cvtColor(image, cv2.COLOR_BGR2HSV))
    boost = int((255 - np.mean(val)) * 0.6)
    ceiling = 255 - boost
    val[val > ceiling] = 255
    val[val <= ceiling] += boost
    brightened = cv2.merge((hue, sat, val))
    return cv2.cvtColor(brightened, cv2.COLOR_HSV2BGR)
def enlighting(image):
    """
    Equalise the histogram of the HSV value channel of an image.

    :param image: image passed to the BGR2HSV conversion
    :return: image from the HSV2BGR conversion after equalisation
    """
    hue, sat, val = cv2.split(cv2.cvtColor(image, cv2.COLOR_BGR2HSV))
    equalised = cv2.merge((hue, sat, cv2.equalizeHist(val)))
    return cv2.cvtColor(equalised, cv2.COLOR_HSV2BGR)
def white_balance(img):
    """
    White-balance an image in LAB space.

    Channels 1 and 2 of the LAB image are shifted towards 128 by their
    average offset, weighted by channel 0 / 255 and a factor of 1.1.

    :param img: image passed to the BGR2LAB conversion
    :return: balanced image from the LAB2BGR conversion
    """
    lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
    for channel in (1, 2):
        offset = np.average(lab[:, :, channel]) - 128
        lab[:, :, channel] = lab[:, :, channel] - \
            (offset * (lab[:, :, 0] / 255.0) * 1.1)
    return cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
def one_hot(label_map, class_num):
    """
    One-hot encode an integer label map.

    :param label_map: integer array of any shape with values < class_num
    :param class_num: number of classes
    :return: float array of shape label_map.shape + (class_num,)
    """
    dims = np.array(label_map.shape)
    total = np.prod(dims)
    encoded = np.zeros((total, class_num))
    encoded[range(total), label_map.flatten()] = 1
    return encoded.reshape(dims.tolist() + [class_num])
def prob2label(label_prob):
    """Turn an [h, w, classes] probability volume into a discrete label map.

    Each pixel receives the index of its highest-scoring class.
    """
    assert label_prob.ndim == 3
    winners = np.argmax(label_prob, axis=-1)
    return winners
| """ | |
| label_prob: [0, 1] probability map | |
| """ | |
def prob2color(label_prob, color_map, bkg_color=(0, 0, 0)):
    """Blend per-class colours according to a probability map.

    Args:
        label_prob: [h, w, dim] array; channel 0 is weighted with
            ``bkg_color`` and channel ``k`` with ``color_map[k - 1]``.
        color_map: Python list of 0-255 colours ``[[x, x, x], ...]``; only
            the first ``dim - 1`` entries are used.
        bkg_color: colour assigned to channel 0.

    Returns:
        uint8 array of shape [h, w, len(colour)].
    """
    assert isinstance(color_map, list)
    height, width, dim = label_prob.shape
    # Plain ndarrays replace the discouraged np.matrix type.
    palette = np.asarray([list(bkg_color)] + color_map[:(dim - 1)])
    flat_prob = label_prob.reshape((height * width, dim))
    label_color = np.dot(flat_prob, palette).reshape((height, width, -1))
    return np.uint8(label_color)
def mix_probimage(prob, image, alpha=0.7):
    """Overlay a colourised probability map on an image.

    prob: [h, w, dim] or [h, w]; uint8 input is rescaled to [0, 1].
    The image is resized to match ``prob`` and blended with weight
    ``1 - alpha`` against the colourised probabilities.
    """
    if prob.ndim == 2:
        prob = prob[:, :, None]
    if prob.dtype == 'uint8':
        prob = np.float32(prob) / 255.0
    # Drop the first palette entry before handing colours to prob2color.
    palette = np.array(get_pallete(256)).reshape([-1, 3])
    colors = palette[1:, :].tolist()
    overlay = prob2color(prob, colors)
    image = resize_like(image, prob)
    return (1 - alpha) * image + alpha * overlay
def label2color(label, color_map=None, bkg_color=[0, 0, 0]):
    """Colourise an integer label map.

    Args:
        label: integer label array; label 0 is drawn with ``bkg_color`` and
            label ``k`` with ``color_map[k - 1]``.
        color_map: list (or ndarray) of 0-255 colours. Defaults to the
            module PALETTE scaled to 0-255.
        bkg_color: colour used for label 0.

    Returns:
        uint8 colour image produced by ``prob2color``.
    """
    if color_map is None:
        color_map = np.uint8(np.array(PALETTE) * 255)
    # prob2color insists on a Python list, so accept ndarrays here as well.
    if isinstance(color_map, np.ndarray):
        color_map = color_map.tolist()
    class_num = len(color_map) + 1
    return prob2color(one_hot(label, class_num), color_map, bkg_color)
def gif_to_frames(in_path, out_path, max_frame=10000):
    """Dump the frames of a GIF as ``<out_path>/<index:04d>.png`` files.

    Args:
        in_path: path of the GIF file.
        out_path: existing directory that receives the PNG frames.
        max_frame: maximum number of frames to write.
    """
    import imageio
    gif = imageio.get_reader(in_path, '.gif')
    try:
        for frame_id, frame in tqdm(enumerate(gif)):
            # Stop before writing so that at most max_frame files are produced.
            if frame_id >= max_frame:
                break
            filename = '%s/%04d.png' % (out_path, frame_id)
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            cv2.imwrite(filename, frame)
    finally:
        gif.close()
    print('finished')
def speedx_video(video_in, video_out, speed):
    """Write a copy of ``video_in`` played back ``speed`` times faster.

    Args:
        video_in: path of the source video.
        video_out: path of the video to write.
        speed: playback speed factor passed to moviepy's ``speedx``.
    """
    import moviepy.editor as me
    import moviepy.video.fx.all as vfx
    clip = me.VideoFileClip(video_in)
    try:
        # The original passed the undefined name ``speedx`` as the factor.
        fast_clip = vfx.speedx(clip, factor=speed)
        fast_clip.write_videofile(video_out)
    finally:
        clip.close()
def resize_boxes(boxes, image_shape):
    """Scale boxes by an image size.

    boxes: n x 4 [t, l, b, r]
    image_shape: height, width

    Empty input is returned untouched; otherwise a new array is returned
    with rows scaled by height and columns scaled by width.
    """
    if len(boxes) == 0:
        return boxes
    scaled = np.array(boxes)
    row_cols, col_cols = [0, 2], [1, 3]
    scaled[:, row_cols] *= image_shape[0]
    scaled[:, col_cols] *= image_shape[1]
    return scaled
def lens_blur(img, depth_in, fg_depth,
              fg_mask=None, NUM_LAYERS = 20):
    """Depth-dependent blur: each depth layer is blurred by its distance to
    the focus depth.

    The depth map is rescaled to 0-255, split into ``NUM_LAYERS`` bands, and
    every band is filled from a copy of ``img`` blurred with
    ``blur_filter(img, |band_start - focus|)``. Pixels under ``fg_mask``
    (if given) are copied back unblurred.
    """
    def band_mask(depth_map, lo, hi):
        # 1 where lo <= depth <= hi, 0 elsewhere.
        selected = np.zeros(depth_map.shape)
        selected[depth_map >= lo] = 1
        selected[depth_map > hi] = 0
        return selected

    def per_channel(mask, ch=3):
        return np.tile(mask[:, :, None] > 0, (1, 1, ch))

    depth = depth_in.copy()
    out = np.zeros(img.shape)
    nearest = np.min(np.unique(depth))
    farthest = np.max(np.unique(depth))
    nearest = int(nearest / farthest * 255)
    fg_depth = int(fg_depth / farthest * 255)
    depth = np.uint8(depth * 255 / farthest)
    step = (255 - nearest) // NUM_LAYERS
    layers = np.array(range(nearest, 255, step))
    # Snap the focus depth to the start of the layer that contains it.
    for lower, upper in zip(layers[:-1], layers[1:]):
        if lower < fg_depth and upper > fg_depth:
            fg_depth = lower
            break
    for start in layers:
        selector = per_channel(band_mask(depth, start, start + step))
        blurred = blur_filter(img, np.abs(start - fg_depth))
        out[selector] = blurred[selector]
    if fg_mask is not None:
        fg_mask = np.tile(fg_mask[:, :, None] > 0, (1, 1, 3))
        out[fg_mask] = img[fg_mask]
    return out
| ############################################### | |
| ### Filters | |
| ############################################### | |
| # Change blur by epsilon value (a) | |
def blur_filter(img, a):
    """Gaussian-blur ``img`` with a kernel size that grows with ``a``.

    The kernel side is derived from ``a // 10``, forced to be odd, and
    capped at 255; the blur uses sigma 9.
    """
    step = a // 10
    # Even steps get +1 and odd steps +2, so the kernel size is always odd.
    if step % 2 == 0:
        k = step + 1
    else:
        k = step + 2
    if not k < 255:
        k = 255
    return cv2.GaussianBlur(img, (k, k), 9)
def box_center(box):
    """Return the integer (x, y) centre of a [t, l, b, r] box."""
    top, left, bottom, right = box[0], box[1], box[2], box[3]
    center_x = (left + right) // 2
    center_y = (top + bottom) // 2
    return center_x, center_y
def mean_value(value, mask):
    """Per-channel mean of ``value`` over the pixels selected by ``mask``.

    ``value`` is [h, w] or [h, w, dim]; the result always has one entry
    per channel.
    """
    if value.ndim == 2:
        value = value[:, :, None]
    _, _, channels = value.shape
    pixels = value.reshape([-1, channels])
    selected = pixels[mask.flatten(), :]
    return np.mean(selected, axis=0)
def is_neighbor_mask(mask0, mask1, min_len=200, kernel=10):
    """Decide whether two masks touch along a sufficiently long border.

    ``mask1`` is dilated by ``kernel`` and the overlap with ``mask0`` must
    exceed ``min_len * kernel`` pixels.
    """
    grown = dilate_mask(mask1, kernel=kernel)
    shared = np.sum(np.logical_and(mask0 > 0, grown > 0))
    return shared > min_len * kernel
def get_salient_components(segment_in, th=0.1, min_th=25):
    """Split a 0/1 mask into its sufficiently large connected components.

    :param segment_in: 0, 1 mask
    :param th: minimum component area as a fraction of the total mask area
    :param min_th: minimum component area in pixels
    :return: list of boolean component masks; ``[segment_in]`` when the
        mask has exactly one component
    """
    total_area = np.sum(segment_in)
    binary = np.uint8(segment_in * 255)
    count, labels = cv2.connectedComponents(binary)
    if count == 2:
        return [segment_in]
    kept = []
    for label in range(1, count):
        component = labels == label
        area = np.sum(component)
        if area < total_area * th or area < min_th:
            continue
        kept.append(component)
    return kept
def get_component(segment, criteria='max'):
    """Return the largest connected component of ``segment`` as a 0/255 mask.

    When there is exactly one component the input is returned unchanged.
    ``criteria`` is accepted but not used.
    """
    count, labels = cv2.connectedComponents(segment)
    if count == 2:
        return segment
    # max() keeps the first label on ties, like a strict ">" scan would.
    largest = max(range(1, count),
                  key=lambda label: np.sum(labels == label),
                  default=1)
    return np.uint8(255 * (labels == largest))
def find_largest_mask(segment, ignore_ids=None):
    """Find the id covering the most pixels in a label map.

    Ids listed in ``ignore_ids`` are skipped. Returns ``(id, segment == id)``;
    the id falls back to 1 when nothing qualifies.
    """
    skipped = [] if ignore_ids is None else ignore_ids
    best_id, best_area = 1, 0
    for label in np.unique(segment):
        if label in skipped:
            continue
        area = np.sum(segment == label)
        if area > best_area:
            best_id, best_area = label, area
    return best_id, segment == best_id
def find_center_mask(segment, ignore_ids, box = None):
    """Find the dominant id inside a [t, l, b, r] window of a label map.

    The window defaults to the central half of the image in each dimension.
    Returns ``(id, segment == id)`` with the mask covering the full image.
    """
    h, w = segment.shape
    if box is None:
        box = [int(h / 4),
               int(w / 4),
               int(h * 3 / 4),
               int(w * 3 / 4)]
    top, left, bottom, right = box[0], box[1], box[2], box[3]
    window = segment[top:bottom, left:right]
    idx, _ = find_largest_mask(window, ignore_ids)
    return idx, segment == idx
def get_largest_component(segment_in, criteria='max'):
    """Keep only the largest connected blob of a 0-255 soft mask.

    The mask is binarised at 0.3 (after scaling by 1/255); the largest
    component is dilated and multiplied back onto the input. When there is
    a single component the input is returned unchanged. ``criteria`` is
    accepted but not used.
    """
    thresh = 0.3
    binary = np.uint8(255 * (np.float32(segment_in.copy()) / 255.0 > thresh))
    count, labels = cv2.connectedComponents(binary)
    if count == 2:
        return segment_in
    # max() keeps the first label on ties, like a strict ">" scan would.
    largest = max(range(1, count),
                  key=lambda label: np.sum(labels == label),
                  default=1)
    keep = dilate_mask(np.uint8(labels == largest))
    return np.uint8(segment_in * keep)
def fillholes(mask):
    """Fill the holes of a binary mask.

    Args:
        mask: array whose positive entries are foreground.

    Returns:
        Boolean array, True wherever the filled mask is foreground.
    """
    des = np.uint8(mask > 0) * 255
    # findContours returns (contours, hierarchy) on OpenCV 4.x but
    # (image, contours, hierarchy) on 3.x; [-2] selects contours on both.
    contours = cv2.findContours(des, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE)[-2]
    # Contours are filled one at a time, as in the original code.
    for cnt in contours:
        cv2.drawContours(des, [cnt], -1, 255, -1)
    return des > 0
def video_to_frames(in_path, out_path, max_frame=100000):
    """Split a video into ``<out_path>/<index:04d>.jpg`` frames.

    Args:
        in_path: path of the video to read.
        out_path: directory for the frames (created when needed).
        max_frame: maximum number of frames to write.
    """
    print("saving videos to frames at {}".format(out_path))
    mkdir_if_need(out_path)
    cap = cv2.VideoCapture(in_path)
    frame_id = 0
    try:
        # Checking the limit up front yields exactly max_frame frames at most.
        while cap.isOpened() and frame_id < max_frame:
            ret, frame = cap.read()
            if not ret:
                break
            filename = out_path + '/%04d.jpg' % frame_id
            cv2.imwrite(filename, frame)
            frame_id += 1
    finally:
        cap.release()
    print("finished")
def resize_video(in_path, out_path, sz, max_frame=10000):
    """Re-encode a video at a new resolution.

    Args:
        in_path: path of the existing source video.
        out_path: path of the resized video to write.
        sz: height, width of new video
        max_frame: maximum number of frames to convert.
    """
    from moviepy.editor import ImageSequenceClip
    print("resize videos to vidoe at {}".format(out_path))
    new_height, new_width = sz
    assert os.path.exists(in_path), f"must exist {in_path}"
    cap = cv2.VideoCapture(in_path)
    progress_bar = tqdm(total=max_frame)
    progress_bar.set_description('Progress')
    frames = []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        while cap.isOpened() and len(frames) < max_frame:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, (new_width, new_height))
            # OpenCV decodes BGR; reverse the channel order for moviepy.
            frames.append(frame[:, :, ::-1])
            # tqdm.update takes an increment, not the running total.
            progress_bar.update(1)
    finally:
        progress_bar.close()
        cap.release()
    clip = ImageSequenceClip(frames, fps)
    clip.write_videofile(out_path, fps=fps)
    print("finished")
def frame_to_video_simple(frames,
                          fps=10,
                          video_name='video.avi',
                          reader=cv2.IMREAD_UNCHANGED):
    """Combine frames into a video.

    Args:
        frames: non-empty list of image arrays, or of image file paths.
        fps: frame rate of the output video.
        video_name: output path; must end with ``.avi`` or ``.mp4``.
        reader: ``cv2.imread`` flag used when ``frames`` holds paths.

    Raises:
        ValueError: if ``frames`` is empty or the extension is unsupported.
    """
    if video_name.endswith('.avi'):
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
    elif video_name.endswith('.mp4'):
        fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    else:
        raise ValueError('unsupported video extension: %s' % video_name)
    if len(frames) == 0:
        raise ValueError('frames must not be empty')

    is_str = isinstance(frames[0], str)
    # Probe the size with the same flag later used for every frame so the
    # writer size always matches what is written.
    first = cv2.imread(frames[0], reader) if is_str else frames[0]
    sz = first.shape[:2]
    video = cv2.VideoWriter(video_name, fourcc, fps, (sz[1], sz[0]))
    try:
        for frame in tqdm(frames):
            if is_str:
                frame = cv2.imread(frame, reader)
            video.write(frame)
    finally:
        video.release()
    print('save to %s' % video_name)
def frame_to_video(image_path,
                   label_path,
                   frame_list,
                   label_ext='',
                   label_map_is_color=False,
                   color_map=None,
                   sz=None,
                   fps=10,
                   alpha=0.5,
                   video_name='video.avi',
                   exts=("jpg", "png"),
                   is_probability=False):
    """Combine frames to video to visualize image & label image.

    Args:
        image_path: directory of the images.
        label_path: directory of the label images.
        frame_list: frame names without extension.
        label_ext: suffix appended to a frame name to get its label name.
        label_map_is_color: when True, labels are blended as they are read.
        color_map: colour list used to colourise labels (required unless
            ``label_map_is_color``).
        sz: (height, width) of the video; defaults to the first label's size.
        fps: frame rate of the output video.
        alpha: blending weight of the image (labels get ``1 - alpha``).
        video_name: output path; must end with ``.avi`` or ``.mp4``.
        exts: (image extension, label extension).
        is_probability: treat labels as probability maps, not id maps.

    Raises:
        ValueError: if the video extension is unsupported.
    """
    import sys

    def to_color_map(label):
        assert color_map is not None
        bkg = [255, 255, 255]
        if is_probability:
            if label.ndim == 2:
                # Single-channel map: build a [1 - p, p] two-class volume.
                label = np.float32(label) / 255
                label = np.concatenate(
                    [1 - label[:, :, None],
                     label[:, :, None]], axis=2)
            label = prob2color(label, color_map, bkg_color=bkg)
        else:
            label[label > len(color_map)] = 0
            label = label2color(label, color_map, bkg)
        return label[:, :, ::-1]

    ext_image, ext_label = exts
    if video_name.endswith('.avi'):
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
    elif video_name.endswith('.mp4'):
        fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    else:
        raise ValueError('unsupported video extension: %s' % video_name)

    if sz is None:
        # Use the same label naming scheme as the main loop below.
        first_label = cv2.imread(
            f"{label_path}/{frame_list[0]}{label_ext}.{ext_label}",
            cv2.IMREAD_UNCHANGED)
        sz = first_label.shape[:2]

    video = cv2.VideoWriter(video_name, fourcc, fps, (sz[1], sz[0]))
    try:
        for i, image_name in enumerate(frame_list):
            sys.stdout.write('\r>>process %04d / %04d' % (i, len(frame_list)))
            sys.stdout.flush()
            # Honour the image extension from ``exts`` instead of fixed .jpg.
            image = cv2.resize(
                cv2.imread(f"{image_path}/{image_name}.{ext_image}", cv2.IMREAD_COLOR),
                (sz[1], sz[0]))
            label_name = image_name + label_ext
            label = cv2.resize(cv2.imread(f"{label_path}/{label_name}.{ext_label}",
                                          cv2.IMREAD_UNCHANGED),
                               (sz[1], sz[0]), interpolation=cv2.INTER_NEAREST)
            if not label_map_is_color:
                label = to_color_map(label)
            frame = np.uint8(image * alpha + label * (1 - alpha))
            video.write(frame)
    finally:
        video.release()
    print('save to %s' % video_name)
def video_to_frame(video_path,
                   image_folder_path=None,
                   sample_rate=1,
                   max_len=None,
                   holder=None,
                   ext="jpg"):
    """Sample frames from a video into a folder and/or a list.

    Args:
        video_path: path of the video; ``.gif`` inputs are delegated to
            ``gif_to_frames``.
        image_folder_path: directory for ``<count:04>.<ext>`` frames
            (created when needed); nothing is written when None.
        sample_rate: keep every ``sample_rate``-th frame.
        max_len: when set, frames go through ``resize2maxsize(image, max_len)``.
        holder: the holder of image list; kept frames are appended to it.
        ext: file extension of the written frames.

    Returns:
        ``(fps, (height, width))`` of the source video, or None for GIFs.
    """
    if image_folder_path is not None:
        mkdir_if_need(image_folder_path)
    if video_path.split('.')[-1] == 'gif':
        gif_to_frames(video_path, image_folder_path)
        return

    vidcap = cv2.VideoCapture(video_path)
    try:
        success, image = vidcap.read()
        assert success, video_path
        sz = image.shape[:2]
        fps = vidcap.get(cv2.CAP_PROP_FPS)
        count = 0
        while success:
            if count % sample_rate == 0:
                if max_len is not None:
                    image = resize2maxsize(image, max_len)
                if image_folder_path is not None:
                    cv2.imwrite(f'{image_folder_path}/{count:04}.{ext}', image)
                if holder is not None:
                    holder.append(image)
            success, image = vidcap.read()
            count += 1
    finally:
        # Release the decoder even when reading or writing fails.
        vidcap.release()
    print('success split %s' % video_path)
    return fps, sz
def box_intersect(box0, box1):
    """Intersect two [top, left, bottom, right] boxes.

    The result is not validated: disjoint inputs give a box whose
    bottom/right lie before its top/left.
    """
    top = max(box0[0], box1[0])
    left = max(box0[1], box1[1])
    bottom = min(box0[2], box1[2])
    right = min(box0[3], box1[3])
    return [top, left, bottom, right]
def timefunc(f):
    """Decorator that logs (at debug level) how long each call to ``f`` takes.

    The wrapped function keeps its name and docstring, and its return value
    is passed through unchanged.
    """
    import functools

    @functools.wraps(f)
    def f_timer(*args, **kwargs):
        start = time.perf_counter()
        result = f(*args, **kwargs)
        elapsed = time.perf_counter() - start
        # logging expects a format string plus arguments, not print-style
        # positional values.
        logger.debug('%s took %s second', f.__name__, elapsed)
        return result
    return f_timer
def test_one_hot():
    """Print the 5-class one-hot encoding of a small 2x2 label map."""
    sample = np.array([[1, 2], [3, 4]])
    print(one_hot(sample, 5))
def test_resize2range():
    """Print the shape resize2range gives a 100x200 array for (200, 50)."""
    resized = resize2range(np.ones([100, 200]), 200, 50)
    print(resized.shape)
def test_prob2image():
    """Round-trip a random probability volume through dump/load helpers."""
    original = np.random.random_sample((3, 10, 10))
    dump_prob2image('test', original)
    reloaded = load_image2prob('test')
    np.testing.assert_allclose(original, reloaded, rtol=0.5, atol=1e-02)
def shape_match(images):
    """Return True when all images share the height/width of the first one.

    At least two images are required.
    """
    assert len(images) > 1
    reference = np.array(images[0].shape[:2])
    return not any(
        np.sum(np.abs(reference - np.array(other.shape[:2])))
        for other in images[1:]
    )
def append_apex(filename, appex):
    """Insert ``_<appex>`` before the last extension of ``filename``.

    Everything after the final dot counts as the extension.
    """
    prefix, _, filetype = filename.rpartition('.')
    return f'{prefix!s}_{appex!s}.{filetype!s}'
def get_obj_center(mask, th=0):
    """Integer (x, y) centroid of the pixels of ``mask`` above ``th``.

    Returns (-1, -1) when no pixel exceeds the threshold.
    """
    rows, cols = np.where(mask > th)
    if len(rows) == 0:
        return -1 , -1
    center_x = np.mean(cols)
    center_y = np.mean(rows)
    return int(center_x), int(center_y)
def poly_area(poly):
    """Area of a polygon given as an [n x 2] np.array of [x, y] vertices."""
    xs, ys = poly[:, 0], poly[:, 1]
    return PolyArea(xs, ys)
def PolyArea(x, y):
    """Polygon area from vertex coordinate vectors (shoelace formula)."""
    forward = np.dot(x, np.roll(y, 1))
    backward = np.dot(y, np.roll(x, 1))
    return 0.5 * np.abs(forward - backward)
def rect_size(rect):
    """Length of the diagonal between vertices 0 and 2 of ``rect``."""
    diagonal = rect[0, :] - rect[2, :]
    return np.linalg.norm(diagonal)
def avg_size(rects, option='median'):
    """Aggregate ``rect_size`` over ``rects``.

    ``option`` selects 'median' or 'mean'; any other value returns None.
    """
    sizes = np.array([rect_size(rect) for rect in rects], dtype=np.float64)
    if option == 'median':
        return np.median(sizes)
    if option == 'mean':
        return np.mean(sizes)
    return None
def poly_ratio(rect, type='min'):
    """Aspect ratio of a quadrilateral given as a 4 x 2 vertex array.

    With ``type='avg'`` the ratio is (edge0 + edge2) / (edge1 + edge3).
    Otherwise the largest long/short ratio over adjacent edge pairs is
    returned.
    """
    # edges[i] joins vertex i with vertex (i + 1) % 4.
    edges = [np.linalg.norm(rect[i, :] - rect[(i + 1) % 4, :]) for i in range(4)]
    if type == 'avg':
        return (edges[0] + edges[2]) / (edges[1] + edges[3])
    best = 0
    for i in range(4):
        first, second = edges[i], edges[(i + 1) % 4]
        candidate = max(first / (second + 1e-10), second / (first + 1e-10))
        if candidate > best:
            best = candidate
    return best
def rect_ratio(rect):
    """Long-side / short-side ratio of the axis-aligned extent of ``rect``.

    :param rect: n x 2 array of [x, y] vertices
    :return: max(x_extent / y_extent, y_extent / x_extent)
    """
    xs, ys = rect[:, 0], rect[:, 1]
    x_extent = np.max(xs) - np.min(xs)
    y_extent = np.max(ys) - np.min(ys)
    return max(x_extent / y_extent, y_extent / x_extent)
def rect_in_size(rect, image_sz, num_th=4):
    """Check that at least ``num_th`` vertices of ``rect`` lie in the image.

    ``rect`` holds [x, y] vertices and ``image_sz`` is (height, width).
    """
    h, w = image_sz
    inside = [bool(0 <= pt[0] < w and 0 <= pt[1] < h) for pt in rect]
    return np.sum(inside) >= num_th
def valid_rect(rect):
    """A [l, r, t, b] rectangle is valid when it has positive width and height."""
    left, right, top, bottom = rect
    has_width = left < right
    if not has_width:
        return has_width
    return top < bottom
def compute_normal_deg_absvar(normal, mask):
    """Mean angular deviation (degrees) of masked normals from their mean.

    The angle uses the absolute dot product with the (unnormalised) mean
    normal, clipped to [0, 1].
    """
    weight = np.sum(mask)
    masked = normal * mask[:, :, None]
    mean_normal = np.sum(masked, axis=(0, 1)) / weight
    cosine = np.sum(mean_normal[None, None, :] * masked, axis=2)
    cosine = np.clip(np.abs(cosine), 0, 1)
    angle = np.rad2deg(np.arccos(cosine))
    return np.sum(angle * mask) / weight
def compute_ignore_mask(x, ignore_value=None):
    """Mark pixels whose every channel equals its ignore value.

    Returns the scalar 1 when ``ignore_value`` is None. A non-list
    ``ignore_value`` is applied to all channels.
    """
    if ignore_value is None:
        return 1
    planes = x[:, :, None] if x.ndim == 2 else x
    if isinstance(ignore_value, list):
        values = ignore_value
    else:
        values = [ignore_value] * planes.shape[-1]
    mask = 1
    for channel, value in enumerate(values):
        mask = mask * (planes[:, :, channel] == value)
    return mask
def weight_reduce(res, weights):
    """Per-pixel weighted average of a list of arrays.

    Args:
        res: list of arrays, all [h, w] or all [h, w, c].
        weights: list of [h, w] weight maps, one per entry of ``res``.

    Returns:
        Array shaped like one entry of ``res``. Pixels whose weights sum to
        less than 1e-6 are divided by 1e-6 instead.
    """
    dim = res[0].ndim
    result = 0
    weight_all = 0
    for i, x in enumerate(res):
        if dim == 2:
            x = x[:, :, None]
        weight = weights[i]
        result = result + (x * weight[:, :, None])
        weight_all = weight_all + weight
    # Normalise while the channel axis is still present; squeezing first
    # broke broadcasting for 2-D inputs.
    result = result / np.maximum(weight_all[:, :, None], 1e-6)
    if dim == 2:
        result = result.squeeze(-1)
    return result
def mask_assign(x, mask, target):
    """Copy ``target`` into ``x`` wherever ``mask`` selects, channel by channel.

    Args:
        x: [h, w] or [h, w, c] array; it is modified in place.
        mask: [h, w] index (typically boolean) applied to each channel.
        target: array providing the new values; [h, w] or [h, w, c].

    Returns:
        ``x`` (a view of the same data when ``x`` is 2-D).
    """
    dim = x.ndim
    if dim == 2:
        x = x[:, :, None]
    # A 2-D target used to fail on the three-index access below.
    if target.ndim == 2:
        target = target[:, :, None]
    for i in range(x.shape[-1]):
        # x[:, :, i] is a view, so this writes straight into x.
        x[:, :, i][mask] = target[:, :, i][mask]
    if dim == 2:
        x = x.squeeze(-1)
    return x
def overlap_poly(poly0, poly1, mask=None):
    """Rasterise two polygons and measure their overlap.

    Args:
        poly0, poly1: [n x 2] arrays of [x, y] vertices.
        mask: optional array whose first two dimensions set the canvas
            size; by default the canvas just covers both polygons.

    Returns:
        ``(inter_area, inter_area / area0, inter_area / area1)`` with areas
        counted in rasterised pixels.
    """
    if mask is None:
        h = max(np.max(poly0[:, 1]), np.max(poly1[:, 1]))
        w = max(np.max(poly0[:, 0]), np.max(poly1[:, 0]))
        # np.zeros needs integer sizes even for float vertices.
        sz = [int(h) + 1, int(w) + 1]
    else:
        sz = mask.shape[:2]
    vis_map0 = np.zeros(sz)
    # np.int0 no longer exists in NumPy 2; fillPoly wants int32 points.
    cv2.fillPoly(vis_map0,
                 pts=[np.int32(poly0)],
                 color=(1,))
    vis_map1 = np.zeros(sz)
    cv2.fillPoly(vis_map1,
                 pts=[np.int32(poly1)],
                 color=(1,))
    # A stray trailing comma used to turn this into a 1-tuple, which made
    # the divisions below raise TypeError.
    inter_area = np.sum(vis_map0 * vis_map1)
    return inter_area, inter_area / np.sum(vis_map0), inter_area / np.sum(vis_map1)
def overlap_rect_mask(rect, mask):
    """Fraction of the rasterised rectangle that is covered by ``mask``.

    Args:
        rect: polygon vertices ([x, y] rows) of the rectangle.
        mask: array whose positive entries are foreground.

    Returns:
        overlap pixels / rectangle pixels.
    """
    vis_map = np.zeros(mask.shape)
    # np.int0 no longer exists in NumPy 2; fillPoly wants int32 points.
    cv2.fillPoly(vis_map,
                 pts=[np.int32(rect)],
                 color=(1,))
    overlap = np.sum(np.int32(mask > 0) *
                     np.int32(vis_map > 0))
    ratio = overlap / np.sum(vis_map > 0)
    return ratio
def pt_in_poly(pt, poly):
    """Test whether point ``pt`` lies inside ``poly`` (a list of points).

    Uses shapely's ``Polygon.contains``.
    """
    from shapely.geometry import Point
    from shapely.geometry.polygon import Polygon
    return Polygon(poly).contains(Point(pt[0], pt[1]))
def pt_in_poly_w_mask(pt, poly, sz, margin=None):
    """Test point-in-polygon by rasterising the polygon.

    Args:
        pt: [x, y] point.
        poly: [n x 2] array of [x, y] vertices.
        sz: canvas shape for the rasterised mask.
        margin: ratio of area for expand; when set, the mask is dilated by
            ``margin * sqrt(polygon area)`` pixels.

    Returns:
        True when the (truncated) point falls on the mask.
    """
    # np.int0 no longer exists in NumPy 2; use explicit integer conversions.
    mask = np.zeros(tuple(int(v) for v in sz))
    cv2.fillPoly(mask,
                 pts=[np.int32(poly)],
                 color=(255,))
    if margin is not None:
        rectArea = PolyArea(poly[:, 0], poly[:, 1])
        pixel = int(margin * np.sqrt(rectArea))
        mask = dilate_mask(mask, pixel)
    x, y = int(pt[0]), int(pt[1])
    return mask[y, x] > 0
def is_overlap(r_cur, r_over, ths=None):
    """Whether two [l, r, t, b] rectangles overlap.

    ``ths`` is an optional (width, height) tolerance: rectangles closer
    than the tolerance also count as overlapping.
    """
    w_th, h_th = [0, 0] if ths is None else ths
    l, r, t, b = r_cur
    l0, r0, t0, b0 = r_over
    apart_horizontally = l >= (r0 + w_th) or r <= (l0 - w_th)
    apart_vertically = b <= (t0 - h_th) or t >= (b0 + h_th)
    return not (apart_horizontally or apart_vertically)
def rect_from_poly(poly):
    """Axis-aligned bounds of an [n x 2] polygon as (min_x, max_x, min_y, max_y)."""
    xs, ys = poly[:, 0], poly[:, 1]
    return np.min(xs), np.max(xs), np.min(ys), np.max(ys)
def rotate_image_if_needed(image):
    """Apply the EXIF orientation (3, 6 or 8) of a PIL image, if present.

    Images without ``_getexif``, without EXIF data, or without an
    orientation entry are returned unchanged.
    """
    from PIL import Image, ExifTags
    if not hasattr(image, '_getexif'): # only present in JPEGs
        return image
    for tag_id in ExifTags.TAGS.keys():
        if ExifTags.TAGS[tag_id] == 'Orientation':
            break
    raw_exif = image._getexif() # returns None if no EXIF data
    if raw_exif is None:
        return image
    exif = dict(raw_exif.items())
    if tag_id not in exif:
        return image
    rotations = {
        3: Image.ROTATE_180,
        6: Image.ROTATE_270,
        8: Image.ROTATE_90,
    }
    orientation = exif[tag_id]
    if orientation in rotations:
        image = image.transpose(rotations[orientation])
    return image
def is_night_scene(image, prob_map, sky_prob_threshold=200, brightness_threshold=100):
    """
    Return True if it's a night scene image
    image: original image
    prob_map: the probability map of image segmentation (red: sky; green: building; blue: background, value from 0 to 255)
    sky_prob_threshold: pixel val > sky_prob_threshold will be segmented as sky
    brightness_threshold: val < brightness_threshold will be considered as night scene
    Returns False when no sky pixel has non-zero brightness.
    """
    # NOTE(review): the rotated image returned here is discarded, so the
    # EXIF rotation is not actually applied -- confirm whether intended.
    rotate_image_if_needed(image)
    gray = np.array(image.convert('L'))
    sky, _building, _background = prob_map.split()
    # Brightness of the sky pixels; everything else is zeroed out.
    sky_brightness = (np.array(sky) > sky_prob_threshold) * gray
    if np.count_nonzero(sky_brightness) == 0:
        return False
    lit_pixels = sky_brightness[np.nonzero(sky_brightness)]
    return lit_pixels.mean() < brightness_threshold
def detect_lines(img,
                 fg_mask=None,
                 length_thresh=None):
    """Detect line segments using the OpenCV LSD detector.

    Args:
        img: grayscale or BGR image.
        fg_mask: optional mask handed to ``cv2.distanceTransform``; lines
            whose end points average less than 8 in the resulting distance
            map are dropped.
        length_thresh: minimum segment length; defaults to 4% of the
            longer image side.

    Returns:
        n x 4 float array of (x1, y1, x2, y2) line start / line end.
    """
    # Convert to grayscale if required
    if len(img.shape) == 3:
        img_copy = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    else:
        img_copy = img
    h, w = img.shape[:2]
    if length_thresh is None:
        length_thresh = int(max(h, w) * 0.04)

    # Create LSD detector with default parameters
    lsd = cv2.createLineSegmentDetector(0)
    # detect() yields an N x 1 x 4 float32 array of (x1, y1, x2, y2).
    lines = lsd.detect(img_copy)[0]
    if lines is None:
        # Nothing detected: return an empty result instead of crashing.
        return np.zeros((0, 4), dtype=np.float32)
    # Remove singleton dimension
    lines = lines[:, 0]

    # Filter out the lines whose length is lower than the threshold
    dx = lines[:, 2] - lines[:, 0]
    dy = lines[:, 3] - lines[:, 1]
    lengths = np.sqrt(dx * dx + dy * dy)
    lines = lines[lengths >= length_thresh]

    # todo remove lines at boundary
    # "if fg_mask:" raises ValueError for arrays, so compare against None.
    if fg_mask is not None:
        dist_map = cv2.distanceTransform(
            fg_mask, distanceType=cv2.DIST_C, maskSize=5).astype(np.float32)
        keep = np.ones((len(lines),), dtype=bool)
        for ind, l in enumerate(lines):
            # np.int0 no longer exists in NumPy 2.
            x1, y1, x2, y2 = np.int32(l)
            dist = (dist_map[y1, x1] + dist_map[y2, x2]) * 0.5
            if dist < 8:
                keep[ind] = False
        lines = lines[keep]
    return lines
def get_a_key(dict_data: Dict[str, Any]):
    """
    Return the first key yielded when iterating a dictionary.
    Args:
        dict_data (Dict[str, Any]): dict with string keys.
    Returns:
        Optional[str]: the key if the dict is non-empty, else None.
    """
    return next(iter(dict_data)) if dict_data else None
def shift_to_center(image, mask, shape=None):
    """Move the masked object so its bounding box is centred in the canvas.

    Args:
        image: [h, w, c] image.
        mask: array whose non-zero entries mark the object; its height
            must equal ``shape[0]``.
        shape: (height, width) used to locate the centre; defaults to the
            image size.

    Returns:
        Float array shaped like ``image``: zeros everywhere except the
        object's bounding box, pasted around the centre.

    Raises:
        ValueError: if ``mask`` has no non-zero entry.
    """
    if shape is None:
        shape = image.shape[:2]
    assert mask.shape[0] == shape[0]
    cy, cx = shape[0] // 2, shape[1] // 2
    positions = np.nonzero(mask)
    if len(positions[0]) == 0:
        raise ValueError('mask is empty, nothing to center')
    # max() is inclusive while slice ends are exclusive; without the +1 the
    # object's last row and column were dropped.
    top, bottom = positions[0].min(), positions[0].max() + 1
    left, right = positions[1].min(), positions[1].max() + 1
    height, width = bottom - top, right - left
    new_top = cy - height // 2
    new_l = cx - width // 2
    new_im = np.zeros(image.shape)
    new_im[new_top:new_top + height, new_l:new_l + width, :] = \
        image[top:bottom, left:right, :]
    return new_im
def ndarray_to_list(in_dict: dict):
    """Recursively replace ndarray values of a dict by plain lists.

    The dict (and any nested dict) is modified in place and returned.
    """
    for key in in_dict:
        value = in_dict[key]
        if isinstance(value, np.ndarray):
            in_dict[key] = value.tolist()
        elif isinstance(value, dict):
            in_dict[key] = ndarray_to_list(value)
    return in_dict
| """ | |
| encode image to string and decode it back | |
| """ | |
def encode_b64(mat, format='.png'):
    """Encode an image array as a base64 string.

    Args:
        mat: image array accepted by ``cv2.imencode``.
        format: file extension selecting the image codec.

    Returns:
        UTF-8 string holding the base64 of the encoded image bytes.
    """
    # base64 is not among the imports visible at the top of the file, so
    # import it where it is used.
    import base64
    encoded = cv2.imencode(format, mat)[1]
    return base64.b64encode(encoded).decode('utf-8')
def decode64(string):
    """Decode a base64 string of encoded image bytes back into an array.

    Args:
        string: base64 text such as ``encode_b64`` produces.

    Returns:
        The image decoded with ``cv2.IMREAD_UNCHANGED``.
    """
    # base64 is not among the imports visible at the top of the file, so
    # import it where it is used.
    import base64
    raw_bytes = base64.b64decode(string)
    buffer = np.frombuffer(raw_bytes, dtype=np.uint8)
    return cv2.imdecode(buffer, cv2.IMREAD_UNCHANGED)
def remap_texture(triangle1, triangle2, texture):
    """Warp ``texture`` from ``triangle1`` onto ``triangle2``.

    Both triangles are three (x, y) points. Returns
    ``(remapped_texture, mask)``, both sized to the bounding rectangle of
    ``triangle2``; the mask is 1 inside the destination triangle.
    """
    src_tri = np.array(triangle1, dtype=np.float32)
    dst_tri = np.array(triangle2, dtype=np.float32)

    # Bounding rectangles are (x, y, w, h).
    src_rect = cv2.boundingRect(src_tri)
    dst_rect = cv2.boundingRect(dst_tri)
    dst_w, dst_h = dst_rect[2], dst_rect[3]

    # Express each triangle relative to its rectangle's top-left corner.
    src_local = np.float32(src_tri - src_rect[:2])
    dst_local = np.float32(dst_tri - dst_rect[:2])

    affine = cv2.getAffineTransform(src_local, dst_local)
    warped = cv2.warpAffine(texture, affine, (dst_w, dst_h))

    # Keep only the pixels inside the destination triangle.
    mask = np.zeros((dst_h, dst_w, 3), dtype=np.uint8)
    cv2.fillConvexPoly(mask, np.int32(dst_local), (1.0, 1.0, 1.0), 16, 0)
    return warped * mask, mask
def fuse_rgb_mask(image, mask):
    """Attach a mask to an image as its fourth channel.

    Args:
        image: h, w, [3,4] rgb or rgba image, or a path to one. An existing
            alpha channel is dropped, and the image is resized to the mask
            when their sizes differ.
        mask: h, w or h, w, [1,3] mask, or a path to one. A mask whose
            maximum is 1 is rescaled to 0/255.

    Returns:
        h, w, 4 array: the image channels followed by the mask.
    """
    if isinstance(image, str):
        image = cv2.imread(image, cv2.IMREAD_UNCHANGED)
    if isinstance(mask, str):
        mask = cv2.imread(mask, cv2.IMREAD_UNCHANGED)
    if not shape_match([image, mask]):
        image = cv2.resize(image, (mask.shape[1], mask.shape[0]))
    if image.shape[-1] == 4:
        image = image[:, :, :3]
    # Reduce any multi-channel mask (including h, w, 1) to its first
    # channel; the old width-based test left h, w, 1 masks 4-D below.
    if mask.ndim == 3:
        mask = mask[:, :, 0]
    mask = mask[:, :, None]
    if mask.max() == 1:
        mask = mask * 255
    return np.concatenate([image, mask], axis=2)
def test_remap_texture():
    """Smoke-test remap_texture on a 2x triangle enlargement."""
    # Define test input values
    triangle1 = [(0, 0), (50, 0), (0, 50)]
    triangle2 = [(0, 0), (100, 0), (0, 100)]
    texture = np.ones((50, 50, 3), dtype=np.uint8) * 255
    # remap_texture returns (texture, mask); the old test treated the
    # tuple itself as the texture.
    remapped_texture, mask = remap_texture(triangle1, triangle2, texture)
    # The output is sized by the destination bounding rectangle, which is
    # also the mask size. NOTE(review): the previous hard-coded
    # (100, 100, 3) looks one pixel short of an inclusive bounding box --
    # confirm.
    assert remapped_texture.shape == mask.shape, "Remapped texture shape is incorrect"
    assert remapped_texture.shape[2] == 3, "Remapped texture shape is incorrect"
    assert np.all(remapped_texture[:50, :50] == texture), "Texture not correctly remapped in the destination triangle"
    # Print a success message if the test passes
    print("Test passed: remap_texture function works as expected")
def test_line_seg_cross():
    """Print line_segment_cross for the unit diagonal against two segments."""
    others = ([[1, 0], [0, 1]], [[1, 0], [1.5, 2]])
    for other in others:
        # A fresh diagonal is built for every call, as in the original.
        diagonal = np.array([[0, 0], [1, 1]])
        print(line_segment_cross(diagonal, np.array(other)))
if __name__ == '__main__':
    # Ad-hoc check: look for a "docker" entry in every directory of the
    # current process's PATH. The previous version searched a hard-coded
    # PATH dump from one specific machine.
    paths = os.environ.get("PATH", "").split(os.pathsep)
    check_file_in_paths(paths, "docker")