1
0
mirror of https://github.com/gsi-upm/soil synced 2024-11-22 11:12:29 +00:00
soil/tests/test_main.py

365 lines
12 KiB
Python
Raw Permalink Normal View History

from unittest import TestCase
import os
2019-04-30 14:16:46 +00:00
import io
import yaml
2022-05-10 14:29:06 +00:00
import copy
import pickle
import networkx as nx
from functools import partial
from os.path import join
2022-09-13 16:16:31 +00:00
from soil import (simulation, Environment, agents, network, serialization,
2022-09-16 16:13:39 +00:00
utils, config)
from soil.time import Delta
ROOT = os.path.abspath(os.path.dirname(__file__))
EXAMPLES = join(ROOT, '..', 'examples')
2022-09-13 16:16:31 +00:00
class CustomAgent(agents.FSM, agents.NetworkAgent):
@agents.default_state
@agents.state
def normal(self):
self.neighbors = self.count_agents(state_id='normal',
limit_neighbors=True)
@agents.state
def unreachable(self):
return
class TestMain(TestCase):
def test_load_graph(self):
"""
Load a graph from file if the extension is known.
Raise an exception otherwise.
"""
config = {
'network_params': {
'path': join(ROOT, 'test.gexf')
}
}
2022-09-13 16:16:31 +00:00
G = network.from_config(config['network_params'])
assert G
assert len(G) == 2
with self.assertRaises(AttributeError):
config = {
'network_params': {
'path': join(ROOT, 'unknown.extension')
}
}
2022-09-13 16:16:31 +00:00
G = network.from_config(config['network_params'])
print(G)
def test_generate_barabasi(self):
"""
If no path is given, a generator and network parameters
should be used to generate a network
"""
2022-09-13 16:16:31 +00:00
cfg = {
'params': {
'generator': 'barabasi_albert_graph'
}
}
2022-09-13 16:16:31 +00:00
with self.assertRaises(Exception):
G = network.from_config(cfg)
cfg['params']['n'] = 100
cfg['params']['m'] = 10
G = network.from_config(cfg)
assert len(G) == 100
def test_empty_simulation(self):
"""A simulation with a base behaviour should do nothing"""
config = {
'network_params': {
'path': join(ROOT, 'test.gexf')
},
'agent_type': 'BaseAgent',
'environment_params': {
}
}
2022-05-10 14:29:06 +00:00
s = simulation.from_old_config(config)
s.run_simulation(dry_run=True)
2022-05-10 14:29:06 +00:00
def test_network_agent(self):
"""
The initial states should be applied to the agent and the
agent should be able to update its state."""
config = {
'name': 'CounterAgent',
'network_params': {
2022-05-10 14:29:06 +00:00
'generator': nx.complete_graph,
'n': 2,
},
'agent_type': 'CounterModel',
2022-05-10 14:29:06 +00:00
'states': {
0: {'times': 10},
1: {'times': 20},
},
'max_time': 2,
'num_trials': 1,
'environment_params': {
}
}
2022-05-10 14:29:06 +00:00
s = simulation.from_old_config(config)
2022-09-13 16:16:31 +00:00
2022-05-10 14:29:06 +00:00
def test_counter_agent(self):
"""
2022-05-10 14:29:06 +00:00
The initial states should be applied to the agent and the
agent should be able to update its state."""
config = {
2022-09-13 16:16:31 +00:00
'version': '2',
'general': {
'name': 'CounterAgent',
'max_time': 2,
'dry_run': True,
'num_trials': 1,
},
2022-09-13 16:16:31 +00:00
'topologies': {
'default': {
'path': join(ROOT, 'test.gexf')
}
},
'agents': {
'default': {
'agent_class': 'CounterModel',
},
'counters': {
'topology': 'default',
'fixed': [{'state': {'times': 10}}, {'state': {'times': 20}}],
}
}
}
2022-09-13 16:16:31 +00:00
s = simulation.from_config(config)
env = s.get_env()
assert isinstance(env.agents[0], agents.CounterModel)
assert env.agents[0].topology == env.topologies['default']
assert env.agents[0]['times'] == 10
assert env.agents[0]['times'] == 10
env.step()
assert env.agents[0]['times'] == 11
assert env.agents[1]['times'] == 21
def test_custom_agent(self):
"""Allow for search of neighbors with a certain state_id"""
config = {
'network_params': {
'path': join(ROOT, 'test.gexf')
},
'network_agents': [{
'agent_type': CustomAgent,
'weight': 1
}],
'max_time': 10,
'environment_params': {
}
}
2022-05-10 14:29:06 +00:00
s = simulation.from_old_config(config)
env = s.run_simulation(dry_run=True)[0]
2022-09-13 16:16:31 +00:00
assert env.agents[1].count_agents(state_id='normal') == 2
assert env.agents[1].count_agents(state_id='normal', limit_neighbors=True) == 1
assert env.agents[0].neighbors == 1
def test_torvalds_example(self):
"""A complete example from a documentation should work."""
2019-04-26 17:22:45 +00:00
config = serialization.load_file(join(EXAMPLES, 'torvalds.yml'))[0]
config['network_params']['path'] = join(EXAMPLES,
config['network_params']['path'])
2022-05-10 14:29:06 +00:00
s = simulation.from_old_config(config)
2019-04-29 16:47:15 +00:00
env = s.run_simulation(dry_run=True)[0]
for a in env.network_agents:
skill_level = a.state['skill_level']
if a.id == 'Torvalds':
assert skill_level == 'God'
assert a.state['total'] == 3
assert a.state['neighbors'] == 2
elif a.id == 'balkian':
assert skill_level == 'developer'
assert a.state['total'] == 3
assert a.state['neighbors'] == 1
else:
assert skill_level == 'beginner'
assert a.state['total'] == 3
assert a.state['neighbors'] == 1
def test_yaml(self):
"""
2022-05-10 14:29:06 +00:00
The YAML version of a newly created configuration should be equivalent
to the configuration file used.
Values not present in the original config file should have reasonable
defaults.
"""
with utils.timer('loading'):
2019-04-26 17:22:45 +00:00
config = serialization.load_file(join(EXAMPLES, 'complete.yml'))[0]
2022-05-10 14:29:06 +00:00
s = simulation.from_old_config(config)
with utils.timer('serializing'):
2022-09-13 16:16:31 +00:00
serial = s.to_yaml()
with utils.timer('recovering'):
recovered = yaml.load(serial, Loader=yaml.SafeLoader)
2022-05-10 14:29:06 +00:00
for (k, v) in config.items():
assert recovered[k] == v
def test_configuration_changes(self):
"""
The configuration should not change after running
the simulation.
"""
2019-04-26 17:22:45 +00:00
config = serialization.load_file(join(EXAMPLES, 'complete.yml'))[0]
2022-05-10 14:29:06 +00:00
s = simulation.from_old_config(config)
init_config = copy.copy(s.config)
2021-10-14 15:37:06 +00:00
s.run_simulation(dry_run=True)
2022-05-10 14:29:06 +00:00
nconfig = s.config
# del nconfig['to
assert init_config == nconfig
2017-10-17 17:48:56 +00:00
def test_save_geometric(self):
"""
There is a bug in networkx that prevents it from creating a GEXF file
from geometric models. We should work around it.
"""
G = nx.random_geometric_graph(20, 0.1)
2019-04-29 16:47:15 +00:00
env = Environment(topology=G)
2019-04-30 14:16:46 +00:00
f = io.BytesIO()
env.dump_gexf(f)
2018-12-04 08:54:29 +00:00
def test_serialize_class(self):
2022-05-10 14:29:06 +00:00
ser, name = serialization.serialize(agents.BaseAgent, known_modules=[])
2018-12-04 08:54:29 +00:00
assert name == 'soil.agents.BaseAgent'
assert ser == agents.BaseAgent
2022-05-10 14:29:06 +00:00
ser, name = serialization.serialize(agents.BaseAgent, known_modules=['soil', ])
assert name == 'BaseAgent'
assert ser == agents.BaseAgent
2019-04-26 17:22:45 +00:00
ser, name = serialization.serialize(CustomAgent)
2018-12-04 08:54:29 +00:00
assert name == 'test_main.CustomAgent'
assert ser == CustomAgent
pickle.dumps(ser)
2018-12-04 08:54:29 +00:00
def test_serialize_builtin_types(self):
2018-12-04 08:54:29 +00:00
for i in [1, None, True, False, {}, [], list(), dict()]:
2019-04-26 17:22:45 +00:00
ser, name = serialization.serialize(i)
2018-12-04 08:54:29 +00:00
assert type(ser) == str
2019-04-26 17:22:45 +00:00
des = serialization.deserialize(name, ser)
2018-12-04 08:54:29 +00:00
assert i == des
def test_serialize_agent_type(self):
'''A class from soil.agents should be serialized without the module part'''
ser = agents.serialize_type(CustomAgent)
assert ser == 'test_main.CustomAgent'
ser = agents.serialize_type(agents.BaseAgent)
assert ser == 'BaseAgent'
pickle.dumps(ser)
2018-12-04 08:54:29 +00:00
def test_deserialize_agent_distribution(self):
agent_distro = [
{
'agent_type': 'CounterModel',
'weight': 1
},
{
'agent_type': 'test_main.CustomAgent',
2018-12-04 08:54:29 +00:00
'weight': 2
},
]
2021-10-14 15:37:06 +00:00
converted = agents.deserialize_definition(agent_distro)
2018-12-04 08:54:29 +00:00
assert converted[0]['agent_type'] == agents.CounterModel
assert converted[1]['agent_type'] == CustomAgent
pickle.dumps(converted)
2018-12-04 08:54:29 +00:00
def test_serialize_agent_distribution(self):
agent_distro = [
{
'agent_type': agents.CounterModel,
'weight': 1
},
{
'agent_type': CustomAgent,
2018-12-04 08:54:29 +00:00
'weight': 2
},
]
2021-10-14 15:37:06 +00:00
converted = agents.serialize_definition(agent_distro)
2018-12-04 08:54:29 +00:00
assert converted[0]['agent_type'] == 'CounterModel'
assert converted[1]['agent_type'] == 'test_main.CustomAgent'
pickle.dumps(converted)
2019-02-19 20:17:19 +00:00
def test_subgraph(self):
'''An agent should be able to subgraph the global topology'''
G = nx.Graph()
G.add_node(3)
G.add_edge(1, 2)
distro = agents.calculate_distribution(agent_type=agents.NetworkAgent)
2022-09-16 16:13:39 +00:00
distro[0]['topology'] = 'default'
aconfig = config.AgentConfig(distribution=distro, topology='default')
env = Environment(name='Test', topologies={'default': G}, agents={'network': aconfig})
2019-02-19 20:17:19 +00:00
lst = list(env.network_agents)
2022-09-16 16:13:39 +00:00
a2 = env.find_one(node_id=2)
a3 = env.find_one(node_id=3)
2019-02-19 20:17:19 +00:00
assert len(a2.subgraph(limit_neighbors=True)) == 2
assert len(a3.subgraph(limit_neighbors=True)) == 1
assert len(a3.subgraph(limit_neighbors=True, center=False)) == 0
assert len(a3.subgraph(agent_type=agents.NetworkAgent)) == 3
2019-04-26 17:22:45 +00:00
def test_templates(self):
'''Loading a template should result in several configs'''
configs = serialization.load_file(join(EXAMPLES, 'template.yml'))
assert len(configs) > 0
def test_until(self):
config = {
2021-10-14 15:37:06 +00:00
'name': 'until_sim',
'network_params': {},
'agent_type': 'CounterModel',
'max_time': 2,
'num_trials': 50,
'environment_params': {}
}
2022-05-10 14:29:06 +00:00
s = simulation.from_old_config(config)
runs = list(s.run_simulation(dry_run=True))
over = list(x.now for x in runs if x.now>2)
assert len(runs) == config['num_trials']
assert len(over) == 0
def test_fsm(self):
'''Basic state change'''
class ToggleAgent(agents.FSM):
@agents.default_state
@agents.state
def ping(self):
return self.pong
@agents.state
def pong(self):
return self.ping
a = ToggleAgent(unique_id=1, model=Environment())
assert a.state_id == a.ping.id
a.step()
assert a.state_id == a.pong.id
a.step()
assert a.state_id == a.ping.id
def test_fsm_when(self):
'''Basic state change'''
class ToggleAgent(agents.FSM):
@agents.default_state
@agents.state
def ping(self):
return self.pong, 2
@agents.state
def pong(self):
return self.ping
a = ToggleAgent(unique_id=1, model=Environment())
when = a.step()
assert when == 2
when = a.step()
assert when == Delta(a.interval)