|
18 | 18 |
|
19 | 19 | #include <cub/cub.cuh> |
20 | 20 | #include <thrust/copy.h> |
| 21 | +#include <thrust/count.h> |
21 | 22 | #include <thrust/device_ptr.h> |
22 | 23 | #include <thrust/execution_policy.h> |
23 | 24 | #include <thrust/extrema.h> |
24 | 25 | #include <thrust/fill.h> |
25 | 26 | #include <thrust/reduce.h> |
26 | 27 |
|
| 28 | +#ifdef _OPENMP |
| 29 | +#include <omp.h> |
| 30 | +#else |
| 31 | +#define omp_get_max_threads() 1 |
| 32 | +#endif |
| 33 | + |
| 34 | +#include <algorithm>
| 35 | +#include <cstdint>
| | +#include <limits>  // std::numeric_limits, used in get_lambda_host
| | +#include <vector>  // host-side staging buffers
| 36 | + |
27 | 37 | namespace ML { |
28 | 38 | namespace HDBSCAN { |
29 | 39 | namespace detail { |
30 | 40 | namespace Condense { |
31 | 41 |
|
| | +// Convert a merge distance (delta) from the dendrogram into a lambda value,
| | +// lambda = 1 / delta; a zero distance maps to the largest representable lambda.
| 42 | +template <typename value_idx, typename value_t>
| 43 | +value_t get_lambda_host(value_idx node, value_idx num_points, const value_t* deltas)
| 44 | +{ |
| 45 | + value_t delta = deltas[node - num_points]; |
| 46 | + return delta > 0.0 ? 1.0 / delta : std::numeric_limits<value_t>::max(); |
| 47 | +} |
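| | +// (Illustrative values only, not from the PR: a merge distance of 0.25 yields
| | +// lambda = 1 / 0.25 = 4.0.)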
| 48 | + |
| 49 | +/* Heuristic for dispatching to the bottom-up implementation. A high persistent_ratio means
| 50 | +there are more chances to stop early while climbing the tree, which favors the bottom-up approach.
| 51 | +Note: these thresholds are based on empirical observations from running the algorithm on
| 52 | +datasets ranging from 30K to 5M points, with varying distributions and thread counts. For more
| 53 | +details, refer to the tables in this PR: https://github.com/rapidsai/cuml/pull/7459
| 54 | +*/
| 55 | +inline bool dispatch_to_bottom_up(int num_persistent, int n_leaves)
| 56 | +{ |
| 57 | + int n_nodes = 2 * n_leaves - 1; |
| 58 | + int internal_nodes = n_nodes - n_leaves; |
| 59 | + float persistent_ratio = static_cast<float>(num_persistent) / static_cast<float>(internal_nodes); |
| 60 | + int num_omp_threads = omp_get_max_threads(); |
| 61 | + if (persistent_ratio >= 0.001) { |
| 62 | + return true; |
| 63 | + } else if (persistent_ratio >= 0.0001 && num_omp_threads >= 16) { |
| 64 | + return true; |
| 65 | + } else if (num_omp_threads >= 64) { |
| 66 | + return true; |
| 67 | + } else { |
| 68 | + return false; |
| 69 | + } |
| 70 | +} |
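| | +// Illustrative (values not from the PR): with n_leaves = 1,000,000 there are 999,999
| | +// internal nodes. num_persistent = 1,000 gives persistent_ratio ~1e-3, so the bottom-up
| | +// path is taken regardless of thread count; num_persistent = 50 gives ~5e-5, so the GPU
| | +// top-down path is used unless at least 64 OpenMP threads are available.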
| 71 | + |
| 72 | +/** |
| 73 | + * Bottom-up approach using OpenMP on CPU |
| 74 | + */ |
| 75 | +template <typename value_idx, typename value_t>
| 76 | +void _build_condensed_hierarchy_bottom_up(const raft::handle_t& handle,
| 77 | +                                          const value_idx* children,
| 78 | +                                          const value_t* delta,
| 79 | +                                          const value_idx* sizes,
| 80 | +                                          const rmm::device_uvector<uint8_t>& is_persistent,
| 81 | +                                          int n_leaves,
| 82 | +                                          int min_cluster_size,
| 83 | +                                          rmm::device_uvector<value_idx>& out_parent,
| 84 | +                                          rmm::device_uvector<value_idx>& out_child,
| 85 | +                                          rmm::device_uvector<value_t>& out_lambda,
| 86 | +                                          rmm::device_uvector<value_idx>& out_size)
| 87 | +{ |
| 88 | + cudaStream_t stream = handle.get_stream(); |
| 89 | + int total_internal = n_leaves - 1; |
| 90 | + |
| 91 | + value_idx root = 2 * (n_leaves - 1); |
| 92 | + value_idx n_nodes = 2 * n_leaves - 1; |
| 93 | + |
| 94 | + std::vector<value_idx> h_children(2 * (n_leaves - 1)); |
| 95 | + std::vector<value_t> h_delta(n_leaves - 1); |
| 96 | + std::vector<value_idx> h_sizes(n_leaves - 1); |
| 97 | + std::vector<uint8_t> h_is_persistent(total_internal, 0); |
| 98 | + |
| 99 | + raft::copy(h_children.data(), children, 2 * (n_leaves - 1), stream); |
| 100 | + raft::copy(h_delta.data(), delta, n_leaves - 1, stream); |
| 101 | + raft::copy(h_sizes.data(), sizes, n_leaves - 1, stream); |
| 102 | + raft::copy(h_is_persistent.data(), is_persistent.data(), total_internal, stream); |
| 103 | + handle.sync_stream(stream); |
| 104 | + |
| 105 | + // Allocate host output arrays |
| 106 | + std::vector<value_idx> h_out_parent((root + 1) * 2, -1); |
| 107 | + std::vector<value_idx> h_out_child((root + 1) * 2, -1); |
| 108 | + std::vector<value_t> h_out_lambda((root + 1) * 2, -1); |
| 109 | + std::vector<value_idx> h_out_sizes((root + 1) * 2, -1); |
| 110 | + |
| 111 | + { |
| 112 | + std::vector<int> parent(n_nodes, -1); |
| 113 | + |
| 114 | + /* |
| 115 | + * Get parent information for easy pointer chasing. |
| 116 | + */ |
| 117 | +#pragma omp parallel for |
| 118 | + for (int node = n_leaves; node < n_nodes; node++) { |
| 119 | + int left = h_children[(node - n_leaves) * 2]; |
| 120 | + int right = h_children[(node - n_leaves) * 2 + 1]; |
| 121 | + parent[left] = node; |
| 122 | + parent[right] = node; |
| 123 | + } |
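| | +  // Illustrative (not from the PR): for a 3-leaf dendrogram with children = {0, 1, 3, 2},
| | +  // internal node 3 merges leaves 0 and 1, and the root (node 4) merges node 3 with
| | +  // leaf 2, yielding parent = {3, 3, 4, 4, -1}.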
| 124 | + |
| 125 | + std::vector<value_idx> relabel(n_nodes, 0); |
| 126 | + |
| 127 | + /* |
| 128 | + * Build the parent–child relationships between internal and leaf nodes, |
| 129 | + * and assign each leaf a representative ancestor and a lambda value. |
| 130 | + * |
| 131 | + * - Every leaf node should be connected to an internal node |
| 132 | + * that lies directly below the nearest persistent node in the hierarchy. |
| 133 | + * - The `relabel` array stores this mapping: for each node, it records the |
| 134 | + * ancestor that serves as its cluster representative. |
| 135 | + * - The first internal ancestor whose size >= min_cluster_size provides |
| 136 | + * the lambda value for the leaf. |
| 137 | + */ |
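| | +  // Illustrative (not from the PR): if a leaf's ancestor chain is leaf -> a -> b -> p,
| | +  // where p is the nearest persistent ancestor, the climb below stops with current == b,
| | +  // so relabel[leaf] = b, the node directly beneath p on the leaf's path.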
| 138 | +#pragma omp parallel for |
| 139 | + for (int node = 0; node < n_nodes; node++) { |
| 140 | + int current = node; |
| 141 | + int ancestor = parent[current]; |
| 142 | + int rel = -1; |
| 143 | +      value_t assign_lambda = -1.0;
| 144 | + |
| 145 | + // climb until we reach root or a persistent parent |
| 146 | + while (ancestor != -1) { |
| 147 | +        if (node < n_leaves && assign_lambda < 0 &&
| 148 | +            h_sizes[ancestor - n_leaves] >= min_cluster_size) {
| 149 | +          // for leaf nodes, record the lambda of the first internal ancestor whose
| 150 | +          // size is >= min_cluster_size
| 151 | + assign_lambda = get_lambda_host(ancestor, n_leaves, h_delta.data()); |
| 152 | + } |
| 153 | + bool ancestor_persistent = (ancestor >= n_leaves && h_is_persistent[ancestor - n_leaves]); |
| 154 | + |
| 155 | + if (ancestor_persistent) { |
| 156 | + rel = current; |
| 157 | + break; |
| 158 | + } |
| 159 | + |
| 160 | + current = ancestor; |
| 161 | + ancestor = parent[current]; |
| 162 | + } |
| 163 | +      // ancestor is -1 when the climb reached the root without finding a persistent ancestor
| 164 | + relabel[node] = ancestor == -1 ? root : rel; |
| 165 | + |
| 166 | + if (node < n_leaves) { |
| 167 | + h_out_parent[node * 2] = relabel[node]; |
| 168 | + h_out_child[node * 2] = node; |
| 169 | + h_out_lambda[node * 2] = assign_lambda; |
| 170 | + h_out_sizes[node * 2] = 1; |
| 171 | + } |
| 172 | + } |
| 173 | + |
| 174 | + /* |
| 175 | + * Add edges for internal (persistent) nodes to their children using the relabel array. |
| 176 | + */ |
| 177 | +#pragma omp parallel for |
| 178 | + for (int node = n_leaves; node < n_nodes; node++) { |
| 179 | + if (h_is_persistent[node - n_leaves]) { |
| 180 | + value_idx left_child = h_children[(node - n_leaves) * 2]; |
| 181 | + value_idx right_child = h_children[((node - n_leaves) * 2) + 1]; |
| 182 | + int left_count = left_child >= n_leaves ? h_sizes[left_child - n_leaves] : 1; |
| 183 | + int right_count = right_child >= n_leaves ? h_sizes[right_child - n_leaves] : 1; |
| 184 | + value_t lambda_value = get_lambda_host(node, n_leaves, h_delta.data()); |
| 185 | + |
| 186 | + h_out_parent[node * 2] = relabel[node]; |
| 187 | + h_out_child[node * 2] = left_child; |
| 188 | + h_out_lambda[node * 2] = lambda_value; |
| 189 | + h_out_sizes[node * 2] = left_count; |
| 190 | + |
| 191 | + h_out_parent[node * 2 + 1] = relabel[node]; |
| 192 | + h_out_child[node * 2 + 1] = right_child; |
| 193 | + h_out_lambda[node * 2 + 1] = lambda_value; |
| 194 | + h_out_sizes[node * 2 + 1] = right_count; |
| 195 | + } |
| 196 | + } |
| 197 | + } |
| 198 | + |
| 199 | + // Copy results back to device |
| 200 | + raft::copy(out_parent.data(), h_out_parent.data(), (root + 1) * 2, stream); |
| 201 | + raft::copy(out_child.data(), h_out_child.data(), (root + 1) * 2, stream); |
| 202 | + raft::copy(out_lambda.data(), h_out_lambda.data(), (root + 1) * 2, stream); |
| 203 | + raft::copy(out_size.data(), h_out_sizes.data(), (root + 1) * 2, stream); |
| 204 | +} |
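| | +// Note: each node climbs only as far as its nearest persistent ancestor, so the loops above
| | +// do O(n * depth) pointer chasing in the worst case, parallelized across OpenMP threads;
| | +// the early exits at persistent nodes are what the dispatch heuristic above relies on.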
| 205 | + |
| 206 | +/** |
| 207 | + * Top-down BFS approach on GPU |
| 208 | + */ |
| 209 | +template <typename value_idx, typename value_t, int tpb = 256> |
| 210 | +void _build_condensed_hierarchy_top_down(const raft::handle_t& handle, |
| 211 | + const value_idx* children, |
| 212 | + const value_t* delta, |
| 213 | + const value_idx* sizes, |
| 214 | + int n_leaves, |
| 215 | + int min_cluster_size, |
| 216 | + rmm::device_uvector<value_idx>& out_parent, |
| 217 | + rmm::device_uvector<value_idx>& out_child, |
| 218 | + rmm::device_uvector<value_t>& out_lambda, |
| 219 | + rmm::device_uvector<value_idx>& out_size) |
| 220 | +{ |
| 221 | + cudaStream_t stream = handle.get_stream(); |
| 222 | + auto exec_policy = handle.get_thrust_policy(); |
| 223 | + |
| 224 | + // Root is the last edge in the dendrogram |
| 225 | + value_idx root = 2 * (n_leaves - 1); |
| 226 | + |
| 227 | + rmm::device_uvector<bool> frontier(root + 1, stream); |
| 228 | + rmm::device_uvector<bool> next_frontier(root + 1, stream); |
| 229 | + |
| 230 | + thrust::fill(exec_policy, frontier.begin(), frontier.end(), false); |
| 231 | + thrust::fill(exec_policy, next_frontier.begin(), next_frontier.end(), false); |
| 232 | + |
| 233 | + // Array to propagate the lambda of subtrees actively being collapsed |
| 234 | + // through multiple bfs iterations. |
| 235 | + rmm::device_uvector<value_t> ignore(root + 1, stream); |
| 236 | + |
| 237 | + // Propagate labels from root |
| 238 | + rmm::device_uvector<value_idx> relabel(root + 1, stream); |
| 239 | + thrust::fill(exec_policy, relabel.begin(), relabel.end(), -1); |
| 240 | + |
| 241 | + raft::update_device(relabel.data() + root, &root, 1, stream); |
| 242 | + |
| 243 | + // Flip frontier for root |
| 244 | + constexpr bool start = true; |
| 245 | + raft::update_device(frontier.data() + root, &start, 1, stream); |
| 246 | + |
| 247 | + thrust::fill(exec_policy, out_parent.begin(), out_parent.end(), -1); |
| 248 | + thrust::fill(exec_policy, out_child.begin(), out_child.end(), -1); |
| 249 | + thrust::fill(exec_policy, out_lambda.begin(), out_lambda.end(), -1); |
| 250 | + thrust::fill(exec_policy, out_size.begin(), out_size.end(), -1); |
| 251 | + thrust::fill(exec_policy, ignore.begin(), ignore.end(), -1); |
| 252 | + |
| 253 | + // While frontier is not empty, perform single bfs through tree |
| 254 | + size_t grid = raft::ceildiv(root + 1, static_cast<value_idx>(tpb)); |
| 255 | + |
| 256 | +  value_idx n_elements_to_traverse = thrust::reduce(
| 257 | +    exec_policy, frontier.data(), frontier.data() + root + 1, static_cast<value_idx>(0));
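| | +  // The BFS advances one dendrogram level per kernel launch, so the loop below runs
| | +  // O(tree depth) times: roughly log2(n) iterations for balanced trees, up to n - 1
| | +  // for a degenerate chain.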
| 258 | + |
| 259 | + while (n_elements_to_traverse > 0) { |
| 260 | + // TODO: Investigate whether it would be worth performing a gather/argmatch in order |
| 261 | + // to schedule only the number of threads needed. (it might not be worth it) |
| 262 | + condense_hierarchy_kernel<<<grid, tpb, 0, stream>>>(frontier.data(), |
| 263 | + next_frontier.data(), |
| 264 | + ignore.data(), |
| 265 | + relabel.data(), |
| 266 | + children, |
| 267 | + delta, |
| 268 | + sizes, |
| 269 | + n_leaves, |
| 270 | + min_cluster_size, |
| 271 | + out_parent.data(), |
| 272 | + out_child.data(), |
| 273 | + out_lambda.data(), |
| 274 | + out_size.data()); |
| 275 | + |
| 276 | + thrust::copy(exec_policy, next_frontier.begin(), next_frontier.end(), frontier.begin()); |
| 277 | + thrust::fill(exec_policy, next_frontier.begin(), next_frontier.end(), false); |
| 278 | + |
| 279 | + n_elements_to_traverse = thrust::reduce( |
| 280 | + exec_policy, frontier.data(), frontier.data() + root + 1, static_cast<value_idx>(0)); |
| 281 | + } |
| 282 | +} |
| 283 | + |
32 | 284 | /** |
33 | 285 | * Condenses a binary single-linkage tree dendrogram in the Scipy hierarchy |
34 | 286 | * format by collapsing subtrees that fall below a minimum cluster size. |
35 | 287 | * |
| 288 | + * This function dispatches to either a CPU (bottom-up) or GPU (top-down BFS) |
| 289 | + * implementation based on a heuristic that considers the persistent node ratio |
| 290 | + * and available OpenMP threads. |
| 291 | + * |
36 | 292 | * For increased parallelism, the output array sizes are held fixed but |
37 | 293 | * the result will be sparse (e.g. zeros in place of parents who have been |
38 | 294 | * removed / collapsed). This function accepts an empty instance of |
@@ -78,69 +334,54 @@ void build_condensed_hierarchy(const raft::handle_t& handle, |
78 | 334 | "total.", |
79 | 335 | static_cast<int>(n_vertices)); |
80 | 336 |
|
81 | | - rmm::device_uvector<bool> frontier(root + 1, stream); |
82 | | - rmm::device_uvector<bool> next_frontier(root + 1, stream); |
83 | | - |
84 | | - thrust::fill(exec_policy, frontier.begin(), frontier.end(), false); |
85 | | - thrust::fill(exec_policy, next_frontier.begin(), next_frontier.end(), false); |
86 | | - |
87 | | - // Array to propagate the lambda of subtrees actively being collapsed |
88 | | - // through multiple bfs iterations. |
89 | | - rmm::device_uvector<value_t> ignore(root + 1, stream); |
| 337 | + int total_internal = n_leaves - 1; |
90 | 338 |
|
91 | | - // Propagate labels from root |
92 | | - rmm::device_uvector<value_idx> relabel(root + 1, handle.get_stream()); |
93 | | - thrust::fill(exec_policy, relabel.begin(), relabel.end(), -1); |
| 339 | + // Identify persistent nodes (those with size >= min_cluster_size) |
| 340 | + rmm::device_uvector<uint8_t> is_persistent(total_internal, stream); |
| 341 | + thrust::fill(exec_policy, is_persistent.begin(), is_persistent.end(), 0); |
94 | 342 |
|
95 | | - raft::update_device(relabel.data() + root, &root, 1, handle.get_stream()); |
| 343 | + size_t grid = raft::ceildiv(total_internal, static_cast<int>(tpb)); |
| 344 | + get_persistent_nodes_kernel<<<grid, tpb, 0, stream>>>( |
| 345 | + children, delta, sizes, is_persistent.data(), min_cluster_size, n_leaves); |
96 | 346 |
|
97 | | - // Flip frontier for root |
98 | | - constexpr bool start = true; |
99 | | - raft::update_device(frontier.data() + root, &start, 1, handle.get_stream()); |
| 347 | + thrust::device_ptr<uint8_t> is_persistent_ptr(is_persistent.data()); |
| 348 | + int num_persistent = |
| 349 | + thrust::count(exec_policy, is_persistent_ptr, is_persistent_ptr + total_internal, 1); |
100 | 350 |
|
| 351 | + // Allocate output arrays |
101 | 352 | rmm::device_uvector<value_idx> out_parent((root + 1) * 2, stream); |
102 | 353 | rmm::device_uvector<value_idx> out_child((root + 1) * 2, stream); |
103 | 354 | rmm::device_uvector<value_t> out_lambda((root + 1) * 2, stream); |
104 | 355 | rmm::device_uvector<value_idx> out_size((root + 1) * 2, stream); |
105 | 356 |
|
106 | | - thrust::fill(exec_policy, out_parent.begin(), out_parent.end(), -1); |
107 | | - thrust::fill(exec_policy, out_child.begin(), out_child.end(), -1); |
108 | | - thrust::fill(exec_policy, out_lambda.begin(), out_lambda.end(), -1); |
109 | | - thrust::fill(exec_policy, out_size.begin(), out_size.end(), -1); |
110 | | - thrust::fill(exec_policy, ignore.begin(), ignore.end(), -1); |
111 | | - |
112 | | - // While frontier is not empty, perform single bfs through tree |
113 | | - size_t grid = raft::ceildiv(root + 1, static_cast<value_idx>(tpb)); |
114 | | - |
115 | | - value_idx n_elements_to_traverse = |
116 | | - thrust::reduce(exec_policy, frontier.data(), frontier.data() + root + 1, 0); |
117 | | - |
118 | | - while (n_elements_to_traverse > 0) { |
119 | | - // TODO: Investigate whether it would be worth performing a gather/argmatch in order |
120 | | - // to schedule only the number of threads needed. (it might not be worth it) |
121 | | - condense_hierarchy_kernel<<<grid, tpb, 0, handle.get_stream()>>>(frontier.data(), |
122 | | - next_frontier.data(), |
123 | | - ignore.data(), |
124 | | - relabel.data(), |
125 | | - children, |
126 | | - delta, |
127 | | - sizes, |
128 | | - n_leaves, |
129 | | - min_cluster_size, |
130 | | - out_parent.data(), |
131 | | - out_child.data(), |
132 | | - out_lambda.data(), |
133 | | - out_size.data()); |
134 | | - |
135 | | - thrust::copy(exec_policy, next_frontier.begin(), next_frontier.end(), frontier.begin()); |
136 | | - thrust::fill(exec_policy, next_frontier.begin(), next_frontier.end(), false); |
137 | | - |
138 | | - n_elements_to_traverse = thrust::reduce( |
139 | | - exec_policy, frontier.data(), frontier.data() + root + 1, static_cast<value_idx>(0)); |
140 | | - |
141 | | - handle.sync_stream(stream); |
| 357 | + // Dispatch to CPU or GPU implementation based on heuristic |
| 358 | + if (dispatch_to_bottom_up(num_persistent, n_leaves)) { |
| 359 | +    _build_condensed_hierarchy_bottom_up(handle,
| 360 | +                                         children,
| 361 | +                                         delta,
| 362 | +                                         sizes,
| 363 | +                                         is_persistent,
| 364 | +                                         n_leaves,
| 365 | +                                         min_cluster_size,
| 366 | +                                         out_parent,
| 367 | +                                         out_child,
| 368 | +                                         out_lambda,
| 369 | +                                         out_size);
| 370 | + } else { |
| 371 | + _build_condensed_hierarchy_top_down<value_idx, value_t, tpb>(handle, |
| 372 | + children, |
| 373 | + delta, |
| 374 | + sizes, |
| 375 | + n_leaves, |
| 376 | + min_cluster_size, |
| 377 | + out_parent, |
| 378 | + out_child, |
| 379 | + out_lambda, |
| 380 | + out_size); |
142 | 381 | } |
143 | 382 |
|
| 383 | + handle.sync_stream(stream); |
| 384 | + |
144 | 385 | condensed_tree.condense(out_parent.data(), out_child.data(), out_lambda.data(), out_size.data()); |
145 | 386 | } |
146 | 387 |
|
|