DataFrame

# "Copy" a pyspark.sql.dataframe.DataFrame (DataFrames are immutable, so an alias acts as a copy)
df_copy = df.alias('df')

# Subtract two DataFrames (rows in df that are not in df_copy)
diff_df = df.subtract(df_copy)
diff_df.display()

result_df = df.where(df["id"].isin(['5edc8f7d-0036-4910-84c4-48d46f7eeb04']))
result_df.display()
result_df.head()

# group by
from pyspark.sql import functions as F

df1.groupBy(F.date_format('updatedAt', 'yyyy-MM-dd').alias('day')).count().display()

# filter
df2 = df1.filter(df1.updatedAt >= "2023-04-05")
df3 = df2.filter(df2.amount.isNotNull())

# select
df1.select('amount').groupby('amount').count().display()

# group by count, order by count desc
from pyspark.sql.functions import desc

dfFilter.groupBy('DepositTransactionId').count().sort(desc('count')).display()

# dropDuplicates
df = df.dropDuplicates()

CAST

# cast columns to different types
from pyspark.sql.functions import col

from pyspark.sql.types import DateType, LongType, DoubleType, IntegerType, BooleanType

df = df.withColumn("col_name", col("col_name").cast(IntegerType())) \
.withColumn("col_name2", col("col_name2").cast(IntegerType())) \
.withColumn("col_name3", col("col_name3").cast(BooleanType()))

Spark DataFrame vs Pandas DataFrame

Spark DataFrame | Pandas DataFrame
Supports parallelization. | Does not support parallelization.
Runs on multiple nodes. | Runs on a single node.
Follows lazy execution: a task is not executed until an action is performed. | Follows eager execution: a task is executed immediately.
Immutable. | Mutable.
Complex operations are harder to perform than with Pandas. | Complex operations are easier to perform than with Spark.
Distributed, so processing a large amount of data is faster. | Not distributed, so processing a large amount of data is slower.
sparkDataFrame.count() returns the number of rows. | pandasDataFrame.count() returns the number of non-NA/null observations for each column.
Excellent for building a scalable application. | Can't be used to build a scalable application.
Assures fault tolerance. | Does not assure fault tolerance; you need to implement your own framework for that.
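
As a quick illustration of the count() row above, a minimal sketch, assuming an active SparkSession named spark (as in the snippets above) and made-up columns and values:

import pandas as pd

pdf = pd.DataFrame({"id": [1, 2, 3], "amount": [10.0, None, 30.0]})
sdf = spark.createDataFrame(pdf)

sdf.count()   # Spark: total number of rows -> 3
pdf.count()   # Pandas: non-null count per column -> id: 3, amount: 2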

Difference Between Spark DataFrame and Pandas DataFrame - GeeksforGeeks

Pandas vs PySpark DataFrame With Examples - Spark By Examples

Spark collect

PySpark RDD/DataFrame collect() is an action that retrieves all elements of the dataset (from all nodes) to the driver node. Use collect() only on small datasets, usually after filter(), group(), etc.; retrieving a large dataset results in an OutOfMemory error on the driver.

When to avoid collect()

collect() is typically used to retrieve the output of an action when the result set is very small. Calling collect() on an RDD/DataFrame with a large result set returns the entire dataset (from all workers) to the driver and can cause an out-of-memory error, so avoid calling collect() on large datasets.
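
When only a preview or a bounded sample is needed, bounded actions are safer than collect(). A minimal sketch, reusing the df from the snippets above:

# Bring everything to the driver - only safe for small results
small_rows = df.filter(df["id"].isNotNull()).collect()

# Safer on large DataFrames: cap what reaches the driver
first_rows = df.take(20)    # list of at most 20 Row objects
preview_df = df.limit(20)   # still a DataFrame, stays distributed until an action runs
df.show(20)                 # prints 20 rows to stdout, returns nothing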

collect() vs select()

select() is a transformation that returns a new DataFrame holding the selected columns, whereas collect() is an action that returns the entire dataset to the driver as an array of Row objects.
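
For example, a minimal sketch contrasting the two on the df used above:

selected = df.select("id")    # transformation: a new DataFrame, nothing is executed yet
rows = selected.collect()     # action: Row objects are materialized on the driver

print(type(selected))   # a pyspark DataFrame
print(type(rows))       # a plain Python list of Row objects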

PySpark Collect() - Retrieve data from DataFrame - Spark By Examples

Writing Data - .saveAsTable() vs .writeTo()

Using .saveAsTable()

This is the classic Spark SQL method. It’s been around for a long time and works with Hive tables, Parquet tables, and modern catalogs like Iceberg.

df.write.mode("overwrite").format("iceberg").saveAsTable("iceberg_jdbc.default.people_data_iceberg")

This line of code:

  • Writes a DataFrame to the table people_data_iceberg
  • Uses the mode "overwrite" to replace existing data

  • Supports Iceberg, Hive, or any table with a registered catalog

But there’s a catch — if you use mode("overwrite") without caution, Spark will drop and recreate the entire table, deleting all partitions.

The Partition Overwrite Problem

Imagine you have this Iceberg table:

(Example table: people_data_iceberg, partitioned by country, with rows for India, USA, and Brazil.)

Now, you only want to replace records for India.

If you do this:

india_df = df.filter("country = 'India'")
india_df.write.mode("overwrite").format("iceberg").saveAsTable("iceberg_jdbc.default.people_data_iceberg")

Boom — Spark overwrites the entire table unless you enable dynamic partition overwrite.

The Workaround — Dynamic Partition Overwrite

To safely overwrite only specific partitions, you can enable a Spark config:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

Then run:

india_df.write.mode("overwrite").format("iceberg").saveAsTable("iceberg_jdbc.default.people_data_iceberg")

Enter .writeTo(): The Modern API

Starting with Spark 3.1, a new, safer, and more expressive API was introduced: .writeTo().

Here’s how it looks:

india_df.writeTo("iceberg_jdbc.default.people_data_iceberg").overwritePartitions()

This achieves the same goal — only the India partition is replaced — but with no extra configuration needed.

Now Spark will:

  • Detect the partitions present in the incoming DataFrame (country='India')
  • Overwrite only those partitions
  • Leave others (USA, Brazil) untouched

With .saveAsTable(), by contrast, forgetting that config risks erasing the whole table.

Final Thoughts

While .saveAsTable() remains useful and backward-compatible, it’s a legacy approach designed before modern table formats existed. On the other hand, .writeTo() is built for atomic, schema-aware, and partition-safe writes — especially for Iceberg, Delta, and Hudi tables.

Use .writeTo() when

  • Working with Iceberg or Delta
  • You want fine-grained control (create, replace, overwritePartitions); a short sketch follows this list
  • You care about explicit and safe write semantics
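
For reference, a minimal sketch of those DataFrameWriterV2 operations, reusing the people_data_iceberg table from the earlier examples (new_df is a hypothetical DataFrame of rows to add):

from pyspark.sql.functions import col

# Create a new table (fails if it already exists)
df.writeTo("iceberg_jdbc.default.people_data_iceberg") \
    .using("iceberg") \
    .partitionedBy(col("country")) \
    .create()

# Append rows without touching existing data
new_df.writeTo("iceberg_jdbc.default.people_data_iceberg").append()

# Atomically replace the table definition and its data
df.writeTo("iceberg_jdbc.default.people_data_iceberg").using("iceberg").createOrReplace()

# Replace only the partitions present in india_df
india_df.writeTo("iceberg_jdbc.default.people_data_iceberg").overwritePartitions()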

Use .saveAsTable() when

  • You’re integrating with legacy Hive tables
  • You already have a global partitionOverwriteMode set

saveAsTable() vs .writeTo() in Apache Spark: The Subtle but Powerful Difference