# visualizations
import matplotlib
import matplotlib.pyplot as plt
import networkx as nx
from sklearn.metrics import ConfusionMatrixDisplay
from networkx.algorithms import bipartite
# data structures
import numpy as np
import pandas as pd
# utils
import copy
import datetime
from collections import Counter
# Embedding
from sklearn.feature_extraction.text import TfidfVectorizer
from gensim.models.doc2vec import TaggedDocument
from pvops.text import preprocess
[docs]
def visualize_attribute_connectivity(
om_df,
om_col_dict,
figsize=(20, 10),
attribute_colors=["lightgreen", "cornflowerblue"],
edge_width_scalar=10,
graph_aargs={},
ax=None
):
"""Visualize a bipartite graph which shows the frequency of combinations between attributes
``ATTRIBUTE1_COL`` and ``ATTRIBUTE2_COL``.
Parameters
----------
om_df : DataFrame
A pandas dataframe containing O&M data, which contains columns specified in `om_col_dict`
om_col_dict : dict of {str: str}
A dictionary that contains the column names to be used in
visualization. Must have the following structure (with keys matching exactly)::
{'attribute1_col': string, 'attribute2_col': string}
figsize : tuple
Optional. Figure size, defaults to `(20,10)`. Ignored if `ax` is provided.
attribute_colors : list[str]
List of two strings which designate the colors for `attribute1_col` and `attribute2_col`, respectively.
edge_width_scalar : numeric
Weight utilized to scale widths based on number of connections between `attribute1_col` and `attribute2_col`.
Larger values will produce larger widths, and smaller values will produce smaller widths.
graph_aargs : dict
Optional, arguments passed to networkx graph drawer.
Suggested attributes to pass:
- with_labels=True
- font_weight='bold'
- node_size=19000
- font_size=35
ax : matplotlib.pyplot.Axes
Optional, axis to plot on. If not provided, will create a new instance.
Returns
-------
matplotlib figure instance, networkx graph
"""
# initialize figure
if ax is None:
fig, ax = plt.subplots(figsize=figsize)
else:
fig = ax.get_figure()
# attribute column names
ATTRIBUTE1_COL = om_col_dict["attribute1_col"]
ATTRIBUTE2_COL = om_col_dict["attribute2_col"]
ax.set_title(
f"Connectivity between {ATTRIBUTE2_COL} and {ATTRIBUTE1_COL}",
fontweight="bold",
)
# subset dataframe to relevant columns
df_mask = (om_df[ATTRIBUTE1_COL].notna() == True) & (om_df[ATTRIBUTE2_COL].notna() == True)
df = om_df.loc[df_mask].reset_index(drop=True)
# obtain connectivity weights between attributes
nx_data = {}
for attr1 in np.unique(df[ATTRIBUTE1_COL].tolist()):
df_iter = df[df[ATTRIBUTE1_COL] == attr1]
for attr2 in np.unique(df_iter[ATTRIBUTE2_COL].tolist()):
w = len(df_iter[df_iter[ATTRIBUTE2_COL] == attr2])
nx_data[(attr1, attr2)] = w
# create graph
G = nx.Graph()
G.add_nodes_from(df[ATTRIBUTE1_COL], bipartite=0)
G.add_nodes_from(df[ATTRIBUTE2_COL], bipartite=1)
G.add_edges_from(nx_data.keys())
# rescale weights and add to graph as attribute
max_weight = max(nx_data.values())
weights = []
for node1, node2 in nx_data:
weight = nx_data[node1, node2]
rescaled_weight = 1 + (edge_width_scalar * weight / max_weight) # between 1 and edge_width_scalar+1
G[node1][node2]["weight"] = rescaled_weight
weights.append(rescaled_weight)
# get bipartite positioning
top_nodes = list(df[ATTRIBUTE2_COL].unique())
pos = nx.drawing.layout.bipartite_layout(G, top_nodes, align='horizontal')
# assign colors based on attribute column
color_map = []
for node in G:
if node in np.unique(df[ATTRIBUTE2_COL].tolist()):
color_map.append(attribute_colors[1])
else:
color_map.append(attribute_colors[0])
nx.draw_networkx(
G,
width=weights,
node_color=color_map,
pos=pos,
**graph_aargs)
plt.show(block=False)
return fig, G
[docs]
def visualize_attribute_timeseries(
om_df, om_col_dict=None, date_structure="%Y-%m", figsize=(12, 6), cmap_name="brg", ax=None
):
"""Visualize stacked bar chart of attribute frequency over time, where x-axis is time and y-axis is count, displaying separate bars
for each label within the label column
Parameters
----------
om_df : DataFrame
A pandas dataframe of O&M data, which contains columns in `om_col_dict`
om_col_dict : dict of {str : str}
A dictionary that contains the column names relevant for the get_dates fn
- **label** (*string*), should be assigned to associated column name for the label/attribute of interest in om_df
- **date** (*string*), should be assigned to associated column name for the dates relating to the documents in om_df
date_structure : str
Controls the resolution of the bar chart's timeseries
Default : "%Y-%m". Can change to include finer resolutions (e.g., by including day, "%Y-%m-%d")
or coarser resolutions (e.g., by year, "%Y")
figsize : tuple
Optional, figure size. Ignored if `ax` is provided.
cmap_name : str
Optional, color map name in matplotlib
ax : matplotlib.pyplot.Axes
Optional, axis to plot on. If not provided, creates a new instance.
Returns
-------
Matplotlib figure instance
"""
df = om_df.copy()
LABEL_COLUMN = om_col_dict["label"]
DATE_COLUMN = om_col_dict["date"]
def restructure(vals, inds, ind_set):
out = np.zeros(len(ind_set))
for ind, val in zip(inds, vals):
loc = ind_set.index(ind)
out[loc] = val
return out
if ax is None:
fig, ax = plt.subplots(figsize=figsize)
else:
fig = ax.get_figure()
asset_set = list(set(df[LABEL_COLUMN].tolist()))
dates = df[DATE_COLUMN].tolist()
assets_list = df[LABEL_COLUMN].tolist()
full_date_list = [i.strftime(date_structure) for i in dates]
datetime_list = [
datetime.datetime.strptime(i, date_structure) for i in full_date_list
]
date_set = list(set(datetime_list))
date_set = sorted(date_set)
date_set = [i.strftime(date_structure) for i in date_set]
assets_list = np.array(assets_list)
asset_sums = []
index_sums = []
for dt in date_set:
inds = [i for i, x in enumerate(full_date_list) if x == dt]
alist = assets_list[inds]
index_sums += [dt] * len(alist)
asset_sums += list(alist)
asset_set = list(set(asset_sums))
newdf = pd.DataFrame()
newdf[LABEL_COLUMN] = asset_sums
newdf[DATE_COLUMN] = index_sums
cmap = matplotlib.colormaps.get_cmap(cmap_name).resampled(len(asset_set))
graphs = []
for i, a in enumerate(asset_set):
iter_ = newdf[newdf[LABEL_COLUMN] == a]
valcounts = iter_[DATE_COLUMN].value_counts()
valcounts.sort_index(inplace=True)
vals = restructure(valcounts.values, valcounts.index, date_set)
p = ax.bar(date_set, vals, color=cmap(i))
graphs.append(p[0])
ax.grid()
ax.legend(graphs, list(asset_set))
ax.set_xlabel("Month")
ax.set_ylabel(f"Affected {LABEL_COLUMN} counts")
ax.set_xticks(ax.get_xticks(), ax.get_xticklabels(), rotation=45)
return fig
[docs]
def visualize_cluster_entropy(
doc2vec, eval_kmeans, om_df, data_cols, ks, cmap_name="brg", ax=None
):
"""Visualize entropy of embedding space parition. Currently only supports doc2vec embedding.
Parameters
----------
doc2vec : Doc2Vec model instance
Instance of gensim.models.doc2vec.Doc2Vec
eval_kmeans : callable
Callable cluster fit function
For instance,
.. code-block:: python
def eval_kmeans(X,k):
km = KMeans(n_clusters=k)
km.fit(X)
return km
om_df : DataFrame
A pandas dataframe containing O&M data, which contains columns specified in om_col_dict
data_cols : list
List of column names (str) which have text data.
ks : list
List of k parameters required for the clustering mechanic `eval_kmeans`
cmap_name :
Optional, color map
ax : matplotlib.Axes
Optional, axis to plot on. If not provided, creates a new instance.
Returns
-------
Matplotlib figure instance
"""
df = om_df.copy()
cols = data_cols
if ax is None:
fig, ax = plt.subplots(figsize=(6, 6))
else:
fig = ax.get_figure()
cmap = plt.cm.get_cmap(cmap_name, len(cols) * 2)
for i, col in enumerate(cols):
X = df[col].tolist()
X = [x.lower() for x in X]
tokenized_data = [preprocess.regex_tokenize(x) for x in X]
doc2vec_data = [
TaggedDocument(words=x, tags=[str(i)]) for i, x in enumerate(tokenized_data)
]
model = copy.deepcopy(doc2vec)
model.build_vocab(doc2vec_data)
model.train(
doc2vec_data, total_examples=model.corpus_count, epochs=model.epochs
)
X_doc2vec = [model.infer_vector(tok_doc) for tok_doc in tokenized_data]
sse = []
clusters = []
for true_k in ks:
km = eval_kmeans(X_doc2vec, true_k)
sse.append(km.inertia_)
clusters.append(km.labels_)
ax.plot(
ks, sse, color=cmap(2 * i), marker="o", label=f"Doc2Vec + {col} entropy"
)
vectorizer = TfidfVectorizer()
X_tfidf = vectorizer.fit_transform(X)
sse = []
clusters = []
for true_k in ks:
km = eval_kmeans(X_tfidf, true_k)
sse.append(km.inertia_)
clusters.append(km.labels_)
ax.plot(
ks, sse, color=cmap(2 * i + 1), marker="o", label=f"TF-IDF + {col} entropy"
)
ax.grid()
ax.set_ylim(0, None)
ax.set_xlabel(r"Number of clusters *k*")
ax.set_ylabel("Sum of squared distance")
ax.legend()
return fig
[docs]
def visualize_document_clusters(cluster_tokens, min_frequency=20, ax=None):
"""Visualize words most frequently occurring in a cluster. Especially useful when visualizing
the results of an unsupervised partitioning of documents.
Parameters
----------
cluster_tokens : list
List of tokenized documents
min_frequency : int
Minimum number of occurrences that a word must have in a cluster for it to be visualized
ax : matplotlib.Axes
Optional, axis to plot on. If not provided, creates a new instance.
Returns
-------
Matplotlib figure instance
"""
# IDEA: instead of using frequency, use importance with other embeddings too
all_tokens = [item for sublist in cluster_tokens for item in sublist]
# important_words_freq is [[word1,freq1],[word2,freq2],...]
total_important_words_freq = Counter(all_tokens).most_common()
word_freq_df = pd.DataFrame(
total_important_words_freq, columns=["word", "freq"])
all_words_of_interest = []
for tokens in cluster_tokens:
# important_words_freq is [[word1,freq1],[word2,freq2],...]
important_words_freq = Counter(tokens).most_common()
for word, freq in important_words_freq:
if freq >= min_frequency:
all_words_of_interest.append(word)
unique_words = np.unique(all_words_of_interest)
cluster_list = []
freq_list = []
word_list = []
for wd in unique_words:
freq = word_freq_df[word_freq_df["word"] == wd]["freq"].tolist()[0]
clusters_this_wd = [
idx
for idx, words_in_cluster in enumerate(all_words_of_interest)
if wd in words_in_cluster
]
clusters_this_wd = list(map(str, clusters_this_wd))
cluster_list.append(", ".join(clusters_this_wd))
freq_list.append(freq)
word_list.append(wd)
if ax is None:
_, ax = plt.subplots(figsize=(12, 6))
filter_cluster_list = []
filter_freq_list = []
filter_word_list = []
for fr, cl, wd in sorted(zip(freq_list, cluster_list, word_list)):
filter_cluster_list.append(cl)
filter_freq_list.append(fr)
filter_word_list.append(wd)
df = pd.DataFrame(index=filter_cluster_list)
df["freq"] = filter_freq_list
df["freq"].plot(kind="barh", color="coral", ax=ax)
xbias = 0.3
ybias = 0.0
for idx, i in enumerate(ax.patches):
ax.text(
i.get_width() + xbias,
i.get_y() + ybias,
filter_word_list[idx],
color="dimgrey",
)
return ax.get_figure()
[docs]
def visualize_word_frequency_plot(tokenized_words,
title="",
font_size=16,
num_tokens=30,
graph_aargs={},
ax=None):
"""
Visualize the frequency distribution of words within a set of documents. This function
identifies unique tokens and counts how many times each appears.
Parameters
----------
tokenized_words : list
List of tokenized words
title : str
Optional, title of plot
font_size : int
Optional, unused. Left for compatibility.
graph_aargs : dict
Optional, other parameters passed to `plt.plot`.
Note certain specific keys are handled by the function directly rather than `plt.plot`,
analogously to the keyword arguments in nltk's `nltk.FreqDist.plot`, which
was previously called inside this function. These are:
- `'cumulative'`: computes the count cumulatively (in order of descending count)
- `'percents'`: shows the y-axis as a percent of all tokens instead of integer count
- `'show'`: whether to call show() the matplotlib.pyplot.Figure instance
ax : matplotlib.Axes
Optional, axis to plot on. Otherwise creates a new instance.
Returns
-------
tuple of (Matplotlib Figure instance, dict)
Notes
------
The returned dict is in the format {token: count} and includes every unique token in descending order of count
For more on nltk, see below.
Bird, Steven, Edward Loper and Ewan Klein (2009), Natural Language Processing with Python. O'Reilly Media Inc.
https://www.nltk.org/
"""
unique_tokens = list(set(tokenized_words))
unique_tokens.sort(key=(lambda token: tokenized_words.count(token)), reverse=True)
unique_tokens = unique_tokens[:num_tokens]
counts = [tokenized_words.count(token) for token in unique_tokens]
# trim number of tokens if number of unique ones is less than the requested number
num_tokens = min(num_tokens, len(unique_tokens))
if ax is None:
fig, ax = plt.subplots(figsize=(12, 6))
else:
fig = ax.get_figure()
# treat the nltk-inspired keywords
if 'cumulative' in graph_aargs:
counts = list(np.cumsum(counts))
ylabel = "Cumulative "
else:
ylabel = ""
if 'percents' in graph_aargs:
counts = [count / len(tokenized_words) * 100 for count in counts]
ylabel += "Percents"
else:
ylabel += "Counts"
if "show" in graph_aargs:
show = graph_aargs["show"]
else:
show = False
for used_keyword in ["show", "percents", "cumulative"]:
graph_aargs.pop(used_keyword, None)
# plot
ax.grid()
ax.plot(counts, **graph_aargs)
ax.set_xticks(range(num_tokens))
ax.set_xticklabels([token for token in unique_tokens], rotation=90)
ax.set_ylim(0, None)
ax.set_xlabel("Samples")
ax.set_ylabel(ylabel)
ax.set_title(title)
if show:
fig.show()
return fig, {token: count for token, count in zip(unique_tokens, counts)}
[docs]
def visualize_classification_confusion_matrix(om_df, col_dict, title='', ax=None):
"""Visualize confusion matrix comparing known categorical values, and predicted categorical values.
Parameters
----------
om_df : DataFrame
A pandas dataframe containing O&M data, which contains columns specified in om_col_dict
col_dict : dict of {str: str}
A dictionary that contains the column names needed:
- attribute_col : string, assigned to the true labels
- predicted_col : string, assigned to the predicted labels
title : str
Optional, title of plot
ax : matplotlib.Axes
Optional, axis to plot on. Otherwise creates a new instance.
Returns
-------
Matplotlib figure instance
"""
act_col = col_dict['attribute_col']
pred_col = col_dict['predicted_col']
# drop any predicted labels with no actual labels in the data, for a cleaner visual
no_real_values = [cat for cat in om_df[pred_col].unique() if cat not in om_df[act_col].unique()]
no_real_values_mask = om_df[pred_col].isin(no_real_values)
om_df = om_df[~no_real_values_mask]
caption_txt = f'NOTE: Predicted values\n{no_real_values}\nhad no actual values in the dataset.'
if ax is None:
fig, ax = plt.subplots(figsize=(12, 6))
else:
fig = ax.get_figure()
ConfusionMatrixDisplay.from_predictions(y_true=om_df[act_col],
y_pred=om_df[pred_col],
normalize='true',
ax=ax)
ax.set_xticks(ax.get_xticks(), ax.get_xticklabels(), rotation=90)
print(caption_txt)
ax.set_title(title)
fig.tight_layout()
return fig