KafkaFlow入门指南 构建可扩展的Kafka事件驱动应用 (kafka分区策略有哪些)

KafkaFlow入门指南 构建可扩展的Kafka事件驱动应用 (kafka分区策略有哪些)

在本文中,我们将会探讨 KafkaFlow 提供提供的特性。如果你正在使用.NET 构建 Apache Kafka 生产者和消费者,那么本文将会介绍如何借助 KafkaFlow 来简化你的生活。

为何要关注它?

KafkaFlow 为 Confluent .NET Kafka 客户端提供了一个抽象层。它使得使用、维护和测试 Kafka 消费者和生产者均更加容易。

假设你要为市场营销活动创建一个客户端目录(Client Catalog)。我们需要一项服务来消费那些捕获新客户端的消息。当开始设计所需的服务时,你会发现现有的服务在如何消费消息方面并不一致。

常见的情形是,团队在解决一些简单的问题(如优雅关机)时,往往会陷入困境。你会发现整个组织有四种不同的 JSON 序列化器实现,这只是挑战之一。

采用 KafkaFlow 这样的框架能够简化流程并加快开发周期。KafkaFlow 拥有一系列旨在提升开发人员体验的特性:

KafkaFlow 生产者:简化消息的生成

我们从消息的生产者开始。

向 Kafka 中生成消息并不是什么高难的火箭科学。即便如此,KafkaFlow 还是为 Confluent 的.NET Kafka 客户端的生产者接口提供了更高级别的抽象,从而能够简化代码并提升可维护性。

下面是一个如何使用 KafkaFlow 生产者发送消息的样例:

await _producers["my-topic-events"].ProduceAsync("my-topic", message.Id.ToString(), message);
复制代码

这样,我们就可以向 Kafka 生成消息,而无需直接处理序列化或底层 Kafka 客户端的其他复杂问题。不仅如此,定义和管理生产者还可以通过服务配置上的流畅接口(Fluent Interface)轻松实现。

services.AddKafka(kafka => kafka.AddCluster(cluster => cluster.WithBrokers(new[] { "host:9092" }).AddProducer("product-events",producer =>
复制代码

生产者往往很简单,但也有一些常见的问题需要解决,比如压缩或序列化。我们来探讨一下。

在 KafkaFlow 中自定义序列化/反序列化

在 Apache Kafka 中,一个很有吸引力的特性就是与数据格式无关。但是,这就将责任转移给了生产者和消费者。如果考虑不周全,可能会导致在整个系统中出现由多种方式实现同一种结果的现象。因此,序列化显然是一个由客户端框架处理的用例。

KafkaFlow 具有适用于 JSON、Protobuf 甚至 Avro 的序列化器。只需将它们添加到中间件配置中就可以使用。

.AddProducer<ProductEventsProducer>(producer => producer.AddMiddlewares(middlewares => middleware.AddSerializer<JsonMessageSerializer>()
复制代码

鉴于我们可以为消息使用自定义的序列化器/反序列化器,所以这个列表并不局限于这三种。虽然 Confluent 的.NET Kafka 客户端已经支持自定义序列化/反序列化,但 KafkaFlow 通过提供更优雅的处理方式简化了这一过程。举例来说,要使用自定义序列化器,我们可以这样做:

public class MySerializer : ISerializerpublic Task SerializeAsync(object message, Stream output, ISerializerContext context)// 序列化逻辑在这里public async Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)// 反序列化逻辑在这里// 在设置Kafka消费者/生产者的时候,注册自定义的序列化器.AddProducer<MyProducer>(producer => producer.AddMiddlewares(middlewares => middleware.AddSerializer<MySerializer>()
复制代码

KafkaFlow 中的消息处理

消费者带来了大量的问题和可能性。第一个问题就是“如何处理消息?”

我们从最简单的方式开始。随着像 MediatR 这样的库的出现,CQRS 和 Meditor 模式得到了普及,.NET 开发人员习惯于将消息处理器与请求/消息接收器解耦。KafkaFlow 将同样的原则引入到了 Kafka 消费者中。

KafkaFlow 消息处理器允许开发人员定义特定的逻辑来处理来自 Kafka 主题的消息。按照设计,KafkaFlow 的消息处理器结构能够更好地分离关注点,并使代码更整洁、更易于维护。

如下是一个消息处理器的示例:

public class MyMessageHandler : IMessageHandler<MyMessageType>public Task Handle(IMessageContext context, MyMessageType message)// 消息处理逻辑在这里
复制代码

这个处理器可以在消费者配置中进行注册:

.AddConsumer(consumer => consumer.AddMiddlewares(middlewares => middlewares.AddTypedHandlers(handlers => handlers.AddHandler<MyMessageHandler>()
复制代码

通过这种方式,可以轻松地将消费者和处理器分开,从而提升了可维护性和可测性。如果你的微服务只处理具有一种消息类型的一个主题,这可能会显得引入了不必要的复杂性。在这种情况下,你可以使用中间件。

KafkaFlow 中的中间件

KafkaFlow 是面向中间件的。你可能已经注意到,在消息处理器的代码片段中提到了“中间件”。所以,你可能会问什么是中间件。

中间件使得类型化处理器(Typed Handler)成为可能。消息会被传递到一个中间件管道,该管道将会被依次调用。如果你使用过 MediatR 管道的话,可能会对这一概念有所了解。此外,中间件还可以用来进行一系列的转换。换句话说,给定的中间件可以将传入的消息转换到下一个中间件。

KafkaFlow 中的中间件封装了处理消息的逻辑。管道是可扩展的,允许开发人员在消息处理管道中添加行为。

如下是一个中间件的样例:

public class MyMiddleware : IMessageMiddlewarepublic async Task Invoke(IMessageContext context, MiddlewareDelegate next)// 预处理逻辑位于这里await next(context);// 后处理逻辑位于这里
复制代码

要使用该中间件,可以在消费者配置中进行注册:

.AddConsumer(consumer => consumer.AddMiddlewares(middlewares => middlewares
复制代码

通过这种方式,开发人员就可以在消息处理管道中插入自定义逻辑,从而提供灵活性和控制力。类型化处理器是中间件的一种形式。所以,你甚至可以在没有类型化处理器的情况下处理消息,实现自己的中间件,或者也可以使用中间件来构建消息管道,在处理消息之前执行校验、丰富化等操作。

在 KafkaFlow 中处理并发

一旦开始思考基础设施的效率,你就会发现许多 Kafka 消费者没有得到充分利用。最常见的实现方式是单线程的,这限制了资源的利用率。因此,当我们需要扩展的时候,只能进行横向扩展,以保持所需的吞吐量。

KafkaFlow 为实现基础设施的高效率带来了另外一种可选方案。KafkaFlow 让开发人员可以控制单个消费者可以并发处理多少消息。它使用了 Worker 的理念,这些 Worker 可以协同消费一个主题。这一功能能够让你优化 Kafka 消费者,使其更好地匹配系统的能力。

如下是一个如何为消费者设置并发 worker 数量的样例:

.AddConsumer(consumer => consumer.Topic("topic-name").WithGroupId("sample-group").WithBufferSize(100).WithWorkersCount(10) // 设置worker的数量.AddMiddlewares(middlewares => middlewares
复制代码

即便有并发 worker,KafkaFlow 也能确保顺序。

批处理

随着规模的扩大,你将会面临延迟和吞吐量之间的权衡。为了解决这个问题,KafkaFlow 有一个重要的特性,叫做“批量消费”。这个特性满足了以批量方式消费和处理来自 Kafka 的消息时对效率和性能的要求。在需要一起处理一组消息,而不是单个处理消息的场景下,该特性发挥着重要作用。

什么是批量消费?

在批量消费方式中,系统不是在收到消息后对其进行原子性地处理,而是将多条消息分组,然后一次性地对其进行处理。这种方法在处理大量数据时更为有效,尤其是在消息相互独立的情况下。批量执行操作会提高整体性能。

KafkaFlow 的批量消费方式

KafkaFlow 利用中间件系统提供批量处理功能。批量处理中间件能够让你根据批量大小或时间跨度(timespan)对消息进行分组。一旦达到其中的某个条件,中间件就会将这组消息转发给下一个中间件。

services.AddKafka(kafka => kafka.AddCluster(cluster => cluster.WithBrokers(new[] { "host:9092" }).AddConsumer(consumerBuilder => consumerBuilder.AddMiddlewares(middlewares => middlewares.BatchConsume(100, TimeSpan.FromSeconds(10))
复制代码

批量消费对性能的影响

通过批量处理,开发人员可以在基于 Kafka 的应用程序中实现更高的吞吐量。它可以加快处理速度,因为与启动和完成每个处理任务相关的开销会大大减少。这将全面提升系统的性能。

同时,这种方式还能减少网络 I/O 操作,因为数据是以更大的分块获取的,这能够进一步提高处理速度,尤其是在需要关注网络延迟的系统中。

KafkaFlow 的消费者管理

KafkaFlow 还简化了 Kafka 消费者管理相关的任务。通过 KafkaFlow 的管理 API,我们可以启动、停止、暂停消费者以及倒回偏移(rewind offset)。

管理 API 可以在编程接口、REST API 或 Dashboard UI 中使用。

KafkaFlow 的管理仪表盘

消费者限流

通常,底层技术可能无法像 Kafka 消费者那样以相同的方式应对高负载期。这会带来稳定性的问题,而这正是限流的用武之地。

消费者限流是一种管理消息消费的方式,它能够使应用程序根据指标动态调整消息消费的速度。

优先级

假设你正在运行一个应用程序,希望将原子操作和批量操作分隔到不同的消费者和主题中。与批量操作相比,你可能更愿意优先处理原子操作。按照传统方式,由于消息生成的速度可能存在差异,所以管理这种差异化可能很具挑战性。

在这种情况下,消费者限流就很有价值了,它允许我们监控那些负责原子操作的消费者的滞后(lag)情况。根据这一指标,我们可以对处理批量操作的消费者实施限流,确保优先处理原子操作。

那结果是什么呢?高效、灵活和优化的消费流程。

借助 KafkaFlow 的流畅接口,为消费者添加限流功能是非常简单的。下面是一个简单的样例:

.AddConsumer(consumer => consumer.Topic("bulk-topic").WithName("bulkConsumer").AddMiddlewares(middlewares => middlewares.ThrottleConsumer(.ByOtherConsumersLag("singleConsumer").WithInterval(TimeSpan.FromSeconds(5)).AddAction(a => a.AboveThreshold(10).ApplyDelay(100)).AddAction(a => a.AboveThreshold(100).ApplyDelay(1_000)).AddAction(a => a.AboveThreshold(1_000).ApplyDelay(10_000))).AddSerializer<JsonCoreSerializer>()
复制代码

KafkaFlow:展望未来

目前,KafkaFlow 在 Kafka 的基础上提供了一个健壮的、对开发人员友好的抽象,简化了使用.NET 构建实时数据处理应用程序的过程。但是,与其他活跃的开源项目一样,KafkaFlow 也在不断演进和完善。

从项目目前的发展轨迹来看,我们可以预测几个方面的发展方向。例如,KafkaFlow 可能会进一步增强其中间件系统,为消息处理提供更多的控制权和灵活性。我们可能还会看到更广泛的管理 API,为开发人员提供对 Kafka 集群更大的控制权。

由于设计上的可扩展性,我们可以期待 KafkaFlow 社区会不断壮大,带来更多的贡献、创新特性、扩展和支持。随着越来越多的开发人员和组织采用 KafkaFlow,我们会看到学习资源、教程、案例和其他社区内容不断涌现,这些内容可以帮助新用户入门,也可以帮助现有的用户从库中学习更多的知识。

结论

KafkaFlow 是一个便利、对开发人员友好的工具,它简化了在.NET 中使用 Kafka 的工作。在开发人员体验和可用性方面,它均表现出色。该框架的设计非常适合整洁、可读性强的代码。在 Apache Kafka 上构建应用程序时,KafkaFlow 通过中间件、消息处理器以及对复杂问题的抽象,实现了清晰的分离,这有助于保持代码库的可管理性和可理解性。

除此之外,围绕 KafkaFlow 的社区在不断壮大。如果你正在使用 Kafka 并希望提高生产力和可靠性,那 KafkaFlow 绝对值得考虑。

原文链接:

Building Kafka Event-Driven Applications with KafkaFlow

声明:本文来自用户分享和网络收集,仅供学习与参考,测试请备份。