mirror of
https://github.com/gsi-upm/soil
synced 2025-08-23 19:52:19 +00:00
WIP: exporters
This commit is contained in:
@@ -57,18 +57,20 @@ def main():
|
||||
logging.info('Loading config file: {}'.format(args.file))
|
||||
|
||||
try:
|
||||
dump = []
|
||||
if not args.dry_run:
|
||||
if args.csv:
|
||||
dump.append('csv')
|
||||
if args.graph:
|
||||
dump.append('gexf')
|
||||
exporters = list(args.exporter or [])
|
||||
if args.csv:
|
||||
exporters.append('CSV')
|
||||
if args.graph:
|
||||
exporters.append('Gexf')
|
||||
exp_params = {}
|
||||
if args.dry_run:
|
||||
exp_params['copy_to'] = sys.stdout
|
||||
simulation.run_from_config(args.file,
|
||||
dry_run=args.dry_run,
|
||||
dump=dump,
|
||||
exporters=args.exporter,
|
||||
exporters=exporters,
|
||||
parallel=(not args.synchronous),
|
||||
outdir=args.output)
|
||||
outdir=args.output,
|
||||
exporter_params=exp_params)
|
||||
except Exception:
|
||||
if args.pdb:
|
||||
pdb.post_mortem()
|
||||
|
@@ -1,18 +0,0 @@
|
||||
from . import BaseAgent
|
||||
|
||||
import os.path
|
||||
import matplotlib
|
||||
import matplotlib.pyplot as plt
|
||||
import networkx as nx
|
||||
|
||||
|
||||
class DrawingAgent(BaseAgent):
|
||||
"""
|
||||
Agent that draws the state of the network.
|
||||
"""
|
||||
|
||||
def step(self):
|
||||
# Outside effects
|
||||
f = plt.figure()
|
||||
nx.draw(self.env.G, node_size=10, width=0.2, pos=nx.spring_layout(self.env.G, scale=100), ax=f.add_subplot(111))
|
||||
f.savefig(os.path.join(self.env.get_path(), "graph-"+str(self.env.now)+".png"))
|
@@ -515,4 +515,3 @@ from .ModelM2 import *
|
||||
from .SentimentCorrelationModel import *
|
||||
from .SISaModel import *
|
||||
from .CounterModel import *
|
||||
from .DrawingAgent import *
|
||||
|
@@ -34,7 +34,7 @@ def _read_data(pattern, *args, from_csv=False, process_args=None, **kwargs):
|
||||
|
||||
|
||||
def read_sql(db, *args, **kwargs):
|
||||
h = history.History(db, backup=False)
|
||||
h = history.History(db_path=db, backup=False)
|
||||
df = h.read_sql(*args, **kwargs)
|
||||
return df
|
||||
|
||||
|
@@ -14,15 +14,13 @@ from networkx.readwrite import json_graph
|
||||
import networkx as nx
|
||||
import nxsim
|
||||
|
||||
from . import serialization, agents, analysis, history
|
||||
from . import serialization, agents, analysis, history, utils
|
||||
|
||||
# These properties will be copied when pickling/unpickling the environment
|
||||
_CONFIG_PROPS = [ 'name',
|
||||
'states',
|
||||
'default_state',
|
||||
'interval',
|
||||
'dry_run',
|
||||
'outdir',
|
||||
]
|
||||
|
||||
class Environment(nxsim.NetworkEnvironment):
|
||||
@@ -43,8 +41,6 @@ class Environment(nxsim.NetworkEnvironment):
|
||||
default_state=None,
|
||||
interval=1,
|
||||
seed=None,
|
||||
dry_run=False,
|
||||
outdir=None,
|
||||
topology=None,
|
||||
*args, **kwargs):
|
||||
self.name = name or 'UnnamedEnvironment'
|
||||
@@ -56,13 +52,8 @@ class Environment(nxsim.NetworkEnvironment):
|
||||
topology = nx.Graph()
|
||||
super().__init__(*args, topology=topology, **kwargs)
|
||||
self._env_agents = {}
|
||||
self.dry_run = dry_run
|
||||
self.interval = interval
|
||||
self.outdir = outdir or tempfile.mkdtemp('soil-env')
|
||||
if not dry_run:
|
||||
self.get_path()
|
||||
self._history = history.History(name=self.name if not dry_run else None,
|
||||
outdir=self.outdir,
|
||||
self._history = history.History(name=self.name,
|
||||
backup=True)
|
||||
# Add environment agents first, so their events get
|
||||
# executed before network agents
|
||||
@@ -167,8 +158,6 @@ class Environment(nxsim.NetworkEnvironment):
|
||||
self.log_stats()
|
||||
|
||||
def _save_state(self, now=None):
|
||||
# for agent in self.agents:
|
||||
# agent.save_state()
|
||||
serialization.logger.debug('Saving state @{}'.format(self.now))
|
||||
self._history.save_records(self.state_to_tuples(now=now))
|
||||
|
||||
@@ -222,15 +211,6 @@ class Environment(nxsim.NetworkEnvironment):
|
||||
'''
|
||||
return self[key] if key in self else default
|
||||
|
||||
def get_path(self, outdir=None):
|
||||
outdir = outdir or self.outdir
|
||||
if not os.path.exists(outdir):
|
||||
try:
|
||||
os.makedirs(outdir)
|
||||
except FileExistsError:
|
||||
pass
|
||||
return outdir
|
||||
|
||||
def get_agent(self, agent_id):
|
||||
return self.G.node[agent_id]['agent']
|
||||
|
||||
@@ -239,20 +219,15 @@ class Environment(nxsim.NetworkEnvironment):
|
||||
return list(self.agents)
|
||||
return [self.G.node[i]['agent'] for i in nodes]
|
||||
|
||||
def dump_csv(self, outdir=None):
|
||||
csv_name = os.path.join(self.get_path(outdir),
|
||||
'{}.environment.csv'.format(self.name))
|
||||
|
||||
with open(csv_name, 'w') as f:
|
||||
def dump_csv(self, f):
|
||||
with utils.open_or_reuse(f, 'w') as f:
|
||||
cr = csv.writer(f)
|
||||
cr.writerow(('agent_id', 't_step', 'key', 'value'))
|
||||
for i in self.history_to_tuples():
|
||||
cr.writerow(i)
|
||||
|
||||
def dump_gexf(self, outdir=None):
|
||||
def dump_gexf(self, f):
|
||||
G = self.history_to_graph()
|
||||
graph_path = os.path.join(self.get_path(outdir),
|
||||
self.name+".gexf")
|
||||
# Workaround for geometric models
|
||||
# See soil/soil#4
|
||||
for node in G.nodes():
|
||||
@@ -260,9 +235,9 @@ class Environment(nxsim.NetworkEnvironment):
|
||||
G.node[node]['viz'] = {"position": {"x": G.node[node]['pos'][0], "y": G.node[node]['pos'][1], "z": 0.0}}
|
||||
del (G.node[node]['pos'])
|
||||
|
||||
nx.write_gexf(G, graph_path, version="1.2draft")
|
||||
nx.write_gexf(G, f, version="1.2draft")
|
||||
|
||||
def dump(self, outdir=None, formats=None):
|
||||
def dump(self, *args, formats=None, **kwargs):
|
||||
if not formats:
|
||||
return
|
||||
functions = {
|
||||
@@ -271,10 +246,13 @@ class Environment(nxsim.NetworkEnvironment):
|
||||
}
|
||||
for f in formats:
|
||||
if f in functions:
|
||||
functions[f](outdir)
|
||||
functions[f](*args, **kwargs)
|
||||
else:
|
||||
raise ValueError('Unknown format: {}'.format(f))
|
||||
|
||||
def dump_sqlite(self, f):
|
||||
return self._history.dump(f)
|
||||
|
||||
def state_to_tuples(self, now=None):
|
||||
if now is None:
|
||||
now = self.now
|
||||
@@ -338,7 +316,7 @@ class Environment(nxsim.NetworkEnvironment):
|
||||
G.add_node(agent.id, **attributes)
|
||||
|
||||
return G
|
||||
|
||||
|
||||
def stats(self):
|
||||
stats = {}
|
||||
stats['network'] = {}
|
||||
|
@@ -1,43 +1,174 @@
|
||||
from .serialization import deserialize
|
||||
import os
|
||||
import time
|
||||
from io import BytesIO
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import networkx as nx
|
||||
import pandas as pd
|
||||
|
||||
from .serialization import deserialize
|
||||
from .utils import open_or_reuse, logger, timer
|
||||
|
||||
|
||||
def for_sim(simulation, names, dir_path=None):
|
||||
from . import utils
|
||||
|
||||
|
||||
def for_sim(simulation, names, *args, **kwargs):
|
||||
'''Return the set of exporters for a simulation, given the exporter names'''
|
||||
exporters = []
|
||||
for name in names:
|
||||
mod = deserialize(name, known_modules=['soil.exporters'])
|
||||
exporters.append(mod(simulation))
|
||||
exporters.append(mod(simulation, *args, **kwargs))
|
||||
return exporters
|
||||
|
||||
class DryRunner(BytesIO):
|
||||
def __init__(self, fname, *args, copy_to=None, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.__fname = fname
|
||||
self.__copy_to = copy_to
|
||||
|
||||
class Base:
|
||||
def write(self, txt):
|
||||
if self.__copy_to:
|
||||
self.__copy_to.write('{}:::{}'.format(self.__fname, txt))
|
||||
try:
|
||||
super().write(txt)
|
||||
except TypeError:
|
||||
super().write(bytes(txt, 'utf-8'))
|
||||
|
||||
def __init__(self, simulation):
|
||||
def close(self):
|
||||
logger.info('**Not** written to {} (dry run mode):\n\n{}\n\n'.format(self.__fname,
|
||||
self.getvalue().decode()))
|
||||
super().close()
|
||||
|
||||
|
||||
class Exporter:
|
||||
'''
|
||||
Interface for all exporters. It is not necessary, but it is useful
|
||||
if you don't plan to implement all the methods.
|
||||
'''
|
||||
|
||||
def __init__(self, simulation, outdir=None, dry_run=None, copy_to=None):
|
||||
self.sim = simulation
|
||||
outdir = outdir or os.getcwd()
|
||||
self.outdir = os.path.join(outdir,
|
||||
simulation.group or '',
|
||||
simulation.name)
|
||||
self.dry_run = dry_run
|
||||
self.copy_to = copy_to
|
||||
|
||||
def start(self):
|
||||
pass
|
||||
'''Method to call when the simulation starts'''
|
||||
|
||||
def end(self):
|
||||
pass
|
||||
'''Method to call when the simulation ends'''
|
||||
|
||||
def env(self):
|
||||
pass
|
||||
def trial_end(self, env):
|
||||
'''Method to call when a trial ends'''
|
||||
|
||||
def output(self, f, mode='w', **kwargs):
|
||||
if self.dry_run:
|
||||
f = DryRunner(f, copy_to=self.copy_to)
|
||||
else:
|
||||
try:
|
||||
if not os.path.isabs(f):
|
||||
f = os.path.join(self.outdir, f)
|
||||
except TypeError:
|
||||
pass
|
||||
return open_or_reuse(f, mode=mode, **kwargs)
|
||||
|
||||
|
||||
class Dummy(Base):
|
||||
class Default(Exporter):
|
||||
'''Default exporter. Writes CSV and sqlite results, as well as the simulation YAML'''
|
||||
|
||||
def start(self):
|
||||
with open(os.path.join(self.sim.outdir, 'dummy')) as f:
|
||||
f.write('simulation started @ {}'.format(time.time()))
|
||||
if not self.dry_run:
|
||||
logger.info('Dumping results to %s', self.outdir)
|
||||
self.sim.dump_yaml(outdir=self.outdir)
|
||||
else:
|
||||
logger.info('NOT dumping results')
|
||||
|
||||
def env(self, env):
|
||||
with open(os.path.join(self.sim.outdir, 'dummy-trial-{}'.format(env.name))) as f:
|
||||
def trial_end(self, env):
|
||||
if not self.dry_run:
|
||||
with timer('Dumping simulation {} trial {}'.format(self.sim.name,
|
||||
env.name)):
|
||||
with self.output('{}.sqlite'.format(env.name), mode='wb') as f:
|
||||
env.dump_sqlite(f)
|
||||
|
||||
|
||||
class CSV(Exporter):
|
||||
def trial_end(self, env):
|
||||
if not self.dry_run:
|
||||
with timer('[CSV] Dumping simulation {} trial {}'.format(self.sim.name,
|
||||
env.name)):
|
||||
with self.output('{}.csv'.format(env.name)) as f:
|
||||
env.dump_csv(f)
|
||||
|
||||
|
||||
class Gexf(Exporter):
|
||||
def trial_end(self, env):
|
||||
if not self.dry_run:
|
||||
with timer('[CSV] Dumping simulation {} trial {}'.format(self.sim.name,
|
||||
env.name)):
|
||||
with self.output('{}.gexf'.format(env.name), mode='wb') as f:
|
||||
env.dump_gexf(f)
|
||||
|
||||
|
||||
class Dummy(Exporter):
|
||||
|
||||
def start(self):
|
||||
with self.output('dummy', 'w') as f:
|
||||
f.write('simulation started @ {}\n'.format(time.time()))
|
||||
|
||||
def trial_end(self, env):
|
||||
with self.output('dummy', 'w') as f:
|
||||
for i in env.history_to_tuples():
|
||||
f.write(','.join(i))
|
||||
|
||||
f.write(','.join(map(str, i)))
|
||||
f.write('\n')
|
||||
|
||||
def end(self):
|
||||
with open(os.path.join(self.sim.outdir, 'dummy')) as f:
|
||||
f.write('simulation ended @ {}'.format(time.time()))
|
||||
with self.output('dummy', 'a') as f:
|
||||
f.write('simulation ended @ {}\n'.format(time.time()))
|
||||
|
||||
|
||||
class Distribution(Exporter):
|
||||
'''
|
||||
Write the distribution of agent states at the end of each trial,
|
||||
the mean value, and its deviation.
|
||||
'''
|
||||
|
||||
def start(self):
|
||||
self.means = []
|
||||
self.counts = []
|
||||
|
||||
def trial_end(self, env):
|
||||
df = env[None, None, None].df()
|
||||
ix = df.index[-1]
|
||||
attrs = df.columns.levels[0]
|
||||
vc = {}
|
||||
stats = {}
|
||||
for a in attrs:
|
||||
t = df.loc[(ix, a)]
|
||||
try:
|
||||
self.means.append(('mean', a, t.mean()))
|
||||
except TypeError:
|
||||
for name, count in t.value_counts().iteritems():
|
||||
self.counts.append(('count', a, name, count))
|
||||
|
||||
def end(self):
|
||||
dfm = pd.DataFrame(self.means, columns=['metric', 'key', 'value'])
|
||||
dfc = pd.DataFrame(self.counts, columns=['metric', 'key', 'value', 'count'])
|
||||
dfm = dfm.groupby(by=['key']).agg(['mean', 'std', 'count', 'median', 'max', 'min'])
|
||||
dfc = dfc.groupby(by=['key', 'value']).agg(['mean', 'std', 'count', 'median', 'max', 'min'])
|
||||
with self.output('counts.csv') as f:
|
||||
dfc.to_csv(f)
|
||||
with self.output('metrics.csv') as f:
|
||||
dfm.to_csv(f)
|
||||
|
||||
class GraphDrawing(Exporter):
|
||||
|
||||
def trial_end(self, env):
|
||||
# Outside effects
|
||||
f = plt.figure()
|
||||
nx.draw(env.G, node_size=10, width=0.2, pos=nx.spring_layout(env.G, scale=100), ax=f.add_subplot(111))
|
||||
with open('graph-{}.png'.format(env.name)) as f:
|
||||
f.savefig(f)
|
||||
|
@@ -4,6 +4,7 @@ import pandas as pd
|
||||
import sqlite3
|
||||
import copy
|
||||
import logging
|
||||
import tempfile
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -17,16 +18,18 @@ class History:
|
||||
Store and retrieve values from a sqlite database.
|
||||
"""
|
||||
|
||||
def __init__(self, db_path=None, name=None, outdir=None, backup=False):
|
||||
if db_path is None and name:
|
||||
db_path = os.path.join(outdir or os.getcwd(),
|
||||
'{}.db.sqlite'.format(name))
|
||||
if db_path:
|
||||
if backup and os.path.exists(db_path):
|
||||
newname = db_path + '.backup{}.sqlite'.format(time.time())
|
||||
os.rename(db_path, newname)
|
||||
else:
|
||||
db_path = ":memory:"
|
||||
def __init__(self, name=None, db_path=None, backup=False):
|
||||
self._db = None
|
||||
|
||||
if db_path is None:
|
||||
if not name:
|
||||
name = time.time()
|
||||
_, db_path = tempfile.mkstemp(suffix='{}.sqlite'.format(name))
|
||||
|
||||
if backup and os.path.exists(db_path):
|
||||
newname = db_path + '.backup{}.sqlite'.format(time.time())
|
||||
os.rename(db_path, newname)
|
||||
|
||||
self.db_path = db_path
|
||||
|
||||
self.db = db_path
|
||||
@@ -49,6 +52,7 @@ class History:
|
||||
|
||||
@db.setter
|
||||
def db(self, db_path=None):
|
||||
self._close()
|
||||
db_path = db_path or self.db_path
|
||||
if isinstance(db_path, str):
|
||||
logger.debug('Connecting to database {}'.format(db_path))
|
||||
@@ -56,6 +60,13 @@ class History:
|
||||
else:
|
||||
self._db = db_path
|
||||
|
||||
def _close(self):
|
||||
if self._db is None:
|
||||
return
|
||||
self.flush_cache()
|
||||
self._db.close()
|
||||
self._db = None
|
||||
|
||||
@property
|
||||
def dtypes(self):
|
||||
self.read_types()
|
||||
@@ -110,7 +121,6 @@ class History:
|
||||
raise ValueError("Unknown datatype for {} and {}".format(key, value))
|
||||
return self._dtypes[key][2](value)
|
||||
|
||||
|
||||
def flush_cache(self):
|
||||
'''
|
||||
Use a cache to save state changes to avoid opening a session for every change.
|
||||
@@ -154,8 +164,6 @@ class History:
|
||||
return r.value()
|
||||
return r
|
||||
|
||||
|
||||
|
||||
def read_sql(self, keys=None, agent_ids=None, t_steps=None, convert_types=False, limit=-1):
|
||||
|
||||
self.read_types()
|
||||
@@ -214,16 +222,22 @@ class History:
|
||||
if t_steps:
|
||||
df_p = df_p.reindex(t_steps, method='ffill')
|
||||
return df_p.ffill()
|
||||
|
||||
|
||||
def __getstate__(self):
|
||||
state = dict(**self.__dict__)
|
||||
del state['_db']
|
||||
del state['_dtypes']
|
||||
return state
|
||||
|
||||
|
||||
def __setstate__(self, state):
|
||||
self.__dict__ = state
|
||||
self._dtypes = {}
|
||||
self._db = None
|
||||
|
||||
def dump(self, f):
|
||||
self._close()
|
||||
for line in open(self.db_path, 'rb'):
|
||||
f.write(line)
|
||||
|
||||
|
||||
class Records():
|
||||
@@ -274,10 +288,13 @@ class Records():
|
||||
i = self._df[f.key][str(f.agent_id)]
|
||||
ix = i.index.get_loc(f.t_step, method='ffill')
|
||||
return i.iloc[ix]
|
||||
except KeyError:
|
||||
except KeyError as ex:
|
||||
return self.dtypes[f.key][2]()
|
||||
return list(self)
|
||||
|
||||
def df(self):
|
||||
return self._df
|
||||
|
||||
def __getitem__(self, k):
|
||||
n = copy.copy(self)
|
||||
n.filter(k)
|
||||
@@ -293,6 +310,5 @@ class Records():
|
||||
return str(self.value())
|
||||
return '<Records for [{}]>'.format(self._filter)
|
||||
|
||||
|
||||
Key = namedtuple('Key', ['agent_id', 't_step', 'key'])
|
||||
Record = namedtuple('Record', 'agent_id t_step key value')
|
||||
|
@@ -2,16 +2,15 @@ import os
|
||||
import logging
|
||||
import ast
|
||||
import sys
|
||||
import yaml
|
||||
import importlib
|
||||
from glob import glob
|
||||
from random import random
|
||||
from copy import deepcopy
|
||||
from itertools import product, chain
|
||||
|
||||
import yaml
|
||||
import networkx as nx
|
||||
|
||||
from jinja2 import Template
|
||||
|
||||
import networkx as nx
|
||||
|
||||
logger = logging.getLogger('soil')
|
||||
logger.setLevel(logging.INFO)
|
||||
@@ -36,6 +35,9 @@ def load_network(network_params, dir_path=None):
|
||||
return method(path, **kwargs)
|
||||
|
||||
net_args = network_params.copy()
|
||||
if 'generator' not in net_args:
|
||||
return nx.Graph()
|
||||
|
||||
net_gen = net_args.pop('generator')
|
||||
|
||||
if dir_path not in sys.path:
|
||||
@@ -51,6 +53,7 @@ def load_file(infile):
|
||||
with open(infile, 'r') as f:
|
||||
return list(chain.from_iterable(map(expand_template, load_string(f))))
|
||||
|
||||
|
||||
def load_string(string):
|
||||
yield from yaml.load_all(string)
|
||||
|
||||
@@ -91,7 +94,6 @@ def expand_template(config):
|
||||
blank = list(load_string(blank_str))
|
||||
if len(blank) > 1:
|
||||
raise ValueError('Templates must not return more than one configuration')
|
||||
|
||||
if 'name' in blank[0]:
|
||||
raise ValueError('Templates cannot be named, use group instead')
|
||||
|
||||
|
@@ -15,7 +15,7 @@ from nxsim import NetworkSimulation
|
||||
|
||||
from . import serialization, utils, basestring, agents
|
||||
from .environment import Environment
|
||||
from .serialization import logger
|
||||
from .utils import logger
|
||||
from .exporters import for_sim as exporters_for_sim
|
||||
|
||||
|
||||
@@ -65,8 +65,6 @@ class Simulation(NetworkSimulation):
|
||||
whose value indicates the state
|
||||
dir_path: str, optional
|
||||
Directory path to load simulation assets (files, modules...)
|
||||
outdir : str, optional
|
||||
Directory path to save simulation results
|
||||
seed : str, optional
|
||||
Seed to use for the random generator
|
||||
num_trials : int, optional
|
||||
@@ -87,11 +85,11 @@ class Simulation(NetworkSimulation):
|
||||
|
||||
def __init__(self, name=None, group=None, topology=None, network_params=None,
|
||||
network_agents=None, agent_type=None, states=None,
|
||||
default_state=None, interval=1, dump=None, dry_run=False,
|
||||
outdir=None, num_trials=1, max_time=100,
|
||||
load_module=None, seed=None, dir_path=None,
|
||||
environment_agents=None, environment_params=None,
|
||||
environment_class=None, **kwargs):
|
||||
default_state=None, interval=1, num_trials=1,
|
||||
max_time=100, load_module=None, seed=None,
|
||||
dir_path=None, environment_agents=None,
|
||||
environment_params=None, environment_class=None,
|
||||
**kwargs):
|
||||
|
||||
self.seed = str(seed) or str(time.time())
|
||||
self.load_module = load_module
|
||||
@@ -101,18 +99,10 @@ class Simulation(NetworkSimulation):
|
||||
self.num_trials = num_trials
|
||||
self.max_time = max_time
|
||||
self.default_state = default_state or {}
|
||||
if not outdir:
|
||||
outdir = os.path.join(os.getcwd(),
|
||||
'soil_output')
|
||||
self.outdir = os.path.join(outdir,
|
||||
self.group or '',
|
||||
self.name)
|
||||
self.dir_path = dir_path or os.getcwd()
|
||||
self.interval = interval
|
||||
self.dump = dump
|
||||
self.dry_run = dry_run
|
||||
|
||||
sys.path += list(x for x in [self.outdir, os.getcwd(), self.dir_path] if x not in sys.path)
|
||||
sys.path += list(x for x in [os.getcwd(), self.dir_path] if x not in sys.path)
|
||||
|
||||
if topology is None:
|
||||
topology = serialization.load_network(network_params,
|
||||
@@ -142,6 +132,7 @@ class Simulation(NetworkSimulation):
|
||||
return self.run(*args, **kwargs)
|
||||
|
||||
def run(self, *args, **kwargs):
|
||||
'''Run the simulation and return the list of resulting environments'''
|
||||
return list(self._run_simulation_gen(*args, **kwargs))
|
||||
|
||||
def _run_sync_or_async(self, parallel=False, *args, **kwargs):
|
||||
@@ -152,7 +143,7 @@ class Simulation(NetworkSimulation):
|
||||
**kwargs)
|
||||
for i in p.imap_unordered(func, range(self.num_trials)):
|
||||
if isinstance(i, Exception):
|
||||
logger.error('Trial failed:\n\t{}'.format(i.message))
|
||||
logger.error('Trial failed:\n\t%s', i.message)
|
||||
continue
|
||||
yield i
|
||||
else:
|
||||
@@ -162,29 +153,30 @@ class Simulation(NetworkSimulation):
|
||||
**kwargs)
|
||||
|
||||
def _run_simulation_gen(self, *args, parallel=False, dry_run=False,
|
||||
exporters=None, **kwargs):
|
||||
exporters=None, outdir=None, exporter_params={}, **kwargs):
|
||||
logger.info('Using exporters: %s', exporters or [])
|
||||
logger.info('Output directory: %s', outdir)
|
||||
exporters = exporters_for_sim(self,
|
||||
exporters or [])
|
||||
exporters or [],
|
||||
dry_run=dry_run,
|
||||
outdir=outdir,
|
||||
**exporter_params)
|
||||
|
||||
with utils.timer('simulation {}'.format(self.name)):
|
||||
if not (dry_run or self.dry_run):
|
||||
logger.info('Dumping results to {}'.format(self.outdir))
|
||||
self.dump_pickle(self.outdir)
|
||||
self.dump_yaml(self.outdir)
|
||||
else:
|
||||
logger.info('NOT dumping results')
|
||||
for exporter in exporters:
|
||||
exporter.start()
|
||||
|
||||
for env in self._run_sync_or_async(*args, parallel=parallel,
|
||||
dry_run=dry_run, **kwargs):
|
||||
**kwargs):
|
||||
for exporter in exporters:
|
||||
exporter.env(env)
|
||||
exporter.trial_end(env)
|
||||
yield env
|
||||
|
||||
for exporter in exporters:
|
||||
exporter.end()
|
||||
|
||||
def get_env(self, trial_id = 0, **kwargs):
|
||||
'''Create an environment for a trial of the simulation'''
|
||||
opts = self.environment_params.copy()
|
||||
env_name = '{}_trial_{}'.format(self.name, trial_id)
|
||||
opts.update({
|
||||
@@ -192,19 +184,17 @@ class Simulation(NetworkSimulation):
|
||||
'topology': self.topology.copy(),
|
||||
'seed': self.seed+env_name,
|
||||
'initial_time': 0,
|
||||
'dry_run': self.dry_run,
|
||||
'interval': self.interval,
|
||||
'network_agents': self.network_agents,
|
||||
'states': self.states,
|
||||
'default_state': self.default_state,
|
||||
'environment_agents': self.environment_agents,
|
||||
'outdir': self.outdir,
|
||||
})
|
||||
opts.update(kwargs)
|
||||
env = self.environment_class(**opts)
|
||||
return env
|
||||
|
||||
def run_trial(self, trial_id=0, until=None, dry_run=False, **opts):
|
||||
def run_trial(self, trial_id=0, until=None, **opts):
|
||||
"""Run a single trial of the simulation
|
||||
|
||||
Parameters
|
||||
@@ -214,13 +204,9 @@ class Simulation(NetworkSimulation):
|
||||
# Set-up trial environment and graph
|
||||
until = until or self.max_time
|
||||
env = self.get_env(trial_id = trial_id, **opts)
|
||||
dry_run = self.dry_run or dry_run
|
||||
# Set up agents on nodes
|
||||
with utils.timer('Simulation {} trial {}'.format(self.name, trial_id)):
|
||||
env.run(until)
|
||||
if self.dump and not dry_run:
|
||||
with utils.timer('Dumping simulation {} trial {}'.format(self.name, trial_id)):
|
||||
env.dump(formats = self.dump)
|
||||
return env
|
||||
def run_trial_exceptions(self, *args, **kwargs):
|
||||
'''
|
||||
@@ -240,24 +226,25 @@ class Simulation(NetworkSimulation):
|
||||
def to_yaml(self):
|
||||
return yaml.dump(self.to_dict())
|
||||
|
||||
def dump_yaml(self, outdir = None, file_name = None):
|
||||
outdir = outdir or self.outdir
|
||||
if not os.path.exists(outdir):
|
||||
os.makedirs(outdir)
|
||||
if not file_name:
|
||||
file_name=os.path.join(outdir,
|
||||
'{}.dumped.yml'.format(self.name))
|
||||
with open(file_name, 'w') as f:
|
||||
|
||||
def dump_yaml(self, f=None, outdir=None):
|
||||
if not f and not outdir:
|
||||
raise ValueError('specify a file or an output directory')
|
||||
|
||||
if not f:
|
||||
f = os.path.join(outdir, '{}.dumped.yml'.format(self.name))
|
||||
|
||||
with utils.open_or_reuse(f, 'w') as f:
|
||||
f.write(self.to_yaml())
|
||||
|
||||
def dump_pickle(self, outdir = None, pickle_name = None):
|
||||
outdir = outdir or self.outdir
|
||||
if not os.path.exists(outdir):
|
||||
os.makedirs(outdir)
|
||||
if not pickle_name:
|
||||
pickle_name=os.path.join(outdir,
|
||||
'{}.simulation.pickle'.format(self.name))
|
||||
with open(pickle_name, 'wb') as f:
|
||||
def dump_pickle(self, f=None, outdir=None):
|
||||
if not outdir and not f:
|
||||
raise ValueError('specify a file or an output directory')
|
||||
|
||||
if not f:
|
||||
f = os.path.join(outdir,
|
||||
'{}.simulation.pickle'.format(self.name))
|
||||
with utils.open_or_reuse(f, 'wb') as f:
|
||||
pickle.dump(self, f)
|
||||
|
||||
def __getstate__(self):
|
||||
@@ -279,8 +266,6 @@ class Simulation(NetworkSimulation):
|
||||
def __setstate__(self, state):
|
||||
self.__dict__ = state
|
||||
self.load_module = getattr(self, 'load_module', None)
|
||||
if self.outdir not in sys.path:
|
||||
sys.path += [self.outdir, os.getcwd()]
|
||||
if self.dir_path not in sys.path:
|
||||
sys.path += [self.dir_path, os.getcwd()]
|
||||
self.topology = json_graph.node_link_graph(state['topology'])
|
||||
@@ -308,24 +293,14 @@ def from_config(conf_or_path):
|
||||
return sim
|
||||
|
||||
|
||||
def run_from_config(*configs, outdir=None, dump=None, timestamp=False, **kwargs):
|
||||
def run_from_config(*configs, **kwargs):
|
||||
for config_def in configs:
|
||||
# logger.info("Found {} config(s)".format(len(ls)))
|
||||
for config, path in serialization.load_config(config_def):
|
||||
name = config.get('name', 'unnamed')
|
||||
logger.info("Using config(s): {name}".format(name=name))
|
||||
|
||||
if timestamp:
|
||||
sim_folder = '{}_{}'.format(name,
|
||||
time.strftime("%Y-%m-%d_%H:%M:%S"))
|
||||
else:
|
||||
sim_folder = name
|
||||
if dump is not None:
|
||||
config['dump'] = dump
|
||||
dir_path = config.pop('dir_path', os.path.dirname(path))
|
||||
outdir = config.pop('outdir', outdir)
|
||||
sim = Simulation(dir_path=dir_path,
|
||||
outdir=outdir,
|
||||
**config)
|
||||
logger.info('Dumping results to {} : {}'.format(sim.outdir, sim.dump))
|
||||
sim.run_simulation(**kwargs)
|
||||
|
@@ -1,5 +1,6 @@
|
||||
import logging
|
||||
import time
|
||||
import os
|
||||
|
||||
from contextlib import contextmanager
|
||||
|
||||
@@ -20,3 +21,17 @@ def timer(name='task', pre="", function=logger.info, to_object=None):
|
||||
if to_object:
|
||||
to_object.start = start
|
||||
to_object.end = end
|
||||
|
||||
|
||||
def safe_open(path, *args, **kwargs):
|
||||
outdir = os.path.dirname(path)
|
||||
if outdir and not os.path.exists(outdir):
|
||||
os.makedirs(outdir)
|
||||
return open(path, *args, **kwargs)
|
||||
|
||||
|
||||
def open_or_reuse(f, *args, **kwargs):
|
||||
try:
|
||||
return safe_open(f, *args, **kwargs)
|
||||
except TypeError:
|
||||
return f
|
||||
|
Reference in New Issue
Block a user