精华内容
下载资源
问答
  • 主要介绍了JAVA实现长连接(含心跳检测Demo),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • Socket心跳连接_java

    2016-02-20 21:38:36
    主要是关于java tcp长连接,心跳连接源码。
  • Java心跳包功能TCP实现

    2017-11-07 11:37:16
    Java心跳包功能的实现,tcp协议的,包含客户端和服务端
  • websocket 心跳连接

    2019-04-20 14:39:38
    前端实现 websocket心跳连接 支持心跳检测和心跳重连
  • Socket长连接、通信、心跳包、消息回调、Java服务端
  • java写的websocket客户端,包含心跳监测。 此架包可解决问题:系统使用websocket 访问远程上的实时数据,但是有时候会停止更新实时数据,只要重启了自己的系统,就会继续更新数据了,此包可以完美解决此问题。
  • java心跳机制

    千次阅读 2017-12-10 02:21:29
    心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。 什么是心跳机制? 就是每隔几分钟发送一个固定信息给服务端,服务端收到后回复一个固定信息如果服务端几分钟内...

    另外学习的文章:http://blog.csdn.net/Ro_bot/article/details/51620064
    心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。
    什么是心跳机制?
    就是每隔几分钟发送一个固定信息给服务端,服务端收到后回复一个固定信息如果服务端几分钟内没有收到客户端信息则视客户端断开。

    发包方:可以是客户也可以是服务端,看哪边实现方便合理。 心跳包之所以叫心跳包是因为:它像心跳一样每隔固定时间发一次,以此来告诉服务器,这个客户端还活着。事实上这是为了保持长连接,至于这个包的内容,是没有什么特别规定的,不过一般都是很小的包,或者只包含包头的一个空包。心跳包主要也就是用于长连接的保活和断线处理。一般的应用下,判定时间在30-40秒比较不错。如果实在要求高,那就在6-9秒。

    心跳包的发送,通常有两种技术:

    1.应用层自己实现的心跳包

    Client启动一个定时器,不断发心跳;
    Server收到心跳后,给个回应;
    Server启动一个定时器,判断Client是否存在,判断方法这里列两种:
    时间差和简单标志。

    时间差策略
    收到一个心跳后,记录当前时间(记为recvedTime)。
    判断定时器时间到达,计算多久没收到心跳的时间(T)=当前时间 - recvedTime(上面记录的时间)。如果T大于某个设定值,就可以认为Client超时了。
    简单标志
    收到一个心跳后,设置连接标志为true;
    判断定时器时间到达,查看所有的标志,false的,认为对端超时了;true的将其设成false。

    上面这种方法比上面简单一些,但检测某个Client是否离线的误差有点大。

    2.使用SO_KEEPALIVE套接字选项

    在TCP的机制里面,本身是存在有心跳包的机制的,也就是TCP的选项. 不论是服务端还是客户端,一方开启KeepAlive功能后,就会自动在规定时间内向对方发送心跳包, 而另一方在收到心跳包后就会自动回复,以告诉对方我仍然在线。因为开启KeepAlive功能需要消耗额外的宽带和流量,所以TCP协议层默认并不开启默认的KeepAlive超时需要7,200,000 MilliSeconds, 即2小时,探测次数为5次。对于很多服务端应用程序来说,2小时的空闲时间太长。因此,我们需要手工开启KeepAlive功能并设置合理的KeepAlive参数

    开启KeepAlive选项后会导致的三种情况:

    • 对方接收一切正常:以期望的ACK响应,2小时后,TCP将发出另一个探测分节
    • 对方已崩溃且已重新启动:以RST响应。套接口的待处理错误被置为ECONNRESET,套接口本身则被关闭。
    • 对方无任何响应:套接口的待处理错误被置为ETIMEOUT,套接口本身则被关闭.

    有关SO_KEEPALIVE的三个参数:
    (1).tcp_keepalive_intvl,保活探测消息的发送频率。默认值为75s。
    (2).tcp_keepalive_probes,TCP发送保活探测消息以确定连接是否已断开的次数。默认值为9(次)。
    发送频率tcp_keepalive_intvl乘以发送次数tcp_keepalive_probes,就得到了从开始探测直到放弃探测确定连接断开的时间,大约为11min。
    (3).tcp_keepalive_time,在TCP保活打开的情况下,最后一次数据交换到TCP发送第一个保活探测消息的时间,即允许的持续空闲时间。默认值为7200s(2h)

    实现原理:

    长连接的维持,是要客户端程序,定时向服务端程序,发送一个维持连接包的。
    如果,长时间未发送维持连接包,服务端程序将断开连接。

    客户端:

       Client通过持有Socket的对象,可以随时(使用sendObject方法)发送Massage Object(消息)给服务端。
       如果keepAliveDelay毫秒(程序中是2秒)内未发送任何数据,则自动发送一个KeepAlive Object(心跳)给服务端,用于维持连接。
       由于,我们向服务端,可以发送很多不同的消息对象,服务端也可以返回不同的对象。所以,对于返回对象的处理,要编写具体的ObjectAction实现类进行处理。通过Client.addActionMap方法进行添加。这样,程序会回调处理。
    

    服务端:

        由于客户端会定时(keepAliveDelay毫秒)发送维持连接的信息过来,所以,服务端要有一个检测机制。
        即当服务端receiveTimeDelay毫秒(程序中是3秒)内未接收任何数据,则自动断开与客户端的连接。
         ActionMapping的原理与客户端相似(相同)。
         通过添加相应的ObjectAction实现类,可以实现不同对象的响应、应答过程。
    

    Demo:

    package socket.keepalive.test;
    import java.io.Serializable;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    /**
     * 维持连接的消息对象(心跳对象)
     */
    public class KeepAlive implements Serializable{
        private static final long serialVersionUID = -2813120366138988480L;
    
        /* 覆盖该方法,仅用于测试使用。
         * @see java.lang.Object#toString()
         */
        @Override
        public String toString() {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"\t维持连接包";
        }
    }
    package socket.keepalive.test;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.Socket;
    import java.net.UnknownHostException;
    import java.util.Collections;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     *  C/S架构的客户端对象,持有该对象,可以随时向服务端发送消息。
     * <p>
     * 创建时间:2010-7-18 上午12:17:25
     * @author HouLei
     * @since 1.0
     */
    public class Client {
    
        /**
         * 处理服务端发回的对象,可实现该接口。
         */
        public static interface ObjectAction{
            void doAction(Object obj,Client client);
        }
    
        public static final class DefaultObjectAction implements ObjectAction{
            public void doAction(Object obj,Client client) {
                System.out.println("处理:\t"+obj.toString());
            }
        }
    
    
        public static void main(String[] args) throws UnknownHostException, IOException {
            String serverIp = "127.0.0.1";
            int port = 65432;
            Client client = new Client(serverIp,port);
            client.start();
        }
    
        private String serverIp;
        private int port;
        private Socket socket;
        private boolean running=false; //连接状态
    
        private long lastSendTime; //最后一次发送数据的时间
    
        //用于保存接收消息对象类型及该类型消息处理的对象
        private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class,ObjectAction>();
    
        public Client(String serverIp, int port) {
            this.serverIp=serverIp;
            this.port=port;
        }
    
        public void start() throws UnknownHostException, IOException {
            if(running)return;
            socket = new Socket(serverIp,port);
            System.out.println("本地端口:"+socket.getLocalPort());
            lastSendTime=System.currentTimeMillis();
            running=true;
            new Thread(new KeepAliveWatchDog()).start();  //保持长连接的线程,每隔2秒项服务器发一个一个保持连接的心跳消息
            new Thread(new ReceiveWatchDog()).start();    //接受消息的线程,处理消息
        }
    
        public void stop(){
            if(running)running=false;
        }
    
        /**
         * 添加接收对象的处理对象。
         * @param cls 待处理的对象,其所属的类。
         * @param action 处理过程对象。
         */
        public void addActionMap(Class<Object> cls,ObjectAction action){
            actionMapping.put(cls, action);
        }
    
        public void sendObject(Object obj) throws IOException {
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            oos.writeObject(obj);
            System.out.println("发送:\t"+obj);
            oos.flush();
        }
    
        class KeepAliveWatchDog implements Runnable{
            long checkDelay = 10;
            long keepAliveDelay = 2000;
            public void run() {
                while(running){
                    if(System.currentTimeMillis()-lastSendTime>keepAliveDelay){
                        try {
                            Client.this.sendObject(new KeepAlive());
                        } catch (IOException e) {
                            e.printStackTrace();
                            Client.this.stop();
                        }
                        lastSendTime = System.currentTimeMillis();
                    }else{
                        try {
                            Thread.sleep(checkDelay);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            Client.this.stop();
                        }
                    }
                }
            }
        }
    
        class ReceiveWatchDog implements Runnable{
            public void run() {
                while(running){
                    try {
                        InputStream in = socket.getInputStream();
                        if(in.available()>0){
                            ObjectInputStream ois = new ObjectInputStream(in);
                            Object obj = ois.readObject();
                            System.out.println("接收:\t"+obj);
                            ObjectAction oa = actionMapping.get(obj.getClass());
                            oa = oa==null?new DefaultObjectAction():oa;
                            oa.doAction(obj, Client.this);
                        }else{
                            Thread.sleep(10);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        Client.this.stop();
                    } 
                }
            }
        }
    
    }
    package socket.keepalive.test;
    
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * C/S架构的服务端对象。
     * <p>
     * 创建时间:2010-7-18 上午12:17:37
     * @author HouLei
     * @since 1.0
     */
    public class Server {
    
        /**
         * 要处理客户端发来的对象,并返回一个对象,可实现该接口。
         */
        public interface ObjectAction{
            Object doAction(Object rev, Server server);
        }
    
        public static final class DefaultObjectAction implements ObjectAction{
            public Object doAction(Object rev,Server server) {
                System.out.println("处理并返回:"+rev);
                return rev;
            }
        }
    
        public static void main(String[] args) {
            int port = 65432;
            Server server = new Server(port);
            server.start();
        }
    
        private int port;
        private volatile boolean running=false;
        private long receiveTimeDelay=3000;
        private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class,ObjectAction>();
        private Thread connWatchDog;
    
        public Server(int port) {
            this.port = port;
        }
    
        public void start(){
            if(running)return;
            running=true;
            connWatchDog = new Thread(new ConnWatchDog());
            connWatchDog.start();
        }
    
        @SuppressWarnings("deprecation")
        public void stop(){
            if(running)running=false;
            if(connWatchDog!=null)connWatchDog.stop();
        }
    
        public void addActionMap(Class<Object> cls,ObjectAction action){
            actionMapping.put(cls, action);
        }
    
        class ConnWatchDog implements Runnable{
            public void run(){
                try {
                    ServerSocket ss = new ServerSocket(port,5);
                    while(running){
                        Socket s = ss.accept();
                        new Thread(new SocketAction(s)).start();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    Server.this.stop();
                }
    
            }
        }
    
        class SocketAction implements Runnable{
            Socket s;
            boolean run=true;
            long lastReceiveTime = System.currentTimeMillis();
            public SocketAction(Socket s) {
                this.s = s;
            }
            public void run() {
                while(running && run){
                    if(System.currentTimeMillis()-lastReceiveTime>receiveTimeDelay){
                        overThis();
                    }else{
                        try {
                            InputStream in = s.getInputStream();
                            if(in.available()>0){
                                ObjectInputStream ois = new ObjectInputStream(in);
                                Object obj = ois.readObject();
                                lastReceiveTime = System.currentTimeMillis();
                                System.out.println("接收:\t"+obj);
                                ObjectAction oa = actionMapping.get(obj.getClass());
                                oa = oa==null?new DefaultObjectAction():oa;
                                Object out = oa.doAction(obj,Server.this);
                                if(out!=null){
                                    ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
                                    oos.writeObject(out);
                                    oos.flush();
                                }
                            }else{
                                Thread.sleep(10);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            overThis();
                        } 
                    }
                }
            }
    
            private void overThis() {
                if(run)run=false;
                if(s!=null){
                    try {
                        s.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("关闭:"+s.getRemoteSocketAddress());
            }
    
        }
    
    }
    
    
    展开全文
  • JAVA实现长连接(含心跳检测)Demo

    万次阅读 2017-01-12 15:25:19
     长连接的维持,是要客户端程序,定时向服务端程序,发送一个维持连接包的。  如果,长时间未发送维持连接包,服务端程序将断开连接。 客户端:  Client通过持有Socket的对象,可以随时(使用sendObject方法...
    实现原理: 
    

           长连接的维持,是要客户端程序,定时向服务端程序,发送一个维持连接包的。
           如果,长时间未发送维持连接包,服务端程序将断开连接。

    客户端:
           Client通过持有Socket的对象,可以随时(使用sendObject方法)发送Massage Object(消息)给服务端。
           如果keepAliveDelay毫秒(程序中是2秒)内未发送任何数据,则自动发送一个KeepAlive Object(心跳)给服务端,用于维持连接。
           由于,我们向服务端,可以发送很多不同的消息对象,服务端也可以返回不同的对象所以,对于返回对象的处理,要编写具体的ObjectAction实现类进行处理。通过Client.addActionMap方法进行添加。这样,程序会回调处理。

    服务端:
            由于客户端会定时(keepAliveDelay毫秒)发送维持连接的信息过来,所以,服务端要有一个检测机制。
            即当服务端receiveTimeDelay毫秒(程序中是3秒)内未接收任何数据,则自动断开与客户端的连接。
             ActionMapping的原理与客户端相似(相同)。

             通过添加相应的ObjectAction实现类,可以实现不同对象的响应、应答过程。

    Demo:

    package socket.keepalive.test;
    
    
    import java.io.Serializable;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * 
     * 维持连接的消息对象(心跳对象)
     */
    public class KeepAlive implements Serializable{
    
    	private static final long serialVersionUID = -2813120366138988480L;
    
    	/* 覆盖该方法,仅用于测试使用。
    	 * @see java.lang.Object#toString()
    	 */
    	@Override
    	public String toString() {
    		return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"\t维持连接包";
    	}
    
    }
    
    

     

    package socket.keepalive.test;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.Socket;
    import java.net.UnknownHostException;
    import java.util.Collections;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     *	C/S架构的客户端对象,持有该对象,可以随时向服务端发送消息。
     * <p>
     * 创建时间:2010-7-18 上午12:17:25
     * @author HouLei
     * @since 1.0
     */
    public class Client {
    
    	/**
    	 * 处理服务端发回的对象,可实现该接口。
    	 */
    	public static interface ObjectAction{
    		void doAction(Object obj,Client client);
    	}
    	
    	public static final class DefaultObjectAction implements ObjectAction{
    		public void doAction(Object obj,Client client) {
    			System.out.println("处理:\t"+obj.toString());
    		}
    	}
    	
    	
    	public static void main(String[] args) throws UnknownHostException, IOException {
    		String serverIp = "127.0.0.1";
    		int port = 65432;
    		Client client = new Client(serverIp,port);
    		client.start();
    	}
    	
    	private String serverIp;
    	private int port;
    	private Socket socket;
    	private boolean running=false; //连接状态
    	
    	private long lastSendTime; //最后一次发送数据的时间
    	
    	//用于保存接收消息对象类型及该类型消息处理的对象
    	private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class,ObjectAction>();
    	
    	public Client(String serverIp, int port) {
    		this.serverIp=serverIp;
    		this.port=port;
    	}
    	
    	public void start() throws UnknownHostException, IOException {
    		if(running)return;
    		socket = new Socket(serverIp,port);
    		System.out.println("本地端口:"+socket.getLocalPort());
    		lastSendTime=System.currentTimeMillis();
    		running=true;
    		new Thread(new KeepAliveWatchDog()).start();  //保持长连接的线程,每隔2秒项服务器发一个一个保持连接的心跳消息
    		new Thread(new ReceiveWatchDog()).start();    //接受消息的线程,处理消息
    	}
    	
    	public void stop(){
    		if(running)running=false;
    	}
    	
    	/**
    	 * 添加接收对象的处理对象。
    	 * @param cls 待处理的对象,其所属的类。
    	 * @param action 处理过程对象。
    	 */
    	public void addActionMap(Class<Object> cls,ObjectAction action){
    		actionMapping.put(cls, action);
    	}
    
    	public void sendObject(Object obj) throws IOException {
    		ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
    		oos.writeObject(obj);
    		System.out.println("发送:\t"+obj);
    		oos.flush();
    	}
    	
    	class KeepAliveWatchDog implements Runnable{
    		long checkDelay = 10;
    		long keepAliveDelay = 2000;
    		public void run() {
    			while(running){
    				if(System.currentTimeMillis()-lastSendTime>keepAliveDelay){
    					try {
    						Client.this.sendObject(new KeepAlive());
    					} catch (IOException e) {
    						e.printStackTrace();
    						Client.this.stop();
    					}
    					lastSendTime = System.currentTimeMillis();
    				}else{
    					try {
    						Thread.sleep(checkDelay);
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    						Client.this.stop();
    					}
    				}
    			}
    		}
    	}
    	
    	class ReceiveWatchDog implements Runnable{
    		public void run() {
    			while(running){
    				try {
    					InputStream in = socket.getInputStream();
    					if(in.available()>0){
    						ObjectInputStream ois = new ObjectInputStream(in);
    						Object obj = ois.readObject();
    						System.out.println("接收:\t"+obj);
    						ObjectAction oa = actionMapping.get(obj.getClass());
    						oa = oa==null?new DefaultObjectAction():oa;
    						oa.doAction(obj, Client.this);
    					}else{
    						Thread.sleep(10);
    					}
    				} catch (Exception e) {
    					e.printStackTrace();
    					Client.this.stop();
    				} 
    			}
    		}
    	}
    	
    }
    
    


     

    package socket.keepalive.test;
    
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * C/S架构的服务端对象。
     * <p>
     * 创建时间:2010-7-18 上午12:17:37
     * @author HouLei
     * @since 1.0
     */
    public class Server {
    
    	/**
    	 * 要处理客户端发来的对象,并返回一个对象,可实现该接口。
    	 */
    	public interface ObjectAction{
    		Object doAction(Object rev, Server server);
    	}
    	
    	public static final class DefaultObjectAction implements ObjectAction{
    		public Object doAction(Object rev,Server server) {
    			System.out.println("处理并返回:"+rev);
    			return rev;
    		}
    	}
    	
    	public static void main(String[] args) {
    		int port = 65432;
    		Server server = new Server(port);
    		server.start();
    	}
    	
    	private int port;
    	private volatile boolean running=false;
    	private long receiveTimeDelay=3000;
    	private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class,ObjectAction>();
    	private Thread connWatchDog;
    	
    	public Server(int port) {
    		this.port = port;
    	}
    
    	public void start(){
    		if(running)return;
    		running=true;
    		connWatchDog = new Thread(new ConnWatchDog());
    		connWatchDog.start();
    	}
    	
    	@SuppressWarnings("deprecation")
    	public void stop(){
    		if(running)running=false;
    		if(connWatchDog!=null)connWatchDog.stop();
    	}
    	
    	public void addActionMap(Class<Object> cls,ObjectAction action){
    		actionMapping.put(cls, action);
    	}
    	
    	class ConnWatchDog implements Runnable{
    		public void run(){
    			try {
    				ServerSocket ss = new ServerSocket(port,5);
    				while(running){
    					Socket s = ss.accept();
    					new Thread(new SocketAction(s)).start();
    				}
    			} catch (IOException e) {
    				e.printStackTrace();
    				Server.this.stop();
    			}
    			
    		}
    	}
    	
    	class SocketAction implements Runnable{
    		Socket s;
    		boolean run=true;
    		long lastReceiveTime = System.currentTimeMillis();
    		public SocketAction(Socket s) {
    			this.s = s;
    		}
    		public void run() {
    			while(running && run){
    				if(System.currentTimeMillis()-lastReceiveTime>receiveTimeDelay){
    					overThis();
    				}else{
    					try {
    						InputStream in = s.getInputStream();
    						if(in.available()>0){
    							ObjectInputStream ois = new ObjectInputStream(in);
    							Object obj = ois.readObject();
    							lastReceiveTime = System.currentTimeMillis();
    							System.out.println("接收:\t"+obj);
    							ObjectAction oa = actionMapping.get(obj.getClass());
    							oa = oa==null?new DefaultObjectAction():oa;
    							Object out = oa.doAction(obj,Server.this);
    							if(out!=null){
    								ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
    								oos.writeObject(out);
    								oos.flush();
    							}
    						}else{
    							Thread.sleep(10);
    						}
    					} catch (Exception e) {
    						e.printStackTrace();
    						overThis();
    					} 
    				}
    			}
    		}
    		
    		private void overThis() {
    			if(run)run=false;
    			if(s!=null){
    				try {
    					s.close();
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    			}
    			System.out.println("关闭:"+s.getRemoteSocketAddress());
    		}
    		
    	}
    	
    }
    
    


     


     

     

     

    展开全文
  • java socket 长连接 心跳

    千次阅读 2018-06-02 16:19:01
    首先先说说心跳包在socket连接中的意义: 通过socket连接的双方为了保证在一段时间未发消息不被防火墙断开连接或者使对方及时知道自己是否已经断线而定期给对方发送的某些特殊标识字符,这个字符可以根据双方自定义...

    首先先说说心跳包在socket连接中的意义:
    通过socket连接的双方为了保证在一段时间未发消息不被防火墙断开连接或者使对方及时知道自己是否已经断线而定期给对方发送的某些特殊标识字符,这个字符可以根据双方自定义,没有实际的通讯意义。
    而定制的时间也是双方协商后定制的。

    首先设置socket的一些属性:

    //表示底层的TCP 实现会监视该连接是否有效。默认值为 false, 表示TCP 不会监视连接是否有效, 不活动的客户端可能会永远存在下去, 而不会注意到服务器已经崩溃。(当连接处于空闲状态(连接的两端没有互相传送数据) 超过了 2 小时时, 本地的TCP 实现会发送一个数据包给远程的 Socket. 如果远程Socket 没有发回响应, TCP实现就会持续尝试 11 分钟, 直到接收到响应为止. 如果在 12 分钟内未收到响应, TCP 实现就会自动关闭本地Socket, 断开连接. 在不同的网络平台上, TCP实现尝试与远程Socket 对话的时限有所差别.)
      if(!socket.getKeepAlive()) socket.setKeepAlive(true);   
    //默认值为 false。为 true 时, 表示支持发送一个字节的 TCP 紧急数据. Socket 类的 sendUrgentData(int data) 方法用于发送一个字节的 TCP紧急数据。 为 false的这种情况下, 当接收方收到紧急数据时不作任何处理, 直接将其丢弃. 如果用户希望发送紧急数据, 应该把 OOBINLINE 设为 trueif(!socket.getOOBInline())socket.setOOBInline(true);

    心跳包的写法:

        private Thread thread;//循环发送心跳包的线程
        private Socket socket;//与服务器建立连接的socket对象
        private OutputStream outputStream;//输出流,用于发送心跳
    
        public void startThreadSocket() {
            try {
                if(!socket.getKeepAlive()) socket.setKeepAlive(true);//true,若长时间没有连接则断开
                if(!socket.getOOBInline())socket.setOOBInline(true);//true,允许发送紧急数据,不做处理
                outputStream = socket.getOutputStream();//获得socket的输出流
                final String socketContent = "[这里为与服务器协商的特殊字符串,用于识别是发送的心跳信息]" + "\n";
                thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        while (true) {
                            try {
                                Thread.sleep(20 * 1000);//20s发送一次心跳
                                outputStream.write(socketContent.getBytes("UTF-8"));
                                outputStream.flush();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
                thread.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    心跳包的写法很简单,和服务器商定特殊字符串,接收后不做处理。定时发送,然后设置一下socket的属性就行了。

    关于socket的其他属性设置:

    TCP_NODELAY: 表示立即发送数据.
    
    SO_RESUSEADDR: 表示是否允许重用Socket 所绑定的本地地址.
    
    SO_TIMEOUT: 表示接收数据时的等待超时数据.
    
    SO_LINGER: 表示当执行Socket 的 close()方法时, 是否立即关闭底层的Socket.
    
    SO_SNFBUF: 表示发送数据的缓冲区的大小.
    
    SO_RCVBUF: 表示接收数据的缓冲区的大小. SO_KEEPALIVE: 表示对于长时间处于空闲状态的Socket , 是否要自动把它关闭. OOBINLINE: 表示是否支持发送一个字节的TCP 紧急数据.
    if(!socket.getTcpNoDelay()) socket.setTcpNoDelay(true);//关闭缓冲区,及时发送数据;默认false,打开缓冲区;先放在缓冲区, 等缓存区满了再发出. 发送完一批数据后, 会等待接收方对这批数据的回应, 然后再发送下一批数据

    if(!socket.getResuseAddress())socket.setResuseAddress(true);
    
    //当接收方通过Socket 的close() 方法关闭Socket 时, 如果网络上还有发送到这个Socket 的数据, 那么底层的Socket 不会立即释放本地端口,而是会等待一段时间,确保接收到了网络上发送过来的延迟数据, 然后再释放端口。Socket接收到延迟数据后,不会对这些数据作任何处理。socket接收延迟数据的目的是,确保这些数据不会被其他碰巧绑定到同样端口的新进程接收到。
    
    方法必须在Socket还没有绑定到一个本地端口之前调用,否则执行socket.setResuseAddress(true) 方法无效. 因此必须按照以下方式创建Socket对象, 然后再连接远程服务器 。

    更多的属性解释就不一一列举了。

    展开全文
  • 物联网之java实现(springboot + netty版server和client + 心跳,附完整源码)。 实例讲解。
  • socket心跳测试(java

    热门讨论 2011-12-22 09:48:01
    java语言 建立网络通讯长连接示例(心跳测试)
  • Java实现心跳机制

    万次阅读 2018-11-07 16:15:30
    一、心跳机制简介  在分布式系统中,分布在不同主机上的节点需要检测其他节点的状态,如服务器节点需要检测从节点是否失效。为了检测对方节点的有效性,每隔固定时间就发送一个固定信息给对方,对方回复一个固定...

    转载自 https://www.cnblogs.com/codingexperience/p/5939059.html

    一、心跳机制简介

         在分布式系统中,分布在不同主机上的节点需要检测其他节点的状态,如服务器节点需要检测从节点是否失效。为了检测对方节点的有效性,每隔固定时间就发送一个固定信息给对方,对方回复一个固定信息,如果长时间没有收到对方的回复,则断开与对方的连接。

         发包方既可以是服务端,也可以是客户端,这要看具体实现。因为是每隔固定时间发送一次,类似心跳,所以发送的固定信息称为心跳包。心跳包一般为比较小的包,可根据具体实现。心跳包主要应用于长连接的保持与短线链接。

          一般而言,应该客户端主动向服务器发送心跳包,因为服务器向客户端发送心跳包会影响服务器的性能。

    二、心跳机制实现方式

        心跳机制有两种实现方式,一种基于TCP自带的心跳包,TCP的SO_KEEPALIVE选项可以,系统默认的默认跳帧频率为2小时,超过2小时后,本地的TCP 实现会发送一个数据包给远程的 Socket. 如果远程Socket 没有发回响应, TCP实现就会持续尝试 11 分钟, 直到接收到响应为止。 否则就会自动断开Socket连接。但TCP自带的心跳包无法检测比较敏感地知道对方的状态,默认2小时的空闲时间,对于大多数的应用而言太长了。可以手工开启KeepAlive功能并设置合理的KeepAlive参数。

        另一种在应用层自己进行实现,基本步骤如下:

    1. Client使用定时器,不断发送心跳;
    2. Server收到心跳后,回复一个包;
    3. Server为每个Client启动超时定时器,如果在指定时间内没有收到Client的心跳包,则Client失效。

    三、Java实现心跳机制

        这里基于Java实现的简单RPC框架实现心跳机制。Java实现代码如下所示:

        心跳客户端类:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    public class HeartbeatClient implements Runnable {

     

        private String serverIP = "127.0.0.1";

        private int serverPort = 8089;

        private String nodeID = UUID.randomUUID().toString();

        private boolean isRunning = true;

        //  最近的心跳时间

        private long lastHeartbeat;

        // 心跳间隔时间

        private long heartBeatInterval = 10 * 1000;

     

        public void run() {

            try {

                while (isRunning) {

                    HeartbeatHandler handler = RPClient.getRemoteProxyObj(HeartbeatHandler.class, new InetSocketAddress(serverIP, serverPort));

                    long startTime = System.currentTimeMillis();

                    // 是否达到发送心跳的周期时间

                    if (startTime - lastHeartbeat > heartBeatInterval) {

                        System.out.println("send a heart beat");

                        lastHeartbeat = startTime;

     

                        HeartbeatEntity entity = new HeartbeatEntity();

                        entity.setTime(startTime);

                        entity.setNodeID(nodeID);

     

                        // 向服务器发送心跳,并返回需要执行的命令

                        Cmder cmds = handler.sendHeartBeat(entity);

     

                        if (!processCommand(cmds))

                            continue;

                    }

                }

            } catch (Exception e) {

                e.printStackTrace();

            }

        }

     

        private boolean processCommand(Cmder cmds) {

            // ...

            return true;

        }

     

    }

          心跳包实体类:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    public class HeartbeatEntity implements Serializable {

     

        private long time;

        private String nodeID;

        private String error;

        private Map<String, Object> info = new HashMap<String, Object>();

     

        public String getNodeID() {

            return nodeID;

        }

     

        public void setNodeID(String nodeID) {

            this.nodeID = nodeID;

        }

     

        public String getError() {

            return error;

        }

     

        public void setError(String error) {

            this.error = error;

        }

     

        public Map<String, Object> getInfo() {

            return info;

        }

     

        public void setInfo(Map<String, Object> info) {

            this.info = info;

        }

     

        public long getTime() {

            return time;

        }

     

        public void setTime(long time) {

            this.time = time;

        }

    }

      服务器接受心跳包返回的命令对象类:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    public class Cmder implements Serializable {

     

        private String nodeID;

        private String error;

        private Map<String, Object> info = new HashMap<String, Object>();

     

        public String getNodeID() {

            return nodeID;

        }

     

        public void setNodeID(String nodeID) {

            this.nodeID = nodeID;

        }

     

        public String getError() {

            return error;

        }

     

        public void setError(String error) {

            this.error = error;

        }

     

        public Map<String, Object> getInfo() {

            return info;

        }

     

        public void setInfo(Map<String, Object> info) {

            this.info = info;

        }

    }

      RPC服务注册中心:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    81

    82

    83

    84

    85

    86

    87

    88

    89

    90

    91

    92

    93

    94

    95

    96

    97

    98

    99

    100

    101

    102

    103

    104

    105

    106

    107

    108

    109

    110

    111

    112

    113

    114

    115

    116

    117

    118

    119

    120

    121

    122

    123

    124

    125

    public class ServiceCenter {

     

        private ExecutorService executor = Executors.newFixedThreadPool(20);

     

        private final ConcurrentHashMap<String, Class> serviceRegistry = new ConcurrentHashMap<String, Class>();

     

        private AtomicBoolean isRunning = new AtomicBoolean(true);

     

        // 服务器监听端口

        private int port = 8089;

     

        // 心跳监听器

        HeartbeatLinstener linstener;

     

        // 单例模式

        private static class SingleHolder {

            private static final ServiceCenter INSTANCE = new ServiceCenter();

        }

     

        private ServiceCenter() {

        }

     

        public static ServiceCenter getInstance() {

            return SingleHolder.INSTANCE;

        }

     

        public void register(Class serviceInterface, Class impl) {

            System.out.println("regeist service " + serviceInterface.getName());

            serviceRegistry.put(serviceInterface.getName(), impl);

        }

     

        public void start() throws IOException {

            ServerSocket server = new ServerSocket();

            server.bind(new InetSocketAddress(port));

            System.out.println("start server");

            linstener = HeartbeatLinstener.getInstance();

            System.out.println("start listen heart beat");

            try {

                while (true) {

                    // 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行

                    executor.execute(new ServiceTask(server.accept()));

                }

            } finally {

                server.close();

            }

        }

     

        public void stop() {

            isRunning.set(false);

            executor.shutdown();

        }

     

     

        public boolean isRunning() {

            return isRunning.get();

        }

     

        public int getPort() {

            return port;

        }

     

        public void settPort(int port) {

            this.port = port;

        }

     

        public ConcurrentHashMap<String, Class> getServiceRegistry() {

            return serviceRegistry;

        }

     

        private class ServiceTask implements Runnable {

            Socket clent = null;

     

            public ServiceTask(Socket client) {

                this.clent = client;

            }

     

            public void run() {

                ObjectInputStream input = null;

                ObjectOutputStream output = null;

                try {

                    // 2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果

                    input = new ObjectInputStream(clent.getInputStream());

                    String serviceName = input.readUTF();

                    String methodName = input.readUTF();

                    Class<?>[] parameterTypes = (Class<?>[]) input.readObject();

                    Object[] arguments = (Object[]) input.readObject();

                    Class serviceClass = serviceRegistry.get(serviceName);

                    if (serviceClass == null) {

                        throw new ClassNotFoundException(serviceName + " not found");

                    }

                    Method method = serviceClass.getMethod(methodName, parameterTypes);

                    Object result = method.invoke(serviceClass.newInstance(), arguments);

     

                    // 3.将执行结果反序列化,通过socket发送给客户端

                    output = new ObjectOutputStream(clent.getOutputStream());

                    output.writeObject(result);

                } catch (Exception e) {

                    e.printStackTrace();

                } finally {

                    if (output != null) {

                        try {

                            output.close();

                        } catch (IOException e) {

                            e.printStackTrace();

                        }

                    }

                    if (input != null) {

                        try {

                            input.close();

                        } catch (IOException e) {

                            e.printStackTrace();

                        }

                    }

                    if (clent != null) {

                        try {

                            clent.close();

                        } catch (IOException e) {

                            e.printStackTrace();

                        }

                    }

                }

     

            }

        }

    }

      心跳监听类:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    package com.cang.heartbeat;

     

    import java.io.IOException;

    import java.io.ObjectInputStream;

    import java.io.ObjectOutputStream;

    import java.lang.reflect.Method;

    import java.net.InetSocketAddress;

    import java.net.ServerSocket;

    import java.net.Socket;

    import java.util.Iterator;

    import java.util.Map;

    import java.util.concurrent.ConcurrentHashMap;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.Executors;

    import java.util.concurrent.atomic.AtomicBoolean;

     

    /**

     * 心跳监听保存信息

     *

     * @author cang

     * @create_time 2016-09-28 11:40

     */

    public class HeartbeatLinstener {

     

        private ExecutorService executor = Executors.newFixedThreadPool(20);

     

        private final ConcurrentHashMap<String, Object> nodes = new ConcurrentHashMap<String, Object>();

        private final ConcurrentHashMap<String, Long> nodeStatus = new ConcurrentHashMap<String, Long>();

     

        private long timeout = 10 * 1000;

     

        // 服务器监听端口

        private int port = 8089;

     

        // 单例模式

        private static class SingleHolder {

            private static final HeartbeatLinstener INSTANCE = new HeartbeatLinstener();

        }

     

        private HeartbeatLinstener() {

        }

     

        public static HeartbeatLinstener getInstance() {

            return SingleHolder.INSTANCE;

        }

     

        public ConcurrentHashMap<String, Object> getNodes() {

            return nodes;

        }

     

        public void registerNode(String nodeId, Object nodeInfo) {

            nodes.put(nodeId, nodeInfo);

            nodeStatus.put(nodeId, System.currentTimeMillis());

        }

     

        public void removeNode(String nodeID) {

            if (nodes.containsKey(nodeID)) {

                nodes.remove(nodeID);

            }

        }

     

        // 检测节点是否有效

        public boolean checkNodeValid(String key) {

            if (!nodes.containsKey(key) || !nodeStatus.containsKey(key)) return false;

            if ((System.currentTimeMillis() - nodeStatus.get(key)) > timeout) return false;

            return true;

        }

     

        // 删除所有失效节点

        public void removeInValidNode() {

            Iterator<Map.Entry<String, Long>> it = nodeStatus.entrySet().iterator();

            while (it.hasNext()) {

                Map.Entry<String, Long> e = it.next();

                if ((System.currentTimeMillis() - nodeStatus.get(e.getKey())) > timeout) {

                    nodes.remove(e.getKey());

                }

            }

        }

     

    }

      心跳处理类接口:

    1

    2

    3

    public interface HeartbeatHandler {

        public Cmder sendHeartBeat(HeartbeatEntity info);

    }

       心跳处理实现类:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    public class HeartbeatHandlerImpl implements HeartbeatHandler {

        public Cmder sendHeartBeat(HeartbeatEntity info) {

            HeartbeatLinstener linstener = HeartbeatLinstener.getInstance();

     

            // 添加节点

            if (!linstener.checkNodeValid(info.getNodeID())) {

                linstener.registerNode(info.getNodeID(), info);

            }

     

            // 其他操作

            Cmder cmder = new Cmder();

           cmder.setNodeID(info.getNodeID());

            // ...

     

            System.out.println("current all the nodes: ");

            Map<String, Object> nodes = linstener.getNodes();

            for (Map.Entry e : nodes.entrySet()) {

                System.out.println(e.getKey() + " : " + e.getValue());

            }

            System.out.println("hadle a heartbeat");

            return cmder;

        }

    }

      测试类:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    public class HeartbeatTest {

     

        public static void main(String[] args) {

            new Thread(new Runnable() {

                public void run() {

                    try {

                        ServiceCenter serviceServer = ServiceCenter.getInstance();

                        serviceServer.register(HeartbeatHandler.class, HeartbeatHandlerImpl.class);

                        serviceServer.start();

                    } catch (IOException e) {

                        e.printStackTrace();

                    }

                }

            }).start();

            Thread client1 = new Thread(new HeartbeatClient());

            client1.start();

            Thread client2 = new Thread(new HeartbeatClient());

            client2.start();

        }

    }

    四、总结

        上面的代码还有很多不足的地方,希望有空能进行改善:

    1.  配置为硬编码;
    2.  命令类Cmder没有实际实现,返回的Cmder对象没有实际进行处理;

       其他小问题就暂时不管了,希望以后能重写上面的代码。

    展开全文
  • 由于上次上传的是快压进行压缩,造成很多人下载下来,必须说安装快压软件,是我的失误操作的,本来想的是将项目中用到的基础模块共享出来,这样提供开源思路,后续的解析数据方便的多,可以按照上述的思路去实现,...
  • java 心跳检测

    千次阅读 2017-11-06 17:51:27
    今天再看Spring Cloud Eureka的时候,提到了心跳检测。之前也在某个地方看到过这个概念。但是一直没有深究。 找到了这篇文章。写的很不错。就来转载一下 http://blog.csdn.net/zhao9tian/article/details/52275214...
  • 简单解释就是: 短连接:jian
  • import java.io.*; import java.net.Socket; public class SocketUtil extends Thread{ public Socket socket; public boolean running=... //连接状态 public long lastSendTime; //最后一次发送数据的时间 pu...
  • java 心跳机制

    千次阅读 2018-06-20 15:04:51
    心跳包就是在客户端和服务器间定时通知对方自己状态的一个自己定义的命令字,按照一定的时间间隔发送,类似于心跳,所以叫做心跳包。
  • 由于在长连接的场景下,客户端和服务端并不是一直处于通信状态,... 客户端检测到某个服务端迟迟没有响应心跳也能重连获取一个新的连接。 正好借着在 cim有这样两个需求来聊一聊。 心跳实现方式 心跳其实有两种...
  • JAVA实现心跳机制

    千次阅读 2019-07-15 16:32:21
    1.心跳机制简介 在分布式系统中,分布在不同主机上的节点需要检测其他节点的状态,如服务器节点需要检测从节点是否失效。为了检测对方节点的有效性,每隔固定时间就发送一个固定信息给对方,对方回复一个固定信息...
  • Java 通过 Thread 实现心跳线程

    千次阅读 2018-11-14 18:53:57
    Java 学习笔记(02) mywang88 ...用来实现这一功能的进程,有时也被称为“心跳进程”。 实现 创建文件 HeartBeating.java,代码如下: public class HeartBeating { public static void start...
  • java socket长连接客户端服务端(标准实例),准确无误,流行结构。
  • NATS-Java客户端 客户端。 版本说明 这是java-nats库的2.x版本。 此版本是对原始库的完全重写。 重写的部分目标是解决线程的过度使用,我们创建了Dispatcher构造,以使应用程序可以更有意地控制线程的创建。 此版本...
  • Java心跳机制

    万次阅读 2016-06-09 11:19:11
    心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。 大部分CS的应用需要心跳机制。心跳机制一般在Server和Client都要实现,两者实现原理基本一样。Client不关心性能...
  • Socket长连接+心跳包+发送+读取,用到的全在这里了,自己看看哪里不需要的就不要添加了!代码很清晰很明白了!
  • 一个Socket连接管理器(心跳机制)
  • netty心跳连接代码

    2021-01-19 15:51:14
    netty心跳连接代码
  • Delphi之TClientSocket和TServerSocket进行TCP长连接通讯,使用KeepALive自动发送心跳包检测断网,并实现断线重连,经测试可以及时检测到拔掉网线断网情况,具有较高的参考价值

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 46,106
精华内容 18,442
关键字:

java心跳连接

java 订阅