summaryrefslogtreecommitdiff
path: root/noderxloop.go
diff options
context:
space:
mode:
authorMarin Ivanov <[email protected]>2025-09-03 20:59:24 +0300
committerMarin Ivanov <[email protected]>2025-09-03 20:59:24 +0300
commit220671ed46e10e4c67741286da493df1d45cdcfc (patch)
tree0711f179f4619959fd937c2929c6247ed8ab124a /noderxloop.go
Initial release
Diffstat (limited to 'noderxloop.go')
-rw-r--r--noderxloop.go280
1 files changed, 280 insertions, 0 deletions
diff --git a/noderxloop.go b/noderxloop.go
new file mode 100644
index 0000000..f4ed602
--- /dev/null
+++ b/noderxloop.go
@@ -0,0 +1,280 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "encoding/hex"
+ "fmt"
+
+ "github.com/charmbracelet/log"
+ "github.com/meshnet-gophers/meshtastic-go"
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ "google.golang.org/protobuf/proto"
+)
+
+func (n *Node) RxLoop(ctx context.Context) error {
+ for {
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ p, ts, err := n.mesh.ReadMeshPacket()
+ if err != nil {
+ n.logger.Error("Read mesh packet failed", "err", err)
+ return err
+ }
+ if err := n.rxMessage(ts, p); err != nil {
+ n.logger.Error("Processing Radio Message", "error", err)
+ }
+ }
+}
+
+func (n *Node) rxMessage(ts uint64, p *pb.MeshPacket) error {
+ pt := PacketTruncated{To: p.To, From: p.From, ID: p.Id}
+ info := fmt.Sprintf("ts:%d rssi:%-4d snr:%-5.1f", p.RxTime, p.RxRssi, p.RxSnr)
+ header := fmt.Sprintf("from:%08x to:%08x id:%08x ch:%02x next:%02x last:%02x hs:%d hl:%d mq:%d wack:%d", p.From, p.To, p.Id, uint8(p.Channel), p.NextHop, p.RelayNode, p.HopStart, p.HopLimit, bool2Int(p.ViaMqtt), bool2Int(p.WantAck))
+ payloadstr := fmt.Sprintf("%x", p.GetEncrypted())
+ var dupe bool
+ if _, dupe = n.pktLru.Get(pt); dupe {
+ n.logger.Debug(") MeshPacket", "info", info, "header", header, "payload", payloadstr)
+ } else {
+ n.logger.Debug(")) MeshPacket", "info", info, "header", header, "payload", payloadstr)
+ n.pktLru.Add(pt, struct{}{})
+ }
+ if dupe { // XXX: it would need to be changed for the traceroute
+ return nil // skip duplicates
+ }
+ if p.From == n.nodeID {
+ return nil // ourself
+ }
+ if len(p.GetEncrypted()) == 0 {
+ n.logger.Warn(":: Skipping unencrypted or empty packet")
+ return nil // we skip unencrypted packets
+ }
+ var plaintext []byte
+ var pubKey []byte
+ var channel *pb.ChannelSettings
+ if p.To == n.nodeID && p.Channel == 0x00 {
+ var err error
+ if plaintext, pubKey, err = n.tryDecryptPKC(p); err != nil {
+ log.Printf("failed to decrypt PKC message: %s", err)
+ }
+ }
+ if plaintext != nil {
+ channel = pkcChan
+ p.Channel = 0
+ } else {
+ plaintext, channel = n.tryDecryptPSK(p)
+ if plaintext == nil {
+ return nil // failed decrypt
+ }
+ p.Channel = channel.ChannelNum
+ }
+ data := &pb.Data{}
+ if err := proto.Unmarshal(plaintext, data); err != nil {
+ n.logger.Debug(")) MeshPacket.Data", "channel", channel.Name, "mqtt", p.ViaMqtt, "raw", hex.EncodeToString(plaintext))
+ return fmt.Errorf("unmarshalling decrypted data: %w", err)
+ }
+
+ p.PublicKey = pubKey
+ p.PkiEncrypted = channel == pkcChan
+ p.PayloadVariant = &pb.MeshPacket_Decoded{Decoded: data}
+ msgFromRadio := &pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: p}}
+ source := data.Source
+ if source == 0 {
+ source = p.From
+ }
+ dest := data.Dest
+ if dest == 0 {
+ dest = p.To
+ }
+ toUs := dest == n.nodeID
+ if dest == broadcastID || toUs {
+ n.dispatchMessageToAdmin(msgFromRadio)
+ }
+ n.db.StoreFromRadio(int64(ts), msgFromRadio)
+ n.logger.Debug(")) MeshPacket.Data", "channel", channel.Name, "mqtt", p.ViaMqtt, "raw", hex.EncodeToString(plaintext), "data", data)
+
+ src := n.NodeInfo(source, func(ni *pb.NodeInfo) bool {
+ if ni.Num == n.nodeID {
+ return false // it's us
+ }
+ if p.HopLimit > p.HopStart {
+ return false // weird hoplimit value
+ }
+ hopsAway := uint32(p.HopStart - p.HopLimit)
+ if ni.HopsAway != nil && *ni.HopsAway <= hopsAway {
+ return false // skip update if these hops away are longer
+ }
+ ni.HopsAway = &hopsAway
+ return true
+ })
+ dst := n.NodeInfo(dest, nil)
+
+ var payload any
+ var reply proto.Message
+ replyPortNum := data.Portnum
+ switch data.Portnum {
+ case pb.PortNum_ADMIN_APP:
+ admin := &pb.AdminMessage{}
+ if err := proto.Unmarshal(data.Payload, admin); err != nil {
+ return fmt.Errorf("unmarshalling admin message: %w", err)
+ }
+ payload = admin
+ switch adminPayload := admin.PayloadVariant.(type) {
+ case *pb.AdminMessage_GetDeviceMetadataRequest:
+ if data.WantResponse && toUs {
+ reply = &pb.AdminMessage{
+ PayloadVariant: &pb.AdminMessage_GetDeviceMetadataResponse{
+ GetDeviceMetadataResponse: n.getDeviceMetadata(),
+ },
+ }
+ }
+ default:
+ n.logger.Warnf("unhandled admin message type: %T", adminPayload)
+ }
+ case pb.PortNum_NODEINFO_APP:
+ user := &pb.User{}
+ if err := proto.Unmarshal(data.Payload, user); err != nil {
+ return fmt.Errorf("unmarshalling user: %w", err)
+ }
+ payload = user
+ n.logger.Info(")) NodeInfo", "user", user)
+ n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool {
+ if nodeInfo.User != nil {
+ if len(nodeInfo.User.PublicKey) > 0 && !bytes.Equal(nodeInfo.User.PublicKey, user.PublicKey) {
+ n.logger.Error("NodeInfo w/ different key", "nodeInfo", nodeInfo, "new", user)
+ return false // do not update
+ }
+ }
+ nodeInfo.User = user
+ return true
+ })
+ if data.WantResponse && toUs {
+ reply = n.user
+ }
+ case pb.PortNum_TEXT_MESSAGE_APP:
+ n.logger.Info(")) TextMessage", "channel", channel.Name, "source", meshtastic.NodeID(source).String(), "text", string(data.Payload))
+ case pb.PortNum_ROUTING_APP:
+ routing := &pb.Routing{}
+ if err := proto.Unmarshal(data.Payload, routing); err != nil {
+ return fmt.Errorf("unmarshalling routingPayload: %w", err)
+ }
+ payload = routing
+ n.logger.Info(")) Routing", "routing", routing)
+ case pb.PortNum_TRACEROUTE_APP:
+ routeDiscovery := &pb.RouteDiscovery{}
+ if err := proto.Unmarshal(data.Payload, routeDiscovery); err != nil {
+ return fmt.Errorf("unmarshalling traceroute payload: %w", err)
+ }
+ payload = routeDiscovery
+ n.logger.Info(")) RouteDiscovery", "channel", channel.Name, "source", meshtastic.NodeID(source).String(), "payload", routeDiscovery)
+ if !toUs {
+ break
+ }
+ towardsDestination := data.ReplyId == 0
+ var route *[]uint32
+ var snr *[]int32
+ if towardsDestination {
+ route = &routeDiscovery.Route
+ snr = &routeDiscovery.SnrTowards
+ } else {
+ route = &routeDiscovery.RouteBack
+ snr = &routeDiscovery.SnrBack
+ }
+ // Only insert unknown hops if hop_start is valid
+ if p.HopStart != 0 && p.HopLimit <= p.HopStart {
+ var hopsTaken uint8 = uint8(p.HopStart - p.HopLimit)
+ diff := int(hopsTaken) - len(*route)
+ for i := 0; i < diff; i++ {
+ if len(*route) < ROUTE_MAX_SIZE {
+ *route = append(*route, broadcastID)
+ }
+ }
+ // Add unknown SNR values if necessary
+ diff = len(*route) - len(*snr)
+ for i := 0; i < diff; i++ {
+ if len(*snr) < ROUTE_MAX_SIZE {
+ *snr = append(*snr, 0xFF)
+ }
+ }
+ }
+
+ if len(*snr) < ROUTE_MAX_SIZE {
+ *snr = append(*snr, int32(p.RxSnr*4))
+ }
+ // We don't need to add route, if we are the dest
+ // // Length of route array can normally not be exceeded due to the max. hop_limit of 7
+ // if *route_count < ROUTE_SIZE {
+ // route[*route_count] = myNodeInfo.my_node_num
+ // *route_count += 1
+ // } else {
+ // LOG_WARN("Route exceeded maximum hop limit!") // Are you bridging networks?
+ // }
+ reply = routeDiscovery
+ case pb.PortNum_POSITION_APP:
+ position := &pb.Position{}
+ if err := proto.Unmarshal(data.Payload, position); err != nil {
+ return fmt.Errorf("unmarshalling positionPayload: %w", err)
+ }
+ payload = position
+ n.logger.Info(")) Position", "position", position)
+ if data.WantResponse && toUs {
+ reply = n.getPosition()
+ } else {
+ n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool {
+ nodeInfo.Position = position
+ return true
+ })
+ }
+ case pb.PortNum_TELEMETRY_APP:
+ telemetry := &pb.Telemetry{}
+ if err := proto.Unmarshal(data.Payload, telemetry); err != nil {
+ return fmt.Errorf("unmarshalling telemetryPayload: %w", err)
+ }
+ payload = telemetry
+ n.logger.Info(")) Telemetry deviceMetrics", "telemetry", telemetry)
+ if data.WantResponse && toUs {
+ metrics, err := n.getDeviceMetrics()
+ if err != nil {
+ break
+ }
+ reply = metrics
+ } else {
+ deviceMetrics := telemetry.GetDeviceMetrics()
+ if deviceMetrics == nil {
+ break
+ }
+ n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool {
+ nodeInfo.DeviceMetrics = deviceMetrics
+ return true
+ })
+ }
+ default:
+ n.logger.Debug(")) !Unhandled app payload", "data", data)
+ }
+ n.dl.LogPacket(p, channel, src, dst, data, payload)
+ if toUs && p.WantAck {
+ err := n.txPacket(buildRoutingReply(&pb.MeshPacket{
+ From: p.From,
+ To: p.To,
+ HopStart: uint32(p.HopStart),
+ Channel: channel.ChannelNum,
+ Id: p.Id,
+ PkiEncrypted: msgFromRadio.GetPacket().PkiEncrypted,
+ PublicKey: msgFromRadio.GetPacket().PublicKey,
+ }, pb.Routing_NONE))
+ if err != nil {
+ n.logger.Warn("failed to send text ack", "err", err)
+ }
+ }
+ if reply != nil {
+ err := n.txPortnumMessage(channel.ChannelNum, source, replyPortNum, reply)
+ if err != nil {
+ n.logger.Error("Failed to reply", "err", err, "reply", reply)
+ } else {
+ n.dl.LogPacket(p, channel, dst, src, &pb.Data{Portnum: replyPortNum}, payload)
+ }
+ }
+
+ return nil
+}