# -*- coding: utf-8 -*-
# manipulating and working with CSV files
import csv
# manipulating and working with excel files
from openpyxl import Workbook
# loading excel files
from openpyxl import load_workbook
from openpyxl.styles import numbers
# temp library
from tempfile import NamedTemporaryFile
# corrosion db library
import corrdb
# corrosion data manipulation
import corrdata
# regex library
import re
class CorrInOut:
""" Class to import and export to/from excel/csv and from/to a db.
"""
# constructor
def __init__(self):
""" Instantiates the CorrInOut class """
self.errormsg=""
# get last error message
def error(self) -> str:
""" Get last error from library.
Args:
None needed
Returns:
str: The error message, if any
"""
        return self.errormsg
# import csv-file
def import_csv_file(self,fileobject) -> dict:
""" Import CSV-file and return as a dict-instance
Args:
fileobject (NamedTemporaryFile): The file to import
Returns:
dict: The csv-file as a dict-structured object or None upon failure.
Read failure message from the error()-method if applicable.
"""
        method=__name__+"."+"import_csv_file"
        self.errormsg=""
        # go through temp-file and parse as a CSV-file
        return self.__process_csv_file(fileobject)
# private method that processes the csv-file
def __process_csv_file(self,fileobject) -> dict:
""" Processes a csv-file and puts data into a dict structure
Args:
fileobject (NamedTemporaryFile): CSV-file to process.
Returns:
            dict: The csv-file as a dict-structured object or None upon failure.
Read failure message from the error()-method if applicable.
"""
method=__name__+"."+"__process_csv_file"
# create some variables needed
        csvreader=None
data={}
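        # resulting layout (per the assignments below): data[rowno] is a dict holding
        # bookkeeping flags ("replace", "delete", "exists", "invalid", "skip") and a
        # "cols" dict mapping each corrdb.COLORDER fieldname to {"name", "value"}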
try:
# attempt to read csv-data from temp-file
csvreader = csv.reader(fileobject, delimiter=';', quotechar='\\')
except Exception as ex:
            err=str(ex)
self.errormsg=method+": Unable to read CSV-file: "+err
return None
else:
# go through each row
rpos=-1
for row in csvreader:
rpos=rpos+1
if rpos == 0: continue # skip first row with headers
for pos in range(0,len(row)):
# get the fieldname of the current field
fieldname=corrdb.COLORDER[pos] if pos < len(corrdb.COLORDER) else ""
# only add values that are allowable
if pos < len(corrdb.COLORDER):
if rpos not in data: data[rpos]={}
data[rpos]["replace"]=False
data[rpos]["delete"]=False
data[rpos]["exists"]=False
data[rpos]["invalid"]=False
data[rpos]["skip"]=False
if "cols" not in data[rpos]: data[rpos]["cols"]={}
data[rpos]["cols"]={}
data[rpos]["cols"][fieldname]={}
data[rpos]["cols"][fieldname]["name"]=fieldname
data[rpos]["cols"][fieldname]["value"]=str(row[pos])
# return data dict
return data
# import an excel file
def import_excel_file(self,fileobject) -> dict:
""" Import Excel-file and return as a dict-instance
Args:
fileobject (NamedTemporaryFile): The file to import
Returns:
dict: The Excel-file as a dict-structured object or None upon failure.
Read failure message from the error()-method if applicable.
"""
method=__name__+"."+"import_excel_file"
self.errormsg=""
# attempt to load temporary file into workbook
try:
# load workbook
wb = load_workbook(filename = fileobject.name)
except Exception as ex:
            err=str(ex)
# we had an error - return undefined
self.errormsg=method+": Unable to load excel file: "+err
return None
else:
# process the wb
return self.__process_excel_file(wb)
# private method that processes the excel-file
def __process_excel_file(self,wb) -> dict:
""" Processes a csv-file and puts data into a dict structure
Args:
wb (openpyxl.Workbook): Excel-file to process.
Returns:
            dict: The Excel-file as a dict-structured object or None upon failure.
Read failure message from the error()-method if applicable.
"""
method=__name__+"."+"__process_excel_file"
data={}
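        # data uses the same row-keyed layout as __process_csv_file: bookkeeping
        # flags plus a "cols" dict of {"name": ..., "value": ...} entries per field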
# get all worksheets in workbook
for ws in wb:
rpos=-1
# go through each row of worksheet
for row in ws.iter_rows(values_only=True):
rpos=rpos+1
if rpos == 0: continue # skip first row with headers
for pos in range(0,len(row)):
# get the fieldname of the current field
fieldname=corrdb.COLORDER[pos] if pos < len(corrdb.COLORDER) else ""
# only add values that are allowable
if pos < len(corrdb.COLORDER):
if rpos not in data: data[rpos]={}
data[rpos]["replace"]=False
data[rpos]["delete"]=False
data[rpos]["exists"]=False
data[rpos]["invalid"]=False
data[rpos]["skip"]=False
if "cols" not in data[rpos]: data[rpos]["cols"]={}
data[rpos]["cols"][fieldname]={}
data[rpos]["cols"][fieldname]["name"]=fieldname
value=row[pos]
                        # make sure we store a sensible value in our data structure
if value is None:
value=""
                        elif type(value) not in (str, int, float):
value=str(value)
data[rpos]["cols"][fieldname]["value"]=value
# return data dict
return data
def export2csv(self,data,delimiter=";",include=False) -> NamedTemporaryFile:
""" Export dict data structure to a CSV-file.
Args:
data (dict): The dict data structure to export.
delimiter (str): Sets the delimiter to use when writing data structure to file.
One character only. Defaults to ";" or semicolon.
include (bool): Specify if we are to include and export fields that are not public.
These fields are defined in the CorrDB-class library.
Returns:
NamedTemporaryFile: The NamedTemporaryFile-instance containing the CSV-data.
"""
method=__name__+"."+"export2csv"
# clean delimiter
delimiter=str(delimiter[0])
# instantiate corrdata
cdata=corrdata.CorrData()
# create a temporary file to write the data to
tmp = NamedTemporaryFile(mode="w+t",suffix = ".csv",newline="\n",encoding="utf-8")
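        # note: the temp file is removed as soon as it is closed (delete=True is the
        # default), so the caller must stream or copy the CSV before closing it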
tmp.seek(0)
# create the csvwrite instance
csvwriter = csv.writer(tmp,delimiter=delimiter,quotechar="|",quoting=csv.QUOTE_MINIMAL)
# first add the headers
colordernames=[]
for colname in corrdb.COLORDER:
            # skip restricted fields unless the include-option says otherwise
            if not corrdb.COLUMNS[colname]["include"] and not include: continue
colordernames.append(corrdb.COLUMNS[colname]["name"])
# write the header
csvwriter.writerow(colordernames)
# then go through and add the data
for rowno in range(1,cdata.getRowCount(data)+1):
row=[]
for field in corrdb.COLORDER:
# skip including this field if it is not to be included in the output
                if not corrdb.COLUMNS[field]["include"] and not include: continue
if field not in data[rowno]["cols"]:
row.append("")
continue
                # add column to row list; the temp file is already opened with utf-8
                # encoding, so write the value as a plain string (encoding to bytes here
                # would produce a b'...' literal in the CSV output)
                row.append(str(data[rowno]["cols"][field]["value"]))
# add the data to the file
csvwriter.writerow(row)
# reset to start of file to prepare for being streamed out to user
tmp.seek(0)
# return the finished file
return tmp
    # export data to excel format; fields tagged as not to be included are
    # excluded by default (include=False)
def export2excel(self,data,include=False) -> NamedTemporaryFile:
""" Export dict data structure to a Excel-file.
Args:
data (dict): The dict data structure to export.
            include (bool): Specify if we are to include and export fields that are not public.
                These fields are defined in the CorrDB-class library.
Returns:
NamedTemporaryFile: The NamedTemporaryFile-instance containing the Excel-data.
"""
method=__name__+"."+"export2excel"
# create a temporary file to write the data to
tmp = NamedTemporaryFile(suffix = ".xlsx")
tmp.seek(0)
# instantiate corrdata
cdata=corrdata.CorrData()
# create a workbook
wb = Workbook()
# create a worksheet
ws = wb.create_sheet(title="Corrosion", index=0)
# add to the worksheet
# insert the new rows needed + headers
ws.insert_rows(1,cdata.getRowCount(data)+1)
# add the needed columns as well
# either all or just a subset
if include:
ws.insert_cols(1,len(corrdb.COLORDER))
else: ws.insert_cols(1,len(corrdb.COLORDERINCL))
# go through each row in sheet and add data
rowno=-1
for row in ws.rows:
rowno=rowno+1
# stop if we are finished with the data
if rowno > cdata.getRowCount(data): break
fieldno=-1
for field in corrdb.COLORDER:
# skip including this field if it is not to be included in the output
                if not corrdb.COLUMNS[field]["include"] and not include: continue
fieldno=fieldno+1
# check if header row
if rowno == 0:
# this is header - set cell value to header name
value=corrdb.COLUMNS[field]["name"]
else:
# this is a normal data source value
value=""
# check if we have any data here - if so fetch it
if field in data[rowno]["cols"]:
value=data[rowno]["cols"][field]["value"]
# set default type case
ctype=numbers.FORMAT_TEXT
# determine which type case to use on data by checking db schema
if corrdb.COLUMNS[field]["type"] == "string": ctype=numbers.FORMAT_TEXT
elif corrdb.COLUMNS[field]["type"] == "int": ctype=numbers.FORMAT_NUMBER
elif corrdb.COLUMNS[field]["type"] == "float": ctype=numbers.FORMAT_NUMBER
# set cell data type case
row[fieldno].number_format=ctype
# set cell data
row[fieldno].value=value
# save workbook to tmp-file
wb.save(tmp.name)
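        # note: saving by name reopens the temp file path while it is still open;
        # this works on POSIX, but on Windows NamedTemporaryFile may need
        # delete=False for the path to be reopenable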
# set file pointer to beginning to prepare for being streamed and downloaded
tmp.seek(0)
# return the finished file
return tmp
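
# Illustrative usage sketch (an assumption, not part of the original module):
# round-trip a CSV file to an Excel download. The command-line handling and the
# output filename below are made up for the example; a plain text file handle is
# accepted here because csv.reader only needs an iterable of lines.
if __name__ == "__main__":
    import shutil
    import sys
    inout = CorrInOut()
    with open(sys.argv[1], "r", encoding="utf-8", newline="") as fileobject:
        data = inout.import_csv_file(fileobject)
    if data is None:
        print(inout.error())
    else:
        tmp = inout.export2excel(data)
        # copy the temporary workbook somewhere persistent before it is deleted on close
        shutil.copyfile(tmp.name, "corrosion_export.xlsx")
        tmp.close()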