-
2022-02-22 09:44:24
使用两个线程,一个线程读,一个线程写,实际上就是一个典型的生产者-消费者模型
public class Application { private String prefix = "G:/JAVA资料/"; private String readFilename = prefix + "curblock-笔试-1504_人口、人口密度统计年鉴_20191113.xlsx"; private String writeFilename = prefix + "new.xlsx"; // 结束标志位 private boolean end = false; // 使用一个队列进行同步 private BlockingQueue<List<String>> queue = new LinkedBlockingQueue<List<String>>(1); // 生产者线程 // 读取excel文件,并且一行一行加入到同步队列中 class Producer extends Thread { @Override public void run(){ // 读取 ExcelReader reader = ExcelUtil.getReader(readFilename); List<List<Object>> lists = reader.read(); // 将对象一行一行送到队列中 for(List<Object> list : lists){ try { queue.put(cast(list)); System.out.println("[" + Thread.currentThread().getName() + "]生产者生产一行数据..."); } catch (InterruptedException e) { e.printStackTrace(); } } // 读取结束 end = true; reader.close(); } // 将List<Object>转为List<String> public List<String> cast(List<Object> list){ List<String> tmp = new ArrayList<>(); for(Object li : list){ tmp.add((String) li); } return tmp; } } // 消费者线程 // 从队列中拿数据并写入文件 class Consumer extends Thread { @Override public void run(){ ExcelWriter writer = ExcelUtil.getWriter(writeFilename); // 当输入流读取完成并且队列空的时候结束 while(!end || !queue.isEmpty()){ try { // 拿一条数据 List<String> list = queue.take(); List<List<String>> row = new ArrayList<>(); row.add(list); // 写入到输出流中 writer.write(row, true); System.out.println("[" + Thread.currentThread().getName() + "]消费者写入一条数据..."); } catch (InterruptedException e) { e.printStackTrace(); } } writer.close(); } } @Test public void test(){ Application application = new Application(); Producer producer = application.new Producer(); Consumer consumer = application.new Consumer(); producer.start(); consumer.start(); try { producer.join(); consumer.join(); System.out.println("["+Thread.currentThread().getName()+"]完成..."); } catch (InterruptedException e) { e.printStackTrace(); } } }
更多相关内容 -
C++双向链表类及四种多线程安全读写测试用例
2018-07-05 09:36:46实现双向链表,查找、替换、删除、插入,并用线程同步技术实现了多线程读写与线程安全 -
多线程样例一 读写参数文件
2021-09-09 15:02:03事情起因: 修改配置后,C++函数取读配置xml时,相机缓存满了而引起崩溃。几率发生。 解决思路: 读文件费时间,所以开一个线程读文件。...查看日志:“读取参数”线程约60秒完成,模拟图像处理”线程约50秒完成。 -
多线程读写excel
2018-11-19 17:46:11q1_t = threading.Thread(target=self.put_data_1) thread_list.append(q1_t) # 从队列q1取出数据放入q2的线程 q2_t = threading.Thread(target=self.worker1) thread_list.append(q2_t) # 从队列q2中取出数据放入q3...from queue import Queue
import threading
import timefrom openpyxl import load_workbook
class test_q(object):
def __init__(self):
self.q1 = Queue()
self.q2 = Queue()
self.q3 = Queue()# 队列一种放入数据
def put_data_1(self):
i = 0
while i < 100000:
self.q1.put(str(i))
i += 1# 从队列q1中取出数据,并放入队里q2
def worker1(self):
while True:
num1 = self.q1.get()
if num1:
print('从q1中拿到的数据是11111111111%s' % num1)
self.q2.put('放入q2数据%s' % num1)
self.q1.task_done()
# 从队列q2中取出数据放入q3
def worker2(self):
while True:
re_num = self.q2.get()
if re_num:
print('从q2中拿到数据是22222222222222%s' % re_num)
self.q3.put('放入3的数据%s' % re_num)
self.q2.task_done()# 从队列q3中取出数据写入excel表格
def worker3(self, table, loc):
while True:
loc.acquire()
res_3 = self.q3.get()
if res_3:
print('从q3中取出的数据333333******%s' % res_3)
# with open('G:\\vw\zmyw_test.txt', 'a+', encoding='utf8') as f:
# f.write(res_3+'\n')
table.append([res_3])
self.q3.task_done()
loc.release()def run(self):
# 线程列表
thread_list = []# 队列q1放入数据的线程
q1_t = threading.Thread(target=self.put_data_1)
thread_list.append(q1_t)# 从队列q1取出数据放入q2的线程
q2_t = threading.Thread(target=self.worker1)
thread_list.append(q2_t)# 从队列q2中取出数据放入q3的线程
q3_t = threading.Thread(target=self.worker2)
thread_list.append(q3_t)# 打开excel表格
open_excel = load_workbook('G:\\vw\zmyw_test.xlsx')
table = open_excel['Sheet1']# 从队列q3中取出数据写入excel表格
loc = threading.Lock()
for i in range(5):
w_excel = threading.Thread(target=self.worker3, args=(table,loc))
thread_list.append(w_excel)
for i in thread_list:
print('开启线程')
i.setDaemon(True)
i.start()for j in [self.q1, self.q2, self.q3]:
print('_____')
j.join()
print('***************')
open_excel.save('G:\\vw\zmyw_test.xlsx')if __name__=='__main__':
te = test_q()
te.run() -
Java多线程并发中支持并发的list对象
2021-03-08 16:59:50Java多线程并发编程中并发容器第二篇之List的并发类讲解概述本文我们将详细讲解list对应的并发容器以及用代码来测试ArrayList、vector以及CopyOnWriteArrayList在100个线程向list中添加1000个数据后的比较本文是...Java多线程并发编程中并发容器第二篇之List的并发类讲解
概述
本文我们将详细讲解list对应的并发容器以及用代码来测试ArrayList、vector以及CopyOnWriteArrayList在100个线程向list中添加1000个数据后的比较
本文是《凯哥分享Java并发编程之J.U.C包讲解》系列教程中的第六篇。如果想系统学习,凯哥(kaigejava)建议从第一篇开始看。
从本篇开始,我们就来讲解讲解Java的并发容器。大致思路:先介绍什么是并发容器。然后讲解list相关的、map相关的以及队列相关的。这个系列会有好几篇文章。大家最好跟着一篇一篇学。
正文开始
并发容器分类讲解
CopyOneWriteArrayList
Copy-One-Write:即写入时候复制。
我们知道在原来List子类中vactor是同步容器线程安全的。这个CopyOneWriteArrayList可以理解为是他的并发替代品。
其底层数据结构也是数值。和ArrayList的不同之处就在于:在list对象中新增或者是删除元素的时候会把原来的集合copy一份,增删操作是在新的对象中操作的。操作完成之后,会将新的数组替换原来的数组。
我们来看看CopyOnWriteArrayList源码中的add方法:
编辑
我们来看看setArray方法:
发现了吗?变量使用的是transient和volatile两个关键之来修饰的。
在之前文章中,我们知道了volatile关键字是内存可见性。那么transient关键字是干嘛的呢?我们来看下百科解释:
关键的一句话:用transient关键字修饰的成员变量不用参与序列化过程。
添加注释后的源码:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
//获取到锁
lock.lock();
try {
//获取到原集合
Object[] elements = getArray();
int len = elements.length;
//将原集合copy一份到新的集合中。并设置新的集合的长度为原集合长度+1
Object[] newElements = Arrays.copyOf(elements, len + 1);
//将需要新增的元数添加到新的素组中
newElements[len] = e;
//将新数组替换原来数据。 使用transient和volatitle关键字修饰的
setArray(newElements);
return true;
} finally {
lock.unlock();
}
代码很简单,大致流程如下:
先从ReentrantLock中获取到锁(这样在多线程下可以防止其他线程来修改容器list里面内容了);
通过arrays.copyOf方法copy出一份原有数组长度+1;
将要添加的元素赋值给copy出来的数组;
使用setArray方法将新得数组替换原有素组。
因为都是List集合。我们就拿ArrayList、vector以及CopyOnWriteArrayList进行比较:
ArrayList、vector以及CopyOnWriteArrayList比较
业务场景描述:
启动100个线程,向对象中添加1000个数据。查看各自运行结果耗时及插入数据总数。代码在文章最后凯哥会贴出来。
先用线程非安全的arrayList执行效果:
执行arryList时间为 : 112毫秒!
List.size() : 93266
我们发现list的长度不对。正确的应该是100*1000.从结果来看,arrayList丢数据了。
使用Vector执行后的效果:
执行vector时间为 : 98毫秒!
List.size() : 100000
执行的总数对,说下同步锁没有丢数据。
在来看看copyOnWriteArrayList执行效果:
执行copyOnWriteArrayList时间为 : 5951毫秒!
运行后数据比较:
从上面表格中我们可以看出非安全线程的容器会丢数据。使用copyOneWriteArrayList耗时很长。那是因为每次运行都要copyof一份。
总结
copyArrayList(这里凯哥就简写了):是读写分离的。在写的时候会复制一个新的数组来完成插入和修改或者删除操作之后,再将新的数组给array.读取的时候直接读取最新的数据。
因为在写的时候需要向主内存申请控件,导致写操作的时候,效率非常低的(虽然在操作时候比较慢得,但是在删除或者修改数组的头和尾的时候还是很快的。因为其数据结构决定查找头和尾快,而且执行不需要同步锁)
从上面表中,可以看出copyArrayList虽然保证了线程的安全性,但是写操作效率太low了。但是相比Vector来说,在并发安全方面的性能要比vector好;
CopyArrayList和Vector相比改进的地方:
Vector是在新增、删除、修改以及查询的时候都使用了Synchronized关键字来保证同步的。但是每个方法在执行的时候,都需要获取到锁,在获取锁等待的过程中性能就会大大的降低的。
CopyArrayList的改进:只是在新增和删除的方法上使用了ReentrantLock锁进行(这里凯哥就不截图源码了,自己可以看看源码)。在读的时候不加锁的。所以在读的方面性能要不vector性能要好。
所以,CopyArrayList支持读多写少的并发情况
CopyOnWriteArrayList的使用场景:
由于读操作不加锁,增删改三个操作加锁的,因此适用于读多写少的场景,
局限性:因为读的时候不加锁的,读的效率和普通的arrayList是一样的。但是请看读操作:
在get的时候array使用的是volatile修饰的。是内存可见的。所以可以说copyArrayList在读的时候不会出现arrayList读取到脏数据的问题。
Get(i)方法比较如下:
附件:arrayList、vector、copyOnwriteArrayList比较的代码:
public static voidmain(String[] args) {//使用线程不安全的arrayList// List arryList = new ArrayList<>();//使用vector// List arryList = new Vector<>();
//使用copyOnWriteArrayListList arryList = newCopyOnWriteArrayList<>();Random random = newRandom();Thread [] threadArr = newThread[100];CountDownLatch latch = newCountDownLatch(threadArr.length);Long beginTime = System.currentTimeMillis();for(inti = 0;i
-
java多线程读取、操作List集合
2018-05-21 16:36:00/** * 固定开辟8线程 * * @Author: wpf * @Date: 16:05 2018/5/21 * @Description: * @param * @param null * @return */ private void threadExec1(List<SysCompany&g...大数据中,多线程分段list操作, demo如下
/** * 每500条数据开启一条线程 * * @Author: wpf * @Date: 16:05 2018/5/21 * @Description: * @param * @param null * @return */ private void threadExec2(List<SysCompany> list) throws ExecutionException, InterruptedException { // 开始时间 long start = System.currentTimeMillis(); LOG.info("list的大小:" + list.size()); // 每500条数据开启一条线程 int threadSize = 500; // 总数据条数 int dataSize = list.size(); // 线程数 int threadNum = dataSize / threadSize + 1; // 定义标记,过滤threadNum为整数 boolean special = dataSize % threadSize == 0; // 创建一个线程池 ExecutorService exec = Executors.newFixedThreadPool(threadNum); // 定义一个任务集合 List<Callable<SysCompany>> tasks = new ArrayList<Callable<SysCompany>>(); Callable<SysCompany> task = null; List<SysCompany> cutList = null; // 确定每条线程的数据 for (int i = 0; i < threadNum; i++) { if (i == threadNum - 1) { if (special) { break; } cutList = list.subList(threadSize * i, dataSize); } else { cutList = list.subList(threadSize * i, threadSize * (i + 1)); } // LOG.info("第" + (i + 1) + "组:" + cutList.toString()); final List<SysCompany> listStr = cutList; task = new Callable<SysCompany>() { @Override public SysCompany call() throws Exception { for(SysCompany c : listStr){ if(StringUtils.isNotEmpty(c.getUsername())){ LOG.info("timer:在执行中..." + c.getUsername()); // 业务... sysCompanyService.getComCodeByInfoCompanyUser(c.getUsername()); } } return null; } }; // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系 tasks.add(task); } List<Future<SysCompany>> results = exec.invokeAll(tasks); /*for (Future<SysCompany> future : results) { SysCompany c = future.get(); }*/ // 关闭线程池 exec.shutdown(); LOG.info("线程任务执行结束"); LOG.info("执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒"); }
/** * 固定开辟8线程 * * @Author: wpf * @Date: 16:05 2018/5/21 * @Description: * @param * @param null * @return */ private void threadExec1(List<SysCompany> list) throws ExecutionException, InterruptedException{ // 开始时间 long start = System.currentTimeMillis(); // 总数据条数 int listSize = list.size(); LOG.info("总记录数为:"+listSize); // 线程数 int threadNum = 8; // 创建一个线程池 ExecutorService exec = Executors.newFixedThreadPool(threadNum); // 定义一个任务集合 List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>(); Callable<Integer> task = null; List<SysCompany> cutList = null; // 均分算法 Map<Integer,Long> map = allotOfAverage1(threadNum,listSize); int endValue = 0; int startValue = 0; int i = 0; for (Long v : map.values()) { endValue = (int)(startValue+v); cutList = list.subList(startValue, endValue); startValue += v; LOG.info("第" + ++i +"组大小为:"+cutList.size()); final List<SysCompany> listStr = cutList; task = new Callable<Integer>() { @Override public Integer call() throws Exception { for(SysCompany c : listStr){ if(StringUtils.isNotEmpty(c.getUsername())){ // LOG.info("timer:在执行中..." + c.getUsername()); // 业务... sysCompanyService.getComCodeByInfoCompanyUser(c.getUsername()); } } return 1; } }; // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系 tasks.add(task); } List<Future<Integer>> results = exec.invokeAll(tasks); for (Future<Integer> future : results) { LOG.info(future.get()); } // 关闭线程池 exec.shutdown(); LOG.info("线程任务执行结束"); LOG.info("执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒"); }
/* * 平均分配 */ public static Map<Integer,Long> allotOfAverage1(int threadNum, long listSize){ Map<Integer,Long> allot=new HashMap<Integer,Long>(); //保存分配的信息 for(int i=0;i<listSize;i++){ int j=i%threadNum; if(allot.containsKey(j)){ allot.put(j, allot.get(j) +1); }else{ allot.put(j, 1L); } } return allot; }
-
大文件多线程读写操作
2018-07-03 00:52:40对于一个比较大的数据文件, 如:20G, 每一行为一个数据单元,在不进行文件切割的前提下, 想利用多线程处理文件中的每行数据。 如果依次读取文件中每一行并处理则处理速度较慢较慢; 全部读取再处理机器的内存不能... -
java多线程读写文件
2018-04-07 23:11:06为了提高文件写出分析和写出效率,本文采用多线程实现多文件解析和写出。具体实现如下:二、文件读写工具类package com.test.multiThreadsReadFile.utils; import java.io.BufferedReader; import java.io.File; ... -
Python文本操作---多线程读写
2018-09-13 17:40:071.导入threading模块:线程 2.步骤:读取数据放入内存(readlines)---->检索符合的数据(Find)---->搜索到数据添加入列表(findlist[])---->数据写入文本中 #coding=utf-8 import threading ... -
多线程 读取List
2015-08-14 16:17:00import java.util.ArrayList; import java.util.List; import org.apache.commons.lang3.ArrayUtils; public class Test_4 { ... * 多线程处理list * * @param data 数据li -
C++高级——多线程编程
2020-07-28 18:51:14一条线程指的是进程中一个单一顺序的控制流,一个进程可以并发执行多个线程,每个线程会执行不同的任务。对应在现实生活中,进程是组长,线程是小组成员。 怎么创建启动一个线程 在语言级别,一般调用std名称空间的... -
Java多线程系列—多线程带来的问题(05)
2021-05-05 14:08:35为了充分利用CPU资源,为了提高CPU的使用率,采用多线程的方式去同时完成几件事情而不互相干扰,为了处理大量的IO操作时或处理的情况需要花费大量的时间等等,比如:读写文件,视频图像的采集,处理,显示,保存等。... -
redis高并发导致读写变慢(redis多线程)
2019-12-24 21:43:14文章目录redis 高并发读写变慢...最近在最redis + MQ高并发的一个功能,压测时发现redis读写性能突然降低很多,而redis已经启用一年多,一直没问题,走了点弯路后发现是因为对 JedisPool的高并发处理上存在效率 问题... -
java 多线程提高大数据量的读写效率
2019-07-16 14:53:47对于多线程来说,刚开始是比较蒙的,不了解其中的运行机制。 最近项目中需要用多线程解决一个加载缓慢的问题,特此写了一个例子,供大家参考,如有建议,请多指教,哈哈哈 那么,话不多说。 先说下需求:此接口供xxx... -
python多线程读取列表
2019-08-07 10:09:46本文代码实现了python多线程读取列表,包括python多线程初始化、开始和释放线程锁、分配多线程列表数等内容,可做参考。 -
java多线程之 几种线程安全的list synchronizedList()&CopyOnWriteArrayList
2021-01-25 09:40:02这段代码中,我们创建了两个线程,同时对ArrayList添加10000个元素,如果我们运行这段代码,我们肯定期望它返回的是20000。可是我在JDK1.8环境中运行这段代码,多次验证,会出现两种结果: import java.util.*; ... -
三种线程安全的List
2021-05-30 12:10:45在单线程开发环境中,我们经常使用ArrayList作容器来存储我们的数据,但它不是线程安全的,在多线程环境中使用它可能会出现意想不到的结果。 多线程中的ArrayList: 我们可以从一段代码了解并发环境下使用ArrayList... -
万字图解Java多线程
2020-09-06 14:45:07java多线程我个人觉得是javaSe中最难的一部分,我以前也是感觉学会了,但是真正有多线程的需求却不知道怎么下手,实际上还是对多线程这块知识了解不深刻,不知道多线程api的应用场景,不知道多线程的运行流程等等,... -
List在高并发场景下保证线程安全的三种方案,CopyOnWriteArrayList写时复制读写分离
2020-05-17 00:22:17Java并发修改异常:java.util.ConcurrentModificationExpection vector线程安全但是是JDK1.0诞生 arrayList线程不安全但是是JDK 1.2诞生 Collection是一个接口 ...//list线程不安全问题,已经举一个不安全的 -
JAVA多线程之初探List线程安全的三种实现方式
2020-05-01 18:51:48初探List线程安全的三种实现方式 先演示一下List线程不安全的情况 package JUC.unsafe; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.... -
ElasticSearch多线程写入实战(一)多线程同步异步写入ES,模拟hadoop文件拆分处理,join与...多线程读写小练习
2018-08-09 00:47:303-1、多线程读写文件小练习——写 3-1-1、synchronized与Lock的区别与使用 3-2、多线程读写文件小练习——读 3-2-1、单线程读 3-2-2、拆分文件 3-2-3、多线程读 3-2-4、控制父子线程顺序,... -
java多线程读取多个文件的方法
2021-02-12 10:51:57本文实例为大家分享了java多线程读取多个文件的具体代码,供大家参考,具体内容如下工具类代码如下:import java.io.*;import java.util.List;import java.util.concurrent.CountDownLatch;/*** 多线程读取多个文件*... -
多线程场景 Redis中List队列 LPush 和 remove 是否线程安全?
2020-03-20 21:19:29多线程场景 Redis中List队列 LPush 和 remove 是否线程安全? 1.业务场景如下 作者在rabbitmq消息队列场景中,实现了一个监控消息数据全部入库的程序,每个生产者发送消息的同时,插入一个key到 redis 中的 list ... -
C#操作SQLite数据库之读写数据库的方法
2020-09-02 02:02:13主要介绍了C#操作SQLite数据库之读写数据库的方法,简单分析了C#针对SQLite数据库的读写及显示等操作相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下 -
Java 多线程均匀处理同一个List中的数据
2018-09-30 15:34:35需求:使用多线程来处理同一个List中的数据,希望每个线程处理的数量是均匀的 事例代码如下: public class Test { static class HandleThread extends Thread { private String threadName; private List&... -
C#身份信息动态生成源代码(winform面向对象、多线程、数据存储等)
2018-10-09 14:12:38采用面向对象的编程思想,还涉及到了线程技术Thread、SQL数据库读写SqlBulkCopy、文件流读写FileStream、JSON序列化及反序列化、Linq、List操作和转化等方面技术。 -
多线程读写大量数据到excel
2019-06-28 16:33:19这样的类型,所以设计生产者的时候是,一个线程负责某天的表,线程数不要超过查询的天数总和(例如读取20190720-20190730 这十天的数据,那么线程数不能超过10个,也不能过多,看自己系统内存情况),否则,会有线.... -
多线程常见面试题(含常见项目遇到多线程问题解决及面试对话)
2021-07-30 10:12:28多线程 什么是线程和进程?他们是什么关系? 进程:在操作系统中能够独立运行,并且作为资源分配的基本单位。它表示运行中的程序。系统运行一个程序就是一个进程从创建、运行到消亡的过程。 线程:是一个比进程更小的... -
Redis 6.0多线程模型总结
2022-01-24 00:06:10这个版本提供了诸多令人心动的新特性及功能改进,比如新网络协议RESP3,新的集群代理,ACL等,其中关注度比较高的应该是多线程模型了。 1、Redis6.0之前的版本真的是单线程吗? Redis在处理客户端的请求时,包括... -
线程安全的List
2022-01-12 17:20:18我们都知道ArrayList是非线程安全的,当多线程开发的时候,如果多个线程都对同一个ArrayList进行操作会报ConcurrentModificationException错误,这时我们就需要一个线程安全的List集合。 我在开发过程中遇到...