Apache Doris Join 优化原理详解
背景 & 目标
掌握 Apache Doris Join 优化手段及其实现原理为代码阅读提供理论基础Doris 数据划分
不同的加入方式取决于对Doris中数据分区的透彻理解。所以这里先列出必要的基础知识。
首先,在Doris中,数据是以表格的形式进行逻辑描述的。
在Doris的存储引擎中,用户数据被横向划分为若干个数据片(也称为数据桶)。每个数位板包含几个数据行。各个平板的数据之间没有交集,物理上独立存储。
一个平板电脑只属于一个数据分区。一个分区包含几个平板电脑。因为平板电脑是物理独立存放的,所以可以认为隔板也是物理独立的。平板电脑是数据移动、复制和其他操作的最小物理存储单元。
几块隔板组成一张桌子。分区可以被视为最小的逻辑管理单元。只能对一个分区进行数据的导入和删除。
Doris支持两层数据分区。第一层是Partition,支持范围和列表的划分。第二层是Bucket(Tablet),只支持哈希分区。您也可以只使用一层分区。当使用一层分区时,仅支持桶分区。
解释下图中桌子、分区和桶(平板电脑)之间的关系:
Table 按照 Range 的方式按照 date 字段进行分区,得到了 N 个 Partition每个 Partition 通过相同的 Hash 方式将其中的数据划分为 M 个 Bucket(Tablet)从逻辑上来说,Bucket 1 可以包含 N 个 Partition 中划分得到的数据,比如下图中的 Tablet 11、Tablet 21、Tablet N1特别注意:
Doris中分区和桶的定义可能与其他一些数据库系统不同。下面是一个特定的表构建语句作为示例:
CREATE TABLE IF NOT EXISTS example_db.expamle_range_tbl( `user_id` LARGEINT NOT NULL COMMENT "用户id", `date` DATE NOT NULL COMMENT "数据灌入日期时间", `timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳", `city` VARCHAR(20) COMMENT "用户所在城市", `age` SMALLINT COMMENT "用户年龄", `sex` TINYINT COMMENT "用户性别", `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间", `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间")ENGINE=OLAPAGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)PARTITION BY RANGE(`date`)( PARTITION `p201701` VALUES LESS THAN ("2017-02-01"), PARTITION `p201702` VALUES LESS THAN ("2017-03-01"), PARTITION `p201703` VALUES LESS THAN ("2017-04-01"))DISTRIBUTED BY HASH(`user_id`) BUCKETS 16PROPERTIES( "replication_num" = "3");绿色:分区,在本例中,日期字段用于分区。
蓝色突出显示:Bucket,在本例中,user_id字段用作分发列表。
划分
Partition 列可以指定一列或多列,分区列必须为 KEY 列分区数量理论上没有上限当不使用 Partition 建表时,系统会自动生成一个和表名同名的,全值范围的 Partition。该 Partition 对用户不可见,并且不可删改创建分区时,不能添加具有重叠范围的分区。
有两种分区方式:
分区方式一般用法Range通常按时间分区,以方便地管理新旧数据List支持的类型更丰富,分区值为枚举值。只有当数据为目标分区枚举值其中之一时,才可以命中分区水桶
如果使用了 Partition,则 DISTRIBUTED 语句描述的是数据在各个分区内的划分规则。如果不使用 Partition,则描述的是对整个表的数据划分规则分桶列的选择,是在 查询吞吐 和 查询并发 之间的一种权衡:如果选择多个分桶列,则数据分布更均匀。如果一个查询条件不包含所有分桶列的等值条件(意味着无法做桶裁剪以减少数据查询范围),那么该查询会触发所有分桶同时扫描,这样查询的吞吐会增加,单个查询的延迟随之降低。这个方式适合大吞吐低并发的查询场景如果仅选择一个或少数分桶列,则对应的点查询可以仅触发一个分桶扫描(意味着可以做桶裁剪以减少数据查询范围)。此时,当多个点查询并发时,这些查询有较大的概率分别触发不同的分桶扫描,各个查询之间的 IO 影响较小,尤其当不同桶分布在不同磁盘上时),所以这种方式适合高并发的点查询场景分桶的数量理论上没有上限Join 方式
纵观全局
作为一个分布式MPP数据库,在加入的过程中需要对数据进行混洗。需要对数据进行分割和调度,以确保最终的连接结果是正确的。举个简单的例子,假设关系S和R是join,N代表join计算中涉及的节点数;t代表关系的元组数。
目前多丽丝支持以上四种加入方式。这四种方法的灵活性和适用性由高到低,对数据分布的要求也越来越严格。但是通过减少网络开销,Join计算的性能越来越好。
连接方式的选择是FE生成分布式计划阶段需要考虑的问题之一。FE做分布式计划的时候,优先级顺序是(总是优先选择期望性能最好的):协同定位Join->:Bucket Shuffle Join->;广播加入-& gt;随机加入.
共存和桶混洗不能兼得。当它们无法使用时,多丽丝会自动尝试广播加入,如果估计的小桌子太大,会自动切换到洗牌加入。
但是,用户可以通过显式提示来强制所需的联接类型,例如:
select * from test join [shuffle] baseall on test.k1 = baseall.k1;广播/随机加入
原理比较简单,这里就不展开了。
桶洗牌加入
当Join条件命中左表的数据分发列表时,广播和Shuffle Join会产生不必要的网络传输开销。Bucket Shuffle Join旨在解决这类问题。通过优化左表的本地计算,可以减少左表数据在节点间的传输时间,从而加快查询速度。
在上面的例子中,Join的等价表达式命中了表A(左边的表)的数据分布列表。Bucket Shuffle Join会根据表A的数据分布信息将表B(右表)的数据发送到表A对应的数据计算节点。
定性分析:
降低了网络与内存开销(相比 Broadcast 以及 Shuffle Join 都不会更差),使一类 Join 查询有更好的性能。尤其是当 FE 能够执行左表的分区裁剪与桶裁剪时与 Colocate Join 不同,它对于表的数据分布方式没有侵入性,对于用户来说是透明的。对于表的数据分布没有强制性的要求(体现在建表语句中不需要显式地设置 colocate_with 属性),不容易导致数据倾斜的问题可以为 Join Reorder 提供更多可能的优化空间计划规则
Bucket Shuffle Join 只生效于 Join 条件为等值的场景,原因与 Colocate Join 类似,它们都依赖 Hash 来计算确定的数据分布在等值 Join 条件之中包含两张表的分桶列,当左表的分桶列为等值的 Join 条件时,它有很大概率会被规划为 Bucket Shuffle Join由于不同的数据类型的 Hash 值计算结果不同,所以 Bucket Shuffle Join 要求左表的分桶列的类型与右表等值 Join 列的类型需要保持一致,否则无法进行对应的规划Bucket Shuffle Join 只作用于 Doris 原生的 OLAP 表,对于 ODBC,MySQL,ES 等外表,当其作为左表时是无法规划生效的对于分区表,由于每一个分区的数据分布规则可能不同,所以 Bucket Shuffle Join 只能保证左表为单分区时生效。所以在 SQL 执行之中,需要尽量使用 where 条件使分区裁剪的策略能够生效假如左表为 Colocate 的表,那么它每个分区的数据分布规则是确定的,Bucket Shuffle Join 能在Colocate 表上表现更好共址联接
可以理解为在数据分布满足一定条件的前提下,减少所有不必要的网络传输开销,实现完全计算本地化,加快查询速度。同时,由于没有网络传输开销,be节点可以有更高的并发性,从而进一步提高加入性能。
要理解这种算法,首先需要知道两个术语:
Colocation Group(CG):一个 CG 中会包含一张及以上的 Table。在同一个 Group 内的 Table 有着相同的 Colocation Group Schema,并且有着相同的数据分片分布Colocation Group Schema(CGS):用于描述一个 CG 中的 Table,和 Colocation 相关的通用 Schema 信息。包括分桶列类型,分桶数以及副本数等桶序列的概念:
根据桶列值散列和桶的数量,一个表的数据最终将落入一个桶中。假设一个表的桶号是8,那么就有[0,1,2,3,4,5,6,7] 8个桶。我们称这样的序列为buckets序列。每个桶中会有一个或多个数据板。当表是单个分区表时,桶中只有一个Tablet。如果是多分区表,就会有多个(因为多个分区中的不同平板会被分到同一个桶中)。
Colorjoin功能是将一组具有相同CG的表格组合成一个CG。并且确保对应于这些表的数据切片将落在相同的BE节点上。当CG中的表以桶列连接时,本地数据可以直接连接,这样可以减少节点间数据的传输时间。
[div][div]
因此,关键问题就变成了“如何保证这些表对应的数据切片会落在同一个BE节点上?”?」
以下属性必须在同一个CG through表中实现:
分桶列和分桶数桶列,即DISTRIBUTED BY HASH(col1,col2,...)在建表的语句中。桶列决定了哪些列用于将表中的数据散列到不同的表中。同一个CG中的表必须保证桶列的类型和数量完全相同,桶号相同,这样才能保证多个表的数据片能够被一一分配和控制。
副本数同一CG中所有表的所有分区的副本数量必须相同。如果不一致,可能会出现某个Tablet的副本,而同一个BE上没有其他table slices的副本。但是,对于同一CG中的表,分区的数量和范围以及分区列的类型不需要一致。
[div][div]
在固定了bucket列和bucket号之后,同一CG中的表将具有相同的BucketsSequence。但是,副本的数量决定了每个桶中药片的副本数量,以及它们存储在哪个BEs上。假设BucketsSequence是[0,1,2,3,4,5,6,7]并且有[a,b,c,d]四个BE节点。那么可能的数据分布如下:
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+| 0 | | 1 | | 2 | | 3 | | 4 | | 5 | | 6 | | 7 |+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+| A | | B | | C | | D | | A | | B | | C | | D || | | | | | | | | | | | | | | || B | | C | | D | | A | | B | | C | | D | | A || | | | | | | | | | | | | | | || C | | D | | A | | B | | C | | D | | A | | B |+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+CG中所有表的数据都会按照上述规则均匀分布,从而保证所有桶和列值相同的数据都在同一个BE节点上,可以进行本地数据Join。其核心思想是“二次映射”,保证同一个分布式Key的数据会被映射到同一个桶序列,然后保证桶序列对应的桶被映射到同一个BE节点:
您可以通过查询计划检查查询是否使用了协同定位连接。同时,计划交换节点被删除,ScanNode将被直接设置为Hash Join节点的子节点。
DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);-- 在 Hash Join 节点会显示:-- colocate: true/falseColocate Join非常适合将几个表按照相同的字段划分为桶,并按照固定的字段高频连接的场景。这样可以提前将数据存储在同一个桶中,实现本地计算。
Runtime Filter 优化
Doris在进行哈希连接计算时会在右表中构建一个哈希表,左表会流经右表的哈希表得到连接结果。运行时过滤器充分利用哈希表构建阶段对表做一些额外的事情。
在右表生成哈希表时,还会生成一个基于哈希表数据的过滤条件,然后下推到左表的数据扫描节点。这样,Doris可以在运行时过滤数据。
如果左表是大表,右表是小表,那么下推到左表的过滤条件可以在读取数据时提前过滤掉Join层的大部分待过滤数据(如果可以下推到引擎层,也可以使用Doris的键列过滤延迟物化),从而大大提高Join查询的性能。
运行时过滤器在查询规划期间生成,内置于HashJoinNode中,并在ScanNode中应用。比如T1(行数10w)和T2(行数2k)的Join运算:
| > HashJoinNode <| | || | 100000 | 2000| | || OlapScanNode OlapScanNode| ^ ^ | | 100000 | 2000| T1 T2|显然,扫描T2数据比T1快得多。如果我们在扫描T1之前主动等待一段时间,在T2把扫描的数据记录交给hashjonode之后,hashjonode根据T2数据计算一个过滤条件,比如T2数据的最大值和最小值,或者构造一个Bloom Filter。然后把这个过滤条件发送给等待扫描T1的ScanNode,后者应用这个过滤条件,把过滤后的数据交给HashJoinNode,这样就减少了探针哈希表的次数和网络开销。该过滤条件为运行时过滤,效果如下:
| > HashJoinNode <| | || | 6000 | 2000| | || OlapScanNode OlapScanNode| ^ ^ | | 100000 | 2000| T1 T2|如果可以将运行时过滤器下推到存储引擎,在某些情况下,可以使用索引(例如连接列是关键列,可以使用延迟物化能力)直接减少扫描数据量,从而大大减少扫描时间。效果如下:
| > HashJoinNode <| | || | 6000 | 2000| | || OlapScanNode OlapScanNode| ^ ^ | | 6000 | 2000| T1 T2|可以看出,与谓词下推和分区裁剪不同,运行时过滤是运行时动态生成的过滤条件,即在查询运行时,解析连接条件确定过滤表达式,并将表达式下推至读取左表的ScanNode,从而减少扫描数据量,进一步减少探针哈希表数量,避免不必要的IO和网络传输。由于其运行时有效的特性,它被正式视为自适应查询执行的应用程序。
根据上面的例子可以推断,当场景满足以下条件时,使用运行时滤镜的效果会更好:
左表大右表小(当右表上还有额外的过滤条件会更理想),因为构建 Runtime Filter 是需要承担计算成本的,包括一些内存的开销,而开销直接取决于右表的实际数据量左右表 Join 出来的结果很少,说明通过 Runtime Filter 可以过滤掉左表的绝大部分数据Doris支持三种运行时过滤器:
一种是 IN,很好理解,将一个 hashset 下推到数据扫描节点。第二种就是 BloomFilter,就是利用哈希表的数据构造一个 BloomFilter,然后把这个 BloomFilter 下推到查询数据的扫描节点。最后一种就是 MinMax,就是个 Range 范围,通过右表数据确定 Range 范围之后,下推给数据扫描节点。工作原理和优缺点总结如下:
Runtime Filter 类型工作原理适用场景优点缺点IN子查询的方式,实现上是将一个 Hashset 下推到 Scan 节点Broadcast Join开销小,过滤效果明显且快速右表超过一定数据量时会失效,目前 Doris 配置的阈值是 1024Min/Max通过右表构建一个 Range 范围,然后将它下推到 Scan 节点通用开销小仅对数值类型有效果;对数值以外类型无法使用BloomFilter通过右表构建一个 BloomFilter,然后将它下推到 Scan 节点通用通用性较好,适用于各种类型、效果也较好配置比较复杂且计算成本较高;当过滤率较低或者左表数据较少时,可能导致性能降低一些使用注意事项(比较细节,考虑后面结合代码进一步理解):
打开运行时过滤器后,左表中的ScanNode在扫描数据前,会为分配给自己的每个运行时过滤器等待一段时间,也就是说,如果ScanNode被分配了三个运行时过滤器,最多等待3000ms。
因为构建和合并Runtimefilters需要时间,所以ScanNode会尝试将在等待时间内到达的Runtimefilters推送到存储引擎。如果超过等待时间,ScanNode将使用已经到达的Runtimefilters直接开始扫描数据。
如果运行时过滤器在ScanNode开始扫描之后到达,则ScanNode不会将运行时过滤器下推到存储引擎,而是基于ScanNode上的运行时过滤器过滤从存储引擎扫描的数据,并且不会将运行时过滤器应用于先前扫描的数据,这样获得的中间数据的大小将大于最优解,但可以避免严重恶化。
如果集群很忙,并且集群上有许多资源密集型或耗时的查询,您可以考虑增加等待时间,以避免复杂查询错过优化机会。如果集群负载较轻,并且集群上有许多只需要几秒钟的小查询,那么可以考虑减少等待时间,以避免每个查询增加1s的延迟。
Join Reorder 优化
有了前面两个表连接的运行时过滤器铺垫,再加上连接重新排序的优化,逻辑关系就可以理顺了。
Doris目前的Join Reorder算法是基于RBO的,其逻辑描述如下:
尽量让大表跟小表做 Join,它生成的中间结果是尽可能小的把有条件的 Join 表往前放,也就是说尽量让有条件的 Join 表进行过滤Hash Join 的优先级高于 Nest Loop Join,因为 Hash join 本身是比 Nest Loop Join 快很多的可以发现,前两个是朝着让“右表”更小的方向优化的,而最后一个是从算法的性能上考虑的。
Join 调优建议
Join 列最好是相同的简单类型;同类型避免 Cast 操作,简单类型则有不错的 Join 计算性能Join 列最好是 Key 列,原因是 Key 列能够充分利用 Doris 延迟物化的特性,减少 IO 提升性能大表之间的 Join 最好能够利用上 Colocate,相当于已经做好了预 Shuffle,实际查询的时候可以直接 Join 计算不再有 Shuffle 操作,彻底避免了大表的 Shuffle 网络开销利用 Runtime Filter,Join 过滤性高时效果显著。根据 3 种 Runtime Filter 特点选择最适合的涉及多表 Join,需要判断 Join 的合理性。尽量保证“左大右小”的原则,HashJoin 优于 NLJ。必要时可以通过 SQL Rewrite,通过 Hint 来调整 Join 顺序裁判员
https://www.jb51.net/article/266004.htm
https://www.jb51.net/article/266000.htm
以上是Apache Doris加入优化原理的详细内容。更多关于Apache Doris Join优化的信息,请关注脚本之家的其他相关文章!
如果您的问题还未解决可以联系站长付费协助。
有问题可以加入技术QQ群一起交流学习
本站vip会员 请加入无忧模板网 VIP群(50604020) PS:加入时备注用户名或昵称
普通注册会员或访客 请加入无忧模板网 技术交流群(50604130)
客服微信号:15898888535
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若内容侵犯了原著者的合法权益,可联系站长删除。