summaryrefslogtreecommitdiff
path: root/nodeadmin.go
diff options
context:
space:
mode:
authorMarin Ivanov <[email protected]>2025-09-03 20:59:24 +0300
committerMarin Ivanov <[email protected]>2025-09-03 20:59:24 +0300
commit220671ed46e10e4c67741286da493df1d45cdcfc (patch)
tree0711f179f4619959fd937c2929c6247ed8ab124a /nodeadmin.go
Initial release
Diffstat (limited to 'nodeadmin.go')
-rw-r--r--nodeadmin.go392
1 files changed, 392 insertions, 0 deletions
diff --git a/nodeadmin.go b/nodeadmin.go
new file mode 100644
index 0000000..e6b88a5
--- /dev/null
+++ b/nodeadmin.go
@@ -0,0 +1,392 @@
+package main
+
+import (
+ "context"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "strings"
+ "time"
+
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ "github.com/meshnet-gophers/meshtastic-go/transport"
+ "google.golang.org/protobuf/proto"
+)
+
+func phoneLastUpdateKey(identifier string) []byte {
+ return []byte("phone-lastupdate-" + identifier)
+}
+
+func (n *Node) setPhoneLastUpdate(identifier string, t time.Time) error {
+ var lastUpdate [8]byte
+ binary.BigEndian.PutUint64(lastUpdate[:], uint64(t.UnixMicro()))
+ return n.db.Set(phoneLastUpdateKey(identifier), lastUpdate[:])
+}
+
+func (n *Node) handleToRadioWantConfigID(identifier string, conn *transport.StreamConn, req *pb.ToRadio_WantConfigId) error {
+ // Send MyInfo
+ err := conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_MyInfo{
+ MyInfo: &pb.MyNodeInfo{
+ MyNodeNum: n.nodeID,
+ RebootCount: 0, // use db nonce key?
+ // TODO: Track this as a const
+ MinAppVersion: MinAppVersion,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send Metadata
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Metadata{
+ Metadata: n.getDeviceMetadata(),
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send our node
+ nodeInfo := n.NodeInfo(n.nodeID, nil)
+ if nodeInfo.User == nil {
+ nodeInfo.User = n.user
+ }
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_NodeInfo{
+ NodeInfo: nodeInfo,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send Other Nodes
+ nodes, err := n.db.ListNodeInfo()
+ if err != nil {
+ return fmt.Errorf("listing nodes: %w", err)
+ }
+ for _, ni := range nodes {
+ if ni.Num == n.nodeID {
+ continue
+ }
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_NodeInfo{
+ NodeInfo: ni,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+
+ // Send Channels
+ for i, channel := range n.cfg.Channels.Settings {
+ role := pb.Channel_SECONDARY
+ if i == 0 {
+ role = pb.Channel_PRIMARY
+ }
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Channel{
+ Channel: &pb.Channel{
+ Index: int32(i),
+ Settings: &pb.ChannelSettings{
+ Name: channel.Name,
+ Psk: channel.Psk,
+ },
+ Role: role,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+
+ // Send Config: Device
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Config{
+ Config: &pb.Config{
+ PayloadVariant: &pb.Config_Device{
+ Device: &pb.Config_DeviceConfig{
+ NodeInfoBroadcastSecs: uint32(n.cfg.BroadcastNodeInfoInterval.Seconds()),
+ },
+ },
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send Config: LoRa
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Config{
+ Config: &pb.Config{
+ PayloadVariant: &pb.Config_Lora{
+ Lora: &pb.Config_LoRaConfig{
+ UsePreset: true,
+ ModemPreset: pb.Config_LoRaConfig_SHORT_FAST,
+ Region: pb.Config_LoRaConfig_EU_868,
+ ConfigOkToMqtt: true,
+ },
+ },
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send ConfigComplete to indicate we're done
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_ConfigCompleteId{
+ ConfigCompleteId: req.WantConfigId,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ start := uint64(0)
+ lastPhoneUpdate, _ := n.db.Get(phoneLastUpdateKey(identifier))
+ if len(lastPhoneUpdate) == 8 {
+ start = binary.BigEndian.Uint64(lastPhoneUpdate)
+ } else {
+ start = uint64(time.Now().UnixMicro())
+ }
+ startTime := time.UnixMicro(int64(start))
+ now := time.Now()
+ n.logger.Info("** Replaying FromRadio packets to admin...", "from", startTime, "to", now)
+ c, errc := n.db.ReadFromRadio(startTime, now)
+ for msg := range c {
+ pkt := msg.GetPacket()
+ // process only messages that are broadcast or to us.
+ if pkt == nil || (pkt.To != broadcastID && pkt.To != n.nodeID) {
+ continue
+ }
+ if err := conn.Write(msg); err != nil {
+ return fmt.Errorf("failed to send to admin: %w", err)
+ }
+
+ }
+ err = <-errc
+ n.logger.Info("** Finished WantConfigID", "err", err)
+
+ n.setPhoneLastUpdate(identifier, now)
+ return err
+}
+
+func (n *Node) handleConn(ctx context.Context, underlying io.ReadWriteCloser, identifier string) error {
+ streamConn := transport.NewRadioStreamConn(underlying)
+ n.adminConn[streamConn] = struct{}{}
+ defer func() {
+ delete(n.adminConn, streamConn)
+ if err := streamConn.Close(); err != nil {
+ n.logger.Error("failed to close streamConn", "err", err)
+ }
+ }()
+ go func() {
+ <-ctx.Done()
+ streamConn.Close()
+ }()
+
+ var configured bool
+ var lastPktTime time.Time
+ defer func() {
+ if lastPktTime.IsZero() {
+ return
+ }
+ n.setPhoneLastUpdate(identifier, lastPktTime)
+ }()
+ for {
+ msg := &pb.ToRadio{}
+ if err := streamConn.Read(msg); err != nil {
+ return fmt.Errorf("reading from streamConn: %w", err)
+ }
+ if configured {
+ lastPktTime = time.Now()
+ }
+ n.logger.Info(">> ToRadio", "msg", msg)
+ switch payload := msg.PayloadVariant.(type) {
+ case *pb.ToRadio_Disconnect:
+ // The meshtastic python client sends a Disconnect command and with the TCP implementation, it expects
+ // the radio to close the connection. So we end the read loop here, and return to close the connection.
+ streamConn.Close()
+ return nil
+ case *pb.ToRadio_WantConfigId:
+ if err := n.handleToRadioWantConfigID(identifier, streamConn, payload); err != nil {
+ return fmt.Errorf("handling WantConfigId: %w", err)
+ }
+ configured = true
+ case *pb.ToRadio_Packet:
+ var reply *pb.FromRadio_Packet
+ var sendAck bool = payload.Packet.WantAck
+ var errorCode pb.Routing_Error = pb.Routing_NONE
+ if decoded := payload.Packet.GetDecoded(); decoded != nil {
+ n.logger.Info(">> ToRadio_Packet", "payload", payload)
+ if payload.Packet.To == n.nodeID {
+ switch decoded.Portnum {
+ case pb.PortNum_ADMIN_APP:
+ admin := &pb.AdminMessage{}
+ if err := proto.Unmarshal(decoded.Payload, admin); err != nil {
+ return fmt.Errorf("unmarshalling admin: %w", err)
+ }
+ n.logger.Info(fmt.Sprintf(">> AdminMessage %T:", admin.PayloadVariant), "msg", admin.PayloadVariant)
+
+ switch adminPayload := admin.PayloadVariant.(type) {
+ case *pb.AdminMessage_GetChannelRequest:
+ n.logger.Info(">> AdminMessage.GetChannelRequest", "adminPayload", adminPayload, "packet", payload)
+ chIdx := adminPayload.GetChannelRequest - 1
+ var c *pb.ChannelSettings
+ role := pb.Channel_DISABLED
+ if chIdx > uint32(len(n.cfg.Channels.Settings)) {
+ c = &pb.ChannelSettings{}
+ } else {
+ c = n.cfg.Channels.Settings[chIdx]
+ role = pb.Channel_SECONDARY
+ if chIdx == 0 {
+ role = pb.Channel_PRIMARY
+ }
+ }
+ resp := &pb.AdminMessage{
+ PayloadVariant: &pb.AdminMessage_GetChannelResponse{
+ GetChannelResponse: &pb.Channel{
+ Index: int32(chIdx),
+ Settings: c,
+ Role: role,
+ },
+ },
+ }
+ respBytes, err := proto.Marshal(resp)
+ if err != nil {
+ return fmt.Errorf("marshalling GetChannelResponse: %w", err)
+ }
+ // Send GetChannelResponse
+ reply = &pb.FromRadio_Packet{
+ Packet: &pb.MeshPacket{
+ Id: n.nextPacketID(),
+ From: n.nodeID,
+ To: n.nodeID,
+ PayloadVariant: &pb.MeshPacket_Decoded{
+ Decoded: &pb.Data{
+ Portnum: pb.PortNum_ADMIN_APP,
+ Payload: respBytes,
+ RequestId: payload.Packet.Id,
+ },
+ },
+ },
+ }
+ case *pb.AdminMessage_SetFavoriteNode:
+ n.NodeInfo(adminPayload.SetFavoriteNode, func(ni *pb.NodeInfo) bool {
+ ni.IsFavorite = true
+ return true
+ })
+ case *pb.AdminMessage_RemoveFavoriteNode:
+ n.NodeInfo(adminPayload.RemoveFavoriteNode, func(ni *pb.NodeInfo) bool {
+ ni.IsFavorite = false
+ return true
+ })
+ case *pb.AdminMessage_RemoveByNodenum:
+ n.NodeInfo(adminPayload.RemoveByNodenum, func(ni *pb.NodeInfo) bool {
+ ni.Num = 0 // delete NodeInfo
+ return true
+ })
+ case *pb.AdminMessage_SetTimeOnly:
+ default:
+ errorCode = pb.Routing_BAD_REQUEST
+ n.logger.Warnf(">> !Unhandled AdminMessage of type '%T'", adminPayload)
+ }
+ default:
+ n.logger.Warnf(">> No handling of ToRadio.Packet to us w/ portNum '%s'", decoded.Portnum.String())
+ }
+ } else {
+ defaultWarning := true
+ switch decoded.Portnum {
+ case pb.PortNum_AUDIO_APP, pb.PortNum_NODEINFO_APP, pb.PortNum_TEXT_MESSAGE_APP, pb.PortNum_POSITION_APP, pb.PortNum_TRACEROUTE_APP:
+ defaultWarning = false
+ fallthrough
+ default:
+ if defaultWarning {
+ n.logger.Warnf(">> Default handling of ToRadio.Packet message w/ portNum '%s'", decoded.Portnum.String())
+ }
+ from := payload.Packet.From
+ if from == 0 {
+ from = n.nodeID
+ }
+ pkt := proto.Clone(payload.Packet).(*pb.MeshPacket)
+ pkt.From = from
+ decoded := pkt.GetDecoded()
+ if decoded.Bitfield != nil {
+ *decoded.Bitfield |= (1 << 0) // ok to mqtt
+ } else {
+ bitfield := uint32(1)
+ decoded.Bitfield = &bitfield // ok to mqtt
+ }
+ if err := n.txPacket(pkt); err != nil {
+ return fmt.Errorf("failed to tx packet: %w", err)
+ }
+ }
+ }
+ }
+ if errorCode != pb.Routing_NONE || sendAck {
+ if err := streamConn.Write(&pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: buildRoutingReply(payload.Packet, errorCode)}}); err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+ if reply != nil {
+ if err := streamConn.Write(&pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: reply.Packet}}); err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+ case *pb.ToRadio_Heartbeat:
+ default:
+ n.logger.Warnf(">> !Unhandled ToRadio message of type '%T'", payload)
+ }
+ }
+}
+
+func (n *Node) listenTCP(ctx context.Context) error {
+ l, err := net.Listen("tcp", n.cfg.TCPListenAddr)
+ if err != nil {
+ return fmt.Errorf("listening: %w", err)
+ }
+ defer l.Close()
+ n.logger.Info("** Listening for tcp connections", "addr", n.cfg.TCPListenAddr)
+
+ go func() {
+ <-ctx.Done()
+ l.Close()
+ }()
+ for {
+ c, err := l.Accept()
+ if err != nil {
+ n.logger.Error("!! Failed to accept connection", "err", err)
+ return err
+ }
+ go func(c net.Conn) {
+ defer c.Close()
+ connctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ tcpconn := c.(*net.TCPConn)
+ tcpconn.SetKeepAlive(true)
+ tcpconn.SetKeepAlivePeriod(25 * time.Second)
+ address := tcpconn.RemoteAddr().String()
+ ipString := strings.Split(address, ":")[0]
+ n.logger.Info("** Accepted connection", "addr", address)
+ if err := n.handleConn(connctx, c, ipString); err != nil {
+ if errors.Is(err, io.EOF) {
+ n.logger.Info("** Connection EOF", "addr", address)
+ } else {
+ n.logger.Error("!! Connection error", "err", err)
+ }
+ } else {
+ n.logger.Info("** Connection ended", "addr", address)
+ }
+ }(c)
+ }
+}