小知识:Apache Hudi性能提升三倍的查询优化

hudi 0.10.0版本开始,我们很高兴推出在数据库领域中称为 z-order 和 hilbert 空间填充曲线的高级数据布局优化技术的支持。

1. 背景

amazon emr 团队最近发表了一篇很不错的文章展示了对数据进行聚簇是如何提高查询性能的,为了更好地了解发生了什么以及它与空间填充曲线的关系,让我们仔细研究该文章的设置。

文章中比较了 2 个 apache hudi 表(均来自 amazon reviews 数据集):

未聚簇的 amazon_reviews 表(即数据尚未按任何特定键重新排序)

amazon_reviews_clustered 聚簇表。当数据被聚簇后,数据按字典顺序排列(这里我们将这种排序称为线性排序),排序列为star_rating、total_votes两列(见下图)

%小知识:Apache Hudi性能提升三倍的查询优化-猿站网-插图

为了展示查询性能的改进,对这两个表执行以下查询:

%小知识:Apache Hudi性能提升三倍的查询优化-1猿站网-插图

%小知识:Apache Hudi性能提升三倍的查询优化-2猿站网-插图

这里要指出的重要考虑因素是查询指定了排序的两个列(star_rating 和 total_votes)。但不幸的是这是线性/词典排序的一个关键限制,如果添加更多列,排序的价值会会随之减少。

%小知识:Apache Hudi性能提升三倍的查询优化-3猿站网-插图

从上图可以看到,对于按字典顺序排列的 3 元组整数,只有第一列能够对所有具有相同值的记录具有关键的局部性属性:例如所有记录都具有以“开头的值” 1″、”2″、”3″(在第一列中)很好地聚簇在一起。但是如果尝试在第三列中查找所有值为”5″的值,会发现这些值现在分散在所有地方,根本没有局部性,过滤效果很差。

提高查询性能的关键因素是局部性:它使查询能够显着减少搜索空间和需要扫描、解析等的文件数量。

但是这是否意味着如果我们按表排序的列的第一个(或更准确地说是前缀)以外的任何内容进行过滤,我们的查询就注定要进行全面扫描?不完全是,局部性也是空间填充曲线在枚举多维空间时启用的属性(我们表中的记录可以表示为 n 维空间中的点,其中 n 是我们表中的列数)

那么它是如何工作的?我们以 z 曲线为例:拟合二维平面的 z 阶曲线如下所示:

%小知识:Apache Hudi性能提升三倍的查询优化-4猿站网-插图

可以看到按照路径,不是简单地先按一个坐标 (“x”) 排序,然后再按另一个坐标排序,它实际上是在对它们进行排序,就好像这些坐标的位已交织成单个值一样:

%小知识:Apache Hudi性能提升三倍的查询优化-5猿站网-插图

在线性排序的情况下局部性仅使用第一列相比,该方法的局部性使用到所有列。

以类似的方式,希尔伯特曲线允许将 n 维空间中的点(我们表中的行)映射到一维曲线上,基本上对它们进行排序,同时仍然保留局部性的关键属性,在此处阅读有关希尔伯特曲线的更多详细信息,到目前为止我们的实验表明,使用希尔伯特曲线对数据进行排序会有更好的聚簇和性能结果。

现在让我们来看看它的实际效果!

2. 设置

我们将再次使用 amazon reviews 数据集,但这次我们将使用 hudi 按 product_id、customer_id 列元组进行 z-order排序,而不是聚簇或线性排序。

数据集不需要特别的准备,可以直接从 s3 中以 parquet 格式下载并将其直接用作 spark 将其摄取到 hudi 表。

启动spark-shell

?
1
2
3
4
./bin/spark-shell –master local[4] –driver-memory 8g –executor-memory 8g \
–jars ../../packaging/hudi-spark-bundle/target/hudi-spark3-bundle_2.12-0.10.0.jar \
–packages org.apache.spark:spark-avro_2.12:2.4.4 \
–conf spark.serializer=org.apache.spark.serializer.kryoserializer

导入hudi表

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.apache.hadoop.fs.{filestatus, path}
import scala.collection.javaconversions._
import org.apache.spark.sql.savemode._
import org.apache.hudi.{datasourcereadoptions, datasourcewriteoptions}
import org.apache.hudi.datasourcewriteoptions._
import org.apache.hudi.common.fs.fsutils
import org.apache.hudi.common.table.hoodietablemetaclient
import org.apache.hudi.common.util.clusteringutils
import org.apache.hudi.config.hoodieclusteringconfig
import org.apache.hudi.config.hoodiewriteconfig._
import org.apache.spark.sql.dataframe
import java.util.stream.collectors
val layoutoptstrategy = “z-order”; // or “hilbert”
val inputpath = s”file:///${system.getproperty(“user.home”)}/datasets/amazon_reviews_parquet”
val tablename = s”amazon_reviews_${layoutoptstrategy}”
val outputpath = s”file:///tmp/hudi/$tablename”
def safetablename(s: string) = s.replace(-, _)
val commonopts =
map(
“hoodie.compact.inline” -> “false”,
“hoodie.bulk_insert.shuffle.parallelism” -> “10”
)
////////////////////////////////////////////////////////////////
// writing to hudi
////////////////////////////////////////////////////////////////
val df = spark.read.parquet(inputpath)
df.write.format(“hudi”)
.option(datasourcewriteoptions.table_type.key(), cow_table_type_opt_val)
.option(“hoodie.table.name”, tablename)
.option(precombine_field.key(), “review_id”)
.option(recordkey_field.key(), “review_id”)
.option(datasourcewriteoptions.partitionpath_field.key(), “product_category”)
.option(“hoodie.clustering.inline”, “true”)
.option(“hoodie.clustering.inline.max.commits”, “1”)
// note: small file limit is intentionally kept _above_ target file-size max threshold for clustering,
// to force re-clustering
.option(“hoodie.clustering.plan.strategy.small.file.limit”, string.valueof(1024 * 1024 * 1024)) // 1gb
.option(“hoodie.clustering.plan.strategy.target.file.max.bytes”, string.valueof(128 * 1024 * 1024)) // 128mb
// note: were increasing cap on number of file-groups produced as part of the clustering run to be able to accommodate for the
// whole dataset (~33gb)
.option(“hoodie.clustering.plan.strategy.max.num.groups”, string.valueof(4096))
.option(hoodieclusteringconfig.layout_optimize_enable.key, “true”)
.option(hoodieclusteringconfig.layout_optimize_strategy.key, layoutoptstrategy)
.option(hoodieclusteringconfig.plan_strategy_sort_columns.key, “product_id,customer_id”)
.option(datasourcewriteoptions.operation.key(), datasourcewriteoptions.bulk_insert_operation_opt_val)
.option(bulk_insert_sort_mode.key(), “none”)
.options(commonopts)
.mode(errorifexists)

3. 测试

每个单独的测试请在单独的 spark-shell 中运行,以避免缓存影响测试结果。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
////////////////////////////////////////////////////////////////
// reading
///////////////////////////////////////////////////////////////
// temp table w/ data skipping disabled
val readdf: dataframe =
spark.read.option(datasourcereadoptions.enable_data_skipping.key(), “false”).format(“hudi”).load(outputpath)
val rawsnapshottablename = safetablename(s”${tablename}_sql_snapshot”)
readdf.createorreplacetempview(rawsnapshottablename)
// temp table w/ data skipping enabled
val readdfskip: dataframe =
spark.read.option(datasourcereadoptions.enable_data_skipping.key(), “true”).format(“hudi”).load(outputpath)
val dataskippingsnapshottablename = safetablename(s”${tablename}_sql_snapshot_skipping”)
readdfskip.createorreplacetempview(dataskippingsnapshottablename)
// query 1: total votes by product_category, for 6 months
def runquery1(tablename: string) = {
// query 1: total votes by product_category, for 6 months
spark.sql(s”select sum(total_votes), product_category from $tablename where review_date > 2013-12-15 and review_date < 2014-06-01 group by product_category”).show()
}
// query 2: average star rating by product_id, for some product
def runquery2(tablename: string) = {
spark.sql(s”select avg(star_rating), product_id from $tablename where product_id in (b0184xc75u) group by product_id”).show()
}
// query 3: count number of reviews by customer_id for some 5 customers
def runquery3(tablename: string) = {
spark.sql(s”select count(*) as num_reviews, customer_id from $tablename where customer_id in (53096570,10046284,53096576,10000196,21700145) group by customer_id”).show()
}
//
// query 1: is a “wide” query and hence its expected to touch a lot of files
//
scala> runquery1(rawsnapshottablename)
+—————-+——————–+
|sum(total_votes)|    product_category|
+—————-+——————–+
|         1050944|                  pc|
|          867794|             kitchen|
|         1167489|                home|
|          927531|            wireless|
|            6861|               video|
|           39602| digital_video_games|
|          954924|digital_video_dow…|
|           81876|             luggage|
|          320536|         video_games|
|          817679|              sports|
|           11451|  mobile_electronics|
|          228739|  home_entertainment|
|         3769269|digital_ebook_pur…|
|          252273|                baby|
|          735042|             apparel|
|           49101|    major_appliances|
|          484732|             grocery|
|          285682|               tools|
|          459980|         electronics|
|          454258|            outdoors|
+—————-+——————–+
only showing top 20 rows
scala> runquery1(dataskippingsnapshottablename)
+—————-+——————–+
|sum(total_votes)|    product_category|
+—————-+——————–+
|         1050944|                  pc|
|          867794|             kitchen|
|         1167489|                home|
|          927531|            wireless|
|            6861|               video|
|           39602| digital_video_games|
|          954924|digital_video_dow…|
|           81876|             luggage|
|          320536|         video_games|
|          817679|              sports|
|           11451|  mobile_electronics|
|          228739|  home_entertainment|
|         3769269|digital_ebook_pur…|
|          252273|                baby|
|          735042|             apparel|
|           49101|    major_appliances|
|          484732|             grocery|
|          285682|               tools|
|          459980|         electronics|
|          454258|            outdoors|
+—————-+——————–+
only showing top 20 rows
//
// query 2: is a “pointwise” query and hence its expected that data-skipping should substantially reduce number
// of files scanned (as compared to baseline)
//
// note: that linear ordering (as compared to space-curve based on) will have similar effect on performance reducing
// total # of parquet files scanned, since were querying on the prefix of the ordering key
//
scala> runquery2(rawsnapshottablename)
+—————-+———-+
|avg(star_rating)|product_id|
+—————-+———-+
|             1.0|b0184xc75u|
+—————-+———-+
scala> runquery2(dataskippingsnapshottablename)
+—————-+———-+
|avg(star_rating)|product_id|
+—————-+———-+
|             1.0|b0184xc75u|
+—————-+———-+
//
// query 3: similar to q2, is a “pointwise” query, but querying other part of the ordering-key (product_id, customer_id)
// and hence its expected that data-skipping should substantially reduce number of files scanned (as compared to baseline, linear ordering).
//
// note: that linear ordering (as compared to space-curve based on) will _not_ have similar effect on performance reducing
// total # of parquet files scanned, since were not querying on the prefix of the ordering key
//
scala> runquery3(rawsnapshottablename)
+———–+———–+
|num_reviews|customer_id|
+———–+———–+
|         50|   53096570|
|          3|   53096576|
|         25|   10046284|
|          1|   10000196|
|         14|   21700145|
+———–+———–+
scala> runquery3(dataskippingsnapshottablename)
+———–+———–+
|num_reviews|customer_id|
+———–+———–+
|         50|   53096570|
|          3|   53096576|
|         25|   10046284|
|          1|   10000196|
|         14|   21700145|
+———–+———–+

4. 结果

我们总结了以下的测试结果

%小知识:Apache Hudi性能提升三倍的查询优化-6猿站网-插图

可以看到多列线性排序对于按列(q2、q3)以外的列进行过滤的查询不是很有效,这与空间填充曲线(z-order 和 hilbert)形成了非常明显的对比,后者将查询时间加快多达 3倍 。值得注意的是性能提升在很大程度上取决于基础数据和查询,在我们内部数据的基准测试中,能够实现超过 11倍 的查询性能改进!

5. 总结

apache hudi v0.10 为开源带来了新的布局优化功能 z-order 和 hilbert。 使用这些行业领先的布局优化技术可以为用户查询带来显着的性能提升和成本节约!

以上就是apache hudi性能提升三倍的查询优化的详细内容,更多关于apache hudi查询优化的资料请关注服务器之家其它相关文章!

原文链接:https://www.cnblogs.com/leesf456/p/15972685.html

声明: 猿站网有关资源均来自网络搜集与网友提供,任何涉及商业盈利目的的均不得使用,否则产生的一切后果将由您自己承担! 本平台资源仅供个人学习交流、测试使用 所有内容请在下载后24小时内删除,制止非法恶意传播,不对任何下载或转载者造成的危害负任何法律责任!也请大家支持、购置正版! 。本站一律禁止以任何方式发布或转载任何违法的相关信息访客发现请向站长举报,会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。本网站的资源部分来源于网络,如有侵权烦请发送邮件至:2697268773@qq.com进行处理。
建站知识

小知识:OnZoom基于Apache Hudi的一体架构实践解析

2023-3-10 15:51:09

建站知识

小知识:Apache Hudi灵活的Payload机制硬核解析

2023-3-10 16:06:01

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索