import numpy as np
n = 1000
p = 20
X = np.random.normal(size=(n, p))
Y = np.random.normal(size=n)Lab 1 Solution - Coding for ML Basics
1 Implement ordinary least squares
- Generate a matrix \(\boldsymbol{X}\) and an array \(\boldsymbol{Y}\) filled with random values using the Python package
numpy.
- Implement the calculation of the OLS estimator using
numpy.
def fit_OLS(X:np.ndarray, Y:np.ndarray) -> np.ndarray:
"""Fit Ordinary Least Squares regression.
Args:
X (np.ndarray): Design matrix of shape (n, p).
Y (np.ndarray): Response vector of shape (n,).
Returns:
np.ndarray: Estimated coefficients of shape (p,).
"""
assert X.shape[0] == Y.shape[0], "Number of rows in X must match length of Y."
return np.linalg.inv(X.T @ X) @ (X.T @ Y)Preventive check whether the output dimensions make sense
fit_OLS(X, Y).shape(20,)
- What is the time complexity of your implementation?
Please read about time complexity in computer science. Time complexity measures how the running time of your code depends on the input parameters, in this case \(N\) and \(P\). We count basic operations that the computer needs to perform. Let us break fit_OLS down component by component:
X.T |
matrix transpose | \(\mathcal{O}(NP)\) | need to visit each of the \(NP\) cells of the matrix \(\boldsymbol{X}\) |
X.T @ X |
matrix multiplication | \(\mathcal{O}(NP^2)\) | the matrix \(\boldsymbol{X}^T\boldsymbol{X}\) has \(P^2\) cells, the value of each is a sum of \(N\) elements |
(X.T @ X)^{-1} |
matrix inversion | \(\mathcal{O}(P^3)\) | numpy implements an \(\mathcal{O}(P^3)\) algorithm to invert a \(P\times P\) matrix, I think |
X.T @ Y |
matrix multiplication | \(\mathcal{O}(NP)\) | the vector \(\boldsymbol{X}^T\boldsymbol{Y}\) has \(P\) cells, the value of each is a sum of \(N\) elements |
(X.T @ X)^{-1} @ (X.T @ Y) |
matrix multiplication | \(\mathcal{O}(P^2)\) | the vector has \(P\) cells, the value of each is a sum of \(P\) elements |
The total complexity is \(\mathcal{O}(NP + NP^2 + P^3)=\mathcal{O}(NP^2)\). The \(\mathcal{O}\) notation hides any constants and considers only the asymptotically dominating factor \(\mathcal{O}(NP^2)\), which comes from the computation of the matrix product \(\boldsymbol{X}^T\boldsymbol{X}\). The reason this factor dominates the others is that \(P\leq N\), which follows from the assumption that \(\boldsymbol{X}\) has full column rank. If \(P>N\) then the OLS estimator is undefined.
- Is there a difference between implementing it like \(((\boldsymbol{X}^T\boldsymbol{X})^{-1}\boldsymbol{X}^T)\boldsymbol{Y}\) and like \((\boldsymbol{X}^T\boldsymbol{X})^{-1}(\boldsymbol{X}^T\boldsymbol{Y})\)? Why?
The former includes the computation of the matrix product of \((\boldsymbol{X}^T\boldsymbol{X})^{-1}\) with \(\boldsymbol{X}^T\) which takes \(\mathcal{O}(NP^2)\) time. The asymptotic complexity is the same for both implementations (both \(\mathcal{O}(NP^2)\)), but the former does the heaviest matrix multiplication twice, so asymptotically we can expect the latter to run twice faster.
import timeit
print(timeit.timeit("np.linalg.inv(X.T @ X) @ (X.T @ Y)", number=500, globals=globals()))
print(timeit.timeit("(np.linalg.inv(X.T @ X) @ X.T) @ Y", number=500, globals=globals()))0.3976376620000224
0.5852540070000032
- Calculate the OLS estimator again over the same data, this time using the Python package
sklearn.
import sklearn.linear_model
model = sklearn.linear_model.LinearRegression(fit_intercept=False)
model.fit(X, Y)LinearRegression(fit_intercept=False)In a Jupyter environment, please rerun this cell to show the HTML representation or trust the notebook.
On GitHub, the HTML representation is unable to render, please try loading this page with nbviewer.org.
LinearRegression(fit_intercept=False)
- Compare the predictions generated by your implementation with those of
sklearn, they should be identical.
They code below shows that they are identical up to machine error. Those of you who didn’t notice that sklearn has the default parameter fit_intercept=True got very different predictions. The lesson is that when using a library function you have to be very careful in making sure it does exactly what you want it to do.
b_ols = fit_OLS(X, Y)
pred_my_ols = X @ b_ols
pred_sklearn_ols = model.predict(X)
np.max(np.abs(pred_my_ols - pred_sklearn_ols))5.412337245047638e-16
- Check whether the coefficients computed by sklearn match your own.
b_sklearn_ols = model.coef_
np.max(np.abs(b_ols - b_sklearn_ols))1.1796119636642288e-16
2 Generalized least squares
- Generate a random positive semi-definite matrix \(\boldsymbol{W}\) in whatever way you prefer. It must be different from the identity, but that is the only restriction.
A = np.random.normal(size=(n, n))
W = A.T @ A# Check if W is positive definite
np.all(np.linalg.eigvalsh(W)>0)True
- Implement the calculation of the GLS using numpy from scratch.
def fit_GLS(X:np.ndarray, Y:np.ndarray, W:np.ndarray) -> np.ndarray:
"""Fit Generalized Least Squares regression.
Args:
X (np.ndarray): Design matrix of shape (n, p).
Y (np.ndarray): Response vector of shape (n,).
W (np.ndarray): Covariance matrix of shape (n, n).
Returns:
np.ndarray: Estimated coefficients of shape (p,).
"""
assert W.shape[0] == W.shape[1] == X.shape[0] == Y.shape[0], "Incompatible dimensions."
W_inv = np.linalg.inv(W)
return np.linalg.inv(X.T @ W_inv @ X) @ (X.T @ W_inv @ Y)
fit_GLS(X, Y, W).shape(20,)
- Implement a version of your previous OLS estimator which calls the GLS calculation you built on b. (note that the OLS should not have a weight matrix as input)
def fit_OLS_w_GLS(X:np.ndarray, Y:np.ndarray) -> np.ndarray:
"""Fit Ordinary Least Squares regression using the GLS function with identity covariance.
Args:
X (np.ndarray): Design matrix of shape (n, p).
Y (np.ndarray): Response vector of shape (n,).
Returns:
np.ndarray: Estimated coefficients of shape (p,).
"""
n = X.shape[0]
W = np.eye(n)
return fit_GLS(X, Y, W)- Do the reverse: do a implementantion of the calculation of the GLS which calls the OLS you implemented on the previous exercise.
from scipy.linalg import sqrtm
def fit_GLS_w_OLS(X:np.ndarray, Y:np.ndarray, W:np.ndarray):
"""Fit Generalized Least Squares regression using the OLS function.
Args:
X (np.ndarray): Design matrix of shape (n, p).
Y (np.ndarray): Response vector of shape (n,).
W (np.ndarray): Covariance matrix of shape (n, n).
Returns:
np.ndarray: Estimated coefficients of shape (p,).
"""
X_rescaled = np.linalg.inv(sqrtm(W)) @ X
Y_rescaled = np.linalg.inv(sqrtm(W)) @ Y
return fit_OLS(X_rescaled, Y_rescaled)- Check that all previous implementations yield the same predictions and coefficients.
print(np.linalg.norm(fit_OLS(X, Y)-fit_OLS_w_GLS(X, Y)))1.598072810856837e-16
print(np.linalg.norm(fit_GLS(X, Y, W)-fit_GLS_w_OLS(X, Y, W)))2.816982217203104e-13
Class implementation of GLS
class GLSRegression:
def __init__(self):
"""Generalized Least Squares Regression model wrapped up in a class structure.
"""
self.W = None
self.coef_ = None
def fit(self, X:np.ndarray, Y:np.ndarray, W:np.ndarray=None) -> None:
"""Fit the GLS model.
Args:
X (np.ndarray): Design matrix of shape (n, p).
Y (np.ndarray): Response vector of shape (n,).
"""
if W is None:
n = X.shape[0]
self.W = np.eye(n)
self.W = W
self.coef_ = fit_GLS(X, Y, self.W)
def predict(self, X:np.ndarray) -> np.ndarray:
"""Predict using the GLS model.
Args:
X (np.ndarray): Design matrix of shape (m, p).
Returns:
np.ndarray: Predicted values of shape (m,).
"""
return X @ self.coef_gls = GLSRegression()
gls.fit(X, Y, W)
pred_gls = gls.predict(X)3 Implement a loss function
- Implement a function that computes the average squared error between two vectors \(\boldsymbol{Y}\) and \(\hat{\boldsymbol{Y}}\), element by element:
- In a vectorized way using
numpy. - Using a for loop, list comprehension or in another non-vectorized way.
- In a vectorized way using
def average_squared_error(y_true:np.ndarray, y_pred:np.ndarray) -> float:
""" Compute the average squared error between true and predicted values.
Args:
y (np.ndarray): True values of shape (n,).
yhat (np.ndarray): Predicted values of shape (n,).
Returns:
float: Average squared error.
"""
return np.mean(np.subtract(y_true, y_pred) ** 2)
def average_squared_error_non_vectorized(y_true:np.ndarray, y_pred:np.ndarray) -> float:
""" Compute the average squared error between true and predicted values without vectorization.
Args:
y (np.ndarray): True values of shape (n,).
yhat (np.ndarray): Predicted values of shape (n,).
Returns:
float: Average squared error.
"""
n = len(y_true)
assert n == len(y_pred), "Length of true and predicted values must match."
squared_error_sum = 0
for i in range(n):
squared_error_sum += (y_true[i] - y_pred[i]) ** 2
return squared_error_sum / n- Compare the execution time of the two implementations, using
timeit.
print(timeit.timeit("average_squared_error(Y, Y)", number=1000, globals=globals()))
print(timeit.timeit("average_squared_error_non_vectorized(Y, Y)", number=1000, globals=globals()))0.007807396999965022
0.3937886170000411
- Why is there a difference in execution time between the two implementations?
The difference is due to the fact that vectorized operations in numpy arrays are implemented more efficiently than the raw list+loop python structure.
4 Feature engineering
Functions implementation
For a) and b) we do a standard batched implementation
def linear_transform(X:np.ndarray, A:np.ndarray, b:np.ndarray) -> np.ndarray:
"""Apply a linear transformation to the data.
Args:
X (np.ndarray): Data matrix of shape (n, p).
A (np.ndarray): Transformation matrix of shape (q, p).
b (np.ndarray): Translation vector of shape (q,).
Returns:
np.ndarray: Transformed data matrix of shape (n, q).
"""
return (A @ X.T).T + b
def quadratic_transform(X:np.ndarray, B:np.ndarray) -> np.ndarray:
"""Apply a quadratic transformation to the data.
Args:
X (np.ndarray): Data matrix of shape (n, p).
B (np.ndarray): Transformation matrix of shape (p, p).
Returns:
np.ndarray: Transformed data matrix of shape (n, 1).
"""
## For these simple transformations, it is okay to do a loop iteration like:
# result = np.zeros((n, 1))
# for i in range(n):
# result[i, 0] = X[i, :].T @ B @ X[i, :]
# It might often not be worth the trouble to think how to do it more efficiently
# But it is good to have in mind that a simple operation like this should be possible to do in a fully vectorized fashion
# For example, noting that:
# X_t[i] = sum_j sum_k X[i,j] * B[j,k] * X[i,k]
# we can more efficiently write this as:
X_col_ext_prod = X[:,:,None] * X[:,None,:] # tensor of shape (n, p, p) with entries X[i,j] * X[i,k]
X_transformed = np.tensordot(X_col_ext_prod , B, axes=([1,2],[0,1]))
return X_transformedFor the last three functions, we can use that all of them involve a custom operation of applying multiple functions in a piecewise manner. Hence our code becomes clearer if we do a modularized implementation using their shared similarities. It is worth having in mind that we are giving up a little bit of efficiency by doing this. But the cost in this case is worth the added modularization.
Note also that in principle both the step function and piecewise linear functions are particular cases of the quadratic function. Although we could have implemented everything using only the quadratic function, we preferred to keep them separate for clarity.
Defining complicated objects in terms of simpler ones is part of make the code simpler. Doing the other way around, defining simple objects in terms of complicated ones, usually does not lend the same gain in clarity.
def piecewise_apply_single_f(x:np.ndarray, f:callable, a:float, b:float) -> float:
"""Apply single function to an array of data x restricted to the interval [a, b]:
g(x) = f(x) if a < x <= b, else 0.
Args:
x (np.ndarray): Data points array of shape (n,).
f (callable): Function to apply.
a (float): lower boundary of the interval.
b (float): upper boundary of the interval.
Returns:
np.ndarray: Transformed data array of shape (n,).
"""
return np.where((a < x) & (x <= b), f(x), 0)
def piecewise_apply_multiple_f(x:np.ndarray, f_list:list, cutoff_values:list) -> np.ndarray:
""" Compute function defined piecewise by multiple functions over specified cutoff values.
g(x) = sum_i f_i(x) * I{(c_i, c_{i+1}]}(x)
Args:
x (np.ndarray): Data points array of shape (n,).
f_list (list): List of functions to apply.
cutoff_values (list): List of cutoff values defining the intervals.
Returns:
np.ndarray: Transformed data array of shape (n,).
"""
assert len(f_list) + 1 == len(cutoff_values), "Number of cutoff values must be one more than number of functions."
x_transformed = np.zeros_like(x)
for i in range(len(f_list)):
x_transformed += piecewise_apply_single_f(x, f_list[i], cutoff_values[i], cutoff_values[i+1])
return x_transformeddef piecewise_constant_function(x:np.ndarray, values:list, cutoff_values:list) -> np.ndarray:
""" Compute piecewise constant function over specified cutoff values.
g(x) = sum_i v_i * I{(c_i, c_{i+1}]}(x)
Args:
x (np.ndarray): Data points array of shape (n,).
values (list): List of constant values for each interval.
cutoff_values (list): List of cutoff values defining the intervals.
Returns:
np.ndarray: Transformed data array of shape (n,).
"""
# We need to add the dummy argument v=v in the lambda definition of the function
# otherwise all the functions will collapse into the same object
f_list = [lambda x, v=v: v for v in values]
return piecewise_apply_multiple_f(x, f_list, cutoff_values)def piecewise_linear_function(x:np.ndarray, slopes:list, intercepts:list, cutoff_values:list) -> np.ndarray:
""" Compute piecewise constant function over specified cutoff values.
g(x) = sum_i (m_i * x + b_i) * I{(c_i, c_{i+1}]}(x)
Args:
x (np.ndarray): Data points array of shape (n,).
slopes (list): List of slope values for each interval.
intercepts (list): List of intercept values for each interval.
cutoff_values (list): List of cutoff values defining the intervals.
Returns:
np.ndarray: Transformed data array of shape (n,).
"""
f_list = [lambda x, m=slope, b=intercept: m*x + b for (slope, intercept) in zip(slopes, intercepts)]
return piecewise_apply_multiple_f(x, f_list, cutoff_values)def piecewise_quadratic_function(x:np.ndarray, quad_coeffs:list, lin_coeffs:list, const_coeffs:list, cutoff_values:list) -> np.ndarray:
""" Compute piecewise constant function over specified cutoff values.
g(x) = sum_i (quad_i * x**2 + lin_i * x + const_i) * I{(c_i, c_{i+1}]}(x)
Args:
x (np.ndarray): Data points array of shape (n,).
quad_coeffs (list): List of quadratic coefficient values for each interval.
lin_coeffs (list): List of linear coefficient values for each interval.
const_coeffs (list): List of constant coefficient values for each interval.
cutoff_values (list): List of cutoff values defining the intervals.
Returns:
np.ndarray: Transformed data array of shape (n,).
"""
f_list = [lambda x, a=quad_coeff, b=lin_coeff, c=const_coeff: a * x ** 2 + b * x + c for (quad_coeff, lin_coeff, const_coeff) in zip(quad_coeffs, lin_coeffs, const_coeffs)]
return piecewise_apply_multiple_f(x, f_list, cutoff_values)import matplotlib.pyplot as plt
cutoff_values_const = [-2, -1, 0, 1, 2]
x = np.linspace(-3, 3, 1000)
values = [0, 1, 0, -1]
x_const = piecewise_constant_function(x, values, cutoff_values_const)
plt.plot(x, x_const)
plt.title("Piecewise Constant Function")
plt.show()
cutoff_values_lin = [-np.inf, 0, np.inf]
intercepts = [1, 1]
slopes = [1, -1]
x_lin = piecewise_linear_function(x, slopes, intercepts, cutoff_values_lin)
plt.plot(x, x_lin)
plt.title("Piecewise Linear Function")
plt.show()
cutoff_values_quad = [-np.inf, 0, np.inf]
quad_coeffs = [-1, 1]
lin_coeffs = [0, 0]
const_coeffs = [0, 0]
x_quad = piecewise_quadratic_function(x, quad_coeffs, lin_coeffs, const_coeffs, cutoff_values_quad)
plt.plot(x, x_quad)
plt.title("Piecewise Quadratic Function")
plt.show()
# apply feature transformations to data matrix X
X_linear = linear_transform(X, A=np.random.normal(size=(10, p)), b=np.random.normal(size=10))
# needs to reshape transforms that output a single column to make them 2D arrays as OLS takes in 2D arrays for X
X_quadratic = quadratic_transform(X, B=np.random.normal(size=(p, p))).reshape(-1, 1)
X_piecewise_constant = piecewise_constant_function(X[:,0], values, cutoff_values_const).reshape(-1, 1)
X_piecewise_linear = piecewise_linear_function(X[:,0], slopes, intercepts, cutoff_values_lin).reshape(-1, 1)
X_piecewise_quadratic = piecewise_quadratic_function(X[:,0], quad_coeffs, lin_coeffs, const_coeffs, cutoff_values_quad).reshape(-1, 1)# compute OLS
linear_OLS = fit_OLS(X_linear, Y)
quadratic_OLS = fit_OLS(X_quadratic, Y)
piecewise_constant_OLS = fit_OLS(X_piecewise_constant, Y)
piecewise_linear_OLS = fit_OLS(X_piecewise_linear, Y)
piecewise_quadratic_OLS = fit_OLS(X_piecewise_quadratic, Y)# compute and print prediction errors
OLS_features = [X_linear, X_quadratic, X_piecewise_constant, X_piecewise_linear, X_piecewise_quadratic]
OLS_coeffs = [fit_OLS(X_feat, Y) for X_feat in OLS_features]
model_names = ["Linear OLS", "Quadratic OLS", "Piecewise Constant OLS", "Piecewise Linear OLS", "Piecewise Quadratic OLS"]
for (features, coeffs, name) in zip(OLS_features, OLS_coeffs, model_names):
y_pred = features @ coeffs # assuming first p coefficients correspond to original features
error = average_squared_error(Y, y_pred)
print(f"Average Squared Error for {name}: {error}")Average Squared Error for Linear OLS: 0.9625317178126258
Average Squared Error for Quadratic OLS: 0.9742615039197444
Average Squared Error for Piecewise Constant OLS: 0.9741390961264887
Average Squared Error for Piecewise Linear OLS: 0.9736039986650243
Average Squared Error for Piecewise Quadratic OLS: 0.9735805828697969