summaryrefslogtreecommitdiff
path: root/node.go
diff options
context:
space:
mode:
Diffstat (limited to 'node.go')
-rw-r--r--node.go598
1 files changed, 598 insertions, 0 deletions
diff --git a/node.go b/node.go
new file mode 100644
index 0000000..a2b8edb
--- /dev/null
+++ b/node.go
@@ -0,0 +1,598 @@
+package main
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "crypto/aes"
+ "crypto/rand"
+ "crypto/sha256"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "os/exec"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/charmbracelet/log"
+ "github.com/elastic/go-freelru"
+ "github.com/meshnet-gophers/meshtastic-go"
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ "github.com/meshnet-gophers/meshtastic-go/radio"
+ "github.com/meshnet-gophers/meshtastic-go/transport"
+ "github.com/pion/dtls/v3/pkg/crypto/ccm"
+ "golang.org/x/crypto/curve25519"
+ "golang.org/x/sync/errgroup"
+ "google.golang.org/protobuf/proto"
+)
+
+const (
+ // MinAppVersion is the minimum app version supported by the emulated radio.
+ MinAppVersion = 30200
+ ROUTE_MAX_SIZE = 8
+)
+
+var (
+ errNoAdminConnection = errors.New("no admin connection")
+)
+
+// NodeConfig is the configuration for the emulated Radio.
+type NodeConfig struct {
+ // Database
+ DatabaseDir string
+
+ // Logs directory
+ LogsDir string
+
+ // Node configuration
+ // NodeID is the ID of the node.
+ NodeID meshtastic.NodeID
+ // LongName is the long name of the node.
+ LongName string
+ // ShortName is the short name of the node.
+ ShortName string
+
+ // MaxHops sets the maximum value for value for HopStart/HopLimit on Tx
+ MaxHops uint32
+ // DefaultHops sets the default value for HopStart/HopLimit on Tx
+ DefaultHops uint32
+
+ // Channels is the set of channels the radio will listen and transmit on.
+ // The first channel in the set is considered the primary channel and is used for broadcasting NodeInfo and Position
+ Channels *pb.ChannelSet
+ // BroadcastNodeInfoInterval is the interval at which the radio will broadcast a NodeInfo on the Primary channel.
+ // The zero value disables broadcasting NodeInfo.
+ BroadcastNodeInfoInterval time.Duration
+ // BroadcastPositionInterval is the interval at which the radio will broadcast Position on the Primary channel.
+ // The zero value disables broadcasting NodeInfo.
+ BroadcastPositionInterval time.Duration
+ // PositionLatitudeI is the latitude of the position which will be regularly broadcasted.
+ // This is in degrees multiplied by 1e7.
+ PositionLatitudeI int32
+ // PositionLongitudeI is the longitude of the position which will be regularly broadcasted.
+ // This is in degrees multiplied by 1e7.
+ PositionLongitudeI int32
+ // PositionAltitude is the altitude of the position which will be regularly broadcasted.
+ // This is in meters above MSL.
+ PositionAltitude int32
+ // TCPListenAddr is the address the emulated radio will listen on for TCP connections and offer the Client API over.
+ TCPListenAddr string
+
+ DeviceMetricsBroadcastInterval time.Duration
+ DeviceMetricsUPSAddress string
+ X25519SecretKey []byte
+ X25519PublicKey []byte
+}
+
+func (c *NodeConfig) validate() error {
+ if c.DatabaseDir == "" {
+ return fmt.Errorf("DatabasePath is required")
+ }
+ if c.LogsDir == "" {
+ return fmt.Errorf("LogsDir is required")
+ }
+ if c.NodeID == 0 {
+ return fmt.Errorf("NodeID is required")
+ }
+ if c.LongName == "" {
+ c.LongName = c.NodeID.DefaultLongName()
+ }
+ if c.ShortName == "" {
+ c.ShortName = c.NodeID.DefaultShortName()
+ }
+ if c.Channels == nil {
+ //lint:ignore ST1005 we're referencing an actual field here.
+ return fmt.Errorf("Channels is required")
+ }
+ if len(c.Channels.Settings) == 0 {
+ return fmt.Errorf("Channels.Settings should be non-empty")
+ }
+ return nil
+}
+
+// Node emulates a meshtastic Node, communicating with a meshtastic network via MQTT.
+type Node struct {
+ cfg NodeConfig
+ db *Database
+ mesh MeshIf
+ logger *log.Logger
+ dl *DataLogger
+ start time.Time
+ nodeID uint32
+ user *pb.User
+ chLookup map[byte][]*pb.ChannelSettings
+ pktLru *freelru.LRU[PacketTruncated, struct{}]
+ packetID uint32
+ mu sync.Mutex
+ adminConn map[*transport.StreamConn]struct{}
+}
+
+// NewNode creates a new emulated radio.
+func NewNode(cfg NodeConfig) (*Node, error) {
+ if err := cfg.validate(); err != nil {
+ return nil, fmt.Errorf("validating config: %w", err)
+ }
+ db, err := NewDB(cfg.DatabaseDir)
+ if err != nil {
+ return nil, err
+ }
+ logger := log.WithPrefix("Node")
+ chLookup := make(map[byte][]*pb.ChannelSettings)
+ for i, c := range cfg.Channels.Settings {
+ c.ChannelNum = uint32(i)
+ ch, err := radio.ChannelHash(c.Name, c.Psk)
+ if err != nil {
+ return nil, err
+ }
+ chLookup[byte(ch)] = append(chLookup[byte(ch)], c)
+ logger.Infof("Channel[%d]: Name='%s', Hash=0x%02x", i, c.Name, ch)
+ }
+ dl, err := NewDataLogger(cfg.LogsDir)
+ if err != nil {
+ return nil, err
+ }
+ pktLru, err := freelru.New[PacketTruncated, struct{}](32, hashPacket)
+ if err != nil {
+ panic(err)
+ }
+ var nonce [4]byte
+ if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
+ return nil, err
+ }
+ adminConn := make(map[*transport.StreamConn]struct{})
+ return &Node{
+ cfg: cfg,
+ logger: logger,
+ db: db,
+ dl: dl,
+ start: time.Now(),
+ packetID: binary.LittleEndian.Uint32(nonce[:]),
+ chLookup: chLookup,
+ pktLru: pktLru,
+ adminConn: adminConn,
+ }, nil
+}
+
+func (n *Node) Close() error {
+ return n.db.Close()
+}
+
+// Run starts the radio. It blocks until the context is cancelled.
+func (n *Node) Run(ctx context.Context, meshConn MeshIf) error {
+ n.logger.Infof("** Connecting to Mesh...")
+ if err := meshConn.Open(); err != nil {
+ return fmt.Errorf("connecting to modem: %w", err)
+ }
+ n.mesh = meshConn
+ go func() {
+ <-ctx.Done()
+ meshConn.Close()
+ }()
+ n.logger.Infof("** Connected to Mesh...")
+ n.nodeID = n.cfg.NodeID.Uint32()
+ n.NodeInfo(n.nodeID, func(ni *pb.NodeInfo) bool {
+ var updated bool
+ n.user = &pb.User{
+ Id: n.cfg.NodeID.String(),
+ LongName: n.cfg.LongName,
+ ShortName: n.cfg.ShortName,
+ HwModel: pb.HardwareModel_PRIVATE_HW,
+ Role: pb.Config_DeviceConfig_CLIENT_MUTE,
+ IsLicensed: false,
+ IsUnmessagable: Ptr(false),
+ PublicKey: n.cfg.X25519PublicKey,
+ }
+ if !proto.Equal(ni.User, n.user) {
+ ni.User = n.user
+ updated = true
+ }
+ if n.cfg.DeviceMetricsBroadcastInterval > 0 {
+ deviceMetrics, err := n.getDeviceMetrics()
+ if err == nil {
+ ni.DeviceMetrics = deviceMetrics
+ updated = true
+ }
+ }
+ return updated
+ })
+
+ eg, egCtx := errgroup.WithContext(ctx)
+ eg.Go(func() error {
+ return n.RxLoop(egCtx)
+ })
+
+ if n.cfg.BroadcastNodeInfoInterval > 0 {
+ eg.Go(FuncTicker(n.cfg.BroadcastNodeInfoInterval, egCtx, n.broadcastNodeInfo))
+ }
+ if n.cfg.BroadcastPositionInterval > 0 {
+ eg.Go(FuncTicker(n.cfg.BroadcastPositionInterval, egCtx, n.broadcastPosition))
+ }
+ if n.cfg.DeviceMetricsBroadcastInterval > 0 {
+ eg.Go(FuncTicker(n.cfg.DeviceMetricsBroadcastInterval, egCtx, n.broadcastDeviceMetrics))
+ }
+ if n.cfg.TCPListenAddr != "" {
+ eg.Go(func() error {
+ return n.listenTCP(egCtx)
+ })
+ }
+ return eg.Wait()
+}
+
+func (n *Node) NodeInfo(nodeID uint32, updateFunc func(*pb.NodeInfo) bool) *pb.NodeInfo {
+ if nodeID == 0 {
+ return nil
+ }
+ ni, err := n.db.GetNodeInfo(nodeID)
+ if err == errNotExists {
+ ni = &pb.NodeInfo{User: &pb.User{}, Position: &pb.Position{}, DeviceMetrics: &pb.DeviceMetrics{}}
+ } else if err != nil {
+ n.logger.Error("!! NodeInfo error", "error", err)
+ return nil
+ }
+ ni.Num = nodeID
+ var needsUpdate bool
+ if updateFunc != nil {
+ needsUpdate = updateFunc(ni)
+ } else if err == nil {
+ return ni // no (updatefunc or new node)
+ }
+ if !needsUpdate {
+ return ni
+ }
+ ni.LastHeard = uint32(time.Now().Unix())
+ n.logger.Debug("** SetNodeInfo", "nodeInfo", ni)
+ if err = n.db.SetNodeInfo(nodeID, ni); err != nil {
+ n.logger.Error(err)
+ }
+ return ni
+}
+
+func (n *Node) tryDecryptPKC(p *pb.MeshPacket) ([]byte, []byte, error) {
+ ni, err := n.db.GetNodeInfo(p.From)
+ if err != nil {
+ return nil, nil, err
+ }
+ if ni.User == nil || ni.User.PublicKey == nil {
+ return nil, nil, errors.New("invalid user publickey")
+ }
+ pk := ni.User.PublicKey
+ sh, err := curve25519.X25519(n.cfg.X25519SecretKey, pk)
+ if err != nil {
+ return nil, nil, err
+ }
+ h := sha256.New()
+ h.Write(sh)
+ shk := h.Sum(nil)
+ c, err := aes.NewCipher(shk)
+ if err != nil {
+ return nil, nil, err
+ }
+ aead, err := ccm.NewCCM(c, 8, 13)
+ if err != nil {
+ return nil, nil, err
+ }
+ payload := p.GetEncrypted()
+ if len(payload) <= 4+8 {
+ return nil, nil, errors.New("too short")
+ }
+ extra := binary.LittleEndian.Uint32(payload[len(payload)-4:])
+ nonce := buildPkcNonce(p.Id, extra, p.From)
+ data, err := aead.Open(nil, nonce, payload[:len(payload)-4], nil)
+ if err != nil {
+ return nil, nil, err
+ }
+ return data, pk, nil
+}
+
+func (n *Node) tryDecryptPSK(p *pb.MeshPacket) ([]byte, *pb.ChannelSettings) {
+ channelCandidates, ok := n.chLookup[byte(p.Channel)]
+ if !ok {
+ return nil, nil // cannot find channel
+ }
+ var err error
+ var plaintext []byte
+ var channel *pb.ChannelSettings
+ for _, channel = range channelCandidates {
+ plaintext, err = radio.XOR(
+ p.GetEncrypted(),
+ channel.Psk,
+ p.Id,
+ p.From,
+ )
+ if err == nil {
+ break
+ }
+ }
+ if err != nil {
+ return nil, nil
+ }
+ return plaintext, channel
+}
+
+func (n *Node) nextPacketID() uint32 {
+ n.mu.Lock()
+ n.packetID += 1
+ n.mu.Unlock()
+ return n.packetID
+}
+
+func (n *Node) txPortnumMessage(channelID uint32, to uint32, portNum pb.PortNum, message proto.Message) error {
+ payload, err := proto.Marshal(message)
+ if err != nil {
+ return err
+ }
+ return n.txPacket(&pb.MeshPacket{
+ From: n.nodeID,
+ To: to,
+ Channel: channelID,
+ PayloadVariant: &pb.MeshPacket_Decoded{Decoded: &pb.Data{
+ Portnum: portNum,
+ Payload: payload,
+ }},
+ })
+}
+
+func (n *Node) txPacket(p *pb.MeshPacket) error {
+ var plaintext []byte
+ var decoded *pb.Data
+ switch payload := p.PayloadVariant.(type) {
+ case *pb.MeshPacket_Decoded:
+ var err error
+ plaintext, err = proto.Marshal(payload.Decoded)
+ if err != nil {
+ return fmt.Errorf("marshalling user: %w", err)
+ }
+ decoded = payload.Decoded
+ default:
+ panic("unexpected MeshPacket payload variant")
+ }
+
+ if p.To == 0 {
+ p.To = broadcastID
+ }
+ if p.Id == 0 {
+ p.Id = n.nextPacketID()
+ }
+ if p.RelayNode == 0 {
+ p.RelayNode = p.From
+ }
+ if p.RelayNode == 0 {
+ p.RelayNode = 0xFF
+ }
+ if p.HopStart == 0 {
+ p.HopStart = n.cfg.DefaultHops
+ p.HopLimit = n.cfg.DefaultHops
+ }
+ if p.HopLimit > n.cfg.MaxHops {
+ p.HopLimit = n.cfg.MaxHops
+ }
+ dstNi, _ := n.db.GetNodeInfo(p.To)
+ if dstNi == nil {
+ dstNi = &pb.NodeInfo{Num: p.To}
+ }
+
+ // PKC
+ if p.PkiEncrypted && p.PublicKey != nil {
+ // explicit PKC with a key, do nothing
+ } else if p.From == n.nodeID && decoded.Portnum != pb.PortNum_TRACEROUTE_APP && decoded.Portnum != pb.PortNum_NODEINFO_APP && decoded.Portnum != pb.PortNum_ROUTING_APP && decoded.Portnum != pb.PortNum_POSITION_APP {
+ // implicit PKC, or explicit PKC without a key
+ if dstNi.User != nil && dstNi.User.PublicKey != nil {
+ p.PublicKey = dstNi.User.PublicKey
+ }
+ }
+ // Ensure if PKC is selected, there is a recepient key
+ if p.PkiEncrypted && p.PublicKey == nil {
+ return errors.New("missing recepient public key")
+ }
+
+ // Encrypt payload
+ buf := bytes.NewBuffer(nil)
+ var encrChannel string
+ var channel *pb.ChannelSettings
+ if p.PkiEncrypted {
+ channel = pkcChan
+ encrChannel = pkcChan.Name
+ p.Channel = 0
+ sh, err := curve25519.X25519(n.cfg.X25519SecretKey, p.PublicKey)
+ if err != nil {
+ return err
+ }
+ h := sha256.New()
+ h.Write(sh)
+ shk := h.Sum(nil)
+ c, err := aes.NewCipher(shk)
+ if err != nil {
+ return err
+ }
+ aead, err := ccm.NewCCM(c, 8, 13)
+ if err != nil {
+ return err
+ }
+ var nonceExtra [4]byte
+ if _, err := rand.Read(nonceExtra[:]); err != nil {
+ return err
+ }
+ nonce := buildPkcNonce(p.Id, binary.LittleEndian.Uint32(nonceExtra[:]), p.From)
+ ciphertextAndTag := aead.Seal(nil, nonce, plaintext, nil)
+ buf.Write(ciphertextAndTag)
+ buf.Write(nonceExtra[:])
+ } else {
+ chIdx := p.Channel
+ if chIdx >= uint32(len(n.cfg.Channels.Settings)) {
+ return errors.New("invalid channel index")
+ }
+ channel = n.cfg.Channels.Settings[chIdx]
+ encrChannel = fmt.Sprintf("#%s", channel.Name)
+ ciphertext, err := radio.XOR(
+ plaintext,
+ channel.Psk,
+ p.Id,
+ p.From,
+ )
+ if err != nil {
+ return fmt.Errorf("failed to PSK encrypt: %w", err)
+ }
+ buf.Write(ciphertext)
+ }
+ b := buf.Bytes()
+ buf.Reset()
+ out := proto.Clone(p).(*pb.MeshPacket)
+ out.PayloadVariant = &pb.MeshPacket_Encrypted{Encrypted: b}
+ if err := n.mesh.WriteMeshPacket(out); err != nil {
+ n.logger.Debug("X( MeshPacket", "channel", encrChannel, "packet", p.String(), "out", out.String())
+ return err
+ }
+ n.dl.LogPacket(p, channel, &pb.NodeInfo{Num: n.nodeID, User: n.user}, dstNi, decoded, decoded.Payload)
+ pt := PacketTruncated{
+ To: p.To,
+ From: p.From,
+ ID: p.Id,
+ }
+ n.pktLru.Add(pt, struct{}{})
+ n.logger.Debug("(( MeshPacket", "channel", encrChannel, "packet", p.String(), "out", out.String())
+ return nil
+}
+
+func (n *Node) broadcastNodeInfo(ctx context.Context) error {
+ n.logger.Info("(( NodeInfo")
+ return n.txPortnumMessage(0, broadcastID, pb.PortNum_NODEINFO_APP, n.user)
+}
+
+func (n *Node) getPosition() *pb.Position {
+ return &pb.Position{
+ LatitudeI: &n.cfg.PositionLatitudeI,
+ LongitudeI: &n.cfg.PositionLongitudeI,
+ Altitude: &n.cfg.PositionAltitude,
+ Time: uint32(time.Now().Unix()),
+ LocationSource: pb.Position_LOC_MANUAL,
+ }
+}
+
+func (n *Node) broadcastPosition(ctx context.Context) error {
+ n.logger.Info("(( Position")
+ position := n.getPosition()
+ n.NodeInfo(n.nodeID, func(nodeInfo *pb.NodeInfo) bool {
+ nodeInfo.Position = position
+ return true
+ })
+ return n.txPortnumMessage(0, broadcastID, pb.PortNum_POSITION_APP, position)
+}
+
+func (n *Node) getDeviceMetrics() (*pb.DeviceMetrics, error) {
+ cmdStdout := bytes.NewBuffer(nil)
+ cmd := exec.Command("upsc", n.cfg.DeviceMetricsUPSAddress)
+ cmd.Stdout = cmdStdout
+ err := cmd.Run()
+ if err != nil {
+ n.logger.Error("failed to get UPS data", "err", err)
+ return nil, err
+ }
+ stdoutBytes := cmdStdout.Bytes()
+ stdout := bufio.NewReader(bytes.NewReader(stdoutBytes))
+ var batteryLevel uint32
+ var voltage float32
+ var channelUtilization float32
+ var airUtilTx float32
+ for {
+ line, _, err := stdout.ReadLine()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ lineValue := bytes.SplitN(line, []byte(": "), 2)
+ if len(lineValue) != 2 {
+ continue
+ }
+ if bytes.Equal(lineValue[0], []byte("battery.voltage")) {
+ v, err := strconv.ParseFloat(string(lineValue[1]), 64)
+ if err != nil {
+ continue
+ }
+ voltage = float32(v)
+ }
+ if bytes.Equal(lineValue[0], []byte("battery.charge")) {
+ charge, err := strconv.ParseUint(string(lineValue[1]), 10, 64)
+ if err != nil {
+ continue
+ }
+ batteryLevel = uint32(charge)
+ }
+ }
+ if batteryLevel == 100 && voltage > 0 {
+ batteryLevel = 101
+ }
+ uptime := uint32(time.Since(n.start).Seconds())
+ return &pb.DeviceMetrics{
+ BatteryLevel: &batteryLevel,
+ Voltage: &voltage,
+ ChannelUtilization: &channelUtilization,
+ AirUtilTx: &airUtilTx,
+ UptimeSeconds: &uptime,
+ }, nil
+}
+func (n *Node) broadcastDeviceMetrics(ctx context.Context) error {
+ n.logger.Info("(( DeviceMetrics")
+ deviceMetrics, err := n.getDeviceMetrics()
+ if err != nil {
+ return err
+ }
+ n.NodeInfo(n.nodeID, func(nodeInfo *pb.NodeInfo) bool {
+ nodeInfo.DeviceMetrics = deviceMetrics
+ return true
+ })
+ return n.txPortnumMessage(0, broadcastID, pb.PortNum_TELEMETRY_APP, &pb.Telemetry{
+ Variant: &pb.Telemetry_DeviceMetrics{
+ DeviceMetrics: deviceMetrics,
+ },
+ })
+}
+
+func (n *Node) getDeviceMetadata() *pb.DeviceMetadata {
+ return &pb.DeviceMetadata{
+ FirmwareVersion: "2.6.0-golang",
+ DeviceStateVersion: 24,
+ HwModel: pb.HardwareModel_PRIVATE_HW,
+ Role: pb.Config_DeviceConfig_CLIENT_MUTE,
+ HasPKC: true,
+ HasRemoteHardware: false,
+ HasWifi: false,
+ HasBluetooth: false,
+ HasEthernet: false,
+ CanShutdown: false,
+ }
+}
+
+// dispatchMessageToAdmin sends a FromRadio message to all current subscribers to
+// the FromRadio.
+func (n *Node) dispatchMessageToAdmin(msg *pb.FromRadio) error {
+ if len(n.adminConn) == 0 {
+ return errNoAdminConnection
+ }
+ for conn := range n.adminConn {
+ if err := conn.Write(msg); err != nil {
+ n.logger.Errorf("failed to send to admin: %w", err)
+ }
+ }
+ return nil
+}