您当前的位置:首页 > 电脑百科 > 程序开发 > 框架

并行计算框架Polars、Dask的数据处理性能对比

时间:2023-07-10 16:24:57  来源:  作者:Luís Oliveira

在Pandas 2.0发布以后,我们发布过一些评测的文章,这次我们看看,除了Pandas以外,常用的两个都是为了大数据处理的并行数据框架的对比测试。

本文我们使用两个类似的脚本来执行提取、转换和加载(ETL)过程。

测试内容

这两个脚本主要功能包括:

从两个parquet 文件中提取数据,对于小型数据集,变量path1将为“yellow_tripdata/ yellow_tripdata_2014-01”,对于中等大小的数据集,变量path1将是“yellow_tripdata/yellow_tripdata”。对于大数据集,变量path1将是“yellow_tripdata/yellow_tripdata*.parquet”;

进行数据转换:a)连接两个DF,b)根据PULocationID计算行程距离的平均值,c)只选择某些条件的行,d)将步骤b的值四舍五入为2位小数,e)将列“trip_distance”重命名为“mean_trip_distance”,f)对列“mean_trip_distance”进行排序。

将最终的结果保存到新的文件。

脚本

1、Polars

数据加载读取

def extraction():
    """
    Extract two datasets from parquet files
    """
    path1="yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips= pl_read_parquet(path1,)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = pl_read_parquet(path2,)
 
    return df_trips, df_zone
 
 def pl_read_parquet(path, ):
    """
    Converting parquet file into Polars dataframe
    """
    df= pl.scan_parquet(path,)
    return df

转换函数

def transformation(df_trips, df_zone):
    """
    Proceed to several transformations
    """
    df_trips= mean_test_speed_pl(df_trips, )
     
    df = df_trips.join(df_zone,how="inner", left_on="PULocationID", right_on="LocationID",)
    df = df.select(["Borough","Zone","trip_distance",])
   
    df = get_Queens_test_speed_pd(df)
    df = round_column(df, "trip_distance",2)
    df = rename_column(df, "trip_distance","mean_trip_distance")
 
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df
 
 
 def mean_test_speed_pl(df_pl,):
    """
    Getting Mean per PULocationID
    """
    df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())
    return df_pl
 
 def get_Queens_test_speed_pd(df_pl):
    """
    Only getting Borough in Queens
    """
 
    df_pl = df_pl.filter(pl.col("Borough")=='Queens')
 
    return df_pl
 
 def round_column(df, column,to_round):
    """
    Round numbers on columns
    """
    df = df.with_columns(pl.col(column).round(to_round))
    return df
 
 def rename_column(df, column_old, column_new):
    """
    Renaming columns
    """
    df = df.rename({column_old: column_new})
    return df
 
 def sort_by_columns_desc(df, column):
    """
    Sort by column
    """
    df = df.sort(column, descending=True)
    return df

保存

def loading_into_parquet(df_pl):
    """
    Save dataframe in parquet
    """
    df_pl.collect(streaming=True).write_parquet(f'yellow_tripdata_pl.parquet')

其他代码

import polars as pl
 import time
 
 def pl_read_parquet(path, ):
    """
    Converting parquet file into Polars dataframe
    """
    df= pl.scan_parquet(path,)
    return df
 
 def mean_test_speed_pl(df_pl,):
    """
    Getting Mean per PULocationID
    """
    df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())
    return df_pl
 
 def get_Queens_test_speed_pd(df_pl):
    """
    Only getting Borough in Queens
    """
 
    df_pl = df_pl.filter(pl.col("Borough")=='Queens')
 
    return df_pl
 
 def round_column(df, column,to_round):
    """
    Round numbers on columns
    """
    df = df.with_columns(pl.col(column).round(to_round))
    return df
 
 def rename_column(df, column_old, column_new):
    """
    Renaming columns
    """
    df = df.rename({column_old: column_new})
    return df
 
 
 def sort_by_columns_desc(df, column):
    """
    Sort by column
    """
    df = df.sort(column, descending=True)
    return df
 
 
 def mAIn():
     
    print(f'Starting ETL for Polars')
    start_time = time.perf_counter()
 
    print('Extracting...')
    df_trips, df_zone =extraction()
        
    end_extract=time.perf_counter() 
    time_extract =end_extract- start_time
 
    print(f'Extraction Parquet end in {round(time_extract,5)} seconds')
    print('Transforming...')
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter() 
    time_transformation =time.perf_counter() - end_extract
    print(f'Transformation end in {round(time_transformation,5)} seconds')
    print('Loading...')
    loading_into_parquet(df,)
    load_transformation =time.perf_counter() - end_transform
    print(f'Loading end in {round(load_transformation,5)} seconds')
    print(f"End ETL for Polars in {str(time.perf_counter()-start_time)}")
 
 
 if __name__ == "__main__":
     
    main()

2、Dask

函数功能与上面一样,所以我们把代码整合在一起:

import dask.dataframe as dd
 from dask.distributed import Client
 import time
 
 def extraction():
    path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips = dd.read_parquet(path1)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = dd.read_parquet(path2)
 
    return df_trips, df_zone
 
 def transformation(df_trips, df_zone):
    df_trips = mean_test_speed_dask(df_trips)
    df = df_trips.merge(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
    df = df[["Borough", "Zone", "trip_distance"]]
 
    df = get_Queens_test_speed_dask(df)
    df = round_column(df, "trip_distance", 2)
    df = rename_column(df, "trip_distance", "mean_trip_distance")
 
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df
 
 def loading_into_parquet(df_dask):
    df_dask.to_parquet("yellow_tripdata_dask.parquet", engine="fastparquet")
 
 def mean_test_speed_dask(df_dask):
    df_dask = df_dask.groupby("PULocationID").agg({"trip_distance": "mean"})
    return df_dask
 
 def get_Queens_test_speed_dask(df_dask):
    df_dask = df_dask[df_dask["Borough"] == "Queens"]
    return df_dask
 
 def round_column(df, column, to_round):
    df[column] = df[column].round(to_round)
    return df
 
 def rename_column(df, column_old, column_new):
    df = df.rename(columns={column_old: column_new})
    return df
 
 def sort_by_columns_desc(df, column):
    df = df.sort_values(column, ascending=False)
    return df
 
 
 
 def main():
    print("Starting ETL for Dask")
    start_time = time.perf_counter()
 
    client = Client() # Start Dask Client
 
    df_trips, df_zone = extraction()
 
    end_extract = time.perf_counter()
    time_extract = end_extract - start_time
 
    print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")
    print("Transforming...")
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter()
    time_transformation = time.perf_counter() - end_extract
    print(f"Transformation end in {round(time_transformation, 5)} seconds")
    print("Loading...")
    loading_into_parquet(df)
    load_transformation = time.perf_counter() - end_transform
    print(f"Loading end in {round(load_transformation, 5)} seconds")
    print(f"End ETL for Dask in {str(time.perf_counter() - start_time)}")
 
    client.close() # Close Dask Client
 
 if __name__ == "__main__":
    main()

测试结果对比

1、小数据集

我们使用164 Mb的数据集,这样大小的数据集对我们来说比较小,在日常中也时非常常见的。

下面是每个库运行五次的结果:

Polars

Dask

2、中等数据集

我们使用1.1 Gb的数据集,这种类型的数据集是GB级别,虽然可以完整的加载到内存中,但是数据体量要比小数据集大很多。

Polars

Dask

3、大数据集

我们使用一个8gb的数据集,这样大的数据集可能一次性加载不到内存中,需要框架的处理。

Polars

Dask

总结

从结果中可以看出,Polars和Dask都可以使用惰性求值。所以读取和转换非常快,执行它们的时间几乎不随数据集大小而变化;

可以看到这两个库都非常擅长处理中等规模的数据集。

由于polar和Dask都是使用惰性运行的,所以下面展示了完整ETL的结果(平均运行5次)。

Polars在小型数据集和中型数据集的测试中都取得了胜利。但是,Dask在大型数据集上的平均时间性能为26秒。

这可能和Dask的并行计算优化有关,因为官方的文档说“Dask任务的运行速度比Spark ETL查询快三倍,并且使用更少的CPU资源”。

上面是测试使用的电脑配置,Dask在计算时占用的CPU更多,可以说并行性能更好。



Tags:框架   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除。
▌相关推荐
在Pandas 2.0发布以后,我们发布过一些评测的文章,这次我们看看,除了Pandas以外,常用的两个都是为了大数据处理的并行数据框架的对比测试。本文我们使用两个类似的脚本来执行提取...【详细内容】
2023-07-10  Tags: 框架  点击:(0)  评论:(0)  加入收藏
前言OpenAuth.Net 是基于.NET Core/.NET 5的开源权限工作流快速开发框架。框架汲取Martin Fowler企业级应用开发思想及全新技术组合(IdentityServer、EF core、Quartz、AutoF...【详细内容】
2023-07-03  Tags: 框架  点击:(11)  评论:(0)  加入收藏
译者 | 布加迪审校 | 重楼Stack Overflow在2023年5月进行了年度开发者调查,以了解开发者在使用什么工具、他们如何学习这些工具以及他们的看法。超过9万名开发者参与了这次调...【详细内容】
2023-06-28  Tags: 框架  点击:(21)  评论:(0)  加入收藏
前言Spring 是 Java EE 编程领域的一款轻量级的开源框架,由被称为“Spring 之父”的 Rod Johnson 于 2002 年提出并创立,它的目标就是要简化 Java 企业级应用程序的开发难度...【详细内容】
2023-06-21  Tags: 框架  点击:(19)  评论:(0)  加入收藏
React类组件是通过创建 class 继承 React.Component创建的。类组件中通过render函数返回react元素。react组件的三大核心属性是建立在react的类组件基础上的,即:state、props...【详细内容】
2023-06-19  Tags: 框架  点击:(24)  评论:(0)  加入收藏
答:1.DjangoDjango是Python世界中最出名、最成熟的Web框架。Django功能全面,各模块之间结合紧密,(不讲其他的)Django赢在「 全面」。Django提供了丰富、完善的「 文档」,帮助开发者...【详细内容】
2023-06-14  Tags: 框架  点击:(38)  评论:(0)  加入收藏
React是用来帮助你加快编码速度的的一个库,React UI能够进一步提速,如果你基于Google的设计原则进行设计可以使用MUI;如果开发面向移动设备的UI设计可以使用Onsen UI React;如果...【详细内容】
2023-06-13  Tags: 框架  点击:(41)  评论:(0)  加入收藏
新智元报道编辑:LRS【新智元导读】AI时代的软件开发流程是什么样的?任何一个大型软件都不是一开始就构思完善的,而是通过开发人员的一次次改进、编辑、单元测试、修复构建错误...【详细内容】
2023-06-07  Tags: 框架  点击:(51)  评论:(0)  加入收藏
话不多说,直接开始!1.什么是反应式编程反应式编程是一种编程思想与方式,是为了简化并发编程而出现的。与传统的处理方式相比,反应式编程能够基于数据流中的事件进行反应处理。例...【详细内容】
2023-06-06  Tags: 框架  点击:(48)  评论:(0)  加入收藏
最近公司的一些项目需要跨端框架,技术老大选了Taro,实践了一段时间下来,愈发觉得Taro是个好东西,所以在本篇文章中稍微介绍下。什么是Taro?Taro(或称为Taro框架)是一种用于构建跨平...【详细内容】
2023-06-05  Tags: 框架  点击:(43)  评论:(0)  加入收藏
▌简易百科推荐
在Pandas 2.0发布以后,我们发布过一些评测的文章,这次我们看看,除了Pandas以外,常用的两个都是为了大数据处理的并行数据框架的对比测试。本文我们使用两个类似的脚本来执行提取...【详细内容】
2023-07-10  Luís Oliveira    Tags:框架   点击:(0)  评论:(0)  加入收藏
我们知道SpringBoot Starter也就是启动器。是SpringBoot组件化的一大优点。基于这个思想,基于这个思想SpringBoot 才变得非常强大,官方给我们提供很多开箱即用的启动器。Sprin...【详细内容】
2023-07-07  架构师之道  今日头条  Tags:SpringBoot   点击:(12)  评论:(0)  加入收藏
一、概述SpringBoot FatJar 的设计,打破了标准 jar 的结构,在 jar 包内携带了其所依赖的 jar 包,通过在标准 jar 包中指定的 Main-Class 的 main 方法启动后,创建自己的类加载器...【详细内容】
2023-07-07  架构染色  微信公众号  Tags:SpringBoot   点击:(13)  评论:(0)  加入收藏
一、是什么?查看接口注释:根据给定的注释元数据,根据需要注册bean定义......spring会遍历所有的beanDefinition,逐个创建对应的bean。public interface ImportBeanDefinitionReg...【详细内容】
2023-07-05  言沫东  今日头条  Tags:spring   点击:(11)  评论:(0)  加入收藏
Spring Boot的启动原理是通过SpringApplication类来实现的。具体流程如下:创建一个应用程序的SpringApplication实例。分析应用程序的上下文环境(包括Java系统属性、环境变量...【详细内容】
2023-06-30    java知路  Tags:Springboot   点击:(14)  评论:(0)  加入收藏
前言Shiro是一个功能强大且易于使用的Java安全框架,提供全面的身份验证、授权、密码管理和会话管理功能。它支持多种认证方式,如基于表单、HTTP基本身份验证和RememberMe。授...【详细内容】
2023-06-29  SnaiL  FreeBuf.COM  Tags:Shiro   点击:(21)  评论:(0)  加入收藏
译者 | 布加迪审校 | 重楼Stack Overflow在2023年5月进行了年度开发者调查,以了解开发者在使用什么工具、他们如何学习这些工具以及他们的看法。超过9万名开发者参与了这次调...【详细内容】
2023-06-28    51CTO  Tags:框架   点击:(21)  评论:(0)  加入收藏
作为一个 Java 开发 ,Spring 是我用的最多的框架。Spring 这个名字起的很好,一语双关,在英文里 spring 是春天的意思,寓意程序员的春天,同时 spring 还有弹簧的意思,它确实是一个...【详细内容】
2023-06-28  台近秋  今日头条  Tags:Spring   点击:(7)  评论:(0)  加入收藏
从配置文件中获取属性应该是SpringBoot 开发中最为常用的功能之一,但就是这么常用的功能,仍然有很多开发者在这个方面踩坑。我整理了几种获取配置属性的方式,目的不仅是要让大...【详细内容】
2023-06-27    OSC开源社区  Tags:Springboot   点击:(27)  评论:(0)  加入收藏
前言Spring 是 Java EE 编程领域的一款轻量级的开源框架,由被称为“Spring 之父”的 Rod Johnson 于 2002 年提出并创立,它的目标就是要简化 Java 企业级应用程序的开发难度...【详细内容】
2023-06-21    Java码农之路  Tags:Spring   点击:(19)  评论:(0)  加入收藏
站内最新
站内热门
站内头条