
balloon

import os
import sys
import json
import datetime
import numpy as np
import skimage.io
import skimage.draw
import skimage.color
import tensorflow as tf
import matplotlib.pyplot as plt
# Limit GPU memory growth
physical_devices = tf.config.experimental.list_physical_devices('GPU')
if len(physical_devices) > 0:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)

# Root directory of the project
ROOT_DIR = os.path.abspath("../")
print(ROOT_DIR)
# Import Mask RCNN
sys.path.append(ROOT_DIR)  # To find the local version of the library
from mrcnn.config import Config
from mrcnn import model as modellib, utils
from pathlib import Path
# Path to the trained weights file
COCO_WEIGHTS_PATH = os.path.join(ROOT_DIR, "mask_rcnn_coco.h5")
print(COCO_WEIGHTS_PATH)

# Directory to save logs and model checkpoints, if not provided
# through the command line argument --logs
DEFAULT_LOGS_DIR = os.path.join(ROOT_DIR, "logs")
print(DEFAULT_LOGS_DIR)

############################################################
# Configurations
############################################################
class LandslideConfig(Config):
    """Configuration for training on the toy balloon dataset.
    Derives from the base Config class and overrides some values.
    """
    # Give the configuration a recognizable name
    NAME = "landslide"

    # We use a GPU with 12GB memory, which can fit two images.
    # Adjust down if you use a smaller GPU.
    # IMAGES_PER_GPU = 2
    IMAGES_PER_GPU = 1

    # Number of classes (including background)
    NUM_CLASSES = 1 + 1  # Background + landslide

    # Number of training steps per epoch
    STEPS_PER_EPOCH = 100  # 200

    # Skip detections with < 90% confidence
    DETECTION_MIN_CONFIDENCE = 0.9

    # IMAGE_MIN_DIM = 512
    IMAGE_MIN_DIM = 128
    # IMAGE_MAX_DIM = 512
    IMAGE_MAX_DIM = 128


############################################################
# Dataset
############################################################
class LandslideDataset(utils.Dataset):

    def load_landslide(self, dataset_dir, subset):
        """Load a subset of the Balloon dataset.
        dataset_dir: Root directory of the dataset.
        subset: Subset to load: train or val
        """
        # Add classes. We have only one class to add.
        self.add_class("landslide", 1, "landslide")

        # Train or validation dataset?
        assert subset in ["train", "val"]
        dataset_dir = os.path.join(dataset_dir, subset)

        # Load annotations
        # VGG Image Annotator (up to version 1.6) saves each image in the form:
        # { 'filename': '28503151_5b5b7ec140_b.jpg',
        #   'regions': {
        #       '0': {
        #           'region_attributes': {},
        #           'shape_attributes': {
        #               'all_points_x': [...],
        #               'all_points_y': [...],
        #               'name': 'polygon'}},
        #       ... more regions ...
        #   },
        #   'size': 100202
        # }
        # We mostly care about the x and y coordinates of each region
        # Note: In VIA 2.0, regions was changed from a dict to a list.
        annotations = json.load(open(os.path.join(dataset_dir, "via_region_data.json")))
        annotations = list(annotations.values())  # don't need the dict keys

        # The VIA tool saves images in the JSON even if they don't have any
        # annotations. Skip unannotated images.
        annotations = [a for a in annotations if a['regions']]

        # Add images
        for a in annotations:
            # Get the x, y coordinates of points of the polygons that make up
            # the outline of each object instance. These are stored in the
            # shape_attributes (see JSON format above).
            # The if condition is needed to support VIA versions 1.x and 2.x.
            if type(a['regions']) is dict:
                polygons = [r['shape_attributes'] for r in a['regions'].values()]
            else:
                polygons = [r['shape_attributes'] for r in a['regions']]

            # load_mask() needs the image size to convert polygons to masks.
            # Unfortunately, VIA doesn't include it in JSON, so we must read
            # the image. This is only manageable since the dataset is tiny.
            image_path = os.path.join(dataset_dir, a['filename'])
            image = skimage.io.imread(image_path)
            height, width = image.shape[:2]

            self.add_image(
                "landslide",
                image_id=a['filename'],  # use file name as a unique image id
                path=image_path,
                width=width, height=height,
                polygons=polygons)

    def load_mask(self, image_id):
        """Generate instance masks for an image.
        Returns:
        masks: A bool array of shape [height, width, instance count] with
            one mask per instance.
        class_ids: a 1D array of class IDs of the instance masks.
        """
        # If not a balloon dataset image, delegate to the parent class.
        image_info = self.image_info[image_id]
        if image_info["source"] != "landslide":
            return super(self.__class__, self).load_mask(image_id)

        # Convert polygons to a bitmap mask of shape
        # [height, width, instance_count]
        info = self.image_info[image_id]
        mask = np.zeros([info["height"], info["width"], len(info["polygons"])],
                        dtype=np.uint8)
        for i, p in enumerate(info["polygons"]):
            # Get indexes of pixels inside the polygon and set them to 1
            rr, cc = skimage.draw.polygon(p['all_points_y'], p['all_points_x'])
            mask[rr, cc, i] = 1

        # Return mask, and array of class IDs of each instance. Since we have
        # one class ID only, we return an array of 1s.
        return mask.astype(np.bool), np.ones([mask.shape[-1]], dtype=np.int32)

    def image_reference(self, image_id):
        """Return the path of the image."""
        info = self.image_info[image_id]
        if info["source"] == "landslide":
            return info["path"]
        else:
            super(self.__class__, self).image_reference(image_id)


def train(model):
    """Train the model."""
    # Training dataset.
    dataset_train = LandslideDataset()
    dataset_train.load_landslide(args.dataset, "train")
    dataset_train.prepare()

    # Validation dataset
    dataset_val = LandslideDataset()
    dataset_val.load_landslide(args.dataset, "val")
    dataset_val.prepare()

    # *** This training schedule is an example. Update it to your needs. ***
    # Since we're using a very small dataset, and starting from COCO trained
    # weights, we don't need to train for too long. Also, there is no need to
    # train all layers; just training the network heads should do it.
    print("Training network heads")
    model.train(dataset_train, dataset_val,
                learning_rate=config.LEARNING_RATE,
                epochs=30,
                layers='heads')


def color_splash(image, mask):
    """Apply the color splash effect.
    image: RGB image [height, width, 3]
    mask: instance segmentation mask [height, width, instance count]

    Returns the result image.
    """
    # Make a grayscale copy of the image. The grayscale copy still
    # has 3 channels, though.
    gray = skimage.color.gray2rgb(skimage.color.rgb2gray(image)) * 255
    # Copy color pixels from the original color image where the mask is set
    if mask.shape[-1] > 0:
        # We're treating all instances as one, so collapse the mask into one layer
        mask = (np.sum(mask, -1, keepdims=True) >= 1)
        splash = np.where(mask, image, gray).astype(np.uint8)
    else:
        splash = gray.astype(np.uint8)
    return splash


def detect_and_color_splash(model, image_path=None, video_path=None):
    assert image_path or video_path

    # Image or video?
    if image_path:
        # Run model detection and generate the color splash effect
        print("Running on {}".format(args.image))
        # Read image
        image = skimage.io.imread(args.image)
        # Detect objects
        r = model.detect([image], verbose=1)[0]
        # Color splash
        splash = color_splash(image, r['masks'])
        # Save output
        file_name = "splash_{:%Y%m%dT%H%M%S}.png".format(datetime.datetime.now())
        skimage.io.imsave(file_name, splash)
    elif video_path:
        import cv2
        # Video capture
        vcapture = cv2.VideoCapture(video_path)
        width = int(vcapture.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(vcapture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = vcapture.get(cv2.CAP_PROP_FPS)

        # Define codec and create video writer
        file_name = "splash_{:%Y%m%dT%H%M%S}.avi".format(datetime.datetime.now())
        vwriter = cv2.VideoWriter(file_name,
                                  cv2.VideoWriter_fourcc(*'MJPG'),
                                  fps, (width, height))

        count = 0
        success = True
        while success:
            print("frame: ", count)
            # Read next image
            success, image = vcapture.read()
            if success:
                # OpenCV returns images as BGR, convert to RGB
                image = image[..., ::-1]
                # Detect objects
                r = model.detect([image], verbose=0)[0]
                # Color splash
                splash = color_splash(image, r['masks'])
                # RGB -> BGR to save image to video
                splash = splash[..., ::-1]
                # Add image to video writer
                vwriter.write(splash)
                count += 1
        vwriter.release()
    print("Saved to ", file_name)


def image_name(images_path, file_type):
    images = []
    for root, dirs, files in os.walk(images_path):
        for file in files:
            if os.path.splitext(file)[1] == file_type:
                images.append(os.path.join(root, file))
    return images


def get_ax(rows=1, cols=1, size=16):
    _, ax = plt.subplots(rows, cols, figsize=(size * cols, size * rows))
    return ax


def detect_and_show(model, image_path=None):
    import visualize_cv2
    dataset = LandslideDataset()
    dataset.load_landslide(args.dataset, "val")
    dataset.prepare()

    images = image_name(Path(args.image), ".png")
    print(images)
    count = 1
    for i in images:
        # Run model detection and save the visualized result
        print("Running on {}".format(i))
        # Read image
        image = skimage.io.imread(i)
        # Detect objects
        results = model.detect([image], verbose=1)
        r = results[0]
        # Save output
        file_name = "detected_{:%Y%m%dT%H%M%S}{count}.png".format(
            datetime.datetime.now(), count=count)
        visualize_cv2.save_image(image, file_name, r['rois'], r['masks'],
                                 r['class_ids'], r['scores'], dataset.class_names,
                                 filter_classs_names=['landslide'],
                                 scores_thresh=0.7, mode=0)
        print("Saved to ", file_name)
        count = count + 1
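
# --- Optional sanity check of the dataset loader (not part of the original
# script). A minimal sketch for interactive use: the dataset path is a
# placeholder and the helper name _inspect_first_mask is hypothetical.
def _inspect_first_mask(dataset_dir="/path/to/landslide/dataset"):
    ds = LandslideDataset()
    ds.load_landslide(dataset_dir, "train")
    ds.prepare()
    image_id = ds.image_ids[0]
    image = ds.load_image(image_id)            # utils.Dataset reads image_info["path"]
    mask, class_ids = ds.load_mask(image_id)   # [H, W, instances], one class id per instance
    print("image:", image.shape, "mask:", mask.shape, "class_ids:", class_ids)
    plt.imshow(image)
    plt.imshow(mask.any(-1), alpha=0.4)        # overlay the union of all instance masks
    plt.show()
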
############################################################
# Training
############################################################
if __name__ == '__main__':
    import argparse

    # Parse command line arguments
    parser = argparse.ArgumentParser(
        description='Train Mask R-CNN to detect landslide.')
    parser.add_argument("command",
                        metavar="<command>",
                        help="'train' or 'splash'")
    parser.add_argument('--dataset', required=False,
                        metavar="/path/to/landslide/dataset/",
                        help='Directory of the landslide dataset')
    parser.add_argument('--weights', required=True,
                        metavar="/path/to/mask_rcnn_coco.h5",
                        help="Path to weights .h5 file or 'coco'")
    parser.add_argument('--logs', required=False,
                        default=DEFAULT_LOGS_DIR,
                        metavar="/path/to/logs/",
                        help='Logs and checkpoints directory (default=logs/)')
    parser.add_argument('--image', required=False,
                        metavar="path or URL to image",
                        help='Image to apply the color splash effect on')
    parser.add_argument('--video', required=False,
                        metavar="path or URL to video",
                        help='Video to apply the color splash effect on')
    args = parser.parse_args()

    # Validate arguments
    if args.command == "train":
        assert args.dataset, "Argument --dataset is required for training"
    elif args.command == "splash":
        assert args.image or args.video, \
            "Provide --image or --video to apply color splash"

    print("Weights: ", args.weights)
    print("Dataset: ", args.dataset)
    print("Logs: ", args.logs)

    # Configurations
    if args.command == "train":
        config = LandslideConfig()
    else:
        class InferenceConfig(LandslideConfig):
            # Set batch size to 1 since we'll be running inference on
            # one image at a time. Batch size = GPU_COUNT * IMAGES_PER_GPU
            GPU_COUNT = 1
            IMAGES_PER_GPU = 1
        config = InferenceConfig()
    config.display()

    # Create model
    if args.command == "train":
        model = modellib.MaskRCNN(mode="training", config=config,
                                  model_dir=args.logs)
    else:
        model = modellib.MaskRCNN(mode="inference", config=config,
                                  model_dir=args.logs)

    # Select weights file to load
    if args.weights.lower() == "coco":
        weights_path = COCO_WEIGHTS_PATH
        # Download weights file
        if not os.path.exists(weights_path):
            utils.download_trained_weights(weights_path)
    elif args.weights.lower() == "last":
        # Find the last trained weights
        weights_path = model.find_last()
    elif args.weights.lower() == "imagenet":
        # Start from ImageNet trained weights
        weights_path = model.get_imagenet_weights()
    else:
        weights_path = args.weights

    # Load weights
    print("Loading weights ", weights_path)
    if args.weights.lower() == "coco":
        # Exclude the last layers because they require a matching
        # number of classes
        model.load_weights(weights_path, by_name=True, exclude=[
            "mrcnn_class_logits", "mrcnn_bbox_fc",
            "mrcnn_bbox", "mrcnn_mask"])
    else:
        model.load_weights(weights_path, by_name=True)

    # Train or evaluate
    if args.command == "train":
        train(model)
    elif args.command == "splash":
        detect_and_color_splash(model, image_path=args.image,
                                video_path=args.video)
    elif args.command == "show":
        detect_and_show(model, image_path=args.image)
    else:
        print("'{}' is not recognized. "
              "Use 'train' or 'splash'".format(args.command))
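
# --- Typical invocations (not from the original post; the script file name
# "landslide.py" and all paths below are placeholder assumptions, but the
# commands and flags match the argparse definitions above):
#
#   # Train a new model starting from pre-trained COCO weights
#   python landslide.py train --dataset=/path/to/landslide/dataset --weights=coco
#
#   # Resume training from the last saved checkpoint
#   python landslide.py train --dataset=/path/to/landslide/dataset --weights=last
#
#   # Apply the color splash effect to an image or a video
#   python landslide.py splash --weights=last --image=/path/to/image.png
#   python landslide.py splash --weights=last --video=/path/to/video.mp4
#
#   # Save visualize_cv2 detections for all .png images in a folder
#   python landslide.py show --dataset=/path/to/landslide/dataset --weights=last --image=/path/to/images/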