Source code for swarmsim.Simulators.base_simulator

from swarmsim.Utils import load_config
import progressbar
import numpy as np

from swarmsim.Environments import Environment
from swarmsim.Populations import Population
from swarmsim.Interactions import Interaction
from swarmsim.Controllers import Controller
from swarmsim.Integrators import Integrator
from swarmsim.Loggers import Logger
from swarmsim.Renderers import Renderer


class Simulator:
    """
    Main simulation engine that orchestrates multi-agent system simulations.

    The Simulator coordinates populations, environment, interactions,
    controllers, integrator, logger, and renderer, and runs the simulation
    loop that calls them in a fixed order at every timestep.

    Parameters
    ----------
    config_path : str
        Path to the YAML configuration file containing simulation parameters.
    populations : list of Population
        List of agent populations to simulate.
    environment : Environment
        The environment in which agents operate.
    integrator : Integrator
        Numerical integration scheme for updating agent states.
    interactions : list of Interaction, optional
        List of interaction models between agents. Default is None.
    controllers : list of Controller, optional
        List of controllers that influence agent behavior. Default is None.
    logger : Logger, optional
        Logger instance for recording simulation data. Default is None.
    renderer : Renderer, optional
        Renderer instance for visualization. Default is None.

    Attributes
    ----------
    dt : float
        Simulation timestep, taken from ``integrator.dt``.
    T : float
        Total simulation time duration.
    populations : list of Population
        List of agent populations in the simulation.
    environment : Environment
        The simulation environment.
    interactions : list of Interaction or None
        List of interaction models.
    controllers : list of Controller or None
        List of control strategies.
    logger : Logger or None
        Data logger instance.
    renderer : Renderer or None
        Visualization renderer.
    integrator : Integrator
        Numerical integration scheme.

    Config Requirements
    -------------------
    The following parameter is read from the ``simulator`` section of the
    YAML configuration file:

    T : float, optional
        Total simulation time. Default is ``10``.

    The timestep is *not* read from the ``simulator`` section; it is taken
    from the integrator (``integrator.dt``).

    Notes
    -----
    The simulation loop executes the following steps in order:

    1. **Logging**: Record current simulation state (if a logger is provided)
    2. **Rendering**: Visualize current state (if a renderer is provided)
    3. **Control**: Compute control actions (if controllers are provided)
    4. **Interactions**: Compute inter-agent forces (if interactions are provided)
    5. **Integration**: Update agent states using numerical integration
    6. **Reset Forces**: Clear interaction forces for next timestep
    7. **Environment Update**: Update environment state

    Examples
    --------
    Basic simulation setup:

    .. code-block:: python

        from swarmsim import BrownianMotion, Simulator
        from swarmsim.Environments import EmptyEnvironment
        from swarmsim.Integrators import EulerMaruyama

        # Create components
        population = BrownianMotion('config.yaml')
        environment = EmptyEnvironment('config.yaml')
        integrator = EulerMaruyama('config.yaml')

        # Create and run simulator
        simulator = Simulator(
            config_path='config.yaml',
            populations=[population],
            environment=environment,
            integrator=integrator
        )
        simulator.simulate()

    Example YAML configuration:

    .. code-block:: yaml

        simulator:
            T: 50.0  # Run simulation for 50 time units
    """

    def __init__(
        self,
        config_path: str,
        populations: list[Population],
        environment: Environment,
        integrator: Integrator,
        interactions: list[Interaction] | None = None,
        controllers: list[Controller] | None = None,
        logger: Logger | None = None,
        renderer: Renderer | None = None,
    ) -> None:
        """
        Initialize the Simulator with all required and optional components.

        Parameters
        ----------
        config_path : str
            Path to the YAML configuration file containing simulation parameters.
        populations : list of Population
            List of agent populations to include in the simulation.
        environment : Environment
            The environment where agents operate.
        integrator : Integrator
            Numerical integration scheme for state evolution. Its ``dt``
            attribute becomes the simulation timestep.
        interactions : list of Interaction, optional
            Inter-agent interaction models. Default is None.
        controllers : list of Controller, optional
            Control strategies for influencing agent behavior. Default is None.
        logger : Logger, optional
            Data recording component. Default is None.
        renderer : Renderer, optional
            Visualization component. Default is None.

        Notes
        -----
        The configuration is read through ``load_config``; any error it raises
        (presumably for a missing file -- confirm against ``swarmsim.Utils``)
        propagates to the caller.
        """
        config: dict = load_config(config_path)
        simulator_config: dict = config.get('simulator', {})

        # The timestep is owned by the integrator so both always agree.
        self.dt: float = integrator.dt
        self.T: float = simulator_config.get('T', 10)

        # Components supplied by the caller
        self.populations: list[Population] = populations
        self.environment: Environment = environment
        self.interactions: list[Interaction] | None = interactions
        self.controllers: list[Controller] | None = controllers
        self.logger: Logger | None = logger
        self.renderer: Renderer | None = renderer
        self.integrator: Integrator = integrator

    def simulate(self) -> None:
        """
        Execute the main simulation loop.

        Runs ``T / dt`` timesteps, calling every configured component in a
        fixed order and showing a progress bar. Before the loop starts the
        logger (if any), every population and every interaction are reset.

        At each timestep the loop performs:

        1. **Data Logging**: ``logger.log()`` is called and its return value
           is used as the early-termination flag (skipped without a logger)
        2. **Visualization**: ``renderer.render()`` if a renderer is available
        3. **Control Actions**: each due controller writes its action to
           ``controller.population.u``
        4. **Agent Interactions**: each interaction is added to
           ``interaction.target_population.f``
        5. **State Integration**: ``integrator.step(populations)``
        6. **Force Reset**: every ``population.f`` is zeroed
        7. **Environment Update**: ``environment.update()``

        Notes
        -----
        - A controller whose ``dt`` is None acts at every step; otherwise it
          acts every ``round(controller.dt / dt)`` steps (at least every step).
        - If the logger returns a truthy flag, the current step is still
          completed and the loop then stops.
        - After the loop the logger (if any) is closed and the renderer
          (if any) renders one final time.
        """
        # A tiny tolerance keeps quotients such as 0.3 / 0.1
        # (= 2.9999999999999996) from being truncated one step short.
        num_steps = int(self.T / self.dt + 1e-9)

        bar = progressbar.ProgressBar(
            max_value=num_steps,
            widgets=[
                'Processing: ',  # Custom description
                progressbar.Percentage(),
                ' ',
                progressbar.Bar(marker='=', left='[', right=']'),
                ' ',
                progressbar.ETA(),
            ],
        )

        # Bring every stateful component back to its initial condition.
        # The logger is optional, so it must be guarded like the renderer.
        if self.logger is not None:
            self.logger.reset()
        for population in self.populations:
            population.reset()
        if self.interactions is not None:
            for interaction in self.interactions:
                interaction.reset()

        for t in range(num_steps):
            # Log the current state; without a logger nothing can request
            # early termination.
            done = self.logger.log() if self.logger is not None else False

            # Render the scene if a renderer is defined
            if self.renderer is not None:
                self.renderer.render()

            # Apply the control actions at each controller's sampling rate
            if self.controllers is not None:
                for controller in self.controllers:
                    if controller.dt is None:
                        due = True
                    else:
                        # Clamp to 1 so a controller faster than the
                        # simulation never yields a modulo by zero.
                        period = max(1, round(controller.dt / self.dt))
                        due = t % period == 0
                    if due:
                        controller.population.u = controller.get_action()

            # Accumulate the interactions between the agents
            if self.interactions is not None:
                for interaction in self.interactions:
                    interaction.target_population.f += interaction.get_interaction()

            # Update the state of the agents
            self.integrator.step(self.populations)

            # Reset interaction forces so they do not leak into the next step
            for population in self.populations:
                population.f = np.zeros([population.N, population.input_dim])

            # Update the environment
            self.environment.update()

            bar.update(t)

            if done:
                # Truncate the simulation early
                print('\nSimulation truncated')
                break

        if self.logger is not None:
            self.logger.close()
        if self.renderer is not None:
            self.renderer.render()