1
0
mirror of https://github.com/balkian/bitter.git synced 2025-10-25 20:58:24 +00:00

1 Commits
0.9.5 ... 0.7.0

Author SHA1 Message Date
J. Fernando Sánchez
4b2f107b8a Py2 compatibility and queue handling
* Removed install_aliases(), which caused problems with urllib2
* Better waiting time calculation (used in queue handling)
2016-11-23 12:27:53 +01:00
36 changed files with 337 additions and 1419 deletions

2
.gitignore vendored
View File

@@ -1,8 +1,6 @@
__pycache__
*.egg-info
dist
env
.env
__*
.*
*.pyc

View File

@@ -1,7 +0,0 @@
# onbuild copies . to /usr/src/app/
From python:2.7-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install -e "/usr/src/app/[server]"
ENTRYPOINT ["bitter"]

View File

@@ -1,7 +0,0 @@
# onbuild copies . to /usr/src/app/
From python:3.4-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install ".[server]"
ENTRYPOINT ["bitter"]

View File

@@ -1,7 +0,0 @@
# onbuild copies . to /usr/src/app/
From python:{{PYVERSION}}-onbuild
Maintainer J. Fernando Sánchez @balkian
RUN pip install ".[server]"
ENTRYPOINT ["bitter"]

View File

@@ -5,6 +5,4 @@ include README.md
include bitter/VERSION
graft bitter/templates
graft bitter/static
include tests/test*
global-exclude *.pyc
global-exclude __pycache__
graft test

View File

@@ -1,76 +0,0 @@
PYVERSIONS=3.4 2.7
PYMAIN=$(firstword $(PYVERSIONS))
NAME=bitter
REPO=balkian
VERSION=$(shell cat $(NAME)/VERSION)
TARNAME=$(NAME)-$(VERSION).tar.gz
IMAGENAME=$(REPO)/$(NAME)
IMAGEWTAG=$(IMAGENAME):$(VERSION)
all: build run
dockerfiles: $(addprefix Dockerfile-,$(PYVERSIONS))
Dockerfile-%: Dockerfile.template
sed "s/{{PYVERSION}}/$*/" Dockerfile.template > Dockerfile-$*
dev-%:
@docker start $(NAME)-dev$* || (\
$(MAKE) build-$*; \
docker run -d -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD:/usr/src/app --entrypoint=/bin/bash -ti --name $(NAME)-dev$* '$(IMAGEWTAG)-python$*'; \
)\
docker exec -ti $(NAME)-dev$* bash
dev: dev-$(PYMAIN)
build: $(addprefix build-, $(PYMAIN))
buildall: $(addprefix build-, $(PYVERSIONS))
build-%: Dockerfile-%
docker build -t '$(REPO)/$(NAME):$(VERSION)-python$*' -f Dockerfile-$* .;
test: $(addprefix test-,$(PYMAIN))
testall: $(addprefix test-,$(PYVERSIONS))
test-%: build-%
docker run --rm -w /usr/src/app/ --env-file $$PWD/.env -v $$PWD/tests/credentials.json:/usr/src/app/tests/credentials.json --entrypoint=/usr/local/bin/python -ti '$(REPO)/$(NAME):$(VERSION)-python$*' setup.py test --addopts "-vvv -s --pdb" ;
pip_test-%:
docker run --rm -v $$PWD/dist:/dist/ -ti python:$* pip install /dist/$(NAME)-$(VERSION).tar.gz ;
dist/$(NAME)-$(VERSION).tar.gz:
docker run --rm -ti -v $$PWD:/usr/src/app/ -w /usr/src/app/ python:$(PYMAIN) python setup.py sdist;
sdist: dist/$(NAME)-$(VERSION).tar.gz
upload-%: test-%
docker push '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
upload: testall $(addprefix upload-,$(PYVERSIONS))
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME):$(VERSION)'
docker tag '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)' '$(REPO)/$(NAME)'
clean:
@docker ps -a | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1;}}' | xargs docker rm 2>/dev/null|| true
@docker images | awk '/$(REPO)\/$(NAME)/{ split($$2, vers, "-"); if(vers[1] != "${VERSION}"){ print $$1":"$$2;}}' | xargs docker rmi 2>/dev/null|| true
upload_git:
git commit -a
git tag ${VERSION}
git push --tags origin master
pip_upload:
python setup.py sdist upload ;
pip_test: $(addprefix pip_test-,$(PYVERSIONS))
run: build
docker run --rm --env-file $$PWD/.env -p 5000:5000 -ti '$(REPO)/$(NAME):$(VERSION)-python$(PYMAIN)'
.PHONY: test test-% build-% build test test_pip run

150
README.md
View File

@@ -1,5 +1,4 @@
#Description
There are two parts to bitter.
First of all, it is a wrapper over Python twitter that adds support for several Twitter API credentials (e.g. authorizing the same app with different user accounts).
Secondly, it is a command line tool to automate several actions (e.g. downloading user networks) using the wrapper.
@@ -18,158 +17,21 @@ or
Programmatically:
```python
from bitter import easy
wq = easy()
from bitter.crawlers import TwitterQueue
wq = TwitterQueue.from_credentials()
print(wq.users.show(user_name='balkian'))
```
You can also make custom calls to the API through the command line.
e.g. to get the latest 500 tweets by the python software foundation:
# Credentials format
```
bitter api statuses/user_timeline --id thepsf --count 500
{"user": "balkian", "consumer_secret": "xxx", "consumer_key": "xxx", "token_key": "xxx", "token_secret": "xxx"}
```
## Adding credentials
By default, bitter uses '~/.bitter-credentials.json', but you may choose a different file:
```
bitter --config <YOUR CONFIGURATION FILE> credentials add
```
You can specify the parameters in the command or let the command line guide you through the process.
# Examples
## Downloading a list of tweets
Bitter can download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.
You can even specify the column number for tweet ids.
Bitter will not try to download
```
Usage: bitter tweet get_all [OPTIONS] TWEETSFILE
Download tweets from a list of tweets in a CSV file. The result is stored
as individual json files in your folder of choice.
Options:
-f, --folder TEXT
-d, --delimiter TEXT
-h, --header Discard the first line (use it as a header)
-q, --quotechar TEXT
-c, --column INTEGER
--help Show this message and exit.
```
For instance, this will download `tweet_ids.csv` in the `tweet_info` folder:
```
bitter tweet get_all -f tweet_info tweet_ids.csv
```
## Downloading a list of users
Bitter downloads users and tweets in a similar way:
```
Usage: bitter users get_all [OPTIONS] USERSFILE
Download users from a list of user ids/screen names in a CSV file. The
result is stored as individual json files in your folder of choice.
Options:
-f, --folder TEXT
-d, --delimiter TEXT
-h, --header Discard the first line (use it as a header)
-q, --quotechar TEXT
-c, --column INTEGER
--help Show this message and exit.
```
The only difference is that users can be downloaded via `screen_name` or `user_id`.
This method does not try to resolve screen names to user ids, so users may be downloaded more than once if they appear in both ways.
## Downloading a stream
```
Usage: bitter stream get [OPTIONS]
Options:
-l, --locations TEXT
-t, --track TEXT
-f, --file TEXT File to store the stream of tweets. Default: standard output
-p, --politelyretry Politely retry after a hangup/connection error
--help Show this message and exit.
```
```
bitter --config .bitter.yaml stream get
```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
## REST queries
In newer versions of bitter, individual methods to download tweets/users using the REST API are being replaced with a generic method to call the API.
```
bitter api <URL endpoint> --parameter VALUE ... | [--tweets | --users] [--max_count MAX_COUNT] [--count COUNT_PER_CALL]
```
For instance:
```
# Get 100 tweets that mentioned Obama after tweet 942689870501302300
bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama
```
That is equivalent to this call to the api: `api/1.1/searc/tweets?since_id=942689870501302300&count=100&q=Obama`.
The flags `--tweets` and `--users` are optional.
If you use them, bitter will try to intelligently fetch all the tweets/users by using pagination with the API.
For example:
```
# Download 1000 tweets, 100 tweets per call.
bitter api '/search/tweets' --since_id 942689870501302300 --count 100 --q Obama --max_count=1000 --tweets
```
```
# Download all the followers of @balkian
bitter api 'followers/list' --_id balkian --users --max_count -1
```
Note that some reserved words (such as `id`) have to be preceeded by an underscore.
This limitation is imposed by the python-twitter library.
# Configuration format
```
credentials:
- user: "balkian"
consumer_secret: "xxx"
consumer_key: "xxx"
token_key: "xxx"
token_secret: "xxx"
- user: ....
```
By default, bitter uses '~/.bitter.yaml', but you may choose a different file:
```
python -m bitter --config <config_file> ...
```
Or use an environment variable:
```
export BITTER_CONFIG=$(cat myconfig.yaml)
python -m bitter -c <credentials_file> ...
```
# Server

View File

@@ -1,10 +0,0 @@
Scripts to process jsonlines
To get the jsonlines file, you can use the streaming API or the search api, like so:
```
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines
```
To keep track of the query that generated the file, you can save the command in a text file.
For instance, the example above is also in `example_query.sh`.

View File

@@ -1 +0,0 @@
python -m bitter.cli --config .bitter.yaml api '/search/tweets' --result_type recent --q 'bitter OR #bitter OR @bitter' --tweet_mode extended --tweets --max_count 5000 >> mytweets.jsonlines

View File

@@ -1,13 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export FIELDS="created_at,id,text"
for i in "$@"
do
OUTPUT=$i.hashtags.csv
echo "$FIELDS" > $OUTPUT
pv -l $i -N "hashtags $i" | jq -r '. | .created_at as $created_at | .id_str as $id | .entities.hashtags | select(. != null) | .[] | [$created_at, $id, .text] | @csv' >> $OUTPUT
done

View File

@@ -1,15 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
for i in "$@"
do
REPLYOUTPUT=$i.replies.csv
RTOUTPUT=$i.rts.csv
echo 'created_at,id,user_id,reply_user_id' > $REPLYOUTPUT
echo 'created_at,id,user_id,rt_user_id' > $RTOUTPUT
pv -l -N "$i" $i | jq -r '. | select(.in_reply_to_user_id_str != null) | [.created_at, .id_str, .user.id_str, .in_reply_to_user_id_str] | @csv' >> $REPLYOUTPUT
pv -l -N "$i" $i | jq -r '. | select(.retweeted_status != null) | [.created_at, .retweeted_status.id_str, .user.id_str, .retweeted_status.user.id_str] | @csv' >> $RTOUTPUT
done

View File

@@ -1,16 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export QUERY='.limit | select(. != null) | [.timestamp_ms, .track] | @csv'
export FIELDS="timestamp,track"
for i in "$@"
do
OUTPUT=$i.limits.csv
echo $FIELDS > $OUTPUT
pv -N "$i limits" -l $i | jq -r "$QUERY" >> $OUTPUT
done

View File

@@ -1,16 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export QUERY='select(.id != null) | .id_str as $id | .entities.urls[] | select(.expanded_url | select(. != null) | contains("open.spotify") or contains("youtube.com") or contains("youtu.be")) | [$id, .expanded_url] | @csv'
export FIELDS="id,url"
for i in "$@"
do
OUTPUT=$i.media.csv
echo $FIELDS > $OUTPUT
pv -N "$i media" -l $i | jq -r "$QUERY" >> $OUTPUT
done

View File

@@ -1,28 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
export USER_FIELDS="\$created_at,\
.id_str,\
.screen_name,\
.followers_count,\
.lang,\
.description,\
.statuses_count,\
.favourites_count,\
.friends_count,\
.created_at,\
.name,\
.location,\
.listed_count,\
.time_zone\
"
for i in "$@"
do
OUTPUT=$i.users.csv
echo \#$USER_FIELDS > $OUTPUT
jq -r ".created_at as \$created_at | .user,.retweeted_status.user | select(. != null) | [$USER_FIELDS] | @csv " $i | pv -N "$i" -l >> $OUTPUT
done

View File

@@ -1,32 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
FIELDS=".id_str,\
.user.screen_name,\
.user.id,\
.favorite_count,\
.retweet_count,\
.quote_count,\
.reply_count,\
.created_at,\
.lang,\
.in_reply_to_user_id_str,\
.in_reply_to_status_id_str,\
.retweeted_status.id_str,\
.retweeted_status.user.id,\
.retweeted_status.favorite_count,\
.retweeted_status.retweet_count,\
.retweeted_status.quote_count,\
.retweeted_status.reply_count,\
.retweeted_status.created_at\
"
for i in "$@"
do
OUTPUT=$i.tweets.csv
echo "$FIELDS" | sed -e 's/,[ \t\n]*\./,/g' | sed -e 's/^[#]\?\.//' > $OUTPUT
jq -r "[$FIELDS]|@csv" $i | pv -N "$i" -l >> $OUTPUT
done

View File

@@ -1,17 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
QUERY='.| select(.retweeted_status != null) | .retweeted_status | .id_str as $rt_id | .extended_tweet | select(. != null) | [$rt_id,.full_text]|@csv'
HEADER='rt_id,full_text'
for i in "$@"
do
OUTPUT=$i.full_text.csv
echo $HEADER > $OUTPUT
jq "$QUERY" $i | pv -N "$i" -l >> $OUTPUT
sort -u $OUTPUT -o $OUTPUT
sed -ri 's/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g' $OUTPUT
done

View File

@@ -1,16 +0,0 @@
if [ "$#" -lt 1 ]
then
echo "Usage: $0 <files to convert>"
exit 1
fi
QUERY='(.full_text // .retweeted_status.full_text) as $text | [ .id_str,$text ] | @csv'
HEADER='id,text'
for i in "$@"
do
OUTPUT=$i.text.csv
echo $HEADER > $OUTPUT
pv -l -N "$i" $i | jq -r "$QUERY" >> $OUTPUT
# sed -ri s/^"([0-9]+),\\"(.*)\\""$/"\1","\2"/g $OUTPUT
done

View File

@@ -1,10 +0,0 @@
if [ "$#" -lt 2 ]
then
echo "Find edge lines in a file that contain one of the users in a user list."
echo ""
echo "Usage: $0 <file with edges> <file with the list of users>"
exit 1
fi
pv -c -N 'read' "$1" | grep -F -w -f "$2" | pv -lc -N 'found'

View File

@@ -1,23 +0,0 @@
import pandas as pd
def read_rts(rtsfile, tweetsfile):
tweets = pd.read_csv(tweetsfile, index_col=0)
rts = pd.read_csv(rtsfile, index_col=1)
merged = rts.groupby(by=['id', 'rt_user_id']).size().rename('count').reset_index(level=1).merge(tweets, left_index=True, right_index=True)
return merged.sort_values(by='count', ascending=False)
def read_tweets(tweetsfile):
'''When the dataset is small enough, we can load tweets as-in'''
with open(tweetsfile) as f:
header = f.readline().strip().split(',')
dtypes = {}
for key in header:
if key.endswith('_str') or key.endswith('.id'):
dtypes[key] = object
tweets = pd.read_csv(tweetsfile, dtype=dtypes, index_col=0)
return tweets
if __name__ == '__main__':
import argparse

View File

@@ -1 +0,0 @@
cat "$@" | awk -F"," '{print tolower($3)}' | sort | uniq -c | sort -h

View File

@@ -1,14 +0,0 @@
MAX_TAGS=100
function get_text {
while read line
do
echo $line
rtid=$(echo $line | awk -F"," '{print $2}')
text=$(grep -m 1 $rtid *.text.csv)
echo "$line - $text"
done < "/dev/stdin"
}
cat "$@" | get_text

View File

@@ -1,15 +0,0 @@
MAX_TAGS=100
function get_text {
while read line
do
echo $line
rtid=$(echo $line | awk '{print $2}')
count=$(echo $line | awk '{print $1}')
text=$(grep -m 1 $rtid *.text.csv)
echo "$line - $text"
done < "/dev/stdin"
}
cat "$@" | awk -F"," '{print tolower($2)}' | sort | uniq -c | sort -h | tail -n $MAX_TAGS | get_text

View File

@@ -1 +1 @@
0.9.5
0.7.0

View File

@@ -7,10 +7,4 @@ import os
from .version import __version__
def easy(*args, **kwargs):
from .crawlers import TwitterQueue
return TwitterQueue.from_credentials(*args, **kwargs)
__all__ = ['cli', 'config', 'crawlers', 'models', 'utils' ]

View File

@@ -29,91 +29,16 @@ logger = logging.getLogger(__name__)
@click.group()
@click.option("--verbose", is_flag=True)
@click.option("--logging_level", required=False, default='WARN')
@click.option('--config', show_default=True, default=bconf.CONFIG_FILE)
@click.option('--credentials', show_default=True, help="DEPRECATED: If specified, these credentials will be copied to the configuratation file.", default=bconf.CREDENTIALS)
@click.option("--config", required=False)
@click.option('-c', '--credentials', show_default=True, default='~/.bitter-credentials.json')
@click.pass_context
def main(ctx, verbose, logging_level, config, credentials):
logging.basicConfig(level=getattr(logging, logging_level))
ctx.obj = {}
ctx.obj['VERBOSE'] = verbose
bconf.CONFIG_FILE = config
ctx.obj['CONFIG'] = config
bconf.CREDENTIALS = credentials
if os.path.exists(utils.get_config_path(credentials)):
utils.copy_credentials_to_config(credentials, config)
@main.group(invoke_without_command=True)
@click.pass_context
def credentials(ctx):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for worker in wq.queue:
print('#'*20)
try:
resp = worker.client.application.rate_limit_status()
print(worker.name)
except Exception as ex:
print('{}: AUTHENTICATION ERROR: {}'.format(worker.name, ex) )
@credentials.command('limits')
@click.option('--all', type=bool, default=False, required=False,
help=('Print all limits. By default, it only limits that '
'have been consumed will be shown.'))
@click.argument('url', required=False)
@click.pass_context
def get_limits(ctx, all, url):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
total = {}
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
print('#'*20)
print(worker.name)
if url:
limit = 'NOT FOUND'
print('URL is: {}'.format(url))
cat = url.split('/')[1]
if cat in resp['resources']:
limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
else:
print('Cat {} not found'.format(cat))
continue
for k in limit:
total[k] = total.get(k, 0) + limit[k]
print('{}: {}'.format(url, limit))
continue
nres = {}
if not all:
for res, urls in resp['resources'].items():
nurls = {}
for u, limits in urls.items():
if limits['limit'] != limits['remaining']:
nurls[u] = limits
if nurls:
nres[res] = nurls
resp = nres
print(json.dumps(resp, indent=2))
if url:
print('Total for {}: {}'.format(url, total))
@credentials.command('add')
@click.option('--consumer_key', default=None)
@click.option('--consumer_secret', default=None)
@click.option('--token_key', default=None)
@click.option('--token_secret', default=None)
@click.argument('user_name')
def add(user_name, consumer_key, consumer_secret, token_key, token_secret):
if not consumer_key:
consumer_key = click.prompt('Please, enter your YOUR CONSUMER KEY')
if not consumer_secret:
consumer_secret = click.prompt('Please, enter your CONSUMER SECRET')
if not token_key:
token_key = click.prompt('Please, enter your ACCESS TOKEN')
if not token_secret:
token_secret = click.prompt('Please, enter your ACCESS TOKEN SECRET')
utils.add_credentials(conffile=bconf.CONFIG_FILE, user=user_name, consumer_key=consumer_key, consumer_secret=consumer_secret,
token_key=token_key, token_secret=token_secret)
click.echo('Credentials added for {}'.format(user_name))
utils.create_credentials(credentials)
@main.group()
@click.pass_context
@@ -126,43 +51,41 @@ def tweet(ctx):
@click.option('-u', '--update', help="Update the file even if the tweet exists", is_flag=True, default=False)
@click.argument('tweetid')
def get_tweet(tweetid, write, folder, update):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
utils.download_tweet(wq, tweetid, write, folder, update)
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
if not write:
t = utils.get_tweet(wq, tweetid)
js = json.dumps(t, indent=2)
print(js)
return
if not os.path.exists(folder):
os.makedirs(folder)
file = os.path.join(folder, '%s.json' % tweetid)
if not update and os.path.exists(file) and os.path.isfile(file):
print('%s: Tweet exists' % tweetid)
return
try:
t = utils.get_tweet(wq, tweetid)
with open(file, 'w') as f:
js = json.dumps(t, indent=2)
print(js, file=f)
except Exception as ex:
print('%s: %s' % (tweetid, ex), file=sys.stderr)
@tweet.command('get_all', help='''Download tweets from a list of tweets in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@tweet.command('get_all')
@click.argument('tweetsfile', 'File with a list of tweets to look up')
@click.option('-f', '--folder', default="tweets")
@click.option('-u', '--update', is_flag=True, default=False, help='Download tweet even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",")
@click.option('-h', '--header', help='Discard the first line (use it as a header)',
is_flag=True, default=False)
@click.option('-q', '--quotechar', default='"')
@click.option('-c', '--column', type=int, default=0)
@click.pass_context
def get_tweets(ctx, tweetsfile, folder, update, retry, delimiter, header, quotechar, column):
if update and not click.confirm('This may overwrite existing tweets. Continue?'):
click.echo('Cancelling')
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
status = tqdm('Queried')
failed = 0
for tid, obj in utils.download_file(wq, tweetsfile, folder, delimiter=delimiter,
batch_method=utils.tweet_download_batch,
header=header, quotechar=quotechar,
column=column, update=update, retry_failed=retry):
status.update(1)
if not obj:
failed += 1
status.set_description('Failed: %s. Queried' % failed, refresh=True)
def get_tweets(ctx, tweetsfile, folder):
with open(tweetsfile) as f:
for line in f:
tid = line.strip()
ctx.invoke(get_tweet, folder=folder, tweetid=tid, write=True)
@tweet.command('search')
@click.argument('query')
@click.pass_context
def search(ctx, query):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
t = utils.search_tweet(wq, query)
print(json.dumps(t, indent=2))
@@ -170,7 +93,7 @@ def search(ctx, query):
@click.argument('user')
@click.pass_context
def timeline(ctx, user):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
t = utils.user_timeline(wq, user)
print(json.dumps(t, indent=2))
@@ -196,7 +119,7 @@ def list_users(ctx, db):
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', help="Update the file even if the user exists", is_flag=True, default=False)
def get_user(user, write, folder, update):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
if not write:
u = utils.get_user(wq, user)
js = json.dumps(u, indent=2)
@@ -213,29 +136,15 @@ def get_user(user, write, folder, update):
js = json.dumps(u, indent=2)
print(js, file=f)
@users.command('get_all', help='''Download users from a list of user ids/screen names in a CSV file.
The result is stored as individual json files in your folder of choice.''')
@users.command('get_all')
@click.argument('usersfile', 'File with a list of users to look up')
@click.option('-f', '--folder', default="users")
@click.option('-u', '--update', is_flag=True, default=False, help='Download user even if it is already present. WARNING: it will overwrite existing files!')
@click.option('-r', '--retry', is_flag=True, default=False, help='Retry failed downloads')
@click.option('-d', '--delimiter', default=",")
@click.option('-h', '--header', help='Discard the first line (use it as a header)',
is_flag=True, default=False)
@click.option('-q', '--quotechar', default='"')
@click.option('-c', '--column', type=int, default=0)
@click.pass_context
def get_users(ctx, usersfile, folder, update, retry, delimiter, header, quotechar, column):
if update and not click.confirm('This may overwrite existing users. Continue?'):
click.echo('Cancelling')
return
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
for i in utils.download_file(wq, usersfile, folder, delimiter=delimiter,
batch_method=utils.user_download_batch,
update=update, retry_failed=retry,
header=header, quotechar=quotechar,
column=column):
pass
def get_users(ctx, usersfile, folder):
with open(usersfile) as f:
for line in f:
uid = line.strip()
ctx.invoke(get_user, folder=folder, user=uid, write=True)
@users.command('crawl')
@click.option('--db', required=True, help='Database to save all users.')
@@ -256,7 +165,7 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
return ExitStack()
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
logger.info('Starting Network crawler with {} threads and {} credentials.'.format(threads,
len(wq.queue)))
@@ -350,6 +259,11 @@ def crawl_users(ctx, usersfile, skip, until, threads, db):
logger.info('Done!')
@main.group('api')
def api():
pass
@main.group('extractor')
@click.pass_context
@click.option('--db', required=True, help='Database of users.')
@@ -420,7 +334,7 @@ def users_extractor(ctx):
@click.pass_context
def extract(ctx, recursive, user, name, initfile):
print(locals())
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
dburi = ctx.obj['DBURI']
utils.extract(wq,
recursive=recursive,
@@ -432,42 +346,31 @@ def extract(ctx, recursive, user, name, initfile):
@extractor.command('reset')
@click.pass_context
def reset_extractor(ctx):
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
db = ctx.obj['DBURI']
session = make_session(db)
session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).update({'pending':False})
@main.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=False),
help='''Issue a call to an endpoint of the Twitter API.''')
@click.argument('cmd', nargs=1)
@click.option('--tweets', is_flag=True, help='Fetch more tweets using smart pagination. Use --count to control how many tweets to fetch per call, and --max_count to set the number of desired tweets (or -1 to get as many as possible).', type=bool, default=False)
@click.option('--users', is_flag=True, help='Fetch more users using smart pagination. Use --count to control how many users to fetch per call, and --max_count to set the number of desired users (or -1 to get as many as possible).', type=bool, default=False)
@click.argument('api_args', nargs=-1, type=click.UNPROCESSED)
@api.command('limits')
@click.argument('url', required=False)
@click.pass_context
def api(ctx, cmd, tweets, users, api_args):
opts = {}
mappings = {
'id': '_id'
}
i = iter(api_args)
for k, v in zip(i, i):
k = k.replace('--', '')
if k in mappings:
k = mappings[k]
opts[k] = v
wq = crawlers.TwitterQueue.from_config(conffile=bconf.CONFIG_FILE)
if tweets:
resp = utils.consume_tweets(wq[cmd], **opts)
elif users:
resp = utils.consume_users(wq[cmd], **opts)
def get_limits(ctx, url):
wq = crawlers.TwitterQueue.from_credentials(bconf.CREDENTIALS)
for worker in wq.queue:
resp = worker.client.application.rate_limit_status()
print('#'*20)
print(worker.name)
if url:
limit = 'NOT FOUND'
print('URL is: {}'.format(url))
cat = url.split('/')[1]
if cat in resp['resources']:
limit = resp['resources'][cat].get(url, None) or resp['resources'][cat]
else:
resp = wq[cmd](**opts)
print(json.dumps(resp))
return
for i in resp:
print(json.dumps(i))
print('Cat {} not found'.format(cat))
print('{}: {}'.format(url, limit))
else:
print(json.dumps(resp, indent=2))
@main.command('server')
@click.argument('CONSUMER_KEY', required=True)
@@ -487,11 +390,11 @@ def stream(ctx):
@stream.command('get')
@click.option('-l', '--locations', default=None)
@click.option('-t', '--track', default=None)
@click.option('-f', '--file', default=None, help='File to store the stream of tweets')
@click.option('-f', '--file', help='File to store the stream of tweets')
@click.option('-p', '--politelyretry', help='Politely retry after a hangup/connection error', is_flag=True, default=True)
@click.pass_context
def get_stream(ctx, locations, track, file, politelyretry):
wq = crawlers.StreamQueue.from_config(conffile=bconf.CONFIG_FILE, max_workers=1)
wq = crawlers.StreamQueue.from_credentials(bconf.CREDENTIALS, 1)
query_args = {}
if locations:
@@ -510,14 +413,10 @@ def get_stream(ctx, locations, track, file, politelyretry):
iterator = wq.statuses.sample()
else:
iterator = wq.statuses.filter(**query_args)#"-4.25,40.16,-3.40,40.75")
try:
for i in iterator:
yield i
if not politelyretry:
return
except Exception:
if not politelyretry:
raise ex
thishangup = time.time()
if thishangup - lasthangup < 60:
raise Exception('Too many hangups in a row.')
@@ -535,7 +434,7 @@ def get_stream(ctx, locations, track, file, politelyretry):
def read_stream(ctx, file, tail):
for tweet in utils.read_file(file, tail=tail):
try:
print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['created_at'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
print(u'{timestamp_ms}- @{screen_name}: {text}'.format(timestamp_ms=tweet['timestamp_ms'], screen_name=tweet['user']['screen_name'], text=tweet['text']))
except (KeyError, TypeError):
print('Raw tweet: {}'.format(tweet))

View File

@@ -11,4 +11,3 @@ E.g.:
app.run()
'''
CREDENTIALS = '~/.bitter-credentials.json'
CONFIG_FILE = '~/.bitter.yaml'

View File

@@ -10,7 +10,6 @@ from twitter import *
from collections import OrderedDict
from threading import Lock
from itertools import islice
from functools import partial
try:
import itertools.ifilter as filter
except ImportError:
@@ -39,9 +38,6 @@ class AttrToFunc(object):
else:
return extend_call(k)
def __getitem__(self, k):
return partial(self.handler, self.__uriparts+k.split('/'))
def __call__(self, *args, **kwargs):
# for i, a in enumerate(args)e
# kwargs[i] = a
@@ -58,18 +54,6 @@ class FromCredentialsMixin(object):
wq.ready(cls.worker_class(cred["user"], cred))
return wq
class FromConfigMixin(object):
@classmethod
def from_config(cls, config=None, conffile=None, max_workers=None):
wq = cls()
if not config:
with utils.config(conffile) as c:
config = c
for cred in islice(config['credentials'], max_workers):
wq.ready(cls.worker_class(cred["user"], cred))
return wq
class TwitterWorker(object):
api_class = None
@@ -91,12 +75,6 @@ class TwitterWorker(object):
self._client = self.api_class(auth=auth)
return self._client
def __repr__(self):
msg = '<{} for {}>'.format(self.__class__.__name__, self.name)
if self.busy:
msg += ' [busy]'
return msg
class RestWorker(TwitterWorker):
api_class = Twitter
@@ -115,14 +93,13 @@ class RestWorker(TwitterWorker):
def get_wait(self, uriparts):
limits = self.get_limit(uriparts)
if limits.get('remaining', 1) > 0:
if limits['remaining'] > 0:
return 0
reset = limits.get('reset', 0)
now = time.time()
return max(0, (reset-now))
def get_limit(self, uriparts):
uriparts = list(u for u in uriparts if u)
uri = '/'+'/'.join(uriparts)
for (ix, i) in self.limits.get('resources', {}).get(uriparts[0], {}).items():
if ix.startswith(uri):
@@ -155,7 +132,7 @@ class RestWorker(TwitterWorker):
class QueueException(BaseException):
pass
class QueueMixin(AttrToFunc, FromCredentialsMixin, FromConfigMixin):
class QueueMixin(AttrToFunc, FromCredentialsMixin):
def __init__(self, wait=True):
logger.debug('Creating worker queue')
self.queue = set()

View File

@@ -3,13 +3,10 @@ import json
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.types import BigInteger, Integer, Text, Boolean
from sqlalchemy.schema import ForeignKey
from sqlalchemy.pool import SingletonThreadPool
from sqlalchemy import Column, Index
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from functools import wraps
Base = declarative_base()
@@ -88,48 +85,34 @@ class ExtractorEntry(Base):
user = Column(BigInteger, index=True)
cursor = Column(BigInteger, default=-1)
pending = Column(Boolean, default=False)
errors = Column(Text, default="")
busy = Column(Boolean, default=False)
class Search(Base):
__tablename__ = 'search_queries'
id = Column(Integer, primary_key=True, index=True, unique=True)
endpoint = Column(Text, comment="Endpoint URL")
attrs = Column(Text, comment="Text version of the dictionary of parameters")
count = Column(Integer)
current_count = Column(Integer)
current_id = Column(BigInteger, comment='Oldest ID retrieved (should match max_id when done)')
since_id = Column(BigInteger)
class SearchResults(Base):
__tablename__ = 'search_results'
id = Column(Integer, primary_key=True, index=True, unique=True)
search_id = Column(ForeignKey('search_queries.id'))
resource_id = Column(Text)
def memoize(f):
memo = {}
@wraps(f)
def helper(self, **kwargs):
st = dict_to_str(kwargs)
key = (self.__uriparts, st)
if key not in memo:
memo[key] = f(self, **kwargs)
return memo[key]
return helper
def make_session(url):
if not isinstance(url, str):
print(url)
raise Exception("FUCK")
engine = create_engine(url, poolclass=SingletonThreadPool)#, echo=True)
engine = create_engine(url)#, echo=True)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
return session
def dict_to_str(args):
return json.dumps(args, sort_keys=True)
def test(db='sqlite:///users.db'):
from sqlalchemy import exists
session = make_session(db)
our_user = session.query(User).first()
print(our_user.name)
print(session.query(User).count())
fake_user = User(name="Fake user")
session.add(fake_user)
session.commit()
print(session.query(User).count())
print(session.query(exists().where(User.name == "Fake user")).scalar())
fake_committed = session.query(User).filter_by(name="Fake user").first()
print(fake_committed.id)
print(fake_committed.name)
session.delete(fake_committed)
session.commit()
print(session.query(User).count())
print(list(session.execute('SELECT 1 from users where id=\'%s\'' % 1548)))

View File

@@ -1,11 +1,6 @@
from __future__ import print_function
import logging
import time
import json
import yaml
import csv
import io
import signal
import sys
@@ -13,23 +8,11 @@ import sqlalchemy
import os
import multiprocessing
from multiprocessing.pool import ThreadPool
from multiprocessing import Queue
import queue
import threading
from select import select
from functools import partial
from tqdm import tqdm
from itertools import islice, chain
from itertools import islice
from contextlib import contextmanager
from future.moves.itertools import zip_longest
from collections import Counter
from random import choice
from builtins import map, filter
from twitter import TwitterHTTPError
@@ -37,12 +20,6 @@ from bitter.models import Following, User, ExtractorEntry, make_session
from bitter import config
# Fix Python 2.x.
try:
UNICODE_EXISTS = bool(type(unicode))
except NameError:
unicode = lambda s: str(s)
logger = logging.getLogger(__name__)
@@ -50,93 +27,39 @@ def signal_handler(signal, frame):
logger.info('You pressed Ctrl+C!')
sys.exit(0)
def chunk(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def chunk(iterable, n):
it = iter(iterable)
return iter(lambda: tuple(islice(it, n)), ())
def parallel(func, source, chunksize=1, numcpus=multiprocessing.cpu_count()):
def parallel(func, source, chunksize=0, numcpus=multiprocessing.cpu_count()):
if chunksize:
source = chunk(source, chunksize)
p = ThreadPool(numcpus*2)
results = p.imap_unordered(func, source)
for i in chain.from_iterable(results):
p = ThreadPool(numcpus)
for i in p.imap(func, source):
yield i
def get_config_path(conf=None):
if not conf:
if config.CONFIG_FILE:
conf = config.CONFIG_FILE
def get_credentials_path(credfile=None):
if not credfile:
if config.CREDENTIALS:
credfile = config.CREDENTIALS
else:
raise Exception('No valid config file')
return os.path.expanduser(conf)
def copy_credentials_to_config(credfile, conffile=None):
p = get_config_path(credfile)
with open(p) as old:
for line in old:
cred = json.loads(line.strip())
add_credentials(conffile, **cred)
def save_config(conf, conffile=None):
with config(conffile) as c:
c.clear()
c.update(conf)
raise Exception('No valid credentials file')
return os.path.expanduser(credfile)
@contextmanager
def config(conffile=None):
d = read_config(conffile)
try:
yield d
finally:
write_config(d, conffile)
def credentials_file(credfile, *args, **kwargs):
p = get_credentials_path(credfile)
with open(p, *args, **kwargs) as f:
yield f
def iter_credentials(credfile=None):
with credentials_file(credfile) as f:
for l in f:
yield json.loads(l.strip())
def read_config(conffile):
p = conffile and get_config_path(conffile)
if p:
if not os.path.exists(p):
raise IOError('{} file does not exist.'.format(p))
f = open(p, 'r')
elif 'BITTER_CONFIG' not in os.environ:
raise Exception('No config file or BITTER_CONFIG env variable.')
else:
f = io.StringIO(unicode(os.environ.get('BITTER_CONFIG', "")).strip().replace('\\n', '\n'))
return yaml.load(f) or {'credentials': []}
def write_config(conf, conffile=None):
if not conf:
conf = {'credentials': []}
if conffile:
p = get_config_path(conffile)
with open(p, 'w') as f:
yaml.dump(conf, f)
else:
os.environ['BITTER_CONFIG'] = yaml.dump(conf)
def iter_credentials(conffile=None):
with config(conffile) as c:
for i in c['credentials']:
yield i
def create_config_file(conffile=None):
if not conffile:
return
conffile = get_config_path(conffile)
with open(conffile, 'a'):
pass
write_config(None, conffile)
def get_credentials(conffile=None, inverse=False, **kwargs):
def get_credentials(credfile=None, inverse=False, **kwargs):
creds = []
for i in iter_credentials(conffile):
for i in iter_credentials(credfile):
matches = all(map(lambda x: i[x[0]] == x[1], kwargs.items()))
if matches and not inverse:
creds.append(i)
@@ -144,23 +67,24 @@ def get_credentials(conffile=None, inverse=False, **kwargs):
creds.append(i)
return creds
def create_credentials(credfile=None):
credfile = get_credentials_path(credfile)
with credentials_file(credfile, 'a'):
pass
def delete_credentials(conffile=None, **creds):
tokeep = get_credentials(conffile, inverse=True, **creds)
with config(conffile) as c:
c['credentials'] = list(tokeep)
def delete_credentials(credfile=None, **creds):
tokeep = get_credentials(credfile, inverse=True, **creds)
with credentials_file(credfile, 'w') as f:
for i in tokeep:
f.write(json.dumps(i))
f.write('\n')
def add_credentials(conffile=None, **creds):
try:
exist = get_credentials(conffile, **creds)
except IOError:
exist = False
create_config_file(conffile)
if exist:
return
with config(conffile) as c:
c['credentials'].append(creds)
def add_credentials(credfile=None, **creds):
exist = get_credentials(credfile, **creds)
if not exist:
with credentials_file(credfile, 'a') as f:
f.write(json.dumps(creds))
f.write('\n')
def get_hashtags(iter_tweets, best=None):
@@ -169,13 +93,8 @@ def get_hashtags(iter_tweets, best=None):
c.update(tag['text'] for tag in tweet.get('entities', {}).get('hashtags', {}))
return c
def read_file(filename, tail=False):
if filename == '-':
f = sys.stdin
else:
f = open(filename)
try:
with open(filename) as f:
while True:
line = f.readline()
if line not in (None, '', '\n'):
@@ -186,9 +105,6 @@ def read_file(filename, tail=False):
time.sleep(1)
else:
return
finally:
if f != sys.stdin:
close(f)
def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
@@ -218,7 +134,6 @@ def get_users(wq, ulist, by_name=False, queue=None, max_users=100):
else:
yield user
def trim_user(user):
if 'status' in user:
del user['status']
@@ -232,101 +147,104 @@ def trim_user(user):
return user
def add_user(user, dburi=None, session=None, update=False):
if not session:
session = make_session(dburi)
def add_user(session, user, enqueue=False):
user = trim_user(user)
olduser = session.query(User).filter(User.id==user['id'])
if olduser:
if not update:
return
olduser.delete()
nuser = User()
for key, value in user.items():
setattr(nuser, key, value)
user = nuser
if update:
user = User(**user)
session.add(user)
logger.debug('Adding entry')
if extract:
logging.debug('Adding entry')
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first()
if not entry:
entry = ExtractorEntry(user=user.id)
session.add(entry)
logger.debug(entry.pending)
logging.debug(entry.pending)
entry.pending = True
entry.cursor = -1
session.commit()
session.close()
def download_entry(wq, entry_id, dburi=None, recursive=False):
# TODO: adapt to the crawler
def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
signal.signal(signal.SIGINT, signal_handler)
w = wq.next()
if not dburi:
dburi = 'sqlite:///%s.db' % extractor_name
session = make_session(dburi)
if not session:
raise Exception("Provide dburi or session")
logger.info("Downloading entry: %s (%s)" % (entry_id, type(entry_id)))
entry = session.query(ExtractorEntry).filter(ExtractorEntry.id==entry_id).first()
user = session.query(User).filter(User.id == entry.user).first()
download_user(wq, session, user, entry, recursive)
session.close()
screen_names = []
user_ids = []
def classify_user(id_or_name):
try:
int(user)
user_ids.append(user)
logger.info("Added user id")
except ValueError:
logger.info("Added screen_name")
screen_names.append(user.split('@')[-1])
if user:
classify_user(user)
elif initfile:
logger.info("No user. I will open %s" % initfile)
with open(initfile, 'r') as f:
for line in f:
user = line.strip().split(',')[0]
classify_user(user)
else:
logger.info('Using pending users from last session')
def download_user(wq, session, user, entry=None, recursive=False, max_followers=50000):
nusers = list(get_users(wq, screen_names, by_name=True))
if user_ids:
nusers += list(get_users(wq, user_ids, by_name=False))
total_followers = user.followers_count
for i in nusers:
add_user(session, i, enqueue=True)
if total_followers > max_followers:
entry.pending = False
logger.info("Too many followers for user: %s" % user.screen_name)
session.add(entry)
session.commit()
return
if not entry:
entry = session.query(ExtractorEntry).filter(ExtractorEntry.user==user.id).first() or ExtractorEntry(user=user.id)
session.add(entry)
session.commit()
total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
logging.info('Total users: {}'.format(total_users))
def pending_entries():
pending = session.query(ExtractorEntry).filter(ExtractorEntry.pending == True).count()
logging.info('Pending: {}'.format(pending))
return pending
while pending_entries() > 0:
logger.info("Using account: %s" % w.name)
candidate, entry = session.query(User, ExtractorEntry).\
filter(ExtractorEntry.user == User.id).\
filter(ExtractorEntry.pending == True).\
order_by(User.followers_count).first()
if not candidate:
break
pending = True
cursor = entry.cursor
uid = user.id
name = user.name
uid = candidate.id
uobject = session.query(User).filter(User.id==uid).first()
name = uobject.screen_name if uobject else None
logger.info("#"*20)
logger.info("Getting %s - %s" % (uid, name))
logger.info("Cursor %s" % cursor)
logger.info("Using account: %s" % wq.name)
_fetched_followers = 0
def fetched_followers():
return session.query(Following).filter(Following.isfollowed==uid).count()
attempts = 0
while cursor > 0 or fetched_followers() < total_followers:
logger.info("Pending: %s/%s" % (session.query(ExtractorEntry).filter(ExtractorEntry.pending==True).count(), total_users))
try:
resp = wq.followers.ids(user_id=uid, cursor=cursor)
except TwitterHTTPError as ex:
attempts += 1
if ex.e.code in (401, ) or attempts > 3:
if ex.e.code in (401, ):
logger.info('Not authorized for user: {}'.format(uid))
entry.errors = ex.message
break
if 'ids' not in resp:
logger.info("Error with id %s %s" % (uid, resp))
entry.pending = False
entry.errors = "No ids in response: %s" % resp
break
resp = {}
if 'ids' in resp:
logger.info("New followers: %s" % len(resp['ids']))
if recursive:
newusers = get_users(wq, resp)
for newuser in newusers:
add_user(session=session, user=newuser)
if 'ids' not in resp or not resp['ids']:
logger.info('NO IDS in response')
break
for user in newusers:
add_user(session, newuser, enqueue=True)
for i in resp['ids']:
existing_user = session.query(Following).\
filter(Following.isfollowed==uid).\
@@ -340,111 +258,32 @@ def download_user(wq, session, user, entry=None, recursive=False, max_followers=
created_at_stamp=now)
session.add(f)
logger.info("Fetched: %s/%s followers" % (fetched_followers(),
total_followers = candidate.followers_count
fetched_followers = session.query(Following).filter(Following.isfollowed==uid).count()
logger.info("Fetched: %s/%s followers" % (fetched_followers,
total_followers))
entry.cursor = resp["next_cursor"]
session.add(entry)
session.commit()
cursor = resp["next_cursor"]
if cursor > 0:
pending = True
logger.info("Getting more followers for %s" % uid)
else:
logger.info("Done getting followers for %s" % uid)
cursor = -1
pending = False
else:
logger.info("Error with id %s %s" % (uid, resp))
pending = False
entry.pending = False
entry.busy = False
session.add(entry)
entry.pending = pending
entry.cursor = cursor
logging.debug('Entry: {} - {}'.format(entry.user, entry.pending))
session.add(candidate)
session.commit()
logger.debug('Entry: {} - {}'.format(entry.user, entry.pending))
sys.stdout.flush()
def classify_user(id_or_name, screen_names, user_ids):
try:
int(id_or_name)
user_ids.append(id_or_name)
logger.debug("Added user id")
except ValueError:
logger.debug("Added screen_name")
screen_names.append(id_or_name.split('@')[-1])
def extract(wq, recursive=False, user=None, initfile=None, dburi=None, extractor_name=None):
signal.signal(signal.SIGINT, signal_handler)
if not dburi:
dburi = 'sqlite:///%s.db' % extractor_name
session = make_session(dburi)
session.query(ExtractorEntry).update({ExtractorEntry.busy: False})
session.commit()
if not (user or initfile):
logger.info('Using pending users from last session')
else:
screen_names = []
user_ids = []
if user:
classify_user(user, screen_names, user_ids)
elif initfile:
logger.info("No user. I will open %s" % initfile)
with open(initfile, 'r') as f:
for line in f:
user = line.strip().split(',')[0]
classify_user(user, screen_names, user_ids)
def missing_user(ix, column=User.screen_name):
res = session.query(User).filter(column == ix).count() == 0
if res:
logger.info("Missing user %s. Count: %s" % (ix, res))
return res
screen_names = list(filter(missing_user, screen_names))
user_ids = list(filter(partial(missing_user, column=User.id_str), user_ids))
nusers = []
logger.info("Missing user ids: %s" % user_ids)
logger.info("Missing screen names: %s" % screen_names)
if screen_names:
nusers = list(get_users(wq, screen_names, by_name=True))
if user_ids:
nusers += list(get_users(wq, user_ids, by_name=False))
for i in nusers:
add_user(dburi=dburi, user=i)
total_users = session.query(sqlalchemy.func.count(User.id)).scalar()
logger.info('Total users: {}'.format(total_users))
de = partial(download_entry, wq, dburi=dburi)
pending = pending_entries(dburi)
session.close()
for i in tqdm(parallel(de, pending), desc='Downloading users', total=total_users):
logger.info("Got %s" % i)
def pending_entries(dburi):
session = make_session(dburi)
while True:
candidate, entry = session.query(User, ExtractorEntry).\
filter(ExtractorEntry.user == User.id).\
filter(ExtractorEntry.pending == True).\
filter(ExtractorEntry.busy == False).\
order_by(User.followers_count).first()
if candidate:
entry.busy = True
session.add(entry)
session.commit()
yield int(entry.id)
continue
if session.query(ExtractorEntry).\
filter(ExtractorEntry.busy == True).count() > 0:
time.sleep(1)
continue
logger.info("No more pending entries")
break
session.close()
def get_tweet(c, tid):
return c.statuses.show(id=tid)
@@ -463,263 +302,3 @@ def get_user(c, user):
return c.users.lookup(user_id=user)[0]
except ValueError:
return c.users.lookup(screen_name=user)[0]
def download_tweet(wq, tweetid, write=True, folder="downloaded_tweets", update=False):
cached = cached_id(tweetid, folder)
tweet = None
if update or not cached:
tweet = get_tweet(wq, tweetid)
js = json.dumps(tweet)
if write:
if tweet:
write_json(js, folder)
else:
print(js)
def cached_id(oid, folder):
tweet = None
file = os.path.join(folder, '%s.json' % oid)
if os.path.exists(file) and os.path.isfile(file):
try:
# print('%s: Object exists' % oid)
with open(file) as f:
tweet = json.load(f)
except Exception as ex:
logger.error('Error getting cached version of {}: {}'.format(oid, ex))
return tweet
def write_json(js, folder, oid=None):
if not oid:
oid = js['id']
file = id_file(oid, folder)
if not os.path.exists(folder):
os.makedirs(folder)
with open(file, 'w') as f:
json.dump(js, f)
logger.info('Written {} to file {}'.format(oid, file))
def id_file(oid, folder):
return os.path.join(folder, '%s.json' % oid)
def fail_file(oid, folder):
failsfolder = os.path.join(folder, 'failed')
if not os.path.exists(failsfolder):
os.makedirs(failsfolder)
return os.path.join(failsfolder, '%s.failed' % oid)
def id_failed(oid, folder):
return os.path.isfile(fail_file(oid, folder))
def tweet_download_batch(wq, batch):
tweets = wq.statuses.lookup(_id=",".join(batch), map=True)['id']
for tid, tweet in tweets.items():
yield tid, tweet
def user_download_batch(wq, batch):
screen_names = []
user_ids = []
for elem in batch:
try:
int(elem)
user_ids.append(str(elem))
except ValueError:
screen_names.append(elem.lower())
args = {}
if user_ids:
args['user_id'] = ','.join(user_ids)
if screen_names:
args['screen_name'] = ','.join(screen_names)
try:
users = wq.users.lookup(**args)
except TwitterHTTPError as ex:
if ex.e.code in (404,):
users = []
else:
raise
found_ids = []
found_names = []
for user in users:
uid = user['id_str']
if uid in user_ids:
found_ids.append(uid)
yield (uid, user)
uname = user['screen_name'].lower()
if uname in screen_names:
found_names.append(uname)
yield (uname, user)
for uid in set(user_ids) - set(found_ids):
yield (uid, None)
for name in set(screen_names) - set(found_names):
yield (name, None)
def dump_result(oid, obj, folder, ignore_fails=True):
if obj:
try:
write_json(obj, folder=folder, oid=oid)
failed = fail_file(oid, folder)
if os.path.exists(failed):
os.remove(failed)
except Exception as ex:
logger.error('%s: %s' % (oid, ex))
if not ignore_fails:
raise
else:
logger.info('Object not recovered: {}'.format(oid))
with open(fail_file(oid, folder), 'w') as f:
print('Object not found', file=f)
def download_list(wq, lst, folder, update=False, retry_failed=False, ignore_fails=False,
batch_method=tweet_download_batch):
done = Queue()
down = Queue()
def filter_list(lst, done, down):
print('filtering')
for oid in lst:
# print('Checking {}'.format(line))
cached = cached_id(oid, folder)
if (cached and not update):
done.put((oid, cached))
elif (id_failed(oid, folder) and not retry_failed):
done.put((oid, None))
else:
down.put(oid)
down.put(None)
def download_results(batch_method, down, done):
def gen():
while True:
r = down.get()
if not r:
return
yield r
for t in parallel(batch_method, gen(), 100):
done.put(t)
def batch(*args, **kwargs):
return batch_method(wq, *args, **kwargs)
tc = threading.Thread(target=filter_list, args=(lst, done, down), daemon=True)
tc.start()
td = threading.Thread(target=download_results, args=(batch, down, done), daemon=True)
td.start()
def check_threads(ts, done):
for t in ts:
t.join()
done.put(None)
wait = threading.Thread(target=check_threads, args=([tc, td], done), daemon=True)
wait.start()
while True:
rec = done.get()
if rec is None:
break
oid, obj = rec
dump_result(oid, obj, folder, ignore_fails)
yield rec
wait.join()
def download_file(wq, csvfile, folder, column=0, delimiter=',',
header=False, quotechar='"', batch_method=tweet_download_batch,
**kwargs):
with open(csvfile) as f:
csvreader = csv.reader(f, delimiter=str(delimiter), quotechar=str(quotechar))
if header:
next(csvreader)
def reader(r):
for row in csvreader:
if len(row) > column:
yield row[column].strip()
for res in download_list(wq, reader(csvreader), folder, batch_method=batch_method,
**kwargs):
yield res
def download_timeline(wq, user):
return wq.statuses.user_timeline(id=user)
def _consume_feed(func, feed_control=None, **kwargs):
'''
Get all the tweets using pagination and a given method.
It can be controlled with the `count` parameter.
If max_count < 0 => Loop until the whole feed is consumed.
If max_count == 0 => Only call the API once, with the default values.
If max_count > 0 => Get max_count tweets from the feed.
'''
remaining = int(kwargs.pop('max_count', 0))
count = int(kwargs.get('count', -1))
limit = False
# We need to at least perform a query, so we simulate a do-while
# by running once with no limit and updating the condition at the end
with tqdm(total=remaining) as pbar:
while not limit:
if remaining > 0 and ((count < 0) or (count > remaining)):
kwargs['count'] = remaining
resp, stop = feed_control(func, kwargs, remaining=remaining, batch_size=count)
if not resp:
return
for entry in resp:
yield entry
pbar.update(len(resp))
limit = stop
if remaining < 0:
# If the loop was run with a negative remaining, it will only stop
# when the control function tells it to.
continue
# Otherwise, check if we have already downloaded all the required items
remaining -= len(resp)
limit = limit or remaining <= 0
def consume_tweets(*args, **kwargs):
return _consume_feed(*args, feed_control=_tweets_control, **kwargs)
def consume_users(*args, **kwargs):
return _consume_feed(*args, feed_control=_users_control, **kwargs)
def _tweets_control(func, apiargs, remaining=0, **kwargs):
''' Return a list of entries, the remaining '''
resp = func(**apiargs)
if not resp:
return None, True
# Update the arguments for the next call
# Two options: either resp is a list, or a dict like:
# {'statuses': ... 'search_metadata': ...}
if isinstance(resp, dict) and 'search_metadata' in resp:
resp = resp['statuses']
max_id = min(s['id'] for s in resp) - 1
apiargs['max_id'] = max_id
return resp, False
def _users_control(func, apiargs, remaining=0, **kwargs):
resp = func(**apiargs)
stop = True
# Update the arguments for the next call
if 'next_cursor' in resp:
cursor = resp['next_cursor']
apiargs['cursor'] = cursor
if int(cursor) != -1:
stop = False
return resp['users'], stop

View File

@@ -1,12 +0,0 @@
version: '2'
services:
dev:
build:
context: .
dockerfile: Dockerfile-3.4
volumes:
- '.:/usr/src/app'
tty: yes
working_dir: '/usr/src/app'
entrypoint: '/bin/bash'
command: ''

View File

@@ -2,4 +2,3 @@ sqlalchemy
twitter
click
tqdm
pyyaml

View File

@@ -1,4 +0,0 @@
[metadata]
description-file = README.md
[aliases]
test=pytest

View File

@@ -1,23 +1,29 @@
import pip
from setuptools import setup
from pip.req import parse_requirements
def parse_requirements(filename):
""" load requirements from a pip requirements file """
with open(filename, 'r') as f:
lineiter = list(line.strip() for line in f)
return [line for line in lineiter if line and not line.startswith("#")]
# parse_requirements() returns generator of pip.req.InstallRequirement objects
# pip 6 introduces the *required* session argument
try:
install_reqs = parse_requirements("requirements.txt", session=pip.download.PipSession())
py2_reqs = parse_requirements("requirements-py2.txt", session=pip.download.PipSession())
test_reqs = parse_requirements("test-requirements.txt", session=pip.download.PipSession())
except AttributeError:
install_reqs = parse_requirements("requirements.txt")
py2_reqs = parse_requirements("requirements-py2.txt")
test_reqs = parse_requirements("test-requirements.txt")
import sys
import os
import itertools
if sys.version_info <= (3, 0):
install_reqs = install_reqs + py2_reqs
install_reqs = itertools.chain(install_reqs, py2_reqs)
with open(os.path.join('bitter', 'VERSION'), 'r') as f:
__version__ = f.read().strip()
# reqs is a list of requirement
# e.g. ['django==1.5.1', 'mezzanine==1.4.6']
install_reqs = [str(ir.req) for ir in install_reqs]
test_reqs = [str(ir.req) for ir in test_reqs]
from bitter import __version__
setup(
name="bitter",
@@ -32,7 +38,7 @@ setup(
extras_require = {
'server': ['flask', 'flask-oauthlib']
},
setup_requires=['pytest-runner',],
test_suite="tests",
include_package_data=True,
entry_points="""
[console_scripts]
@@ -42,7 +48,7 @@ setup(
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'Intended Audience :: Science/Research',
'License :: OSI Approved :: Apache Software License',
'License :: OSI Approved :: Apache 2 License',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',

View File

@@ -5,18 +5,14 @@ import types
import datetime
import time
from bitter import utils, easy
from bitter.crawlers import QueueException
from bitter import utils
from bitter.crawlers import TwitterQueue, TwitterWorker, QueueException
from bitter import config as c
class TestCrawlers(TestCase):
class TestUtils(TestCase):
def setUp(self):
CONF_PATH = os.path.join(os.path.dirname(__file__), '.bitter.yaml')
if os.path.exists(CONF_PATH):
self.wq = easy(CONF_PATH)
else:
self.wq = easy()
self.wq = TwitterQueue.from_credentials(os.path.join(os.path.dirname(__file__), 'credentials.json'))
def test_create_worker(self):
assert len(self.wq.queue)==1

View File

@@ -1,23 +0,0 @@
from unittest import TestCase
import os
import types
from bitter import utils
from bitter.models import *
from sqlalchemy import exists
class TestModels(TestCase):
def setUp(self):
self.session = make_session('sqlite://')
def test_user(self):
fake_user = User(name="Fake user", id=1548)
self.session.add(fake_user)
self.session.commit()
fake_committed = self.session.query(User).filter_by(name="Fake user").first()
assert fake_committed
self.session.delete(fake_committed)
self.session.commit()
assert not list(self.session.execute('SELECT 1 from users where id=\'%s\'' % 1548))

View File

@@ -8,63 +8,54 @@ from bitter import config as c
class TestUtils(TestCase):
configfile = '/tmp/bitter.yaml'
def setUp(self):
c.CONFIG_FILE = self.configfile
if os.path.exists(self.configfile):
os.remove(self.configfile)
assert not os.path.exists(self.configfile)
utils.create_config_file(self.configfile)
assert os.path.exists(self.configfile)
self.credfile = '/tmp/credentials.txt'
c.CREDENTIALS = self.credfile
if os.path.exists(self.credfile):
os.remove(self.credfile)
utils.create_credentials(self.credfile)
def test_create_credentials(self):
assert os.path.exists(self.credfile)
os.remove(self.credfile)
utils.create_credentials() # From config
assert os.path.exists(self.credfile)
def test_add_credentials(self):
utils.add_credentials(self.configfile, user="test")
assert utils.get_credentials(self.configfile)
assert utils.get_credentials(self.configfile, user="test")
assert list(utils.get_credentials(self.configfile, user="test"))[0]["user"] == "test"
utils.add_credentials(self.credfile, user="test")
assert utils.get_credentials(self.credfile)
assert utils.get_credentials(user="test")
assert list(utils.get_credentials(user="test"))[0]["user"] == "test"
def test_get_credentials(self):
utils.add_credentials(self.configfile, user="test")
assert utils.get_credentials(self.configfile, user="test")
assert not utils.get_credentials(self.configfile, user="test", inverse=True)
utils.add_credentials(self.credfile, user="test")
assert utils.get_credentials(user="test")
assert not utils.get_credentials(user="test", inverse=True)
def test_add_two_credentials(self):
utils.add_credentials(self.configfile, user="test")
utils.add_credentials(self.configfile, user="test2")
assert utils.get_credentials(self.configfile, user="test")
assert utils.get_credentials(self.configfile, user="test2")
utils.add_credentials(self.credfile, user="test")
utils.add_credentials(self.credfile, user="test2")
assert utils.get_credentials(user="test")
assert utils.get_credentials(user="test2")
def test_delete_credentials(self):
utils.add_credentials(self.configfile, user="test")
assert utils.get_credentials(self.configfile, user="test")
utils.delete_credentials(self.configfile, user="test")
assert not utils.get_credentials(self.configfile, user="test")
utils.add_credentials(self.credfile, user="test")
assert utils.get_credentials(user="test")
utils.delete_credentials(user="test")
print(utils.get_credentials())
assert not utils.get_credentials(user="test")
def test_parallel(self):
import time
def echo(i):
time.sleep(0.5)
time.sleep(2)
return i
tic = time.time()
resp = utils.parallel(echo, [1,2,3])
assert isinstance(resp, types.GeneratorType)
assert list(resp) == [1,2,3]
toc = time.time()
assert (tic-toc) < 600
resp2 = utils.parallel(echo, [1,2,3,4], chunksize=2)
assert list(resp2) == [1,2, 3,4]
assert (tic-toc) < 6000
class TestUtilsEnv(TestUtils):
configfile = None
def setUp(self):
if 'BITTER_CONFIG' in os.environ:
self.oldenv = os.environ['BITTER_CONFIG']
os.environ['BITTER_CONFIG'] = ''
def tearDown(self):
if hasattr(self, 'oldenv'):
os.environ['BITTER_CONFIG'] = self.oldenv