  • Long-Lived Socket Connections and Heartbeat Detection on Android

    2017-08-02 17:13:15 · Views: 1804 · Tags: socket, android, communication, heartbeat detection

    Category: android-misc

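    In Android development we sometimes need to keep a connection to the server open. This calls for a long-lived socket connection, plus a message sent at regular intervals to check that the connection is still up: heartbeat detection.
    We need a client demo and a server demo. The key code is posted below, mainly the Android client; the server demo is available for download.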
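The server demo is not reproduced in this post. As a rough idea of what the client expects, here is a minimal plain-Java sketch. It assumes, based on the client code in this post, that a heartbeat is an empty line terminated by "\r\n" and that the server answers it with "ok". The class name HeartbeatEchoDemo, the "echo:" reply for ordinary messages, and the ephemeral loopback port are illustrative assumptions, not part of the downloadable demo.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class HeartbeatEchoDemo {

    /** Serves one client: replies "ok" to an empty line, "echo:<text>" to anything else (assumed protocol). */
    static void serveOneClient(ServerSocket server) {
        try (Socket client = server.accept();
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
             OutputStream out = client.getOutputStream()) {
            String line;
            // readLine() returns null once the client closes the connection
            while ((line = in.readLine()) != null) {
                String reply = line.isEmpty() ? "ok" : "echo:" + line;
                out.write((reply + "\r\n").getBytes(StandardCharsets.UTF_8));
                out.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Sends one line over the open socket and returns the server's one-line reply. */
    static String exchange(Socket socket, BufferedReader in, String msg) throws IOException {
        OutputStream out = socket.getOutputStream();
        out.write((msg + "\r\n").getBytes(StandardCharsets.UTF_8));
        out.flush();
        return in.readLine();
    }

    /** Sends a heartbeat and an ordinary message over the same long-lived connection. */
    static String[] runDemo() throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread serverThread = new Thread(() -> serveOneClient(server));
            serverThread.start();
            String[] replies = new String[2];
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
                replies[0] = exchange(socket, in, "");      // heartbeat: just "\r\n"
                replies[1] = exchange(socket, in, "hello"); // ordinary message
            }
            serverThread.join();
            return replies;
        }
    }

    public static void main(String[] args) throws Exception {
        String[] replies = runDemo();
        System.out.println(replies[0] + " / " + replies[1]);
    }
}
```

Both exchanges reuse one socket, which is the point of a long-lived connection: the heartbeat only proves that this particular connection still works.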
    First, create a BackService class that extends Service:

    package com.example.sockettest;  
    
    import java.io.IOException;  
    import java.io.InputStream;  
    import java.io.OutputStream;  
    import java.lang.ref.WeakReference;  
    import java.net.Socket;  
    import java.net.UnknownHostException;  
    import java.util.Arrays;  
    
    import android.annotation.SuppressLint;  
    import android.app.Service;  
    import android.content.Intent;  
    import android.os.Handler;  
    import android.os.IBinder;  
    import android.os.RemoteException;  
    import android.util.Log;  
    
    public class BackService extends Service {  
        private static final String TAG = "BackService";  
        /** Heartbeat interval, in milliseconds */
        private static final long HEART_BEAT_RATE = 3 * 1000;
        /** Server IP address */
        private static final String HOST = "192.168.1.104";
        /** Server port */
        public static final int PORT = 9800;
        /** Broadcast action for ordinary messages */
        public static final String MESSAGE_ACTION = "org.feng.message_ACTION";
        /** Broadcast action for heartbeat replies */
        public static final String HEART_BEAT_ACTION = "org.feng.heart_beat_ACTION";

        private long sendTime = 0L;

        /** Weak reference: refers to the socket without preventing it from being garbage collected */
        private WeakReference<Socket> mSocket;
    
        private ReadThread mReadThread;  
    
        private IBackService.Stub iBackService = new IBackService.Stub() {  
            @Override  
            public boolean sendMessage(String message) throws RemoteException {  
                return sendMsg(message);  
            }  
        };  
    
        @Override  
        public IBinder onBind(Intent arg0) {  
            return (IBinder) iBackService;  
        }  
    
        @Override  
        public void onCreate() {  
            super.onCreate();  
            new InitSocketThread().start();  
        }  
    
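        // Send heartbeat packets.
        // Note: mHandler is created on the main thread, so this Runnable runs there too.
        // For apps targeting Android 3.0 or later, a socket write on the main thread may throw
        // NetworkOnMainThreadException, so real code should use a Handler backed by a HandlerThread.
        private Handler mHandler = new Handler();
        private Runnable heartBeatRunnable = new Runnable() {
            @Override
            public void run() {
                if (System.currentTimeMillis() - sendTime >= HEART_BEAT_RATE) {
                    boolean isSuccess = sendMsg("");// sends just "\r\n"; if that fails, set up a new socket
                    if (!isSuccess) {
                        mHandler.removeCallbacks(heartBeatRunnable);
                        // mReadThread is still null if the first connection attempt never succeeded
                        if (mReadThread != null) {
                            mReadThread.release();
                        }
                        releaseLastSocket(mSocket);
                        new InitSocketThread().start();
                    }
                }
                mHandler.postDelayed(this, HEART_BEAT_RATE);
            }
        };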
    
        public boolean sendMsg(String msg) {  
            if (null == mSocket || null == mSocket.get()) {  
                return false;  
            }  
            Socket soc = mSocket.get();  
            try {  
                if (!soc.isClosed() && !soc.isOutputShutdown()) {  
                    OutputStream os = soc.getOutputStream();  
                    String message = msg + "\r\n";  
                    os.write(message.getBytes());  
                    os.flush();  
                    sendTime = System.currentTimeMillis();// record every successful send, so a heartbeat is skipped when data went out recently
                    Log.i(TAG, "Sent successfully at: " + sendTime);
                } else {  
                    return false;  
                }  
            } catch (IOException e) {  
                e.printStackTrace();  
                return false;  
            }  
            return true;  
        }  
    
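        // Initialize the socket
        private void initSocket() throws UnknownHostException, IOException {
            Socket socket = new Socket(HOST, PORT);
            mSocket = new WeakReference<Socket>(socket);
            mReadThread = new ReadThread(socket);
            mReadThread.start();
            // Drop any heartbeat still queued from a previous connection so callbacks do not pile up
            mHandler.removeCallbacks(heartBeatRunnable);
            mHandler.postDelayed(heartBeatRunnable, HEART_BEAT_RATE);// connected: start sending heartbeats
        }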
    
        // Release the socket
        private void releaseLastSocket(WeakReference<Socket> mSocket) {
            try {
                if (null != mSocket) {
                    Socket sk = mSocket.get();
                    // get() returns null once the socket has been garbage collected
                    if (null != sk && !sk.isClosed()) {
                        sk.close();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        class InitSocketThread extends Thread {  
            @Override  
            public void run() {  
                super.run();  
                try {  
                    initSocket();  
                } catch (UnknownHostException e) {  
                    e.printStackTrace();  
                } catch (IOException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
    
        public class ReadThread extends Thread {  
            private WeakReference<Socket> mWeakSocket;  
            private boolean isStart = true;  
    
            public ReadThread(Socket socket) {  
                mWeakSocket = new WeakReference<Socket>(socket);  
            }  
    
            public void release() {  
                isStart = false;  
                releaseLastSocket(mWeakSocket);  
            }  
    
            @SuppressLint("NewApi")  
            @Override  
            public void run() {  
                super.run();  
                Socket socket = mWeakSocket.get();  
                if (null != socket) {  
                    try {  
                        InputStream is = socket.getInputStream();  
                        byte[] buffer = new byte[1024 * 4];  
                        int length = 0;  
                        while (!socket.isClosed() && !socket.isInputShutdown()  
                                && isStart && ((length = is.read(buffer)) != -1)) {  
                            if (length > 0) {  
                                String message = new String(Arrays.copyOf(buffer,  
                                        length)).trim();   
                                Log.i(TAG, "Message received from server: " + message);
                                // Forward whatever the server sends as a broadcast
                                if (message.equals("ok")) {// heartbeat reply
                                    Intent intent = new Intent(HEART_BEAT_ACTION);
                                    sendBroadcast(intent);
                                } else {
                                    // any other message
                                    Intent intent = new Intent(MESSAGE_ACTION);  
                                    intent.putExtra("message", message);  
                                    sendBroadcast(intent);  
                                }  
                            }   
                        }  
                    } catch (IOException e) {  
                        e.printStackTrace();  
                    }  
                }  
            }  
        }  
    }  
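The idle check inside heartBeatRunnable can be separated from Android and exercised on its own. A minimal sketch, assuming a hypothetical HeartbeatPolicy class (not part of the original demo) that receives the current time as a parameter, so neither a real clock nor a Handler is needed:

```java
public class HeartbeatPolicy {
    private final long intervalMillis;
    private long lastSendMillis;

    public HeartbeatPolicy(long intervalMillis, long nowMillis) {
        this.intervalMillis = intervalMillis;
        this.lastSendMillis = nowMillis;
    }

    /** Call after every successful write, whether heartbeat or ordinary message. */
    public synchronized void onSent(long nowMillis) {
        lastSendMillis = nowMillis;
    }

    /** True when nothing has been sent for at least one full interval. */
    public synchronized boolean heartbeatDue(long nowMillis) {
        return nowMillis - lastSendMillis >= intervalMillis;
    }

    public static void main(String[] args) {
        HeartbeatPolicy policy = new HeartbeatPolicy(3000, 0);
        System.out.println(policy.heartbeatDue(2999)); // false: still inside the interval
        System.out.println(policy.heartbeatDue(3000)); // true: idle for a full interval
        policy.onSent(3000);                           // an ordinary message counts as activity
        System.out.println(policy.heartbeatDue(5000)); // false: only 2000 ms since the last send
    }
}
```

This mirrors the role of sendTime in BackService: any successful send resets the timer, so heartbeats are only emitted on an otherwise idle connection.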

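    The key parts of the code are commented, so it should be easy to follow. The class refers to an IBackService type, which comes from a new file named IBackService.aidl. Yes, that's right: you create an IBackService.aidl file. For background on AIDL, please see the relevant documentation. The code is as follows: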

    package com.example.sockettest;  
    interface IBackService{  
        boolean sendMessage(String message);  
    }  
    

    Next is MainActivity. It is an ordinary activity that receives the broadcasts and updates the UI, so it needs little explanation:

    package com.example.sockettest;  
    
    import android.app.Activity;  
    import android.content.BroadcastReceiver;  
    import android.content.ComponentName;  
    import android.content.Context;  
    import android.content.Intent;  
    import android.content.IntentFilter;  
    import android.content.ServiceConnection;  
    import android.os.Bundle;  
    import android.os.IBinder;  
    import android.os.RemoteException;  
    import android.util.Log;  
    import android.view.View;  
    import android.view.View.OnClickListener;  
    import android.widget.Button;  
    import android.widget.EditText;  
    import android.widget.TextView;  
    import android.widget.Toast;  
    
    public class MainActivity extends Activity {  
        private static final String TAG = "MainActivity";  
    
        private Intent mServiceIntent;  
        private IBackService iBackService;  
        private TextView tv;  
        private EditText et;  
        private Button btn;  
    
        @Override  
        protected void onCreate(Bundle savedInstanceState) {  
            super.onCreate(savedInstanceState);  
            setContentView(R.layout.activity_main);  
    
            initViews();  
            initData();  
        }  
    
        private void initViews() {  
            tv = (TextView) findViewById(R.id.tv);  
            et = (EditText) findViewById(R.id.editText1);  
            btn = (Button) findViewById(R.id.button1);  
        }  
    
        private void initData() {  
            mServiceIntent = new Intent(this, BackService.class);  
            btn.setOnClickListener(new OnClickListener() {  
                @Override  
                public void onClick(View v) {  
                    String string = et.getText().toString().trim();  
                    Log.i(TAG, string);  
                    try {  
                        Log.i(TAG, "iBackService: " + iBackService);
                        if (iBackService == null) {
                            Toast.makeText(getApplicationContext(),
                                    "Not connected; the server may have disconnected", Toast.LENGTH_SHORT).show();
                        } else {  
                            boolean isSend = iBackService.sendMessage(string);  
                            Toast.makeText(MainActivity.this,  
                                    isSend ? "success" : "fail", Toast.LENGTH_SHORT)  
                                    .show();  
                            et.setText("");  
                        }  
                    } catch (RemoteException e) {  
                        e.printStackTrace();  
                    }  
                }  
            });  
        }  
    
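        @Override
        protected void onStart() {
            super.onStart();
            // Bind to the service (it is created automatically if not yet running)
            bindService(mServiceIntent, conn, BIND_AUTO_CREATE);
            // Register the broadcast receiver
            registerReceiver();
        }

        @Override
        protected void onStop() {
            super.onStop();
            // Undo onStart() in the matching callback: unregistering in onPause() while
            // registering in onStart() breaks after a pause/resume cycle, because the second
            // onPause() would unregister a receiver that is no longer registered
            unregisterReceiver(mReceiver);
            // Unbind from the service
            unbindService(conn);
        }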
    
        // Register the broadcast receiver
        private void registerReceiver() {  
            IntentFilter intentFilter = new IntentFilter();  
            intentFilter.addAction(BackService.HEART_BEAT_ACTION);  
            intentFilter.addAction(BackService.MESSAGE_ACTION);  
            registerReceiver(mReceiver, intentFilter);  
        }  
    
        private BroadcastReceiver mReceiver = new BroadcastReceiver() {  
            @Override  
            public void onReceive(Context context, Intent intent) {  
                String action = intent.getAction();  
                // message broadcast
                if (action.equals(BackService.MESSAGE_ACTION)) {
                    String stringExtra = intent.getStringExtra("message");
                    tv.setText(stringExtra);
                } else if (action.equals(BackService.HEART_BEAT_ACTION)) {// heartbeat broadcast
                    tv.setText("Heartbeat OK");
                }  
            }  
        };  
    
        private ServiceConnection conn = new ServiceConnection() {  
            @Override  
            public void onServiceDisconnected(ComponentName name) {  
                // Disconnected: clear the reference
                iBackService = null;
            }

            @Override
            public void onServiceConnected(ComponentName name, IBinder service) {
                // Connected
                iBackService = IBackService.Stub.asInterface(service);  
            }  
        };  
    }  
  • Netty Data Communication and Heartbeat Detection

    1,000+ views · 2017-01-15 11:03:15

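    I. Data communication

    The question is how two machines should communicate over Netty. Broadly, there are three approaches:

    1. Keep a long-lived connection open, so that the channel between client and server stays up the whole time. This is suitable when the number of clients is small.

    2. Submit data in batches over short-lived connections. Data is first stored in a local temporary directory or a temporary table, then submitted in one batch once a threshold is reached, or on a schedule by a polling job. The drawback is that the data is not real-time.

    3. Use a special kind of long-lived connection: if client and server have not communicated for a certain period, the connection is closed, and it is re-established the next time the client sends a request to the server. This mode has two cases to consider:

       3.1 The channel is closed on timeout (Netty already provides an API for this). How is the connection re-established afterwards? Start a thread that obtains the channel again through ChannelFuture cf = c.getChannelFuture();, which reconnects to the server when the channel is no longer active.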
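The reconnect-on-demand idea in case 3.1 can be sketched without Netty. The following is a minimal illustration, assuming a hypothetical LazyReconnect wrapper and a FakeChannel stand-in (neither exists in the post's code): callers always ask the wrapper for the connection, and the wrapper opens a new one only when the current one is missing or no longer active, which is the same check getChannelFuture() performs.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

public class LazyReconnect<C> {
    private final Supplier<C> connector;   // opens a new connection
    private final Predicate<C> isActive;   // reports whether a connection is still usable
    private C current;

    public LazyReconnect(Supplier<C> connector, Predicate<C> isActive) {
        this.connector = connector;
        this.isActive = isActive;
    }

    /** Returns the live connection, reconnecting first if it was closed (for example by an idle timeout). */
    public synchronized C get() {
        if (current == null || !isActive.test(current)) {
            current = connector.get();
        }
        return current;
    }

    /** Stand-in for a channel that a read timeout can close. */
    static final class FakeChannel {
        final int id;
        volatile boolean active = true;

        FakeChannel(int id) {
            this.id = id;
        }
    }

    public static void main(String[] args) {
        int[] connects = {0};
        LazyReconnect<FakeChannel> lazy = new LazyReconnect<FakeChannel>(
                () -> new FakeChannel(++connects[0]), ch -> ch.active);

        FakeChannel first = lazy.get();
        System.out.println(lazy.get() == first); // true: the open channel is reused
        first.active = false;                    // simulate the idle timeout closing the channel
        System.out.println(lazy.get().id);       // 2: a second connection was opened on demand
    }
}
```

The design choice here is that no background thread keeps the connection alive; the cost of reconnecting is paid by the first request after an idle period.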

    The example uses 7 classes: Client, ClientHandler, Server, ServerHandler, Request (the entity the client sends), Response (the entity the server returns to the client), and MarshallingCodeCFactory (the Marshalling factory).

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    
    import java.util.concurrent.TimeUnit;
    
    
    /**
     * Best Do It
     */
    public class Client {
    	
    	private static class SingletonHolder {
    		static final Client instance = new Client();
    	}
    	
    	public static Client getInstance(){
    		return SingletonHolder.instance;
    	}
    	
    	private EventLoopGroup group;
    	private Bootstrap b;
    	private ChannelFuture cf ;
    	
    	private Client(){
    			group = new NioEventLoopGroup();
    			b = new Bootstrap();
    			b.group(group)
    			 .channel(NioSocketChannel.class)
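    			 // Bootstrap.handler() keeps only the last handler set, so the LoggingHandler
    			 // is added to the pipeline instead of being passed to a second handler() call
    			 .handler(new ChannelInitializer<SocketChannel>() {
    					@Override
    					protected void initChannel(SocketChannel sc) throws Exception {
    						sc.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
    						sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
    						sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
    						// Timeout handler: when nothing has been read on the channel for longer than the given time, the channel is closed, mainly to reduce resource usage on the server
    						sc.pipeline().addLast(new ReadTimeoutHandler(5)); // timeout in seconds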
    						sc.pipeline().addLast(new ClientHandler());
    					}
    		    });
    	}
    	
    	public void connect(){
    		try {
    			this.cf = b.connect("127.0.0.1", 8765).sync();
    			System.out.println("Connected to the remote server; data can now be exchanged..");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    	public ChannelFuture getChannelFuture(){
    		
    		if(this.cf == null){
    			this.connect();
    		}
    		if(!this.cf.channel().isActive()){
    			this.connect();
    		}
    		
    		return this.cf;
    	}
    	
    	public static void main(String[] args) throws Exception{
    		final Client c = Client.getInstance();
    		//c.connect();
    		
    		ChannelFuture cf = c.getChannelFuture();
    		for(int i = 1; i <= 3; i++ ){
    			Request request = new Request();
    			request.setId("" + i);
    			request.setName("pro" + i);
    			request.setRequestMessage("data " + i);
    			cf.channel().writeAndFlush(request);
    			TimeUnit.SECONDS.sleep(4);
    		}
    
    		cf.channel().closeFuture().sync();
    		
    		
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				try {
    					System.out.println("Entering child thread...");
    					ChannelFuture cf = c.getChannelFuture();
    					System.out.println(cf.channel().isActive());
    					System.out.println(cf.channel().isOpen());
    					
    					// send data again
    					Request request = new Request();
    					request.setId("" + 4);
    					request.setName("pro" + 4);
    					request.setRequestMessage("data " + 4);
    					cf.channel().writeAndFlush(request);
    					cf.channel().closeFuture().sync();
    					System.out.println("Child thread finished.");
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    		}).start();
    		
    		System.out.println("Disconnected, main thread finished..");
    		
    	}
    	
    	
    	
    }
    


    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
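    // Written against the Netty 5.0 alpha API; on Netty 4.x, extend ChannelInboundHandlerAdapter instead
    public class ClientHandler extends ChannelHandlerAdapter{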
    	
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
    	}
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try {
    			Response resp = (Response)msg;
    			System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());			
    		} finally {
    			ReferenceCountUtil.release(msg);
    		}
    	}
    
    	@Override
    	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    		
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		ctx.close();
    	}
    	
    }
    

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    
    public class Server {
    
    	public static void main(String[] args) throws Exception{
    		
    		EventLoopGroup pGroup = new NioEventLoopGroup();
    		EventLoopGroup cGroup = new NioEventLoopGroup();
    		
    		ServerBootstrap b = new ServerBootstrap();
    		b.group(pGroup, cGroup)
    		 .channel(NioServerSocketChannel.class)
    		 .option(ChannelOption.SO_BACKLOG, 1024)
    		 // logging for the server channel
    		 .handler(new LoggingHandler(LogLevel.INFO))
    		 .childHandler(new ChannelInitializer<SocketChannel>() {
    			protected void initChannel(SocketChannel sc) throws Exception {
    				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
    				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
    				sc.pipeline().addLast(new ReadTimeoutHandler(5)); 
    				sc.pipeline().addLast(new ServerHandler());
    			}
    		});
    		
    		ChannelFuture cf = b.bind(8765).sync();
    		
    		cf.channel().closeFuture().sync();
    		pGroup.shutdownGracefully();
    		cGroup.shutdownGracefully();
    		
    	}
    }
    


    package bhz.netty.runtime;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class ServerHandler extends ChannelHandlerAdapter{
    
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
    	}
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		Request request = (Request)msg;
    		System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
    		Response response = new Response();
    		response.setId(request.getId());
    		response.setName("response" + request.getId());
    		response.setResponseMessage("response content " + request.getId());
    		ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE);
    	}
    
    	@Override
    	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    		
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		ctx.close();
    	}
    
    	
    	
    }
    



    package bhz.netty.runtime;
    
    import java.io.Serializable;
    
    public class Request implements Serializable{
    
    	// Must be spelled exactly serialVersionUID to be recognized by Java serialization
    	private static final long serialVersionUID = 1L;
    	
    	private String id ;
    	private String name ;
    	private String requestMessage ;
    	
    	public String getId() {
    		return id;
    	}
    	public void setId(String id) {
    		this.id = id;
    	}
    	public String getName() {
    		return name;
    	}
    	public void setName(String name) {
    		this.name = name;
    	}
    	public String getRequestMessage() {
    		return requestMessage;
    	}
    	public void setRequestMessage(String requestMessage) {
    		this.requestMessage = requestMessage;
    	}
    
    
    }
    

    package bhz.netty.runtime;
    
    import java.io.Serializable;
    
    public class Response implements Serializable{
    	
    	private static final long serialVersionUID = 1L;
    	
    	private String id;
    	private String name;
    	private String responseMessage;
    	
    	public String getId() {
    		return id;
    	}
    	public void setId(String id) {
    		this.id = id;
    	}
    	public String getName() {
    		return name;
    	}
    	public void setName(String name) {
    		this.name = name;
    	}
    	public String getResponseMessage() {
    		return responseMessage;
    	}
    	public void setResponseMessage(String responseMessage) {
    		this.responseMessage = responseMessage;
    	}
    	
    
    }
    

    import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
    import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    import io.netty.handler.codec.marshalling.UnmarshallerProvider;
    
    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;
    
     
    public final class MarshallingCodeCFactory {
    
        /**
         * Creates the JBoss Marshalling decoder (MarshallingDecoder)
         * @return MarshallingDecoder
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
        	// Obtain a MarshallerFactory through the static method of the Marshalling utility class; "serial" selects the Java-serialization factory.
    		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
    		// Create a MarshallingConfiguration and set its version to 5
    		final MarshallingConfiguration configuration = new MarshallingConfiguration();
    		configuration.setVersion(5);
    		// Build the provider from marshallerFactory and configuration
    		UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
    		// Build Netty's MarshallingDecoder; the two arguments are the provider and the maximum size of a single serialized message
    		MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
    		return decoder;
        }
    
        /**
         * Creates the JBoss Marshalling encoder (MarshallingEncoder)
         * @return MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
    		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
    		final MarshallingConfiguration configuration = new MarshallingConfiguration();
    		configuration.setVersion(5);
    		MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
    		// Build Netty's MarshallingEncoder, which serializes POJOs that implement Serializable into a byte array
    		MarshallingEncoder encoder = new MarshallingEncoder(provider);
    		return encoder;
        }
    }
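What the encoder and decoder do to an entity can be approximated with the JDK alone. This is a minimal sketch that uses java.io object streams as a stand-in for JBoss Marshalling; it is not how Netty's handlers are implemented (they also frame each message with a length prefix, which is omitted here). The Message class is a hypothetical substitute for Request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerialRoundTrip {

    /** Hypothetical entity; like Request, it must implement Serializable. */
    static class Message implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String name;

        Message(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Encoder side: object to bytes. */
    static byte[] encode(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Decoder side: bytes back to an object. */
    static Object decode(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] wire = encode(new Message("1", "pro1"));
        Message copy = (Message) decode(wire);
        System.out.println(copy.id + ", " + copy.name + " (" + wire.length + " bytes)");
    }
}
```

The byte count printed by main is worth a glance: the decoder above is built with a 1024-byte maximum, so entities that serialize to more than that would need a larger limit.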
    

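       3.2 If a client goes down, nothing special is needed: once the client restarts it can connect to the server again. But if the server goes down, how does the client reconnect to it?

       If the server goes down, a Spring application can run a scheduled job that checks at intervals whether the server is healthy. Alternatively, package the client as a jar and write a .bat script (or a shell script on Linux) that makes the client connect to the server at regular intervals.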
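A scheduled health check of this kind needs neither Spring nor a script. A minimal sketch using only the JDK, assuming a hypothetical ServerHealthProbe class: it tries a plain TCP connect to the server's address (127.0.0.1:8765, as in the example code) once a second and reports the result. A real job would trigger a reconnect instead of printing.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ServerHealthProbe {

    /** True if a TCP connection to host:port can be opened within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Probe immediately, then again one second after each probe finishes
        scheduler.scheduleWithFixedDelay(
                () -> System.out.println("server up: " + isReachable("127.0.0.1", 8765, 500)),
                0, 1, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(3); // let a few probes run, then stop the demo
        scheduler.shutdownNow();
    }
}
```

A successful connect only shows that something is listening on the port; an application-level heartbeat, as in the next section, is needed to know that the server is actually processing messages.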


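    II. Heartbeat detection

    When maintaining a server cluster, there is typically one master server and N slave servers. The master needs to know the health of every slave at all times and monitor them in real time. In a distributed architecture this is called heartbeat detection, or heartbeat monitoring, and Netty can be used to implement it.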
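On the master side, the bookkeeping behind heartbeat monitoring is small. A minimal sketch, assuming a hypothetical HeartbeatRegistry class that is not part of the example code: each heartbeat records the time a slave was last seen, and a slave counts as dead when it has been silent for longer than a timeout. The 6000 ms timeout in main (three missed 2-second heartbeats) is an illustrative choice.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HeartbeatRegistry {
    private final long timeoutMillis;
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    public HeartbeatRegistry(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called whenever a heartbeat arrives from a slave, identified here by its IP. */
    public void onHeartbeat(String nodeIp, long nowMillis) {
        lastSeen.put(nodeIp, nowMillis);
    }

    /** Slaves that have been silent for longer than the timeout, sorted for stable output. */
    public List<String> deadNodes(long nowMillis) {
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                dead.add(entry.getKey());
            }
        }
        Collections.sort(dead);
        return dead;
    }

    public static void main(String[] args) {
        HeartbeatRegistry registry = new HeartbeatRegistry(6000);
        registry.onHeartbeat("10.0.0.1", 0);
        registry.onHeartbeat("10.0.0.2", 0);
        registry.onHeartbeat("10.0.0.1", 5000);        // only the first slave keeps reporting
        System.out.println(registry.deadNodes(7000));  // [10.0.0.2]
    }
}
```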

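    There are 6 classes in total: Client, ClienHeartBeattHandler (the client-side heartbeat handler), Server, ServerHeartBeatHandler (the server-side handler), MarshallingCodeCFactory (the codec factory), and RequestInfo (the entity the client sends to the server). The example also needs sigar.jar to read system information, and uses newScheduledThreadPool to run the heartbeat task on a schedule.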

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class Client {
    
    	
    	public static void main(String[] args) throws Exception{
    		
    		EventLoopGroup group = new NioEventLoopGroup();
    		Bootstrap b = new Bootstrap();
    		b.group(group)
    		 .channel(NioSocketChannel.class)
    		 .handler(new ChannelInitializer<SocketChannel>() {
    			@Override
    			protected void initChannel(SocketChannel sc) throws Exception {
    				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
    				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
    				sc.pipeline().addLast(new ClienHeartBeattHandler());
    			}
    		});
    		
    		ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
    
    		cf.channel().closeFuture().sync();
    		group.shutdownGracefully();
    	}
    }
    


    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    
    import org.hyperic.sigar.CpuPerc;
    import org.hyperic.sigar.Mem;
    import org.hyperic.sigar.Sigar;
    import org.hyperic.sigar.Swap;
    
    
    public class ClienHeartBeattHandler extends ChannelHandlerAdapter {
    
        private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        
        private ScheduledFuture<?> heartBeat;
    	// local address, sent to the server as part of the authentication message
        private InetAddress addr ;
        
        private static final String SUCCESS_KEY = "auth_success_key";
    
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		addr = InetAddress.getLocalHost();
            String ip = addr.getHostAddress();
    		String key = "1234";
    		// credential: local IP plus a key
    		String auth = ip + "," + key;
    		ctx.writeAndFlush(auth);
    	}
    	
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        	try {
            	if(msg instanceof String){
            		String ret = (String)msg;
            		if(SUCCESS_KEY.equals(ret)){
            	    	// Handshake succeeded: start sending heartbeat messages
            	    	this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS);
            		    System.out.println(msg);    			
            		}
            		else {
            			System.out.println(msg);
            		}
            	}
    		} finally {
    			ReferenceCountUtil.release(msg);
    		}
        }
    
        private class HeartBeatTask implements Runnable {
        	private final ChannelHandlerContext ctx;
    
    		public HeartBeatTask(final ChannelHandlerContext ctx) {
    		    this.ctx = ctx;
    		}
    	
    		@Override
    		public void run() {
    			try {
    			    RequestInfo info = new RequestInfo();
    			    //ip
    			    info.setIp(addr.getHostAddress());
    		        Sigar sigar = new Sigar();
    		        // CPU percentages
    		        CpuPerc cpuPerc = sigar.getCpuPerc();
    		        HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
    		        cpuPercMap.put("combined", cpuPerc.getCombined());
    		        cpuPercMap.put("user", cpuPerc.getUser());
    		        cpuPercMap.put("sys", cpuPerc.getSys());
    		        cpuPercMap.put("wait", cpuPerc.getWait());
    		        cpuPercMap.put("idle", cpuPerc.getIdle());
    		        // memory
    		        Mem mem = sigar.getMem();
    				HashMap<String, Object> memoryMap = new HashMap<String, Object>();
    				memoryMap.put("total", mem.getTotal() / 1024L);
    				memoryMap.put("used", mem.getUsed() / 1024L);
    				memoryMap.put("free", mem.getFree() / 1024L);
    				info.setCpuPercMap(cpuPercMap);
    			    info.setMemoryMap(memoryMap);
    			    ctx.writeAndFlush(info);
    			    
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    		}
    
    	    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    	    	cause.printStackTrace();
    			if (heartBeat != null) {
    			    heartBeat.cancel(true);
    			    heartBeat = null;
    			}
    			ctx.fireExceptionCaught(cause);
    	    }
    	    
    	}
    }
    


    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    public class Server {
    
    	public static void main(String[] args) throws Exception{
    		
    		EventLoopGroup pGroup = new NioEventLoopGroup();
    		EventLoopGroup cGroup = new NioEventLoopGroup();
    		
    		ServerBootstrap b = new ServerBootstrap();
    		b.group(pGroup, cGroup)
    		 .channel(NioServerSocketChannel.class)
    		 .option(ChannelOption.SO_BACKLOG, 1024)
             // Enable logging
    		 .handler(new LoggingHandler(LogLevel.INFO))
    		 .childHandler(new ChannelInitializer<SocketChannel>() {
    			protected void initChannel(SocketChannel sc) throws Exception {
    				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
    				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
    				sc.pipeline().addLast(new ServerHeartBeatHandler());
    			}
    		});
    		
    		ChannelFuture cf = b.bind(8765).sync();
    		
    		cf.channel().closeFuture().sync();
    		pGroup.shutdownGracefully();
    		cGroup.shutdownGracefully();
    		
    	}
    }
    


    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.HashMap;
    
    public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
        
    	/** key:ip value:auth */
    	private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
    	private static final String SUCCESS_KEY = "auth_success_key";
    	
    	static {
    		AUTH_IP_MAP.put("192.168.1.200", "1234");
    	}
    	
    	private boolean auth(ChannelHandlerContext ctx, Object msg){
    			//System.out.println(msg);
                // Expect exactly "ip,key"; anything else is rejected instead of throwing
                String[] ret = ((String) msg).split(",");
                String auth = ret.length == 2 ? AUTH_IP_MAP.get(ret[0]) : null;
                if(auth != null && auth.equals(ret[1])){
    				ctx.writeAndFlush(SUCCESS_KEY);
    				return true;
    			} else {
    				ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
    				return false;
    			}
    	}
    	
    	@Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		if(msg instanceof String){
    			auth(ctx, msg);
    		} else if (msg instanceof RequestInfo) {
    			
    			RequestInfo info = (RequestInfo) msg;
    			System.out.println("--------------------------------------------");
    			System.out.println("当前主机ip为: " + info.getIp());
    			System.out.println("当前主机cpu情况: ");
    			HashMap<String, Object> cpu = info.getCpuPercMap();
    			System.out.println("总使用率: " + cpu.get("combined"));
    			System.out.println("用户使用率: " + cpu.get("user"));
    			System.out.println("系统使用率: " + cpu.get("sys"));
    			System.out.println("等待率: " + cpu.get("wait"));
    			System.out.println("空闲率: " + cpu.get("idle"));
    			
    			System.out.println("当前主机memory情况: ");
    			HashMap<String, Object> memory = info.getMemoryMap();
    			System.out.println("内存总量: " + memory.get("total"));
    			System.out.println("当前内存使用量: " + memory.get("used"));
    			System.out.println("当前内存剩余量: " + memory.get("free"));
    			System.out.println("--------------------------------------------");
    			
    			ctx.writeAndFlush("info received!");
    		} else {
    			ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
    		}
        }
    
    
    }
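
The `auth` method above treats the first message as a string of the form `ip,key` and looks the key up by IP. As a minimal standalone sketch of that check (plain Java, no Netty), the class below validates the message shape before comparing keys. The names `AuthCheck` and `AuthCheckDemo` are hypothetical and not part of the original code; the sample credentials are the ones from the handler's static block.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical helper: validates "ip,key" handshake messages against a credential table. */
final class AuthCheck {
    private final Map<String, String> keysByIp;

    AuthCheck(Map<String, String> keysByIp) {
        this.keysByIp = keysByIp;
    }

    /** Accepts only messages of the exact form "ip,key" whose key matches the table. */
    boolean accepts(String message) {
        if (message == null) {
            return false;
        }
        // limit -1 keeps trailing empty fields, so "ip," is seen as two parts
        String[] parts = message.split(",", -1);
        if (parts.length != 2) {
            return false;
        }
        String expected = keysByIp.get(parts[0]);
        return expected != null && expected.equals(parts[1]);
    }
}

final class AuthCheckDemo {
    public static void main(String[] args) {
        AuthCheck check = new AuthCheck(Collections.singletonMap("192.168.1.200", "1234"));
        System.out.println(check.accepts("192.168.1.200,1234")); // matching key
        System.out.println(check.accepts("192.168.1.200"));      // key missing
        System.out.println(check.accepts("192.168.1.200,9999")); // wrong key
    }
}
```

Rejecting malformed input up front means a message without a comma is answered with an authentication failure rather than an `ArrayIndexOutOfBoundsException`.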
    

    import java.io.Serializable;
    import java.util.HashMap;
    
    public class RequestInfo implements Serializable {
    
    	private String ip ;
    	private HashMap<String, Object> cpuPercMap ;
    	private HashMap<String, Object> memoryMap;
    	//.. other field
    	
    	public String getIp() {
    		return ip;
    	}
    	public void setIp(String ip) {
    		this.ip = ip;
    	}
    	public HashMap<String, Object> getCpuPercMap() {
    		return cpuPercMap;
    	}
    	public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
    		this.cpuPercMap = cpuPercMap;
    	}
    	public HashMap<String, Object> getMemoryMap() {
    		return memoryMap;
    	}
    	public void setMemoryMap(HashMap<String, Object> memoryMap) {
    		this.memoryMap = memoryMap;
    	}
    	
    	
    }
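
`RequestInfo` implements `Serializable` because the marshalling codec is configured with the `serial` (Java serialization) factory. The sketch below shows the same requirement with the JDK's own object streams rather than JBoss Marshalling, so it only approximates what the codec does. `HeartbeatInfo` and `SerializationDemo` are hypothetical names, and the explicit `serialVersionUID` is an addition the original `RequestInfo` does not declare.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

/** Hypothetical stand-in for RequestInfo with a fixed serialVersionUID. */
final class HeartbeatInfo implements Serializable {
    private static final long serialVersionUID = 1L;

    final String ip;
    final HashMap<String, Object> memoryMap;

    HeartbeatInfo(String ip, HashMap<String, Object> memoryMap) {
        this.ip = ip;
        this.memoryMap = memoryMap;
    }
}

final class SerializationDemo {
    /** Serializes the value to bytes and reads it back, as a codec pair would do across the wire. */
    static Object roundTrip(Serializable value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, Object> memory = new HashMap<>();
        memory.put("total", 12525296L);
        HeartbeatInfo copy = (HeartbeatInfo) roundTrip(new HeartbeatInfo("192.168.1.100", memory));
        System.out.println(copy.ip + " total=" + copy.memoryMap.get("total"));
    }
}
```

Every value stored in the maps must itself be serializable, which holds for the boxed numbers used here.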
    

    Server output:

    当前主机ip为: 192.168.1.100
    当前主机cpu情况: 
    总使用率: 0.21943887775551102
    用户使用率: 0.08667334669338678
    系统使用率: 0.13276553106212424
    等待率: 0.0
    空闲率: 0.7805611222444889
    当前主机memory情况: 
    内存总量: 12525296
    当前内存使用量: 5393304
    当前内存剩余量: 7131992






  • Android_socket communication with a long-lived connection (with heartbeat detection); everything is commented and the code is concise

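    Problem 1: once the client and server are connected, we want the server to close the connection to the client if no data is transferred for 5 seconds.

    Solution: add the following line on the server side

    ch.pipeline().addLast(new ReadTimeoutHandler(5)); // Netty's built-in read-timeout handler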
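
To make the idea behind a read timeout concrete, here is a minimal standalone sketch in plain Java. It is not Netty's implementation: it only models the rule "no read for N seconds means timed out" with an injected clock value so the behaviour is easy to follow. `ReadIdleWatchdog` and `ReadIdleDemo` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical model of a read timeout: remembers the last read and reports idleness. */
final class ReadIdleWatchdog {
    private final long timeoutNanos;
    private long lastReadNanos;

    ReadIdleWatchdog(long timeout, TimeUnit unit, long startNanos) {
        this.timeoutNanos = unit.toNanos(timeout);
        this.lastReadNanos = startNanos;
    }

    /** Call whenever data arrives; this resets the idle timer. */
    void onRead(long nowNanos) {
        lastReadNanos = nowNanos;
    }

    /** True once nothing has been read for at least the configured timeout. */
    boolean isTimedOut(long nowNanos) {
        return nowNanos - lastReadNanos >= timeoutNanos;
    }
}

final class ReadIdleDemo {
    public static void main(String[] args) {
        long second = TimeUnit.SECONDS.toNanos(1);
        ReadIdleWatchdog watchdog = new ReadIdleWatchdog(5, TimeUnit.SECONDS, 0);
        System.out.println(watchdog.isTimedOut(4 * second)); // false: only 4 s idle
        watchdog.onRead(4 * second);                         // data arrives at t = 4 s
        System.out.println(watchdog.isTimedOut(8 * second)); // false: 4 s since last read
        System.out.println(watchdog.isTimedOut(9 * second)); // true: 5 s since last read
    }
}
```

The timer is reset only by reads, so a side that keeps writing but never receives anything still times out; in the pipeline above, closing the channel in that situation is what `ReadTimeoutHandler(5)` is used for.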

     

    Reposted from: https://www.cnblogs.com/coder-lzh/p/9471410.html


    Data communication

    Overview:

    Netty ReadTimeout implementation, approach 3

    Server:

    public class Server {
    
        public static void main(String[] args) throws Exception{
            
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             // Enable logging
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ReadTimeoutHandler(5)); 
                    sc.pipeline().addLast(new ServerHandler());
                }
            });
            
            ChannelFuture cf = b.bind(8765).sync();
            
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
            
        }
    }

    The key change is adding sc.pipeline().addLast(new ReadTimeoutHandler(5));

    Client:

    public class Client {
    
        private static class SingletonHolder {
            static final Client instance = new Client();
        }
    
        public static Client getInstance() {
            return SingletonHolder.instance;
        }
    
        private EventLoopGroup group;
        private Bootstrap b;
        private ChannelFuture cf;
    
        private Client() {
            group = new NioEventLoopGroup();
            b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            // Bootstrap.handler() keeps only the handler set last, so the logger is added to the pipeline instead
                            sc.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            // Read-timeout handler: if nothing is read from the server for more than 5 seconds, the channel is closed (mainly to reduce resource usage on the server)
                            sc.pipeline().addLast(new ReadTimeoutHandler(5));
                            sc.pipeline().addLast(new ClientHandler());
                        }
                    });
        }
    
        public void connect() {
            try {
                this.cf = b.connect("127.0.0.1", 8765).sync();
                System.out.println("远程服务器已经连接, 可以进行数据交换..");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public ChannelFuture getChannelFuture() {
    
            if (this.cf == null) {
                this.connect();
            }
            if (!this.cf.channel().isActive()) {
                this.connect();
            }
    
            return this.cf;
        }
    
        public static void main(String[] args) throws Exception {
            final Client c = Client.getInstance();
            // c.connect();
    
            ChannelFuture cf = c.getChannelFuture();
            for (int i = 1; i <= 3; i++) {
                Request request = new Request();
                request.setId("" + i);
                request.setName("pro" + i);
                request.setRequestMessage("数据信息" + i);
                cf.channel().writeAndFlush(request);
                TimeUnit.SECONDS.sleep(4);
            }
    
            cf.channel().closeFuture().sync();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("进入子线程...");
                        ChannelFuture cf = c.getChannelFuture();
                        System.out.println(cf.channel().isActive());
                        System.out.println(cf.channel().isOpen());
    
                        // Send data again
                        Request request = new Request();
                        request.setId("" + 4);
                        request.setName("pro" + 4);
                        request.setRequestMessage("数据信息" + 4);
                        cf.channel().writeAndFlush(request);
                        cf.channel().closeFuture().sync();
                        System.out.println("子线程结束.");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            System.out.println("断开连接,主线程结束..");
    
        }
    
    }

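    The method to look at is getChannelFuture: the this.cf == null check covers the very first connection, and !this.cf.channel().isActive() covers reconnecting after the connection has been closed by a timeout.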
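
The connect-on-first-use and reconnect-when-inactive logic can be isolated from Netty in a few lines. The sketch below is a standalone illustration under that assumption: `Link`, `ReconnectingClient`, `FakeLink` and `ReconnectDemo` are hypothetical names, and `FakeLink` merely imitates a channel whose `isActive()` turns false after it is closed.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a channel: it only knows whether it is still usable. */
interface Link {
    boolean isActive();
}

/** Hands out a live link, connecting on first use and reconnecting after a close. */
final class ReconnectingClient<T extends Link> {
    private final Supplier<T> connector;
    private T current;

    ReconnectingClient(Supplier<T> connector) {
        this.connector = connector;
    }

    synchronized T get() {
        // A single check covers both cases: never connected, or closed since the last use
        if (current == null || !current.isActive()) {
            current = connector.get();
        }
        return current;
    }
}

/** Test double that can be closed by hand, as a timed-out channel would be. */
final class FakeLink implements Link {
    private boolean active = true;

    @Override
    public boolean isActive() {
        return active;
    }

    void close() {
        active = false;
    }
}

final class ReconnectDemo {
    public static void main(String[] args) {
        int[] connects = {0};
        ReconnectingClient<FakeLink> client = new ReconnectingClient<>(() -> {
            connects[0]++;
            return new FakeLink();
        });
        FakeLink first = client.get(); // first use: connects
        client.get();                  // still active: reused
        first.close();                 // simulate a read timeout closing the link
        client.get();                  // inactive: reconnects
        System.out.println("connect attempts: " + connects[0]);
    }
}
```

Unlike the original `getChannelFuture`, this version cannot hit a null reference between the two checks, because a failed `connector.get()` would surface as an exception instead of leaving the field unset.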

    The rest of the code:

    public class ClientHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                Response resp = (Response) msg;
                System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
    }
    
    
    public class ServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Request request = (Request) msg;
            System.out
                    .println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
            Response response = new Response();
            response.setId(request.getId());
            response.setName("response" + request.getId());
            response.setResponseMessage("响应内容" + request.getId());
            ctx.writeAndFlush(response);// .addListener(ChannelFutureListener.CLOSE);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
    }
    
    
    public final class MarshallingCodeCFactory {
    
        /**
         * Creates the JBoss Marshalling decoder (MarshallingDecoder).
         * @return MarshallingDecoder
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
            // Obtain a MarshallerFactory through the static method of the Marshalling utility class; "serial" selects the Java serialization factory.
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            // Create the MarshallingConfiguration and set its version to 5
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            // Build the provider from marshallerFactory and configuration
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
            // Build Netty's MarshallingDecoder; the two arguments are the provider and the maximum length of a single serialized message
            MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
            return decoder;
        }
    
        /**
         * Creates the JBoss Marshalling encoder (MarshallingEncoder).
         * @return MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
            // Build Netty's MarshallingEncoder, which serializes POJOs implementing Serializable into a byte array
            MarshallingEncoder encoder = new MarshallingEncoder(provider);
            return encoder;
        }
    }
    
    
    public class Request implements Serializable {
    
        private static final long serialVersionUID = 1L;
    
        private String id;
        private String name;
        private String requestMessage;
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getRequestMessage() {
            return requestMessage;
        }
    
        public void setRequestMessage(String requestMessage) {
            this.requestMessage = requestMessage;
        }
    
    }
    
    
    public class Response implements Serializable {
    
        private static final long serialVersionUID = 1L;
    
        private String id;
        private String name;
        private String responseMessage;
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getResponseMessage() {
            return responseMessage;
        }
    
        public void setResponseMessage(String responseMessage) {
            this.responseMessage = responseMessage;
        }
    
    }

    Heartbeat detection

    Overview:

    Code example:

    public class Server {
    
        public static void main(String[] args) throws Exception {
    
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
    
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                    // Enable logging
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            sc.pipeline().addLast(new ServerHeartBeatHandler());
                        }
                    });
    
            ChannelFuture cf = b.bind(8765).sync();
    
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
    
        }
    }
    
    
    public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
    
        /** key:ip value:auth */
        private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
        private static final String SUCCESS_KEY = "auth_success_key";
    
        static {
            AUTH_IP_MAP.put("192.168.1.200", "1234");
        }
    
        private boolean auth(ChannelHandlerContext ctx, Object msg) {
            // System.out.println(msg);
            // Expect exactly "ip,key"; anything else is rejected instead of throwing
            String[] ret = ((String) msg).split(",");
            String auth = ret.length == 2 ? AUTH_IP_MAP.get(ret[0]) : null;
            if (auth != null && auth.equals(ret[1])) {
                ctx.writeAndFlush(SUCCESS_KEY);
                return true;
            } else {
                ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
                return false;
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof String) {
                auth(ctx, msg);
            } else if (msg instanceof RequestInfo) {
    
                RequestInfo info = (RequestInfo) msg;
                System.out.println("--------------------------------------------");
                System.out.println("当前主机ip为: " + info.getIp());
                System.out.println("当前主机cpu情况: ");
                HashMap<String, Object> cpu = info.getCpuPercMap();
                System.out.println("总使用率: " + cpu.get("combined"));
                System.out.println("用户使用率: " + cpu.get("user"));
                System.out.println("系统使用率: " + cpu.get("sys"));
                System.out.println("等待率: " + cpu.get("wait"));
                System.out.println("空闲率: " + cpu.get("idle"));
    
                System.out.println("当前主机memory情况: ");
                HashMap<String, Object> memory = info.getMemoryMap();
                System.out.println("内存总量: " + memory.get("total"));
                System.out.println("当前内存使用量: " + memory.get("used"));
                System.out.println("当前内存剩余量: " + memory.get("free"));
                System.out.println("--------------------------------------------");
    
                ctx.writeAndFlush("info received!");
            } else {
                ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
            }
        }
    
    }
    
    
    public class Client {
    
        public static void main(String[] args) throws Exception {
    
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ClienHeartBeattHandler());
                }
            });
    
            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
    
            cf.channel().closeFuture().sync();
            group.shutdownGracefully();
        }
    }
    
    
    public class ClienHeartBeattHandler extends ChannelHandlerAdapter {
    
        private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
        private ScheduledFuture<?> heartBeat;
        // Local address, used to send authentication info to the server on connect
        private InetAddress addr;
    
        private static final String SUCCESS_KEY = "auth_success_key";
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            addr = InetAddress.getLocalHost();
            String ip = addr.getHostAddress();
            String key = "1234";
            // Credentials
            String auth = ip + "," + key;
            ctx.writeAndFlush(auth);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                if (msg instanceof String) {
                    String ret = (String) msg;
                    if (SUCCESS_KEY.equals(ret)) {
                        // Handshake succeeded: start sending heartbeat messages
                        this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2,
                                TimeUnit.SECONDS);
                        System.out.println(msg);
                    } else {
                        System.out.println(msg);
                    }
                }
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        private class HeartBeatTask implements Runnable {
            private final ChannelHandlerContext ctx;
    
            public HeartBeatTask(final ChannelHandlerContext ctx) {
                this.ctx = ctx;
            }
    
            @Override
            public void run() {
                try {
                    RequestInfo info = new RequestInfo();
                    // Local IP address
                    info.setIp(addr.getHostAddress());
                    Sigar sigar = new Sigar();
                    // CPU usage percentages
                    CpuPerc cpuPerc = sigar.getCpuPerc();
                    HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
                    cpuPercMap.put("combined", cpuPerc.getCombined());
                    cpuPercMap.put("user", cpuPerc.getUser());
                    cpuPercMap.put("sys", cpuPerc.getSys());
                    cpuPercMap.put("wait", cpuPerc.getWait());
                    cpuPercMap.put("idle", cpuPerc.getIdle());
                    // Memory figures, divided by 1024
                    Mem mem = sigar.getMem();
                    HashMap<String, Object> memoryMap = new HashMap<String, Object>();
                    memoryMap.put("total", mem.getTotal() / 1024L);
                    memoryMap.put("used", mem.getUsed() / 1024L);
                    memoryMap.put("free", mem.getFree() / 1024L);
                    info.setCpuPercMap(cpuPercMap);
                    info.setMemoryMap(memoryMap);
                    ctx.writeAndFlush(info);
    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        // This callback belongs to the handler itself. Declared inside HeartBeatTask
        // it is just an ordinary method that Netty never calls, so the heartbeat
        // would never be cancelled on error.
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            if (heartBeat != null) {
                heartBeat.cancel(true);
                heartBeat = null;
            }
            ctx.fireExceptionCaught(cause);
        }
    }
    
    
    public final class MarshallingCodeCFactory {
    
        /**
         * Creates the JBoss Marshalling decoder (MarshallingDecoder).
         * 
         * @return MarshallingDecoder
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
            // Obtain a MarshallerFactory through the static method of the Marshalling utility class; "serial" selects the Java serialization factory.
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            // Create the MarshallingConfiguration and set its version to 5
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            // Build the provider from marshallerFactory and configuration
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
            // Build Netty's MarshallingDecoder; the two arguments are the provider and the maximum length of a single serialized message
            MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
            return decoder;
        }
    
        /**
         * Creates the JBoss Marshalling encoder (MarshallingEncoder).
         * 
         * @return MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
            // Build Netty's MarshallingEncoder, which serializes POJOs implementing Serializable into a byte array
            MarshallingEncoder encoder = new MarshallingEncoder(provider);
            return encoder;
        }
    }
    
    
    public class RequestInfo implements Serializable {
    
        private String ip;
        private HashMap<String, Object> cpuPercMap;
        private HashMap<String, Object> memoryMap;
        // .. other field
    
        public String getIp() {
            return ip;
        }
    
        public void setIp(String ip) {
            this.ip = ip;
        }
    
        public HashMap<String, Object> getCpuPercMap() {
            return cpuPercMap;
        }
    
        public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
            this.cpuPercMap = cpuPercMap;
        }
    
        public HashMap<String, Object> getMemoryMap() {
            return memoryMap;
        }
    
        public void setMemoryMap(HashMap<String, Object> memoryMap) {
            this.memoryMap = memoryMap;
        }
    
    }

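    When the client first connects, it sends its authentication info to the server; once authentication succeeds, it starts sending heartbeat packets at a fixed interval.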
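
The authenticate-then-heartbeat sequence can be sketched without Netty as a small state holder around a `ScheduledExecutorService`. This is an illustration under stated assumptions, not the original handler: `HeartbeatSender` and `HeartbeatDemo` are hypothetical names, the heartbeat is an arbitrary `Runnable`, and the 20 ms period is chosen only to keep the demo short (the original uses 2 seconds). The `auth_success_key` string is the one used in the handlers above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: starts a periodic heartbeat only after the server confirms authentication. */
final class HeartbeatSender implements AutoCloseable {
    static final String SUCCESS_KEY = "auth_success_key";

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable beat;
    private final long periodMillis;
    private ScheduledFuture<?> task;

    HeartbeatSender(Runnable beat, long periodMillis) {
        this.beat = beat;
        this.periodMillis = periodMillis;
    }

    /** Returns true if this reply started the heartbeat; any other reply is ignored. */
    synchronized boolean onServerReply(String reply) {
        if (SUCCESS_KEY.equals(reply) && task == null) {
            task = scheduler.scheduleWithFixedDelay(beat, 0, periodMillis, TimeUnit.MILLISECONDS);
            return true;
        }
        return false;
    }

    synchronized boolean isRunning() {
        return task != null && !task.isCancelled();
    }

    /** Cancels the heartbeat and releases the scheduler thread. */
    @Override
    public synchronized void close() {
        if (task != null) {
            task.cancel(true);
            task = null;
        }
        scheduler.shutdownNow();
    }
}

final class HeartbeatDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch threeBeats = new CountDownLatch(3);
        try (HeartbeatSender sender = new HeartbeatSender(threeBeats::countDown, 20)) {
            sender.onServerReply("auth failure !");            // ignored: not authenticated
            sender.onServerReply(HeartbeatSender.SUCCESS_KEY); // starts the heartbeat
            boolean seen = threeBeats.await(2, TimeUnit.SECONDS);
            System.out.println("three heartbeats seen: " + seen);
        }
    }
}
```

Closing the sender shuts the scheduler down as well, which the original handler leaves running after the channel fails.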

    Reposted from: https://www.cnblogs.com/lostyears/p/8482450.html

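  • 1. Netty data-communication scenarios 1.1 In practice, there are roughly three ways to communicate with Netty. The first is to use a long-lived connection that is never closed, so the channel between server and client stays open at all times. If the server is powerful enough, and...
  • In Android development we may need to keep a connection to the server open. This calls for a long-lived socket connection plus messages sent at intervals to check that the connection is still alive, i.e. heartbeat detection. We need a client demo and a server demo; here I post the important code,...
  • Purpose and introduction: Netty is a server in the same vein as Tomcat, but it focuses on network programming and offers comparatively better network-communication performance. ... We imitate a mechanism in which the server detects client heartbeats. Scenario: with a long-lived connection, the server needs a heartbeat mechanism to confirm that the client is alive
  • This article is based on networkcomms... Heartbeat detection is indispensable in network communication programs; let's look at how networkcomms implements it. Taking networkcomms2.3.1 as an example: on the server side, a dedicated thread sends the heartbeat messages. The code is as follows: protect...
  • WCF duplex communication: offline detection, automatic reconnection, heartbeat packets, demo code. The client sends heartbeats to the server at intervals to check that it is online. After a user logs in, the server periodically sends data to the client. When the server goes offline, the client detects it and logs in again automatically. The server monitors whether the client has gone offline, heartbeat...
  • Heartbeat detection

    2019-10-05 23:05:59
    People in a chat group told me that, in theory, heartbeat packets can only be checked by oneself. Why is that? The book 《大型网站架构技术》, which I read earlier, mentions this too, so what is actually going on? Polling: in short, the server periodically and actively communicates with the client (or other system) whose status it monitors...
  • JAVA NETTY heartbeat detection

    2021-01-26 18:58:07
    Socket communication is commonly used for heartbeat detection between multiple servers. To maintain a server cluster there is usually one or several Master hosts plus n Slaves. The Master often needs to know the state of the slave servers under it and monitor them in real time, which in...
  • Netty heartbeat detection

    2021-02-23 21:18:08
    1. Preface ... Understanding TCP: TCP suits business scenarios with relatively few clients that communicate frequently, while HTTP suits scenarios with a large number of clients. Because HTTP uses short-lived connections, connection resources are released as soon as a request completes and no longer occupy the server..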
