Compare commits

...

2 Commits

Author SHA1 Message Date
J. Fernando Sánchez 880a9f2a1c black formatting 2 years ago
J. Fernando Sánchez 227fdf050e Fix conditionals 2 years ago

@ -2,16 +2,17 @@ from networkx import Graph
import random import random
import networkx as nx import networkx as nx
def mygenerator(n=5, n_edges=5): def mygenerator(n=5, n_edges=5):
''' """
Just a simple generator that creates a network with n nodes and Just a simple generator that creates a network with n nodes and
n_edges edges. Edges are assigned randomly, only avoiding self loops. n_edges edges. Edges are assigned randomly, only avoiding self loops.
''' """
G = nx.Graph() G = nx.Graph()
for i in range(n): for i in range(n):
G.add_node(i) G.add_node(i)
for i in range(n_edges): for i in range(n_edges):
nodes = list(G.nodes) nodes = list(G.nodes)
n_in = random.choice(nodes) n_in = random.choice(nodes)
@ -19,9 +20,3 @@ def mygenerator(n=5, n_edges=5):
n_out = random.choice(nodes) n_out = random.choice(nodes)
G.add_edge(n_in, n_out) G.add_edge(n_in, n_out)
return G return G

@ -2,34 +2,37 @@ from soil.agents import FSM, state, default_state
class Fibonacci(FSM): class Fibonacci(FSM):
'''Agent that only executes in t_steps that are Fibonacci numbers''' """Agent that only executes in t_steps that are Fibonacci numbers"""
defaults = { defaults = {"prev": 1}
'prev': 1
}
@default_state @default_state
@state @state
def counting(self): def counting(self):
self.log('Stopping at {}'.format(self.now)) self.log("Stopping at {}".format(self.now))
prev, self['prev'] = self['prev'], max([self.now, self['prev']]) prev, self["prev"] = self["prev"], max([self.now, self["prev"]])
return None, self.env.timeout(prev) return None, self.env.timeout(prev)
class Odds(FSM): class Odds(FSM):
'''Agent that only executes in odd t_steps''' """Agent that only executes in odd t_steps"""
@default_state @default_state
@state @state
def odds(self): def odds(self):
self.log('Stopping at {}'.format(self.now)) self.log("Stopping at {}".format(self.now))
return None, self.env.timeout(1+self.now%2) return None, self.env.timeout(1 + self.now % 2)
if __name__ == '__main__': if __name__ == "__main__":
from soil import Simulation from soil import Simulation
s = Simulation(network_agents=[{'ids': [0], 'agent_class': Fibonacci},
{'ids': [1], 'agent_class': Odds}], s = Simulation(
network_params={"generator": "complete_graph", "n": 2}, network_agents=[
max_time=100, {"ids": [0], "agent_class": Fibonacci},
) {"ids": [1], "agent_class": Odds},
],
network_params={"generator": "complete_graph", "n": 2},
max_time=100,
)
s.run(dry_run=True) s.run(dry_run=True)

@ -14,16 +14,18 @@ def network_portrayal(env):
# The model ensures there is 0 or 1 agent per node # The model ensures there is 0 or 1 agent per node
portrayal = dict() portrayal = dict()
wealths = {node_id: data['agent'].wealth for (node_id, data) in env.G.nodes(data=True)} wealths = {
node_id: data["agent"].wealth for (node_id, data) in env.G.nodes(data=True)
}
portrayal["nodes"] = [ portrayal["nodes"] = [
{ {
"id": node_id, "id": node_id,
"size": 2*(wealth+1), "size": 2 * (wealth + 1),
"color": "#CC0000" if wealth == 0 else "#007959", "color": "#CC0000" if wealth == 0 else "#007959",
# "color": "#CC0000", # "color": "#CC0000",
"label": f"{node_id}: {wealth}", "label": f"{node_id}: {wealth}",
} for (node_id, wealth) in wealths.items() }
for (node_id, wealth) in wealths.items()
] ]
portrayal["edges"] = [ portrayal["edges"] = [
@ -41,7 +43,7 @@ def gridPortrayal(agent):
:param agent: the agent in the simulation :param agent: the agent in the simulation
:return: the portrayal dictionary :return: the portrayal dictionary
""" """
color = max(10, min(agent.wealth*10, 100)) color = max(10, min(agent.wealth * 10, 100))
return { return {
"Shape": "rect", "Shape": "rect",
"w": 1, "w": 1,
@ -52,7 +54,7 @@ def gridPortrayal(agent):
"Text": agent.unique_id, "Text": agent.unique_id,
"x": agent.pos[0], "x": agent.pos[0],
"y": agent.pos[1], "y": agent.pos[1],
"Color": f"rgba(31, 10, 255, 0.{color})" "Color": f"rgba(31, 10, 255, 0.{color})",
} }
@ -79,7 +81,7 @@ model_params = {
10, 10,
1, 1,
description="Grid height", description="Grid height",
), ),
"width": UserSettableParameter( "width": UserSettableParameter(
"slider", "slider",
"width", "width",
@ -88,16 +90,20 @@ model_params = {
10, 10,
1, 1,
description="Grid width", description="Grid width",
), ),
"agent_class": UserSettableParameter('choice', 'Agent class', value='MoneyAgent', "agent_class": UserSettableParameter(
choices=['MoneyAgent', 'SocialMoneyAgent']), "choice",
"Agent class",
value="MoneyAgent",
choices=["MoneyAgent", "SocialMoneyAgent"],
),
"generator": graph_generator, "generator": graph_generator,
} }
canvas_element = CanvasGrid(gridPortrayal, canvas_element = CanvasGrid(
model_params["width"].value, gridPortrayal, model_params["width"].value, model_params["height"].value, 500, 500
model_params["height"].value, 500, 500) )
server = ModularServer( server = ModularServer(

@ -1,9 +1,10 @@
''' """
This is an example that adds soil agents and environment in a normal This is an example that adds soil agents and environment in a normal
mesa workflow. mesa workflow.
''' """
from mesa import Agent as MesaAgent from mesa import Agent as MesaAgent
from mesa.space import MultiGrid from mesa.space import MultiGrid
# from mesa.time import RandomActivation # from mesa.time import RandomActivation
from mesa.datacollection import DataCollector from mesa.datacollection import DataCollector
from mesa.batchrunner import BatchRunner from mesa.batchrunner import BatchRunner
@ -12,12 +13,13 @@ import networkx as nx
from soil import NetworkAgent, Environment, serialization from soil import NetworkAgent, Environment, serialization
def compute_gini(model): def compute_gini(model):
agent_wealths = [agent.wealth for agent in model.agents] agent_wealths = [agent.wealth for agent in model.agents]
x = sorted(agent_wealths) x = sorted(agent_wealths)
N = len(list(model.agents)) N = len(list(model.agents))
B = sum( xi * (N-i) for i,xi in enumerate(x) ) / (N*sum(x)) B = sum(xi * (N - i) for i, xi in enumerate(x)) / (N * sum(x))
return (1 + (1/N) - 2*B) return 1 + (1 / N) - 2 * B
class MoneyAgent(MesaAgent): class MoneyAgent(MesaAgent):
@ -32,9 +34,8 @@ class MoneyAgent(MesaAgent):
def move(self): def move(self):
possible_steps = self.model.grid.get_neighborhood( possible_steps = self.model.grid.get_neighborhood(
self.pos, self.pos, moore=True, include_center=False
moore=True, )
include_center=False)
new_position = self.random.choice(possible_steps) new_position = self.random.choice(possible_steps)
self.model.grid.move_agent(self, new_position) self.model.grid.move_agent(self, new_position)
@ -69,6 +70,7 @@ class SocialMoneyAgent(NetworkAgent, MoneyAgent):
other.wealth += 1 other.wealth += 1
self.wealth -= 1 self.wealth -= 1
def graph_generator(n=5): def graph_generator(n=5):
G = nx.Graph() G = nx.Graph()
for ix in range(n): for ix in range(n):
@ -78,16 +80,22 @@ def graph_generator(n=5):
class MoneyEnv(Environment): class MoneyEnv(Environment):
"""A model with some number of agents.""" """A model with some number of agents."""
def __init__(self, width, height, N, generator=graph_generator,
agent_class=SocialMoneyAgent, def __init__(
topology=None, **kwargs): self,
width,
height,
N,
generator=graph_generator,
agent_class=SocialMoneyAgent,
topology=None,
**kwargs
):
generator = serialization.deserialize(generator) generator = serialization.deserialize(generator)
agent_class = serialization.deserialize(agent_class, globs=globals()) agent_class = serialization.deserialize(agent_class, globs=globals())
topology = generator(n=N) topology = generator(n=N)
super().__init__(topology=topology, super().__init__(topology=topology, N=N, **kwargs)
N=N,
**kwargs)
self.grid = MultiGrid(width, height, False) self.grid = MultiGrid(width, height, False)
self.populate_network(agent_class=agent_class) self.populate_network(agent_class=agent_class)
@ -99,26 +107,29 @@ class MoneyEnv(Environment):
self.grid.place_agent(agent, (x, y)) self.grid.place_agent(agent, (x, y))
self.datacollector = DataCollector( self.datacollector = DataCollector(
model_reporters={"Gini": compute_gini}, model_reporters={"Gini": compute_gini}, agent_reporters={"Wealth": "wealth"}
agent_reporters={"Wealth": "wealth"}) )
if __name__ == '__main__': if __name__ == "__main__":
fixed_params = {"generator": nx.complete_graph, fixed_params = {
"width": 10, "generator": nx.complete_graph,
"network_agents": [{"agent_class": SocialMoneyAgent, "width": 10,
'weight': 1}], "network_agents": [{"agent_class": SocialMoneyAgent, "weight": 1}],
"height": 10} "height": 10,
}
variable_params = {"N": range(10, 100, 10)} variable_params = {"N": range(10, 100, 10)}
batch_run = BatchRunner(MoneyEnv, batch_run = BatchRunner(
variable_parameters=variable_params, MoneyEnv,
fixed_parameters=fixed_params, variable_parameters=variable_params,
iterations=5, fixed_parameters=fixed_params,
max_steps=100, iterations=5,
model_reporters={"Gini": compute_gini}) max_steps=100,
model_reporters={"Gini": compute_gini},
)
batch_run.run_all() batch_run.run_all()
run_data = batch_run.get_model_vars_dataframe() run_data = batch_run.get_model_vars_dataframe()

@ -4,24 +4,26 @@ from mesa.time import RandomActivation
from mesa.datacollection import DataCollector from mesa.datacollection import DataCollector
from mesa.batchrunner import BatchRunner from mesa.batchrunner import BatchRunner
def compute_gini(model): def compute_gini(model):
agent_wealths = [agent.wealth for agent in model.schedule.agents] agent_wealths = [agent.wealth for agent in model.schedule.agents]
x = sorted(agent_wealths) x = sorted(agent_wealths)
N = model.num_agents N = model.num_agents
B = sum( xi * (N-i) for i,xi in enumerate(x) ) / (N*sum(x)) B = sum(xi * (N - i) for i, xi in enumerate(x)) / (N * sum(x))
return (1 + (1/N) - 2*B) return 1 + (1 / N) - 2 * B
class MoneyAgent(Agent): class MoneyAgent(Agent):
""" An agent with fixed initial wealth.""" """An agent with fixed initial wealth."""
def __init__(self, unique_id, model): def __init__(self, unique_id, model):
super().__init__(unique_id, model) super().__init__(unique_id, model)
self.wealth = 1 self.wealth = 1
def move(self): def move(self):
possible_steps = self.model.grid.get_neighborhood( possible_steps = self.model.grid.get_neighborhood(
self.pos, self.pos, moore=True, include_center=False
moore=True, )
include_center=False)
new_position = self.random.choice(possible_steps) new_position = self.random.choice(possible_steps)
self.model.grid.move_agent(self, new_position) self.model.grid.move_agent(self, new_position)
@ -37,8 +39,10 @@ class MoneyAgent(Agent):
if self.wealth > 0: if self.wealth > 0:
self.give_money() self.give_money()
class MoneyModel(Model): class MoneyModel(Model):
"""A model with some number of agents.""" """A model with some number of agents."""
def __init__(self, N, width, height): def __init__(self, N, width, height):
self.num_agents = N self.num_agents = N
self.grid = MultiGrid(width, height, True) self.grid = MultiGrid(width, height, True)
@ -55,29 +59,29 @@ class MoneyModel(Model):
self.grid.place_agent(a, (x, y)) self.grid.place_agent(a, (x, y))
self.datacollector = DataCollector( self.datacollector = DataCollector(
model_reporters={"Gini": compute_gini}, model_reporters={"Gini": compute_gini}, agent_reporters={"Wealth": "wealth"}
agent_reporters={"Wealth": "wealth"}) )
def step(self): def step(self):
self.datacollector.collect(self) self.datacollector.collect(self)
self.schedule.step() self.schedule.step()
if __name__ == '__main__': if __name__ == "__main__":
fixed_params = {"width": 10, fixed_params = {"width": 10, "height": 10}
"height": 10}
variable_params = {"N": range(10, 500, 10)} variable_params = {"N": range(10, 500, 10)}
batch_run = BatchRunner(MoneyModel, batch_run = BatchRunner(
variable_params, MoneyModel,
fixed_params, variable_params,
iterations=5, fixed_params,
max_steps=100, iterations=5,
model_reporters={"Gini": compute_gini}) max_steps=100,
model_reporters={"Gini": compute_gini},
)
batch_run.run_all() batch_run.run_all()
run_data = batch_run.get_model_vars_dataframe() run_data = batch_run.get_model_vars_dataframe()
run_data.head() run_data.head()
print(run_data.Gini) print(run_data.Gini)

@ -3,84 +3,83 @@ import logging
class DumbViewer(FSM, NetworkAgent): class DumbViewer(FSM, NetworkAgent):
''' """
A viewer that gets infected via TV (if it has one) and tries to infect A viewer that gets infected via TV (if it has one) and tries to infect
its neighbors once it's infected. its neighbors once it's infected.
''' """
defaults = { defaults = {
'prob_neighbor_spread': 0.5, "prob_neighbor_spread": 0.5,
'prob_tv_spread': 0.1, "prob_tv_spread": 0.1,
} }
@default_state @default_state
@state @state
def neutral(self): def neutral(self):
if self['has_tv']: if self["has_tv"]:
if self.prob(self.model['prob_tv_spread']): if self.prob(self.model["prob_tv_spread"]):
return self.infected return self.infected
@state @state
def infected(self): def infected(self):
for neighbor in self.get_neighboring_agents(state_id=self.neutral.id): for neighbor in self.get_neighboring_agents(state_id=self.neutral.id):
if self.prob(self.model['prob_neighbor_spread']): if self.prob(self.model["prob_neighbor_spread"]):
neighbor.infect() neighbor.infect()
def infect(self): def infect(self):
''' """
This is not a state. It is a function that other agents can use to try to This is not a state. It is a function that other agents can use to try to
infect this agent. DumbViewer always gets infected, but other agents like infect this agent. DumbViewer always gets infected, but other agents like
HerdViewer might not become infected right away HerdViewer might not become infected right away
''' """
self.set_state(self.infected) self.set_state(self.infected)
class HerdViewer(DumbViewer): class HerdViewer(DumbViewer):
''' """
A viewer whose probability of infection depends on the state of its neighbors. A viewer whose probability of infection depends on the state of its neighbors.
''' """
def infect(self): def infect(self):
'''Notice again that this is NOT a state. See DumbViewer.infect for reference''' """Notice again that this is NOT a state. See DumbViewer.infect for reference"""
infected = self.count_neighboring_agents(state_id=self.infected.id) infected = self.count_neighboring_agents(state_id=self.infected.id)
total = self.count_neighboring_agents() total = self.count_neighboring_agents()
prob_infect = self.model['prob_neighbor_spread'] * infected/total prob_infect = self.model["prob_neighbor_spread"] * infected / total
self.debug('prob_infect', prob_infect) self.debug("prob_infect", prob_infect)
if self.prob(prob_infect): if self.prob(prob_infect):
self.set_state(self.infected) self.set_state(self.infected)
class WiseViewer(HerdViewer): class WiseViewer(HerdViewer):
''' """
A viewer that can change its mind. A viewer that can change its mind.
''' """
defaults = { defaults = {
'prob_neighbor_spread': 0.5, "prob_neighbor_spread": 0.5,
'prob_neighbor_cure': 0.25, "prob_neighbor_cure": 0.25,
'prob_tv_spread': 0.1, "prob_tv_spread": 0.1,
} }
@state @state
def cured(self): def cured(self):
prob_cure = self.model['prob_neighbor_cure'] prob_cure = self.model["prob_neighbor_cure"]
for neighbor in self.get_neighboring_agents(state_id=self.infected.id): for neighbor in self.get_neighboring_agents(state_id=self.infected.id):
if self.prob(prob_cure): if self.prob(prob_cure):
try: try:
neighbor.cure() neighbor.cure()
except AttributeError: except AttributeError:
self.debug('Viewer {} cannot be cured'.format(neighbor.id)) self.debug("Viewer {} cannot be cured".format(neighbor.id))
def cure(self): def cure(self):
self.set_state(self.cured.id) self.set_state(self.cured.id)
@state @state
def infected(self): def infected(self):
cured = max(self.count_neighboring_agents(self.cured.id), cured = max(self.count_neighboring_agents(self.cured.id), 1.0)
1.0) infected = max(self.count_neighboring_agents(self.infected.id), 1.0)
infected = max(self.count_neighboring_agents(self.infected.id), prob_cure = self.model["prob_neighbor_cure"] * (cured / infected)
1.0)
prob_cure = self.model['prob_neighbor_cure'] * (cured/infected)
if self.prob(prob_cure): if self.prob(prob_cure):
return self.cured return self.cured
return self.set_state(super().infected) return self.set_state(super().infected)

@ -1,6 +1,6 @@
''' """
Example of a fully programmatic simulation, without definition files. Example of a fully programmatic simulation, without definition files.
''' """
from soil import Simulation, agents from soil import Simulation, agents
from networkx import Graph from networkx import Graph
import logging import logging
@ -14,21 +14,22 @@ def mygenerator():
class MyAgent(agents.FSM): class MyAgent(agents.FSM):
@agents.default_state @agents.default_state
@agents.state @agents.state
def neutral(self): def neutral(self):
self.debug('I am running') self.debug("I am running")
if agents.prob(0.2): if agents.prob(0.2):
self.info('This runs 2/10 times on average') self.info("This runs 2/10 times on average")
s = Simulation(name='Programmatic', s = Simulation(
network_params={'generator': mygenerator}, name="Programmatic",
num_trials=1, network_params={"generator": mygenerator},
max_time=100, num_trials=1,
agent_class=MyAgent, max_time=100,
dry_run=True) agent_class=MyAgent,
dry_run=True,
)
# By default, logging will only print WARNING logs (and above). # By default, logging will only print WARNING logs (and above).

@ -5,7 +5,8 @@ import logging
class CityPubs(Environment): class CityPubs(Environment):
'''Environment with Pubs''' """Environment with Pubs"""
level = logging.INFO level = logging.INFO
def __init__(self, *args, number_of_pubs=3, pub_capacity=10, **kwargs): def __init__(self, *args, number_of_pubs=3, pub_capacity=10, **kwargs):
@ -13,68 +14,70 @@ class CityPubs(Environment):
pubs = {} pubs = {}
for i in range(number_of_pubs): for i in range(number_of_pubs):
newpub = { newpub = {
'name': 'The awesome pub #{}'.format(i), "name": "The awesome pub #{}".format(i),
'open': True, "open": True,
'capacity': pub_capacity, "capacity": pub_capacity,
'occupancy': 0, "occupancy": 0,
} }
pubs[newpub['name']] = newpub pubs[newpub["name"]] = newpub
self['pubs'] = pubs self["pubs"] = pubs
def enter(self, pub_id, *nodes): def enter(self, pub_id, *nodes):
'''Agents will try to enter. The pub checks if it is possible''' """Agents will try to enter. The pub checks if it is possible"""
try: try:
pub = self['pubs'][pub_id] pub = self["pubs"][pub_id]
except KeyError: except KeyError:
raise ValueError('Pub {} is not available'.format(pub_id)) raise ValueError("Pub {} is not available".format(pub_id))
if not pub['open'] or (pub['capacity'] < (len(nodes) + pub['occupancy'])): if not pub["open"] or (pub["capacity"] < (len(nodes) + pub["occupancy"])):
return False return False
pub['occupancy'] += len(nodes) pub["occupancy"] += len(nodes)
for node in nodes: for node in nodes:
node['pub'] = pub_id node["pub"] = pub_id
return True return True
def available_pubs(self): def available_pubs(self):
for pub in self['pubs'].values(): for pub in self["pubs"].values():
if pub['open'] and (pub['occupancy'] < pub['capacity']): if pub["open"] and (pub["occupancy"] < pub["capacity"]):
yield pub['name'] yield pub["name"]
def exit(self, pub_id, *node_ids): def exit(self, pub_id, *node_ids):
'''Agents will notify the pub they want to leave''' """Agents will notify the pub they want to leave"""
try: try:
pub = self['pubs'][pub_id] pub = self["pubs"][pub_id]
except KeyError: except KeyError:
raise ValueError('Pub {} is not available'.format(pub_id)) raise ValueError("Pub {} is not available".format(pub_id))
for node_id in node_ids: for node_id in node_ids:
node = self.get_agent(node_id) node = self.get_agent(node_id)
if pub_id == node['pub']: if pub_id == node["pub"]:
del node['pub'] del node["pub"]
pub['occupancy'] -= 1 pub["occupancy"] -= 1
class Patron(FSM, NetworkAgent): class Patron(FSM, NetworkAgent):
'''Agent that looks for friends to drink with. It will do three things: """Agent that looks for friends to drink with. It will do three things:
1) Look for other patrons to drink with 1) Look for other patrons to drink with
2) Look for a bar where the agent and other agents in the same group can get in. 2) Look for a bar where the agent and other agents in the same group can get in.
3) While in the bar, patrons only drink, until they get drunk and taken home. 3) While in the bar, patrons only drink, until they get drunk and taken home.
''' """
level = logging.DEBUG level = logging.DEBUG
pub = None pub = None
drunk = False drunk = False
pints = 0 pints = 0
max_pints = 3 max_pints = 3
kicked_out = False
@default_state @default_state
@state @state
def looking_for_friends(self): def looking_for_friends(self):
'''Look for friends to drink with''' """Look for friends to drink with"""
self.info('I am looking for friends') self.info("I am looking for friends")
available_friends = list(self.get_agents(drunk=False, available_friends = list(
pub=None, self.get_agents(drunk=False, pub=None, state_id=self.looking_for_friends.id)
state_id=self.looking_for_friends.id)) )
if not available_friends: if not available_friends:
self.info('Life sucks and I\'m alone!') self.info("Life sucks and I'm alone!")
return self.at_home return self.at_home
befriended = self.try_friends(available_friends) befriended = self.try_friends(available_friends)
if befriended: if befriended:
@ -82,91 +85,91 @@ class Patron(FSM, NetworkAgent):
@state @state
def looking_for_pub(self): def looking_for_pub(self):
'''Look for a pub that accepts me and my friends''' """Look for a pub that accepts me and my friends"""
if self['pub'] != None: if self["pub"] != None:
return self.sober_in_pub return self.sober_in_pub
self.debug('I am looking for a pub') self.debug("I am looking for a pub")
group = list(self.get_neighboring_agents()) group = list(self.get_neighboring_agents())
for pub in self.model.available_pubs(): for pub in self.model.available_pubs():
self.debug('We\'re trying to get into {}: total: {}'.format(pub, len(group))) self.debug("We're trying to get into {}: total: {}".format(pub, len(group)))
if self.model.enter(pub, self, *group): if self.model.enter(pub, self, *group):
self.info('We\'re all {} getting in {}!'.format(len(group), pub)) self.info("We're all {} getting in {}!".format(len(group), pub))
return self.sober_in_pub return self.sober_in_pub
@state @state
def sober_in_pub(self): def sober_in_pub(self):
'''Drink up.''' """Drink up."""
self.drink() self.drink()
if self['pints'] > self['max_pints']: if self["pints"] > self["max_pints"]:
return self.drunk_in_pub return self.drunk_in_pub
@state @state
def drunk_in_pub(self): def drunk_in_pub(self):
'''I'm out. Take me home!''' """I'm out. Take me home!"""
self.info('I\'m so drunk. Take me home!') self.info("I'm so drunk. Take me home!")
self['drunk'] = True self["drunk"] = True
pass # out drunk if self.kicked_out:
return self.at_home
pass # out drun
@state @state
def at_home(self): def at_home(self):
'''The end''' """The end"""
others = self.get_agents(state_id=Patron.at_home.id, limit_neighbors=True) others = self.get_agents(state_id=Patron.at_home.id, limit_neighbors=True)
self.debug('I\'m home. Just like {} of my friends'.format(len(others))) self.debug("I'm home. Just like {} of my friends".format(len(others)))
def drink(self): def drink(self):
self['pints'] += 1 self["pints"] += 1
self.debug('Cheers to that') self.debug("Cheers to that")
def kick_out(self): def kick_out(self):
self.set_state(self.at_home) self.kicked_out = True
def befriend(self, other_agent, force=False): def befriend(self, other_agent, force=False):
''' """
Try to become friends with another agent. The chances of Try to become friends with another agent. The chances of
success depend on both agents' openness. success depend on both agents' openness.
''' """
if force or self['openness'] > self.random.random(): if force or self["openness"] > self.random.random():
self.add_edge(self, other_agent) self.add_edge(self, other_agent)
self.info('Made some friend {}'.format(other_agent)) self.info("Made some friend {}".format(other_agent))
return True return True
return False return False
def try_friends(self, others): def try_friends(self, others):
''' Look for random agents around me and try to befriend them''' """Look for random agents around me and try to befriend them"""
befriended = False befriended = False
k = int(10*self['openness']) k = int(10 * self["openness"])
self.random.shuffle(others) self.random.shuffle(others)
for friend in islice(others, k): # random.choice >= 3.7 for friend in islice(others, k): # random.choice >= 3.7
if friend == self: if friend == self:
continue continue
if friend.befriend(self): if friend.befriend(self):
self.befriend(friend, force=True) self.befriend(friend, force=True)
self.debug('Hooray! new friend: {}'.format(friend.id)) self.debug("Hooray! new friend: {}".format(friend.id))
befriended = True befriended = True
else: else:
self.debug('{} does not want to be friends'.format(friend.id)) self.debug("{} does not want to be friends".format(friend.id))
return befriended return befriended
class Police(FSM): class Police(FSM):
'''Simple agent to take drunk people out of pubs.''' """Simple agent to take drunk people out of pubs."""
level = logging.INFO level = logging.INFO
@default_state @default_state
@state @state
def patrol(self): def patrol(self):
drunksters = list(self.get_agents(drunk=True, drunksters = list(self.get_agents(drunk=True, state_id=Patron.drunk_in_pub.id))
state_id=Patron.drunk_in_pub.id))
for drunk in drunksters: for drunk in drunksters:
self.info('Kicking out the trash: {}'.format(drunk.id)) self.info("Kicking out the trash: {}".format(drunk.id))
drunk.kick_out() drunk.kick_out()
else: else:
self.info('No trash to take out. Too bad.') self.info("No trash to take out. Too bad.")
if __name__ == '__main__': if __name__ == "__main__":
from soil import simulation from soil import simulation
simulation.run_from_config('pubcrawl.yml',
dry_run=True, simulation.run_from_config("pubcrawl.yml", dry_run=True, dump=None, parallel=False)
dump=None,
parallel=False)

@ -2,3 +2,13 @@ There are two similar implementations of this simulation.
- `basic`. Using simple primites - `basic`. Using simple primites
- `improved`. Using more advanced features such as the `time` module to avoid unnecessary computations (i.e., skip steps), and generator functions. - `improved`. Using more advanced features such as the `time` module to avoid unnecessary computations (i.e., skip steps), and generator functions.
The examples can be run directly in the terminal, and they accept command like arguments.
For example, to enable the CSV exporter and the Summary exporter, while setting `max_time` to `100` and `seed` to `CustomSeed`:
```
python rabbit_agents.py --set max_time=100 --csv -e summary --set 'seed="CustomSeed"'
```
To learn more about how this functionality works, check out the `soil.easy` function.

@ -1,13 +1,10 @@
from soil import FSM, state, default_state, BaseAgent, NetworkAgent, Environment from soil import FSM, state, default_state, BaseAgent, NetworkAgent, Environment
from soil.time import Delta
from enum import Enum
from collections import Counter from collections import Counter
import logging import logging
import math import math
class RabbitEnv(Environment): class RabbitEnv(Environment):
@property @property
def num_rabbits(self): def num_rabbits(self):
return self.count_agents(agent_class=Rabbit) return self.count_agents(agent_class=Rabbit)
@ -21,7 +18,7 @@ class RabbitEnv(Environment):
return self.count_agents(agent_class=Female) return self.count_agents(agent_class=Female)
class Rabbit(FSM, NetworkAgent): class Rabbit(NetworkAgent, FSM):
sexual_maturity = 30 sexual_maturity = 30
life_expectancy = 300 life_expectancy = 300
@ -29,7 +26,7 @@ class Rabbit(FSM, NetworkAgent):
@default_state @default_state
@state @state
def newborn(self): def newborn(self):
self.info('I am a newborn.') self.info("I am a newborn.")
self.age = 0 self.age = 0
self.offspring = 0 self.offspring = 0
return self.youngling return self.youngling
@ -38,7 +35,7 @@ class Rabbit(FSM, NetworkAgent):
def youngling(self): def youngling(self):
self.age += 1 self.age += 1
if self.age >= self.sexual_maturity: if self.age >= self.sexual_maturity:
self.info(f'I am fertile! My age is {self.age}') self.info(f"I am fertile! My age is {self.age}")
return self.fertile return self.fertile
@state @state
@ -62,17 +59,18 @@ class Male(Rabbit):
return self.dead return self.dead
# Males try to mate # Males try to mate
for f in self.model.agents(agent_class=Female, for f in self.model.agents(
state_id=Female.fertile.id, agent_class=Female, state_id=Female.fertile.id, limit=self.max_females
limit=self.max_females): ):
self.debug('FOUND A FEMALE: ', repr(f), self.mating_prob) self.debug("FOUND A FEMALE: ", repr(f), self.mating_prob)
if self.prob(self['mating_prob']): if self.prob(self["mating_prob"]):
f.impregnate(self) f.impregnate(self)
break # Take a break break # Take a break
class Female(Rabbit): class Female(Rabbit):
gestation = 30 gestation = 10
pregnancy = -1
@state @state
def fertile(self): def fertile(self):
@ -80,69 +78,73 @@ class Female(Rabbit):
self.age += 1 self.age += 1
if self.age > self.life_expectancy: if self.age > self.life_expectancy:
return self.dead return self.dead
if self.pregnancy >= 0:
return self.pregnant
def impregnate(self, male): def impregnate(self, male):
self.info(f'{repr(male)} impregnating female {repr(self)}') self.info(f"impregnated by {repr(male)}")
self.mate = male self.mate = male
self.pregnancy = -1 self.pregnancy = 0
self.set_state(self.pregnant, when=self.now) self.number_of_babies = int(8 + 4 * self.random.random())
self.number_of_babies = int(8+4*self.random.random())
@state @state
def pregnant(self): def pregnant(self):
self.debug('I am pregnant') self.info("I am pregnant")
self.age += 1 self.age += 1
self.pregnancy += 1
if self.prob(self.age / self.life_expectancy): if self.age >= self.life_expectancy:
return self.die() return self.die()
if self.pregnancy >= self.gestation: if self.pregnancy < self.gestation:
self.info('Having {} babies'.format(self.number_of_babies)) self.pregnancy += 1
for i in range(self.number_of_babies): return
state = {}
agent_class = self.random.choice([Male, Female]) self.info("Having {} babies".format(self.number_of_babies))
child = self.model.add_node(agent_class=agent_class, for i in range(self.number_of_babies):
**state) state = {}
child.add_edge(self) agent_class = self.random.choice([Male, Female])
try: child = self.model.add_node(agent_class=agent_class, **state)
child.add_edge(self.mate) child.add_edge(self)
self.model.agents[self.mate].offspring += 1 try:
except ValueError: child.add_edge(self.mate)
self.debug('The father has passed away') self.model.agents[self.mate].offspring += 1
except ValueError:
self.offspring += 1 self.debug("The father has passed away")
self.mate = None
return self.fertile self.offspring += 1
self.mate = None
self.pregnancy = -1
return self.fertile
@state def die(self):
def dead(self): if "pregnancy" in self and self["pregnancy"] > -1:
super().dead() self.info("A mother has died carrying a baby!!")
if 'pregnancy' in self and self['pregnancy'] > -1: return super().die()
self.info('A mother has died carrying a baby!!')
class RandomAccident(BaseAgent): class RandomAccident(BaseAgent):
def step(self): def step(self):
rabbits_alive = self.model.G.number_of_nodes() rabbits_alive = self.model.G.number_of_nodes()
if not rabbits_alive: if not rabbits_alive:
return self.die() return self.die()
prob_death = self.model.get('prob_death', 1e-100)*math.floor(math.log10(max(1, rabbits_alive))) prob_death = self.model.get("prob_death", 1e-100) * math.floor(
self.debug('Killing some rabbits with prob={}!'.format(prob_death)) math.log10(max(1, rabbits_alive))
)
self.debug("Killing some rabbits with prob={}!".format(prob_death))
for i in self.iter_agents(agent_class=Rabbit): for i in self.iter_agents(agent_class=Rabbit):
if i.state_id == i.dead.id: if i.state_id == i.dead.id:
continue continue
if self.prob(prob_death): if self.prob(prob_death):
self.info('I killed a rabbit: {}'.format(i.id)) self.info("I killed a rabbit: {}".format(i.id))
rabbits_alive -= 1 rabbits_alive -= 1
i.set_state(i.dead) i.die()
self.debug('Rabbits alive: {}'.format(rabbits_alive)) self.debug("Rabbits alive: {}".format(rabbits_alive))
if __name__ == '__main__': if __name__ == "__main__":
from soil import easy from soil import easy
sim = easy('rabbits.yml')
sim.run() with easy("rabbits.yml") as sim:
sim.run()

@ -1,130 +1,157 @@
from soil.agents import FSM, state, default_state, BaseAgent, NetworkAgent from soil import FSM, state, default_state, BaseAgent, NetworkAgent, Environment
from soil.time import Delta, When, NEVER from soil.time import Delta
from enum import Enum from enum import Enum
from collections import Counter
import logging import logging
import math import math
class RabbitModel(FSM, NetworkAgent): class RabbitEnv(Environment):
@property
def num_rabbits(self):
return self.count_agents(agent_class=Rabbit)
mating_prob = 0.005 @property
offspring = 0 def num_males(self):
birth = None return self.count_agents(agent_class=Male)
sexual_maturity = 3 @property
life_expectancy = 30 def num_females(self):
return self.count_agents(agent_class=Female)
@default_state
@state
def newborn(self):
self.birth = self.now
self.info(f'I am a newborn.')
self.model['rabbits_alive'] = self.model.get('rabbits_alive', 0) + 1
# Here we can skip the `youngling` state by using a coroutine/generator. class Rabbit(FSM, NetworkAgent):
while self.age < self.sexual_maturity:
interval = self.sexual_maturity - self.age
yield Delta(interval)
self.info(f'I am fertile! My age is {self.age}') sexual_maturity = 30
return self.fertile life_expectancy = 300
birth = None
@property @property
def age(self): def age(self):
if self.birth is None:
return None
return self.now - self.birth return self.now - self.birth
@default_state
@state
def newborn(self):
self.info("I am a newborn.")
self.birth = self.now
self.offspring = 0
return self.youngling, Delta(self.sexual_maturity - self.age)
@state
def youngling(self):
if self.age >= self.sexual_maturity:
self.info(f"I am fertile! My age is {self.age}")
return self.fertile
@state @state
def fertile(self): def fertile(self):
raise Exception("Each subclass should define its fertile state") raise Exception("Each subclass should define its fertile state")
def step(self): @state
super().step() def dead(self):
if self.prob(self.age / self.life_expectancy): self.die()
return self.die()
class Male(RabbitModel):
class Male(Rabbit):
max_females = 5 max_females = 5
mating_prob = 0.001
@state @state
def fertile(self): def fertile(self):
if self.age > self.life_expectancy:
return self.dead
# Males try to mate # Males try to mate
for f in self.model.agents(agent_class=Female, for f in self.model.agents(
state_id=Female.fertile.id, agent_class=Female, state_id=Female.fertile.id, limit=self.max_females
limit=self.max_females): ):
self.debug('Found a female:', repr(f)) self.debug("FOUND A FEMALE: ", repr(f), self.mating_prob)
if self.prob(self['mating_prob']): if self.prob(self["mating_prob"]):
f.impregnate(self) f.impregnate(self)
break # Take a break, don't try to impregnate the rest break # Do not try to impregnate other females
class Female(Rabbit):
class Female(RabbitModel):
due_date = None
age_of_pregnancy = None
gestation = 10 gestation = 10
mate = None conception = None
@state @state
def fertile(self): def fertile(self):
return self.fertile, NEVER # Just wait for a Male
@state
def pregnant(self):
self.info('I am pregnant')
if self.age > self.life_expectancy: if self.age > self.life_expectancy:
return self.dead return self.dead
if self.conception is not None:
return self.pregnant
self.due_date = self.now + self.gestation @property
def pregnancy(self):
number_of_babies = int(8+4*self.random.random()) if self.conception is None:
return None
while self.now < self.due_date: return self.now - self.conception
yield When(self.due_date)
self.info('Having {} babies'.format(number_of_babies)) def impregnate(self, male):
for i in range(number_of_babies): self.info(f"impregnated by {repr(male)}")
agent_class = self.random.choice([Male, Female]) self.mate = male
child = self.model.add_node(agent_class=agent_class, self.conception = self.now
topology=self.topology) self.number_of_babies = int(8 + 4 * self.random.random())
self.model.add_edge(self, child)
self.model.add_edge(self.mate, child)
self.offspring += 1
self.model.agents[self.mate].offspring += 1
self.mate = None
self.due_date = None
return self.fertile
@state @state
def dead(self): def pregnant(self):
super().dead() self.debug("I am pregnant")
if self.due_date is not None:
self.info('A mother has died carrying a baby!!')
def impregnate(self, male): if self.age > self.life_expectancy:
self.info(f'{repr(male)} impregnating female {repr(self)}') self.info("Dying before giving birth")
self.mate = male return self.die()
self.set_state(self.pregnant, when=self.now)
if self.pregnancy >= self.gestation:
self.info("Having {} babies".format(self.number_of_babies))
for i in range(self.number_of_babies):
state = {}
agent_class = self.random.choice([Male, Female])
child = self.model.add_node(agent_class=agent_class, **state)
child.add_edge(self)
if self.mate:
child.add_edge(self.mate)
self.mate.offspring += 1
else:
self.debug("The father has passed away")
self.offspring += 1
self.mate = None
return self.fertile
def die(self):
if self.pregnancy is not None:
self.info("A mother has died carrying a baby!!")
return super().die()
class RandomAccident(BaseAgent): class RandomAccident(BaseAgent):
def step(self):
rabbits_alive = self.model.G.number_of_nodes()
level = logging.INFO if not rabbits_alive:
return self.die()
def step(self): prob_death = self.model.get("prob_death", 1e-100) * math.floor(
rabbits_total = self.model.topology.number_of_nodes() math.log10(max(1, rabbits_alive))
if 'rabbits_alive' not in self.model: )
self.model['rabbits_alive'] = 0 self.debug("Killing some rabbits with prob={}!".format(prob_death))
rabbits_alive = self.model.get('rabbits_alive', rabbits_total) for i in self.iter_agents(agent_class=Rabbit):
prob_death = self.model.get('prob_death', 1e-100)*math.floor(math.log10(max(1, rabbits_alive))) if i.state_id == i.dead.id:
self.debug('Killing some rabbits with prob={}!'.format(prob_death))
for i in self.model.network_agents:
if i.state.id == i.dead.id:
continue continue
if self.prob(prob_death): if self.prob(prob_death):
self.info('I killed a rabbit: {}'.format(i.id)) self.info("I killed a rabbit: {}".format(i.id))
rabbits_alive = self.model['rabbits_alive'] = rabbits_alive -1 rabbits_alive -= 1
i.set_state(i.dead) i.die()
self.debug('Rabbits alive: {}/{}'.format(rabbits_alive, rabbits_total)) self.debug("Rabbits alive: {}".format(rabbits_alive))
if self.model.count_agents(state_id=RabbitModel.dead.id) == self.model.topology.number_of_nodes():
self.die()
if __name__ == "__main__":
from soil import easy
with easy("rabbits.yml") as sim:
sim.run()

@ -7,11 +7,10 @@ description: null
group: null group: null
interval: 1.0 interval: 1.0
max_time: 100 max_time: 100
model_class: soil.environment.Environment model_class: rabbit_agents.RabbitEnv
model_params: model_params:
agents: agents:
topology: true topology: true
agent_class: rabbit_agents.RabbitModel
distribution: distribution:
- agent_class: rabbit_agents.Male - agent_class: rabbit_agents.Male
weight: 1 weight: 1
@ -34,5 +33,10 @@ model_params:
nodes: nodes:
- id: 1 - id: 1
- id: 0 - id: 0
model_reporters:
num_males: 'num_males'
num_females: 'num_females'
num_rabbits: |
py:lambda env: env.num_males + env.num_females
extra: extra:
visualization_params: {} visualization_params: {}

@ -1,42 +1,43 @@
''' """
Example of setting a Example of setting a
Example of a fully programmatic simulation, without definition files. Example of a fully programmatic simulation, without definition files.
''' """
from soil import Simulation, agents from soil import Simulation, agents
from soil.time import Delta from soil.time import Delta
class MyAgent(agents.FSM): class MyAgent(agents.FSM):
''' """
An agent that first does a ping An agent that first does a ping
''' """
defaults = {'pong_counts': 2} defaults = {"pong_counts": 2}
@agents.default_state @agents.default_state
@agents.state @agents.state
def ping(self): def ping(self):
self.info('Ping') self.info("Ping")
return self.pong, Delta(self.random.expovariate(1/16)) return self.pong, Delta(self.random.expovariate(1 / 16))
@agents.state @agents.state
def pong(self): def pong(self):
self.info('Pong') self.info("Pong")
self.pong_counts -= 1 self.pong_counts -= 1
self.info(str(self.pong_counts)) self.info(str(self.pong_counts))
if self.pong_counts < 1: if self.pong_counts < 1:
return self.die() return self.die()
return None, Delta(self.random.expovariate(1/16)) return None, Delta(self.random.expovariate(1 / 16))
s = Simulation(name='Programmatic', s = Simulation(
network_agents=[{'agent_class': MyAgent, 'id': 0}], name="Programmatic",
topology={'nodes': [{'id': 0}], 'links': []}, network_agents=[{"agent_class": MyAgent, "id": 0}],
num_trials=1, topology={"nodes": [{"id": 0}], "links": []},
max_time=100, num_trials=1,
agent_class=MyAgent, max_time=100,
dry_run=True) agent_class=MyAgent,
dry_run=True,
)
envs = s.run() envs = s.run()

@ -20,56 +20,83 @@ class TerroristSpreadModel(FSM, Geo):
def __init__(self, model=None, unique_id=0, state=()): def __init__(self, model=None, unique_id=0, state=()):
super().__init__(model=model, unique_id=unique_id, state=state) super().__init__(model=model, unique_id=unique_id, state=state)
self.information_spread_intensity = model.environment_params['information_spread_intensity'] self.information_spread_intensity = model.environment_params[
self.terrorist_additional_influence = model.environment_params['terrorist_additional_influence'] "information_spread_intensity"
self.prob_interaction = model.environment_params['prob_interaction'] ]
self.terrorist_additional_influence = model.environment_params[
if self['id'] == self.civilian.id: # Civilian "terrorist_additional_influence"
]
self.prob_interaction = model.environment_params["prob_interaction"]
if self["id"] == self.civilian.id: # Civilian
self.mean_belief = self.random.uniform(0.00, 0.5) self.mean_belief = self.random.uniform(0.00, 0.5)
elif self['id'] == self.terrorist.id: # Terrorist elif self["id"] == self.terrorist.id: # Terrorist
self.mean_belief = self.random.uniform(0.8, 1.00) self.mean_belief = self.random.uniform(0.8, 1.00)
elif self['id'] == self.leader.id: # Leader elif self["id"] == self.leader.id: # Leader
self.mean_belief = 1.00 self.mean_belief = 1.00
else: else:
raise Exception('Invalid state id: {}'.format(self['id'])) raise Exception("Invalid state id: {}".format(self["id"]))
if 'min_vulnerability' in model.environment_params:
self.vulnerability = self.random.uniform( model.environment_params['min_vulnerability'], model.environment_params['max_vulnerability'] )
else :
self.vulnerability = self.random.uniform( 0, model.environment_params['max_vulnerability'] )
if "min_vulnerability" in model.environment_params:
self.vulnerability = self.random.uniform(
model.environment_params["min_vulnerability"],
model.environment_params["max_vulnerability"],
)
else:
self.vulnerability = self.random.uniform(
0, model.environment_params["max_vulnerability"]
)
@state @state
def civilian(self): def civilian(self):
neighbours = list(self.get_neighboring_agents(agent_class=TerroristSpreadModel)) neighbours = list(self.get_neighboring_agents(agent_class=TerroristSpreadModel))
if len(neighbours) > 0: if len(neighbours) > 0:
# Only interact with some of the neighbors # Only interact with some of the neighbors
interactions = list(n for n in neighbours if self.random.random() <= self.prob_interaction) interactions = list(
influence = sum( self.degree(i) for i in interactions ) n for n in neighbours if self.random.random() <= self.prob_interaction
mean_belief = sum( i.mean_belief * self.degree(i) / influence for i in interactions ) )
mean_belief = mean_belief * self.information_spread_intensity + self.mean_belief * ( 1 - self.information_spread_intensity ) influence = sum(self.degree(i) for i in interactions)
self.mean_belief = mean_belief * self.vulnerability + self.mean_belief * ( 1 - self.vulnerability ) mean_belief = sum(
i.mean_belief * self.degree(i) / influence for i in interactions
)
mean_belief = (
mean_belief * self.information_spread_intensity
+ self.mean_belief * (1 - self.information_spread_intensity)
)
self.mean_belief = mean_belief * self.vulnerability + self.mean_belief * (
1 - self.vulnerability
)
if self.mean_belief >= 0.8: if self.mean_belief >= 0.8:
return self.terrorist return self.terrorist
@state @state
def leader(self): def leader(self):
self.mean_belief = self.mean_belief ** ( 1 - self.terrorist_additional_influence ) self.mean_belief = self.mean_belief ** (1 - self.terrorist_additional_influence)
for neighbour in self.get_neighboring_agents(state_id=[self.terrorist.id, self.leader.id]): for neighbour in self.get_neighboring_agents(
state_id=[self.terrorist.id, self.leader.id]
):
if self.betweenness(neighbour) > self.betweenness(self): if self.betweenness(neighbour) > self.betweenness(self):
return self.terrorist return self.terrorist
@state @state
def terrorist(self): def terrorist(self):
neighbours = self.get_agents(state_id=[self.terrorist.id, self.leader.id], neighbours = self.get_agents(
agent_class=TerroristSpreadModel, state_id=[self.terrorist.id, self.leader.id],
limit_neighbors=True) agent_class=TerroristSpreadModel,
limit_neighbors=True,
)
if len(neighbours) > 0: if len(neighbours) > 0:
influence = sum( self.degree(n) for n in neighbours ) influence = sum(self.degree(n) for n in neighbours)
mean_belief = sum( n.mean_belief * self.degree(n) / influence for n in neighbours ) mean_belief = sum(
mean_belief = mean_belief * self.vulnerability + self.mean_belief * ( 1 - self.vulnerability ) n.mean_belief * self.degree(n) / influence for n in neighbours
self.mean_belief = self.mean_belief ** ( 1 - self.terrorist_additional_influence ) )
mean_belief = mean_belief * self.vulnerability + self.mean_belief * (
1 - self.vulnerability
)
self.mean_belief = self.mean_belief ** (
1 - self.terrorist_additional_influence
)
# Check if there are any leaders in the group # Check if there are any leaders in the group
leaders = list(filter(lambda x: x.state.id == self.leader.id, neighbours)) leaders = list(filter(lambda x: x.state.id == self.leader.id, neighbours))
@ -82,21 +109,29 @@ class TerroristSpreadModel(FSM, Geo):
return self.leader return self.leader
def ego_search(self, steps=1, center=False, node=None, **kwargs): def ego_search(self, steps=1, center=False, node=None, **kwargs):
'''Get a list of nodes in the ego network of *node* of radius *steps*''' """Get a list of nodes in the ego network of *node* of radius *steps*"""
node = as_node(node if node is not None else self) node = as_node(node if node is not None else self)
G = self.subgraph(**kwargs) G = self.subgraph(**kwargs)
return nx.ego_graph(G, node, center=center, radius=steps).nodes() return nx.ego_graph(G, node, center=center, radius=steps).nodes()
def degree(self, node, force=False): def degree(self, node, force=False):
node = as_node(node) node = as_node(node)
if force or (not hasattr(self.model, '_degree')) or getattr(self.model, '_last_step', 0) < self.now: if (
force
or (not hasattr(self.model, "_degree"))
or getattr(self.model, "_last_step", 0) < self.now
):
self.model._degree = nx.degree_centrality(self.G) self.model._degree = nx.degree_centrality(self.G)
self.model._last_step = self.now self.model._last_step = self.now
return self.model._degree[node] return self.model._degree[node]
def betweenness(self, node, force=False): def betweenness(self, node, force=False):
node = as_node(node) node = as_node(node)
if force or (not hasattr(self.model, '_betweenness')) or getattr(self.model, '_last_step', 0) < self.now: if (
force
or (not hasattr(self.model, "_betweenness"))
or getattr(self.model, "_last_step", 0) < self.now
):
self.model._betweenness = nx.betweenness_centrality(self.G) self.model._betweenness = nx.betweenness_centrality(self.G)
self.model._last_step = self.now self.model._last_step = self.now
return self.model._betweenness[node] return self.model._betweenness[node]
@ -114,17 +149,20 @@ class TrainingAreaModel(FSM, Geo):
def __init__(self, model=None, unique_id=0, state=()): def __init__(self, model=None, unique_id=0, state=()):
super().__init__(model=model, unique_id=unique_id, state=state) super().__init__(model=model, unique_id=unique_id, state=state)
self.training_influence = model.environment_params['training_influence'] self.training_influence = model.environment_params["training_influence"]
if 'min_vulnerability' in model.environment_params: if "min_vulnerability" in model.environment_params:
self.min_vulnerability = model.environment_params['min_vulnerability'] self.min_vulnerability = model.environment_params["min_vulnerability"]
else: self.min_vulnerability = 0 else:
self.min_vulnerability = 0
@default_state @default_state
@state @state
def terrorist(self): def terrorist(self):
for neighbour in self.get_neighboring_agents(agent_class=TerroristSpreadModel): for neighbour in self.get_neighboring_agents(agent_class=TerroristSpreadModel):
if neighbour.vulnerability > self.min_vulnerability: if neighbour.vulnerability > self.min_vulnerability:
neighbour.vulnerability = neighbour.vulnerability ** ( 1 - self.training_influence ) neighbour.vulnerability = neighbour.vulnerability ** (
1 - self.training_influence
)
class HavenModel(FSM, Geo): class HavenModel(FSM, Geo):
@ -141,11 +179,12 @@ class HavenModel(FSM, Geo):
def __init__(self, model=None, unique_id=0, state=()): def __init__(self, model=None, unique_id=0, state=()):
super().__init__(model=model, unique_id=unique_id, state=state) super().__init__(model=model, unique_id=unique_id, state=state)
self.haven_influence = model.environment_params['haven_influence'] self.haven_influence = model.environment_params["haven_influence"]
if 'min_vulnerability' in model.environment_params: if "min_vulnerability" in model.environment_params:
self.min_vulnerability = model.environment_params['min_vulnerability'] self.min_vulnerability = model.environment_params["min_vulnerability"]
else: self.min_vulnerability = 0 else:
self.max_vulnerability = model.environment_params['max_vulnerability'] self.min_vulnerability = 0
self.max_vulnerability = model.environment_params["max_vulnerability"]
def get_occupants(self, **kwargs): def get_occupants(self, **kwargs):
return self.get_neighboring_agents(agent_class=TerroristSpreadModel, **kwargs) return self.get_neighboring_agents(agent_class=TerroristSpreadModel, **kwargs)
@ -158,14 +197,18 @@ class HavenModel(FSM, Geo):
for neighbour in self.get_occupants(): for neighbour in self.get_occupants():
if neighbour.vulnerability > self.min_vulnerability: if neighbour.vulnerability > self.min_vulnerability:
neighbour.vulnerability = neighbour.vulnerability * ( 1 - self.haven_influence ) neighbour.vulnerability = neighbour.vulnerability * (
1 - self.haven_influence
)
return self.civilian return self.civilian
@state @state
def terrorist(self): def terrorist(self):
for neighbour in self.get_occupants(): for neighbour in self.get_occupants():
if neighbour.vulnerability < self.max_vulnerability: if neighbour.vulnerability < self.max_vulnerability:
neighbour.vulnerability = neighbour.vulnerability ** ( 1 - self.haven_influence ) neighbour.vulnerability = neighbour.vulnerability ** (
1 - self.haven_influence
)
return self.terrorist return self.terrorist
@ -184,10 +227,10 @@ class TerroristNetworkModel(TerroristSpreadModel):
def __init__(self, model=None, unique_id=0, state=()): def __init__(self, model=None, unique_id=0, state=()):
super().__init__(model=model, unique_id=unique_id, state=state) super().__init__(model=model, unique_id=unique_id, state=state)
self.vision_range = model.environment_params['vision_range'] self.vision_range = model.environment_params["vision_range"]
self.sphere_influence = model.environment_params['sphere_influence'] self.sphere_influence = model.environment_params["sphere_influence"]
self.weight_social_distance = model.environment_params['weight_social_distance'] self.weight_social_distance = model.environment_params["weight_social_distance"]
self.weight_link_distance = model.environment_params['weight_link_distance'] self.weight_link_distance = model.environment_params["weight_link_distance"]
@state @state
def terrorist(self): def terrorist(self):
@ -201,27 +244,48 @@ class TerroristNetworkModel(TerroristSpreadModel):
def update_relationships(self): def update_relationships(self):
if self.count_neighboring_agents(state_id=self.civilian.id) == 0: if self.count_neighboring_agents(state_id=self.civilian.id) == 0:
close_ups = set(self.geo_search(radius=self.vision_range, agent_class=TerroristNetworkModel)) close_ups = set(
step_neighbours = set(self.ego_search(self.sphere_influence, agent_class=TerroristNetworkModel, center=False)) self.geo_search(
neighbours = set(agent.id for agent in self.get_neighboring_agents(agent_class=TerroristNetworkModel)) radius=self.vision_range, agent_class=TerroristNetworkModel
)
)
step_neighbours = set(
self.ego_search(
self.sphere_influence,
agent_class=TerroristNetworkModel,
center=False,
)
)
neighbours = set(
agent.id
for agent in self.get_neighboring_agents(
agent_class=TerroristNetworkModel
)
)
search = (close_ups | step_neighbours) - neighbours search = (close_ups | step_neighbours) - neighbours
for agent in self.get_agents(search): for agent in self.get_agents(search):
social_distance = 1 / self.shortest_path_length(agent.id) social_distance = 1 / self.shortest_path_length(agent.id)
spatial_proximity = ( 1 - self.get_distance(agent.id) ) spatial_proximity = 1 - self.get_distance(agent.id)
prob_new_interaction = self.weight_social_distance * social_distance + self.weight_link_distance * spatial_proximity prob_new_interaction = (
if agent['id'] == agent.civilian.id and self.random.random() < prob_new_interaction: self.weight_social_distance * social_distance
+ self.weight_link_distance * spatial_proximity
)
if (
agent["id"] == agent.civilian.id
and self.random.random() < prob_new_interaction
):
self.add_edge(agent) self.add_edge(agent)
break break
def get_distance(self, target): def get_distance(self, target):
source_x, source_y = nx.get_node_attributes(self.G, 'pos')[self.id] source_x, source_y = nx.get_node_attributes(self.G, "pos")[self.id]
target_x, target_y = nx.get_node_attributes(self.G, 'pos')[target] target_x, target_y = nx.get_node_attributes(self.G, "pos")[target]
dx = abs( source_x - target_x ) dx = abs(source_x - target_x)
dy = abs( source_y - target_y ) dy = abs(source_y - target_y)
return ( dx ** 2 + dy ** 2 ) ** ( 1 / 2 ) return (dx**2 + dy**2) ** (1 / 2)
def shortest_path_length(self, target): def shortest_path_length(self, target):
try: try:
return nx.shortest_path_length(self.G, self.id, target) return nx.shortest_path_length(self.G, self.id, target)
except nx.NetworkXNoPath: except nx.NetworkXNoPath:
return float('inf') return float("inf")

@ -5,6 +5,7 @@ import sys
import os import os
import logging import logging
import traceback import traceback
from contextlib import contextmanager
from .version import __version__ from .version import __version__
@ -30,6 +31,7 @@ def main(
*, *,
do_run=False, do_run=False,
debug=False, debug=False,
pdb=False,
**kwargs, **kwargs,
): ):
import argparse import argparse
@ -154,6 +156,7 @@ def main(
if args.pdb or debug: if args.pdb or debug:
args.synchronous = True args.synchronous = True
os.environ["SOIL_POSTMORTEM"] = "true"
res = [] res = []
try: try:
@ -214,8 +217,21 @@ def main(
return res return res
def easy(cfg, debug=False, **kwargs): @contextmanager
return main(cfg, **kwargs)[0] def easy(cfg, pdb=False, debug=False, **kwargs):
ex = None
try:
yield main(cfg, **kwargs)[0]
except Exception as e:
if os.environ.get("SOIL_POSTMORTEM"):
from .debugging import post_mortem
print(traceback.format_exc())
post_mortem()
ex = e
finally:
if ex:
raise ex
if __name__ == "__main__": if __name__ == "__main__":

@ -29,10 +29,6 @@ def as_node(agent):
IGNORED_FIELDS = ("model", "logger") IGNORED_FIELDS = ("model", "logger")
class DeadAgent(Exception):
pass
class MetaAgent(ABCMeta): class MetaAgent(ABCMeta):
def __new__(mcls, name, bases, namespace): def __new__(mcls, name, bases, namespace):
defaults = {} defaults = {}
@ -47,9 +43,9 @@ class MetaAgent(ABCMeta):
} }
for attr, func in namespace.items(): for attr, func in namespace.items():
if attr == 'step' and inspect.isgeneratorfunction(func): if attr == "step" and inspect.isgeneratorfunction(func):
orig_func = func orig_func = func
new_nmspc['_MetaAgent__coroutine'] = None new_nmspc["_MetaAgent__coroutine"] = None
@wraps(func) @wraps(func)
def func(self): def func(self):
@ -66,10 +62,10 @@ class MetaAgent(ABCMeta):
func.is_default = False func.is_default = False
new_nmspc[attr] = func new_nmspc[attr] = func
elif ( elif (
isinstance(func, types.FunctionType) isinstance(func, types.FunctionType)
or isinstance(func, property) or isinstance(func, property)
or isinstance(func, classmethod) or isinstance(func, classmethod)
or attr[0] == "_" or attr[0] == "_"
): ):
new_nmspc[attr] = func new_nmspc[attr] = func
elif attr == "defaults": elif attr == "defaults":
@ -198,7 +194,7 @@ class BaseAgent(MesaAgent, MutableMapping, metaclass=MetaAgent):
def step(self): def step(self):
if not self.alive: if not self.alive:
raise DeadAgent(self.unique_id) raise time.DeadAgent(self.unique_id)
return super().step() or time.Delta(self.interval) return super().step() or time.Delta(self.interval)
def log(self, message, *args, level=logging.INFO, **kwargs): def log(self, message, *args, level=logging.INFO, **kwargs):
@ -264,6 +260,10 @@ class NetworkAgent(BaseAgent):
return list(self.iter_agents(limit_neighbors=True, **kwargs)) return list(self.iter_agents(limit_neighbors=True, **kwargs))
def add_edge(self, other): def add_edge(self, other):
assert self.node_id
assert other.node_id
assert self.node_id in self.G.nodes
assert other.node_id in self.G.nodes
self.topology.add_edge(self.node_id, other.node_id) self.topology.add_edge(self.node_id, other.node_id)
@property @property
@ -303,7 +303,9 @@ class NetworkAgent(BaseAgent):
return G return G
def remove_node(self): def remove_node(self):
print(f"Removing node for {self.unique_id}: {self.node_id}")
self.G.remove_node(self.node_id) self.G.remove_node(self.node_id)
self.node_id = None
def add_edge(self, other, edge_attr_dict=None, *edge_attrs): def add_edge(self, other, edge_attr_dict=None, *edge_attrs):
if self.node_id not in self.G.nodes(data=False): if self.node_id not in self.G.nodes(data=False):
@ -322,6 +324,8 @@ class NetworkAgent(BaseAgent):
) )
def die(self, remove=True): def die(self, remove=True):
if not self.alive:
return
if remove: if remove:
self.remove_node() self.remove_node()
return super().die() return super().die()
@ -351,7 +355,7 @@ def state(name=None):
self._coroutine = None self._coroutine = None
next_state = ex.value next_state = ex.value
if next_state is not None: if next_state is not None:
self.set_state(next_state) self._set_state(next_state)
return next_state return next_state
func.id = name or func.__name__ func.id = name or func.__name__
@ -401,8 +405,8 @@ class MetaFSM(MetaAgent):
class FSM(BaseAgent, metaclass=MetaFSM): class FSM(BaseAgent, metaclass=MetaFSM):
def __init__(self, *args, **kwargs): def __init__(self, **kwargs):
super(FSM, self).__init__(*args, **kwargs) super(FSM, self).__init__(**kwargs)
if not hasattr(self, "state_id"): if not hasattr(self, "state_id"):
if not self._default_state: if not self._default_state:
raise ValueError( raise ValueError(
@ -411,7 +415,7 @@ class FSM(BaseAgent, metaclass=MetaFSM):
self.state_id = self._default_state.id self.state_id = self._default_state.id
self._coroutine = None self._coroutine = None
self.set_state(self.state_id) self._set_state(self.state_id)
def step(self): def step(self):
self.debug(f"Agent {self.unique_id} @ state {self.state_id}") self.debug(f"Agent {self.unique_id} @ state {self.state_id}")
@ -434,11 +438,11 @@ class FSM(BaseAgent, metaclass=MetaFSM):
pass pass
if next_state is not None: if next_state is not None:
self.set_state(next_state) self._set_state(next_state)
return when or default_interval return when or default_interval
def set_state(self, state, when=None): def _set_state(self, state, when=None):
if hasattr(state, "id"): if hasattr(state, "id"):
state = state.id state = state.id
if state not in self._states: if state not in self._states:
@ -576,83 +580,6 @@ def _convert_agent_classs(ind, to_string=False, **kwargs):
return deserialize_definition(ind, **kwargs) return deserialize_definition(ind, **kwargs)
# def _agent_from_definition(definition, random, value=-1, unique_id=None):
# """Used in the initialization of agents given an agent distribution."""
# if value < 0:
# value = random.random()
# for d in sorted(definition, key=lambda x: x.get('threshold')):
# threshold = d.get('threshold', (-1, -1))
# # Check if the definition matches by id (first) or by threshold
# if (unique_id is not None and unique_id in d.get('ids', [])) or \
# (value >= threshold[0] and value < threshold[1]):
# state = {}
# if 'state' in d:
# state = deepcopy(d['state'])
# return d['agent_class'], state
# raise Exception('Definition for value {} not found in: {}'.format(value, definition))
# def _definition_to_dict(definition, random, size=None, default_state=None):
# state = default_state or {}
# agents = {}
# remaining = {}
# if size:
# for ix in range(size):
# remaining[ix] = copy(state)
# else:
# remaining = defaultdict(lambda x: copy(state))
# distro = sorted([item for item in definition if 'weight' in item])
# id = 0
# def init_agent(item, id=ix):
# while id in agents:
# id += 1
# agent = remaining[id]
# agent['state'].update(copy(item.get('state', {})))
# agents[agent.unique_id] = agent
# del remaining[id]
# return agent
# for item in definition:
# if 'ids' in item:
# ids = item['ids']
# del item['ids']
# for id in ids:
# agent = init_agent(item, id)
# for item in definition:
# if 'number' in item:
# times = item['number']
# del item['number']
# for times in range(times):
# if size:
# ix = random.choice(remaining.keys())
# agent = init_agent(item, id)
# else:
# agent = init_agent(item)
# if not size:
# return agents
# if len(remaining) < 0:
# raise Exception('Invalid definition. Too many agents to add')
# total_weight = float(sum(s['weight'] for s in distro))
# unit = size / total_weight
# for item in distro:
# times = unit * item['weight']
# del item['weight']
# for times in range(times):
# ix = random.choice(remaining.keys())
# agent = init_agent(item, id)
# return agents
class AgentView(Mapping, Set): class AgentView(Mapping, Set):
"""A lazy-loaded list of agents.""" """A lazy-loaded list of agents."""

@ -31,8 +31,8 @@ class Debug(pdb.Pdb):
def __init__(self, *args, skip_soil=False, **kwargs): def __init__(self, *args, skip_soil=False, **kwargs):
skip = kwargs.get("skip", []) skip = kwargs.get("skip", [])
skip.append("soil") skip.append("soil")
skip.append("contextlib")
if skip_soil: if skip_soil:
skip.append("soil")
skip.append("soil.*") skip.append("soil.*")
skip.append("mesa.*") skip.append("mesa.*")
super(Debug, self).__init__(*args, skip=skip, **kwargs) super(Debug, self).__init__(*args, skip=skip, **kwargs)
@ -181,7 +181,7 @@ def set_trace(frame=None, **kwargs):
debugger.set_trace(frame) debugger.set_trace(frame)
def post_mortem(traceback=None): def post_mortem(traceback=None, **kwargs):
global debugger global debugger
if debugger is None: if debugger is None:
debugger = Debug(**kwargs) debugger = Debug(**kwargs)

@ -142,12 +142,12 @@ class BaseEnvironment(Model):
"The environment has not been scheduled, so it has no sense of time" "The environment has not been scheduled, so it has no sense of time"
) )
def add_agent(self, agent_class, unique_id=None, **kwargs): def add_agent(self, unique_id=None, **kwargs):
a = None
if unique_id is None: if unique_id is None:
unique_id = self.next_id() unique_id = self.next_id()
a = agent_class(model=self, unique_id=unique_id, **args) kwargs["unique_id"] = unique_id
a = self._agent_from_dict(kwargs)
self.schedule.add(a) self.schedule.add(a)
return a return a
@ -169,7 +169,9 @@ class BaseEnvironment(Model):
Advance one step in the simulation, and update the data collection and scheduler appropriately Advance one step in the simulation, and update the data collection and scheduler appropriately
""" """
super().step() super().step()
self.logger.info(f"--- Step: {self.schedule.steps:^5} - Time: {self.now:^5} ---") self.logger.info(
f"--- Step: {self.schedule.steps:^5} - Time: {self.now:^5} ---"
)
self.schedule.step() self.schedule.step()
self.datacollector.collect(self) self.datacollector.collect(self)
@ -236,6 +238,7 @@ class NetworkEnvironment(BaseEnvironment):
node_id = agent.get("node_id", None) node_id = agent.get("node_id", None)
if node_id is None: if node_id is None:
node_id = network.find_unassigned(self.G, random=self.random) node_id = network.find_unassigned(self.G, random=self.random)
self.G.nodes[node_id]["agent"] = None
agent["node_id"] = node_id agent["node_id"] = node_id
agent["unique_id"] = unique_id agent["unique_id"] = unique_id
agent["topology"] = self.G agent["topology"] = self.G
@ -269,18 +272,35 @@ class NetworkEnvironment(BaseEnvironment):
node_id = network.find_unassigned( node_id = network.find_unassigned(
G=self.G, shuffle=True, random=self.random G=self.G, shuffle=True, random=self.random
) )
if node_id is None:
node_id = f"node_for_{unique_id}"
if node_id in G.nodes: if node_id not in self.G.nodes:
self.G.nodes[node_id]["agent"] = None # Reserve
else:
self.G.add_node(node_id) self.G.add_node(node_id)
assert "agent" not in self.G.nodes[node_id]
self.G.nodes[node_id]["agent"] = None # Reserve
a = self.add_agent( a = self.add_agent(
unique_id=unique_id, agent_class=agent_class, node_id=node_id, **kwargs unique_id=unique_id,
agent_class=agent_class,
topology=self.G,
node_id=node_id,
**kwargs,
) )
a["visible"] = True a["visible"] = True
return a return a
def add_agent(self, *args, **kwargs):
a = super().add_agent(*args, **kwargs)
if "node_id" in a:
if a.node_id == 24:
import pdb
pdb.set_trace()
assert self.G.nodes[a.node_id]["agent"] == a
return a
def agent_for_node_id(self, node_id): def agent_for_node_id(self, node_id):
return self.G.nodes[node_id].get("agent") return self.G.nodes[node_id].get("agent")

@ -202,7 +202,12 @@ class summary(Exporter):
for (t, df) in self.get_dfs(env): for (t, df) in self.get_dfs(env):
if not len(df): if not len(df):
continue continue
msg = indent(str(df.describe()), ' ') msg = indent(str(df.describe()), " ")
logger.info(dedent(f''' logger.info(
dedent(
f"""
Dataframe {t}: Dataframe {t}:
''') + msg) """
)
+ msg
)

@ -65,10 +65,8 @@ def find_unassigned(G, shuffle=False, random=random):
random.shuffle(candidates) random.shuffle(candidates)
for next_id, data in candidates: for next_id, data in candidates:
if "agent" not in data: if "agent" not in data:
node_id = next_id return next_id
break return None
return node_id
def dump_gexf(G, f): def dump_gexf(G, f):

@ -226,7 +226,9 @@ Model stats:
) )
model.step() model.step()
if model.schedule.time < until: # Simulation ended (no more steps) before until (i.e., no changes expected) if (
model.schedule.time < until
): # Simulation ended (no more steps) before the expected time
model.schedule.time = until model.schedule.time = until
return model return model

@ -13,6 +13,10 @@ from mesa import Agent as MesaAgent
INFINITY = float("inf") INFINITY = float("inf")
class DeadAgent(Exception):
pass
class When: class When:
def __init__(self, time): def __init__(self, time):
if isinstance(time, When): if isinstance(time, When):
@ -38,23 +42,27 @@ class When:
return self._time > other return self._time > other
return self._time > other.next(self._time) return self._time > other.next(self._time)
def ready(self, time): def ready(self, agent):
return self._time <= time return self._time <= agent.model.schedule.time
class Cond(When): class Cond(When):
def __init__(self, func, delta=1): def __init__(self, func, delta=1):
self._func = func self._func = func
self._delta = delta self._delta = delta
self._checked = False
def next(self, time): def next(self, time):
return time + self._delta if self._checked:
return time + self._delta
return time
def abs(self, time): def abs(self, time):
return self return self
def ready(self, time): def ready(self, agent):
return self._func(time) self._checked = True
return self._func(agent)
def __eq__(self, other): def __eq__(self, other):
return False return False
@ -109,10 +117,12 @@ class TimedActivation(BaseScheduler):
elif not isinstance(when, When): elif not isinstance(when, When):
when = When(when) when = When(when)
if agent.unique_id in self._agents: if agent.unique_id in self._agents:
self._queue.remove((self._next[agent.unique_id], agent))
del self._agents[agent.unique_id] del self._agents[agent.unique_id]
heapify(self._queue) if agent.unique_id in self._next:
self._queue.remove((self._next[agent.unique_id], agent))
heapify(self._queue)
self._next[agent.unique_id] = when
heappush(self._queue, (when, agent)) heappush(self._queue, (when, agent))
super().add(agent) super().add(agent)
@ -139,8 +149,9 @@ class TimedActivation(BaseScheduler):
if when > self.time: if when > self.time:
break break
heappop(self._queue) heappop(self._queue)
if when.ready(self.time): if when.ready(agent):
to_process.append(agent) to_process.append(agent)
self._next.pop(agent.unique_id, None)
continue continue
next_time = min(next_time, when.next(self.time)) next_time = min(next_time, when.next(self.time))
@ -155,13 +166,19 @@ class TimedActivation(BaseScheduler):
for agent in to_process: for agent in to_process:
self.logger.debug(f"Stepping agent {agent}") self.logger.debug(f"Stepping agent {agent}")
returned = ((agent.step() or Delta(1))).abs(self.time) try:
returned = ((agent.step() or Delta(1))).abs(self.time)
except DeadAgent:
if agent.unique_id in self._next:
del self._next[agent.unique_id]
agent.alive = False
continue
if not getattr(agent, "alive", True): if not getattr(agent, "alive", True):
self.remove(agent) self.remove(agent)
continue continue
value = when.next(self.time) value = returned.next(self.time)
if value < self.time: if value < self.time:
raise Exception( raise Exception(
@ -172,6 +189,8 @@ class TimedActivation(BaseScheduler):
self._next[agent.unique_id] = returned self._next[agent.unique_id] = returned
heappush(self._queue, (returned, agent)) heappush(self._queue, (returned, agent))
else:
assert not self._next[agent.unique_id]
self.steps += 1 self.steps += 1
self.logger.debug(f"Updating time step: {self.time} -> {next_time}") self.logger.debug(f"Updating time step: {self.time} -> {next_time}")

@ -24,7 +24,7 @@ class TestMain(TestCase):
'''A dead agent should raise an exception if it is stepped after death''' '''A dead agent should raise an exception if it is stepped after death'''
d = Dead(unique_id=0, model=environment.Environment()) d = Dead(unique_id=0, model=environment.Environment())
d.step() d.step()
with pytest.raises(agents.DeadAgent): with pytest.raises(stime.DeadAgent):
d.step() d.step()

@ -0,0 +1,74 @@
from unittest import TestCase
from soil import time, agents, environment
class TestMain(TestCase):
def test_cond(self):
'''
A condition should match a When if the concition is True
'''
t = time.Cond(lambda t: True)
f = time.Cond(lambda t: False)
for i in range(10):
w = time.When(i)
assert w == t
assert w is not f
def test_cond(self):
'''
Comparing a Cond to a Delta should always return False
'''
c = time.Cond(lambda t: False)
d = time.Delta(1)
assert c is not d
def test_cond_env(self):
'''
'''
times_started = []
times_awakened = []
times = []
done = 0
class CondAgent(agents.BaseAgent):
def step(self):
nonlocal done
times_started.append(self.now)
while True:
yield time.Cond(lambda agent: agent.model.schedule.time >= 10)
times_awakened.append(self.now)
if self.now >= 10:
break
done += 1
env = environment.Environment(agents=[{'agent_class': CondAgent}])
while env.schedule.time < 11:
env.step()
times.append(env.now)
assert env.schedule.time == 11
assert times_started == [0]
assert times_awakened == [10]
assert done == 1
# The first time will produce the Cond.
# Since there are no other agents, time will not advance, but the number
# of steps will.
assert env.schedule.steps == 12
assert len(times) == 12
while env.schedule.time < 12:
env.step()
times.append(env.now)
assert env.schedule.time == 12
assert times_started == [0, 11]
assert times_awakened == [10, 11]
assert done == 2
# Once more to yield the cond, another one to continue
assert env.schedule.steps == 14
assert len(times) == 14
Loading…
Cancel
Save