精华内容
下载资源
问答
  • 主要介绍了使用java的HttpClient实现多线程并发的相关资料,需要的朋友可以参考下
  • Java 结合多线程实现简单 HTTP 服务器

    千次阅读 2018-09-09 17:10:12
    能自己实现一个简单的 HTTP 服务器,还有些小激动的。 实现给客户端返回 HTML 文本和 二进制文件(图片), 啥也不说了,直接上码吧: import java.io.*; import java.net.ServerSocket; import java.net....

    能自己实现一个简单的 HTTP 服务器,还有些小激动的。
    本简单 HTTP Server 实现给客户端返回 HTML 文本和二进制文件(图片),
    啥也不说了,直接上码吧:

    import java.io.*;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    class HttpRequestHandler implements Runnable {
        private Socket socket;
        private String basePath;
    
        public HttpRequestHandler(Socket socket, String basePath) {
            this.socket = socket;
            this.basePath = basePath;
        }
    
        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            BufferedReader reader = null;
    
            // 向客户端输出内容
            PrintStream printStream = null;
            InputStream in = null;
            BufferedReader br = null;
    
            try {
                // 在 socket 上获得输入流,使用默认字符集的 InputStreamReader
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    
                // 读取 HTTP 报文的请求行
                String header = reader.readLine();
                System.out.println("thread name: " + threadName + ", HTTP 报文请求行: " + header);
    
                // 读取HTTP请求报文的起始行,并根据空格分割开,存入数组,得到请求的资源
                // 由相对路径计算出绝对路径
                String filePath = this.basePath + header.split(" ")[1];
    
                printStream = new PrintStream(socket.getOutputStream(), true);
    
                // 如果请求资源的后缀为jpg 或者 ico,则读取资源并输出
                if (filePath.endsWith("jpg") || filePath.endsWith("ico")) {
                    System.out.println("thread name: " + threadName + ", filePath: " + filePath);
    
                    in = new FileInputStream(filePath);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    
                    byte[] buff = new byte[1024];
                    int i;
                    while ((i = in.read(buff)) != -1) {
                        baos.write(buff, 0, i);
                    }
                    byte[] array = baos.toByteArray();
    
                    // 设置响应报文
                    printStream.println("HTTP/1.1 200 OK");
                    printStream.println("Server: MyServer");
                    printStream.println("Content-Type: image/jpeg");
                    printStream.println("Content-Length: " + array.length);
                    // 根据 HTTP 协议, 空行将结束头信息
                    printStream.println();
                    printStream.write(array);
                } else if (filePath.endsWith("html")) {
                    // 客户端请求的是 html 文件
                    br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
                    printStream.println("HTTP/1.1 200 OK");
                    printStream.println("Server: MyServer");
                    printStream.println("Content-Type: text/html; charset=UTF-8");
                    printStream.println();
    
                    String line;
                    while ((line = br.readLine()) != null) {
                        printStream.println(line);
                    }
                }
                printStream.flush();
            } catch (Exception ex) {
                ex.printStackTrace();
                printStream.println("HTTP/1.1 500");
                printStream.println();
                printStream.flush();
            } finally {
                this.close(br, in, reader, socket, printStream);
            }
    
        }
    
        private void close(Closeable... closeables) {
            if (closeables != null) {
                for (Closeable closeable : closeables) {
                    try {
                        if (closeable != null) {
                            closeable.close();
                        }
                    } catch (Exception e) {
    
                    }
                }
            }
    
        }
    }
    
    public class MySimpleHttpServer {
    
        // 处理 HttpRequest 的线程池
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
    
        /**
         * SimpleHttpServer 的根路径
         */
        private String basePath;
    
        /**
         * 服务监听端口
         */
        private int port = 8080;
    
        public void start() throws Exception {
            System.out.println("start at " + port + ", basePath: " + this.basePath);
    
            // serverSocket 接受客户端的请求
            ServerSocket serverSocket = new ServerSocket(port);
            Socket socket;
            /**
             * 服务端 serverSocket 接受客户端请求,并创建客户端 socket 对象
             */
            while ((socket = serverSocket.accept()) != null) {
                /**
                 * 接收一个客户端Socket,生成一个HttpRequestHandler,放入线程池队列
                 */
                executor.execute(new HttpRequestHandler(socket, this.basePath));
            }
            serverSocket.close();
        }
    
        public void setBasePath(String basePath) {
            if (basePath != null && new File(basePath).exists() &&
                    new File(basePath).isDirectory()) {
                this.basePath = basePath;
            }
        }
    
        public void setPort(int port) {
            if (port > 0) {
                this.port = port;
            }
        }
    }
    

    运行server:

    public class ServerStart {
        public static void main(String[] args) throws Exception {
            MySimpleHttpServer httpServer = new MySimpleHttpServer();
            httpServer.setBasePath("/data/simpleHttpServer");
            httpServer.start();
        }
    }
    

    写一个 HTML 页面测试一下吧:

    // index.html 
    <!DOCTYPE html>
    <html>
    <head>
    <meta charset="utf-8">
    <link rel=”icon” href=”favicon.ico” mce_href=”favicon.ico” type=”image/x-icon”>
    <title>simple server test</title>
    </head>
    
    <body>
    文档内容......
    <h1>第一张图片</h1>
    <img src="1.jpg" align="middle" />
    <h1>第二张图片</h1>
    <img src="2.jpg" align="middle" />
    <h1>第三张图片</h1>
    <img src="3.jpg" align="middle" />
    
    </body>
    
    </html>
    

    index.html1.jpg等资源放到basePath下,然后在浏览器的地址栏里输入:http://127.0.0.1:8080/index.html,然后就看到这个奇妙的网页了:
    这里写图片描述

    浏览器并发的请求,因为有了多线程,这个简单的 HTTP Server 的吞吐量也还可以的,响应时间也很小。

    参考: 用java实现的一个简单web服务器程序
    java实现简单的http服务器
    HTTP头部详解及使用Java套接字处理HTTP请求

    展开全文
  • 使用Java编写的一个简易多线程HTTP服务器 源代码写于2018年5月份,计算机网络课程的实验作业 代码bug较多,欢迎各位dalao指教
  • java模拟多线程http请求

    千次阅读 2017-07-20 09:51:56
    import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Map.Entry;import java.util.concurrent.ExecutorService;import java.util.conc
    package test;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.apache.http.HttpEntity;
    import org.apache.http.NameValuePair;
    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.client.entity.UrlEncodedFormEntity;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.message.BasicNameValuePair;
    import org.apache.http.util.EntityUtils;
    
    /**java 模拟测试多线程测试http请求
     * @author wdh
     *
     */
    
    public class ServiceThreadTest implements Runnable{
    		
    	private String url;
    	
    	private Map<String,Object> paramStr;
    	
    	public ServiceThreadTest(String url, Map<String,Object> paramStr) {
    		super();
    		this.url = url;
    		this.paramStr = paramStr;
    	}
    
    	
    	public String getUrl() {
    		return url;
    	}
    
    
    	public void setUrl(String url) {
    		this.url = url;
    	}
    
    
    	public  Map<String,Object> getParamStr() {
    		return paramStr;
    	}
    
    
    	public void setParamStr(Map<String,Object> paramStr) {
    		this.paramStr = paramStr;
    	}
    
    
    	@Override
    	public void run() {
                   // http请求实现方式
    		CloseableHttpClient httpClient = HttpClients.createDefault();
    		HttpPost post = new HttpPost(url);
    		RequestConfig config = RequestConfig.custom().setConnectionRequestTimeout(10000).setConnectTimeout(10000)
    				.setSocketTimeout(10000).build();
    		CloseableHttpResponse response = null;
    		try {
    			List<NameValuePair> params = setHttpNameValues(paramStr);  
    		    HttpEntity httpentity=new UrlEncodedFormEntity(params,"utf-8");
    			post.setEntity(httpentity);
    			post.setConfig(config);
    			response = httpClient.execute(post);	
    			HttpEntity entity = response.getEntity();
    			String content = EntityUtils.toString(entity);
    			System.out.println("content:" + content);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}			
    	}	
    	
    	private List<NameValuePair> setHttpNameValues(Map<String,Object> paramMap) {
    		List<NameValuePair> params = new ArrayList<NameValuePair>(); 
    		for (Entry<String, Object> entry:paramMap.entrySet()){
    			  params.add(new BasicNameValuePair(entry.getKey(),entry.getValue().toString()));  
    		}
    	  
    		return params;
    	}
    
    
    	public static void main(String[] args) {
                 //运用java工具类中线程池
    		ExecutorService  pool = Executors.newCachedThreadPool();
    		for (int i =0;i<2;i++) { //开启俩个线程
    			String url = "xxxx";
    			Map<String,Object> paramStr = getHttpParamStr();
    			pool.execute(new ServiceThreadTest(url,paramStr));
    		}
    	}
    
    	public  static Map<String,Object> getHttpParamStr() {
    		Map<String, Object> param = new HashMap<String, Object>();
    		param.put("apiversion", 1);
    		param.put("appversion", "3.6.2");
    		return param;
    	}
    }
    

    展开全文
  • 一个使用多线程发送消息到kafka集群的java程序 本人之前参与的某个大数据项目,由于测试的需要,所以帮助写了一个数据发送工具用于发送大量数据到kakfa进行业务性能压测。程序虽然简单,但我想对于初学kafka的同学,...
    本人之前参与的某个大数据项目,由于测试的需要,所以帮助写了一个数据发送工具用于发送大量数据到kakfa进行业务性能压测。程序虽然简单,但我想对于初学kafka的同学,还是有一定的借鉴意义的,遂打算贡献出来。
    本来是打算把源码打成压缩包上传的,无奈公司在安全方面做的很严格,将文件上传到csdn, github等网站会被公司电脑检测到并被禁止,无奈以只能粘贴到博文中(目前已经去除与公司业务相关的任何代码,只剩一个通用的外壳,并能保证可运行),虽然黏贴到博文中的代码比较多,但本人保证黏贴的比较全,是可以根据博文完全还原这个项目的。
    虽然本人会张贴出所有代码,但文件结构已经打乱,所以截图一张原始项目的文件结构图,整个项目19个类,所以也不是很多,还原项目还是很容易的, 完整项目的截图如下所示:

    项目文件结构图

    本程序是一个完整的java程序, 采用了生产者-消费者设计模式, 生产者不停的生成消息,并存入一个线程安全的队列,消费者不断的从队列中取出消息,并发送到kafka集群。
    1. 需要发送的消息总数可通过配置文件参数totalCount指定
    2. 可以通过配置文件修改参数 initialCount,只有生产的消息数量达到initialCount指定的值,程序才开始启动消费者,如果指定的数字是0,则消息生成线程,和消息发送线程同时启动运行
    3. bootstrap.servers 用于配置kafka集群机器的地址,地址格式是:ip + 端口号, 不能使用主机名 + 端口号, 如果有多个节点,则 使用英文逗号分隔
    4. zookeeper.connector 用于配置kafka集群依赖的zookeeper集群地址,格式同样是ip + 端口号, 不能使用主机名 + 端口号, 如果有多个节点,则 使用英文逗号分隔
    5. request.required.acks=1, 该参数配置为1, 表示kafka客户端发出消息且kafka集群反馈成功才开始发送下一条消息, 配置为0表示发送后不等待反馈结果直接发送下一条消息,配置为0并发速度更高,但是不能保证消息一定发送成功
    6. produce_thread_count 表示消息生成线程的数量
    7. send_thread_count 表示消息发送线程的数量
    8. topic 表示消息发往的目标topic
    9. print_progress 设置为true表示是否打印发送进度
    10. print_progress_strategy, 表示打印消息进度的策略, 生成一条数据或消费一条数据就汇报一次, ONE_BY_ONE, 每分n条数据才报告一次:INTERVAL, 报告的进度是百分比:PERCENTAGE
    11. print_progress_interval 参数表示进度报告间隔, 配置为50, 表示每发送50条数据,打印一次进度,配置为0,则没法送一条消息,进报告一次进度
    12. msg_generate_class 可以指定生成消息的实现类, 本人公开的代码只有一种实现类,该类与公司业务无关,也即com.core.service.SimpleMsgGenerator, 该消息生成类生成的消息是一句简单的话,“hello kafka”, 有需要的朋友可以根据自身业务自定义实现类

    配置文件的名称如上图,叫做send-cfg.properties, 完整的配置项罗列如下:

    
    #要发送的消息总数
    totalCount=20000
    
    #生成的消息累积到这个数目后才开始发送数据
    initialCount=10000
    
    # ip + 端口号, 不能使用主机名 + 端口号
    bootstrap.servers=10.16.70.211:9092
    
    # ip + 端口号, 不能使用主机名 + 端口号
    zookeeper.connector=10.16.70.211:2181
    
    # 1:发送后报告成功才发送下一条,  0:发送成功后不报告下一条,直接发送,配置为0并发速度更高,但是不能保证发送成功
    request.required.acks=1
    
    #生产数据的线程数目
    produce_thread_count=6
    
    #发送线程数目
    send_thread_count=2
    
    #topic
    topic=HIK_SMART_METADATA_TOPIC
    
    #是否打印发送进度
    print_progress=true
    
    #生成一条数据或消费一条数据就汇报一次:ONE_BY_ONE,   每分n条数据才报告一次:INTERVAL, 报告的进度是百分比:PERCENTAGE
    print_progress_strategy=ONE_BY_ONE
    
    #每发送50条数据,打印一次进度
    print_progress_interval=100
    
    msg_generate_class=com.core.service.SimpleMsgGenerator
    
    

    run.bat是程序打成jar包后, 在windows上的的运行脚本,内容如下:

    echo hello kfaka
    
    java -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8999,suspend=n  -Xms1024M -Xmx1024M -XX:PermSize=512M -XX:MaxNewSize=256M -XX:MaxPermSize=512M  -jar bodySender-1.0.0-SNAPSHOT-jar-with-dependencies.jar send-cfg.properties
    
    pause
    
    

    run.sh是程序打成jar包后, 在linux上的运行脚本,内容如下:

    #! /bin/bash
      
    runPath=$(cd `dirname $0`;pwd)
    cd runPath
    
    echo "the script is located in directory :" $runPath
    echo "config file is send-cfg.properties" 
    
    
    if  [ $#>=0 ]; then
    
        echo "java路径是:"; $1
            $1/bin/java -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8999,suspend=n  -Xms1024M -Xmx1024M -XX:PermSize=512M -XX:MaxNewSize=256M -XX:MaxPermSize=512M  -jar  bodySender-1.0.0-SNAPSHOT-jar-with-dependencies.jar send-cfg.properties
    
    else
    
        echo "没有指定java路径,默认java环境变量已经配置";
            java -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8999,suspend=n  -Xms1024M -Xmx1024M -XX:PermSize=512M -XX:MaxNewSize=256M -XX:MaxPermSize=512M  -jar bodySender-1.0.0-SNAPSHOT-jar-with-dependencies.jar send-cfg.properties
    
    fi
    
    
    

    readme.txt 和 打包方式.md两个文件分别提供了打包方式和运行说明


    打包方式(供开发参考)

    1. maven打包,得到 bodySender-1.0.0-SNAPSHOT-jar-with-dependencies.jar
    2. 新建文件夹sendTool
    3. 将:bodySender-1.0.0-SNAPSHOT-jar-with-dependencies.jar, run.bat, run.sh, readme.txt, send-cfg.properties拷贝到该文件夹下
    4. 将sendTool发送给测试人员,测试参照readme.txt中运行方式部分的说明运行程序

    运行方式(供测试参考)

    在windows运行:

    进入sendTool文件夹,双击run.bat即可运行,如果要修改配置,则修改sendTool目录下的send-cfg.properties文件

    在linux上进行
    1. 将sendTool文件夹拷贝到linux,如果要修改配置,则修改sendTool目录下的send-cfg.properties文件

    2. 在命令行中输入vim run.sh 编辑run.sh

    3. 在vim编辑器的命令模式下输入 set ff=unix 并回车,修改改文件的格式

    4. 输入:wq退出vim

    5. 执行命令 chmod 777 run.sh赋予文件可执行权限

    6. 假如linux机器已经配置了java环境变量,则运行./run.sh命令即可启动程序,如果没有配置java环境变量,则运行 ./run.sh /usr/lib/cluster001/jdk1.8.0_162

    7. 注意:/usr/lib/cluster001/jdk1.8.0_162 是java的安装路径,需要根据具体情况修改,该路径参数最后不能添加"/", 比如 ./run.sh /usr/lib/cluster001/jdk1.8.0_162/ 这样运行程序会报错 注意:/usr/lib/cluster001/jdk1.8.0_162 是java的安装路径,需要根据具体情况修改,该路径参数最后不能添加"/", 比如 ./run.sh /usr/lib/cluster001/jdk1.8.0_162/ 这样运行程序会报错

    根据上述解释,程序功能还是比较清晰了, 想要运行程序,如果项目已经还原到IDE中,则直接运行APP类的main函数即可,如果已经打成jar包,可以参照readme.txt描述的方式运行。

    完整程序的源码如下(19个类, 每个类所在的包可参加上图):

    package com.core;
    
    import com.core.constant.Config;
    import com.core.service.Container;
    import com.core.service.ProduceThread;
    import com.core.service.SendThread;
    import com.core.util.JsonUtils;
    import com.core.util.PropertiesFile;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Scanner;
    import java.util.concurrent.CountDownLatch;
    
    public class APP {
    
        private static final String CONFIG_FILE = "send-cfg.properties";
    
        public static void main(String... args) {
    
    
            System.out.println("请输入任意非空内容并回车启动程序");
            String line = null;
            do {
                Scanner scan = new Scanner(System.in);
                line = scan.nextLine();
            } while (StringUtils.isBlank(line));
    
    
            run(args);
        }
    
    
    
    
    
    
        public static void run(String... args) {
    
            long start = System.currentTimeMillis();
    
            PropertiesFile prop = null;
            if (args == null || args.length == 0) {
                prop = new PropertiesFile(CONFIG_FILE, false);
            } else {
                prop = new PropertiesFile(args[0], true);
            }
    
            Config config = new Config(prop);
    
            System.out.println("用户配置的参数是:\r\n" + JsonUtils.object2Json(config, true));
    
            check(config);
    
            Container<ProducerRecord<byte[], byte[]>> container = new Container<ProducerRecord<byte[], byte[]>>(config.getiInitialCount() + 10000);
    
            System.out.println("开始生成消息");
            Integer produceThreadNum = config.getProduceThreadNum();
            List<Thread> producerList = new ArrayList<Thread>();
    
            CountDownLatch produceSwitch = new CountDownLatch(config.getProduceThreadNum());
            for(int i = 0; i < produceThreadNum; i++) {
                Thread t = new Thread(new ProduceThread(config, container, produceSwitch));
                t.setDaemon(true);
                t.setName("send-thred-" + i);
                t.start();
                producerList.add(t);
            }
    
            try {
                produceSwitch.await();
                System.out.println();
                System.out.println("消息达到初始量" + config.getiInitialCount() + ", 开始发送消息");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            //
    
            Integer sendThreadNum = config.getSendThreadNum();
            CountDownLatch consumeStartSwitch = new CountDownLatch(1);
            CountDownLatch consumeEndSwitch = new CountDownLatch(sendThreadNum);
            List<Thread> senderList = new ArrayList<Thread>();
            for(int i = 0; i < sendThreadNum; i++) {
                Thread t = new Thread(new SendThread(config, container, consumeStartSwitch, consumeEndSwitch));
                t.setName("send-thread-" + i);
                t.setDaemon(true);
                t.start();
                senderList.add(t);
            }
    
            consumeStartSwitch.countDown();
    
            try {
                consumeEndSwitch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println();
    
            System.out.println("一共发送消息" + config.getTotalCount() + "条");
    
            int error = (container.getConsumeCount() - config.getTotalCount());
            System.out.println("发送的消息总量误差是[" + (error >= 0 ? "+" : "-") + error + "]条");
    
            System.out.println("耗时" + (System.currentTimeMillis() - start) + "毫秒");
        }
    
    
    
    
    
        public static void check(Config config) {
            if (config == null) {
                return;
            }
    
            if (config.getTotalCount() <= 0 || config.getiInitialCount() < 0) {
                throw new IllegalArgumentException("require: totalCount > 0 && initialCount >= 0");
            }
    
            if (config.getiInitialCount() > config.getTotalCount()) {
                throw new IllegalArgumentException("require: initialCount <= totalCount");
            }
    
    
            if (config.getProduceThreadNum() <= 0 || config.getSendThreadNum() <= 0) {
                throw new IllegalArgumentException("require: produce_thread_count > 0 && send_thread_count > 0");
            }
    
    
        }
    
    
    
    
    
    }
    package com.core.constant;
    
    import com.core.util.PropertiesFile;
    
    public class Config {
    
        private PropertiesFile props;
        private int totalCount;
        private String bootstrap_servers;
        private String zookeeper_connector;
        private String request_required_acks;
        private int produce_thread_count;
        private int send_thread_count;
        private int initialCount;
        private String topic;
        private boolean print_progress;
        private String print_progress_strategy;
        private int print_progress_interval;
        private String msg_generate_class;
    
    
        public Config(PropertiesFile props) {
            this.props = props;
            this.totalCount = props.getIntProperty("totalCount", 0);
            this.bootstrap_servers = props.getStringProperty("bootstrap.servers");
            this.zookeeper_connector = props.getStringProperty("zookeeper.connector");
            this.request_required_acks = props.getStringProperty("request.required.acks", "1");
            this.produce_thread_count = props.getIntProperty("produce_thread_count", 0);
            this.send_thread_count = props.getIntProperty("send_thread_count", 0);
            this.initialCount = props.getIntProperty("initialCount", 0);
            this.topic = props.getStringProperty("topic");
            this.print_progress = props.getBooleanProperty("print_progress", true);
            this.print_progress_strategy = props.getStringProperty("print_progress_strategy");
            this.print_progress_interval = props.getIntProperty("print_progress_interval", 1000);
            this.msg_generate_class = props.getStringProperty("msg_generate_class");
    
    
    
        }
    
        public Integer getTotalCount() {
            return totalCount;
        }
    
        public String getBootstrapServers() {
            return bootstrap_servers;
        }
    
        public String getZookeeperConnector() {
            return zookeeper_connector;
        }
    
        public String getRequestRequiredAcks() {
            return request_required_acks;
        }
    
        public Integer getProduceThreadNum() {
            return produce_thread_count;
        }
    
        public Integer getSendThreadNum() {
            return send_thread_count;
        }
    
        public Integer getiInitialCount() {
            return initialCount;
        }
    
    
        public String getTopic() {
            return topic;
        }
    
        public String getMsgGenerateClass() {
            return msg_generate_class;
        }
    
    
    
        public boolean printProgress() {
            return print_progress;
        }
    
        public String getProgressStrategy() {
            return print_progress_strategy;
        }
    
        public int getProgressInterval() {
            return print_progress_interval;
        }
    
    }
    package com.core.constant;
    
    public class Constant {
    
        public final  static String PRODUCER_SIGN = ">";
    
        public final  static String CONSUMER_SIGN = "<";
    
    
    }
    package com.core.exception;
    
    /**
     * @description 所有运行时异常的基类
     * @author lihong10 2018/4/16 13:44:00
     */
    public class BaseRuntimeException extends RuntimeException {
    
    
        /**
         * 序列化ID
         */
        private static final long serialVersionUID = 7830353921973771800L;
    
        /*
         * 错误码
         */
        protected Integer errorCode;
    
        /**
         * 创建一个新的实例CommonException
         * @param errorCode
         * @param msg
         */
        public BaseRuntimeException(int errorCode, String msg) {
            super(msg);
            this.errorCode = errorCode;
        }
    
        /**
         * 创建一个新的实例CommonException
         * @param errorCode
         * @param msg
         * @param cause
         */
        public BaseRuntimeException(int errorCode, String msg, Throwable cause) {
            super(msg, cause);
            this.errorCode = errorCode;
        }
    
        public BaseRuntimeException(String msg, Throwable cause) {
            super(msg, cause);
        }
    
    
        public BaseRuntimeException(String msg) {
            super(msg);
        }
    
        public Integer getErrorCode() {
            return errorCode;
        }
    
        public void setErrorCode(Integer errorCode) {
            this.errorCode = errorCode;
        }
    }
    package com.core.exception;
    
    /**
     * @description 不合法的参数异常,比如参数为空,参数格式不对均可以抛出该异常
     * @author lihong10 2018/4/16 11:34:00
     */
    public class IllegalParameterException extends BaseRuntimeException {
    
    
        public IllegalParameterException(int errorCode, String msg) {
            super(errorCode, msg);
        }
    
        public IllegalParameterException(int errorCode, String msg, Throwable cause) {
            super(errorCode, msg, cause);
        }
    
        public IllegalParameterException(String msg, Throwable cause) {
            super(msg, cause);
        }
    
        public IllegalParameterException(String msg) {
            super(msg);
        }
    
    
    }
    package com.core.progress;
    
    import com.core.constant.Config;
    
    public class IntervalReporter extends Reporter {
    
    
        @Override
        public void report(int progress, Config config, boolean isProducer) {
    
            if (config.getProgressInterval() > 0) {
                if (progress % config.getProgressInterval() == 0) {
                    print(getSign(isProducer), false);
                }
    
                return;
            }
    
            //间隔 <= 0,则没有间隔,退化为 ONE_BY_ONE 策略
            print(getSign(isProducer), false);
    
        }
    }
    package com.core.progress;
    
    import com.core.constant.Config;
    
    public class OneByOneReporter extends Reporter {
    
    
        @Override
        public void report(int progress, Config config, boolean isProducer) {
    
            print(getSign(isProducer), false);
    
        }
    }
    package com.core.progress;
    
    import com.core.constant.Config;
    
    public class PercentageReporter extends Reporter {
    
        @Override
        public void report(int progress, Config config, boolean isProducer) {
    
    
            if (config.getProgressInterval() > 0) {
                if (progress % config.getProgressInterval() == 0) {
    
                    print(getSign(isProducer, progress, config.getTotalCount()), false);
                }
    
                return;
            }
    
    
            //间隔 <= 0,则没有间隔,退化为 ONE_BY_ONE 策略
            print(getSign(isProducer, progress, config.getTotalCount()) , false);
    
    
        }
    
    
        private String getSign(boolean isProducer, int progress, int total) {
            double percent = ((progress * 1.0) / total) * 100;
            if (isProducer) {
                return  "(" + getSign(isProducer) + " %" + percent + ")  ";
            }
    
            return "(%" + percent + " " + getSign(isProducer) + ")  ";
    
        }
    
    
    
    }
    package com.core.progress;
    
    import com.core.constant.Config;
    import org.apache.commons.lang3.StringUtils;
    
    public class ProgressReportClient {
    
        private final static OneByOneReporter oneByeOne = new OneByOneReporter();
        private final static IntervalReporter interval = new IntervalReporter();
        private final static PercentageReporter percentage = new PercentageReporter();
    
    
        public static void report(int progress, Config config, boolean isProducer) {
            //不打印进度,直接返回
            if (!config.printProgress()) {
                return;
            }
    
            ProgressReportType type = null;
            String strategy = config.getProgressStrategy();
            if (StringUtils.isBlank(strategy)) {
                type = ProgressReportType.ONE_BY_ONE;
            } else {
                type = ProgressReportType.valueOf(strategy);
            }
    
            switch (type) {
                case INTERVAL:
                    interval.report(progress, config, isProducer);
                    break;
                case ONE_BY_ONE:
                     oneByeOne.report(progress, config, isProducer);
                    break;
                case PERCENTAGE:
                    percentage.report(progress, config, isProducer);
                    break;
                    default:
                        oneByeOne.report(progress, config, isProducer);
            }
        }
    
    
    
    
    
    
    
    
    }
    package com.core.progress;
    
    public enum ProgressReportType {
    
        //一条消息报告一次进度
        ONE_BY_ONE,
    
        //间隔一定消息后才报告一次
        INTERVAL,
    
        //按照百分比汇报
        PERCENTAGE;
    
    
    
    }
    package com.core.progress;
    
    import com.core.constant.Config;
    import com.core.constant.Constant;
    
    public abstract class Reporter {
    
    
    
        public static void print(Object obj, boolean newLine) {
            if (newLine) {
                System.out.println(obj);
            }else {
                System.out.print(obj);
            }
        }
    
        public static String getSign(boolean isProducer) {
            if (isProducer) {
                return Constant.PRODUCER_SIGN;
            } else {
                return Constant.CONSUMER_SIGN;
            }
        }
    
    
        public abstract void report(int progress, Config config, boolean isProducer);
    
    }
    package com.core.service;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Container<E> {
    
        private BlockingQueue<E> pool;
        //生产线程生成的消息数目
        private  volatile AtomicInteger produceCount = new AtomicInteger(0);
        //成功消费的消息数据
        private volatile AtomicInteger consumeCount = new AtomicInteger(0);
        private int num;
    
        public Container(int num) {
            this.num = num;
            this.pool = new ArrayBlockingQueue<E>(num);
        }
    
        public int addConsumeCount(int num) {
            return  consumeCount.addAndGet(num);
        }
    
    
    
        public E get() {
            try {
                E msg = pool.take();
    //            consumeCount.addAndGet(1); //发送成功回调才累加,所以此处注释
                return  msg;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return null;
        }
    
        public void offer(E msg) {
            try {
                pool.put(msg);
                produceCount.addAndGet(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    
        public int getProduceCount() {
            return produceCount.get();
        }
    
        public int getConsumeCount() {
            return consumeCount.get();
        }
    }
    package com.core.service;
    
    import com.core.constant.Config;
    
    public interface MsgGenerator {
    
        public String getMsg(Config config);
    
    }
    package com.core.service;
    
    import com.core.constant.Config;
    import com.core.exception.IllegalParameterException;
    import com.core.progress.ProgressReportClient;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.io.UnsupportedEncodingException;
    import java.util.concurrent.CountDownLatch;
    
    public class ProduceThread implements Runnable {
    
        private Config config;
        private Container<ProducerRecord<byte[], byte[]>> container;
        private volatile CountDownLatch produceSwitch;
        private MsgGenerator msgGenerator;
    
        public ProduceThread(Config config, Container<ProducerRecord<byte[], byte[]>> container, CountDownLatch produceSwitch) {
            this.config = config;
            this.container = container;
            this.produceSwitch = produceSwitch;
    
            try {
                msgGenerator = (MsgGenerator) Class.forName(config.getMsgGenerateClass()).newInstance();
            } catch (Exception e) {
                e.printStackTrace();
                throw new IllegalParameterException("无法加载消息生成的实现类");
            }
        }
    
    
        @Override
        public void run() {
            for (; ; ) {
    
                //因为并发的原因,使用=号会导致countDown()调用遗漏
                if (container.getProduceCount() >= config.getiInitialCount()) {
                    if (produceSwitch != null) {
                        produceSwitch.countDown();
                        //防止在区间[initialCount, totalCount]内出现多次countDown()调用,导致发送线程被过早唤醒
                        produceSwitch = null;
                    }
                }
    
                if (container.getProduceCount() > config.getTotalCount()) {
                    break;
                }
    
                try {
                    // new SimpleMsgGenerator() , 这里每次都创建一个实例,可以优化
                    String  msg = msgGenerator.getMsg(config);
                    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(config.getTopic(), msg.getBytes("UTF-8"));
                    container.offer(record);
                    ProgressReportClient.report(container.getProduceCount(), config, true);
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
    
            }
        }
    }
    package com.core.service;
    
    import com.core.constant.Config;
    import com.core.progress.ProgressReportClient;
    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    
    public class SendThread implements Runnable {
    
        private Config config;
        private Container<ProducerRecord<byte[], byte[]>> container;
        private static Properties kafkaProperties = new Properties();
        private static KafkaProducer<byte[], byte[]> kafkaProducer = null;
        private  CountDownLatch consumeStartSwitch;
        private CountDownLatch consumeEndSwitch;
    
    
        public SendThread(Config config) {
            this.config = config;
        }
    
        public SendThread(Config config, Container container, CountDownLatch consumeStartSwitch, CountDownLatch consumeEndSwitch ) {
            this(config);
            this.container = container;
            this.consumeStartSwitch = consumeStartSwitch;
            this. consumeEndSwitch = consumeEndSwitch;
    
            kafkaProperties.put("bootstrap.servers", config.getBootstrapServers());
            kafkaProperties.put("zookeeper.connector", config.getZookeeperConnector());
            kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            kafkaProperties.put("serializer.class", "kafka.serializer.StringEncoder");
            kafkaProperties.put("request.required.acks", config.getRequestRequiredAcks());
            this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
    
    
    
        }
    
    
    
    
    
        @Override
        public void run() {
    
            try {
                consumeStartSwitch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
    
    
            for (;;) {
    
                if (container.getConsumeCount() >= config.getTotalCount()) {
                    consumeEndSwitch.countDown();
                    break;
                }
    
                ProducerRecord<byte[], byte[]> record = container.get();
    
                if (record != null) {
                    kafkaProducer.send(record, new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                e.printStackTrace();
                                System.out.println("该条消息发送失败");
                            }
    
                            ProgressReportClient.report(container.getProduceCount(), config, false);
    
                        }
                    });
                    container.addConsumeCount(1);
                }
    
            }
    
        }
    }
    package com.core.service;
    
    import com.core.constant.Config;
    
    public class SimpleMsgGenerator implements MsgGenerator {
    
        @Override
        public String getMsg(Config config) {
            return "hello kafka";
        }
    }
    package com.core.service;
    
    import org.apache.kafka.clients.producer.*;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.UnsupportedEncodingException;
    import java.util.Properties;
    
    public class ToKafkaSender {
    
        private static Properties kafkaProperties = new Properties();
        private static KafkaProducer<byte[], byte[]> kafkaProducer = null;
    
    
        @Before
        public void init() {
    
            String ip = "10.16.70.211";
            /**
             * 用于自举(bootstrapping ),producer只是用它来获得元数据(topic, partition, replicas)
             * 实际用户发送消息的socket会根据返回的元数据来确定
             */
            // kafkaProperties.put("metadata.broker.list", "vsp13:9092");
            kafkaProperties.put("bootstrap.servers", ip.trim() + ":9092");
            kafkaProperties.put("zookeeper.connector", ip.trim() + ":2181");
            kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    
            kafkaProperties.put("serializer.class", "kafka.serializer.StringEncoder");
            /**
             * producer发送消息后是否等待broker的ACK,默认是0
             * 1 表示等待ACK,保证消息的可靠性
             */
            kafkaProperties.put("request.required.acks", "1");
            this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
        }
    
    
        @Test
        public void testSend() {
            long now = System.currentTimeMillis();
            for(int i = 0;  i < 20; i++) {
                send();
            }
            System.out.println(System.currentTimeMillis() - now);
        }
    
        private void send() {
            String msg = getMsg();
            ProducerRecord<byte[], byte[]> record = null;
    
    
            try {
                //MY_TOPIC 是kafka集群中的某个topic
                record = getProducerRecord("MY_TOPIC",  msg.getBytes("UTF-8"));
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
    
            kafkaProducer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                        System.out.println("发送失败");
                    }
                }
            });
    
        }
    
    
        @After
        public void clean() {
            if (kafkaProducer != null) {
                try {
                    kafkaProducer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        private String getMsg() {
            return "hello kafka !";
        }
    
    
    
        private ProducerRecord<byte[], byte[]> getProducerRecord(String topic, byte[] msg) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, msg);
            return record;
        }
    
    
    }
    package com.core.util;
    
    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.List;
    import java.util.Map;
    import java.util.TimeZone;
    
    public class JsonUtils {
        private static final Logger LOG = LoggerFactory.getLogger(JsonUtils.class);
        private static ObjectMapper objectMapper = new ObjectMapper();
    
        public static String toJsonWithFormat(Object obj, String dateFormat) {
            if (obj == null) {
                return null;
            }
            String result = null;
            try {
                ObjectMapper objectMapper = new ObjectMapper();
                //备份老的日期格式
                //DateFormat oldFormat = objectMapper.getDateFormat();
                if (StringUtils.isNotEmpty(dateFormat)) {
                    objectMapper.setDateFormat(new SimpleDateFormat(dateFormat));
                    //不设置时区,会与系统当前时间相差8小时
                    TimeZone timeZone = TimeZone.getTimeZone("GMT+8");
                    objectMapper.setTimeZone(timeZone);
                }
                result = objectMapper.writeValueAsString(obj);
                //恢复日期格式
                //objectMapper.setDateFormat(oldFormat);
            } catch (IOException e) {
            }
            return result;
        }
    
        public static String object2Json(Object obj) {
            if (obj == null) {
                return null;
            }
            String result = null;
            try {
                result = objectMapper.writeValueAsString(obj);
            } catch (IOException e) {
                e.printStackTrace();
                LOG.error("对象转JSON字符串异常", e);
            }
            return result;
        }
    
        public static String object2Json(Object obj, boolean indented) {
    
            if(obj == null) {
                return null;
            }
            String result = null;
            try {
                ObjectMapper objectMapper = new ObjectMapper();
                if(indented) {
                    result = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj);
                } else {
                    result = objectMapper.writeValueAsString(obj);
                }
            } catch (IOException e) {
                LOG.error("error when object to json", e);
            }
            return result;
        }
    
        public static Map<?, ?> jsonToMap(String json) {
            return json2Object(json, Map.class);
        }
    
        public static <T> T json2Object(String json, Class<T> cls) {
            T result = null;
            try {
                objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
                result = objectMapper.readValue(json, cls);
            } catch (IOException e) {
                e.printStackTrace();
                LOG.error("JSON字符串转对象异常", e);
            }
    
            return result;
        }
    
        public static <T> T conveterObject(Object srcObject, Class<T> destObjectType) {
            String jsonContent = object2Json(srcObject);
            return json2Object(jsonContent, destObjectType);
        }
    
        public static <T> List<T> fromJsonList(String json, Class<T> clazz) throws IOException {
            return objectMapper.readValue(json, objectMapper.getTypeFactory().constructCollectionType(List.class, clazz));
        }
    
    }
    package com.core.util;
    
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.*;
    import java.util.Properties;
    
    
    public class PropertiesFile {
    
        private final static Logger LOG = LoggerFactory.getLogger(PropertiesFile.class);
        private Properties p;
        private String fileName;
    
        /**
         * @param fileName 要加载的properties文件名, 必要的话可加上路径
         * @author lihong10 2015-4-14 上午11:19:41
         * @since v1.0
         */
        public PropertiesFile(String fileName, boolean outside) {
            this.p = new Properties();
            this.fileName = fileName;
    
            InputStream inputStream = null;
            try {
                if (outside) {
                    inputStream = getInputStreamByFile(fileName);
                } else {
                    inputStream = getInputStream(Thread.currentThread().getContextClassLoader(), fileName);
                    if (inputStream == null) {
                        inputStream = getInputStream(PropertiesFile.class.getClassLoader(), fileName);
                    }
                }
                p.load(inputStream);
            } catch (Exception ex) {
    //            LOG.error("找不到配置文件: " + fileName, ex);
                throw new RuntimeException("找不到配置文件: " + fileName, ex);
            } finally {
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (IOException e) {
                        LOG.error("关闭文件流失败", e);
                    }
                }
    
            }
        }
    
        public static InputStream getInputStreamByFile(String path) {
            File file = new File(path);
            if (!file.isFile() || !file.exists()) {
                throw new IllegalArgumentException("文件" + path + "不存在");
            }
    
            InputStream in = null;
            try {
                in = new FileInputStream(file);
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
    
            return in;
        }
    
        public static InputStream getInputStream(ClassLoader classLoader, String fileName) {
            if (classLoader == null || StringUtils.isBlank(fileName)) {
                LOG.info("classLoader is null or fileName is null");
                return null;
            }
    
            fileName = fileName.trim();
    
            InputStream stream = null;
            try {
                stream = classLoader.getResourceAsStream(fileName);
            } catch (Exception e) {
                LOG.error("read " + fileName + " error", e);
            }
    
            if (stream == null && !fileName.startsWith("/")) {
                try {
                    stream = classLoader.getResourceAsStream("/" + fileName);
                } catch (Exception e) {
                    LOG.error("read /" + fileName + " error", e);
                }
            }
            return stream;
        }
    
        /**
         * @param propertyName
         * @return property value
         * @author lihong10 2015-4-14 上午11:22:23
         * @since v1.0
         */
        public String getStringProperty(String propertyName) {
            return p.getProperty(propertyName);
        }
    
        public String getStringProperty(String propertyName, String dft) {
            String value = p.getProperty(propertyName);
            if (StringUtils.isBlank(value)) {
                return dft;
            }
            return value;
        }
    
        public Integer getIntProperty(String propertyName, Integer dft) {
            String raw = p.getProperty(propertyName);
            return getInt(raw, dft);
        }
    
        public Long getLongProperty(String propertyName, Long dft) {
            String raw = p.getProperty(propertyName);
            return getLong(raw, dft);
        }
    
        public Boolean getBooleanProperty(String propertyName, Boolean dft) {
            String raw = p.getProperty(propertyName);
            return getBoolean(raw, dft);
        }
    
        /**
         * @param propertyName
         * @param propertyValue
         * @author lihong10 2015-6-15 下午4:16:54
         * @since v1.0
         */
        public void setProperty(String propertyName, String propertyValue) {
            p.setProperty(propertyName, propertyValue);
        }
    
        /**
         * @return the Properties
         */
        public Properties getProps() {
            return p;
        }
    
        /**
         * @return the fileName
         */
        public String getFileName() {
            return fileName;
        }
    
        private Integer getInt(String str, Integer dft) {
            try {
                return Integer.parseInt(str.trim());
            } catch (Exception e) {
                LOG.error("error when parsing " + str + " to int, use default value: " + dft);
                return dft;
            }
        }
    
        private Long getLong(String str, Long dft) {
            Long value = null;
            try {
                value =  Long.parseLong(str.trim());
            } catch (Exception e) {
                LOG.error("error when parsing " + str + " to long, use default value: " + dft);
                return dft;
            }
    
            return (value == null) ? dft : value;
        }
    
        private Boolean getBoolean(String str, Boolean dft) {
            try {
                return Boolean.parseBoolean(str.trim());
            } catch (Exception e) {
                LOG.error("error when parsing " + str + " to bool, use default value: " + dft);
                return dft;
            }
        }
    }
    
    
    

    为了张贴全,项目使用的pom文件也给出来:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
    
        <groupId>com.lhever</groupId>
        <artifactId>bodySender</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    
        <properties>
            <kafka.version>0.10.0.1</kafka.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <junit.version>4.11</junit.version>
    
    
            <protobuf.java.version>3.1.0</protobuf.java.version>
            <protobuf.java.format.version>1.2</protobuf.java.format.version>
    
            <!--skipTests会编译测试类,即生成.class文件,只是不运行测试类, 你可以手动运行测试类。-->
            <skipTests>true</skipTests>
        </properties>
    
        <dependencies>
    
            <!--   单元测试 -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>${junit.version}</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.7</version>
            </dependency>
    
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
                <version>2.6.6</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.6.6</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.6.6</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.dataformat</groupId>
                <artifactId>jackson-dataformat-cbor</artifactId>
                <version>2.6.6</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.dataformat</groupId>
                <artifactId>jackson-dataformat-smile</artifactId>
                <version>2.6.6</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.dataformat</groupId>
                <artifactId>jackson-dataformat-yaml</artifactId>
                <version>2.6.6</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.dataformat</groupId>
                <artifactId>jackson-dataformat-cbor</artifactId>
                <version>2.6.6</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.module</groupId>
                <artifactId>jackson-module-paranamer</artifactId>
                <version>2.6.6</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${kafka.version}</version>
            </dependency>
    
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>2.5.0</version>
            </dependency>
            <dependency>
                <groupId>com.googlecode.protobuf-java-format</groupId>
                <artifactId>protobuf-java-format</artifactId>
                <version>${protobuf.java.format.version}</version>
            </dependency>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.9</version>
            </dependency>
    
            <dependency>
                <groupId>joda-time</groupId>
                <artifactId>joda-time</artifactId>
                <version>2.10</version>
            </dependency>
    
    
    
        </dependencies>
    
        <build>
            <plugins>
               <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <mainClass>com.core.APP</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    展开全文
  • JAVA代码实现多线程下载

    千次阅读 2018-08-10 11:21:03
    首先,我们构建一个多线程下载工具类--DownUtil.代码如下: import java.net.URL; import java.net.HttpURLConnection; import java.io.InputStream ; import java.io.RandomAccessFile; public class DownUtil { ...
    1. 首先,我们构建一个多线程下载工具类--DownUtil.代码如下:
    import java.net.URL;
    import java.net.HttpURLConnection; 
    import java.io.InputStream ;
    import java.io.RandomAccessFile;
    public class DownUtil
    {
    	//定义下载资源的路径
    	private String path;
    	//指定所下载的文件的保存位置
    	private String targetFile;
    	//定义需要使用多少个线程下载资源
    	private int threadNum;
    	//定义下载的线程对象
    	private DownThread[] threads;
    	//定义下载的文件的总大小
    	private int fileSize;
    
    	//构造器
    	public DownUtil(String path,String targetFile,int threadNum)
    	{
    		this.path=path;
    		this.threadNum=threadNum;
    		//初始化threads数组
    		threads=new DownThread[threadNum];
    		this.targetFile=targetFile;
    	}
    
    	public void download() throws Exception
    	{
    		URL url=new URL(path);
    		//1.通过调用URL对象的openConnection()方法来创建URLConnection对象
    		HttpURLConnection conn=(HttpURLConnection)url.openConnection();
    		//2.设置URLConnection的参数和普通请求属性
    		conn.setConnectTimeout(5*1000);
    		conn.setRequestMethod("GET");
    		conn.setRequestProperty(
    			"Accept",
    			"image/gif,image/jpeg,image/pjpeg,image/pjpeg,"
    			+"application/x-shockwave-flash,application/xaml+xml,"
    			+"application/vnd.ms-xpsdocument,application/x-ms-xbap,"
    			+"application/x-ms-application,application/vnd.ms-excel,"
    			+"application/vnd.ms-powerpoint,application/msword,*/*");
    		conn.setRequestProperty("Accept-Language","zh-CN");
    		conn.setRequestProperty("Charset","UTF-8");
    		conn.setRequestProperty("Connection","Keep-Alive");
    
    		//得到文件大小
    		fileSize=conn.getContentLength();
    		conn.disconnect();
    		int currentPartSize=fileSize/threadNum+1;
    		RandomAccessFile file=new RandomAccessFile(targetFile,"rw");
    		//设置本地文件的大小
    		file.setLength(fileSize);
    		file.close();
    		for(int i=0;i<threadNum;i++)
    		{
    			//计算每个线程下载开始的位置
    			int startPos=i*currentPartSize;
    			//每个线程使用一个RandomAccessFile进行下载
    			RandomAccessFile currentPart=new RandomAccessFile(targetFile,"rw");
    			//定位该线程的下载位置
    			currentPart.seek(startPos);
    			//创建下载线程
    			threads[i]=new DownThread(startPos,currentPartSize,currentPart);
    			//启动下载线程
    			threads[i].start();
    		}
    	}
    	
    	//获取下载的完成百分比
    	public double getCompleteRate()
    	{
    		//统计多个线程已经下载的总大小
    		int sumSize=0;
    		for(int i=0;i<threadNum;i++)
    		{
    			sumSize+=threads[i].length;
    		}
    		//返回已经完成的百分比
    		return sumSize*1.0/fileSize;
    	}
    
    	public class DownThread extends Thread
    	{
    		//当前线程的下载位置
    		private int startPos;
    		//定义当前线程负责下载的文件大小
    		private int currentPartSize;
    		//当前线程需要下载的文件块
    		private RandomAccessFile currentPart;
    		//定义该线程已下载的字节数
    		public int length;
    
    		//构造器
    		public DownThread(int startPos,int currentPartSize,RandomAccessFile currentPart)
    		{
    			this.startPos=startPos;
    			this.currentPartSize=currentPartSize;
    			this.currentPart=currentPart;
    		}
    		
    		//下载线程的主函数体
    		public void run()
    		{
    			try
    			{
    				URL url=new URL(path);
    				//1.通过调用URL对象的openConnection()方法来创建URLConnection对象
    				HttpURLConnection conn=(HttpURLConnection)url.openConnection();
    				//2.设置URLConnection的参数和普通请求属性
    				conn.setConnectTimeout(5*1000);
    				conn.setRequestMethod("GET");
    				conn.setRequestProperty(
    					"Accept",
    					"image/gif,image/jpeg,image/pjpeg,image/pjpeg,"
    					+"application/x-shockwave-flash,application/xaml+xml,"
    					+"application/vnd.ms-xpsdocument,application/x-ms-xbap,"
    					+"application/x-ms-application,application/vnd.ms-excel,"
    					+"application/vnd.ms-powerpoint,application/msword,*/*");
    				conn.setRequestProperty("Accept-Language","zh-CN");
    				conn.setRequestProperty("Charset","UTF-8");
    				//4.远程资源变为可用,程序可以通过输入流读取远程资源的数据
    				InputStream inStream=conn.getInputStream();
    
    				//跳过stratPos个字节,表明该线程只下载自己负责的那部分文件
                                    //同时每个线程都会在指定的文件区域写入,所以不会因为多线程而出
                                    //现资源组合的错乱,从指定位置读取资源,写入到指定位置                  
    				inStream.skip(this.startPos);
    				byte[] buffer=new byte[1024];//自己设置一个缓冲区
    				int hasread=0;
    
    				//-----------------读取网路数据,并写入本地文件------------------- 
    				//inStream.read(buffer))==-1 表示读取到文件末尾
    				while(length<currentPartSize&&(hasread=inStream.read(buffer))!=-1)
    				{ 
    					currentPart.write(buffer,0,hasread);
    					//累计该线程下载的总大小
    					length+=hasread;
    					//System.out.println(getName()+" "+hasread);
    				}
    				//System.out.println(getName()+" length "+length+" currentPartSize "+currentPartSize);
                                    //即使length>currentPartSize会是的该线程多写入几个字节,
                                    //但是下一个线程会从文件的指定位置写入,就会覆盖掉之前线程多写的一部分内容
    				currentPart.close();
    				inStream.close();
    			}
    			catch (Exception e)
    			{
    				e.printStackTrace();
    			}
    		}
    
    	}
    }
    

    上面代码的大致思路如下:

    DownUtils类—启动、实现多线程下载的类;

    DownThread—线程下载类,作为DownUtils类的辅助类,主要实现单个线程的下载逻辑。

    程序中DownUtils类中的download()方法负责按如下步骤实现多线程下载

    1. 创建URL对象
    2. 获取指定URL对象所指向资源的大小(通过getConnectLength()方法获得),此处,用到了URLConnection类,该类代表Java应用程序和URL之间的通信链接。
    3. 在本地磁盘上创建一个与网络资源具有同样大小的空文件
    4. 计算每个线程应该下载网络资源的哪个部分
    5. 依次创建、启动多个线程来下载网络资源的指定部分 

    创建一个和URL的连接,并发送请求、读取此URL引用的资源需要如下几个步骤:

    1. 通过调用URL对象的openConnection()方法来创建URLConnection对象;
    2. 设置URLConnection的参数和普通请求属性;
    3. 如果只是发送GET方式请求,则使用connect()方法建立和远程资源之间的实际连接即可;如果需要发送POST方式的请求,则需要获取URLConnection实例对应的输出流来发送请求参数
    4. 远程资源变为可用,程序可以访问远程资源的头字段或通过输入流读取远程资源的数据

    测试函数如下:

    public class  MultiThreadDown
    {
    	public static void main(String[] args) 
    		throws Exception
    	{
    		//初始化DownUtil对象
    		final DownUtil downUtil=new DownUtil("http://img10.360buyimg.com/n0/jfs"+
    		"/t18166/359/202833592/24066/9da49/5a628ffeN32d2b7c8.jpg","ios.png",4);
    		//开始下载
    		downUtil.download();
    		new Thread(()->
    		{
    			while(downUtil.getCompleteRate()<1)
    			{
    				//每隔0.1秒查询一次任务的完成进度
    				//GUI程序中可根据该进度来绘制进度条
    				System.out.println("已完成:"+downUtil.getCompleteRate());
    				try
    				{
    					Thread.sleep(100);
    				}
    				catch (Exception ex){}
    				
    			}
    		}).start();
    	}
    }

     

    展开全文
  • 日历表格面板 [ConfigLine.java] 控制条类 [RoundBox.java] 限定选择控件 [MonthMaker.java] 月份表算法类 [Pallet.java] 调色板,统一配色类 Java扫雷源码 Java生成自定义控件源代码 2个目标文件 Java实现HTTP连接...
  • java多线程-学习总结(完整版)

    千次阅读 多人点赞 2020-12-04 00:02:16
    java多线程 线程和进程 线程的生命周期 新建New 就绪&运行 Runable&Running 阻塞Blocked 等待 waiting 计时等待Time waiting 销毁Terminated 线程池概念和多线程使用场景 线程池的参数解析 线程池阻塞队列...
  • java多线程web服务器

    千次阅读 2016-04-08 11:36:01
    一、原理说明HTTP协议的作用原理包括四个步骤:(1)连接:Web浏览器与Web服务器建立连接,打开一个称为socket(套接字)的虚拟文件,此文件的建立标志着连接建立成功。(2)请求:Web浏览器通过socket向Web服务器提交...
  • Java 多线程NIO

    千次阅读 2018-07-04 21:12:12
    IO模型 1. 阻塞IO如果数据没有...3. 路复用IO NIO路复用IO,会有一个线程不断地去轮询个socket的状态,当socket有读写事件的时候才会调用IO读写操作。用一个线程管理个socket,是通过selector.select(...
  • 通过服务器中转消息,实现多客户端之间的对话。客户端输入格式为:接受消息的客户端编号+空格+要发出的消息。客户端输入end下线。有客户端上线或下线,服务器都会通知其他客户端情况。效果放在本文最后的位置。1....
  • Java多线程学习(吐血超详细总结)

    万次阅读 多人点赞 2015-03-14 13:13:17
    本文主要讲了java多线程的使用方法、线程同步、线程数据传递、线程状态及相应的一些线程函数用法、概述等。
  • Java 多线程分段下载原理分析和实现

    千次阅读 2017-07-14 13:27:32
    多线程下载介绍  多线程下载技术是很常见的一种下载方案,这种方式充分利用了多线程的优势,在同一时间段内通过多个线程发起下载请求,将需要下载的数据分割成多个部分,每一个线程只负责下载其中一个部分,然后将...
  • Java多线程面试题,我丝毫不慌

    万次阅读 多人点赞 2020-07-28 09:18:51
    文章目录 一、什么是多线程 一、初识多线程 1.1介绍进程 1.2回到线程 1.3进程与线程 1.4并行与并发 1.5Java实现多线程 1.5.1继承Thread,重写run方法 1.5.2实现Runnable接口,重写run方法 1.6Java实现多线程需要注意...
  • 多线程下载介绍 转自:http://blog.csdn.net/ylyg050518/article/details/52711653   多线程下载技术是很常见的一种下载方案,这种方式充分利用了多线程的优势,在同一时间段内通过多个线程发起下载请求,...
  • Java面试题大全(2020版)

    万次阅读 多人点赞 2019-11-26 11:59:06
    发现网上很多Java面试题都没有答案,所以花了很长时间搜集整理出来了这套Java面试题大全,希望对大家有帮助哈~ 本套Java面试题大全,全的不能再全,哈哈~ 一、Java 基础 1. JDK 和 JRE 有什么区别? JDK:Java ...
  • Java Socket实现基于TCP多线程通信

    千次阅读 2018-04-18 15:22:42
    在简化的计算机网络OSI模型中,它完成第四层传输层所指定的功能,用户数据报协议(UDP,下一篇博客会实现)是同一层内 另一个重要的传输协议。在因特网协议族(Internet protocol suite)中,TCP...
  • java使用socket实现一个多线程web服务器除了服务器类,还包括请求类和响应类请求类:获取客户的HTTP请求,分析客户所需要的文件响应类:获得用户请求后将用户需要的文件读出,添加上HTTP应答头。发送给客户端。...
  • socket编程 java多线程web服务器设计与实现

    千次阅读 多人点赞 2019-05-11 17:51:03
    相信很多人在socket编程学习过程中,都会遇到类似的web服务器的设计案例,可是对于刚接触socket的新手来说,往往难以迈出第一步,于是在此分享一个多线程的web服务器代码(参考计算机网络课程的代码框架),便于各位...
  • java网络编程之http多线程下载

    热门讨论 2012-01-21 01:11:31
    本demo通过RandomAccessFile, URLConnection和多线程机制实现Http下载功能.与别的网上的例子来看, 本demo是可运行的, 可以判断网站是否支持分段下载. 你是否遇到了java下载的图片显示不出来或者RAR解压不了的情况,...
  • 一、概念梳理 1、Socket是什么? Socket是应用层与TCP/IP协议族通信...目前java动态代理的实现分为两种 1.基于JDK的动态代理 2.基于CGILB的动态代理 在业务中使用动态代理,一般是为了给需要实现的方法添加预处理
  • Java中所使用的并发机制依赖于JVM的实现和CPU的指令。下边我们对常见的实现同步的两个关键字volatile和synchronized进行底层原理的分析,分析之余我们就会了解到JVM在对锁的优化所做的事情,这样的话我们以后在使用...
  • Java实现邮件发送

    万次阅读 多人点赞 2019-07-20 16:03:19
    Java实现邮件发送 一、邮件服务器与传输协议 要在网络上实现邮件功能,必须要有专门的邮件服务器。这些邮件服务器类似于现实生活中的邮局,它主要负责接收用户投递过来的邮件,并把邮件投递到邮件接收者的电子邮箱...
  • 1.新建springgboot项目 新建springboot项目redis-queue. 2.引入依赖 引入相关依赖,其中用到...project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:s
  • Java多线程应用实例

    万次阅读 2016-08-23 18:23:50
    Java多线程及其应用实例,数据库连接池,线程池
  • Java多线程爬虫爬取京东商品信息

    千次阅读 2017-07-24 15:44:35
    爬虫可以通过模拟浏览器访问网页,从而获取数据,一般网页里会有很个URL,爬虫可以访问这些URL到达其他网页,相当于形成了一种数据结构——图,我们通过广度优先搜索和深度优先搜索的方式来遍历这个图,从而做到...
  • 部分主要阐述 Thread 的基础知识,详细介绍线程的 API 使用、线程安全、线程间数据通信,以及如何保护共享资源等内容,它是深入学习多线程内容的基础。 第二部分引入了 ClassLoader,这是因为 ClassLoader 与线程...
  • JAVA高并发多线程必须懂的50个问题

    万次阅读 多人点赞 2017-06-30 14:28:19
    http://www.importnew.com/12773.html ImportNew ...2014/08/21 | 分类: 基础技术 | 27 条评论 | 标签: 多线程, 面试题 分享到: 692 本文由 ImportNew - 李 广 翻译自 javarevisited。欢迎加入翻译小
  • JAVA多线程阻塞

    万次阅读 多人点赞 2017-02-13 17:15:33
    原文出处:...你如果看懂了这个图,那么对于多线程的理解将会更加深刻! 1、新建状态(New):新创建了一个线程对象。 2、就绪状态(Runnable):线程对象创建后,其他线程调用了该对象的start()
  • 如果我们希望任务完成之后有返回值,可以实现Callable接口。 2)Callable是一个具有类型参数的范型,他的类型参数方法表示为方法call()而不是run()中返回的值,并且必须使用ExecutorService.submint()方法进行调用...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 152,586
精华内容 61,034
关键字:

java实现多线程发送http

java 订阅