Skip to content

Commit 655c590

Browse files
authored
prevent connection reuse on errors (#27)
If the server returns an error or is likely to close connection, it is best to not reuse the connection for future client interactions. With this we now set the `CURLOPT_FORBID_REUSE` flag on the easy handle when we detect such a condition.
1 parent 333406a commit 655c590

File tree

2 files changed

+50
-54
lines changed

2 files changed

+50
-54
lines changed

src/curl.jl

Lines changed: 47 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,34 @@ function set_connect_timeout(easy::Curl.Easy, timeout::Real)
172172
end
173173
end
174174

175+
# Prevent reuse of this handle
176+
# Should be called if an error is detected and/or the server is likely to close connection
177+
forbid_reuse(easy::Curl.Easy) = Curl.setopt(easy, CURLOPT_FORBID_REUSE, Clong(1))
178+
179+
function get_grpc_status(easy::Curl.Easy)
180+
grpc_status = StatusCode.OK.code
181+
grpc_message = ""
182+
183+
# parse the grpc headers
184+
@debug("response headers", easy.res_hdrs)
185+
for hdr in easy.res_hdrs
186+
if startswith(hdr, "grpc-status")
187+
grpc_status = parse(Int, strip(last(split(hdr, ':'; limit=2))))
188+
elseif startswith(hdr, "grpc-message")
189+
grpc_message = string(strip(last(split(hdr, ':'; limit=2))))
190+
end
191+
end
192+
193+
if (easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code)
194+
grpc_status = StatusCode.DEADLINE_EXCEEDED.code
195+
end
196+
if (grpc_status != StatusCode.OK.code) && isempty(grpc_message)
197+
grpc_message = grpc_status_message(grpc_status)
198+
end
199+
200+
return grpc_status, grpc_message
201+
end
202+
175203
function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2};
176204
maxage::Clong = typemax(Clong),
177205
keepalive::Clong = 60,
@@ -209,62 +237,30 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o
209237
nothing
210238
end
211239

212-
# do send recv data
213-
if VERSION < v"1.5"
214-
cleaned_up = false
215-
exception = nothing
216-
cleanup_once = (ex)->begin
217-
if !cleaned_up
218-
cleaned_up = true
219-
exception = ex
220-
cleanup()
221-
end
222-
end
223-
224-
@sync begin
225-
@async try
226-
recv_data(easy, output, max_recv_message_length)
227-
catch ex
228-
cleanup_once(ex)
229-
end
230-
@async try
231-
send_data(easy, input, max_send_message_length)
232-
catch ex
233-
cleanup_once(ex)
234-
end
235-
end
240+
exception = nothing
241+
grpc_status = StatusCode.OK.code
242+
grpc_message = ""
236243

237-
if exception !== nothing
238-
throw(exception)
244+
# do send recv data
245+
try
246+
Base.Experimental.@sync begin
247+
@async recv_data(easy, output, max_recv_message_length)
248+
@async send_data(easy, input, max_send_message_length)
239249
end
240-
else
241-
try
242-
Base.Experimental.@sync begin
243-
@async recv_data(easy, output, max_recv_message_length)
244-
@async send_data(easy, input, max_send_message_length)
245-
end
246-
finally # ensure handle is removed
247-
cleanup()
250+
grpc_status, grpc_message = get_grpc_status(easy)
251+
if ((easy.code != CURLE_OK) || (grpc_status != StatusCode.OK.code))
252+
forbid_reuse(easy)
248253
end
254+
catch ex
255+
forbid_reuse(easy)
256+
exception = ex
257+
finally # ensure handle is removed
258+
cleanup()
249259
end
250260

251-
@debug("response headers", easy.res_hdrs)
252-
253-
# parse the grpc headers
254-
grpc_status = StatusCode.OK.code
255-
grpc_message = ""
256-
for hdr in easy.res_hdrs
257-
if startswith(hdr, "grpc-status")
258-
grpc_status = parse(Int, strip(last(split(hdr, ':'; limit=2))))
259-
elseif startswith(hdr, "grpc-message")
260-
grpc_message = string(strip(last(split(hdr, ':'; limit=2))))
261-
end
262-
end
263-
if (easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code)
264-
grpc_status = StatusCode.DEADLINE_EXCEEDED.code
265-
end
266-
if (grpc_status != StatusCode.OK.code) && isempty(grpc_message)
267-
grpc_message = grpc_status_message(grpc_status)
261+
# throw the unwrapped exception if there was one
262+
if exception !== nothing
263+
throw(exception)
268264
end
269265

270266
if ((easy.code == CURLE_OK) && (grpc_status == StatusCode.OK.code))

src/grpc.jl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ end
1919
gRPCStatus(success::Bool, grpc_status::Int, message::AbstractString) = gRPCStatus(success, grpc_status, string(message), nothing)
2020
function gRPCStatus(status_future)
2121
try
22-
fetch(status_future)
22+
return fetch(status_future)
2323
catch ex
2424
task_exception = isa(ex, TaskFailedException) ? ex.task.exception : ex
2525
while isa(task_exception, TaskFailedException)
2626
task_exception = task_exception.task.exception
2727
end
28-
gRPCStatus(false, StatusCode.INTERNAL.code, string(task_exception), task_exception)
28+
return gRPCStatus(false, StatusCode.INTERNAL.code, string(task_exception), task_exception)
2929
end
3030
end
3131

@@ -181,7 +181,7 @@ end
181181
function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, ::Type{T2}) where {T1 <: ProtoType, T2 <: ProtoType}
182182
outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}())
183183
try
184-
take!(outchannel), status_future
184+
return (take!(outchannel), status_future)
185185
catch ex
186186
gRPCCheck(status_future) # check for core issue
187187
if isa(ex, InvalidStateException)

0 commit comments

Comments
 (0)