From ee0c4517cb5c28e69d31dc12d0a984292c7ee3e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=2E=20Fernando=20S=C3=A1nchez?= Date: Tue, 16 May 2023 09:05:23 +0200 Subject: [PATCH] Decouple activation and schedulers --- .gitignore | 3 +- examples/rabbits/rabbit_improved_sim.py | 5 +- requirements.txt | 1 - soil/agents/__init__.py | 6 +- soil/time.py | 167 ++++++++++++++++-------- 5 files changed, 115 insertions(+), 67 deletions(-) diff --git a/.gitignore b/.gitignore index ed7be33..876b4e5 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ docs/_build* build/* dist/* prof -backup \ No newline at end of file +backup +*.egg-info diff --git a/examples/rabbits/rabbit_improved_sim.py b/examples/rabbits/rabbit_improved_sim.py index c6eb015..142e25c 100644 --- a/examples/rabbits/rabbit_improved_sim.py +++ b/examples/rabbits/rabbit_improved_sim.py @@ -1,12 +1,9 @@ from soil import Evented, FSM, state, default_state, BaseAgent, NetworkAgent, Environment, parameters, report, TimedOut import math -from soilent import Scheduler - class RabbitsImprovedEnv(Environment): prob_death: parameters.probability = 1e-3 - schedule_class = Scheduler def init(self): a1 = self.add_node(Male) @@ -174,4 +171,4 @@ class RandomAccident(BaseAgent): self.debug("Rabbits alive: {}".format(num_alive)) -RabbitsImprovedEnv.run(max_time=1000, seed="MySeed", iterations=1) \ No newline at end of file +RabbitsImprovedEnv.run(max_time=1000, seed="MySeed", iterations=1) diff --git a/requirements.txt b/requirements.txt index 443778f..91c73b7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,6 @@ pandas>=1 SALib>=1.3 Jinja2 Mesa>=1.2 -pydantic>=1.9 sqlalchemy>=1.4 typing-extensions>=4.4 annotated-types>=0.4 diff --git a/soil/agents/__init__.py b/soil/agents/__init__.py index 0309191..5b83fb8 100644 --- a/soil/agents/__init__.py +++ b/soil/agents/__init__.py @@ -138,10 +138,6 @@ class BaseAgent(MesaAgent, MutableMapping, metaclass=MetaAgent): else: self.debug(f"agent dying") self.alive = False - try: - self.model.schedule.remove(self) - except KeyError: - pass return time.Delay(time.INFINITY) def step(self): @@ -222,4 +218,4 @@ class Agent(FSM, EventedAgent, NetworkAgent): from .BassModel import * from .IndependentCascadeModel import * from .SISaModel import * -from .CounterModel import * \ No newline at end of file +from .CounterModel import * diff --git a/soil/time.py b/soil/time.py index 705fec9..e062919 100644 --- a/soil/time.py +++ b/soil/time.py @@ -8,6 +8,7 @@ import logging from inspect import getsource from numbers import Number from textwrap import dedent +import random as random_std from .utils import logger from mesa import Agent as MesaAgent @@ -30,6 +31,7 @@ class Delay: class When: def __init__(self, when): raise Exception("The use of When is deprecated. Use the `Agent.at` and `Agent.delay` methods instead") + class Delta: def __init__(self, delta): raise Exception("The use of Delay is deprecated. Use the `Agent.at` and `Agent.delay` methods instead") @@ -38,58 +40,57 @@ class DeadAgent(Exception): pass -class PQueueActivation(BaseScheduler): +class Event(object): + def __init__(self, when: float, func, order=1): + self.when = when + self.func = func + self.order = order + + def __repr__(self): + return f'Event @ {self.when} - Func: {self.func}' + + def __lt__(self, other): + return (self.when < other.when) or (self.when == other.when and self.order < other.order) + + +class PQueueSchedule: """ - A scheduler which activates each agent with a delay returned by the agent's step method. + A scheduler which activates each function with a delay returned by the function at each step. If no delay is returned, a default of 1 is used. - - In each activation, each agent will update its 'next_time'. """ - def __init__(self, *args, shuffle=True, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, shuffle=True, seed=None, **kwargs): self._queue = [] self._shuffle = shuffle - self.logger = getattr(self.model, "logger", logger).getChild(f"time_{ self.model }") + self.time = 0 + self.steps = 0 + self.random = random_std.Random(seed) self.next_time = self.time - self.agents_by_type = defaultdict(dict) - def add(self, agent: MesaAgent, when=None): - if when is None: - when = self.time - else: - when = float(when) - agent_class = type(agent) - self.agents_by_type[agent_class][agent.unique_id] = agent - super().add(agent) - self.add_callback(agent.step, when) - - def add_callback(self, callback, when=None, replace=False): + def insert(self, when, callback, replace=False): if when is None: when = self.time else: when = float(when) + order = 1 if self._shuffle: - key = (when, self.model.random.random()) - else: - key = when + order = self.random.random() + event = Event(when, callback, order=order) if replace: - heapreplace(self._queue, (key, callback)) + heapreplace(self._queue, event) else: - heappush(self._queue, (key, callback)) + heappush(self._queue, event) - def remove(self, agent): - del self._agents[agent.unique_id] - del self._agents[type(agent)][agent.unique_id] - for i, (key, callback) in enumerate(self._queue): - if callback == agent.step: + def remove(self, callback): + for i, event in enumerate(self._queue): + if callback == event.func: del self._queue[i] break def step(self) -> None: """ - Executes agents in order, one at a time. After each step, - an agent will signal when it wants to be scheduled next. + Executes events in order, one at a time. After each step, + an event will signal when it wants to be scheduled next. """ if self.time == INFINITY: @@ -100,45 +101,40 @@ class PQueueActivation(BaseScheduler): now = self.time while self._queue: - ((when, _id), agent) = self._queue[0] + event = self._queue[0] + when = event.when if when > now: next_time = when break - when = agent.step() or 1 + when = event.func() or 1 if when == INFINITY: heappop(self._queue) continue + when += now - self.add_callback(agent, when, replace=True) + self.insert(when, event.func, replace=True) self.steps += 1 self.time = next_time if next_time == INFINITY: - self.model.running = False self.time = INFINITY return -class TimedActivation(BaseScheduler): - def __init__(self, *args, shuffle=True, **kwargs): - super().__init__(*args, **kwargs) +class Schedule: + def __init__(self, shuffle=True, seed=None, **kwargs): self._queue = deque() self._shuffle = shuffle - self.logger = getattr(self.model, "logger", logger).getChild(f"time_{ self.model }") + self.time = 0 + self.steps = 0 + self.random = random_std.Random(seed) self.next_time = self.time - self.agents_by_type = defaultdict(dict) - def add(self, agent: MesaAgent, when=None): - self.add_callback(agent.step, when) - agent_class = type(agent) - self.agents_by_type[agent_class][agent.unique_id] = agent - super().add(agent) - def _find_loc(self, when=None): if when is None: when = self.time @@ -156,7 +152,7 @@ class TimedActivation(BaseScheduler): self._queue.insert(pos, (when, lst)) return lst - def add_callback(self, func, when=None, replace=False): + def insert(self, when, func, replace=False): lst = self._find_loc(when) lst.append(func) @@ -164,14 +160,17 @@ class TimedActivation(BaseScheduler): lst = self._find_loc(when) lst.extend(funcs) - def remove(self, agent): - del self._agents[agent.unique_id] - del self.agents_by_type[type(agent)][agent.unique_id] + def remove(self, func): + for bucket in self._queue: + for (ix, e) in enumerate(bucket): + if e == func: + bucket.remove(ix) + return def step(self) -> None: """ - Executes agents in order, one at a time. After each step, - an agent will signal when it wants to be scheduled next. + Executes events in order, one at a time. After each step, + an event will signal when it wants to be scheduled next. """ if not self._queue: return @@ -186,7 +185,7 @@ class TimedActivation(BaseScheduler): bucket = self._queue.popleft()[1] if self._shuffle: - self.model.random.shuffle(bucket) + self.random.shuffle(bucket) next_batch = defaultdict(list) for func in bucket: when = func() or 1 @@ -202,10 +201,68 @@ class TimedActivation(BaseScheduler): if self._queue: self.time = self._queue[0][0] else: - self.model.running = False self.time = INFINITY +class InnerActivation(BaseScheduler): + inner_class = Schedule + + def __init__(self, model, shuffle=True, **kwargs): + self.model = model + self.logger = getattr(self.model, "logger", logger).getChild(f"time_{ self.model }") + self._agents = {} + self.agents_by_type = defaultdict(dict) + self.inner = self.inner_class(shuffle=shuffle, seed=self.model._seed) + + @property + def steps(self): + return self.inner.steps + + @property + def time(self): + return self.inner.time + + def add(self, agent: MesaAgent, when=None): + when = when or self.inner.time + self.inner.insert(when, agent.step) + agent_class = type(agent) + self.agents_by_type[agent_class][agent.unique_id] = agent + super().add(agent) + + def remove(self, agent): + del self._agents[agent.unique_id] + del self.agents_by_type[type(agent)][agent.unique_id] + self.inner.remove(agent.step) + + def step(self) -> None: + """ + Executes agents in order, one at a time. After each step, + an agent will signal when it wants to be scheduled next. + """ + self.inner.step() + + +class BucketTimedActivation(InnerActivation): + inner_class = Schedule + + +class PQueueActivation(InnerActivation): + inner_class = PQueueSchedule + + +# Set the bucket implementation as default +try: + from soilent.soilent import BucketScheduler + + class SoilBucketActivation(InnerActivation): + inner_class = BucketScheduler + + TimedActivation = SoilBucketActivation +except ImportError: + TimedActivation = BucketTimedActivation + pass + + class ShuffledTimedActivation(TimedActivation): def __init__(self, *args, **kwargs): super().__init__(*args, shuffle=True, **kwargs) @@ -214,5 +271,3 @@ class ShuffledTimedActivation(TimedActivation): class OrderedTimedActivation(TimedActivation): def __init__(self, *args, **kwargs): super().__init__(*args, shuffle=False, **kwargs) - -