""" This is an example of a simplified city, where there are Passengers and Drivers that can take those passengers from their location to their desired location. An example scenario could play like the following: - Drivers start in the `wandering` state, where they wander around the city until they have been assigned a journey - Passenger(1) tells every driver that it wants to request a Journey. - Each driver receives the request. If Driver(2) is interested in providing the Journey, it asks Passenger(1) to confirm that it accepts Driver(2)'s request - When Passenger(1) accepts the request, two things happen: - Passenger(1) changes its state to `driving_home` - Driver(2) starts moving towards the origin of the Journey - Once Driver(2) reaches the origin, it starts moving itself and Passenger(1) to the destination of the Journey - When Driver(2) reaches the destination (carrying Passenger(1) along): - Driver(2) starts wondering again - Passenger(1) dies, and is removed from the simulation - If there are no more passengers available in the simulation, Drivers die """ from __future__ import annotations from soil import * from soil import events from mesa.space import MultiGrid # More complex scenarios may use more than one type of message between objects. # A common pattern is to use `enum.Enum` to represent state changes in a request. @dataclass class Journey: """ This represents a request for a journey. Passengers and drivers exchange this object. A journey may have a driver assigned or not. If the driver has not been assigned, this object is considered a "request for a journey". """ origin: (int, int) destination: (int, int) tip: float passenger: Passenger driver: Driver = None class City(EventedEnvironment): """ An environment with a grid where drivers and passengers will be placed. The number of drivers and riders is configurable through its parameters: :param str n_cars: The total number of drivers to add :param str n_passengers: The number of passengers in the simulation :param list agents: Specific agents to use in the simulation. It overrides the `n_passengers` and `n_cars` params. :param int height: Height of the internal grid :param int width: Width of the internal grid """ def __init__(self, *args, n_cars=1, n_passengers=10, height=100, width=100, agents=None, model_reporters=None, **kwargs): self.grid = MultiGrid(width=width, height=height, torus=False) if agents is None: agents = [] for i in range(n_cars): agents.append({'agent_class': Driver}) for i in range(n_passengers): agents.append({'agent_class': Passenger}) model_reporters = model_reporters or {'earnings': 'total_earnings', 'n_passengers': 'number_passengers'} print('REPORTERS', model_reporters) super().__init__(*args, agents=agents, model_reporters=model_reporters, **kwargs) for agent in self.agents: self.grid.place_agent(agent, (0, 0)) self.grid.move_to_empty(agent) @property def total_earnings(self): return sum(d.earnings for d in self.agents(agent_class=Driver)) @property def number_passengers(self): return self.count_agents(agent_class=Passenger) class Driver(Evented, FSM): pos = None journey = None earnings = 0 def on_receive(self, msg, sender): '''This is not a state. It will run (and block) every time check_messages is invoked''' if self.journey is None and isinstance(msg, Journey) and msg.driver is None: msg.driver = self self.journey = msg def check_passengers(self): '''If there are no more passengers, stop forever''' c = self.count_agents(agent_class=Passenger) self.info(f"Passengers left {c}") if not c: self.die() @default_state @state def wandering(self): '''Move around the city until a journey is accepted''' target = None self.check_passengers() self.journey = None while self.journey is None: # No potential journeys detected (see on_receive) if target is None or not self.move_towards(target): target = self.random.choice(self.model.grid.get_neighborhood(self.pos, moore=False)) self.check_passengers() self.check_messages() # This will call on_receive behind the scenes, and the agent's status will be updated yield Delta(30) # Wait at least 30 seconds before checking again try: # Re-send the journey to the passenger, to confirm that we have been selected self.journey = yield self.journey.passenger.ask(self.journey, timeout=60) except events.TimedOut: # No journey has been accepted. Try again self.journey = None return return self.driving @state def driving(self): '''The journey has been accepted. Pick them up and take them to their destination''' while self.move_towards(self.journey.origin): yield while self.move_towards(self.journey.destination, with_passenger=True): yield self.earnings += self.journey.tip self.check_passengers() return self.wandering def move_towards(self, target, with_passenger=False): '''Move one cell at a time towards a target''' self.info(f"Moving { self.pos } -> { target }") if target[0] == self.pos[0] and target[1] == self.pos[1]: return False next_pos = [self.pos[0], self.pos[1]] for idx in [0, 1]: if self.pos[idx] < target[idx]: next_pos[idx] += 1 break if self.pos[idx] > target[idx]: next_pos[idx] -= 1 break self.model.grid.move_agent(self, tuple(next_pos)) if with_passenger: self.journey.passenger.pos = self.pos # This could be communicated through messages return True class Passenger(Evented, FSM): pos = None def on_receive(self, msg, sender): '''This is not a state. It will be run synchronously every time `check_messages` is run''' if isinstance(msg, Journey): self.journey = msg return msg @default_state @state def asking(self): destination = (self.random.randint(0, self.model.grid.height), self.random.randint(0, self.model.grid.width)) self.journey = None journey = Journey(origin=self.pos, destination=destination, tip=self.random.randint(10, 100), passenger=self) timeout = 60 expiration = self.now + timeout self.model.broadcast(journey, ttl=timeout, sender=self, agent_class=Driver) while not self.journey: self.info(f"Passenger at: { self.pos }. Checking for responses.") try: yield self.received(expiration=expiration) except events.TimedOut: self.info(f"Passenger at: { self.pos }. Asking for journey.") self.model.broadcast(journey, ttl=timeout, sender=self, agent_class=Driver) expiration = self.now + timeout self.check_messages() return self.driving_home @state def driving_home(self): while self.pos[0] != self.journey.destination[0] or self.pos[1] != self.journey.destination[1]: yield self.received(timeout=60) self.info("Got home safe!") self.die() simulation = Simulation(name='RideHailing', model_class=City, model_params={'n_passengers': 2}) if __name__ == "__main__": with easy(simulation) as s: s.run()