Skip to content
This repository was archived by the owner on Dec 20, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion keiko-core/keiko-core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ dependencies {
api "org.springframework:spring-context"

implementation "com.netflix.spectator:spectator-api"
implementation "javax.annotation:javax.annotation-api"
implementation "jakarta.annotation:jakarta.annotation-api"

testImplementation project(":keiko-test-common")
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.netflix.spinnaker.q.metrics.NoHandlerCapacity
import java.time.Duration
import java.util.Random
import java.util.concurrent.RejectedExecutionException
import javax.annotation.PostConstruct
import jakarta.annotation.PostConstruct
import org.slf4j.Logger
import org.slf4j.LoggerFactory.getLogger
import org.springframework.scheduling.annotation.Scheduled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import redis.clients.jedis.Connection
import redis.clients.jedis.HostAndPort
import redis.clients.jedis.Jedis
import redis.clients.jedis.JedisCluster
Expand Down Expand Up @@ -138,7 +139,7 @@ class RedisQueueConfiguration {
@Value("\${redis.connection:redis://localhost:6379}") connection: String,
@Value("\${redis.timeout:2000}") timeout: Int,
@Value("\${redis.maxattempts:4}") maxAttempts: Int,
redisPoolConfig: GenericObjectPoolConfig<Jedis>
redisPoolConfig: GenericObjectPoolConfig<Connection>
): JedisCluster {
URI.create(connection).let { cx ->
val port = if (cx.port == -1) Protocol.DEFAULT_PORT else cx.port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import java.util.Optional
import org.slf4j.Logger
import redis.clients.jedis.Jedis
import redis.clients.jedis.Transaction
import redis.clients.jedis.commands.JedisClusterCommands
import redis.clients.jedis.commands.JedisCommands

abstract class AbstractRedisQueue(
Expand Down Expand Up @@ -79,27 +78,15 @@ abstract class AbstractRedisQueue(
internal fun JedisCommands.hgetInt(key: String, field: String, default: Int = 0) =
hget(key, field)?.toInt() ?: default

internal fun JedisClusterCommands.hgetInt(key: String, field: String, default: Int = 0) =
hget(key, field)?.toInt() ?: default

internal fun JedisCommands.zismember(key: String, member: String) =
zrank(key, member) != null

internal fun JedisClusterCommands.zismember(key: String, member: String) =
zrank(key, member) != null

internal fun JedisCommands.anyZismember(key: String, members: Set<String>) =
members.any { zismember(key, it) }

internal fun JedisClusterCommands.anyZismember(key: String, members: Set<String>) =
members.any { zismember(key, it) }

internal fun JedisCommands.firstFingerprint(key: String, fingerprint: Fingerprint) =
fingerprint.all.firstOrNull { zismember(key, it) }

internal fun JedisClusterCommands.firstFingerprint(key: String, fingerprint: Fingerprint) =
fingerprint.all.firstOrNull { zismember(key, it) }

@Deprecated("Hashes the attributes property, which is mutable")
internal fun Message.hashV1() =
Hashing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,14 @@ class RedisClusterQueue(
fun JedisCluster.multi(block: Transaction.() -> Unit) =
getConnectionFromSlot(JedisClusterCRC16.getSlot(queueKey))
.use { c ->
c.multi()
.let { tx ->
tx.block()
tx.exec()
}
// Send MULTI command to start a transaction
c.sendCommand(redis.clients.jedis.Protocol.Command.MULTI)
// Wrap the connection in a Transaction object
val tx = Transaction(c)
// Execute the block on the Transaction
tx.block()
// Commit the transaction
tx.exec()
}

private fun ackMessage(fingerprint: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.scheduling.annotation.Scheduled
import redis.clients.jedis.Jedis
import redis.clients.jedis.commands.ScriptingCommands
import redis.clients.jedis.commands.ScriptingKeyCommands
import redis.clients.jedis.exceptions.JedisDataException
import redis.clients.jedis.params.ZAddParams.zAddParams
import redis.clients.jedis.util.Pool
Expand Down Expand Up @@ -341,7 +341,7 @@ class RedisQueue(
}
}

internal fun ScriptingCommands.readMessageWithLock(): Triple<String, Instant, String?>? {
internal fun ScriptingKeyCommands.readMessageWithLock(): Triple<String, Instant, String?>? {
try {
val response = evalsha(
readMessageWithLockScriptSha,
Expand Down
3 changes: 2 additions & 1 deletion keiko-sql/keiko-sql.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ dependencies {
implementation "io.spinnaker.kork:kork-sql"
implementation "de.huxhorn.sulky:de.huxhorn.sulky.ulid"
implementation "io.github.resilience4j:resilience4j-retry"
implementation "javax.validation:validation-api"
implementation "io.github.resilience4j:resilience4j-vavr"
implementation "jakarta.validation:jakarta.validation-api"
implementation "org.jooq:jooq"

testImplementation project(":keiko-tck")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package com.netflix.spinnaker.config
import com.netflix.spinnaker.kork.sql.config.RetryProperties
import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties
import java.time.Duration
import javax.validation.constraints.Pattern
import javax.validation.constraints.Positive
import jakarta.validation.constraints.Pattern
import jakarta.validation.constraints.Positive
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.validation.annotation.Validated

Expand Down
2 changes: 1 addition & 1 deletion orca-clouddriver/orca-clouddriver.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies {
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml")
implementation("io.kubernetes:client-java")
implementation("com.amazonaws:aws-java-sdk-lambda")
implementation("javax.validation:validation-api")
implementation("jakarta.validation:jakarta.validation-api")
implementation("org.jetbrains:annotations")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-guava")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode;
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution;
import com.netflix.spinnaker.orca.clouddriver.tasks.image.DeleteImageTask;
import jakarta.validation.constraints.NotNull;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.validation.constraints.NotNull;
import org.springframework.stereotype.Component;

@Component
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask;
import com.netflix.spinnaker.orca.clouddriver.tasks.job.UpdateJobProcessesTask;
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.ServerGroupCacheForceRefreshTask;
import jakarta.validation.constraints.NotNull;
import javax.annotation.Nonnull;
import javax.validation.constraints.NotNull;
import org.springframework.stereotype.Component;

@Component
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution;
import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask;
import com.netflix.spinnaker.orca.clouddriver.tasks.launchconfigurations.DeleteLaunchConfigurationTask;
import jakarta.validation.constraints.NotNull;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.validation.constraints.NotNull;
import lombok.Data;
import org.springframework.stereotype.Component;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution;
import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask;
import com.netflix.spinnaker.orca.clouddriver.tasks.launchtemplates.DeleteLaunchTemplateTask;
import jakarta.validation.constraints.NotNull;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.validation.constraints.NotNull;
import lombok.Data;
import org.springframework.stereotype.Component;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask;
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.ServerGroupCacheForceRefreshTask;
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.UpsertDisruptionBudgetTask;
import jakarta.validation.constraints.NotNull;
import javax.annotation.Nonnull;
import javax.validation.constraints.NotNull;
import org.springframework.stereotype.Component;

@Component
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.netflix.spinnaker.orca.clouddriver.pipeline.servergroup.strategies.lambda;

import jakarta.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution;
import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask;
import com.netflix.spinnaker.orca.clouddriver.tasks.snapshot.DeleteSnapshotTask;
import jakarta.validation.constraints.NotNull;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.validation.constraints.NotNull;
import org.springframework.stereotype.Component;

@Component
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import com.netflix.spinnaker.orca.clouddriver.model.TaskId;
import com.netflix.spinnaker.orca.clouddriver.pipeline.image.DeleteImageStage;
import com.netflix.spinnaker.orca.clouddriver.utils.CloudProviderAware;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.Validation;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spinnaker.orca.StageResolver;
Expand Down Expand Up @@ -82,6 +83,7 @@ public void LambdaTrafficRoutingStageIntegrationTest() throws Exception {
this.mockMvc
.perform(
MockMvcRequestBuilders.post("/orchestrate")
.with(csrf())
.content(content)
.contentType(MediaType.APPLICATION_JSON))
.andReturn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import io.mockk.verify
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc
import org.springframework.http.MediaType
import org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf
import org.springframework.test.context.TestPropertySource
import org.springframework.test.web.servlet.MockMvc
import org.springframework.test.web.servlet.get
Expand Down Expand Up @@ -75,6 +76,7 @@ class KubernetesPreconfiguredJobSpec : JUnit5Minutests {
every { katoRestService.requestOperations(any(), any(), any()) } returns TaskId("1")

val resp = subject.post("/orchestrate") {
with(csrf())
contentType = MediaType.APPLICATION_JSON
content = pipeline
}.andReturn().response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.netflix.spinnaker.orca.pipeline.model.support.TriggerDeserializer
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria
import com.netflix.spinnaker.orca.sql.PipelineRefTriggerDeserializerSupplier
import com.netflix.spinnaker.orca.sql.pipeline.persistence.PipelineRefTrigger
import rx.schedulers.Schedulers
import io.reactivex.rxjava3.schedulers.Schedulers
import spock.lang.Specification
import spock.lang.Subject
import spock.lang.Unroll
Expand Down Expand Up @@ -68,23 +68,23 @@ abstract class PipelineExecutionRepositoryTck<T extends ExecutionRepository> ext
repository().store(succeededExecution)
def pipelines = repository().retrievePipelinesForPipelineConfigId(
"pipeline-1", new ExecutionCriteria(pageSize: 5, statuses: ["RUNNING", "SUCCEEDED", "TERMINAL"])
).subscribeOn(Schedulers.io()).toList().toBlocking().single()
).subscribeOn(Schedulers.io()).toList().blockingGet()

then:
pipelines*.id.sort() == [runningExecution.id, succeededExecution.id].sort()

when:
pipelines = repository().retrievePipelinesForPipelineConfigId(
"pipeline-1", new ExecutionCriteria(pageSize: 5, statuses: ["RUNNING"])
).subscribeOn(Schedulers.io()).toList().toBlocking().single()
).subscribeOn(Schedulers.io()).toList().blockingGet()

then:
pipelines*.id.sort() == [runningExecution.id].sort()

when:
pipelines = repository().retrievePipelinesForPipelineConfigId(
"pipeline-1", new ExecutionCriteria(pageSize: 5, statuses: ["TERMINAL"])
).subscribeOn(Schedulers.io()).toList().toBlocking().single()
).subscribeOn(Schedulers.io()).toList().blockingGet()

then:
pipelines.isEmpty()
Expand All @@ -109,23 +109,23 @@ abstract class PipelineExecutionRepositoryTck<T extends ExecutionRepository> ext
repository().store(succeededExecution)
def orchestrations = repository().retrieveOrchestrationsForApplication(
runningExecution.application, new ExecutionCriteria(pageSize: 5, statuses: ["RUNNING", "SUCCEEDED", "TERMINAL"])
).subscribeOn(Schedulers.io()).toList().toBlocking().single()
).subscribeOn(Schedulers.io()).toList().blockingGet()

then:
orchestrations*.id.sort() == [runningExecution.id, succeededExecution.id].sort()

when:
orchestrations = repository().retrieveOrchestrationsForApplication(
runningExecution.application, new ExecutionCriteria(pageSize: 5, statuses: ["RUNNING"])
).subscribeOn(Schedulers.io()).toList().toBlocking().single()
).subscribeOn(Schedulers.io()).toList().blockingGet()

then:
orchestrations*.id.sort() == [runningExecution.id].sort()

when:
orchestrations = repository().retrieveOrchestrationsForApplication(
runningExecution.application, new ExecutionCriteria(pageSize: 5, statuses: ["TERMINAL"])
).subscribeOn(Schedulers.io()).toList().toBlocking().single()
).subscribeOn(Schedulers.io()).toList().blockingGet()

then:
orchestrations.isEmpty()
Expand Down Expand Up @@ -247,7 +247,7 @@ abstract class PipelineExecutionRepositoryTck<T extends ExecutionRepository> ext
repository().store(pipeline)

expect:
repository().retrieve(PIPELINE).toBlocking().first().id == pipeline.id
repository().retrieve(PIPELINE).blockingFirst().id == pipeline.id

with(repository().retrieve(pipeline.type, pipeline.id)) {
id == pipeline.id
Expand Down Expand Up @@ -316,7 +316,7 @@ abstract class PipelineExecutionRepositoryTck<T extends ExecutionRepository> ext
thrown ExecutionNotFoundException

and:
repository().retrieve(PIPELINE).toList().toBlocking().first() == []
repository().retrieve(PIPELINE).toList().blockingGet() == []
}

def "updateStatus sets startTime to current time if new status is RUNNING"() {
Expand Down Expand Up @@ -629,7 +629,7 @@ abstract class PipelineExecutionRepositoryTck<T extends ExecutionRepository> ext
.setPageSize(limit)

expect:
with(repository().retrieve(type, criteria).toList().toBlocking().single()) {
with(repository().retrieve(type, criteria).toList().blockingGet()) {
size() == expectedResults
type.every { it == type }
if (statuses) {
Expand Down
6 changes: 3 additions & 3 deletions orca-core/orca-core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies {
api("io.spinnaker.kork:kork-plugins")
api("io.spinnaker.kork:kork-security")
api("io.spinnaker.kork:kork-telemetry")
api("io.reactivex:rxjava")
api("io.reactivex.rxjava3:rxjava")

api(project(":orca-api"))
implementation("com.github.ben-manes.caffeine:guava")
Expand All @@ -49,8 +49,8 @@ dependencies {
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.apache.commons:commons-lang3")
implementation("org.apache.httpcomponents:httpclient")
implementation("javax.servlet:javax.servlet-api:4.0.1")
implementation("javax.validation:validation-api")
implementation("jakarta.servlet:jakarta.servlet-api")
implementation("jakarta.validation:jakarta.validation-api")
implementation("com.jayway.jsonpath:json-path")
implementation("org.yaml:snakeyaml")
implementation("org.apache.groovy:groovy")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilderFactory;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository;
import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.time.Clock;
import java.time.Duration;
import java.util.Collection;
Expand All @@ -73,8 +75,6 @@
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import rx.Scheduler;
import rx.schedulers.Schedulers;

@Configuration
@ComponentScan({
Expand Down
Loading
Loading