Refacor Node

Refactor the Node structure to allow multiple measurements.
This commit is contained in:
Felix Niederwanger 2023-02-07 16:56:22 +01:00
parent 8226f622fa
commit 0ca9ee0dba
Signed by: phoenix
GPG key ID: 31860289A704FB3C

View file

@ -28,12 +28,23 @@ type Config struct {
Verbose bool
}
type Node struct {
ID int `json:"id"` // NodeID
NodeName string `json:"name"` // Name of the measurement node
Measurements map[string]Measurement `json:"measurements"` // Stored measurements of this node
}
func (n *Node) setMeasurement(mst Measurement) {
n.Measurements[mst.Name] = mst
}
type Measurement struct {
Node int `json:"id"` // NodeID
NodeName string `json:"name"` // Name of the measurement node
Timestamp int64 `json:"tsp"` // Timestamp when the measurement was taken
Measurement string `json:"measurement"` // Measurement name
Values map[string]float32 `json:"values"` // Measurement values
Node int `json:"id"` // NodeID
NodeName string `json:"name"` // Name of the measurement node
Timestamp int64 `json:"tsp"` // Timestamp when the measurement was taken
Name string `json:"measurement"` // Measurement name
Values map[string]float32 `json:"values"` // Measurement values
Tags map[string]string `json:"tags"` // Optional tags for this measurement
}
func (mst *Measurement) json() string {
@ -89,14 +100,14 @@ func (c *Config) loadIni(filename string) error {
var config Config
var influx InfluxDB
var nodes map[int]Measurement
var nodes map[int]Node
func receiveMeasurement(msg mqtt.Message) (Measurement, error) {
var mst Measurement
payload := msg.Payload()
mst.Timestamp = time.Now().Unix()
mst.Measurement = "meteo"
mst.Name = "meteo"
// Parse node ID from topic, if present
topic := msg.Topic()
@ -138,30 +149,49 @@ func receiveMeasurement(msg mqtt.Message) (Measurement, error) {
return mst, nil
}
// Callback when a mqtt message has been received
func received(msg mqtt.Message) {
mst, err := receiveMeasurement(msg)
if err != nil {
fmt.Fprintf(os.Stderr, "error receiving measurement: %s\n", err)
return
}
nodes[mst.Node] = mst
err = processMeasurement(mst)
if err != nil {
fmt.Fprintf(os.Stderr, "%s\n", err)
return
}
if config.Verbose {
fmt.Println(mst.json())
}
}
func processMeasurement(mst Measurement) error {
// Update node
if node, ok := nodes[mst.Node]; ok {
node.setMeasurement(mst)
nodes[mst.Node] = node
} else {
// Create new node
node := Node{ID: mst.Node, NodeName: mst.NodeName}
node.setMeasurement(mst)
nodes[mst.Node] = node
}
tags := map[string]string{"node": fmt.Sprintf("%d", mst.Node)}
// Write measurement as a whole
// Write each value as own measurement
for k, v := range mst.Tags {
tags[k] = v
}
// Write the whole measurement at once
fields := make(map[string]interface{}, 0)
for k, v := range mst.Values {
fields[k] = v
}
if err := influx.Write(config.InfluxOrg, config.InfluxBucket, mst.Measurement, tags, fields); err != nil {
fmt.Fprintf(os.Stderr, "cannot write to influxdb: %s\n", err)
return
}
if config.Verbose {
fmt.Println(mst.json())
if err := influx.Write(config.InfluxOrg, config.InfluxBucket, mst.Name, tags, fields); err != nil {
return fmt.Errorf("cannot write to influxdb: %s", err)
}
return nil
}
// awaits SIGINT or SIGTERM
@ -204,7 +234,7 @@ func fileExists(filename string) bool {
func main() {
var err error
nodes = make(map[int]Measurement)
nodes = make(map[int]Node)
// Default settings
config.MqttHost = "127.0.0.1"
@ -339,7 +369,7 @@ func main() {
http.HandleFunc("/api", handlerAPIDefault)
http.HandleFunc("/api/v1/health", handlerHealth)
http.HandleFunc("/api/v1/nodes", handlerNodes)
http.HandleFunc("/api/v1/measurements", handlerMeasurements)
//http.HandleFunc("/api/v1/$node/measurements", handlerMeasurements)
// Listen on http server
go func() {
@ -381,24 +411,15 @@ func handlerAPIDefault(w http.ResponseWriter, r *http.Request) {
w.Write(content)
}
func handlerMeasurements(w http.ResponseWriter, r *http.Request) {
// Post array of current measurements
mst := make([]Measurement, 0)
for _, m := range nodes {
mst = append(mst, m)
}
replyJson(w, mst)
}
func handlerNodes(w http.ResponseWriter, r *http.Request) {
// Post array of current nodes
// Post array of current nodes without measurements
type Node struct {
ID int `json:"id"`
Name string `json:"name"`
}
ret := make([]Node, 0)
for _, m := range nodes {
ret = append(ret, Node{ID: m.Node, Name: m.NodeName})
ret = append(ret, Node{ID: m.ID, Name: m.NodeName})
}
replyJson(w, ret)
}