From b551c87b9671d3dc2c5a4b807f9b252cc7fc38b0 Mon Sep 17 00:00:00 2001 From: Marin Ivanov Date: Thu, 4 Sep 2025 10:23:46 +0300 Subject: Device metrics command and telemetry fix * feat: execute command to retrieve device metrics * bugfix: add requestid to (telemetry) reply data --- config.go | 5 +-- config.json.example | 10 ++++-- main.go | 3 +- node.go | 91 +++++++++++++++++------------------------------------ noderxloop.go | 2 +- 5 files changed, 43 insertions(+), 68 deletions(-) diff --git a/config.go b/config.go index 4698998..fffefdd 100644 --- a/config.go +++ b/config.go @@ -24,8 +24,9 @@ type Config struct { Altitude float64 `json:"altitude"` } `json:"position"` DeviceMetrics struct { - BroadcastInterval float64 `json:"broadcastInterval"` - UPSAddress string `json:"upsAddress"` + BroadcastInterval float64 `json:"broadcastInterval"` + Command string `json:"command"` + CommandArgs []string `json:"commandArgs"` } `json:"deviceMetrics"` KavaModem struct { Address string `json:"address"` diff --git a/config.json.example b/config.json.example index 14ab023..ce7d209 100644 --- a/config.json.example +++ b/config.json.example @@ -13,8 +13,14 @@ "altitude": 2.0 }, "deviceMetrics": { - "broadcastInterval": 300, - "upsAddress": "upsname@localhost" + "broadcastInterval": 0, + "command": "jq", + "commandArgs": [ + "-n", + "--arg", "battery_level", "100", + "--arg", "voltage", "13.37", + ".battery_level=($battery_level|tonumber) | .voltage=($voltage|tonumber)" + ] }, "channels": [ { diff --git a/main.go b/main.go index 710f195..04755ed 100644 --- a/main.go +++ b/main.go @@ -75,7 +75,8 @@ func main() { PositionAltitude: int32(cfg.Position.Altitude), // Device Metrics DeviceMetricsBroadcastInterval: time.Duration(cfg.DeviceMetrics.BroadcastInterval * float64(time.Second)), - DeviceMetricsUPSAddress: cfg.DeviceMetrics.UPSAddress, + DeviceMetricsCommand: cfg.DeviceMetrics.Command, + DeviceMetricsCommandArgs: cfg.DeviceMetrics.CommandArgs, TCPListenAddr: "0.0.0.0:4403", } if len(cfg.X25519Key) == 32 { diff --git a/node.go b/node.go index 0481396..99c5b38 100644 --- a/node.go +++ b/node.go @@ -1,18 +1,17 @@ package main import ( - "bufio" "bytes" "context" "crypto/aes" "crypto/rand" "crypto/sha256" "encoding/binary" + "encoding/json" "errors" "fmt" "io" "os/exec" - "strconv" "sync" "time" @@ -81,7 +80,8 @@ type NodeConfig struct { TCPListenAddr string DeviceMetricsBroadcastInterval time.Duration - DeviceMetricsUPSAddress string + DeviceMetricsCommand string + DeviceMetricsCommandArgs []string X25519SecretKey []byte X25519PublicKey []byte } @@ -338,19 +338,17 @@ func (n *Node) nextPacketID() uint32 { return n.packetID } -func (n *Node) txPortnumMessage(channelID uint32, to uint32, portNum pb.PortNum, message proto.Message) error { - payload, err := proto.Marshal(message) - if err != nil { +func (n *Node) txDataMessage(channelID uint32, to uint32, dataIn *pb.Data, message proto.Message) error { + data := proto.Clone(dataIn).(*pb.Data) + var err error + if data.Payload, err = proto.Marshal(message); err != nil { return err } return n.txPacket(&pb.MeshPacket{ - From: n.nodeID, - To: to, - Channel: channelID, - PayloadVariant: &pb.MeshPacket_Decoded{Decoded: &pb.Data{ - Portnum: portNum, - Payload: payload, - }}, + From: n.nodeID, + To: to, + Channel: channelID, + PayloadVariant: &pb.MeshPacket_Decoded{Decoded: data}, }) } @@ -477,7 +475,7 @@ func (n *Node) txPacket(p *pb.MeshPacket) error { func (n *Node) broadcastNodeInfo(ctx context.Context) error { n.logger.Info("(( NodeInfo") - return n.txPortnumMessage(0, broadcastID, pb.PortNum_NODEINFO_APP, n.user) + return n.txDataMessage(0, broadcastID, &pb.Data{Portnum: pb.PortNum_NODEINFO_APP}, n.user) } func (n *Node) getPosition() *pb.Position { @@ -497,59 +495,30 @@ func (n *Node) broadcastPosition(ctx context.Context) error { nodeInfo.Position = position return true }) - return n.txPortnumMessage(0, broadcastID, pb.PortNum_POSITION_APP, position) + return n.txDataMessage(0, broadcastID, &pb.Data{Portnum: pb.PortNum_POSITION_APP}, position) } func (n *Node) getDeviceMetrics() (*pb.DeviceMetrics, error) { + metrics := &pb.DeviceMetrics{ + BatteryLevel: Ptr[uint32](101), + UptimeSeconds: Ptr(uint32(time.Since(n.start).Seconds())), + } + if n.cfg.DeviceMetricsCommand == "" { + return metrics, nil + } cmdStdout := bytes.NewBuffer(nil) - cmd := exec.Command("upsc", n.cfg.DeviceMetricsUPSAddress) + cmd := exec.Command(n.cfg.DeviceMetricsCommand, n.cfg.DeviceMetricsCommandArgs...) cmd.Stdout = cmdStdout - err := cmd.Run() - if err != nil { - n.logger.Error("failed to get UPS data", "err", err) + if err := cmd.Run(); err != nil { + n.logger.Error("failed to get execute DeviceMetrics command", "err", err) return nil, err } stdoutBytes := cmdStdout.Bytes() - stdout := bufio.NewReader(bytes.NewReader(stdoutBytes)) - var batteryLevel uint32 - var voltage float32 - var channelUtilization float32 - var airUtilTx float32 - for { - line, _, err := stdout.ReadLine() - if errors.Is(err, io.EOF) { - break - } - lineValue := bytes.SplitN(line, []byte(": "), 2) - if len(lineValue) != 2 { - continue - } - if bytes.Equal(lineValue[0], []byte("battery.voltage")) { - v, err := strconv.ParseFloat(string(lineValue[1]), 64) - if err != nil { - continue - } - voltage = float32(v) - } - if bytes.Equal(lineValue[0], []byte("battery.charge")) { - charge, err := strconv.ParseUint(string(lineValue[1]), 10, 64) - if err != nil { - continue - } - batteryLevel = uint32(charge) - } - } - if batteryLevel == 100 && voltage > 0 { - batteryLevel = 101 + if err := json.Unmarshal(stdoutBytes, metrics); err != nil { + n.logger.Error("failed to get decode DeviceMetrics json", "err", err) + return nil, err } - uptime := uint32(time.Since(n.start).Seconds()) - return &pb.DeviceMetrics{ - BatteryLevel: &batteryLevel, - Voltage: &voltage, - ChannelUtilization: &channelUtilization, - AirUtilTx: &airUtilTx, - UptimeSeconds: &uptime, - }, nil + return metrics, nil } func (n *Node) broadcastDeviceMetrics(ctx context.Context) error { n.logger.Info("(( DeviceMetrics") @@ -561,10 +530,8 @@ func (n *Node) broadcastDeviceMetrics(ctx context.Context) error { nodeInfo.DeviceMetrics = deviceMetrics return true }) - return n.txPortnumMessage(0, broadcastID, pb.PortNum_TELEMETRY_APP, &pb.Telemetry{ - Variant: &pb.Telemetry_DeviceMetrics{ - DeviceMetrics: deviceMetrics, - }, + return n.txDataMessage(0, broadcastID, &pb.Data{Portnum: pb.PortNum_TELEMETRY_APP}, &pb.Telemetry{ + Variant: &pb.Telemetry_DeviceMetrics{DeviceMetrics: deviceMetrics}, }) } diff --git a/noderxloop.go b/noderxloop.go index d7e275e..36b1250 100644 --- a/noderxloop.go +++ b/noderxloop.go @@ -269,7 +269,7 @@ func (n *Node) rxMessage(ts uint64, p *pb.MeshPacket) error { } } if reply != nil { - err := n.txPortnumMessage(channel.ChannelNum, source, replyPortNum, reply) + err := n.txDataMessage(channel.ChannelNum, source, &pb.Data{Portnum: replyPortNum, RequestId: p.Id}, reply) if err != nil { n.logger.Error("Failed to reply", "err", err, "reply", reply) } else { -- cgit v1.2.3