Source code for ci_rvm.test_ci_rvm

'''
Created on 18.09.2019

@author: Samuel
'''
import sys
from itertools import repeat, starmap
from multiprocessing import Pool
from collections import defaultdict
from functools import partial

import numpy as np
from scipy import optimize as op, integrate, sparse
from scipy.stats import chi2, nbinom

try:
    from matplotlib import pyplot as plt
except ModuleNotFoundError:
    raise ModuleNotFoundError("The package matplotlib must be installed to run the test.")

try:
    from numdifftools import Gradient, Hessian
    import autograd.numpy as ag
except ModuleNotFoundError:
    raise ModuleNotFoundError("This module requires numdifftools and autograd "
                              "for numerical and automatic differenciation. "
                              "The packages are available on the Python package"
                              "index.")

try:
    from .ci_rvm import find_CI_bound, CounterFun, find_CI as find_CI_new, find_function_CI
    from .__ci_rvm_mpinv import find_profile_CI_bound as find_profile_CI_bound_mpinv
except ImportError:
    from ci_rvm import find_CI_bound, CounterFun, find_CI as find_CI_new, find_function_CI
    from __ci_rvm_mpinv import find_profile_CI_bound as find_profile_CI_bound_mpinv

if __name__ == '__main__':
    try:
        # if vemomoto_core_tools is installed, you can call the module with an 
        # argument specifying a log file to which the output is written in addition
        # to the console.
        from vemomoto_core.tools.tee import Tee
        if len(sys.argv) > 1:
            teeObject = Tee(sys.argv[1])
    except ModuleNotFoundError:
        pass

np.random.seed()

CONVERT = True
ALPHA = 0.95
[docs]def convertPos(x, indices=None, convert=CONVERT): if not convert: return x x = x.copy() """ res = x[indices] cons = res > 50 if not cons.any(): res[:] = np.log(np.exp(res)+1) else: res2 = np.log(np.exp(res)+1) res2[cons] = res[cons] res = res2 x[indices] = res return x """ x[indices] = np.exp(x[indices]) return x
[docs]def convertPos_(x, convert=CONVERT): if not convert: return x return np.exp(x) #!!!!!!! if x <= 50: return np.log(np.exp(x)+1) else: return x
[docs]def convertPosInv(x, indices=None, convert=CONVERT): if not convert: return x x = x.copy() res = x[indices] """ cons = res > 50 res2 = np.log(np.exp(res)-1) if cons.any(): res2[cons] = res[cons] """ res2 = np.log(res) #!!!!!!!! res2[~np.isfinite(res2)] = -10 #000 x[indices] = res2 return x
[docs]def convertPosInv_(x, convert=CONVERT): if not convert: return x res = np.log(x) if not np.isfinite(res): res = -10 return res if not convert or x > 50: return x res = np.log(np.exp(x)-1) if not np.isfinite(res): res = -10000 return res
[docs]class TrackFun(): def __init__(self, fun): self.track = [] self.fun = fun def __call__(self, args): self.track.append(args.copy()) return self.fun(args)
[docs]def venzon_moolgavkar(index, direction, x0, fun, jac, hess, target=None, alpha=0.95, fun0=None, hess0=None, nmax=200, epsilon=1e-6, disp=True, track_x=False): if fun0 is None: fun0 = fun(x0) if target is None: diff = chi2.ppf(alpha, df=1)/2 target = fun0-diff step_scale = direction x_track = [] # TODO: handle singular matrices (check determinant, return NaN), extend algorithm to be more robust i = 0 try: # preparation if fun0 is None: fun0 = fun(x0) target_diff = fun0-target if hess0 is None: dl2_dx2_0 = hess(x0) else: dl2_dx2_0 = hess0 considered = np.ones_like(x0, dtype=bool) considered[index] = False hessConsidered = np.ix_(considered, considered) # test if False: f = lambda x: (np.square(fun(x)-target) + np.sum(np.square(jac(x)[considered]))) print(op.minimize(f, x0)) # x is the parameter vector # w is the vector of nuisance parameters (x without row "index") # b is the parameter of interest, which is x[index] # l is fun # dl_d* is the derivative of l w.r.t. * # dl2_d*2 is the second derivative of l w.r.t. * # choosing the first step x1 dl2_dw2 = dl2_dx2_0[hessConsidered] dl2_dbdw = dl2_dx2_0[index][considered] dl2_dw2_inv = np.linalg.inv(dl2_dw2) dw_db = -np.dot(dl2_dw2_inv, dl2_dbdw) factor = (np.sqrt(target_diff / (-2*(dl2_dx2_0[index, index] - np.dot(np.dot(dl2_dbdw, dl2_dw2_inv), dl2_dbdw)))) * step_scale) if np.isnan(factor): factor = 0.5 #raise np.linalg.LinAlgError() init_direction = np.zeros_like(x0) init_direction[considered] = dw_db init_direction[index] = 1 x = x0 + factor*init_direction # iteration for i in range(nmax): if track_x: x_track.append(x.copy()) l = fun(x) dl_dx_ = jac(x) violation = max(np.max(np.abs(dl_dx_[considered])), np.abs(l-target)) if disp: #print(x) print("iteration {}: fun_diff={}; jac_violation={}; direction={}".format( i, l-target, np.max(np.abs(dl_dx_[considered])), step_scale)) if violation < epsilon: break elif np.isnan(violation): raise np.linalg.LinAlgError() D = dl2_dx2 = hess(x) dl2_dx2_ = dl2_dx2.copy() dl2_dx2_[index] = dl_dx_ dl_dx_[index] = l-target dl2_dx2__inv = np.linalg.inv(dl2_dx2_) g = dl2_dx2__inv[:,index] v = np.dot(dl2_dx2__inv, dl_dx_) Dg = np.dot(D, g) vDg = np.dot(v, Dg) gDg = np.dot(g, Dg) vDv = np.dot(np.dot(v, D), v) p = 2 * (vDg-1) / gDg q = vDv / gDg _s = p*p/4 - q if _s >= 0: _s = np.sqrt(_s) s_ = - p/2 s_1 = s_ + _s s_2 = s_ - _s v_sg_1 = v + s_1*g v_sg_2 = v + s_2*g measure1 = np.abs(np.dot(np.dot(v_sg_1, dl2_dx2_0), v_sg_1)) measure2 = np.abs(np.dot(np.dot(v_sg_2, dl2_dx2_0), v_sg_2)) if measure1 < measure2: x -= v_sg_1 else: x -= v_sg_2 else: # TODO: do line search here! x -= 0.1 * v else: l = fun(x) dl_dx_ = jac(x) violation = max(np.max(np.abs(dl_dx_[considered])), np.abs(l-target)) return op.OptimizeResult(x=x, fun=l, jac=dl_dx_, violation=violation, success=False, status=1, nfev=nmax+1, njev=nmax+1, nhev=nmax, nit=nmax, x_track=np.array(x_track), message="Iteration limit exceeded" ) return op.OptimizeResult(x=x, fun=l, jac=jac(x), violation=violation, success=True, status=0, nfev=i, njev=i, nhev=i, nit=i, x_track=np.array(x_track), message="Success" ) except np.linalg.LinAlgError: return op.OptimizeResult(x=x0+np.nan, fun=np.nan, jac=x0+np.nan, violation=np.nan, success=False, status=2, nfev=i, njev=i, nhev=i, nit=i, x_track=np.array(x_track), message="Ill-conditioned matrix" )
[docs]def wald(x, hess, direction=None, alpha=ALPHA): for _ in [None]: diff = np.nan try: hinv = np.linalg.inv(hess) except Exception: message = "H is singular" break try: diff = np.sqrt(-np.diag(hinv) * chi2.ppf(alpha, df=1)) except Exception: message = "H is not negative definite" break else: message = "success" success = message == "success" if not direction: if not success: return None, None return x-diff, x+diff elif direction==1: res = x+diff else: res = x-diff return op.OptimizeResult(x=res, fun=np.nan, success=success, nfev=0, njev=0, nhev=1, nit=0, x_track=[], f_track=[], message=message )
[docs]def root(index, fun, x0, alpha=ALPHA, method="df-sane"): #method='df-sane'): x2 = np.delete(x0, index) fun0 = fun(x0) target = fun0-chi2.ppf(alpha, df=1)/2 def PL(xi): x = x2 fun_nuiscance = lambda xx: -fun(np.insert(xx, index, xi)) res = op.minimize(fun_nuiscance, x) x[:] = res.x print(xi, -res.fun-target) return -res.fun-target fullres = op.root(PL, x0[index], method=method) fullres.x = np.insert(x2, fullres.x, index)
[docs]def binsearch(index, direction, x0, fun, jac=None, alpha=ALPHA, initstep=1, resulttol=1e-3, infstep=1e10, stepn=200, checkNo=5): fun = CounterFun(fun) if jac is not None: jac = CounterFun(jac) jac_nuiscance = lambda x: -np.delete(jac(np.insert(x, index, xi)), index) else: jac_nuiscance = None xi = x0[index] + direction*initstep xi0 = x0[index] fun_nuiscance = lambda x: -fun(np.insert(x, index, xi)) fun0 = fun(x0) target = fun0-chi2.ppf(alpha, df=1)/2 xi_above = xi0 xi_below = None x = np.delete(x0, index) f_trace = [fun0] x_trace = [x0] checkit = checkNo for i in range(stepn): opres = op.minimize(fun_nuiscance, x, jac=jac_nuiscance) f = -opres.fun print("{}: x_{}_d={:6.3f}, f_d={:6.3f}".format(i, index, xi-xi0, f-target)) if np.isnan(f): f = -np.inf f_trace.append(f) x_trace.append(np.insert(opres.x, index, xi)) if checkit < 0 and f > target: checkit = checkNo xi_below = None if f < target: xi_below = xi checkit = checkNo else: xi_above = xi x = opres.x checkit -= 1 if xi_below is not None: if np.allclose(xi_above, xi_below, atol=resulttol): message = "success" success = True break if checkit == 0: xi = xi_below checkit -= 1 else: xi = (xi_above+xi_below) / 2 else: if np.abs(xi-xi0) > infstep and f > target: message = "unbounded" success = True break initstep *= 10 xi += direction*initstep else: message = "iteration limit exceeded" success = False result = op.OptimizeResult(x=np.insert(x, index, xi), fun=f, success=success, nfev=fun.evals, njev=(0 if jac is None else jac.evals), nhev=0, nit=i, x_track=np.array(x_trace), f_track=np.array(f_trace), message=message ) #print(result) return result
[docs]def gridsearch(index, direction, x0, fun, jac, hess, step=0.2, alpha=ALPHA, resulttol=1e-3, stepn=200, infstep=1000): f = fun0 = fun(x0) target = fun0-chi2.ppf(alpha, df=1)/2 fun = CounterFun(fun) xi = xi0 = x0[index] dim = x0.size A = sparse.dok_matrix((dim, dim)) A[index, index] = 1 target_fun = lambda p: -fun(p) def target_fun(p): res = -fun(p) #print(res+target, p[index]-xi) #, np.round(p,2) return res target_jac = CounterFun(lambda p: -jac(p)) target_hess = CounterFun(lambda p: -hess(p)) x = x0 bound0 = np.full_like(x0, -np.inf) bound1 = np.full_like(x0, np.inf) for i in range(stepn+1): if i == stepn: x_prev = x f_prev = f xi += direction*infstep else: xi += direction*step #bounds = (-np.inf, xi) if direction < 0 else (xi, np.inf) bound0[index] = bound1[index] = xi bounds = (bound0, bound1) constraints = op.LinearConstraint(A, *bounds) res = op.minimize(target_fun, x, jac=target_jac, hess=target_hess, constraints=constraints, options=dict(maxiter=500), tol=resulttol, method="trust-constr") #, gtol=resulttol f = -res.fun print("{}: x_{}_d={:6.3f}, f_d={:6.3f}".format(i, index, xi-xi0, f-target)) if f <= target: if step > resulttol: xi -= direction*step step /= 2 else: message = "Success" #xi -= direction*step/2 success = True break else: x = res.x else: if f < target: message = "iteration limit exceeded" success = False x = x_prev f = f_prev else: message = "Result is unbounded" success = True result = op.OptimizeResult(x=x, fun=f, success=success, nfev=fun.evals, njev=target_jac.evals, nhev=target_hess.evals, nit=i, message=message ) return result
[docs]def bisection(index, direction, x0, fun_, jac=None, alpha=ALPHA, initstep=1, resulttol=1e-3, constrainedMin=True, infstep=1e10, stepn=200): fun = CounterFun(fun_) xi = x0[index] + direction*initstep xi0 = x0[index] jac_nuiscance = None if constrainedMin: fun_nuiscance = lambda x: -fun(x) if jac is not None: jac = CounterFun(jac) unit = np.eye(1, x0.size, index).ravel() #def jac_nuiscance(x): # print(x) # jj = -jac(x) # return jj #jac_nuiscance = CounterFun(jac_nuiscance) jac_nuiscance = CounterFun(lambda x: -jac(x)) constraints = dict(type="eq", fun=lambda x: x[index]-xi, jac=lambda x: unit) dim = x0.size A = sparse.dok_matrix((dim, dim)) A[index, index] = 1 else: constraints = None fun_nuiscance = lambda x: -fun(np.insert(x, index, xi)) if jac is not None: jac = CounterFun(jac) jac_nuiscance = lambda x: -np.delete(jac(np.insert(x, index, xi)), index) fun0 = fun(x0) target = fun0-chi2.ppf(alpha, df=1)/2 f_above = fun0 xi_above = xi0 f_below = None xi_below = None f_trace = [fun0] xi_trace = [xi0] x_trace_full = [x0] if not constrainedMin: x_trace = [np.delete(x0, index)] else: x_trace = x_trace_full for i in range(stepn): #if constrainedMin: # bounds = (-np.inf, xi) if direction < 0 else (xi, np.inf) # constraints = op.LinearConstraint(A, *bounds) x = x_trace[np.argmin(np.abs(xi-np.array(xi_trace)))] x[np.isnan(x)] = x_trace[0][np.isnan(x)] for j in range(20): try: opres = op.minimize(fun_nuiscance, x, jac=jac_nuiscance, constraints=constraints) if not np.isnan(opres.fun): break except Exception: pass xi = (xi + xi_above) / 2 else: message = "Error in function or jacobian" success = False break x_trace.append(opres.x) if not constrainedMin: x_trace_full.append(np.insert(opres.x, index, xi)) f = -opres.fun f_trace.append(f) xi_trace.append(xi) print("{}: x_{}_d={:6.3f}, f_d={:6.3f}".format(i, index, xi-xi0, f-target)) if np.isnan(f): print("Nan") if (xi_below is not None and (np.allclose(f, target, atol=resulttol) or np.allclose(xi_above, xi_below, atol=resulttol))): message = "success" success = True break elif np.abs(xi-xi0) > infstep and f > target: message = "unbounded" success = True break two_point = np.sum(np.array(f_trace)>target) + (f_below is not None) < 3 if f >= target and f_below is not None: if f_below > target: if f < f_below: f_above = f_below xi_above = xi_below else: two_point = True f_below = f xi_below = xi else: f_above = f xi_above = xi else: if f_below is not None and f_below > target: f_above = f_below xi_above = xi_below f_below = f xi_below = xi if two_point: p = (fun0-f)/(xi0-xi-0.5/xi0*(xi0**2-xi**2)) a = -p/(2*xi0) q = fun0-p*(xi0)-a*xi0**2 else: m = np.array([[xi0**2, xi0, 1], [xi_above**2, xi_above, 1], [xi_below**2, xi_below, 1]]) a, p, q = np.dot(np.linalg.inv(m), [fun0, f_above, f_below]) root = p**2/(4*a**2)-(q-target)/a if root < 0: initstep *= 10 xi += direction*initstep else: dirFact = 1 if a<0 else -1 xi = -p/(2*a) + dirFact * direction * np.sqrt(root) if np.abs(xi-xi0) > infstep: xi = direction*infstep*1.1 else: message = "iteration limit exceeded" success = False result = op.OptimizeResult(x=x_trace_full[-1], fun=f, success=success, nfev=fun.evals, njev=(0 if jac is None else jac.evals), nhev=0, nit=i, x_track=np.array(x_trace_full), f_track=np.array(f_trace), message=message ) print(result.x, result.fun) return result
[docs]def constrained_max(index, direction, x0, fun, jac=None, alpha=ALPHA, nit=500): fun0 = fun(x0) print("start", index, direction) target = fun0-chi2.ppf(alpha, df=1)/2 preconditioning = 0.001 target_fun = lambda x: -direction*x[index] unit = -np.eye(1, x0.size, index).ravel()*direction target_jac = lambda x: unit constraints = dict(type="eq", fun=lambda x: (fun(x)-target)*preconditioning, jac=lambda x: jac(x)*preconditioning) try: result = op.minimize(target_fun, x0, jac=target_jac, constraints=constraints, options=dict(maxiter=500)) except Exception: return op.OptimizeResult(fun=np.nan, success=False, x=x0, nit=0) print("done", index, direction) return result
[docs]def mixed_min(index, direction, x0, fun, jac=None, alpha=ALPHA): preconditioning = 0.01 fun0 = fun(x0) target = fun0-chi2.ppf(alpha, df=1)/2 f = lambda x: (2*np.square(fun(x)-target) - direction*x[index])*preconditioning if jac is not None: unit = np.eye(1, x0.size, index).ravel() j = lambda x: (4*jac(x)*(fun(x)-target) - direction*unit)*preconditioning else: j = None #j = Gradient(f, step=1e-7) result = op.minimize(f, x0, jac=j) #, method='trust-exact') print("index {:2d}: f_diff={:7.3f} (direction {})".format(index, fun(result.x)-target, direction)) result.fun = fun(result.x) print(target-result.fun) #print(result) return result
[docs]def fixedFun(f, origArg, flex): def fun(x): xx = origArg.copy().astype(x.dtype) xx[flex] = x return f(xx) if isinstance(f, CounterFun): fun.evals = lambda: f.evals return fun
[docs]class BaseTester(): def __init__(self): self.ML = None self._MLE = None self.x = None self.dataN = None self.data = None self.convert = False self.convertIndices = None self.fixed = None self.dim = None self.test_prediction = False @property def MLE(self): if self.fixed is None: return self._MLE return np.delete(self._MLE, self.fixed) @MLE.setter def MLE(self, val): self._MLE = val
[docs] def convertPos(self, x): return convertPos(x, self.convertIndices, self.convert)
[docs] def convertPosInv(self, x): return convertPosInv(x, self.convertIndices, self.convert)
@property def funs(self): return self.get_funs()
[docs] def get_funs(self): raise NotImplementedError("get_funs has not been implemented yet")
[docs]class DynamicalSystemTester(BaseTester): def __init__(self, *sim_args, **sim_kwargs): BaseTester.__init__(self) self.convert = CONVERT self.dim = 8 self.simulate(*sim_args, **sim_kwargs)
[docs] @staticmethod def rss_dyn(f, t, y, dim): tmax = np.max(t) def ff(p): fp = lambda t, x: f(t, x, p[dim:]) try: res = np.sum(np.square( integrate.solve_ivp(fp, (0, tmax), p[:dim], rtol=1e-9, atol=1e-11, dense_output=True, vectorized=True).sol(t) - y)) except Exception: return 1000000 #print(res) return res return ff
[docs] @staticmethod def f_constr(f, t, y, dim, fact, target, tol): tmax = np.max(t) tt = np.linspace(0, tmax, 1000) def ff(p): fp = lambda t, x: f(t, x, p[dim:-1]) try: sol = integrate.solve_ivp(fp, (0, tmax), p[:dim], rtol=1e-9, atol=1e-11, dense_output=True, vectorized=True) res = fact*np.sum(np.square(sol.sol(t) - y)) + (p[-1]-np.mean(sol.sol(tt)[0]))**2/tol**2*target except Exception: res = 1000000 #print(res) return res return ff
[docs] @staticmethod def f_prime(t, x, params): r, K, a, h, c, d = params x, y = x conversion = a*y*x/(1+h*x) return np.array((r*x*(1-x/K) - conversion, (c*conversion-y*d)))
[docs] def get_rss(self): rss_ = DynamicalSystemTester.rss_dyn(DynamicalSystemTester.f_prime, *self.data, 2) return lambda p: rss_(convertPos(p))
[docs] def get_funs(self): rss = self.get_rss() fact = self.fact f_ = lambda x: fact*rss(x) if self.fixed is not None: MLE = self._MLE flex = np.ones_like(MLE, dtype=bool) flex[self.fixed] = False f_ = fixedFun(f_, MLE, flex) f = CounterFun(f_) g = Gradient(f, step=1e-7, method='complex') h = Hessian(f, step=1e-7, method='complex') return f, g, h
[docs] def get_funs_constr(self): varMLE = self.ML / (self.dataN*2) fact = -0.5/varMLE dim = 2 f = DynamicalSystemTester.f_constr(DynamicalSystemTester.f_prime, *self.data, dim, fact, chi2.ppf(.95, df=1)/2, 1e-3) g = Gradient(f, step=1e-8, method='complex') h = Hessian(f, step=1e-8, method='complex') t = self.data[0] tmax = np.max(t) p = self.MLE sol = integrate.solve_ivp(DynamicalSystemTester.f_prime, (0, tmax), p[:dim], rtol=1e-9, atol=1e-11, dense_output=True, vectorized=True) tt = np.linspace(0, tmax, 1000) self.cMLE = np.append(self.MLE, np.mean(sol.sol(tt)[0])) return f, g, h
[docs] def simulate(self, params=None, dataN=50, tEnd=20, std=3, plot=True): if params is None: params = np.array((50, 25, 2, 200, 0.1, 0.01, 0.3, 1)) print("simulating data") t = np.linspace(0, tEnd, dataN) paramsFull = params x0, params = params[:2], params[2:] res = integrate.solve_ivp(partial(self.f_prime, params=params), (0, tEnd), x0, dense_output=True, vectorized=True) x = np.maximum(res.sol(t) + np.random.randn(*t.shape)*std, 0) self.data = [t, x] #print(x[:,-1]) rss = self.get_rss() print("creating likelihood function") def rsss(p): #print(convertPos(p)) res = rss(p) print(res) return res self.params = convertPosInv(paramsFull) print(rss(self.params)) g = Gradient(rss, method='complex') #Gradient(rss, 1e-6, num_steps=4) h = Hessian(rss, method='complex') fit = op.minimize(rsss, self.params, jac=g, hess=h) #, options=dict(maxiter=1)) print(fit) xMLE, paramsMLE = fit.x[:2], fit.x[2:] varMLE = fit.fun / dataN**2 self.fact = -0.5/varMLE self.ML = fit.fun * self.fact self.MLE = fit.x self.x = paramsFull self.dataN = dataN if plot: print("plotting") tt = np.linspace(0, tEnd, tEnd*100) resHat = integrate.solve_ivp(partial(self.f_prime, params=convertPos(paramsMLE)), (0, tEnd), convertPos(xMLE), dense_output=True, vectorized=True) for y in res.sol(tt): plt.plot(tt, y) for y in resHat.sol(tt): plt.plot(tt, y) for y in x: plt.plot(t, y, '.') plt.show()
[docs]class LogRegressTester(BaseTester): def __init__(self, *sim_args, seed=None, **sim_kwargs): BaseTester.__init__(self) np.random.seed(seed) self.simulate(*sim_args, **sim_kwargs)
[docs] def get_funs(self): covariates, switch = self.data if self.powers: covariateN = self.covariateN #f = lambda p: (-ag.sum(ag.log(1+ag.exp(-ag.sum(p[:covariateN]*covariates[switch:]**p[covariateN:-1], 1)-p[-1]))) # -ag.sum(ag.log(1+ag.exp(ag.sum(p[:covariateN]*covariates[:switch]**p[covariateN:-1], 1)+p[-1])))) f_ = lambda p: (-ag.sum(ag.log(1+ag.exp(-ag.sum(p[:covariateN]*ag.power(covariates[switch:],p[covariateN:-1]), 1)-p[-1]))) -ag.sum(ag.log(1+ag.exp(ag.sum(p[:covariateN]*ag.power(covariates[:switch],p[covariateN:-1]), 1)+p[-1])))) def f_(p): p = self.convertPos(p) return (-ag.sum(ag.log(1+ag.exp(-ag.sum(p[:covariateN]*ag.power(covariates[switch:],p[covariateN:-1]), 1)-p[-1]))) -ag.sum(ag.log(1+ag.exp(ag.sum(p[:covariateN]*ag.power(covariates[:switch],p[covariateN:-1]), 1)+p[-1])))) else: f_ = lambda p: (-ag.sum(ag.log(1+ag.exp(-ag.sum(covariates[switch:]*p[:-1], 1)-p[-1]))) -ag.sum(ag.log(1+ag.exp(ag.sum(covariates[:switch]*p[:-1], 1)+p[-1])))) if self.test_prediction: testData = self.testData tol = 0.001 target = 2 f = lambda p: (f_(p[:-1]) - (p[-1]-np.mean(1/(1+np.exp( -np.sum(p[:covariateN]*testData**p[covariateN:-2], 1) -p[-2]))))**2/tol**2*target) def f(p): diff = (p[-1]-np.mean(1/(1+np.exp( -np.sum(p[:covariateN]*testData**p[covariateN:-2], 1) -p[-2])))) res = f_(p[:-1]) - (diff/tol)**2*target if isinstance(diff, float): #print(diff) pass return res else: f = f_ f = CounterFun(f) if self.fixed is not None: MLE = self._MLE flex = np.ones_like(MLE, dtype=bool) flex[self.fixed] = False f = fixedFun(f, MLE, flex) g = Gradient(f, method='complex', step=1e-7) h = Hessian(f, method='complex', step=1e-7) else: #g = AgGradient(f) #h = AgHessian(f) g = Gradient(f, method='complex', step=1e-7) h = Hessian(f, method='complex', step=1e-7, num_steps=1) return f, g, h
[docs] def simulate(self, params=None, dataN=1000, testDataN=0, powers=True, mode="3"): print("simulating data") if params is None: if mode=="3": params = np.array([5, 0.5, -10]) elif mode=="11": params = np.array([5, 2, -1, -3, -2, 0.2, 1, 0.1, 0.2, 0.5, -1]) elif mode=="11cx": params = np.array([0.8, 0.2, -0.6, -1, -1, 0.2, 0.5, 0.1, -0.2, 0.2, 2]) powers = False if powers: covariateN = (len(params)-1)//2 else: covariateN = len(params)-1 self.dataN = dataN self.dim = len(params) if powers: self.convert = True self.convertIndices = slice((self.dim-1)//2, -1) allDataN = testDataN + dataN covariates = np.zeros((allDataN, covariateN), dtype=int) m = 5 p = 0.5 p2 = 0.2 for i in range(covariateN): if not i % 2: covariates[:,i] = np.random.negative_binomial(m, p, allDataN) else: covariates[:,i] = np.random.binomial(covariates[:,i-1], p2, allDataN) #+ np.random.poisson(0.5, allDataN) covariates = covariates+1e-16 self.testData = covariates[dataN:] covariates = covariates[:dataN] if powers: data = np.random.binomial(1, 1/(1+np.exp(-np.sum(params[:covariateN]*covariates**params[covariateN:-1], 1)-params[-1]))) else: data = np.random.binomial(1, 1/(1+np.exp(-np.sum(covariates*params[:-1], 1)-params[-1]))) print(data.mean()) if powers: print(np.round(1/(1+np.exp(-np.sum(params[:covariateN]*covariates**params[covariateN:-1], 1)-params[-1])),2)) else: print(np.round(1/(1+np.exp(-np.sum(params[:covariateN]*covariates, 1)-params[-1])),2)) self.covariateN = covariateN self.powers = powers order = np.argsort(data) covariates = covariates[order] data = data[order] switch = np.argmax(data) self.data = [covariates, switch] f, j, h = self.funs print(f(self.convertPosInv(params))) ff = lambda p: -f(p) jj = lambda p: -j(p) hh = lambda p: -h(p) fit = op.minimize(ff, self.convertPosInv(params), jac=jj) fit = op.minimize(ff, fit.x, jac=jj, hess=hh, method='trust-exact') print(j(params)) print(fit) print(params) self.test_prediction = testDataN if self.test_prediction: testP = 1/(1+np.exp(-np.sum(params[:covariateN]*self.testData**params[covariateN:-1], 1)-params[-1])) testPMLE = 1/(1+np.exp(-np.sum(fit.x[:covariateN]*self.testData**fit.x[covariateN:-1], 1)-fit.x[-1])) self.x = np.append(params, np.mean(testP)) self.MLE = np.append(fit.x, np.mean(testPMLE)) else: self.x = params self.MLE = fit.x self.ML = -fit.fun
[docs] def simulate_(self, params=None, dataN=1000, powers=True): if params is None: params = 5 if isinstance(params, int): params = np.random.randn(params) self.dim = len(params) if powers: covariateN = (len(params)-1)//2 else: covariateN = len(params)-1 if powers: params[covariateN:-1] = np.random.rand(covariateN) print("simulating data") cov = np.random.randn(covariateN, covariateN)+10 cov = np.dot(cov, cov.T) m = np.random.randn(covariateN) covariates = np.random.multivariate_normal(m, cov, dataN)/50 #covariates[:,1] = covariates[:,0] + np.random.rand(dataN)*0.01 if powers: shift = 0.01 - np.min(covariates) params[-1] -= shift covariates += shift data = np.random.binomial(1, 1/(1+np.exp(-np.sum(params[:covariateN]*covariates**params[covariateN:-1], 1)-params[-1]))) else: data = np.random.binomial(1, 1/(1+np.exp(-np.sum(covariates*params[:-1], 1)-params[-1]))) self.covariateN = covariateN self.powers = powers order = np.argsort(data) covariates = covariates[order] data = data[order] switch = np.argmax(data) self.data = [covariates, switch] f, j, h = self.funs print(f(params)) ff = lambda p: -f(p) self.x = params print(j(params)) fit = op.minimize(ff, params, jac=j, hess=h, method='trust-exact') print(fit) print(params) self.ML = -fit.fun self.MLE = fit.x
[docs]class H14Tester(BaseTester): def __init__(self, fixed=[0], vals=[25]): #fixed=[0, 1, 2], vals=[25, 0, 0]): BaseTester.__init__(self) self.convert = CONVERT self.dim = 7-len(fixed) r_source = robjects.r['source'] print("Executing R script") robjects.r('setwd("test_CI/Histone_H1")') r_source("get_f.R") #x0 = np.array([0.02, 0., 0, 0.090083433, 0.015029212, 0.001207238, 0.002303329]) x0 = np.array([0, 0.03429, 0, 0.37168, 0.05365, 0, 0.00794]) x0 = np.array([0, 0.04024, 0.00794, 0.36572, 0.05365, 0, 0 ]) x0 = np.array([0, 0, 0.00794, 0.40596, 0.04912, 0.00453, 0 ]) x0 = np.array([0, 0, 0, 4.05836e-01, 4.91267e-02, 3.79864e-03, 8.67439e-03]) #x0 = np.array([0] + [1e-3]*6) if fixed is not None: fixed = np.array(fixed, dtype=int) fixed.sort() insertIndices = fixed - np.arange(fixed.size) x0[fixed] = vals x0 = convertPosInv(x0) ff = robjects.r['f'] vals = x0[fixed] x0 = np.delete(x0, fixed) def f(x): if fixed is not None and len(fixed): x = np.insert(x, insertIndices, vals) try: return -ff(robjects.FloatVector(convertPos(x)))[0] except Exception: #rinterface.RRuntimeError: return -np.inf fun2 = lambda x: -f(x) j = Gradient(fun2, 1e-5, num_steps=2) h = Hessian(fun2, 1e-5, num_steps=2) opres = op.minimize(fun2, x0, jac=j, hess=h, method='trust-exact') print(opres) n = robjects.r['dataN'][0] var = opres.fun / n fact = 0.5/var self.constVals = (insertIndices, vals) self.MLE = opres.x self.ML = -fact * opres.fun self.fact = fact self.x = convertPos(opres.x) print("done", self.x, self.ML, self.fact)
[docs] def get_funs(self): insertIndices, vals = self.constVals from rpy2 import robjects r_source = robjects.r['source'] print("Executing R script") #robjects.r('setwd("test_CI/Histone_H1")') r_source("get_f.R") ff = robjects.r['f'] def f(x): if insertIndices is not None and len(insertIndices): x = np.insert(x, insertIndices, vals) try: return -ff(robjects.FloatVector(convertPos(x)))[0] except Exception: #rinterface.RRuntimeError: return -np.inf fact = self.fact fun = CounterFun(lambda x: fact*f(x)) if self.fixed is not None: MLE = self._MLE flex2 = np.ones_like(MLE, dtype=bool) flex2[self.fixed] = False fun = fixedFun(fun, MLE, flex2) j = Gradient(fun, 1e-5, num_steps=2) h = Hessian(fun, 1e-5, num_steps=2) return fun, j, h
[docs]def find_CI(tester, index, direction, method="RVM", f_track=False): nmax = 600 infstep = 1e3 resulttol=1e-5 if method == "Wald": tmpPos = tester.convertPos tester.convertPos = False f_, g, h = tester.funs if method == "Wald": tester.convertPos = tmpPos if f_track: f = TrackFun(f_) else: f = f_ try: if method == "RVM": res = find_CI_bound(tester.MLE, f, index, direction, g, h, nmax=nmax, apprxtol=0.5, alpha=ALPHA, disp=False, infstep=infstep) elif method == "RVM_psI": res = find_profile_CI_bound_mpinv(index, direction, tester.MLE, f, g, h, nmax=nmax, apprxtol=0.5, alpha=ALPHA, disp=False, infstep=infstep) elif method == "VM": res = venzon_moolgavkar(index, direction, tester.MLE, f, g, h, nmax=nmax, alpha=ALPHA, disp=False) elif method == "bisection": res = bisection(index, direction, tester.MLE, f, g, stepn=nmax, infstep=infstep, resulttol=resulttol) elif method == "binsearch": res = binsearch(index, direction, tester.MLE, f, g, stepn=nmax, infstep=infstep, resulttol=resulttol) elif method == "gridsearch": res = gridsearch(index, direction, tester.MLE, f, g, h, stepn=nmax, resulttol=resulttol) elif method == "Wald": res = wald(tester.MLE, h(tester.convertPos(tester.MLE)), direction) return res, 0 elif method == "mixed_min": res = mixed_min(index, direction, tester.MLE, f, g) elif method == "constrained_max": res = constrained_max(index, direction, tester.MLE, f, g) else: raise ValueError("Method {} unknown.".format(method)) if f_track: return res, f.track return res, f_.evals except Exception: return op.OptimizeResult(x=tester.MLE*np.nan, fun=np.nan, success=False, nfev=f_.evals, njev=0, nhev=0, nit=0, x_track=np.array(tester.MLE), f_track=np.array(tester.ML), message="Error when computing the CI." ), f_.evals
[docs]def find_CIs(tester, method="RVM", indices=None, printCI=True, multiprocessing=True): print("Using method", method) if indices is None: n = tester.dim indices = range(n) else: n = len(indices) #results = list(starmap(find_CI, zip(repeat(tester), list(range(4, dim))*2, [1]*dim + [-1]*dim, repeat(method)))) args = zip(repeat(tester), list(indices)*2, [-1]*n + [1]*n, repeat(method)) #args = zip([tester], [10], [1], repeat(method)) if multiprocessing: with Pool() as pool: results = pool.starmap(find_CI, args) else: results = starmap(find_CI, args) results = list(results) results2 = [] target = tester.ML-chi2.ppf(ALPHA, df=1)/2 tol = 1e-3 if printCI: print("============== {:<15}================".format(method)) for i, j in enumerate(indices): result1, feval1 = results[i] result2, feval2 = results[i+n] left = tester.convertPos(result1.x)[j] right = tester.convertPos(result2.x)[j] leftOriginal = result1.x[j] rightOriginal = result2.x[j] if printCI: print("[{:14.5f}{}, {:9.5f}, {:14.5f}{}], ({:4d}, {:4d}), ({:6d}, {:6d})".format( left, "*" if not result1.success else " ", tester.x[j], right, "*" if not result2.success else " ", result1.nit, result2.nit, feval1, feval2)) if method=="RVM" and (result1.fun < target-tol or result2.fun < target-tol): print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") print(result1.fun -(target-tol), result2.fun - (target-tol)) print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") results2.append([left, right, leftOriginal, rightOriginal, result1.fun >= target-tol, result2.fun >= target-tol, result1.success, result2.success, feval1, feval2]) if printCI: print("==============================================") print() return results2
[docs]def test_dynamical_system(): # r, K, a, h, c, d p = np.array((50, 25, 2, 200, 0.1, 0.1, 0.3, 1)) tester = DynamicalSystemTester(None) #p, std=3) find_CIs(tester, "mixed_min") find_CIs(tester, "RVM") find_CIs(tester, "constrained_max") find_CIs(tester, "Wald") find_CIs(tester, "binsearch") find_CIs(tester, "bisection") return
[docs]def test_LogRegress(methods): tester = LogRegressTester(dataN=500) #find_CI(tester, 3, 1, "constrained_max") for m in methods: find_CIs(tester, m)
[docs]def test_LogRegress_pred(): tester = LogRegressTester(testDataN=10) indices = [tester.dim] find_CIs(tester, "constrained_max", indices) find_CIs(tester, "mixed_min", indices) find_CIs(tester, "Wald", indices) find_CIs(tester, "RVM", indices) find_CIs(tester, "binsearch", indices) find_CIs(tester, "bisection", indices) return
[docs]def test_H14(): tester = H14Tester() # find_CIs(tester, "RVM") # find_CIs(tester, "RVM_psI") # find_CIs(tester, "constrained_max") # find_CIs(tester, "mixed_min") # find_CIs(tester, "binsearch") # find_CIs(tester, "bisection") # find_CIs(tester, "gridsearch") # find_CIs(tester, "Wald") find_CIs(tester, "VM") return
[docs]def create_plot(tester, considered, additional=[], methods=["RVM"], fileName=None): print("Creating figure for", tester, considered, additional) fixed = list(range(tester.dim)) for i in considered: fixed.remove(i) for i in additional: fixed.remove(i) if methods == ["RVM"]: rvmplot = True consIndices = np.sort(np.concatenate((considered, additional))) index = np.searchsorted(consIndices, considered[0]) ind2 = np.searchsorted(consIndices, considered[1]) plotinds = np.array([index, ind2]) flex = [] for i in additional: flex.append(np.searchsorted(consIndices, i)) flex = np.array(flex) tester.fixed = fixed fun = tester.funs[0] xmax = -np.inf xmin = np.inf ymax = -np.inf ymin = np.inf x, y = [], [] x2, y2 = [], [] strings = [] for method in methods: r1, track1 = find_CI(tester, index, -1, method, f_track=True) r2, track2 = find_CI(tester, index, 1, method, f_track=True) xmax = max(xmax, tester.convertPos(r2.x)[considered[0]], tester.convertPos(r1.x)[considered[0]]) xmin = min(xmin, tester.convertPos(r2.x)[considered[0]], tester.convertPos(r1.x)[considered[0]]) ymax = max(ymax, tester.convertPos(r2.x)[considered[1]], tester.convertPos(r1.x)[considered[1]]) ymin = min(ymin, tester.convertPos(r2.x)[considered[1]], tester.convertPos(r1.x)[considered[1]]) strings.append(method + ": [{:6.3f}, {:6.3f}]".format(r1.x[considered[0]], r2.x[considered[0]])) xx, yy = tester.convertPos(np.concatenate((track1[::-1], track2)).T)[considered] x.append(xx) y.append(yy) if rvmplot: xx2, yy2 = tester.convertPos(np.concatenate((r1.x_track[::-1], r2.x_track)).T)[considered] x2.append(xx2) y2.append(yy2) for s in strings: print(s) if rvmplot: xmax = min(np.max(x[0]), 99) xmin = np.min(x[0]) ymax = min(np.max(y[0]), 99) ymin = np.min(y[0]) xOrder = 10**np.floor(np.log10(xmax))/2 yOrder = 10**np.floor(np.log10(ymax)) xmax = (xmax // xOrder + 1) *xOrder xmin = max((xmin // xOrder) * xOrder, -10) ymax = (ymax // yOrder + 1) *yOrder ymin = (ymin // yOrder) * yOrder if tester.dataN >= 10000: xmin, xmax = 0, 11 else: xdiff = xmax-xmin ydiff = ymax-ymin xmax += 0.1*xdiff xmin -= 0.1*xdiff ymax += 0.1*ydiff ymin -= 0.1*ydiff #print(np.arctan(r1.x[index])/np.pi+0.5, np.arctan(r2.x[index])/np.pi+0.5) if not (r1.success and r2.success): #raise Exception("no success") pass #x, y = tester.convertPos(np.concatenate((r1.x_track[::-1], r2.x_track))).T[considered] """ xmax = np.max(x[0]) xmin = np.min(x[0]) ymax = np.max(y[0]) ymin = np.min(y[0]) """ #xmin, xmax = -100, 2 #ymin, ymax = -2, 2 #ymax = np.mean(y) #xmax = np.mean(x) #xmin = max(0, xmin) #ymin = max(0, ymin) plotn = 200 XX = np.linspace(xmin, xmax, plotn) YY = np.linspace(ymin, ymax, plotn) X, Y = np.meshgrid(XX, YY) Z = np.zeros_like(X) x0 = tester.MLE target = tester.ML-chi2.ppf(ALPHA, df=1)/2 print(target, tester.ML) figsize = (3.5, 3) rect = [0.15, 0.18, 0.75, 0.78] plt.figure(figsize=figsize).add_axes(rect) if rvmplot: for i, xx2 in enumerate(XX): print(i) for j, yy2 in enumerate(YY): if tester.convert and (xx2<0 or yy2<0): Z[j, i] = np.nan else: if additional: x02 = x0.copy() x02 = tester.convertPos(x02) x02[index] = xx2 x02[ind2] = yy2 x02 = tester.convertPosInv(x02) #x02[flex] = -x02[ind2] ff = fixedFun(lambda x: -fun(x), x02, flex) #x03 = op.basinhopping(ff, x02[flex], 10).x if len(additional) == 1: Z[j, i] = -op.minimize_scalar(ff, bounds=(-300, 100), method="bounded").fun else: Z[j, i] = -op.minimize(ff, x02).fun #Z[j, i] = -op.minimize(ff, x02[flex]).fun else: Z[j, i] = fun(tester.convertPosInv(np.array([xx2, yy2]))) Z[np.isnan(Z)] = -np.inf plt.pcolormesh(X, Y, np.maximum(Z, tester.ML-50), shading='gouraud') #cmap=cmap) #, #""" #plt.plot(xx, yy, marker = 'x', color='C5') #plt.plot(x, y, marker = 'o', color='C1') markers = ["o", "v", "s", "h", "D"] colors = ["C1", "C0", "C2", "C3", "C4", "C5"] plt.contour(X, Y, Z, levels=[target]) #cmap=cmap) #, if rvmplot: for xx, yy, marker, color, method in zip(x2, y2, markers, colors, methods): plt.plot(xx, yy, marker=marker, color=color) for xx, yy, marker, color, method in zip(x, y, markers, colors, methods): opacity = 1 if not rvmplot: if method=="RVM": color = 'k' else: opacity = 0.5 markersize = 5 else: markersize = 3 plt.plot(xx, yy, marker=marker, color=color, alpha=opacity, linewidth=0.3, label=method, markersize=markersize) plt.plot(*tester.convertPos(x0)[plotinds], marker = 'o', color="C3", label="MLE", linewidth=0) plt.xlim(np.min(X), np.max(X)) plt.ylim(np.min(Y), np.max(Y)) if rvmplot: plt.xlabel(r"$\beta_1$") plt.ylabel(r"$\alpha_1$") plt.locator_params(nticks=3, nbins=3) else: plt.xlabel(r"$x_{}$".format(considered[0])) plt.ylabel(r"$x_{}$".format(considered[1])) plt.legend() #print(xx) #print(yy) #ax.plot_surface(X, Y, Z, rstride=1, cstride=1, # linewidth=0, antialiased=True) if rvmplot: fileName = "rvm" + str(tester.dataN) if fileName: plt.savefig(fileName+".png", dpi=1000) plt.savefig(fileName+".pdf", dpi=1000) if not fileName or rvmplot: plt.show()
[docs]def benchmark_(methods, simkwargs): tester = LogRegressTester(**simkwargs) return [find_CIs(tester, method, printCI=True, multiprocessing=False) for method in methods]
[docs]def benchmark(methods, nsim=100, **simkwargs): print("Benchmarking {} realizations.".format(nsim)) print("simargs:", simkwargs) dtype = [("left", float), ("right", float), ("leftOriginal", float), ("rightOriginal", float), ("leftAdmissible", bool), ("rightAdmissible", bool), ("leftSuccess", bool), ("rightSuccess", bool), ("leftEvals", int), ("rightEvals", int)] stats = defaultdict(lambda: dict(success=[], error=[], evals=[], errorTot=[], evalsTot=[], errorReduced=[], largeErrors=[], largeErrorsTot=[])) bound = 1000 tol = 1e-3 rtol = 0.05 with Pool() as pool: for results_ in pool.starmap(benchmark_, zip(repeat(methods), repeat(simkwargs, nsim)), chunksize=1): results = np.core.records.fromarrays(np.array(results_).T, dtype=dtype) lefts = np.ma.array(results["left"], mask=~results["leftAdmissible"]) rights = np.ma.array(results["right"], mask=~results["rightAdmissible"]) results["left"] = np.maximum(results["left"], -bound) results["right"] = np.minimum(results["right"], bound) trueLeft = np.min(lefts, 1) trueLeft[trueLeft.mask] = np.nan trueLeft = trueLeft.data trueRight = np.max(rights, 1) trueRight[trueRight.mask] = np.nan trueRight = trueRight.data for method, result in zip(methods, results.T): methodStats = stats[method] methodStats["success"].extend( (result["leftSuccess"] & np.isclose(result["left"], trueLeft, rtol=rtol, atol=tol)) | ((result["leftOriginal"] <= -bound) & result["leftAdmissible"])) methodStats["success"].extend( (result["rightSuccess"] & np.isclose(result["right"], trueRight, rtol=rtol, atol=tol)) | ((result["rightOriginal"] >= bound) & result["rightAdmissible"])) leftError = np.abs(result["left"]-trueLeft) rightError = np.abs(np.abs(result["right"]-trueRight)) methodStats["error"].extend(leftError[result["leftSuccess"]]) methodStats["error"].extend(rightError[result["rightSuccess"]]) methodStats["errorTot"].extend(leftError[trueLeft > -bound]) methodStats["errorTot"].extend(rightError[trueRight < bound]) largeLeftError = ((leftError > 10) & result["leftSuccess"]) | ((trueLeft <= -bound) & (result["left"] > 10-bound)) largeRightError = ((rightError > 10) & result["rightSuccess"]) | ((trueRight >= bound) & (result["right"] < bound-10)) largeLeftErrorTot = (leftError > 10) & ~((trueLeft <= -bound) & (result["left"] < 10-bound)) largeRightErrorTot = (rightError > 10) & ~((trueRight >= bound) & (result["right"] > bound-10)) if method=='RVM' and (largeLeftError.any() or largeRightError.any()): print("####################################") print(results) print("####################################") methodStats["errorReduced"].extend(leftError[result["leftSuccess"] & (~largeLeftError)]) methodStats["errorReduced"].extend(rightError[result["rightSuccess"] & (~largeRightError)]) methodStats["largeErrors"].extend(largeLeftError) methodStats["largeErrors"].extend(largeRightError) methodStats["largeErrorsTot"].extend(largeLeftErrorTot) methodStats["largeErrorsTot"].extend(largeRightErrorTot) #methodStats["errorTot"].extend(np.abs(result["left"]-trueLeft)[result["leftOriginal"] > -bound]) #methodStats["errorTot"].extend(np.abs(result["right"]-trueRight)[result["rightOriginal"] < bound]) methodStats["evals"].extend(result["leftEvals"][result["leftSuccess"]]) methodStats["evals"].extend(result["rightEvals"][result["rightSuccess"]]) methodStats["evalsTot"].extend(result["leftEvals"]) methodStats["evalsTot"].extend(result["rightEvals"]) for method in methods: methodStats = stats[method] print("{:<15}: success={:7.5f}, error={:8.4f}, errorReduced={:8.4f}, errorTot={:8.4f}, largeErrors={:7.5f}, largeErrorsTot={:7.5f}, evals={:8.1f}, evalsTot={:8.1f}".format( method, np.nanmean(methodStats["success"]), np.nanmean(methodStats["error"]), np.nanmean(methodStats["errorReduced"]), np.nanmean(methodStats["errorTot"]), np.nanmean(methodStats["largeErrors"]), np.nanmean(methodStats["largeErrorsTot"]), np.nanmean(methodStats["evals"]), np.nanmean(methodStats["evalsTot"])))
def _transform_parameters(params): k, p = params return np.exp(k), 1/(1+np.exp(-p)) def _LL_nbinom(params, data): k, p = _transform_parameters(params) return nbinom.logpmf(data, k, p).sum() def _grad_LL_nbinom(params, data): return Gradient(_LL_nbinom)(params, data) def _hess_LL_nbinom(params, data): return Hessian(_LL_nbinom)(params, data)
[docs]def test_find_CI(): n = 100 k, p = 5, 0.1 data = np.random.negative_binomial(k, p, size=n) def LL(x): k, p = np.maximum(x, 1e-10) return nbinom.logpmf(data, k, p).sum() LLParallel = partial(_LL_nbinom, data=data) def nLLParallel(params): return -LLParallel(params) x0 = [1, 0.5] def nLL(params): return -LL(params) def function(params): return np.prod(params) functionJac = Gradient(function) functionHess = Hessian(function) jac = Gradient(LL) hess = Hessian(LL) result = op.differential_evolution(nLL, [(0, 10), (0, 1)]) resultParallel = op.minimize(nLLParallel, x0) print("Estimate: k={:5.3f}, p={:5.3f}".format(*result.x)) CI1 = find_CI_new(result.x, LL) print("All CIs:", CI1) CI2 = find_CI_new(resultParallel.x, LLParallel, parallel=True) print("All CIs parallel:", CI2) CIF0 = find_function_CI(result.x, lambda x: x[0], LL) print("CI for the first parameter (via function interface):", CIF0) CIF = find_function_CI(result.x, function, LL) print("CI for the product of all parameters:", CIF) CIF = find_function_CI(result.x, function, LL, logLJac=jac, logLHess=hess) print("CI for the product of all parameters:", CIF) CIF = find_function_CI(result.x, function, LL, functionJac=functionJac, functionHess=functionHess) print("CI for the product of all parameters:", CIF) CIF = find_function_CI(result.x, function, LL, logLJac=jac, logLHess=hess, functionJac=functionJac, functionHess=functionHess) print("CI for the product of all parameters:", CIF) CI3 = find_CI_new(result.x, LL, return_full_results=True) print("All CIs, full results:", CI3) CI4, success = find_CI_new(result.x, LL, return_success=True) print("All CIs and success:", CI4, success) CI5 = find_CI_new(result.x, LL, None, None, 1) print("Second parameter:", CI5) CI6 = find_CI_new(result.x, LL, None, None, None, 1) print("Upper bounds:", CI6) CI7 = find_CI_new(result.x, LL, None, None, None , [1, [False, True]]) print("Upper bound for the first, both bounds for the second:", CI7) CI8 = find_CI_new(result.x, LL, None, None, [0, 1], [1, -1]) print("Upper bound for the first, lower for the second:", CI8) CI9 = find_CI_new(result.x, LL, None, None, 0, 0) print("Lower bound for the first:", CI9)
if __name__ == '__main__': methods = ["Wald", "RVM", "RVM_psI", "bisection", "mixed_min", "constrained_max", "binsearch", "VM", "gridsearch"] """ # Uncomment this to test the H14 model, which requires additional R files. try: from rpy2 import robjects test_H14() except ImportError: raise ImportError("The package rpy2 must be installed in order to test " "the H14 model.") """ test_find_CI() benchmark(methods, 200, dataN=50, mode="11cx") benchmark(methods, 200, dataN=10000, mode="11") benchmark(methods, 200, dataN=10000, mode="3") sys.exit()