from warnings import warn
from functools import partial
from itertools import product
import numpy as np
from scipy import optimize
import matplotlib.pyplot as plt
from scipy.stats import vonmises
from numdifftools import Gradient, Hessian
from ci_rvm import find_CI_bound
[docs]class BaseTrafficDensityDayTime():
[docs] def pdf(self, t):
raise NotImplementedError()
[docs] def interval_probability(self, tStart=None, tEnd=None):
raise NotImplementedError()
[docs] def plot(self, show=True, scalingFactor=1):
raise NotImplementedError()
[docs] def neg_log_likelihood(self, coefficients, obsTime, shiftStart, shiftEnd):
return (- np.sum(np.log(self.pdf(obsTime, coefficients)))
+ np.sum(np.log(self.interval_probability(shiftStart, shiftEnd,
[docs] def maximize_likelihood(self, obsTime, shiftStart, shiftEnd, x0,
eqConstr = (lambda coeff:
optResult = optimize.minimize(self.neg_log_likelihood, x0,
(obsTime, shiftStart, shiftEnd),
options={'disp': False, 'ftol': 1e-08}
if not optResult.success:
warn("Optimization was not successful")
self.coefficients = optResult.x
self.AIC = 2*(optResult.fun + len(self.coefficients))
print("AIC:", self.AIC)
return optResult
[docs]class TrafficDensityDayTime_PLinear(BaseTrafficDensityDayTime):
def __init__(self, tMin, tMax, intervalNumber, coefficients=None):
if tMin >= tMax:
raise ValueError("tMax-tMin must be positive.")
self.tMin = tMin
self.tMax = tMax
self.intervalNumber = intervalNumber
self.intervalWidth = (tMax - tMin) / intervalNumber
if coefficients is not None:
if not len(coefficients) == intervalNumber+1:
raise ValueError("The coefficient array must match the number "
+ "of intervals.")
self.coefficients = np.array(coefficients)
[docs] def pdf(self, t, coefficients=None):
if coefficients is None:
coefficients = self.coefficients
if coefficients is None:
raise ValueError("No coefficients are given.")
elif type(coefficients) is not np.ndarray and hasattr(t, "__iter__"):
coefficients = np.array(coefficients)
# Add: Check whether t is in [tMin, tMax]
intervalWidth = self.intervalWidth
t = t-self.tMin
if np.any(t < 0):
raise RuntimeError("Some observation occurred outside the shifts.")
intervalIndex = (t // intervalWidth).astype(int)
modTime = t % intervalWidth
return (coefficients[list(intervalIndex)] * (1 - modTime)
+ coefficients[list(intervalIndex+1)] * modTime)
[docs] def interval_probability(self, tStart=None, tEnd=None, coefficients=None):
if coefficients is None:
coefficients = self.coefficients
if coefficients is None:
raise ValueError("No coefficients are given.")
elif type(coefficients) is not np.ndarray:
coefficients = np.array(coefficients)
tMin, tMax = self.tMin, self.tMax
if tStart is None:
tStart = tMin
if tEnd is None:
tEnd = tMax
width = self.intervalWidth
lowerParts = np.minimum(coefficients[:-1], coefficients[1:])
differences = coefficients[1:] - coefficients[:-1]
upperParts = np.abs(differences) / 2
partIntegrals = (lowerParts + upperParts) * width
if tStart is tMin and tEnd is tMax:
return np.sum(partIntegrals)
startIndex = ((tStart-tMin) // width).astype(int)
endIndex = ((tEnd-tMin) // width).astype(int)
if not hasattr(tStart, "__iter__"):
startIndex = np.array([startIndex])
if not hasattr(tEnd, "__iter__"):
endIndex = np.array([endIndex])
r = np.arange(len(partIntegrals))
mask = (startIndex[:,None] <= r) & (endIndex[:,None] > r)
integral = np.einsum('j,ij->i', partIntegrals, mask)
startModTime = (tStart-tMin) % width
endModTime = (tEnd-tMin) % width
differences = np.append(differences, 0)
integral += (
((0.5/width) * differences[startIndex]
* startModTime + coefficients[startIndex])
* (-startModTime)
+ ((0.5/width) * differences[endIndex]
* endModTime + coefficients[endIndex])
* endModTime
return integral
[docs] def maximize_likelihood(self, obsTime, shiftStart, shiftEnd):
x0 = np.ones(self.intervalNumber+1) / (self.tMax - self.tMin)
ineqConstr = lambda coeff: coeff
return super().maximize_likelihood(obsTime, shiftStart, shiftEnd, x0,
[docs] def plot(self, show=True, scalingFactor=1):
times = np.linspace(self.tMin, self.tMax, self.intervalNumber+1)
if self.coefficients is None:
raise AttributeError("self.coefficients must be defined to allow "
+ "plotting.")
plt.plot(times, self.coefficients*scalingFactor, label="Distribution with "
+ str(self.intervalNumber) + " intervals")
plt.xlabel("Day-Time [h]")
plt.ylabel("Traffic Density")
plt.legend(loc=2, frameon=False)
if show: plt.show()
[docs]class TrafficDensityDayTime_StepFun(BaseTrafficDensityDayTime):
def __init__(self, tMin, tMax, intervalNumber, coefficients=None):
if tMin >= tMax:
raise ValueError("tMax-tMin must be positive.")
self.tMin = tMin
self.tMax = tMax
self.intervalNumber = intervalNumber
self.intervalWidth = (tMax - tMin) / intervalNumber
if coefficients is not None:
if not len(coefficients) == intervalNumber:
raise ValueError("The coefficient array must match the number "
+ "of intervals.")
self.coefficients = np.array(coefficients)
[docs] def pdf(self, t, coefficients=None):
if coefficients is None:
coefficients = self.coefficients
if coefficients is None:
raise ValueError("No coefficients are given.")
elif type(coefficients) is not np.ndarray and hasattr(t, "__iter__"):
coefficients = np.array(coefficients)
# Add: Check whether t is in [tMin, tMax]
intervalWidth = self.intervalWidth
t = t-self.tMin
if np.any(t < 0):
raise RuntimeError("Some observation occurred outside the shifts.")
intervalIndex = (t // intervalWidth).astype(int)
return coefficients[list(intervalIndex)]
[docs] def interval_probability(self, tStart=None, tEnd=None, coefficients=None):
if coefficients is None:
coefficients = self.coefficients
if coefficients is None:
raise ValueError("No coefficients are given.")
elif type(coefficients) is not np.ndarray:
coefficients = np.array(coefficients)
tMin, tMax = self.tMin, self.tMax
if tStart is None:
tStart = tMin
if tEnd is None:
tEnd = tMax
tStart = np.maximum(tStart, tMin)
tEnd = np.minimum(tEnd, tMax)
width = self.intervalWidth
coefficients = np.append(coefficients, 0)
cumulativeDensity = np.cumsum(coefficients) * width
tStart -= tMin
tEnd -= tMin
restWidthStart = tStart % width
restWidthEnd = tEnd % width
startIndex = (tStart // width).astype(int)
endIndex = (tEnd // width).astype(int)
if not hasattr(tStart, "__iter__"):
startIndex = np.array([startIndex])
if not hasattr(tEnd, "__iter__"):
endIndex = np.array([endIndex])
integral = cumulativeDensity[endIndex-1] - cumulativeDensity[startIndex]
integral += (width-restWidthStart)*coefficients[startIndex]
integral += (restWidthEnd)*coefficients[endIndex]
return integral
[docs] def maximize_likelihood(self, obsTime, shiftStart, shiftEnd):
x0 = np.ones(self.intervalNumber) / (self.tMax - self.tMin)
ineqConstr = lambda coeff: coeff
return super().maximize_likelihood(obsTime, shiftStart, shiftEnd, x0,
[docs] def plot(self, show=True, scalingFactor=1):
times = np.insert(np.linspace(self.tMin, self.tMax,
0, self.tMin)
if self.coefficients is None:
raise AttributeError("self.coefficients must be defined to allow "
+ "plotting.")
plt.step(times, np.hstack([0, self.coefficients, 0])*scalingFactor,
label="Distribution with "
+ str(self.intervalNumber) + " intervals",
plt.xlabel("Day-Time [h]")
plt.ylabel("Traffic Density")
plt.legend(loc=2, frameon=False)
if show: plt.show()
[docs]class TrafficDensityVonMises(BaseTrafficDensityDayTime):
def __init__(self, location=None, kappa=None):
self.location = location
self.kappa = kappa
[docs] def interval_probability(self, tStart, tEnd, location=None,
if location is None:
location = self.location
if location is None:
raise ValueError("No mean is given.")
if kappa is None:
kappa = self.kappa
if kappa is None:
raise ValueError("No variance is given.")
return (vonmises.cdf(tEnd, kappa, location, 12/np.pi)
- vonmises.cdf(tStart, kappa, location, 12/np.pi))
[docs] def neg_log_likelihood(self, coeff, obsTime, shiftStart, shiftEnd):
location, kappa = np.abs(coeff)
return (- np.sum(vonmises.logpdf(obsTime, kappa, location,
+ np.sum(np.log(self.interval_probability(shiftStart, shiftEnd,
location, kappa))))
[docs] def maximize_likelihood(self, obsTime, shiftStart, shiftEnd, getCI=False):
x0 = np.array((12., 1.))
if not hasattr(obsTime, "__iter__") or len(np.unique(obsTime)) == 1:
optResult = optimize.OptimizeResult()
optResult.x = np.array([obsTime, 1000])
optResult.fun = -np.inf
optResult.success = True
elif not hasattr(obsTime, "__iter__") and len(np.unique(obsTime)) == 0:
optResult = optimize.OptimizeResult()
optResult.x = x0
optResult.fun = np.nan
optResult.success = True
ineqConstr1 = lambda coeff: coeff
ineqConstr2 = lambda coeff: 24-coeff[0]
fun = partial(self.neg_log_likelihood,
obsTime=obsTime, shiftStart=shiftStart,
optResult = optimize.differential_evolution(
[(0,24), (0, 10)],
optResult = optimize.minimize(fun, x0,
options={'disp': False, 'ftol': 1e-08}
if getCI:
x0 = optResult.x
ffun = lambda x: -fun(x)
jac, hess = Gradient(ffun), Hessian(ffun)
dim = len(x0)
CIs = np.zeros((dim, 2))
fun0 = -optResult.fun
hess0 = hess(x0)
for i, j in product(range(dim), range(2)):
direction = j*2-1
op_result = find_CI_bound(x0, ffun, i, direction, jac, hess,
fun0=fun0, hess0=hess0)
CIs[i, j] = op_result.x[i]
print("Confidence intervals:")
print("Optimization result:")
if optResult.fun < 0:
self.neg_log_likelihood(optResult.x, obsTime, shiftStart, shiftEnd)
if not optResult.success:
raise RuntimeError("Optimization was not successful")
self.location, self.kappa = optResult.x
self.AIC = 2*(optResult.fun + 2)
self.negLL = optResult.fun
print("AIC:", self.AIC)
return optResult
[docs] def plot(self, normInterval=None, show=True, fileName=None):
if normInterval is None:
times = np.linspace(0, 24, 5000)
normFact = 1
times = np.linspace(*normInterval, 5000)
normFact = 1/self.interval_probability(*normInterval)
if self.location is None or self.kappa is None:
raise AttributeError("location and kappa must be defined to allow "
+ "plotting.")
plt.plot(times, self.pdf(times)*normFact,
label="Von Mises Distribution")
plt.xlabel("Day-Time [h]")
plt.ylabel("Traffic Density")
plt.legend(loc=2, frameon=False)
if fileName is not None:
plt.savefig(fileName + ".png", dpi=1000)
plt.savefig(fileName + ".pdf")
if show: plt.show()
[docs] def pdf(self, t):
return vonmises.pdf(t, self.kappa, self.location, 12/np.pi)
[docs] def sample(self, n=None):
return np.random.vonmises((self.location-12)*np.pi/12, self.kappa,
n)*(12/np.pi) + 12
[docs]def cropData(data, left, right):
if right <= left:
raise ValueError("It must be right > left!")
croppedData = data + 0
considered = np.ones(data.shape[0], bool)
for i, row in enumerate(data):
if row[2] < left or row[2] > right:
considered[i] = False
croppedData[i, 0] = np.maximum(left, croppedData[i, 0])
croppedData[i, 1] = np.minimum(right, croppedData[i, 1])
return croppedData[considered]
[docs]def readTimeData(fileName, delimiter=",", missing_values="", crop=None):
data = np.genfromtxt(fileName, float, delimiter=delimiter, skip_header=True) #, missing_values=missing_values
if not crop == None:
data = cropData(data, *crop)
data = np.append(data, np.zeros((1,data.shape[1])), axis=0)
considered = np.ones(data.shape[0]-1, dtype=bool)
observationTimes = np.unique(data[:,:2][~np.isnan(data[:,:2])])
observationNumbers = np.zeros_like(observationTimes)
compare = data[0,:2]
index = 0
for i, row in enumerate(data):
if not (row[:2][np.logical_not(np.isnan(row[:2]))] == compare[np.logical_not(np.isnan(compare))]).all():
sStart, sEnd = row[:2]
noEnd = 0
if np.isnan(compare[0]):
minIndex = np.nanargmin(data[index:i, 2]) + index
data[index:i, 0] = data[minIndex, 2]
considered[minIndex] = False
sStart = data[minIndex, 2]
if np.isnan(compare[1]):
maxIndex = np.nanargmax(data[index:i, 2]) + index
data[index:i, 1] = data[maxIndex, 2]
considered[maxIndex] = False
sEnd = data[maxIndex, 2]
noEnd = 1
# Note how many shifts were running during each time of the day
observationNumbers[np.searchsorted(observationTimes, sStart):
np.searchsorted(observationTimes, sEnd)
-noEnd] += 1
except ValueError:
considered[index:i] = False
index = i
compare = row[:2]
data = data[:-1]
with np.errstate(invalid='ignore'):
considered &= (~np.isnan(data[:,2]) & (data[:,2] <= data[:,1])
& (data[:,2] >= data[:,0]) & (data[:,0] < data[:,1]))
return data[considered], observationTimes, observationNumbers
[docs]def fitEstimator(fileName, intervalNumber):
data, observationTimes, observationNumbers = readTimeData(fileName)
density = TrafficDensityDayTime_PLinear(np.min(data[:, 0]), np.max(data[:, 1]),
density.maximize_likelihood(data[:, 2], data[:, 0], data[:, 1])
return density
if __name__ == '__main__':
# This is the name of the file containing the data
fileName = "ExportBoaterTimes.csv"
# Read the data. They must be in the following format:
# [Shift Start],[Shift End],[Observation Time]
data, observationTimes, observationNumbers = readTimeData(fileName)
# Fit a von-Mises distribution
print("Von Mises distribution")
density2 = TrafficDensityVonMises()
density2.maximize_likelihood(data[:, 2], data[:, 0], data[:, 1])
# Fit a probability distribution without pre-defined shape.
# Use
# - TrafficDensityDayTime_StepFun for a distribution resembling a histrogram
# - TrafficDensityDayTime_PLinear for a piece-wise linear distribution
# These values determine how many intervals your histogram has.
# Feel free to add, remove, or cahnge the values. It must be positive
# integers, though. At least one number followed by a comma is required
start, end = np.min(data[:, 0]), np.max(data[:, 1])
for n in 5, 16:
print("Semi-parametric distribution with n =", n)
density = TrafficDensityDayTime_StepFun(start, end, n)
# Uncomment this, if you want a piecewise linear function instead
#density = TrafficDensityDayTime_PLinear(np.min(data[:, 0]), np.max(data[:, 1]), n)
density.maximize_likelihood(data[:, 2], data[:, 0], data[:, 1])
# Add this distribution to the final output plot, but allow to add
# further distributions later
density.plot(False, density2.interval_probability(start, end))
# Plot the von Mises distribution and finish and show the plot
# Create a plot showing how many shifts cover which time of the day
ax2 = plt.twinx()
ax2.step(observationTimes, observationNumbers, "k", label='Number of shifts')
ax2.set_ylabel("Number of Shifts")
plt.legend(loc=1, frameon=False)