diff options
Diffstat (limited to 'noderxloop.go')
| -rw-r--r-- | noderxloop.go | 280 |
1 files changed, 280 insertions, 0 deletions
diff --git a/noderxloop.go b/noderxloop.go new file mode 100644 index 0000000..f4ed602 --- /dev/null +++ b/noderxloop.go @@ -0,0 +1,280 @@ +package main + +import ( + "bytes" + "context" + "encoding/hex" + "fmt" + + "github.com/charmbracelet/log" + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "google.golang.org/protobuf/proto" +) + +func (n *Node) RxLoop(ctx context.Context) error { + for { + if err := ctx.Err(); err != nil { + return err + } + p, ts, err := n.mesh.ReadMeshPacket() + if err != nil { + n.logger.Error("Read mesh packet failed", "err", err) + return err + } + if err := n.rxMessage(ts, p); err != nil { + n.logger.Error("Processing Radio Message", "error", err) + } + } +} + +func (n *Node) rxMessage(ts uint64, p *pb.MeshPacket) error { + pt := PacketTruncated{To: p.To, From: p.From, ID: p.Id} + info := fmt.Sprintf("ts:%d rssi:%-4d snr:%-5.1f", p.RxTime, p.RxRssi, p.RxSnr) + header := fmt.Sprintf("from:%08x to:%08x id:%08x ch:%02x next:%02x last:%02x hs:%d hl:%d mq:%d wack:%d", p.From, p.To, p.Id, uint8(p.Channel), p.NextHop, p.RelayNode, p.HopStart, p.HopLimit, bool2Int(p.ViaMqtt), bool2Int(p.WantAck)) + payloadstr := fmt.Sprintf("%x", p.GetEncrypted()) + var dupe bool + if _, dupe = n.pktLru.Get(pt); dupe { + n.logger.Debug(") MeshPacket", "info", info, "header", header, "payload", payloadstr) + } else { + n.logger.Debug(")) MeshPacket", "info", info, "header", header, "payload", payloadstr) + n.pktLru.Add(pt, struct{}{}) + } + if dupe { // XXX: it would need to be changed for the traceroute + return nil // skip duplicates + } + if p.From == n.nodeID { + return nil // ourself + } + if len(p.GetEncrypted()) == 0 { + n.logger.Warn(":: Skipping unencrypted or empty packet") + return nil // we skip unencrypted packets + } + var plaintext []byte + var pubKey []byte + var channel *pb.ChannelSettings + if p.To == n.nodeID && p.Channel == 0x00 { + var err error + if plaintext, pubKey, err = n.tryDecryptPKC(p); err != nil { + log.Printf("failed to decrypt PKC message: %s", err) + } + } + if plaintext != nil { + channel = pkcChan + p.Channel = 0 + } else { + plaintext, channel = n.tryDecryptPSK(p) + if plaintext == nil { + return nil // failed decrypt + } + p.Channel = channel.ChannelNum + } + data := &pb.Data{} + if err := proto.Unmarshal(plaintext, data); err != nil { + n.logger.Debug(")) MeshPacket.Data", "channel", channel.Name, "mqtt", p.ViaMqtt, "raw", hex.EncodeToString(plaintext)) + return fmt.Errorf("unmarshalling decrypted data: %w", err) + } + + p.PublicKey = pubKey + p.PkiEncrypted = channel == pkcChan + p.PayloadVariant = &pb.MeshPacket_Decoded{Decoded: data} + msgFromRadio := &pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: p}} + source := data.Source + if source == 0 { + source = p.From + } + dest := data.Dest + if dest == 0 { + dest = p.To + } + toUs := dest == n.nodeID + if dest == broadcastID || toUs { + n.dispatchMessageToAdmin(msgFromRadio) + } + n.db.StoreFromRadio(int64(ts), msgFromRadio) + n.logger.Debug(")) MeshPacket.Data", "channel", channel.Name, "mqtt", p.ViaMqtt, "raw", hex.EncodeToString(plaintext), "data", data) + + src := n.NodeInfo(source, func(ni *pb.NodeInfo) bool { + if ni.Num == n.nodeID { + return false // it's us + } + if p.HopLimit > p.HopStart { + return false // weird hoplimit value + } + hopsAway := uint32(p.HopStart - p.HopLimit) + if ni.HopsAway != nil && *ni.HopsAway <= hopsAway { + return false // skip update if these hops away are longer + } + ni.HopsAway = &hopsAway + return true + }) + dst := n.NodeInfo(dest, nil) + + var payload any + var reply proto.Message + replyPortNum := data.Portnum + switch data.Portnum { + case pb.PortNum_ADMIN_APP: + admin := &pb.AdminMessage{} + if err := proto.Unmarshal(data.Payload, admin); err != nil { + return fmt.Errorf("unmarshalling admin message: %w", err) + } + payload = admin + switch adminPayload := admin.PayloadVariant.(type) { + case *pb.AdminMessage_GetDeviceMetadataRequest: + if data.WantResponse && toUs { + reply = &pb.AdminMessage{ + PayloadVariant: &pb.AdminMessage_GetDeviceMetadataResponse{ + GetDeviceMetadataResponse: n.getDeviceMetadata(), + }, + } + } + default: + n.logger.Warnf("unhandled admin message type: %T", adminPayload) + } + case pb.PortNum_NODEINFO_APP: + user := &pb.User{} + if err := proto.Unmarshal(data.Payload, user); err != nil { + return fmt.Errorf("unmarshalling user: %w", err) + } + payload = user + n.logger.Info(")) NodeInfo", "user", user) + n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool { + if nodeInfo.User != nil { + if len(nodeInfo.User.PublicKey) > 0 && !bytes.Equal(nodeInfo.User.PublicKey, user.PublicKey) { + n.logger.Error("NodeInfo w/ different key", "nodeInfo", nodeInfo, "new", user) + return false // do not update + } + } + nodeInfo.User = user + return true + }) + if data.WantResponse && toUs { + reply = n.user + } + case pb.PortNum_TEXT_MESSAGE_APP: + n.logger.Info(")) TextMessage", "channel", channel.Name, "source", meshtastic.NodeID(source).String(), "text", string(data.Payload)) + case pb.PortNum_ROUTING_APP: + routing := &pb.Routing{} + if err := proto.Unmarshal(data.Payload, routing); err != nil { + return fmt.Errorf("unmarshalling routingPayload: %w", err) + } + payload = routing + n.logger.Info(")) Routing", "routing", routing) + case pb.PortNum_TRACEROUTE_APP: + routeDiscovery := &pb.RouteDiscovery{} + if err := proto.Unmarshal(data.Payload, routeDiscovery); err != nil { + return fmt.Errorf("unmarshalling traceroute payload: %w", err) + } + payload = routeDiscovery + n.logger.Info(")) RouteDiscovery", "channel", channel.Name, "source", meshtastic.NodeID(source).String(), "payload", routeDiscovery) + if !toUs { + break + } + towardsDestination := data.ReplyId == 0 + var route *[]uint32 + var snr *[]int32 + if towardsDestination { + route = &routeDiscovery.Route + snr = &routeDiscovery.SnrTowards + } else { + route = &routeDiscovery.RouteBack + snr = &routeDiscovery.SnrBack + } + // Only insert unknown hops if hop_start is valid + if p.HopStart != 0 && p.HopLimit <= p.HopStart { + var hopsTaken uint8 = uint8(p.HopStart - p.HopLimit) + diff := int(hopsTaken) - len(*route) + for i := 0; i < diff; i++ { + if len(*route) < ROUTE_MAX_SIZE { + *route = append(*route, broadcastID) + } + } + // Add unknown SNR values if necessary + diff = len(*route) - len(*snr) + for i := 0; i < diff; i++ { + if len(*snr) < ROUTE_MAX_SIZE { + *snr = append(*snr, 0xFF) + } + } + } + + if len(*snr) < ROUTE_MAX_SIZE { + *snr = append(*snr, int32(p.RxSnr*4)) + } + // We don't need to add route, if we are the dest + // // Length of route array can normally not be exceeded due to the max. hop_limit of 7 + // if *route_count < ROUTE_SIZE { + // route[*route_count] = myNodeInfo.my_node_num + // *route_count += 1 + // } else { + // LOG_WARN("Route exceeded maximum hop limit!") // Are you bridging networks? + // } + reply = routeDiscovery + case pb.PortNum_POSITION_APP: + position := &pb.Position{} + if err := proto.Unmarshal(data.Payload, position); err != nil { + return fmt.Errorf("unmarshalling positionPayload: %w", err) + } + payload = position + n.logger.Info(")) Position", "position", position) + if data.WantResponse && toUs { + reply = n.getPosition() + } else { + n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool { + nodeInfo.Position = position + return true + }) + } + case pb.PortNum_TELEMETRY_APP: + telemetry := &pb.Telemetry{} + if err := proto.Unmarshal(data.Payload, telemetry); err != nil { + return fmt.Errorf("unmarshalling telemetryPayload: %w", err) + } + payload = telemetry + n.logger.Info(")) Telemetry deviceMetrics", "telemetry", telemetry) + if data.WantResponse && toUs { + metrics, err := n.getDeviceMetrics() + if err != nil { + break + } + reply = metrics + } else { + deviceMetrics := telemetry.GetDeviceMetrics() + if deviceMetrics == nil { + break + } + n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool { + nodeInfo.DeviceMetrics = deviceMetrics + return true + }) + } + default: + n.logger.Debug(")) !Unhandled app payload", "data", data) + } + n.dl.LogPacket(p, channel, src, dst, data, payload) + if toUs && p.WantAck { + err := n.txPacket(buildRoutingReply(&pb.MeshPacket{ + From: p.From, + To: p.To, + HopStart: uint32(p.HopStart), + Channel: channel.ChannelNum, + Id: p.Id, + PkiEncrypted: msgFromRadio.GetPacket().PkiEncrypted, + PublicKey: msgFromRadio.GetPacket().PublicKey, + }, pb.Routing_NONE)) + if err != nil { + n.logger.Warn("failed to send text ack", "err", err) + } + } + if reply != nil { + err := n.txPortnumMessage(channel.ChannelNum, source, replyPortNum, reply) + if err != nil { + n.logger.Error("Failed to reply", "err", err, "reply", reply) + } else { + n.dl.LogPacket(p, channel, dst, src, &pb.Data{Portnum: replyPortNum}, payload) + } + } + + return nil +} |
