mirror of https://github.com/balkian/bitter.git synced 2025-10-25 12:48:23 +00:00

33 Commits

Author SHA1 Message Date
J. Fernando Sánchez
ea848f1a78 Version 0.10.3
* Ability to include "optional" fields from tweets (e.g., retweeted_status).
* Optional caching (for very large datasets)
2020-06-24 11:01:02 +02:00
J. Fernando Sánchez
030c41b826 Changes to user and tweet search: Cache by default
* Improved printing of credential limits
* Tweet and user searchers cache by default. Write has been changed to dry_run
2020-01-07 20:36:19 +01:00
J. Fernando Sánchez
bba73091e4 Fixes for serialize
Some commands in the CLI did not properly include serialize (get_tweet,
get_users and search)
2020-01-07 19:36:18 +01:00
J. Fernando Sánchez
80b58541e7 Add serialize 2019-11-12 17:15:26 +01:00
J. Fernando Sánchez
40a8b45231 Fix concurrency issue
Download_list sometimes failed with:
BrokenPipeError: [Errno 32] Broken pipe
2019-09-20 13:39:51 +02:00
J. Fernando Sánchez
fadeced761 Fix tests py2 2019-09-19 12:18:56 +02:00
J. Fernando Sánchez
bdb844d75f Fix compatibility click >=7 2019-09-19 11:45:12 +02:00
J. Fernando Sánchez
653487e2d7 Improve download_list 2019-04-30 19:15:15 +02:00
J. Fernando Sánchez
02aec5eefa Fix bug user_ids
Add number of failed downloads to the output.
Add flag to retry previously failed downloads.
2018-09-16 12:20:41 +02:00
J. Fernando Sánchez
e6b08c4ffb Fix bug user ids 2018-08-30 15:57:49 +02:00
J. Fernando Sánchez
311b972b27 Fix typo and bump to version 0.9.1 2018-08-21 13:02:58 +02:00
J. Fernando Sánchez
7724967285 Add filter-edges 2018-08-21 12:57:03 +02:00
J. Fernando Sánchez
dd662acd22 Fix pip installation 2018-08-21 12:54:25 +02:00
J. Fernando Sánchez
5199d5b5aa Improve CLI. Add credentials 2018-03-20 13:29:18 +01:00
J. Fernando Sánchez
6259013978 Compose and bug fixes 2018-03-19 14:36:25 +01:00
J. Fernando Sánchez
53bb7edabc Add sh scripts 2018-03-19 14:35:07 +01:00
J. Fernando Sánchez
57eb73b53b Fix README 2017-12-28 18:13:58 +01:00
J. Fernando Sánchez
7c829ee102 Fix py2 compatibility 2017-12-20 16:51:53 +01:00
J. Fernando Sánchez
27bc3557b2 Fixed JSON regression 2017-12-19 20:44:55 +01:00
J. Fernando Sánchez
9c82dea298 Config from variable or file
This replaces the old file of credentials (with one per line) with a
configuration in YAML format.

The configuration can be stored either in a file or in an environment variable
(BITTER_CONFIG).

There is still a command line argument to add the credentials in that file to
the config.
2017-12-19 20:34:39 +01:00
J. Fernando Sánchez
cf766a6bf3 API command
* Added API command
* Fixed bug in chunk
2017-11-30 16:49:42 +01:00
J. Fernando Sánchez
e65f6836b3 Fixed tweet error 2017-05-21 21:28:35 +02:00
J. Fernando Sánchez
1cb86abbdd Use easy in tests and README 2017-03-08 12:15:48 +01:00
J. Fernando Sánchez
b212a46ab7 Added CI and tests 2016-12-06 01:30:32 +01:00
J. Fernando Sánchez
0a0d8fd5f1 Improved tweet downloader (CLI and API) 2016-12-06 00:03:38 +01:00
J. Fernando Sánchez
e3a78968da Py2 compatibility and queue handling
* Removed install_aliases(), which caused problems with urllib2
* Better waiting time calculation (used in queue handling)
2016-11-23 12:31:02 +01:00
J. Fernando Sánchez
67ef307cce Improved tweet extraction @ CLI 2016-11-23 10:50:01 +01:00
J. Fernando Sánchez
cb054ac365 Added (bulk) user download 2016-11-22 20:07:18 +01:00
J. Fernando Sánchez
bdc4690240 Fixed tail argument 2016-11-19 21:42:08 +01:00
J. Fernando Sánchez
c0309a1e52 Fixed python2 compatibility issues (print!) 2016-11-19 20:38:44 +01:00
J. Fernando Sánchez
4afdd6807d Fixed MANIFEST error 2016-11-19 20:24:02 +01:00
J. Fernando Sánchez
38605ba2c8 Added stream to CLI
* Save stream to file
* Parse file and get the most important hashtags
2016-11-19 20:16:56 +01:00
J. Fernando Sánchez
738823c8a2 Added Streaming workers/queues 2016-11-18 16:08:29 +01:00
36 changed files with 1749 additions and 340 deletions

2
.gitignore vendored

@@ -1,6 +1,8 @@
__pycache__
*.egg-info *.egg-info
dist dist
env env
.env
__* __*
.* .*
*.pyc *.pyc

7
Dockerfile-2.7 Normal file

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install ".[server]"
ENTRYPOINT ["bitter"]

7
Dockerfile-3.4 Normal file

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:3.4-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install ".[server]"
ENTRYPOINT ["bitter"]

7
Dockerfile.template Normal file

@@ -0,0 +1,7 @@
# onbuild copies . to /usr/src/app/
From python:{{PYVERSION}}-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install ".[server]"
ENTRYPOINT ["bitter"]

MANIFEST.in

@@ -1,7 +1,10 @@
include requirements.txt include requirements.txt
include requirements-py2.txt
include test-requirements.txt include test-requirements.txt
include README.md include README.md
include bitter/VERSION include bitter/VERSION
graft bitter/templates graft bitter/templates
graft bitter/static graft bitter/static
graft test include tests/test*
global-exclude *.pyc
global-exclude __pycache__

76
Makefile Normal file

@@ -0,0 +1,76 @@
PYVERSIONS=3.5
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian
VERSION=$(shell cat $(NAME)/VERSION)
TARNAME=$(NAME)-$(VERSION).tar.gz
IMAGENAME=$(REPO)/$(NAME)
IMAGEWTAG=$(IMAGENAME):$(VERSION)
all: build run
dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))
Dockerfile-%: Dockerfile.template
sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*
dev-%:
@docker start $(NAME)-dev$* || (\
$(MAKE) build-$*; \
docker run -d -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
)\
docker exec -ti $(NAME)-dev$* bash
dev: dev-$(PYMAIN)
build: $(addprefix build-, $(PYMAIN))
buildall: $(addprefix build-, $(PYVERSIONS))
build-%: Dockerfile-%
docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;
test: $(addprefix test-,$(PYMAIN))
testall: $(addprefix test-,$(PYVERSIONS))
test-%: build-%
docker run --rm -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
pip_test-%:
docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
dist/$(NAME)-$(VERSION).tar.gz:
docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;
sdist: dist/$(NAME)-$(VERSION).tar.gz
upload-%: test-%
docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
upload: testall $(addprefix upload-,$(PYVERSIONS))
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'
clean:
@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true
upload_git:
git commit -a
git tag ${VERSION}
git push --tags origin master
pip_upload:
python setup.py sdist upload ;
pip_test: $(addprefix pip_test-,$(PYVERSIONS))
run: build
docker run --rm --env-file $$PWD/.env -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
.PHONY: test test-% build-% build test test_pip run

152
README.md

@@ -1,4 +1,5 @@
#Description # Description
There are two parts to bitter. There are two parts to bitter.
First of all, it is a wrapper over Python twitter that adds support for several Twitter API credentials (e.g. authorizing the same app with different user accounts). First of all, it is a wrapper over Python twitter that adds support for several Twitter API credentials (e.g. authorizing the same app with different user accounts).
Secondly, it is a command line tool to automate several actions (e.g. downloading user networks) using the wrapper. Secondly, it is a command line tool to automate several actions (e.g. downloading user networks) using the wrapper.
@@ -17,21 +18,158 @@ or
Programmatically: Programmatically:
```python ```python
from bitter.crawlers import TwitterQueue from bitter import easy
wq = TwitterQueue.from_credentials() wq = easy()
print(wq.users.show(user_name='balkian')) print(wq.users.show(user_name='balkian'))
``` ```
# Credentials format
You can also make custom calls to the API through the command line.
For example, to get the latest 500 tweets by the Python Software Foundation:
``` ```
{"user": "balkian", "consumer_secret": "xxx", "consumer_key": "xxx", "token_key": "xxx", "token_secret": "xxx"} bitter api statuses/user_timeline --id thepsf --count 500
``` ```
By default, bitter uses '~/.bitter-credentials.json', but you may choose a different file:
## Adding credentials
``` ```
python -m bitter -c <credentials_file> ... bitter --config <YOUR CONFIGURATION FILE> credentials add
```
You can specify the parameters in the command or let the command line guide you through the process.
# Examples
## Downloading a list of tweets
Bitter can download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.
You can even specify the column number for tweet ids.
Bitter will not try to download tweets that are already present in that folder.
```
Usage: bitter tweet get_all [OPTIONS] TWEETSFILE
Download tweets from a list of tweets in a CSV file. The result is stored
as individual json files in your folder of choice.
Options:
-f, --folder TEXT
-d, --delimiter TEXT
-h, --header Discard the first line (use it as a header)
-q, --quotechar TEXT
-c, --column INTEGER
--help Show this message and exit.
```
For instance, this will download the tweets listed in `tweet_ids.csv` into the `tweet_info` folder:
```
bitter tweet get_all -f tweet_info tweet_ids.csv
```
## Downloading a list of users
Bitter downloads users and tweets in a similar way:
```
Usage: bitter users get_all [OPTIONS] USERSFILE
Download users from a list of user ids/screen names in a CSV file. The
result is stored as individual json files in your folder of choice.
Options:
-f, --folder TEXT
-d, --delimiter TEXT
-h, --header Discard the first line (use it as a header)
-q, --quotechar TEXT
-c, --column INTEGER
--help Show this message and exit.
```
The only difference is that users can be downloaded via `screen_name` or `user_id`.
This method does not try to resolve screen names to user ids, so the same user may be downloaded more than once if it appears in both forms.
## Downloading a stream
```
Usage: bitter stream get [OPTIONS]
Options:
-l, --locations TEXT
-t, --track TEXT
-f, --file TEXT File to store the stream of tweets. Default: standard output
-p, --politelyretry Politely retry after a hangup/connection error
--help Show this message and exit.
```
```
bitter --config .bitter.yaml stream get
```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
## REST queries
In newer versions of bitter, individual methods to download tweets/users using the REST API are being replaced with a generic method to call the API.
```
bitter api <URL endpoint> --parameter VALUE ... | [--tweets | --users] [--max_count MAX_COUNT] [--count COUNT_PER_CALL]
```
For instance:
```
# Get 100 tweets that mentioned Obama after tweet 942689870501302300
bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama
```
That is equivalent to this call to the API: `api/1.1/search/tweets?since_id=942689870501302300&count=100&q=Obama`.
The flags `--tweets` and `--users` are optional.
If you use them, bitter will try to intelligently fetch all the tweets/users by using pagination with the API.
For example:
```
# Download 1000 tweets, 100 tweets per call.
bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama --max_count=1000 --tweets
```
```
# Download all the followers of @balkian
bitter api 'followers/list' --_id balkian --users --max_count -1
```
Note that some reserved words (such as `id`) have to be preceded by an underscore.
This limitation is imposed by the python-twitter library.
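The same query can be issued from Python. Below is a minimal sketch (not part of the repository) that assumes the `easy()` helper and the string-indexing support on the queue introduced in this changeset:
```python
from bitter import easy

wq = easy()  # builds a TwitterQueue from the bitter configuration

# Equivalent of: bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama
tweets = wq.search.tweets(q='Obama', since_id=942689870501302300, count=100)

# Endpoints can also be addressed as strings; the underscore convention for
# reserved words (e.g. _id) comes from the underlying python-twitter library.
followers = wq['followers/list'](_id='balkian')
```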
# Configuration format
```
credentials:
- user: "balkian"
consumer_secret: "xxx"
consumer_key: "xxx"
token_key: "xxx"
token_secret: "xxx"
- user: ....
```
By default, bitter uses '~/.bitter.yaml', but you may choose a different file:
```
python -m bitter --config <config_file> ...
```
Or use an environment variable:
```
export BITTER_CONFIG=$(cat myconfig.yaml)
``` ```
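For programmatic access, the configuration helpers added in `bitter/utils.py` can be used directly. A small sketch, assuming the default `~/.bitter.yaml` location (the credential values are placeholders):
```python
from bitter import utils

# Read ~/.bitter.yaml; passing None falls back to the BITTER_CONFIG variable
conf = utils.read_config('~/.bitter.yaml')
for cred in conf.get('credentials', []):
    print(cred['user'])

# Credentials can also be added from code instead of `bitter credentials add`
utils.add_credentials('~/.bitter.yaml', user='balkian',
                      consumer_key='xxx', consumer_secret='xxx',
                      token_key='xxx', token_secret='xxx')
```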
# Server # Server

10
bin/README.md Normal file

@@ -0,0 +1,10 @@
Scripts to process jsonlines
To get the jsonlines file, you can use the streaming API or the search API, like so:
```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
```
To keep track of the query that generated the file, you can save the command in a text file.
For instance, the example above is also in `example_query.sh`.

1
bin/example_query.sh Executable file

@@ -0,0 +1 @@
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines

13
bin/extract-hashtags.sh Executable file

@@ -0,0 +1,13 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export FIELDS="created_at,id,text"
for i in "$@"
do
OUTPUT=$i.hashtags.csv
echo "$FIELDS" > $OUTPUT
pv -l $i -N "hashtags $i" | jq -r '. | .created_at as $created_at | .id_str as $id | .entities.hashtags | select(. != null) | .[] | [$created_at, $id, .text] | @csv' >> $OUTPUT
done
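For reference, a rough Python equivalent of the jq pipeline above (a sketch, not part of the repository; `mytweets.jsonlines` is the example file from `bin/README.md`):
```python
import csv
import json
import sys

FIELDS = ['created_at', 'id', 'text']

writer = csv.writer(sys.stdout)
writer.writerow(FIELDS)
with open('mytweets.jsonlines') as f:
    for line in f:
        tweet = json.loads(line)
        # one row per hashtag, mirroring the jq filter on .entities.hashtags
        for tag in tweet.get('entities', {}).get('hashtags', []):
            writer.writerow([tweet.get('created_at'), tweet.get('id_str'), tag['text']])
```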

15
bin/extract-interactions.sh Executable file

@@ -0,0 +1,15 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
for i in "$@"
do
REPLYOUTPUT=$i.replies.csv
RTOUTPUT=$i.rts.csv
echo 'created_at,id,user_id,reply_user_id' > $REPLYOUTPUT
echo 'created_at,id,user_id,rt_user_id' > $RTOUTPUT
pv -l -N "$i" $i | jq -r '. | select(.in_reply_to_user_id_str != null) | [.created_at, .id_str, .user.id_str, .in_reply_to_user_id_str] | @csv' >> $REPLYOUTPUT
pv -l -N "$i" $i | jq -r '. | select(.retweeted_status != null) | [.created_at, .retweeted_status.id_str, .user.id_str, .retweeted_status.user.id_str] | @csv' >> $RTOUTPUT
done

16
bin/extract-limits.sh Executable file

@@ -0,0 +1,16 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export QUERY='.limit | select(. != null) | [.timestamp_ms, .track] | @csv'
export FIELDS="timestamp,track"
for i in "$@"
do
OUTPUT=$i.limits.csv
echo $FIELDS > $OUTPUT
pv -N "$i limits" -l $i | jq -r "$QUERY" >> $OUTPUT
done

16
bin/extract-media.sh Executable file

@@ -0,0 +1,16 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export QUERY='select(.id != null) | .id_str as $id | .entities.urls[] | select(.expanded_url | select(. != null) | contains("open.spotify") or contains("youtube.com") or contains("youtu.be")) | [$id, .expanded_url] | @csv'
export FIELDS="id,url"
for i in "$@"
do
OUTPUT=$i.media.csv
echo $FIELDS > $OUTPUT
pv -N "$i media" -l $i | jq -r "$QUERY" >> $OUTPUT
done

28
bin/extract-users.sh Executable file

@@ -0,0 +1,28 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export USER_FIELDS="\$created_at,\
.id_str,\
.screen_name,\
.followers_count,\
.lang,\
.description,\
.statuses_count,\
.favourites_count,\
.friends_count,\
.created_at,\
.name,\
.location,\
.listed_count,\
.time_zone\
"
for i in "$@"
do
OUTPUT=$i.users.csv
echo \#$USER_FIELDS > $OUTPUT
jq -r ".created_at as \$created_at | .user,.retweeted_status.user | select(. != null) | [$USER_FIELDS] | @csv " $i | pv -N "$i" -l >> $OUTPUT
done

32
bin/extract.sh Executable file

@@ -0,0 +1,32 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
FIELDS=".id_str,\
.user.screen_name,\
.user.id,\
.favorite_count,\
.retweet_count,\
.quote_count,\
.reply_count,\
.created_at,\
.lang,\
.in_reply_to_user_id_str,\
.in_reply_to_status_id_str,\
.retweeted_status.id_str,\
.retweeted_status.user.id,\
.retweeted_status.favorite_count,\
.retweeted_status.retweet_count,\
.retweeted_status.quote_count,\
.retweeted_status.reply_count,\
.retweeted_status.created_at\
"
for i in "$@"
do
OUTPUT=$i.tweets.csv
echo "$FIELDS" | sed -e 's/,[ \t\n]*\./,/g' | sed -e 's/^[#]\?\.//' > $OUTPUT
jq -r "[$FIELDS]|@csv" $i | pv -N "$i" -l >> $OUTPUT
done

17
bin/extract_extended.sh Executable file

@@ -0,0 +1,17 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
QUERY='.| select(.retweeted_status != null) | .retweeted_status | .id_str as $rt_id | .extended_tweet | select(. != null) | [$rt_id,.full_text]|@csv'
HEADER='rt_id,full_text'
for i in "$@"
do
OUTPUT=$i.full_text.csv
echo $HEADER > $OUTPUT
jq "$QUERY" $i | pv -N "$i" -l >> $OUTPUT
sort -u $OUTPUT -o $OUTPUT
sed -ri 's/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g' $OUTPUT
done

16
bin/extract_text.sh Executable file

@@ -0,0 +1,16 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
QUERY='(.full_text // .retweeted_status.full_text) as $text | [ .id_str,$text ] | @csv'
HEADER='id,text'
for i in "$@"
do
OUTPUT=$i.text.csv
echo $HEADER > $OUTPUT
pv -l -N "$i" $i | jq -r "$QUERY" >> $OUTPUT
# sed -ri s/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g $OUTPUT
done

10
bin/filter-edges.sh Normal file

@@ -0,0 +1,10 @@
if [ "$#" -lt 2 ]
then
echo "Find edge lines in a file that contain one of the users in a user list."
echo ""
echo "Usage: $0 <file with edges> <file with the list of users>"
exit 1
fi
pv -c -N 'read' "$1" | grep -F -w -f "$2" | pv -lc -N 'found'
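An equivalent filter in Python might look like this (a sketch, assuming the edge file is comma-separated as produced by the extract scripts; the file names are only examples):
```python
def filter_edges(edges_path, users_path):
    """Yield edge lines that mention any user id from the user list."""
    with open(users_path) as f:
        users = set(line.strip() for line in f if line.strip())
    with open(edges_path) as f:
        for line in f:
            # strip the quotes added by jq's @csv before comparing
            if any(field.strip('"') in users for field in line.strip().split(',')):
                yield line

for line in filter_edges('mytweets.jsonlines.rts.csv', 'users.txt'):
    print(line, end='')
```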

23
bin/functions.py Normal file

@@ -0,0 +1,23 @@
import pandas as pd
def read_rts(rtsfile, tweetsfile):
tweets = pd.read_csv(tweetsfile, index_col=0)
rts = pd.read_csv(rtsfile, index_col=1)
merged = rts.groupby(by=['id', 'rt_user_id']).size().rename('count').reset_index(level=1).merge(tweets, left_index=True, right_index=True)
return merged.sort_values(by='count', ascending=False)
def read_tweets(tweetsfile):
'''When the dataset is small enough, we can load tweets as-is'''
with open(tweetsfile) as f:
header = f.readline().strip().split(',')
dtypes = {}
for key in header:
if key.endswith('_str') or key.endswith('.id'):
dtypes[key] = object
tweets = pd.read_csv(tweetsfile, dtype=dtypes, index_col=0)
return tweets
if __name__ == '__main__':
import argparse
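Hypothetical usage of these helpers, assuming CSV files produced by `bin/extract.sh` and `bin/extract-interactions.sh` (file names are only examples):
```python
from functions import read_rts, read_tweets  # assumes bin/ is on the path

tweets = read_tweets('mytweets.jsonlines.tweets.csv')
top_rts = read_rts('mytweets.jsonlines.rts.csv', 'mytweets.jsonlines.tweets.csv')
print(top_rts.head(10))  # most retweeted tweets, joined with their metadata
```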

1
bin/print-hashtags.sh Executable file

@@ -0,0 +1 @@
cat "$@" | awk -F"," '{print tolower($3)}' | sort | uniq -c | sort -h

14
bin/print-replies.sh Executable file

@@ -0,0 +1,14 @@
MAX_TAGS=100
function get_text {
while read line
do
echo $line
rtid=$(echo $line | awk -F"," '{print $2}')
text=$(grep -m 1 $rtid *.text.csv)
echo "$line - $text"
done < "/dev/stdin"
}
cat "$@" | get_text

15
bin/print-rts.sh Executable file

@@ -0,0 +1,15 @@
MAX_TAGS=100
function get_text {
while read line
do
echo $line
rtid=$(echo $line | awk '{print $2}')
count=$(echo $line | awk '{print $1}')
text=$(grep -m 1 $rtid *.text.csv)
echo "$line - $text"
done < "/dev/stdin"
}
cat "$@" | awk -F"," '{print tolower($2)}' | sort | uniq -c | sort -h | tail -n $MAX_TAGS | get_text

bitter/VERSION

@@ -1 +1 @@
0.5.6 0.10.3

bitter/__init__.py

@@ -3,15 +3,16 @@ Bitter module. A library and cli for Twitter using python-twitter.
http://github.com/balkian/bitter http://github.com/balkian/bitter
""" """
try:
from future.standard_library import install_aliases
install_aliases()
except ImportError:
# Avoid problems at setup.py and py3.x
pass
import os import os
from .version import __version__ from .version import __version__
from . import config as bconf
def easy(conffile=bconf.CONFIG_FILE):
from .crawlers import TwitterQueue
return TwitterQueue.from_config(conffile=conffile)
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

bitter/cli.py

@@ -1,3 +1,5 @@
from __future__ import print_function
import click import click
import json import json
import os import os
@@ -6,6 +8,7 @@ import time
import sqlalchemy.types import sqlalchemy.types
import threading import threading
import sqlite3 import sqlite3
from tqdm import tqdm
from sqlalchemy import exists from sqlalchemy import exists
@@ -18,55 +21,193 @@ if sys.version_info <= (3, 0):
from contextlib2 import ExitStack from contextlib2 import ExitStack
else: else:
from contextlib import ExitStack from contextlib import ExitStack
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def serialize(function):
'''Common options to serialize output to CSV or other formats'''
@click.option('--fields', help='Provide a list of comma-separated fields to print.', default='', type=str)
@click.option('--ignore_missing', help='Do not show warnings for missing fields.', is_flag=True)
@click.option('--header', help='Header that will be printed at the beginning of the file', default=None)
@click.option('--csv', help='Print each object as a csv row.', is_flag=True)
@click.option('--jsonlines', '--json', help='Print each object as JSON in a new line.', is_flag=True)
@click.option('--indented', help='Print each object as an indented JSON object', is_flag=True)
@click.option('--outdelimiter', help='Delimiter for some output formats, such as CSV. It defaults to \t', default='\t')
@click.option('--outfile', help='Output file. It defaults to STDOUT', default=sys.stdout)
def decorated(fields, ignore_missing, header, csv, jsonlines, indented, outfile, outdelimiter, **kwargs):
it = function(**kwargs)
outformat = 'json'
if csv:
outformat = 'csv'
elif jsonlines:
outformat = 'jsonlines'
elif indented:
outformat = 'indented'
return utils.serialized(it, outfile, outformat=outformat, fields=fields.split(','), ignore_missing=ignore_missing, header=header, delimiter=outdelimiter)
return decorated
@click.group() @click.group()
@click.option("--verbose", is_flag=True) @click.option("--verbose", is_flag=True)
@click.option("--logging_level", required=False, default='WARN') @click.option("--logging_level", required=False, default='WARN')
@click.option("--config", required=False) @click.option('--config', show_default=True, default=bconf.CONFIG_FILE)
@click.option('-c', '--credentials', show_default=True, default='~/.bitter-credentials.json') @click.option('--credentials', show_default=True, help="DEPRECATED: If specified, these credentials will be copied to the configuration file.", default=bconf.CREDENTIALS)
@click.pass_context @click.pass_context
def main(ctx, verbose, logging_level, config, credentials): def main(ctx, verbose, logging_level, config, credentials):
logging.basicConfig(level=getattr(logging, logging_level)) logging.basicConfig(level=getattr(logging, logging_level))
ctx.obj = {} ctx.obj = {}
ctx.obj['VERBOSE'] = verbose ctx.obj['VERBOSE'] = verbose
ctx.obj['CONFIG'] = config bconf.CONFIG_FILE = config
bconf.CREDENTIALS = credentials bconf.CREDENTIALS = credentials
utils.create_credentials(credentials) if os.path.exists(utils.get_config_path(credentials)):
utils.copy_credentials_to_config(credentials, config)
@main.group(invoke_without_command=True)
@click.pass_context
def credentials(ctx):
if ctx.invoked_subcommand is not None:
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for worker in wq.queue:
print('#'*20)
try:
resp = worker.client.application.rate_limit_status()
print(worker.name)
except Exception as ex:
print('{}: AUTHENTICATION ERROR: {}'.format(worker.name, ex) )
@credentials.command('limits')
@click.option('--no_aggregate', is_flag=True, default=False,
help=('Print limits from all workers. By default, limits are '
'aggregated (summed).'))
@click.option('--no_diff', is_flag=True, default=False,
help=('Print all limits. By default, only limits that '
'have been consumed will be shown.'))
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, no_aggregate, no_diff, url):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
limits = {}
if url:
print('URL is: {}'.format(url))
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
for urlimits in resp['resources'].values():
for url, value in urlimits.items():
if url not in limits:
limits[url] = {}
glob = limits[url].get('global', {})
limits[url][worker.name] = value
for k in ['limit', 'remaining']:
if k not in glob:
glob[k] = 0
glob[k] += value[k]
limits[url]['global'] = glob
for url, lims in limits.items():
worker_list = lims.keys() if no_aggregate else ['global', ]
url_printed = False
for worker in worker_list:
vals = lims[worker]
consumed = vals['limit'] - vals['remaining']
if no_diff or consumed:
if not url_printed:
print(url)
url_printed = True
print('\t', worker, ':')
print('\t\t', vals)
@credentials.command('add')
@click.option('--consumer_key', default=None)
@click.option('--consumer_secret', default=None)
@click.option('--token_key', default=None)
@click.option('--token_secret', default=None)
@click.argument('user_name')
def add(user_name, consumer_key, consumer_secret, token_key, token_secret):
if not consumer_key:
consumer_key = click.prompt('Please, enter your CONSUMER KEY')
if not consumer_secret:
consumer_secret = click.prompt('Please, enter your CONSUMER SECRET')
if not token_key:
token_key = click.prompt('Please, enter your ACCESS TOKEN')
if not token_secret:
token_secret = click.prompt('Please, enter your ACCESS TOKEN SECRET')
utils.add_credentials(conffile=bconf.CONFIG_FILE, user=user_name, consumer_key=consumer_key, consumer_secret=consumer_secret,
token_key=token_key, token_secret=token_secret)
click.echo('Credentials added for {}'.format(user_name))
@main.group() @main.group()
@click.pass_context @click.pass_context
def tweet(ctx): def tweet(ctx):
pass pass
@tweet.command('get') @tweet.command('get')
@click.option('-d', '--dry_run', is_flag=True, default=False)
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid') @click.argument('tweetid')
@click.pass_context @serialize
def get_tweet(ctx, tweetid): def get_tweet(tweetid, dry_run, folder, update):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
t = utils.get_tweet(wq, tweetid) yield from utils.download_tweet(wq, tweetid, not dry_run, folder, update)
print(json.dumps(t, indent=2))
@tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@click.argument('tweetsfile')
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",")
@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
@click.option('--skip', help='Discard the first SKIP lines (use them as a header)', default=0)
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-q', '--quotechar', default='"')
@click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context
def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, nocache, skip, quotechar, commentchar, column):
if update and not click.confirm('This may overwrite existing tweets. Continue?'):
click.echo('Cancelling')
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
status = tqdm('Queried')
failed = 0
for tid, obj in utils.download_tweets_file(wq, tweetsfile, folder, delimiter=delimiter, cache=not nocache,
skip=skip, quotechar=quotechar, commentchar=commentchar,
column=column, update=update, retry_failed=retry):
status.update(1)
if not obj:
failed += 1
status.set_description('Failed: %s. Queried' % failed, refresh=True)
continue
yield obj
@tweet.command('search') @tweet.command('search')
@click.argument('query') @click.argument('query')
@click.pass_context @serialize
def get_tweet(ctx, query): @click.pass_context
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) def search(ctx, query):
c = wq.next() wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
t = utils.search_tweet(c.client, query) yield from utils.search_tweet(wq, query)
print(json.dumps(t, indent=2))
@tweet.command('timeline') @tweet.command('timeline')
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def get_tweet(ctx, user): def timeline(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
c = wq.next() t = utils.user_timeline(wq, user)
t = utils.user_timeline(c.client, user)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@main.group() @main.group()
@@ -85,23 +226,52 @@ def list_users(ctx, db):
for j in i.__dict__: for j in i.__dict__:
print('\t{}: {}'.format(j, getattr(i,j))) print('\t{}: {}'.format(j, getattr(i,j)))
@users.command('get_one')
@click.argument('user')
@click.pass_context
def get_user(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
c = wq.next()
u = utils.get_user(c.client, user)
print(json.dumps(u, indent=2))
@users.command('get') @users.command('get')
@click.argument('user')
@click.option('-d', '--dry_run', is_flag=True, default=False)
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
@serialize
def get_user(user, dry_run, folder, update):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
yield from utils.download_user(wq, user, not dry_run, folder, update)
@users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@click.argument('usersfile')
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
@click.option('-d', '--delimiter', default=",")
@click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)',
is_flag=True, default=False)
@click.option('-q', '--quotechar', default='"')
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context
def get_users(ctx, usersfile, folder, update, retry, nocache, delimiter, skip, quotechar, commentchar, column):
if update and not click.confirm('This may overwrite existing users. Continue?'):
click.echo('Cancelling')
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for i in utils.download_users_file(wq, usersfile, folder, delimiter=delimiter,
update=update, retry_failed=retry,
skip=skip, quotechar=quotechar,
cache=not nocache,
commentchar=commentchar,
column=column):
yield i
@users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.') @click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.') @click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.') @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.') @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
@click.argument('usersfile', 'File with a list of users to look up') @click.argument('usersfile')
@click.pass_context @click.pass_context
def get_users(ctx, usersfile, skip, until, threads, db): def crawl_users(ctx, usersfile, skip, until, threads, db):
global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
if '://' not in db: if '://' not in db:
@@ -113,7 +283,7 @@ def get_users(ctx, usersfile, skip, until, threads, db):
return ExitStack() return ExitStack()
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
len(wq.queue))) len(wq.queue)))
@@ -204,14 +374,9 @@ def get_users(ctx, usersfile, skip, until, threads, db):
speed = (collected-lastcollected)/10 speed = (collected-lastcollected)/10
with statslock: with statslock:
lastcollected = collected lastcollected = collected
logger.info('Done!') logger.info('Done!')
@main.group('api')
def api():
pass
@main.group('extractor') @main.group('extractor')
@click.pass_context @click.pass_context
@click.option('--db', required=True, help='Database of users.') @click.option('--db', required=True, help='Database of users.')
@@ -260,7 +425,7 @@ def network_extractor(ctx, as_json):
if as_json: if as_json:
import json import json
print(json.dumps(follower_map, indent=4)) print(json.dumps(follower_map, indent=4))
@extractor.command('users') @extractor.command('users')
@click.pass_context @click.pass_context
@@ -282,7 +447,7 @@ def users_extractor(ctx):
@click.pass_context @click.pass_context
def extract(ctx, recursive, user, name, initfile): def extract(ctx, recursive, user, name, initfile):
print(locals()) print(locals())
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
dburi = ctx.obj['DBURI'] dburi = ctx.obj['DBURI']
utils.extract(wq, utils.extract(wq,
recursive=recursive, recursive=recursive,
@@ -294,31 +459,41 @@ def extract(ctx, recursive, user, name, initfile):
@extractor.command('reset') @extractor.command('reset')
@click.pass_context @click.pass_context
def reset_extractor(ctx): def reset_extractor(ctx):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
db = ctx.obj['DBURI'] db = ctx.obj['DBURI']
session = make_session(db) session = make_session(db)
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@api.command('limits')
@click.argument('url', required=False) @main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False),
help='''Issue a call to an endpoint of the Twitter API.''')
@click.argument('cmd', nargs=1)
@click.option('--tweets', is_flag=True, help='Fetch more tweets using smart pagination. Use --count to control how many tweets to fetch per call, and --max_count to set the number of desired tweets (or -1 to get as many as possible).', type=bool, default=False)
@click.option('--users', is_flag=True, help='Fetch more users using smart pagination. Use --count to control how many users to fetch per call, and --max_count to set the number of desired users (or -1 to get as many as possible).', type=bool, default=False)
@click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
@click.pass_context @click.pass_context
def get_limits(ctx, url): def api(ctx, cmd, tweets, users, api_args):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) opts = {}
for worker in wq.queue: mappings = {
resp = worker.client.application.rate_limit_status() 'id': '_id'
print('#'*20) }
print(worker.name) i = iter(api_args)
if url: for k, v in zip(i, i):
limit = 'NOT FOUND' k = k.replace('--', '')
print('URL is: {}'.format(url)) if k in mappings:
cat = url.split('/')[1] k = mappings[k]
if cat in resp['resources']: opts[k] = v
limit = resp['resources'][cat].get(url, None) or resp['resources'][cat] wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
else: if tweets:
print('Cat {} not found'.format(cat)) resp = utils.consume_tweets(wq[cmd], **opts)
print('{}: {}'.format(url, limit)) elif users:
else: resp = utils.consume_users(wq[cmd], **opts)
print(json.dumps(resp, indent=2)) else:
resp = wq[cmd](**opts)
print(json.dumps(resp))
return
for i in resp:
print(json.dumps(i))
@main.command('server') @main.command('server')
@click.argument('CONSUMER_KEY', required=True) @click.argument('CONSUMER_KEY', required=True)
@@ -329,7 +504,76 @@ def run_server(ctx, consumer_key, consumer_secret):
bconf.CONSUMER_SECRET = consumer_secret bconf.CONSUMER_SECRET = consumer_secret
from .webserver import app from .webserver import app
app.run(host='0.0.0.0') app.run(host='0.0.0.0')
@main.group()
@click.pass_context
def stream(ctx):
pass
@stream.command('get')
@click.option('-l', '--locations', default=None)
@click.option('-t', '--track', default=None)
@click.option('-f', '--file', default=None, help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
@click.pass_context
def get_stream(ctx, locations, track, file, politelyretry):
wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1)
query_args = {}
if locations:
query_args['locations'] = locations
if track:
query_args['track'] = track
if not file:
file = sys.stdout
else:
file = open(file, 'a')
def insist():
lasthangup = time.time()
while True:
if not query_args:
iterator = wq.statuses.sample()
else:
iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
try:
for i in iterator:
yield i
if not politelyretry:
return
except Exception as ex:
if not politelyretry:
raise ex
thishangup = time.time()
if thishangup - lasthangup < 60:
raise Exception('Too many hangups in a row.')
time.sleep(3)
for tweet in tqdm(insist()):
print(json.dumps(tweet), file=file)
if file != sys.stdout:
file.close()
@stream.command('read')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
@click.pass_context
def read_stream(ctx, file, tail):
for tweet in utils.read_file(file, tail=tail):
try:
print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['created_at'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
except (KeyError, TypeError):
print('Raw tweet: {}'.format(tweet))
@stream.command('tags')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.argument('limit', required=False, default=None, type=int)
@click.pass_context
def tags_stream(ctx, file, limit):
c = utils.get_hashtags(utils.read_file(file))
for count, tag in c.most_common(limit):
print(u'{} - {}'.format(count, tag))
if __name__ == '__main__': if __name__ == '__main__':
main() main()

bitter/config.py

@@ -3,7 +3,7 @@ Common configuration for other modules.
It is not elegant, but it works with flask and the oauth decorators. It is not elegant, but it works with flask and the oauth decorators.
Using this module allows you to change the config before loading any other module. Using this module allows you to change the config before loading any other module.
E.g.: E.g.:
import bitter.config as c import bitter.config as c
c.CREDENTIALS="/tmp/credentials" c.CREDENTIALS="/tmp/credentials"
@@ -11,3 +11,4 @@ E.g.:
app.run() app.run()
''' '''
CREDENTIALS = '~/.bitter-credentials.json' CREDENTIALS = '~/.bitter-credentials.json'
CONFIG_FILE = '~/.bitter.yaml'

bitter/crawlers.py

@@ -9,6 +9,13 @@ logger = logging.getLogger(__name__)
from twitter import * from twitter import *
from collections import OrderedDict from collections import OrderedDict
from threading import Lock from threading import Lock
from itertools import islice
from functools import partial
try:
import itertools.ifilter as filter
except ImportError:
pass
from . import utils from . import utils
from . import config from . import config
@@ -32,33 +39,95 @@ class AttrToFunc(object):
else: else:
return extend_call(k) return extend_call(k)
def __getitem__(self, k):
return partial(self.handler, self.__uriparts+k.split('/'))
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
# for i, a in enumerate(args)e # for i, a in enumerate(args)e
# kwargs[i] = a # kwargs[i] = a
return self.handler(self.__uriparts, *args, **kwargs) return self.handler(self.__uriparts, *args, **kwargs)
class FromCredentialsMixin(object):
@classmethod
def from_credentials(cls, cred_file=None, max_workers=None):
wq = cls()
for cred in islice(utils.get_credentials(cred_file), max_workers):
wq.ready(cls.worker_class(cred["user"], cred))
return wq
class FromConfigMixin(object):
@classmethod
def from_config(cls, config=None, conffile=None, max_workers=None):
wq = cls()
if not config:
with utils.config(conffile) as c:
config = c
for cred in islice(config['credentials'], max_workers):
wq.ready(cls.worker_class(cred["user"], cred))
return wq
class TwitterWorker(object): class TwitterWorker(object):
def __init__(self, name, client): api_class = None
def __init__(self, name, creds):
self.name = name self.name = name
self.client = client self._client = None
self.cred = creds
self._lock = Lock() self._lock = Lock()
self.busy = False self.busy = False
self.limits = self.client.application.rate_limit_status()
@property
def client(self):
if not self._client:
auth=OAuth(self.cred['token_key'],
self.cred['token_secret'],
self.cred['consumer_key'],
self.cred['consumer_secret'])
self._client = self.api_class(auth=auth)
return self._client
def __repr__(self):
msg = '<{} for {}>'.format(self.__class__.__name__, self.name)
if self.busy:
msg += ' [busy]'
return msg
class RestWorker(TwitterWorker):
api_class = Twitter
def __init__(self, *args, **kwargs):
super(RestWorker, self).__init__(*args, **kwargs)
self._limits = None
@property
def limits(self):
if not self._limits:
self._limits = self.client.application.rate_limit_status()
return self._limits
def is_limited(self, uriparts): def is_limited(self, uriparts):
return self.get_wait(uriparts)>0 return self.get_wait(uriparts)>0
def get_wait(self, uriparts): def get_wait(self, uriparts):
limits = self.get_limit(uriparts) limits = self.get_limit(uriparts)
if limits['remaining'] > 0: if limits.get('remaining', 1) > 0:
return 0 return 0
reset = limits.get('reset', 0) reset = limits.get('reset', 0)
now = time.time() now = time.time()
return max(0, (reset-now)) return max(0, (reset-now))
def get_limit(self, uriparts): def get_limit(self, uriparts):
uriparts = list(u for u in uriparts if u)
uri = '/'+'/'.join(uriparts) uri = '/'+'/'.join(uriparts)
return self.limits.get('resources', {}).get(uriparts[0], {}).get(uri, {}) for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
if ix.startswith(uri):
return i
return {}
def set_limit(self, uriparts, value): def set_limit(self, uriparts, value):
uri = '/'+'/'.join(uriparts) uri = '/'+'/'.join(uriparts)
@@ -83,10 +152,10 @@ class TwitterWorker(object):
class TwitterQueueException(BaseException): class QueueException(BaseException):
pass pass
class TwitterQueue(AttrToFunc): class QueueMixin(AttrToFunc, FromCredentialsMixin, FromConfigMixin):
def __init__(self, wait=True): def __init__(self, wait=True):
logger.debug('Creating worker queue') logger.debug('Creating worker queue')
self.queue = set() self.queue = set()
@@ -97,6 +166,10 @@ class TwitterQueue(AttrToFunc):
def ready(self, worker): def ready(self, worker):
self.queue.add(worker) self.queue.add(worker)
class TwitterQueue(QueueMixin):
worker_class = RestWorker
def handle_call(self, uriparts, *args, **kwargs): def handle_call(self, uriparts, *args, **kwargs):
logger.debug('Called: {}'.format(uriparts)) logger.debug('Called: {}'.format(uriparts))
logger.debug('With: {} {}'.format(args, kwargs)) logger.debug('With: {} {}'.format(args, kwargs))
@@ -132,23 +205,14 @@ class TwitterQueue(AttrToFunc):
if not self.wait: if not self.wait:
patience -= 1 patience -= 1
@classmethod
def from_credentials(self, cred_file=None):
wq = TwitterQueue()
for cred in utils.get_credentials(cred_file):
c = Twitter(auth=OAuth(cred['token_key'],
cred['token_secret'],
cred['consumer_key'],
cred['consumer_secret']))
wq.ready(TwitterWorker(cred["user"], c))
return wq
def get_wait(self, uriparts): def get_wait(self, uriparts):
# Stop as soon as one is available to avoid initiating the rest
for i in self.queue:
if not i.busy and i.get_wait(uriparts) == 0:
return 0
# If None is available, let's see how much we have to wait
available = filter(lambda x: not x.busy, self.queue) available = filter(lambda x: not x.busy, self.queue)
first_worker = min(available, key=lambda x: x.get_wait(uriparts)) diff = min(worker.get_wait(uriparts) for worker in self.queue if not worker.busy)
diff = first_worker.get_wait(uriparts)
return diff return diff
@@ -159,7 +223,7 @@ class TwitterQueue(AttrToFunc):
for worker in s: for worker in s:
if not worker.is_limited(uriparts) and not worker.busy: if not worker.is_limited(uriparts) and not worker.busy:
return worker return worker
raise TwitterQueueException('No worker is available') raise QueueException('No worker is available')
def next(self, uriparts): def next(self, uriparts):
if not self.wait: if not self.wait:
@@ -167,7 +231,7 @@ class TwitterQueue(AttrToFunc):
while True: while True:
try: try:
return self._next(uriparts) return self._next(uriparts)
except TwitterQueueException: except QueueException:
available = filter(lambda x: not x.busy, self.queue) available = filter(lambda x: not x.busy, self.queue)
if available: if available:
diff = self.get_wait(uriparts) diff = self.get_wait(uriparts)
@@ -177,3 +241,44 @@ class TwitterQueue(AttrToFunc):
logger.info("All workers are busy. Waiting %s seconds" % diff) logger.info("All workers are busy. Waiting %s seconds" % diff)
time.sleep(diff) time.sleep(diff)
class StreamWorker(TwitterWorker):
api_class = TwitterStream
def __init__(self, *args, **kwargs):
super(StreamWorker, self).__init__(*args, **kwargs)
class StreamQueue(QueueMixin):
worker_class = StreamWorker
def __init__(self, wait=True):
logger.debug('Creating worker queue')
self.queue = set()
self.index = 0
self.wait = wait
AttrToFunc.__init__(self, handler=self.handle_call)
def handle_call(self, uriparts, *args, **kwargs):
logger.debug('Called: {}'.format(uriparts))
logger.debug('With: {} {}'.format(args, kwargs))
c = None
c = self.next(uriparts)
c._lock.acquire()
c.busy = True
logger.debug('Next: {}'.format(c.name))
ping = time.time()
resp = getattr(c.client, "/".join(uriparts))(*args, **kwargs)
for i in resp:
yield i
pong = time.time()
logger.debug('Listening for: {}'.format(pong-ping))
c.busy = False
c._lock.release()
def next(self, uriparts):
logger.debug('Getting next available')
s = list(self.queue)
random.shuffle(s)
for worker in s:
if not worker.busy:
return worker
raise QueueException('No worker is available')
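A brief sketch of how the new `StreamQueue` might be used directly, mirroring what the CLI's `stream get` command does (the configuration path is an example):
```python
import json
from bitter.crawlers import StreamQueue

wq = StreamQueue.from_config(conffile='~/.bitter.yaml', max_workers=1)

# statuses.filter / statuses.sample are the same endpoints the CLI command uses
for tweet in wq.statuses.filter(track='bitter'):
    print(json.dumps(tweet))
```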

bitter/models.py

@@ -3,10 +3,13 @@ import json
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.types import BigInteger, Integer, Text, Boolean from sqlalchemy.types import BigInteger, Integer, Text, Boolean
from sqlalchemy.schema import ForeignKey
from sqlalchemy.pool import SingletonThreadPool
from sqlalchemy import Column, Index from sqlalchemy import Column, Index
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from functools import wraps
Base = declarative_base() Base = declarative_base()
@@ -85,34 +88,48 @@ class ExtractorEntry(Base):
user = Column(BigInteger, index=True) user = Column(BigInteger, index=True)
cursor = Column(BigInteger, default=-1) cursor = Column(BigInteger, default=-1)
pending = Column(Boolean, default=False) pending = Column(Boolean, default=False)
errors = Column(Text, default="")
busy = Column(Boolean, default=False)
class Search(Base):
__tablename__ = 'search_queries'
id = Column(Integer, primary_key=True, index=True, unique=True)
endpoint = Column(Text, comment="Endpoint URL")
attrs = Column(Text, comment="Text version of the dictionary of parameters")
count = Column(Integer)
current_count = Column(Integer)
current_id = Column(BigInteger, comment='Oldest ID retrieved (should match max_id when done)')
since_id = Column(BigInteger)
class SearchResults(Base):
__tablename__ = 'search_results'
id = Column(Integer, primary_key=True, index=True, unique=True)
search_id = Column(ForeignKey('search_queries.id'))
resource_id = Column(Text)
def memoize(f):
memo = {}
@wraps(f)
def helper(self, **kwargs):
st = dict_to_str(kwargs)
key = (self.__uriparts, st)
if key not in memo:
memo[key] = f(self, **kwargs)
return memo[key]
return helper
def make_session(url): def make_session(url):
engine = create_engine(url)#, echo=True) if not isinstance(url, str):
print(url)
raise Exception("FUCK")
engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True)
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine) Session = sessionmaker(bind=engine)
session = Session() session = Session()
return session return session
def test(db='sqlite:///users.db'): def dict_to_str(args):
return json.dumps(args, sort_keys=True)
from sqlalchemy import exists
session = make_session(db)
our_user = session.query(User).first()
print(our_user.name)
print(session.query(User).count())
fake_user = User(name="Fake user")
session.add(fake_user)
session.commit()
print(session.query(User).count())
print(session.query(exists().where(User.name == "Fake user")).scalar())
fake_committed = session.query(User).filter_by(name="Fake user").first()
print(fake_committed.id)
print(fake_committed.name)
session.delete(fake_committed)
session.commit()
print(session.query(User).count())
print(list(session.execute('SELECT 1 from users where id=\'%s\'' % 1548)))
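One possible use of the new `Search` model for bookkeeping REST queries; this is only a sketch (the sqlite path is an example), since the changeset does not show how the caching commit wires it up:
```python
from bitter.models import make_session, Search, dict_to_str

session = make_session('sqlite:///bitter-cache.db')
query = Search(endpoint='/search/tweets',
               attrs=dict_to_str({'q': 'bitter', 'count': 100}),
               count=100, current_count=0)
session.add(query)
session.commit()
```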

bitter/utils.py

@@ -1,6 +1,11 @@
from __future__ import print_function
import logging import logging
import time import time
import json import json
import yaml
import csv
import io
import signal import signal
import sys import sys
@@ -8,10 +13,25 @@ import sqlalchemy
import os import os
import multiprocessing import multiprocessing
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from multiprocessing import Queue
from itertools import islice import queue
import threading
from select import select
import operator
from functools import partial, reduce
from tqdm import tqdm
from itertools import islice, chain
from contextlib import contextmanager from contextlib import contextmanager
from itertools import zip_longest
from collections import Counter
from random import choice
from builtins import map, filter
from twitter import TwitterHTTPError from twitter import TwitterHTTPError
@@ -19,6 +39,12 @@ from bitter.models import Following, User, ExtractorEntry, make_session
from bitter import config from bitter import config
# Fix Python 2.x.
try:
UNICODE_EXISTS = bool(type(unicode))
except NameError:
unicode = lambda s: str(s)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -26,39 +52,100 @@ def signal_handler(signal, frame):
logger.info('You pressed Ctrl+C!') logger.info('You pressed Ctrl+C!')
sys.exit(0) sys.exit(0)
def chunk(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()): def chunk(iterable, n):
if chunksize: it = iter(iterable)
source = chunk(source, chunksize) return iter(lambda: tuple(islice(it, n)), ())
p = ThreadPool(numcpus)
for i in p.imap(func, source):
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
source = chunk(source, chunksize)
p = ThreadPool(numcpus*2)
def wrapped_func(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as ex:
print('Exception on parallel thread: {}'.format(ex), file=sys.stderr)
results = p.imap_unordered(wrapped_func, source)
for i in chain.from_iterable(results):
yield i yield i
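To illustrate the new helpers: `chunk()` splits an iterable into tuples of at most `n` items, and `parallel()` maps a function over those chunks in a thread pool, flattening whatever iterables the function returns (order across chunks is not guaranteed). A small usage sketch, not part of the repository:
```python
from bitter.utils import chunk, parallel

print(list(chunk(range(7), 3)))  # [(0, 1, 2), (3, 4, 5), (6,)]

def square_batch(batch):
    # each call receives one chunk (a tuple) and returns an iterable
    return [x * x for x in batch]

for value in parallel(square_batch, range(10), chunksize=3):
    print(value)
```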
def get_credentials_path(credfile=None):
if not credfile: def get_config_path(conf=None):
if config.CREDENTIALS: if not conf:
credfile = config.CREDENTIALS if config.CONFIG_FILE:
conf = config.CONFIG_FILE
else: else:
raise Exception('No valid credentials file') raise Exception('No valid config file')
return os.path.expanduser(credfile) return os.path.expanduser(conf)
def copy_credentials_to_config(credfile, conffile=None):
p = get_config_path(credfile)
with open(p) as old:
for line in old:
cred = json.loads(line.strip())
add_credentials(conffile, **cred)
def save_config(conf, conffile=None):
with config(conffile) as c:
c.clear()
c.update(conf)
@contextmanager @contextmanager
def credentials_file(credfile, *args, **kwargs): def config(conffile=None):
p = get_credentials_path(credfile) d = read_config(conffile)
with open(p, *args, **kwargs) as f: try:
yield f yield d
finally:
write_config(d, conffile)
def iter_credentials(credfile=None):
with credentials_file(credfile) as f:
for l in f:
yield json.loads(l.strip())
def get_credentials(credfile=None, inverse=False, **kwargs): def read_config(conffile):
p = conffile and get_config_path(conffile)
if p:
if not os.path.exists(p):
raise IOError('{} file does not exist.'.format(p))
f = open(p, 'r')
elif 'BITTER_CONFIG' not in os.environ:
raise Exception('No config file or BITTER_CONFIG env variable.')
else:
f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n'))
return yaml.load(f, Loader=yaml.SafeLoader) or {'credentials': []}
def write_config(conf, conffile=None):
if not conf:
conf = {'credentials': []}
if conffile:
p = get_config_path(conffile)
with open(p, 'w') as f:
yaml.dump(conf, f)
else:
os.environ['BITTER_CONFIG'] = yaml.dump(conf)
def iter_credentials(conffile=None):
with config(conffile) as c:
for i in c['credentials']:
yield i
def create_config_file(conffile=None):
if not conffile:
return
conffile = get_config_path(conffile)
with open(conffile, 'a'):
pass
write_config(None, conffile)
def get_credentials(conffile=None, inverse=False, **kwargs):
creds = [] creds = []
for i in iter_credentials(credfile): for i in iter_credentials(conffile):
matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items())) matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items()))
if matches and not inverse: if matches and not inverse:
creds.append(i) creds.append(i)
@@ -66,24 +153,51 @@ def get_credentials(credfile=None, inverse=False, **kwargs):
creds.append(i) creds.append(i)
return creds return creds
def create_credentials(credfile=None):
credfile = get_credentials_path(credfile)
with credentials_file(credfile, 'a'):
pass
def delete_credentials(credfile=None, **creds): def delete_credentials(conffile=None, **creds):
tokeep = get_credentials(credfile, inverse=True, **creds) tokeep = get_credentials(conffile, inverse=True, **creds)
with credentials_file(credfile, 'w') as f: with config(conffile) as c:
for i in tokeep: c['credentials'] = list(tokeep)
f.write(json.dumps(i))
f.write('\n')
def add_credentials(credfile=None, **creds):
exist = get_credentials(credfile, **creds) def add_credentials(conffile=None, **creds):
if not exist: try:
with credentials_file(credfile, 'a') as f: exist = get_credentials(conffile, **creds)
f.write(json.dumps(creds)) except IOError:
f.write('\n') exist = False
create_config_file(conffile)
if exist:
return
with config(conffile) as c:
c['credentials'].append(creds)
def get_hashtags(iter_tweets, best=None):
c = Counter()
for tweet in iter_tweets:
c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
return c
def read_file(filename, tail=False):
if filename == '-':
f = sys.stdin
else:
f = open(filename)
try:
while True:
line = f.readline()
if line not in (None, '', '\n'):
tweet = json.loads(line.strip())
yield tweet
else:
if tail:
time.sleep(1)
else:
return
finally:
if f != sys.stdin:
f.close()
def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
@@ -113,6 +227,7 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
        else:
            yield user

def trim_user(user):
    if 'status' in user:
        del user['status']
@@ -126,148 +241,226 @@ def trim_user(user):
    return user
def add_user(user, dburi=None, session=None, update=False):
    if not session:
        session = make_session(dburi)
    user = trim_user(user)
    olduser = session.query(User).filter(User.id == user['id'])
    if olduser:
        if not update:
            return
        olduser.delete()
    nuser = User()
    for key, value in user.items():
        setattr(nuser, key, value)
    user = nuser
    if update:
        session.add(user)
    logger.debug('Adding entry')
    entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
    if not entry:
        entry = ExtractorEntry(user=user.id)
        session.add(entry)
    logger.debug(entry.pending)
    entry.pending = True
    entry.cursor = -1
    session.commit()
    session.close()
def download_entry(wq, entry_id, dburi=None, recursive=False):
session = make_session(dburi)
if not session:
raise Exception("Provide dburi or session")
logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id)))
entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first()
user = session.query(User).filter(User.id == entry.user).first()
download_user(wq, session, user, entry, recursive)
session.close()
def crawl_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
total_followers = user.followers_count
if total_followers > max_followers:
entry.pending = False
logger.info("Too many followers for user: %s" % user.screen_name)
session.add(entry)
session.commit()
return
if not entry:
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id)
session.add(entry)
session.commit()
pending = True
cursor = entry.cursor
uid = user.id
name = user.name
logger.info("#"*20)
logger.info("Getting %s - %s" % (uid, name))
logger.info("Cursor %s" % cursor)
logger.info("Using account: %s" % wq.name)
_fetched_followers = 0
def fetched_followers():
return session.query(Following).filter(Following.isfollowed==uid).count()
attempts = 0
while cursor > 0 or fetched_followers() < total_followers:
try:
resp = wq.followers.ids(user_id=uid, cursor=cursor)
except TwitterHTTPError as ex:
attempts += 1
if ex.e.code in (401, ) or attempts > 3:
logger.info('Not authorized for user: {}'.format(uid))
entry.errors = ex.message
break
if 'ids' not in resp:
logger.info("Error with id %s %s" % (uid, resp))
entry.pending = False
entry.errors = "No ids in response: %s" % resp
break
logger.info("New followers: %s" % len(resp['ids']))
if recursive:
newusers = get_users(wq, resp)
for newuser in newusers:
add_user(session=session, user=newuser)
if 'ids' not in resp or not resp['ids']:
logger.info('NO IDS in response')
break
for i in resp['ids']:
existing_user = session.query(Following).\
filter(Following.isfollowed == uid).\
filter(Following.follower == i).first()
now = int(time.time())
if existing_user:
existing_user.created_at_stamp = now
else:
f = Following(isfollowed=uid,
follower=i,
created_at_stamp=now)
session.add(f)
logger.info("Fetched: %s/%s followers" % (fetched_followers(),
total_followers))
entry.cursor = resp["next_cursor"]
session.add(entry)
session.commit()
logger.info("Done getting followers for %s" % uid)
entry.pending = False
entry.busy = False
session.add(entry)
session.commit()
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
sys.stdout.flush()
def classify_user(id_or_name, screen_names, user_ids):
try:
int(id_or_name)
user_ids.append(id_or_name)
logger.debug("Added user id")
except ValueError:
logger.debug("Added screen_name")
screen_names.append(id_or_name.split('@')[-1])
# TODO: adapt to the crawler
def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
    signal.signal(signal.SIGINT, signal_handler)

    if not dburi:
        dburi = 'sqlite:///%s.db' % extractor_name

    session = make_session(dburi)
    session.query(ExtractorEntry).update({ExtractorEntry.busy: False})
    session.commit()

    if not (user or initfile):
        logger.info('Using pending users from last session')
    else:
        screen_names = []
        user_ids = []

        if user:
            classify_user(user, screen_names, user_ids)
        elif initfile:
            logger.info("No user. I will open %s" % initfile)
            with open(initfile, 'r') as f:
                for line in f:
                    user = line.strip().split(',')[0]
                    classify_user(user, screen_names, user_ids)

        def missing_user(ix, column=User.screen_name):
            res = session.query(User).filter(column == ix).count() == 0
            if res:
                logger.info("Missing user %s. Count: %s" % (ix, res))
            return res

        screen_names = list(filter(missing_user, screen_names))
        user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids))
        nusers = []
        logger.info("Missing user ids: %s" % user_ids)
        logger.info("Missing screen names: %s" % screen_names)
        if screen_names:
            nusers = list(get_users(wq, screen_names, by_name=True))
        if user_ids:
            nusers += list(get_users(wq, user_ids, by_name=False))

        for i in nusers:
            add_user(dburi=dburi, user=i)

    total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
    logger.info('Total users: {}'.format(total_users))

    de = partial(download_entry, wq, dburi=dburi)
    pending = pending_entries(dburi)
    session.close()

    with tqdm(parallel(de, pending), desc='Downloading users', total=total_users) as tq:
        for i in tq:
            tq.write('Got {}'.format(i))
            logger.info("Got %s" % i)

def pending_entries(dburi):
    session = make_session(dburi)
    while True:
        candidate, entry = session.query(User, ExtractorEntry).\
            filter(ExtractorEntry.user == User.id).\
            filter(ExtractorEntry.pending == True).\
            filter(ExtractorEntry.busy == False).\
            order_by(User.followers_count).first()
        if candidate:
            entry.busy = True
            session.add(entry)
            session.commit()
            yield int(entry.id)
            continue
        if session.query(ExtractorEntry).\
           filter(ExtractorEntry.busy == True).count() > 0:
            time.sleep(1)
            continue
        logger.info("No more pending entries")
        break
    session.close()
def get_tweet(c, tid):
    return c.statuses.show(id=tid)

def search_tweet(c, query):
    yield from c.search.tweets(q=query)['statuses']

def user_timeline(c, query):
    try:
@@ -281,3 +474,330 @@ def get_user(c, user):
        return c.users.lookup(user_id=user)[0]
    except ValueError:
        return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, cache=True, folder="downloaded_tweets", update=False):
tweet = cached_id(tweetid, folder)
if update or not tweet:
tweet = get_tweet(wq, tweetid)
if cache and update:
if tweet:
js = json.dumps(tweet)
write_json(js, folder)
yield tweet
def download_user(wq, userid, cache=True, folder="downloaded_users", update=False):
user = cached_id(userid, folder)
if update or not user:
user = get_user(wq, userid)
if cache and update:
if user:
write_json(user, folder, aliases=[user['screen_name'], ])
yield user
def cached_id(oid, folder):
tweet = None
file = os.path.join(folder, '%s.json' % oid)
if os.path.exists(file) and os.path.isfile(file):
try:
# print('%s: Object exists' % oid)
with open(file) as f:
tweet = json.load(f)
except Exception as ex:
logger.error('Error getting cached version of {}: {}'.format(oid, ex))
return tweet
def write_json(js, folder, oid=None, aliases=[]):
if not oid:
oid = js['id']
file = id_file(oid, folder)
if not os.path.exists(folder):
os.makedirs(folder)
with open(file, 'w') as f:
json.dump(js, f)
logger.info('Written {} to file {}'.format(oid, file))
for alias in aliases:
os.symlink('%s.json' % oid, id_file(alias, folder))
def id_file(oid, folder):
return os.path.join(folder, '%s.json' % oid)
def fail_file(oid, folder):
failsfolder = os.path.join(folder, 'failed')
if not os.path.exists(failsfolder):
os.makedirs(failsfolder)
return os.path.join(failsfolder, '%s.failed' % oid)
def id_failed(oid, folder):
return os.path.isfile(fail_file(oid, folder))
def tweet_download_batch(wq, batch):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
for tid, tweet in tweets.items():
yield tid, tweet
def user_download_batch(wq, batch):
screen_names = []
user_ids = []
for elem in batch:
try:
int(elem)
user_ids.append(str(elem))
except ValueError:
screen_names.append(elem.lower())
args = {}
if user_ids:
args['user_id'] = ','.join(user_ids)
if screen_names:
args['screen_name'] = ','.join(screen_names)
try:
users = wq.users.lookup(**args)
except TwitterHTTPError as ex:
if ex.e.code in (404,):
users = []
else:
raise
found_ids = []
found_names = []
for user in users:
uid = user['id_str']
if uid in user_ids:
found_ids.append(uid)
yield (uid, user)
uname = user['screen_name'].lower()
if uname in screen_names:
found_names.append(uname)
yield (uname, user)
for uid in set(user_ids) - set(found_ids):
yield (uid, None)
for name in set(screen_names) - set(found_names):
yield (name, None)
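To illustrate, a batch mixing numeric ids and screen names is split into the `user_id` and `screen_name` lookup arguments, and anything Twitter does not return comes back as an `(identifier, None)` pair. A hedged sketch, assuming `easy()` builds a worker queue from the local config and using placeholder identifiers:

from bitter import easy
from bitter.utils import user_download_batch

wq = easy()
for identifier, user in user_download_batch(wq, ['balkian', '19697415']):
    if user is None:
        print('not found:', identifier)
    else:
        print(identifier, '->', user['screen_name'])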
def dump_result(oid, obj, folder, ignore_fails=True):
if obj:
try:
write_json(obj, folder=folder, oid=oid)
failed = fail_file(oid, folder)
if os.path.exists(failed):
os.remove(failed)
except Exception as ex:
logger.error('%s: %s' % (oid, ex))
if not ignore_fails:
raise
else:
logger.info('Object not recovered: {}'.format(oid))
with open(fail_file(oid, folder), 'w') as f:
print('Object not found', file=f)
def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False, cache=True,
batch_method=tweet_download_batch):
done = Queue()
down = Queue()
def filter_list(lst, done, down):
print('filtering')
for oid in lst:
cached = cached_id(oid, folder)
if (cached and not update):
done.put((oid, cached))
elif (id_failed(oid, folder) and not retry_failed):
done.put((oid, None))
else:
down.put(oid)
down.put(None)
def download_results(batch_method, down, done):
def gen():
while True:
r = down.get()
if r is None:
down.close()
down.join_thread()
return
yield r
for t in parallel(batch_method, gen(), 100):
done.put(t)
def batch(*args, **kwargs):
return batch_method(wq, *args, **kwargs)
tc = threading.Thread(target=filter_list, args=(lst, done, down), daemon=True)
tc.start()
td = threading.Thread(target=download_results, args=(batch, down, done), daemon=True)
td.start()
def check_threads(ts, done):
for t in ts:
t.join()
done.put(None)
wait = threading.Thread(target=check_threads, args=([tc, td], done), daemon=True)
wait.start()
while True:
rec = done.get()
if rec is None:
done.close()
done.join_thread()
break
oid, obj = rec
if cache or (not obj):
dump_result(oid, obj, folder, ignore_fails)
yield rec
wait.join()
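A hedged usage sketch of `download_list`, with placeholder tweet ids and the tweet batch method (`easy()` is assumed to provide the worker queue):

from bitter import easy
from bitter.utils import download_list, tweet_download_batch

wq = easy()
ids = ['20', '21', '22']  # placeholder tweet ids
for oid, tweet in download_list(wq, ids, folder='downloaded_tweets',
                                batch_method=tweet_download_batch):
    print(oid, 'ok' if tweet else 'failed')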
def download_tweets_file(*args, **kwargs):
kwargs['batch_method'] = tweet_download_batch
yield from download_file(*args, **kwargs)
def download_users_file(*args, **kwargs):
kwargs['batch_method'] = user_download_batch
yield from download_file(*args, **kwargs)
def download_file(wq, csvfile, folder, column=0, delimiter=',', skip=0, cache=True,
quotechar='"', commentchar=None, batch_method=tweet_download_batch,
**kwargs):
with open(csvfile) as f:
if commentchar:
f = (line for line in f if not line.startswith('#'))
csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
for n in range(skip):
next(csvreader)
def reader(r):
for row in csvreader:
if len(row) > column:
yield row[column].strip()
for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method, cache=cache,
**kwargs):
yield res
def download_timeline(wq, user):
return wq.statuses.user_timeline(id=user)
def _consume_feed(func, feed_control=None, **kwargs):
'''
Get all the tweets using pagination and a given method.
It can be controlled with the `count` parameter.
If max_count < 0 => Loop until the whole feed is consumed.
If max_count == 0 => Only call the API once, with the default values.
If max_count > 0 => Get max_count tweets from the feed.
'''
remaining = int(kwargs.pop('max_count', 0))
count = int(kwargs.get('count', -1))
limit = False
# We need to at least perform a query, so we simulate a do-while
# by running once with no limit and updating the condition at the end
with tqdm(total=remaining) as pbar:
while not limit:
if remaining > 0 and ((count < 0) or (count > remaining)):
kwargs['count'] = remaining
resp, stop = feed_control(func, kwargs, remaining=remaining, batch_size=count)
if not resp:
return
for entry in resp:
yield entry
pbar.update(len(resp))
limit = stop
if remaining < 0:
# If the loop was run with a negative remaining, it will only stop
# when the control function tells it to.
continue
# Otherwise, check if we have already downloaded all the required items
remaining -= len(resp)
limit = limit or remaining <= 0
def consume_tweets(*args, **kwargs):
return _consume_feed(*args, feed_control=_tweets_control, **kwargs)
def consume_users(*args, **kwargs):
return _consume_feed(*args, feed_control=_users_control, **kwargs)
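To make the `max_count` semantics concrete, a hedged example assuming `easy()` provides the queue and that the wrapped endpoints accept these keyword arguments:

from bitter import easy
from bitter.utils import consume_tweets, consume_users

wq = easy()

# Paginate with max_id until roughly 200 tweets have been yielded.
for tweet in consume_tweets(wq.search.tweets, q='#python', max_count=200):
    print(tweet['id'])

# max_count=0: a single API call with the endpoint defaults.
followers = list(consume_users(wq.followers.list, screen_name='balkian', max_count=0))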
def _tweets_control(func, apiargs, remaining=0, **kwargs):
''' Return a list of entries, the remaining '''
resp = func(**apiargs)
if not resp:
return None, True
# Update the arguments for the next call
# Two options: either resp is a list, or a dict like:
# {'statuses': ... 'search_metadata': ...}
if isinstance(resp, dict) and 'search_metadata' in resp:
resp = resp['statuses']
max_id = min(s['id'] for s in resp) - 1
apiargs['max_id'] = max_id
return resp, False
def _users_control(func, apiargs, remaining=0, **kwargs):
resp = func(**apiargs)
stop = True
# Update the arguments for the next call
if 'next_cursor' in resp:
cursor = resp['next_cursor']
apiargs['cursor'] = cursor
if int(cursor) != -1:
stop = False
return resp['users'], stop
def serialized(it, outfile, outformat='csv', fields=[], header=None, ignore_missing=False, delimiter='\t'):
outformat = outformat.lower()
def do(out):
if outformat == 'csv':
writer = csv.writer(out, quoting=csv.QUOTE_ALL, delimiter=delimiter)
if header != '':
h = header
if h is None:
h = delimiter.join(fields)
print(h, file=out)
attrs = list(token.strip().split('.') for token in fields)
for obj in it:
values = []
for attr in attrs:
try:
values.append(reduce(operator.getitem, attr, obj))
except KeyError:
if not ignore_missing:
print('Key not present: {}'.format(attr), file=sys.stderr)
values.append(None)
writer.writerow(values)
elif outformat == 'jsonlines':
for obj in it:
print(json.dumps(obj, sort_keys=True), file=out)
elif outformat == 'indented':
for obj in it:
print(json.dumps(obj, indent=4, sort_keys=True), file=out)
else:
for obj in it:
print(obj, file=out)
if outfile is sys.stdout:
return do(sys.stdout)
with open(outfile, 'w') as out:
return do(out)
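A hedged example of `serialized`, mapping nested attributes to tab-separated columns through the dotted `fields` syntax (the tweet dicts are placeholders):

import sys
from bitter.utils import serialized

tweets = [
    {'id': 1, 'text': 'hello', 'user': {'screen_name': 'alice'}},
    {'id': 2, 'text': 'world', 'user': {'screen_name': 'bob'}},
]
serialized(tweets, sys.stdout, outformat='csv',
           fields=['id', 'user.screen_name', 'text'])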

docker-compose.yml

@@ -0,0 +1,12 @@
version: '2'
services:
  dev:
    build:
      context: .
      dockerfile: Dockerfile-3.4
    volumes:
      - '.:/usr/src/app'
    tty: yes
    working_dir: '/usr/src/app'
    entrypoint: '/bin/bash'
    command: ''

requirements.txt

@@ -1,3 +1,5 @@
sqlalchemy
twitter
click
tqdm
pyyaml

setup.cfg

@@ -0,0 +1,4 @@
[metadata]
description-file = README.md
[aliases]
test=pytest

setup.py

@@ -1,29 +1,23 @@
from setuptools import setup

def parse_requirements(filename):
    """ load requirements from a pip requirements file """
    with open(filename, 'r') as f:
        lineiter = list(line.strip() for line in f)
    return [line for line in lineiter if line and not line.startswith("#")]

install_reqs = parse_requirements("requirements.txt")
py2_reqs = parse_requirements("requirements-py2.txt")
test_reqs = parse_requirements("test-requirements.txt")

import sys
import os
import itertools

if sys.version_info <= (3, 0):
    install_reqs = install_reqs + py2_reqs

with open(os.path.join('bitter', 'VERSION'), 'r') as f:
    __version__ = f.read().strip()

setup(
    name="bitter",
@@ -38,10 +32,19 @@ setup(
    extras_require = {
        'server': ['flask', 'flask-oauthlib']
    },
    setup_requires=['pytest-runner',],
    include_package_data=True,
    entry_points="""
        [console_scripts]
        bitter=bitter.cli:main
    """,
    classifiers=[
        'Development Status :: 4 - Beta',
        'Intended Audience :: Developers',
        'Intended Audience :: Science/Research',
        'License :: OSI Approved :: Apache Software License',
        'Programming Language :: Python :: 2',
        'Programming Language :: Python :: 2.7',
        'Programming Language :: Python :: 3',
    ]
)

tests/test_crawlers.py

@@ -5,14 +5,18 @@ import types
import datetime
import time

from bitter import utils, easy
from bitter.crawlers import QueueException
from bitter import config as c

class TestCrawlers(TestCase):

    def setUp(self):
        CONF_PATH = os.path.join(os.path.dirname(__file__), '.bitter.yaml')
        if os.path.exists(CONF_PATH):
            self.wq = easy(CONF_PATH)
        else:
            self.wq = easy()

    def test_create_worker(self):
        assert len(self.wq.queue)==1
@@ -64,12 +68,9 @@ class TestUtils(TestCase):
        try:
            # resp = self.wq.friends.list(screen_name='balkian')
            self.wq.next(['friends', 'list'])
        except QueueException:
            failed = True
        assert failed

        l2 = w1.get_limit(['friends', 'list'])
        assert self.wq.get_wait(['friends', 'list']) > (l2['reset']-time.time())
        assert self.wq.get_wait(['friends', 'list']) < (l2['reset']-time.time()+2)

tests/test_models.py

@@ -0,0 +1,23 @@
from unittest import TestCase
import os
import types

from bitter import utils
from bitter.models import *

from sqlalchemy import exists

class TestModels(TestCase):

    def setUp(self):
        self.session = make_session('sqlite://')

    def test_user(self):
        fake_user = User(name="Fake user", id=1548)
        self.session.add(fake_user)
        self.session.commit()
        fake_committed = self.session.query(User).filter_by(name="Fake user").first()
        assert fake_committed
        self.session.delete(fake_committed)
        self.session.commit()
        assert not list(self.session.execute('SELECT 1 from users where id=\'%s\'' % 1548))

tests/test_utils.py

@@ -8,54 +8,63 @@ from bitter import config as c
class TestUtils(TestCase):

    configfile = '/tmp/bitter.yaml'

    def setUp(self):
        c.CONFIG_FILE = self.configfile
        if os.path.exists(self.configfile):
            os.remove(self.configfile)
        assert not os.path.exists(self.configfile)
        utils.create_config_file(self.configfile)
        assert os.path.exists(self.configfile)

    def test_add_credentials(self):
        utils.add_credentials(self.configfile, user="test")
        assert utils.get_credentials(self.configfile)
        assert utils.get_credentials(self.configfile, user="test")
        assert list(utils.get_credentials(self.configfile, user="test"))[0]["user"] == "test"

    def test_get_credentials(self):
        utils.add_credentials(self.configfile, user="test")
        assert utils.get_credentials(self.configfile, user="test")
        assert not utils.get_credentials(self.configfile, user="test", inverse=True)

    def test_add_two_credentials(self):
        utils.add_credentials(self.configfile, user="test")
        utils.add_credentials(self.configfile, user="test2")
        assert utils.get_credentials(self.configfile, user="test")
        assert utils.get_credentials(self.configfile, user="test2")

    def test_delete_credentials(self):
        utils.add_credentials(self.configfile, user="test")
        assert utils.get_credentials(self.configfile, user="test")
        utils.delete_credentials(self.configfile, user="test")
        assert not utils.get_credentials(self.configfile, user="test")

    def test_parallel(self):
        import time

        def echo(i):
            time.sleep(0.5)
            return i

        tic = time.time()
        resp = utils.parallel(echo, [1, 2, 3])
        assert isinstance(resp, types.GeneratorType)
        assert sorted(list(resp)) == [1, 2, 3]
        toc = time.time()
        assert (tic-toc) < 600
        resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
        assert sorted(list(resp2)) == [1, 2, 3, 4]

class TestUtilsEnv(TestUtils):

    configfile = None

    def setUp(self):
        if 'BITTER_CONFIG' in os.environ:
            self.oldenv = os.environ['BITTER_CONFIG']
        os.environ['BITTER_CONFIG'] = ''

    def tearDown(self):
        if hasattr(self, 'oldenv'):
            os.environ['BITTER_CONFIG'] = self.oldenv