# -*- coding: utf-8 -*-
import os
# set default corrosion path-variable
DEFCORRPATH="/home/janj/git/corrosion"
# get source location: use CORROSION_PATH from the environment (production) or fall back to the dev default
SOURCELOC=os.environ["CORROSION_PATH"] if "CORROSION_PATH" in os.environ else DEFCORRPATH
SCRIPTLOC=SOURCELOC+"/corrosion"
# only print to stdout in dev mode, otherwise it ends up in the Apache error log
if SOURCELOC == DEFCORRPATH: print ("Corrosion webpage location: ",SOURCELOC,"\n")
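# example (illustrative, not from the original deployment notes): a production start
# script could set the path in the process environment before launching the app, e.g.
#   export CORROSION_PATH=/var/www/corrosion
# while a local dev run simply falls back to DEFCORRPATH above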
from bottle import (
    abort,
    auth_basic,
    BaseRequest,
    request,
    route,
    run,
    static_file,
    template,
    default_app,
    TEMPLATE_PATH,
)
import re
# temporary files are used when handling uploaded files and generated exports
from tempfile import NamedTemporaryFile, gettempdir
# ast: safe parsing of posted data structures (literal_eval)
import ast
# math: floor() used for bucketing values into ranges
import math
# hashlib: sha256 hashing of passwords
import hashlib
# add application path to sys path
# to allow for loading of our own libraries
import sys
sys.path.append(SCRIPTLOC)
# import corrosion related libraries
import corrdb
import corrinout
import corrgraph
import corrdata
application = default_app()
TEMPLATE_PATH.insert(0,SCRIPTLOC+"/views")
# set the db file to use
CORROSIONDB=SOURCELOC+"/db/corrosion.db"
# raise the maximum request body size that Bottle will buffer and parse
BaseRequest.MEMFILE_MAX = 1024 * 8192
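# 1024 * 8192 bytes = 8 MiB; Bottle limits the size of form bodies it will parse
# in memory to MEMFILE_MAX, so the default is raised here to accept larger posts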
# log mechanism
#tlog=open(SCRIPTLOC+"/corr.log","a")
#tlog.write("OS.ENVIRON:\n")
#for v in os.environ:
# tlog.write(v+": "+os.environ[v]+"\n")
#tlog.close()
# start by connecting to the database; it is needed for everything we do
cdb = corrdb.CorrDB(CORROSIONDB)
if cdb.connect() == False:
print ("ERROR! Unable to connect to database: "+cdb.error()+"\n")
exit()
# connect to database and return instance of corrdb
def connect():
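    """ Connect to the database and return a CorrDB instance
    Exits the whole process if the connection cannot be established.
    """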
    # start by connecting to the database; it is needed for everything we do
cdb = corrdb.CorrDB(CORROSIONDB)
if cdb.connect() == False:
print ("ERROR! Unable to connect to database: "+cdb.error()+"\n")
exit()
return cdb
def sort_human(lst):
"""
Sorts the list in a human way (as opposed to int og alphanum)
Args:
lst (list): list of values, integer or alphanumerical or both, that are to be sorted.
Returns:
list: The input-list in a sorted incarnation.
"""
    # split each key into digit and non-digit chunks so that numeric parts are
    # compared as numbers ("900" before "1400") and text parts as text
    convert = lambda text: int(text) if text.isdigit() else text
    alphanum = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]
lst.sort(key=alphanum)
return lst
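# illustrative example of the natural sort above (not executed anywhere):
#   sort_human(["1400-1450", "900-950", "after 2001"])
#   -> ["900-950", "1400-1450", "after 2001"]
# because numeric chunks are compared as numbers instead of character by character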
def is_authenticated(user,pw):
""" Check user auth information
Reads the password for the user from the file: ../pw
Args:
user (str): Username to check for.
pw (str): Password to check for.
Returns:
bool: True if successfully authenticated, False otherwise.
"""
# convert password to sha256 for comparison
pw=str(pw).replace("\n","")
pw=str(pw).replace("\r","")
pw=hashlib.sha256(str(pw).encode()).hexdigest()
# attempt to open password file
fh=None
try:
fh=open(SOURCELOC+"/pw","r")
except Exception as ex:
print (ex)
return False
else:
# we successfully opened the file - read it
syspw=fh.read()
# close the file
fh.close()
# clean away carriage return
syspw=str(syspw).replace("\n","")
syspw=str(syspw).replace("\r","")
# check password
if str(user).lower() == "admin" and str(pw) == str(syspw):
return True
else:
return False
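# note: the comparison above implies that the pw file must contain the sha256 hex
# digest of the admin password on a single line; a hypothetical way to generate it:
#   python3 -c 'import hashlib; print(hashlib.sha256(b"changeme").hexdigest())' > pw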
@route('/static/<filename>')
def servestatic(filename):
""" Serve files statically from the static-folder
Args:
        filename (str): Filename of the file in the static-folder to serve,
            given without any path component.
Returns:
HTTPResponse: The file data for the file that was asked to be served.
Otherwise returns HTTPError.
"""
    # we should not get any slashes here, but in case of inventive character
    # encodings we remove any just to be safe
filename=str(filename).replace("/","")
return static_file(filename, root=SCRIPTLOC+"/static/")
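# for example, a request for /static/corrosion.css (hypothetical filename) is served
# from SCRIPTLOC/static/corrosion.css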
# route to view the stored data; the root path is also mapped to the view page
@route("/", method=["GET","POST"])
@route("/view", method=["GET","POST"])
def view() -> str:
""" Generate the normal view of the Corrosion website
Args:
None accepted.
Returns:
The finished, generated template for the page.
"""
# connect to database
cdb=connect()
# get total number of rows in database
rowstotal=cdb.getCardinality()
search={}
clearform = "" if "clearform" not in request.forms else request.forms["clearform"]
# go through all columns and see if we have received any input
# and setup the search if so...
for field in corrdb.COLORDER:
if field in request.forms:
if clearform == "Clear":
value=""
request.forms[field]=""
continue
# there are issues with the decoding of forms values which
# are to be corrected in version 0.14 of Bottle. Until then using
# the (soon to be) deprecated method of getunicode.
# else: value=str(request.forms[field]).strip()
else: value=str(request.forms.getunicode(field,None,"utf-8")).strip()
if value is not None and value !="":
search[field]=value
# get logical operator
lop="AND" if "lop" not in request.forms else request.forms["lop"]
# perform the search for relevant data
data=cdb.search(search,lop)
if data is None:
# failed to search database - report error
return template("error",error=True,errormsg=cdb.error())
graphs=None
uniques={}
if data is not None:
# instantiate data class
cdata=corrdata.CorrData()
# get unique institutions, instrument type and instrument family
uniques["museum"]=cdata.getFieldUniqueValues(data,"museum")
uniques["type"]=cdata.getFieldUniqueValues(data,"type")
uniques["family"]=cdata.getFieldUniqueValues(data,"family")
# get graphs
graphs=generate_graphs(data)
return template("view",data=data,columns=corrdb.COLUMNS,colordersearch=corrdb.COLORDERSEARCH,
colorderinclude=corrdb.COLORDERINCL,graphs=graphs,formdata=request.forms,
rowstotal=rowstotal,uniques=uniques)
def generate_graphs(data) -> list:
""" Generate the graphs for the Corrosion website
Args:
data (dict): The CorrDB dict data-set to generate the graphs from.
Returns:
        list: A list of strings holding the rendered HTML code for each graph. Each string must be put into
            the template using "!" in front of the template variable so that it is not escaped, which would
            otherwise break the graph code. Trust the Graph!
"""
# instantiate graph class
graph=corrgraph.CorrGraph(False,
config={"displaylogo": False,
"modeBarButtonsToRemove": [
"pan2d",
"lasso2d",
"autoScale",
"select",
"resetScale",
]})
# instantiate data class
cdata=corrdata.CorrData()
# get total number of entries in data
total=cdata.getRowCount(data)
# a little trick to avoid div by zero
pertotal=0
if total > 0: pertotal=100/total
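    # e.g. with total=40 matching rows pertotal is 2.5, so a value occurring 8 times
    # is later shown as 8*2.5 = 20.0 percent (illustrative numbers)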
# make a variable/value/settings hash
gv={}
gv["graph"]=graph
gv["data"]=data
gv["cdata"]=cdata
gv["total"]=total
gv["pertotal"]=pertotal
    # if there are no rows at all, do not attempt to generate graphs
    if total <= 0:
        # return empty placeholders, one per graph in the list returned at the end of this function
return ["","","","","","","","","","","","","",""]
# generate the graphs
ginvest=generateMusicalInstrumentsInvestigated(gv)
gdating=generateDatings(gv)
gregion=generateProvenienceOrigin(gv)
gcombinations=generateOrganicMaterialToMetalTypeContactCorrosionFrequency(gv)
gmaterial1=generateMetalMaterialsInvolvedInContactCorrosion(gv)
gcmaterial2=generateOrganicMaterialsInvolvedInContactCorrosion(gv)
gcondcp=generateConditionOfContactPoint(gv)
gsigndegred=generateSignsOfDegredation(gv)
gcolor2metal=generateMetalTypeToMetalColor(gv)
gcolor2metalbc=generateMetalTypeToMetalColorBarChart(gv)
gexchange=generateEarlierExchangeOrTreatmentOfInstrument(gv)
gcorr=generateDependenceOfCorrosionToContactPoint(gv)
grh=generateRelativeHumidity(gv)
gfuncmat=generateFunctionOfMaterials(gv)
# return graphs as a list
return [ginvest,gmaterial1,gcmaterial2,gdating,gcorr,gregion,gcombinations,gcolor2metal,gcolor2metalbc,gsigndegred,gcondcp,gexchange,grh,gfuncmat]
def generateMusicalInstrumentsInvestigated(gv) -> str:
graph=gv["graph"]
data=gv["data"]
pertotal=gv["pertotal"]
cdata=gv["cdata"]
# generate pie-chart of investigated number of various families
# get unique family values in the data
families=cdata.getFieldUniqueValues(data,"family")
investigated={}
invest=[]
# get the number of rows with a given family value
for family in families:
# get the count
investigated[family]=cdata.getFieldCount(data,"family",family)
# add data to a usable format
invest.append({ "Family" : family,
"Percentage" : round(pertotal*investigated[family],1),
"Count" : investigated[family]
})
# generate the graph from the data
ginvest=graph.generatePieChart(invest,
"Percentage",
"Family",
"Musical Instruments Investigated")
# False,
# 0,
# "Family",
# ["Family","Count","Percentage"])
return ginvest
def generateDatings(gv) -> str:
graph=gv["graph"]
data=gv["data"]
cdata=gv["cdata"]
# show datings of instruments
# get unique ranges
ranges=cdata.getFieldUniqueValues(data,"proddate")
# sort ranges
ranges=sort_human(ranges)
# move "after 2001" so that it appears on top of vertical axis
if "after 2001" in ranges:
ranges.remove("after 2001")
ranges.append("after 2001")
    # go through each range, count its occurrences and add them to the data
dating=[]
datingtotal=0
periodmax=0
for period in ranges:
count=cdata.getFieldCount(data,"proddate",period)
datingtotal=datingtotal+count
dating.append({"Dating": period, "Count": count})
if count > periodmax: periodmax=count
# go through and add percentage
datingper=0
if datingtotal > 0: datingper=100 / datingtotal
for item in dating:
item["Investigated instruments (%)"]=round(datingper*item["Count"],1)
    # ensure the axis maximum is at least 60 (use the period maximum when it is larger)
    rangemax=60 if periodmax < 60 else periodmax
# generate datings graph
gdating=graph.generateHorizontalBarChart(dating,
"Investigated instruments (%)",
"Dating",
"Date of production",
None,
"Dating",
["Count","Investigated instruments (%)"],
None,
False,
None,
[0,rangemax])
return gdating
def generateProvenienceOrigin(gv) -> str:
graph=gv["graph"]
pertotal=gv["pertotal"]
data=gv["data"]
cdata=gv["cdata"]
# generate provenience/origin graph
# define unique geographical areas
regions=["Asia","Africa","North America","South America","Europe","Antarctica","Australia","unknown"]
#regions=cdata.getFieldUniqueValues(data,"prodplace")
regions=sort_human(regions)
regioncount=[]
for region in regions:
count=cdata.getFieldCount(data,"prodplace",region)
regioncount.append({ "Continent":region,
"Investigated instruments (%)":round(pertotal*count,1),
"Count": count,
})
# generate the barchart of origins
gregion=graph.generateHorizontalBarChart(regioncount,
"Investigated instruments (%)",
"Continent",
"Place of production",
None,
"Continent",
["Investigated instruments (%)","Count"],
None,
False)
return gregion
def generateSignsOfDegredation(gv) -> str:
graph=gv["graph"]
data=gv["data"]
cdata=gv["cdata"]
# make signs of degradation graph
    degdict={}
mattypes=["metal","organic"]
categories=["discolor","surfalter","structdam","nochange"]
names={}
names["metal"]="Metal"
names["organic"]="Organic material"
names["discolor"]="Discolouration"
names["surfalter"]="Surface alteration"
names["structdam"]="Structural damage"
names["nochange"]="No change"
catnames=[]
for category in categories:
catnames.append(names[category])
degtotal={}
# go through all data and harvest
for rowno in data:
# check for all contact points possible
for no in range(1,5):
# skip this data point if no contact corrosion was found
if str(data[rowno]["cols"]["corrfound"+str(no)]["value"]).lower() != "yes": continue
# go through both material types and categories within the types
# and check if instrument has any changes in those categories - if so count it
for material in mattypes:
# init material type dict if not already there
if material not in degdict: degdict[material]={}
if material not in degtotal: degtotal[material]=0
for category in categories:
# init category count if not already set
if category not in degdict[material]: degdict[material][category]=0
# get value of given category
value=str(data[rowno]["cols"][material+category+str(no)]["value"]).strip()
# if value for category is 1, add it to the count we already have
if value == "1":
# add another count to this category of degradation
degdict[material][category]=degdict[material][category]+1
# count overall degradation count for this material type
degtotal[material]=degtotal[material]+1
    # convert totals to a percentage factor, avoiding division by zero
for material in mattypes:
if material in degtotal and degtotal[material] > 0:
degtotal[material]=100 / degtotal[material]
# now that we have the count for all material and its categories, make data array
degdata=[]
for material in mattypes:
for category in categories:
# add this to the degdata
degdata.append({ "Material": names[material],
"Category": names[category],
"Count": degdict[material][category],
"Percentage": round(degtotal[material]*degdict[material][category],1) })
# generate the barchart of signs of degradation
gsigndegred=graph.generateHorizontalBarChart(degdata,
"Percentage",
"Material",
"Signs of degradation",
"Category",
"Category",
["Material","Percentage","Count"],
None,
False)
return gsigndegred
def generateEarlierExchangeOrTreatmentOfInstrument(gv) -> str:
graph=gv["graph"]
data=gv["data"]
# make earlier part- and treatment exchange graph
    exdict={}
extypes=["metal","organic"]
excat=["earlierex","earliertreat","earliertreatnov"]
exnames={}
exnames["metal"]="Metal"
exnames["organic"]="Organic material"
exnames["earlierex"]="Earlier exchange of part"
exnames["earliertreat"]="Earlier treatment"
exnames["earliertreatnov"]="No earlier treatment visible"
excatnames=[]
for category in excat:
excatnames.append(exnames[category])
extotal={}
# go through all data and harvest
for rowno in data:
        # go through both material types
for material in extypes:
# init material type dict if not already there
if material not in exdict: exdict[material]={}
if material not in extotal: extotal[material]=0
            # go through each category for each material
for category in excat:
# init category count if not already set
if category not in exdict[material]: exdict[material][category]=0
# get value of given category
value=str(data[rowno]["cols"][material+category]["value"]).strip()
# if value for category is 1, add it to the count we already have
if value == "1":
# add another count to this category of exchange/treatment
exdict[material][category]=exdict[material][category]+1
# count overall exchange/treatment count for this material type
extotal[material]=extotal[material]+1
    # convert totals to a percentage factor, avoiding division by zero
for material in extypes:
if extotal[material] > 0:
extotal[material]=100 / extotal[material]
# now that we have the count for all material and its categories, make data array
exdata=[]
for material in extypes:
for category in excat:
            # add this to the exdata
exdata.append({ "Material": exnames[material],
"Category": exnames[category],
"Count": exdict[material][category],
"Percentage": round(extotal[material]*exdict[material][category]) })
# generate the barchart of earlier exchange/treatment
gexchange=graph.generateHorizontalBarChart(exdata,
"Percentage",
"Material",
"Earlier treatments",
"Category",
"Category",
["Material","Percentage","Count"])
return gexchange
def generateRelativeHumidity(gv) -> str:
graph=gv["graph"]
data=gv["data"]
pertotal=gv["pertotal"]
# create relative humidity graph
    # create an rh-list with 11 datapoints, corresponding to the division of
    # 0-100% into ten equal sub-ranges plus 100% as a datapoint of its own
rhlist=[0,0,0,0,0,0,0,0,0,0,0]
# go through each row and collect data
for rowno in data:
        # get the rh value for this row and skip rows without a plain numeric value
        value=str(data[rowno]["cols"]["rh"]["value"]).strip()
        if not value.isdigit(): continue
        value=int(value)
if value >= 0 and value <= 100:
# locate which subrange value belongs to
subrange=math.floor(value / 10)
# increase count in given subrange
rhlist[subrange]=rhlist[subrange]+1
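    # e.g. rh=57 falls into subrange floor(57/10)=5, i.e. the "50-59" bucket below,
    # while rh=100 ends up in the dedicated subrange 10 ("100")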
# create the data for the graph itself
rhdata=[]
for subrange in range(0,11):
# add the value for this subrange
if subrange == 10: rangetxt="100"
else: rangetxt=str(subrange*10)+"-"+str((subrange*10)+9)
rhdata.append({ "Relative humidity (%)": rangetxt,
"Investigated instruments (%)": round(pertotal*rhlist[subrange],1),
"Count": rhlist[subrange] })
    # generate the barchart of relative humidity
grh=graph.generateHorizontalBarChart(rhdata,
"Investigated instruments (%)",
"Relative humidity (%)",
"The environmental situation",
"Relative humidity (%)",
"Relative humidity (%)",
["Relative humidity (%)","Investigated instruments (%)","Count"],
None,
False,
None,
None,
False)
return grh
def generateFunctionOfMaterials(gv) -> str:
graph=gv["graph"]
data=gv["data"]
cdata=gv["cdata"]
# generate the function of the materials graph
funcmat={}
locations=[]
loccount={}
# go through all data and harvest combination points
for rowno in data:
# check for all contact points possible
for no in range(1,5):
# skip this data point if no contact corrosion was found
if str(data[rowno]["cols"]["corrfound"+str(no)]["value"]).lower() != "yes": continue
for material in ["metal","organic"]:
# get the values to work with
location=str(data[rowno]["cols"][material+"loc"+str(no)]["value"]).strip()
# we need value to proceed
if location == "": continue
# save the unique locations
if location not in locations: locations.append(location)
# create dict with combination counter information
if material not in funcmat: funcmat[material]={}
if location not in funcmat[material]: funcmat[material][location]=0
if material not in loccount: loccount[material]=0
# increase count for this combination
funcmat[material][location]=funcmat[material][location]+1
# increase overall location counter for a given material
loccount[material]=loccount[material]+1
# make dataset
mat2loc=[]
for material in funcmat:
# calculate percentage for this material
mper=0
if loccount[material] > 0: mper=100 / loccount[material]
for location in locations:
            # set the initial count to zero, so we have a value even if the
            # location is not present for this material
count=0
if location in funcmat[material]: count=funcmat[material][location]
materialstr="Metal"
if material == "organic": materialstr="Organic material"
mat2loc.append({ "Material Type": materialstr,
"Location": location,
"Count": count,
"Function of materials (%)": round(mper*count,1),
})
gmat2loc=""
if len(mat2loc) > 0:
# now, generate the graph
gmat2loc=graph.generateHorizontalBarChart(mat2loc,
"Function of materials (%)",
"Material Type",
"The function of the materials involved in contact corrosion",
"Location",
"Location",
["Location","Material Type",
"Function of materials (%)","Count"])
if gmat2loc is None: print ("ERROR:",graph.error())
return gmat2loc
@route("/import")
@auth_basic(is_authenticated)
def import_data() -> str:
""" Import data from files, into the DB
This route requires authentication to be used.
Args:
None is accepted.
Returns:
str: The string with the rendered template code.
"""
return template("import")
@route("/import/upload", method="POST")
@auth_basic(is_authenticated)
def import_upload() -> str:
""" Upload the file specified to a temporary file
Args:
None is accepted.
Returns:
str: Returns the rendered template for the route
"""
# connect to database
cdb=connect()
    # get the parameters for the upload
upload = request.files.get("upload")
name, ext = os.path.splitext(upload.filename)
    if ext.lower() not in (".xlsx",".csv"):
errormsg="This file does not seem to be an excel- or csv-file. Must have extension xlsx or csv."
return template("error",error=True,errormsg=errormsg)
# store the uploaded file to a temporary file before loading it
tmp = NamedTemporaryFile(suffix = ext)
tmp.seek(0)
# save the uploaded file to the temporary file
upload.save(tmp)
# instantiate CorrInOut
cio=corrinout.CorrInOut()
err=False
errormsg=""
if str(ext).upper() == ".XLSX":
# attempt to open as excel-file
data = cio.import_excel_file(tmp)
        if data is None:
err=True
errormsg=cio.error()
elif str(ext).upper() == ".CSV":
        # attempt to open as csv-file
data = cio.import_csv_file(tmp)
        if data is None:
err=True
errormsg=cio.error()
# close tmp-file
tmp.close()
# check the data content
# tag repeated/already existing data, invalid etc.
data = cdb.check_data(data)
# get number of keys in data
rowcount=len(data.keys())
# get number of invalid keys
invalidcount=0
for row in data.keys():
if data[row]["invalid"]: invalidcount=invalidcount+1
# invoke template for page and feed it with the import data
return template("upload",data=data,columns=corrdb.COLUMNS,colorder=corrdb.COLORDER,\
rowcount=rowcount,invalidcount=invalidcount,error=err,errormsg=errormsg)
@route("/import/update", method="POST")
@auth_basic(is_authenticated)
def import_update() -> str:
""" Update the database with the rows that was decided to be imported.
Args:
None is accepted.
Returns:
str: The rendered template for the update-process.
"""
# connect to database
cdb=connect()
    # get the data structure posted by the upload form and convert it back to a dict
    # through ast.literal_eval
data=ast.literal_eval(request.forms.data)
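    # literal_eval only accepts Python literals, e.g.
    #   ast.literal_eval("{1: {'skip': False}}") == {1: {'skip': False}}
    # so, unlike eval(), arbitrary code in the posted field cannot be executed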
rows=len(data.keys())
# go through each row
for rowno in range(1,rows+1):
# attempt to get row choice data
oper=request.forms.get('choices_'+str(rowno)) or ""
if oper == "": continue
        # we got an operation choice - tag it as replace, delete or skip
if oper == "replace": data[rowno]["replace"]=True
if oper == "delete": data[rowno]["delete"]=True
if oper == "skip": data[rowno]["skip"]=True
    # let's write the data into the database
updret=cdb.update(data)
# get return result
ret=updret[0]
count=0
# get number of rows imported to db if update was successful
if ret: count=updret[1]
return template("update",data=data,count=count,error=not ret,errormsg=cdb.error())
@route("/export/<path:re:excel|csv>/all", method=["GET","POST"])
@auth_basic(is_authenticated)
def export_data_all(path):
""" Export all data in database
This route requires authentication
Args:
path (str): The path specifies which format to export to (csv or excel).
Returns:
HTTPResponse: The created file of exported data. HTTPError upon failure.
"""
# this path requires auth, and if auth,
# we set that all fields are to be exported
return export_data(path,True)
@route("/export/<path:re:excel|csv>", method=["GET","POST"])
def export_data(path,all=False):
""" Export data in database to excel- or csv-format
Args:
path (str): The path specifies which format to export to (csv or excel).
        all (bool): Specifies if all data (including protected data) is to be exported or not. This option
            cannot be set through the route immediately above; it is only enabled when called via the
            export_data_all()-function.
Returns:
HTTPResponse: The created file of exported data. HTTPError upon failure.
"""
# connect to database
cdb=connect()
# create file extension to use
ext="xlsx" if path == "excel" else "csv"
# get delimiter
delimiter=";" if "delimiter" not in request.forms else request.forms["delimiter"]
# get logical operator
lop="AND" if "lop" not in request.forms else request.forms["lop"]
# set export filename
outname="corrosion_export" if "filename" not in request.forms else request.forms.filename
outname=outname+"."+ext
# check if we have received any search
search={}
for field in corrdb.COLORDER:
if field in request.forms:
value=str(request.forms[field]).strip()
if value is not None and value !="":
search[field]=value
# attempt to search for the data
data=cdb.search(search,lop,all)
if data is None:
# failed to search database - report error
return template("error",error=True,errormsg=cdb.error())
    # let's ask for an export
cio=corrinout.CorrInOut()
# generate data and return the tmp-file instance
tmp=None
if path == "excel":
# generate excel-file
tmp=cio.export2excel(data,all)
else:
# generate csv-file
tmp=cio.export2csv(data,delimiter,all)
# get the name part of the full path and filename
name=os.path.basename(tmp.name)
# return the generated file
return static_file(name,root=gettempdir(),download=outname)
@route('/logout', method=["GET", "POST"])
@auth_basic(is_authenticated)
def logout():
""" Logout the admin user of the corrosion web site
    The user must already be authenticated before this route can run. Since HTTP Basic Auth has no
    server-side logout, a 401 response is returned so that the browser drops or re-prompts for credentials.
Args:
None is accepted.
Returns:
HTTPResponse: 401 error message with successful logout...
"""
abort(401, "Logged out successfully...")
if ((__name__ == '__main__') and ("CORROSION_PATH" not in os.environ)):
run(host='localhost', port=8080, debug=True, reloader=True)