精华内容
参与话题
问答
  • pykafka

    2019-11-12 15:56:19
    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...
    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
    本文链接:https://blog.csdn.net/q383965374/article/details/91454185
             <!--一个博主专栏付费入口结束-->
            <link rel="stylesheet" href="https://csdnimg.cn/release/phoenix/template/css/ck_htmledit_views-d284373521.css">
                                        <div id="content_views" class="markdown_views prism-atom-one-dark">
                    <!-- flowchart 箭头图标 勿删 -->
                    <svg xmlns="http://www.w3.org/2000/svg" style="display: none;">
                        <path stroke-linecap="round" d="M5,0 0,2.5 5,5z" id="raphael-marker-block" style="-webkit-tap-highlight-color: rgba(0, 0, 0, 0);"></path>
                    </svg>
                                            <p>python连接kafka的标准库比较流行的有<br>
    

    1、kafka-python

    2、pykafka

    kafka-python使用的人多是比较成熟的库,

    pykafka是Samsa的升级版本,使用samsa连接zookeeper然后使用kafka Cluster。

    区别:
    pykafka的对zookeeper支持而kafka-python并没有zk的支持

    kafka-python使用

    操作文档

    https://kafka-python.readthedocs.io/en/master/apidoc/modules.html

    https://kafka-python.readthedocs.io/en/master/index.html

    https://pypi.org/project/kafka-python/

    生产者

    import time
    from kafka import KafkaProducer
    

    producer = KafkaProducer(bootstrap_servers = [‘192.168.17.64:9092’, ‘192.168.17.65:9092’, ‘192.168.17.68:9092’])

    Assign a topic

    topic = ‘test’

    def test():
    print(‘begin’)
    n = 1
    try:
    while (n<=100):
    producer.send(topic, str(n).encode())
    print(“send” + str(n))
    n += 1
    time.sleep(0.5)
    except KafkaError as e:
    print(e)
    finally:
    producer.close()
    print(‘done’)

    def test_json():
    msg_dict = {
    “sleep_time”: 10,
    “db_config”: {
    “database”: “test_1”,
    “host”: “xxxx”,
    “user”: “root”,
    “password”: “root”
    },
    “table”: “msg”,
    “msg”: “Hello World”
    }
    msg = json.dumps(msg_dict)
    producer.send(topic, msg, partition=0)
    producer.close()

    if name == ‘main’:
    test()

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    可能遇到的问题–IOError: [Errno 24] Too many open files–多次创建KafkaProducer

    在每个controller函数中创建一个SimpleProducer。切换到KafkaProducer后,依然在每个controller中创建新的KafkaProducer。如下所示:

    try:
       producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format(
           kafka_host=app.config['KAFKA_HOST'],
           kafka_port=app.config['KAFKA_PORT']
       )])
       message_string = json.dumps(message)
       response = producer.send(kafka_topic, message_string.encode('utf-8'))
       producer.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    原因是每次创建KafkaProducer都会占用一个文件符号,controller结束时,没有释放,导致后面无法继续创建新的KafkaProducer。

    解决方法是创建全局KafkaProducer,供所有controller使用。

    注意事项–慎用RecordMetadata.get()

    官方例子中有如下的代码

    producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
    

    Asynchronous by default

    future = producer.send(‘my-topic’, b’raw_bytes’)

    Block for ‘synchronous’ sends

    try:
    record_metadata = future.get(timeout=10)
    except KafkaError:
    # Decide what to do if produce request failed…
    log.exception()
    pass

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    KafkaProducer.send 返回 RecordMetadata 对象,RecordMetadata.get 可以获取 record 的信息。但在发送大量消息时,get方法可能会造成明显的延时。所以当我们不关心消息是否发送成功时,就不要调用get方法了。

    消费者

    #!/bin/env python
    from kafka import KafkaConsumer
    

    #connect to Kafka server and pass the topic we want to consume
    consumer = KafkaConsumer(‘test’, group_id = ‘test_group’, bootstrap_servers = [‘192.168.17.64:9092’, ‘192.168.17.65:9092’, ‘192.168.17.68:9092’])
    try:
    for msg in consumer:
    print(msg)
    print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition,msg.offset, msg.key, msg.value))
    except KeyboardInterrupt, e:
    print(e)

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    设置不自动提交

    自动提交位移设为flase, 默认为取最新的偏移量

    consumer = kafka.KafkaConsumer(bootstrap_servers = ['192.168.17.64:9092','192.168.17.65:9092','192.168.17.68:9092'],
                            group_id ='test_group_id',
                            auto_offset_reset ='latest', 
                            enable_auto_commit = False)
    
    • 1
    • 2
    • 3
    • 4

    批量发送数据

    from kafka import KafkaClient
    from kafka.producer import SimpleProducer
    

    def send_data_2_kafka(datas):
    ‘’’
    向kafka解析队列发送数据
    ‘’’
    client = KafkaClient(hosts=KAFKABROKER.split(","), timeout=30)
    producer = SimpleProducer(client, async=False)

    curcount = len(datas)/PARTNUM
    for i in range(0, PARTNUM):
    start = i*curcount
    if i != PARTNUM - 1:
    end = (i+1)*curcount
    curdata = datas[start:end]
    producer.send_messages(TOPICNAME, *curdata)
    else:
    curdata = datas[start:]
    producer.send_messages(TOPICNAME, *curdata)

    producer.stop()
    client.close()

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    其中PARTNUM为topic的partition的数目,这样保证批量发送的数据均匀的落在kafka的partition中。

    消费者订阅多个主题

    # =======订阅多个消费者==========
    

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition

    consumer = KafkaConsumer(bootstrap_servers=[‘127.0.0.1:9092’])
    consumer.subscribe(topics=(‘test’,‘test0’)) #订阅要消费的主题
    print(consumer.topics())
    print(consumer.position(TopicPartition(topic=‘test’, partition=0))) #获取当前主题的最新偏移量
    for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    消费者定时拉取

    有时候,我们并不需要实时获取数据,因为这样可能会造成性能瓶颈,我们只需要定时去获取队列里的数据然后批量处理就可以,这种情况,我们可以选择主动拉取数据

    from kafka import KafkaConsumer
    import time
    

    consumer = KafkaConsumer(group_id=‘123456’, bootstrap_servers=[‘10.43.35.25:4531’])
    consumer.subscribe(topics=(‘test_rhj’,))
    index = 0
    while True:
    msg = consumer.poll(timeout_ms=5) # 从kafka获取消息
    print msg
    time.sleep(2)
    index += 1
    print ‘--------poll index is %s----------’ % index

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    每次拉取到的都是前面生产的数据,可能是多条的列表,也可能没有数据,如果没有数据,则拉取到的为空。

    消费者读取最早偏移量

    from kafka import KafkaConsumer
    

    consumer = KafkaConsumer(‘test’,auto_offset_reset=‘earliest’,bootstrap_servers=[‘127.0.0.1:9092’])

    for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest
    源码定义:{‘smallest’: ‘earliest’, ‘largest’: ‘latest’}

    消费者手动设置偏移量

    # ==========读取指定位置消息===============
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    

    consumer = KafkaConsumer(‘test’,bootstrap_servers=[‘127.0.0.1:9092’])

    print(consumer.partitions_for_topic(“test”)) #获取test主题的分区信息
    print(consumer.topics()) #获取主题列表
    print(consumer.subscription()) #获取当前消费者订阅的主题
    print(consumer.assignment()) #获取当前消费者topic、分区信息
    print(consumer.beginning_offsets(consumer.assignment())) #获取当前消费者可消费的偏移量
    consumer.seek(TopicPartition(topic=‘test’, partition=0), 5) #重置偏移量,从第5个偏移量消费
    for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    消费者挂起和恢复

    # ==============消息恢复和挂起===========
    

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition
    import time

    consumer = KafkaConsumer(bootstrap_servers=[‘127.0.0.1:9092’])
    consumer.subscribe(topics=(‘test’))
    consumer.topics()
    consumer.pause(TopicPartition(topic=u’test’, partition=0)) # pause执行后,consumer不能读取,直到调用resume后恢复。
    num = 0
    while True:
    print(num)
    print(consumer.paused()) #获取当前挂起的消费者
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
    print(“resume…”)
    consumer.resume(TopicPartition(topic=‘test’, partition=0))
    print(“resume…”)

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    pykafka使用

    操作文档

    http://pykafka.readthedocs.io/en/latest/
    https://github.com/Parsely/pykafka

    需要注意的点

    kafaka和zookeeper的群集,使用samsa的时候生产者和消费者都连接了zookeeper,但pykafka文档中生产者直接连接kafaka服务器列表,消费者才用zookeeper。

    生产者

    #coding=utf-8
    

    import time
    from pykafka import KafkaClient

    class KafkaTest(object):
    “”"
    测试kafka常用api
    “”"
    def init(self, host=“192.168.0.10:9092”):
    self.host = host
    self.client = KafkaClient(hosts=self.host)

    def producer_partition(self, topic):
        """
        生产者分区查看,主要查看生产消息时offset的变化
        :return:
        """
        topic = self.client.topics[topic.encode()]
        partitions = topic.partitions
        print (u"查看所有分区 {}".format(partitions))
    
        earliest_offset = topic.earliest_available_offsets()
        print(u"获取最早可用的offset {}".format(earliest_offset))
    
        # 生产消息之前看看offset
        last_offset = topic.latest_available_offsets()
        print(u"最近可用offset {}".format(last_offset))
    
        # 同步生产消息
        p = topic.get_producer(sync=True)
        p.produce(str(time.time()).encode())
    
        # 查看offset的变化
        last_offset = topic.latest_available_offsets()
        print(u"最近可用offset {}".format(last_offset))
    
    def producer_designated_partition(self, topic):
        """
        往指定分区写消息,如果要控制打印到某个分区,
        需要在获取生产者的时候指定选区函数,
        并且在生产消息的时候额外指定一个key
        :return:
        """
    
        def assign_patition(pid, key):
            """
            指定特定分区, 这里测试写入第一个分区(id=0)
            :param pid: 为分区列表
            :param key:
            :return:
            """
            print("为消息分配partition {} {}".format(pid, key))
            return pid[0]
    
        topic = self.client.topics[topic.encode()]
        p = topic.get_producer(sync=True, partitioner=assign_patition)
        p.produce(str(time.time()).encode(), partition_key=b"partition_key_0")
    
    def async_produce_message(self, topic):
        """
        异步生产消息,消息会被推到一个队列里面,
        另外一个线程会在队列中消息大小满足一个阈值(min_queued_messages)
        或到达一段时间(linger_ms)后统一发送,默认5s
        :return:
        """
        topic = self.client.topics[topic.encode()]
        last_offset = topic.latest_available_offsets()
        print("最近的偏移量 offset {}".format(last_offset))
    
        # 记录最初的偏移量
        old_offset = last_offset[0].offset[0]
        p = topic.get_producer(sync=False, partitioner=lambda pid, key: pid[0])
        p.produce(str(time.time()).encode())
        s_time = time.time()
        while True:
            last_offset = topic.latest_available_offsets()
            print("最近可用offset {}".format(last_offset))
            if last_offset[0].offset[0] != old_offset:
                e_time = time.time()
                print('cost time {}'.format(e_time-s_time))
                break
            time.sleep(1)
    
    def get_produce_message_report(self, topic):
        """
        查看异步发送消报告,默认会等待5s后才能获得报告
        """
        topic = self.client.topics[topic.encode()]
        last_offset = topic.latest_available_offsets()
        print("最近的偏移量 offset {}".format(last_offset))
        p = topic.get_producer(sync=False, delivery_reports=True, partitioner=lambda pid, key: pid[0])
        p.produce(str(time.time()).encode())
        s_time = time.time()
        delivery_report = p.get_delivery_report()
        e_time = time.time()
        print ('等待{}s, 递交报告{}'.format(e_time-s_time, delivery_report))
        last_offset = topic.latest_available_offsets()
        print("最近的偏移量 offset {}".format(last_offset))
    

    if name == ‘main’:
    host = ‘192.168.0.10:9092,192.168.0.12:9092,192.168.0.13:9092’
    kafka_ins = KafkaTest(host)
    topic = ‘test’
    # kafka_ins.producer_partition(topic)
    # kafka_ins.producer_designated_partition(topic)
    # kafka_ins.async_produce_message(topic)
    kafka_ins.get_produce_message_report(topic)

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110

    可能出现的问题–dliver_report(包括同步)子进程阻塞

    多进程使用pykafka共享一个client,会造成只有进程能够正常的写入数据,如果使用了dliver_report(包括同步),会导致子进程彻底阻塞掉不可用

    可能出现的问题–Producer.produce accepts a bytes object as message

    使用producer.produce发送数据出现故障,如下

    #!/bin/env python
    from pykafka import KafkaClient
    host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
    client = KafkaClient(hosts = host)
    topic = client.topics["test"]
    with topic.get_sync_producer() as producer:
       for i in range(100):
           producer.produce('test message ' + str(i ** 2))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    报错:

    Traceback (most recent call last):
      File "TaxiKafkaProduce.py", line 15, in <module>
        producer.produce(('test message ' + str(i ** 2)))
      File "/root/anaconda3/lib/python3.6/site-packages/pykafka/producer.py", line 325, in produce
        "got '%s'", type(message))
    TypeError: ("Producer.produce accepts a bytes object as message, but it got '%s'", <class 'str'>)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    因为kafka传递的是字节,不是字符串,因此在传递字符串处encode()即可,分别是client.topics和producer.produce(),如下:

    #!/bin/env python
    from pykafka import KafkaClient
    host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092'
    client = KafkaClient(hosts = host)
    topic = client.topics["test".encode()]
    # 将产生kafka同步消息,这个调用仅仅在我们已经确认消息已经发送到集群之后
    with topic.get_sync_producer() as producer:
        for i in range(100):
            producer.produce(('test message ' + str(i ** 2)).encode())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    同步与异步

    from pykafka import KafkaClient
    #可接受多个client
    client = KafkaClient(hosts ="192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092") 
    #查看所有的topic
    client.topics
    print client.topics
    

    topic = client.topics[‘test_kafka_topic’]#选择一个topic

    message = “test message test message”

    当有了topic之后呢,可以创建一个producer,来发消息,生产kafka数据,通过字符串形式,

    with topic.get_sync_producer() as producer:
    producer.produce(message)

    以上的例子将产生kafka同步消息,这个调用仅仅在我们已经确认消息已经发送到集群之后

    #但生产环境,为了达到高吞吐量,要采用异步的方式,通过delivery_reports =True来启用队列接口;
    producer = topic.get_producer(sync=False, delivery_reports=True)
    producer.produce(message)
    try:
    msg, exc = producer.get_delivery_report(block=False)
    if exc is not None:
    print ‘Failed to deliver msg {}: {}’.format(msg.partition_key, repr(exc))
    else:
    print ‘Successfully delivered msg {}’.format(msg.partition_key)
    except Queue.Empty:
    pass

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    消费者

    pykafka消费者分为simple和balanced两种

    simple适用于需要消费指定分区且不需要自动的重分配(自定义)
    balanced自动分配则选择

    #coding=utf-8
    

    from pykafka import KafkaClient

    class KafkaTest(object):
    def init(self, host=“192.168.237.129:9092”):
    self.host = host
    self.client = KafkaClient(hosts=self.host)

    def simple_consumer(self, topic, offset=0):
        """
        消费者指定消费
        :param offset:
        :return:
        """
    
        topic = self.client.topics[topic.encode()]
        partitions = topic.partitions
        last_offset = topic.latest_available_offsets()
        print("最近可用offset {}".format(last_offset))  # 查看所有分区
        consumer = topic.get_simple_consumer(b"simple_consumer_group", partitions=[partitions[0]])  # 选择一个分区进行消费
        offset_list = consumer.held_offsets
        print("当前消费者分区offset情况{}".format(offset_list))  # 消费者拥有的分区offset的情况
        consumer.reset_offsets([(partitions[0], offset)])  # 设置offset
        msg = consumer.consume()
        print("消费 :{}".format(msg.value.decode()))
        msg = consumer.consume()
        print("消费 :{}".format(msg.value.decode()))
        msg = consumer.consume()
        print("消费 :{}".format(msg.value.decode()))
        offset = consumer.held_offsets
        print("当前消费者分区offset情况{}".format(offset)) # 3
    
    def balance_consumer(self, topic, offset=0):
        """
        使用balance consumer去消费kafka
        :return:
        """
        topic = self.client.topics["kafka_test".encode()]
        # managed=True 设置后,使用新式reblance分区方法,不需要使用zk,而False是通过zk来实现reblance的需要使用zk
        consumer = topic.get_balanced_consumer(b"consumer_group_balanced2", managed=True)
        partitions = topic.partitions
        print("分区 {}".format(partitions))
        earliest_offsets = topic.earliest_available_offsets()
        print("最早可用offset {}".format(earliest_offsets))
        last_offsets = topic.latest_available_offsets()
        print("最近可用offset {}".format(last_offsets))
        offset = consumer.held_offsets
        print("当前消费者分区offset情况{}".format(offset))
        while True:
            msg = consumer.consume()
            offset = consumer.held_offsets
            print("{}, 当前消费者分区offset情况{}".format(msg.value.decode(), offset))
    

    if name == ‘main’:
    host = ‘192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092’
    kafka_ins = KafkaTest(host)
    topic = ‘test’
    # kafka_ins.simple_consumer(topic)
    kafka_ins.balance_consumer(topic)

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    连接zookeeper

    >>>> balanced_consumer = topic.get_balanced_consumer(
     consumer_group='testgroup',
     auto_commit_enable=True,  # 设置为Flase的时候不需要添加 consumer_group
     zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot' # 这里就是连接多个zk
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5

    使用consumber_group和consumer_id

    # -* coding:utf8 *-  
    from pykafka import KafkaClient
    

    host = ‘192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092’
    client = KafkaClient(hosts = host)

    print(client.topics)

    消费者

    topic = client.topics[‘test’.encode()]
    consumer = topic.get_simple_consumer(consumer_group=‘test_group’,
    # 设置为False的时候不需要添加consumer_group,直接连接topic即可取到消息
    auto_commit_enable=True,
    auto_commit_interval_ms=1,
    #这里就是连接多个zk
    zookeeper_connect=‘192.168.17.64:2181,192.168.17.65:2181,192.168.17.68:2181’
    consumer_id=‘test_id’)

    for message in consumer:
    if message is not None:
    #打印接收到的消息体的偏移个数和值
    print(message.offset, message.value)

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    可能遇到的问题–AttributeError: ‘SimpleConsumer’ object has no attribute ‘_consumer_group’

    因为kafka在传输的时候需要bytes,而不是str,所以在str上加上b标识就可以,如下:

    # -* coding:utf8 *-  
    from pykafka import KafkaClient
    

    host = ‘192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092’
    client = KafkaClient(hosts = host)

    print(client.topics)

    消费者

    topic = client.topics[‘test’.encode()]
    consumer = topic.get_simple_consumer(consumer_group=b’test_group’, auto_commit_enable=True, auto_commit_interval_ms=1, consumer_id=b’test_id’)

    for message in consumer:
    if message is not None:
    print(message.offset, message.value.decode(‘utf-8’))

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    不要重复消费,对已经消费过的信息进行舍弃

    不希望消费历史数据的时候,需要使用auto_commit_enable这个参数

     consumer = topic.get_simple_consumer(consumer_group=b'test_group', 
                                 auto_commit_enable=True, 
                                 auto_commit_interval_ms=1, 
                                 consumer_id=b'test_id')
    
    • 1
    • 2
    • 3
    • 4
                                    </div>
                <link href="https://csdnimg.cn/release/phoenix/mdeditor/markdown_views-526ced5128.css" rel="stylesheet">
                    </div>
    
    展开全文
  • <div><p>PyKafka throws an error occasionally when trying to iterate through messages in a Kafka queue. I imagine the issue is some consumer or zookeeper timeout issue, since I know that I am not ...
  • <div><p>I use pip install pykafka, and encountered problems. python version 2.6.6 pykafka version 1.0.3 <p>I am not sure it is python version issue. please help. [root-av1 ~]# easy_install pykafka...
  • <div><p><strong>PyKafka version</strong>: <code>pykafka==2.4.0 <strong>Kafka version</strong>: N\A <p>pykafka <code>2.4.0</code> is failing to import due to revision on <code>gevent</code> ...
  • <div><p>when I use pykafka, it raise an exception which I don't know the problem, anyone know?: <pre><code> No handlers could be found for logger "pykafka.simpleconsumer" Process Process-...
  • Can't import pykafka.rdkafka

    2020-11-28 06:57:23
    I have librdkafka installed with pykafka on my supervisor machines. I can run the Python binary in my topology virtualenv and successfully call <code>from pykafka import rdkafka</code>. However, when...
  • /usr/lib/python2.7/site-packages/pykafka/simpleconsumer.py", line 277, in fetcher self.fetch() File "/usr/lib/python2.7/site-packages/pykafka/simpleconsumer.py", line 576, in fetch min_...
  • <div><p>Hello. I'd like to play around with Kafka, but I don't know which client to use to ...ll start with PyKafka in the meantime. :)</p><p>该提问来源于开源项目:Parsely/pykafka</p></div>
  • pykafka consumer

    2020-09-27 15:27:37
    from pykafka import KafkaClient client = KafkaClient(hosts="localhost:9092") topic = client.topics['maoyan_wish'] consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True,...
    from pykafka import KafkaClient
    
    client = KafkaClient(hosts="localhost:9092")
    
    topic = client.topics['maoyan_wish']
    consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, consumer_id='test')
    for message in consumer:
      if message is not None:
        print(message.offset, message.value)
    

    这个程序貌似是不会停止的

    展开全文
  • pykafka性能比较

    千次阅读 2017-05-12 22:41:32
    最近做了一下pykafka的性能测试,主要涉及到use_greenlets、use_rdkafka、sync这三个参数。 1. 测试的数据 我用一个770MB的日志文件来作为测试数据,文件包含的行数为10175702 行。 2. 测试的demo 在写测试demo...

    最近做了一下pykafka的性能测试,主要涉及到use_greenletsuse_rdkafkasync这三个参数。
    1. 测试的数据
    我用一个770MB的日志文件来作为测试数据,文件包含的行数为10175702 行。
    2. 测试的demo
    在写测试demo的时候遇到了几个问题,别看这么简单、很短的代码却也遇到了几个”棘手”的问题。

    #!env python
    #coding=utf-8
    # 
    # Author:       liuxingen@nsfocus.com
    # 
    # Created Time: 2017年05月10日 星期三 21时58分38秒
    # 
    # FileName:     test_pykafka.py
    # 
    # Description:  
    # 
    # ChangeLog:
    
    import time
    import pykafka
    import traceback
    
    if __name__ == '__main__':
        global producer
        client = pykafka.KafkaClient(hosts = 'xx.xx.xx.xx:9092')
        producer = client.topics['test_pykafka_out'].get_producer()
        fp = open('/home/lxg/logs/access.log')
        i = 0
        for line in fp:
            producer.produce(line)

    首先这个demo就是要测试默认参数的情况下pykafka的发送性能,我用这个demo跑了几遍测试数据,其中出现了几次ReferenceError: weakly-referenced object no longer exists错误,而且都是在数据全部已经发送完程序要退出的时候,根据error从google里面搜出来的一个pykafka的issue ReferenceError: weakly-referenced object no longer exists #422,简单点说就是我们因为使用的是异步发送数据,pykafka内部会有一个队列缓存我们发送的数据,但是我们的程序提前退出而缓存队列里面还有未完全发送完的数据,这样就导致了这个error。修改的方法其实很简单,就是在退出之前调用producer.stop(),其实我在正式的代码中是有这么一行的,只是在写demo的时候没有意识到这句话的重要性。
    其实这个问题我觉得应该也算是一个比较重要的一类问题,就是我们在使用一些api库的时候一定要先仔细的阅读api的文档,别看你只要用到api中很简单的几个接口,但是很有可能你就会引入一个隐藏很深的严重问题。如果你实在没空去仔细阅读api文档,那么在你动手之前一定要先看看api提供的demo代码并且按照demo的代码流程来组织你自己的代码,因为demo代码是这个api库提供的最标准的代码。
    修改了上面的这个问题,然后接着跑用use_rdkafka = True参数的用例(要使用use_rdkafka = True必须先安装librdkafka这个库),这个时候又出现了另外的一个问题,程序出现了pykafka.exceptions.ProducerQueueFullError错误。
    看到这个错误的时候我首先想到的就是block_on_queue_full参数,但是这个参数默认就是True,也就是说在发送缓存队列满了的时候producer是等待而不是抛出异常,为了万无一失我在代码中还是明确设定block_on_queue_full = True,结果还是会抛出异常。
    貌似是这个参数在rdkafka中无效,在pykafka/rdkafka/producer.py中的_mk_rdkafka_config_lists()函数中可以证实我们的猜想,这个函数是形成librdkafka的配置参数,在这个函数中没有block_on_queue_full参数,也就是说即使我们在producer中设置block_on_queue_full也会不生效。在librdkafka producer-api中有提到:

    rd_kafka_produce() is a non-blocking API, it will enqueue the message on an internal queue and return immediately. If the number of queued messages would exceed the “queue.buffering.max.messages” configuration property then rd_kafka_produce() returns -1 and sets errno to ENOBUFS, thus providing a backpressure mechanism.

    结合源码_rd_kafkamodule.c:

        Py_BEGIN_ALLOW_THREADS
            res = rd_kafka_produce(self->rdk_topic_handle,
                                   p_id,
                                   0,  /* ie don't copy and don't dealloc v */
                                   v, v_len,
                                   pk, pk_len,
                                   (void *)message);
        Py_END_ALLOW_THREADS
        if (res == -1) {
            rd_kafka_resp_err_t err = rd_kafka_errno2err(errno);
            if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                set_pykafka_error("ProducerQueueFullError", "");
                goto failed;
            } else {
                /* Any other errors should go through the report queue,
                 * because that's where pykafka.Producer would put them */
                PyObject *put_func = (PyObject *)rd_kafka_opaque(self->rdk_handle);
                if (-1 == Producer_delivery_report_put(put_func, message, err)) {
                    goto failed;
                }
            }
            Py_DECREF(message);  /* There won't be a delivery-callback */
        }

    rd_kafka_produce返回-1的时候会抛出ProducerQueueFullError异常,也就是说我们没有办法不让ProducerQueueFullError异常抛出,我们只能捕获异常,然后重新发送消息,修改后的demo代码如下:

    #!env python
    #coding=utf-8
    # 
    # Author:       liuxingen@nsfocus.com
    # 
    # Created Time: 2017年05月10日 星期三 21时58分38秒
    # 
    # FileName:     test_pykafka.py
    # 
    # Description:  
    # 
    # ChangeLog:
    
    import time
    import pykafka
    import traceback
    
    if __name__ == '__main__':
        global producer
        client = pykafka.KafkaClient(hosts = 'xx.xx.xx.xx:9092')
        producer = client.topics['test_pykafka_out'].get_producer()
        fp = open('/home/lxg/logs/access.log')
        i = 0
        for line in fp:
            while i < 20:
                try:
                    producer.produce(line)
                    break
                except pykafka.exceptions.ProducerQueueFullError,e:
                    time.sleep(0.1)
                    i += 1
    
            i = 0
        producer.stop()

    3.测试结果
    a). 如果设置了producer的sync = True来同步发送数据,那么最后的时间超过了30min,因为运行时间太长所以我没有等程序运行完就Ctrl+C结束了任务,因为完全不能忍受。
    b). 如果设置KafkaClient的use_greenlets=True,并且安装greenlets,最后运行的时间如下(多次结果基本一致):

    第一次运行的结果
    real 5m18.654s
    user 4m21.136s
    sys 0m1.412s
    第二次运行的结果
    real 5m20.063s
    user 4m22.368s
    sys 0m1.228s

    c). 如果设置producer的use_rdkafka = True,并且在安装pykafka之前已经安装了librdkafka,最后运行的时间如下:

    第一次运行的结果
    real 2m11.423s
    user 1m57.708s
    sys 0m31.984s
    第二次运行的结果
    real 2m10.071s
    user 2m0.128s

    d). 如果采用默认的参数,也就是用threading的方式,运行的时间如下:

    第一次运行的结果
    real 5m49.165s
    user 5m12.560s
    sys 0m32.904s
    第二次运行的结果
    real 5m48.189s
    user 5m15.112s
    sys 0m31.220s

    4.结论
    虽然用greenlets能提高pykafka的效率,但是提升有限,但是如果使用rdkafka的话效率有成倍的提升。
    PS:pykafka的consumer就只有simpleconsumer能用rdkafka,balanced_consumer就不支持rdkafka了。

    展开全文
  • python操作pykafka

    千次阅读 2018-04-25 10:21:19
    from pykafka import KafkaClient client = KafkaClient(hosts ="127.0.0.1:9092") #可接受多个client #查看所有的topic topic = client.topics[b'test'] print(topic) #选择一个topic message ="...
    from pykafka import KafkaClient
    client = KafkaClient(hosts ="127.0.0.1:9092") #可接受多个client
    #查看所有的topic
    topic = client.topics[b'test']
    print(topic)
    #选择一个topic
    message ="test message test message"
    #生产kafka数据,通过字符串形式
    producer=topic.get_producer()
    producer.produce(b'message')
    print (message)

    因为kafka在传输的时候需要bytes,而不是str,所以在str上加上b标识



    展开全文
  • pykafka 调用栈

    2018-01-05 16:13:36
    使用pykafka向kafka管道中发送同步消息,但是总会出现message丢失导致的死循环。 client初始化是根据标记use_greenlets设置handler是否使用greenlets实现并行,默认使用ThreadingHandler。 在Producer的init函数里...
  • pykafka简单应用

    2017-07-10 19:15:08
    简单的pykafka生产/消费/再生产过程;
  • <div><p>Reporting this bug as I've heard it through . I don't have any information, as I'm new to the project, but want to post this issue to get other reports ...Parsely/pykafka</p></div>
  • 今天小编就为大家分享一篇Python测试Kafka集群(pykafka)实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
  • 今天小编就为大家分享一篇通过pykafka接收Kafka消息队列的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
  • pykafka的NoBrokersAvailableError原因

    万次阅读 2016-11-17 16:34:27
    pykafka NoBrokersAvailableError kafka
  • 今天小编就为大家分享一篇python3连接kafka模块pykafka生产者简单封装代码,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
  • 安装rdkafka、pykafka

    2019-11-06 11:02:13
    安装rdkafka yum -y update gcc yum -y install gcc+ gcc-c++ git clone https://github.com/edenhill/librdkafka.git cd librdkafka/ ./configure ...默认是把动态库安装到/usr/local/lib下的,所以...
  • pykafka压力测试代码

    2018-09-25 20:59:32
    利用pykafka压力测试kafka,利用了多进程模式,根据自己机器调高进程数; with topic.get_producer(delivery_reports=True) as producer: 如果使用上面的delivery_reports=True配置,能到达每秒几百兆的并发量,...
  • pykafka的使用心得

    2019-10-25 14:35:55
    一、kafka常见名称:broker、cluster、producer、consumer、partition、group broker:节点,说直白点就是kafka服务部署时使用的服务器数量,eg.一台服务器就是一个节点,以此类推 cluster:集群,也就是部署的一...
  • pykafka压力测试代码(多进程模式)
  • pykafka写字典到Kafka

    千次阅读 2019-06-14 12:51:40
    最近做项目遇到将字典数据存储到Kafka,由于在Python里面只接收字节数据,于是我将字典转化为json串之后再进行二进制编码写入Kafka,如下所...from pykafka import KafkaClient from novel.items import NovelItem ...
  • Kafka 及 PyKafka 的使用

    千次阅读 2019-06-27 16:39:47
    Kafka 及 PyKafka 的使用
  • pykafka多个消费者

    2019-04-18 14:51:51
    最近在用kafka要求是多个消费者消费同一个topic,在网上找了很多最后才确定加入 auto_commit_enable=True 下边把我的消费者代码给贴出来 client = KafkaClient(hosts="127.0.0.1:9092") ...
  • pykafka生产消费常用api

    2018-07-27 13:54:57
    pykafka生产消费常用api pykafka基本生产消费常用api 生产者 案例 Python #coding=utf-8 import time from py<span class="wp_keywordlink_affiliate"><...

空空如也

1 2 3 4 5 ... 19
收藏数 361
精华内容 144
关键字:

pykafka