|
30 | 30 | #include <cudf/table/table_view.hpp> |
31 | 31 | #include <cudf/transform.hpp> |
32 | 32 | #include <cudf/types.hpp> |
33 | | -#include <rmm/mr/device/cuda_async_memory_resource.hpp> |
34 | | -#include <rmm/mr/device/per_device_resource.hpp> |
35 | | -#include <rmm/mr/device/pool_memory_resource.hpp> |
| 33 | +#include <rmm/mr/cuda_async_memory_resource.hpp> |
| 34 | +#include <rmm/mr/per_device_resource.hpp> |
| 35 | +#include <rmm/mr/pool_memory_resource.hpp> |
36 | 36 | #include <rmm/resource_ref.hpp> |
37 | 37 |
|
38 | 38 | #include <rapidsmpf/buffer/buffer.hpp> |
@@ -476,44 +476,36 @@ static __device__ void calculate_amount(double *amount, double discount, double |
476 | 476 | rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; |
477 | 477 | co_await ctx->executor()->schedule(); |
478 | 478 | auto msg = co_await ch_in->receive(); |
479 | | - ctx->comm()->logger().print("Sortby"); |
480 | 479 | // We know we only have a single chunk from the groupby |
481 | 480 | if (msg.empty()) { |
482 | 481 | co_return; |
483 | 482 | } |
| 483 | + ctx->comm()->logger().print("Sortby"); |
484 | 484 | auto chunk = |
485 | 485 | rapidsmpf::ndsh::to_device(ctx, msg.release<rapidsmpf::streaming::TableChunk>()); |
486 | 486 | auto table = chunk.table_view(); |
487 | | -#if defined(__GNUC__) && __GNUC__ == 13 |
488 | | -#pragma GCC diagnostic push |
489 | | -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" |
490 | | -#endif |
491 | 487 | auto rounded = cudf::round( |
492 | 488 | table.column(2), |
493 | 489 | 2, |
494 | 490 | cudf::rounding_method::HALF_EVEN, |
495 | 491 | chunk.stream(), |
496 | 492 | ctx->br()->device_mr() |
497 | 493 | ); |
498 | | -#if defined(__GNUC__) && __GNUC__ == 13 |
499 | | -#pragma GCC diagnostic pop |
500 | | -#endif |
501 | | - co_await ch_out->send( |
502 | | - rapidsmpf::streaming::to_message( |
503 | | - 0, |
504 | | - std::make_unique<rapidsmpf::streaming::TableChunk>( |
505 | | - cudf::sort_by_key( |
506 | | - cudf::table_view({table.column(0), table.column(1), rounded->view()}), |
507 | | - table.select({0, 1}), |
508 | | - {cudf::order::ASCENDING, cudf::order::DESCENDING}, |
509 | | - {cudf::null_order::BEFORE, cudf::null_order::BEFORE}, |
510 | | - chunk.stream(), |
511 | | - ctx->br()->device_mr() |
512 | | - ), |
513 | | - chunk.stream() |
514 | | - ) |
| 494 | + auto result = rapidsmpf::streaming::to_message( |
| 495 | + 0, |
| 496 | + std::make_unique<rapidsmpf::streaming::TableChunk>( |
| 497 | + cudf::sort_by_key( |
| 498 | + cudf::table_view({table.column(0), table.column(1), rounded->view()}), |
| 499 | + table.select({0, 1}), |
| 500 | + {cudf::order::ASCENDING, cudf::order::DESCENDING}, |
| 501 | + {cudf::null_order::BEFORE, cudf::null_order::BEFORE}, |
| 502 | + chunk.stream(), |
| 503 | + ctx->br()->device_mr() |
| 504 | + ), |
| 505 | + chunk.stream() |
515 | 506 | ) |
516 | 507 | ); |
| 508 | + co_await ch_out->send(std::move(result)); |
517 | 509 | co_await ch_out->drain(ctx->executor()); |
518 | 510 | } |
519 | 511 |
|
|
0 commit comments