您当前的位置:首页 > 电脑百科 > 程序开发 > 框架

并行计算框架Polars、Dask的数据处理性能对比

时间:2023-07-10 16:24:57  来源:  作者:Luís Oliveira

在Pandas 2.0发布以后,我们发布过一些评测的文章,这次我们看看,除了Pandas以外,常用的两个都是为了大数据处理的并行数据框架的对比测试。

本文我们使用两个类似的脚本来执行提取、转换和加载(ETL)过程。

测试内容

这两个脚本主要功能包括:

从两个parquet 文件中提取数据,对于小型数据集,变量path1将为“yellow_tripdata/ yellow_tripdata_2014-01”,对于中等大小的数据集,变量path1将是“yellow_tripdata/yellow_tripdata”。对于大数据集,变量path1将是“yellow_tripdata/yellow_tripdata*.parquet”;

进行数据转换:a)连接两个DF,b)根据PULocationID计算行程距离的平均值,c)只选择某些条件的行,d)将步骤b的值四舍五入为2位小数,e)将列“trip_distance”重命名为“mean_trip_distance”,f)对列“mean_trip_distance”进行排序。

将最终的结果保存到新的文件。

脚本

1、Polars

数据加载读取

def extraction():
    """
    Extract two datasets from parquet files
    """
    path1="yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips= pl_read_parquet(path1,)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = pl_read_parquet(path2,)
 
    return df_trips, df_zone
 
 def pl_read_parquet(path, ):
    """
    Converting parquet file into Polars dataframe
    """
    df= pl.scan_parquet(path,)
    return df

转换函数

def transformation(df_trips, df_zone):
    """
    Proceed to several transformations
    """
    df_trips= mean_test_speed_pl(df_trips, )
     
    df = df_trips.join(df_zone,how="inner", left_on="PULocationID", right_on="LocationID",)
    df = df.select(["Borough","Zone","trip_distance",])
   
    df = get_Queens_test_speed_pd(df)
    df = round_column(df, "trip_distance",2)
    df = rename_column(df, "trip_distance","mean_trip_distance")
 
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df
 
 
 def mean_test_speed_pl(df_pl,):
    """
    Getting Mean per PULocationID
    """
    df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())
    return df_pl
 
 def get_Queens_test_speed_pd(df_pl):
    """
    Only getting Borough in Queens
    """
 
    df_pl = df_pl.filter(pl.col("Borough")=='Queens')
 
    return df_pl
 
 def round_column(df, column,to_round):
    """
    Round numbers on columns
    """
    df = df.with_columns(pl.col(column).round(to_round))
    return df
 
 def rename_column(df, column_old, column_new):
    """
    Renaming columns
    """
    df = df.rename({column_old: column_new})
    return df
 
 def sort_by_columns_desc(df, column):
    """
    Sort by column
    """
    df = df.sort(column, descending=True)
    return df

保存

def loading_into_parquet(df_pl):
    """
    Save dataframe in parquet
    """
    df_pl.collect(streaming=True).write_parquet(f'yellow_tripdata_pl.parquet')

其他代码

import polars as pl
 import time
 
 def pl_read_parquet(path, ):
    """
    Converting parquet file into Polars dataframe
    """
    df= pl.scan_parquet(path,)
    return df
 
 def mean_test_speed_pl(df_pl,):
    """
    Getting Mean per PULocationID
    """
    df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())
    return df_pl
 
 def get_Queens_test_speed_pd(df_pl):
    """
    Only getting Borough in Queens
    """
 
    df_pl = df_pl.filter(pl.col("Borough")=='Queens')
 
    return df_pl
 
 def round_column(df, column,to_round):
    """
    Round numbers on columns
    """
    df = df.with_columns(pl.col(column).round(to_round))
    return df
 
 def rename_column(df, column_old, column_new):
    """
    Renaming columns
    """
    df = df.rename({column_old: column_new})
    return df
 
 
 def sort_by_columns_desc(df, column):
    """
    Sort by column
    """
    df = df.sort(column, descending=True)
    return df
 
 
 def mAIn():
     
    print(f'Starting ETL for Polars')
    start_time = time.perf_counter()
 
    print('Extracting...')
    df_trips, df_zone =extraction()
        
    end_extract=time.perf_counter() 
    time_extract =end_extract- start_time
 
    print(f'Extraction Parquet end in {round(time_extract,5)} seconds')
    print('Transforming...')
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter() 
    time_transformation =time.perf_counter() - end_extract
    print(f'Transformation end in {round(time_transformation,5)} seconds')
    print('Loading...')
    loading_into_parquet(df,)
    load_transformation =time.perf_counter() - end_transform
    print(f'Loading end in {round(load_transformation,5)} seconds')
    print(f"End ETL for Polars in {str(time.perf_counter()-start_time)}")
 
 
 if __name__ == "__main__":
     
    main()

2、Dask

函数功能与上面一样,所以我们把代码整合在一起:

import dask.dataframe as dd
 from dask.distributed import Client
 import time
 
 def extraction():
    path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips = dd.read_parquet(path1)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = dd.read_parquet(path2)
 
    return df_trips, df_zone
 
 def transformation(df_trips, df_zone):
    df_trips = mean_test_speed_dask(df_trips)
    df = df_trips.merge(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
    df = df[["Borough", "Zone", "trip_distance"]]
 
    df = get_Queens_test_speed_dask(df)
    df = round_column(df, "trip_distance", 2)
    df = rename_column(df, "trip_distance", "mean_trip_distance")
 
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df
 
 def loading_into_parquet(df_dask):
    df_dask.to_parquet("yellow_tripdata_dask.parquet", engine="fastparquet")
 
 def mean_test_speed_dask(df_dask):
    df_dask = df_dask.groupby("PULocationID").agg({"trip_distance": "mean"})
    return df_dask
 
 def get_Queens_test_speed_dask(df_dask):
    df_dask = df_dask[df_dask["Borough"] == "Queens"]
    return df_dask
 
 def round_column(df, column, to_round):
    df[column] = df[column].round(to_round)
    return df
 
 def rename_column(df, column_old, column_new):
    df = df.rename(columns={column_old: column_new})
    return df
 
 def sort_by_columns_desc(df, column):
    df = df.sort_values(column, ascending=False)
    return df
 
 
 
 def main():
    print("Starting ETL for Dask")
    start_time = time.perf_counter()
 
    client = Client() # Start Dask Client
 
    df_trips, df_zone = extraction()
 
    end_extract = time.perf_counter()
    time_extract = end_extract - start_time
 
    print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")
    print("Transforming...")
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter()
    time_transformation = time.perf_counter() - end_extract
    print(f"Transformation end in {round(time_transformation, 5)} seconds")
    print("Loading...")
    loading_into_parquet(df)
    load_transformation = time.perf_counter() - end_transform
    print(f"Loading end in {round(load_transformation, 5)} seconds")
    print(f"End ETL for Dask in {str(time.perf_counter() - start_time)}")
 
    client.close() # Close Dask Client
 
 if __name__ == "__main__":
    main()

测试结果对比

1、小数据集

我们使用164 Mb的数据集,这样大小的数据集对我们来说比较小,在日常中也时非常常见的。

下面是每个库运行五次的结果:

Polars

Dask

2、中等数据集

我们使用1.1 Gb的数据集,这种类型的数据集是GB级别,虽然可以完整的加载到内存中,但是数据体量要比小数据集大很多。

Polars

Dask

3、大数据集

我们使用一个8gb的数据集,这样大的数据集可能一次性加载不到内存中,需要框架的处理。

Polars

Dask

总结

从结果中可以看出,Polars和Dask都可以使用惰性求值。所以读取和转换非常快,执行它们的时间几乎不随数据集大小而变化;

可以看到这两个库都非常擅长处理中等规模的数据集。

由于polar和Dask都是使用惰性运行的,所以下面展示了完整ETL的结果(平均运行5次)。

Polars在小型数据集和中型数据集的测试中都取得了胜利。但是,Dask在大型数据集上的平均时间性能为26秒。

这可能和Dask的并行计算优化有关,因为官方的文档说“Dask任务的运行速度比Spark ETL查询快三倍,并且使用更少的CPU资源”。

上面是测试使用的电脑配置,Dask在计算时占用的CPU更多,可以说并行性能更好。



Tags:框架   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
Web Components实践:如何搭建一个框架无关的AI组件库
一、让人又爱又恨的Web ComponentsWeb Components是一种用于构建可重用的Web元素的技术。它允许开发者创建自定义的HTML元素,这些元素可以在不同的Web应用程序中重复使用,并且...【详细内容】
2024-04-03  Search: 框架  点击:(8)  评论:(0)  加入收藏
Htmx,它到底是框架还是库?
在最近的前端开发技术的探讨中,htmx经常成为热议的话题。一些人批评它,认为尽管htmx批评现代前端框架过于复杂,但它自己却似乎也是一个复杂的框架。这种看法值得我们深入思考。...【详细内容】
2024-03-28  Search: 框架  点击:(16)  评论:(0)  加入收藏
五大跨平台桌面应用开发框架:Electron、Tauri、Flutter等
一、什么是跨平台桌面应用开发框架跨平台桌面应用开发框架是一种工具或框架,它允许开发者使用一种统一的代码库或语言来创建能够在多个操作系统上运行的桌面应用程序。传统上...【详细内容】
2024-02-26  Search: 框架  点击:(47)  评论:(0)  加入收藏
Spring Security权限控制框架使用指南
在常用的后台管理系统中,通常都会有访问权限控制的需求,用于限制不同人员对于接口的访问能力,如果用户不具备指定的权限,则不能访问某些接口。本文将用 waynboot-mall 项目举例...【详细内容】
2024-02-19  Search: 框架  点击:(39)  评论:(0)  加入收藏
Go Gin框架实现优雅地重启和停止
在Web应用程序中,有时候我们需要重启或停止服务器,无论是因为更新代码还是进行例行维护。在这种情景下,我们需要保证应用程序的可用性和数据的一致性。这就需要优雅地关闭和重...【详细内容】
2024-01-30  Search: 框架  点击:(67)  评论:(0)  加入收藏
链世界:一种简单而有效的人类行为Agent模型强化学习框架
强化学习是一种机器学习的方法,它通过让智能体(Agent)与环境交互,从而学习如何选择最优的行动来最大化累积的奖励。强化学习在许多领域都有广泛的应用,例如游戏、机器人、自动驾...【详细内容】
2024-01-30  Search: 框架  点击:(68)  评论:(0)  加入收藏
OpenHarmony - 基于ArkUI框架实现日历应用
前言对于刚刚接触OpenHarmony应用开发的开发者,最快的入门方式就是开发一个简单的应用,下面记录了一个日历应用的开发过程,通过日历应用的开发,来熟悉基本图形的绘制,ArkUI的组件...【详细内容】
2024-01-16  Search: 框架  点击:(54)  评论:(0)  加入收藏
阿里“AI替换万物”框架火爆社区,网友:偶像不需要真人了?
白交 发自 凹非寺量子位 | 公众号 QbitAIReplace Anything as you want。现在只需框住你需要保留的区域,AI就可以替换万物了!比如让霉霉穿上中国旗袍,结果发饰、服装、背景等各...【详细内容】
2024-01-15  Search: 框架  点击:(66)  评论:(0)  加入收藏
分布式事务框架选择与实践
分布式事务是处理跨多个服务的原子操作的关键概念,而选择适合应用场景的框架对于确保事务一致性至关重要。以下是几个常见的分布式事务框架,并讨论它们的使用和实践。1. XA协...【详细内容】
2024-01-05  Search: 框架  点击:(96)  评论:(0)  加入收藏
JavaScript前端框架2024年展望
Angular、Next.js、React和Solid的维护者和创作者们展望2024年,分享了他们计划中的改进。译自2024 Predictions by JavaScript Frontend Framework Maintainers,作者 Loraine...【详细内容】
2024-01-05  Search: 框架  点击:(91)  评论:(0)  加入收藏
▌简易百科推荐
Web Components实践:如何搭建一个框架无关的AI组件库
一、让人又爱又恨的Web ComponentsWeb Components是一种用于构建可重用的Web元素的技术。它允许开发者创建自定义的HTML元素,这些元素可以在不同的Web应用程序中重复使用,并且...【详细内容】
2024-04-03  京东云开发者    Tags:Web Components   点击:(8)  评论:(0)  加入收藏
Kubernetes 集群 CPU 使用率只有 13% :这下大家该知道如何省钱了
作者 | THE STACK译者 | 刘雅梦策划 | Tina根据 CAST AI 对 4000 个 Kubernetes 集群的分析,Kubernetes 集群通常只使用 13% 的 CPU 和平均 20% 的内存,这表明存在严重的过度...【详细内容】
2024-03-08  InfoQ    Tags:Kubernetes   点击:(12)  评论:(0)  加入收藏
Spring Security:保障应用安全的利器
SpringSecurity作为一个功能强大的安全框架,为Java应用程序提供了全面的安全保障,包括认证、授权、防护和集成等方面。本文将介绍SpringSecurity在这些方面的特性和优势,以及它...【详细内容】
2024-02-27  风舞凋零叶    Tags:Spring Security   点击:(54)  评论:(0)  加入收藏
五大跨平台桌面应用开发框架:Electron、Tauri、Flutter等
一、什么是跨平台桌面应用开发框架跨平台桌面应用开发框架是一种工具或框架,它允许开发者使用一种统一的代码库或语言来创建能够在多个操作系统上运行的桌面应用程序。传统上...【详细内容】
2024-02-26  贝格前端工场    Tags:框架   点击:(47)  评论:(0)  加入收藏
Spring Security权限控制框架使用指南
在常用的后台管理系统中,通常都会有访问权限控制的需求,用于限制不同人员对于接口的访问能力,如果用户不具备指定的权限,则不能访问某些接口。本文将用 waynboot-mall 项目举例...【详细内容】
2024-02-19  程序员wayn  微信公众号  Tags:Spring   点击:(39)  评论:(0)  加入收藏
开发者的Kubernetes懒人指南
你可以将本文作为开发者快速了解 Kubernetes 的指南。从基础知识到更高级的主题,如 Helm Chart,以及所有这些如何影响你作为开发者。译自Kubernetes for Lazy Developers。作...【详细内容】
2024-02-01  云云众生s  微信公众号  Tags:Kubernetes   点击:(50)  评论:(0)  加入收藏
链世界:一种简单而有效的人类行为Agent模型强化学习框架
强化学习是一种机器学习的方法,它通过让智能体(Agent)与环境交互,从而学习如何选择最优的行动来最大化累积的奖励。强化学习在许多领域都有广泛的应用,例如游戏、机器人、自动驾...【详细内容】
2024-01-30  大噬元兽  微信公众号  Tags:框架   点击:(68)  评论:(0)  加入收藏
Spring实现Kafka重试Topic,真的太香了
概述Kafka的强大功能之一是每个分区都有一个Consumer的偏移值。该偏移值是消费者将读取的下一条消息的值。可以自动或手动增加该值。如果我们由于错误而无法处理消息并想重...【详细内容】
2024-01-26  HELLO程序员  微信公众号  Tags:Spring   点击:(86)  评论:(0)  加入收藏
SpringBoot如何实现缓存预热?
缓存预热是指在 Spring Boot 项目启动时,预先将数据加载到缓存系统(如 Redis)中的一种机制。那么问题来了,在 Spring Boot 项目启动之后,在什么时候?在哪里可以将数据加载到缓存系...【详细内容】
2024-01-19   Java中文社群  微信公众号  Tags:SpringBoot   点击:(86)  评论:(0)  加入收藏
花 15 分钟把 Express.js 搞明白,全栈没有那么难
Express 是老牌的 Node.js 框架,以简单和轻量著称,几行代码就可以启动一个 HTTP 服务器。市面上主流的 Node.js 框架,如 Egg.js、Nest.js 等都与 Express 息息相关。Express 框...【详细内容】
2024-01-16  程序员成功  微信公众号  Tags:Express.js   点击:(88)  评论:(0)  加入收藏
站内最新
站内热门
站内头条