Make py3 compatibility explicit

history
J. Fernando Sánchez 7 years ago
parent e8988015e2
commit eafecc9e5e

@ -0,0 +1,3 @@
FROM python:3.5-onbuild
ENTRYPOINT ["python", "-m", "soil"]

@ -1,596 +0,0 @@
from nxsim import BaseNetworkAgent
import numpy as np
import random
import settings
settings.init()
##############################
# Variables initialization #
##############################
def init():
global networkStatus
networkStatus = {} # Dict that will contain the status of every agent in the network
sentimentCorrelationNodeArray=[]
for x in range(0, settings.number_of_nodes):
sentimentCorrelationNodeArray.append({'id':x})
# Initialize agent states. Let's assume everyone is normal.
init_states = [{'id': 0, } for _ in range(settings.number_of_nodes)] # add keys as as necessary, but "id" must always refer to that state category
####################
# Available models #
####################
class BaseBehaviour(BaseNetworkAgent):
def __init__(self, environment=None, agent_id=0, state=()):
super().__init__(environment=environment, agent_id=agent_id, state=state)
self._attrs = {}
@property
def attrs(self):
now = self.env.now
if now not in self._attrs:
self._attrs[now] = {}
return self._attrs[now]
@attrs.setter
def attrs(self, value):
self._attrs[self.env.now] = value
def run(self):
while True:
self.step(self.env.now)
yield self.env.timeout(settings.timeout)
def step(self, now):
networkStatus['agent_%s'% self.id] = self.to_json()
def to_json(self):
final = {}
for stamp, attrs in self._attrs.items():
for a in attrs:
if a not in final:
final[a] = {}
final[a][stamp] = attrs[a]
return final
class ControlModelM2(BaseBehaviour):
#Init infected
init_states[random.randint(0,settings.number_of_nodes-1)] = {'id':1}
init_states[random.randint(0,settings.number_of_nodes-1)] = {'id':1}
# Init beacons
init_states[random.randint(0, settings.number_of_nodes-1)] = {'id': 4}
init_states[random.randint(0, settings.number_of_nodes-1)] = {'id': 4}
def __init__(self, environment=None, agent_id=0, state=()):
super().__init__(environment=environment, agent_id=agent_id, state=state)
self.prob_neutral_making_denier = np.random.normal(settings.prob_neutral_making_denier, settings.standard_variance)
self.prob_infect = np.random.normal(settings.prob_infect, settings.standard_variance)
self.prob_cured_healing_infected = np.random.normal(settings.prob_cured_healing_infected, settings.standard_variance)
self.prob_cured_vaccinate_neutral = np.random.normal(settings.prob_cured_vaccinate_neutral, settings.standard_variance)
self.prob_vaccinated_healing_infected = np.random.normal(settings.prob_vaccinated_healing_infected, settings.standard_variance)
self.prob_vaccinated_vaccinate_neutral = np.random.normal(settings.prob_vaccinated_vaccinate_neutral, settings.standard_variance)
self.prob_generate_anti_rumor = np.random.normal(settings.prob_generate_anti_rumor, settings.standard_variance)
def step(self, now):
if self.state['id'] == 0: #Neutral
self.neutral_behaviour()
elif self.state['id'] == 1: #Infected
self.infected_behaviour()
elif self.state['id'] == 2: #Cured
self.cured_behaviour()
elif self.state['id'] == 3: #Vaccinated
self.vaccinated_behaviour()
elif self.state['id'] == 4: #Beacon-off
self.beacon_off_behaviour()
elif self.state['id'] == 5: #Beacon-on
self.beacon_on_behaviour()
self.attrs['status'] = self.state['id']
super().step(now)
def neutral_behaviour(self):
# Infected
infected_neighbors = self.get_neighboring_agents(state_id=1)
if len(infected_neighbors)>0:
if random.random() < self.prob_neutral_making_denier:
self.state['id'] = 3 # Vaccinated making denier
def infected_behaviour(self):
# Neutral
neutral_neighbors = self.get_neighboring_agents(state_id=0)
for neighbor in neutral_neighbors:
if random.random() < self.prob_infect:
neighbor.state['id'] = 1 # Infected
def cured_behaviour(self):
# Vaccinate
neutral_neighbors = self.get_neighboring_agents(state_id=0)
for neighbor in neutral_neighbors:
if random.random() < self.prob_cured_vaccinate_neutral:
neighbor.state['id'] = 3 # Vaccinated
# Cure
infected_neighbors = self.get_neighboring_agents(state_id=1)
for neighbor in infected_neighbors:
if random.random() < self.prob_cured_healing_infected:
neighbor.state['id'] = 2 # Cured
def vaccinated_behaviour(self):
# Cure
infected_neighbors = self.get_neighboring_agents(state_id=1)
for neighbor in infected_neighbors:
if random.random() < self.prob_cured_healing_infected:
neighbor.state['id'] = 2 # Cured
# Vaccinate
neutral_neighbors = self.get_neighboring_agents(state_id=0)
for neighbor in neutral_neighbors:
if random.random() < self.prob_cured_vaccinate_neutral:
neighbor.state['id'] = 3 # Vaccinated
# Generate anti-rumor
infected_neighbors_2 = self.get_neighboring_agents(state_id=1)
for neighbor in infected_neighbors_2:
if random.random() < self.prob_generate_anti_rumor:
neighbor.state['id'] = 2 # Cured
def beacon_off_behaviour(self):
infected_neighbors = self.get_neighboring_agents(state_id=1)
if len(infected_neighbors) > 0:
self.state['id'] == 5 #Beacon on
def beacon_on_behaviour(self):
# Cure (M2 feature added)
infected_neighbors = self.get_neighboring_agents(state_id=1)
for neighbor in infected_neighbors:
if random.random() < self.prob_generate_anti_rumor:
neighbor.state['id'] = 2 # Cured
neutral_neighbors_infected = neighbor.get_neighboring_agents(state_id=0)
for neighbor in neutral_neighbors_infected:
if random.random() < self.prob_generate_anti_rumor:
neighbor.state['id'] = 3 # Vaccinated
infected_neighbors_infected = neighbor.get_neighboring_agents(state_id=1)
for neighbor in infected_neighbors_infected:
if random.random() < self.prob_generate_anti_rumor:
neighbor.state['id'] = 2 # Cured
# Vaccinate
neutral_neighbors = self.get_neighboring_agents(state_id=0)
for neighbor in neutral_neighbors:
if random.random() < self.prob_cured_vaccinate_neutral:
neighbor.state['id'] = 3 # Vaccinated
class SpreadModelM2(BaseBehaviour):
init_states[random.randint(0,settings.number_of_nodes)] = {'id':1}
init_states[random.randint(0,settings.number_of_nodes)] = {'id':1}
def __init__(self, environment=None, agent_id=0, state=()):
super().__init__(environment=environment, agent_id=agent_id, state=state)
self.prob_neutral_making_denier = np.random.normal(settings.prob_neutral_making_denier, settings.standard_variance)
self.prob_infect = np.random.normal(settings.prob_infect, settings.standard_variance)
self.prob_cured_healing_infected = np.random.normal(settings.prob_cured_healing_infected, settings.standard_variance)
self.prob_cured_vaccinate_neutral = np.random.normal(settings.prob_cured_vaccinate_neutral, settings.standard_variance)
self.prob_vaccinated_healing_infected = np.random.normal(settings.prob_vaccinated_healing_infected, settings.standard_variance)
self.prob_vaccinated_vaccinate_neutral = np.random.normal(settings.prob_vaccinated_vaccinate_neutral, settings.standard_variance)
self.prob_generate_anti_rumor = np.random.normal(settings.prob_generate_anti_rumor, settings.standard_variance)
def step(self, now):
if self.state['id'] == 0: #Neutral
self.neutral_behaviour()
elif self.state['id'] == 1: #Infected
self.infected_behaviour()
elif self.state['id'] == 2: #Cured
self.cured_behaviour()
elif self.state['id'] == 3: #Vaccinated
self.vaccinated_behaviour()
self.attrs['status'] = self.state['id']
super().step(now)
def neutral_behaviour(self):
# Infected
infected_neighbors = self.get_neighboring_agents(state_id=1)
if len(infected_neighbors)>0:
if random.random() < self.prob_neutral_making_denier:
self.state['id'] = 3 # Vaccinated making denier
def infected_behaviour(self):
# Neutral
neutral_neighbors = self.get_neighboring_agents(state_id=0)
for neighbor in neutral_neighbors:
if random.random() < self.prob_infect:
neighbor.state['id'] = 1 # Infected
def cured_behaviour(self):
# Vaccinate
neutral_neighbors = self.get_neighboring_agents(state_id=0)
for neighbor in neutral_neighbors:
if random.random() < self.prob_cured_vaccinate_neutral:
neighbor.state['id'] = 3 # Vaccinated
# Cure
infected_neighbors = self.get_neighboring_agents(state_id=1)
for neighbor in infected_neighbors:
if random.random() < self.prob_cured_healing_infected:
neighbor.state['id'] = 2 # Cured
def vaccinated_behaviour(self):
# Cure
infected_neighbors = self.get_neighboring_agents(state_id=1)
for neighbor in infected_neighbors:
if random.random() < self.prob_cured_healing_infected:
neighbor.state['id'] = 2 # Cured
# Vaccinate
neutral_neighbors = self.get_neighboring_agents(state_id=0)
for neighbor in neutral_neighbors:
if random.random() < self.prob_cured_vaccinate_neutral:
neighbor.state['id'] = 3 # Vaccinated
# Generate anti-rumor
infected_neighbors_2 = self.get_neighboring_agents(state_id=1)
for neighbor in infected_neighbors_2:
if random.random() < self.prob_generate_anti_rumor:
neighbor.state['id'] = 2 # Cured
class SISaModel(BaseBehaviour):
def __init__(self, environment=None, agent_id=0, state=()):
super().__init__(environment=environment, agent_id=agent_id, state=state)
self.neutral_discontent_spon_prob = np.random.normal(settings.neutral_discontent_spon_prob, settings.standard_variance)
self.neutral_discontent_infected_prob = np.random.normal(settings.neutral_discontent_infected_prob,settings.standard_variance)
self.neutral_content_spon_prob = np.random.normal(settings.neutral_content_spon_prob,settings.standard_variance)
self.neutral_content_infected_prob = np.random.normal(settings.neutral_content_infected_prob,settings.standard_variance)
self.discontent_neutral = np.random.normal(settings.discontent_neutral,settings.standard_variance)
self.discontent_content = np.random.normal(settings.discontent_content,settings.variance_d_c)
self.content_discontent = np.random.normal(settings.content_discontent,settings.variance_c_d)
self.content_neutral = np.random.normal(settings.content_neutral,settings.standard_variance)
def step(self, now):
if self.state['id'] == 0:
self.neutral_behaviour()
if self.state['id'] == 1:
self.discontent_behaviour()
if self.state['id'] == 2:
self.content_behaviour()
self.attrs['status'] = self.state['id']
super().step(now)
def neutral_behaviour(self):
#Spontaneus effects
if random.random() < self.neutral_discontent_spon_prob:
self.state['id'] = 1
if random.random() < self.neutral_content_spon_prob:
self.state['id'] = 2
#Infected
discontent_neighbors = self.get_neighboring_agents(state_id=1)
if random.random() < len(discontent_neighbors)*self.neutral_discontent_infected_prob:
self.state['id'] = 1
content_neighbors = self.get_neighboring_agents(state_id=2)
if random.random() < len(content_neighbors)*self.neutral_content_infected_prob:
self.state['id'] = 2
def discontent_behaviour(self):
#Healing
if random.random() < self.discontent_neutral:
self.state['id'] = 0
#Superinfected
content_neighbors = self.get_neighboring_agents(state_id=2)
if random.random() < len(content_neighbors)*self.discontent_content:
self.state['id'] = 2
def content_behaviour(self):
#Healing
if random.random() < self.content_neutral:
self.state['id'] = 0
#Superinfected
discontent_neighbors = self.get_neighboring_agents(state_id=1)
if random.random() < len(discontent_neighbors)*self.content_discontent:
self.state['id'] = 1
class BigMarketModel(BaseBehaviour):
def __init__(self, environment=None, agent_id=0, state=()):
super().__init__(environment=environment, agent_id=agent_id, state=state)
self.enterprises = settings.enterprises
self.type = ""
self.number_of_enterprises = len(settings.enterprises)
if self.id < self.number_of_enterprises: #Enterprises
self.state['id']=self.id
self.type="Enterprise"
self.tweet_probability = settings.tweet_probability_enterprises[self.id]
else: #normal users
self.state['id']=self.number_of_enterprises
self.type="User"
self.tweet_probability = settings.tweet_probability_users
self.tweet_relevant_probability = settings.tweet_relevant_probability
self.tweet_probability_about = settings.tweet_probability_about #List
self.sentiment_about = settings.sentiment_about #List
def step(self, now):
if(self.id < self.number_of_enterprises): # Ennterprise
self.enterpriseBehaviour()
else: # Usuario
self.userBehaviour()
for i in range(self.number_of_enterprises): # So that it never is set to 0 if there are not changes (logs)
self.attrs['sentiment_enterprise_%s'% self.enterprises[i]] = self.sentiment_about[i]
super().step(now)
def enterpriseBehaviour(self):
if random.random()< self.tweet_probability: #Tweets
aware_neighbors = self.get_neighboring_agents(state_id=self.number_of_enterprises) #Nodes neighbour users
for x in aware_neighbors:
if random.uniform(0,10) < 5:
x.sentiment_about[self.id] += 0.1 #Increments for enterprise
else:
x.sentiment_about[self.id] -= 0.1 #Decrements for enterprise
# Establecemos limites
if x.sentiment_about[self.id] > 1:
x.sentiment_about[self.id] = 1
if x.sentiment_about[self.id]< -1:
x.sentiment_about[self.id] = -1
x.attrs['sentiment_enterprise_%s'% self.enterprises[self.id]] = x.sentiment_about[self.id]
def userBehaviour(self):
if random.random() < self.tweet_probability: #Tweets
if random.random() < self.tweet_relevant_probability: #Tweets something relevant
#Tweet probability per enterprise
for i in range(self.number_of_enterprises):
random_num = random.random()
if random_num < self.tweet_probability_about[i]:
#The condition is fulfilled, sentiments are evaluated towards that enterprise
if self.sentiment_about[i] < 0:
#NEGATIVO
self.userTweets("negative",i)
elif self.sentiment_about[i] == 0:
#NEUTRO
pass
else:
#POSITIVO
self.userTweets("positive",i)
def userTweets(self,sentiment,enterprise):
aware_neighbors = self.get_neighboring_agents(state_id=self.number_of_enterprises) #Nodes neighbours users
for x in aware_neighbors:
if sentiment == "positive":
x.sentiment_about[enterprise] +=0.003
elif sentiment == "negative":
x.sentiment_about[enterprise] -=0.003
else:
pass
# Establecemos limites
if x.sentiment_about[enterprise] > 1:
x.sentiment_about[enterprise] = 1
if x.sentiment_about[enterprise] < -1:
x.sentiment_about[enterprise] = -1
x.attrs['sentiment_enterprise_%s'% self.enterprises[enterprise]] = x.sentiment_about[enterprise]
class SentimentCorrelationModel(BaseBehaviour):
def __init__(self, environment=None, agent_id=0, state=()):
super().__init__(environment=environment, agent_id=agent_id, state=state)
self.outside_effects_prob = settings.outside_effects_prob
self.anger_prob = settings.anger_prob
self.joy_prob = settings.joy_prob
self.sadness_prob = settings.sadness_prob
self.disgust_prob = settings.disgust_prob
self.time_awareness=[]
for i in range(4): #In this model we have 4 sentiments
self.time_awareness.append(0) #0-> Anger, 1-> joy, 2->sadness, 3 -> disgust
sentimentCorrelationNodeArray[self.id][self.env.now]=0
def step(self, now):
self.behaviour()
super().step(now)
def behaviour(self):
angry_neighbors_1_time_step=[]
joyful_neighbors_1_time_step=[]
sad_neighbors_1_time_step=[]
disgusted_neighbors_1_time_step=[]
angry_neighbors = self.get_neighboring_agents(state_id=1)
for x in angry_neighbors:
if x.time_awareness[0] > (self.env.now-500):
angry_neighbors_1_time_step.append(x)
num_neighbors_angry = len(angry_neighbors_1_time_step)
joyful_neighbors = self.get_neighboring_agents(state_id=2)
for x in joyful_neighbors:
if x.time_awareness[1] > (self.env.now-500):
joyful_neighbors_1_time_step.append(x)
num_neighbors_joyful = len(joyful_neighbors_1_time_step)
sad_neighbors = self.get_neighboring_agents(state_id=3)
for x in sad_neighbors:
if x.time_awareness[2] > (self.env.now-500):
sad_neighbors_1_time_step.append(x)
num_neighbors_sad = len(sad_neighbors_1_time_step)
disgusted_neighbors = self.get_neighboring_agents(state_id=4)
for x in disgusted_neighbors:
if x.time_awareness[3] > (self.env.now-500):
disgusted_neighbors_1_time_step.append(x)
num_neighbors_disgusted = len(disgusted_neighbors_1_time_step)
anger_prob= settings.anger_prob+(len(angry_neighbors_1_time_step)*settings.anger_prob)
joy_prob= settings.joy_prob+(len(joyful_neighbors_1_time_step)*settings.joy_prob)
sadness_prob = settings.sadness_prob+(len(sad_neighbors_1_time_step)*settings.sadness_prob)
disgust_prob = settings.disgust_prob+(len(disgusted_neighbors_1_time_step)*settings.disgust_prob)
outside_effects_prob= settings.outside_effects_prob
num = random.random()
if(num<outside_effects_prob):
self.state['id'] = random.randint(1,4)
sentimentCorrelationNodeArray[self.id][self.env.now]=self.state['id'] #It is stored when it has been infected for the dynamic network
self.time_awareness[self.state['id']-1] = self.env.now
self.attrs['sentiment'] = self.state['id']
if(num<anger_prob):
self.state['id'] = 1
sentimentCorrelationNodeArray[self.id][self.env.now]=1
self.time_awareness[self.state['id']-1] = self.env.now
elif (num<joy_prob+anger_prob and num>anger_prob):
self.state['id'] = 2
sentimentCorrelationNodeArray[self.id][self.env.now]=2
self.time_awareness[self.state['id']-1] = self.env.now
elif (num<sadness_prob+anger_prob+joy_prob and num>joy_prob+anger_prob):
self.state['id'] = 3
sentimentCorrelationNodeArray[self.id][self.env.now]=3
self.time_awareness[self.state['id']-1] = self.env.now
elif (num<disgust_prob+sadness_prob+anger_prob+joy_prob and num>sadness_prob+anger_prob+joy_prob):
self.state['id'] = 4
sentimentCorrelationNodeArray[self.id][self.env.now]=4
self.time_awareness[self.state['id']-1] = self.env.now
self.attrs['sentiment'] = self.state['id']
class BassModel(BaseBehaviour):
def __init__(self, environment=None, agent_id=0, state=()):
super().__init__(environment=environment, agent_id=agent_id, state=state)
self.innovation_prob = settings.innovation_prob
self.imitation_prob = settings.imitation_prob
sentimentCorrelationNodeArray[self.id][self.env.now]=0
def step(self, now):
self.behaviour()
super().step(now)
def behaviour(self):
#Outside effects
if random.random() < settings.innovation_prob:
if self.state['id'] == 0:
self.state['id'] = 1
sentimentCorrelationNodeArray[self.id][self.env.now]=1
else:
pass
self.attrs['status'] = self.state['id']
return
#Imitation effects
if self.state['id'] == 0:
aware_neighbors = self.get_neighboring_agents(state_id=1)
num_neighbors_aware = len(aware_neighbors)
if random.random() < (settings.imitation_prob*num_neighbors_aware):
self.state['id'] = 1
sentimentCorrelationNodeArray[self.id][self.env.now]=1
else:
pass
self.attrs['status'] = self.state['id']
class IndependentCascadeModel(BaseBehaviour):
def __init__(self, environment=None, agent_id=0, state=()):
super().__init__(environment=environment, agent_id=agent_id, state=state)
self.innovation_prob = settings.innovation_prob
self.imitation_prob = settings.imitation_prob
self.time_awareness = 0
sentimentCorrelationNodeArray[self.id][self.env.now]=0
def step(self,now):
self.behaviour()
super().step(now)
def behaviour(self):
aware_neighbors_1_time_step=[]
#Outside effects
if random.random() < settings.innovation_prob:
if self.state['id'] == 0:
self.state['id'] = 1
sentimentCorrelationNodeArray[self.id][self.env.now]=1
self.time_awareness = self.env.now #To know when they have been infected
else:
pass
self.attrs['status'] = self.state['id']
return
#Imitation effects
if self.state['id'] == 0:
aware_neighbors = self.get_neighboring_agents(state_id=1)
for x in aware_neighbors:
if x.time_awareness == (self.env.now-1):
aware_neighbors_1_time_step.append(x)
num_neighbors_aware = len(aware_neighbors_1_time_step)
if random.random() < (settings.imitation_prob*num_neighbors_aware):
self.state['id'] = 1
sentimentCorrelationNodeArray[self.id][self.env.now]=1
else:
pass
self.attrs['status'] = self.state['id']
return

@ -28,7 +28,16 @@ setup(
download_url='https://github.com/gsi-upm/soil/archive/{}.tar.gz'.format(
__version__),
keywords=['agent', 'social', 'simulator'],
classifiers=[],
classifiers=[
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'Intended Audience :: End Users/Desktop',
'Intended Audience :: Developers',
'License :: OSI Approved :: Apache Software License',
'Operating System :: MacOS :: MacOS X',
'Operating System :: Microsoft :: Windows',
'Operating System :: POSIX',
'Programming Language :: Python :: 3'],
install_requires=install_reqs,
tests_require=test_reqs,
setup_requires=['pytest-runner', ],

@ -6,7 +6,7 @@ network_params:
generator: barabasi_albert_graph
n: 100
m: 2
agent_distribution:
network_agents:
- agent_type: ControlModelM2
weight: 0.1
state:
@ -30,11 +30,11 @@ max_time: 50
num_trials: 2
network_params:
generator: erdos_renyi_graph
n: 10000
n: 1000
p: 0.05
#other_agents:
# - agent_type: DrawingAgent
agent_distribution:
network_agents:
- agent_type: SISaModel
weight: 1
state:

@ -2,7 +2,7 @@ import importlib
import sys
import os
__version__ = "0.9.4"
__version__ = "0.9.6"
try:
basestring

@ -0,0 +1,32 @@
import importlib
import sys
import argparse
from . import simulation
def main():
parser = argparse.ArgumentParser(description='Run a SOIL simulation')
parser.add_argument('file', type=str,
nargs="?",
default='simulation.yml',
help='python module containing the simulation configuration.')
parser.add_argument('--module', '-m', type=str,
help='file containing the code of any custom agents.')
parser.add_argument('--dry-run', '--dry', action='store_true',
help='Do not store the results of the simulation.')
parser.add_argument('--output', '-o', type=str,
help='folder to write results to. It defaults to the current directory.')
args = parser.parse_args()
if args.module:
sys.path.append(os.getcwd())
importlib.import_module(args.module)
print('Loading config file: {}'.format(args.file, args.output))
simulation.run_from_config(args.file, dump=not args.dry_run, results_dir=args.output)
if __name__ == '__main__':
main()

@ -1,123 +0,0 @@
import nxsim
from collections import OrderedDict
from copy import deepcopy
import json
from functools import wraps
class BaseAgent(nxsim.BaseAgent):
"""
A special simpy BaseAgent that keeps track of its state history.
"""
def __init__(self, *args, **kwargs):
self._history = OrderedDict()
self._neighbors = None
super().__init__(*args, **kwargs)
self._history[None] = deepcopy(self.state)
@property
def now(self):
try:
return self.env.now
except AttributeError:
# No environment
return None
def run(self):
while True:
res = self.step()
self._history[self.env.now] = deepcopy(self.state)
yield res or self.env.timeout(self.env.interval)
def step(self):
pass
def to_json(self):
return json.dumps(self._history)
class NetworkAgent(BaseAgent, nxsim.BaseNetworkAgent):
def count_agents(self, state_id=None, limit_neighbors=False):
if limit_neighbors:
agents = self.global_topology.neighbors(self.id)
else:
agents = self.global_topology.nodes()
count = 0
for agent in agents:
if state_id and state_id != self.global_topology.node[agent]['agent'].state['id']:
continue
count += 1
return count
def count_neighboring_agents(self, state_id=None):
return self.count_agents(state_id, limit_neighbors=True)
def state(func):
@wraps(func)
def func_wrapper(self):
when = None
next_state = func(self)
try:
next_state, when = next_state
except TypeError:
pass
if next_state:
try:
self.state['id'] = next_state.id
except AttributeError:
raise NotImplemented('State id %s is not valid.' % next_state)
return when
func_wrapper.id = func.__name__
func_wrapper.is_default = False
return func_wrapper
def default_state(func):
func.is_default = True
return func
class MetaFSM(type):
def __init__(cls, name, bases, nmspc):
super(MetaFSM, cls).__init__(name, bases, nmspc)
states = {}
# Re-use states from inherited classes
default_state = None
for i in bases:
if isinstance(i, MetaFSM):
for state_id, state in i.states.items():
if state.is_default:
default_state = state
states[state_id] = state
# Add new states
for name, func in nmspc.items():
if hasattr(func, 'id'):
if func.is_default:
default_state = func
states[func.id] = func
cls.default_state = default_state
cls.states = states
class FSM(BaseAgent, metaclass=MetaFSM):
def __init__(self, *args, **kwargs):
super(FSM, self).__init__(*args, **kwargs)
if 'id' not in self.state:
self.state['id'] = self.default_state.id
def step(self):
if 'id' in self.state:
next_state = self.state['id']
elif self.default_state:
next_state = self.default_state.id
else:
raise Exception('{} has no valid state id or default state'.format(self))
if next_state not in self.states:
raise Exception('{} is not a valid id for {}'.format(next_state, self))
self.states[next_state](self)

@ -1,9 +1,9 @@
import random
import numpy as np
from . import FSM, state
from . import FSM, NetworkAgent, state
class SISaModel(FSM):
class SISaModel(FSM, NetworkAgent):
"""
Settings:
neutral_discontent_spon_prob

Loading…
Cancel
Save