一、Hadoop
1.1 Map 端
- 增大环形缓冲区大小
- 增大环形缓冲区的溢写比例
- 使用 Combiner 预聚合
1.2 Reduce 端
-
合理设置 Map 和 Reduce 的个数
太多:会导致任务间竞争资源,造成任务等待超时。
太少:会造成 Task 等待。
-
设置 Map、Reduce 共存
-
规避使用 Reduce
-
增加每个 Reduce 去 Map 中拿数的并行度
-
增大 Reduce 的资源
-
使用压缩,减少 IO(LZO、Snappy)
1.3 数据倾斜
- Map 端进行预聚合
- 二次 MR(局部聚合 + 全局聚合)
- 增大 Reduce 的并行度
- 自定义分区器
二、Hive
2.1 Map 端
- 合并小文件
- 合理设置 Map 数据
- 开启 JVM 重用
- 调节 Map Task 的内存和 CU 资源大小
2.2 Reduce
- 合理设置 Reduce 数(生产文件数据大小)
- 调节 Reduce Task 的内存和 CU 资源大小
2.3 Join
- 大小表(Map Join)
- 大表大表(Sort Merge Bucket (SMB) Join)
2.4 数据倾斜优化
-
count(distinct xxx) 改为 group by+count
-
开启 skew 参数(partition 打散-预聚合 + 全局聚合)
-
Reduce 倾斜
- 过滤多余的数据 -- 不应影响业务数据
- key 打散,二次 MR
-
Join
-
大小表:Map Join
-
大表大表
- 单 Key:拆分 + union all
- 多 Key:打散 + 膨胀 + 局部 + 全局聚合
-
2.5 CBO 优化
- 谓词下推
三、Spark
3.1 资源优化
3.1.1 内存模型
可用内存
-
统一内存(60%)-- 动态
- 存储内存(Storage):广播变量、cache 缓存
- 执行内存(Execution):shuffle 产出的中间数据(可强制回收)
-
其他内存(40%):Spark 元数据或者自定义数据结构
预留内存:300M
3.1.2 资源优化
内存优化
每个 Executor 内存
- Storage 内存=广播变量 +(cache/Executor 数量)
- Executor 内存=Executor 核数(并发)*(总文件大小/并行度)
- Other 内存=自定义数据结构 * 每个 Executor 核数
案例:executor=4*100G/200=2G。100G 数据,并行度 200 个,每个 executor 4 个 core。经验值:128M -> 1G 内存
CPU 资源优化
1 core 处理 1-3 task
1 task 处理 1-5G 数据
参照:Spark 官方文档,建议设置并行度为总 core 的 2-3 倍为最佳
案例:已有资源 100G 数据,excutors:3,excutors-cores:4。并行度的设置=34(1~2)=12-24 个为合理。
3.2 并行度优化
- 调整算子并行度(repartition、coalecse、reduceByKey 和 groupByKey 等)
- 自定义分区器
3.3 代码优化
- 尽量复用 RDD,避免创建重复的 RDD
- 对重复使用的 RDD 使用持久化
- 避免使用 shuffle 类算子(广播 join)
- 使用序列化(Kryo)
3.4 数据倾斜优化
- 单表-key 打散,二次聚合
- Join-大小表-广播 Join
- Join-大大表-过滤大 key+union,局部加全局
- Join-大大表-大表打散-小表扩容,局部加全局
3.5 Map 端
-
Map 端预聚合(框架自己已经处理)
-
读取小文件优化
- 每个分区最大字节数:默认 128M。
- 打开文件开销:默认 4M(设置为接近小文件的大小)。
- 最大切片大小:min(分区最大字节数默认值),max(文件开销,计算数)
- 计算数=总文件大小/分区数
- 总文件大小=小文件数据总大小 + 小文件数量*文件开销(4M)
-
增加 Map 溢写时输出流的缓冲区(buffer)(默认:32KB)(5M,申请 2*前内存(无)-> 溢写)
3.6 Reduce 端
合理设置 Reduce 的数据
- reduce 数量=shuffle 后的分区数=并行度(和 core&并行度有关)
- 并行度和并发度,正常设置为:并行度=2-3 倍的并发度
小文件优化
-
Spark SQL 文件数=shuffle 的并行度=默认 200
-
Join 后的结果插入新表
- 使用 coalesce 算子缩小分区数(不影响 shuffle 的并行度)
- 调整 shuffle 的并行(会影响 shuffle 的并行度)
-
动态分区插入数据
- 没有 shuffle:主动 shuffle,使用 distribute by 分区字段进行文件合并(注意倾斜,有大 Key,打散拆分多个文件)
- 有 shuffle
其他优化
- 增大 reduce 端的拉取次数
- 合理利用 bypass -- 避免排序消耗(reduce read task(shuffle 分区数)<200+ 避免 map 端预聚合(map-side aggregation))
3.7 语法优化
- RBO-谓词下推
- RBO-行列过滤
- RBO-常量替换(先计算出常量值,其他不计算)
- 广播 Join
- CBO
- SMB Join(桶相等,Join 列=桶列)
3.8 Spark 3.0 AQE
- 动态合并 Shuffle 分区
- 动态切换 Join 策略
- 动态优化倾斜连接(Skew Joins)
四、Flink
4.1 特征
-
流批一体化(无界流,有界流)
-
Exactly-Once 状态一致性
- 两阶段提交 + 状态保存
- 端到端的精准一致性语义
-
状态管理
-
时间处理
-
支持窗口
-
高吞吐、高性能、低时延的实时流处理引擎
-
部署灵活性
4.2 内存优化
实际任务调整,默认:1core,4G。
4.3 并行度优化
Source 端
和 Kafka 分区一致
转换算子
- KeyBy 之前(一般不处理)
- KeyBy 之后(调整为 2 的整数次方)
Sink 端
- 输出到 Kafka(分区数)
- 数据库-性能评估
4.4 RocksDB 大状态优化
基于内存 + 磁盘,和 Hbase 存储相似。
读写顺序:
-
读:先 cache,再读磁盘。
-
写:先内存,再刷写磁盘。
优化点:
- 开启增量检查点
- 开启本地恢复
4.5 反压优化
- 调整资源
- 调整并行度
4.6 数据倾斜优化
- KeyBy 之后聚合-开启 MiniBatch+Local
- KeyBy 之前聚合-随机打撒-二次聚合
- KeyBy 后的开窗聚合-随机打散-二次窗口聚合
4.7 Flink SQL 优化
- 设置空闲状态保存时间(TTL(36 小时))
- 开启 MiniBatch(时间(5s)和数据(20000))
- 开启 count(distinct) 参数优化
- 开启窗口 MiniBatch 优化
评论区