Source code for pvops.timeseries.models.linear

# Source
import itertools
from sklearn.linear_model import LinearRegression, RANSACRegressor
import numpy as np
from sklearn.metrics import mean_squared_error, r2_score
import pandas as pd
from scipy import stats
from statsmodels.stats.outliers_influence import variance_inflation_factor
import statsmodels.api as sm


def _array_from_df(df, X_parameters):
    return np.vstack([df[xcol].values for xcol in X_parameters]).T


[docs] class Model: """Linear model kernel """ def __init__(self, estimators=None): self.train_index = None self.test_index = None # Other solvers, like RANSAC or THEIL SEN can be added by user self.estimators = estimators or {'OLS': {'estimator': LinearRegression()}, 'RANSAC': {'estimator': RANSACRegressor()} }
[docs] def train(self): """ Train the model. """ if self.verbose >= 1: print("\nBegin training.") for name, info in self.estimators.items(): info['estimator'].fit(self.train_X, self.train_y) self._evaluate(name, info, self.train_X, self.train_y)
def _evaluate(self, name, info, X, y, data_split='train'): """Evaluate the model. """ pred = info['estimator'].predict(X) mse = mean_squared_error(y, pred) r2 = r2_score(y, pred) try: coeffs = info['estimator'].coef_ except AttributeError: coeffs = None if not isinstance(coeffs, type(None)): # @dev: Compare p-value & CI calculations to one of # @dev: an open source package as a validation step. # @dev: Beware that the evaluation uses OLS and may # @dev: be mismatched if user inputs other regressors. Xnew = pd.DataFrame(X, columns=self.variate_names) Xnew = sm.add_constant(Xnew) est = sm.OLS(y, Xnew) est2 = est.fit() self.statsmodels_est = est2 statsdf = est2.summary() # @dev the below snippet calculates statistical parameters # @dev using base numpy. This is commented out because we # @dev have opted to use the statsmodels package. # params = np.append(info['estimator'].intercept_, coeffs) # # Check variable significanace # newX = pd.DataFrame({"Constant": np.ones( # len(X) # )}).join(pd.DataFrame(X)) # MSE = (sum((y-pred)**2))/(len(newX)-len(newX.columns)) # try: # var_b = MSE * np.linalg.inv(np.dot(newX.T, # newX) # ).diagonal() # except np.linalg.LinAlgError: # # Add random noise to avoid a LinAlg error # newX += 0.00001*np.random.rand(*newX.shape) # var_b = MSE * np.linalg.inv(np.dot(newX.T, # newX) # ).diagonal() # sd_b = np.sqrt(var_b) # ts_b = params / sd_b # p_values = [2*(1-stats.t.cdf(np.abs(i), # (len(newX)-newX.shape[1]))) # for i in ts_b] # vif = [variance_inflation_factor(newX.values, i) # for i in range(newX.shape[1])] # lb_ci = params - sd_b * 1.96 # percentile: 0.025 # ub_ci = params + sd_b * 1.96 # percentile: 0.975 # # Round # sd_b = np.round(sd_b, 3) # ts_b = np.round(ts_b, 3) # p_values = np.round(p_values, 3) # params = np.round(params, 4) # lb_ci = np.round(lb_ci, 4) # ub_ci = np.round(ub_ci, 4) # statsdf = pd.DataFrame() # (statsdf["coef"], # statsdf["std err"], # statsdf["t"], # statsdf["P>|t|"], # statsdf["[0.025"], # statsdf["0.975]"], # statsdf["vif"] # ) = [params, # sd_b, # ts_b, # p_values, # lb_ci, # ub_ci, # vif # ] # statsdf.index = ["constant"] + list(self.variate_names) else: statsdf = None if self.verbose >= 1: print(f'[{name}] Mean squared error: %.2f' % mse) print(f'[{name}] Coefficient of determination: %.2f' % r2) # Display coefficients if not isinstance(coeffs, type(None)): print(f'[{name}] {len(coeffs)} coefficient trained.') else: # For RANSAC and others pass if self.verbose >= 2: # The coefficients if not isinstance(coeffs, type(None)): if data_split == 'train': print(statsdf) else: # For RANSAC and others pass if data_split == 'train': info['train_index'] = self.train_index info['train_prediction'] = pred info['train_X'] = X info['train_y'] = y info['train_eval'] = {'mse': mse, 'r2': r2, 'statsdf': statsdf} elif data_split == 'test': info['test_index'] = self.test_index info['test_prediction'] = pred info['test_X'] = X info['test_y'] = y info['test_eval'] = {'mse': mse, 'r2': r2, 'statsdf': statsdf}
[docs] def predict(self): """Predict using the model. """ if self.verbose >= 1: print("\nBegin testing.") for name, info in self.estimators.items(): self._evaluate(name, info, self.test_X, self.test_y, data_split='test')
def _map_season(df_index): # Derived from https://stackoverflow.com/questions/44526662/group-data-by-season-according-to-the-exact-dates spring = range(80, 172) summer = range(172, 264) fall = range(264, 355) def _season(x): if x in spring: return 1 if x in summer: return 2 if x in fall: return 3 else: return 0 return df_index.dayofyear.map(_season)
[docs] class TimeWeightedProcess: """Generate time-oriented dummy variables for linear regression. Available timeframes include "month", "season", and "hour". """ def __init__(self, verbose=0): self.verbose = verbose self.set_time_bins = None
[docs] def time_weight(self, X, time_weighted='season', data_split='train'): if time_weighted == 'month': if data_split == 'train': time_bins = self.train_index.month elif data_split == 'test': time_bins = self.test_index.month if time_weighted == 'season': if data_split == 'train': time_bins = _map_season(self.train_index) elif data_split == 'test': time_bins = _map_season(self.test_index) elif time_weighted == 'hour': if data_split == 'train': time_bins = self.train_index.hour elif data_split == 'test': time_bins = self.test_index.hour elif time_weighted == 'capacity': if data_split == 'train': time_bins = self.train_capacity_bins elif data_split == 'test': time_bins = self.test_capacity_bins if data_split == 'train': self.set_time_bins = set(time_bins) elif data_split == 'test' and not isinstance(self.set_time_bins, (set, np.ndarray, list)): raise Exception( "Must construct train before constructing test " "if using the TimeWeightedProcess.") if self.verbose >= 1: print(data_split, set(time_bins)) df = pd.DataFrame() df['time_bins'] = time_bins indices = {} for group in self.set_time_bins: indices[group] = df[df["time_bins"] == group].index new_variable_names = [] for ii, (param, covariate_profile) in enumerate(zip(self.variate_names, self.covariate_degree_combinations)): df[f"col_{ii}"] = X[:, ii] # add groups for time_idx, group in enumerate(self.set_time_bins): if covariate_profile + [time_idx] in self.exclude_params: continue vals = np.zeros(len(df)) vals[indices[group]] = df.iloc[indices[group]][f"col_{ii}"] df[f"col_{ii}_{group}"] = vals new_variable_names.append(f"{param} | {time_weighted}:{group}") # remove original data del df[f"col_{ii}"] del df["time_bins"] xs = df.values self.variate_names = new_variable_names return xs
[docs] class DefaultModel(Model, TimeWeightedProcess): """Generate a simple model using the input data, without any data transposition. """ _model_name = 'default' def __init__(self, time_weighted=None, estimators=None, verbose=0, X_parameters=[]): super().__init__(estimators) self.verbose = verbose self.time_weighted = time_weighted self.X_parameters = X_parameters # Set to null as default, this is used in PolynomialModel self.exclude_params = []
[docs] def construct(self, X, y, data_split='train'): self.variate_names = self.X_parameters num_variates = len(self.variate_names) self.covariate_degree_combinations = [] for i in range(num_variates): list_sub = [0] * num_variates list_sub[i] = 1 self.covariate_degree_combinations.append(list_sub) if not isinstance(self.time_weighted, type(None)): X = self.time_weight( X, time_weighted=self.time_weighted, data_split=data_split) if data_split == 'train': self.train_X = X self.train_y = y elif data_split == 'test': self.test_X = X self.test_y = y
[docs] class PolynomialModel(Model, TimeWeightedProcess): """Add all interactions between terms with a degree. """ _model_name = "polynomial" def __init__(self, degree=2, estimators=None, time_weighted=None, verbose=0, X_parameters=[], exclude_params=[]): super().__init__(estimators) self.degree = degree self.time_weighted = time_weighted self.verbose = verbose self.X_parameters = X_parameters self.exclude_params = exclude_params
[docs] def construct(self, X, y, data_split='train'): num_inputs = X.shape[1] # add column of rows in first index of matrix xs = np.array(X) # construct identity matrix iden_matrix = [] for i in range(num_inputs): # create np.array of np.zeros row = np.zeros(num_inputs, dtype=int) # add 1 to diagonal index row[i] = 1 iden_matrix.append(row) # gather list all_combinations = [] for degree in range(1, self.degree + 1): all_combinations.append(itertools.combinations_with_replacement( iden_matrix, degree)) # list of polynomial powers poly_powers = [] self.variate_names = [] self.covariate_degree_combinations = [] for combinations in all_combinations: for combination in combinations: covariate_profile = list(np.array(combination).sum(axis=0)) # Check if this term was to be excluded. if isinstance(self.time_weighted, type(None)): if covariate_profile in self.exclude_params: # Skip the params which are meant to be excluded. continue else: self.covariate_degree_combinations.append( covariate_profile) sum_arr = np.zeros(num_inputs, dtype=int) sum_arr += sum((np.array(j) for j in combination)) s = "" has_first_term = False for idx, p in enumerate(sum_arr): if p == 0: continue if has_first_term: s += " * " else: has_first_term = True s += r"{}^{}".format(self.X_parameters[idx], p) self.variate_names.append(s) poly_powers.append(sum_arr) self.powers = poly_powers # Raise data to specified degree pattern and stack A = [] for power in poly_powers: product = (xs**power).prod(1) A.append(product.reshape(product.shape + (1,))) A = np.hstack(np.array(A)) if not isinstance(self.time_weighted, type(None)): A = self.time_weight( A, time_weighted=self.time_weighted, data_split=data_split) print("Design matrix shape:", A.shape) if data_split == 'train': self.train_df = pd.DataFrame(A, columns=self.variate_names, index=self.train_index) self.train_X = A self.train_y = y elif data_split == 'test': self.test_df = pd.DataFrame(A, columns=self.variate_names, index=self.test_index) self.test_X = A self.test_y = y
def _get_params(Y_parameter, X_parameters, prod_col_dict, kernel_type): if Y_parameter is None: Y_parameter = prod_col_dict['powerprod'] if kernel_type == 'polynomial_log': try: X_parameters.remove(prod_col_dict['irradiance']) # Place irradiance in front. X_parameters = [prod_col_dict['irradiance']] + X_parameters except ValueError: raise ValueError( "The `prod_col_dict['irradiance']` definition must be in your " + "X_parameters input for the `polynomial_log` model.") return X_parameters, Y_parameter
[docs] def modeller(prod_col_dict, kernel_type='default', time_weighted='month', X_parameters=[], Y_parameter=None, estimators=None, prod_df=None, test_split=0.2, train_df=None, test_df=None, degree=3, exclude_params=[], verbose=0): """Wrapper method to conduct the modelling of the timeseries data. To input the data, there are two options. - Option 1: include full production data in `prod_df` parameter and `test_split` so that the test split is conducted - Option 2: conduct the test-train split prior to calling the function and pass in data under `test_df` and `train_df` Parameters ---------- prod_col_dict: dict of {str : str} A dictionary that contains the column names relevant for the production data - siteid (*string*), should be assigned to site-ID column name in prod_df - **timestamp** (*string*), should be assigned to time-stamp column name in prod_df - **irradiance** (*string*), should be assigned to irradiance column name in prod_df, where data should be in [W/m^2] - **baseline** (*string*), should be assigned to preferred column name to capture model calculations in prod_df - **dcsize**, (*string*), should be assigned to preferred column name for site capacity in prod_df - **powerprod**, (*string*), should be assigned to the column name holding the power or energy production. This will be used as the output column if Y_parameter is not passed. kernel_type : str Type of kernel type for the statistical model - 'default', establishes a kernel where one component is instantiated in the model for each feature. - 'polynomial', a paraboiloidal polynomial with a dynamic number of covariates (Xs) and degrees (n). For example, with 2 covariates and a degree of 2, the formula would be: Y(α , X) = α_0 + α_1 X_1 + α_2 X_2 + α_3 X_1 X_2 + α_4 X_1^2 + α_5 X_2^2 time_weighted : str or None Interval for time-based feature generation. For each interval in this time-weight, a dummy variable is established in the model prior to training. Options include: - if 'hour', establish discrete model components for each hour of day - if 'month', establish discrete model components for each month - if 'season', establish discrete model components for each season - if None, no time-weighted dummy-variable generation is conducted. X_parameters : list of str List of prod_df column names used in the model Y_parameter : str Optional, name of the y column. Defaults to prod_col_dict['powerprod']. estimators : dict Optional, dictionary with key as regressor identifier (str) and value as a dictionary with key "estimator" and value the regressor instance following sklearn's base model convention: sklearn_docs. .. sklearn_docs: https://scikit-learn.org/stable/modules/generated/sklearn.base.is_regressor.html .. code-block:: python estimators = {'OLS': {'estimator': LinearRegression()}, 'RANSAC': {'estimator': RANSACRegressor()} } prod_df: DataFrame A data frame corresponding to the production data used for model development and evaluation. This data frame needs at least the columns specified in prod_col_dict. test_split : float A value between 0 and 1 indicating the proportion of data used for testing. Only utilized if `prod_df` is specified. If you want to specify your own test-train splits, pass values to `test_df` and `train_df`. test_df: DataFrame A data frame corresponding to the test-split of the production data. Only needed if `prod_df` and `test_split` are not specified. train_df: DataFrame A data frame corresponding to the test-split of the production data. Only needed if `prod_df` and `test_split` are not specified. degree : int Utilized for 'polynomial' and 'polynomial_log' `kernel_type` options, this parameter defines the highest degree utilized in the polynomial kernel. exclude_params : list A list of parameter definitions (defined as lists) to be excluded in the model. For example, if want to exclude a parameter in a 4-covariate model that uses 1 degree on first covariate, 2 degrees on second covariate, and no degrees for 3rd and 4th covariates, you would specify a exclude_params as ``[ [1,2,0,0] ]``. Multiple definitions can be added to list depending on how many terms need to be excluded. If a time_weighted parameter is selected, a time weighted definition will need to be appended to *each* exclusion definition. Continuing the example above, if one wants to exclude "hour 0" for the same term, then the exclude_params must be ``[ [1,2,0,0,0] ]``, where the last 0 represents the time-weighted partition setting. verbose : int Define the specificity of the print statements during this function's execution. Returns ------- model which is a ``pvops.timeseries.models.linear.Model`` object, has a useful attribute estimators which allows access to model performance and data splitting information. train_df which is the training split of prod_df test_df which is the testing split of prod_df """ estimators = estimators or {'OLS': {'estimator': LinearRegression()}, 'RANSAC': {'estimator': RANSACRegressor()}} X_parameters, Y_parameter = _get_params(Y_parameter, X_parameters, prod_col_dict, kernel_type) if (not isinstance(prod_df, type(None))) and (not isinstance(test_split, type(None))): # Split into test-train mask = np.array(range(len(prod_df))) < int( len(prod_df) * (1 - test_split)) train_df = prod_df.iloc[mask] test_df = prod_df.iloc[~mask] else: if isinstance(train_df, type(None)) or isinstance(test_df, type(None)): raise ValueError("Because `prod_df` and `test_split` were not specified," "expected `train_df` and `test_df` to be passed. But, they" "were not specified.") train_y = train_df[Y_parameter].values test_y = test_df[Y_parameter].values train_X = _array_from_df(train_df, X_parameters) test_X = _array_from_df(test_df, X_parameters) if kernel_type == 'default': model = DefaultModel(time_weighted=time_weighted, estimators=estimators, verbose=verbose, X_parameters=X_parameters) elif kernel_type == 'polynomial': model = PolynomialModel( time_weighted=time_weighted, estimators=estimators, degree=degree, verbose=verbose, X_parameters=X_parameters, exclude_params=exclude_params) model.train_index = train_df.index model.test_index = test_df.index # Support in future versions # model.train_capacity_bins = train_capacity_bins # model.test_capacity_bins = test_capacity_bins # Always construct train first in case of using time_weighted, # which caches the time regions in training to reuse in testing. model.construct(train_X, train_y, data_split='train') model.construct(test_X, test_y, data_split='test') model.train() model.predict() return model, train_df, test_df
[docs] def predicter(model, df, Y_parameter, X_parameters, prod_col_dict, verbose=0): kernel_type = model._model_name X_parameters, Y_parameter = _get_params(Y_parameter, X_parameters, prod_col_dict, kernel_type) test_y = df[Y_parameter].values test_X = df[X_parameters].values if verbose > 0: print(X_parameters) print(Y_parameter) print(test_y.shape) print(test_X.shape) model.test_index = df.index model.construct(test_X, test_y, data_split='test') model.predict() return model, test_y, test_X