Flink SQL 在字节跳动的优化与实践

中台 数据  收藏
0 / 251

摘要:本文由 Apache Flink Committer,字节跳动架构研发工程师李本超分享,以四个章节来介绍 Flink 在字节的应用实战。 内容如下:

  1. 整体介绍

  2. 实践优化

  3. 流批一体

  4. 未来规划

Tips:点击文末「阅读原文」可查看作者原版分享视频~

一、整体介绍

图片

2018 年 12 月 Blink 宣布开源,经历了约一年的时间 Flink 1.9 于 2019 年 8 月 22 发布。在 Flink 1.9 发布之前字节跳动内部基于 master 分支进行内部的 SQL 平台构建。经历了 2~3 个月的时间字节内部在 19 年 10 月份发布了基于 Flink 1.9 的 Blink planner 构建的 Streaming SQL 平台,并进行内部推广。在这个过程中发现了一些比较有意思的需求场景,以及一些较为奇怪的 BUG。

基于 1.9 的 Flink SQL 扩展

虽然最新的 Flink 版本已经支持 SQL 的 DDL,但 Flink 1.9 并不支持。字节内部基于 Flink 1.9 进行了 DDL 的扩展支持以下语法:

  1. create table

  2. create view

  3. create function

  4. add resource

同时 Flink 1.9 版本不支持的 watermark 定义在 DDL 扩展后也支持了。

我们在推荐大家尽量的去用 SQL 表达作业时收到很多“SQL 无法表达复杂的业务逻辑”的反馈。时间久了发现其实很多用户所谓的复杂业务逻辑有的是做一些外部的 RPC 调用,字节内部针对这个场景做了一个 RPC 的维表和 sink,让用户可以去读写 RPC 服务,极大的扩展了 SQL 的使用场景,包括 FaaS 其实跟 RPC 也是类似的。在字节内部添加了 Redis/Abase/Bytable/ByteSQL/RPC/FaaS 等维表的支持。

同时还实现了多个内部使用的 connectors:

  1. source: RocketMQ

  2. sink: RocketMQ/ClickHouse/Doris/LogHouse/Redis/Abase/Bytable/ByteSQL/RPC/Print/Metrics

并且为 connector 开发了配套的 format:PB/Binlog/Bytes。

在线的界面化 SQL 平台

图片

除了对 Flink 本身功能的扩展,字节内部也上线了一个 SQL 平台,支持以下功能:

  • SQL 编辑

  • SQL 解析

  • SQL 调试

  • 自定义 UDF 和 Connector

  • 版本控制

  • 任务管理

二、实践优化

除了对功能的扩展,针对 Flink 1.9 SQL 的不足之处也做了一些优化。

Window 性能优化

1、支持了 window Mini-Batch

Mini-Batch 是 Blink planner 的一个比较有特色的功能,其主要思想是积攒一批数据,再进行一次状态访问,达到减少访问状态的次数降低序列化反序列化的开销。这个优化主要是在 RocksDB 的场景。如果是 Heap 状态 Mini-Batch 并没什么优化。在一些典型的业务场景中,得到的反馈是能减少 20~30% 左右的 CPU 开销。

2、扩展 window 类型

目前 SQL 中的三种内置 window,滚动窗口、滑动窗口、session 窗口,这三种语意的窗口无法满足一些用户场景的需求。比如在直播的场景,分析师想统计一个主播在开播之后,每一个小时的 UV(Unique Visitor)、GMV(Gross Merchandise Volume) 等指标。自然的滚动窗口的划分方式并不能够满足用户的需求,字节内部就做了一些定制的窗口来满足用户的一些共性需求。

-- my_window 为自定义的窗口,满足特定的划分方式
SELECT
room_id,
COUNT(DISTINCT user_id)
FROM MySource
GROUP BY
room_id,
my_window(ts, INTERVAL '1' HOURS)

3、window offset

这是一个较为通用的功能,在 Datastream API 层是支持的,但 SQL 中并没有。这里有个比较有意思的场景,用户想要开一周的窗口,一周的窗口变成了从周四开始的非自然周。因为谁也不会想到 1970 年 1 月 1 号那天居然是周四。在加入了 offset 的支持后就可以支持正确的自然周窗口。

SELECT
room_id,
COUNT(DISTINCT user_id)
FROM MySource
GROUP BY
room_id,
TUMBLE(ts, INTERVAL '7' DAY, INTERVAL '3', DAY)

维表优化

1、延迟 Join

维表 Join 的场景下因为维表经常发生变化尤其是新增维度,而 Join 操作发生在维度新增之前,经常导致关联不上。

所以用户希望如果 Join 不到,则暂时将数据缓存起来之后再进行尝试,并且可以控制尝试次数,能够自定义延迟 Join 的规则。这个需求场景不单单在字节内部,社区的很多同学也有类似的需求。

基于上面的场景实现了延迟 Join 功能,添加了一个可以支持延迟 Join 维表的算子。当 Join 没有命中,local cache 不会缓存空的结果,同时将数据暂时保存在一个状态中,之后根据设置定时器以及它的重试次数进行重试。

图片

2、维表 Keyby 功能

图片

通过拓扑我们发现 Cacl 算子和 lookUpJoin 算子是 chain 在一起的。因为它没有一个 key 的语义。

当作业并行度比较大,每一个维表 Join 的 subtask,访问的是所有的缓存空间,这样对缓存来说有很大的压力。

但观察 Join 的 SQL,等值 Join 是天然具有 Hash 属性的。直接开放了配置,运行用户直接把维表 Join 的 key 作为 Hash 的条件,将数据进行分区。这样就能保证下游每一个算子的 subtask 之间的访问空间是独立的,这样可以大大的提升开始的缓存命中率。

除了以上的优化,还有两点目前正在开发的维表优化。

  1. 广播维表:有些场景下维表比较小,而且更新不频繁,但作业的 QPS 特别高。如果依然访问外部系统进行 Join,那么压力会非常大。并且当作业 Failover 的时候 local cache 会全部失效,进而又对外部系统造成很大访问压力。那么改进的方案是定期全量 scan 维表,通过Join key hash 的方式发送到下游,更新每个维表 subtask 的缓存。

  2. Mini-Batch:主要针对一些 I/O 请求比较高,系统又支持 batch 请求的能力,比如说 RPC、HBase、Redis 等。以往的方式都是逐条的请求,且 Async I/O 只能解决 I/O 延迟的问题,并不能解决访问量的问题。通过实现 Mini-Batch 版本的维表算子,大量降低维表关联访问外部存储次数。

Join 优化

目前 Flink 支持的三种 Join 方式;分别是 Interval Join、Regular Join、Temporal Table Function。

前两种语义是一样的流和流 Join。而 Temporal Table 是流和表的的 Join,右边的流会以主键的形式形成一张表,左边的流去 Join 这张表,这样一次 Join 只能有一条数据参与并且只返回一个结果。而不是有多少条都能 Join 到。

它们之间的区别列了几点:

图片

可以看到三种 Join 方式都有它本身的一些缺陷。

  1. Interval Join 目前使用上的缺陷是它会产生一个 out join 数据和 watermark 乱序的情况。

  2. Regular Join 的话,它最大的缺陷是 retract 放大(之后会详细说明这个问题)。

  3. Temporal table function 的问题较其它多一些,有三个问题。

  • 不支持 DDl

  • 不支持 out join 的语义 (FLINK-7865 的限制)

  • 右侧数据断流导致 watermark 不更新,下游无法正确计算 (FLINK-18934)

对于以上的不足之处字节内部都做了对应的修改。

增强 Checkpoint 恢复能力

对于 SQL 作业来说一旦发生条件变化都很难从 checkpoint 中恢复。

SQL 作业确实从 checkpoint 恢复的能力比较弱,因为有时候做一些看起来不太影响 checkpoint 的修改,它仍然无法恢复。无法恢复主要有两点;

  • 第一点:operate ID 是自动生成的,然后因为某些原因导致它生成的 ID 改变了。

  • 第二点:算子的计算的逻辑发生了改变,即算子内部的状态的定义发生了变化。

例子1:并行度发生修改导致无法恢复。

图片

source 是一个最常见的有状态的算子,source 如果和之后的算子的 operator chain 逻辑发生了改变,是完全无法恢复的。

下图左上是正常的社区版的作业会产生的一个逻辑, source 和后面的并行度一样的算子会被 chain 在一起,用户是无法去改变的。但算子并行度是常会会发生修改,比如说 source 由原来的 100 修改为 50,cacl 的并发是 100。此时 chain 的逻辑就会发生变化。

图片

针对这种情况,字节内部做了修改,允许用户去配置,即使 source 的并行度跟后面整体的作业的并行度是一样的,也让其不与之后的算子 chain 在一起。

例子2:DAG 改变导致无法恢复。

图片

这是一种比较特殊的情况,有一条 SQL (上图),可以看到 source 没有发生变化,之后的三个聚合互相之间没有关系,状态竟然也是无法恢复。

作业之所以无法恢复,是因为 operator ID 生成规则导致的。目前 SQL 中 operator ID 的生成的规则与上游、本身配置以及下游可以 chain 在一起的算子的数量都有关系。 因为新增指标,会导致新增一个 Calc 的下游节点,进而导致 operator ID 发生变化。

为了处理这种情况,支持了一种特殊的配置模式,允许用户配置生成 operator ID 的时候可以忽略下游 chain 在一起算子数量的条件。

例子3:新增聚合指标导致无法恢复

这块是用户诉求最大的,也是最复杂的部分。用户期望新增一些聚合指标后,原来的指标要能从 checkpoint 中恢复。

图片

** 四、未来工作和规划**

**优化 **retract 放大问题

图片

什么是 retract 放大?

上图有 4 张表,第一张表进行去重操作 (Dedup),之后分别和另外三张表做 Join。逻辑比较简单,表 A 输入(A1),最后产出 (A1,B1,C1,D1) 的结果。

当表 A 输入一个 A2,因为 Dedup 算子,导致数据需要去重,则向下游发送一个撤回 A1 的操作 -(A1) 和一个新增 A2 的操作 +(A2)。第一个 Join 算子收到 -(A1) 后会将 -(A1) 变成 -(A1,B1) 和 +(null,B1)(为了保持它认为的正确语义) 发送到下游。之后又收到了 +(A2) ,则又向下游发送 -(null,B1) 和 +(A2,B1) 这样操作就放大了两倍。再经由下游的算子操作会一直被放大,到最终的 sink 输出可能会被放大 1000 倍之多。

图片

如何解决?

将原先 retract 的两条数据变成一条 changelog 的格式数据,在算子之间传递。算子接收到 changelog 后处理变更,然后仅仅向下游发送一个变更 changelog 即可。

未来规划

图片

1.功能优化

  • 支持所有类型聚合指标变更的 checkpoint 恢复能力

  • window local-global

  • 事件时间的 Fast Emit

  • 广播维表

  • 更多算子的 Mini-Batch 支持:维表,TopN,Join 等

  • 全面兼容 Hive SQL 语法

2.业务扩展

  • 进一步推动流式 SQL 达到 80%

  • 探索落地流批一体产品形态

  • 推动实时数仓标准化