diff options
| author | Noah Stride <[email protected]> | 2024-02-01 00:28:09 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-31 14:28:09 -1000 |
| commit | 463760ccf1cb99f387f5bf99ed83e31772744671 (patch) | |
| tree | 99feec624d4f04542e0c4aaab037b4e0b937a9f4 /emulated | |
| parent | 202889338e29c416e810de888734a2d41c6b1069 (diff) | |
Rough initial radio emulator (#1)
* Super rough initial subscription to channels and sending node info
* Very ugly hacky FromRadio subscription
* Add more TODOs :weary:
* Add log message before we try to broadcast NodeInfo
* Add basic positioning broadcasting
* demo sending message from consumer
* Use const for BroadcastNodeID
* Config validation and defaults
* Tidy up examples/TODOs
* Tidy up example
* Handle position and devicetelemetry updates
* Fix dodgy merge
* Allow configuring interval for nodeinfo/position seperately
* Make broadcasted position configurable
* Add MQTTProtoTopic constant rather than modifying topic root
* Allow altitude of broadcasted position to be configured
* static check ignore
Diffstat (limited to 'emulated')
| -rw-r--r-- | emulated/emulated.go | 391 | ||||
| -rw-r--r-- | emulated/example/main.go | 108 |
2 files changed, 499 insertions, 0 deletions
diff --git a/emulated/emulated.go b/emulated/emulated.go new file mode 100644 index 0000000..7077ee4 --- /dev/null +++ b/emulated/emulated.go @@ -0,0 +1,391 @@ +package emulated + +import ( + pb "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic" + "context" + "fmt" + "github.com/charmbracelet/log" + "github.com/crypto-smoke/meshtastic-go" + "github.com/crypto-smoke/meshtastic-go/mqtt" + "github.com/crypto-smoke/meshtastic-go/radio" + "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/proto" + "sync" + "time" +) + +// Config is the configuration for the emulated Radio. +type Config struct { + // Dependencies + MQTTClient *mqtt.Client + + // Node configuration + // NodeID is the ID of the node. + NodeID meshtastic.NodeID + // LongName is the long name of the node. + LongName string + // ShortName is the short name of the node. + ShortName string + // Channels is the set of channels the radio will listen and transmit on. + // The first channel in the set is considered the primary channel and is used for broadcasting NodeInfo and Position + Channels *pb.ChannelSet + // BroadcastNodeInfoInterval is the interval at which the radio will broadcast a NodeInfo on the Primary channel. + // The zero value disables broadcasting NodeInfo. + BroadcastNodeInfoInterval time.Duration + + // BroadcastPositionInterval is the interval at which the radio will broadcast Position on the Primary channel. + // The zero value disables broadcasting NodeInfo. + BroadcastPositionInterval time.Duration + // PositionLatitudeI is the latitude of the position which will be regularly broadcasted. + // This is in degrees multiplied by 1e7. + PositionLatitudeI int32 + // PositionLongitudeI is the longitude of the position which will be regularly broadcasted. + // This is in degrees multiplied by 1e7. + PositionLongitudeI int32 + // PositionAltitude is the altitude of the position which will be regularly broadcasted. + // This is in meters above MSL. + PositionAltitude int32 +} + +func (c *Config) validate() error { + if c.MQTTClient == nil { + return fmt.Errorf("MQTTClient is required") + } + if c.NodeID == 0 { + return fmt.Errorf("NodeID is required") + } + if c.LongName == "" { + // TODO: Generate from NodeID + return fmt.Errorf("LongName is required") + } + if c.ShortName == "" { + // TODO: Generate from NodeID + return fmt.Errorf("ShortName is required") + } + if c.Channels == nil { + //lint:ignore ST1005 we're referencing an actual field here. + return fmt.Errorf("Channels is required") + } + if len(c.Channels.Settings) == 0 { + return fmt.Errorf("Channels.Settings should be non-empty") + } + return nil +} + +// Radio emulates a meshtastic Node, communicating with a meshtastic network via MQTT. +type Radio struct { + cfg Config + mqtt *mqtt.Client + logger *log.Logger + + // TODO: rwmutex?? seperate mutexes?? + mu sync.Mutex + fromRadioSubscribers map[chan<- *pb.FromRadio]struct{} + nodeDB map[uint32]*pb.NodeInfo + // packetID is incremented and included in each packet sent from the radio. + // TODO: Eventually, we should offer an easy way of persisting this so that we can resume from where we left off. + packetID uint32 +} + +// NewRadio creates a new emulated radio. +func NewRadio(cfg Config) (*Radio, error) { + if err := cfg.validate(); err != nil { + return nil, fmt.Errorf("validating config: %w", err) + } + return &Radio{ + cfg: cfg, + logger: log.With("radio", cfg.NodeID.String()), + fromRadioSubscribers: map[chan<- *pb.FromRadio]struct{}{}, + mqtt: cfg.MQTTClient, + nodeDB: map[uint32]*pb.NodeInfo{}, + }, nil +} + +// Run starts the radio. It blocks until the context is cancelled. +func (r *Radio) Run(ctx context.Context) error { + if err := r.mqtt.Connect(); err != nil { + return fmt.Errorf("connecting to mqtt: %w", err) + } + // TODO: Disconnect?? + + // Subscribe to all configured channels + for _, ch := range r.cfg.Channels.Settings { + r.logger.Debug("subscribing to mqtt for channel", "channel", ch.Name) + r.mqtt.Handle(ch.Name, r.handleMQTTMessage) + } + + // TODO: Rethink concurrency. Do we want a goroutine servicing ToRadio and one servicing FromRadio? + + eg, egCtx := errgroup.WithContext(ctx) + // Spin up goroutine to send NodeInfo every interval + if r.cfg.BroadcastNodeInfoInterval > 0 { + eg.Go(func() error { + ticker := time.NewTicker(r.cfg.BroadcastNodeInfoInterval) + defer ticker.Stop() + for { + if err := r.broadcastNodeInfo(egCtx); err != nil { + r.logger.Error("failed to broadcast node info", "err", err) + } + select { + case <-egCtx.Done(): + return nil + case <-ticker.C: + } + } + }) + } + // Spin up goroutine to send Position every interval + if r.cfg.BroadcastPositionInterval > 0 { + eg.Go(func() error { + ticker := time.NewTicker(r.cfg.BroadcastPositionInterval) + defer ticker.Stop() + for { + if err := r.broadcastPosition(egCtx); err != nil { + r.logger.Error("failed to broadcast position", "err", err) + } + select { + case <-egCtx.Done(): + return nil + case <-ticker.C: + } + } + }) + } + + return eg.Wait() +} + +func (r *Radio) handleMQTTMessage(msg mqtt.Message) { + // TODO: Determine how "github.com/eclipse/paho.mqtt.golang" handles concurrency. Do we need to dispatch here to + // a goroutine which handles incoming messages to unblock this one? + if err := r.tryHandleMQTTMessage(msg); err != nil { + r.logger.Error("failed to handle incoming mqtt message", "err", err) + } +} + +func (r *Radio) updateNodeDB(nodeID uint32, updateFunc func(*pb.NodeInfo)) { + r.mu.Lock() + defer r.mu.Unlock() + nodeInfo, ok := r.nodeDB[nodeID] + if !ok { + nodeInfo = &pb.NodeInfo{ + Num: nodeID, + } + } + updateFunc(nodeInfo) + nodeInfo.LastHeard = uint32(time.Now().Unix()) + r.nodeDB[nodeID] = nodeInfo +} + +func (r *Radio) tryHandleMQTTMessage(msg mqtt.Message) error { + serviceEnvelope := &pb.ServiceEnvelope{} + if err := proto.Unmarshal(msg.Payload, serviceEnvelope); err != nil { + return fmt.Errorf("unmarshalling: %w", err) + } + meshPacket := serviceEnvelope.Packet + + // TODO: Attempt decryption first before dispatching to subscribers + // TODO: This means we move this further below. + if err := r.dispatchMessageToFromRadio(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Packet{ + Packet: meshPacket, + }, + }); err != nil { + r.logger.Error("failed to dispatch message to FromRadio subscribers", "err", err) + } + + // From now on, we only care about messages on the primary channel + primaryName := r.cfg.Channels.Settings[0].Name + primaryPSK := r.cfg.Channels.Settings[0].Psk + if serviceEnvelope.ChannelId != primaryName { + return nil + } + + r.logger.Debug("received service envelope for primary channel", "serviceEnvelope", serviceEnvelope) + // Check if we should try and decrypt the message + var data *pb.Data + switch payload := meshPacket.PayloadVariant.(type) { + case *pb.MeshPacket_Decoded: + data = payload.Decoded + case *pb.MeshPacket_Encrypted: + // TODO: Check if we have the key for this channel + plaintext, err := radio.XOR( + payload.Encrypted, + primaryPSK, + meshPacket.Id, + meshPacket.From, + ) + if err != nil { + return fmt.Errorf("decrypting: %w", err) + } + data = &pb.Data{} + if err := proto.Unmarshal(plaintext, data); err != nil { + return fmt.Errorf("unmarshalling decrypted data: %w", err) + } + default: + return fmt.Errorf("unknown payload variant %T", payload) + } + r.logger.Debug("received data for primary channel", "data", data) + + // For messages on the primary channel, we want to handle these and potentially update the nodeDB. + switch data.Portnum { + case pb.PortNum_NODEINFO_APP: + user := &pb.User{} + if err := proto.Unmarshal(data.Payload, user); err != nil { + return fmt.Errorf("unmarshalling user: %w", err) + } + r.logger.Info("received NodeInfo", "user", user) + r.updateNodeDB(meshPacket.From, func(nodeInfo *pb.NodeInfo) { + nodeInfo.User = user + }) + case pb.PortNum_TEXT_MESSAGE_APP: + r.logger.Info("received TextMessage", "message", string(data.Payload)) + case pb.PortNum_ROUTING_APP: + routingPayload := &pb.Routing{} + if err := proto.Unmarshal(data.Payload, routingPayload); err != nil { + return fmt.Errorf("unmarshalling routingPayload: %w", err) + } + r.logger.Info("received Routing", "routing", routingPayload) + case pb.PortNum_POSITION_APP: + positionPayload := &pb.Position{} + if err := proto.Unmarshal(data.Payload, positionPayload); err != nil { + return fmt.Errorf("unmarshalling positionPayload: %w", err) + } + r.logger.Info("received Position", "position", positionPayload) + r.updateNodeDB(meshPacket.From, func(nodeInfo *pb.NodeInfo) { + nodeInfo.Position = positionPayload + }) + case pb.PortNum_TELEMETRY_APP: + telemetryPayload := &pb.Telemetry{} + if err := proto.Unmarshal(data.Payload, telemetryPayload); err != nil { + return fmt.Errorf("unmarshalling telemetryPayload: %w", err) + } + deviceMetrics := telemetryPayload.GetDeviceMetrics() + if deviceMetrics == nil { + break + } + r.logger.Info("received Telemetry deviceMetrics", "telemetry", telemetryPayload) + r.updateNodeDB(meshPacket.From, func(nodeInfo *pb.NodeInfo) { + nodeInfo.DeviceMetrics = deviceMetrics + }) + default: + r.logger.Debug("received unhandled app payload", "data", data) + } + + return nil +} + +func (r *Radio) nextPacketID() uint32 { + r.mu.Lock() + defer r.mu.Unlock() + r.packetID++ + return r.packetID +} + +func (r *Radio) sendPacket(ctx context.Context, packet *pb.MeshPacket) error { + // TODO: Optimistically attempt to encrypt the packet here if we recognise the channel, encryption is enabled and + // the payload is not currently encrypted. + + // sendPacket is responsible for setting the packet ID. + r.packetID = r.nextPacketID() + + se := &pb.ServiceEnvelope{ + // TODO: Fetch channel to use based on packet.Channel rather than hardcoding to primary channel. + ChannelId: r.cfg.Channels.Settings[0].Name, + GatewayId: r.cfg.NodeID.String(), + Packet: packet, + } + bytes, err := proto.Marshal(se) + if err != nil { + return fmt.Errorf("marshalling service envelope: %w", err) + } + return r.mqtt.Publish(&mqtt.Message{ + Topic: r.mqtt.GetFullTopicForChannel(r.cfg.Channels.Settings[0].Name) + "/" + r.cfg.NodeID.String(), + Payload: bytes, + }) +} + +func (r *Radio) broadcastNodeInfo(ctx context.Context) error { + r.logger.Info("broadcasting NodeInfo") + // TODO: Lots of stuff missing here. However, this is enough for it to show in the UI of another node listening to + // the MQTT server. + user := &pb.User{ + Id: r.cfg.NodeID.String(), + LongName: r.cfg.LongName, + ShortName: r.cfg.ShortName, + HwModel: pb.HardwareModel_PRIVATE_HW, + } + userBytes, err := proto.Marshal(user) + if err != nil { + return fmt.Errorf("marshalling user: %w", err) + } + return r.sendPacket(ctx, &pb.MeshPacket{ + From: r.cfg.NodeID.Uint32(), + To: meshtastic.BroadcastNodeID.Uint32(), + PayloadVariant: &pb.MeshPacket_Decoded{ + Decoded: &pb.Data{ + Portnum: pb.PortNum_NODEINFO_APP, + Payload: userBytes, + }, + }, + }) +} + +func (r *Radio) broadcastPosition(ctx context.Context) error { + r.logger.Info("broadcasting Position") + position := &pb.Position{ + LatitudeI: r.cfg.PositionLatitudeI, + LongitudeI: r.cfg.PositionLongitudeI, + Altitude: r.cfg.PositionAltitude, + Time: uint32(time.Now().Unix()), + } + positionBytes, err := proto.Marshal(position) + if err != nil { + return fmt.Errorf("marshalling position: %w", err) + } + return r.sendPacket(ctx, &pb.MeshPacket{ + From: r.cfg.NodeID.Uint32(), + To: meshtastic.BroadcastNodeID.Uint32(), + PayloadVariant: &pb.MeshPacket_Decoded{ + Decoded: &pb.Data{ + Portnum: pb.PortNum_POSITION_APP, + Payload: positionBytes, + }, + }, + }) +} + +// dispatchMessageToFromRadio sends a FromRadio message to all current subscribers to +// the FromRadio. +func (r *Radio) dispatchMessageToFromRadio(msg *pb.FromRadio) error { + r.mu.Lock() + defer r.mu.Unlock() + for ch := range r.fromRadioSubscribers { + // TODO: Make this way safer/resilient + ch <- msg + } + return nil +} + +// FromRadio subscribes to messages from the radio. +func (r *Radio) FromRadio(ctx context.Context, ch chan<- *pb.FromRadio) error { + r.mu.Lock() + defer r.mu.Unlock() + r.fromRadioSubscribers[ch] = struct{}{} + // TODO: Unsubscribe from the channel when the context is cancelled?? + return nil +} + +// ToRadio sends a message to the radio. +func (r *Radio) ToRadio(ctx context.Context, msg *pb.ToRadio) error { + switch payload := msg.PayloadVariant.(type) { + case *pb.ToRadio_Disconnect: + r.logger.Info("received Disconnect from ToRadio") + case *pb.ToRadio_Packet: + r.logger.Info("received Packet from ToRadio") + return r.sendPacket(ctx, payload.Packet) + default: + r.logger.Debug("unknown payload variant", "payload", payload) + } + return fmt.Errorf("not implemented") +} diff --git a/emulated/example/main.go b/emulated/example/main.go new file mode 100644 index 0000000..d0a16e7 --- /dev/null +++ b/emulated/example/main.go @@ -0,0 +1,108 @@ +package main + +import ( + pb "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic" + "context" + "fmt" + "github.com/charmbracelet/log" + "github.com/crypto-smoke/meshtastic-go" + "github.com/crypto-smoke/meshtastic-go/emulated" + "github.com/crypto-smoke/meshtastic-go/mqtt" + "github.com/crypto-smoke/meshtastic-go/radio" + "golang.org/x/sync/errgroup" + "time" +) + +func main() { + // TODO: Flesh this example out and make it configurable + ctx := context.Background() + log.SetLevel(log.DebugLevel) + + myNodeID := meshtastic.NodeID(3735928559) + r, err := emulated.NewRadio(emulated.Config{ + LongName: "EXAMPLE", + ShortName: "EMPL", + NodeID: myNodeID, + MQTTClient: &mqtt.DefaultClient, + Channels: &pb.ChannelSet{ + Settings: []*pb.ChannelSettings{ + { + Name: "LongFast", + Psk: radio.DefaultKey, + }, + }, + }, + BroadcastNodeInfoInterval: 5 * time.Minute, + + BroadcastPositionInterval: 5 * time.Minute, + // Hardcoded to the position of Buckingham Palace. + PositionLatitudeI: 515014760, + PositionLongitudeI: -1406340, + PositionAltitude: 2, + }) + if err != nil { + panic(err) + } + + eg, egCtx := errgroup.WithContext(ctx) + + eg.Go(func() error { + if err := r.Run(egCtx); err != nil { + return fmt.Errorf("running radio: %w", err) + } + return nil + }) + + eg.Go(func() error { + // Forgive me, for I have sinned. + // TODO: We need a way of knowing the radio has come up and is ready that's better than waiting ten seconds. + select { + case <-egCtx.Done(): + return nil + case <-time.After(10 * time.Second): + } + + err := r.ToRadio(egCtx, &pb.ToRadio{ + PayloadVariant: &pb.ToRadio_Packet{ + Packet: &pb.MeshPacket{ + From: myNodeID.Uint32(), + // This is hard coded to Noah's node ID + To: 2437877602, + PayloadVariant: &pb.MeshPacket_Decoded{ + Decoded: &pb.Data{ + Portnum: pb.PortNum_TEXT_MESSAGE_APP, + Payload: []byte("from main!!"), + }, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("sending to radio: %w", err) + } + + return nil + }) + + eg.Go(func() error { + ch := make(chan *pb.FromRadio) + defer close(ch) + err := r.FromRadio(egCtx, ch) + if err != nil { + return fmt.Errorf("setting up FromRadio subscriber: %w", err) + } + + for { + select { + case <-egCtx.Done(): + return nil + case fromRadio := <-ch: + log.Info("FromRadio!!", "packet", fromRadio) + } + } + }) + + if err := eg.Wait(); err != nil { + panic(err) + } +} |
