mirror of
https://github.com/gsi-upm/soil
synced 2025-09-14 20:22:22 +00:00
Multithreading needs pickling to work. Pickling/unpickling didn't work in some situations, like when the environment_agents parameter was left blank. This was due to two reasons: 1) agents and history didn't have a setstate method, and some of their attributes cannot be pickled (generators, sqlite connection) 2) the environment was adding generators (agents) to its state. This fixes the situation by restricting the keys that the environment exports when it pickles, and by adding the set/getstate methods in agents. The resulting pickles should contain enough information to inspect them (history, state values, etc), but very limited.
290 lines
12 KiB
Python
290 lines
12 KiB
Python
import os
|
|
import time
|
|
import importlib
|
|
import sys
|
|
import yaml
|
|
import traceback
|
|
import networkx as nx
|
|
from networkx.readwrite import json_graph
|
|
from multiprocessing import Pool
|
|
from functools import partial
|
|
|
|
import pickle
|
|
|
|
from nxsim import NetworkSimulation
|
|
|
|
from . import utils, basestring, agents
|
|
from .environment import Environment
|
|
from .utils import logger
|
|
|
|
|
|
class Simulation(NetworkSimulation):
|
|
"""
|
|
Subclass of nsim.NetworkSimulation with three main differences:
|
|
1) agent type can be specified by name or by class.
|
|
2) instead of just one type, a network agents distribution can be used.
|
|
The distribution specifies the weight (or probability) of each
|
|
agent type in the topology. This is an example distribution: ::
|
|
|
|
[
|
|
{'agent_type': 'agent_type_1',
|
|
'weight': 0.2,
|
|
'state': {
|
|
'id': 0
|
|
}
|
|
},
|
|
{'agent_type': 'agent_type_2',
|
|
'weight': 0.8,
|
|
'state': {
|
|
'id': 1
|
|
}
|
|
}
|
|
]
|
|
|
|
In this example, 20% of the nodes will be marked as type
|
|
'agent_type_1'.
|
|
3) if no initial state is given, each node's state will be set
|
|
to `{'id': 0}`.
|
|
|
|
Parameters
|
|
---------
|
|
name : str, optional
|
|
name of the Simulation
|
|
topology : networkx.Graph instance, optional
|
|
network_params : dict
|
|
parameters used to create a topology with networkx, if no topology is given
|
|
network_agents : dict
|
|
definition of agents to populate the topology with
|
|
agent_type : NetworkAgent subclass, optional
|
|
Default type of NetworkAgent to use for nodes not specified in network_agents
|
|
states : list, optional
|
|
List of initial states corresponding to the nodes in the topology. Basic form is a list of integers
|
|
whose value indicates the state
|
|
dir_path : str, optional
|
|
Directory path where to save pickled objects
|
|
seed : str, optional
|
|
Seed to use for the random generator
|
|
num_trials : int, optional
|
|
Number of independent simulation runs
|
|
max_time : int, optional
|
|
Time how long the simulation should run
|
|
environment_params : dict, optional
|
|
Dictionary of globally-shared environmental parameters
|
|
environment_agents: dict, optional
|
|
Similar to network_agents. Distribution of Agents that control the environment
|
|
environment_class: soil.environment.Environment subclass, optional
|
|
Class for the environment. It defailts to soil.environment.Environment
|
|
load_module : str, module name, deprecated
|
|
If specified, soil will load the content of this module under 'soil.agents.custom'
|
|
|
|
|
|
"""
|
|
|
|
def __init__(self, name=None, topology=None, network_params=None,
|
|
network_agents=None, agent_type=None, states=None,
|
|
default_state=None, interval=1, dump=None, dry_run=False,
|
|
dir_path=None, num_trials=1, max_time=100,
|
|
load_module=None, seed=None,
|
|
environment_agents=None, environment_params=None,
|
|
environment_class=None, **kwargs):
|
|
|
|
if topology is None:
|
|
topology = utils.load_network(network_params,
|
|
dir_path=dir_path)
|
|
elif isinstance(topology, basestring) or isinstance(topology, dict):
|
|
topology = json_graph.node_link_graph(topology)
|
|
|
|
self.load_module = load_module
|
|
self.topology = nx.Graph(topology)
|
|
self.network_params = network_params
|
|
self.name = name or 'UnnamedSimulation'
|
|
self.num_trials = num_trials
|
|
self.max_time = max_time
|
|
self.default_state = default_state or {}
|
|
self.dir_path = dir_path or os.getcwd()
|
|
self.interval = interval
|
|
self.seed = str(seed) or str(time.time())
|
|
self.dump = dump
|
|
self.dry_run = dry_run
|
|
|
|
sys.path += [self.dir_path, os.getcwd()]
|
|
|
|
self.environment_params = environment_params or {}
|
|
self.environment_class = utils.deserialize(environment_class,
|
|
known_modules=['soil.environment', ]) or Environment
|
|
|
|
environment_agents = environment_agents or []
|
|
self.environment_agents = agents._convert_agent_types(environment_agents,
|
|
known_modules=[self.load_module])
|
|
|
|
distro = agents.calculate_distribution(network_agents,
|
|
agent_type)
|
|
self.network_agents = agents._convert_agent_types(distro,
|
|
known_modules=[self.load_module])
|
|
|
|
self.states = agents._validate_states(states,
|
|
self.topology)
|
|
|
|
def run_simulation(self, *args, **kwargs):
|
|
return self.run(*args, **kwargs)
|
|
|
|
def run(self, *args, **kwargs):
|
|
return list(self.run_simulation_gen(*args, **kwargs))
|
|
|
|
def run_simulation_gen(self, *args, parallel=False, dry_run=False,
|
|
**kwargs):
|
|
p = Pool()
|
|
with utils.timer('simulation {}'.format(self.name)):
|
|
if parallel:
|
|
func = partial(self.run_trial_exceptions, dry_run=dry_run or self.dry_run,
|
|
return_env=True,
|
|
**kwargs)
|
|
for i in p.imap_unordered(func, range(self.num_trials)):
|
|
if isinstance(i, Exception):
|
|
logger.error('Trial failed:\n\t{}'.format(i.message))
|
|
continue
|
|
yield i
|
|
else:
|
|
for i in range(self.num_trials):
|
|
yield self.run_trial(i, dry_run = dry_run or self.dry_run, **kwargs)
|
|
if not (dry_run or self.dry_run):
|
|
logger.info('Dumping results to {}'.format(self.dir_path))
|
|
self.dump_pickle(self.dir_path)
|
|
self.dump_yaml(self.dir_path)
|
|
else:
|
|
logger.info('NOT dumping results')
|
|
|
|
def get_env(self, trial_id = 0, **kwargs):
|
|
opts=self.environment_params.copy()
|
|
env_name='{}_trial_{}'.format(self.name, trial_id)
|
|
opts.update({
|
|
'name': env_name,
|
|
'topology': self.topology.copy(),
|
|
'seed': self.seed+env_name,
|
|
'initial_time': 0,
|
|
'dry_run': self.dry_run,
|
|
'interval': self.interval,
|
|
'network_agents': self.network_agents,
|
|
'states': self.states,
|
|
'default_state': self.default_state,
|
|
'environment_agents': self.environment_agents,
|
|
'dir_path': self.dir_path,
|
|
})
|
|
opts.update(kwargs)
|
|
env=self.environment_class(**opts)
|
|
return env
|
|
|
|
def run_trial(self, trial_id = 0, until = None, return_env = True, **opts):
|
|
"""Run a single trial of the simulation
|
|
|
|
Parameters
|
|
----------
|
|
trial_id : int
|
|
"""
|
|
# Set-up trial environment and graph
|
|
until=until or self.max_time
|
|
env=self.get_env(trial_id = trial_id, **opts)
|
|
# Set up agents on nodes
|
|
with utils.timer('Simulation {} trial {}'.format(self.name, trial_id)):
|
|
env.run(until)
|
|
if self.dump and not self.dry_run:
|
|
with utils.timer('Dumping simulation {} trial {}'.format(self.name, trial_id)):
|
|
env.dump(formats = self.dump)
|
|
if return_env:
|
|
return env
|
|
def run_trial_exceptions(self, *args, **kwargs):
|
|
'''
|
|
A wrapper for run_trial that catches exceptions and returns them.
|
|
It is meant for async simulations
|
|
'''
|
|
try:
|
|
return self.run_trial(*args, **kwargs)
|
|
except Exception as ex:
|
|
c = ex.__cause__
|
|
c.message = ''.join(traceback.format_exception(type(c), c, c.__traceback__)[:])
|
|
return c
|
|
|
|
def to_dict(self):
|
|
return self.__getstate__()
|
|
|
|
def to_yaml(self):
|
|
return yaml.dump(self.to_dict())
|
|
|
|
def dump_yaml(self, dir_path = None, file_name = None):
|
|
dir_path=dir_path or self.dir_path
|
|
if not os.path.exists(dir_path):
|
|
os.makedirs(dir_path)
|
|
if not file_name:
|
|
file_name=os.path.join(dir_path,
|
|
'{}.dumped.yml'.format(self.name))
|
|
with open(file_name, 'w') as f:
|
|
f.write(self.to_yaml())
|
|
|
|
def dump_pickle(self, dir_path = None, pickle_name = None):
|
|
dir_path=dir_path or self.dir_path
|
|
if not os.path.exists(dir_path):
|
|
os.makedirs(dir_path)
|
|
if not pickle_name:
|
|
pickle_name=os.path.join(dir_path,
|
|
'{}.simulation.pickle'.format(self.name))
|
|
with open(pickle_name, 'wb') as f:
|
|
pickle.dump(self, f)
|
|
|
|
def __getstate__(self):
|
|
state={}
|
|
for k, v in self.__dict__.items():
|
|
if k[0] != '_':
|
|
state[k]=v
|
|
state['topology']=json_graph.node_link_data(self.topology)
|
|
state['network_agents']=agents.serialize_distribution(self.network_agents,
|
|
known_modules = [])
|
|
state['environment_agents']=agents.serialize_distribution(self.environment_agents,
|
|
known_modules = [])
|
|
state['environment_class']=utils.serialize(self.environment_class,
|
|
known_modules=['soil.environment'])[1] # func, name
|
|
if state['load_module'] is None:
|
|
del state['load_module']
|
|
return state
|
|
|
|
def __setstate__(self, state):
|
|
self.__dict__ = state
|
|
self.load_module = getattr(self, 'load_module', None)
|
|
if self.dir_path not in sys.path:
|
|
sys.path += [self.dir_path, os.getcwd()]
|
|
self.topology = json_graph.node_link_graph(state['topology'])
|
|
self.network_agents = agents.calculate_distribution(agents._convert_agent_types(self.network_agents))
|
|
self.environment_agents = agents._convert_agent_types(self.environment_agents,
|
|
known_modules=[self.load_module])
|
|
self.environment_class = utils.deserialize(self.environment_class,
|
|
known_modules=[self.load_module, 'soil.environment', ]) # func, name
|
|
return state
|
|
|
|
|
|
def from_config(config):
|
|
config = list(utils.load_config(config))
|
|
if len(config) > 1:
|
|
raise AttributeError('Provide only one configuration')
|
|
config = config[0][0]
|
|
sim = Simulation(**config)
|
|
return sim
|
|
|
|
|
|
def run_from_config(*configs, results_dir='soil_output', dump=None, timestamp=False, **kwargs):
|
|
for config_def in configs:
|
|
# logger.info("Found {} config(s)".format(len(ls)))
|
|
for config, _ in utils.load_config(config_def):
|
|
name = config.get('name', 'unnamed')
|
|
logger.info("Using config(s): {name}".format(name=name))
|
|
|
|
if timestamp:
|
|
sim_folder = '{}_{}'.format(name,
|
|
time.strftime("%Y-%m-%d_%H:%M:%S"))
|
|
else:
|
|
sim_folder = name
|
|
dir_path = os.path.join(results_dir, sim_folder)
|
|
if dump is not None:
|
|
config['dump'] = dump
|
|
sim = Simulation(dir_path=dir_path, **config)
|
|
logger.info('Dumping results to {} : {}'.format(sim.dir_path, sim.dump))
|
|
sim.run_simulation(**kwargs)
|