精华内容
下载资源
问答
  • java 多线程 数据库访问 OOS 录入 java 多线程 数据库访问 OOS 录入
  • 多线程操作数据库文件,多线程分页读取数据
  • java 多线程操作数据库

    热门讨论 2010-10-19 13:16:33
    一个java 多线程操作数据库应用程序!!!
  • 如题, 使用java语言,sqlite数据如何解决多线程同步锁的问题
  • 众所周知,创建数据库连接需要消耗较的资源,且创建时间也较长。如果网站一天100万PV(假设每个页面都有DB读取或修改操作),程序就需要创建100万次连接,极大的浪费资源。 事实上,同一时间需要创建数据库连接的...

    本例采用mysql数据库,因此请先下载mysql-connection.jar

    在我们的实际开发中,离不开和数据库打交道。而和数据库的通信,离不开数据库连接。
    通常用JDBC连接数据库时,需要加载数据驱动,然后再通过接口返回数据库连接。
    一般分为两步:
    1、加载驱动至内存
    Class.forName(“com.mysql.jdbc.Driver”);

    2、创建并获取连接,返回的是JDBC中的Connection
    DriverManager.getConnection(url, user, password)

    示例:

    //要连接的数据库URL
    String url = "jdbc:mysql://localhost:3306/tangwmdb";
    //连接的数据库时使用的用户名
    String username = "root";
    //连接的数据库时使用的密码
    String password = "root";
    
    //1.加载驱动
    //DriverManager.registerDriver(new com.mysql.jdbc.Driver());不推荐使用这种方式来加载驱动
    Class.forName("com.mysql.jdbc.Driver");//推荐使用这种方式来加载驱动
    
    //2.获取与数据库的链接
    Connection conn = DriverManager.getConnection(url, username, password);
    
    //3.获取用于向数据库发送sql语句的statement
    Statement st = conn.createStatement();
    
    String sql = "select id,name,password from members";
    //4.向数据库发sql,并获取代表结果集的resultset
    ResultSet rs = st.executeQuery(sql);
    
    //5.取出结果集的数据
    while(rs.next()){
        System.out.println("id=" + rs.getObject("id"));
        System.out.println("name=" + rs.getObject("name"));
        System.out.println("password=" + rs.getObject("password"));
    }
    
    //6.关闭链接,释放资源
    rs.close();
    st.close();
    conn.close();

    众所周知,创建数据库连接需要消耗较多的资源,且创建时间也较长。如果网站一天100万PV(假设每个页面都有DB读取或修改操作),程序就需要创建100万次连接,极大的浪费资源。
    事实上,同一时间需要创建数据库连接的请求数量并不多,一般几百个足够了。那么我们可以根据需要创建一个连接池,它负责分配、管理和释放数据库连接,它允许应用程序重复使用同一个现有的数据库连接,而不是重新建立一个。这里用到了设计模式中的一个模式:享元模式(Flyweight)
    比如我们的连接池中有1000条连接,请求来时,连接池从池中分配一条给请求,用完后收回,而不是销毁,等到下次有请求来时,又可以重复分配使用。
    这里写图片描述

    当使用了数据库连接池之后,在项目的实际开发中就不需要编写连接数据库的代码了,直接从数据源获得数据库的连接。比如:

    //DBCP 数据库连接池
    DataSource ds = BasicDataSourceFactory.createDataSource(prop);
    Connection conn = ds.getConnection();

    可以看到创建连接的工作很简单,因为复杂的分配、回收功能都交给了连接池去处理。

    当前有一些开源的数据连接池实现:

    • DBCP 数据库连接池
    • C3P0 数据库连接池

    另外阿里开源项目Druid(整个项目由数据库连接池、插件框架和SQL解析器组成)中的数据库连接池被很多互联网公司都采用在生产环境中。

    编写自己的数据库连接池

    编写的连接池需要做到以下几个基本点:
    1、可配置并管理多个连接节点的连接池
    这里写图片描述

    2、始使化时根据配置中的初始连接数创建指定数量的连接
    3、在连接池没有达到最大连接数之前,如果有可用的空闲连接就直接使用空闲连接,如果没有,就创建新的连接。
    4、当连接池中的活动连接数达到最大连接数,新的请求进入等待状态,直到有连接被释放。
    5、由于数据库连接闲置久了会超时关闭,因此需要连接池采用机制保证每次请求的连接都是有效可用的。
    6、线程安全
    7、连接池内部要保证指定最小连接数量的空闲连接。
    对于最小连接数在实际应用中的效果以及与初始连接数的区别,其实理解的不是很透。在程序中我采用的方式是,如果 活动连接数 + 空闲连接数 < 最小连接数,就补齐对应数量(最小连接数 - 活动连接数 - 空闲连接数)的空闲连接

    摘录一段:

    数据库连接池的最小连接数和最大连接数的设置要考虑到以下几个因素:
    最小连接数:是连接池一直保持的数据库连接,所以如果应用程序对数据库连接的使用量不大,将会有大量的数据库连接资源被浪费。
    最大连接数:是连接池能申请的最大连接数,如果数据库连接请求超过次数,后面的数据库连接请求将被加入到等待队列中,这会影响以后的数据库操作。
    如果最小连接数与最大连接数相差很大,那么最先连接请求将会获利,之后超过最小连接数量的连接请求等价于建立一个新的数据库连接。不过,这些大于最小连接数的数据库连接在使用完不会马上被释放,它将被放到连接池中等待重复使用或是超时后被释放。

    系统结构:

    1.连接池接口IConnectionPool:里面定义一些基本的获取连接的一些方法。
    2.连接池接口实现ConnectionPool
    3.连接池管理DBConnectionManager:管理不同的连接池,所有的连接都是通过这里获得。
    4.其它工具类,诸如属性读取类PropertiesManager,属性保存类DBPropertyBean。
    这里写图片描述

    工程结构:

    这里写图片描述

    工程代码:

    DBPropertyBean.java

    package com.twm.TDBConnectionPool;
    public class DBPropertyBean {
    
        private String nodeName;
        //数据连接驱动
        private String driverName;
        //数据连接url
        private String url;
        //数据连接username
        private String username;
        //数据连接密码
        private String password;
        //连接池最大连接数
        private int maxConnections ;
        //连接池最小连接数
        private int minConnections;
        //连接池初始连接数
        private int initConnections;
        //重连间隔时间 ,单位毫秒
        private int conninterval ;
        //获取连接超时时间 ,单位毫秒,0永不超时
        private int timeout ;
    
        //构造方法
        public DBPropertyBean(){
            super();
        }
    
        //下面是getter and setter
    
        /**
         * 获取数据库连接节点名称
         * @return
         */
        public String getNodeName() {
            return nodeName;
        }
    
        /**
         * 设置数据库连接节点名称
         * @param nodeName
         */
        public void setNodeName(String nodeName) {
            this.nodeName = nodeName;
        }
    
        /**
         * 获取数据库驱动
         * @return
         */
        public String getDriverName() {
            return driverName;
        }
    
        /**
         * 设置数据库驱动
         * @param driverName
         */
        public void setDriverName(String driverName) {
            this.driverName = driverName;
        }
    
        /**
         * 获取数据库url
         * @return
         */
        public String getUrl() {
            return url;
        }
    
        /**
         * 设置数据库url
         * @param url
         */
        public void setUrl(String url) {
            this.url = url;
        }
    
        /**
         * 获取用户名
         * @return
         */
        public String getUsername() {
            return username;
        }
    
        /**
         * 设置用户名
         * @param username
         */
        public void setUsername(String username) {
            this.username = username;
        }
    
        /**
         * 获取数据库连接密码
         * @return
         */
        public String getPassword(){
            return password;
        }
    
        /**
         * 设置数据库连接密码
         * @param password
         */
        public void setPassword(String password) {
            this.password = password;
        }
    
        /**
         * 获取最大连接数
         * @return
         */
        public int getMaxConnections() {
            return maxConnections;
        }
    
        /**
         * 设置最大连接数
         * @param maxConnections
         */
        public void setMaxConnections(int maxConnections) {
            this.maxConnections = maxConnections;
        }
    
        /**
         * 获取最小连接数(也是数据池初始连接数)
         * @return
         */
        public int getMinConnections() {
            return minConnections;
        }
    
        /**
         * 设置最小连接数(也是数据池初始连接数)
         * @param minConnections
         */
        public void setMinConnections(int minConnections) {
            this.minConnections = minConnections;
        }
    
        /**
         * 获取初始加接数
         * @return
         */
        public int getInitConnections() {
            return initConnections;
        }
    
        /**
         * 设置初始连接数
         * @param initConnections
         */
        public void setInitConnections(int initConnections) {
            this.initConnections = initConnections;
        }
    
        /**
         * 获取重连间隔时间,单位毫秒
         * @return
         */
        public int getConninterval() {
            return conninterval;
        }
    
        /**
         * 设置重连间隔时间,单位毫秒
         * @param conninterval
         */
        public void setConninterval(int conninterval) {
            this.conninterval = conninterval;
        }
    
        /**
         * 获取连接超时时间,单位毫秒
         * @return
         */
        public int getTimeout() {
            return timeout;
        }
    
        /**
         * 设置连接超时时间 ,单位毫秒,0-无限重连
         * @param timeout
         */
        public void setTimeout(int timeout) {
            this.timeout = timeout;
        }
    
    }

    IConnectionPool.java

    package com.twm.TDBConnectionPool;
    
    import java.sql.Connection;
    import java.sql.SQLException;
    
    public interface IConnectionPool {
        /**
         * 获取一个数据库连接,如果等待超过超时时间,将返回null
         * @return 数据库连接对象
         */
        public Connection getConnection();
    
        /**
         * 获得当前线程的连接库连接
         * @return 数据库连接对象
         */
        public Connection getCurrentConnecton();
    
        /**
         * 释放当前线程数据库连接
         * @param conn 数据库连接对象
         * @throws SQLException
         */
        public void releaseConn(Connection conn) throws SQLException;
    
        /**
         * 销毁清空当前连接池
         */
        public void destroy();
    
        /**
         * 连接池可用状态
         * @return 连接池是否可用
         */
        public boolean isActive();
    
        /**
         * 定时器,检查连接池
         */
        public void checkPool();
    
        /**
         * 获取线程池活动连接数
         * @return 线程池活动连接数
         */
        public int getActiveNum();
    
        /**
         * 获取线程池空闲连接数
         * @return 线程池空闲连接数
         */
        public int getFreeNum();
    }

    ConnectionPool.java

    package com.twm.TDBConnectionPool;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.sql.Connection;
    import java.sql.Driver;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.TimerTask;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.log4j.Logger;
    
    
    /** 
     * 类说明 :友元类,包内可见,不提供给客户程序直接访问。
     */
    class ConnectionPool implements IConnectionPool {
    
        private static final Logger log = Logger.getLogger(ConnectionPool.class);
    
        private DBPropertyBean propertyBean=null;
    
        //连接池可用状态
        private Boolean isActive = true;
    
        // 空闲连接池 。由于List读写频繁,使用LinkedList存储比较合适
        private LinkedList<Connection> freeConnections = new LinkedList<Connection>();  
        // 活动连接池。活动连接数 <= 允许最大连接数(maxConnections)
        private LinkedList<Connection> activeConnections = new LinkedList<Connection>(); 
    
        //当前线程获得的连接
        private ThreadLocal<Connection> currentConnection= new ThreadLocal<Connection>();
    
        //构造方法无法返回null,所以取消掉。在下面增加了CreateConnectionPool静态方法。
        private ConnectionPool(){
            super();
        }
    
        public static ConnectionPool CreateConnectionPool(DBPropertyBean propertyBean) {
            ConnectionPool connpool=new ConnectionPool();
            connpool.propertyBean = propertyBean;
    
            //加载驱动 
    
            //在多节点环境配置下,因为在这里无法判断驱动是否已经加载,可能会造成多次重复加载相同驱动。
            //因此加载驱动的动作,挪到connectionManager管理类中去实现了。
            /*try {
                Class.forName(connpool.propertyBean.getDriverName());
                log.info("加载JDBC驱动"+connpool.propertyBean.getDriverName()+"成功");
            } catch (ClassNotFoundException e) {
                log.info("未找到JDBC驱动" + connpool.propertyBean.getDriverName() + ",请引入相关包");
                return null;
            }*/
    
            //基本点2、始使化时根据配置中的初始连接数创建指定数量的连接
            for (int i = 0; i < connpool.propertyBean.getInitConnections(); i++) {
                try {
                    Connection conn = connpool.NewConnection();
                    connpool.freeConnections.add(conn);
                } catch (SQLException | ClassNotFoundException e) {
                    log.error(connpool.propertyBean.getNodeName()+"节点连接池初始化失败");
                    return null;
                }
            }
    
            connpool.isActive = true;
            return connpool;
        }
    
    
    
        /**
         * 检测连接是否有效
         * @param 数据库连接对象
         * @return Boolean
         */
        private Boolean isValidConnection(Connection conn) throws SQLException{
            try {
                if(conn==null || conn.isClosed()){
                    return false;
                }
            } catch (SQLException e) {
                throw new SQLException(e);
            }
            return true;
        }
    
        /**
         * 创建一个新的连接
         * @return 数据库连接对象
         * @throws ClassNotFoundException
         * @throws SQLException
         */
        private Connection NewConnection() throws ClassNotFoundException,
                SQLException {
    
            Connection conn = null;
            try {
                if (this.propertyBean != null) {
                    //Class.forName(this.propertyBean.getDriverName());
                    conn = DriverManager.getConnection(this.propertyBean.getUrl(),
                            this.propertyBean.getUsername(),
                            this.propertyBean.getPassword());
                }
            } catch (SQLException e) {
                throw new SQLException(e);
            }
    
    
    
            return conn;
        }
    
    
        @Override
        public synchronized Connection getConnection() {
            Connection conn = null;
            if (this.getActiveNum() < this.propertyBean.getMaxConnections()) {
                // 分支1:当前使用的连接没有达到最大连接数  
                // 基本点3、在连接池没有达到最大连接数之前,如果有可用的空闲连接就直接使用空闲连接,如果没有,就创建新的连接。
                if (this.getFreeNum() > 0) {
                    // 分支1.1:如果空闲池中有连接,就从空闲池中直接获取
                    log.info("分支1.1:如果空闲池中有连接,就从空闲池中直接获取");
                    conn = this.freeConnections.pollFirst();
    
                    //连接闲置久了也会超时,因此空闲池中的有效连接会越来越少,需要另一个进程进行扫描监测,不断保持一定数量的可用连接。
                    //在下面定义了checkFreepools的TimerTask类,在checkPool()方法中进行调用。
    
                    //基本点5、由于数据库连接闲置久了会超时关闭,因此需要连接池采用机制保证每次请求的连接都是有效可用的。
                    try {
                        if(this.isValidConnection(conn)){
                            this.activeConnections.add(conn);
                            currentConnection.set(conn);
                        }else{
                            conn = getConnection();//同步方法是可重入锁
                        }
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                } else {
                    // 分支1.2:如果空闲池中无可用连接,就创建新的连接
                    log.info("分支1.2:如果空闲池中无可用连接,就创建新的连接");
                    try {
                        conn = this.NewConnection();
                        this.activeConnections.add(conn);
                    } catch (ClassNotFoundException | SQLException e) {
                        e.printStackTrace();
                    }
                }
            } else {
                // 分支2:当前已到达最大连接数  
                // 基本点4、当连接池中的活动连接数达到最大连接数,新的请求进入等待状态,直到有连接被释放。
                log.info("分支2:当前已到达最大连接数 ");
                long startTime = System.currentTimeMillis();
    
                //进入等待状态。等待被notify(),notifyALL()唤醒或者超时自动苏醒  
                try{
                    this.wait(this.propertyBean.getConninterval());  
                }catch(InterruptedException e) {  
                    log.error("线程等待被打断");  
                }
    
                //若线程超时前被唤醒并成功获取连接,就不会走到return null。
                //若线程超时前没有获取连接,则返回null。
                //如果timeout设置为0,就无限重连。
                if(this.propertyBean.getTimeout()!=0){
                    if(System.currentTimeMillis() - startTime > this.propertyBean.getTimeout())  
                        return null;  
                }
                conn = this.getConnection();
    
            }
            return conn;
        }
    
    
        @Override
        public Connection getCurrentConnecton() {
            Connection conn=currentConnection.get();
            try {
                if(! isValidConnection(conn)){
                    conn=this.getConnection();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return conn;
        }
    
    
        @Override
        public synchronized void releaseConn(Connection conn) throws SQLException {
    
            log.info(Thread.currentThread().getName()+"关闭连接:activeConnections.remove:"+conn);
            this.activeConnections.remove(conn);
            this.currentConnection.remove();
            //活动连接池删除的连接,相应的加到空闲连接池中
            try {
                if(isValidConnection(conn)){
                    freeConnections.add(conn);
                }else{
                    freeConnections.add(this.NewConnection());
                }
    
            } catch (ClassNotFoundException | SQLException e) {
                e.printStackTrace();
            }
            //唤醒getConnection()中等待的线程
            this.notifyAll();
        }
    
        @Override
        public synchronized void destroy() {
            for (Connection conn : this.freeConnections) {  
                try {
                    if (this.isValidConnection(conn)) { 
                        conn.close();
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }   
            }  
            for (Connection conn : this.activeConnections) {  
                try {
                    if (this.isValidConnection(conn)) { 
                        conn.close();
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                } 
            }
            this.isActive = false;
            this.freeConnections.clear();
            this.activeConnections.clear();
        }
    
        @Override
        public boolean isActive() {
            return this.isActive;
        }
    
    
        @Override
        public void checkPool() {
    
            final String nodename=this.propertyBean.getNodeName();
    
            ScheduledExecutorService ses=Executors.newScheduledThreadPool(2);
    
            //功能一:开启一个定时器线程输出状态
            ses.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    System.out.println(nodename +"空闲连接数:"+getFreeNum());  
                    System.out.println(nodename +"活动连接数:"+getActiveNum());   
    
                }
            }, 1, 1, TimeUnit.SECONDS);
    
            //功能二:开启一个定时器线程,监测并维持空闲池中的最小连接数
            ses.scheduleAtFixedRate(new checkFreepools(this), 1, 5, TimeUnit.SECONDS);
        }
    
        @Override
        public synchronized int getActiveNum() {
            return this.activeConnections.size();
        }
    
        @Override
        public synchronized int getFreeNum() {
            return this.freeConnections.size();
        }
    
        //基本点6、连接池内部要保证指定最小连接数量的空闲连接
        class checkFreepools extends TimerTask {
            private ConnectionPool conpool = null;
    
            public checkFreepools(ConnectionPool cp) {
                this.conpool = cp;
            }
    
            @Override
            public void run() {
                if (this.conpool != null && this.conpool.isActive()) {
                    int poolstotalnum = conpool.getFreeNum()
                            + conpool.getActiveNum();
                    int subnum = conpool.propertyBean.getMinConnections()
                            - poolstotalnum;
    
                    if (subnum > 0) {
                        System.out.println(conpool.propertyBean.getNodeName()
                                + "扫描并维持空闲池中的最小连接数,需补充" + subnum + "个连接");
                        for (int i = 0; i < subnum; i++) {
                            try {
                                conpool.freeConnections
                                        .add(conpool.NewConnection());
                            } catch (ClassNotFoundException | SQLException e) {
                                e.printStackTrace();
                            }
                        }
    
                    }
                }
    
            }
    
        }
    
    
    
    }

    ConnectionManager.java

    package com.twm.TDBConnectionPool;
    
    import java.sql.Connection;
    import java.sql.Driver;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.Enumeration;
    import java.util.HashSet;
    import java.util.Hashtable;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Queue;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    
    import org.apache.log4j.Logger;
    import com.twm.TDBConnectionPool.util.PropertiesManager;
    
    
    public class ConnectionManager {
        private final Logger log = Logger.getLogger(ConnectionPool.class);
    
        private static ConnectionManager dbm = null;
    
        /** 
         * 加载的驱动器名称集合 
         */  
        private Set<String> drivers = new HashSet<String>(); 
    
        /**
         * 数据库连接池字典
         * 为每个节点创建一个连接池(可配置多个节点)
         */
        private ConcurrentHashMap<String, IConnectionPool> pools = new ConcurrentHashMap<String, IConnectionPool>();
    
    
    
        private ConnectionManager() {
            createPools();
        }
    
    
        /**
         * 装载JDBC驱动程序,并创建连接池
         */
        private void createPools() {
            String str_nodenames = PropertiesManager.getProperty("nodename");
            //基本点1、可配置并管理多个连接节点的连接池
            for (String str_nodename : str_nodenames.split(",")) {
                DBPropertyBean dbProperty = new DBPropertyBean();
                dbProperty.setNodeName(str_nodename);
    
                //验证url配置正确性
                String url = PropertiesManager.getProperty(str_nodename + ".url");
                if (url == null) {
                    log.error(str_nodename+"节点的连接字符串为空,请检查配置文件");
                    continue;
                }
                dbProperty.setUrl(url);
    
                //验证driver配置正确性
                String driver = PropertiesManager.getProperty(str_nodename + ".driver");
                if (driver == null) {
                    log.error(str_nodename+"节点的driver驱动为空,请检查配置文件");
                    continue;
                }
                dbProperty.setDriverName(driver);
    
    
                //验证user配置正确性
                String user = PropertiesManager.getProperty(str_nodename + ".user");
                if (user == null) {
                    log.error(str_nodename+"节点的用户名设置为空,请检查配置文件");
                    continue;
                }
                dbProperty.setUsername(user);
    
    
                //验证password配置正确性
                String password = PropertiesManager.getProperty(str_nodename + ".password");
                if (password == null) {
                    log.error(str_nodename+"节点的密码设置为空,请检查配置文件");
                    continue;
                }
                dbProperty.setPassword(password);
    
                //验证最小连接数配置正确性
                String str_minconnections=PropertiesManager.getProperty(str_nodename + ".minconnections");
                int minConn;
                try {
                    minConn = Integer.parseInt(str_minconnections);
                } catch (NumberFormatException e) {
                    log.error(str_nodename + "节点最小连接数设置错误,默认设为5");
                    minConn=5;
                }
                dbProperty.setMinConnections(minConn);
    
                //验证初始连接数配置正确性
                String str_initconnections=PropertiesManager.getProperty(str_nodename + ".initconnections");
                int initConn;
                try {
                    initConn = Integer.parseInt(str_initconnections);
                } catch (NumberFormatException e) {
                    log.error(str_nodename + "节点初始连接数设置错误,默认设为5");
                    initConn=5;
                }
                dbProperty.setInitConnections(initConn);
    
                //验证最大连接数配置正确性
                String str_maxconnections=PropertiesManager.getProperty(str_nodename + ".maxconnections");
                int maxConn;
                try {
                    maxConn = Integer.parseInt(str_maxconnections);
                } catch (NumberFormatException e) {
                    log.error(str_nodename + "节点最大连接数设置错误,默认设为20");
                    maxConn=20;
                }
                dbProperty.setMaxConnections(maxConn);
    
                //验证conninterval配置正确性
                String str_conninterval=PropertiesManager.getProperty(str_nodename + ".conninterval");
                int conninterval;
                try {
                    conninterval = Integer.parseInt(str_conninterval);
                } catch (NumberFormatException e) {
                    log.error(str_nodename + "节点重新连接间隔时间设置错误,默认设为500ms");
                    conninterval = 500;
                }
                dbProperty.setConninterval(conninterval);
    
                //验证timeout配置正确性
                String str_timeout=PropertiesManager.getProperty(str_nodename + ".timeout");
                int timeout;
                try {
                    timeout = Integer.parseInt(str_timeout);
                } catch (NumberFormatException e) {
                    log.error(str_nodename + "节点连接超时时间设置错误,默认设为2000ms");
                    timeout = 2000;
                }
                dbProperty.setTimeout(timeout);
    
                //创建驱动
                if(!drivers.contains(dbProperty.getDriverName())){
                    try {
                        Class.forName(dbProperty.getDriverName());
                        log.info("加载JDBC驱动"+dbProperty.getDriverName()+"成功");
                        drivers.add(dbProperty.getDriverName());
                    } catch (ClassNotFoundException e) {
                        log.error("未找到JDBC驱动" + dbProperty.getDriverName() + ",请引入相关包");
                        e.printStackTrace();
                    }
                }
    
                //创建连接池。这里采用同步方法实现的连接池类ConnectionPool。
                //(如果后面我们还有别的实现方式,只需要更改这里就行了。)
                IConnectionPool cp = ConnectionPool.CreateConnectionPool(dbProperty);
                if (cp != null) {
                    pools.put(str_nodename, cp);
                    cp.checkPool();
                    log.info("创建" + str_nodename + "数据库连接池成功");
                } else {
                    log.info("创建" + str_nodename + "数据库连接池失败");
                }
            }
    
        }
    
        /**
         * 获得单例
         * 
         * @return DBConnectionManager单例
         */
        public synchronized static ConnectionManager getInstance() {
            if (dbm == null) {
                dbm = new ConnectionManager();
            }
            return dbm;
        }
    
        /**
         * 从指定连接池中获取可用连接
         * 
         * @param poolName要获取连接的连接池名称
         * @return连接池中的一个可用连接或null
         */
        public Connection getConnection(String poolName) {
            IConnectionPool pool =  pools.get(poolName);
            return pool.getConnection();
        }
    
    
        /**
         * 回收指定连接池的连接
         * 
         * @param poolName连接池名称
         * @param conn要回收的连接
         */
        public void closeConnection(String poolName, Connection conn) throws SQLException {
            IConnectionPool pool = pools.get(poolName);
            if (pool != null) {
                try {
                    pool.releaseConn(conn);
                } catch (SQLException e) {
                    log.error("回收"+poolName+"池中的连接失败。");
                    throw new SQLException(e);
                }
            }else{
                log.error("找不到"+poolName+"连接池,无法回收");
            }
        }
    
        /**
         * 关闭所有连接,撤销驱动器的注册
         */
        public void destroy() {
            for (Map.Entry<String, IConnectionPool> poolEntry : pools.entrySet()) {
                IConnectionPool pool = poolEntry.getValue();
                pool.destroy();
            }
            log.info("已经关闭所有连接");
        }
    }

    PropertiesManager.java

    package com.twm.TDBConnectionPool.util;
    
    import java.io.IOException;
    import java.util.Enumeration;
    import java.util.Properties;
    
    
    public class PropertiesManager {
        private static Properties pro = new Properties();
    
        private PropertiesManager() {
    
        }
    
        static {
            try {
                pro.load(PropertiesManager.class.getClassLoader().getResourceAsStream("DB.properties"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public static String getProperty(String key) {
            return pro.getProperty(key);
        }
    
        public static String getProperty(String key, String defaultValue) {
            return pro.getProperty(key, defaultValue);
        }
    
        public static Enumeration<?> propertiesNames() {
            return pro.propertyNames();
        }
    }

    testpool.java

    package com.twm.TDBConnectionPool.run;
    
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.Enumeration;
    import java.util.Iterator;
    import java.util.List;
    
    import com.twm.TDBConnectionPool.ConnectionManager;
    import com.twm.TDBConnectionPool.DBPropertyBean;
    import com.twm.TDBConnectionPool.IConnectionPool;
    import com.twm.TDBConnectionPool.util.PropertiesManager;
    
    
    public class testpool {
    
        /**
         * @param args
         * @throws InterruptedException 
         */
        public static void main(String[] args) throws InterruptedException {
    
            List<Thread> threadlist=new ArrayList<Thread>();
            for(int i=1;i<=3;i++){
                Thread subThread = new Thread(new workrun(i));
                subThread.start();
                threadlist.add(subThread);
            }
            for (Iterator<Thread> iterator = threadlist.iterator(); iterator.hasNext();) {
                Thread thread = iterator.next();
                thread.join();
            }
            //ConnectionManager.getInstance().destroy();
    
        }
    
    }
    class workrun implements Runnable{
        int i;
        public workrun(int i){
            this.i=i;
        }
        @Override
        public void run() {
            ConnectionManager cm = ConnectionManager.getInstance();
    
             //1.从数据池中获取数据库连接
            Connection conn = cm.getConnection("default");
            System.out.println("线程 " + Thread.currentThread().getName() + "获得连接:" + conn);
    
            //模拟查询耗时操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
    
             //2.获取用于向数据库发送sql语句的statement
            Statement st = null;
            try {
                st = conn.createStatement();
            } catch (SQLException e) {
                e.printStackTrace();
            }
    
            String sql = "select * from product where id="+i;
            //3.向数据库发sql,并获取代表结果集的resultset
            //4.取出结果集的数据
            ResultSet rs = null;
            try {
                rs = st.executeQuery(sql);
                while(rs.next()){
                    System.out.println("productname=" + rs.getObject("productname"));
                    //System.out.println("price=" + rs.getObject("price"));
                    //System.out.println("cateid=" + rs.getObject("cateid"));
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
    
            //模拟查询耗时操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            //5.关闭链接,释放资源
            try {
                rs.close();
                st.close();
                //conn.close();
                cm.closeConnection("default",conn);
            } catch (SQLException e) {
                e.printStackTrace();
            }
    
        }
    
    }

    DB.properties

    nodename=default,testdb
    
    default.driver=com.mysql.jdbc.Driver
    default.url=jdbc:mysql://localhost:3306/tangwenmingdb
    default.user=root
    default.password=root
    default.maxconnections=10
    default.minconnections=5
    default.initconnections=2
    default.conninterval=500
    default.timeout = 5000
    
    testdb.driver=com.mysql.jdbc.Driver
    testdb.url=jdbc:mysql://192.168.1.17:3306/test
    testdb.user=root
    testdb.password=root
    testdb.maxconnections=10
    testdb.minconnections=2
    testdb.initconnections=2
    testdb.conninterval= 500
    testdb.timeout = 5000

    product表结构和数据:

    /*!40101 SET NAMES utf8 */;
    create table `product` (
        `id` int (10),
        `productname` varchar (150),
        `price` Decimal (12),
        `cateid` int (11)
    ); 
    insert into `product` (`id`, `productname`, `price`, `cateid`) values('1','iphone 8 plus','5700.00','1');
    insert into `product` (`id`, `productname`, `price`, `cateid`) values('2','华为G7','1700.00','1');
    insert into `product` (`id`, `productname`, `price`, `cateid`) values('3','HTC','1200.00','1');

    这里写图片描述

    至此,我们用同步方法synchronized实现了一个简单的数据库连接池。大家可以试着调整DB.properties里的参数配置(比如maxconnections,minconnections,initconnections),观察控制台的输出变化。也可以调整程序中的thread.sleep时长观察。


    代理Connection

    现在来做一个优化,各位一定要看完,这里搞得我很痛苦,也留下一个未解决的问题。希望大家帮忙。
    通常,用户在使用完毕数据库连接后会调用conn.close()关闭连接,而不是调用我们程序中的ConnectionManager.closeConnection("default",conn);方法关闭。
    为了让conn.close和我们自己写的closeConnection()方法保持一致的效果,就必须要改写connection的close()方法,在这个场景下,很容易想到使用代理方式。这里先采用动态代理方式来实现:

    通过改造ConnectionPool类:
    1、在ConnectionPool类中添加一个内部InvocationHandler实现类(和checkFreepools类平级的)

    class ConProxyHandler implements InvocationHandler{
            private Connection conn;
            private ConnectionPool cp;
    
            public ConProxyHandler(ConnectionPool cp, Connection conn) {
                this.cp = cp;
                this.conn = conn;
            }
    
            @Override
    
            public Object invoke(Object proxy, Method method, Object[] args)
                    throws Throwable {
                if(method.getName().equals("close")){
                    cp.releaseConn(this.conn);
                    return null;
                }else{
                    //如果不是调用close方法,就原样处理
                    return method.invoke(this.conn, args);
                }
            }
    
        }

    2、改造ConnectionPool类的NewConnection()方法,在return conn之前,加上一段

    if (conn != null) {
        ConProxyHandler handler = new ConProxyHandler(this, conn);
        Connection conn_return = (Connection) Proxy.newProxyInstance(
                ConnectionPool.class.getClassLoader(),
                new Class[] { Connection.class }, handler);
        System.out.println("生成代理类:" + conn_return);
        return conn_return;
    }

    最后在调用的testpool.java中把

    cm.closeConnection("default",conn);

    改为:

    conn.close();

    然后运行,前面都正常,到最后出问题了:
    当三个线程分别关闭连接后,空闲连接数加上去了,但活动连接数并没有相应的减少。
    心碎。。。。到底哪出问题了呢?

    打断点跟踪发现this.activeConnections.remove(conn);这句并没有成功移除conn。
    然后开始检查:
    1、打印ConProxyHandler类中的cp和ConnectionPool类中CreateConnectionPool()方法里的connpool,发现hashcode是一致的
    2、在ConProxyHandler类中的invoke方法中加上对equals方法的处理。发现压根不会进到这里。
    3、跟踪到this.activeConnections.remove(conn);
    发现this.activeConnections里保存了三个连接类
    这里写图片描述

    并且传入参数conn的hashcode和this.activeConnections中其中一个连接类是一致的

    这里写图片描述

    但是运行完this.activeConnections.remove(conn)后到下句,发现this.activeConnections里依然存在com.mysql.jdbc.JDBC4Connection@109cdd5对象。

    为什么删不掉呢?
    太打击积极性了,也不知道怎么调试查看原因。开始怀疑自己还适合做开发吗?

    即然删不掉,那就将this.activeConnections改成hashmap,用key来删掉。
    改动ConnectionPool .java如下:

    package com.twm.TDBConnectionPool;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.sql.Connection;
    import java.sql.Driver;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.Hashtable;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.TimerTask;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.log4j.Logger;
    
    
    /** 
     * @author TangWenming  E-mail:soonfly#gmail.com
     * @version 创建时间:2017-5-16 下午5:49:17 
     * 类说明 :友元类,包内可见,不提供给客户程序直接访问。
     */
    class ConnectionPool implements IConnectionPool {
    
        private static final Logger log = Logger.getLogger(ConnectionPool.class);
    
        private DBPropertyBean propertyBean=null;
    
        //连接池可用状态
        private Boolean isActive = true;
    
        // 空闲连接池 。
        private Hashtable<Integer,Connection> freeConnections = new Hashtable<Integer,Connection>();  
        // 活动连接池。
        private Hashtable<Integer,Connection> activeConnections = new Hashtable<Integer,Connection>(); 
    
        //当前线程获得的连接
        private ThreadLocal<Connection> currentConnection= new ThreadLocal<Connection>();
    
        //构造方法无法返回null,所以取消掉。在下面增加了CreateConnectionPool静态方法。
        private ConnectionPool(){
            super();
        }
    
        public static ConnectionPool CreateConnectionPool(DBPropertyBean propertyBean) {
            ConnectionPool connpool=new ConnectionPool();
            connpool.propertyBean = propertyBean;
    
            //加载驱动 
    
            //在多节点环境配置下,因为在这里无法判断驱动是否已经加载,可能会造成多次重复加载相同驱动。
            //因此加载驱动的动作,挪到connectionManager管理类中去实现了。
            /*try {
                Class.forName(connpool.propertyBean.getDriverName());
                log.info("加载JDBC驱动"+connpool.propertyBean.getDriverName()+"成功");
            } catch (ClassNotFoundException e) {
                log.info("未找到JDBC驱动" + connpool.propertyBean.getDriverName() + ",请引入相关包");
                return null;
            }*/
    
            //基本点2、始使化时根据配置中的初始连接数创建指定数量的连接
            for (int i = 0; i < connpool.propertyBean.getInitConnections(); i++) {
                try {
                    Connection conn = connpool.NewConnection();
                    connpool.freeConnections.put(conn.hashCode(), conn);
                } catch (SQLException | ClassNotFoundException e) {
                    log.error(connpool.propertyBean.getNodeName()+"节点连接池初始化失败");
                    return null;
                }
            }
    
            connpool.isActive = true;
            return connpool;
        }
    
    
    
        /**
         * 检测连接是否有效
         * @param 数据库连接对象
         * @return Boolean
         */
        private Boolean isValidConnection(Connection conn) throws SQLException{
            try {
                if(conn==null || conn.isClosed()){
                    return false;
                }
            } catch (SQLException e) {
                throw new SQLException(e);
            }
            return true;
        }
    
        /**
         * 创建一个新的连接
         * @return 数据库连接对象
         * @throws ClassNotFoundException
         * @throws SQLException
         */
        private Connection NewConnection() throws ClassNotFoundException,
                SQLException {
    
            Connection conn = null;
            try {
                if (this.propertyBean != null) {
                    //Class.forName(this.propertyBean.getDriverName());
                    conn = DriverManager.getConnection(this.propertyBean.getUrl(),
                            this.propertyBean.getUsername(),
                            this.propertyBean.getPassword());
                }
            } catch (SQLException e) {
                throw new SQLException(e);
            }
    
            if (conn != null) {
                ConProxyHandler handler = new ConProxyHandler(this, conn);
                Connection conn_return = (Connection) Proxy.newProxyInstance(
                        ConnectionPool.class.getClassLoader(),
                        new Class[] { Connection.class }, handler);
                //System.out.println("生成代理类:" + conn_return);
                return conn_return;
            }
    
            return conn;
        }
    
    
        @Override
        public synchronized Connection getConnection() {
            Connection conn = null;
            if (this.getActiveNum() < this.propertyBean.getMaxConnections()) {
                // 分支1:当前使用的连接没有达到最大连接数  
                // 基本点3、在连接池没有达到最大连接数之前,如果有可用的空闲连接就直接使用空闲连接,如果没有,就创建新的连接。
                if (this.getFreeNum() > 0) {
                    // 分支1.1:如果空闲池中有连接,就从空闲池中直接获取
                    int key = (int) this.freeConnections.keySet().toArray()[0];
                    log.info("分支1.1:如果空闲池中有连接,"+Thread.currentThread().getName()+"就从空闲池中直接获取"+key);
                    //conn = this.freeConnections.pollFirst();
    
                    conn = this.freeConnections.get(key);
                    this.freeConnections.remove(key);
    
    
                    //连接闲置久了也会超时,因此空闲池中的有效连接会越来越少,需要另一个进程进行扫描监测,不断保持一定数量的可用连接。
                    //在下面定义了checkFreepools的TimerTask类,在checkPool()方法中进行调用。
    
                    //基本点5、由于数据库连接闲置久了会超时关闭,因此需要连接池采用机制保证每次请求的连接都是有效可用的。
                    try {
                        if(this.isValidConnection(conn)){
                            this.activeConnections.put(conn.hashCode(), conn);
                            currentConnection.set(conn);
                        }else{
                            conn = getConnection();//同步方法是可重入锁
                        }
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
    
                    System.out.println(this.propertyBean.getNodeName() +"空闲连接数:"+getFreeNum());  
                    System.out.println(this.propertyBean.getNodeName() +"活动连接数:"+getActiveNum());
                } else {
    
                    try {
                        conn = this.NewConnection();
                        // 分支1.2:如果空闲池中无可用连接,就创建新的连接
                        log.info("分支1.2:如果空闲池中无可用连接,"+Thread.currentThread().getName()+"就创建新的连接"+conn.hashCode());
                        this.activeConnections.put(conn.hashCode(), conn);
                    } catch (ClassNotFoundException | SQLException e) {
                        e.printStackTrace();
                    }
                    System.out.println(this.propertyBean.getNodeName() +"空闲连接数:"+getFreeNum());  
                    System.out.println(this.propertyBean.getNodeName() +"活动连接数:"+getActiveNum());
                }
            } else {
                // 分支2:当前已到达最大连接数  
                // 基本点4、当连接池中的活动连接数达到最大连接数,新的请求进入等待状态,直到有连接被释放。
                log.info("分支2:"+Thread.currentThread().getName()+":当前已到达最大连接数 ");
                long startTime = System.currentTimeMillis();
    
                //进入等待状态。等待被notify(),notifyALL()唤醒或者超时自动苏醒  
                try{
                    this.wait(this.propertyBean.getConninterval());  
                }catch(InterruptedException e) {  
                    log.error("线程等待被打断");  
                }
    
                //若线程超时前被唤醒并成功获取连接,就不会走到return null。
                //若线程超时前没有获取连接,则返回null。
                //如果timeout设置为0,就无限重连。
                if(this.propertyBean.getTimeout()!=0){
                    if(System.currentTimeMillis() - startTime > this.propertyBean.getTimeout())  
                        return null;  
                }
                conn = this.getConnection();
    
            }
            return conn;
        }
    
    
        @Override
        public Connection getCurrentConnecton() {
            Connection conn=currentConnection.get();
            try {
                if(! isValidConnection(conn)){
                    conn=this.getConnection();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return conn;
        }
    
    
        @Override
        public synchronized void releaseConn(Connection conn) throws SQLException {
    
            log.info(Thread.currentThread().getName()+"关闭连接:activeConnections.remove:"+conn.hashCode());
            this.activeConnections.remove(conn.hashCode());
            this.currentConnection.remove();
            //活动连接池删除的连接,相应的加到空闲连接池中
            try {
                if(isValidConnection(conn)){
                    freeConnections.put(conn.hashCode(), conn);
                }else{
                    Connection newconn = this.NewConnection();
                    freeConnections.put(newconn.hashCode(), newconn);
                }
    
            } catch (ClassNotFoundException | SQLException e) {
                e.printStackTrace();
            }
            System.out.println(this.propertyBean.getNodeName() +"空闲连接数:"+getFreeNum());  
            System.out.println(this.propertyBean.getNodeName() +"活动连接数:"+getActiveNum());
            //唤醒getConnection()中等待的线程
            this.notifyAll();
        }
    
        @Override
        public synchronized void destroy() {
            for(int key:this.freeConnections.keySet()){
                Connection conn = this.freeConnections.get(key);
                try {
                    if (this.isValidConnection(conn)) { 
                        conn.close();
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            for(int key:this.activeConnections.keySet()){
                Connection conn = this.activeConnections.get(key);
                try {
                    if (this.isValidConnection(conn)) { 
                        conn.close();
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            this.isActive = false;
            this.freeConnections.clear();
            this.activeConnections.clear();
        }
    
        @Override
        public boolean isActive() {
            return this.isActive;
        }
    
    
        @Override
        public void checkPool() {
    
            /*final String nodename=this.propertyBean.getNodeName();
    
            ScheduledExecutorService ses=Executors.newScheduledThreadPool(2);
    
            //功能一:开启一个定时器线程输出状态
            ses.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    System.out.println(nodename +"空闲连接数:"+getFreeNum());  
                    System.out.println(nodename +"活动连接数:"+getActiveNum());   
    
                }
            }, 1, 1, TimeUnit.SECONDS);
    
            //功能二:开启一个定时器线程,监测并维持空闲池中的最小连接数
            ses.scheduleAtFixedRate(new checkFreepools(this), 1, 5, TimeUnit.SECONDS);*/
        }
    
        @Override
        public synchronized int getActiveNum() {
            return this.activeConnections.size();
        }
    
        @Override
        public synchronized int getFreeNum() {
            return this.freeConnections.size();
        }
    
        //基本点6、连接池内部要保证指定最小连接数量的空闲连接
        class checkFreepools extends TimerTask {
            private ConnectionPool conpool = null;
    
            public checkFreepools(ConnectionPool cp) {
                this.conpool = cp;
            }
    
            @Override
            public void run() {
                if (this.conpool != null && this.conpool.isActive()) {
                    int poolstotalnum = conpool.getFreeNum()
                            + conpool.getActiveNum();
                    int subnum = conpool.propertyBean.getMinConnections()
                            - poolstotalnum;
    
                    if (subnum > 0) {
                        System.out.println(conpool.propertyBean.getNodeName()
                                + "扫描并维持空闲池中的最小连接数,需补充" + subnum + "个连接");
                        for (int i = 0; i < subnum; i++) {
                            try {
                                Connection newconn = conpool.NewConnection();
                                conpool.freeConnections.put(newconn.hashCode(), newconn);
                            } catch (ClassNotFoundException | SQLException e) {
                                e.printStackTrace();
                            }
                        }
    
                    }
                }
    
            }
    
        }
    
        class ConProxyHandler implements InvocationHandler{
            private Connection conn;
            private ConnectionPool cp;
    
            public ConProxyHandler(ConnectionPool cp, Connection conn) {
                this.cp = cp;
                this.conn = conn;
            }
    
            @Override
    
            public Object invoke(Object proxy, Method method, Object[] args)
                    throws Throwable {
                if(method.getName().equals("close")){
                    cp.releaseConn(this.conn);
                    return this.conn;
                }else{
                            return true;
                        }
                    }else {
                        return true;
                    }
                }else{
                    //如果不是调用close方法,就原样处理
                    return method.invoke(this.conn, args);
                }
            }
    
        }
    
    }

    再次运行。。。。又出西西了。发现如果某个线程获取到的是另一个线程使用过的connection,就无法调用close方法正常关闭。

    可以把下面这几个参数值改动一下,很快就见效:

    default.maxconnections=2
    default.minconnections=1
    default.initconnections=1

    运行程序后,可以看到最后获取到连接的线程,并没能成功调用con.close()。

    至此,数据库连接池是成功实现了。
    但是最后优化过程中使用代理Connection对象的方式并没有成功。希望大家帮我一起分析一下是什么原因导致的,可以留言,也可站内信给我。主要为了弄明白到底是怎么回事,不甚感激!

    展开全文
  • 万字图解Java多线程

    万次阅读 多人点赞 2020-09-06 14:45:07
    java多线程我个人觉得是javaSe中最难的一部分,我以前也是感觉学会了,但是真正有多线程的需求却不知道怎么下手,实际上还是对多线程这块知识了解不深刻,不知道多线程api的应用场景,不知道多线程的运行流程等等,...

    前言

    授权Java面试者精选独家原创发布

    java多线程我个人觉得是javaSe中最难的一部分,我以前也是感觉学会了,但是真正有多线程的需求却不知道怎么下手,实际上还是对多线程这块知识了解不深刻,不知道多线程api的应用场景,不知道多线程的运行流程等等,本篇文章将使用实例+图解+源码的方式来解析java多线程。

    文章篇幅较长,大家也可以有选择的看具体章节,建议多线程的代码全部手敲,永远不要相信你看到的结论,自己编码后运行出来的,才是自己的。

    什么是java多线程?

    进程与线程

    进程

    • 当一个程序被运行,就开启了一个进程, 比如启动了qq,word
    • 程序由指令和数据组成,指令要运行,数据要加载,指令被cpu加载运行,数据被加载到内存,指令运行时可由cpu调度硬盘、网络等设备

    线程

    • 一个进程内可分为多个线程
    • 一个线程就是一个指令流,cpu调度的最小单位,由cpu一条一条执行指令

    并行与并发

    并发:单核cpu运行多线程时,时间片进行很快的切换。线程轮流执行cpu

    并行:多核cpu运行 多线程时,真正的在同一时刻运行

    java提供了丰富的api来支持多线程。

    为什么用多线程?

    多线程能实现的都可以用单线程来完成,那单线程运行的好好的,为什么java要引入多线程的概念呢?

    多线程的好处:

    1. 程序运行的更快!快!快!

    2. 充分利用cpu资源,目前几乎没有线上的cpu是单核的,发挥多核cpu强大的能力

    多线程难在哪里?

    单线程只有一条执行线,过程容易理解,可以在大脑中清晰的勾勒出代码的执行流程

    多线程却是多条线,而且一般多条线之间有交互,多条线之间需要通信,一般难点有以下几点

    1. 多线程的执行结果不确定,受到cpu调度的影响
    2. 多线程的安全问题
    3. 线程资源宝贵,依赖线程池操作线程,线程池的参数设置问题
    4. 多线程执行是动态的,同时的,难以追踪过程
    5. 多线程的底层是操作系统层面的,源码难度大

    有时候希望自己变成一个字节穿梭于服务器中,搞清楚来龙去脉,就像无敌破坏王一样(没看过这部电影的可以看下,脑洞大开)。

    java多线程的基本使用

    定义任务、创建和运行线程

    任务: 线程的执行体。也就是我们的核心代码逻辑

    定义任务

    1. 继承Thread类 (可以说是 将任务和线程合并在一起)
    2. 实现Runnable接口 (可以说是 将任务和线程分开了)
    3. 实现Callable接口 (利用FutureTask执行任务)

    Thread实现任务的局限性

    1. 任务逻辑写在Thread类的run方法中,有单继承的局限性
    2. 创建多线程时,每个任务有成员变量时不共享,必须加static才能做到共享

    Runnable和Callable解决了Thread的局限性

    但是Runbale相比Callable有以下的局限性

    1. 任务没有返回值
    2. 任务无法抛异常给调用方

    如下代码 几种定义线程的方式

    @Slf4j
    class T extends Thread {
        @Override
        public void run() {
            log.info("我是继承Thread的任务");
        }
    }
    @Slf4j
    class R implements Runnable {
    
        @Override
        public void run() {
            log.info("我是实现Runnable的任务");
        }
    }
    @Slf4j
    class C implements Callable<String> {
    
        @Override
        public String call() throws Exception {
            log.info("我是实现Callable的任务");
            return "success";
        }
    }
    

    创建线程的方式

    1. 通过Thread类直接创建线程
    2. 利用线程池内部创建线程

    启动线程的方式

    • 调用线程的start()方法
    // 启动继承Thread类的任务
    new T().start();
    
    // 启动继承Thread匿名内部类的任务 可用lambda优化
    Thread t = new Thread(){
      @Override
      public void run() {
        log.info("我是Thread匿名内部类的任务");
      }
    };
    
    //  启动实现Runnable接口的任务
    new Thread(new R()).start();
    
    //  启动实现Runnable匿名实现类的任务
    new Thread(new Runnable() {
        @Override
        public void run() {
            log.info("我是Runnable匿名内部类的任务");
        }
    }).start();
    
    //  启动实现Runnable的lambda简化后的任务
    new Thread(() -> log.info("我是Runnable的lambda简化后的任务")).start();
    
    // 启动实现了Callable接口的任务 结合FutureTask 可以获取线程执行的结果
    FutureTask<String> target = new FutureTask<>(new C());
    new Thread(target).start();
    log.info(target.get());
    
    

    以上各个线程相关的类的类图如下

    上下文切换

    多核cpu下,多线程是并行工作的,如果线程数多,单个核又会并发的调度线程,运行时会有上下文切换的概念

    cpu执行线程的任务时,会为线程分配时间片,以下几种情况会发生上下文切换。

    1. 线程的cpu时间片用完
    2. 垃圾回收
    3. 线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法

    当发生上下文切换时,操作系统会保存当前线程的状态,并恢复另一个线程的状态,jvm中有块内存地址叫程序计数器,用于记录线程执行到哪一行代码,是线程私有的。

    idea打断点的时候可以设置为Thread模式,idea的debug模式可以看出栈帧的变化

    线程的礼让-yield()&线程的优先级

    yield()方法会让运行中的线程切换到就绪状态,重新争抢cpu的时间片,争抢时是否获取到时间片看cpu的分配。

    代码如下

    // 方法的定义
    public static native void yield();
    
    Runnable r1 = () -> {
        int count = 0;
        for (;;){
           log.info("---- 1>" + count++);
        }
    };
    Runnable r2 = () -> {
        int count = 0;
        for (;;){
            Thread.yield();
            log.info("            ---- 2>" + count++);
        }
    };
    Thread t1 = new Thread(r1,"t1");
    Thread t2 = new Thread(r2,"t2");
    t1.start();
    t2.start();
    
    // 运行结果
    11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129504
    11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129505
    11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129506
    11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129507
    11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129508
    11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129509
    11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129510
    11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129511
    11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129512
    11:49:15.798 [t2] INFO thread.TestYield -             ---- 2>293
    11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129513
    11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129514
    11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129515
    11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129516
    11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129517
    11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129518
    

    如上述结果所示,t2线程每次执行时进行了yield(),线程1执行的机会明显比线程2要多。

    线程的优先级

    ​ 线程内部用1~10的数来调整线程的优先级,默认的线程优先级为NORM_PRIORITY:5

    ​ cpu比较忙时,优先级高的线程获取更多的时间片

    ​ cpu比较闲时,优先级设置基本没用

     public final static int MIN_PRIORITY = 1;
    
     public final static int NORM_PRIORITY = 5;
    
     public final static int MAX_PRIORITY = 10;
     
     // 方法的定义
     public final void setPriority(int newPriority) {
     }
    

    cpu比较忙时

    Runnable r1 = () -> {
        int count = 0;
        for (;;){
           log.info("---- 1>" + count++);
        }
    };
    Runnable r2 = () -> {
        int count = 0;
        for (;;){
            log.info("            ---- 2>" + count++);
        }
    };
    Thread t1 = new Thread(r1,"t1");
    Thread t2 = new Thread(r2,"t2");
    t1.setPriority(Thread.NORM_PRIORITY);
    t2.setPriority(Thread.MAX_PRIORITY);
    t1.start();
    t2.start();
    
    // 可能的运行结果
    11:59:00.696 [t1] INFO thread.TestYieldPriority - ---- 1>44102
    11:59:00.696 [t2] INFO thread.TestYieldPriority -             ---- 2>135903
    11:59:00.696 [t2] INFO thread.TestYieldPriority -             ---- 2>135904
    11:59:00.696 [t2] INFO thread.TestYieldPriority -             ---- 2>135905
    11:59:00.696 [t2] INFO thread.TestYieldPriority -             ---- 2>135906
    

    cpu比较闲时

    Runnable r1 = () -> {
        int count = 0;
        for (int i = 0; i < 10; i++) {
            log.info("---- 1>" + count++);
        }
    };
    Runnable r2 = () -> {
        int count = 0;
        for (int i = 0; i < 10; i++) {
            log.info("            ---- 2>" + count++);
    
        }
    };
    Thread t1 = new Thread(r1,"t1");
    Thread t2 = new Thread(r2,"t2");
    t1.setPriority(Thread.MIN_PRIORITY);
    t2.setPriority(Thread.MAX_PRIORITY);
    t1.start();
    t2.start();
    
    // 可能的运行结果 线程1优先级低 却先运行完
    12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>7
    12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>8
    12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>9
    12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>2
    12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>3
    12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>4
    12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>5
    12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>6
    12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>7
    12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>8
    12:01:09.916 [t2] INFO thread.TestYieldPriority -             ---- 2>9
    
    

    守护线程

    默认情况下,java进程需要等待所有线程都运行结束,才会结束,有一种特殊线程叫守护线程,当所有的非守护线程都结束后,即使它没有执行完,也会强制结束。

    默认的线程都是非守护线程。

    垃圾回收线程就是典型的守护线程

    // 方法的定义
    public final void setDaemon(boolean on) {
    }
    
    Thread thread = new Thread(() -> {
        while (true) {
        }
    });
    // 具体的api。设为true表示未守护线程,当主线程结束后,守护线程也结束。
    // 默认是false,当主线程结束后,thread继续运行,程序不停止
    thread.setDaemon(true);
    thread.start();
    log.info("结束");
    

    线程的阻塞

    线程的阻塞可以分为好多种,从操作系统层面和java层面阻塞的定义可能不同,但是广义上使得线程阻塞的方式有下面几种

    1. BIO阻塞,即使用了阻塞式的io流
    2. sleep(long time) 让线程休眠进入阻塞状态
    3. a.join() 调用该方法的线程进入阻塞,等待a线程执行完恢复运行
    4. sychronized或ReentrantLock 造成线程未获得锁进入阻塞状态 (同步锁章节细说)
    5. 获得锁之后调用wait()方法 也会让线程进入阻塞状态 (同步锁章节细说)
    6. LockSupport.park() 让线程进入阻塞状态 (同步锁章节细说)

    sleep()

    ​ 使线程休眠,会将运行中的线程进入阻塞状态。当休眠时间结束后,重新争抢cpu的时间片继续运行

    // 方法的定义 native方法
    public static native void sleep(long millis) throws InterruptedException; 
    
    try {
       // 休眠2秒
       // 该方法会抛出 InterruptedException异常 即休眠过程中可被中断,被中断后抛出异常
       Thread.sleep(2000);
     } catch (InterruptedException异常 e) {
     }
     try {
       // 使用TimeUnit的api可替代 Thread.sleep 
       TimeUnit.SECONDS.sleep(1);
     } catch (InterruptedException e) {
     }
    

    join()

    ​ join是指调用该方法的线程进入阻塞状态,等待某线程执行完成后恢复运行

    // 方法的定义 有重载
    // 等待线程执行完才恢复运行
    public final void join() throws InterruptedException {
    }
    // 指定join的时间。指定时间内 线程还未执行完 调用方线程不继续等待就恢复运行
    public final synchronized void join(long millis)
        throws InterruptedException{}
    
    
    Thread t = new Thread(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        r = 10;
    });
    
    t.start();
    // 让主线程阻塞 等待t线程执行完才继续执行 
    // 去除该行,执行结果为0,加上该行 执行结果为10
    t.join();
    log.info("r:{}", r);
    
    // 运行结果
    13:09:13.892 [main] INFO thread.TestJoin - r:10
    

    线程的打断-interrupt()

    // 相关方法的定义
    public void interrupt() {
    }
    public boolean isInterrupted() {
    }
    public static boolean interrupted() {
    }
    

    打断标记:线程是否被打断,true表示被打断了,false表示没有

    isInterrupted() 获取线程的打断标记 ,调用后不会修改线程的打断标记

    interrupt()方法用于中断线程

    1. 可以打断sleep,wait,join等显式的抛出InterruptedException方法的线程,但是打断后,线程的打断标记还是false
    2. 打断正常线程 ,线程不会真正被中断,但是线程的打断标记为true

    interrupted() 获取线程的打断标记,调用后清空打断标记 即如果获取为true 调用后打断标记为false (不常用)

    interrupt实例: 有个后台监控线程不停的监控,当外界打断它时,就结束运行。代码如下

    @Slf4j
    class TwoPhaseTerminal{
        // 监控线程
        private Thread monitor;
    
        public void start(){
            monitor = new Thread(() ->{
               // 不停的监控
                while (true){
                    Thread thread = Thread.currentThread();
                 	// 判断当前线程是否被打断
                    if (thread.isInterrupted()){
                        log.info("当前线程被打断,结束运行");
                        break;
                    }
                    try {
                        Thread.sleep(1000);
                    	// 监控逻辑中被打断后,打断标记为true
                        log.info("监控");
                    } catch (InterruptedException e) {
                        // 睡眠时被打断时抛出异常 在该处捕获到 此时打断标记还是false
                        // 在调用一次中断 使得中断标记为true
                        thread.interrupt();
                    }
                }
            });
            monitor.start();
        }
    
        public void stop(){
            monitor.interrupt();
        }
    }
    

    线程的状态

    上面说了一些基本的api的使用,调用上面的方法后都会使得线程有对应的状态。

    线程的状态可从 操作系统层面分为五种状态 从java api层面分为六种状态。

    五种状态

    1. 初始状态:创建线程对象时的状态
    2. 可运行状态(就绪状态):调用start()方法后进入就绪状态,也就是准备好被cpu调度执行
    3. 运行状态:线程获取到cpu的时间片,执行run()方法的逻辑
    4. 阻塞状态: 线程被阻塞,放弃cpu的时间片,等待解除阻塞重新回到就绪状态争抢时间片
    5. 终止状态: 线程执行完成或抛出异常后的状态

    六种状态

    Thread类中的内部枚举State

    public enum State {
    	NEW,
    	RUNNABLE,
    	BLOCKED,
    	WAITING,
    	TIMED_WAITING,
    	TERMINATED;
    }
    
    1. NEW 线程对象被创建
    2. Runnable 线程调用了start()方法后进入该状态,该状态包含了三种情况
      1. 就绪状态 :等待cpu分配时间片
      2. 运行状态:进入Runnable方法执行任务
      3. 阻塞状态:BIO 执行阻塞式io流时的状态
    3. Blocked 没获取到锁时的阻塞状态(同步锁章节会细说)
    4. WAITING 调用wait()、join()等方法后的状态
    5. TIMED_WAITING 调用 sleep(time)、wait(time)、join(time)等方法后的状态
    6. TERMINATED 线程执行完成或抛出异常后的状态

    六种线程状态和方法的对应关系

    线程的相关方法总结

    主要总结Thread类中的核心方法

    方法名称是否static方法说明
    start()让线程启动,进入就绪状态,等待cpu分配时间片
    run()重写Runnable接口的方法,线程获取到cpu时间片时执行的具体逻辑
    yield()线程的礼让,使得获取到cpu时间片的线程进入就绪状态,重新争抢时间片
    sleep(time)线程休眠固定时间,进入阻塞状态,休眠时间完成后重新争抢时间片,休眠可被打断
    join()/join(time)调用线程对象的join方法,调用者线程进入阻塞,等待线程对象执行完或者到达指定时间才恢复,重新争抢时间片
    isInterrupted()获取线程的打断标记,true:被打断,false:没有被打断。调用后不会修改打断标记
    interrupt()打断线程,抛出InterruptedException异常的方法均可被打断,但是打断后不会修改打断标记,正常执行的线程被打断后会修改打断标记
    interrupted()获取线程的打断标记。调用后会清空打断标记
    stop()停止线程运行 不推荐
    suspend()挂起线程 不推荐
    resume()恢复线程运行 不推荐
    currentThread()获取当前线程

    Object中与线程相关方法

    方法名称方法说明
    wait()/wait(long timeout)获取到锁的线程进入阻塞状态
    notify()随机唤醒被wait()的一个线程
    notifyAll();唤醒被wait()的所有线程,重新争抢时间片

    同步锁

    线程安全

    • 一个程序运行多个线程本身是没有问题的
    • 问题有可能出现在多个线程访问共享资源
      • 多个线程都是读共享资源也是没有问题的
      • 当多个线程读写共享资源时,如果发生指令交错,就会出现问题

    临界区: 一段代码如果对共享资源的多线程读写操作,这段代码就被称为临界区。

    注意的是 指令交错指的是 java代码在解析成字节码文件时,java代码的一行代码在字节码中可能有多行,在线程上下文切换时就有可能交错。

    线程安全指的是多线程调用同一个对象的临界区的方法时,对象的属性值一定不会发生错误,这就是保证了线程安全。

    如下面不安全的代码

    // 对象的成员变量
    private static int count = 0;
    
    public static void main(String[] args) throws InterruptedException {
      // t1线程对变量+5000次
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                count++;
            }
        });
      // t2线程对变量-5000次
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                count--;
            }
        });
    
        t1.start();
        t2.start();
    
        // 让t1 t2都执行完
        t1.join();
        t2.join();
        System.out.println(count);
    }
    
    // 运行结果 
    -1399
    

    上面的代码 两个线程,一个+5000次,一个-5000次,如果线程安全,count的值应该还是0。

    但是运行很多次,每次的结果不同,且都不是0,所以是线程不安全的。

    线程安全的类一定所有的操作都线程安全吗?

    开发中经常会说到一些线程安全的类,如ConcurrentHashMap,线程安全指的是类里每一个独立的方法是线程安全的,但是方法的组合就不一定是线程安全的

    成员变量和静态变量是否线程安全?

    • 如果没有多线程共享,则线程安全
    • 如果存在多线程共享
      • 多线程只有读操作,则线程安全
      • 多线程存在写操作,写操作的代码又是临界区,则线程不安全

    局部变量是否线程安全?

    • 局部变量是线程安全的
    • 局部变量引用的对象未必是线程安全的
      • 如果该对象没有逃离该方法的作用范围,则线程安全
      • 如果该对象逃离了该方法的作用范围,比如:方法的返回值,需要考虑线程安全

    synchronized

    同步锁也叫对象锁,是锁在对象上的,不同的对象就是不同的锁。

    该关键字是用于保证线程安全的,是阻塞式的解决方案。

    让同一个时刻最多只有一个线程能持有对象锁,其他线程在想获取这个对象锁就会被阻塞,不用担心上下文切换的问题。

    注意: 不要理解为一个线程加了锁 ,进入 synchronized代码块中就会一直执行下去。如果时间片切换了,也会执行其他线程,再切换回来会紧接着执行,只是不会执行到有竞争锁的资源,因为当前线程还未释放锁。

    当一个线程执行完synchronized的代码块后 会唤醒正在等待的线程

    synchronized实际上使用对象锁保证临界区的原子性 临界区的代码是不可分割的 不会因为线程切换所打断

    基本使用

    // 加在方法上 实际是对this对象加锁
    private synchronized void a() {
    }
    
    // 同步代码块,锁对象可以是任意的,加在this上 和a()方法作用相同
    private void b(){
        synchronized (this){
    
        }
    }
    
    // 加在静态方法上 实际是对类对象加锁
    private synchronized static void c() {
    
    }
    
    // 同步代码块 实际是对类对象加锁 和c()方法作用相同
    private void d(){
        synchronized (TestSynchronized.class){
            
        }
    }
    
    // 上述b方法对应的字节码源码 其中monitorenter就是加锁的地方
     0 aload_0
     1 dup
     2 astore_1
     3 monitorenter
     4 aload_1
     5 monitorexit
     6 goto 14 (+8)
     9 astore_2
    10 aload_1
    11 monitorexit
    12 aload_2
    13 athrow
    14 return
    

    线程安全的代码

    private static int count = 0;
    
    private static Object lock = new Object();
    
    private static Object lock2 = new Object();
    
     // t1线程和t2对象都是对同一对象加锁。保证了线程安全。此段代码无论执行多少次,结果都是0
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                synchronized (lock) {
                    count++;
                }
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                synchronized (lock) {
                    count--;
                }
            }
        });
     
        t1.start();
        t2.start();
    
        // 让t1 t2都执行完
        t1.join();
        t2.join();
        System.out.println(count);
    }
    

    重点:加锁是加在对象上,一定要保证是同一对象,加锁才能生效

    线程通信

    wait+notify

    线程间通信可以通过共享变量+wait()&notify()来实现

    wait()将线程进入阻塞状态,notify()将线程唤醒

    当多线程竞争访问对象的同步方法时,锁对象会关联一个底层的Monitor对象(重量级锁的实现)

    如下图所示 Thread0,1先竞争到锁执行了代码后,2,3,4,5线程同时来执行临界区的代码,开始竞争锁

    1. Thread-0先获取到对象的锁,关联到monitor的owner,同步代码块内调用了锁对象的wait()方法,调用后会进入waitSet等待,Thread-1同样如此,此时Thread-0的状态为Waitting
    2. Thread2、3、4、5同时竞争,2获取到锁后,关联了monitor的owner,3、4、5只能进入EntryList中等待,此时2线程状态为 Runnable,3、4、5状态为Blocked
    3. 2执行后,唤醒entryList中的线程,3、4、5进行竞争锁,获取到的线程即会关联monitor的owner
    4. 3、4、5线程在执行过程中,调用了锁对象的notify()或notifyAll()时,会唤醒waitSet的线程,唤醒的线程进入entryList等待重新竞争锁

    注意:

    1. Blocked状态和Waitting状态都是阻塞状态

    2. Blocked线程会在owner线程释放锁时唤醒

    3. wait和notify使用场景是必须要有同步,且必须获得对象的锁才能调用,使用锁对象去调用,否则会抛异常

    • wait() 释放锁 进入 waitSet 可传入时间,如果指定时间内未被唤醒 则自动唤醒
    • notify()随机唤醒一个waitSet里的线程
    • notifyAll()唤醒waitSet中所有的线程
    static final Object lock = new Object();
    new Thread(() -> {
        synchronized (lock) {
            log.info("开始执行");
            try {
              	// 同步代码内部才能调用
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("继续执行核心逻辑");
        }
    }, "t1").start();
    
    new Thread(() -> {
        synchronized (lock) {
            log.info("开始执行");
            try {
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("继续执行核心逻辑");
        }
    }, "t2").start();
    
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.info("开始唤醒");
    
    synchronized (lock) {
      // 同步代码内部才能调用
        lock.notifyAll();
    }
    // 执行结果
    14:29:47.138 [t1] INFO TestWaitNotify - 开始执行
    14:29:47.141 [t2] INFO TestWaitNotify - 开始执行
    14:29:49.136 [main] INFO TestWaitNotify - 开始唤醒
    14:29:49.136 [t2] INFO TestWaitNotify - 继续执行核心逻辑
    14:29:49.136 [t1] INFO TestWaitNotify - 继续执行核心逻辑
    

    wait 和 sleep的区别?

    二者都会让线程进入阻塞状态,有以下区别

    1. wait是Object的方法 sleep是Thread的方法
    2. wait会立即释放锁 sleep不会释放锁
    3. wait后线程的状态是Watting sleep后线程的状态为 Time_Waiting

    park&unpark

    LockSupport是juc下的工具类,提供了park和unpark方法,可以实现线程通信

    与wait和notity相比的不同点

    1. wait 和notify需要获取对象锁 park unpark不要
    2. unpark 可以指定唤醒线程 notify随机唤醒
    3. park和unpark的顺序可以先unpark wait和notify的顺序不能颠倒

    生产者消费者模型

    指的是有生产者来生产数据,消费者来消费数据,生产者生产满了就不生产了,通知消费者取,等消费了再进行生产。
    

    消费者消费不到了就不消费了,通知生产者生产,生产到了再继续消费。

      public static void main(String[] args) throws InterruptedException {
            MessageQueue queue = new MessageQueue(2);
    		
    		// 三个生产者向队列里存值
            for (int i = 0; i < 3; i++) {
                int id = i;
                new Thread(() -> {
                    queue.put(new Message(id, "值" + id));
                }, "生产者" + i).start();
            }
    
            Thread.sleep(1000);
    
    		// 一个消费者不停的从队列里取值
            new Thread(() -> {
                while (true) {
                    queue.take();
                }
            }, "消费者").start();
    
        }
    }
    
    
    // 消息队列被生产者和消费者持有
    class MessageQueue {
        private LinkedList<Message> list = new LinkedList<>();
    
        // 容量
        private int capacity;
    
        public MessageQueue(int capacity) {
            this.capacity = capacity;
        }
    
        /**
         * 生产
         */
        public void put(Message message) {
            synchronized (list) {
                while (list.size() == capacity) {
                    log.info("队列已满,生产者等待");
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.addLast(message);
                log.info("生产消息:{}", message);
                // 生产后通知消费者
                list.notifyAll();
            }
        }
    
        public Message take() {
            synchronized (list) {
                while (list.isEmpty()) {
                    log.info("队列已空,消费者等待");
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Message message = list.removeFirst();
                log.info("消费消息:{}", message);
                // 消费后通知生产者
                list.notifyAll();
                return message;
            }
        }
    
    
    }
     // 消息
    class Message {
    
        private int id;
    
        private Object value;
    }
    

    同步锁案例

    为了更形象的表达加同步锁的概念,这里举一个生活中的例子,尽量把以上的概念具体化出来。

    这里举一个每个人非常感兴趣的一件东西。 钱!!!(马老师除外)。

    现实中,我们去银行门口的自动取款机取钱,取款机的钱就是共享变量,为了保障安全,不可能两个陌生人同时进入同一个取款机内取钱,所以只能一个人进入取钱,然后锁上取款机的门,其他人只能在取款机门口等待。

    取款机有多个,里面的钱互不影响,锁也有多个(多个对象锁),取钱人在多个取款机里同时取钱也没有安全问题。

    假如每个取钱的陌生人都是线程,当取钱人进入取款机锁了门后(线程获得锁),取到钱后出门(线程释放锁),下一个人竞争到锁来取钱。

    假设工作人员也是一个线程,如果取钱人进入后发现取款机钱不足了,这时通知工作人员来向取款机里加钱(调用notifyAll方法),取钱人暂停取钱,进入银行大堂阻塞等待(调用wait方法)。

    银行大堂里的工作人员和取钱人都被唤醒,重新竞争锁,进入后如果是取钱人,由于取款机没钱,还得进入银行大堂等待。

    当工作人员获得取款机的锁进入后,加了钱后会通知大厅里的人来取钱(调用notifyAll方法)。自己暂停加钱,进入银行大堂等待唤醒加钱(调用wait方法)。

    这时大堂里等待的人都来竞争锁,谁获取到谁进入继续取钱。

    和现实中不同的就是这里没有排队的概念,谁抢到锁谁进去取。

    ReentrantLock

    可重入锁 : 一个线程获取到对象的锁后,执行方法内部在需要获取锁的时候是可以获取到的。如以下代码

    private static final ReentrantLock LOCK = new ReentrantLock();
    
    private static void m() {
        LOCK.lock();
        try {
            log.info("begin");
          	// 调用m1()
            m1();
        } finally {
            // 注意锁的释放
            LOCK.unlock();
        }
    }
    public static void m1() {
        LOCK.lock();
        try {
            log.info("m1");
            m2();
        } finally {
            // 注意锁的释放
            LOCK.unlock();
        }
    }
    

    synchronized 也是可重入锁,ReentrantLock有以下优点

    1. 支持获取锁的超时时间
    2. 获取锁时可被打断
    3. 可设为公平锁
    4. 可以有不同的条件变量,即有多个waitSet,可以指定唤醒

    api

    // 默认非公平锁,参数传true 表示未公平锁
    ReentrantLock lock = new ReentrantLock(false);
    // 尝试获取锁
    lock()
    // 释放锁 应放在finally块中 必须执行到
    unlock()
    try {
        // 获取锁时可被打断,阻塞中的线程可被打断
        LOCK.lockInterruptibly();
    } catch (InterruptedException e) {
        return;
    }
    // 尝试获取锁 获取不到就返回false
    LOCK.tryLock()
    // 支持超时时间 一段时间没获取到就返回false
    tryLock(long timeout, TimeUnit unit)
    // 指定条件变量 休息室 一个锁可以创建多个休息室
    Condition waitSet = ROOM.newCondition();
    // 释放锁  进入waitSet等待 释放后其他线程可以抢锁
    yanWaitSet.await()
    // 唤醒具体休息室的线程 唤醒后 重写竞争锁
    yanWaitSet.signal()
    
    

    实例:一个线程输出a,一个线程输出b,一个线程输出c,abc按照顺序输出,连续输出5次

    这个考的就是线程的通信,利用 wait()/notify()和控制变量可以实现,此处使用ReentrantLock即可实现该功能。

      public static void main(String[] args) {
            AwaitSignal awaitSignal = new AwaitSignal(5);
            // 构建三个条件变量
            Condition a = awaitSignal.newCondition();
            Condition b = awaitSignal.newCondition();
            Condition c = awaitSignal.newCondition();
            // 开启三个线程
            new Thread(() -> {
                awaitSignal.print("a", a, b);
            }).start();
    
            new Thread(() -> {
                awaitSignal.print("b", b, c);
            }).start();
    
            new Thread(() -> {
                awaitSignal.print("c", c, a);
            }).start();
    
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            awaitSignal.lock();
            try {
                // 先唤醒a
                a.signal();
            } finally {
                awaitSignal.unlock();
            }
        }
    
    
    }
    
    class AwaitSignal extends ReentrantLock {
    
        // 循环次数
        private int loopNumber;
    
        public AwaitSignal(int loopNumber) {
            this.loopNumber = loopNumber;
        }
    
        /**
         * @param print   输出的字符
         * @param current 当前条件变量
         * @param next    下一个条件变量
         */
        public void print(String print, Condition current, Condition next) {
    
            for (int i = 0; i < loopNumber; i++) {
                lock();
                try {
                    try {
                        // 获取锁之后等待
                        current.await();
                        System.out.print(print);
                    } catch (InterruptedException e) {
                    }
                    next.signal();
                } finally {
                    unlock();
                }
            }
        }
    

    死锁

    说到死锁,先举个例子,

    下面是代码实现

    static Beer beer = new Beer();
    static Story story = new Story();
    
    public static void main(String[] args) {
        new Thread(() ->{
            synchronized (beer){
                log.info("我有酒,给我故事");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (story){
                    log.info("小王开始喝酒讲故事");
                }
            }
        },"小王").start();
    
        new Thread(() ->{
            synchronized (story){
                log.info("我有故事,给我酒");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (beer){
                    log.info("老王开始喝酒讲故事");
                }
            }
        },"老王").start();
    }
    class Beer {
    }
    
    class Story{
    }
    

    死锁导致程序无法正常运行下去

    检测工具可以检查到死锁信息

    java内存模型(JMM)

    jmm 体现在以下三个方面

    1. 原子性 保证指令不会受到上下文切换的影响
    2. 可见性 保证指令不会受到cpu缓存的影响
    3. 有序性 保证指令不会受并行优化的影响

    可见性

    停不下来的程序

    static boolean run = true;
    
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            while (run) {
                // ....
            }
        });
        t.start();
        Thread.sleep(1000);
       // 线程t不会如预想的停下来
        run = false; 
    }
    

    如上图所示,线程有自己的工作缓存,当主线程修改了变量并同步到主内存时,t线程没有读取到,所以程序停不下来

    有序性

    JVM在不影响程序正确性的情况下可能会调整语句的执行顺序,该情况也称为 指令重排序

      static int i;
      static int j;
    // 在某个线程内执行如下赋值操作
            i = ...;
            j = ...;
      有可能将j先赋值
    

    原子性

    原子性大家应该比较熟悉,上述同步锁的synchronized代码块就是保证了原子性,就是一段代码是一个整体,原子性保证了线程安全,不会受到上下文切换的影响。

    volatile

    该关键字解决了可见性和有序性,volatile通过内存屏障来实现的

    • 写屏障

    会在对象写操作之后加写屏障,会对写屏障的之前的数据都同步到主存,并且保证写屏障的执行顺序在写屏障之前

    • 读屏障

    会在对象读操作之前加读屏障,会在读屏障之后的语句都从主存读,并保证读屏障之后的代码执行在读屏障之后

    注意: volatile不能解决原子性,即不能通过该关键字实现线程安全。

    volatile应用场景:一个线程读取变量,另外的线程操作变量,加了该关键字后保证写变量后,读变量的线程可以及时感知。

    无锁-cas

    cas (compare and swap) 比较并交换

    为变量赋值时,从内存中读取到的值v,获取到要交换的新值n,执行 compareAndSwap()方法时,比较v和当前内存中的值是否一致,如果一致则将n和v交换,如果不一致,则自旋重试。

    cas底层是cpu层面的,即不使用同步锁也可以保证操作的原子性。

    private AtomicInteger balance;
    
    // 模拟cas的具体操作
    @Override
    public void withdraw(Integer amount) {
        while (true) {
            // 获取当前值
            int pre = balance.get();
            // 进行操作后得到新值
            int next = pre - amount;
            // 比较并设置成功 则中断 否则自旋重试
            if (balance.compareAndSet(pre, next)) {
                break;
            }
        }
    }
    

    无锁的效率是要高于之前的锁的,由于无锁不会涉及线程的上下文切换

    cas是乐观锁的思想,sychronized是悲观锁的思想

    cas适合很少有线程竞争的场景,如果竞争很强,重试经常发生,反而降低效率

    juc并发包下包含了实现了cas的原子类

    1. AtomicInteger/AtomicBoolean/AtomicLong
    2. AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray
    3. AtomicReference/AtomicStampedReference/AtomicMarkableReference

    AtomicInteger

    常用api

    new AtomicInteger(balance)
    get()
    compareAndSet(pre, next)
    //        i.incrementAndGet() ++i
    //        i.decrementAndGet() --i
    //        i.getAndIncrement() i++
    //        i.getAndDecrement() ++i
     i.addAndGet()
      // 传入函数式接口 修改i
      int getAndUpdate(IntUnaryOperator updateFunction)
      // cas 的核心方法
      compareAndSet(int expect, int update)
    

    ABA问题

    cas存在ABA问题,即比较并交换时,如果原值为A,有其他线程将其修改为B,在有其他线程将其修改为A。

    此时实际发生过交换,但是比较和交换由于值没改变可以交换成功

    解决方式

    AtomicStampedReference/AtomicMarkableReference

    上面两个类解决ABA问题,原理就是为对象增加版本号,每次修改时增加版本号,就可以避免ABA问题

    或者增加个布尔变量标识,修改后调整布尔变量值,也可以避免ABA问题

    线程池

    线程池的介绍

    线程池是java并发最重要的一个知识点,也是难点,是实际应用最广泛的。

    线程的资源很宝贵,不可能无限的创建,必须要有管理线程的工具,线程池就是一种管理线程的工具,java开发中经常有池化的思想,如 数据库连接池、Redis连接池等。

    预先创建好一些线程,任务提交时直接执行,既可以节约创建线程的时间,又可以控制线程的数量。

    线程池的好处

    1. 降低资源消耗,通过池化思想,减少创建线程和销毁线程的消耗,控制资源
    2. 提高响应速度,任务到达时,无需创建线程即可运行
    3. 提供更多更强大的功能,可扩展性高

    线程池的构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
     
    }
    

    构造器参数的意义

    参数名参数意义
    corePoolSize核心线程数
    maximumPoolSize最大线程数
    keepAliveTime救急线程的空闲时间
    unit救急线程的空闲时间单位
    workQueue阻塞队列
    threadFactory创建线程的工厂,主要定义线程名
    handler拒绝策略

    线程池案例

    下面 我们通过一个实例来理解线程池的参数以及线程池的接收任务的过程

    如上图 银行办理业务。

    1. 客户到银行时,开启柜台进行办理,柜台相当于线程,客户相当于任务,有两个是常开的柜台,三个是临时柜台。2就是核心线程数,5是最大线程数。即有两个核心线程
    2. 当柜台开到第二个后,都还在处理业务。客户再来就到排队大厅排队。排队大厅只有三个座位。
    3. 排队大厅坐满时,再来客户就继续开柜台处理,目前最大有三个临时柜台,也就是三个救急线程
    4. 此时再来客户,就无法正常为其 提供业务,采用拒绝策略来处理它们
    5. 当柜台处理完业务,就会从排队大厅取任务,当柜台隔一段空闲时间都取不到任务时,如果当前线程数大于核心线程数时,就会回收线程。即撤销该柜台。

    线程池的状态

    线程池通过一个int变量的高3位来表示线程池的状态,低29位来存储线程池的数量

    状态名称高三位接收新任务处理阻塞队列任务说明
    Running111YY正常接收任务,正常处理任务
    Shutdown000NY不会接收任务,会执行完正在执行的任务,也会处理阻塞队列里的任务
    stop001NN不会接收任务,会中断正在执行的任务,会放弃处理阻塞队列里的任务
    Tidying010NN任务全部执行完毕,当前活动线程是0,即将进入终结
    Termitted011NN终结状态
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    

    线程池的主要流程

    线程池创建、接收任务、执行任务、回收线程的步骤

    1. 创建线程池后,线程池的状态是Running,该状态下才能有下面的步骤
    2. 提交任务时,线程池会创建线程去处理任务
    3. 当线程池的工作线程数达到corePoolSize时,继续提交任务会进入阻塞队列
    4. 当阻塞队列装满时,继续提交任务,会创建救急线程来处理
    5. 当线程池中的工作线程数达到maximumPoolSize时,会执行拒绝策略
    6. 当线程取任务的时间达到keepAliveTime还没有取到任务,工作线程数大于corePoolSize时,会回收该线程

    注意: 不是刚创建的线程是核心线程,后面创建的线程是非核心线程,线程是没有核心非核心的概念的,这是我长期以来的误解。

    拒绝策略

    1. 调用者抛出RejectedExecutionException (默认策略)
    2. 让调用者运行任务
    3. 丢弃此次任务
    4. 丢弃阻塞队列中最早的任务,加入该任务

    提交任务的方法

    // 执行Runnable
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    // 提交Callable
    public <T> Future<T> submit(Callable<T> task) {
      if (task == null) throw new NullPointerException();
       // 内部构建FutureTask
      RunnableFuture<T> ftask = newTaskFor(task);
      execute(ftask);
      return ftask;
    }
    // 提交Runnable,指定返回值
    public Future<?> submit(Runnable task) {
      if (task == null) throw new NullPointerException();
      // 内部构建FutureTask
      RunnableFuture<Void> ftask = newTaskFor(task, null);
      execute(ftask);
      return ftask;
    } 
    //  提交Runnable,指定返回值
    public <T> Future<T> submit(Runnable task, T result) {
      if (task == null) throw new NullPointerException();
       // 内部构建FutureTask
      RunnableFuture<T> ftask = newTaskFor(task, result);
      execute(ftask);
      return ftask;
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
    }
    

    Execetors创建线程池

    注意: 下面几种方式都不推荐使用

    1.newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    • 核心线程数 = 最大线程数 没有救急线程
    • 阻塞队列无界 可能导致oom

    2.newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    • 核心线程数是0,最大线程数无限制 ,救急线程60秒回收
    • 队列采用 SynchronousQueue 实现 没有容量,即放入队列后没有线程来取就放不进去
    • 可能导致线程数过多,cpu负担太大

    3.newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    • 核心线程数和最大线程数都是1,没有救急线程,无界队列 可以不停的接收任务
    • 将任务串行化 一个个执行, 使用包装类是为了屏蔽修改线程池的一些参数 比如 corePoolSize
    • 如果某线程抛出异常了,会重新创建一个线程继续执行
    • 可能造成oom

    4.newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    • 任务调度的线程池 可以指定延迟时间调用,可以指定隔一段时间调用

    线程池的关闭

    shutdown()

    会让线程池状态为shutdown,不能接收任务,但是会将工作线程和阻塞队列里的任务执行完 相当于优雅关闭

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    

    shutdownNow()

    会让线程池状态为stop, 不能接收任务,会立即中断执行中的工作线程,并且不会执行阻塞队列里的任务, 会返回阻塞队列的任务列表

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    

    线程池的正确使用姿势

    线程池难就难在参数的配置,有一套理论配置参数

    cpu密集型 : 指的是程序主要发生cpu的运算

    ​ 核心线程数: CPU核心数+1

    IO密集型: 远程调用RPC,操作数据库等,不需要使用cpu进行大量的运算。 大多数应用的场景

    ​ 核心线程数=核数*cpu期望利用率 *总时间/cpu运算时间

    但是基于以上理论还是很难去配置,因为cpu运算时间不好估算

    实际配置大小可参考下表

    cpu密集型io密集型
    线程数数量核数<=x<=核数*2核心数*50<=x<=核心数 *100
    队列长度y>=1001<=y<=10

    1.线程池参数通过分布式配置,修改配置无需重启应用

    线程池参数是根据线上的请求数变化而变化的,最好的方式是 核心线程数、最大线程数 队列大小都是可配置的

    主要配置 corePoolSize maxPoolSize queueSize

    java提供了可方法覆盖参数,线程池内部会处理好参数 进行平滑的修改

    public void setCorePoolSize(int corePoolSize) {
    }
    

    2.增加线程池的监控

    3.io密集型可调整为先新增任务到最大线程数后再将任务放到阻塞队列

    代码 主要可重写阻塞队列 加入任务的方法

    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
    
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int currentPoolThreadSize = executor.getPoolSize();
           
            // 如果提交任务数小于当前创建的线程数, 说明还有空闲线程,
            if (executor.getTaskCount() < currentPoolThreadSize) {
                // 将任务放入队列中,让线程去处理任务
                return super.offer(runnable);
            }
    		// 核心改动
            // 如果当前线程数小于最大线程数,则返回 false ,让线程池去创建新的线程
            if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
                return false;
            }
    
            // 否则,就将任务放入队列中
            return super.offer(runnable);
        } finally {
            lock.unlock();
        }
    }
    

    3.拒绝策略 建议使用tomcat的拒绝策略(给一次机会)

    // tomcat的源码
    @Override
    public void execute(Runnable command) {
        if ( executor != null ) {
            try {
                executor.execute(command);
            } catch (RejectedExecutionException rx) {
                // 捕获到异常后 在从队列获取,相当于重试1取不到任务 在执行拒绝任务
                if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
            }
        } else throw new IllegalStateException("StandardThreadPool not started.");
    }
    

    建议修改从队列取任务的方式: 增加超时时间,超时1分钟取不到在进行返回

    public boolean offer(E e, long timeout, TimeUnit unit){}
    

    结语

    工作三四年了,还没有正式的写过博客,自学一直都是通过笔记的方式积累,最近重新学了一下java多线程,想着周末把这部分内容认真的写篇博客分享出去。

    文章篇幅较长,给看到这里的小伙伴点个大大的赞!由于作者水平有限,加之第一次写博客,文章中难免会有错误之处,欢迎小伙伴们反馈指正。

    如果觉得文章对你有帮助,麻烦 点赞、评论、转发、在看 走起

    你的支持是我最大的动力!!!

    展开全文
  • 利用Java多线程技术实现数据库访问.caj
  • 我看了网上的代码基本都是只写了一个run打印了 ”任务 。。。以完成“ 这样的文字,并没有具体写每个线程所进行的操作啊,求大神指点多线程同时访问sqlite数据库要怎么做?
  • java多线程读取多个文件 导入数据库

    万次阅读 2016-10-14 16:59:21
    近期在做java读文件的项目,由于数据量较大,因此研究了一下多线程,总结了一下:一. 多个线程读文件和单个线程读文件,效率差不多,甚至可能不如单线程,原因如下:如果只是单纯的读文件,一个线程足够了,因为一般...

    近期在做java读文件的项目,由于数据量较大,因此研究了一下多线程,总结了一下:

    一. 多个线程读文件和单个线程读文件,效率差不多,甚至可能不如单线程,原因如下:

    如果只是单纯的读文件,一个线程足够了,因为一般瓶颈是在磁盘io上,多个线程只会在磁盘io上阻塞。因为不同文件的读写,会造成磁头的频繁转换,磁头的频繁转换要比读取磁盘的时间更长。

    但是一般是读一小块做一次处理,然后再读下一块,这样只用一个线程磁盘io有空闲的时间,就可以用多线程处理,有的线程在读数据有的线程在处理数据。而且磁盘是有缓存的,一次读48行,可能会缓存后面的1m内容,下n次其他线程来读的时候磁盘可以直接从缓存中取数据。估计线程数不会超过10个,太多线程仍然会阻塞在磁盘io上。但是随机读取文件无法利用缓存机制,而且硬盘不断的重新定位会花费大量的寻道时间,估计效率还比不上多个线程用同一个指针顺序读取文件。理论推测,具体还是得自己写程序跑一下。(原文链接:http://www.zhihu.com/question/20149395/answer/14136499)

    二. 所以这种情况下,最好有个线程去读取文件,其他的线程去处理文件数据中的业务逻辑处理(参考:http://www.dewen.net.cn/q/1334)

    解决方案:
    (1)首先,开辟一个Reader线程, 该线程负责读取文件,将读取记录存入队列中(LinkedBlockingQueue 或者 ArrayBlockingQueue)

    ArrayBlockingQueue跟LinkedBlockingQueue的区别:

    • 队列中的锁的实现不同
      ArrayBlockingQueue中的锁是没有分离的,即生产和消费用的是同一个锁;
      LinkedBlockingQueue中的锁是分离的,即生产用的是putLock,消费是takeLock

    • 在生产或消费时操作不同
      ArrayBlockingQueue基于数组,在生产和消费的时候,是直接将枚举对象插入或移除的,不会产生或销毁任何额外的对象实例;
      LinkedBlockingQueue基于链表,在生产和消费的时候,需要把枚举对象转换为Node进行插入或移除,会生成一个额外的Node对象,这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。

    • 队列大小初始化方式不同
      ArrayBlockingQueue是有界的,必须指定队列的大小;
      LinkedBlockingQueue是无界的,可以不指定队列的大小,但是默认是Integer.MAX_VALUE。当然也可以指定队列大小,从而成为有界的。

    注意:
    在使用LinkedBlockingQueue时,若用默认大小且当生产速度大于消费速度时候,有可能会内存溢出。

    在使用ArrayBlockingQueue和LinkedBlockingQueue分别对1000000个简单字符做入队操作时,
    LinkedBlockingQueue的消耗是ArrayBlockingQueue消耗的10倍左右,
    即LinkedBlockingQueue消耗在1500毫秒左右,而ArrayBlockingQueue只需150毫秒左右。

    按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。

    (2)再开辟若干个线程,负责从队列中取数据,并插入数据库中

    (3)Reader线程读取完成后,应“通知”处理线程,当处理线程处理完队列的记录,并发现Reader线程已终止的时候,就停止了。
    (参考:http://www.infoq.com/cn/articles/java-multithreaded-programming-mode-two-phase-termination)
    终止线程的三种方法

    • 使用退出标志,使线程正常退出,也就是当run方法完成后线程终止(一般线程中的任务是放在一个循环中,需要退出时只需破坏循环的条件,退出循环即可)。
    • 使用stop方法强行终止线程(这个方法不推荐使用,因为stop和suspend、resume一样,也可能发生不可预料的结果)。
    • 使用interrupt方法中断线程。

    批量读文件入库程序可参考:
    http://blog.csdn.net/u010323023/article/details/52403046?locationNum=5

    一对多实例(参考链接:http://lucky-xingxing.iteye.com/blog/2054071):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * Created with IntelliJ IDEA.
     * Date: 4/24/14
     * Time: 9:56 AM
     * To change this template use File | Settings | File Templates.
     *
     * 生产者与消费者模型中,要保证以下几点:
     * 1 同一时间内只能有一个生产者生产
     * 2 同一时间内只能有一个消费者消费
     * 3 生产者生产的同时消费者不能消费
     * 4 消费者消费的同时生产者不能生产
     * 5 共享空间空时消费者不能继续消费
     * 6 共享空间满时生产者不能继续生产
     *
     * 使用并发库中的BlockingQueue(阻塞队列) 实现生产者与消费者
     */
    public class WaitNoticeDemo {
        public static void main(String[] args) {
    
            //固定容器大小为10
            BlockingQueue<Food> foods = new LinkedBlockingQueue<Food>(100);
    
            boolean completeFlag = false;
    
            Thread produce = new Thread(new Produce(foods));
            Thread consume1 = new Thread(new Consume(foods));
            Thread consume2 = new Thread(new Consume(foods));
            produce.start();
            consume1.start();
            consume2.start();
    
    
        }
    }
    
    /**
     * 生产者
     */
    class Produce implements Runnable{
        private BlockingQueue<Food> foods;
        private Integer count = 0;
        private boolean exitFlag = false;
        Produce(BlockingQueue<Food> foods) {
            this.foods = foods;
        }
    
        //@Override
        public void run() {
            int i = 0;
            while (i<50){
                try {
                    //当生产的食品数量装满了容器,那么在while里面该食品容器(阻塞队列)会自动阻塞  wait状态 等待消费
                    foods.put(new Food("食品"+i));
                    i++;
                    if(i == 50){
                        foods.put(new Food("end"));
    
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                }
            }//while
    
            System.out.println("生产者线程结束");
        }
    }
    
    /**
     * 消费者
     */
    class Consume implements Runnable {
        private BlockingQueue<Food> foods;
        private boolean flag = true;
    
        Consume(BlockingQueue<Food> foods){
            this.foods = foods;
        }
        //@Override
        public void run() {
            System.out.println("消费者线程 - " + Thread.currentThread().getName() + "启动");
            long start = System.currentTimeMillis(); // 记录起始时间
            try {
                //Thread.sleep(3);  //用于测试当生产者生产满10个食品后是否进入等待状态
                while (flag){
                    //当容器里面的食品数量为空时,那么在while里面该食品容器(阻塞队列)会自动阻塞  wait状态 等待生产
                    Food food = foods.take();
                    System.out.println(Thread.currentThread().getName() + "消费"+food.getName());
    
                    if(("end").equals(food.getName())){
                           flag = false;
                           foods.put(food);//将结束标志放进队列  以防别的消费者线程看不到
    
                          long end = System.currentTimeMillis(); // 记录起始时间
                          System.out.println("execuete time : " + (end-start));
                    }
    
                }//while
    
                System.out.println("消费者线程结束");
            } catch (InterruptedException e) {
                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
            }
        }
    }
    
    /**
     * 食品
     */
    class Food{
        private String name;
    
        String getName() {
            return name;
        }
    
        Food(String name){
            this.name = name;
            System.out.println("生产"+name);
        }
    }
    
    

    三. 最近碰到csv文件上传存入数据库后,数据库存储空间远大于实际文件大小的情况,以下是针对该种情况的解决方案:
    1. 设置的字段类型、长度尽可能和文件中对应字段接近;
    2. 不要CHAR, 多用VARCHAR,在数据量比较大时,VARCHAR的优势特别明显。

    四. 对于处理大数据量的记录,并将处理结果写入文件中的处理方案:
    方案一(适合于处理和输出的数据量都很大的情况):
    生产者:多个线程 读取一定量的数据并处理,然后将处理结果封装成一个队列元素,装进阻塞队列中
    消费者: 一个线程 取元素 追加写文件(csv) (多个线程写文件是不安全的)

    方案二(目前在使用的,适用于需要处理的数据量大,但输出的数据量不大的情况):
    生产者:一个线程,分页查询部分数据,将其封装成队列元素装进队列中
    消费者:多个线程 ,从队列中取出数据元素并处理,存储处理结果。
    生产者和消费者执行完毕后,再集中将消费者处理的结果一个个输出到相应文件中

    五. 数据记录验证问题
    对于用数据库中的一个数据表验证另一个数据表中数据的情况,应采用SQL连接查询(适用于待验证和标准数据都在数据库的情况);

    而对于用数据库中的一个数据表验证文件中的数据记录的情况,则需要首先根据某些关键字从数据库中查出相关记录,然后在内存中进行数据的验证处理工作,避免多次访问数据库(适用于待验证数据不多,而验证的标准数据很多的情况)。

    六.文件字符串长度大于数据库规定长度时,截断处理
    数据库中 nchar (nvarchar)类型中文字符和英文字符长度都为1,nchar(10)的字段可以存放十个中英文字符,而char(varchar)类型中文字符长度为2,,英文字符长度为1,char(10)的字段可以存放五个中文字符,10个英文字符。

    针对char类型,中文字符长度为2,英文字符长度为1的情况,计算字符串长度和对字符串截断处理的代码如下:

    // 判断一个字符是否是中文
        public static boolean isChineseChar(char c) {
            return c >= 0x4E00 &&  c <= 0x9FA5;// 根据字节码判断
        }
    
        //20170302 针对数据库varchar类型 计算字符串长度:中文字符数*2 + 英文字符数<= 数据库字段设定长度
        // 判断一个字符串是否含有中文
        public static Integer getDBStrLength(String str) {
           Integer len = 0;
           if(str != null && str.length()>0){
               char[] arr = str.toCharArray();
               for(int i=0; i<arr.length; i++){
                   if(isChineseChar(arr[i])){
                        len += 2;
                   }else{
                       len += 1;
                   }
               }//for
           }
    
           return len;
        }
    
        //20170302 针对数据库varchar类型 根据字符串长度以及长度限制  对字符串进行截断处理
        public static String getTruncateStrByMaxLength(String str,Integer maxLen) {
            String result = "";
            Integer len = 0;
            if(str != null && str.length()>0){
                char[] arr = str.toCharArray();
                for(int i=0; i<arr.length; i++){
                    if(isChineseChar(arr[i])){
                        len += 2;
                    }else{
                        len += 1;
                    }
                    if(len <= maxLen){
                         result += arr[i];
                    }else{
                        break;
                    }
                }//for
            }
    
            return result;
        }
    展开全文
  • NULL 博文链接:https://chensijie88888.iteye.com/blog/829238
  • 进而从提高访问数据库效率的角度,论述了采用数据库连接池技术、优化SQL语句格式和多线程方法等可提高Java访问数据库效率的方法。  1.概述  在软件开发中通常都会涉及到数据库的应用。而数据库的连接则是数据库...
  • 多线程访问数据库

    千次阅读 2016-05-23 09:47:36
    android.database.sqlite.SQLiteDatabaseLockedException: database is locked (code 5)上面的问题,是因为每次创建SQLiteOpenHelper...为确保多线程能安全地操作数据库,则需要保证只有一个数据库连接被占用。 java

    <span style="font-size:18px;">
    </span>

    android.database.sqlite.SQLiteDatabaseLockedException: database is locked (code 5)

    上面的问题,是因为每次创建SQLiteOpenHelper对象时,实际上也是在新建一个数据库连接。如果通过多个连接同时对数据库进行写数据操作,则一定会失败。为确保多线程能安全地操作数据库,则需要保证只有一个数据库连接被占用。

    <span style="font-size:18px;">
    </span>

    java.lang.IllegalStateException:attempt to re-open an already-closed object: SQLiteDatabase

    既然只有一个数据库连接,不同的线程会取得一样的SQLiteDatabase对象实例。当线程1尝试管理数据库连接时,线程2却仍然在使用该数据库连接。这就导致了上面问题的原因。因此我们只能在确保数据库没有被占用的情况下,才去关闭它。

    DatabaseManager:

    <span style="font-size:18px;">public class DatabaseManager {
    
        private AtomicInteger mOpenCounter = new AtomicInteger();
        private static DatabaseManager mInstance;
        private SQLiteOpenHelper mDatabaseHelper;
        private SQLiteDatabase mDatabase;
    
        private DatabaseManager(SQLiteOpenHelper helper) {
            mDatabaseHelper = helper;
        }
    
        public static synchronized void initInstance(SQLiteOpenHelper helper) {
            if (mInstance == null) {
                mInstance = new DatabaseManager(helper);
            }
        }
    
        public static synchronized DatabaseManager getInstance() {
            if (mInstance == null) {
                throw new IllegalStateException(
                        "DatabaseManager: 请先调用始化操作initInstance");
            }
            return mInstance;
        }
    
        public void executeQuery(IQueryExecutor executor) {
            SQLiteDatabase database = openDatabase();
            executor.run(database);
            closeDatabase();
        }
    
        public void executeQueryTask(final IQueryExecutor executor) {
            new Thread(new Runnable() {
    
                @Override
                public void run() {
                    executeQuery(executor);
                }
            }).start();
        }
    
        private synchronized SQLiteDatabase openDatabase() {
            if (mOpenCounter.incrementAndGet() == 1) {
                mDatabase = mDatabaseHelper.getWritableDatabase();
            }
            return mDatabase;
        }
    
        private synchronized void closeDatabase() {
            if (mOpenCounter.decrementAndGet() == 0) {
                mDatabase.close();
            }
        }
    
    }</span>

    helper文件:

    <span style="font-size:18px;">public class AddPathToDBHelper extends SQLiteOpenHelper {
    
        private static final int START_DATABASE_VERSION = 1;
        private static final int CURRENT_DATABASE_VERSION = 1;
        private static final String DB_NAME = "paths.db";
    
        public AddPathToDBHelper(Context context) {
            super(context, DB_NAME, null, CURRENT_DATABASE_VERSION);
        }
    
        @Override
        public void onCreate(SQLiteDatabase db) {
            db.execSQL("create table if not exists " + Table.TABLE_NAME + " ( "
                    + Table._ID + " integer primary key autoincrement , "
                    + Table.PATH + " text unique not null , " + Table.LAST_MODIFIED
                    + " long );");
        }
    
        @Override
        public void onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) {
            switch (oldVersion) {
            case START_DATABASE_VERSION:
    
                break;
    
            default:
                break;
            }
        }
    
    }</span>

    DAO文件:

    <span style="font-size:18px;">public class AddPathDAO {
    
        interface Table {
            String TABLE_NAME = "allPaths";
            String _ID = "_id";
            String PATH = "path";
            String LAST_MODIFIED = "last_modified";
        }
    
        private SQLiteDatabase mDatabase;
    
        public AddPathDAO(SQLiteDatabase database) {
            mDatabase = database;
        }
    
        public void insertPath(HashMap<String, Long> map) {
            ContentValues values = new ContentValues();
            try {
                mDatabase.beginTransaction();
                for (Entry<String, Long> item : map.entrySet()) {
                    values.put(Table.PATH, item.getKey());
                    values.put(Table.LAST_MODIFIED, item.getValue());
                    mDatabase.insert(Table.TABLE_NAME, null, values);
                }
                mDatabase.setTransactionSuccessful();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mDatabase.endTransaction();
            }
        }
    }</span>

    调用用例:

    <span style="font-size:18px;">public class MainActivity extends Activity {
    
        private HashMap<String, Long> map;
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            DatabaseManager.initInstance(new AddPathToDBHelper(
                    getApplicationContext()));
            map = new HashMap<String, Long>();
            map.put("sdcard0/tencet", 1234354566L);
            map.put("sdcard0/tencet1", 1234354566L);
            map.put("sdcard0/tencet2", 1234354566L);
            DatabaseManager.getInstance().executeQuery(new IQueryExecutor() {
    
                @Override
                public void run(SQLiteDatabase database) {
                    AddPathDAO addPathDAO = new AddPathDAO(database);
                    addPathDAO.insertPath(map);
                }
            });
        }
    
    }</span>

    展开全文
  • java多线程核心技术

    2018-01-19 17:22:04
    结合大量实例,全面讲解Java多线程编程中的并发访问、线程间通信、锁等最难突破的核心技术与应用实践 Java多线程无处不在,如服务器、数据库、应用。多线程可以有效提升计算和处理效率,大大提升吞吐量和可伸缩性,...
  • 多线程多进程数据库访问应注意问题:事务在执行时不知道其他事务的存在,无论是多Connection或是单Connection。1、Select 语句不需要显示的用 BeginTrans 声明事务。如果显示的进行了声明则会给数据库加入共享锁,接...
  • 多线程访问数据库问题

    千次阅读 2018-03-06 21:02:23
    最近做一个多线程的远程升级软件,做完后用一个对应的测试程序进行测试,发现线程一多必崩溃,而把所有数据库处理全部删掉后,就可以同时运行几百个线程不崩溃了.原因应该是自己采用了单例模式,在各个线程中发送信号给...
  • 进而从提高访问数据库效率的角度,论述了采用数据库连接池技术、优化SQL语句格式和多线程方法等可提高Java访问数据库效率的方法。  1.概述  在软件开发中通常都会涉及到数据库的应用。而数据库的连接则是数据库...
  • java线程读取数据库,用jdbc

    千次阅读 2012-06-21 14:11:03
    访问数据库的时候总是DriverManager.getConnection(),虽然测试之后发现消耗的时间并不,大概在10毫秒左右(依据电脑配置不同略有差异),第一次初始化大概在300毫秒左右。但是总觉得在线程中一直getConnection...
  • 本课程全面讲解Java多线程编程中的并发访问、线程间通信、锁等难突破的核心技术与应用实践。  Java多线程无处不在,如服务器、数据库、应用。多线程可以有效提升计算和处理效率,大大提升吞吐量和可伸缩性,深得...
  • Spring+Hibernate多线程访问数据库

    千次阅读 2013-11-18 10:37:11
    手头的项目采用Spring MVC框架,考虑到执行效率,需要在多线程下用native SQL访问数据库。留一份笔记备忘。 基本配置就不说了,网上有很多教程。 主叫类: @Component("manage") @Transactional public class Manage...
  • //service里的方法主要是对参数进行一些处理,然后去数据库查数据 Clob clob = (Clob)result.get("clob"); //下面要把clob字段转成string,系统也会报错,为关闭的链接, //要怎么解决啊,是和线程安全有关吗,还是...
  • Java多线程应用实例

    万次阅读 2016-08-23 18:23:50
    Java多线程及其应用实例,数据库连接池,线程池
  • Java多线程总结

    万次阅读 2018-02-07 19:33:19
    如:为什么你需要使用线程, 如何创建线程,用什么方式创建线程比较好(比如:继承thread类还是调用Runnable接口),然后逐渐问到并发问题像在Java并发编程的过程中遇到了什么挑战,Java内存模型,JDK1.5引入了哪些...
  • JAVA多线程常见的十大问题

    万次阅读 2020-05-06 20:55:40
    JAVA多线程常见的十大问题讲解
  • //唤醒等待在该pool上的线程 pool.notifyAll(); } } } /** * 获取连接 * * @param mills * @return * @throws InterruptedException */ public Connection fetchConnection(long ...
  • Java多线程网络爬虫(时光网为例)

    万次阅读 热门讨论 2016-09-24 10:22:46
    数据库操作多线程简介Java多线程实现方式主要有三种:继承Thread类、实现Runnable接口、使用ExecutorService、Callable、Future实现有返回结果的多线程。其中前两种方式线程执行完后都没有返回值,只有最后一种是带...
  • Android多线程下安全访问数据库

    万次阅读 2014-04-16 08:55:09
    为了记录如何线程安全地访问你的Android数据库实例,我写下了这篇小小札记。文章中引用的项目代码请点击这里  假设你已编写了自己的 SQLiteOpenHelper。 public class DatabaseHelper extends SQLiteOpenHelper {...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 222,867
精华内容 89,146
关键字:

java多线程数据库访问

java 订阅