精华内容
下载资源
问答
  • 今天刚看完崔大佬的《python3网络爬虫开发实战》,顿时觉得...为了避免这种尴尬,以及我突然想写博客的心情,我决定还是为大家在进行一次简易爬虫展示,总体程序我会利用多线程的方式来充分利用CPU的空闲时间,其中我也
  • 多线程断点续传(数据库保存进度)

    千次阅读 2016-05-17 16:35:15
    由于是多线程断点下载,在开始下载前需要根据当前文件的大小以及线程的数量将任务进行分配,并根据每个线程需要下载的信息创建不同的线程实体,同时把线程信息保存到数据库中。利用Range请求头去请求不同部分的数据...

    今天来看看多线程断点续传(数据库保存进度)

    效果图

    这里写图片描述

    原理图

    这里写图片描述

    基本流程

    点击下载按钮后启动Service,去获取文件信息,获取文件信息成功后启动下载任务并把文件信息传过去,由于是多线程断点下载,在开始下载前需要根据当前文件的大小以及线程的数量将任务进行分配,并根据每个线程需要下载的信息创建不同的线程实体,同时把线程信息保存到数据库中。利用Range请求头去请求不同部分的数据,如果点击了暂停任务,则将当前任务暂停的标记设置为true,并且保存进度到数据库,下次继续下载的时候先从数据库中获取线程信息,根据当前的信息继续下载。

    代码

    FileListAdapter

    public class FileListAdapter extends BaseAdapter {
        private static final String TAG = "FileListAdapter";
        private Context mContext;
        private List<FileInfo> mListInfo;
    
        public FileListAdapter(Context context,List<FileInfo> infos){
            mContext = context;
            mListInfo = infos;
        }
    
        @Override
        public int getCount() {
            return mListInfo.size();
        }
    
        @Override
        public Object getItem(int position) {
            return mListInfo.get(position);
        }
    
        @Override
        public long getItemId(int position) {
            return position;
        }
    
        @Override
        public View getView(int position, View convertView, ViewGroup parent) {
    //        //获取对应位置的文件信息
            final FileInfo fileInfo = mListInfo.get(position);
            ViewHolder holder;
            if(convertView == null){
                convertView = LayoutInflater.from(mContext).inflate(R.layout.list_item,null);
                holder = new ViewHolder();
                holder.tvFile = (TextView) convertView.findViewById(R.id.textName);
                holder.btnStart = (Button) convertView.findViewById(R.id.btn_start);
                holder.btnStop = (Button) convertView.findViewById(R.id.btn_stop);
                holder.pbProgress = (ProgressBar) convertView.findViewById(R.id.progressBar);
                holder.tvFile.setText(fileInfo.fileName);
                holder.pbProgress.setMax(100);
                holder.btnStart.setOnClickListener(new View.OnClickListener() {
                    @Override
                    public void onClick(View v) {
                        //启动服务,把文件信息传过去
                        Intent intent = new Intent(mContext, DownloadService.class);
                        intent.setAction(DownloadService.ACTION_START);
                        intent.putExtra("fileinfo",fileInfo);
                        mContext.startService(intent);
                    }
                });
                holder.btnStop.setOnClickListener(new View.OnClickListener() {
                    @Override
                    public void onClick(View v) {
                        //停止服务
                        Intent intent = new Intent(mContext, DownloadService.class);
                        intent.setAction(DownloadService.ACTION_STOP);
                        intent.putExtra("fileinfo",fileInfo);
                        mContext.startService(intent);
                    }
                });
                convertView.setTag(holder);
            }else{
                holder = (ViewHolder) convertView.getTag();
            }
            holder.pbProgress.setProgress(fileInfo.finished);
            return convertView;
        }
    
        public void updateProgressBar(int id, int progress){
            FileInfo fileInfo = mListInfo.get(id);
            fileInfo.finished = progress;
            notifyDataSetChanged();
        }
    
        //为什莫要定义为静态的,因为静态内部类只会被加载一次
        static class ViewHolder{
            TextView tvFile;
            Button btnStart;
            Button btnStop;
            ProgressBar pbProgress;
        }
    }

    MainActivity

    public class MainActivity extends AppCompatActivity {
        private static final String TAG = "MainActivity";
        //url必须带前缀,也就是协议,否则会报错
        public static final String url1 = "http://www.imooc.com/mobile/imooc.apk";
    //    public static final String url2 = "http://www.imooc.com/download/Activator.exe";
    //    public static final String url3 = "http://www.imooc.com/download/iTunes64Setup.exe";
        private List<FileInfo> fileList;
        private BroadcastReceiver receiver = new BroadcastReceiver() {
            @Override
            public void onReceive(Context context, Intent intent) {
                String action = intent.getAction();
                if(DownloadService.ACTION_UPDATE.equals(action)){
                    int finished = intent.getIntExtra("finished", 0);
                    int id = intent.getIntExtra("id", 0);
                    adapter.updateProgressBar(id,finished);//根据id更新进度
                }else  if(DownloadService.ACTION_FINISHED.equals(action)){
                    Toast.makeText(MainActivity.this, "下载完成", Toast.LENGTH_SHORT).show();
                    FileInfo fileInfo = (FileInfo) intent.getSerializableExtra("fileinfo");
                    int id = fileInfo.id;
                    adapter.updateProgressBar(id,0);//下载完成将进度条置为0
                }
            }
        };
        private ListView listFile;
        private FileListAdapter adapter;
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
             initData();
            //注册广播
            IntentFilter filter = new IntentFilter();
            filter.addAction(DownloadService.ACTION_UPDATE);
            filter.addAction(DownloadService.ACTION_FINISHED);
            registerReceiver(receiver, filter);
        }
    
        public void initData(){
            listFile = (ListView) findViewById(R.id.listFile);
            fileList = new ArrayList<>();
            FileInfo fileInfo1 = new FileInfo(0,url1,"imooc1.apk",0,0);
            FileInfo fileInfo2 = new FileInfo(1,url1,"imooc2.apk",0,0);
            FileInfo fileInfo3 = new FileInfo(2,url1,"imooc3.apk",0,0);
            fileList.add(fileInfo1);
            fileList.add(fileInfo2);
            fileList.add(fileInfo3);
            adapter = new FileListAdapter(this,fileList);
            listFile.setAdapter(adapter);
        }
    
        @Override
        protected void onDestroy() {
            super.onDestroy();
            unregisterReceiver(receiver);
        }
    }

    DownloadService

    public class DownloadService extends Service {
        private static final String TAG = "DownloanService";
        public static final String ACTION_START = "ACTION_START";
        public static final String ACTION_STOP = "ACTION_STOP";
        public static final String ACTION_UPDATE = "ACTION_UPDATE";
        public static final String ACTION_FINISHED = "ACTION_FINISHED";
        public static final String DOWNLOAD_PATH = Environment.getExternalStorageDirectory()
                .getAbsolutePath() + "/downloads/";
        public static final int MSG_INIT_THREAD = 0;
        //保存当前任务
        private Map<Integer, DownloadTask> map = new LinkedHashMap<>();
    
        private Handler handler = new Handler() {
            @Override
            public void handleMessage(Message msg) {
                super.handleMessage(msg);
                switch (msg.what) {
                    case MSG_INIT_THREAD:
                        FileInfo fileInfo = (FileInfo) msg.obj;
                        //将文件信息传给下载任务,主要是文件的url,文件大小
                        DownloadTask task = new DownloadTask(DownloadService.this, fileInfo, 3);
                        task.download();
                        map.put(fileInfo.id, task);//将当前任务保存到集合中
                        break;
                }
            }
        };
    
        @Override
        public int onStartCommand(Intent intent, int flags, int startId) {
            if (intent.getAction() != null) {
                if (ACTION_START.equals(intent.getAction())) {
                    //获取文件信息
                    FileInfo fileInfo = (FileInfo) intent.getSerializableExtra("fileinfo");
    //                new InitThread(fileInfo).start();
                    DownloadTask.sExcutorService.execute(new InitThread(fileInfo));//利用线程池启动任务
                } else if (ACTION_STOP.equals(intent.getAction())) {
                    Toast.makeText(this, "停止", Toast.LENGTH_SHORT).show();
                    FileInfo fileInfo = (FileInfo) intent.getSerializableExtra("fileinfo");
                    DownloadTask task = map.get(fileInfo.id);//根据id取出当前下载的任务
                    if (task != null) {
                        task.isPause = true;
                    }
                }
            }
            return super.onStartCommand(intent, flags, startId);
        }
    
        @Nullable
        @Override
        public IBinder onBind(Intent intent) {
            return null;
        }
    
        class InitThread extends Thread {
            private FileInfo fileInfo;
            private RandomAccessFile raf;
            private HttpURLConnection conn;
    
            public InitThread(FileInfo fileInfo) {
                this.fileInfo = fileInfo;
            }
    
            @Override
            public void run() {
                super.run();
                try {
                    URL url = new URL(fileInfo.url);//www.imooc.com/mobile/imooc.apk
                    conn = (HttpURLConnection) url.openConnection();//打开链接
                    conn.setConnectTimeout(3000);
                    conn.setReadTimeout(3000);
                    int len = -1;
                    if (conn.getResponseCode() == 200) {
                        len = conn.getContentLength();//获取服务器文件长度
                    }
                    if (len < 0) {
                        return;
                    }
                    File dir = new File(DOWNLOAD_PATH);
                    if (!dir.exists()) {
                        dir.mkdir();//目录不存在创建目录
                    }
                    File file = new File(dir, fileInfo.fileName);
                    //在指定路径下创建一个个服务器文件大小一样的文件
                    raf = new RandomAccessFile(file, "rwd");
                    raf.setLength(len);//设置临时文件的长度为服务器文件长度
                    fileInfo.length = len;//设置文件信息
                    handler.obtainMessage(MSG_INIT_THREAD, fileInfo).sendToTarget();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (conn != null)
                        conn.disconnect();
                    try {
                        if (raf != null)
                            raf.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    DownloadTask

    public class DownloadTask {
        private static final String TAG = "DownloadTask";
        private Context mContext;
        //要下载的文件信息,
        private FileInfo fileInfo;
        private ThreadDao dao;
        //是否暂停
        public boolean isPause = false;
        //完成的进度
        private int finished;
        //线程数量
        private int mThreadCount = 1;
        //静态线程池,方便管理线程
        public static ExecutorService sExcutorService = Executors.newCachedThreadPool();
    
        private List<DownloadThread> mThreadList;
    
    
        public DownloadTask(Context mContext, FileInfo fileInfo, int threadCount) {
            this.mContext = mContext;
            this.fileInfo = fileInfo;
            this.mThreadCount = threadCount;
            dao = new ThreadDaoImpl(mContext);
        }
    
        public void download() {
            //每次下载任务前,根据url查询线程信息
            List<ThreadInfo> threadInfos = dao.queryThread(fileInfo.url);
            ThreadInfo threadinfo;
            if (threadInfos.size() == 0) {
                //每个线程需要下载的长度
                int length = fileInfo.length / mThreadCount;
                for (int i = 0; i < mThreadCount; i++) {
                    //结束位置比开始位置多length,减一是为了下载位置补充和
                    threadinfo = new ThreadInfo(i, i * length, (i + 1) * length - 1, fileInfo.url, 0);
                    if (i == mThreadCount - 1) {
                        threadinfo.end = fileInfo.length;//防止除不尽的情况
                    }
                    threadInfos.add(threadinfo);//将线程信息保存到集合中
                    //第一次数据库中不存在信息,向数据库写入信息
                    dao.insertThread(threadinfo);
                }
            }
            mThreadList = new ArrayList<>();
            for (ThreadInfo info : threadInfos) {
                DownloadThread downloadthread = new DownloadThread(info);
    //            downloadthread.start();
                    sExcutorService.execute(downloadthread);
                mThreadList.add(downloadthread);
            }
        }
    
        /**
         * 判断下载任务里面的所有线程是否执行完毕
         */
        public synchronized void checkAllThreadFinished() {
            boolean allFinished = true;
            for (DownloadThread thread : mThreadList) {
                if (!thread.isFinished) {
                    allFinished = false;
                    break;
                }
            }
            if (allFinished) {
                //下载完成删除删除下载信息
                dao.deleteThread(fileInfo.url);
                Intent intent = new Intent(DownloadService.ACTION_FINISHED);
                intent.putExtra("fileinfo", fileInfo);
                Log.d(TAG, "checkAllThreadFinished: "+fileInfo);
                mContext.sendBroadcast(intent);
            }
        }
    
        class DownloadThread extends Thread {
            private ThreadInfo mThreadInfo;
            private RandomAccessFile raf;
            private HttpURLConnection conn;
            public boolean isFinished = false;
    
            public DownloadThread(ThreadInfo threadInfo) {
                this.mThreadInfo = threadInfo;
            }
    
            @Override
            public void run() {
                super.run();
                //设置下载位置
                try {
                    URL url = new URL(mThreadInfo.url);//下载链接
                    conn = (HttpURLConnection) url.openConnection();
                    conn.setReadTimeout(5000);
                    conn.setConnectTimeout(4000);
                    //当前下载位置等于起始位置加上已经下载的进度
                    int start = mThreadInfo.start + mThreadInfo.finished;
                    //下载的范围为起始位置到文件长度,因为是单线程下载
                    conn.setRequestProperty("Range", "bytes=" + start + "-" + mThreadInfo.end);
                    File file = new File(DownloadService.DOWNLOAD_PATH, fileInfo.fileName);
                    raf = new RandomAccessFile(file, "rwd");
                    raf.seek(start);//指定从某个位置起
                    Intent intent = new Intent(DownloadService.ACTION_UPDATE);
                    finished += mThreadInfo.finished;//更新完成的进度,必须要加上数据库中查询到的文件信息
                    //开始下载
                    Log.d(TAG, "getResponseCode: "+conn.getResponseCode());
                    if (conn.getResponseCode() == 206) {
                        //读取数据
                        int len = -1;
                        long time = System.currentTimeMillis();
                        InputStream stream = conn.getInputStream();
                        byte[] buffer = new byte[1024 << 2];//每次赌徒多少个字节
                        while ((len = stream.read(buffer)) != -1) {
                            //写入文件
                            raf.write(buffer, 0, len);
                            finished += len;//累加整个文件完成进度
                            mThreadInfo.finished += len;
                            if(System.currentTimeMillis() - time >1500) {
                                time = System.currentTimeMillis();
                                //通知Activity更新进度条
                                intent.putExtra("finished", finished * 100 / fileInfo.length);
                                intent.putExtra("id", fileInfo.id);
                                mContext.sendBroadcast(intent);
                            }
                            //下载暂停,保存进度到数据库
                            if (isPause) {
                                //将当前的信息保存到数据库
                                dao.updateThread(mThreadInfo.url, mThreadInfo.id + "", mThreadInfo.finished);
                                return;
                            }
                        }
                        //表示下载完成
                        isFinished = true;
                        Log.d(TAG, "run: 完成");
                        checkAllThreadFinished();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    {
                        conn.disconnect();
                        try {
                            raf.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    其他相关的类和之前的一篇单线程断点续传中的相同,这里就不在给出了。

    最后在给出一篇多线程断点续传的文章
    多线程多任务断点续传

    源码下载

    点此下载源码

    展开全文
  • java多线程批量更新数据库数据

    千次阅读 2019-08-01 15:24:07
    需求:更新机构表的字段,该字段的内容为包括当前机构的机构号和所有父机构的机构号以逗号拼接的形式保存到数据库 该表的数据量大约为10万条(数据库类型:postgresql) 10万条数据每1000条跑一次事务(机构表:...

    需求:更新机构表的字段,该字段的内容为包括当前机构的机构号和所有父机构的机构号以逗号拼接的形式保存到数据库

    该表的数据量大约为10万条(数据库类型:postgresql)

    10万条数据每1000条跑一次事务(机构表:organization,机构号:brh_code,父机构号:up_brh_code)

    1.最开始的用法

    public List<Record> findAllParentData(String orgCode){
        String sql = "WITH RECURSIVE r AS (  " +
                    " SELECT brh_code, up_brh_code FROM organization WHERE brh_code = ? " +
                    " UNION ALL " + 
                    " SELECT o.brh_code, o.up_brh_code FROM organization o,r " +
                    " WHERE o.brh_code = r.up_brh_code) " +
                    " SELECT r.brh_code FROM r ";
        return Db.find(sql, orgCode);
    }

    这个方法会查当前机构的所有的父级机构,然后遍历拼接完后再执行update

    更新10万条数据本地运行了5个多小时,生产环境运行了2个多小时,这样肯定是不行的,主要是这个循环的sql运行时间比较长

    2.采用多线程更新

    控制层代码(一个定时任务:每1000跑一个事务)

    public class UpdateOrgFamilyField implements Job {
        private final int THREAD_COUNT = 10;
        private final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
        private static Logger log = Logger.getLogger(UpdateOrgFamilyField.class);
    
        @Override
        publi void execute (JobExecutionContext jobExecutionContext ) {
            int startLimit = 0;
            int limitLength = 1000;
            boolean next = true;
            ExecutorService es = Executors.newFixedThreadPool(THREAD_COUNT);
            try {
                log.info("开始执行任务");
                while (next) {
                    //每次获取1000条数据
                    List<Record> list = OrganizationJobService.me.queryOrgTable(startLimit, limitLength);
                    //每list.size()跑一次事务
                    OrganizationJobService.me.updateOrgFamilyField(list, es);
                    startLimit += limitLength;
                    if (list.size() < limitLength) {
                        next = false;
                    }
                }
                log.info("任务执行完成");
                latch.await();
                es.shutdown();
            } catch (Exception e) {
                log.error("执行更新任务出错");
            }
    
        }
    
    }

    业务层代码

    public class OrganizationJobService {
    
        public static final OrganizationJobService me = Duang.duang(OrganizationJobService.class, Tx.class);
        private static final Logger log = Logger.getLogger(OrganizationJobService.class);
    
         /**
          *查询机构表,每次查询limitLength条数据
          */
        public list<Record> queryOrgTable(int startlimit, int limitLength) {
            return Db.find("select id, brh_code, up_brh_code, org_family " + 
                " from organization order by id asc limit ? offset ? ", limitLength, startlimit);
        }
    
    
        /**
          *更新机构表的org_family字段
          */
        public void updateOrgFamilyField(List<Record> list, ExecutorService es){
            int count = 0;
            for (final Record record : list) {
                es.submit(new Runnable() {
                    @Override
                    public void run () {
                        StringBuilder sb = new StringBuilder();
                        String brhCode = record.getStr("brh_code");//机构号
                        String upCode = record.getStr("up_brh_code");//父机构号
                        sb.append(brhCode).append(",").append(upCode);
                        //获取父级机构号的字符串
                        String codeStr = queryBrhCodeByUpCode(upCode, sb);
                        record.set("org_family", codeStr);
                        Db.update("organization", "id", record);
                    }
                }); 
                count++;
                log.info("更新了" + count + "条数据");
            }
            log.info("成功更新了" + list.size() + "条");
        }
    
    
        /**
          *轮询获取父机构的字符串
          */
        private String queryBrhCodeByUpCode (String brhCode, StringBuilder sb){
            String sql = "select up_brh_code from organization where brh_code = ?";
            Record first = Db.findFirst(sql, brhCode);
            if (first == null) {
                return sb.toString();
            }
            String code = first.getStr("up_brh_code");
            if (StringUtils.isNotEmpty(code)) {
                sb.append(",").append(code);
            }
            queryBrhCodeByUpCode(code, sb);
            return sb.toString();
        }
    }

     

    更新10万条数据生产环境仅仅用了20秒,效率还是非常明显的

    开启线程数量:https://www.cnblogs.com/warehouse/p/10810338.html

    展开全文
  • 最近面试被问到如何快速将百万级数据保存到数据库。当时回答不理想,事后总结如下。 主要思路: 1.首先将数据一次读入到内存(如果文件超过系统内存,会有问题。具体解决方案我没尝试); 2.开辟线程,每个...

    参考文章:Java-快速读取百万级数据文件,插入数据库

    最近面试被问到如何快速将百万级数据保存到数据库。当时回答不理想,事后总结如下。

    主要思路:

    1.首先将数据一次读入到内存(如果文件超过系统内存,会有问题。具体解决方案我没尝试);

    2.开辟多个线程,每个线程新建一个数据库连接;

    3.每个线程批量执行插入操作。必须要设置conn.setAutoCommit(false)。这个很重。

    具体实现步骤如下:

    1. 获取jdbc连接类

    package io;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    
    /**
     * @ClassName: JDBCConn
     * @Author: wanggj
     * @Date: 2020/5/15 9:15
     **/
    public class JDBCConn {
        private static String DRIVERCLASS = "com.mysql.cj.jdbc.Driver";
        private static String URL = "jdbc:mysql://localhost:3306/stu?characterEncoding=utf8&serverTimezone=UTC";
        private static String USERNAME = "root";        //数据库用户名
        private static String PASSWORD = "root";        //数据库密码
    
        public static Connection getConnection() throws ClassNotFoundException, SQLException {
            Class.forName(DRIVERCLASS);
            Connection conn = DriverManager.getConnection(URL, USERNAME, PASSWORD);
            return conn;
        }
    
    }

    2.文本数据封装为对象

    package io;
    
    import java.io.Serializable;
    
    /**
     * @ClassName: UserInfo
     * @Author: wanggj
     * @Date: 2020/5/15 9:26
     **/
    public class UserInfo implements Serializable {
        private String name;    //名字
        private int gender;    //性别
        private String address;    //大区
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public int getGender() {
            return gender;
        }
    
        public void setGender(int gender) {
            this.gender = gender;
        }
    
        public String getAddress() {
            return address;
        }
    
        public void setAddress(String address) {
            this.address = address;
        }
    }
    

    3.读取文件到数据库

    package io;
    
    /**
     * @ClassName: Tinput
     * @Author: wanggj
     * @Date: 2020/5/15 9:28
     **/
    
    
    import java.io.*;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.IntStream;
    
    public class MultiThreadIO {
        public void importData(String filePath) {
            long start = System.currentTimeMillis();
            int i = 0;
            List<UserInfo> LS = new ArrayList<>();
            try (// 当逐行读写大于2G的文本文件时推荐使用以下代码
                 BufferedInputStream bis = new BufferedInputStream(new FileInputStream(new File(filePath)));
                 BufferedReader in = new BufferedReader(new InputStreamReader(bis, "UTF-8"), 10 * 1024 * 1024)) {
                while (in.ready()) {
                    String line = in.readLine();
                    String[] arr = line.split(",");        //将读取的每一行以 , 号分割成数组
                    if (arr.length < 3) continue;            //arr数组长度大于3才是一条完整的数据
                    UserInfo userInfo = new UserInfo();
                    userInfo.setName(arr[0]);    //名称
                    userInfo.setAddress(arr[1]);    //地址
                    userInfo.setGender(Integer.parseInt(arr[2]));    //性别
                    LS.add(userInfo);            //把从文件中读取的数据存到内存里
                    i++;                            //记录读取的条数
                }
                System.out.println("\n总共读取Txt文件中" + i + "条数据");
                try {
                    List<Thread> threads = new ArrayList<>();
                    IntStream.rangeClosed(0, 28).forEach(t -> {
                        Thread thread = new Thread() {
                            @Override
                            public void run() {
                                try {
                                    final Connection con = JDBCConn.getConnection();
                                    System.out.println("sql连接成功");
                                    String sql = "INSERT INTO user_info(`name` , `address`, `gender`) VALUES (?, ?, ?)";
                                    con.setAutoCommit(false); //(重要)必须设置为批量提交,为true的话,插入一条commit1次,非常耗时。
                                    PreparedStatement ptatm = con.prepareStatement(sql);
                                    LS.stream().skip(t * 100000).limit(100000).forEach(userInfo -> {
                                        try {
                                            ptatm.setString(1, userInfo.getName());        //名称
                                            ptatm.setString(2, userInfo.getAddress());    //大区
                                            ptatm.setInt(3, userInfo.getGender());        //登录时间
                                            ptatm.addBatch();                            //批量记录到容器里
                                        } catch (SQLException e) {
                                            e.printStackTrace();
                                        }
                                    });
                                    System.out.println(Thread.currentThread().getName() + "执行批量插入数据 开始.");
                                    ptatm.executeBatch();
                                    con.commit();
                                    System.out.println(Thread.currentThread().getName() + "执行批量插入数据 结束.");
                                    con.close();
                                    ptatm.close();
                                } catch (SQLException e) {
                                    e.printStackTrace();
                                } catch (ClassNotFoundException e) {
                                    e.printStackTrace();
                                }
                            }
                        };
                        thread.start();
                        threads.add(thread);
                    });
    
                    //所有子线程执行完毕执行主线程
                    threads.forEach(thread -> {
                        try {
                            thread.join();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                    System.out.println("任务执行完成。执行用时:" + (System.currentTimeMillis() - start));
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            } catch (IOException ex) {
                ex.printStackTrace();
    
            }
        }
    }
    
    

    测试客户端

    package io;
    
    /**
     * @ClassName: IOMainTest
     * @Author: wanggj
     * @Date: 2020/5/15 9:07
     **/
    public class IOMainTest {
    
        public static void main(String[] args) {
            MultiThreadIO threadIO = new MultiThreadIO();
            String filePath = "C:\\Users\\wanggj\\user.txt";
            threadIO.importData(filePath);
        }
    
    
    }
    

    注意:线上环境时,需要使用线程池替换自己手动创建线程。另外本人能力有限,如果有误,还请指正。

    展开全文
  • 我这边有个项目,是使用客户端socekt 连接服务获取数据并保存到数据库,但是由于是一个客户端连接多个服务,目前只想到了多线程,但是在线程下一直无法保存数据会是什么原因导致的private JdbcTemplate jdbcTemplate...

    我这边有个项目,是使用客户端socekt 连接服务获取数据并保存到数据库,但是由于是一个客户端连接多个服务,目前只想到了多线程,但是在线程下一直无法保存数据会是什么原因导致的

    private JdbcTemplate jdbcTemplate = null;

    private InputStream in;

    private Socket socket = null;

    private String ip = null;

    private int port = 0;

    private ProductionDataRepository repository;

    private JSONObject json = null;

    private ProductionData SelectProdata = null;

    private int connect_Info_id = 0;

    private int a = 1;

    private int b = 2;

    public SaveDataThread(String ip,int port,ProductionDataRepository repository,ProductionData SelectProdata,int connect_Info_id,JdbcTemplate jdbcTemplate) throws UnknownHostException, IOException {

    this.port = port;

    this.ip = ip;

    this.repository = repository;

    this.SelectProdata = SelectProdata;

    this.connect_Info_id = connect_Info_id;

    this.jdbcTemplate = jdbcTemplate;

    socket = new Socket(ip, port);

    }

    @Transactional(rollbackOn=Exception.class)

    public synchronized void run() {

    System.out.println("线程-" + Thread.currentThread().getId());

    System.out.println(ip);

    System.out.println(port);

    ProductionData Prodata = null;

    boolean connectStatus = true;

    while(connectStatus) {

    try {

    if(socket.getInputStream() == null) {

    continue;

    }

    in = socket.getInputStream();

    } catch (IOException e2) {

    // TODO Auto-generated catch block

    e2.printStackTrace();

    }

    try {

    socket.sendUrgentData(0xFF);

    } catch (IOException e1) {

    // TODO Auto-generated catch block

    e1.printStackTrace();

    socket = null;

    connectStatus = false;

    }

    byte[] data=new byte[1024];

    int len = 0;

    try {

    len = in.read(data);

    } catch (IOException e) {

    System.out.println("ProductionDataServices的save方法中的in.read(data)调用异常,异常原因:"+e.getMessage());

    e.printStackTrace();

    }

    System.out.println(new String(data,0,len));

    json = JSONObject.fromObject(new String(data,0,len));

    /*json = getPorductionData(as.get("ip").toString(),Integer.parseInt(as.get("port").toString()));*/

    if(json == null) {

    System.out.println("json对象为 null,未获取到数据!");

    continue;

    }

    //查询生产数据id 等于 production_data_id的数据

    //获取

    //如果当天存在数据则对数据进行修改

    String state = null;

    if(SelectProdata != null) {

    // 0 暂停 1 运行 2 未连接

    if(json.getInt("State") == 0) {

    state = "暂停";

    } else if(json.getInt("State") == 1) {

    state = "运行";

    }

    //判断是否为空,防止未连接时存在对象但是开始时间为空

    if(SelectProdata.getStartTime() == null) {

    repository.updateStart_time(new Timestamp(System.currentTimeMillis()),SelectProdata.getId());

    }

    System.out.println();

    int OK = SelectProdata.getOk()+json.getInt("OK");

    System.out.println("对象ng: "+SelectProdata.getOk());

    System.out.println("json对象NG: "+json.getInt("OK"));

    System.out.println(""+SelectProdata.getOk()+"+"+json.getInt("OK")+"="+(SelectProdata.getOk()+json.getInt("OK")));

    System.out.println("int总和ng: "+OK);

    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式

    String sql = "update ss_production_data pd set "

    + "pd.ok = "+(SelectProdata.getOk()+json.getInt("OK"))+","

    + "pd.ng = "+(SelectProdata.getNg()+json.getInt("NG"))+","

    + "pd.total = "+(SelectProdata.getOk()+json.getInt("OK")+SelectProdata.getNg()+json.getInt("NG"))+","

    + "pd.state = '"+state+"',"

    + "pd.owe_hole = "+(SelectProdata.getOweHole()+json.getInt("Hole"))+","

    + "pd.long_hair = "+(SelectProdata.getLongHair()+json.getInt("LongHair"))+","

    + "pd.Long_short_hair="+(SelectProdata.getLongShortHair()+json.getInt("UnevenHair"))+","

    + "pd.loose_wool="+(SelectProdata.getLooseWool()+json.getInt("LooseHair"))+","

    + "pd.implantation_error ="+(SelectProdata.getImplantationError()+json.getInt("WrongHair"))+","

    + "pd.difference_hair = "+(SelectProdata.getDifferenceHair()+json.getInt("CrushHair"))+","

    + "pd.dirty= "+(SelectProdata.getDirty()+json.getInt("Dirt"))+","

    + "pd.end_time = '"+df.format(new Date())+"'"

    + "where pd.id = "+SelectProdata.getId()+"";

    save(sql);

    continue;

    }

    Prodata = new ProductionData();

    //否则因为不存在id修改将变成添加

    Prodata.setName(connect_Info_id+"号机器");//机器名称

    Prodata.setConnectInfoId(connect_Info_id);//绑定连接信息id

    Prodata.setOk(json.getInt("OK"));//合格

    Prodata.setNg(json.getInt("NG"));//不合格

    Prodata.setTotal((json.getInt("OK")+json.getInt("NG")));//总数

    // 0 暂停 1 运行 2 未连接

    if(json.getInt("State") == 0) {

    Prodata.setState("暂停");

    } else if(json.getInt("State") == 1) {

    Prodata.setState("运行");

    }

    Prodata.setOweHole(json.getInt("Hole"));//欠孔

    Prodata.setLongHair(json.getInt("LongHair"));//长毛

    Prodata.setLongShortHair(json.getInt("UnevenHair"));//长短毛

    Prodata.setLooseWool(json.getInt("LooseHair"));//散毛

    Prodata.setImplantationError(json.getInt("WrongHair"));//植错毛

    Prodata.setDifferenceHair(json.getInt("CrushHair"));//差毛

    Prodata.setDirty(json.getInt("Dirt"));//脏毛

    Prodata.setStartTime(new Timestamp(System.currentTimeMillis()));

    Prodata.setStartTime(new Timestamp(System.currentTimeMillis()));

    Prodata.setCreateTime(new Timestamp(System.currentTimeMillis()));

    repository.save(Prodata);

    }

    }

    public void save(String sql) {

    String driver = "com.mysql.cj.jdbc.Driver";

    String url = "jdbc:mysql://127.0.0.1:3306/hair_planting_machine?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true";

    String username = "root";

    String password = "123456";

    Connection conn = null;

    try {

    Class.forName(driver); //classLoader,加载对应驱动

    conn = (Connection) DriverManager.getConnection(url, username, password);

    } catch (ClassNotFoundException e) {

    e.printStackTrace();

    } catch (SQLException e) {

    e.printStackTrace();

    }

    int i = 0;

    PreparedStatement pstmt;

    try {

    pstmt = (PreparedStatement) conn.prepareStatement(sql);

    i = pstmt.executeUpdate();

    System.out.println("resutl: " + i);

    pstmt.close();

    conn.close();

    } catch (SQLException e) {

    e.printStackTrace();

    }

    }

    展开全文
  • 海量文本数据保存到数据库思路

    千次阅读 2016-03-21 15:21:11
    由于数据量比较大,故使用多线程处理文本中的数据。整个项目的实现大致是这样: 通过前端网页上传文本文件;后台处理网页上传的文本文件,将相应的数据存储到数据库中;后台数据全部处理完成后,将整个后台处理...
  • 多线程读取数据库

    2009-06-26 16:15:51
    到多线程插入数据 [b]问题补充:[/b] 谢谢两位的回答,还有几个疑问 to ranLoD: "一个标识对象,当入库的线程读取标识对象的时候线程结束" 一个标识对象是什么意思,怎么样读取标识对象,能否写出几句简单的...
  • 目前有两个后台线程A和B,A每秒获取一下蓝牙数据,将其保存到数据库中;B定时循环执行遍历数据库,对没有处理的数据进行处理,并更新记录的标志位说明已经处理过。项目采用了CoreData,但每当执行4到5分钟后会报异常...
  • 2、偶现读取不到数据,但拉数据库里面却有数据;根据业务场景分析,确实存在并发读写的情况,由于我们使用的是单例模式,所以问题1在不进行多线程互斥访问的情况下,确实是存在这个问题,所以想到的思路是将所有读写...
  • 知识点第一章IO流编程字符和字节数据源头File类File类常用方法InputStreamFileInputStreamOutputStreamFileOutputStream流的套接DataInputStream/DataOutputStreamBufferedInputStream/...
  • 需求:需要解析txt文件中数据,保存数据到数据库中。(数据量较大:文件700M左右,数据有500万条左右) 简单写个工具类,jdbc连接数据库执行批处理: 实体类:注意时间类型 以下来之网页...
  • 前提提要: 最近有一个需求,需要做一个文件导入功能,像一般的文件上传都挺简单的...我需要将这些数据读取,并且对其中的一些字段做数据处理,经过测试发现,我对数据一边保存一边解析,需要注意的是:我这里的解...
  • 以下是完整代码,且使用的是多线程的方式。此处提到的多线程的方法可以参考Python黑魔法,一行实现并行化这篇文章,讲的很好。准备工作import tushare as tsfrom sqlalchemy import create_engine...
  •  --dbfile 存放结果数据到指定的数据库(sqlite)文件中  --key 页面内的关键词,获取满足该关键词的网页,可选参数,默认为所有页面  -l 日志记录文件记录详细程度,数字越大记录越详细,可选参数,默认spider....
  • 爬虫库 使用简单的requests库,这是一个阻塞的库,速度比较慢。...使用Python ORM sqlalchemy保存到数据库,也可以使用自带的csv模块存在CSV中。 API接口 因为API接口存在数据保护情况,一个电影的每...
  • 在某些爬虫项目下,我们需要每次执行一条插入语句,就立即调用commit方法更新数据库,...这里就可以使用Twisted中提供的以异步方式多线程访问数据库的模块adbapi,可以显著提供程序访问数据库的效率。 adbapi.Con...
  • 环境: ...后台保存附件信息到数据库的时候,总是报主键唯一性约束错误 分析: 后台使用到了一个uploadEntity对象,该对象被配置成了一个bean,但是没有配置其scope属性,所以默认是单例的,那么...
  • 六、开启多线程爬取 七、小结 很多人学习python,不知道从何学起。 很多人学习python,掌握了基本语法过后,不知道在哪里寻找案例上手。 很多已经做案例的人,却不知道如何去学习更加高深的知识。 那么针对这三类...
  • 昨天在做项目时,做了一个接口,接收数据保存到数据库中,接收到的数据也会向memchched中保存,所以数据实际是以key和value的形式来传属的,而数据库中保存这些数据的时候,也会去判断是否有相同的key和存在,如果...
  • java多线程

    2017-09-23 15:10:20
    项目中有个需求是要将查询出的1000万+的数据做一些处理后,再保存到数据库中,业务很简单,为了提高效率采用了多线程处理。 基本思路: 1、将查询出的数据分页 2、开启一个线程池循环处理每一页的数据
  • 多线程案例

    2016-11-26 15:42:28
    /*** 控制并发,属于一种较常见的限流手段,通过信号量机制(如Java...还需要存储到数据库中,而数据库的连接数只有10个,这时* 我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 398
精华内容 159
关键字:

多线程保存数据到数据库