1
0
mirror of https://github.com/gsi-upm/senpy synced 2024-09-21 06:01:43 +00:00
senpy/sentiText/sentitext.py

175 lines
6.0 KiB
Python
Raw Normal View History

2015-10-30 16:58:37 +00:00
import os
import logging
import string
import nltk
import pickle
from sentiwn import SentiWordNet
from nltk.corpus import wordnet as wn
from textblob import TextBlob
from scipy.interpolate import interp1d
from os import path
from senpy.plugins import SentimentPlugin, SenpyPlugin
2016-02-24 16:41:22 +00:00
from senpy.models import Results, Entry, Sentiment
2015-10-30 16:58:37 +00:00
# Module-level logger, named after this module so log records can be filtered per plugin.
logger = logging.getLogger(__name__)
class SentiTextPlugin(SentimentPlugin):
    """Sentence-level sentiment plugin based on WordNet and SentiWordNet.

    The input text is split into sentences, tokenised and POS-tagged. Each
    token is looked up as a Spanish (``lang='spa'``) WordNet lemma. The text
    is also translated from Spanish to English with TextBlob; the synsets of
    the translated words are matched against the synsets of the Spanish
    lemmas, and the matching synsets are scored with SentiWordNet. One
    ``Entry`` with one ``Sentiment`` is produced per sentence.
    """

    def __init__(self, info, *args, **kwargs):
        """Store the plugin id and resolve the resource paths.

        Args:
            info: plugin description mapping. The keys ``'module'``,
                ``'sentiword_path'`` and ``'pos_path'`` are read here; both
                paths are resolved relative to this file's directory.
            *args: forwarded to the parent constructor.
            **kwargs: forwarded to the parent constructor.
        """
        super(SentiTextPlugin, self).__init__(info, *args, **kwargs)
        self.id = info['module']
        base = path.abspath(path.dirname(__file__))
        self.swn_path = path.join(base, info['sentiword_path'])
        self.pos_path = path.join(base, info['pos_path'])
        # Both resources are loaded lazily in activate().
        self._swn = None
        self._pos_tagger = None

    def _load_swn(self):
        """Return a ``SentiWordNet`` instance built from ``self.swn_path``."""
        return SentiWordNet(self.swn_path)

    def _load_pos_tagger(self):
        """Unpickle and return the POS tagger stored at ``self.pos_path``.

        The file is opened in binary mode, which pickle requires on
        Python 3 and on platforms that translate line endings.
        """
        # SECURITY NOTE: pickle.load can execute arbitrary code; the tagger
        # file must come from a trusted source.
        with open(self.pos_path, 'rb') as f:
            return pickle.load(f)

    def activate(self, *args, **kwargs):
        """Load the SentiWordNet data and the POS tagger."""
        self._swn = self._load_swn()
        self._pos_tagger = self._load_pos_tagger()
        logger.info("SentiText plugin is ready to go!")

    def deactivate(self, *args, **kwargs):
        """Log that the plugin is being deactivated."""
        logger.info("SentiText plugin is being deactivated...")

    def _remove_punctuation(self, tokens):
        """Return the tokens that are not found in ``string.punctuation``.

        Note that ``in`` on a string is a substring test, so a token is
        dropped when it is a contiguous substring of ``string.punctuation``
        (every single punctuation character qualifies).
        """
        return [t for t in tokens if t not in string.punctuation]

    def _tokenize(self, text):
        """Split ``text`` into sentences and lower-cased word tokens.

        Returns:
            dict mapping the sentence index to a dict with the keys
            ``'sentence'`` (the original sentence string) and ``'tokens'``
            (lower-cased tokens with punctuation removed).
        """
        data = {}
        for i, sentence in enumerate(nltk.sent_tokenize(text)):
            lowered = [w.lower() for w in nltk.word_tokenize(sentence)]
            data[i] = {
                'sentence': sentence,
                'tokens': self._remove_punctuation(lowered),
            }
        return data

    def _pos(self, tokens):
        """Replace each sentence's token list with the tagger's output.

        ``tokens`` is modified in place and also returned.
        """
        for i in tokens:
            tokens[i]['tokens'] = self._pos_tagger.tag(tokens[i]['tokens'])
        return tokens

    def _compare_synsets(self, synsets, tokens, i):
        """Return the first synset in ``synsets`` shared with sentence ``i``.

        Args:
            synsets: candidate synsets, checked in order.
            tokens: per-sentence data; ``tokens[i]['lemmas']`` maps a word
                to its list of lemmas.
            i: sentence index.

        Returns:
            The first candidate synset equal to the synset of any lemma of
            sentence ``i``, or ``None`` when nothing matches or when the
            sentence index is unknown.
        """
        sentence = tokens.get(i)
        if sentence is None:
            # The translated text may contain more sentences than the
            # original; there is nothing to compare against in that case.
            return None
        # Resolve the lemma synsets once instead of once per candidate.
        known = [
            lemma.synset()
            for lemmas in sentence['lemmas'].values()
            for lemma in lemmas
        ]
        for synset in synsets:
            if synset in known:
                return synset
        return None

    def analyse(self, **params):
        """Compute one sentiment per sentence of ``params['input']``.

        Args:
            **params: request parameters. ``'input'`` is the text to
                analyse and ``'language'`` (default ``"auto"``) is copied
                onto every entry.

        Returns:
            A ``Results`` object whose ``entries`` hold one ``Entry`` per
            sentence. Each entry carries a single ``Sentiment`` with a
            polarity of ``marl:Positive``, ``marl:Negative`` or
            ``marl:Neutral`` and a polarity value in ``[0.0, 1.0]`` rounded
            to two decimals (0.5 means neutral).
        """
        logger.debug("Analysing with params {}".format(params))

        text = params.get("input", None)
        tokens = self._pos(self._tokenize(text))

        # Attach the Spanish WordNet lemmas of every tagged token.
        for i in tokens:
            tokens[i]['lemmas'] = {}
            for w in tokens[i]['tokens']:
                lemmas = wn.lemmas(w[0], lang='spa')
                if len(lemmas) == 0:
                    continue
                tokens[i]['lemmas'][w[0]] = lemmas
        logger.debug("Tokens: {}".format(tokens))

        # type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3.
        text_type = type(u'')
        trans = TextBlob(text_type(text)).translate(from_lang='es', to='en')

        # For each translated word keep the synset (if any) that it shares
        # with the lemmas of the corresponding original sentence.
        useful_synsets = {}
        for s_i, t_s in enumerate(trans.sentences):
            useful_synsets[s_i] = {}
            for t_w in t_s.words:
                synsets = wn.synsets(t_w)
                if len(synsets) == 0:
                    continue
                useful_synsets[s_i][t_w] = self._compare_synsets(
                    synsets, tokens, s_i)
        logger.debug("Synsets used for analysis: {}".format(useful_synsets))

        scores = {}
        for i in tokens:
            scores[i] = {}
            # The original guard here was ``if useful_synsets is None``,
            # which made this loop unreachable. ``.get`` also covers a
            # translation that yields fewer sentences than the original.
            for word, synset in useful_synsets.get(i, {}).items():
                if synset is None:
                    continue
                # NOTE(review): replace(' ', ' ') is a no-op kept from the
                # original; it may have been meant to map '_' to ' '.
                temp_scores = self._swn.get_score(
                    synset.name().split('.')[0].replace(' ', ' '))
                for score in temp_scores:
                    if score['synset'] == synset:
                        t_score = score['pos'] - score['neg']
                        f_score = 'neu'
                        if t_score > 0:
                            f_score = 'pos'
                        elif t_score < 0:
                            f_score = 'neg'
                        score['score'] = f_score
                        scores[i][word] = score
                        break
        logger.debug("All scores (some not used): {}".format(scores))

        lang = params.get("language", "auto")
        response = Results()
        # Maps the [-1, 1] balance onto [0, 1]; built once for all sentences.
        inter = interp1d([-1.0, 1.0], [0.0, 1.0])
        for i in scores:
            n_pos = 0.0
            n_neg = 0.0
            for w in scores[i]:
                if scores[i][w]['score'] == 'pos':
                    n_pos += 1.0
                elif scores[i][w]['score'] == 'neg':
                    n_neg += 1.0

            total = n_pos + n_neg
            if total == 0:
                # No polarised words at all: report a neutral value.
                g_score = 0.5
            else:
                g_score = float(inter((n_pos - n_neg) / total))

            polarity = 'marl:Neutral'
            if g_score > 0.5:
                polarity = 'marl:Positive'
            elif g_score < 0.5:
                polarity = 'marl:Negative'

            entry = Entry(id="Entry" + str(i),
                          nif_isString=tokens[i]['sentence'])
            # The keyword was previously misspelt ``marL__polarityValue``,
            # inconsistent with ``marl__hasPolarity``.
            opinion = Sentiment(
                id="Opinion0" + '_' + str(i),
                marl__hasPolarity=polarity,
                marl__polarityValue=float("{0:.2f}".format(g_score)))
            opinion["prov:wasGeneratedBy"] = self.id
            entry.sentiments = []
            entry.sentiments.append(opinion)
            entry.language = lang
            response.entries.append(entry)
        return response