Skip to content

Commit 62b2b6c

Browse files
committed
feat[net]: add broker server and client
1 parent 6dd3e44 commit 62b2b6c

File tree

5 files changed

+244
-0
lines changed

5 files changed

+244
-0
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (C) 2020 The zfoo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
10+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package com.zfoo.net.core.broker;
15+
16+
import com.zfoo.net.NetContext;
17+
import com.zfoo.net.handler.BaseRouteHandler;
18+
import com.zfoo.net.util.SessionUtils;
19+
import io.netty.channel.ChannelHandler;
20+
import io.netty.channel.ChannelHandlerContext;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
/**
25+
* @author jaysunxiao
26+
*/
27+
@ChannelHandler.Sharable
28+
public class BrokerClientRouteHandler extends BaseRouteHandler {
29+
30+
private static final Logger logger = LoggerFactory.getLogger(BrokerClientRouteHandler.class);
31+
32+
@Override
33+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
34+
super.channelActive(ctx);
35+
36+
BrokerTcpClient.brokers.add(ctx.channel());
37+
var session = SessionUtils.getSession(ctx);
38+
NetContext.getRouter().send(session, new BrokerRegisterAsk());
39+
logger.info("broker client activate in sid:[{}]", session.getSid());
40+
}
41+
42+
@Override
43+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
44+
super.channelInactive(ctx);
45+
46+
var session = SessionUtils.getSession(ctx);
47+
BrokerTcpClient.brokers.remove(ctx.channel());
48+
logger.warn("broker client inactivate in sid:[{}]", session.getSid());
49+
}
50+
51+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (C) 2020 The zfoo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
10+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package com.zfoo.net.core.broker;
14+
15+
import com.zfoo.protocol.anno.Protocol;
16+
17+
/**
18+
* @author jaysunxiao
19+
*/
20+
@Protocol(id = 150)
21+
public class BrokerRegisterAsk {
22+
23+
// 不带gpu的服务器
24+
public static final int HOME = 1;
25+
// 带了gpu的服务器
26+
public static final int HOME_GPU = 2;
27+
28+
private int brokerType;
29+
30+
public BrokerRegisterAsk() {
31+
}
32+
33+
public BrokerRegisterAsk(int brokerType) {
34+
this.brokerType = brokerType;
35+
}
36+
37+
public int getBrokerType() {
38+
return brokerType;
39+
}
40+
41+
public void setBrokerType(int brokerType) {
42+
this.brokerType = brokerType;
43+
}
44+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (C) 2020 The zfoo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
10+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package com.zfoo.net.core.broker;
15+
16+
import com.zfoo.net.NetContext;
17+
import com.zfoo.net.handler.BaseRouteHandler;
18+
import com.zfoo.net.packet.DecodedPacketInfo;
19+
import com.zfoo.net.util.SessionUtils;
20+
import io.netty.channel.ChannelHandler;
21+
import io.netty.channel.ChannelHandlerContext;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
/**
26+
* @author jaysunxiao
27+
*/
28+
@ChannelHandler.Sharable
29+
public class BrokerServerRouteHandler extends BaseRouteHandler {
30+
31+
private static final Logger logger = LoggerFactory.getLogger(BrokerServerRouteHandler.class);
32+
33+
@Override
34+
public void channelRead(ChannelHandlerContext ctx, Object msg) {
35+
var session = SessionUtils.getSession(ctx);
36+
if (session == null) {
37+
return;
38+
}
39+
DecodedPacketInfo decodedPacketInfo = (DecodedPacketInfo) msg;
40+
if (decodedPacketInfo.getPacket() instanceof BrokerRegisterAsk) {
41+
BrokerTcpServer.brokers.add(ctx.channel());
42+
return;
43+
}
44+
NetContext.getRouter().receive(session, decodedPacketInfo.getPacket(), decodedPacketInfo.getAttachment());
45+
}
46+
47+
@Override
48+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
49+
super.channelActive(ctx);
50+
logger.info("broker server channel is active {}", SessionUtils.sessionInfo(ctx));
51+
}
52+
53+
@Override
54+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
55+
super.channelInactive(ctx);
56+
BrokerTcpServer.brokers.remove(ctx.channel());
57+
logger.warn("broker server channel is inactive {}", SessionUtils.sessionSimpleInfo(ctx));
58+
}
59+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (C) 2020 The zfoo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
10+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package com.zfoo.net.core.broker;
15+
16+
import com.zfoo.net.core.AbstractClient;
17+
import com.zfoo.net.core.HostAndPort;
18+
import com.zfoo.net.handler.codec.tcp.TcpCodecHandler;
19+
import com.zfoo.net.handler.idle.ClientIdleHandler;
20+
import io.netty.channel.Channel;
21+
import io.netty.channel.socket.SocketChannel;
22+
import io.netty.handler.timeout.IdleStateHandler;
23+
24+
import java.util.concurrent.CopyOnWriteArrayList;
25+
26+
/**
27+
* @author jaysunxiao
28+
*/
29+
public class BrokerTcpClient extends AbstractClient<SocketChannel> {
30+
31+
public static final CopyOnWriteArrayList<Channel> brokers = new CopyOnWriteArrayList<>();
32+
33+
public BrokerTcpClient(HostAndPort host) {
34+
super(host);
35+
}
36+
37+
@Override
38+
protected void initChannel(SocketChannel channel) {
39+
// 可以看出来,这个客户端检测到空闲的时间是60s,相对短一点,这样子就可以发送心跳。
40+
// 服务器端则是180s,相对长一点,一旦检测到空闲,则把客户端踢掉。
41+
channel.pipeline().addLast(new IdleStateHandler(0, 0, 60));
42+
channel.pipeline().addLast(new ClientIdleHandler());
43+
channel.pipeline().addLast(new TcpCodecHandler());
44+
channel.pipeline().addLast(new BrokerClientRouteHandler());
45+
}
46+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (C) 2020 The zfoo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
10+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package com.zfoo.net.core.broker;
15+
16+
import com.zfoo.net.core.AbstractServer;
17+
import com.zfoo.net.core.HostAndPort;
18+
import com.zfoo.net.handler.codec.tcp.TcpCodecHandler;
19+
import com.zfoo.net.handler.idle.ServerIdleHandler;
20+
import io.netty.channel.Channel;
21+
import io.netty.channel.socket.SocketChannel;
22+
import io.netty.handler.timeout.IdleStateHandler;
23+
24+
import java.util.concurrent.CopyOnWriteArrayList;
25+
26+
/**
27+
* @author jaysunxiao
28+
*/
29+
public class BrokerTcpServer extends AbstractServer<SocketChannel> {
30+
31+
public static final CopyOnWriteArrayList<Channel> brokers = new CopyOnWriteArrayList<>();
32+
33+
public BrokerTcpServer(HostAndPort host) {
34+
super(host);
35+
}
36+
37+
@Override
38+
protected void initChannel(SocketChannel channel) throws Exception {
39+
channel.pipeline().addLast(new IdleStateHandler(0, 0, 180));
40+
channel.pipeline().addLast(new ServerIdleHandler());
41+
channel.pipeline().addLast(new TcpCodecHandler());
42+
channel.pipeline().addLast(new BrokerServerRouteHandler());
43+
}
44+
}

0 commit comments

Comments
 (0)