@ -1,8 +1,9 @@
import os
import time
import imp
import imp ortlib
import sys
import yaml
import traceback
import networkx as nx
from networkx . readwrite import json_graph
from multiprocessing import Pool
@ -78,6 +79,7 @@ class Simulation(NetworkSimulation):
"""
def __init__ ( self , name = None , topology = None , network_params = None ,
network_agents = None , agent_type = None , states = None ,
default_state = None , interval = 1 , dump = None , dry_run = False ,
@ -104,23 +106,21 @@ class Simulation(NetworkSimulation):
self . seed = str ( seed ) or str ( time . time ( ) )
self . dump = dump
self . dry_run = dry_run
self . environment_params = environment_params or { }
self . environment_class = utils . deserialize ( environment_class ,
known_modules = [ ' soil.environment ' , ] ) or Environment
self . _loaded_module = None
sys . path + = [ self . dir_path , os . getcwd ( ) ]
if load_module :
path = sys . path + [ self . dir_path , os . getcwd ( ) ]
f , fp , desc = imp . find_module ( load_module , path )
self . _loaded_module = imp . load_module ( ' soil.agents.custom ' , f , fp , desc )
self . environment_params = environment_params or { }
self . environment_class = utils . deserialize ( environment_class ,
known_modules = [ ' soil.environment ' , ] ) or Environment
environment_agents = environment_agents or [ ]
self . environment_agents = agents . _convert_agent_types ( environment_agents )
self . environment_agents = agents . _convert_agent_types ( environment_agents ,
known_modules = [ self . load_module ] )
distro = agents . calculate_distribution ( network_agents ,
agent_type )
self . network_agents = agents . _convert_agent_types ( distro )
self . network_agents = agents . _convert_agent_types ( distro ,
known_modules = [ self . load_module ] )
self . states = agents . _validate_states ( states ,
self . topology )
@ -136,13 +136,17 @@ class Simulation(NetworkSimulation):
p = Pool ( )
with utils . timer ( ' simulation {} ' . format ( self . name ) ) :
if parallel :
func = partial ( self . run_trial , dry_run = dry_run or self . dry_run ,
return_env = not parallel , * * kwargs )
func = partial ( self . run_trial_exceptions , dry_run = dry_run or self . dry_run ,
return_env = True ,
* * kwargs )
for i in p . imap_unordered ( func , range ( self . num_trials ) ) :
if isinstance ( i , Exception ) :
logger . error ( ' Trial failed: \n \t {} ' . format ( i . message ) )
continue
yield i
else :
for i in range ( self . num_trials ) :
yield self . run_trial ( i , dry_run = dry_run or self . dry_run , * * kwargs )
yield self . run_trial ( i , dry_run = dry_run or self . dry_run , * * kwargs )
if not ( dry_run or self . dry_run ) :
logger . info ( ' Dumping results to {} ' . format ( self . dir_path ) )
self . dump_pickle ( self . dir_path )
@ -150,9 +154,9 @@ class Simulation(NetworkSimulation):
else :
logger . info ( ' NOT dumping results ' )
def get_env ( self , trial_id = 0 , * * kwargs ) :
opts = self . environment_params . copy ( )
env_name = ' {} _trial_ {} ' . format ( self . name , trial_id )
def get_env ( self , trial_id = 0 , * * kwargs ) :
opts = self . environment_params . copy ( )
env_name = ' {} _trial_ {} ' . format ( self . name , trial_id )
opts . update ( {
' name ' : env_name ,
' topology ' : self . topology . copy ( ) ,
@ -167,10 +171,10 @@ class Simulation(NetworkSimulation):
' dir_path ' : self . dir_path ,
} )
opts . update ( kwargs )
env = self . environment_class ( * * opts )
env = self . environment_class ( * * opts )
return env
def run_trial ( self , trial_id = 0 , until = None , return_env = True , * * opts ) :
def run_trial ( self , trial_id = 0 , until = None , return_env = True , * * opts ) :
""" Run a single trial of the simulation
Parameters
@ -178,16 +182,27 @@ class Simulation(NetworkSimulation):
trial_id : int
"""
# Set-up trial environment and graph
until = until or self . max_time
env = self . get_env ( trial_id = trial_id , * * opts )
until = until or self . max_time
env = self . get_env ( trial_id = trial_id , * * opts )
# Set up agents on nodes
with utils . timer ( ' Simulation {} trial {} ' . format ( self . name , trial_id ) ) :
env . run ( until )
if self . dump and not self . dry_run :
with utils . timer ( ' Dumping simulation {} trial {} ' . format ( self . name , trial_id ) ) :
env . dump ( formats = self . dump )
env . dump ( formats = self . dump )
if return_env :
return env
def run_trial_exceptions ( self , * args , * * kwargs ) :
'''
A wrapper for run_trial that catches exceptions and returns them .
It is meant for async simulations
'''
try :
return self . run_trial ( * args , * * kwargs )
except Exception as ex :
c = ex . __cause__
c . message = ' ' . join ( traceback . format_tb ( c . __traceback__ ) [ 3 : ] )
return c
def to_dict ( self ) :
return self . __getstate__ ( )
@ -195,48 +210,53 @@ class Simulation(NetworkSimulation):
def to_yaml ( self ) :
return yaml . dump ( self . to_dict ( ) )
def dump_yaml ( self , dir_path = None , file_name = None ) :
dir_path = dir_path or self . dir_path
def dump_yaml ( self , dir_path = None , file_name = None ) :
dir_path = dir_path or self . dir_path
if not os . path . exists ( dir_path ) :
os . makedirs ( dir_path )
if not file_name :
file_name = os . path . join ( dir_path ,
file_name = os . path . join ( dir_path ,
' {} .dumped.yml ' . format ( self . name ) )
with open ( file_name , ' w ' ) as f :
f . write ( self . to_yaml ( ) )
def dump_pickle ( self , dir_path = None , pickle_name = None ) :
dir_path = dir_path or self . dir_path
def dump_pickle ( self , dir_path = None , pickle_name = None ) :
dir_path = dir_path or self . dir_path
if not os . path . exists ( dir_path ) :
os . makedirs ( dir_path )
if not pickle_name :
pickle_name = os . path . join ( dir_path ,
pickle_name = os . path . join ( dir_path ,
' {} .simulation.pickle ' . format ( self . name ) )
with open ( pickle_name , ' wb ' ) as f :
pickle . dump ( self , f )
def __getstate__ ( self ) :
state = { }
state = { }
for k , v in self . __dict__ . items ( ) :
if k [ 0 ] != ' _ ' :
state [ k ] = v
state [ ' topology ' ] = json_graph . node_link_data ( self . topology )
state [ ' network_agents ' ] = agents . serialize_distribution ( self . network_agents )
state [ ' environment_agents ' ] = agents . _convert_agent_types ( self . environment_agents ,
to_string = True )
state [ ' environment_class ' ] = utils . serialize ( self . environment_class ,
known_modules = [ ' soil.environment ' , ] ) [ 1 ] # func, name
state [ k ] = v
state [ ' topology ' ] = json_graph . node_link_data ( self . topology )
state [ ' network_agents ' ] = agents . serialize_distribution ( self . network_agents ,
known_modules = [ ] )
state [ ' environment_agents ' ] = agents . serialize_distribution ( self . environment_agents ,
known_modules = [ ] )
state [ ' environment_class ' ] = utils . serialize ( self . environment_class ,
known_modules = [ ' soil.environment ' ] ) [ 1 ] # func, name
if state [ ' load_module ' ] is None :
del state [ ' load_module ' ]
return state
def __setstate__ ( self , state ) :
self . __dict__ = state
self . load_module = getattr ( self , ' load_module ' , None )
if self . dir_path not in sys . path :
sys . path + = [ self . dir_path , os . getcwd ( ) ]
self . topology = json_graph . node_link_graph ( state [ ' topology ' ] )
self . network_agents = agents . calculate_distribution ( agents . _convert_agent_types ( self . network_agents ) )
self . environment_agents = agents . _convert_agent_types ( self . environment_agents )
self . environment_agents = agents . _convert_agent_types ( self . environment_agents ,
known_modules = [ self . load_module ] )
self . environment_class = utils . deserialize ( self . environment_class ,
known_modules = [ ' soil.environment ' , ] ) # func, name
known_modules = [ self . load_module , ' soil.environment ' , ] ) # func, name
return state