# Source code for pecos.graphics

"""
The graphics module contains functions to generate scatter, time series, and 
heatmap plots for reports.
"""
import pandas as pd
import numpy as np
try:
    import matplotlib.pyplot as plt
    from matplotlib.dates import DateFormatter
except:
    pass
try:
    import plotly
except:
    pass
import textwrap
import os
import logging

# nose is treated as optional: if it can be imported, its ``nottest``
# decorator is bound to the private name ``_nottest``.
# NOTE(review): presumably this keeps test collectors from picking up
# functions in this module whose names contain "test" -- confirm.
try:
    from nose.tools import nottest as _nottest
except ImportError:
    # Fallback when nose is unavailable: a decorator that hands back the
    # function it was given, unchanged.
    def _nottest(afunction):
        return afunction
     
# The type of ``None``; lets isinstance() checks accept an omitted optional
# argument alongside concrete types.
NoneType = type(None)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

def plot_scatter(x,y,xaxis_min=None, xaxis_max=None, yaxis_min=None, 
                 yaxis_max=None, title=None, figsize=(7.0, 3.0)):
    """
    Create a scatter plot.  If x and y have the same number of columns, then 
    the columns of x are plotted against the corresponding columns of y, in 
    order.  If x (or y) has 1 column, then that column of data is plotted 
    against all the columns in y (or x).
    
    If the column counts differ and neither input has exactly one column, 
    nothing is plotted and a warning is logged.  If plotting raises an 
    exception, the text 'Insufficient Data' is drawn on the axes instead.
    
    Parameters
    ----------
    x : pandas DataFrame
        X data
    
    y : pandas DataFrame
        Y data
        
    xaxis_min : float, optional
        X-axis minimum, default = None (autoscale)
        
    xaxis_max : float, optional
        X-axis maximum, default = None (autoscale)
        
    yaxis_min : float, optional
        Y-axis minimum, default = None (autoscale)
        
    yaxis_max : float, optional
        Y-axis maximum, default = None (autoscale)
    
    title : string, optional
        Title, default = None
    
    figsize : tuple, optional
        Figure size, default = (7.0, 3.0)
    """
    plt.figure(figsize=figsize)
    ax = plt.gca()

    try:
        n_x = x.shape[1]
        n_y = y.shape[1]
        if n_x == n_y:
            # Pair the columns up by position
            for i in range(n_x):
                plt.plot(x.iloc[:,i], y.iloc[:,i], '.', markersize=3)
        elif n_x == 1:
            # Single x column against every y column
            for col in y.columns:
                plt.plot(x, y[col], '.', markersize=3)
        elif n_y == 1:
            # Every x column against the single y column
            for col in x.columns:
                plt.plot(x[col], y, '.', markersize=3)
        else:
            logger.warning("plot_scatter: x has %d columns and y has %d "
                           "columns; nothing was plotted", n_x, n_y)
        plt.xticks(rotation='vertical')
    except Exception:
        # Best effort: reports should still get a figure when the data 
        # cannot be plotted.  Only Exception is caught so that 
        # KeyboardInterrupt and SystemExit still propagate.
        logger.debug("plot_scatter could not plot the data", exc_info=True)
        plt.text(0.3,0.5,'Insufficient Data', fontsize=8)

    # Format axis: any limit left as None keeps the autoscaled value
    xmin_plt, xmax_plt = plt.xlim()
    ymin_plt, ymax_plt = plt.ylim()
    if xaxis_min is None:
        xaxis_min = xmin_plt
    if xaxis_max is None:
        xaxis_max = xmax_plt
    if yaxis_min is None:
        yaxis_min = ymin_plt
    if yaxis_max is None:
        yaxis_max = ymax_plt
    plt.xlim((xaxis_min, xaxis_max))
    plt.ylim((yaxis_min, yaxis_max))

    if title:
        plt.title(title)

    ax.tick_params(axis='both', labelsize=8)

    # Shrink the axes and move them up to leave room for the vertical 
    # x tick labels
    box = ax.get_position()
    ax.set_position([box.x0, box.y0+0.15, box.width, box.height*0.75])
def _shade_time_filter(ax, index, tfilter):
    """Shade each span of the plot over which the time filter is False/0."""
    included = np.asarray(tfilter).astype(bool)
    if included.size == 0:
        return

    # Positions where the filter value flips, bracketed by the first and 
    # last positions, delimit runs of constant filter value.
    flips = np.flatnonzero(included[1:] != included[:-1]) + 1
    edges = np.concatenate(([0], flips, [included.size - 1]))

    labelled = False
    for start, stop in zip(edges[:-1], edges[1:]):
        if included[start]:
            continue
        if labelled:
            ax.axvspan(index[start], index[stop], facecolor='k', alpha=0.2)
        else:
            # Only the first span carries a label so that the legend gets 
            # a single 'Time filter' entry.
            ax.axvspan(index[start], index[stop], facecolor='k', alpha=0.2, 
                       label='Time filter')
            labelled = True


def _mark_test_failures(ax, data, test_results_group):
    """Overlay '+' markers on the data points covered by failed tests."""
    # These flags are never marked on the plot
    skipped_flags = ('Duplicate timestamp', 'Missing data', 'Corrupt data', 
                     'Nonmonotonic timestamp')

    error_flags = test_results_group['Error Flag'].fillna('')
    grouped = test_results_group.groupby(error_flags)
    start_col = test_results_group.columns.get_loc("Start Time")
    end_col = test_results_group.columns.get_loc("End Time")

    # Iterate in order of first appearance (not groupby's sorted order) so 
    # that marker colors and legend order stay the same.
    for error_flag in error_flags.unique():
        if error_flag in skipped_flags:
            continue
        flag_rows = grouped.get_group(error_flag)

        # Legend text: the error flag followed by (at most four of) the 
        # test result index values it covers.
        ids = flag_rows.index.values
        if len(ids) > 4:
            id_text = str(ids[0:4]).strip('[]') + '...'
        else:
            id_text = str(ids).strip('[]')
        error_label = '\n'.join(textwrap.wrap(error_flag, 30)) + '\n' + \
                      '\n'.join(textwrap.wrap('Warning ' + id_text, 30))

        # Union of all [Start Time, End Time] windows for this flag
        flagged = np.zeros(len(data.index), dtype=bool)
        for row in range(len(flag_rows.index)):
            start = flag_rows.iloc[row, start_col]
            end = flag_rows.iloc[row, end_col]
            flagged |= np.asarray((data.index >= start) & (data.index <= end))

        if not flagged.any():
            continue

        flagged_data = data[flagged]
        try:
            ax.scatter(flagged_data.index, flagged_data.values, marker='+', 
                       linewidths=1, label=error_label)
        except Exception:
            # Fall back to marking only the first flagged point
            ax.scatter(flagged_data.index[0], flagged_data.values[0], 
                       marker='+', linewidths=1, label=error_label)


def plot_timeseries(data, tfilter=None, test_results_group=None, xaxis_min=None, 
                    xaxis_max=None, yaxis_min=None, yaxis_max=None, title=None,
                    figsize=(7.0, 3.0), date_formatter=None):
    """
    Create a time series plot using each column in the DataFrame.
    
    Time spans excluded by the time filter are shaded, and data points that 
    fall inside the time range of a failed quality control test are marked 
    with '+'.  If plotting raises an exception, the text 'Insufficient Data' 
    is drawn on the axes instead.
    
    Parameters
    ----------
    data : pandas DataFrame or Series
        Data, indexed by time
    
    tfilter : pandas Series, optional
        Boolean values used to include time filter in the plot, 
        default = None 
        
    test_results_group : pandas DataFrame, optional
        Test results for the data, with 'Error Flag', 'Start Time' and 
        'End Time' columns, default = None 
    
    xaxis_min : float, optional
        X-axis minimum, default = None (autoscale)
        
    xaxis_max : float, optional
        X-axis maximum, default = None (autoscale)
        
    yaxis_min : float, optional
        Y-axis minimum, default = None (autoscale)
        
    yaxis_max : float, optional
        Y-axis maximum, default = None (autoscale)
    
    title : string, optional
        Title, default = None
    
    figsize : tuple, optional
        Figure size, default = (7.0, 3.0)
        
    date_formatter : string, optional
        Date formatter used on the x axis, for example, "%m-%d".  
        Default = None
    """
    assert isinstance(data, (pd.Series, pd.DataFrame))
    assert isinstance(tfilter, (NoneType, pd.Series))

    plt.figure(figsize=figsize)
    ax = plt.gca()

    try:
        # Plot time series
        if isinstance(data, pd.Series):
            data.plot(ax=ax, linewidth=0.5, grid=False, legend=False, 
                      color='k', fontsize=8, rot=90, label='Data', 
                      x_compat=True)
        else:
            data.plot(ax=ax, linewidth=1, grid=False, legend=False, 
                      fontsize=8, rot=90, label='Data')

        # Add time filter
        if isinstance(tfilter, pd.Series):
            _shade_time_filter(ax, data.index, tfilter)

        # Add errors (None, or an empty table, means there is nothing to mark)
        if test_results_group is not None and \
                not getattr(test_results_group, 'empty', False):
            _mark_test_failures(ax, data, test_results_group)

        # Format axis
        xmin_plt, xmax_plt = plt.xlim()
        ymin_plt, ymax_plt = plt.ylim()
        if tfilter is not None:
            # Scale the y axis to the data that passes the time filter
            filtered_values = data[tfilter].values
            ymin_plt = np.nanmin(filtered_values)
            ymax_plt = np.nanmax(filtered_values)
            # Keep the autoscaled limits when the filtered range is 
            # (nearly) flat, or not finite (all-NaN data); NaN limits 
            # would make plt.ylim raise below.
            if not (np.isfinite(ymin_plt) and np.isfinite(ymax_plt)) or \
                    np.abs(ymin_plt - ymax_plt) < 0.01:
                ymin_plt, ymax_plt = plt.ylim()
    except Exception:
        # Best effort: reports should still get a figure when the data 
        # cannot be plotted.  Only Exception is caught so that 
        # KeyboardInterrupt and SystemExit still propagate.
        logger.debug("plot_timeseries could not plot the data", exc_info=True)
        plt.text(0.3,0.5,'Insufficient Data', fontsize=8)
        xmin_plt, xmax_plt = plt.xlim()
        ymin_plt, ymax_plt = plt.ylim()

    # Format axis: limits left as None use the plotted range, with 10% 
    # padding added above and below on the y axis
    y_range = (ymax_plt - ymin_plt)
    if xaxis_min is None:
        xaxis_min = xmin_plt
    if xaxis_max is None:
        xaxis_max = xmax_plt
    if yaxis_min is None:
        yaxis_min = ymin_plt-y_range/10
    if yaxis_max is None:
        yaxis_max = ymax_plt+y_range/10
    plt.xlim((xaxis_min, xaxis_max))
    plt.ylim((yaxis_min, yaxis_max))

    if title:
        plt.title(title)

    ax.get_yaxis().get_major_formatter().set_useOffset(False)
    ax.tick_params(axis='both', labelsize=8)
    plt.xlabel('Time', fontsize=8)

    # Shrink the axes and move them up to leave room for the rotated 
    # x tick labels
    box = ax.get_position()
    ax.set_position([box.x0, box.y0+0.15, box.width, box.height*0.75])

    if date_formatter is not None:
        date_form = DateFormatter(date_formatter)
        ax.xaxis.set_major_formatter(date_form)
def plot_interactive_timeseries(data, xaxis_min=None, xaxis_max=None, yaxis_min=None, 
                                yaxis_max=None, title=None, filename=None, 
                                auto_open=True):
    """
    Create a basic interactive time series graphic using plotly.
    Many more options are available, see https://plot.ly for more details.
    
    Parameters
    ----------
    data : pandas DataFrame or Series
        Data, indexed by time.  A Series is plotted as a single trace.
    
    xaxis_min : float, optional
        X-axis minimum, default = None (autoscale)
        
    xaxis_max : float, optional
        X-axis maximum, default = None (autoscale)
        
    yaxis_min : float, optional
        Y-axis minimum, default = None (autoscale)
        
    yaxis_max : float, optional
        Y-axis maximum, default = None (autoscale)
    
    title : string, optional
        Title, default = None
    
    filename : string, optional
        HTML file name, default = None (file will be named temp-plot.html)
    
    auto_open : boolean, optional
        Flag indicating if HTML graphic is opened, default = True
    """
    if isinstance(data, pd.Series):
        data = data.to_frame()

    layout = dict(title=title,
                  hovermode='closest',
                  xaxis=dict(range=[xaxis_min, xaxis_max]),
                  yaxis=dict(range=[yaxis_min, yaxis_max]))

    # Strip timezone information once, up front.  An index that carries 
    # no timezone is passed through unchanged.
    x_values = data.index
    if getattr(x_values, 'tz', None) is not None:
        x_values = x_values.tz_localize(None)

    # One trace per column
    plotly_data = [plotly.graph_objs.Scatter(x=x_values, y=data.loc[:,col], 
                                             name=col)
                   for col in data.columns]

    fig = dict(data=plotly_data, layout=layout)

    # Without a filename, plotly chooses its own default file name
    if filename:
        plotly.offline.plot(fig, filename=filename, auto_open=auto_open)
    else:
        plotly.offline.plot(fig, auto_open=auto_open)
def plot_heatmap(data, colors=None, nColors=12, cmap=None, vmin=None, vmax=None, 
                 show_axis=False, title=None, figsize=(5.0, 5.0)):
    """
    Create a heatmap.  Default color scheme is red to yellow to green with 12 
    colors.  This function can be used to generate dashboards with simple 
    color indicators in each cell (to remove borders use bbox_inches='tight' 
    and pad_inches=0 when saving the image).
    
    Parameters
    -----------
    data : pandas DataFrame, pandas Series, numpy array, or array-like
        Data.  One-dimensional data is drawn as a single row.
    
    colors : list or None, optional
        List of colors, colors can be specified in any way understandable by 
        matplotlib.colors.ColorConverter.to_rgb().
        If None, colors transitions from red to yellow to green.
    
    nColors : int, optional
        Number of colors in the colormap, default = 12
    
    cmap : string, optional
        Colormap, default = None. Overrides colors and nColors listed above.
    
    vmin : float, optional
        Colormap minimum, default = None (autoscale)
    
    vmax : float, optional
        Colormap maximum, default = None (autoscale)
    
    show_axis : boolean, optional
        Flag indicating if the axes are drawn, default = False
    
    title : string, optional
        Title, default = None
    
    figsize : tuple, optional
        Figure size, default = (5.0, 5.0)
    """
    if colors is None:
        # Red, yellow, green
        colors = [(0.75, 0.15, 0.15), (1, 0.75, 0.15), (0.15, 0.75, 0.15)]

    # Work on a plain array, whatever container the data came in
    data = np.asarray(data)
    if data.ndim == 1:
        data = np.expand_dims(data, axis=0)

    if not cmap:
        from matplotlib.colors import LinearSegmentedColormap
        cmap = LinearSegmentedColormap.from_list(name='custom', 
                                                 colors=colors, N=nColors)

    plt.figure(figsize=figsize)
    image = plt.imshow(data, cmap=cmap, aspect='equal', vmin=vmin, vmax=vmax)

    if not show_axis:
        plt.axis('off')
        image.axes.get_xaxis().set_visible(False)
        image.axes.get_yaxis().set_visible(False)

    if title:
        plt.title(title)

    plt.tight_layout()
def plot_doy_heatmap(data, cmap='nipy_spectral', vmin=None, vmax=None, 
                     overlay=None, title=None, figsize=(7.0, 3.0)):
    """
    Create a day-of-year (X-axis) vs. time-of-day (Y-axis) heatmap.
    
    The input data is not modified.
    
    Parameters
    ----------
    data : pandas DataFrame or pandas Series
        Data (single column), indexed by time.  If a DataFrame has more than 
        one column, only the first column is plotted.
    
    cmap : string, optional
        Colormap, default = nipy_spectral
    
    vmin : float, optional
        Colormap minimum, default = None (autoscale)
    
    vmax : float, optional
        Colormap maximum, default = None (autoscale)
    
    overlay : pandas DataFrame, optional
        Data to overlay on the heatmap.  
        Time index should be in day-of-year (X-axis) 
        Values should be in time-of-day in minutes (Y-axis)
    
    title : string, optional
        Title, default = None
    
    figsize : tuple, optional
        Figure size, default = (7.0, 3.0)
    """
    if isinstance(data, pd.Series):
        data = data.to_frame()

    # Build a separate table for the pivot rather than adding helper 
    # columns to the caller's DataFrame.
    timestamps = data.index
    minute_of_day = timestamps.hour*60 + \
                    timestamps.minute + \
                    timestamps.second/60 + \
                    timestamps.microsecond/(60*1000000.0)
    table = pd.DataFrame({'X': np.asarray(timestamps.dayofyear),
                          'Y': np.asarray(minute_of_day),
                          'value': data.iloc[:, 0].to_numpy()})

    # Convert data to a pivot table (rows: time of day, columns: day of year)
    piv = pd.pivot_table(table, values='value', index=['Y'], columns=['X'], 
                         fill_value=np.nan)

    # Create the heatmap
    fig, ax = plt.subplots(figsize=figsize)
    # NOTE(review): the y extent shrinks the min/max by 0.5 while the 
    # x extent grows it by 0.5; kept as in the original -- confirm intent.
    im = ax.imshow(piv, cmap=cmap, aspect='auto', vmin=vmin, vmax=vmax,
                   extent=[table['X'].min()-0.5, table['X'].max()+0.5,
                           table['Y'].max()-0.5, table['Y'].min()+0.5])
    fig.colorbar(im, ax=ax)

    # Add overlay
    if isinstance(overlay, pd.DataFrame):
        overlay.plot(ax=ax)

    # Add title and labels
    if title:
        ax.set_title(title)
    ax.set_xlabel("Day of the year")
    ax.set_ylabel("Time of day (minutes)")
    plt.tight_layout()
@_nottest
def plot_test_results(data, test_results, tfilter=None, image_format='png', 
                      dpi=500, figsize=(7.0,3.0), date_formatter=None, 
                      filename_root='test_results'):
    """
    Create test results graphics which highlight data points that 
    failed a quality control test.  One graphic is saved for each variable 
    that has test results.
    
    Note that a non-empty test_results DataFrame is sorted and re-indexed 
    (1, 2, 3, ...) in place; these index values appear in the legend of each 
    graphic.
    
    Parameters
    ----------
    data : pandas DataFrame
        Data, indexed by time (pm.data)
    
    test_results : pandas DataFrame
        Summary of the quality control test results (pm.test_results)
    
    tfilter : pandas Series, optional
        Boolean values used to include time filter in the plot, 
        default = None 
    
    image_format : string, optional
        Image format, default = 'png'
    
    dpi : int, optional
        DPI resolution, default = 500
    
    figsize : tuple, optional
        Figure size, default = (7.0,3.0)
    
    date_formatter : string, optional
        Date formatter used on the x axis, for example, "%m-%d".  
        Default = None
    
    filename_root : string, optional
        File name root. If the full path is not provided, files are saved 
        into the current working directory.  Each graphic filename is 
        appended with an integer.  For example, filename_root = 'test' will 
        generate files named 'test0.png', 'test1.png', etc.  
        By default, the filename root is 'test_results'
    
    Returns
    ----------
    A list of file names
    """
    if os.path.dirname(filename_root) == '':
        full_filename_root = os.path.join(os.getcwd(), filename_root)
    else:
        full_filename_root = os.path.abspath(filename_root)

    # Collect file names
    test_results_graphics = []

    if test_results.empty:
        return test_results_graphics

    # Sort and renumber in place (the caller's DataFrame is modified); 
    # kept from the original because callers may rely on the renumbered index
    test_results.sort_values(list(test_results.columns), inplace=True)
    test_results.index = np.arange(1, test_results.shape[0]+1)

    # Remove specific error flags
    remove_error_flags = ['Duplicate timestamp', 
                          'Missing data', 
                          'Corrupt data', 
                          'Missing timestamp', 
                          'Nonmonotonic timestamp']
    # '~' is the boolean negation; unary '-' is not supported on boolean data
    keep = ~test_results['Error Flag'].isin(remove_error_flags)
    test_results = test_results[keep]

    # Group by the column label itself (not a one-item list) so that each 
    # group key is the variable name rather than a 1-tuple.
    grouped = test_results.groupby('Variable Name')

    for graphic, (col_name, test_results_group) in enumerate(grouped):
        logger.info("Creating graphic for %s", col_name)
        plot_timeseries(data[col_name], tfilter, 
                        test_results_group=test_results_group, 
                        figsize=figsize, date_formatter=date_formatter)

        # Narrow the axes to make room for the legend on the right
        ax = plt.gca()
        box = ax.get_position()
        ax.set_position([box.x0, box.y0, box.width*0.65, box.height])
        plt.legend(loc='center left', bbox_to_anchor=(1, 0.5), fontsize=8) 
        plt.title(col_name, fontsize=8)

        filename = full_filename_root + str(graphic) + '.' + image_format
        test_results_graphics.append(filename)
        plt.savefig(filename, format=image_format, dpi=dpi)
        plt.close()

    return test_results_graphics