Source code for pvops.timeseries.preprocess

# A set of preprocessing methods based on both photovoltaic-specific physics and data-quality checks.

import pvlib
import pvanalytics
from timezonefinder import TimezoneFinder
import pandas as pd


def establish_solar_loc(prod_df, prod_col_dict, meta_df, meta_col_dict):
    """Add solar-position columns to production data using pvlib.

    Parameters
    ----------
    prod_df : DataFrame
        A data frame corresponding to production data containing a
        datetime index.
    prod_col_dict : dict of {str : str}
        A dictionary that contains the column names associated with the
        production data, which consist of at least:

        - **siteid** (*string*), should be assigned to
          site-ID column name in prod_df

    meta_df : DataFrame
        A data frame corresponding to site metadata. The index must contain
        the site IDs used in prod_df (rows are looked up by site ID).
    meta_col_dict : dict of {str : str}
        A dictionary that contains the column names relevant for the meta-data

        - **longitude** (*string*), should be assigned to site's longitude
        - **latitude** (*string*), should be assigned to site's latitude

    Returns
    -------
    DataFrame
        A copy of ``prod_df`` with the solar-position columns
        ``apparent_zenith``, ``zenith``, ``apparent_elevation``,
        ``elevation``, ``azimuth`` and ``equation_of_time`` filled in for
        every site, using the column names produced by pvlib.
    """
    prod_df = prod_df.copy()
    meta_df = meta_df.copy()

    siteid_col = prod_col_dict['siteid']
    longitude_col = meta_col_dict['longitude']
    latitude_col = meta_col_dict['latitude']

    positional_columns = [
        'apparent_zenith',
        'zenith',
        'apparent_elevation',
        'elevation',
        'azimuth',
        'equation_of_time',
    ]

    # Use the caller-supplied site-ID column rather than a hard-coded name.
    for site in prod_df[siteid_col].unique():
        site_mask = prod_df[siteid_col] == site
        # Latitude and longitude are passed by keyword so the call cannot
        # silently swap them (spa_python expects latitude before longitude).
        prod_df.loc[site_mask, positional_columns] = pvlib.solarposition.spa_python(
            prod_df.loc[site_mask].index,
            latitude=meta_df.loc[site, latitude_col],
            longitude=meta_df.loc[site, longitude_col],
        )

    return prod_df
def normalize_production_by_capacity(prod_df, prod_col_dict, meta_df, meta_col_dict):
    """Divide each site's production by that site's DC capacity.

    Intended as a preparation step before fitting a single model on data
    from several sites at once.

    Parameters
    ----------
    prod_df : DataFrame
        A data frame corresponding to production data.
    prod_col_dict : dict of {str : str}
        A dictionary that contains the column names associated with the
        production data, which consist of at least:

        - **energyprod** (*string*), should be assigned to
          production data in prod_df
        - **siteid** (*string*), should be assigned to
          site-ID column name in prod_df
        - **capacity_normalized_power** (*string*), should be assigned to
          a column name where the normalized output signal will be stored

    meta_df : DataFrame
        A data frame corresponding to site metadata.
    meta_col_dict : dict of {str : str}
        A dictionary that contains the column names relevant for the meta-data

        - **siteid** (*string*), should be assigned to site-ID column name
        - **dcsize** (*string*), should be assigned to column name
          corresponding to site's DC size

    Returns
    -------
    prod_df : DataFrame
        A copy of the production data with the normalized column added.
        Only rows belonging to sites listed in ``meta_df`` are written.
    """
    prod_df = prod_df.copy()
    meta_df = meta_df.copy()

    normalized_col = prod_col_dict["capacity_normalized_power"]
    power_col = prod_col_dict["energyprod"]
    capacity_col = meta_col_dict["dcsize"]

    known_sites = set(meta_df[meta_col_dict['siteid']].tolist())

    for site_id in known_sites:
        meta_rows = meta_df.loc[:, meta_col_dict["siteid"]] == site_id
        prod_rows = prod_df.loc[:, prod_col_dict["siteid"]] == site_id

        # The first matching metadata row supplies the site's capacity.
        site_power = prod_df.loc[prod_rows, power_col]
        site_capacity = meta_df.loc[meta_rows, capacity_col].iloc[0]
        prod_df.loc[prod_rows, normalized_col] = site_power / site_capacity

    return prod_df
def prod_irradiance_filter(prod_df, prod_col_dict, meta_df, meta_col_dict,
                           drop=True, irradiance_type='ghi', csi_max=1.1):
    """Filter rows of production data frame according to performance and data quality.

    THIS METHOD IS CURRENTLY IN DEVELOPMENT.

    Parameters
    ----------
    prod_df : DataFrame
        A data frame corresponding to production data.
    prod_col_dict : dict of {str : str}
        A dictionary that contains the column names associated with the
        production data, which consist of at least:

        - **timestamp** (*string*), should be assigned to associated
          time-stamp column name in prod_df
        - **siteid** (*string*), should be assigned to site-ID column
          name in prod_df
        - **irradiance** (*string*), should be assigned to associated
          irradiance column name in prod_df
        - **clearsky_irr** (*string*), should be assigned to clearsky
          irradiance column name in prod_df

    meta_df : DataFrame
        A data frame corresponding to site metadata.
    meta_col_dict : dict of {str : str}
        A dictionary that contains the column names relevant for the meta-data

        - **siteid** (*string*), should be assigned to site-ID column name
        - **latitude** (*string*), should be assigned to column name
          corresponding to site's latitude
        - **longitude** (*string*), should be assigned to column name
          corresponding to site's longitude

    drop : bool
        If False, the data frame is returned unfiltered with an extra
        ``mask`` column. If True, only the rows whose mask value is False
        are kept and the ``mask`` column is removed.
    irradiance_type : str
        A string description of the irradiance_type which was passed in
        prod_df. Options: `ghi`, `dni`, `dhi`. In future, `poa` may be a
        feature.
    csi_max : float
        A pvanalytics parameter of maximum ratio of measured to clearsky
        (clearsky index).

    Returns
    -------
    prod_df : DataFrame
        A dataframe with new **clearsky_irr** column. If drop=True, a
        filtered prod_df according to clearsky.
    clearsky_mask : Series
        The result of ``pvanalytics.quality.irradiance.clearsky_limits``,
        computed on the unfiltered data.

    Raises
    ------
    ValueError
        If ``irradiance_type`` is ``'poa'`` (not supported yet) or any value
        other than ``'dni'``, ``'ghi'`` or ``'dhi'``.
    """
    # Validate up front so a bad argument fails before any per-site work.
    if irradiance_type == 'poa':
        # POA would need a transposition step (surface tilt and azimuth),
        # which the metadata normally does not provide.
        raise ValueError(
            "POA is currently not configured because it requires "
            "`surface_tilt` and `surface_azimuth`, "
            "a trait which is not usually in the meta data.")
    if irradiance_type not in ('dni', 'ghi', 'dhi'):
        raise ValueError(
            "Incorrect value passed to `irradiance_type`. Expected ['dni','ghi', or 'dhi']")

    prod_df = prod_df.copy()
    meta_df = meta_df.copy()

    irr_name = prod_col_dict["irradiance"]
    clearsky_irr_name = prod_col_dict["clearsky_irr"]
    timestamp_name = prod_col_dict['timestamp']

    # Start from an all-missing column so every site can fill its own rows.
    prod_df[clearsky_irr_name] = float('nan')

    # The timezone lookup object does not depend on the site; build it once.
    tf = TimezoneFinder()

    individual_sites = set(meta_df[meta_col_dict['siteid']].tolist())

    for site in individual_sites:
        site_meta_mask = meta_df.loc[:, meta_col_dict["siteid"]] == site
        site_prod_mask = prod_df.loc[:, prod_col_dict["siteid"]] == site

        if not site_prod_mask.any():
            # No production rows for this site, nothing to compute.
            continue

        # Save times in object
        prod_times = prod_df.loc[site_prod_mask, timestamp_name].tolist()

        # Extract site's position
        latitude = meta_df.loc[site_meta_mask, meta_col_dict['latitude']].tolist()[0]
        longitude = meta_df.loc[site_meta_mask, meta_col_dict['longitude']].tolist()[0]

        # Derive the timezone from the coordinates
        derived_timezone = tf.timezone_at(lng=longitude, lat=latitude)

        # Define Location object
        # Altitude is not passed because it's not available usually.
        # Fortunately, a clearsky model exists which does not use altitude.
        loc = pvlib.location.Location(latitude, longitude, tz=derived_timezone)
        times = pd.DatetimeIndex(data=prod_times, tz=loc.tz)

        # Derive clearsky values
        cs = loc.get_clearsky(times, model='haurwitz')

        # `cs` is ordered like `prod_times`, i.e. like this site's rows, so
        # write it positionally into this site's rows only. Assigning to the
        # whole column here would overwrite the values of other sites.
        prod_df.loc[site_prod_mask, clearsky_irr_name] = cs[irradiance_type].to_numpy()

    mask_series = pvanalytics.quality.irradiance.clearsky_limits(
        prod_df[irr_name], prod_df[clearsky_irr_name], csi_max=csi_max)
    prod_df['mask'] = mask_series

    if not drop:
        return prod_df, mask_series

    # NOTE(review): pvanalytics documents the mask as True where the clearsky
    # index is <= csi_max, so keeping the False rows retains the readings that
    # exceed the limit. The existing behavior is preserved here; confirm that
    # this is the intended direction of the filter.
    prod_df = prod_df.loc[prod_df['mask'].eq(False)].drop(columns=['mask'])
    return prod_df, mask_series
def prod_inverter_clipping_filter(prod_df, prod_col_dict, meta_df, meta_col_dict, model, **kwargs):
    """Flag inverter-clipping periods in production data, site by site.

    Parameters
    ----------
    prod_df : DataFrame
        A data frame corresponding to production data.
    prod_col_dict : dict of {str : str}
        A dictionary that contains the column names associated with the
        production data, which consist of at least:

        - **siteid** (*string*), should be assigned to site-ID column
          name in prod_df
        - **powerprod** (*string*), should be assigned to associated
          power production column name in prod_df

    meta_df : DataFrame
        A data frame corresponding to site metadata.
    meta_col_dict : dict of {str : str}
        A dictionary that contains the column names relevant for the meta-data

        - **siteid** (*string*), should be assigned to site-ID column name

    model : str
        A string distinguishing the inverter clipping detection model
        programmed in pvanalytics.
        Available options: ['geometric', 'threshold', 'levels']
    kwargs
        Extra parameters passed to the relevant pvanalytics model. A
        parameter that is omitted or passed as None takes the default used
        here:

        - geometric: ``window`` (None), ``slope_max`` (0.2), ``freq`` (None),
          ``tracking`` (False)
        - threshold: ``slope_max`` (0.0035), ``power_min`` (0.75),
          ``power_quantile`` (0.995), ``freq`` (None)
        - levels: ``window`` (4), ``fraction_in_window`` (0.75),
          ``rtol`` (0.005), ``levels`` (2)

    Returns
    -------
    prod_df : DataFrame
        A copy of the production data with a ``mask`` column holding the
        output of the selected pvanalytics clipping model for every site
        that is listed in ``meta_df`` and has production rows. No rows are
        removed.

    Raises
    ------
    ValueError
        If ``model`` is not one of the available options.
    """
    # Validate before the loop so a bad model name is reported even when no
    # site has any production rows.
    if model not in ('geometric', 'threshold', 'levels'):
        raise ValueError(
            "Invalid value passed to parameter `model`. Expected a value in ['geometric', 'threshold', 'levels']")

    def _option(name, default=None):
        # Fall back to the default only when the option is absent or None,
        # so explicit falsy values such as 0 are honored.
        value = kwargs.get(name)
        return default if value is None else value

    if model == 'geometric':
        model_params = {
            'window': _option('window'),
            'slope_max': _option('slope_max', 0.2),
            'freq': _option('freq'),
            'tracking': _option('tracking', False),
        }
    elif model == 'threshold':
        model_params = {
            'slope_max': _option('slope_max', 0.0035),
            'power_min': _option('power_min', 0.75),
            'power_quantile': _option('power_quantile', 0.995),
            'freq': _option('freq'),
        }
    else:  # model == 'levels'
        model_params = {
            'window': _option('window', 4),
            'fraction_in_window': _option('fraction_in_window', 0.75),
            'rtol': _option('rtol', 0.005),
            'levels': _option('levels', 2),
        }

    prod_df = prod_df.copy()
    meta_df = meta_df.copy()

    individual_sites = set(meta_df[meta_col_dict['siteid']].tolist())

    for site in individual_sites:
        site_prod_mask = prod_df.loc[:, prod_col_dict["siteid"]] == site
        ac_power = prod_df.loc[site_prod_mask, prod_col_dict["powerprod"]]

        if len(ac_power) == 0:
            # If no rows exist for this site, skip it.
            continue

        # The accepted model names are the names of the functions in
        # pvanalytics.features.clipping.
        detect_clipping = getattr(pvanalytics.features.clipping, model)
        prod_df.loc[site_prod_mask, "mask"] = detect_clipping(ac_power, **model_params)

    return prod_df
def identify_right_censored_data(om_df, col_dict):
    """Identify censored data for site-group pairs in a given DataFrame.

    For every combination of site and group value found in ``om_df`` the
    result holds one row. Pairs that occur in the data carry the values of
    their first row and are marked as observed. Pairs that never occur carry
    the values of the last row of their site and are marked as
    right-censored.

    "First" and "last" follow the row order of ``om_df`` (per column, the
    first/last non-missing value within the group), so the frame is
    presumably expected to be sorted chronologically - confirm with callers.

    Parameters
    ----------
    om_df : pandas.DataFrame
        A DataFrame containing failure data with at least two columns
        specified in `col_dict`: one for grouping and one for site. Rows
        whose site is missing are ignored.
    col_dict : dict
        A dictionary containing the following keys:

        - 'group_by': The column name to group by.
        - 'site': The column name representing the site.

    Returns
    -------
    pandas.DataFrame
        A DataFrame indexed by every site-group pair, containing the
        remaining columns of ``om_df`` plus a boolean ``was_observed``
        column that is True for observed pairs and False for censored ones.
        Column dtypes match those of the grouped input.
    """
    # extract the columns we need
    group_by = col_dict['group_by']
    site = col_dict['site']

    # find the first failure of a given site-group_by pair
    first_fails_df = om_df.groupby([site, group_by]).first()
    first_fails_df['was_observed'] = True

    # find the last failure for a given site
    # (we don't care about the group_by value)
    last_fails_df = om_df.groupby(site).last().drop(columns=[group_by])
    last_fails_df['was_observed'] = False

    # groupby leaves out rows with a missing site, so leave them out here as
    # well; otherwise the per-site lookup below fails for the missing key
    unique_sites = om_df[site].dropna().unique()
    unique_group_bys = om_df[group_by].unique()

    # initialize dataframe with a row for every unique site-group_by pair;
    # object dtype lets each column hold mixed values until the final cast
    all_pairs = pd.MultiIndex.from_product(
        [unique_sites, unique_group_bys], names=[site, group_by])
    all_sites_assets_df = pd.DataFrame(
        index=all_pairs, columns=first_fails_df.columns, dtype=object)

    # prefill dataframe with the last possible times (the censored times)
    for unique_site in unique_sites:
        all_sites_assets_df.loc[(unique_site, slice(None)), :] = \
            last_fails_df.loc[unique_site].values

    # for every row that did have a recorded event, replace the censored
    # time with the observed one
    all_sites_assets_df.loc[first_fails_df.index] = first_fails_df

    # set the column dtypes appropriately
    return all_sites_assets_df.astype(first_fails_df.dtypes)