'''
Created on 20.03.2020
@author: Samuel
'''
import os
import sys
import numpy as np
from matplotlib import pyplot as plt
from hybrid_vector_model.hybrid_vector_model import TransportNetwork, IDTYPE
from vemomoto_core.tools import saveobject
from vemomoto_core.tools.hrprint import HierarchichalPrinter
try:
from .test_graph import adjust_ticks
except ImportError:
from test_graph import adjust_ticks
if __name__ == '__main__':
from vemomoto_core.tools.tee import Tee
if len(sys.argv) > 1:
teeObject = Tee(sys.argv[1])
FILE_EXT = ".rrn"
SAVEEXT = ".roc"
FIGARGS = {"figsize":(4.4,4)}
TICKS = [0, 0.25, 0.5, 0.75, 1]
[docs]class RouteTester(TransportNetwork):
def __init__(self, fileName, fileNameEdges=None, fileNameVertices=None,
preprocessingArgs=None,
edgeLengthRandomization=.001,
**printerArgs):
self.fileNameSave = fileNameSave
self.result_dir = "REVC_results_" + fileName
self.figure_dir = "REVC_figures_" + fileName
self.fileName = fileName
HierarchichalPrinter.__init__(self)
if not fileNameEdges:
return
TransportNetwork.__init__(self, fileNameEdges, fileNameVertices,
destinationToDestination=False,
preprocessingArgs=preprocessingArgs,
edgeLengthRandomization=edgeLengthRandomization,
**printerArgs)
self.preprocessing()
self.extension = FILE_EXT
self.save()
[docs] def save(self, fileName=None):
if not hasattr(self, "vertices"):
return
if fileName is None:
fileName = self.fileName
self.lock = None
if fileName is not None:
self.prst("Saving the model as file", fileName + FILE_EXT)
self.save_object(fileName)
[docs] @staticmethod
def new(fileName, fileNameEdges, fileNameVertices, restart=False):
if restart:
return RouteTester(fileName, fileNameEdges, fileNameVertices)
try:
print("Loading file", fileName+FILE_EXT)
return saveobject.load_object(fileName+FILE_EXT)
except FileNotFoundError:
return RouteTester(fileName, None)
[docs] def test_empirical_validity(self, fileNameData, additionalStations=[],
level=0.02, stretchConstant=1.5, restart=False,
show=True, colorCycler=None):
baseFileName = "ROC-{}-{}".format(level, stretchConstant)
baseFileNameResults = "ROC-"
if self.result_dir:
if not os.access(self.result_dir, os.F_OK):
os.makedirs(self.result_dir)
fileName = os.path.join(self.result_dir, baseFileName)
FileNameResults = os.path.join(self.result_dir, baseFileNameResults)
else:
fileName = baseFileName
FileNameResults = baseFileNameResults
restartTmp = restart
if not restart:
try:
ROC = saveobject.load_object(fileName+SAVEEXT)
except FileNotFoundError:
restartTmp = True
if restartTmp:
ROC = self._get_empirical_validity(fileNameData, FileNameResults,
additionalStations, level,
stretchConstant, restart)
saveobject.save_object(ROC, fileName+SAVEEXT)
with open(fileName+".txt", "w") as file:
file.write(str(ROC.dtype.names) + "\n")
np.rec.array(ROC).tofile(file, "\n")
AUC = np.trapz(ROC["truePosRate"], ROC["falsePosRate"])
print("=+=+= beta: {:4.2f}; level: {:4.2f}; AUC: {:4.2f} =+=+=".format(
stretchConstant, level, AUC))
if colorCycler is None:
color = None
else:
color = colorCycler.__next__()
plt.plot(ROC["falsePosRate"], ROC["truePosRate"],
label=r'$\beta$={:3.1f} (area={:4.2f})'.format(stretchConstant, AUC),
color=color)
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.xlim([0, 1])
plt.ylim([0, 1])
if show:
plt.plot([0, 1], [0, 1], "--", color='k')
l = plt.legend(loc="lower right")
l.get_frame().set_linewidth(0.0)
if self.figure_dir:
if not os.access(self.figure_dir, os.F_OK):
os.makedirs(self.figure_dir)
fileName = os.path.join(self.figure_dir, baseFileName)
else:
fileName = baseFileName
plt.savefig(fileName + ".pdf")
if show:
plt.show()
def _get_empirical_validity(self, fileNameData, fileNameResults=None,
additionalStations=[],
level=0.02, stretchConstant=1.5,
restart=False):
self.prst("Testing the empirical validity of the algorithm.")
self.increase_print_level()
self.prst("level:", level)
self.prst("stretchConstant:", stretchConstant)
rawData = np.genfromtxt(fileNameData, delimiter=",",
skip_header = True,
dtype = {"names":["fromID", "toID",
"stationID", "count"],
"formats":[IDTYPE, IDTYPE, IDTYPE, float]},
autostrip = True)
data = np.zeros_like(rawData,
dtype={"names":["fromIndex", "toIndex",
"stationIndex", "count"],
"formats":[int, int, int, float]})
vertexIDToVertexIndex = {iD:i for i, iD
in enumerate(self.vertices.array["ID"])}
data["fromIndex"] = [vertexIDToVertexIndex[iD] for iD in rawData["fromID"]]
data["toIndex"] = [self.sinkIndexToVertexIndex[self.sinkIDToSinkIndex[iD]] for iD in rawData["toID"]]
data["stationIndex"] = [self.stationIDToStationIndex[iD] for iD in rawData["stationID"]]
data["count"] = rawData["count"]
pairs = {(fromIndex, toIndex) for fromIndex, toIndex in
data[["fromIndex", "toIndex"]]}
pairs = {pair:i for i, pair in enumerate(pairs)}
stations = {station:i for i, station in
enumerate(np.unique(data["stationIndex"]))}
stationIDs = np.zeros(len(stations)+len(additionalStations), dtype=IDTYPE)
for station, i in stations.items():
stationIDs[i] = self.stationIndexToStationID[station]
for station in additionalStations:
stationIDs[len(stations)] = station
stations[self.stationIDToStationIndex[station]] = len(stations)
coverageData = np.zeros(len(pairs), dtype=[("pair", "2int"),
("pairOD", "2int"),
("coverage",
str(len(stations))+"double")
])
origins = np.unique(data["fromIndex"])
destinations = np.unique(data["toIndex"])
vertexIndexToSourceIndex = {origin:i for i, origin in enumerate(origins)}
vertexIndexToSinkIndex = {destination:i for i, destination in enumerate(destinations)}
for pair, i in pairs.items():
coverageData[i]["pair"] = pair
coverageData[i]["pairOD"] = (vertexIndexToSourceIndex[pair[0]],
vertexIndexToSinkIndex[pair[1]])
for fromIndex, toIndex, stationIndex, count in data:
coverageData[pairs[fromIndex, toIndex]]["coverage"][stations[stationIndex]] = count
shortestDistances = None
locOptConsts = np.array([0.02, 0.03, 0.05, 0.07, 0.1, 0.13, 0.16, 0.2,
0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 1])[::-1]
result = np.zeros(locOptConsts.size+3, dtype=[("alpha", float),
("truePosRate", float),
("falsePosRate", float)])
result[0] = (1., 0., 0.)
result[-1] = (0., 1., 1.)
result[-2] = (0., 1., 1-np.mean(coverageData["coverage"] >= level))
observedCoverage = coverageData["coverage"] >= level
fileName = fileNameResults
np.savetxt(fileName+"obscov-{}.txt".format(level), observedCoverage, "%1d", delimiter=',')
np.savetxt(fileName+"stations.txt", stationIDs, "%11s", delimiter=',')
pairsIDs = np.zeros((len(pairs), 2), dtype=IDTYPE)
pairsIDs[:,0] = self.vertices.array["ID"][coverageData["pair"][:,0]]
pairsIDs[:,1] = self.vertices.array["ID"][coverageData["pair"][:,1]]
np.savetxt(fileName+"pairs.txt", pairsIDs, "%11s", delimiter=',')
#with open(fileName+"coverageData.txt", "w") as file:
# file.write(str(coverageData.dtype.names) + "\n")
# np.rec.array(coverageData).tofile(file, "\n")
positiveN = np.sum(observedCoverage)
negativeN = np.sum(~observedCoverage)
extCov = ".cov"
for i, locopt in enumerate(locOptConsts):
fileName = fileNameResults + "cov_{}_{}".format(stretchConstant,
locopt)
restartTmp = False
if not restart:
try:
coverage = saveobject.load_object(fileName+extCov)
except FileNotFoundError:
restartTmp = True
if restart or restartTmp:
if shortestDistances is None:
shortestDistances = self.find_shortest_distance_array(origins,
destinations)
_, inspectedRoutes, _ = self.find_locally_optimal_paths(origins,
destinations,
shortestDistances,
stretchConstant=stretchConstant,
localOptimalityConstant=locopt,
acceptionFactor=0.9,
rejectionFactor=1.1)
coverage = np.zeros_like(coverageData["coverage"], dtype=bool)
for j, pairOD in enumerate(coverageData["pairOD"]):
for station, k in stations.items():
coverage[j, k] = (station in inspectedRoutes and
tuple(pairOD) in inspectedRoutes[station])
saveobject.save_object(coverage, fileName+extCov)
np.savetxt(fileName+".txt", coverage, "%1d", delimiter=',')
i += 2
TPR = np.sum(observedCoverage & coverage) / positiveN
FPR = np.sum((~observedCoverage) & coverage) / negativeN
result[i] = locopt, TPR, FPR
print("=== alpha: {:4.2}; TPR: {:4.2}; FPR: {:4.2} ===".format(
locopt, TPR, FPR))
self.decrease_print_level()
return result