kafka 数据结构_kafka数据结构 - CSDN
  • kafka源码-request&response数据结构

    目录

    1.概述

    2.解析request源码

    2.1 RequestHeader

    2.2 parseRequest

    3.基础数据结构类

    3.1Type类:

    3.2 ArrayOf类:

    3.3 Field类:

    3.4 Schema类

    3.5 Sturct类

    3.6 ApiKeys

    4.请求交互二进制数据格式

    4.1 MetaDataRequest二进制格式

    4.2 ProducerRequest二进制格式

    4.3 FetchRequest二进制格式

    .......

    5.关系图:


    1.概述

    kafka启动时做很多初始化运行环境工作,具体请参考:kafka源码--broker的基础模块serversocket

    其中SockeServer类启动时,首先初始化NIO网络环境、启动监听、创建主线程、工作线程池、设置参数等等。

    kafka的所有逻辑处理和交互实际是交给KafkaApi类来处理的。

    通过请求的类型,把具体的request路由到对应的handler处理。

    2.解析request源码

    2.1 RequestHeader

    val header = RequestHeader.parse(receive.payload)
    public class RequestHeader implements AbstractRequestResponse {
    
     public static RequestHeader parse(ByteBuffer buffer) {
            short apiKey = -1;
            try {
                apiKey = buffer.getShort();
                short apiVersion = buffer.getShort();
                short headerVersion = ApiKeys.forId(apiKey).requestHeaderVersion(apiVersion);
                // 设置position=0,mark=-1,为了后面重新读取
                buffer.rewind();
                return new RequestHeader(new RequestHeaderData(
                    new ByteBufferAccessor(buffer), headerVersion), headerVersion);
            } catch (UnsupportedVersionException e) {
                throw new InvalidRequestException("Unknown API key " + apiKey, e);
            } catch (Throwable ex) {
                throw new InvalidRequestException("Error parsing request header. Our best guess of the apiKey is: " +
                        apiKey, ex);
            }
        }
    }
    public class RequestHeaderData implements ApiMessage {
    
        @Override
        public void read(Readable _readable, short _version) {
            // 读取2个字节数据设置到requestApiKey
            this.requestApiKey = _readable.readShort();
            // 读取2个字节数据设置到requestApiVersion
            this.requestApiVersion = _readable.readShort();
            // 读取4个字节数据设置到correlationId
            this.correlationId = _readable.readInt();
            if (_version >= 1) {
                int length;
                length = _readable.readShort();
                if (length < 0) {
                    this.clientId = null;
                } else if (length > 0x7fff) {
                    throw new RuntimeException("string field clientId had invalid length " + length);
                } else {
                    this.clientId = _readable.readString(length);
                }
            } else {
                this.clientId = "";
            }
            this._unknownTaggedFields = null;
            if (_version >= 2) {
                int _numTaggedFields = _readable.readUnsignedVarint();
                for (int _i = 0; _i < _numTaggedFields; _i++) {
                    int _tag = _readable.readUnsignedVarint();
                    int _size = _readable.readUnsignedVarint();
                    switch (_tag) {
                        default:
                            this._unknownTaggedFields = _readable.readUnknownTaggedField(this._unknownTaggedFields, _tag, _size);
                            break;
                    }
                }
            }
        }
    
    }

    2.2 parseRequest

    
    public class RequestContext implements AuthorizableRequestContext {
    
        public RequestAndSize parseRequest(ByteBuffer buffer) {
            if (isUnsupportedApiVersionsRequest()) {
                // Unsupported ApiVersion requests are treated as v0 requests and are not parsed
                ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest(new ApiVersionsRequestData(), (short) 0, header.apiVersion());
                return new RequestAndSize(apiVersionsRequest, 0);
            } else {
                ApiKeys apiKey = header.apiKey();
                try {
                    short apiVersion = header.apiVersion();
                    Struct struct = apiKey.parseRequest(apiVersion, buffer);
                    AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct);
                    return new RequestAndSize(body, struct.sizeOf());
                } catch (Throwable ex) {
                    throw new InvalidRequestException("Error getting request for apiKey: " + apiKey +
                            ", apiVersion: " + header.apiVersion() +
                            ", connectionId: " + connectionId +
                            ", listenerName: " + listenerName +
                            ", principal: " + principal, ex);
                }
            }
        }
    
    }
    public abstract class AbstractRequest implements AbstractRequestResponse {
        /**
         * Factory method for getting a request object based on ApiKey ID and a version
         */
        public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Struct struct) {
            switch (apiKey) {
                case PRODUCE:
                    return new ProduceRequest(struct, apiVersion);
                case FETCH:
                    return new FetchRequest(struct, apiVersion);
                case LIST_OFFSETS:
                    return new ListOffsetRequest(struct, apiVersion);
                case METADATA:
                    return new MetadataRequest(struct, apiVersion);
                case OFFSET_COMMIT:
                    return new OffsetCommitRequest(struct, apiVersion);
                case OFFSET_FETCH:
                    return new OffsetFetchRequest(struct, apiVersion);
                case FIND_COORDINATOR:
                    return new FindCoordinatorRequest(struct, apiVersion);
                case JOIN_GROUP:
                    return new JoinGroupRequest(struct, apiVersion);
                case HEARTBEAT:
                    return new HeartbeatRequest(struct, apiVersion);
                case LEAVE_GROUP:
                    return new LeaveGroupRequest(struct, apiVersion);
                case SYNC_GROUP:
                    return new SyncGroupRequest(struct, apiVersion);
                case STOP_REPLICA:
                    return new StopReplicaRequest(struct, apiVersion);
                case CONTROLLED_SHUTDOWN:
                    return new ControlledShutdownRequest(struct, apiVersion);
                case UPDATE_METADATA:
                    return new UpdateMetadataRequest(struct, apiVersion);
                case LEADER_AND_ISR:
                    return new LeaderAndIsrRequest(struct, apiVersion);
                case DESCRIBE_GROUPS:
                    return new DescribeGroupsRequest(struct, apiVersion);
                case LIST_GROUPS:
                    return new ListGroupsRequest(struct, apiVersion);
                case SASL_HANDSHAKE:
                    return new SaslHandshakeRequest(struct, apiVersion);
                case API_VERSIONS:
                    return new ApiVersionsRequest(struct, apiVersion);
                case CREATE_TOPICS:
                    return new CreateTopicsRequest(struct, apiVersion);
                case DELETE_TOPICS:
                    return new DeleteTopicsRequest(struct, apiVersion);
                case DELETE_RECORDS:
                    return new DeleteRecordsRequest(struct, apiVersion);
                case INIT_PRODUCER_ID:
                    return new InitProducerIdRequest(struct, apiVersion);
                case OFFSET_FOR_LEADER_EPOCH:
                    return new OffsetsForLeaderEpochRequest(struct, apiVersion);
                case ADD_PARTITIONS_TO_TXN:
                    return new AddPartitionsToTxnRequest(struct, apiVersion);
                case ADD_OFFSETS_TO_TXN:
                    return new AddOffsetsToTxnRequest(struct, apiVersion);
                case END_TXN:
                    return new EndTxnRequest(struct, apiVersion);
                case WRITE_TXN_MARKERS:
                    return new WriteTxnMarkersRequest(struct, apiVersion);
                case TXN_OFFSET_COMMIT:
                    return new TxnOffsetCommitRequest(struct, apiVersion);
                case DESCRIBE_ACLS:
                    return new DescribeAclsRequest(struct, apiVersion);
                case CREATE_ACLS:
                    return new CreateAclsRequest(struct, apiVersion);
                case DELETE_ACLS:
                    return new DeleteAclsRequest(struct, apiVersion);
                case DESCRIBE_CONFIGS:
                    return new DescribeConfigsRequest(struct, apiVersion);
                case ALTER_CONFIGS:
                    return new AlterConfigsRequest(struct, apiVersion);
                case ALTER_REPLICA_LOG_DIRS:
                    return new AlterReplicaLogDirsRequest(struct, apiVersion);
                case DESCRIBE_LOG_DIRS:
                    return new DescribeLogDirsRequest(struct, apiVersion);
                case SASL_AUTHENTICATE:
                    return new SaslAuthenticateRequest(struct, apiVersion);
                case CREATE_PARTITIONS:
                    return new CreatePartitionsRequest(struct, apiVersion);
                case CREATE_DELEGATION_TOKEN:
                    return new CreateDelegationTokenRequest(struct, apiVersion);
                case RENEW_DELEGATION_TOKEN:
                    return new RenewDelegationTokenRequest(struct, apiVersion);
                case EXPIRE_DELEGATION_TOKEN:
                    return new ExpireDelegationTokenRequest(struct, apiVersion);
                case DESCRIBE_DELEGATION_TOKEN:
                    return new DescribeDelegationTokenRequest(struct, apiVersion);
                case DELETE_GROUPS:
                    return new DeleteGroupsRequest(struct, apiVersion);
                case ELECT_LEADERS:
                    return new ElectLeadersRequest(struct, apiVersion);
                case INCREMENTAL_ALTER_CONFIGS:
                    return new IncrementalAlterConfigsRequest(struct, apiVersion);
                case ALTER_PARTITION_REASSIGNMENTS:
                    return new AlterPartitionReassignmentsRequest(struct, apiVersion);
                case LIST_PARTITION_REASSIGNMENTS:
                    return new ListPartitionReassignmentsRequest(struct, apiVersion);
                case OFFSET_DELETE:
                    return new OffsetDeleteRequest(struct, apiVersion);
                default:
                    throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                            "code should be updated to do so.", apiKey));
            }
        }
    
    }

    3.基础数据结构类

    3.1Type类:

    • 所在文件: clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
    • 这是一个abstrace class, 主要是定义了ByteBuffer与各种Object之间的序列化和反序列化;
      public abstract void write(ByteBuffer buffer, Object o);
      public abstract Object read(ByteBuffer buffer);
      public abstract Object validate(Object o);
      public abstract int sizeOf(Object o);
      public boolean isNullable();

       

    • 定义了若干Type类的实现类:

    public static final Type INT8
    public static final Type INT16
    public static final Type INT32
    public static final Type INT64
    public static final Type STRING
    public static final Type BYTES
    public static final Type NULLABLE_BYTES

    3.2 ArrayOf类:

    • 所在文件: clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
    • Type类的具体实现, 是Type对象的数组类型;

    3.3 Field类:

    • 所在文件: clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
    • 定义了在这个schema中的一个字段;
    • 成员:
       final int index;
        public final String name;
        public final Type type;
        public final Object defaultValue;
        public final String doc;
        final Schema schema;

    3.4 Schema类

    • 所在文件: clients/src/main/java/org/apache/kafka/common/protocol/types/schema.java
    • Schema类本身实现了Type类, 又包含了一个Field类对象的数组, 构成了记录的Schema;

    3.5 Sturct类

    • 所在文件: clients/src/main/java/org/apache/kafka/common/protocol/types/struct.java
    • 包括了一个Schema对象; 一个Object[] values数组,用于存放Schema描述的所有Field对应的值;
        private final Schema schema;
        private final Object[] values;
    • 定义了一系列getXXX方法, 用来获取schema中某个Field对应的值;
    • 定义了set方法, 用来设置schema中某个Field对应的值;
    • writeTo 用来将Stuct对象序列华到ByteBuffer;
    • Schema就是模板,Struct负责特化这个模板,向模板里添数据,构造出具体的request对象, 并可以将这个对象与ByteBuffer互相转化;

    3.6 ApiKeys

    • 所在文件: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
    • 定义了所有Kafka Api 的ID和名字
    ​
    
    public enum ApiKeys {
        PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions()),
        FETCH(1, "Fetch", FetchRequest.schemaVersions(), FetchResponse.schemaVersions()),
        LIST_OFFSETS(2, "ListOffsets", ListOffsetRequest.schemaVersions(), ListOffsetResponse.schemaVersions()),
        METADATA(3, "Metadata", MetadataRequestData.SCHEMAS, MetadataResponseData.SCHEMAS),
        LEADER_AND_ISR(4, "LeaderAndIsr", true, LeaderAndIsrRequestData.SCHEMAS, LeaderAndIsrResponseData.SCHEMAS),
        STOP_REPLICA(5, "StopReplica", true, StopReplicaRequestData.SCHEMAS, StopReplicaResponseData.SCHEMAS),
        UPDATE_METADATA(6, "UpdateMetadata", true, UpdateMetadataRequestData.SCHEMAS, UpdateMetadataResponseData.SCHEMAS),
        CONTROLLED_SHUTDOWN(7, "ControlledShutdown", true, ControlledShutdownRequestData.SCHEMAS,
                ControlledShutdownResponseData.SCHEMAS),
        OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequestData.SCHEMAS, OffsetCommitResponseData.SCHEMAS),
        OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequestData.SCHEMAS, OffsetFetchResponseData.SCHEMAS),
        FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequestData.SCHEMAS,
            FindCoordinatorResponseData.SCHEMAS),
        JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS),
        HEARTBEAT(12, "Heartbeat", HeartbeatRequestData.SCHEMAS, HeartbeatResponseData.SCHEMAS),
        LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
        SYNC_GROUP(14, "SyncGroup", SyncGroupRequestData.SCHEMAS, SyncGroupResponseData.SCHEMAS),
        DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequestData.SCHEMAS,
                DescribeGroupsResponseData.SCHEMAS),
        LIST_GROUPS(16, "ListGroups", ListGroupsRequestData.SCHEMAS, ListGroupsResponseData.SCHEMAS),
        SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequestData.SCHEMAS, SaslHandshakeResponseData.SCHEMAS),
        API_VERSIONS(18, "ApiVersions", ApiVersionsRequestData.SCHEMAS, ApiVersionsResponseData.SCHEMAS) {
            @Override
            public Struct parseResponse(short version, ByteBuffer buffer) {
                // Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest
                // using a version higher than that supported by the broker, a version 0 response is sent
                // to the client indicating UNSUPPORTED_VERSION.
                return parseResponse(version, buffer, (short) 0);
            }
        },
        CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS),
        DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS),
        DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()),
        INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequestData.SCHEMAS, InitProducerIdResponseData.SCHEMAS),
        OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", false, OffsetsForLeaderEpochRequest.schemaVersions(),
                OffsetsForLeaderEpochResponse.schemaVersions()),
        ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2,
                AddPartitionsToTxnRequest.schemaVersions(), AddPartitionsToTxnResponse.schemaVersions()),
        ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequest.schemaVersions(),
                AddOffsetsToTxnResponse.schemaVersions()),
        END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2, EndTxnRequestData.SCHEMAS, EndTxnResponseData.SCHEMAS),
        WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequest.schemaVersions(),
                WriteTxnMarkersResponse.schemaVersions()),
        TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2, TxnOffsetCommitRequestData.SCHEMAS,
                          TxnOffsetCommitResponseData.SCHEMAS),
        DESCRIBE_ACLS(29, "DescribeAcls", DescribeAclsRequestData.SCHEMAS, DescribeAclsResponseData.SCHEMAS),
        CREATE_ACLS(30, "CreateAcls", CreateAclsRequestData.SCHEMAS, CreateAclsResponseData.SCHEMAS),
        DELETE_ACLS(31, "DeleteAcls", DeleteAclsRequestData.SCHEMAS, DeleteAclsResponseData.SCHEMAS),
        DESCRIBE_CONFIGS(32, "DescribeConfigs", DescribeConfigsRequest.schemaVersions(),
                DescribeConfigsResponse.schemaVersions()),
        ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequest.schemaVersions(),
                AlterConfigsResponse.schemaVersions()),
        ALTER_REPLICA_LOG_DIRS(34, "AlterReplicaLogDirs", AlterReplicaLogDirsRequest.schemaVersions(),
                AlterReplicaLogDirsResponse.schemaVersions()),
        DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequest.schemaVersions(),
                DescribeLogDirsResponse.schemaVersions()),
        SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequestData.SCHEMAS,
                SaslAuthenticateResponseData.SCHEMAS),
        CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequestData.SCHEMAS,
                CreatePartitionsResponseData.SCHEMAS),
        CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS, CreateDelegationTokenResponseData.SCHEMAS),
        RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequestData.SCHEMAS, RenewDelegationTokenResponseData.SCHEMAS),
        EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequestData.SCHEMAS, ExpireDelegationTokenResponseData.SCHEMAS),
        DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequestData.SCHEMAS, DescribeDelegationTokenResponseData.SCHEMAS),
        DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequestData.SCHEMAS, DeleteGroupsResponseData.SCHEMAS),
        ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS,
                ElectLeadersResponseData.SCHEMAS),
        INCREMENTAL_ALTER_CONFIGS(44, "IncrementalAlterConfigs", IncrementalAlterConfigsRequestData.SCHEMAS,
                                  IncrementalAlterConfigsResponseData.SCHEMAS),
        ALTER_PARTITION_REASSIGNMENTS(45, "AlterPartitionReassignments", AlterPartitionReassignmentsRequestData.SCHEMAS,
                                      AlterPartitionReassignmentsResponseData.SCHEMAS),
        LIST_PARTITION_REASSIGNMENTS(46, "ListPartitionReassignments", ListPartitionReassignmentsRequestData.SCHEMAS,
                                     ListPartitionReassignmentsResponseData.SCHEMAS),
        OFFSET_DELETE(47, "OffsetDelete", OffsetDeleteRequestData.SCHEMAS, OffsetDeleteResponseData.SCHEMAS);
    
    }
    
    ​

    4.请求交互二进制数据格式

    kafka中客户端与server端交互有多种类型:ProduceRequest、FetchRequest......

    请求交互二进制数据组成为:请求类型 + 请求数据。

    4.1 MetaDataRequest二进制格式

    4.2 ProducerRequest二进制格式

    4.3 FetchRequest二进制格式

    .......

     

     

    5.关系图:

     

     

     

     

    展开全文
  • 一 /brokers节点 /brokers /brokers/topics /brokers/topics/test2 /brokers/topics/test2/partitions /brokers/topics/test2/partitions/0 .../brokers/topics/test2/partitions/0/state 的值为... ...
    一 /brokers节点
    /brokers
    /brokers/topics
    /brokers/topics/test2
    /brokers/topics/test2/partitions
    /brokers/topics/test2/partitions/0
    /brokers/topics/test2/partitions/0/state 的值为...
    /brokers/ids
    /brokers/ids/202
    /brokers/ids/203
    /brokers/ids/204
    /brokers/ids/202 的值为...
    /brokers/seqid的值为null

    二 /consumers节点
    /consumers
    /consumers/console-consumer-75113/ids
                             消费者组                                 该组下的消费者
    /consumers/console-consumer-75113/ids/console-consumer-75113_s2013e 的值为{“version":1,“subscription”:{test2},...}
                                                                   消费的主题
    /consumers/console-consumer-75113/owners
    /consumers/console-consumer-75113/owners/test2
    /consumers/console-consumer-75113/owners/test2/0的值为console-consumer-75113_s2013e
                            消费者组                          偏移量  主题  分区     值
    /consumers/console-consumer-75113/offsets/test2/0的值为0

    三 其他节点
    /controller的值为{“version":1,“brokerid":202}

    删除的主题
    /admin/delete_topics/test 的值为null

    配置的主题
    /config/topics/test
    /config/topics/test2的值为{"version":1,"config":{}}
    /config/changes的值为null
    /config/clients的值为null



    展开全文
  • 本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和...1 Producer端基本数据结构 ProducerRecord: 一个ProducerRecord表示一条待发送的...

    本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

    1 Producer端基本数据结构

    • ProducerRecord: 一个ProducerRecord表示一条待发送的消息记录,主要由5个字段构成:

        topic          所属topic
        partition      所属分区
        key            键值
        value          消息体
        timestamp      时间戳
      复制代码
    • RecordMetadata: Kafka服务器端返回给客户端的消息的元数据信息,前3项相对比较重要,Producer端可以使用这些消息做一些消息发送成功之后的处理。

        offset                   该条消息的位移
        timestamp                消息时间戳
        topic + partition        所属topic的分区
        checksum                 消息CRC32码
        serializedKeySize        序列化后的消息键字节数
        serializedValueSize      序列化后的消息体字节数
      复制代码

    2 Producer端消息发送流程

    • 在send()的发送消息动作触发之前,通过props属性中指定的servers连接到broker集群,从Zookeeper收集集群Metedata信息,从而了解哪些broker掌管哪一个Topic的哪一个partition,以及brokers的健康状态。

    • 下面就是流水线操作,ProducerRecord对象携带者topic,partition,message等信息,在Serializer这个“车间”被序列化。

    • 序列化过后的ProducerRecord对象进入Partitioner“车间”,按照上文所述的Partitioning 策略决定这个消息将被分配到哪个Partition中。

    • 确定partition的ProducerRecord进入一个缓冲区,通过减少IO来提升性能,在这个“车间”,消息被按照TopicPartition信息进行归类整理,相同Topic且相同parition的ProducerRecord被放在同一个RecordBatch中,等待被发送。什么时候发送?都在Producer的props中被指定了,有默认值,显然我们可以自己指定。

        (1) batch.size:设置每个RecordBatch可以缓存的最大字节数 
        (2) buffer.memory:设置所有RecordBatch的总共最大字节数 
        (3) linger.ms设置每个RecordBatch的最长延迟发送时间 
        (4) max.block.ms 设置每个RecordBatch的最长阻塞时间 
      复制代码
    • 一旦,当单个RecordBatch的linger.ms延迟到达或者batch.size达到上限,这个 RecordBatch会被立即发送。另外,如果所有RecordBatch作为一个整体,达到了buffer.memroy或者max.block.ms上限,所有的RecordBatch都会被发送。

    • ProducerRecord消息按照分配好的Partition发送到具体的broker中,broker接收保存消息,更新Metadata信息,同步给Zookeeper。

    • Producer端其他优化点:

        (5) acks:Producer的数据确认阻塞设置,0表示不管任何响应,只管发,发完了立即执行下个任务,这种方式最快,但是很不保险。1表示只确保leader成功响应,接收到数据。2表示确保leader及其所有follwer成功接收保存消息,也可以用”all”。
        (6) retries:消息发送失败重试的次数。
        (7) retry.backoff.ms:失败补偿时间,每次失败重试的时间间隔,不可设置太短,避免第一条消息的响应还没返回,第二条消息又发出去了,造成逻辑错误。
        (8) max.in.flight.request.per.connection:同一时间,每个Producer能够发送的消息上限。
        (9) compression.type  producer所使用的压缩器,目前支持gzip, snappy和lz4。压缩是在用户主线程完成的,通常都需要花费大量的CPU时间,但对于减少网络IO来说确实利器。生产环境中可以结合压力测试进行适当配置
      复制代码

    3 消息缓冲区(accumulator)再剖析

    • producer创建时会创建一个默认32MB(由buffer.memory参数指定)的accumulator缓冲区,专门保存待发送的消息。

    • 该数据结构中还包含了一个特别重要的集合信息:消息批次信息(batches)。该集合本质上是一个HashMap,里面分别保存了每个topic分区下的batch队列,即前面说的批次是按照topic分区进行分组的。这样发往不同分区的消息保存在对应分区下的batch队列中。

    • 假设消息M1, M2被发送到test的0分区但属于不同的batch,M3分送到test的1分区,那么batches中包含的信息就是:{"test-0" -> [batch1, batch2], "test-1" -> [batch3]}

    • 每个batch中最重要的3个组件包括:

        compressor: 负责执行追加写入操作
        batch缓冲区:由batch.size参数控制,消息被真正追加写入到的地方
        thunks:保存消息回调逻辑的集合
      复制代码
    • 本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

    • Sender线程自KafkaProducer创建后就一直都在运行着 。它的工作流程基本上是这样的:

        (1)不断轮询缓冲区寻找已做好发送准备的分区 
        (2)将轮询获得的各个batch按照目标分区所在的leader broker进行分组
        (3)将分组后的batch通过底层创建的Socket连接发送给各个broker
        (4)等待服务器端发送response回来
      复制代码

    • Sender线程会发送PRODUCE请求给对应的broker,broker处理完毕之后发送对应的PRODUCE response。一旦Sender线程接收到response将依次(按照消息发送顺序)调用batch中的回调方法

    4 总结

    • Sender线程自KafkaProducer创建后就一直都在运行着,单个RecordBatch的linger.ms延迟到达或者batch.size达到上限,作为后台线程就会检测到立即发送。
    • accumulator缓冲器按照Topic partion进行分组,来进行集中向某一个Broker发送。
    • 本文通过学习胡夕的相关技术博客和书籍,进行的学习笔记总结,辛苦成文,实属不易,各自珍惜,谢谢。
    • 秦凯新 于深圳 201812030018
    展开全文
  • Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Messa...

    参考资料:https://blog.csdn.net/gongxinju/article/details/72672375 以后继续深入总结。

    Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。借用官方的一张图,可以直观地看到topic和partition的关系。

    Anatomy of a Topic

    partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。

    接下来,本文将分析partition目录中的文件的存储格式和相关的代码所在的位置。

    3.1、Partition的数据文件

    Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:

    • offset
    • MessageSize
    • data

    其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。它的格式和Kafka通讯协议中介绍的MessageSet格式是一致。

    Partition的数据文件则包含了若干条上述格式的Message,按offset由小到大排列在一起。它的实现类为FileMessageSet,类图如下:
    FileMessageSet类图
    它的主要方法如下:

    • append: 把给定的ByteBufferMessageSet中的Message写入到这个数据文件中。
    • searchFor: 从指定的startingPosition开始搜索找到第一个Message其offset是大于或者等于指定的offset,并返回其在文件中的位置Position。它的实现方式是从startingPosition开始读取12个字节,分别是当前MessageSet的offset和size。如果当前offset小于指定的offset,那么将position向后移动LogOverHead+MessageSize(其中LogOverHead为offset+messagesize,为12个字节)。
    • read:准确名字应该是slice,它截取其中一部分返回一个新的FileMessageSet。它不保证截取的位置数据的完整性。
    • sizeInBytes: 表示这个FileMessageSet占有了多少字节的空间。
    • truncateTo: 把这个文件截断,这个方法不保证截断位置的Message的完整性。
    • readInto: 从指定的相对位置开始把文件的内容读取到对应的ByteBuffer中。

    我们来思考一下,如果一个partition只有一个数据文件会怎么样?

    1. 新数据是添加在文件末尾(调用FileMessageSet的append方法),不论文件数据文件有多大,这个操作永远都是O(1)的。
    2. 查找某个offset的Message(调用FileMessageSet的searchFor方法)是顺序查找的。因此,如果数据文件很大的话,查找的效率就低。

    那Kafka是如何解决查找效率的的问题呢?有两大法宝:1) 分段 2) 索引。

    3.2、数据文件的分段

    Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。

    3.3、为数据文件建索引

    数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
    索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。

    • 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
    • position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。

    index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。

    在Kafka中,索引文件的实现类为OffsetIndex,它的类图如下:
    OffsetIndex类图

    主要的方法有:

    • append方法,添加一对offset和position到index文件中,这里的offset将会被转成相对的offset。
    • lookup, 用二分查找的方式去查找小于或等于给定offset的最大的那个offset

    小结

    我们以几张图来总结一下Message是如何在Kafka中存储的,以及如何查找指定offset的Message的。

    Message是按照topic来组织,每个topic可以分成多个的partition,比如:有5个partition的名为为page_visits的topic的目录结构为:
    topic_partition

    partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件:
    partition
    可以看到,这个partition有4个LogSegment。

    展示是如何查找Message的。
    search
    比如:要查找绝对offset为7的Message:

    1. 首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。
    2. 打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。
    3. 打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。

    这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。

    一句话,Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。

    展开全文
  • Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。借用官方...

    引言

    Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。借用官方的一张图,可以直观地看到topic和partition的关系。
    Anatomy of a Topic

    partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。

    接下来,本文将分析partition目录中的文件的存储格式和相关的代码所在的位置。

    3.1、Partition的数据文件

    Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:

    • offset
    • MessageSize
    • data

    其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。它的格式和Kafka通讯协议中介绍的MessageSet格式是一致。

    Partition的数据文件则包含了若干条上述格式的Message,按offset由小到大排列在一起。它的实现类为FileMessageSet,类图如下:
    FileMessageSet类图
    它的主要方法如下:

    • append: 把给定的ByteBufferMessageSet中的Message写入到这个数据文件中。
    • searchFor: 从指定的startingPosition开始搜索找到第一个Message其offset是大于或者等于指定的offset,并返回其在文件中的位置Position。它的实现方式是从startingPosition开始读取12个字节,分别是当前MessageSet的offset和size。如果当前offset小于指定的offset,那么将position向后移动LogOverHead+MessageSize(其中LogOverHead为offset+messagesize,为12个字节)。
    • read:准确名字应该是slice,它截取其中一部分返回一个新的FileMessageSet。它不保证截取的位置数据的完整性。
    • sizeInBytes: 表示这个FileMessageSet占有了多少字节的空间。
    • truncateTo: 把这个文件截断,这个方法不保证截断位置的Message的完整性。
    • readInto: 从指定的相对位置开始把文件的内容读取到对应的ByteBuffer中。

    我们来思考一下,如果一个partition只有一个数据文件会怎么样?

    1. 新数据是添加在文件末尾(调用FileMessageSet的append方法),不论文件数据文件有多大,这个操作永远都是O(1)的。
    2. 查找某个offset的Message(调用FileMessageSet的searchFor方法)是顺序查找的。因此,如果数据文件很大的话,查找的效率就低。

    那Kafka是如何解决查找效率的的问题呢?有两大法宝:1) 分段 2) 索引。

    3.2、数据文件的分段

    Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。

    3.3、为数据文件建索引

    数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
    索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。

    • 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
    • position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。

    index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。

    在Kafka中,索引文件的实现类为OffsetIndex,它的类图如下:
    OffsetIndex类图

    主要的方法有:

    • append方法,添加一对offset和position到index文件中,这里的offset将会被转成相对的offset。
    • lookup, 用二分查找的方式去查找小于或等于给定offset的最大的那个offset

    小结

    我们以几张图来总结一下Message是如何在Kafka中存储的,以及如何查找指定offset的Message的。

    Message是按照topic来组织,每个topic可以分成多个的partition,比如:有5个partition的名为为page_visits的topic的目录结构为:
    topic_partition

    partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件:
    partition
    可以看到,这个partition有4个LogSegment。

    展示是如何查找Message的。
    search
    比如:要查找绝对offset为7的Message:

    1. 首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。
    2. 打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。
    3. 打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。

    这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。

    一句话,Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。

    展开全文
  • Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message
  • 业务更新数据写到数据库中 业务更新数据需要实时传递给下游依赖处理 所以传统的处理架构可能会这样: 但这个架构也存在着不少弊端:我们需要在项目中维护很多发送消息的代码。新增或者更新消息都会带来不少维护...
  • 一.Kafka的逻辑架构   ... 当一个Topic中消息... 是为了对大量的数据进行分而治之,把数据分区,不同的Consumer可以消费不同分区的数据,不同Consumer对数据的消费可以做成并行的,这样可以加快数据处理的速
  • kafka架构详解图

    2020-03-19 08:28:39
    生产到消费 备份副本1 备份副本2 leader follwer 第二个生产到消费 元数据信息,节点信息等记录到zk中
  • Kafka官方文档笔记

    2019-08-12 21:51:23
    Kafka现在不仅仅可以当做消息队列去使用,也可以进行流处理计算的平台。 其主要功能如下: 发布和订阅:将消息流作为一个发布订阅的信息传递系统。 处理:数据流可以高效且实时响应事件 存储:将数据流安全的...
  • KAFKA目录结构

    2018-08-20 17:26:27
     找到kafka的安装目录 find / -name kafka* -type d &nbsp; [root@DockerHostconfig]# cd /wls/kafka_2.11-0.8.2.1/config [root@DockerHostconfig]# ll total28 -rw-...
  • Kafka存储结构示意图

    2020-04-13 22:21:05
  • 但订阅数据库过于死板,当源表数据结构发生变化时管道就要重新写,不够灵活。这一篇我们来看下数据通过kafka同步到mysql是如何配置的。 kafka origin的安装就不介绍了,直接在Package Manager里点击安装就可以了。...
  • 起因 由于需要做各种数据库摆渡到kafka的组件研究。 其中clickhouse和kafka间的数据摆渡,根据官方给出的kafka引擎文档,便有了我这篇实践记录。 这边对数据库和kafka不再累述。 一、版本特性
  • 总结一下用golang 写的服务中接入kafka消息队列的关键和有用链接: golang 有两个主流接入kafka的lib, sarama和confluent-kafka-go, 在编译运行时均需要用到gcc。构建镜像需要加入apk add build-base 关于consumer,...
  • Kafka数据存储

    2017-06-27 15:17:08
    kafka数据的存储方式;kafka如何通过offset查找message。 1.前言 写介绍kafka的几个重要概念(可以参考之前的博文Kafka的简单介绍): Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以...
  • kafka的offset值一直不变,可以往里面写数据 会是什么原因呢 手动改变offset的值是可以消费数据
  • kafka查找数据的过程

    2019-12-30 22:56:49
    一.kafka查找数据: kafka里面一个topic有多个partition组成,每一个partition里面有多个segment组成,每个segment都由两部分组成,分别是.log文件....index文件:索引文件,使用索引文件,加快kafka数据的查找速...
  • 通过本模块的学习,能够掌握Kafka的负载均衡、Producer生产数据Kafka文件存储机制、Kafka自定义partition 课程大纲: 1、 Kafka整体结构图 2、 Consumer与topic关系 3、 Kafka Producer消息分发
  • kafka存储结构

    2018-04-23 18:32:10
    topic 主题,特指kafka处理的消息源的不同分类 partition topic物理上的分组,一个topic可以分为多个partition message 消息,是通信的基本单位 存储结构 topic结构 topic包含多个partition, topic只是...
1 2 3 4 5 ... 20
收藏数 42,599
精华内容 17,039
关键字:

kafka 数据结构