You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

186 lines
3.7 KiB
Go

package main
import (
"bufio"
"database/sql"
"flag"
"fmt"
"log"
"os"
"path"
"runtime/pprof"
"strings"
"sync"
"time"
_ "github.com/mattn/go-sqlite3"
)
// Progress-reporting state carried between calls to updateStatus. started must
// be set before the first report; the remaining values are maintained by
// updateStatus itself, so it must not be called from several goroutines at once.
var lastChecked, started time.Time
var lastProgress int64
var curSpeed, avgSpeed float64

// progressPercent returns current as a percentage of total. An empty input
// (total <= 0) is reported as 100% complete instead of dividing by zero.
func progressPercent(current, total int64) float64 {
	if total <= 0 {
		return 100
	}
	return 100 * float64(current) / float64(total)
}

// updateStatus prints one progress line containing current/total bytes, the
// completion percentage, the number of lines seen so far, the throughput since
// the previous call, and the average throughput since started.
//
// Speeds are only recomputed when a positive amount of time has elapsed, so a
// zero-length interval never produces NaN or +Inf.
func updateStatus(lines, current, total int64) {
	// Use one clock reading so both speeds and lastChecked agree.
	now := time.Now()
	if !lastChecked.IsZero() {
		if dt := now.Sub(lastChecked).Seconds(); dt > 0 {
			curSpeed = float64(current-lastProgress) / dt
		}
	}
	if elapsed := now.Sub(started).Seconds(); elapsed > 0 {
		avgSpeed = float64(current) / elapsed
	}
	lastChecked = now
	lastProgress = current
	fmt.Printf("%10d/%-10d Bytes (%7.3f%%) %d lines - %.2f Bps (avg. %.2f Bps)\n",
		current,
		total,
		progressPercent(current, total),
		lines,
		curSpeed,
		avgSpeed)
}
// cpuprofile holds the -cpuprofile flag: when it names a file, a CPU profile
// of the run is written there.
var (
	cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
)
// main parses the command line and imports the file named by the first
// positional argument. A missing argument prints usage and exits with status 2;
// any error returned by run is logged and exits with status 1.
func main() {
	flag.Parse()
	if flag.NArg() < 1 {
		fmt.Fprintf(os.Stderr, "usage: %s [-cpuprofile FILE] INPUT\n", os.Args[0])
		flag.PrintDefaults()
		os.Exit(2)
	}
	if err := run(flag.Arg(0)); err != nil {
		log.Fatal(err)
	}
}

// run streams the tab-separated pairs in filename into the SQLite database
// ./<basename>.go.db, printing progress once per second.
//
// It returns an error if profiling cannot be started, if the input cannot be
// opened or stat'ed, or if reading the input stops early (for example on a
// line longer than bufio.Scanner's limit). Pairs read before a read error are
// still committed. The work lives here rather than in main so that deferred
// cleanup (CPU profile flush, file close) runs before main calls log.Fatal.
func run(filename string) error {
	if *cpuprofile != "" {
		pf, err := os.Create(*cpuprofile)
		if err != nil {
			return err
		}
		defer pf.Close()
		if err := pprof.StartCPUProfile(pf); err != nil {
			return err
		}
		defer pprof.StopCPUProfile()
	}

	f, err := os.Open(filename)
	if err != nil {
		return err
	}
	defer f.Close()
	stat, err := f.Stat()
	if err != nil {
		return err
	}
	total := stat.Size()
	started = time.Now()

	// pos (bytes consumed) and counter (lines scanned) are written by the
	// scanner goroutine and read by the metrics goroutine, so guard them.
	var (
		mu      sync.Mutex
		pos     int64
		counter int64
	)

	// Metrics goroutine: reports once per tick, and once more when stopped.
	timer := time.NewTicker(time.Second)
	stopMetrics := make(chan struct{})
	metricsDone := make(chan struct{})
	go func() {
		defer close(metricsDone)
		report := func() {
			mu.Lock()
			lines, consumed := counter, pos
			mu.Unlock()
			updateStatus(lines, consumed, total)
		}
		for {
			select {
			case <-timer.C:
				report()
			case <-stopMetrics:
				report()
				return
			}
		}
	}()

	users := make(chan []string, 1000)
	var wg sync.WaitGroup
	// readErr is written by the scanner goroutine before wg.Done and read
	// only after wg.Wait, which orders the two accesses.
	var readErr error

	// Scanner goroutine: splits each line on tabs and forwards 2-field lines.
	wg.Add(1)
	go func() {
		defer wg.Done()
		scanner := bufio.NewScanner(f)
		// Wrap ScanLines to track how many input bytes have been consumed.
		scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
			advance, token, err := bufio.ScanLines(data, atEOF)
			mu.Lock()
			pos += int64(advance)
			mu.Unlock()
			return advance, token, err
		})
		for scanner.Scan() {
			mu.Lock()
			counter++
			mu.Unlock()
			line := scanner.Text()
			tokens := strings.Split(line, "\t")
			if len(tokens) != 2 {
				fmt.Println("Wrong line: ", line, len(line))
				continue
			}
			users <- tokens
		}
		// Without this check, a read error or over-long line silently
		// ends the import as if the file were complete.
		readErr = scanner.Err()
		close(users)
		fmt.Println("Done with file")
	}()

	// Writer goroutine: drains users into SQLite.
	wg.Add(1)
	go func() {
		defer wg.Done()
		writeFollowers(fmt.Sprintf("./%s.go.db", path.Base(filename)), users)
	}()

	wg.Wait()
	fmt.Println("Stopping timer")
	timer.Stop()
	close(stopMetrics)
	<-metricsDone // wait for the final progress line

	if readErr != nil {
		return fmt.Errorf("reading %s: %w", filename, readErr)
	}
	return nil
}

// writeFollowers inserts every (user, follower) pair received on pairs into the
// followers table of the SQLite database at dbPath, creating the table and its
// indexes if needed. Inserts run inside a transaction that is committed every
// 30 seconds and once more after pairs is closed. Any database error panics via
// checkErr.
func writeFollowers(dbPath string, pairs <-chan []string) {
	db, err := sql.Open("sqlite3", dbPath)
	checkErr(err)
	db.SetMaxOpenConns(1)

	// NOTE(review): per the SQLite docs, if the process dies mid-transaction
	// while journal_mode=MEMORY, the database file is likely to be corrupted,
	// and checkErr panics are such a death. Kept as-is for speed.
	for _, q := range []string{
		"PRAGMA journal_mode=MEMORY;",
		"CREATE TABLE IF NOT EXISTS followers (user int, follower int)",
		"CREATE INDEX IF NOT EXISTS followers_user ON followers(user)",
		"CREATE INDEX IF NOT EXISTS followers_follower ON followers(follower)",
		"CREATE UNIQUE INDEX IF NOT EXISTS followers_unique ON followers(user, follower)",
	} {
		_, err := db.Exec(q)
		checkErr(err)
	}

	insertStmt, err := db.Prepare("INSERT OR IGNORE INTO followers VALUES (?, ?)")
	checkErr(err)

	commitTimer := time.NewTicker(30 * time.Second)
	defer commitTimer.Stop()

	tx, err := db.Begin()
	checkErr(err)
	// Transaction-bound statements are closed by database/sql on commit.
	txst := tx.Stmt(insertStmt)
loop:
	for {
		select {
		case pair, ok := <-pairs:
			if !ok {
				fmt.Println("No more pairs")
				break loop
			}
			_, err := txst.Exec(pair[0], pair[1])
			checkErr(err)
		case <-commitTimer.C:
			// Commit periodically so work is persisted in batches.
			checkErr(tx.Commit())
			tx, err = db.Begin()
			checkErr(err)
			txst = tx.Stmt(insertStmt)
		}
	}
	checkErr(tx.Commit())
	checkErr(insertStmt.Close())
	checkErr(db.Close())
	fmt.Println("Done writing SQL")
}
// checkErr is the program's blanket error handler. A nil err is ignored;
// anything else is re-raised as a panic whose value is the error itself.
func checkErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}