First commit
commit
0bebed2ff7
@ -0,0 +1,3 @@
|
||||
.*
|
||||
*.tsv
|
||||
*.net
|
@ -0,0 +1,48 @@
|
||||
import sys
|
||||
import os
|
||||
import os.path
|
||||
import sqlite3
|
||||
|
||||
|
||||
def addusers(c, ufrom, uto):
|
||||
# Insert a row of data
|
||||
c.execute("INSERT OR IGNORE INTO followers VALUES (?, ?)", [ufrom, uto])
|
||||
|
||||
|
||||
def update_progress(i, current, total):
|
||||
sys.stdout.write("\033[F") #back to previous line
|
||||
sys.stdout.write("\033[K") #clear line
|
||||
print('Done: {} lines. {} / {} bytes ({} %)'.format(i, current, total, 100.0*current/float(total)))
|
||||
|
||||
|
||||
def main(infile):
|
||||
conn = sqlite3.connect('%s.py.db' % os.path.basename(infile))
|
||||
# Create table
|
||||
conn.execute('''CREATE TABLE IF NOT EXISTS followers
|
||||
(user int, follower int)''')
|
||||
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS followersindex ON followers(user, follower) ")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS followersindex_follower ON followers(follower) ")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS followersindex_user ON followers(user) ")
|
||||
|
||||
with open(infile) as f:
|
||||
total = os.fstat(f.fileno()).st_size
|
||||
for i, line in enumerate(f):
|
||||
tokens = line.strip().split('\t')
|
||||
if len(tokens) != 2:
|
||||
print('Wrong line: ', i, tokens)
|
||||
continue
|
||||
addusers(conn, tokens[0], tokens[1])
|
||||
|
||||
if i % 10000 == 0:
|
||||
conn.commit()
|
||||
update_progress(i, f.tell(), total)
|
||||
# Save (commit) the changes
|
||||
conn.commit()
|
||||
|
||||
# We can also close the connection if we are done with it.
|
||||
# Just be sure any changes have been committed or they will be lost.
|
||||
conn.close()
|
||||
update_progress(i, total, total)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(sys.argv[1])
|
@ -0,0 +1,185 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"database/sql"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"runtime/pprof"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
var lastChecked, started time.Time
|
||||
var lastProgress int64
|
||||
var curSpeed, avgSpeed float64
|
||||
|
||||
func updateStatus(lines, current, total int64) {
|
||||
// fmt.Printf("\033[0;0H")
|
||||
// os.Stdout.Write([]byte("\033[F")) //back to previous line
|
||||
// os.Stdout.Write([]byte("\033[K")) //clear line
|
||||
if !lastChecked.IsZero() {
|
||||
curSpeed = float64(current-lastProgress) / time.Since(lastChecked).Seconds()
|
||||
}
|
||||
avgSpeed = float64((current)) / time.Since(started).Seconds()
|
||||
lastChecked = time.Now()
|
||||
lastProgress = current
|
||||
|
||||
fmt.Printf("%10d/%-10d Bytes (%7.3f%%) %d lines - %.2f Bps (avg. %.2f Bps)\n",
|
||||
current,
|
||||
total,
|
||||
100*float32(current)/float32(total),
|
||||
lines,
|
||||
curSpeed,
|
||||
avgSpeed)
|
||||
|
||||
}
|
||||
|
||||
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
if *cpuprofile != "" {
|
||||
f, err := os.Create(*cpuprofile)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
pprof.StartCPUProfile(f)
|
||||
defer pprof.StopCPUProfile()
|
||||
}
|
||||
filename := flag.Args()[0]
|
||||
|
||||
var pos int64
|
||||
|
||||
f, err := os.Open(filename)
|
||||
checkErr(err)
|
||||
|
||||
stat, err := f.Stat()
|
||||
checkErr(err)
|
||||
total := stat.Size()
|
||||
|
||||
started = time.Now()
|
||||
users := make(chan []string, 1000)
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
var counter int64
|
||||
|
||||
timer := time.NewTicker(time.Second)
|
||||
doneChan := make(chan bool)
|
||||
stopMetrics := make(chan bool)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
updateStatus(counter, pos, total)
|
||||
case <-stopMetrics:
|
||||
updateStatus(counter, pos, total)
|
||||
doneChan <- true
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
scanner := bufio.NewScanner(f)
|
||||
|
||||
scanLines := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
|
||||
advance, token, err = bufio.ScanLines(data, atEOF)
|
||||
pos += int64(advance)
|
||||
return
|
||||
}
|
||||
scanner.Split(scanLines)
|
||||
var line string
|
||||
var tokens []string
|
||||
for scanner.Scan() {
|
||||
counter++
|
||||
line = scanner.Text()
|
||||
tokens = strings.Split(line, "\t")
|
||||
if len(tokens) != 2 {
|
||||
fmt.Println("Wrong line: ", line, len(line))
|
||||
continue
|
||||
}
|
||||
users <- tokens[:2]
|
||||
}
|
||||
close(users)
|
||||
fmt.Println("Done with file")
|
||||
}()
|
||||
|
||||
commitTimer := time.NewTicker(30 * time.Second)
|
||||
defer commitTimer.Stop()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
bname := path.Base(filename)
|
||||
defer wg.Done()
|
||||
|
||||
db, err := sql.Open("sqlite3", fmt.Sprintf("./%s.go.db", bname))
|
||||
checkErr(err)
|
||||
db.SetMaxOpenConns(1)
|
||||
_, err = db.Exec("PRAGMA journal_mode=MEMORY;")
|
||||
checkErr(err)
|
||||
|
||||
_, err = db.Exec("CREATE TABLE IF NOT EXISTS followers (user int, follower int)")
|
||||
checkErr(err)
|
||||
_, err = db.Exec("CREATE INDEX IF NOT EXISTS followers_user ON followers(user)")
|
||||
checkErr(err)
|
||||
_, err = db.Exec("CREATE INDEX IF NOT EXISTS followers_follower ON followers(follower)")
|
||||
checkErr(err)
|
||||
_, err = db.Exec("CREATE UNIQUE INDEX IF NOT EXISTS followers_unique ON followers(user, follower)")
|
||||
checkErr(err)
|
||||
|
||||
// # Using transactions
|
||||
insertStmt, err := db.Prepare("INSERT OR IGNORE INTO followers VALUES (?, ?)")
|
||||
checkErr(err)
|
||||
tx, err := db.Begin()
|
||||
checkErr(err)
|
||||
var pair []string
|
||||
var ok bool
|
||||
txst := tx.Stmt(insertStmt)
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case pair, ok = <-users:
|
||||
if !ok {
|
||||
fmt.Println("No more pairs")
|
||||
break loop
|
||||
}
|
||||
_, err = txst.Exec(pair[0], pair[1])
|
||||
checkErr(err)
|
||||
case <-commitTimer.C:
|
||||
tx.Commit()
|
||||
tx, err = db.Begin()
|
||||
checkErr(err)
|
||||
txst = tx.Stmt(insertStmt)
|
||||
}
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
checkErr(err)
|
||||
|
||||
db.Close()
|
||||
fmt.Println("Done writing SQL")
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
fmt.Println("Stopping timer")
|
||||
timer.Stop()
|
||||
stopMetrics <- true
|
||||
|
||||
<-doneChan // Wait for metrics to finish
|
||||
|
||||
}
|
||||
|
||||
func checkErr(err error) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
Binary file not shown.
@ -0,0 +1,5 @@
|
||||
#!/bin/sh
|
||||
DBFILE="$1.cli.db"
|
||||
sqlite3 $DBFILE 'create table if not exists followers (user integer, follower integer)'
|
||||
sqlite3 $DBFILE '.mode tabs' ".import $1 followers"
|
||||
sqlite3 $DBFILE 'create index if not exists followers_user ON followers(user);' 'create index if not exists followers_follower on followers(follower);' 'create unique index if not exists followers_unique on followers(user,follower);'
|
Loading…
Reference in New Issue