mirror of https://github.com/gsi-upm/soil
Compare commits
6 Commits
2f5e5d0a74
...
d3cee18635
Author | SHA1 | Date |
---|---|---|
J. Fernando Sánchez | d3cee18635 | 2 years ago |
J. Fernando Sánchez | 9a7b62e88e | 2 years ago |
J. Fernando Sánchez | c09e480d37 | 2 years ago |
J. Fernando Sánchez | b2d48cb4df | 2 years ago |
J. Fernando Sánchez | a1262edd2a | 2 years ago |
J. Fernando Sánchez | cbbaf73538 | 2 years ago |
@ -1 +1 @@
|
|||||||
0.30.0rc2
|
0.30.0rc3
|
@ -1,57 +1,77 @@
|
|||||||
from . import BaseAgent
|
from . import BaseAgent
|
||||||
from ..events import Message, Tell, Ask, Reply, TimedOut
|
from ..events import Message, Tell, Ask, TimedOut
|
||||||
from ..time import Cond
|
from ..time import BaseCond
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from collections import deque
|
from collections import deque
|
||||||
|
|
||||||
|
|
||||||
class Evented(BaseAgent):
|
class ReceivedOrTimeout(BaseCond):
|
||||||
|
def __init__(
|
||||||
|
self, agent, expiration=None, timeout=None, check=True, ignore=False, **kwargs
|
||||||
|
):
|
||||||
|
if expiration is None:
|
||||||
|
if timeout is not None:
|
||||||
|
expiration = agent.now + timeout
|
||||||
|
self.expiration = expiration
|
||||||
|
self.ignore = ignore
|
||||||
|
self.check = check
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
|
||||||
|
def expired(self, time):
|
||||||
|
return self.expiration and self.expiration < time
|
||||||
|
|
||||||
|
def ready(self, agent, time):
|
||||||
|
return len(agent._inbox) or self.expired(time)
|
||||||
|
|
||||||
|
def return_value(self, agent):
|
||||||
|
if not self.ignore and self.expired(agent.now):
|
||||||
|
raise TimedOut("No messages received")
|
||||||
|
if self.check:
|
||||||
|
agent.check_messages()
|
||||||
|
return None
|
||||||
|
|
||||||
|
def schedule_next(self, time, delta, first=False):
|
||||||
|
if self._delta is not None:
|
||||||
|
delta = self._delta
|
||||||
|
return (time + delta, self)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"ReceivedOrTimeout(expires={self.expiration})"
|
||||||
|
|
||||||
|
|
||||||
|
class EventedAgent(BaseAgent):
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self._inbox = deque()
|
self._inbox = deque()
|
||||||
self._received = 0
|
|
||||||
self._processed = 0
|
self._processed = 0
|
||||||
|
|
||||||
|
|
||||||
def on_receive(self, *args, **kwargs):
|
def on_receive(self, *args, **kwargs):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def received(self, expiration=None, timeout=None):
|
def received(self, *args, **kwargs):
|
||||||
current = self._received
|
return ReceivedOrTimeout(self, *args, **kwargs)
|
||||||
if expiration is None:
|
|
||||||
expiration = float('inf') if timeout is None else self.now + timeout
|
|
||||||
|
|
||||||
if expiration < self.now:
|
|
||||||
raise ValueError("Invalid expiration time")
|
|
||||||
|
|
||||||
def ready(agent):
|
|
||||||
return agent._received > current or agent.now >= expiration
|
|
||||||
|
|
||||||
def value(agent):
|
def tell(self, msg, sender=None):
|
||||||
if agent.now > expiration:
|
self._inbox.append(Tell(timestamp=self.now, payload=msg, sender=sender))
|
||||||
raise TimedOut("No message received")
|
|
||||||
|
|
||||||
c = Cond(func=ready, return_func=value)
|
def ask(self, msg, timeout=None, **kwargs):
|
||||||
c._checked = True
|
ask = Ask(timestamp=self.now, payload=msg, sender=self)
|
||||||
return c
|
|
||||||
|
|
||||||
def tell(self, msg, sender):
|
|
||||||
self._received += 1
|
|
||||||
self._inbox.append(Tell(payload=msg, sender=sender))
|
|
||||||
|
|
||||||
def ask(self, msg, timeout=None):
|
|
||||||
self._received += 1
|
|
||||||
ask = Ask(payload=msg)
|
|
||||||
self._inbox.append(ask)
|
self._inbox.append(ask)
|
||||||
expiration = float('inf') if timeout is None else self.now + timeout
|
expiration = float("inf") if timeout is None else self.now + timeout
|
||||||
return ask.replied(expiration=expiration)
|
return ask.replied(expiration=expiration, **kwargs)
|
||||||
|
|
||||||
def check_messages(self):
|
def check_messages(self):
|
||||||
|
changed = False
|
||||||
while self._inbox:
|
while self._inbox:
|
||||||
msg = self._inbox.popleft()
|
msg = self._inbox.popleft()
|
||||||
self._processed += 1
|
self._processed += 1
|
||||||
if msg.expired(self.now):
|
if msg.expired(self.now):
|
||||||
continue
|
continue
|
||||||
|
changed = True
|
||||||
reply = self.on_receive(msg.payload, sender=msg.sender)
|
reply = self.on_receive(msg.payload, sender=msg.sender)
|
||||||
if isinstance(msg, Ask):
|
if isinstance(msg, Ask):
|
||||||
msg.reply = reply
|
msg.reply = reply
|
||||||
|
return changed
|
||||||
|
|
||||||
|
|
||||||
|
Evented = EventedAgent
|
||||||
|
Loading…
Reference in New Issue