netty 教程_netty Socket异步用法详解

更新时间:2018-09-13    来源:js教程    手机版     字体:

【www.bbyears.com--js教程】

按照《Unix网络编程》的划分,IO模型可以分为:阻塞IO、非阻塞IO、IO复用、信号驱动IO和异步IO。按照POSIX标准来划分只分为两类:同步IO和异步IO。首先一个IO操作其实分成了两个步骤:发起IO请求和实际的IO操作,同步IO和异步IO的区别就在于第二个步骤是否阻塞,如果实际的IO读写阻塞请求进程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO服用、信号驱动IO都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO。阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO,如果不阻塞,那么就是非阻塞IO。

Netty是一个异步,事件驱动的网络编程框架和工具,使用Netty可以快速开发出可维护的,高性能、高扩展能力的协议服务及其客户端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

以Netty文档中提供的时间服务为例,当一个客户端连接上来的时候,会发送当前时间给它。无视客户端发过来的消息

服务端实现

首页定义一个UnixTime类,Unix格式的时间

 代码如下 复制代码

package com.netty.timeserver;
 
import java.util.Date;
 
public class UnixTime {
 private final int value;
 
 public UnixTime(int value) {
  this.value = value;
 }
 
 public int getValue() {
  return value;
 }
 
 @Override
 public String toString() {
  return new Date(value * 1000L).toString();
 }
}

服务器端处理消息类,继承自SimpleChannelHandler ,可以实现多个不同的

 代码如下 复制代码

package com.netty.timeserver;
 
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
 
public class TimeServerHandler extends SimpleChannelHandler {
 
 // 有连接进来的时候,发送当前时间
 @Override
 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
  System.out.println("Client from: " + e.getChannel().getRemoteAddress());
  UnixTime time = new UnixTime((int) (System.currentTimeMillis() / 1000));
  e.getChannel().write(time);
  ChannelFuture f = e.getChannel().write(time);
  f.addListener(ChannelFutureListener.CLOSE);
 }
 
 // 保存所有连接进来的Channel,服务关闭的时候需要处理
 @Override
 public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
  TimeServer.allChannels.add(e.getChannel());
 }
 
 // 异常时触发
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
  e.getCause().printStackTrace();
  e.getChannel().close();
 }
}

编码POJO对象UnixTime用于发送

 代码如下 复制代码


package com.netty.timeserver;
 
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
 
public class TimeEncoder extends SimpleChannelHandler {
 @Override
 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
  UnixTime time = (UnixTime) e.getMessage();
 
  ChannelBuffer buf = ChannelBuffers.buffer(4);
  buf.writeInt(time.getValue());
 
  Channels.write(ctx, e.getFuture(), buf);
 }
}

启动服务端程序

 代码如下 复制代码

package com.netty.timeserver;
 
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
 
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 
public class TimeServer {
 static final ChannelGroup allChannels = new DefaultChannelGroup("time-server");
 
 public static void main(String[] args) {
  ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
  ServerBootstrap bootstrap = new ServerBootstrap(factory);
  bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
   public ChannelPipeline getPipeline() {
    return Channels.pipeline(new TimeEncoder(), new TimeServerHandler());
   }
  });
  bootstrap.setOption("child.tcpNoDelay", true);
  bootstrap.setOption("child.keepAlive", true);
  bootstrap.setOption("reuseAddress", true);
 
  Channel channel = bootstrap.bind(new InetSocketAddress(9000));
  allChannels.add(channel);
  // 关闭服务的通知
  // waitForShutdownCommand();
  ChannelGroupFuture future = allChannels.close();
  future.awaitUninterruptibly();
  factory.releaseExternalResources();
 }
}

客户端实现

客户端的消息处理方法,同样继续自SimpleChannelHandler

 代码如下 复制代码

package com.netty.timeserver;
 
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
 
public class TimeClientHandler extends SimpleChannelHandler {
 
 @Override
 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
  UnixTime m = (UnixTime) e.getMessage();
  System.out.println("RECEIVED: " + m);
  e.getChannel().close();
 }
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
  e.getCause().printStackTrace();
  e.getChannel().close();
 }
}

客户端解析服务端发来的时间

 代码如下 复制代码

package com.netty.timeserver;
 
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
 
public class TimeDecoder extends FrameDecoder {
 
 @Override
 protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
  if (buffer.readableBytes() < 4) {
   return null;
  }
  return new UnixTime(buffer.readInt());
 }
}

客户端调用

 代码如下 复制代码 package com.netty.timeserver;
 
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
 
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
public class TimeClient {
 
 public static void main(String[] args) {
  String host = args[0];
  int port = Integer.parseInt(args[1]);
  ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
  ClientBootstrap bootstrap = new ClientBootstrap(factory);
  bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
   public ChannelPipeline getPipeline() {
    return Channels.pipeline(new TimeDecoder(), new TimeClientHandler());
   }
  });
 
  bootstrap.setOption("tcpNoDelay", true);
  bootstrap.setOption("keepAlive", true);
 
  ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
  future.awaitUninterruptibly();
  if (!future.isSuccess()) {
   future.getCause().printStackTrace();
  }
  Channel channel = future.getChannel();
 
  channel.getCloseFuture().awaitUninterruptibly();
  factory.releaseExternalResources();
 }
}

Mina出身于开源界的大牛apache组织,Netty出身于商业开源大亨jboss,而Grizzly则出身于土鳖曾经的Sun公司。从设计的理念上来看,MINA的设计理念是最为优雅的。当然,由于Netty的主导作者与Mina的主导作者是同一人,出自同一人之手的Netty在设计理念上与Mina基本上是一致的。而Grizzly在设计理念上就较差了点,几乎是Java NIO的简单封装。但是Mina项目主页的访问速度剧慢,所以我选了Netty。

本文来源:http://www.bbyears.com/wangyezhizuo/44303.html