Black formatting

mesa
J. Fernando Sánchez 2 years ago
parent a2fb25c160
commit 2f5e5d0a74

@ -28,11 +28,12 @@ from mesa.space import MultiGrid
@dataclass @dataclass
class Journey: class Journey:
""" """
This represents a request for a journey. Passengers and drivers exchange this object. This represents a request for a journey. Passengers and drivers exchange this object.
A journey may have a driver assigned or not. If the driver has not been assigned, this A journey may have a driver assigned or not. If the driver has not been assigned, this
object is considered a "request for a journey". object is considered a "request for a journey".
""" """
origin: (int, int) origin: (int, int)
destination: (int, int) destination: (int, int)
tip: float tip: float
@ -54,20 +55,33 @@ class City(EventedEnvironment):
:param int height: Height of the internal grid :param int height: Height of the internal grid
:param int width: Width of the internal grid :param int width: Width of the internal grid
""" """
def __init__(self, *args, n_cars=1, n_passengers=10,
height=100, width=100, agents=None, def __init__(
model_reporters=None, self,
**kwargs): *args,
n_cars=1,
n_passengers=10,
height=100,
width=100,
agents=None,
model_reporters=None,
**kwargs,
):
self.grid = MultiGrid(width=width, height=height, torus=False) self.grid = MultiGrid(width=width, height=height, torus=False)
if agents is None: if agents is None:
agents = [] agents = []
for i in range(n_cars): for i in range(n_cars):
agents.append({'agent_class': Driver}) agents.append({"agent_class": Driver})
for i in range(n_passengers): for i in range(n_passengers):
agents.append({'agent_class': Passenger}) agents.append({"agent_class": Passenger})
model_reporters = model_reporters or {'earnings': 'total_earnings', 'n_passengers': 'number_passengers'} model_reporters = model_reporters or {
print('REPORTERS', model_reporters) "earnings": "total_earnings",
super().__init__(*args, agents=agents, model_reporters=model_reporters, **kwargs) "n_passengers": "number_passengers",
}
print("REPORTERS", model_reporters)
super().__init__(
*args, agents=agents, model_reporters=model_reporters, **kwargs
)
for agent in self.agents: for agent in self.agents:
self.grid.place_agent(agent, (0, 0)) self.grid.place_agent(agent, (0, 0))
self.grid.move_to_empty(agent) self.grid.move_to_empty(agent)
@ -87,13 +101,13 @@ class Driver(Evented, FSM):
earnings = 0 earnings = 0
def on_receive(self, msg, sender): def on_receive(self, msg, sender):
'''This is not a state. It will run (and block) every time check_messages is invoked''' """This is not a state. It will run (and block) every time check_messages is invoked"""
if self.journey is None and isinstance(msg, Journey) and msg.driver is None: if self.journey is None and isinstance(msg, Journey) and msg.driver is None:
msg.driver = self msg.driver = self
self.journey = msg self.journey = msg
def check_passengers(self): def check_passengers(self):
'''If there are no more passengers, stop forever''' """If there are no more passengers, stop forever"""
c = self.count_agents(agent_class=Passenger) c = self.count_agents(agent_class=Passenger)
self.info(f"Passengers left {c}") self.info(f"Passengers left {c}")
if not c: if not c:
@ -102,17 +116,19 @@ class Driver(Evented, FSM):
@default_state @default_state
@state @state
def wandering(self): def wandering(self):
'''Move around the city until a journey is accepted''' """Move around the city until a journey is accepted"""
target = None target = None
self.check_passengers() self.check_passengers()
self.journey = None self.journey = None
while self.journey is None: # No potential journeys detected (see on_receive) while self.journey is None: # No potential journeys detected (see on_receive)
if target is None or not self.move_towards(target): if target is None or not self.move_towards(target):
target = self.random.choice(self.model.grid.get_neighborhood(self.pos, moore=False)) target = self.random.choice(
self.model.grid.get_neighborhood(self.pos, moore=False)
)
self.check_passengers() self.check_passengers()
self.check_messages() # This will call on_receive behind the scenes, and the agent's status will be updated self.check_messages() # This will call on_receive behind the scenes, and the agent's status will be updated
yield Delta(30) # Wait at least 30 seconds before checking again yield Delta(30) # Wait at least 30 seconds before checking again
try: try:
# Re-send the journey to the passenger, to confirm that we have been selected # Re-send the journey to the passenger, to confirm that we have been selected
@ -126,7 +142,7 @@ class Driver(Evented, FSM):
@state @state
def driving(self): def driving(self):
'''The journey has been accepted. Pick them up and take them to their destination''' """The journey has been accepted. Pick them up and take them to their destination"""
while self.move_towards(self.journey.origin): while self.move_towards(self.journey.origin):
yield yield
while self.move_towards(self.journey.destination, with_passenger=True): while self.move_towards(self.journey.destination, with_passenger=True):
@ -136,7 +152,7 @@ class Driver(Evented, FSM):
return self.wandering return self.wandering
def move_towards(self, target, with_passenger=False): def move_towards(self, target, with_passenger=False):
'''Move one cell at a time towards a target''' """Move one cell at a time towards a target"""
self.info(f"Moving { self.pos } -> { target }") self.info(f"Moving { self.pos } -> { target }")
if target[0] == self.pos[0] and target[1] == self.pos[1]: if target[0] == self.pos[0] and target[1] == self.pos[1]:
return False return False
@ -151,30 +167,36 @@ class Driver(Evented, FSM):
break break
self.model.grid.move_agent(self, tuple(next_pos)) self.model.grid.move_agent(self, tuple(next_pos))
if with_passenger: if with_passenger:
self.journey.passenger.pos = self.pos # This could be communicated through messages self.journey.passenger.pos = (
self.pos
) # This could be communicated through messages
return True return True
class Passenger(Evented, FSM): class Passenger(Evented, FSM):
pos = None pos = None
def on_receive(self, msg, sender): def on_receive(self, msg, sender):
'''This is not a state. It will be run synchronously every time `check_messages` is run''' """This is not a state. It will be run synchronously every time `check_messages` is run"""
if isinstance(msg, Journey): if isinstance(msg, Journey):
self.journey = msg self.journey = msg
return msg return msg
@default_state @default_state
@state @state
def asking(self): def asking(self):
destination = (self.random.randint(0, self.model.grid.height), self.random.randint(0, self.model.grid.width)) destination = (
self.random.randint(0, self.model.grid.height),
self.random.randint(0, self.model.grid.width),
)
self.journey = None self.journey = None
journey = Journey(origin=self.pos, journey = Journey(
destination=destination, origin=self.pos,
tip=self.random.randint(10, 100), destination=destination,
passenger=self) tip=self.random.randint(10, 100),
passenger=self,
)
timeout = 60 timeout = 60
expiration = self.now + timeout expiration = self.now + timeout
@ -185,20 +207,27 @@ class Passenger(Evented, FSM):
yield self.received(expiration=expiration) yield self.received(expiration=expiration)
except events.TimedOut: except events.TimedOut:
self.info(f"Passenger at: { self.pos }. Asking for journey.") self.info(f"Passenger at: { self.pos }. Asking for journey.")
self.model.broadcast(journey, ttl=timeout, sender=self, agent_class=Driver) self.model.broadcast(
journey, ttl=timeout, sender=self, agent_class=Driver
)
expiration = self.now + timeout expiration = self.now + timeout
self.check_messages() self.check_messages()
return self.driving_home return self.driving_home
@state @state
def driving_home(self): def driving_home(self):
while self.pos[0] != self.journey.destination[0] or self.pos[1] != self.journey.destination[1]: while (
self.pos[0] != self.journey.destination[0]
or self.pos[1] != self.journey.destination[1]
):
yield self.received(timeout=60) yield self.received(timeout=60)
self.info("Got home safe!") self.info("Got home safe!")
self.die() self.die()
simulation = Simulation(name='RideHailing', model_class=City, model_params={'n_passengers': 2}) simulation = Simulation(
name="RideHailing", model_class=City, model_params={"n_passengers": 2}
)
if __name__ == "__main__": if __name__ == "__main__":
with easy(simulation) as s: with easy(simulation) as s:

Loading…
Cancel
Save