DataFrame
- PySpark - Create an empty DataFrame
- PySpark - Convert RDD to DataFrame
- PySpark - Convert DataFrame to Pandas
- PySpark - show()
- PySpark - StructType & StructField
- PySpark - Column Class
- PySpark - select()
- PySpark - collect()
- PySpark - withColumn()
- PySpark - withColumnRenamed()
- PySpark - where() & filter()
- PySpark - drop() & dropDuplicates()
- PySpark - orderBy() and sort()
- PySpark - groupBy()
- PySpark - join()
- PySpark - union() & unionAll()
- PySpark - unionByName()
- PySpark - UDF (User Defined Function)
- PySpark - transform()
- PySpark - apply()
- PySpark - map()
- PySpark - flatMap()
- PySpark - foreach()
- PySpark - sample() vs sampleBy()
- PySpark - fillna() & fill()
- PySpark - pivot() (Row to Column)
- PySpark - partitionBy()
- PySpark - MapType (Map/Dict)
# Copy a pyspark.sql.dataframe.DataFrame (alias() returns a new DataFrame reference; the data is not duplicated)
df_copy = df.alias('df')
# Subtract two pyspark.sql.dataframe.DataFrame (rows in df that are not in df_copy)
diff_df = df.subtract(df_copy)
diff_df.display()
# filter rows whose id is in a given list
result_df = df.where(df["id"].isin(['5edc8f7d-0036-4910-84c4-48d46f7eeb04']))
result_df.display()
result_df.head()
# group by day of updatedAt
from pyspark.sql import functions as F
df1.groupBy(F.date_format('updatedAt', 'yyyy-MM-dd').alias('day')).count().display()
# filter
df2 = df1.filter((df1.updatedAt >= "2023-04-05"))
df3 = df2.filter(df2.amount.isNotNull())
# select
df1.select('amount').groupby('amount').count().display()
# group by count, order by count desc
from pyspark.sql.functions import desc
dfFilter.groupBy('DepositTransactionId').count().sort(desc('count')).display()
# dropDuplicates
df = df.dropDuplicates()
CAST
# cast columns to different types
from pyspark.sql.functions import col
from pyspark.sql.types import DateType, LongType, DoubleType, IntegerType, BooleanType
df = df.withColumn("col_name", col("col_name").cast(IntegerType())) \
.withColumn("col_name2", col("col_name2").cast(IntegerType())) \
.withColumn("col_name3", col("col_name3").cast(BooleanType()))
Spark DataFrame vs Pandas DataFrame
| Spark DataFrame | Pandas DataFrame |
|---|---|
| Spark DataFrame supports parallelization. | Pandas DataFrame does not support parallelization. |
| Spark DataFrame has Multiple Nodes. | Pandas DataFrame has a Single Node. |
| It follows Lazy Execution, which means a task is not executed until an action is performed. | It follows Eager Execution, which means a task is executed immediately. |
| Spark DataFrame is Immutable. | Pandas DataFrame is Mutable. |
| Complex operations are difficult to perform as compared to Pandas DataFrame. | Complex operations are easier to perform as compared to Spark DataFrame. |
| Spark DataFrame is distributed and hence processing in the Spark DataFrame is faster for a large amount of data. | Pandas DataFrame is not distributed and hence processing in the Pandas DataFrame will be slower for a large amount of data. |
| sparkDataFrame.count() returns the number of rows. | pandasDataFrame.count() returns the number of non-null observations for each column. |
| Spark DataFrames are excellent for building a scalable application. | Pandas DataFrames can’t be used to build a scalable application. |
| Spark DataFrame assures fault tolerance. | Pandas DataFrame does not assure fault tolerance. We need to implement our own framework to assure it. |
Difference Between Spark DataFrame and Pandas DataFrame - GeeksforGeeks
Pandas vs PySpark DataFrame With Examples - Spark By Examples
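As a small illustration of the count() row in the table above, here is a sketch; the amount column and its values are made up for the example:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# same three rows, one of them null, in both libraries
pdf = pd.DataFrame({"amount": [10.0, None, 30.0]})
sdf = spark.createDataFrame([(10.0,), (None,), (30.0,)], ["amount"])

print(sdf.count())   # 3 -> total number of rows (an action, so execution happens here)
print(pdf.count())   # amount: 2 -> non-null observations per column (eager, already computed)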
Spark collect
PySpark RDD/DataFrame collect() is an action operation used to retrieve all elements of the dataset (from all nodes) to the driver node. Use collect() only on smaller datasets, usually after filter(), group(), etc.; retrieving a larger dataset results in an OutOfMemory error.
When to avoid Collect()
Usually, collect() is used to retrieve the action output when you have a very small result set. Calling collect() on an RDD/DataFrame with a bigger result set causes an out-of-memory error, because the entire dataset (from all workers) is returned to the driver, so avoid calling collect() on a large dataset.
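A minimal sketch of the safer pattern, shrinking the result before bringing it to the driver; it assumes the df1 frame with updatedAt and amount columns used in the snippets above:
# filter and limit first, then collect the small remainder
small_rows = df1.filter(df1.updatedAt >= "2023-04-05").select("amount").limit(100).collect()
print(small_rows[:5])   # a plain Python list of Row objects, bounded in size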
collect() vs select()
select() is a transformation that returns a new DataFrame holding the selected columns, whereas collect() is an action that returns the entire dataset to the driver as a list of Row objects.
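To make the difference in return types concrete, a short sketch, again assuming a df with an amount column as in the earlier snippets:
selected = df.select("amount")   # transformation: a new DataFrame, nothing executed yet
rows = selected.collect()        # action: the data comes back to the driver as a list of Row objects
first_amount = rows[0]["amount"] if rows else None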
PySpark Collect() - Retrieve data from DataFrame - Spark By Examples
Writing Data - .saveAsTable() vs .writeTo()
Using .saveAsTable()
This is the classic Spark SQL method. It’s been around for a long time and works with Hive tables, Parquet tables, and modern catalogs like Iceberg.
df.write.mode("overwrite").format("iceberg").saveAsTable("iceberg_jdbc.default.people_data_iceberg")
This line of code:
- Writes a DataFrame to the table people_data_iceberg
- Uses the mode "overwrite" to replace existing data
- Supports Iceberg, Hive, or any table with a registered catalog
But there’s a catch — if you use mode("overwrite") without caution, Spark will drop and recreate the entire table, deleting all partitions.
The Partition Overwrite Problem
Imagine you have an Iceberg table partitioned by country, with partitions for India, USA, and Brazil.
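Such a table could be created roughly like this (a hypothetical setup sketch; the name and country columns and the sample rows are invented for the example):
# hypothetical sample data; a real table would be much larger
df = spark.createDataFrame(
    [("Asha", "India"), ("John", "USA"), ("Ana", "Brazil")],
    ["name", "country"],
)
df.write.format("iceberg").partitionBy("country") \
    .mode("overwrite").saveAsTable("iceberg_jdbc.default.people_data_iceberg")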
Now, you only want to replace records for India.
If you do this:
india_df = df.filter("country = 'India'")
india_df.write.mode("overwrite").format("iceberg").saveAsTable("iceberg_jdbc.default.people_data_iceberg")
Boom — Spark overwrites the entire table unless you enable dynamic partition overwrite.
The Workaround — Dynamic Partition Overwrite
To safely overwrite only specific partitions, you can enable a Spark config:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
Then run:
india_df.write.mode("overwrite").format("iceberg").saveAsTable("iceberg_jdbc.default.people_data_iceberg")
Enter .writeTo(): The Modern API
Starting with Spark 3.1, a new, safer, and more expressive API was introduced: .writeTo().
Here’s how it looks:
india_df.writeTo("iceberg_jdbc.default.people_data_iceberg").overwritePartitions()
This achieves the same goal — only the India partition is replaced — but with no extra configuration needed.
Now Spark will:
- Detect the partitions present in the incoming DataFrame (country='India')
- Overwrite only those partitions
- Leave others (USA, Brazil) untouched
With .saveAsTable(), by contrast, forgetting the partitionOverwriteMode config risks erasing the whole table.
Final Thoughts
While .saveAsTable() remains useful and backward-compatible, it’s a legacy approach designed before modern table formats existed. On the other hand, .writeTo() is built for atomic, schema-aware, and partition-safe writes — especially for Iceberg, Delta, and Hudi tables.
Use .writeTo() when
- Working with Iceberg or Delta
- You want fine-grained control (create, replace, overwritePartitions; a short sketch follows these lists)
- You care about explicit and safe write semantics
Use .saveAsTable() when
- You’re integrating with legacy Hive tables
- You already have a global partitionOverwriteMode set
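A short sketch of the fine-grained .writeTo() verbs mentioned above; these are alternatives rather than a sequence, and the table name is reused from the earlier examples:
tbl = "iceberg_jdbc.default.people_data_iceberg"

df.writeTo(tbl).create()                      # fails if the table already exists
df.writeTo(tbl).createOrReplace()             # create it, or replace its definition and data
df.writeTo(tbl).append()                      # add rows, keep existing data
india_df.writeTo(tbl).overwritePartitions()   # replace only the partitions present in india_df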
saveAsTable() vs .writeTo() in Apache Spark: The Subtle but Powerful Difference