您当前的位置:首页 > 电脑百科 > 数据库 > 百科

半小时,将你的Spark SQL模型变为在线服务

时间:2020-07-26 11:27:44  来源:  作者:

SparkSQL在机器学习场景中应用

第四范式已经在很多行业落地了上万个AI应用,比如在金融行业的反欺诈,媒体行业的新闻推荐,能源行业管道检测,而SparkSQL在这些AI应用中快速实现特征变换发挥着重要的作用

半小时,将你的Spark SQL模型变为在线服务

 

SparkSQL在特征变换主要有一下几类

  1. 多表场景,用于表之间拼接操作,比如交易信息表去拼接账户表
  2. 使用udf进行简单的特征变换,比如对时间戳进行hour函数处理
  3. 使用时间窗口和udaf进行时序类特征处理,比如计算一个人最近1天的消费金额总和

SparkSQL到目前为止,解决很好的解决离线模型训练特征变换问题,但是随着AI应用的发展,大家对模型的期望不再只是得出离线调研效果,而是在真实的业务场景发挥出价值,而真实的业务场景是模型应用场景,它需要高性能,需要实时推理,这时候我们就会遇到以下问题

  1. 多表数据离线到在线怎么映射,即批量训练过程中输入很多表,到在线环境这些表该以什么形式存在,这点也会影响整个系统架构,做得好能够提升效率,做得不好就会大大增加模型产生业务价值的成本
  2. SQL转换成实时执行成本高,因为在线推理需要高性能,而数据科学家可能做出成千上万个特征,每个特征都人肉转换,会大大增加的工程成本
  3. 离线特征和在线特征保持一致困难,手动转换就会导致一致性能,而且往往很难一致
  4. 离线效果很棒但是在线效果无法满足业务需求

在具体的反欺诈场景,模型应用要求tp99 20ms去检测一笔交易是否是欺诈,所以对模型应用性能要求非常高

第四范式特征工程数据库是如何解决这些问题

半小时,将你的Spark SQL模型变为在线服务

 

通过特征工程数据库让SparkSQL的能力得到了补充

  1. 以数据库的形式,解决了离线表到在线的映射问题,我们对前面给出的答案就是离线表是怎么分布的,在线也就怎么分布
  2. 通过同一套代码去执行离线和在线特征转换,让在线模型效果得到了保证
  3. 数据科学家与业务开发团队的合作以sql为传递介质,而不再是手工去转换代码,大大提升模型迭代效率
  4. 通过llvm加速的sql,相比scala实现的spark2.x和3.x在时序复杂特征场景能够加速2~3倍,在线通过in-memory的存储,能够保证sql能够在非常低延迟返回结果

快速将spark sql 模型变成实时服务demo

demo的模型训练场景为预测一次打车行程到结束所需要的时间,这里我们将使用fedb ,pyspark,lightgbm等工具最终搭建一个http 模型推理服务,这也会是spark在机器学习场景的实践

半小时,将你的Spark SQL模型变为在线服务

 

整个demo200多行代码,制作时间不超过半个小时

  1. train_sql.py 特征计算与训练, 80行代码
  2. predict_server.py 模型推理http服务, 129行代码

场景数据和特征介绍

整个训练数据如下样子

样例数据

id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration

id3097625,1,2016-01-22 16:01:00,2016-01-22 16:15:16,2,-73.97746276855469,40.7613525390625,-73.95573425292969,40.772396087646484,N,856

id3196697,1,2016-01-28 07:20:18,2016-01-28 07:40:16,1,-73.98524475097656,40.75959777832031,-73.99615478515625,40.72945785522461,N,1198

id0224515,2,2016-01-31 00:48:27,2016-01-31 00:53:30,1,-73.98342895507812,40.7500114440918,-73.97383880615234,40.74980163574219,N,303

id3370903,1,2016-01-14 11:46:43,2016-01-14 12:25:33,2,-74.00027465820312,40.74786376953125,-73.86485290527344,40.77039337158203,N,2330

id2763851,2,2016-02-20 13:21:00,2016-02-20 13:45:56,1,-73.95218658447266,40.772220611572266,-73.9920425415039,40.74932098388672,N,1496

id0904926,1,2016-02-20 19:17:44,2016-02-20 19:33:19,4,-73.97344207763672,40.75189971923828,-73.98480224609375,40.76243209838867,N,935

id2026293,1,2016-02-25 01:16:23,2016-02-25 01:31:27,1,-73.9871597290039,40.68777847290039,-73.9115219116211,40.68180847167969,N,904

id1349988,1,2016-01-28 20:16:05,2016-01-28 20:21:36,1,-74.0028076171875,40.7338752746582,-73.9968032836914,40.743770599365234,N,331

id3218692,2,2016-02-17 16:43:27,2016-02-17 16:54:41,5,-73.98147583007812,40.77408218383789,-73.97216796875,40.76400375366211,N,674 `

场景特征变换sql脚本

特征变换

select trip_duration, passenger_count,

sum `(pickup_latitude) over w as vendor_sum_pl,`

max `(pickup_latitude) over w as vendor_max_pl,`

min `(pickup_latitude) over w as vendor_min_pl,`

avg `(pickup_latitude) over w as vendor_avg_pl,`

sum `(pickup_latitude) over w2 as pc_sum_pl,`

max `(pickup_latitude) over w2 as pc_max_pl,`

min `(pickup_latitude) over w2 as pc_min_pl,`

avg `(pickup_latitude) over w2 as pc_avg_pl ,`

count `(vendor_id) over w2 as pc_cnt,`

count `(vendor_id) over w as vendor_cnt`

from {}

window w as (partition by vendor_id order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW),

w2 as (partition by passenger_count order by pickup_datetime ROWS_RANGE BETWEEN 1d PRECEDING AND CURRENT ROW) `

我们选择了vendor_id 和 passenger_count 两个纬度做时序特征

train_df = spark.sql(train_sql)

# specify your configurations as a dict

params = {

'boosting_type' `: 'gbdt' ,

'objective' `: 'regression' ,

'metric' `: { 'l2' , 'l1' },

'num_leaves' `: 31 ,

'learning_rate' `: 0.05 ,

'feature_fraction' `: 0.9 ,

'bagging_fraction' `: 0.8 ,

'bagging_freq' `: 5 ,

'verbose' `: 0`

}

print `( 'Starting training...' )`

gbm = lgb.train(params,

lgb_train,

num_boost_round `= 20 ,`

valid_sets `= lgb_eval,

early_stopping_rounds `= 5 )`

gbm.save_model( `'model.txt' )执行模型训练过程,最终产生model.txt

模型推理过程

导入数据代码

import

def insert_row(line):

row = line.split( `',' )

row[ `2 ]` `=` `'%dl' % int (datetime.datetime.strptime(row[ 2 ], '%Y-%m-%d %H:%M:%S' ).timestamp()` `*` `1000 )`

row[ `3 ]` `=` `'%dl' % int (datetime.datetime.strptime(row[ 3 ], '%Y-%m-%d %H:%M:%S' ).timestamp()` `*` `1000 )`

insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);" `% tuple (row)

driver.executeInsert( `'db_test' , insert)

with open `( 'data/taxi_tour_table_train_simple.csv' , 'r' ) as fd:

idx = 0

for line in fd:

if idx = `= 0 :

idx = idx + 1

continue

insert_row(line.replace( `'n' , ''))

idx = idx + 1 `

注:train.csv为训练数据csv格式版本

模型推理逻辑

predict.py

def` `post( self ):

row = json.loads( `self .request.body)

ok, req = fedb_driver.getRequestBuilder( `'db_test' , sql)

if not ok or not req:

self `.write( "fail to get req" )`

return

input_schema = req.GetSchema()

if not input_schema:

self `.write( "no schema found" )`

return

str_length = 0

for i in range `(input_schema.GetColumnCnt()):`

if sql_router_sdk.DataTypeName(input_schema.GetColumnType(i)) = `= 'string' :

str_length = str_length + len `(row.get(input_schema.GetColumnName(i), ''))`

req.Init(str_length)

for i in range `(input_schema.GetColumnCnt()):`

tname = sql_router_sdk.DataTypeName(input_schema.GetColumnType(i))

if tname = `= 'string' :

req.AppendString(row.get(input_schema.GetColumnName(i), ''))

elif tname = `= 'int32' :

req.AppendInt32( `int (row.get(input_schema.GetColumnName(i),` `0 )))`

elif tname = `= 'double' :

req.AppendDouble( `float (row.get(input_schema.GetColumnName(i),` `0 )))`

elif tname = `= 'timestamp' :

req.AppendTimestamp( `int (row.get(input_schema.GetColumnName(i),` `0 )))`

else `:`

req.AppendNULL()

if not req.Build():

self `.write( "fail to build request" )`

return

ok, rs = fedb_driver.executeQuery( `'db_test' , sql, req)

if not ok:

self `.write( "fail to execute sql" )`

return

rs. `Next ()

ins = build_feature(rs)

self `.write( "----------------ins---------------n" )`

self `.write( str (ins) + "n" )

duration = bst.predict(ins)

self `.write( "---------------predict trip_duration -------------n" )`

self `.write( "%s s" % str (duration[ 0 ]))``

最终执行效果

# 发送推理请求 ,会看到如下输出

Python3 predict.py

----------------ins---------------

[[ 2. 40.774097 40.774097 40.774097 40.774097 40.774097 40.774097

40.774097 40.774097 1. 1. ]]

---------------predict trip_duration -------------

859.3298781277192 s `

运行demo请到 https://github.com/4paradigm/SparkSQLWithFeDB



Tags:Spark SQL   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
SparkSQL在机器学习场景中应用第四范式已经在很多行业落地了上万个AI应用,比如在金融行业的反欺诈,媒体行业的新闻推荐,能源行业管道检测,而SparkSQL在这些AI应用中快速实现特征...【详细内容】
2020-07-26  Tags: Spark SQL  点击:(59)  评论:(0)  加入收藏
▌简易百科推荐
1增1.1【插入单行】insert [into] <表名> (列名) values (列值)例:insert into Strdents (姓名,性别,出生日期) values (&#39;开心朋朋&#39;,&#39;男&#39;,&#39;1980/6/15&#3...【详细内容】
2021-12-27  快乐火车9d3    Tags:SQL   点击:(1)  评论:(0)  加入收藏
最近发现还有不少做开发的小伙伴,在写存储过程的时候,在参考已有的不同的写法时,往往很迷茫, 不知道各种写法孰优孰劣,该选用哪种写法,以及各种写法的优缺点,本文以一个简单的查询...【详细内容】
2021-12-23  linux上的码农    Tags:sql   点击:(9)  评论:(0)  加入收藏
《开源精选》是我们分享Github、Gitee等开源社区中优质项目的栏目,包括技术、学习、实用与各种有趣的内容。本期推荐的HasorDB 是一个全功能数据库访问工具,提供对象映射、丰...【详细内容】
2021-12-22  GitHub精选    Tags:HasorDB   点击:(5)  评论:(0)  加入收藏
作者丨Rafal Grzegorczyk译者丨陈骏策划丨孙淑娟【51CTO.com原创稿件】您是否还在手动对数据库执行各种脚本?您是否还在浪费时间去验证数据库脚本的正确性?您是否还需要将...【详细内容】
2021-12-22    51CTO  Tags:Liquibase   点击:(3)  评论:(0)  加入收藏
场景描述:由于生产环境的表比较复杂,字段很多。这里我们做下简化,只为说明今天要聊的问题。有两张表 tab1,tab2: tab1 数据如下: tab2 数据如下: 然后给你看下,我用来统计 name=&#3...【详细内容】
2021-12-20  Bald    Tags:SQL   点击:(5)  评论:(0)  加入收藏
前言知识无底,学海无涯,知识点虽然简单,但是比较多,所以将MySQL的基础写出来,方便自己以后查找,还有就是分享给大家。一、SQL简述1.SQL的概述Structure Query Language(结构化查...【详细内容】
2021-12-16  谣言止于独立思考    Tags:SQL基础   点击:(13)  评论:(0)  加入收藏
前言作为一名测试工程师,工作中在对测试结果进行数据比对的时候,或多或少要和数据库打交道的,要和数据库打交道,那么一些常用的 SQL 查询语法必须要掌握。最近有部分做测试小伙...【详细内容】
2021-12-14  柠檬班软件测试    Tags:SQL   点击:(15)  评论:(0)  加入收藏
话说C是面向内存的编程语言。数据要能存得进去,取得出来,且要考虑效率。不管是顺序存储还是链式存储,其寻址方式总是很重要。顺序存储是连续存储。同质结构的数组通过其索引表...【详细内容】
2021-12-08  小智雅汇    Tags:数据存储   点击:(17)  评论:(0)  加入收藏
概述DBConvert Studio 是一款强大的跨数据库迁移和同步软件,可在不同数据库格式之间转换数据库结构和数据。它将成熟、稳定、久经考验的 DBConvert 和 DBSync 核心与改进的现...【详细内容】
2021-11-17  雪竹聊运维    Tags:数据库   点击:(26)  评论:(0)  加入收藏
一、前言 大家好,我是小诚,《从0到1-全面深刻理解MySQL系列》已经来到第四章,这一章节的主要从一条SQL执行的开始,由浅入深的解析SQL语句由客户端到服务器的完整执行流程,最...【详细内容】
2021-11-09  woaker    Tags:SQL   点击:(35)  评论:(0)  加入收藏
相关文章
    无相关信息
最新更新
栏目热门
栏目头条