精华内容
下载资源
问答
  • Java多线程并发下载文件工具 … HttpClient 出处:https://blog.csdn.net/JinglongSource/article/details/102559449 import cn.shaines.core.utils.HttpClient.Response; import ...

    Java多线程并发下载文件工具

    HttpClient 出处:https://blog.csdn.net/JinglongSource/article/details/102559449

    import cn.shaines.core.utils.HttpClient.Response;
    import cn.shaines.core.utils.HttpClient.Response.BodyHandlers;
    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.net.HttpURLConnection;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 学习资源:
     * https://blog.csdn.net/qq_34401512/article/details/77867576
     *
     * ConcurrentDownLoad
     *         .builder()
     *         // 设置URL
     *         .setUrl("http://117.148.175.41/cdn/pcfile/20190904/16/58/GeePlayerSetup_app.exe?dis_k=26cbb7b142c2446397843d6543da209ae&dis_t=1573620667&dis_dz=CMNET-GuangDong&dis_st=36")
     *         // 设置线程每次请求的大小
     *         .setBlockSize(1024)
     *         // 设置线程数量
     *         .setThreadCount(10)
     *         // 设置保存路径
     *         .setPath("C:\\Users\\houyu\\Desktop\\GeePlayerSetup_app_my33333333.exe")
     *         // 设置存在是否删除
     *         .setDeleteIfExist(true)
     *         // 创建
     *         .build()
     *         // 开始
     *         .start((msg, total, current, speed) -> {});
     *
     * @description 并发下载文件工具
     * @date 2019-11-12 14:35:26
     * @author houyu for.houyu@foxmail.com
     */
    public class ConcurrentDownLoad {
    
        private static final Logger log = LoggerFactory.getLogger(ConcurrentDownLoad.class);
    
        private Builder builder;
        /** HttpClient */
        private HttpClient httpClient;
        /** 线程池 */
        private ThreadPoolExecutor poolExecutor;
        /** 信号量 */
        private Semaphore semaphore;
        /** CountDownLatch */
        private CountDownLatch countDownLatch;
        /** 总长度 */
        private long total;
        /** 当前的进度 */
        private AtomicLong current;
        /** 回调方法 */
        private Callback callback;
    
        public static Builder builder() {
            return new Builder();
        }
    
        protected ConcurrentDownLoad(Builder builder) {
            this.builder = builder;
            httpClient = HttpClient.buildHttpClient();
            poolExecutor = new ThreadPoolExecutor(builder.threadCount, builder.threadCount,0L,TimeUnit.SECONDS,
                                                  new LinkedBlockingQueue<>(), new AbortPolicy());
        }
    
        private Long getContentLength() {
            Response<Long> response = httpClient.buildRequest(this.builder.url).GET().execute((request, http) -> {
                if(http.getResponseCode() == HttpURLConnection.HTTP_OK) {
                    return http.getContentLengthLong();
                }
                return null;
            });
            return response.getBody();
        }
    
        private List<long[]> getPaces(long totalLength) {
            List<long[]> paces = new ArrayList<>(16);
            long currentLength = totalLength;
            long startIndex = 0;
            long endIndex;
            while(currentLength > 0) {
                long size = currentLength >= this.builder.blockSize ? this.builder.blockSize : currentLength;
                endIndex = startIndex + size;
                endIndex = endIndex >= totalLength ? totalLength : endIndex;
                paces.add(new long[]{startIndex, endIndex});
                currentLength = currentLength - size;
                startIndex = endIndex + 1;
            }
            return paces;
        }
    
        public void start(Callback call) {
            this.run((msg, total, current, speed) -> {
                log.debug("msg:{} total:{} current:{} speed:{}", msg, total, current, speed);
                call.accept(msg, total, current, speed);
            });
        }
    
        public void start() {
            this.start((msg, total, current, speed) -> {});
        }
    
        private void run(Callback call) {
            try {
                this.callback = call;
                callback.accept("start...", 0, 0, 0);
                Long totalLength = getContentLength();
                if(totalLength == null) {
                    callback.accept("获取文件的长度失败", 0, 0, 0);
                    throw new RuntimeException("获取文件的长度失败");
                }
                total = totalLength;
                callback.accept(String.format("文件总长度:%s字节(B)", total), total, 0, 0);
                //
                this.builder.setBlockSize(this.builder.blockSize >= totalLength ? totalLength : this.builder.blockSize);
                //
                File file = new File(this.builder.path);
                if(file.exists()) {
                    callback.accept("文件存在", total, 0, 0);
                    if(builder.keepOnIfDisconnect && new File(this.builder.path + ".conf").exists()) {
                        // 支持断点
                        // String conf = Files.readString(Paths.get(this.builder.path + ".conf"), Charset.forName("UTF-8"));
                        // String[] split = conf.split(";");
                        // for(String s : split) {
                        //     s.split("-")
                        // }
                        // continueList.add()
                    } else {
                        if(builder.deleteIfExist) {
                            file.delete();
                            callback.accept("删除文件", total, 0, 0);
                            initFile(totalLength);
                        }
                    }
                } else {
                    callback.accept("文件不存在, 创建目录", total, 0, 0);
                    file.getParentFile().mkdirs();
                    initFile(totalLength);
                }
                //
                semaphore = new Semaphore(this.builder.threadCount);
                current = new AtomicLong(0);
                List<long[]> paces = getPaces(totalLength);
                countDownLatch = new CountDownLatch(paces.size());
                for(long[] pace : paces) {
                    callback.accept(String.format("pace:%s - %s", pace[0], pace[1]), total, 0, 0);
                    poolExecutor.submit(new DownLoadThread(pace[0], pace[1]));
                }
                try {
                    countDownLatch.await();
                } catch(InterruptedException e) {
                    e.printStackTrace();
                }
                poolExecutor.shutdown();
                callback.accept(String.format("下载完成:%s", this.builder.url), total, current.get(), 0);
            } catch(Exception e) {
                callback.accept(e.getMessage(), total, current.get(), 0);
            }
    
        }
    
        private void initFile(Long totalLength) throws IOException {
            RandomAccessFile raf = new RandomAccessFile(this.builder.path, "rwd");
            // 指定创建的文件的长度
            raf.setLength(totalLength);
            raf.close();
        }
    
        /**
         * 内部类用于实现下载并组装
         */
        private class DownLoadThread implements Runnable {
            /** 下载起始位置 */
            private long startIndex;
            /** 下载结束位置 */
            private long endIndex;
    
            public DownLoadThread(long startIndex, long endIndex) {
                this.startIndex = startIndex;
                this.endIndex = endIndex;
            }
    
            @Override
            public void run() {
                try (RandomAccessFile file = new RandomAccessFile(builder.path, "rwd")) {
                    semaphore.acquire();
                    file.seek(startIndex);
                    httpClient.buildRequest(builder.url)
                            // 添加请求头
                            .addHeader("Range", "bytes=" + startIndex + "-" + endIndex)
                            // 执行请求
                            // .execute(BodyHandlers.ofCallbackByteArray(file::write))
                            .execute(BodyHandlers.ofCallbackByteArray((data, index, length) -> {
                                file.write(data, index, length);
                                callback.accept("download...", total, current.addAndGet(length), 0);
                            }))
                    ;
                } catch(Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                    semaphore.release();
                }
            }
        }
    
        public static class Builder {
            /** 同时下载的线程数*/
            private int threadCount = 5;
            /** 每个线程每次执行的文件大小(b) 0.5M */
            private long blockSize = 1024 * 512;
            /** 服务器请求路径 */
            private String url;
            /** 本地路径 */
            private String path;
            /** 存在是否删除 */
            private boolean deleteIfExist = false;
            /** 是否断点续传 */
            private boolean keepOnIfDisconnect = true;
    
            public Builder setThreadCount(int threadCount) {
                this.threadCount = threadCount;
                return this;
            }
            public Builder setBlockSize(long blockSizeOfKb) {
                this.blockSize = blockSizeOfKb * 1024;
                return this;
            }
            public Builder setUrl(String url) {
                this.url = url;
                return this;
            }
            public Builder setPath(String path) {
                this.path = path;
                return this;
            }
            public Builder setDeleteIfExist(boolean deleteIfExist) {
                this.deleteIfExist = deleteIfExist;
                return this;
            }
            public Builder setKeepOnIfDisconnect(boolean keepOnIfDisconnect) {
                this.keepOnIfDisconnect = keepOnIfDisconnect;
                return this;
            }
    
            public ConcurrentDownLoad build() {
                return new ConcurrentDownLoad(this);
            }
        }
    
        public interface Callback {
            /**
             * 回调方法
             * @param msg 消息
             * @param total 总量
             * @param current 当前量
             * @param speed 速度(k/s) 暂时不实现
             */
            void accept(String msg, long total, long current, long speed);
        }
    
    
    
    }
    
    展开全文
  • java 多线程 并发 处理 大文件

    千次阅读 2019-10-10 23:46:39
    坚持打卡! 这个主要实现的是,多线程处理大文件,这里的大文件指的是好几十M的文件,例如我下边写的处理几百万条数据,对他们进行过滤,得到想要的...去并发的读取同一个文件 2,我这里线程的实现是通过实现Call...

    坚持打卡!


    这个主要实现的是,多线程处理大文件,这里的大文件指的是好几十M的文件,例如我下边写的处理几百万条数据,对他们进行过滤,得到想要的数据并输出到指定的文件中。

    一开始走了不少弯路,我现在讲一下我的主要实现的思路(这里也参考了很多大佬们的意见),当自己写出来的时候才是属于自己的。


    主要思路:

    1,创建线程池,其多个线程。去并发的读取同一个文件

    2,我这里线程的实现是通过实现Callable接口,重写call()

    3,既然要实现多线程,并发。所以每一个线程就要负责执行自己的操作,我之前存在一个缺点:虽然我起了很多线程去读,但是这些线程都是共用一个读的对象,即就是要等待其他线程处理完后才轮到自己,这就是串行,还不如一个单线程去执行。查阅了方法,有一种实现的方式是:对所要读取的文件进行按字节拆分,分块(区域)。即有多少个线程就把源文件分割成多少块(区域),每个线程就只对自己所负责的块进行操作,这样就互不影响了。

    4,RandomAccessFile 这个就是上面实现的核心类,大家可以去仔细查阅下,我目前了解还不是很深入。下面讲的地方如果不正确,还请大佬们多多指正。

    5,分块就涉及一个问题,分割的时候。切点就切在了一行数据的中间,怎么办?

     下雨了我要回家收衣服了。

     

    所以我们要对这个起始位置,和末尾位置的字符进行判断,是否是换行符:\r\n  ,  \r   ,  \n

    不是的话,我们要实现一种递归,去找到当行的起始位置,这样就可以对这一整行进行完整的输出了。

    6,把结果输出到指定文件中,这时候就可以使用RandomAccessFile 的write 方法。write()方法在调用结束之后自动移动文件指针,所以你不需要频繁地把指针移动到下一个将要写入数据的位置。所以最终得到处理后的结果数据是无序的。

    (我也测试过,每个线程自己输出到一个临时文件,最终再合并成一个一个总文件,并删除临时文件。这样的结果是有序的,但是感觉有些画蛇添足。不知道各位有没有更好的实现方法)


    参考代码:

    BufferIO.class

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author SanYi&&HLing
     * 
     *         目的:多线程---实现对百万数据---的txt文件进行---处理,并输出到指定的txt文件中
     * */
    public class BufferIO {
    	/**
    	 * 核心线程池大小 4
    	 * */
    	public static final int CORE_POOL_SIZE = 4;
    	/**
    	 * 最大线程池大小 4
    	 * */
    	public static final int MAX_MUM_POOL_SIZE = 4;
    	/**
    	 * 线程最大空闲时间
    	 * */
    	public static final int KEEP_ALIVE_TIME = 1;
    
    	/**
    	 * main 入口类
    	 * */
    	public static void main(String[] args) throws IOException,
    			InterruptedException {
    		// 测试执行时间
    		long start = System.currentTimeMillis();
    		// 创建RandomAccessFile 操作文件
    		RandomAccessFile fileWirter = null;
    		// 创建线程池对象
    		ThreadPoolExecutor executor = null;
    
    		try {
    			// 构造线程池对象
    			executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
    					MAX_MUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MINUTES,
    					new ArrayBlockingQueue<Runnable>(1));
    			// 多个 tasks
    			List<CopyFile> tasks = new ArrayList<>();
    			// 指定输出的文件
    			fileWirter = new RandomAccessFile("result.txt", "rw");
    			// 添加执行的线程
    			for (int i = 0; i < CORE_POOL_SIZE; i++) {
    				tasks.add(new CopyFile("data.txt", CORE_POOL_SIZE, i + 1,
    						fileWirter));
    			}
    			// 任务批量提交并执行
    			List<Future<Object>> futures = executor.invokeAll(tasks);
    			// 获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
    			for (Future<Object> future : futures) {
    				future.get();
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			// 关闭
    			if (fileWirter != null) {
    				fileWirter.close();
    			}
    			if (executor != null) {
    				executor.shutdown();
    			}
    		}
    		// 测试执行时间
    		long end = System.currentTimeMillis();
    		System.out.println("执行了" + (end - start) + "ms");
    	}
    
    }

    CopyFile.class

    import java.io.RandomAccessFile;
    import java.util.concurrent.Callable;
    
    /**
     * @author SanYi&&HLing 
     * 	通过实现Callable接口,重写 call 方法来实现线程
     * */
    public class CopyFile implements Callable<Object> {
    
    	// 私有化
    	private String fileName;
    	private int threadSize = 1;
    	private int currentBlock = 1;
    	private RandomAccessFile fileWirter;
    
    	/**
    	 * 有参构造方法
    	 * 
    	 * @param fileName
    	 *            源文件
    	 * @param threadSize
    	 *            线程数
    	 * @param currentBlock
    	 *            当前块
    	 * @param fileWirter
    	 *            写对象
    	 * */
    	public CopyFile(String fileName, int threadSize, int currentBlock,
    			RandomAccessFile fileWirter) {
    		this.fileName = fileName;
    		this.threadSize = threadSize;
    		this.currentBlock = currentBlock;
    		this.fileWirter = fileWirter;
    	}
    
    	/**
    	 * 重写的 call()
    	 * */
    	@Override
    	public Object call() throws Exception {
    		// 置空
    		RandomAccessFile fileAccess = null;
    		try {
    
    			// 起始位置 currentPosition, 末尾位置 nextPosition
    
    			// 定义对源文件只读
    			fileAccess = new RandomAccessFile(fileName, "r");
    			// 获取源文件的总字节
    			long len = fileAccess.length();
    			// 根据线程的数量进行分块,每个线程负责操作自己的块内容。达到并发的效果
    			long step = len / threadSize;
    			// 根绝当前线程编号,得到起始指针指向位置
    			long currentPosition = step * (currentBlock - 1);
    			// 得到末尾指针指向位置
    			long nextPosition = currentPosition + step;
    
    			// 分块的时候,会出现分割的临界点在一行数据的中间,此时需要制定一个策略
    			// 我这里设置的是,头指针如果不是指向这一行的开头,那么使用递归向前一位查找
    			// 对比判断是否是换行符,是则将本行输出
    			// 换行符有三种:\r\n, \r, \n
    
    			// 处理"\r\n"情况
    			// 指向末尾,获取字符,判断是否\r\n中的\n,是的话末尾位置向下移一位,可以将本行输出
    			fileAccess.seek(nextPosition);
    			char lastChar = (char) fileAccess.read();
    			if (lastChar == '\n') {
    				nextPosition += 1;
    			}
    			// 指向起始位置
    			fileAccess.seek(currentPosition);
    
    			if (currentPosition > 0) {
    				currentPosition += 1;
    				long offset = 1;
    				// 起始位置后退一位
    				fileAccess.seek(currentPosition - offset);
    				// 获得后退一位的字符
    				int beforeChar = fileAccess.read();
    				// 递归判断是否是 \r, \n
    				while (beforeChar != '\n' && beforeChar != '\r') {
    					offset += 1;
    					fileAccess.seek(currentPosition - offset);
    					beforeChar = fileAccess.read();
    				}
    			}
    
    			// 临时变量
    			String lineValue = "";
    			long filePointer;
    			// 判断当行是否为空
    			while ((lineValue = fileAccess.readLine()) != null) {
    				// 获得当前指向的字节下标
    				filePointer = fileAccess.getFilePointer();
    				// 届点
    				if (filePointer != len && filePointer > nextPosition) {
    					break;
    				} else if (!lineValue.isEmpty()) {
    					// 这里是过滤的条件
    					if (!lineValue.endsWith("@live.com")) {
    						// 写
    						fileWirter.write((lineValue + "\n").getBytes());
    					}
    				}
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			// 关闭
    			if (fileAccess != null) {
    				fileAccess.close();
    			}
    		}
    		return null;
    	}
    
    }
    

    时间有点赶,可能有些地方写的不是很好。还请大家多多批评指正,若觉得本文章有帮助到你,还希望可以点赞鼓励下。谢谢阅读~

    展开全文
  • Java调用Zip类批量压缩多个文件,此前有一个是压缩单个文件,也可参考,相关代码中可找到此源码。  public class ZipDemo extends JFrame{  JFileChooser fileChooser; //文件选择器  JList fileList; //待...
  • 主要为大家详细介绍了java实现动态上传多个文件,并解决文件重名问题的方法,感兴趣的小伙伴们可以参考一下
  • java ftp上传文件 支持并发

    千次阅读 2018-11-29 10:21:42
    import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import...
    package com.dl.utils;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.MalformedURLException;
    import java.util.Map;
    
    import org.apache.commons.net.ftp.FTPClient;
    import org.apache.commons.net.ftp.FTPFile;
    import org.apache.commons.net.ftp.FTPReply;
    
    public class FTPUtils {
    	
            //ftp服务器地址
    //		private static String hostname = ReadRootPathUtils.getItemsPath();
            //ftp服务器端口号默认为21
    //		private static Integer port = 21 ;
            //ftp登录账号
    //		private static String username = "root";
            //ftp登录密码
    //		private static String password = "123";
            
    //		private static FTPClient ftpClient = null;
            
            /**
             * 初始化ftp服务器
             */
            public static FTPClient initFtpClient() {
            	FTPClient ftpClient = null;
            	Map<String, String> ftp = ReadRootPathUtils.getFTP();
                ftpClient = new FTPClient();
                ftpClient.setControlEncoding("utf-8");
                try {
                    System.out.println("connecting...ftp服务器:"+ftp.get("ftpIp")+":"+ftp.get("port")); 
                    ftpClient.connect(ftp.get("ftpIp"), Integer.parseInt(ftp.get("port"))); //连接ftp服务器
                    
                    ftpClient.login(ftp.get("userName"),ftp.get("pwd") ); //登录ftp服务器
                    
                    int replyCode = ftpClient.getReplyCode(); //是否成功登录服务器
                    if(!FTPReply.isPositiveCompletion(replyCode)){
                        System.out.println("connect failed...ftp服务器:"+ftp.get("ftpIp")+":"+ftp.get("port")); 
                    }
                    System.out.println("connect successfu...ftp服务器:"+ftp.get("ftpIp")+":"+ftp.get("port")); 
                }catch (MalformedURLException e) { 
                   e.printStackTrace(); 
                }catch (IOException e) { 
                   e.printStackTrace(); 
                }
    			return ftpClient; 
            }
            
            
    
            /**
            * 上传文件
            * @param pathname ftp服务保存地址
            * @param fileName 上传到ftp的文件名
            *  @param originfilename 待上传文件的名称(绝对地址) * 
            * @return
            */
            public static boolean uploadFile( String pathname, String fileName,String originfilename){
            	boolean flag = false;
                InputStream inputStream = null;
                FTPClient initFtpClient = initFtpClient();
                try{
                    System.out.println("开始上传文件");
                    //把文件转化为流
                    inputStream = new FileInputStream(new File(originfilename));
                    //初始化ftp
                    initFtpClient();
                    //设置编码
                    initFtpClient.setFileType(initFtpClient.BINARY_FILE_TYPE);
                    //文件需要保存的路径
                    CreateDirecroty(pathname,initFtpClient);
                    //
                    initFtpClient.makeDirectory(pathname);
                    //
                    initFtpClient.changeWorkingDirectory(pathname);
                    //
                    initFtpClient.storeFile(fileName, inputStream);
                    
                    inputStream.close();
                    initFtpClient.logout();
                    flag = true;
                    System.out.println("上传文件成功");
                }catch (Exception e) {
                    System.out.println("上传文件失败");
                    e.printStackTrace();
                }finally{
                    if(initFtpClient.isConnected()){ 
                        try{
                        	initFtpClient.disconnect();
                        }catch(IOException e){
                            e.printStackTrace();
                        }
                    } 
                    if(null != inputStream){
                        try {
                            inputStream.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        } 
                    } 
                }
                return true;
            }
            /**
             * 上传文件
             * @param pathname ftp服务保存地址
             * @param fileName 上传到ftp的文件名
             * @param inputStream 输入文件流 
             * @return
             */
            public static boolean uploadFile( String pathname, String fileName,InputStream inputStream){
            	FTPClient initFtpClient = initFtpClient();
            	boolean flag = false;
                try{
                    System.out.println("开始上传文件");
                    initFtpClient();
                    initFtpClient.setFileType(initFtpClient.BINARY_FILE_TYPE);
                    CreateDirecroty(pathname,initFtpClient);
                    initFtpClient.makeDirectory(pathname);
                    initFtpClient.changeWorkingDirectory(pathname);
                    initFtpClient.storeFile(fileName, inputStream);
                    inputStream.close();
                    initFtpClient.logout();
                    flag = true;
                    System.out.println("上传文件成功");
                }catch (Exception e) {
                    System.out.println("上传文件失败");
                    e.printStackTrace();
                }finally{
                    if(initFtpClient.isConnected()){ 
                        try{
                        	initFtpClient.disconnect();
                        }catch(IOException e){
                            e.printStackTrace();
                        }
                    } 
                    if(null != inputStream){
                        try {
                            inputStream.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        } 
                    } 
                }
                return true;
            }
            //改变目录路径
             public static boolean changeWorkingDirectory(String directory ,FTPClient initFtpClient) {
            	 boolean flag = true;
                    try {
                        flag = initFtpClient.changeWorkingDirectory(directory);
                        if (flag) {
                          System.out.println("进入文件夹" + directory + " 成功!");
    
                        } else {
                            System.out.println("进入文件夹" + directory + " 失败!开始创建文件夹");
                        }
                    } catch (IOException ioe) {
                        ioe.printStackTrace();
                    }
                    return flag;
                }
    
            //创建多层目录文件,如果有ftp服务器已存在该文件,则不创建,如果无,则创建
            public static boolean CreateDirecroty(String remote,FTPClient initFtpClient) throws IOException {
                boolean success = true;
                String directory = remote + "/";
                // 如果远程目录不存在,则递归创建远程服务器目录
                if (!directory.equalsIgnoreCase("/") && !changeWorkingDirectory(new String(directory),initFtpClient)) {
                    int start = 0;
                    int end = 0;
                    if (directory.startsWith("/")) {
                        start = 1;
                    } else {
                        start = 0;
                    }
                    end = directory.indexOf("/", start);
                    String path = "";
                    String paths = "";
                    while (true) {
                        String subDirectory = new String(remote.substring(start, end).getBytes("GBK"), "iso-8859-1");
                        path = path + "/" + subDirectory;
                        if (!existFile(path,initFtpClient)) {
                            if (makeDirectory(subDirectory,initFtpClient)) {
                                changeWorkingDirectory(subDirectory,initFtpClient);
                            } else {
                                System.out.println("创建目录[" + subDirectory + "]失败");
                                changeWorkingDirectory(subDirectory,initFtpClient);
                            }
                        } else {
                            changeWorkingDirectory(subDirectory,initFtpClient);
                        }
    
                        paths = paths + "/" + subDirectory;
                        start = end + 1;
                        end = directory.indexOf("/", start);
                        // 检查所有目录是否创建完毕
                        if (end <= start) {
                            break;
                        }
                    }
                }
                return success;
            }
    
          //判断ftp服务器文件是否存在    
            public static boolean existFile(String path,FTPClient initFtpClient) throws IOException {
                    
            	boolean flag = false;
                    FTPFile[] ftpFileArr = initFtpClient.listFiles(path);
                    if (ftpFileArr.length > 0) {
                        flag = true;
                    }
                    return flag;
                }
            //创建目录
            public static boolean makeDirectory(String dir,FTPClient initFtpClient) {
                boolean flag = true;
                try {
                    flag = initFtpClient.makeDirectory(dir);
                    if (flag) {
                        System.out.println("创建文件夹" + dir + " 成功!");
    
                    } else {
                        System.out.println("创建文件夹" + dir + " 失败!");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return flag;
            }
            
            /** * 下载文件 * 
            * @param pathname FTP服务器文件目录 * 
            * @param filename 文件名称 * 
            * @param localpath 下载后的文件路径 * 
            * @return */
            public static boolean downloadFile(String pathname, String filename, String localpath){ 
                boolean flag = false; 
                FTPClient initFtpClient = initFtpClient();
                OutputStream os=null;
                try { 
                    System.out.println("开始下载文件");
                    initFtpClient();
                    //切换FTP目录 
                    initFtpClient.changeWorkingDirectory(pathname); 
                    FTPFile[] ftpFiles = initFtpClient.listFiles(); 
                    for(FTPFile file : ftpFiles){ 
                        if(filename.equalsIgnoreCase(file.getName())){ 
                            File localFile = new File(localpath + "/" + file.getName()); 
                            os = new FileOutputStream(localFile); 
                            initFtpClient.retrieveFile(file.getName(), os); 
                            os.close(); 
                        } 
                    } 
                    initFtpClient.logout(); 
                    flag = true; 
                    System.out.println("下载文件成功");
                } catch (Exception e) { 
                    System.out.println("下载文件失败");
                    e.printStackTrace(); 
                } finally{ 
                    if(initFtpClient.isConnected()){ 
                        try{
                        	initFtpClient.disconnect();
                        }catch(IOException e){
                            e.printStackTrace();
                        }
                    } 
                    if(null != os){
                        try {
                            os.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        } 
                    } 
                } 
                return flag; 
            }
            //读取文件
            public static boolean readFile(String pathname, String filename, String localpath){
            	FTPClient initFtpClient = initFtpClient();
            	  boolean flag = false; 
                  OutputStream os=null;
                  try { 
                      System.out.println("开始下载文件");
                      initFtpClient();
                      //切换FTP目录 
                      initFtpClient.changeWorkingDirectory(pathname); 
                      FTPFile[] ftpFiles = initFtpClient.listFiles(); 
                      
                      for(FTPFile file : ftpFiles){ 
                          if(filename.equalsIgnoreCase(file.getName())){ 
                        	  
                            File localFile = new File("/usr/dlconfig/temp/" + file.getName()); 
                            os = new FileOutputStream(localFile); 
                            initFtpClient.retrieveFile(file.getName(), os); 
                            os.close(); 
                          } 
                      } 
                      initFtpClient.logout(); 
                      flag = true; 
                      System.out.println("下载文件成功");
                  } catch (Exception e) { 
                      System.out.println("下载文件失败");
                      e.printStackTrace(); 
                  } finally{ 
                      if(initFtpClient.isConnected()){ 
                          try{
                        	  initFtpClient.disconnect();
                          }catch(IOException e){
                              e.printStackTrace();
                          }
                      } 
                      if(null != os){
                          try {
                              os.close();
                          } catch (IOException e) {
                              e.printStackTrace();
                          } 
                      } 
                  } 
                  return flag; 
            }
            
            /** * 删除文件 * 
            * @param pathname FTP服务器保存目录 * 
            * @param filename 要删除的文件名称 * 
            * @return */ 
            public static boolean deleteFile(String pathname, String filename){ 
                boolean flag = false; 
                FTPClient initFtpClient = initFtpClient();
                try { 
                    System.out.println("开始删除文件");
                    initFtpClient();
                    //切换FTP目录 
                    initFtpClient.changeWorkingDirectory(pathname); 
                    initFtpClient.dele(filename); 
                    initFtpClient.logout();
                    flag = true; 
                    System.out.println("删除文件成功");
                } catch (Exception e) { 
                    System.out.println("删除文件失败");
                    e.printStackTrace(); 
                } finally {
                    if(initFtpClient.isConnected()){ 
                        try{
                        	initFtpClient.disconnect();
                        }catch(IOException e){
                            e.printStackTrace();
                        }
                    } 
                }
                return flag; 
            }
            
    //        public static void main(String[] args) {
                FtpUtils ftp =new FtpUtils(); 
    //            //ftp.uploadFile("ftpFile/data", "123.docx", "E://123.docx");
    //            //ftp.downloadFile("ftpFile/data", "123.docx", "F://");
                ftp.deleteFile("ftpFile/data", "123.docx");
                System.out.println("ok");
    //        	FTPUtils f = new FTPUtils();
    //        	f.initFtpClient();
    //        	
    //        	
    //        }
    }
    
    展开全文
  • JAVA并发编程的艺术》之 Java并发编程实战 文章目录《JAVA并发编程的艺术》之 Java并发编程实战生产者和消费者模式生产者消费者模式实战多生产者和多消费者场景线程池与生产消费者模式线上问题定位性能测试查看...

    《JAVA并发编程的艺术》之 Java并发编程实战


    当你在进行并发编程时,看着程序的执行速度在自己的优化下运行得越来越快,你会觉得越来越有成就感,这就是并发编程的魅力。但与此同时,并发编程产生的问题和风险可能也 会随之而来。本章先介绍几个并发编程的实战案例,然后再介绍如何排查并发编程造成的问题。

    生产者和消费者模式

    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度。

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。

    **什么是生产者和消费者模式 **

    生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

    生产者消费者模式实战

    我和同事一起利用业余时间开发的Yuna工具中使用了生产者和消费者模式。我先介绍下Yuna[1]工具,在阿里巴巴很多同事都喜欢通过邮件分享技术文章,因为通过邮件分享很方便,大家在网上看到好的技术文章,执行复制→粘贴→发送就完成了一次分享,但是我发现技术文章不能沉淀下来,新来的同事看不到以前分享的技术文章,大家也很难找到以前分享过的技术文章。为了解决这个问题,我们开发了一个Yuna工具。

    我们申请了一个专门用来收集分享邮件的邮箱,比如share@alibaba.com,大家将分享的文 章发送到这个邮箱,让大家每次都抄送到这个邮箱肯定很麻烦,所以我们的做法是将这个邮箱地址放在部门邮件列表里,所以分享的同事只需要和以前一样向整个部门分享文章就行。

    Yuna工具通过读取邮件服务器里该邮箱的邮件,把所有分享的邮件下载下来,包括邮件的附件、图片和邮件回复。因为我们可能会从这个邮箱里下载到一些非分享的文章,所以我们要求分享的邮件标题必须带有一个关键字,比如“内贸技术分享”。下载完邮件之后,通过confluence的Web Service接口,把文章插入到confluence里,这样新同事就可以在confluence里看以前分享过的文章了,并且Yuna工具还可以自动把文章进行分类和归档。

    为了快速上线该功能,当时我们花了3天业余时间快速开发了Yuna 1.0版本。在1.0版本中并没有使用生产者消费模式,而是使用单线程来处理,因为当时只需要处理我们一个部门的 邮件,所以单线程明显够用,整个过程是串行执行的。在一个线程里,程序先抽取全部的邮件,转化为文章对象,然后添加全部的文章,最后删除抽取过的邮件。代码如下。

    public void extract() { 
        logger.debug("开始" + getExtractorName() + "。。"); // 抽取邮件 
        List<Article> articles = extractEmail(); // 添加文章 
        for (Article article : articles) { 
            addArticleOrComment(article);
        }
        // 清空邮件 
        cleanEmail(); 
        logger.debug("完成" + getExtractorName() + "。。");
    }
    

    Yuna工具在推广后,越来越多的部门使用这个工具,处理的时间越来越慢,Yuna是每隔5分钟进行一次抽取的,而当邮件多的时候一次处理可能就花了几分钟,于是我在Yuna 2.0版本里使用了生产者消费者模式来处理邮件,首先生产者线程按一定的规则去邮件系统里抽取邮件,然后存放在阻塞队列里,消费者从阻塞队列里取出文章后插入到conflunce里。代码如下。

    public class QuickEmailToWikiExtractor extends AbstractExtractor { 
        private ThreadPoolExecutor threadsPool; 
        
    private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue; 
        public QuickEmailToWikiExtractor() { 
            emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>(); 
            int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; 
            threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit. SECONDS,new LinkedBlockingQueue<Runnable>(2000)); 
        }
        public void extract() { 
            logger.debug("开始" + getExtractorName() + "。。");
            long start = System.currentTimeMillis(); // 抽取所有邮件放到队列里 
            new ExtractEmailTask().start(); 
            // 把队列里的文章插入到
            Wiki insertToWiki(); 
            long end = System.currentTimeMillis(); 
            double cost = (end - start) / 1000; 
            logger.debug("完成" + getExtractorName() + ",花费时间:" + cost + "秒"); 
        }
        /*** 把队列里的文章插入到Wiki */ 
        private void insertToWiki() { 
            // 登录Wiki,每间隔一段时间需要登录一次 
            confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD); 
            while (true) {
                // 2秒内取不到就退出 
                ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS); 
                if (email == null) { 
                    break; 
                }
                threadsPool.submit(new insertToWikiTask(email));
            } 
        }
        protected List<Article> extractEmail() { 
            List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails(); 
            if (allEmails == null) {
                return null;
            }
            for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) {
                emailQueue.offer(exchangeEmailShallowDTO);
                
    }
            return null;
        }
        /*** 抽取邮件任务 ** @author tengfei.fangtf */ 
        public class ExtractEmailTask extends Thread {
            public void run() { 
                extractEmail();
            }
        }
    }
    

    代码的执行逻辑是,生产者启动一个线程把所有邮件全部抽取到队列中,消费者启动CPU*2个线程数处理邮件,从之前的单线程处理邮件变成了现在的多线程处理,并且抽取邮件的线程不需要等处理邮件的线程处理完再抽取新邮件,所以使用了生产者和消费者模式后,邮件的整体处理速度比以前要快了几倍。

    [1] Yuna取名自我非常喜欢的一款RPG游戏《最终幻想》中女主角的名字。

    多生产者和多消费者场景

    在多核时代,多线程并发处理速度比单线程处理速度更快,所以可以使用多个线程来生产数据,同样可以使用多个消费线程来消费数据。而更复杂的情况是,消费者消费的数据,有可能需要继续处理,于是消费者处理完数据之后,它又要作为生产者把数据放在新的队列里,交给其他消费者继续处理,如图11-1所示。

    在这里插入图片描述

    我们在一个长连接服务器中使用了这种模式,生产者1负责将所有客户端发送的消息存放在阻塞队列1里,消费者1从队列里读消息,然后通过消息ID进行散列得到N个队列中的一个,然后根据编号将消息存放在到不同的队列里,每个阻塞队列会分配一个线程来消费阻塞队列里的数据。如果消费者2无法消费消息,就将消息再抛回到阻塞队列1中,交给其他消费者处理。

    以下是消息总队列的代码。

    /*** 总消息队列管理 
    *
    * @author tengfei.fangtf 
    */
    public class MsgQueueManager implements IMsgQueue{
        private static final Logger LOGGER = LoggerFactory.getLogger(MsgQueueManager.class);
        /*** 消息总队列 */
        public final BlockingQueue<Message> messageQueue; 
        private MsgQueueManager() {
            messageQueue = new LinkedTransferQueue<Message>(); 
        }
        public void put(Message msg) { 
            try {
                messageQueue.put(msg);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); 
            } 
        }
        public Message take() {
            try {
                return messageQueue.take(); 
            } catch (InterruptedException e) { 
                Thread.currentThread().interrupt();
            }
            return null; 
        }
    }
    

    启动一个消息分发线程。在这个线程里子队列自动去总队列里获取消息。

    /** 
    * 分发消息,负责把消息从大队列塞到小队列里 
    *
    * @author tengfei.fangtf 
    */ 
    static class DispatchMessageTask implements Runnable { 
        @Override 
        public void run() { 
            BlockingQueue<Message> subQueue; 
            for (;;) {
    // 如果没有数据,则阻塞在这里 
                Message msg = MsgQueueFactory.getMessageQueue().take(); 
                // 如果为空,则表示没有Session机器连接上来, 
                // 需要等待,直到有Session机器连接上来 
                while ((subQueue = getInstance().getSubQueue()) == null) { 
                    try {
                        Thread.sleep(1000); 
                    } catch (InterruptedException e) { 
                        Thread.currentThread().interrupt(); 
                    } 
                }
                // 把消息放到小队列里 
                try {
                    subQueue.put(msg);
                } catch (InterruptedException e) { 
                    Thread.currentThread().interrupt();
                }
            } 
        } 
    }
    

    使用散列(hash)算法获取一个子队列,代码如下。

    /** 
    * 均衡获取一个子队列。 
    *
    * @return 
    */ 
    public BlockingQueue<Message> getSubQueue() { 
        int errorCount = 0; 
        for (;;) { 
            if (subMsgQueues.isEmpty()) {
                return null;
            }
            int index = (int) (System.nanoTime() % subMsgQueues.size()); 
            try {
                return subMsgQueues.get(index);
            } catch (Exception e) { 
                // 出现错误表示,在获取队列大小之后,队列进行了一次删除操作 
                LOGGER.error("获取子队列出现错误", e); 
                if ((++errorCount) < 3) { 
                    continue; 
                } 
            } 
        } 
    }
    

    使用的时候,只需要往总队列里发消息。

    // 往消息队列里添加一条消息
    IMsgQueue messageQueue = MsgQueueFactory.getMessageQueue();
    Packet msg = Packet.createPacket(Packet64FrameType. TYPE_DATA, "{}".getBytes(), (short) 1); 
    messageQueue.put(msg);
    

    线程池与生产消费者模式

    Java中的线程池类其实就是一种生产者和消费者模式的实现方式,但是我觉得其实现方式更加高明。生产者把任务丢给线程池,线程池创建线程并处理任务,如果将要运行的任务数大于线程池的基本线程数就把任务扔到阻塞队列里,这种做法比只使用一个阻塞队列来实现生产者和消费者模式显然要高明很多,因为消费者能够处理直接就处理掉了,这样速度更快,而生产者先存,消费者再取这种方式显然慢一些。

    我们的系统也可以使用线程池来实现多生产者和消费者模式。例如,创建N个不同规模的Java线程池来处理不同性质的任务,比如线程池1将数据读到内存之后,交给线程池2里的线程继续处理压缩数据。线程池1主要处理IO密集型任务,线程池2主要处理CPU密集型任务。

    本节讲解了生产者和消费者模式,并给出了实例。读者可以在平时的工作中思考一下哪 些场景可以使用生产者消费者模式,我相信这种场景应该非常多,特别是需要处理任务时间比较长的场景,比如上传附件并处理,用户把文件上传到系统后,系统把文件丢到队列里,然后立刻返回告诉用户上传成功,最后消费者再去队列里取出文件处理。再如,调用一个远程接口查询数据,如果远程服务接口查询时需要几十秒的时间,那么它可以提供一个申请查询的接口,这个接口把要申请查询任务放数据库中,然后该接口立刻返回。然后服务器端用线程轮询并获取申请任务进行处理,处理完之后发消息给调用方,让调用方再来调用另外一个接口取数据。

    线上问题定位

    有时候,有很多问题只有在线上或者预发环境才能发现,而线上又不能调试代码,所以线上问题定位就只能看日志、系统状态和dump线程,本节只是简单地介绍一些常用的工具,以帮助大家定位线上问题。

    1. 在Linux命令行下使用TOP命令查看每个进程的情况,显示如下。
    top - 22:27:25 up 463 days, 12:46, 1 user, load average: 11.80, 12.19, 11.79 
    Tasks: 113 total, 5 running, 108 sleeping, 0 stopped, 0 zombie 
    Cpu(s): 62.0%us, 2.8%sy, 0.0%ni, 34.3%id, 0.0%wa, 0.0%hi, 0.7%si, 0.2%st 
    Mem: 7680000k total, 7665504k used, 14496k free, 97268k buffers 
    Swap: 2096472k total, 14904k used, 2081568k free, 3033060k cached 
    PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 
    31177 admin 18 0 5351m 4.0g 49m S 301.4 54.0 935:02.08 java 
    31738 admin 15 0 36432 12m 1052 S 8.7 0.2 11:21.05 nginx-proxy
    

    我们的程序是Java应用,所以只需要关注COMMAND是Java的性能数据,COMMAND表示启动当前进程的命令,在Java进程这一行里可以看到CPU利用率是300%,不用担心,这个是当前机器所有核加在一起的CPU利用率。

    1. 再使用top的交互命令数字1查看每个CPU的性能数据。
    top - 22:24:50 up 463 days, 12:43, 1 user, load average: 12.55, 12.27, 11.73 Tasks: 110 total, 3 running, 107 sleeping, 0 stopped, 0 zombie 
    Cpu0 : 72.4%us, 3.6%sy, 0.0%ni, 22.7%id, 0.0%wa, 0.0%hi, 0.7%si, 0.7%st 
    Cpu1 : 58.7%us, 4.3%sy, 0.0%ni, 34.3%id, 0.0%wa, 0.0%hi, 2.3%si, 0.3%st 
    Cpu2 : 53.3%us, 2.6%sy, 0.0%ni, 34.1%id, 0.0%wa, 0.0%hi, 9.6%si, 0.3%st 
    Cpu3 : 52.7%us, 2.7%sy, 0.0%ni, 25.2%id, 0.0%wa, 0.0%hi, 19.5%si, 0.0%st 
    Cpu4 : 59.5%us, 2.7%sy, 0.0%ni, 31.2%id, 0.0%wa, 0.0%hi, 6.6%si, 0.0%st 
    Mem: 7680000k total, 7663152k used, 16848k free, 98068k buffers 
    Swap: 2096472k total, 14904k used, 2081568k free, 3032636k cached
    

    命令行显示了CPU4,说明这是一个5核的虚拟机,平均每个CPU利用率在60%以上。如果这里显示CPU利用率100%,则很有可能程序里写了一个死循环。这些参数的含义,可以对比表11-1来查看。

    表11-1 CPU参数含义
    在这里插入图片描述

    1. 使用top的交互命令H查看每个线程的性能信息。
    PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 3
    1558 admin 15 0 5351m 4.0g 49m S 12.2 54.0 10:08.31 java 
    31561 admin 15 0 5351m 4.0g 49m R 12.2 54.0 9:45.43 java 
    31626 admin 15 0 5351m 4.0g 49m S 11.9 54.0 13:50.21 java 
    31559 admin 15 0 5351m 4.0g 49m S 10.9 54.0 5:34.67 java 
    31612 admin 15 0 5351m 4.0g 49m S 10.6 54.0 8:42.77 java 
    31555 admin 15 0 5351m 4.0g 49m S 10.3 54.0 13:00.55 java 
    31630 admin 15 0 5351m 4.0g 49m R 10.3 54.0 4:00.75 java 
    31646 admin 15 0 5351m 4.0g 49m S 10.3 54.0 3:19.92 java 
    31653 admin 15 0 5351m 4.0g 49m S 10.3 54.0 8:52.90 java 
    31607 admin 15 0 5351m 4.0g 49m S 9.9 54.0 14:37.82 java
    

    在这里可能会出现3种情况。

    1. 第一种情况,某个线程CPU利用率一直100%,则说明是这个线程有可能有死循环,那么请记住这个PID。
    2. 第二种情况,某个线程一直在TOP 10的位置,这说明这个线程可能有性能问题。
    3. 第三种情况,CPU利用率高的几个线程在不停变化,说明并不是由某一个线程导致CPU偏高。

    如果是第一种情况,也有可能是GC造成,可以用jstat命令看一下GC情况,看看是不是因为持久代或年老代满了,产生Full GC,导致CPU利用率持续飙高,命令和回显如下。

    sudo /opt/java/bin/jstat -gcutil 31177 1000 5 
    S0 S1 E O P YGC YGCT FGC FGCT GCT
    0.00 1.27 61.30 55.57 59.98 16040 143.775 30 77.692 221.467 
    0.00 1.27 95.77 55.57 59.98 16040 143.775 30 77.692 221.467 
    1.37 0.00 33.21 55.57 59.98 16041 143.781 30 77.692 221.474 
    1.37 0.00 74.96 55.57 59.98 16041 143.781 30 77.692 221.474 
    0.00 1.59 22.14 55.57 59.98 16042 143.789 30 77.692 221.481
    

    还可以把线程dump下来,看看究竟是哪个线程、执行什么代码造成的CPU利用率高。执行以下命令,把线程dump到文件dump17里。执行如下命令。

    sudo -u admin /opt/taobao/java/bin/jstack 31177 > /home/tengfei.fangtf/dump17
    

    dump出来内容的类似下面内容。

    "http-0.0.0.0-7001-97" daemon prio=10 tid=0x000000004f6a8000 nid=0x555e in Object. wait() [0x0000000052423000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on (a org.apache.tomcat.util.net.AprEndpoint$Worker) at java.lang.Object.wait(Object.java:485) at org.apache.tomcat.util.net.AprEndpoint$Worker.await(AprEndpoint.java:1464) - locked (a org.apache.tomcat.util.net.AprEndpoint$Worker) at org.apache.tomcat.util.net.AprEndpoint$Worker.run(AprEndpoint.java:1489) at java.lang.Thread.run(Thread.java:662)
    

    dump出来的线程ID(nid)是十六进制的,而我们用TOP命令看到的线程ID是十进制的,所以要用printf命令转换一下进制。然后用十六进制的ID去dump里找到对应的线程。

    printf "%x\n" 31558
    

    输出:7b46。

    性能测试

    因为要支持某个业务,有同事向我们提出需求,希望系统的某个接口能够支持2万的QPS,因为我们的应用部署在多台机器上,要支持两万的QPS,我们必须先要知道该接口在单机上能支持多少QPS,如果单机能支持1千QPS,我们需要20台机器才能支持2万的QPS。需要注意的是,要支持的2万的QPS必须是峰值,而不能是平均值,比如一天当中有23个小时QPS不足1万,只有一个小时的QPS达到了2万,我们的系统也要支持2万的QPS。

    我们先进行性能测试。我们使用公司同事开发的性能测试工具进行测试,该工具的原理 是,用户写一个Java程序向服务器端发起请求,这个工具会启动一个线程池来调度这些任务,可以配置同时启动多少个线程、发起请求次数和任务间隔时长。将这个程序部署在多台机器上执行,统计出QPS和响应时长。我们在10台机器上部署了这个测试程序,每台机器启动了100个线程进行测试,压测时长为半小时。注意不能压测线上机器,我们压测的是开发服务器。测试开始后,首先登录到服务器里查看当前有多少台机器在压测服务器,因为程序的端口是12200,所以使用netstat命令查询有多少台机器连接到这个端口上。命令如下。

    $ netstat -nat | grep 12200 –c 
    10
    

    通过这个命令可以知道已经有10台机器在压测服务器。

    QPS达到了1400,程序开始报错获取不到数据库连接,因为我们的数据库端口是3306,用netstat命令查看已经使用了多少个数据库连接。命令如下。

    $ netstat -nat | grep 3306 –c 
    12
    

    增加数据库连接到20,QPS没上去,但是响应时长从平均1000毫秒下降到700毫秒,使用TOP命令观察CPU利用率,发现已经90%多了,于是升级CPU,将2核升级成4核,和线上的机器保持一致。再进行压测,CPU利用率下去了达到了75%,QPS上升到了1800。执行一段时间后响应时长稳定在200毫秒。

    增加应用服务器里线程池的核心线程数和最大线程数到1024,通过ps命令查看下线程数是否增长了,执行的命令如下。

    $ ps -eLf | grep java -c 
    1520
    

    再次压测,QPS并没有明显的增长,单机QPS稳定在1800左右,响应时长稳定在200毫秒。

    我在性能测试之前先优化了程序的SQL语句。使用了如下命令统计执行最慢的SQL,左边 的是执行时长,单位是毫秒,右边的是执行的语句,可以看到系统执行最慢的SQL是queryNews和queryNewIds,优化到几十毫秒。

    $ grep Y /home/admin/logs/xxx/monitor/dal-rw-monitor.log |awk -F',' '{print $7$5}' | sort -nr|head -20 
    1811 queryNews 
    1764 queryNews 
    1740 queryNews 
    1697 queryNews 
    679 queryNewIds
    

    性能测试中使用的其他命令

    查看网络流量。

    $ cat /proc/net/dev 
    Inter-| Receive | Transmit 
    face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed 
    lo:242953548208 231437133 0 0 0 0 0 0 242953548208 231437133 0 0 0 0 0 0 
    eth0:153060432504 446365779 0 0 0 0 0 0 108596061848 479947142 0 0 0 0 0 0 
    bond0:153060432504 446365779 0 0 0 0 0 0 108596061848 479947142 0 0 0 0 0 0
    

    查看系统平均负载。

    $ cat /proc/loadavg 
    0.00 0.04 0.85 1/1266 22459
    

    查看系统内存情况。

    $ cat /proc/meminfo
    MemTotal: 4106756 kB 
    MemFree: 71196 kB 
    Buffers: 12832 kB 
    Cached: 2603332 kB 
    SwapCached: 4016 kB 
    Active: 2303768 kB 
    Inactive: 1507324 kB 
    Active(anon): 996100 kB 
    部分省略
    

    查看CPU的利用率。

    cat /proc/stat 
    cpu 167301886 6156 331902067 17552830039 8645275 13082 1044952 33931469 0 
    cpu0 45406479 1992 75489851 4410199442 7321828 12872 688837 5115394 0 
    cpu1 39821071 1247 132648851 4319596686 379255 67 132447 11365141 0 
    cpu2 40912727 1705 57947971 4418978718 389539 78 110994 8342835 0 
    cpu3 41161608 1211 65815393 4404055191 554651 63 112672 9108097 0
    

    异步任务池

    Java中的线程池设计得非常巧妙,可以高效并发执行多个任务,但是在某些场景下需要对线程池进行扩展才能更好地服务于系统。例如,如果一个任务仍进线程池之后,运行线程池的程序重启了,那么线程池里的任务就会丢失。另外,线程池只能处理本机的任务,在集群环境下不能有效地调度所有机器的任务。所以,需要结合线程池开发一个异步任务处理池。图11-2为异步任务池设计图。

    在这里插入图片描述

    任务池的主要处理流程是,每台机器会启动一个任务池,每个任务池里有多个线程池,当某台机器将一个任务交给任务池后,任务池会先将这个任务保存到数据中,然后某台机器上的任务池会从数据库中获取待执行的任务,再执行这个任务。

    每个任务有几种状态,分别是创建(NEW)、执行中(EXECUTING)、RETRY(重试)、挂起 (SUSPEND)、中止(TEMINER)和执行完成(FINISH)。

    • 创建:提交给任务池之后的状态。
    • 执行中:任务池从数据库中拿到任务执行时的状态。
    • 重试:当执行任务时出现错误,程序显式地告诉任务池这个任务需要重试,并设置下一次执行时间。
    • 挂起:当一个任务的执行依赖于其他任务完成时,可以将这个任务挂起,当收到消息后,再开始执行。
    • 中止:任务执行失败,让任务池停止执行这个任务,并设置错误消息告诉调用端。
    • 执行完成:任务执行结束。

    任务池的任务隔离。异步任务有很多种类型,比如抓取网页任务、同步数据任务等,不同类型的任务优先级不一样,但是系统资源是有限的,如果低优先级的任务非常多,高优先级的任务就可能得不到执行,所以必须对任务进行隔离执行。使用不同的线程池处理不同的任务,或者不同的线程池处理不同优先级的任务,如果任务类型非常少,建议用任务类型来隔离,如果任务类型非常多,比如几十个,建议采用优先级的方式来隔离。任务池的重试策略。根据不同的任务类型设置不同的重试策略,有的任务对实时性要求高,那么每次的重试间隔就会非常短,如果对实时性要求不高,可以采用默认的重试策略,重试间隔随着次数的增加,时间不断增长,比如间隔几秒、几分钟到几小时。每个任务类型可以设置执行该任务类型线程池的最小和最大线程数、最大重试次数。

    使用任务池的注意事项。任务必须无状态:任务不能在执行任务的机器中保存数据,比如某个任务是处理上传的文件,任务的属性里有文件的上传路径,如果文件上传到机器1,机器2获取到了任务则会处理失败,所以上传的文件必须存在其他的集群里,比如OSS或SFTP。异步任务的属性。包括任务名称、下次执行时间、已执行次数、任务类型、任务优先级和执行时的报错信息(用于快速定位问题)

    展开全文
  • Java文件锁以及并发读写中的应用

    千次阅读 2017-06-12 09:37:04
    引言​ 在项目中,如果遇到需要并发读写文件的问题,那么对文件上锁分开访问是十分有必要的。因此这篇博文主要介绍文件锁的相关知识。有误之处,希望指出。什么是文件锁​ 文件锁就如同编程概念中其他锁的意义一样。...
  • Java并发编程最佳实例详解系列

    万次阅读 2018-04-26 20:22:51
    Java并发编程最佳实例详解系列: Java并发编程(一)线程定义、状态和属性 Java并发编程(一)线程定义、状态和属性 线程是指程序在执行过程中,能够执行程序代码的一个执行单元。在java语言中,线程有四种状态...
  • Java根据指定的路径删除文件

    千次阅读 2020-07-12 11:41:15
    一、根据指定的路径删除文件 package com.hrtxn.ringtone.project.system.video.util; /** * 作者: yushuangyu * 时间: 2020年07月11日 17:34 * 描述: 删除视频文件 */ public class DelFile { /** * 删除...
  • Java知识体系最强总结(2021版)

    万次阅读 多人点赞 2019-12-18 10:09:56
    本人从事Java开发已多年,平时有记录问题解决方案和总结知识点的习惯,整理了一些有关Java的知识体系,这不是最终版,会不定期的更新。也算是记录自己在从事编程工作的成长足迹,通过博客可以促进博主与阅读者的共同...
  • 将其解码,李四用张三的公钥加密信息,并发送给李四,张三用自己的私钥解密从李四处收到的信息…… Java利用DES私钥对称加密代码实例 同上 java聊天室 2个目标文件,简单。 java模拟掷骰子2个 1个目标文件,输出演示...
  • 背景最近想更深入了解下Java多线程相关的知识,对Java多线程有一个全面的认识,所以想找一本Java多线程相关的书籍来阅读,最后我选择了《Java并发编程实战》这本个人认为还算相当不错,至于为什么选择它,下面有介绍...
  • Java 程序中怎么保证多线程的运行安全?并行和并发有什么区别?什么是多线程多线程的好处多线程的劣势:线程和进程区别什么是上下文切换?守护线程和用户线程有什么区别呢?如何在 Windows 和 Linux 上查找哪个...
  • 思维导图融入大量java并发编程知识的同时,覆盖大量并发类面试题; 根据并发编程知识点,融入大量实例分析,知识点清晰易理解; 含有多种并发模式分析,为后续实践过程打好基础。 面试题列举(小部分) 说说...
  • Java并发机制及锁的实现原理

    万次阅读 多人点赞 2016-07-18 20:04:21
    Java并发机制及锁实现原理
  • Java基础知识面试题(2020最新版)

    万次阅读 多人点赞 2020-02-19 12:11:27
    文章目录Java概述何为编程什么是Javajdk1.5之后的三大版本JVM、JRE和JDK的关系什么是跨平台性?原理是什么Java语言有哪些特点什么是字节码?采用字节码的最大好处是什么什么是Java程序的主类?应用程序和小程序的...
  • 并发编程篇:java并发面试题

    万次阅读 多人点赞 2018-02-28 21:43:18
    3、java thread状态 NEW 状态是指线程刚创建, 尚未启动 RUNNABLE Java线程中将就绪(ready)和运行中(running)两种状态笼统的成为“运行”。线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。...
  • Java获取zip文件

    千次阅读 2019-11-23 11:04:14
    项目中经常遇到需要导出压缩文件的情况,...文件删除策略,无用文件及时删除,避免磁盘空间的浪费。 保存到内存中,返回时从内存中读取二进制内容 以流的形式传输数据,避免临时文件的生成,完成后关闭流,z...
  • java并发编程:进程和线程java并发编程:进程和线程进程的由来线程的由来java并发的问题其他参考 java并发编程:进程和线程 进程的由来 为什么有进程:现在计算机的功能已经十分丰富了,可以一边看视屏,一边浏览...
  • java ssh传输文件

    千次阅读 2018-11-21 16:43:17
    * @param isDel 是否删除文件 * */ public void upload(String directory, String uploadFile, boolean isDel) throws Exception { try { //执行列表展示ls 命令 channel.ls(directory); //执行盘符切换cd ...
  • Java面试题大全(2020版)

    万次阅读 多人点赞 2019-11-26 11:59:06
    发现网上很多Java面试题都没有答案,所以花了很长时间搜集整理出来了这套Java面试题大全,希望对大家有帮助哈~ 本套Java面试题大全,全的不能再全,哈哈~ 一、Java 基础 1. JDK 和 JRE 有什么区别? JDK:Java ...
  • 《实战Java并发程序设计》中有很多代码范例,适合初学者通过实践入门并发编程 鉴于没有配套代码,笔者做了笔记并整理源代码,近期还在不断更新中,欢迎交流和 Star 笔记在 notes,源代码在 src 如有需要...
  • java多线程并发机制

    千次阅读 2017-04-27 10:59:33
    进程拥有各种资源和状态信息,包括打开的文件、子进程和信号处理。 线程:表示程序的执行流程,是CPU调度执行的基本单位;线程有自己的程序计数器、寄存器、堆栈和帧。同一进程中的线程共用相同的地址空间,同时共享...
  • java并发面试题

    千次阅读 2018-11-02 16:09:31
    整个并发框架中对线程的挂起操作被封装在 LockSupport类中,LockSupport类中有各种版本pack方法,但最终都调用了Unsafe.park()方法。 cas  Java中Unsafe类详解 12、线程池 线程池的作用:  在程序启动的时候...
  • Java并发秒杀API(三)之Web层

    千次阅读 2017-10-05 18:03:54
    Java并发秒杀API(三)之Web层1. 设计前的分析 Web层内容相关前端交互设计 Restful规范 SpringMVC Bootstrap + jQuery 前端页面流程 详情页流程逻辑 为什么要获取标准系统时间(服务器的时间) 用户可能处在不同...
  • java实现大文件分片上传

    千次阅读 2018-10-11 18:10:39
    java实现大文件分片上传 在项目中用到了大文件上传功能,最初从网上参考了一些代码来实现,但是最终的上传效果不是很好,速度比较慢。 之前的上传思路是: 前端利用webUploader分片大文件 后端接收各个分片后的小...
  • 一,实现文件上传(图片,文件,音视频) 普通的上传 @Override public Object uploadImage(MultipartFile file) { //获取文件名 String originalFilename = file.getOriginalFilename(); //获取文件后缀名 ...
  • java将多个文件(Excel)压缩成zip,并发送给页面 方案 将多个文件夹中的文件,放入一个临时文件夹,并将临时文件夹进行压缩打包,发回给页面。 转移文件类 import java.io.File; import java.io.FileInputStream; ...
  • java多线程与并发编程详解

    千次阅读 2018-03-17 11:35:28
    进程拥有各种资源和状态信息,包括打开的文件、子进程和信号处理。线程:表示程序的执行流程,是CPU调度执行的基本单位;线程有自己的程序计数器、寄存器、堆栈和帧。同一进程中的线程共用相同的地址空间,同时共享...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 133,833
精华内容 53,533
关键字:

java并发删除文件

java 订阅