精华内容
下载资源
问答
  • 上篇文章提起关于HBase插入性能优化设计到的五个参数,从参数配置的角度给大家提供了一个性能测试环境的实验代码。...本文给出了基于多线程并发模式的,测试代码案例和实测结果,希望能给大家一些启示。

    一、引言:

      上篇文章提起关于HBase插入性能优化设计到的五个参数,从参数配置的角度给大家提供了一个性能测试环境的实验代码。根据网友的反馈,基于单线程的模式实现的数据插入毕竟有限。通过个人实测,在我的虚拟机环境下,单线程插入数据的值约为4w/s。集群指标是:CPU双核1.83,虚拟机512M内存,集群部署单点模式。本文给出了基于多线程并发模式的,测试代码案例和实测结果,希望能给大家一些启示:

    二、源程序:

    复制代码
      1 import org.apache.hadoop.conf.Configuration;
      2 import org.apache.hadoop.hbase.HBaseConfiguration;
      3 import java.io.BufferedReader;
      4 import java.io.File;
      5 import java.io.FileNotFoundException;
      6 import java.io.FileReader;
      7 import java.io.IOException;
      8 import java.util.ArrayList;
      9 import java.util.List;
     10 import java.util.Random;
     11 
     12 import org.apache.hadoop.conf.Configuration;
     13 import org.apache.hadoop.hbase.HBaseConfiguration;
     14 import org.apache.hadoop.hbase.client.HBaseAdmin;
     15 import org.apache.hadoop.hbase.client.HTable;
     16 import org.apache.hadoop.hbase.client.HTableInterface;
     17 import org.apache.hadoop.hbase.client.HTablePool;
     18 import org.apache.hadoop.hbase.client.Put;
     19 
     20 public class HBaseImportEx {
     21     static Configuration hbaseConfig = null;
     22     public static HTablePool pool = null;
     23     public static String tableName = "T_TEST_1";
     24     static{
     25          //conf = HBaseConfiguration.create();
     26          Configuration HBASE_CONFIG = new Configuration();
     27          HBASE_CONFIG.set("hbase.master", "192.168.230.133:60000");
     28          HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.230.133");
     29          HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
     30          hbaseConfig = HBaseConfiguration.create(HBASE_CONFIG);
     31          
     32          pool = new HTablePool(hbaseConfig, 1000); 
     33     }
     34     /*
     35      * Insert Test single thread
     36      * */
     37     public static void SingleThreadInsert()throws IOException
     38     {
     39         System.out.println("---------开始SingleThreadInsert测试----------");
     40         long start = System.currentTimeMillis();
     41         //HTableInterface table = null;
     42         HTable table = null;
     43         table = (HTable)pool.getTable(tableName);
     44         table.setAutoFlush(false);
     45         table.setWriteBufferSize(24*1024*1024);
     46         //构造测试数据
     47         List<Put> list = new ArrayList<Put>();
     48         int count = 10000;
     49         byte[] buffer = new byte[350];
     50         Random rand = new Random();
     51         for(int i=0;i<count;i++)
     52         {
     53             Put put = new Put(String.format("row %d",i).getBytes());
     54             rand.nextBytes(buffer);
     55             put.add("f1".getBytes(), null, buffer);
     56             //wal=false
     57             put.setWriteToWAL(false);
     58             list.add(put);    
     59             if(i%10000 == 0)
     60             {
     61                 table.put(list);
     62                 list.clear();    
     63                 table.flushCommits();
     64             }            
     65         }
     66         long stop = System.currentTimeMillis();
     67         //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
     68           
     69         System.out.println("插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
     70         
     71         System.out.println("---------结束SingleThreadInsert测试----------");
     72     }
     73     /*
     74      * 多线程环境下线程插入函数 
     75      * 
     76      * */
     77     public static void InsertProcess()throws IOException
     78     {
     79         long start = System.currentTimeMillis();
     80         //HTableInterface table = null;
     81         HTable table = null;
     82         table = (HTable)pool.getTable(tableName);
     83         table.setAutoFlush(false);
     84         table.setWriteBufferSize(24*1024*1024);
     85         //构造测试数据
     86         List<Put> list = new ArrayList<Put>();
     87         int count = 10000;
     88         byte[] buffer = new byte[256];
     89         Random rand = new Random();
     90         for(int i=0;i<count;i++)
     91         {
     92             Put put = new Put(String.format("row %d",i).getBytes());
     93             rand.nextBytes(buffer);
     94             put.add("f1".getBytes(), null, buffer);
     95             //wal=false
     96             put.setWriteToWAL(false);
     97             list.add(put);    
     98             if(i%10000 == 0)
     99             {
    100                 table.put(list);
    101                 list.clear();    
    102                 table.flushCommits();
    103             }            
    104         }
    105         long stop = System.currentTimeMillis();
    106         //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
    107           
    108         System.out.println("线程:"+Thread.currentThread().getId()+"插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
    109     }
    110     
    111     
    112     /*
    113      * Mutil thread insert test
    114      * */
    115     public static void MultThreadInsert() throws InterruptedException
    116     {
    117         System.out.println("---------开始MultThreadInsert测试----------");
    118         long start = System.currentTimeMillis();
    119         int threadNumber = 10;
    120         Thread[] threads=new Thread[threadNumber];
    121         for(int i=0;i<threads.length;i++)
    122         {
    123             threads[i]= new ImportThread();
    124             threads[i].start();            
    125         }
    126         for(int j=0;j< threads.length;j++)
    127         {
    128              (threads[j]).join();
    129         }
    130         long stop = System.currentTimeMillis();
    131           
    132         System.out.println("MultThreadInsert:"+threadNumber*10000+"共耗时:"+ (stop - start)*1.0/1000+"s");        
    133         System.out.println("---------结束MultThreadInsert测试----------");
    134     }    
    135 
    136     /**
    137      * @param args
    138      */
    139     public static void main(String[] args)  throws Exception{
    140         // TODO Auto-generated method stub
    141         //SingleThreadInsert();        
    142         MultThreadInsert();
    143         
    144         
    145     }
    146     
    147     public static class ImportThread extends Thread{
    148         public void HandleThread()
    149         {                        
    150             //this.TableName = "T_TEST_1";
    151         
    152             
    153         }
    154         //
    155         public void run(){
    156             try{
    157                 InsertProcess();            
    158             }
    159             catch(IOException e){
    160                 e.printStackTrace();                
    161             }finally{
    162                 System.gc();
    163                 }
    164             }            
    165         }
    166 
    167 }
    复制代码

    三、说明

    1.线程数设置需要根据本集群硬件参数,实际测试得出。否则线程过多的情况下,总耗时反而是下降的。

    2.单笔提交数对性能的影响非常明显,需要在自己的环境下,找到最理想的数值,这个需要与单条记录的字节数相关。

    四、测试结果

    ---------开始MultThreadInsert测试----------

    线程:8插入数据:10000共耗时:1.328s
    线程:16插入数据:10000共耗时:1.562s
    线程:11插入数据:10000共耗时:1.562s
    线程:10插入数据:10000共耗时:1.812s
    线程:13插入数据:10000共耗时:2.0s
    线程:17插入数据:10000共耗时:2.14s
    线程:14插入数据:10000共耗时:2.265s
    线程:9插入数据:10000共耗时:2.468s
    线程:15插入数据:10000共耗时:2.562s
    线程:12插入数据:10000共耗时:2.671s
    MultThreadInsert:100000共耗时:2.703s
    ---------结束MultThreadInsert测试----------


    备注:该技术专题讨论正在群Hadoop高级交流群:293503507同步直播中,敬请关注。

    展开全文
  • 因此在多线程模式下,并发对同一个数据库连接调用sqlite3_prepare_v2()来创建prepared statement,或者对同一个数据库连接的任何prepared statement并发调用sqlite3_bind_*()和sqlite3_step()等函数都会出错(在iOS...

    先看看总结的结论:

    A.  由此可见,要想保证线程安全的话,可以有这4种方式:

    1. SQLite使用单线程模式,用一个专门的线程访问数据库。
    2. SQLite使用单线程模式,用一个线程队列来访问数据库,队列一次只允许一个线程执行,队列里的线程共用一个数据库连接。
    3. SQLite使用多线程模式,每个线程创建自己的数据库连接。
    4. SQLite使用串行模式,所有线程共用全局的数据库连接。

    B.  关于数据库连接和数据库事务,以及线程之间的关系对应

        事务是和数据库连接相关的,与线程是无关的。一个数据库连接同时只能执行一个事务操作。

    每个数据库连接(使用pager来)维护自己的事务,且同时只能有一个事务(但是可以用SAVEPOINT来实现内嵌事务)。也就是说,事务与线程无关,一个线程里可以同时用多个数据库连接来完成多个事务,而多个线程也可以同时(非并发)使用一个数据库连接来共同完成一个事务。

    经常有人抱怨SQLite的插入太慢实际上它可以做到每秒插入几万次,但是每秒只能提交几十次事务。因此在插入大批数据时,可以通过禁用自动提交来提速。数据库只有在事务中才能被更改。所有更改数据库的命令(除SELECT以外的所有SQL命令)都会自动开启一个新事务,并且当最后一个查询完成时自动提交。

     

    一、 是否支持多线程?

     

    SQLite官网上的“Is SQLite threadsafe?”这个问答。 简单来说,从3.3.1版本开始,它就是线程安全的了。而iOS的SQLite版本没有低于这个版本的,当然,你也可以自己编译最新版本。


    不过这个线程安全仍然是有限制的,在这篇《Is SQLite thread-safe?》里有详细的解释。
    另一篇重要的文档就是《SQLite And Multiple Threads》。它指出SQLite支持3种线程模式:

    1. 单线程:禁用所有的mutex锁,并发使用时会出错。当SQLite编译时加了SQLITE_THREADSAFE=0参数,或者在初始化SQLite前调用sqlite3_config(SQLITE_CONFIG_SINGLETHREAD)时启用。
    2. 多线程:只要一个数据库连接不被多个线程同时使用就是安全的。源码中是启用bCoreMutex,禁用bFullMutex。实际上就是禁用数据库连接和prepared statement(准备好的语句)上的锁,因此不能在多个线程中并发使用同一个数据库连接或prepared statement。当SQLite编译时加了SQLITE_THREADSAFE=2参数时默认启用。若SQLITE_THREADSAFE不为0,可以在初始化SQLite前,调用sqlite3_config(SQLITE_CONFIG_MULTITHREAD)启用;或者在创建数据库连接时,设置SQLITE_OPEN_NOMUTEX flag。
    3. 串行:启用所有的锁,包括bCoreMutex和bFullMutex。因为数据库连接和prepared statement都已加锁,所以多线程使用这些对象时没法并发,也就变成串行了。当SQLite编译时加了SQLITE_THREADSAFE=1参数时默认启用。若SQLITE_THREADSAFE不为0,可以在初始化SQLite前,调用sqlite3_config(SQLITE_CONFIG_SERIALIZED)启用;或者在创建数据库连接时,设置SQLITE_OPEN_FULLMUTEX flag。

      而这里所说的初始化是指调用sqlite3_initialize()函数,这个函数在调用sqlite3_open()时会自动调用,且只有第一次调用是有效的。

      调用sqlite3_threadsafe()可以获得编译期的SQLITE_THREADSAFE参数。标准发行版是1,也就是串行模式;而iOS上是2,也就是多线程模式;Python的sqlite3模块也默认使用串行模式,可以用sqlite3.threadsafety来配置。

      

      另一个要说明的是prepared statement,它是由数据库连接(的pager)来管理的,使用它也可看成使用这个数据库连接。因此在多线程模式下,并发对同一个数据库连接调用sqlite3_prepare_v2()来创建prepared statement,或者对同一个数据库连接的任何prepared statement并发调用sqlite3_bind_*()和sqlite3_step()等函数都会出错(在iOS上,该线程会出现EXC_BAD_ACCESS而中止)。这种错误无关读写,就是只读也会出错。文档中给出的安全使用规则是:没有事务正在等待执行,所有prepared statement都被finalized

      但是默认情况下,一个线程只能使用当前线程打开的数据库连接,除非在连接时设置了check_same_thread=False参数。如果是用不同的数据库连接,每个连接都不能读取其他连接中未提交的数据,除非使用read-uncommitted模式。

    现在3种模式都有所了解了,清楚SQLite并不是对多线程无能为力后,接下来就了解下事务吧。

     

    二、事务


      数据库只有在事务中才能被更改。所有更改数据库的命令(除SELECT以外的所有SQL命令)都会自动开启一个新事务,并且当最后一个查询完成时自动提交。
      而BEGIN命令可以手动开始事务,并关闭自动提交。当下一条COMMIT命令执行时,自动提交再次打开,事务中所做的更改也被写入数据库。当COMMIT失败时,自动提交仍然关闭,以便让用户尝试再次提交。若执行的是ROLLBACK命令,则也打开自动提交,但不保存事务中的更改。关闭数据库或遇到错误时,也会自动回滚事务。
      

      经常有人抱怨SQLite的插入太慢实际上它可以做到每秒插入几万次,但是每秒只能提交几十次事务。因此在插入大批数据时,可以通过禁用自动提交来提速。

      还有一个很重要的知识点需要强调:事务是和数据库连接相关的,每个数据库连接(使用pager来)维护自己的事务,且同时只能有一个事务(但是可以用SAVEPOINT来实现内嵌事务)。也就是说,事务与线程无关,一个线程里可以同时用多个数据库连接来完成多个事务,而多个线程也可以同时(非并发)使用一个数据库连接来共同完成一个事务。

     

    而要实现事务,就不得不用到
    一个SQLite数据库文件有5种锁的状态:

    • UNLOCKED:表示数据库此时并未被读写。
    • SHARED:表示数据库可以被读取。SHARED锁可以同时被多个线程拥有。一旦某个线程持有SHARED锁,就没有任何线程可以进行写操作。
    • RESERVED:表示准备写入数据库。RESERVED锁最多只能被一个线程拥有,此后它可以进入PENDING状态。
    • PENDING:表示即将写入数据库,正在等待其他读线程释放SHARED锁。一旦某个线程持有PENDING锁,其他线程就不能获取SHARED锁。这样一来,只要等所有读线程完成,释放SHARED锁后,它就可以进入EXCLUSIVE状态了。
    • EXCLUSIVE:表示它可以写入数据库了。进入这个状态后,其他任何线程都不能访问数据库文件。因此为了并发性,它的持有时间越短越好。

    一个线程只有在拥有低级别的锁的时候,才能获取更高一级的锁。SQLite就是靠这5种类型的锁,巧妙地实现了读写线程的互斥。同时也可看出,写操作必须进入EXCLUSIVE状态,此时并发数被降到1,这也是SQLite被认为并发插入性能不好的原因。
    另外,read-uncommitted和WAL模式会影响这个锁的机制。在这2种模式下,读线程不会被写线程阻塞,即使写线程持有PENDING或EXCLUSIVE锁。

    提到锁就不得不说到死锁的问题,而SQLite也可能出现死锁。
    下面举个例子:

    连接1:BEGIN (UNLOCKED)
    连接1:SELECT ... (SHARED)
    连接1:INSERT ... (RESERVED)
    连接2:BEGIN (UNLOCKED)
    连接2:SELECT ... (SHARED)
    连接1:COMMIT (PENDING,尝试获取EXCLUSIVE锁,但还有SHARED锁未释放,返回SQLITE_BUSY)
    连接2:INSERT ... (尝试获取RESERVED锁,但已有PENDING锁未释放,返回SQLITE_BUSY)

    现在2个连接都在等待对方释放锁,于是就死锁了。当然,实际情况并没那么糟糕,任何一方选择不继续等待,回滚事务就行了。

    不过要更好地解决这个问题,就必须更深入地了解事务了。
    实际上BEGIN语句可以有3种起始状态:

    • DEFERRED:默认值,开始事务时不获取任何锁。进行第一次读操作时获取SHARED锁,进行第一次写操作时获取RESERVED锁。
    • IMMEDIATE:开始事务时获取RESERVED锁。
    • EXCLUSIVE:开始事务时获取EXCLUSIVE锁。


    现在考虑2个事务在开始时都使用IMMEDIATE方式:

    连接1:BEGIN IMMEDIATE (RESERVED)
    连接1:SELECT ... (RESERVED)
    连接1:INSERT ... (RESERVED)
    连接2:BEGIN IMMEDIATE (尝试获取RESERVED锁,但已有RESERVED锁未释放,因此事务开始失败,返回SQLITE_BUSY,等待用户重试)
    连接1:COMMIT (EXCLUSIVE,写入完成后释放)
    连接2:BEGIN IMMEDIATE (RESERVED)
    连接2:SELECT ... (RESERVED)
    连接2:INSERT ... (RESERVED)
    连接2:COMMIT (EXCLUSIVE,写入完成后释放)

    这样死锁就被避免了。

    而EXCLUSIVE方式则更为严苛,即使其他连接以DEFERRED方式开启事务也不会死锁:

    连接1:BEGIN EXCLUSIVE (EXCLUSIVE)
    连接1:SELECT ... (EXCLUSIVE)
    连接1:INSERT ... (EXCLUSIVE)
    连接2:BEGIN (UNLOCKED)
    连接2:SELECT ... (尝试获取SHARED锁,但已有EXCLUSIVE锁未释放,返回SQLITE_BUSY,等待用户重试)
    连接1:COMMIT (EXCLUSIVE,写入完成后释放)
    连接2:SELECT ... (SHARED)
    连接2:INSERT ... (RESERVED)
    连接2:COMMIT (EXCLUSIVE,写入完成后释放)

    不过在并发很高的情况下,直接获取EXCLUSIVE锁的难度比较大;而且为了避免EXCLUSIVE状态长期阻塞其他请求,最好的方式还是让所有写事务都以IMMEDIATE方式开始。
    顺带一提,要实现重试的话,可以使用sqlite3_busy_timeout()或sqlite3_busy_handler()函数。

    由此可见,要想保证线程安全的话,可以有这4种方式:

    1. SQLite使用单线程模式,用一个专门的线程访问数据库。
    2. SQLite使用单线程模式,用一个线程队列来访问数据库,队列一次只允许一个线程执行,队列里的线程共用一个数据库连接。
    3. SQLite使用多线程模式,每个线程创建自己的数据库连接。
    4. SQLite使用串行模式,所有线程共用全局的数据库连接。

     

    三、sqlite3插入速度慢

     

    1.像上述一样显示的给多个insert加上事务

      sqlite在没有显式使用事务的时候会为每条insert都使用事务操作,而sqlite数据库是以文件的形式存在磁盘中,就相当于每次访问时都要打开一次文件,如果对数据进行大量的操作,时间都耗费在I/O操作上,所以很慢。

    解决方法是显式使用事务的形式提交:因为我们开始事务后,进行的大量操作的语句都保存在内存中,当提交时才全部写入数据库,此时,数据库文件也就只用打开一次。

     

    2.如果加上事务还是不行,可以尝试修改同步模式

      初用sqlite3插入数据时,插入每条数据大概需要100ms左右。如果是批量导入,可以引进事务提高速度。但是假设你的业务是每间隔几秒插入几条数据,显然100ms是不能容许的。
    解决办法是,在调用sqlite3_open函数后添加下面一行代码:

        sqlite3_exec(db, "PRAGMA synchronous = OFF; ", 0,0,0);

     

        上面的解决办法貌似治标不治本,为什么加上上面的代码行,速度会提高那么多?

     

    磁盘同步 

    1.如何设置:

    PRAGMA synchronous = FULL; (2) 

    PRAGMA synchronous = NORMAL; (1) 

    PRAGMA synchronous = OFF; (0)
     

    2.参数含义:

    当synchronous设置为FULL (2), SQLite数据库引擎在紧急时刻会暂停以确定数据已经写入磁盘。这使系统崩溃或电源出问题时能确保数据库在重起后不会损坏。FULL synchronous很安全但很慢。
     

    当synchronous设置为NORMAL(1), SQLite数据库引擎在大部分紧急时刻会暂停,但不像FULL模式下那么频繁。 NORMAL模式下有很小的几率(但不是不存在)发生电源故障导致数据库损坏的情况。但实际上,在这种情况 下很可能你的硬盘已经不能使用,或者发生了其他的不可恢复的硬件错误。
     

    设置为synchronous OFF (0)时,SQLite在传递数据给系统以后直接继续而不暂停。若运行SQLite的应用程序崩溃, 数据不会损伤,但在系统崩溃或写入数据时意外断电的情况下数据库可能会损坏。另一方面,在synchronous OFF时 一些操作可能会快50倍甚至更多。在SQLite 2中,缺省值为NORMAL.而在3中修改为FULL。  www.2cto.com  
     

    3.建议:

    如果有定期备份的机制,而且少量数据丢失可接受,用OFF。

         注意上面红色加粗的字样。总结:如果你的数据对安全性完整性等要求不是太高,可以采用设置为0的方法,毕竟只是“数据库可能会损坏”,至于损坏几率为多大,笔者也暂不知晓。。。。。。还没遇到过损坏,不知什么时候才会发生。

    转载于:https://my.oschina.net/u/185531/blog/1786071

    展开全文
  • EXCEL解析导入,批量插入大量数据 excel导入基本分为3个步骤: 1. 上传文件 ...2. 多线程执行 直接贴代码,项目实例: public Result importExcel1() { AmroContext context = AmroContextUtil.get

    EXCEL解析导入,批量插入大量数据

    excel导入基本分为3个步骤:

     1. 上传文件
     2. 解析文件
     3. 逻辑内容
    

    首先查看是哪一步需要优化,一般需要在第3步,插入数据库的逻辑优化。
    如果数据量比较大的话,单条sql执行很慢,几条数据几次和数据库交互。影响执行效率。解决办法:

    1. 批量插入
    2. 多线程执行
    

    直接贴代码,项目实例:

     public Result importExcel1() {
            AmroContext context = AmroContextUtil.getAmroContext();
            InputStream instream = null;
            AmroTransactionManager manager = AmroTransactionManager.buildAmroTransactionManager();
            Workbook wb = null;
    
            MultipartFile mf = context.getMultipartFile("file");
            String sourceId = context.getRequestParamMap().get("sourceId");
    
            LoginAccount loginAccount = context.getLoginAccount();
            String accountID = loginAccount.getAccountId();
            String userName = loginAccount.getUserName();
            String fileName = mf.getOriginalFilename();
            String msg = "";
            String pkid = "";
            List<EmModCnlList> emModCnlLists = new ArrayList<>();
            Result result = null;
    
    
            try {
                instream = mf.getInputStream();
                wb = ExcelUtils.getopenStyle(instream, fileName);
                Sheet sheet = wb.getSheetAt(0);
                /*  int rowNum = sheet.getLastRowNum();*/
                //表头行
                Row row = sheet.getRow(0);
                if (row == null) {
                    msg = "第一行为空,请使用模板文件导入!";
                    CommonSystemException.throwCustomException(com.bireturn.amro.tdms.constants.ErrorMsgConstant.ERR_DEL_BY_SELF, new Object[]{msg});
                }
                int colNum = row.getPhysicalNumberOfCells();
    
                if (colNum < 9) {
                    msg = "小于9列,请使用模板文件导入!";
                    CommonSystemException.throwCustomException(com.bireturn.amro.tdms.constants.ErrorMsgConstant.ERR_DEL_BY_SELF, new Object[]{msg});
                }
                if (colNum != 9) {
                    msg = "导入表格列数有误!";
                    CommonSystemException.throwCustomException(com.bireturn.amro.tdms.constants.ErrorMsgConstant.ERR_DEL_BY_SELF, new Object[]{msg});
                }
                EmModCnlList emModCnlList = null;
                int rows = sheet.getPhysicalNumberOfRows();
                emModCnlListMapper.delDataByAcPKid(sourceId);
    
                long bTime1 = System.currentTimeMillis();
                for (int i = 0; i < rows; i++) {
    
                    //过滤表头行
                    if (i == 0) {
                        continue;
                    }
                    /*   if()*/
                    /*   pkid = emModCnlListMapper.getPkid();*/
                    // 获取当前行的数据
                    Row row2 = sheet.getRow(i);
                    if (row2.getCell(0) == null && row2.getCell(1) == null && row2.getCell(2) == null && row2.getCell(3) == null
                            && row2.getCell(4) == null && row2.getCell(5) == null && row2.getCell(6) == null && row2.getCell(7) == null && row2.getCell(8) == null
                    ) {
                        break;
                    }
                    emModCnlList = new EmModCnlList();
    
                    int index = 0;
                    String value = "";
                    for (Cell cell : row2) {
                        if (cell.getCellType() == Cell.CELL_TYPE_NUMERIC) {
                            value = StringUtils.isNotBlank(String.valueOf(cell.getNumericCellValue())) ? String.valueOf(cell.getNumericCellValue()) : "";
                        } else if (cell.getCellType() == Cell.CELL_TYPE_STRING) {
                            value = StringUtils.isNotBlank(cell.getStringCellValue()) ? cell.getStringCellValue() : "";
                        }else{
                            value = StringUtils.isNotBlank(cell.getStringCellValue()) ? cell.getStringCellValue() : "";
                        }
                        switch (index) {
                            case 0:
                                emModCnlList.setMod(value);
                                index++;
                                continue;
                            case 1:
                                emModCnlList.setIsAntiModOf(value);
                                index++;
                                continue;
                            case 2:
                                emModCnlList.setHasForAntiMod(value);
                                index++;
                                continue;
                            case 3:
                                emModCnlList.setTitle(value);
                                index++;
                                continue;
                            case 4:
                                emModCnlList.setMp(value);
                                index++;
                                continue;
                            case 5:
                                emModCnlList.setMscn(value);
                                index++;
                                continue;
                            case 6:
                                emModCnlList.setSb(value);
                                index++;
                                continue;
                            case 7:
                                emModCnlList.setRfc(value);
                                index++;
                                continue;
                            case 8:
                                emModCnlList.setAta(value);
                                index++;
                                break;
                        }
                        break;
                    }
                    emModCnlList.setAcPkid(sourceId);
                    emModCnlList.setImpDate(new Date());
                    emModCnlList.setImpMan(userName);
                    emModCnlList.setImpManId(accountID);
                    emModCnlLists.add(emModCnlList);
    
                }
                int threadSize = 500;
                int dataSize = emModCnlLists.size();
                // 线程数
                int threadNum = dataSize / threadSize + 1;
                // 定义标记,过滤threadNum为整数
                boolean special = dataSize % threadSize == 0;
    
                // 创建一个线程池
                ExecutorService exec = Executors.newFixedThreadPool(threadNum);
                // 定义一个任务集合
                List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
                Callable<Integer> task = null;
                List<EmModCnlList> cutList = null;
                System.out.println("MOD清单导入——线程开始执行");
                // 确定每条线程的数据
                for (int i = 0; i < threadNum; i++) {
                    if (i == threadNum - 1) {
                        if (special) {
                            break;
                        }
                        cutList = emModCnlLists.subList(threadSize * i, dataSize);
                    } else {
                        cutList = emModCnlLists.subList(threadSize * i, threadSize * (i + 1));
                    }
                    // System.out.println("第" + (i + 1) + "组:" + cutList.toString());
                    final List<EmModCnlList> listStr = cutList;
                    task = new Callable<Integer>() {
    
                        @Override
                        public Integer call() throws Exception {
                           /* System.out.println(Thread.currentThread().getName() + "线程:" + listStr);*/
                            emModCnlListMapper.insertObject(listStr);
                            return 1;
                        }
                    };
                    // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系
                    tasks.add(task);
                }
    
                List<Future<Integer>> results = exec.invokeAll(tasks);
                //捕捉异常或者....  这句话很重要
                for (Future<Integer> res : results) {
                    System.out.println(res.get());
                }
    
                exec.shutdown();
                System.out.println("线程任务执行结束");
                System.err.println("MOD清单导入——执行任务消耗了 :" + (System.currentTimeMillis() - bTime1) + "毫秒");
                manager.setIsOk(true);
            } catch (Exception e) {
                e.printStackTrace();
                if (!msg.equals("")) {
                    CommonSystemException.throwCustomException(com.bireturn.amro.tdms.constants.ErrorMsgConstant.ERR_DEL_BY_SELF, new Object[]{msg});
                } else {
                    CommonSystemException.throwCustomException(com.bireturn.amro.tdms.constants.ErrorMsgConstant.ERR_DEL_BY_SELF, new Object[]{e.toString()});
                }
    
            } finally {
                manager.transactionCommitOrRollback();
                if (wb != null) {
                    try {
                        wb.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (instream != null) {
                    try {
                        instream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            result = Result.buildSuccess();
            return result;
        }
    

    xml文件:数据库oracle

     <insert id="insertObject"
                parameterType="java.util.List">
           insert  into EM_MOD_CNL_LIST(
            PKID,
            AC_PKID,
            MOD,
            IS_ANTI_MOD_OF,
            HAS_FOR_ANTI_MOD,
            TITLE,
            MP,
            MSCN,
            SB,
            RFC,
            ATA,
            IMP_MAN_ID,
            IMP_MAN,
            IMP_DATE
            )
            SELECT EM_MOD_CNL_LIST_SEQ.nextval,s.* FROM
            (
            <foreach collection="list" item="item" index="index" separator="UNION ALL">select
                #{item.acPkid},
                #{item.mod},
                #{item.isAntiModOf},
                #{item.hasForAntiMod},
                #{item.title},
                #{item.mp},
                #{item.mscn},
                #{item.sb},
                #{item.rfc},
                #{item.ata},
                #{item.impManId},
                #{item.impMan},
                #{item.impDate}
                from dual
            </foreach>
            ) s
        </insert>
    

    我的excel只有一个sheet页所以没有优化。如果多个的话可以看着修改一下,把公共的提取出来。如果数据量很多的话,开很多线程影响内存,可以任务不统一提交,每一个线程执行任务完毕以后就释放,这里只用到简单的,为了省事。
    亲测可用!亲测可用!亲测可用!

    展开全文
  • 文章目录前言正文ThreadPool()中的pool.map()多线程优化前期需求可能用到的数据库操作及指令:完整代码 前言 文末附完整代码 本文为基于上一篇文章的多线程优化(完整独立),上一版本爬取时间为21000s左右,优化后...

    前言

    文末附完整代码

    本文为基于上一篇文章的多线程优化(完整独立),上一版本爬取时间为21000s左右,优化后仅3000s左右。上一版(未优化版)链接:
    爬虫爬取京东商品详细数据 (品牌、售价、各类评论量(精确数量)、热评词及数量等)json解析部分数据

    较原版所作改动:
    1.添加多线程优化
    2.运行结果显示优化

    需要的前期准备:
    1.需要预先爬取产品id(data-sku)
    文章链接:(多线程优化)爬取京东笔记本电脑销量榜data-sku(商品id),并存入云服务器中的mysql库(爬取时间较上一版提升十多倍)
    2.需先在存储产品id(data-sku)的数据库对应表中创建相应字段
    详情可查看未优化版。

    3.数据库ip及密码以***替代
    4.可能遇到的错误及解决方法
    数据库设置最大连接数方法(使用ThreadPool多线程插入数据到数据库报错)

    使用ThreadPool()中的pool.map()多线程运行selenium webdriver导致的内存占用巨大,内存溢出乃至电脑卡死解决方法

    正文

    ThreadPool()中的pool.map()多线程优化

    安装:

    pip install threadpool
    

    使用介绍:

    (1)引入threadpool模块

    from multiprocessing.dummy import Pool as ThreadPool
    

    (2)创建线程 池pool = ThreadPool(n) # n为线程数

    pool = ThreadPool(8)
    

    (3)多线程处理

    pool.map(反复调用的函数(只需要函数名),  参数池(元组类型))
    

    (3)结束

    pool.close() 	# 
    
    pool.join()	#
    

    本次选用的内容和使用:

        po = ThreadPool(12)
        po.map(get_data, tuple(url_id[0:6000]))
        time.sleep(1)
        po.close()
        po.join()
    

    前期需求

    需要的前期准备:
    1.需要预先爬取产品id(data-sku)
    文章链接:(多线程优化)爬取京东笔记本电脑销量榜data-sku(商品id),并存入云服务器中的mysql库(爬取时间较上一版提升十多倍)
    2.需先在存储产品id(data-sku)的数据库对应表中创建相应字段
    详情可查看未优化版。

    3.数据库ip及密码以***替代
    4.可能遇到的错误及解决方法
    数据库设置最大连接数方法(使用ThreadPool多线程插入数据到数据库报错)

    使用ThreadPool()中的pool.map()多线程运行selenium webdriver导致的内存占用巨大,内存溢出乃至电脑卡死解决方法

    可能用到的数据库操作及指令:

    1、说明:创建数据库
    CREATE DATABASE database-name
    2、说明:删除数据库
    drop database dbname
    3、说明:备份sql server
    --- 创建 备份数据的 device
    USE master
    EXEC sp_addumpdevice 'disk', 'testBack', 'c:\mssql7backup\MyNwind_1.dat'
    --- 开始 备份
    BACKUP DATABASE pubs TO testBack
    4、说明:创建新表
    create table tabname(col1 type1 [not null] [primary key],col2 type2 [not null],..)
    根据已有的表创建新表:
    A:create table tab_new like tab_old (使用旧表创建新表)
    B:create table tab_new as select col1,col2… from tab_old definition only
    5、说明:删除新表
    drop table tabname
    6、说明:增加一个列
    Alter table tabname add column col type
    注:列增加后将不能删除。DB2中列加上后数据类型也不能改变,唯一能改变的是增加varchar类型的长度。
    7、说明:添加主键: Alter table tabname add primary key(col)
    说明:删除主键: Alter table tabname drop primary key(col)
    8、说明:创建索引:create [unique] index idxname on tabname(col….)
    删除索引:drop index idxname
    注:索引是不可更改的,想更改必须删除重新建。
    9、说明:创建视图:create view viewname as select statement
    删除视图:drop view viewname
    10、说明:几个简单的基本的sql语句
    选择:select * from table1 where 范围
    插入:insert into table1(field1,field2) values(value1,value2)
    删除:delete from table1 where 范围
    更新:update table1 set field1=value1 where 范围
    查找:select * from table1 where field1 like ’%value1%’ ---like的语法很精妙,查资料!
    排序:select * from table1 order by field1,field2 [desc]
    总数:select count as totalcount from table1
    求和:select sum(field1) as sumvalue from table1
    平均:select avg(field1) as avgvalue from table1
    最大:select max(field1) as maxvalue from table1
    最小:select min(field1) as minvalue from table1
    

    结果展示

    原版:在这里插入图片描述

    优化版(现版):在这里插入图片描述

    完整代码

    import requests
    from lxml import etree
    import json
    from multiprocessing.dummy import Pool as ThreadPool
    import random
    import pymysql
    import time
    
    #  请求头池
    user_agent = [
        "Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
        "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:38.0) Gecko/20100101 Firefox/38.0",
        "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1",
        "Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11",
    ]
    error_counter = []
    
    
    def get_url():  # 从数据库获取商品url
        try:
            db = pymysql.connect(host="***", user="root", password="***", database="JD_DATA", charset="utf8")
            d = db.cursor()
            sql = 'SELECT url FROM product_data'
            d.execute(sql)
            rows = d.fetchall()
            urls_id = []
            for row in rows:
                for i in row:
                    urls_id.append(i)
            return urls_id
    
            d.close()
            db.close()
    
        except BaseException as e:
            print("获取数据库数据失败!:{}".format(e))
        d.close()
        db.close()
    
    
    def insert(value, table, p):    # 写入数据到数据库函数
        # 连接数据库
        db = pymysql.connect(host="***", user="root", password="***", database="JD_DATA",
                             charset="utf8")
        cursor = db.cursor()
        # 更新数据,判断写入位置
        sql = "UPDATE product_data SET {table} = '{val}' WHERE url = '{p}'".format(table=table, val=value, p=p)
    
        cursor.execute(sql)
        db.commit()
        db.close()
    
    
    def get_data(product_id):  # 获取商品详细数据
        try:
            p_url = 'https://item.jd.com/' + product_id + '.html'
            header = {'User-Agent': random.choice(user_agent)}
            r = requests.get(p_url, headers=header)
    
            detail = etree.HTML(r.text)  # str转HTML
    
            [product_name] = detail.xpath('//*[@id="detail"]/div[2]/div[1]/div[1]/ul[2]/li[1]/@title')  # 商品名称
            # print("商品:" + str(product_name))
    
            [product_brand] = detail.xpath('//*[@id="detail"]/div[2]/div[1]/div[1]/ul[1]/li[1]/a/text()')  # 商品品牌
            # print("品牌:" + str(product_brand))
    
            # 请求价格信息json
            p = requests.get('https:' + '//p.3.cn/prices/mgets?skuIds=J_' + product_id, headers=header)  # 请求商品价格json
            [product_dict] = json.loads(p.text)  # 获取商品价格
            product_m_price = product_dict['m']
            product_price = product_dict['p']
            product_o_price = product_dict['op']
    
            # 请求评论信息json
            c = requests.get('https://club.jd.com/comment/productCommentSummaries.action?referenceIds=' + product_id,
                             headers=header)  # 请求评论json
            comment_dict = json.loads(c.text.split('[')[-1].split(']')[0])  # json内容截取
    
            total_comment_num = comment_dict['CommentCount']
            good_comment_num = comment_dict['ShowCount']
            good_percent_com = comment_dict['GoodRate']
            bad_comment_num = comment_dict['PoorCount']
            bad_percent_com = comment_dict['PoorRate']
    
            # 调用函数,写入数据
            insert(value=product_name, table='product_name', p=product_id)
            insert(value=product_brand, table='product_brand', p=product_id)
            insert(value=product_m_price, table='product_m_price', p=product_id)
            insert(value=product_price, table='product_price', p=product_id)
            insert(value=product_o_price, table='product_o_price', p=product_id)
            insert(value=total_comment_num, table='total_comment_num', p=product_id)
            insert(value=good_comment_num, table='good_comment_num', p=product_id)
            insert(value=good_percent_com, table='good_percent_com', p=product_id)
            insert(value=bad_comment_num, table='bad_comment_num', p=product_id)
            insert(value=bad_percent_com, table='bad_percent_com', p=product_id)
    
            # 请求评论信息json
            icon = requests.get(
                'https://club.jd.com/comment/productPageComments.action?callback=fetchJSON_comment98&productId=' + product_id + '&score=0&sortType=5&page=0&pageSize=10',
                headers=header)
            comment_ic = json.loads(icon.text.split('hotCommentTagStatistics":')[-1].split(',"jwotestProduct')[0])
            icon = []
            for ic in comment_ic:
                comment_ic_name = ic['name']
                comment_ic_num = ic['count']
                comment_icon = comment_ic_name + '(' + str(comment_ic_num) + ')'
                icon.append(comment_icon)
            comment_icon_all = ','.join(icon)
            # print(comment_icon_all)
    
            insert(value=comment_icon_all, table='dict_icon', p=product_id)
            '''
            print("当前价格:" + product_price)
            print("指导价格:" + product_o_price)
            print("最高价格:" + product_m_price)
    
            print("总评论数为:{}" .format(total_comment_num))
            print("好评数: {}" .format(good_comment_num))
            print("好评率: {}" .format(good_percent_com))
            print("差评数: {}".format(bad_comment_num))
            print("差评率: {}".format(bad_percent_com))
            '''
        except Exception:
            error_counter.append(error_counter)
    
    
    if __name__ == "__main__":
        start = time.perf_counter()
        print('正在获取产品id......')
        url_id = get_url()
        print('获取产品id成......')
        print('正在获取产品数据......')
        po = ThreadPool(12)
        po.map(get_data, tuple(url_id[0:6000]))
        time.sleep(1)
        po.close()
        po.join()
        end = time.perf_counter()
        # print('\n程序完成!\n共爬取{}件商品数据,失败{}件,耗时{}s。'.format(i-1, j-1, end-start))
        print('\n程序完成!\n累计失败件数为:{}件!\n累计耗时{}s。'.format(len(error_counter), end-start))
    
    
    展开全文
  • MySql 快速插入千万级大数据

    万次阅读 2016-06-17 10:08:42
    在数据分析领域,数据库是我们的好帮手。...在参加阿里的天池大数据算法竞赛中(流行音乐趋势预测),我遇到了这样的问题,在没有优化数据库查询及插入之前,我花了不少冤枉时间,没有优化之前,1500万条数据
  • 线程池的配置与并发优化处理,SQL优化性能的一些心得
  • 但是这个多线程事务......没事,我慢慢给你说。如图所示,有个小伙伴想要实现多线程事务。这个需求其实我在不同的地方看到过很多次,所以我才说:这个问题又出现了。那么有解决方案吗?在此之前,我的回答都是非常的...
  • ) 2)ParNew垃圾收集器(多线程串行垃圾收集器,Serial的多线程版本,其它特性同Serial。) 3)Parallel Scavenge垃圾收集器(类似于ParNew,但是该收集器关注的是cpu的吞吐量,通过参数来控制吞吐量,是吞吐量优 ...
  • Oracle插入大数据的方法总结

    千次阅读 2013-11-25 22:23:01
    假设T1是待插入的表,数据来自T2(或者表连接产生的临时表) 一、使用hint INSERT /*+ nologging append parallel(T1,4)*/  INTO T1 SELECT * FROM T2; 1、非归档模式下,只需append就能大量减少redo...
  • java处理大数据的一个优化解决方案

    千次阅读 2015-04-01 20:12:11
    之前和大家提过我们公司现在在做一个手机应用商店的项目,之前测过平均每分钟有2000条请求,每秒就是50左右,现在肯定更,数据量大的时候每秒有400~500条sql插入操作(记录用户行为,每个请求都会将信息写入log表),...
  • 在处理本地数据上,我比较喜欢选择使用它,不仅是因为他与sql server有着比较相近的语法,还因为它不需要安装,仅需要通过命令行就能启动了,而且他在处理大数据时,性能比sql server好很,好吧这里不继续争论性能...
  • java多线程实现大批量数据切分成指定份数的数据,然后多线程处理入库或者导出,线程的个数和每份数据的数量都可以控制
  • 点击上方蓝字关注我们基因组大数据变异检测算法的并行优化崔英博1,黄春1,唐滔1,杨灿群1,廖湘科1,彭绍亮2,31国防科技大学计算机学院,湖南 长沙 4100732湖南大学...
  • 多线程在随着cpu发展应用的是越来越多,游戏场景因为其在服务器所占的数据量与逻辑复杂度的原因必须依赖于它。为什么场景要采用多线程?场景的线程是怎样的?场景的线程又是如何创建的?场景的线程又是怎样管理的?...
  • 大数据

    2015-02-15 14:18:56
    用户行为类数据是最常见的大数据形式,比如电信的通话记录、网站的访问日志、应用商店的app下载记录、银行的账户信息、机顶盒的观看记录、股票的交易记录、保险业的保单信息,连锁超市会员的购物信息、交通违法信息...
  • 多线程 ETL JVM调优 内存分析工具MAT … 一 、前言 场景 使用etl及多线程方式同步近3000KW条业务更改记录数据,不同种记录需请求不同平台接口获取详细数据并处理同步到相关数据表,有线程及各种错误日志记录及相关...
  • 大批量数据多线程处理 原因:最近写项目多时候遇到一个这样的问题,Excel批量导入表格,数据量较大,处理时间较长。然后小编想到了可以用多线程处理。 例: //导入失败集合 List<BatchPayDetail...
  • 从上篇的原理,解决实际项目的问题,遇到一个优化问题: •一个月没有执行完的SQL: •业务边IO及CPU占用 TOP 1 •需要业务理解的SQL •底层 IO及CPU占用TOP 1 •模型 1、一个月没有执行完的SQL •Mergetb_wz_...
  • Hbase和Hive集成以及性能优化Hive与HBase集成使用场景Hive与HBase集成原理Hive与HBase集成示例名称空间(NameSpace)安全权限(Security with GRANT)安全权限(revoke&user_permission)Region SplitHBase ...
  • 单表60亿记录等大数据场景的MySQL优化和运维之道
  • 记得开始学习Java的时候,一遇到多线程情况就使用synchronized,相对于当时的我们来说synchronized是这么的神奇而又强大,那个时候我们赋予它一个名字“同步”,也成为了我们解决多线程情况的百试不爽的良药。...
  • 新手入门大数据大数据的入门!!!认识大数据

    万次阅读 多人点赞 2019-04-10 18:25:56
    大数据介绍大数据的由来大数据的应用领域大数据方面核心技术有哪些?一、数据采集与预处理Flume NGNDCLogstashSqoop流式计算Zookeeper二、数据存储HBasePhoenixYarnMesosRedisAtlasKudu三、数据清洗OozieAzkaban四、...
  • mybatis先查询百万条数据,另一张表根据百万条数据中的一个字段修改另一张表 多线程实现,效率要高
  • 第十章 多线程

    千次阅读 2019-09-26 19:46:52
    文章目录什么是线程?能解决什么问题(为什么不仅只有进程)?Java中创建线程的三种方式?线程的生命周期?线程死锁的原因 & 举例 & 如何避免死锁?run()和start()方法区别 & wait()和sleep()方法区别?...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 14,478
精华内容 5,791
关键字:

多线程优化大数据插入