Skip to content

Commit 66e6860

Browse files
authored
Merge pull request #28 from JuliaComputing/tan/misc
concurrent use is flaky, add to readme
2 parents 655c590 + f547f83 commit 66e6860

File tree

5 files changed

+93
-8
lines changed

5 files changed

+93
-8
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ gRPCController(;
143143
[ max_message_length = DEFAULT_MAX_MESSAGE_LENGTH, ]
144144
[ max_recv_message_length = 0, ]
145145
[ max_send_message_length = 0, ]
146+
[ enable_shared_locks = false, ]
146147
[ verbose::Bool = false, ]
147148
)
148149
```
@@ -163,6 +164,8 @@ gRPCController(;
163164
`max_message_length`, same as setting this to 0)
164165
- `max_send_message_length`: maximum message length to send (default is
165166
`max_message_length`, same as setting this to 0)
167+
- `enable_shared_locks`: whether to enable locks for using gRPCClient across
168+
tasks/threads concurrently (experimental, default is false)
166169
- `verbose`: whether to print out verbose communication logs (default false)
167170

168171
### `gRPCChannel`
@@ -231,6 +234,10 @@ It has the following members:
231234
- `grpc_status`: grpc status code for this request
232235
- `message`: any error message if request was not successful
233236

237+
## TODO
238+
239+
- Concurrent use of gRPCClient is still somewhat flaky.
240+
234241
## Credits
235242

236243
This package was originally developed at [Julia Computing](https://juliacomputing.com)

src/curl.jl

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,57 @@ function buffer_send_data(input::Channel{T}) where T <: ProtoType
4242
end
4343
=#
4444

45+
function share_lock(easy_p::Ptr{Cvoid}, data::curl_lock_data, access::curl_lock_access, userptr::Ptr{Cvoid})
46+
share = unsafe_pointer_to_objref(Ptr{CurlShare}(userptr))::CurlShare
47+
lock(share.locks[data])
48+
nothing
49+
end
50+
51+
function share_unlock(easy_p::Ptr{Cvoid}, data::curl_lock_data, userptr::Ptr{Cvoid})
52+
share = unsafe_pointer_to_objref(Ptr{CurlShare}(userptr))::CurlShare
53+
unlock(share.locks[data])
54+
nothing
55+
end
56+
57+
mutable struct CurlShare
58+
shptr::Ptr{CURLSH}
59+
locks::Vector{ReentrantLock}
60+
closed::Bool
61+
62+
function CurlShare()
63+
shptr = curl_share_init()
64+
curl_share_setopt(shptr, CURLSHOPT_SHARE, CURL_LOCK_DATA_SHARE)
65+
curl_share_setopt(shptr, CURLSHOPT_SHARE, CURL_LOCK_DATA_COOKIE)
66+
curl_share_setopt(shptr, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS)
67+
curl_share_setopt(shptr, CURLSHOPT_SHARE, CURL_LOCK_DATA_PSL)
68+
curl_share_setopt(shptr, CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT)
69+
70+
share_lock_cb = @cfunction(share_lock, Cvoid, (Ptr{Cvoid}, Cuint, Cuint, Ptr{Cvoid}))
71+
share_unlock_cb = @cfunction(share_unlock, Cvoid, (Ptr{Cvoid}, Cuint, Ptr{Cvoid}))
72+
73+
@ccall LibCURL.LibCURL_jll.libcurl.curl_share_setopt(shptr::Ptr{CURLSH}, CURLSHOPT_LOCKFUNC::CURLSHoption; share_lock_cb::Ptr{Cvoid})::CURLSHcode
74+
@ccall LibCURL.LibCURL_jll.libcurl.curl_share_setopt(shptr::Ptr{CURLSH}, CURLSHOPT_UNLOCKFUNC::CURLSHoption; share_unlock_cb::Ptr{Cvoid})::CURLSHcode
75+
76+
locks = Vector(undef, CURL_LOCK_DATA_LAST)
77+
for idx in 1:CURL_LOCK_DATA_LAST
78+
locks[idx] = ReentrantLock()
79+
end
80+
81+
obj = new(shptr, locks, false)
82+
userptr = pointer_from_objref(obj)
83+
@ccall LibCURL.LibCURL_jll.libcurl.curl_share_setopt(shptr::Ptr{CURLSH}, CURLSHOPT_USERDATA::CURLSHoption; userptr::Ptr{Cvoid})::CURLSHcode
84+
obj
85+
end
86+
end
87+
88+
function close(share::CurlShare)
89+
if share.closed
90+
curl_share_cleanup(share.shptr)
91+
share.closed = true
92+
end
93+
nothing
94+
end
95+
4596
function send_data(easy::Curl.Easy, input::Channel{T}, max_send_message_length::Int) where T <: ProtoType
4697
while true
4798
yield()
@@ -95,7 +146,7 @@ function grpc_request_header(request_timeout::Real)
95146
end
96147
end
97148

98-
function easy_handle(maxage::Clong, keepalive::Clong, negotiation::Symbol, revocation::Bool, request_timeout::Real)
149+
function easy_handle(curlshare::Union{Nothing,Ptr{CURLSH}}, maxage::Clong, keepalive::Clong, negotiation::Symbol, revocation::Bool, request_timeout::Real)
99150
easy = Curl.Easy()
100151
http_version = (negotiation === :http2) ? CURL_HTTP_VERSION_2_0 :
101152
(negotiation === :http2_tls) ? CURL_HTTP_VERSION_2TLS :
@@ -105,6 +156,9 @@ function easy_handle(maxage::Clong, keepalive::Clong, negotiation::Symbol, revoc
105156
Curl.setopt(easy, CURLOPT_PIPEWAIT, Clong(1))
106157
Curl.setopt(easy, CURLOPT_POST, Clong(1))
107158
Curl.setopt(easy, CURLOPT_HTTPHEADER, grpc_request_header(request_timeout))
159+
if curlshare !== nothing
160+
Curl.setopt(easy, CURLOPT_SHARE, curlshare)
161+
end
108162
if !revocation
109163
Curl.setopt(easy, CURLOPT_SSL_OPTIONS, CURLSSLOPT_NO_REVOKE)
110164
end
@@ -200,7 +254,7 @@ function get_grpc_status(easy::Curl.Easy)
200254
return grpc_status, grpc_message
201255
end
202256

203-
function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2};
257+
function grpc_request(curlshare::Union{Nothing,Ptr{CURLSH}}, downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2};
204258
maxage::Clong = typemax(Clong),
205259
keepalive::Clong = 60,
206260
negotiation::Symbol = :http2_prior_knowledge,
@@ -210,7 +264,7 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o
210264
max_recv_message_length::Int = DEFAULT_MAX_RECV_MESSAGE_LENGTH,
211265
max_send_message_length::Int = DEFAULT_MAX_SEND_MESSAGE_LENGTH,
212266
verbose::Bool = false)::gRPCStatus where {T1 <: ProtoType, T2 <: ProtoType}
213-
Curl.with_handle(easy_handle(maxage, keepalive, negotiation, revocation, request_timeout)) do easy
267+
Curl.with_handle(easy_handle(curlshare, maxage, keepalive, negotiation, revocation, request_timeout)) do easy
214268
# setup the request
215269
Curl.set_url(easy, url)
216270
Curl.set_timeout(easy, request_timeout)

src/gRPCClient.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ using ProtoBuf
66

77
import Downloads: Curl
88
import ProtoBuf: call_method
9+
import Base: close
910

1011
export gRPCController, gRPCChannel, gRPCException, gRPCServiceCallException, gRPCMessageTooLargeException, gRPCStatus, gRPCCheck, StatusCode
1112

src/grpc.jl

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ Contains settings to control the behavior of gRPC requests.
9797
`max_message_length`, same as setting this to 0)
9898
- `max_send_message_length`: maximum message length to send (default is
9999
`max_message_length`, same as setting this to 0)
100+
- `enable_shared_locks`: whether to enable locks for using gRPCClient across
101+
tasks/threads concurrently (experimental, default is false)
100102
- `verbose`: whether to print out verbose communication logs (default false)
101103
"""
102104
struct gRPCController <: ProtoRpcController
@@ -108,6 +110,7 @@ struct gRPCController <: ProtoRpcController
108110
connect_timeout::Real
109111
max_recv_message_length::Int
110112
max_send_message_length::Int
113+
enable_shared_locks::Bool
111114
verbose::Bool
112115

113116
function gRPCController(;
@@ -120,6 +123,7 @@ struct gRPCController <: ProtoRpcController
120123
max_message_length::Integer = DEFAULT_MAX_MESSAGE_LENGTH,
121124
max_recv_message_length::Integer = 0,
122125
max_send_message_length::Integer = 0,
126+
enable_shared_locks::Bool = false,
123127
verbose::Bool = false
124128
)
125129
if maxage < 0 || keepalive < 0 || request_timeout < 0 || connect_timeout < 0 ||
@@ -128,7 +132,17 @@ struct gRPCController <: ProtoRpcController
128132
end
129133
(max_recv_message_length == 0) && (max_recv_message_length = max_message_length)
130134
(max_send_message_length == 0) && (max_send_message_length = max_message_length)
131-
new(maxage, keepalive, negotiation, revocation, request_timeout, connect_timeout, max_recv_message_length, max_send_message_length, verbose)
135+
new(maxage,
136+
keepalive,
137+
negotiation,
138+
revocation,
139+
request_timeout,
140+
connect_timeout,
141+
max_recv_message_length,
142+
max_send_message_length,
143+
enable_shared_locks,
144+
verbose,
145+
)
132146
end
133147
end
134148

@@ -146,16 +160,22 @@ the server.
146160
struct gRPCChannel <: ProtoRpcChannel
147161
downloader::Downloader
148162
baseurl::String
163+
curlshare::CurlShare
149164

150165
function gRPCChannel(baseurl::String)
151166
downloader = Downloader(; grace=Inf)
152167
Curl.init!(downloader.multi)
153168
Curl.setopt(downloader.multi, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX)
154169
endswith(baseurl, '/') && (baseurl = baseurl[1:end-1])
155-
new(downloader, baseurl)
170+
new(downloader, baseurl, CurlShare())
156171
end
157172
end
158173

174+
function close(channel::gRPCChannel)
175+
close(channel.curlshare)
176+
nothing
177+
end
178+
159179
function to_delimited_message_bytes(msg, max_message_length::Int)
160180
iob = IOBuffer()
161181
limitiob = LimitIO(iob, max_message_length)
@@ -193,7 +213,8 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M
193213
end
194214
function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, outchannel::Channel{T2}) where {T1 <: ProtoType, T2 <: ProtoType}
195215
url = string(channel.baseurl, "/", service.name, "/", method.name)
196-
status_future = @async grpc_request(channel.downloader, url, input, outchannel;
216+
shptr = controller.enable_shared_locks ? channel.curlshare.shptr : nothing
217+
status_future = @async grpc_request(shptr, channel.downloader, url, input, outchannel;
197218
maxage = controller.maxage,
198219
keepalive = controller.keepalive,
199220
negotiation = controller.negotiation,

test/runtests_routeguide.jl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,10 @@ server_endpoint = isempty(ARGS) ? "http://localhost:10000/" : ARGS[1]
8888
@debug("testing routeclinet...")
8989
test_clients(server_endpoint)
9090

91-
@debug("testing async safety...")
92-
test_task_safety(server_endpoint)
91+
if get(ENV, "TEST_ASYNC", "false") == "true"
92+
@debug("testing async safety...")
93+
test_task_safety(server_endpoint)
94+
end
9395

9496
kill(serverproc)
9597
@info("stopped test server")

0 commit comments

Comments
 (0)