1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-26 05:08:22 +00:00

20 Commits

Author SHA1 Message Date
J. Fernando Sánchez
ea848f1a78 Version 0.10.3
* Ability to include "optional" fields from tweets (e.g., retweeted_status).
* Optional caching (for very large datasets)
2020-06-24 11:01:02 +02:00
J. Fernando Sánchez
030c41b826 Changes to user and tweet search: Cache by default
* Improved printing of credential limits
* Tweet and user searchers cache by default. Write has been changed to dry_run
2020-01-07 20:36:19 +01:00
J. Fernando Sánchez
bba73091e4 Fixes for serialize
Some commands in the CLI did not properly include serialize (get_tweet,
get_users and search)
2020-01-07 19:36:18 +01:00
J. Fernando Sánchez
80b58541e7 Add serialize 2019-11-12 17:15:26 +01:00
J. Fernando Sánchez
40a8b45231 Fix concurrency issue
Download_list sometimes failed with:
BrokenPipeError: [Errno 32] Broken pipe
2019-09-20 13:39:51 +02:00
J. Fernando Sánchez
fadeced761 Fix tests py2 2019-09-19 12:18:56 +02:00
J. Fernando Sánchez
bdb844d75f Fix compatibility click >=7 2019-09-19 11:45:12 +02:00
J. Fernando Sánchez
653487e2d7 Improve download_list 2019-04-30 19:15:15 +02:00
J. Fernando Sánchez
02aec5eefa Fix bug user_ids
Add number of failed downloads to the output.
Add flag to retry previously failed downloads.
2018-09-16 12:20:41 +02:00
J. Fernando Sánchez
e6b08c4ffb Fix bug user ids 2018-08-30 15:57:49 +02:00
J. Fernando Sánchez
311b972b27 Fix typo and bump to version 0.9.1 2018-08-21 13:02:58 +02:00
J. Fernando Sánchez
7724967285 Add filter-edges 2018-08-21 12:57:03 +02:00
J. Fernando Sánchez
dd662acd22 Fix pip installation 2018-08-21 12:54:25 +02:00
J. Fernando Sánchez
5199d5b5aa Improve CLI. Add credentials 2018-03-20 13:29:18 +01:00
J. Fernando Sánchez
6259013978 Compose and bug fixes 2018-03-19 14:36:25 +01:00
J. Fernando Sánchez
53bb7edabc Add sh scripts 2018-03-19 14:35:07 +01:00
J. Fernando Sánchez
57eb73b53b Fix README 2017-12-28 18:13:58 +01:00
J. Fernando Sánchez
7c829ee102 Fix py2 compatibility 2017-12-20 16:51:53 +01:00
J. Fernando Sánchez
27bc3557b2 Fixed JSON regression 2017-12-19 20:44:55 +01:00
J. Fernando Sánchez
9c82dea298 Config from variable or file
This replaces the old file of credentials (with one per line) with a
configuration in YAML format.

The configuration can be stored either in a file or in an environment variable
(BITTER_CONFIG).

There is still a command line argument to add the credentials in that file to
the config.
2017-12-19 20:34:39 +01:00
34 changed files with 1163 additions and 321 deletions

1
.gitignore vendored
View File

@@ -2,6 +2,7 @@ __pycache__
*.egg-info *.egg-info
dist dist
env env
.env
__* __*
.* .*
*.pyc *.pyc

View File

@@ -2,6 +2,6 @@
From python:2.7-onbuild From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]" RUN pip install ".[server]"
ENTRYPOINT ["bitter"] ENTRYPOINT ["bitter"]

View File

@@ -2,6 +2,6 @@
From python:3.4-onbuild From python:3.4-onbuild
Maintainer J. Fernando Sánchez @balkian Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]" RUN pip install ".[server]"
ENTRYPOINT ["bitter"] ENTRYPOINT ["bitter"]

View File

@@ -2,6 +2,6 @@
From python:{{PYVERSION}}-onbuild From python:{{PYVERSION}}-onbuild
Maintainer J. Fernando Sánchez @balkian Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]" RUN pip install ".[server]"
ENTRYPOINT ["bitter"] ENTRYPOINT ["bitter"]

View File

@@ -1,4 +1,4 @@
PYVERSIONS=3.4 2.7 PYVERSIONS=3.5
PYMAIN=$(firstword $(PYVERSIONS)) PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter NAME=bitter
REPO=balkian REPO=balkian
@@ -19,7 +19,7 @@ Dockerfile-%: Dockerfile.template
dev-%: dev-%:
@docker start $(NAME)-dev$* || (\ @docker start $(NAME)-dev$* || (\
$(MAKE) build-$*; \ $(MAKE) build-$*; \
docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \ docker run -d -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
)\ )\
docker exec -ti $(NAME)-dev$* bash docker exec -ti $(NAME)-dev$* bash
@@ -38,7 +38,7 @@ test: $(addprefix test-,$(PYMAIN))
testall: $(addprefix test-,$(PYVERSIONS)) testall: $(addprefix test-,$(PYVERSIONS))
test-%: build-% test-%: build-%
docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ; docker run --rm -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
pip_test-%: pip_test-%:
docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ; docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
@@ -71,6 +71,6 @@ pip_upload:
pip_test: $(addprefix pip_test-,$(PYVERSIONS)) pip_test: $(addprefix pip_test-,$(PYVERSIONS))
run: build run: build
docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' docker run --rm --env-file $$PWD/.env -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
.PHONY: test test-% build-% build test test_pip run .PHONY: test test-% build-% build test test_pip run

139
README.md
View File

@@ -1,4 +1,5 @@
# Description # Description
There are two parts to bitter. There are two parts to bitter.
First of all, it is a wrapper over Python twitter that adds support for several Twitter API credentials (e.g. authorizing the same app with different user accounts). First of all, it is a wrapper over Python twitter that adds support for several Twitter API credentials (e.g. authorizing the same app with different user accounts).
Secondly, it is a command line tool to automate several actions (e.g. downloading user networks) using the wrapper. Secondly, it is a command line tool to automate several actions (e.g. downloading user networks) using the wrapper.
@@ -29,16 +30,146 @@ e.g. to get the latest 500 tweets by the python software foundation:
``` ```
bitter api statuses/user_timeline --id thepsf --count 500 bitter api statuses/user_timeline --id thepsf --count 500
``` ```
# Credentials format
## Adding credentials
``` ```
{"user": "balkian", "consumer_secret": "xxx", "consumer_key": "xxx", "token_key": "xxx", "token_secret": "xxx"} bitter --config <YOUR CONFIGURATION FILE> credentials add
``` ```
By default, bitter uses '~/.bitter-credentials.json', but you may choose a different file: You can specify the parameters in the command or let the command line guide you through the process.
# Examples
## Downloading a list of tweets
Bitter can download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.
You can even specify the column number for tweet ids.
Bitter will not try to download
``` ```
python -m bitter -c <credentials_file> ... Usage: bitter tweet get_all [OPTIONS] TWEETSFILE
Download tweets from a list of tweets in a CSV file. The result is stored
as individual json files in your folder of choice.
Options:
-f, --folder TEXT
-d, --delimiter TEXT
-h, --header Discard the first line (use it as a header)
-q, --quotechar TEXT
-c, --column INTEGER
--help Show this message and exit.
```
For instance, this will download `tweet_ids.csv` in the `tweet_info` folder:
```
bitter tweet get_all -f tweet_info tweet_ids.csv
```
## Downloading a list of users
Bitter downloads users and tweets in a similar way:
```
Usage: bitter users get_all [OPTIONS] USERSFILE
Download users from a list of user ids/screen names in a CSV file. The
result is stored as individual json files in your folder of choice.
Options:
-f, --folder TEXT
-d, --delimiter TEXT
-h, --header Discard the first line (use it as a header)
-q, --quotechar TEXT
-c, --column INTEGER
--help Show this message and exit.
```
The only difference is that users can be downloaded via `screen_name` or `user_id`.
This method does not try to resolve screen names to user ids, so users may be downloaded more than once if they appear in both ways.
## Downloading a stream
```
Usage: bitter stream get [OPTIONS]
Options:
-l, --locations TEXT
-t, --track TEXT
-f, --file TEXT File to store the stream of tweets. Default: standard output
-p, --politelyretry Politely retry after a hangup/connection error
--help Show this message and exit.
```
```
bitter --config .bitter.yaml stream get
```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
## REST queries
In newer versions of bitter, individual methods to download tweets/users using the REST API are being replaced with a generic method to call the API.
```
bitter api <URL endpoint> --parameter VALUE ... | [--tweets | --users] [--max_count MAX_COUNT] [--count COUNT_PER_CALL]
```
For instance:
```
# Get 100 tweets that mentioned Obama after tweet 942689870501302300
bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama
```
That is equivalent to this call to the api: `api/1.1/searc/tweets?since_id=942689870501302300&count=100&q=Obama`.
The flags `--tweets` and `--users` are optional.
If you use them, bitter will try to intelligently fetch all the tweets/users by using pagination with the API.
For example:
```
# Download 1000 tweets, 100 tweets per call.
bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama --max_count=1000 --tweets
```
```
# Download all the followers of @balkian
bitter api 'followers/list' --_id balkian --users --max_count -1
```
Note that some reserved words (such as `id`) have to be preceeded by an underscore.
This limitation is imposed by the python-twitter library.
# Configuration format
```
credentials:
- user: "balkian"
consumer_secret: "xxx"
consumer_key: "xxx"
token_key: "xxx"
token_secret: "xxx"
- user: ....
```
By default, bitter uses '~/.bitter.yaml', but you may choose a different file:
```
python -m bitter --config <config_file> ...
```
Or use an environment variable:
```
export BITTER_CONFIG=$(cat myconfig.yaml)
``` ```
# Server # Server

10
bin/README.md Normal file
View File

@@ -0,0 +1,10 @@
Scripts to process jsonlines
To get the jsonlines file, you can use the streaming API or the search api, like so:
```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
```
To keep track of the query that generated the file, you can save the command in a text file.
For instance, the example above is also in `example_query.sh`.

1
bin/example_query.sh Executable file
View File

@@ -0,0 +1 @@
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines

13
bin/extract-hashtags.sh Executable file
View File

@@ -0,0 +1,13 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export FIELDS="created_at,id,text"
for i in "$@"
do
OUTPUT=$i.hashtags.csv
echo "$FIELDS" > $OUTPUT
pv -l $i -N "hashtags $i" | jq -r '. | .created_at as $created_at | .id_str as $id | .entities.hashtags | select(. != null) | .[] | [$created_at, $id, .text] | @csv' >> $OUTPUT
done

15
bin/extract-interactions.sh Executable file
View File

@@ -0,0 +1,15 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
for i in "$@"
do
REPLYOUTPUT=$i.replies.csv
RTOUTPUT=$i.rts.csv
echo 'created_at,id,user_id,reply_user_id' > $REPLYOUTPUT
echo 'created_at,id,user_id,rt_user_id' > $RTOUTPUT
pv -l -N "$i" $i | jq -r '. | select(.in_reply_to_user_id_str != null) | [.created_at, .id_str, .user.id_str, .in_reply_to_user_id_str] | @csv' >> $REPLYOUTPUT
pv -l -N "$i" $i | jq -r '. | select(.retweeted_status != null) | [.created_at, .retweeted_status.id_str, .user.id_str, .retweeted_status.user.id_str] | @csv' >> $RTOUTPUT
done

16
bin/extract-limits.sh Executable file
View File

@@ -0,0 +1,16 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export QUERY='.limit | select(. != null) | [.timestamp_ms, .track] | @csv'
export FIELDS="timestamp,track"
for i in "$@"
do
OUTPUT=$i.limits.csv
echo $FIELDS > $OUTPUT
pv -N "$i limits" -l $i | jq -r "$QUERY" >> $OUTPUT
done

16
bin/extract-media.sh Executable file
View File

@@ -0,0 +1,16 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export QUERY='select(.id != null) | .id_str as $id | .entities.urls[] | select(.expanded_url | select(. != null) | contains("open.spotify") or contains("youtube.com") or contains("youtu.be")) | [$id, .expanded_url] | @csv'
export FIELDS="id,url"
for i in "$@"
do
OUTPUT=$i.media.csv
echo $FIELDS > $OUTPUT
pv -N "$i media" -l $i | jq -r "$QUERY" >> $OUTPUT
done

28
bin/extract-users.sh Executable file
View File

@@ -0,0 +1,28 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export USER_FIELDS="\$created_at,\
.id_str,\
.screen_name,\
.followers_count,\
.lang,\
.description,\
.statuses_count,\
.favourites_count,\
.friends_count,\
.created_at,\
.name,\
.location,\
.listed_count,\
.time_zone\
"
for i in "$@"
do
OUTPUT=$i.users.csv
echo \#$USER_FIELDS > $OUTPUT
jq -r ".created_at as \$created_at | .user,.retweeted_status.user | select(. != null) | [$USER_FIELDS] | @csv " $i | pv -N "$i" -l >> $OUTPUT
done

32
bin/extract.sh Executable file
View File

@@ -0,0 +1,32 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
FIELDS=".id_str,\
.user.screen_name,\
.user.id,\
.favorite_count,\
.retweet_count,\
.quote_count,\
.reply_count,\
.created_at,\
.lang,\
.in_reply_to_user_id_str,\
.in_reply_to_status_id_str,\
.retweeted_status.id_str,\
.retweeted_status.user.id,\
.retweeted_status.favorite_count,\
.retweeted_status.retweet_count,\
.retweeted_status.quote_count,\
.retweeted_status.reply_count,\
.retweeted_status.created_at\
"
for i in "$@"
do
OUTPUT=$i.tweets.csv
echo "$FIELDS" | sed -e 's/,[ \t\n]*\./,/g' | sed -e 's/^[#]\?\.//' > $OUTPUT
jq -r "[$FIELDS]|@csv" $i | pv -N "$i" -l >> $OUTPUT
done

17
bin/extract_extended.sh Executable file
View File

@@ -0,0 +1,17 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
QUERY='.| select(.retweeted_status != null) | .retweeted_status | .id_str as $rt_id | .extended_tweet | select(. != null) | [$rt_id,.full_text]|@csv'
HEADER='rt_id,full_text'
for i in "$@"
do
OUTPUT=$i.full_text.csv
echo $HEADER > $OUTPUT
jq "$QUERY" $i | pv -N "$i" -l >> $OUTPUT
sort -u $OUTPUT -o $OUTPUT
sed -ri 's/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g' $OUTPUT
done

16
bin/extract_text.sh Executable file
View File

@@ -0,0 +1,16 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
QUERY='(.full_text // .retweeted_status.full_text) as $text | [ .id_str,$text ] | @csv'
HEADER='id,text'
for i in "$@"
do
OUTPUT=$i.text.csv
echo $HEADER > $OUTPUT
pv -l -N "$i" $i | jq -r "$QUERY" >> $OUTPUT
# sed -ri s/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g $OUTPUT
done

10
bin/filter-edges.sh Normal file
View File

@@ -0,0 +1,10 @@
if [ "$#" -lt 2 ]
then
echo "Find edge lines in a file that contain one of the users in a user list."
echo ""
echo "Usage: $0 <file with edges> <file with the list of users>"
exit 1
fi
pv -c -N 'read' "$1" | grep -F -w -f "$2" | pv -lc -N 'found'

23
bin/functions.py Normal file
View File

@@ -0,0 +1,23 @@
import pandas as pd
def read_rts(rtsfile, tweetsfile):
tweets = pd.read_csv(tweetsfile, index_col=0)
rts = pd.read_csv(rtsfile, index_col=1)
merged = rts.groupby(by=['id', 'rt_user_id']).size().rename('count').reset_index(level=1).merge(tweets, left_index=True, right_index=True)
return merged.sort_values(by='count', ascending=False)
def read_tweets(tweetsfile):
'''When the dataset is small enough, we can load tweets as-in'''
with open(tweetsfile) as f:
header = f.readline().strip().split(',')
dtypes = {}
for key in header:
if key.endswith('_str') or key.endswith('.id'):
dtypes[key] = object
tweets = pd.read_csv(tweetsfile, dtype=dtypes, index_col=0)
return tweets
if __name__ == '__main__':
import argparse

1
bin/print-hashtags.sh Executable file
View File

@@ -0,0 +1 @@
cat "$@" | awk -F"," '{print tolower($3)}' | sort | uniq -c | sort -h

14
bin/print-replies.sh Executable file
View File

@@ -0,0 +1,14 @@
MAX_TAGS=100
function get_text {
while read line
do
echo $line
rtid=$(echo $line | awk -F"," '{print $2}')
text=$(grep -m 1 $rtid *.text.csv)
echo "$line - $text"
done < "/dev/stdin"
}
cat "$@" | get_text

15
bin/print-rts.sh Executable file
View File

@@ -0,0 +1,15 @@
MAX_TAGS=100
function get_text {
while read line
do
echo $line
rtid=$(echo $line | awk '{print $2}')
count=$(echo $line | awk '{print $1}')
text=$(grep -m 1 $rtid *.text.csv)
echo "$line - $text"
done < "/dev/stdin"
}
cat "$@" | awk -F"," '{print tolower($2)}' | sort | uniq -c | sort -h | tail -n $MAX_TAGS | get_text

View File

@@ -1 +1 @@
0.7.4 0.10.3

View File

@@ -6,10 +6,12 @@ http://github.com/balkian/bitter
import os import os
from .version import __version__ from .version import __version__
from . import config as bconf
def easy(*args, **kwargs): def easy(conffile=bconf.CONFIG_FILE):
from .crawlers import TwitterQueue from .crawlers import TwitterQueue
return TwitterQueue.from_credentials(*args, **kwargs)
return TwitterQueue.from_config(conffile=conffile)
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ] __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

View File

@@ -26,19 +26,125 @@ else:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def serialize(function):
'''Common options to serialize output to CSV or other formats'''
@click.option('--fields', help='Provide a list of comma-separated fields to print.', default='', type=str)
@click.option('--ignore_missing', help='Do not show warnings for missing fields.', is_flag=True)
@click.option('--header', help='Header that will be printed at the beginning of the file', default=None)
@click.option('--csv', help='Print each object as a csv row.', is_flag=True)
@click.option('--jsonlines', '--json', help='Print each object as JSON in a new line.', is_flag=True)
@click.option('--indented', help='Print each object as an indented JSON object', is_flag=True)
@click.option('--outdelimiter', help='Delimiter for some output formats, such as CSV. It defaults to \t', default='\t')
@click.option('--outfile', help='Output file. It defaults to STDOUT', default=sys.stdout)
def decorated(fields, ignore_missing, header, csv, jsonlines, indented, outfile, outdelimiter, **kwargs):
it = function(**kwargs)
outformat = 'json'
if csv:
outformat = 'csv'
elif jsonlines:
outformat = 'jsonlines'
elif indented:
outformat = 'indented'
return utils.serialized(it, outfile, outformat=outformat, fields=fields.split(','), ignore_missing=ignore_missing, header=header, delimiter=outdelimiter)
return decorated
@click.group() @click.group()
@click.option("--verbose", is_flag=True) @click.option("--verbose", is_flag=True)
@click.option("--logging_level", required=False, default='WARN') @click.option("--logging_level", required=False, default='WARN')
@click.option("--config", required=False) @click.option('--config', show_default=True, default=bconf.CONFIG_FILE)
@click.option('-c', '--credentials', show_default=True, default='~/.bitter-credentials.json') @click.option('--credentials', show_default=True, help="DEPRECATED: If specified, these credentials will be copied to the configuratation file.", default=bconf.CREDENTIALS)
@click.pass_context @click.pass_context
def main(ctx, verbose, logging_level, config, credentials): def main(ctx, verbose, logging_level, config, credentials):
logging.basicConfig(level=getattr(logging, logging_level)) logging.basicConfig(level=getattr(logging, logging_level))
ctx.obj = {} ctx.obj = {}
ctx.obj['VERBOSE'] = verbose ctx.obj['VERBOSE'] = verbose
ctx.obj['CONFIG'] = config bconf.CONFIG_FILE = config
bconf.CREDENTIALS = credentials bconf.CREDENTIALS = credentials
utils.create_credentials(credentials) if os.path.exists(utils.get_config_path(credentials)):
utils.copy_credentials_to_config(credentials, config)
@main.group(invoke_without_command=True)
@click.pass_context
def credentials(ctx):
if ctx.invoked_subcommand is not None:
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for worker in wq.queue:
print('#'*20)
try:
resp = worker.client.application.rate_limit_status()
print(worker.name)
except Exception as ex:
print('{}: AUTHENTICATION ERROR: {}'.format(worker.name, ex) )
@credentials.command('limits')
@click.option('--no_aggregate', is_flag=True, default=False,
help=('Print limits from all workers. By default, limits are '
'aggregated (summed).'))
@click.option('--no_diff', is_flag=True, default=False,
help=('Print all limits. By default, only limits that '
'have been consumed will be shown.'))
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, no_aggregate, no_diff, url):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
limits = {}
if url:
print('URL is: {}'.format(url))
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
for urlimits in resp['resources'].values():
for url, value in urlimits.items():
if url not in limits:
limits[url] = {}
glob = limits[url].get('global', {})
limits[url][worker.name] = value
for k in ['limit', 'remaining']:
if k not in glob:
glob[k] = 0
glob[k] += value[k]
limits[url]['global'] = glob
for url, lims in limits.items():
worker_list = lims.keys() if no_aggregate else ['global', ]
url_printed = False
for worker in worker_list:
vals = lims[worker]
consumed = vals['limit'] - vals['remaining']
if no_diff or consumed:
if not url_printed:
print(url)
url_printed = True
print('\t', worker, ':')
print('\t\t', vals)
@credentials.command('add')
@click.option('--consumer_key', default=None)
@click.option('--consumer_secret', default=None)
@click.option('--token_key', default=None)
@click.option('--token_secret', default=None)
@click.argument('user_name')
def add(user_name, consumer_key, consumer_secret, token_key, token_secret):
if not consumer_key:
consumer_key = click.prompt('Please, enter your YOUR CONSUMER KEY')
if not consumer_secret:
consumer_secret = click.prompt('Please, enter your CONSUMER SECRET')
if not token_key:
token_key = click.prompt('Please, enter your ACCESS TOKEN')
if not token_secret:
token_secret = click.prompt('Please, enter your ACCESS TOKEN SECRET')
utils.add_credentials(conffile=bconf.CONFIG_FILE, user=user_name, consumer_key=consumer_key, consumer_secret=consumer_secret,
token_key=token_key, token_secret=token_secret)
click.echo('Credentials added for {}'.format(user_name))
@main.group() @main.group()
@click.pass_context @click.pass_context
@@ -46,35 +152,61 @@ def tweet(ctx):
pass pass
@tweet.command('get') @tweet.command('get')
@click.option('-w', '--write', is_flag=True, default=False) @click.option('-d', '--dry_run', is_flag=True, default=False)
@click.option('-f', '--folder', default="tweets") @click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False) @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid') @click.argument('tweetid')
def get_tweet(tweetid, write, folder, update): @serialize
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) def get_tweet(tweetid, dry_run, folder, update):
utils.download_tweet(wq, tweetid, write, folder, update) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
yield from utils.download_tweet(wq, tweetid, not dry_run, folder, update)
@tweet.command('get_all') @tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file.
@click.argument('tweetsfile', 'File with a list of tweets to look up') The result is stored as individual json files in your folder of choice.''')
@click.argument('tweetsfile')
@click.option('-f', '--folder', default="tweets") @click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",")
@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
@click.option('--skip', help='Discard the first DISCARD lines (use them as a header)', default=0)
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-q', '--quotechar', default='"')
@click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context @click.pass_context
def get_tweets(ctx, tweetsfile, folder): def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, nocache, skip, quotechar, commentchar, column):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) if update and not click.confirm('This may overwrite existing tweets. Continue?'):
utils.download_tweets(wq, tweetsfile, folder) click.echo('Cancelling')
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
status = tqdm('Queried')
failed = 0
for tid, obj in utils.download_tweets_file(wq, tweetsfile, folder, delimiter=delimiter, cache=not nocache,
skip=skip, quotechar=quotechar, commentchar=commentchar,
column=column, update=update, retry_failed=retry):
status.update(1)
if not obj:
failed += 1
status.set_description('Failed: %s. Queried' % failed, refresh=True)
continue
yield obj
@tweet.command('search') @tweet.command('search')
@click.argument('query') @click.argument('query')
@serialize
@click.pass_context @click.pass_context
def search(ctx, query): def search(ctx, query):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
t = utils.search_tweet(wq, query) yield from utils.search_tweet(wq, query)
print(json.dumps(t, indent=2))
@tweet.command('timeline') @tweet.command('timeline')
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def timeline(ctx, user): def timeline(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
t = utils.user_timeline(wq, user) t = utils.user_timeline(wq, user)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@@ -96,43 +228,48 @@ def list_users(ctx, db):
@users.command('get') @users.command('get')
@click.argument('user') @click.argument('user')
@click.option('-w', '--write', is_flag=True, default=False) @click.option('-d', '--dry_run', is_flag=True, default=False)
@click.option('-f', '--folder', default="users") @click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False) @click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
def get_user(user, write, folder, update): @serialize
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) def get_user(user, dry_run, folder, update):
if not write: wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
u = utils.get_user(wq, user) yield from utils.download_user(wq, user, not dry_run, folder, update)
js = json.dumps(u, indent=2)
print(js)
return
if not os.path.exists(folder):
os.makedirs(folder)
file = os.path.join(folder, '%s.json' % user)
if not update and os.path.exists(file) and os.path.isfile(file):
print('User exists: %s' % user)
return
with open(file, 'w') as f:
u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
print(js, file=f)
@users.command('get_all') @users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file.
@click.argument('usersfile', 'File with a list of users to look up') The result is stored as individual json files in your folder of choice.''')
@click.argument('usersfile')
@click.option('-f', '--folder', default="users") @click.option('-f', '--folder', default="users")
@click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
@click.option('-d', '--delimiter', default=",")
@click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)',
is_flag=True, default=False)
@click.option('-q', '--quotechar', default='"')
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context @click.pass_context
def get_users(ctx, usersfile, folder): def get_users(ctx, usersfile, folder, update, retry, nocache, delimiter, skip, quotechar, commentchar, column):
with open(usersfile) as f: if update and not click.confirm('This may overwrite existing users. Continue?'):
for line in f: click.echo('Cancelling')
uid = line.strip() return
ctx.invoke(get_user, folder=folder, user=uid, write=True) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for i in utils.download_users_file(wq, usersfile, folder, delimiter=delimiter,
update=update, retry_failed=retry,
skip=skip, quotechar=quotechar,
cache=not nocache,
commentchar=commentchar,
column=column):
yield i
@users.command('crawl') @users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.') @click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.') @click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.') @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.') @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
@click.argument('usersfile', 'File with a list of users to look up') @click.argument('usersfile')
@click.pass_context @click.pass_context
def crawl_users(ctx, usersfile, skip, until, threads, db): def crawl_users(ctx, usersfile, skip, until, threads, db):
global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
@@ -146,7 +283,7 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
return ExitStack() return ExitStack()
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
len(wq.queue))) len(wq.queue)))
@@ -310,7 +447,7 @@ def users_extractor(ctx):
@click.pass_context @click.pass_context
def extract(ctx, recursive, user, name, initfile): def extract(ctx, recursive, user, name, initfile):
print(locals()) print(locals())
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
dburi = ctx.obj['DBURI'] dburi = ctx.obj['DBURI']
utils.extract(wq, utils.extract(wq,
recursive=recursive, recursive=recursive,
@@ -322,56 +459,40 @@ def extract(ctx, recursive, user, name, initfile):
@extractor.command('reset') @extractor.command('reset')
@click.pass_context @click.pass_context
def reset_extractor(ctx): def reset_extractor(ctx):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
db = ctx.obj['DBURI'] db = ctx.obj['DBURI']
session = make_session(db) session = make_session(db)
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@main.command('limits')
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, url):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
print('#'*20)
print(worker.name)
if url:
limit = 'NOT FOUND'
print('URL is: {}'.format(url))
cat = url.split('/')[1]
if cat in resp['resources']:
limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
else:
print('Cat {} not found'.format(cat))
print('{}: {}'.format(url, limit))
else:
print(json.dumps(resp, indent=2))
@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False),
@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False)) help='''Issue a call to an endpoint of the Twitter API.''')
@click.argument('cmd', nargs=1) @click.argument('cmd', nargs=1)
@click.option('--tweets', is_flag=True, help='Fetch more tweets using smart pagination. Use --count to control how many tweets to fetch per call, and --max_count to set the number of desired tweets (or -1 to get as many as possible).', type=bool, default=False)
@click.option('--users', is_flag=True, help='Fetch more users using smart pagination. Use --count to control how many users to fetch per call, and --max_count to set the number of desired users (or -1 to get as many as possible).', type=bool, default=False)
@click.argument('api_args', nargs=-1, type=click.UNPROCESSED) @click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
@click.pass_context @click.pass_context
def api(ctx, cmd, api_args): def api(ctx, cmd, tweets, users, api_args):
opts = {} opts = {}
mappings = {
'id': '_id'
}
i = iter(api_args) i = iter(api_args)
for k, v in zip(i, i): for k, v in zip(i, i):
k = k.replace('--', '') k = k.replace('--', '')
if k in mappings:
k = mappings[k]
opts[k] = v opts[k] = v
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS) wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
resp = utils.consume_feed(wq[cmd], **opts) if tweets:
# A hack to stream jsons resp = utils.consume_tweets(wq[cmd], **opts)
print('[') elif users:
first = True resp = utils.consume_users(wq[cmd], **opts)
for i in resp:
if not first:
print(',')
else: else:
first = False resp = wq[cmd](**opts)
print(json.dumps(resp))
print(json.dumps(i, indent=2)) return
print(']') for i in resp:
print(json.dumps(i))
@main.command('server') @main.command('server')
@@ -392,11 +513,11 @@ def stream(ctx):
@stream.command('get') @stream.command('get')
@click.option('-l', '--locations', default=None) @click.option('-l', '--locations', default=None)
@click.option('-t', '--track', default=None) @click.option('-t', '--track', default=None)
@click.option('-f', '--file', help='File to store the stream of tweets') @click.option('-f', '--file', default=None, help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True) @click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
@click.pass_context @click.pass_context
def get_stream(ctx, locations, track, file, politelyretry): def get_stream(ctx, locations, track, file, politelyretry):
wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1) wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1)
query_args = {} query_args = {}
if locations: if locations:
@@ -415,10 +536,14 @@ def get_stream(ctx, locations, track, file, politelyretry):
iterator = wq.statuses.sample() iterator = wq.statuses.sample()
else: else:
iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75") iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
try:
for i in iterator: for i in iterator:
yield i yield i
if not politelyretry: if not politelyretry:
return return
except Exception:
if not politelyretry:
raise ex
thishangup = time.time() thishangup = time.time()
if thishangup - lasthangup < 60: if thishangup - lasthangup < 60:
raise Exception('Too many hangups in a row.') raise Exception('Too many hangups in a row.')
@@ -436,7 +561,7 @@ def get_stream(ctx, locations, track, file, politelyretry):
def read_stream(ctx, file, tail): def read_stream(ctx, file, tail):
for tweet in utils.read_file(file, tail=tail): for tweet in utils.read_file(file, tail=tail):
try: try:
print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text'])) print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['created_at'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
except (KeyError, TypeError): except (KeyError, TypeError):
print('Raw tweet: {}'.format(tweet)) print('Raw tweet: {}'.format(tweet))

View File

@@ -11,3 +11,4 @@ E.g.:
app.run() app.run()
''' '''
CREDENTIALS = '~/.bitter-credentials.json' CREDENTIALS = '~/.bitter-credentials.json'
CONFIG_FILE = '~/.bitter.yaml'

View File

@@ -58,6 +58,18 @@ class FromCredentialsMixin(object):
wq.ready(cls.worker_class(cred["user"], cred)) wq.ready(cls.worker_class(cred["user"], cred))
return wq return wq
class FromConfigMixin(object):
@classmethod
def from_config(cls, config=None, conffile=None, max_workers=None):
wq = cls()
if not config:
with utils.config(conffile) as c:
config = c
for cred in islice(config['credentials'], max_workers):
wq.ready(cls.worker_class(cred["user"], cred))
return wq
class TwitterWorker(object): class TwitterWorker(object):
api_class = None api_class = None
@@ -103,13 +115,14 @@ class RestWorker(TwitterWorker):
def get_wait(self, uriparts): def get_wait(self, uriparts):
limits = self.get_limit(uriparts) limits = self.get_limit(uriparts)
if limits['remaining'] > 0: if limits.get('remaining', 1) > 0:
return 0 return 0
reset = limits.get('reset', 0) reset = limits.get('reset', 0)
now = time.time() now = time.time()
return max(0, (reset-now)) return max(0, (reset-now))
def get_limit(self, uriparts): def get_limit(self, uriparts):
uriparts = list(u for u in uriparts if u)
uri = '/'+'/'.join(uriparts) uri = '/'+'/'.join(uriparts)
for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items(): for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
if ix.startswith(uri): if ix.startswith(uri):
@@ -142,7 +155,7 @@ class RestWorker(TwitterWorker):
class QueueException(BaseException): class QueueException(BaseException):
pass pass
class QueueMixin(AttrToFunc, FromCredentialsMixin): class QueueMixin(AttrToFunc, FromCredentialsMixin, FromConfigMixin):
def __init__(self, wait=True): def __init__(self, wait=True):
logger.debug('Creating worker queue') logger.debug('Creating worker queue')
self.queue = set() self.queue = set()

View File

@@ -3,11 +3,13 @@ import json
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.types import BigInteger, Integer, Text, Boolean from sqlalchemy.types import BigInteger, Integer, Text, Boolean
from sqlalchemy.schema import ForeignKey
from sqlalchemy.pool import SingletonThreadPool from sqlalchemy.pool import SingletonThreadPool
from sqlalchemy import Column, Index from sqlalchemy import Column, Index
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from functools import wraps
Base = declarative_base() Base = declarative_base()
@@ -90,6 +92,34 @@ class ExtractorEntry(Base):
busy = Column(Boolean, default=False) busy = Column(Boolean, default=False)
class Search(Base):
__tablename__ = 'search_queries'
id = Column(Integer, primary_key=True, index=True, unique=True)
endpoint = Column(Text, comment="Endpoint URL")
attrs = Column(Text, comment="Text version of the dictionary of parameters")
count = Column(Integer)
current_count = Column(Integer)
current_id = Column(BigInteger, comment='Oldest ID retrieved (should match max_id when done)')
since_id = Column(BigInteger)
class SearchResults(Base):
__tablename__ = 'search_results'
id = Column(Integer, primary_key=True, index=True, unique=True)
search_id = Column(ForeignKey('search_queries.id'))
resource_id = Column(Text)
def memoize(f):
memo = {}
@wraps(f)
def helper(self, **kwargs):
st = dict_to_str(kwargs)
key = (self.__uriparts, st)
if key not in memo:
memo[key] = f(self, **kwargs)
return memo[key]
return helper
def make_session(url): def make_session(url):
if not isinstance(url, str): if not isinstance(url, str):
print(url) print(url)
@@ -100,24 +130,6 @@ def make_session(url):
session = Session() session = Session()
return session return session
def test(db='sqlite:///users.db'):
from sqlalchemy import exists def dict_to_str(args):
session = make_session(db) return json.dumps(args, sort_keys=True)
our_user = session.query(User).first()
print(our_user.name)
print(session.query(User).count())
fake_user = User(name="Fake user")
session.add(fake_user)
session.commit()
print(session.query(User).count())
print(session.query(exists().where(User.name == "Fake user")).scalar())
fake_committed = session.query(User).filter_by(name="Fake user").first()
print(fake_committed.id)
print(fake_committed.name)
session.delete(fake_committed)
session.commit()
print(session.query(User).count())
print(list(session.execute('SELECT 1 from users where id=\'%s\'' % 1548)))

View File

@@ -3,6 +3,9 @@ from __future__ import print_function
import logging import logging
import time import time
import json import json
import yaml
import csv
import io
import signal import signal
import sys import sys
@@ -10,8 +13,15 @@ import sqlalchemy
import os import os
import multiprocessing import multiprocessing
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from multiprocessing import Queue
from functools import partial import queue
import threading
from select import select
import operator
from functools import partial, reduce
from tqdm import tqdm from tqdm import tqdm
@@ -19,6 +29,7 @@ from itertools import islice, chain
from contextlib import contextmanager from contextlib import contextmanager
from collections import Counter from collections import Counter
from random import choice
from builtins import map, filter from builtins import map, filter
@@ -28,6 +39,12 @@ from bitter.models import Following, User, ExtractorEntry, make_session
from bitter import config from bitter import config
# Fix Python 2.x.
try:
UNICODE_EXISTS = bool(type(unicode))
except NameError:
unicode = lambda s: str(s)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -44,36 +61,91 @@ def chunk(iterable, n):
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
source = chunk(source, chunksize) source = chunk(source, chunksize)
p = ThreadPool(numcpus*2) p = ThreadPool(numcpus*2)
results = p.imap_unordered(func, source, chunksize=int(1000/numcpus))
def wrapped_func(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as ex:
print('Exception on parallel thread: {}'.format(ex), file=sys.stderr)
results = p.imap_unordered(wrapped_func, source)
for i in chain.from_iterable(results): for i in chain.from_iterable(results):
yield i yield i
def get_credentials_path(credfile=None): def get_config_path(conf=None):
if not credfile: if not conf:
if config.CREDENTIALS: if config.CONFIG_FILE:
credfile = config.CREDENTIALS conf = config.CONFIG_FILE
else: else:
raise Exception('No valid credentials file') raise Exception('No valid config file')
return os.path.expanduser(credfile) return os.path.expanduser(conf)
def copy_credentials_to_config(credfile, conffile=None):
p = get_config_path(credfile)
with open(p) as old:
for line in old:
cred = json.loads(line.strip())
add_credentials(conffile, **cred)
def save_config(conf, conffile=None):
with config(conffile) as c:
c.clear()
c.update(conf)
@contextmanager @contextmanager
def credentials_file(credfile, *args, **kwargs): def config(conffile=None):
p = get_credentials_path(credfile) d = read_config(conffile)
with open(p, *args, **kwargs) as f: try:
yield f yield d
finally:
write_config(d, conffile)
def iter_credentials(credfile=None): def read_config(conffile):
with credentials_file(credfile) as f: p = conffile and get_config_path(conffile)
for l in f: if p:
yield json.loads(l.strip()) if not os.path.exists(p):
raise IOError('{} file does not exist.'.format(p))
f = open(p, 'r')
elif 'BITTER_CONFIG' not in os.environ:
raise Exception('No config file or BITTER_CONFIG env variable.')
else:
f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n'))
return yaml.load(f, Loader=yaml.SafeLoader) or {'credentials': []}
def get_credentials(credfile=None, inverse=False, **kwargs): def write_config(conf, conffile=None):
if not conf:
conf = {'credentials': []}
if conffile:
p = get_config_path(conffile)
with open(p, 'w') as f:
yaml.dump(conf, f)
else:
os.environ['BITTER_CONFIG'] = yaml.dump(conf)
def iter_credentials(conffile=None):
with config(conffile) as c:
for i in c['credentials']:
yield i
def create_config_file(conffile=None):
if not conffile:
return
conffile = get_config_path(conffile)
with open(conffile, 'a'):
pass
write_config(None, conffile)
def get_credentials(conffile=None, inverse=False, **kwargs):
creds = [] creds = []
for i in iter_credentials(credfile): for i in iter_credentials(conffile):
matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items())) matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items()))
if matches and not inverse: if matches and not inverse:
creds.append(i) creds.append(i)
@@ -82,26 +154,22 @@ def get_credentials(credfile=None, inverse=False, **kwargs):
return creds return creds
def create_credentials(credfile=None): def delete_credentials(conffile=None, **creds):
credfile = get_credentials_path(credfile) tokeep = get_credentials(conffile, inverse=True, **creds)
with credentials_file(credfile, 'a'): with config(conffile) as c:
pass c['credentials'] = list(tokeep)
def delete_credentials(credfile=None, **creds): def add_credentials(conffile=None, **creds):
tokeep = get_credentials(credfile, inverse=True, **creds) try:
with credentials_file(credfile, 'w') as f: exist = get_credentials(conffile, **creds)
for i in tokeep: except IOError:
f.write(json.dumps(i)) exist = False
f.write('\n') create_config_file(conffile)
if exist:
return
def add_credentials(credfile=None, **creds): with config(conffile) as c:
exist = get_credentials(credfile, **creds) c['credentials'].append(creds)
if not exist:
with credentials_file(credfile, 'a') as f:
f.write(json.dumps(creds))
f.write('\n')
def get_hashtags(iter_tweets, best=None): def get_hashtags(iter_tweets, best=None):
@@ -112,7 +180,11 @@ def get_hashtags(iter_tweets, best=None):
def read_file(filename, tail=False): def read_file(filename, tail=False):
with open(filename) as f: if filename == '-':
f = sys.stdin
else:
f = open(filename)
try:
while True: while True:
line = f.readline() line = f.readline()
if line not in (None, '', '\n'): if line not in (None, '', '\n'):
@@ -123,6 +195,9 @@ def read_file(filename, tail=False):
time.sleep(1) time.sleep(1)
else: else:
return return
finally:
if f != sys.stdin:
close(f)
def get_users(wq, ulist, by_name=False, queue=None, max_users=100): def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
@@ -204,8 +279,7 @@ def download_entry(wq, entry_id, dburi=None, recursive=False):
download_user(wq, session, user, entry, recursive) download_user(wq, session, user, entry, recursive)
session.close() session.close()
def crawl_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
total_followers = user.followers_count total_followers = user.followers_count
@@ -353,10 +427,13 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
pending = pending_entries(dburi) pending = pending_entries(dburi)
session.close() session.close()
for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users): with tqdm(parallel(de, pending), desc='Downloading users', total=total_users) as tq:
for i in tq:
tq.write('Got {}'.format(i))
logger.info("Got %s" % i) logger.info("Got %s" % i)
def pending_entries(dburi): def pending_entries(dburi):
session = make_session(dburi) session = make_session(dburi)
while True: while True:
@@ -383,7 +460,7 @@ def get_tweet(c, tid):
return c.statuses.show(id=tid) return c.statuses.show(id=tid)
def search_tweet(c, query): def search_tweet(c, query):
return c.search.tweets(q=query) yield from c.search.tweets(q=query)['statuses']
def user_timeline(c, query): def user_timeline(c, query):
try: try:
@@ -398,117 +475,329 @@ def get_user(c, user):
except ValueError: except ValueError:
return c.users.lookup(screen_name=user)[0] return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False): def download_tweet(wq, tweetid, cache=True, folder="downloaded_tweets", update=False):
cached = cached_tweet(tweetid, folder) tweet = cached_id(tweetid, folder)
tweet = None if update or not tweet:
if update or not cached:
tweet = get_tweet(wq, tweetid) tweet = get_tweet(wq, tweetid)
js = json.dumps(tweet, indent=2) if cache and update:
if write:
if tweet: if tweet:
write_tweet_json(js, folder) js = json.dumps(tweet)
else: write_json(js, folder)
print(js) yield tweet
def cached_tweet(tweetid, folder): def download_user(wq, userid, cache=True, folder="downloaded_users", update=False):
user = cached_id(userid, folder)
if update or not user:
user = get_user(wq, userid)
if cache and update:
if user:
write_json(user, folder, aliases=[user['screen_name'], ])
yield user
def cached_id(oid, folder):
tweet = None tweet = None
file = os.path.join(folder, '%s.json' % tweetid) file = os.path.join(folder, '%s.json' % oid)
if os.path.exists(file) and os.path.isfile(file): if os.path.exists(file) and os.path.isfile(file):
try: try:
# print('%s: Tweet exists' % tweetid) # print('%s: Object exists' % oid)
with open(file) as f: with open(file) as f:
tweet = json.load(f) tweet = json.load(f)
except Exception as ex: except Exception as ex:
logger.error('Error getting cached version of {}: {}'.format(tweetid, ex)) logger.error('Error getting cached version of {}: {}'.format(oid, ex))
return tweet return tweet
def write_tweet_json(js, folder): def write_json(js, folder, oid=None, aliases=[]):
tweetid = js['id'] if not oid:
file = tweet_file(tweetid, folder) oid = js['id']
file = id_file(oid, folder)
if not os.path.exists(folder): if not os.path.exists(folder):
os.makedirs(folder) os.makedirs(folder)
with open(file, 'w') as f: with open(file, 'w') as f:
json.dump(js, f, indent=2) json.dump(js, f)
logger.info('Written {} to file {}'.format(tweetid, file)) logger.info('Written {} to file {}'.format(oid, file))
for alias in aliases:
os.symlink('%s.json' % oid, id_file(alias, folder))
def tweet_file(tweetid, folder): def id_file(oid, folder):
return os.path.join(folder, '%s.json' % tweetid) return os.path.join(folder, '%s.json' % oid)
def tweet_fail_file(tweetid, folder): def fail_file(oid, folder):
failsfolder = os.path.join(folder, 'failed') failsfolder = os.path.join(folder, 'failed')
if not os.path.exists(failsfolder): if not os.path.exists(failsfolder):
os.makedirs(failsfolder) os.makedirs(failsfolder)
return os.path.join(failsfolder, '%s.failed' % tweetid) return os.path.join(failsfolder, '%s.failed' % oid)
def tweet_failed(tweetid, folder): def id_failed(oid, folder):
return os.path.isfile(tweet_fail_file(tweetid, folder)) return os.path.isfile(fail_file(oid, folder))
def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True): def tweet_download_batch(wq, batch):
def filter_line(line): tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
tweetid = int(line) for tid, tweet in tweets.items():
# print('Checking {}'.format(tweetid)) yield tid, tweet
if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
yield None
else:
yield line
def print_result(res): def user_download_batch(wq, batch):
tid, tweet = res screen_names = []
if tweet: user_ids = []
for elem in batch:
try: try:
write_tweet_json(tweet, folder=folder) int(elem)
yield 1 user_ids.append(str(elem))
except ValueError:
screen_names.append(elem.lower())
args = {}
if user_ids:
args['user_id'] = ','.join(user_ids)
if screen_names:
args['screen_name'] = ','.join(screen_names)
try:
users = wq.users.lookup(**args)
except TwitterHTTPError as ex:
if ex.e.code in (404,):
users = []
else:
raise
found_ids = []
found_names = []
for user in users:
uid = user['id_str']
if uid in user_ids:
found_ids.append(uid)
yield (uid, user)
uname = user['screen_name'].lower()
if uname in screen_names:
found_names.append(uname)
yield (uname, user)
for uid in set(user_ids) - set(found_ids):
yield (uid, None)
for name in set(screen_names) - set(found_names):
yield (name, None)
def dump_result(oid, obj, folder, ignore_fails=True):
if obj:
try:
write_json(obj, folder=folder, oid=oid)
failed = fail_file(oid, folder)
if os.path.exists(failed):
os.remove(failed)
except Exception as ex: except Exception as ex:
logger.error('%s: %s' % (tid, ex)) logger.error('%s: %s' % (oid, ex))
if not ignore_fails: if not ignore_fails:
raise raise
else: else:
logger.info('Tweet not recovered: {}'.format(tid)) logger.info('Object not recovered: {}'.format(oid))
with open(tweet_fail_file(tid, folder), 'w') as f: with open(fail_file(oid, folder), 'w') as f:
print('Tweet not found', file=f) print('Object not found', file=f)
yield -1
def download_batch(batch):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
return tweets.items()
with open(tweetsfile) as f: def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False, cache=True,
lines = map(lambda x: x.strip(), f) batch_method=tweet_download_batch):
lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
tweets = parallel(download_batch, lines_to_crawl, 100) done = Queue()
for res in tqdm(parallel(print_result, tweets), desc='Queried'):
pass down = Queue()
def filter_list(lst, done, down):
print('filtering')
for oid in lst:
cached = cached_id(oid, folder)
if (cached and not update):
done.put((oid, cached))
elif (id_failed(oid, folder) and not retry_failed):
done.put((oid, None))
else:
down.put(oid)
down.put(None)
def download_results(batch_method, down, done):
def gen():
while True:
r = down.get()
if r is None:
down.close()
down.join_thread()
return
yield r
for t in parallel(batch_method, gen(), 100):
done.put(t)
def batch(*args, **kwargs):
return batch_method(wq, *args, **kwargs)
tc = threading.Thread(target=filter_list, args=(lst, done, down), daemon=True)
tc.start()
td = threading.Thread(target=download_results, args=(batch, down, done), daemon=True)
td.start()
def check_threads(ts, done):
for t in ts:
t.join()
done.put(None)
wait = threading.Thread(target=check_threads, args=([tc, td], done), daemon=True)
wait.start()
while True:
rec = done.get()
if rec is None:
done.close()
done.join_thread()
break
oid, obj = rec
if cache or (not obj):
dump_result(oid, obj, folder, ignore_fails)
yield rec
wait.join()
def download_tweets_file(*args, **kwargs):
kwargs['batch_method'] = tweet_download_batch
yield from download_file(*args, **kwargs)
def download_users_file(*args, **kwargs):
kwargs['batch_method'] = user_download_batch
yield from download_file(*args, **kwargs)
def download_file(wq, csvfile, folder, column=0, delimiter=',', skip=0, cache=True,
quotechar='"', commentchar=None, batch_method=tweet_download_batch,
**kwargs):
with open(csvfile) as f:
if commentchar:
f = (line for line in f if not line.startswith('#'))
csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
for n in range(skip):
next(csvreader)
def reader(r):
for row in csvreader:
if len(row) > column:
yield row[column].strip()
for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method, cache=cache,
**kwargs):
yield res
def download_timeline(wq, user): def download_timeline(wq, user):
return wq.statuses.user_timeline(id=user) return wq.statuses.user_timeline(id=user)
def consume_feed(func, *args, **kwargs): def _consume_feed(func, feed_control=None, **kwargs):
''' '''
Get all the tweets using pagination and a given method. Get all the tweets using pagination and a given method.
It can be controlled with the `count` parameter. It can be controlled with the `count` parameter.
If count < 0 => Loop until the whole feed is consumed. If max_count < 0 => Loop until the whole feed is consumed.
If count == 0 => Only call the API once, with the default values. If max_count == 0 => Only call the API once, with the default values.
If count > 0 => Get count tweets from the feed. If max_count > 0 => Get max_count tweets from the feed.
''' '''
remaining = int(kwargs.pop('count', 0)) remaining = int(kwargs.pop('max_count', 0))
consume = remaining < 0 count = int(kwargs.get('count', -1))
limit = False limit = False
# Simulate a do-while by updating the condition at the end # We need to at least perform a query, so we simulate a do-while
# by running once with no limit and updating the condition at the end
with tqdm(total=remaining) as pbar:
while not limit: while not limit:
if remaining > 0: if remaining > 0 and ((count < 0) or (count > remaining)):
kwargs['count'] = remaining kwargs['count'] = remaining
resp = func(*args, **kwargs) resp, stop = feed_control(func, kwargs, remaining=remaining, batch_size=count)
if not resp: if not resp:
return return
for t in resp: for entry in resp:
yield t yield entry
if consume: pbar.update(len(resp))
limit = stop
if remaining < 0:
# If the loop was run with a negative remaining, it will only stop
# when the control function tells it to.
continue continue
# Otherwise, check if we have already downloaded all the required items
remaining -= len(resp) remaining -= len(resp)
max_id = min(s['id'] for s in func(*args, **kwargs)) - 1 limit = limit or remaining <= 0
kwargs['max_id'] = max_id
limit = remaining <= 0
def consume_tweets(*args, **kwargs):
return _consume_feed(*args, feed_control=_tweets_control, **kwargs)
def consume_users(*args, **kwargs):
return _consume_feed(*args, feed_control=_users_control, **kwargs)
def _tweets_control(func, apiargs, remaining=0, **kwargs):
''' Return a list of entries, the remaining '''
resp = func(**apiargs)
if not resp:
return None, True
# Update the arguments for the next call
# Two options: either resp is a list, or a dict like:
# {'statuses': ... 'search_metadata': ...}
if isinstance(resp, dict) and 'search_metadata' in resp:
resp = resp['statuses']
max_id = min(s['id'] for s in resp) - 1
apiargs['max_id'] = max_id
return resp, False
def _users_control(func, apiargs, remaining=0, **kwargs):
resp = func(**apiargs)
stop = True
# Update the arguments for the next call
if 'next_cursor' in resp:
cursor = resp['next_cursor']
apiargs['cursor'] = cursor
if int(cursor) != -1:
stop = False
return resp['users'], stop
def serialized(it, outfile, outformat='csv', fields=[], header=None, ignore_missing=False, delimiter='\t'):
outformat = outformat.lower()
def do(out):
if outformat == 'csv':
writer = csv.writer(out, quoting=csv.QUOTE_ALL, delimiter=delimiter)
if header != '':
h = header
if h is None:
h = delimiter.join(fields)
print(h, file=out)
attrs = list(token.strip().split('.') for token in fields)
for obj in it:
values = []
for attr in attrs:
try:
values.append(reduce(operator.getitem, attr, obj))
except KeyError:
if not ignore_missing:
print('Key not present: {}'.format(attr), file=sys.stderr)
values.append(None)
writer.writerow(values)
elif outformat == 'jsonlines':
for obj in it:
print(json.dumps(obj, sort_keys=True), file=out)
elif outformat == 'indented':
for obj in it:
print(json.dumps(obj, indent=4, sort_keys=True), file=out)
else:
for obj in it:
print(obj, file=out)
if outfile is sys.stdout:
return do(sys.stdout)
with open(outfile, 'w') as out:
return do(out)

12
docker-compose.yml Normal file
View File

@@ -0,0 +1,12 @@
version: '2'
services:
dev:
build:
context: .
dockerfile: Dockerfile-3.4
volumes:
- '.:/usr/src/app'
tty: yes
working_dir: '/usr/src/app'
entrypoint: '/bin/bash'
command: ''

View File

@@ -2,3 +2,4 @@ sqlalchemy
twitter twitter
click click
tqdm tqdm
pyyaml

View File

@@ -1,29 +1,23 @@
import pip
from setuptools import setup from setuptools import setup
from pip.req import parse_requirements
# parse_requirements() returns generator of pip.req.InstallRequirement objects def parse_requirements(filename):
# pip 6 introduces the *required* session argument """ load requirements from a pip requirements file """
try: with open(filename, 'r') as f:
install_reqs = parse_requirements("requirements.txt", session=pip.download.PipSession()) lineiter = list(line.strip() for line in f)
py2_reqs = parse_requirements("requirements-py2.txt", session=pip.download.PipSession()) return [line for line in lineiter if line and not line.startswith("#")]
test_reqs = parse_requirements("test-requirements.txt", session=pip.download.PipSession())
except AttributeError:
install_reqs = parse_requirements("requirements.txt") install_reqs = parse_requirements("requirements.txt")
py2_reqs = parse_requirements("requirements-py2.txt") py2_reqs = parse_requirements("requirements-py2.txt")
test_reqs = parse_requirements("test-requirements.txt") test_reqs = parse_requirements("test-requirements.txt")
import sys import sys
import os
import itertools import itertools
if sys.version_info <= (3, 0): if sys.version_info <= (3, 0):
install_reqs = itertools.chain(install_reqs, py2_reqs) install_reqs = install_reqs + py2_reqs
# reqs is a list of requirement with open(os.path.join('bitter', 'VERSION'), 'r') as f:
# e.g. ['django==1.5.1', 'mezzanine==1.4.6'] __version__ = f.read().strip()
install_reqs = [str(ir.req) for ir in install_reqs]
test_reqs = [str(ir.req) for ir in test_reqs]
from bitter import __version__
setup( setup(
name="bitter", name="bitter",

View File

@@ -12,7 +12,11 @@ from bitter import config as c
class TestCrawlers(TestCase): class TestCrawlers(TestCase):
def setUp(self): def setUp(self):
self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json')) CONF_PATH = os.path.join(os.path.dirname(__file__), '.bitter.yaml')
if os.path.exists(CONF_PATH):
self.wq = easy(CONF_PATH)
else:
self.wq = easy()
def test_create_worker(self): def test_create_worker(self):
assert len(self.wq.queue)==1 assert len(self.wq.queue)==1

23
tests/test_models.py Normal file
View File

@@ -0,0 +1,23 @@
from unittest import TestCase
import os
import types
from bitter import utils
from bitter.models import *
from sqlalchemy import exists
class TestModels(TestCase):
def setUp(self):
self.session = make_session('sqlite://')
def test_user(self):
fake_user = User(name="Fake user", id=1548)
self.session.add(fake_user)
self.session.commit()
fake_committed = self.session.query(User).filter_by(name="Fake user").first()
assert fake_committed
self.session.delete(fake_committed)
self.session.commit()
assert not list(self.session.execute('SELECT 1 from users where id=\'%s\'' % 1548))

View File

@@ -8,56 +8,63 @@ from bitter import config as c
class TestUtils(TestCase): class TestUtils(TestCase):
configfile = '/tmp/bitter.yaml'
def setUp(self): def setUp(self):
self.credfile = '/tmp/credentials.txt' c.CONFIG_FILE = self.configfile
c.CREDENTIALS = self.credfile if os.path.exists(self.configfile):
if os.path.exists(self.credfile): os.remove(self.configfile)
os.remove(self.credfile) assert not os.path.exists(self.configfile)
utils.create_credentials(self.credfile) utils.create_config_file(self.configfile)
assert os.path.exists(self.configfile)
def test_create_credentials(self):
assert os.path.exists(self.credfile)
os.remove(self.credfile)
utils.create_credentials() # From config
assert os.path.exists(self.credfile)
def test_add_credentials(self): def test_add_credentials(self):
utils.add_credentials(self.credfile, user="test") utils.add_credentials(self.configfile, user="test")
assert utils.get_credentials(self.credfile) assert utils.get_credentials(self.configfile)
assert utils.get_credentials(user="test") assert utils.get_credentials(self.configfile, user="test")
assert list(utils.get_credentials(user="test"))[0]["user"] == "test" assert list(utils.get_credentials(self.configfile, user="test"))[0]["user"] == "test"
def test_get_credentials(self): def test_get_credentials(self):
utils.add_credentials(self.credfile, user="test") utils.add_credentials(self.configfile, user="test")
assert utils.get_credentials(user="test") assert utils.get_credentials(self.configfile, user="test")
assert not utils.get_credentials(user="test", inverse=True) assert not utils.get_credentials(self.configfile, user="test", inverse=True)
def test_add_two_credentials(self): def test_add_two_credentials(self):
utils.add_credentials(self.credfile, user="test") utils.add_credentials(self.configfile, user="test")
utils.add_credentials(self.credfile, user="test2") utils.add_credentials(self.configfile, user="test2")
assert utils.get_credentials(user="test") assert utils.get_credentials(self.configfile, user="test")
assert utils.get_credentials(user="test2") assert utils.get_credentials(self.configfile, user="test2")
def test_delete_credentials(self): def test_delete_credentials(self):
utils.add_credentials(self.credfile, user="test") utils.add_credentials(self.configfile, user="test")
assert utils.get_credentials(user="test") assert utils.get_credentials(self.configfile, user="test")
utils.delete_credentials(user="test") utils.delete_credentials(self.configfile, user="test")
print(utils.get_credentials()) assert not utils.get_credentials(self.configfile, user="test")
assert not utils.get_credentials(user="test")
def test_parallel(self): def test_parallel(self):
import time import time
def echo(i): def echo(i):
time.sleep(2) time.sleep(0.5)
return i return i
tic = time.time() tic = time.time()
resp = utils.parallel(echo, [1, 2, 3]) resp = utils.parallel(echo, [1, 2, 3])
assert isinstance(resp, types.GeneratorType) assert isinstance(resp, types.GeneratorType)
assert list(resp) == [1,2,3] assert sorted(list(resp)) == [1, 2, 3]
toc = time.time() toc = time.time()
assert (tic-toc) < 6000 assert (tic-toc) < 600
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2) resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
assert list(resp2) == [1,2,3,4] assert sorted(list(resp2)) == [1, 2, 3, 4]
class TestUtilsEnv(TestUtils):
configfile = None
def setUp(self):
if 'BITTER_CONFIG' in os.environ:
self.oldenv = os.environ['BITTER_CONFIG']
os.environ['BITTER_CONFIG'] = ''
def tearDown(self):
if hasattr(self, 'oldenv'):
os.environ['BITTER_CONFIG'] = self.oldenv