您当前的位置:首页 > 电脑百科 > 数据库 > 百科

如何使用Python、Apache Kafka和云平台构建健壮的实时数据管道

时间:2024-01-26 12:42:51  来源:51CTO  作者:

译者 | 李睿

审校 | 重楼

在当今竞争激烈的市场环境中,为了生存和发展,企业必须能够实时收集、处理和响应数据。无论是检测欺诈、个性化用户体验还是监控系统,现在都需要接近即时的数据。

然而,构建和运行任务关键型实时数据管道具有挑战性。基础设施必须具有容错性、无限可扩展性,并与各种数据源和应用程序集成。这就是ApacheKafka、Python/ target=_blank class=infotextkey>Python和云平台的用武之地。

这个综合指南中将介绍:

  • 概述Apache Kafka架构
  • 在云中运行Kafka集群
  • 使用Python构建实时数据管道
  • 使用PySpark进行扩展处理
  • 实际示例,例如用户活动跟踪、物联网数据管道,并支持聊天分析

这里将包括大量的代码片段、配置示例和文档链接,以便获得这些非常有用的技术的实践经验。

Apache Kafka架构介绍

Apache Kafka是一个分布式、分区、复制的提交日志,用于可靠且大规模地存储数据流。Apache Kafka的核心是提供以下功能:

  • 发布-订阅消息:Kafka允许广播来自生产者的数据流,例如页面浏览量、交易、用户事件等,并支持消费者实时消费。
  • 消息存储:Kafka在消息到达时将其持久保存在磁盘上,并在指定的时间内保留它们。消息通过指示日志中位置的偏移量来存储和索引。
  • 容错:数据在可配置数量的服务器上复制。如果一台服务器宕机,另一台服务器可以保证持续运行。
  • 横向可扩展性:Kafka集群可以通过简单地添加更多的服务器来弹性扩展。这允许无限的存储和处理能力。

Kafka架构由以下主要组件组成:

(1)主题

消息被发布到名为“主题”的类别中。每个主题都充当消息提要或消息队列。常见的场景是每个消息类型或数据流的一个主题。Kafka主题中的每条消息都有一个唯一的标识符,称为偏移量,它代表了在主题中的位置。一个主题可以分为多个分区,这些分区是可以存储在不同代理上的主题片段。分区允许Kafka通过在多个消费者之间分配负载来扩展和并行化数据处理。

(2)生产者

生产者是向Kafka主题发布消息的应用程序。它们连接到Kafka集群,序列化数据(例如JSON或Avro),分配一个密钥,并将其发送到适当的主题。

例如,一个Web应用程序可以产生点击流事件,或者一个移动应用程序可以产生使用统计。

(3)消费者

消费者从Kafka主题中读取消息并进行处理。处理可能涉及解析数据、验证、聚合、过滤、存储到数据库等。

消费者连接到Kafka集群,并订阅一个或多个主题来获取消息提要,然后根据用例需求进行处理。

(4)代理

这是一个Kafka服务器,它接收来自生产者的消息,分配偏移量,将消息提交到存储中,并将数据提供给消费者。Kafka集群由多个代理组成,以实现可扩展性和容错性。

(5)ZooKeeper

ZooKeeper处理代理之间的协调和共识,例如控制器选举和主题配置。它维护Kafka操作所需的集群状态和配置信息。

这涵盖了Kafka的基础知识。要深入了解,可以参考一些Kafka文档。

以下了解如何通过在云中运行Kafka来简化管理。

在云中运行Kafka

虽然Kafka具有高度可扩展性和可靠性,但它的运行涉及部署、基础设施管理、监控、安全、故障处理、升级等方面的大量工作。

值得庆幸的是,Kafka现在是所有主要云计算提供商提供的完全托管服务:

服务

描述

定价

AWS MSK

在AWS上完全托管、高可用的Apache Kafka集群。处理基础设施,扩展,安全,故障处理等。

基于代理的数量

google Cloud Pub/Sub

基于Kafka的无服务器实时消息服务。自动扩展,至少一次交付保证。

基于使用指标

Confluent Cloud

完全管理的事件流平台,由Apache Kafka提供支持。提供免费层。

基于功能的分层定价

Azure Event Hubs

Apache Kafka的高吞吐量事件摄取服务。与Azure数据服务的集成。

基于吞吐量单位

托管服务抽象了Kafka操作的复杂性,可以让用户专注数据管道。

接下来,将使用Python、Kafka和云平台构建一个实时管道。也可以参考以下的指南作为另一个示例。

构建实时数据管道

Kafka的基本实时管道有两个主要组件:向Kafka发布消息的生产者和订阅主题并处理消息的消费者。

其架构遵循以下流程:

为了进行简化,将使用Confluent Kafka Python客户端库。

1. Python生产者

生产者应用程序从数据源收集数据并将其发布到Kafka主题。作为一个例子,假设有一个Python服务从一个Web应用程序收集用户点击流事件。

在Web应用程序中,当用户的行为像是页面浏览或产品评级时,可以捕获这些事件并将它们发送给Kafka。

可以抽象出Web应用程序如何收集数据的实现细节。

Python 
 from confluent_kafka import Producer
 import json

 # User event data
 event = {
 "timestamp": "2022-01-01T12:22:25", 
 "userid": "user123",
 "page": "/product123", 
 "action": "view"
 }

 # Convert to JSON
 event_json = json.dumps(event)

 # Kafka producer configuration 
 conf = {
 'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
 'client.id': 'clickstream-producer' 
 }

 # Create producer instance
 producer = Producer(conf)

 # Publish event 
 producer.produce(topic='clickstream', value=event_json)

 # Flush and close producer
 producer.flush()
 producer.close()

这将事件发布到云托管Kafka集群上的clickstream主题。

Confluent_Kafka Python客户端在将消息发送到Kafka之前使用内部缓冲区来批处理消息。与单独发送每条消息相比,这提高了效率。

在默认情况下,消息会在缓冲区中累积,直到:

(1)已达到缓冲区大小限制(默认为32MB)。

(2)调用flush()方法。

当调用flush()时,缓冲区中的任何消息都会立即发送到Kafka代理。

如果不调用flush(),而是依赖于缓冲区大小限制,那么在下一次自动刷新之前,如果发生故障,就有丢失事件的风险。调用flush()能够更好地控制最小化潜在的消息丢失。

但是,在每次生产后调用flush()会带来额外的开销。找到合适的缓冲配置取决于特定的可靠性需求和吞吐量需求。

可以在事件发生时不断添加事件来构建实时流。这为下游数据消费者提供了连续的事件提要。

2.Python消费者

接下来,有一个消费者应用程序来从Kafka摄取事件并处理它们。

例如,可能想要解析事件,筛选特定的子类型,并验证模式。

Python 
 from confluent_kafka import Consumer
 import json

 # Kafka consumer configuration
 conf = {'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
  'group.id': 'clickstream-processor',
 'auto.offset.reset': 'earliest'}

 # Create consumer instance 
 consumer = Consumer(conf)

 # Subscribe to 'clickstream' topic
 consumer.subscribe(['clickstream'])

 # Poll Kafka for messages infinitely 
 while True:
 msg = consumer.poll(1.0)
 if msg is None:
 continue
 
 # Parse JSON from message value
 event = json.loads(msg.value())
 
 # Process event based on business logic
 if event['action'] == 'view':
 print('User viewed product page')
 
 elif event['action'] == 'rating':
 # Validate rating, insert to DB etc
  pass
 
 print(event) # Print event 
 
 # Close consumer
 consumer.close()

这个轮询clickstream主题以获取新消息,使用它们,并根据事件类型采取行动——打印、更新数据库等。

对于一个简单的管道来说,这很有效。但如果每秒事件数增加100倍呢?消费者将无法跟上其增长。这就是像PySpark这样的工具可以帮助扩展处理的地方。

3.使用PySpark进行扩展

PySpark为Apache Spark提供了一个Python API,Apache Spark是一个为大规模数据处理优化的分布式计算框架

使用PySpark,可以利用Spark的内存计算和并行执行来更快地使用Kafka流。

首先,将Kafka数据加载到DataFrame中,DataFrame可以使用Spark SQL或Python进行操作。

Python 
 from pyspark.sql import SparkSession

 # Initialize Spark session
 spark = SparkSession.builder 
 .AppName('clickstream-consumer') 
 .getOrCreate()

 # Read stream from Kafka 'clickstream' 
 df = spark.readStream 
 .format("kafka") 
 .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") 
 .option("subscribe", "clickstream") 
 .load()

 # Parse JSON from value
 df = df.selectExpr("CAST(value AS STRING)")
 df = df.select(from_json(col("value"), schema).alias("data"))
 Next, we can express whatever processing logic we need using DataFrame transformations:
 from pyspark.sql.functions import *

 # Filter for 'page view' events 
 views = df.filter(col("data.action") == "view")

 # Count views per page URL 
 counts = views.groupBy(col("data.page"))
 .count()
 .orderBy("count")

 # Print the stream 
 query = counts.writeStream 
 .outputMode("complete")  
 .format("console") 
 .start() 
 
 query.awAItTermination()

它利用Spark的分布式运行时,在数据流上实时应用过滤、聚合和排序等操作。

还可以使用多个消费者组并行化消费,并将输出接收器写入数据库、云存储等。

这允许在Kafka的数据上构建可扩展的流处理。

现在已经介绍了端到端管道,以下了解应用它的一些实际用例。

实际用例

以下探索一些实际用例,在这些用例中,这些技术可以帮助大规模地处理大量实时数据。

1.用户活动跟踪

许多现代网络和移动应用程序跟踪用户的行为,例如页面浏览量、按钮点击、交易等,以收集使用情况分析。

(1)问题

  • 数据量可以随着数百万活跃用户而大规模扩展。
  • 需要实时洞察以检测问题并个性化内容。
  • 希望为历史报表存储汇总数据。

(2)解决方案

  • 使用Python或任何语言将点击流事件摄取到Kafka主题中。
  • 使用PySpark进行清理、聚合和分析。
  • 将输出保存到数据库,例如Cassandra的仪表板。
  • 使用Spark ML实时警报检测异常。

2.物联网数据管道

物联网传感器产生大量的实时遥测数据,例如温度、压力、位置等。

(1)问题

  • 每秒产生数百万个传感器事件。
  • 需要清洗、改造、丰富。
  • 需要实时监控和历史存储。

(2)解决方案

  • 使用语言SDK收集Kafka主题中的传感器数据。
  • 使用PySpark进行数据整理和连接外部数据。
  • 将数据流输入机器学习模型进行实时预测。
  • 将聚合数据存储在时间序列数据库中以实现可视化。

3.客户支持聊天分析

像Zendesk这样的聊天平台捕获了大量的客户支持对话。

(1)问题

  • 每月产生数百万条聊天信息。
  • 需要了解客户痛点和代理表现。
  • 必须发现负面情绪和紧急问题。

(2)解决方案

  • 使用连接器将聊天记录导入Kafka主题。
  • 使用PySpark SQL和DataFrames进行聚合和处理。
  • 将数据输入NLP模型,对情绪和意图进行分类。
  • 存储洞察到数据库的历史报告。
  • 为联络中心操作提供实时仪表板。

这个用例演示了如何将这些技术应用于涉及大量快速移动数据的实际业务问题。

结论

综上所述, Python、Kafka和云平台为构建健壮的、可扩展的实时数据管道提供了一个很好的组合。

原文标题:Building Robust Real-Time Data Pipelines With Python, Apache Kafka, and the Cloud,作者:Dmitrii Mitiaev



Tags:数据管道   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除。
▌相关推荐
如何使用Python、Apache Kafka和云平台构建健壮的实时数据管道
译者 | 李睿审校 | 重楼在当今竞争激烈的市场环境中,为了生存和发展,企业必须能够实时收集、处理和响应数据。无论是检测欺诈、个性化用户体验还是监控系统,现在都需要接近即时...【详细内容】
2024-01-26  Tags: 数据管道  点击:(2)  评论:(0)  加入收藏
一文带你了解大数据管道
介绍如果您从大数据开始,通常会被众多工具,框架和选项所困扰。 在本文中,我将尝试总结其成分和基本配方,以帮助您开始大数据之旅。 我的目标是对不同的工具进行分类,并试图解释每...【详细内容】
2020-10-07  Tags: 数据管道  点击:(312)  评论:(0)  加入收藏
聊聊数据的高效传输带——数据管道DataPipeline
数据管道(Data Pipeline)是一种允许数据通过数据分析过程从一个位置高效流向另一个位置的软件。这就好比一条传送带,它能高效、准确地将数据传送到流程的每一步。例如,数据管道...【详细内容】
2020-09-18  Tags: 数据管道  点击:(407)  评论:(0)  加入收藏
▌简易百科推荐
如何使用Python、Apache Kafka和云平台构建健壮的实时数据管道
译者 | 李睿审校 | 重楼在当今竞争激烈的市场环境中,为了生存和发展,企业必须能够实时收集、处理和响应数据。无论是检测欺诈、个性化用户体验还是监控系统,现在都需要接近即时...【详细内容】
2024-01-26    51CTO  Tags:数据管道   点击:(2)  评论:(0)  加入收藏
一文读懂:什么是数据库,它到底有啥用?
提到数据库,可能很多人会很陌生。但据库其实已经渗入我们生活的方方面面,像网上购物、扫码点餐、抢红包等等应用背后都离不开数据库的支持。可以说数据库是支撑各类应用软件运...【详细内容】
2024-01-25  了不起的云计算    Tags:数据库   点击:(3)  评论:(0)  加入收藏
一个流行的支持超多数据库的ORM库
Sequelize 是一个流行的 Node.js ORM(对象关系映射)库,用于在 Node.js 中操作关系型数据库。它支持多种数据库系统,如 PostgreSQL、MySQL、SQLite 和 MSSQL,并提供了简单易用的 A...【详细内容】
2024-01-15  老猫coder  微信公众号  Tags:数据库   点击:(14)  评论:(0)  加入收藏
简易百科之什么是向量数据库
随着大数据时代的到来,数据存储和处理成为了一个重要的问题。传统的关系型数据库已经无法满足一些场景的需求,例如大规模高维数据的处理和分析。在这样的背景下,向量数据库应运...【详细内容】
2024-01-15    简易百科  Tags:向量数据库   点击:(31)  评论:(0)  加入收藏
通用数据湖仓一体架构正当时
这篇博文中提出的建议并不新鲜。事实上许多组织已经投入了数年时间和昂贵的数据工程团队的工作,以慢慢构建这种架构的某个版本。我知道这一点,因为我以前在Uber和LinkedIn做过...【详细内容】
2024-01-15    IT168企业级  Tags:数据湖   点击:(11)  评论:(0)  加入收藏
腾讯云把向量数据库“卷”到哪一步了?
“不是我不明白,这世界变化快”,崔健在20世纪写下的这句歌词,放在刚刚过去的2023年,也同样适用。技术风向的变化之快,让不少人感到惊讶,向量数据库这一年的潮起潮落,就是一个典型的...【详细内容】
2024-01-14  AI新智界    Tags:向量数据库   点击:(12)  评论:(0)  加入收藏
MongoDB 大量数据插入时的性能影响及解决方法
MongoDB 是一种广泛应用的 NoSQL 数据库,以其高度可扩展性和灵活性而闻名。然而,在处理大量数据时,MongoDB 的性能可能会受到一些影响。大量数据插入对 MongoDB 性能的影响磁盘...【详细内容】
2024-01-05  编程技术汇  今日头条  Tags:MongoDB   点击:(18)  评论:(0)  加入收藏
数据库中避免INNER JOIN连接表时出现重复数据的方法
在进行 SQL 查询时,我们经常需要联合多个表来获取更全面的数据。然而,在使用 INNER JOIN 连接表时,有时会遇到重复数据的问题,这可能会导致查询结果不准确或者性能下降。在关系...【详细内容】
2024-01-04  编程技术汇  今日头条  Tags:数据库   点击:(17)  评论:(0)  加入收藏
加快SQL查询的九种优秀实践
译者 | 陈峻审校 | 重楼如您所知,SQL多年来一直是开发和查询数据库的主要语言。在编程实践中,人们逐渐积累了各种在使用过程中的小技巧。下面,让我们来看看有关如何编写出更高...【详细内容】
2024-01-04    51CTO  Tags:SQL查询   点击:(12)  评论:(0)  加入收藏
MyCat分库分表实时同步到GreatSQL
这个事情怎么产生的MyCat作为经典的分库分表中间件,在长时间内被广泛认为是管理超大MySQL数据库集合的有效解决方案。近来接到客户需求,需要将MyCat集群迁移到GreatSQL中,并且...【详细内容】
2024-01-03  GreatSQL社区  微信公众号  Tags:MyCat   点击:(30)  评论:(0)  加入收藏
站内最新
站内热门
站内头条