aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNoah Stride <[email protected]>2024-02-01 00:28:09 +0000
committerGitHub <[email protected]>2024-01-31 14:28:09 -1000
commit463760ccf1cb99f387f5bf99ed83e31772744671 (patch)
tree99feec624d4f04542e0c4aaab037b4e0b937a9f4
parent202889338e29c416e810de888734a2d41c6b1069 (diff)
Rough initial radio emulator (#1)
* Super rough initial subscription to channels and sending node info * Very ugly hacky FromRadio subscription * Add more TODOs :weary: * Add log message before we try to broadcast NodeInfo * Add basic positioning broadcasting * demo sending message from consumer * Use const for BroadcastNodeID * Config validation and defaults * Tidy up examples/TODOs * Tidy up example * Handle position and devicetelemetry updates * Fix dodgy merge * Allow configuring interval for nodeinfo/position seperately * Make broadcasted position configurable * Add MQTTProtoTopic constant rather than modifying topic root * Allow altitude of broadcasted position to be configured * static check ignore
-rw-r--r--emulated/emulated.go391
-rw-r--r--emulated/example/main.go108
-rw-r--r--mqtt/client.go8
-rw-r--r--radio/radio.go12
-rw-r--r--util.go4
5 files changed, 513 insertions, 10 deletions
diff --git a/emulated/emulated.go b/emulated/emulated.go
new file mode 100644
index 0000000..7077ee4
--- /dev/null
+++ b/emulated/emulated.go
@@ -0,0 +1,391 @@
+package emulated
+
+import (
+ pb "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic"
+ "context"
+ "fmt"
+ "github.com/charmbracelet/log"
+ "github.com/crypto-smoke/meshtastic-go"
+ "github.com/crypto-smoke/meshtastic-go/mqtt"
+ "github.com/crypto-smoke/meshtastic-go/radio"
+ "golang.org/x/sync/errgroup"
+ "google.golang.org/protobuf/proto"
+ "sync"
+ "time"
+)
+
+// Config is the configuration for the emulated Radio.
+type Config struct {
+ // Dependencies
+ MQTTClient *mqtt.Client
+
+ // Node configuration
+ // NodeID is the ID of the node.
+ NodeID meshtastic.NodeID
+ // LongName is the long name of the node.
+ LongName string
+ // ShortName is the short name of the node.
+ ShortName string
+ // Channels is the set of channels the radio will listen and transmit on.
+ // The first channel in the set is considered the primary channel and is used for broadcasting NodeInfo and Position
+ Channels *pb.ChannelSet
+ // BroadcastNodeInfoInterval is the interval at which the radio will broadcast a NodeInfo on the Primary channel.
+ // The zero value disables broadcasting NodeInfo.
+ BroadcastNodeInfoInterval time.Duration
+
+ // BroadcastPositionInterval is the interval at which the radio will broadcast Position on the Primary channel.
+ // The zero value disables broadcasting NodeInfo.
+ BroadcastPositionInterval time.Duration
+ // PositionLatitudeI is the latitude of the position which will be regularly broadcasted.
+ // This is in degrees multiplied by 1e7.
+ PositionLatitudeI int32
+ // PositionLongitudeI is the longitude of the position which will be regularly broadcasted.
+ // This is in degrees multiplied by 1e7.
+ PositionLongitudeI int32
+ // PositionAltitude is the altitude of the position which will be regularly broadcasted.
+ // This is in meters above MSL.
+ PositionAltitude int32
+}
+
+func (c *Config) validate() error {
+ if c.MQTTClient == nil {
+ return fmt.Errorf("MQTTClient is required")
+ }
+ if c.NodeID == 0 {
+ return fmt.Errorf("NodeID is required")
+ }
+ if c.LongName == "" {
+ // TODO: Generate from NodeID
+ return fmt.Errorf("LongName is required")
+ }
+ if c.ShortName == "" {
+ // TODO: Generate from NodeID
+ return fmt.Errorf("ShortName is required")
+ }
+ if c.Channels == nil {
+ //lint:ignore ST1005 we're referencing an actual field here.
+ return fmt.Errorf("Channels is required")
+ }
+ if len(c.Channels.Settings) == 0 {
+ return fmt.Errorf("Channels.Settings should be non-empty")
+ }
+ return nil
+}
+
+// Radio emulates a meshtastic Node, communicating with a meshtastic network via MQTT.
+type Radio struct {
+ cfg Config
+ mqtt *mqtt.Client
+ logger *log.Logger
+
+ // TODO: rwmutex?? seperate mutexes??
+ mu sync.Mutex
+ fromRadioSubscribers map[chan<- *pb.FromRadio]struct{}
+ nodeDB map[uint32]*pb.NodeInfo
+ // packetID is incremented and included in each packet sent from the radio.
+ // TODO: Eventually, we should offer an easy way of persisting this so that we can resume from where we left off.
+ packetID uint32
+}
+
+// NewRadio creates a new emulated radio.
+func NewRadio(cfg Config) (*Radio, error) {
+ if err := cfg.validate(); err != nil {
+ return nil, fmt.Errorf("validating config: %w", err)
+ }
+ return &Radio{
+ cfg: cfg,
+ logger: log.With("radio", cfg.NodeID.String()),
+ fromRadioSubscribers: map[chan<- *pb.FromRadio]struct{}{},
+ mqtt: cfg.MQTTClient,
+ nodeDB: map[uint32]*pb.NodeInfo{},
+ }, nil
+}
+
+// Run starts the radio. It blocks until the context is cancelled.
+func (r *Radio) Run(ctx context.Context) error {
+ if err := r.mqtt.Connect(); err != nil {
+ return fmt.Errorf("connecting to mqtt: %w", err)
+ }
+ // TODO: Disconnect??
+
+ // Subscribe to all configured channels
+ for _, ch := range r.cfg.Channels.Settings {
+ r.logger.Debug("subscribing to mqtt for channel", "channel", ch.Name)
+ r.mqtt.Handle(ch.Name, r.handleMQTTMessage)
+ }
+
+ // TODO: Rethink concurrency. Do we want a goroutine servicing ToRadio and one servicing FromRadio?
+
+ eg, egCtx := errgroup.WithContext(ctx)
+ // Spin up goroutine to send NodeInfo every interval
+ if r.cfg.BroadcastNodeInfoInterval > 0 {
+ eg.Go(func() error {
+ ticker := time.NewTicker(r.cfg.BroadcastNodeInfoInterval)
+ defer ticker.Stop()
+ for {
+ if err := r.broadcastNodeInfo(egCtx); err != nil {
+ r.logger.Error("failed to broadcast node info", "err", err)
+ }
+ select {
+ case <-egCtx.Done():
+ return nil
+ case <-ticker.C:
+ }
+ }
+ })
+ }
+ // Spin up goroutine to send Position every interval
+ if r.cfg.BroadcastPositionInterval > 0 {
+ eg.Go(func() error {
+ ticker := time.NewTicker(r.cfg.BroadcastPositionInterval)
+ defer ticker.Stop()
+ for {
+ if err := r.broadcastPosition(egCtx); err != nil {
+ r.logger.Error("failed to broadcast position", "err", err)
+ }
+ select {
+ case <-egCtx.Done():
+ return nil
+ case <-ticker.C:
+ }
+ }
+ })
+ }
+
+ return eg.Wait()
+}
+
+func (r *Radio) handleMQTTMessage(msg mqtt.Message) {
+ // TODO: Determine how "github.com/eclipse/paho.mqtt.golang" handles concurrency. Do we need to dispatch here to
+ // a goroutine which handles incoming messages to unblock this one?
+ if err := r.tryHandleMQTTMessage(msg); err != nil {
+ r.logger.Error("failed to handle incoming mqtt message", "err", err)
+ }
+}
+
+func (r *Radio) updateNodeDB(nodeID uint32, updateFunc func(*pb.NodeInfo)) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ nodeInfo, ok := r.nodeDB[nodeID]
+ if !ok {
+ nodeInfo = &pb.NodeInfo{
+ Num: nodeID,
+ }
+ }
+ updateFunc(nodeInfo)
+ nodeInfo.LastHeard = uint32(time.Now().Unix())
+ r.nodeDB[nodeID] = nodeInfo
+}
+
+func (r *Radio) tryHandleMQTTMessage(msg mqtt.Message) error {
+ serviceEnvelope := &pb.ServiceEnvelope{}
+ if err := proto.Unmarshal(msg.Payload, serviceEnvelope); err != nil {
+ return fmt.Errorf("unmarshalling: %w", err)
+ }
+ meshPacket := serviceEnvelope.Packet
+
+ // TODO: Attempt decryption first before dispatching to subscribers
+ // TODO: This means we move this further below.
+ if err := r.dispatchMessageToFromRadio(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Packet{
+ Packet: meshPacket,
+ },
+ }); err != nil {
+ r.logger.Error("failed to dispatch message to FromRadio subscribers", "err", err)
+ }
+
+ // From now on, we only care about messages on the primary channel
+ primaryName := r.cfg.Channels.Settings[0].Name
+ primaryPSK := r.cfg.Channels.Settings[0].Psk
+ if serviceEnvelope.ChannelId != primaryName {
+ return nil
+ }
+
+ r.logger.Debug("received service envelope for primary channel", "serviceEnvelope", serviceEnvelope)
+ // Check if we should try and decrypt the message
+ var data *pb.Data
+ switch payload := meshPacket.PayloadVariant.(type) {
+ case *pb.MeshPacket_Decoded:
+ data = payload.Decoded
+ case *pb.MeshPacket_Encrypted:
+ // TODO: Check if we have the key for this channel
+ plaintext, err := radio.XOR(
+ payload.Encrypted,
+ primaryPSK,
+ meshPacket.Id,
+ meshPacket.From,
+ )
+ if err != nil {
+ return fmt.Errorf("decrypting: %w", err)
+ }
+ data = &pb.Data{}
+ if err := proto.Unmarshal(plaintext, data); err != nil {
+ return fmt.Errorf("unmarshalling decrypted data: %w", err)
+ }
+ default:
+ return fmt.Errorf("unknown payload variant %T", payload)
+ }
+ r.logger.Debug("received data for primary channel", "data", data)
+
+ // For messages on the primary channel, we want to handle these and potentially update the nodeDB.
+ switch data.Portnum {
+ case pb.PortNum_NODEINFO_APP:
+ user := &pb.User{}
+ if err := proto.Unmarshal(data.Payload, user); err != nil {
+ return fmt.Errorf("unmarshalling user: %w", err)
+ }
+ r.logger.Info("received NodeInfo", "user", user)
+ r.updateNodeDB(meshPacket.From, func(nodeInfo *pb.NodeInfo) {
+ nodeInfo.User = user
+ })
+ case pb.PortNum_TEXT_MESSAGE_APP:
+ r.logger.Info("received TextMessage", "message", string(data.Payload))
+ case pb.PortNum_ROUTING_APP:
+ routingPayload := &pb.Routing{}
+ if err := proto.Unmarshal(data.Payload, routingPayload); err != nil {
+ return fmt.Errorf("unmarshalling routingPayload: %w", err)
+ }
+ r.logger.Info("received Routing", "routing", routingPayload)
+ case pb.PortNum_POSITION_APP:
+ positionPayload := &pb.Position{}
+ if err := proto.Unmarshal(data.Payload, positionPayload); err != nil {
+ return fmt.Errorf("unmarshalling positionPayload: %w", err)
+ }
+ r.logger.Info("received Position", "position", positionPayload)
+ r.updateNodeDB(meshPacket.From, func(nodeInfo *pb.NodeInfo) {
+ nodeInfo.Position = positionPayload
+ })
+ case pb.PortNum_TELEMETRY_APP:
+ telemetryPayload := &pb.Telemetry{}
+ if err := proto.Unmarshal(data.Payload, telemetryPayload); err != nil {
+ return fmt.Errorf("unmarshalling telemetryPayload: %w", err)
+ }
+ deviceMetrics := telemetryPayload.GetDeviceMetrics()
+ if deviceMetrics == nil {
+ break
+ }
+ r.logger.Info("received Telemetry deviceMetrics", "telemetry", telemetryPayload)
+ r.updateNodeDB(meshPacket.From, func(nodeInfo *pb.NodeInfo) {
+ nodeInfo.DeviceMetrics = deviceMetrics
+ })
+ default:
+ r.logger.Debug("received unhandled app payload", "data", data)
+ }
+
+ return nil
+}
+
+func (r *Radio) nextPacketID() uint32 {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ r.packetID++
+ return r.packetID
+}
+
+func (r *Radio) sendPacket(ctx context.Context, packet *pb.MeshPacket) error {
+ // TODO: Optimistically attempt to encrypt the packet here if we recognise the channel, encryption is enabled and
+ // the payload is not currently encrypted.
+
+ // sendPacket is responsible for setting the packet ID.
+ r.packetID = r.nextPacketID()
+
+ se := &pb.ServiceEnvelope{
+ // TODO: Fetch channel to use based on packet.Channel rather than hardcoding to primary channel.
+ ChannelId: r.cfg.Channels.Settings[0].Name,
+ GatewayId: r.cfg.NodeID.String(),
+ Packet: packet,
+ }
+ bytes, err := proto.Marshal(se)
+ if err != nil {
+ return fmt.Errorf("marshalling service envelope: %w", err)
+ }
+ return r.mqtt.Publish(&mqtt.Message{
+ Topic: r.mqtt.GetFullTopicForChannel(r.cfg.Channels.Settings[0].Name) + "/" + r.cfg.NodeID.String(),
+ Payload: bytes,
+ })
+}
+
+func (r *Radio) broadcastNodeInfo(ctx context.Context) error {
+ r.logger.Info("broadcasting NodeInfo")
+ // TODO: Lots of stuff missing here. However, this is enough for it to show in the UI of another node listening to
+ // the MQTT server.
+ user := &pb.User{
+ Id: r.cfg.NodeID.String(),
+ LongName: r.cfg.LongName,
+ ShortName: r.cfg.ShortName,
+ HwModel: pb.HardwareModel_PRIVATE_HW,
+ }
+ userBytes, err := proto.Marshal(user)
+ if err != nil {
+ return fmt.Errorf("marshalling user: %w", err)
+ }
+ return r.sendPacket(ctx, &pb.MeshPacket{
+ From: r.cfg.NodeID.Uint32(),
+ To: meshtastic.BroadcastNodeID.Uint32(),
+ PayloadVariant: &pb.MeshPacket_Decoded{
+ Decoded: &pb.Data{
+ Portnum: pb.PortNum_NODEINFO_APP,
+ Payload: userBytes,
+ },
+ },
+ })
+}
+
+func (r *Radio) broadcastPosition(ctx context.Context) error {
+ r.logger.Info("broadcasting Position")
+ position := &pb.Position{
+ LatitudeI: r.cfg.PositionLatitudeI,
+ LongitudeI: r.cfg.PositionLongitudeI,
+ Altitude: r.cfg.PositionAltitude,
+ Time: uint32(time.Now().Unix()),
+ }
+ positionBytes, err := proto.Marshal(position)
+ if err != nil {
+ return fmt.Errorf("marshalling position: %w", err)
+ }
+ return r.sendPacket(ctx, &pb.MeshPacket{
+ From: r.cfg.NodeID.Uint32(),
+ To: meshtastic.BroadcastNodeID.Uint32(),
+ PayloadVariant: &pb.MeshPacket_Decoded{
+ Decoded: &pb.Data{
+ Portnum: pb.PortNum_POSITION_APP,
+ Payload: positionBytes,
+ },
+ },
+ })
+}
+
+// dispatchMessageToFromRadio sends a FromRadio message to all current subscribers to
+// the FromRadio.
+func (r *Radio) dispatchMessageToFromRadio(msg *pb.FromRadio) error {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ for ch := range r.fromRadioSubscribers {
+ // TODO: Make this way safer/resilient
+ ch <- msg
+ }
+ return nil
+}
+
+// FromRadio subscribes to messages from the radio.
+func (r *Radio) FromRadio(ctx context.Context, ch chan<- *pb.FromRadio) error {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ r.fromRadioSubscribers[ch] = struct{}{}
+ // TODO: Unsubscribe from the channel when the context is cancelled??
+ return nil
+}
+
+// ToRadio sends a message to the radio.
+func (r *Radio) ToRadio(ctx context.Context, msg *pb.ToRadio) error {
+ switch payload := msg.PayloadVariant.(type) {
+ case *pb.ToRadio_Disconnect:
+ r.logger.Info("received Disconnect from ToRadio")
+ case *pb.ToRadio_Packet:
+ r.logger.Info("received Packet from ToRadio")
+ return r.sendPacket(ctx, payload.Packet)
+ default:
+ r.logger.Debug("unknown payload variant", "payload", payload)
+ }
+ return fmt.Errorf("not implemented")
+}
diff --git a/emulated/example/main.go b/emulated/example/main.go
new file mode 100644
index 0000000..d0a16e7
--- /dev/null
+++ b/emulated/example/main.go
@@ -0,0 +1,108 @@
+package main
+
+import (
+ pb "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic"
+ "context"
+ "fmt"
+ "github.com/charmbracelet/log"
+ "github.com/crypto-smoke/meshtastic-go"
+ "github.com/crypto-smoke/meshtastic-go/emulated"
+ "github.com/crypto-smoke/meshtastic-go/mqtt"
+ "github.com/crypto-smoke/meshtastic-go/radio"
+ "golang.org/x/sync/errgroup"
+ "time"
+)
+
+func main() {
+ // TODO: Flesh this example out and make it configurable
+ ctx := context.Background()
+ log.SetLevel(log.DebugLevel)
+
+ myNodeID := meshtastic.NodeID(3735928559)
+ r, err := emulated.NewRadio(emulated.Config{
+ LongName: "EXAMPLE",
+ ShortName: "EMPL",
+ NodeID: myNodeID,
+ MQTTClient: &mqtt.DefaultClient,
+ Channels: &pb.ChannelSet{
+ Settings: []*pb.ChannelSettings{
+ {
+ Name: "LongFast",
+ Psk: radio.DefaultKey,
+ },
+ },
+ },
+ BroadcastNodeInfoInterval: 5 * time.Minute,
+
+ BroadcastPositionInterval: 5 * time.Minute,
+ // Hardcoded to the position of Buckingham Palace.
+ PositionLatitudeI: 515014760,
+ PositionLongitudeI: -1406340,
+ PositionAltitude: 2,
+ })
+ if err != nil {
+ panic(err)
+ }
+
+ eg, egCtx := errgroup.WithContext(ctx)
+
+ eg.Go(func() error {
+ if err := r.Run(egCtx); err != nil {
+ return fmt.Errorf("running radio: %w", err)
+ }
+ return nil
+ })
+
+ eg.Go(func() error {
+ // Forgive me, for I have sinned.
+ // TODO: We need a way of knowing the radio has come up and is ready that's better than waiting ten seconds.
+ select {
+ case <-egCtx.Done():
+ return nil
+ case <-time.After(10 * time.Second):
+ }
+
+ err := r.ToRadio(egCtx, &pb.ToRadio{
+ PayloadVariant: &pb.ToRadio_Packet{
+ Packet: &pb.MeshPacket{
+ From: myNodeID.Uint32(),
+ // This is hard coded to Noah's node ID
+ To: 2437877602,
+ PayloadVariant: &pb.MeshPacket_Decoded{
+ Decoded: &pb.Data{
+ Portnum: pb.PortNum_TEXT_MESSAGE_APP,
+ Payload: []byte("from main!!"),
+ },
+ },
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("sending to radio: %w", err)
+ }
+
+ return nil
+ })
+
+ eg.Go(func() error {
+ ch := make(chan *pb.FromRadio)
+ defer close(ch)
+ err := r.FromRadio(egCtx, ch)
+ if err != nil {
+ return fmt.Errorf("setting up FromRadio subscriber: %w", err)
+ }
+
+ for {
+ select {
+ case <-egCtx.Done():
+ return nil
+ case fromRadio := <-ch:
+ log.Info("FromRadio!!", "packet", fromRadio)
+ }
+ }
+ })
+
+ if err := eg.Wait(); err != nil {
+ panic(err)
+ }
+}
diff --git a/mqtt/client.go b/mqtt/client.go
index c3746b5..eff4736 100644
--- a/mqtt/client.go
+++ b/mqtt/client.go
@@ -9,6 +9,8 @@ import (
"time"
)
+const MQTTProtoTopic = "/2/c/"
+
type Client struct {
server string
username string
@@ -108,11 +110,13 @@ func (c *Client) Handle(channel string, h HandlerFunc) {
c.channelHandlers[channel] = append(c.channelHandlers[channel], h)
c.client.Subscribe(topic+"/+", 0, c.handleBrokerMessage)
}
+
func (c *Client) GetFullTopicForChannel(channel string) string {
- return c.topicRoot + "/c/" + channel
+ return c.topicRoot + MQTTProtoTopic + channel
}
+
func (c *Client) GetChannelFromTopic(topic string) string {
- trimmed := strings.TrimPrefix(topic, c.topicRoot+"/c/")
+ trimmed := strings.TrimPrefix(topic, c.topicRoot+MQTTProtoTopic)
sepIndex := strings.Index(trimmed, "/")
if sepIndex > 0 {
return trimmed[:sepIndex]
diff --git a/radio/radio.go b/radio/radio.go
index 31d113a..749cbae 100644
--- a/radio/radio.go
+++ b/radio/radio.go
@@ -2,11 +2,10 @@ package radio
import (
generated "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic"
- b64 "encoding/base64"
+ "encoding/base64"
"errors"
"fmt"
"google.golang.org/protobuf/proto"
- "strings"
)
// not sure what i was getting at with this
@@ -37,12 +36,9 @@ type Something struct {
// as base64: 1PG7OiApB1nwvP+rz05pAQ==
var DefaultKey = []byte{0xd4, 0xf1, 0xbb, 0x3a, 0x20, 0x29, 0x07, 0x59, 0xf0, 0xbc, 0xff, 0xab, 0xcf, 0x4e, 0x69, 0x01}
-// clean up a base64 key that has been rendered safe for use in a URL
-func ParseKey(key string) []byte {
- key = strings.ReplaceAll(key, "-", "+")
- key = strings.ReplaceAll(key, "_", "/")
- sDec, _ := b64.StdEncoding.DecodeString(key)
- return sDec
+// ParseKey converts the most common representation of a channel encryption key (URL encoded base64) to a byte slice
+func ParseKey(key string) ([]byte, error) {
+ return base64.URLEncoding.DecodeString(key)
}
func NewThing() *Something {
diff --git a/util.go b/util.go
index 0de4297..ef2c61f 100644
--- a/util.go
+++ b/util.go
@@ -4,6 +4,7 @@ import (
pbuf "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic"
"encoding/binary"
"fmt"
+ "math"
)
type NodeID uint32
@@ -23,6 +24,9 @@ func (n NodeID) Bytes() []byte {
return bytes
}
+// BroadcastNodeID is the special NodeID used when broadcasting a packet to a channel.
+const BroadcastNodeID NodeID = math.MaxUint32
+
type Node struct {
LongName string
ShortName string