Mirror of https://github.com/balkian/bitter.git

Compare commits


1 commit

Author: J. Fernando Sánchez
Commit: c940709df8
Message: Fixed tweet error
Date: 2017-05-21 21:27:46 +02:00

34 changed files with 400 additions and 1403 deletions

.gitignore vendored

@@ -2,7 +2,6 @@ __pycache__
 *.egg-info
 dist
 env
-.env
 __*
 .*
 *.pyc

Dockerfile-2.7

@@ -2,6 +2,6 @@
 From python:2.7-onbuild
 Maintainer J. Fernando Sánchez @balkian
-RUN pip install ".[server]"
+RUN pip install -e "/usr/src/app/[server]"
 ENTRYPOINT ["bitter"]

Dockerfile-3.4

@@ -2,6 +2,6 @@
 From python:3.4-onbuild
 Maintainer J. Fernando Sánchez @balkian
-RUN pip install ".[server]"
+RUN pip install -e "/usr/src/app/[server]"
 ENTRYPOINT ["bitter"]

Dockerfile.template

@@ -2,6 +2,6 @@
 From python:{{PYVERSION}}-onbuild
 Maintainer J. Fernando Sánchez @balkian
-RUN pip install ".[server]"
+RUN pip install -e "/usr/src/app/[server]"
 ENTRYPOINT ["bitter"]

Makefile

@@ -1,4 +1,4 @@
-PYVERSIONS=3.5
+PYVERSIONS=3.4 2.7
 PYMAIN=$(firstword $(PYVERSIONS))
 NAME=bitter
 REPO=balkian
@@ -19,7 +19,7 @@ Dockerfile-%: Dockerfile.template
 dev-%:
 	@docker start $(NAME)-dev$* || (\
 		$(MAKE) build-$*; \
-		docker run -d -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
+		docker run -d -w /usr/src/app/ -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
 	)\
 	docker exec -ti $(NAME)-dev$* bash
@@ -38,7 +38,7 @@ test: $(addprefix test-,$(PYMAIN))
 testall: $(addprefix test-,$(PYVERSIONS))
 test-%: build-%
-	docker run --rm -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
+	docker run --rm -w /usr/src/app/ -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
 pip_test-%:
 	docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
@@ -71,6 +71,6 @@ pip_upload:
 pip_test: $(addprefix pip_test-,$(PYVERSIONS))
 run: build
-	docker run --rm --env-file $$PWD/.env -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
+	docker run --rm -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
 .PHONY: test test-% build-% build test test_pip run

README.md

@@ -1,5 +1,4 @@
-# Description
+#Description
 There are two parts to bitter.
 First of all, it is a wrapper over Python twitter that adds support for several Twitter API credentials (e.g. authorizing the same app with different user accounts).
 Secondly, it is a command line tool to automate several actions (e.g. downloading user networks) using the wrapper.
@@ -23,153 +22,16 @@ wq = easy()
 print(wq.users.show(user_name='balkian'))
 ```
-You can also make custom calls to the API through the command line.
-e.g. to get the latest 500 tweets by the python software foundation:
-```
-bitter api statuses/user_timeline --id thepsf --count 500
-```
-## Adding credentials
-```
-bitter --config <YOUR CONFIGURATION FILE> credentials add
-```
-You can specify the parameters in the command or let the command line guide you through the process.
-# Examples
-## Downloading a list of tweets
-Bitter can download tweets from a list of tweets in a CSV file.
-The result is stored as individual json files in your folder of choice.
-You can even specify the column number for tweet ids.
-Bitter will not try to download tweets that have already been downloaded.
-```
-Usage: bitter tweet get_all [OPTIONS] TWEETSFILE
-
-  Download tweets from a list of tweets in a CSV file. The result is stored
-  as individual json files in your folder of choice.
-
-Options:
-  -f, --folder TEXT
-  -d, --delimiter TEXT
-  -h, --header          Discard the first line (use it as a header)
-  -q, --quotechar TEXT
-  -c, --column INTEGER
-  --help                Show this message and exit.
-```
-For instance, this will download the tweets listed in `tweet_ids.csv` into the `tweet_info` folder:
-```
-bitter tweet get_all -f tweet_info tweet_ids.csv
-```
-## Downloading a list of users
-Bitter downloads users and tweets in a similar way:
-```
-Usage: bitter users get_all [OPTIONS] USERSFILE
-
-  Download users from a list of user ids/screen names in a CSV file. The
-  result is stored as individual json files in your folder of choice.
-
-Options:
-  -f, --folder TEXT
-  -d, --delimiter TEXT
-  -h, --header          Discard the first line (use it as a header)
-  -q, --quotechar TEXT
-  -c, --column INTEGER
-  --help                Show this message and exit.
-```
-The only difference is that users can be downloaded via `screen_name` or `user_id`.
-This method does not try to resolve screen names to user ids, so users may be downloaded more than once if they appear in both ways.
-## Downloading a stream
-```
-Usage: bitter stream get [OPTIONS]
-
-Options:
-  -l, --locations TEXT
-  -t, --track TEXT
-  -f, --file TEXT       File to store the stream of tweets. Default: standard output
-  -p, --politelyretry   Politely retry after a hangup/connection error
-  --help                Show this message and exit.
-```
-```
-bitter --config .bitter.yaml stream get
-```
-python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
-## REST queries
-In newer versions of bitter, individual methods to download tweets/users using the REST API are being replaced with a generic method to call the API.
-```
-bitter api <URL endpoint> --parameter VALUE ... | [--tweets | --users] [--max_count MAX_COUNT] [--count COUNT_PER_CALL]
-```
-For instance:
-```
-# Get 100 tweets that mentioned Obama after tweet 942689870501302300
-bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama
-```
-That is equivalent to this call to the API: `api/1.1/search/tweets?since_id=942689870501302300&count=100&q=Obama`.
-The flags `--tweets` and `--users` are optional.
-If you use them, bitter will try to intelligently fetch all the tweets/users by using pagination with the API.
-For example:
-```
-# Download 1000 tweets, 100 tweets per call.
-bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama --max_count=1000 --tweets
-```
-```
-# Download all the followers of @balkian
-bitter api 'followers/list' --_id balkian --users --max_count -1
-```
-Note that some reserved words (such as `id`) have to be preceded by an underscore.
-This limitation is imposed by the python-twitter library.
-# Configuration format
-```
-credentials:
-- user: "balkian"
-  consumer_secret: "xxx"
-  consumer_key: "xxx"
-  token_key: "xxx"
-  token_secret: "xxx"
-- user: ....
-```
-By default, bitter uses '~/.bitter.yaml', but you may choose a different file:
-```
-python -m bitter --config <config_file> ...
-```
-Or use an environment variable:
-```
-export BITTER_CONFIG=$(cat myconfig.yaml)
-```
+# Credentials format
+```
+{"user": "balkian", "consumer_secret": "xxx", "consumer_key": "xxx", "token_key": "xxx", "token_secret": "xxx"}
+```
+By default, bitter uses '~/.bitter-credentials.json', but you may choose a different file:
+```
+python -m bitter -c <credentials_file> ...
+```
 # Server
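The removed `bitter api` examples map onto the Python wrapper in a fairly direct way. A minimal sketch, assuming the newer code (the removed side of this compare) with its YAML config and the `TwitterQueue.__getitem__` handler shown further down in `bitter/crawlers.py`:

```python
from bitter import easy

# Build a queue of workers from the configured credentials
# (a YAML config file in the newer code, a JSON credentials file in 0.7.x).
wq = easy()

# Attribute access mirrors the REST API, as in the README example above:
print(wq.users.show(user_name='balkian'))

# The generic `bitter api <endpoint>` command resolves the endpoint with
# item access and forwards the remaining flags as keyword arguments, roughly:
resp = wq['/search/tweets'](q='Obama', since_id=942689870501302300, count=100)
print(len(resp['statuses']))
```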


@ -1,10 +0,0 @@
Scripts to process jsonlines
To get the jsonlines file, you can use the streaming API or the search api, like so:
```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
```
To keep track of the query that generated the file, you can save the command in a text file.
For instance, the example above is also in `example_query.sh`.
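The shell scripts deleted below all follow the same pattern: stream a jsonlines file and pull a few fields out of each tweet with jq. A hypothetical Python equivalent of that pattern (the file name and the hashtag field choice are illustrative, not part of the repository):

```python
import json
from collections import Counter

def count_hashtags(path):
    """Count hashtag occurrences in a file with one JSON tweet per line."""
    counts = Counter()
    with open(path) as f:
        for line in f:
            tweet = json.loads(line)
            for tag in tweet.get('entities', {}).get('hashtags', []):
                counts[tag['text'].lower()] += 1
    return counts

print(count_hashtags('mytweets.jsonlines').most_common(10))
```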

example_query.sh

@ -1 +0,0 @@
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines


@ -1,13 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export FIELDS="created_at,id,text"
for i in "$@"
do
OUTPUT=$i.hashtags.csv
echo "$FIELDS" > $OUTPUT
pv -l $i -N "hashtags $i" | jq -r '. | .created_at as $created_at | .id_str as $id | .entities.hashtags | select(. != null) | .[] | [$created_at, $id, .text] | @csv' >> $OUTPUT
done


@ -1,15 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
for i in "$@"
do
REPLYOUTPUT=$i.replies.csv
RTOUTPUT=$i.rts.csv
echo 'created_at,id,user_id,reply_user_id' > $REPLYOUTPUT
echo 'created_at,id,user_id,rt_user_id' > $RTOUTPUT
pv -l -N "$i" $i | jq -r '. | select(.in_reply_to_user_id_str != null) | [.created_at, .id_str, .user.id_str, .in_reply_to_user_id_str] | @csv' >> $REPLYOUTPUT
pv -l -N "$i" $i | jq -r '. | select(.retweeted_status != null) | [.created_at, .retweeted_status.id_str, .user.id_str, .retweeted_status.user.id_str] | @csv' >> $RTOUTPUT
done


@ -1,16 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export QUERY='.limit | select(. != null) | [.timestamp_ms, .track] | @csv'
export FIELDS="timestamp,track"
for i in "$@"
do
OUTPUT=$i.limits.csv
echo $FIELDS > $OUTPUT
pv -N "$i limits" -l $i | jq -r "$QUERY" >> $OUTPUT
done


@ -1,16 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export QUERY='select(.id != null) | .id_str as $id | .entities.urls[] | select(.expanded_url | select(. != null) | contains("open.spotify") or contains("youtube.com") or contains("youtu.be")) | [$id, .expanded_url] | @csv'
export FIELDS="id,url"
for i in "$@"
do
OUTPUT=$i.media.csv
echo $FIELDS > $OUTPUT
pv -N "$i media" -l $i | jq -r "$QUERY" >> $OUTPUT
done


@ -1,28 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export USER_FIELDS="\$created_at,\
.id_str,\
.screen_name,\
.followers_count,\
.lang,\
.description,\
.statuses_count,\
.favourites_count,\
.friends_count,\
.created_at,\
.name,\
.location,\
.listed_count,\
.time_zone\
"
for i in "$@"
do
OUTPUT=$i.users.csv
echo \#$USER_FIELDS > $OUTPUT
jq -r ".created_at as \$created_at | .user,.retweeted_status.user | select(. != null) | [$USER_FIELDS] | @csv " $i | pv -N "$i" -l >> $OUTPUT
done


@ -1,32 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
FIELDS=".id_str,\
.user.screen_name,\
.user.id,\
.favorite_count,\
.retweet_count,\
.quote_count,\
.reply_count,\
.created_at,\
.lang,\
.in_reply_to_user_id_str,\
.in_reply_to_status_id_str,\
.retweeted_status.id_str,\
.retweeted_status.user.id,\
.retweeted_status.favorite_count,\
.retweeted_status.retweet_count,\
.retweeted_status.quote_count,\
.retweeted_status.reply_count,\
.retweeted_status.created_at\
"
for i in "$@"
do
OUTPUT=$i.tweets.csv
echo "$FIELDS" | sed -e 's/,[ \t\n]*\./,/g' | sed -e 's/^[#]\?\.//' > $OUTPUT
jq -r "[$FIELDS]|@csv" $i | pv -N "$i" -l >> $OUTPUT
done


@ -1,17 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
QUERY='.| select(.retweeted_status != null) | .retweeted_status | .id_str as $rt_id | .extended_tweet | select(. != null) | [$rt_id,.full_text]|@csv'
HEADER='rt_id,full_text'
for i in "$@"
do
OUTPUT=$i.full_text.csv
echo $HEADER > $OUTPUT
jq "$QUERY" $i | pv -N "$i" -l >> $OUTPUT
sort -u $OUTPUT -o $OUTPUT
sed -ri 's/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g' $OUTPUT
done


@ -1,16 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
QUERY='(.full_text // .retweeted_status.full_text) as $text | [ .id_str,$text ] | @csv'
HEADER='id,text'
for i in "$@"
do
OUTPUT=$i.text.csv
echo $HEADER > $OUTPUT
pv -l -N "$i" $i | jq -r "$QUERY" >> $OUTPUT
# sed -ri s/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g $OUTPUT
done


@ -1,10 +0,0 @@
if [ "$#" -lt 2 ]
then
echo "Find edge lines in a file that contain one of the users in a user list."
echo ""
echo "Usage: $0 <file with edges> <file with the list of users>"
exit 1
fi
pv -c -N 'read' "$1" | grep -F -w -f "$2" | pv -lc -N 'found'


@ -1,23 +0,0 @@
import pandas as pd

def read_rts(rtsfile, tweetsfile):
    tweets = pd.read_csv(tweetsfile, index_col=0)
    rts = pd.read_csv(rtsfile, index_col=1)
    merged = rts.groupby(by=['id', 'rt_user_id']).size().rename('count').reset_index(level=1).merge(tweets, left_index=True, right_index=True)
    return merged.sort_values(by='count', ascending=False)

def read_tweets(tweetsfile):
    '''When the dataset is small enough, we can load tweets as-is'''
    with open(tweetsfile) as f:
        header = f.readline().strip().split(',')
    dtypes = {}
    for key in header:
        if key.endswith('_str') or key.endswith('.id'):
            dtypes[key] = object
    tweets = pd.read_csv(tweetsfile, dtype=dtypes, index_col=0)
    return tweets

if __name__ == '__main__':
    import argparse
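The `argparse` entry point is cut off in this view. A hypothetical interactive use of the two helpers above, assuming CSV files named after the `*.tweets.csv` / `*.rts.csv` convention used by the export scripts in this directory:

```python
# Hypothetical usage of read_tweets()/read_rts(); the file names are examples.
tweets = read_tweets('mytweets.jsonlines.tweets.csv')
print(tweets.head())

# Most retweeted tweets, joined with their metadata:
top_rts = read_rts('mytweets.jsonlines.rts.csv', 'mytweets.jsonlines.tweets.csv')
print(top_rts.head(10))
```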


@ -1 +0,0 @@
cat "$@" | awk -F"," '{print tolower($3)}' | sort | uniq -c | sort -h


@ -1,14 +0,0 @@
MAX_TAGS=100
function get_text {
while read line
do
echo $line
rtid=$(echo $line | awk -F"," '{print $2}')
text=$(grep -m 1 $rtid *.text.csv)
echo "$line - $text"
done < "/dev/stdin"
}
cat "$@" | get_text


@ -1,15 +0,0 @@
MAX_TAGS=100
function get_text {
while read line
do
echo $line
rtid=$(echo $line | awk '{print $2}')
count=$(echo $line | awk '{print $1}')
text=$(grep -m 1 $rtid *.text.csv)
echo "$line - $text"
done < "/dev/stdin"
}
cat "$@" | awk -F"," '{print tolower($2)}' | sort | uniq -c | sort -h | tail -n $MAX_TAGS | get_text


@@ -1 +1 @@
-0.10.3
+0.7.1

bitter/__init__.py

@@ -6,12 +6,10 @@ http://github.com/balkian/bitter
 import os
 from .version import __version__
-from . import config as bconf

-def easy(conffile=bconf.CONFIG_FILE):
+def easy(*args, **kwargs):
     from .crawlers import TwitterQueue
-    return TwitterQueue.from_config(conffile=conffile)
+    return TwitterQueue.from_credentials(*args, **kwargs)

 __all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]
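In usage terms, the change to `easy()` looks roughly like this; only one of the two calls is valid for a given checkout, and the paths are the defaults mentioned in `bitter/config.py`:

```python
from bitter import easy

# Newer variant (the removed side of this compare): easy() reads a YAML
# config file.
wq = easy(conffile='~/.bitter.yaml')

# Older 0.7.x variant (the added side): arguments are forwarded to
# TwitterQueue.from_credentials, i.e. a JSON credentials file.
# wq = easy('~/.bitter-credentials.json')
```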

bitter/cli.py

@ -21,192 +21,60 @@ if sys.version_info <= (3, 0):
from contextlib2 import ExitStack from contextlib2 import ExitStack
else: else:
from contextlib import ExitStack from contextlib import ExitStack
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def serialize(function):
'''Common options to serialize output to CSV or other formats'''
@click.option('--fields', help='Provide a list of comma-separated fields to print.', default='', type=str)
@click.option('--ignore_missing', help='Do not show warnings for missing fields.', is_flag=True)
@click.option('--header', help='Header that will be printed at the beginning of the file', default=None)
@click.option('--csv', help='Print each object as a csv row.', is_flag=True)
@click.option('--jsonlines', '--json', help='Print each object as JSON in a new line.', is_flag=True)
@click.option('--indented', help='Print each object as an indented JSON object', is_flag=True)
@click.option('--outdelimiter', help='Delimiter for some output formats, such as CSV. It defaults to \t', default='\t')
@click.option('--outfile', help='Output file. It defaults to STDOUT', default=sys.stdout)
def decorated(fields, ignore_missing, header, csv, jsonlines, indented, outfile, outdelimiter, **kwargs):
it = function(**kwargs)
outformat = 'json'
if csv:
outformat = 'csv'
elif jsonlines:
outformat = 'jsonlines'
elif indented:
outformat = 'indented'
return utils.serialized(it, outfile, outformat=outformat, fields=fields.split(','), ignore_missing=ignore_missing, header=header, delimiter=outdelimiter)
return decorated
@click.group() @click.group()
@click.option("--verbose", is_flag=True) @click.option("--verbose", is_flag=True)
@click.option("--logging_level", required=False, default='WARN') @click.option("--logging_level", required=False, default='WARN')
@click.option('--config', show_default=True, default=bconf.CONFIG_FILE) @click.option("--config", required=False)
@click.option('--credentials', show_default=True, help="DEPRECATED: If specified, these credentials will be copied to the configuratation file.", default=bconf.CREDENTIALS) @click.option('-c', '--credentials', show_default=True, default='~/.bitter-credentials.json')
@click.pass_context @click.pass_context
def main(ctx, verbose, logging_level, config, credentials): def main(ctx, verbose, logging_level, config, credentials):
logging.basicConfig(level=getattr(logging, logging_level)) logging.basicConfig(level=getattr(logging, logging_level))
ctx.obj = {} ctx.obj = {}
ctx.obj['VERBOSE'] = verbose ctx.obj['VERBOSE'] = verbose
bconf.CONFIG_FILE = config ctx.obj['CONFIG'] = config
bconf.CREDENTIALS = credentials bconf.CREDENTIALS = credentials
if os.path.exists(utils.get_config_path(credentials)): utils.create_credentials(credentials)
utils.copy_credentials_to_config(credentials, config)
@main.group(invoke_without_command=True)
@click.pass_context
def credentials(ctx):
if ctx.invoked_subcommand is not None:
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for worker in wq.queue:
print('#'*20)
try:
resp = worker.client.application.rate_limit_status()
print(worker.name)
except Exception as ex:
print('{}: AUTHENTICATION ERROR: {}'.format(worker.name, ex) )
@credentials.command('limits')
@click.option('--no_aggregate', is_flag=True, default=False,
help=('Print limits from all workers. By default, limits are '
'aggregated (summed).'))
@click.option('--no_diff', is_flag=True, default=False,
help=('Print all limits. By default, only limits that '
'have been consumed will be shown.'))
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, no_aggregate, no_diff, url):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
limits = {}
if url:
print('URL is: {}'.format(url))
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
for urlimits in resp['resources'].values():
for url, value in urlimits.items():
if url not in limits:
limits[url] = {}
glob = limits[url].get('global', {})
limits[url][worker.name] = value
for k in ['limit', 'remaining']:
if k not in glob:
glob[k] = 0
glob[k] += value[k]
limits[url]['global'] = glob
for url, lims in limits.items():
worker_list = lims.keys() if no_aggregate else ['global', ]
url_printed = False
for worker in worker_list:
vals = lims[worker]
consumed = vals['limit'] - vals['remaining']
if no_diff or consumed:
if not url_printed:
print(url)
url_printed = True
print('\t', worker, ':')
print('\t\t', vals)
@credentials.command('add')
@click.option('--consumer_key', default=None)
@click.option('--consumer_secret', default=None)
@click.option('--token_key', default=None)
@click.option('--token_secret', default=None)
@click.argument('user_name')
def add(user_name, consumer_key, consumer_secret, token_key, token_secret):
if not consumer_key:
consumer_key = click.prompt('Please, enter your YOUR CONSUMER KEY')
if not consumer_secret:
consumer_secret = click.prompt('Please, enter your CONSUMER SECRET')
if not token_key:
token_key = click.prompt('Please, enter your ACCESS TOKEN')
if not token_secret:
token_secret = click.prompt('Please, enter your ACCESS TOKEN SECRET')
utils.add_credentials(conffile=bconf.CONFIG_FILE, user=user_name, consumer_key=consumer_key, consumer_secret=consumer_secret,
token_key=token_key, token_secret=token_secret)
click.echo('Credentials added for {}'.format(user_name))
@main.group() @main.group()
@click.pass_context @click.pass_context
def tweet(ctx): def tweet(ctx):
pass pass
@tweet.command('get') @tweet.command('get')
@click.option('-d', '--dry_run', is_flag=True, default=False) @click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="tweets") @click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False) @click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid') @click.argument('tweetid')
@serialize def get_tweet(tweetid, write, folder, update):
def get_tweet(tweetid, dry_run, folder, update): wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) utils.download_tweet(wq, tweetid, write, folder, update)
yield from utils.download_tweet(wq, tweetid, not dry_run, folder, update)
@tweet.command('get_all')
@tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file. @click.argument('tweetsfile', 'File with a list of tweets to look up')
The result is stored as individual json files in your folder of choice.''')
@click.argument('tweetsfile')
@click.option('-f', '--folder', default="tweets") @click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",")
@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
@click.option('--skip', help='Discard the first DISCARD lines (use them as a header)', default=0)
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-q', '--quotechar', default='"')
@click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context @click.pass_context
def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, nocache, skip, quotechar, commentchar, column): def get_tweets(ctx, tweetsfile, folder):
if update and not click.confirm('This may overwrite existing tweets. Continue?'): wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
click.echo('Cancelling') utils.download_tweets(wq, tweetsfile, folder)
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
status = tqdm('Queried')
failed = 0
for tid, obj in utils.download_tweets_file(wq, tweetsfile, folder, delimiter=delimiter, cache=not nocache,
skip=skip, quotechar=quotechar, commentchar=commentchar,
column=column, update=update, retry_failed=retry):
status.update(1)
if not obj:
failed += 1
status.set_description('Failed: %s. Queried' % failed, refresh=True)
continue
yield obj
@tweet.command('search') @tweet.command('search')
@click.argument('query') @click.argument('query')
@serialize @click.pass_context
@click.pass_context
def search(ctx, query): def search(ctx, query):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
yield from utils.search_tweet(wq, query) t = utils.search_tweet(wq, query)
print(json.dumps(t, indent=2))
@tweet.command('timeline') @tweet.command('timeline')
@click.argument('user') @click.argument('user')
@click.pass_context @click.pass_context
def timeline(ctx, user): def timeline(ctx, user):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
t = utils.user_timeline(wq, user) t = utils.user_timeline(wq, user)
print(json.dumps(t, indent=2)) print(json.dumps(t, indent=2))
@ -228,48 +96,43 @@ def list_users(ctx, db):
@users.command('get') @users.command('get')
@click.argument('user') @click.argument('user')
@click.option('-d', '--dry_run', is_flag=True, default=False) @click.option('-w', '--write', is_flag=True, default=False)
@click.option('-f', '--folder', default="users") @click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False) @click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
@serialize def get_user(user, write, folder, update):
def get_user(user, dry_run, folder, update): wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) if not write:
yield from utils.download_user(wq, user, not dry_run, folder, update) u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
@users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file. print(js)
The result is stored as individual json files in your folder of choice.''')
@click.argument('usersfile')
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-n', '--nocache', is_flag=True, default=False, help='Do not cache results')
@click.option('-d', '--delimiter', default=",")
@click.option('--skip', help='Discard the first SKIP lines (e.g., use them as a header)',
is_flag=True, default=False)
@click.option('-q', '--quotechar', default='"')
@click.option('--commentchar', help='Lines starting with this character will be ignored', default=None)
@click.option('-c', '--column', type=int, default=0)
@serialize
@click.pass_context
def get_users(ctx, usersfile, folder, update, retry, nocache, delimiter, skip, quotechar, commentchar, column):
if update and not click.confirm('This may overwrite existing users. Continue?'):
click.echo('Cancelling')
return return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) if not os.path.exists(folder):
for i in utils.download_users_file(wq, usersfile, folder, delimiter=delimiter, os.makedirs(folder)
update=update, retry_failed=retry, file = os.path.join(folder, '%s.json' % user)
skip=skip, quotechar=quotechar, if not update and os.path.exists(file) and os.path.isfile(file):
cache=not nocache, print('User exists: %s' % user)
commentchar=commentchar, return
column=column): with open(file, 'w') as f:
yield i u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
print(js, file=f)
@users.command('get_all')
@click.argument('usersfile', 'File with a list of users to look up')
@click.option('-f', '--folder', default="users")
@click.pass_context
def get_users(ctx, usersfile, folder):
with open(usersfile) as f:
for line in f:
uid = line.strip()
ctx.invoke(get_user, folder=folder, user=uid, write=True)
@users.command('crawl') @users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.') @click.option('--db', required=True, help='Database to save all users.')
@click.option('--skip', required=False, default=0, help='Skip N lines from the file.') @click.option('--skip', required=False, default=0, help='Skip N lines from the file.')
@click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.') @click.option('--until', required=False, type=str, default=0, help='Skip all lines until ID.')
@click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.') @click.option('--threads', required=False, type=str, default=20, help='Number of crawling threads.')
@click.argument('usersfile') @click.argument('usersfile', 'File with a list of users to look up')
@click.pass_context @click.pass_context
def crawl_users(ctx, usersfile, skip, until, threads, db): def crawl_users(ctx, usersfile, skip, until, threads, db):
global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock global dburl, ids_queue, skipped, enqueued, collected, lastid, db_lock
@ -283,7 +146,7 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
return ExitStack() return ExitStack()
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads, logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
len(wq.queue))) len(wq.queue)))
@ -374,9 +237,14 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
speed = (collected-lastcollected)/10 speed = (collected-lastcollected)/10
with statslock: with statslock:
lastcollected = collected lastcollected = collected
logger.info('Done!') logger.info('Done!')
@main.group('api')
def api():
pass
@main.group('extractor') @main.group('extractor')
@click.pass_context @click.pass_context
@click.option('--db', required=True, help='Database of users.') @click.option('--db', required=True, help='Database of users.')
@ -425,7 +293,7 @@ def network_extractor(ctx, as_json):
if as_json: if as_json:
import json import json
print(json.dumps(follower_map, indent=4)) print(json.dumps(follower_map, indent=4))
@extractor.command('users') @extractor.command('users')
@click.pass_context @click.pass_context
@ -447,7 +315,7 @@ def users_extractor(ctx):
@click.pass_context @click.pass_context
def extract(ctx, recursive, user, name, initfile): def extract(ctx, recursive, user, name, initfile):
print(locals()) print(locals())
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
dburi = ctx.obj['DBURI'] dburi = ctx.obj['DBURI']
utils.extract(wq, utils.extract(wq,
recursive=recursive, recursive=recursive,
@ -459,41 +327,31 @@ def extract(ctx, recursive, user, name, initfile):
@extractor.command('reset') @extractor.command('reset')
@click.pass_context @click.pass_context
def reset_extractor(ctx): def reset_extractor(ctx):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
db = ctx.obj['DBURI'] db = ctx.obj['DBURI']
session = make_session(db) session = make_session(db)
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False}) session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@api.command('limits')
@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False), @click.argument('url', required=False)
help='''Issue a call to an endpoint of the Twitter API.''')
@click.argument('cmd', nargs=1)
@click.option('--tweets', is_flag=True, help='Fetch more tweets using smart pagination. Use --count to control how many tweets to fetch per call, and --max_count to set the number of desired tweets (or -1 to get as many as possible).', type=bool, default=False)
@click.option('--users', is_flag=True, help='Fetch more users using smart pagination. Use --count to control how many users to fetch per call, and --max_count to set the number of desired users (or -1 to get as many as possible).', type=bool, default=False)
@click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
@click.pass_context @click.pass_context
def api(ctx, cmd, tweets, users, api_args): def get_limits(ctx, url):
opts = {} wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
mappings = { for worker in wq.queue:
'id': '_id' resp = worker.client.application.rate_limit_status()
} print('#'*20)
i = iter(api_args) print(worker.name)
for k, v in zip(i, i): if url:
k = k.replace('--', '') limit = 'NOT FOUND'
if k in mappings: print('URL is: {}'.format(url))
k = mappings[k] cat = url.split('/')[1]
opts[k] = v if cat in resp['resources']:
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE) limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
if tweets: else:
resp = utils.consume_tweets(wq[cmd], **opts) print('Cat {} not found'.format(cat))
elif users: print('{}: {}'.format(url, limit))
resp = utils.consume_users(wq[cmd], **opts) else:
else: print(json.dumps(resp, indent=2))
resp = wq[cmd](**opts)
print(json.dumps(resp))
return
for i in resp:
print(json.dumps(i))
@main.command('server') @main.command('server')
@click.argument('CONSUMER_KEY', required=True) @click.argument('CONSUMER_KEY', required=True)
@ -504,20 +362,20 @@ def run_server(ctx, consumer_key, consumer_secret):
bconf.CONSUMER_SECRET = consumer_secret bconf.CONSUMER_SECRET = consumer_secret
from .webserver import app from .webserver import app
app.run(host='0.0.0.0') app.run(host='0.0.0.0')
@main.group() @main.group()
@click.pass_context @click.pass_context
def stream(ctx): def stream(ctx):
pass pass
@stream.command('get') @stream.command('get')
@click.option('-l', '--locations', default=None) @click.option('-l', '--locations', default=None)
@click.option('-t', '--track', default=None) @click.option('-t', '--track', default=None)
@click.option('-f', '--file', default=None, help='File to store the stream of tweets') @click.option('-f', '--file', help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True) @click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
@click.pass_context @click.pass_context
def get_stream(ctx, locations, track, file, politelyretry): def get_stream(ctx, locations, track, file, politelyretry):
wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1) wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
query_args = {} query_args = {}
if locations: if locations:
@ -536,14 +394,10 @@ def get_stream(ctx, locations, track, file, politelyretry):
iterator = wq.statuses.sample() iterator = wq.statuses.sample()
else: else:
iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75") iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
try: for i in iterator:
for i in iterator: yield i
yield i if not politelyretry:
if not politelyretry: return
return
except Exception:
if not politelyretry:
raise ex
thishangup = time.time() thishangup = time.time()
if thishangup - lasthangup < 60: if thishangup - lasthangup < 60:
raise Exception('Too many hangups in a row.') raise Exception('Too many hangups in a row.')
@ -557,23 +411,23 @@ def get_stream(ctx, locations, track, file, politelyretry):
@stream.command('read') @stream.command('read')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True) @click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False) @click.option('-t', '--tail', is_flag=True, help='Keep reading from the file, like tail', type=bool, default=False)
@click.pass_context @click.pass_context
def read_stream(ctx, file, tail): def read_stream(ctx, file, tail):
for tweet in utils.read_file(file, tail=tail): for tweet in utils.read_file(file, tail=tail):
try: try:
print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['created_at'], screen_name=tweet['user']['screen_name'], text=tweet['text'])) print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
except (KeyError, TypeError): except (KeyError, TypeError):
print('Raw tweet: {}'.format(tweet)) print('Raw tweet: {}'.format(tweet))
@stream.command('tags') @stream.command('tags')
@click.option('-f', '--file', help='File to read the stream of tweets from', required=True) @click.option('-f', '--file', help='File to read the stream of tweets from', required=True)
@click.argument('limit', required=False, default=None, type=int) @click.argument('limit', required=False, default=None, type=int)
@click.pass_context @click.pass_context
def tags_stream(ctx, file, limit): def tags_stream(ctx, file, limit):
c = utils.get_hashtags(utils.read_file(file)) c = utils.get_hashtags(utils.read_file(file))
for count, tag in c.most_common(limit): for count, tag in c.most_common(limit):
print(u'{} - {}'.format(count, tag)) print(u'{} - {}'.format(count, tag))
if __name__ == '__main__': if __name__ == '__main__':
main() main()
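One detail of the generic `api` command in the diff above that is easy to miss is how it turns raw CLI flags into keyword arguments, including the `id` to `_id` mapping mentioned in the README. A standalone sketch of that loop, simplified from the code above (values stay strings, exactly as click passes them through):

```python
# Pair up flags and values, strip the leading dashes, and rename reserved
# words such as `id` before calling the Twitter API wrapper.
api_args = ['--since_id', '942689870501302300', '--count', '100', '--q', 'Obama']
mappings = {'id': '_id'}
opts = {}
i = iter(api_args)
for k, v in zip(i, i):
    k = k.replace('--', '')
    opts[mappings.get(k, k)] = v
print(opts)  # {'since_id': '942689870501302300', 'count': '100', 'q': 'Obama'}
```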

bitter/config.py

@@ -3,7 +3,7 @@ Common configuration for other modules.
 It is not elegant, but it works with flask and the oauth decorators.
 Using this module allows you to change the config before loading any other module.
 E.g.:

     import bitter.config as c
     c.CREDENTIALS="/tmp/credentials"
@@ -11,4 +11,3 @@ E.g.:
     app.run()
 '''
 CREDENTIALS = '~/.bitter-credentials.json'
-CONFIG_FILE = '~/.bitter.yaml'

bitter/crawlers.py

@@ -10,7 +10,6 @@ from twitter import *
 from collections import OrderedDict
 from threading import Lock
 from itertools import islice
-from functools import partial
 try:
     import itertools.ifilter as filter
 except ImportError:
@@ -39,9 +38,6 @@ class AttrToFunc(object):
         else:
             return extend_call(k)

-    def __getitem__(self, k):
-        return partial(self.handler, self.__uriparts+k.split('/'))
-
     def __call__(self, *args, **kwargs):
         # for i, a in enumerate(args)e
         #     kwargs[i] = a
@@ -58,18 +54,6 @@ class FromCredentialsMixin(object):
             wq.ready(cls.worker_class(cred["user"], cred))
         return wq

-class FromConfigMixin(object):
-    @classmethod
-    def from_config(cls, config=None, conffile=None, max_workers=None):
-        wq = cls()
-
-        if not config:
-            with utils.config(conffile) as c:
-                config = c
-
-        for cred in islice(config['credentials'], max_workers):
-            wq.ready(cls.worker_class(cred["user"], cred))
-        return wq
-
 class TwitterWorker(object):
     api_class = None
@@ -91,12 +75,6 @@ class TwitterWorker(object):
             self._client = self.api_class(auth=auth)
         return self._client

-    def __repr__(self):
-        msg = '<{} for {}>'.format(self.__class__.__name__, self.name)
-        if self.busy:
-            msg += ' [busy]'
-        return msg
-
 class RestWorker(TwitterWorker):
     api_class = Twitter
@@ -115,14 +93,13 @@ class RestWorker(TwitterWorker):
     def get_wait(self, uriparts):
         limits = self.get_limit(uriparts)
-        if limits.get('remaining', 1) > 0:
+        if limits['remaining'] > 0:
             return 0
         reset = limits.get('reset', 0)
         now = time.time()
         return max(0, (reset-now))

     def get_limit(self, uriparts):
-        uriparts = list(u for u in uriparts if u)
         uri = '/'+'/'.join(uriparts)
         for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
             if ix.startswith(uri):
@@ -155,7 +132,7 @@ class QueueException(BaseException):
     pass

-class QueueMixin(AttrToFunc, FromCredentialsMixin, FromConfigMixin):
+class QueueMixin(AttrToFunc, FromCredentialsMixin):
     def __init__(self, wait=True):
         logger.debug('Creating worker queue')
         self.queue = set()
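The only behavioural change in `RestWorker` above is on the newer side: the `limits.get('remaining', 1)` guard tolerates rate-limit entries that lack a `remaining` field. A minimal sketch of that wait logic, with a made-up limits dict for illustration:

```python
import time

def seconds_to_wait(limits):
    # Newer behaviour: treat a missing 'remaining' field as "calls available".
    if limits.get('remaining', 1) > 0:
        return 0
    # Otherwise wait until the reset timestamp reported by the API.
    return max(0, limits.get('reset', 0) - time.time())

print(seconds_to_wait({}))                                            # 0
print(round(seconds_to_wait({'remaining': 0,
                             'reset': time.time() + 30})))            # ~30
```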

bitter/models.py

@@ -3,13 +3,10 @@ import json
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.types import BigInteger, Integer, Text, Boolean
-from sqlalchemy.schema import ForeignKey
-from sqlalchemy.pool import SingletonThreadPool
 from sqlalchemy import Column, Index
 from sqlalchemy import create_engine
 from sqlalchemy.orm import sessionmaker
-from functools import wraps

 Base = declarative_base()
@@ -88,48 +85,34 @@ class ExtractorEntry(Base):
     user = Column(BigInteger, index=True)
     cursor = Column(BigInteger, default=-1)
     pending = Column(Boolean, default=False)
-    errors = Column(Text, default="")
-    busy = Column(Boolean, default=False)
-
-class Search(Base):
-    __tablename__ = 'search_queries'
-    id = Column(Integer, primary_key=True, index=True, unique=True)
-    endpoint = Column(Text, comment="Endpoint URL")
-    attrs = Column(Text, comment="Text version of the dictionary of parameters")
-    count = Column(Integer)
-    current_count = Column(Integer)
-    current_id = Column(BigInteger, comment='Oldest ID retrieved (should match max_id when done)')
-    since_id = Column(BigInteger)
-
-class SearchResults(Base):
-    __tablename__ = 'search_results'
-    id = Column(Integer, primary_key=True, index=True, unique=True)
-    search_id = Column(ForeignKey('search_queries.id'))
-    resource_id = Column(Text)
-
-def memoize(f):
-    memo = {}
-
-    @wraps(f)
-    def helper(self, **kwargs):
-        st = dict_to_str(kwargs)
-        key = (self.__uriparts, st)
-        if key not in memo:
-            memo[key] = f(self, **kwargs)
-        return memo[key]
-    return helper

 def make_session(url):
-    if not isinstance(url, str):
-        print(url)
-        raise Exception("FUCK")
-    engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True)
+    engine = create_engine(url)#, echo=True)
     Base.metadata.create_all(engine)
     Session = sessionmaker(bind=engine)
     session = Session()
     return session

-def dict_to_str(args):
-    return json.dumps(args, sort_keys=True)
+def test(db='sqlite:///users.db'):
+    from sqlalchemy import exists
+    session = make_session(db)
+    our_user = session.query(User).first()
+    print(our_user.name)
+    print(session.query(User).count())
+    fake_user = User(name="Fake user")
+    session.add(fake_user)
+    session.commit()
+    print(session.query(User).count())
+    print(session.query(exists().where(User.name == "Fake user")).scalar())
+    fake_committed = session.query(User).filter_by(name="Fake user").first()
+    print(fake_committed.id)
+    print(fake_committed.name)
+    session.delete(fake_committed)
+    session.commit()
+    print(session.query(User).count())
+    print(list(session.execute('SELECT 1 from users where id=\'%s\'' % 1548)))
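Both sides build sessions the same way; the newer side only adds the `SingletonThreadPool` and a type check, while the older side keeps a small `test()` helper. A hypothetical quick check against a local SQLite database, using the models defined in this file:

```python
# Hypothetical usage of make_session(); the sqlite path is just an example.
session = make_session('sqlite:///users.db')
print(session.query(User).count())        # number of stored users
print(session.query(Following).count())   # number of follower edges
session.close()
```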

bitter/utils.py

@ -3,9 +3,6 @@ from __future__ import print_function
import logging import logging
import time import time
import json import json
import yaml
import csv
import io
import signal import signal
import sys import sys
@ -13,23 +10,18 @@ import sqlalchemy
import os import os
import multiprocessing import multiprocessing
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from multiprocessing import Queue
import queue
import threading
from select import select
import operator
from functools import partial, reduce
from tqdm import tqdm from tqdm import tqdm
from itertools import islice, chain from itertools import islice, chain
from contextlib import contextmanager from contextlib import contextmanager
try:
from itertools import izip_longest
except ImportError:
from itertools import zip_longest
from collections import Counter from collections import Counter
from random import choice
from builtins import map, filter from builtins import map, filter
@ -39,12 +31,6 @@ from bitter.models import Following, User, ExtractorEntry, make_session
from bitter import config from bitter import config
# Fix Python 2.x.
try:
UNICODE_EXISTS = bool(type(unicode))
except NameError:
unicode = lambda s: str(s)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -52,100 +38,38 @@ def signal_handler(signal, frame):
logger.info('You pressed Ctrl+C!') logger.info('You pressed Ctrl+C!')
sys.exit(0) sys.exit(0)
def chunk(iterable, n): def chunk(iterable, n):
it = iter(iterable) it = iter(iterable)
return iter(lambda: tuple(islice(it, n)), ()) return iter(lambda: tuple(islice(it, n)), ())
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()): def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
source = chunk(source, chunksize) source = chunk(source, chunksize)
p = ThreadPool(numcpus*2) p = ThreadPool(numcpus*2)
for i in chain.from_iterable(p.imap_unordered(func, source, int(1000/numcpus))):
def wrapped_func(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as ex:
print('Exception on parallel thread: {}'.format(ex), file=sys.stderr)
results = p.imap_unordered(wrapped_func, source)
for i in chain.from_iterable(results):
yield i yield i
def get_credentials_path(credfile=None):
def get_config_path(conf=None): if not credfile:
if not conf: if config.CREDENTIALS:
if config.CONFIG_FILE: credfile = config.CREDENTIALS
conf = config.CONFIG_FILE
else: else:
raise Exception('No valid config file') raise Exception('No valid credentials file')
return os.path.expanduser(conf) return os.path.expanduser(credfile)
def copy_credentials_to_config(credfile, conffile=None):
p = get_config_path(credfile)
with open(p) as old:
for line in old:
cred = json.loads(line.strip())
add_credentials(conffile, **cred)
def save_config(conf, conffile=None):
with config(conffile) as c:
c.clear()
c.update(conf)
@contextmanager @contextmanager
def config(conffile=None): def credentials_file(credfile, *args, **kwargs):
d = read_config(conffile) p = get_credentials_path(credfile)
try: with open(p, *args, **kwargs) as f:
yield d yield f
finally:
write_config(d, conffile)
def iter_credentials(credfile=None):
with credentials_file(credfile) as f:
for l in f:
yield json.loads(l.strip())
def read_config(conffile): def get_credentials(credfile=None, inverse=False, **kwargs):
p = conffile and get_config_path(conffile)
if p:
if not os.path.exists(p):
raise IOError('{} file does not exist.'.format(p))
f = open(p, 'r')
elif 'BITTER_CONFIG' not in os.environ:
raise Exception('No config file or BITTER_CONFIG env variable.')
else:
f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n'))
return yaml.load(f, Loader=yaml.SafeLoader) or {'credentials': []}
def write_config(conf, conffile=None):
if not conf:
conf = {'credentials': []}
if conffile:
p = get_config_path(conffile)
with open(p, 'w') as f:
yaml.dump(conf, f)
else:
os.environ['BITTER_CONFIG'] = yaml.dump(conf)
def iter_credentials(conffile=None):
with config(conffile) as c:
for i in c['credentials']:
yield i
def create_config_file(conffile=None):
if not conffile:
return
conffile = get_config_path(conffile)
with open(conffile, 'a'):
pass
write_config(None, conffile)
def get_credentials(conffile=None, inverse=False, **kwargs):
creds = [] creds = []
for i in iter_credentials(conffile): for i in iter_credentials(credfile):
matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items())) matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items()))
if matches and not inverse: if matches and not inverse:
creds.append(i) creds.append(i)
@ -153,23 +77,24 @@ def get_credentials(conffile=None, inverse=False, **kwargs):
creds.append(i) creds.append(i)
return creds return creds
def create_credentials(credfile=None):
credfile = get_credentials_path(credfile)
with credentials_file(credfile, 'a'):
pass
def delete_credentials(conffile=None, **creds): def delete_credentials(credfile=None, **creds):
tokeep = get_credentials(conffile, inverse=True, **creds) tokeep = get_credentials(credfile, inverse=True, **creds)
with config(conffile) as c: with credentials_file(credfile, 'w') as f:
c['credentials'] = list(tokeep) for i in tokeep:
f.write(json.dumps(i))
f.write('\n')
def add_credentials(credfile=None, **creds):
def add_credentials(conffile=None, **creds): exist = get_credentials(credfile, **creds)
try: if not exist:
exist = get_credentials(conffile, **creds) with credentials_file(credfile, 'a') as f:
except IOError: f.write(json.dumps(creds))
exist = False f.write('\n')
create_config_file(conffile)
if exist:
return
with config(conffile) as c:
c['credentials'].append(creds)
def get_hashtags(iter_tweets, best=None): def get_hashtags(iter_tweets, best=None):
@ -178,27 +103,19 @@ def get_hashtags(iter_tweets, best=None):
c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {})) c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
return c return c
def read_file(filename, tail=False): def read_file(filename, tail=False):
if filename == '-': with open(filename) as f:
f = sys.stdin while True:
else: line = f.readline()
f = open(filename) if line not in (None, '', '\n'):
try: tweet = json.loads(line.strip())
while True: yield tweet
line = f.readline() else:
if line not in (None, '', '\n'): if tail:
tweet = json.loads(line.strip()) time.sleep(1)
yield tweet else:
else: return
if tail:
time.sleep(1)
else:
return
finally:
if f != sys.stdin:
close(f)
def get_users(wq, ulist, by_name=False, queue=None, max_users=100): def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
t = 'name' if by_name else 'uid' t = 'name' if by_name else 'uid'
@ -227,7 +144,6 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
else: else:
yield user yield user
def trim_user(user): def trim_user(user):
if 'status' in user: if 'status' in user:
del user['status'] del user['status']
@ -241,22 +157,14 @@ def trim_user(user):
return user return user
def add_user(user, dburi=None, session=None, update=False): def add_user(session, user, enqueue=False):
if not session:
session = make_session(dburi)
user = trim_user(user) user = trim_user(user)
olduser = session.query(User).filter(User.id == user['id']) olduser = session.query(User).filter(User.id==user['id'])
if olduser: if olduser:
if not update:
return
olduser.delete() olduser.delete()
nuser = User() user = User(**user)
for key, value in user.items(): session.add(user)
setattr(nuser, key, value) if extract:
user = nuser
if update:
session.add(user)
logger.debug('Adding entry') logger.debug('Adding entry')
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
if not entry: if not entry:
@ -266,201 +174,131 @@ def add_user(user, dburi=None, session=None, update=False):
entry.pending = True entry.pending = True
entry.cursor = -1 entry.cursor = -1
session.commit() session.commit()
session.close()
def download_entry(wq, entry_id, dburi=None, recursive=False):
session = make_session(dburi)
if not session:
raise Exception("Provide dburi or session")
logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id)))
entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first()
user = session.query(User).filter(User.id == entry.user).first()
download_user(wq, session, user, entry, recursive)
session.close()
def crawl_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
total_followers = user.followers_count
if total_followers > max_followers:
entry.pending = False
logger.info("Too many followers for user: %s" % user.screen_name)
session.add(entry)
session.commit()
return
if not entry:
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id)
session.add(entry)
session.commit()
pending = True
cursor = entry.cursor
uid = user.id
name = user.name
logger.info("#"*20)
logger.info("Getting %s - %s" % (uid, name))
logger.info("Cursor %s" % cursor)
logger.info("Using account: %s" % wq.name)
_fetched_followers = 0
def fetched_followers():
return session.query(Following).filter(Following.isfollowed==uid).count()
attempts = 0
while cursor > 0 or fetched_followers() < total_followers:
try:
resp = wq.followers.ids(user_id=uid, cursor=cursor)
except TwitterHTTPError as ex:
attempts += 1
if ex.e.code in (401, ) or attempts > 3:
logger.info('Not authorized for user: {}'.format(uid))
entry.errors = ex.message
break
if 'ids' not in resp:
logger.info("Error with id %s %s" % (uid, resp))
entry.pending = False
entry.errors = "No ids in response: %s" % resp
break
logger.info("New followers: %s" % len(resp['ids']))
if recursive:
newusers = get_users(wq, resp)
for newuser in newusers:
add_user(session=session, user=newuser)
if 'ids' not in resp or not resp['ids']:
logger.info('NO IDS in response')
break
for i in resp['ids']:
existing_user = session.query(Following).\
filter(Following.isfollowed == uid).\
filter(Following.follower == i).first()
now = int(time.time())
if existing_user:
existing_user.created_at_stamp = now
else:
f = Following(isfollowed=uid,
follower=i,
created_at_stamp=now)
session.add(f)
logger.info("Fetched: %s/%s followers" % (fetched_followers(),
total_followers))
entry.cursor = resp["next_cursor"]
session.add(entry)
session.commit()
logger.info("Done getting followers for %s" % uid)
entry.pending = False
entry.busy = False
session.add(entry)
session.commit()
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
sys.stdout.flush()
def classify_user(id_or_name, screen_names, user_ids):
try:
int(id_or_name)
user_ids.append(id_or_name)
logger.debug("Added user id")
except ValueError:
logger.debug("Added screen_name")
screen_names.append(id_or_name.split('@')[-1])
# TODO: adapt to the crawler
def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None): def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
w = wq.next()
if not dburi: if not dburi:
dburi = 'sqlite:///%s.db' % extractor_name dburi = 'sqlite:///%s.db' % extractor_name
session = make_session(dburi) session = make_session(dburi)
session.query(ExtractorEntry).update({ExtractorEntry.busy: False})
session.commit()
screen_names = []
user_ids = []
if not (user or initfile): def classify_user(id_or_name):
logger.info('Using pending users from last session') try:
int(user)
user_ids.append(user)
logger.info("Added user id")
except ValueError:
logger.info("Added screen_name")
screen_names.append(user.split('@')[-1])
if user:
classify_user(user)
elif initfile:
logger.info("No user. I will open %s" % initfile)
with open(initfile, 'r') as f:
for line in f:
user = line.strip().split(',')[0]
classify_user(user)
else: else:
screen_names = [] logger.info('Using pending users from last session')
user_ids = []
if user:
classify_user(user, screen_names, user_ids)
elif initfile:
logger.info("No user. I will open %s" % initfile)
with open(initfile, 'r') as f:
for line in f:
user = line.strip().split(',')[0]
classify_user(user, screen_names, user_ids)
def missing_user(ix, column=User.screen_name):
res = session.query(User).filter(column == ix).count() == 0
if res:
logger.info("Missing user %s. Count: %s" % (ix, res))
return res
screen_names = list(filter(missing_user, screen_names)) nusers = list(get_users(wq, screen_names, by_name=True))
user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids)) if user_ids:
nusers = [] nusers += list(get_users(wq, user_ids, by_name=False))
logger.info("Missing user ids: %s" % user_ids)
logger.info("Missing screen names: %s" % screen_names)
if screen_names:
nusers = list(get_users(wq, screen_names, by_name=True))
if user_ids:
nusers += list(get_users(wq, user_ids, by_name=False))
for i in nusers: for i in nusers:
add_user(dburi=dburi, user=i) add_user(session, i, enqueue=True)
total_users = session.query(sqlalchemy.func.count(User.id)).scalar() total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
logger.info('Total users: {}'.format(total_users)) logger.info('Total users: {}'.format(total_users))
def pending_entries():
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
logger.info('Pending: {}'.format(pending))
return pending
de = partial(download_entry, wq, dburi=dburi) while pending_entries() > 0:
pending = pending_entries(dburi) logger.info("Using account: %s" % w.name)
session.close()
with tqdm(parallel(de, pending), desc='Downloading users', total=total_users) as tq:
for i in tq:
tq.write('Got {}'.format(i))
logger.info("Got %s" % i)
def pending_entries(dburi):
session = make_session(dburi)
while True:
candidate, entry = session.query(User, ExtractorEntry).\
filter(ExtractorEntry.user == User.id).\
filter(ExtractorEntry.pending == True).\
filter(ExtractorEntry.busy == False).\ order_by(User.followers_count).first()
order_by(User.followers_count).first() if not candidate:
if candidate: break
entry.busy = True pending = True
session.add(entry) cursor = entry.cursor
session.commit() uid = candidate.id
yield int(entry.id) uobject = session.query(User).filter(User.id==uid).first()
continue name = uobject.screen_name if uobject else None
if session.query(ExtractorEntry).\
filter(ExtractorEntry.busy == True).count() > 0: logger.info("#"*20)
time.sleep(1) logger.info("Getting %s - %s" % (uid, name))
continue logger.info("Cursor %s" % cursor)
logger.info("No more pending entries") logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users))
break try:
session.close() resp = wq.followers.ids(user_id=uid, cursor=cursor)
except TwitterHTTPError as ex:
if ex.e.code in (401, ):
logger.info('Not authorized for user: {}'.format(uid))
resp = {}
if 'ids' in resp:
logger.info("New followers: %s" % len(resp['ids']))
if recursive:
newusers = get_users(wq, resp['ids'])
for user in newusers:
add_user(session, user, enqueue=True)
for i in resp['ids']:
existing_user = session.query(Following).\
filter(Following.isfollowed==uid).\
filter(Following.follower==i).first()
now = int(time.time())
if existing_user:
existing_user.created_at_stamp = now
else:
f = Following(isfollowed=uid,
follower=i,
created_at_stamp=now)
session.add(f)
total_followers = candidate.followers_count
fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count()
logger.info("Fetched: %s/%s followers" % (fetched_followers,
total_followers))
cursor = resp["next_cursor"]
if cursor > 0:
pending = True
logger.info("Getting more followers for %s" % uid)
else:
logger.info("Done getting followers for %s" % uid)
cursor = -1
pending = False
else:
logger.info("Error with id %s %s" % (uid, resp))
pending = False
entry.pending = pending
entry.cursor = cursor
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
session.add(candidate)
session.commit()
sys.stdout.flush()
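# Illustrative driver (an assumption, not part of the diff): one way to invoke the
# extractor, where easy() builds the credential pool as in the test suite below and
# the screen name and database URI are placeholders.
wq = easy()
extract(wq,
        user='some_screen_name',
        dburi='sqlite:///followers.db',
        extractor_name='followers',
        recursive=False)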
def get_tweet(c, tid):
return c.statuses.show(id=tid)
def search_tweet(c, query):
yield from c.search.tweets(q=query)['statuses'] return c.search.tweets(q=query)
def user_timeline(c, query):
try:
@ -475,329 +313,84 @@ def get_user(c, user):
except ValueError:
return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, cache=True, folder="downloaded_tweets", update=False): def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
tweet = cached_id(tweetid, folder) cached = cached_tweet(tweetid, folder)
if update or not tweet:
tweet = get_tweet(wq, tweetid)
if cache and update:
if tweet:
write_json(tweet, folder)
yield tweet
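# Usage sketch (placeholder id): download_tweet is a generator, so consume it with
# next() or a for loop; it returns the cached copy when one exists.
tweet = next(download_tweet(wq, '1234567890'))
if tweet:
    print(tweet['text'])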
def download_user(wq, userid, cache=True, folder="downloaded_users", update=False):
user = cached_id(userid, folder)
if update or not user:
user = get_user(wq, userid)
if cache and update:
if user:
write_json(user, folder, aliases=[user['screen_name'], ])
yield user
def cached_id(oid, folder):
tweet = None
file = os.path.join(folder, '%s.json' % oid) if update or not cached:
tweet = get_tweet(wq, tweetid)
js = json.dumps(tweet, indent=2)
if write:
if tweet:
write_tweet_json(js, folder)
else:
print(js)
def cached_tweet(tweetid, folder):
tweet = None
file = os.path.join(folder, '%s.json' % tweetid)
if os.path.exists(file) and os.path.isfile(file):
try:
# print('%s: Object exists' % oid) # print('%s: Tweet exists' % tweetid)
with open(file) as f:
tweet = json.load(f)
except Exception as ex:
logger.error('Error getting cached version of {}: {}'.format(oid, ex)) logger.error('Error getting cached version of {}: {}'.format(tweetid, ex))
return tweet
def write_json(js, folder, oid=None, aliases=[]): def write_tweet_json(js, folder):
if not oid: tweetid = js['id']
oid = js['id'] file = tweet_file(tweetid, folder)
file = id_file(oid, folder)
if not os.path.exists(folder):
os.makedirs(folder)
with open(file, 'w') as f:
json.dump(js, f) json.dump(js, f, indent=2)
logger.info('Written {} to file {}'.format(oid, file)) logger.info('Written {} to file {}'.format(tweetid, file))
for alias in aliases:
os.symlink('%s.json' % oid, id_file(alias, folder))
def id_file(oid, folder): def tweet_file(tweetid, folder):
return os.path.join(folder, '%s.json' % oid) return os.path.join(folder, '%s.json' % tweetid)
def fail_file(oid, folder): def tweet_fail_file(tweetid, folder):
failsfolder = os.path.join(folder, 'failed')
if not os.path.exists(failsfolder):
os.makedirs(failsfolder)
return os.path.join(failsfolder, '%s.failed' % oid) return os.path.join(failsfolder, '%s.failed' % tweetid)
def id_failed(oid, folder): def tweet_failed(tweetid, folder):
return os.path.isfile(fail_file(oid, folder)) return os.path.isfile(tweet_fail_file(tweetid, folder))
def tweet_download_batch(wq, batch): def download_tweets(wq, tweetsfile, folder, update=False, retry_failed=False, ignore_fails=True):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id'] def filter_line(line):
for tid, tweet in tweets.items(): tweetid = int(line)
yield tid, tweet # print('Checking {}'.format(tweetid))
if (cached_tweet(tweetid, folder) and not update) or (tweet_failed(tweetid, folder) and not retry_failed):
def user_download_batch(wq, batch): yield None
screen_names = []
user_ids = []
for elem in batch:
try:
int(elem)
user_ids.append(str(elem))
except ValueError:
screen_names.append(elem.lower())
args = {}
if user_ids:
args['user_id'] = ','.join(user_ids)
if screen_names:
args['screen_name'] = ','.join(screen_names)
try:
users = wq.users.lookup(**args)
except TwitterHTTPError as ex:
if ex.e.code in (404,):
users = []
else:
raise yield line
found_ids = []
found_names = []
for user in users:
uid = user['id_str']
if uid in user_ids:
found_ids.append(uid)
yield (uid, user)
uname = user['screen_name'].lower()
if uname in screen_names:
found_names.append(uname)
yield (uname, user)
for uid in set(user_ids) - set(found_ids):
yield (uid, None)
for name in set(screen_names) - set(found_names):
yield (name, None)
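# Both batch helpers lean on the bulk lookup endpoints (statuses/lookup and
# users/lookup), which accept up to 100 comma-separated identifiers per request;
# that is why the pipeline below feeds them batches of 100. Sketch with made-up ids:
for key, user in user_download_batch(wq, ['19', 'balkian', '783214']):
    if user is None:
        print('not found:', key)
    else:
        print(key, '->', user['id_str'])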
def print_result(res):
def dump_result(oid, obj, folder, ignore_fails=True): tid, tweet = res
if obj: if tweet:
try:
write_json(obj, folder=folder, oid=oid) write_tweet_json(tweet, folder=folder)
failed = fail_file(oid, folder) yield 1
if os.path.exists(failed): except Exception as ex:
os.remove(failed) logger.error('%s: %s' % (tid, ex))
except Exception as ex: if not ignore_fails:
logger.error('%s: %s' % (oid, ex)) raise
if not ignore_fails:
raise
else:
logger.info('Object not recovered: {}'.format(oid))
with open(fail_file(oid, folder), 'w') as f:
print('Object not found', file=f)
def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False, cache=True,
batch_method=tweet_download_batch):
done = Queue()
down = Queue()
def filter_list(lst, done, down):
print('filtering')
for oid in lst:
cached = cached_id(oid, folder)
if (cached and not update):
done.put((oid, cached))
elif (id_failed(oid, folder) and not retry_failed):
done.put((oid, None))
else:
down.put(oid)
down.put(None)
def download_results(batch_method, down, done):
def gen():
while True:
r = down.get()
if r is None:
down.close()
down.join_thread()
return
yield r
for t in parallel(batch_method, gen(), 100):
done.put(t)
def batch(*args, **kwargs):
return batch_method(wq, *args, **kwargs)
tc = threading.Thread(target=filter_list, args=(lst, done, down), daemon=True)
tc.start()
td = threading.Thread(target=download_results, args=(batch, down, done), daemon=True)
td.start()
def check_threads(ts, done):
for t in ts:
t.join()
done.put(None)
wait = threading.Thread(target=check_threads, args=([tc, td], done), daemon=True)
wait.start()
while True:
rec = done.get()
if rec is None:
done.close()
done.join_thread()
break
oid, obj = rec
if cache or (not obj):
dump_result(oid, obj, folder, ignore_fails)
yield rec
wait.join()
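# download_list splits the work across a filter thread (cache and failure checks) and a
# download thread connected by queues, and re-emits results on the calling thread so
# they can be consumed lazily. Usage sketch with placeholder ids:
for oid, obj in download_list(wq, ['123', '456', '789'], 'downloaded_tweets'):
    print(oid, 'ok' if obj else 'missing')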
def download_tweets_file(*args, **kwargs):
kwargs['batch_method'] = tweet_download_batch
yield from download_file(*args, **kwargs)
def download_users_file(*args, **kwargs):
kwargs['batch_method'] = user_download_batch
yield from download_file(*args, **kwargs)
def download_file(wq, csvfile, folder, column=0, delimiter=',', skip=0, cache=True,
quotechar='"', commentchar=None, batch_method=tweet_download_batch,
**kwargs):
with open(csvfile) as f:
if commentchar:
f = (line for line in f if not line.startswith('#'))
csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
for n in range(skip):
next(csvreader)
def reader(r):
for row in csvreader:
if len(row) > column:
yield row[column].strip()
for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method, cache=cache,
**kwargs):
yield res
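# Usage sketch (file and folder names are placeholders): given a CSV whose first column
# holds tweet ids, mirror every tweet to disk, skipping the header row.
for oid, obj in download_tweets_file(wq, 'tweet_ids.csv', folder='downloaded_tweets',
                                     column=0, delimiter=',', skip=1):
    pass    # objects are already written to the folder as a side effect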
def download_timeline(wq, user):
return wq.statuses.user_timeline(id=user)
def _consume_feed(func, feed_control=None, **kwargs):
'''
Get all the tweets using pagination and a given method.
It can be controlled with the `max_count` parameter (`count` sets the per-request page size).
If max_count < 0 => Loop until the whole feed is consumed.
If max_count == 0 => Only call the API once, with the default values.
If max_count > 0 => Get max_count tweets from the feed.
'''
remaining = int(kwargs.pop('max_count', 0))
count = int(kwargs.get('count', -1))
limit = False
# We need to at least perform a query, so we simulate a do-while
# by running once with no limit and updating the condition at the end
with tqdm(total=remaining) as pbar:
while not limit:
if remaining > 0 and ((count < 0) or (count > remaining)):
kwargs['count'] = remaining
resp, stop = feed_control(func, kwargs, remaining=remaining, batch_size=count)
if not resp:
return
for entry in resp:
yield entry
pbar.update(len(resp))
limit = stop
if remaining < 0:
# If the loop was run with a negative remaining, it will only stop
# when the control function tells it to.
continue
# Otherwise, check if we have already downloaded all the required items
remaining -= len(resp)
limit = limit or remaining <= 0
def consume_tweets(*args, **kwargs):
return _consume_feed(*args, feed_control=_tweets_control, **kwargs)
def consume_users(*args, **kwargs):
return _consume_feed(*args, feed_control=_users_control, **kwargs)
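# Usage sketch of the max_count convention (negative: exhaust the feed, zero: a single
# call, positive: stop after that many items). The screen names are placeholders, and
# wq.followers.list is assumed to mirror the followers/list endpoint.
timeline = list(consume_tweets(wq.statuses.user_timeline, id='some_user',
                               count=100, max_count=500))
followers = list(consume_users(wq.followers.list, screen_name='some_user',
                               count=200, max_count=1000))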
def _tweets_control(func, apiargs, remaining=0, **kwargs):
''' Return a list of entries, the remaining '''
resp = func(**apiargs)
if not resp:
return None, True
# Update the arguments for the next call
# Two options: either resp is a list, or a dict like:
# {'statuses': ... 'search_metadata': ...}
if isinstance(resp, dict) and 'search_metadata' in resp:
resp = resp['statuses']
max_id = min(s['id'] for s in resp) - 1
apiargs['max_id'] = max_id
return resp, False
def _users_control(func, apiargs, remaining=0, **kwargs):
resp = func(**apiargs)
stop = True
# Update the arguments for the next call
if 'next_cursor' in resp:
cursor = resp['next_cursor']
apiargs['cursor'] = cursor
if int(cursor) != -1:
stop = False
return resp['users'], stop
def serialized(it, outfile, outformat='csv', fields=[], header=None, ignore_missing=False, delimiter='\t'):
outformat = outformat.lower()
def do(out):
if outformat == 'csv':
writer = csv.writer(out, quoting=csv.QUOTE_ALL, delimiter=delimiter)
if header != '':
h = header
if h is None:
h = delimiter.join(fields)
print(h, file=out)
attrs = list(token.strip().split('.') for token in fields)
for obj in it:
values = []
for attr in attrs:
try:
values.append(reduce(operator.getitem, attr, obj))
except KeyError:
if not ignore_missing:
print('Key not present: {}'.format(attr), file=sys.stderr)
values.append(None)
writer.writerow(values)
elif outformat == 'jsonlines':
for obj in it:
print(json.dumps(obj, sort_keys=True), file=out)
elif outformat == 'indented':
for obj in it:
print(json.dumps(obj, indent=4, sort_keys=True), file=out)
else: else:
for obj in it: logger.info('Tweet not recovered: {}'.format(tid))
print(obj, file=out) with open(tweet_fail_file(tid, folder), 'w') as f:
print('Tweet not found', file=f)
yield -1
if outfile is sys.stdout: def download_batch(batch):
return do(sys.stdout) tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
return tweets.items()
with open(outfile, 'w') as out: with open(tweetsfile) as f:
return do(out) lines = map(lambda x: x.strip(), f)
lines_to_crawl = filter(lambda x: x is not None, tqdm(parallel(filter_line, lines), desc='Total lines'))
tweets = parallel(download_batch, lines_to_crawl, 100)
for res in tqdm(parallel(print_result, tweets), desc='Queried'):
pass
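# Usage sketch for serialized(): dotted field names are resolved with
# reduce(operator.getitem, ...), so nested attributes such as user.screen_name can be
# exported directly. Folder and output names are placeholders.
def load_tweets(folder='downloaded_tweets'):
    for fn in os.listdir(folder):
        if fn.endswith('.json'):
            with open(os.path.join(folder, fn)) as f:
                yield json.load(f)

serialized(load_tweets(), 'tweets.tsv', outformat='csv',
           fields=['id_str', 'user.screen_name', 'text'], delimiter='\t')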

View File

@ -1,12 +0,0 @@
version: '2'
services:
dev:
build:
context: .
dockerfile: Dockerfile-3.4
volumes:
- '.:/usr/src/app'
tty: yes
working_dir: '/usr/src/app'
entrypoint: '/bin/bash'
command: ''

View File

@ -2,4 +2,3 @@ sqlalchemy
twitter
click
tqdm
pyyaml

View File

@ -1,23 +1,29 @@
import pip
from setuptools import setup
from pip.req import parse_requirements
def parse_requirements(filename): # parse_requirements() returns generator of pip.req.InstallRequirement objects
""" load requirements from a pip requirements file """ # pip 6 introduces the *required* session argument
with open(filename, 'r') as f: try:
lineiter = list(line.strip() for line in f) install_reqs = parse_requirements("requirements.txt", session=pip.download.PipSession())
return [line for line in lineiter if line and not line.startswith("#")] py2_reqs = parse_requirements("requirements-py2.txt", session=pip.download.PipSession())
test_reqs = parse_requirements("test-requirements.txt", session=pip.download.PipSession())
install_reqs = parse_requirements("requirements.txt") except AttributeError:
py2_reqs = parse_requirements("requirements-py2.txt") install_reqs = parse_requirements("requirements.txt")
test_reqs = parse_requirements("test-requirements.txt") py2_reqs = parse_requirements("requirements-py2.txt")
test_reqs = parse_requirements("test-requirements.txt")
import sys
import os
import itertools
if sys.version_info <= (3, 0):
install_reqs = install_reqs + py2_reqs install_reqs = itertools.chain(install_reqs, py2_reqs)
with open(os.path.join('bitter', 'VERSION'), 'r') as f: # reqs is a list of requirement
__version__ = f.read().strip() # e.g. ['django==1.5.1', 'mezzanine==1.4.6']
install_reqs = [str(ir.req) for ir in install_reqs]
test_reqs = [str(ir.req) for ir in test_reqs]
from bitter import __version__
setup(
name="bitter",

View File

@ -12,11 +12,7 @@ from bitter import config as c
class TestCrawlers(TestCase):
def setUp(self):
CONF_PATH = os.path.join(os.path.dirname(__file__), '.bitter.yaml') self.wq = easy(os.path.join(os.path.dirname(__file__), 'credentials.json'))
if os.path.exists(CONF_PATH):
self.wq = easy(CONF_PATH)
else:
self.wq = easy()
def test_create_worker(self):
assert len(self.wq.queue)==1

View File

@ -1,23 +0,0 @@
from unittest import TestCase
import os
import types
from bitter import utils
from bitter.models import *
from sqlalchemy import exists
class TestModels(TestCase):
def setUp(self):
self.session = make_session('sqlite://')
def test_user(self):
fake_user = User(name="Fake user", id=1548)
self.session.add(fake_user)
self.session.commit()
fake_committed = self.session.query(User).filter_by(name="Fake user").first()
assert fake_committed
self.session.delete(fake_committed)
self.session.commit()
assert not list(self.session.execute('SELECT 1 from users where id=\'%s\'' % 1548))

View File

@ -8,63 +8,56 @@ from bitter import config as c
class TestUtils(TestCase):
configfile = '/tmp/bitter.yaml'
def setUp(self):
c.CONFIG_FILE = self.configfile self.credfile = '/tmp/credentials.txt'
if os.path.exists(self.configfile): c.CREDENTIALS = self.credfile
os.remove(self.configfile) if os.path.exists(self.credfile):
assert not os.path.exists(self.configfile) os.remove(self.credfile)
utils.create_config_file(self.configfile) utils.create_credentials(self.credfile)
assert os.path.exists(self.configfile)
def test_create_credentials(self):
assert os.path.exists(self.credfile)
os.remove(self.credfile)
utils.create_credentials() # From config
assert os.path.exists(self.credfile)
def test_add_credentials(self):
utils.add_credentials(self.configfile, user="test") utils.add_credentials(self.credfile, user="test")
assert utils.get_credentials(self.configfile) assert utils.get_credentials(self.credfile)
assert utils.get_credentials(self.configfile, user="test") assert utils.get_credentials(user="test")
assert list(utils.get_credentials(self.configfile, user="test"))[0]["user"] == "test" assert list(utils.get_credentials(user="test"))[0]["user"] == "test"
def test_get_credentials(self):
utils.add_credentials(self.configfile, user="test") utils.add_credentials(self.credfile, user="test")
assert utils.get_credentials(self.configfile, user="test") assert utils.get_credentials(user="test")
assert not utils.get_credentials(self.configfile, user="test", inverse=True) assert not utils.get_credentials(user="test", inverse=True)
def test_add_two_credentials(self):
utils.add_credentials(self.configfile, user="test") utils.add_credentials(self.credfile, user="test")
utils.add_credentials(self.configfile, user="test2") utils.add_credentials(self.credfile, user="test2")
assert utils.get_credentials(self.configfile, user="test") assert utils.get_credentials(user="test")
assert utils.get_credentials(self.configfile, user="test2") assert utils.get_credentials(user="test2")
def test_delete_credentials(self):
utils.add_credentials(self.configfile, user="test") utils.add_credentials(self.credfile, user="test")
assert utils.get_credentials(self.configfile, user="test") assert utils.get_credentials(user="test")
utils.delete_credentials(self.configfile, user="test") utils.delete_credentials(user="test")
assert not utils.get_credentials(self.configfile, user="test") print(utils.get_credentials())
assert not utils.get_credentials(user="test")
def test_parallel(self):
import time
def echo(i):
time.sleep(0.5) time.sleep(2)
return i
tic = time.time()
resp = utils.parallel(echo, [1, 2, 3]) resp = utils.parallel(echo, [1,2,3])
assert isinstance(resp, types.GeneratorType)
assert sorted(list(resp)) == [1, 2, 3] assert list(resp) == [1,2,3]
toc = time.time()
assert (tic-toc) < 600 assert (tic-toc) < 6000
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
assert sorted(list(resp2)) == [1, 2, 3, 4] assert list(resp2) == [1,2,3,4]
class TestUtilsEnv(TestUtils):
configfile = None
def setUp(self):
if 'BITTER_CONFIG' in os.environ:
self.oldenv = os.environ['BITTER_CONFIG']
os.environ['BITTER_CONFIG'] = ''
def tearDown(self):
if hasattr(self, 'oldenv'):
os.environ['BITTER_CONFIG'] = self.oldenv