Part 2 - PySpark: DataFrames, Spark SQL, and Schema Management
Published on Thursday, Mar 6, 2025
Recap: PySpark Foundations
In Part 1 of this PySpark series, we explored the basics of Apache Spark, focusing on RDDs (Resilient Distributed Datasets), transformations, actions, and the underlying execution model.
We discussed the importance of partitions, the difference between narrow and wide transformations, and how Spark jobs, stages, and tasks are orchestrated for distributed data processing.
If you’re new to Spark or want a refresher on RDDs, transformations, and actions, check out the previous post.
Part 2: Higher-Level APIs in Apache Spark
As Spark evolved, so did its APIs. While RDDs provide fine-grained control, most real-world data engineering and analytics tasks benefit from higher-level abstractions: DataFrames and Spark SQL. These APIs offer powerful, expressive, and optimized ways to work with structured data.
Why Move Beyond RDDs?
- Productivity: DataFrames and Spark SQL allow you to write less code for complex tasks.
- Performance: Spark’s Catalyst optimizer can optimize DataFrame and SQL queries for better performance.
- Interoperability: DataFrames are easily convertible to SQL tables and vice versa, enabling seamless integration with BI tools and SQL workflows.
Working with DataFrames
A DataFrame is a distributed collection of data organized into named columns, similar to a table in a relational database.
- Load the data file and create a Spark DataFrame.
- Apply transformations.
- Write the results back to storage.

(The SparkSession is the entry point to the Spark cluster for the higher-level APIs, while the SparkContext is the entry point for the lower-level RDD API.) A rough end-to-end sketch of this flow follows.
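Assuming a SparkSession named spark already exists (it is created in a snippet further below), the three steps might look like this; the paths, output format, and filter condition are placeholders:

# 1. Load the data file into a DataFrame (placeholder path)
orders_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/path/to/orders.csv")

# 2. Apply transformations (placeholder filter condition)
closed_orders_df = orders_df.filter("order_status = 'CLOSED'")

# 3. Write the results back to storage (placeholder output path and format)
closed_orders_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("/path/to/output/closed_orders")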
Creating a DataFrame
The standard way to create a DataFrame from a CSV file:
df = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("/path/to/your/file.csv")
- SparkSession is the entry point for DataFrame and SQL functionality.
- SparkContext is used for lower-level RDD operations.
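For reference, a SparkSession is usually obtained through its builder, and the underlying SparkContext remains accessible from it whenever you need the RDD API (the application name below is just a placeholder):

from pyspark.sql import SparkSession

# Entry point for DataFrames and Spark SQL
spark = SparkSession.builder \
    .appName("pyspark-part2-examples") \
    .getOrCreate()

# The lower-level SparkContext is still reachable for RDD work
sc = spark.sparkContext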
DataFrames and SQL Tables: Interoperability
DataFrames and Spark SQL tables are interconvertible:
orders_df.createOrReplaceTempView("orders")
- orders_df is your DataFrame.
- createOrReplaceTempView registers it as a temporary SQL table called “orders”.
Now, you can run SQL queries directly:
filtered_df = spark.sql("SELECT * FROM orders WHERE order_status = 'CLOSED'")
To convert a Spark SQL table back to a DataFrame:
orders_df = spark.read.table("orders")
orders_df.show()
Temporary Views: Local vs Global
Spark provides several ways to register DataFrames as tables:
- createOrReplaceTempView: Creates or replaces a temporary view (session-scoped).
- createTempView: Errors out if the view already exists.
- createGlobalTempView: Creates a global view that is shared across Spark sessions in the same application (registered under the global_temp database).
- createOrReplaceGlobalTempView: Replaces any existing global view.
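A quick sketch of session-scoped versus global-scoped behaviour (the view names are arbitrary):

# Session-scoped: visible only in the current SparkSession
orders_df.createOrReplaceTempView("orders")
spark.sql("SELECT COUNT(*) FROM orders").show()

# Global: registered under the global_temp database and visible to
# other sessions of the same Spark application
orders_df.createOrReplaceGlobalTempView("orders_global")
spark.sql("SELECT COUNT(*) FROM global_temp.orders_global").show()

# A new session in the same application can still query the global view
spark.newSession().sql("SELECT COUNT(*) FROM global_temp.orders_global").show()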
Managing Databases and Tables
- Create a Database:
spark.sql("CREATE DATABASE IF NOT EXISTS my_database")
- Show Databases:
spark.sql("SHOW DATABASES").show()
- Show Tables:
spark.sql("SHOW TABLES").show()
DataFrame API vs Spark SQL API: Practical Examples
Let’s see how common analytics tasks can be performed using both APIs.
- Top 15 Customers by Number of Orders

DataFrame API:

result = orders_df.groupBy("customer_id").count().sort("count", ascending=False).limit(15)

Spark SQL:

result = spark.sql("""
    SELECT customer_id, COUNT(order_id) AS count
    FROM orders
    GROUP BY customer_id
    ORDER BY count DESC
    LIMIT 15
""")

- Customers with Most Closed Orders

DataFrame API:

results = orders_df.filter("order_status = 'CLOSED'") \
    .groupBy("customer_id").count() \
    .sort("count", ascending=False)

Spark SQL:

results = spark.sql("""
    SELECT customer_id, COUNT(order_id) AS count
    FROM orders
    WHERE order_status = 'CLOSED'
    GROUP BY customer_id
    ORDER BY count DESC
""")
Actions, Transformations, and Utility Functions
- Actions: count, show, head, tail, collect
- Transformations: groupBy.count, orderBy, filter, distinct, join
- Utility Functions: printSchema, createOrReplaceGlobalTempView
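As a small illustration (using the orders DataFrame from earlier): transformations only build up the query plan lazily, and nothing executes until an action is called.

# Transformations: lazily describe the computation
closed_per_customer = orders_df \
    .filter("order_status = 'CLOSED'") \
    .groupBy("customer_id").count() \
    .orderBy("count", ascending=False)

# Utility: inspect the schema of the result (does not read the data)
closed_per_customer.printSchema()

# Actions: trigger execution on the cluster
print(closed_per_customer.count())   # number of customers with closed orders
closed_per_customer.show(10)         # display the first 10 rows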
Schema Inference vs Explicit Schema
When reading data, Spark can automatically infer the schema (data types and column names) by scanning the input file. This is convenient, but not always reliable or efficient.
1. Schema Inference (Automatic)
Spark infers the schema when you set the inferSchema option to true:
df = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("/documents/orders.csv")
df.printSchema()
Drawbacks of Schema Inference:
- Incorrect Data Types: Spark may misinterpret column types (e.g., reading IDs as integers when they should be strings).
- Performance Overhead: Inferring schema requires scanning the entire dataset, which can be slow for large files.
- Inconsistent Results: Schema inference can yield different results if the data is inconsistent or contains malformed records.
2. Explicit Schema Definition (Recommended)
To avoid these issues, it’s best to define the schema explicitly. There are two main approaches:
A. String Schema
You can specify the schema as a string:
orders_schema = 'order_id long, order_date date, cust_id long, ord_status string'
df = spark.read \
.format("csv") \
.schema(orders_schema) \
.option("header", "true") \
.load("/documents/orders.csv")
df.printSchema()
B. StructType Schema (More Flexible & Recommended)
For more control and clarity, use StructType and StructField from pyspark.sql.types:
from pyspark.sql.types import StructType, StructField, LongType, DateType, StringType
orders_schema_struct = StructType([
StructField("order_id", LongType()),
StructField("order_date", DateType()),
StructField("cust_id", LongType()),
StructField("ord_status", StringType())
])
df = spark.read \
.format("csv") \
.schema(orders_schema_struct) \
.option("header", "true") \
.load("/documents/orders.csv")
df.printSchema()
Benefits of Explicit Schema:
- Guarantees correct data types and column names.
- Avoids the overhead of scanning data for inference.
- Makes your ETL pipelines more robust and predictable.
DataFrame Read Modes
- permissive (default): loads all records; fields that cannot be parsed in a malformed record are set to NULL.
- failfast: throws an exception as soon as a malformed record is encountered.
- dropMalformed: silently drops malformed records.
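The read mode is set as a reader option; for example, to fail immediately on bad records (reusing the earlier file path and the StructType schema defined above):

df = spark.read \
    .format("csv") \
    .schema(orders_schema_struct) \
    .option("header", "true") \
    .option("mode", "FAILFAST") \
    .load("/documents/orders.csv")

# Alternatives: "PERMISSIVE" (the default) or "DROPMALFORMED"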
Creating DataFrames from RDDs
- Approach 1: spark.createDataFrame(rdd, schema) or spark.createDataFrame(rdd).toDF(*list_of_column_names)
- Approach 2: rdd.toDF(schema)
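A minimal sketch of both approaches (the sample data and column names are made up):

# A small RDD of (order_id, order_status) tuples
orders_rdd = spark.sparkContext.parallelize([
    (1, "CLOSED"),
    (2, "PENDING"),
    (3, "CLOSED"),
])

# Approach 1: via the SparkSession
df1 = spark.createDataFrame(orders_rdd, "order_id long, order_status string")
df2 = spark.createDataFrame(orders_rdd).toDF("order_id", "order_status")

# Approach 2: directly on the RDD (requires an active SparkSession)
df3 = orders_rdd.toDF(["order_id", "order_status"])

df3.printSchema()
df3.show()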
DataFrame Transformations

| Transformation | Description | Syntax |
|---|---|---|
| withColumn | Add a new column or modify an existing one | df2 = df1.withColumn("<column-name>", expr("<expression>")); equivalently, df2 = df1.select("<list-of-column-names>", expr("<expression>")) or df2 = df1.selectExpr("<list-of-column-names-and-expressions>") |
| withColumnRenamed | Rename an existing column | df2 = df1.withColumnRenamed("<existing-column-name>", "<new-column-name>") |
| drop | Drop one or more columns | df2 = df1.drop("<list-of-column-names>") |

Note:
- With select, column names and expressions must be separated explicitly, and every expression must be wrapped in expr().
- With selectExpr, each argument is parsed as a SQL expression, so it handles plain column names and expressions alike.
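A short sketch of these transformations, assuming df1 is the orders DataFrame read with the explicit schema above (the derived order_year column is just an example):

from pyspark.sql.functions import expr

# withColumn: add a derived column (or overwrite an existing one)
df2 = df1.withColumn("order_year", expr("year(order_date)"))

# select: column names and expressions are separated, expressions go inside expr()
df2 = df1.select("order_id", "order_date", "cust_id", "ord_status",
                 expr("year(order_date) AS order_year"))

# selectExpr: every argument is treated as a SQL expression
df2 = df1.selectExpr("order_id", "order_date", "cust_id", "ord_status",
                     "year(order_date) AS order_year")

# withColumnRenamed and drop
df3 = df2.withColumnRenamed("ord_status", "order_status")
df4 = df3.drop("order_year")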
Removing Duplicates
- df2 = df1.distinct() removes duplicates across all columns.
- df2 = df1.dropDuplicates(["col1", "col2"]) removes duplicates based on specific columns.
What’s Next? (Coming in Part 3)
Stay tuned for the next part of this series, where we’ll unlock even more advanced Spark concepts and practical tips, including:
- Joins in Spark: Deep dive into different types of joins (inner, left, right, outer), how Spark executes them, and when to use broadcast joins for optimal performance on large and small datasets.
- Repartition vs Coalesce: Understand the differences between these two powerful DataFrame operations, and learn how to efficiently manage data distribution and optimize resource usage in your Spark jobs.
- Spark UI & Resource Manager (YARN): Get hands-on with Spark’s web UI to monitor, debug, and optimize your jobs. Learn how YARN manages resources and how to interpret job metrics for better performance tuning.
- Caching Strategies: Discover when and how to cache RDDs, DataFrames, and tables to speed up iterative computations and avoid redundant processing.
- File Formats Explained: Compare row-based formats like CSV with column-based formats like Parquet. Learn the trade-offs, best use cases, and how file format choices impact performance and storage.