package main import ( "context" "encoding/binary" "errors" "fmt" "io" "net" "strings" "time" pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" "github.com/meshnet-gophers/meshtastic-go/transport" "google.golang.org/protobuf/proto" ) func phoneLastUpdateKey(identifier string) []byte { return []byte("phone-lastupdate-" + identifier) } func (n *Node) setPhoneLastUpdate(identifier string, t time.Time) error { var lastUpdate [8]byte binary.BigEndian.PutUint64(lastUpdate[:], uint64(t.UnixMicro())) return n.db.Set(phoneLastUpdateKey(identifier), lastUpdate[:]) } func (n *Node) handleToRadioWantConfigID(identifier string, conn *transport.StreamConn, req *pb.ToRadio_WantConfigId) error { // Send MyInfo err := conn.Write(&pb.FromRadio{ PayloadVariant: &pb.FromRadio_MyInfo{ MyInfo: &pb.MyNodeInfo{ MyNodeNum: n.nodeID, RebootCount: 0, // use db nonce key? // TODO: Track this as a const MinAppVersion: MinAppVersion, }, }, }) if err != nil { return fmt.Errorf("writing to streamConn: %w", err) } // Send Metadata err = conn.Write(&pb.FromRadio{ PayloadVariant: &pb.FromRadio_Metadata{ Metadata: n.getDeviceMetadata(), }, }) if err != nil { return fmt.Errorf("writing to streamConn: %w", err) } // Send our node nodeInfo := n.NodeInfo(n.nodeID, nil) if nodeInfo.User == nil { nodeInfo.User = n.user } err = conn.Write(&pb.FromRadio{ PayloadVariant: &pb.FromRadio_NodeInfo{ NodeInfo: nodeInfo, }, }) if err != nil { return fmt.Errorf("writing to streamConn: %w", err) } // Send Other Nodes nodes, err := n.db.ListNodeInfo() if err != nil { return fmt.Errorf("listing nodes: %w", err) } for _, ni := range nodes { if ni.Num == n.nodeID { continue } err = conn.Write(&pb.FromRadio{ PayloadVariant: &pb.FromRadio_NodeInfo{ NodeInfo: ni, }, }) if err != nil { return fmt.Errorf("writing to streamConn: %w", err) } } // Send Channels for i, channel := range n.cfg.Channels.Settings { role := pb.Channel_SECONDARY if i == 0 { role = pb.Channel_PRIMARY } err = conn.Write(&pb.FromRadio{ PayloadVariant: &pb.FromRadio_Channel{ Channel: &pb.Channel{ Index: int32(i), Settings: &pb.ChannelSettings{ Name: channel.Name, Psk: channel.Psk, }, Role: role, }, }, }) if err != nil { return fmt.Errorf("writing to streamConn: %w", err) } } // Send Config: Device err = conn.Write(&pb.FromRadio{ PayloadVariant: &pb.FromRadio_Config{ Config: &pb.Config{ PayloadVariant: &pb.Config_Device{ Device: &pb.Config_DeviceConfig{ NodeInfoBroadcastSecs: uint32(n.cfg.BroadcastNodeInfoInterval.Seconds()), }, }, }, }, }) if err != nil { return fmt.Errorf("writing to streamConn: %w", err) } // Send Config: LoRa err = conn.Write(&pb.FromRadio{ PayloadVariant: &pb.FromRadio_Config{ Config: &pb.Config{ PayloadVariant: &pb.Config_Lora{ Lora: &pb.Config_LoRaConfig{ UsePreset: true, ModemPreset: pb.Config_LoRaConfig_SHORT_FAST, Region: pb.Config_LoRaConfig_EU_868, ConfigOkToMqtt: true, }, }, }, }, }) if err != nil { return fmt.Errorf("writing to streamConn: %w", err) } // Send ConfigComplete to indicate we're done err = conn.Write(&pb.FromRadio{ PayloadVariant: &pb.FromRadio_ConfigCompleteId{ ConfigCompleteId: req.WantConfigId, }, }) if err != nil { return fmt.Errorf("writing to streamConn: %w", err) } start := uint64(0) lastPhoneUpdate, _ := n.db.Get(phoneLastUpdateKey(identifier)) if len(lastPhoneUpdate) == 8 { start = binary.BigEndian.Uint64(lastPhoneUpdate) } else { start = uint64(time.Now().UnixMicro()) } startTime := time.UnixMicro(int64(start)) now := time.Now() n.logger.Info("** Replaying FromRadio packets to admin...", "from", startTime, "to", now) c, errc := n.db.ReadFromRadio(startTime, now) for msg := range c { pkt := msg.GetPacket() // process only messages that are broadcast or to us. if pkt == nil || (pkt.To != broadcastID && pkt.To != n.nodeID) { continue } if err := conn.Write(msg); err != nil { return fmt.Errorf("failed to send to admin: %w", err) } } err = <-errc n.logger.Info("** Finished WantConfigID", "err", err) n.setPhoneLastUpdate(identifier, now) return err } func (n *Node) handleConn(ctx context.Context, underlying io.ReadWriteCloser, identifier string) error { streamConn := transport.NewRadioStreamConn(underlying) n.adminConn[streamConn] = struct{}{} defer func() { delete(n.adminConn, streamConn) if err := streamConn.Close(); err != nil { n.logger.Error("failed to close streamConn", "err", err) } }() go func() { <-ctx.Done() streamConn.Close() }() var configured bool var lastPktTime time.Time defer func() { if lastPktTime.IsZero() { return } n.setPhoneLastUpdate(identifier, lastPktTime) }() for { msg := &pb.ToRadio{} if err := streamConn.Read(msg); err != nil { return fmt.Errorf("reading from streamConn: %w", err) } if configured { lastPktTime = time.Now() } n.logger.Info(">> ToRadio", "msg", msg) switch payload := msg.PayloadVariant.(type) { case *pb.ToRadio_Disconnect: // The meshtastic python client sends a Disconnect command and with the TCP implementation, it expects // the radio to close the connection. So we end the read loop here, and return to close the connection. streamConn.Close() return nil case *pb.ToRadio_WantConfigId: if err := n.handleToRadioWantConfigID(identifier, streamConn, payload); err != nil { return fmt.Errorf("handling WantConfigId: %w", err) } configured = true case *pb.ToRadio_Packet: var reply *pb.FromRadio_Packet var sendAck bool = payload.Packet.WantAck var errorCode pb.Routing_Error = pb.Routing_NONE if decoded := payload.Packet.GetDecoded(); decoded != nil { n.logger.Info(">> ToRadio_Packet", "payload", payload) if payload.Packet.To == n.nodeID { switch decoded.Portnum { case pb.PortNum_ADMIN_APP: admin := &pb.AdminMessage{} if err := proto.Unmarshal(decoded.Payload, admin); err != nil { return fmt.Errorf("unmarshalling admin: %w", err) } n.logger.Info(fmt.Sprintf(">> AdminMessage %T:", admin.PayloadVariant), "msg", admin.PayloadVariant) switch adminPayload := admin.PayloadVariant.(type) { case *pb.AdminMessage_GetChannelRequest: n.logger.Info(">> AdminMessage.GetChannelRequest", "adminPayload", adminPayload, "packet", payload) chIdx := adminPayload.GetChannelRequest - 1 var c *pb.ChannelSettings role := pb.Channel_DISABLED if chIdx > uint32(len(n.cfg.Channels.Settings)) { c = &pb.ChannelSettings{} } else { c = n.cfg.Channels.Settings[chIdx] role = pb.Channel_SECONDARY if chIdx == 0 { role = pb.Channel_PRIMARY } } resp := &pb.AdminMessage{ PayloadVariant: &pb.AdminMessage_GetChannelResponse{ GetChannelResponse: &pb.Channel{ Index: int32(chIdx), Settings: c, Role: role, }, }, } respBytes, err := proto.Marshal(resp) if err != nil { return fmt.Errorf("marshalling GetChannelResponse: %w", err) } // Send GetChannelResponse reply = &pb.FromRadio_Packet{ Packet: &pb.MeshPacket{ Id: n.nextPacketID(), From: n.nodeID, To: n.nodeID, PayloadVariant: &pb.MeshPacket_Decoded{ Decoded: &pb.Data{ Portnum: pb.PortNum_ADMIN_APP, Payload: respBytes, RequestId: payload.Packet.Id, }, }, }, } case *pb.AdminMessage_SetFavoriteNode: n.NodeInfo(adminPayload.SetFavoriteNode, func(ni *pb.NodeInfo) bool { ni.IsFavorite = true return true }) case *pb.AdminMessage_RemoveFavoriteNode: n.NodeInfo(adminPayload.RemoveFavoriteNode, func(ni *pb.NodeInfo) bool { ni.IsFavorite = false return true }) case *pb.AdminMessage_RemoveByNodenum: n.NodeInfo(adminPayload.RemoveByNodenum, func(ni *pb.NodeInfo) bool { ni.Num = 0 // delete NodeInfo return true }) case *pb.AdminMessage_SetTimeOnly: default: errorCode = pb.Routing_BAD_REQUEST n.logger.Warnf(">> !Unhandled AdminMessage of type '%T'", adminPayload) } default: n.logger.Warnf(">> No handling of ToRadio.Packet to us w/ portNum '%s'", decoded.Portnum.String()) } } else { defaultWarning := true switch decoded.Portnum { case pb.PortNum_AUDIO_APP, pb.PortNum_NODEINFO_APP, pb.PortNum_TEXT_MESSAGE_APP, pb.PortNum_POSITION_APP, pb.PortNum_TRACEROUTE_APP: defaultWarning = false fallthrough default: if defaultWarning { n.logger.Warnf(">> Default handling of ToRadio.Packet message w/ portNum '%s'", decoded.Portnum.String()) } from := payload.Packet.From if from == 0 { from = n.nodeID } pkt := proto.Clone(payload.Packet).(*pb.MeshPacket) pkt.From = from decoded := pkt.GetDecoded() if decoded.Bitfield != nil { *decoded.Bitfield |= (1 << 0) // ok to mqtt } else { bitfield := uint32(1) decoded.Bitfield = &bitfield // ok to mqtt } if err := n.txPacket(pkt); err != nil { return fmt.Errorf("failed to tx packet: %w", err) } } } } if errorCode != pb.Routing_NONE || sendAck { if err := streamConn.Write(&pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: buildRoutingReply(payload.Packet, errorCode)}}); err != nil { return fmt.Errorf("writing to streamConn: %w", err) } } if reply != nil { if err := streamConn.Write(&pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: reply.Packet}}); err != nil { return fmt.Errorf("writing to streamConn: %w", err) } } case *pb.ToRadio_Heartbeat: default: n.logger.Warnf(">> !Unhandled ToRadio message of type '%T'", payload) } } } func (n *Node) listenTCP(ctx context.Context) error { l, err := net.Listen("tcp", n.cfg.TCPListenAddr) if err != nil { return fmt.Errorf("listening: %w", err) } defer l.Close() n.logger.Info("** Listening for tcp connections", "addr", n.cfg.TCPListenAddr) go func() { <-ctx.Done() l.Close() }() for { c, err := l.Accept() if err != nil { n.logger.Error("!! Failed to accept connection", "err", err) return err } go func(c net.Conn) { defer c.Close() connctx, cancel := context.WithCancel(ctx) defer cancel() tcpconn := c.(*net.TCPConn) tcpconn.SetKeepAlive(true) tcpconn.SetKeepAlivePeriod(25 * time.Second) address := tcpconn.RemoteAddr().String() ipString := strings.Split(address, ":")[0] n.logger.Info("** Accepted connection", "addr", address) if err := n.handleConn(connctx, c, ipString); err != nil { if errors.Is(err, io.EOF) { n.logger.Info("** Connection EOF", "addr", address) } else { n.logger.Error("!! Connection error", "err", err) } } else { n.logger.Info("** Connection ended", "addr", address) } }(c) } }