精华内容
下载资源
问答
  • 然而由于技术保密安全等原因,信息化不是无纸化,纸质文件依然存在,造成需要分发交换的文件和文件的电子信息是脱节的,经常出现文件的电子信息已经发到领导的电脑上而纸质文件还没有到达,或者领导已经在纸质文件上...
  • 电信设备-一种通信工程光纤交换箱.zip
  • 行业文档-设计装置-一种交换箱投口识读装置
  • 行业分类-嵌入式设备-一种公文交换箱嵌入式计数结构装置.zip
  • BCH刀箱交换模块Vlan配置,值得下载,大家下
  • 网络游戏-网络交换板和机箱.zip
  • 电信设备-集成信息交换功能的灯箱.zip
  • YUY-1121现代交换技术实验 产品特点 1模块化设计以用户交换机为模型功能划分清晰开放接口丰富 提供四路电话用户接口其中两路设计为可拔插模块 集成了三种交换方式人工交换空分交换时分交换均设计为可拔插模块 提供...
  • 动态链接函数工具,包含各类已经写好的函数。
  • 行业分类-物理装置-一种新型现代交换技术实验.zip
  • 电信设备-一种智能回收与商业信息交换系统.zip
  • 行业分类-外包设计-可实现气体交换的塑料包装.zip
  • 电信设备-带有移动式控制的空气优化交换系统.zip
  • 行业分类-机械工程-船机用热交换器主体的膨胀装置.zip
  • 行业分类-外包设计-家具包装封机主从站数据交换通讯控制系统结构.zip
  • 行业分类-外包设计-可实现气体交换的塑料包装的介绍分析.rar
  • 行业分类-设备装置-含有燃料添加剂的内限时释放离子交换树脂.zip
  • 发件模式描述了一种让服务以安全和一致的方式执行这两项任务的方法; 它为源服务提供即时“读取你自己的写入”语义,同时提供跨服务边界的可靠,最终一致的数据交换。 如果你已经构建了几个微服务,你可能会同意最...

    作为其业务逻辑的一部分,微服务通常不仅需要更新自己的本地数据存储,而且还需要向其他服务通知发生的数据更改。发件箱模式描述了一种让服务以安全和一致的方式执行这两项任务的方法; 它为源服务提供即时“读取你自己的写入”语义,同时提供跨服务边界的可靠,最终一致的数据交换。

    如果你已经构建了几个微服务,你可能会同意最困难的部分是数据:微服务不是孤立存在的,而且往往需要在彼此之间传播数据和数据变化。

    例如,考虑一个管理采购订单的微服务:当下新订单时,可能必须将有关该订单的信息转发给货运服务(因此它可以组装一个或多个订单的货件)和客户服务(因此它可以根据新订单更新客户的总贷方余额等事项。

    有不同的方法让订单服务了解其他两个关于新采购订单的方法; 例如,它可以调用这些服务提供的一些REST,grpc或其他(同步)API。但是,这可能会产生一些不希望的耦合:发送服务必须知道要调用哪些其他服务以及在哪里找到它们。它也必须准备好暂时无法使用这些服务。通过提供请求路由,重试,断路器等功能,Istio等服务网络可以在这里提供帮助。

    任何同步方法的一般问题是,如果没有它调用的其他服务,一个服务就无法真正运行。虽然缓冲和重试可能有助于仅需要通知某些事件的其他服务,但如果服务实际上需要查询其他服务以获取信息,则情况并非如此。例如,当下订单时,订单服务可能需要从库存服务获得所购买商品库存的次数。

    这种同步方法的另一个缺点是它缺乏可重玩性,即新消费者在事件发送后到达并且仍然能够从头开始消费整个事件流的可能性。

    这两个问题都可以通过使用异步数据交换方法来解决:即让订单,库存和其他服务通过持久的消息日志(如Apache Kafka)传播事件。通过订阅这些事件流,将通知每个服务有关其他服务的数据更改。它可以对这些事件做出反应,并且如果需要,可以使用针对其自身需求定制的表示在其自己的数据存储中创建该数据的本地表示。例如,这种视图可以被非规范化以有效地支持特定的访问模式,或者它可以仅包含与消费服务相关的原始数据的子集。

    持久日志还支持可重复播放性,即可以根据需要添加新的消费者,从而实现你最初可能没有想到的用例,而无需触及源服务。例如,考虑一个数据仓库,该数据仓库应保留有关所有订单的信息,或基于Elasticsearch的采购订单上的一些全文搜索功能。一旦采购订单事件出现在Kafka主题中(Kafka主题的保留策略设置可用于确保事件保留在主题中,只要它对于给定的用例和业务要求是必需的),新的消费者可以订阅,处理主题从一开始就实现了微服务数据库,搜索索引,数据仓库等所有数据的视图。

    处理主题增长

    根据数据量(记录的数量和大小,变化的频率),将事件长时间或甚至无限期地保留在主题中可能是也可能不可行。通常,在给定时间点之后,与给定数据项(例如,特定购买订单)有关的一些或甚至所有事件可能有资格从商业角度删除。请参阅下面的“从Kafka主题中删除事件”框,了解有关从Kafka主题中删除事件的更多想法,以便将其大小保持在范围内

    双重写的问题

    微服务为了提供它们的业务功能,微服务通常具有它们自己的本地数据存储。例如,订单服务可以使用关系数据库来持久保存关于采购订单的信息。放置新订单时,这可能会导致服务数据库INSERT中的表PurchaseOrder中的操作。同时,服务可能希望向Apache Kafka发送有关新订单的事件,以便将该信息传播给其他感兴趣的服务。

    但是,简单地发出这两个请求可能会导致潜在的不一致。原因是我们不能拥有一个跨越服务数据库和Apache Kafka的共享事务,因为后者不支持在分布式(XA)事务中加入。因此,在不幸的情况下,我们最终可能会在本地数据库中保留新的采购订单,但没有将相应的消息发送给Kafka(例如由于某些网络问题)。或者,反过来说,我们可能已将消息发送给Kafka但未能在本地数据库中保留采购订单。两种情况都是不可取的 这可能导致无法为看似成功下单的订单创建货件。或者货物被创建,

    那么如何避免这种情况呢?答案是只修改两个资源(数据库或Apache的卡夫卡)中一个,然后以最终一致的方式驱动第二个的更新。

    让我们首先考虑只写入Apache Kafka的情况。

    当收到新的采购订单时,订单服务不会INSERT同步进入其数据库; 相反,它只会向Kafka主题发送描述新订单的事件。因此,一次只能修改一个资源,如果出现问题,我们会立即发现它并向订单服务的调用方报告请求失败。

    同时,服务本身将订阅该Kafka主题。这样,当新消息到达主题时它将被通知,并且它可以在其数据库中保留新的采购订单。

    但是,这里有一个微妙的挑战,那就是缺乏“读你自己的写入数据”语义:例如,我们假设订单服务还有一个API,用于搜索给定客户的所有采购订单。在放置新订单后立即调用该API时,由于处理来自Kafka主题的消息的异步性质,可能会发生采购订单尚未保留在服务的数据库中,因此该查询不会返回该订单。这可能导致非常混乱的用户体验,因为用户例如可能错过他们的购物历史中新放置的订单。

    有办法处理这种情况,例如,服务可以将新放置的采购订单保留在内存中并基于此数据响应后续查询。尽管在实现更复杂的查询或考虑订单服务可能还包括群集设置中的多个节点时,就需要在群集内传播该数据。

    现在,我们看看另外一个方式,只是同步写入数据库并基于此驱动向Apache Kafka导出消息的情况会怎样?这是发件箱模式的用武之地。

    发件箱模式

    这种方法的想法是在服务的数据库中有一个“发件箱”表。当接收到下订单的请求时,不仅INSERT进入PurchaseOrder表中,而且,作为同一事务的一部分,还将表示要发送的事件的记录插入该发件箱表中。

    该记录描述了服务中发生的事件,例如它可能是一个JSON结构,表示已经放置了新的采购订单,包括订单本身的数据,订单行以及上下文信息(如使用情况)案例标识符。通过通过发件箱表中的记录显式地发出事件,可以确保以适合外部消费者的方式构造事件。这也有助于确保事件使用者在例如更改内部域模型或PurchaseOrder表时不会中断。

    异步进程监视该表以查找新条目。如果有,它会将事件作为消息传播到Apache Kafka。这为我们提供了非常好的特性平衡:通过同步写入PurchaseOrder表,源服务受益于“读取你自己的写入”语义。一旦提交了第一个交易,后续的采购订单查询将返回新的持久订单。与此同时,我们通过Apache Kafka获得可靠,异步,最终一致的数据传播到其他服务。

    现在,发件箱模式实际上并不是一个新想法。它已经使用了相当长的一段时间。实际上,即使使用实际上可以参与分布式事务的JMS样式的消息代理,也可以避免任何耦合以及远程资源(如消息代理)的停机时间的潜在影响。你还可以在Chris Richardson优秀的microservices.io网站上找到该模式的描述。

    然而,该模式得到的关注远远少于它应得的,并且在微服务环境中尤其有用。正如我们所看到的,可以使用变更数据捕获和Debezium以非常优雅和有效的方式实现发件箱模式。在下面,让我们探讨如何。

    基于变更数据捕获的实现

    基于日志的变更数据捕获(CDC)非常适合捕获发件箱表中的新条目并将其流式传输到Apache Kafka。与任何基于轮询的方法相反,事件捕获在近实时中以非常低的开销发生。Debezium附带了几个数据库的CDC连接器,如MySQL,Postgres和SQL Server。以下示例将使用Postberes的Debezium连接器。

    你可以在GitHub上找到该示例的完整源代码。有关构建和运行示例代码的详细信息,请参阅README.md。该示例以两个微服务,订单服务和发货服务为中心。两者都是用Java实现的,使用CDI作为组件模型,使用JPA / Hibernate访问各自的数据库。订单服务在WildFly上运行,并公开一个简单的REST API,用于下订单和取消特定订单行。它使用Postgres数据库作为其本地数据存储。装运服务基于Thorntail; 通过Apache Kafka,它接收订单服务导出的事件,并在自己的MySQL数据库中创建相应的货件条目。Debezium对订单服务的Postgres数据库的事务日志(“预写日志”,WAL)进行了定制,以便捕获发件箱表中的任何新事件并将它们传播到Apache Kafka。

    解决方案的整体架构如下图所示:
    在这里插入图片描述

    请注意,该模式与这些特定的实现选择无关。使用Spring Boot(例如利用Spring Data 对域事件的支持),普通JDBC或除Java之外的其他编程语言等替代技术同样可以实现。

    现在让我们仔细看看解决方案的一些相关组件。

    发件箱表

    该outbox表位于订单服务的数据库中,具有以下结构:

    Column     |          Type   | Modifiers
    --------------+------------------------+-----------
    id        | uuid             | not null
    aggregatetype | character varying(255) | not null
    aggregateid  | character varying(255) | not null
    type       | character varying(255) | not null
    payload     | jsonb            | not null
    

    它的列是这些:

    • id:每条消息的唯一ID; 消费者可以使用它来检测任何重复事件,例如在故障后重新启动以读取消息时。在创建新事件时生成。
    • aggregatetype:与给定事件相关的聚合根的类型; 理念是,依赖于领域驱动设计的相同概念,导出事件应该引用聚合(“可以被视为单个单元的域对象集群”),其中聚合根提供唯一的入口点用于访问聚合中的任何实体。例如,这可以是“采购订单”或“客户”。
      此值将用于将事件路由到Kafka中的相应主题,因此会有与采购订单相关的所有事件的主题,所有与客户相关的事件的一个主题等。请注意,还包含与子实体相关的事件一个这样的聚合应该使用相同的类型。因此,例如,表示取消单个订单行(它是采购订单汇总的一部分)的事件也应该使用其聚合根的类型“订单”,以确保此事件也将进入“订单”Kafka主题。
    • aggregateid:受给定事件影响的聚合根的id; 例如,这可以是采购订单的ID或客户ID;与聚合类型类似,与聚合中包含的子实体相关的事件应使用包含聚合根的id,例如订单行取消事件的采购订单ID。此ID将在以后用作Kafka消息的密钥。这样,与一个聚合根或其任何包含的子实体相关的所有事件都将进入该Kafka主题的同一分区,这将确保该主题的使用者将消耗与该主题中的同一聚合相关的所有事件。生产时的确切顺序。
    • type:事件类型,例如“订单已创建”或“订单行已取消”。允许消费者触发合适的事件处理程序。
    • payload:具有实际事件内容的JSON结构,例如包含采购订单,有关购买者的信息,包含的订单行,其价格等。

    将事件发送到发件箱

    为了“发送”事件到发件箱,订单服务中的代码通常只能INSERT进入发件箱表。但是,最好采用稍微抽象的API,如果需要,可以在以后更轻松地调整发件箱的实现细节。CDI活动非常方便。它们可以在应用程序代码中引发,并由发件箱事件发送者同步处理,它将INSERT在发件箱表中执行所需操作。

    所有发件箱事件类型都应实现以下合同,类似于之前显示的发件箱表的结构:

    public interface ExportedEvent {
    
        String getAggregateId();
        String getAggregateType();
        JsonNode getPayload();
        String getType();
    }
    

    为了产生这样的事件,应用程序代码使用注入的Event实例,例如在OrderService类中:

    @ApplicationScoped
    public class OrderService {
    
        @PersistenceContext
        private EntityManager entityManager;
    
        @Inject
        private Event<ExportedEvent> event;
    
        @Transactional
        public PurchaseOrder addOrder(PurchaseOrder order) {
            order = entityManager.merge(order);
    
            event.fire(OrderCreatedEvent.of(order));
            event.fire(InvoiceCreatedEvent.of(order));
    
            return order;
        }
    
        @Transactional
        public PurchaseOrder updateOrderLine(long orderId, long orderLineId,
                OrderLineStatus newStatus) {
            // ...
        }
    }
    

    在该addOrder()方法中,JPA实体管理器用于在数据库中保留传入的订单,并且注入event用于触发相应的OrderCreatedEvent和InvoiceCreatedEvent。同样,请记住,尽管存在“事件”的概念,但这两件事情发生在同一个事务中。即在此交易中,将在数据库中插入三条记录:一张在带有采购订单的表中,另一张在发件箱表中。

    实际的事件实现是直截了当的; 例如,这是OrderCreatedEvent类:

    public class OrderCreatedEvent implements ExportedEvent {
    
        private static ObjectMapper mapper = new ObjectMapper();
    
        private final long id;
        private final JsonNode order;
    
        private OrderCreatedEvent(long id, JsonNode order) {
            this.id = id;
            this.order = order;
        }
    
        public static OrderCreatedEvent of(PurchaseOrder order) {
            ObjectNode asJson = mapper.createObjectNode()
                    .put("id", order.getId())
                    .put("customerId", order.getCustomerId())
                    .put("orderDate", order.getOrderDate().toString());
    
            ArrayNode items = asJson.putArray("lineItems");
    
            for (OrderLine orderLine : order.getLineItems()) {
            items.add(
                    mapper.createObjectNode()
                    .put("id", orderLine.getId())
                    .put("item", orderLine.getItem())
                    .put("quantity", orderLine.getQuantity())
                    .put("totalPrice", orderLine.getTotalPrice())
                    .put("status", orderLine.getStatus().name())
                );
            }
    
            return new OrderCreatedEvent(order.getId(), asJson);
        }
    
        @Override
        public String getAggregateId() {
            return String.valueOf(id);
        }
    
        @Override
        public String getAggregateType() {
            return "Order";
        }
    
        @Override
        public String getType() {
            return "OrderCreated";
        }
    
        @Override
        public JsonNode getPayload() {
            return order;
        }
    }
    

    请注意Jackson 如何ObjectMapper用于创建事件有效负载的JSON表示。

    现在让我们看看消耗任何被激活的代码ExportedEvent并对outbox表进行相应的写操作:

    @ApplicationScoped
    public class EventSender {
    
        @PersistenceContext
        private EntityManager entityManager;
    
        public void onExportedEvent(@Observes ExportedEvent event) {
            OutboxEvent outboxEvent = new OutboxEvent(
                    event.getAggregateType(),
                    event.getAggregateId(),
                    event.getType(),
                    event.getPayload()
            );
    
            entityManager.persist(outboxEvent);
            entityManager.remove(outboxEvent);
        }
    }
    

    它相当简单:对于每个事件,CDI运行时将调用该onExportedEvent()方法。OutboxEvent实体的一个实例持久存储在数据库中 - 并立即删除!

    起初这可能会令人惊讶。但是,在记住基于日志的CDC如何工作时,它是有意义的:它不会检查数据库中表的实际内容,而是会关闭仅附加事务日志。一旦事务提交,调用persist()并将在日志中remove()创建一个INSERT和一个DELETE条目。之后,Debezium将处理这些事件:对于任何事件INSERT,具有事件有效负载的消息将被发送到Apache Kafka。DELETE另一方面,事件可以被忽略,因为从发件箱表中删除仅仅是技术性,不需要任何传播到消息代理。因此,我们可以通过CDC捕获添加到发件箱表中的事件,但是当查看表本身的内容时,它将始终为空。这意味着表格不需要额外的磁盘空间(除了将在某个时刻自动丢弃的日志文件元素),也不需要单独的管理过程来阻止它无限增长。

    注册Debezium连接器

    有了outbox实现,就可以注册Debezium Postgres连接器了,这样它就可以捕获发件箱表中的任何新事件并将它们转发给Apache Kafka。这可以通过将以下JSON请求发布到Kafka Connect的REST API来完成:

    {
        "name": "outbox-connector",
        "config": {
            "connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max" : "1",
            "database.hostname" : "order-db",
            "database.port" : "5432",
            "database.user" : "postgresuser",
            "database.password" : "postgrespw",
            "database.dbname" : "orderdb",
            "database.server.name" : "dbserver1",
            "schema.whitelist" : "inventory",
            "table.whitelist" : "inventory.outboxevent",
            "tombstones.on.delete" : "false",
            "transforms" : "router",
            "transforms.router.type" : "io.debezium.examples.outbox.routingsmt.EventRouter"
        }
    }
    

    这将设置一个实例io.debezium.connector.postgresql.PostgresConnector,从指定的Postgres实例捕获更改。请注意,通过表白名单,仅outboxevent捕获表中的更改。它还应用了名为的单个消息转换(SMT)EventRouter。

    删除Kafka主题中的事件

    通过设置tombstones.on.deleteto false,当从发件箱表中删除事件记录时,连接器将不会发出删除标记(“tombstones”)。这是有道理的,因为从发件箱表中删除不应影响相应Kafka主题中事件的保留。相反,可以在Kafka中配置事件主题的特定保留时间,例如,将所有采购订单事件保留30天。

    或者,可以使用压缩的主题。这需要对发件箱表中的事件设计进行一些更改:

    他们必须描述整个集合; 因此,例如,表示取消单个订单行的事件也应描述包含采购订单的完整当前状态; 这样,在日志压缩运行之后,当只看到与给定订单有关的最后一个事件时,消费者也能够获得采购订单的整个状态。

    它们必须还有一个boolean属性,指示特定事件是否表示删除事件的聚合根。OrderDeleted然后,可以由下一节中描述的事件路由SMT使用这样的事件(例如类型)来为该聚合根生成删除标记。然后,当OrderDeleted事件已写入主题时,日志压缩将删除与给定采购订单相关的所有事件。

    当然,在删除事件时,事件流将不再从一开始就可以重新播放。根据具体的业务需求,仅保留给定采购订单,客户等的最终状态可能就足够了。这可以通过使用压缩的主题和主题delete.retention.ms设置的足够值来实现。另一个选择可能是将历史事件移动到某种冷存储(例如Amazon S3存储桶),如果需要可以从中检索它们,然后从Kafka主题中读取最新事件。采用哪种方法取决于开发和运行解决方案的团队的具体要求,预期数据量和专业知识。

    主题路由

    默认情况下,Debezium连接器会将源自一个给定表的所有更改事件发送到同一主题,即我们最终会得到一个名为Kafka的主题dbserver1.inventory.outboxevent,该主题将包含所有事件,包括订单事件,客户事件等。

    为了简化仅对特定事件类型感兴趣的消费者的实现,更有意义的是,具有多个主题,例如OrderEvents,CustomerEvents等等。例如,装运服务可能对任何客户事件不感兴趣。通过仅订阅该OrderEvents主题,它将确保永远不会收到任何客户事件。

    为了将从发件箱表捕获的更改事件路由到不同的主题,使用该自定义SMT EventRouter。以下是其apply()方法的代码,Kafka Connect将为Debezium连接器发出的每条记录调用它:

    @Override
    public R apply(R record) {
        // Ignoring tombstones just in case
        if (record.value() == null) {
            return record;
        }
    
        Struct struct = (Struct) record.value();
        String op = struct.getString("op");
    
        // ignoring deletions in the outbox table
        if (op.equals("d")) {
            return null;
        }
        else if (op.equals("c")) {
            Long timestamp = struct.getInt64("ts_ms");
            Struct after = struct.getStruct("after");
    
            String key = after.getString("aggregateid");
            String topic = after.getString("aggregatetype") + "Events";
    
            String eventId = after.getString("id");
            String eventType = after.getString("type");
            String payload = after.getString("payload");
    
            Schema valueSchema = SchemaBuilder.struct()
                .field("eventType", after.schema().field("type").schema())
                .field("ts_ms", struct.schema().field("ts_ms").schema())
                .field("payload", after.schema().field("payload").schema())
                .build();
    
            Struct value = new Struct(valueSchema)
                .put("eventType", eventType)
                .put("ts_ms", timestamp)
                .put("payload", payload);
    
            Headers headers = record.headers();
            headers.addString("eventId", eventId);
    
            return record.newRecord(topic, null, Schema.STRING_SCHEMA, key, valueSchema, value,
                    record.timestamp(), headers);
        }
        // not expecting update events, as the outbox table is "append only",
        // i.e. event records will never be updated
        else {
            throw new IllegalArgumentException("Record of unexpected op type: " + record);
        }
    }
    

    当收到删除事件(op= d)时,它将丢弃该事件,因为从发件箱表中删除事件记录与下游消费者无关。收到创建事件(op= c)时,事情变得更有趣。这样的记录将传播到Apache Kafka。

    Debezium的更改事件具有复杂的结构,包含所表示行的old(before)和new(after)状态。要传播的事件结构是从after状态获得的。在aggregatetype从捕获的事件记录值被用来构建主题的名称将事件发送到。例如,aggregatetype设置为的事件Order将发送到OrderEvents主题。aggregateid用作消息密钥,确保该聚合的所有消息都将进入该主题的同一分区。消息值是包含原始事件有效负载(编码为JSON)的结构,指示事件何时生成的时间戳和事件类型。最后,事件UUID作为Kafka头字段传播。这允许消费者进行有效的重复检测,而不必检查实际的消息内容。

    Apache Kafka中的事件

    现在让我们来看看OrderEvents和CustomerEvents主题。

    如果已经检查了示例源并通过Docker Compose启动了所有组件(请参阅示例项目中的README.md文件以获取更多详细信息),还可以通过订单服务的REST API下载采购订单,如下所示:

    cat resources/data/create-order-request.json | http POST
    http://localhost:8080/order-service/rest/orders

    同样,可以取消特定的订单行:

    cat resources/data/cancel-order-line-request.json | http PUT
    http://localhost:8080/order-service/rest/orders/1/lines/2

    当使用诸如非常实用的kafkacat实用程序之类的工具时,你现在应该在OrderEvents主题中看到类似这样的消息:

    kafkacat -b kafka:9092 -C -o beginning -f ‘Headers: %h\nKey: %k\nValue: %s\n’ -q -t OrderEvents

    Headers: eventId=d03dfb18-8af8-464d-890b-09eb8b2dbbdd
    Key: "4"
    Value: {"eventType":"OrderCreated","ts_ms":1550307598558,"payload":"{\"id\": 4, \"lineItems\": [{\"id\": 7, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 8, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \?-01-31T12:13:01\", \"customerId\": 123}"}
    Headers: eventId=49f89ea0-b344-421f-b66f-c635d212f72c
    Key: "4"
    Value: {"eventType":"OrderLineUpdated","ts_ms":1550308226963,"payload":"{\"orderId\": 4, \"newStatus\": \"CANCELLED\", \"oldStatus\": \"ENTERED\", \"orderLineId\": 7}"}
    

    payload具有消息值的字段是原始事件的字符串ified JSON表示。Debezium Postgres连接器将JSONB列作为字符串发出(使用io.debezium.data.Json逻辑类型名称),这就是引号被转义的原因。JQ用处,更具体地说,它是fromjson操作者,用于显示在一个更可读的方式事件负载:

    kafkacat -b kafka:9092 -C -o beginning -t Order | jq '.payload | fromjson'
    
    {
      "id": 4,
      "lineItems": [
        {
          "id": 7,
          "item": "Debezium in Action",
          "status": "ENTERED",
          "quantity": 2,
          "totalPrice": 39.98
        },
        {
          "id": 8,
          "item": "Debezium for Dummies",
          "status": "ENTERED",
          "quantity": 1,
          "totalPrice": 29.99
        }
      ],
      "orderDate": "2019-01-31T12:13:01",
      "customerId": 123
    }
    {
      "orderId": 4,
      "newStatus": "CANCELLED",
      "oldStatus": "ENTERED",
      "orderLineId": 7
    }
    

    还可以查看CustomerEvents主题,以便在添加采购订单时检查表示创建发票的事件。

    消费服务中的重复检测

    此时,我们实现的发件箱模式功能齐全; 当订单服务收到下订单(或取消订单行)的请求时,它将在其数据库的purchaseorder和orderline表中保持相应的状态。同时,在同一事务中,相应的事件条目将添加到同一数据库中的发件箱表中。Debezium Postgres连接器捕获对该表的任何插入,并将事件路由到与给定事件所代表的聚合类型相对应的Kafka主题。

    为了总结,让我们探讨另一种微服务(例如货运服务)如何使用这些消息。该服务的切入点是常规的Kafka消费者实现,这不是太令人兴奋,因此为了简洁起见在此省略。你可以在示例存储库中找到其源代码。对于Order主题上的每个传入消息,消费者调用OrderEventHandler:

    @ApplicationScoped
    public class OrderEventHandler {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(OrderEventHandler.class);
    
        @Inject
        private MessageLog log;
    
        @Inject
        private ShipmentService shipmentService;
    
        @Transactional
        public void onOrderEvent(UUID eventId, String key, String event) {
            if (log.alreadyProcessed(eventId)) {
                LOGGER.info("Event with UUID {} was already retrieved, ignoring it", eventId);
                return;
            }
    
            JsonObject json = Json.createReader(new StringReader(event)).readObject();
            JsonObject payload = json.containsKey("schema") ? json.getJsonObject("payload") :json;
    
            String eventType = payload.getString("eventType");
            Long ts = payload.getJsonNumber("ts_ms").longValue();
            String eventPayload = payload.getString("payload");
    
            JsonReader payloadReader = Json.createReader(new StringReader(eventPayload));
            JsonObject payloadObject = payloadReader.readObject();
    
            if (eventType.equals("OrderCreated")) {
                shipmentService.orderCreated(payloadObject);
            }
            else if (eventType.equals("OrderLineUpdated")) {
                shipmentService.orderLineUpdated(payloadObject);
            }
            else {
                LOGGER.warn("Unkown event type");
            }
    
            log.processed(eventId);
        }
    }
    

    完成的第一件事onOrderEvent()是检查之前是否已处理具有给定UUID的事件。如果是这样,将忽略对该同一事件的任何进一步调用。这是为了防止由此数据管道的“至少一次”语义引起的任何重复事件处理。例如,在确认分别使用源数据库或消息传递代理检索特定事件之前,可能会发生Debezium连接器或使用服务失败。在这种情况下,在重新启动Debezium或消费服务之后,可能会再次处理一些事件。将事件UUID传播为Kafka消息头允许有效地检测和排除消费者中的重复。

    如果第一次收到消息,则解析消息值,并ShippingService使用事件有效负载调用与特定事件类型对应的方法的业务方法。最后,消息被标记为使用消息日志处理。

    这MessageLog只是跟踪服务的本地数据库中表中所有消耗的事件:

    @ApplicationScoped
    public class MessageLog {
    
        @PersistenceContext
        private EntityManager entityManager;
    
        @Transactional(value=TxType.MANDATORY)
        public void processed(UUID eventId) {
            entityManager.persist(new ConsumedMessage(eventId, Instant.now()));
        }
    
        @Transactional(value=TxType.MANDATORY)
        public boolean alreadyProcessed(UUID eventId) {
            return entityManager.find(ConsumedMessage.class, eventId) != null;
        }
    }
    

    这样,如果由于某种原因回滚事务,原始消息也不会被标记为已处理,并且异常将冒泡到Kafka事件消费者循环。这允许稍后重新尝试处理该消息。

    请注意,在将任何不可处理的消息重新路由到死信队列或类似消息之前,更完整的实现应该只负责重试给定消息一定次数。消息日志表上也应该有一些内容; 周期性地,可以删除早于消费者与代理提交的当前偏移的所有事件,因为它确保这些消息不会再次传播给消费者。

    总结

    发件箱模式是在不同微服务之间传播数据的好方法。

    通过仅修改单个资源(源服务自己的数据库),它避免了在不共享一个公共事务上下文(数据库和Apache Kafka)的情况下同时更改多个资源的任何潜在不一致。通过首先写入数据库,源服务立即“读取你自己的写入”语义,这对于一致的用户体验很重要,允许在写入后调用的查询方法立即反映任何数据更改。

    同时,该模式使异步事件传播到其他微服务。Apache Kafka是服务之间消息传递的高度可扩展和可靠的主干。给定正确的主题保留设置,新的消费者可能在最初生成事件后很长时间内出现,并根据事件历史建立自己的本地状态。

    将Apache Kafka置于整体架构的中心也可确保所涉及服务的分离。例如,如果解决方案的单个组件失效或在一段时间内不可用,例如在更新期间,事件将在稍后处理:在重新启动之后,Debezium连接器将继续从它离开的位置拖出发件箱表。之前。同样,任何消费者都将继续处理其先前偏移的主题。通过跟踪已经成功处理的消息,可以检测重复项并从重复处理中排除重复项。

    当然,不同服务之间的这种事件管道最终是一致的,即诸如运输服务之类的消费者可能落后于诸如订单服务之类的生产者。通常,这很好,并且可以根据应用程序的业务逻辑进行处理。例如,通常不需要在下订单的同一秒内创建货件。此外,整体解决方案的端到端延迟通常很低(几秒甚至亚秒范围),这要归功于基于日志的变更数据捕获,它允许近实时发送事件。

    要记住的最后一件事是,通过发件箱公开的事件的结构应该被视为发射服务的API的一部分。即在需要时,应仔细调整其结构并考虑兼容性因素。这是为了确保在升级生产服务时不会意外破坏任何消费者。同时,消费者在处理消息时应该宽容,例如在遇到接收事件中的未知属性时不会失败。

    关于Debezium

    Debezium是一个开源分布式平台,可将现有数据库转换为事件流,因此应用程序几乎可以立即查看和响应数据库中每个已提交的行级更改。Debezium构建于Kafka之上,提供Kafka Connect兼容连接器,可监控特定的数据库管理系统。Debezium记录了Kafka日志中数据更改的历史记录,因此你的应用程序可以随时停止和重新启动,并且可以轻松地使用它在未运行时丢失的所有事件,从而确保正确且完整地处理所有事件。Debezium是开源的下Apache许可证,版本2.0。

    获取更多Java高级架构最新视频,
    直接点击链接加群。https://jq.qq.com/?_wv=1027&k=5lXBNZ7

    展开全文
  • 变速用热交换器行业(2021-2026)企业市场突围战略分析与建议.docx
  • 汽车工业动力传动系行业变速用热交换器领域分析报告(研究报告).pdf
  • 行业分类-设备装置-基于物联网技术的集装物流跟踪全球网络交换服务平台.zip
  • 有用的(自定义和文件交换)脚本,工具等的集合。 包括许可证,自述文件以及从其他Internet来源获得的文件的任何其他信息。 自定义脚本 在实验室编写的功能和脚本,或者从文件交换下载的功能和脚本(可能会或可能...
  • 变速用热交换器公司(行业)薪酬管理制度方案-薪酬设计方案资料文集系列.docx
  • 行业资料-电子功用-10kV一体化柱上变台低压配电交换通道.pdf.zip
  • 2020年汽车工业动力传动系行业变速用热交换器领域行业分析报告(市场调查报告).pdf
  • 现代交换原理实验配套软件,用于程序控制交换的试验
  • 上海求育QY-JXSY24程控综合实验交换网络单元设计成插拔式模块,分为用户模块、信令信号产生模块、呼叫接续的记发器模块、信令显示与控制模块、路由交换控制模块等。交换方式的实验从人工交换、空分交换、数字时分...

    上海求育QY-JXSY24程控综合实验箱
    上海求育QY-JXSY24程控综合实验箱将交换网络单元设计成插拔式模块,分为用户模块、信令信号产生模块、呼叫接续的记发器模块、信令显示与控制模块、路由交换控制模块等。交换方式的实验从人工交换、空分交换、数字时分交换由浅入深,有助学生理解交换原理,比较各种交换方式的特点和优劣。学生可在了解空分交换、时分交换芯片原理和控制时序的前提下,编制各种交换软件(不用MS51仿真器,在我们软件的支持下在线调试),实现所需的路由交换。配备了完整的信令分析和电话接续统计软件,有助学生学会实际交换机分析软件的使用。以便学生用不同方式开发实现交换功能,对学生理解交换原理,培养芯片使用能力很有帮助.
    实验项目
    第一章 8643系统模块实验
    实验1 交换系统组成与结构
    实验2 8643电源模块
    实验3 CPU中央系统处理器
    实验4 用户接口模块(主被叫)实验
    实验5 程控交换状态设置
    实验6 信令信号的产生与观测
    实验7 双音多频(DTMF)接收与检测
    实验8 话路PCM CODEC编译码
    实验9 呼叫处理与线路信号的传输过程
    实验10 二/四线变换与回波返损测试
    *实验11 用户终端电话信号电平调整
    第二章 信令交换与信息交换
    实验12 人工及信号交换实验
    实验13 空分交换(MT8816)的过程与分析
    实验14 时分交换(MT8980)的过程与分析
    实验15 市话接口实验
    第三章 中继实验
    实验16 空分中继实验
    实验17 数字时分中继光纤通信实验
    第四章 二次开发实验
    实验18 MT8816空分交换开发与在线调试实验
    实验19 MT8980时分交换开发与在线调试实验
    实验20 市话接口开发实验
    实验21 记发器的电话呼叫接续编程实验
    实验22 液晶汉字与字符显示编程
    实验23 薄膜键盘扫描输入编程
    实验24 新服务功能及编程调试
    第五章 七号信令分折实验
    实验25 七号信令系统与OSI七层的对应关系
    实验26 固话用户呼叫过程信令分折
    实验27 电信业务统计分折
    附 录
    附录1 液晶显示驱动模块HD 44780的指令集
    附录2 数字交换网络芯片MT8980介绍

    展开全文
  • 公文交换与跟踪系统采用条码识别、IC卡身份识别以及智能公文交换箱等技术,实现了机要文件交换及收发过程中的信息自动采集核对、投箱过程中的提示与纠错、交接过程的电子化等,从而提高了机要文件交换的安全性、准确...
  • 交换原理课程实验

    2013-06-23 16:42:59
    3、 电源输入模块:产生整个实验所需要的各种电压的工作电源; 4、 中央处理器:由MSC-51系列的单片机实现,主要实现人机界面的管理; 5、 记发器:由MSC-51系列的单片机实现,实现信令的管理; 6、 话路交换控制...
  • 这个应用程序对于阅读交换收件并向Slack发送每封新电子邮件的直接消息(看不见的消息)很有用。 获取Slack API密钥和客户端机密 Visit https://api.slack.com/web Sign in if you need to Click Get Token This ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 39,794
精华内容 15,917
关键字:

交换箱是什么