Source code for pvops.iv.models.nn

from scipy.interpolate import interp1d
from sklearn.utils import resample
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import LabelBinarizer

import keras
from keras.layers import Lambda, concatenate
from keras.layers import LSTM, Flatten, Input, Dropout, Dense, Conv1D
from keras.models import Sequential, Model
from keras.utils import to_categorical

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

keras.backend.clear_session()


[docs] def get_diff_array(sample_V, sample_I, pristine_V, pristine_I, debug=False): """Generate IV current differential between sample and pristine. Parameters ---------- sample_V : array Voltage array for a sample's IV curve sample_I : array Current array for a sample's IV curve pristine_V : array Voltage array for a pristine IV curve pristine_I : array Current array for a pristine IV curve Returns ------- all_V : array Combined voltage array diff : array Current differential calculation """ if sample_V[-1] > pristine_V[-1]: x_larger = sample_V option = 1 else: x_larger = pristine_V option = 2 f_interp1 = interp1d(np.flipud(pristine_V), np.flipud(pristine_I), kind='linear', fill_value='extrapolate') pristine_I_interp = f_interp1(x_larger) pristine_I_interp[pristine_I_interp < 0] = 0 f_interp2 = interp1d(np.flipud(sample_V), np.flipud(sample_I), kind='linear', fill_value='extrapolate') sample_I_interp = f_interp2(sample_V) sample_I_interp[sample_I_interp < 0] = 0 all_V = np.sort(np.unique(np.append(sample_V, pristine_V))) all_i1 = f_interp1(all_V) all_i1[all_i1 < 0] = 0 all_i2 = f_interp2(all_V) all_i2[all_i2 < 0] = 0 if option == 1: diff = all_i2 - all_i1 if option == 2: diff = all_i1 - all_i2 return all_V, diff
[docs] def feature_generation(bigdf, iv_col_dict, pristine_mode_identifier='Pristine array'): """Generate features of an IV curve data set including: 1. Current differential between a sample IV curve and a pristine IV curve 2. Finite difference of the IV curve along the y-axis, indicating the slope of the cuve. Parameters ---------- bigdf : dataframe Dataframe holding columns from `iv_col_dict`, except for the `derivative` and `current_diff` which are calculated here. iv_col_dict : dict Dictionary containing definitions for the column names in `df` - **current** (*str*): column name for IV current arrays. - **voltage** (*str*): column name for IV voltage arrays. - **mode** (*str*): column name for failure mode identifier. - **irradiance** (*str*): column name for the irradiance definition. - **temperature** (*str*): column name for the temperature definition. - **derivative** (*str*): column name for the finite derivative, as calculated in this function. - **current_diff** (*str*): column name for current differential, as calculated in `get_diff_array`. pristine_mode_identifier : str Pristine array identifier. The pristine curve is utilized in ``get_diff_array``. If multiple rows exist at this ``pristine_mode_identifier``, the one with the highest irradiance and lowest temperature definitions is chosen. Returns ------- all_V : array Combined voltage array diff : array Current differential calculation """ current_col = iv_col_dict["current"] voltage_col = iv_col_dict["voltage"] failure_mode_col = iv_col_dict["mode"] irradiance_col = iv_col_dict["irradiance"] temperature_col = iv_col_dict["temperature"] derivative_col = iv_col_dict["derivative"] current_diff_col = iv_col_dict["current_diff"] pristine = bigdf.loc[bigdf[failure_mode_col] == pristine_mode_identifier, :].copy() pristine.sort_values(by=[irradiance_col, temperature_col], ascending=[ False, True], inplace=True) sub = bigdf.loc[bigdf[failure_mode_col] != pristine_mode_identifier, :].copy() pristine_V = pristine[voltage_col].values[0] pristine_I = pristine[current_col].values[0] n = 0 sample_V = sub[voltage_col].values[n] sample_I = sub[current_col].values[n] diffs = [] for n in range(len(sub)): sample_V = sub[voltage_col].values[n] sample_I = sub[current_col].values[n] v, i_diff = get_diff_array(sample_V, sample_I, pristine_V, pristine_I) diffs.append(i_diff) sub[current_diff_col] = diffs finite_derivatives = [] for ind, row in sub.iterrows(): Is2 = row[current_col] finite_derivatives.append( np.array([0] + [j - i for i, j in zip(Is2[:-1], Is2[1:])])) sub[derivative_col] = finite_derivatives return sub
[docs] def balance_df(df, iv_col_dict, balance_tactic='truncate'): """Balance data so that an equal number of samples are found at each unique `ycol` definition. Parameters ---------- bigdf : dataframe Dataframe containing the `ycol` column. iv_col_dict : dict Dictionary containing at least the following definition: - **mode** (*str*), column in `df` which holds the definitions which must contain a balanced number of samples for each unique definition. balance_tactic : str mode balancing tactic, either "truncate" or "gravitate". Truncate will utilize the exact same number of samples for each category. Gravitate will sway the original number of samples towards a central target. Returns ------- balanced_df : DataFrame balanced according to the `balance_tactic`. """ ycol = iv_col_dict['mode'] print("Balance data by mode:") if balance_tactic == 'gravitate': len_rows = len(df.index) modes = set(df[ycol].tolist()) num_modes = len(modes) avg_rows_in_mode = int(len_rows / num_modes) balanced_df = pd.DataFrame() for md in modes: df_mode = df[df[ycol] == md] if len(df_mode.index) > avg_rows_in_mode: # majority class, must downsample resmpl_len = int(len(df_mode) * 0.7) if resmpl_len < avg_rows_in_mode: # if 0.7 times current value is less than avg rows per mode, utilize ag rows per mode resmpl_len = avg_rows_in_mode print('\t[Class {}]: Majority, {} --> {}' ' gravitating to {}'.format(md, len(df_mode.index), resmpl_len, avg_rows_in_mode)) df_resampled = resample(df_mode, replace=True, n_samples=resmpl_len, random_state=123) elif len(df_mode.index) < avg_rows_in_mode: # minority class, must upsample resmpl_len = int(len(df_mode) * 1.3) if resmpl_len > avg_rows_in_mode: # if 1.3 times current value is greater than avg rows per mode, utilize ag rows per mode resmpl_len = avg_rows_in_mode print('\t[Class {}]: Minority, {} --> {}' 'gravitating to {}'.format(md, len(df_mode.index), resmpl_len, avg_rows_in_mode)) df_resampled = resample(df_mode, replace=True, n_samples=resmpl_len, random_state=234) else: # Equal to average, do nothing print('\t[Class {}]: Equal, {} = {}'.format( md, len(df_mode.index), avg_rows_in_mode)) df_resampled = df_mode balanced_df = pd.concat([balanced_df, df_resampled]) elif balance_tactic == 'truncate': modes = set(df[ycol].tolist()) balanced_df = pd.DataFrame() lens = [] for md in modes: df_mode = df[df[ycol] == md] lens.append(len(df_mode.index)) minlen = min(lens) for md in modes: df_mode = df[df[ycol] == md] if len(df_mode.index) > minlen: print('\t[Class {}]: Resampled, {} --> {}'.format(md, len(df_mode.index), minlen)) df_resampled = resample(df_mode, replace=True, n_samples=minlen, random_state=123) elif minlen == len(df_mode.index): print('\t[Class {}]: Resampled, {} == {}'.format( md, len(df_mode.index), minlen)) df_resampled = df_mode balanced_df = pd.concat([balanced_df, df_resampled]) else: raise Exception( "Invalid balancedf variable: {}".format(balance_tactic)) return balanced_df
[docs] def plot_profiles(df, colx, coly, iv_col_dict, cmap_name='brg'): """Plot curves also with area colorizations to display the deviation in definitions. Parameters ---------- df : dataframe Dataframe containing the `colx`, `coly`, and iv_col_dict['mode'] column colx : str Column containing x-axis array of data on each sample. coly : str Column containing y-axis array of data on each sample. iv_col_dict : dict Dictionary containing at least the following definition: - **mode** (*str*), column in `df` which holds the definitions which must contain a balanced number of samples for each unique definition. cmap_name : str Matplotlib colormap. Returns ------- matplotlib figure """ mode_col = iv_col_dict["mode"] # Check whether the colx has equivalent values in every row xs = df[colx].tolist() try: all_equal = all((x == xs[0]).all() for x in xs) except AttributeError: all_equal = False else: all_equal = True fig = plt.figure(figsize=(10, 5)) i = 0 labeled = list(sorted(list(set(df[mode_col].tolist())))) cmap = plt.cm.get_cmap(cmap_name, len(labeled)) for mode in labeled: subdf = df[df[mode_col] == mode] s2is = subdf[coly].tolist() if all_equal: x = subdf[colx].values[0] else: x = range(len(subdf[colx].values[0])) s2ismean = np.array(s2is).mean(axis=0) s2isstd = np.array(s2is).std(axis=0) s2lowerbound = [s2ismean[i] - s2isstd[i] for i in range(len(s2ismean))] s2upperbound = [s2ismean[i] + s2isstd[i] for i in range(len(s2ismean))] plt.plot(x, s2ismean, color=cmap(i), label='{}'.format(mode)) plt.fill_between(x, s2lowerbound, s2upperbound, facecolor=cmap(i), alpha=0.2) i += 1 if all_equal: plt.xlabel(colx) else: plt.xlabel(colx + " index") plt.ylabel(coly) plt.legend() return fig
def _convert_ivdata_to_cnn_structure(df, params): data = [] for ind, row in df.iterrows(): dat = [] for i in range(len(row[params[0]])): da = [] for param in params: da.append(row[param][i]) dat.append(da) data.append(dat) return np.asarray(data) def _convert_ivdata_to_lstm_multihead_structure(df, params, n_filters=10.): data_features = [] for param in params: data_features.append(df[param].values) def _list_to_lol(lst, ln): # lst: list input # len: num vals in each sublist maxlen = len(lst) i = 0 lol = [] while i + ln < maxlen: x = lst[i:i + ln] lol.append(x) i += ln return lol print(f'Making {n_filters} and sample {len(data_features[0][0])}') length = int(len(data_features[0][0]) / n_filters) # lists restructured_data_features = [[] for _ in range(len(params))] for ividx in range(len(data_features[0])): for i in range(len(params)): restructured_data_features[i].append( _list_to_lol(data_features[i][ividx], length)) for i in range(len(params)): restructured_data_features[i] = np.asarray( restructured_data_features[i]) return restructured_data_features def _grab_structure_lstm_multihead_structure(restructured_data_features, idxs): nparms = len(restructured_data_features) return [restructured_data_features[i][idxs] for i in range(nparms)]
[docs] def classify_curves(df, iv_col_dict, nn_config): """Build and evaluate an IV trace failure `mode` classifier. Parameters ---------- df : dataframe Data with columns in `iv_col_dict` iv_col_dict : dict Dictionary containing definitions for the column names in `df` **mode** (*str*): column name for failure mode identifier nn_config : dict Parameters used for the IV trace classifier. """ # Balance ys bal_df = balance_df( df, iv_col_dict, nn_config["balance_tactic"]) # Label binarizer ys = bal_df[iv_col_dict["mode"]].values train, test = train_test_split( bal_df, train_size=nn_config["train_size"], shuffle=nn_config["shuffle_split"], stratify=(ys if nn_config["shuffle_split"] else None)) iv = IVClassifier(nn_config) iv.structure(train, test) iv.train() iv.predict() return iv, train, test
[docs] class IVClassifier: def __init__(self, nn_config): self.nn_config = nn_config self.verbose = nn_config["verbose"] self.model_name = nn_config["model_choice"] self.params = nn_config["params"]
[docs] def structure(self, train, test): """Structure the data according to the chosen network model's input structure. Parameters ---------- train : dataframe Train data containing IV data and associated features test : dataframe Test data containing IV data and associated features nn_config : dict Parameters used for the IV trace classifier. """ num_params = len(self.params) # train the label binarizer self.lb = LabelBinarizer() all_ys = train['mode'].tolist() + test['mode'].tolist() self.lb.fit(all_ys) self.test_y = test['mode'].values self.train_y = train['mode'].values self.encoded_length = len(self.lb.classes_) self.is_binary = False if self.encoded_length == 1: raise ValueError("Only one failure mode was passed in dataset. " "Add samples with other failure modes.") if self.encoded_length == 2: # Binary classification detected self.is_binary = True self.loss_defn = 'categorical_crossentropy' self.metric_defn = 'categorical_accuracy' if self.model_name == '1DCNN': self.train_x = _convert_ivdata_to_cnn_structure(train, self.params) self.test_x = _convert_ivdata_to_cnn_structure(test, self.params) self._X_for_cvsplit = self.train_x length_arr_in_sample = len(self.test_x[0]) self._1dcnn((length_arr_in_sample, num_params), nfilters=self.nn_config["nfilters"], kernel_size=self.nn_config["kernel_size"], dropout_pct=self.nn_config["dropout_pct"]) elif self.model_name == 'LSTM_multihead': self.train_x = _convert_ivdata_to_lstm_multihead_structure( train, self.params) self.test_x = _convert_ivdata_to_lstm_multihead_structure( test, self.params) self._X_for_cvsplit = self.train_x[0] n_sequences = np.asarray(self.test_x).shape[2] n_samples_in_sequence = np.asarray(self.test_x).shape[3] self.test_x = _grab_structure_lstm_multihead_structure( self.test_x, np.array(range(np.asarray(self.test_x).shape[1]))) self._lstm_multihead((n_sequences, n_samples_in_sequence), num_params, use_attention_LSTM=self.nn_config[ 'use_attention_lstm'], units=self.nn_config['units'], dropout_pct=self.nn_config['dropout_pct']) self.model.compile(loss=self.loss_defn, optimizer='adam', metrics=[self.metric_defn]) if self.verbose >= 1: print(self.model.summary())
[docs] def train(self): """Train neural network with stratified KFold. """ cv = StratifiedKFold(n_splits=self.nn_config["n_CV_splits"]) cvscores = [] for train_idx, test_idx in cv.split(self._X_for_cvsplit, self.train_y): ytr = self.lb.transform(self.train_y[train_idx]) yte = self.lb.transform(self.train_y[test_idx]) if self.is_binary: ytr = to_categorical(ytr) yte = to_categorical(yte) if self.model_name == '1DCNN': xtr, xte = self.train_x[train_idx], self.train_x[test_idx] elif self.model_name == 'LSTM_multihead': xtr = _grab_structure_lstm_multihead_structure( self.train_x, train_idx) xte = _grab_structure_lstm_multihead_structure( self.train_x, test_idx) self.model.fit(xtr, ytr, epochs=self.nn_config["max_epochs"], batch_size=self.nn_config["batch_size"], verbose=self.verbose - 1) scores = self.model.evaluate(xte, yte, verbose=self.verbose) if self.verbose >= 1: print("%s: %.2f%%" % (self.model.metrics_names[1], scores[1] * 100)) cvscores.append(scores[1] * 100) return
[docs] def predict(self, batch_size=8): """Predict using the trained model. Parameters ---------- batch_size : int Number of samples per gradient update """ yhat = self.model.predict( self.test_x, batch_size=batch_size, verbose=self.verbose) idx = np.argmax(yhat, axis=-1) decoded_preds = np.zeros(yhat.shape) decoded_preds[np.arange(decoded_preds.shape[0]), idx] = 1 decoded_preds = self.lb.inverse_transform(decoded_preds) self.test_accuracy = accuracy_score(self.test_y, decoded_preds) print(classification_report( self.test_y, decoded_preds)) print(confusion_matrix(self.test_y, decoded_preds)) print('accuracy on test: ', self.test_accuracy)
def _1dcnn(self, input_shape, nfilters=64, kernel_size=12, dropout_pct=0.5): self.model = Sequential() self.model.add(Conv1D(filters=nfilters, kernel_size=kernel_size, activation='relu', input_shape=input_shape)) self.model.add(Dropout(dropout_pct)) self.model.add(Flatten()) self.model.add(Dense(100, activation='relu')) self.model.add(Dense(self.encoded_length, activation='softmax')) def _lstm_multihead(self, input_sample_shape, n_features, use_attention_LSTM, units, dropout_pct): inputs = [] for i in range(n_features): inputs.append(Input(shape=input_sample_shape)) activations = [] for i in range(n_features): activations.append(LSTM(units, return_sequences=False)(inputs[i])) dropouts = [] for i in range(n_features): dropouts.append(Dropout(dropout_pct)(activations[i])) if use_attention_LSTM: hidden_size = [] for i in range(n_features): hidden_size.append(int(dropouts[i].shape[2])) hidden_out = [] for i in range(n_features): hidden_out.append( Lambda(lambda x: x[:, -1, :], output_shape=(hidden_size[i],) )(dropouts[i])) pre_mlp = concatenate(hidden_out, name='attention_output') else: pre_mlp = concatenate(dropouts) d = Dense(int(((units - self.encoded_length) / 2) + self.encoded_length), activation='relu', kernel_initializer='normal')(pre_mlp) activations = Dense( self.encoded_length, activation='softmax', kernel_initializer='normal')(d) self.model = Model(inputs=inputs, outputs=[activations])