ot-browser/cmd/ot-browser/receiver.go
Felix Niederwanger 1b1419961b
Fail on mqtt connection failure
Fail to start the service, if the initial mqtt connection fails
2023-12-04 22:03:36 +01:00

161 lines
3.8 KiB
Go

package main
import (
"encoding/json"
"fmt"
"log"
"os"
"strings"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
// MQTTCallback is the signature of the handler stored in MQTTReceiver.Received.
// mqttReceive calls it with an identifier of the form "USER/DEVICE" taken from
// the message topic and the Location parsed from the message payload.
type MQTTCallback func(id string, loc Location)
// MQTTReceiver wraps an MQTT client that subscribes to a topic and forwards
// parsed locations to the Received callback.
type MQTTReceiver struct {
// mqtt is the underlying client; it is created and assigned in Connect.
mqtt MQTT.Client
// remote is the broker address as used by Connect (default port appended if it was missing).
remote string
// topic is the subscription topic filter as used by Connect (always ends with '#').
topic string
// connected is set to true by mqttConnected and to false by mqttConnectionLost.
connected bool
// Received, when non-nil, is called by mqttReceive for every successfully parsed location.
Received MQTTCallback
}
// Connect creates the MQTT client, connects to the broker and subscribes to
// the given topic, blocking until both steps have completed.
//
// Parameters:
//   - remote: broker address; ":1883" is appended when it contains no ':'.
//   - topic: topic to subscribe to; it is extended so that it always ends
//     with the multi-level wildcard '#'.
//   - username, password, clientid: applied to the client options only when
//     non-empty.
//
// It returns the connection error if the initial connection fails. If the
// subscription fails the client is disconnected again and a wrapped error is
// returned, because a receiver without a subscription would never deliver
// any message.
func (mqtt *MQTTReceiver) Connect(remote string, topic string, username string, password string, clientid string) error {
	// Ensure the topic ends with '#'
	if !strings.HasSuffix(topic, "#") {
		if !strings.HasSuffix(topic, "/") {
			topic += "/"
		}
		topic += "#"
	}
	// Add default port to remote if not existing
	if !strings.Contains(remote, ":") {
		remote += ":1883"
	}
	mqtt.remote = remote
	mqtt.topic = topic

	opts := MQTT.NewClientOptions().AddBroker("tcp://" + remote)
	if username != "" {
		opts.SetUsername(username)
	}
	if password != "" {
		opts.SetPassword(password)
	}
	if clientid != "" {
		opts.SetClientID(clientid)
	}
	opts.SetKeepAlive(20 * time.Second)
	opts.SetConnectionLostHandler(mqtt.mqttConnectionLost)
	opts.SetOnConnectHandler(mqtt.mqttConnected)
	opts.SetAutoReconnect(true)

	c := MQTT.NewClient(opts)
	mqtt.mqtt = c

	if token := c.Connect(); token.Wait() && token.Error() != nil {
		return token.Error()
	}
	if token := c.Subscribe(topic, 0, mqtt.mqttReceive); token.Wait() && token.Error() != nil {
		// Do not leave a connected but unsubscribed client behind.
		c.Disconnect(250)
		return fmt.Errorf("subscribing to %q on %s: %w", topic, remote, token.Error())
	}
	log.Printf("MQTT %s subscribed to topic '%s'", remote, topic)
	return nil
}
// mqttReceive is the message handler passed to Subscribe. It only handles
// topics of the form owntracks/USER/DEVICE: the payload is parsed as a
// location and handed to the Received callback (if set) together with the
// identifier "USER/DEVICE". Topics outside "owntracks/" and device sub-topics
// are dropped silently.
func (mqtt *MQTTReceiver) mqttReceive(client MQTT.Client, msg MQTT.Message) {
	const prefix = "owntracks/"

	raw := msg.Payload()
	full := msg.Topic()
	if !strings.HasPrefix(full, prefix) {
		// Ignore invalid topic.
		return
	}

	// Strip the prefix, leaving USER/DEVICE, and split at the first '/'.
	rest := full[len(prefix):]
	sep := strings.Index(rest, "/")
	if sep <= 0 {
		log.Printf("Illegal owntracks topic received\n")
		return
	}
	user, device := rest[:sep], rest[sep+1:]

	// Ignore special device topics (e.g. 'cmd' or 'events')
	if strings.Index(device, "/") > 0 {
		return
	}

	loc, err := parseLocationJson(string(raw))
	if err != nil {
		log.Printf("Error processing JSON (MQTT) : %s", err)
		return
	}
	if mqtt.Received == nil {
		return
	}
	mqtt.Received(user+"/"+device, loc)
}
// mqttConnectionLost is the connection-lost handler registered in Connect.
// It marks the receiver as disconnected and then calls client.Connect() every
// two seconds until an attempt completes without error.
//
// err is the error that caused the connection loss; it is only used for
// logging.
//
// NOTE(review): Connect also enables the client's auto-reconnect option;
// confirm whether this manual reconnect is still required alongside it.
func (mqtt *MQTTReceiver) mqttConnectionLost(client MQTT.Client, err error) {
	mqtt.connected = false

	// Determine the broker address for log output.
	options := client.OptionsReader()
	remote := ""
	if remotes := options.Servers(); len(remotes) > 0 {
		// TODO: What to do if there are more?
		remote = remotes[0].String()
	}

	for {
		fmt.Fprintf(os.Stderr, "Reconnecting to %s after failure: %s\n", remote, err)
		token := client.Connect()
		// Wait only signals that the attempt has finished; whether it
		// succeeded must be read from Error, otherwise a failed attempt
		// would end the loop.
		token.Wait()
		connectErr := token.Error()
		if connectErr == nil {
			break
		}
		fmt.Fprintf(os.Stderr, "Reconnect to %s failed: %s\n", remote, connectErr)
		time.Sleep(2 * time.Second)
	}
	log.Printf("MQTT connection %s established", remote)
}
// mqttConnected is the on-connect handler registered in Connect. It logs the
// broker address stored on the receiver and flags the receiver as connected.
//
// NOTE(review): Connect subscribes only once; confirm whether the
// subscription survives an automatic reconnect or has to be renewed here.
func (mqtt *MQTTReceiver) mqttConnected(client MQTT.Client) {
	broker := mqtt.remote
	log.Printf("MQTT connected: %s", broker)
	mqtt.connected = true
}
// parseLocationJson decodes a JSON object into a Location.
//
// The keys "lon", "lat", "alt", "acc", "vel" and "tst" are read when present;
// a missing key or a JSON null leaves the corresponding field at its zero
// value. All other keys are ignored. A former check that "_type" equals
// "location" is no longer applied.
//
// It returns an error if buf is not valid JSON for an object or if one of the
// keys above holds a non-numeric value. On error the returned Location is the
// zero value.
func parseLocationJson(buf string) (Location, error) {
	var dat map[string]interface{}
	if err := json.Unmarshal([]byte(buf), &dat); err != nil {
		return Location{}, err
	}

	// number fetches a numeric field. Absent and null fields yield 0, which
	// equals the zero value the field would keep anyway. The checked type
	// assertion turns a wrongly typed value into an error instead of a panic.
	number := func(key string) (float64, error) {
		raw, present := dat[key]
		if !present || raw == nil {
			return 0, nil
		}
		value, isNumber := raw.(float64)
		if !isNumber {
			return 0, fmt.Errorf("field %q: expected a number, got %T", key, raw)
		}
		return value, nil
	}

	var loc Location
	floatFields := []struct {
		key string
		dst *float32
	}{
		{"lon", &loc.Lon},
		{"lat", &loc.Lat},
		{"alt", &loc.Alt},
		{"acc", &loc.Acc},
		{"vel", &loc.Velocity},
	}
	for _, field := range floatFields {
		value, err := number(field.key)
		if err != nil {
			return Location{}, err
		}
		*field.dst = float32(value)
	}

	tst, err := number("tst")
	if err != nil {
		return Location{}, err
	}
	loc.Timestamp = int64(tst)

	return loc, nil
}