下面代码中的返回码是206,不是200.因为是断点,所以返回码不一样。
-
多线程下载技术
2016-03-13 14:06:00 -
多线程下载技术
2010-01-19 20:26:00要用8个下载线程。 第一步:将namelock.avi文件分成8个子模块。这里要注意的地方是我所说的分成8个字模块,并不是把文件的内容分别存放到8个不同的缓冲区里。而是生成8个不同的文件偏移量。很多时候程序员为了...现在有个namelock.avi文件需要下载。文件的大小为:364544字节。要用8个下载线程。 第一步:将namelock.avi文件分成8个子模块。这里要注意的地方是我所说的分成8个字模块,并不是把文件的内容分别存放到8个不同的缓冲区里。而是生成8个不同的文件偏移量。很多时候程序员为了偷懒往往容易一次性将文件读入内存,这样带来的后果是不堪设想的。一个比较理想的方法是这样的。
bool DealFile(string fileName)
{
FILE *file;
DWORD fileSize ,pos;
int readLen ;
//MAX_BUFFER_LEN 在头文件里定义,这里能够保证数据不丢失,也不至于内存逸出
char *buffer = new char[MAX_BUFFER_LEN];
file = fopen(fileName.c_str(),"r+b");
if(file == NULL) return false;
fseek(file,0,2);
fileSize = ftell(file); //取得文件的大小
fseek(file,0,0);
do{
readLen = fread(buffer,sizeof(char),MAX_BUFFER_LEN,file);
if(readLen > 0)
{
pos += readLen;
//对读取的文件做处理
}
}while(pos < fileSize); //循环读取文件
delete[] buffer;
fclose(file); //释放资源
return true;
}
8个线程下载文件时,都要对内容文件和配置文件进行读写。这样如果没有处理好,很有可能会造成访问文件失败,我定义了一个全局变量FileLocked,如果FileLocked=true说明文件正在被某个线程访问。所以使用Sleep(10)睡眠等待。当某个线程进入读写文件时必须设置FileLocked = true;访问文件完毕必须将FileLocked = false;这样就能很好的控制各个线程对文件的访问了。(对临界资源的访问有API提供了很多很好的解决方法,请查阅)。
8个下载线程同时下载文件时,完成部分下载是随机的。那么怎么样把随机的文件数据按照偏移量正确的写入文件呢?我是这样实现的,当要下载文件namelock.avi时,首先查找文件namelock.avi.san配置文件是否存在。如果存在,说明上次已经下载过部分该文件,就可以断点续传了。如果没有找到该文件,那么生成和该文件的大小一样大的文件,文件里所有的数据都为0,(可以使用函数memset(buffer,10000,''0''))和一个配置文件。然后利用fseek函数将数据正确的覆盖原先的0;接下来要介绍一写配置文件的格式了。很简单,配置文件的内容主要包括:文件在本地保存的绝对路径、文件的大小、线程的个数、已经下载的文件大小,各个线程的任务(在原始文件起始位置和结束位置,中间使用''-''分开);如:D:mmnamelock.avi //文件保存在这里
364544 //文件大小
5 //有5个线程在下载
0 //已经下载了0字节
0-72908 //线程1的下载任务
72908-145816 //线程2的下载任务
145816-218724 //线程3的下载任务
218724-291632 //线程4的下载任务
291632-364544 //线程5的下载任务
以上是开始下载时的各个线程的任务分配。
D:mmnamelock.avi
364544
5
113868
72908-72908
113868-145816
145816-218724
218724-291632
291632-364544以上是某一时刻各个线程的任务分配情况。
各个线程任务分配是这样实现的。在开始下载时,文件平均分成若干块进行下载。如第一个线程一开始的任务是从文件的0位置开始下载一直到72908位置处。线程1每次下载一块数据后就要调整任务,如第一次下载了20800字节的数据,那么线程1的任务将改为:20800-72908。如此下去,直到任务为72908-72908时表示线程1完成了当前的下载任务。此时,线程1就分析各个线程的任务,找出任务最为繁忙的一个线程:如线程3:14816-218724。那么线程1就自动去调整任务,拿50%的任务来再次下载。周而复始直到各个线程都完成任务。不过这里有一点需要注意:为了避免重复下载部分数据,在调整任务的时候,起始的文件便移量必须加上接受缓冲器的字节数,因为如前面所举的列子来看。线程1和线程3在平衡负载的时候,线程正在下载数据,如果所剩的数据比接受缓冲器的大小还小,线程1和线程3的部分下载数据将会重复。
在调整任务和分析任务的时候,会发现一个问题。就是读取文件数据太过频繁。于是我用了一个数据结构。在下载文件的过程中始终打开配置文件,这样速度提高了很多。在文件下载完毕后关闭文件。数据结构如下:typedef struct FromToImpl{
DWORD from; //任务起始位置
DWORD to; //任务结束位置
}m_fromTo;
typedef struct InfroImpl{
String fileLoad; //文件保存位置
DWORD fileSize; //文件大小
int threadCnt; //下载线程数
DWORD alreadyDownloadCnt; //已经下载的文件大小
FromToImpl *fromToImpl; //各个线程的任务描述
}m_inforImpl; -
Java多线程下载技术实现
2019-01-03 13:49:37多线程下载技术,简单的说就是把要下载的文件分成几块,由不同的线程来负责每一块数据的下载任务。 技术要点 RandomAccessFile: Java中用来实现随机访问文件的类 http Range请求头 具体思路 1、文件分块。 文件分块...多线程下载
多线程下载技术,简单的说就是把要下载的文件分成几块,由不同的线程来负责每一块数据的下载任务。技术要点
RandomAccessFile:
Java中用来实现随机访问文件的类
http Range请求头
具体思路
1、文件分块。 文件分块大小(blockSize)= (文件大小 +线程数 - 1 )/ 线程数 ;
2、确定每一个线程所要下载的 文件的起始和结束位置。
现假设为每个线程分别编号:0,1, 2,3;则
第一个线程负责的下载位置是: 0*blockSize - (0+1)blockSize -1,
第二个线程负责的下载位置是: 1blockSize - (1+1)blockSize -1,
以此类推第i个线程负责的下载位置是:iblockSize - (i+1)blockSize -1;
即线程(编号为id)下载开始位置 start = idblock;
即线程(编号为id)下载结束位置 end = (id+1)*block -1;
3、设置http 请求头, conn.setRequestProperty(“Range”, “bytes=” + start + “-” + end);代码实现
一个简单的Java多线程下载代码如下:package com.ricky.java.test.download; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import java.net.HttpURLConnection; import java.net.URL; public class Downloader { private URL url; // 目标地址 private File file; // 本地文件 private static final int THREAD_AMOUNT = 8; // 线程数 private static final String DOWNLOAD_DIR_PATH = "D:/Download"; // 下载目录 private int threadLen; // 每个线程下载多少 public Downloader(String address, String filename) throws IOException { // 通过构造函数传入下载地址 url = new URL(address); File dir = new File(DOWNLOAD_DIR_PATH); if(!dir.exists()){ dir.mkdirs(); } file = new File(dir, filename); } public void download() throws IOException { HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setConnectTimeout(5000); int totalLen = conn.getContentLength(); // 获取文件长度 threadLen = (totalLen + THREAD_AMOUNT - 1) / THREAD_AMOUNT; // 计算每个线程要下载的长度 System.out.println("totalLen="+totalLen+",threadLen:"+threadLen); RandomAccessFile raf = new RandomAccessFile(file, "rws"); // 在本地创建一个和服务端大小相同的文件 raf.setLength(totalLen); // 设置文件的大小 raf.close(); for (int i = 0; i < THREAD_AMOUNT; i++) // 开启3条线程, 每个线程下载一部分数据到本地文件中 new DownloadThread(i).start(); } private class DownloadThread extends Thread { private int id; public DownloadThread(int id) { this.id = id; } public void run() { int start = id * threadLen; // 起始位置 int end = id * threadLen + threadLen - 1; // 结束位置 System.out.println("线程" + id + ": " + start + "-" + end); try { HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setConnectTimeout(5000); conn.setRequestProperty("Range", "bytes=" + start + "-" + end); // 设置当前线程下载的范围 InputStream in = conn.getInputStream(); RandomAccessFile raf = new RandomAccessFile(file, "rws"); raf.seek(start); // 设置保存数据的位置 byte[] buffer = new byte[1024]; int len; while ((len = in.read(buffer)) != -1) raf.write(buffer, 0, len); raf.close(); System.out.println("线程" + id + "下载完毕"); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { String address = "http://dldir1.qq.com/qqfile/qq/QQ7.9/16621/QQ7.9.exe"; new Downloader(address, "QQ7.9.exe").download(); // String address = "http://api.t.dianping.com/n/api.xml?cityId=2"; // new Downloader(address, "2.xml").download(); } }
封装多线程下载
文件下载是一个常用的模块,我们可以对其封装一下,方便以后调用。涉及到的开发技术如下:JDK 1.7
Eclipse Juno
Maven 3
HttpClient 4.3.6
工程目录结构如下所示:com.ricky.common.java.download.FileDownloader package com.ricky.common.java.download; import org.apache.log4j.Logger; import com.ricky.common.java.download.config.FileDownloaderConfiguration; /** * Java 文件多线程下载 * @author Ricky Fung * */ public class FileDownloader { protected Logger mLogger = Logger.getLogger("devLog"); private volatile static FileDownloader fileDownloader; private FileDownloaderEngine downloaderEngine; private FileDownloaderConfiguration configuration; public static FileDownloader getInstance(){ if(fileDownloader==null){ synchronized (FileDownloader.class) { if(fileDownloader==null){ fileDownloader = new FileDownloader(); } } } return fileDownloader; } protected FileDownloader(){ } public synchronized void init(FileDownloaderConfiguration configuration){ if (configuration == null) { throw new IllegalArgumentException("FileDownloader configuration can not be initialized with null"); } if (this.configuration == null) { mLogger.info("init FileDownloader"); downloaderEngine = new FileDownloaderEngine(configuration); this.configuration = configuration; }else{ mLogger.warn("Try to initialize FileDownloader which had already been initialized before."); } } public boolean download(String url, String filename){ return downloaderEngine.download(url, filename); } public boolean isInited() { return configuration != null; } public void destroy() { if(downloaderEngine!=null){ downloaderEngine.close(); downloaderEngine = null; } } } com.ricky.common.java.download.config.FileDownloaderConfiguration package com.ricky.common.java.download.config; import java.io.File; public class FileDownloaderConfiguration { private final int connectTimeout; private final int socketTimeout; private final int maxRetryCount; private final int coreThreadNum; private final long requestBytesSize; private final File downloadDestinationDir; private FileDownloaderConfiguration(Builder builder) { this.connectTimeout = builder.connectTimeout; this.socketTimeout = builder.socketTimeout; this.maxRetryCount = builder.maxRetryCount; this.coreThreadNum = builder.coreThreadNum; this.requestBytesSize = builder.requestBytesSize; this.downloadDestinationDir = builder.downloadDestinationDir; } public int getConnectTimeout() { return connectTimeout; } public int getSocketTimeout() { return socketTimeout; } public int getMaxRetryCount() { return maxRetryCount; } public int getCoreThreadNum() { return coreThreadNum; } public long getRequestBytesSize() { return requestBytesSize; } public File getDownloadDestinationDir() { return downloadDestinationDir; } public static FileDownloaderConfiguration.Builder custom() { return new Builder(); } public static class Builder { private int connectTimeout; private int socketTimeout; private int maxRetryCount; private int coreThreadNum; private long requestBytesSize; private File downloadDestinationDir; public Builder connectTimeout(int connectTimeout) { this.connectTimeout = connectTimeout; return this; } public Builder socketTimeout(int socketTimeout) { this.socketTimeout = socketTimeout; return this; } public Builder coreThreadNum(int coreThreadNum) { this.coreThreadNum = coreThreadNum; return this; } public Builder maxRetryCount(int maxRetryCount) { this.maxRetryCount = maxRetryCount; return this; } public Builder requestBytesSize(long requestBytesSize) { this.requestBytesSize = requestBytesSize; return this; } public Builder downloadDestinationDir(File downloadDestinationDir) { this.downloadDestinationDir = downloadDestinationDir; return this; } public FileDownloaderConfiguration build() { initDefaultValue(this); return new FileDownloaderConfiguration(this); } private void initDefaultValue(Builder builder) { if(builder.connectTimeout<1){ builder.connectTimeout = 6*1000; } if(builder.socketTimeout<1){ builder.socketTimeout = 6*1000; } if(builder.maxRetryCount<1){ builder.maxRetryCount = 1; } if(builder.coreThreadNum<1){ builder.coreThreadNum = 3; } if(builder.requestBytesSize<1){ builder.requestBytesSize = 1024*128; } if(builder.downloadDestinationDir==null){ builder.downloadDestinationDir = new File("./"); } } } } com.ricky.common.java.download.FileDownloaderEngine package com.ricky.common.java.download; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.util.BitSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.log4j.Logger; import com.ricky.common.java.download.config.FileDownloaderConfiguration; import com.ricky.common.java.download.job.DownloadWorker; import com.ricky.common.java.download.job.Worker.DownloadListener; public class FileDownloaderEngine { protected Logger mLogger = Logger.getLogger("devLog"); private FileDownloaderConfiguration configuration; private ExecutorService pool; private HttpRequestImpl httpRequestImpl; private File downloadDestinationDir; private int coreThreadNum; public FileDownloaderEngine(FileDownloaderConfiguration configuration){ this.configuration = configuration; this.coreThreadNum = configuration.getCoreThreadNum(); this.httpRequestImpl = new HttpRequestImpl(this.configuration); this.pool = Executors.newFixedThreadPool(this.configuration.getCoreThreadNum()); this.downloadDestinationDir = this.configuration.getDownloadDestinationDir(); if(!this.downloadDestinationDir.exists()){ this.downloadDestinationDir.mkdirs(); } } public boolean download(String url, String filename){ long start_time = System.currentTimeMillis(); mLogger.info("开始下载,url:"+url+",filename:"+filename); long total_file_len = httpRequestImpl.getFileSize(url); // 获取文件长度 if(total_file_len<1){ mLogger.warn("获取文件大小失败,url:"+url+",filename:"+filename); return false; } final BitSet downloadIndicatorBitSet = new BitSet(coreThreadNum); //标记每个线程下载是否成功 File file = null; try { file = new File(downloadDestinationDir, filename); RandomAccessFile raf = new RandomAccessFile(file, "rws"); // 在本地创建一个和服务端大小相同的文件 raf.setLength(total_file_len); // 设置文件的大小 raf.close(); mLogger.info("create new file:"+file); } catch (FileNotFoundException e) { mLogger.error("create new file error", e); } catch (IOException e) { mLogger.error("create new file error", e); } if(file==null || !file.exists()){ mLogger.warn("创建文件失败,url:"+url+",filename:"+filename); return false; } long thread_download_len = (total_file_len + coreThreadNum - 1) / coreThreadNum; // 计算每个线程要下载的长度 mLogger.info("filename:"+filename+",total_file_len="+total_file_len+",coreThreadNum:"+coreThreadNum+",thread_download_len:"+thread_download_len); CountDownLatch latch = new CountDownLatch(coreThreadNum);//两个工人的协作 for (int i = 0; i < coreThreadNum; i++){ DownloadWorker worker = new DownloadWorker(i, url, thread_download_len, file, httpRequestImpl, latch); worker.addListener(new DownloadListener() { @Override public void notify(int thread_id, String url, long start, long end, boolean result, String msg) { mLogger.info("thread_id:"+thread_id+" download result:"+result+",url->"+url); modifyState(downloadIndicatorBitSet, thread_id); } }); pool.execute(worker); } try { latch.await(); } catch (InterruptedException e) { mLogger.error("CountDownLatch Interrupt", e); } mLogger.info("下载结束,url:"+url+",耗时:"+((System.currentTimeMillis()-start_time)/1000)+"(s)"); return downloadIndicatorBitSet.cardinality()==coreThreadNum; } private synchronized void modifyState(BitSet bitSet, int index){ bitSet.set(index); } /**释放资源*/ public void close(){ if(httpRequestImpl!=null){ httpRequestImpl.close(); httpRequestImpl = null; } if(pool!=null){ pool.shutdown(); pool = null; } } } com.ricky.common.java.download.job.DownloadWorker package com.ricky.common.java.download.job; import java.io.File; import java.util.concurrent.CountDownLatch; import org.apache.log4j.Logger; import com.ricky.common.java.download.HttpRequestImpl; import com.ricky.common.java.download.RetryFailedException; public class DownloadWorker extends Worker { protected Logger mLogger = Logger.getLogger("devLog"); private int id; private String url; private File file; private long thread_download_len; private CountDownLatch latch; private HttpRequestImpl httpRequestImpl; public DownloadWorker(int id, String url, long thread_download_len, File file, HttpRequestImpl httpRequestImpl, CountDownLatch latch) { this.id = id; this.url = url; this.thread_download_len = thread_download_len; this.file = file; this.httpRequestImpl = httpRequestImpl; this.latch = latch; } @Override public void run() { long start = id * thread_download_len; // 起始位置 long end = id * thread_download_len + thread_download_len - 1; // 结束位置 mLogger.info("线程:" + id +" 开始下载 url:"+url+ ",range:" + start + "-" + end); boolean result = false; try { httpRequestImpl.downloadPartFile(id, url, file, start, end); result = true; mLogger.info("线程:" + id + " 下载 "+url+ " range[" + start + "-" + end+"] 成功"); } catch (RetryFailedException e) { mLogger.error("线程:" + id +" 重试出错", e); }catch (Exception e) { mLogger.error("线程:" + id +" 下载出错", e); } if(listener!=null){ mLogger.info("notify FileDownloaderEngine download result"); listener.notify(id, url, start, end, result, ""); } latch.countDown(); } } com.ricky.common.java.download.job.Worker package com.ricky.common.java.download.job; public abstract class Worker implements Runnable { protected DownloadListener listener; public void addListener(DownloadListener listener){ this.listener = listener; } public interface DownloadListener{ public void notify(int thread_id, String url, long start, long end, boolean result, String msg); } } com.ricky.common.java.download.HttpRequestImpl package com.ricky.common.java.download; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import org.apache.commons.io.IOUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.log4j.Logger; import com.ricky.common.java.download.config.FileDownloaderConfiguration; import com.ricky.common.java.http.HttpClientManager; public class HttpRequestImpl { protected Logger mLogger = Logger.getLogger("devLog"); private int connectTimeout; private int socketTimeout; private int maxRetryCount; private long requestBytesSize; private CloseableHttpClient httpclient = HttpClientManager.getHttpClient(); public HttpRequestImpl(FileDownloaderConfiguration configuration){ connectTimeout = configuration.getConnectTimeout(); socketTimeout = configuration.getSocketTimeout(); maxRetryCount = configuration.getMaxRetryCount(); requestBytesSize = configuration.getRequestBytesSize(); } public void downloadPartFile(int id, String url, File file, long start, long end){ RandomAccessFile raf = null; try { raf = new RandomAccessFile(file, "rws"); } catch (FileNotFoundException e) { mLogger.error("file not found:"+file, e); throw new IllegalArgumentException(e); } int retry = 0; long pos = start; while(pos<end){ long end_index = pos + requestBytesSize; if(end_index>end){ end_index = end; } boolean success = false; try { success = requestByRange(url, raf, pos, end_index); } catch (ClientProtocolException e) { mLogger.error("download error,start:"+pos+",end:"+end_index, e); }catch (IOException e) { mLogger.error("download error,start:"+pos+",end:"+end_index, e); }catch (Exception e) { mLogger.error("download error,start:"+pos+",end:"+end_index, e); } // mLogger.info("线程:" + id +",download url:"+url+",range:"+ pos + "-" + end_index+",success="+success ); if(success){ pos += requestBytesSize; retry = 0; }else{ if(retry < maxRetryCount){ retry++; mLogger.warn("线程:" + id +",url:"+url+",range:"+pos+","+end_index+" 下载失败,重试"+retry+"次"); }else{ mLogger.warn("线程:" + id +",url:"+url+",range:"+pos+","+end_index+" 下载失败,放弃重试!"); throw new RetryFailedException("超过最大重试次数"); } } } } private boolean requestByRange(String url, RandomAccessFile raf, long start, long end) throws ClientProtocolException, IOException { HttpGet httpget = new HttpGet(url); httpget.setHeader("User-Agent", "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.152 Safari/537.36"); httpget.setHeader("Range", "bytes=" + start + "-" + end); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(connectTimeout) .setSocketTimeout(socketTimeout) .build(); httpget.setConfig(requestConfig); CloseableHttpResponse response = null; try { response = httpclient.execute(httpget); int code = response.getStatusLine().getStatusCode(); if(code==HttpStatus.SC_OK || code== HttpStatus.SC_PARTIAL_CONTENT){ HttpEntity entity = response.getEntity(); if (entity != null) { InputStream in = entity.getContent(); raf.seek(start);// 设置保存数据的位置 byte[] buffer = new byte[1024]; int len; while ((len = in.read(buffer)) != -1){ raf.write(buffer, 0, len); } return true; }else{ mLogger.warn("response entity is null,url:"+url); } }else{ mLogger.warn("response error, code="+code+",url:"+url); } }finally { IOUtils.closeQuietly(response); } return false; } public long getFileSize(String url){ int retry = 0; long filesize = 0; while(retry<maxRetryCount){ try { filesize = getContentLength(url); } catch (Exception e) { mLogger.error("get File Size error", e); } if(filesize>0){ break; }else{ retry++; mLogger.warn("get File Size failed,retry:"+retry); } } return filesize; } private long getContentLength(String url) throws ClientProtocolException, IOException{ HttpGet httpget = new HttpGet(url); httpget.setHeader("User-Agent", "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.152 Safari/537.36"); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(connectTimeout) .setSocketTimeout(socketTimeout) .build(); httpget.setConfig(requestConfig); CloseableHttpResponse response = null; try { response = httpclient.execute(httpget); int code = response.getStatusLine().getStatusCode(); if(code==HttpStatus.SC_OK){ HttpEntity entity = response.getEntity(); if (entity != null) { return entity.getContentLength(); } }else{ mLogger.warn("response code="+code); } }finally { IOUtils.closeQuietly(response); } return -1; } public void close(){ if(httpclient!=null){ try { httpclient.close(); } catch (IOException e) { e.printStackTrace(); } httpclient = null; } } }
最后是客户端调用代码
package com.ricky.common.java; import java.io.File; import com.ricky.common.java.download.FileDownloader; import com.ricky.common.java.download.config.FileDownloaderConfiguration; public class FileDownloaderTest { public static void main(String[] args) { FileDownloader fileDownloader = FileDownloader.getInstance(); FileDownloaderConfiguration configuration = FileDownloaderConfiguration .custom() .coreThreadNum(5) .downloadDestinationDir(new File("D:/Download")) .build(); fileDownloader.init(configuration); String url = "http://dldir1.qq.com/qqfile/qq/QQ7.9/16621/QQ7.9.exe";; String filename = "QQ7.9.exe"; boolean result = fileDownloader.download(url, filename); System.out.println("download result:"+result); fileDownloader.destroy(); //close it when you not need } }
-
Java 多线程下载技术实现
2016-01-07 16:54:47多线程下载多线程下载技术,简单的说就是把要下载的文件分成几块,由不同的线程来负责每一块数据的下载任务。技术要点 RandomAccessFile: Java中用来实现随机访问文件的类 http Range请求头 具体思路1、文件分块。...多线程下载
多线程下载技术,简单的说就是把要下载的文件分成几块,由不同的线程来负责每一块数据的下载任务。
技术要点
- RandomAccessFile:
Java中用来实现随机访问文件的类 - http Range请求头
具体思路
1、文件分块。 文件分块大小(blockSize)= (文件大小 +线程数 - 1 )/ 线程数 ;
2、确定每一个线程所要下载的 文件的起始和结束位置。
现假设为每个线程分别编号:0,1, 2,3;则
第一个线程负责的下载位置是: 0*blockSize - (0+1)*blockSize -1,
第二个线程负责的下载位置是: 1*blockSize - (1+1)*blockSize -1,
以此类推第i个线程负责的下载位置是:i*blockSize - (i+1)*blockSize -1;
即线程(编号为id)下载开始位置 start = id*block;
即线程(编号为id)下载结束位置 end = (id+1)*block -1;
3、设置http 请求头, conn.setRequestProperty(“Range”, “bytes=” + start + “-” + end);代码实现
一个简单的Java多线程下载代码如下:
package com.ricky.java.test.download; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import java.net.HttpURLConnection; import java.net.URL; public class Downloader { private URL url; // 目标地址 private File file; // 本地文件 private static final int THREAD_AMOUNT = 8; // 线程数 private static final String DOWNLOAD_DIR_PATH = "D:/Download"; // 下载目录 private int threadLen; // 每个线程下载多少 public Downloader(String address, String filename) throws IOException { // 通过构造函数传入下载地址 url = new URL(address); File dir = new File(DOWNLOAD_DIR_PATH); if(!dir.exists()){ dir.mkdirs(); } file = new File(dir, filename); } public void download() throws IOException { HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setConnectTimeout(5000); int totalLen = conn.getContentLength(); // 获取文件长度 threadLen = (totalLen + THREAD_AMOUNT - 1) / THREAD_AMOUNT; // 计算每个线程要下载的长度 System.out.println("totalLen="+totalLen+",threadLen:"+threadLen); RandomAccessFile raf = new RandomAccessFile(file, "rws"); // 在本地创建一个和服务端大小相同的文件 raf.setLength(totalLen); // 设置文件的大小 raf.close(); for (int i = 0; i < THREAD_AMOUNT; i++) // 开启3条线程, 每个线程下载一部分数据到本地文件中 new DownloadThread(i).start(); } private class DownloadThread extends Thread { private int id; public DownloadThread(int id) { this.id = id; } public void run() { int start = id * threadLen; // 起始位置 int end = id * threadLen + threadLen - 1; // 结束位置 System.out.println("线程" + id + ": " + start + "-" + end); try { HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setConnectTimeout(5000); conn.setRequestProperty("Range", "bytes=" + start + "-" + end); // 设置当前线程下载的范围 InputStream in = conn.getInputStream(); RandomAccessFile raf = new RandomAccessFile(file, "rws"); raf.seek(start); // 设置保存数据的位置 byte[] buffer = new byte[1024]; int len; while ((len = in.read(buffer)) != -1) raf.write(buffer, 0, len); raf.close(); System.out.println("线程" + id + "下载完毕"); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { String address = "http://dldir1.qq.com/qqfile/qq/QQ7.9/16621/QQ7.9.exe"; new Downloader(address, "QQ7.9.exe").download(); // String address = "http://api.t.dianping.com/n/api.xml?cityId=2"; // new Downloader(address, "2.xml").download(); } }
封装多线程下载
文件下载是一个常用的模块,我们可以对其封装一下,方便以后调用。涉及到的开发技术如下:
- JDK 1.7
- Eclipse Juno
- Maven 3
- HttpClient 4.3.6
工程目录结构如下所示:
com.ricky.common.java.download.FileDownloader
package com.ricky.common.java.download; import org.apache.log4j.Logger; import com.ricky.common.java.download.config.FileDownloaderConfiguration; /** * Java 文件多线程下载 * @author Ricky Fung * */ public class FileDownloader { protected Logger mLogger = Logger.getLogger("devLog"); private volatile static FileDownloader fileDownloader; private FileDownloaderEngine downloaderEngine; private FileDownloaderConfiguration configuration; public static FileDownloader getInstance(){ if(fileDownloader==null){ synchronized (FileDownloader.class) { if(fileDownloader==null){ fileDownloader = new FileDownloader(); } } } return fileDownloader; } protected FileDownloader(){ } public synchronized void init(FileDownloaderConfiguration configuration){ if (configuration == null) { throw new IllegalArgumentException("FileDownloader configuration can not be initialized with null"); } if (this.configuration == null) { mLogger.info("init FileDownloader"); downloaderEngine = new FileDownloaderEngine(configuration); this.configuration = configuration; }else{ mLogger.warn("Try to initialize FileDownloader which had already been initialized before."); } } public boolean download(String url, String filename){ return downloaderEngine.download(url, filename); } public boolean isInited() { return configuration != null; } public void destroy() { if(downloaderEngine!=null){ downloaderEngine.close(); downloaderEngine = null; } } }
com.ricky.common.java.download.config.FileDownloaderConfiguration
package com.ricky.common.java.download.config; import java.io.File; public class FileDownloaderConfiguration { private final int connectTimeout; private final int socketTimeout; private final int maxRetryCount; private final int coreThreadNum; private final long requestBytesSize; private final File downloadDestinationDir; private FileDownloaderConfiguration(Builder builder) { this.connectTimeout = builder.connectTimeout; this.socketTimeout = builder.socketTimeout; this.maxRetryCount = builder.maxRetryCount; this.coreThreadNum = builder.coreThreadNum; this.requestBytesSize = builder.requestBytesSize; this.downloadDestinationDir = builder.downloadDestinationDir; } public int getConnectTimeout() { return connectTimeout; } public int getSocketTimeout() { return socketTimeout; } public int getMaxRetryCount() { return maxRetryCount; } public int getCoreThreadNum() { return coreThreadNum; } public long getRequestBytesSize() { return requestBytesSize; } public File getDownloadDestinationDir() { return downloadDestinationDir; } public static FileDownloaderConfiguration.Builder custom() { return new Builder(); } public static class Builder { private int connectTimeout; private int socketTimeout; private int maxRetryCount; private int coreThreadNum; private long requestBytesSize; private File downloadDestinationDir; public Builder connectTimeout(int connectTimeout) { this.connectTimeout = connectTimeout; return this; } public Builder socketTimeout(int socketTimeout) { this.socketTimeout = socketTimeout; return this; } public Builder coreThreadNum(int coreThreadNum) { this.coreThreadNum = coreThreadNum; return this; } public Builder maxRetryCount(int maxRetryCount) { this.maxRetryCount = maxRetryCount; return this; } public Builder requestBytesSize(long requestBytesSize) { this.requestBytesSize = requestBytesSize; return this; } public Builder downloadDestinationDir(File downloadDestinationDir) { this.downloadDestinationDir = downloadDestinationDir; return this; } public FileDownloaderConfiguration build() { initDefaultValue(this); return new FileDownloaderConfiguration(this); } private void initDefaultValue(Builder builder) { if(builder.connectTimeout<1){ builder.connectTimeout = 6*1000; } if(builder.socketTimeout<1){ builder.socketTimeout = 6*1000; } if(builder.maxRetryCount<1){ builder.maxRetryCount = 1; } if(builder.coreThreadNum<1){ builder.coreThreadNum = 3; } if(builder.requestBytesSize<1){ builder.requestBytesSize = 1024*128; } if(builder.downloadDestinationDir==null){ builder.downloadDestinationDir = new File("./"); } } } }
com.ricky.common.java.download.FileDownloaderEngine
package com.ricky.common.java.download; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.util.BitSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.log4j.Logger; import com.ricky.common.java.download.config.FileDownloaderConfiguration; import com.ricky.common.java.download.job.DownloadWorker; import com.ricky.common.java.download.job.Worker.DownloadListener; public class FileDownloaderEngine { protected Logger mLogger = Logger.getLogger("devLog"); private FileDownloaderConfiguration configuration; private ExecutorService pool; private HttpRequestImpl httpRequestImpl; private File downloadDestinationDir; private int coreThreadNum; public FileDownloaderEngine(FileDownloaderConfiguration configuration){ this.configuration = configuration; this.coreThreadNum = configuration.getCoreThreadNum(); this.httpRequestImpl = new HttpRequestImpl(this.configuration); this.pool = Executors.newFixedThreadPool(this.configuration.getCoreThreadNum()); this.downloadDestinationDir = this.configuration.getDownloadDestinationDir(); if(!this.downloadDestinationDir.exists()){ this.downloadDestinationDir.mkdirs(); } } public boolean download(String url, String filename){ long start_time = System.currentTimeMillis(); mLogger.info("开始下载,url:"+url+",filename:"+filename); long total_file_len = httpRequestImpl.getFileSize(url); // 获取文件长度 if(total_file_len<1){ mLogger.warn("获取文件大小失败,url:"+url+",filename:"+filename); return false; } final BitSet downloadIndicatorBitSet = new BitSet(coreThreadNum); //标记每个线程下载是否成功 File file = null; try { file = new File(downloadDestinationDir, filename); RandomAccessFile raf = new RandomAccessFile(file, "rws"); // 在本地创建一个和服务端大小相同的文件 raf.setLength(total_file_len); // 设置文件的大小 raf.close(); mLogger.info("create new file:"+file); } catch (FileNotFoundException e) { mLogger.error("create new file error", e); } catch (IOException e) { mLogger.error("create new file error", e); } if(file==null || !file.exists()){ mLogger.warn("创建文件失败,url:"+url+",filename:"+filename); return false; } long thread_download_len = (total_file_len + coreThreadNum - 1) / coreThreadNum; // 计算每个线程要下载的长度 mLogger.info("filename:"+filename+",total_file_len="+total_file_len+",coreThreadNum:"+coreThreadNum+",thread_download_len:"+thread_download_len); CountDownLatch latch = new CountDownLatch(coreThreadNum);//两个工人的协作 for (int i = 0; i < coreThreadNum; i++){ DownloadWorker worker = new DownloadWorker(i, url, thread_download_len, file, httpRequestImpl, latch); worker.addListener(new DownloadListener() { @Override public void notify(int thread_id, String url, long start, long end, boolean result, String msg) { mLogger.info("thread_id:"+thread_id+" download result:"+result+",url->"+url); modifyState(downloadIndicatorBitSet, thread_id); } }); pool.execute(worker); } try { latch.await(); } catch (InterruptedException e) { mLogger.error("CountDownLatch Interrupt", e); } mLogger.info("下载结束,url:"+url+",耗时:"+((System.currentTimeMillis()-start_time)/1000)+"(s)"); return downloadIndicatorBitSet.cardinality()==coreThreadNum; } private synchronized void modifyState(BitSet bitSet, int index){ bitSet.set(index); } /**释放资源*/ public void close(){ if(httpRequestImpl!=null){ httpRequestImpl.close(); httpRequestImpl = null; } if(pool!=null){ pool.shutdown(); pool = null; } } }
com.ricky.common.java.download.job.DownloadWorker
package com.ricky.common.java.download.job; import java.io.File; import java.util.concurrent.CountDownLatch; import org.apache.log4j.Logger; import com.ricky.common.java.download.HttpRequestImpl; import com.ricky.common.java.download.RetryFailedException; public class DownloadWorker extends Worker { protected Logger mLogger = Logger.getLogger("devLog"); private int id; private String url; private File file; private long thread_download_len; private CountDownLatch latch; private HttpRequestImpl httpRequestImpl; public DownloadWorker(int id, String url, long thread_download_len, File file, HttpRequestImpl httpRequestImpl, CountDownLatch latch) { this.id = id; this.url = url; this.thread_download_len = thread_download_len; this.file = file; this.httpRequestImpl = httpRequestImpl; this.latch = latch; } @Override public void run() { long start = id * thread_download_len; // 起始位置 long end = id * thread_download_len + thread_download_len - 1; // 结束位置 mLogger.info("线程:" + id +" 开始下载 url:"+url+ ",range:" + start + "-" + end); boolean result = false; try { httpRequestImpl.downloadPartFile(id, url, file, start, end); result = true; mLogger.info("线程:" + id + " 下载 "+url+ " range[" + start + "-" + end+"] 成功"); } catch (RetryFailedException e) { mLogger.error("线程:" + id +" 重试出错", e); }catch (Exception e) { mLogger.error("线程:" + id +" 下载出错", e); } if(listener!=null){ mLogger.info("notify FileDownloaderEngine download result"); listener.notify(id, url, start, end, result, ""); } latch.countDown(); } }
com.ricky.common.java.download.job.Worker
package com.ricky.common.java.download.job; public abstract class Worker implements Runnable { protected DownloadListener listener; public void addListener(DownloadListener listener){ this.listener = listener; } public interface DownloadListener{ public void notify(int thread_id, String url, long start, long end, boolean result, String msg); } }
com.ricky.common.java.download.HttpRequestImpl
package com.ricky.common.java.download; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import org.apache.commons.io.IOUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.log4j.Logger; import com.ricky.common.java.download.config.FileDownloaderConfiguration; import com.ricky.common.java.http.HttpClientManager; public class HttpRequestImpl { protected Logger mLogger = Logger.getLogger("devLog"); private int connectTimeout; private int socketTimeout; private int maxRetryCount; private long requestBytesSize; private CloseableHttpClient httpclient = HttpClientManager.getHttpClient(); public HttpRequestImpl(FileDownloaderConfiguration configuration){ connectTimeout = configuration.getConnectTimeout(); socketTimeout = configuration.getSocketTimeout(); maxRetryCount = configuration.getMaxRetryCount(); requestBytesSize = configuration.getRequestBytesSize(); } public void downloadPartFile(int id, String url, File file, long start, long end){ RandomAccessFile raf = null; try { raf = new RandomAccessFile(file, "rws"); } catch (FileNotFoundException e) { mLogger.error("file not found:"+file, e); throw new IllegalArgumentException(e); } int retry = 0; long pos = start; while(pos<end){ long end_index = pos + requestBytesSize; if(end_index>end){ end_index = end; } boolean success = false; try { success = requestByRange(url, raf, pos, end_index); } catch (ClientProtocolException e) { mLogger.error("download error,start:"+pos+",end:"+end_index, e); }catch (IOException e) { mLogger.error("download error,start:"+pos+",end:"+end_index, e); }catch (Exception e) { mLogger.error("download error,start:"+pos+",end:"+end_index, e); } // mLogger.info("线程:" + id +",download url:"+url+",range:"+ pos + "-" + end_index+",success="+success ); if(success){ pos += requestBytesSize; retry = 0; }else{ if(retry < maxRetryCount){ retry++; mLogger.warn("线程:" + id +",url:"+url+",range:"+pos+","+end_index+" 下载失败,重试"+retry+"次"); }else{ mLogger.warn("线程:" + id +",url:"+url+",range:"+pos+","+end_index+" 下载失败,放弃重试!"); throw new RetryFailedException("超过最大重试次数"); } } } } private boolean requestByRange(String url, RandomAccessFile raf, long start, long end) throws ClientProtocolException, IOException { HttpGet httpget = new HttpGet(url); httpget.setHeader("User-Agent", "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.152 Safari/537.36"); httpget.setHeader("Range", "bytes=" + start + "-" + end); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(connectTimeout) .setSocketTimeout(socketTimeout) .build(); httpget.setConfig(requestConfig); CloseableHttpResponse response = null; try { response = httpclient.execute(httpget); int code = response.getStatusLine().getStatusCode(); if(code==HttpStatus.SC_OK || code== HttpStatus.SC_PARTIAL_CONTENT){ HttpEntity entity = response.getEntity(); if (entity != null) { InputStream in = entity.getContent(); raf.seek(start);// 设置保存数据的位置 byte[] buffer = new byte[1024]; int len; while ((len = in.read(buffer)) != -1){ raf.write(buffer, 0, len); } return true; }else{ mLogger.warn("response entity is null,url:"+url); } }else{ mLogger.warn("response error, code="+code+",url:"+url); } }finally { IOUtils.closeQuietly(response); } return false; } public long getFileSize(String url){ int retry = 0; long filesize = 0; while(retry<maxRetryCount){ try { filesize = getContentLength(url); } catch (Exception e) { mLogger.error("get File Size error", e); } if(filesize>0){ break; }else{ retry++; mLogger.warn("get File Size failed,retry:"+retry); } } return filesize; } private long getContentLength(String url) throws ClientProtocolException, IOException{ HttpGet httpget = new HttpGet(url); httpget.setHeader("User-Agent", "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.152 Safari/537.36"); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(connectTimeout) .setSocketTimeout(socketTimeout) .build(); httpget.setConfig(requestConfig); CloseableHttpResponse response = null; try { response = httpclient.execute(httpget); int code = response.getStatusLine().getStatusCode(); if(code==HttpStatus.SC_OK){ HttpEntity entity = response.getEntity(); if (entity != null) { return entity.getContentLength(); } }else{ mLogger.warn("response code="+code); } }finally { IOUtils.closeQuietly(response); } return -1; } public void close(){ if(httpclient!=null){ try { httpclient.close(); } catch (IOException e) { e.printStackTrace(); } httpclient = null; } } }
最后是客户端调用代码
package com.ricky.common.java; import java.io.File; import com.ricky.common.java.download.FileDownloader; import com.ricky.common.java.download.config.FileDownloaderConfiguration; public class FileDownloaderTest { public static void main(String[] args) { FileDownloader fileDownloader = FileDownloader.getInstance(); FileDownloaderConfiguration configuration = FileDownloaderConfiguration .custom() .coreThreadNum(5) .downloadDestinationDir(new File("D:/Download")) .build(); fileDownloader.init(configuration); String url = "http://dldir1.qq.com/qqfile/qq/QQ7.9/16621/QQ7.9.exe";; String filename = "QQ7.9.exe"; boolean result = fileDownloader.download(url, filename); System.out.println("download result:"+result); fileDownloader.destroy(); //close it when you not need } }
源代码
- RandomAccessFile:
-
多线程下载技术论文.rar
2019-07-09 09:26:26具体研究内容:下载功能实现,包括单线程下载功能,多线程下载功能,多任务下载功能,删除任务的实现。断点续传等功能包括,下载过程中,暂停下载,承接上次未完成的下载任务。 关键词:多线程;线程安全;断点续传... -
java实现多线程下载技术
2013-03-11 22:11:32多线程下载技术,简单的说就是把要下载的文件分成几块,由不同的线程来负责每一块数据的下载任务。 要使用一个随机访问文件的类:RandomAccessFile类,具体用法请参考:... -
使用Java实现多线程下载技术
2020-07-21 09:39:38下载类: package com.gyx; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import java.net.*; public class Download extends Thread{ ... -
java多线程下载
2017-07-30 21:23:40多线程下载技术是很常见的一种下载方案,这种方式充分利用了多线程的优势,在同一时间段内通过多个线程发起下载请求,将需要下载的数据分割成多个部分,每一个线程只负责下载其中一个部分,然后将下载后的数据... -
Aria2多线程下载工具
2020-02-29 08:22:09Aria2可以静默运行在后台,下载资源的时候一键呼出,利用多线程下载技术,加快大文件的下载速度.下载速度 快的飞起! -
java多线程下载源码_Java多线程文件分片下载实现的示例代码
2021-02-13 02:15:41多线程下载介绍多线程下载技术是很常见的一种下载方案,这种方式充分利用了多线程的优势,在同一时间段内通过多个线程发起下载请求,将需要下载的数据分割成多个部分,每一个线程只负责下载其中一个部分,然后将下载... -
多线程下载
2009-08-11 10:50:20网络蚂蚁、FlashGet、迅雷等支持HTTP协议的下载软件无一例外地使用了多线程下载技术。比起单线程下载,多线程下载在同一时间段内发出多个下载请求,每个下载请求负责下载一段内容,充分地利用了网络带宽。 当然多... -
多线程下载文件
2011-07-13 14:11:09网络蚂蚁、FlashGet、迅雷等支持HTTP协议的下载软件无一例外地使用了多线程下载技术。比起单线程下载,多线程下载在同一时间段内发出多个下载请求,每个下载请求负责下载一段内容,充分地利用了网络带宽。 当然多... -
下载 多线程编程技术开发资料 高清完整PDF版
2017-05-16 20:01:41多线程编程技术开发资料.pdf 个人收集电子书,仅用学习使用,不可用于商业用途,如有版权问题,请联系删除! -
Android 多线程断点下载技术分享
2014-11-29 17:41:301、多线程下载的原理分析 2、网络编程初步 3、文件IO高级编程技术 4、多线程断点续传原理分析及设计 5、多线程断点续传Java版本的实现及优化 6、Android版本的多线程断点续传的实现与优化 7、... -
第八部分 多线程下载实战
2011-01-18 15:44:00实战多线程下载 现在很多下载软件像迅雷,旋风等都使用了多线程下载技术。比起单线程下载多线程下载在同一时间段内发出多个下载请求,每个下载请求负责下载一段内容,充分利用了网络宽带。 <br />因此本文... -
多线程技术下载文件
2008-06-27 15:04:00由于网络传输存在着传输速度... 该实例应用JAVA多线程技术,将一个网络文件分为若干块,每一个线程负责一块数据的下载,下载完毕后将其保存在指定的磁盘路径中. 首先创建继承Thread类的传输文件线程类,其JAVA文件名为SiteF -
IOS网络、多线程、shareSDK-使用EDG多线程技术下载图片
2018-11-22 17:44:05使用EDG中央调度多线程技术实现图片的异步下载 // // ViewController.swift // Dome2test // // Created by 郭文亮 on 2018/11/22. // Copyright © 2018年 finalliang. All rights reserved. // import ... -
java 实现多线程下载(1)
2012-10-08 15:05:18网络蚂蚁、flashget、迅雷等支持HTTP协议的下载软件无一例外地使用了多线程下载技术。比起单线程下载,多线程下载在同一时间段内发出多个下载请求,每个下载请求负责下载一段内存,充分地利用了网络带宽。当然多线程...
-
Query on A Tree(可持续01线段树+dfs序)
-
MYSQL长字符截断
-
转行做IT-第15章 Collection、泛型、Iterator
-
NFC-host-card-emulation-Android-master.zip
-
DT开源博客第一版.mp4
-
JavaScript数据结构——数组
-
牛牛量化策略交易
-
MySQL 高可用工具 heartbeat 实战部署详解
-
嵘泰股份首次公开发行股票招股说明书.pdf
-
线程状态
-
弘阳地产高管再变动:曾俊凯进、张良等人退,对千亿规模避而不提
-
MySQL 多平台多模式(安装、配置和连接 详解)
-
linux系统下文件夹没有权限
-
基于java实现的c++动态链接库调用案例
-
2021-03-03
-
UVA839 天平 Not so Mobile
-
用研转岗规划——案例2则
-
SpringBoot 2.3.x整合Swagger3.x接口文档+Knife4jUI
-
Java IF的多选择和嵌套结构 -04天 学习笔记
-
matlab即将消失的inline小朋友