1
0
mirror of https://github.com/balkian/bitter.git synced 2025-07-11 16:02:20 +00:00

Compare commits

...

22 Commits

Author SHA1 Message Date
J. Fernando Sánchez
ea848f1a78 Version 0.10.3
* Ability to include "optional" fields from tweets (e.g., retweeted_status).
* Optional caching (for very large datasets)
2020-06-24 11:01:02 +02:00
J. Fernando Sánchez
030c41b826 Changes to user and tweet search: Cache by default
* Improved printing of credential limits
* Tweet and user searchers cache by default. Write has been changed to dry_run
2020-01-07 20:36:19 +01:00
J. Fernando Sánchez
bba73091e4 Fixes for serialize
Some commands in the CLI did not properly include serialize (get_tweet,
get_users and search)
2020-01-07 19:36:18 +01:00
J. Fernando Sánchez
80b58541e7 Add serialize 2019-11-12 17:15:26 +01:00
J. Fernando Sánchez
40a8b45231 Fix concurrency issue
Download_list sometimes failed with:
BrokenPipeError: [Errno 32] Broken pipe
2019-09-20 13:39:51 +02:00
J. Fernando Sánchez
fadeced761 Fix tests py2 2019-09-19 12:18:56 +02:00
J. Fernando Sánchez
bdb844d75f Fix compatibility click >=7 2019-09-19 11:45:12 +02:00
J. Fernando Sánchez
653487e2d7 Improve download_list 2019-04-30 19:15:15 +02:00
J. Fernando Sánchez
02aec5eefa Fix bug user_ids
Add number of failed downloads to the output.
Add flag to retry previously failed downloads.
2018-09-16 12:20:41 +02:00
J. Fernando Sánchez
e6b08c4ffb Fix bug user ids 2018-08-30 15:57:49 +02:00
J. Fernando Sánchez
311b972b27 Fix typo and bump to version 0.9.1 2018-08-21 13:02:58 +02:00
J. Fernando Sánchez
7724967285 Add filter-edges 2018-08-21 12:57:03 +02:00
J. Fernando Sánchez
dd662acd22 Fix pip installation 2018-08-21 12:54:25 +02:00
J. Fernando Sánchez
5199d5b5aa Improve CLI. Add credentials 2018-03-20 13:29:18 +01:00
J. Fernando Sánchez
6259013978 Compose and bug fixes 2018-03-19 14:36:25 +01:00
J. Fernando Sánchez
53bb7edabc Add sh scripts 2018-03-19 14:35:07 +01:00
J. Fernando Sánchez
57eb73b53b Fix README 2017-12-28 18:13:58 +01:00
J. Fernando Sánchez
7c829ee102 Fix py2 compatibility 2017-12-20 16:51:53 +01:00
J. Fernando Sánchez
27bc3557b2 Fixed JSON regression 2017-12-19 20:44:55 +01:00
J. Fernando Sánchez
9c82dea298 Config from variable or file
This replaces the old file of credentials (with one per line) with a
configuration in YAML format.

The configuration can be stored either in a file or in an environment variable
(BITTER_CONFIG).

There is still a command line argument to add the credentials in that file to
the config.
2017-12-19 20:34:39 +01:00
J. Fernando Sánchez
cf766a6bf3 API command
* Added API command
* Fixed bug in chunk
2017-11-30 16:49:42 +01:00
J. Fernando Sánchez
e65f6836b3 Fixed tweet error 2017-05-21 21:28:35 +02:00
34 changed files with 1399 additions and 396 deletions

1
.gitignore vendored
View File

@ -2,6 +2,7 @@ __pycache__
*.egg-info
dist
env
.env
__*
.*
*.pyc

View File

@ -2,6 +2,6 @@
From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
RUN pip install ".[server]"
ENTRYPOINT ["bitter"]

View File

@ -2,6 +2,6 @@
From python:3.4-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
RUN pip install ".[server]"
ENTRYPOINT ["bitter"]

View File

@ -2,6 +2,6 @@
From python:{{PYVERSION}}-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
RUN pip install ".[server]"
ENTRYPOINT ["bitter"]

View File

@ -1,4 +1,4 @@
PYVERSIONS=3.4 2.7
PYVERSIONS=3.5
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian
@ -19,7 +19,7 @@ Dockerfile-%: Dockerfile.template
dev-%:
@docker start $(NAME)-dev$* || (\
$(MAKE) build-$*; \
docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
docker run -d -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
)\
docker exec -ti $(NAME)-dev$* bash
@ -38,7 +38,7 @@ test: $(addprefix test-,$(PYMAIN))
testall: $(addprefix test-,$(PYVERSIONS))
test-%: build-%
docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
docker run --rm -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
pip_test-%:
docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
@ -71,6 +71,6 @@ pip_upload:
pip_test: $(addprefix pip_test-,$(PYVERSIONS))
run: build
docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
docker run --rm --env-file $$PWD/.env -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
.PHONY: test test-% build-% build test test_pip run

146
README.md
View File

@ -1,4 +1,5 @@
# Description
There are two parts to bitter.
First of all, it is a wrapper over Python twitter that adds support for several Twitter API credentials (e.g. authorizing the same app with different user accounts).
Secondly, it is a command line tool to automate several actions (e.g. downloading user networks) using the wrapper.
@ -22,16 +23,153 @@ wq = easy()
print(wq.users.show(user_name='balkian'))
```
# Credentials format
You can also make custom calls to the API through the command line.
e.g. to get the latest 500 tweets by the python software foundation:
```
{"user": "balkian", "consumer_secret": "xxx", "consumer_key": "xxx", "token_key": "xxx", "token_secret": "xxx"}
bitter api statuses/user_timeline --id thepsf --count 500
```
By default, bitter uses '~/.bitter-credentials.json', but you may choose a different file:
## Adding credentials
```
python -m bitter -c <credentials_file> ...
bitter --config <YOUR CONFIGURATION FILE> credentials add
```
You can specify the parameters in the command or let the command line guide you through the process.
# Examples
## Downloading a list of tweets
Bitter can download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.
You can even specify the column number for tweet ids.
Bitter will not try to download
```
Usage: bitter tweet get_all [OPTIONS] TWEETSFILE
Download tweets from a list of tweets in a CSV file. The result is stored
as individual json files in your folder of choice.
Options:
-f, --folder TEXT
-d, --delimiter TEXT
-h, --header Discard the first line (use it as a header)
-q, --quotechar TEXT
-c, --column INTEGER
--help Show this message and exit.
```
For instance, this will download `tweet_ids.csv` in the `tweet_info` folder:
```
bitter tweet get_all -f tweet_info tweet_ids.csv
```
## Downloading a list of users
Bitter downloads users and tweets in a similar way:
```
Usage: bitter users get_all [OPTIONS] USERSFILE
Download users from a list of user ids/screen names in a CSV file. The
result is stored as individual json files in your folder of choice.
Options:
-f, --folder TEXT
-d, --delimiter TEXT
-h, --header Discard the first line (use it as a header)
-q, --quotechar TEXT
-c, --column INTEGER
--help Show this message and exit.
```
The only difference is that users can be downloaded via `screen_name` or `user_id`.
This method does not try to resolve screen names to user ids, so users may be downloaded more than once if they appear in both ways.
## Downloading a stream
```
Usage: bitter stream get [OPTIONS]
Options:
-l, --locations TEXT
-t, --track TEXT
-f, --file TEXT File to store the stream of tweets. Default: standard output
-p, --politelyretry Politely retry after a hangup/connection error
--help Show this message and exit.
```
```
bitter --config .bitter.yaml stream get
```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
## REST queries
In newer versions of bitter, individual methods to download tweets/users using the REST API are being replaced with a generic method to call the API.
```
bitter api <URL endpoint> --parameter VALUE ... | [--tweets | --users] [--max_count MAX_COUNT] [--count COUNT_PER_CALL]
```
For instance:
```
# Get 100 tweets that mentioned Obama after tweet 942689870501302300
bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama
```
That is equivalent to this call to the api: `api/1.1/searc/tweets?since_id=942689870501302300&count=100&q=Obama`.
The flags `--tweets` and `--users` are optional.
If you use them, bitter will try to intelligently fetch all the tweets/users by using pagination with the API.
For example:
```
# Download 1000 tweets, 100 tweets per call.
bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama --max_count=1000 --tweets
```
```
# Download all the followers of @balkian
bitter api 'followers/list' --_id balkian --users --max_count -1
```
Note that some reserved words (such as `id`) have to be preceeded by an underscore.
This limitation is imposed by the python-twitter library.
# Configuration format
```
credentials:
- user: "balkian"
consumer_secret: "xxx"
consumer_key: "xxx"
token_key: "xxx"
token_secret: "xxx"
- user: ....
```
By default, bitter uses '~/.bitter.yaml', but you may choose a different file:
```
python -m bitter --config <config_file> ...
```
Or use an environment variable:
```
export BITTER_CONFIG=$(cat myconfig.yaml)
```
# Server

10
bin/README.md Normal file
View File

@ -0,0 +1,10 @@
Scripts to process jsonlines
To get the jsonlines file, you can use the streaming API or the search api, like so:
```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
```
To keep track of the query that generated the file, you can save the command in a text file.
For instance, the example above is also in `example_query.sh`.

1
bin/example_query.sh Executable file
View File

@ -0,0 +1 @@
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines

13
bin/extract-hashtags.sh Executable file
View File

@ -0,0 +1,13 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export FIELDS="created_at,id,text"
for i in "$@"
do
OUTPUT=$i.hashtags.csv
echo "$FIELDS" > $OUTPUT
pv -l $i -N "hashtags $i" | jq -r '. | .created_at as $created_at | .id_str as $id | .entities.hashtags | select(. != null) | .[] | [$created_at, $id, .text] | @csv' >> $OUTPUT
done

15
bin/extract-interactions.sh Executable file
View File

@ -0,0 +1,15 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
for i in "$@"
do
REPLYOUTPUT=$i.replies.csv
RTOUTPUT=$i.rts.csv
echo 'created_at,id,user_id,reply_user_id' > $REPLYOUTPUT
echo 'created_at,id,user_id,rt_user_id' > $RTOUTPUT
pv -l -N "$i" $i | jq -r '. | select(.in_reply_to_user_id_str != null) | [.created_at, .id_str, .user.id_str, .in_reply_to_user_id_str] | @csv' >> $REPLYOUTPUT
pv -l -N "$i" $i | jq -r '. | select(.retweeted_status != null) | [.created_at, .retweeted_status.id_str, .user.id_str, .retweeted_status.user.id_str] | @csv' >> $RTOUTPUT
done

16
bin/extract-limits.sh Executable file
View File

@ -0,0 +1,16 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export QUERY='.limit | select(. != null) | [.timestamp_ms, .track] | @csv'
export FIELDS="timestamp,track"
for i in "$@"
do
OUTPUT=$i.limits.csv
echo $FIELDS > $OUTPUT
pv -N "$i limits" -l $i | jq -r "$QUERY" >> $OUTPUT
done

16
bin/extract-media.sh Executable file
View File

@ -0,0 +1,16 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export QUERY='select(.id != null) | .id_str as $id | .entities.urls[] | select(.expanded_url | select(. != null) | contains("open.spotify") or contains("youtube.com") or contains("youtu.be")) | [$id, .expanded_url] | @csv'
export FIELDS="id,url"
for i in "$@"
do
OUTPUT=$i.media.csv
echo $FIELDS > $OUTPUT
pv -N "$i media" -l $i | jq -r "$QUERY" >> $OUTPUT
done

28
bin/extract-users.sh Executable file
View File

@ -0,0 +1,28 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export USER_FIELDS="\$created_at,\
.id_str,\
.screen_name,\
.followers_count,\
.lang,\
.description,\
.statuses_count,\
.favourites_count,\
.friends_count,\
.created_at,\
.name,\
.location,\
.listed_count,\
.time_zone\
"
for i in "$@"
do
OUTPUT=$i.users.csv
echo \#$USER_FIELDS > $OUTPUT
jq -r ".created_at as \$created_at | .user,.retweeted_status.user | select(. != null) | [$USER_FIELDS] | @csv " $i | pv -N "$i" -l >> $OUTPUT
done

32
bin/extract.sh Executable file
View File

@ -0,0 +1,32 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
FIELDS=".id_str,\
.user.screen_name,\
.user.id,\
.favorite_count,\
.retweet_count,\
.quote_count,\
.reply_count,\
.created_at,\
.lang,\
.in_reply_to_user_id_str,\
.in_reply_to_status_id_str,\
.retweeted_status.id_str,\
.retweeted_status.user.id,\
.retweeted_status.favorite_count,\
.retweeted_status.retweet_count,\
.retweeted_status.quote_count,\
.retweeted_status.reply_count,\
.retweeted_status.created_at\
"
for i in "$@"
do
OUTPUT=$i.tweets.csv
echo "$FIELDS" | sed -e 's/,[ \t\n]*\./,/g' | sed -e 's/^[#]\?\.//' > $OUTPUT
jq -r "[$FIELDS]|@csv" $i | pv -N "$i" -l >> $OUTPUT
done

17
bin/extract_extended.sh Executable file
View File

@ -0,0 +1,17 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
QUERY='.| select(.retweeted_status != null) | .retweeted_status | .id_str as $rt_id | .extended_tweet | select(. != null) | [$rt_id,.full_text]|@csv'
HEADER='rt_id,full_text'
for i in "$@"
do
OUTPUT=$i.full_text.csv
echo $HEADER > $OUTPUT
jq "$QUERY" $i | pv -N "$i" -l >> $OUTPUT
sort -u $OUTPUT -o $OUTPUT
sed -ri 's/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g' $OUTPUT
done

16
bin/extract_text.sh Executable file
View File

@ -0,0 +1,16 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
QUERY='(.full_text // .retweeted_status.full_text) as $text | [ .id_str,$text ] | @csv'
HEADER='id,text'
for i in "$@"
do
OUTPUT=$i.text.csv
echo $HEADER > $OUTPUT
pv -l -N "$i" $i | jq -r "$QUERY" >> $OUTPUT
# sed -ri s/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g $OUTPUT
done

10
bin/filter-edges.sh Normal file
View File

@ -0,0 +1,10 @@
if [ "$#" -lt 2 ]
then
echo "Find edge lines in a file that contain one of the users in a user list."
echo ""
echo "Usage: $0 <file with edges> <file with the list of users>"
exit 1
fi
pv -c -N 'read' "$1" | grep -F -w -f "$2" | pv -lc -N 'found'

23
bin/functions.py Normal file
View File

@ -0,0 +1,23 @@
import pandas as pd
def read_rts(rtsfile, tweetsfile):
tweets = pd.read_csv(tweetsfile, index_col=0)
rts = pd.read_csv(rtsfile, index_col=1)
merged = rts.groupby(by=['id', 'rt_user_id']).size().rename('count').reset_index(level=1).merge(tweets, left_index=True, right_index=True)
return merged.sort_values(by='count', ascending=False)
def read_tweets(tweetsfile):
'''When the dataset is small enough, we can load tweets as-in'''
with open(tweetsfile) as f:
header = f.readline().strip().split(',')
dtypes = {}
for key in header:
if key.endswith('_str') or key.endswith('.id'):
dtypes[key] = object
tweets = pd.read_csv(tweetsfile, dtype=dtypes, index_col=0)
return tweets
if __name__ == '__main__':
import argparse

1
bin/print-hashtags.sh Executable file
View File

@ -0,0 +1 @@
cat "$@" | awk -F"," '{print tolower($3)}' | sort | uniq -c | sort -h

14
bin/print-replies.sh Executable file
View File

@ -0,0 +1,14 @@
MAX_TAGS=100
function get_text {
while read line
do
echo $line
rtid=$(echo $line | awk -F"," '{print $2}')
text=$(grep -m 1 $rtid *.text.csv)
echo "$line - $text"
done < "/dev/stdin"
}
cat "$@" | get_text

15
bin/print-rts.sh Executable file
View File

@ -0,0 +1,15 @@
MAX_TAGS=100
function get_text {
while read line
do
echo $line
rtid=$(echo $line | awk '{print $2}')
count=$(echo $line | awk '{print $1}')
text=$(grep -m 1 $rtid *.text.csv)
echo "$line - $text"
done < "/dev/stdin"
}
cat "$@" | awk -F"," '{print tolower($2)}' | sort | uniq -c | sort -h | tail -n $MAX_TAGS | get_text

View File

@ -1 +1 @@
0.7.1
0.10.3

View File

@ -6,10 +6,12 @@ http://github.com/balkian/bitter
import os
from .version import __version__
from . import config as bconf
def easy(*args, **kwargs):
def easy(conffile=bconf.CONFIG_FILE):
from .crawlers import TwitterQueue
return TwitterQueue.from_credentials(*args, **kwargs)
return TwitterQueue.from_config(conffile=conffile)
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

View File

@ -26,19 +26,125 @@ else:
logger = logging.getLogger(__name__)
def serialize(function):
'''Common options to serialize output to CSV or other formats'''
@click.option('--fields', help='Provide a list of comma-separated fields to print.', default='', type=str)
@click.option('--ignore_missing', help='Do not show warnings for missing fields.', is_flag=True)
@click.option('--header', help='Header that will be printed at the beginning of the file', default=None)
@click.option('--csv', help='Print each object as a csv row.', is_flag=True)
@click.option('--jsonlines', '--json', help='Print each object as JSON in a new line.', is_flag=True)
@click.option('--indented', help='Print each object as an indented JSON object', is_flag=True)
@click.option('--outdelimiter', help='Delimiter for some output formats, such as CSV. It defaults to \t', default='\t')
@click.option('--outfile', help='Output file. It defaults to STDOUT', default=sys.stdout)
def decorated(fields, ignore_missing, header, csv, jsonlines, indented, outfile, outdelimiter, **kwargs):
it = function(**kwargs)
outformat = 'json'
if csv:
outformat = 'csv'
elif jsonlines:
outformat = 'jsonlines'
elif indented:
outformat = 'indented'
return utils.serialized(it, outfile, outformat=outformat, fields=fields.split(','), ignore_missing=ignore_missing, header=header, delimiter=outdelimiter)
return decorated
@click.group()
@click.option("--verbose", is_flag=True)
@click.option("--logging_level", required=False, default='WARN')
@click.option("--config", required=False)
@click.option('-c', '--credentials', show_default=True, default='~/.bitter-credentials.json')
@click.option('--config', show_default=True, default=bconf.CONFIG_FILE)
@click.option('--credentials', show_default=True, help="DEPRECATED: If specified, these credentials will be copied to the configuratation file.", default=bconf.CREDENTIALS)
@click.pass_context
def main(ctx, verbose, logging_level, config, credentials):
logging.basicConfig(level=getattr(logging, logging_level))
ctx.obj = {}
ctx.obj['VERBOSE'] = verbose
ctx.obj['CONFIG'] = config
bconf.CONFIG_FILE = config
bconf.CREDENTIALS = credentials
utils.create_credentials(credentials)
if os.path.exists(utils.get_config_path(credentials)):
utils.copy_credentials_to_config(credentials, config)
@main.group(invoke_without_command=True)
@click.pass_context
def credentials(ctx):
if ctx.invoked_subcommand is not None:
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for worker in wq.queue:
print('#'*20)
try:
resp = worker.client.application.rate_limit_status()
print(worker.name)
except Exception as ex:
print('{}: AUTHENTICATION ERROR: {}'.format(worker.name, ex) )
@credentials.command('limits')
@click.option('--no_aggregate', is_flag=True, default=False,
help=('Print limits from all workers. By default, limits are '
'aggregated (summed).'))
@click.option('--no_diff', is_flag=True, default=False,
help=('Print all limits. By default, only limits that '
'have been consumed will be shown.'))
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, no_aggregate, no_diff, url):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
limits = {}
if url:
print('URL is: {}'.format(url))
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
for urlimits in resp['resources'].values():
for url, value in urlimits.items():
if url not in limits:
limits[url] = {}
glob = limits[url].get('global', {})
limits[url][worker.name] = value
for k in ['limit', 'remaining']:
if k not in glob:
glob[k] = 0
glob[k] += value[k]
limits[url]['global'] = glob
for url, lims in limits.items():
worker_list = lims.keys() if no_aggregate else ['global', ]
url_printed = False
for worker in worker_list:
vals = lims[worker]
consumed = vals['limit'] - vals['remaining']
if no_diff or consumed:
if not url_printed:
print(url)
url_printed = True
print('\t', worker, ':')
print('\t\t', vals)
@credentials.command('add')
@click.option('--consumer_key', default=None)
@click.option('--consumer_secret', default=None)
@click.option('--token_key', default=None)
@click.option('--token_secret', default=None)
@click.argument('user_name')
def add(user_name, consumer_key, consumer_secret, token_key, token_secret):
if not consumer_key:
consumer_key = click.prompt('Please, enter your YOUR CONSUMER KEY')
if not consumer_secret:
consumer_secret = click.prompt('Please, enter your CONSUMER SECRET')
if not token_key:
token_key = click.prompt('Please, enter your ACCESS TOKEN')
if not token_secret:
token_secret = click.prompt('Please, enter your ACCESS TOKEN SECRET')
utils.add_credentials(conffile=bconf.CONFIG_FILE, user=user_name, consumer_key=consumer_key, consumer_secret=consumer_secret,
token_key=token_key, token_secret=token_secret)
click.echo('Credentials added for {}'.format(user_name))
@main.group()
@click.pass_context
@ -46,35 +152,61 @@ def tweet(ctx):
pass
@tweet.command('get')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-d', '--dry_run', is_flag=True, default=False)
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid')
def get_tweet(tweetid, write, folder, update):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
utils.download_tweet(wq, tweetid, write, folder, update)
@serialize
def get_tweet(tweetid, dry_run, folder, update):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
yield from utils.download_tweet(wq, tweetid, not dry_run, folder, update)
@tweet.command('get_all')
@click.argument('tweetsfile', 'File with a list of tweets to look up')
@tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@click.argument('tweetsfile')
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",")
@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
@click.option('--skip', help='Discard the first DISCARD lines (use them as a header)', default=0)
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-q', '--quotechar', default='"')
@click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context
def get_tweets(ctx, tweetsfile, folder):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
utils.download_tweets(wq, tweetsfile, folder)
def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, nocache, skip, quotechar, commentchar, column):
if update and not click.confirm('This may overwrite existing tweets. Continue?'):
click.echo('Cancelling')
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
status = tqdm('Queried')
failed = 0
for tid, obj in utils.download_tweets_file(wq, tweetsfile, folder, delimiter=delimiter, cache=not nocache,
skip=skip, quotechar=quotechar, commentchar=commentchar,
column=column, update=update, retry_failed=retry):
status.update(1)
if not obj:
failed += 1
status.set_description('Failed: %s. Queried' % failed, refresh=True)
continue
yield obj
@tweet.command('search')
@click.argument('query')
@serialize
@click.pass_context
def search(ctx, query):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
t = utils.search_tweet(wq, query)
print(json.dumps(t, indent=2))
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
yield from utils.search_tweet(wq, query)
@tweet.command('timeline')
@click.argument('user')
@click.pass_context
def timeline(ctx, user):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
t = utils.user_timeline(wq, user)
print(json.dumps(t, indent=2))
@ -96,43 +228,48 @@ def list_users(ctx, db):
@users.command('get')
@click.argument('user')
@click.option('-w', '--write', is_flag=True, default=False)
@click.option('-d', '--dry_run', is_flag=True, default=False)
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
def get_user(user, write, folder, update):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
if not write:
u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
print(js)
return
if not os.path.exists(folder):
os.makedirs(folder)
file = os.path.join(folder, '%s.json' % user)
if not update and os.path.exists(file) and os.path.isfile(file):
print('User exists: %s' % user)
return
with open(file, 'w') as f:
u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
print(js, file=f)
@serialize
def get_user(user, dry_run, folder, update):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
yield from utils.download_user(wq, user, not dry_run, folder, update)
@users.command('get_all')
@click.argument('usersfile', 'File with a list of users to look up')
@users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@click.argument('usersfile')
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
@click.option('-d', '--delimiter', default=",")
@click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)',
is_flag=True, default=False)
@click.option('-q', '--quotechar', default='"')
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context
def get_users(ctx, usersfile, folder):
with open(usersfile) as f:
for line in f:
uid = line.strip()
ctx.invoke(get_user, folder=folder, user=uid, write=True)
def get_users(ctx, usersfile, folder, update, retry, nocache, delimiter, skip, quotechar, commentchar, column):
if update and not click.confirm('This may overwrite existing users. Continue?'):
click.echo('Cancelling')
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for i in utils.download_users_file(wq, usersfile, folder, delimiter=delimiter,
update=update, retry_failed=retry,
skip=skip, quotechar=quotechar,
cache=not nocache,
commentchar=commentchar,
column=column):
yield i
@users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
@click.argument('usersfile', 'File with a list of users to look up')
@click.argument('usersfile')
@click.pass_context
def crawl_users(ctx, usersfile, skip, until, threads, db):
global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
@ -146,7 +283,7 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
return ExitStack()
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
len(wq.queue)))
@ -240,11 +377,6 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
logger.info('Done!')
@main.group('api')
def api():
pass
@main.group('extractor')
@click.pass_context
@click.option('--db', required=True, help='Database of users.')
@ -315,7 +447,7 @@ def users_extractor(ctx):
@click.pass_context
def extract(ctx, recursive, user, name, initfile):
print(locals())
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
dburi = ctx.obj['DBURI']
utils.extract(wq,
recursive=recursive,
@ -327,31 +459,41 @@ def extract(ctx, recursive, user, name, initfile):
@extractor.command('reset')
@click.pass_context
def reset_extractor(ctx):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
db = ctx.obj['DBURI']
session = make_session(db)
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@api.command('limits')
@click.argument('url', required=False)
@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False),
help='''Issue a call to an endpoint of the Twitter API.''')
@click.argument('cmd', nargs=1)
@click.option('--tweets', is_flag=True, help='Fetch more tweets using smart pagination. Use --count to control how many tweets to fetch per call, and --max_count to set the number of desired tweets (or -1 to get as many as possible).', type=bool, default=False)
@click.option('--users', is_flag=True, help='Fetch more users using smart pagination. Use --count to control how many users to fetch per call, and --max_count to set the number of desired users (or -1 to get as many as possible).', type=bool, default=False)
@click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def get_limits(ctx, url):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
print('#'*20)
print(worker.name)
if url:
limit = 'NOT FOUND'
print('URL is: {}'.format(url))
cat = url.split('/')[1]
if cat in resp['resources']:
limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
def api(ctx, cmd, tweets, users, api_args):
opts = {}
mappings = {
'id': '_id'
}
i = iter(api_args)
for k, v in zip(i, i):
k = k.replace('--', '')
if k in mappings:
k = mappings[k]
opts[k] = v
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
if tweets:
resp = utils.consume_tweets(wq[cmd], **opts)
elif users:
resp = utils.consume_users(wq[cmd], **opts)
else:
print('Cat {} not found'.format(cat))
print('{}: {}'.format(url, limit))
else:
print(json.dumps(resp, indent=2))
resp = wq[cmd](**opts)
print(json.dumps(resp))
return
for i in resp:
print(json.dumps(i))
@main.command('server')
@click.argument('CONSUMER_KEY', required=True)
@ -371,11 +513,11 @@ def stream(ctx):
@stream.command('get')
@click.option('-l', '--locations', default=None)
@click.option('-t', '--track', default=None)
@click.option('-f', '--file', help='File to store the stream of tweets')
@click.option('-f', '--file', default=None, help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
@click.pass_context
def get_stream(ctx, locations, track, file, politelyretry):
wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1)
query_args = {}
if locations:
@ -394,10 +536,14 @@ def get_stream(ctx, locations, track, file, politelyretry):
iterator = wq.statuses.sample()
else:
iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
try:
for i in iterator:
yield i
if not politelyretry:
return
except Exception:
if not politelyretry:
raise ex
thishangup = time.time()
if thishangup - lasthangup < 60:
raise Exception('Too many hangups in a row.')
@ -415,7 +561,7 @@ def get_stream(ctx, locations, track, file, politelyretry):
def read_stream(ctx, file, tail):
for tweet in utils.read_file(file, tail=tail):
try:
print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['created_at'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
except (KeyError, TypeError):
print('Raw tweet: {}'.format(tweet))

View File

@ -11,3 +11,4 @@ E.g.:
app.run()
'''
CREDENTIALS = '~/.bitter-credentials.json'
CONFIG_FILE = '~/.bitter.yaml'

View File

@ -10,6 +10,7 @@ from twitter import *
from collections import OrderedDict
from threading import Lock
from itertools import islice
from functools import partial
try:
import itertools.ifilter as filter
except ImportError:
@ -38,6 +39,9 @@ class AttrToFunc(object):
else:
return extend_call(k)
def __getitem__(self, k):
return partial(self.handler, self.__uriparts+k.split('/'))
def __call__(self, *args, **kwargs):
# for i, a in enumerate(args)e
# kwargs[i] = a
@ -54,6 +58,18 @@ class FromCredentialsMixin(object):
wq.ready(cls.worker_class(cred["user"], cred))
return wq
class FromConfigMixin(object):
@classmethod
def from_config(cls, config=None, conffile=None, max_workers=None):
wq = cls()
if not config:
with utils.config(conffile) as c:
config = c
for cred in islice(config['credentials'], max_workers):
wq.ready(cls.worker_class(cred["user"], cred))
return wq
class TwitterWorker(object):
api_class = None
@ -75,6 +91,12 @@ class TwitterWorker(object):
self._client = self.api_class(auth=auth)
return self._client
def __repr__(self):
msg = '<{} for {}>'.format(self.__class__.__name__, self.name)
if self.busy:
msg += ' [busy]'
return msg
class RestWorker(TwitterWorker):
api_class = Twitter
@ -93,13 +115,14 @@ class RestWorker(TwitterWorker):
def get_wait(self, uriparts):
limits = self.get_limit(uriparts)
if limits['remaining'] > 0:
if limits.get('remaining', 1) > 0:
return 0
reset = limits.get('reset', 0)
now = time.time()
return max(0, (reset-now))
def get_limit(self, uriparts):
uriparts = list(u for u in uriparts if u)
uri = '/'+'/'.join(uriparts)
for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
if ix.startswith(uri):
@ -132,7 +155,7 @@ class RestWorker(TwitterWorker):
class QueueException(BaseException):
pass
class QueueMixin(AttrToFunc, FromCredentialsMixin):
class QueueMixin(AttrToFunc, FromCredentialsMixin, FromConfigMixin):
def __init__(self, wait=True):
logger.debug('Creating worker queue')
self.queue = set()

View File

@ -3,10 +3,13 @@ import json
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.types import BigInteger, Integer, Text, Boolean
from sqlalchemy.schema import ForeignKey
from sqlalchemy.pool import SingletonThreadPool
from sqlalchemy import Column, Index
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from functools import wraps
Base = declarative_base()
@ -85,34 +88,48 @@ class ExtractorEntry(Base):
user = Column(BigInteger, index=True)
cursor = Column(BigInteger, default=-1)
pending = Column(Boolean, default=False)
errors = Column(Text, default="")
busy = Column(Boolean, default=False)
class Search(Base):
__tablename__ = 'search_queries'
id = Column(Integer, primary_key=True, index=True, unique=True)
endpoint = Column(Text, comment="Endpoint URL")
attrs = Column(Text, comment="Text version of the dictionary of parameters")
count = Column(Integer)
current_count = Column(Integer)
current_id = Column(BigInteger, comment='Oldest ID retrieved (should match max_id when done)')
since_id = Column(BigInteger)
class SearchResults(Base):
__tablename__ = 'search_results'
id = Column(Integer, primary_key=True, index=True, unique=True)
search_id = Column(ForeignKey('search_queries.id'))
resource_id = Column(Text)
def memoize(f):
memo = {}
@wraps(f)
def helper(self, **kwargs):
st = dict_to_str(kwargs)
key = (self.__uriparts, st)
if key not in memo:
memo[key] = f(self, **kwargs)
return memo[key]
return helper
def make_session(url):
engine = create_engine(url)#, echo=True)
if not isinstance(url, str):
print(url)
raise Exception("FUCK")
engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
return session
def test(db='sqlite:///users.db'):
from sqlalchemy import exists
session = make_session(db)
our_user = session.query(User).first()
print(our_user.name)
print(session.query(User).count())
fake_user = User(name="Fake user")
session.add(fake_user)
session.commit()
print(session.query(User).count())
print(session.query(exists().where(User.name == "Fake user")).scalar())
fake_committed = session.query(User).filter_by(name="Fake user").first()
print(fake_committed.id)
print(fake_committed.name)
session.delete(fake_committed)
session.commit()
print(session.query(User).count())
print(list(session.execute('SELECT 1 from users where id=\'%s\'' % 1548)))
def dict_to_str(args):
return json.dumps(args, sort_keys=True)

View File

@ -3,6 +3,9 @@ from __future__ import print_function
import logging
import time
import json
import yaml
import csv
import io
import signal
import sys
@ -10,18 +13,23 @@ import sqlalchemy
import os
import multiprocessing
from multiprocessing.pool import ThreadPool
from multiprocessing import Queue
import queue
import threading
from select import select
import operator
from functools import partial, reduce
from tqdm import tqdm
from itertools import islice, chain
from contextlib import contextmanager
try:
from itertools import izip_longest
except ImportError:
from itertools import zip_longest
from collections import Counter
from random import choice
from builtins import map, filter
@ -31,6 +39,12 @@ from bitter.models import Following, User, ExtractorEntry, make_session
from bitter import config
# Fix Python 2.x.
try:
UNICODE_EXISTS = bool(type(unicode))
except NameError:
unicode = lambda s: str(s)
logger = logging.getLogger(__name__)
@ -38,38 +52,100 @@ def signal_handler(signal, frame):
logger.info('You pressed Ctrl+C!')
sys.exit(0)
def chunk(iterable, n):
it = iter(iterable)
return iter(lambda: tuple(islice(it, n)), ())
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
source = chunk(source, chunksize)
p = ThreadPool(numcpus*2)
for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
def wrapped_func(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as ex:
print('Exception on parallel thread: {}'.format(ex), file=sys.stderr)
results = p.imap_unordered(wrapped_func, source)
for i in chain.from_iterable(results):
yield i
def get_credentials_path(credfile=None):
if not credfile:
if config.CREDENTIALS:
credfile = config.CREDENTIALS
def get_config_path(conf=None):
if not conf:
if config.CONFIG_FILE:
conf = config.CONFIG_FILE
else:
raise Exception('No valid credentials file')
return os.path.expanduser(credfile)
raise Exception('No valid config file')
return os.path.expanduser(conf)
def copy_credentials_to_config(credfile, conffile=None):
p = get_config_path(credfile)
with open(p) as old:
for line in old:
cred = json.loads(line.strip())
add_credentials(conffile, **cred)
def save_config(conf, conffile=None):
with config(conffile) as c:
c.clear()
c.update(conf)
@contextmanager
def credentials_file(credfile, *args, **kwargs):
p = get_credentials_path(credfile)
with open(p, *args, **kwargs) as f:
yield f
def config(conffile=None):
d = read_config(conffile)
try:
yield d
finally:
write_config(d, conffile)
def iter_credentials(credfile=None):
with credentials_file(credfile) as f:
for l in f:
yield json.loads(l.strip())
def get_credentials(credfile=None, inverse=False, **kwargs):
def read_config(conffile):
p = conffile and get_config_path(conffile)
if p:
if not os.path.exists(p):
raise IOError('{} file does not exist.'.format(p))
f = open(p, 'r')
elif 'BITTER_CONFIG' not in os.environ:
raise Exception('No config file or BITTER_CONFIG env variable.')
else:
f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n'))
return yaml.load(f, Loader=yaml.SafeLoader) or {'credentials': []}
def write_config(conf, conffile=None):
if not conf:
conf = {'credentials': []}
if conffile:
p = get_config_path(conffile)
with open(p, 'w') as f:
yaml.dump(conf, f)
else:
os.environ['BITTER_CONFIG'] = yaml.dump(conf)
def iter_credentials(conffile=None):
with config(conffile) as c:
for i in c['credentials']:
yield i
def create_config_file(conffile=None):
if not conffile:
return
conffile = get_config_path(conffile)
with open(conffile, 'a'):
pass
write_config(None, conffile)
def get_credentials(conffile=None, inverse=False, **kwargs):
creds = []
for i in iter_credentials(credfile):
for i in iter_credentials(conffile):
matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items()))
if matches and not inverse:
creds.append(i)
@ -77,24 +153,23 @@ def get_credentials(credfile=None, inverse=False, **kwargs):
creds.append(i)
return creds
def create_credentials(credfile=None):
credfile = get_credentials_path(credfile)
with credentials_file(credfile, 'a'):
pass
def delete_credentials(credfile=None, **creds):
tokeep = get_credentials(credfile, inverse=True, **creds)
with credentials_file(credfile, 'w') as f:
for i in tokeep:
f.write(json.dumps(i))
f.write('\n')
def delete_credentials(conffile=None, **creds):
tokeep = get_credentials(conffile, inverse=True, **creds)
with config(conffile) as c:
c['credentials'] = list(tokeep)
def add_credentials(credfile=None, **creds):
exist = get_credentials(credfile, **creds)
if not exist:
with credentials_file(credfile, 'a') as f:
f.write(json.dumps(creds))
f.write('\n')
def add_credentials(conffile=None, **creds):
try:
exist = get_credentials(conffile, **creds)
except IOError:
exist = False
create_config_file(conffile)
if exist:
return
with config(conffile) as c:
c['credentials'].append(creds)
def get_hashtags(iter_tweets, best=None):
@ -103,8 +178,13 @@ def get_hashtags(iter_tweets, best=None):
c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
return c
def read_file(filename, tail=False):
with open(filename) as f:
if filename == '-':
f = sys.stdin
else:
f = open(filename)
try:
while True:
line = f.readline()
if line not in (None, '', '\n'):
@ -115,6 +195,9 @@ def read_file(filename, tail=False):
time.sleep(1)
else:
return
finally:
if f != sys.stdin:
close(f)
def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
@ -144,6 +227,7 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
else:
yield user
def trim_user(user):
if 'status' in user:
del user['status']
@ -157,14 +241,22 @@ def trim_user(user):
return user
def add_user(session, user, enqueue=False):
def add_user(user, dburi=None, session=None, update=False):
if not session:
session = make_session(dburi)
user = trim_user(user)
olduser = session.query(User).filter(User.id == user['id'])
if olduser:
if not update:
return
olduser.delete()
user = User(**user)
nuser = User()
for key, value in user.items():
setattr(nuser, key, value)
user = nuser
if update:
session.add(user)
if extract:
logger.debug('Adding entry')
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
if not entry:
@ -174,87 +266,75 @@ def add_user(session, user, enqueue=False):
entry.pending = True
entry.cursor = -1
session.commit()
session.close()
# TODO: adapt to the crawler
def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
signal.signal(signal.SIGINT, signal_handler)
w = wq.next()
if not dburi:
dburi = 'sqlite:///%s.db' % extractor_name
def download_entry(wq, entry_id, dburi=None, recursive=False):
session = make_session(dburi)
if not session:
raise Exception("Provide dburi or session")
logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id)))
entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first()
user = session.query(User).filter(User.id == entry.user).first()
download_user(wq, session, user, entry, recursive)
session.close()
screen_names = []
user_ids = []
def crawl_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
def classify_user(id_or_name):
try:
int(user)
user_ids.append(user)
logger.info("Added user id")
except ValueError:
logger.info("Added screen_name")
screen_names.append(user.split('@')[-1])
total_followers = user.followers_count
if user:
classify_user(user)
if total_followers > max_followers:
entry.pending = False
logger.info("Too many followers for user: %s" % user.screen_name)
session.add(entry)
session.commit()
return
elif initfile:
logger.info("No user. I will open %s" % initfile)
with open(initfile, 'r') as f:
for line in f:
user = line.strip().split(',')[0]
classify_user(user)
else:
logger.info('Using pending users from last session')
if not entry:
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id)
session.add(entry)
session.commit()
nusers = list(get_users(wq, screen_names, by_name=True))
if user_ids:
nusers += list(get_users(wq, user_ids, by_name=False))
for i in nusers:
add_user(session, i, enqueue=True)
total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
logger.info('Total users: {}'.format(total_users))
def pending_entries():
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
logger.info('Pending: {}'.format(pending))
return pending
while pending_entries() > 0:
logger.info("Using account: %s" % w.name)
candidate, entry = session.query(User, ExtractorEntry).\
filter(ExtractorEntry.user == User.id).\
filter(ExtractorEntry.pending == True).\
order_by(User.followers_count).first()
if not candidate:
break
pending = True
cursor = entry.cursor
uid = candidate.id
uobject = session.query(User).filter(User.id==uid).first()
name = uobject.screen_name if uobject else None
uid = user.id
name = user.name
logger.info("#"*20)
logger.info("Getting %s - %s" % (uid, name))
logger.info("Cursor %s" % cursor)
logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users))
logger.info("Using account: %s" % wq.name)
_fetched_followers = 0
def fetched_followers():
return session.query(Following).filter(Following.isfollowed==uid).count()
attempts = 0
while cursor > 0 or fetched_followers() < total_followers:
try:
resp = wq.followers.ids(user_id=uid, cursor=cursor)
except TwitterHTTPError as ex:
if ex.e.code in (401, ):
attempts += 1
if ex.e.code in (401, ) or attempts > 3:
logger.info('Not authorized for user: {}'.format(uid))
resp = {}
if 'ids' in resp:
entry.errors = ex.message
break
if 'ids' not in resp:
logger.info("Error with id %s %s" % (uid, resp))
entry.pending = False
entry.errors = "No ids in response: %s" % resp
break
logger.info("New followers: %s" % len(resp['ids']))
if recursive:
newusers = get_users(wq, resp)
for user in newusers:
add_user(session, newuser, enqueue=True)
for newuser in newusers:
add_user(session=session, user=newuser)
if 'ids' not in resp or not resp['ids']:
logger.info('NO IDS in response')
break
for i in resp['ids']:
existing_user = session.query(Following).\
filter(Following.isfollowed == uid).\
@ -268,37 +348,119 @@ def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor
created_at_stamp=now)
session.add(f)
total_followers = candidate.followers_count
fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count()
logger.info("Fetched: %s/%s followers" % (fetched_followers,
logger.info("Fetched: %s/%s followers" % (fetched_followers(),
total_followers))
cursor = resp["next_cursor"]
if cursor > 0:
pending = True
logger.info("Getting more followers for %s" % uid)
else:
logger.info("Done getting followers for %s" % uid)
cursor = -1
pending = False
else:
logger.info("Error with id %s %s" % (uid, resp))
pending = False
entry.cursor = resp["next_cursor"]
entry.pending = pending
entry.cursor = cursor
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
session.add(candidate)
session.add(entry)
session.commit()
logger.info("Done getting followers for %s" % uid)
entry.pending = False
entry.busy = False
session.add(entry)
session.commit()
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
sys.stdout.flush()
def classify_user(id_or_name, screen_names, user_ids):
try:
int(id_or_name)
user_ids.append(id_or_name)
logger.debug("Added user id")
except ValueError:
logger.debug("Added screen_name")
screen_names.append(id_or_name.split('@')[-1])
def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
signal.signal(signal.SIGINT, signal_handler)
if not dburi:
dburi = 'sqlite:///%s.db' % extractor_name
session = make_session(dburi)
session.query(ExtractorEntry).update({ExtractorEntry.busy: False})
session.commit()
if not (user or initfile):
logger.info('Using pending users from last session')
else:
screen_names = []
user_ids = []
if user:
classify_user(user, screen_names, user_ids)
elif initfile:
logger.info("No user. I will open %s" % initfile)
with open(initfile, 'r') as f:
for line in f:
user = line.strip().split(',')[0]
classify_user(user, screen_names, user_ids)
def missing_user(ix, column=User.screen_name):
res = session.query(User).filter(column == ix).count() == 0
if res:
logger.info("Missing user %s. Count: %s" % (ix, res))
return res
screen_names = list(filter(missing_user, screen_names))
user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids))
nusers = []
logger.info("Missing user ids: %s" % user_ids)
logger.info("Missing screen names: %s" % screen_names)
if screen_names:
nusers = list(get_users(wq, screen_names, by_name=True))
if user_ids:
nusers += list(get_users(wq, user_ids, by_name=False))
for i in nusers:
add_user(dburi=dburi, user=i)
total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
logger.info('Total users: {}'.format(total_users))
de = partial(download_entry, wq, dburi=dburi)
pending = pending_entries(dburi)
session.close()
with tqdm(parallel(de, pending), desc='Downloading users', total=total_users) as tq:
for i in tq:
tq.write('Got {}'.format(i))
logger.info("Got %s" % i)
def pending_entries(dburi):
session = make_session(dburi)
while True:
candidate, entry = session.query(User, ExtractorEntry).\
filter(ExtractorEntry.user == User.id).\
filter(ExtractorEntry.pending == True).\
filter(ExtractorEntry.busy == False).\
order_by(User.followers_count).first()
if candidate:
entry.busy = True
session.add(entry)
session.commit()
yield int(entry.id)
continue
if session.query(ExtractorEntry).\
filter(ExtractorEntry.busy == True).count() > 0:
time.sleep(1)
continue
logger.info("No more pending entries")
break
session.close()
def get_tweet(c, tid):
return c.statuses.show(id=tid)
def search_tweet(c, query):
return c.search.tweets(q=query)
yield from c.search.tweets(q=query)['statuses']
def user_timeline(c, query):
try:
@ -313,84 +475,329 @@ def get_user(c, user):
except ValueError:
return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
cached = cached_tweet(tweetid, folder)
newtweet = None
if update or not cached:
newtweet = get_tweet(wq, tweetid)
js = json.dumps(tweet, indent=2)
if write:
if newtweet:
write_tweet_json(js, folder)
else:
print(js)
def download_tweet(wq, tweetid, cache=True, folder="downloaded_tweets", update=False):
tweet = cached_id(tweetid, folder)
if update or not tweet:
tweet = get_tweet(wq, tweetid)
if cache and update:
if tweet:
js = json.dumps(tweet)
write_json(js, folder)
yield tweet
def cached_tweet(tweetid, folder):
def download_user(wq, userid, cache=True, folder="downloaded_users", update=False):
user = cached_id(userid, folder)
if update or not user:
user = get_user(wq, userid)
if cache and update:
if user:
write_json(user, folder, aliases=[user['screen_name'], ])
yield user
def cached_id(oid, folder):
tweet = None
file = os.path.join(folder, '%s.json' % tweetid)
file = os.path.join(folder, '%s.json' % oid)
if os.path.exists(file) and os.path.isfile(file):
try:
# print('%s: Tweet exists' % tweetid)
# print('%s: Object exists' % oid)
with open(file) as f:
tweet = json.load(f)
except Exception as ex:
logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
logger.error('Error getting cached version of {}: {}'.format(oid, ex))
return tweet
def write_tweet_json(js, folder):
tweetid = js['id']
file = tweet_file(tweetid, folder)
def write_json(js, folder, oid=None, aliases=[]):
if not oid:
oid = js['id']
file = id_file(oid, folder)
if not os.path.exists(folder):
os.makedirs(folder)
with open(file, 'w') as f:
json.dump(js, f, indent=2)
logger.info('Written {} to file {}'.format(tweetid, file))
json.dump(js, f)
logger.info('Written {} to file {}'.format(oid, file))
for alias in aliases:
os.symlink('%s.json' % oid, id_file(alias, folder))
def tweet_file(tweetid, folder):
return os.path.join(folder, '%s.json' % tweetid)
def id_file(oid, folder):
return os.path.join(folder, '%s.json' % oid)
def tweet_fail_file(tweetid, folder):
def fail_file(oid, folder):
failsfolder = os.path.join(folder, 'failed')
if not os.path.exists(failsfolder):
os.makedirs(failsfolder)
return os.path.join(failsfolder, '%s.failed' % tweetid)
return os.path.join(failsfolder, '%s.failed' % oid)
def tweet_failed(tweetid, folder):
return os.path.isfile(tweet_fail_file(tweetid, folder))
def id_failed(oid, folder):
return os.path.isfile(fail_file(oid, folder))
def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
def filter_line(line):
tweetid = int(line)
# print('Checking {}'.format(tweetid))
if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
yield None
else:
yield line
def tweet_download_batch(wq, batch):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
for tid, tweet in tweets.items():
yield tid, tweet
def print_result(res):
tid, tweet = res
if tweet:
def user_download_batch(wq, batch):
screen_names = []
user_ids = []
for elem in batch:
try:
write_tweet_json(tweet, folder=folder)
yield 1
int(elem)
user_ids.append(str(elem))
except ValueError:
screen_names.append(elem.lower())
args = {}
if user_ids:
args['user_id'] = ','.join(user_ids)
if screen_names:
args['screen_name'] = ','.join(screen_names)
try:
users = wq.users.lookup(**args)
except TwitterHTTPError as ex:
if ex.e.code in (404,):
users = []
else:
raise
found_ids = []
found_names = []
for user in users:
uid = user['id_str']
if uid in user_ids:
found_ids.append(uid)
yield (uid, user)
uname = user['screen_name'].lower()
if uname in screen_names:
found_names.append(uname)
yield (uname, user)
for uid in set(user_ids) - set(found_ids):
yield (uid, None)
for name in set(screen_names) - set(found_names):
yield (name, None)
def dump_result(oid, obj, folder, ignore_fails=True):
if obj:
try:
write_json(obj, folder=folder, oid=oid)
failed = fail_file(oid, folder)
if os.path.exists(failed):
os.remove(failed)
except Exception as ex:
logger.error('%s: %s' % (tid, ex))
logger.error('%s: %s' % (oid, ex))
if not ignore_fails:
raise
else:
logger.info('Tweet not recovered: {}'.format(tid))
with open(tweet_fail_file(tid, folder), 'w') as f:
print('Tweet not found', file=f)
yield -1
logger.info('Object not recovered: {}'.format(oid))
with open(fail_file(oid, folder), 'w') as f:
print('Object not found', file=f)
def download_batch(batch):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
return tweets.items()
with open(tweetsfile) as f:
lines = map(lambda x: x.strip(), f)
lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
tweets = parallel(download_batch, lines_to_crawl, 100)
for res in tqdm(parallel(print_result, tweets), desc='Queried'):
pass
def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False, cache=True,
batch_method=tweet_download_batch):
done = Queue()
down = Queue()
def filter_list(lst, done, down):
print('filtering')
for oid in lst:
cached = cached_id(oid, folder)
if (cached and not update):
done.put((oid, cached))
elif (id_failed(oid, folder) and not retry_failed):
done.put((oid, None))
else:
down.put(oid)
down.put(None)
def download_results(batch_method, down, done):
def gen():
while True:
r = down.get()
if r is None:
down.close()
down.join_thread()
return
yield r
for t in parallel(batch_method, gen(), 100):
done.put(t)
def batch(*args, **kwargs):
return batch_method(wq, *args, **kwargs)
tc = threading.Thread(target=filter_list, args=(lst, done, down), daemon=True)
tc.start()
td = threading.Thread(target=download_results, args=(batch, down, done), daemon=True)
td.start()
def check_threads(ts, done):
for t in ts:
t.join()
done.put(None)
wait = threading.Thread(target=check_threads, args=([tc, td], done), daemon=True)
wait.start()
while True:
rec = done.get()
if rec is None:
done.close()
done.join_thread()
break
oid, obj = rec
if cache or (not obj):
dump_result(oid, obj, folder, ignore_fails)
yield rec
wait.join()
def download_tweets_file(*args, **kwargs):
kwargs['batch_method'] = tweet_download_batch
yield from download_file(*args, **kwargs)
def download_users_file(*args, **kwargs):
kwargs['batch_method'] = user_download_batch
yield from download_file(*args, **kwargs)
def download_file(wq, csvfile, folder, column=0, delimiter=',', skip=0, cache=True,
quotechar='"', commentchar=None, batch_method=tweet_download_batch,
**kwargs):
with open(csvfile) as f:
if commentchar:
f = (line for line in f if not line.startswith('#'))
csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
for n in range(skip):
next(csvreader)
def reader(r):
for row in csvreader:
if len(row) > column:
yield row[column].strip()
for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method, cache=cache,
**kwargs):
yield res
def download_timeline(wq, user):
return wq.statuses.user_timeline(id=user)
def _consume_feed(func, feed_control=None, **kwargs):
'''
Get all the tweets using pagination and a given method.
It can be controlled with the `count` parameter.
If max_count < 0 => Loop until the whole feed is consumed.
If max_count == 0 => Only call the API once, with the default values.
If max_count > 0 => Get max_count tweets from the feed.
'''
remaining = int(kwargs.pop('max_count', 0))
count = int(kwargs.get('count', -1))
limit = False
# We need to at least perform a query, so we simulate a do-while
# by running once with no limit and updating the condition at the end
with tqdm(total=remaining) as pbar:
while not limit:
if remaining > 0 and ((count < 0) or (count > remaining)):
kwargs['count'] = remaining
resp, stop = feed_control(func, kwargs, remaining=remaining, batch_size=count)
if not resp:
return
for entry in resp:
yield entry
pbar.update(len(resp))
limit = stop
if remaining < 0:
# If the loop was run with a negative remaining, it will only stop
# when the control function tells it to.
continue
# Otherwise, check if we have already downloaded all the required items
remaining -= len(resp)
limit = limit or remaining <= 0
def consume_tweets(*args, **kwargs):
return _consume_feed(*args, feed_control=_tweets_control, **kwargs)
def consume_users(*args, **kwargs):
return _consume_feed(*args, feed_control=_users_control, **kwargs)
def _tweets_control(func, apiargs, remaining=0, **kwargs):
''' Return a list of entries, the remaining '''
resp = func(**apiargs)
if not resp:
return None, True
# Update the arguments for the next call
# Two options: either resp is a list, or a dict like:
# {'statuses': ... 'search_metadata': ...}
if isinstance(resp, dict) and 'search_metadata' in resp:
resp = resp['statuses']
max_id = min(s['id'] for s in resp) - 1
apiargs['max_id'] = max_id
return resp, False
def _users_control(func, apiargs, remaining=0, **kwargs):
resp = func(**apiargs)
stop = True
# Update the arguments for the next call
if 'next_cursor' in resp:
cursor = resp['next_cursor']
apiargs['cursor'] = cursor
if int(cursor) != -1:
stop = False
return resp['users'], stop
def serialized(it, outfile, outformat='csv', fields=[], header=None, ignore_missing=False, delimiter='\t'):
outformat = outformat.lower()
def do(out):
if outformat == 'csv':
writer = csv.writer(out, quoting=csv.QUOTE_ALL, delimiter=delimiter)
if header != '':
h = header
if h is None:
h = delimiter.join(fields)
print(h, file=out)
attrs = list(token.strip().split('.') for token in fields)
for obj in it:
values = []
for attr in attrs:
try:
values.append(reduce(operator.getitem, attr, obj))
except KeyError:
if not ignore_missing:
print('Key not present: {}'.format(attr), file=sys.stderr)
values.append(None)
writer.writerow(values)
elif outformat == 'jsonlines':
for obj in it:
print(json.dumps(obj, sort_keys=True), file=out)
elif outformat == 'indented':
for obj in it:
print(json.dumps(obj, indent=4, sort_keys=True), file=out)
else:
for obj in it:
print(obj, file=out)
if outfile is sys.stdout:
return do(sys.stdout)
with open(outfile, 'w') as out:
return do(out)

12
docker-compose.yml Normal file
View File

@ -0,0 +1,12 @@
version: '2'
services:
dev:
build:
context: .
dockerfile: Dockerfile-3.4
volumes:
- '.:/usr/src/app'
tty: yes
working_dir: '/usr/src/app'
entrypoint: '/bin/bash'
command: ''

View File

@ -2,3 +2,4 @@ sqlalchemy
twitter
click
tqdm
pyyaml

View File

@ -1,29 +1,23 @@
import pip
from setuptools import setup
from pip.req import parse_requirements
# parse_requirements() returns generator of pip.req.InstallRequirement objects
# pip 6 introduces the *required* session argument
try:
install_reqs = parse_requirements("requirements.txt", session=pip.download.PipSession())
py2_reqs = parse_requirements("requirements-py2.txt", session=pip.download.PipSession())
test_reqs = parse_requirements("test-requirements.txt", session=pip.download.PipSession())
except AttributeError:
def parse_requirements(filename):
""" load requirements from a pip requirements file """
with open(filename, 'r') as f:
lineiter = list(line.strip() for line in f)
return [line for line in lineiter if line and not line.startswith("#")]
install_reqs = parse_requirements("requirements.txt")
py2_reqs = parse_requirements("requirements-py2.txt")
test_reqs = parse_requirements("test-requirements.txt")
import sys
import os
import itertools
if sys.version_info <= (3, 0):
install_reqs = itertools.chain(install_reqs, py2_reqs)
install_reqs = install_reqs + py2_reqs
# reqs is a list of requirement
# e.g. ['django==1.5.1', 'mezzanine==1.4.6']
install_reqs = [str(ir.req) for ir in install_reqs]
test_reqs = [str(ir.req) for ir in test_reqs]
from bitter import __version__
with open(os.path.join('bitter', 'VERSION'), 'r') as f:
__version__ = f.read().strip()
setup(
name="bitter",

View File

@ -12,7 +12,11 @@ from bitter import config as c
class TestCrawlers(TestCase):
def setUp(self):
self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json'))
CONF_PATH = os.path.join(os.path.dirname(__file__), '.bitter.yaml')
if os.path.exists(CONF_PATH):
self.wq = easy(CONF_PATH)
else:
self.wq = easy()
def test_create_worker(self):
assert len(self.wq.queue)==1

23
tests/test_models.py Normal file
View File

@ -0,0 +1,23 @@
from unittest import TestCase
import os
import types
from bitter import utils
from bitter.models import *
from sqlalchemy import exists
class TestModels(TestCase):
def setUp(self):
self.session = make_session('sqlite://')
def test_user(self):
fake_user = User(name="Fake user", id=1548)
self.session.add(fake_user)
self.session.commit()
fake_committed = self.session.query(User).filter_by(name="Fake user").first()
assert fake_committed
self.session.delete(fake_committed)
self.session.commit()
assert not list(self.session.execute('SELECT 1 from users where id=\'%s\'' % 1548))

View File

@ -8,56 +8,63 @@ from bitter import config as c
class TestUtils(TestCase):
configfile = '/tmp/bitter.yaml'
def setUp(self):
self.credfile = '/tmp/credentials.txt'
c.CREDENTIALS = self.credfile
if os.path.exists(self.credfile):
os.remove(self.credfile)
utils.create_credentials(self.credfile)
def test_create_credentials(self):
assert os.path.exists(self.credfile)
os.remove(self.credfile)
utils.create_credentials() # From config
assert os.path.exists(self.credfile)
c.CONFIG_FILE = self.configfile
if os.path.exists(self.configfile):
os.remove(self.configfile)
assert not os.path.exists(self.configfile)
utils.create_config_file(self.configfile)
assert os.path.exists(self.configfile)
def test_add_credentials(self):
utils.add_credentials(self.credfile, user="test")
assert utils.get_credentials(self.credfile)
assert utils.get_credentials(user="test")
assert list(utils.get_credentials(user="test"))[0]["user"] == "test"
utils.add_credentials(self.configfile, user="test")
assert utils.get_credentials(self.configfile)
assert utils.get_credentials(self.configfile, user="test")
assert list(utils.get_credentials(self.configfile, user="test"))[0]["user"] == "test"
def test_get_credentials(self):
utils.add_credentials(self.credfile, user="test")
assert utils.get_credentials(user="test")
assert not utils.get_credentials(user="test", inverse=True)
utils.add_credentials(self.configfile, user="test")
assert utils.get_credentials(self.configfile, user="test")
assert not utils.get_credentials(self.configfile, user="test", inverse=True)
def test_add_two_credentials(self):
utils.add_credentials(self.credfile, user="test")
utils.add_credentials(self.credfile, user="test2")
assert utils.get_credentials(user="test")
assert utils.get_credentials(user="test2")
utils.add_credentials(self.configfile, user="test")
utils.add_credentials(self.configfile, user="test2")
assert utils.get_credentials(self.configfile, user="test")
assert utils.get_credentials(self.configfile, user="test2")
def test_delete_credentials(self):
utils.add_credentials(self.credfile, user="test")
assert utils.get_credentials(user="test")
utils.delete_credentials(user="test")
print(utils.get_credentials())
assert not utils.get_credentials(user="test")
utils.add_credentials(self.configfile, user="test")
assert utils.get_credentials(self.configfile, user="test")
utils.delete_credentials(self.configfile, user="test")
assert not utils.get_credentials(self.configfile, user="test")
def test_parallel(self):
import time
def echo(i):
time.sleep(2)
time.sleep(0.5)
return i
tic = time.time()
resp = utils.parallel(echo, [1, 2, 3])
assert isinstance(resp, types.GeneratorType)
assert list(resp) == [1,2,3]
assert sorted(list(resp)) == [1, 2, 3]
toc = time.time()
assert (tic-toc) < 6000
assert (tic-toc) < 600
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
assert list(resp2) == [1,2,3,4]
assert sorted(list(resp2)) == [1, 2, 3, 4]
class TestUtilsEnv(TestUtils):
configfile = None
def setUp(self):
if 'BITTER_CONFIG' in os.environ:
self.oldenv = os.environ['BITTER_CONFIG']
os.environ['BITTER_CONFIG'] = ''
def tearDown(self):
if hasattr(self, 'oldenv'):
os.environ['BITTER_CONFIG'] = self.oldenv