diff --git a/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java b/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java index e436961b..53c0d135 100644 --- a/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java +++ b/src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java @@ -11,7 +11,6 @@ import de.rub.nds.crawler.data.ScanConfig; import de.rub.nds.crawler.data.ScanJobDescription; import de.rub.nds.crawler.data.ScanTarget; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.crawler.util.CanceallableThreadPoolExecutor; import de.rub.nds.scanner.core.execution.NamedThreadFactory; import java.util.concurrent.LinkedBlockingDeque; @@ -43,9 +42,6 @@ public abstract class BulkScanWorker { /** The scan configuration for this worker */ protected final T scanConfig; - /** The persistence provider for writing partial results */ - protected final IPersistenceProvider persistenceProvider; - /** * Calls the inner scan function and may handle cleanup. This is needed to wrap the scanner into * a future object such that we can handle timeouts properly. @@ -60,16 +56,10 @@ public abstract class BulkScanWorker { * @param scanConfig The scan configuration for this worker * @param parallelScanThreads The number of parallel scan threads to use, i.e., how many {@link * ScanTarget}s to handle in parallel. - * @param persistenceProvider The persistence provider for writing partial results */ - protected BulkScanWorker( - String bulkScanId, - T scanConfig, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { + protected BulkScanWorker(String bulkScanId, T scanConfig, int parallelScanThreads) { this.bulkScanId = bulkScanId; this.scanConfig = scanConfig; - this.persistenceProvider = persistenceProvider; timeoutExecutor = new CanceallableThreadPoolExecutor( @@ -94,24 +84,33 @@ protected BulkScanWorker( *
  • Wait for the final result via {@link ProgressableFuture#get()} * * + *

    The optional {@code partialResultCallback} is invoked once for every partial result the + * scan emits. The framework does not throttle, persist, or otherwise post-process partial + * results; the caller owns that policy. Throw-safe: exceptions from the callback are logged and + * swallowed so the scan continues. + * * @param jobDescription The job description for this scan. + * @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore + * partials. * @return A ProgressableFuture representing the scan lifecycle */ - public ProgressableFuture handle(ScanJobDescription jobDescription) { + public ProgressableFuture handle( + ScanJobDescription jobDescription, Consumer partialResultCallback) { // if we initialized ourself, we also clean up ourself shouldCleanupSelf.weakCompareAndSetAcquire(false, init()); activeJobs.incrementAndGet(); ProgressableFuture progressableFuture = new ProgressableFuture<>(); - // Compose a consumer that both updates the future and persists partial results Consumer progressConsumer = partialResult -> { progressableFuture.updateResult(partialResult); - try { - persistenceProvider.upsertPartialResult(jobDescription, partialResult); - } catch (Exception e) { - LOGGER.warn("Failed to persist partial result, continuing scan", e); + if (partialResultCallback != null) { + try { + partialResultCallback.accept(partialResult); + } catch (Exception e) { + LOGGER.warn("Partial result callback threw, continuing scan", e); + } } }; @@ -132,6 +131,16 @@ public ProgressableFuture handle(ScanJobDescription jobDescription) { return progressableFuture; } + /** + * Convenience overload for callers that do not need partial results. + * + * @param jobDescription The job description for this scan. + * @return A ProgressableFuture representing the scan lifecycle + */ + public ProgressableFuture handle(ScanJobDescription jobDescription) { + return handle(jobDescription, null); + } + /** * Scans a target and returns the result as a Document. This is the core scanning functionality * that must be implemented by subclasses. diff --git a/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java b/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java index 45161938..09fcd52d 100644 --- a/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java +++ b/src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java @@ -14,9 +14,9 @@ import de.rub.nds.crawler.data.BulkScanInfo; import de.rub.nds.crawler.data.ScanConfig; import de.rub.nds.crawler.data.ScanJobDescription; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.apache.commons.lang3.exception.UncheckedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -58,32 +58,33 @@ public static BulkScanWorkerManager getInstance() { /** * Static convenience method to handle a scan job. See also {@link #handle(ScanJobDescription, - * int, int, IPersistenceProvider)}. + * int, int, Consumer)}. * * @param scanJobDescription The scan job to handle * @param parallelConnectionThreads The number of parallel connection threads to use (used to * create worker if it does not exist) * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) - * @param persistenceProvider The persistence provider for writing partial results + * @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore + * partials. * @return A ProgressableFuture representing the scan lifecycle */ public static ProgressableFuture handleStatic( ScanJobDescription scanJobDescription, int parallelConnectionThreads, int parallelScanThreads, - IPersistenceProvider persistenceProvider) { + Consumer partialResultCallback) { return handleStatic( scanJobDescription, parallelConnectionThreads, parallelScanThreads, 1, - persistenceProvider); + partialResultCallback); } /** * Static convenience method to handle a scan job. See also {@link #handle(ScanJobDescription, - * int, int, int, IPersistenceProvider)}. + * int, int, int, Consumer)}. * * @param scanJobDescription The scan job to handle * @param parallelConnectionThreads The number of parallel connection threads to use (used to @@ -91,7 +92,8 @@ public static ProgressableFuture handleStatic( * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) * @param parallelProbes The number of probes to run in parallel per scan target - * @param persistenceProvider The persistence provider for writing partial results + * @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore + * partials. * @return A ProgressableFuture representing the scan lifecycle */ public static ProgressableFuture handleStatic( @@ -99,14 +101,14 @@ public static ProgressableFuture handleStatic( int parallelConnectionThreads, int parallelScanThreads, int parallelProbes, - IPersistenceProvider persistenceProvider) { + Consumer partialResultCallback) { BulkScanWorkerManager manager = getInstance(); return manager.handle( scanJobDescription, parallelConnectionThreads, parallelScanThreads, parallelProbes, - persistenceProvider); + partialResultCallback); } private final Cache> bulkScanWorkers; @@ -136,7 +138,6 @@ private BulkScanWorkerManager() { * create worker if it does not exist) * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) - * @param persistenceProvider The persistence provider for writing partial results * @return A bulk scan worker for the specified bulk scan * @throws UncheckedException If a worker cannot be created */ @@ -144,15 +145,9 @@ public BulkScanWorker getBulkScanWorker( String bulkScanId, ScanConfig scanConfig, int parallelConnectionThreads, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { + int parallelScanThreads) { return getBulkScanWorker( - bulkScanId, - scanConfig, - parallelConnectionThreads, - parallelScanThreads, - 1, - persistenceProvider); + bulkScanId, scanConfig, parallelConnectionThreads, parallelScanThreads, 1); } /** @@ -166,7 +161,6 @@ public BulkScanWorker getBulkScanWorker( * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) * @param parallelProbes The number of probes to run in parallel per scan target - * @param persistenceProvider The persistence provider for writing partial results * @return A bulk scan worker for the specified bulk scan * @throws UncheckedException If a worker cannot be created */ @@ -175,8 +169,7 @@ public BulkScanWorker getBulkScanWorker( ScanConfig scanConfig, int parallelConnectionThreads, int parallelScanThreads, - int parallelProbes, - IPersistenceProvider persistenceProvider) { + int parallelProbes) { try { return bulkScanWorkers.get( bulkScanId, @@ -186,8 +179,7 @@ public BulkScanWorker getBulkScanWorker( bulkScanId, parallelConnectionThreads, parallelScanThreads, - parallelProbes, - persistenceProvider); + parallelProbes); ret.init(); return ret; }); @@ -206,20 +198,21 @@ public BulkScanWorker getBulkScanWorker( * create worker if it does not exist) * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) - * @param persistenceProvider The persistence provider for writing partial results + * @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore + * partials. * @return A ProgressableFuture representing the scan lifecycle */ public ProgressableFuture handle( ScanJobDescription scanJobDescription, int parallelConnectionThreads, int parallelScanThreads, - IPersistenceProvider persistenceProvider) { + Consumer partialResultCallback) { return handle( scanJobDescription, parallelConnectionThreads, parallelScanThreads, 1, - persistenceProvider); + partialResultCallback); } /** @@ -232,7 +225,8 @@ public ProgressableFuture handle( * @param parallelScanThreads The number of parallel scan threads to use (used to create worker * if it does not exist) * @param parallelProbes The number of probes to run in parallel per scan target - * @param persistenceProvider The persistence provider for writing partial results + * @param partialResultCallback Optional consumer for partial results, or {@code null} to ignore + * partials. * @return A ProgressableFuture representing the scan lifecycle */ public ProgressableFuture handle( @@ -240,7 +234,7 @@ public ProgressableFuture handle( int parallelConnectionThreads, int parallelScanThreads, int parallelProbes, - IPersistenceProvider persistenceProvider) { + Consumer partialResultCallback) { BulkScanInfo bulkScanInfo = scanJobDescription.getBulkScanInfo(); BulkScanWorker worker = getBulkScanWorker( @@ -248,8 +242,7 @@ public ProgressableFuture handle( bulkScanInfo.getScanConfig(), parallelConnectionThreads, parallelScanThreads, - parallelProbes, - persistenceProvider); - return worker.handle(scanJobDescription); + parallelProbes); + return worker.handle(scanJobDescription, partialResultCallback); } } diff --git a/src/main/java/de/rub/nds/crawler/core/Worker.java b/src/main/java/de/rub/nds/crawler/core/Worker.java index 69200395..8a54d794 100644 --- a/src/main/java/de/rub/nds/crawler/core/Worker.java +++ b/src/main/java/de/rub/nds/crawler/core/Worker.java @@ -16,6 +16,7 @@ import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.scanner.core.execution.NamedThreadFactory; import java.util.concurrent.*; +import java.util.function.Consumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bson.Document; @@ -94,13 +95,15 @@ private ScanResult waitForScanResult( private void handleScanJob(ScanJobDescription scanJobDescription) { LOGGER.info("Received scan job for {}", scanJobDescription.getScanTarget()); + Consumer partialResultCallback = + partial -> persistenceProvider.upsertPartialResult(scanJobDescription, partial); ProgressableFuture progressableFuture = BulkScanWorkerManager.handleStatic( scanJobDescription, parallelConnectionThreads, parallelScanThreads, parallelProbes, - persistenceProvider); + partialResultCallback); workerExecutor.submit( () -> { diff --git a/src/main/java/de/rub/nds/crawler/data/ScanConfig.java b/src/main/java/de/rub/nds/crawler/data/ScanConfig.java index 1ded351e..dec85156 100644 --- a/src/main/java/de/rub/nds/crawler/data/ScanConfig.java +++ b/src/main/java/de/rub/nds/crawler/data/ScanConfig.java @@ -9,7 +9,6 @@ package de.rub.nds.crawler.data; import de.rub.nds.crawler.core.BulkScanWorker; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.scanner.core.config.ScannerDetail; import de.rub.nds.scanner.core.probe.ProbeType; import java.io.Serializable; @@ -123,35 +122,27 @@ public void setExcludedProbes(List excludedProbes) { * @param bulkScanID The ID of the bulk scan this worker is for * @param parallelConnectionThreads The number of parallel connection threads to use * @param parallelScanThreads The number of parallel scan threads to use - * @param persistenceProvider The persistence provider for writing partial results * @return A worker for this scan configuration */ public abstract BulkScanWorker createWorker( - String bulkScanID, - int parallelConnectionThreads, - int parallelScanThreads, - IPersistenceProvider persistenceProvider); + String bulkScanID, int parallelConnectionThreads, int parallelScanThreads); /** * Creates a worker for this scan configuration and configures probe-level parallelism. * Implementations can override this method to support probe-level parallelism directly. Default - * behavior delegates to {@link #createWorker(String, int, int, IPersistenceProvider)} for - * backward compatibility. + * behavior delegates to {@link #createWorker(String, int, int)}. * * @param bulkScanID The ID of the bulk scan this worker is for * @param parallelConnectionThreads The number of parallel connection threads to use * @param parallelScanThreads The number of parallel scan threads to use * @param parallelProbes The number of probes to run in parallel per scan target - * @param persistenceProvider The persistence provider for writing partial results * @return A worker for this scan configuration */ public BulkScanWorker createWorker( String bulkScanID, int parallelConnectionThreads, int parallelScanThreads, - int parallelProbes, - IPersistenceProvider persistenceProvider) { - return createWorker( - bulkScanID, parallelConnectionThreads, parallelScanThreads, persistenceProvider); + int parallelProbes) { + return createWorker(bulkScanID, parallelConnectionThreads, parallelScanThreads); } } diff --git a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerManagerTest.java b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerManagerTest.java index 7c8bcfa0..751884ba 100644 --- a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerManagerTest.java +++ b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerManagerTest.java @@ -12,8 +12,6 @@ import de.rub.nds.crawler.data.ScanConfig; import de.rub.nds.crawler.data.ScanJobDescription; -import de.rub.nds.crawler.dummy.DummyPersistenceProvider; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.scanner.core.config.ScannerDetail; import java.io.Serializable; import java.util.function.Consumer; @@ -35,12 +33,8 @@ int getCapturedParallelProbes() { @Override public BulkScanWorker createWorker( - String bulkScanID, - int parallelConnectionThreads, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { - return new CapturingBulkScanWorker( - bulkScanID, this, parallelScanThreads, persistenceProvider); + String bulkScanID, int parallelConnectionThreads, int parallelScanThreads) { + return new CapturingBulkScanWorker(bulkScanID, this, parallelScanThreads); } @Override @@ -48,25 +42,17 @@ public BulkScanWorker createWorker( String bulkScanID, int parallelConnectionThreads, int parallelScanThreads, - int parallelProbes, - IPersistenceProvider persistenceProvider) { + int parallelProbes) { capturedParallelProbes = parallelProbes; - return createWorker( - bulkScanID, - parallelConnectionThreads, - parallelScanThreads, - persistenceProvider); + return createWorker(bulkScanID, parallelConnectionThreads, parallelScanThreads); } } static class CapturingBulkScanWorker extends BulkScanWorker { CapturingBulkScanWorker( - String bulkScanId, - CapturingScanConfig scanConfig, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { - super(bulkScanId, scanConfig, parallelScanThreads, persistenceProvider); + String bulkScanId, CapturingScanConfig scanConfig, int parallelScanThreads) { + super(bulkScanId, scanConfig, parallelScanThreads); } @Override @@ -86,13 +72,7 @@ protected void cleanupInternal() {} void getBulkScanWorkerPropagatesParallelProbes() { CapturingScanConfig scanConfig = new CapturingScanConfig(); BulkScanWorkerManager.getInstance() - .getBulkScanWorker( - "bulk-scan-" + System.nanoTime(), - scanConfig, - 5, - 2, - 9, - new DummyPersistenceProvider()); + .getBulkScanWorker("bulk-scan-" + System.nanoTime(), scanConfig, 5, 2, 9); assertEquals(9, scanConfig.getCapturedParallelProbes()); } @@ -101,12 +81,7 @@ void getBulkScanWorkerPropagatesParallelProbes() { void oldGetBulkScanWorkerSignatureDefaultsParallelProbesToOne() { CapturingScanConfig scanConfig = new CapturingScanConfig(); BulkScanWorkerManager.getInstance() - .getBulkScanWorker( - "bulk-scan-" + System.nanoTime(), - scanConfig, - 5, - 2, - new DummyPersistenceProvider()); + .getBulkScanWorker("bulk-scan-" + System.nanoTime(), scanConfig, 5, 2); assertEquals(1, scanConfig.getCapturedParallelProbes()); } diff --git a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java index 35b1dd38..1c08e1e9 100644 --- a/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java +++ b/src/test/java/de/rub/nds/crawler/core/BulkScanWorkerTest.java @@ -17,10 +17,10 @@ import de.rub.nds.crawler.data.ScanResult; import de.rub.nds.crawler.data.ScanTarget; import de.rub.nds.crawler.dummy.DummyPersistenceProvider; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; import org.bson.Document; import org.junit.jupiter.api.Test; @@ -35,12 +35,8 @@ public TestScanConfig() { @Override public BulkScanWorker createWorker( - String bulkScanID, - int parallelConnectionThreads, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { - return new TestBulkScanWorker( - bulkScanID, this, parallelScanThreads, persistenceProvider); + String bulkScanID, int parallelConnectionThreads, int parallelScanThreads) { + return new TestBulkScanWorker(bulkScanID, this, parallelScanThreads); } } @@ -49,13 +45,10 @@ static class TestBulkScanWorker extends BulkScanWorker { private boolean initCalled = false; private boolean cleanupCalled = false; private ScanJobDescription capturedJobDescription = null; + private final List emittedPartials = new CopyOnWriteArrayList<>(); - TestBulkScanWorker( - String bulkScanId, - TestScanConfig scanConfig, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { - super(bulkScanId, scanConfig, parallelScanThreads, persistenceProvider); + TestBulkScanWorker(String bulkScanId, TestScanConfig scanConfig, int parallelScanThreads) { + super(bulkScanId, scanConfig, parallelScanThreads); } @Override @@ -65,6 +58,12 @@ public Document scan( capturedJobDescription = jobDescription; ScanTarget scanTarget = jobDescription.getScanTarget(); + Document partial = new Document(); + partial.put("phase", "in-progress"); + partial.put("target", scanTarget.getIp()); + emittedPartials.add(partial); + progressConsumer.accept(partial); + Document result = new Document(); result.put("target", scanTarget.getIp()); result.put("hasJobDescription", jobDescription != null); @@ -97,27 +96,9 @@ public ScanJobDescription getCapturedJobDescription() { } } - @Test - void testGetCurrentJobDescriptionReturnsNullOutsideScanContext() { - TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 1, new DummyPersistenceProvider()); - - // getCurrentJobDescription() is protected, so we can't call it directly from test - // But we can verify through the scan() method that it returns null when not in context - assertNull( - worker.getCapturedJobDescription(), - "Job description should be null before any scan"); - } - - @Test - void testGetCurrentJobDescriptionReturnsCorrectJobInScanContext() throws Exception { - TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 1, new DummyPersistenceProvider()); - + private static ScanJobDescription newJob(TestScanConfig config, String ip) { ScanTarget target = new ScanTarget(); - target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) + target.setIp(ip); target.setPort(443); BulkScan bulkScan = @@ -130,107 +111,138 @@ void testGetCurrentJobDescriptionReturnsCorrectJobInScanContext() throws Excepti false, null); - ScanJobDescription jobDescription = - new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + return new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + } + + @Test + void testGetCurrentJobDescriptionReturnsNullOutsideScanContext() { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + assertNull( + worker.getCapturedJobDescription(), + "Job description should be null before any scan"); + } + + @Test + void testGetCurrentJobDescriptionReturnsCorrectJobInScanContext() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); // TEST-NET-1 (RFC 5737) - // Execute the scan ProgressableFuture future = worker.handle(jobDescription); Document result = future.get(); - // Verify the job description was available during scan assertTrue( result.getBoolean("hasJobDescription"), "Job description should be available in scan context"); assertEquals(jobDescription.getId().toString(), result.getString("jobId")); - // Verify the captured job description matches assertNotNull(worker.getCapturedJobDescription()); assertEquals(jobDescription.getId(), worker.getCapturedJobDescription().getId()); - assertEquals(target, worker.getCapturedJobDescription().getScanTarget()); + assertEquals( + jobDescription.getScanTarget(), worker.getCapturedJobDescription().getScanTarget()); - // Simulate the partial results persistence flow + // Simulate the persistence flow DummyPersistenceProvider persistenceProvider = new DummyPersistenceProvider(); - - // Update job status to SUCCESS (required by ScanResult constructor) jobDescription.setStatus(JobStatus.SUCCESS); - - // Create ScanResult from the scan result Document and job description ScanResult scanResult = new ScanResult(jobDescription, result); - // Verify ScanResult has the correct scanJobDescriptionId assertEquals( jobDescription.getId().toString(), scanResult.getScanJobDescriptionId(), "ScanResult should use job description UUID as scanJobDescriptionId"); - // Simulate persisting to MongoDB persistenceProvider.insertScanResult(scanResult, jobDescription); - // Simulate retrieving from MongoDB by scanJobDescriptionId ScanResult retrievedResult = persistenceProvider.getScanResultByScanJobDescriptionId( "test-db", "test-collection", jobDescription.getId().toString()); - // Verify the retrieved result matches assertNotNull( retrievedResult, "Should be able to retrieve ScanResult by job description ID"); - assertEquals( - jobDescription.getId().toString(), - retrievedResult.getScanJobDescriptionId(), - "Retrieved result should have matching scanJobDescriptionId"); - assertEquals( - scanResult.getBulkScan(), - retrievedResult.getBulkScan(), - "Retrieved result should have matching bulk scan ID"); - assertEquals( - scanResult.getScanTarget(), - retrievedResult.getScanTarget(), - "Retrieved result should have matching scan target"); - assertEquals( - scanResult.getResult(), - retrievedResult.getResult(), - "Retrieved result should have matching result document"); + assertEquals(jobDescription.getId().toString(), retrievedResult.getScanJobDescriptionId()); + assertEquals(scanResult.getBulkScan(), retrievedResult.getBulkScan()); + assertEquals(scanResult.getScanTarget(), retrievedResult.getScanTarget()); + assertEquals(scanResult.getResult(), retrievedResult.getResult()); } @Test - void testThreadLocalIsCleanedUpAfterScan() throws Exception { + void testPartialResultCallbackReceivesEmittedPartials() throws Exception { TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 1, new DummyPersistenceProvider()); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); - ScanTarget target = new ScanTarget(); - target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) - target.setPort(443); + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); - BulkScan bulkScan = - new BulkScan( - BulkScanWorkerTest.class, - BulkScanWorkerTest.class, - "test-db", - config, - System.currentTimeMillis(), - false, - null); + List received = new CopyOnWriteArrayList<>(); + ProgressableFuture future = worker.handle(jobDescription, received::add); + future.get(); - ScanJobDescription jobDescription = - new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + assertEquals(1, received.size(), "Caller callback should see the partial emitted by scan"); + assertEquals("in-progress", received.get(0).getString("phase")); + } - // Execute the scan + @Test + void testNullPartialResultCallbackIsAllowed() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); + + // Null callback must not throw and the scan must still complete + ProgressableFuture future = worker.handle(jobDescription, null); + Document result = future.get(); + assertNotNull(result); + } + + @Test + void testPartialResultsStillUpdateProgressableFuture() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); + + // No callback supplied; partials should still flow into the future for pollers ProgressableFuture future = worker.handle(jobDescription); - future.get(); // Wait for completion + future.get(); + // After completion, getCurrentResult is the final result; just assert non-null + assertNotNull(future.getCurrentResult()); + } + + @Test + void testCallbackExceptionDoesNotKillScan() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); - // After scan completes, verify we can run another scan - ScanTarget newTarget = new ScanTarget(); - newTarget.setIp("192.0.2.2"); // TEST-NET-1 (RFC 5737) - newTarget.setPort(443); + Consumer throwingCallback = + p -> { + throw new RuntimeException("intentional"); + }; + ProgressableFuture future = worker.handle(jobDescription, throwingCallback); - ScanJobDescription newJobDescription = - new ScanJobDescription(newTarget, bulkScan, JobStatus.TO_BE_EXECUTED); + Document result = future.get(); + assertNotNull(result, "Scan should complete even when callback throws"); + assertTrue(result.getBoolean("hasJobDescription")); + } + + @Test + void testThreadLocalIsCleanedUpAfterScan() throws Exception { + TestScanConfig config = new TestScanConfig(); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); + + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); + + ProgressableFuture future = worker.handle(jobDescription); + future.get(); + + ScanJobDescription newJobDescription = newJob(config, "192.0.2.2"); ProgressableFuture future2 = worker.handle(newJobDescription); Document result2 = future2.get(); - // The second scan should have the second job description, not the first assertEquals(newJobDescription.getId().toString(), result2.getString("jobId")); assertEquals(newJobDescription.getId(), worker.getCapturedJobDescription().getId()); } @@ -238,36 +250,17 @@ void testThreadLocalIsCleanedUpAfterScan() throws Exception { @Test void testMultipleConcurrentScansHaveSeparateContexts() throws Exception { TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 2, new DummyPersistenceProvider()); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 2); - BulkScan bulkScan = - new BulkScan( - BulkScanWorkerTest.class, - BulkScanWorkerTest.class, - "test-db", - config, - System.currentTimeMillis(), - false, - null); - - // Create multiple job descriptions List jobDescriptions = new ArrayList<>(); List> futures = new ArrayList<>(); for (int i = 0; i < 5; i++) { - ScanTarget target = new ScanTarget(); - target.setIp("192.0.2." + (i + 1)); // TEST-NET-1 (RFC 5737) - target.setPort(443); - - ScanJobDescription jobDescription = - new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); + ScanJobDescription jobDescription = newJob(config, "192.0.2." + (i + 1)); jobDescriptions.add(jobDescription); - futures.add(worker.handle(jobDescription)); } - // Wait for all scans to complete and verify each got the correct job description for (int i = 0; i < 5; i++) { Document result = futures.get(i).get(); assertTrue(result.getBoolean("hasJobDescription")); @@ -281,30 +274,12 @@ void testMultipleConcurrentScansHaveSeparateContexts() throws Exception { @Test void testInitializationIsCalledOnFirstHandle() throws Exception { TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 1, new DummyPersistenceProvider()); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); assertFalse(worker.isInitCalled(), "Init should not be called before first handle"); - ScanTarget target = new ScanTarget(); - target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) - target.setPort(443); - - BulkScan bulkScan = - new BulkScan( - BulkScanWorkerTest.class, - BulkScanWorkerTest.class, - "test-db", - config, - System.currentTimeMillis(), - false, - null); - - ScanJobDescription jobDescription = - new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); - - ProgressableFuture future = worker.handle(jobDescription); - future.get(); + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); + worker.handle(jobDescription).get(); assertTrue(worker.isInitCalled(), "Init should be called on first handle"); } @@ -312,28 +287,10 @@ void testInitializationIsCalledOnFirstHandle() throws Exception { @Test void testCleanupIsCalledWhenAllJobsComplete() throws Exception { TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 1, new DummyPersistenceProvider()); - - ScanTarget target = new ScanTarget(); - target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) - target.setPort(443); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); - BulkScan bulkScan = - new BulkScan( - BulkScanWorkerTest.class, - BulkScanWorkerTest.class, - "test-db", - config, - System.currentTimeMillis(), - false, - null); - - ScanJobDescription jobDescription = - new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); - - ProgressableFuture future = worker.handle(jobDescription); - future.get(); + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); + worker.handle(jobDescription).get(); // Give cleanup a moment to execute (it runs after job completion) Thread.sleep(100); @@ -344,41 +301,20 @@ void testCleanupIsCalledWhenAllJobsComplete() throws Exception { @Test void testManualInitPreventsSelfCleanup() throws Exception { TestScanConfig config = new TestScanConfig(); - TestBulkScanWorker worker = - new TestBulkScanWorker("test-bulk-id", config, 1, new DummyPersistenceProvider()); + TestBulkScanWorker worker = new TestBulkScanWorker("test-bulk-id", config, 1); - // Call init manually worker.init(); assertTrue(worker.isInitCalled(), "Init should be called"); - ScanTarget target = new ScanTarget(); - target.setIp("192.0.2.1"); // TEST-NET-1 (RFC 5737) - target.setPort(443); - - BulkScan bulkScan = - new BulkScan( - BulkScanWorkerTest.class, - BulkScanWorkerTest.class, - "test-db", - config, - System.currentTimeMillis(), - false, - null); - - ScanJobDescription jobDescription = - new ScanJobDescription(target, bulkScan, JobStatus.TO_BE_EXECUTED); - - ProgressableFuture future = worker.handle(jobDescription); - future.get(); + ScanJobDescription jobDescription = newJob(config, "192.0.2.1"); + worker.handle(jobDescription).get(); - // Give cleanup a moment (if it were to execute) Thread.sleep(100); assertFalse( worker.isCleanupCalled(), "Cleanup should NOT be called when init was manual (shouldCleanupSelf = false)"); - // Cleanup should only be called when we explicitly call it worker.cleanup(); assertTrue(worker.isCleanupCalled(), "Cleanup should be called when explicitly called"); } diff --git a/src/test/java/de/rub/nds/crawler/dummy/DummyControllerCommandConfig.java b/src/test/java/de/rub/nds/crawler/dummy/DummyControllerCommandConfig.java index 0b01d9e3..0c4f28dd 100644 --- a/src/test/java/de/rub/nds/crawler/dummy/DummyControllerCommandConfig.java +++ b/src/test/java/de/rub/nds/crawler/dummy/DummyControllerCommandConfig.java @@ -11,7 +11,6 @@ import de.rub.nds.crawler.config.ControllerCommandConfig; import de.rub.nds.crawler.core.BulkScanWorker; import de.rub.nds.crawler.data.ScanConfig; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.scanner.core.config.ScannerDetail; public class DummyControllerCommandConfig extends ControllerCommandConfig { @@ -21,10 +20,7 @@ public ScanConfig getScanConfig() { return new ScanConfig(ScannerDetail.NORMAL, 1, 1, getExcludedProbes()) { @Override public BulkScanWorker createWorker( - String bulkScanID, - int parallelConnectionThreads, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { + String bulkScanID, int parallelConnectionThreads, int parallelScanThreads) { return null; } }; diff --git a/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProviderTest.java b/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProviderTest.java index c31af5dc..2dc8740f 100644 --- a/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProviderTest.java +++ b/src/test/java/de/rub/nds/crawler/dummy/DummyPersistenceProviderTest.java @@ -17,7 +17,6 @@ import de.rub.nds.crawler.data.ScanJobDescription; import de.rub.nds.crawler.data.ScanResult; import de.rub.nds.crawler.data.ScanTarget; -import de.rub.nds.crawler.persistence.IPersistenceProvider; import de.rub.nds.scanner.core.config.ScannerDetail; import org.bson.Document; import org.junit.jupiter.api.BeforeEach; @@ -39,8 +38,7 @@ void setUp() { public BulkScanWorker createWorker( String bulkScanID, int parallelConnectionThreads, - int parallelScanThreads, - IPersistenceProvider persistenceProvider) { + int parallelScanThreads) { return null; } };