精华内容
下载资源
问答
  • 往MySQL中插入1000000条数据只花了6秒钟! 关键点: 1.使用PreparedStatement对象 2.rewriteBatchedStatements=true 开启批量插入,插入只执行一次,所有插入比较快。 二、 代码 package test0823.demo1; ...
  • 近期为了满足客户的(××电网公司)需求,先说下他们的需求,...3.同步需要保证实时性,数据都是秒级的,一分钟下来至少是2万条数据。   看到这个需求我的第一反应估计跟大家是一样的,就是建立外连接,然后使用mer

    近期为了满足客户的(××电网公司)需求,先说下他们的需求,需求如下:

    1.实现Ⅱ区、Ⅲ区数据库的同步,其中Ⅱ区是主数据库,Ⅲ区是需要同步的数据库。

    2.两台数据库服务器之间是不能直接通讯的,因为Ⅱ、Ⅲ区之间安装了隔离装置,只能通过端口访问。

    3.同步需要保证实时性,数据都是秒级的,一分钟下来至少是2万条数据。

     

    看到这个需求我的第一反应估计跟大家是一样的,就是建立外连接,然后使用merger函数去定时同步,效果是最好的,但最终只能想想了,因为两台服务器根本不能通讯的。

    上面的方法行不通了就得换一种方式了,后面经过和同事们讨论定下了一个方案,思路如下:

    1.使用存储过程生成更新的或者插入的SQL语句,保存到表中;

    2.使用后台服务定期读取表中已经生成好的SQL语句做成bat文件并压缩打包;

    3.使用传输服务将Ⅱ区的文件传输到Ⅲ区指定服务器并解压到指定的目录;

    4.使用Ⅲ区的同步服务调用bat文件进行数据插入

    思路是没问题的,基本功能都实现了,但是问题就处在效率提不上去,同步2万数据需要一分钟以上,这显然是不符合需求的。

     

    行不通就只能换一个方案了,不使用批处理来同步了,先代码来实现同步;

    详细思路如下:

    1.同样是使用存储过程生成指定格式的字符串(XML格式)保存到数据库中,代码如下:

    1)既然是指定格式的话,肯定要模版才行,下面的代码是根据配置生成指定格式的模版

    create or replace procedure P_pub_SyncData_InitTemplate is
      /*
       过程名称:P_pub_SyncData_InitTemplate
       用途:数据库同步,生成模版
       创建日期:2013-07-03 16:41:00
      */
      V_INSERT_TEMPLATE     VARCHAR2(2000);
      V_INSERT_TEMPLATE_COL VARCHAR2(1000);
      V_INSERT_TEMPLATE_VAL VARCHAR2(1000);
      V_DELETE_TEMPLATE     VARCHAR2(1000);
      V_DATA_XMLTEMPLATE    VARCHAR2(2000);
      V_DATA_TYPETEMPLATE   VARCHAR2(2000);
      V_DELETE_COND_TEMPLATE VARCHAR2(500);
      V_COL_LIST                                                    dbms_sql.Varchar2_Table;                     --字段集合
      V_COL_TYPE_LIST                                               dbms_sql.Varchar2_Table;                     --字段类型集合
      CURSOR C_SYNC_CFG IS
        SELECT upper(TABLE_NAME) TABLE_NAME,
               upper(DELETE_CONDITION) DELETE_CONDITION,
               upper(COMPARE_DATE_COL) COMPARE_DATE_COL ,
               EXEC_TYPE,
               SYNCTIME_TYPE,
               TIMETAG_TYPE
          FROM TB_PUB_SYNC_DATA_CFG_NEW;
    begin
      FOR V_SYNC IN C_SYNC_CFG LOOP
        SELECT COLUMN_NAME,DATA_TYPE bulk collect into V_COL_LIST,V_COL_TYPE_LIST from USER_TAB_COLUMNS where table_name=V_SYNC.TABLE_NAME;
         --数据插入 字段
         V_INSERT_TEMPLATE_COL := 'INSERT INTO '|| V_SYNC.TABLE_NAME||'(';
         --数据插入  值
         V_INSERT_TEMPLATE_VAL := ' VALUES(';
         --如果是 是先清空表中的数据  使用truncate  提高效率
         IF V_SYNC.EXEC_TYPE=1 THEN
           V_DELETE_TEMPLATE :='TRUNCATE TABLE '||V_SYNC.TABLE_NAME;
         ELSE
           --删除模版
           V_DELETE_TEMPLATE     :='DELETE '||V_SYNC.TABLE_NAME||' WHERE 1=1 ';
         END IF;
         --XML中的数据行模版
         V_DATA_XMLTEMPLATE    :='SELECT ''<'||V_SYNC.TABLE_NAME||'DataRow ';
         --表格 列的数据类型模版
         V_DATA_TYPETEMPLATE   :='<'||V_SYNC.TABLE_NAME||'Coltype';
         --数据删除时  条件列模版
         V_DELETE_COND_TEMPLATE:='<'||V_SYNC.TABLE_NAME||'DelCol DeleteCol="'||V_SYNC.DELETE_CONDITION||'"></'||V_SYNC.TABLE_NAME||'DelCol>';
         FOR  I IN 1..V_COL_LIST.COUNT LOOP
           V_INSERT_TEMPLATE_COL :=V_INSERT_TEMPLATE_COL||V_COL_LIST(I)||',';
           V_INSERT_TEMPLATE_VAL :=V_INSERT_TEMPLATE_VAL||':'||V_COL_LIST(I)||',';
           V_DATA_TYPETEMPLATE   :=V_DATA_TYPETEMPLATE||' '||V_COL_LIST(I)||'="'||initcap(V_COL_TYPE_LIST(I))||'"';
           V_DATA_XMLTEMPLATE    :=V_DATA_XMLTEMPLATE ||' '||V_COL_LIST(I)||'="'||FUN_PUB_SYNC_TEMPLATE_FORMAT(V_COL_LIST(I),V_COL_TYPE_LIST(I))||'"';
          IF V_SYNC.EXEC_TYPE=0 AND INSTR(V_SYNC.DELETE_CONDITION,V_COL_LIST(I))>0 THEN
           V_DELETE_TEMPLATE     :=V_DELETE_TEMPLATE||' AND '||V_COL_LIST(I)||'=:'||V_COL_LIST(I);
          END IF;
         END LOOP;
         V_INSERT_TEMPLATE :=substr(V_INSERT_TEMPLATE_COL,0,length(V_INSERT_TEMPLATE_COL)-1)||') '||substr(V_INSERT_TEMPLATE_VAL,0,length(V_INSERT_TEMPLATE_VAL)-1)||')';
         V_DATA_TYPETEMPLATE :=V_DATA_TYPETEMPLATE||'></'||V_SYNC.TABLE_NAME||'Coltype>';
         V_DATA_XMLTEMPLATE  :=V_DATA_XMLTEMPLATE||'></'||V_SYNC.TABLE_NAME||'DataRow>'' FROM '||V_SYNC.TABLE_NAME||' WHERE 1=1';

      --生成查询条件模版
      --不区分机组
      IF V_SYNC.TIMETAG_TYPE=1 AND V_SYNC.COMPARE_DATE_COL IS NOT NULL then
        V_DATA_XMLTEMPLATE := V_DATA_XMLTEMPLATE||' AND '||V_SYNC.COMPARE_DATE_COL|| '>{BEGIN_DATE} AND '||V_SYNC.COMPARE_DATE_COL||'<={END_DATE}';
      --区分机组
      ELSIF V_SYNC.TIMETAG_TYPE=0 AND V_SYNC.COMPARE_DATE_COL IS NOT NULL THEN
       V_DATA_XMLTEMPLATE := V_DATA_XMLTEMPLATE||' AND '||V_SYNC.COMPARE_DATE_COL|| '>{BEGIN_DATE} AND '||V_SYNC.COMPARE_DATE_COL||'<={END_DATE} AND SET_CODE={SET_CODE} AND FACTORY_CODE={FACTORY_CODE}';
      ELSIF V_SYNC.TIMETAG_TYPE=0 THEN
       V_DATA_XMLTEMPLATE := V_DATA_XMLTEMPLATE||' AND SET_CODE={SET_CODE} AND FACTORY_CODE={FACTORY_CODE}';
      END IF;
      UPDATE TB_PUB_SYNC_DATA_CFG_NEW SET insert_template=V_INSERT_TEMPLATE,delete_template=V_DELETE_TEMPLATE,DATA_XMLTEMPLATE=V_DATA_XMLTEMPLATE,DELETE_COND_TEMPLATE=V_DELETE_TEMPLATE,DATA_TYPE_TEMPLATE=V_DATA_TYPETEMPLATE WHERE UPPER(TABLE_NAME)=V_SYNC.TABLE_NAME;
      END LOOP;
     
      --插入数据更新状态标签
       MERGE INTO TB_PUB_SYNC_TIMESEG_TAG T USING (select 'SYNCSTATUSTAG' TABLE_NAME FROM DUAL) T1 ON (T.TABLE_NAME=T1.TABLE_NAME)
       WHEN NOT MATCHED THEN
       INSERT VALUES('SYNCSTATUSTAG','0','0',SYSDATE,SYSDATE,SYSDATE); 
      
       COMMIT;
       EXCEPTION WHEN OTHERS THEN
       dbms_output.put_line(substr(sqlerrm,0,200));
       rollback;
    end P_pub_SyncData_InitTemplate;

     

    2)根据生成的模版去生成数据,代码如下:

    create or replace procedure P_PUB_JOB_DATA_SYNC_New
    /*
     过程名称:P_JOB_PUB_DATA_SYNC
     用途:数据库同步,生成数据脚本
     创建人:颜显斌
     创建日期:2013-07-03 16:41:00
    */
     is                                                       
      TYPE                       Mycursor_Type is ref cursor;                                     --游标类型变量
      CUR_SQL                    Mycursor_Type;                                                   --游标变量
      V_SQL                      VARCHAR2(2000);
      V_SYNC_TIME                date;
      V_FACTORY_CODE_LIST        dbms_sql.Varchar2_Table;
      V_SET_CODE_LIST            dbms_sql.Varchar2_Table;
      V_BEGIN_DATE               DATE;
      V_END_DATE                 DATE;
      V_TEMP_SQL                 VARCHAR2(2000);
      v_orderno                  integer:=0;                                        --记录行数
      v_count                    integer;
      v_xmltemp                  varchar2(2000);
      v_exectype                 varchar2(30);
      v_commsql                  varchar2(200);
      CURSOR C_DataSyncCfg IS
      SELECT upper(TABLE_NAME) TABLE_NAME,
               upper(DELETE_CONDITION) DELETE_CONDITION,
               upper(COMPARE_DATE_COL) COMPARE_DATE_COL ,
               insert_template,
               delete_template,
               data_xmltemplate,
               delete_cond_template,
               data_type_template,
               sync_type,
               IS_SYNC,
               EXEC_TYPE,
               SYNCTIME_TYPE,
               TIMETAG_TYPE,
               order_no
          FROM TB_PUB_SYNC_DATA_CFG_NEW where is_sync=1;

     begin
       --为了保证Xml的完整性,当表中还有数据的时候不再插入,等待后台服务读取完成再进行写入
       select count(0) into v_count from tb_pub_sync_data_sql;
       if v_count>0 then
         return;
       end if;
       --更新同步数据的状态标签,0:正在生成数据;1:数据生成完成后台可以调用(目的:保证XML文件的完整性)
       execute immediate 'update TB_PUB_SYNC_TIMESEG_TAG set factory_code=0,set_code=0,begin_time=sysdate,end_time=sysdate,update_time=sysdate where table_name=''SYNCSTATUSTAG''';
       --使用绑定变量  公用的插入数据SQL
       v_commsql :='INSERT INTO TB_PUB_SYNC_DATA_SQL (table_name, SQL_CONTENT, UDPATE_TIME,ORDER_NO)  VALUES (:x,:x,:x,:x)';
       --插入Xml头和根节点
       execute immediate v_commsql using '','<?xml version="1.0" encoding="utf-8" ?><XMLROOT>',sysdate,v_orderno;
       v_orderno :=v_orderno+1;
       --得到配置表中的信息
       for V_DataSyncCfg in C_DataSyncCfg
       LOOP
         --拼凑程序处理中需要的  执行类型
         if V_DataSyncCfg.EXEC_TYPE=1 then
           v_exectype := 'TruncateAndInsert';
         elsif V_DataSyncCfg.EXEC_TYPE=0 then
           v_exectype := 'DeleteAndInsert';
         elsif V_DataSyncCfg.EXEC_TYPE=2 then
           v_exectype := 'Insert';
         else
           v_exectype := 'DeleteAndInsert';
         end if;
         --插入表的模版信息
         v_xmltemp := '<'||V_DataSyncCfg.TABLE_NAME||'  InsertTemplate="'||V_DataSyncCfg.insert_template||'"  DeleteTemplate="'||V_DataSyncCfg.delete_template||'" OrdernNo="'||V_DataSyncCfg.order_no||'" ExecType="'||v_exectype||'">';
         execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,v_xmltemp,sysdate,v_orderno;
         v_orderno :=v_orderno+1;
         --插入删除条件模版
         v_xmltemp := '<'||V_DataSyncCfg.TABLE_NAME||'DelCol DeleteCol="'||V_DataSyncCfg.DELETE_CONDITION||'"></'||V_DataSyncCfg.TABLE_NAME||'DelCol>';
         execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,v_xmltemp,sysdate,v_orderno;
         v_orderno :=v_orderno+1;
        --插入数据类型模版
         v_xmltemp := V_DataSyncCfg.data_type_template;
         execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,v_xmltemp,sysdate,v_orderno;
         v_orderno :=v_orderno+1;
       --时间标签不区分机组
       -------------------------------BEGIN---------------------------------------------------------------------------------
       ----------------------不区分机组的时间标签开始----------------------------------------------------------------------
       --------------------------------------------------------------------------------------------------------------------
       IF V_DataSyncCfg.TIMETAG_TYPE=1 THEN
       BEGIN
       --获取上次更新结束时间
       BEGIN
        SELECT END_TIME INTO V_BEGIN_DATE FROM TB_PUB_SYNC_TIMESEG_TAG WHERE TABLE_NAME=V_DataSyncCfg.TABLE_NAME;
        EXCEPTION WHEN NO_DATA_FOUND THEN
        V_BEGIN_DATE := SYSDATE-365;
       END;
       --如果时间类型为系统时间
       IF V_DataSyncCfg.SYNCTIME_TYPE=0 THEN
        V_END_DATE := SYSDATE;
       --如果时间类型为数据时间
       ELSE
       EXECUTE IMMEDIATE 'SELECT MAX('||V_DataSyncCfg.COMPARE_DATE_COL||') FROM '||V_DataSyncCfg.TABLE_NAME  INTO V_END_DATE;
       END IF;

       IF V_END_DATE IS NULL OR V_END_DATE<=V_BEGIN_DATE THEN
         GOTO LABEL_NEXT_TABLE_SYNC;
       END IF;

       V_TEMP_SQL  :=V_DataSyncCfg.data_xmltemplate;
       V_TEMP_SQL  :=REPLACE(V_TEMP_SQL,'{BEGIN_DATE}','to_date('''||to_char(V_BEGIN_DATE,'yyyy-mm-dd hh24:mi:ss')||''',''yyyy-mm-dd hh24:mi:ss'')');
       V_TEMP_SQL  :=REPLACE(V_TEMP_SQL,'{END_DATE}','to_date('''||to_char(V_END_DATE,'yyyy-mm-dd hh24:mi:ss')||''',''yyyy-mm-dd hh24:mi:ss'')');
       --dbms_output.put_line(V_TEMP_SQL);
       ------------------------------BEGIN----------------------------------------------------------------------------------
       ----------------------开始执行动态游标----------------------------------------------------------------------
       --------------------------------------------------------------------------------------------------------------------
       --动态执行模版SQL
       OPEN CUR_SQL FOR V_TEMP_SQL;
       LOOP
       EXIT WHEN CUR_SQL%NOTFOUND;
       FETCH CUR_SQL  INTO V_SQL;
       IF V_SQL IS NOT NULL THEN
       --将结果写入表中
       V_SYNC_TIME := SYSDATE;
       execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,V_SQL,V_SYNC_TIME,v_orderno;
       v_orderno :=v_orderno+1;
       V_SQL := NULL;
       END IF;
       END LOOP;
       --------------------------------------------------------------------------------------------------------------------
       ----------------------结束执行动态游标----------------------------------------------------------------------
       ---------------------------END--------------------------------------------------------------------------------------
       MERGE INTO TB_PUB_SYNC_TIMESEG_TAG T USING (SELECT V_DataSyncCfg.TABLE_NAME TABLE_NAME FROM DUAL) T1
       ON (T.TABLE_NAME=T1.TABLE_NAME)
       WHEN MATCHED THEN
       UPDATE SET END_TIME=V_END_DATE,BEGIN_TIME=V_BEGIN_DATE,UPDATE_TIME=SYSDATE
       WHEN NOT MATCHED THEN
       INSERT VALUES(V_DataSyncCfg.TABLE_NAME,'','',V_BEGIN_DATE,V_END_DATE,SYSDATE);
       END;
       --------------------------------------------------------------------------------------------------------------------
       ----------------------不区分机组的时间标签结束----------------------------------------------------------
       ----------------------------END-------------------------------------------------------------------------------

     

       -------------------------------BEGIN-------------------------------------------------------------------------------
       ----------------------时间标签区分机组开始----------------------------------------------------------------------
       --------------------------------------------------------------------------------------------------------------------
       --时间标签区分机组,这种是出现表中没有UPDATE_TIME的情况下
        ELSE
        BEGIN
        SELECT FACTORY_CODE, SET_CODE BULK COLLECT INTO V_FACTORY_CODE_LIST, V_SET_CODE_LIST FROM V_PUB_SET WHERE IS_VIRTUAL<>1 ORDER BY FACTORY_CODE, SET_CODE desc;

       ----------------------------BEGIN-----------------------------------------------------------------------------------
       ----------------------循环机组列表开始----------------------------------------------------------------------
       --------------------------------------------------------------------------------------------------------------------
        --循环机组列表
        FOR I IN 1..V_SET_CODE_LIST.COUNT LOOP
        --获取开始时间
        BEGIN
        SELECT END_TIME INTO V_BEGIN_DATE FROM TB_PUB_SYNC_TIMESEG_TAG WHERE TABLE_NAME=V_DataSyncCfg.TABLE_NAME AND SET_CODE=V_SET_CODE_LIST(I) AND FACTORY_CODE=V_FACTORY_CODE_LIST(I);
        EXCEPTION WHEN NO_DATA_FOUND THEN
        V_BEGIN_DATE := SYSDATE-365;
        WHEN OTHERS THEN
        GOTO LABEL_NEXT_SET;
        END;
        --如果时间类型为系统时间
        IF V_DataSyncCfg.SYNCTIME_TYPE=0 THEN
        V_END_DATE := SYSDATE;
        --如果时间类型为数据时间
        ELSE
        BEGIN
        EXECUTE IMMEDIATE 'SELECT MAX('||V_DataSyncCfg.COMPARE_DATE_COL||')  FROM '||V_DataSyncCfg.TABLE_NAME||' WHERE SET_CODE='||V_SET_CODE_LIST(I)||'
        AND FACTORY_CODE='''||V_FACTORY_CODE_LIST(I)||''''  INTO V_END_DATE;
        EXCEPTION WHEN OTHERS THEN
        GOTO LABEL_NEXT_SET;
        END;
        END IF;

        IF V_END_DATE IS NULL OR V_END_DATE<=V_BEGIN_DATE  THEN
         GOTO LABEL_NEXT_SET;
        END IF;

        V_TEMP_SQL :=V_DataSyncCfg.data_xmltemplate;
        V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{BEGIN_DATE}','to_date('''||to_char(V_BEGIN_DATE,'yyyy-mm-dd hh24:mi:ss')||''',''yyyy-mm-dd hh24:mi:ss'')');
        V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{END_DATE}','to_date('''||to_char(V_END_DATE,'yyyy-mm-dd hh24:mi:ss')||''',''yyyy-mm-dd hh24:mi:ss'')');
        V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{FACTORY_CODE}',''''||V_FACTORY_CODE_LIST(I)||'''');
        V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{SET_CODE}',''''||V_SET_CODE_LIST(I)||'''');

        --dbms_output.put_line(V_TEMP_SQL);
       -------------------------------BEGIN--------------------------------------------------------------------------------
       ----------------------开始执行动态游标----------------------------------------------------------------------
       --------------------------------------------------------------------------------------------------------------------
        --开始执行动态游标
        OPEN CUR_SQL FOR V_TEMP_SQL;
        LOOP
        EXIT WHEN CUR_SQL%NOTFOUND;
        FETCH CUR_SQL  INTO V_SQL;
        IF V_SQL IS NOT NULL THEN
        --将结果写入表中
        V_SYNC_TIME := SYSDATE;
        execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,V_SQL,V_SYNC_TIME,v_orderno;
        v_orderno :=v_orderno+1;
        V_SQL := NULL;
        END IF;
        END LOOP;
       --------------------------------------------------------------------------------------------------------------------
       ----------------------结束执行动态游标----------------------------------------------------------------------
       -----------------------------END------------------------------------------------------------------------------------
        MERGE INTO TB_PUB_SYNC_TIMESEG_TAG T USING (SELECT V_DataSyncCfg.TABLE_NAME TABLE_NAME,V_FACTORY_CODE_LIST(I) FACTORY_CODE,V_SET_CODE_LIST(I) SET_CODE FROM DUAL) T1
        ON (T.TABLE_NAME=T1.TABLE_NAME AND T.FACTORY_CODE=T1.FACTORY_CODE AND T.SET_CODE=T1.SET_CODE)
        WHEN MATCHED THEN
        UPDATE SET END_TIME=V_END_DATE,BEGIN_TIME=V_BEGIN_DATE,UPDATE_TIME=SYSDATE
        WHEN NOT MATCHED THEN
        INSERT VALUES(V_DataSyncCfg.TABLE_NAME,V_FACTORY_CODE_LIST(I),V_SET_CODE_LIST(I),V_BEGIN_DATE,V_END_DATE,SYSDATE);
         --每个机组提交一次
        <<LABEL_NEXT_SET>>
        NULL;
        END LOOP;
       --------------------------------------------------------------------------------------------------------------------
       ----------------------循环机组列表结束----------------------------------------------------------------------
       -----------------------------END-----------------------------------------------------------------------------------

        END;
        END IF;
        --------------------------------------------------------------------------------------------------------------------
        ----------------------时间标签区分机组结束----------------------------------------------------------
        ------------------------------END-----------------------------------------------------------------------------------

        --插入表接点结束符
        execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,'</'||V_DataSyncCfg.TABLE_NAME||'>',sysdate,v_orderno;
        v_orderno:=v_orderno+1;
        --如果是手动同步,则同步一次即修改同步标识,标识为不同步
        IF V_DataSyncCfg.SYNC_TYPE=0 AND V_DataSyncCfg.IS_SYNC=1 THEN
        UPDATE TB_PUB_SYNC_DATA_CFG_NEW SET IS_SYNC=0 WHERE upper(TABLE_NAME)=upper(V_DataSyncCfg.TABLE_NAME);
        END IF;
        <<LABEL_NEXT_TABLE_SYNC>>
         null;
        END LOOP;
        --插入根节点结束符
        execute immediate v_commsql using '','</XMLROOT>',sysdate,v_orderno;
        --更新数据同步状态为‘1’已完成,这时后台服务可以调用
        execute immediate 'update TB_PUB_SYNC_TIMESEG_TAG set factory_code=1,set_code=1,begin_time=sysdate,end_time=sysdate,update_time=sysdate where table_name=''SYNCSTATUSTAG''';   
        COMMIT;
       EXCEPTION WHEN OTHERS THEN
       ROLLBACK;
     END P_PUB_JOB_DATA_SYNC_New;

     

    上面列出的就是数据库的两个核心存储过程,使用job定时去执行P_PUB_JOB_DATA_SYNC_New

    然后使用查询是,用tb_pub_sync_data_sql表中的order_no字段 排序,这样能保证XML的完整性

    3)使用后台服务定时读取tb_pub_sync_data_sql中的数据去生成文件,生成的文件格式如下:

    <?xml version="1.0" encoding="utf-8" ?>

    <XMLROOT>

    --该节点保存表的插入和删除模版以及更新方式

    <表名  InsertTemplate=""  DelteTemplate=""  ExecType=""  Order_No="">

    --该节点保存,删除时需要用那些字段来做条件,多个字段用逗号隔开,该节点每张表只会写一次

    <表名DelCol  DeleteCol="col1,col2,col3"></表名DelCol DelCol>

    --该节点保存表中各字段的数据类型,用于程序进行数据转换,该节点每张表只会写一次

    <表名ColType col1="Varchar2" col2="Number" col3="Varchar2"></表名ColType>

    --该节点用于保存数据,一条数据对应一个节点

    <表名DataRow col1="12" col2="23" col3="45"></表名DataRow>

    </表名>

    </XMLROOT>

    3)将文件压缩打包,然后通过传输服务将文件传送到Ⅲ区服务器指定的目录

    4)Ⅲ区服务器上的数据同步服务,读取指定目录下的XML文件进行解析,转换成目标格式,下面贴出主要的代码

    代码如下:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using DataSyncSvr.DataEntity;
    using Oracle.DataAccess.Client;
    using DataSyncSvr.DataParse;
    using Publish.Log;
    using System.IO;
    using System.Data;

    namespace DataSyncSvr
    {
        public class DataSyncMain
        {
            public ILogger mlog;
            private OracleConnection conn;
            public void DataSyncStart()
            {
                ParseXmlToDataEntity ParseEntity = new ParseXmlToDataEntity();
                Dictionary<string, Dictionary<string, DataSyncDataEntity>> FileList = ParseEntity.GetDataEnetityList();
                Dictionary<string, DataSyncDataEntity> EntityDataList;
                conn = new OracleConnection(SyncConst.Connstr);
                if (conn.State == ConnectionState.Closed)
                {
                    conn.Open();
                }
                DataSyncDataEntity EntityData;
                if (FileList == null || FileList.Keys.Count < 1) return;
                foreach(string filekey in FileList.Keys)
                {
                    EntityDataList=FileList[filekey];
                    foreach (string key in EntityDataList.Keys)
                    {
                        try
                        {
                            EntityData = EntityDataList[key];
                            mlog.LogInfo("开始同步表:\"" + key + "\"");
                            if (EntityData.ExecuteType == ExecType.DeleteAndInsert || EntityData.ExecuteType == ExecType.TruncateAndInsert)
                            {
                                //先执行删除操作
                                AddExecParamAndRun(EntityData, ExecType.Delete);
                                //再执行插入操作
                                AddExecParamAndRun(EntityData, ExecType.Insert);
                            }
                            else
                            {
                                AddExecParamAndRun(EntityData, ExecType.Insert);
                            }
                            mlog.LogInfo("\"" + key + "\"表数据同步完成,受影响行数:(" + EntityData.RecordCount + ")行");
                          
                        }
                        catch (Exception ex)
                        {
                            mlog.LogInfo(ex.ToString());
                            break;
                        }
                    }
                    //删除文件
                    if (File.Exists(filekey))
                    {
                        File.Delete(filekey);
                        mlog.LogInfo("文件:\"" + filekey + "\"已删除");
                    }
                }
                conn.Close();
            }
            private void AddExecParamAndRun(DataSyncDataEntity EntityData, ExecType excutetype)
            {
                OracleParameter Param;
                string[] columns = EntityData.InsertCol;
                string ExecTemplate = EntityData.InsertTemplate;
                if (excutetype.Equals(ExecType.Delete))
                {
                    columns = EntityData.DeleteCol;
                    ExecTemplate = EntityData.DeleteTemplate;
                }
                OracleCommand command = new OracleCommand();
                command.Connection = conn;
                command.CommandText = ExecTemplate;
                command.ArrayBindCount = EntityData.TableData[EntityData.InsertCol[0]].Length;
                command.CommandTimeout = 600;
                if (columns != null && columns.Length > 0)
                {
                    foreach (string colname in columns)
                    {
                        if (!string.IsNullOrEmpty(colname))
                        {
                            Param = new OracleParameter(colname, EntityData.DataType[colname]);
                            Param.Direction = ParameterDirection.Input;
                            Param.Value = EntityData.TableData[colname];
                            command.Parameters.Add(Param);
                        }
                    }
                }
                command.ExecuteNonQuery();
            }
        }
    }

    注意:使用了Oracle.DataAccess.dll动态库,这个可以去官网下载。

    写了这么久也累了,下面就贴上我上传的源码地址吧,http://download.csdn.net/detail/yanxianbin1989/5970217  欢迎各位高手前来交流,相互学习,共同进步。

    献丑了,嘿嘿!

    QQ:936052556;Email:yanxianbin1989@gmail.com或者QQ邮箱也行

    展开全文
  • mysql100万级别的数据批量插入

    千次阅读 2016-06-04 17:05:43
    // 清空上一次添加的数据 suffix = new StringBuffer(); } // 头等连接 pst.close(); conn.close(); } catch (SQLException e) { e.printStackTrace(); } // 结束时间 Long end = new...
    public static void insert() {
    
    // 开时时间
    Long begin = new Date().getTime();
    // sql前缀
    String prefix = "INSERT INTO tb_big_data (count, create_time, random) VALUES ";
    try {
    // 保存sql后缀
    StringBuffer suffix = new StringBuffer();
    // 设置事务为非自动提交
    conn.setAutoCommit(false);
    // Statement st = conn.createStatement();
    // 比起st,pst会更好些
    PreparedStatement pst = conn.prepareStatement("");
    // 外层循环,总提交事务次数
    for (int i = 1; i <= 100; i++) {
    // 第次提交步长
    for (int j = 1; j <= 10000; j++) {
    // 构建sql后缀
    suffix.append("(" + j * i + ", SYSDATE(), " + i * j
    * Math.random() + "),");
    }
    // 构建完整sql
    String sql = prefix + suffix.substring(0, suffix.length() - 1);
    // 添加执行sql
    pst.addBatch(sql);
    // 执行操作
    pst.executeBatch();
    // 提交事务
    conn.commit();
    // 清空上一次添加的数据
    suffix = new StringBuffer();
    }
    // 头等连接
    pst.close();
    conn.close();
    } catch (SQLException e) {
    e.printStackTrace();
    }
    // 结束时间
    Long end = new Date().getTime();
    // 耗时
    System.out.println("cast : " + (end - begin) / 1000 + " ms");

    }


    参考:http://blog.csdn.net/frinder/article/details/38830723

    展开全文
  • 1、用easypoi解析Excel时解析过程已经封装好,我们得到的直接是解析完成的list数据集合,但是当数据量较大,据我测试达到10000时就出现了内存溢出 java heap space。所以数据量过大时,用自己定义的poi进行边解析...

    梗概

    1、用easypoi解析Excel时解析过程已经封装好,我们得到的直接是解析完成的list数据集合,但是当数据量较大,据我测试达到10000条时就出现了内存溢出 java heap space。所以数据量过大时,用自己定义的poi进行边解析,边插入数据库。并且多线程批量插入数据库。这个过程参考该篇文章,帮助很大:
    Springboot+poi上传并处理百万级数据EXCEL
    2、上传到数据库的过程中,前端需要实时显示进度条。看了大部分博客都是对上传文件的监听,获取进度。但是我要做的是对插入数据库的数据进行监听。

    操作步骤

    1、前端页面

    <!DOCTYPE html>
    <html lang="en"
          xmlns:th="http://www.thymeleaf.org">
    <head>
        <script th:src="@{/js/jquery.min.js}"></script>
    </head>
        <div class="modal-header">
            <button type="button" class="close" data-dismiss="modal"
                    aria-hidden="true" >&times;</button>
            <h4 class="modal-title" id="myModalLabel">导入文件1</h4>
        </div>
    
        <div class="modal-body row">
            <button type="button" class="close" data-dismiss="modal"><span aria-hidden="true">&times;</span><span class="sr-only">Close</span></button>
            <form>
                <div class="fallback">
                    <input name="file" type="file" id="file_id" />
                </div>
                <input type="button"  value="上传" class="btn_save" onclick="toProgress()"/>
                <input type="button"  value="取消" class="btn_save" onclick="toCancel()"/>
            </form>
        </div>
    
    <div class="modal-footer" id="myModal_add_progressBar_Module">
        <label class='col-sm-2 control-label'>上传进度:</label>
        <div class='col-sm-10'>
            <div class="progress">
                <div class="progress-bar" id="myModal_add_progressBar" role="progressbar" aria-valuemin="0"
                     aria-valuemax="100" aria-valuenow="0" style="width:0;">
                </div>
            </div>
            <span id="percentage"></span><span id="time"></span>
        </div>
        <div id="son"></div>
    </div>
    </html>
    <script type="text/javascript" src="/tpl/user/user_import.js"></script>
    

    页面图片
    在这里插入图片描述2、js方法

    var timer_is_on = 1;
    var timer ;
    function toCancel() {
        timer_is_on = 0;
        clearInterval(timer);
    }
    function toProgress() {
        var formData = new FormData();
        var file = $("#file_id");
        formData.append('file', file[0].files[0]);
        // 写入数据
        $.ajax({
            url: "/user/user_toImport",
            type: 'POST',
            Accept: 'text/html;charset=UTF-8',
            cache: false,
            data: formData,
            processData: false,
            contentType: false,
            success: function (msg) {
    
            },
        });
    
        /*** 进度条的显示  */
        window.setTimeout(function () {
            if (timer_is_on == 1){
                timer = window.setInterval(function () {
                    $.ajax({
                        type: 'post',
                        dataType: 'json',
                        url: "/user/user_toProress",
                        success: function (data) {
                            $("#myModal_add_progressBar").css("width", data.percent + "%").text(data.percent + "%");
                            if (data.percent == "100") {
                                window.clearInterval(timer);
                            }
                        },
                    });
                }, 800);
            }
        }, 800);
    }
    

    用两个ajax发送两个请求,第一个ajax发送请求进行解析数据保存到数据库,第二个ajax用一个定时器,不断发送请求给后端进行获取进度
    3、Controller

        @RequestMapping(value = "/user/user_toImport")
        @ResponseBody
        public ReturnResult toUploadExcel(
                @RequestParam(value = "file" )MultipartFile multipartFile
        ) {
            try {
                userService.addBlackLists(multipartFile.getInputStream() );
            } catch (Exception e) {
                e.printStackTrace();
            }
            return ReturnResult.success();
        }
        Map<String , Object> resultMap = new HashMap<>();
        @RequestMapping(value = "/user/user_toProress")
        @ResponseBody
        public  Map<String , Object> progress() throws IOException {
            int percent = userService.getPercent();
            resultMap.put("percent" , percent);
            return  resultMap;
        }
    
    

    4、ServiceImpl

    @Service
    public class UserServiceImpl implements UserService {
        @Autowired
        private UserMapper userMapper;
        int addingCount = 0;
        int percent = 0;
        
        @Override
        public int addBlackLists(InputStream is) throws ExecutionException, InterruptedException {
    
            ExecutorService executorService = Executors.newCachedThreadPool();
            ArrayList<Future<Integer>> resultList = new ArrayList<>();
    
            XxlsAbstract xxlsAbstract = new XxlsAbstract() {
      
                 /**
                 step:
                 1、查找sheet:processOneSheet
                 2、处理每一行:optRows
                 3、进入XxlsAbstract后,parse()方法内部不断调用optRows()方法,所以不          断地解析每一行,
                 4、每解析一行,willSaveAmount就会加1。直到加到1001(包含了表头行),将list集合存到数据库
                 5.
                 */
                @Override
                public void optRows(int sheetIndex, int curRow, List<String> rowlist) throws SQLException {
                    /**
                     * 边解析边插入数据库 否则会出现oom java heap space
                     * 判断即将插入的数据是否已经到达1000,如果到达1000,
                     * 进行数据插入
                     */
                    if (this.willSaveAmount == 1001) {
                        List<Map<String, Object>> list = new LinkedList<>(this.dataList.subList(1, 1001));
                        Callable<Integer> callable = () -> {
                            int count = userMapper.addBlackLists(list);
                           //将保存到数据库的数量返回给Controller(getPercent方法),Controller再返回给前端,产生进度
                            addingCount += count;
                            double dPercent = (double) addingCount / 100000;   //将计算出来的数转换成double 
                            percent = (int) (dPercent * 100);               //再乘上100取整
                            return count;
                        };
                        this.willSaveAmount = 0;
                        this.dataList = new LinkedList<>();
                        Future<Integer> future = executorService.submit(callable);
                    }
                    //没有映射到类里面,是因为操作excel数据时边解析边插入数据库,如果要映射要在获取cell时进行映射
                    //汇总数据
                    Map<String, Object> map = new HashMap<>();//LinkedHashMap在遍历的时候会比HashMap慢
                    map.put("user_name", rowlist.get(0));
                    map.put("gender", rowlist.get(1));
                    map.put("account", rowlist.get(2));
                    map.put("password", rowlist.get(3));
                    map.put("address1", rowlist.get(4));
                    map.put("address2", rowlist.get(5));
                    map.put("address3", rowlist.get(6));
                    map.put("address4", rowlist.get(7));
                    map.put("address5", rowlist.get(8));
                    map.put("address6", rowlist.get(9));
                    map.put("address7", rowlist.get(10));
                    map.put("address8", rowlist.get(11));
                    map.put("address9", rowlist.get(12));
                    map.put("address10", rowlist.get(13));
                    map.put("address11", rowlist.get(14));
                    map.put("address12", rowlist.get(15));
                    map.put("address13", rowlist.get(16));
                    map.put("address14", rowlist.get(17));
                    map.put("address15", rowlist.get(18));
                    map.put("address16", rowlist.get(19));
                    map.put("telephone1", rowlist.get(20));
                    map.put("telephone2", rowlist.get(21));
                    map.put("telephone3", rowlist.get(22));
                    map.put("telephone4", rowlist.get(23));
                    map.put("telephone5", rowlist.get(24));
                    map.put("telephone6", rowlist.get(25));
                    map.put("telephone7", rowlist.get(26));
                    map.put("telephone8", rowlist.get(27));
                    map.put("telephone9", rowlist.get(28));
                    map.put("telephone10", rowlist.get(29));
                    map.put("telephone11", rowlist.get(30));
                    map.put("telephone12", rowlist.get(31));
                    map.put("telephone13", rowlist.get(32));
                    map.put("telephone14", rowlist.get(33));
                    map.put("telephone15", rowlist.get(34));
                    map.put("telephone16", rowlist.get(35));
                    map.put("email", rowlist.get(36));
                    this.dataList.add(map);
                    this.willSaveAmount++;
                    this.totalSavedAmount++;
                }
            };
            try {
                xxlsAbstract.processOneSheet(is, 1);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            //针对没有存入的数据进行处理
            if (xxlsAbstract.willSaveAmount != 0) {
                List<Map<String, Object>> list = new LinkedList<>(xxlsAbstract.dataList);
                Callable<Integer> callable = () -> {
                    int count = userMapper.addBlackLists(list);
                    //导出的进度条信息
                    addingCount += count;
                    double dPercent = (double) addingCount / 100000;   //将计算出来的数转换成double 
                    percent = (int) (dPercent * 100);               //再乘上100取整
                    return count;
                };
                Future<Integer> future = executorService.submit(callable);
            }
            executorService.shutdown();
    
            return 0;
            //
        }
    
        @Override
        public int getPercent() {
            return percent;
        }
    
    }
    

    为了减小数据库的io,提高性能。运用1000条批量插入的方式,同时运用Jave的Future模式,多线程操作数据

    5、这里用到的解析Excel的方法

    /**
     * @Author wpzhang
     * @Date 2019/12/17
     * @Description
     */
    package com.yaspeed.core;
    
    import org.apache.poi.openxml4j.opc.OPCPackage;
    import org.apache.poi.ss.formula.functions.T;
    import org.apache.poi.xssf.eventusermodel.XSSFReader;
    import org.apache.poi.xssf.model.SharedStringsTable;
    import org.apache.poi.xssf.usermodel.XSSFRichTextString;
    import org.xml.sax.Attributes;
    import org.xml.sax.InputSource;
    import org.xml.sax.SAXException;
    import org.xml.sax.XMLReader;
    import org.xml.sax.helpers.DefaultHandler;
    import org.xml.sax.helpers.XMLReaderFactory;
    
    import java.io.InputStream;
    import java.sql.SQLException;
    import java.util.*;
    
    /**
     * XSSF and SAX (Event API)
     */
    public abstract class XxlsAbstract extends DefaultHandler {
        private SharedStringsTable sst;
        private String lastContents;
        private boolean nextIsString;
        private int sheetIndex = -1;
        private List<String> rowlist = new ArrayList<>();
    //    public List<Map<String , Object>> dataList = new LinkedList<>();  //即将进行批量插入的数据
        public List<Map<String , Object>> dataList = new LinkedList<>();  //即将进行批量插入的数据
        public int willSaveAmount;  //将要插入的数据量
    
        public int totalSavedAmount; //总共插入了多少数据
        private int curRow = 0; //当前行
        private int curCol = 0; //当前列索引
        private int preCol = 0; //上一列列索引
        private int titleRow = 0; //标题行,一般情况下为0
        private int rowsize = 0; //列数
    
        //excel记录行操作方法,以sheet索引,行索引和行元素列表为参数,对sheet的一行元素进行操作,元素为String类型
        public abstract void optRows(int sheetIndex, int curRow, List<String> rowlist) throws SQLException;
    //只遍历一个sheet,其中sheetId为要遍历的sheet索引,从1开始,1-3
    
        /**
         * @paramis
         * @param sheetId  sheetId为要遍历的sheet索引,从1开始,1-3
         * @throws Exception
         */
        public void processOneSheet(InputStream is, int sheetId) throws Exception {
            OPCPackage pkg = OPCPackage.open(is);
            XSSFReader r = new XSSFReader(pkg);
            SharedStringsTable sst = r.getSharedStringsTable();
            XMLReader parser = fetchSheetParser(sst);
    // rId2 found by processing the Workbook
    // 根据 rId# 或 rSheet# 查找sheet
            InputStream sheet2 = r.getSheet("rId" + sheetId);
            sheetIndex++;
            InputSource sheetSource = new InputSource(sheet2);
            parser.parse(sheetSource);// 调用endElement
            sheet2.close();
        }
    
    
        public XMLReader fetchSheetParser(SharedStringsTable sst)
                throws SAXException {
            XMLReader parser = XMLReaderFactory.createXMLReader();
    //.createXMLReader("org.apache.xerces.parsers.SAXParser");
            this.sst = sst;
            parser.setContentHandler(this);
            return parser;
        }
    
        public void startElement(String uri, String localName, String name,
                                 Attributes attributes) throws SAXException {
    // c => 单元格
            if (name.equals("c")) {
    // 如果下一个元素是 SST 的索引,则将nextIsString标记为true
                String cellType = attributes.getValue("t");
                String rowStr = attributes.getValue("r");
                curCol = this.getRowIndex(rowStr);
                if (cellType != null && cellType.equals("s")) {
                    nextIsString = true;
                } else {
                    nextIsString = false;
                }
            }
    // 置空
            lastContents = "";
        }
    
    
        public void endElement(String uri, String localName, String name)
                throws SAXException {
            //todo 3 调用optRows()
    // 根据SST的索引值的到单元格的真正要存储的字符串
    // 这时characters()方法可能会被调用多次
            if (nextIsString) {
                try {
                    int idx = Integer.parseInt(lastContents);
                    lastContents = new XSSFRichTextString(sst.getEntryAt(idx))
                            .toString();
                } catch (Exception e) {
                }
            }
    // v => 单元格的值,如果单元格是字符串则v标签的值为该字符串在SST中的索引
    // 将单元格内容加入rowlist中,在这之前先去掉字符串前后的空白符
            if (name.equals("v")) {
                String value = lastContents.trim();
                value = value.equals("") ? " " : value;
                int cols = curCol - preCol;
                if (cols > 1) {
                    for (int i = 0; i < cols - 1; i++) {
                        rowlist.add(preCol, "");
                    }
                }
                preCol = curCol;
                rowlist.add(curCol - 1, value);//这里对rowlist进行了初始化
            } else {
    //如果标签名称为 row ,这说明已到行尾,调用 optRows() 方法
                if (name.equals("row")) {
                    int tmpCols = rowlist.size();
                    if (curRow > this.titleRow && tmpCols < this.rowsize) {
                        for (int i = 0; i < this.rowsize - tmpCols; i++) {
                            rowlist.add(rowlist.size(), "");
                        }
                    }
                    try {
                        optRows(sheetIndex, curRow, rowlist);
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                    if (curRow == this.titleRow) {
                        this.rowsize = rowlist.size();
                    }
                    rowlist.clear();
                    curRow++;
                    curCol = 0;
                    preCol = 0;
                }
            }
        }
    
        public void characters(char[] ch, int start, int length)
                throws SAXException {
    //得到单元格内容的值
            lastContents += new String(ch, start, length);
        }
    
        //得到列索引,每一列c元素的r属性构成为字母加数字的形式,字母组合为列索引,数字组合为行索引,
    //如AB45,表示为第(A-A+1)*26+(B-A+1)*26列,45行
        public int getRowIndex(String rowStr) {
            rowStr = rowStr.replaceAll("[^A-Z]", "");
            byte[] rowAbc = rowStr.getBytes();
            int len = rowAbc.length;
            float num = 0;
            for (int i = 0; i < len; i++) {
                num += (rowAbc[i] - 'A' + 1) * Math.pow(26, len - i - 1);
            }
            return (int) num;
        }
    
        public int getTitleRow() {
            return titleRow;
        }
    
        public void setTitleRow(int titleRow) {
            this.titleRow = titleRow;
        }
    }
    

    6、顺便记录下将list<Map<String , Object>>插入到数据库的mapper.xml写法

     <insert id="addBlackLists" parameterType="java.util.List">
              INSERT INTO user_import
              (
              user_name , gender , account , password ,
              address1 , address2 , address3 , address4 , address5 , address6 , address7 , address8 , address9 , address10 , address11 , address12 , address13 , address14 , address15 , address16 ,
              telephone1 , telephone2 , telephone3 , telephone4 , telephone5 , telephone6 , telephone7 , telephone8 , telephone9 , telephone10 , telephone11 , telephone12 , telephone13 , telephone14 , telephone15 , telephone16 ,
              email
              )
              VALUES
            <foreach collection="list" item="item" index="index" separator=",">
              (
              #{item.user_name , jdbcType=VARCHAR} , #{item.gender, jdbcType=VARCHAR} , #{item.account , jdbcType=VARCHAR} , #{item.password, jdbcType=VARCHAR} ,
              #{item.address1, jdbcType=VARCHAR} , #{item.address2, jdbcType=VARCHAR} , #{item.address3, jdbcType=VARCHAR} , #{item.address4, jdbcType=VARCHAR} , #{item.address5, jdbcType=VARCHAR} , #{item.address6, jdbcType=VARCHAR} , #{item.address7, jdbcType=VARCHAR} , #{item.address8, jdbcType=VARCHAR} , #{item.address9, jdbcType=VARCHAR} , #{item.address10, jdbcType=VARCHAR} , #{item.address11, jdbcType=VARCHAR} , #{item.address12, jdbcType=VARCHAR} , #{item.address13, jdbcType=VARCHAR} , #{item.address14, jdbcType=VARCHAR} , #{item.address15, jdbcType=VARCHAR} , #{item.address16, jdbcType=VARCHAR},
              #{item.telephone1,jdbcType=VARCHAR} , #{item.telephone2,jdbcType=VARCHAR} , #{item.telephone3,jdbcType=VARCHAR} , #{item.telephone4,jdbcType=VARCHAR} , #{item.telephone5,jdbcType=VARCHAR} , #{item.telephone6,jdbcType=VARCHAR} , #{item.telephone7,jdbcType=VARCHAR} , #{item.telephone8,jdbcType=VARCHAR} , #{item.telephone9,jdbcType=VARCHAR} , #{item.telephone10,jdbcType=VARCHAR} , #{item.telephone11,jdbcType=VARCHAR} , #{item.telephone12,jdbcType=VARCHAR} , #{item.telephone13,jdbcType=VARCHAR} , #{item.telephone14,jdbcType=VARCHAR} , #{item.telephone15,jdbcType=VARCHAR} , #{item.telephone16,jdbcType=VARCHAR} ,
              #{item.email ,jdbcType=VARCHAR}
              )
          </foreach>
        </insert>
    

    在插入到数据库的同时前端一直不断在发请求,Controller中一直在执行progress()方法,通过调用getPercent()不断获取serviceImpl中的percent(进度),将进度塞到map集合里传给前端
    效果图
    在这里插入图片描述经测试,插入100000条数据到数据库2分钟不到,由于电脑太垃圾,正常电脑应该在1分半钟左右

    展开全文
  • 向Mysql批量插入50万条数据

    千次阅读 2019-11-03 22:40:54
    》批量插入数据脚本 表Sql: CREATE TABLE dept( id INT UNSIGNED PRIMARY KEY AUTO_INCREMENT, deptno MEDIUMINT UNSIGNED NOT NULL DEFAULT 0, dname VARCHAR(20) NOT NULL DEFAULT '', loc VARCHAR(13) NOT...

    》批量插入数据脚本

    表Sql:

    CREATE TABLE dept(

    id INT UNSIGNED PRIMARY KEY AUTO_INCREMENT,

    deptno MEDIUMINT UNSIGNED NOT NULL DEFAULT 0,

    dname VARCHAR(20) NOT NULL DEFAULT '',

    loc VARCHAR(13) NOT NULL DEFAULT ''

    )ENGINE=INNODB DEFAULT CHARSET=GBK;

     

     

    CREATE TABLE emp

    (id INT UNSIGNED PRIMARY KEY AUTO_INCREMENT,

    empno MEDIUMINT UNSIGNED NOT NULL DEFAULT 0,

    ename VARCHAR(20) NOT NULL DEFAULT '',

    job VARCHAR(9) NOT NULL DEFAULT '',

    mgr MEDIUMINT UNSIGNED NOT NULL DEFAULT 0,

    hiredate DATE NOT NULL,

    sal DECIMAL(7,2) NOT NULL,

    comm DECIMAL(7,2) NOT NULL,

    deptno MEDIUMINT UNSIGNED NOT NULL DEFAULT 0

    )ENGINE=INNODB DEFAULT CHARSET=GBK;

     

    随机产生字符串:

    DELIMITER $$

    CREATE FUNCTION rand_string(n INT) RETURNS VARCHAR(255)

    BEGIN

    DECLARE chars_str VARCHAR(100) DEFAULT 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';

    DECLARE return_str VARCHAR(255) DEFAULT '';

    DECLARE i INT DEFAULT 0;

    WHILE i<n DO

    SET return_str=CONCAT(return_str,SUBSTRING(chars_str,FLOOR(1+RAND()*52),1));

    SET i=i+1;

    END WHILE;

    RETURN return_str;

    END $$

     

    随机产生部门编号:

    DELIMITER $$

    CREATE FUNCTION rand_num()

    RETURNS INT(50)

    BEGIN

    DECLARE i INT DEFAULT 0;

    SET i= FLOOR(100+RAND()*10);

    RETURN i;

    END $$

     

     

    (假如要删除:drop function rand_num;)

     

    创建存储过程:

     创建往emp 表中插入数据的存储过程:

    DELIMITER $$

    CREATE PROCEDURE insert_emp(IN START INT(10),IN max_num INT(10))

    BEGIN

    DECLARE i INT DEFAULT 0;

    SET autocommit =0;

    REPEAT

    SET i = i+1;

    INSERT INTO emp(empno,ename,job,mgr,hiredate,sal,comm,deptno) VALUES((START+i),rand_string(6),'SALESMAN',0001,CURDATE(),2000,400,rand_num());

    UNTIL i=max_num

    END REPEAT;

    COMMIT;

    END$$

     

     

    往dept表添加随机数据:

    DELIMITER $$

    CREATE PROCEDURE insert_dept(IN START INT(10),IN max_num INT(10))

    BEGIN

    DECLARE i INT DEFAULT 0;

    REPEAT

    SET i = i+1;

    INSERT INTO dept(deptno,dname,loc) VALUES ((START+i),rand_string(10),rand_string(8));

    UNTIL i = max_num

    END REPEAT;

    COMMIT;

    END $$

     

     

    (如果下删除:DROP PROCEDURE insert_dept;)

     

     

    调用存储过程:

    (先把;分号为介绍符切换回来:DELIMITER ;  )

    插入dept:

      CALL insert_dept(100,10);

     

    插入emp数据添加500000条数据;

    CALL insert_emp(100001,500000);

    查阅:

    SELECT COUNT(*) FROM emp;

    展开全文
  • 1.出现高并发现象。 2.php脚本有个默认的执行超时时间 默认为30秒,所以对批量数据处理的时候会停止, 解决方法:可以再第一行代码中添加set_time_limit(0);(永不超时)...
  • 先提出一个简单需求:如下 ...后面此simple会继续更新至GitHub,以便作为一个完整最终包含了最初学习的HDFS到离线数据处理到最后的实时流式数据处理的Spark 。 很高兴 这是挑战自己的高度再次成功的案例,继续
  • 基本思路如下 poi 基于xml解析(event user model ) 多线程批量插入 软件环境 Springboot2.0 + MyBatis Mysql 5.7 + poi 基于xml解析(event user model ) package ......
  • MySQL数据库到底能存储多少数据呢?很多小伙伴可能会觉得MySQL数据库存储的数据量不大,不能支撑起大数据量的存储与读写性能。实则不然,只要架构设计得当,MySQL其实完全可以存储海量数据,并且在海量数据的存储下...
  • 1 Excel上传 针对Excel的上传,采用的是比较常规的方法,其实和文件上传是相同的。具体源码如下: @PostMapping(value = "", consumes = "multipart/*", headers = "content-type=...
  • Java上传和下载Excel数据

    千次阅读 2018-05-10 17:47:26
    map.put("message","上传失败,上传数据必须大于一"); return map; } for (Map, String> dataMap : listData) { System.out.println(keys[0] + ":" + dataMap.get(keys[0])); System.out....
  • 资源是.sql文件压缩后上传(文件太大,解压后470多M)。用navicat导入实测用时5分钟,数据量300W。快速获取百万级真实测试数据
  • 阅读文本大概需要3分钟。来源:https://www.cnblogs.com/barrywxx/p/10700221.html最近业务方有一个需求,需要一次导入超过100万数据到系统数据...
  • 我在反病毒和反恶意软件行业的不同公司工作了12年,从而我知道由于我们每天处理大量的数据,这些系统是多么复杂。有趣的是,在过去的大约9年间,我参与的所有的web后端的开发通常是通过Ruby on Ra
  • java百万条数据导出到excel表格

    千次阅读 2019-03-25 12:34:38
    easyexcel重写了poi对07版Excel的解析,能够原本一个3M的excel用POI sax依然需要100M左右内存降低到几M,并且再大的excel不会出现内存溢出,03版依赖POI的sax模式。在上层做了模型转换的封装,让使用者更加简单方便....
  • 由于项目需要对大量Excel数据进行输入输出处理,在使用JXL,POI后发现很容易出现OOM,最后在网上找到阿里的开源...经过大量的调试优化,现通过JAVA生成104行20列的数据并写入到Excel文件的Sheet中只需要70秒的时间。
  • Apache Flink 在快手亿级数据的应用实践总结

    千次阅读 多人点赞 2019-07-17 15:07:35
    优化后的效果:P99 查询时延性能提升 10 倍,即 nextKey 获取 RocksDB 一条数据, P99 时延由 1000 毫秒到 100 毫秒以内。 作业吞吐反压问题进而得到解决。 1.2.5 RocksDB 磁盘压力问题 Flink 作业上线遇到的...
  • 除了有2个零星的点,价格在100左右,销量也在100万上下。 需要说明的是,销量随着价格锐减,这是很正常的,京东淘宝做个点图估计也差不多。 再细化一点可以吗 可以的。食品是销售额第一的品类,衣食住行,不意外,...
  • 网络摄像头100万.200万.300万.400万.500万分辨率多少?传输带宽分别是多少?存储空间是多少? 500W≈2560*1920 300W≈2048*1536 200W≈1920*1080 200W≈1600*1200 130W≈1280*960 100W≈1280*720 格式 ...
  • 本文代码实现基本按照《数据结构》课本目录顺序,外加大量的复杂算法实现,一篇文章足够。能换你一个收藏了吧?
  • PHPEXCEL 20万数据导入导出(一)

    千次阅读 2019-07-31 09:53:28
    本片博客记录了一次实际开发中的需要使用PHPExcel导入导出大量数据(20万)的解决过程。 复盘优化自己的项目,好处在于,一旦已...在本项目开发中,需要导入20万人员的信息,导出100万条用户信息。 通过查阅网上资...
  • 先上导出代码  ...列名,数据> /// <param name=filepath></param> /// <returns></returns> public bool NewExport(List<DictionaryEntry> list, string filepath) { bool bSuccess = true;
  • asp net做数据导入的时候遇到超时情况和查询速度慢的情况 在此提供的经验是刚刚测试成功的大项目 兴奋之余把经验方法写成文档教程在此上传以供分享 经验 技术 知识 岂能是虚拟资源分可比的 文档内容不多 就两页 主要...
  • 数据结构面试100

    万次阅读 多人点赞 2017-04-08 22:22:32
    引言  无私分享造就开源的辉煌。... 一年之前的10月14日,一个名叫July (头像为手冢国光)的人在一个叫csdn的论坛上开帖分享微软等公司数据结构+算法面试100题,自此,与上千网友一起做,一起思考
  • 一,演示和需求的SDK和ak等配置最近我们一个项目需要自己android端上传数据到百度LBS服务,然后获取数据并显示。搞了2天完成了。其实很简单的哦!我们一步步来,今天的活我一小时搞定,一天写博客,真的很激动解决了...
  • 300万数据的导入导出

    千次阅读 2017-03-20 17:09:03
    进入了一个公司实习,要求自己写一个基于spring boot,用JPA实现的增删改查的小项目,前台用bootstrap,期间叫...10万数据量对于彼时的我也已经是不小的数据了,从来没有操作这么大的数据。当然期间用POI操作Excel。
  • 用PHP一次写入百万测试数据

    千次阅读 2018-03-12 17:13:15
    亲测有效,本人网上各种搜资料摸索出来滴,执行效率甚至比通过navicat还要高。navicat见我另一篇博客。...1、PHP代码//添加测试数据 private function add_test_data($max=1000000){ $mo...
  • 微软等数据结构+算法面试100题全部答案集锦 作者:July、阿财。 时间:二零一一年十月十三日。 引言  无私分享造就开源的辉煌。  今是二零一一年十月十三日,明日14日即是本人刚好开博一周年。在一...
  • C#一次性向数据库插入上万条数据的方法![图片说明](https://img-ask.csdn.net/upload/201701/17/1484640104_603613.png)就是这张表,求一下代码的写法
  • 1.数据块恢复 当某台机器上的一个DataNode进程down掉,HDFS为了保证文件的副本满足设定的副本数,会进行数据块的恢复操作。块恢复操作主要受两个参数影响: a)dfs.namenode.replication.work.multiplier.per....

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 28,725
精华内容 11,490
关键字:

上传100万条数据