diff options
Diffstat (limited to 'node.go')
| -rw-r--r-- | node.go | 598 |
1 files changed, 598 insertions, 0 deletions
@@ -0,0 +1,598 @@ +package main + +import ( + "bufio" + "bytes" + "context" + "crypto/aes" + "crypto/rand" + "crypto/sha256" + "encoding/binary" + "errors" + "fmt" + "io" + "os/exec" + "strconv" + "sync" + "time" + + "github.com/charmbracelet/log" + "github.com/elastic/go-freelru" + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "github.com/meshnet-gophers/meshtastic-go/radio" + "github.com/meshnet-gophers/meshtastic-go/transport" + "github.com/pion/dtls/v3/pkg/crypto/ccm" + "golang.org/x/crypto/curve25519" + "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/proto" +) + +const ( + // MinAppVersion is the minimum app version supported by the emulated radio. + MinAppVersion = 30200 + ROUTE_MAX_SIZE = 8 +) + +var ( + errNoAdminConnection = errors.New("no admin connection") +) + +// NodeConfig is the configuration for the emulated Radio. +type NodeConfig struct { + // Database + DatabaseDir string + + // Logs directory + LogsDir string + + // Node configuration + // NodeID is the ID of the node. + NodeID meshtastic.NodeID + // LongName is the long name of the node. + LongName string + // ShortName is the short name of the node. + ShortName string + + // MaxHops sets the maximum value for value for HopStart/HopLimit on Tx + MaxHops uint32 + // DefaultHops sets the default value for HopStart/HopLimit on Tx + DefaultHops uint32 + + // Channels is the set of channels the radio will listen and transmit on. + // The first channel in the set is considered the primary channel and is used for broadcasting NodeInfo and Position + Channels *pb.ChannelSet + // BroadcastNodeInfoInterval is the interval at which the radio will broadcast a NodeInfo on the Primary channel. + // The zero value disables broadcasting NodeInfo. + BroadcastNodeInfoInterval time.Duration + // BroadcastPositionInterval is the interval at which the radio will broadcast Position on the Primary channel. + // The zero value disables broadcasting NodeInfo. + BroadcastPositionInterval time.Duration + // PositionLatitudeI is the latitude of the position which will be regularly broadcasted. + // This is in degrees multiplied by 1e7. + PositionLatitudeI int32 + // PositionLongitudeI is the longitude of the position which will be regularly broadcasted. + // This is in degrees multiplied by 1e7. + PositionLongitudeI int32 + // PositionAltitude is the altitude of the position which will be regularly broadcasted. + // This is in meters above MSL. + PositionAltitude int32 + // TCPListenAddr is the address the emulated radio will listen on for TCP connections and offer the Client API over. + TCPListenAddr string + + DeviceMetricsBroadcastInterval time.Duration + DeviceMetricsUPSAddress string + X25519SecretKey []byte + X25519PublicKey []byte +} + +func (c *NodeConfig) validate() error { + if c.DatabaseDir == "" { + return fmt.Errorf("DatabasePath is required") + } + if c.LogsDir == "" { + return fmt.Errorf("LogsDir is required") + } + if c.NodeID == 0 { + return fmt.Errorf("NodeID is required") + } + if c.LongName == "" { + c.LongName = c.NodeID.DefaultLongName() + } + if c.ShortName == "" { + c.ShortName = c.NodeID.DefaultShortName() + } + if c.Channels == nil { + //lint:ignore ST1005 we're referencing an actual field here. + return fmt.Errorf("Channels is required") + } + if len(c.Channels.Settings) == 0 { + return fmt.Errorf("Channels.Settings should be non-empty") + } + return nil +} + +// Node emulates a meshtastic Node, communicating with a meshtastic network via MQTT. +type Node struct { + cfg NodeConfig + db *Database + mesh MeshIf + logger *log.Logger + dl *DataLogger + start time.Time + nodeID uint32 + user *pb.User + chLookup map[byte][]*pb.ChannelSettings + pktLru *freelru.LRU[PacketTruncated, struct{}] + packetID uint32 + mu sync.Mutex + adminConn map[*transport.StreamConn]struct{} +} + +// NewNode creates a new emulated radio. +func NewNode(cfg NodeConfig) (*Node, error) { + if err := cfg.validate(); err != nil { + return nil, fmt.Errorf("validating config: %w", err) + } + db, err := NewDB(cfg.DatabaseDir) + if err != nil { + return nil, err + } + logger := log.WithPrefix("Node") + chLookup := make(map[byte][]*pb.ChannelSettings) + for i, c := range cfg.Channels.Settings { + c.ChannelNum = uint32(i) + ch, err := radio.ChannelHash(c.Name, c.Psk) + if err != nil { + return nil, err + } + chLookup[byte(ch)] = append(chLookup[byte(ch)], c) + logger.Infof("Channel[%d]: Name='%s', Hash=0x%02x", i, c.Name, ch) + } + dl, err := NewDataLogger(cfg.LogsDir) + if err != nil { + return nil, err + } + pktLru, err := freelru.New[PacketTruncated, struct{}](32, hashPacket) + if err != nil { + panic(err) + } + var nonce [4]byte + if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil { + return nil, err + } + adminConn := make(map[*transport.StreamConn]struct{}) + return &Node{ + cfg: cfg, + logger: logger, + db: db, + dl: dl, + start: time.Now(), + packetID: binary.LittleEndian.Uint32(nonce[:]), + chLookup: chLookup, + pktLru: pktLru, + adminConn: adminConn, + }, nil +} + +func (n *Node) Close() error { + return n.db.Close() +} + +// Run starts the radio. It blocks until the context is cancelled. +func (n *Node) Run(ctx context.Context, meshConn MeshIf) error { + n.logger.Infof("** Connecting to Mesh...") + if err := meshConn.Open(); err != nil { + return fmt.Errorf("connecting to modem: %w", err) + } + n.mesh = meshConn + go func() { + <-ctx.Done() + meshConn.Close() + }() + n.logger.Infof("** Connected to Mesh...") + n.nodeID = n.cfg.NodeID.Uint32() + n.NodeInfo(n.nodeID, func(ni *pb.NodeInfo) bool { + var updated bool + n.user = &pb.User{ + Id: n.cfg.NodeID.String(), + LongName: n.cfg.LongName, + ShortName: n.cfg.ShortName, + HwModel: pb.HardwareModel_PRIVATE_HW, + Role: pb.Config_DeviceConfig_CLIENT_MUTE, + IsLicensed: false, + IsUnmessagable: Ptr(false), + PublicKey: n.cfg.X25519PublicKey, + } + if !proto.Equal(ni.User, n.user) { + ni.User = n.user + updated = true + } + if n.cfg.DeviceMetricsBroadcastInterval > 0 { + deviceMetrics, err := n.getDeviceMetrics() + if err == nil { + ni.DeviceMetrics = deviceMetrics + updated = true + } + } + return updated + }) + + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(func() error { + return n.RxLoop(egCtx) + }) + + if n.cfg.BroadcastNodeInfoInterval > 0 { + eg.Go(FuncTicker(n.cfg.BroadcastNodeInfoInterval, egCtx, n.broadcastNodeInfo)) + } + if n.cfg.BroadcastPositionInterval > 0 { + eg.Go(FuncTicker(n.cfg.BroadcastPositionInterval, egCtx, n.broadcastPosition)) + } + if n.cfg.DeviceMetricsBroadcastInterval > 0 { + eg.Go(FuncTicker(n.cfg.DeviceMetricsBroadcastInterval, egCtx, n.broadcastDeviceMetrics)) + } + if n.cfg.TCPListenAddr != "" { + eg.Go(func() error { + return n.listenTCP(egCtx) + }) + } + return eg.Wait() +} + +func (n *Node) NodeInfo(nodeID uint32, updateFunc func(*pb.NodeInfo) bool) *pb.NodeInfo { + if nodeID == 0 { + return nil + } + ni, err := n.db.GetNodeInfo(nodeID) + if err == errNotExists { + ni = &pb.NodeInfo{User: &pb.User{}, Position: &pb.Position{}, DeviceMetrics: &pb.DeviceMetrics{}} + } else if err != nil { + n.logger.Error("!! NodeInfo error", "error", err) + return nil + } + ni.Num = nodeID + var needsUpdate bool + if updateFunc != nil { + needsUpdate = updateFunc(ni) + } else if err == nil { + return ni // no (updatefunc or new node) + } + if !needsUpdate { + return ni + } + ni.LastHeard = uint32(time.Now().Unix()) + n.logger.Debug("** SetNodeInfo", "nodeInfo", ni) + if err = n.db.SetNodeInfo(nodeID, ni); err != nil { + n.logger.Error(err) + } + return ni +} + +func (n *Node) tryDecryptPKC(p *pb.MeshPacket) ([]byte, []byte, error) { + ni, err := n.db.GetNodeInfo(p.From) + if err != nil { + return nil, nil, err + } + if ni.User == nil || ni.User.PublicKey == nil { + return nil, nil, errors.New("invalid user publickey") + } + pk := ni.User.PublicKey + sh, err := curve25519.X25519(n.cfg.X25519SecretKey, pk) + if err != nil { + return nil, nil, err + } + h := sha256.New() + h.Write(sh) + shk := h.Sum(nil) + c, err := aes.NewCipher(shk) + if err != nil { + return nil, nil, err + } + aead, err := ccm.NewCCM(c, 8, 13) + if err != nil { + return nil, nil, err + } + payload := p.GetEncrypted() + if len(payload) <= 4+8 { + return nil, nil, errors.New("too short") + } + extra := binary.LittleEndian.Uint32(payload[len(payload)-4:]) + nonce := buildPkcNonce(p.Id, extra, p.From) + data, err := aead.Open(nil, nonce, payload[:len(payload)-4], nil) + if err != nil { + return nil, nil, err + } + return data, pk, nil +} + +func (n *Node) tryDecryptPSK(p *pb.MeshPacket) ([]byte, *pb.ChannelSettings) { + channelCandidates, ok := n.chLookup[byte(p.Channel)] + if !ok { + return nil, nil // cannot find channel + } + var err error + var plaintext []byte + var channel *pb.ChannelSettings + for _, channel = range channelCandidates { + plaintext, err = radio.XOR( + p.GetEncrypted(), + channel.Psk, + p.Id, + p.From, + ) + if err == nil { + break + } + } + if err != nil { + return nil, nil + } + return plaintext, channel +} + +func (n *Node) nextPacketID() uint32 { + n.mu.Lock() + n.packetID += 1 + n.mu.Unlock() + return n.packetID +} + +func (n *Node) txPortnumMessage(channelID uint32, to uint32, portNum pb.PortNum, message proto.Message) error { + payload, err := proto.Marshal(message) + if err != nil { + return err + } + return n.txPacket(&pb.MeshPacket{ + From: n.nodeID, + To: to, + Channel: channelID, + PayloadVariant: &pb.MeshPacket_Decoded{Decoded: &pb.Data{ + Portnum: portNum, + Payload: payload, + }}, + }) +} + +func (n *Node) txPacket(p *pb.MeshPacket) error { + var plaintext []byte + var decoded *pb.Data + switch payload := p.PayloadVariant.(type) { + case *pb.MeshPacket_Decoded: + var err error + plaintext, err = proto.Marshal(payload.Decoded) + if err != nil { + return fmt.Errorf("marshalling user: %w", err) + } + decoded = payload.Decoded + default: + panic("unexpected MeshPacket payload variant") + } + + if p.To == 0 { + p.To = broadcastID + } + if p.Id == 0 { + p.Id = n.nextPacketID() + } + if p.RelayNode == 0 { + p.RelayNode = p.From + } + if p.RelayNode == 0 { + p.RelayNode = 0xFF + } + if p.HopStart == 0 { + p.HopStart = n.cfg.DefaultHops + p.HopLimit = n.cfg.DefaultHops + } + if p.HopLimit > n.cfg.MaxHops { + p.HopLimit = n.cfg.MaxHops + } + dstNi, _ := n.db.GetNodeInfo(p.To) + if dstNi == nil { + dstNi = &pb.NodeInfo{Num: p.To} + } + + // PKC + if p.PkiEncrypted && p.PublicKey != nil { + // explicit PKC with a key, do nothing + } else if p.From == n.nodeID && decoded.Portnum != pb.PortNum_TRACEROUTE_APP && decoded.Portnum != pb.PortNum_NODEINFO_APP && decoded.Portnum != pb.PortNum_ROUTING_APP && decoded.Portnum != pb.PortNum_POSITION_APP { + // implicit PKC, or explicit PKC without a key + if dstNi.User != nil && dstNi.User.PublicKey != nil { + p.PublicKey = dstNi.User.PublicKey + } + } + // Ensure if PKC is selected, there is a recepient key + if p.PkiEncrypted && p.PublicKey == nil { + return errors.New("missing recepient public key") + } + + // Encrypt payload + buf := bytes.NewBuffer(nil) + var encrChannel string + var channel *pb.ChannelSettings + if p.PkiEncrypted { + channel = pkcChan + encrChannel = pkcChan.Name + p.Channel = 0 + sh, err := curve25519.X25519(n.cfg.X25519SecretKey, p.PublicKey) + if err != nil { + return err + } + h := sha256.New() + h.Write(sh) + shk := h.Sum(nil) + c, err := aes.NewCipher(shk) + if err != nil { + return err + } + aead, err := ccm.NewCCM(c, 8, 13) + if err != nil { + return err + } + var nonceExtra [4]byte + if _, err := rand.Read(nonceExtra[:]); err != nil { + return err + } + nonce := buildPkcNonce(p.Id, binary.LittleEndian.Uint32(nonceExtra[:]), p.From) + ciphertextAndTag := aead.Seal(nil, nonce, plaintext, nil) + buf.Write(ciphertextAndTag) + buf.Write(nonceExtra[:]) + } else { + chIdx := p.Channel + if chIdx >= uint32(len(n.cfg.Channels.Settings)) { + return errors.New("invalid channel index") + } + channel = n.cfg.Channels.Settings[chIdx] + encrChannel = fmt.Sprintf("#%s", channel.Name) + ciphertext, err := radio.XOR( + plaintext, + channel.Psk, + p.Id, + p.From, + ) + if err != nil { + return fmt.Errorf("failed to PSK encrypt: %w", err) + } + buf.Write(ciphertext) + } + b := buf.Bytes() + buf.Reset() + out := proto.Clone(p).(*pb.MeshPacket) + out.PayloadVariant = &pb.MeshPacket_Encrypted{Encrypted: b} + if err := n.mesh.WriteMeshPacket(out); err != nil { + n.logger.Debug("X( MeshPacket", "channel", encrChannel, "packet", p.String(), "out", out.String()) + return err + } + n.dl.LogPacket(p, channel, &pb.NodeInfo{Num: n.nodeID, User: n.user}, dstNi, decoded, decoded.Payload) + pt := PacketTruncated{ + To: p.To, + From: p.From, + ID: p.Id, + } + n.pktLru.Add(pt, struct{}{}) + n.logger.Debug("(( MeshPacket", "channel", encrChannel, "packet", p.String(), "out", out.String()) + return nil +} + +func (n *Node) broadcastNodeInfo(ctx context.Context) error { + n.logger.Info("(( NodeInfo") + return n.txPortnumMessage(0, broadcastID, pb.PortNum_NODEINFO_APP, n.user) +} + +func (n *Node) getPosition() *pb.Position { + return &pb.Position{ + LatitudeI: &n.cfg.PositionLatitudeI, + LongitudeI: &n.cfg.PositionLongitudeI, + Altitude: &n.cfg.PositionAltitude, + Time: uint32(time.Now().Unix()), + LocationSource: pb.Position_LOC_MANUAL, + } +} + +func (n *Node) broadcastPosition(ctx context.Context) error { + n.logger.Info("(( Position") + position := n.getPosition() + n.NodeInfo(n.nodeID, func(nodeInfo *pb.NodeInfo) bool { + nodeInfo.Position = position + return true + }) + return n.txPortnumMessage(0, broadcastID, pb.PortNum_POSITION_APP, position) +} + +func (n *Node) getDeviceMetrics() (*pb.DeviceMetrics, error) { + cmdStdout := bytes.NewBuffer(nil) + cmd := exec.Command("upsc", n.cfg.DeviceMetricsUPSAddress) + cmd.Stdout = cmdStdout + err := cmd.Run() + if err != nil { + n.logger.Error("failed to get UPS data", "err", err) + return nil, err + } + stdoutBytes := cmdStdout.Bytes() + stdout := bufio.NewReader(bytes.NewReader(stdoutBytes)) + var batteryLevel uint32 + var voltage float32 + var channelUtilization float32 + var airUtilTx float32 + for { + line, _, err := stdout.ReadLine() + if errors.Is(err, io.EOF) { + break + } + lineValue := bytes.SplitN(line, []byte(": "), 2) + if len(lineValue) != 2 { + continue + } + if bytes.Equal(lineValue[0], []byte("battery.voltage")) { + v, err := strconv.ParseFloat(string(lineValue[1]), 64) + if err != nil { + continue + } + voltage = float32(v) + } + if bytes.Equal(lineValue[0], []byte("battery.charge")) { + charge, err := strconv.ParseUint(string(lineValue[1]), 10, 64) + if err != nil { + continue + } + batteryLevel = uint32(charge) + } + } + if batteryLevel == 100 && voltage > 0 { + batteryLevel = 101 + } + uptime := uint32(time.Since(n.start).Seconds()) + return &pb.DeviceMetrics{ + BatteryLevel: &batteryLevel, + Voltage: &voltage, + ChannelUtilization: &channelUtilization, + AirUtilTx: &airUtilTx, + UptimeSeconds: &uptime, + }, nil +} +func (n *Node) broadcastDeviceMetrics(ctx context.Context) error { + n.logger.Info("(( DeviceMetrics") + deviceMetrics, err := n.getDeviceMetrics() + if err != nil { + return err + } + n.NodeInfo(n.nodeID, func(nodeInfo *pb.NodeInfo) bool { + nodeInfo.DeviceMetrics = deviceMetrics + return true + }) + return n.txPortnumMessage(0, broadcastID, pb.PortNum_TELEMETRY_APP, &pb.Telemetry{ + Variant: &pb.Telemetry_DeviceMetrics{ + DeviceMetrics: deviceMetrics, + }, + }) +} + +func (n *Node) getDeviceMetadata() *pb.DeviceMetadata { + return &pb.DeviceMetadata{ + FirmwareVersion: "2.6.0-golang", + DeviceStateVersion: 24, + HwModel: pb.HardwareModel_PRIVATE_HW, + Role: pb.Config_DeviceConfig_CLIENT_MUTE, + HasPKC: true, + HasRemoteHardware: false, + HasWifi: false, + HasBluetooth: false, + HasEthernet: false, + CanShutdown: false, + } +} + +// dispatchMessageToAdmin sends a FromRadio message to all current subscribers to +// the FromRadio. +func (n *Node) dispatchMessageToAdmin(msg *pb.FromRadio) error { + if len(n.adminConn) == 0 { + return errNoAdminConnection + } + for conn := range n.adminConn { + if err := conn.Write(msg); err != nil { + n.logger.Errorf("failed to send to admin: %w", err) + } + } + return nil +} |
