精华内容
下载资源
问答
  • 如何通过重写接口抽象方法的方式获取接口返回的数据,并做自己的业务处理。这里用到kafka作为数据的来源。也可以不使用,不使用的话,代码里面有注释怎么做。 一. 接口层的代码实现 1.新建接口类 IManager ...

    本文记录一下自己工作中遇到的场景。如何通过重写接口抽象方法的方式获取接口返回的数据,并做自己的业务处理。这里用到kafka作为数据的来源。也可以不使用,不使用的话,代码里面有注释怎么做。

    一. 接口层的代码实现

    1.新建接口类 IManager

    package com.cwp.jar.impl;
    
    /**
     * .业务逻辑接口类
     *
     * @since:
     * @create: 2020-07-10 10:55
     */
    public interface IManager {
    
        /**
         * 消息处理, 让接口调用者去重写该方法
         */
        int process(long offset, String pram);
    
    }
    

    2.新建业务逻辑类 ManagerService

    package com.cwp.jar.service;
    
    import com.cwp.jar.impl.IManager;
    import com.cwp.jar.kafka.Consumer;
    
    /**
     * .业务逻辑层
     *
     * @create: 2020-07-10 11:03
     * @since:
     */
    public class ManagerService {
    
        private static ManagerService instance;
    
        private ManagerService() {
        }
    
        public static ManagerService getInstance() {
            if (instance == null) {
                instance = new ManagerService();
            }
            return instance;
        }
    
        /**
         * 对外提供的接口
         *
         * @param manager 接口调用时传入的类名,并实现里面的方法,即可获取到kafka返回的消息,并做自己的业务逻辑
         */
        public void handle(IManager manager) {
            Consumer consumer = new Consumer(manager);
            consumer.init();
    
            // TODO 以下代码是没有使用kafka消费者的情况
            // int result;
            // for (long i = 0; i < 20; i++) {
            //     result = manager.process(i, "hello..." + i);
            //     if (result == 0) {
            //         System.out.println("业务处理完成...");
            //     } else {
            //         System.out.println("处理失败...");
            //     }
            //     try {
            //         Thread.sleep(1000);
            //     } catch (InterruptedException e) {
            //         e.printStackTrace();
            //     }
            // }
    
        }
    
    }
    

    3.新建kafka消费者类 Consumer

    package com.cwp.jar.kafka;
    
    import com.cwp.jar.impl.IManager;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    /**
     * .kafka 消费者类
     *
     * @create: 2020-06-05 11:42
     * @since:
     */
    public class Consumer {
    
        private static KafkaConsumer consumer = null;
    
        private IManager iManager;
    
        public Consumer(IManager iManager) {
            this.iManager = iManager;
        }
    
        public void init() {
            while (consumer == null) {
                consumer = this.getKafkaConsumer(); // 创建消费者
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            consumer.subscribe(Arrays.asList("test")); // 消费者订阅的topic, 可同时订阅多个
            System.out.println("成功订阅主题 ===> " + Arrays.asList("test"));
            while (true) {
                try {
                    ConsumerRecords<String, byte[]> records = consumer.poll(100); // 读取数据,读取超时时间为100ms
                    for (ConsumerRecord<String, byte[]> record : records) {
                        byte[] data = record.value();
                        System.out.println("接收到的主题 = " + record.topic() + " \t 分区 = " + record.partition() + "\t offset = "
                            + record.offset() + "\t 接收到的内容 = " + new String(data) + "\t key = " + record.key());
    
                        this.handleRecord(record); // 业务处理
    
                        this.commitOffset(record);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 消息处理方法, 交给接口调用者去实现业务逻辑
         */
        private void handleRecord(ConsumerRecord<String, byte[]> record) {
            int result = iManager.process(record.offset(), new String(record.value()));
            if (result == 0) {
                System.out.println("业务处理成功...");
            } else {
                System.out.println("业务处理失败...");
            }
        }
    
        /**
         * 提交 offset
         */
        private void commitOffset(ConsumerRecord<String, byte[]> record) {
            Map<TopicPartition, OffsetAndMetadata> commitDataMap = new HashMap<>();
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata meta = new OffsetAndMetadata(record.offset());
            commitDataMap.put(topicPartition, meta);
            consumer.commitSync(commitDataMap);
            System.out.println("提交offset成功... offset = " + (record.offset()));
        }
    
        /**
         * 创建kafka消费者
         */
        private KafkaConsumer<String, byte[]> getKafkaConsumer() {
            Properties props = new Properties();
            // 定义kakfa 服务的地址,不需要将所有broker指定上
            props.put("bootstrap.servers", "192.168.126.128:9092");
            // 制定consumer group
            props.put("group.id", "test");
            // 是否自动确认offset
            props.put("enable.auto.commit", "false");
            // 自动确认offset的时间间隔
            props.put("auto.commit.interval.ms", "1000");
            // key的序列化类
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // value的序列化类
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    
            // 定义consumer
            return new KafkaConsumer<>(props);
        }
    }
    

    二. 调用层的代码实现

    1.新建测试类 Main

    package com.cwp.jar.init;
    
    import com.cwp.jar.impl.IManager;
    import com.cwp.jar.service.ManagerService;
    
    /**
     * .具体接口调用的业务类
     *
     * @create: 2020-07-10 11:18
     * @since:
     */
    public class Main {
    
        public static void main(String[] args) {
    
            ManagerService managerService = ManagerService.getInstance();
    
            managerService.handle(new IManager() {
                @Override
                public int process(long offset, String pram) {
                    System.out.println("业务处理中... offset = " + offset + " pram = " + pram);
                    return 0;
                }
            });
    
        }
    }
    

    至此,代码已经完成,启动main方法即可获取到接口返回的数据。

    最后,附上pom文件的依赖和kafka生产者的代码提供参考

            <!-- Kafka消息队列 -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.9.0.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.9.0.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.5</version>
            </dependency>
    

    kafka生产者类 Producer

    package com.cwp.kafka;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    /**
     * .kafka生产者类
     *
     * @create: 2020-06-05 11:45
     * @since:
     */
    public class Producer {
    
        private static KafkaProducer<String, byte[]> producer = null;
    
        public static void main(String[] args) {
            new Thread(() -> {
                int index = 0;
                while (true) {
                    send("test", 0, String.valueOf(index), ("hello " + index).getBytes());
                    index++;
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    
        private static void send(String topic, int partition, String key, byte[] packet) {
            while (producer == null) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                producer = getKafkaProducer();
            }
            producer.send(new ProducerRecord<>(topic, partition, key, packet), (metadata, exception) -> {
                if (metadata != null) {
                    System.out.println(
                        "同步发送的 主题 = " + metadata.topic() + "\t 分区 = " + metadata.partition() + "\t offset ==> "
                            + metadata.offset() + "\t 大小 = " + packet.length + "\t 内容 = " + new String(packet));
                }
                if (exception != null) {
                    exception.printStackTrace();
                }
            });
        }
    
        private static KafkaProducer<String, byte[]> getKafkaProducer() {
            Properties props = new Properties();
            // Kafka服务端的主机名和端口号
            props.put("bootstrap.servers", "192.168.126.128:9092");
            // 等待所有副本节点的应答
            props.put("acks", "all");
            // 消息发送最大尝试次数
            props.put("retries", 0);
            // 一批消息处理大小
            props.put("batch.size", 20971520);
            props.put("max.request.size", 2097152);
            // 增加服务端请求延时
            props.put("linger.ms", 1);
            // 发送缓存区内存大小
            props.put("buffer.memory", 33554432);
            // key序列化
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // value序列化
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            return new KafkaProducer<>(props);
        }
    
    }
    

     

    展开全文
  • java接收远程调用的数据,得到的是如上个数的返回内容,我怎么写才能获取到值,现在使用的请求方法如下:public static HttpResult postJsonData(String url, Map params, String charset) throws Exception{...

    548d111c355d2eae8c95dfa99aa4f155.png

    java接收远程调用的数据,得到的是如上个数的返回内容,我怎么写才能获取到值,现在使用的请求方法如下:

    public static HttpResult postJsonData(String url, Map params, String charset) throws Exception{

    CloseableHttpClient httpclient = HttpClientUtil.createDefault();

    HttpPost httpPost = new HttpPost(url);

    //拼接参数

    List list = new ArrayList();

    for (Map.Entry entry : params.entrySet()) {

    String key = entry.getKey().toString();

    String value = entry.getValue().toString();

    System.out.println("key=" \+ key + " value=" \+ value);

    NameValuePair pair = new BasicNameValuePair(key, value);

    list.add(pair);

    }

    CloseableHttpResponse response=null;

    try {

    if(StringUtils.isEmpty(charset)){

    charset = DEFAULT\_CHARSET;

    }

    httpPost.setEntity(new UrlEncodedFormEntity(list,charset));

    response = httpclient.execute(httpPost);

    /\*\*请求发送成功,并得到响应\*\*/

    if(response!=null && response.getStatusLine().getStatusCode() == HttpStatus.SC\_OK){

    HttpEntity httpEntity = response.getEntity();

    if (httpEntity!=null){

    System.out.println("响应内容为:" \+ EntityUtils.toString(httpEntity));

    //System.out.println("响应内容为1:" + httpEntity.getContent());

    }

    String result=null;

    try {

    result = EntityUtils.toString(httpEntity,DEFAULT\_CHARSET);

    logger.info(result);

    } catch (IOException e) {

    throw e;

    } finally {

    EntityUtils.consume(httpEntity);

    }

    return JSON.parseObject(result,HttpResult.class);

    //return httpEntity.getContent();

    }else{

    return null;

    }

    } catch (IOException e) {

    throw new Exception(e.getMessage());

    }finally {

    try{

    // 释放资源

    if(response!=null){

    response.close();

    }

    if(httpclient!=null){

    httpclient.close();

    }

    }catch (IOException e1){

    throw new Exception("CloseableHttpResponse close exception");

    }

    }

    }

    求教怎么写才能把返回的值取到,现在用上边的方法会报如下错误:

    bf71e9a03e1532fe4cea1bbed9f69157.png

    展开全文
  • java接收远程调用的数据,得到的是如上个数的返回内容,我怎么写才能获取到值,现在使用的请求方法如下:public static HttpResult postJsonData(String url, Map params, String charset) throws Exception{...

    548d111c355d2eae8c95dfa99aa4f155.png

    java接收远程调用的数据,得到的是如上个数的返回内容,我怎么写才能获取到值,现在使用的请求方法如下:

    public static HttpResult postJsonData(String url, Map params, String charset) throws Exception{

    CloseableHttpClient httpclient = HttpClientUtil.createDefault();

    HttpPost httpPost = new HttpPost(url);

    //拼接参数

    List list = new ArrayList();

    for (Map.Entry entry : params.entrySet()) {

    String key = entry.getKey().toString();

    String value = entry.getValue().toString();

    System.out.println("key=" \+ key + " value=" \+ value);

    NameValuePair pair = new BasicNameValuePair(key, value);

    list.add(pair);

    }

    CloseableHttpResponse response=null;

    try {

    if(StringUtils.isEmpty(charset)){

    charset = DEFAULT\_CHARSET;

    }

    httpPost.setEntity(new UrlEncodedFormEntity(list,charset));

    response = httpclient.execute(httpPost);

    /\*\*请求发送成功,并得到响应\*\*/

    if(response!=null && response.getStatusLine().getStatusCode() == HttpStatus.SC\_OK){

    HttpEntity httpEntity = response.getEntity();

    if (httpEntity!=null){

    System.out.println("响应内容为:" \+ EntityUtils.toString(httpEntity));

    //System.out.println("响应内容为1:" + httpEntity.getContent());

    }

    String result=null;

    try {

    result = EntityUtils.toString(httpEntity,DEFAULT\_CHARSET);

    logger.info(result);

    } catch (IOException e) {

    throw e;

    } finally {

    EntityUtils.consume(httpEntity);

    }

    return JSON.parseObject(result,HttpResult.class);

    //return httpEntity.getContent();

    }else{

    return null;

    }

    } catch (IOException e) {

    throw new Exception(e.getMessage());

    }finally {

    try{

    // 释放资源

    if(response!=null){

    response.close();

    }

    if(httpclient!=null){

    httpclient.close();

    }

    }catch (IOException e1){

    throw new Exception("CloseableHttpResponse close exception");

    }

    }

    }

    求教怎么写才能把返回的值取到,现在用上边的方法会报如下错误:

    bf71e9a03e1532fe4cea1bbed9f69157.png

    展开全文
  • java接收远程调用的数据,得到的是如上个数的返回内容,我怎么写才能获取到值,现在使用的请求方法如下:public static HttpResult postJsonData(String url, Map params, String charset) throws Exception{...

    548d111c355d2eae8c95dfa99aa4f155.png

    java接收远程调用的数据,得到的是如上个数的返回内容,我怎么写才能获取到值,现在使用的请求方法如下:

    public static HttpResult postJsonData(String url, Map params, String charset) throws Exception{

    CloseableHttpClient httpclient = HttpClientUtil.createDefault();

    HttpPost httpPost = new HttpPost(url);

    //拼接参数

    List list = new ArrayList();

    for (Map.Entry entry : params.entrySet()) {

    String key = entry.getKey().toString();

    String value = entry.getValue().toString();

    System.out.println("key=" \+ key + " value=" \+ value);

    NameValuePair pair = new BasicNameValuePair(key, value);

    list.add(pair);

    }

    CloseableHttpResponse response=null;

    try {

    if(StringUtils.isEmpty(charset)){

    charset = DEFAULT\_CHARSET;

    }

    httpPost.setEntity(new UrlEncodedFormEntity(list,charset));

    response = httpclient.execute(httpPost);

    /\*\*请求发送成功,并得到响应\*\*/

    if(response!=null && response.getStatusLine().getStatusCode() == HttpStatus.SC\_OK){

    HttpEntity httpEntity = response.getEntity();

    if (httpEntity!=null){

    System.out.println("响应内容为:" \+ EntityUtils.toString(httpEntity));

    //System.out.println("响应内容为1:" + httpEntity.getContent());

    }

    String result=null;

    try {

    result = EntityUtils.toString(httpEntity,DEFAULT\_CHARSET);

    logger.info(result);

    } catch (IOException e) {

    throw e;

    } finally {

    EntityUtils.consume(httpEntity);

    }

    return JSON.parseObject(result,HttpResult.class);

    //return httpEntity.getContent();

    }else{

    return null;

    }

    } catch (IOException e) {

    throw new Exception(e.getMessage());

    }finally {

    try{

    // 释放资源

    if(response!=null){

    response.close();

    }

    if(httpclient!=null){

    httpclient.close();

    }

    }catch (IOException e1){

    throw new Exception("CloseableHttpResponse close exception");

    }

    }

    }

    求教怎么写才能把返回的值取到,现在用上边的方法会报如下错误:

    bf71e9a03e1532fe4cea1bbed9f69157.png

    展开全文
  • 前言现在开发大部分都是服务化或者微服务,数据交换都是跨服务,这里记录java调取其他接口的方法,下面话不多说了,来一起看看详细介绍吧。java代码如下:/****类描述:接口读取工具。*/public class ...
  • // Request curl -X POST --data '{"jsonrpc":"2.0","method":"shh_getFilterChanges...使用火狐httprequest插件时候,输入url(http:127.0.0.1:8545)和参数,调用post方法,接口返回数据,这个工具可以实现嘛?
  • 1.获取方式 Map<String, String> map = WitMedicalUtils.parseRequestMap(request.getParameterMap()); 2.解析方法:WitMedicalUtils.parseRequestMap() public static Map<String,String> ...
  • C#后端访问外部接口获取返回的json数据 在项目开发中经常会遇到需要调用第三方或者...先写一个不带参数调用接口获取接口返回的json数据。 先看案例 //要访问的接口地址 string Url = "http://localhost:8086/FES...
  • 获取天气数据 (根据天气接口返回的数据) 接口:心知天气 ulr:https://api.seniverse.com/v3/pro/weather/grid/now.json?key=your_api_key&amp;location=39.865927:116.359805 请求参数: key:你的API密钥 location...
  • //Java获取接口返回的json JSONObject jsonObj = new JSONObject(); jsonObj.put("userId", user_id); JSONObject object=httpUtils.sendPost(...
  • 获取接口数据的存放(不涉及返回大量数据) 返回的单条数据存放 //将json字符串数据转换成json对象 JSONObject jsonobject = JSONObject.parseObject(jsonString); String list = jsonobject.getString("item"); ...
  • 在对接美团外卖,新手,怎么获取回掉地址中的数据,并返回给美团成功标识?一下方法可以获取数据,那怎么返回给美团 success呢??publicfunctioncancel_mt_address(){header("Content-type:text/html;charset=utf8...
  • 需求:在java工程中,需要用到别的接口提供的数据时,如何获取到这个数据,这个工具类可以获取接口返回的json数据,直接拷贝就可以用了。 package test.utils; import java.io.BufferedReader; import java.io....
  • 总所周知uniapp的接口是异步的,所以我们可以用promise await async去解决这个问题 methods:{ getDtail(){ return new Promise...接下来是重点操作我们要在onShow或者onLoad方法去获取这个接口返回成功的数据 //o.
  • Laravel5.8+vue 使用echarts 饼图动态获取接口返回数据 使用echarts在我上一篇文章Laravel5.8+vue 使用echarts 话不多说直接上代码吧 <script> //初始化一个空数组方便存放给饼图的数据 var servicedata=[];...
  • 比如302接口返回的headers如下: <pre><code> { "Server": "nginx/1.12.2", "Date": "Fri, 03 Jan 2020 07:17:07 GMT", "Content-Length": "0", "...
  • #接口返回数据假如如下,返回的是简书的首页数据 header={ 'Accept': 'application/json', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 ...
  • 获取接口请求的数据可以在Interceptor的afterCompletion中实现但是要重写RequestWrapper,代码记录如下: HttpServletRequestFilter import javax.servlet.*; import javax.servlet.annotation.WebFilter; import...
  • 问题描述:本地的换行文本换行可以正常,但是调用接口返回的数据(带\n的数据)不能换行, 备注:text组件才可以换行,view组件是不能自动识别\n换行的 解决方案:两种方案 json的问题 数据库中\n,转json后会变成\...
  • AsyncTask获取的json数据通过接口返回给Activity
  • //200为请求成功(只是请求成功,如进行查询,连上了,查询失败也是200,具体成功与否看返回的数据) if (statusCode == 200) { //获取返回值 HttpEntity entity = response.getEntity(); String mes = EntityUtils....
  • ``` btn1.setOnClickListener(new View.OnClickListener() { @Override ...但是如果点击btn1 执行获取jsonArray接口事件,点击btn2 遍历jsonArray,就可以正常获取jsonArray数据。。这是啥原因??
  • 有A,B两个接口,B接口传入的参数需要从A接口的response获取,其中A返回的数据是数组类型,此时如何把A返回的数组循环传给B接口呢?使用ForEach控制器(可以顺序取出上一接口的某个参数 ) 其中有两种写法: 1.写...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,285
精华内容 2,114
关键字:

获取接口返回的数据