# Source code for ci_rvm.ci_rvm

'''
'''
import warnings
import traceback
import sys
from functools import partial
from multiprocessing import Pool
from itertools import count as itercount
import numdifftools as nd

import numpy as np
import scipy.optimize as op
from scipy.stats import chi2
from scipy.optimize._trustregion_exact import IterativeSubproblem

from vemomoto_core.tools.doc_utils import inherit_doc



def __round_str(n, full=6, after=3):
    if np.isfinite(n) and n != 0:
        digipre = int(np.log10(np.abs(n)))+1
        after = min(max(full-digipre-1, 0), after)
    return ("{:"+str(full)+"."+str(after)+"f}").format(n)

def is_negative_definite(M, tol=1e-5):
    """
    Returns whether the given matrix is negative definite.

    Uses a Cholesky decomposition.
    """
    try:
        r = np.linalg.cholesky(-M)
        if np.isclose(np.linalg.det(r), 0, atol=tol):
            return False
        return True
    except np.linalg.LinAlgError:
        return False
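
# Illustrative usage sketch (the `_example_*` helpers are added for
# exposition and are not part of the original module): a matrix with
# strictly negative eigenvalues passes the check, while its negation,
# being positive definite, does not.
def _example_is_negative_definite():
    M = np.array([[-2.0, 0.5],
                  [0.5, -1.0]])        # both eigenvalues are negative
    assert is_negative_definite(M)
    assert not is_negative_definite(-M)  # -M is positive definite
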
class FlexibleSubproblem():
    """
    Class representing constrained quadratic subproblems
    """
    def __init__(self, x, fun, jac, hess, hessp=None,
                 k_easy=0.1, k_hard=0.2):
        """
        ``k_easy`` and ``k_hard`` are parameters used to determine the stop
        criteria of the iterative subproblem solver. Take a look at the
        IterativeSubproblem class for more info.
        """
        self.x = x
        self.fun = fun
        self.jac = jac
        self.hess = hess
        self.hessp = hessp
        self.k_easy = k_easy
        self.k_hard = k_hard
    def solve(self, radius, positiveDefinite=None, tol=1e-5, jac0tol=0,
              *args, **kwargs):
        if radius == 0:
            return np.zeros_like(self.jac(self.x)), True
        subproblem = IterativeSubproblem(self.x, self.fun, self.jac,
                                         self.hess, self.hessp,
                                         self.k_easy, self.k_hard)
        # IterativeSubproblem runs into issues if jac is too small
        if subproblem.jac_mag <= max(subproblem.CLOSE_TO_ZERO, jac0tol):
            j = self.jac(self.x)
            h = self.hess(self.x)
            if positiveDefinite is None:
                positiveDefinite = is_negative_definite(-h, tol)
            if positiveDefinite:
                x = np.zeros(j.size)
            else:
                vals, vecs = np.linalg.eigh(h)
                x = vecs[:, np.argmin(np.real(vals))]
                x *= radius
                jNorm = np.linalg.norm(j)
                if jNorm:
                    x_alt = j / jNorm * radius
                    if (np.linalg.multi_dot((x, h, x)) + np.dot(j, x)
                            > np.linalg.multi_dot((x_alt, h, x_alt))
                              + np.dot(j, x_alt)):
                        return x_alt, True
            return x, True
        return subproblem.solve(radius, *args, **kwargs)
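
# Illustrative usage sketch (not part of the original module; relies on the
# behavior of scipy's IterativeSubproblem): solving the trust-region
# subproblem min_s fun(s) s.t. ||s|| <= radius for a convex quadratic. With
# a radius large enough, the solution is the unconstrained Newton step.
def _example_flexible_subproblem():
    A = np.array([[2.0, 0.0],
                  [0.0, 1.0]])
    b = np.array([-1.0, -1.0])
    x = np.zeros(2)
    sub = FlexibleSubproblem(
        x,
        lambda x: 0.5*np.dot(x, np.dot(A, x)) + np.dot(b, x),
        lambda x: np.dot(A, x) + b,
        lambda x: A)
    step, _ = sub.solve(10.0, positiveDefinite=True)
    # the unconstrained minimizer is -A^{-1} b
    assert np.allclose(step, np.linalg.solve(A, -b), atol=1e-6)
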
class Flipper:
    """
    Flips an array in a specified component
    """
    def __init__(self,
                 index: "index in which the result should be flipped",
                 ):
        self.index = index

    def __call__(self, x):
        if not hasattr(x, "__iter__"):
            return x
        index = self.index
        x = x.copy()
        x[index] *= -1
        if x.ndim > 1:
            x[:, index] *= -1
        return x
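
# Illustrative usage sketch (not part of the original module): flipping the
# sign of component 1 of a vector and of both row 1 and column 1 of a
# matrix, as needed to flip a gradient and a Hessian consistently.
def _example_flipper():
    flip = Flipper(1)
    assert np.array_equal(flip(np.array([1.0, 2.0])), [1.0, -2.0])
    H = np.ones((2, 2))
    # row 1 and column 1 change sign; H[1, 1] is flipped twice
    assert np.array_equal(flip(H), [[1.0, -1.0], [-1.0, 1.0]])
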
def get_independent_row_indices(M: "considered matrix",
                                jac: "ordering vector" = None,
                                tol: "numerical tolerance" = None
                                ) -> "boolean array of linearly independent rows":
    """
    Returns a boolean array arr with arr[i]==True if and only if the i-th
    row of M is linearly independent of all other rows j with arr[j]==True.

    The vector jac provides an ordering of the returned indices. If M[i]
    and M[j] are linearly dependent, then arr[i] will be True if
    abs(jac[i]) >= abs(jac[j]). Otherwise, arr[i] will be False and arr[j]
    will be True.
    """
    n = M.shape[0]
    result = np.ones(n, dtype=bool)
    if jac is not None:
        indices = np.argsort(np.abs(jac))[::-1]
    else:
        indices = np.arange(n, dtype=int)
    rank = 0
    for i in range(n):
        if np.linalg.matrix_rank(M[indices[:i+1]], tol=tol) > rank:
            rank += 1
        else:
            result[indices[i]] = False
    if not result.any():
        result[indices[0]] = True
    return result
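
# Illustrative usage sketch (not part of the original module): rows 0 and 2
# are linearly dependent; the jac argument decides that row 2, having the
# larger |jac| entry, is the one that is kept.
def _example_get_independent_row_indices():
    M = np.array([[1.0, 0.0],
                  [0.0, 1.0],
                  [2.0, 0.0]])
    keep = get_independent_row_indices(M, jac=np.array([0.1, 1.0, 3.0]))
    assert np.array_equal(keep, [False, True, True])
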
class CounterFun:
    """
    Counts how often a function has been called
    """
    def __init__(self, fun):
        self.fun = fun
        self.evals = 0

    def __call__(self, *args, **kwargs):
        self.evals += 1
        return self.fun(*args, **kwargs)
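
# Illustrative usage sketch (not part of the original module): counting the
# evaluations of a wrapped function, as done below for fun, jac, and hess.
def _example_counter_fun():
    f = CounterFun(np.sin)
    f(0.0)
    f(np.pi/2)
    assert f.evals == 2
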
STATUS_MESSAGES = {0: "Success",
                   1: "Result on discontinuity",
                   2: "Result is unbounded",
                   3: "Iteration limit exceeded",
                   4: "Ill-conditioned matrix",
                   5: "Unspecified error"
                   }
def find_CI_bound(x0, fun, index, direction, jac=None, hess=None,
                  alpha=0.95, fun0=None, jac0=None, hess0=None,
                  customTarget=None, nmax=200, nchecks=65, apprxtol=0.5,
                  resulttol=1e-3, singtol=1e-4, minstep=1e-5,
                  radiusFactor=1.5, infstep=1e10, maxRadius=1e4,
                  disp=False, track_x=False, track_f=False):
    """Finds an end point of a profile likelihood confidence interval.

    Parameters
    ----------
    x0 : float[]
        Maximum likelihood estimate (MLE) of the parameters.
    fun : callable
        Log-likelihood function.
    index : int
        Index of the parameter of interest.
    direction : int or bool
        If ``<=0``, the lower end point of the confidence interval is
        sought, else the upper end point is sought.
    jac : callable
        Gradient of :py:obj:`fun`. If `None`, it will be computed based on
        finite differences using numdifftools.
    hess : callable
        Hessian of :py:obj:`fun`. If `None`, it will be computed based on
        finite differences using numdifftools.
    alpha : float
        Desired confidence level. Must be in ``(0,1)``.
    fun0 : float
        Log-likelihood at the MLE.
    jac0 : float[]
        Gradient of the log-likelihood at the MLE.
    hess0 : float[][]
        Hessian of the log-likelihood at the MLE.
    customTarget : float
        Custom target log-likelihood l*. If this is given, :py:obj:`alpha`
        will be ignored.
    nmax : int
        Maximal number of iterations.
    nchecks : int
        Maximal number of trust-region changes per iteration.
    apprxtol : float
        Relative tolerance between :py:obj:`fun` and its approximation.
    resulttol : float
        Tolerance of the result (``fun`` and ``norm(jac)``).
    singtol : float
        Tolerance for singularity checks.
    minstep : float
        Controls the minimal radius of the trust region.
    radiusFactor : float
        Controls how quickly the trust region decreases. Must be in
        ``[1, 2]``.
    infstep : float
        Step size after which a parameter is deemed unestimable.
    maxRadius : float
        Maximal radius of the trust region.
    disp : bool
        Whether to print a status message in each iteration.
    track_x : bool
        Whether to return the parameter trace.
    track_f : bool
        Whether to return the log-likelihood trace.
    """
    nuisance_minstep = 1e-3

    # ----- PREPARATION -----------------------------------------------------
    if jac is None:
        jac = nd.Gradient(fun)
    if hess is None:
        hess = nd.Hessian(fun)

    # Make sure arguments and returned values are of correct type
    index = int(index)    # in case the index is not given as integer
    x0 = np.asarray(x0)   # in case x0 is not given as numpy array

    forward = direction > 0

    # wrap the functions so that the correct end point is found
    jac2 = jac
    hess2 = hess
    if not forward:
        fun2 = fun
        flip = Flipper(index)
        fun = lambda x: fun2(flip(x))
        jac = lambda x: flip(np.asarray(jac2(flip(x))))
        hess = lambda x: flip(np.asarray(hess2(flip(x))))
        x0 = x0.copy()
        x0[index] *= -1
    else:
        # this is just to ensure the results are traced and returned
        # correctly
        flip = lambda x: x
        jac = lambda x: np.asarray(jac2(x))
        hess = lambda x: np.asarray(hess2(x))

    # initial values
    if fun0 is None:
        fun0 = fun(x0)
    if jac0 is None:
        jac0 = jac(x0)
    else:
        jac0 = np.asarray(jac0)
    if hess0 is None:
        hess0 = hess(x0)
    else:
        hess0 = np.asarray(hess0)

    # flip initial values if necessary
    if not forward:
        jac0[index] *= -1
        hess0 = flip(hess0)

    # compute the target
    if customTarget is None:
        target = fun0 - chi2.ppf(alpha, df=1)/2
    else:
        target = customTarget

    i: "current iteration" = 0
    x_track: "trace of parameter values" = []
    f_track: "trace of log-likelihood values" = []
    maximizing: "flag controlling whether we are maximizing f w.r.t. x[index]" = False
    nuisanceRadius: "search radius for unconstrained subproblems" = 1

    # counters to keep track how often functions have been called
    fun = CounterFun(fun)
    jac = CounterFun(jac)
    hess = CounterFun(hess)

    multi_dot = np.linalg.multi_dot
    is_negative_definite_ = partial(is_negative_definite, tol=singtol)

    def fPredicted_fun():
        """
        Taylor approximation of f
        """
        dx = xTmp - x
        return f + np.dot(J, dx) + multi_dot((dx, H, dx))/2

    def JPredicted_fun():
        """
        Taylor approximation of jac
        """
        return J + np.dot(H, xTmp-x)

    norm: "applied norm" = lambda arr: np.sqrt(np.mean(arr*arr))

    def jac_approx_precise(JPredicted_, JActual_, J_, bound=apprxtol):
        """
        Returns True iff the predicted gradient is close to the actual one
        or, even better, closer to the target than predicted.
        """
        # note how far we are from the target likelihood
        if (not searchmode == "normal" or maximizing) and not closeToDisc:
            return True
        return (norm(JActual_) / norm(J_) <= bound
                or norm(JPredicted_-JActual_) / norm(J_) <= bound)

    # TODO Accept improvements only then always, if they are better by a
    # certain constant (compare values - line search)
    def f_approx_precise(fPredicted, fActual, bound=apprxtol):
        """
        Returns True iff the predicted log-likelihood value is close to the
        actual one or, even better, closer to the target than predicted.
        """
        # steps to negative predicted values cannot occur in theory and are
        # either due to numerical issues or because we tried a very long
        # step. Therefore, we reject these steps right away if fActual is
        # below the target.
        if fPredicted < target - resulttol and f > target and fActual < target:
            return False
        # return True if result is better than expected
        if fActual > fPredicted and xiTmp >= 0:
            return True
        # if we are maximizing w.r.t. the nuisance parameters,
        if (searchmode == "max_nuisance"
                or searchmode == "max_nuisance_const" or maximizing):
            # we do not accept steps that are worse than the current stage
            # (we guarantee elsewhere that it IS possible to increase f)
            if fActual < f:
                if (not relax_maximum_constraint
                        or np.abs(fPredicted-fActual) > resulttol):
                    return False
            bound *= 2
            if relax_maximum_constraint:
                bound *= 2
        elif searchmode == "normal":
            # if we are outside the admissible region, we enforce that we
            # get closer to it again
            if fActual < target and np.abs(fActual-target) > np.abs(target-f):
                return False
            elif (np.abs(fActual-target) < resulttol
                  and np.abs(fPredicted-target) < resulttol):
                return True
            elif fActual > f > target:
                # we relax the precision requirement if the step is not of
                # disadvantage and brings us forward (we value progression
                # in the parameter of interest higher than a maximal
                # likelihood)
                bound *= 2
        # we require that the error relative to the distance from the target
        # is not larger than the given bound.
        return np.abs(fPredicted-fActual) / np.abs(target-f) <= bound

    def test_precision() -> "True, if approximations are sufficiently precise":
        """
        Tests the precision of the Taylor approximation at the suggested
        parameter vector.
        """
        fActual = fun(xTmp)
        fPredicted = fPredicted_fun()
        if not f_approx_precise(fPredicted, fActual):
            return False, fActual, None
        if np.abs(f-target) > resulttol and not closeToDisc:
            return True, fActual, None
        JActual = jac(xTmp)
        JPredicted = JPredicted_fun()
        if free_params_exist:
            JActual_ = JActual[~free_considered]
            JPredicted_ = JPredicted[~free_considered]
            # the Jacobian shall be mostly precise w.r.t. the changed
            # parameters. However, it should not be totally imprecise with
            # respect to the remaining parameters, either.
            jac_precise = (jac_approx_precise(JPredicted_, JActual_, J_)
                           and jac_approx_precise(JPredicted[considered],
                                                  JActual[considered],
                                                  J[considered],
                                                  20*apprxtol))
        else:
            JActual_ = JActual[considered]
            JPredicted_ = JPredicted[considered]
            jac_precise = jac_approx_precise(JPredicted_, JActual_, J_)
        return jac_precise, fActual, JActual

    def is_concave_down():
        """
        Checks whether the profile likelihood function is concave down.
        """
        return a < 0

    if disp:
        def dispprint(*args, **kwargs):
            print(*args, **kwargs)
    else:
        def dispprint(*args, **kwargs):
            pass

    # variable definitions --------------------------------------------------
    x: "parameter vector at current iteration" = x0
    xTmp: "potential new parameter vector"
    xi: "parameter of interest at current iteration" = x0[index]
    f: "log-likelihood at current iteration" = fun0
    J: "gradient at current iteration" = jac0
    H: "Hessian at current iteration" = hess0
    maxFun: "maximal log-likelihood" = fun0
    xi_max: "maximal value for the parameter of interest with f(xi, x_) >= f*" = xi
    x_max: "parameter vector where xi_max leads to a log-likelihood value >= f*" = x
    considered: "array indexing nuisance parameters" = np.ones_like(x0, dtype=bool)
    considered[index] = False
    consideredOriginal = considered.copy()
    hessConsidered: "array indexing nuisance Hessian" = np.ix_(considered, considered)
    H_: "nuisance Hessian" = H[hessConsidered]
    H0_: "nuisance entries of Hessian row corresponding to the parameter of interest" = H[index][considered]
    discontinuity_counter: "steps until discontinuous parameters are released" = 0
    maximizing: "flag to memorize whether the target has been changed temporarily" = False
    resultmode: "denotes whether or what kind of result has been found" = "searching"
    radius: "radius of the trust region in the last iteration" = np.inf
    closeToDisc: "True if we are close to a discontinuity" = False
    xi_hist = []
    f_hist = []

    subproblem = FlexibleSubproblem(None,
                                    lambda x: -f,
                                    lambda x: -J_ - (xiTmp-xi)*H0_,
                                    lambda x: -H_
                                    )
    a = 0  # just to prevent a possible name error...
    xiRadius = nuisanceRadius

    # Wrapper to catch potential errors without breaking down completely
    try:
        # ----- ITERATIONS ---------------------------------------------------
        for i in range(1, nmax+1):
            redoIteration = False
            ballsearch = False
            jacDiscont = False
            free_params_exist = False
            free_considered = ~considered
            infstepTmp = max(infstep, 10*np.abs(x0[index]))
            #closeToDisc = False

            if track_x:
                x_track.append(flip(x))
            if track_f:
                f_track.append(f)

            # if discontinuities are found, some parameters are held
            # constant for a while. The discontinuity_counter keeps track
            # of the time
            if discontinuity_counter:
                discontinuity_counter -= 1
            else:
                considered = consideredOriginal.copy()
                hessConsidered = np.ix_(considered, considered)
            relax_maximum_constraint = False

            k = -1

            # determine approximation
            J_ = J[considered]
            xi = x[index]
            H0 = H[index]
            H0_ = H0[considered]
            H00 = H[index, index]
            H_ = H[hessConsidered]

            # Determine search mode:
            # max_nuisance: set xi to specific value, maximize other
            #     parameters
            # max_nuisance_const: hold xi constant, maximize other
            #     parameters
            # normal: search x with f(x) = f*, J(x) = 0
            # binary_search: get back to the admissible region with a
            #     binary search
            counter = 0
            while True:
                counter += 1
                if counter > 10:
                    raise RuntimeError("Infinite loop", f, J, H, considered,
                                       free_considered)
                if is_negative_definite_(H_):
                    H_inv = np.linalg.inv(H_)
                    searchmode = "normal"
                    break
                free_considered_ = ~get_independent_row_indices(H_, J_,
                                                                tol=singtol)
                free_considered = ~considered
                free_considered[considered] = free_considered_
                free_params_exist = free_considered_.any()
                if not free_params_exist:
                    searchmode = "max_nuisance"
                    break
                H_dd = H_[np.ix_(~free_considered_, ~free_considered_)]
                # special case for H_ = 0
                if H_dd.size == 1 and np.allclose(H_dd, 0, atol=singtol):
                    # though H_ is singular, we set its inverse to a very
                    # large value to prevent NaNs that would mess up the
                    # result
                    H_ddInv = np.array([[max(-infstep/singtol,
                                             1/H_dd[0, 0])]])
                elif not is_negative_definite_(H_dd):
                    free_considered = ~considered
                    free_considered_[:] = False
                    searchmode = "max_nuisance"
                    break
                else:
                    H_ddInv = np.linalg.inv(H_dd)
                H0_d = H0_[~free_considered_]
                J_d = J_[~free_considered_]
                H_inv = H_ddInv
                H0_ = H0_d
                J_ = J_d
                H_ = H_dd
                searchmode = "normal"
                break

            # ----- NORMAL SEARCH -------------------------------------------
            while searchmode == "normal":
                a = (H00 - multi_dot((H0_, H_inv, H0_)))/2
                p = J[index] - multi_dot((H0_, H_inv, J_))

                fPL0: "current value on profile log-likelihood"
                fPL0 = f - multi_dot((J_, H_inv, J_))/2

                if maximizing:
                    if (is_concave_down()
                            and not (np.allclose(a, 0, atol=singtol)
                                     and np.allclose(((fPL0-target)/p)*(a/p),
                                                     0, atol=singtol))
                            ) or f < target:
                        maximizing = False
                    else:
                        targetTmp = max(fPL0 + 1, (maxFun+f)/2)
                if not maximizing:
                    targetTmp = target

                q = fPL0 - targetTmp

                if a == 0:
                    sqroot = np.nan
                else:
                    sqroot = (p/a)**2 - 4*q/a

                # if desired point is not on approximation
                if sqroot < 0:
                    if (not np.allclose(a, 0, atol=singtol)
                            and get_independent_row_indices(
                                H, J, tol=singtol)[index]):
                        if not maximizing and a < 0 and f < targetTmp:
                            warnings.warn(
                                "The current function value is not on the "
                                "approximation. This may be due to a too "
                                "large singtol. The algorithm may not "
                                "converge. Index=" + str(index)
                                + "; forward=" + str(forward)
                                + "; a=" + str(a) + "; f=" + str(f)
                                + "; root=" + str(sqroot))
                        assert not maximizing
                        if not is_concave_down() and f >= target:
                            # if beyond or very close to a local min
                            if p >= 0 or -p/a < singtol:
                                maximizing = True
                                continue
                            else:
                                # jump over minimum by factor 2
                                p *= 2
                        sqroot = 0
                    # if f is approximately constant, we do not trust the
                    # quadratic approximation too much and just try a very
                    # long step, even if it goes beyond the predicted
                    # admissible region
                    else:
                        a = 0
                        sqroot = np.inf
                else:
                    sqroot = np.sqrt(sqroot)

                # if Hessian is singular in the considered component
                # this control is important to prevent numerical errors if
                # |a| << 1 and the summands of the root are of equal order
                # of magnitude
                if (np.allclose(a, 0, atol=singtol)
                        and np.allclose((q/p)*(a/p), 0, atol=singtol)):
                    if ((maximizing and q*p > 0)
                            or (not maximizing and f >= target and q*p > 0)):
                        maximizing = not maximizing
                        continue
                    else:
                        xiTmp = xi - q/p
                # control for NaN case
                elif p == 0 and a == 0:
                    xiTmp = np.inf
                else:
                    xi1 = xi + (-p/a + sqroot) / 2
                    xi2 = xi + (-p/a - sqroot) / 2

                    # check which root is closer to current value
                    if is_concave_down():  # not maximizing
                        assert not maximizing
                        xiTmp = max(xi1, xi2)
                    elif p*a > 0 and not maximizing and f > target:
                        maximizing = True
                        continue
                    elif maximizing and p/a < -singtol:
                        maximizing = False
                        continue
                    elif maximizing:
                        xiTmp = max(xi1, xi2)
                    else:
                        if np.abs(xi-xi1) < np.abs(xi-xi2):
                            xiTmp = xi1
                        else:
                            xiTmp = xi2

                # if likelihood is independent of parameter
                if np.abs(xiTmp-xi) >= infstep * (1-singtol):
                    # try a very large xi value
                    xiTmp = xi + infstepTmp
                    xTmp = x.copy()
                    xTmp[index] = xiTmp
                    xTmp[~free_considered] += -np.dot(H_inv,
                                                      H0_*(xiTmp-xi)+J_)
                    fActual = fun(xTmp)
                    # if still larger than target
                    if fActual >= target:
                        JActual = jac(xTmp)
                        resultmode = "unbounded"
                        break
                    if not f > target:
                        if norm(J_) > singtol:
                            searchmode = "max_nuisance"
                            break
                        searchmode = "binary_search"
                        break
                    xiTmp = xiTmp/4 + 3*xi/4

                xTmp = x.copy()
                xTmp[index] = xiTmp
                xTmp[~free_considered] += -np.dot(H_inv, H0_*(xiTmp-xi)+J_)

                xx = x[~free_considered]
                upperRadius = np.linalg.norm(xx-xTmp[~free_considered])

                # Test precision
                stop, fActual, JActual = test_precision()
                if stop:
                    radius = max(radius, upperRadius)
                    break

                if radius >= upperRadius:
                    lowerRadius = upperRadius/2
                else:
                    lowerRadius = radius
                if upperRadius > 0:
                    radius = upperRadius
                tmpRadius = lowerRadius
                xiStep = xiTmp-xi
                if tmpRadius == 0:
                    xiTmp = xi + min(xiStep/2, infstep)
                else:
                    xiTmp = xi + xiStep*np.power(
                        tmpRadius/radius, np.log(2)/np.log(radiusFactor))
                ballsearch = True
                break

            if not resultmode == "searching":
                x = xTmp
                J = JActual
                JActual_ = JActual[considered]
                f = fActual
                break

            if searchmode == "normal" and free_params_exist:
                J_new = norm(JPredicted_fun()[considered])
                J_current = norm(J[considered])

                # consider adjusted scenario with xiTmp = 0
                J_f = J[considered][free_considered_]
                H_df = H[hessConsidered][np.ix_(~free_considered_,
                                                free_considered_)]
                J_new2 = J_f - multi_dot((H_df.T, H_ddInv, J_d))
                J_new2 = np.linalg.norm(J_new2)/np.sqrt(J_.size)

                # if ignoring the free parameters hinders us from making an
                # improvement w.r.t. the gradient
                abstol = max(np.log2(np.abs(xiTmp-xi))*np.abs(f-target)
                             / np.abs(maxFun-target), resulttol)
                if (J_new > abstol and J_new/J_current > apprxtol) or (
                        J_new2 > abstol and J_new2/J_current > apprxtol):
                    free_considered = ~considered
                    free_considered_[:] = False
                    free_params_exist = False
                    J_ = J[considered]
                    H_ = H[hessConsidered]
                    H0_ = H0[considered]
                    searchmode = "max_nuisance"

            counter = 0
            repeat = True
            while repeat:
                repeat = False
                counter += 1
                if counter > 10:
                    raise RuntimeError("Infinite loop", f, J, H, considered,
                                       free_considered)

                # ----- CONSTRAINED MAXIMIZATION ----------------------------
                if (searchmode == "max_nuisance"
                        or searchmode == "max_nuisance_const"):
                    xiTmp = xi
                    tmpRadius = nuisanceRadius * np.sqrt(J_.size)
                    ballsearch = True
                    if not searchmode == "max_nuisance_const":
                        if f > target:
                            xiTmp += xiRadius
                        else:
                            searchmode = "max_nuisance_const"

                # ----- BALL SEARCH -----------------------------------------
                if ballsearch:
                    increaseTrustRegion = True
                    xiPrev = xi
                    nAdjustement = 5
                    for k in range(nchecks):
                        # it can happen that the radius is so conservative
                        # that the nuisance parameters cannot be maximized
                        # sufficiently far to keep f in the admissible
                        # region
                        nRatioAdjustement = 10
                        for l in range(nRatioAdjustement):
                            #print("tmpRadius", tmpRadius)
                            xTmp_diff, _ = subproblem.solve(
                                tmpRadius, searchmode == "normal",
                                jac0tol=singtol)

                            if xi > xiTmp and f > target:
                                # Something went wrong. Print debug
                                # information.
                                warnings.warn(
                                    "Something went wrong. Please report "
                                    "this error and the information below "
                                    "to the bug tracker. Presumably the "
                                    "result is correct anyway.")
                                try:
                                    print("searchmode, maximizing",
                                          searchmode, maximizing)
                                    print("a, p =", a, ",", p)
                                    print("q, fPL0 =", q, ",", fPL0)
                                    print("f, target, targetTmp =", f, ",",
                                          target, ",", targetTmp)
                                    print("xiTmp, xi =", xiTmp, ",", xi)
                                    print("xi1, xi2 =", xi1, ",", xi2)
                                    print("xiStep, infstep =", xiStep, ",",
                                          infstep)
                                    print("tmpRadius, radius =", tmpRadius,
                                          ",", radius)
                                    print("index, direction, k, l =", index,
                                          forward, k, l)
                                    print("xTmp[index]", xTmp[index])
                                    print("---------------------")
                                except Exception:
                                    pass

                            xTmp = x.copy()
                            xTmp[index] = xiTmp
                            xTmp[~free_considered] += xTmp_diff

                            fPredicted = fPredicted_fun()

                            # if we maximize w.r.t. the nuisance parameters,
                            # we want to assure that f gets increased to get
                            # back to the likelihood ridge
                            if (not searchmode == "max_nuisance"
                                    and not searchmode == "max_nuisance_const"):
                                break
                            precise, fActual, JActual = test_precision()
                            if fPredicted >= f:
                                break
                            # if the step is super close and f still does
                            # not increase, there must be a numerical issue.
                            # We leave the treatment of this issue to the
                            # procedures below.
                            elif (np.isclose(xiTmp, xi, atol=minstep/100)
                                  and tmpRadius < minstep/100):
                                break
                            if l == nRatioAdjustement-2:
                                xiPrev = xiTmp
                                xiTmp = xi
                            else:
                                # we either increase the search radius for
                                # the nuisance parameters or decrease the
                                # search radius for the parameter of
                                # interest dependent on whether the
                                # approximation is precise and we can
                                # increase the trust region or not
                                if (increaseTrustRegion and precise
                                        and tmpRadius < maxRadius
                                            * np.sqrt(xTmp_diff.size)):
                                    tmpRadius *= 2
                                else:
                                    # if the error occurs even though xi is
                                    # constant, there must be an error in
                                    # the approximate solution and we
                                    # decrease the search radius
                                    if np.isclose(xiTmp, xi,
                                                  atol=minstep/100):
                                        tmpRadius /= 5
                                    xiTmp = (xi+xiTmp) / 2

                        # if maximizing w.r.t. the nuisance parameters,
                        # adjust the trust region
                        if (searchmode == "max_nuisance"
                                or searchmode == "max_nuisance_const"):
                            if precise:
                                # update trust region
                                nuisanceRadius = tmpRadius / np.sqrt(J_.size)
                                # update xiRadius only if we have changed xi
                                if searchmode == "max_nuisance":
                                    if xi != xiTmp:
                                        xiRadius = max(xiTmp-xi, minstep)
                                    else:
                                        # if we had to set xiRadius to 0, we
                                        # set it to the last non-zero value
                                        xiRadius = max(xiPrev-xi, minstep)
                                    # since the xiRadius cannot recover on
                                    # its own, we always gradually increase
                                    # it
                                    if 2*xiRadius < nuisanceRadius:
                                        xiRadius *= 2
                                # if we have not decreased the trust region
                                # before, try to increase it
                                if increaseTrustRegion and k < nAdjustement:
                                    xTmp2 = xTmp
                                    fActual2 = fActual
                                    xiTmp = xi + 2*xiRadius
                                    tmpRadius *= 2
                                    tmpRadius = min(
                                        maxRadius*np.sqrt(xTmp_diff.size),
                                        tmpRadius)
                                    continue
                                else:
                                    stop = True
                            else:
                                stop = False
                                if increaseTrustRegion:
                                    if k == 0:
                                        # if in the first iteration, note
                                        # that we are decreasing the trust
                                        # region
                                        increaseTrustRegion = False
                                    else:
                                        # set xTmp to the last precise value
                                        fActual = fActual2
                                        xTmp = xTmp2
                                        xiTmp = xTmp[index]
                                        stop = True
                        else:
                            stop, fActual, JActual = test_precision()
                            if searchmode == "normal":
                                if stop:
                                    # if the approximation is precise, we
                                    # may increase the radius, but save the
                                    # largest current result to fall back to
                                    xTmp2 = xTmp
                                    fActual2 = fActual
                                    lowerRadius = tmpRadius
                                else:
                                    upperRadius = tmpRadius

                                # if we have to decrease the trust region
                                if upperRadius <= lowerRadius and not stop:
                                    # continue with the continuity checks
                                    # and cut the trust region by half
                                    pass
                                # if precision of geometric binary search is
                                # as desired (first equality necessary to
                                # prevent NaNs)
                                elif (upperRadius == 0
                                      or upperRadius/lowerRadius <= 2):
                                    # stop iteration and set xTmp to the
                                    # optimal value
                                    if lowerRadius > 0:
                                        radius = lowerRadius
                                        fActual = fActual2
                                        xTmp = xTmp2
                                        xiTmp = xTmp[index]
                                    break
                                else:
                                    # update xiTmp and radius
                                    tmpRadius = np.sqrt(lowerRadius
                                                        * upperRadius)
                                    xiTmp = xi + xiStep*np.power(
                                        tmpRadius/radius,
                                        np.log(2)/np.log(radiusFactor))
                                    continue
                            elif stop:
                                break

                        minstepTmp = max(minstep / max(norm(J_), 1), 1e-12)

                        # if the step is too small
                        diffs = xTmp - x
                        if not stop and np.allclose(xTmp, x, atol=minstepTmp,
                                                    rtol=1e-14):
                            # if we are not really on a discontinuity but
                            # only on a local maximum with a non-negative
                            # definite Hessian, we may just release the
                            # constraint that we need to get to a larger
                            # function value
                            if ((not searchmode == "normal" or maximizing)
                                    and not closeToDisc):
                                relax_maximum_constraint = True
                                discontinuity_counter = 20
                                if test_precision()[0]:
                                    break

                            # check parameter of interest first
                            xTmp = x.copy()
                            xTmp[index] += diffs[index]

                            # if small change in parameter of interest makes
                            # approximation imprecise
                            if diffs[index] and not test_precision()[0]:
                                # make sure discontinuity happens on profile
                                # function
                                if norm(J[considered]) > resulttol:
                                    relax_maximum_constraint = False
                                    searchmode = "max_nuisance_const"
                                    closeToDisc = True
                                    xiDisc = xi
                                    repeat = True
                                    radius = tmpRadius
                                    break
                                fActual = fun(xTmp)
                                fCloseTarget = (
                                    np.allclose(target, f, atol=resulttol)
                                    and not np.allclose(target, fActual,
                                                        atol=resulttol))
                                # if step is favourable or benign, we accept
                                # it; depending on the distance measure this
                                # case will never occur, as favourable steps
                                # may always be classified as precise
                                if fActual >= target or f < fActual:
                                    pass
                                # if the target is on the jump
                                elif ((f > target or fCloseTarget)
                                      and fActual < target):
                                    # stop here
                                    fPredicted = fPredicted_fun()
                                    resultmode = "discontinuous"
                                    xTmp = x
                                    JActual = J
                                else:
                                    # get back to the feasible region by
                                    # conducting a binary search
                                    searchmode = "binary_search"
                                dispprint(("iter {:3d}{}: ***discontinuity "
                                           "of {} in x_{} at {}***").format(
                                    i, ">" if forward else "<", f-fActual,
                                    index, x))
                                break

                            # check for discontinuities in nuisance
                            # parameters
                            discontinuities = np.zeros_like(free_considered)
                            discontinuities_rejected = discontinuities.copy()
                            if not f_approx_precise(fPredicted, fActual):
                                if diffs[index]:
                                    fPredicted = fPredicted_fun()
                                    fActual = fun(xTmp)
                                else:
                                    fPredicted = fActual = f
                                for l in np.nonzero(~free_considered)[0]:
                                    xTmp[l] += diffs[l]
                                    fActual_previous = fActual
                                    fActual = fun(xTmp)
                                    fPredicted = fPredicted_fun()
                                    if not f_approx_precise(fPredicted,
                                                            fActual):
                                        discontinuities[l] = True
                                        if fActual < fActual_previous:
                                            discontinuities_rejected[l] = True
                                            xTmp[l] = x[l]
                                            fActual = fActual_previous
                                jacDiscont = False
                            # if the gradient is responsible for the
                            # rejection, we require a smaller step
                            elif (np.abs(f-target) < resulttol
                                  and np.allclose(xTmp, x,
                                                  rtol=minstepTmp/10)):
                                for l in np.nonzero(~free_considered)[0]:
                                    xTmp[l] += diffs[l]
                                    JActual_previous = JActual
                                    JActual = jac(xTmp)
                                    JPredicted = JPredicted_fun()
                                    jac_precise = jac_approx_precise(
                                        JPredicted[~free_considered],
                                        JActual[~free_considered],
                                        J[~free_considered])
                                    if not jac_precise:
                                        discontinuities[l] = True
                                        if not (JActual_previous[l]*J[l] >= 0
                                                or diffs[l]*J[l] <= 0):
                                            discontinuities_rejected[l] = True
                                            xTmp[l] = x[l]
                                            JActual = JActual_previous
                                jacDiscont = True

                            if discontinuities.any():
                                varStr = ("variables" if not jacDiscont
                                          else "gradient entries")
                                dispprint(("iter {:3d}{}: ***index={}: "
                                           "discontinuities in {} {} at "
                                           "{}***").format(
                                    i, ">" if forward else "<", index,
                                    varStr,
                                    tuple(np.nonzero(discontinuities)[0]),
                                    x))
                                redoIteration = (
                                    discontinuities.sum()
                                    == discontinuities_rejected.sum())
                                if discontinuities_rejected.any():
                                    discontinuity_counter = 25
                                    considered &= ~discontinuities_rejected
                                    free_considered |= discontinuities_rejected
                                    free_considered_ = free_considered[considered]
                                    hessConsidered = np.ix_(considered,
                                                            considered)
                                relax_maximum_constraint = False
                                break

                        if tmpRadius > (radiusFactor**nchecks-k)*minstep:
                            tmpRadius /= 5
                            xiTmp = (4*xi+xiTmp) / 5
                        else:
                            tmpRadius /= radiusFactor
                            xiTmp = (xi+xiTmp) / 2
                    else:
                        xTmp = x
                        JActual = J
                        fActual = f

            if redoIteration:
                continue

            # ----- BINARY SEARCH -------------------------------------------
            if searchmode == "binary_search":
                tol = 0.01
                bin_min = x
                bin_max = x_max
                for k in range(nchecks):
                    xTmp = (bin_max+bin_min) / 2
                    fActual = fun(xTmp)
                    if 0 <= fActual - target < tol:
                        break
                    if fActual < target:
                        bin_min = xTmp
                    else:
                        bin_max = xTmp
                else:
                    xTmp = bin_max
                JActual = jac(xTmp)

            # ----- UPDATES & CHECKS ----------------------------------------
            if ((xTmp == x).all() and not jacDiscont
                    and resultmode == "searching"):
                dispprint(("iter {:3d}{}: ***no improvement when optimizing "
                           "x_{} at {}***").format(
                    i, ">" if forward else "<", index, flip(x)))
                resultmode = "discontinuous"

            xiChange = xTmp[index] - xi
            xChange = norm(x-xTmp)

            # when we are maximizing the nuisance parameters, we may step
            # farther than the allowed bound
            if xiChange >= infstep and f >= target:
                resultmode = "unbounded"
            elif ((searchmode == "max_nuisance"
                   or searchmode == "max_nuisance_const" or maximizing)
                  and (xiChange < nuisance_minstep
                       and f > target+resulttol)):
                if (len(xi_hist) > 5
                        and (np.abs(np.array(xi_hist[-5:]) - xi)
                             < nuisance_minstep).all()
                        and (np.abs(np.array(f_hist[-5:]) - f)
                             < nuisance_minstep).all()):
                    xTmp[index] += nuisance_minstep
                    fActual = fun(xTmp)
                    m = 0
                    fActual_previous = fActual
                    xTmp_previous = xTmp.copy()
                    while fActual > target and m < 1000:
                        fActual_previous = fActual
                        xTmp_previous = xTmp.copy()
                        xTmp[index] += nuisance_minstep * 2**m
                        fActual = fun(xTmp)
                        m += 1
                    xTmp = xTmp_previous
                    fActual = fActual_previous
                    m = -1
                    while fActual < target-50 and m > -10:
                        fActual_previous = fActual
                        xTmp_previous = xTmp.copy()
                        xTmp[index] += nuisance_minstep * 2**m
                        fActual = fun(xTmp)
                        m -= 1
                    JActual = jac(xTmp)
                    dispprint(("iter {:3d}{}: ***taking additional step in "
                               "x_{} to avoid convergence issues. "
                               "f={:6.3}***").format(
                        i, ">" if forward else "<", index, fActual))
                    for m in range(1, 5):
                        xi_hist[-m] += xTmp[index]-xi

            xi_hist.append(xTmp[index])
            f_hist.append(fActual)

            x = xTmp
            xi = xTmp[index]

            if JActual is None:
                JActual = jac(xTmp)

            JImprovement = np.log2(norm(J[considered])
                                   / norm(JActual[considered]))
            fImprovement = np.log2(np.abs((f-target)/(fActual-target)))
            J = JActual
            f = fActual
            JActual_ = JActual[considered]

            # reset maximal value
            if f > target and xi > xi_max:
                xi_max = xi
                x_max = x

            fPredicted = fPredicted_fun()

            if disp:
                radius_str = ""
                if searchmode == "normal":
                    if maximizing:
                        mode = "maximizing"
                    else:
                        mode = "searching"
                elif (searchmode == "max_nuisance"
                      or searchmode == "max_nuisance_const"):
                    mode = "maximizing nuisance parameters"
                    radius_str = "; step={}; radius={}".format(
                        __round_str(xiRadius), __round_str(nuisanceRadius))
                elif searchmode == "binary_search":
                    mode = "binary search"
                else:
                    mode = "other"
                ndiscont = np.sum(~considered)-1
                if ndiscont:
                    discontStr = "; ndisct={}".format(ndiscont)
                else:
                    discontStr = ""
                if free_params_exist and free_considered_.any():
                    mode += (" with " + str(np.sum(free_considered_))
                             + " free params")
                    jac_cnsStr = "; jac_cns={}".format(__round_str(
                        norm(JActual[~free_considered])))
                else:
                    jac_cnsStr = ""
                dispprint(("iter {:3d}{}: x_{}_d={}; f_d={}; jac_d={}; "
                           + "nsteps={:2d}; x_d={}; f_impr={}; jac_impr={}; "
                           + "f_e={}{}{}{} - {}").format(
                    i, ">" if forward else "<", index,
                    __round_str(xi-x0[index]), __round_str(f-target),
                    __round_str(norm(JActual_)), k+2, __round_str(xChange),
                    __round_str(fImprovement), __round_str(JImprovement),
                    __round_str(fActual-fPredicted), jac_cnsStr, discontStr,
                    radius_str, mode))

            if resultmode == "searching":
                resViol = max(norm(JActual_), np.abs(f-target))
                if resViol <= resulttol:
                    resultmode = "success"
                    break
            else:
                break

            if f > maxFun:
                dispprint(("-> iter {:3d}{}: !!!found NEW MAXIMUM for x_{} "
                           "of {:6.3f} (+{:6.3f}) at {}!!!").format(
                    i, ">" if forward else "<", index, f, f-fun0, flip(x)))
                maxFun = f

            if closeToDisc:
                if np.abs(xi - xiDisc) > minstep:
                    closeToDisc = False

            H = hess(x)
        else:
            resultmode = "exceeded"
    except np.linalg.LinAlgError:
        resultmode = "linalg_error"
        exc_type, exc_value, exc_traceback = sys.exc_info()
        traceback.print_exception(exc_type, exc_value, exc_traceback,
                                  file=sys.stdout)
    except Exception:
        resultmode = "error"
        exc_type, exc_value, exc_traceback = sys.exc_info()
        traceback.print_exception(exc_type, exc_value, exc_traceback,
                                  file=sys.stdout)

    if resultmode == "success":
        status = 0
        success = True
    elif resultmode == "discontinuous":
        status = 1
        success = False
    elif resultmode == "unbounded":
        status = 2
        success = True
    elif resultmode == "exceeded":
        success = False
        status = 3
    elif resultmode == "error" or resultmode == "linalg_error":
        try:
            JActual_ = J_
        except UnboundLocalError:
            JActual_ = jac(x)[considered]
        success = False
        if resultmode == "error":
            status = 5
        else:
            status = 4

    dispprint(op.OptimizeResult(x=flip(x), fun=f, jac=JActual_,
                                success=success, status=status,
                                nfev=fun.evals, njev=jac.evals,
                                nhev=hess.evals, nit=i,
                                message=STATUS_MESSAGES[status]
                                ))

    if track_f:
        f_track.append(f)
    if track_x:
        x_track.append(flip(x))

    return op.OptimizeResult(x=flip(x), fun=f, jac=JActual_,
                             success=success, status=status,
                             nfev=fun.evals, njev=jac.evals,
                             nhev=hess.evals, nit=i,
                             x_track=np.array(x_track),
                             f_track=np.array(f_track),
                             message=STATUS_MESSAGES[status]
                             )
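
# Illustrative usage sketch (not part of the original module): for a
# quadratic (i.e. Gaussian) log-likelihood with unit curvature, the upper
# 95% profile likelihood bound of a parameter lies sqrt(chi2.ppf(0.95, 1))
# ~= 1.96 above its MLE. Gradient and Hessian default to numdifftools.
def _example_find_CI_bound():
    logL = lambda x: -0.5*((x[0]-1.0)**2 + (x[1]-2.0)**2)
    res = find_CI_bound(np.array([1.0, 2.0]), logL, index=0, direction=1)
    assert res.success
    assert np.isclose(res.x[0], 1.0 + 1.959964, atol=1e-2)
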
def __parallel_helper_fun(args):
    return args[0], args[3], find_CI_bound(*args[1:-1], **args[-1])
@inherit_doc(find_CI_bound)
def find_CI(x0, fun, jac=None, hess=None, indices=None, directions=None,
            alpha=0.95, parallel=False, return_full_results=False,
            return_success=False, **kwargs):
    """Returns the profile likelihood confidence interval(s) for one or
    multiple parameters.

    Parameters
    ----------
    indices : int[]
        Indices of the parameters of interest. If not given, all parameters
        will be considered.
    directions : float[][]
        Search directions. If not given, both end points of the confidence
        intervals will be determined. If given as a scalar, only lower end
        points will be returned if ``directions<=0`` and upper end points
        otherwise. If given as an array, the confidence interval end points
        specified in row ``i`` will be returned for parameter ``i``.
        Entries ``<=0`` indicate that lower end points are desired, whereas
        positive entries will result in upper end points.
    parallel : bool
        If ``True``, results will be computed in parallel using
        ``multiprocessing.Pool``. Note that this requires that all
        arguments are picklable.
    return_full_results : bool
        If ``True``, an ``OptimizeResult`` object will be returned for each
        confidence interval bound. Otherwise, only the confidence interval
        bounds for the parameters in question will be returned.
    return_success : bool
        If ``True``, an array of the same shape as the result will be
        returned in addition, indicating for each confidence interval bound
        whether it was determined successfully.
    **kwargs : keyword arguments
        Other keyword arguments passed to :py:meth:`find_CI_bound`. Look at
        the documentation there.
    """
    x0 = np.asarray(x0)

    isScalarIndex = indices is not None and np.isscalar(indices)
    if indices is None:
        indices = np.arange(x0.size)
    elif isScalarIndex:
        indices = (int(indices),)
    else:
        indices = np.array(indices, dtype=int)

    isScalarDirection = directions is not None and np.isscalar(directions)
    if directions is None:
        directions = np.zeros((len(indices), 2))
        directions[:, 0] = -1
        directions[:, 1] = 1
    elif isScalarDirection:
        directions = np.full((len(indices), 1), directions)
    elif type(directions) == np.ndarray:
        if directions.ndim == 1:
            directions = directions[:, None]
    else:
        directions = [[i] if not hasattr(i, "__iter__") else i
                      for i in directions]

    if "fun0" not in kwargs:
        kwargs["fun0"] = fun(x0)
    if "jac0" not in kwargs:
        if jac is None:
            kwargs["jac0"] = nd.Gradient(fun)(x0)
        else:
            kwargs["jac0"] = jac(x0)
    if "hess0" not in kwargs:
        if hess is None:
            kwargs["hess0"] = nd.Hessian(fun)(x0)
        else:
            kwargs["hess0"] = hess(x0)

    task_chain = []
    for i, index, directions_ in zip(itercount(), indices, directions):
        for direction in directions_:
            task_chain.append((i, x0, fun, index, direction, jac, hess,
                               alpha, kwargs))

    if parallel:
        with Pool() as pool:
            result_list = list(pool.map(__parallel_helper_fun, task_chain))
    else:
        result_list = list(map(__parallel_helper_fun, task_chain))

    result = [[] for _ in range(min(len(indices), len(directions)))]
    if return_success:
        success = [[] for _ in range(min(len(indices), len(directions)))]

    for i, index, res in result_list:
        if return_success:
            success[i].append(res.success)
        if return_full_results:
            result[i].append(res)
        else:
            result[i].append(res.x[index])

    try:
        result = np.array(result)
    except ValueError:
        result = np.array(result, dtype=object)

    if isScalarDirection:
        result = result.ravel()
    if isScalarIndex:
        result = result[0]

    if return_success:
        try:
            success = np.array(success)
        except ValueError:
            success = np.array(success, dtype=object)
        if isScalarDirection:
            success = success.ravel()
        if isScalarIndex:
            success = success[0]
        return result, success
    else:
        return result
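
# Illustrative usage sketch (not part of the original module): both end
# points for both parameters of the quadratic log-likelihood used above.
# With the default arguments, the result is one (lower, upper) row per
# parameter.
def _example_find_CI():
    logL = lambda x: -0.5*((x[0]-1.0)**2 + (x[1]-2.0)**2)
    ci = find_CI(np.array([1.0, 2.0]), logL)
    assert ci.shape == (2, 2)        # rows: parameters; columns: bounds
    assert np.allclose(ci[0], [1.0 - 1.96, 1.0 + 1.96], atol=1e-2)
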
@inherit_doc(find_CI)
def find_function_CI(x0, function, logL, functionJac=None,
                     functionHess=None, logLJac=None, logLHess=None,
                     relativeError=1e-4, **kwargs):
    """Returns the profile likelihood confidence interval(s) for a function
    of parameters.

    Parameters
    ----------
    function : callable
        Function of the parameters for which the confidence interval shall
        be computed.
    logL : callable
        Log-likelihood function.
    functionJac : callable
        Gradient of :py:obj:`function`. If `None`, it will be computed
        based on finite differences using numdifftools.
    functionHess : callable
        Hessian of :py:obj:`function`. If `None`, it will be computed based
        on finite differences using numdifftools.
    logLJac : callable
        Gradient of :py:obj:`logL`. If `None`, it will be computed based on
        finite differences using numdifftools.
    logLHess : callable
        Hessian of :py:obj:`logL`. If `None`, it will be computed based on
        finite differences using numdifftools.
    relativeError : float
        Permitted relative error in the confidence interval bound.
    **kwargs : keyword arguments
        Other keyword arguments passed to :py:meth:`find_CI` and
        :py:meth:`find_CI_bound`. Look at the documentation there.
    """
    function0 = function(x0)
    error = function0 * relativeError
    x0 = np.insert(x0, 0, function0)

    fun_2 = lambda x: 0.5 * ((function(x[1:])-x[0])/error)**2
    fun = lambda x: logL(x[1:]) - fun_2(x)

    if functionJac is None and logLJac is None:
        jac = nd.Gradient(fun)
    else:
        if functionJac is None:
            jac_2 = nd.Gradient(fun_2)
        else:
            def jac_2(x):
                result = np.zeros(x0.size)
                diffTerm = (function(x[1:]) - x[0]) / error**2
                result[0] = -diffTerm
                result[1:] = np.asarray(functionJac(x[1:])) * diffTerm
                return result
        if logLJac is None:
            logLJac = nd.Gradient(logL)
        def jac(x):
            result = -jac_2(x)
            result[1:] += np.asarray(logLJac(x[1:]))
            return result

    if functionHess is None:
        hess_2 = nd.Hessian(fun_2)
    else:
        if functionJac is None:
            functionJac = nd.Gradient(function)
        def hess_2(x):
            result = np.zeros((x0.size, x0.size))
            functionJacX = np.asarray(functionJac(x[1:]))
            result[0, 0] = 1 / error**2
            result[1:, 0] = result[0, 1:] = -functionJacX / error**2
            result[1:, 1:] = (np.asarray(functionHess(x[1:]))
                              * (function(x[1:]) - x[0])
                              + functionJacX * functionJacX[:, None]
                              ) / error**2
            return result
    if logLHess is None:
        logLHess = nd.Hessian(logL)
    def hess(x):
        result = -hess_2(x)
        result[1:, 1:] += np.asarray(logLHess(x[1:]))
        return result

    # These arguments could lead to wrong results if passed on. Hence, we
    # ignore them.
    kwargs.pop("fun0", None)
    kwargs.pop("jac0", None)
    kwargs.pop("hess0", None)

    return find_CI(x0, fun, jac, hess, indices=0, **kwargs)
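
# Illustrative usage sketch (not part of the original module): a confidence
# interval for the sum of both parameters. With an identity observed
# information matrix, the sum has variance 2, so the 95% interval is
# roughly 3 +/- 1.96*sqrt(2). Analytic derivatives are supplied to exercise
# the functionJac/functionHess/logLJac/logLHess arguments.
def _example_find_function_CI():
    logL = lambda x: -0.5*((x[0]-1.0)**2 + (x[1]-2.0)**2)
    psi = lambda x: x[0] + x[1]
    ci = find_function_CI(np.array([1.0, 2.0]), psi, logL,
                          functionJac=lambda x: np.ones(2),
                          functionHess=lambda x: np.zeros((2, 2)),
                          logLJac=lambda x: -(x - np.array([1.0, 2.0])),
                          logLHess=lambda x: -np.eye(2))
    assert np.allclose(ci, [3.0 - 1.96*np.sqrt(2.0),
                            3.0 + 1.96*np.sqrt(2.0)], atol=0.05)
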
# Define aliases that adhere to the Python naming conventions
find_ci = find_CI
find_function_ci = find_function_CI
find_ci_bound = find_CI_bound