Apache Doris Colocate Join 原理实践教程
What Colocate Join
我们都知道Join的常见连接类型如下:
INNER JOINOUTER JOINCROSS JOINSEMI JOINANTI JOINJoin的常见算法实现包括:
Nested Loop JoinSort Merge JoinHash Join分布式系统实现连接数据分布的常用策略包括:
Shuffle JoinBroadcast JoinColocate/Local JoinColocate/Local Join是指多个节点加入时,没有数据移动和网络传输,每个节点只在本地加入。本地连接的前提是同一个连接键的数据分布在同一个节点。
Why Colocate Join
与混洗连接和广播连接相比,同位连接具有更高的性能,因为在查询期间没有数据的网络传输。在Doris的具体实现中,Colocate Join可以拥有比Shuffle Join更高的并发粒度,也可以显著提高Join的性能,后面会有说明。
How Colocate Join
核心思维
对于协同定位表,在任何情况下都应该保证数据的局部性。
具体包括:数据导入时保证数据本地性查询调度时保证数据本地性数据 balance 后保证数据本地性最复杂的实现是第三点:处理协同定位表的平衡。
术语定义
共生群
我们称一组具有相同共置属性的表为Group,下图中t1和t2具有相同的共置组。
协同定位父表
我们把决定一个组的数据分布的表叫做父表,下图中的t1就是同位父表。
协同定位子表
我们把一个组中除了父表以外的表都叫做子表,下图中的t2就是共置子表。
时段序列
如下所示,如果一个表有n个分区,那么每个分区的第m个桶的桶序列是m。
1导入数据时确保本地性。
Doris的分区如下:首先按照分区字段范围进行分区,然后按照指定的分布式Key Hash划分成桶:
所以导入数据时保证本地性的核心思想是两次映射。对于协同定位表,我们确保相同分布式键的数据映射到相同的Bucket Seq,然后确保相同Bucket Seq的Bucket映射到相同的BE。
具体来说,第一步:我们计算分布式密钥的哈希值,并对bucket num取模,以确保相同分布式密钥的数据映射到相同的Bucket Seq。
步骤2:将同一协同定位组下同一时段序列的所有时段映射到同一BE,如下所示:
Group 中所有 Table 的 Bucket Seq 和 BE 节点的映射关系和 Parent Table 一致Parent Table 中所有 Partition 的 Bucket Seq 和 BE 节点的映射关系和第一个 Partition 一致Parent Table 第一个 Partition 的 Bucket Seq 和 BE 节点的映射关系利用原生的 Round Robin 算法决定2协同定位连接查询计划
关于HashJoinFragment,由于Join的很多表都有数据局部性保证,所以可以去掉交换节点,避免网络传输,直接设置ScanNode为Hash Join节点的子节点。
3协同定位连接查询计划
调度目标:在协同定位连接中,所有ScanNode中具有相同存储桶序列的所有存储桶都被调度到同一个BE。
调度策略:第一个ScanNode的桶随机选择BE,其余scannode与第一个scan node一致。
4在时段序列级别的协同定位连接
目前,Doris的散列连接是服务器粒度的:
关于协同定位连接,由于同一个协同定位组下的同一个Bucket Seq的Bucket分布在同一个BE中,我们将连接的粒度从服务器粒度降低到Bucket Seq粒度:
5协同定位连接元数据维护
对于协同定位连接,我们需要维护以下核心元数据:
代码中,colocate group id 就是 colocate parent table idgroup2BackendsPerBucketSeq 代表每个 colocate group 中每个 bucket seq 映射到哪些 BE为了支持 balance,以及保证元数据的一致性,这些元数据都需要持久化6如何决定一个查询可以共存连接
Join 的 tables 是 colocate ableThe colocate group 是 stable 状态,没有 balancingJoin 的 Key 包含分桶的 Distributed Key7协同定位连接支持平衡
核心理念:
添加一个守护线程来处理并置表的平衡,让普通的平衡线程不处理并置表的平衡。
平衡时:
当添加、删除或关闭BE节点时。
天平的粒度:
正常余额的粒度是桶,但是对于同宿表,我们必须保证所有桶在同一个同宿组下的数据局部性,所以我们余额的单位是同宿组。
余额对查询的影响:
当共存组处于平衡状态时,共存连接将退化为原始的随机连接或广播连接。
平衡过程:
为需要复制或迁移的 Bucket 选择目标 BE标记 colocate group 的转态为 balancing对于需要复制或迁移的 Bucket,发起 Clone Job,Clone Job 会从 Bucket 的现有副本复制一个新副本目标 BE更新 backendsPerBucketSeq(维护 Bucket Seq 到 BE 映射关系的元数据)当一个 colocate group 下的所有 Clone Job 都完成时,标记 colocate group 的转态为 stable删除冗余的副本当一个BE节点被删除或长时间挂起时,选择目标BE的策略:
选择策略与正常平衡中的相同。考虑到集群的整体负载,尽量选择负载较低的BE。
添加BE节点时,选择目标BE的策略:
对于当前 colocate group,计算每个新增 BE 需要增加的 bucket seqs 个数:假如我们有 3 个 BE,8 个 bucket,每个 bucket 是 3 副本,则每个 BE 负责 8 个 bucket 副本,我们新增 1 个 BE 后,可以计算出每个 BE 负责的平均 bucket 副本数应该是 3 * 8 / 4 = 6,每个新增 BE 需要增加的 bucket seqs 个数为 6 / 1 = 6.对于每个 bucket seqs, 随机选择从哪个旧的 BE 迁移副本到新增的 BE。Colocate Join Performance
测试数据:
表A、B、C、B、C都有10天的数据,每天一个分区,每个分区有570万个数据。
测试集群:
4台低级物理机,每台24个CPU和96个内存
测试SQL:
SQL1:
select count(*) FROM A t1INNER JOIN [shuffle] B t5 ON ((t1.dt = t5.dt) AND (t1.id = t5.id))INNER JOIN [shuffle] C t6 ON ((t1.dt = t6.dt) AND (t1.id = t6.id))where t1.dt in (xxx days);SQL2:
select t1.dt, t1.id, t1.name, t1.second_id,t1.second_name,t5.id, t5.weight_time,t5.list,t6.ord_id, t6._idFROM A t1INNER JOIN B t5 ON ((t1.dt = t5.dt) AND (t1.id = t5.id))INNER JOIN C t6 ON ((t1.dt = t6.dt) AND (t1.id = t6.id))where t1.dt in (xxx days)limit 10000;SQL1的测试结果:
SQL2的测试结果:
可以看出,协同定位连接相对于混洗连接有明显的性能提升,而且集群规模越大,连接的数据量越多,协同定位连接的优势也会越明显。
How To Use Colocate Join
最新的社区代码已经支持Colocate Join,但默认情况下是关闭的。您只需在FE配置中将disable_colocate_join设置为false,即可启用Colocate Join。
使用时,只需要在创建表时添加colocate _ with属性即可。colocate_with的值可以设置为同一组colocate表中的任何一个,但是您需要确保首先创建colocate_with属性中的表。
如果需要共同定位连接表t1和t2,可以根据以下语句构建一个表:
CREATE TABLE `t1` ( `id` int(11) COMMENT "", `value` varchar(8) COMMENT "") ENGINE=OLAPDUPLICATE KEY(`id`)DISTRIBUTED BY HASH(`id`) BUCKETS 10PROPERTIES ("colocate_with" = "t1");CREATE TABLE `t2` ( `id` int(11) COMMENT "", `value` varchar(8) COMMENT "") ENGINE=OLAPDUPLICATE KEY(`id`)DISTRIBUTED BY HASH(`id`) BUCKETS 10PROPERTIES ("colocate_with" = "t1");协同定位连接目前受到限制
Colocate Table 必须是 OLAP 类型的表colocate_with 属性相同表的 BUCKET 数必须一样colocate_with 属性相同表的 副本数必须一样 (这个限制之后可能会去掉,但对用户应该没有实际影响)colocate_with 属性相同表的 DISTRIBUTED Columns 的数据类型必须一样协同定位连接适用场景
Colocate Join非常适合几个表按照同一个字段划分为桶,按照同一个字段进行高频连接的场景。比如很多电商的应用都是按照商家Id分桶,按照商家Id高频加入。
协同定位连接常见问题
总之,任何不能共址加入的场景都会自动退化为原来的洗牌加入或者广播加入。
Q1:你支持多表共存吗?
答:支持
Q2:你支持共存表和普通表连接吗?
答:支持
Q3:共存表支持用Join-bucket键连接吗?
答:是:随机加入或广播加入将用于不满足共存加入条件的加入。
Q4:如何根据协同定位连接来确定连接是否被执行?
在A: explain的结果中,如果Hash Join的子节点直接是OlapScanNode,并且没有交换节点,则说明是Colocate Join。
Q5:如何修改colocate_with属性?
a:更改表example_db.my_table集合(& quot与& quot= & quot目标表& quot);
问:如何禁用共存连接?
a:设置disable _ colocate _ join = true查询时,您可以禁用共存联接,并使用随机联接或广播联接。
总结
大部分支持Join的OLAP系统都会考虑支持Colocate Join,比如MemSQL、SnappyData、Ali AnalyticDB等。Ali AnalyticDB甚至在其数据模型中引入了表组的概念。一般来说,通过在数据导入、查询计划、查询调度和数据平衡中保证和考虑数据的局部性,同位连接可以在特定场景下显著地加快下行连接查询的速度,是一个非常有用的特性。
以上是Apache Doris Colocate Join原理练习教程的详细内容。关于Apache Doris co locate Join principle的更多信息,请关注脚本之家的其他相关文章!
如果您的问题还未解决可以联系站长付费协助。
有问题可以加入技术QQ群一起交流学习
本站vip会员 请加入无忧模板网 VIP群(50604020) PS:加入时备注用户名或昵称
普通注册会员或访客 请加入无忧模板网 技术交流群(50604130)
客服微信号:15898888535
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若内容侵犯了原著者的合法权益,可联系站长删除。