1
0
mirror of https://github.com/gsi-upm/soil synced 2024-11-24 20:02:28 +00:00
soil/examples/pubcrawl/pubcrawl_sim.py

195 lines
6.2 KiB
Python
Raw Normal View History

2022-09-16 16:13:39 +00:00
from soil.agents import FSM, NetworkAgent, state, default_state
from soil import Environment, Simulation, parameters
2018-12-04 08:54:29 +00:00
from itertools import islice
import networkx as nx
2018-12-04 08:54:29 +00:00
import logging
class CityPubs(Environment):
2022-10-17 18:23:57 +00:00
"""Environment with Pubs"""
2018-12-04 08:54:29 +00:00
level = logging.INFO
number_of_pubs: parameters.Integer = 3
ratio_extroverted: parameters.probability = 0.1
pub_capacity: parameters.Integer = 10
def init(self):
self.pubs = {}
for i in range(self.number_of_pubs):
2018-12-04 08:54:29 +00:00
newpub = {
2022-10-17 18:23:57 +00:00
"name": "The awesome pub #{}".format(i),
"open": True,
"capacity": self.pub_capacity,
2022-10-17 18:23:57 +00:00
"occupancy": 0,
2018-12-04 08:54:29 +00:00
}
self.pubs[newpub["name"]] = newpub
self.add_agent(agent_class=Police)
self.populate_network([Patron.w(openness=0.1), Patron.w(openness=1)],
[self.ratio_extroverted, 1-self.ratio_extroverted])
assert all(["agent" in node and isinstance(node["agent"], Patron) for (_, node) in self.G.nodes(data=True)])
2018-12-04 08:54:29 +00:00
def enter(self, pub_id, *nodes):
2022-10-17 18:23:57 +00:00
"""Agents will try to enter. The pub checks if it is possible"""
2018-12-04 08:54:29 +00:00
try:
2022-10-17 18:23:57 +00:00
pub = self["pubs"][pub_id]
2018-12-04 08:54:29 +00:00
except KeyError:
2022-10-17 18:23:57 +00:00
raise ValueError("Pub {} is not available".format(pub_id))
if not pub["open"] or (pub["capacity"] < (len(nodes) + pub["occupancy"])):
2018-12-04 08:54:29 +00:00
return False
2022-10-17 18:23:57 +00:00
pub["occupancy"] += len(nodes)
2018-12-04 08:54:29 +00:00
for node in nodes:
2022-10-17 18:23:57 +00:00
node["pub"] = pub_id
2018-12-04 08:54:29 +00:00
return True
def available_pubs(self):
2022-10-17 18:23:57 +00:00
for pub in self["pubs"].values():
if pub["open"] and (pub["occupancy"] < pub["capacity"]):
yield pub["name"]
2018-12-04 08:54:29 +00:00
def exit(self, pub_id, *node_ids):
2022-10-17 18:23:57 +00:00
"""Agents will notify the pub they want to leave"""
2018-12-04 08:54:29 +00:00
try:
2022-10-17 18:23:57 +00:00
pub = self["pubs"][pub_id]
2018-12-04 08:54:29 +00:00
except KeyError:
2022-10-17 18:23:57 +00:00
raise ValueError("Pub {} is not available".format(pub_id))
2018-12-04 08:54:29 +00:00
for node_id in node_ids:
node = self.get_agent(node_id)
2022-10-17 18:23:57 +00:00
if pub_id == node["pub"]:
del node["pub"]
pub["occupancy"] -= 1
2018-12-04 08:54:29 +00:00
2022-09-16 16:13:39 +00:00
class Patron(FSM, NetworkAgent):
2022-10-17 18:23:57 +00:00
"""Agent that looks for friends to drink with. It will do three things:
1) Look for other patrons to drink with
2) Look for a bar where the agent and other agents in the same group can get in.
3) While in the bar, patrons only drink, until they get drunk and taken home.
"""
level = logging.DEBUG
2018-12-04 08:54:29 +00:00
2022-10-13 20:43:16 +00:00
pub = None
drunk = False
pints = 0
max_pints = 3
2022-10-17 17:29:39 +00:00
kicked_out = False
2018-12-04 08:54:29 +00:00
@default_state
@state
def looking_for_friends(self):
2022-10-17 18:23:57 +00:00
"""Look for friends to drink with"""
self.info("I am looking for friends")
available_friends = list(
self.get_agents(drunk=False, pub=None, state_id=self.looking_for_friends.id)
)
2018-12-04 08:54:29 +00:00
if not available_friends:
2022-10-17 18:23:57 +00:00
self.info("Life sucks and I'm alone!")
2018-12-04 08:54:29 +00:00
return self.at_home
befriended = self.try_friends(available_friends)
if befriended:
return self.looking_for_pub
@state
def looking_for_pub(self):
2022-10-17 18:23:57 +00:00
"""Look for a pub that accepts me and my friends"""
if self["pub"] != None:
2018-12-04 08:54:29 +00:00
return self.sober_in_pub
2022-10-17 18:23:57 +00:00
self.debug("I am looking for a pub")
group = list(self.get_neighbors())
2022-10-13 20:43:16 +00:00
for pub in self.model.available_pubs():
2022-10-17 18:23:57 +00:00
self.debug("We're trying to get into {}: total: {}".format(pub, len(group)))
2022-10-13 20:43:16 +00:00
if self.model.enter(pub, self, *group):
2022-10-17 18:23:57 +00:00
self.info("We're all {} getting in {}!".format(len(group), pub))
2018-12-04 08:54:29 +00:00
return self.sober_in_pub
@state
def sober_in_pub(self):
2022-10-17 18:23:57 +00:00
"""Drink up."""
2018-12-04 08:54:29 +00:00
self.drink()
2022-10-17 18:23:57 +00:00
if self["pints"] > self["max_pints"]:
2018-12-04 08:54:29 +00:00
return self.drunk_in_pub
@state
def drunk_in_pub(self):
2022-10-17 18:23:57 +00:00
"""I'm out. Take me home!"""
self.info("I'm so drunk. Take me home!")
self["drunk"] = True
2022-10-17 17:29:39 +00:00
if self.kicked_out:
return self.at_home
pass # out drun
2018-12-04 08:54:29 +00:00
@state
def at_home(self):
2022-10-17 18:23:57 +00:00
"""The end"""
others = self.get_agents(state_id=Patron.at_home.id, limit_neighbors=True)
2022-10-17 18:23:57 +00:00
self.debug("I'm home. Just like {} of my friends".format(len(others)))
2018-12-04 08:54:29 +00:00
def drink(self):
2022-10-17 18:23:57 +00:00
self["pints"] += 1
self.debug("Cheers to that")
2018-12-04 08:54:29 +00:00
def kick_out(self):
2022-10-17 17:29:39 +00:00
self.kicked_out = True
2018-12-04 08:54:29 +00:00
def befriend(self, other_agent, force=False):
2022-10-17 18:23:57 +00:00
"""
2018-12-04 08:54:29 +00:00
Try to become friends with another agent. The chances of
success depend on both agents' openness.
2022-10-17 18:23:57 +00:00
"""
if force or self["openness"] > self.random.random():
self.add_edge(self, other_agent)
2022-10-17 18:23:57 +00:00
self.info("Made some friend {}".format(other_agent))
2018-12-04 08:54:29 +00:00
return True
return False
def try_friends(self, others):
2022-10-17 18:23:57 +00:00
"""Look for random agents around me and try to befriend them"""
2018-12-04 08:54:29 +00:00
befriended = False
2022-10-17 18:23:57 +00:00
k = int(10 * self["openness"])
2022-10-06 13:49:10 +00:00
self.random.shuffle(others)
2018-12-04 08:54:29 +00:00
for friend in islice(others, k): # random.choice >= 3.7
if friend == self:
continue
if friend.befriend(self):
self.befriend(friend, force=True)
self.debug("Hooray! new friend: {}".format(friend.unique_id))
2018-12-04 08:54:29 +00:00
befriended = True
else:
self.debug("{} does not want to be friends".format(friend.unique_id))
2018-12-04 08:54:29 +00:00
return befriended
2022-10-13 20:43:16 +00:00
class Police(FSM):
2022-10-17 18:23:57 +00:00
"""Simple agent to take drunk people out of pubs."""
2018-12-04 08:54:29 +00:00
level = logging.INFO
@default_state
@state
def patrol(self):
2022-10-17 18:23:57 +00:00
drunksters = list(self.get_agents(drunk=True, state_id=Patron.drunk_in_pub.id))
2018-12-04 08:54:29 +00:00
for drunk in drunksters:
self.info("Kicking out the trash: {}".format(drunk.unique_id))
2018-12-04 08:54:29 +00:00
drunk.kick_out()
else:
2022-10-17 18:23:57 +00:00
self.info("No trash to take out. Too bad.")
2018-12-04 08:54:29 +00:00
sim = Simulation(
model=CityPubs,
name="pubcrawl",
2023-04-20 15:56:44 +00:00
iterations=3,
max_steps=10,
dump=False,
2023-04-20 15:56:44 +00:00
parameters=dict(
network_generator=nx.empty_graph,
network_params={"n": 30},
model=CityPubs,
altercations=0,
number_of_pubs=3,
)
)
2022-10-17 18:23:57 +00:00
if __name__ == "__main__":
sim.run(parallel=False)