"""Allows rendering the content of the scene in the coco file format."""
import datetime
from itertools import groupby
import json
import os
from typing import Optional, Dict, Union, Tuple, List
import numpy as np
from skimage import measure
import cv2
import bpy
from blenderproc.python.utility.LabelIdMapping import LabelIdMapping
[docs]
def write_coco_annotations(output_dir: str, instance_segmaps: List[np.ndarray],
instance_attribute_maps: List[dict],
colors: List[np.ndarray], color_file_format: str = "PNG",
mask_encoding_format: str = "rle", supercategory: str = "coco_annotations",
append_to_existing_output: bool = True,
jpg_quality: int = 95, label_mapping: Optional[LabelIdMapping] = None,
file_prefix: str = "", indent: Optional[Union[int, str]] = None):
""" Writes coco annotations in the following steps:
1. Locate the seg images
2. Locate the rgb maps
3. Locate the seg mappings
4. Read color mappings
5. For each frame write the coco annotation
:param output_dir: Output directory to write the coco annotations
:param instance_segmaps: List of instance segmentation maps
:param instance_attribute_maps: per-frame mappings with idx, class and optionally supercategory/bop_dataset_name
:param colors: List of color images. Does not support stereo images, enter left and right inputs subsequently.
:param color_file_format: Format to save color images in
:param mask_encoding_format: Encoding format of the binary masks. Default: 'rle'. Available: 'rle', 'polygon'.
:param supercategory: name of the dataset/supercategory to filter for, e.g. a specific BOP dataset set
by 'bop_dataset_name' or any loaded object with specified 'cp_supercategory'
:param append_to_existing_output: If true and if there is already a coco_annotations.json file in the output
directory, the new coco annotations will be appended to the existing file.
Also, the rgb images will be named such that there are no collisions.
:param jpg_quality: The desired quality level of the jpg encoding
:param label_mapping: The label mapping which should be used to label the categories based on their ids.
If None, is given then the `name` field in the csv files is used or - if not existing -
the category id itself is used.
:param file_prefix: Optional prefix for image file names
:param indent: If indent is a non-negative integer or string, then the annotation output
will be pretty-printed with that indent level. An indent level of 0, negative, or "" will
only insert newlines. None (the default) selects the most compact representation.
Using a positive integer indent indents that many spaces per level.
If indent is a string (such as "\t"), that string is used to indent each level.
"""
if len(colors) > 0 and len(colors[0].shape) == 4:
raise ValueError("BlenderProc currently does not support writing coco annotations for stereo images. "
"However, you can enter left and right images / segmaps separately.")
# Create output directory
os.makedirs(os.path.join(output_dir, 'images'), exist_ok=True)
coco_annotations_path = os.path.join(output_dir, "coco_annotations.json")
# Calculate image numbering offset, if append_to_existing_output is activated and coco data exists
if append_to_existing_output and os.path.exists(coco_annotations_path):
with open(coco_annotations_path, 'r', encoding="utf-8") as fp:
existing_coco_annotations = json.load(fp)
image_offset = max(image["id"] for image in existing_coco_annotations["images"]) + 1
else:
image_offset = 0
existing_coco_annotations = None
# collect all RGB paths
new_coco_image_paths = []
# for each rendered frame
for frame in range(bpy.context.scene.frame_start, bpy.context.scene.frame_end):
color_rgb = colors[frame - bpy.context.scene.frame_start]
# Reverse channel order for opencv
color_bgr = color_rgb.copy()
color_bgr[..., :3] = color_bgr[..., :3][..., ::-1]
if color_file_format == 'PNG':
target_base_path = f'images/{file_prefix}{frame + image_offset:06d}.png'
target_path = os.path.join(output_dir, target_base_path)
cv2.imwrite(target_path, color_bgr)
elif color_file_format == 'JPEG':
target_base_path = f'images/{file_prefix}{frame + image_offset:06d}.jpg'
target_path = os.path.join(output_dir, target_base_path)
cv2.imwrite(target_path, color_bgr, [int(cv2.IMWRITE_JPEG_QUALITY), jpg_quality])
else:
raise RuntimeError(f'Unknown color_file_format={color_file_format}. Try "PNG" or "JPEG"')
new_coco_image_paths.append(target_base_path)
coco_output = _CocoWriterUtility.generate_coco_annotations(instance_segmaps,
instance_attribute_maps,
new_coco_image_paths,
supercategory,
mask_encoding_format,
existing_coco_annotations,
label_mapping)
print("Writing coco annotations to " + coco_annotations_path)
with open(coco_annotations_path, 'w', encoding="utf-8") as fp:
json.dump(coco_output, fp, indent=indent)
[docs]
def binary_mask_to_rle(binary_mask: np.ndarray) -> Dict[str, List[int]]:
"""Converts a binary mask to COCOs run-length encoding (RLE) format. Instead of outputting
a mask image, you give a list of start pixels and how many pixels after each of those
starts are included in the mask.
:param binary_mask: a 2D binary numpy array where '1's represent the object
:return: Mask in RLE format
"""
rle: Dict[str, List[int]] = {'counts': [], 'size': list(binary_mask.shape)}
counts = rle.get('counts')
for i, (value, elements) in enumerate(groupby(binary_mask.ravel(order='F'))):
if i == 0 and value == 1:
counts.append(0)
counts.append(len(list(elements)))
return rle
[docs]
def rle_to_binary_mask(rle: Dict[str, List[int]]) -> np.ndarray:
"""Converts a COCOs run-length encoding (RLE) to binary mask.
:param rle: Mask in RLE format
:return: a 2D binary numpy array where '1's represent the object
"""
binary_array = np.zeros(np.prod(rle.get('size')), dtype=bool)
counts: List[int] = rle.get('counts')
start = 0
for i in range(len(counts) - 1):
start += counts[i]
end = start + counts[i + 1]
binary_array[start:end] = (i + 1) % 2
binary_mask = binary_array.reshape(*rle.get('size'), order='F')
return binary_mask
[docs]
class _CocoWriterUtility:
[docs]
@staticmethod
def generate_coco_annotations(inst_segmaps, inst_attribute_maps, image_paths, supercategory,
mask_encoding_format, existing_coco_annotations=None,
label_mapping: LabelIdMapping = None):
"""Generates coco annotations for images
:param inst_segmaps: List of instance segmentation maps
:param inst_attribute_maps: per-frame mappings with idx, class and optionally supercategory/bop_dataset_name
:param image_paths: A list of paths which points to the rendered images.
:param supercategory: name of the dataset/supercategory to filter for, e.g. a specific BOP dataset
:param mask_encoding_format: Encoding format of the binary mask. Type: string.
:param existing_coco_annotations: If given, the new coco annotations will be appended to the given
coco annotations dict.
:param label_mapping: The label mapping which should be used to label the categories based on their ids.
If None, is given then the `name` field in the csv files is used or - if not existing -
the category id itself is used.
:return: dict containing coco annotations
"""
categories = []
visited_categories = []
instance_2_category_maps = []
for inst_attribute_map in inst_attribute_maps:
instance_2_category_map = {}
for inst in inst_attribute_map:
# skip background
if int(inst["category_id"]) != 0:
# take all objects or objects from specified supercategory is defined
inst_supercategory = "coco_annotations"
if "bop_dataset_name" in inst:
inst_supercategory = inst["bop_dataset_name"]
elif "supercategory" in inst:
inst_supercategory = inst["supercategory"]
if supercategory in [inst_supercategory, 'coco_annotations']:
if int(inst["category_id"]) not in visited_categories:
cat_dict: Dict[str, Union[str, int]] = {'id': int(inst["category_id"]),
'supercategory': inst_supercategory}
# Determine name of category based on label_mapping, name or category_id
if label_mapping is not None:
cat_dict["name"] = label_mapping.label_from_id(cat_dict['id'])
elif "name" in inst:
cat_dict["name"] = inst["name"]
else:
cat_dict["name"] = inst["category_id"]
categories.append(cat_dict)
visited_categories.append(cat_dict['id'])
instance_2_category_map[int(inst["idx"])] = int(inst["category_id"])
instance_2_category_maps.append(instance_2_category_map)
licenses = [{
"id": 1,
"name": "Attribution-NonCommercial-ShareAlike License",
"url": "http://creativecommons.org/licenses/by-nc-sa/2.0/"
}]
info = {
"description": supercategory,
"url": "https://github.com/waspinator/pycococreator",
"version": "0.1.0",
"year": 2020,
"contributor": "Unknown",
"date_created": datetime.datetime.utcnow().isoformat(' ')
}
images: List[Dict[str, Union[str, int]]] = []
annotations: List[Dict[str, Union[str, int]]] = []
for inst_segmap, image_path, instance_2_category_map in zip(inst_segmaps, image_paths,
instance_2_category_maps):
# Add coco info for image
image_id = len(images)
images.append(_CocoWriterUtility.create_image_info(image_id, image_path, inst_segmap.shape))
# Go through all objects visible in this image
instances = np.unique(inst_segmap)
# Remove background
instances = np.delete(instances, np.where(instances == 0))
for inst in instances:
if inst in instance_2_category_map:
# Calc object mask
binary_inst_mask = np.where(inst_segmap == inst, 1, 0)
# Add coco info for object in this image
annotation = _CocoWriterUtility.create_annotation_info(len(annotations) + 1,
image_id,
instance_2_category_map[inst],
binary_inst_mask,
mask_encoding_format)
if annotation is not None:
annotations.append(annotation)
new_coco_annotations = {
"info": info,
"licenses": licenses,
"categories": categories,
"images": images,
"annotations": annotations
}
if existing_coco_annotations is not None:
new_coco_annotations = _CocoWriterUtility.merge_coco_annotations(existing_coco_annotations,
new_coco_annotations)
return new_coco_annotations
[docs]
@staticmethod
def merge_coco_annotations(existing_coco_annotations, new_coco_annotations):
""" Merges the two given coco annotation dicts into one.
Currently, this requires both coco annotations to have the exact same categories/objects.
The "images" and "annotations" sections are concatenated and respective ids are adjusted.
:param existing_coco_annotations: A dict describing the first coco annotations.
:param new_coco_annotations: A dict describing the second coco annotations.
:return: A dict containing the merged coco annotations.
"""
# Concatenate category sections
for cat_dict in new_coco_annotations["categories"]:
if cat_dict not in existing_coco_annotations["categories"]:
existing_coco_annotations["categories"].append(cat_dict)
# Concatenate images sections
image_id_offset = max(image["id"] for image in existing_coco_annotations["images"]) + 1
for image in new_coco_annotations["images"]:
image["id"] += image_id_offset
existing_coco_annotations["images"].extend(new_coco_annotations["images"])
# Concatenate annotations sections
if len(existing_coco_annotations["annotations"]) > 0:
annotation_id_offset = max(annotation["id"] for annotation in existing_coco_annotations["annotations"])
else:
annotation_id_offset = 0
for annotation in new_coco_annotations["annotations"]:
annotation["id"] += annotation_id_offset
annotation["image_id"] += image_id_offset
existing_coco_annotations["annotations"].extend(new_coco_annotations["annotations"])
return existing_coco_annotations
[docs]
@staticmethod
def create_image_info(image_id: int, file_name: str, image_size: Tuple[int, int]) -> Dict[str, Union[str, int]]:
"""Creates image info section of coco annotation
:param image_id: integer to uniquly identify image
:param file_name: filename for image
:param image_size: The size of the image, given as [W, H]
"""
image_info: Dict[str, Union[str, int]] = {
"id": image_id,
"file_name": file_name,
"width": image_size[1],
"height": image_size[0],
"date_captured": datetime.datetime.utcnow().isoformat(' '),
"license": 1,
"coco_url": "",
"flickr_url": ""
}
return image_info
[docs]
@staticmethod
def create_annotation_info(annotation_id: int, image_id: int, category_id: int, binary_mask: np.ndarray,
mask_encoding_format: str, tolerance: int = 2) -> Optional[Dict[str, Union[str, int]]]:
"""Creates info section of coco annotation
:param annotation_id: integer to uniquly identify the annotation
:param image_id: integer to uniquly identify image
:param category_id: Id of the category
:param binary_mask: A binary image mask of the object with the shape [H, W].
:param mask_encoding_format: Encoding format of the mask. Type: string.
:param tolerance: The tolerance for fitting polygons to the objects mask.
"""
area = _CocoWriterUtility.calc_binary_mask_area(binary_mask)
if area < 1:
return None
bounding_box = _CocoWriterUtility.bbox_from_binary_mask(binary_mask)
if mask_encoding_format == 'rle':
segmentation = binary_mask_to_rle(binary_mask)
elif mask_encoding_format == 'polygon':
segmentation = _CocoWriterUtility.binary_mask_to_polygon(binary_mask, tolerance)
if not segmentation:
return None
else:
raise RuntimeError(f"Unknown encoding format: {mask_encoding_format}")
annotation_info: Dict[str, Union[str, int]] = {
"id": annotation_id,
"image_id": image_id,
"category_id": category_id,
"iscrowd": 0,
"area": area,
"bbox": bounding_box,
"segmentation": segmentation,
"width": binary_mask.shape[1],
"height": binary_mask.shape[0],
}
return annotation_info
[docs]
@staticmethod
def bbox_from_binary_mask(binary_mask: np.ndarray) -> List[int]:
""" Returns the smallest bounding box containing all pixels marked "1" in the given image mask.
:param binary_mask: A binary image mask with the shape [H, W].
:return: The bounding box represented as [x, y, width, height]
"""
# Find all columns and rows that contain 1s
rows = np.any(binary_mask, axis=1)
cols = np.any(binary_mask, axis=0)
# Find the min and max col/row index that contain 1s
rmin, rmax = np.where(rows)[0][[0, -1]]
cmin, cmax = np.where(cols)[0][[0, -1]]
# Calc height and width
h = rmax - rmin + 1
w = cmax - cmin + 1
return [int(cmin), int(rmin), int(w), int(h)]
[docs]
@staticmethod
def calc_binary_mask_area(binary_mask: np.ndarray) -> int:
""" Returns the area of the given binary mask which is defined as the number of 1s in the mask.
:param binary_mask: A binary image mask with the shape [H, W].
:return: The computed area
"""
return binary_mask.sum().tolist()
[docs]
@staticmethod
def close_contour(contour: np.ndarray) -> np.ndarray:
""" Makes sure the given contour is closed.
:param contour: The contour to close.
:return: The closed contour.
"""
# If first != last point => add first point to end of contour to close it
if not np.array_equal(contour[0], contour[-1]):
contour = np.vstack((contour, contour[0]))
return contour
[docs]
@staticmethod
def binary_mask_to_polygon(binary_mask: np.ndarray, tolerance: int = 0) -> List[np.ndarray]:
"""Converts a binary mask to COCO polygon representation
:param binary_mask: a 2D binary numpy array where '1's represent the object
:param tolerance: Maximum distance from original points of polygon to approximated polygonal chain. If
tolerance is 0, the original coordinate array is returned.
"""
polygons = []
# pad mask to close contours of shapes which start and end at an edge
padded_binary_mask = np.pad(binary_mask, pad_width=1, mode='constant', constant_values=0)
contours = np.array(measure.find_contours(padded_binary_mask, 0.5))
# Reverse padding
contours -= 1
for contour in contours:
# Make sure contour is closed
contour = _CocoWriterUtility.close_contour(contour)
# Approximate contour by polygon
polygon = measure.approximate_polygon(contour, tolerance)
# Skip invalid polygons
if len(polygon) < 3:
continue
# Flip xy to yx point representation
polygon = np.flip(polygon, axis=1)
# Flatten
polygon = polygon.ravel()
# after padding and subtracting 1 we may get -0.5 points in our segmentation
polygon[polygon < 0] = 0
polygons.append(polygon.tolist())
return polygons