Skip to content

Commit 23029f9

Browse files
pmeinhardtandreasknoepfle
authored andcommitted
Iterate on tarpipe upload implementation
1 parent e889b26 commit 23029f9

File tree

1 file changed

+70
-39
lines changed

1 file changed

+70
-39
lines changed

examples/tarpipe.exs

Lines changed: 70 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,59 +3,90 @@
33
ctx =
44
SSHKit.Context.new()
55
|> SSHKit.Context.path("/tmp")
6-
|> SSHKit.Context.user("other")
7-
|> SSHKit.Context.group("other")
6+
# |> SSHKit.Context.user("other")
7+
# |> SSHKit.Context.group("other")
88
|> SSHKit.Context.umask("0077")
99

10-
defmodule Xfer do
11-
# https://github.com/erlang/otp/blob/OTP-23.2.1/lib/ssh/src/ssh.hrl
12-
def to_binary(data) when is_list(data) do
13-
:erlang.iolist_to_binary(data)
14-
catch
15-
_ -> :unicode.characters_to_binary(data)
16-
end
10+
defmodule TP do
11+
def upload!(conn, source, dest, opts \\ []) do
12+
ctx = Keyword.get(opts, :context, SSHKit.Context.new())
1713

18-
def to_binary(data) when is_binary(data) do
19-
data
20-
end
21-
end
14+
Stream.resource(
15+
fn ->
16+
{:ok, chan} = SSHKit.Channel.open(conn, [])
17+
command = SSHKit.Context.build(ctx, "tar -x")
18+
:success = SSHKit.Channel.exec(chan, command)
2219

23-
source = "test/fixtures"
20+
owner = self()
2421

25-
:ok =
26-
with {:ok, chan} <- SSHKit.Channel.open(conn, []) do
27-
command = SSHKit.Context.build(ctx, "tar -x")
22+
tarpipe = spawn(fn ->
23+
{:ok, tar} = :erl_tar.init(chan, :write, fn
24+
:position, {^chan, position} ->
25+
# IO.inspect(position, label: "position")
26+
{:ok, 0}
2827

29-
case SSHKit.Channel.exec(chan, command) do
30-
:success ->
31-
# In case of failed upload, check command output:
32-
# IO.inspect(SSHKit.Channel.recv(chan))
28+
:write, {^chan, data} ->
29+
# TODO: Send data in chunks based on channel window size?
30+
# IO.inspect(data, label: "write")
31+
# In case of failing upload, check command output:
32+
# IO.inspect(SSHKit.Channel.recv(chan, 0))
33+
chunk = to_binary(data)
3334

34-
{:ok, tar} = :erl_tar.init(chan, :write, fn
35-
:position, {^chan, position} ->
36-
# IO.write("tar position: #{inspect(position)}")
37-
{:ok, 0}
35+
receive do
36+
:cont ->
37+
:ok = SSHKit.Channel.send(chan, chunk)
38+
end
39+
send(owner, {:write, chan, self(), chunk})
40+
:ok
3841

39-
:write, {^chan, data} ->
40-
# TODO: Send data in chunks based on channel window size?
41-
:ok = SSHKit.Channel.send(chan, Xfer.to_binary(data))
42-
:ok
42+
:close, ^chan ->
43+
# IO.puts("close")
44+
:ok = SSHKit.Channel.eof(chan)
45+
send(owner, {:close, chan, self()})
46+
:ok
47+
end)
4348

44-
:close, ^chan ->
45-
:ok = SSHKit.Channel.eof(chan)
46-
:ok
49+
:ok = :erl_tar.add(tar, to_charlist(source), to_charlist(Path.basename(source)), [])
50+
:ok = :erl_tar.close(tar)
4751
end)
4852

49-
:ok = :erl_tar.add(tar, to_charlist(source), to_charlist(Path.basename(source)), [])
53+
{chan, tarpipe}
54+
end,
55+
fn {chan, tarpipe} ->
56+
send(tarpipe, :cont)
5057

51-
:ok = :erl_tar.close(tar)
58+
receive do
59+
{:write, ^chan, ^tarpipe, data} ->
60+
{[{:write, chan, data}], {chan, tarpipe}}
5261

53-
:failure ->
54-
{:error, :failure}
62+
{:close, ^chan, ^tarpipe} ->
63+
{:halt, {chan, tarpipe}}
64+
end
65+
end,
66+
fn {chan, tarpipe} ->
67+
:ok = SSHKit.Channel.close(chan)
68+
:ok = SSHKit.Channel.flush(chan)
69+
end
70+
)
71+
end
5572

56-
other ->
57-
other
58-
end
73+
# https://github.com/erlang/otp/blob/OTP-23.2.1/lib/ssh/src/ssh.hrl
74+
def to_binary(data) when is_list(data) do
75+
:erlang.iolist_to_binary(data)
76+
catch
77+
_ -> :unicode.characters_to_binary(data)
5978
end
6079

80+
def to_binary(data) when is_binary(data) do
81+
data
82+
end
83+
end
84+
85+
stream = TP.upload!(conn, "test/fixtures", "upload", context: ctx)
86+
87+
Enum.each(stream, fn
88+
{:write, chan, data} ->
89+
IO.puts("Upload, sent #{byte_size(data)} bytes")
90+
end)
91+
6192
:ok = SSHKit.close(conn)

0 commit comments

Comments
 (0)