Skip to content

Commit 77f66db

Browse files
pmeinhardtandreasknoepfle
authored andcommitted
Iterate on tarpipe implementation
1 parent 23029f9 commit 77f66db

File tree

1 file changed

+31
-16
lines changed

1 file changed

+31
-16
lines changed

examples/tarpipe.exs

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,35 @@ defmodule TP do
1313

1414
Stream.resource(
1515
fn ->
16-
{:ok, chan} = SSHKit.Channel.open(conn, [])
17-
command = SSHKit.Context.build(ctx, "tar -x")
18-
:success = SSHKit.Channel.exec(chan, command)
19-
2016
owner = self()
2117

2218
tarpipe = spawn(fn ->
19+
{:ok, chan} = SSHKit.Channel.open(conn, [])
20+
command = SSHKit.Context.build(ctx, "tar -x")
21+
:success = SSHKit.Channel.exec(chan, command)
22+
23+
# TODO: What if command immediately exits or does not exist?
24+
# IO.inspect(SSHKit.Channel.recv(chan, 1000))
25+
2326
{:ok, tar} = :erl_tar.init(chan, :write, fn
2427
:position, {^chan, position} ->
2528
# IO.inspect(position, label: "position")
2629
{:ok, 0}
2730

2831
:write, {^chan, data} ->
2932
# TODO: Send data in chunks based on channel window size?
30-
# IO.inspect(data, label: "write")
33+
IO.inspect(data, label: "write")
3134
# In case of failing upload, check command output:
3235
# IO.inspect(SSHKit.Channel.recv(chan, 0))
3336
chunk = to_binary(data)
3437

3538
receive do
3639
:cont ->
37-
:ok = SSHKit.Channel.send(chan, chunk)
40+
case SSHKit.Channel.send(chan, chunk) do
41+
:ok -> send(owner, {:write, chan, self(), chunk})
42+
other -> send(owner, {:error, chan, self(), other})
43+
end
3844
end
39-
send(owner, {:write, chan, self(), chunk})
4045
:ok
4146

4247
:close, ^chan ->
@@ -48,24 +53,34 @@ defmodule TP do
4853

4954
:ok = :erl_tar.add(tar, to_charlist(source), to_charlist(Path.basename(source)), [])
5055
:ok = :erl_tar.close(tar)
56+
57+
:ok = SSHKit.Channel.close(chan)
5158
end)
5259

53-
{chan, tarpipe}
60+
tarpipe
5461
end,
55-
fn {chan, tarpipe} ->
62+
fn tarpipe ->
5663
send(tarpipe, :cont)
5764

5865
receive do
59-
{:write, ^chan, ^tarpipe, data} ->
60-
{[{:write, chan, data}], {chan, tarpipe}}
66+
{:write, chan, ^tarpipe, data} ->
67+
{[{:write, chan, data}], tarpipe}
68+
69+
{:close, chan, ^tarpipe} ->
70+
{:halt, tarpipe}
6171

62-
{:close, ^chan, ^tarpipe} ->
63-
{:halt, {chan, tarpipe}}
72+
{:error, chan, ^tarpipe, error} ->
73+
IO.inspect(error, label: "received error")
74+
{:halt, tarpipe}
6475
end
76+
77+
# case Tarpipe.proceed(tarpipe) do
78+
# {:write, …} -> {[], tarpipe}
79+
# {:error, …} -> raise
80+
# end
6581
end,
66-
fn {chan, tarpipe} ->
67-
:ok = SSHKit.Channel.close(chan)
68-
:ok = SSHKit.Channel.flush(chan)
82+
fn tarpipe ->
83+
nil # :ok = Tarpipe.close(tarpipe)
6984
end
7085
)
7186
end

0 commit comments

Comments
 (0)