diff --git a/gen2-geometry-fitting/README.md b/gen2-geometry-fitting/README.md
new file mode 100644
index 000000000..77a593894
--- /dev/null
+++ b/gen2-geometry-fitting/README.md
@@ -0,0 +1,32 @@
+# Gen2 Geometry Fitting
+
+This example demonstrates fitting of predefined geometric primitives onto point clouds of objects detected in RGBD images. The device captures depth and color information and runs YOLOv5 on the image data. Detected bounding boxes are used to fit geometry onto subsets of the full point cloud. Possible geometry types are plane, cuboid, sphere, and cylinder; which one is fitted depends on the YOLO detection label. You can expect up to 5 FPS, depending on the geometry. Cylinder fitting is not officially supported and can take up to 3 seconds, so it is disabled by default; to enable it, set the shape of any detection label to "cylinder" in `shape_dict`. The example was tested on an OAK-D PRO device.
+![example](https://github.com/leoogrizek/depthai-experiments/assets/35898289/1b2c212b-92f6-4a56-9cf8-fbc7929a3e8f)
+
+## Installation
+
+1. Clone the repository:
+   ```bash
+   git clone https://github.com/luxonis/depthai-experiments
+   cd depthai-experiments/gen2-geometry-fitting
+   ```
+2. It is recommended to run this example in a virtual environment:
+   ```bash
+   conda create --name myenv
+   conda activate myenv
+   ```
+3. Install the required packages:
+   ```bash
+   pip install -r requirements.txt
+   ```
+
+## Usage
+
+The example can be run with
+
+```bash
+python geometry_fitting.py
+```
+If you would prefer running this in Jupyter, simply run all cells in `geometry fitting.ipynb`.
+
+By default, this example does not print the parameters of detected shapes. If you want this functionality, set the variable `verbose` to `True`.
diff --git a/gen2-geometry-fitting/geometry fitting.ipynb b/gen2-geometry-fitting/geometry fitting.ipynb
new file mode 100644
index 000000000..2e470829c
--- /dev/null
+++ b/gen2-geometry-fitting/geometry fitting.ipynb
@@ -0,0 +1,1283 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "id": "4857a209",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Jupyter environment detected. Enabling Open3D WebVisualizer.\n",
+ "[Open3D INFO] WebRTC GUI backend enabled.\n",
+ "[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.\n"
+ ]
+ }
+ ],
+ "source": [
+ "import os\n",
+ "import time\n",
+ "import json\n",
+ "from sys import maxsize\n",
+ "from itertools import combinations\n",
+ "from typing import *\n",
+ "\n",
+ "from numba import jit\n",
+ "import numpy as np\n",
+ "import open3d as o3d\n",
+ "import cv2\n",
+ "import depthai as dai\n",
+ "from pathlib import Path\n",
+ "from scipy.spatial import ConvexHull\n",
+ "from sklearn.cluster import DBSCAN\n",
+ "import pyransac3d as pyrsc\n",
+ "import blobconverter\n",
+ "\n",
+ "\n",
+ "class PointCloudVisualizer():\n",
+ "    \"\"\"\n",
+ "    PointCloudVisualizer is a utility class designed to facilitate the visualization of point clouds using Open3D.\n",
+ "    It provides functionalities to convert RGBD images to point clouds, visualize them in a window, and manipulate\n",
+ "    the visualized point clouds by adding meshes or clearing them.
The class also handles camera intrinsics \n", + " and sets up the visualization environment with appropriate parameters for optimal viewing.\n", + " \n", + " Attributes:\n", + " - R_camera_to_world: Rotation matrix to transform camera coordinates to world coordinates.\n", + " - pcl: The main point cloud object that will be visualized.\n", + " - pinhole_camera_intrinsic: Camera intrinsic parameters for projecting depth to 3D points.\n", + " - vis: The Open3D visualizer object.\n", + " - meshes: A list to store additional mesh geometries added to the visualizer.\n", + " \"\"\" \n", + " \n", + " \n", + " \n", + " def __init__(self, intrinsic_matrix: np.ndarray, width: int, height: int) -> None:\n", + " # Rotation matrix to transform from camera to world coordinates\n", + " self.R_camera_to_world = np.array([[1, 0, 0], [0, -1, 0], [0, 0, -1]]).astype(np.float64)\n", + " \n", + " # Create an empty point cloud object\n", + " self.pcl = o3d.geometry.PointCloud()\n", + " \n", + " # Define the camera intrinsic parameters using the provided matrix\n", + " self.pinhole_camera_intrinsic = o3d.camera.PinholeCameraIntrinsic(width,\n", + " height,\n", + " intrinsic_matrix[0][0],\n", + " intrinsic_matrix[1][1],\n", + " intrinsic_matrix[0][2],\n", + " intrinsic_matrix[1][2])\n", + " \n", + " # Initialize the Open3D visualizer\n", + " self.vis = o3d.visualization.Visualizer()\n", + " self.vis.create_window(window_name=\"Point Cloud\")\n", + " \n", + " # Add the point cloud to the visualizer\n", + " self.vis.add_geometry(self.pcl)\n", + " \n", + " # Add a coordinate frame origin to the visualizer for reference\n", + " origin = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.05, origin=[0, 0, 0])\n", + " self.vis.add_geometry(origin)\n", + " \n", + " # Adjust the view control settings\n", + " view_control = self.vis.get_view_control()\n", + " view_control.set_constant_z_far(1000)\n", + "\n", + " # List to store all the meshes\n", + " self.meshes = []\n", + " \n", + " # Adjust render options to show the back face of the mesh\n", + " render_option = self.vis.get_render_option()\n", + " render_option.mesh_show_back_face = True\n", + "\n", + " \n", + " \n", + " \n", + " def rgbd_to_projection(self, depth_map: np.ndarray, rgb: np.ndarray, downsample: bool = False, remove_noise: bool = False) -> o3d.geometry.PointCloud:\n", + " \"\"\"\n", + " Convert RGBD images to a point cloud projection.\n", + "\n", + " Parameters:\n", + " - depth_map: Depth image.\n", + " - rgb: RGB image.\n", + " - downsample (optional): Boolean flag to determine if the point cloud should be downsampled.\n", + " False is recommended as curved geometry tends to be detected poorly otherwise\n", + " - remove_noise (optional): Boolean flag to determine if noise should be removed from the point cloud. 
\n", + " False is recommended as this is very slow\n", + "\n", + " Returns:\n", + " - Point cloud generated from the RGBD image.\n", + " \"\"\"\n", + "\n", + " # Convert numpy RGB and depth images to Open3D Image format\n", + " rgb_o3d = o3d.geometry.Image(rgb)\n", + " depth_o3d = o3d.geometry.Image(depth_map)\n", + "\n", + " # Create an RGBD image from the RGB and depth images\n", + " rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth(\n", + " rgb_o3d, depth_o3d, \n", + " convert_rgb_to_intensity=(len(rgb.shape) != 3), # Convert RGB to intensity if it's not already in that format\n", + " depth_trunc=20000, # Truncate depth values beyond 20 meters\n", + " depth_scale=1000.0 # Scale factor for depth values\n", + " )\n", + "\n", + " # Create a point cloud from the RGBD image using the camera intrinsics\n", + " pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd_image, self.pinhole_camera_intrinsic)\n", + "\n", + " # Downsample the point cloud if the flag is set\n", + " if downsample:\n", + " pcd = pcd.voxel_down_sample(voxel_size=0.01)\n", + "\n", + " # Remove noise from the point cloud if the flag is set\n", + " if remove_noise:\n", + " pcd = pcd.remove_statistical_outlier(30, 0.1)[0]\n", + "\n", + " # Update the point cloud object of the class with the new points and colors\n", + " self.pcl.points = pcd.points\n", + " self.pcl.colors = pcd.colors\n", + "\n", + " # Rotate the point cloud to align with the world coordinates\n", + " self.pcl.rotate(self.R_camera_to_world, center=np.array([0,0,0],dtype=np.float64))\n", + "\n", + " return self.pcl\n", + " \n", + " \n", + "\n", + " def visualize_pcd(self) -> None:\n", + " \"\"\"\n", + " Visualize the point cloud in the visualizer window.\n", + " \"\"\"\n", + "\n", + " # Get the view control object from the visualizer\n", + " view_control = self.vis.get_view_control()\n", + "\n", + " # Convert the current view control parameters to pinhole camera parameters\n", + " pinhole_camera_parameters = view_control.convert_to_pinhole_camera_parameters()\n", + "\n", + " # Set the extrinsic matrix for the camera parameters.\n", + " # This matrix is hardcoded due to observed unexpected behavior with other values.\n", + " pinhole_camera_parameters.extrinsic = [[1.0, 0.0, 0.0, -0.141],\n", + " [0.0, -1.0, 0.0, 0.141],\n", + " [0.0, 0.0, -1.0, 0.52655451],\n", + " [0.0, 0.0, 0.0, 1.0]]\n", + "\n", + " # Apply the updated camera parameters to the visualizer's view control\n", + " view_control.convert_from_pinhole_camera_parameters(pinhole_camera_parameters)\n", + "\n", + " # Update the point cloud geometry in the visualizer\n", + " self.vis.update_geometry(self.pcl)\n", + "\n", + " # Poll for any events (like user input)\n", + " self.vis.poll_events()\n", + "\n", + " # Update the visualizer's renderer to reflect the changes\n", + " self.vis.update_renderer()\n", + " \n", + " \n", + "\n", + " def close_window(self) -> None:\n", + " \"\"\"\n", + " Close the visualizer window.\n", + " \"\"\"\n", + " self.vis.destroy_window()\n", + " \n", + "\n", + "\n", + " def add_mesh(self, mesh: o3d.geometry.TriangleMesh) -> None:\n", + " \"\"\"\n", + " Add a mesh to the visualizer and store it in the list of meshes.\n", + "\n", + " Parameters:\n", + " - mesh: The mesh (of type open3d.geometry.TriangleMesh or similar) to be added to the visualizer.\n", + " \"\"\"\n", + " self.vis.add_geometry(mesh) # Add the mesh to the visualizer\n", + " self.meshes.append(mesh) # Store the mesh in the list of meshes\n", + " \n", + " \n", + "\n", + " def clear_meshes(self) -> 
None:\n", + " \"\"\"\n", + " Remove all meshes from the visualizer and clear the list of meshes.\n", + " \"\"\"\n", + " for mesh in self.meshes: # Iterate over each mesh in the list\n", + " self.vis.remove_geometry(mesh) # Remove the mesh from the visualizer\n", + " self.meshes = [] \n", + "\n", + "\n", + "\n", + "\n", + "@jit(nopython=True)\n", + "def _get_largest_cluster_label(labels: np.ndarray) -> int:\n", + " \"\"\"\n", + " Get the label of the largest cluster from the given labels.\n", + "\n", + " Parameters:\n", + " - labels: The labels assigned to each point by a clustering algorithm.\n", + "\n", + " Returns:\n", + " - largest_cluster_label: The label of the largest cluster.\n", + " \"\"\"\n", + " unique_labels = np.unique(labels)\n", + " \n", + " # Manually compute counts for each unique label\n", + " counts = np.zeros(unique_labels.shape[0], dtype=np.int64)\n", + " for i, label in enumerate(unique_labels):\n", + " counts[i] = np.sum(labels == label)\n", + " \n", + " mask = unique_labels != -1\n", + " unique_labels = unique_labels[mask]\n", + " counts = counts[mask]\n", + " \n", + " largest_cluster_label = unique_labels[np.argmax(counts)]\n", + " return largest_cluster_label\n", + "\n", + "def segment_largest_cluster(pcl: o3d.geometry.PointCloud, eps: float = 0.05, min_samples: int = 10, max_points: int = 2000) -> o3d.geometry.PointCloud:\n", + " \"\"\"\n", + " Segment the largest cluster from a point cloud using DBSCAN.\n", + "\n", + " Parameters:\n", + " - pcl: The input point cloud.\n", + " - eps: The maximum distance between two samples for one to be considered as in the neighborhood of the other.\n", + " - min_samples: The number of samples in a neighborhood for a point to be considered as a core point.\n", + " - max_points: Maximum number of points to consider. 
If the point cloud has more points, a random subset is taken.\n", + "\n", + " Returns:\n", + " - largest_cluster_pcl: Point cloud of the largest cluster.\n", + " \"\"\"\n", + " points = np.asarray(pcl.points)\n", + " \n", + " if max_points is not None and len(points) > max_points:\n", + " selected_indices = np.random.choice(len(points), max_points, replace=False)\n", + " points = points[selected_indices]\n", + " \n", + " clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(points)\n", + " labels = clustering.labels_\n", + "\n", + " largest_cluster_label = _get_largest_cluster_label(labels)\n", + "\n", + " largest_cluster_points = points[labels == largest_cluster_label]\n", + "\n", + " largest_cluster_pcl = o3d.geometry.PointCloud()\n", + " largest_cluster_pcl.points = o3d.utility.Vector3dVector(largest_cluster_points)\n", + "\n", + " return largest_cluster_pcl\n", + "\n", + "\n", + "class HostSync:\n", + " \"\"\"\n", + " HostSync is a utility class designed to synchronize multiple message streams based on their sequence numbers.\n", + " It ensures that for a given sequence number, messages from all streams are present, allowing for synchronized\n", + " processing of data from different sources.\n", + " \"\"\"\n", + "\n", + " def __init__(self) -> None:\n", + " \"\"\"\n", + " Initialize the HostSync object with an empty dictionary to store arrays of messages.\n", + " \"\"\"\n", + " self.arrays = {}\n", + "\n", + " def add_msg(self, name: str, msg: Any) -> Union[Dict[str, Any], bool]:\n", + " \"\"\"\n", + " Add a message to the corresponding array based on its name.\n", + "\n", + " Parameters:\n", + " - name: The name (or type) of the message.\n", + " - msg: The message object to be added.\n", + "\n", + " Returns:\n", + " - A dictionary of synced messages if there are enough synced messages, otherwise False.\n", + " \"\"\"\n", + " # Check if the message type exists in the dictionary, if not, create an empty list for it\n", + " if not name in self.arrays:\n", + " self.arrays[name] = []\n", + "\n", + " # Append the message to the corresponding list along with its sequence number\n", + " self.arrays[name].append({\"msg\": msg, \"seq\": msg.getSequenceNum()})\n", + "\n", + " synced = {}\n", + " # Check for messages with the same sequence number across all message types\n", + " for name, arr in self.arrays.items():\n", + " for i, obj in enumerate(arr):\n", + " if msg.getSequenceNum() == obj[\"seq\"]:\n", + " synced[name] = obj[\"msg\"]\n", + " break\n", + "\n", + " # If there are 3 synced messages (in this use case color, depth, nn but you may add more by changing 3 to something else), \n", + " # remove all older messages and return the synced messages\n", + " if len(synced) == 3:\n", + " # Remove older messages with sequence numbers less than the current message's sequence number\n", + " for name, arr in self.arrays.items():\n", + " for i, obj in enumerate(arr):\n", + " if obj[\"seq\"] < msg.getSequenceNum():\n", + " arr.remove(obj)\n", + " else:\n", + " break\n", + " return synced\n", + " return False" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "ad203f82", + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "shape_dict provides a mapping between object names and their corresponding geometric shapes. \n", + "The shapes are approximated using basic geometric primitives. 
\n", + "For objects that do not have a clear or simple geometric representation, the value is set to None.\n", + "\n", + "Key: Object name (e.g., \"person\", \"car\", \"apple\"), must be a YOLO label\n", + "Value: Geometric shape representing the object \"cylinder\", \"cuboid\", \"sphere\", \"plane\" or None if no simple shape can represent the object.\n", + "\"\"\"\n", + "shape_dict = {\n", + " \"person\": None,\n", + " \"bicycle\": None,\n", + " \"car\": \"cuboid\",\n", + " \"motorbike\": None,\n", + " \"aeroplane\": None,\n", + " \"bus\": \"cuboid\",\n", + " \"train\": \"cuboid\",\n", + " \"truck\": \"cuboid\",\n", + " \"boat\": \"cuboid\",\n", + " \"traffic light\": None,\n", + " \"fire hydrant\": None,\n", + " \"stop sign\": \"plane\",\n", + " \"parking meter\": None,\n", + " \"bench\": \"cuboid\",\n", + " \"bird\": None,\n", + " \"cat\": None,\n", + " \"dog\": None,\n", + " \"horse\": None,\n", + " \"sheep\": None,\n", + " \"cow\": None,\n", + " \"elephant\": None,\n", + " \"bear\": None,\n", + " \"zebra\": None,\n", + " \"giraffe\": None,\n", + " \"backpack\": \"cuboid\",\n", + " \"umbrella\": None,\n", + " \"handbag\": \"cuboid\",\n", + " \"tie\": None,\n", + " \"suitcase\": \"cuboid\",\n", + " \"frisbee\": None,\n", + " \"skis\": \"plane\",\n", + " \"snowboard\": \"plane\",\n", + " \"sports ball\": \"sphere\",\n", + " \"kite\": None,\n", + " \"baseball bat\": None,\n", + " \"baseball glove\": None,\n", + " \"skateboard\": \"plane\",\n", + " \"surfboard\": \"plane\",\n", + " \"tennis racket\": \"plane\",\n", + " \"bottle\": None,\n", + " \"wine glass\": None,\n", + " \"cup\": None,\n", + " \"fork\": None,\n", + " \"knife\": None,\n", + " \"spoon\": None,\n", + " \"bowl\": None,\n", + " \"banana\": None,\n", + " \"apple\": \"sphere\",\n", + " \"sandwich\": None,\n", + " \"orange\": \"sphere\",\n", + " \"broccoli\": None,\n", + " \"carrot\": None,\n", + " \"hot dog\": None,\n", + " \"pizza\": \"cuboid\",\n", + " \"donut\": None,\n", + " \"cake\": \"cuboid\",\n", + " \"chair\": \"cuboid\",\n", + " \"sofa\": \"cuboid\",\n", + " \"pottedplant\": \"cuboid\",\n", + " \"bed\": \"cuboid\",\n", + " \"diningtable\": \"cuboid\",\n", + " \"toilet\": None,\n", + " \"tvmonitor\": \"cuboid\",\n", + " \"laptop\": \"cuboid\",\n", + " \"mouse\": None,\n", + " \"remote\": \"cuboid\",\n", + " \"keyboard\": \"cuboid\",\n", + " \"cell phone\": \"cuboid\",\n", + " \"microwave\": \"cuboid\",\n", + " \"oven\": \"cuboid\",\n", + " \"toaster\": \"cuboid\",\n", + " \"sink\": None,\n", + " \"refrigerator\": \"cuboid\",\n", + " \"book\": \"cuboid\",\n", + " \"clock\": None,\n", + " \"vase\": None,\n", + " \"scissors\": None,\n", + " \"teddy bear\": None,\n", + " \"hair drier\": None,\n", + " \"toothbrush\": None\n", + "}\n", + "\n", + "\n", + "def fit_and_create_sphere_mesh(points: np.ndarray, thresh: float = 0.2, verbose: bool = False) -> o3d.geometry.TriangleMesh:\n", + " \"\"\"\n", + " fit_and_create_sphere_mesh fits a sphere to a set of 3D points using the RANSAC algorithm and then creates a mesh representation of the fitted sphere.\n", + "\n", + " Parameters:\n", + " - points (numpy.ndarray): A set of 3D coordinates, where each point is represented as an array of [x, y, z] values.\n", + " - thresh (float, optional): A threshold value for the RANSAC algorithm to determine inliers and outliers. Defaults to 0.2\n", + " - verbose (bool, optional): If set to True, prints out the sphere parameters. 
Defaults to False.\n", + "\n", + " Returns:\n", + " - sphere_mesh (o3d.geometry.TriangleMesh): A mesh representation of the fitted sphere.\n", + " \"\"\"\n", + " \n", + " # Fit a sphere using RANSAC\n", + " sph = pyrsc.Sphere()\n", + " center, radius, _ = sph.fit(points, thresh)\n", + " \n", + " # Print sphere parameters if verbose is True\n", + " if verbose:\n", + " print(f\"Detected sphere with radius {radius:.2f} at location ({center[0]:.2f}, {center[1]:.2f}, {center[2]:.2f})\")\n", + " \n", + " # Create a sphere mesh using Open3D\n", + " sphere_mesh = o3d.geometry.TriangleMesh.create_sphere(radius=radius)\n", + " sphere_mesh.translate(center)\n", + " \n", + " return sphere_mesh\n", + "\n", + "@jit(nopython=True)\n", + "def _project_points_onto_plane(points, best_eq):\n", + " \"\"\"\n", + " Project a set of 3D points onto a plane.\n", + "\n", + " Parameters:\n", + " - points: np.ndarray\n", + " The 3D points to be projected.\n", + " - best_eq: List[float]\n", + " The equation of the plane [a, b, c, d] such that ax + by + cz + d = 0.\n", + "\n", + " Returns:\n", + " - projected_points: np.ndarray\n", + " The projected points onto the plane.\n", + " \"\"\"\n", + " projected_points = np.empty(points.shape)\n", + " for i, point in enumerate(points):\n", + " t = -(best_eq[0] * point[0] + best_eq[1] * point[1] + best_eq[2] * point[2] + best_eq[3]) / (best_eq[0]**2 + best_eq[1]**2 + best_eq[2]**2)\n", + " projected_point = point + t * np.array(best_eq[:3])\n", + " projected_points[i] = projected_point\n", + " return projected_points\n", + "\n", + "@jit(nopython=True)\n", + "def _map_3d_to_2d(points, basis1, basis2):\n", + " \"\"\"\n", + " Map 3D points to a 2D coordinate system defined by two basis vectors.\n", + "\n", + " Parameters:\n", + " - points: np.ndarray\n", + " The 3D points to be mapped.\n", + " - basis1: np.ndarray\n", + " The first basis vector of the 2D coordinate system.\n", + " - basis2: np.ndarray\n", + " The second basis vector of the 2D coordinate system.\n", + "\n", + " Returns:\n", + " - points_2d: np.ndarray\n", + " The mapped 2D points.\n", + " \"\"\"\n", + " points_2d = np.empty((points.shape[0], 2))\n", + " for i, point in enumerate(points):\n", + " coord1 = np.dot(point, basis1)\n", + " coord2 = np.dot(point, basis2)\n", + " points_2d[i] = [coord1, coord2]\n", + " return points_2d\n", + "\n", + "@jit(nopython=True)\n", + "def _create_triangles_from_hull(hull_vertices_count):\n", + " \"\"\"\n", + " Create triangles by connecting adjacent vertices of a convex hull.\n", + "\n", + " Parameters:\n", + " - hull_vertices_count: int\n", + " The number of vertices in the convex hull.\n", + "\n", + " Returns:\n", + " - triangles: np.ndarray\n", + " The triangles formed by connecting adjacent vertices of the convex hull.\n", + " \"\"\"\n", + " triangles = np.empty((hull_vertices_count - 2, 3), dtype=np.int32)\n", + " for i in range(1, hull_vertices_count - 1):\n", + " triangles[i-1] = [0, i, i + 1]\n", + " return triangles\n", + "\n", + "\n", + "\n", + "def fit_and_create_plane_mesh(points: np.ndarray, thresh: float = 0.1, verbose: bool = False) -> o3d.geometry.TriangleMesh:\n", + " \"\"\"\n", + " Fit a plane to a set of 3D points using the RANSAC algorithm and then creates a mesh representation of the fitted plane.\n", + "\n", + " Parameters:\n", + " - points (numpy.ndarray): A set of 3D coordinates, where each point is an array with [x, y, z] values.\n", + " - thresh (float, optional): A threshold value for the RANSAC algorithm to determine inliers and outliers. 
Defaults to 0.1.\n", + " - verbose (bool, optional): If True, prints out the plane parameters. Defaults to False.\n", + "\n", + " Returns:\n", + " - mesh (o3d.geometry.TriangleMesh): A mesh representation of the fitted plane.\n", + " \"\"\" \n", + " \n", + " # Fit a plane using RANSAC\n", + " plane = pyrsc.Plane()\n", + " best_eq, inliers = plane.fit(points, thresh)\n", + " \n", + " # If verbose is True, print the plane parameters\n", + " if verbose:\n", + " a, b, c, d = best_eq\n", + " print(f\"Detected plane: {a:.2f}x + {b:.2f}y + {c:.2f}z + {d:.2f} = 0.\")\n", + " \n", + " # Project the inlier points onto the plane\n", + " projected_points = _project_points_onto_plane(points[inliers], best_eq)\n", + " \n", + " # Define a 2D coordinate system on the plane\n", + " normal = np.array(best_eq[:3])\n", + " basis1 = np.cross(normal, normal + np.array([1, 0, 0]))\n", + " basis1 /= np.linalg.norm(basis1)\n", + " basis2 = np.cross(normal, basis1)\n", + " basis2 /= np.linalg.norm(basis2)\n", + " \n", + " # Map the 3D points to the 2D coordinate system\n", + " points_2d = _map_3d_to_2d(projected_points, basis1, basis2)\n", + " \n", + " # Compute the 2D convex hull\n", + " hull_2d = ConvexHull(points_2d)\n", + "\n", + " # Map the 2D convex hull back to 3D space\n", + " hull_3d_points = []\n", + " for vertex in hull_2d.vertices:\n", + " point_2d = points_2d[vertex]\n", + " point_3d = point_2d[0] * basis1 + point_2d[1] * basis2\n", + " hull_3d_points.append(point_3d)\n", + " hull_3d_points = np.array(hull_3d_points)\n", + "\n", + " # Create a mesh from the 3D convex hull points\n", + " mesh = o3d.geometry.TriangleMesh()\n", + " mesh.vertices = o3d.utility.Vector3dVector(hull_3d_points)\n", + "\n", + " # Create triangles by connecting adjacent vertices of the convex hull\n", + " triangles = _create_triangles_from_hull(len(hull_3d_points))\n", + " mesh.triangles = o3d.utility.Vector3iVector(triangles)\n", + "\n", + " # Compute normals for shading\n", + " mesh.compute_vertex_normals()\n", + " \n", + " return mesh\n", + "\n", + "\n", + "def fit_and_create_cuboid_mesh(points: np.ndarray, thresh: float = 0.01, max_iterations: int = 1000, verbose: bool = False) -> o3d.geometry.TriangleMesh:\n", + " \"\"\"\n", + " Fit a cuboid to a set of 3D points using the RANSAC algorithm and then creates a mesh representation of the fitted cuboid.\n", + "\n", + " Parameters:\n", + " - points (numpy.ndarray): A set of 3D coordinates, where each point is an array with [x, y, z] values.\n", + " - thresh (float, optional): A threshold value for the RANSAC algorithm to determine inliers and outliers. Defaults to 0.01.\n", + " - max_iterations (int, optional): Maximum number of iterations for the RANSAC algorithm. Defaults to 1000.\n", + " - verbose (bool, optional): If True, prints out the cuboid parameters. 
Defaults to False.\n", + "\n", + " Returns:\n", + " - cuboid_mesh (o3d.geometry.TriangleMesh): A mesh representation of the fitted cuboid.\n", + " \"\"\"\n", + " \n", + " # Fit a cuboid using RANSAC\n", + " cuboid = pyrsc.Cuboid()\n", + " cuboid_params = cuboid.fit(points, thresh, max_iterations)\n", + " \n", + " # Extract the detected planes and inliers\n", + " planes = cuboid_params[0]\n", + " inlier_indices = cuboid_params[1]\n", + " \n", + " # Extract the actual inlier points from the original point cloud\n", + " inlier_points = points[inlier_indices]\n", + " \n", + " # Get parallel planes for each detected plane\n", + " all_planes = []\n", + " for plane in planes:\n", + " parallel_planes = get_parallel_planes(plane, inlier_points)\n", + " all_planes.extend(parallel_planes)\n", + " \n", + " # Generate the cuboid mesh from the planes\n", + " cuboid_mesh = cuboid_from_planes(all_planes)\n", + " \n", + " # Compute the cuboid parameters\n", + " min_point = np.min(inlier_points, axis=0)\n", + " max_point = np.max(inlier_points, axis=0)\n", + " midpoint = (min_point + max_point) / 2\n", + " width = max_point[0] - min_point[0]\n", + " height = max_point[1] - min_point[1]\n", + " depth = max_point[2] - min_point[2]\n", + " \n", + " # If verbose is True, print the cuboid parameters in a single line\n", + " if verbose:\n", + " print(f\"Detected cuboid with Midpoint: [{midpoint[0]:.2f}, {midpoint[1]:.2f}, {midpoint[2]:.2f}], Width: {width:.2f}, Height: {height:.2f}, Depth: {depth:.2f}\")\n", + " \n", + " return cuboid_mesh\n", + "\n", + "\n", + "@jit(nopython=True)\n", + "def _project_onto_normal(inliers: np.ndarray, normal: np.ndarray) -> np.ndarray:\n", + " \"\"\"\n", + " Project the inliers onto the plane's normal.\n", + "\n", + " Parameters:\n", + " - inliers (numpy.ndarray): The inlier points, where each point is an array with [x, y, z] values.\n", + " - normal (numpy.ndarray): The normal of the plane.\n", + "\n", + " Returns:\n", + " - projections (numpy.ndarray): The projections of the inliers onto the plane's normal.\n", + " \"\"\"\n", + " return np.dot(inliers, normal)\n", + "\n", + "\n", + "def get_parallel_planes(plane: List[float], inliers: np.ndarray) -> Tuple[List[float], List[float]]:\n", + " \"\"\"\n", + " Given a plane and inliers, find the two parallel planes that bound the inliers.\n", + "\n", + " Parameters:\n", + " - plane (List[float]): The equation of the plane [a, b, c, d] such that ax + by + cz + d = 0.\n", + " - inliers (numpy.ndarray): The inlier points, where each point is an array with [x, y, z] values.\n", + "\n", + " Returns:\n", + " - parallel_planes (Tuple[List[float], List[float]]): The equations of the two parallel planes.\n", + " \"\"\"\n", + " normal = np.array(plane[:3])\n", + " \n", + " # Project the inliers onto the plane's normal\n", + " projections = _project_onto_normal(inliers, normal)\n", + " \n", + " # Adjust the d value based on the min and max projections\n", + " d_min = -np.min(projections)\n", + " d_max = -np.max(projections)\n", + " \n", + " parallel_plane_1 = plane.copy()\n", + " parallel_plane_1[3] = d_min\n", + "\n", + " parallel_plane_2 = plane.copy()\n", + " parallel_plane_2[3] = d_max\n", + "\n", + " return parallel_plane_1, parallel_plane_2\n", + "\n", + "\n", + "@jit(nopython=True)\n", + "def _solve_system_of_equations(normals: np.ndarray, constants: np.ndarray) -> np.ndarray:\n", + " \"\"\"\n", + " Solve a system of linear equations to find the intersection point of three planes.\n", + "\n", + " Parameters:\n", + " - normals 
(numpy.ndarray): The normals of the three planes.\n", + " - constants (numpy.ndarray): The constants from the plane equations.\n", + "\n", + " Returns:\n", + " - point (numpy.ndarray): The intersection point of the three planes, represented as [x, y, z].\n", + " \"\"\"\n", + " return np.linalg.solve(normals, constants)\n", + "\n", + "\n", + "def intersect_planes(plane1: List[float], plane2: List[float], plane3: List[float]) -> np.ndarray:\n", + " \"\"\"\n", + " Find the intersection point of three planes.\n", + "\n", + " Parameters:\n", + " - plane1, plane2, plane3 (List[float]): The equations of the planes [a, b, c, d] such that ax + by + cz + d = 0.\n", + "\n", + " Returns:\n", + " - point (numpy.ndarray): The intersection point of the three planes, represented as [x, y, z].\n", + " \"\"\"\n", + " normals = np.array([plane1[:3], plane2[:3], plane3[:3]])\n", + " constants = -np.array([plane1[3], plane2[3], plane3[3]])\n", + " point = _solve_system_of_equations(normals, constants)\n", + " return point\n", + "\n", + "\n", + "\n", + "def create_quad_face(p1: np.ndarray, p2: np.ndarray, p3: np.ndarray, p4: np.ndarray) -> o3d.geometry.TriangleMesh:\n", + " \"\"\"\n", + " Create a mesh from four vertices.\n", + "\n", + " Parameters:\n", + " - p1, p2, p3, p4 (numpy.ndarray): Vertices of the quad face [x, y, z]\n", + "\n", + " Returns:\n", + " - mesh (o3d.geometry.TriangleMesh): The quad mesh created from the four vertices.\n", + " \"\"\"\n", + " \n", + " # Combine the four vertices into a list\n", + " vertices = [p1, p2, p3, p4]\n", + " \n", + " # Define the triangles that make up the quad face. \n", + " # The quad is split into two triangles for rendering.\n", + " triangles = [[0, 1, 2], [1, 2, 3]]\n", + "\n", + " # Create a new TriangleMesh object\n", + " mesh = o3d.geometry.TriangleMesh()\n", + " \n", + " # Set the vertices of the mesh\n", + " mesh.vertices = o3d.utility.Vector3dVector(vertices)\n", + " \n", + " # Set the triangles of the mesh\n", + " mesh.triangles = o3d.utility.Vector3iVector(triangles)\n", + " \n", + " return mesh\n", + "\n", + "\n", + "def cuboid_from_planes(planes: List[List[float]], color: List[float] = [0.5, 0.5, 0.5]) -> o3d.geometry.TriangleMesh:\n", + " \"\"\"\n", + " Generate a cuboid mesh from 6 plane equations.\n", + "\n", + " Parameters:\n", + " planes (List[List[float]]): A list of 6 plane equations [a, b, c, d] such that ax + by + cz + d = 0.\n", + "\n", + " Returns:\n", + " combined_mesh (o3d.geometry.TriangleMesh): An open3d.geometry.TriangleMesh object representing the cuboid.\n", + " \"\"\"\n", + " \n", + " # Generate all combinations of 5 planes\n", + " combinations_of_five = list(combinations(planes, 5))\n", + "\n", + " quads = []\n", + " for combination_of_five in combinations_of_five:\n", + " combinations_of_three = list(combinations(combination_of_five, 3))\n", + " vertices = []\n", + " for combination in combinations_of_three:\n", + " # Check if any two planes share a normal\n", + " normals = [plane[:3] for plane in combination]\n", + " if len(set(tuple(normal) for normal in normals)) < 3:\n", + " continue # Skip this combination if any two normals are the same\n", + " vertices.append(intersect_planes(*combination))\n", + " quads.append(vertices)\n", + "\n", + " meshes = []\n", + " # Create a mesh for each quadrilateral and add it to the list\n", + " for quad in quads:\n", + " p1, p2, p3, p4 = quad\n", + " quad_face = create_quad_face(p1, p2, p3, p4)\n", + " meshes.append(quad_face)\n", + " \n", + " # Combine into single triangle mesh\n", + " 
combined_mesh = meshes[0]\n",
+ "    for mesh in meshes[1:]:\n",
+ "        combined_mesh += mesh\n",
+ "    \n",
+ "    # Color the mesh\n",
+ "    combined_mesh.paint_uniform_color(color)\n",
+ "\n",
+ "    return combined_mesh\n",
+ "\n",
+ "\n",
+ "def fit_and_create_cylinder_mesh(points: np.ndarray, thresh: float = 0.2, verbose: bool = False) -> o3d.geometry.TriangleMesh:\n",
+ "    \"\"\"\n",
+ "    Fit a cylinder to a set of 3D points using the RANSAC algorithm and then create a mesh representation of the fitted cylinder.\n",
+ "\n",
+ "    Parameters:\n",
+ "    - points (numpy.ndarray): A set of 3D coordinates, where each point is an array with [x, y, z] values.\n",
+ "    - thresh (float, optional): A threshold value for the RANSAC algorithm to determine inliers and outliers. Defaults to 0.2.\n",
+ "    - verbose (bool, optional): If True, prints out the cylinder parameters. Defaults to False.\n",
+ "\n",
+ "    Returns:\n",
+ "    - mesh_cylinder (o3d.geometry.TriangleMesh): A mesh representation of the fitted cylinder.\n",
+ "    \"\"\"\n",
+ "    \n",
+ "    # Fit a cylinder using RANSAC\n",
+ "    cylinder = pyrsc.Cylinder()\n",
+ "    center, axis, radius, inliers = cylinder.fit(points, thresh)\n",
+ "    \n",
+ "    # Calculate the top and bottom endpoints of the cylinder\n",
+ "    top_point, bottom_point = cylinder_endpoints(center, axis, points[inliers])\n",
+ "    \n",
+ "    # Create a cylinder mesh\n",
+ "    height = np.linalg.norm(top_point - bottom_point)\n",
+ "    mesh_cylinder = o3d.geometry.TriangleMesh.create_cylinder(radius=radius, height=height)\n",
+ "    \n",
+ "    # Translate the cylinder to the midpoint of top and bottom points\n",
+ "    midpoint = (top_point + bottom_point) / 2\n",
+ "    mesh_cylinder.translate(midpoint)\n",
+ "    \n",
+ "    # Rotate the cylinder to align with the given axis\n",
+ "    z_axis = np.array([0, 0, 1])\n",
+ "    rotation_matrix = get_rotation_matrix_from_vectors(z_axis, axis)\n",
+ "    mesh_cylinder.rotate(rotation_matrix, center=midpoint)\n",
+ "    \n",
+ "    # If verbose is True, print the cylinder parameters in a single line\n",
+ "    if verbose:\n",
+ "        print(f\"Detected cylinder with Radius: {radius:.2f}, Height: {height:.2f}, Midpoint: [{midpoint[0]:.2f}, {midpoint[1]:.2f}, {midpoint[2]:.2f}], Axis: [{axis[0]:.2f}, {axis[1]:.2f}, {axis[2]:.2f}]\")\n",
+ "    \n",
+ "    return mesh_cylinder\n",
+ "\n",
+ "\n",
+ "@jit(nopython=True)\n",
+ "def _compute_rotation_matrix(v: np.ndarray, s: float, c: float) -> np.ndarray:\n",
+ "    \"\"\"\n",
+ "    Compute the rotation matrix using the Rodrigues rotation formula.\n",
+ "\n",
+ "    Parameters:\n",
+ "    - v: Cross product of vectors 'a' and 'b'.\n",
+ "    - s: Sine of the angle between vectors 'a' and 'b'.\n",
+ "    - c: Cosine of the angle between vectors 'a' and 'b'.\n",
+ "\n",
+ "    Returns:\n",
+ "    - rotation_matrix: The 3x3 rotation matrix.\n",
+ "    \"\"\"\n",
+ "    v_skew = np.array([\n",
+ "        [0, -v[2], v[1]],\n",
+ "        [v[2], 0, -v[0]],\n",
+ "        [-v[1], v[0], 0]\n",
+ "    ])\n",
+ "    rotation_matrix = np.eye(3) + v_skew + (v_skew @ v_skew) * ((1 - c) / s**2)\n",
+ "    return rotation_matrix\n",
+ "\n",
+ "def get_rotation_matrix_from_vectors(a: np.ndarray, b: np.ndarray) -> np.ndarray:\n",
+ "    \"\"\"\n",
+ "    Compute the rotation matrix that aligns vector 'a' to vector 'b'.\n",
+ "\n",
+ "    Parameters:\n",
+ "    - a: The source vector that needs to be rotated.\n",
+ "    - b: The target vector to which 'a' should be aligned.\n",
+ "\n",
+ "    Returns:\n",
+ "    - rotation_matrix: The 3x3 rotation matrix that rotates 'a' to align with 'b'.\n",
+ "    \"\"\"\n",
+ "    v = np.cross(a, b)\n",
+ "    s = np.linalg.norm(v)\n",
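+ "    # Note: if 'a' and 'b' are parallel, v is the zero vector and s == 0,\n",
+ "    # so the (1 - c) / s**2 term in _compute_rotation_matrix divides by zero\n",
+ "    # and yields NaNs; callers should special-case that alignment.\n",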
+ " c = np.dot(a, b)\n", + " return _compute_rotation_matrix(v, s, c)\n", + "\n", + "\n", + "@jit(nopython=True)\n", + "def _compute_endpoints(center: np.ndarray, axis: np.ndarray, inlier_points: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:\n", + " \"\"\"\n", + " Compute the top and bottom endpoints of a cylinder.\n", + "\n", + " Parameters:\n", + " - center: The center point of the cylinder.\n", + " - axis: The axis of the cylinder.\n", + " - inlier_points: The inlier points that lie on the surface of the cylinder.\n", + "\n", + " Returns:\n", + " - top_point: The top endpoint of the cylinder.\n", + " - bottom_point: The bottom endpoint of the cylinder.\n", + " \"\"\"\n", + " t_values = np.array([(np.dot(point - center, axis)) for point in inlier_points])\n", + " top_t = np.max(t_values)\n", + " bottom_t = np.min(t_values)\n", + " top_point = center + top_t * axis\n", + " bottom_point = center + bottom_t * axis\n", + " return top_point, bottom_point\n", + "\n", + "\n", + "def cylinder_endpoints(center: np.ndarray, axis: np.ndarray, inlier_points: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:\n", + " \"\"\"\n", + " Compute the top and bottom endpoints of a cylinder given its center, axis, and inlier points.\n", + "\n", + " Parameters:\n", + " - center: The center point of the cylinder.\n", + " - axis: The axis of the cylinder.\n", + " - inlier_points: The inlier points that lie on the surface of the cylinder.\n", + "\n", + " Returns:\n", + " - top_point: The top endpoint of the cylinder.\n", + " - bottom_point: The bottom endpoint of the cylinder.\n", + " \"\"\"\n", + " axis = axis / np.linalg.norm(axis)\n", + " return _compute_endpoints(center, axis, inlier_points)\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "def fit_shape_to_detection(detection: str, point_cloud: o3d.geometry.PointCloud, shape_dict: dict, verbose: bool = False) -> Union[o3d.geometry.TriangleMesh, None]:\n", + " \"\"\"\n", + " Fit a geometric shape to a given detection based on a predefined dictionary of shapes. \n", + " The function selects the appropriate shape fitting method based on the detected object class.\n", + "\n", + " Parameters:\n", + " - detection (str): The object class detected, e.g., \"car\", \"person\", etc.\n", + " - point_cloud (o3d.geometry.PointCloud): The 3D point cloud data associated with the detection.\n", + " - shape_dict (dict): A dictionary mapping object classes to their corresponding geometric shapes.\n", + " - verbose (bool, optional): If True, prints out the shape parameters. Defaults to False.\n", + "\n", + " Returns:\n", + " - o3d.geometry.TriangleMesh or None: The fitted mesh for the detection. 
If the shape type is not defined or there are insufficient points, it returns None.\n", + " \"\"\"\n", + " \n", + " # Convert the point cloud to a numpy array\n", + " points = np.asarray(point_cloud.points)\n", + " \n", + " # Check if there are enough points for shape fitting\n", + " if len(points) < 20:\n", + " print(\"Not enough points for shape fitting.\")\n", + " return None\n", + " \n", + " # If there are too many points, randomly sample a subset for efficiency\n", + " if len(points) > 300:\n", + " selected_indices = np.random.choice(len(points), 300, replace=False)\n", + " points = points[selected_indices]\n", + "\n", + " # Get the shape type for the detected object class from the dictionary\n", + " shape_type = shape_dict.get(detection, None)\n", + " \n", + " # If the shape type is not defined, return None\n", + " if shape_type is None:\n", + " return None\n", + " \n", + " # Fit the appropriate shape based on the shape type\n", + " if shape_type == \"sphere\":\n", + " mesh = fit_and_create_sphere_mesh(points, verbose=verbose)\n", + " elif shape_type == \"plane\":\n", + " mesh = fit_and_create_plane_mesh(points, verbose=verbose)\n", + " elif shape_type == \"cylinder\":\n", + " mesh = fit_and_create_cylinder_mesh(points, verbose=verbose)\n", + " elif shape_type == \"cuboid\":\n", + " mesh = fit_and_create_cuboid_mesh(points, verbose=verbose)\n", + " else:\n", + " # Raise an error if the shape type is unknown\n", + " raise ValueError(f\"Unknown shape type: {shape_type}\")\n", + " \n", + " return mesh" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "f9084ee7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'classes': 80, 'coordinates': 4, 'anchors': [10, 13, 16, 30, 33, 23, 30, 61, 62, 45, 59, 119, 116, 90, 156, 198, 373, 326], 'anchor_masks': {'side52': [0, 1, 2], 'side26': [3, 4, 5], 'side13': [6, 7, 8]}, 'iou_threshold': 0.5, 'confidence_threshold': 0.5}\n", + "No blob found at yolov5n_coco_416x416. Looking into DepthAI model zoo.\n" + ] + } + ], + "source": [ + "# selected model and its config (YOLO v5)\n", + "args = {\n", + " 'model': 'yolov5n_coco_416x416',\n", + " 'config': 'json/yolov5.json'\n", + "}\n", + "\n", + "# parse config\n", + "configPath = Path(args[\"config\"])\n", + "if not configPath.exists():\n", + " raise ValueError(\"Path {} does not exist!\".format(configPath))\n", + "\n", + "with configPath.open() as f:\n", + " config = json.load(f)\n", + "nnConfig = config.get(\"nn_config\", {})\n", + "\n", + "# parse input shape\n", + "if \"input_size\" in nnConfig:\n", + " W, H = tuple(map(int, nnConfig.get(\"input_size\").split('x')))\n", + "\n", + "# extract metadata\n", + "metadata = nnConfig.get(\"NN_specific_metadata\", {})\n", + "classes = metadata.get(\"classes\", {})\n", + "coordinates = metadata.get(\"coordinates\", {})\n", + "anchors = metadata.get(\"anchors\", {})\n", + "anchorMasks = metadata.get(\"anchor_masks\", {})\n", + "iouThreshold = metadata.get(\"iou_threshold\", {})\n", + "confidenceThreshold = metadata.get(\"confidence_threshold\", {})\n", + "\n", + "print(metadata)\n", + "\n", + "# parse labels\n", + "nnMappings = config.get(\"mappings\", {})\n", + "labels = nnMappings.get(\"labels\", {})\n", + "\n", + "# get model path\n", + "nnPath = args[\"model\"]\n", + "if not Path(nnPath).exists():\n", + " print(\"No blob found at {}. 
Looking into DepthAI model zoo.\".format(nnPath))\n",
+ "    nnPath = str(blobconverter.from_zoo(args[\"model\"], shaves = 6, zoo_type = \"depthai\", use_cache=True))\n",
+ "# sync outputs\n",
+ "syncNN = True\n",
+ "\n",
+ "# Initialize the pipeline\n",
+ "pipeline = dai.Pipeline()\n",
+ "\n",
+ "# Create nodes for the left and right cameras\n",
+ "left = pipeline.create(dai.node.MonoCamera)\n",
+ "right = pipeline.create(dai.node.MonoCamera)\n",
+ "\n",
+ "# Create a node for the stereo depth calculation\n",
+ "stereo = pipeline.create(dai.node.StereoDepth)\n",
+ "\n",
+ "# Create a node for the central RGB camera\n",
+ "camRgb = pipeline.create(dai.node.ColorCamera)\n",
+ "\n",
+ "# Create a YoloDetectionNetwork node and set the YOLOv5 model\n",
+ "yolo_nn = pipeline.create(dai.node.YoloDetectionNetwork)\n",
+ "yolo_nn.setConfidenceThreshold(confidenceThreshold)\n",
+ "yolo_nn.setNumClasses(classes)\n",
+ "yolo_nn.setCoordinateSize(coordinates)\n",
+ "yolo_nn.setAnchors(anchors)\n",
+ "yolo_nn.setAnchorMasks(anchorMasks)\n",
+ "yolo_nn.setIouThreshold(iouThreshold)\n",
+ "yolo_nn.setBlobPath(nnPath)\n",
+ "yolo_nn.setNumInferenceThreads(2)\n",
+ "yolo_nn.input.setBlocking(False)\n",
+ "\n",
+ "# Configure the MonoCamera nodes\n",
+ "left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)\n",
+ "left.setBoardSocket(dai.CameraBoardSocket.CAM_B)\n",
+ "right.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)\n",
+ "right.setBoardSocket(dai.CameraBoardSocket.CAM_C)\n",
+ "\n",
+ "# Link the left and right cameras to the stereo depth node\n",
+ "left.out.link(stereo.left)\n",
+ "right.out.link(stereo.right)\n",
+ "\n",
+ "# Create an output node for the depth stream\n",
+ "depth_out = pipeline.create(dai.node.XLinkOut)\n",
+ "depth_out.setStreamName(\"depth\")\n",
+ "stereo.depth.link(depth_out.input)\n",
+ "\n",
+ "# Configure the camRgb node\n",
+ "camRgb_out = pipeline.create(dai.node.XLinkOut)\n",
+ "camRgb_out.setStreamName(\"color\")\n",
+ "camRgb.video.link(camRgb_out.input)\n",
+ "camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB)\n",
+ "camRgb.setPreviewSize(W, H)\n",
+ "camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_720_P)\n",
+ "camRgb.setInterleaved(False)\n",
+ "camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)\n",
+ "camRgb.setFps(30)\n",
+ "\n",
+ "# Link the central RGB camera output to the YoloDetectionNetwork node\n",
+ "camRgb.preview.link(yolo_nn.input)\n",
+ "\n",
+ "# Create an output node for the YOLO detections\n",
+ "xout_nn = pipeline.create(dai.node.XLinkOut)\n",
+ "xout_nn.setStreamName(\"nn\")\n",
+ "yolo_nn.out.link(xout_nn.input)\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "import time\n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "8ad1a77f",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "class FrequencyMeter:\n",
+ "    \"\"\"\n",
+ "    A utility class to measure the frequency of events over a specified duration.\n",
+ "\n",
+ "    Attributes:\n",
+ "    - timestamps (List[float]): A list of timestamps when the events occurred.\n",
+ "    - avg_over (int): The number of events over which the frequency is averaged.\n",
+ "\n",
+ "    Methods:\n",
+ "    - tick(): Records the current timestamp and calculates the average frequency over the last 'avg_over' events.\n",
+ "    \"\"\"\n",
+ "\n",
+ "    def __init__(self, avg_over=10):\n",
+ "        \"\"\"\n",
+ "        Initializes the FrequencyMeter with a specified number of events to average over.\n",
+ "\n",
+ "        Parameters:\n",
+ "        - avg_over (int,
optional): The number of events over which the frequency is averaged. Defaults to 10.\n", + " \"\"\"\n", + " self.timestamps = []\n", + " self.avg_over = avg_over\n", + "\n", + " def tick(self):\n", + " \"\"\"\n", + " Records the current timestamp and calculates the average frequency over the last 'avg_over' events.\n", + " If the number of recorded events exceeds 'avg_over', the oldest timestamp is removed.\n", + " The calculated frequency is printed to the console.\n", + " \"\"\"\n", + " now = time.time()\n", + " self.timestamps.append(now)\n", + " if len(self.timestamps) > self.avg_over:\n", + " self.timestamps.pop(0)\n", + " freq = self.avg_over / (self.timestamps[-1] - self.timestamps[0])\n", + " print(f\"Frequency: {freq:.2f} Hz\")\n", + "\n", + "freq_meter = FrequencyMeter()\n", + "\n", + "\n", + "\n", + "# Set this to True if you want to print detections\n", + "verbose=False\n", + "\n", + "\n", + "with dai.Device(pipeline) as device:\n", + " \n", + " # Normalize bounding box coordinates to match frame dimensions\n", + " def frameNorm(frame: np.ndarray, bbox: List[float]) -> List[int]:\n", + " \"\"\"\n", + " Normalize bounding box coordinates based on frame dimensions.\n", + "\n", + " Parameters:\n", + " - frame (np.ndarray): The frame to which the bounding box corresponds.\n", + " - bbox (List[float]): Bounding box coordinates in the range <0..1>.\n", + "\n", + " Returns:\n", + " - List[int]: Normalized bounding box coordinates in pixels.\n", + " \"\"\"\n", + " normVals = np.full(len(bbox), frame.shape[0])\n", + " normVals[::2] = frame.shape[1]\n", + " return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)\n", + "\n", + " def displayFrame(name: str, frame: np.ndarray, detections: List[dai.ImgDetection]):\n", + " \"\"\"\n", + " Display the frame with bounding boxes and labels.\n", + "\n", + " Parameters:\n", + " - name (str): Window name for displaying the frame.\n", + " - frame (np.ndarray): The frame to display.\n", + " - detections (List[dai.ImgDetection]): List of detections to be drawn on the frame.\n", + " \"\"\"\n", + "\n", + "\n", + " color = (255, 0, 0) # Bounding box color\n", + " # Add bounding boxes and text onto the rgb frame according to detections\n", + " for detection in detections:\n", + " bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))\n", + " cv2.putText(frame, labels[detection.label], (bbox[0] + 10, bbox[1] + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)\n", + " cv2.putText(frame, f\"{int(detection.confidence * 100)}%\", (bbox[0] + 10, bbox[1] + 40), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)\n", + " cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)\n", + " # Display the frame\n", + " cv2.imshow(name, frame)\n", + "\n", + " # Set the brightness of the IR laser dot projector\n", + " device.setIrLaserDotProjectorBrightness(1200)\n", + " \n", + " # Create output queues for depth, color, and neural network outputs\n", + " qs = []\n", + " qs.append(device.getOutputQueue(\"depth\", maxSize=1, blocking=False))\n", + " qs.append(device.getOutputQueue(\"color\", maxSize=1, blocking=False))\n", + " qs.append(device.getOutputQueue(\"nn\", maxSize=1, blocking=False))\n", + "\n", + " # Retrieve camera calibration data\n", + " calibData = device.readCalibration()\n", + " w, h = camRgb.getIspSize()\n", + " intrinsics = calibData.getCameraIntrinsics(dai.CameraBoardSocket.CAM_A, dai.Size2f(w, h))\n", + " \n", + " # Initialize the PointCloudVisualizer with the camera intrinsics\n", + " pcl_converter = 
PointCloudVisualizer(intrinsics, w, h)\n", + "\n", + " # Initialize the HostSync class for synchronizing messages\n", + " sync = HostSync()\n", + "\n", + " while True:\n", + " # Iterate over each queue (depth, color, nn)\n", + " for q in qs:\n", + " # Try to get a new message from the queue\n", + " new_msg = q.tryGet()\n", + " if new_msg is not None:\n", + " # Add the new message to the synchronized message list\n", + " msgs = sync.add_msg(q.getName(), new_msg)\n", + " if msgs:\n", + " # Extract depth frame, color frame, and detections from the synchronized messages\n", + " depth = msgs[\"depth\"].getFrame()\n", + " color = msgs[\"color\"].getCvFrame()\n", + " detections = msgs[\"nn\"].detections\n", + "\n", + " # Convert the color frame from BGR to RGB and resize it to match the depth frame dimensions\n", + " rgb = cv2.cvtColor(color, cv2.COLOR_BGR2RGB)\n", + " rgb_resized = cv2.resize(rgb, (depth.shape[1], depth.shape[0]))\n", + "\n", + " # Initialize an empty list to store all meshes\n", + " all_meshes = []\n", + "\n", + " # Clear any existing meshes in the point cloud converter\n", + " pcl_converter.clear_meshes()\n", + "\n", + " # Process each detection\n", + " for detection in detections:\n", + " # Extract and normalize the bounding box coordinates\n", + " xmin, ymin, xmax, ymax = [\n", + " int(detection.xmin * w),\n", + " int(detection.ymin * h),\n", + " int(detection.xmax * w),\n", + " int(detection.ymax * h),\n", + " ]\n", + "\n", + " # Ensure bounding box coordinates are valid\n", + " xmin, xmax = sorted([xmin, xmax])\n", + " ymin, ymax = sorted([ymin, ymax])\n", + " xmin = max(0, min(xmin, rgb_resized.shape[1] - 1))\n", + " ymin = max(0, min(ymin, rgb_resized.shape[0] - 1))\n", + " xmax = min(rgb_resized.shape[1] - 1, max(xmax, 0))\n", + " ymax = min(rgb_resized.shape[0] - 1, max(ymax, 0))\n", + "\n", + " # Skip invalid bounding boxes\n", + " if xmin >= xmax or ymin >= ymax:\n", + " print(\"Invalid bounding box detected. 
Skipping this detection.\")\n", + " continue\n", + "\n", + " # Create a modified depth map with values outside the bounding box set to zero\n", + " modified_depth = np.copy(depth)\n", + " modified_depth[0:ymin, :] = 0\n", + " modified_depth[ymax:, :] = 0\n", + " modified_depth[:, 0:xmin] = 0\n", + " modified_depth[:, xmax:] = 0\n", + "\n", + " # Convert the modified depth and RGB data to a point cloud\n", + " object_pcl = pcl_converter.rgbd_to_projection(modified_depth, rgb_resized)\n", + "\n", + " # Segment the largest cluster from the point cloud\n", + " object_pcl = segment_largest_cluster(object_pcl)\n", + "\n", + " # Fit a shape to the detection and obtain the corresponding mesh\n", + " mesh = fit_shape_to_detection(labels[detection.label], object_pcl, shape_dict, verbose=verbose)\n", + "\n", + " # If a valid mesh is obtained, add it to the point cloud converter\n", + " if mesh is not None:\n", + " pcl_converter.add_mesh(mesh)\n", + "\n", + " # Convert the entire depth and RGB data to a point cloud\n", + " pcd = pcl_converter.rgbd_to_projection(depth, rgb_resized)\n", + "\n", + " # Display the color frame with bounding boxes and labels\n", + " displayFrame(\"rgb\", color, detections)\n", + "\n", + " # Visualize the point cloud\n", + " pcl_converter.visualize_pcd()\n", + " \n", + " # Tick the framerate meter\n", + " freq_meter.tick()\n", + "\n", + " # Check for user input to exit the loop\n", + " key = cv2.waitKey(1)\n", + " if key == ord(\"q\"):\n", + " pcl_converter.close_window()\n", + " cv2.destroyAllWindows()\n", + " break" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/gen2-geometry-fitting/geometry_fitting.py b/gen2-geometry-fitting/geometry_fitting.py new file mode 100644 index 000000000..c87460453 --- /dev/null +++ b/gen2-geometry-fitting/geometry_fitting.py @@ -0,0 +1,1205 @@ +import os +import time +import json +from sys import maxsize +from itertools import combinations +from typing import * + +from numba import jit +import numpy as np +import open3d as o3d +import cv2 +import depthai as dai +from pathlib import Path +from scipy.spatial import ConvexHull +from sklearn.cluster import DBSCAN +import pyransac3d as pyrsc +import blobconverter + + +class PointCloudVisualizer(): + """ + PointCloudVisualizer is a utility class designed to facilitate the visualization of point clouds using Open3D. + It provides functionalities to convert RGBD images to point clouds, visualize them in a window, and manipulate + the visualized point clouds by adding meshes or clearing them. The class also handles camera intrinsics + and sets up the visualization environment with appropriate parameters for optimal viewing. + + Attributes: + - R_camera_to_world: Rotation matrix to transform camera coordinates to world coordinates. + - pcl: The main point cloud object that will be visualized. + - pinhole_camera_intrinsic: Camera intrinsic parameters for projecting depth to 3D points. + - vis: The Open3D visualizer object. + - meshes: A list to store additional mesh geometries added to the visualizer. 
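+
+    Example (hypothetical usage; assumes `intrinsics`, `w`, `h`, `depth_frame`,
+    and `rgb_frame` were already obtained from the device, as done further below):
+        vis = PointCloudVisualizer(intrinsics, w, h)
+        pcd = vis.rgbd_to_projection(depth_frame, rgb_frame)
+        vis.visualize_pcd()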
+ """ + + + + def __init__(self, intrinsic_matrix: np.ndarray, width: int, height: int) -> None: + # Rotation matrix to transform from camera to world coordinates + self.R_camera_to_world = np.array([[1, 0, 0], [0, -1, 0], [0, 0, -1]]).astype(np.float64) + + # Create an empty point cloud object + self.pcl = o3d.geometry.PointCloud() + + # Define the camera intrinsic parameters using the provided matrix + self.pinhole_camera_intrinsic = o3d.camera.PinholeCameraIntrinsic(width, + height, + intrinsic_matrix[0][0], + intrinsic_matrix[1][1], + intrinsic_matrix[0][2], + intrinsic_matrix[1][2]) + + # Initialize the Open3D visualizer + self.vis = o3d.visualization.Visualizer() + self.vis.create_window(window_name="Point Cloud") + + # Add the point cloud to the visualizer + self.vis.add_geometry(self.pcl) + + # Add a coordinate frame origin to the visualizer for reference + origin = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.05, origin=[0, 0, 0]) + self.vis.add_geometry(origin) + + # Adjust the view control settings + view_control = self.vis.get_view_control() + view_control.set_constant_z_far(1000) + + # List to store all the meshes + self.meshes = [] + + # Adjust render options to show the back face of the mesh + render_option = self.vis.get_render_option() + render_option.mesh_show_back_face = True + + + + + def rgbd_to_projection(self, depth_map: np.ndarray, rgb: np.ndarray, downsample: bool = False, remove_noise: bool = False) -> o3d.geometry.PointCloud: + """ + Convert RGBD images to a point cloud projection. + + Parameters: + - depth_map: Depth image. + - rgb: RGB image. + - downsample (optional): Boolean flag to determine if the point cloud should be downsampled. + False is recommended as curved geometry tends to be detected poorly otherwise + - remove_noise (optional): Boolean flag to determine if noise should be removed from the point cloud. + False is recommended as this is very slow + + Returns: + - Point cloud generated from the RGBD image. + """ + + # Convert numpy RGB and depth images to Open3D Image format + rgb_o3d = o3d.geometry.Image(rgb) + depth_o3d = o3d.geometry.Image(depth_map) + + # Create an RGBD image from the RGB and depth images + rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth( + rgb_o3d, depth_o3d, + convert_rgb_to_intensity=(len(rgb.shape) != 3), # Convert RGB to intensity if it's not already in that format + depth_trunc=20000, # Truncate depth values beyond 20 meters + depth_scale=1000.0 # Scale factor for depth values + ) + + # Create a point cloud from the RGBD image using the camera intrinsics + pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd_image, self.pinhole_camera_intrinsic) + + # Downsample the point cloud if the flag is set + if downsample: + pcd = pcd.voxel_down_sample(voxel_size=0.01) + + # Remove noise from the point cloud if the flag is set + if remove_noise: + pcd = pcd.remove_statistical_outlier(30, 0.1)[0] + + # Update the point cloud object of the class with the new points and colors + self.pcl.points = pcd.points + self.pcl.colors = pcd.colors + + # Rotate the point cloud to align with the world coordinates + self.pcl.rotate(self.R_camera_to_world, center=np.array([0,0,0],dtype=np.float64)) + + return self.pcl + + + + def visualize_pcd(self) -> None: + """ + Visualize the point cloud in the visualizer window. 
+ """ + + # Get the view control object from the visualizer + view_control = self.vis.get_view_control() + + # Convert the current view control parameters to pinhole camera parameters + pinhole_camera_parameters = view_control.convert_to_pinhole_camera_parameters() + + # Set the extrinsic matrix for the camera parameters. + # This matrix is hardcoded due to observed unexpected behavior with other values. + pinhole_camera_parameters.extrinsic = [[1.0, 0.0, 0.0, -0.141], + [0.0, -1.0, 0.0, 0.141], + [0.0, 0.0, -1.0, 0.52655451], + [0.0, 0.0, 0.0, 1.0]] + + # Apply the updated camera parameters to the visualizer's view control + view_control.convert_from_pinhole_camera_parameters(pinhole_camera_parameters) + + # Update the point cloud geometry in the visualizer + self.vis.update_geometry(self.pcl) + + # Poll for any events (like user input) + self.vis.poll_events() + + # Update the visualizer's renderer to reflect the changes + self.vis.update_renderer() + + + + def close_window(self) -> None: + """ + Close the visualizer window. + """ + self.vis.destroy_window() + + + + def add_mesh(self, mesh: o3d.geometry.TriangleMesh) -> None: + """ + Add a mesh to the visualizer and store it in the list of meshes. + + Parameters: + - mesh: The mesh (of type open3d.geometry.TriangleMesh or similar) to be added to the visualizer. + """ + self.vis.add_geometry(mesh) # Add the mesh to the visualizer + self.meshes.append(mesh) # Store the mesh in the list of meshes + + + + def clear_meshes(self) -> None: + """ + Remove all meshes from the visualizer and clear the list of meshes. + """ + for mesh in self.meshes: # Iterate over each mesh in the list + self.vis.remove_geometry(mesh) # Remove the mesh from the visualizer + self.meshes = [] + + + + +@jit(nopython=True) +def _get_largest_cluster_label(labels: np.ndarray) -> int: + """ + Get the label of the largest cluster from the given labels. + + Parameters: + - labels: The labels assigned to each point by a clustering algorithm. + + Returns: + - largest_cluster_label: The label of the largest cluster. + """ + unique_labels = np.unique(labels) + + # Manually compute counts for each unique label + counts = np.zeros(unique_labels.shape[0], dtype=np.int64) + for i, label in enumerate(unique_labels): + counts[i] = np.sum(labels == label) + + mask = unique_labels != -1 + unique_labels = unique_labels[mask] + counts = counts[mask] + + largest_cluster_label = unique_labels[np.argmax(counts)] + return largest_cluster_label + +def segment_largest_cluster(pcl: o3d.geometry.PointCloud, eps: float = 0.05, min_samples: int = 10, max_points: int = 2000) -> o3d.geometry.PointCloud: + """ + Segment the largest cluster from a point cloud using DBSCAN. + + Parameters: + - pcl: The input point cloud. + - eps: The maximum distance between two samples for one to be considered as in the neighborhood of the other. + - min_samples: The number of samples in a neighborhood for a point to be considered as a core point. + - max_points: Maximum number of points to consider. If the point cloud has more points, a random subset is taken. + + Returns: + - largest_cluster_pcl: Point cloud of the largest cluster. 
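+
+    Example (hypothetical usage; `pcl` is any o3d.geometry.PointCloud):
+        largest = segment_largest_cluster(pcl, eps=0.05, min_samples=10)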
+ """ + points = np.asarray(pcl.points) + + if max_points is not None and len(points) > max_points: + selected_indices = np.random.choice(len(points), max_points, replace=False) + points = points[selected_indices] + + clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(points) + labels = clustering.labels_ + + largest_cluster_label = _get_largest_cluster_label(labels) + + largest_cluster_points = points[labels == largest_cluster_label] + + largest_cluster_pcl = o3d.geometry.PointCloud() + largest_cluster_pcl.points = o3d.utility.Vector3dVector(largest_cluster_points) + + return largest_cluster_pcl + + +class HostSync: + """ + HostSync is a utility class designed to synchronize multiple message streams based on their sequence numbers. + It ensures that for a given sequence number, messages from all streams are present, allowing for synchronized + processing of data from different sources. + """ + + def __init__(self) -> None: + """ + Initialize the HostSync object with an empty dictionary to store arrays of messages. + """ + self.arrays = {} + + def add_msg(self, name: str, msg: Any) -> Union[Dict[str, Any], bool]: + """ + Add a message to the corresponding array based on its name. + + Parameters: + - name: The name (or type) of the message. + - msg: The message object to be added. + + Returns: + - A dictionary of synced messages if there are enough synced messages, otherwise False. + """ + # Check if the message type exists in the dictionary, if not, create an empty list for it + if not name in self.arrays: + self.arrays[name] = [] + + # Append the message to the corresponding list along with its sequence number + self.arrays[name].append({"msg": msg, "seq": msg.getSequenceNum()}) + + synced = {} + # Check for messages with the same sequence number across all message types + for name, arr in self.arrays.items(): + for i, obj in enumerate(arr): + if msg.getSequenceNum() == obj["seq"]: + synced[name] = obj["msg"] + break + + # If there are 3 synced messages (in this use case color, depth, nn but you may add more by changing 3 to something else), + # remove all older messages and return the synced messages + if len(synced) == 3: + # Remove older messages with sequence numbers less than the current message's sequence number + for name, arr in self.arrays.items(): + for i, obj in enumerate(arr): + if obj["seq"] < msg.getSequenceNum(): + arr.remove(obj) + else: + break + return synced + return False + +""" +shape_dict provides a mapping between object names and their corresponding geometric shapes. +The shapes are approximated using basic geometric primitives. +For objects that do not have a clear or simple geometric representation, the value is set to None. + +Key: Object name (e.g., "person", "car", "apple"), must be a YOLO label +Value: Geometric shape representing the object "cylinder", "cuboid", "sphere", "plane" or None if no simple shape can represent the object. 
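+
+Example: shape_dict["apple"] yields "sphere", while shape_dict["person"] yields
+None, so person detections are skipped during shape fitting.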
+""" +shape_dict = { + "person": None, + "bicycle": None, + "car": "cuboid", + "motorbike": None, + "aeroplane": None, + "bus": "cuboid", + "train": "cuboid", + "truck": "cuboid", + "boat": "cuboid", + "traffic light": None, + "fire hydrant": None, + "stop sign": "plane", + "parking meter": None, + "bench": "cuboid", + "bird": None, + "cat": None, + "dog": None, + "horse": None, + "sheep": None, + "cow": None, + "elephant": None, + "bear": None, + "zebra": None, + "giraffe": None, + "backpack": "cuboid", + "umbrella": None, + "handbag": "cuboid", + "tie": None, + "suitcase": "cuboid", + "frisbee": None, + "skis": "plane", + "snowboard": "plane", + "sports ball": "sphere", + "kite": None, + "baseball bat": None, + "baseball glove": None, + "skateboard": "plane", + "surfboard": "plane", + "tennis racket": "plane", + "bottle": None, + "wine glass": None, + "cup": None, + "fork": None, + "knife": None, + "spoon": None, + "bowl": None, + "banana": None, + "apple": "sphere", + "sandwich": None, + "orange": "sphere", + "broccoli": None, + "carrot": None, + "hot dog": None, + "pizza": "cuboid", + "donut": None, + "cake": "cuboid", + "chair": "cuboid", + "sofa": "cuboid", + "pottedplant": "cuboid", + "bed": "cuboid", + "diningtable": "cuboid", + "toilet": None, + "tvmonitor": "cuboid", + "laptop": "cuboid", + "mouse": None, + "remote": "cuboid", + "keyboard": "cuboid", + "cell phone": "cuboid", + "microwave": "cuboid", + "oven": "cuboid", + "toaster": "cuboid", + "sink": None, + "refrigerator": "cuboid", + "book": "cuboid", + "clock": None, + "vase": None, + "scissors": None, + "teddy bear": None, + "hair drier": None, + "toothbrush": None +} + + + +def fit_and_create_sphere_mesh(points: np.ndarray, thresh: float = 0.2, verbose: bool = False) -> o3d.geometry.TriangleMesh: + """ + fit_and_create_sphere_mesh fits a sphere to a set of 3D points using the RANSAC algorithm and then creates a mesh representation of the fitted sphere. + + Parameters: + - points (numpy.ndarray): A set of 3D coordinates, where each point is represented as an array of [x, y, z] values. + - thresh (float, optional): A threshold value for the RANSAC algorithm to determine inliers and outliers. Defaults to 0.2 + - verbose (bool, optional): If set to True, prints out the sphere parameters. Defaults to False. + + Returns: + - sphere_mesh (o3d.geometry.TriangleMesh): A mesh representation of the fitted sphere. + """ + + # Fit a sphere using RANSAC + sph = pyrsc.Sphere() + center, radius, _ = sph.fit(points, thresh) + + # Print sphere parameters if verbose is True + if verbose: + print(f"Detected sphere with radius {radius:.2f} at location ({center[0]:.2f}, {center[1]:.2f}, {center[2]:.2f})") + + # Create a sphere mesh using Open3D + sphere_mesh = o3d.geometry.TriangleMesh.create_sphere(radius=radius) + sphere_mesh.translate(center) + + return sphere_mesh + +@jit(nopython=True) +def _project_points_onto_plane(points, best_eq): + """ + Project a set of 3D points onto a plane. + + Parameters: + - points: np.ndarray + The 3D points to be projected. + - best_eq: List[float] + The equation of the plane [a, b, c, d] such that ax + by + cz + d = 0. + + Returns: + - projected_points: np.ndarray + The projected points onto the plane. 
+ """ + projected_points = np.empty(points.shape) + for i, point in enumerate(points): + t = -(best_eq[0] * point[0] + best_eq[1] * point[1] + best_eq[2] * point[2] + best_eq[3]) / (best_eq[0]**2 + best_eq[1]**2 + best_eq[2]**2) + projected_point = point + t * np.array(best_eq[:3]) + projected_points[i] = projected_point + return projected_points + +@jit(nopython=True) +def _map_3d_to_2d(points, basis1, basis2): + """ + Map 3D points to a 2D coordinate system defined by two basis vectors. + + Parameters: + - points: np.ndarray + The 3D points to be mapped. + - basis1: np.ndarray + The first basis vector of the 2D coordinate system. + - basis2: np.ndarray + The second basis vector of the 2D coordinate system. + + Returns: + - points_2d: np.ndarray + The mapped 2D points. + """ + points_2d = np.empty((points.shape[0], 2)) + for i, point in enumerate(points): + coord1 = np.dot(point, basis1) + coord2 = np.dot(point, basis2) + points_2d[i] = [coord1, coord2] + return points_2d + +@jit(nopython=True) +def _create_triangles_from_hull(hull_vertices_count): + """ + Create triangles by connecting adjacent vertices of a convex hull. + + Parameters: + - hull_vertices_count: int + The number of vertices in the convex hull. + + Returns: + - triangles: np.ndarray + The triangles formed by connecting adjacent vertices of the convex hull. + """ + triangles = np.empty((hull_vertices_count - 2, 3), dtype=np.int32) + for i in range(1, hull_vertices_count - 1): + triangles[i-1] = [0, i, i + 1] + return triangles + + + +def fit_and_create_plane_mesh(points: np.ndarray, thresh: float = 0.1, verbose: bool = False) -> o3d.geometry.TriangleMesh: + """ + Fit a plane to a set of 3D points using the RANSAC algorithm and then creates a mesh representation of the fitted plane. + + Parameters: + - points (numpy.ndarray): A set of 3D coordinates, where each point is an array with [x, y, z] values. + - thresh (float, optional): A threshold value for the RANSAC algorithm to determine inliers and outliers. Defaults to 0.1. + - verbose (bool, optional): If True, prints out the plane parameters. Defaults to False. + + Returns: + - mesh (o3d.geometry.TriangleMesh): A mesh representation of the fitted plane. 
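+
+    The mesh is built by projecting the RANSAC inliers onto the fitted plane,
+    taking the 2D convex hull in an in-plane basis, and fan-triangulating the
+    hull back in 3D.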
+ """ + + # Fit a plane using RANSAC + plane = pyrsc.Plane() + best_eq, inliers = plane.fit(points, thresh) + + # If verbose is True, print the plane parameters + if verbose: + a, b, c, d = best_eq + print(f"Detected plane: {a:.2f}x + {b:.2f}y + {c:.2f}z + {d:.2f} = 0.") + + # Project the inlier points onto the plane + projected_points = _project_points_onto_plane(points[inliers], best_eq) + + # Define a 2D coordinate system on the plane + normal = np.array(best_eq[:3]) + basis1 = np.cross(normal, normal + np.array([1, 0, 0])) + basis1 /= np.linalg.norm(basis1) + basis2 = np.cross(normal, basis1) + basis2 /= np.linalg.norm(basis2) + + # Map the 3D points to the 2D coordinate system + points_2d = _map_3d_to_2d(projected_points, basis1, basis2) + + # Compute the 2D convex hull + hull_2d = ConvexHull(points_2d) + + # Map the 2D convex hull back to 3D space + hull_3d_points = [] + for vertex in hull_2d.vertices: + point_2d = points_2d[vertex] + point_3d = point_2d[0] * basis1 + point_2d[1] * basis2 + hull_3d_points.append(point_3d) + hull_3d_points = np.array(hull_3d_points) + + # Create a mesh from the 3D convex hull points + mesh = o3d.geometry.TriangleMesh() + mesh.vertices = o3d.utility.Vector3dVector(hull_3d_points) + + # Create triangles by connecting adjacent vertices of the convex hull + triangles = _create_triangles_from_hull(len(hull_3d_points)) + mesh.triangles = o3d.utility.Vector3iVector(triangles) + + # Compute normals for shading + mesh.compute_vertex_normals() + + return mesh + + +def fit_and_create_cuboid_mesh(points: np.ndarray, thresh: float = 0.01, max_iterations: int = 1000, verbose: bool = False) -> o3d.geometry.TriangleMesh: + """ + Fit a cuboid to a set of 3D points using the RANSAC algorithm and then creates a mesh representation of the fitted cuboid. + + Parameters: + - points (numpy.ndarray): A set of 3D coordinates, where each point is an array with [x, y, z] values. + - thresh (float, optional): A threshold value for the RANSAC algorithm to determine inliers and outliers. Defaults to 0.01. + - max_iterations (int, optional): Maximum number of iterations for the RANSAC algorithm. Defaults to 1000. + - verbose (bool, optional): If True, prints out the cuboid parameters. Defaults to False. + + Returns: + - cuboid_mesh (o3d.geometry.TriangleMesh): A mesh representation of the fitted cuboid. 
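+
+    Each of the three RANSAC planes is replaced by the two parallel planes that
+    bound the inliers; the six resulting planes are intersected to form the
+    cuboid faces.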
+ """ + + # Fit a cuboid using RANSAC + cuboid = pyrsc.Cuboid() + cuboid_params = cuboid.fit(points, thresh, max_iterations) + + # Extract the detected planes and inliers + planes = cuboid_params[0] + inlier_indices = cuboid_params[1] + + # Extract the actual inlier points from the original point cloud + inlier_points = points[inlier_indices] + + # Get parallel planes for each detected plane + all_planes = [] + for plane in planes: + parallel_planes = get_parallel_planes(plane, inlier_points) + all_planes.extend(parallel_planes) + + # Generate the cuboid mesh from the planes + cuboid_mesh = cuboid_from_planes(all_planes) + + # Compute the cuboid parameters + min_point = np.min(inlier_points, axis=0) + max_point = np.max(inlier_points, axis=0) + midpoint = (min_point + max_point) / 2 + width = max_point[0] - min_point[0] + height = max_point[1] - min_point[1] + depth = max_point[2] - min_point[2] + + # If verbose is True, print the cuboid parameters in a single line + if verbose: + print(f"Detected cuboid with Midpoint: [{midpoint[0]:.2f}, {midpoint[1]:.2f}, {midpoint[2]:.2f}], Width: {width:.2f}, Height: {height:.2f}, Depth: {depth:.2f}") + + return cuboid_mesh + + +@jit(nopython=True) +def _project_onto_normal(inliers: np.ndarray, normal: np.ndarray) -> np.ndarray: + """ + Project the inliers onto the plane's normal. + + Parameters: + - inliers (numpy.ndarray): The inlier points, where each point is an array with [x, y, z] values. + - normal (numpy.ndarray): The normal of the plane. + + Returns: + - projections (numpy.ndarray): The projections of the inliers onto the plane's normal. + """ + return np.dot(inliers, normal) + + +def get_parallel_planes(plane: List[float], inliers: np.ndarray) -> Tuple[List[float], List[float]]: + """ + Given a plane and inliers, find the two parallel planes that bound the inliers. + + Parameters: + - plane (List[float]): The equation of the plane [a, b, c, d] such that ax + by + cz + d = 0. + - inliers (numpy.ndarray): The inlier points, where each point is an array with [x, y, z] values. + + Returns: + - parallel_planes (Tuple[List[float], List[float]]): The equations of the two parallel planes. + """ + normal = np.array(plane[:3]) + + # Project the inliers onto the plane's normal + projections = _project_onto_normal(inliers, normal) + + # Adjust the d value based on the min and max projections + d_min = -np.min(projections) + d_max = -np.max(projections) + + parallel_plane_1 = plane.copy() + parallel_plane_1[3] = d_min + + parallel_plane_2 = plane.copy() + parallel_plane_2[3] = d_max + + return parallel_plane_1, parallel_plane_2 + + +@jit(nopython=True) +def _solve_system_of_equations(normals: np.ndarray, constants: np.ndarray) -> np.ndarray: + """ + Solve a system of linear equations to find the intersection point of three planes. + + Parameters: + - normals (numpy.ndarray): The normals of the three planes. + - constants (numpy.ndarray): The constants from the plane equations. + + Returns: + - point (numpy.ndarray): The intersection point of the three planes, represented as [x, y, z]. + """ + return np.linalg.solve(normals, constants) + + +def intersect_planes(plane1: List[float], plane2: List[float], plane3: List[float]) -> np.ndarray: + """ + Find the intersection point of three planes. + + Parameters: + - plane1, plane2, plane3 (List[float]): The equations of the planes [a, b, c, d] such that ax + by + cz + d = 0. + + Returns: + - point (numpy.ndarray): The intersection point of the three planes, represented as [x, y, z]. 
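+
+    Internally this solves the 3x3 linear system N @ x = -d, where the rows of
+    N are the three plane normals and d stacks the plane offsets.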
+ """ + normals = np.array([plane1[:3], plane2[:3], plane3[:3]]) + constants = -np.array([plane1[3], plane2[3], plane3[3]]) + point = _solve_system_of_equations(normals, constants) + return point + + + +def create_quad_face(p1: np.ndarray, p2: np.ndarray, p3: np.ndarray, p4: np.ndarray) -> o3d.geometry.TriangleMesh: + """ + Create a mesh from four vertices. + + Parameters: + - p1, p2, p3, p4 (numpy.ndarray): Vertices of the quad face [x, y, z] + + Returns: + - mesh (o3d.geometry.TriangleMesh): The quad mesh created from the four vertices. + """ + + # Combine the four vertices into a list + vertices = [p1, p2, p3, p4] + + # Define the triangles that make up the quad face. + # The quad is split into two triangles for rendering. + triangles = [[0, 1, 2], [1, 2, 3]] + + # Create a new TriangleMesh object + mesh = o3d.geometry.TriangleMesh() + + # Set the vertices of the mesh + mesh.vertices = o3d.utility.Vector3dVector(vertices) + + # Set the triangles of the mesh + mesh.triangles = o3d.utility.Vector3iVector(triangles) + + return mesh + + +def cuboid_from_planes(planes: List[List[float]], color: List[float] = [0.5, 0.5, 0.5]) -> o3d.geometry.TriangleMesh: + """ + Generate a cuboid mesh from 6 plane equations. + + Parameters: + planes (List[List[float]]): A list of 6 plane equations [a, b, c, d] such that ax + by + cz + d = 0. + + Returns: + combined_mesh (o3d.geometry.TriangleMesh): An open3d.geometry.TriangleMesh object representing the cuboid. + """ + + # Generate all combinations of 5 planes + combinations_of_five = list(combinations(planes, 5)) + + quads = [] + for combination_of_five in combinations_of_five: + combinations_of_three = list(combinations(combination_of_five, 3)) + vertices = [] + for combination in combinations_of_three: + # Check if any two planes share a normal + normals = [plane[:3] for plane in combination] + if len(set(tuple(normal) for normal in normals)) < 3: + continue # Skip this combination if any two normals are the same + vertices.append(intersect_planes(*combination)) + quads.append(vertices) + + meshes = [] + # Create a mesh for each quadrilateral and add it to the list + for quad in quads: + p1, p2, p3, p4 = quad + quad_face = create_quad_face(p1, p2, p3, p4) + meshes.append(quad_face) + + # Combine into single triangle mesh + combined_mesh = meshes[0] + for mesh in meshes[1:]: + combined_mesh += mesh + + # Color the mesh + combined_mesh.paint_uniform_color(color) + + return combined_mesh + + +def fit_and_create_cylinder_mesh(points: np.ndarray, thresh: float = 0.2, verbose: bool = False) -> o3d.geometry.TriangleMesh: + """ + Fit a cylinder to a set of 3D points using the RANSAC algorithm and then creates a mesh representation of the fitted cylinder. + + Parameters: + - points (numpy.ndarray): A set of 3D coordinates, where each point is an array with [x, y, z] values. + - thresh (float, optional): A threshold value for the RANSAC algorithm to determine inliers and outliers. Defaults to 0.2. + - verbose (bool, optional): If True, prints out the cylinder parameters. Defaults to False. + + Returns: + - mesh_cylinder (o3d.geometry.TriangleMesh): A mesh representation of the fitted cylinder. 
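+
+    Note: none of the labels in shape_dict map to "cylinder" by default, as
+    cylinder fitting is markedly slower than the other primitives; map a label
+    to "cylinder" to exercise this code path.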
+ """ + + # Fit a cylinder using RANSAC + cylinder = pyrsc.Cylinder() + center, axis, radius, inliers = cylinder.fit(points, thresh) + + # Calculate the top and bottom endpoints of the cylinder + top_point, bottom_point = cylinder_endpoints(center, axis, points[inliers]) + + # Create a cylinder mesh + height = np.linalg.norm(top_point - bottom_point) + mesh_cylinder = o3d.geometry.TriangleMesh.create_cylinder(radius=radius, height=height) + + # Translate the cylinder to the midpoint of top and bottom points + midpoint = (top_point + bottom_point) / 2 + mesh_cylinder.translate(midpoint) + + # Rotate the cylinder to align with the given axis + z_axis = np.array([0, 0, 1]) + rotation_matrix = get_rotation_matrix_from_vectors(z_axis, axis) + mesh_cylinder.rotate(rotation_matrix, center=midpoint) + + # If verbose is True, print the cylinder parameters in a single line + if verbose: + print(f"Detected cylinder with Radius: {radius:.2f}, Height: {height:.2f}, Midpoint: [{midpoint[0]:.2f}, {midpoint[1]:.2f}, {midpoint[2]:.2f}], Axis: [{axis[0]:.2f}, {axis[1]:.2f}, {axis[2]:.2f}]") + + return mesh_cylinder + + +@jit(nopython=True) +def _compute_rotation_matrix(v: np.ndarray, s: float, c: float) -> np.ndarray: + """ + Compute the rotation matrix using the Rodriguez rotation formula. + + Parameters: + - v: Cross product of vectors 'a' and 'b'. + - s: Sine of the angle between vectors 'a' and 'b'. + - c: Cosine of the angle between vectors 'a' and 'b'. + + Returns: + - rotation_matrix: The 3x3 rotation matrix. + """ + v_skew = np.array([ + [0, -v[2], v[1]], + [v[2], 0, -v[0]], + [-v[1], v[0], 0] + ]) + rotation_matrix = np.eye(3) + v_skew + (v_skew @ v_skew) * ((1 - c) / s**2) + return rotation_matrix + +def get_rotation_matrix_from_vectors(a: np.ndarray, b: np.ndarray) -> np.ndarray: + """ + Compute the rotation matrix that aligns vector 'a' to vector 'b'. + + Parameters: + - a: The source vector that needs to be rotated. + - b: The target vector to which 'a' should be aligned. + + Returns: + - rotation_matrix: The 3x3 rotation matrix that rotates 'a' to align with 'b'. + """ + v = np.cross(a, b) + s = np.linalg.norm(v) + c = np.dot(a, b) + return _compute_rotation_matrix(v, s, c) + + +@jit(nopython=True) +def _compute_endpoints(center: np.ndarray, axis: np.ndarray, inlier_points: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """ + Compute the top and bottom endpoints of a cylinder. + + Parameters: + - center: The center point of the cylinder. + - axis: The axis of the cylinder. + - inlier_points: The inlier points that lie on the surface of the cylinder. + + Returns: + - top_point: The top endpoint of the cylinder. + - bottom_point: The bottom endpoint of the cylinder. + """ + t_values = np.array([(np.dot(point - center, axis)) for point in inlier_points]) + top_t = np.max(t_values) + bottom_t = np.min(t_values) + top_point = center + top_t * axis + bottom_point = center + bottom_t * axis + return top_point, bottom_point + + +def cylinder_endpoints(center: np.ndarray, axis: np.ndarray, inlier_points: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """ + Compute the top and bottom endpoints of a cylinder given its center, axis, and inlier points. + + Parameters: + - center: The center point of the cylinder. + - axis: The axis of the cylinder. + - inlier_points: The inlier points that lie on the surface of the cylinder. + + Returns: + - top_point: The top endpoint of the cylinder. + - bottom_point: The bottom endpoint of the cylinder. 
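+
+    The axis is normalized, then each inlier is projected onto it as
+    t = dot(p - center, axis); the extreme t values give the two endpoints.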
+ """ + axis = axis / np.linalg.norm(axis) + return _compute_endpoints(center, axis, inlier_points) + + + + + +def fit_shape_to_detection(detection: str, point_cloud: o3d.geometry.PointCloud, shape_dict: dict, verbose: bool = False) -> Union[o3d.geometry.TriangleMesh, None]: + """ + Fit a geometric shape to a given detection based on a predefined dictionary of shapes. + The function selects the appropriate shape fitting method based on the detected object class. + + Parameters: + - detection (str): The object class detected, e.g., "car", "person", etc. + - point_cloud (o3d.geometry.PointCloud): The 3D point cloud data associated with the detection. + - shape_dict (dict): A dictionary mapping object classes to their corresponding geometric shapes. + - verbose (bool, optional): If True, prints out the shape parameters. Defaults to False. + + Returns: + - o3d.geometry.TriangleMesh or None: The fitted mesh for the detection. If the shape type is not defined or there are insufficient points, it returns None. + """ + + # Convert the point cloud to a numpy array + points = np.asarray(point_cloud.points) + + # Check if there are enough points for shape fitting + if len(points) < 20: + print("Not enough points for shape fitting.") + return None + + # If there are too many points, randomly sample a subset for efficiency + if len(points) > 300: + selected_indices = np.random.choice(len(points), 300, replace=False) + points = points[selected_indices] + + # Get the shape type for the detected object class from the dictionary + shape_type = shape_dict.get(detection, None) + + # If the shape type is not defined, return None + if shape_type is None: + return None + + # Fit the appropriate shape based on the shape type + if shape_type == "sphere": + mesh = fit_and_create_sphere_mesh(points, verbose=verbose) + elif shape_type == "plane": + mesh = fit_and_create_plane_mesh(points, verbose=verbose) + elif shape_type == "cylinder": + mesh = fit_and_create_cylinder_mesh(points, verbose=verbose) + elif shape_type == "cuboid": + mesh = fit_and_create_cuboid_mesh(points, verbose=verbose) + else: + # Raise an error if the shape type is unknown + raise ValueError(f"Unknown shape type: {shape_type}") + + return mesh + + +# selected model and its config (YOLO v5) +args = { + 'model': 'yolov5n_coco_416x416', + 'config': 'json/yolov5.json' +} + +# parse config +configPath = Path(args["config"]) +if not configPath.exists(): + raise ValueError("Path {} does not exist!".format(configPath)) + +with configPath.open() as f: + config = json.load(f) +nnConfig = config.get("nn_config", {}) + +# parse input shape +if "input_size" in nnConfig: + W, H = tuple(map(int, nnConfig.get("input_size").split('x'))) + +# extract metadata +metadata = nnConfig.get("NN_specific_metadata", {}) +classes = metadata.get("classes", {}) +coordinates = metadata.get("coordinates", {}) +anchors = metadata.get("anchors", {}) +anchorMasks = metadata.get("anchor_masks", {}) +iouThreshold = metadata.get("iou_threshold", {}) +confidenceThreshold = metadata.get("confidence_threshold", {}) + +print(metadata) + +# parse labels +nnMappings = config.get("mappings", {}) +labels = nnMappings.get("labels", {}) + +# get model path +nnPath = args["model"] +if not Path(nnPath).exists(): + print("No blob found at {}. 
Looking into DepthAI model zoo.".format(nnPath))
+    nnPath = str(blobconverter.from_zoo(args["model"], shaves=6, zoo_type="depthai", use_cache=True))
+# sync outputs
+syncNN = True
+
+# Initialize the pipeline
+pipeline = dai.Pipeline()
+
+# Create nodes for the left and right cameras
+left = pipeline.create(dai.node.MonoCamera)
+right = pipeline.create(dai.node.MonoCamera)
+
+# Create a node for the stereo depth calculation
+stereo = pipeline.create(dai.node.StereoDepth)
+
+# Create a node for the central RGB camera
+camRgb = pipeline.create(dai.node.ColorCamera)
+
+# Create a YoloDetectionNetwork node and configure it for the YOLOv5 model
+yolo_nn = pipeline.create(dai.node.YoloDetectionNetwork)
+yolo_nn.setConfidenceThreshold(confidenceThreshold)
+yolo_nn.setNumClasses(classes)
+yolo_nn.setCoordinateSize(coordinates)
+yolo_nn.setAnchors(anchors)
+yolo_nn.setAnchorMasks(anchorMasks)
+yolo_nn.setIouThreshold(iouThreshold)
+yolo_nn.setBlobPath(nnPath)
+yolo_nn.setNumInferenceThreads(2)
+yolo_nn.input.setBlocking(False)
+
+# Configure the MonoCamera nodes
+left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
+left.setBoardSocket(dai.CameraBoardSocket.CAM_B)
+right.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
+right.setBoardSocket(dai.CameraBoardSocket.CAM_C)
+
+# Link the left and right cameras to the stereo depth node
+left.out.link(stereo.left)
+right.out.link(stereo.right)
+
+# Create an output node for the depth stream
+depth_out = pipeline.create(dai.node.XLinkOut)
+depth_out.setStreamName("depth")
+stereo.depth.link(depth_out.input)
+
+# Create an output node for the color stream
+camRgb_out = pipeline.create(dai.node.XLinkOut)
+camRgb_out.setStreamName("color")
+camRgb.video.link(camRgb_out.input)
+
+# Configure the camRgb node (BGR order matches what OpenCV expects for display)
+camRgb.setPreviewSize(W, H)
+camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_720_P)
+camRgb.setInterleaved(False)
+camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
+camRgb.setFps(30)
+
+# Link the central RGB camera output to the YoloDetectionNetwork node
+camRgb.preview.link(yolo_nn.input)
+
+# Create an output node for the YOLO detections
+xout_nn = pipeline.create(dai.node.XLinkOut)
+xout_nn.setStreamName("nn")
+yolo_nn.out.link(xout_nn.input)
+
+
+class FrequencyMeter:
+    """
+    A utility class to measure the frequency of events over a specified duration.
+
+    Attributes:
+    - timestamps (List[float]): A list of timestamps when the events occurred.
+    - avg_over (int): The number of events over which the frequency is averaged.
+
+    Methods:
+    - tick(): Records the current timestamp and calculates the average frequency over the last 'avg_over' events.
+    """
+
+    def __init__(self, avg_over=10):
+        """
+        Initializes the FrequencyMeter with a specified number of events to average over.
+
+        Parameters:
+        - avg_over (int, optional): The number of events over which the frequency is averaged. Defaults to 10.
+        """
+        self.timestamps = []
+        self.avg_over = avg_over
+
+    def tick(self):
+        """
+        Records the current timestamp and calculates the average frequency over the last 'avg_over' events.
+        If the number of recorded events exceeds 'avg_over', the oldest timestamp is removed.
+        The calculated frequency is printed to the console.
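+
+        The printed value is avg_over / (newest - oldest) over the buffered
+        timestamps. Note that on the very first tick only one timestamp is
+        buffered and the elapsed time is zero; add a length guard before the
+        division if you reuse this class elsewhere.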
+ """ + now = time.time() + self.timestamps.append(now) + if len(self.timestamps) > self.avg_over: + self.timestamps.pop(0) + freq = self.avg_over / (self.timestamps[-1] - self.timestamps[0]) + print(f"Frequency: {freq:.2f} Hz") + +freq_meter = FrequencyMeter() + + + +# Set this to True if you want to print detections +verbose=False + + +with dai.Device(pipeline) as device: + + # Normalize bounding box coordinates to match frame dimensions + def frameNorm(frame: np.ndarray, bbox: List[float]) -> List[int]: + """ + Normalize bounding box coordinates based on frame dimensions. + + Parameters: + - frame (np.ndarray): The frame to which the bounding box corresponds. + - bbox (List[float]): Bounding box coordinates in the range <0..1>. + + Returns: + - List[int]: Normalized bounding box coordinates in pixels. + """ + normVals = np.full(len(bbox), frame.shape[0]) + normVals[::2] = frame.shape[1] + return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int) + + def displayFrame(name: str, frame: np.ndarray, detections: List[dai.ImgDetection]): + """ + Display the frame with bounding boxes and labels. + + Parameters: + - name (str): Window name for displaying the frame. + - frame (np.ndarray): The frame to display. + - detections (List[dai.ImgDetection]): List of detections to be drawn on the frame. + """ + + + color = (255, 0, 0) # Bounding box color + # Add bounding boxes and text onto the rgb frame according to detections + for detection in detections: + bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax)) + cv2.putText(frame, labels[detection.label], (bbox[0] + 10, bbox[1] + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255) + cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 40), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255) + cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2) + # Display the frame + cv2.imshow(name, frame) + + # Set the brightness of the IR laser dot projector + device.setIrLaserDotProjectorBrightness(1200) + + # Create output queues for depth, color, and neural network outputs + qs = [] + qs.append(device.getOutputQueue("depth", maxSize=1, blocking=False)) + qs.append(device.getOutputQueue("color", maxSize=1, blocking=False)) + qs.append(device.getOutputQueue("nn", maxSize=1, blocking=False)) + + # Retrieve camera calibration data + calibData = device.readCalibration() + w, h = camRgb.getIspSize() + intrinsics = calibData.getCameraIntrinsics(dai.CameraBoardSocket.CAM_A, dai.Size2f(w, h)) + + # Initialize the PointCloudVisualizer with the camera intrinsics + pcl_converter = PointCloudVisualizer(intrinsics, w, h) + + # Initialize the HostSync class for synchronizing messages + sync = HostSync() + + while True: + # Iterate over each queue (depth, color, nn) + for q in qs: + # Try to get a new message from the queue + new_msg = q.tryGet() + if new_msg is not None: + # Add the new message to the synchronized message list + msgs = sync.add_msg(q.getName(), new_msg) + if msgs: + # Extract depth frame, color frame, and detections from the synchronized messages + depth = msgs["depth"].getFrame() + color = msgs["color"].getCvFrame() + detections = msgs["nn"].detections + + # Convert the color frame from BGR to RGB and resize it to match the depth frame dimensions + rgb = cv2.cvtColor(color, cv2.COLOR_BGR2RGB) + rgb_resized = cv2.resize(rgb, (depth.shape[1], depth.shape[0])) + + # Initialize an empty list to store all meshes + all_meshes = [] + + # Clear any existing meshes in the point cloud converter + 
pcl_converter.clear_meshes() + + # Process each detection + for detection in detections: + # Extract and normalize the bounding box coordinates + xmin, ymin, xmax, ymax = [ + int(detection.xmin * w), + int(detection.ymin * h), + int(detection.xmax * w), + int(detection.ymax * h), + ] + + # Ensure bounding box coordinates are valid + xmin, xmax = sorted([xmin, xmax]) + ymin, ymax = sorted([ymin, ymax]) + xmin = max(0, min(xmin, rgb_resized.shape[1] - 1)) + ymin = max(0, min(ymin, rgb_resized.shape[0] - 1)) + xmax = min(rgb_resized.shape[1] - 1, max(xmax, 0)) + ymax = min(rgb_resized.shape[0] - 1, max(ymax, 0)) + + # Skip invalid bounding boxes + if xmin >= xmax or ymin >= ymax: + print("Invalid bounding box detected. Skipping this detection.") + continue + + # Create a modified depth map with values outside the bounding box set to zero + modified_depth = np.copy(depth) + modified_depth[0:ymin, :] = 0 + modified_depth[ymax:, :] = 0 + modified_depth[:, 0:xmin] = 0 + modified_depth[:, xmax:] = 0 + + # Convert the modified depth and RGB data to a point cloud + object_pcl = pcl_converter.rgbd_to_projection(modified_depth, rgb_resized) + + # Segment the largest cluster from the point cloud + object_pcl = segment_largest_cluster(object_pcl) + + # Fit a shape to the detection and obtain the corresponding mesh + mesh = fit_shape_to_detection(labels[detection.label], object_pcl, shape_dict, verbose=verbose) + + # If a valid mesh is obtained, add it to the point cloud converter + if mesh is not None: + pcl_converter.add_mesh(mesh) + + # Convert the entire depth and RGB data to a point cloud + pcd = pcl_converter.rgbd_to_projection(depth, rgb_resized) + + # Display the color frame with bounding boxes and labels + displayFrame("rgb", color, detections) + + # Visualize the point cloud + pcl_converter.visualize_pcd() + + # Tick the framerate meter + freq_meter.tick() + + # Check for user input to exit the loop + key = cv2.waitKey(1) + if key == ord("q"): + pcl_converter.close_window() + cv2.destroyAllWindows() + break \ No newline at end of file diff --git a/gen2-geometry-fitting/json/yolov5.json b/gen2-geometry-fitting/json/yolov5.json new file mode 100644 index 000000000..fbf565806 --- /dev/null +++ b/gen2-geometry-fitting/json/yolov5.json @@ -0,0 +1,108 @@ +{ + "nn_config": + { + "output_format" : "detection", + "NN_family" : "YOLO", + "input_size": "416x416", + "NN_specific_metadata" : + { + "classes" : 80, + "coordinates" : 4, + "anchors" : [10,13, 16,30, 33,23, 30,61, 62,45, 59,119, 116,90, 156,198, 373,326], + "anchor_masks" : + { + "side52" : [0,1,2], + "side26" : [3,4,5], + "side13" : [6,7,8] + }, + "iou_threshold" : 0.5, + "confidence_threshold" : 0.5 + } + }, + "mappings": + { + "labels": + [ + "person", + "bicycle", + "car", + "motorbike", + "aeroplane", + "bus", + "train", + "truck", + "boat", + "traffic light", + "fire hydrant", + "stop sign", + "parking meter", + "bench", + "bird", + "cat", + "dog", + "horse", + "sheep", + "cow", + "elephant", + "bear", + "zebra", + "giraffe", + "backpack", + "umbrella", + "handbag", + "tie", + "suitcase", + "frisbee", + "skis", + "snowboard", + "sports ball", + "kite", + "baseball bat", + "baseball glove", + "skateboard", + "surfboard", + "tennis racket", + "bottle", + "wine glass", + "cup", + "fork", + "knife", + "spoon", + "bowl", + "banana", + "apple", + "sandwich", + "orange", + "broccoli", + "carrot", + "hot dog", + "pizza", + "donut", + "cake", + "chair", + "sofa", + "pottedplant", + "bed", + "diningtable", + "toilet", + "tvmonitor", + "laptop", + 
"mouse", + "remote", + "keyboard", + "cell phone", + "microwave", + "oven", + "toaster", + "sink", + "refrigerator", + "book", + "clock", + "vase", + "scissors", + "teddy bear", + "hair drier", + "toothbrush" + ] + } +} diff --git a/gen2-geometry-fitting/requirements.txt b/gen2-geometry-fitting/requirements.txt new file mode 100644 index 000000000..147994767 --- /dev/null +++ b/gen2-geometry-fitting/requirements.txt @@ -0,0 +1,9 @@ +numba==0.55.1 +numpy==1.21.0 +open3d==0.14.1 +opencv-python==4.5.3 +depthai==2.22.0.0 +scipy==1.9.0 +scikit-learn==1.2.2 +pyransac3d +blobconverter diff --git a/gen2-pointcloud/rgbd-pointcloud/main.py b/gen2-pointcloud/rgbd-pointcloud/main.py index 525f38a7b..194a5ce8f 100644 --- a/gen2-pointcloud/rgbd-pointcloud/main.py +++ b/gen2-pointcloud/rgbd-pointcloud/main.py @@ -25,12 +25,12 @@ monoLeft = pipeline.create(dai.node.MonoCamera) monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P) -monoLeft.setBoardSocket(dai.CameraBoardSocket.LEFT) +monoLeft.setBoardSocket(dai.CameraBoardSocket.CAM_B) #CAM_B = left camera monoRight = pipeline.create(dai.node.MonoCamera) monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P) -monoRight.setBoardSocket(dai.CameraBoardSocket.RIGHT) - +monoRight.setBoardSocket(dai.CameraBoardSocket.CAM_C) #CAM_C = right camera + stereo = pipeline.createStereoDepth() stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY) stereo.initialConfig.setMedianFilter(median) @@ -75,7 +75,7 @@ camRgb.setIspScale(1, 3) camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB) camRgb.initialControl.setManualFocus(130) - stereo.setDepthAlign(dai.CameraBoardSocket.RGB) + stereo.setDepthAlign(dai.CameraBoardSocket.CAM_A) #CAM_A = center camera camRgb.isp.link(xout_colorize.input) else: stereo.rectifiedRight.link(xout_colorize.input) @@ -133,10 +133,10 @@ def add_msg(self, name, msg): calibData = device.readCalibration() if COLOR: w, h = camRgb.getIspSize() - intrinsics = calibData.getCameraIntrinsics(dai.CameraBoardSocket.RGB, dai.Size2f(w, h)) + intrinsics = calibData.getCameraIntrinsics(dai.CameraBoardSocket.CAM_A, dai.Size2f(w, h)) else: w, h = monoRight.getResolutionSize() - intrinsics = calibData.getCameraIntrinsics(dai.CameraBoardSocket.RIGHT, dai.Size2f(w, h)) + intrinsics = calibData.getCameraIntrinsics(dai.CameraBoardSocket.CAM_C, dai.Size2f(w, h)) pcl_converter = PointCloudVisualizer(intrinsics, w, h) serial_no = device.getMxId() @@ -173,4 +173,6 @@ def add_msg(self, name, msg): cv2.imwrite(f"{serial_no}_{timestamp}_rectified_right.png", rectified_right) o3d.io.write_point_cloud(f"{serial_no}_{timestamp}.pcd", pcl_converter.pcl, compressed=True) elif key == ord("q"): - break + pcl_converter.close_window() + cv2.destroyAllWindows() + break \ No newline at end of file diff --git a/gen2-pointcloud/rgbd-pointcloud/requirements.txt b/gen2-pointcloud/rgbd-pointcloud/requirements.txt index 847d20983..32f008766 100644 --- a/gen2-pointcloud/rgbd-pointcloud/requirements.txt +++ b/gen2-pointcloud/rgbd-pointcloud/requirements.txt @@ -1,5 +1,4 @@ -opencv-python -depthai==2.16.0.0 -numpy -open3d==0.10.0.0; platform_machine != "armv7l" and python_version < "3.9" -torch \ No newline at end of file +numpy==1.21.5 +open3d==0.14.1 +depthai==2.22.0.0 +opencv-python==4.7.0