Introduce storage module
Add a storage module to store all received geopoints into a sqlite3 database.
This commit is contained in:
parent
d67f3c1cc3
commit
d534856fb6
|
@ -12,6 +12,8 @@ type Location struct {
|
|||
Alt float32 `json:"alt"`
|
||||
Acc float32 `json:"acc"`
|
||||
Velocity float32 `json:"vel"`
|
||||
// Bearing is currently not pushed via Owntracks
|
||||
Bearing float32
|
||||
}
|
||||
|
||||
// Distance computes the distance (in meters) between two points
|
||||
|
|
|
@ -19,10 +19,12 @@ type Config struct {
|
|||
mqttRemote string
|
||||
mqttClientId string
|
||||
mqttTopic string
|
||||
db string
|
||||
}
|
||||
|
||||
var config Config
|
||||
var mqtt MQTTReceiver
|
||||
var db Storage
|
||||
var devices map[string]Location
|
||||
|
||||
// SetDefaults sets the configuration defaults
|
||||
|
@ -32,16 +34,26 @@ func (c *Config) SetDefaults() {
|
|||
c.mqttRemote = "127.0.0.1"
|
||||
c.mqttTopic = "owntracks/#"
|
||||
c.mqttClientId = "ot-browser"
|
||||
c.db = ""
|
||||
}
|
||||
|
||||
func mqttRecv(id string, loc Location) {
|
||||
if !loc.IsPlausible() { // Ignore stupid locations
|
||||
return
|
||||
}
|
||||
if old, ok := devices[id]; ok { // if we already have a location
|
||||
|
||||
// Write all datapoints before doing additional checks on known devices
|
||||
if config.db != "" {
|
||||
if err := db.InsertLocation(id, loc); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "error writing location to database: %s\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Additional checks on known devices
|
||||
if old, ok := devices[id]; ok {
|
||||
// Ignore, if the new location is less precise, and the old one lies within it's accuracy
|
||||
// This might happen, if there is a provider switch on the phone (e.g. GPX -> coarse location)
|
||||
if loc.Acc > old.Acc {
|
||||
// Ignore, if the new location is less precise, and the old one lies within it's accuracy
|
||||
// This might happen, if there is a provider switch on the phone (e.g. GPX -> coarse location)
|
||||
distance := old.Distance(loc)
|
||||
if distance <= loc.Acc {
|
||||
return
|
||||
|
@ -66,6 +78,7 @@ func parseProgramArguments() error {
|
|||
fmt.Println(" -b,--bind ADDR Bind webserver to ADDR")
|
||||
fmt.Println(" --mqtt ADDR Set MQTT remote address")
|
||||
fmt.Println(" --clientid CLIENTID Set MQTT client id")
|
||||
fmt.Println(" --db FILENAME Set storage database")
|
||||
os.Exit(0)
|
||||
} else if arg == "-w" || arg == "--www" {
|
||||
i++
|
||||
|
@ -84,6 +97,9 @@ func parseProgramArguments() error {
|
|||
} else if arg == "--clientid" {
|
||||
i++
|
||||
config.mqttClientId = args[i]
|
||||
} else if arg == "--db" {
|
||||
i++
|
||||
config.db = args[i]
|
||||
} else {
|
||||
return fmt.Errorf("invalid argument: %s", arg)
|
||||
}
|
||||
|
@ -92,17 +108,28 @@ func parseProgramArguments() error {
|
|||
}
|
||||
|
||||
func main() {
|
||||
var err error
|
||||
devices = make(map[string]Location, 0)
|
||||
config.SetDefaults()
|
||||
parseProgramArguments()
|
||||
|
||||
// Setup database
|
||||
if config.db != "" {
|
||||
db, err = CreateDatabase(config.db)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "database error: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Connect mqtt
|
||||
if config.mqttRemote == "" {
|
||||
fmt.Fprintf(os.Stderr, "No mqtt remote set\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
mqtt.Received = mqttRecv
|
||||
if err := mqtt.Connect(config.mqttRemote, config.mqttTopic, "", "", config.mqttClientId); err != nil {
|
||||
if err = mqtt.Connect(config.mqttRemote, config.mqttTopic, "", "", config.mqttClientId); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "%s\n", err)
|
||||
fmt.Fprintf(os.Stderr, "error: mqtt connection failed\n")
|
||||
os.Exit(1)
|
||||
|
@ -135,6 +162,7 @@ func (c *Config) ReadFile(filename string) error {
|
|||
if val := mqtt.Key("clientid").String(); val != "" {
|
||||
c.mqttClientId = val
|
||||
}
|
||||
|
||||
www := cfg.Section("www")
|
||||
if val := www.Key("remote").String(); val != "" {
|
||||
c.wwwDir = val
|
||||
|
@ -142,6 +170,11 @@ func (c *Config) ReadFile(filename string) error {
|
|||
if val := www.Key("bind").String(); val != "" {
|
||||
c.bindAddr = val
|
||||
}
|
||||
|
||||
storage := cfg.Section("storage")
|
||||
if val := storage.Key("database").String(); val != "" {
|
||||
c.db = val
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
83
cmd/ot-browser/storage.go
Normal file
83
cmd/ot-browser/storage.go
Normal file
|
@ -0,0 +1,83 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
type Storage struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func (stor *Storage) init() error {
|
||||
// initialization
|
||||
sql_table := `
|
||||
CREATE TABLE IF NOT EXISTS geopoints (
|
||||
device VARCHAR(256), timestamp INTEGER, lon REAL, lat REAL, alt REAL, acc REAL, velocity REAL, bearing REAL, PRIMARY KEY(device, timestamp)
|
||||
);
|
||||
`
|
||||
tx, err := stor.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.Exec(sql_table); err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (stor *Storage) IsReady() bool {
|
||||
return stor.db != nil
|
||||
}
|
||||
|
||||
func CreateDatabase(filename string) (Storage, error) {
|
||||
var stor Storage
|
||||
var err error
|
||||
stor.db, err = sql.Open("sqlite3", filename)
|
||||
if err != nil {
|
||||
return stor, err
|
||||
}
|
||||
|
||||
return stor, stor.init()
|
||||
}
|
||||
|
||||
func (stor *Storage) InsertLocation(device string, loc Location) error {
|
||||
tx, err := stor.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sql := "INSERT INTO geopoints (device, timestamp, lon, lat, alt, acc, velocity, bearing) VALUES (?,?,?,?,?,?,?,?)"
|
||||
stmt, err := tx.Prepare(sql)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = stmt.Exec(device, loc.Timestamp, loc.Lon, loc.Lat, loc.Alt, loc.Acc, loc.Velocity, loc.Bearing)
|
||||
if err != nil {
|
||||
stmt.Close()
|
||||
return err
|
||||
}
|
||||
if err := stmt.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
func (stor *Storage) GetLastLocation(device string) (Location, error) {
|
||||
var loc Location
|
||||
sql := "SELECT timestamp, lon, lat, alt, acc, velocity, bearing FROM geopoints WHERE device LIKE ? ORDER BY timestamp DESC LIMIT 1"
|
||||
stmt, err := stor.db.Prepare(sql)
|
||||
if err != nil {
|
||||
return loc, err
|
||||
}
|
||||
defer stmt.Close()
|
||||
rows, err := stmt.Query(device)
|
||||
if err != nil {
|
||||
return loc, err
|
||||
}
|
||||
defer rows.Close()
|
||||
if rows.Next() {
|
||||
err := rows.Scan(&loc.Timestamp, &loc.Lon, &loc.Lat, &loc.Alt, &loc.Acc, &loc.Velocity, &loc.Bearing)
|
||||
return loc, err
|
||||
}
|
||||
return loc, nil
|
||||
}
|
63
cmd/ot-browser/storage_test.go
Normal file
63
cmd/ot-browser/storage_test.go
Normal file
|
@ -0,0 +1,63 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const DATABASE = ":memory:"
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
if _, err := os.Stat(DATABASE); err == nil {
|
||||
fmt.Fprintf(os.Stderr, "error: test database exists already\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
// Run tests
|
||||
ret := m.Run()
|
||||
if DATABASE != ":memory:" {
|
||||
os.Remove(DATABASE) // Ignore errors
|
||||
}
|
||||
os.Exit(ret)
|
||||
}
|
||||
|
||||
func TestLocation(t *testing.T) {
|
||||
var ref Location
|
||||
|
||||
stor, err := CreateDatabase(DATABASE)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
device1 := "test1"
|
||||
t0 := time.Now().Unix()
|
||||
for i := 0; i < 360; i++ {
|
||||
ref.Timestamp = t0 + int64(i) // Never let it be zerom otherwise the check below will fail!
|
||||
ref.Lon = float32(i - 180)
|
||||
ref.Lat = float32((-90 + i) % 180)
|
||||
ref.Alt = float32(-100 + i*2)
|
||||
ref.Acc = float32(i % 2 * 10)
|
||||
ref.Velocity = float32(i / 1000.0)
|
||||
ref.Bearing = 0
|
||||
|
||||
if err := stor.InsertLocation(device1, ref); err != nil {
|
||||
t.Errorf("InsertLocation failed: %s", err)
|
||||
return
|
||||
}
|
||||
loc, err := stor.GetLastLocation(device1)
|
||||
if err != nil {
|
||||
t.Errorf("GetLastLocation failed: %s", err)
|
||||
return
|
||||
}
|
||||
if loc.Timestamp == 0 {
|
||||
t.Error("GetLastLocation returns an empty result", err)
|
||||
return
|
||||
}
|
||||
if loc != ref {
|
||||
t.Errorf("Fetched location doesn't match reference")
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
}
|
1
go.mod
1
go.mod
|
@ -4,6 +4,7 @@ go 1.15
|
|||
|
||||
require (
|
||||
github.com/eclipse/paho.mqtt.golang v1.3.4
|
||||
github.com/mattn/go-sqlite3 v1.14.22
|
||||
github.com/smartystreets/goconvey v1.6.4 // indirect
|
||||
gopkg.in/ini.v1 v1.62.0
|
||||
)
|
||||
|
|
2
go.sum
2
go.sum
|
@ -6,6 +6,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
|
|||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
|
||||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
|
||||
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
||||
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
|
||||
|
|
|
@ -7,3 +7,6 @@ clientid = ot-browser
|
|||
[www]
|
||||
dir = ./www
|
||||
bind = 127.0.0.1:8090
|
||||
|
||||
[storage]
|
||||
database = otb.db
|
Loading…
Reference in a new issue