您当前的位置:首页 > 电脑百科 > 程序开发 > 框架

Kafka不常见但是很高级的功能: Kafka 拦截器

时间:2023-02-03 12:15:58  来源:51CTO  作者:一个即将退役的码农
今天我们花了一些时间讨论 Kafka 提供的冷门功能:拦截器。如之前所说,拦截器的出场率极低,以至于我从未看到过国内大厂实际应用 Kafka 拦截器的报道。但冷门不代表没用。事实上,我们可以利用拦截器满足实际的需求,比如端到端系统性能检测、消息审计等。​

​既然是不常见,那就说明在实际场景中并没有太高的出场率,但它们依然是很高级很实用的。下面就有请今天的主角登场:Kafka 拦截器。

什么是拦截器?

如果你用过 Spring Interceptor 或是 Apache Flume,那么应该不会对拦截器这个概念感到陌生,其基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。下面这张图展示了 Spring MVC 拦截器的工作原理:

图片来源:https://o7planning.org/en/11229/spring-mvc-interceptors-tutorial

拦截器 1 和拦截器 2 分别在请求发送之前、发送之后以及完成之后三个地方插入了对应的处理逻辑。而 Flume 中的拦截器也是同理,它们插入的逻辑可以是修改待发送的消息,也可以是创建新的消息,甚至是丢弃消息。这些功能都是以配置拦截器类的方式动态插入到应用程序中的,故可以快速地切换不同的拦截器而不影响主程序逻辑。

Kafka 拦截器借鉴了这样的设计思路。你可以在消息处理的前后多个时点动态植入不同的处理逻辑,比如在消息发送前或者在消息被消费后。

作为一个非常小众的功能,Kafka 拦截器自 0.10.0.0 版本被引入后并未得到太多的实际应用,我也从未在任何 Kafka 技术峰会上看到有公司分享其使用拦截器的成功案例。但即便如此,在自己的 Kafka 工具箱中放入这么一个有用的东西依然是值得的。今天我们就让它来发挥威力,展示一些非常酷炫的功能。

Kafka 拦截器

Kafka 拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。

举个例子,假设你想在生产消息前执行两个“前置动作”:第一个是为消息增加一个头信息,封装发送该消息的时间,第二个是更新发送消息数字段,那么当你将这两个拦截器串联在一起统一指定给 Producer 后,Producer 会按顺序执行上面的动作,然后再发送消息。

当前 Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。拿上面的例子来说,假设第一个拦截器的完整类路径是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二个类是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在 Producer 端指定拦截器:

 
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器 1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……

现在问题来了,我们应该怎么编写 AddTimeStampInterceptor 和 UpdateCounterInterceptor 类呢?其实很简单,这两个类以及你自己编写的所有 Producer 端拦截器实现类都要继承org.apache.kafka.clients.producer.ProducerInterceptor 接口。该接口是 Kafka 提供的,里面有两个核心的方法。

  1. onSend:该方法会在消息发送之前被调用。如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会。
  2. onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。还记得我在上一期中提到的发送回调通知 callback 吗?onAcknowledgement 的调用要早于 callback 的调用。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。

同理,指定消费者拦截器也是同样的方法,只是具体的实现类要实现org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,这里面也有两个核心方法。

  1. onConsume:该方法在消息返回给 Consumer 程序之前调用。也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。
  2. onCommit:Consumer 在提交位移之后调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。

一定要注意的是,指定拦截器类时要指定它们的全限定名,即 full qualified name。通俗点说就是要把完整包名也加上,不要只有一个类名在那里,并且还要保证你的 Producer 程序能够正确加载你的拦截器类。

典型使用场景

Kafka 拦截器都能用在哪些地方呢?其实,跟很多拦截器的用法相同,Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。

我以端到端系统性能检测和消息审计为例来展开介绍下。

今天 Kafka 默认提供的监控指标都是针对单个客户端或 Broker 的,你很难从具体的消息维度去追踪集群间消息的流转路径。同时,如何监控一条消息从生产到最后消费的端到端延时也是很多 Kafka 用户迫切需要解决的问题。

从技术上来说,我们可以在客户端程序中增加这样的统计逻辑,但是对于那些将 Kafka 作为企业级基础架构的公司来说,在应用代码中编写统一的监控逻辑其实是很难的,毕竟这东西非常灵活,不太可能提前确定好所有的计算逻辑。另外,将监控逻辑与主业务逻辑耦合也是软件工程中不提倡的做法。

现在,通过实现拦截器的逻辑以及可插拔的机制,我们能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够从具体的消息层面上去收集这些数据。这就是 Kafka 拦截器的一个非常典型的使用场景。

我们再来看看消息审计(message audit)的场景。设想你的公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要涉及多租户以及消息审计的功能。

作为私有云的 PaaS 提供方,你肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。一个可行的做法就是你编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的 Kafka 服务的客户端程序必须设置该拦截器。

案例分享

下面我以一个具体的案例来说明一下拦截器的使用。在这个案例中,我们通过编写拦截器类来统计消息端到端处理的延时,非常实用,我建议你可以直接移植到你自己的生产环境中。

我曾经给一个公司做 Kafka 培训,在培训过程中,那个公司的人提出了一个诉求。他们的场景很简单,某个业务只有一个 Producer 和一个 Consumer,他们想知道该业务消息从被生产出来到最后被消费的平均总时长是多少,但是目前 Kafka 并没有提供这种端到端的延时统计。

学习了拦截器之后,我们现在知道可以用拦截器来满足这个需求。既然是要计算总延时,那么一定要有个公共的地方来保存它,并且这个公共的地方还是要让生产者和消费者程序都能访问的。在这个例子中,我们假设数据被保存在 redis 中。

Okay,这个需求显然要实现生产者拦截器,也要实现消费者拦截器。我们先来实现前者:

 
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
 
 
    private Jedis jedis; // 省略 Jedis 初始化
 
 
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        jedis.incr("totalSentMessage");
        return record;
    }
 
 
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }
 
 
    @Override
    public void close() {
    }
 
 
    @Override
    public void configure(Map<JAVA.lang.String, ?> configs) {
    }

上面的代码比较关键的是在发送消息前更新总的已发送消息数。为了节省时间,我没有考虑发送失败的情况,因为发送失败可能导致总发送数不准确。不过好在处理思路是相同的,你可以有针对性地调整下代码逻辑。

下面是消费者端的拦截器实现,代码如下:

 
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
 
 
    private Jedis jedis; // 省略 Jedis 初始化
 
 
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long lantency = 0L;
        for (ConsumerRecord<String, String> record : records) {
            lantency += (System.currentTimeMillis() - record.timestamp());
        }
        jedis.incrBy("totalLatency", lantency);
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
        jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        return records;
    }
 
 
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }
 
 
    @Override
    public void close() {
    }
 
 
    @Override
    public void configure(Map<String, ?> configs) {

在上面的消费者拦截器中,我们在真正消费一批消息前首先更新了它们的总延时,方法就是用当前的时钟时间减去封装在消息中的创建时间,然后累计得到这批消息总的端到端处理延时并更新到 Redis 中。之后的逻辑就很简单了,我们分别从 Redis 中读取更新过的总延时和总消息数,两者相除即得到端到端消息的平均处理延时。

创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标。

小结

今天我们花了一些时间讨论 Kafka 提供的冷门功能:拦截器。如之前所说,拦截器的出场率极低,以至于我从未看到过国内大厂实际应用 Kafka 拦截器的报道。但冷门不代表没用。事实上,我们可以利用拦截器满足实际的需求,比如端到端系统性能检测、消息审计等。​



Tags:Kafka   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
Spring实现Kafka重试Topic,真的太香了
概述Kafka的强大功能之一是每个分区都有一个Consumer的偏移值。该偏移值是消费者将读取的下一条消息的值。可以自动或手动增加该值。如果我们由于错误而无法处理消息并想重...【详细内容】
2024-01-26  Search: Kafka  点击:(91)  评论:(0)  加入收藏
如何使用Python、Apache Kafka和云平台构建健壮的实时数据管道
译者 | 李睿审校 | 重楼在当今竞争激烈的市场环境中,为了生存和发展,企业必须能够实时收集、处理和响应数据。无论是检测欺诈、个性化用户体验还是监控系统,现在都需要接近即时...【详细内容】
2024-01-26  Search: Kafka  点击:(49)  评论:(0)  加入收藏
深入浅出Kafka:高可用、顺序消费及幂等性
在我们旅行于数据海洋的途中,如果把 Kafka 比作是一艘承载无数信息航行的快船,前文《Kafka实战漫谈:大数据领域的不败王者》已经讲述了如何搭建起这艘快船,让它在起风的早晨开始...【详细内容】
2023-12-18  Search: Kafka  点击:(179)  评论:(0)  加入收藏
7k Star,一款开源的 Kafka 管理平台,功能齐全、页面美观!
Apache Kafka UI 是一个免费的开源 Web UI,用于监控和管理 Apache Kafka 集群,可方便地查看 Kafka Brokers、Topics、消息、Consumer 等情况,支持多集群管理、性能监控、访问控...【详细内容】
2023-12-15  Search: Kafka  点击:(137)  评论:(0)  加入收藏
利用Apache Kafka、Flink和Druid构建实时数据架构
译者 | 陈峻审校 | 重楼如今,对于使用批处理工作流程的数据团队而言,要满足业务的实时要求并非易事。从数据的交付、处理到分析,整个批处理工作流往往需要大量的等待,其中包括:等...【详细内容】
2023-12-11  Search: Kafka  点击:(239)  评论:(0)  加入收藏
运维兄弟!Kafka怎么又"超时"了?
现象凌晨,当运维刚躺下,就被业务研发的电话叫醒,"哥们!kafka服务又异常了?影响到业务了,快看看",业务研发给出的异常日志如下:基本分析 集群检查:立即确认kafka集群以及涉及到topic健...【详细内容】
2023-12-07  Search: Kafka  点击:(142)  评论:(0)  加入收藏
图解Kafka适用场景,全网最全!
消息系统消息系统被用于各种场景,如解耦数据生产者,缓存未处理的消息。Kafka 可作为传统的消息系统的替代者,与传统消息系统相比,kafka有更好的吞吐量、更好的可用性,这有利于处...【详细内容】
2023-11-29  Search: Kafka  点击:(184)  评论:(0)  加入收藏
Kafka有哪些应用场景?你能说上来几个?
下面我们来总结一下Kafka的一些应用场景:1、日志处理与分析(最常用的场景)下图显示了典型的 ELK(Elastic-Logstash-Kibana)堆栈。Kafka 有效地从每个实例收集日志流。ElasticSe...【详细内容】
2023-11-28  Search: Kafka  点击:(166)  评论:(0)  加入收藏
Kafka:解锁大数据时代的搜索与分析
在当今大数据时代,数据湖作为一种新兴的数据存储和分析解决方案,正受到越来越多企业的青睐。而作为一种高性能、可扩展的事件流平台,Kafka在数据湖领域发挥着重要的作用。本文...【详细内容】
2023-11-24  Search: Kafka  点击:(293)  评论:(0)  加入收藏
解密Kafka主题的分区策略:提升实时数据处理的关键
Kafka几乎是当今时代背景下数据管道的首选,无论你是做后端开发、还是大数据开发,对它可能都不陌生。开源软件Kafka的应用越来越广泛。面对Kafka的普及和学习热潮,哪吒想分享一...【详细内容】
2023-11-21  Search: Kafka  点击:(184)  评论:(0)  加入收藏
▌简易百科推荐
Qt与Flutter:在跨平台UI框架中哪个更受欢迎?
在跨平台UI框架领域,Qt和Flutter是两个备受瞩目的选择。它们各自具有独特的优势,也各自有着广泛的应用场景。本文将对Qt和Flutter进行详细的比较,以探讨在跨平台UI框架中哪个更...【详细内容】
2024-04-12  刘长伟    Tags:UI框架   点击:(6)  评论:(0)  加入收藏
Web Components实践:如何搭建一个框架无关的AI组件库
一、让人又爱又恨的Web ComponentsWeb Components是一种用于构建可重用的Web元素的技术。它允许开发者创建自定义的HTML元素,这些元素可以在不同的Web应用程序中重复使用,并且...【详细内容】
2024-04-03  京东云开发者    Tags:Web Components   点击:(8)  评论:(0)  加入收藏
Kubernetes 集群 CPU 使用率只有 13% :这下大家该知道如何省钱了
作者 | THE STACK译者 | 刘雅梦策划 | Tina根据 CAST AI 对 4000 个 Kubernetes 集群的分析,Kubernetes 集群通常只使用 13% 的 CPU 和平均 20% 的内存,这表明存在严重的过度...【详细内容】
2024-03-08  InfoQ    Tags:Kubernetes   点击:(22)  评论:(0)  加入收藏
Spring Security:保障应用安全的利器
SpringSecurity作为一个功能强大的安全框架,为Java应用程序提供了全面的安全保障,包括认证、授权、防护和集成等方面。本文将介绍SpringSecurity在这些方面的特性和优势,以及它...【详细内容】
2024-02-27  风舞凋零叶    Tags:Spring Security   点击:(59)  评论:(0)  加入收藏
五大跨平台桌面应用开发框架:Electron、Tauri、Flutter等
一、什么是跨平台桌面应用开发框架跨平台桌面应用开发框架是一种工具或框架,它允许开发者使用一种统一的代码库或语言来创建能够在多个操作系统上运行的桌面应用程序。传统上...【详细内容】
2024-02-26  贝格前端工场    Tags:框架   点击:(50)  评论:(0)  加入收藏
Spring Security权限控制框架使用指南
在常用的后台管理系统中,通常都会有访问权限控制的需求,用于限制不同人员对于接口的访问能力,如果用户不具备指定的权限,则不能访问某些接口。本文将用 waynboot-mall 项目举例...【详细内容】
2024-02-19  程序员wayn  微信公众号  Tags:Spring   点击:(40)  评论:(0)  加入收藏
开发者的Kubernetes懒人指南
你可以将本文作为开发者快速了解 Kubernetes 的指南。从基础知识到更高级的主题,如 Helm Chart,以及所有这些如何影响你作为开发者。译自Kubernetes for Lazy Developers。作...【详细内容】
2024-02-01  云云众生s  微信公众号  Tags:Kubernetes   点击:(53)  评论:(0)  加入收藏
链世界:一种简单而有效的人类行为Agent模型强化学习框架
强化学习是一种机器学习的方法,它通过让智能体(Agent)与环境交互,从而学习如何选择最优的行动来最大化累积的奖励。强化学习在许多领域都有广泛的应用,例如游戏、机器人、自动驾...【详细内容】
2024-01-30  大噬元兽  微信公众号  Tags:框架   点击:(71)  评论:(0)  加入收藏
Spring实现Kafka重试Topic,真的太香了
概述Kafka的强大功能之一是每个分区都有一个Consumer的偏移值。该偏移值是消费者将读取的下一条消息的值。可以自动或手动增加该值。如果我们由于错误而无法处理消息并想重...【详细内容】
2024-01-26  HELLO程序员  微信公众号  Tags:Spring   点击:(91)  评论:(0)  加入收藏
SpringBoot如何实现缓存预热?
缓存预热是指在 Spring Boot 项目启动时,预先将数据加载到缓存系统(如 Redis)中的一种机制。那么问题来了,在 Spring Boot 项目启动之后,在什么时候?在哪里可以将数据加载到缓存系...【详细内容】
2024-01-19   Java中文社群  微信公众号  Tags:SpringBoot   点击:(88)  评论:(0)  加入收藏
站内最新
站内热门
站内头条