13
13
14
14
package com .zfoo .net .core .proxy .handler ;
15
15
16
+ import com .zfoo .net .NetContext ;
16
17
import com .zfoo .net .core .proxy .TunnelProtocolClient2Server ;
17
18
import com .zfoo .net .core .proxy .TunnelProtocolServer2Client ;
19
+ import com .zfoo .net .core .proxy .TunnelServer ;
18
20
import com .zfoo .net .packet .PacketService ;
21
+ import com .zfoo .net .util .SessionUtils ;
22
+ import com .zfoo .protocol .buffer .ByteBufUtils ;
19
23
import com .zfoo .protocol .util .IOUtils ;
20
24
import com .zfoo .protocol .util .StringUtils ;
21
25
import io .netty .buffer .ByteBuf ;
22
26
import io .netty .channel .ChannelHandlerContext ;
23
27
import io .netty .handler .codec .ByteToMessageCodec ;
24
28
import io .netty .util .ReferenceCountUtil ;
29
+ import org .slf4j .Logger ;
30
+ import org .slf4j .LoggerFactory ;
25
31
26
32
import java .util .List ;
27
33
30
36
*/
31
37
public class TunnelServerCodecHandler extends ByteToMessageCodec <TunnelProtocolServer2Client > {
32
38
39
+ private static final Logger logger = LoggerFactory .getLogger (TunnelServerCodecHandler .class );
40
+
33
41
@ Override
34
42
protected void decode (ChannelHandlerContext ctx , ByteBuf in , List <Object > out ) {
35
43
// 不够读一个int
@@ -50,12 +58,41 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
50
58
return ;
51
59
}
52
60
53
- var retainedByteBuf = in .readRetainedSlice (length );
54
- try {
55
- TunnelProtocolClient2Server .read (ctx .channel (), retainedByteBuf );
56
- } catch (Throwable t ) {
57
- ReferenceCountUtil .release (retainedByteBuf );
61
+ var flag = in .readByte ();
62
+ length -= 1 ;
63
+
64
+ if (flag == TunnelProtocolClient2Server .FLAG_REGISTER ) {
65
+ TunnelServer .tunnels .add (ctx .channel ());
66
+ in .readSlice (length );
67
+ return ;
68
+ }
69
+
70
+ if (flag == TunnelProtocolClient2Server .FLAG_HEARTBEAT ) {
71
+ // in.readSlice(length);
72
+ return ;
73
+ }
74
+
75
+ var beforeReaderIndex = in .readerIndex ();
76
+ var sid = ByteBufUtils .readLong (in );
77
+ var uid = ByteBufUtils .readLong (in );
78
+
79
+ length -= in .readerIndex () - beforeReaderIndex ;
80
+
81
+ var session = NetContext .getSessionManager ().getServerSession (sid );
82
+ if (!SessionUtils .isActive (session )) {
83
+ in .readSlice (length );
84
+ logger .warn ("session:[{}] is not activate" , sid );
85
+ return ;
58
86
}
87
+
88
+ if (!session .getChannel ().isWritable ()) {
89
+ in .readSlice (length );
90
+ logger .warn ("session:[{}] is not writable" , sid );
91
+ return ;
92
+ }
93
+
94
+ var retainedByteBuf = in .readRetainedSlice (length );
95
+ session .getChannel ().writeAndFlush (TunnelProtocolServer2Client .valueOf (sid , uid , retainedByteBuf ));
59
96
}
60
97
61
98
@ Override
0 commit comments