You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
soil/soil/simulation.py

219 lines
7.5 KiB
Python

import os
2 years ago
from time import time as current_time, strftime
import importlib
7 years ago
import sys
import yaml
import traceback
import logging
import networkx as nx
2 years ago
from networkx.readwrite import json_graph
from multiprocessing import Pool
from functools import partial
import pickle
5 years ago
from . import serialization, utils, basestring, agents
6 years ago
from .environment import Environment
5 years ago
from .utils import logger
from .exporters import default
from .stats import defaultStats
2 years ago
from .config import Config, convert_old
2 years ago
# TODO: update the documentation for Simulation
class Simulation:
    """
    Run the trials of a simulation described by a configuration.

    Parameters
    ----------
    config (optional): :class:`config.Config`
        configuration of the simulation. Its ``general`` section is read for
        ``id``, ``num_trials`` and ``max_time``.
    kwargs: parameters to use to initialize a new configuration, if one has
        not been provided. When both ``config`` and ``kwargs`` are given, the
        values in ``kwargs`` take precedence.

    Raises
    ------
    ValueError
        if no (truthy) configuration results from the arguments.
    """

    def __init__(self, config=None,
                 **kwargs):
        if kwargs:
            # Build a fresh Config: start from the given config (if any) and
            # let the keyword arguments override it.
            cfg = {}
            if config:
                cfg.update(config.dict(include_defaults=False))
            cfg.update(kwargs)
            config = Config(**cfg)
        if not config:
            raise ValueError("You need to specify a simulation configuration")
        self.config = config

    @property
    def name(self) -> str:
        """Identifier of the simulation (``config.general.id``)."""
        return self.config.general.id

    def run_simulation(self, *args, **kwargs):
        """Alias of :meth:`run`."""
        return self.run(*args, **kwargs)

    def run(self, *args, **kwargs):
        '''Run the simulation and return the list of resulting environments'''
        return list(self.run_gen(*args, **kwargs))

    def _run_sync_or_async(self, parallel=False, **kwargs):
        """
        Yield the environment of every trial, running trials either in a
        process pool (``parallel=True``) or one after another.

        The pool is skipped when the ``SENPY_DEBUG`` environment variable is
        set. Failed parallel trials are logged and skipped; in sequential
        mode exceptions propagate to the caller.
        Remaining ``kwargs`` are forwarded to :meth:`run_trial`.
        """
        num_trials = self.config.general.num_trials
        if parallel and not os.environ.get('SENPY_DEBUG', None):
            func = partial(self.run_trial_exceptions, **kwargs)
            # Use the pool as a context manager so that worker processes are
            # released when the trials finish or the generator is closed.
            with Pool() as p:
                for i in p.imap_unordered(func, range(num_trials)):
                    if isinstance(i, Exception):
                        # run_trial_exceptions stores the formatted traceback
                        # in ``message``; fall back to the exception itself.
                        logger.error('Trial failed:\n\t%s',
                                     getattr(i, 'message', i))
                        continue
                    yield i
        else:
            for i in range(num_trials):
                yield self.run_trial(trial_id=i,
                                     **kwargs)

    def run_gen(self, parallel=False, dry_run=False,
                exporters=None, stats=None, outdir=None, exporter_params=None,
                stats_params=None, log_level=None,
                **kwargs):
        '''
        Run the simulation and yield the resulting environments.

        Parameters
        ----------
        parallel: run the trials in a process pool.
        dry_run, outdir: forwarded to every exporter.
        exporters: exporters to use; ``None`` selects the default exporter.
        stats: stats to collect; ``None`` means no stats.
        exporter_params, stats_params: extra keyword arguments for the
            exporters and the stats, respectively.
        log_level: if set, level applied to the module logger.
        kwargs: forwarded to :meth:`run_trial`.
        '''
        # None replaces the former mutable defaults, which were shared
        # between calls.
        if exporters is None:
            exporters = [default, ]
        if stats is None:
            stats = []
        if exporter_params is None:
            exporter_params = {}
        if stats_params is None:
            stats_params = {}

        if log_level:
            logger.setLevel(log_level)
        logger.info('Using exporters: %s', exporters or [])
        logger.info('Output directory: %s', outdir)
        exporters = serialization.deserialize_all(exporters,
                                                  simulation=self,
                                                  known_modules=['soil.exporters', ],
                                                  dry_run=dry_run,
                                                  outdir=outdir,
                                                  **exporter_params)
        stats = serialization.deserialize_all(simulation=self,
                                              names=stats,
                                              known_modules=['soil.stats', ],
                                              **stats_params)

        with utils.timer('simulation {}'.format(self.config.general.id)):
            for stat in stats:
                stat.sim_start()

            for exporter in exporters:
                exporter.start()

            for env in self._run_sync_or_async(parallel=parallel,
                                               log_level=log_level,
                                               **kwargs):
                # NOTE: the trial has already been run when ``env`` arrives
                # here, so trial_start is notified after the fact.
                for exporter in exporters:
                    exporter.trial_start(env)

                collected = [stat.trial_end(env) for stat in stats]

                saved = self._update_stats(collected, t_step=env.now, trial_id=env.name)

                for exporter in exporters:
                    exporter.trial_end(env, saved)

                yield env

            collected = [stat.end() for stat in stats]
            saved = self._update_stats(collected)

            for exporter in exporters:
                exporter.sim_end(saved)

    def _update_stats(self, collection, **kwargs):
        """
        Merge every mapping in ``collection`` into a single dictionary that
        starts out with ``kwargs``. Later mappings override earlier keys.
        """
        stats = dict(kwargs)
        for stat in collection:
            stats.update(stat)
        return stats

    def log_stats(self, stats):
        """Log ``stats`` as a block-style YAML document."""
        logger.info('Stats: \n{}'.format(yaml.dump(stats, default_flow_style=False)))

    def get_env(self, trial_id=0, **kwargs):
        '''Create an environment for a trial of the simulation'''
        # NOTE(review): this method reads attributes (environment_params,
        # topology, seed, environment_class, ...) that __init__ does not set;
        # it looks like a leftover from before the move to ``self.config``.
        # run_trial builds its environment with Environment.from_config
        # instead. Confirm whether this is still needed.
        opts = self.environment_params.copy()
        opts.update({
            'name': '{}_trial_{}'.format(self.name, trial_id),
            'topology': self.topology.copy(),
            'network_params': self.network_params,
            'seed': '{}_trial_{}'.format(self.seed, trial_id),
            'initial_time': 0,
            'interval': self.interval,
            'network_agents': self.network_agents,
            'states': self.states,
            'dir_path': self.dir_path,
            'default_state': self.default_state,
            'history': bool(self._history),
            'environment_agents': self.environment_agents,
        })
        opts.update(kwargs)
        env = self.environment_class(**opts)
        return env

    def run_trial(self, trial_id=None, until=None, log_level=logging.INFO, **opts):
        """
        Run a single trial of the simulation

        Parameters
        ----------
        trial_id: identifier of the trial; the current timestamp when None.
        until: time at which the trial stops; ``config.general.max_time``
            when not given (or falsy).
        log_level: if set, level applied to the module logger.
        opts: accepted for compatibility with callers; currently unused.

        Returns the environment after it has been run.
        """
        trial_id = trial_id if trial_id is not None else current_time()
        if log_level:
            logger.setLevel(log_level)
        # Set-up trial environment and graph
        until = until or self.config.general.max_time

        env = Environment.from_config(self.config, trial_id=trial_id)
        # Set up agents on nodes
        with utils.timer('Simulation {} trial {}'.format(self.config.general.id, trial_id)):
            env.run(until)
        return env

    def run_trial_exceptions(self, *args, **kwargs):
        '''
        A wrapper for run_trial that catches exceptions and returns them.
        It is meant for async simulations

        The returned exception carries the formatted traceback in its
        ``message`` attribute. If the exception has an explicit cause, the
        cause is returned instead.
        '''
        try:
            return self.run_trial(*args, **kwargs)
        except Exception as ex:
            if ex.__cause__ is not None:
                ex = ex.__cause__
            ex.message = ''.join(traceback.format_exception(type(ex), ex, ex.__traceback__))
            return ex
5 years ago
def all_from_config(config):
    """
    Yield one :class:`Simulation` for every configuration that
    ``serialization.load_config`` produces from ``config``.

    Every loaded item is unpacked as a pair; only its first element is used,
    expanded into keyword arguments for the simulation.
    """
    # All configurations are loaded up-front, before the first simulation
    # is created.
    loaded = list(serialization.load_config(config))
    for cfg, _ in loaded:
        yield Simulation(**cfg)
def from_config(conf_or_path):
    """
    Create the :class:`Simulation` for the single configuration loaded from
    ``conf_or_path``.

    Raises AttributeError if more than one configuration is loaded.
    """
    loaded = list(serialization.load_config(conf_or_path))
    if len(loaded) > 1:
        raise AttributeError('Provide only one configuration')
    # Each loaded item is indexable; its first element is the configuration.
    only_cfg = loaded[0][0]
    return Simulation(**only_cfg)
2 years ago
def from_old_config(conf_or_path):
    """
    Create a :class:`Simulation` from the single configuration loaded from
    ``conf_or_path``, after passing it through ``convert_old``.

    Raises AttributeError if more than one configuration is loaded.
    """
    loaded = list(serialization.load_config(conf_or_path))
    if len(loaded) > 1:
        raise AttributeError('Provide only one configuration')
    legacy_cfg = loaded[0][0]
    converted = convert_old(legacy_cfg)
    return Simulation(converted)
5 years ago
def run_from_config(*configs, **kwargs):
    """
    Load every configuration found in each of ``configs`` and run a
    simulation for it.

    ``kwargs`` are forwarded to ``Simulation.run_simulation``. The
    simulation's ``dir_path`` is the one in the configuration or, when that
    is falsy, the directory of the path the configuration was loaded from.
    """
    for source in configs:
        for cfg, cfg_path in serialization.load_config(source):
            sim_id = cfg.general.id
            logger.info("Using config(s): {name}".format(name=sim_id))

            base_dir = cfg.general.dir_path
            if not base_dir:
                base_dir = os.path.dirname(cfg_path)

            simulation = Simulation(dir_path=base_dir, **cfg)
            simulation.run_simulation(**kwargs)