Source code for main

# -*- coding: utf_8 -*-
import os
# set default corrosion path-variable
DEFCORRPATH="/home/janj/git/corrosion"
# get source location, choose between dev and web prod
SOURCELOC=os.environ["CORROSION_PATH"] if "CORROSION_PATH" in os.environ else DEFCORRPATH
SCRIPTLOC=SOURCELOC+"/corrosion"
# only print to stdout when in dev mode or else it ends up in apache error log
if SOURCELOC == DEFCORRPATH: print ("Corrosion webpage location: ",SOURCELOC,"\n")
from bottle import \
     abort,\
     auth_basic,\
     BaseRequest,\
     request,\
     route,\
     run,\
     static_file,\
     template,\
     default_app,\
     TEMPLATE_PATH
import re
# enable loading and saving through streams/temporary files
# because of uploading of files
from tempfile import NamedTemporaryFile, gettempdir
# import ast library
import ast
# import math library
import math
# import hash library
import hashlib

# add application path to sys path
# to allow for loading of our own libraries
import sys
sys.path.append(SCRIPTLOC)
# import corrosion related libraries
import corrdb
import corrinout
import corrgraph
import corrdata

application = default_app()
TEMPLATE_PATH.insert(0,SCRIPTLOC+"/views")

# set the db file to use
CORROSIONDB=SOURCELOC+"/db/corrosion.db"

# adjust request size to avoid issues
BaseRequest.MEMFILE_MAX = 1024 * 8192

# log mechanism
#tlog=open(SCRIPTLOC+"/corr.log","a")
#tlog.write("OS.ENVIRON:\n")
#for v in os.environ:
#    tlog.write(v+": "+os.environ[v]+"\n")
#tlog.close()

# start by connecting to database it is needed in all we do
cdb = corrdb.CorrDB(CORROSIONDB)
if cdb.connect() == False:
   print ("ERROR! Unable to connect to database: "+cdb.error()+"\n")
   exit()

# connect to database and return instance of corrdb
[docs] def connect(): # start by connecting to database it is needed in all we do cdb = corrdb.CorrDB(CORROSIONDB) if cdb.connect() == False: print ("ERROR! Unable to connect to database: "+cdb.error()+"\n") exit() return cdb
[docs] def sort_human(lst): """ Sorts the list in a human way (as opposed to int og alphanum) Args: lst (list): list of values, integer or alphanumerical or both, that are to be sorted. Returns: list: The input-list in a sorted incarnation. """ convert = lambda text: text if text.isdigit() else text alphanum = lambda key: [convert(c) for c in re.split('([-+]?[0-9\\-]*\\.?[0-9]*)', key)] lst.sort(key=alphanum) return lst
[docs] def is_authenticated(user,pw): """ Check user auth information Reads the password for the user from the file: ../pw Args: user (str): Username to check for. pw (str): Password to check for. Returns: bool: True if successfully authenticated, False otherwise. """ # convert password to sha256 for comparison pw=str(pw).replace("\n","") pw=str(pw).replace("\r","") pw=hashlib.sha256(str(pw).encode()).hexdigest() # attempt to open password file fh=None try: fh=open(SOURCELOC+"/pw","r") except Exception as ex: print (ex) return False else: # we successfully opened the file - read it syspw=fh.read() # close the file fh.close() # clean away carriage return syspw=str(syspw).replace("\n","") syspw=str(syspw).replace("\r","") # check password if str(user).lower() == "admin" and str(pw) == str(syspw): return True else: return False
[docs] @route('/static/<filename>') def servestatic(filename): """ Serve files statically from the static-folder Args: filename (str): Filename of static-folder file to server without path. Returns: HTTPResponse: The file data for the file that was asked to be served. Otherwise returns HTTPError. """ # we should not get any slashes here, but in case # some inventive character encodings - we remove any filename=str(filename).replace("/","") return static_file(filename, root=SCRIPTLOC+"/static/")
# route to view data stored, root path is also linked # to the view page
[docs] @route("/", method=["GET","POST"]) @route("/view", method=["GET","POST"]) def view() -> str: """ Generate the normal view of the Corrosion website Args: None accepted. Returns: The finished, generated template for the page. """ # connect to database cdb=connect() # get total number of rows in database rowstotal=cdb.getCardinality() search={} clearform = "" if "clearform" not in request.forms else request.forms["clearform"] # go through all columns and see if we have received any input # and setup the search if so... for field in corrdb.COLORDER: if field in request.forms: if clearform == "Clear": value="" request.forms[field]="" continue # there are issues with the decoding of forms values which # are to be corrected in version 0.14 of Bottle. Until then using # the (soon to be) deprecated method of getunicode. # else: value=str(request.forms[field]).strip() else: value=str(request.forms.getunicode(field,None,"utf-8")).strip() if value is not None and value !="": search[field]=value # get logical operator lop="AND" if "lop" not in request.forms else request.forms["lop"] # perform the search for relevant data data=cdb.search(search,lop) if data is None: # failed to search database - report error return template("error",error=True,errormsg=cdb.error()) graphs=None uniques={} if data is not None: # instantiate data class cdata=corrdata.CorrData() # get unique institutions, instrument type and instrument family uniques["museum"]=cdata.getFieldUniqueValues(data,"museum") uniques["type"]=cdata.getFieldUniqueValues(data,"type") uniques["family"]=cdata.getFieldUniqueValues(data,"family") # get graphs graphs=generate_graphs(data) return template("view",data=data,columns=corrdb.COLUMNS,colordersearch=corrdb.COLORDERSEARCH, colorderinclude=corrdb.COLORDERINCL,graphs=graphs,formdata=request.forms, rowstotal=rowstotal,uniques=uniques)
[docs] def generate_graphs(data) -> list: """ Generate the graphs for the Corrosion website Args: data (dict): The CorrDB dict data-set to generate the graphs from. Returns: list: A list of strings that are the rendered HTML code for each graph. Each code used must be put into the template using "!" in front of the template variable to ensure it is not escaped and thereby breaking the graph code. Trust the Graph! """ # instantiate graph class graph=corrgraph.CorrGraph(False, config={"displaylogo": False, "modeBarButtonsToRemove": [ "pan2d", "lasso2d", "autoScale", "select", "resetScale", ]}) # instantiate data class cdata=corrdata.CorrData() # get total number of entries in data total=cdata.getRowCount(data) # a little trick to avoid div by zero pertotal=0 if total > 0: pertotal=100/total # make a variable/value/settings hash gv={} gv["graph"]=graph gv["data"]=data gv["cdata"]=cdata gv["total"]=total gv["pertotal"]=pertotal # check if total result is low, then do not attempt to generate graphs if total <= 0: # return empty graphs return ["","","","","","","","","","","","","",""] # generate the graphs ginvest=generateMusicalInstrumentsInvestigated(gv) gdating=generateDatings(gv) gregion=generateProvenienceOrigin(gv) gcombinations=generateOrganicMaterialToMetalTypeContactCorrosionFrequency(gv) gmaterial1=generateMetalMaterialsInvolvedInContactCorrosion(gv) gcmaterial2=generateOrganicMaterialsInvolvedInContactCorrosion(gv) gcondcp=generateConditionOfContactPoint(gv) gsigndegred=generateSignsOfDegredation(gv) gcolor2metal=generateMetalTypeToMetalColor(gv) gcolor2metalbc=generateMetalTypeToMetalColorBarChart(gv) gexchange=generateEarlierExchangeOrTreatmentOfInstrument(gv) gcorr=generateDependenceOfCorrosionToContactPoint(gv) grh=generateRelativeHumidity(gv) gfuncmat=generateFunctionOfMaterials(gv) # return graphs as a list return [ginvest,gmaterial1,gcmaterial2,gdating,gcorr,gregion,gcombinations,gcolor2metal,gcolor2metalbc,gsigndegred,gcondcp,gexchange,grh,gfuncmat]
[docs] def generateMusicalInstrumentsInvestigated(gv) -> str: graph=gv["graph"] data=gv["data"] pertotal=gv["pertotal"] cdata=gv["cdata"] # generate pie-chart of investigated number of various families # get unique family values in the data families=cdata.getFieldUniqueValues(data,"family") investigated={} invest=[] # get the number of rows with a given family value for family in families: # get the count investigated[family]=cdata.getFieldCount(data,"family",family) # add data to a usable format invest.append({ "Family" : family, "Percentage" : round(pertotal*investigated[family],1), "Count" : investigated[family] }) # generate the graph from the data ginvest=graph.generatePieChart(invest, "Percentage", "Family", "Musical Instruments Investigated") # False, # 0, # "Family", # ["Family","Count","Percentage"]) return ginvest
[docs] def generateMetalMaterialsInvolvedInContactCorrosion(gv) -> str: graph=gv["graph"] data=gv["data"] cdata=gv["cdata"] # generate material 1 count (metal) materials1=[] # get unique materials for matno in range(1,5): tmpmat=cdata.getFieldUniqueValues(data,"metaltype"+str(matno)) for material in tmpmat: if material not in materials1: materials1.append(material) material1count={} material1=[] material1total=0 # get the number of rows with given material value entryname="Metal Type" for material in materials1: # get the count - go through all possible corrosion point from 1-4 # count only those that have corrfoundN = Yes for matno in range(1,5): if material not in material1count: material1count[material]=0 material1count[material]=material1count[material]+cdata.getFieldCount(data,"metaltype"+str(matno),[material],True,False,"corrfound"+str(matno),["Yes"]) # increment total of materials found material1total=material1total + material1count[material] # add data to a usable format entry={} entry[entryname]=material entry["Count"]=material1count[material] material1.append(entry) # go through and add percentage mat1per=0 if material1total > 0: mat1per=100 / material1total for item in material1: item["Percentage"]=round((mat1per*item["Count"]),1) # generate the graph from the data gmaterial1=graph.generateDonutChart(material1, "Percentage", entryname, "Metals involved in contact corrosion") return gmaterial1
[docs] def generateOrganicMaterialsInvolvedInContactCorrosion(gv) -> str: graph=gv["graph"] data=gv["data"] cdata=gv["cdata"] # generate contact material 2 count (organic) cmaterials2=[] # get unique organic materials for matno in range(1,5): tmpmat=cdata.getFieldUniqueValues(data,"organictype"+str(matno)) for material in tmpmat: if material not in cmaterials2: cmaterials2.append(material) cmaterial2count={} cmaterial2=[] cmaterial2total=0 # get the number of rows with given material value for material in cmaterials2: # get the count - go through all possible corrosion points from 1-4 # count only those that have corrfoundN = Yes for matno in range(1,5): if material not in cmaterial2count: cmaterial2count[material]=0 cmaterial2count[material]=cmaterial2count[material]+cdata.getFieldCount(data, "organictype"+str(matno), [material], True, False, "corrfound"+str(matno), ["Yes"]) # increment total of materials found cmaterial2total=cmaterial2total + cmaterial2count[material] # add data to a usable format entryname="Organic material type" entry={} entry[entryname]=material entry["Count"]=cmaterial2count[material] cmaterial2.append(entry) # go through and add percentage mat2per=0 if cmaterial2total > 0: mat2per=100 / cmaterial2total for item in cmaterial2: item["Percentage"]=round(mat2per*item["Count"],1) # generate the graph from the data gcmaterial2=graph.generateDonutChart(cmaterial2, "Percentage", entryname, "Organic materials involved in contact corrosion") return gcmaterial2
[docs] def generateDatings(gv) -> str: graph=gv["graph"] data=gv["data"] cdata=gv["cdata"] # show datings of instruments # get unique ranges ranges=cdata.getFieldUniqueValues(data,"proddate") # sort ranges ranges=sort_human(ranges) # move "after 2001" so that it appears on top of vertical axis if "after 2001" in ranges: ranges.remove("after 2001") ranges.append("after 2001") # go through each range list and count occurrence and add to data dating=[] datingtotal=0 periodmax=0 for period in ranges: count=cdata.getFieldCount(data,"proddate",period) datingtotal=datingtotal+count dating.append({"Dating": period, "Count": count}) if count > periodmax: periodmax=count # go through and add percentage datingper=0 if datingtotal > 0: datingper=100 / datingtotal for item in dating: item["Investigated instruments (%)"]=round(datingper*item["Count"],1) # set a rangemax that is not too big rangemax=60 if periodmax < 60 else periodmax # generate datings graph gdating=graph.generateHorizontalBarChart(dating, "Investigated instruments (%)", "Dating", "Date of production", None, "Dating", ["Count","Investigated instruments (%)"], None, False, None, [0,rangemax]) return gdating
[docs] def generateDependenceOfCorrosionToContactPoint(gv) -> str: graph=gv["graph"] pertotal=gv["pertotal"] data=gv["data"] cdata=gv["cdata"] # generate dependenance of corrosion to contact point graph corrcount={} for category in ["metalcorrindcp","earlierwaterdam"]: # add counted statistics to the data set count=cdata.getFieldCount(data,category,"yes",None,True) corrcount[category]=count # store percentage in each category corrodata=[] for category in ["metalcorrindcp","earlierwaterdam"]: # calculate and add percentage corrodata.append({"Category":str(corrdb.COLUMNS[category]["name"]).capitalize(), "Count":corrcount[category], "Investigated instruments (%)":round(pertotal*corrcount[category],1)}) # generate graph gcorr=graph.generateHorizontalBarChart(corrodata, "Investigated instruments (%)", "Category", "Earlier water damage", "Category", "Category", ["Investigated instruments (%)","Count"], None, False, None, [0,100]) return gcorr
[docs] def generateProvenienceOrigin(gv) -> str: graph=gv["graph"] pertotal=gv["pertotal"] data=gv["data"] cdata=gv["cdata"] # generate provenience/origin graph # define unique geographical areas regions=["Asia","Africa","North America","South America","Europe","Antarctica","Australia","unknown"] #regions=cdata.getFieldUniqueValues(data,"prodplace") regions=sort_human(regions) regioncount=[] for region in regions: count=cdata.getFieldCount(data,"prodplace",region) regioncount.append({ "Continent":region, "Investigated instruments (%)":round(pertotal*count,1), "Count": count, }) # generate the barchart of origins gregion=graph.generateHorizontalBarChart(regioncount, "Investigated instruments (%)", "Continent", "Place of production", None, "Continent", ["Investigated instruments (%)","Count"], None, False) return gregion
[docs] def generateOrganicMaterialToMetalTypeContactCorrosionFrequency(gv) -> str: graph=gv["graph"] data=gv["data"] cdata=gv["cdata"] # generate bubble chart for contact points between metal and organic materials matincontact={} metals=[] organics=[] incontactcount=0 # go through all data and harvest combination points for rowno in data: # check for all contact points possible for no in range(1,5): # skip this data point if no contact corrosion was found if str(data[rowno]["cols"]["corrfound"+str(no)]["value"]).lower() != "yes": continue # get the values to work with metal=str(data[rowno]["cols"]["metaltype"+str(no)]["value"]).strip() organic=str(data[rowno]["cols"]["organictype"+str(no)]["value"]).strip() # we need values on both to proceed if metal == "" or organic =="": continue # save the unique color and organic types if metal not in metals and len(metals) < 2: metals.append(metal) if organic not in organics and len(organics) < 2: organics.append(organic) # create dict with combination counter information if metal not in matincontact: matincontact[metal]={} if organic not in matincontact[metal]: matincontact[metal][organic]=0 # increase count for this combination matincontact[metal][organic]=matincontact[metal][organic]+1 # add to total of contact points incontactcount=incontactcount+1 # add blank metal if less than 2 if len(metals) == 0: metals.append("") metals.append(" ") if len(metals) == 1: metals.append("") # create percentage for cp incontactper=0 if incontactcount > 0: incontactper=100 / incontactcount # make dataset with the combination points and their occurrence combinations=[] for metal in matincontact: for organic in matincontact[metal]: combinations.append({ "Metal Type": metal, "Organic Material": organic, "Count": matincontact[metal][organic], "Percentage": round(incontactper*matincontact[metal][organic],1), }) gcombinations="" if len(combinations) > 0: gcombinations=graph.generateBubbleChart(combinations, "Organic Material", "Metal Type", "Frequency of materials involved in contact corrosion", "Percentage", "Organic Material", organics, metals, "Percentage", "Percentage", False, 60, None, ["Metal Type","Organic Material","Count","Percentage"], False) if gcombinations is None: print ("ERROR:",graph.error()) return gcombinations
[docs] def generateMetalTypeToMetalColor(gv) -> str: graph=gv["graph"] data=gv["data"] cdata=gv["cdata"] # generate bubble chart for contact points between metal and corrotion color matincontact={} metals=[] colors=[] colormap={ "Brown" : "rgba(186,74,0,0.6)", "Orange" : "rgba(243,156,18,0.6)", "Red" : "rgba(255,0,0,0.6)", "Black" : "rgba(0,0,0,0.6)", "Grey" : "rgba(128,128,128,0.6)", "Gray" : "rgba(128,128,128,0.6)", "Green" : "rgba(0,128,0,0.6)", "White" : "rgba(255,255,255,0.6)", } colorcount={} # go through all data and harvest combination points for rowno in data: # check for all contact points possible for no in range(1,5): # skip this data point if no contact corrosion was found if str(data[rowno]["cols"]["corrfound"+str(no)]["value"]).lower() != "yes": continue # get the values to work with metal=str(data[rowno]["cols"]["metaltype"+str(no)]["value"]).strip() corrcolor=str(data[rowno]["cols"]["metalcolor"+str(no)]["value"]).strip() # we need values on both to proceed if metal == "" or corrcolor =="": continue # save the unique color and metal types if metal not in metals and len(metals) < 2: metals.append(metal) if corrcolor not in colors and len(colors) < 2: colors.append(corrcolor) # create dict with combination counter information if metal not in matincontact: matincontact[metal]={} if corrcolor not in matincontact[metal]: matincontact[metal][corrcolor]=0 if metal not in colorcount: colorcount[metal]=0 # increase count for this combination matincontact[metal][corrcolor]=matincontact[metal][corrcolor]+1 # increase overall metal counter colorcount[metal]=colorcount[metal]+1 # add blank metal if less than 2 if len(metals) == 0: metals.append("") metals.append(" ") if len(metals) == 1: metals.append("") # make dataset with the combination points and their occurrence color2metal=[] for metal in matincontact: # calculate percentage for this metal mper=0 if colorcount[metal] > 0: mper=100 / colorcount[metal] for corrcolor in matincontact[metal]: color2metal.append({ "Metal Type": metal, "Corrosion Color": corrcolor, "Count": matincontact[metal][corrcolor], "Percentage": round(mper*matincontact[metal][corrcolor],1), }) # save color2metal for later use gv["data_color2metal"]=color2metal gcolor2metal="" if len(color2metal) > 0: gcolor2metal=graph.generateBubbleChart(color2metal, "Corrosion Color", "Metal Type", "Colour of corrosion on specific metal types", "Count", "Corrosion Color", colors, metals, "Count", "Count", False, 60, colormap, ["Metal Type","Corrosion Color","Count","Percentage"], False) if gcolor2metal is None: print ("ERROR:",graph.error()) return gcolor2metal
[docs] def generateMetalTypeToMetalColorBarChart(gv) -> str: graph=gv["graph"] color2metal=gv["data_color2metal"] # generate the same metal corrosion to color information as a barchart - first define the colormap # all colors opaque colormap={ "Brown" : "rgba(186,74,0,1)", "Orange" : "rgba(243,156,18,1)", "Red" : "rgba(255,0,0,1)", "Black" : "rgba(0,0,0,1)", "Grey" : "rgba(128,128,128,1)", "Gray" : "rgba(128,128,128,1)", "Green" : "rgba(0,128,0,1)", "White" : "rgba(255,255,255,1)", } # now, generate the graph gcolor2metalbc=graph.generateHorizontalBarChart(color2metal, "Percentage", "Metal Type", "Metal Type To Corrosion Color Frequency", "Corrosion Color", "Corrosion Color", ["Corrosion Color","Metal Type", "Percentage","Count"], colormap) return gcolor2metalbc
[docs] def generateSignsOfDegredation(gv) -> str: graph=gv["graph"] data=gv["data"] cdata=gv["cdata"] # make signs of degradation graph degdict={}; mattypes=["metal","organic"] categories=["discolor","surfalter","structdam","nochange"] names={} names["metal"]="Metal" names["organic"]="Organic material" names["discolor"]="Discolouration" names["surfalter"]="Surface alteration" names["structdam"]="Structural damage" names["nochange"]="No change" catnames=[] for category in categories: catnames.append(names[category]) degtotal={} # go through all data and harvest for rowno in data: # check for all contact points possible for no in range(1,5): # skip this data point if no contact corrosion was found if str(data[rowno]["cols"]["corrfound"+str(no)]["value"]).lower() != "yes": continue # go through both material types and categories within the types # and check if instrument has any changes in those categories - if so count it for material in mattypes: # init material type dict if not already there if material not in degdict: degdict[material]={} if material not in degtotal: degtotal[material]=0 for category in categories: # init category count if not already set if category not in degdict[material]: degdict[material][category]=0 # get value of given category value=str(data[rowno]["cols"][material+category+str(no)]["value"]).strip() # if value for category is 1, add it to the count we already have if value == "1": # add another count to this category of degradation degdict[material][category]=degdict[material][category]+1 # count overall degradation count for this material type degtotal[material]=degtotal[material]+1 # convert totals to percentage and avoid div by zero in so doing for material in mattypes: if material in degtotal and degtotal[material] > 0: degtotal[material]=100 / degtotal[material] # now that we have the count for all material and its categories, make data array degdata=[] for material in mattypes: for category in categories: # add this to the degdata degdata.append({ "Material": names[material], "Category": names[category], "Count": degdict[material][category], "Percentage": round(degtotal[material]*degdict[material][category],1) }) # generate the barchart of signs of degradation gsigndegred=graph.generateHorizontalBarChart(degdata, "Percentage", "Material", "Signs of degradation", "Category", "Category", ["Material","Percentage","Count"], None, False) return gsigndegred
[docs] def generateConditionOfContactPoint(gv) -> str: graph=gv["graph"] data=gv["data"] # create condition of cp graph condlist=[0,0,0,0,0] condcount=0 # go through all data and harvest for rowno in data: # check for all contact points possible for no in range(1,5): # skip this data point if no contact corrosion was found if str(data[rowno]["cols"]["corrfound"+str(no)]["value"]).lower() != "yes": continue # get the condition of cp value value=data[rowno]["cols"]["conditioncp"+str(no)]["value"] # add value is within 1 - 5 and position in array corresponds to the value 1 - 5 of the rating if value and int(value) > 0 and int(value) < 6: condlist[value-1]=condlist[value-1]+1 condcount=condcount+1 # create the list to give to the graph library conddata=[] condpercent=0 if condcount > 0: condpercent=100 / condcount for rating in reversed(range(1,6)): conddata.append({ "Rating": rating, "Percentage": round(condpercent*condlist[rating-1],1), "Count": condlist[rating-1] }) # generate the condition of cp graph itself gcondcp=graph.generateBarChart(conddata, "Rating", "Percentage", "The condition of the materials at the contact point - Ratings", "v", None, "Rating", ["Rating","Percentage","Count"], None, False) return gcondcp
[docs] def generateEarlierExchangeOrTreatmentOfInstrument(gv) -> str: graph=gv["graph"] data=gv["data"] # make earlier part- and treatment exchange graph exdict={}; extypes=["metal","organic"] excat=["earlierex","earliertreat","earliertreatnov"] exnames={} exnames["metal"]="Metal" exnames["organic"]="Organic material" exnames["earlierex"]="Earlier exchange of part" exnames["earliertreat"]="Earlier treatment" exnames["earliertreatnov"]="No earlier treatment visible" excatnames=[] for category in excat: excatnames.append(exnames[category]) extotal={} # go through all data and harvest for rowno in data: # check for all contact points possible for material in extypes: # init material type dict if not already there if material not in exdict: exdict[material]={} if material not in extotal: extotal[material]=0 # go throuch each category for each material for category in excat: # init category count if not already set if category not in exdict[material]: exdict[material][category]=0 # get value of given category value=str(data[rowno]["cols"][material+category]["value"]).strip() # if value for category is 1, add it to the count we already have if value == "1": # add another count to this category of exchange/treatment exdict[material][category]=exdict[material][category]+1 # count overall exchange/treatment count for this material type extotal[material]=extotal[material]+1 # convert totals to percentage and avoid div by zero in so doing for material in extypes: if extotal[material] > 0: extotal[material]=100 / extotal[material] # now that we have the count for all material and its categories, make data array exdata=[] for material in extypes: for category in excat: # add this to the degdata exdata.append({ "Material": exnames[material], "Category": exnames[category], "Count": exdict[material][category], "Percentage": round(extotal[material]*exdict[material][category]) }) # generate the barchart of earlier exchange/treatment gexchange=graph.generateHorizontalBarChart(exdata, "Percentage", "Material", "Earlier treatments", "Category", "Category", ["Material","Percentage","Count"]) return gexchange
[docs] def generateRelativeHumidity(gv) -> str: graph=gv["graph"] data=gv["data"] pertotal=gv["pertotal"] # create relative humidity graph # create rh-list with 11 datapoints which corresponds to the # the division of 100% into 10 equal sub-value ranges (100% being a datapoint of its own) rhlist=[0,0,0,0,0,0,0,0,0,0,0] # go through each row and collect data for rowno in data: # get the rh value for this row value=str(data[rowno]["cols"]["rh"]["value"]).strip() value=int(value) if value >= 0 and value <= 100: # locate which subrange value belongs to subrange=math.floor(value / 10) # increase count in given subrange rhlist[subrange]=rhlist[subrange]+1 # create the data for the graph itself rhdata=[] for subrange in range(0,11): # add the value for this subrange if subrange == 10: rangetxt="100" else: rangetxt=str(subrange*10)+"-"+str((subrange*10)+9) rhdata.append({ "Relative humidity (%)": rangetxt, "Investigated instruments (%)": round(pertotal*rhlist[subrange],1), "Count": rhlist[subrange] }) # generate the barchart of earlier exchange/treatment grh=graph.generateHorizontalBarChart(rhdata, "Investigated instruments (%)", "Relative humidity (%)", "The environmental situation", "Relative humidity (%)", "Relative humidity (%)", ["Relative humidity (%)","Investigated instruments (%)","Count"], None, False, None, None, False) return grh
[docs] def generateFunctionOfMaterials(gv) -> str: graph=gv["graph"] data=gv["data"] cdata=gv["cdata"] # generate the function of the materials graph funcmat={} locations=[] loccount={} # go through all data and harvest combination points for rowno in data: # check for all contact points possible for no in range(1,5): # skip this data point if no contact corrosion was found if str(data[rowno]["cols"]["corrfound"+str(no)]["value"]).lower() != "yes": continue for material in ["metal","organic"]: # get the values to work with location=str(data[rowno]["cols"][material+"loc"+str(no)]["value"]).strip() # we need value to proceed if location == "": continue # save the unique locations if location not in locations: locations.append(location) # create dict with combination counter information if material not in funcmat: funcmat[material]={} if location not in funcmat[material]: funcmat[material][location]=0 if material not in loccount: loccount[material]=0 # increase count for this combination funcmat[material][location]=funcmat[material][location]+1 # increase overall location counter for a given material loccount[material]=loccount[material]+1 # make dataset mat2loc=[] for material in funcmat: # calculate percentage for this material mper=0 if loccount[material] > 0: mper=100 / loccount[material] for location in locations: # set initial count to zero, so we have value even if location is not # available in the material count=0 if location in funcmat[material]: count=funcmat[material][location] materialstr="Metal" if material == "organic": materialstr="Organic material" mat2loc.append({ "Material Type": materialstr, "Location": location, "Count": count, "Function of materials (%)": round(mper*count,1), }) gmat2loc="" if len(mat2loc) > 0: # now, generate the graph gmat2loc=graph.generateHorizontalBarChart(mat2loc, "Function of materials (%)", "Material Type", "The function of the materials involved in contact corrosion", "Location", "Location", ["Location","Material Type", "Function of materials (%)","Count"]) if gmat2loc is None: print ("ERROR:",graph.error()) return gmat2loc
[docs] @route("/import") @auth_basic(is_authenticated) def import_data() -> str: """ Import data from files, into the DB This route requires authentication to be used. Args: None is accepted. Returns: str: The string with the rendered template code. """ return template("import")
[docs] @route("/import/upload", method="POST") @auth_basic(is_authenticated) def import_upload() -> str: """ Upload the file specified to a temporary file Args: None is accepted. Returns: str: Returns the rendered template for the route """ # connect to database cdb=connect() # some variables # get the parameters for the upload upload = request.files.get("upload") name, ext = os.path.splitext(upload.filename) if ext not in (".xlsx",".csv"): errormsg="This file does not seem to be an excel- or csv-file. Must have extension xlsx or csv." return template("error",error=True,errormsg=errormsg) # store the uploaded file to a temporary file before loading it tmp = NamedTemporaryFile(suffix = ext) tmp.seek(0) # save the uploaded file to the temporary file upload.save(tmp) # instantiate CorrInOut cio=corrinout.CorrInOut() err=False errormsg="" if str(ext).upper() == ".XLSX": # attempt to open as excel-file data = cio.import_excel_file(tmp) if type(data) is None: err=True errormsg=cio.error() elif str(ext).upper() == ".CSV": # attempt to open as csv-file when excel-file failed data = cio.import_csv_file(tmp) if type(data) is None: err=True errormsg=cio.error() # close tmp-file tmp.close() # check the data content # tag repeated/already existing data, invalid etc. data = cdb.check_data(data) # get number of keys in data rowcount=len(data.keys()) # get number of invalid keys invalidcount=0 for row in data.keys(): if data[row]["invalid"]: invalidcount=invalidcount+1 # invoke template for page and feed it with the import data return template("upload",data=data,columns=corrdb.COLUMNS,colorder=corrdb.COLORDER,\ rowcount=rowcount,invalidcount=invalidcount,error=err,errormsg=errormsg)
[docs] @route("/import/update", method="POST") @auth_basic(is_authenticated) def import_update() -> str: """ Update the database with the rows that was decided to be imported. Args: None is accepted. Returns: str: The rendered template for the update-process. """ # connect to database cdb=connect() # get data structure - convert through literal eval data=ast.literal_eval(request.forms.data) rows=len(data.keys()) # go through each row for rowno in range(1,rows+1): # attempt to get row choice data oper=request.forms.get('choices_'+str(rowno)) or "" if oper == "": continue # we got a operation choice - tag it if replace, delete, skip if oper == "replace": data[rowno]["replace"]=True if oper == "delete": data[rowno]["delete"]=True if oper == "skip": data[rowno]["skip"]=True # lets write data into database updret=cdb.update(data) # get return result ret=updret[0] count=0 # get number of rows imported to db if update was successful if ret: count=updret[1] return template("update",data=data,count=count,error=not ret,errormsg=cdb.error())
[docs] @route("/export/<path:re:excel|csv>/all", method=["GET","POST"]) @auth_basic(is_authenticated) def export_data_all(path): """ Export all data in database This route requires authentication Args: path (str): The path specifies which format to export to (csv or excel). Returns: HTTPResponse: The created file of exported data. HTTPError upon failure. """ # this path requires auth, and if auth, # we set that all fields are to be exported return export_data(path,True)
[docs] @route("/export/<path:re:excel|csv>", method=["GET","POST"]) def export_data(path,all=False): """ Export data in database to excel- or csv-format Args: path (str): The path specifies which format to export to (csv or excel). all (bool): Specifies if all (including protected data) is to be exported or not. This option is not available through the immediate route above, but must be called through the export_data_all()-function. Returns: HTTPResponse: The created file of exported data. HTTPError upon failure. """ # connect to database cdb=connect() # create file extension to use ext="xlsx" if path == "excel" else "csv" # get delimiter delimiter=";" if "delimiter" not in request.forms else request.forms["delimiter"] # get logical operator lop="AND" if "lop" not in request.forms else request.forms["lop"] # set export filename outname="corrosion_export" if "filename" not in request.forms else request.forms.filename outname=outname+"."+ext # check if we have received any search search={} for field in corrdb.COLORDER: if field in request.forms: value=str(request.forms[field]).strip() if value is not None and value !="": search[field]=value # attempt to search for the data data=cdb.search(search,lop,all) if data is None: # failed to search database - report error return template("error",error=True,errormsg=cdb.error()) # lets ask for an export cio=corrinout.CorrInOut() # generate data and return the tmp-file instance tmp=None if path == "excel": # generate excel-file tmp=cio.export2excel(data,all) else: # generate csv-file tmp=cio.export2csv(data,delimiter,all) # get the name part of the full path and filename name=os.path.basename(tmp.name) # return the generated file return static_file(name,root=gettempdir(),download=outname)
[docs] @route('/logout', method=["GET", "POST"]) @auth_basic(is_authenticated) def logout(): """ Logout the admin user of the corrosion web site Have to already be authenticated before this function can run. Args: None is accepted. Returns: HTTPResponse: 401 error message with successful logout... """ abort(401, "Logged out successfully...")
if ((__name__ == '__main__') and ("CORROSION_PATH" not in os.environ)): run(host='localhost', port=8080, debug=True, reloader=True)