Logistic Regression
March 10, 2025
\[ \begin{align} G(z_i) \,=\, \dfrac{\exp(z_i)}{1 + \exp(z_i)}.\notag \end{align} \]
The logistic function \(G(z_i)\) maps the linear combination of predictors to the probability that the outcome \(y_{i}\) is \(1\).
\(z_{i} = b_{0} + b_{1}x_{1, i} + b_{2}x_{2, i} + \,\cdots\, + b_{k}x_{k, i}\)
\(z_{i} \overset{G}{\rightarrow} \texttt{ Prob} ( y_{i} = 1 )\)
The function \(G(z_i)\) is called the logistic function because it is the inverse of the logit (or log-odds) of the probability that the outcome \(y_{i}\) is 1. \[ \begin{align} G^{-1}(\text{Prob}(y_{i} = 1)) &\,\equiv\, \text{logit} (\text{Prob}(y_{i} = 1))\\ &\,\equiv \log\left(\, \frac{\text{Prob}(y_{i} = 1)}{\text{Prob}(y_{i} = 0)} \,\right)\\ &\,=\, b_0 + b_{1}x_{1,i} + b_{2}x_{2,i} + \,\cdots\, + b_{k}x_{k,i} \end{align} \]
Logistic regression is a linear regression model for log odds.
\[ \begin{align} \text{Prob}(y_{i} = 1) &\,=\, G( b_0 + b_{1}x_{1,i} + b_{2}x_{2,i} + \,\cdots\, + b_{k}x_{k,i} )\\ \text{ }\\ \Leftrightarrow\qquad \log\left(\dfrac{\text{Prob}( y_i = 1 )}{\text{Prob}( y_i = 0 )}\right) &\,=\, b_0 + b_{1}x_{1,i} + b_{2}x_{2,i} + \,\cdots\, + b_{k}x_{k,i} \end{align} \]
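As a minimal numerical sketch (plain NumPy, independent of the Spark model below), we can implement \(G\) and its inverse and verify that taking the logit of \(G(z_i)\) recovers \(z_i\):
import numpy as np
# G(z) = exp(z) / (1 + exp(z)): maps the linear index z to a probability
def logistic(z):
    return np.exp(z) / (1.0 + np.exp(z))
# inverse of G: the log odds log(p / (1 - p))
def logit(p):
    return np.log(p / (1.0 - p))
z = np.array([-2.0, 0.0, 1.5])
p = logistic(z)    # probabilities that y_i = 1
print(p)
print(logit(p))    # recovers the original z values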
Variable | Type | Description |
---|---|---|
atRisk | Bool | 1 if Apgar < 7, 0 otherwise |
PWGT | Num | Prepregnancy weight |
UPREVIS | Int | Prenatal visits |
CIG_REC | Bool | 1 if smoker, 0 otherwise |
GESTREC3 | Cat | < 37 weeks or ≥ 37 weeks |
DPLURAL | Cat | Single / Twin / Triplet+ |
ULD_MECO | Bool | 1 if heavy meconium |
ULD_PRECIP | Bool | 1 if labor < 3 hours |
ULD_BREECH | Bool | 1 if breech birth |
URF_DIAB | Bool | 1 if diabetic |
URF_CHYPER | Bool | 1 if chronic hypertension |
URF_PHYPER | Bool | 1 if pregnancy hypertension |
URF_ECLAM | Bool | 1 if eclampsia |
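Assuming the natality data have already been loaded into a Spark DataFrame named df (the same df that is split into dtrain and dtest below), a quick peek at these columns is a useful sanity check:
# Inspect the modeling columns (assumes the natality DataFrame is named df)
model_cols = ["atRisk", "PWGT", "UPREVIS", "CIG_REC", "GESTREC3", "DPLURAL",
              "ULD_MECO", "ULD_PRECIP", "ULD_BREECH", "URF_DIAB",
              "URF_CHYPER", "URF_PHYPER", "URF_ECLAM"]
df.select(model_cols).printSchema()
df.select(model_cols).show(5)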
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.feature import VectorAssembler
# encode the categorical predictors as dummy variables
# (the helper returns the dummy column names and the reference category)
dummy_cols_GESTREC3, ref_category_GESTREC3 = add_dummy_variables('GESTREC3', 1)
dummy_cols_DPLURAL, ref_category_DPLURAL = add_dummy_variables('DPLURAL', 0)
# assembling predictors
x_cols = ['PWGT', 'UPREVIS', 'CIG_REC',
'ULD_MECO', 'ULD_PRECIP', 'ULD_BREECH', 'URF_DIAB',
'URF_CHYPER', 'URF_PHYPER', 'URF_ECLAM']
assembler_predictors = (
x_cols +
dummy_cols_GESTREC3 + dummy_cols_DPLURAL
)
assembler_1 = VectorAssembler(
inputCols = assembler_predictors,
outputCol = "predictors"
)
dtrain_1 = assembler_1.transform(dtrain)
dtest_1 = assembler_1.transform(dtest)
# training model
model_1 = (
GeneralizedLinearRegression(featuresCol="predictors",
labelCol="atRisk",
family="binomial",
link="logit")
.fit(dtrain_1)
)
# making prediction
dtrain_1 = model_1.transform(dtrain_1)
dtest_1 = model_1.transform(dtest_1)
\[ \text{Deviance} = -2 \log(\text{Likelihood}) + C, \] where \(C\) is a constant that we can ignore.
\[ G(b_0 + b_{1}x_{1,i} + b_{2}x_{2,i} + \,\cdots\, + b_{k}x_{k,i}) \]
is the model's best estimate of the probability that the binary outcome \(y_i\) equals 1.
Logistic regression finds the parameters \(b_0, b_1, \dots, b_k\) that maximize the log likelihood of the data given the model, which is equivalent to minimizing the total residual deviance.
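As a quick check, the PySpark GLM training summary reports both the residual and the null deviance, and for 0/1 outcomes the constant \(C\) is zero, so \(-2\log(\text{Likelihood})\) computed by hand from the fitted probabilities should agree with the reported residual deviance. A minimal sketch, assuming model_1 and dtrain_1 from the code above:
import numpy as np
# Deviance reported by the fitted GLM
print("Null deviance:    ", model_1.summary.nullDeviance)
print("Residual deviance:", model_1.summary.deviance)
# Recompute -2 * log(Likelihood) from the training predictions
pdf_dev = dtrain_1.select("prediction", "atRisk").toPandas()
p = pdf_dev["prediction"].to_numpy()
y = pdf_dev["atRisk"].to_numpy()
print("Manual deviance:  ", -2 * np.sum(y * np.log(p) + (1 - y) * np.log(1 - p)))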
import numpy as np
from scipy.stats import norm
from tabulate import tabulate
def marginal_effects(model, means):
"""
Compute marginal effects for all predictors in a PySpark GeneralizedLinearRegression model (logit)
and return a formatted table with statistical significance.
Parameters:
model: Fitted GeneralizedLinearRegression model (with binomial family and logit link).
means: List of mean values for the predictor variables.
Returns:
A formatted string containing the marginal effects table.
"""
global assembler_predictors # Use the global assembler_predictors list
# Extract model coefficients, standard errors, and intercept
coeffs = np.array(model.coefficients)
std_errors = np.array(model.summary.coefficientStandardErrors)
intercept = model.intercept
# Compute linear combination of means and coefficients (XB)
XB = np.dot(means, coeffs) + intercept
# Compute derivative of logistic function (G'(XB))
G_prime_XB = np.exp(XB) / ((1 + np.exp(XB)) ** 2)
# Helper: significance stars.
def significance_stars(p):
if p < 0.01:
return "***"
elif p < 0.05:
return "**"
elif p < 0.1:
return "*"
else:
return ""
# Create table to store results
results = []
for i, predictor in enumerate(assembler_predictors):
# Compute marginal effect
marginal_effect = G_prime_XB * coeffs[i]
# Compute standard error of the marginal effect
std_error = G_prime_XB * std_errors[i]
# Compute z-score and p-value
z_score = marginal_effect / std_error if std_error != 0 else np.nan
p_value = 2 * (1 - norm.cdf(abs(z_score))) if not np.isnan(z_score) else np.nan
# Compute confidence interval (95%)
ci_lower = marginal_effect - 1.96 * std_error
ci_upper = marginal_effect + 1.96 * std_error
# Append results
results.append([predictor, f"{marginal_effect: .4f}", significance_stars(p_value), f"{ci_lower: .4f}", f"{ci_upper: .4f}"])
# Convert results to tabulated format
table_str = tabulate(results, headers=["Variable", "Marginal Effect", "Significance", "95% CI Lower", "95% CI Upper"],
tablefmt="pretty", colalign=("left", "decimal", "left", "decimal", "decimal"))
return table_str
# Example usage:
# means = [0.5, 30] # Mean values for x1 and x2
# assembler_predictors = ['x1', 'x2'] # Define globally before calling the function
# table_output = marginal_effects(fitted_model, means)
# print(table_output)
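As a concrete usage with the model fitted above, one reasonable choice (an assumption, not the only option) is to evaluate the marginal effects at the training-sample means of the assembled predictor columns:
from pyspark.sql import functions as F
# Means of each predictor column in the training data
# (the dummy columns created above are assumed to still be present in dtrain_1)
mean_row = dtrain_1.agg(*[F.mean(c).alias(c) for c in assembler_predictors]).first()
means = [mean_row[c] for c in assembler_predictors]
print(marginal_effects(model_1, means))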
import matplotlib.pyplot as plt
import seaborn as sns
# Filter training data for atRisk == 1 and atRisk == 0
pdf = dtrain_1.select("prediction", "atRisk").toPandas()
train_true = pdf[pdf["atRisk"] == 1]
train_false = pdf[pdf["atRisk"] == 0]
# Create the first density plot
plt.figure(figsize=(8, 6))
sns.kdeplot(train_true["prediction"], label="TRUE", color="red", fill=True)
sns.kdeplot(train_false["prediction"], label="FALSE", color="blue", fill=True)
plt.xlabel("Prediction")
plt.ylabel("Density")
plt.title("Density Plot of Predictions")
plt.legend(title="atRisk")
plt.show()
# Define threshold for vertical line
threshold = 0.02 # Replace with actual value
# Create the second density plot with vertical line
plt.figure(figsize=(8, 6))
sns.kdeplot(train_true["prediction"], label="TRUE", color="red", fill=True)
sns.kdeplot(train_false["prediction"], label="FALSE", color="blue", fill=True)
plt.axvline(x=threshold, color="blue", linestyle="dashed", label=f"Threshold = {threshold}")
plt.xlabel("Prediction")
plt.ylabel("Density")
plt.title("Density Plot of Predictions with Threshold")
plt.legend(title="atRisk")
plt.show()
from pyspark.sql.functions import col, when
# Compute confusion matrix at the chosen threshold
dtest_1 = dtest_1.withColumn("predicted_class", when(col("prediction") > 0.02, 1).otherwise(0))
conf_matrix = dtest_1.groupBy("atRisk", "predicted_class").count().orderBy("atRisk", "predicted_class")
conf_matrix.show()
TP = dtest_1.filter((col("atRisk") == 1) & (col("predicted_class") == 1)).count()
FP = dtest_1.filter((col("atRisk") == 0) & (col("predicted_class") == 1)).count()
FN = dtest_1.filter((col("atRisk") == 1) & (col("predicted_class") == 0)).count()
TN = dtest_1.filter((col("atRisk") == 0) & (col("predicted_class") == 0)).count()
# Print formatted confusion matrix with labels
print("\n Confusion Matrix:\n")
print(" Predicted")
print(" | Negative | Positive ")
print("------------+------------+------------")
print(f"Actual Neg. | {TN:5} | {FP:5} |")
print("------------+------------+------------")
print(f"Actual Pos. | {FN:5} | {TP:5} |")
print("------------+------------+------------")
accuracy = (TP + TN) / (TP + FP + FN + TN)
precision = TP / (TP + FP)
recall = TP / (TP + FN)
specificity = TN / (TN + FP)
average_rate = (TP + FN) / (TP + TN + FP + FN) # Proportion of actual at-risk babies
enrichment = precision / average_rate
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall (Sensitivity): {recall:.4f}")
print(f"Specificity: {specificity:.4f}")
print(f"Average Rate: {average_rate:.4f}")
print(f"Enrichment: {enrichment:.4f} (Relative Precision)")
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve
pdf = dtest_1.select("prediction", "atRisk").toPandas()
# Extract predictions and true labels
y_true = pdf["atRisk"] # True labels
y_scores = pdf["prediction"] # Predicted probabilities
# Compute precision, recall, and thresholds
precision, recall, thresholds = precision_recall_curve(y_true, y_scores)
# Compute enrichment: precision divided by average at-risk rate
average_rate = np.mean(y_true)
enrichment = precision / average_rate
# Define optimal threshold (example: threshold where recall ≈ enrichment balance)
optimal_threshold = 0.02 # Adjust based on the plot
# Plot Enrichment vs. Recall vs. Threshold
plt.figure(figsize=(8, 6))
plt.plot(thresholds, enrichment[:-1], label="Enrichment", color="blue", linestyle="--")
plt.plot(thresholds, recall[:-1], label="Recall", color="red", linestyle="-")
# Add vertical line for chosen threshold
plt.axvline(x=optimal_threshold, color="black", linestyle="dashed", label=f"Optimal Threshold = {optimal_threshold}")
# Labels and legend
plt.xlabel("Threshold")
plt.ylabel("Score")
plt.title("Enrichment vs. Recall")
plt.legend()
plt.grid(True)
plt.show()
There is also a trade-off between sensitivity and specificity.
The receiver operating characteristic (ROC) curve plots the true positive rate (recall) against the false positive rate (1 − specificity) across all threshold levels.
from sklearn.metrics import roc_curve, auc
# Convert to Pandas
pdf = dtest_1.select("prediction", "atRisk").toPandas()
# Compute ROC curve and AUC
fpr, tpr, _ = roc_curve(pdf["atRisk"], pdf["prediction"])
roc_auc = auc(fpr, tpr)
# Plot ROC curve
plt.figure(figsize=(8,6))
plt.plot(fpr, tpr, label=f"ROC Curve (AUC = {roc_auc:.4f})")
plt.plot([0, 1], [0, 1], 'k--', label="Random Guess")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend()
plt.show()
The AUC of the random-guess model (the diagonal line) is 0.5.
When comparing multiple classifiers, you generally want to prefer classifiers that have a higher AUC.
You also want to examine the shape of the ROC to explore possible trade-offs.
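The same area under the curve can also be computed directly in Spark without converting to Pandas; a minimal sketch using BinaryClassificationEvaluator on the test predictions, assuming the GLM's probability column "prediction" and label column "atRisk" in dtest_1:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# areaUnderROC computed on the Spark DataFrame of test predictions
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction",
                                          labelCol="atRisk",
                                          metricName="areaUnderROC")
print("AUC:", evaluator.evaluate(dtest_1))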
import pandas as pd
dtrain, dtest = df.randomSplit([0.5, 0.5], seed = 1234)
pd_dtrain = dtrain.toPandas()
pd_dtest = dtest.toPandas()
# Set seed for reproducibility
np.random.seed(23464)
# Sample 1000 random indices from the test dataset without replacement
sample_indices = np.random.choice(pd_dtest.index, size=1000, replace=False)
# Separate the selected observations from testing data
separated = pd_dtest.loc[sample_indices]
# Remove the selected observations from the testing data
# Consider this as data from NY hospitals
pd_dtest_NY = pd_dtest.drop(sample_indices)
# Split the separated sample into at-risk and not-at-risk groups
at_risk_sample = separated[separated["atRisk"] == 1] # Only at-risk cases
not_at_risk_sample = separated[separated["atRisk"] == 0] # Only not-at-risk cases
# Create test sets for MA hospitals with different at-risk rates
pd_dtest_MA_moreRisk = pd.concat([pd_dtest_NY, at_risk_sample]) # Adds back only at-risk cases
pd_dtest_MA_lessRisk = pd.concat([pd_dtest_NY, not_at_risk_sample]) # Adds back only not-at-risk cases
# Show counts to verify results
print("Original Test Set Size:", pd_dtest.shape[0])
print("Sampled Separated Size:", separated.shape[0])
print("NY Hospitals Data Size:", pd_dtest_NY.shape[0])
print("MA More Risk Data Size:", pd_dtest_MA_moreRisk.shape[0])
print("MA Less Risk Data Size:", pd_dtest_MA_lessRisk.shape[0])
When `fail = TRUE` whenever `safety = low` and `fail = FALSE` whenever `safety ≠ low`, the outcome is completely determined by `safety`. ➡️ This leads to infinite (non-estimable) coefficients and convergence problems.
When `fail = TRUE` for all cars with `safety = low` but `fail` is mixed for `safety = med` or `high`, the separation is only partial. ➡️ The model still suffers from unstable coefficient estimates or high standard errors.
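A quick way to detect this kind of separation before fitting is to cross-tabulate the outcome against the suspect categorical predictor: a zero cell means that category perfectly determines the outcome. A minimal sketch on hypothetical toy data (the column names safety and fail mirror the example above):
import pandas as pd
# Hypothetical toy data: every low-safety car fails (a zero cell for safety = low),
# while fail is mixed for med and high
toy = pd.DataFrame({
    "safety": ["low", "low", "low", "med", "med", "med", "high", "high", "high"],
    "fail":   [True,  True,  True,  True,  False, False, False,  True,   False],
})
# A zero count in any cell signals (quasi-)complete separation: the
# coefficient for that category is not estimable by maximum likelihood
print(pd.crosstab(toy["safety"], toy["fail"]))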