Ingest async implementation #430

Open
wants to merge 12 commits into
base: master

Conversation

ohadbitt
Collaborator

Changed

  • [BREAKING] All synchronous queued and streaming ingestion APIs now delegate to their asynchronous counterparts
    internally and block for results.
  • [BREAKING] The streaming client no longer checks the blob size or whether the blob exists.

Added

  • The SDK now provides Reactor Core-based asynchronous APIs for all queued and streaming ingestion endpoints,
    enabling non-blocking operations.
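
To illustrate the "delegate to async and block" shape described above, here is a minimal sketch. It uses the JDK's `CompletableFuture` as a stand-in for Reactor's `Mono` so that it runs without extra dependencies; in Reactor-based code the blocking step would typically be `Mono.block()` rather than `join()`. The class and method names are hypothetical and are not taken from the SDK.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: not the SDK's classes or method names.
class SyncOverAsyncSketch {

    // Stand-in for an async ingestion call; returns immediately with a future.
    public static CompletableFuture<String> ingestAsync(String source) {
        return CompletableFuture.supplyAsync(() -> "queued:" + source);
    }

    // The sync variant contains no logic of its own: it delegates to the
    // async variant and blocks the calling thread until the result arrives.
    public static String ingest(String source) {
        return ingestAsync(source).join();
    }
}
```

Calling `SyncOverAsyncSketch.ingest("file.csv")` blocks the caller and returns `"queued:file.csv"`. One consequence of this design is that sync callers should avoid invoking such a method from a thread that must not block, such as an event-loop thread.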

github-actions bot commented Apr 10, 2025

Test Results

342 tests  ±0   333 ✅ ±0   1h 8m 42s ⏱️ + 1h 5m 31s
 28 suites ±0     9 💤 ±0 
 28 files   ±0     0 ❌ ±0 

Results for commit 7a7080f. ± Comparison against base commit 0472212.

This pull request removes 24 and adds 24 tests. Note that renamed tests count towards both.
com.microsoft.azure.kusto.ingest.ManagedStreamingTest ‑ IngestFromStream_CsvStream
com.microsoft.azure.kusto.ingest.QueuedIngestClientTest ‑ ingestFromFile_FileDoesNotExist_IngestionClientException
com.microsoft.azure.kusto.ingest.QueuedIngestClientTest ‑ ingestFromResultSet_StreamIngest_IngestionClientException
com.microsoft.azure.kusto.ingest.QueuedIngestClientTest ‑ ingestFromResultSet_StreamIngest_IngestionServiceException
com.microsoft.azure.kusto.ingest.StreamingIngestClientTest ‑ ingestFromBlob_BlobSourceInfoWithBlankBlobPath_IllegalArgumentException
com.microsoft.azure.kusto.ingest.StreamingIngestClientTest ‑ ingestFromBlob_BlobSourceInfoWithNullBlobPath_IllegalArgumentException
com.microsoft.azure.kusto.ingest.StreamingIngestClientTest ‑ ingestFromBlob_IngestionPropertiesWithIllegalDatabaseOrTableNames_IllegalArgumentException(String, String)[1]
com.microsoft.azure.kusto.ingest.StreamingIngestClientTest ‑ ingestFromBlob_IngestionPropertiesWithIllegalDatabaseOrTableNames_IllegalArgumentException(String, String)[2]
com.microsoft.azure.kusto.ingest.StreamingIngestClientTest ‑ ingestFromBlob_IngestionPropertiesWithIllegalDatabaseOrTableNames_IllegalArgumentException(String, String)[3]
com.microsoft.azure.kusto.ingest.StreamingIngestClientTest ‑ ingestFromBlob_IngestionPropertiesWithIllegalDatabaseOrTableNames_IllegalArgumentException(String, String)[4]
…
com.microsoft.azure.kusto.ingest.ManagedStreamingTest ‑ ingestFromStream_CsvStream
com.microsoft.azure.kusto.ingest.QueuedIngestClientTest ‑ ingestFromFileAsync_FileDoesNotExist_IngestionClientException
com.microsoft.azure.kusto.ingest.QueuedIngestClientTest ‑ ingestFromResultSetAsync_StreamIngest_IngestionClientException
com.microsoft.azure.kusto.ingest.QueuedIngestClientTest ‑ ingestFromResultSetAsync_StreamIngest_IngestionServiceException
com.microsoft.azure.kusto.ingest.StreamingIngestClientTest ‑ ingestFromBlobAsync_BlobSourceInfoWithBlankBlobPath_IllegalArgumentException
com.microsoft.azure.kusto.ingest.StreamingIngestClientTest ‑ ingestFromBlobAsync_BlobSourceInfoWithNullBlobPath_IllegalArgumentException
com.microsoft.azure.kusto.ingest.StreamingIngestClientTest ‑ ingestFromBlobAsync_IngestionPropertiesWithIllegalDatabaseOrTableNames_IllegalArgumentException(String, String)[1]
com.microsoft.azure.kusto.ingest.StreamingIngestClientTest ‑ ingestFromBlobAsync_IngestionPropertiesWithIllegalDatabaseOrTableNames_IllegalArgumentException(String, String)[2]
com.microsoft.azure.kusto.ingest.StreamingIngestClientTest ‑ ingestFromBlobAsync_IngestionPropertiesWithIllegalDatabaseOrTableNames_IllegalArgumentException(String, String)[3]
com.microsoft.azure.kusto.ingest.StreamingIngestClientTest ‑ ingestFromBlobAsync_IngestionPropertiesWithIllegalDatabaseOrTableNames_IllegalArgumentException(String, String)[4]
…

♻️ This comment has been updated with latest results.

@AsafMah AsafMah requested a review from Copilot June 3, 2025 03:34
@Copilot Copilot AI left a comment

Pull Request Overview

This PR converts all ingestion paths to non-blocking Reactor Core-based implementations, making synchronous methods delegate to async variants and introducing breaking changes in streaming behavior.

  • Delegated all sync queued and streaming ingestion APIs to new async counterparts returning Mono.
  • Replaced Azure Blob/Queue/Table clients with their async variants across resource classes.
  • Updated retry logic and resource refresh scheduling to use Reactor’s Mono and Scheduler constructs.

Reviewed Changes

Copilot reviewed 39 out of 39 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/result/IngestionStatusResult.java | Updated to return Mono<List> instead of List. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/result/IngestionStatusInTableDescription.java | Switched TableClient to TableAsyncClient and renamed accessors. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/result/IngestionResult.java | Modified interface to use Mono for getIngestionStatusCollection. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/resources/QueueWithSas.java | Switched to QueueAsyncClient and async getters. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/resources/ContainerWithSas.java | Switched to BlobContainerAsyncClient and async getters. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/exceptions/IngestionServiceException.java | Removed outdated TODO comments. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/exceptions/IngestionClientException.java | Removed outdated TODO comments. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java | Refactored streaming ingestion to Mono-based async implementations. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceManager.java | Introduced async scheduling and Reactor-based retry config. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceAlgorithms.java | Converted retry logic to async; missing logger field. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientImpl.java | Refactored queued ingestion to Reactor Mono chains. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java | Updated managed streaming to async with Reactor retryWhen. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientBase.java | Added async method wrappers and Mono-block bridges. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClient.java | Extended interface with async ingestion methods. |
| ingest/src/main/java/com/microsoft/azure/kusto/ingest/AzureStorageClient.java | Migrated storage operations to async Blob/Queue/Table clients. |
| ingest/pom.xml | Added Reactor and Netty dependencies for async support. |
| data/src/main/java/com/microsoft/azure/kusto/data/auth/CloudInfo.java | Adjusted ExponentialRetry invocation to new signature. |
| data/src/main/java/com/microsoft/azure/kusto/data/Utils.java | Renamed channel variable to decoder and updated inbound checks. |
| data/src/main/java/com/microsoft/azure/kusto/data/ExponentialRetry.java | Extended retry() to accept a filter predicate. |
| CHANGELOG.md | Documented breaking changes and new async APIs. |
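
The summary notes that `ExponentialRetry.retry()` now accepts a filter predicate. The actual signature is not shown in this conversation, so the following is only a sketch of the general idea under assumed names: a retry helper with exponential backoff that consults a predicate to decide whether a given failure is worth retrying. It is synchronous and JDK-only for brevity; the class, method, and parameter names are hypothetical.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch: not the SDK's ExponentialRetry implementation.
class FilteredRetrySketch {

    // Runs the action up to maxAttempts times. A failure is retried only if
    // the predicate accepts it; otherwise it is rethrown immediately.
    public static <T> T retry(int maxAttempts, long baseDelayMillis,
                              Supplier<T> action, Predicate<RuntimeException> retryable) {
        long delay = baseDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                // Give up on the last attempt or when the filter rejects the error.
                if (attempt >= maxAttempts || !retryable.test(e)) {
                    throw e;
                }
                sleep(delay);
                delay *= 2; // double the wait before the next attempt
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while backing off", ie);
        }
    }
}
```

A caller might pass `e -> e instanceof java.io.UncheckedIOException` so that transient I/O failures are retried while argument errors fail fast.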
Comments suppressed due to low confidence (2)

ingest/src/main/java/com/microsoft/azure/kusto/ingest/result/IngestionStatusInTableDescription.java:70

  • [nitpick] The setter name 'setAsyncTableClient' is inconsistent with the getter 'getTableAsyncClient'. Rename the setter to 'setTableAsyncClient' for consistency.
    public void setAsyncTableClient(TableAsyncClient tableAsyncClient) {

ingest/src/main/java/com/microsoft/azure/kusto/ingest/ResourceAlgorithms.java:91

  • The logger 'log' is not defined in this class, causing a compilation error. Add a 'private static final Logger log = LoggerFactory.getLogger(ResourceAlgorithms.class);' declaration at the top.
                    log.warn(String.format("Error during attempt %d of %d for %s.", attempt, RETRY_COUNT, actionName), e);

if (ingestionStatus != null) {
return Mono.just(Collections.singletonList(ingestionStatus));
}
return Mono.empty();

Copilot AI Jun 3, 2025

Returning Mono.empty() completes without emitting a value, which differs from returning an empty list. Consider returning Mono.just(Collections.emptyList()) when ingestionStatus is null.

Suggested change
-return Mono.empty();
+return Mono.just(Collections.emptyList());
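
Why the distinction matters to a blocking caller can be sketched as follows. `Mono.block()` returns `null` when the `Mono` completes empty, so a caller that only expects a list may hit a `NullPointerException`. The sketch below models this with the JDK's `CompletableFuture`, which has no notion of an empty completion; a `null` result is used as a stand-in for what a blocking caller would see from `Mono.empty()`. All names are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: CompletableFuture stands in for Reactor's Mono.
class EmptyResultSketch {

    // Stand-in for Mono.empty(): a blocking caller ends up holding null.
    public static CompletableFuture<List<String>> statusesAsNull() {
        return CompletableFuture.completedFuture(null);
    }

    // Stand-in for Mono.just(Collections.emptyList()): the caller gets a real, empty list.
    public static CompletableFuture<List<String>> statusesAsEmptyList() {
        return CompletableFuture.completedFuture(Collections.emptyList());
    }

    // A typical blocking caller that assumes a list is always present.
    public static int countStatuses(CompletableFuture<List<String>> future) {
        List<String> statuses = future.join();
        return statuses.size(); // throws NullPointerException if statuses is null
    }
}
```

With the empty-list variant `countStatuses` returns 0, while the null variant throws, which is the kind of surprise the suggestion aims to avoid.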

@@ -129,7 +144,7 @@ public void run() {
 return null;
 }, "ResourceManager.refreshIngestionResource");
 } catch (Exception e) {
-log.error("Error in refreshIngestionResources: " + e.getMessage(), e);
+log.error("Error in refreshIngestionResources: " + e.getMessage(), e); // onerrorresume
Contributor

?


 while (statuses.get(0).status == OperationStatus.Pending && timeoutInSec > 0) {
 Thread.sleep(1000);
 timeoutInSec -= 1;
-statuses = ingestionResult.getIngestionStatusCollection();
+statuses = ingestionResult.getIngestionStatusCollection(); //TODO: this is async now? should we have a sync equivalent?
Contributor

We now have a sync version, right?

@@ -38,13 +38,13 @@ public static void main(String[] args) {
 FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("filePath"));
 ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProperties);
 }
-List<IngestionStatus> statuses = ingestionResult.getIngestionStatusCollection();
+List<IngestionStatus> statuses = ingestionResult.getIngestionStatusCollectionAsync().block(); // TODO: how to handle this
Contributor

So you can leave all the samples as they are.

@@ -27,7 +27,7 @@ public static void main(String[] args) {
 System.getProperty("appKey"),
 System.getProperty("appTenant"));
 
-CompletableFuture<IngestionResult> cf;
+CompletableFuture<IngestionResult> cf; // TODO: adjust this to use the async API instead of using CompletableFuture or not?
Contributor

I think we need to add a new one, but if this example works, keep it.

Contributor

@AsafMah AsafMah left a comment

See comments, but approved.

.map(ByteArrayInputStream::new);
}

public static Mono<byte[]> toByteArray(InputStream inputStream) {
Contributor

This method is blocking, I missed it.
It could follow the same pattern:

public static Mono<byte[]> toByteArray(InputStream inputStream) {
    return Mono.fromCallable(() -> {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[8192];
            int bytesRead;
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                byteArrayOutputStream.write(buffer, 0, bytesRead);
            }
            return byteArrayOutputStream.toByteArray();
        } finally {
            inputStream.close();
        }
    }).subscribeOn(Schedulers.boundedElastic());
}

I'll let you know once I figure out how to use Netty for this as well.

Contributor

There's also the BinaryData.fromStreamAsync method in the Azure SDK.

public static Mono<byte[]> toByteArray(InputStream inputStream) {
    return BinaryData.fromStreamAsync(inputStream)
            .map(BinaryData::toBytes);
}

Contributor

Beautiful! Just what I was looking for in my earlier comment, @ohadbitt.

3 participants