-
Notifications
You must be signed in to change notification settings - Fork 3.8k
2019 12 01 websocket与下位机通过netty方式通信传输行为信息
作者:小傅哥
博客:https://bugstack.cn - 原创系列专题
沉淀、分享、成长,让自己和他人都能有所收获!
微信公众号:bugstack虫洞栈 沉淀、分享、成长,专注于原创专题案例,以最易学习编程的方式分享知识,让自己和他人都能有所收获。目前已完成的专题有;Netty4.x实战专题案例、用Java实现JVM、基于JavaAgent的全链路监控、手写RPC框架、架构设计专题案例[Ing]等。
在物联网开发中,常常需要通过网页端来控制设备,包括;获取信息、执行操作、启动停止等,就像我们在手机上会控制家里的小米盒子、路由器、电饭煲或者在线养狗等一些设备一样。在这里所有的下层设备都可以通过socket通信链接到服务端,而用户一端在通过http链接或者websocket链接到服务端,通过发送和接收数据来做出相应的行为操作。如下图;
- 本章节整合Springboot+Netty,通过部署nettySocket与webSocket两套服务端,来接收转发行为消息。
- 客户端采用js链接websocket,用于接收服务端反馈与发送指令,用于获取下位机信息。
- 在test中启动一个模拟下位机,用于反馈信息数据。在真实开发中下位机与服务端通信通常是定义好的字节码,需要自己编写解码器。
- jdk 1.8.0
- IntelliJ IDEA Community Edition 2018.3.1 x64
- Netty 4.1.36.Final
itstack-demo-netty-3-01
└── src
├── main
│ ├── java
│ │ └── org.itstack.demo.ark
│ │ ├── domain
│ │ │ ├── msgobj
│ │ │ │ ├── Feedback.java
│ │ │ │ ├── QueryInfoReq.java
│ │ │ │ └── Text.java
│ │ │ ├── Device.java
│ │ │ ├── EasyResult.java
│ │ │ ├── InfoProtocol.java
│ │ │ └── ServerInfo.java
│ │ ├── server
│ │ │ ├── socket
│ │ │ │ ├── MyChannelInitializer.java
│ │ │ │ ├── MyServerHandler.java
│ │ │ │ └── NettyServer.java
│ │ │ └── websocket
│ │ │ ├── MyWsChannelInitializer.java
│ │ │ ├── MyWsServerHandler.java
│ │ │ └── WsNettyServer.java
│ │ ├── util
│ │ │ ├── CacheUtil.java
│ │ │ ├── MsgUtil.java
│ │ │ └── MsgUtil.java
│ │ ├── web
│ │ │ └── NettyController.java
│ │ └── Application.java
│ └── resources
│ │ └── application.yml
│ └── webapp
│ ├── arkWs
│ │ ├── js
│ │ │ └── ws.js
│ │ └── arkWsControlCenter.html
│ ├── res
│ └── WEB-INF
│ └── index.jsp
│
└── test
└── java
└── org.itstack.demo.test
└── ApiTest.java
演示部分重点代码块,完整代码下载关注公众号;bugstack虫洞栈,回复Netty案例
server/socket/MyServerHandler.java & socket数据处理
- 当有下位机链接服务端时,构建下位机信息,实际使用可以通过注册方式进行链接验证。
- 当收到下位机信息后转发到websocket端,使网页端收到下位机信息反馈
public class MyServerHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(MyServerHandler.class);
/**
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
String channelId = channel.id().toString();
System.out.println("链接报告开始");
System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channelId);
System.out.println("链接报告IP:" + channel.localAddress().getHostString());
System.out.println("链接报告Port:" + channel.localAddress().getPort());
System.out.println("链接报告完毕");
//构建设备信息{下位机、中继器、IO板卡}
Device device = new Device();
device.setChannelId(channelId);
device.setNumber(UUID.randomUUID().toString());
device.setIp(channel.remoteAddress().getHostString());
device.setPort(channel.remoteAddress().getPort());
device.setConnectTime(new Date());
//添加设备信息
deviceGroup.put(channelId, device);
CacheUtil.cacheClientChannel.put(channelId, channel);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object objMsgJsonStr) throws Exception {
//接收设备发来信息
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息内容:" + objMsgJsonStr);
//转发消息到Ws
CacheUtil.wsChannelGroup.writeAndFlush(new TextWebSocketFrame(objMsgJsonStr.toString()));
}
}
server/websocket/MyWsServerHandler.java & websocket数据处理
- websocket数据需要转换后使用,可以支持文本消息,本案例中使用json字符串,方便对象传输
- channelRead转发数据,当收到数据后发送给下位机,主要通过内容中channel控制
public class MyWsServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
...
//ws
if (msg instanceof WebSocketFrame) {
WebSocketFrame webSocketFrame = (WebSocketFrame) msg;
//关闭请求
if (webSocketFrame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) webSocketFrame.retain());
return;
}
//ping请求
if (webSocketFrame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(webSocketFrame.content().retain()));
return;
}
//只支持文本格式,不支持二进制消息
if (!(webSocketFrame instanceof TextWebSocketFrame)) {
throw new Exception("仅支持文本格式");
}
String request = ((TextWebSocketFrame) webSocketFrame).text();
System.out.println("服务端收到:" + request);
InfoProtocol infoProtocol = JSON.parseObject(request, InfoProtocol.class);
//socket转发消息
String channelId = infoProtocol.getChannelId();
Channel channel = CacheUtil.cacheClientChannel.get(channelId);
if (null == channel) return;
channel.writeAndFlush(MsgUtil.buildMsg(infoProtocol));
//websocket消息反馈发送成功
ctx.writeAndFlush(MsgUtil.buildWsMsgText(ctx.channel().id().toString(), "向下位机传达消息success!"));
}
}
}
web/NettyController.java & 控制层方便获取服务端信息
- 控制层提供了查询服务列表、链接设备信息、以及主动触发信息发送
- 另外如果需要将服务端的启动关闭进行手动控制,可以在这里面提供方法供调用
@Controller
public class NettyController {
private Logger logger = LoggerFactory.getLogger(NettyController.class);
@RequestMapping("/index")
public String index() {
return "index";
}
@RequestMapping("/queryNettyServerList")
@ResponseBody
public Collection<ServerInfo> queryNettyServerList() {
try {
Collection<ServerInfo> serverInfos = CacheUtil.serverInfoMap.values();
logger.info("查询服务端列表。{}", JSON.toJSONString(serverInfos));
return serverInfos;
} catch (Exception e) {
logger.info("查询服务端列表失败。", e);
return null;
}
}
@RequestMapping("/queryDeviceList")
@ResponseBody
public Collection<Device> queryDeviceList() {
try {
Collection<Device> deviceInfos = CacheUtil.deviceGroup.values();
logger.info("查询设备链接列表。{}", JSON.toJSONString(deviceInfos));
return deviceInfos;
} catch (Exception e) {
logger.info("查询设备链接列表失败。", e);
return null;
}
}
@RequestMapping("/doSendMsg")
@ResponseBody
public EasyResult doSendMsg(String reqStr) {
try {
logger.info("向设备发送信息[可以通过另外一个Web服务调用本接口发送信息]:{}", reqStr);
InfoProtocol infoProtocol = MsgUtil.getMsg(reqStr);
String channelId = infoProtocol.getChannelId();
Channel channel = CacheUtil.cacheClientChannel.get(channelId);
channel.writeAndFlush(MsgUtil.buildMsg(infoProtocol));
return EasyResult.buildSuccessResult();
} catch (Exception e) {
logger.error("向设备发送信息失败:{}", reqStr, e);
return EasyResult.buildErrResult(e);
}
}
}
Application.java & 启动层
- 通过继承CommandLineRunner方法,在服务就绪后启动socket服务
- 启动后需要循环验证是否启动完成
@SpringBootApplication
@ComponentScan("org.itstack.demo.ark")
public class Application implements CommandLineRunner {
private Logger logger = LoggerFactory.getLogger(Application.class);
@Value("${netty.socket.port}")
private int nettyServerPort;
@Value("${netty.websocket.port}")
private int nettyWsServerPort;
//默认线程池
private static ExecutorService executorService = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public void run(String... args) throws Exception {
//启动NettyServer-socket
logger.info("启动NettyServer服务,启动端口:{}", nettyServerPort);
NettyServer nettyServer = new NettyServer(new InetSocketAddress(nettyServerPort));
Future<Channel> future = executorService.submit(nettyServer);
Channel channel = future.get();
if (null == channel) {
throw new RuntimeException("netty server-s open error channel is null");
}
while (!channel.isActive()) {
logger.info("启动NettyServer服务,循环等待启动...");
Thread.sleep(500);
}
logger.info("启动NettyServer服务,完成:{}", channel.localAddress());
CacheUtil.serverInfoMap.put(nettyServerPort, new ServerInfo("NettySocket", NetUtil.getHost(), nettyServerPort, new Date()));
//启动NettyServer-websocket
logger.info("启动NettyWsServer服务,启动端口:{}", nettyWsServerPort);
WsNettyServer wsNettyServer = new WsNettyServer(new InetSocketAddress(nettyWsServerPort));
Future<Channel> wsFuture = executorService.submit(wsNettyServer);
Channel wsChannel = wsFuture.get();
if (null == wsChannel) {
throw new RuntimeException("netty server-ws open error channel is null");
}
while (!wsChannel.isActive()) {
logger.info("启动NettyWsServer服务,循环等待启动...");
Thread.sleep(500);
}
logger.info("启动NettyWsServer服务,完成:{}", wsChannel.localAddress());
CacheUtil.serverInfoMap.put(nettyServerPort, new ServerInfo("NettyWsSocket", NetUtil.getHost(), nettyServerPort, new Date()));
}
}
webapp/arkWs/js/ws.js & 链接websocket服务端
socket = new WebSocket("ws://localhost:7398/websocket");
socket.onmessage = function(event){
var msg = JSON.parse(event.data);
console.info(msg);
$("#msgBox").val($("#msgBox").val() + event.data + "\r\n");
};
- 分别启动如下内容;
- Application.java,Plugins/spring-boot/spring-boot:run
- ApiTest.java,右键启动模拟下位机
- 打开服务端链接;http://localhost:8080/ http://localhost:8080/arkWs/arkWsControlCenter.html
- 发送模拟信息,观察执行结果;
2019-12-01 15:11:49.965 INFO 7620 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring FrameworkServlet 'dispatcherServlet' 2019-12-01 15:11:49.965 INFO 7620 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization started 2019-12-01 15:11:49.980 INFO 7620 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization completed in 15 ms 2019-12-01 15:11:51.157 INFO 7620 --- [nio-8080-exec-3] o.itstack.demo.ark.web.NettyController : 查询设备链接列表。[{"channelId":"281d1279","connectTime":1575184302964,"ip":"127.0.0.1","number":"74de0967-c0b4-4426-a9d1-183feaff2acf","port":3972}] 2019-12-01 15:11:51.162 INFO 7620 --- [io-8080-exec-10] o.itstack.demo.ark.web.NettyController : 查询服务端列表。[{"ip":"10.13.70.50","openDate":1575184290501,"port":7397,"typeInfo":"NettyWsSocket"}] 2019-12-01 15:11:58.476 INFO 7620 --- [ntLoopGroup-7-1] o.i.d.a.s.websocket.MyWsServerHandler : 链接报告开始 2019-12-01 15:11:58.476 INFO 7620 --- [ntLoopGroup-7-1] o.i.d.a.s.websocket.MyWsServerHandler : 链接报告信息:有一客户端链接到本服务端 2019-12-01 15:11:58.476 INFO 7620 --- [ntLoopGroup-7-1] o.i.d.a.s.websocket.MyWsServerHandler : 链接报告IP:0:0:0:0:0:0:0:1 2019-12-01 15:11:58.476 INFO 7620 --- [ntLoopGroup-7-1] o.i.d.a.s.websocket.MyWsServerHandler : 链接报告Port:7398 2019-12-01 15:11:58.476 INFO 7620 --- [ntLoopGroup-7-1] o.i.d.a.s.websocket.MyWsServerHandler : 链接报告完毕 服务端收到:{"channelId":"281d1279","msgType":2,"msgObj":{"stateType":"1"}} 2019-12-01 15:12:05 接收到消息内容:{"msgObj":{"stateMsg":"温度信息:91.31334894176383°C","stateType":1,"channelId":"93c1120a"},"msgType":3,"channelId":"93c1120a"} 服务端收到:{"channelId":"281d1279","msgType":2,"msgObj":{"stateType":"1"}} 2019-12-01 15:12:05 接收到消息内容:{"msgObj":{"stateMsg":"温度信息:44.83794772946285°C","stateType":1,"channelId":"93c1120a"},"msgType":3,"channelId":"93c1120a"}
- 在使用springboot与netty结合,开发一个简便的服务端还是很方便的,而且在集合一些springcloud的服务,可以使项目工程更加完善。
- 可以尝试做一些设备控制服务,在我们不在家的时候也可以通过一个h5链接控制家里的设备,比如快到家将热水器打开。
- 本案例还偏向于模拟,在实际开发过程中还是会出现很多业务问题需要解决,尤其是服务端与下位机的通信,需要编写编码器与解码器。
微信搜索「bugstack虫洞栈」公众号,关注后回复「netty案例」获取本文源码&更多原创专题案例!
小傅哥(微信:fustack),公众号:bugstack虫洞栈
| bugstack.cn - 沉淀、分享、成长,让自己和他人都能有所收获!
🌏 知识星球:码农会锁
实战项目:「DDD+RPC分布式抽奖系统
」、专属小册、问题解答、简历指导、架构图稿、视频课程
🐲 头条
-
💥
🎁 Lottery 抽奖系统
- 基于领域驱动设计的四层架构的互联网分布式开发实践 -
小傅哥的《重学 Java 设计模式》
- 全书彩印、重绘类图、添加内容 -
⭐小傅哥的《Java 面经手册》
- 全书5章29节,417页11.5万字,完稿&发版 -
小傅哥的《手撸 Spring》
- 通过带着读者手写简化版 Spring 框架,了解 Spring 核心原理 -
🌈小傅哥的《SpringBoot 中间件设计和开发》
- 小册16个中间件开发30个代码库
⛳ 目录
💋 精选
🐾 友链
建立本开源项目的初衷是基于个人学习与工作中对 Java 相关技术栈的总结记录,在这里也希望能帮助一些在学习 Java 过程中遇到问题的小伙伴,如果您需要转载本仓库的一些文章到自己的博客,请按照以下格式注明出处,谢谢合作。
作者:小傅哥
链接:https://bugstack.cn
来源:bugstack虫洞栈
2021年10月24日,小傅哥
的文章全部开源到代码库 CodeGuide
中,与同好同行,一起进步,共同维护。
这里我提供 3 种方式:
-
提出
Issue
:在 Issue 中指出你觉得需要改进/完善的地方(能够独立解决的话,可以在提出 Issue 后再提交PR
)。 -
处理
Issue
: 帮忙处理一些待处理的Issue
。 -
提交
PR
: 对于错别字/笔误这类问题可以直接提交PR
,无需提交Issue
确认。
详细参考:CodeGuide 贡献指南 - 非常感谢你的支持,这里会留下你的足迹
- 加群交流 本群的宗旨是给大家提供一个良好的技术学习交流平台,所以杜绝一切广告!由于微信群人满 100 之后无法加入,请扫描下方二维码先添加作者 “小傅哥” 微信(fustack),备注:加群。
- 公众号(bugstack虫洞栈) - 沉淀、分享、成长,专注于原创专题案例,以最易学习编程的方式分享知识,让自己和他人都能有所收获。
感谢以下人员对本仓库做出的贡献或者对小傅哥的赞赏,当然不仅仅只有这些贡献者,这里就不一一列举了。如果你希望被添加到这个名单中,并且提交过 Issue 或者 PR,请与我联系。