161 lines
3.8 KiB
Go
161 lines
3.8 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
MQTT "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
type MQTTCallback func(id string, loc Location)
|
|
|
|
type MQTTReceiver struct {
|
|
mqtt MQTT.Client
|
|
remote string
|
|
topic string
|
|
connected bool
|
|
Received MQTTCallback
|
|
}
|
|
|
|
func (mqtt *MQTTReceiver) Connect(remote string, topic string, username string, password string, clientid string) error {
|
|
// Ensure topics ends with '#'
|
|
if !strings.HasSuffix(topic, "#") {
|
|
if !strings.HasSuffix(topic, "/") {
|
|
topic += "/#"
|
|
} else {
|
|
topic += "#"
|
|
}
|
|
}
|
|
|
|
// Add default port to remote if not existing
|
|
if !strings.Contains(remote, ":") {
|
|
remote += ":1883"
|
|
}
|
|
|
|
mqtt.remote = remote
|
|
mqtt.topic = topic
|
|
opts := MQTT.NewClientOptions().AddBroker("tcp://" + remote)
|
|
if username != "" {
|
|
opts.SetUsername(username)
|
|
}
|
|
if password != "" {
|
|
opts.SetPassword(password)
|
|
}
|
|
if clientid != "" {
|
|
opts.SetClientID(clientid)
|
|
}
|
|
opts.SetKeepAlive(20 * time.Second)
|
|
opts.SetConnectionLostHandler(mqtt.mqttConnectionLost)
|
|
opts.SetOnConnectHandler(mqtt.mqttConnected)
|
|
opts.SetAutoReconnect(true)
|
|
c := MQTT.NewClient(opts)
|
|
mqtt.mqtt = c
|
|
if token := c.Connect(); token.Wait() && token.Error() != nil {
|
|
return token.Error()
|
|
} else {
|
|
if token := c.Subscribe(topic, 0, mqtt.mqttReceive); token.Wait() && token.Error() != nil {
|
|
log.Println(fmt.Sprintf("Error subscribing listener %s - %s", remote, token.Error()))
|
|
} else {
|
|
log.Printf("MQTT %s subscribed to topic '%s'", remote, topic)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (mqtt *MQTTReceiver) mqttReceive(client MQTT.Client, msg MQTT.Message) {
|
|
payload := string(msg.Payload())
|
|
topic := msg.Topic()
|
|
if strings.HasPrefix(topic, "owntracks/") {
|
|
// Topic: owntracks/USER/DEVICE
|
|
// Split topic to user and device
|
|
topic = topic[10:]
|
|
i := strings.Index(topic, "/")
|
|
if i <= 0 {
|
|
log.Printf("Illegal owntracks topic received\n")
|
|
return
|
|
}
|
|
username := topic[:i]
|
|
devicename := topic[i+1:]
|
|
|
|
i = strings.Index(devicename, "/")
|
|
if i > 0 { // Ignore special device topics (e.g. 'cmd' or 'events')
|
|
//fmt.Printf("Ignoring topic %s \n", topic)
|
|
return
|
|
}
|
|
identifier := fmt.Sprintf("%s/%s", username, devicename)
|
|
loc, err := parseLocationJson(payload)
|
|
if err != nil {
|
|
log.Printf("Error processing JSON (MQTT) : %s", err)
|
|
return
|
|
}
|
|
if mqtt.Received != nil {
|
|
mqtt.Received(identifier, loc)
|
|
}
|
|
}
|
|
// Ignore invalid topic.
|
|
}
|
|
|
|
func (mqtt *MQTTReceiver) mqttConnectionLost(client MQTT.Client, err error) {
|
|
mqtt.connected = false
|
|
options := client.OptionsReader()
|
|
remotes := options.Servers()
|
|
remote := ""
|
|
if len(remotes) > 0 {
|
|
// TODO: What to do if there are more?
|
|
remote = remotes[0].String()
|
|
}
|
|
for {
|
|
fmt.Fprintf(os.Stderr, "Reconnecting to %s after failure: %s\n", remote, err)
|
|
token := client.Connect()
|
|
if token.Wait() {
|
|
break
|
|
}
|
|
time.Sleep(2 * time.Second)
|
|
}
|
|
log.Printf("MQTT connection %s established", remote)
|
|
}
|
|
|
|
func (mqtt *MQTTReceiver) mqttConnected(client MQTT.Client) {
|
|
log.Printf("MQTT connected: %s", mqtt.remote)
|
|
mqtt.connected = true
|
|
}
|
|
|
|
/** Parse json as location */
|
|
func parseLocationJson(buf string) (Location, error) {
|
|
var dat map[string]interface{}
|
|
var loc Location
|
|
|
|
if err := json.Unmarshal([]byte(buf), &dat); err != nil {
|
|
return loc, err
|
|
}
|
|
// Get contents from json message
|
|
/* FORMER check to ensure we get "location" types
|
|
if dat["_type"].(string) != "location" {
|
|
return loc, errors.New("Illegal json type")
|
|
} else {
|
|
*/
|
|
if dat["lon"] != nil {
|
|
loc.Lon = float32(dat["lon"].(float64))
|
|
}
|
|
if dat["lat"] != nil {
|
|
loc.Lat = float32(dat["lat"].(float64))
|
|
}
|
|
if dat["alt"] != nil {
|
|
loc.Alt = float32(dat["alt"].(float64))
|
|
}
|
|
if dat["acc"] != nil {
|
|
loc.Acc = float32(dat["acc"].(float64))
|
|
}
|
|
if dat["tst"] != nil {
|
|
loc.Timestamp = int64(dat["tst"].(float64))
|
|
}
|
|
if dat["vel"] != nil {
|
|
loc.Velocity = float32(dat["vel"].(float64))
|
|
}
|
|
return loc, nil
|
|
}
|