- 新增 analyze、analyze_result、analyze_status 和 health 路由 - 实现图像上传和任务提交功能 - 添加任务状态查询和结果获取接口 - 集成 segformer 和 yolo 模型进行图像检测 - 实现 SAM3 预处理功能用于图像预处理判断 - 添加模型选择配置支持 segformer 和 yolo - 实现任务队列管理和异步处理机制 - 添加 Dockerfile 用于容器化部署 - 配置环境变量和 gitignore 规则 - 创建数据模型定义 API 响应结构
292 lines
9.7 KiB
Python
292 lines
9.7 KiB
Python
import os
|
||
|
||
import cv2
|
||
import numpy as np
|
||
import onnxruntime as ort
|
||
|
||
# Class index -> human-readable defect name, in model output order.
CLASS_NAMES = dict(enumerate(("Hollowing", "Water seepage", "Cracking")))

# Class index -> colour used when painting that class into the colour mask.
# Tuples are in OpenCV's BGR channel order.
CLASS_COLORS = {
    0: (0, 0, 255),  # Hollowing -> red
    1: (0, 255, 0),  # Water seepage -> green
    2: (255, 0, 0),  # Cracking -> blue
}

# Default side length (pixels) of the square network input.
IMG_SIZE = 640
|
||
|
||
|
||
class YOLOSeg:
    """YOLO instance-segmentation model executed through ONNX Runtime.

    The public entry point is :meth:`detect`, which takes a BGR image (or a
    path to one) and returns a colour-coded mask image plus one polygon per
    detected instance.
    """

    def __init__(self, onnx_path: str = "model.onnx", imgsz: int = IMG_SIZE):
        """Create the inference session.

        Args:
            onnx_path: Model file, joined onto the directory of this module
                (an absolute path is therefore used as-is by ``os.path.join``).
            imgsz: Side length of the square letterboxed network input.
        """
        real_path = os.path.join(os.path.dirname(__file__), onnx_path)

        # Prefer CUDA when onnxruntime reports a GPU build, keep CPU as fallback.
        if ort.get_device() == "GPU":
            providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
        else:
            providers = ["CPUExecutionProvider"]
        self.session = ort.InferenceSession(real_path, providers=providers)

        # Feed the model the float width its first input declares.
        input_type = self.session.get_inputs()[0].type
        self.ndtype = np.half if input_type == "tensor(float16)" else np.single

        self.imgsz = imgsz
        self.classes = CLASS_NAMES

    # ---------- preprocessing: letterbox ----------
    def _preprocess(self, img_bgr):
        """Letterbox ``img_bgr`` to ``imgsz x imgsz`` and convert it to a tensor.

        The image is resized with its aspect ratio preserved, padded with
        grey (114) on all sides, transposed HWC -> CHW, channel-reversed
        (BGR -> RGB), scaled by 1/255 and given a leading batch axis.

        Returns:
            ``(tensor, ratio, (pad_w, pad_h))`` where ``ratio`` is the
            ``(r, r)`` resize factor and the pads are the (possibly
            fractional) per-side padding in letterboxed pixels.
        """
        h0, w0 = img_bgr.shape[:2]
        new_shape = (self.imgsz, self.imgsz)

        r = min(new_shape[0] / h0, new_shape[1] / w0)
        ratio = (r, r)
        new_unpad = (int(round(w0 * r)), int(round(h0 * r)))  # (width, height)
        pad_w = (new_shape[1] - new_unpad[0]) / 2
        pad_h = (new_shape[0] - new_unpad[1]) / 2

        if (w0, h0) != new_unpad:
            img = cv2.resize(img_bgr, new_unpad, interpolation=cv2.INTER_LINEAR)
        else:
            img = img_bgr.copy()

        # The -0.1 / +0.1 nudges split an odd total pad as floor / ceil so the
        # two sides always add up to the full padding.
        top, bottom = int(round(pad_h - 0.1)), int(round(pad_h + 0.1))
        left, right = int(round(pad_w - 0.1)), int(round(pad_w + 0.1))
        img = cv2.copyMakeBorder(
            img, top, bottom, left, right,
            borderType=cv2.BORDER_CONSTANT, value=(114, 114, 114),
        )

        # HWC -> CHW, reverse the channel axis (BGR -> RGB), then scale to [0, 1].
        img = np.ascontiguousarray(
            np.einsum("HWC->CHW", img)[::-1], dtype=self.ndtype
        )
        img = img / 255.0
        if img.ndim == 3:
            img = img[None]  # add batch axis -> (1, 3, H, W)
        return img, ratio, (pad_w, pad_h)

    # ---------- mask -> polygon ----------
    @staticmethod
    def _masks2segments(masks):
        """Convert ``(N, H, W)`` masks into one polygon per instance.

        For each mask the external contour with the most points is kept.
        A mask without any contour yields an empty ``(0, 2)`` array.

        Returns:
            List of ``float32`` arrays of shape ``(K, 2)`` holding x, y points.
        """
        segments = []
        for mask in masks.astype("uint8"):
            # [-2] selects the contour list on both OpenCV 3.x
            # (image, contours, hierarchy) and 4.x (contours, hierarchy).
            contours = cv2.findContours(
                mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE
            )[-2]
            if len(contours) > 0:
                longest = max(contours, key=len)
                polygon = np.array(longest).reshape(-1, 2)
            else:
                polygon = np.zeros((0, 2))
            segments.append(polygon.astype("float32"))
        return segments

    @staticmethod
    def _crop_mask(masks, boxes):
        """Zero every mask value that lies outside its instance's box.

        Args:
            masks: ``(N, H, W)`` array.
            boxes: ``(N, 4)`` array of ``x1, y1, x2, y2`` in mask pixels.
        """
        _, h, w = masks.shape
        x1, y1, x2, y2 = np.split(boxes[:, :, None], 4, 1)  # each (N, 1, 1)
        cols = np.arange(w, dtype=x1.dtype)[None, None, :]
        rows = np.arange(h, dtype=x1.dtype)[None, :, None]
        inside = (cols >= x1) * (cols < x2) * (rows >= y1) * (rows < y2)
        return masks * inside

    @staticmethod
    def _scale_mask(masks, im0_shape, ratio_pad=None):
        """Strip the letterbox padding from ``masks`` and resize to the image.

        Args:
            masks: ``(H, W)`` or ``(H, W, N)`` array at mask resolution.
            im0_shape: Shape of the original image; only ``[0]`` (height)
                and ``[1]`` (width) are read.
            ratio_pad: Optional pair whose second item is ``(pad_w, pad_h)``;
                when omitted the padding is derived from the two shapes.

        Returns:
            Array of shape ``(im0_h, im0_w, N)``; a trailing axis is added
            when the resize result is 2-D.

        Raises:
            ValueError: If ``masks`` has fewer than two dimensions.
        """
        # Validate before touching shape[:2]; a 1-D input would otherwise fail
        # with an unrelated IndexError further down.
        if masks.ndim < 2:
            raise ValueError("masks ndim 应该是 2 或 3")

        im1_shape = masks.shape[:2]
        if ratio_pad is None:
            gain = min(im1_shape[0] / im0_shape[0], im1_shape[1] / im0_shape[1])
            pad = (
                (im1_shape[1] - im0_shape[1] * gain) / 2,
                (im1_shape[0] - im0_shape[0] * gain) / 2,
            )
        else:
            pad = ratio_pad[1]

        # pad is (pad_w, pad_h); crop the padded border away symmetrically.
        top, left = int(round(pad[1] - 0.1)), int(round(pad[0] - 0.1))
        bottom = int(round(im1_shape[0] - pad[1] + 0.1))
        right = int(round(im1_shape[1] - pad[0] + 0.1))

        masks = masks[top:bottom, left:right]
        masks = cv2.resize(
            masks, (im0_shape[1], im0_shape[0]), interpolation=cv2.INTER_LINEAR
        )
        if masks.ndim == 2:
            masks = masks[:, :, None]
        return masks

    def _process_mask(self, protos, masks_in, bboxes, im0_shape):
        """Decode per-instance masks from prototypes and coefficients.

        Args:
            protos: ``(C, Hm, Wm)`` prototype masks.
            masks_in: ``(N, C)`` mask coefficients, one row per instance.
            bboxes: ``(N, 4)`` ``x1, y1, x2, y2`` boxes in original-image pixels.
            im0_shape: Shape of the original image.

        Returns:
            ``(N, H, W)`` boolean masks at original-image resolution.
        """
        c, mh, mw = protos.shape
        masks = np.matmul(masks_in, protos.reshape(c, -1))
        masks = masks.reshape(-1, mh, mw).transpose(1, 2, 0)  # (Hm, Wm, N)
        masks = np.ascontiguousarray(masks)
        masks = self._scale_mask(masks, im0_shape)  # (H, W, N)
        masks = np.einsum("HWN->NHW", masks)  # (N, H, W)
        masks = self._crop_mask(masks, bboxes)
        return masks > 0.5

    @staticmethod
    def _get_cid(name):
        """Return the class index whose name equals ``name``, else ``None``."""
        return next((k for k, v in CLASS_NAMES.items() if v == name), None)

    @staticmethod
    def _make_color_mask(img_bgr, masks, coords):
        """Paint every instance mask onto a black canvas in its class colour.

        Nothing is blended with the source image; ``img_bgr`` only supplies
        the canvas size. Where masks overlap, later instances overwrite
        earlier ones.

        Args:
            img_bgr: Source image (only its height and width are used).
            masks: ``(N, H, W)`` boolean masks.
            coords: Per-instance records whose first item is the class name.

        Returns:
            ``(H, W, 3)`` ``uint8`` BGR image.
        """
        h, w = img_bgr.shape[:2]
        color_mask = np.zeros((h, w, 3), dtype=np.uint8)
        for mask, inst in zip(masks, coords):
            cid = YOLOSeg._get_cid(inst[0])
            # Classes without a configured colour fall back to (0, 255, 255).
            color_mask[mask] = CLASS_COLORS.get(cid, (0, 255, 255))
        return color_mask

    @staticmethod
    def _empty_result(height, width):
        """Return the ``(color_mask, coords)`` pair for "nothing detected"."""
        return np.zeros((height, width, 3), dtype=np.uint8), []

    # ---------- inference entry point ----------
    def detect(self, img_input, conf_thres: float = 0.1, iou_thres: float = 0.1):
        """Run segmentation on one image.

        Args:
            img_input: A BGR image array, or a path (``str`` / ``os.PathLike``)
                to an image file.
            conf_thres: Minimum class score for a candidate to be kept; also
                passed to NMS as its score threshold.
            iou_thres: IoU threshold passed to NMS.

        Returns:
            ``(color_mask, coords)``:

            * ``color_mask`` - ``(H, W, 3)`` ``uint8`` BGR image, black
              background with each detected region in its class colour.
            * ``coords`` - list of ``(class_name, points)`` tuples, where
              ``points`` is the instance polygon as a list of ``[x, y]``.

            When nothing is detected an all-black ``color_mask`` and an empty
            list are returned, so the result always has the same shape.

        Raises:
            ValueError: If the image could not be read, or if the model
                output layout leaves no room for class scores.
        """
        if isinstance(img_input, (str, os.PathLike)):
            # The file is read as raw bytes and decoded in memory rather than
            # through cv2.imread (presumably to cope with non-ASCII paths).
            raw = np.fromfile(os.fspath(img_input), dtype=np.uint8)
            img_bgr = cv2.imdecode(raw, cv2.IMREAD_COLOR)
        else:
            img_bgr = img_input

        if img_bgr is None:
            raise ValueError("img_bgr is None, 请检查图片读取是否成功")

        h0, w0 = img_bgr.shape[:2]
        # _preprocess never modifies its argument, so no defensive copy is needed.
        im, ratio, (pad_w, pad_h) = self._preprocess(img_bgr)

        # ONNX inference. Expected outputs: predictions (1, C, N) and
        # prototype masks (1, nm, Hm, Wm).
        input_name = self.session.get_inputs()[0].name
        preds = self.session.run(None, {input_name: im})
        x, protos = preds[0], preds[1]

        # (1, C, N) -> (N, C)
        x = np.einsum("bcn->bnc", x)[0]

        # Row layout is [cx, cy, w, h, class scores..., mask coefficients...];
        # the coefficient count is taken from the prototype tensor.
        nm = int(protos.shape[1])
        nc = x.shape[1] - 4 - nm
        if nc <= 0:
            raise ValueError(
                f"unexpected model output layout: {x.shape[1]} channels "
                f"leave no class scores after 4 box values and {nm} mask "
                "coefficients"
            )

        cls_scores = x[:, 4:4 + nc]
        cls_max = np.max(cls_scores, axis=-1)
        keep = cls_max > conf_thres
        x = x[keep]
        if x.size == 0:
            return self._empty_result(h0, w0)

        cls_scores = cls_scores[keep]
        conf = cls_max[keep]
        cls_id = np.argmax(cls_scores, axis=-1)
        # Repack as [cx, cy, w, h, conf, cls_id, mask coefficients...].
        x = np.c_[x[:, :4], conf, cls_id, x[:, -nm:]]

        # cv2.dnn.NMSBoxes expects top-left [x, y, w, h] boxes, so shift the
        # centre coordinates on a copy and leave x itself untouched.
        boxes_tlwh = x[:, :4].copy()
        boxes_tlwh[:, 0] = boxes_tlwh[:, 0] - boxes_tlwh[:, 2] / 2
        boxes_tlwh[:, 1] = boxes_tlwh[:, 1] - boxes_tlwh[:, 3] / 2
        indices = cv2.dnn.NMSBoxes(
            boxes_tlwh.tolist(), x[:, 4].tolist(), conf_thres, iou_thres
        )

        # Depending on the OpenCV version the result may be [], a flat list,
        # a nested [[i], ...] list or an ndarray.
        if indices is None or len(indices) == 0:
            return self._empty_result(h0, w0)
        indices = np.array(indices).reshape(-1)
        x = x[indices]

        # cxcywh -> xyxy
        x[:, 0:2] -= x[:, 2:4] / 2
        x[:, 2:4] += x[:, 0:2]

        # Undo the letterbox: remove padding, then rescale to the original image.
        x[:, [0, 2]] -= pad_w
        x[:, [1, 3]] -= pad_h
        x[:, :4] /= min(ratio)

        # Clamp the boxes to the image bounds.
        x[:, [0, 2]] = x[:, [0, 2]].clip(0, w0)
        x[:, [1, 3]] = x[:, [1, 3]].clip(0, h0)

        # Decode masks, then trace each one into a polygon.
        masks = self._process_mask(protos[0], x[:, 6:], x[:, :4], img_bgr.shape)
        segments = self._masks2segments(masks)

        coords = []
        for cls_i, seg in zip(x[:, 5], segments):
            cid = int(cls_i)
            # Unknown class ids are reported by their number as a string.
            coords.append((self.classes.get(cid, str(cid)), seg.tolist()))

        color_mask = self._make_color_mask(img_bgr, masks, coords)
        return color_mask, coords
|
||
|
||
|
||
if __name__ == "__main__":
    # Manual smoke test: run the model on one image and print the results.
    # The image path may be given as the first command-line argument; the
    # original developer path is kept as the default.
    import sys

    default_img_path = r"D:\Projects\Python\wall\app\core\yolo\test.jpg"
    img_path = sys.argv[1] if len(sys.argv) > 1 else default_img_path

    # ====== load the model ======
    model = YOLOSeg()

    img = cv2.imdecode(np.fromfile(str(img_path), dtype=np.uint8), cv2.IMREAD_COLOR)
    if img is None:
        raise SystemExit(f"Could not decode image: {img_path}")

    color_mask, coords = model.detect(img)
    print(color_mask.shape)
    print(coords)
|