Skip to content

Conversation

@zhuanshenbsj1
Copy link
Contributor

@zhuanshenbsj1 zhuanshenbsj1 commented Apr 28, 2025

Purpose

  1. Monitor the input/output (IO) of Flink during read and write operations.

Tests

API and Format

Documentation

@wwj6591812
Copy link
Contributor

1、You add lots of metric, I suggest you sent the discuss email to [email protected] first.
2、modify the doc.

@zhuanshenbsj1
Copy link
Contributor Author

1、You add lots of metric, I suggest you sent the discuss email to [email protected] first. 2、modify the doc.

Roger that.


<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this dep

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this dep

I moved some metrics classes from paimon-core to paimon-common project, and the class DescriptiveStatisticsHistogram depends on this.

*
* <p>It allows users to monitor and track file I/O operations (e.g., read, write, delete, rename).
*/
public class FileIOWrapper implements FileIO {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MetricsFileIO?

public static final String GROUP_NAME = "source";
private final MetricGroup metricGroup;

private final AtomicLong readBytes = new AtomicLong(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only keep:

  String READ_BYTES = "read.bytes";
  String READ_OPERATIONS = "read.operations";
  String WRITE_BYTES = "write.bytes";
  String WRITE_OPERATIONS = "write.operations";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only keep:

  String READ_BYTES = "read.bytes";
  String READ_OPERATIONS = "read.operations";
  String WRITE_BYTES = "write.bytes";
  String WRITE_OPERATIONS = "write.operations";

Other metrics have been removed.

TableSchema tableSchema,
CatalogEnvironment catalogEnvironment) {
this.fileIO = fileIO;
this.fileIO = new FileIOWrapper(fileIO);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to introduce an option to enable MetricsFileIO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

We may need to introduce an option to enable MetricsFileIO.

@JingsongLi
Copy link
Contributor

I'v done refactor for making with MetricRegistry public in #5578

@zhuanshenbsj1
Copy link
Contributor Author

I'v done refactor for making with MetricRegistry public in #5578

rebase on this && add uts & all checks passed, cc~ @JingsongLi

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please only use TableWrite.withMetricRegistry and TableRead.withMetricRegistry, do not introduce any API for metrics.

And please make sure only io metric enabled, the file io can be MetricsFileIO.

@Override
public SeekableInputStream newInputStream(Path path) throws IOException {
SeekableInputStream inputStream = fileIO.newInputStream(path);
return new SeekableInputStreamIOWrapper(inputStream, this.inputMetrics);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider SeekableInputStream can be VectoredReadable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider SeekableInputStream can be VectoredReadable.

done.

import java.util.concurrent.atomic.AtomicLong;

/** Collects and monitors outp stream metrics. */
public class OutputMetrics {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please merge these tow classes into one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please merge these tow classes into one.

done.


/** Collects and monitors outp stream metrics. */
public class OutputMetrics {
public static final String GROUP_NAME = "sink";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

group name should be io.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

group name should be io.

done.

@zhuanshenbsj1
Copy link
Contributor Author

Please only use TableWrite.withMetricRegistry and TableRead.withMetricRegistry, do not introduce any API for metrics.

And please make sure only io metric enabled, the file io can be MetricsFileIO.

If we only use TableRead.withMetricRegistry, it would require too may modifications. However, in reality, we're always using and modifying the same AbstractFileStoreTable fileio reference. I've attempted to register metrics directly to the AbstractFileStoreTable within the ReadBuilder. This way, all subsequent operations like new scan, new streamscan, or new read can directly reuse the metrics collected by the AbstractFileStoreTable.

@zhuanshenbsj1 zhuanshenbsj1 force-pushed the metrics branch 4 times, most recently from f2e8fe8 to 99e93e4 Compare May 15, 2025 09:32
@zhuanshenbsj1 zhuanshenbsj1 requested a review from JingsongLi May 16, 2025 01:41
@zhuanshenbsj1 zhuanshenbsj1 force-pushed the metrics branch 4 times, most recently from 20eabb5 to 2b3fd4c Compare May 21, 2025 02:56
@zhuanshenbsj1
Copy link
Contributor Author

@JingsongLi Could you please take a look at this PR again for any remaining issues, thank ~

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants