Source code for pecos.io

"""
The io module contains functions to read/send data and write results to 
files/html reports.
"""
import pandas as pd
import numpy as np
import logging
import os
from os.path import abspath, dirname, join
import pecos.graphics
import datetime
from jinja2 import Environment, PackageLoader
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
import base64

try:
    from sqlalchemy import create_engine
    import minimalmodbus 
except:
    pass

try:
    from nose.tools import nottest as _nottest
except ImportError:
    def _nottest(afunction):
        return afunction
        
logger = logging.getLogger(__name__)

env = Environment(loader=PackageLoader('pecos', 'templates'))

[docs]def read_campbell_scientific(filename, index_col='TIMESTAMP', encoding=None): """ Read Campbell Scientific CSV file. Parameters ---------- filename : string File name index_col : string, optional Index column name, default = 'TIMESTAMP' encoding : string, optional Character encoding (i.e. utf-16) Returns --------- pandas DataFrame Data """ logger.info("Reading Campbell Scientific CSV file " + filename) try: df = pd.read_csv(filename, skiprows=1, encoding=encoding, index_col=index_col, parse_dates=True, dtype ='unicode', error_bad_lines=False) #, low_memory=False) df = df[2:] index = pd.to_datetime(df.index) Unnamed = df.filter(regex='Unnamed') df = df.drop(Unnamed.columns, 1) df = pd.DataFrame(data = df.values, index = index, columns = df.columns, dtype='float64') except: logger.warning("Cannot extract database, CSV file reader failed " + filename) df = pd.DataFrame() return # Drop rows with NaT (not a time) in the index try: df.drop(pd.NaT, inplace=True) except: pass return df
[docs]def send_email(subject, body, recipient, sender, attachment=None, host='localhost', username=None, password=None): """ Send email using Python smtplib and email packages. Parameters ---------- subject : string Subject text body : string Email body, in HTML or plain format recipient : list of string Recipient email address or addresses sender : string Sender email address attachment : string, optional Name of file to attach host : string, optional Name of email host (or host:port), default = 'localhost' username : string, optional Email username for authentication password : string, optional Email password for authentication """ logger.info("Sending email") msg = _create_email_message(subject, body, recipient, sender) if attachment is not None: fp = open(attachment, "rb") # Read as a binary file, even if it's text att = MIMEApplication(fp.read()) att.add_header('Content-Disposition', 'attachment', filename=os.path.basename(attachment)) fp.close() msg.attach(att) s = smtplib.SMTP(host) try: # Authentication s.ehlo() s.starttls() s.login(username, password) except: pass s.sendmail(sender, recipient, msg.as_string()) s.quit()
def _create_email_message(subject, body, recipient, sender): msg = MIMEMultipart() msg['Subject'] = subject msg['To'] = ', '.join(recipient) msg['From'] = sender if "</html>" in body.lower(): content = MIMEText(body, 'html') else: content = MIMEText(body, 'plain') msg.attach(content) return msg
[docs]def write_metrics(metrics, filename='metrics.csv'): """ Write metrics file. Parameters ----------- metrics : pandas DataFrame Data to add to the metrics file filename : string, optional File name. If the full path is not provided, the file is saved into the current working directory. By default, the file is named 'metrics.csv' Returns ------------ string filename """ logger.info("Write metrics file") try: previous_metrics = pd.read_csv(filename, index_col='TIMESTEP') #, parse_dates=True) except: previous_metrics = pd.DataFrame() metrics.index = metrics.index.to_native_types() # this is necessary when using time zones metrics = metrics.combine_first(previous_metrics) if os.path.dirname(filename) == '': full_filename = os.path.join(os.getcwd(), filename) else: full_filename = filename fout = open(full_filename, 'w') metrics.to_csv(fout, index_label='TIMESTEP', na_rep = 'NaN') fout.close() return full_filename
[docs]@_nottest def write_test_results(test_results, filename='test_results.csv'): """ Write test results file. Parameters ----------- test_results : pandas DataFrame Summary of the quality control test results (pm.test_results) filename : string, optional File name. If the full path is not provided, the file is saved into the current working directory. By default, the file is named 'test_results.csv' Returns ------------ string filename """ test_results.sort_values(list(test_results.columns), inplace=True) test_results.index = np.arange(1, test_results.shape[0]+1) logger.info("Writing test results csv file " + filename) if os.path.dirname(filename) == '': full_filename = os.path.join(os.getcwd(), filename) else: full_filename = filename fout = open(full_filename, 'w') test_results.to_csv(fout, na_rep = 'NaN') fout.close() return full_filename
[docs]def write_monitoring_report(data, test_results, test_results_graphics=None, custom_graphics=None, metrics=None, title='Pecos Monitoring Report', config=None, logo=False, im_width_test_results=1, im_width_custom=1, im_width_logo=0.1, encode=False, file_format='html', filename='monitoring_report.html'): """ Generate a monitoring report. The monitoring report is used to report quality control test results for a single system. The report includes custom graphics, performance metrics, and test results. Parameters ---------- data : pandas DataFrame Data, indexed by time (pm.data) test_results : pandas DataFrame Summary of the quality control test results (pm.test_results) test_results_graphics : list of strings or None, optional Graphics files, with full path. These graphics highlight data points that failed a quality control test, created using pecos.graphics.plot_test_results(). If None, test results graphics are not included in the report. custom_graphics : list of strings or None, optional Custom files, with full path. Created by the user. If None, custom graphics are not included in the report. metrics : pandas Series or DataFrame, optional Performance metrics to add as a table to the monitoring report title : string, optional Monitoring report title, default = 'Pecos Monitoring Report' config : dictionary or None, optional Configuration options, to be printed at the end of the report. If None, configuration options are not included in the report. logo : string, optional Graphic to be added to the report header im_width_test_results : float, optional Image width as a fraction of page size, for test results graphics, default = 1 im_width_custom : float, optional Image width as a fraction of page size, for custom graphics, default = 1 im_width_logo: float, optional Image width as a fraction of page size, for the logo, default = 0.1 encode : boolean, optional Encode graphics in the html, default = False filename : string, optional File name. If the full path is not provided, the file is saved into the current working directory. By default, the file is named 'monitoring_report.html' Returns ------------ string filename """ logger.info("Writing HTML report") if test_results_graphics is None: test_results_graphics = [] if custom_graphics is None: custom_graphics = [] if config is None: config = {} if data.empty: logger.warning("Empty database") start_time = 'NaN' end_time = 'NaN' else: start_time = data.index[0] end_time = data.index[-1] # Set pandas display option pd.set_option('display.max_colwidth', None) pd.set_option('display.width', 40) # Collect notes (from the logger file) try: logfiledir = os.path.join(dirname(abspath(__file__))) f = open(join(logfiledir,'logfile'), 'r') notes = f.read() f.close() notes_df = pd.DataFrame(notes.splitlines()) notes_df.index += 1 except: notes_df = pd.DataFrame() test_results.sort_values(list(test_results.columns), inplace=True) test_results.index = np.arange(1, test_results.shape[0]+1) # Convert to html format if metrics is None: metrics = pd.DataFrame() content = {'start_time': str(start_time), 'end_time': str(end_time), 'num_notes': str(notes_df.shape[0]), 'num_data_columns': str(data.shape[1]), 'num_test_results': str(test_results.shape[0]), 'num_metrics': str(metrics.shape[0]), 'config': config} title = os.path.basename(title) if file_format == 'html': content['test_results_graphics'] = test_results_graphics content['custom_graphics'] = custom_graphics if isinstance(metrics, pd.Series): metrics_html = metrics.to_frame().to_html(header=False) if isinstance(metrics, pd.DataFrame): metrics_html = metrics.to_html(justify='left') content['metrics'] = metrics_html content['test_results'] = test_results.to_html(justify='left') content['notes'] = notes_df.to_html(justify='left', header=False) im_width_test_results = im_width_test_results*800 im_width_custom = im_width_custom*800 im_width_logo = im_width_logo*800 file_string = _html_template_monitoring_report(content, title, logo, im_width_test_results, im_width_custom, im_width_logo, encode) else: test_results_graphics = [g.replace('\\', '/') for g in test_results_graphics] custom_graphics = [g.replace('\\', '/') for g in custom_graphics] content['test_results_graphics'] = test_results_graphics content['custom_graphics'] = custom_graphics content['metrics'] = metrics.to_latex(longtable=True) content['test_results'] = test_results.to_latex(longtable=True) content['notes'] = notes_df.to_latex(longtable=True) file_string = _latex_template_monitoring_report(content, title, logo, im_width_test_results, im_width_custom, im_width_logo) # Write file if os.path.dirname(filename) == '': full_filename = os.path.join(os.getcwd(), filename) else: full_filename = filename fid = open(full_filename,"w") fid.write(file_string) fid.close() logger.info("") return full_filename
[docs]def write_dashboard(column_names, row_names, content, title='Pecos Dashboard', footnote='', logo=False, im_width=250, datatables=False, encode=False, filename='dashboard.html'): """ Generate a dashboard. The dashboard is used to compare results across multiple systems. Each cell in the dashboard includes custom system graphics and metrics. Parameters ---------- column_names : list of strings Column names listed in the order they should appear in the dashboard, i.e. ['location1', 'location2'] row_names : list of strings Row names listed in the order they should appear in the dashboard, i.e. ['system1', 'system2'] content : dictionary Dashboard content for each cell. Dictionary keys are tuples indicating the row name and column name, i.e. ('row name', 'column name'), where 'row name' is in the list row_names and 'column name' is in the list column_names. For each key, another dictionary is defined that contains the content to be included in each cell of the dashboard. Each cell can contain text, graphics, a table, and an html link. These are defined using the following keys: - text (string) = text at the top of each cell - graphics (list of strings) = a list of graphics file names. Each file name includes the full path - table (string) = a table in html format, for example a table of performance metrics. DataFrames can be converted to an html string using df.to_html() or df.transpose().to_html(). Values in the table can be color coded using pandas Styler class. - link (dict) = a dictionary where keys define the name of the link and values define the html link (with full path) For example:: content = {('row name', 'column name'): { 'text': 'text at the top', 'graphic': ['C:\\\\pecos\\\\results\\\\custom_graphic.png'], 'table': df.to_html(), 'link': {'Link to monitoring report': 'C:\\\\pecos\\\\results\\\\monitoring_report.html'}} title : string, optional Dashboard title, default = 'Pecos Dashboard' footnote : string, optional Text to be added to the end of the report logo : string, optional Graphic to be added to the report header im_width : float, optional Image width in the HTML report, default = 250 datatables : boolean, optional Use datatables.net to format the dashboard, default = False. See https://datatables.net/ for more information. encode : boolean, optional Encode graphics in the html, default = False filename : string, optional File name. If the full path is not provided, the file is saved into the current working directory. By default, the file is named 'dashboard.html' Returns ------------ string filename """ logger.info("Writing dashboard") # Set pandas display option pd.set_option('display.max_colwidth', -1) pd.set_option('display.width', 40) html_string = _html_template_dashboard(column_names, row_names, content, title, footnote, logo, im_width, datatables, encode) # Write html file if os.path.dirname(filename) == '': full_filename = os.path.join(os.getcwd(), filename) else: full_filename = filename html_file = open(full_filename,"w") html_file.write(html_string) html_file.close() logger.info("") return full_filename
def _latex_template_monitoring_report(content, title, logo, im_width_test_results, im_width_custom, im_width_logo): template = env.get_template('monitoring_report.tex') date = datetime.datetime.now() datestr = date.strftime('%m/%d/%Y') version = pecos.__version__ return template.render(**locals()) def _html_template_monitoring_report(content, title, logo, im_width_test_results, im_width_custom, im_width_logo, encode): # if encode == True, encode the images img_dic = {} if encode: for im in content['custom_graphics']: img_encode = base64.b64encode(open(im, "rb").read()).decode("utf-8") img_dic[im] = img_encode for im in content['test_results_graphics']: img_encode = base64.b64encode(open(im, "rb").read()).decode("utf-8") img_dic[im] = img_encode template = env.get_template('monitoring_report.html') date = datetime.datetime.now() datestr = date.strftime('%m/%d/%Y') version = pecos.__version__ return template.render(**locals()) def _html_template_dashboard(column_names, row_names, content, title, footnote, logo, im_width, datatables, encode): # if encode == True, encode the images img_dic = {} if encode: for column in column_names: for row in row_names: try: # not all row/columns have graphics for im in content[row, column]['graphics']: img_encode = base64.b64encode(open(im, "rb").read()).decode("utf-8") img_dic[im] = img_encode except: pass template = env.get_template('dashboard.html') date = datetime.datetime.now() datestr = date.strftime('%m/%d/%Y') version = pecos.__version__ return template.render(**locals())
[docs]def device_to_client(config): """ Read channels on modbus device, scale and calibrate the values, and store the data in a MySQL database. The inputs are provided by a configuration dictionary that describe general information for data acquisition and the devices. Parameters ---------- config : dictionary Configuration options, see :ref:`devicetoclient_config` """ # Extract Database Information sec0 = float(datetime.datetime.now().strftime('%s')) while True: logging.info('Device to client: '+str(datetime.datetime.now())) sec1 = float(datetime.datetime.now().strftime('%s')) if sec1 - sec0 >= config['Client']['Interval']: run = True sec0 = sec1 else: run = False if run: dt = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") labels,dall = [],[] retry = config['Client']['Retries'] for device in config['Devices']: # Read channels on modbus device instr = minimalmodbus.Instrument(device['USB'],device['Address']) instr.serial.baudrate = device['Baud'] instr.serial.bytesize = device['Bytes'] instr.serial.stopbits = device['Stopbits'] instr.serial.parity = device['Parity'] ds,ls = [],[] for data in device['Data']: i = 0 while i < retry: l = data['Name'] try: d = instr.read_register(data['Channel'], numberOfDecimals=data['Scale'], functioncode=data['Fcode'], signed=data['Signed']) * data['Conversion'] break except: if i == retry-1: d = np.nan else: pass i += 1 ds.append(d) ls.append(l) dall.extend(ds) labels.extend(ls) # Add datetime to collected channel values and labels dall.extend([dt]) labels.extend(['datetime']) logging.info(ds) # Convert collected data into pandas DataFrame format df = pd.DataFrame(dall).T df.columns = labels df = df.where((pd.notnull(df)),None) # Insert data into database try: # Connect to database engine = create_engine('mysql://'+config['Client']['Username']+ \ ':'+config['Client']['Password']+'@'+ \ config['Client']['IP']+'/'+ \ config['Client']['Database']) # Write DataFrame to database df.to_sql(name=config['Client']['Table'],con=engine, if_exists='append', index=False) #,dtype = data_type) except: pass