mirror of
https://github.com/gsi-upm/soil
synced 2024-11-22 19:22:29 +00:00
209 lines
8.5 KiB
Python
209 lines
8.5 KiB
Python
|
import random
|
||
|
import networkx as nx
|
||
|
from soil.agents import Geo, NetworkAgent, FSM, state, default_state
|
||
|
from soil import Environment
|
||
|
|
||
|
|
||
|
class TerroristSpreadModel(FSM, Geo):
|
||
|
"""
|
||
|
Settings:
|
||
|
information_spread_intensity
|
||
|
|
||
|
terrorist_additional_influence
|
||
|
|
||
|
min_vulnerability (optional else zero)
|
||
|
|
||
|
max_vulnerability
|
||
|
|
||
|
prob_interaction
|
||
|
"""
|
||
|
|
||
|
def __init__(self, environment=None, agent_id=0, state=()):
|
||
|
super().__init__(environment=environment, agent_id=agent_id, state=state)
|
||
|
|
||
|
self.information_spread_intensity = environment.environment_params['information_spread_intensity']
|
||
|
self.terrorist_additional_influence = environment.environment_params['terrorist_additional_influence']
|
||
|
self.prob_interaction = environment.environment_params['prob_interaction']
|
||
|
|
||
|
if self['id'] == self.civilian.id: # Civilian
|
||
|
self.mean_belief = random.uniform(0.00, 0.5)
|
||
|
elif self['id'] == self.terrorist.id: # Terrorist
|
||
|
self.mean_belief = random.uniform(0.8, 1.00)
|
||
|
elif self['id'] == self.leader.id: # Leader
|
||
|
self.mean_belief = 1.00
|
||
|
else:
|
||
|
raise Exception('Invalid state id: {}'.format(self['id']))
|
||
|
|
||
|
if 'min_vulnerability' in environment.environment_params:
|
||
|
self.vulnerability = random.uniform( environment.environment_params['min_vulnerability'], environment.environment_params['max_vulnerability'] )
|
||
|
else :
|
||
|
self.vulnerability = random.uniform( 0, environment.environment_params['max_vulnerability'] )
|
||
|
|
||
|
|
||
|
@state
|
||
|
def civilian(self):
|
||
|
neighbours = list(self.get_neighboring_agents(agent_type=TerroristSpreadModel))
|
||
|
if len(neighbours) > 0:
|
||
|
# Only interact with some of the neighbors
|
||
|
interactions = list(n for n in neighbours if random.random() <= self.prob_interaction)
|
||
|
influence = sum( self.degree(i) for i in interactions )
|
||
|
mean_belief = sum( i.mean_belief * self.degree(i) / influence for i in interactions )
|
||
|
mean_belief = mean_belief * self.information_spread_intensity + self.mean_belief * ( 1 - self.information_spread_intensity )
|
||
|
self.mean_belief = mean_belief * self.vulnerability + self.mean_belief * ( 1 - self.vulnerability )
|
||
|
|
||
|
if self.mean_belief >= 0.8:
|
||
|
return self.terrorist
|
||
|
|
||
|
@state
|
||
|
def leader(self):
|
||
|
self.mean_belief = self.mean_belief ** ( 1 - self.terrorist_additional_influence )
|
||
|
for neighbour in self.get_neighboring_agents(state_id=[self.terrorist.id, self.leader.id]):
|
||
|
if self.betweenness(neighbour) > self.betweenness(self):
|
||
|
return self.terrorist
|
||
|
|
||
|
@state
|
||
|
def terrorist(self):
|
||
|
neighbours = self.get_agents(state_id=[self.terrorist.id, self.leader.id],
|
||
|
agent_type=TerroristSpreadModel,
|
||
|
limit_neighbors=True)
|
||
|
if len(neighbours) > 0:
|
||
|
influence = sum( self.degree(n) for n in neighbours )
|
||
|
mean_belief = sum( n.mean_belief * self.degree(n) / influence for n in neighbours )
|
||
|
mean_belief = mean_belief * self.vulnerability + self.mean_belief * ( 1 - self.vulnerability )
|
||
|
self.mean_belief = self.mean_belief ** ( 1 - self.terrorist_additional_influence )
|
||
|
|
||
|
# Check if there are any leaders in the group
|
||
|
leaders = list(filter(lambda x: x.state.id == self.leader.id, neighbours))
|
||
|
if not leaders:
|
||
|
# Check if this is the potential leader
|
||
|
# Stop once it's found. Otherwise, set self as leader
|
||
|
for neighbour in neighbours:
|
||
|
if self.betweenness(self) < self.betweenness(neighbour):
|
||
|
return
|
||
|
return self.leader
|
||
|
|
||
|
|
||
|
class TrainingAreaModel(FSM, Geo):
|
||
|
"""
|
||
|
Settings:
|
||
|
training_influence
|
||
|
|
||
|
min_vulnerability
|
||
|
|
||
|
Requires TerroristSpreadModel.
|
||
|
"""
|
||
|
|
||
|
def __init__(self, environment=None, agent_id=0, state=()):
|
||
|
super().__init__(environment=environment, agent_id=agent_id, state=state)
|
||
|
self.training_influence = environment.environment_params['training_influence']
|
||
|
if 'min_vulnerability' in environment.environment_params:
|
||
|
self.min_vulnerability = environment.environment_params['min_vulnerability']
|
||
|
else: self.min_vulnerability = 0
|
||
|
|
||
|
@default_state
|
||
|
@state
|
||
|
def terrorist(self):
|
||
|
for neighbour in self.get_neighboring_agents(agent_type=TerroristSpreadModel):
|
||
|
if neighbour.vulnerability > self.min_vulnerability:
|
||
|
neighbour.vulnerability = neighbour.vulnerability ** ( 1 - self.training_influence )
|
||
|
|
||
|
|
||
|
class HavenModel(FSM, Geo):
|
||
|
"""
|
||
|
Settings:
|
||
|
haven_influence
|
||
|
|
||
|
min_vulnerability
|
||
|
|
||
|
max_vulnerability
|
||
|
|
||
|
Requires TerroristSpreadModel.
|
||
|
"""
|
||
|
|
||
|
def __init__(self, environment=None, agent_id=0, state=()):
|
||
|
super().__init__(environment=environment, agent_id=agent_id, state=state)
|
||
|
self.haven_influence = environment.environment_params['haven_influence']
|
||
|
if 'min_vulnerability' in environment.environment_params:
|
||
|
self.min_vulnerability = environment.environment_params['min_vulnerability']
|
||
|
else: self.min_vulnerability = 0
|
||
|
self.max_vulnerability = environment.environment_params['max_vulnerability']
|
||
|
|
||
|
def get_occupants(self, **kwargs):
|
||
|
return self.get_neighboring_agents(agent_type=TerroristSpreadModel, **kwargs)
|
||
|
|
||
|
@state
|
||
|
def civilian(self):
|
||
|
civilians = self.get_occupants(state_id=self.civilian.id)
|
||
|
if not civilians:
|
||
|
return self.terrorist
|
||
|
|
||
|
for neighbour in self.get_occupants():
|
||
|
if neighbour.vulnerability > self.min_vulnerability:
|
||
|
neighbour.vulnerability = neighbour.vulnerability * ( 1 - self.haven_influence )
|
||
|
return self.civilian
|
||
|
|
||
|
@state
|
||
|
def terrorist(self):
|
||
|
for neighbour in self.get_occupants():
|
||
|
if neighbour.vulnerability < self.max_vulnerability:
|
||
|
neighbour.vulnerability = neighbour.vulnerability ** ( 1 - self.haven_influence )
|
||
|
return self.terrorist
|
||
|
|
||
|
|
||
|
class TerroristNetworkModel(TerroristSpreadModel):
|
||
|
"""
|
||
|
Settings:
|
||
|
sphere_influence
|
||
|
|
||
|
vision_range
|
||
|
|
||
|
weight_social_distance
|
||
|
|
||
|
weight_link_distance
|
||
|
"""
|
||
|
|
||
|
def __init__(self, environment=None, agent_id=0, state=()):
|
||
|
super().__init__(environment=environment, agent_id=agent_id, state=state)
|
||
|
|
||
|
self.vision_range = environment.environment_params['vision_range']
|
||
|
self.sphere_influence = environment.environment_params['sphere_influence']
|
||
|
self.weight_social_distance = environment.environment_params['weight_social_distance']
|
||
|
self.weight_link_distance = environment.environment_params['weight_link_distance']
|
||
|
|
||
|
@state
|
||
|
def terrorist(self):
|
||
|
self.update_relationships()
|
||
|
return super().terrorist()
|
||
|
|
||
|
@state
|
||
|
def leader(self):
|
||
|
self.update_relationships()
|
||
|
return super().leader()
|
||
|
|
||
|
def update_relationships(self):
|
||
|
if self.count_neighboring_agents(state_id=self.civilian.id) == 0:
|
||
|
close_ups = set(self.geo_search(radius=self.vision_range, agent_type=TerroristNetworkModel))
|
||
|
step_neighbours = set(self.ego_search(self.sphere_influence, agent_type=TerroristNetworkModel, center=False))
|
||
|
neighbours = set(agent.id for agent in self.get_neighboring_agents(agent_type=TerroristNetworkModel))
|
||
|
search = (close_ups | step_neighbours) - neighbours
|
||
|
for agent in self.get_agents(search):
|
||
|
social_distance = 1 / self.shortest_path_length(agent.id)
|
||
|
spatial_proximity = ( 1 - self.get_distance(agent.id) )
|
||
|
prob_new_interaction = self.weight_social_distance * social_distance + self.weight_link_distance * spatial_proximity
|
||
|
if agent['id'] == agent.civilian.id and random.random() < prob_new_interaction:
|
||
|
self.add_edge(agent)
|
||
|
break
|
||
|
|
||
|
def get_distance(self, target):
|
||
|
source_x, source_y = nx.get_node_attributes(self.global_topology, 'pos')[self.id]
|
||
|
target_x, target_y = nx.get_node_attributes(self.global_topology, 'pos')[target]
|
||
|
dx = abs( source_x - target_x )
|
||
|
dy = abs( source_y - target_y )
|
||
|
return ( dx ** 2 + dy ** 2 ) ** ( 1 / 2 )
|
||
|
|
||
|
def shortest_path_length(self, target):
|
||
|
try:
|
||
|
return nx.shortest_path_length(self.global_topology, self.id, target)
|
||
|
except nx.NetworkXNoPath:
|
||
|
return float('inf')
|