#!/usr/bin/env python3
# system modules
import textwrap
import collections
# internal modules
from . import equations
from . import utils
# external modules
import numpy as np
[docs]class NumericalScheme(utils.ReprObject,utils.LoggerObject):
"""
Base class for numerical schemes
Args:
description (str): short equation description
long_description (str): long equation description
equation (DerivativeEquation): the equation
fallback_max_timestep (single numeric): the fallback maximum
timestep if no timestep can be estimated from the equation
ignore_linear (bool): ignore the linear part of the equation?
ignore_independent (bool): ignore the variable-independent part of
the equation?
ignore_nonlinear (bool): ignore the nonlinear part of the equation?
"""
def __init__(self, description = None, long_description = None,
equation = None, fallback_max_timestep = None,
ignore_linear = None, ignore_independent = None,
ignore_nonlinear = None):
if not equation is None:
self.equation = equation
if not fallback_max_timestep is None:
self.fallback_max_timestep = fallback_max_timestep
if not description is None:
self.description = description
if not long_description is None:
self.long_description = long_description
if not ignore_linear is None:
self.ignore_linear = ignore_linear
if not ignore_independent is None:
self.ignore_independent = ignore_independent
if not ignore_nonlinear is None:
self.ignore_nonlinear = ignore_nonlinear
##################
### Properties ###
##################
@property
def description(self):
"""
The numerical scheme description
:type: :any:`str`
"""
try: self._description
except AttributeError: self._description = self._default_description
return self._description
@description.setter
def description(self, newdescription):
assert isinstance(newdescription, str), "description has to be str"
self._description = newdescription
@property
def _default_description(self):
"""
The default description if none was given
:type: :any:`str`
"""
return "a numerical scheme"
@property
def long_description(self):
"""
The longer numerical scheme description
:type: :any:`str`
"""
try: self._long_description
except AttributeError:
self._long_description = self._default_long_description
return self._long_description
@long_description.setter
def long_description(self, newlong_description):
assert isinstance(newlong_description, str), \
"long_description has to be str"
self._long_description = newlong_description
@property
def _default_long_description(self):
"""
The default longer numerical scheme description if none was given
:type: :any:`str`
"""
return "This is a numerical scheme to solve a derivative equation."
@property
def fallback_max_timestep(self):
"""
The numerical scheme's fallback maximum timestep
:type: :any:`float`
"""
try: self._fallback_max_timestep
except AttributeError:
self._fallback_max_timestep = self._default_fallback_max_timestep
return self._fallback_max_timestep
@fallback_max_timestep.setter
def fallback_max_timestep(self, newfallback_max_timestep):
assert utils.is_numeric(newfallback_max_timestep), \
"fallback_max_timestep has to be numeric"
assert np.asarray(newfallback_max_timestep).size == 1, \
"fallback_max_timestep has to be of size 1"
self._fallback_max_timestep = float(newfallback_max_timestep)
@property
def _default_fallback_max_timestep(self):
"""
The numerical scheme's default fallback maximum timestep
:type: :any:`float`
"""
return 1.0
@property
def equation(self):
"""
The equation the numerical scheme's should solve
:type: :any:`DerivativeEquation`
"""
try: self._equation
except AttributeError: self._equation = self._default_equation
return self._equation
@equation.setter
def equation(self, newequation):
assert isinstance(newequation, equations.DerivativeEquation), \
"equation has to be instance of subclass of DerivativeEquation"
self._equation = newequation
@property
def _default_equation(self):
"""
The default equation the numerical scheme's should solve if none was
given
:type: :any:`DerivativeEquation`
"""
return equations.DerivativeEquation()
@property
def ignore_linear(self):
"""
Should this numerical scheme ignore the equation's linear factor?
:type: :any:`bool`
"""
try: self._ignore_linear
except AttributeError: self._ignore_linear = self._default_ignore_linear
return self._ignore_linear
@ignore_linear.setter
def ignore_linear(self, newignore_linear):
self._ignore_linear = bool(newignore_linear)
@property
def _default_ignore_linear(self):
"""
Default behaviour for :any:`ignore_linear`
:type: :any:`bool`
"""
return False
@property
def ignore_independent(self):
"""
Should this numerical scheme ignore the equation's variable-independent
addend?
:type: :any:`bool`
"""
try: self._ignore_independent
except AttributeError:
self._ignore_independent = self._default_ignore_independent
return self._ignore_independent
@ignore_independent.setter
def ignore_independent(self, newignore_independent):
self._ignore_independent = bool(newignore_independent)
@property
def _default_ignore_independent(self):
"""
Default behaviour for :any:`ignore_independent`
:type: :any:`bool`
"""
return False
@property
def ignore_nonlinear(self):
"""
Should this numerical scheme ignore the equation's nonlinear addend?
:type: :any:`bool`
"""
try: self._ignore_nonlinear
except AttributeError:
self._ignore_nonlinear = self._default_ignore_nonlinear
return self._ignore_nonlinear
@ignore_nonlinear.setter
def ignore_nonlinear(self, newignore_nonlinear):
self._ignore_nonlinear = bool(newignore_nonlinear)
@property
def _default_ignore_nonlinear(self):
"""
Default behaviour for :any:`ignore_nonlinear`
:type: :any:`bool`
"""
return False
###############
### Methods ###
###############
@property
def max_timestep(self, time = None, variablevalue = None):
"""
Return a maximum timestep for the current state. First tries the
:any:`max_timestep_estimate`, then the :any:`fallback_max_timestep`.
Args:
time (single numeric, optional): the time to calculate the
derivative. Defaults to the variable's current (last) time.
variablevalue (numpy.ndarray, optional): the variable vaulue to use.
Defaults to the value of self.variable at the given time.
Returns:
float : an estimate of the current maximum timestep
"""
try:
try: # try an estimate
ts = self.max_timestep_estimate(
time=time, variablevalue=variablevalue) # try estimate
except: # estimating didn't work
ts = self.fallback_max_timestep # try fallback
assert utils.is_numeric(ts)
assert np.asarray(ts).size == 1
except: # neither estimate nor fallback were sensible
ts = 1 # default
raise
return ts
[docs] def max_timestep_estimate(self, time = None, variablevalue = None):
"""
Based on this numerical scheme and the equation parts, estimate a
maximum timestep. Subclasses may override this.
Args:
time (single numeric, optional): the time to calculate the
derivative. Defaults to the variable's current (last) time.
variablevalue (numpy.ndarray, optional): the variable vaulue to use.
Defaults to the value of self.variable at the given time.
Returns:
single numeric or bogus: an estimate of the current
maximum timestep. Definitely check the result for integrity.
Raises:
Exception : any exception if something goes wrong
"""
# equation + state -> timestep estimate functions
def smaller_than_time_constant(time = None, variablevalue = None):
eq = self.equation
nonlin = eq.nonlinear_addend(time=time,variablevalue=variablevalue)
assert nonlin == 0, "not a linear equation"
lin = eq.linear_factor(time = time)
assert lin < 0, "not a decay equation"
tau = abs(1 / lin) # time constant
return 0.01 * tau # timestep must be smaller than time constant
# fallback function
def nothing(*args,**kwargs):
raise Exception
# mapping of schemes to functions
schemes = {
EulerExplicit: smaller_than_time_constant,
EulerImplicit: smaller_than_time_constant,
RungeKutta4: smaller_than_time_constant, # TODO correct?
}
fun = schemes.get(self.__class__, nothing ) # get the function
timestep = fun(time = time, variablevalue = variablevalue) # call it
return timestep # return
[docs] def needed_timesteps(self, timestep):
"""
Given a timestep to integrate from now on, what other timesteps of the
dependencies are needed?
Args:
timestep (single numeric): the timestep to calculate
Returns:
numpy.ndarray : the timesteps
Note:
timestep 0 means the current time
"""
assert utils.is_numeric(timestep), "timestep needs to be numeric"
ts = np.asarray(timestep)
assert ts.size == 1, "timestep needs to be one single value"
return self._needed_timesteps_for_integration_step(timestep = ts)
[docs] def _needed_timesteps_for_integration_step(self, timestep = None):
"""
Given a timestep to integrate from now on, what other timesteps of the
dependencies are needed?
Args:
timestep (single numeric): the timestep to calculate
Returns:
numpy.ndarray : the timesteps
Note:
timestep 0 means the current time
"""
raise NotImplementedError("Subclasses should override this")
[docs] def linear_factor(self, time = None):
"""
Calculate the equation's linear factor in front of the variable.
Args:
time (single numeric, optional): the time to calculate the
derivative. Defaults to the variable's current (last) time.
Returns:
numeric : the linear factor or 0 if :any:`ignore_linear` is
``True``.
"""
if self.ignore_linear:
return 0
else:
return self.equation.linear_factor( time = time )
[docs] def independent_addend(self, time = None):
"""
Calculate the equation's addend part that is independent of the
variable.
Args:
time (single numeric, optional): the time to calculate the
derivative. Defaults to the variable's current (last) time.
Returns:
numeric : the independent addend or 0 if :any:`ignore_independent`
is ``True``.
"""
if self.ignore_independent:
return 0
else:
return self.equation.independent_addend( time = time )
[docs] def nonlinear_addend(self, time = None, variablevalue = None):
"""
Calculate the derivative's addend part that is nonlinearly dependent of
the variable.
Args:
time (single numeric, optional): the time to calculate the
derivative. Defaults to the variable's current (last) time.
variablevalue (numpy.ndarray, optional): the variable vaulue to use.
Defaults to the value of self.variable at the given time.
Returns:
res (numeric): the nonlinear addend or 0 if ignore_nonlinear is
True.
"""
if self.ignore_nonlinear:
return 0
else:
return self.equation.nonlinear_addend(
time = time, variablevalue = variablevalue)
[docs] def integrate(self, time = None, until = None):
""" Integrate until a certain time, respecting the :any:`max_timestep`.
Args:
time (single numeric, optional): The time to begin. Defaults to
current variable :any:`time`.
until (single numeric, optional): The time to integrate until.
Defaults to one :any:`max_timestep` further.
"""
assert utils.is_numeric(until), "until needs to be numeric"
if time is None: time = self.equation.variable.time
current_max_timestep = self.max_timestep
self.logger.debug("current maximum timestep is {}".format(
current_max_timestep))
if until is None: until = time + current_max_timestep
self.logger.debug("integrating until time {}".format(until))
time_now = time
while time_now < until:
self.logger.debug(("current time {} is smaller than until time {}"
).format(time_now,until))
time_left = until - time_now
if time_left > current_max_timestep: # full max_timestep fits
timestep = current_max_timestep
else:
timestep = time_left
self.logger.debug(("integrate one step from time {} with "
"timestep {}").format(time_now,timestep))
# integrate one step
self.integrate_step( time = time_now, timestep = timestep )
time_now += timestep
self.logger.debug(("reached until time {}").format(time_now))
[docs] def integrate_step(self, time = None, timestep = None):
"""
Integrate "timestep" forward and set results in-place
Args:
time (single numeric, optional): The time to calculate the step
FROM. Defaults to the current variable time.
timestep (single numeric, optional): The timestep to calculate the
step. Defaults to :any:`max_timestep`.
"""
var = self.equation.variable
if timestep is None: timestep = self.max_timestep
if time is None: time = var.time
var.next_time = time + timestep # this is the next time
tend = self.step( # integrate one timestep
time = time, timestep = timestep, tendency = True )
# self.logger.debug("{} tendency: {}".format(var.id,tend))
# self.logger.debug("var() = {}".format(var()))
# self.logger.debug("var({}) = {}".format(time,var(time)))
# self.logger.debug("var.time = {}".format(var.time))
# self.logger.debug("var(var.time) = {}".format(var(var.time)))
new = var(time) + tend
var.value = new # save value
var.next_time = None # unset next_time
[docs] def step(self, time, timestep, tendency=True):
"""
Integrate one "timestep" from "time" forward and return value
Args:
time (single numeric): The time to calculate the step FROM
timestep (single numeric): The timestep to calculate the step
tendency (bool, optional): return the tendency or the actual value
of the variable after the timestep?
Returns:
numpy.ndarray : The resulting variable value or tendency
"""
raise NotImplementedError("Subclasses should override this")
[docs] def __str__(self): # pragma: no cover
"""
Stringification
Returns:
str : a summary
"""
string = (
" \"{description}\" \n"
"{long_description}\n"
"maximum timestep: {max_timestep} \n"
"[numerical scheme for equation:] \n"
"{equation}\n"
).format(description=self.description,
equation = textwrap.indent(text=str(self.equation),prefix="> "),
max_timestep = self.max_timestep,
variable=self.equation.variable.id,
long_description = self.long_description)
return string
[docs]class EulerExplicit(NumericalScheme):
"""
Euler-explicit numerical scheme
"""
@property
def _default_description(self):
return "Euler-explicit scheme"
@property
def _default_long_description(self):
return "This is a Euler-explicit scheme to solve a derivative equation."
[docs] def step(self, time = None, timestep = None, tendency = True):
if timestep is None: timestep = self.max_timestep
v = self.equation.variable
# get equation parts
linear = self.linear_factor( time = time )
indep = self.independent_addend( time = time )
nonlin = self.nonlinear_addend( time = time )
cur = v( time )
# explicit scheme
tend = timestep * ( linear * cur + indep + nonlin )
if tendency: # only tendency desired
res = tend
else: # value desired
res = cur + tend
return res
def _needed_timesteps_for_integration_step(self, timestep = None):
return np.array([0]) # only current time needed
[docs]class EulerImplicit(NumericalScheme):
"""
Euler-implicit numerical scheme
"""
@property
def _default_description(self):
return "Euler-implicit scheme"
@property
def _default_long_description(self):
return "This is a Euler-implicit scheme to solve a derivative equation."
[docs] def step(self, time = None, timestep = None, tendency = True):
"""
Integrate one "timestep" from "time" forward with the Euler-implicit
scheme and return the resulting variable value.
Args:
time (single numeric): The time to calculate the step FROM
timestep (single numeric): The timestep to calculate the step
tendency (bool, optional): return the tendency or the actual value
of the variable after the timestep?
Returns:
numpy.ndarray : The resulting variable value or tendency
Raises:
AssertionError : when the equation's nonlinear part is not zero and
:any:`ignore_nonlinear` is not set to ``True``
"""
if timestep is None: timestep = self.max_timestep
v = self.equation.variable
# get equation parts
linear = self.linear_factor( time = time )
indep = self.independent_addend( time = time )
nonlin = self.nonlinear_addend( time = time )
assert np.all(nonlin == 0), ("nonlinear part of equation is not "
"zero! Cannot integrate equation with implicit scheme. Set "
"ignore_nonlinear to ignore nonlinear part.")
# implicit scheme
new = ( indep * timestep + v( time ) ) / ( 1 - linear * timestep )
if tendency: # tendency desired
res = new - v( time ) # only tendency
else: # new value desired
res = new
return res
def _needed_timesteps_for_integration_step(self, timestep):
return np.array([0,1]) * timestep # current time and timestep needed
[docs]class LeapFrog(NumericalScheme):
"""
Leap-Frog numerical scheme
"""
@property
def _default_description(self):
return "Leap-Frog scheme"
@property
def _default_long_description(self):
return "This is a Leap-Frog scheme to solve a derivative equation."
[docs] def step(self, time = None, timestep = None, tendency = True):
if timestep is None: timestep = self.max_timestep
v = self.equation.variable
if time is None: time = v.time
# get equation parts
linear = self.linear_factor( time = time )
indep = self.independent_addend( time = time )
nonlin = self.nonlinear_addend( time = time )
# previous value
prev = v( time - timestep )
cur = v( time )
# leap-frog scheme
new = prev + 2 * timestep * ( linear * cur + indep + nonlin )
if tendency: # tendency desired
res = new - cur # only tendency
else: # new value desired
res = new
return res
def _needed_timesteps_for_integration_step(self, timestep):
return np.array([-1,0]) * timestep # prev time and current time needed
[docs]class RungeKutta4(NumericalScheme):
"""
Runte-Kutta-4 numerical scheme
"""
@property
def _default_description(self):
return "Runge-Kutta-4 scheme"
@property
def _default_long_description(self):
return ("This is a Runge-Kutta-4th-order scheme to solve a "
"derivative equation.")
[docs] def step(self, time = None, timestep = None, tendency = True):
if timestep is None: timestep = self.max_timestep
v = self.equation.variable
if time is None: time = v.time
# get equation parts
linear = self.linear_factor( time = time )
indep = self.independent_addend( time = time )
nonlin = self.nonlinear_addend( time = time )
half_time = time + timestep / 2
next_time = time + timestep
cur = v( time )
def F(): return linear * cur + indep + nonlin
# first part
k1 = timestep * F()
# second part
linear = self.linear_factor( time = half_time )
indep = self.independent_addend( time = half_time )
nonlin = self.nonlinear_addend(
time = half_time, variablevalue = cur + k1 / 2 )
k2 = timestep * F()
nonlin = self.nonlinear_addend(
time = half_time, variablevalue = cur + k2 / 2 )
k3 = timestep * F()
linear = self.linear_factor( time = next_time )
indep = self.independent_addend( time = next_time )
nonlin = self.nonlinear_addend(
time = next_time, variablevalue = cur + k3 )
k4 = timestep * F()
tend = ( k1 + 2 * k2 + 2 * k3 + k4 ) / 6
if tendency: # tendency desired
res = tend # only tendency
else: # new value desired
res = cur + tend # add tendency
return res
def _needed_timesteps_for_integration_step(self, timestep):
return np.array([0,0.5,1]) * timestep # current time, half and full
###############################
### Sets of NumericalScheme ###
###############################
[docs]class SetOfNumericalSchemes(utils.SetOfObjects):
"""
Base class for sets of NumericalSchemes
Args:
elements (:any:`list` of :any:`NumericalScheme`, optional): the the
numerical schemes
fallback_plan (list, optional): the fallback plan if automatic planning
fails. Depending on the combination of numerical scheme and equations,
a certain order or solving the equations is crucial. For some
cases, the order can be determined automatically, but if that
fails, one has to provide this information by hand.
Has to be a :any:`list` of ``[varname, [timestep1,timestep2,...]]``
pairs.
varname:
the name of the equation variable. Obviously there has to be at
least one entry in the list for each equation.
timestepN:
the normed timesteps (betw. 0 and 1) to calculate. Normed
means, that if it is requested to integrate the set of numerical
equations by an overall timestep, what percentages of this
timestep have to be available of this variable. E.g. an overall
timestep of 10 is requested. Another equation needs this
variable at the timesteps 2 and 8. Then the timesteps would be
[0.2,0.8]. Obviously, the equations that looks farest into the
future (e.g. Runge-Kutta or Euler-Implicit) has to be last in
this fallback_plan list.
"""
def __init__(self, elements = [], fallback_plan = None):
utils.SetOfObjects.__init__(self, # call SetOfObjects constructor
elements = elements,
element_type = NumericalScheme, # only NumericalScheme is allowed
)
# set properties
if fallback_plan is None:
self.fallback_plan = self._default_fallback_plan
else: self.fallback_plan = fallback_plan
##################
### Properties ###
##################
@property
def plan(self):
"""
The unified plan for this set of numerical schemes. First try to
determine the plan automatically, if that fails, use the
:any:`fallback_plan`.
:type: :any:`list`
"""
try: # try automatic planning
plan = self._automatic_plan
except: # automatic planning didn't work
plan = self.fallback_plan # use fallback
return plan
@property
def fallback_plan(self):
"""
The fallback plan if automatic plan determination does not work
:type: :any:`list`
"""
try: self._fallback_plan
except AttributeError: self._fallback_plan = self._default_fallback_plan
return self._fallback_plan
@fallback_plan.setter
def fallback_plan(self, newfallback_plan):
try:
variables = { p[0] for p in newfallback_plan }
assert all( v in variables for v in self.keys() ), \
"not all equations present in plan"
timesteps = [ np.asarray(p[1]) for p in newfallback_plan ]
assert all( np.all( np.logical_and(0<=ts,ts<=1) ) \
for ts in timesteps), \
"timesteps outside (0,1) requested in plan"
except:
raise ValueError("wrong scheme plan format")
self._fallback_plan = newfallback_plan
@property
def _default_fallback_plan(self):
"""
The default fallback plan if none was given
"""
# stupidest plan: solve equations in alphabetical order
plan = [ [var,[1]] for var in sorted(self.keys()) ]
return plan
@property
def _automatic_plan(self):
"""
Try to determine the scheme plan based on the equations and the
numerical schemes.
:type: :any:`list`
.. todo::
Implementing this is definitely possible. All needed information is
accessible:
- the equation's dependencies
- the numerical scheme's needed timesteps
"""
# TODO: This definitely has to be implemented for convenience
raise NotImplementedError("Automatic planning is definitely possible, "
"but not yet implemented")
###############
### Methods ###
###############
[docs] def _object_to_key(self, obj):
""" key transformation function.
Args:
obj (object): the element
Returns:
key (str): the unique key for this object. The equation's variable's
id is used.
"""
return obj.equation.variable.id
[docs] def integrate(self, start_time, final_time):
""" Integrate the model until final_time
Args:
start_time (float): the starting time
final_time (float): time to integrate until
"""
self.logger.info("start integration")
current_time = start_time
while current_time < final_time:
self.logger.debug("current time {} is smaller than "
"final time {}".format(current_time, final_time))
# timestep of most dependent equation
biggest_timestep = self[self.plan[-1][0]].max_timestep
self.logger.debug("timestep of last scheme: {}".format(
biggest_timestep))
run_time_left = final_time - current_time
if run_time_left > biggest_timestep:
big_timestep = biggest_timestep
else:
big_timestep = run_time_left
for plan_step in self.plan:
scheme_time = current_time
varname = plan_step[0] # variable name
scheme = self[varname] # get scheme
timesteps = np.asarray( plan_step[1] ) * big_timestep
# self.logger.debug("timesteps: {}".format(timesteps))
for ts in timesteps: # loop over all timesteps
until_time = current_time + ts
self.logger.debug(
("integrate scheme '{}' for equation '{}' until time {}"
).format( scheme.description,
scheme.equation.description, until_time))
scheme.integrate(
time = scheme_time,
until = until_time)
scheme_time = current_time + ts
current_time = current_time + big_timestep
self.logger.info("end of integration")