netty学习


netty学习

过程

执行流程示意图

server启动 parentGroup 可以简单的理解一个为一个线程池 NioEventLoop可以理解为一个线程,它本身不是一个线程,但是会绑定一个线程 NioEventLoop对指定的port进行连接监听

client启动 执行步骤二 eventLoopGroup和那个parentGroup是一样的

Pipeline是一个双向链表,包含很多的处理器

parentGroup childGroup parentGroup相当于迎宾员,childgroup相当于服务员。parentGroup只是管客户端链接的,childGroup后续所有的服务

核心概念

Channael

管道,其实对Socket的封装

EventLoopGroup

是一个eventloop池,包含很多eventloop。EventLoop本身只是一个线程驱动,在生命周期之内只绑定一个线程

netty为每一个Channnel分配一个EventLoop,用于处理用户连接,对用户请求处理等所有事件Channel和EventLoop的关系是n:1,而EventLoop和线程的关系是1:1,一个EventLoop可以和很多的Channel绑定

serverBootstrap

服务端使用的是ServerBootstrap;客户端是Bootstrap。相当于粘合剂,将各个组件关联起来

ChannelHeader和ChannelPipeline

ChannelHeader是对Channel中数据的处理器,可以是系统本身定义好的编码器也可以是用户定义的。这些处理器会被统一添加到一个ChannelPipeline的对象中,然后按照顺序对Channel中的数据进行一次处理

ChannelFuture

Netty中的所有I/O操作都是异步的Netty中定义了ChannelFuture对象作为异步操作的代言人,表示异步操作本身。如果想获取该异步操作的的返回值,可以通过该异步对象中的addListener()方法为该异步操作添加监听器,为其注册回调,当结果出来之后马上调用执行

回调:当结果出来之后立马启用

代码

依赖

<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.63.Final</version>
    </dependency>
</dependencies>
         

IDEA的maven项目的netty包的导入(其他jar同) - CccccDi - 博客园 (cnblogs.com)

IDEA引入Netty包 - 亲爸爸 - 博客园 (cnblogs.com)

demo

服务器端:

package com.lyz.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SomeServer {
    public static void main(String[] args){
        // 创建一个group,用于处理客户端连接请求
        NioEventLoopGroup parentGroup=new NioEventLoopGroup();
        // 创建一个group,用于处理客户端连接上sever之后的后续请求
        NioEventLoopGroup childGroup=new NioEventLoopGroup();
        try {
            //bootstrap用于初始化channel
            ServerBootstrap bootstrap=new  ServerBootstrap();
            //指定两个要使用的group
            bootstrap.group(parentGroup,childGroup)
                    //指定创建的channel的类型
                    .channel(NioServerSocketChannel.class)
                    //指定要使用的处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        //初始化channel方法
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //channel一旦创建完毕就会同时绑定一个pipeline
                            ChannelPipeline pipeline=ch.pipeline();
                            //添加编码器
                            pipeline.addLast(new StringEncoder());
                            //添加解码器
                            pipeline.addLast(new StringDecoder());
                            //添加自定义的处理器
                            pipeline.addLast(new SomeServerHeadler());
                        }
                    });
            //创建channel,绑定指定的主机(hostName,port)
            //sync() 将异步变成同步的
            ChannelFuture future =null;
            future=bootstrap.bind(8888).sync();//这个方法不执行完毕不往下执行,这就是为啥给他从异步变成同步
            System.out.println("服务器8888已经启动");
            //当channel被关闭之后,会触发closeFuture()的执行,去完成一些首位工作
            future.channel().closeFuture().sync();
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            //将两个group进行优雅关闭
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
package com.lyz.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

//自定义服务端处理器,用于处理来自客户端的数据
public class SomeServerHeadler extends ChannelInboundHandlerAdapter {
    //一种回调方法:当client将数据写入到channel并发送到server后,server就会触发该方法的执行
    /**
     * @param ctx 表示当前处理器(其实他就是当前处理器封装的一个节点)
     * @param msg client发来的数据
     * @throws Exception
     * **/
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
        //输出client的地址与发送来的数据
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        //向客户端发送一个随机的uuid
        //UUID.randomUUID()
        ctx.channel().writeAndFlush("from server"+msg);
        TimeUnit.MICROSECONDS.sleep(500);
    }
    //一旦在服务器端发生异常,就会触发该方法的运行
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        //关闭channel
        ctx.close();
    }
}

客户端

package com.lyz.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SomeClient {
    public static void main(String[] args) {
        NioEventLoopGroup group =new  NioEventLoopGroup();
        try{
            //对比server端使用的是ServerBootstrap
            Bootstrap bootstrap=new Bootstrap();
            bootstrap.group(group)
                    //指定要创建的channel的类型
                    //server指定的是NioServerSocketChannel
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline=ch.pipeline();
                            pipeline.addLast(new StringEncoder());//pipeline.addLast("可以自定义处理器名,不添加默认为类名",new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new SomeClientHeadler());
                        }
                    });
            ChannelFuture future=bootstrap.connect("localhost",8888).sync();
            future.channel().closeFuture().sync();
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }
}
package com.lyz.client;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.time.LocalDateTime;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;


//自定义客户端处理器,处理来自于server的数据
public class SomeClientHeadler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        Scanner in =new Scanner(System.in);
        System.out.println("请输入您要发送的信息");
        String data=in.nextLine();
        ctx.channel().writeAndFlush("from client:"+ data);
        TimeUnit.MILLISECONDS.sleep(500);
    }

    //当channel被激活的时候会触发该方法的执行,该方法指挥执行一次
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ctx.channel().writeAndFlush("send the first data");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

课程设计结果

服务器端

package com.lyz.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.HashMap;
import java.util.Map;

public class SomeServer {
    public static Map<String, ChannelHandlerContext> user_list=new HashMap<>();

    public static void main(String[] args){
        // 创建一个group,用于处理客户端连接请求
        NioEventLoopGroup parentGroup=new NioEventLoopGroup();
        // 创建一个group,用于处理客户端连接上sever之后的后续请求
        NioEventLoopGroup childGroup=new NioEventLoopGroup();
        try {
            //bootstrap用于初始化channel
            ServerBootstrap bootstrap=new  ServerBootstrap();
            //指定两个要使用的group
            bootstrap.group(parentGroup,childGroup)
                    //指定创建的channel的类型
                    .channel(NioServerSocketChannel.class)
                    //指定要使用的处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        //初始化channel方法
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //channel一旦创建完毕就会同时绑定一个pipeline
                            ChannelPipeline pipeline=ch.pipeline();
                            //添加编码器
                            pipeline.addLast(new StringEncoder());
                            //添加解码器
                            pipeline.addLast(new StringDecoder());
                            //添加自定义的处理器
                            pipeline.addLast(new SomeServerHeadler());
                        }
                    });
            //创建channel,绑定指定的主机(hostName,port)
            //sync() 将异步变成同步的
            ChannelFuture future =null;
            future=bootstrap.bind(8888).sync();//这个方法不执行完毕不往下执行,这就是为啥给他从异步变成同步
            System.out.println("服务器8888已经启动");
            //当channel被关闭之后,会触发closeFuture()的执行,去完成一些首位工作
            future.channel().closeFuture().sync();
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            //将两个group进行优雅关闭
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
package com.lyz.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

//自定义服务端处理器,用于处理来自客户端的数据
public class SomeServerHeadler extends ChannelInboundHandlerAdapter {
    //一种回调方法:当client将数据写入到channel并发送到server后,server就会触发该方法的执行
    /**
     * @param ctx 表示当前处理器(其实他就是当前处理器封装的一个节点)
     * @param msg client发来的数据
     * @throws Exception
     * **/
    public static Map<String, ChannelHandlerContext> user_list=new HashMap<>();
    public void sendto(ChannelHandlerContext ctx,Object msg)throws Exception{
        ctx.channel().writeAndFlush("from server"+msg);
        TimeUnit.MICROSECONDS.sleep(500);
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
        //输出client的地址与发送来的数据
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        msg=msg.toString().replaceAll("[\\r\\n]","");
        //登录
        String username="";
        if (msg.toString().startsWith("LOGIN:")){
            username=msg.toString().split(":")[1];
            if (user_list.containsKey(username)){
                System.out.println(username+"登录失败");
                String data="失败,请重新登录";
                sendto(user_list.get(username),data);
            }
            else{
                System.out.println(username+"登录成功");
                user_list.put(username,ctx);
                String data=username+"登录成功";
                sendto(user_list.get(username),data);
            }
        }
        else if (msg.toString().startsWith("SENDTO:")){//SENDTO:1;MSG:HELLO,I AM 1
            String sendto_name=msg.toString().split(";")[0].split(":")[1];
            String data=msg.toString().split(";")[1].split(":")[1];
            System.out.println(sendto_name+"------"+data);
            sendto(user_list.get(sendto_name),data);
        }
        else if (msg.toString().startsWith("SENDALL:")){
            Set<String> keys=user_list.keySet();
            String data=msg.toString().split(":")[1];
            for(String key:keys){
                sendto(user_list.get(key),data);
            }
        }
        else if(msg.toString().startsWith("QUERY")){
            String data=user_list.toString();
            sendto(ctx,data);
        }
        else if (msg.toString().startsWith("EXIT")){
            user_list.remove(username);
            String data="退出成功";
            sendto(ctx,data);
            ctx.close();
        }
        else{
            String data="失败,请重新输入";
            sendto(ctx,data);
        }
    }
    //一旦在服务器端发生异常,就会触发该方法的运行
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        //关闭channel
        ctx.close();
    }


}

客户端

package com.lyz.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SomeClient {
    public static void main(String[] args) {
        NioEventLoopGroup group =new  NioEventLoopGroup();
        try{
            //对比server端使用的是ServerBootstrap
            Bootstrap bootstrap=new Bootstrap();
            bootstrap.group(group)
                    //指定要创建的channel的类型
                    //server指定的是NioServerSocketChannel
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline=ch.pipeline();
                            pipeline.addLast(new StringEncoder());//pipeline.addLast("可以自定义处理器名,不添加默认为类名",new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new SomeClientHeadler());
                        }
                    });
            ChannelFuture future=bootstrap.connect("localhost",8888).sync();
            future.channel().closeFuture().sync();
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }
}
package com.lyz.client;


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.time.LocalDateTime;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;


//自定义客户端处理器,处理来自于server的数据
public class SomeClientHeadler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
        System.out.println(ctx.channel().remoteAddress()+","+msg);
        sendto(ctx,msg);
    }

    //当channel被激活的时候会触发该方法的执行,该方法指挥执行一次
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        Scanner in =new Scanner(System.in);
        System.out.println("请输入您的LOGIN:");
        String Login=in.nextLine();
        ctx.channel().writeAndFlush("LOGIN:"+Login);
    }
    
    public void sendto(ChannelHandlerContext ctx,Object msg)throws Exception{
        if(msg.toString().equals("登录失败,请重新登录"))
        {
            System.out.println("请输入您的LOGIN:");
            Scanner in =new Scanner(System.in);
            String Login=in.nextLine();
            ctx.channel().writeAndFlush("LOGIN:"+Login);
        }
        else if (msg.toString().equals("退出成功")){
            ctx.channel().closeFuture().sync();
        }
        else {
            System.out.println("请输入您要发送的信息");
            Scanner in =new Scanner(System.in);
            String data = in.nextLine();
            ctx.channel().writeAndFlush(data);
            TimeUnit.MILLISECONDS.sleep(500);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}


文章作者: 毛豆不逗比
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 毛豆不逗比 !
  目录
{% include '_third-party/exturl.swig' %} {% include '_third-party/bookmark.swig' %} {% include '_third-party/copy-code.swig' %} + {% include '_custom/custom.swig' %}