AI News HubLIVE
站内改写

PySpark优化:12种加速Spark作业的成熟技巧

现代数据管道每天处理海量结构化与非结构化数据。随着数据集增长,优化不佳的Spark作业会变得缓慢、昂贵且难以扩展。本文介绍了12种经实践证明的PySpark优化技术,包括列式文件格式、早期过滤、广播连接、自适应查询执行等,并附有实际示例和性能策略。

文章情报

工程师进阶

要点

  • 使用Parquet或ORC列式文件格式可显著提升I/O性能和压缩率
  • 尽早过滤数据和仅选择所需列以减少处理数据量
  • 利用广播连接优化小表与大表的连接操作
  • 启用自适应查询执行(AQE)以动态优化执行计划

为什么重要

这条新闻值得关注,因为使用Parquet或ORC列式文件格式可显著提升I/O性能和压缩率。

技术影响

可能影响模型选型、推理成本、产品能力和评测基准。

现代数据管道每天处理大量结构化和非结构化数据。随着数据集不断增长,未经优化的Spark作业会变得缓慢、成本高昂且难以扩展。常见问题包括执行时间长、数据shuffle过多、内存瓶颈以及连接操作效率低下。有效的PySpark优化可以显著提升性能、降低基础设施成本并提高集群效率。本文将为数据工程师介绍12种经实践证明的PySpark优化技术,并附带实际示例和性能策略。

Spark如何执行代码

在开始优化之前,了解Spark如何执行代码至关重要。许多开发者编写PySpark代码却不了解其底层机制,这会导致性能决策不佳。

**Spark架构**:Spark由Driver和Executors组成。Driver负责规划执行策略并监督所有操作,Executors则执行实际的计算任务。Spark将工作划分为Job、Stage和Task三个层次。一个Job由多个Stage组成,Stage则是无需网络shuffle即可运行的任务集合,而Task是处理单个分区的最小工作单元。

**惰性求值**:Spark的转换操作(如filter、select)不会立即执行,而是记录操作计划。只有当调用行动操作(如count、show)时,Spark才会优化并执行整个计划。这种机制使Spark能够重排操作、下推过滤条件并移除不必要的部分。

**理解执行计划**:使用explain()方法可以查看完整的查询执行计划,包括过滤下推、广播连接和shuffle操作。例如,通过explain(True)可以观察到PushedFilters,这意味着过滤在文件级别应用,是性能良好的标志。

12种PySpark优化技术

**技巧1:使用列式文件格式(如Parquet或ORC)** CSV和JSON是行式格式,读取单列时必须读取所有行,浪费I/O和CPU。Parquet和ORC是列式格式,支持列式存储、更优的压缩以及谓词下推。例如,当仅选择3列时,Spark会跳过其余47列。建议在分析工作负载中使用Parquet,在使用Hive或HBase时使用ORC,并搭配Snappy压缩。

**技巧2:尽早过滤数据** 谓词下推是指将过滤条件尽可能靠近数据源应用。早期过滤减少了后续操作(如连接、聚合、排序)处理的数据量,降低了内存、网络和CPU开销。

**技巧3:仅选择所需列** 避免使用select(*)或读取不必要的列,列式格式在读取时可以跳过未选择的列。

**技巧4:优化分区** 合理设置分区数,通常每个分区大小建议为100-200MB。过多分区导致任务开销,过少分区导致数据倾斜。使用repartition或coalesce调整分区。

**技巧5:对小表使用广播连接** 当一张表较小时(默认小于10MB),使用广播连接将小表复制到每个Executor,避免shuffle。可通过spark.sql.autoBroadcastJoinThreshold调整阈值。

**技巧6:启用自适应查询执行(AQE)** AQE在运行时动态优化执行计划,包括合并shuffle分区、优化连接策略和调整数据倾斜。在Spark 3.0及以上版本中启用spark.sql.adaptive.enabled=true。

**技巧7:尽可能避免Python UDF** Python UDF会导致数据在JVM和Python之间序列化,产生巨大开销。尽量使用内置函数或pandas UDF。

**技巧8:策略性缓存数据** 缓存重复使用的DataFrame或表,但避免过度缓存。使用cache()或persist(),并根据需求选择合适的存储级别。

**技巧9:高效处理数据倾斜** 数据倾斜会导致某些任务执行极慢。可通过增加分区、使用salting技术或调整连接键来处理。

**技巧10:最小化Shuffle操作** Shuffle是高开销操作,尽量通过优化连接、使用pre-shuffle聚合或调整分区来减少shuffle。

**技巧11:对重复连接使用分桶(Bucketing)** 分桶将数据按哈希值组织,在重复连接时可大幅减少shuffle。需要预先对表进行分桶并优化桶数。

**技巧12:调整Spark配置参数** 常见调整包括executor内存(spark.executor.memory)、shuffle分区数(spark.sql.shuffle.partitions)、并行度(spark.default.parallelism)等。需根据集群资源和作业特点进行调优。

端到端示例

结合上述技巧,优化一个典型作业:从Parquet读取数据,尽早过滤,仅选择必要列,对大表与小表连接使用广播,启用AQE,并缓存中间结果。

结论

PySpark优化需要理解Spark执行机制并应用正确技术。通过使用列式格式、早期过滤、广播连接、AQE等12种技巧,可以显著提升性能、降低成本。持续监控Spark UI并根据作业特点调整参数是关键。