  • How to implement multi-threaded upload of large files in ASP.NET

    1000+ reads · 2012-07-17 11:25:14
    StorageWebPlug (the Xinying Network upload plugin) is recommended. It is an ActiveX control, but it ships with a JavaScript class library that makes it easy to call.

    Code example:
    JScript code

    var upFile = new FileUploader(1);            // create an upload task with file ID 1
    upFile.ATL.UserName = "test";                // FTP server user name
    upFile.ATL.UserPass = "test";                // FTP server password
    upFile.ATL.IP = "192.168.1.1";               // FTP server IP address
    upFile.ATL.Passive = false;                  // transfer mode (false = active, not passive)
    upFile.ATL.LocalFileName = "C:\\myfile.txt"; // local file to upload
    upFile.ATL.RemoteFileName = "myfile.txt";    // target file name
    upFile.ATL.RemoteFolder = "";                // remote folder for uploaded files; "" = root directory
    // These event handlers must be set, or the upload task will not run.
    upFile.ATL.OnComplete = function() { }
    upFile.ATL.OnPost = function() { }
    upFile.ATL.OnStop = function() { }
    upFile.ATL.OnError = function() { }
    upFile.ATL.OnSuspend = function() { }
    upFile.ATL.OnContinue = function() { }
    upFile.Post();                               // start the upload




    Details: http://www.ncmem.com/webplug/features/index.aspx
    Forum: http://www.5gzl.com/bbs/

    http://topic.csdn.net/u/20100714/20/adccf997-87a1-4f1f-9042-74bb34957622.html

  • jQuery + PHP code for chunked, multi-threaded large-file upload, shared here

    I recently studied large-file upload over HTTP.

    After repeated fixes and tuning, the following features work:
    1. Upload large files (in theory GB and above; speed depends on bandwidth);
    2. Upload multiple files at the same time;
    3. Resume interrupted uploads;
    4. Upload callbacks that report speed and progress;
    5. Multi-threaded upload, managed by a thread pool;
    6. On failure the upload state is saved, so the upload can continue next time;
    ...

    The code is pasted directly below; if you have questions, please leave a comment and join the discussion.

    There are three core classes: ResumableUploadUtil, UpLoadFileInfo, and UploadTimerTask.

    ResumableUploadUtil implements the core upload logic:

    package upload;
    
    import android.os.Handler;
    
    import upload.UpLoadFileInfo;
    import upload.ConstantValue;
    import upload.MyApplicationLike;
    import upload.ActivityStack;
    import upload.Logger;
    import upload.StringUtil;
    
    import org.json.JSONObject;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.io.RandomAccessFile;
    import java.io.UnsupportedEncodingException;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.HashMap;
    import java.util.concurrent.Executors;
    
    
    /**
     * Created by LiKang on 2016/12/22 15:15.
     * email:15034671952@163.com
     */
    public class ResumableUploadUtil {
        private final String Tag = "ResumableUploadUtil";
        private final String BOUNDARY = "----androidUploadBinBoundary";
        private Handler handler;
        /**
         * Default chunk size
         */
        public final static long defaultChunkSize = 1024 * 1024 * 3 / 2;
        /**
         * Default buffer size
         */
        public final static int defaultBufferSize = 1024 * 4;
        /**
         * Default number of concurrent upload threads
         */
        public final static int defaultConcurrentThreadsNum = 3;

        private int concurrentThreadsNum = ResumableUploadUtil.defaultConcurrentThreadsNum;
    
        public ResumableUploadUtil() {
            this.handler = new Handler();
        }
    
    
        /**
         * Prepare for upload.
         */
        private void upLoadPrepare(UpLoadFileInfo fileInfo) {
            computeChunkNums(fileInfo);
            computeEachThreadChunkNum(fileInfo);
        }
    
        /**
         * Compute how many chunks the file splits into.
         */
        private void computeChunkNums(UpLoadFileInfo fileInfo) {
            long tempLastChunkSize = fileInfo.fileSize % defaultChunkSize;
            fileInfo.totalChunks = (int) (fileInfo.fileSize / defaultChunkSize);
            if (tempLastChunkSize != 0) {
                fileInfo.totalChunks += 1;
            }
            Logger.d(Tag, "totalChunks:" + fileInfo.totalChunks);
        }
    
        /**
         * Distribute the chunks across the worker threads.
         */
        private void computeEachThreadChunkNum(UpLoadFileInfo fileInfo) {
            int eachThreadChunkNum = fileInfo.totalChunks / concurrentThreadsNum; // chunks per worker thread
            int remainedChunkNum = fileInfo.totalChunks % concurrentThreadsNum;
    
            for (int threadIndex = 0; threadIndex < concurrentThreadsNum; threadIndex++) {
                HashMap<String, Integer> perThreadInfo = new HashMap<>();
                if (remainedChunkNum > threadIndex) {
                    perThreadInfo.put("eachThreadChunkNum", eachThreadChunkNum + 1);
                    perThreadInfo.put("endThreadChunkIndex", threadIndex * (eachThreadChunkNum + 1) + eachThreadChunkNum);
                    perThreadInfo.put("curThreadChunkIndex", threadIndex * (eachThreadChunkNum + 1));
                    perThreadInfo.put("startThreadChunkIndex", threadIndex * (eachThreadChunkNum + 1));
                } else {
                    perThreadInfo.put("eachThreadChunkNum", eachThreadChunkNum);
                    perThreadInfo.put("endThreadChunkIndex", threadIndex * eachThreadChunkNum + eachThreadChunkNum - 1 + remainedChunkNum);
                    perThreadInfo.put("curThreadChunkIndex", threadIndex * eachThreadChunkNum + remainedChunkNum);
                    perThreadInfo.put("startThreadChunkIndex", threadIndex * eachThreadChunkNum + remainedChunkNum);
                }
                perThreadInfo.put("threadId", threadIndex);
                fileInfo.threadInfo.add(perThreadInfo);
            }
            Logger.d(Tag, "fileInfo.threadInfo:" + fileInfo.threadInfo);
        }
    
    
        /**
         * Get the size of the given chunk (the last chunk may be smaller).
         *
         * @return
         */
        private long getCurrentChunkSize(int curThreadChunkIndex, UpLoadFileInfo fileInfo) {
            long tempLastChunkSize = fileInfo.fileSize % defaultChunkSize;
            if (tempLastChunkSize != 0) {
    
                if (curThreadChunkIndex == fileInfo.totalChunks - 1) {
                    // the last chunk
                    return tempLastChunkSize;
                } else {
                    return defaultChunkSize;
                }
    
            } else {
                return defaultChunkSize;
            }
    
        }
    
        /**
         * Recompute how many bytes have already been uploaded.
         */
        private void recomputeHasUploadSize(UpLoadFileInfo fileInfo) {
            fileInfo.hasUploadSize = 0;
            fileInfo.hasUploadSizeBeforeOneSec = 0;
            for (int threadIndex = 0; threadIndex < concurrentThreadsNum; threadIndex++) {
                HashMap<String, Integer> perThreadInfo = fileInfo.threadInfo.get(threadIndex);
                int curThreadChunkIndex = perThreadInfo.get("curThreadChunkIndex");
                int startThreadChunkIndex = perThreadInfo.get("startThreadChunkIndex");
                // Iterate over the absolute chunk indices this thread has finished,
                // so the (possibly smaller) last chunk is sized correctly.
                for (int chunkIndex = startThreadChunkIndex; chunkIndex < curThreadChunkIndex; chunkIndex++) {
                    fileInfo.hasUploadSize += getCurrentChunkSize(chunkIndex, fileInfo);
                }
            }
        }
    
        /**
         * Start or resume the upload.
         *
         * @param fileInfo
         */
        public void startUpload(final UpLoadFileInfo fileInfo) {
    
            if (fileInfo.uploadStatus == UploadStatus.NOTSTART) {
                upLoadPrepare(fileInfo);
            } else {
                recomputeHasUploadSize(fileInfo);
            }
            fileInfo.uploadStatus = ResumableUploadUtil.UploadStatus.UPLOADING;
            fileInfo.fixedThreadPool = Executors.newFixedThreadPool(concurrentThreadsNum);
            fileInfo.isBecauseDoBackgroundPause = false;
            startCountTime(fileInfo);
            if (fileInfo.resumableUploadListener != null) {
                fileInfo.resumableUploadListener.onUpLoadStart(fileInfo);
            }
    
            // upload with multiple worker threads
            for (int threadIndex = 0; threadIndex < concurrentThreadsNum; threadIndex++) {
                HashMap<String, Integer> threadInfotemp = fileInfo.threadInfo.get(threadIndex);
                Integer eachThreadChunkNum = threadInfotemp.get("eachThreadChunkNum");
                Integer curThreadChunkIndex = threadInfotemp.get("curThreadChunkIndex");
                Integer endThreadChunkIndex = threadInfotemp.get("endThreadChunkIndex");
                Integer threadId = threadInfotemp.get("threadId");
    
                Logger.e(Tag, "threadId:" + threadId + "," + "eachThreadChunkNum:" + eachThreadChunkNum + "," + "curThreadChunkIndex:" + curThreadChunkIndex + "endThreadChunkIndex:" + endThreadChunkIndex);
    
                if (eachThreadChunkNum != 0) {
                    doUpload(threadIndex, fileInfo);
                }
            }
        }
    
        /**
         * Pause the upload.
         *
         * @param becauseDoBackgroundPause whether the pause was caused by the app going to the background
         * @param fileInfo
         */
        public void uploadPause(boolean becauseDoBackgroundPause, final UpLoadFileInfo fileInfo) {
    
            fileInfo.uploadStatus = ResumableUploadUtil.UploadStatus.PAUSE;
            fileInfo.isBecauseDoBackgroundPause = becauseDoBackgroundPause;
            stopCountTime(fileInfo);
            saveUploadFileInfo(fileInfo);
            if (fileInfo.resumableUploadListener != null) {
                handler.post(new Runnable() {
                    @Override
                    public void run() {
                        fileInfo.resumableUploadListener.onUpLoadPause(fileInfo);
                    }
                });
            }
            if (fileInfo.fixedThreadPool != null) {
                fileInfo.fixedThreadPool.shutdownNow();
                fileInfo.fixedThreadPool = null;
            }
        }
    
        /**
         * Upload error.
         *
         * @param e        the exception
         * @param fileInfo
         */
        public synchronized void uploadError(final Exception e, final UpLoadFileInfo fileInfo) {
    
            // Upload failed: 1. save the upload state; 2. report the failure reason.
            if (fileInfo.uploadStatus != UploadStatus.UPLOADING) {
                return;
            }
            Logger.d(Tag, "uploadError");
            fileInfo.uploadStatus = ResumableUploadUtil.UploadStatus.ERROR;
            if (fileInfo.resumableUploadListener != null) {
                handler.post(new Runnable() {
                    @Override
                    public void run() {
    
                        fileInfo.resumableUploadListener.onUpLoadError(e, fileInfo);
                    }
                });
            }
    
            stopCountTime(fileInfo);
            saveUploadFileInfo(fileInfo);
            if (fileInfo.fixedThreadPool != null) {
                fileInfo.fixedThreadPool.shutdownNow();
                fileInfo.fixedThreadPool = null;
            }
        }
    
        /**
         * Upload succeeded.
         *
         * @param fileInfo
         * @param url
         */
        private void uploadSuccess(final UpLoadFileInfo fileInfo, String url) {
            Logger.d(Tag, "upload task finished successfully!");
            fileInfo.uploadStatus = UploadStatus.SUCCESS;
            removeUploadFileInfo(fileInfo);
            stopCountTime(fileInfo);
            fileInfo.fileUrl = url;
            if (fileInfo.resumableUploadListener != null) {
                handler.post(new Runnable() {
                    @Override
                    public void run() {
                        fileInfo.resumableUploadListener.onUpLoadSuccess(fileInfo);
    
                    }
                });
            }
            if (fileInfo.fixedThreadPool != null) {
                fileInfo.fixedThreadPool.shutdownNow();
                fileInfo.fixedThreadPool = null;
            }
        }
    
        /**
         * Save the upload record to disk.
         *
         * @param fileInfo
         */
        public void saveUploadFileInfo(UpLoadFileInfo fileInfo) {
            String cacheFileExtension = "";
            if (fileInfo.fileType.equals(ConstantValue.FILETYPE_VIDEO)) {
                cacheFileExtension = ConstantValue.cacheVideoExtension;
            } else if (fileInfo.fileType.equals(ConstantValue.FILETYPE_DOC)) {
                cacheFileExtension = ConstantValue.cacheDocExtension;
            }
            fileInfo.cacheUploadFilePath = CacheUploadInfo.saveUploadInfoFile + File.separator
                    + fileInfo.fileType + fileInfo.uploadFileId + "." + cacheFileExtension;
            Logger.d("cacheUploadFilePath", fileInfo.cacheUploadFilePath);
            CacheUploadInfo.writeObjectToFile(fileInfo, fileInfo.cacheUploadFilePath);
        }
    
        /**
         * Delete the saved upload record.
         *
         * @param fileInfo
         */
        public void removeUploadFileInfo(UpLoadFileInfo fileInfo) {
    
            if (!StringUtil.isBlank(fileInfo.cacheUploadFilePath)) {
                File cacheUploadfile = new File(fileInfo.cacheUploadFilePath);
                if (cacheUploadfile.exists()) {
                    cacheUploadfile.delete();
                }
            }
        }
    
    
        /**
         * Submit one chunk-upload task.
         *
         * @param threadIndex
         * @param fileInfo
         */
        private void doUpload(final int threadIndex, final UpLoadFileInfo fileInfo) {
            if (fileInfo.fixedThreadPool == null)
                return;
    
            if (!fileInfo.fixedThreadPool.isShutdown()) {
                fileInfo.fixedThreadPool.execute(
                        new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    byte[] headerInfo = buildHeaderInfo(threadIndex, fileInfo);
                                    byte[] endInfo = ("\r\n--" + BOUNDARY + "--\r\n").getBytes("UTF-8");
                                    HttpURLConnection conn = initHttpConnection(fileInfo.remoteUrl);
                                    OutputStream out = conn.getOutputStream();
                                    out.write(headerInfo);
                                    writeToServer(threadIndex, conn, out, endInfo, fileInfo); // write the chunk data
                                } catch (Exception e) {
                                    e.printStackTrace();
                                    uploadError(e, fileInfo);
    
                                }
                            }
                        });
            }
        }
    
    
        /**
         * Build the multipart header for the current chunk.
         *
         * @param threadIndex
         * @param fileInfo
         * @return
         * @throws UnsupportedEncodingException
         */
        private byte[] buildHeaderInfo(int threadIndex, UpLoadFileInfo fileInfo) throws UnsupportedEncodingException {
    
            HashMap<String, String> params = new HashMap<>();
            params.put("cloudUserGUID", fileInfo.comParams.get("cloudUserGUID"));
            params.put("notifyUrl", fileInfo.uploadSuccessCallback);
            params.put("fileType", fileInfo.fileType);
            params.put("storageServerGUID", fileInfo.storageServerGUID);
            params.put("resumableType", "application/x-zip-compressed");
            params.put("resumableTotalSize", fileInfo.fileSize + "");
            params.put("resumableIdentifier", fileInfo.fileSize + "-" + fileInfo.fileName + "");
            params.put("resumableFilename", fileInfo.fileName + "");
            params.put("resumableRelativePath", fileInfo.filePath);
            params.put("resumableChunkSize", defaultChunkSize + "");   // chunk size
            params.put("resumableTotalChunks", fileInfo.totalChunks + "");
    
            HashMap<String, Integer> perThreadInfo = fileInfo.threadInfo.get(threadIndex);
            int curThreadChunkIndex = perThreadInfo.get("curThreadChunkIndex");
            params.put("resumableCurrentChunkSize", getCurrentChunkSize(curThreadChunkIndex, fileInfo) + "");  // size of the current chunk
            params.put("resumableChunkNumber", curThreadChunkIndex + 1 + "");
    
            StringBuilder sb = new StringBuilder();
            for (String key : params.keySet()) {
                sb.append("--" + BOUNDARY + "\r\n");
                sb.append("Content-Disposition: form-data; name=\"" + key + "\""
                        + "\r\n");
                sb.append("\r\n");
                sb.append(params.get(key) + "\r\n");
            }
    
            // header for the file part
            sb.append("--" + BOUNDARY + "\r\n");
            sb.append("Content-Disposition: form-data; name=\"file\"; filename=\"" + fileInfo.fileName + "\"" + "\r\n");
            sb.append("Content-Type: application/octet-stream" + "\r\n");
            sb.append("\r\n");
    
    //        Logger.d(Tag, "headerInfo:" + sb.toString());
            Logger.d("buildHeaderInfo", "threadIndex:" + threadIndex + ",resumableTotalSize:"
                    + fileInfo.fileSize + ",resumableTotalChunks:"
                    + fileInfo.totalChunks + ",resumableCurrentChunkSize:"
                    + getCurrentChunkSize(curThreadChunkIndex, fileInfo) + ",resumableChunkNumber" + (curThreadChunkIndex + 1) + "");
    
            byte[] headerInfoBytes = sb.toString().getBytes("UTF-8");
            params = null;
            sb = null;
            return headerInfoBytes;
        }
    
        /**
         * Initialize the HTTP connection.
         *
         * @param url
         * @return
         * @throws IOException
         */
        private HttpURLConnection initHttpConnection(URL url) throws IOException {
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type",
                    "multipart/form-data; boundary=" + BOUNDARY);
    
            conn.setConnectTimeout(30 * 1000); // 30-second connect timeout
            conn.setRequestProperty("Connection", "Keep-Alive");
            conn.setRequestProperty("Charset", "UTF-8");
            conn.setDoInput(true);
            conn.setUseCaches(false);
            conn.setDoOutput(true);
            return conn;
        }
    
    
        // Write one chunk of data to the server.
        public void writeToServer(int threadIndex, HttpURLConnection conn, OutputStream out, byte[] endInfo, final UpLoadFileInfo fileInfo) throws Exception {
            RandomAccessFile raf = new RandomAccessFile(new File(fileInfo.filePath), "r"); // reads the file data
            float filesize = fileInfo.fileSize;
            HashMap<String, Integer> perThreadInfo = fileInfo.threadInfo.get(threadIndex);
            int curThreadChunkIndex = perThreadInfo.get("curThreadChunkIndex");

            raf.seek(defaultChunkSize * curThreadChunkIndex);
            byte b[] = new byte[defaultBufferSize]; // read buffer
            int n = 0; // bytes written in this pass
            long readLength = 0; // bytes of this chunk read so far
            long currentChunkSize = getCurrentChunkSize(curThreadChunkIndex, fileInfo);

            while (readLength < currentChunkSize) {
                // Pause when the app is running in the background.
                boolean runningOnBackground = ActivityStack.isRunningOnBackground(MyApplicationLike.getContext());
                if (runningOnBackground) {
                    uploadPause(true, fileInfo);
                }
                if (fileInfo.uploadStatus != UploadStatus.UPLOADING) {
                    return;
                }
                // Never read past the end of this chunk.
                int toRead = (int) Math.min(defaultBufferSize, currentChunkSize - readLength);
                n = raf.read(b, 0, toRead);
                if (n == -1) {
                    break; // end of file
                }
                out.write(b, 0, n);
                readLength += n;
                fileInfo.hasUploadSize += n;
                fileInfo.updateTextProgress();
                fileInfo.uploadProgress = (fileInfo.hasUploadSize / filesize) * 100;
                Logger.d(Tag, "progress:" + fileInfo.uploadProgress + "%" + ",hasUploadSize:" + fileInfo.hasUploadSize + ",filesize:" + filesize);

                if (fileInfo.resumableUploadListener != null) {
                    handler.post(new Runnable() {
                        @Override
                        public void run() {
                            if (fileInfo.uploadStatus == UploadStatus.UPLOADING) {
                                fileInfo.resumableUploadListener.onUpLoading(fileInfo);
                            }
                        }
                    });
                }
            }
            out.write(endInfo);
            out.close();
            raf.close();
            raf = null;
            b = null;
            handleWriterResult(threadIndex, conn, fileInfo);
        }
    
        /**
         * Handle the server response for one chunk.
         *
         * @param threadIndex
         * @param conn
         * @param fileInfo
         * @throws Exception
         */
        private void handleWriterResult(int threadIndex, HttpURLConnection conn, final UpLoadFileInfo fileInfo) throws Exception {
            final String responseMsg = getResponseMsg(conn);
            Logger.d(Tag, "responseMsg:" + responseMsg);
            HashMap<String, Integer> perThreadInfo = fileInfo.threadInfo.get(threadIndex);
            int curThreadChunkIndex = perThreadInfo.get("curThreadChunkIndex");
            int eachThreadChunkNum = perThreadInfo.get("eachThreadChunkNum");
            int startThreadChunkIndex = perThreadInfo.get("startThreadChunkIndex");
            int endThreadChunkIndex = perThreadInfo.get("endThreadChunkIndex");
    
    
            if (curThreadChunkIndex != endThreadChunkIndex) {

                // not this thread's last chunk
                if (conn.getResponseCode() == 200) {
                    // keep uploading
                    Logger.d("handle2WriterResult", "handleWriterResult: " + ",threadIndex:" + threadIndex +
                            " ,CurrentChunkSize:" + getCurrentChunkSize(curThreadChunkIndex, fileInfo) +
                            ",curThreadChunkIndex:" + curThreadChunkIndex +
                            ",eachThreadChunkNum:" + eachThreadChunkNum +
                            ",startThreadChunkIndex:" + startThreadChunkIndex +
                            ",endThreadChunkIndex:" + endThreadChunkIndex +
                            ",totalChunks:" + fileInfo.totalChunks);
                    Logger.d(Tag, "worker thread " + threadIndex + " uploaded a chunk, curThreadChunkIndex:" + curThreadChunkIndex);
                    if (curThreadChunkIndex < endThreadChunkIndex) {
                        curThreadChunkIndex += 1; // advance this worker thread's current chunk
                        perThreadInfo.put("curThreadChunkIndex", curThreadChunkIndex);
                    }

                    doUpload(threadIndex, fileInfo);
                    Logger.d(Tag, "continuing upload");
                } else {
                    uploadError(null, fileInfo);
                }
    
            } else {

                // this thread's last chunk
                if (conn.getResponseCode() == 200) {
                    // upload finished
                    Logger.d("handle2WriterResult", "handleWriterResult: " + ",threadIndex:" + threadIndex +
                            " ,CurrentChunkSize:" + getCurrentChunkSize(curThreadChunkIndex, fileInfo) +
                            ",curThreadChunkIndex:" + curThreadChunkIndex +
                            ",eachThreadChunkNum:" + eachThreadChunkNum +
                            ",startThreadChunkIndex:" + startThreadChunkIndex +
                            ",endThreadChunkIndex:" + endThreadChunkIndex +
                            ",totalChunks:" + fileInfo.totalChunks);
                    Logger.d(Tag, "worker thread " + threadIndex + " uploaded its last chunk");
                    if (curThreadChunkIndex < endThreadChunkIndex) {
                        curThreadChunkIndex += 1; // advance this worker thread's current chunk
                        perThreadInfo.put("curThreadChunkIndex", curThreadChunkIndex);
                    }
    
                    if (!StringUtil.isBlank(responseMsg)) {
                        JSONObject object = new JSONObject(responseMsg);
                        final String url = String.valueOf(object.get("data"));
    
                        if (!StringUtil.isBlank(url)) {
                            uploadSuccess(fileInfo, url);
                        }
                        object = null;
                    }
                } else {
                    uploadError(null, fileInfo);
                }
            }
    
        }
    
    
        /**
         * Read the server response for one chunk.
         *
         * @param conn
         * @return
         * @throws IOException
         */
        private String getResponseMsg(HttpURLConnection conn) throws IOException {
            StringBuilder sbResponse = new StringBuilder();
            BufferedReader in = new BufferedReader(new InputStreamReader(conn
                    .getInputStream(), "UTF-8"));
            String inputLine;
            while ((inputLine = in.readLine()) != null) {
                sbResponse.append(inputLine);
            }
            in.close();
            String responseMsg = sbResponse.toString();
            in = null;
            sbResponse = null;
            return responseMsg;
        }
    
        public void startCountTime(UpLoadFileInfo fileInfo) {
            if (fileInfo.timerTask == null) {
                fileInfo.timerTask = new UploadTimerTask(fileInfo);
            }
            fileInfo.timerTask.start();
        }
    
        public void stopCountTime(UpLoadFileInfo fileInfo) {
            if (fileInfo.timerTask != null) {
                fileInfo.timerTask.stop();
            }
        }
    
        public enum UploadStatus {
            UPLOADING, SUCCESS, PAUSE, NOTSTART, ERROR
        }
    
        public interface ResumableUploadListener {
    
            void onUpLoading(UpLoadFileInfo fileInfo);
    
            void onUpLoadSuccess(UpLoadFileInfo fileInfo);
    
            void onUpLoadError(Exception e, UpLoadFileInfo fileInfo);
    
            void onUpLoadStart(UpLoadFileInfo fileInfo);
    
            void onUpLoadPause(UpLoadFileInfo fileInfo);
    
        }
    
        public void setResumableUploadListener(ResumableUploadUtil.ResumableUploadListener listener, UpLoadFileInfo fileInfo) {
            fileInfo.resumableUploadListener = listener;
        }
    
    }
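The chunk arithmetic inside computeChunkNums() and computeEachThreadChunkNum() can be summarized in a standalone sketch (my own, not part of the original post; class and method names are made up): the file is split into fixed-size chunks, and the chunks are dealt out as evenly as possible across the worker threads, with the first `totalChunks % threads` threads taking one extra chunk each.

```java
// Standalone sketch of the chunk-partitioning math used by the upload code above.
public class ChunkPlanner {
    static final long CHUNK_SIZE = 1024 * 1024 * 3 / 2; // 1.5 MB, as in the post

    // Number of chunks: ceil(fileSize / CHUNK_SIZE).
    static int totalChunks(long fileSize) {
        return (int) ((fileSize + CHUNK_SIZE - 1) / CHUNK_SIZE);
    }

    // Chunks assigned to thread `threadIndex` out of `threads` workers:
    // the first (totalChunks % threads) threads get one extra chunk.
    static int chunksForThread(int totalChunks, int threads, int threadIndex) {
        int base = totalChunks / threads;
        return threadIndex < totalChunks % threads ? base + 1 : base;
    }

    public static void main(String[] args) {
        int total = totalChunks(10L * 1024 * 1024); // a 10 MB file
        System.out.println(total);                  // 7 chunks of at most 1.5 MB
        System.out.println(chunksForThread(total, 3, 0)); // 3
        System.out.println(chunksForThread(total, 3, 2)); // 2
    }
}
```

With 3 worker threads and a 10 MB file this yields 7 chunks split 3/2/2, matching what the "eachThreadChunkNum" bookkeeping in the HashMaps above encodes.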
    

    UpLoadFileInfo stores the upload record:

    
    package upload;
    
    import upload.ResumableUploadUtil;
    import upload.UploadTimerTask;
    import upload.StringUtil;
    
    import java.io.Serializable;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    
    import static upload.StringUtil.getDataSize;
    
    
    public class UpLoadFileInfo implements Serializable {
        public URL remoteUrl; // upload endpoint
        public String fileUrl; // file URL after a successful upload
        public String fileName;
        public String filePath;
        public String cacheUploadFilePath;
        public long fileSize;
        public long hasUploadSize;
        public long hasUploadSizeBeforeOneSec;
        public long ModifiedDate;
        public long dbId; // id in the database, if loaded from the database
        public float uploadProgress;
        public String uploadFileId;
        public int recLen = 0; // elapsed upload time in seconds
        public HashMap<String, String> comParams;
        public String rootDir;
        public String storageServerGUID;
        public String uploadSuccessCallback;
        public String fileType;
        public String extension;
        public String textProgress = updateTextProgress();
        public String uploadSpeed = updateUploadSpeed();
        public boolean isBecauseDoBackgroundPause = false;
        public int totalChunks = 0;
        public transient UploadTimerTask timerTask;
        public transient ExecutorService fixedThreadPool;
        /**
         * Upload listener
         */
        public transient ResumableUploadUtil.ResumableUploadListener resumableUploadListener;
        /**
         * Upload status
         */
        public ResumableUploadUtil.UploadStatus uploadStatus = ResumableUploadUtil.UploadStatus.NOTSTART;
    
        /**
         * Per-worker-thread bookkeeping
         */
        public List<HashMap<String, Integer>> threadInfo = new ArrayList<>();
    
    
        public String updateTextProgress() {
            return textProgress = StringUtil.getDataSize(hasUploadSize) + "/" + StringUtil.getDataSize(fileSize);
        }
    
    
        public String updateUploadSpeed() {
            return uploadSpeed = getDataSize(hasUploadSize - hasUploadSizeBeforeOneSec) + "/s";
        }
    
    
        @Override
        public String toString() {
            return "UpLoadFileInfo{" +
                    "UploadTimerTask=" + timerTask +
                    ", remoteUrl=" + remoteUrl +
                    ", fileUrl='" + fileUrl + '\'' +
                    ", fileName='" + fileName + '\'' +
                    ", filePath='" + filePath + '\'' +
                    ", cacheUploadFilePath='" + cacheUploadFilePath + '\'' +
                    ", fileSize=" + fileSize +
                    ", hasUploadSize=" + hasUploadSize +
                    ", ModifiedDate=" + ModifiedDate +
                    ", dbId=" + dbId +
                    ", uploadProgress=" + uploadProgress +
                    ", uploadFileId=" + uploadFileId +
                    ", recLen=" + recLen +
                    ", comParams=" + comParams +
                    ", rootDir='" + rootDir + '\'' +
                    ", storageServerGUID='" + storageServerGUID + '\'' +
                    ", uploadSuccessCallback='" + uploadSuccessCallback + '\'' +
                    ", fileType='" + fileType + '\'' +
                    ", extension='" + extension + '\'' +
                    ", textProgress='" + textProgress + '\'' +
                    ", uploadSpeed='" + uploadSpeed + '\'' +
                    ", fixedThreadPool=" + fixedThreadPool +
                    ", resumableUploadListener=" + resumableUploadListener +
                    ", uploadStatus=" + uploadStatus +
                    ", totalChunks=" + totalChunks +
                    ", isBecauseDoBackgroundPause=" + isBecauseDoBackgroundPause +
                    ", threadInfo=" + threadInfo +
                    '}';
        }
    
    }
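UpLoadFileInfo's progress and speed strings lean on StringUtil.getDataSize, which the post does not show. A minimal sketch of that bookkeeping (the formatter below is my own assumption, not the post's StringUtil): progress is "uploaded/total", and speed is the byte delta over the last one-second sample, formatted per second.

```java
import java.util.Locale;

// Sketch of the progress/speed strings UpLoadFileInfo maintains; dataSize() is an
// assumed human-readable formatter standing in for StringUtil.getDataSize.
public class ProgressFormat {
    static String dataSize(long bytes) {
        if (bytes >= 1024 * 1024) return String.format(Locale.ROOT, "%.1fMB", bytes / (1024.0 * 1024));
        if (bytes >= 1024) return String.format(Locale.ROOT, "%.1fKB", bytes / 1024.0);
        return bytes + "B";
    }

    // Mirrors updateTextProgress(): "uploaded/total".
    static String textProgress(long uploaded, long total) {
        return dataSize(uploaded) + "/" + dataSize(total);
    }

    // Mirrors updateUploadSpeed(): bytes gained since the last one-second sample.
    static String speed(long uploadedNow, long uploadedOneSecAgo) {
        return dataSize(uploadedNow - uploadedOneSecAgo) + "/s";
    }
}
```

For example, `textProgress(512, 2048)` yields `"512B/2.0KB"` and `speed(4096, 2048)` yields `"2.0KB/s"`, which is the shape of the strings the UI callbacks receive.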
    

    UploadTimerTask keeps time and computes the upload speed:

    package upload;
    
    import android.os.SystemClock;
    
    import upload.UpLoadFileInfo;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class UploadTimerTask {
        private UpLoadFileInfo upLoadFileInfo;
        private boolean stop = true;
        private ExecutorService executorService = Executors.newSingleThreadExecutor();
    
        public UploadTimerTask(UpLoadFileInfo upLoadFileInfo) {
            this.upLoadFileInfo = upLoadFileInfo;
        }
    
    
        public void start() {
            this.stop = false;
            executorService.execute(
                    new Runnable() {
                        @Override
                        public void run() {
                            while (!stop) {
                                upLoadFileInfo.hasUploadSizeBeforeOneSec = upLoadFileInfo.hasUploadSize;
                                SystemClock.sleep(1000);
                                upLoadFileInfo.recLen++;
                                upLoadFileInfo.updateUploadSpeed();
                            }
                        }
                    }
            );
        }
    
        public void stop() {
            this.stop = true;
        }
    }


    Two ways to split the file into chunks are shown here:

    1. Split the file manually and upload the resulting chunk files.

    2. Override write() in a ContentBody subclass so the file is split while it is uploaded.

    The file is read with a RandomAccessFile:
     RandomAccessFile raf = new RandomAccessFile(targetFile, "r");
    raf's seek() method skips a given number of bytes before reading, which is what makes chunking a large file possible.
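The seek-then-read pattern just described can be sketched as a small self-contained helper (ChunkReader and readChunk are my own illustrative names, not from the post): seek() jumps over `blockIndex * blockSize` earlier bytes, then exactly one chunk, or whatever remains of the file, is read.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Minimal sketch of reading one chunk of a file with RandomAccessFile.seek().
public class ChunkReader {
    static byte[] readChunk(File f, long blockSize, int blockIndex) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(f, "r")) {
            raf.seek(blockSize * blockIndex); // skip all earlier chunks
            long remaining = raf.length() - raf.getFilePointer();
            // The last chunk may be shorter than blockSize.
            byte[] buf = new byte[(int) Math.min(blockSize, Math.max(remaining, 0))];
            raf.readFully(buf); // read exactly one chunk
            return buf;
        }
    }
}
```

On a 10-byte file with a block size of 4, block 0 returns the first 4 bytes and block 2 returns the trailing 2 bytes, which is the same boundary arithmetic the BlockStreamBody below relies on.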
    
    

    package com.wondersgroup.wbgl.web.Test2;
    
    import com.wondersgroup.core.exceptions.ExcelException;
    import com.wondersgroup.core.util.DateUtil;
    import org.apache.commons.codec.digest.DigestUtils;
    import org.apache.commons.io.IOUtils;
    import org.apache.http.HttpResponse;
    import org.apache.http.NameValuePair;
    import org.apache.http.client.HttpClient;
    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.entity.ContentType;
    import org.apache.http.entity.mime.HttpMultipartMode;
    import org.apache.http.entity.mime.MultipartEntityBuilder;
    import org.apache.http.entity.mime.content.AbstractContentBody;
    import org.apache.http.entity.mime.content.ContentBody;
    import org.apache.http.entity.mime.content.StringBody;
    import org.apache.http.impl.client.HttpClientBuilder;
    
    
    import java.io.*;
    import java.util.*;
    import java.util.concurrent.*;
    
    
    public class Test {
        public static class BlockStreamBody extends AbstractContentBody {
            public static long CLOUD_API_LOGON_SIZE = 10 * 1024 * 1024;
    
            //two values reported to MultipartEntity
            private long blockSize = 0;//size of the block uploaded this time
            private String fileName = null;//name of the uploaded file
            //three parameters needed by writeTo
            private int blockNumber = 0, blockIndex = 0;//blockNumber: total blocks; blockIndex: current block
            private File targetFile = null;//the file to upload
    
            private BlockStreamBody(String mimeType) {
                super(mimeType);
            }
    
            /**
             * Custom ContentBody constructor
             *
             * @param blockNumber total number of blocks
             * @param blockIndex  index of the current block (1-based)
             * @param targetFile  the file to upload
             */
            public BlockStreamBody(int blockNumber, int blockIndex, File targetFile) {
                this("application/octet-stream");
                this.blockNumber = blockNumber;
                this.blockIndex = blockIndex;
                this.targetFile = targetFile;
                this.fileName = targetFile.getName();
                //initialize blockSize
                if (blockIndex < blockNumber) {//not the last block, so the fixed chunk size
                    this.blockSize = CLOUD_API_LOGON_SIZE;
                } else {//the last block gets whatever is left
                    this.blockSize = targetFile.length() - CLOUD_API_LOGON_SIZE * (blockNumber - 1);
                }
            }
    
            @Override
            public void writeTo(OutputStream out) throws IOException {
                RandomAccessFile raf = new RandomAccessFile(targetFile, "r");//reads the source data
                byte b[] = new byte[1024];//staging buffer
                if (blockIndex == 1) {//first block
                    int n = 0;
                    long readLength = 0;//bytes read so far
                    while (readLength <= blockSize - 1024) {//the bulk of the bytes is read here
                        n = raf.read(b, 0, 1024);
                        readLength += 1024;
                        out.write(b, 0, n);
                    }
                    if (readLength < blockSize) {//the remaining tail of fewer than 1024 bytes
                        n = raf.read(b, 0, (int) (blockSize - readLength));
                        out.write(b, 0, n);
                    }
                } else if (blockIndex < blockNumber) {//neither the first nor the last block
                    raf.seek(CLOUD_API_LOGON_SIZE * (blockIndex - 1));//skip the bytes of the earlier blocks
                    int n = 0;
                    long readLength = 0;//bytes read so far
                    while (readLength <= blockSize - 1024) {//the bulk of the bytes is read here
                        n = raf.read(b, 0, 1024);
                        readLength += 1024;
                        out.write(b, 0, n);
                    }
                    if (readLength < blockSize) {//the remaining tail of fewer than 1024 bytes
                        n = raf.read(b, 0, (int) (blockSize - readLength));
                        out.write(b, 0, n);
                    }
                } else {//last block: read through to the end of the file
                    raf.seek(CLOUD_API_LOGON_SIZE * (blockIndex - 1));//skip the bytes of the earlier blocks
                    int n = 0;
                    while ((n = raf.read(b, 0, 1024)) != -1) {
                        out.write(b, 0, n);
                    }
                }
                raf.close();//out is closed by the MultipartEntity machinery
            }
    
            @Override
            public String getCharset() {
                return null;
            }

            @Override
            public String getTransferEncoding() {
                return "binary";
            }

            @Override
            public String getFilename() {
                return fileName;
            }

            @Override
            public long getContentLength() {
                return blockSize;
            }
        }
        private static BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(20);
    
        private static ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, blockingQueue, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                while (!blockingQueue.offer(r)) {
                }
            }
        });
    
        public static long CLOUD_API_LOGON_SIZE = 10* 1024*1024;
        public  static int timeOut = 60*60*1000;
    
        public  static  final String checkURL = "http://localhost:8080/claim/private/checked/checkChunk.do";
    
        public static  final  String mergeURL = "";
    
        public static  final  String uploadURL = "";
    
    
        public static void main(String[] args) throws Exception {
            String filePath = "E:\\workspace\\wbglProject\\dispatcher\\target\\dispatcher.war";
            System.out.println("Upload started " + DateUtil.dateToString3(new Date()));
            uploadToDrive(filePath);
            System.out.println("Upload finished " + DateUtil.dateToString3(new Date()));
        }
    
        /**
         * Handle the upload: compute the chunk layout and package the parameters
         *
         * @param filePath path of the file to upload
         * @throws Exception
         */
        public static void uploadToDrive(String filePath)throws Exception {
            File targetFile = new File(filePath);
            HashMap<String, Object> params = new HashMap<String, Object>();
            File file = new File(filePath);
            params.put("fileName",file.getName());
            params.put("fileLength",file.length());
            String md5 = DigestUtils.md5Hex(new FileInputStream(file));
            params.put("md5",md5);
            long someExtra = 0;
            long targetFileSize = targetFile.length();
            int mBlockNumber = 0;
            if (targetFileSize < CLOUD_API_LOGON_SIZE) {
                mBlockNumber = 1;
                someExtra = targetFileSize;
            } else {
                mBlockNumber = (int) (targetFileSize / CLOUD_API_LOGON_SIZE);
                someExtra = targetFileSize
                        % CLOUD_API_LOGON_SIZE;
                if (someExtra > 0) {
                    mBlockNumber++;
                }
            }
            params.put("chunkNum", Integer.toString(mBlockNumber));
            List<Callable<String>> callableList = new ArrayList<Callable<String>>();
            for (int i = 1; i <= mBlockNumber; i++) {
                int chunkIndex = i;
                params.put("chunkIndex", i);
                long chunkSize = CLOUD_API_LOGON_SIZE;
                params.put("chunkSize", CLOUD_API_LOGON_SIZE);
                if (i == mBlockNumber) {
                    chunkSize = someExtra == 0 ? CLOUD_API_LOGON_SIZE : someExtra;
                    params.put("chunkSize", (someExtra == 0 ? CLOUD_API_LOGON_SIZE : someExtra) + "");
                }
                UploadThread thread = new UploadThread(chunkIndex, chunkSize, targetFile, params, makeHeads());
                callableList.add(thread);
            }
            List<Future<String>> futures = executorService.invokeAll(callableList);//blocks until every chunk task finishes
            restPost(mergeURL, params, makeHeads(), null);//the merge request carries only metadata, no file body
        }
    
    
        /**
         * Send a request
         *
         * @param url        request URL
         * @param params     request parameters
         * @param heads      request headers
         * @param targetFile file to upload (may be null)
         * @return String
         */
        public static String restPost(String url, Map<String, Object> params, Map<String, String> heads,File targetFile) {
            HttpClient httpClient = null;
            HttpPost httpPost = new HttpPost(url);
            String content="";
            try {
                MultipartEntityBuilder mpEntity = MultipartEntityBuilder.create();
                for (String head : heads.keySet()) {
                    httpPost.setHeader(head, heads.get(head));
                }
                List<NameValuePair> pairs = new ArrayList<NameValuePair>();
                if (null != params && params.size() > 0) {
                    for (String param : params.keySet()) {
                        mpEntity.addPart(param, new StringBody(params.get(param).toString(), ContentType.MULTIPART_FORM_DATA));}
                }
                mpEntity.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
                if (targetFile != null && targetFile.exists()) {
                    ContentBody contentBody = new BlockStreamBody(Integer.parseInt(params.get("chunkNum").toString()), Integer.parseInt(params.get("chunkIndex").toString()), targetFile);
                    mpEntity.addPart("file", contentBody);
                }
                httpPost.setEntity(mpEntity.build());
                httpClient = HttpClientBuilder.create().build();
                RequestConfig.Builder build = RequestConfig.custom();
                build.setConnectTimeout(timeOut);
                build.setSocketTimeout(timeOut);
                HttpResponse execute = httpClient.execute(httpPost);
                content = IOUtils.toString(execute.getEntity().getContent(), "utf-8");
            } catch (Exception e) {
                e.printStackTrace();
                throw new ExcelException(e.getMessage());
            }
            System.out.println("=============response==================\n" + content);
            System.out.println("=============end==================\n");
            return content.trim();
        }
    
        public static Map<String, String> makeHeads(){
            Map<String, String> heads = new HashMap<String, String>();
            heads.put("serviceNo","YB");
            heads.put("serviceKey","6a69b7af4eba48859d7e71ed652e958e");
            heads.put("fileType","4");
            return heads;
        }
    
    
        public static class UploadThread implements Callable<String>{
            private final int chunkIndex;
    
            private final long chunkSize;
    
            private final File targetFile;
    
            private Map<String , String> heads = new HashMap<String, String>();
    
            private Map<String , Object> params = new HashMap<String,Object>();
    
            public UploadThread(int chunkIndex, long chunkSize, File targetFile,Map<String,Object> params,Map<String , String> heads) {
                this.chunkIndex = chunkIndex;
                this.chunkSize = chunkSize;
                this.targetFile = targetFile;
                this.params.putAll(params);
                this.heads.putAll(heads);
            }
    
        @Override
        public String call() throws Exception {
            String s = restPost(checkURL, params, heads, null);
            Map<String, Object> retmap = (Map) com.alibaba.fastjson.JSON.parse(s);
            Map body = (Map) retmap.get("body");
            Object isExist = body.get("isExist");
            if (isExist.toString().equals("false")) {
                //the chunk is not on the server yet: upload it
                return restPost(uploadURL, params, heads, targetFile);
            }
            return "";
        }
        }
    
    }
    
    
    2. Uploading by manually splitting the file

    package com.wondersgroup.wbgl.web.Test;
    
    import com.wondersgroup.core.util.LogUtil;
    import org.apache.commons.codec.digest.DigestUtils;
    import org.apache.commons.io.FileUtils;
    import org.apache.commons.io.IOUtils;
    import org.apache.http.Consts;
    import org.apache.http.HttpEntity;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.entity.ContentType;
    import org.apache.http.entity.mime.MultipartEntityBuilder;
    import org.apache.http.entity.mime.content.FileBody;
    import org.apache.http.entity.mime.content.StringBody;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.util.EntityUtils;
    import org.codehaus.jackson.map.ObjectMapper;
    import org.slf4j.Logger;
    
    import java.io.*;
    import java.nio.charset.Charset;
    import java.util.*;
    import java.util.concurrent.*;
    
    public class TestChunkUpload {
    
        private static final Logger logger = LogUtil.getLogger(TestChunkUpload.class);
    
    public static final String check_url = "http://localhost:8080/wbgl/service/claim/private/checked/checkChunk.do";

    public static final String upload_url = "http://localhost:8080/wbgl/service/claim/private/checked/upload.do";

    public static final String merge_url = "http://localhost:8080/wbgl/service/claim/private/checked/merge.do";
    
    /*    private static String check_url = "http://182.150.61.17:31001/wbglWeb/service/claim/private/checked/checkChunk.do";
        private static String upload_url = "http://182.150.61.17:31001/wbglWeb/service/claim/private/checked/upload.do";
        private static String merge_url = "http://182.150.61.17:31001/wbglWeb/service/claim/private/checked/merge.do";*/
        private static String serviceNo = "PK";
        private static String serviceKey = "b42bc810c84d45a08251a3ceeb84a7fe";
        private static String tempPath = "E:\\files";
    
        private ExecutorService executorService = Executors.newFixedThreadPool(5);
    
    
        public static void main(String[] args) {
            /* verify splitting
            File file = new File("/Users/liqingdong/Temp/plsqldev1106x64.exe");
            Map<String, Object> chunkInfo = new TestChunkUpload().getChunkInfo(file, 7);
            System.out.println(chunkInfo.toString());
            */
    
            /* verify merging
            try {
                File merge = new TestChunkUpload().merge("/Users/liqingdong/Temp/d9867da93be8fa1d64abb344adb30586");
                FileInputStream fis = new FileInputStream(merge);
                String md5Hex = DigestUtils.md5Hex(fis);
                fis.close();
                System.out.println(md5Hex);
            } catch (IOException e) {
                e.printStackTrace();
            }
            */
    
            File file = new File("D:\\BaiduNetdiskDownload\\12345.mp4");
            TestChunkUpload testChunkUpload = new TestChunkUpload();
            try {
                testChunkUpload.uploadByThread("4", file);
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
            }
    
    
        }
    
        /**
         * File upload
         *
         * @param fileType type of the uploaded file
         * @param file     file to upload
         * @return
         */
        @SuppressWarnings("unchecked")
        public boolean upload(String fileType, File file) {
            logger.info("File: {} upload started...", file.getName());
            long start = System.currentTimeMillis();
            Map<String, Object> chunkInfo = getChunkInfo(file, 10);
            List<Map<String, Object>> chunkList = (List<Map<String, Object>>) chunkInfo.get("chunkList");
            int successCount = 0;
            for (Map<String, Object> chunkFileInfo : chunkList) {
                boolean exist = checkChunk(fileType, chunkInfo.get("md5").toString(), (Integer) chunkFileInfo.get("chunkIndex"), (Integer) chunkInfo.get("chunkNum"), (Long) chunkFileInfo.get("chunkSize"));
                if (exist) continue;
                boolean success = doUpload(fileType, (File) chunkFileInfo.get("chunkFile"), file.getName(), (Long) chunkInfo.get("fileLength"), chunkInfo.get("md5").toString(), (Integer) chunkFileInfo.get("chunkIndex"), (Integer) chunkInfo.get("chunkNum"), (Long) chunkFileInfo.get("chunkSize"));
                if (success) successCount++;
            }
            boolean result = successCount == Integer.valueOf(chunkInfo.get("chunkNum").toString()) && merge(fileType, file.getName(), (Long) chunkInfo.get("fileLength"), chunkInfo.get("md5").toString(), (Integer) chunkInfo.get("chunkNum"));
    
            logger.info("Upload finished. Total time: " + (System.currentTimeMillis() - start) / 1000 + "s");

            // delete the local temporary chunk files
            File tempDir = new File(tempPath + File.separator + chunkInfo.get("md5").toString());
            try {
                FileUtils.deleteDirectory(tempDir);
            } catch (IOException e) {
                logger.error("Failed to delete temporary files: {}", e.getMessage());
            }
            return result;
        }
    
        /**
         * Multi-threaded file upload
         *
         * @param fileType
         * @param file
         * @return
         */
        @SuppressWarnings("unchecked")
        public boolean uploadByThread(String fileType, File file) throws InterruptedException {
            logger.info("File: {} upload started...", file.getName());
            long start = System.currentTimeMillis();
            Map<String, Object> chunkInfo = getChunkInfo(file, 10);
    
            List<Map<String, Object>> chunkList = (List<Map<String, Object>>) chunkInfo.get("chunkList");
            List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
    
            for (Map<String, Object> chunkFileInfo : chunkList) {
                Callable<Boolean> task = task(chunkInfo, chunkFileInfo, fileType, file);
                tasks.add(task);
            }
            executorService.invokeAll(tasks);
    
            boolean result = merge(fileType, file.getName(), (Long) chunkInfo.get("fileLength"), chunkInfo.get("md5").toString(), (Integer) chunkInfo.get("chunkNum"));
    
            logger.info("Upload finished. Total time: " + (System.currentTimeMillis() - start) / 1000 + "s");
            return result;
        }
    
        private Callable<Boolean> task(final Map<String, Object> chunkInfo, final Map<String, Object> chunkFileInfo, final String fileType, final File file) {
            return new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    boolean result = false;
                    System.out.println(Thread.currentThread().getName() + " started...");
                    boolean exist = checkChunk(fileType, chunkInfo.get("md5").toString(), (Integer) chunkFileInfo.get("chunkIndex"), (Integer) chunkInfo.get("chunkNum"), (Long) chunkFileInfo.get("chunkSize"));
                    if (!exist) {
                        result = doUpload(fileType, (File) chunkFileInfo.get("chunkFile"), file.getName(), (Long) chunkInfo.get("fileLength"), chunkInfo.get("md5").toString(), (Integer) chunkFileInfo.get("chunkIndex"), (Integer) chunkInfo.get("chunkNum"), (Long) chunkFileInfo.get("chunkSize"));
                    }
    
                    System.out.println(Thread.currentThread().getName() + " finished.");
                    return exist || result;
                }
            };
        }
    
        /**
         * Compute the file's chunk layout
         *
         * @param file        the file
         * @param defaultSize size of a single chunk (in MB)
         * @return
         */
        private Map<String, Object> getChunkInfo(File file, int defaultSize) {
            Map<String, Object> result = new HashMap<String, Object>();
            FileInputStream fis = null;
            try {
                fis = new FileInputStream(file);
                String md5 = DigestUtils.md5Hex(fis);
                result.put("md5", md5);
    
                long fileLength = file.length();
                result.put("fileLength", fileLength);
    
                int defaultByteSize = defaultSize * 1024 * 1024;
                int chunkNum = (int) Math.ceil(fileLength / (double) defaultByteSize);
                result.put("chunkNum", chunkNum);
    
                File chunkFile;
                FileOutputStream fos;
                File parentDirectory = new File(tempPath + File.separator + md5);
                // delete any existing MD5 directory and recreate it
                if (parentDirectory.exists()) FileUtils.deleteDirectory(parentDirectory);
                parentDirectory.mkdirs();
    
                HashMap<String, Object> chunkFileInfo;// info for one chunk
                List<Map<String, Object>> chunkFiles = new ArrayList<Map<String, Object>>();// info for all chunks
                byte[] bytes = new byte[defaultByteSize];
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                for (int chunkIndex = 1; chunkIndex <= chunkNum; chunkIndex++) {
                    chunkFile = new File(parentDirectory, chunkIndex + ".temp");
                    chunkFile.createNewFile();
                    fos = new FileOutputStream(chunkFile);
    
                    long offset = (long) (chunkIndex - 1) * defaultByteSize;// read offset; long, or it overflows past 2GB
                    if (chunkIndex == chunkNum) {
                        bytes = new byte[(int) (fileLength - offset)];
                    }
    
                    raf.seek(offset);
                    raf.read(bytes);
                    fos.write(bytes);
                    fos.flush();
                    fos.close();
                    chunkFileInfo = new HashMap<String, Object>();
                    chunkFileInfo.put("chunkFile", chunkFile);
                    chunkFileInfo.put("chunkIndex", chunkIndex);
                    chunkFileInfo.put("chunkSize", chunkFile.length());
                    chunkFiles.add(chunkFileInfo);
                }
                raf.close();
    
                result.put("chunkList", chunkFiles);
            } catch (IOException e) {
                logger.error(e.getMessage());
            } finally {
                try {
                    if (fis != null) fis.close();
                } catch (IOException e) {
                    logger.error(e.getMessage());
                }
            }
    
            return result;
        }
    
        /**
         * Check whether a file chunk already exists on the server
         *
         * @param fileType
         * @param md5
         * @param chunkIndex
         * @param chunkNum
         * @param chunkSize
         * @return
         */
        @SuppressWarnings("unchecked")
        private boolean checkChunk(String fileType, String md5, int chunkIndex, int chunkNum, long chunkSize) {
            CloseableHttpClient httpClient = null;
            CloseableHttpResponse response = null;
            try {
                httpClient = HttpClients.createDefault();
    
                HttpPost httpPost = new HttpPost(check_url);
    
                httpPost.addHeader("serviceNo", serviceNo);
                httpPost.addHeader("serviceKey", serviceKey);
                httpPost.addHeader("fileType", fileType);
    
                HttpEntity reqEntity = MultipartEntityBuilder.create()
                        .addPart("md5", new StringBody(md5, ContentType.create("text/plain", Consts.UTF_8)))
                        .addPart("chunkIndex", new StringBody(String.valueOf(chunkIndex), ContentType.create("text/plain", Consts.UTF_8)))
                        .addPart("chunkNum", new StringBody(String.valueOf(chunkNum), ContentType.create("text/plain", Consts.UTF_8)))
                        .addPart("chunkSize", new StringBody(String.valueOf(chunkSize), ContentType.create("text/plain", Consts.UTF_8)))
                        .build();
    
                httpPost.setEntity(reqEntity);
    
                // send the request and get the response
                response = httpClient.execute(httpPost);

                // read the response entity
                HttpEntity resEntity = response.getEntity();
                if (resEntity != null) {
                    String result = EntityUtils.toString(resEntity, Charset.forName("UTF-8"));
                    EntityUtils.consume(resEntity);// release the entity

                    logger.info("Chunk [" + chunkIndex + "] check response: " + result);
    
                    Map<String, Object> map = new ObjectMapper().readValue(result, HashMap.class);
                    if (Boolean.valueOf(map.get("success").toString())) {
                        Map<String, Object> body = (Map<String, Object>) map.get("body");
                        return Boolean.valueOf(body.get("isExist").toString());
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage());
            } finally {
                try {
                    if (response != null) response.close();
                    if (httpClient != null) httpClient.close();
                } catch (IOException e) {
                    logger.error(e.getMessage());
                }
            }
            return false;
        }
    
        /**
         * Upload one chunk
         *
         * @param fileType   file type
         * @param file       the current chunk file
         * @param md5        MD5 of the whole file
         * @param chunkIndex index of the current chunk
         * @param chunkNum   total number of chunks
         * @param chunkSize  size of the chunk
         */
        @SuppressWarnings("unchecked")
        private boolean doUpload(String fileType, File file, String fileName, long fileLength, String md5, int chunkIndex, int chunkNum, long chunkSize) {
            CloseableHttpClient httpClient = null;
            CloseableHttpResponse response = null;
            try {
                httpClient = HttpClients.createDefault();
    
                HttpPost httpPost = new HttpPost(upload_url);
    
                httpPost.addHeader("serviceNo", serviceNo);
                httpPost.addHeader("serviceKey", serviceKey);
                httpPost.addHeader("fileType", fileType);
    
                HttpEntity reqEntity = MultipartEntityBuilder.create()
                        .addPart("file", new FileBody(file))
                        .addPart("md5", new StringBody(md5, ContentType.create("text/plain", Consts.UTF_8)))
                        .addPart("fileName", new StringBody(fileName, ContentType.create("text/plain", Consts.UTF_8)))
                        .addPart("chunkIndex", new StringBody(String.valueOf(chunkIndex), ContentType.create("text/plain", Consts.UTF_8)))
                        .addPart("chunkNum", new StringBody(String.valueOf(chunkNum), ContentType.create("text/plain", Consts.UTF_8)))
                        .addPart("chunkSize", new StringBody(String.valueOf(chunkSize), ContentType.create("text/plain", Consts.UTF_8)))
                        .addPart("fileLength", new StringBody(String.valueOf(fileLength), ContentType.create("text/plain", Consts.UTF_8)))
                        .build();
    
                httpPost.setEntity(reqEntity);
    
                // send the request and get the response
                response = httpClient.execute(httpPost);

                // read the response entity
                HttpEntity resEntity = response.getEntity();
                if (resEntity != null) {
                    String result = EntityUtils.toString(resEntity, Charset.forName("UTF-8"));
                    EntityUtils.consume(resEntity);

                    logger.info("Chunk [" + chunkIndex + "] upload response: " + result);
                    Map<String, Object> map = new ObjectMapper().readValue(result, Map.class);
                    return Boolean.valueOf(map.get("success").toString());
                }
            } catch (Exception e) {
                logger.error(e.getMessage());
            } finally {
                try {
                    if (response != null) response.close();
                    if (httpClient != null) httpClient.close();
                } catch (IOException e) {
                    logger.error(e.getMessage());
                }
            }
            return false;
        }
    
        /**
         * Local test of merging the chunk files
         *
         * @param path
         * @return
         * @throws IOException
         */
        public File merge(String path) throws IOException {
            File file = new File(path);
            File[] files = file.listFiles();
            TreeSet<File> treeSet = new TreeSet<File>(new Comparator<File>() {
                @Override
                public int compare(File o1, File o2) {
                    // compare the numeric prefixes as integers: a plain string
                    // compare would sort "10.temp" before "2.temp"
                    int n1 = Integer.parseInt(o1.getName().substring(0, o1.getName().indexOf(".")));
                    int n2 = Integer.parseInt(o2.getName().substring(0, o2.getName().indexOf(".")));
                    return n1 - n2;
                }
            });
    
            treeSet.addAll(Arrays.asList(files));
            File merge = new File(file.getParentFile(), "new.exe");
            if (merge.exists()) FileUtils.deleteQuietly(merge);
            merge.createNewFile();
            FileOutputStream fos = new FileOutputStream(merge);
            FileInputStream fis;
            for (File f : treeSet) {
                fis = new FileInputStream(f);
                IOUtils.copy(fis, fos);
                fis.close();
            }
            fos.flush();
            fos.close();
    
            return merge;
        }
    
        /**
         * Ask the server to merge the uploaded chunks
         *
         * @param fileType
         * @param fileName
         * @param fileLength
         * @param md5
         * @param chunkNum
         * @return
         */
        @SuppressWarnings("unchecked")
        private boolean merge(String fileType, String fileName, long fileLength, String md5, int chunkNum) {
            CloseableHttpClient httpClient = null;
            CloseableHttpResponse response = null;
            try {
                httpClient = HttpClients.createDefault();
    
                HttpPost httpPost = new HttpPost(merge_url);
    
                httpPost.addHeader("serviceNo", serviceNo);
                httpPost.addHeader("serviceKey", serviceKey);
                httpPost.addHeader("fileType", fileType);
    
                HttpEntity reqEntity = MultipartEntityBuilder.create()
                        .addPart("md5", new StringBody(md5, ContentType.create("text/plain", Consts.UTF_8)))
                        .addPart("fileName", new StringBody(fileName, ContentType.create("text/plain", Consts.UTF_8)))
                        .addPart("chunkNum", new StringBody(String.valueOf(chunkNum), ContentType.create("text/plain", Consts.UTF_8)))
                        .addPart("fileLength", new StringBody(String.valueOf(fileLength), ContentType.create("text/plain", Consts.UTF_8)))
                        .build();
    
                httpPost.setEntity(reqEntity);
    
                // send the request and get the response
                response = httpClient.execute(httpPost);

                // read the response entity
                HttpEntity resEntity = response.getEntity();
                if (resEntity != null) {
                    String result = EntityUtils.toString(resEntity, Charset.forName("UTF-8"));
                    EntityUtils.consume(resEntity);

                    logger.info("File: {} merge response: " + result, fileName);
                    Map<String, Object> map = new ObjectMapper().readValue(result, Map.class);
                    return Boolean.valueOf(map.get("success").toString());
                }
            } catch (Exception e) {
                logger.error(e.getMessage());
            } finally {
                try {
                    if (response != null) response.close();
                    if (httpClient != null) httpClient.close();
                } catch (IOException e) {
                    logger.error(e.getMessage());
                }
            }
            return false;
        }
    }
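The merge request above sends chunkNum and fileLength, both of which the uploader derives from the chunk size it used when slicing. A minimal sketch of that chunk arithmetic, assuming an illustrative 5 MB chunk size (the class and constant names here are mine, not from the code above):

```java
// Sketch of the chunk arithmetic behind chunkNum/fileLength in the merge
// request. The 5 MB chunk size and all names here are illustrative.
public class ChunkMath {
    static final long CHUNK_SIZE = 5L * 1024 * 1024; // assumed chunk size

    // Number of chunks needed to cover fileLength bytes (ceiling division).
    static int chunkCount(long fileLength) {
        return (int) ((fileLength + CHUNK_SIZE - 1) / CHUNK_SIZE);
    }

    // Byte offset where chunk `index` (0-based) starts.
    static long chunkStart(int index) {
        return index * CHUNK_SIZE;
    }

    // Length of chunk `index`; only the last chunk may be shorter.
    static long chunkLength(int index, long fileLength) {
        return Math.min(CHUNK_SIZE, fileLength - chunkStart(index));
    }

    public static void main(String[] args) {
        long fileLength = 12L * 1024 * 1024; // a 12 MB example file
        System.out.println("chunks=" + chunkCount(fileLength));
        System.out.println("lastChunk=" + chunkLength(chunkCount(fileLength) - 1, fileLength));
    }
}
```

Each chunk is then posted separately with its index; before merging, the server can sanity-check that chunkNum chunks of CHUNK_SIZE bytes are enough to cover fileLength.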
    

  • Implementing simultaneous multithreaded file upload



    First, we need a struct.
    The struct should hold:
    (1) the descriptor of the source file
    (2) the descriptor of the destination file
    (3) the offset at which this part starts writing
    (4) the offset at which this part stops writing


    Each struct represents one part of the file.
    If the file is split into 5 parts, we define 5 structs: work out the file size, divide it into five ranges,
    and record in each struct where that part of the file is copied from and to, along with the source and destination descriptors.


    In the thread function, position the file offset at the part's start,
    and copy from there.

    Below is my multithreaded copy, adapted from someone else's code. The original occasionally produced a slightly corrupted copy, for three reasons: every thread called lseek on the same shared descriptors (the file offset belongs to the descriptor, so the seeks raced with one another), each thread then read all the way to end of file instead of stopping at its own end offset, and the final assignment to blocks[i].end ran with i equal to n, writing one element past the array. The version below fixes all three, using pread/pwrite with explicit offsets and bounding each part:

    #include <pthread.h>
    #include <sys/stat.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <fcntl.h>
    #include <string.h>
    #include <unistd.h>
    #include <sys/types.h>
    
    #define THREAD_BUFF_SIZE 1024
    
    typedef struct filePart
    {
        off_t start;  /* first byte of this part (inclusive) */
        off_t end;    /* one past the last byte of this part */
        int infd;     /* source file */
        int outfd;    /* destination file */
    } filePart;
    
    void* transFile(void* arg)
    {
        filePart* block = (filePart*)arg;
        char buf[THREAD_BUFF_SIZE];
        off_t offset = block->start;
    
        /* pread/pwrite take an explicit offset, so threads sharing the same
         * descriptors never race on the per-descriptor file offset. */
        while (offset < block->end)
        {
            size_t want = (size_t)(block->end - offset);
            if (want > sizeof(buf))
                want = sizeof(buf);
    
            ssize_t n = pread(block->infd, buf, want, offset);
            if (n <= 0)
                break;
            if (pwrite(block->outfd, buf, (size_t)n, offset) != n)
                break;
            offset += n;
        }
        pthread_exit(NULL);
    }
    
    off_t get_filesize(int fd)
    {
        struct stat st;
        fstat(fd, &st);
        return st.st_size;
    }
    
    int main(int argc, char* argv[])
    {
        if (argc < 3)
        {
            fprintf(stderr, "usage: %s <src> <dst>\n", argv[0]);
            return -1;
        }
    
        int infd = open(argv[1], O_RDONLY);
        int outfd = open(argv[2], O_CREAT | O_WRONLY, 0644);
        if (infd < 0 || outfd < 0)
        {
            perror("open");
            return -1;
        }
    
        off_t file_size = get_filesize(infd);
    
        int n = 5; /* split into five parts */
    
        filePart* blocks = (filePart*)malloc(sizeof(filePart) * n);
        off_t percent = file_size / n;
    
        int i;
        for (i = 0; i < n; ++i)
        {
            blocks[i].infd = infd;
            blocks[i].outfd = outfd;
            blocks[i].start = i * percent;
            blocks[i].end = blocks[i].start + percent;
        }
        /* the last part absorbs the remainder; the original indexed
         * blocks[n] here, one element past the end of the array */
        blocks[n - 1].end = file_size;
    
        pthread_t ptid[5];
    
        for (i = 0; i < n; ++i)
        {
            pthread_create(&ptid[i], NULL, transFile, &(blocks[i]));
        }
        for (i = 0; i < n; ++i)
        {
            pthread_join(ptid[i], NULL);
        }
    
        /* release resources */
        free(blocks);
        close(infd);
        close(outfd);
        printf("Copy Successfully\n");
        return 0;
    }
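The per-thread range arithmetic used above is easy to get subtly wrong: integer division leaves a remainder that the last part must absorb, so it is worth checking in isolation. The same split, sketched in Java with names of my own choosing:

```java
// Sketch: split [0, fileSize) into n contiguous [start, end) ranges.
// The last range absorbs the remainder so every byte is covered exactly once.
public class RangeSplit {
    static long[][] split(long fileSize, int n) {
        long[][] parts = new long[n][2];
        long per = fileSize / n; // integer division: remainder handled below
        for (int i = 0; i < n; i++) {
            parts[i][0] = i * per;                                 // start, inclusive
            parts[i][1] = (i == n - 1) ? fileSize : (i + 1) * per; // end, exclusive
        }
        return parts;
    }

    public static void main(String[] args) {
        for (long[] p : split(1003, 5)) {
            System.out.println(p[0] + "-" + p[1]);
        }
    }
}
```

Adjacent ranges share a boundary (one's exclusive end is the next one's inclusive start), so no byte is copied twice and none is skipped.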
    
    
  • jQuery+PHP: multithreaded upload of a large file after slicing, returning the URL of the uploaded file. PHP-side upload code: session_start(); // chunked file upload class uploader{ // perform the upload (form field name, save path relative to the current path) public function upload($name,$...
  • jQuery+PHP: multithreaded upload of a large file after slicing (original post). PHP-side upload code: session_start(); // chunked file upload class uploader{ // perform the upload (form field name, save path relative to the current path) public function upload($name,$savedir='uploads'){$...
  • I recently ran into a requirement to upload very large files and surveyed the chunked-upload features of Qiniu and Tencent Cloud, so here I summarize how front-end large-file upload is implemented. In some businesses, large-file upload is an important interaction scenario, such as importing a fairly large Excel sheet...
  • Some readers already know the standard solution for large-file upload: to improve efficiency, use Blob.slice to cut the large file into pieces of a specified size, upload the pieces on multiple threads, and once every piece has uploaded successfully, ...
  • Large-file upload with resume. Requirements: support single files at the 50 GB level, with resumable upload. Resuming must work after refreshing the browser, after restarting the browser (closing and reopening it), and after rebooting the machine. Supports all desktop platforms, ...
  • I recently made time for a small tool that uses the AWS SDK to watch a local directory and automatically sync files up to S3, with multithreaded asynchronous upload and chunking for large files. Reference: https://www.codeproject.com/Articles/131678/Amazon-S-Sync ...
  • A user has a local txt or csv file, whether exported from a business database or obtained some other way. To process, mine, or build on the data with Ant's data-analysis tools, the local file must first be uploaded to ODPS; ordinary small files go up to the server through the browser...
  • File upload and download are extremely common features in web application development; today we look at how to implement them in Java web applications. Requirements first: full desktop platform support (Windows, Mac, Linux), all browsers, and batch file upload ...
  • Preface: my project (Spring MVC started inside embedded Jetty) needed file upload. I had never touched Java file upload and was hazy on the HTTP protocol, so I took a step-by-step approach to learning the principles and practice of file upload. This post focuses on practice. 1. ...
  • Large-file upload implemented in Java: videos, movies, and large documents can be uploaded, using slicing and multithreaded data transfer to fix ordinary components' slow, or outright failing, large-file uploads.
  • I upload files from the client over sockets on multiple threads using a signed Applet. It seems to work when tested locally, but over the network packets get dropped and the upload has to restart. The retry is automatic, but it still wastes a lot of time and sometimes repeats...
  • Resumable large-file upload between two Java servers, using socket communication plus Java IO streams. The approach: 1. Server: build the server with ServerSocket, open the corresponding port, and keep a long-lived connection. 2. Server: use...
  • ASP.NET can upload files with FileUpload, but folders cannot be handled with FileUpload. The example below uses ASP.NET to upload a folder and to compress and decompress it. Page design: a TextBox and a Button. ...
  • Front end and back end must cooperate closely and agree on some data before chunked upload of a large file can work. The key problems to solve in our project: how to slice; how to reassemble into one file; which chunk to resume from after an interruption. For slicing, a powerful JS library lightens our work, ...
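On the server side, the "reassemble into one file" step above usually comes down to seeking to each chunk's offset in the target file. A minimal sketch with RandomAccessFile; the .partN naming scheme and all names here are my own illustration, not a fixed convention:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: merge chunk files (base.part0, base.part1, ...) into one target
// file by seeking to each chunk's offset. The naming scheme is illustrative.
public class ChunkMerger {
    static void merge(File dir, String base, int chunkNum, long chunkSize, File target)
            throws IOException {
        try (RandomAccessFile out = new RandomAccessFile(target, "rw")) {
            byte[] buf = new byte[8192];
            for (int i = 0; i < chunkNum; i++) {
                out.seek(i * chunkSize); // each chunk lands at its own offset
                try (FileInputStream in = new FileInputStream(new File(dir, base + ".part" + i))) {
                    int n;
                    while ((n = in.read(buf)) > 0) {
                        out.write(buf, 0, n);
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("chunks");
        Files.write(dir.resolve("demo.part0"), "hello".getBytes());
        Files.write(dir.resolve("demo.part1"), "world".getBytes());
        File target = dir.resolve("demo.bin").toFile();
        merge(dir.toFile(), "demo", 2, 5L, target);
        System.out.println(new String(Files.readAllBytes(target.toPath())));
    }
}
```

Because every chunk is written at index * chunkSize, the chunks can arrive and be written in any order, which is what makes uploading the pieces on multiple threads safe.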
  • IE's built-in download has no resume support; implementing resumable download requires a few little-known request and response headers in the HTTP protocol. 1. Two required response headers, Accept-Ranges and ETag: every time the client submits a download request, the server must add these two response headers, ...
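The resume mechanism mentioned above rests on two small pieces of header plumbing: the client sends a Range request header for the bytes it still needs, and the server answers 206 Partial Content with a Content-Range header. A sketch of building and parsing those header values (class and method names are mine):

```java
// Sketch: the two header values resumable download revolves around.
// Formats follow RFC 7233 ("bytes=<from>-" and "bytes <start>-<end>/<total>").
public class RangeHeaders {
    // Range request header asking to resume from byte `from` to end of file.
    static String buildRange(long from) {
        return "bytes=" + from + "-";
    }

    // Parse a Content-Range response value and return the total file length.
    static long totalFromContentRange(String value) {
        return Long.parseLong(value.substring(value.lastIndexOf('/') + 1));
    }

    public static void main(String[] args) {
        System.out.println(buildRange(1024));
        System.out.println(totalFromContentRange("bytes 1024-2047/4096"));
    }
}
```

The ETag the snippet mentions guards the other failure mode: if the file changed on the server between sessions, the stored ETag no longer matches and the client must restart from byte 0 instead of resuming.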
  • Can anyone tell me how to upload 500 MB to 2 GB files with a progress bar in a B/S (browser/server) architecture?
  • The program's recorded video files are around 5 MB each. Customers complained..., so the idea is to upload these files simultaneously on multiple threads. In theory that sounds workable, but how exactly would it be implemented? Or is there some other way to speed up file upload? The web service the server calls cannot be changed.
  • Multithreaded large-file download in Java (resumable)

    A while ago I hit a large-file upload problem on a project; if the file is small it is easy to handle;...building a complete system of this kind is genuinely hard. This article takes multithreaded transfer of a local large file as an example to explain the principle (downloading network resources...
  • At work I recently used OSS's multipart upload API. The overall flow: the front end splits the large file into slices of 5 MB each (slice count = file size / slice size) and uploads the slices to the back end on multiple threads; after receiving a slice, the back end calls OSS's existence-check interface to verify whether it was uploaded before...
  • This project implements large-file upload on the server side. Techniques used: file stream reading and writing, a progress bar, multithreading, and WCF service interface implementation with callbacks. The file service interface has three operations: 1. prepare the file for upload; 2. transfer the file (upload button); 3. abort the transfer. 1. Set up the client and server...
  • Most multithreaded network transfer code cannot handle files larger than 2 GB. This is a rewrite, built on earlier work, that does true multithreaded network transfer with no limit on file size! The complete large-file upload code is posted first; if you need download of files over 2 GB, leave a comment!
  • For front-end large-file upload, most articles online already give the solution. The core is Blob.prototype.slice, which, much like the array slice method, returns a slice of the original file. With it, given a preconfigured maximum slice count...