From 7299f8b06ada94802ca66d302fcd93c8c28a9c71 Mon Sep 17 00:00:00 2001 From: Robert Merget Date: Tue, 28 Apr 2026 01:18:08 +0400 Subject: [PATCH] Make partial-result handling caller-driven BulkScanWorker.handle() now accepts an optional Consumer for partial results instead of internally calling persistenceProvider.upsertPartialResult(). The framework still updates the ProgressableFuture so pollers see live state, but persistence, throttling, and any other partial-result policy now live with the caller. Worker.handleScanJob constructs the persistence callback inline. This drops the persistenceProvider dependency from BulkScanWorker, ScanConfig.createWorker, and BulkScanWorkerManager.getBulkScanWorker / handle / handleStatic. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../rub/nds/crawler/core/BulkScanWorker.java | 43 ++- .../crawler/core/BulkScanWorkerManager.java | 55 ++-- .../java/de/rub/nds/crawler/core/Worker.java | 5 +- .../de/rub/nds/crawler/data/ScanConfig.java | 17 +- .../core/BulkScanWorkerManagerTest.java | 41 +-- .../nds/crawler/core/BulkScanWorkerTest.java | 290 +++++++----------- .../dummy/DummyControllerCommandConfig.java | 6 +- .../dummy/DummyPersistenceProviderTest.java | 4 +- 8 files changed, 181 insertions(+), 280 deletions(-) diff --git a/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java b/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java index e436961b..53c0d135 100644 --- a/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java +++ b/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java @@ -11,7 +11,6 @@ import de.rub.nds.crawler.data.ScanConfig; import de.rub.nds.crawler.data.ScanJobDescription; import de.rub.nds.crawler.data.ScanTarget; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.crawler.util.CanceallableThreadPoolExecutor; import de.rub.nds.scanner.core.execution.NamedThreadFactory; import java.util.concurrent.LinkedBlockingDeque; @@ -43,9 +42,6 @@ public abstract class BulkScanWorker { /** The scan configuration for this worker */ protected final T scanConfig; - /** The persistence provider for writing partial results */ - protected final IPersistenceProvider persistenceProvider; - /** * Calls the inner scan function and may handle cleanup. This is needed to wrap the scanner into * a future object such that we can handle timeouts properly. @@ -60,16 +56,10 @@ public abstract class BulkScanWorker { * @param scanConfig The scan configuration for this worker * @param parallelScanThreads The number of parallel scan threads to use, i.e., how many {@link * ScanTarget}s to handle in parallel. - * @param persistenceProvider The persistence provider for writing partial results */ - protected BulkScanWorker( - String bulkScanId, - T scanConfig, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { + protected BulkScanWorker(String bulkScanId, T scanConfig, int parallelScanThreads) { this.bulkScanId = bulkScanId; this.scanConfig = scanConfig; - this.persistenceProvider = persistenceProvider; timeoutExecutor = new CanceallableThreadPoolExecutor( @@ -94,24 +84,33 @@ protected BulkScanWorker( *
  • Wait for the final result via {@link ProgressableFuture#get()} * * + *

    The optional {@code partialResultCallback} is invoked once for every partial result the + * scan emits. The framework does not throttle, persist, or otherwise post-process partial + * results; the caller owns that policy. Throw-safe: exceptions from the callback are logged and + * swallowed so the scan continues. + * * @param jobDescription The job description for this scan. + * @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore + * partials. * @return A ProgressableFuture representing the scan lifecycle */ - public ProgressableFuture handle(ScanJobDescription jobDescription) { + public ProgressableFuture handle( + ScanJobDescription jobDescription, Consumer partialResultCallback) { // if we initialized ourself, we also clean up ourself shouldCleanupSelf.weakCompareAndSetAcquire(false, init()); activeJobs.incrementAndGet(); ProgressableFuture progressableFuture = new ProgressableFuture<>(); - // Compose a consumer that both updates the future and persists partial results Consumer progressConsumer = partialResult -> { progressableFuture.updateResult(partialResult); - try { - persistenceProvider.upsertPartialResult(jobDescription, partialResult); - } catch (Exception e) { - LOGGER.warn("Failed to persist partial result, continuing scan", e); + if (partialResultCallback != null) { + try { + partialResultCallback.accept(partialResult); + } catch (Exception e) { + LOGGER.warn("Partial result callback threw, continuing scan", e); + } } }; @@ -132,6 +131,16 @@ public ProgressableFuture handle(ScanJobDescription jobDescription) { return progressableFuture; } + /** + * Convenience overload for callers that do not need partial results. + * + * @param jobDescription The job description for this scan. + * @return A ProgressableFuture representing the scan lifecycle + */ + public ProgressableFuture handle(ScanJobDescription jobDescription) { + return handle(jobDescription, null); + } + /** * Scans a target and returns the result as a Document. This is the core scanning functionality * that must be implemented by subclasses. diff --git a/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java b/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java index 45161938..09fcd52d 100644 --- a/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java +++ b/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java @@ -14,9 +14,9 @@ import de.rub.nds.crawler.data.BulkScanInfo; import de.rub.nds.crawler.data.ScanConfig; import de.rub.nds.crawler.data.ScanJobDescription; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.apache.commons.lang3.exception.UncheckedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -58,32 +58,33 @@ public static BulkScanWorkerManager getInstance() { /** * Static convenience method to handle a scan job. See also {@link #handle(ScanJobDescription, - * int, int, IPersistenceProvider)}. + * int, int, Consumer)}. * * @param scanJobDescription The scan job to handle * @param parallelConnectionThreads The number of parallel connection threads to use (used to * create worker if it does not exist) * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) - * @param persistenceProvider The persistence provider for writing partial results + * @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore + * partials. * @return A ProgressableFuture representing the scan lifecycle */ public static ProgressableFuture handleStatic( ScanJobDescription scanJobDescription, int parallelConnectionThreads, int parallelScanThreads, - IPersistenceProvider persistenceProvider) { + Consumer partialResultCallback) { return handleStatic( scanJobDescription, parallelConnectionThreads, parallelScanThreads, 1, - persistenceProvider); + partialResultCallback); } /** * Static convenience method to handle a scan job. See also {@link #handle(ScanJobDescription, - * int, int, int, IPersistenceProvider)}. + * int, int, int, Consumer)}. * * @param scanJobDescription The scan job to handle * @param parallelConnectionThreads The number of parallel connection threads to use (used to @@ -91,7 +92,8 @@ public static ProgressableFuture handleStatic( * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) * @param parallelProbes The number of probes to run in parallel per scan target - * @param persistenceProvider The persistence provider for writing partial results + * @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore + * partials. * @return A ProgressableFuture representing the scan lifecycle */ public static ProgressableFuture handleStatic( @@ -99,14 +101,14 @@ public static ProgressableFuture handleStatic( int parallelConnectionThreads, int parallelScanThreads, int parallelProbes, - IPersistenceProvider persistenceProvider) { + Consumer partialResultCallback) { BulkScanWorkerManager manager = getInstance(); return manager.handle( scanJobDescription, parallelConnectionThreads, parallelScanThreads, parallelProbes, - persistenceProvider); + partialResultCallback); } private final Cache> bulkScanWorkers; @@ -136,7 +138,6 @@ private BulkScanWorkerManager() { * create worker if it does not exist) * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) - * @param persistenceProvider The persistence provider for writing partial results * @return A bulk scan worker for the specified bulk scan * @throws UncheckedException If a worker cannot be created */ @@ -144,15 +145,9 @@ public BulkScanWorker getBulkScanWorker( String bulkScanId, ScanConfig scanConfig, int parallelConnectionThreads, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { + int parallelScanThreads) { return getBulkScanWorker( - bulkScanId, - scanConfig, - parallelConnectionThreads, - parallelScanThreads, - 1, - persistenceProvider); + bulkScanId, scanConfig, parallelConnectionThreads, parallelScanThreads, 1); } /** @@ -166,7 +161,6 @@ public BulkScanWorker getBulkScanWorker( * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) * @param parallelProbes The number of probes to run in parallel per scan target - * @param persistenceProvider The persistence provider for writing partial results * @return A bulk scan worker for the specified bulk scan * @throws UncheckedException If a worker cannot be created */ @@ -175,8 +169,7 @@ public BulkScanWorker getBulkScanWorker( ScanConfig scanConfig, int parallelConnectionThreads, int parallelScanThreads, - int parallelProbes, - IPersistenceProvider persistenceProvider) { + int parallelProbes) { try { return bulkScanWorkers.get( bulkScanId, @@ -186,8 +179,7 @@ public BulkScanWorker getBulkScanWorker( bulkScanId, parallelConnectionThreads, parallelScanThreads, - parallelProbes, - persistenceProvider); + parallelProbes); ret.init(); return ret; }); @@ -206,20 +198,21 @@ public BulkScanWorker getBulkScanWorker( * create worker if it does not exist) * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) - * @param persistenceProvider The persistence provider for writing partial results + * @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore + * partials. * @return A ProgressableFuture representing the scan lifecycle */ public ProgressableFuture handle( ScanJobDescription scanJobDescription, int parallelConnectionThreads, int parallelScanThreads, - IPersistenceProvider persistenceProvider) { + Consumer partialResultCallback) { return handle( scanJobDescription, parallelConnectionThreads, parallelScanThreads, 1, - persistenceProvider); + partialResultCallback); } /** @@ -232,7 +225,8 @@ public ProgressableFuture handle( * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) * @param parallelProbes The number of probes to run in parallel per scan target - * @param persistenceProvider The persistence provider for writing partial results + * @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore + * partials. * @return A ProgressableFuture representing the scan lifecycle */ public ProgressableFuture handle( @@ -240,7 +234,7 @@ public ProgressableFuture handle( int parallelConnectionThreads, int parallelScanThreads, int parallelProbes, - IPersistenceProvider persistenceProvider) { + Consumer partialResultCallback) { BulkScanInfo bulkScanInfo = scanJobDescription.getBulkScanInfo(); BulkScanWorker worker = getBulkScanWorker( @@ -248,8 +242,7 @@ public ProgressableFuture handle( bulkScanInfo.getScanConfig(), parallelConnectionThreads, parallelScanThreads, - parallelProbes, - persistenceProvider); - return worker.handle(scanJobDescription); + parallelProbes); + return worker.handle(scanJobDescription, partialResultCallback); } } diff --git a/src/main/java/de/rub/nds/crawler/core/Worker.java b/src/main/java/de/rub/nds/crawler/core/Worker.java index 69200395..8a54d794 100644 --- a/src/main/java/de/rub/nds/crawler/core/Worker.java +++ b/src/main/java/de/rub/nds/crawler/core/Worker.java @@ -16,6 +16,7 @@ import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.scanner.core.execution.NamedThreadFactory; import java.util.concurrent.*; +import java.util.function.Consumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bson.Document; @@ -94,13 +95,15 @@ private ScanResult waitForScanResult( private void handleScanJob(ScanJobDescription scanJobDescription) { LOGGER.info("Received scan job for {}", scanJobDescription.getScanTarget()); + Consumer partialResultCallback = + partial -> persistenceProvider.upsertPartialResult(scanJobDescription, partial); ProgressableFuture progressableFuture = BulkScanWorkerManager.handleStatic( scanJobDescription, parallelConnectionThreads, parallelScanThreads, parallelProbes, - persistenceProvider); + partialResultCallback); workerExecutor.submit( () -> { diff --git a/src/main/java/de/rub/nds/crawler/data/ScanConfig.java b/src/main/java/de/rub/nds/crawler/data/ScanConfig.java index 1ded351e..dec85156 100644 --- a/src/main/java/de/rub/nds/crawler/data/ScanConfig.java +++ b/src/main/java/de/rub/nds/crawler/data/ScanConfig.java @@ -9,7 +9,6 @@ package de.rub.nds.crawler.data; import de.rub.nds.crawler.core.BulkScanWorker; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.scanner.core.config.ScannerDetail; import de.rub.nds.scanner.core.probe.ProbeType; import java.io.Serializable; @@ -123,35 +122,27 @@ public void setExcludedProbes(List excludedProbes) { * @param bulkScanID The ID of the bulk scan this worker is for * @param parallelConnectionThreads The number of parallel connection threads to use * @param parallelScanThreads The number of parallel scan threads to use - * @param persistenceProvider The persistence provider for writing partial results * @return A worker for this scan configuration */ public abstract BulkScanWorker createWorker( - String bulkScanID, - int parallelConnectionThreads, - int parallelScanThreads, - IPersistenceProvider persistenceProvider); + String bulkScanID, int parallelConnectionThreads, int parallelScanThreads); /** * Creates a worker for this scan configuration and configures probe-level parallelism. * Implementations can override this method to support probe-level parallelism directly. Default - * behavior delegates to {@link #createWorker(String, int, int, IPersistenceProvider)} for - * backward compatibility. + * behavior delegates to {@link #createWorker(String, int, int)}. * * @param bulkScanID The ID of the bulk scan this worker is for * @param parallelConnectionThreads The number of parallel connection threads to use * @param parallelScanThreads The number of parallel scan threads to use * @param parallelProbes The number of probes to run in parallel per scan target - * @param persistenceProvider The persistence provider for writing partial results * @return A worker for this scan configuration */ public BulkScanWorker createWorker( String bulkScanID, int parallelConnectionThreads, int parallelScanThreads, - int parallelProbes, - IPersistenceProvider persistenceProvider) { - return createWorker( - bulkScanID, parallelConnectionThreads, parallelScanThreads, persistenceProvider); + int parallelProbes) { + return createWorker(bulkScanID, parallelConnectionThreads, parallelScanThreads); } } diff --git a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerManagerTest.java b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerManagerTest.java index 7c8bcfa0..751884ba 100644 --- a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerManagerTest.java +++ b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerManagerTest.java @@ -12,8 +12,6 @@ import de.rub.nds.crawler.data.ScanConfig; import de.rub.nds.crawler.data.ScanJobDescription; -import de.rub.nds.crawler.dummy.DummyPersistenceProvider; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.scanner.core.config.ScannerDetail; import java.io.Serializable; import java.util.function.Consumer; @@ -35,12 +33,8 @@ int getCapturedParallelProbes() { @Override public BulkScanWorker createWorker( - String bulkScanID, - int parallelConnectionThreads, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { - return new CapturingBulkScanWorker( - bulkScanID, this, parallelScanThreads, persistenceProvider); + String bulkScanID, int parallelConnectionThreads, int parallelScanThreads) { + return new CapturingBulkScanWorker(bulkScanID, this, parallelScanThreads); } @Override @@ -48,25 +42,17 @@ public BulkScanWorker createWorker( String bulkScanID, int parallelConnectionThreads, int parallelScanThreads, - int parallelProbes, - IPersistenceProvider persistenceProvider) { + int parallelProbes) { capturedParallelProbes = parallelProbes; - return createWorker( - bulkScanID, - parallelConnectionThreads, - parallelScanThreads, - persistenceProvider); + return createWorker(bulkScanID, parallelConnectionThreads, parallelScanThreads); } } static class CapturingBulkScanWorker extends BulkScanWorker { CapturingBulkScanWorker( - String bulkScanId, - CapturingScanConfig scanConfig, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { - super(bulkScanId, scanConfig, parallelScanThreads, persistenceProvider); + String bulkScanId, CapturingScanConfig scanConfig, int parallelScanThreads) { + super(bulkScanId, scanConfig, parallelScanThreads); } @Override @@ -86,13 +72,7 @@ protected void cleanupInternal() {} void getBulkScanWorkerPropagatesParallelProbes() { CapturingScanConfig scanConfig = new CapturingScanConfig(); BulkScanWorkerManager.getInstance() - .getBulkScanWorker( - "bulk-scan-" + System.nanoTime(), - scanConfig, - 5, - 2, - 9, - new DummyPersistenceProvider()); + .getBulkScanWorker("bulk-scan-" + System.nanoTime(), scanConfig, 5, 2, 9); assertEquals(9, scanConfig.getCapturedParallelProbes()); } @@ -101,12 +81,7 @@ void getBulkScanWorkerPropagatesParallelProbes() { void oldGetBulkScanWorkerSignatureDefaultsParallelProbesToOne() { CapturingScanConfig scanConfig = new CapturingScanConfig(); BulkScanWorkerManager.getInstance() - .getBulkScanWorker( - "bulk-scan-" + System.nanoTime(), - scanConfig, - 5, - 2, - new DummyPersistenceProvider()); + .getBulkScanWorker("bulk-scan-" + System.nanoTime(), scanConfig, 5, 2); assertEquals(1, scanConfig.getCapturedParallelProbes()); } diff --git a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java index 35b1dd38..1c08e1e9 100644 --- a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java +++ b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java @@ -17,10 +17,10 @@ import de.rub.nds.crawler.data.ScanResult; import de.rub.nds.crawler.data.ScanTarget; import de.rub.nds.crawler.dummy.DummyPersistenceProvider; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; import org.bson.Document; import org.junit.jupiter.api.Test; @@ -35,12 +35,8 @@ public TestScanConfig() { @Override public BulkScanWorker createWorker( - String bulkScanID, - int parallelConnectionThreads, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { - return new TestBulkScanWorker( - bulkScanID, this, parallelScanThreads, persistenceProvider); + String bulkScanID, int parallelConnectionThreads, int parallelScanThreads) { + return new TestBulkScanWorker(bulkScanID, this, parallelScanThreads); } } @@ -49,13 +45,10 @@ static class TestBulkScanWorker extends BulkScanWorker { private boolean initCalled = false; private boolean cleanupCalled = false; private ScanJobDescription capturedJobDescription = null; + private final List emittedPartials = new CopyOnWriteArrayList<>(); - TestBulkScanWorker( - String bulkScanId, - TestScanConfig scanConfig, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { - super(bulkScanId, scanConfig, parallelScanThreads, persistenceProvider); + TestBulkScanWorker(String bulkScanId, TestScanConfig scanConfig, int parallelScanThreads) { + super(bulkScanId, scanConfig, parallelScanThreads); } @Override @@ -65,6 +58,12 @@ public Document scan( capturedJobDescription = jobDescription; ScanTarget scanTarget = jobDescription.getScanTarget(); + Document partial = new Document(); + partial.put("phase", "in-progress"); + partial.put("target", scanTarget.getIp()); + emittedPartials.add(partial); + progressConsumer.accept(partial); + Document result = new Document(); result.put("target", scanTarget.getIp()); result.put("hasJobDescription", jobDescription != null); @@ -97,27 +96,9 @@ public ScanJobDescription getCapturedJobDescription() { } } - @Test - void testGetCurrentJobDescriptionReturnsNullOutsideScanContext() { - TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 1, new DummyPersistenceProvider()); - - // getCurrentJobDescription() is protected, so we can't call it directly from test - // But we can verify through the scan() method that it returns null when not in context - assertNull( - worker.getCapturedJobDescription(), - "Job description should be null before any scan"); - } - - @Test - void testGetCurrentJobDescriptionReturnsCorrectJobInScanContext() throws Exception { - TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 1, new DummyPersistenceProvider()); - + private static ScanJobDescription newJob(TestScanConfig config, String ip) { ScanTarget target = new ScanTarget(); - target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) + target.setIp(ip); target.setPort(443); BulkScan bulkScan = @@ -130,107 +111,138 @@ void testGetCurrentJobDescriptionReturnsCorrectJobInScanContext() throws Excepti false, null); - ScanJobDescription jobDescription = - new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + return new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + } + + @Test + void testGetCurrentJobDescriptionReturnsNullOutsideScanContext() { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + assertNull( + worker.getCapturedJobDescription(), + "Job description should be null before any scan"); + } + + @Test + void testGetCurrentJobDescriptionReturnsCorrectJobInScanContext() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); // TEST-NET-1 (RFC 5737) - // Execute the scan ProgressableFuture future = worker.handle(jobDescription); Document result = future.get(); - // Verify the job description was available during scan assertTrue( result.getBoolean("hasJobDescription"), "Job description should be available in scan context"); assertEquals(jobDescription.getId().toString(), result.getString("jobId")); - // Verify the captured job description matches assertNotNull(worker.getCapturedJobDescription()); assertEquals(jobDescription.getId(), worker.getCapturedJobDescription().getId()); - assertEquals(target, worker.getCapturedJobDescription().getScanTarget()); + assertEquals( + jobDescription.getScanTarget(), worker.getCapturedJobDescription().getScanTarget()); - // Simulate the partial results persistence flow + // Simulate the persistence flow DummyPersistenceProvider persistenceProvider = new DummyPersistenceProvider(); - - // Update job status to SUCCESS (required by ScanResult constructor) jobDescription.setStatus(JobStatus.SUCCESS); - - // Create ScanResult from the scan result Document and job description ScanResult scanResult = new ScanResult(jobDescription, result); - // Verify ScanResult has the correct scanJobDescriptionId assertEquals( jobDescription.getId().toString(), scanResult.getScanJobDescriptionId(), "ScanResult should use job description UUID as scanJobDescriptionId"); - // Simulate persisting to MongoDB persistenceProvider.insertScanResult(scanResult, jobDescription); - // Simulate retrieving from MongoDB by scanJobDescriptionId ScanResult retrievedResult = persistenceProvider.getScanResultByScanJobDescriptionId( "test-db", "test-collection", jobDescription.getId().toString()); - // Verify the retrieved result matches assertNotNull( retrievedResult, "Should be able to retrieve ScanResult by job description ID"); - assertEquals( - jobDescription.getId().toString(), - retrievedResult.getScanJobDescriptionId(), - "Retrieved result should have matching scanJobDescriptionId"); - assertEquals( - scanResult.getBulkScan(), - retrievedResult.getBulkScan(), - "Retrieved result should have matching bulk scan ID"); - assertEquals( - scanResult.getScanTarget(), - retrievedResult.getScanTarget(), - "Retrieved result should have matching scan target"); - assertEquals( - scanResult.getResult(), - retrievedResult.getResult(), - "Retrieved result should have matching result document"); + assertEquals(jobDescription.getId().toString(), retrievedResult.getScanJobDescriptionId()); + assertEquals(scanResult.getBulkScan(), retrievedResult.getBulkScan()); + assertEquals(scanResult.getScanTarget(), retrievedResult.getScanTarget()); + assertEquals(scanResult.getResult(), retrievedResult.getResult()); } @Test - void testThreadLocalIsCleanedUpAfterScan() throws Exception { + void testPartialResultCallbackReceivesEmittedPartials() throws Exception { TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 1, new DummyPersistenceProvider()); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); - ScanTarget target = new ScanTarget(); - target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) - target.setPort(443); + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); - BulkScan bulkScan = - new BulkScan( - BulkScanWorkerTest.class, - BulkScanWorkerTest.class, - "test-db", - config, - System.currentTimeMillis(), - false, - null); + List received = new CopyOnWriteArrayList<>(); + ProgressableFuture future = worker.handle(jobDescription, received::add); + future.get(); - ScanJobDescription jobDescription = - new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + assertEquals(1, received.size(), "Caller callback should see the partial emitted by scan"); + assertEquals("in-progress", received.get(0).getString("phase")); + } - // Execute the scan + @Test + void testNullPartialResultCallbackIsAllowed() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); + + // Null callback must not throw and the scan must still complete + ProgressableFuture future = worker.handle(jobDescription, null); + Document result = future.get(); + assertNotNull(result); + } + + @Test + void testPartialResultsStillUpdateProgressableFuture() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); + + // No callback supplied; partials should still flow into the future for pollers ProgressableFuture future = worker.handle(jobDescription); - future.get(); // Wait for completion + future.get(); + // After completion, getCurrentResult is the final result; just assert non-null + assertNotNull(future.getCurrentResult()); + } + + @Test + void testCallbackExceptionDoesNotKillScan() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); - // After scan completes, verify we can run another scan - ScanTarget newTarget = new ScanTarget(); - newTarget.setIp("192.0.2.2"); // TEST-NET-1 (RFC 5737) - newTarget.setPort(443); + Consumer throwingCallback = + p -> { + throw new RuntimeException("intentional"); + }; + ProgressableFuture future = worker.handle(jobDescription, throwingCallback); - ScanJobDescription newJobDescription = - new ScanJobDescription(newTarget, bulkScan, JobStatus.TO_BE_EXECUTED); + Document result = future.get(); + assertNotNull(result, "Scan should complete even when callback throws"); + assertTrue(result.getBoolean("hasJobDescription")); + } + + @Test + void testThreadLocalIsCleanedUpAfterScan() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); + + ProgressableFuture future = worker.handle(jobDescription); + future.get(); + + ScanJobDescription newJobDescription = newJob(config, "192.0.2.2"); ProgressableFuture future2 = worker.handle(newJobDescription); Document result2 = future2.get(); - // The second scan should have the second job description, not the first assertEquals(newJobDescription.getId().toString(), result2.getString("jobId")); assertEquals(newJobDescription.getId(), worker.getCapturedJobDescription().getId()); } @@ -238,36 +250,17 @@ void testThreadLocalIsCleanedUpAfterScan() throws Exception { @Test void testMultipleConcurrentScansHaveSeparateContexts() throws Exception { TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 2, new DummyPersistenceProvider()); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 2); - BulkScan bulkScan = - new BulkScan( - BulkScanWorkerTest.class, - BulkScanWorkerTest.class, - "test-db", - config, - System.currentTimeMillis(), - false, - null); - - // Create multiple job descriptions List jobDescriptions = new ArrayList<>(); List> futures = new ArrayList<>(); for (int i = 0; i < 5; i++) { - ScanTarget target = new ScanTarget(); - target.setIp("192.0.2." + (i + 1)); // TEST-NET-1 (RFC 5737) - target.setPort(443); - - ScanJobDescription jobDescription = - new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + ScanJobDescription jobDescription = newJob(config, "192.0.2." + (i + 1)); jobDescriptions.add(jobDescription); - futures.add(worker.handle(jobDescription)); } - // Wait for all scans to complete and verify each got the correct job description for (int i = 0; i < 5; i++) { Document result = futures.get(i).get(); assertTrue(result.getBoolean("hasJobDescription")); @@ -281,30 +274,12 @@ void testMultipleConcurrentScansHaveSeparateContexts() throws Exception { @Test void testInitializationIsCalledOnFirstHandle() throws Exception { TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 1, new DummyPersistenceProvider()); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); assertFalse(worker.isInitCalled(), "Init should not be called before first handle"); - ScanTarget target = new ScanTarget(); - target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) - target.setPort(443); - - BulkScan bulkScan = - new BulkScan( - BulkScanWorkerTest.class, - BulkScanWorkerTest.class, - "test-db", - config, - System.currentTimeMillis(), - false, - null); - - ScanJobDescription jobDescription = - new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); - - ProgressableFuture future = worker.handle(jobDescription); - future.get(); + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); + worker.handle(jobDescription).get(); assertTrue(worker.isInitCalled(), "Init should be called on first handle"); } @@ -312,28 +287,10 @@ void testInitializationIsCalledOnFirstHandle() throws Exception { @Test void testCleanupIsCalledWhenAllJobsComplete() throws Exception { TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 1, new DummyPersistenceProvider()); - - ScanTarget target = new ScanTarget(); - target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) - target.setPort(443); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); - BulkScan bulkScan = - new BulkScan( - BulkScanWorkerTest.class, - BulkScanWorkerTest.class, - "test-db", - config, - System.currentTimeMillis(), - false, - null); - - ScanJobDescription jobDescription = - new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); - - ProgressableFuture future = worker.handle(jobDescription); - future.get(); + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); + worker.handle(jobDescription).get(); // Give cleanup a moment to execute (it runs after job completion) Thread.sleep(100); @@ -344,41 +301,20 @@ void testCleanupIsCalledWhenAllJobsComplete() throws Exception { @Test void testManualInitPreventsSelfCleanup() throws Exception { TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 1, new DummyPersistenceProvider()); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); - // Call init manually worker.init(); assertTrue(worker.isInitCalled(), "Init should be called"); - ScanTarget target = new ScanTarget(); - target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) - target.setPort(443); - - BulkScan bulkScan = - new BulkScan( - BulkScanWorkerTest.class, - BulkScanWorkerTest.class, - "test-db", - config, - System.currentTimeMillis(), - false, - null); - - ScanJobDescription jobDescription = - new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); - - ProgressableFuture future = worker.handle(jobDescription); - future.get(); + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); + worker.handle(jobDescription).get(); - // Give cleanup a moment (if it were to execute) Thread.sleep(100); assertFalse( worker.isCleanupCalled(), "Cleanup should NOT be called when init was manual (shouldCleanupSelf = false)"); - // Cleanup should only be called when we explicitly call it worker.cleanup(); assertTrue(worker.isCleanupCalled(), "Cleanup should be called when explicitly called"); } diff --git a/src/test/java/de/rub/nds/crawler/dummy/DummyControllerCommandConfig.java b/src/test/java/de/rub/nds/crawler/dummy/DummyControllerCommandConfig.java index 0b01d9e3..0c4f28dd 100644 --- a/src/test/java/de/rub/nds/crawler/dummy/DummyControllerCommandConfig.java +++ b/src/test/java/de/rub/nds/crawler/dummy/DummyControllerCommandConfig.java @@ -11,7 +11,6 @@ import de.rub.nds.crawler.config.ControllerCommandConfig; import de.rub.nds.crawler.core.BulkScanWorker; import de.rub.nds.crawler.data.ScanConfig; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.scanner.core.config.ScannerDetail; public class DummyControllerCommandConfig extends ControllerCommandConfig { @@ -21,10 +20,7 @@ public ScanConfig getScanConfig() { return new ScanConfig(ScannerDetail.NORMAL, 1, 1, getExcludedProbes()) { @Override public BulkScanWorker createWorker( - String bulkScanID, - int parallelConnectionThreads, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { + String bulkScanID, int parallelConnectionThreads, int parallelScanThreads) { return null; } }; diff --git a/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProviderTest.java b/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProviderTest.java index c31af5dc..2dc8740f 100644 --- a/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProviderTest.java +++ b/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProviderTest.java @@ -17,7 +17,6 @@ import de.rub.nds.crawler.data.ScanJobDescription; import de.rub.nds.crawler.data.ScanResult; import de.rub.nds.crawler.data.ScanTarget; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.scanner.core.config.ScannerDetail; import org.bson.Document; import org.junit.jupiter.api.BeforeEach; @@ -39,8 +38,7 @@ void setUp() { public BulkScanWorker createWorker( String bulkScanID, int parallelConnectionThreads, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { + int parallelScanThreads) { return null; } };