Skip to content

Commit 556ae23

Browse files
authored
Merge pull request #6 from JuliaComputing/tan/grpc
add message size limit checks
2 parents d28400d + 49242ab commit 556ae23

File tree

7 files changed

+247
-57
lines changed

7 files changed

+247
-57
lines changed

README.md

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ gRPCController(;
140140
[ revocation::Bool = true, ]
141141
[ request_timeout::Real = Inf, ]
142142
[ connect_timeout::Real = 0, ]
143+
[ max_message_length = DEFAULT_MAX_MESSAGE_LENGTH, ]
144+
[ max_recv_message_length = 0, ]
145+
[ max_send_message_length = 0, ]
143146
[ verbose::Bool = false, ]
144147
)
145148
```
@@ -155,6 +158,11 @@ gRPCController(;
155158
- `request_timeout`: request timeout (seconds)
156159
- `connect_timeout`: connect timeout (seconds) (default is 300 seconds, same
157160
as setting this to 0)
161+
- `max_message_length`: maximum message length (default is 4MB)
162+
- `max_recv_message_length`: maximum message length to receive (default is
163+
`max_message_length`, same as setting this to 0)
164+
- `max_send_message_length`: maximum message length to send (default is
165+
`max_message_length`, same as setting this to 0)
158166
- `verbose`: whether to print out verbose communication logs (default false)
159167

160168
### `gRPCChannel`
@@ -178,15 +186,6 @@ the server.
178186
- `success`: whether the request was completed successfully.
179187
- `message`: any error message if request was not successful
180188

181-
### `gRPCException`
182-
183-
Every gRPC request returns the result and a future representing the status
184-
of the gRPC request. Use the `gRPCCheck` method on the status future to check
185-
the request status and throw a `gRPCException` if it is not successful.
186-
187-
A `gRPCException` has a member named `message` that may contain an error
188-
message if request was not successful.
189-
190189
### `gRPCCheck`
191190

192191
```julia
@@ -196,3 +195,36 @@ gRPCCheck(status; throw_error::Bool=true)
196195
Method to check the response of a gRPC request and raise a `gRPCException`
197196
if it has failed. If `throw_error` is set to false, returns `true` or `false`
198197
indicating success instead.
198+
199+
### `gRPCException`
200+
201+
Every gRPC request returns the result and a future representing the status
202+
of the gRPC request. Use the `gRPCCheck` method on the status future to check
203+
the request status and throw a `gRPCException` if it is not successful.
204+
205+
The abstract `gRPCException` type has the following concrete implementations:
206+
207+
- `gRPCMessageTooLargeException`
208+
- `gRPCServiceCallException`
209+
210+
### `gRPCMessageTooLargeException`
211+
212+
A `gRPMessageTooLargeException` exception is thrown when a message is
213+
encountered that has a size greater than the limit configured.
214+
Specifically, `max_recv_message_length` while receiving and
215+
`max_send_message_length` while sending.
216+
217+
A `gRPMessageTooLargeException` has the following members:
218+
219+
- `limit`: the limit value that was exceeded
220+
- `encountered`: the amount of data that was actually received
221+
or sent before this error was triggered. Note that this may
222+
not correspond to the full size of the data, as error may be
223+
thrown before actually materializing the complete data.
224+
225+
### `gRPCServiceCallException`
226+
227+
A `gRPCServiceCallException` is thrown if a gRPC request is not successful.
228+
It has the following members:
229+
230+
- `message`: any error message if request was not successful

src/curl.jl

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ function buffer_send_data(input::Channel{T}) where T <: ProtoType
1818
end
1919
=#
2020

21-
function send_data(easy::Curl.Easy, input::Channel{T}) where T <: ProtoType
21+
function send_data(easy::Curl.Easy, input::Channel{T}, max_send_message_length::Int) where T <: ProtoType
2222
while true
2323
yield()
24-
data = isready(input) ? to_delimited_message_bytes(take!(input)) : isopen(input) ? UInt8[] : nothing
24+
data = isready(input) ? to_delimited_message_bytes(take!(input), max_send_message_length) : isopen(input) ? UInt8[] : nothing
2525
easy.input === nothing && break
2626
easy.input = data
2727
Curl.curl_easy_pause(easy.handle, Curl.CURLPAUSE_CONT)
@@ -63,7 +63,7 @@ function easy_handle(maxage::Clong, keepalive::Clong, negotiation::Symbol, revoc
6363
easy
6464
end
6565

66-
function recv_data(easy::Curl.Easy, output::Channel{T}) where T <: ProtoType
66+
function recv_data(easy::Curl.Easy, output::Channel{T}, max_recv_message_length::Int) where T <: ProtoType
6767
iob = PipeBuffer()
6868
waiting_for_header = true
6969
msgsize = 0
@@ -78,6 +78,11 @@ function recv_data(easy::Curl.Easy, output::Channel{T}) where T <: ProtoType
7878
if bytesavailable(iob) >= 5
7979
compressed = read(iob, UInt8) # compression
8080
datalen = ntoh(read(iob, UInt32)) # message length
81+
82+
if datalen > max_recv_message_length
83+
throw(gRPCMessageTooLargeException(max_recv_message_length, datalen))
84+
end
85+
8186
waiting_for_header = false
8287
else
8388
need_more = true
@@ -114,10 +119,12 @@ end
114119
function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2};
115120
maxage::Clong = typemax(Clong),
116121
keepalive::Clong = 60,
117-
negotiation::Symbol = :http2_prior_knowledge,
118-
revocation::Bool = true,
122+
negotiation::Symbol = :http2_prior_knowledge,
123+
revocation::Bool = true,
119124
request_timeout::Real = Inf,
120125
connect_timeout::Real = 0,
126+
max_recv_message_length::Int = DEFAULT_MAX_RECV_MESSAGE_LENGTH,
127+
max_send_message_length::Int = DEFAULT_MAX_SEND_MESSAGE_LENGTH,
121128
verbose::Bool = false)::gRPCStatus where {T1 <: ProtoType, T2 <: ProtoType}
122129
Curl.with_handle(easy_handle(maxage, keepalive, negotiation, revocation)) do easy
123130
# setup the request
@@ -131,14 +138,54 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o
131138
# do the request
132139
Curl.add_handle(downloader.multi, easy)
133140

134-
try
135-
# do send recv data
136-
Base.Experimental.@sync begin
137-
@async recv_data(easy, output)
138-
@async send_data(easy, input)
139-
end
140-
finally # ensure handle is removed
141+
function cleanup()
141142
Curl.remove_handle(downloader.multi, easy)
143+
# though remove_handle sets easy.handle to C_NULL, it does not close output and progress channels
144+
# we need to close them here to unblock anything waiting on them
145+
close(easy.output)
146+
close(easy.progress)
147+
close(output)
148+
close(input)
149+
nothing
150+
end
151+
152+
# do send recv data
153+
if VERSION < v"1.5"
154+
cleaned_up = false
155+
exception = nothing
156+
cleanup_once = (ex)->begin
157+
if !cleaned_up
158+
cleaned_up = true
159+
exception = ex
160+
cleanup()
161+
end
162+
end
163+
164+
@sync begin
165+
@async try
166+
recv_data(easy, output, max_recv_message_length)
167+
catch ex
168+
cleanup_once(ex)
169+
end
170+
@async try
171+
send_data(easy, input, max_send_message_length)
172+
catch ex
173+
cleanup_once(ex)
174+
end
175+
end
176+
177+
if exception !== nothing
178+
throw(exception)
179+
end
180+
else
181+
try
182+
Base.Experimental.@sync begin
183+
@async recv_data(easy, output, max_recv_message_length)
184+
@async send_data(easy, input, max_send_message_length)
185+
end
186+
finally # ensure handle is removed
187+
cleanup()
188+
end
142189
end
143190

144191
(easy.code == CURLE_OK) ? gRPCStatus(true, "") : gRPCStatus(false, Curl.get_curl_errstr(easy))

src/gRPCClient.jl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ using ProtoBuf
77
import Downloads: Curl
88
import ProtoBuf: call_method
99

10-
export gRPCController, gRPCChannel, gRPCException, gRPCStatus, gRPCCheck
10+
export gRPCController, gRPCChannel, gRPCException, gRPCServiceCallException, gRPCMessageTooLargeException, gRPCStatus, gRPCCheck
1111

12+
abstract type gRPCException <: Exception end
13+
14+
include("limitio.jl")
1215
include("curl.jl")
1316
include("grpc.jl")
1417
include("generate.jl")

src/grpc.jl

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,39 +12,56 @@
1212
struct gRPCStatus
1313
success::Bool
1414
message::String
15+
exception::Union{Nothing,Exception}
16+
end
17+
18+
gRPCStatus(success::Bool, message::AbstractString) = gRPCStatus(success, string(message), nothing)
19+
function gRPCStatus(status_future)
20+
try
21+
fetch(status_future)
22+
catch ex
23+
task_exception = isa(ex, TaskFailedException) ? ex.task.exception : ex
24+
while isa(task_exception, TaskFailedException)
25+
task_exception = task_exception.task.exception
26+
end
27+
gRPCStatus(false, string(task_exception), task_exception)
28+
end
1529
end
1630

1731
"""
18-
struct gRPCException
32+
struct gRPCServiceCallException
1933
message::String
2034
end
2135
22-
Every gRPC request returns the result and a future representing the status
23-
of the gRPC request. Use the `gRPCCheck` method on the status future to check
24-
the request status and throw a `gRPCException` if it is not successful.
25-
26-
A `gRPCException` has the following members:
36+
A `gRPCServiceCallException` is thrown if a gRPC request is not successful.
37+
It has the following members:
2738
2839
- `message`: any error message if request was not successful
2940
"""
30-
struct gRPCException <: Exception
41+
struct gRPCServiceCallException <: gRPCException
3142
message::String
3243
end
3344

45+
Base.show(io::IO, m::gRPCServiceCallException) = print(io, "gRPCServiceCallException - $(m.message)")
46+
3447
"""
3548
gRPCCheck(status; throw_error::Bool=true)
3649
37-
Check the response of a gRPC request and raise a `gRPCException` if it has
38-
failed. If `throw_error` is set to false, returns `true` or `false` indicating
39-
success instead.
50+
Every gRPC request returns the result and a future representing the status
51+
of the gRPC request. Check the response of a gRPC request and raise a
52+
`gRPCException` if it has failed. If `throw_error` is set to false, this
53+
returns `true` or `false` indicating success instead.
4054
"""
41-
gRPCCheck(status; throw_error::Bool=true) = gRPCCheck(fetch(status); throw_error=throw_error)
55+
gRPCCheck(status_future; throw_error::Bool=true) = gRPCCheck(gRPCStatus(status_future); throw_error=throw_error)
4256
function gRPCCheck(status::gRPCStatus; throw_error::Bool=true)
43-
if throw_error
44-
status.success || throw(gRPCException(status.message))
45-
else
46-
status.success
57+
if throw_error && !status.success
58+
if status.exception === nothing
59+
throw(gRPCServiceCallException(status.message))
60+
else
61+
throw(status.exception)
62+
end
4763
end
64+
status.success
4865
end
4966

5067
"""
@@ -55,6 +72,9 @@ end
5572
[ revocation::Bool = true, ]
5673
[ request_timeout::Real = Inf, ]
5774
[ connect_timeout::Real = 0, ]
75+
[ max_message_length = DEFAULT_MAX_MESSAGE_LENGTH, ]
76+
[ max_recv_message_length = 0, ]
77+
[ max_send_message_length = 0, ]
5878
[ verbose::Bool = false, ]
5979
)
6080
@@ -70,6 +90,11 @@ Contains settings to control the behavior of gRPC requests.
7090
- `request_timeout`: request timeout (seconds)
7191
- `connect_timeout`: connect timeout (seconds) (default is 300 seconds, same
7292
as setting this to 0)
93+
- `max_message_length`: maximum message length (default is 4MB)
94+
- `max_recv_message_length`: maximum message length to receive (default is
95+
`max_message_length`, same as setting this to 0)
96+
- `max_send_message_length`: maximum message length to send (default is
97+
`max_message_length`, same as setting this to 0)
7398
- `verbose`: whether to print out verbose communication logs (default false)
7499
"""
75100
struct gRPCController <: ProtoRpcController
@@ -79,6 +104,8 @@ struct gRPCController <: ProtoRpcController
79104
revocation::Bool
80105
request_timeout::Real
81106
connect_timeout::Real
107+
max_recv_message_length::Int
108+
max_send_message_length::Int
82109
verbose::Bool
83110

84111
function gRPCController(;
@@ -88,9 +115,18 @@ struct gRPCController <: ProtoRpcController
88115
revocation::Bool = true,
89116
request_timeout::Real = Inf,
90117
connect_timeout::Real = 0,
118+
max_message_length::Integer = DEFAULT_MAX_MESSAGE_LENGTH,
119+
max_recv_message_length::Integer = 0,
120+
max_send_message_length::Integer = 0,
91121
verbose::Bool = false
92122
)
93-
new(maxage, keepalive, negotiation, revocation, request_timeout, connect_timeout, verbose)
123+
if maxage < 0 || keepalive < 0 || request_timeout < 0 || connect_timeout < 0 ||
124+
max_message_length < 0 || max_recv_message_length < 0 || max_send_message_length < 0
125+
throw(ArgumentError("Invalid gRPCController parameter"))
126+
end
127+
(max_recv_message_length == 0) && (max_recv_message_length = max_message_length)
128+
(max_send_message_length == 0) && (max_send_message_length = max_message_length)
129+
new(maxage, keepalive, negotiation, revocation, request_timeout, connect_timeout, max_recv_message_length, max_send_message_length, verbose)
94130
end
95131
end
96132

@@ -118,13 +154,15 @@ struct gRPCChannel <: ProtoRpcChannel
118154
end
119155
end
120156

121-
function to_delimited_message_bytes(msg)
157+
function to_delimited_message_bytes(msg, max_message_length::Int)
122158
iob = IOBuffer()
123-
write(iob, UInt8(0)) # compression
124-
write(iob, hton(UInt32(0))) # message length (placeholder)
125-
data_len = writeproto(iob, msg) # message bytes
126-
seek(iob, 1) # seek out the message length placeholder
127-
write(iob, hton(UInt32(data_len))) # fill the message length
159+
limitiob = LimitIO(iob, max_message_length)
160+
write(limitiob, UInt8(0)) # compression
161+
write(limitiob, hton(UInt32(0))) # message length (placeholder)
162+
data_len = writeproto(limitiob, msg) # message bytes
163+
164+
seek(iob, 1) # seek out the message length placeholder
165+
write(iob, hton(UInt32(data_len))) # fill the message length
128166
take!(iob)
129167
end
130168

@@ -155,6 +193,8 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M
155193
revocation = controller.revocation,
156194
request_timeout = controller.request_timeout,
157195
connect_timeout = controller.connect_timeout,
196+
max_recv_message_length = controller.max_recv_message_length,
197+
max_send_message_length = controller.max_send_message_length,
158198
verbose = controller.verbose,
159199
)
160200
outchannel, status_future

0 commit comments

Comments
 (0)