Classwork 8

Linear Regression I

Author

Byeong-Hak Choe

Published

February 24, 2025

Modified

February 24, 2025

Setup for PySpark, UDFs, and Plots

Required Libraries and SparkSession Entry Point

import pandas as pd
import numpy as np
from tabulate import tabulate  # for table summary
import scipy.stats as stats
import matplotlib.pyplot as plt
import statsmodels.api as sm  # for lowess smoothing

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, col, pow, mean, when, log
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.master("local[*]").getOrCreate()

UDF for Regression Tables

def regression_table(model, assembler):
    """
    Creates a formatted regression table from a fitted LinearRegression model and its VectorAssembler,
    and inserts a dashed horizontal line after the Intercept row. The table includes separate columns
    for the 95% confidence interval lower and upper bounds for each coefficient (computed at the 5% significance level)
    and an "Observations" row (using model.summary.numInstances) above the R² row.
    The RMSE row is placed as the last row.

    The columns are ordered as:
        Metric | Value | Significance | Std. Error | p-value | 95% CI Lower | 95% CI Upper

    For the "Value", "Std. Error", "95% CI Lower", and "95% CI Upper" columns, commas are inserted every three digits,
    with 3 decimal places (except for Observations which is formatted as an integer with commas).

    Parameters:
        model: A fitted LinearRegression model (with a .summary attribute).
        assembler: The VectorAssembler used to assemble the features for the model.

    Returns:
        A formatted string containing the regression table.
    """
    # Extract coefficients and standard errors as NumPy arrays
    coeffs = model.coefficients.toArray()
    std_errors_all = np.array(model.summary.coefficientStandardErrors)

    # Check if the intercept's standard error is included (one extra element)
    if len(std_errors_all) == len(coeffs) + 1:
        intercept_se = std_errors_all[0]
        std_errors = std_errors_all[1:]
    else:
        intercept_se = None
        std_errors = std_errors_all

    # Compute t-statistics for feature coefficients (t = beta / SE(beta))
    # t_stats = coeffs / std_errors
    t_stats = model.summary.tValues

    # Degrees of freedom: number of instances minus number of predictors minus 1 (for intercept)
    df = model.summary.numInstances - len(coeffs) - 1

    # Compute the t-critical value for a 95% confidence interval (two-tailed, 5% significance)
    t_critical = stats.t.ppf(0.975, df)

    # Compute two-tailed p-values for each feature coefficient
    # p_values = [2 * (1 - stats.t.cdf(np.abs(t), df)) for t in t_stats]
    p_values = model.summary.pValues

    # Function to assign significance stars based on p-value
    def significance_stars(p):
        if p < 0.01:
            return "***"
        elif p < 0.05:
            return "**"
        elif p < 0.1:
            return "*"
        else:
            return ""

    # Build the table rows.
    # Order: Metric, Value, Significance, Std. Error, p-value, 95% CI Lower, 95% CI Upper.
    table = []
    for feature, beta, se, p in zip(assembler.getInputCols(), coeffs, std_errors, p_values):
        ci_lower = beta - t_critical * se
        ci_upper = beta + t_critical * se
        table.append([
            "Beta: " + feature,       # Metric name
            beta,                     # Beta estimate (Value)
            significance_stars(p),    # Significance stars
            se,                       # Standard error
            p,                        # p-value
            ci_lower,                 # 95% CI lower bound
            ci_upper                  # 95% CI upper bound
        ])

    # Compute and add the intercept row with its SE, p-value, significance, and CI (if available)
    if intercept_se is not None:
        intercept_t = model.intercept / intercept_se
        intercept_p = 2 * (1 - stats.t.cdf(np.abs(intercept_t), df))
        intercept_sig = significance_stars(intercept_p)
        ci_intercept_lower = model.intercept - t_critical * intercept_se
        ci_intercept_upper = model.intercept + t_critical * intercept_se
    else:
        intercept_se = ""
        intercept_p = ""
        intercept_sig = ""
        ci_intercept_lower = ""
        ci_intercept_upper = ""

    table.append([
        "Intercept",
        model.intercept,
        intercept_sig,
        intercept_se,
        intercept_p,
        ci_intercept_lower,
        ci_intercept_upper
    ])

    # Append overall model metrics:
    # Insert an Observations row using model.summary.numInstances,
    # then an R² row, and finally the RMSE row as the last row.
    table.append(["Observations", model.summary.numInstances, "", "", "", "", ""])
    table.append(["R²", model.summary.r2, "", "", "", "", ""])
    table.append(["RMSE", model.summary.rootMeanSquaredError, "", "", "", "", ""])

    # Format the table.
    # For the "Value" (index 1), "Std. Error" (index 3), "95% CI Lower" (index 5), and "95% CI Upper" (index 6) columns,
    # format with commas and 3 decimal places, except for Observations which should be an integer with commas.
    # For the p-value (index 4), format to 3 decimal places.
    formatted_table = []
    for row in table:
        formatted_row = []
        for i, item in enumerate(row):
            if row[0] == "Observations" and i == 1 and isinstance(item, (int, float, np.floating)) and item != "":
                # Format Observations as integer with commas, no decimals.
                formatted_row.append(f"{int(item):,}")
            elif isinstance(item, (int, float, np.floating)) and item != "":
                if i in [1, 3, 5, 6]:
                    formatted_row.append(f"{item:,.3f}")
                elif i == 4:
                    formatted_row.append(f"{item:.3f}")
                else:
                    formatted_row.append(f"{item:.3f}")
            else:
                formatted_row.append(item)
        formatted_table.append(formatted_row)

    # Generate the table string using tabulate.
    table_str = tabulate(
        formatted_table,
        headers=["Metric", "Value", "Sig.", "Std. Error", "p-value", "95% CI Lower", "95% CI Upper"],
        tablefmt="pretty",
        colalign=("left", "right", "center", "right", "right", "right", "right")
    )

    # Insert a dashed line after the Intercept row for clarity.
    lines = table_str.split("\n")
    dash_line = '-' * len(lines[0])
    for i, line in enumerate(lines):
        if "Intercept" in line and not line.strip().startswith('+'):
            lines.insert(i+1, dash_line)
            break

    return "\n".join(lines)

# Example usage:
# print(regression_table(MODEL, ASSEMBLER))

UDF for Adding Dummy Variables

def add_dummy_variables(var_name, reference_level, category_order=None):
    """
    Creates dummy variables for the specified column in the global DataFrames dtrain and dtest.
    Allows manual setting of category order.

    Parameters:
        var_name (str): The name of the categorical column (e.g., "borough_name").
        reference_level (int): Index of the category to be used as the reference (dummy omitted).
        category_order (list, optional): List of categories in the desired order. If None, categories are sorted.

    Returns:
        dummy_cols (list): List of dummy column names excluding the reference category.
        ref_category (str): The category chosen as the reference.
    """
    global dtrain, dtest

    # Get distinct categories from the training set.
    categories = dtrain.select(var_name).distinct().rdd.flatMap(lambda x: x).collect()

    # Convert booleans to strings if present.
    categories = [str(c) if isinstance(c, bool) else c for c in categories]

    # Use manual category order if provided; otherwise, sort categories.
    if category_order:
        # Ensure all categories are present in the user-defined order
        missing = set(categories) - set(category_order)
        if missing:
            raise ValueError(f"These categories are missing from your custom order: {missing}")
        categories = category_order
    else:
        categories = sorted(categories)

    # Validate reference_level
    if reference_level < 0 or reference_level >= len(categories):
        raise ValueError(f"reference_level must be between 0 and {len(categories) - 1}")

    # Define the reference category
    ref_category = categories[reference_level]
    print("Reference category (dummy omitted):", ref_category)

    # Create dummy variables for all categories
    for cat in categories:
        dummy_col_name = var_name + "_" + str(cat).replace(" ", "_")
        dtrain = dtrain.withColumn(dummy_col_name, when(col(var_name) == cat, 1).otherwise(0))
        dtest = dtest.withColumn(dummy_col_name, when(col(var_name) == cat, 1).otherwise(0))

    # List of dummy columns, excluding the reference category
    dummy_cols = [var_name + "_" + str(cat).replace(" ", "_") for cat in categories if cat != ref_category]

    return dummy_cols, ref_category
    
  
# Example usage without category_order:
# dummy_cols_year, ref_category_year = add_dummy_variables('year', 0)

# Example usage with category_order:
# custom_order_wkday = ['sunday', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday']
# dummy_cols_wkday, ref_category_wkday = add_dummy_variables('wkday', reference_level=0, category_order = custom_order_wkday)


Coefficient Plots

terms = assembler3.getInputCols()
coefs = model3.coefficients.toArray()[:len(terms)]
stdErrs = model3.summary.coefficientStandardErrors[:len(terms)]

df_summary = pd.DataFrame({
    "term": terms,
    "estimate": coefs,
    "std_error": stdErrs
})

# Filter df_summary if needed

# Plot using the DataFrame columns
plt.errorbar(df_summary["term"], df_summary["estimate"],
             yerr = 1.96 * df_summary["std_error"], fmt='o', capsize=5)
plt.xlabel("Terms")
plt.ylabel("Coefficient Estimate")
plt.title("Coefficient Estimates (Model 2)")
plt.axhline(0, color="red", linestyle="--")  # Add horizontal line at 0
plt.xticks(rotation=45)
plt.show()


Residual Plots

# Convert test predictions to Pandas
dfpd = TEST_DATAFRAME.select(["prediction", "Y_VARIABLE"]).toPandas()
dfpd["residual"] = dfpd["Y_VARIABLE"] - dfpd["prediction"]
plt.scatter(dfpd["prediction"], dfpd["residual"], alpha=0.2, color="darkgray")

# Use lowess smoothing for the trend line
smoothed = sm.nonparametric.lowess(dfpd["residual"], dfpd["prediction"])
plt.plot(smoothed[:, 0], smoothed[:, 1], color="darkblue")
plt.axhline(y=0, color="red", linestyle="--")
plt.xlabel("Predicted y (Model)")
plt.ylabel("Residuals")
plt.title("Residual Plot for Model")
plt.show()



Question 1

  • Read the data file, https://bcdanl.github.io/data/bikeshare_cleaned.csv, as the PySpark DataFrame object with the name, bikeshare.
    • What are continuous variables? What are categorical variables?
    • For each continuous variable, provide descriptive statistics.
    • For each categorical variable, provide the frequency of each distinct value’s occurrence.


Variable description

Variable Description
cnt Count of total rental bikes
year Year
month Month
date Date
hr Hour
wkday Weekday
holiday Holiday indicator (1 if holiday, 0 otherwise)
seasons Season
weather_cond Weather condition
temp Temperature (measured in standard deviations from average)
hum Humidity (measured in standard deviations from average)
windspeed Wind speed (measured in standard deviations from average)



Question 2

  • Divide the bikeshare DataFrame into training and test DataFrames.
    • Use dtrain and dtest for training and test DataFrames, respectively.
    • 60% of observations in the bikeshare are assigned to dtrain; the rest is assigned to dtest.



Question 3

Train the following linear regression model. Provide the summary of the regression result.

\[ \begin{align} \text{cnt}_{i} =\ &\beta_{\text{intercept}}\\ &+ \beta_{\text{temp}} \, \text{temp}_{i} + \beta_{\text{hum}} \, \text{hum}_{i} + \beta_{\text{windspeed}} \, \text{windspeed}_{i} \nonumber \\ &+ \beta_{\text{year\_2012}} \, \text{year\_2012}_{i}\\ &+ \beta_{\text{month\_2}} \, \text{month\_2}_{i} + \beta_{\text{month\_3}} \, \text{month\_3}_{i} + \beta_{\text{month\_4}} \, \text{month\_4}_{i} \nonumber \\ &+ \beta_{\text{month\_5}} \, \text{month\_5}_{i} + \beta_{\text{month\_6}} \, \text{month\_6}_{i} + \beta_{\text{month\_7}} \, \text{month\_7}_{i} + \beta_{\text{month\_8}} \, \text{month\_8}_{i} \nonumber \\ &+ \beta_{\text{month\_9}} \, \text{month\_9}_{i} + \beta_{\text{month\_10}} \, \text{month\_10}_{i} + \beta_{\text{month\_11}} \, \text{month\_11}_{i} + \beta_{\text{month\_12}} \, \text{month\_12}_{i} \nonumber \\ &+ \beta_{\text{hr\_1}} \, \text{hr\_1}_{i} + \beta_{\text{hr\_2}} \, \text{hr\_2}_{i} + \beta_{\text{hr\_3}} \, \text{hr\_3}_{i} + \beta_{\text{hr\_4}} \, \text{hr\_4}_{i} \nonumber \\ &+ \beta_{\text{hr\_5}} \, \text{hr\_5}_{i} + \beta_{\text{hr\_6}} \, \text{hr\_6}_{i} + \beta_{\text{hr\_7}} \, \text{hr\_7}_{i} + \beta_{\text{hr\_8}} \, \text{hr\_8}_{i} \nonumber \\ &+ \beta_{\text{hr\_9}} \, \text{hr\_9}_{i} + \beta_{\text{hr\_10}} \, \text{hr\_10}_{i} + \beta_{\text{hr\_11}} \, \text{hr\_11}_{i} + \beta_{\text{hr\_12}} \, \text{hr\_12}_{i} \nonumber \\ &+ \beta_{\text{hr\_13}} \, \text{hr\_13}_{i} + \beta_{\text{hr\_14}} \, \text{hr\_14}_{i} + \beta_{\text{hr\_15}} \, \text{hr\_15}_{i} + \beta_{\text{hr\_16}} \, \text{hr\_16}_{i} \nonumber \\ &+ \beta_{\text{hr\_17}} \, \text{hr\_17}_{i} + \beta_{\text{hr\_18}} \, \text{hr\_18}_{i} + \beta_{\text{hr\_19}} \, \text{hr\_19}_{i} + \beta_{\text{hr\_20}} \, \text{hr\_20}_{i} \nonumber \\ &+ \beta_{\text{hr\_21}} \, \text{hr\_21}_{i} + \beta_{\text{hr\_22}} \, \text{hr\_22}_{i} + \beta_{\text{hr\_23}} \, \text{hr\_23}_{i} \nonumber \\ &+ \beta_{\text{wkday\_monday}} \, \text{wkday\_monday}_{i} + \beta_{\text{wkday\_tuesday}} \, \text{wkday\_tuesday}_{i} + \beta_{\text{wkday\_wednesday}} \, \text{wkday\_wednesday}_{i} \nonumber \\ &+ \beta_{\text{wkday\_thursday}} \, \text{wkday\_thursday}_{i} + \beta_{\text{wkday\_friday}} \, \text{wkday\_friday}_{i} + \beta_{\text{wkday\_saturday}} \, \text{wkday\_saturday}_{i} \nonumber \\ &+ \beta_{\text{holiday\_1}} \, \text{holiday\_1}_{i} \nonumber \\ &+ \beta_{\text{seasons\_summer}} \, \text{seasons\_summer}_{i} + \beta_{\text{seasons\_fall}} \, \text{seasons\_fall}_{i} + \beta_{\text{seasons\_winter}} \, \text{seasons\_winter}_{i} \nonumber \\ &+ \beta_{\text{weather\_cond\_Light\_Snow\_or\_Light\_Rain}} \, \text{weather\_cond\_Light\_Snow\_or\_Light\_Rain}_{i}\nonumber \\ &+ \beta_{\text{weather\_cond\_Mist\_or\_Cloudy}} \, \text{weather\_cond\_Mist\_or\_Cloudy}_{i}\\ &+ \epsilon_{i} \end{align} \]

Note that all predictors are dummy variables, except for temp, hum, and windspeed.



Question 4

Make a prediction on the outcome variable using the test DataFrame and the regression result from Question 3.



Question 5

Interpret the beta estimate of windspeed.



Question 6

  • Which hr is most strongly associated with changes in cnt?
  • Interpret the beta estimate of that hr.



Question 7

  • Draw a coefficient plot for temp, hum and windspeed variables.
  • Draw a coefficient plot for month variables.
  • Draw a coefficient plot for hr variables.
  • Draw a coefficient plot for wkday variables.
  • Draw a coefficient plot for seasons variables.
  • Draw a coefficient plot for weather_cond variables.



Question 8

  • Draw a residual plot.
    • On average, are the predictions correct in the model in Question 3? Are there systematic errors?



Back to top