Lecture 6

PySpark Basics

Byeong-Hak Choe

SUNY Geneseo

February 10, 2025

PySpark Basics

Learning Objectives

  • Loading a DataFrame with Spark’s read API (analogous to read_csv())
  • Getting a Summary with printSchema() and describe()
  • Selecting and Reordering Variables with select()
  • Counting Values with groupBy().count(), countDistinct(), and count()
  • Sorting with orderBy()
  • Adding a New Variable with withColumn()
  • Removing a Variable with drop()
  • Renaming a Variable with withColumnRenamed()
  • Aggregation and Math Operations with selectExpr()

  • Converting Data Types with cast()
  • Filtering Observations with filter()
  • Dealing with Missing Values (na.drop(), na.fill(), etc.)
  • Dealing with Duplicates (dropDuplicates())
  • Grouping DataFrames with groupBy().agg()

Loading Data

Spark DataFrame

  • In PySpark, a DataFrame is a distributed collection of data organized into named columns.

  • Unlike a Pandas DataFrame, a Spark DataFrame is evaluated lazily: transformations are only “planned” and are not executed until an action (e.g., count(), collect()) triggers a computation on the cluster.

  • No dedicated row index is maintained like in Pandas; rows are conceptually identified by their values, not by a numeric position.

The SparkSession Entry Point

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
  • The SparkSession entry point provides the functionality for data transformation with data frames and SQL.

  • from pyspark.sql import SparkSession

    • Imports the SparkSession class from PySpark’s SQL module.
  • SparkSession.builder

    • Begins the configuration of a new SparkSession.
  • .master("local[*]")

    • Configures Spark to run locally using all available CPU cores.
  • .getOrCreate()

    • Retrieves an existing SparkSession if one exists, or creates a new one otherwise.

Reading a CSV file into the Spark Framework

path = '/content/drive/MyDrive/lecture-data/cces.csv'
df = spark.read.csv(path, 
                    inferSchema=True,
                    header=True)
df.show()  # Displays the first 20 rows
  • spark.read.csv(path, ...):
    • Reads a CSV file from the location specified by the path variable.
  • inferSchema=True:
    • When set to True, Spark will automatically detect (or “infer”) the data types of the columns in the CSV file.
    • Without this, Spark would treat all columns as strings by default.
  • header=True:
    • The first row of the CSV file contains the column headers, and will be used as names of the columns.

Reading a CSV file into the Spark Framework

# Note: this call fails by default, because Spark cannot read directly from an HTTPS URL
path = 'https://bcdanl.github.io/data/df.csv'
df = spark.read.csv(path, 
                    inferSchema=True,
                    header=True)
df.show()
  • Spark’s spark.read.csv() function relies on the Hadoop FileSystem API to access files.
  • By default, Hadoop does not support reading files directly from HTTPS URLs.
    • It expects a local file system path, HDFS path, or another supported distributed file system.

Reading a Web CSV file into the Spark Framework

  • What should we do then?
    • We can read the file with Pandas first, and then convert the Pandas DataFrame to a Spark DataFrame:
    • df = spark.createDataFrame(df_pd)
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
df_pd = pd.read_csv('https://bcdanl.github.io/data/nba.csv')
df = spark.createDataFrame(df_pd)

Getting a Summary of Data

Spark DataFrame Overview Methods

  • df.printSchema(): prints the schema (column names and data types).
  • df.columns: returns the list of columns.
  • df.dtypes: returns a list of tuples (columnName, dataType).
  • df.count(): returns the total number of rows.
  • df.describe(): returns a DataFrame of basic statistics (count, mean, stddev, min, max) for numeric and string columns.

Displaying the First Rows with df.show()

Three Optional Parameters

df.show(5)
df.show(truncate = False)
df.show(truncate = 3)
df.show(vertical = True)
  • By default, displays the first 20 rows.

  • df.show(n, truncate, vertical) accepts three optional parameters:

    1. n (int): Number of rows to display.
    • Example: df.show(5)

    2. truncate (bool or int):
    • If True (default), long strings are truncated to 20 characters.
    • If False, displays full column contents.
    • If an integer is provided, it specifies the maximum number of characters to display.
    • Example: df.show(truncate=False), df.show(truncate=30)

    3. vertical (bool):
    • If True, displays each row vertically (useful for wide tables).
    • Example: df.show(vertical=True)

Displaying Variable Information with df.printSchema(), df.dtypes, and df.columns

df.printSchema()
df.dtypes
df.columns
  • df.printSchema() prints the DataFrame schema in a tree format.

    • nullable = true means that a column can contain null values.
    • We may need to handle missing data appropriately.
  • df.dtypes returns a list of tuples representing each column’s name and data type.

  • df.columns returns a list of column names.

Generating Descriptive Statistics with df.describe().show()

df.describe().show()
  • df.describe() computes summary statistics (count, mean, stddev, min, max) for the DataFrame’s numeric and string columns; for string columns, mean and stddev are null.
  • .show() prints these statistics in a readable table format.

Selecting Columns

Selecting a Column by its Name

# Single column -> returns a DataFrame with one column
df.select("Name").show(5)

# Multiple columns -> pass the column names as separate arguments (a list also works)
df.select("Name", "Team", "Salary").show(5)
  • In PySpark, we use select() to choose variables.
  • It returns a new DataFrame containing only the specified variables, in the order given.

Counting Methods

Counting Observations / Distinct Values

# Counting how many total rows
nba_count = df.count()

# Count distinct values in one column
from pyspark.sql.functions import countDistinct
num_teams = df.select(countDistinct("Team")).collect()[0][0]

# GroupBy a column and count occurrences
df.groupBy("Team").count().show(5)
  • df.count() returns the number of rows in df.

  • Unlike Pandas, there is no direct .value_counts() or .nunique() in PySpark.

    • However, we can replicate these operations using Spark’s aggregations (groupBy().count(), countDistinct, etc.).

PySpark Basics

Let’s do Questions 1-3 in Classwork 5!

Sorting Methods

Sorting by One or More Variables

# Sort by a single column ascending
df.orderBy("Name").show(5)

# Sort by descending
from pyspark.sql.functions import desc
df.orderBy(desc("Salary")).show(5)

# Sort by multiple columns
df.orderBy(["Team", desc("Salary")]).show(5)
  • orderBy() can accept column names and ascending/descending instructions.

Equivalent of Pandas nsmallest or nlargest

# nsmallest example:
df.orderBy("Salary").limit(5).show()

# nlargest example:
df.orderBy(desc("Salary")).limit(5).show()
  • Spark does not have nsmallest() or nlargest(), but we can use limit() after sorting.

Indexing and Row Access

Row-Based Access in PySpark

  • Unlike Pandas, PySpark DataFrames do not use a row-based index, so there is no direct .loc[] or .iloc[].
  • We typically filter rows by conditions (using .filter()), narrow the DataFrame with a transformation (limit()), or bring rows back to the driver with an action (take(), collect()).
# Example: filter by condition
df.filter("Team == 'New York Knicks'").show()
df.limit(5).show()
df.take(5)
df.collect()
  • To get the first n rows, use df.limit(n), which returns a new DataFrame, or df.take(n), which returns a list of Row objects.
  • df.collect() returns all the records as a list of Row objects, so use it only when the result fits in the driver’s memory.

Adding, Removing, Renaming, and Relocating Variables

Adding Columns with withColumn()

from pyspark.sql.functions import col

# Add a column "Salary_k" using a column expression col()
df = df.withColumn("Salary_k", col("Salary") / 1000)

Removing Columns with drop()

df = df.drop("Salary_k")  # remove a single column
df = df.drop("Salary_2x", "Salary_3x")  # remove multiple columns

Renaming Columns with withColumnRenamed()

df = df.withColumnRenamed("Birthday", "DateOfBirth")

Rearranging Columns

df = df.select("Name", "Team", "Position", "Salary", "DateOfBirth")

Mathematical & Vectorized Operations

Aggregations & Math Methods

# Summaries for numeric columns
df.selectExpr(
    "mean(Salary) as mean_salary",
    "min(Salary) as min_salary",
    "max(Salary) as max_salary",
    "stddev_pop(Salary) as std_salary"
).show()
  • Spark has many SQL functions that can be used with selectExpr().

Creating or Transforming Columns

from pyspark.sql import functions as F
# Pre-compute the average salary (pulls it back as a Python float)
salary_mean = df.select(F.avg("Salary").alias("mean_salary")).collect()[0]["mean_salary"]

df2 = (
    df
    .withColumn("Salary_2x", F.col("Salary") * 2)    # Add Salary_2x
    .withColumn(
        "Name_w_Position",           # Concatenate Name and Position
        F.concat(F.col("Name"), F.lit(" ("), F.col("Position"), F.lit(")")))
    .withColumn(
        "Salary_minus_Mean",        # Subtract mean salary
        F.col("Salary") - F.lit(salary_mean))
)
  • All transformations in Spark are “lazy” until an action (show(), collect(), count()) is called.
  • The .alias() method gives a column a temporary (or alternate) name.

Converting Data Types with the cast() Method

  • Spark columns can be cast to other data types using cast():
# Convert Salary to integer
df = df.withColumn("Salary_int", col("Salary").cast("int"))

Converting Data Types with the to_date() Method

  • to_date() parses a string column into a date using a given format pattern (e.g., "M/d/yy"):
# Convert to date or timestamp
from pyspark.sql.functions import to_date

# Use the legacy parser so that two-digit years can resolve to 19xx instead of always 20xx.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY") 

# Parse the "Birthday" string column into a date type
# (use "DateOfBirth" instead if the column was renamed earlier)
df = df.withColumn("DateOfBirth_ts", to_date("Birthday", "M/d/yy"))

Converting Data Types

  • Integers
    • ByteType: byte (8-bit)
    • ShortType: short (16-bit)
    • IntegerType: int (32-bit)
    • LongType: long (64-bit)
  • Floating points
    • FloatType: float (32-bit floating point)
    • DoubleType: double (64-bit floating point)
  • DecimalType: decimal (exact numeric type with a chosen precision and scale)
  • StringType: string (text data)
  • BooleanType: boolean (True/False values)
  • DateType: date (year, month, day)
  • TimestampType: timestamp (date and time)

Filtering by a Condition

Filtering Rows by a Condition

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[*]").getOrCreate()
df_pd = pd.read_csv("https://bcdanl.github.io/data/employment.csv")
df_pd = df_pd.where(pd.notnull(df_pd), None)  # Convert NaN to None
df = spark.createDataFrame(df_pd)

# Keep only the rows whose Salary exceeds 100,000
df.filter(col("Salary") > 100000).show()

Filtering by a Condition

df.filter(
    ( col("Team") == "Finance" ) & 
    ( col("Salary") >= 100000 )
).show()
df.filter(
    (col("Team") == "Finance") | 
    (col("Team") == "Legal")   | 
    (col("Team") == "Sales")
).show()
  • We can combine multiple Boolean conditions with the logical operators & (and) and | (or); each condition must be wrapped in parentheses.

Filtering with the isin() method

# Checking membership with "isin"
df.filter(col("Team").isin("Finance", "Legal", "Sales")).show()
  • When checking one column against several values, the isin() method is more concise than chaining == comparisons with |.

Filtering with the between() method

df_between = df.filter(col("Salary").between(90000, 100000))
df_between.show()
  • between() returns a Boolean Column expression that is True when an observation’s value falls within the specified interval, with both endpoints included.

PySpark Basics

Let’s do Questions 2-6 in Classwork 6!

Dealing with Missing Values

Checking for Missing Values: The isNull() and isNotNull() methods

  • In PySpark, missing values often appear as null.
# Count how many null values in a given column
df.filter(col("Team").isNull()).count()

# Similarly, you can filter non-null
df.filter(col("Team").isNotNull()).count()

Dropping Rows with Null Values: The .na.drop() method

# Drop any row that has a null value in any column
df_drop_any = df.na.drop()
  • The .na.drop() method removes observations that hold any NULL values.

Dropping Rows with Null Values: The .na.drop() method with how

# Drop rows that have all columns null
df_drop_all = df.na.drop(how="all")
  • We can pass the how parameter an argument of "all" to remove observations in which all values are missing.

  • Note that the how parameter’s default argument is "any".

Dropping Rows with Null Values: The .na.drop() method with subset

# Drop rows with nulls in specific columns:
df_drop_subset = df.na.drop(subset=["Gender", "Team"])
  • We can use the subset parameter to target observations with a missing value in specific variables.
    • The above example removes observations that have a missing value in either the Gender or the Team variable (how defaults to "any").

Filling Null Values: The .na.fill() method

# Fill a specific column’s nulls with 0
df_fill = (
  df.na
  .fill(value = 0, 
        subset = ["Salary"])
)
  • We can specify the value and subset parameters to fill a specific column’s NULLs with a specific value.
# Fill multiple columns with a dictionary
df_fill_multi = (
  df.na
  .fill({"Salary": 0, 
           "Team": "Unknown"})
)
  • We can fill multiple columns’ NULLs with a dictionary.

Dealing with Duplicates

Removing Duplicates with distinct()

  • In PySpark, the distinct() method returns a new DataFrame with duplicate rows removed.
  • It is similar to the SQL SELECT DISTINCT command.
df.select("Team", "Position").distinct().show()
  • After applying distinct(), only unique observations remain.

Dealing with Duplicates with the dropDuplicates() method

  • Missing values are a common occurrence in messy data sets, and so are duplicate values.
# Drop all rows that are exact duplicates across all columns
df_no_dups = df.dropDuplicates()

# Drop duplicates based on subset of columns
df_no_dups_subset = df.dropDuplicates(["Team"])
  • dropDuplicates() keeps one row for each distinct combination; unlike Pandas, Spark does not guarantee which of the duplicate rows is kept.

PySpark Basics

Let’s do Questions 7-8 in Classwork 6!

Group Operations

  • In PySpark, we use groupBy() (similar to Pandas’ groupby()) to aggregate, analyze, or transform data at a grouped level.
  • A GroupedData object is returned, which can then be used with aggregation methods such as sum(), avg(), count(), etc.

Creating a groupBy Object

  • We can create a Spark DataFrame from a list (or other data sources like CSV, Parquet, etc.).
  • Then call groupBy("Type") on the DataFrame.
food_data = [
    ("Apple", "Fruit", 1.05),
    ("Onion", "Vegie", 1.00),
    ("Orange", "Fruit", 1.25),
    ("Tomato", "Vegie", 0.85),
    ("Watermelon", "Fruit", 4.15)
]

food_df = spark.createDataFrame(food_data, ["Item", "Type", "Price"])

# Group by "Type"
groups = food_df.groupBy("Type")
  • In this example, there are two types of items: “Fruit” and “Vegie”.

Aggregation on Groups

# Calculate the average Price for each Type
groups.avg("Price").show()

# Calculate the sum of the Price for each Type
groups.sum("Price").show()

# Count how many rows in each Type
groups.count().show()
  • These group-based operations are executed once an action (like .show()) is called.

Group Aggregation with Multiple Columns

from pyspark.sql import functions as F  # avoids shadowing Python's built-in min() and max()

food_df.groupBy("Type").agg(
    F.min("Price").alias("min_price"),
    F.max("Price").alias("max_price"),
    F.mean("Price").alias("mean_price")
).show()
  • We can pass multiple aggregations to .agg() to get multiple results at once.

Adding Group-Level Statistics to Each Row

  • In Pandas, .transform() is often used to add group-level statistics back onto the original DataFrame.
  • In PySpark, we typically use a window function: an aggregate computed over a Window partitioned by the grouping column, which returns one value for every original row.
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

# Define a window partitioned by "Type"
w = Window.partitionBy("Type")

food_df_with_mean = food_df.withColumn(
    "mean_price_by_type",
    avg(col("Price")).over(w)
)
food_df_with_mean.show()
  • This keeps each original row, adding the group-level mean price for its corresponding Type.

PySpark Basics

Let’s do Classwork 7!