From 220671ed46e10e4c67741286da493df1d45cdcfc Mon Sep 17 00:00:00 2001 From: Marin Ivanov Date: Wed, 3 Sep 2025 20:59:24 +0300 Subject: Initial release --- nodeadmin.go | 392 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 392 insertions(+) create mode 100644 nodeadmin.go (limited to 'nodeadmin.go') diff --git a/nodeadmin.go b/nodeadmin.go new file mode 100644 index 0000000..e6b88a5 --- /dev/null +++ b/nodeadmin.go @@ -0,0 +1,392 @@ +package main + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "net" + "strings" + "time" + + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "github.com/meshnet-gophers/meshtastic-go/transport" + "google.golang.org/protobuf/proto" +) + +func phoneLastUpdateKey(identifier string) []byte { + return []byte("phone-lastupdate-" + identifier) +} + +func (n *Node) setPhoneLastUpdate(identifier string, t time.Time) error { + var lastUpdate [8]byte + binary.BigEndian.PutUint64(lastUpdate[:], uint64(t.UnixMicro())) + return n.db.Set(phoneLastUpdateKey(identifier), lastUpdate[:]) +} + +func (n *Node) handleToRadioWantConfigID(identifier string, conn *transport.StreamConn, req *pb.ToRadio_WantConfigId) error { + // Send MyInfo + err := conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_MyInfo{ + MyInfo: &pb.MyNodeInfo{ + MyNodeNum: n.nodeID, + RebootCount: 0, // use db nonce key? + // TODO: Track this as a const + MinAppVersion: MinAppVersion, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Metadata + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Metadata{ + Metadata: n.getDeviceMetadata(), + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send our node + nodeInfo := n.NodeInfo(n.nodeID, nil) + if nodeInfo.User == nil { + nodeInfo.User = n.user + } + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_NodeInfo{ + NodeInfo: nodeInfo, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Other Nodes + nodes, err := n.db.ListNodeInfo() + if err != nil { + return fmt.Errorf("listing nodes: %w", err) + } + for _, ni := range nodes { + if ni.Num == n.nodeID { + continue + } + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_NodeInfo{ + NodeInfo: ni, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + + // Send Channels + for i, channel := range n.cfg.Channels.Settings { + role := pb.Channel_SECONDARY + if i == 0 { + role = pb.Channel_PRIMARY + } + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Channel{ + Channel: &pb.Channel{ + Index: int32(i), + Settings: &pb.ChannelSettings{ + Name: channel.Name, + Psk: channel.Psk, + }, + Role: role, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + + // Send Config: Device + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Config{ + Config: &pb.Config{ + PayloadVariant: &pb.Config_Device{ + Device: &pb.Config_DeviceConfig{ + NodeInfoBroadcastSecs: uint32(n.cfg.BroadcastNodeInfoInterval.Seconds()), + }, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Config: LoRa + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Config{ + Config: &pb.Config{ + PayloadVariant: &pb.Config_Lora{ + Lora: &pb.Config_LoRaConfig{ + UsePreset: true, + ModemPreset: pb.Config_LoRaConfig_SHORT_FAST, + Region: pb.Config_LoRaConfig_EU_868, + ConfigOkToMqtt: true, + }, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send ConfigComplete to indicate we're done + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_ConfigCompleteId{ + ConfigCompleteId: req.WantConfigId, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + start := uint64(0) + lastPhoneUpdate, _ := n.db.Get(phoneLastUpdateKey(identifier)) + if len(lastPhoneUpdate) == 8 { + start = binary.BigEndian.Uint64(lastPhoneUpdate) + } else { + start = uint64(time.Now().UnixMicro()) + } + startTime := time.UnixMicro(int64(start)) + now := time.Now() + n.logger.Info("** Replaying FromRadio packets to admin...", "from", startTime, "to", now) + c, errc := n.db.ReadFromRadio(startTime, now) + for msg := range c { + pkt := msg.GetPacket() + // process only messages that are broadcast or to us. + if pkt == nil || (pkt.To != broadcastID && pkt.To != n.nodeID) { + continue + } + if err := conn.Write(msg); err != nil { + return fmt.Errorf("failed to send to admin: %w", err) + } + + } + err = <-errc + n.logger.Info("** Finished WantConfigID", "err", err) + + n.setPhoneLastUpdate(identifier, now) + return err +} + +func (n *Node) handleConn(ctx context.Context, underlying io.ReadWriteCloser, identifier string) error { + streamConn := transport.NewRadioStreamConn(underlying) + n.adminConn[streamConn] = struct{}{} + defer func() { + delete(n.adminConn, streamConn) + if err := streamConn.Close(); err != nil { + n.logger.Error("failed to close streamConn", "err", err) + } + }() + go func() { + <-ctx.Done() + streamConn.Close() + }() + + var configured bool + var lastPktTime time.Time + defer func() { + if lastPktTime.IsZero() { + return + } + n.setPhoneLastUpdate(identifier, lastPktTime) + }() + for { + msg := &pb.ToRadio{} + if err := streamConn.Read(msg); err != nil { + return fmt.Errorf("reading from streamConn: %w", err) + } + if configured { + lastPktTime = time.Now() + } + n.logger.Info(">> ToRadio", "msg", msg) + switch payload := msg.PayloadVariant.(type) { + case *pb.ToRadio_Disconnect: + // The meshtastic python client sends a Disconnect command and with the TCP implementation, it expects + // the radio to close the connection. So we end the read loop here, and return to close the connection. + streamConn.Close() + return nil + case *pb.ToRadio_WantConfigId: + if err := n.handleToRadioWantConfigID(identifier, streamConn, payload); err != nil { + return fmt.Errorf("handling WantConfigId: %w", err) + } + configured = true + case *pb.ToRadio_Packet: + var reply *pb.FromRadio_Packet + var sendAck bool = payload.Packet.WantAck + var errorCode pb.Routing_Error = pb.Routing_NONE + if decoded := payload.Packet.GetDecoded(); decoded != nil { + n.logger.Info(">> ToRadio_Packet", "payload", payload) + if payload.Packet.To == n.nodeID { + switch decoded.Portnum { + case pb.PortNum_ADMIN_APP: + admin := &pb.AdminMessage{} + if err := proto.Unmarshal(decoded.Payload, admin); err != nil { + return fmt.Errorf("unmarshalling admin: %w", err) + } + n.logger.Info(fmt.Sprintf(">> AdminMessage %T:", admin.PayloadVariant), "msg", admin.PayloadVariant) + + switch adminPayload := admin.PayloadVariant.(type) { + case *pb.AdminMessage_GetChannelRequest: + n.logger.Info(">> AdminMessage.GetChannelRequest", "adminPayload", adminPayload, "packet", payload) + chIdx := adminPayload.GetChannelRequest - 1 + var c *pb.ChannelSettings + role := pb.Channel_DISABLED + if chIdx > uint32(len(n.cfg.Channels.Settings)) { + c = &pb.ChannelSettings{} + } else { + c = n.cfg.Channels.Settings[chIdx] + role = pb.Channel_SECONDARY + if chIdx == 0 { + role = pb.Channel_PRIMARY + } + } + resp := &pb.AdminMessage{ + PayloadVariant: &pb.AdminMessage_GetChannelResponse{ + GetChannelResponse: &pb.Channel{ + Index: int32(chIdx), + Settings: c, + Role: role, + }, + }, + } + respBytes, err := proto.Marshal(resp) + if err != nil { + return fmt.Errorf("marshalling GetChannelResponse: %w", err) + } + // Send GetChannelResponse + reply = &pb.FromRadio_Packet{ + Packet: &pb.MeshPacket{ + Id: n.nextPacketID(), + From: n.nodeID, + To: n.nodeID, + PayloadVariant: &pb.MeshPacket_Decoded{ + Decoded: &pb.Data{ + Portnum: pb.PortNum_ADMIN_APP, + Payload: respBytes, + RequestId: payload.Packet.Id, + }, + }, + }, + } + case *pb.AdminMessage_SetFavoriteNode: + n.NodeInfo(adminPayload.SetFavoriteNode, func(ni *pb.NodeInfo) bool { + ni.IsFavorite = true + return true + }) + case *pb.AdminMessage_RemoveFavoriteNode: + n.NodeInfo(adminPayload.RemoveFavoriteNode, func(ni *pb.NodeInfo) bool { + ni.IsFavorite = false + return true + }) + case *pb.AdminMessage_RemoveByNodenum: + n.NodeInfo(adminPayload.RemoveByNodenum, func(ni *pb.NodeInfo) bool { + ni.Num = 0 // delete NodeInfo + return true + }) + case *pb.AdminMessage_SetTimeOnly: + default: + errorCode = pb.Routing_BAD_REQUEST + n.logger.Warnf(">> !Unhandled AdminMessage of type '%T'", adminPayload) + } + default: + n.logger.Warnf(">> No handling of ToRadio.Packet to us w/ portNum '%s'", decoded.Portnum.String()) + } + } else { + defaultWarning := true + switch decoded.Portnum { + case pb.PortNum_AUDIO_APP, pb.PortNum_NODEINFO_APP, pb.PortNum_TEXT_MESSAGE_APP, pb.PortNum_POSITION_APP, pb.PortNum_TRACEROUTE_APP: + defaultWarning = false + fallthrough + default: + if defaultWarning { + n.logger.Warnf(">> Default handling of ToRadio.Packet message w/ portNum '%s'", decoded.Portnum.String()) + } + from := payload.Packet.From + if from == 0 { + from = n.nodeID + } + pkt := proto.Clone(payload.Packet).(*pb.MeshPacket) + pkt.From = from + decoded := pkt.GetDecoded() + if decoded.Bitfield != nil { + *decoded.Bitfield |= (1 << 0) // ok to mqtt + } else { + bitfield := uint32(1) + decoded.Bitfield = &bitfield // ok to mqtt + } + if err := n.txPacket(pkt); err != nil { + return fmt.Errorf("failed to tx packet: %w", err) + } + } + } + } + if errorCode != pb.Routing_NONE || sendAck { + if err := streamConn.Write(&pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: buildRoutingReply(payload.Packet, errorCode)}}); err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + if reply != nil { + if err := streamConn.Write(&pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: reply.Packet}}); err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + case *pb.ToRadio_Heartbeat: + default: + n.logger.Warnf(">> !Unhandled ToRadio message of type '%T'", payload) + } + } +} + +func (n *Node) listenTCP(ctx context.Context) error { + l, err := net.Listen("tcp", n.cfg.TCPListenAddr) + if err != nil { + return fmt.Errorf("listening: %w", err) + } + defer l.Close() + n.logger.Info("** Listening for tcp connections", "addr", n.cfg.TCPListenAddr) + + go func() { + <-ctx.Done() + l.Close() + }() + for { + c, err := l.Accept() + if err != nil { + n.logger.Error("!! Failed to accept connection", "err", err) + return err + } + go func(c net.Conn) { + defer c.Close() + connctx, cancel := context.WithCancel(ctx) + defer cancel() + tcpconn := c.(*net.TCPConn) + tcpconn.SetKeepAlive(true) + tcpconn.SetKeepAlivePeriod(25 * time.Second) + address := tcpconn.RemoteAddr().String() + ipString := strings.Split(address, ":")[0] + n.logger.Info("** Accepted connection", "addr", address) + if err := n.handleConn(connctx, c, ipString); err != nil { + if errors.Is(err, io.EOF) { + n.logger.Info("** Connection EOF", "addr", address) + } else { + n.logger.Error("!! Connection error", "err", err) + } + } else { + n.logger.Info("** Connection ended", "addr", address) + } + }(c) + } +} -- cgit v1.2.3