Pārlūkot izejas kodu

init add xipeng

九月 6 gadi atpakaļ
revīzija
5480cca84f
23 mainītis faili ar 3462 papildinājumiem un 0 dzēšanām
  1. 83 0
      pom.xml
  2. 12 0
      src/main/java/com/mogo/xts/XtsApplication.java
  3. 16 0
      src/main/java/com/mogo/xts/controller/SocketController.java
  4. 36 0
      src/main/java/com/mogo/xts/enums/ActionEnum.java
  5. 72 0
      src/main/java/com/mogo/xts/netty/client/DefaultFuture.java
  6. 99 0
      src/main/java/com/mogo/xts/netty/client/NettyClient.java
  7. 57 0
      src/main/java/com/mogo/xts/netty/client/SocketManager.java
  8. 40 0
      src/main/java/com/mogo/xts/netty/client/XtsCoreClientHandler.java
  9. 13 0
      src/main/java/com/mogo/xts/netty/idle/ChannelHandlerHolder.java
  10. 76 0
      src/main/java/com/mogo/xts/netty/idle/HeartBeatClientHandler.java
  11. 49 0
      src/main/java/com/mogo/xts/netty/idle/HeartBeatServerHandler.java
  12. 87 0
      src/main/java/com/mogo/xts/netty/idle/NettyConnectionWatchDog.java
  13. 858 0
      src/main/java/com/mogo/xts/netty/protobuf/ChannelRequestProto.java
  14. 1023 0
      src/main/java/com/mogo/xts/netty/protobuf/ChannelResponseProto.java
  15. 71 0
      src/main/java/com/mogo/xts/netty/server/NettyServer.java
  16. 51 0
      src/main/java/com/mogo/xts/netty/server/XtsCoreServerHandler.java
  17. 30 0
      src/main/java/com/mogo/xts/utils/KidUtils.java
  18. 24 0
      src/main/java/com/mogo/xts/utils/SocketUtils.java
  19. 616 0
      src/main/java/com/mogo/xts/utils/TimeUtil.java
  20. 0 0
      src/main/resources/application.properties
  21. 27 0
      src/test/java/com/mogo/nio/BufferTest.java
  22. 91 0
      src/test/java/com/mogo/nio/NioServer.java
  23. 31 0
      src/test/java/com/mogo/xts/XtsApplicationTests.java

+ 83 - 0
pom.xml

@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>com.mogo</groupId>
+	<artifactId>xts</artifactId>
+	<version>0.0.1-SNAPSHOT</version>
+	<packaging>jar</packaging>
+
+	<name>xts</name>
+	<url>https://gitee.com/huxipeng/xts.git</url>
+
+	<parent>
+		<groupId>org.springframework.boot</groupId>
+		<artifactId>spring-boot-starter-parent</artifactId>
+		<version>2.0.6.RELEASE</version>
+		<relativePath/> <!-- lookup parent from repository -->
+	</parent>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+		<java.version>1.8</java.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-amqp</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-logging</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-data-redis</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-web</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.projectlombok</groupId>
+			<artifactId>lombok</artifactId>
+			<version>1.18.2</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-lang3</artifactId>
+			<version>3.8.1</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.protobuf</groupId>
+			<artifactId>protobuf-java</artifactId>
+			<version>3.6.1</version>
+		</dependency>
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+			<version>2.9.9</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.springframework.boot</groupId>
+				<artifactId>spring-boot-maven-plugin</artifactId>
+			</plugin>
+		</plugins>
+	</build>
+
+
+</project>

+ 12 - 0
src/main/java/com/mogo/xts/XtsApplication.java

@@ -0,0 +1,12 @@
+package com.mogo.xts;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class XtsApplication {
+
+	public static void main(String[] args) {
+		SpringApplication.run(XtsApplication.class, args);
+	}
+}

+ 16 - 0
src/main/java/com/mogo/xts/controller/SocketController.java

@@ -0,0 +1,16 @@
+package com.mogo.xts.controller;
+
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * @author Xipeng
+ **/
+@RestController
+public class SocketController {
+
+    @GetMapping("/testSocket")
+    public void testSocket() throws Exception{
+
+    }
+}

+ 36 - 0
src/main/java/com/mogo/xts/enums/ActionEnum.java

@@ -0,0 +1,36 @@
+package com.mogo.xts.enums;
+
+/**
+ * @author Xipeng
+ **/
+public enum ActionEnum {
+
+    HEART_BEAT("HEART_BEAT", "心跳包"),
+    TEST("TEST", "测试"),
+    ;
+
+    private String action;
+
+    private String desc;
+
+    ActionEnum(String action, String desc) {
+        this.action = action;
+        this.desc = desc;
+    }
+
+    public String getAction() {
+        return action;
+    }
+
+    public void setAction(String action) {
+        this.action = action;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+
+    public void setDesc(String desc) {
+        this.desc = desc;
+    }
+}

+ 72 - 0
src/main/java/com/mogo/xts/netty/client/DefaultFuture.java

@@ -0,0 +1,72 @@
+package com.mogo.xts.netty.client;
+
+import com.mogo.xts.netty.protobuf.ChannelResponseProto;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * @author Xipeng
+ **/
+public class DefaultFuture {
+
+    private static final Map<String, DefaultFuture> defaultFutures = new ConcurrentHashMap<String, DefaultFuture>();
+
+    public Lock lock = new ReentrantLock();
+
+    private Condition isEmpty = lock.newCondition();
+
+    private volatile boolean isDone = false;
+
+    private ChannelResponseProto.ChannelResponse response;
+
+    private static final int delay = 10;
+
+    public DefaultFuture(String requestId) {
+        defaultFutures.put(requestId, this);
+    }
+
+    public ChannelResponseProto.ChannelResponse get() throws Exception {
+
+        lock.lock();
+
+        try {
+            while (!isDone) {
+                isEmpty.await(delay, TimeUnit.MILLISECONDS);
+            }
+            return this.response;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public static void receive(ChannelResponseProto.ChannelResponse response) {
+        DefaultFuture future = defaultFutures.get(response.getRequestId());
+        if (future != null) {
+            Lock lock = future.lock;
+            lock.lock();
+            try {
+                future.setResponse(response);
+                future.isDone = true;
+                future.isEmpty.signal();
+                defaultFutures.remove(future);
+            } catch (Exception e) {
+                throw e;
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    public ChannelResponseProto.ChannelResponse getResponse() {
+        return response;
+    }
+
+    public void setResponse(ChannelResponseProto.ChannelResponse response) {
+        this.response = response;
+    }
+}

+ 99 - 0
src/main/java/com/mogo/xts/netty/client/NettyClient.java

@@ -0,0 +1,99 @@
+package com.mogo.xts.netty.client;
+
+import com.mogo.xts.netty.idle.HeartBeatClientHandler;
+import com.mogo.xts.netty.idle.NettyConnectionWatchDog;
+import com.mogo.xts.netty.protobuf.ChannelRequestProto;
+import com.mogo.xts.netty.protobuf.ChannelResponseProto;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.HashedWheelTimer;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Xipeng
+ **/
+public class NettyClient {
+
+    private final String host;
+    private final int port;
+
+    private Bootstrap b;
+
+    protected final HashedWheelTimer timer = new HashedWheelTimer();
+
+    public NettyClient(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+    public void start() throws Exception {
+        EventLoopGroup group = new NioEventLoopGroup();
+        try {
+            b = new Bootstrap();
+            b.group(group).channel(NioSocketChannel.class)
+                    .option(ChannelOption.SO_BACKLOG, 128)
+                    .option(ChannelOption.SO_KEEPALIVE, true)
+                    .remoteAddress(new InetSocketAddress(host, port))
+                    .handler(new LoggingHandler(LogLevel.INFO));
+
+            // 定义重连检测狗, 如果netty服务端挂了,则尝试重连
+            final NettyConnectionWatchDog watchDog = new NettyConnectionWatchDog(b, timer, port, host, true) {
+                @Override
+                public ChannelHandler[] handlers() {
+                    return new ChannelHandler[]{
+                            this,
+                            //new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
+                            new ProtobufVarint32FrameDecoder(),
+                            new ProtobufDecoder(ChannelResponseProto.ChannelResponse.getDefaultInstance()),
+                            new ProtobufVarint32LengthFieldPrepender(),
+                            new ProtobufEncoder(),
+                            new HeartBeatClientHandler(),
+                            new XtsCoreClientHandler()
+                    };
+                }
+            };
+
+            ChannelFuture future;
+            //进行连接
+            try {
+                synchronized (b) {
+                    b.handler(new ChannelInitializer<Channel>() {
+                        //初始化channel
+                        @Override
+                        protected void initChannel(Channel ch) throws Exception {
+                            ch.pipeline().addLast(watchDog.handlers());
+                        }
+                    });
+                    future = b.connect(host,port);
+                }
+
+                future.sync();
+            } catch (Throwable t) {
+                throw new Exception("connects to  fails", t);
+            }
+        } catch (Exception e){
+            group.shutdownGracefully().sync();
+        }
+    }
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.err.println("Usage: " + NettyClient.class.getSimpleName() + " <host> <port>");
+            return;
+        }
+        String host = args[0];
+        int port = Integer.parseInt(args[1]);
+        new NettyClient(host, port).start();
+    }
+
+}

+ 57 - 0
src/main/java/com/mogo/xts/netty/client/SocketManager.java

@@ -0,0 +1,57 @@
+package com.mogo.xts.netty.client;
+
+import com.mogo.xts.netty.protobuf.ChannelRequestProto;
+import com.mogo.xts.utils.SocketUtils;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * 发送数据
+ * @author Xipeng
+ **/
+public class SocketManager {
+
+    private static final int max_size = 50;
+
+    private ChannelHandlerContext ctx;
+
+    private static ExecutorService executorService = Executors.newFixedThreadPool(max_size);
+
+    private static class HolderClass {
+        private final static SocketManager instance = new SocketManager();
+    }
+
+    public static SocketManager getInstance() {
+        return HolderClass.instance;
+    }
+
+    private SocketManager() {
+
+    }
+
+    public void sendMsg(ChannelRequestProto.ChannelRequest channelRequest) throws Exception {
+
+        if(channelRequest == null) {
+            return;
+        }
+
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                SocketUtils.sendMsg(ctx, channelRequest);
+            }
+        });
+
+    }
+
+    public ChannelHandlerContext getCtx() {
+        return ctx;
+    }
+
+    public void setCtx(ChannelHandlerContext ctx) {
+        this.ctx = ctx;
+    }
+}

+ 40 - 0
src/main/java/com/mogo/xts/netty/client/XtsCoreClientHandler.java

@@ -0,0 +1,40 @@
+package com.mogo.xts.netty.client;
+
+import com.mogo.xts.enums.ActionEnum;
+import com.mogo.xts.netty.protobuf.ChannelRequestProto;
+import com.mogo.xts.netty.protobuf.ChannelResponseProto;
+import com.mogo.xts.utils.KidUtils;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @author Xipeng
+ **/
+@Slf4j
+@ChannelHandler.Sharable
+public class XtsCoreClientHandler extends ChannelInboundHandlerAdapter {
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        log.info("client receive msg " + msg);
+        ChannelResponseProto.ChannelResponse response = (ChannelResponseProto.ChannelResponse) msg;
+        DefaultFuture future = new DefaultFuture(response.getRequestId());
+        DefaultFuture.receive(response);
+        System.out.println(future.get());
+        ctx.fireChannelRead(ctx);
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        SocketManager.getInstance().setCtx(ctx);
+
+        ChannelRequestProto.ChannelRequest.Builder builder = ChannelRequestProto.ChannelRequest.newBuilder();
+        builder.setAction(ActionEnum.TEST.getAction());
+        builder.setRequestId(KidUtils.generateShortUuid());
+        builder.setContext(ActionEnum.TEST.getAction());
+        SocketManager.getInstance().sendMsg(builder.build());
+        ctx.fireChannelActive();
+    }
+}

+ 13 - 0
src/main/java/com/mogo/xts/netty/idle/ChannelHandlerHolder.java

@@ -0,0 +1,13 @@
+package com.mogo.xts.netty.idle;
+
+import io.netty.channel.ChannelHandler;
+
+/**
+ * 客户端的ChannelHandler集合
+ * Xipeng
+ */
+public interface ChannelHandlerHolder {
+
+    ChannelHandler[] handlers();
+
+}

+ 76 - 0
src/main/java/com/mogo/xts/netty/idle/HeartBeatClientHandler.java

@@ -0,0 +1,76 @@
+package com.mogo.xts.netty.idle;
+
+import com.mogo.xts.enums.ActionEnum;
+import com.mogo.xts.netty.protobuf.ChannelRequestProto;
+import com.mogo.xts.netty.protobuf.ChannelResponseProto;
+import com.mogo.xts.utils.KidUtils;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.CharsetUtil;
+import io.netty.util.ReferenceCountUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Date;
+
+/**
+ * @author Xipeng
+ **/
+@Slf4j
+@ChannelHandler.Sharable
+public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
+
+    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
+            CharsetUtil.UTF_8));
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        log.info("激活时间是:"+new Date());
+        log.info("HeartBeatClientHandler channelActive");
+        ctx.fireChannelActive();
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        log.info("停止时间是:"+new Date());
+        log.info("HeartBeatClientHandler channelInactive");
+    }
+
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        // 为了减少网络消耗,此处不会接收到任何服务端对于心跳的任何响应
+        ChannelResponseProto.ChannelResponse channelResponse = (ChannelResponseProto.ChannelResponse) msg;
+        if(channelResponse != null && StringUtils.equalsIgnoreCase(channelResponse.getAction(), ActionEnum.HEART_BEAT.getAction())) {
+            log.info("xts client heart channelRead ..");
+        } else {
+            ctx.fireChannelRead(msg);
+        }
+        ReferenceCountUtil.release(msg);
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            IdleState state = ((IdleStateEvent) evt).state();
+            if (state == IdleState.WRITER_IDLE) {
+                ctx.writeAndFlush(createChannelRequest());
+            }
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
+
+    private ChannelRequestProto.ChannelRequest createChannelRequest() {
+        ChannelRequestProto.ChannelRequest.Builder builder = ChannelRequestProto.ChannelRequest.newBuilder();
+        builder.setAction(ActionEnum.HEART_BEAT.getAction());
+        builder.setRequestId(KidUtils.generateShortUuid());
+        builder.setContext(ActionEnum.HEART_BEAT.getAction());
+        return builder.build();
+    }
+}

+ 49 - 0
src/main/java/com/mogo/xts/netty/idle/HeartBeatServerHandler.java

@@ -0,0 +1,49 @@
+package com.mogo.xts.netty.idle;
+
+import com.mogo.xts.enums.ActionEnum;
+import com.mogo.xts.netty.protobuf.ChannelRequestProto;
+import com.mogo.xts.utils.SocketUtils;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * @author Xipeng
+ **/
+@Slf4j
+@ChannelHandler.Sharable
+public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        ChannelRequestProto.ChannelRequest channelRequest = (ChannelRequestProto.ChannelRequest)msg;
+        if(channelRequest != null && StringUtils.equalsIgnoreCase(channelRequest.getAction(), ActionEnum.HEART_BEAT.getAction())){
+             log.info("xts server heart channelRead ..");
+             log.info(ctx.channel().remoteAddress() + " -> Server : " + channelRequest.getContext());
+        } else {
+            ctx.fireChannelRead(msg);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        ctx.close();
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if(evt instanceof IdleStateEvent) {
+            IdleState state = ((IdleStateEvent)evt).state();
+            if(state == IdleState.READER_IDLE) { // 如果出现读空闲,则认为客户端挂了,需要重连
+                throw new Exception("reader_idle exception");
+            }
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
+}

+ 87 - 0
src/main/java/com/mogo/xts/netty/idle/NettyConnectionWatchDog.java

@@ -0,0 +1,87 @@
+package com.mogo.xts.netty.idle;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 重连检测狗,当发现当前的链路不稳定关闭之后,进行12次重连
+ * @author Xipeng
+ **/
+@ChannelHandler.Sharable
+@Slf4j
+public abstract class NettyConnectionWatchDog extends ChannelInboundHandlerAdapter implements TimerTask, ChannelHandlerHolder {
+
+    private final Bootstrap bootstrap;
+    private final Timer timer;
+    private final int port;
+    private final String host;
+    private volatile boolean reconnect = true;
+    private volatile int attempts;
+
+    public NettyConnectionWatchDog(Bootstrap bootstrap, Timer timer, int port, String host, boolean reconnect) {
+        this.bootstrap = bootstrap;
+        this.timer = timer;
+        this.port = port;
+        this.host = host;
+        this.reconnect = reconnect;
+    }
+
+    /**
+     * channel链路每次active的时候,将其连接的次数重新0
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        log.info("当前链路已经激活了,重连尝试次数重新置为0");
+        attempts = 0;
+        ctx.fireChannelActive();
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        log.info("链接关闭");
+        if(reconnect){
+            log.info("链接关闭,将进行重连");
+            if (attempts < 12) {
+                attempts++;
+            }
+            //重连的间隔时间会越来越长
+            int timeout = 2 << attempts;
+            timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
+        }
+        ctx.fireChannelInactive();
+    }
+
+    public void run(Timeout timeout) throws Exception {
+        ChannelFuture future;
+        //bootstrap已经初始化好了,只需要将handler填入就可以了
+        synchronized (bootstrap) {
+            bootstrap.handler(new ChannelInitializer<Channel>(){
+                @Override
+                protected void initChannel(Channel ch) throws Exception {
+                    ch.pipeline().addLast(handlers());
+                }
+            });
+            future = bootstrap.connect(host,port);
+        }
+        //future对象
+        future.addListener(new ChannelFutureListener() {
+            public void operationComplete(ChannelFuture f) throws Exception {
+                boolean succeed = f.isSuccess();
+                //如果重连失败,则调用ChannelInactive方法,再次出发重连事件,一直尝试12次,如果失败则不再重连
+                if (!succeed) {
+                    log.info("reconnect fail");
+                    f.channel().pipeline().fireChannelInactive();
+                }else{
+                    log.info("reconnect success");
+                }
+            }
+        });
+    }
+
+}

+ 858 - 0
src/main/java/com/mogo/xts/netty/protobuf/ChannelRequestProto.java

@@ -0,0 +1,858 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: ChannelRequest.proto
+
+package com.mogo.xts.netty.protobuf;
+
+public final class ChannelRequestProto {
+  private ChannelRequestProto() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+  }
+  public interface ChannelRequestOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // required string requestId = 1;
+    /**
+     * <code>required string requestId = 1;</code>
+     */
+    boolean hasRequestId();
+    /**
+     * <code>required string requestId = 1;</code>
+     */
+    String getRequestId();
+    /**
+     * <code>required string requestId = 1;</code>
+     */
+    com.google.protobuf.ByteString
+        getRequestIdBytes();
+
+    // required string action = 2;
+    /**
+     * <code>required string action = 2;</code>
+     */
+    boolean hasAction();
+    /**
+     * <code>required string action = 2;</code>
+     */
+    String getAction();
+    /**
+     * <code>required string action = 2;</code>
+     */
+    com.google.protobuf.ByteString
+        getActionBytes();
+
+    // required string context = 3;
+    /**
+     * <code>required string context = 3;</code>
+     */
+    boolean hasContext();
+    /**
+     * <code>required string context = 3;</code>
+     */
+    String getContext();
+    /**
+     * <code>required string context = 3;</code>
+     */
+    com.google.protobuf.ByteString
+        getContextBytes();
+  }
+  /**
+   * Protobuf type {@code netty.ChannelRequest}
+   */
+  public static final class ChannelRequest extends
+      com.google.protobuf.GeneratedMessage
+      implements ChannelRequestOrBuilder {
+    // Use ChannelRequest.newBuilder() to construct.
+    private ChannelRequest(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private ChannelRequest(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final ChannelRequest defaultInstance;
+    public static ChannelRequest getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public ChannelRequest getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private ChannelRequest(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              requestId_ = input.readBytes();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              action_ = input.readBytes();
+              break;
+            }
+            case 26: {
+              bitField0_ |= 0x00000004;
+              context_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return ChannelRequestProto.internal_static_netty_ChannelRequest_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return ChannelRequestProto.internal_static_netty_ChannelRequest_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              ChannelRequest.class, Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<ChannelRequest> PARSER =
+        new com.google.protobuf.AbstractParser<ChannelRequest>() {
+      public ChannelRequest parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new ChannelRequest(input, extensionRegistry);
+      }
+    };
+
+    @Override
+    public com.google.protobuf.Parser<ChannelRequest> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // required string requestId = 1;
+    public static final int REQUESTID_FIELD_NUMBER = 1;
+    private Object requestId_;
+    /**
+     * <code>required string requestId = 1;</code>
+     */
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>required string requestId = 1;</code>
+     */
+    public String getRequestId() {
+      Object ref = requestId_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          requestId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>required string requestId = 1;</code>
+     */
+    public com.google.protobuf.ByteString
+        getRequestIdBytes() {
+      Object ref = requestId_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (String) ref);
+        requestId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // required string action = 2;
+    public static final int ACTION_FIELD_NUMBER = 2;
+    private Object action_;
+    /**
+     * <code>required string action = 2;</code>
+     */
+    public boolean hasAction() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>required string action = 2;</code>
+     */
+    public String getAction() {
+      Object ref = action_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          action_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>required string action = 2;</code>
+     */
+    public com.google.protobuf.ByteString
+        getActionBytes() {
+      Object ref = action_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (String) ref);
+        action_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // required string context = 3;
+    public static final int CONTEXT_FIELD_NUMBER = 3;
+    private Object context_;
+    /**
+     * <code>required string context = 3;</code>
+     */
+    public boolean hasContext() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>required string context = 3;</code>
+     */
+    public String getContext() {
+      Object ref = context_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          context_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>required string context = 3;</code>
+     */
+    public com.google.protobuf.ByteString
+        getContextBytes() {
+      Object ref = context_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (String) ref);
+        context_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    private void initFields() {
+      requestId_ = "";
+      action_ = "";
+      context_ = "";
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      if (!hasRequestId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasAction()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasContext()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getRequestIdBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getActionBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, getContextBytes());
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getRequestIdBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getActionBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, getContextBytes());
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @Override
+    protected Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    public static ChannelRequest parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static ChannelRequest parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static ChannelRequest parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static ChannelRequest parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static ChannelRequest parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static ChannelRequest parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static ChannelRequest parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static ChannelRequest parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static ChannelRequest parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static ChannelRequest parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(ChannelRequest prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code netty.ChannelRequest}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements ChannelRequestOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return ChannelRequestProto.internal_static_netty_ChannelRequest_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return ChannelRequestProto.internal_static_netty_ChannelRequest_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                ChannelRequest.class, Builder.class);
+      }
+
+      // Construct using com.mogo.xts.netty.protobuf.ChannelRequestProto.ChannelRequest.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        requestId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        action_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        context_ = "";
+        bitField0_ = (bitField0_ & ~0x00000004);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return ChannelRequestProto.internal_static_netty_ChannelRequest_descriptor;
+      }
+
+      public ChannelRequest getDefaultInstanceForType() {
+        return ChannelRequest.getDefaultInstance();
+      }
+
+      public ChannelRequest build() {
+        ChannelRequest result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public ChannelRequest buildPartial() {
+        ChannelRequest result = new ChannelRequest(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.requestId_ = requestId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.action_ = action_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.context_ = context_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof ChannelRequest) {
+          return mergeFrom((ChannelRequest)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(ChannelRequest other) {
+        if (other == ChannelRequest.getDefaultInstance()) return this;
+        if (other.hasRequestId()) {
+          bitField0_ |= 0x00000001;
+          requestId_ = other.requestId_;
+          onChanged();
+        }
+        if (other.hasAction()) {
+          bitField0_ |= 0x00000002;
+          action_ = other.action_;
+          onChanged();
+        }
+        if (other.hasContext()) {
+          bitField0_ |= 0x00000004;
+          context_ = other.context_;
+          onChanged();
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        if (!hasRequestId()) {
+          
+          return false;
+        }
+        if (!hasAction()) {
+          
+          return false;
+        }
+        if (!hasContext()) {
+          
+          return false;
+        }
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        ChannelRequest parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (ChannelRequest) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // required string requestId = 1;
+      private Object requestId_ = "";
+      /**
+       * <code>required string requestId = 1;</code>
+       */
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>required string requestId = 1;</code>
+       */
+      public String getRequestId() {
+        Object ref = requestId_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          requestId_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      /**
+       * <code>required string requestId = 1;</code>
+       */
+      public com.google.protobuf.ByteString
+          getRequestIdBytes() {
+        Object ref = requestId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (String) ref);
+          requestId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>required string requestId = 1;</code>
+       */
+      public Builder setRequestId(
+          String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        requestId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string requestId = 1;</code>
+       */
+      public Builder clearRequestId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        requestId_ = getDefaultInstance().getRequestId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string requestId = 1;</code>
+       */
+      public Builder setRequestIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        requestId_ = value;
+        onChanged();
+        return this;
+      }
+
+      // required string action = 2;
+      private Object action_ = "";
+      /**
+       * <code>required string action = 2;</code>
+       */
+      public boolean hasAction() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>required string action = 2;</code>
+       */
+      public String getAction() {
+        Object ref = action_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          action_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      /**
+       * <code>required string action = 2;</code>
+       */
+      public com.google.protobuf.ByteString
+          getActionBytes() {
+        Object ref = action_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (String) ref);
+          action_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>required string action = 2;</code>
+       */
+      public Builder setAction(
+          String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        action_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string action = 2;</code>
+       */
+      public Builder clearAction() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        action_ = getDefaultInstance().getAction();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string action = 2;</code>
+       */
+      public Builder setActionBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        action_ = value;
+        onChanged();
+        return this;
+      }
+
+      // required string context = 3;
+      private Object context_ = "";
+      /**
+       * <code>required string context = 3;</code>
+       */
+      public boolean hasContext() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>required string context = 3;</code>
+       */
+      public String getContext() {
+        Object ref = context_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          context_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      /**
+       * <code>required string context = 3;</code>
+       */
+      public com.google.protobuf.ByteString
+          getContextBytes() {
+        Object ref = context_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (String) ref);
+          context_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>required string context = 3;</code>
+       */
+      public Builder setContext(
+          String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        context_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string context = 3;</code>
+       */
+      public Builder clearContext() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        context_ = getDefaultInstance().getContext();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string context = 3;</code>
+       */
+      public Builder setContextBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        context_ = value;
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:netty.ChannelRequest)
+    }
+
+    static {
+      defaultInstance = new ChannelRequest(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:netty.ChannelRequest)
+  }
+
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_netty_ChannelRequest_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_netty_ChannelRequest_fieldAccessorTable;
+
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() {
+    return descriptor;
+  }
+  private static com.google.protobuf.Descriptors.FileDescriptor
+      descriptor;
+  static {
+    String[] descriptorData = {
+      "\n\024ChannelRequest.proto\022\005netty\"D\n\016Channel" +
+      "Request\022\021\n\trequestId\030\001 \002(\t\022\016\n\006action\030\002 \002" +
+      "(\t\022\017\n\007context\030\003 \002(\tB2\n\033com.mogo.xts.nett" +
+      "y.protobufB\023ChannelRequestProto"
+    };
+    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+        public com.google.protobuf.ExtensionRegistry assignDescriptors(
+            com.google.protobuf.Descriptors.FileDescriptor root) {
+          descriptor = root;
+          internal_static_netty_ChannelRequest_descriptor =
+            getDescriptor().getMessageTypes().get(0);
+          internal_static_netty_ChannelRequest_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_netty_ChannelRequest_descriptor,
+              new String[] { "RequestId", "Action", "Context", });
+          return null;
+        }
+      };
+    com.google.protobuf.Descriptors.FileDescriptor
+      .internalBuildGeneratedFileFrom(descriptorData,
+        new com.google.protobuf.Descriptors.FileDescriptor[] {
+        }, assigner);
+  }
+
+  // @@protoc_insertion_point(outer_class_scope)
+}

Failā izmaiņas netiks attēlotas, jo tās ir par lielu
+ 1023 - 0
src/main/java/com/mogo/xts/netty/protobuf/ChannelResponseProto.java


+ 71 - 0
src/main/java/com/mogo/xts/netty/server/NettyServer.java

@@ -0,0 +1,71 @@
+package com.mogo.xts.netty.server;
+
+import com.mogo.xts.netty.idle.HeartBeatServerHandler;
+import com.mogo.xts.netty.protobuf.ChannelRequestProto;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Xipeng
+ **/
+public class NettyServer {
+
+    private final int port;
+
+    public NettyServer(int port) {
+        this.port = port;
+    }
+
+    public static void main(String[] args) throws Exception{
+        if (args.length != 1) {
+            System.err.println("Usage: " + NettyServer.class.getSimpleName() + " <port>");
+        }
+        int port = Integer.parseInt(args[0]);
+        new NettyServer(port).start();
+    }
+
+    private void start() throws Exception {
+        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+        try {
+            ServerBootstrap bootstrap = new ServerBootstrap();
+            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+                    .option(ChannelOption.SO_BACKLOG, 128)
+                    .option(ChannelOption.SO_KEEPALIVE, true)
+                    .handler(new LoggingHandler(LogLevel.INFO))
+                    .localAddress(new InetSocketAddress(port))
+                    .childHandler(new ChannelInitializer<SocketChannel>() {
+                        protected void initChannel(SocketChannel ch) throws Exception {
+                            ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.MINUTES));
+                            ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
+                            ch.pipeline().addLast(new ProtobufDecoder(ChannelRequestProto.ChannelRequest.getDefaultInstance()));
+                            ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
+                            ch.pipeline().addLast(new ProtobufEncoder());
+                            ch.pipeline().addLast(new HeartBeatServerHandler());
+                            ch.pipeline().addLast(new XtsCoreServerHandler());
+                        }
+                    });
+            ChannelFuture future = bootstrap.bind().sync();
+            future.channel().closeFuture().sync();
+        } catch (Exception e) {
+            bossGroup.shutdownGracefully().sync();
+            workerGroup.shutdownGracefully().sync();
+        }
+    }
+}

+ 51 - 0
src/main/java/com/mogo/xts/netty/server/XtsCoreServerHandler.java

@@ -0,0 +1,51 @@
+package com.mogo.xts.netty.server;
+
+import com.mogo.xts.netty.protobuf.ChannelRequestProto;
+import com.mogo.xts.netty.protobuf.ChannelResponseProto;
+import com.mogo.xts.utils.SocketUtils;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * @author Xipeng
+ **/
+@Slf4j
+@ChannelHandler.Sharable
+public class XtsCoreServerHandler extends ChannelInboundHandlerAdapter {
+
+    private static final int max_size = 100;
+
+    private static ExecutorService executorService = Executors.newFixedThreadPool(max_size);
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        log.info("xts server receive " + msg);
+        ChannelRequestProto.ChannelRequest request = (ChannelRequestProto.ChannelRequest) msg;
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                SocketUtils.sendMsg(ctx, createResponse(request));
+            }
+        });
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        ctx.close();
+    }
+
+    private ChannelResponseProto.ChannelResponse createResponse(ChannelRequestProto.ChannelRequest request) {
+        ChannelResponseProto.ChannelResponse.Builder builder =  ChannelResponseProto.ChannelResponse.newBuilder();
+        builder.setRequestId(request.getRequestId());
+        builder.setAction(request.getAction());
+        builder.setContext(request.getContext());
+        builder.setRetMsg("isOk");
+        return builder.build();
+    }
+}

+ 30 - 0
src/main/java/com/mogo/xts/utils/KidUtils.java

@@ -0,0 +1,30 @@
+package com.mogo.xts.utils;
+
+import java.util.Date;
+import java.util.UUID;
+
+/**
+ * @author Xipeng
+ **/
+public class KidUtils {
+
+    private static String[] chars = new String[]{"a", "b", "c", "d", "e", "f",
+            "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s",
+            "t", "u", "v", "w", "x", "y", "z", "0", "1", "2", "3", "4", "5",
+            "6", "7", "8", "9", "A", "B", "C", "D", "E", "F", "G", "H", "I",
+            "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V",
+            "W", "X", "Y", "Z"};
+
+
+    public static String generateShortUuid() {
+        StringBuffer shortBuffer = new StringBuffer();
+        String uuid = UUID.randomUUID().toString().replace("-", "");
+        for (int i = 0; i < 8; i++) {
+            String str = uuid.substring(i * 4, i * 4 + 4);
+            int x = Integer.parseInt(str, 16);
+            shortBuffer.append(chars[x % 0x3E]);
+        }
+        return shortBuffer.toString();
+
+    }
+}

+ 24 - 0
src/main/java/com/mogo/xts/utils/SocketUtils.java

@@ -0,0 +1,24 @@
+package com.mogo.xts.utils;
+
+import com.mogo.xts.netty.protobuf.ChannelRequestProto;
+import com.mogo.xts.netty.protobuf.ChannelResponseProto;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.ReferenceCountUtil;
+
+/**
+ * @author Xipeng
+ **/
+public class SocketUtils {
+
+    public static void sendMsg(final ChannelHandlerContext ctx, final ChannelResponseProto.ChannelResponse msg) {
+        ctx.writeAndFlush(msg);
+    }
+
+    public static void sendMsg(final ChannelHandlerContext ctx, final ChannelRequestProto.ChannelRequest msg) {
+        ctx.writeAndFlush(msg);
+    }
+
+
+}

+ 616 - 0
src/main/java/com/mogo/xts/utils/TimeUtil.java

@@ -0,0 +1,616 @@
+package com.mogo.xts.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.*;
+
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+
+public class TimeUtil {
+	public static final String YYYY_MM = "yyyy-MM";
+	public static final String YYYY_MM_DD = "yyyy-MM-dd";
+	public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
+	public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
+	public static final String YYYYMMDD = "yyyyMMdd";
+	public static final Timestamp MAX_EXPIRE_DATE = getMaxExpireTime();
+
+	private static Timestamp getMaxExpireTime() {
+		Timestamp rtn = Timestamp.valueOf("2100-01-01 00:00:00");
+		return rtn;
+	}
+
+	public static Date addOrMinusYear(long ti, int i) throws Exception {
+		Date rtn = null;
+		GregorianCalendar cal = new GregorianCalendar();
+		Date date = new Date(ti);
+		cal.setTime(date);
+		cal.add(1, i);
+		rtn = cal.getTime();
+		return rtn;
+	}
+
+	public static Date addOrMinusMonth(long ti, int i) throws Exception {
+		Date rtn = null;
+		GregorianCalendar cal = new GregorianCalendar();
+		Date date = new Date(ti);
+		cal.setTime(date);
+		cal.add(2, i);
+		rtn = cal.getTime();
+		return rtn;
+	}
+
+	public static Date addOrMinusWeek(long ti, int i) {
+		Date rtn = null;
+		GregorianCalendar cal = new GregorianCalendar();
+		Date date = new Date(ti);
+		cal.setTime(date);
+		cal.add(3, i);
+		rtn = cal.getTime();
+		return rtn;
+	}
+
+	public static Date addOrMinusDays(long ti, int i) {
+		Date rtn = null;
+		GregorianCalendar cal = new GregorianCalendar();
+		Date date = new Date(ti);
+		cal.setTime(date);
+		cal.add(5, i);
+		rtn = cal.getTime();
+		return rtn;
+	}
+
+	public static Date addOrMinusHours(long ti, int i) {
+		Date rtn = null;
+		GregorianCalendar cal = new GregorianCalendar();
+		Date date = new Date(ti);
+		cal.setTime(date);
+		cal.add(10, i);
+		rtn = cal.getTime();
+		return rtn;
+	}
+
+	public static Date addOrMinusMinutes(long ti, int i) {
+		Date rtn = null;
+		GregorianCalendar cal = new GregorianCalendar();
+		Date date = new Date(ti);
+		cal.setTime(date);
+		cal.add(12, i);
+		rtn = cal.getTime();
+		return rtn;
+	}
+
+	public static Date addOrMinusSecond(long ti, int i) {
+		Date rtn = null;
+		GregorianCalendar cal = new GregorianCalendar();
+		Date date = new Date(ti);
+		cal.setTime(date);
+		cal.add(13, i);
+		rtn = cal.getTime();
+		return rtn;
+	}
+
+	public static int yearsBetween(Date start, Date end) {
+		return Years.yearsBetween(LocalDate.fromDateFields(start),
+				LocalDate.fromDateFields(end)).getYears();
+	}
+
+	public static int monthsBetween(Date start, Date end) {
+		return Months.monthsBetween(LocalDate.fromDateFields(start),
+				LocalDate.fromDateFields(end)).getMonths();
+	}
+
+	public static int weeksBetween(Date start, Date end) {
+		return Weeks.weeksBetween(LocalDate.fromDateFields(start),
+				LocalDate.fromDateFields(end)).getWeeks();
+	}
+
+	public static int daysBetween(Date start, Date end) {
+		return Days.daysBetween(LocalDate.fromDateFields(start),
+				LocalDate.fromDateFields(end)).getDays();
+	}
+
+	public static int hoursBetween(Date start, Date end) {
+		return Hours.hoursBetween(LocalDate.fromDateFields(start),
+				LocalDate.fromDateFields(end)).getHours();
+	}
+
+	public static int minutesBetween(Date start, Date end) {
+		return Minutes.minutesBetween(LocalDate.fromDateFields(start),
+				LocalDate.fromDateFields(end)).getMinutes();
+	}
+
+	public static int secondsBetween(Date start, Date end) {
+		return Seconds.secondsBetween(LocalDate.fromDateFields(start),
+				LocalDate.fromDateFields(end)).getSeconds();
+	}
+
+	public static Timestamp getNextMonthStartDate(Date date) {
+		Calendar rightNow = Calendar.getInstance();
+		rightNow.setTime(date);
+		rightNow.set(5, 1);
+		rightNow.set(11, 0);
+		rightNow.set(14, 0);
+		rightNow.set(13, 0);
+		rightNow.set(12, 0);
+		rightNow.set(2, rightNow.get(2) + 1);
+		return new Timestamp(rightNow.getTimeInMillis());
+	}
+
+	public static Timestamp getBeforeMonthStartDate(Date date) {
+		Calendar rightNow = Calendar.getInstance();
+		rightNow.setTime(date);
+		rightNow.set(5, 1);
+		rightNow.set(11, 0);
+		rightNow.set(14, 0);
+		rightNow.set(13, 0);
+		rightNow.set(12, 0);
+		rightNow.set(2, rightNow.get(2) - 1);
+		return new Timestamp(rightNow.getTimeInMillis());
+	}
+
+	public static Timestamp getCurrentMonthEndDate(Date date) {
+		Calendar rightNow = Calendar.getInstance();
+		rightNow.setTime(date);
+		rightNow.set(5, rightNow.getActualMaximum(5));
+		rightNow.set(11, 23);
+		rightNow.set(14, 59);
+		rightNow.set(13, 59);
+		rightNow.set(12, 59);
+		rightNow.set(2, rightNow.get(2));
+		return new Timestamp(rightNow.getTimeInMillis());
+	}
+
+	public static Timestamp getCurrentMonthFirstDate(Date date) {
+		Calendar rightNow = Calendar.getInstance();
+		rightNow.setTime(date);
+		rightNow.set(5, 1);
+		rightNow.set(11, 0);
+		rightNow.set(14, 0);
+		rightNow.set(13, 0);
+		rightNow.set(12, 0);
+		rightNow.set(2, rightNow.get(2));
+		return new Timestamp(rightNow.getTimeInMillis());
+	}
+
+	public static Timestamp getCurrentDayEndDate(Date date) {
+		Calendar rightNow = Calendar.getInstance();
+		rightNow.setTime(date);
+		rightNow.set(11, 23);
+		rightNow.set(14, 59);
+		rightNow.set(13, 59);
+		rightNow.set(12, 59);
+		rightNow.set(2, rightNow.get(2));
+		return new Timestamp(rightNow.getTimeInMillis());
+	}
+
+	public static Timestamp getCurrentDayStartDate(Date date) {
+		Calendar rightNow = Calendar.getInstance();
+		rightNow.setTime(date);
+		rightNow.set(11, 0);
+		rightNow.set(14, 0);
+		rightNow.set(13, 0);
+		rightNow.set(12, 0);
+		rightNow.set(2, rightNow.get(2));
+		return new Timestamp(rightNow.getTimeInMillis());
+	}
+
+	public static Timestamp getBeforeDayEndDate(Date date) {
+		Calendar rightNow = Calendar.getInstance();
+		rightNow.setTime(date);
+		rightNow.set(5, rightNow.get(5) - 1);
+		rightNow.set(11, 23);
+		rightNow.set(14, 59);
+		rightNow.set(13, 59);
+		rightNow.set(12, 59);
+		rightNow.set(2, rightNow.get(2));
+		return new Timestamp(rightNow.getTimeInMillis());
+	}
+
+	public static Timestamp getNextDayStartDay(Date date) {
+		Calendar rightNow = Calendar.getInstance();
+		rightNow.setTime(date);
+		rightNow.set(5, rightNow.get(5) + 1);
+		rightNow.set(11, 0);
+		rightNow.set(14, 0);
+		rightNow.set(13, 0);
+		rightNow.set(12, 0);
+		rightNow.set(2, rightNow.get(2));
+		return new Timestamp(rightNow.getTimeInMillis());
+	}
+
+	public static String getYYYY_MM_DD(Date date) {
+		if (date == null) {
+			return null;
+		}
+		DateFormat dateformat = new SimpleDateFormat("yyyy-MM-dd");
+		return dateformat.format(date);
+	}
+
+	public static String getYYYYMMDDHHMMSS(Date date) {
+		if (date == null) {
+			return null;
+		}
+		DateFormat dateformat = new SimpleDateFormat("yyyyMMddHHmmss");
+		return dateformat.format(date);
+	}
+
+	public static String getYYYY_MM_DD_HH_MM_SS(Date date) {
+		if (date == null) {
+			return null;
+		}
+		DateFormat dateformat = new SimpleDateFormat(YYYY_MM_DD_HH_MM_SS);
+		return dateformat.format(date);
+	}
+
+	public static Date getYYYY_MM_DD_HH_MM_SS(String dateString) {
+		Date date = null;
+		try {
+			if (StringUtils.isBlank(dateString)) {
+				return null;
+			}
+			DateFormat dateformat = new SimpleDateFormat(YYYY_MM_DD_HH_MM_SS);
+			date = dateformat.parse(dateString);
+		} catch (ParseException e) {
+			e.printStackTrace();
+		}
+
+		return date;
+
+	}
+	
+	public static Date getYYYY_MM_DD(String dateString) {
+		Date date = null;
+		try {
+			if (dateString == null) {
+				return null;
+			}
+			DateFormat dateformat = new SimpleDateFormat(YYYY_MM_DD);
+			date = dateformat.parse(dateString);
+		} catch (ParseException e) {
+			e.printStackTrace();
+		}
+
+		return date;
+
+	}
+
+	public static String getYYYYMMDD(Date date) {
+		if (date == null) {
+			return null;
+		}
+		DateFormat dateformat = new SimpleDateFormat("yyyyMMdd");
+		return dateformat.format(date);
+	}
+
+	public static String getYYYYMM(Date date) {
+		if (date == null) {
+			return null;
+		}
+		DateFormat dateformat = new SimpleDateFormat("yyyyMM");
+		return dateformat.format(date);
+	}
+
+	public static String getMM(Date date) {
+		if (date == null) {
+			return null;
+		}
+		DateFormat dateformat = new SimpleDateFormat("MM");
+		return dateformat.format(date);
+	}
+
+	public static Timestamp getSysDate() {
+		Timestamp rtn = new Timestamp(System.currentTimeMillis());
+		return rtn;
+	}
+
+	public static Timestamp getMaxExpire() {
+		return MAX_EXPIRE_DATE;
+	}
+
+	public static Timestamp getTimstampByString(String strDate, String mask)
+			throws Exception {
+		if (strDate == null) {
+			return null;
+		}
+		DateFormat dateformat = new SimpleDateFormat(mask);
+		return new Timestamp(dateformat.parse(strDate).getTime());
+	}
+
+	public static Timestamp getBillMonthDate(Date beginDate, Date endDate) {
+		if (null == beginDate) {
+			return null;
+		}
+
+		Timestamp monthEndDate = new Timestamp(addOrMinusDays(
+				getNextMonthStartDate(endDate).getTime(), -1).getTime());
+		return new Timestamp(monthEndDate.getTime());
+	}
+
+	public static Timestamp getTruncDate(Date date) {
+		Calendar rightNow = Calendar.getInstance();
+		rightNow.setTime(date);
+		rightNow.set(11, 0);
+		rightNow.set(14, 0);
+		rightNow.set(13, 0);
+		rightNow.set(12, 0);
+		return new Timestamp(rightNow.getTimeInMillis());
+	}
+
+	public static long getDistanceTime(Date date1, Date date2) {
+		return date1.getTime() - date2.getTime();
+	}
+
+	/**
+	 * 获取n天之后(之前)的时间
+	 * 
+	 * @param date
+	 * @param cutDay
+	 * @return
+	 */
+	public static Date addDistanceTime(Date date, String cutDay) {
+		if (StringUtils.isBlank(cutDay)) {
+			return date;
+		}
+		Calendar calendar = Calendar.getInstance();
+		calendar.setTime(date);
+		calendar.add(Calendar.DAY_OF_MONTH, +Integer.parseInt(cutDay));// +1今天的时间加一天
+		date = calendar.getTime();
+		return date;
+	}
+
+	/**
+	 * 比较时间大小 date1=date2: return 0 --- date1>date2: return 1 --- date1<date2:
+	 * return -1 ---
+	 */
+	public static int compare_date(Date date1, Date date2) {
+		DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
+		try {
+			Date dt1 = df.parse(df.format(date1));
+			Date dt2 = df.parse(df.format(date2));
+			if (dt1.getTime() > dt2.getTime()) {
+				return 1;
+			} else if (dt1.getTime() < dt2.getTime()) {
+				return -1;
+			} else {
+				return 0;
+			}
+		} catch (Exception exception) {
+			exception.printStackTrace();
+		}
+		return 0;
+	}
+
+	/**
+	 * 获取指定日期前后几天的开始时间
+	 * 
+	 */
+	public static Date getSpecifiedDayBefore(Date time, int cutDay) {
+		DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
+		Calendar c = Calendar.getInstance();
+		Date date = null;
+		try {
+			c.setTime(time);
+			int day = c.get(Calendar.DATE);
+			c.set(Calendar.DATE, day + cutDay);
+			String dayBefore = new SimpleDateFormat("yyyy-MM-dd").format(c
+					.getTime());
+			date = df.parse(dayBefore + " 00:00:00");
+		} catch (ParseException e) {
+			e.printStackTrace();
+		}
+		return date;
+	}
+
+	/**
+	 * 获取指定日期前后几天的结束时间
+	 * 
+	 */
+	public static Date getSpecifiedDayAfter(Date time, int cutDay) {
+		DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
+		Calendar c = Calendar.getInstance();
+		Date date = null;
+		try {
+			c.setTime(time);
+			int day = c.get(Calendar.DATE);
+			c.set(Calendar.DATE, day + cutDay);
+			String dayBefore = new SimpleDateFormat("yyyy-MM-dd").format(c
+					.getTime());
+			date = df.parse(dayBefore + " 23:59:59");
+		} catch (ParseException e) {
+			e.printStackTrace();
+		}
+		return date;
+	}
+	
+	/**
+	 * 判断时间是否在时间段内
+	 * 
+	 * @param nowTime
+	 * @param beginTime
+	 * @param endTime
+	 * @return
+	 */
+	public static boolean betweenCalendar(Date nowTime, Date beginTime, Date endTime) {
+		Calendar date = Calendar.getInstance();
+		date.setTime(nowTime);
+		Calendar begin = Calendar.getInstance();
+		begin.setTime(beginTime);
+		Calendar end = Calendar.getInstance();
+		end.setTime(endTime);
+		if (date.after(begin) && date.before(end)) {
+			return true;
+		} else if (nowTime.compareTo(beginTime) == 0
+				|| nowTime.compareTo(endTime) == 0) {
+			return true;
+		} else {
+			return false;
+		}
+	}
+	
+	/**
+	 * 两个日期相差多少天小时分钟等
+	 * @param startDate
+	 * @param endDate
+	 * @param type 1 天 2 小时 3 分钟 4 秒
+	 * @return
+	 */
+	public static long getDatePoor(Date startDate, Date endDate, int type) {
+
+		long nd = 1000 * 24 * 60 * 60;
+		long nh = 1000 * 60 * 60;
+		long nm = 1000 * 60;
+		long ns = 1000;
+		// 获得两个时间的毫秒时间差异
+		long diff = endDate.getTime() - startDate.getTime();
+		// 计算差多少天
+		if (type == 1) {
+			return diff / nd;
+			// 计算差多少小时
+		} else if (type == 2) {
+			return diff / nh;
+			// 计算差多少分钟
+		} else if (type == 3) {
+			return diff / nm;
+			// 计算差多少秒
+		} else if (type == 4){
+			return diff / ns;
+		}
+		return 0;
+
+	}
+	
+	/**
+	 * 获取当天的零点零分零秒和23点59分59秒
+	 * flag  1:零点 2:23点
+	 * @return
+	 * @throws ParseException 
+	 */
+	public static Timestamp getThisDayStartEndDate(int flag) throws ParseException {
+	    SimpleDateFormat formater = new SimpleDateFormat("yyyy/MM/dd");
+	    SimpleDateFormat formater2 = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+	     
+	    Date start = formater2.parse(formater.format(new Date())+ " 00:00:00");
+	    Date end = formater2.parse(formater.format(new Date())+ " 23:59:59");
+        if(flag==1){
+        	return new Timestamp(start.getTime()); 
+        }
+    	return new Timestamp(end.getTime());
+	}
+	
+	/**
+	   * 比较时间大小
+	   * date1=date2: return 2 ---
+	   * date1>date2: return 1 ---
+	   * date1<date2: return -1 ---
+	   */
+	  public static int compare_date2(Date date1,Date date2){
+		  int i =0;
+		  try {
+	          if (date1.getTime() > date2.getTime()) {
+	              i=1;
+	          } else if (date1.getTime() < date2.getTime()) {
+	              i=-1;
+	          } else {
+	              i=2;
+	          }
+	      } catch (Exception exception) {
+	          exception.printStackTrace();
+	      }
+	      return i;
+	  }
+
+	/**
+	 * 通过时间秒毫秒数判断两个时间的间隔
+	 * 
+	 * @param date1
+	 * @param date2
+	 *            date2 > date1
+	 * @return
+	 */
+	public static int differentDaysByMillisecond(Date date1, Date date2) {
+		int days = (int) ((date2.getTime() - date1.getTime()) / (1000 * 3600 * 24));
+		return days;
+	}
+	
+	/**
+	 * 将时间戳转换为时间
+	 * @param s
+	 * @return
+	 */
+	public static Date stampToDate(String s) {
+		return new Date(new Long(s));
+	}
+
+    public static String getSmsYMD(Date date){
+        String str = new SimpleDateFormat("yyyy年MM月dd日").format(date);
+        StringBuilder sb = new StringBuilder();
+        if (StringUtils.isNotBlank(str)){
+            sb.append(str.substring(2,4)+"年");
+            sb.append(str.substring(str.indexOf("年")+1,str.indexOf("月"))+"月");
+            sb.append(str.substring(str.indexOf("月")+1,str.indexOf("日"))+"日");
+        }
+        return  sb.toString();
+    }
+
+    public static String getSmsMD(Date date){
+        String str = new SimpleDateFormat("yyyy年MM月dd日").format(date);
+        StringBuilder sb = new StringBuilder();
+        if (StringUtils.isNotBlank(str)){
+
+            sb.append(str.substring(str.indexOf("年")+1,str.indexOf("月"))+"月");
+            sb.append(str.substring(str.indexOf("月")+1,str.indexOf("日"))+"日");
+        }
+        return  sb.toString();
+    }
+
+    public static String getSmsMDH(Date date){
+        String str = new SimpleDateFormat("yyyy年MM月dd日HH时mm分ss秒").format(date);
+        StringBuilder sb = new StringBuilder();
+        if (StringUtils.isNotBlank(str)){
+
+            sb.append(str.substring(str.indexOf("年")+1,str.indexOf("月"))+"月");
+            sb.append(str.substring(str.indexOf("月")+1,str.indexOf("日"))+"日");
+            sb.append(str.substring(str.indexOf("日")+1,str.indexOf("时"))+"时");
+        }
+        return  sb.toString();
+    }
+
+
+    public static String getSmsDay(Date date){
+        String str = new SimpleDateFormat("yyyy年MM月dd日").format(date);
+        StringBuilder sb = new StringBuilder();
+        if (StringUtils.isNotBlank(str)){
+            sb.append(str.substring(str.indexOf("月")+1,str.indexOf("日"))+"日");
+        }
+        return  sb.toString();
+    }
+
+    //获取月份最后一天
+    public static String getMonthLastDay(Date date){
+		 String lastday;
+		 Calendar cale = null;
+		 SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+		 cale = Calendar.getInstance();
+		 cale.setTime(date);
+		 cale.add(Calendar.MONTH, 1);
+		 cale.set(Calendar.DAY_OF_MONTH, 0);
+		 lastday = format.format(cale.getTime());
+         return  lastday;
+    }
+
+    public static void main(String[] args) {
+    	try {
+			Date a=addOrMinusMonth(new Date().getTime() , 2);
+			System.out.println(getYYYY_MM_DD_HH_MM_SS(a));
+		} catch (Exception e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+
+}

+ 0 - 0
src/main/resources/application.properties


+ 27 - 0
src/test/java/com/mogo/nio/BufferTest.java

@@ -0,0 +1,27 @@
+package com.mogo.nio;
+
+import io.netty.util.NettyRuntime;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author Xipeng
+ * @create 2019-04-17 20:56
+ **/
+public class BufferTest {
+
+    public static void main(String[] args) {
+//        int i = 0;
+//        while (i <= 100) {
+//            if((i & -i) == i) System.out.println(i);
+//            i++;
+//        }
+
+        AtomicInteger idx = new AtomicInteger();
+        System.out.println(7 & 7);
+    }
+}

+ 91 - 0
src/test/java/com/mogo/nio/NioServer.java

@@ -0,0 +1,91 @@
+package com.mogo.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author Xipeng
+ * @create 2019-03-31 17:25
+ **/
+public class NioServer {
+
+    private static final Map<String, SocketChannel> clientMap = new ConcurrentHashMap();
+
+    public static void main(String[] args) {
+
+        try {
+            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+            serverSocketChannel.configureBlocking(false);
+
+            Selector selector = Selector.open();
+            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+
+            serverSocketChannel.bind(new InetSocketAddress(8899));
+
+            while (true) {
+                selector.select();
+                Set<SelectionKey> selectionKeys = selector.selectedKeys();
+                selectionKeys.forEach(value -> {
+                    try {
+                        if(value.isAcceptable()) {
+                            ServerSocketChannel channel = (ServerSocketChannel)value.channel();
+                            SocketChannel clientChannel = channel.accept();
+                            clientChannel.configureBlocking(false);
+                            String clientId = UUID.randomUUID().toString();
+                            System.out.println("客户端接入" + clientId);
+                            clientMap.put(clientId, clientChannel);
+                            clientChannel.register(selector, SelectionKey.OP_READ);
+                        } else if(value.isReadable()) {
+                            SocketChannel clientChannel = (SocketChannel)value.channel();
+                            ByteBuffer buffer = ByteBuffer.allocate(1024);
+                            int count = clientChannel.read(buffer);
+                            if (count > 0) {
+                                buffer.flip();
+                                Charset charset = Charset.forName("utf-8");
+                                String receiveMsg = String.valueOf(charset.decode(buffer).array());
+                                System.out.println("receiveMsg = " +receiveMsg);
+                                Iterator<Map.Entry<String, SocketChannel>> it = clientMap.entrySet().iterator();
+                                String sendClient = null;
+                                while (it.hasNext()) {
+                                    Map.Entry<String, SocketChannel> next = it.next();
+                                    if(next.getValue() == clientChannel) {
+                                        sendClient = next.getKey();
+                                        break;
+                                    }
+                                }
+                                it = clientMap.entrySet().iterator();
+                                ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
+                                while (it.hasNext()) {
+                                    SocketChannel socketChannel = it.next().getValue();
+                                    writeBuffer.clear();
+                                    writeBuffer.put(("sendClient:" + sendClient + "发送了消息").getBytes());
+                                    writeBuffer.flip();
+                                    socketChannel.write(writeBuffer);
+                                }
+                            }
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                });
+                selectionKeys.clear();
+            }
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+        }
+    }
+
+}

+ 31 - 0
src/test/java/com/mogo/xts/XtsApplicationTests.java

@@ -0,0 +1,31 @@
+package com.mogo.xts;
+
+import com.mogo.xts.enums.ActionEnum;
+import com.mogo.xts.netty.client.DefaultFuture;
+import com.mogo.xts.netty.client.NettyClient;
+import com.mogo.xts.netty.client.SocketManager;
+import com.mogo.xts.netty.protobuf.ChannelRequestProto;
+import com.mogo.xts.netty.protobuf.ChannelResponseProto;
+import com.mogo.xts.utils.KidUtils;
+import com.mogo.xts.utils.SocketUtils;
+import io.netty.channel.ChannelHandlerContext;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class
+XtsApplicationTests {
+	@Test
+	public void sendMsg() throws Exception{
+		/*ChannelRequestProto.ChannelRequest.Builder builder = ChannelRequestProto.ChannelRequest.newBuilder();
+		builder.setAction(ActionEnum.TEST.getAction());
+		builder.setRequestId(KidUtils.generateShortUuid());
+		builder.setContext(ActionEnum.TEST.getAction());
+		ChannelResponseProto.ChannelResponse response = NettyClient.send(builder.build());
+		System.out.println(response);*/
+	}
+
+}