Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ jobs:
run: docker info

- name: Run integration tests
run: crystal spec -Dintegration --order random
run: crystal spec spec/integration_spec.cr -Dintegration --order random

# Code coverage with kcov
coverage:
Expand Down
89 changes: 85 additions & 4 deletions spec/integration_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,62 @@ require "./spec_helper"
# via /var/run/docker.sock

{% if flag?(:integration) %}
def send_redis_command(io : IO, parts : Array(String)) : Nil
io << "*#{parts.size}\r\n"
parts.each do |part|
io << "$#{part.bytesize}\r\n#{part}\r\n"
end
io.flush
end

def read_redis_response(io : IO) : String
prefix = io.read_char || raise "Missing Redis response"

case prefix
when '+', '-', ':'
line = io.gets || raise "Missing Redis response line"
line.rstrip
when '$'
size_line = io.gets || raise "Missing Redis bulk string size"
size = size_line.strip.to_i
return "" if size < 0

payload = Bytes.new(size)
io.read_fully(payload)

trailer = Bytes.new(2)
io.read_fully(trailer)

String.new(payload)
else
raise "Unsupported Redis response prefix: #{prefix}"
end
end

def redis_command(host : String, port : Int32, *parts : String) : String
TCPSocket.open(host, port) do |socket|
send_redis_command(socket, parts.to_a)
read_redis_response(socket)
end
end

def redis_command(redis_url : String, *parts : String) : String
uri = URI.parse(redis_url)
host = uri.host || raise "Redis URL is missing host"
port = uri.port || 6379

TCPSocket.open(host, port) do |socket|
if password = uri.password
send_redis_command(socket, ["AUTH", password])
auth_response = read_redis_response(socket)
raise "Redis AUTH failed: #{auth_response}" unless auth_response == "OK"
end

send_redis_command(socket, parts.to_a)
read_redis_response(socket)
end
end
Comment on lines +44 to +66
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add timeout and retry logic to prevent flaky tests.

The pipeline failures showing "Connection reset by peer" suggest a race condition between the container's log output and TCP port readiness. TCPSocket.open should include timeouts, and the helper should retry on transient failures.

🛠️ Proposed fix with timeout and retry logic
 def redis_command(host : String, port : Int32, *parts : String) : String
-  TCPSocket.open(host, port) do |socket|
-    send_redis_command(socket, parts.to_a)
-    read_redis_response(socket)
+  retries = 3
+  begin
+    TCPSocket.open(host, port, connect_timeout: 5.seconds) do |socket|
+      socket.read_timeout = 5.seconds
+      send_redis_command(socket, parts.to_a)
+      read_redis_response(socket)
+    end
+  rescue ex : IO::Error | Socket::ConnectError
+    retries -= 1
+    if retries > 0
+      sleep 0.5.seconds
+      retry
+    end
+    raise ex
   end
 end

 def redis_command(redis_url : String, *parts : String) : String
   uri = URI.parse(redis_url)
   host = uri.host || raise "Redis URL is missing host"
   port = uri.port || 6379

-  TCPSocket.open(host, port) do |socket|
-    if password = uri.password
-      send_redis_command(socket, ["AUTH", password])
-      auth_response = read_redis_response(socket)
-      raise "Redis AUTH failed: #{auth_response}" unless auth_response == "OK"
+  retries = 3
+  begin
+    TCPSocket.open(host, port, connect_timeout: 5.seconds) do |socket|
+      socket.read_timeout = 5.seconds
+      if password = uri.password
+        send_redis_command(socket, ["AUTH", password])
+        auth_response = read_redis_response(socket)
+        raise "Redis AUTH failed: #{auth_response}" unless auth_response == "OK"
+      end
+
+      send_redis_command(socket, parts.to_a)
+      read_redis_response(socket)
     end
-
-    send_redis_command(socket, parts.to_a)
-    read_redis_response(socket)
+  rescue ex : IO::Error | Socket::ConnectError
+    retries -= 1
+    if retries > 0
+      sleep 0.5.seconds
+      retry
+    end
+    raise ex
   end
 end
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def redis_command(host : String, port : Int32, *parts : String) : String
TCPSocket.open(host, port) do |socket|
send_redis_command(socket, parts.to_a)
read_redis_response(socket)
end
end
def redis_command(redis_url : String, *parts : String) : String
uri = URI.parse(redis_url)
host = uri.host || raise "Redis URL is missing host"
port = uri.port || 6379
TCPSocket.open(host, port) do |socket|
if password = uri.password
send_redis_command(socket, ["AUTH", password])
auth_response = read_redis_response(socket)
raise "Redis AUTH failed: #{auth_response}" unless auth_response == "OK"
end
send_redis_command(socket, parts.to_a)
read_redis_response(socket)
end
end
def redis_command(host : String, port : Int32, *parts : String) : String
retries = 3
begin
TCPSocket.open(host, port, connect_timeout: 5.seconds) do |socket|
socket.read_timeout = 5.seconds
send_redis_command(socket, parts.to_a)
read_redis_response(socket)
end
rescue ex : IO::Error | Socket::ConnectError
retries -= 1
if retries > 0
sleep 0.5.seconds
retry
end
raise ex
end
end
def redis_command(redis_url : String, *parts : String) : String
uri = URI.parse(redis_url)
host = uri.host || raise "Redis URL is missing host"
port = uri.port || 6379
retries = 3
begin
TCPSocket.open(host, port, connect_timeout: 5.seconds) do |socket|
socket.read_timeout = 5.seconds
if password = uri.password
send_redis_command(socket, ["AUTH", password])
auth_response = read_redis_response(socket)
raise "Redis AUTH failed: #{auth_response}" unless auth_response == "OK"
end
send_redis_command(socket, parts.to_a)
read_redis_response(socket)
end
rescue ex : IO::Error | Socket::ConnectError
retries -= 1
if retries > 0
sleep 0.5.seconds
retry
end
raise ex
end
end
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@spec/integration_spec.cr` around lines 44 - 66, The redis_command helpers
(both overloads) should use a connect timeout and retry loop around the
TCPSocket.open call to avoid transient "Connection reset by peer" failures: wrap
the TCPSocket.open(host, port) call in a small retry loop (e.g., max attempts +
backoff sleep) that rescues transient socket errors (Errno::ECONNREFUSED,
Errno::ECONNRESET, IO::Timeout, etc.) and retries, and when opening the socket
use a connect-with-timeout approach (or set socket read/write timeouts) before
calling send_redis_command and read_redis_response; preserve existing AUTH logic
in the redis_command(redis_url, ...) overload so AUTH is retried only after a
successful socket open and still raises if auth_response != "OK".


describe "Integration: DockerContainer" do
it "starts and stops a Redis container" do
container = Testcontainers::DockerContainer.new("redis:7-alpine")
Expand All @@ -24,6 +80,9 @@ require "./spec_helper"
port = container.mapped_port(6379)
host.should_not be_empty
port.should be > 0
redis_command(host, port, "PING").should eq("PONG")
redis_command(host, port, "SET", "integration:start-stop", "ok").should eq("OK")
redis_command(host, port, "GET", "integration:start-stop").should eq("ok")

container.stop
container.exited?.should be_true
Expand All @@ -38,10 +97,14 @@ require "./spec_helper"

container.use do |c|
c.running?.should be_true
c.mapped_port(6379).should be > 0
c.exists?.should be_true
port = c.mapped_port(6379)
port.should be > 0
redis_command(c.host, port, "PING").should eq("PONG")
end

container.exists?.should be_false
container.container_id.should be_nil
end

it "executes commands in the container" do
Expand All @@ -50,8 +113,10 @@ require "./spec_helper"

begin
container.start
output = container.exec(["redis-cli", "ping"])
output.strip.should eq("PONG")
output = container.exec(["redis-cli", "SET", "integration:exec", "value-from-exec"])
output.strip.should eq("OK")
container.exec(["redis-cli", "GET", "integration:exec"]).strip.should eq("value-from-exec")
redis_command(container.host, container.mapped_port(6379), "GET", "integration:exec").should eq("value-from-exec")
ensure
container.stop rescue nil
container.remove(force: true) rescue nil
Expand All @@ -64,9 +129,10 @@ require "./spec_helper"

begin
container.start
sleep 1 # Give the container a moment to produce logs
sleep 1.second
logs = container.logs
logs.should contain("Ready to accept connections")
logs.should contain("Redis version")
ensure
container.stop rescue nil
container.remove(force: true) rescue nil
Expand All @@ -77,13 +143,24 @@ require "./spec_helper"
describe "Integration: RedisContainer" do
it "starts and provides connection URL" do
container = Testcontainers::RedisContainer.new("redis:7-alpine")
.with_password("secret")

begin
container.start
container.running?.should be_true
url = container.redis_url
url.should start_with("redis://")
url.should contain(":#{container.mapped_port(6379)}")

uri = URI.parse(url)
uri.scheme.should eq("redis")
uri.host.should eq(container.host)
uri.port.should eq(container.mapped_port(6379))
uri.password.should eq("secret")

redis_command(url, "PING").should eq("PONG")
redis_command(url, "SET", "integration:redis-container", "ready").should eq("OK")
redis_command(url, "GET", "integration:redis-container").should eq("ready")
ensure
container.stop rescue nil
container.remove(force: true) rescue nil
Expand All @@ -99,9 +176,13 @@ require "./spec_helper"
network.create!
network.created?.should be_true
network.network_id.should_not be_nil
network.info.name.should eq(network.name)
network.info.id.should eq(network.network_id)
ensure
network.remove rescue nil
end

network.created?.should be_false
end

it "applies Testcontainers labels to container and network" do
Expand Down
Loading