Source code for corrinout

# -*- coding: utf-8 -*-
# manipulating and working with CSV files
import csv
# manipulating and working with excel files
from openpyxl import Workbook
# loading excel files
from openpyxl import load_workbook
from openpyxl.styles import numbers
# temp library
from tempfile import NamedTemporaryFile
# corrosion db library
import corrdb
# corrosion data manipulation
import corrdata
# regex library
import re
class CorrInOut:
    """
    Class to import and export to/from excel/csv and from/to a db.
    """

    # constructor
    def __init__(self):
        """
        Instantiates the CorrInOut class
        """
        self.errormsg=""

    # get last error message
    def error(self) -> str:
        """
        Get last error from library.

        Args:
            None needed

        Returns:
            str: The error message, if any
        """
        return self.errormsg
    # import csv-file
    def import_csv_file(self,fileobject) -> dict:
        """
        Import CSV-file and return as a dict-instance

        Args:
            fileobject (NamedTemporaryFile): The file to import

        Returns:
            dict: The csv-file as a dict-structured object or None upon failure.
            Read failure message from the error()-method if applicable.
        """
        method=__name__+"."+"import_csv_file"
        # go through temp-file and parse as a CSV-file
        return self.__process_csv_file(fileobject)
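
    # Illustrative usage sketch (not part of the module). It assumes a
    # semicolon-delimited CSV whose column order matches corrdb.COLORDER,
    # written to a temporary file as import_csv_file() expects:
    #
    #   from tempfile import NamedTemporaryFile
    #   cio = CorrInOut()
    #   tmp = NamedTemporaryFile(mode="w+t", suffix=".csv", encoding="utf-8")
    #   tmp.write("header1;header2\nvalue1;value2\n")   # hypothetical content
    #   tmp.seek(0)
    #   rows = cio.import_csv_file(tmp)
    #   if rows is None:
    #       print(cio.error())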
    # private method that processes the csv-file
    def __process_csv_file(self,fileobject) -> dict:
        """
        Processes a csv-file and puts data into a dict structure

        Args:
            fileobject (NamedTemporaryFile): CSV-file to process.

        Returns:
            dict: The csv-file as a dict-structured object or None upon failure.
            Read failure message from the error()-method if applicable.
        """
        method=__name__+"."+"__process_csv_file"
        # create some variables needed
        csvreader=""
        data={}
        try:
            # attempt to read csv-data from temp-file
            csvreader = csv.reader(fileobject, delimiter=';', quotechar='\\')
        except Exception as ex:
            err=str(ex)
            self.errormsg=method+": Unable to read CSV-file: "+err
            return None
        else:
            # go through each row
            rpos=-1
            for row in csvreader:
                rpos=rpos+1
                if rpos == 0:
                    continue  # skip first row with headers
                for pos in range(0,len(row)):
                    # get the fieldname of the current field
                    fieldname=corrdb.COLORDER[pos] if pos < len(corrdb.COLORDER) else ""
                    # only add values that are allowable
                    if pos < len(corrdb.COLORDER):
                        if rpos not in data:
                            data[rpos]={}
                            data[rpos]["replace"]=False
                            data[rpos]["delete"]=False
                            data[rpos]["exists"]=False
                            data[rpos]["invalid"]=False
                            data[rpos]["skip"]=False
                        if "cols" not in data[rpos]:
                            data[rpos]["cols"]={}
                        data[rpos]["cols"][fieldname]={}
                        data[rpos]["cols"][fieldname]["name"]=fieldname
                        data[rpos]["cols"][fieldname]["value"]=str(row[pos])
            # return data dict
            return data
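
    # The dict returned above is keyed on 1-based row number. Each entry carries
    # the row flags set by this method plus a "cols" dict keyed on the fieldnames
    # from corrdb.COLORDER, roughly like this (fieldname and value are made up):
    #
    #   {1: {"replace": False, "delete": False, "exists": False,
    #        "invalid": False, "skip": False,
    #        "cols": {"material": {"name": "material", "value": "carbon steel"}}}}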
    # import an excel file
    def import_excel_file(self,fileobject) -> dict:
        """
        Import Excel-file and return as a dict-instance

        Args:
            fileobject (NamedTemporaryFile): The file to import

        Returns:
            dict: The Excel-file as a dict-structured object or None upon failure.
            Read failure message from the error()-method if applicable.
        """
        method=__name__+"."+"import_excel_file"
        self.errormsg=""
        # attempt to load temporary file into workbook
        try:
            # load workbook
            wb = load_workbook(filename = fileobject.name)
        except Exception as ex:
            err=str(ex)
            # we had an error - return None
            self.errormsg=method+": Unable to load excel file: "+err
            return None
        else:
            # process the wb
            return self.__process_excel_file(wb)
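
    # Illustrative usage sketch (not part of the module). load_workbook() is given
    # fileobject.name, so any file object whose "name" attribute points at an
    # .xlsx file on disk will do; the path below is made up:
    #
    #   cio = CorrInOut()
    #   with open("/tmp/corrosion_data.xlsx", "rb") as fh:
    #       rows = cio.import_excel_file(fh)
    #   if rows is None:
    #       print(cio.error())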
    # private method that processes the excel-file
    def __process_excel_file(self,wb) -> dict:
        """
        Processes an Excel workbook and puts data into a dict structure

        Args:
            wb (openpyxl.Workbook): Excel-file to process.

        Returns:
            dict: The excel-file as a dict-structured object or None upon failure.
            Read failure message from the error()-method if applicable.
        """
        method=__name__+"."+"__process_excel_file"
        data={}
        # get all worksheets in workbook
        for ws in wb:
            rpos=-1
            # go through each row of worksheet
            for row in ws.iter_rows(values_only=True):
                rpos=rpos+1
                if rpos == 0:
                    continue  # skip first row with headers
                for pos in range(0,len(row)):
                    # get the fieldname of the current field
                    fieldname=corrdb.COLORDER[pos] if pos < len(corrdb.COLORDER) else ""
                    # only add values that are allowable
                    if pos < len(corrdb.COLORDER):
                        if rpos not in data:
                            data[rpos]={}
                            data[rpos]["replace"]=False
                            data[rpos]["delete"]=False
                            data[rpos]["exists"]=False
                            data[rpos]["invalid"]=False
                            data[rpos]["skip"]=False
                        if "cols" not in data[rpos]:
                            data[rpos]["cols"]={}
                        data[rpos]["cols"][fieldname]={}
                        data[rpos]["cols"][fieldname]["name"]=fieldname
                        value=row[pos]
                        # ensure we have sensible data in our data structure
                        if value is None:
                            value=""
                        elif type(value) not in (str,int,float):
                            value=str(value)
                        data[rpos]["cols"][fieldname]["value"]=value
        # return data dict
        return data
    def export2csv(self,data,delimiter=";",include=False) -> NamedTemporaryFile:
        """
        Export dict data structure to a CSV-file.

        Args:
            data (dict): The dict data structure to export.
            delimiter (str): Sets the delimiter to use when writing data structure
                to file. One character only. Defaults to ";" or semicolon.
            include (bool): Specify if we are to include and export fields that are
                not public. These fields are defined in the CorrDB-class library.

        Returns:
            NamedTemporaryFile: The NamedTemporaryFile-instance containing the CSV-data.
        """
        method=__name__+"."+"export2csv"
        # clean delimiter
        delimiter=str(delimiter[0])
        # instantiate corrdata
        cdata=corrdata.CorrData()
        # create a temporary file to write the data to
        tmp = NamedTemporaryFile(mode="w+t",suffix=".csv",newline="\n",encoding="utf-8")
        tmp.seek(0)
        # create the csvwriter instance
        csvwriter = csv.writer(tmp,delimiter=delimiter,quotechar="|",quoting=csv.QUOTE_MINIMAL)
        # first add the headers
        colordernames=[]
        for colname in corrdb.COLORDER:
            # do not include restricted fields unless the include option says otherwise
            if corrdb.COLUMNS[colname]["include"] == False and not include:
                continue
            colordernames.append(corrdb.COLUMNS[colname]["name"])
        # write the header
        csvwriter.writerow(colordernames)
        # then go through and add the data
        for rowno in range(1,cdata.getRowCount(data)+1):
            row=[]
            for field in corrdb.COLORDER:
                # skip this field if it is not to be included in the output
                if corrdb.COLUMNS[field]["include"] == False and not include:
                    continue
                if field not in data[rowno]["cols"]:
                    row.append("")
                    continue
                # add column value to row list as text; the temp-file is already
                # opened with utf-8 encoding, so no manual encoding is needed
                row.append(str(data[rowno]["cols"][field]["value"]))
            # add the data to the file
            csvwriter.writerow(row)
        # reset to start of file to prepare for being streamed out to user
        tmp.seek(0)
        # return the finished file
        return tmp
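
    # Illustrative usage sketch (not part of the module); "rows" is assumed to be
    # a dict produced by import_csv_file() or import_excel_file():
    #
    #   cio = CorrInOut()
    #   tmp = cio.export2csv(rows, delimiter=";", include=False)
    #   print(tmp.read())   # CSV text, header row first
    #   tmp.close()         # the temporary file is removed on close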
    # export data to excel format; including fields that are tagged as not to be
    # included is by default false
    def export2excel(self,data,include=False) -> NamedTemporaryFile:
        """
        Export dict data structure to an Excel-file.

        Args:
            data (dict): The dict data structure to export.
            include (bool): Specify if we are to include and export fields that are
                not public. These fields are defined in the CorrDB-class library.

        Returns:
            NamedTemporaryFile: The NamedTemporaryFile-instance containing the Excel-data.
        """
        method=__name__+"."+"export2excel"
        # create a temporary file to write the data to
        tmp = NamedTemporaryFile(suffix=".xlsx")
        tmp.seek(0)
        # instantiate corrdata
        cdata=corrdata.CorrData()
        # create a workbook
        wb = Workbook()
        # create a worksheet
        ws = wb.create_sheet(title="Corrosion", index=0)
        # add to the worksheet
        # insert the new rows needed + headers
        ws.insert_rows(1,cdata.getRowCount(data)+1)
        # add the needed columns as well
        # either all or just a subset
        if include:
            ws.insert_cols(1,len(corrdb.COLORDER))
        else:
            ws.insert_cols(1,len(corrdb.COLORDERINCL))
        # go through each row in sheet and add data
        rowno=-1
        for row in ws.rows:
            rowno=rowno+1
            # stop if we are finished with the data
            if rowno > cdata.getRowCount(data):
                break
            fieldno=-1
            for field in corrdb.COLORDER:
                # skip this field if it is not to be included in the output
                if corrdb.COLUMNS[field]["include"] == False and not include:
                    continue
                fieldno=fieldno+1
                # check if header row
                if rowno == 0:
                    # this is the header - set cell value to header name
                    value=corrdb.COLUMNS[field]["name"]
                else:
                    # this is a normal data source value
                    value=""
                    # check if we have any data here - if so fetch it
                    if field in data[rowno]["cols"]:
                        value=data[rowno]["cols"][field]["value"]
                # set default type
                ctype=numbers.FORMAT_TEXT
                # determine which type to use on data by checking db schema
                if corrdb.COLUMNS[field]["type"] == "string":
                    ctype=numbers.FORMAT_TEXT
                elif corrdb.COLUMNS[field]["type"] == "int":
                    ctype=numbers.FORMAT_NUMBER
                elif corrdb.COLUMNS[field]["type"] == "float":
                    ctype=numbers.FORMAT_NUMBER
                # set cell data type
                row[fieldno].number_format=ctype
                # set cell data
                row[fieldno].value=value
        # save workbook to tmp-file
        wb.save(tmp.name)
        # set file pointer to beginning to prepare for being streamed and downloaded
        tmp.seek(0)
        # return the finished file
        return tmp
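
# Illustrative usage sketch (not part of the module); "rows" is assumed to be a
# dict produced by import_csv_file() or import_excel_file(), and the output path
# is made up:
#
#   cio = CorrInOut()
#   tmp = cio.export2excel(rows, include=False)
#   with open("corrosion_export.xlsx", "wb") as out:
#       out.write(tmp.read())
#   tmp.close()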