Compare commits

...

4 commits

Author SHA1 Message Date
Felix Niederwanger bf56691bd8
Introduce storage module
Add a storage module for storing all received datapoints.
2024-02-12 08:54:53 +01:00
Felix Niederwanger d67f3c1cc3 Merge pull request 'Improve mqtt' (#5) from mqtt into main
Reviewed-on: https://codeberg.org/grisu48/ot-browser/pulls/5
2023-12-04 21:06:14 +00:00
Felix Niederwanger 932fa7e376
Add health endpoint
Adds a health endpoint for the service.
2023-12-04 22:05:06 +01:00
Felix Niederwanger 1b1419961b
Fail on mqtt connection failure
Fail to start the service, if the initial mqtt connection fails
2023-12-04 22:03:36 +01:00
7 changed files with 111 additions and 13 deletions

View file

@ -12,6 +12,8 @@ type Location struct {
Alt float32 `json:"alt"` Alt float32 `json:"alt"`
Acc float32 `json:"acc"` Acc float32 `json:"acc"`
Velocity float32 `json:"vel"` Velocity float32 `json:"vel"`
// Bearing is currently not pushed via Owntracks
Bearing float32
} }
// Distance computes the distance (in meters) between two points // Distance computes the distance (in meters) between two points

View file

@ -19,10 +19,12 @@ type Config struct {
mqttRemote string mqttRemote string
mqttClientId string mqttClientId string
mqttTopic string mqttTopic string
db string
} }
var config Config var config Config
var mqtt MQTTReceiver var mqtt MQTTReceiver
var db Storage
var devices map[string]Location var devices map[string]Location
// SetDefaults sets the configuration defaults // SetDefaults sets the configuration defaults
@ -32,16 +34,26 @@ func (c *Config) SetDefaults() {
c.mqttRemote = "127.0.0.1" c.mqttRemote = "127.0.0.1"
c.mqttTopic = "owntracks/#" c.mqttTopic = "owntracks/#"
c.mqttClientId = "ot-browser" c.mqttClientId = "ot-browser"
c.db = ""
} }
func mqttRecv(id string, loc Location) { func mqttRecv(id string, loc Location) {
if !loc.IsPlausible() { // Ignore stupid locations if !loc.IsPlausible() { // Ignore stupid locations
return return
} }
if old, ok := devices[id]; ok { // if we already have a location
// Write all datapoints before doing additional checks on known devices
if config.db != "" {
if err := db.InsertLocation(id, loc); err != nil {
fmt.Fprintf(os.Stderr, "error writing location to database: %s\n", err)
}
}
// Additional checks on known devices
if old, ok := devices[id]; ok {
// Ignore, if the new location is less precise, and the old one lies within it's accuracy
// This might happen, if there is a provider switch on the phone (e.g. GPX -> coarse location)
if loc.Acc > old.Acc { if loc.Acc > old.Acc {
// Ignore, if the new location is less precise, and the old one lies within it's accuracy
// This might happen, if there is a provider switch on the phone (e.g. GPX -> coarse location)
distance := old.Distance(loc) distance := old.Distance(loc)
if distance <= loc.Acc { if distance <= loc.Acc {
return return
@ -66,6 +78,7 @@ func parseProgramArguments() error {
fmt.Println(" -b,--bind ADDR Bind webserver to ADDR") fmt.Println(" -b,--bind ADDR Bind webserver to ADDR")
fmt.Println(" --mqtt ADDR Set MQTT remote address") fmt.Println(" --mqtt ADDR Set MQTT remote address")
fmt.Println(" --clientid CLIENTID Set MQTT client id") fmt.Println(" --clientid CLIENTID Set MQTT client id")
fmt.Println(" --db FILENAME Set storage database")
os.Exit(0) os.Exit(0)
} else if arg == "-w" || arg == "--www" { } else if arg == "-w" || arg == "--www" {
i++ i++
@ -84,6 +97,9 @@ func parseProgramArguments() error {
} else if arg == "--clientid" { } else if arg == "--clientid" {
i++ i++
config.mqttClientId = args[i] config.mqttClientId = args[i]
} else if arg == "--db" {
i++
config.db = args[i]
} else { } else {
return fmt.Errorf("invalid argument: %s", arg) return fmt.Errorf("invalid argument: %s", arg)
} }
@ -92,19 +108,30 @@ func parseProgramArguments() error {
} }
func main() { func main() {
fmt.Println("ot-browser") var err error
devices = make(map[string]Location, 0) devices = make(map[string]Location, 0)
config.SetDefaults() config.SetDefaults()
parseProgramArguments() parseProgramArguments()
// Setup database
if config.db != "" {
db, err = CreateDatabase(config.db)
if err != nil {
fmt.Fprintf(os.Stderr, "database error: %s\n", err)
os.Exit(1)
}
}
// Connect mqtt // Connect mqtt
if config.mqttRemote == "" { if config.mqttRemote == "" {
fmt.Fprintf(os.Stderr, "No mqtt remote set\n") fmt.Fprintf(os.Stderr, "No mqtt remote set\n")
os.Exit(1) os.Exit(1)
} }
mqtt.Received = mqttRecv mqtt.Received = mqttRecv
if err := mqtt.Connect(config.mqttRemote, config.mqttTopic, "", "", config.mqttClientId); err != nil { if err = mqtt.Connect(config.mqttRemote, config.mqttTopic, "", "", config.mqttClientId); err != nil {
fmt.Fprintf(os.Stderr, "mqtt error: %s\n", err) fmt.Fprintf(os.Stderr, "%s\n", err)
fmt.Fprintf(os.Stderr, "error: mqtt connection failed\n")
os.Exit(1) os.Exit(1)
} }
@ -113,9 +140,11 @@ func main() {
http.Handle("/", http.StripPrefix("/", fs)) http.Handle("/", http.StripPrefix("/", fs))
http.HandleFunc("/devices", handlerDevices) http.HandleFunc("/devices", handlerDevices)
http.HandleFunc("/devices.json", handlerDevices) http.HandleFunc("/devices.json", handlerDevices)
http.HandleFunc("/health.json", handlerHealth)
http.HandleFunc("/health", handlerHealth)
http.HandleFunc("/locations", handlerLocations) http.HandleFunc("/locations", handlerLocations)
http.HandleFunc("/devices/", handlerDeviceQuery) http.HandleFunc("/devices/", handlerDeviceQuery)
fmt.Println("Serving: http://" + config.bindAddr) fmt.Println("ot-browser serving: http://" + config.bindAddr)
log.Fatal(http.ListenAndServe(config.bindAddr, nil)) log.Fatal(http.ListenAndServe(config.bindAddr, nil))
} }
@ -133,6 +162,7 @@ func (c *Config) ReadFile(filename string) error {
if val := mqtt.Key("clientid").String(); val != "" { if val := mqtt.Key("clientid").String(); val != "" {
c.mqttClientId = val c.mqttClientId = val
} }
www := cfg.Section("www") www := cfg.Section("www")
if val := www.Key("remote").String(); val != "" { if val := www.Key("remote").String(); val != "" {
c.wwwDir = val c.wwwDir = val
@ -140,9 +170,20 @@ func (c *Config) ReadFile(filename string) error {
if val := www.Key("bind").String(); val != "" { if val := www.Key("bind").String(); val != "" {
c.bindAddr = val c.bindAddr = val
} }
storage := cfg.Section("storage")
if val := storage.Key("database").String(); val != "" {
c.db = val
}
return nil return nil
} }
// handlerHealth - health endpoint - writes a health status message to the client
func handlerHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status":"ok"}`))
}
func handlerDevices(w http.ResponseWriter, r *http.Request) { func handlerDevices(w http.ResponseWriter, r *http.Request) {
devs := make([]string, 0) devs := make([]string, 0)
for dev := range devices { for dev := range devices {

View file

@ -32,7 +32,6 @@ func (mqtt *MQTTReceiver) Connect(remote string, topic string, username string,
} }
// Add default port to remote if not existing // Add default port to remote if not existing
// TODO: IPv6 handling is not yet implemented
if !strings.Contains(remote, ":") { if !strings.Contains(remote, ":") {
remote += ":1883" remote += ":1883"
} }
@ -55,9 +54,8 @@ func (mqtt *MQTTReceiver) Connect(remote string, topic string, username string,
opts.SetAutoReconnect(true) opts.SetAutoReconnect(true)
c := MQTT.NewClient(opts) c := MQTT.NewClient(opts)
mqtt.mqtt = c mqtt.mqtt = c
// TODO: Add listener also if initial connection fails (and attempt reconnects)
if token := c.Connect(); token.Wait() && token.Error() != nil { if token := c.Connect(); token.Wait() && token.Error() != nil {
log.Println(fmt.Sprintf("Error connecting to MQTT %s - %s", remote, token.Error())) return token.Error()
} else { } else {
if token := c.Subscribe(topic, 0, mqtt.mqttReceive); token.Wait() && token.Error() != nil { if token := c.Subscribe(topic, 0, mqtt.mqttReceive); token.Wait() && token.Error() != nil {
log.Println(fmt.Sprintf("Error subscribing listener %s - %s", remote, token.Error())) log.Println(fmt.Sprintf("Error subscribing listener %s - %s", remote, token.Error()))
@ -97,9 +95,8 @@ func (mqtt *MQTTReceiver) mqttReceive(client MQTT.Client, msg MQTT.Message) {
if mqtt.Received != nil { if mqtt.Received != nil {
mqtt.Received(identifier, loc) mqtt.Received(identifier, loc)
} }
} else {
// Invalid topic. Ignore for now.
} }
// Ignore invalid topic.
} }
func (mqtt *MQTTReceiver) mqttConnectionLost(client MQTT.Client, err error) { func (mqtt *MQTTReceiver) mqttConnectionLost(client MQTT.Client, err error) {
@ -108,7 +105,7 @@ func (mqtt *MQTTReceiver) mqttConnectionLost(client MQTT.Client, err error) {
remotes := options.Servers() remotes := options.Servers()
remote := "" remote := ""
if len(remotes) > 0 { if len(remotes) > 0 {
// TODO:: What to do if there are more? // TODO: What to do if there are more?
remote = remotes[0].String() remote = remotes[0].String()
} }
for { for {

52
cmd/ot-browser/storage.go Normal file
View file

@ -0,0 +1,52 @@
package main
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
)
type Storage struct {
db *sql.DB
}
func (stor *Storage) init() error {
// initialization
sql_table := `
CREATE TABLE IF NOT EXISTS geopoints (
device VARCHAR(256), timestamp INTEGER, lon REAL, lat REAL, alt REAL, acc REAL, velocity REAL, bearing REAL, PRIMARY KEY(owner, timestamp)
);
`
if _, err := stor.db.Exec(sql_table); err != nil {
return err
}
return nil
}
func CreateDatabase(filename string) (Storage, error) {
var stor Storage
var err error
stor.db, err = sql.Open("sqlite3", filename)
if err != nil {
return stor, err
}
return stor, stor.init()
}
func (stor *Storage) InsertLocation(device string, loc Location) error {
tx, err := stor.db.Begin()
if err != nil {
return err
}
sql := "INSERT INTO geopoints (device, timestamp, lon, lat, alt, acc, velocity, bearing) VALUES (?,?,?,?,?,?,?)"
stmt, err := tx.Prepare(sql)
if err != nil {
return err
}
defer stmt.Close()
stmt.Exec(device, loc.Timestamp, loc.Lon, loc.Lat, loc.Alt, loc.Velocity, loc.Bearing)
return tx.Commit()
}

1
go.mod
View file

@ -4,6 +4,7 @@ go 1.15
require ( require (
github.com/eclipse/paho.mqtt.golang v1.3.4 github.com/eclipse/paho.mqtt.golang v1.3.4
github.com/mattn/go-sqlite3 v1.14.22
github.com/smartystreets/goconvey v1.6.4 // indirect github.com/smartystreets/goconvey v1.6.4 // indirect
gopkg.in/ini.v1 v1.62.0 gopkg.in/ini.v1 v1.62.0
) )

2
go.sum
View file

@ -6,6 +6,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=

View file

@ -7,3 +7,6 @@ clientid = ot-browser
[www] [www]
dir = ./www dir = ./www
bind = 127.0.0.1:8090 bind = 127.0.0.1:8090
[storage]
database = otb.db