mirror of
https://github.com/gsi-upm/soil
synced 2024-11-22 11:12:29 +00:00
feab0ba79e
The examples weren't being properly tested in the last commit. When we fixed that, a lot of bugs were found in the new implementation of environment and agent, which account for most of these changes. The main difference is the mechanism to load simulations from a configuration file. For that to work, we had to rework our module loading code in `serialization` and add a `source_file` attribute to configurations (and simulations, for that matter).
195 lines
6.2 KiB
Python
195 lines
6.2 KiB
Python
from soil.agents import FSM, NetworkAgent, state, default_state
|
|
from soil import Environment, Simulation, parameters
|
|
from itertools import islice
|
|
import networkx as nx
|
|
import logging
|
|
|
|
|
|
class CityPubs(Environment):
    """Environment that owns a set of pubs and tracks their occupancy.

    Pubs are plain dictionaries stored in ``self.pubs`` and keyed by their
    name. Each one records ``name``, ``open``, ``capacity`` and ``occupancy``.
    """

    level = logging.INFO
    # Number of pubs created by init().
    number_of_pubs: parameters.Integer = 3
    # Weight given to the first patron class passed to populate_network().
    ratio_extroverted: parameters.probability = 0.1
    # Capacity assigned to every pub created by init().
    pub_capacity: parameters.Integer = 10

    def init(self):
        """Create the pubs, add a Police agent and populate the network."""
        self.pubs = {}
        for index in range(self.number_of_pubs):
            pub_name = "The awesome pub #{}".format(index)
            self.pubs[pub_name] = {
                "name": pub_name,
                "open": True,
                "capacity": self.pub_capacity,
                "occupancy": 0,
            }

        self.add_agent(agent_class=Police)

        # NOTE(review): the first weight (ratio_extroverted) is paired with the
        # openness=0.1 patrons and the complement with openness=1 -- confirm
        # that this pairing is the intended one.
        patron_classes = [Patron.w(openness=0.1), Patron.w(openness=1)]
        class_weights = [self.ratio_extroverted, 1 - self.ratio_extroverted]
        self.populate_network(patron_classes, class_weights)

        # Sanity check: every node of the graph must carry a Patron agent.
        assert all(
            "agent" in data and isinstance(data["agent"], Patron)
            for _, data in self.G.nodes(data=True)
        )

    def enter(self, pub_id, *nodes):
        """Try to let all ``nodes`` into the pub named ``pub_id`` at once.

        Returns True (after increasing the occupancy and setting ``"pub"`` on
        every node) when the pub is open and has room for the whole group,
        and False otherwise. Raises ValueError if ``pub_id`` is unknown.
        """
        try:
            pub = self["pubs"][pub_id]
        except KeyError:
            raise ValueError("Pub {} is not available".format(pub_id))

        # The group is admitted as a whole or not at all.
        if not (pub["open"] and pub["capacity"] >= (len(nodes) + pub["occupancy"])):
            return False

        pub["occupancy"] += len(nodes)
        for newcomer in nodes:
            newcomer["pub"] = pub_id
        return True

    def available_pubs(self):
        """Yield the name of every pub that is open and not yet full."""
        yield from (
            candidate["name"]
            for candidate in self["pubs"].values()
            if candidate["open"] and (candidate["occupancy"] < candidate["capacity"])
        )

    def exit(self, pub_id, *node_ids):
        """Remove the agents with the given ids from the pub named ``pub_id``.

        Agents whose ``"pub"`` entry does not match ``pub_id`` are left
        untouched. Raises ValueError if ``pub_id`` is unknown.
        """
        try:
            pub = self["pubs"][pub_id]
        except KeyError:
            raise ValueError("Pub {} is not available".format(pub_id))

        for agent_id in node_ids:
            agent = self.get_agent(agent_id)
            if pub_id != agent["pub"]:
                continue
            del agent["pub"]
            pub["occupancy"] -= 1
|
|
|
|
|
|
class Patron(FSM, NetworkAgent):
    """Agent that looks for friends to drink with. It will do three things:
    1) Look for other patrons to drink with
    2) Look for a bar where the agent and other agents in the same group can get in.
    3) While in the bar, patrons only drink, until they get drunk and taken home.
    """

    level = logging.DEBUG

    # Name of the pub the patron is in, or None while outside of any pub.
    pub = None
    # Set to True once the patron reaches the drunk_in_pub state.
    drunk = False
    # Pints drunk so far.
    pints = 0
    # The patron becomes drunk once ``pints`` exceeds this value.
    max_pints = 3
    # Set by kick_out(); checked in drunk_in_pub to send the patron home.
    kicked_out = False

    @default_state
    @state
    def looking_for_friends(self):
        """Look for friends to drink with.

        Returns the ``at_home`` state when no candidate is available and the
        ``looking_for_pub`` state when at least one friendship was made.
        Otherwise nothing is returned.
        """
        self.info("I am looking for friends")
        available_friends = list(
            self.get_agents(drunk=False, pub=None, state_id=self.looking_for_friends.id)
        )
        if not available_friends:
            self.info("Life sucks and I'm alone!")
            return self.at_home
        befriended = self.try_friends(available_friends)
        if befriended:
            return self.looking_for_pub

    @state
    def looking_for_pub(self):
        """Look for a pub that accepts me and my friends.

        Moves on to ``sober_in_pub`` if the patron is already in a pub (for
        instance because a neighbor got the whole group in) or as soon as one
        of the available pubs admits the patron together with its neighbors.
        """
        # Identity comparison: ``pub`` is either None or a pub name.
        if self["pub"] is not None:
            return self.sober_in_pub
        self.debug("I am looking for a pub")
        group = list(self.get_neighbors())
        for pub in self.model.available_pubs():
            self.debug("We're trying to get into {}: total: {}".format(pub, len(group)))
            if self.model.enter(pub, self, *group):
                self.info("We're all {} getting in {}!".format(len(group), pub))
                return self.sober_in_pub

    @state
    def sober_in_pub(self):
        """Drink up, and become drunk once ``pints`` exceeds ``max_pints``."""
        self.drink()
        if self["pints"] > self["max_pints"]:
            return self.drunk_in_pub

    @state
    def drunk_in_pub(self):
        """I'm out. Take me home!

        Marks the patron as drunk and goes to ``at_home`` once it has been
        kicked out; until then nothing is returned.
        """
        self.info("I'm so drunk. Take me home!")
        self["drunk"] = True
        if self.kicked_out:
            return self.at_home
        pass  # out drunk

    @state
    def at_home(self):
        """The end: log how many neighbors are also at home."""
        # Materialize the result before counting, as the other get_agents()
        # call sites in this file do, so that len() also works on an iterator.
        others = list(
            self.get_agents(state_id=Patron.at_home.id, limit_neighbors=True)
        )
        self.debug("I'm home. Just like {} of my friends".format(len(others)))

    def drink(self):
        """Have one more pint."""
        self["pints"] += 1
        self.debug("Cheers to that")

    def kick_out(self):
        """Flag this patron as kicked out of the pub."""
        self.kicked_out = True

    def befriend(self, other_agent, force=False):
        """
        Try to become friends with another agent. The chances of
        success depend on this agent's openness, unless ``force`` is set,
        in which case the friendship is always created.

        Returns True if an edge to ``other_agent`` was added, False otherwise.
        """
        if force or self["openness"] > self.random.random():
            self.add_edge(self, other_agent)
            self.info("Made some friend {}".format(other_agent))
            return True
        return False

    def try_friends(self, others):
        """Look for random agents around me and try to befriend them.

        ``others`` is shuffled in place and at most ``int(10 * openness)`` of
        its members are approached. Returns True if at least one of them
        accepted the friendship.
        """
        befriended = False
        k = int(10 * self["openness"])
        # Shuffle and take the first k candidates: a random selection that
        # does not fail when fewer than k candidates are available.
        self.random.shuffle(others)
        for friend in islice(others, k):
            if friend == self:
                continue
            if friend.befriend(self):
                # The other agent accepted, so the friendship is made mutual.
                self.befriend(friend, force=True)
                self.debug("Hooray! new friend: {}".format(friend.unique_id))
                befriended = True
            else:
                self.debug("{} does not want to be friends".format(friend.unique_id))
        return befriended
|
|
|
|
|
|
class Police(FSM):
    """Simple agent to take drunk people out of pubs."""

    level = logging.INFO

    @default_state
    @state
    def patrol(self):
        """Kick out every patron that is currently drunk in a pub.

        Logs a message instead when there is nobody to kick out.
        """
        drunksters = list(self.get_agents(drunk=True, state_id=Patron.drunk_in_pub.id))
        # An explicit emptiness check is needed here: the ``else`` clause of a
        # ``for`` loop without ``break`` always runs, so the "no trash" message
        # used to be logged even right after kicking patrons out.
        if not drunksters:
            self.info("No trash to take out. Too bad.")
        else:
            for drunk in drunksters:
                self.info("Kicking out the trash: {}".format(drunk.unique_id))
                drunk.kick_out()
|
|
|
|
|
|
# Simulation definition for the pub crawl example. It is created at import
# time and only run when this file is executed as a script.
sim = Simulation(
    model=CityPubs,
    name="pubcrawl",
    num_trials=3,
    max_steps=10,
    dump=False,
    model_params={
        # Start from a graph with 30 nodes and no edges.
        "network_generator": nx.empty_graph,
        "network_params": {"n": 30},
        "model": CityPubs,
        "altercations": 0,
        "number_of_pubs": 3,
    },
)


if __name__ == "__main__":
    sim.run(parallel=False)