微信公众号 订阅
微信公众号是开发者或商家在微信公众平台上申请的应用账号,该帐号与QQ账号互通,通过公众号,商家可在微信平台上实现和特定群体的文字、图片、语音、视频的全方位沟通、互动 。形成了一种主流的线上线下微信互动营销方式。2016年1月18日,腾讯在北京发布消息称,中国政务微信公号已逾10万。2018年4月,腾讯已查处9.9万多个违规公众号 [1]  。11月16日,微信公众平台发布公告称,个人注册公众号数量上限调整为1个 [2]  。2019年12月26日,微信公众号文章无法打开,点击文章链接后显示空白。随后,微信团队道歉,公众号后台大部分功能已经修复。 [3-4] 展开全文
微信公众号是开发者或商家在微信公众平台上申请的应用账号,该帐号与QQ账号互通,通过公众号,商家可在微信平台上实现和特定群体的文字、图片、语音、视频的全方位沟通、互动 。形成了一种主流的线上线下微信互动营销方式。2016年1月18日,腾讯在北京发布消息称,中国政务微信公号已逾10万。2018年4月,腾讯已查处9.9万多个违规公众号 [1]  。11月16日,微信公众平台发布公告称,个人注册公众号数量上限调整为1个 [2]  。2019年12月26日,微信公众号文章无法打开,点击文章链接后显示空白。随后,微信团队道歉,公众号后台大部分功能已经修复。 [3-4]
信息
外文名
Wechat Official Account
类    型
开放应用平台
分    类
订阅号和服务号
中文名
微信公众号
认证收费
服务号、订阅号认证均需300元/年
隶属企业
腾讯公司
微信公众号发展历程
2018年6月27日,微信官方宣布,微信公众平台上线开放转载功能,文章可以直接被转载,不需要人工再次确认。 [5]  2018年11月16日,微信公众平台发布公告称,即日起,公众号注册将做调整:个人主体注册公众号数量上限由2个调整为1个;企业类主体注册公众号数量上限由5个调整为2个。 [2]  2018年12月29日,微信公众平台运营功能再次升级,修改已发送文章的错别字上限由5个上调至10个,同时支持增、删和替换,但标题和摘要依然无法修改,修改机会依然有且仅有一次。 [6]  2019年8月,滴滴出行宣布,与万达酒店及度假村达成战略合作。用户可在万达酒店微信公众号内使用滴滴叫车。 [7]  2020年6月29日,微信公众号增加两项新功能,在文章底部新增了“分享”和“赞”。其中,用户点击“分享”可“分享到朋友圈”或“发送给朋友”。这两项新功能已陆续全量开放。 [8] 
收起全文
精华内容
参与话题
问答
  • maven配置 <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> ...

    Kafka——SpringBoot整合Kafka


    简介

    SpringBoot整合Kafka,开启生产者服务并使用Web接口的方式向Kafka集群发送消息,同时开启一个消费者服务作为消息接受消费,模拟Web环境下的消息生产和消费过程

    maven配置

    添加maven依赖,主要有springboot、spring-kafka整合依赖、springboot-web

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-test</artifactId>
            </dependency>
            
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
        </dependencies>
    

    spring配置

    spring配置写在application.yml中,将原来在代码中的配置转移到了配置文件里面,具体配置项的内容在org.springframework.boot.autoconfigure.kafka下KafkaProperties类中。这里选择了几个关键配置属性进行配置,如Kafka集群、生产者和消费者的键、值序列化器和反序列化器

    server:
      port: 8081
    spring:
      kafka:
        bootstrap-servers: 192.168.108.128:9092,192.168.108.129:9092,192.168.108.130:9092
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          enable-auto-commit: true
          auto-commit-interval: 100
    

    生产者

    生产者调用web api接口请求,通过KafkaTemplete类进行发送,在Controller层注入KafkaTemplete,他的泛型即为键和值的类型,这里都用String类型的。生产者发送消息主要有同步发送和异步发送两种方式

    @RestController
    @Slf4j
    @RequestMapping("/producer")
    public class KafkaProduerController {
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    }
    

    同步发送

    同步发送消息通过调用KafkaTemplete的send()方法。由于同步发送不关心是否发送成功,所以调用后不需要返回值

    @PostMapping("/sync")
    public String sendSync() {
    for (int i = 0; i < 20; i++) {
    kafkaTemplate.send("spring-kafka-sync", "spring-data-" + i);
    }
    return Kafka.SUCCESS.getValue();
    }
    

    异步发送

    异步发送同样是调用KafkaTemplete的send()方法,由于异步发送需要知道消息是否发送成功才进行下一条消息的发送,因此会返回一个ListenableFuture对象,并且给该对象添加回调事件,来对消息发送成功或失败事件进行响应和处理

    @PostMapping("/async")
    public String sendAsync(){
    String topic = "spring-kafka-async";
    for (int i = 0; i < 20; i++) {
    ListenableFuture<SendResult<String, String>> sendReult = kafkaTemplate.send(topic, "spring-kafka-async-"+i);
    sendReult.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onFailure(Throwable throwable) {
        log.error(topic +"--send failed");
        log.error(throwable.getMessage());
        }
    
        @Override
        public void onSuccess(SendResult<String, String> stringStringSendResult) {
        log.info(topic+" --send success");
        log.info(topic+" partition: "+ stringStringSendResult.getRecordMetadata().partition());
        log.info(topic+" offset: "+ stringStringSendResult.getRecordMetadata().offset());
        }
        });
    
        }
    return Kafka.SUCCESS.getValue();
    }
    

    消费者

    消费者通过在方法上加上@KafkaListeners注解来对Kafka中的Topic进行监听,当监听的Topic有消息时便会自动接收到消息

    @Component
    @Slf4j
    public class ConsumerListener {
        @KafkaListeners({
                @KafkaListener(id = "spring-kafka-sync", topics = "spring-kafka-sync"),
                @KafkaListener(id = "spring-kafka-async", topics = "spring-kafka-async")
        })
        public void listen(String info) {
            log.info("consumer receive info: " + info);
        }
    }
    

    调用

    同步发送

    请求地址

    • http://127.0.0.1:8081/producer/sync

    控制台
    在这里插入图片描述

    异步发送

    请求地址

    • http://127.0.0.1:8081/producer/async

    控制台

    在这里插入图片描述

    从控制台可以观察到,同步发送全部发送成功才开始依次消费消息;异步发送一边发一边消费,但是发送和消费顺序依然一致。

    参考

    Kafka——Kafka搭建及问题解决

    Kafka——Kafka相关操作

    展开全文
  • KafkaSpringBoot整合

    千次阅读 2020-06-23 15:11:41
    KafkaSpringBoot整合

    引入相关jar包

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    

    springboot启动类,无特殊配置。

    @SpringBootApplication
    public class KafkaApplication {
        public static void main(String[] args) {
            SpringApplication.run(KafkaApplication.class, args);
        }
    }
    

    因为在spring-boot-autoconfigure的spring.factories中为我们自动装配了kafka相关的属性配置。

    org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
    

    KafkaAutoConfiguration类

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(KafkaTemplate.class)
    @EnableConfigurationProperties(KafkaProperties.class)
    @Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
    public class KafkaAutoConfiguration {
    
    	private final KafkaProperties properties;
    
    	public KafkaAutoConfiguration(KafkaProperties properties) {
    		this.properties = properties;
    	}
    
    	@Bean
    	@ConditionalOnMissingBean(KafkaTemplate.class)
    	public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
    			ProducerListener<Object, Object> kafkaProducerListener,
    			ObjectProvider<RecordMessageConverter> messageConverter) {
    		KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    		messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
    		kafkaTemplate.setProducerListener(kafkaProducerListener);
    		kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
    		return kafkaTemplate;
    	}
    
    	@Bean
    	@ConditionalOnMissingBean(ProducerListener.class)
    	public ProducerListener<Object, Object> kafkaProducerListener() {
    		return new LoggingProducerListener<>();
    	}
    
    	@Bean
    	@ConditionalOnMissingBean(ConsumerFactory.class)
    	public ConsumerFactory<?, ?> kafkaConsumerFactory(
    			ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
    		DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(
    				this.properties.buildConsumerProperties());
    		customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
    		return factory;
    	}
    
    	@Bean
    	@ConditionalOnMissingBean(ProducerFactory.class)
    	public ProducerFactory<?, ?> kafkaProducerFactory(
    			ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
    		DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
    				this.properties.buildProducerProperties());
    		String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
    		if (transactionIdPrefix != null) {
    			factory.setTransactionIdPrefix(transactionIdPrefix);
    		}
    		customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
    		return factory;
    	}
    
    	@Bean
    	@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
    	@ConditionalOnMissingBean
    	public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
    		return new KafkaTransactionManager<>(producerFactory);
    	}
    
    	@Bean
    	@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
    	@ConditionalOnMissingBean
    	public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
    		KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
    		Jaas jaasProperties = this.properties.getJaas();
    		if (jaasProperties.getControlFlag() != null) {
    			jaas.setControlFlag(jaasProperties.getControlFlag());
    		}
    		if (jaasProperties.getLoginModule() != null) {
    			jaas.setLoginModule(jaasProperties.getLoginModule());
    		}
    		jaas.setOptions(jaasProperties.getOptions());
    		return jaas;
    	}
    
    	@Bean
    	@ConditionalOnMissingBean
    	public KafkaAdmin kafkaAdmin() {
    		KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
    		kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
    		return kafkaAdmin;
    	}
    
    }
    

    自动装配的方式虽然方便,但是对于参数的设置有一定的局限性,比如看一下生产者我们只能配置如下一些选项。

    public Map<String, Object> buildProperties() {
    			Properties properties = new Properties();
    			PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
    			map.from(this::getAcks).to(properties.in(ProducerConfig.ACKS_CONFIG));
    			map.from(this::getBatchSize).asInt(DataSize::toBytes).to(properties.in(ProducerConfig.BATCH_SIZE_CONFIG));
    			map.from(this::getBootstrapServers).to(properties.in(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
    			map.from(this::getBufferMemory).as(DataSize::toBytes)
    					.to(properties.in(ProducerConfig.BUFFER_MEMORY_CONFIG));
    			map.from(this::getClientId).to(properties.in(ProducerConfig.CLIENT_ID_CONFIG));
    			map.from(this::getCompressionType).to(properties.in(ProducerConfig.COMPRESSION_TYPE_CONFIG));
    			map.from(this::getKeySerializer).to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
    			map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG));
    			map.from(this::getValueSerializer).to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
    			return properties.with(this.ssl, this.security, this.properties);
    		}
    

    虽然一般情况这些参数已经足够了,但是如果真的需要配置一些特殊的参数,那么我们就需要自己定义生产者和消费者。

    自定义生产者

    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {
    	//从配置文件中读取
        @Value("${kafka.producer.servers}")
        private String servers;
        @Value("${kafka.producer.retries}")
        private int retries;
        @Value("${kafka.producer.batch.size}")
        private int batchSize;
        @Value("${kafka.producer.linger}")
        private int linger;
        @Value("${kafka.producer.buffer.memory}")
        private int bufferMemory;
    
    
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            //自己修改的一些配置
            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.wyl.config.MySelfPartitioner");
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
            return props;
        }
    
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            KafkaTemplate kafkaTemplate
                    = new KafkaTemplate(producerFactory()) ;
            return kafkaTemplate;
        }
    }
    

    自定义消费者

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
        @Value("${kafka.consumer.servers}")
        private String servers;
        @Value("${kafka.consumer.enable.auto.commit}")
        private boolean enableAutoCommit;
        @Value("${kafka.consumer.session.timeout}")
        private String sessionTimeout;
        @Value("${kafka.consumer.auto.commit.interval}")
        private String autoCommitInterval;
        @Value("${kafka.consumer.group.id}")
        private String groupId;
        @Value("${kafka.consumer.auto.offset.reset}")
        private String autoOffsetReset;
        @Value("${kafka.consumer.concurrency}")
        private int concurrency;
    
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            return propsMap;
        }
    
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
    	//listener可以使用@bean,也可以在类上使用@Component。
        @Bean
        public MyListener listener() {
            return new MyListener();
        }
    
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
        kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory
                    = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
    
    public class MyListener {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MyListener.class);
        
        @KafkaListener(id = "test_topic_listener", topics = {"test_topic"})
        public void consumer(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) {
            try {
                LOGGER.info("接收topic为: {},key为:{},value为:{},offset:{},partition:{}",
                        consumerRecord.topic(), consumerRecord.key(), consumerRecord.value(), consumerRecord.offset(), consumerRecord.partition());
            } catch (Exception e) {
                LOGGER.error("kafka 监听异常:{}", e);
            } finally {
                ack.acknowledge();
            }
        }   
    }
    

    消费者监听消息的大致流程:
    在这里插入图片描述

    展开全文
  • 请注意:springboot整合的kafka最低版本为0.9.xxx ## **spring整合kafka:**spring-kafka-2.1.4.RELEASE.jar spring版本:spring4.3.5 Kafka-clients:kafka-clients-1.0.0.jar **生产者:** &lt;?xml ...

    本人用kafka版本:_2.10-0.10.0.0
    请注意:springboot整合的kafka最低版本为0.9.xxx


    ## **spring整合kafka:**spring-kafka-2.1.4.RELEASE.jar
    spring版本:spring4.3.5
    Kafka-clients:kafka-clients-1.0.0.jar
    **生产者:**

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation="http://www.springframework.org/schema/beans 
                                http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                                http://www.springframework.org/schema/context 
                                http://www.springframework.org/schema/context/spring-context-4.0.xsd
                                http://www.springframework.org/schema/aop 
                                http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                                http://www.springframework.org/schema/tx 
                                http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">
    
        <!--基本配置 -->
        <bean id="producerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <!-- kafka服务地址,可能是集群 localhost:9092,localhost:9093,localhost:9094-->
                    <entry key="bootstrap.servers" value="localhost:9093" />
                    <!-- 有可能导致broker接收到重复的消息,默认值为3 -->
                    <entry key="retries" value="10" />
                    <!-- 每次批量发送消息的数量 -->
                    <entry key="batch.size" value="1638" />
                    <!-- 默认0ms,在异步IO线程被触发后(任何一个topic,partition满都可以触发) -->
                    <entry key="linger.ms" value="1" />
                    <!--producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常 -->
                    <entry key="buffer.memory" value="33554432 " />
                    <!-- producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指procuder需要多少个这样的确认信号 -->
                    <entry key="acks" value="all" />
                    <entry key="key.serializer"
                        value="org.apache.kafka.common.serialization.StringSerializer" />
                    <entry key="value.serializer"
                        value="org.apache.kafka.common.serialization.StringSerializer" />
                </map>
            </constructor-arg>
        </bean>
    
        <!-- 创建kafkatemplate需要使用的producerfactory bean -->
        <bean id="producerFactory"
            class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <ref bean="producerProperties" />
            </constructor-arg>
        </bean>
    
        <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
        <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
            <constructor-arg ref="producerFactory" />
            <!--设置对应topic -->
            <property name="defaultTopic" value="app_log" />
        </bean>
    </beans>


    **消费者:**

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation="http://www.springframework.org/schema/beans 
                                http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                                http://www.springframework.org/schema/context 
                                http://www.springframework.org/schema/context/spring-context-4.0.xsd
                                http://www.springframework.org/schema/aop 
                                http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                                http://www.springframework.org/schema/tx 
                                http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">
                                
        <bean id="consumerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <!--Kafka服务地址:对应服务端的地址端口号 -->
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    <!--Consumer的组ID,相同goup.id的consumer属于同一个组。 -->
                    <entry key="group.id" value="app_log" />
                    <!--如果此值设置为true,consumer会周期性的把当前消费的offset值保存到zookeeper。当consumer失败重启之后将会使用此值作为新开始消费的值。 -->
                    <entry key="enable.auto.commit" value="true" />
                    <!--网络请求的socket超时时间。实际超时时间由max.fetch.wait + socket.timeout.ms 确定 -->
                    <entry key="session.timeout.ms" value="15000 " />
                    <entry key="key.deserializer"
                        value="org.apache.kafka.common.serialization.StringDeserializer" />
                    <entry key="value.deserializer"
                        value="org.apache.kafka.common.serialization.StringDeserializer" />
                </map>
            </constructor-arg>
        </bean>
    
        <!--指定具体监听类的bean -->
        <bean id="kafkaConsumerListener" class="com.lnsoft.module.commons.kafkaConfig.KafkaConsumerListener" />
    
        <!-- 创建consumerFactory bean -->
        <bean id="consumerFactory"
            class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <ref bean="consumerProperties" />
            </constructor-arg>
        </bean>
        <!-- 实际执行消息消费的类 -->
        <bean id="containerProperties"
            class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg value="app_log" />
            <property name="messageListener" ref="kafkaConsumerListener" />
        </bean>
    
        <bean id="messageListenerContainer"
            class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
            init-method="doStart">
            <constructor-arg ref="consumerFactory" />
            <constructor-arg ref="containerProperties" />
        </bean>
    </beans>

    **消费者的监听类**

    package com.lnsoft.module.commons.kafkaConfig;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.listener.MessageListener;
    
    public class KafkaConsumerListener implements MessageListener<String, String> {
        protected final Logger LOG = LoggerFactory.getLogger(this.getClass());
    
        @Override
        public void onMessage(ConsumerRecord<String, String> consumerRecord) {
            try {
                System.out.println("=============kafkaConsumer开始消费=============");
                System.out.println("consumerRecord==" + consumerRecord);
                Object o = consumerRecord.value();
                System.out.println("consumerRecord.value()==" + String.valueOf(o));
                String topic = consumerRecord.topic();
                System.out.println("topic==" + topic);
                String key = consumerRecord.key();
                System.out.println("key==" + key);
                String value = consumerRecord.value();
                System.out.println("value==" + value);
                long offset = consumerRecord.offset();
                System.out.println("offset==" + offset);
                int partition = consumerRecord.partition();
                System.out.println("partition==" + partition);
                System.out.println("=============kafkaConsumer消费结束=============");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    ## **springboot整合kafka:**
    kafka版本:_2.10-0.10.0.0 该版本的kafka,必须使用springboot-1.5.8springboot:spring-boot-starter-parent
                    1.5.8.RELEASE</version>
    spring-kafka:org.springframework.kafka
                    1.0.6.RELEASE
    **生产者**
    *(1)生产者的application.properties配置*
    #生产者

    spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
    server.port=8080


    *(2)生产者yml方式*

    #生产者
    #server:
    #port: 8081
    #spring:
    #kafka:
    #producer:
    #bootstrap-servers: 192.168.71.11:9092,192.168.71.12:9092,192.168.71.13:9092


    *代码:*

    package com.lnsoft;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    
    import java.util.UUID;
    
    
    @Component
    @EnableScheduling
    public class KafkaProducer {
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        @Scheduled(cron = "00/5 * * * * ?")
        public void send(){
            String message= UUID.randomUUID().toString();
    
            ListenableFuture future=
                    kafkaTemplate.send("app_log",message);
                    kafkaTemplate.send("test", message);
            future.addCallback(o -> System.out.println("成功"+message),throwable -> System.out.println("失败"+message));
        }
    }


    **消费者**
    *(1)消费者的application.properties配置*
    消费者

    spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
    spring.kafka.consumer.group-id=applog
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-offset-reset=latest
    server.port=8083


    *(2)消费者yml方式**

    #消费者
    #server:
    #port: 8082
    #spring:
    #kafka:
    #consumer:
    #enable-auto-commit: true
    #group-id: applog
    #auto-offset-reset: latest
    #bootstrap-servers: 192.168.71.11:9092,192.168.71.12:9092,192.168.71.13:9092


    *代码*
     

    package com.lnsoft;
    
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    
    @Component
    public class KafkaConsumer {
    
        @KafkaListener(topics = {"app_log"})
        public void receive(String message){
            System.out.println("app_log+消费"+message);
        }
        @KafkaListener(topics = {"test"},group ="myGroup2" )
        public void receive2(String message) {
            System.out.println("app_log+消费==========" + message);
        }
    }
    

     

    展开全文
  • linux服务器上部署的kafka版本是:kafka_2.11-0.9.0.1.tgz  kafka的安装请自行百度。 在此讲述的是spring整合后的集成,并非原生api操作的集成 pom依赖 &lt;!-- kafka start --&gt; &lt;...

    linux服务器上部署的kafka版本是:kafka_2.11-0.9.0.1.tgz 

    kafka的安装请自行百度。

    在此讲述的是spring整合后的集成,并非原生api操作的集成

    pom依赖

            <!-- kafka start -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>1.1.1.RELEASE</version>
            </dependency>
            <!-- kafka end -->
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.2</version>
            </dependency>

     

    springboot配置文件

    ##### kafka配置start #####
    # 指定kafka 代理地址,可以多个
    spring.kafka.bootstrap-servers=192.168.106.239:9092,192.168.106.240:9092,192.168.106.241:9092
    
    #发送失败重试次数
    spring.kafka.producer.retries=0
    
    # 每次批量发送消息的数量
    spring.kafka.producer.batch-size=16384
    
    #32MB的批处理缓冲区
    spring.kafka.producer.buffer-memory=33554432
    
    #消费者群组ID,发布-订阅模式,即如果一个生产者,多个消费者都要消费,那么需要定义自己的群组,同一群组内的消费者只有一个能消费到消息
    spring.kafka.consumer.group-id=test-consumer-group
    
    #关闭自动提交offset,spring会手工提交offset
    spring.kafka.consumer.enable-auto-commit=false
    
    #key-value序列化反序列化
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    ##### kafka配置end #####

    熟悉kafka的同学都知道,kafka在消费数据时,需要维护offset(偏移量)

    一种是kafka自动提交,可以在配置中自行配置,在这里不讲述

    一种是手动提交,就是每消费一条或一批数据,人为的去提交一次,在spring没集成kafka之前,我们使用commitSync()方法提交,集成之后,只需要将自动提交配置设置为false即可。

    #关闭自动提交offset,spring会手工提交offset
    spring.kafka.consumer.enable-auto-commit=false

    User类:

    package com.ireport.demo.entity;
    
    import lombok.Data;
    import lombok.experimental.Accessors;
    import java.io.Serializable;
    
    @Data
    @Accessors(chain = true)
    public class User implements Serializable {
    
        private int id;
        private String username;
        private int age;
        private int status;
    
    }
    

     

    Producer类:

    package com.ireport.demo.kafka;
    
    
    import com.google.gson.Gson;
    import com.ireport.demo.entity.User;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    @Component
    @Slf4j
    public class Producer {
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        public void sendMsg(){
    
            while(true){
                SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmsss");
                Gson gson = new Gson();
                int x=(int)(Math.random()*100);
                User user = new User();
                user.setId(x);
                user.setAge(12);
                user.setUsername("张三_"+sdf.format(new Date()));
                user.setStatus(1);
                String msg = gson.toJson(user);
                kafkaTemplate.send("test", msg);
                log.info("----------------- sendMsg:" + msg);
                try {
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            new Producer().sendMsg();
        }
    
    }
    

    其中,send方法为生产者向kafka集群发送数据,第一个参数是topic,第二个参数是数据,topic可以不存在,会自动创建

    Consumer类:

    package com.ireport.demo.kafka;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @Slf4j
    public class Consumer  {
    
        //注解监听
        @KafkaListener(topics = {"test"})
        public void consumerMsg(ConsumerRecord<?, ?> record){
            log.info("【消费{}主题, 第{}分区, 数据:{}, 偏移量:{}】", record.topic(), record.partition(), record.value(), record.offset());
        }
    
    }
    

    @KafkaListener(topics = {"test"})这个注解是kafka监听topic入口,topic可以是多个。此注解在项目启动时会自动监听

     

    展开全文
  • springboot整合kafka实现logback收集日志

    万次阅读 2018-06-25 19:38:53
    springboot中实现logback收集日志输出到kafka 异常容错机制,如果kafka服务宕机,输出到本地文件,可用其他方式重新加载local中的数据记录; 效率比对下:也可以尝试直接用kafka客户端写入到kafka中,手动针对异常做容错...
  • 本文主要分享下Spring Boot和Spring Kafka如何配置整合,实现发送和接收来自Spring Kafka的消息。项目结构pom依赖包<?xml version="" encoding="UTF-8"?><project xmlns="" xmlns:xsi="" xsi:...
  • kafka+springboot整合代码(推荐)

    千次阅读 2019-04-18 17:42:22
    pom文件依赖: <...org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 一.application.properties配置: spring.kafka.b...
  • 目录 pom 生产者配置文件 application.properties javaConfig 事务性javaConfig配置 Controller 异步不回调发送方式 异步带回调信息 同...
  • 目录 线程消费 pom consumerConfig consumer 批量消费 javaConfig 消费者 BatchConsumer 选择自动提交还是手动提交方式和业...
  • kafka和zookeeper集群前边写过了。如果遇到kakfa说没有连接记得把kafka下logs日志都删除了,重新启动kafka集群再启动springboot服务 zookeeper https://my.oschina.net/u/3730149/blog/3071737 kafka ...
  • kafka在windows下的安装使用以及kafkaspringboot整合 kafka服务端在windows下的安装及使用 到kafka下载网址下载最新版,最新版包含zookeeper 解压,zookeeper默认端口为2181,如需要修改可以在config/zookeeper....
  • 消费者位移的提交方式以及提交时机需要根据不同的业务场景进行选择,可以看之前的博客kafka消费者相关。 这里只做应用相关,更多的使用场景,该怎么用、何时用要看前面的博客了解原理。 参考博客:...
  • Kafka整合SpringBoot

    千次阅读 2019-02-19 11:33:14
    Kafka整合SpringBoot 准备工作 假设你了解过 SpringBootKafka。 1、SpringBoot 如果对 SpringBoot 不了解的话,建议去看看DD 大佬和纯洁的微笑的系列博客。 2、Kafka Kafka 的话可以看看的博客 :Kafka ...
  • kafka整合springboot

    2019-09-22 12:00:47
    1、pom.xml添加依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> &l...
  • Kafka 整合 SpringBoot demo

    2020-01-16 15:41:18
    在windows上部署完kafka后,我又开始整合springboot... 这篇文章建立在kafka部署完的基础上... 如果还没安装运行kafka可以参照《Kafka windows下安装和使用》...
  • kafka的环境搭建和kafkaspringboot整合 kafka的环境搭建 卸载centOS内置的openjdk1.7,安装jdk1.8 zookeeper(zookeeper-3.4.5.tar.gz)搭建 这里因为是测试所以没有搭建集群。 解压zookeeper-3.4.5.tar.gz...

空空如也

1 2 3 4 5 ... 20
收藏数 1,268
精华内容 507
关键字:

微信公众号