You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
186 lines
3.7 KiB
Go
186 lines
3.7 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"database/sql"
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"path"
|
|
"runtime/pprof"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
// Progress-reporting state carried between updateStatus calls. It is only
// touched by updateStatus (and main, which sets started), without locking.
var (
	// lastChecked is when updateStatus last ran; the zero value means "never".
	// started is the reference instant for the average speed; main sets it
	// just before processing begins.
	lastChecked, started time.Time
	// lastProgress is the byte offset passed to the previous updateStatus call.
	lastProgress int64
	// curSpeed is bytes/second since the previous report; avgSpeed is
	// bytes/second since started.
	curSpeed, avgSpeed float64
)

// updateStatus prints one progress line: bytes consumed out of total, the
// completed percentage, the number of lines read, and two throughput figures
// in bytes per second (since the previous call, and since started).
//
// lines is the number of lines read so far, current the number of bytes
// consumed and total the input size in bytes. It mutates the package-level
// progress variables above and is not safe for concurrent use.
func updateStatus(lines, current, total int64) {
	// Read the clock once so both speeds and lastChecked agree on "now".
	now := time.Now()

	// The first call has no previous sample, so curSpeed keeps its prior value.
	// A zero elapsed interval would yield ±Inf/NaN, so it is skipped.
	if !lastChecked.IsZero() {
		if elapsed := now.Sub(lastChecked).Seconds(); elapsed > 0 {
			curSpeed = float64(current-lastProgress) / elapsed
		}
	}
	if elapsed := now.Sub(started).Seconds(); elapsed > 0 {
		avgSpeed = float64(current) / elapsed
	}

	lastChecked = now
	lastProgress = current

	fmt.Printf("%10d/%-10d Bytes (%7.3f%%) %d lines - %.2f Bps (avg. %.2f Bps)\n",
		current,
		total,
		percentOf(current, total),
		lines,
		curSpeed,
		avgSpeed)
}

// percentOf returns current as a percentage of total. It returns 0 when total
// is not positive (e.g. an empty input file) instead of NaN or Inf.
// float64 keeps the figure accurate for byte counts beyond float32's 24-bit
// mantissa.
func percentOf(current, total int64) float64 {
	if total <= 0 {
		return 0
	}
	return 100 * float64(current) / float64(total)
}
|
|
|
|
// cpuprofile is the -cpuprofile command-line flag. When it names a file, main
// records a CPU profile of the whole run into that file; the empty default
// disables profiling.
var cpuprofile = flag.String(
	"cpuprofile",
	"",
	"write cpu profile to file",
)
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
if *cpuprofile != "" {
|
|
f, err := os.Create(*cpuprofile)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
pprof.StartCPUProfile(f)
|
|
defer pprof.StopCPUProfile()
|
|
}
|
|
filename := flag.Args()[0]
|
|
|
|
var pos int64
|
|
|
|
f, err := os.Open(filename)
|
|
checkErr(err)
|
|
|
|
stat, err := f.Stat()
|
|
checkErr(err)
|
|
total := stat.Size()
|
|
|
|
started = time.Now()
|
|
users := make(chan []string, 1000)
|
|
wg := sync.WaitGroup{}
|
|
|
|
var counter int64
|
|
|
|
timer := time.NewTicker(time.Second)
|
|
doneChan := make(chan bool)
|
|
stopMetrics := make(chan bool)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-timer.C:
|
|
updateStatus(counter, pos, total)
|
|
case <-stopMetrics:
|
|
updateStatus(counter, pos, total)
|
|
doneChan <- true
|
|
}
|
|
}
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
scanner := bufio.NewScanner(f)
|
|
|
|
scanLines := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
|
|
advance, token, err = bufio.ScanLines(data, atEOF)
|
|
pos += int64(advance)
|
|
return
|
|
}
|
|
scanner.Split(scanLines)
|
|
var line string
|
|
var tokens []string
|
|
for scanner.Scan() {
|
|
counter++
|
|
line = scanner.Text()
|
|
tokens = strings.Split(line, "\t")
|
|
if len(tokens) != 2 {
|
|
fmt.Println("Wrong line: ", line, len(line))
|
|
continue
|
|
}
|
|
users <- tokens[:2]
|
|
}
|
|
close(users)
|
|
fmt.Println("Done with file")
|
|
}()
|
|
|
|
commitTimer := time.NewTicker(30 * time.Second)
|
|
defer commitTimer.Stop()
|
|
wg.Add(1)
|
|
go func() {
|
|
bname := path.Base(filename)
|
|
defer wg.Done()
|
|
|
|
db, err := sql.Open("sqlite3", fmt.Sprintf("./%s.go.db", bname))
|
|
checkErr(err)
|
|
db.SetMaxOpenConns(1)
|
|
_, err = db.Exec("PRAGMA journal_mode=MEMORY;")
|
|
checkErr(err)
|
|
|
|
_, err = db.Exec("CREATE TABLE IF NOT EXISTS followers (user int, follower int)")
|
|
checkErr(err)
|
|
_, err = db.Exec("CREATE INDEX IF NOT EXISTS followers_user ON followers(user)")
|
|
checkErr(err)
|
|
_, err = db.Exec("CREATE INDEX IF NOT EXISTS followers_follower ON followers(follower)")
|
|
checkErr(err)
|
|
_, err = db.Exec("CREATE UNIQUE INDEX IF NOT EXISTS followers_unique ON followers(user, follower)")
|
|
checkErr(err)
|
|
|
|
// # Using transactions
|
|
insertStmt, err := db.Prepare("INSERT OR IGNORE INTO followers VALUES (?, ?)")
|
|
checkErr(err)
|
|
tx, err := db.Begin()
|
|
checkErr(err)
|
|
var pair []string
|
|
var ok bool
|
|
txst := tx.Stmt(insertStmt)
|
|
loop:
|
|
for {
|
|
select {
|
|
case pair, ok = <-users:
|
|
if !ok {
|
|
fmt.Println("No more pairs")
|
|
break loop
|
|
}
|
|
_, err = txst.Exec(pair[0], pair[1])
|
|
checkErr(err)
|
|
case <-commitTimer.C:
|
|
tx.Commit()
|
|
tx, err = db.Begin()
|
|
checkErr(err)
|
|
txst = tx.Stmt(insertStmt)
|
|
}
|
|
}
|
|
|
|
err = tx.Commit()
|
|
checkErr(err)
|
|
|
|
db.Close()
|
|
fmt.Println("Done writing SQL")
|
|
}()
|
|
|
|
wg.Wait()
|
|
|
|
fmt.Println("Stopping timer")
|
|
timer.Stop()
|
|
stopMetrics <- true
|
|
|
|
<-doneChan // Wait for metrics to finish
|
|
|
|
}
|
|
|
|
// checkErr aborts by panicking with err when err is non-nil; a nil err is a
// no-op. It is the file's catch-all handler for unrecoverable failures.
func checkErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
|