-
2019-11-23 19:37:24
前言
有一张客户信息表,数据量很大。需要把这些记录调用某个接口(这个接口一次只能查询10个客户信息),然后把接口返回信息解析保存到另一个表里。
客户信息表(cust_info)如下:
id cust_id status remark input_time update_tiem 1 20191111000001 2019-11-23 10:45:04 2019-11-23 10:45:04 2 20191111000002 2019-11-23 10:45:04 2019-11-23 10:45:04 3 20191111000003 2019-11-23 10:45:04 2019-11-23 10:45:04 4 20191111000004 2019-11-23 10:45:04 2019-11-23 10:45:04 5 20191111000005 2019-11-23 10:45:04 2019-11-23 10:45:04 解析后要保存的表(cust_detail)如下:
id cust_id city name tel age input_time update_time 1 20191111000001 北京 张三 1877778872 12 2019-11-23 10:45:04 2019-11-23 10:45:04 2 20191111000002 北京 张三 1877778872 12 2019-11-23 10:45:04 2019-11-23 10:45:04 3 20191111000003 北京 张三 1877778872 12 2019-11-23 10:45:04 2019-11-23 10:45:04 4 20191111000004 北京 张三 1877778872 12 2019-11-23 10:45:04 2019-11-23 10:45:04 5 20191111000005 北京 张三 1877778872 12 2019-11-23 10:45:04 2019-11-23 10:45:04 第一版
思路:
使用线程池,结合redis来实现。每次固定从cust_info表取30条未处理的记录,按10等分给三个线程。用redis来统计线程是否执行完毕,在redis中存一个变量值为开启的线程数。当子线程执行完毕时将redis中的该变量减一。while循环发现该变量值变为0时说明子线程全部执行完毕,然后再取30条未处理的数据,重复上面的步骤,直至完成。子线程在查询完某个客户时会更新cust_info表的status字段值为1,表示该客户已经被查询过了。缺点:
- 需要依赖redis。
- 每次循环要等所以有的线程执行完毕后,再进入下一次循环,线程利用不充分。
主要文件详情:
- CustThreadPoolExecutor.java 线程池
- CustQueryOneThread.java 线程类
- CustInfoOneServiceImpl.java 业务逻辑类
每次取数据的mybatis的 xml文件
<select id="selectCustInfoList" resultMap="BaseResultMap"> select <include refid="Base_Column_List"/> from cust_info where status = 0 or status is null </select>
CustThreadPoolExecutor.java
package com.lh.service.task; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class CustThreadPoolExecutor { private static ThreadPoolExecutor pool; private static void init() { pool = new ThreadPoolExecutor( 3, 6, 1000, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<Runnable>(30) ); } private CustThreadPoolExecutor() { pool = new ThreadPoolExecutor( 3, 6, 1000, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<Runnable>(30) ); } public static void execute(Runnable runnable) { if (pool == null) { init(); } pool.execute(runnable); } }
CustQueryOneThread.java
package com.lh.service.task; import com.alibaba.fastjson.JSON; import com.lh.dao.sys.CustDetailMapper; import com.lh.dao.sys.CustInfoMapper; import com.lh.entity.CustDetail; import com.lh.entity.CustInfo; import com.lh.entity.enums.RedisKeyEnum; import com.lh.utils.RedisUtil; import com.lh.utils.SpringUtil; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.session.ExecutorType; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @Slf4j public class CustQueryOneThread implements Runnable { private List<CustInfo> data; private Map<String, CustInfo> map; private SqlSessionFactory sqlSessionFactory; private RedisUtil redisUtil; public CustQueryOneThread(List<CustInfo> data) { this.data = data; this.map = new HashMap<>(); this.sqlSessionFactory = SpringUtil.getBean(SqlSessionFactory.class); this.redisUtil = SpringUtil.getBean(RedisUtil.class); } @Override public void run() { try { // 1、组装参数 String custIds = this.before(); // 2、调用接口 我们这里简单模拟 String result = this.runing(custIds); // 3、解析返回保存记录 this.after(result); } catch (Exception e) { e.printStackTrace(); log.error("查询客户信息发生异常!e={}", e); } finally { //线程执行完毕 计数减一 redisUtil.incrBy(RedisKeyEnum.CUST_QUERY_THREAD_COUNT.getCode(), -1); } } /** * 组装接口调用入参 * * @return */ private String before() { return data.stream().map(a -> { map.put(a.getCustId(), a); return a.getCustId(); }).collect(Collectors.joining(",")); } /** * 模拟调用外部接口返回Json 数据 * * @return */ private String runing(String request) { List<CustDetail> list = Arrays.stream(request.split(",")).map(a -> { CustDetail custDetail = new CustDetail(); custDetail.setCustId(a); custDetail.setAge(12); custDetail.setCity("北京"); custDetail.setTel("17788998880"); return custDetail; }).collect(Collectors.toList()); return JSON.toJSONString(list); } /** * 解析接口返回并保存结果 * * @param result */ private void after(String result) { SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH); try { CustDetailMapper custDetailMapper = session.getMapper(CustDetailMapper.class); CustInfoMapper custInfoMapper = session.getMapper(CustInfoMapper.class); List<CustDetail> list = JSON.parseArray(result, CustDetail.class); for (CustDetail custDetail : list) { custDetailMapper.insertSelective(custDetail); CustInfo custInfo = map.get(custDetail.getCustId()); custInfo.setStatus(Byte.parseByte("1")); custInfo.setRemark("处理成功"); custInfoMapper.updateByPrimaryKeySelective(custInfo); } session.commit(); session.clearCache(); } catch (Exception e) { e.printStackTrace(); log.error("保存接口返回发生异常!e={}", e); session.rollback(); } finally { session.close(); } } }
CustInfoOneServiceImpl.java
package com.lh.service.impl; import com.github.pagehelper.Page; import com.github.pagehelper.PageHelper; import com.lh.dao.sys.CustInfoMapper; import com.lh.entity.CustInfo; import com.lh.entity.enums.RedisKeyEnum; import com.lh.service.CustInfoService; import com.lh.service.task.CustQueryOneThread; import com.lh.service.task.CustThreadPoolExecutor; import com.lh.utils.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; import java.util.stream.Collectors; @Slf4j @Service("custInfoOneServiceImpl") public class CustInfoOneServiceImpl implements CustInfoService { private static final int PAGE_SIZE = 30; private static final int QUERY_SIZE = 10; @Resource private CustInfoMapper custInfoMapper; @Resource private RedisUtil redisUtil; @Override public void queryJob() { try { log.info("==============cust query task begin!================="); redisUtil.set(RedisKeyEnum.CUST_QUERY_THREAD_COUNT.getCode(), "0"); while (true) { String s = redisUtil.get(RedisKeyEnum.CUST_QUERY_THREAD_COUNT.getCode()); if ("0".equals(s)) { this.query(); } else if ("-1".equals(s)) { break; } else { Thread.sleep(1000); } } log.info("==============cust query task end !================="); } catch (Exception e) { e.printStackTrace(); log.error("==============cust query task exception! e={}=================", e); } } private void query() { Page page = PageHelper.startPage(1, PAGE_SIZE); List<CustInfo> list = custInfoMapper.selectCustInfoList(); if (list == null || list.size() == 0) { redisUtil.set(RedisKeyEnum.CUST_QUERY_THREAD_COUNT.getCode(), "-1"); return; } int size = list.size(); int m = size / QUERY_SIZE; int n = size % QUERY_SIZE; int threadCount = m + (n > 0 ? 1 : 0); redisUtil.set(RedisKeyEnum.CUST_QUERY_THREAD_COUNT.getCode(), String.valueOf(threadCount)); if (m > 0) { for (int i = 0; i < m; i++) { List<CustInfo> collect = list.stream().skip(QUERY_SIZE * i).limit(QUERY_SIZE).collect(Collectors.toList()); CustThreadPoolExecutor.execute(new CustQueryOneThread(collect)); } } if (n > 0) { List<CustInfo> collect = list.stream().skip(QUERY_SIZE * m).collect(Collectors.toList()); CustThreadPoolExecutor.execute(new CustQueryOneThread(collect)); } } }
第二版
思路:
使用线程池,结合CountDownLatch来实现。每次固定从cust_info表取30条未处理的记录,按10等分给三个线程,用CountDownLatch来统计线程是否执行完毕。CountDownLatch在创建时传入开启的线程数。并将CountDownLatch对象传给子线程,子线程执行完毕时调用CountDownLatch的countDown()方法,此时主线程调用CountDownLatch的await()方法等待所有子线程执行完毕。当所有的子线程都调用完countDown()方法说明子线程全部执行完毕,然后主线程继续进入下一个循环。再取30条未处理的数据,重复上面的步骤,直至完成。子线程在查询完某个客户时会更新cust_info表的status字段值为1,表示该客户已经被查询过了。优点:不需要依赖其他。
缺点: 每次循环要等所以有的线程执行完毕后,再进入下一次循环,线程利用不充分。主要文件详情:
- CustQueryTwoThread.java 线程类
- CustInfoTwoServiceImpl.java 业务逻辑类
每次取数据的mybatis的 xml文件
<select id="selectCustInfoList" resultMap="BaseResultMap"> select <include refid="Base_Column_List"/> from cust_info where status = 0 or status is null </select> <select id="selectCustInfoCount" resultType="java.lang.Integer"> select count(1) from cust_info </select>
CustQueryTwoThread.java
package com.lh.service.task; import com.alibaba.fastjson.JSON; import com.lh.dao.sys.CustDetailMapper; import com.lh.dao.sys.CustInfoMapper; import com.lh.entity.CustDetail; import com.lh.entity.CustInfo; import com.lh.utils.SpringUtil; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.session.ExecutorType; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; @Slf4j public class CustQueryTwoThread implements Runnable { private List<CustInfo> data; private Map<String, CustInfo> map; private CountDownLatch countDownLatch; private SqlSessionFactory sqlSessionFactory; public CustQueryTwoThread(List<CustInfo> data, CountDownLatch countDownLatch) { this.data = data; this.map = new HashMap<>(); this.countDownLatch = countDownLatch; this.sqlSessionFactory = SpringUtil.getBean(SqlSessionFactory.class); } @Override public void run() { try { // 1、组装参数 String custIds = this.before(); // 2、调用接口 我们这里简单模拟 String result = this.runing(custIds); // 3、解析返回保存记录 this.after(result); } catch (Exception e) { e.printStackTrace(); log.error("查询客户信息发生异常!e={}", e); } finally { //线程执行完毕 释放 countDownLatch.countDown(); } } /** * 组装接口调用入参 * * @return */ private String before() { return data.stream().map(a -> { map.put(a.getCustId(), a); return a.getCustId(); }).collect(Collectors.joining(",")); } /** * 模拟调用外部接口返回Json 数据 * * @return */ private String runing(String request) { List<CustDetail> list = Arrays.stream(request.split(",")).map(a -> { CustDetail custDetail = new CustDetail(); custDetail.setCustId(a); custDetail.setAge(12); custDetail.setCity("北京"); custDetail.setTel("17788998880"); custDetail.setName("小花"); return custDetail; }).collect(Collectors.toList()); return JSON.toJSONString(list); } /** * 解析接口返回并保存结果 * * @param result */ private void after(String result) { SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH); try { CustDetailMapper custDetailMapper = session.getMapper(CustDetailMapper.class); CustInfoMapper custInfoMapper = session.getMapper(CustInfoMapper.class); List<CustDetail> list = JSON.parseArray(result, CustDetail.class); for (CustDetail custDetail : list) { custDetailMapper.insertSelective(custDetail); CustInfo custInfo = map.get(custDetail.getCustId()); custInfo.setStatus(Byte.parseByte("1")); custInfo.setRemark("处理成功"); custInfoMapper.updateByPrimaryKeySelective(custInfo); } session.commit(); session.clearCache(); } catch (Exception e) { e.printStackTrace(); log.error("保存接口返回发生异常!e={}", e); session.rollback(); } finally { session.close(); } } }
CustInfoTwoServiceImpl.java
package com.lh.service.impl; import com.github.pagehelper.Page; import com.github.pagehelper.PageHelper; import com.lh.dao.sys.CustInfoMapper; import com.lh.entity.CustInfo; import com.lh.service.CustInfoService; import com.lh.service.task.CustQueryTwoThread; import com.lh.service.task.CustThreadPoolExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; @Slf4j @Service("custInfoTwoServiceImpl") public class CustInfoTwoServiceImpl implements CustInfoService { private static final int PAGE_SIZE = 30; private static final int QUERY_SIZE = 10; @Resource private CustInfoMapper custInfoMapper; @Override public void queryJob() { try { log.info("==============cust query task begin!================="); int i = custInfoMapper.selectCustInfoCount(); int pageCount = i / PAGE_SIZE + (i % PAGE_SIZE > 0 ? 1 : 0); int flag = 1; while (flag <= pageCount) { Page page = PageHelper.startPage(1, PAGE_SIZE); List<CustInfo> list = custInfoMapper.selectCustInfoList(); if (list == null || list.size() == 0) { break; } this.query(list); flag++; } log.info("==============cust query task end !================="); } catch (Exception e) { e.printStackTrace(); log.error("==============cust query task exception! e={}=================", e); } } private void query(List<CustInfo> list) throws InterruptedException { int size = list.size(); int m = size / QUERY_SIZE; int n = size % QUERY_SIZE; int threadCount = m + (n > 0 ? 1 : 0); CountDownLatch latch = new CountDownLatch(threadCount); if (m > 0) { for (int i = 0; i < m; i++) { List<CustInfo> collect = list.stream().skip(QUERY_SIZE * i).limit(QUERY_SIZE).collect(Collectors.toList()); CustThreadPoolExecutor.execute(new CustQueryTwoThread(collect, latch)); } } if (n > 0) { List<CustInfo> collect = list.stream().skip(QUERY_SIZE * m).collect(Collectors.toList()); CustThreadPoolExecutor.execute(new CustQueryTwoThread(collect, latch)); } // 主线程等待所以有子线程执行完毕再继续执行 latch.await(); } }
第三版
思路:
使用线程池,和Semaphore 实现。Semaphore 初始化时设置线程数为3,将cust_info表按照id字段排序每次依次从cust_info表30条记录,按10等分给三个线程,在分给线程之前要先获取许可(semaphore.acquire()),有许可时才开启线程。当子线程在执行完毕后会释放许可,主线程这时会获取许可,把准备好的数据分给子线程。当一次循环执行完毕之后再依次向下取30条。重复上面的步骤,直至完成。优点:不需要依赖其他。每当一个线程执行完毕,就可以开启一个新的线程。不用等到所有的线程执行完毕再继续。
主要文件详情:
- CustQueryThreeThread.java 线程类
- CustInfoThreeServiceImpl.java 业务逻辑类
<select id="selectList" resultMap="BaseResultMap"> select <include refid="Base_Column_List"/> from cust_info </select> <select id="selectCustInfoCount" resultType="java.lang.Integer"> select count(1) from cust_info </select>
CustQueryThreeThread.java
package com.lh.service.task; import com.alibaba.fastjson.JSON; import com.lh.dao.sys.CustDetailMapper; import com.lh.dao.sys.CustInfoMapper; import com.lh.entity.CustDetail; import com.lh.entity.CustInfo; import com.lh.utils.SpringUtil; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.session.ExecutorType; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.stream.Collectors; @Slf4j public class CustQueryThreeThread implements Runnable { private List<CustInfo> data; private Map<String, CustInfo> map; private Semaphore semaphore; private SqlSessionFactory sqlSessionFactory; public CustQueryThreeThread(List<CustInfo> data, Semaphore semaphore) { this.data = data; this.map = new HashMap<>(); this.semaphore = semaphore; this.sqlSessionFactory = SpringUtil.getBean(SqlSessionFactory.class); } @Override public void run() { try { // 1、组装参数 String custIds = this.before(); // 2、调用接口 我们这里简单模拟 System.out.println("sleep ===== begin"); Thread.sleep(10000); System.out.println("sleep ===== end"); String result = this.runing(custIds); // 3、解析返回保存记录 this.after(result); } catch (Exception e) { e.printStackTrace(); log.error("查询客户信息发生异常!e={}", e); } finally { //线程执行完毕 释放许可 System.out.println("sleep ===== release "); semaphore.release(); } } /** * 组装接口调用入参 * * @return */ private String before() { return data.stream().map(a -> { map.put(a.getCustId(), a); return a.getCustId(); }).collect(Collectors.joining(",")); } /** * 模拟调用外部接口返回Json 数据 * * @return */ private String runing(String request) { List<CustDetail> list = Arrays.stream(request.split(",")).map(a -> { CustDetail custDetail = new CustDetail(); custDetail.setCustId(a); custDetail.setAge(12); custDetail.setCity("北京"); custDetail.setTel("17788998880"); custDetail.setName("小花"); return custDetail; }).collect(Collectors.toList()); return JSON.toJSONString(list); } /** * 解析接口返回并保存结果 * * @param result */ private void after(String result) { SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH); try { CustDetailMapper custDetailMapper = session.getMapper(CustDetailMapper.class); CustInfoMapper custInfoMapper = session.getMapper(CustInfoMapper.class); List<CustDetail> list = JSON.parseArray(result, CustDetail.class); for (CustDetail custDetail : list) { custDetailMapper.insertSelective(custDetail); CustInfo custInfo = map.get(custDetail.getCustId()); custInfo.setStatus(Byte.parseByte("1")); custInfo.setRemark("处理成功"); custInfoMapper.updateByPrimaryKeySelective(custInfo); } session.commit(); session.clearCache(); } catch (Exception e) { e.printStackTrace(); log.error("保存接口返回发生异常!e={}", e); session.rollback(); } finally { session.close(); } } }
CustInfoThreeServiceImpl.java
package com.lh.service.impl; import com.github.pagehelper.Page; import com.github.pagehelper.PageHelper; import com.lh.dao.sys.CustInfoMapper; import com.lh.dao.sys.CustQueryRecordMapper; import com.lh.entity.CustInfo; import com.lh.entity.CustQueryRecord; import com.lh.service.CustInfoService; import com.lh.service.task.CustQueryThreeThread; import com.lh.service.task.CustQueryTwoThread; import com.lh.service.task.CustThreadPoolExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.stream.Collectors; @Slf4j @Service("custInfoThreeServiceImpl") public class CustInfoThreeServiceImpl implements CustInfoService { private static final int PAGE_SIZE = 30; private static final int QUERY_SIZE = 10; @Resource private CustInfoMapper custInfoMapper; @Resource private CustQueryRecordMapper custQueryRecordMapper; @Override public void queryJob() { try { log.info("==============cust query task begin!================="); Semaphore semaphore = new Semaphore(3); int i = custInfoMapper.selectCustInfoCount(); int pageCount = i / PAGE_SIZE + (i % PAGE_SIZE > 0 ? 1 : 0); int flag = 1; while (flag <= pageCount) { Page page = PageHelper.startPage(flag, PAGE_SIZE); List<CustInfo> list = custInfoMapper.selectList(); if (list == null || list.size() == 0) { break; } this.query(list, semaphore); this.saveRecord(flag, list.size()); flag++; } log.info("==============cust query task end !================="); } catch (Exception e) { e.printStackTrace(); log.error("==============cust query task exception! e={}=================", e); } } /** * 查询 * * @param list * @param semaphore * @throws InterruptedException */ private void query(List<CustInfo> list, Semaphore semaphore) throws InterruptedException { int size = list.size(); int m = size / QUERY_SIZE; int n = size % QUERY_SIZE; if (m > 0) { for (int i = 0; i < m; i++) { List<CustInfo> collect = list.stream().skip(QUERY_SIZE * i).limit(QUERY_SIZE).collect(Collectors.toList()); // 获取许可 semaphore.acquire(); CustThreadPoolExecutor.execute(new CustQueryThreeThread(collect, semaphore)); } } if (n > 0) { List<CustInfo> collect = list.stream().skip(QUERY_SIZE * m).collect(Collectors.toList()); CustThreadPoolExecutor.execute(new CustQueryThreeThread(collect, semaphore)); } } /** * 保存记录 * * @param page * @param size */ private void saveRecord(int page, int size) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); String busiDate = sdf.format(new Date()); CustQueryRecord record = new CustQueryRecord(); record.setRecordId(busiDate + "_" + page); record.setPage(page); record.setBusiDate(busiDate); record.setRemark(String.valueOf(size)); custQueryRecordMapper.insertSelective(record); } }
新增了查询记录表,这里会把执行成功的记录和页号存在表里。方便异常后重启用。可以在方法开始的时候根据busi_date 来查询page最大值,如果存在从最大值+1开始执行。如果不存在就从第一页(page等于1)开始执行。注意获取最大值后要和根据和 pagecount (int pageCount = i / PAGE_SIZE + (i % PAGE_SIZE > 0 ? 1 : 0);)进行比较如果相等或者大于pagecount ;直接return。
id record_id page remark busi_date input_time update_time 1 2019-11-23_1 1 30 2019-11-23 2019-11-23 19:09:12 2019-11-23 19:09:12 2 2019-11-23_2 2 30 2019-11-23 2019-11-23 19:09:22 2019-11-23 19:09:22 3 2019-11-23_3 3 30 2019-11-23 2019-11-23 19:09:32 2019-11-23 19:09:32 4 2019-11-23_4 4 10 2019-11-23 2019-11-23 19:09:42 2019-11-23 19:09:42 更多相关内容 -
基于Java的多线程快速排序设计与优化
2020-10-16 18:40:22为实现多线程快速排序,提出基于Fork/Join框架的多线程快速排序,同时对排序算法进行优化。该算法主要用于大量数据需要进行排序处理的应用。 -
JAVA实现多线程的两种方法实例分享
2020-09-05 02:35:51介绍了JAVA实现多线程的两种方法实例分享,有需要的朋友可以参考一下 -
java对大数据量文件内容的多线程读取和排序.pdf
2021-10-30 15:02:46java对大数据量文件内容的多线程读取和排序.pdf -
java用多线程进行排序算法的比较
2010-06-01 12:20:26这是一个简单的运用多线程的程序 主要对快速排序等多个算法进行比较 -
java实验报告——多线程.pdf
2020-12-15 11:14:22成绩 面向对象原理与Java实践课程实验报告 实验5多线程 姓 名 _ _ _ 班 级 _ 学 号 _ 实验地点 _ 实验时间 _ 指导教师 _ _ _ 一实验目的 了解线程调度机制 理解线程同步机制 掌握线程设计方法 二实验要求 掌握线程... -
万字图解Java多线程
2020-09-06 14:45:07java多线程我个人觉得是javaSe中最难的一部分,我以前也是感觉学会了,但是真正有多线程的需求却不知道怎么下手,实际上还是对多线程这块知识了解不深刻,不知道多线程api的应用场景,不知道多线程的运行流程等等,...前言
授权Java面试者精选独家原创发布
java多线程我个人觉得是javaSe中最难的一部分,我以前也是感觉学会了,但是真正有多线程的需求却不知道怎么下手,实际上还是对多线程这块知识了解不深刻,不知道多线程api的应用场景,不知道多线程的运行流程等等,本篇文章将使用实例+图解+源码的方式来解析java多线程。
文章篇幅较长,大家也可以有选择的看具体章节,建议多线程的代码全部手敲,永远不要相信你看到的结论,自己编码后运行出来的,才是自己的。
什么是java多线程?
进程与线程
进程
- 当一个程序被运行,就开启了一个进程, 比如启动了qq,word
- 程序由指令和数据组成,指令要运行,数据要加载,指令被cpu加载运行,数据被加载到内存,指令运行时可由cpu调度硬盘、网络等设备
线程
- 一个进程内可分为多个线程
- 一个线程就是一个指令流,cpu调度的最小单位,由cpu一条一条执行指令
并行与并发
并发:单核cpu运行多线程时,时间片进行很快的切换。线程轮流执行cpu
并行:多核cpu运行 多线程时,真正的在同一时刻运行
java提供了丰富的api来支持多线程。
为什么用多线程?
多线程能实现的都可以用单线程来完成,那单线程运行的好好的,为什么java要引入多线程的概念呢?
多线程的好处:
-
程序运行的更快!快!快!
-
充分利用cpu资源,目前几乎没有线上的cpu是单核的,发挥多核cpu强大的能力
多线程难在哪里?
单线程只有一条执行线,过程容易理解,可以在大脑中清晰的勾勒出代码的执行流程
多线程却是多条线,而且一般多条线之间有交互,多条线之间需要通信,一般难点有以下几点
- 多线程的执行结果不确定,受到cpu调度的影响
- 多线程的安全问题
- 线程资源宝贵,依赖线程池操作线程,线程池的参数设置问题
- 多线程执行是动态的,同时的,难以追踪过程
- 多线程的底层是操作系统层面的,源码难度大
有时候希望自己变成一个字节穿梭于服务器中,搞清楚来龙去脉,就像无敌破坏王一样(没看过这部电影的可以看下,脑洞大开)。
java多线程的基本使用
定义任务、创建和运行线程
任务: 线程的执行体。也就是我们的核心代码逻辑
定义任务
- 继承Thread类 (可以说是 将任务和线程合并在一起)
- 实现Runnable接口 (可以说是 将任务和线程分开了)
- 实现Callable接口 (利用FutureTask执行任务)
Thread实现任务的局限性
- 任务逻辑写在Thread类的run方法中,有单继承的局限性
- 创建多线程时,每个任务有成员变量时不共享,必须加static才能做到共享
Runnable和Callable解决了Thread的局限性
但是Runbale相比Callable有以下的局限性
- 任务没有返回值
- 任务无法抛异常给调用方
如下代码 几种定义线程的方式
@Slf4j class T extends Thread { @Override public void run() { log.info("我是继承Thread的任务"); } } @Slf4j class R implements Runnable { @Override public void run() { log.info("我是实现Runnable的任务"); } } @Slf4j class C implements Callable<String> { @Override public String call() throws Exception { log.info("我是实现Callable的任务"); return "success"; } }
创建线程的方式
- 通过Thread类直接创建线程
- 利用线程池内部创建线程
启动线程的方式
- 调用线程的start()方法
// 启动继承Thread类的任务 new T().start(); // 启动继承Thread匿名内部类的任务 可用lambda优化 Thread t = new Thread(){ @Override public void run() { log.info("我是Thread匿名内部类的任务"); } }; // 启动实现Runnable接口的任务 new Thread(new R()).start(); // 启动实现Runnable匿名实现类的任务 new Thread(new Runnable() { @Override public void run() { log.info("我是Runnable匿名内部类的任务"); } }).start(); // 启动实现Runnable的lambda简化后的任务 new Thread(() -> log.info("我是Runnable的lambda简化后的任务")).start(); // 启动实现了Callable接口的任务 结合FutureTask 可以获取线程执行的结果 FutureTask<String> target = new FutureTask<>(new C()); new Thread(target).start(); log.info(target.get());
以上各个线程相关的类的类图如下
上下文切换
多核cpu下,多线程是并行工作的,如果线程数多,单个核又会并发的调度线程,运行时会有上下文切换的概念
cpu执行线程的任务时,会为线程分配时间片,以下几种情况会发生上下文切换。
- 线程的cpu时间片用完
- 垃圾回收
- 线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法
当发生上下文切换时,操作系统会保存当前线程的状态,并恢复另一个线程的状态,jvm中有块内存地址叫程序计数器,用于记录线程执行到哪一行代码,是线程私有的。
idea打断点的时候可以设置为Thread模式,idea的debug模式可以看出栈帧的变化
线程的礼让-yield()&线程的优先级
yield()方法会让运行中的线程切换到就绪状态,重新争抢cpu的时间片,争抢时是否获取到时间片看cpu的分配。
代码如下
// 方法的定义 public static native void yield(); Runnable r1 = () -> { int count = 0; for (;;){ log.info("---- 1>" + count++); } }; Runnable r2 = () -> { int count = 0; for (;;){ Thread.yield(); log.info(" ---- 2>" + count++); } }; Thread t1 = new Thread(r1,"t1"); Thread t2 = new Thread(r2,"t2"); t1.start(); t2.start(); // 运行结果 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129504 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129505 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129506 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129507 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129508 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129509 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129510 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129511 11:49:15.796 [t1] INFO thread.TestYield - ---- 1>129512 11:49:15.798 [t2] INFO thread.TestYield - ---- 2>293 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129513 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129514 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129515 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129516 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129517 11:49:15.798 [t1] INFO thread.TestYield - ---- 1>129518
如上述结果所示,t2线程每次执行时进行了yield(),线程1执行的机会明显比线程2要多。
线程的优先级
线程内部用1~10的数来调整线程的优先级,默认的线程优先级为NORM_PRIORITY:5
cpu比较忙时,优先级高的线程获取更多的时间片
cpu比较闲时,优先级设置基本没用
public final static int MIN_PRIORITY = 1; public final static int NORM_PRIORITY = 5; public final static int MAX_PRIORITY = 10; // 方法的定义 public final void setPriority(int newPriority) { }
cpu比较忙时
Runnable r1 = () -> { int count = 0; for (;;){ log.info("---- 1>" + count++); } }; Runnable r2 = () -> { int count = 0; for (;;){ log.info(" ---- 2>" + count++); } }; Thread t1 = new Thread(r1,"t1"); Thread t2 = new Thread(r2,"t2"); t1.setPriority(Thread.NORM_PRIORITY); t2.setPriority(Thread.MAX_PRIORITY); t1.start(); t2.start(); // 可能的运行结果 11:59:00.696 [t1] INFO thread.TestYieldPriority - ---- 1>44102 11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135903 11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135904 11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135905 11:59:00.696 [t2] INFO thread.TestYieldPriority - ---- 2>135906
cpu比较闲时
Runnable r1 = () -> { int count = 0; for (int i = 0; i < 10; i++) { log.info("---- 1>" + count++); } }; Runnable r2 = () -> { int count = 0; for (int i = 0; i < 10; i++) { log.info(" ---- 2>" + count++); } }; Thread t1 = new Thread(r1,"t1"); Thread t2 = new Thread(r2,"t2"); t1.setPriority(Thread.MIN_PRIORITY); t2.setPriority(Thread.MAX_PRIORITY); t1.start(); t2.start(); // 可能的运行结果 线程1优先级低 却先运行完 12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>7 12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>8 12:01:09.916 [t1] INFO thread.TestYieldPriority - ---- 1>9 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>2 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>3 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>4 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>5 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>6 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>7 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>8 12:01:09.916 [t2] INFO thread.TestYieldPriority - ---- 2>9
守护线程
默认情况下,java进程需要等待所有线程都运行结束,才会结束,有一种特殊线程叫守护线程,当所有的非守护线程都结束后,即使它没有执行完,也会强制结束。
默认的线程都是非守护线程。
垃圾回收线程就是典型的守护线程
// 方法的定义 public final void setDaemon(boolean on) { } Thread thread = new Thread(() -> { while (true) { } }); // 具体的api。设为true表示未守护线程,当主线程结束后,守护线程也结束。 // 默认是false,当主线程结束后,thread继续运行,程序不停止 thread.setDaemon(true); thread.start(); log.info("结束");
线程的阻塞
线程的阻塞可以分为好多种,从操作系统层面和java层面阻塞的定义可能不同,但是广义上使得线程阻塞的方式有下面几种
- BIO阻塞,即使用了阻塞式的io流
- sleep(long time) 让线程休眠进入阻塞状态
- a.join() 调用该方法的线程进入阻塞,等待a线程执行完恢复运行
- sychronized或ReentrantLock 造成线程未获得锁进入阻塞状态 (同步锁章节细说)
- 获得锁之后调用wait()方法 也会让线程进入阻塞状态 (同步锁章节细说)
- LockSupport.park() 让线程进入阻塞状态 (同步锁章节细说)
sleep()
使线程休眠,会将运行中的线程进入阻塞状态。当休眠时间结束后,重新争抢cpu的时间片继续运行
// 方法的定义 native方法 public static native void sleep(long millis) throws InterruptedException; try { // 休眠2秒 // 该方法会抛出 InterruptedException异常 即休眠过程中可被中断,被中断后抛出异常 Thread.sleep(2000); } catch (InterruptedException异常 e) { } try { // 使用TimeUnit的api可替代 Thread.sleep TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { }
join()
join是指调用该方法的线程进入阻塞状态,等待某线程执行完成后恢复运行
// 方法的定义 有重载 // 等待线程执行完才恢复运行 public final void join() throws InterruptedException { } // 指定join的时间。指定时间内 线程还未执行完 调用方线程不继续等待就恢复运行 public final synchronized void join(long millis) throws InterruptedException{}
Thread t = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } r = 10; }); t.start(); // 让主线程阻塞 等待t线程执行完才继续执行 // 去除该行,执行结果为0,加上该行 执行结果为10 t.join(); log.info("r:{}", r); // 运行结果 13:09:13.892 [main] INFO thread.TestJoin - r:10
线程的打断-interrupt()
// 相关方法的定义 public void interrupt() { } public boolean isInterrupted() { } public static boolean interrupted() { }
打断标记:线程是否被打断,true表示被打断了,false表示没有
isInterrupted() 获取线程的打断标记 ,调用后不会修改线程的打断标记
interrupt()方法用于中断线程
- 可以打断sleep,wait,join等显式的抛出InterruptedException方法的线程,但是打断后,线程的打断标记还是false
- 打断正常线程 ,线程不会真正被中断,但是线程的打断标记为true
interrupted() 获取线程的打断标记,调用后清空打断标记 即如果获取为true 调用后打断标记为false (不常用)
interrupt实例: 有个后台监控线程不停的监控,当外界打断它时,就结束运行。代码如下
@Slf4j class TwoPhaseTerminal{ // 监控线程 private Thread monitor; public void start(){ monitor = new Thread(() ->{ // 不停的监控 while (true){ Thread thread = Thread.currentThread(); // 判断当前线程是否被打断 if (thread.isInterrupted()){ log.info("当前线程被打断,结束运行"); break; } try { Thread.sleep(1000); // 监控逻辑中被打断后,打断标记为true log.info("监控"); } catch (InterruptedException e) { // 睡眠时被打断时抛出异常 在该处捕获到 此时打断标记还是false // 在调用一次中断 使得中断标记为true thread.interrupt(); } } }); monitor.start(); } public void stop(){ monitor.interrupt(); } }
线程的状态
上面说了一些基本的api的使用,调用上面的方法后都会使得线程有对应的状态。
线程的状态可从 操作系统层面分为五种状态 从java api层面分为六种状态。
五种状态
- 初始状态:创建线程对象时的状态
- 可运行状态(就绪状态):调用start()方法后进入就绪状态,也就是准备好被cpu调度执行
- 运行状态:线程获取到cpu的时间片,执行run()方法的逻辑
- 阻塞状态: 线程被阻塞,放弃cpu的时间片,等待解除阻塞重新回到就绪状态争抢时间片
- 终止状态: 线程执行完成或抛出异常后的状态
六种状态
Thread类中的内部枚举State
public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; }
- NEW 线程对象被创建
- Runnable 线程调用了start()方法后进入该状态,该状态包含了三种情况
- 就绪状态 :等待cpu分配时间片
- 运行状态:进入Runnable方法执行任务
- 阻塞状态:BIO 执行阻塞式io流时的状态
- Blocked 没获取到锁时的阻塞状态(同步锁章节会细说)
- WAITING 调用wait()、join()等方法后的状态
- TIMED_WAITING 调用 sleep(time)、wait(time)、join(time)等方法后的状态
- TERMINATED 线程执行完成或抛出异常后的状态
六种线程状态和方法的对应关系
线程的相关方法总结
主要总结Thread类中的核心方法
方法名称 是否static 方法说明 start() 否 让线程启动,进入就绪状态,等待cpu分配时间片 run() 否 重写Runnable接口的方法,线程获取到cpu时间片时执行的具体逻辑 yield() 是 线程的礼让,使得获取到cpu时间片的线程进入就绪状态,重新争抢时间片 sleep(time) 是 线程休眠固定时间,进入阻塞状态,休眠时间完成后重新争抢时间片,休眠可被打断 join()/join(time) 否 调用线程对象的join方法,调用者线程进入阻塞,等待线程对象执行完或者到达指定时间才恢复,重新争抢时间片 isInterrupted() 否 获取线程的打断标记,true:被打断,false:没有被打断。调用后不会修改打断标记 interrupt() 否 打断线程,抛出InterruptedException异常的方法均可被打断,但是打断后不会修改打断标记,正常执行的线程被打断后会修改打断标记 interrupted() 否 获取线程的打断标记。调用后会清空打断标记 stop() 否 停止线程运行 不推荐 suspend() 否 挂起线程 不推荐 resume() 否 恢复线程运行 不推荐 currentThread() 是 获取当前线程 Object中与线程相关方法
方法名称 方法说明 wait()/wait(long timeout) 获取到锁的线程进入阻塞状态 notify() 随机唤醒被wait()的一个线程 notifyAll(); 唤醒被wait()的所有线程,重新争抢时间片 同步锁
线程安全
- 一个程序运行多个线程本身是没有问题的
- 问题有可能出现在多个线程访问共享资源
- 多个线程都是读共享资源也是没有问题的
- 当多个线程读写共享资源时,如果发生指令交错,就会出现问题
临界区: 一段代码如果对共享资源的多线程读写操作,这段代码就被称为临界区。
注意的是 指令交错指的是 java代码在解析成字节码文件时,java代码的一行代码在字节码中可能有多行,在线程上下文切换时就有可能交错。
线程安全指的是多线程调用同一个对象的临界区的方法时,对象的属性值一定不会发生错误,这就是保证了线程安全。
如下面不安全的代码
// 对象的成员变量 private static int count = 0; public static void main(String[] args) throws InterruptedException { // t1线程对变量+5000次 Thread t1 = new Thread(() -> { for (int i = 0; i < 5000; i++) { count++; } }); // t2线程对变量-5000次 Thread t2 = new Thread(() -> { for (int i = 0; i < 5000; i++) { count--; } }); t1.start(); t2.start(); // 让t1 t2都执行完 t1.join(); t2.join(); System.out.println(count); } // 运行结果 -1399
上面的代码 两个线程,一个+5000次,一个-5000次,如果线程安全,count的值应该还是0。
但是运行很多次,每次的结果不同,且都不是0,所以是线程不安全的。
线程安全的类一定所有的操作都线程安全吗?
开发中经常会说到一些线程安全的类,如ConcurrentHashMap,线程安全指的是类里每一个独立的方法是线程安全的,但是方法的组合就不一定是线程安全的。
成员变量和静态变量是否线程安全?
- 如果没有多线程共享,则线程安全
- 如果存在多线程共享
- 多线程只有读操作,则线程安全
- 多线程存在写操作,写操作的代码又是临界区,则线程不安全
局部变量是否线程安全?
- 局部变量是线程安全的
- 局部变量引用的对象未必是线程安全的
- 如果该对象没有逃离该方法的作用范围,则线程安全
- 如果该对象逃离了该方法的作用范围,比如:方法的返回值,需要考虑线程安全
synchronized
同步锁也叫对象锁,是锁在对象上的,不同的对象就是不同的锁。
该关键字是用于保证线程安全的,是阻塞式的解决方案。
让同一个时刻最多只有一个线程能持有对象锁,其他线程在想获取这个对象锁就会被阻塞,不用担心上下文切换的问题。
注意: 不要理解为一个线程加了锁 ,进入 synchronized代码块中就会一直执行下去。如果时间片切换了,也会执行其他线程,再切换回来会紧接着执行,只是不会执行到有竞争锁的资源,因为当前线程还未释放锁。
当一个线程执行完synchronized的代码块后 会唤醒正在等待的线程
synchronized实际上使用对象锁保证临界区的原子性 临界区的代码是不可分割的 不会因为线程切换所打断
基本使用
// 加在方法上 实际是对this对象加锁 private synchronized void a() { } // 同步代码块,锁对象可以是任意的,加在this上 和a()方法作用相同 private void b(){ synchronized (this){ } } // 加在静态方法上 实际是对类对象加锁 private synchronized static void c() { } // 同步代码块 实际是对类对象加锁 和c()方法作用相同 private void d(){ synchronized (TestSynchronized.class){ } } // 上述b方法对应的字节码源码 其中monitorenter就是加锁的地方 0 aload_0 1 dup 2 astore_1 3 monitorenter 4 aload_1 5 monitorexit 6 goto 14 (+8) 9 astore_2 10 aload_1 11 monitorexit 12 aload_2 13 athrow 14 return
线程安全的代码
private static int count = 0; private static Object lock = new Object(); private static Object lock2 = new Object(); // t1线程和t2对象都是对同一对象加锁。保证了线程安全。此段代码无论执行多少次,结果都是0 public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0; i < 5000; i++) { synchronized (lock) { count++; } } }); Thread t2 = new Thread(() -> { for (int i = 0; i < 5000; i++) { synchronized (lock) { count--; } } }); t1.start(); t2.start(); // 让t1 t2都执行完 t1.join(); t2.join(); System.out.println(count); }
重点:加锁是加在对象上,一定要保证是同一对象,加锁才能生效
线程通信
wait+notify
线程间通信可以通过共享变量+wait()¬ify()来实现
wait()将线程进入阻塞状态,notify()将线程唤醒
当多线程竞争访问对象的同步方法时,锁对象会关联一个底层的Monitor对象(重量级锁的实现)
如下图所示 Thread0,1先竞争到锁执行了代码后,2,3,4,5线程同时来执行临界区的代码,开始竞争锁
- Thread-0先获取到对象的锁,关联到monitor的owner,同步代码块内调用了锁对象的wait()方法,调用后会进入waitSet等待,Thread-1同样如此,此时Thread-0的状态为Waitting
- Thread2、3、4、5同时竞争,2获取到锁后,关联了monitor的owner,3、4、5只能进入EntryList中等待,此时2线程状态为 Runnable,3、4、5状态为Blocked
- 2执行后,唤醒entryList中的线程,3、4、5进行竞争锁,获取到的线程即会关联monitor的owner
- 3、4、5线程在执行过程中,调用了锁对象的notify()或notifyAll()时,会唤醒waitSet的线程,唤醒的线程进入entryList等待重新竞争锁
注意:
-
Blocked状态和Waitting状态都是阻塞状态
-
Blocked线程会在owner线程释放锁时唤醒
-
wait和notify使用场景是必须要有同步,且必须获得对象的锁才能调用,使用锁对象去调用,否则会抛异常
- wait() 释放锁 进入 waitSet 可传入时间,如果指定时间内未被唤醒 则自动唤醒
- notify()随机唤醒一个waitSet里的线程
- notifyAll()唤醒waitSet中所有的线程
static final Object lock = new Object(); new Thread(() -> { synchronized (lock) { log.info("开始执行"); try { // 同步代码内部才能调用 lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("继续执行核心逻辑"); } }, "t1").start(); new Thread(() -> { synchronized (lock) { log.info("开始执行"); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("继续执行核心逻辑"); } }, "t2").start(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("开始唤醒"); synchronized (lock) { // 同步代码内部才能调用 lock.notifyAll(); } // 执行结果 14:29:47.138 [t1] INFO TestWaitNotify - 开始执行 14:29:47.141 [t2] INFO TestWaitNotify - 开始执行 14:29:49.136 [main] INFO TestWaitNotify - 开始唤醒 14:29:49.136 [t2] INFO TestWaitNotify - 继续执行核心逻辑 14:29:49.136 [t1] INFO TestWaitNotify - 继续执行核心逻辑
wait 和 sleep的区别?
二者都会让线程进入阻塞状态,有以下区别
- wait是Object的方法 sleep是Thread的方法
- wait会立即释放锁 sleep不会释放锁
- wait后线程的状态是Watting sleep后线程的状态为 Time_Waiting
park&unpark
LockSupport是juc下的工具类,提供了park和unpark方法,可以实现线程通信
与wait和notity相比的不同点
- wait 和notify需要获取对象锁 park unpark不要
- unpark 可以指定唤醒线程 notify随机唤醒
- park和unpark的顺序可以先unpark wait和notify的顺序不能颠倒
生产者消费者模型
指的是有生产者来生产数据,消费者来消费数据,生产者生产满了就不生产了,通知消费者取,等消费了再进行生产。
消费者消费不到了就不消费了,通知生产者生产,生产到了再继续消费。
public static void main(String[] args) throws InterruptedException { MessageQueue queue = new MessageQueue(2); // 三个生产者向队列里存值 for (int i = 0; i < 3; i++) { int id = i; new Thread(() -> { queue.put(new Message(id, "值" + id)); }, "生产者" + i).start(); } Thread.sleep(1000); // 一个消费者不停的从队列里取值 new Thread(() -> { while (true) { queue.take(); } }, "消费者").start(); } } // 消息队列被生产者和消费者持有 class MessageQueue { private LinkedList<Message> list = new LinkedList<>(); // 容量 private int capacity; public MessageQueue(int capacity) { this.capacity = capacity; } /** * 生产 */ public void put(Message message) { synchronized (list) { while (list.size() == capacity) { log.info("队列已满,生产者等待"); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.addLast(message); log.info("生产消息:{}", message); // 生产后通知消费者 list.notifyAll(); } } public Message take() { synchronized (list) { while (list.isEmpty()) { log.info("队列已空,消费者等待"); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = list.removeFirst(); log.info("消费消息:{}", message); // 消费后通知生产者 list.notifyAll(); return message; } } } // 消息 class Message { private int id; private Object value; }
同步锁案例
为了更形象的表达加同步锁的概念,这里举一个生活中的例子,尽量把以上的概念具体化出来。
这里举一个每个人非常感兴趣的一件东西。 钱!!!(马老师除外)。
现实中,我们去银行门口的自动取款机取钱,取款机的钱就是共享变量,为了保障安全,不可能两个陌生人同时进入同一个取款机内取钱,所以只能一个人进入取钱,然后锁上取款机的门,其他人只能在取款机门口等待。
取款机有多个,里面的钱互不影响,锁也有多个(多个对象锁),取钱人在多个取款机里同时取钱也没有安全问题。
假如每个取钱的陌生人都是线程,当取钱人进入取款机锁了门后(线程获得锁),取到钱后出门(线程释放锁),下一个人竞争到锁来取钱。
假设工作人员也是一个线程,如果取钱人进入后发现取款机钱不足了,这时通知工作人员来向取款机里加钱(调用notifyAll方法),取钱人暂停取钱,进入银行大堂阻塞等待(调用wait方法)。
银行大堂里的工作人员和取钱人都被唤醒,重新竞争锁,进入后如果是取钱人,由于取款机没钱,还得进入银行大堂等待。
当工作人员获得取款机的锁进入后,加了钱后会通知大厅里的人来取钱(调用notifyAll方法)。自己暂停加钱,进入银行大堂等待唤醒加钱(调用wait方法)。
这时大堂里等待的人都来竞争锁,谁获取到谁进入继续取钱。
和现实中不同的就是这里没有排队的概念,谁抢到锁谁进去取。
ReentrantLock
可重入锁 : 一个线程获取到对象的锁后,执行方法内部在需要获取锁的时候是可以获取到的。如以下代码
private static final ReentrantLock LOCK = new ReentrantLock(); private static void m() { LOCK.lock(); try { log.info("begin"); // 调用m1() m1(); } finally { // 注意锁的释放 LOCK.unlock(); } } public static void m1() { LOCK.lock(); try { log.info("m1"); m2(); } finally { // 注意锁的释放 LOCK.unlock(); } }
synchronized 也是可重入锁,ReentrantLock有以下优点
- 支持获取锁的超时时间
- 获取锁时可被打断
- 可设为公平锁
- 可以有不同的条件变量,即有多个waitSet,可以指定唤醒
api
// 默认非公平锁,参数传true 表示未公平锁 ReentrantLock lock = new ReentrantLock(false); // 尝试获取锁 lock() // 释放锁 应放在finally块中 必须执行到 unlock() try { // 获取锁时可被打断,阻塞中的线程可被打断 LOCK.lockInterruptibly(); } catch (InterruptedException e) { return; } // 尝试获取锁 获取不到就返回false LOCK.tryLock() // 支持超时时间 一段时间没获取到就返回false tryLock(long timeout, TimeUnit unit) // 指定条件变量 休息室 一个锁可以创建多个休息室 Condition waitSet = ROOM.newCondition(); // 释放锁 进入waitSet等待 释放后其他线程可以抢锁 yanWaitSet.await() // 唤醒具体休息室的线程 唤醒后 重写竞争锁 yanWaitSet.signal()
实例:一个线程输出a,一个线程输出b,一个线程输出c,abc按照顺序输出,连续输出5次
这个考的就是线程的通信,利用 wait()/notify()和控制变量可以实现,此处使用ReentrantLock即可实现该功能。
public static void main(String[] args) { AwaitSignal awaitSignal = new AwaitSignal(5); // 构建三个条件变量 Condition a = awaitSignal.newCondition(); Condition b = awaitSignal.newCondition(); Condition c = awaitSignal.newCondition(); // 开启三个线程 new Thread(() -> { awaitSignal.print("a", a, b); }).start(); new Thread(() -> { awaitSignal.print("b", b, c); }).start(); new Thread(() -> { awaitSignal.print("c", c, a); }).start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } awaitSignal.lock(); try { // 先唤醒a a.signal(); } finally { awaitSignal.unlock(); } } } class AwaitSignal extends ReentrantLock { // 循环次数 private int loopNumber; public AwaitSignal(int loopNumber) { this.loopNumber = loopNumber; } /** * @param print 输出的字符 * @param current 当前条件变量 * @param next 下一个条件变量 */ public void print(String print, Condition current, Condition next) { for (int i = 0; i < loopNumber; i++) { lock(); try { try { // 获取锁之后等待 current.await(); System.out.print(print); } catch (InterruptedException e) { } next.signal(); } finally { unlock(); } } }
死锁
说到死锁,先举个例子,
下面是代码实现
static Beer beer = new Beer(); static Story story = new Story(); public static void main(String[] args) { new Thread(() ->{ synchronized (beer){ log.info("我有酒,给我故事"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (story){ log.info("小王开始喝酒讲故事"); } } },"小王").start(); new Thread(() ->{ synchronized (story){ log.info("我有故事,给我酒"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (beer){ log.info("老王开始喝酒讲故事"); } } },"老王").start(); } class Beer { } class Story{ }
死锁导致程序无法正常运行下去
检测工具可以检查到死锁信息
java内存模型(JMM)
jmm 体现在以下三个方面
- 原子性 保证指令不会受到上下文切换的影响
- 可见性 保证指令不会受到cpu缓存的影响
- 有序性 保证指令不会受并行优化的影响
可见性
停不下来的程序
static boolean run = true; public static void main(String[] args) throws InterruptedException { Thread t = new Thread(() -> { while (run) { // .... } }); t.start(); Thread.sleep(1000); // 线程t不会如预想的停下来 run = false; }
如上图所示,线程有自己的工作缓存,当主线程修改了变量并同步到主内存时,t线程没有读取到,所以程序停不下来
有序性
JVM在不影响程序正确性的情况下可能会调整语句的执行顺序,该情况也称为 指令重排序
static int i; static int j; // 在某个线程内执行如下赋值操作 i = ...; j = ...; 有可能将j先赋值
原子性
原子性大家应该比较熟悉,上述同步锁的synchronized代码块就是保证了原子性,就是一段代码是一个整体,原子性保证了线程安全,不会受到上下文切换的影响。
volatile
该关键字解决了可见性和有序性,volatile通过内存屏障来实现的
- 写屏障
会在对象写操作之后加写屏障,会对写屏障的之前的数据都同步到主存,并且保证写屏障的执行顺序在写屏障之前
- 读屏障
会在对象读操作之前加读屏障,会在读屏障之后的语句都从主存读,并保证读屏障之后的代码执行在读屏障之后
注意: volatile不能解决原子性,即不能通过该关键字实现线程安全。
volatile应用场景:一个线程读取变量,另外的线程操作变量,加了该关键字后保证写变量后,读变量的线程可以及时感知。
无锁-cas
cas (compare and swap) 比较并交换
为变量赋值时,从内存中读取到的值v,获取到要交换的新值n,执行 compareAndSwap()方法时,比较v和当前内存中的值是否一致,如果一致则将n和v交换,如果不一致,则自旋重试。
cas底层是cpu层面的,即不使用同步锁也可以保证操作的原子性。
private AtomicInteger balance; // 模拟cas的具体操作 @Override public void withdraw(Integer amount) { while (true) { // 获取当前值 int pre = balance.get(); // 进行操作后得到新值 int next = pre - amount; // 比较并设置成功 则中断 否则自旋重试 if (balance.compareAndSet(pre, next)) { break; } } }
无锁的效率是要高于之前的锁的,由于无锁不会涉及线程的上下文切换
cas是乐观锁的思想,sychronized是悲观锁的思想
cas适合很少有线程竞争的场景,如果竞争很强,重试经常发生,反而降低效率
juc并发包下包含了实现了cas的原子类
- AtomicInteger/AtomicBoolean/AtomicLong
- AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray
- AtomicReference/AtomicStampedReference/AtomicMarkableReference
AtomicInteger
常用api
new AtomicInteger(balance) get() compareAndSet(pre, next) // i.incrementAndGet() ++i // i.decrementAndGet() --i // i.getAndIncrement() i++ // i.getAndDecrement() ++i i.addAndGet() // 传入函数式接口 修改i int getAndUpdate(IntUnaryOperator updateFunction) // cas 的核心方法 compareAndSet(int expect, int update)
ABA问题
cas存在ABA问题,即比较并交换时,如果原值为A,有其他线程将其修改为B,在有其他线程将其修改为A。
此时实际发生过交换,但是比较和交换由于值没改变可以交换成功
解决方式
AtomicStampedReference/AtomicMarkableReference
上面两个类解决ABA问题,原理就是为对象增加版本号,每次修改时增加版本号,就可以避免ABA问题
或者增加个布尔变量标识,修改后调整布尔变量值,也可以避免ABA问题
线程池
线程池的介绍
线程池是java并发最重要的一个知识点,也是难点,是实际应用最广泛的。
线程的资源很宝贵,不可能无限的创建,必须要有管理线程的工具,线程池就是一种管理线程的工具,java开发中经常有池化的思想,如 数据库连接池、Redis连接池等。
预先创建好一些线程,任务提交时直接执行,既可以节约创建线程的时间,又可以控制线程的数量。
线程池的好处
- 降低资源消耗,通过池化思想,减少创建线程和销毁线程的消耗,控制资源
- 提高响应速度,任务到达时,无需创建线程即可运行
- 提供更多更强大的功能,可扩展性高
线程池的构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
构造器参数的意义
参数名 参数意义 corePoolSize 核心线程数 maximumPoolSize 最大线程数 keepAliveTime 救急线程的空闲时间 unit 救急线程的空闲时间单位 workQueue 阻塞队列 threadFactory 创建线程的工厂,主要定义线程名 handler 拒绝策略 线程池案例
下面 我们通过一个实例来理解线程池的参数以及线程池的接收任务的过程
如上图 银行办理业务。
- 客户到银行时,开启柜台进行办理,柜台相当于线程,客户相当于任务,有两个是常开的柜台,三个是临时柜台。2就是核心线程数,5是最大线程数。即有两个核心线程
- 当柜台开到第二个后,都还在处理业务。客户再来就到排队大厅排队。排队大厅只有三个座位。
- 排队大厅坐满时,再来客户就继续开柜台处理,目前最大有三个临时柜台,也就是三个救急线程
- 此时再来客户,就无法正常为其 提供业务,采用拒绝策略来处理它们
- 当柜台处理完业务,就会从排队大厅取任务,当柜台隔一段空闲时间都取不到任务时,如果当前线程数大于核心线程数时,就会回收线程。即撤销该柜台。
线程池的状态
线程池通过一个int变量的高3位来表示线程池的状态,低29位来存储线程池的数量
状态名称 高三位 接收新任务 处理阻塞队列任务 说明 Running 111 Y Y 正常接收任务,正常处理任务 Shutdown 000 N Y 不会接收任务,会执行完正在执行的任务,也会处理阻塞队列里的任务 stop 001 N N 不会接收任务,会中断正在执行的任务,会放弃处理阻塞队列里的任务 Tidying 010 N N 任务全部执行完毕,当前活动线程是0,即将进入终结 Termitted 011 N N 终结状态 // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
线程池的主要流程
线程池创建、接收任务、执行任务、回收线程的步骤
- 创建线程池后,线程池的状态是Running,该状态下才能有下面的步骤
- 提交任务时,线程池会创建线程去处理任务
- 当线程池的工作线程数达到corePoolSize时,继续提交任务会进入阻塞队列
- 当阻塞队列装满时,继续提交任务,会创建救急线程来处理
- 当线程池中的工作线程数达到maximumPoolSize时,会执行拒绝策略
- 当线程取任务的时间达到keepAliveTime还没有取到任务,工作线程数大于corePoolSize时,会回收该线程
注意: 不是刚创建的线程是核心线程,后面创建的线程是非核心线程,线程是没有核心非核心的概念的,这是我长期以来的误解。
拒绝策略
- 调用者抛出RejectedExecutionException (默认策略)
- 让调用者运行任务
- 丢弃此次任务
- 丢弃阻塞队列中最早的任务,加入该任务
提交任务的方法
// 执行Runnable public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } // 提交Callable public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 内部构建FutureTask RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } // 提交Runnable,指定返回值 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 内部构建FutureTask RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } // 提交Runnable,指定返回值 public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); // 内部构建FutureTask RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
Execetors创建线程池
注意: 下面几种方式都不推荐使用
1.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
- 核心线程数 = 最大线程数 没有救急线程
- 阻塞队列无界 可能导致oom
2.newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
- 核心线程数是0,最大线程数无限制 ,救急线程60秒回收
- 队列采用 SynchronousQueue 实现 没有容量,即放入队列后没有线程来取就放不进去
- 可能导致线程数过多,cpu负担太大
3.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
- 核心线程数和最大线程数都是1,没有救急线程,无界队列 可以不停的接收任务
- 将任务串行化 一个个执行, 使用包装类是为了屏蔽修改线程池的一些参数 比如 corePoolSize
- 如果某线程抛出异常了,会重新创建一个线程继续执行
- 可能造成oom
4.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
- 任务调度的线程池 可以指定延迟时间调用,可以指定隔一段时间调用
线程池的关闭
shutdown()
会让线程池状态为shutdown,不能接收任务,但是会将工作线程和阻塞队列里的任务执行完 相当于优雅关闭
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow()
会让线程池状态为stop, 不能接收任务,会立即中断执行中的工作线程,并且不会执行阻塞队列里的任务, 会返回阻塞队列的任务列表
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
线程池的正确使用姿势
线程池难就难在参数的配置,有一套理论配置参数
cpu密集型 : 指的是程序主要发生cpu的运算
核心线程数: CPU核心数+1
IO密集型: 远程调用RPC,操作数据库等,不需要使用cpu进行大量的运算。 大多数应用的场景
核心线程数=核数*cpu期望利用率 *总时间/cpu运算时间
但是基于以上理论还是很难去配置,因为cpu运算时间不好估算
实际配置大小可参考下表
cpu密集型 io密集型 线程数数量 核数<=x<=核数*2 核心数*50<=x<=核心数 *100 队列长度 y>=100 1<=y<=10 1.线程池参数通过分布式配置,修改配置无需重启应用
线程池参数是根据线上的请求数变化而变化的,最好的方式是 核心线程数、最大线程数 队列大小都是可配置的
主要配置 corePoolSize maxPoolSize queueSize
java提供了可方法覆盖参数,线程池内部会处理好参数 进行平滑的修改
public void setCorePoolSize(int corePoolSize) { }
2.增加线程池的监控
3.io密集型可调整为先新增任务到最大线程数后再将任务放到阻塞队列
代码 主要可重写阻塞队列 加入任务的方法
public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException("The task queue does not have executor!"); } final ReentrantLock lock = this.lock; lock.lock(); try { int currentPoolThreadSize = executor.getPoolSize(); // 如果提交任务数小于当前创建的线程数, 说明还有空闲线程, if (executor.getTaskCount() < currentPoolThreadSize) { // 将任务放入队列中,让线程去处理任务 return super.offer(runnable); } // 核心改动 // 如果当前线程数小于最大线程数,则返回 false ,让线程池去创建新的线程 if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; } // 否则,就将任务放入队列中 return super.offer(runnable); } finally { lock.unlock(); } }
3.拒绝策略 建议使用tomcat的拒绝策略(给一次机会)
// tomcat的源码 @Override public void execute(Runnable command) { if ( executor != null ) { try { executor.execute(command); } catch (RejectedExecutionException rx) { // 捕获到异常后 在从队列获取,相当于重试1取不到任务 在执行拒绝任务 if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full."); } } else throw new IllegalStateException("StandardThreadPool not started."); }
建议修改从队列取任务的方式: 增加超时时间,超时1分钟取不到在进行返回
public boolean offer(E e, long timeout, TimeUnit unit){}
结语
工作三四年了,还没有正式的写过博客,自学一直都是通过笔记的方式积累,最近重新学了一下java多线程,想着周末把这部分内容认真的写篇博客分享出去。
文章篇幅较长,给看到这里的小伙伴点个大大的赞!由于作者水平有限,加之第一次写博客,文章中难免会有错误之处,欢迎小伙伴们反馈指正。
如果觉得文章对你有帮助,麻烦 点赞、评论、转发、在看 走起
你的支持是我最大的动力!!!
-
Java多线程之线程安全问题
2022-03-31 11:02:50本篇文章介绍的内容为Java多线程中的线程安全问题,此处的安全问题并不是指的像黑客入侵造成的安全问题,线程安全问题是指因多线程抢占式执行而导致程序出现bug的问题。⭐️前面的话⭐️
本篇文章介绍的内容为Java多线程中的线程安全问题,此处的安全问题并不是指的像黑客入侵造成的安全问题,线程安全问题是指因多线程抢占式执行而导致程序出现bug的问题。
📒博客主页:未见花闻的博客主页
🎉欢迎关注🔎点赞👍收藏⭐️留言📝
📌本文由未见花闻原创,CSDN首发!
📆首发时间:🌴2022年4月2日🌴
✉️坚持和努力一定能换来诗与远方!
💭参考书籍:📚《Java编程思想》,📚《Java核心技术》
💬参考在线编程网站:🌐牛客网🌐力扣
博主的码云gitee,平常博主写的程序代码都在里面。
博主的github,平常博主写的程序代码都在里面。
🍭作者水平很有限,如果发现错误,一定要及时告知作者哦!感谢感谢!
📌导航小助手📌
☁️1.线程安全概述
❄️1.1什么是线程安全问题
首先我们需要明白操作系统中线程的调度是抢占式执行的,或者说是随机的,这就造成线程调度执行时线程的执行顺序是不确定的,有一些代码执行顺序不同不影响程序运行的结果,但也有一些代码执行顺序发生改变了重写的运行结果会受影响,这就造成程序会出现bug,对于多线程并发时会使程序出现bug的代码称作线程不安全的代码,这就是线程安全问题。
下面,将介绍一种典型的线程安全问题实例,整数自增问题。
❄️1.2一个存在线程安全问题的程序
有一天,老师布置了这样一个问题:使用两个线程将变量
count
自增10
万次,每个线程承担5
万次的自增任务,变量count
的初始值为0
。
这个问题很简单,最终的结果我们也能够口算出来,答案就是10
万。
小明同学做事非常迅速,很快就写出了下面的一段代码:class Counter { private int count; public void increase() { ++this.count; } public int getCount() { return this.count; } } public class Main11 { private static final int CNT = 50000; private static final Counter counter = new Counter(); public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { for (int i = 0; i < CNT; i++) { counter.increase(); } }); Thread thread2 = new Thread(() -> { for (int j = 0; j < CNT; j++) { counter.increase(); } }); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.getCount()); } }
按理来说,结果应该是
10
万,我们来看看运行结果:
运行的结果比10
万要小,你可以试着运行该程序你会发现每次运行的结果都不一样,但绝大部分情况,结果都会比预期的值要小,下面我们就来分析分析为什么会这样。☁️2.线程加锁与线程不安全的原因
❄️2.1案例分析
上面我们使用多线程运行了一个程序,将一个变量值为0的变量自增10万次,但是最终实际结果比我们预期结果要小,原因就是线程调度的顺序是随机的,造成线程间自增的指令集交叉,导致运行时出现两次自增但值只自增一次的情况,所以得到的结果会偏小。
我们知道一次自增操作可以包含以下几条指令:
- 将内存中变量的值加载到寄存器,不妨将该操作记为
load
。 - 在寄存器中执行自增操作,不妨将该操作记为
add
。 - 将寄存器的值保存至内存中,不妨将该操作记为
save
。
我们来画一条时间轴,来总结一下常见的几种情况:
⭐️情况1: 线程间指令集,无交叉,运行结果与预期相同,图中寄存器A表示线程1所用的寄存器,寄存器B表示线程2所用的寄存器,后续情况同理。
⭐️情况2: 线程间指令集存在交叉,运行结果低于预期结果。
⭐️情况3: 线程间指令集完全交叉,实际结果低于预期。
根据上面我们所列举的情况,发现线程运行时没有交叉指令的时候运行结果是正常的,但是一旦有了交叉会导致自增操作的结果会少1
,综上可以得到一个结论,那就是由于自增操作不是原子性的,多个线程并发执行时很可能会导致执行的指令交叉,导致线程安全问题。那如何解决上述线程不安全的问题呢?当然有,那就是对对象加锁。
❄️2.2线程加锁
⚡️2.2.1什么是加锁
为了解决由于“抢占式执行”所导致的线程安全问题,我们可以对操作的对象进行加锁,当一个线程拿到该对象的锁后,会将该对象锁起来,其他线程如果需要执行该对象的任务时,需要等待该线程运行完该对象的任务后才能执行。
举个例子,假设要你去银行的ATM机存钱或者取款,每台ATM机一般都在一间单独的小房子里面,这个小房子有一扇门一把锁,你进去使用ATM机时,门会自动的锁上,这个时候如果有人要来取款,那它得等你使用完并出来它才能进去使用ATM,那么这里的“你”相当于线程,ATM相当于一个对象,小房子相当于一把锁,其他的人相当于其他的线程。
在java中最常用的加锁操作就是使用synchronized
关键字进行加锁。⚡️2.2.2如何加锁
synchronized 会起到互斥效果, 某个线程执行到某个对象的 synchronized 中时, 其他线程如果也执行到同一个对象 synchronized 就会阻塞等待。
线程进入 synchronized 修饰的代码块, 相当于加锁
,退出 synchronized 修饰的代码块, 相当于解锁
。java中的加锁操作可以使用
synchronized
关键字来实现,它的常见使用方式如下:⭐️方式1: 使用
synchronized
关键字修饰普通方法,这样会使方法所在的对象加上一把锁。
例如,就以上面自增的程序为例,尝试使用synchronized
关键字进行加锁,如下我对increase
方法进行了加锁,实际上是对某个对象加锁,此锁的对象就是this
,本质上加锁操作就是修改this
对象头的标记位。class Counter { private int count; synchronized public void increase() { ++this.count; } public int getCount() { return this.count; } }
多线程自增的main方法如下,后面会以相同的栗子介绍
synchronized
的其他用法,后面就不在列出这段代码了。public class Main11 { private static final int CNT = 50000; private static final Counter counter = new Counter(); public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { for (int i = 0; i < CNT; i++) { counter.increase(); } }); Thread thread2 = new Thread(() -> { for (int j = 0; j < CNT; j++) { counter.increase(); } }); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.getCount()); } }
看看运行结果:
⭐️方式2: 使用synchronized
关键字对代码段进行加锁,但是需要显式指定加锁的对象。
例如:class Counter { private int count; public void increase() { synchronized (this){ ++this.count; } } public int getCount() { return this.count; } }
运行结果:
⭐️方式3: 使用synchronized
关键字修饰静态方法,相当于对当前类的类对象进行加锁。class Counter { private static int count; synchronized public static void increase() { ++count; } public int getCount() { return this.count; } }
运行结果:
常见的用法差不多就是这些,对于线程加锁(线程拿锁),如果两个线程同时拿一个对象的锁,就会产生锁竞争,两个线程同时拿两个不同对象的锁不会产生锁竞争。
对于synchronized
这个关键字,它的英文意思是同步,但是同步在计算机中是存在多种意思的,比如在多线程中,这里同步的意思是“互斥”;而在IO或网络编程中同步指的是“异步”,与多线程没有半点的关系。⭐️synchronized 的工作过程:
- 获得互斥锁
lock
- 从主内存拷贝变量的最新副本到工作的内存
- 执行代码
- 将更改后的共享变量的值刷新到主内存
- 释放互斥锁
unlock
synchronized 同步块对同一条线程来说是可重入的,不会出现自己把自己锁死的问题,即死锁问题,关于死锁后续文章再做介绍。
综上,synchronized关键字加锁有如下性质:互斥性,刷新内存性,可重入性。
synchronized关键字也相当于一把监视器锁monitor lock,如果不加锁,直接使用
wait
方法(一种线程等待的方法,后面细说),会抛出非法监视器异常,引发这个异常的原因就是没有加锁。⚡️2.2.3再析案例
对自增那个代码上锁后,我们再来分析一下为什么加上了所就线程安全了,先列代码:
class Counter { private int count; synchronized public void increase() { ++this.count; } public int getCount() { return this.count; } } public class Main11 { private static final int CNT = 50000; private static final Counter counter = new Counter(); public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { for (int i = 0; i < CNT; i++) { counter.increase(); } }); Thread thread2 = new Thread(() -> { for (int j = 0; j < CNT; j++) { counter.increase(); } }); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.getCount()); } }
多线程并发执行时,上一次就分析过没有指令集交叉就不会出现问题,因此这里我们只讨论指令交叉后,加锁操作是如何保证线程安全的,不妨记加锁为
lock
,解锁为unlock
,两个线程运行过程如下:
线程1首先拿到目标对象的锁,对对象进行加锁,处于lock
状态,当线程2来执行自增操作时会发生阻塞,直到线程1的自增操作完毕,处于unlock
状态,线程2才会就绪取执行线程2的自增操作。
加锁后线程就是串行执行,与单线程其实没有很大的区别,那多线程是不是没有用了呢?但是对方法加锁后,线程运行该方法才会加锁,运行完该方法就会自动解锁,况且大部分操作并发执行是不会造成线程安全的,只有少部分的修改操作才会有可能导致线程安全问题,因此整体上多线程运行效率还是比单线程高得多。❄️2.3线程不安全的原因
首先,线程不安全根源是线程间的调度充满随机性,导致原有的逻辑被改变,造成线程不安全,这个问题无法解决,无可奈何。
多个线程针对同一资源进行写(修改)操作,并且针对资源的修改操作不是原子性的,可能会导致线程不安全问题,类似于数据库的事务。
由于编译器的优化,内存可见性无法保证,就是当线程频繁地对同一个变量进行读操作时,会直接从寄存器上读值,不会从内存上读值,这样内存的值修改时,线程就感知不到该变量已经修改,会导致线程安全问题(这是编译器优化的结果,现代的编译器都有类似的优化不止于Java),因为相比于寄存器,从内容中读取数据的效率要小的多,所以编译器会尽可能地在逻辑不变的情况下对代码进行优化,单线程情况下是不会翻车的,但是多线程就不一定了,比如下面一段代码:
import java.util.Scanner; public class Main12 { private static int isQuit; public static void main(String[] args) { Thread thread = new Thread(() -> { while (isQuit == 0) { } System.out.println("线程thread执行完毕!"); }); thread.start(); Scanner sc = new Scanner(System.in); System.out.println("请输入isQuit的值,不为0线程thread停止执行!"); isQuit = sc.nextInt(); System.out.println("main线程执行完毕!"); } }
运行结果:
我们从运行结果可以知道,输入isQuit
后,线程thread
没有停止,这就是编译器优化导致线程感知不到内存可见性,从而导致线程不安全。
我们可以使用volatile
关键字保证内存可见性。
我们可以使用volatile
关键字修饰isQuit
来保证内存可见性。import java.util.Scanner; public class Main12 { volatile private static int isQuit; public static void main(String[] args) { Thread thread = new Thread(() -> { while (isQuit == 0) { } System.out.println("线程thread执行完毕!"); }); thread.start(); Scanner sc = new Scanner(System.in); System.out.println("请输入isQuit的值,不为0线程thread停止执行!"); isQuit = sc.nextInt(); System.out.println("main线程执行完毕!"); } }
运行结果:
⭐️synchronized与volatile关键字的区别:
synchronized
关键字能保证原子性,但是是否能够保证内存可见性要看情况(上面这个栗子是不行的),而volatile
关键字只能保证内存可见性不能保证原子性。
保证内存可见性就是禁止编译器做出如上的优化而已。import java.util.Scanner; public class Main12 { private static int isQuit; //锁对象 private static final Object lock = new Object(); public static void main(String[] args) { Thread thread = new Thread(() -> { synchronized (lock) { while (isQuit == 0) { } System.out.println("线程thread执行完毕!"); } }); thread.start(); Scanner sc = new Scanner(System.in); System.out.println("请输入isQuit的值,不为0线程thread停止执行!"); isQuit = sc.nextInt(); System.out.println("main线程执行完毕!"); } }
运行结果:
编译器优化除了导致内存可见性感知不到的问题,还有指令重排序也会导致线程安全问题,指令重排序也是编译器优化之一,就是编译器会智能地(保证原有逻辑不变的情况下)调整代码执行顺序,从而提高程序运行的效率,单线程没问题,但是多线程可能会翻车,这个原因了解即可。
☁️3.线程安全的标准类
Java 标准库中很多都是线程不安全的。这些类可能会涉及到多线程修改共享数据, 又没有任何加锁措施。例如,ArrayList,LinkedList,HashMap,TreeMap,HashSet,TreeSet,StringBuilder。
但是还有一些是线程安全的,使用了一些锁机制来控制,例如,Vector (不推荐使用),HashTable (不推荐使用),ConcurrentHashMap (推荐),StringBuffer。
还有的虽然没有加锁, 但是不涉及 “修改”, 仍然是线程安全的,例如String。在线程安全问题中可能你还会遇到JMM模型,在这里补充一下,JMM其实就是把操作系统中的寄存器,缓存和内存重新封装了一下,其中在JMM中寄存器和缓存称为工作内存,内存称为主内存。
其中缓存分为一级缓存L1,二级缓存L2和三级缓存L3,从L1到L3空间越来越大,最大也比内存空间小,最小也比寄存器空间大,访问速度越来越慢,最慢也比内存的访问速度快,最快也没有寄存器访问快。☁️4.Object类提供的线程等待方法
除了Thread类中的能够实现线程等待的方法,如
join
,sleep
,在Object类中也提供了相关线程等待的方法。序号 方法 说明 1 public final void wait() throws InterruptedException 释放锁并使线程进入WAITING状态 2 public final native void wait(long timeout) throws InterruptedException; 相比于方法1,多了一个最长等待时间 3 public final void wait(long timeout, int nanos) throws InterruptedException 相比于方法2,等待的最长时间精度更大 4 public final native void notify(); 唤醒一个WAITING状态的线程,并加锁,搭配wait方法使用 5 public final native void notifyAll(); 唤醒所有处于WAITING状态的线程,并加锁(很可能产生锁竞争),搭配wait方法使用 上面介绍
synchronized
关键字的时候,如果不对线程加锁会产生非法监视异常,我们来验证一下:public class TestDemo12 { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行完毕!"); }); thread.start(); System.out.println("wait前"); thread.wait(); System.out.println("wait后"); } }
看看运行结果:
果然抛出了一个IllegalMonitorStateException
,因为wait
方法的执行步骤为:先释放锁,再使线程等待,你现在都没有加锁,那如何释放锁呢?所以会抛出这个异常,但是执行notify
是无害的。wait
方法常常搭配notify
方法搭配一起使用,前者能够释放锁,使线程等待,后者能获取锁,使线程继续执行,这套组合拳的流程图如下:
现在有两个任务由两个线程执行,假设线程2比线程1先执行,请写出一个多线程程序使任务1在任务2前面完成,其中线程1执行任务1,线程2执行任务2。
这个需求可以使用wait/notify
来实现。class Task{ public void task(int i) { System.out.println("任务" + i + "完成!"); } } public class WiteNotify { //锁对象 private static final Object lock = new Object(); public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { synchronized (lock) { Task task1 = new Task(); task1.task(1); //通知线程2线程1的任务完成 System.out.println("notify前"); lock.notify(); System.out.println("notify后"); } }); Thread thread2 = new Thread(() -> { synchronized (lock) { Task task2 = new Task(); //等待线程1的任务1执行完毕 System.out.println("wait前"); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } task2.task(2); System.out.println("wait后"); } }); thread2.start(); Thread.sleep(10); thread1.start(); } }
运行结果:
本文总结了多线程中线程安全问题,下一篇文章会介绍多线程的重要实例。
觉得文章写得不错的老铁们,点赞评论关注走一波!谢谢啦! - 将内存中变量的值加载到寄存器,不妨将该操作记为
-
多线程冒泡法排序
2019-12-05 22:20:01java课的课后作业,要求实现数组的多线程冒泡排序并实现可视化,自己写了一个奇偶排序的多线程,但效率较低,现在回顾一下老师课上讲的程序。 生成的界面如上 首先实现Button1的功能,生成随机的指定大小的数组 ...java课的课后作业,要求实现数组的多线程冒泡排序并实现可视化,自己写了一个奇偶排序的多线程,但效率较低,现在回顾一下老师课上讲的程序。
生成的界面如上
首先实现Button1的功能,生成随机的指定大小的数组private void jButton1ActionPerformed(java.awt.event.ActionEvent evt) { // TODO add your handling code here: long len=(Long)jFormattedTextField1.getValue(); int arr[]=new int[(int)len]; for(int i=0;i<len;i++) { arr[i]=(int)(Math.random()*10000); } SortPane.ar=arr; this.repaint(); }
接着实现选择线程的按钮
private void jButton2ActionPerformed(java.awt.event.ActionEvent evt) { // TODO add your handling code here: Object value=jSpinner1.getValue(); int tn= Integer.parseInt(value.toString()); SortPane.mode=new int[tn]; BubbleOrder[]bo=new BubbleOrder[tn]; for(int i=0;i<tn;i++) { bo[i]= new BubbleOrder((SortPane)jPanel1,i,tn,SortPane.ar); } try { Thread.sleep(500);}catch(Exception e){} for(int i=0;i<tn;i++) { bo[i].start(); } }
输入的值为tn,最后生成tn个线程
在SortPane中实现用直线表示数字大小的方法为:public void paint(Graphics g) { if(ar==null) super.paint(g); //To change body of generated methods, choose Tools | Templates. else{ g.setColor(Color.gray); float w=this.getSize().width; float h=this.getSize().height; g.fillRect(0, 0, this.getSize().width, this.getSize().height); g.setColor(Color.red); float w1=w/ar.length; w1=w1<1?1:(int)(w1+0.5); for(int i=0;i<ar.length;i++){ g.drawLine((int)(i*w1), (int)h,(int)(i*w1) ,(int) (h*(1-ar[i]/10000.0)));//drawLine中的4个参数表示两个点的坐标 } } }
最重要的是如何实现多线程的冒泡排序以及使各线程之间同步,假设项目中使用的4个线程的冒泡排序,那么第一个线程排序下标为0,4,8…的数,第二个线程排序下标为1,5,9…的数,由此类推,当四个线程均排到最后一个数时再对最后四个数字进行排序从而找到整个数组中最小的数,找到最小的数之后再对剩下的数组按照相同的方法排序,直到只剩下4个数时用任意一个线程去排序即可得到有序的数组,下面对代码的每一步进行分析。
1.将主函数中的参数传到各线程中,对于各参数的意义放到后面分析。public BubbleOrder(SortPane sp, int id, int tn, int ar[]) { this.id = id; this.tn = tn; ot = (int) Math.pow(tn, 2) - 1; this.ar = ar; this.sp = sp; BubbleOrder.len = ar.length; }
2.对其进行数组每次排序后对最后四个数排序和对剩下的最后四个数排序的函数sort1进行分析
void sort1(int len) { if (len > tn) {//判断是每次排序的最后找最小值还是对剩下的最后四个数 for (int k = len - tn; k < len - 1; k++) {//一层for循环找最小值 if (ar[k] < ar[k + 1]) { int ls = ar[k]; ar[k] = ar[k + 1]; ar[k + 1] = ls; } } } else {//两层for循环进行排序 for (int k = len; k > 0; k--) { for (int j = 0; j < k - 1; j++) { if (ar[j] < ar[j + 1]) { int ls = ar[j]; ar[j] = ar[j + 1]; ar[j + 1] = ls; } } } exit = true; } }
3.再对其中的排序函数进行分析,很容易知道此处的tn为线程数
void sort(int len) { if (id > len - tn) { exit = true;//当剩余需排序的长度小于线程数时退出,然后采用单线程排序即可 } for (int k = id; k < len - tn; k = k + tn) {//id是各线程声明的整型数值,表示线程的名字和线程排序的起点 if (ar[k] < ar[k + tn]) {//将较小的数换到后面 int ls = ar[k]; ar[k] = ar[k + tn]; ar[k + tn] = ls; } } }
3.分析线程里面的run函数
一个for循环中,首先进行冒泡排序,然后用一个waiting表示当前线程已完成排序,等待其他线程,已完成排序的线程SortPane.get(id)方法得到false,在sortPane中声明的mode数组存储各线程的完成情况,若完成则赋值为1,public void run() { while (!exit) { System.out.print("Thread " + id + " sorting\r\n"); sort(len); //synchronized(SortPane.ar){ waiting = true; try{ System.out.print("Thread " + id + " waiting\r\n"); SortPane.add(id); if(issorted(SortPane.mode)) { sort1(len); sp.repaint(); BubbleOrder.len--; if (BubbleOrder.len==tn) {//当仅剩下和线程相等的数组时直接用sort1函数排序 sort1(len); sp.repaint(); break; } SortPane.clear(); synchronized (SortPane.ii) { SortPane.ii.notifyAll(); } } else { synchronized (SortPane.ii) { // SortPane.ar.notify(); if (exit) { break; } SortPane.ii.wait(); } } } catch (InterruptedException ex) {//好像没啥用 }
此处为SortPane的mode数组,用于表示各线程是否已完成当前线程的排序
static int[] mode; static synchronized void add(int id) { mode[id]=1; }
issorted为判断mode数组是否所有数字为1的函数
boolean issorted(int a[]) { boolean sorted=true; for(int i=0;i<a.length;i++) { if(a[i]!=1) sorted=false; } return sorted; }
最后的效果如下
-
如何用多线程实现归并排序
2018-08-09 22:45:17之前听吴恩达老大说过Python里面的Numpy包的矩阵运算就是多线程的,所以能做到的情况下尽量用矩阵运算代替循环,这样能大大加快运算的速度。 为了提高速度,如果不涉及外部资源读取的话,要提高运行速度就要做到... -
Java多线程 - 锁
2022-03-28 15:31:49Java多线程 - 锁 三性 可见性 指的是线程之间的可见性,一个线程对状态的修改,对其他线程是可见的。在 Java中 volatile、synchronized 和 final 实现可见性。 原子性 如果一个操作是不可分割的,我们则称之为... -
Java中如何等待创建启动的子线程们运行结束后再执行下一步 | Java多线程 | 多线程排序
2020-03-03 15:01:00????使用 ThreadGroup,首先创建一个线程组,把创建的子线程都放到这个线程组里,然后循环判断这个线程组的活跃线程数量 是否等于0,不等于0继续,否则代表子线程全部执行完了,进行下一步。 -
java多线程排序
2008-07-02 09:37:17java多线程排序源程序,三种排序算法。希尔排序,快速排序,堆排序。 -
java 多线程 面试题整理(更新......)
2021-11-30 16:23:063、什么是同步执行和异步执行4、Java中实现多线程有几种方法?(较难)(1)继承Thread类(2)实现runable接口(3)实现Callable接口(创建FutureTask(Callable)对象)5、Future接口,Callable接口,FutureTask实现类的... -
Java多线程(一文看懂!)
2021-07-28 17:36:15二,多线程的实现方式 三,多线程的五大状态 四,多线程的调度 五,线程的同步(多口售票问题) 六,线程的协作(生产者-消费者模型) 一,多线程的介绍 百度中多线程的介绍(multithreading):是指从软件或者... -
2021年JAVA多线程并发编程面试题(持续更新)
2021-02-18 12:56:25Java 中用到的线程调度算法是什么?线程同步以及线程调度相关的方法。sleep() 和 wait() 有什么区别?线程的 sleep()方法和 yield()方法有什么区别?同步方法和同步块,哪个是更好的选择?如果你提交任务时,线程池... -
java多线程提交,如何按照时间顺序获取线程结果,看完你就懂了 | Java工具类
2021-11-01 21:41:01在工作中是否存在这样的场景,多个线程提交执行,你不想全部线程执行结束了获取结果,而是有线程完成返回结果就获取消费。本文提供该场景的工具类,可以直接用哦。 Maven依赖 其实要不要无所谓。主要是为了方便... -
java多线程会造成线程安全问题的原因总结
2021-09-30 22:13:28众所周知,多线程会造成线程安全问题,那么多线程为什么会导致线程安全问题呢? 一:首先了解jvm内存的运行时数据区 1.堆区:存储对象实例(和实例变量),数组等 2.java虚拟机栈(方法·栈),存放方法声明,... -
Java多线程---单例模式(有趣易懂版)
2021-03-17 17:54:38在多线程执行中,原本是要将数据从主内存拿到自己的私有工作区中去修改,然后再放回主内存中,但这个过程中,可能A线程正在改数据还没放回去,B线程又去拷贝这个数据去修改,导致数据不一致。 使用volatile关键字,... -
Java重排序对多线程的影响
2017-06-17 14:53:58上一次的发文远在两个月前了,算是经历了一段低糜期,本来打算的更新一直断更到现在。还是好好学习吧,努力的人运气一定不会差的。这一篇文章来谈一谈Java虚拟机对代码优化带来的影响...通过JMM(Java内存模型)实现内存 -
Java多线程并发编程知识体系(附大图-持续更新)
2021-08-22 11:34:05定义:多线程读写共享变量时出现不正确的行为 原因 原子性问题 CPU时钟中断带来的线程切换 可见性问题 多核CPU高速缓存之间不可见 重排序问题 CPU和编译器会进行重排序指令 典型问题:单例模式DCL ... -
Java并发 多线程实现计数功能(线程安全)
2019-07-24 16:25:59在上篇,我们利用线程池,信号量,倒计时相关类实现计数的功能,但运行结果总不能达到目标,我们将做以下改进。 1.首先附上源码,红色标注,是我们此次修改的地方 import javax.annotation.concurrent.ThreadSafe... -
java多线程 —— 面试题集合(最全集合)
2019-08-21 11:15:30java多线程创建创建线程的方式Runnable和Callable的区别Thread类中的start()和run()方法有什么区别?什么导致线程阻塞?3. 多线程同步和锁怎么检测一个线程是否持有对象监视器Condition?4. 线程池CyclicBarrier和... -
Java中的多线程与高并发
2021-11-22 17:20:53线程是系统调度的最小单元,是线程的运算单元,一个进程可以包括一个或多个线程。 线程的三种创建方式: 一、继承Thread类 class MyThread extends Thread { @Override public void run() { Log.i(TAG, ... -
java多线程之指令重排序
2016-05-25 11:21:56编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。 2)指令级并行的重排序。现代处理器采用了指令级并行技术(Instruction-Level Parallelism,ILP)来将多条指令重叠执行。如果不存在数据依赖性... -
Java 多线程总结笔记
2018-09-19 10:21:56Java多线程总结笔记 实现多线程的方法 查看Thread类的源码,可以发现它实现了Runnable接口,然后在自己的run方法中调用了Runnable的run方法。这里其实就是静态代理这一设计模式,Thread是代理角色,而Runnable则是... -
Java多线程系列—多线程带来的问题(05)
2021-05-05 14:08:35多线程带来的问题 为什么需要多线程 其实说白了,时代变了,现在的机器都是多核的了,为了榨干机器最后的性能我们引入单线程。 为了充分利用CPU资源,为了提高CPU的使用率,采用多线程的方式去同时完成几件事情而不... -
【java多线程系列】java内存模型与指令重排序
2016-06-26 10:21:28在多线程编程中,需要处理两个最核心的问题,线程之间如何通信及线程之间如何同步,线程之间通信指的是线程之间通过何种机制交换信息,同步指的是如何控制不同线程之间操作发生的相对顺序。很多读者可能会说这还不... -
Java面试题总结 - Java多线程篇(附答案)
2021-05-29 20:56:17一、线程的run()和start()有什么区别? 每个线程都是通过某个特定...start()方法来启动一个线程,真正实现了多线程运行。调用start()方法无需等待run方法体代码执行完毕,可以直接继续执行其他的代码; 此时线程.. -
Java Web基础篇之Java多线程
2019-05-08 13:18:061、多线程与进程 1.1、是什么? 线程有时被称为轻量进程(Lightweight Process,LWP),是程序执行流的最小单元,进程与线程区别如下:一个进程至少有一个线程.。 1.2、扩展: 进程与线程区别,线程的划分尺度小于...