From ef40bdb545b7c23b86be405177b96758bbca6e57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=2E=20Fernando=20S=C3=A1nchez?= Date: Mon, 10 Apr 2017 16:36:43 +0200 Subject: [PATCH] Replace gevent with tornado Closes #28 Added: * Async test (still missing one that includes the IOLoop) * Async plugin under tests. To manually try async functionalities: ``` senpy -f tests/ ``` --- requirements.txt | 2 +- senpy/__main__.py | 14 +++++++------ senpy/extensions.py | 2 ++ tests/plugins/async_plugin/asyncplugin.py | 21 ++++++++++++++++++++ tests/plugins/async_plugin/asyncplugin.senpy | 8 ++++++++ tests/test_extensions.py | 13 +++++++++++- 6 files changed, 52 insertions(+), 8 deletions(-) create mode 100644 tests/plugins/async_plugin/asyncplugin.py create mode 100644 tests/plugins/async_plugin/asyncplugin.senpy diff --git a/requirements.txt b/requirements.txt index 0dcc894..db3c57b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ Flask>=0.10.1 requests>=2.4.1 -gevent>=1.1rc4 +tornado>=4.4.3 PyLD>=0.6.5 six future diff --git a/senpy/__main__.py b/senpy/__main__.py index dd711e4..4358174 100644 --- a/senpy/__main__.py +++ b/senpy/__main__.py @@ -22,15 +22,16 @@ the server. from flask import Flask from senpy.extensions import Senpy -from gevent.wsgi import WSGIServer -from gevent.monkey import patch_all +from tornado.wsgi import WSGIContainer +from tornado.httpserver import HTTPServer +from tornado.ioloop import IOLoop + + import logging import os import argparse import senpy -patch_all(thread=False) - SERVER_PORT = os.environ.get("PORT", 5000) @@ -92,9 +93,10 @@ def main(): print('Server running on port %s:%d. Ctrl+C to quit' % (args.host, args.port)) if not app.debug: - http_server = WSGIServer((args.host, args.port), app) + http_server = HTTPServer(WSGIContainer(app)) + http_server.listen(args.port, address=args.host) try: - http_server.serve_forever() + IOLoop.instance().start() except KeyboardInterrupt: print('Bye!') http_server.stop() diff --git a/senpy/extensions.py b/senpy/extensions.py index 36e2654..5989603 100644 --- a/senpy/extensions.py +++ b/senpy/extensions.py @@ -303,6 +303,7 @@ class Senpy(object): else: th = Thread(target=act) th.start() + return th def deactivate_plugin(self, plugin_name, sync=False): try: @@ -327,6 +328,7 @@ class Senpy(object): else: th = Thread(target=deact) th.start() + return th @classmethod def validate_info(cls, info): diff --git a/tests/plugins/async_plugin/asyncplugin.py b/tests/plugins/async_plugin/asyncplugin.py new file mode 100644 index 0000000..345ff2d --- /dev/null +++ b/tests/plugins/async_plugin/asyncplugin.py @@ -0,0 +1,21 @@ +from senpy.plugins import AnalysisPlugin + +import multiprocessing + + +class AsyncPlugin(AnalysisPlugin): + def _train(self, process_number): + return process_number + + def _do_async(self, num_processes): + with multiprocessing.Pool(processes=num_processes) as pool: + values = pool.map(self._train, range(num_processes)) + return values + + def activate(self): + self.value = self._do_async(4) + + def analyse_entry(self, entry, params): + values = self._do_async(2) + entry.async_values = values + yield entry diff --git a/tests/plugins/async_plugin/asyncplugin.senpy b/tests/plugins/async_plugin/asyncplugin.senpy new file mode 100644 index 0000000..8c71849 --- /dev/null +++ b/tests/plugins/async_plugin/asyncplugin.senpy @@ -0,0 +1,8 @@ +--- +name: Async +module: asyncplugin +description: I am async +author: "@balkian" +version: '0.1' +async: true +extra_params: {} \ No newline at end of file diff --git a/tests/test_extensions.py b/tests/test_extensions.py index 0523789..067f697 100644 --- a/tests/test_extensions.py +++ b/tests/test_extensions.py @@ -167,7 +167,7 @@ class ExtensionsTest(TestCase): assert len(senpy.plugins) > 1 def test_convert_emotions(self): - self.senpy.activate_all() + self.senpy.activate_all(sync=True) plugin = Plugin({ 'id': 'imaginary', 'onyx:usesEmotionModel': 'emoml:fsre-dimensions' @@ -205,3 +205,14 @@ class ExtensionsTest(TestCase): [plugin, ], params) assert len(r3.entries[0].emotions) == 1 + + # def test_async_plugin(self): + # """ We should accept multiprocessing plugins with async=False""" + # thread1 = self.senpy.activate_plugin("Async", sync=False) + # thread1.join(timeout=1) + # assert len(self.senpy.plugins['Async'].value) == 4 + + # resp = self.senpy.analyse(input='nothing', algorithm='Async') + + # assert len(resp.entries[0].async_values) == 2 + # self.senpy.activate_plugin("Async", sync=True)