netty学习
过程
server启动 parentGroup 可以简单的理解一个为一个线程池 NioEventLoop可以理解为一个线程,它本身不是一个线程,但是会绑定一个线程 NioEventLoop对指定的port进行连接监听
client启动 执行步骤二 eventLoopGroup和那个parentGroup是一样的
Pipeline是一个双向链表,包含很多的处理器
parentGroup childGroup parentGroup相当于迎宾员,childgroup相当于服务员。parentGroup只是管客户端链接的,childGroup后续所有的服务
核心概念
Channael
管道,其实对Socket的封装
EventLoopGroup
是一个eventloop池,包含很多eventloop。EventLoop本身只是一个线程驱动,在生命周期之内只绑定一个线程
netty为每一个Channnel分配一个EventLoop,用于处理用户连接,对用户请求处理等所有事件Channel和EventLoop的关系是n:1,而EventLoop和线程的关系是1:1,一个EventLoop可以和很多的Channel绑定
serverBootstrap
服务端使用的是ServerBootstrap;客户端是Bootstrap。相当于粘合剂,将各个组件关联起来
ChannelHeader和ChannelPipeline
ChannelHeader是对Channel中数据的处理器,可以是系统本身定义好的编码器也可以是用户定义的。这些处理器会被统一添加到一个ChannelPipeline的对象中,然后按照顺序对Channel中的数据进行一次处理
ChannelFuture
Netty中的所有I/O操作都是异步的Netty中定义了ChannelFuture对象作为异步操作的代言人,表示异步操作本身。如果想获取该异步操作的的返回值,可以通过该异步对象中的addListener()方法为该异步操作添加监听器,为其注册回调,当结果出来之后马上调用执行
回调:当结果出来之后立马启用
代码
依赖
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.63.Final</version>
</dependency>
</dependencies>
IDEA的maven项目的netty包的导入(其他jar同) - CccccDi - 博客园 (cnblogs.com)
IDEA引入Netty包 - 亲爸爸 - 博客园 (cnblogs.com)
demo
服务器端:
package com.lyz.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class SomeServer {
public static void main(String[] args){
// 创建一个group,用于处理客户端连接请求
NioEventLoopGroup parentGroup=new NioEventLoopGroup();
// 创建一个group,用于处理客户端连接上sever之后的后续请求
NioEventLoopGroup childGroup=new NioEventLoopGroup();
try {
//bootstrap用于初始化channel
ServerBootstrap bootstrap=new ServerBootstrap();
//指定两个要使用的group
bootstrap.group(parentGroup,childGroup)
//指定创建的channel的类型
.channel(NioServerSocketChannel.class)
//指定要使用的处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
//初始化channel方法
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//channel一旦创建完毕就会同时绑定一个pipeline
ChannelPipeline pipeline=ch.pipeline();
//添加编码器
pipeline.addLast(new StringEncoder());
//添加解码器
pipeline.addLast(new StringDecoder());
//添加自定义的处理器
pipeline.addLast(new SomeServerHeadler());
}
});
//创建channel,绑定指定的主机(hostName,port)
//sync() 将异步变成同步的
ChannelFuture future =null;
future=bootstrap.bind(8888).sync();//这个方法不执行完毕不往下执行,这就是为啥给他从异步变成同步
System.out.println("服务器8888已经启动");
//当channel被关闭之后,会触发closeFuture()的执行,去完成一些首位工作
future.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
//将两个group进行优雅关闭
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
package com.lyz.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
//自定义服务端处理器,用于处理来自客户端的数据
public class SomeServerHeadler extends ChannelInboundHandlerAdapter {
//一种回调方法:当client将数据写入到channel并发送到server后,server就会触发该方法的执行
/**
* @param ctx 表示当前处理器(其实他就是当前处理器封装的一个节点)
* @param msg client发来的数据
* @throws Exception
* **/
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
//输出client的地址与发送来的数据
System.out.println(ctx.channel().remoteAddress()+","+msg);
//向客户端发送一个随机的uuid
//UUID.randomUUID()
ctx.channel().writeAndFlush("from server"+msg);
TimeUnit.MICROSECONDS.sleep(500);
}
//一旦在服务器端发生异常,就会触发该方法的运行
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
//关闭channel
ctx.close();
}
}
客户端
package com.lyz.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class SomeClient {
public static void main(String[] args) {
NioEventLoopGroup group =new NioEventLoopGroup();
try{
//对比server端使用的是ServerBootstrap
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(group)
//指定要创建的channel的类型
//server指定的是NioServerSocketChannel
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline=ch.pipeline();
pipeline.addLast(new StringEncoder());//pipeline.addLast("可以自定义处理器名,不添加默认为类名",new StringEncoder());
pipeline.addLast(new StringDecoder());
pipeline.addLast(new SomeClientHeadler());
}
});
ChannelFuture future=bootstrap.connect("localhost",8888).sync();
future.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
}
package com.lyz.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.time.LocalDateTime;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
//自定义客户端处理器,处理来自于server的数据
public class SomeClientHeadler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
System.out.println(ctx.channel().remoteAddress()+","+msg);
Scanner in =new Scanner(System.in);
System.out.println("请输入您要发送的信息");
String data=in.nextLine();
ctx.channel().writeAndFlush("from client:"+ data);
TimeUnit.MILLISECONDS.sleep(500);
}
//当channel被激活的时候会触发该方法的执行,该方法指挥执行一次
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.channel().writeAndFlush("send the first data");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
课程设计结果
服务器端
package com.lyz.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.HashMap;
import java.util.Map;
public class SomeServer {
public static Map<String, ChannelHandlerContext> user_list=new HashMap<>();
public static void main(String[] args){
// 创建一个group,用于处理客户端连接请求
NioEventLoopGroup parentGroup=new NioEventLoopGroup();
// 创建一个group,用于处理客户端连接上sever之后的后续请求
NioEventLoopGroup childGroup=new NioEventLoopGroup();
try {
//bootstrap用于初始化channel
ServerBootstrap bootstrap=new ServerBootstrap();
//指定两个要使用的group
bootstrap.group(parentGroup,childGroup)
//指定创建的channel的类型
.channel(NioServerSocketChannel.class)
//指定要使用的处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
//初始化channel方法
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//channel一旦创建完毕就会同时绑定一个pipeline
ChannelPipeline pipeline=ch.pipeline();
//添加编码器
pipeline.addLast(new StringEncoder());
//添加解码器
pipeline.addLast(new StringDecoder());
//添加自定义的处理器
pipeline.addLast(new SomeServerHeadler());
}
});
//创建channel,绑定指定的主机(hostName,port)
//sync() 将异步变成同步的
ChannelFuture future =null;
future=bootstrap.bind(8888).sync();//这个方法不执行完毕不往下执行,这就是为啥给他从异步变成同步
System.out.println("服务器8888已经启动");
//当channel被关闭之后,会触发closeFuture()的执行,去完成一些首位工作
future.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
//将两个group进行优雅关闭
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
package com.lyz.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
//自定义服务端处理器,用于处理来自客户端的数据
public class SomeServerHeadler extends ChannelInboundHandlerAdapter {
//一种回调方法:当client将数据写入到channel并发送到server后,server就会触发该方法的执行
/**
* @param ctx 表示当前处理器(其实他就是当前处理器封装的一个节点)
* @param msg client发来的数据
* @throws Exception
* **/
public static Map<String, ChannelHandlerContext> user_list=new HashMap<>();
public void sendto(ChannelHandlerContext ctx,Object msg)throws Exception{
ctx.channel().writeAndFlush("from server"+msg);
TimeUnit.MICROSECONDS.sleep(500);
}
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
//输出client的地址与发送来的数据
System.out.println(ctx.channel().remoteAddress()+","+msg);
msg=msg.toString().replaceAll("[\\r\\n]","");
//登录
String username="";
if (msg.toString().startsWith("LOGIN:")){
username=msg.toString().split(":")[1];
if (user_list.containsKey(username)){
System.out.println(username+"登录失败");
String data="失败,请重新登录";
sendto(user_list.get(username),data);
}
else{
System.out.println(username+"登录成功");
user_list.put(username,ctx);
String data=username+"登录成功";
sendto(user_list.get(username),data);
}
}
else if (msg.toString().startsWith("SENDTO:")){//SENDTO:1;MSG:HELLO,I AM 1
String sendto_name=msg.toString().split(";")[0].split(":")[1];
String data=msg.toString().split(";")[1].split(":")[1];
System.out.println(sendto_name+"------"+data);
sendto(user_list.get(sendto_name),data);
}
else if (msg.toString().startsWith("SENDALL:")){
Set<String> keys=user_list.keySet();
String data=msg.toString().split(":")[1];
for(String key:keys){
sendto(user_list.get(key),data);
}
}
else if(msg.toString().startsWith("QUERY")){
String data=user_list.toString();
sendto(ctx,data);
}
else if (msg.toString().startsWith("EXIT")){
user_list.remove(username);
String data="退出成功";
sendto(ctx,data);
ctx.close();
}
else{
String data="失败,请重新输入";
sendto(ctx,data);
}
}
//一旦在服务器端发生异常,就会触发该方法的运行
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
//关闭channel
ctx.close();
}
}
客户端
package com.lyz.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class SomeClient {
public static void main(String[] args) {
NioEventLoopGroup group =new NioEventLoopGroup();
try{
//对比server端使用的是ServerBootstrap
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(group)
//指定要创建的channel的类型
//server指定的是NioServerSocketChannel
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline=ch.pipeline();
pipeline.addLast(new StringEncoder());//pipeline.addLast("可以自定义处理器名,不添加默认为类名",new StringEncoder());
pipeline.addLast(new StringDecoder());
pipeline.addLast(new SomeClientHeadler());
}
});
ChannelFuture future=bootstrap.connect("localhost",8888).sync();
future.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
}
package com.lyz.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.time.LocalDateTime;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
//自定义客户端处理器,处理来自于server的数据
public class SomeClientHeadler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
System.out.println(ctx.channel().remoteAddress()+","+msg);
sendto(ctx,msg);
}
//当channel被激活的时候会触发该方法的执行,该方法指挥执行一次
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
Scanner in =new Scanner(System.in);
System.out.println("请输入您的LOGIN:");
String Login=in.nextLine();
ctx.channel().writeAndFlush("LOGIN:"+Login);
}
public void sendto(ChannelHandlerContext ctx,Object msg)throws Exception{
if(msg.toString().equals("登录失败,请重新登录"))
{
System.out.println("请输入您的LOGIN:");
Scanner in =new Scanner(System.in);
String Login=in.nextLine();
ctx.channel().writeAndFlush("LOGIN:"+Login);
}
else if (msg.toString().equals("退出成功")){
ctx.channel().closeFuture().sync();
}
else {
System.out.println("请输入您要发送的信息");
Scanner in =new Scanner(System.in);
String data = in.nextLine();
ctx.channel().writeAndFlush(data);
TimeUnit.MILLISECONDS.sleep(500);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}