Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,20 @@ dependencies {
implementation "org.testeditor.web:org.testeditor.web.dropwizard:$versions.testEditorDropwizard"
implementation "commons-io:commons-io:2.6"
implementation "org.apache.commons:commons-text:1.4"
implementation "org.testeditor.web:org.testeditor.web.dropwizard:$versions.testEditorDropwizard"
implementation 'org.eclipse.jgit:org.eclipse.jgit:5.1.2.201810061102-r'
implementation "org.testeditor.web:org.testeditor.web.dropwizard:$versions.testEditorDropwizard"
implementation "org.eclipse.jgit:org.eclipse.jgit:5.1.2.201810061102-r"
implementation "org.glassfish.jersey.ext.rx:jersey-rx-client-java8:2.25.1"
implementation "io.dropwizard:dropwizard-client:$versions.dropwizard"

testImplementation "junit:junit:4.12"
testImplementation "io.dropwizard:dropwizard-testing:$versions.dropwizard"
testImplementation "org.assertj:assertj-core:3.12.0"
testImplementation "org.mockito:mockito-core:3.1.0"
testImplementation "org.eclipse.xtext:org.eclipse.xtext.testing:$versions.xtext"
testImplementation "org.testeditor.web:org.testeditor.web.dropwizard.testing:$versions.testEditorDropwizard"
testImplementation 'org.eclipse.jgit:org.eclipse.jgit.junit:5.1.2.201810061102-r'
testImplementation "org.testeditor.web:org.testeditor.web.dropwizard.testing:$versions.testEditorDropwizard"
testImplementation 'org.eclipse.jgit:org.eclipse.jgit.junit:5.1.2.201810061102-r'

annotationProcessor 'com.github.moditect.deptective:deptective-javac-plugin:master-SNAPSHOT'
annotationProcessor 'com.github.moditect.deptective:deptective-javac-plugin:master-SNAPSHOT'
}

mainClassName = 'org.testeditor.web.backend.testexecution.dropwizard.TestExecutionApplication'
Expand Down Expand Up @@ -142,4 +144,4 @@ compileJava {
'reporting_policy=WARN ' +
'visualize=true ' +
"config_file=${projectDir}/src/main/resources/META-INF/deptective.json"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package org.testeditor.web.backend.testexecution.distributed.common

import com.auth0.jwt.JWT
import com.auth0.jwt.algorithms.Algorithm
import java.net.URI
import java.util.concurrent.CompletionStage
import java.util.concurrent.ExecutorService
import java.util.concurrent.ForkJoinPool
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Provider
import javax.inject.Singleton
import javax.ws.rs.client.Entity
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response
import javax.ws.rs.core.StreamingOutput
import org.glassfish.jersey.client.rx.RxClient
import org.glassfish.jersey.client.rx.java8.RxCompletionStageInvoker
import org.slf4j.LoggerFactory

import static javax.ws.rs.client.Entity.json
import static javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE
import static org.glassfish.jersey.client.ClientProperties.READ_TIMEOUT
import static org.glassfish.jersey.client.ClientProperties.CONNECT_TIMEOUT
import static org.glassfish.jersey.client.HttpUrlConnectorProvider.USE_FIXED_LENGTH_STREAMING

/**
* Abstraction around an HTTP client for easy mocking
*/
interface RestClient {

public static val int READ_TIMEOUT_MILLIS = 10000

def <T> CompletionStage<Response> postAsync(URI uri, T body)

def CompletionStage<Response> postAsync(URI uri, StreamingOutput body)

def <T> CompletionStage<Response> putAsync(URI uri, T body)

def <T> CompletionStage<Response> getAsync(URI uri)

def <T> CompletionStage<Response> getAsync(URI uri, MediaType accept)

def <T> CompletionStage<Response> deleteAsync(URI uri)

def <T> Response post(URI uri, T body)

def <T> Response put(URI uri, T body)

def <T> Response get(URI uri)

def <T> Response get(URI uri, MediaType accept)

def <T> Response delete(URI uri)

}

abstract class AbstractRestClient implements RestClient {

override <T> post(URI uri, T body) {
return uri.postAsync(body).toCompletableFuture.join
}

override <T> put(URI uri, T body) {
return uri.putAsync(body).toCompletableFuture.join
}

override <T> get(URI uri) {
return uri.getAsync.toCompletableFuture.join
}

override <T> delete(URI uri) {
return uri.deleteAsync.toCompletableFuture.join
}

override <T> get(URI uri, MediaType accept) {
return uri.getAsync(accept).toCompletableFuture.join
}

}

@Singleton
class JerseyBasedRestClient extends AbstractRestClient {

static val logger = LoggerFactory.getLogger(JerseyBasedRestClient)

val Provider<RxClient<RxCompletionStageInvoker>> httpClientProvider
val ExecutorService executor

@Inject
new(Provider<RxClient<RxCompletionStageInvoker>> httpClientProvider, @Named('httpClientExecutor') ForkJoinPool executor) {
this.httpClientProvider = httpClientProvider
this.executor = executor
}

override <T> CompletionStage<Response> postAsync(URI uri, T body) {
val entity = json(body)
logger.info('''sending POST request to «uri.toString» with body:\n«entity.toString»''')
return uri.invoker.post(entity)
}

override CompletionStage<Response> postAsync(URI uri, StreamingOutput body) {
val entity = Entity.entity(body, MediaType.APPLICATION_OCTET_STREAM_TYPE)
logger.info('''sending POST request to «uri.toString» with streaming data''')
return uri.streamingInvoker.post(entity)
}

override <T> CompletionStage<Response> putAsync(URI uri, T body) {
val entity = json(body)
logger.info('''sending PUT request to «uri.toString» with body:\n«entity.toString»''')
return uri.invoker.put(entity)
}

override <T> CompletionStage<Response> getAsync(URI uri) {
return uri.invoker.get
}

override <T> CompletionStage<Response> getAsync(URI uri, MediaType accept) {
return uri.getInvoker(accept).get
}

override <T> deleteAsync(URI uri) {
return uri.invoker.delete
}

private def getInvoker(URI uri) {
return uri.getInvoker(APPLICATION_JSON_TYPE)
}

private def getInvoker(URI uri, MediaType accept) {
return httpClientProvider.get.property(READ_TIMEOUT, READ_TIMEOUT_MILLIS).target(uri).request(accept).header(
'Authorization', '''Bearer «dummyToken»''').rx(executor)
}

private def getStreamingInvoker(URI uri) {
return httpClientProvider.get.property(USE_FIXED_LENGTH_STREAMING, true).property(READ_TIMEOUT, 0).property(CONNECT_TIMEOUT, 0).target(uri).
request.header('Authorization', '''Bearer «dummyToken»''').rx(executor)
}

val static String dummyToken = createToken('test.execution', 'Test Execution User', 'testeditor.eng@gmail.com')

static def String createToken(String id, String name, String eMail) {
val builder = JWT.create => [
withClaim('id', id)
withClaim('name', name)
withClaim('email', eMail)
]
return builder.sign(Algorithm.HMAC256("secret"))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.testeditor.web.backend.testexecution.distributed.common

import java.util.Set
import org.testeditor.web.backend.testexecution.common.TestExecutionKey

interface TestExecutionManager extends StatusAwareTestJobStore {

def void cancelJob(TestExecutionKey key)

def TestExecutionKey addJob(Iterable<String> testFiles, Set<String> requiredCapabilities)

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ interface WorkerInfo {

def Set<String> getProvidedCapabilities()

static val NONE = new WorkerInfo {
val uri = new URI('')
val providedCapabilities = <String>emptySet

override getUri() { uri }

override getProvidedCapabilities() { providedCapabilities }

}

}

interface Worker extends RunningTest, WorkerInfo, TestJobStore {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.testeditor.web.backend.testexecution.distributed.common

interface WorkerAPI<R> {

def R isRegistered()

def R executeTestJob(TestJob job)

def R cancelTestJob()

def R getTestJobState(Boolean wait)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.testeditor.web.backend.testexecution.distributed.common

import org.testeditor.web.backend.testexecution.common.TestExecutionKey
import org.testeditor.web.backend.testexecution.common.TestStatus

interface WorkerManagerAPI<R,S> {

def R registerWorker(Worker worker)

def R unregisterWorker(String id)

def R upload(String workerId, TestExecutionKey jobId, String fileName, S content)

def R updateStatus(String workerId, TestExecutionKey jobId, TestStatus status)

}
Original file line number Diff line number Diff line change
@@ -1,50 +1,40 @@
package org.testeditor.web.backend.testexecution.distributed.manager

import java.util.Optional
import java.util.Set
import java.util.concurrent.atomic.AtomicLong
import javax.inject.Inject
import javax.inject.Singleton
import org.slf4j.LoggerFactory
import org.testeditor.web.backend.testexecution.common.TestExecutionKey
import org.testeditor.web.backend.testexecution.common.TestStatus
import org.testeditor.web.backend.testexecution.distributed.common.StatusAwareTestJobStore
import org.testeditor.web.backend.testexecution.distributed.common.TestExecutionManager
import org.testeditor.web.backend.testexecution.distributed.common.TestJob
import org.testeditor.web.backend.testexecution.distributed.common.TestJobInfo
import org.testeditor.web.backend.testexecution.distributed.common.TestJobInfo.JobState
import org.testeditor.web.backend.testexecution.distributed.common.WritableStatusAwareTestJobStore

interface TestExecutionManager extends StatusAwareTestJobStore {

def void cancelJob(TestExecutionKey key)

def TestExecutionKey addJob(Iterable<String> testFiles, Set<String> requiredCapabilities)

}

@Singleton
class LocalSingleWorkerExecutionManager implements TestExecutionManager {
static val logger = LoggerFactory.getLogger(LocalSingleWorkerExecutionManager)
class DefaultExecutionManager implements TestExecutionManager {
static val logger = LoggerFactory.getLogger(DefaultExecutionManager)

@Inject extension WorkerProvider workerProvider
@Inject extension WritableStatusAwareTestJobStore jobStore

var AtomicLong runningTestSuiteRunId = new AtomicLong(0)
var Optional<TestJobInfo> currentJob = Optional.empty
val currentJob = <TestExecutionKey, TestJobInfo>newHashMap

override cancelJob(TestExecutionKey key) {
currentJob.filter[id == key].ifPresent[
workers.head.cancel
currentJob.remove(key) => [
workerForJob.cancel
setState(JobState.COMPLETED_CANCELLED).store
currentJob = Optional.empty
]
}

override addJob(Iterable<String> testFiles, Set<String> requiredCapabilities) {
return (new TestJob(new TestExecutionKey("0").deriveFreshRunId, emptySet, testFiles) => [
store
workers.head.assign(it).thenAccept[status|updateStatus(status)]
currentJob = Optional.of(it)
idleWorkers.head?.assign(it)?.thenAccept[status|updateStatus(status)]
currentJob.put(id, it)
]).id
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.testeditor.web.backend.testexecution.distributed.manager
import javax.inject.Inject
import org.eclipse.xtend.lib.annotations.Delegate
import org.testeditor.web.backend.testexecution.common.TestExecutionKey
import org.testeditor.web.backend.testexecution.common.TestStatus
import org.testeditor.web.backend.testexecution.distributed.common.TestJob
import org.testeditor.web.backend.testexecution.distributed.common.TestJobInfo
import org.testeditor.web.backend.testexecution.distributed.common.TestJobStore
Expand All @@ -17,6 +18,14 @@ class LocalSingleWorkerManager implements WorkerProvider {
override getWorkers() {
return #[worker]
}

override idleWorkers() {
return #[worker].filter[checkStatus !== TestStatus.RUNNING].filter(WorkerInfo)
}

override workerForJob(TestJobInfo job) {
return if (currentJob?.id == job.id) { worker } else { WorkerInfo.NONE }
}

override assign(WorkerInfo worker, TestJob job) {
return if (worker === this.worker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ import org.testeditor.web.backend.testexecution.common.TestStatus
import org.testeditor.web.backend.testexecution.distributed.common.StatusAwareTestJobStore
import org.testeditor.web.backend.testexecution.distributed.common.TestJob
import org.testeditor.web.backend.testexecution.distributed.common.WorkerInfo
import org.testeditor.web.backend.testexecution.distributed.common.TestJobInfo

interface WorkerProvider extends StatusAwareTestJobStore {

def Iterable<WorkerInfo> getWorkers()

def Iterable<WorkerInfo> idleWorkers()

def WorkerInfo workerForJob(TestJobInfo job)

def CompletionStage<TestStatus> assign(WorkerInfo worker, TestJob job)

Expand Down
Loading