Netty4:一个简单的消息传递的demo(分析和解析)

1.声明

当前内容主要用于本人学习和使用Netty,并记录其中的实际控制流程,和一些技巧

当前demo为:

  1. 客户端向服务器发送消息
  2. 服务器响应消息
  3. 优化其中的解析

2.基本demo

1.服务器端启动类


import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.hy.netty.server.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 
 * @author hy
 * @createTime 2021-04-17 07:38:13
 * @description 通过官方的文件创建一个netty服务器
 *
 */
public class NettyServer {
	private final int port;
	private final EventLoopGroup parentGroup;
	private final EventLoopGroup childGroup;
	private final ServerBootstrap bootstrap;

	public NettyServer(int port) {
		this.port = port;
		this.parentGroup = new NioEventLoopGroup();
		this.childGroup = new NioEventLoopGroup();
		bootstrap = new ServerBootstrap();
	}

	// 启动当前netty服务器的方法
	public void start() throws Exception {

		// 创建nettyServer的处理器
		final NettyServerHandler nettyServerHandler = new NettyServerHandler(this);
		bootstrap.group(parentGroup, childGroup) // 向当前的服务启动中添加组
				.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					public void initChannel(SocketChannel ch) throws Exception {
						ch.pipeline()
						.addLast(nettyServerHandler)
						.addLast(new StringEncoder())
						.addLast(new StringDecoder());
					}
				}).childOption(ChannelOption.SO_KEEPALIVE, true);

		ChannelFuture future = bootstrap.bind(port).sync();

		if (future.isSuccess()) {
			System.out.println("服务端启动成功");
		} else {
			System.out.println("服务端启动失败");
			future.cause().printStackTrace();
		}

		future.channel().closeFuture().sync();// 这里必须要使用,否则又可能报错
	}

	// 关闭当前的Netty服务器
	public void close() {
		parentGroup.shutdownGracefully();
		childGroup.shutdownGracefully();
	}


	public static void main(String[] args) throws Exception {
		int port = 8080;
		new NettyServer(port).start();// 创建一个Netty服务并使用8080端口,然后启动
	}
}

服务器的消息Handler


import com.hy.netty.server.NettyServer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * @description 创建自己的Netty服务处理器,用于处理当前NettyServer中接受的请求的数据
 * @author hy
 * @date 2019-10-08
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter { // (1)

	private final NettyServer nettyServer;

	public NettyServerHandler(NettyServer nettyServer) {
		this.nettyServer = nettyServer;
	}

	// 这里是管道容器中的数据的读取和处理
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
		System.out.println("调用NettyServerHandler当前的channelRead方法!");
		// 通过bytebuf来进行读取数据
		String message = null;
		if (msg instanceof String) {
			message = (String) msg;
		} else if (msg instanceof ByteBuf) {
			ByteBuf in = (ByteBuf) msg;
			message = in.toString(io.netty.util.CharsetUtil.UTF_8);// 这里用于接受消息,使用utf-8编码
		}
		try {

			System.out.println("NettyServer:接收到消息【 " + message + " 】");
			if ("CLIENT CLOSED".equalsIgnoreCase(message)) {
				System.out.println("客户连接已关闭!");
				nettyServer.close();// 关闭当前的服务器
			} else {
				// 当前服务器的操作
				ctx.channel().writeAndFlush("服务器已接收消息!");
			}

		} finally {
			ReferenceCountUtil.release(msg); // 必须要释放
		}
	}

	// 出现异常的时候调用的方法
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		System.out.println("调用NettyServerHandler当前的exceptionCaught方法!");
		cause.printStackTrace();
		ctx.close();
	}
}

2.客户端启动类


import java.util.Scanner;
import com.hy.netty.client.handler.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @description 通过官方文档创建netty的客户端
 * @author hy
 * @date 2019-10-08
 */
public class NettyClient {
	private final String host;// 当前netty客户端连接的主机地址
	private final int port;// 当前netty客户端连接的端口
	private final EventLoopGroup mainGroup;
	private final Bootstrap bootstrap;
	private Channel channel;

	public NettyClient(String host, int port) {
		this.host = host;
		this.port = port;
		this.mainGroup = new NioEventLoopGroup();
		this.bootstrap = new Bootstrap();
	}

	public Channel getChannel() {
		return this.channel;
	}

	// 启动当前netty服务器的操作
	public void start() throws Exception {
		bootstrap
		.group(mainGroup)
		.channel(NioSocketChannel.class)
		.option(ChannelOption.SO_KEEPALIVE, true)
		.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				ch.pipeline()
				.addLast(new NettyClientHandler())
				.addLast(new StringEncoder())
				.addLast(new StringDecoder());
			}
		});

		// 开始连接当前的服务器地址
		ChannelFuture future = bootstrap.connect(host, port).sync(); 
		// 添加监听器,用于判断是否连接成功或者连接失败
		future.addListener(new ChannelFutureListener() {

			public void operationComplete(ChannelFuture future) throws Exception {
				if (future.isSuccess()) {
					System.out.println("连接服务器成功");

				} else {
					System.out.println("连接服务器失败");
					future.cause().printStackTrace();
					mainGroup.shutdownGracefully(); // 关闭线程组
				}

			}
		});
		this.channel = future.channel();
		
	}

	// 创建向服务器发送消息的方法
	public void send(String message) {
		Channel channel = this.getChannel();
		channel.writeAndFlush(message);
	}

	// 客户端的关闭操作
	public void close() {
		send("CLIENT CLOSED");
		mainGroup.shutdownGracefully();// 关闭当前的线程组
	}

	// 当前客户端的操作
	public void clientOption() {
		@SuppressWarnings("resource")
		Scanner clientInput = new Scanner(System.in);
		while (true) {
			System.out.println("请输入发送的字符(exit或者quit退出!):");
			String nextLine = clientInput.nextLine();
			if ("exit".equalsIgnoreCase(nextLine) || "quit".equalsIgnoreCase(nextLine)) {
				break;
			}
			send(nextLine);
		}
		close();// 关闭当前的客户端
	}

	public static void main(String[] args) throws Exception {
		NettyClient client = new NettyClient("localhost", 8080);
		client.start();
		client.send("NettyClient:你好我是客户端,请求连接!");// 这里可以使用
		client.clientOption();
	}

}

客户端消息Handler类

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
 * 
 * @author hy
 * @createTime 2021-04-17 07:59:25
 * @description 创建自己的服务处理器
 *
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter { 

	// 每次有数据的时候会被调用
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
    	System.out.println("NettyClientHandler:调用当前的channelRead方法!");
    	 ByteBuf in = (ByteBuf) msg;
        try {
			System.out.println("开始读取服务器发送的数据!");
			String researchMessage = in.toString(io.netty.util.CharsetUtil.UTF_8);
			System.out.println("NettyClient:接收数据【 "+researchMessage+" 】");//这里用于接受消息
		} finally {
			ReferenceCountUtil.release(msg);//(3)这里必须要释放
		}
    }
    
    // 第一次连接上服务器的时候被调用
    @Override
    public void channelActive(final ChannelHandlerContext ctx) { 
    	System.out.println("NettyClientHandler:调用当前的channelActive方法!");
    }
    
    //出现异常的时候,直接关闭这个连接
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
    	System.out.println("NettyClientHandler:调用当前的exceptionCaught方法!");
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

3.启动和测试

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

测试成功!
发现问题,其中传递的是ByteBuf,但是我使用writeAndFlush是传递的为String类型

4.查看和解析当前的Handler以及解码和加码器

1.查看当前的StringEncoder和StringDeocder·
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
也就是说当前的StringEncoder和StringDecoder就是入站解码器和出站加码器

整理当前的pipline中的handler形成流程控制图

在这里插入图片描述

所以

  1. 当前的从客户端写出的消息,直接经过StringEncoder变成ByteBuf,最后直接传递到了服务器端的NettyServerHandler进行处理(所以服务器接收的不是String而是ByteBuf)
  2. 服务器端写出的消息也是直接通过StringEncoder直接变成ByteBuf传递给客户端NettyClientHandler所以接收的也是ByteBuf

思考,直接将StringDecoder这个入站解析放在NettyHandler的前面不就行了,就可以直接得到String类型的数据,不用解码了!

在这里插入图片描述

5.修改demo

1.修改当前的服务器中的NettyServerHandler
在这里插入图片描述
2.修改当前服务器中的NettyServer中的pipline添加顺序
在这里插入图片描述

bootstrap.group(parentGroup, childGroup) // 向当前的服务启动中添加组
				.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					public void initChannel(SocketChannel ch) throws Exception {
						/*ch.pipeline()
						.addLast(nettyServerHandler)
						.addLast(new StringEncoder())
						.addLast(new StringDecoder());*/
						ch.pipeline()										
						.addLast(new StringDecoder())
						.addLast(nettyServerHandler)
						.addLast(new StringEncoder());
					}
				}).childOption(ChannelOption.SO_KEEPALIVE, true);

3.修改客户端的NettyClientHandler

在这里插入图片描述

4.修改客户端的NettyClient的pipline方式
在这里插入图片描述

.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				/*
				ch.pipeline()
				.addLast(new NettyClientHandler())
				.addLast(new StringEncoder())
				.addLast(new StringDecoder());*/
				ch.pipeline()	
				.addLast(new StringDecoder())
				.addLast(new NettyClientHandler())
				.addLast(new StringEncoder());	
			}
		});

此时修改完毕

6.重新测试

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
测试成功!

7.总结

1.使用netty的时候小心当前的出站和入站规则处理类的放置顺序,使用不当可能导致不同的效果

热门文章

暂无图片
编程学习 ·

exe4j详细使用教程(附下载安装链接)

一、exe4j介绍 ​ exe4j是一个帮助你集成Java应用程序到Windows操作环境的java可执行文件生成工具&#xff0c;无论这些应用是用于服务器&#xff0c;还是图形用户界面&#xff08;GUI&#xff09;或命令行的应用程序。如果你想在任务管理器中及Windows XP分组的用户友好任务栏…
暂无图片
编程学习 ·

AUTOSAR从入门到精通100讲(126)-浅谈车载充电系统通信方案

01 引言 本文深入研究车载充电系统策略,设计出一套基于电动汽车电池管理系统与车载充电机的CAN通信协议,可供电动汽车设计人员参考借鉴。 02 电动汽车充电系统通讯网络 电动汽车整车控制系统中采用的是CAN总线通信方式,由一个整车内部高速CAN网络、内部低速CAN网络和一个充电…
暂无图片
编程学习 ·

CMake(九):生成器表达式

当运行CMake时&#xff0c;开发人员倾向于认为它是一个简单的步骤&#xff0c;需要读取项目的CMakeLists.txt文件&#xff0c;并生成相关的特定于生成器的项目文件集(例如Visual Studio解决方案和项目文件&#xff0c;Xcode项目&#xff0c;Unix Makefiles或Ninja输入文件)。然…
暂无图片
编程学习 ·

47.第十章 网络协议和管理配置 -- 网络配置(八)

4.3.3 route 命令 路由表管理命令 路由表主要构成: Destination: 目标网络ID,表示可以到达的目标网络ID,0.0.0.0/0 表示所有未知网络,又称为默认路由,优先级最低Genmask:目标网络对应的netmaskIface: 到达对应网络,应该从当前主机哪个网卡发送出来Gateway: 到达非直连的网络,…
暂无图片
编程学习 ·

元宇宙技术基础

请看图&#xff1a; 1、通过AR、VR等交互技术提升游戏的沉浸感 回顾游戏的发展历程&#xff0c;沉浸感的提升一直是技术突破的主要方向。从《愤怒的小鸟》到CSGO,游戏建模方式从2D到3D的提升使游戏中的物体呈现立体感。玩家在游戏中可以只有切换视角&#xff0c;进而提升沉浸…
暂无图片
编程学习 ·

flink的伪分布式搭建

一 flink的伪分布式搭建 1.1 执行架构图 1.Flink程序需要提交给 Job Client2.Job Client将作业提交给 Job Manager3.Job Manager负责协调资源分配和作业执行。 资源分配完成后&#xff0c;任务将提交给相应的 Task Manage。4.Task Manager启动一个线程以开始执行。Task Manage…
暂无图片
编程学习 ·

十进制正整数与二进制字符串的转换(C++)

Function one&#xff1a; //十进制数字转成二进制字符串 string Binary(int x) {string s "";while(x){if(x % 2 0) s 0 s;else s 1 s;x / 2;}return s; } Function two&#xff1a; //二进制字符串变为十进制数字 int Decimal(string s) {int num 0, …
暂无图片
编程学习 ·

[含lw+源码等]微信小程序校园辩论管理平台+后台管理系统[包运行成功]Java毕业设计计算机毕设

项目功能简介: 《微信小程序校园辩论管理平台后台管理系统》该项目含有源码、论文等资料、配套开发软件、软件安装教程、项目发布教程等 本系统包含微信小程序做的辩论管理前台和Java做的后台管理系统&#xff1a; 微信小程序——辩论管理前台涉及技术&#xff1a;WXML 和 WXS…
暂无图片
编程学习 ·

树莓派驱动DHT11温湿度传感器

1&#xff0c;直接使用python库 代码如下 import RPi.GPIO as GPIO import dht11 import time import datetimeGPIO.setwarnings(True) GPIO.setmode(GPIO.BCM)instance dht11.DHT11(pin14)try:while True:result instance.read()if result.is_valid():print(ok)print(&quo…
暂无图片
编程学习 ·

ELK简介

ELK简介 ELK是三个开源软件的缩写&#xff0c;Elasticsearch、Logstash、Kibana。它们都是开源软件。不过现在还新增了一个 Beats&#xff0c;它是一个轻量级的日志收集处理工具(Agent)&#xff0c;Beats 占用资源少&#xff0c;适合于在各个服务器上搜集日志后传输给 Logstas…
暂无图片
编程学习 ·

Linux 基础

通常大数据框架都部署在 Linux 服务器上&#xff0c;所以需要具备一定的 Linux 知识。Linux 书籍当中比较著名的是 《鸟哥私房菜》系列&#xff0c;这个系列很全面也很经典。但如果你希望能够快速地入门&#xff0c;这里推荐《Linux 就该这么学》&#xff0c;其网站上有免费的电…
暂无图片
编程学习 ·

Windows2022 无线网卡装不上驱动

想来 Windows2022 和 windows10/11 的驱动应该差不多通用的&#xff0c;但是死活装不上呢&#xff1f; 搜一下&#xff0c;有人提到 “默认安装时‘无线LAN服务’是关闭的&#xff0c;如果需要开启&#xff0c;只需要在“添加角色和功能”中&#xff0c;选择开启“无线LAN服务…
暂无图片
编程学习 ·

【嵌入式面试宝典】版本控制工具Git常用命令总结

目录 创建仓库 查看信息 版本回退 版本检出 远程库 Git 创建仓库 git initgit add <file> 可反复多次使用&#xff0c;添加多个文件git commit -m <message> 查看信息 git status 仓库当前的状态git diff 差异对比git log 历史记录&#xff0c;提交日志--pret…
暂无图片
编程学习 ·

用Postman生成测试报告

newman newman是一款基于nodejs开发的可以运行postman脚本的工具&#xff0c;使用Newman&#xff0c;可以直接从命令运行和测试postman集合。 安装nodejs 下载地址&#xff1a;https://nodejs.org/en/download/ 选择自己系统相对应的版本内容进行下载&#xff0c;然后傻瓜式安…
暂无图片
编程学习 ·

Java面向对象之多态、向上转型和向下转型

文章目录前言一、多态二、引用类型之间的转换Ⅰ.向上转型Ⅱ.向下转型总结前言 今天继续Java面向对象的学习&#xff0c;学习面向对象的第三大特征&#xff1a;多态&#xff0c;了解多态的意义&#xff0c;以及两种引用类型之间的转换&#xff1a;向上转型、向下转型。  希望能…