Source code for swarmsim.Integrators.euler_maruyama
import numpy as np
from swarmsim.Integrators import Integrator
[docs]
class EulerMaruyamaIntegrator(Integrator):
"""
Euler-Maruyama numerical integrator for stochastic differential equations.
This integrator implements the Euler-Maruyama scheme for solving stochastic differential
equations (SDEs) governing multi-agent dynamics. It handles both scalar and matrix-valued
diffusion terms, making it suitable for complex stochastic systems with correlated noise.
The integration scheme follows:
.. math::
x_{n+1} = x_n + f(x_n, t_n) \\Delta t + g(x_n, t_n) \\sqrt{\\Delta t} \\, \\xi_n
where:
- :math:`x_n` is the state at time step n
- :math:`f(x_n, t_n)` is the drift term (deterministic dynamics)
- :math:`g(x_n, t_n)` is the diffusion term (noise amplitude)
- :math:`\\xi_n` is standard Gaussian white noise
- :math:`\\Delta t` is the integration timestep
Parameters
----------
config_path : str
Path to the YAML configuration file containing integration parameters.
Attributes
----------
dt : float
Integration timestep, inherited from the Integrator base class.
Config Requirements
-------------------
The YAML configuration file must contain the following parameters under the integrator section:
dt : float, optional
Integration timestep for the Euler-Maruyama scheme. Default is ``0.01``.
Smaller timesteps improve accuracy but increase computational cost.
Notes
-----
**Diffusion Term Handling:**
The integrator automatically detects the dimensionality of the diffusion term:
- **Scalar/Vector Diffusion** (shape ``(N, d)``): Element-wise multiplication with noise
- **Matrix Diffusion** (shape ``(N, d, d)``): Matrix-vector multiplication for correlated noise
**State Constraints:**
Agent states are automatically clipped to respect population limits after each integration step.
**Numerical Stability:**
For strong convergence, the timestep should satisfy stability conditions specific to
the drift and diffusion terms. Generally, smaller timesteps are required for systems
with large diffusion coefficients.
Raises
------
FileNotFoundError
If the configuration file is not found.
KeyError
If required integration parameters are missing in the configuration file.
AttributeError
If population objects lack required methods or attributes.
Examples
--------
**Configuration Example:**
.. code-block:: yaml
integrator:
dt: 0.01 # Small timestep for good accuracy
This will set ``dt = 0.05`` as the timestep value for numerical integration.
"""
def __init__(self, config_path):
"""
Initializes the Euler-Maruyama Integrator with configuration parameters from a YAML file.
Parameters
----------
config_path : str
Path to the YAML configuration file.
"""
# Initialize the parent `Integrator` class, which loads configuration parameters
super().__init__(config_path)
[docs]
def step(self, populations):
"""
Perform a single Euler-Maruyama integration step for all populations.
This method updates the state of each population by applying the Euler-Maruyama
scheme with automatic handling of scalar and matrix diffusion terms. The state
update follows the discrete SDE formula with proper noise scaling.
Parameters
----------
populations : list of Population
List of population objects to integrate. Each population must provide:
- ``x`` (np.ndarray): Current state array of shape ``(N, d)``
- ``get_drift()`` method: Returns drift term of shape ``(N, d)``
- ``get_diffusion()`` method: Returns diffusion term of shape ``(N, d)`` or ``(N, d, d)``
- ``lim_i`` (np.ndarray): Lower state bounds for clipping
- ``lim_s`` (np.ndarray): Upper state bounds for clipping
Notes
-----
**Integration Algorithm:**
For each population, the state update is:
1. **Drift Computation**: Get deterministic dynamics :math:`f(x,t)`
2. **Diffusion Computation**: Get noise amplitude :math:`g(x,t)`
3. **Noise Generation**: Sample :math:`\\xi \\sim \\mathcal{N}(0,I)`
4. **State Update**: Apply Euler-Maruyama formula
5. **Constraint Enforcement**: Clip states to population limits
**Diffusion Term Handling:**
- **Vector Diffusion** (shape ``(N, d)``): ``noise_term = diffusion * noise``
- **Matrix Diffusion** (shape ``(N, d, d)``): ``noise_term = diffusion @ noise``
**State Constraints:**
After integration, all agent states are clipped to respect the population's
spatial or behavioral limits defined by ``lim_i`` and ``lim_s``.
Raises
------
AttributeError
If any population object lacks required methods (``get_drift``, ``get_diffusion``)
or attributes (``x``, ``lim_i``, ``lim_s``).
ValueError
If diffusion term shape is incompatible with noise or state dimensions.
"""
for population in populations:
# Compute the drift and diffusion terms
drift = population.get_drift()
diffusion = population.get_diffusion()
# Generate random noise (standard normal distribution)
noise = np.random.normal(0, 1, size=population.x.shape)
# Update the state using the Euler-Maruyama method
if np.ndim(diffusion) == 3: # diffusion is a matrix (e.g. shape (N, d, d))
noise_term = np.matmul(diffusion, noise[..., np.newaxis]).squeeze(-1)
else:
noise_term = diffusion * noise
population.x = population.x + drift * self.dt + noise_term * np.sqrt(self.dt)
population.x = np.clip(population.x, population.lim_i, population.lim_s)