Classwork 8
Linear Regression I
Setup for PySpark, UDFs, and Plots
Required Libraries and SparkSession Entry Point
```python
import pandas as pd
import numpy as np
from tabulate import tabulate   # for table summary
import scipy.stats as stats
import matplotlib.pyplot as plt
import statsmodels.api as sm    # for lowess smoothing

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, col, pow, mean, when, log
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.master("local[*]").getOrCreate()
```
UDF for Regression Tables
```python
def regression_table(model, assembler):
    """
    Creates a formatted regression table from a fitted LinearRegression model
    and its VectorAssembler, and inserts a dashed horizontal line after the
    Intercept row. The table includes separate columns for the 95% confidence
    interval lower and upper bounds for each coefficient (computed at the 5%
    significance level) and an "Observations" row (using
    model.summary.numInstances) above the R² row. The RMSE row is placed last.

    The columns are ordered as:
      Metric | Value | Significance | Std. Error | p-value | 95% CI Lower | 95% CI Upper

    The "Value", "Std. Error", "95% CI Lower", and "95% CI Upper" columns get
    commas every three digits and 3 decimal places (except Observations,
    which is formatted as an integer with commas).

    Parameters:
        model: A fitted LinearRegression model (with a .summary attribute).
        assembler: The VectorAssembler used to assemble the features for the model.

    Returns:
        A formatted string containing the regression table.
    """
    # Extract coefficients and standard errors as NumPy arrays
    coeffs = model.coefficients.toArray()
    std_errors_all = np.array(model.summary.coefficientStandardErrors)

    # Check whether the intercept's standard error is included (one extra element).
    # When fitIntercept is true, Spark returns the intercept's statistics LAST.
    if len(std_errors_all) == len(coeffs) + 1:
        intercept_se = std_errors_all[-1]
        std_errors = std_errors_all[:-1]
    else:
        intercept_se = None
        std_errors = std_errors_all

    # t-statistics for feature coefficients (t = beta / SE(beta))
    # t_stats = coeffs / std_errors
    t_stats = model.summary.tValues

    # Degrees of freedom: number of instances minus number of predictors minus 1 (for the intercept)
    df = model.summary.numInstances - len(coeffs) - 1

    # t-critical value for a 95% confidence interval (two-tailed, 5% significance)
    t_critical = stats.t.ppf(0.975, df)

    # Two-tailed p-values for each feature coefficient
    # p_values = [2 * (1 - stats.t.cdf(np.abs(t), df)) for t in t_stats]
    p_values = model.summary.pValues

    # Assign significance stars based on the p-value
    def significance_stars(p):
        if p < 0.01:
            return "***"
        elif p < 0.05:
            return "**"
        elif p < 0.1:
            return "*"
        else:
            return ""

    # Build the table rows.
    # Order: Metric, Value, Significance, Std. Error, p-value, 95% CI Lower, 95% CI Upper.
    table = []
    for feature, beta, se, p in zip(assembler.getInputCols(), coeffs, std_errors, p_values):
        ci_lower = beta - t_critical * se
        ci_upper = beta + t_critical * se
        table.append([
            "Beta: " + feature,     # Metric name
            beta,                   # Beta estimate (Value)
            significance_stars(p),  # Significance stars
            se,                     # Standard error
            p,                      # p-value
            ci_lower,               # 95% CI lower bound
            ci_upper                # 95% CI upper bound
        ])

    # Compute and add the intercept row with its SE, p-value, significance, and CI (if available)
    if intercept_se is not None:
        intercept_t = model.intercept / intercept_se
        intercept_p = 2 * (1 - stats.t.cdf(np.abs(intercept_t), df))
        intercept_sig = significance_stars(intercept_p)
        ci_intercept_lower = model.intercept - t_critical * intercept_se
        ci_intercept_upper = model.intercept + t_critical * intercept_se
    else:
        intercept_se = ""
        intercept_p = ""
        intercept_sig = ""
        ci_intercept_lower = ""
        ci_intercept_upper = ""

    table.append([
        "Intercept",
        model.intercept,
        intercept_sig,
        intercept_se,
        intercept_p,
        ci_intercept_lower,
        ci_intercept_upper
    ])

    # Append overall model metrics:
    # an Observations row (model.summary.numInstances), then R², and finally RMSE.
    table.append(["Observations", model.summary.numInstances, "", "", "", "", ""])
    table.append(["R²", model.summary.r2, "", "", "", "", ""])
    table.append(["RMSE", model.summary.rootMeanSquaredError, "", "", "", "", ""])

    # Format the table.
    # "Value" (index 1), "Std. Error" (index 3), "95% CI Lower" (index 5), and
    # "95% CI Upper" (index 6) get commas and 3 decimal places, except Observations,
    # which is an integer with commas. All other numeric cells get 3 decimal places.
    formatted_table = []
    for row in table:
        formatted_row = []
        for i, item in enumerate(row):
            if row[0] == "Observations" and i == 1 and isinstance(item, (int, float, np.floating)) and item != "":
                # Format Observations as an integer with commas, no decimals
                formatted_row.append(f"{int(item):,}")
            elif isinstance(item, (int, float, np.floating)) and item != "":
                if i in [1, 3, 5, 6]:
                    formatted_row.append(f"{item:,.3f}")
                else:
                    formatted_row.append(f"{item:.3f}")
            else:
                formatted_row.append(item)
        formatted_table.append(formatted_row)

    # Generate the table string using tabulate
    table_str = tabulate(
        formatted_table,
        headers=["Metric", "Value", "Sig.", "Std. Error", "p-value", "95% CI Lower", "95% CI Upper"],
        tablefmt="pretty",
        colalign=("left", "right", "center", "right", "right", "right", "right")
    )

    # Insert a dashed line after the Intercept row for clarity
    lines = table_str.split("\n")
    dash_line = '-' * len(lines[0])
    for i, line in enumerate(lines):
        if "Intercept" in line and not line.strip().startswith('+'):
            lines.insert(i + 1, dash_line)
            break

    return "\n".join(lines)

# Example usage:
# print(regression_table(MODEL, ASSEMBLER))
```
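As a minimal usage sketch (hypothetical names `assembler_cw` and `model_cw`; assumes a training DataFrame `dtrain` with `cnt`, `temp`, `hum`, and `windspeed` columns, and that the solver exposes summary statistics, as Spark's default "normal" solver does for small feature counts):

```python
# Sketch: fit a small model and print its regression table.
assembler_cw = VectorAssembler(inputCols=["temp", "hum", "windspeed"],
                               outputCol="features")
model_cw = LinearRegression(featuresCol="features", labelCol="cnt") \
    .fit(assembler_cw.transform(dtrain))
print(regression_table(model_cw, assembler_cw))
```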
UDF for Adding Dummy Variables
```python
def add_dummy_variables(var_name, reference_level, category_order=None):
    """
    Creates dummy variables for the specified column in the global DataFrames
    dtrain and dtest. Allows manual setting of the category order.

    Parameters:
        var_name (str): The name of the categorical column (e.g., "borough_name").
        reference_level (int): Index of the category to be used as the reference (dummy omitted).
        category_order (list, optional): List of categories in the desired order. If None, categories are sorted.

    Returns:
        dummy_cols (list): List of dummy column names excluding the reference category.
        ref_category (str): The category chosen as the reference.
    """
    global dtrain, dtest

    # Get distinct categories from the training set.
    categories = dtrain.select(var_name).distinct().rdd.flatMap(lambda x: x).collect()

    # Convert booleans to strings if present.
    categories = [str(c) if isinstance(c, bool) else c for c in categories]

    # Use manual category order if provided; otherwise, sort categories.
    if category_order:
        # Ensure all categories are present in the user-defined order.
        missing = set(categories) - set(category_order)
        if missing:
            raise ValueError(f"These categories are missing from your custom order: {missing}")
        categories = category_order
    else:
        categories = sorted(categories)

    # Validate reference_level.
    if reference_level < 0 or reference_level >= len(categories):
        raise ValueError(f"reference_level must be between 0 and {len(categories) - 1}")

    # Define the reference category.
    ref_category = categories[reference_level]
    print("Reference category (dummy omitted):", ref_category)

    # Create dummy variables for all categories.
    for cat in categories:
        dummy_col_name = var_name + "_" + str(cat).replace(" ", "_")
        dtrain = dtrain.withColumn(dummy_col_name, when(col(var_name) == cat, 1).otherwise(0))
        dtest = dtest.withColumn(dummy_col_name, when(col(var_name) == cat, 1).otherwise(0))

    # List of dummy columns, excluding the reference category.
    dummy_cols = [var_name + "_" + str(cat).replace(" ", "_") for cat in categories if cat != ref_category]

    return dummy_cols, ref_category

# Example usage without category_order:
# dummy_cols_year, ref_category_year = add_dummy_variables('year', 0)

# Example usage with category_order:
# custom_order_wkday = ['sunday', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday']
# dummy_cols_wkday, ref_category_wkday = add_dummy_variables('wkday', reference_level=0, category_order=custom_order_wkday)
```
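For a concrete feel of the return values, here is a sketch to run after `dtrain` and `dtest` exist (the function mutates those globals in place):

```python
# Sketch: dummy-encode seasons and inspect the generated columns.
dummy_cols_seasons, ref_seasons = add_dummy_variables('seasons', 0)
print(dummy_cols_seasons)   # non-reference dummy column names
dtrain.select('seasons', *dummy_cols_seasons).show(5)
```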
Coefficient Plots
```python
terms = assembler3.getInputCols()
coefs = model3.coefficients.toArray()[:len(terms)]
stdErrs = model3.summary.coefficientStandardErrors[:len(terms)]

df_summary = pd.DataFrame({
    "term": terms,
    "estimate": coefs,
    "std_error": stdErrs
})

# Filter df_summary if needed

# Plot using the DataFrame columns
plt.errorbar(df_summary["term"], df_summary["estimate"],
             yerr=1.96 * df_summary["std_error"], fmt='o', capsize=5)
plt.xlabel("Terms")
plt.ylabel("Coefficient Estimate")
plt.title("Coefficient Estimates (Model 3)")
plt.axhline(0, color="red", linestyle="--")  # Add horizontal line at 0
plt.xticks(rotation=45)
plt.show()
```
Residual Plots
```python
# Convert test predictions to pandas
dfpd = TEST_DATAFRAME.select(["prediction", "Y_VARIABLE"]).toPandas()
dfpd["residual"] = dfpd["Y_VARIABLE"] - dfpd["prediction"]

plt.scatter(dfpd["prediction"], dfpd["residual"], alpha=0.2, color="darkgray")

# Use lowess smoothing for the trend line
smoothed = sm.nonparametric.lowess(dfpd["residual"], dfpd["prediction"])
plt.plot(smoothed[:, 0], smoothed[:, 1], color="darkblue")

plt.axhline(y=0, color="red", linestyle="--")
plt.xlabel("Predicted y (Model)")
plt.ylabel("Residuals")
plt.title("Residual Plot for Model")
plt.show()
```
Question 1
- Read the data file, https://bcdanl.github.io/data/bikeshare_cleaned.csv, as a PySpark DataFrame named `bikeshare` (a loading sketch follows the variable description below).
- Which variables are continuous? Which are categorical?
- For each continuous variable, provide descriptive statistics.
- For each categorical variable, provide the frequency of each distinct value's occurrence.
Variable description

| Variable | Description |
|---|---|
| cnt | Count of total rental bikes |
| year | Year |
| month | Month |
| date | Date |
| hr | Hour |
| wkday | Weekday |
| holiday | Holiday indicator (1 if holiday, 0 otherwise) |
| seasons | Season |
| weather_cond | Weather condition |
| temp | Temperature (measured in standard deviations from average) |
| hum | Humidity (measured in standard deviations from average) |
| windspeed | Wind speed (measured in standard deviations from average) |
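A minimal sketch for Question 1. Spark cannot read an `https://` URL directly, so one common pattern is to download with pandas and convert; treating `cnt`, `temp`, `hum`, and `windspeed` as the continuous variables is an assumption you should verify against the data:

```python
# Sketch: load the CSV via pandas, then convert to a PySpark DataFrame.
pdf = pd.read_csv("https://bcdanl.github.io/data/bikeshare_cleaned.csv")
bikeshare = spark.createDataFrame(pdf)

# Descriptive statistics for the (assumed) continuous variables.
bikeshare.select("cnt", "temp", "hum", "windspeed").describe().show()

# Frequency of each distinct value for the (assumed) categorical variables.
for v in ["year", "month", "hr", "wkday", "holiday", "seasons", "weather_cond"]:
    bikeshare.groupBy(v).count().orderBy(v).show()
```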
Question 2
- Divide the `bikeshare` DataFrame into training and test DataFrames (a split sketch follows this list).
  - Use `dtrain` and `dtest` for the training and test DataFrames, respectively.
  - 60% of the observations in `bikeshare` are assigned to `dtrain`; the rest are assigned to `dtest`.
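A minimal sketch using `randomSplit`; the seed value is arbitrary but makes the split reproducible:

```python
# Sketch: 60% training / 40% test split with a fixed seed.
dtrain, dtest = bikeshare.randomSplit([0.6, 0.4], seed=1234)
print(dtrain.count(), dtest.count())
```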
Question 3
Train the following linear regression model. Provide the summary of the regression result.
\[ \begin{align} \text{cnt}_{i} =\ &\beta_{\text{intercept}}\\ &+ \beta_{\text{temp}} \, \text{temp}_{i} + \beta_{\text{hum}} \, \text{hum}_{i} + \beta_{\text{windspeed}} \, \text{windspeed}_{i} \nonumber \\ &+ \beta_{\text{year\_2012}} \, \text{year\_2012}_{i}\\ &+ \beta_{\text{month\_2}} \, \text{month\_2}_{i} + \beta_{\text{month\_3}} \, \text{month\_3}_{i} + \beta_{\text{month\_4}} \, \text{month\_4}_{i} \nonumber \\ &+ \beta_{\text{month\_5}} \, \text{month\_5}_{i} + \beta_{\text{month\_6}} \, \text{month\_6}_{i} + \beta_{\text{month\_7}} \, \text{month\_7}_{i} + \beta_{\text{month\_8}} \, \text{month\_8}_{i} \nonumber \\ &+ \beta_{\text{month\_9}} \, \text{month\_9}_{i} + \beta_{\text{month\_10}} \, \text{month\_10}_{i} + \beta_{\text{month\_11}} \, \text{month\_11}_{i} + \beta_{\text{month\_12}} \, \text{month\_12}_{i} \nonumber \\ &+ \beta_{\text{hr\_1}} \, \text{hr\_1}_{i} + \beta_{\text{hr\_2}} \, \text{hr\_2}_{i} + \beta_{\text{hr\_3}} \, \text{hr\_3}_{i} + \beta_{\text{hr\_4}} \, \text{hr\_4}_{i} \nonumber \\ &+ \beta_{\text{hr\_5}} \, \text{hr\_5}_{i} + \beta_{\text{hr\_6}} \, \text{hr\_6}_{i} + \beta_{\text{hr\_7}} \, \text{hr\_7}_{i} + \beta_{\text{hr\_8}} \, \text{hr\_8}_{i} \nonumber \\ &+ \beta_{\text{hr\_9}} \, \text{hr\_9}_{i} + \beta_{\text{hr\_10}} \, \text{hr\_10}_{i} + \beta_{\text{hr\_11}} \, \text{hr\_11}_{i} + \beta_{\text{hr\_12}} \, \text{hr\_12}_{i} \nonumber \\ &+ \beta_{\text{hr\_13}} \, \text{hr\_13}_{i} + \beta_{\text{hr\_14}} \, \text{hr\_14}_{i} + \beta_{\text{hr\_15}} \, \text{hr\_15}_{i} + \beta_{\text{hr\_16}} \, \text{hr\_16}_{i} \nonumber \\ &+ \beta_{\text{hr\_17}} \, \text{hr\_17}_{i} + \beta_{\text{hr\_18}} \, \text{hr\_18}_{i} + \beta_{\text{hr\_19}} \, \text{hr\_19}_{i} + \beta_{\text{hr\_20}} \, \text{hr\_20}_{i} \nonumber \\ &+ \beta_{\text{hr\_21}} \, \text{hr\_21}_{i} + \beta_{\text{hr\_22}} \, \text{hr\_22}_{i} + \beta_{\text{hr\_23}} \, \text{hr\_23}_{i} \nonumber \\ &+ \beta_{\text{wkday\_monday}} \, \text{wkday\_monday}_{i} + \beta_{\text{wkday\_tuesday}} \, \text{wkday\_tuesday}_{i} + \beta_{\text{wkday\_wednesday}} \, \text{wkday\_wednesday}_{i} \nonumber \\ &+ \beta_{\text{wkday\_thursday}} \, \text{wkday\_thursday}_{i} + \beta_{\text{wkday\_friday}} \, \text{wkday\_friday}_{i} + \beta_{\text{wkday\_saturday}} \, \text{wkday\_saturday}_{i} \nonumber \\ &+ \beta_{\text{holiday\_1}} \, \text{holiday\_1}_{i} \nonumber \\ &+ \beta_{\text{seasons\_summer}} \, \text{seasons\_summer}_{i} + \beta_{\text{seasons\_fall}} \, \text{seasons\_fall}_{i} + \beta_{\text{seasons\_winter}} \, \text{seasons\_winter}_{i} \nonumber \\ &+ \beta_{\text{weather\_cond\_Light\_Snow\_or\_Light\_Rain}} \, \text{weather\_cond\_Light\_Snow\_or\_Light\_Rain}_{i}\nonumber \\ &+ \beta_{\text{weather\_cond\_Mist\_or\_Cloudy}} \, \text{weather\_cond\_Mist\_or\_Cloudy}_{i}\\ &+ \epsilon_{i} \end{align} \]
Note that all predictors are dummy variables, except for `temp`, `hum`, and `windspeed`.
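A sketch of the workflow using the two UDFs above. Reference levels are chosen so the omitted categories match the equation (e.g., `hr_0`, `wkday_sunday`, `seasons_spring`); the index passed for `seasons` and `weather_cond` depends on the data's actual category labels, so check the printed reference category:

```python
# Sketch: dummy-encode each categorical predictor, assemble, and fit.
dummy_cols_year, _    = add_dummy_variables('year', 0)     # ref: 2011
dummy_cols_month, _   = add_dummy_variables('month', 0)    # ref: month 1
dummy_cols_hr, _      = add_dummy_variables('hr', 0)       # ref: hour 0
custom_order_wkday = ['sunday', 'monday', 'tuesday', 'wednesday',
                      'thursday', 'friday', 'saturday']
dummy_cols_wkday, _   = add_dummy_variables('wkday', 0, custom_order_wkday)
dummy_cols_holiday, _ = add_dummy_variables('holiday', 0)  # ref: holiday 0
dummy_cols_seasons, _ = add_dummy_variables('seasons', 1)  # ref: spring (assumed index)
dummy_cols_weather, _ = add_dummy_variables('weather_cond', 0)  # ref: clear (assumed index)

x_cols = (["temp", "hum", "windspeed"]
          + dummy_cols_year + dummy_cols_month + dummy_cols_hr
          + dummy_cols_wkday + dummy_cols_holiday
          + dummy_cols_seasons + dummy_cols_weather)

assembler3 = VectorAssembler(inputCols=x_cols, outputCol="features")
model3 = LinearRegression(featuresCol="features", labelCol="cnt") \
    .fit(assembler3.transform(dtrain))
print(regression_table(model3, assembler3))
```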
Question 4
Make a prediction on the outcome variable using the test DataFrame and the regression result from Question 3.
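A sketch reusing `assembler3` and `model3` from the Question 3 sketch; `pred3` is an illustrative name:

```python
# Sketch: assemble the test features and predict cnt.
pred3 = model3.transform(assembler3.transform(dtest))
pred3.select("cnt", "prediction").show(5)
```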
Question 5
Interpret the beta estimate of `windspeed`.
Question 6
- Which `hr` is most strongly associated with changes in `cnt`?
- Interpret the beta estimate of that `hr`.
Question 7
- Draw a coefficient plot for the `temp`, `hum`, and `windspeed` variables (see the filtering sketch after this list).
- Draw a coefficient plot for the `month` variables.
- Draw a coefficient plot for the `hr` variables.
- Draw a coefficient plot for the `wkday` variables.
- Draw a coefficient plot for the `seasons` variables.
- Draw a coefficient plot for the `weather_cond` variables.
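The coefficient-plot template above draws every term at once; for each part of Question 7, filter `df_summary` down to one variable group first. A sketch for the `hr` dummies (assumes `df_summary` was built from `model3`/`assembler3` as in the template):

```python
# Sketch: keep only the hr dummy terms, then reuse the errorbar template.
df_hr = df_summary[df_summary["term"].str.startswith("hr_")]
plt.errorbar(df_hr["term"], df_hr["estimate"],
             yerr=1.96 * df_hr["std_error"], fmt='o', capsize=5)
plt.axhline(0, color="red", linestyle="--")
plt.xticks(rotation=45)
plt.title("Coefficient Estimates: hr dummies")
plt.show()
```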
Question 8
- Draw a residual plot (a sketch using the template above follows this list).
- On average, are the predictions from the model in Question 3 correct? Are there systematic errors?
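Filling in the residual-plot template above with the names from the earlier sketches (`pred3` from Question 4, `cnt` as the outcome):

```python
# Sketch: residual plot for the Question 3 model on the test set.
dfpd = pred3.select(["prediction", "cnt"]).toPandas()
dfpd["residual"] = dfpd["cnt"] - dfpd["prediction"]

plt.scatter(dfpd["prediction"], dfpd["residual"], alpha=0.2, color="darkgray")
smoothed = sm.nonparametric.lowess(dfpd["residual"], dfpd["prediction"])
plt.plot(smoothed[:, 0], smoothed[:, 1], color="darkblue")
plt.axhline(y=0, color="red", linestyle="--")
plt.xlabel("Predicted cnt")
plt.ylabel("Residuals")
plt.show()
```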