"""Provides functionality to write the generated data to disc."""
import os
from typing import List, Dict, Union, Any, Set, Tuple
import json
import csv
import numpy as np
import bpy
import mathutils
import h5py
from blenderproc.python.postprocessing.PostProcessingUtility import trim_redundant_channels, \
segmentation_mapping
from blenderproc.python.postprocessing.PostProcessingUtility import dist2depth, depth2dist
from blenderproc.python.types.EntityUtility import Entity
from blenderproc.python.utility.BlenderUtility import load_image
from blenderproc.python.utility.Utility import resolve_path, Utility, NumpyEncoder
from blenderproc.python.utility.MathUtility import change_coordinate_frame_of_point, \
change_source_coordinate_frame_of_transformation_matrix, change_target_coordinate_frame_of_transformation_matrix
from blenderproc.python.camera import CameraUtility
[docs]
def write_hdf5(output_dir_path: str, output_data_dict: Dict[str, List[Union[np.ndarray, list, dict]]],
append_to_existing_output: bool = False, stereo_separate_keys: bool = False):
"""
Saves the information provided inside of the output_data_dict into a .hdf5 container
:param output_dir_path: The folder path in which the .hdf5 containers will be generated
:param output_data_dict: The container, which keeps the different images, which should be saved to disc.
Each key will be saved as its own key in the .hdf5 container.
:param append_to_existing_output: If this is True, the output_dir_path folder will be scanned for pre-existing
.hdf5 containers and the numbering of the newly added containers, will start
right where the last run left off.
:param stereo_separate_keys: If this is True and the rendering was done in stereo mode, than the stereo images
won't be saved in one tensor [2, img_x, img_y, channels], where the img[0] is the
left image and img[1] the right. They will be saved in separate keys: for example
for colors in colors_0 and colors_1.
"""
if not os.path.exists(output_dir_path):
os.makedirs(output_dir_path)
amount_of_frames = 0
for data_block in output_data_dict.values():
if isinstance(data_block, list):
amount_of_frames = max([amount_of_frames, len(data_block)])
# if append to existing output is turned on the existing folder is searched for the highest occurring
# index, which is then used as starting point for this run
if append_to_existing_output:
frame_offset = 0
# Look for hdf5 file with highest index
for path in os.listdir(output_dir_path):
if path.endswith(".hdf5"):
index = path[:-len(".hdf5")]
if index.isdigit():
frame_offset = max(frame_offset, int(index) + 1)
else:
frame_offset = 0
if amount_of_frames != bpy.context.scene.frame_end - bpy.context.scene.frame_start:
raise Exception("The amount of images stored in the output_data_dict does not correspond with the amount"
"of images specified by frame_start to frame_end.")
for frame in range(bpy.context.scene.frame_start, bpy.context.scene.frame_end):
# for each frame a new .hdf5 file is generated
hdf5_path = os.path.join(output_dir_path, str(frame + frame_offset) + ".hdf5")
with h5py.File(hdf5_path, "w") as file:
# Go through all the output types
print(f"Merging data for frame {frame} into {hdf5_path}")
adjusted_frame = frame - bpy.context.scene.frame_start
for key, data_block in output_data_dict.items():
if adjusted_frame < len(data_block):
# get the current data block for the current frame
used_data_block = data_block[adjusted_frame]
if stereo_separate_keys and (bpy.context.scene.render.use_multiview or
used_data_block.shape[0] == 2):
# stereo mode was activated
_WriterUtility.write_to_hdf_file(file, key + "_0", data_block[adjusted_frame][0])
_WriterUtility.write_to_hdf_file(file, key + "_1", data_block[adjusted_frame][1])
else:
_WriterUtility.write_to_hdf_file(file, key, data_block[adjusted_frame])
else:
raise Exception(f"There are more frames {adjusted_frame} then there are blocks of information "
f" {len(data_block)} in the given list for key {key}.")
blender_proc_version = Utility.get_current_version()
if blender_proc_version is not None:
_WriterUtility.write_to_hdf_file(file, "blender_proc_version", np.string_(blender_proc_version))
[docs]
class _WriterUtility:
[docs]
@staticmethod
def load_registered_outputs(keys: Set[str], keys_with_alpha_channel: Set[str] = None) -> \
Dict[str, Union[np.ndarray, List[np.ndarray]]]:
"""
Loads registered outputs with specified keys
:param keys: set of output_key types to load
:param keys_with_alpha_channel: A set containing all keys whose alpha channels should be loaded.
:return: dict of lists of raw loaded outputs. Keys are e.g. 'distance', 'colors', 'normals', 'segmap'
"""
output_data_dict: Dict[str, Union[np.ndarray, List[np.ndarray]]] = {}
reg_outputs = Utility.get_registered_outputs()
for reg_out in reg_outputs:
if reg_out['key'] in keys:
key_has_alpha_channel = keys_with_alpha_channel is not None and reg_out[
'key'] in keys_with_alpha_channel
if '%' in reg_out['path']:
# per frame outputs
for frame_id in range(bpy.context.scene.frame_start, bpy.context.scene.frame_end):
output_path = resolve_path(reg_out['path'] % frame_id)
if os.path.exists(output_path):
output_file = _WriterUtility.load_output_file(output_path, key_has_alpha_channel)
else:
# check for stereo files
output_paths = _WriterUtility.get_stereo_path_pair(output_path)
# convert to a tensor of shape [2, img_x, img_y, channels]
# output_file[0] is the left image and output_file[1] the right image
output_file = np.array(
[_WriterUtility.load_output_file(path, key_has_alpha_channel) for path in
output_paths])
# For outputs like distance or depth, we automatically trim the last channel here
if "trim_redundant_channels" in reg_out and reg_out["trim_redundant_channels"]:
output_file = trim_redundant_channels(output_file)
if "convert_to_depth" in reg_out and reg_out["convert_to_depth"]:
output_file = dist2depth(output_file)
if "convert_to_distance" in reg_out and reg_out["convert_to_distance"]:
output_file = depth2dist(output_file)
# semantic seg must be last
if "is_semantic_segmentation" in reg_out and reg_out["is_semantic_segmentation"]\
and "semantic_segmentation_mapping" in reg_out \
and "semantic_segmentation_default_values" in reg_out:
output_file = segmentation_mapping(output_file,
reg_out["semantic_segmentation_mapping"],
reg_out["semantic_segmentation_default_values"])
for key, output_info in output_file.items():
output_data_dict.setdefault(key, []).append(output_info)
else:
output_data_dict.setdefault(reg_out['key'], []).append(output_file)
else:
# per run outputs
output_path = resolve_path(reg_out['path'])
output_file = _WriterUtility.load_output_file(output_path, key_has_alpha_channel)
output_data_dict[reg_out['key']] = output_file
return output_data_dict
[docs]
@staticmethod
def get_stereo_path_pair(file_path: str) -> Tuple[str, str]:
"""
Returns stereoscopic file path pair for a given "normal" image file path.
:param file_path: The file path of a single image.
:return: The pair of file paths corresponding to the stereo images,
"""
path_split = file_path.split(".")
path_l = f"{path_split[0]}_L.{path_split[1]}"
path_r = f"{path_split[0]}_R.{path_split[1]}"
return path_l, path_r
[docs]
@staticmethod
def load_output_file(file_path: str, load_alpha_channel: bool = False,
remove: bool = True) -> Union[np.ndarray, List[Any]]:
""" Tries to read in the file with the given path into a numpy array.
:param file_path: The file path. Type: string.
:param load_alpha_channel: Whether to load the alpha channel as well. Type: bool. Default: False
:param remove: Whether to delete file after loading.
:return: Loaded data from the file as numpy array if possible.
"""
if not os.path.exists(file_path):
raise FileNotFoundError("File not found: " + file_path)
file_ending = file_path[file_path.rfind(".") + 1:].lower()
if file_ending in ["exr", "png", "jpg"]:
# num_channels is 4 if transparent_background is true in config
output = load_image(file_path, num_channels=3 + (1 if load_alpha_channel else 0))
elif file_ending in ["npy", "npz"]:
output = np.load(file_path)
elif file_ending in ["csv"]:
output = _WriterUtility.load_csv(file_path)
else:
raise NotImplementedError("File with ending " + file_ending + " cannot be loaded.")
if remove:
os.remove(file_path)
return output
[docs]
@staticmethod
def load_csv(file_path: str) -> List[Any]:
""" Load the csv file at the given path.
:param file_path: The path. Type: string.
:return: The content of the file
"""
rows = []
with open(file_path, mode='r', encoding="utf-8") as csv_file:
csv_reader = csv.DictReader(csv_file)
for row in csv_reader:
rows.append(row)
return rows
[docs]
@staticmethod
def get_common_attribute(item: bpy.types.Object, attribute_name: str,
local_frame_change: Union[None, List[str]] = None,
world_frame_change: Union[None, List[str]] = None) -> Any:
""" Returns the value of the requested attribute for the given item.
This method covers all general attributes that blender objects have.
:param item: The item. Type: blender object.
:param attribute_name: The attribute name. Type: string.
:param local_frame_change: Can be used to change the local coordinate frame of matrices.
Default: ["X", "Y", "Z"]
:param world_frame_change: Can be used to change the world coordinate frame of points and matrices.
Default: ["X", "Y", "Z"]
:return: The attribute value.
"""
if local_frame_change is None:
local_frame_change = ["X", "Y", "Z"]
if world_frame_change is None:
world_frame_change = ["X", "Y", "Z"]
# Print warning if local_frame_change is used with other attributes than matrix_world
if local_frame_change != ["X", "Y", "Z"] and attribute_name in ["location", "rotation_euler",
"rotation_forward_vec", "rotation_up_vec"]:
print("Warning: The local_frame_change parameter is at the moment only supported by "
"the matrix_world attribute.")
if attribute_name == "name":
return item.name
if attribute_name == "location":
return change_coordinate_frame_of_point(item.location, world_frame_change)
if attribute_name == "rotation_euler":
return change_coordinate_frame_of_point(item.rotation_euler, world_frame_change)
if attribute_name == "rotation_forward_vec":
# Calc forward vector from rotation matrix
rot_mat = item.rotation_euler.to_matrix()
forward = rot_mat @ mathutils.Vector([0, 0, -1])
return change_coordinate_frame_of_point(forward, world_frame_change)
if attribute_name == "rotation_up_vec":
# Calc up vector from rotation matrix
rot_mat = item.rotation_euler.to_matrix()
up = rot_mat @ mathutils.Vector([0, 1, 0])
return change_coordinate_frame_of_point(up, world_frame_change)
if attribute_name == "matrix_world":
# Transform matrix_world to given destination frame
matrix_world = change_source_coordinate_frame_of_transformation_matrix(Entity(item).get_local2world_mat(),
local_frame_change)
matrix_world = change_target_coordinate_frame_of_transformation_matrix(matrix_world, world_frame_change)
return [list(c) for c in matrix_world]
if attribute_name.startswith("customprop_"):
custom_property_name = attribute_name[len("customprop_"):]
# Make sure the requested custom property exist
if custom_property_name in item:
return item[custom_property_name]
raise ValueError(f"No such custom property: {custom_property_name}")
raise ValueError(f"No such attribute: {attribute_name}")
[docs]
@staticmethod
def get_cam_attribute(cam_ob: bpy.context.scene.camera, attribute_name: str,
local_frame_change: Union[None, List[str]] = None,
world_frame_change: Union[None, List[str]] = None) -> Any:
""" Returns the value of the requested attribute for the given object.
:param cam_ob: The camera object.
:param attribute_name: The attribute name.
:param local_frame_change: Can be used to change the local coordinate frame of matrices.
Default: ["X", "Y", "Z"]
:param world_frame_change: Can be used to change the world coordinate frame of points and matrices.
Default: ["X", "Y", "Z"]
:return: The attribute value.
"""
if attribute_name == "fov_x":
return CameraUtility.get_fov()[0]
if attribute_name == "fov_y":
return CameraUtility.get_fov()[1]
if attribute_name == "shift_x":
return cam_ob.data.shift_x
if attribute_name == "shift_y":
return cam_ob.data.shift_y
if attribute_name == "half_fov_x":
return CameraUtility.get_fov()[0] * 0.5
if attribute_name == "half_fov_y":
return CameraUtility.get_fov()[1] * 0.5
if attribute_name == "cam_K":
return [list(c) for c in CameraUtility.get_intrinsics_as_K_matrix()]
if attribute_name == "cam2world_matrix":
return _WriterUtility.get_common_attribute(cam_ob, "matrix_world", local_frame_change,
world_frame_change)
return _WriterUtility.get_common_attribute(cam_ob, attribute_name, local_frame_change,
world_frame_change)
[docs]
@staticmethod
def get_light_attribute(light: bpy.types.Light, attribute_name: str,
local_frame_change: Union[None, List[str]] = None,
world_frame_change: Union[None, List[str]] = None) -> Any:
""" Returns the value of the requested attribute for the given light.
:param light: The light. Type: blender scene object of type light.
:param attribute_name: The attribute name.
:param local_frame_change: Can be used to change the local coordinate frame of matrices.
Default: ["X", "Y", "Z"]
:param world_frame_change: Can be used to change the world coordinate frame of points and matrices.
Default: ["X", "Y", "Z"]
:return: The attribute value.
"""
if attribute_name == "energy":
return light.data.energy
return _WriterUtility.get_common_attribute(light, attribute_name, local_frame_change, world_frame_change)
[docs]
@staticmethod
def _get_shapenet_attribute(shapenet_obj: bpy.types.Object, attribute_name: str,
local_frame_change: Union[None, List[str]] = None,
world_frame_change: Union[None, List[str]] = None):
""" Returns the value of the requested attribute for the given object.
:param shapenet_obj: The ShapeNet object.
:param attribute_name: The attribute name.
:param local_frame_change: Can be used to change the local coordinate frame of matrices.
Default: ["X", "Y", "Z"]
:param world_frame_change: Can be used to change the world coordinate frame of points and matrices.
Default: ["X", "Y", "Z"]
:return: The attribute value.
"""
if attribute_name == "used_synset_id":
return shapenet_obj.get("used_synset_id", "")
if attribute_name == "used_source_id":
return shapenet_obj.get("used_source_id", "")
return _WriterUtility.get_common_attribute(shapenet_obj, attribute_name, local_frame_change,
world_frame_change)
[docs]
@staticmethod
def write_to_hdf_file(file, key: str, data: Union[np.ndarray, list, dict], compression: str = "gzip"):
""" Adds the given data as a new entry to the given hdf5 file.
:param file: The hdf5 file handle. Type: hdf5.File
:param key: The key at which the data should be stored in the hdf5 file.
:param data: The data to store.
"""
if not isinstance(data, np.ndarray) and not isinstance(data, np.bytes_):
if isinstance(data, (list, dict)):
# If the data contains one or multiple dicts that contain e.q. object states
if isinstance(data, dict) or len(data) > 0 and isinstance(data[0], dict):
# Serialize them into json (automatically convert numpy arrays to lists)
data = np.string_(json.dumps(data, cls=NumpyEncoder))
data = np.array(data)
else:
raise Exception(
f"This fct. expects the data for key {key} to be a np.ndarray, list or dict not a {type(data)}!")
if data.dtype.char == 'S':
file.create_dataset(key, data=data, dtype=data.dtype)
else:
file.create_dataset(key, data=data, compression=compression)