在数据处理完成后,可以将数据存储回内部或外部系统中。但不建议直接将数据写入外部系统中,因为这可能会带来性能和可靠性的问题。Spark 允许你使用 DataFrameWriter API 写入数据。
1. 文件形式存储 #
df.write\
.format("parquet") \
.mode(saveMode) \
.option("path", "/data/flights") \
.save()
-
saveMode有四种:append:创建新的文件,不会覆盖或删除现有的文件overwrite:删掉现有的文件并创建新的文件errorIfExists:如果已于该文件,程序会引发错误,从而终止程序继续运行ignore:如果已有该文件,则不会执行任何操作
-
存储说明:
- 在存储数据时,还需要考虑文件的数量、文件的大小、分区、分桶和排序等问题。
- 可以使用
DataFrame.repartition(n)进行简单的重新分区,但这种分区可能不是理想的。 - 更好的方法是使用
DataFrameWriter.partitionBy(col1, col2),根据列的值进行分区, - 还可以使用
DataFrameWriter.bucketBy(n, col1, col2)进行分桶,并结合sortBy()创建排序的存储桶。 - 此外,可以使用
maxRecordsPerFile选项限制每个文件的记录数,这对于控制文件大小非常有用。
1.1. 示例 #
from pyspark.sql import SparkSession
from lib.logger import Log4J
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("DataSinkDemo") \
.getOrCreate()
logger = Log4J(spark)
flightTimeParquetDF = spark.read. \
format("parquet") \
.load("data/flight*.parquet")
flightTimeParquetDF.write \
.format("json") \
.mode("overwrite") \
.option("path", "dataSink/json/") \
.partitionBy("OP_CARRIER", "ORIGIN") \
.save()
- 数据集里有十个不同的 CARRIER,每个目录里还有不同的 ORIGIN 目录(这里就不全部展示了)。ORIGIN 目录里有存储相关的数据。
- 如果我们读这些数据的话,并只针对一个 CARRIER 进行筛选,那Spark SQL引擎可以通过这个存储格式来优化读取和过滤的操作,忽略其他的目录,只读取相关的目录。
- OP_CARRIER 和 ORIGIN 的信息不会存储在JSON文件里,因为两层目录的名字已经包含了相关的信息,避免造成数据冗余。
通常情况下,每个文件的大小建议控制在500MB到几GB之间。可以通过以下代码限制每个文件保存最多为 10000 条记录:
flightTimeParquetDF.write \
.format("json") \
.mode("overwrite") \
.option("path", "dataSink/json/") \
.partitionBy("OP_CARRIER", "ORIGIN") \
.option("maxRecordsPerFile", 10000)\
.save()
- 只需要加上
.option("maxRecordsPerFile", 10000)就可以限制每个文件保存最多为10000条记录。
2. 数据库形式存储 #
Spark 不仅仅是一组 API 和一个处理引擎,它本身就是一个数据库,因此可以在 Spark 中创建数据库并存储表和视图。 表可以存储为文件,并且格式可以自定义。表的元数据存储在 catalog 中。 Spark Tables 有两种类型:
- Managed Tables
- Spark 管理表里的数据以及元数据。
- 数据固定存储在
spark.sql.warehouse目录里。 - 使用
drop时,会同时删掉表里的数据和元数据。 - 比 Unmanaged Tables 多一些功能,比如 bucketing 和 sorting。
- Unmanaged Tables (External Tables)
- 表里的数据存储在任意地方,Spark 只管理元数据。
- 创建表时必须指定存储位置。
- 使用
drop时,Spark 不会删除数据文件,只会删除元数据。
2.1. 示例 #
-
首先我们需要通过
.enableHiveSupport()打开 Hive 支持,因为要用到 Hive Metastore。spark = SparkSession \ .builder \ .master("local[3]") \ .appName("SparkSQLTableDemo") \ .enableHiveSupport() \ .getOrCreate() -
Apache Spark 自带了一个默认数据库,这个数据库的名字叫“default”。
-
我们可以在 Spark 中创建自定义的数据库:
spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINE_DB") -
写入数据库时,我们可以在表的名字前面加上数据库名字,从而存储到指定的数据库中。
flightTimeParquetDF.write \ .mode("overwrite") \ .saveAsTable("AIRLINE_DB.flight_data_tbl") -
或者设置当前的数据库
spark.catalog.setCurrentDatabase("AIRLINE_DB") flightTimeParquetDF.write \ .mode("overwrite") \ .saveAsTable("flight_data_tbl") -
分区存储
bucketBy()允许你限制分区的数量(有时,存储桶可以显著改进join操作)。sortBy()和bucketBy()一起使用创建排序的存储桶。- 如果想在本地查看存储的文件内容,可以在
.write后面加上.format("csv"),以 CSV 格式存储,可以方便本地查看。这里没加,所以会默认存储 Parquet 二进制文件格式。flightTimeParquetDF.write \ .mode("overwrite") \ .bucketBy(5, "OP_carrier", "ORIGIN") \ .sortBy("OP_carrier", "ORIGIN") \ .saveAsTable("AIRLINE_DB.flight_data_tbl")
-
在本地运行时,会在项目目录下创建
metastore_db和spark-warehouse目录分别存储元数据和表数据。 -
在云环境下,这两个目录都由集群管理员来配置,存在所有Spark应用程序的公共位置。