Skip to content

Optionally queue outgoing data #63

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/xmpp_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
compress/1,
compress/2,
reset_stream/1,
send_elements/2,
send_element/2,
send_header/2,
send_trailer/1,
Expand Down Expand Up @@ -196,6 +197,12 @@ reset_stream(#socket_state{xml_stream = XMLStream,
SocketData#socket_state{socket = Socket1}
end.

-spec send_elements(socket_state(), [fxml:xmlel()]) -> ok | {error, inet:posix()}.
send_elements(#socket_state{xml_stream = undefined}, _Els) ->
erlang:error(not_implemented);
send_elements(SocketData, Els) ->
send(SocketData, list_to_binary([fxml:element_to_binary(El) || El <- Els])).

-spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
send_element(#socket_state{xml_stream = undefined} = SocketData, El) ->
send_xml(SocketData, {xmlstreamelement, El});
Expand Down
152 changes: 127 additions & 25 deletions src/xmpp_stream_in.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
%% API
-export([start/3, start_link/3, call/3, cast/2, reply/2, stop/1, stop_async/1,
accept/1, send/2, close/1, close/2, send_error/3, establish/1,
get_transport/1, change_shaper/2, set_timeout/2, format_error/1,
send_ws_ping/1]).
get_transport/1, change_shaper/2, configure_queue/3, set_timeout/2,
format_error/1, send_ws_ping/1]).

%% gen_server callbacks
-export([init/1, handle_cast/2, handle_call/3, handle_info/2,
Expand Down Expand Up @@ -58,6 +58,9 @@
stream_encrypted => boolean(),
stream_version => {non_neg_integer(), non_neg_integer()},
stream_authenticated => boolean(),
stream_queue := [xmpp_element() | xmlel()],
stream_queue_max := non_neg_integer(),
stream_queue_timeout => {non_neg_integer(), integer()},
ip => {inet:ip_address(), inet:port_number()},
codec_options => [xmpp:decode_option()],
xmlns => binary(),
Expand Down Expand Up @@ -226,7 +229,21 @@ close(Pid, Reason) ->
establish(State) ->
process_stream_established(State).

-spec set_timeout(state(), non_neg_integer() | infinity) -> state().
-spec configure_queue(state(), non_neg_integer(), non_neg_integer()) -> state().
configure_queue(#{owner := Owner} = State, MaxSize, MaxDelay)
when Owner == self() ->
flush_queue(State), % Support reconfiguration.
if MaxSize == 0; MaxDelay == 0 ->
State#{stream_queue_max => 0};
true ->
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
State#{stream_queue_max => MaxSize,
stream_queue_timeout => {MaxDelay, CurrentTime}}
end;
configure_queue(_, _, _) ->
erlang:error(badarg).

-spec set_timeout(state(), timeout()) -> state().
set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() ->
case Timeout of
infinity -> State#{stream_timeout => infinity};
Expand Down Expand Up @@ -280,7 +297,9 @@ init([Mod, {SockMod, Socket}, Opts]) ->
socket_mod => SockMod,
socket_opts => Opts,
stream_timeout => {Timeout, Time},
stream_state => accepting},
stream_state => accepting,
stream_queue => [],
stream_queue_max => 0},
{ok, State, Timeout}.

-spec handle_cast(term(), state()) -> next_state().
Expand Down Expand Up @@ -424,6 +443,8 @@ handle_info({'$gen_all_state_event', {xmlstreamcdata, Data}},
noreply(try callback(handle_cdata, Data, State)
catch _:{?MODULE, undef} -> State
end);
handle_info(timeout, #{stream_queue := [_|_]} = State) ->
noreply(flush_queue(State));
handle_info(timeout, #{lang := Lang} = State) ->
Disconnected = is_disconnected(State),
noreply(try callback(handle_timeout, State)
Expand Down Expand Up @@ -522,15 +543,32 @@ init_state(#{socket := Socket, mod := Mod} = State, Opts) ->
end.

-spec noreply(state()) -> noreply();
({stop, state()}) -> {stop, normal, state()}.
({stop, state()}) -> {stop, normal, state()};
({stop, normal, state()}) -> {stop, normal, state()}.
noreply({stop, State}) ->
{stop, normal, State};
noreply(#{stream_timeout := infinity} = State) ->
{noreply, State, infinity};
noreply(#{stream_timeout := {MSecs, StartTime}} = State) ->
noreply({stop, normal, State}) ->
{stop, normal, State};
noreply(State) ->
{noreply, State, get_timeout(State)}.

-spec get_timeout(state()) -> timeout().
get_timeout(State) ->
min(get_stream_timeout(State), get_queue_timeout(State)).

-spec get_stream_timeout(state()) -> timeout().
get_stream_timeout(#{stream_timeout := infinity}) ->
infinity;
get_stream_timeout(#{stream_timeout := {MSecs, StartTime}}) ->
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
Timeout = max(0, MSecs - CurrentTime + StartTime),
{noreply, State, Timeout}.
max(0, MSecs - CurrentTime + StartTime).

-spec get_queue_timeout(state()) -> timeout().
get_queue_timeout(#{stream_queue := []}) ->
infinity;
get_queue_timeout(#{stream_queue_timeout := {MSecs, StartTime}}) ->
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
max(0, MSecs - CurrentTime + StartTime).

-spec is_disconnected(state()) -> boolean().
is_disconnected(#{stream_state := StreamState}) ->
Expand Down Expand Up @@ -1193,21 +1231,29 @@ send_header(State, _) ->

-spec send_pkt(state(), xmpp_element() | xmlel()) -> state().
send_pkt(State, Pkt) ->
Result = socket_send(State, Pkt),
State1 = try callback(handle_send, Pkt, Result, State)
catch _:{?MODULE, undef} -> State
end,
case Result of
_ when is_record(Pkt, stream_error) ->
process_stream_end({stream, {out, Pkt}}, State1);
ok ->
State1;
{error, _Why} ->
% Queue process_stream_end instead of calling it directly,
% so we have opportunity to process incoming queued messages before
% terminating session.
self() ! {'$gen_event', closed},
State1
case check_queue(State, Pkt) of
flush ->
flush_queue(State, Pkt);
queue ->
queue_pkt(State, Pkt);
noqueue ->
State1 = flush_queue(State),
Result = socket_send(State1, Pkt),
State2 = try callback(handle_send, Pkt, Result, State1)
catch _:{?MODULE, undef} -> State1
end,
case Result of
_ when is_record(Pkt, stream_error) ->
process_stream_end({stream, {out, Pkt}}, State2);
ok ->
State2;
{error, _Why} ->
% Queue process_stream_end instead of calling it directly,
% so we have the opportunity to process incoming queued
% messages before terminating the session.
self() ! {'$gen_event', closed},
State2
end
end.

-spec send_error(state(), xmpp_element() | xmlel(), stanza_error()) -> state().
Expand Down Expand Up @@ -1258,6 +1304,62 @@ close_socket(#{socket := Socket} = State) ->
close_socket(State) ->
State.

-spec check_queue(state(), xmpp_element() | xmlel()) -> flush | queue | noqueue.
check_queue(#{stream_queue_max := 0}, _Pkt) ->
noqueue;
check_queue(#{stream_state := StreamState}, _Pkt)
when StreamState /= established->
noqueue;
check_queue(_State, Pkt)
when not ?is_stanza(Pkt),
not is_record(Pkt, sm_a),
not is_record(Pkt, sm_r) ->
noqueue;
check_queue(#{stream_queue := Q, stream_queue_max := MaxQueue}, _Pkt)
when length(Q) >= MaxQueue ->
flush;
check_queue(_State, _Pkt) ->
queue.

-spec queue_pkt(state(), xmpp_element() | xmlel()) -> state().
queue_pkt(#{stream_queue := [],
stream_queue_timeout := {MSecs, _PrevTime}} = State, Pkt) ->
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
State#{stream_queue := [Pkt],
stream_queue_timeout := {MSecs, CurrentTime}};
queue_pkt(#{stream_queue := Q} = State, Pkt) ->
State#{stream_queue := [Pkt|Q]}.

-spec flush_queue(state(), xmpp_element() | xmlel()) -> state().
flush_queue(State, Pkt) ->
flush_queue(queue_pkt(State, Pkt)).

-spec flush_queue(state()) -> state().
flush_queue(#{stream_queue := []} = State) ->
State;
flush_queue(#{stream_queue := Q0,
socket := Sock,
xmlns := NS} = State0) ->
Q = lists:reverse(Q0),
Els = [xmpp:encode(Pkt, NS) || Pkt <- Q],
Result = xmpp_socket:send_elements(Sock, Els),
State1 = State0#{stream_queue := []},
State2 = try lists:foldl(
fun(Pkt, State) ->
callback(handle_send, Pkt, Result, State)
end, State1, Q)
catch _:{?MODULE, undef} -> State1
end,
case Result of
ok ->
State2;
{error, _Why} ->
self() ! {'$gen_event', closed},
State2
end;
flush_queue(#{stream_queue := _Q} = State) -> % Socket has been released.
State#{stream_queue := []}.

-spec select_lang(binary(), binary()) -> binary().
select_lang(Lang, <<"">>) -> Lang;
select_lang(_, Lang) -> Lang.
Expand Down
Loading