package main import ( "bytes" "context" "encoding/hex" "fmt" "github.com/charmbracelet/log" "github.com/meshnet-gophers/meshtastic-go" pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" "google.golang.org/protobuf/proto" ) func (n *Node) RxLoop(ctx context.Context) error { for { if err := ctx.Err(); err != nil { return err } p, ts, err := n.mesh.ReadMeshPacket() if err != nil { n.logger.Error("Read mesh packet failed", "err", err) return err } if err := n.rxMessage(ts, p); err != nil { n.logger.Error("Processing Radio Message", "error", err) } } } func (n *Node) rxMessage(ts uint64, p *pb.MeshPacket) error { pt := PacketTruncated{To: p.To, From: p.From, ID: p.Id} info := fmt.Sprintf("ts:%d rssi:%-4d snr:%-5.1f", p.RxTime, p.RxRssi, p.RxSnr) header := fmt.Sprintf("from:%08x to:%08x id:%08x ch:%02x next:%02x last:%02x hs:%d hl:%d mq:%d wack:%d", p.From, p.To, p.Id, uint8(p.Channel), p.NextHop, p.RelayNode, p.HopStart, p.HopLimit, bool2Int(p.ViaMqtt), bool2Int(p.WantAck)) payloadstr := fmt.Sprintf("%x", p.GetEncrypted()) var dupe bool if _, dupe = n.pktLru.Get(pt); dupe { n.logger.Debug(") MeshPacket", "info", info, "header", header, "payload", payloadstr) } else { n.logger.Debug(")) MeshPacket", "info", info, "header", header, "payload", payloadstr) n.pktLru.Add(pt, struct{}{}) } if dupe { // XXX: it would need to be changed for the traceroute return nil // skip duplicates } if p.From == n.nodeID { return nil // ourself } if len(p.GetEncrypted()) == 0 { n.logger.Warn(":: Skipping unencrypted or empty packet") return nil // we skip unencrypted packets } var plaintext []byte var pubKey []byte var channel *pb.ChannelSettings if p.To == n.nodeID && p.Channel == 0x00 { var err error if plaintext, pubKey, err = n.tryDecryptPKC(p); err != nil { log.Printf("failed to decrypt PKC message: %s", err) } } if plaintext != nil { channel = pkcChan p.Channel = 0 } else { plaintext, channel = n.tryDecryptPSK(p) if plaintext == nil { return nil // failed decrypt } p.Channel = channel.ChannelNum } data := &pb.Data{} if err := proto.Unmarshal(plaintext, data); err != nil { n.logger.Debug(")) MeshPacket.Data", "channel", channel.Name, "mqtt", p.ViaMqtt, "raw", hex.EncodeToString(plaintext)) return fmt.Errorf("unmarshalling decrypted data: %w", err) } p.PublicKey = pubKey p.PkiEncrypted = channel == pkcChan p.PayloadVariant = &pb.MeshPacket_Decoded{Decoded: data} msgFromRadio := &pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: p}} source := data.Source if source == 0 { source = p.From } dest := data.Dest if dest == 0 { dest = p.To } toUs := dest == n.nodeID if dest == broadcastID || toUs { n.dispatchMessageToAdmin(msgFromRadio) } n.db.StoreFromRadio(int64(ts), msgFromRadio) n.logger.Debug(")) MeshPacket.Data", "channel", channel.Name, "mqtt", p.ViaMqtt, "raw", hex.EncodeToString(plaintext), "data", data) src := n.NodeInfo(source, func(ni *pb.NodeInfo) bool { if ni.Num == n.nodeID { return false // it's us } if p.HopLimit > p.HopStart { return false // weird hoplimit value } hopsAway := uint32(p.HopStart - p.HopLimit) if ni.HopsAway != nil && *ni.HopsAway <= hopsAway { return false // skip update if these hops away are longer } ni.HopsAway = &hopsAway return true }) dst := n.NodeInfo(dest, nil) var payload any var reply proto.Message replyPortNum := data.Portnum switch data.Portnum { case pb.PortNum_ADMIN_APP: admin := &pb.AdminMessage{} if err := proto.Unmarshal(data.Payload, admin); err != nil { return fmt.Errorf("unmarshalling admin message: %w", err) } payload = admin switch adminPayload := admin.PayloadVariant.(type) { case *pb.AdminMessage_GetDeviceMetadataRequest: if data.WantResponse && toUs { reply = &pb.AdminMessage{ PayloadVariant: &pb.AdminMessage_GetDeviceMetadataResponse{ GetDeviceMetadataResponse: n.getDeviceMetadata(), }, } } default: n.logger.Warnf("unhandled admin message type: %T", adminPayload) } case pb.PortNum_NODEINFO_APP: user := &pb.User{} if err := proto.Unmarshal(data.Payload, user); err != nil { return fmt.Errorf("unmarshalling user: %w", err) } payload = user n.logger.Info(")) NodeInfo", "user", user) n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool { if nodeInfo.User != nil { if len(nodeInfo.User.PublicKey) > 0 && !bytes.Equal(nodeInfo.User.PublicKey, user.PublicKey) { n.logger.Error("NodeInfo w/ different key", "nodeInfo", nodeInfo, "new", user) return false // do not update } } nodeInfo.User = user return true }) if data.WantResponse && toUs { reply = n.user } case pb.PortNum_TEXT_MESSAGE_APP: n.logger.Info(")) TextMessage", "channel", channel.Name, "source", meshtastic.NodeID(source).String(), "text", string(data.Payload)) case pb.PortNum_ROUTING_APP: routing := &pb.Routing{} if err := proto.Unmarshal(data.Payload, routing); err != nil { return fmt.Errorf("unmarshalling routingPayload: %w", err) } payload = routing n.logger.Info(")) Routing", "routing", routing) case pb.PortNum_TRACEROUTE_APP: routeDiscovery := &pb.RouteDiscovery{} if err := proto.Unmarshal(data.Payload, routeDiscovery); err != nil { return fmt.Errorf("unmarshalling traceroute payload: %w", err) } payload = routeDiscovery n.logger.Info(")) RouteDiscovery", "channel", channel.Name, "source", meshtastic.NodeID(source).String(), "payload", routeDiscovery) if !toUs { break } towardsDestination := data.ReplyId == 0 var route *[]uint32 var snr *[]int32 if towardsDestination { route = &routeDiscovery.Route snr = &routeDiscovery.SnrTowards } else { route = &routeDiscovery.RouteBack snr = &routeDiscovery.SnrBack } // Only insert unknown hops if hop_start is valid if p.HopStart != 0 && p.HopLimit <= p.HopStart { var hopsTaken uint8 = uint8(p.HopStart - p.HopLimit) diff := int(hopsTaken) - len(*route) for i := 0; i < diff; i++ { if len(*route) < ROUTE_MAX_SIZE { *route = append(*route, broadcastID) } } // Add unknown SNR values if necessary diff = len(*route) - len(*snr) for i := 0; i < diff; i++ { if len(*snr) < ROUTE_MAX_SIZE { *snr = append(*snr, 0xFF) } } } if len(*snr) < ROUTE_MAX_SIZE { *snr = append(*snr, int32(p.RxSnr*4)) } // We don't need to add route, if we are the dest // // Length of route array can normally not be exceeded due to the max. hop_limit of 7 // if *route_count < ROUTE_SIZE { // route[*route_count] = myNodeInfo.my_node_num // *route_count += 1 // } else { // LOG_WARN("Route exceeded maximum hop limit!") // Are you bridging networks? // } reply = routeDiscovery case pb.PortNum_POSITION_APP: position := &pb.Position{} if err := proto.Unmarshal(data.Payload, position); err != nil { return fmt.Errorf("unmarshalling positionPayload: %w", err) } payload = position n.logger.Info(")) Position", "position", position) if data.WantResponse && toUs { reply = n.getPosition() } else { n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool { nodeInfo.Position = position return true }) } case pb.PortNum_TELEMETRY_APP: telemetry := &pb.Telemetry{} if err := proto.Unmarshal(data.Payload, telemetry); err != nil { return fmt.Errorf("unmarshalling telemetryPayload: %w", err) } payload = telemetry n.logger.Info(")) Telemetry deviceMetrics", "telemetry", telemetry) if data.WantResponse && toUs { metrics, err := n.getDeviceMetrics() if err != nil { break } reply = metrics } else { deviceMetrics := telemetry.GetDeviceMetrics() if deviceMetrics == nil { break } n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool { nodeInfo.DeviceMetrics = deviceMetrics return true }) } default: n.logger.Debug(")) !Unhandled app payload", "data", data) } n.dl.LogPacket(p, channel, src, dst, data, payload) if toUs && p.WantAck { err := n.txPacket(buildRoutingReply(&pb.MeshPacket{ From: p.From, To: p.To, HopStart: uint32(p.HopStart), Channel: channel.ChannelNum, Id: p.Id, PkiEncrypted: msgFromRadio.GetPacket().PkiEncrypted, PublicKey: msgFromRadio.GetPacket().PublicKey, }, pb.Routing_NONE)) if err != nil { n.logger.Warn("failed to send text ack", "err", err) } } if reply != nil { err := n.txPortnumMessage(channel.ChannelNum, source, replyPortNum, reply) if err != nil { n.logger.Error("Failed to reply", "err", err, "reply", reply) } else { n.dl.LogPacket(p, channel, dst, src, &pb.Data{Portnum: replyPortNum}, payload) } } return nil }