aboutsummaryrefslogtreecommitdiff
path: root/emulated/emulated.go
diff options
context:
space:
mode:
Diffstat (limited to 'emulated/emulated.go')
-rw-r--r--emulated/emulated.go290
1 files changed, 273 insertions, 17 deletions
diff --git a/emulated/emulated.go b/emulated/emulated.go
index 2001276..de62c2a 100644
--- a/emulated/emulated.go
+++ b/emulated/emulated.go
@@ -8,12 +8,20 @@ import (
"github.com/crypto-smoke/meshtastic-go"
"github.com/crypto-smoke/meshtastic-go/mqtt"
"github.com/crypto-smoke/meshtastic-go/radio"
+ "github.com/crypto-smoke/meshtastic-go/transport"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
+ "io"
+ "net"
"sync"
"time"
)
+const (
+ // MinAppVersion is the minimum app version supported by the emulated radio.
+ MinAppVersion = 30200
+)
+
// Config is the configuration for the emulated Radio.
type Config struct {
// Dependencies
@@ -45,6 +53,9 @@ type Config struct {
// PositionAltitude is the altitude of the position which will be regularly broadcasted.
// This is in meters above MSL.
PositionAltitude int32
+
+ // TCPListenAddr is the address the emulated radio will listen on for TCP connections and offer the Client API over.
+ TCPListenAddr string
}
func (c *Config) validate() error {
@@ -149,6 +160,11 @@ func (r *Radio) Run(ctx context.Context) error {
}
})
}
+ if r.cfg.TCPListenAddr != "" {
+ eg.Go(func() error {
+ return r.listenTCP(egCtx)
+ })
+ }
return eg.Wait()
}
@@ -175,6 +191,17 @@ func (r *Radio) updateNodeDB(nodeID uint32, updateFunc func(*pb.NodeInfo)) {
r.nodeDB[nodeID] = nodeInfo
}
+func (r *Radio) getNodeDB() []*pb.NodeInfo {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ nodes := make([]*pb.NodeInfo, 0, len(r.nodeDB))
+ for _, node := range r.nodeDB {
+ clonedNode := proto.Clone(node).(*pb.NodeInfo)
+ nodes = append(nodes, clonedNode)
+ }
+ return nodes
+}
+
func (r *Radio) tryHandleMQTTMessage(msg mqtt.Message) error {
serviceEnvelope := &pb.ServiceEnvelope{}
if err := proto.Unmarshal(msg.Payload, serviceEnvelope); err != nil {
@@ -365,25 +392,254 @@ func (r *Radio) dispatchMessageToFromRadio(msg *pb.FromRadio) error {
return nil
}
-// FromRadio subscribes to messages from the radio.
-func (r *Radio) FromRadio(ctx context.Context, ch chan<- *pb.FromRadio) error {
- r.mu.Lock()
- defer r.mu.Unlock()
- r.fromRadioSubscribers[ch] = struct{}{}
- // TODO: Unsubscribe from the channel when the context is cancelled??
+func (r *Radio) handleToRadioWantConfigID(conn *transport.StreamConn, req *pb.ToRadio_WantConfigId) error {
+ // Send MyInfo
+ err := conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_MyInfo{
+ MyInfo: &pb.MyNodeInfo{
+ MyNodeNum: r.cfg.NodeID.Uint32(),
+ RebootCount: 0,
+ // TODO: Track this as a const
+ MinAppVersion: MinAppVersion,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send Metadata
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Metadata{
+ Metadata: &pb.DeviceMetadata{
+ // TODO: Establish firmwareVersion/deviceStateVersion to fake here
+ FirmwareVersion: "2.2.19-fake",
+ DeviceStateVersion: 22,
+ CanShutdown: true,
+ HasWifi: true,
+ HasBluetooth: true,
+ // PositionFlags?
+ HwModel: pb.HardwareModel_PRIVATE_HW,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send all NodeDB entries - plus myself.
+ // TODO: Our own node info entry should be in the DB to avoid the special case here.
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_NodeInfo{
+ NodeInfo: &pb.NodeInfo{
+ Num: r.cfg.NodeID.Uint32(),
+ User: &pb.User{
+ Id: r.cfg.NodeID.String(),
+ LongName: r.cfg.LongName,
+ ShortName: r.cfg.ShortName,
+ },
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ for _, nodeInfo := range r.getNodeDB() {
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_NodeInfo{
+ NodeInfo: nodeInfo,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+
+ // TODO: Send all channels
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Channel{
+ Channel: &pb.Channel{
+ Index: 0,
+ Settings: &pb.ChannelSettings{
+ Psk: nil,
+ },
+ Role: pb.Channel_PRIMARY,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send Config: Device
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Config{
+ Config: &pb.Config{
+ PayloadVariant: &pb.Config_Device{
+ Device: &pb.Config_DeviceConfig{
+ SerialEnabled: true,
+ NodeInfoBroadcastSecs: uint32(r.cfg.BroadcastNodeInfoInterval.Seconds()),
+ },
+ },
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send ConfigComplete to indicate we're done
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_ConfigCompleteId{
+ ConfigCompleteId: req.WantConfigId,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
return nil
}
-// ToRadio sends a message to the radio.
-func (r *Radio) ToRadio(ctx context.Context, msg *pb.ToRadio) error {
- switch payload := msg.PayloadVariant.(type) {
- case *pb.ToRadio_Disconnect:
- r.logger.Info("received Disconnect from ToRadio")
- case *pb.ToRadio_Packet:
- r.logger.Info("received Packet from ToRadio")
- return r.sendPacket(ctx, payload.Packet)
- default:
- r.logger.Debug("unknown payload variant", "payload", payload)
+func (r *Radio) handleConn(ctx context.Context, underlying io.ReadWriteCloser) error {
+ streamConn := transport.NewRadioStreamConn(underlying)
+ defer func() {
+ if err := streamConn.Close(); err != nil {
+ r.logger.Error("failed to close streamConn", "err", err)
+ }
+ }()
+
+ eg, egCtx := errgroup.WithContext(ctx)
+ // Handling messages coming from client
+ eg.Go(func() error {
+ for {
+ select {
+ case <-egCtx.Done():
+ return nil
+ default:
+ }
+ msg := &pb.ToRadio{}
+ if err := streamConn.Read(msg); err != nil {
+ return fmt.Errorf("reading from streamConn: %w", err)
+ }
+ r.logger.Info("received ToRadio from streamConn", "msg", msg)
+ switch payload := msg.PayloadVariant.(type) {
+ case *pb.ToRadio_Disconnect:
+ // The meshtastic python client sends a Disconnect command and with the TCP implementation, it expects
+ // the radio to close the connection. So we end the read loop here, and return to close the connection.
+ return nil
+ case *pb.ToRadio_WantConfigId:
+ if err := r.handleToRadioWantConfigID(streamConn, payload); err != nil {
+ return fmt.Errorf("handling WantConfigId: %w", err)
+ }
+ case *pb.ToRadio_Packet:
+ if decoded := payload.Packet.GetDecoded(); decoded != nil {
+ if decoded.Portnum == pb.PortNum_ADMIN_APP {
+ admin := &pb.AdminMessage{}
+ if err := proto.Unmarshal(decoded.Payload, admin); err != nil {
+ return fmt.Errorf("unmarshalling admin: %w", err)
+ }
+
+ switch adminPayload := admin.PayloadVariant.(type) {
+ // TODO: Properly handle channel listing, this hack is just so the Python CLI thinks
+ // it's connected
+ case *pb.AdminMessage_GetChannelRequest:
+ r.logger.Info("received GetChannelRequest", "adminPayload", adminPayload, "packet", payload)
+ resp := &pb.AdminMessage{
+ PayloadVariant: &pb.AdminMessage_GetChannelResponse{
+ GetChannelResponse: &pb.Channel{
+ Index: 0,
+ Settings: &pb.ChannelSettings{
+ Psk: nil,
+ },
+ Role: pb.Channel_DISABLED,
+ },
+ },
+ }
+ respBytes, err := proto.Marshal(resp)
+ if err != nil {
+ return fmt.Errorf("marshalling GetChannelResponse: %w", err)
+ }
+ // Send GetChannelResponse
+ if err := streamConn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Packet{
+ Packet: &pb.MeshPacket{
+ Id: r.nextPacketID(),
+ From: r.cfg.NodeID.Uint32(),
+ To: r.cfg.NodeID.Uint32(),
+ PayloadVariant: &pb.MeshPacket_Decoded{
+ Decoded: &pb.Data{
+ Portnum: pb.PortNum_ADMIN_APP,
+ Payload: respBytes,
+ RequestId: payload.Packet.Id,
+ },
+ },
+ },
+ },
+ }); err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+ }
+ }
+ }
+ }
+ })
+ // Handle sending messages to client
+ eg.Go(func() error {
+ ch := make(chan *pb.FromRadio)
+ r.mu.Lock()
+ r.fromRadioSubscribers[ch] = struct{}{}
+ r.mu.Unlock()
+ defer func() {
+ r.mu.Lock()
+ delete(r.fromRadioSubscribers, ch)
+ r.mu.Unlock()
+ }()
+
+ for {
+ select {
+ case <-egCtx.Done():
+ return nil
+ case msg := <-ch:
+ if err := streamConn.Write(msg); err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+ }
+ })
+
+ return eg.Wait()
+}
+
+func (r *Radio) listenTCP(ctx context.Context) error {
+ l, err := net.Listen("tcp", r.cfg.TCPListenAddr)
+ if err != nil {
+ return fmt.Errorf("listening: %w", err)
+ }
+ r.logger.Info("listening for tcp connections", "addr", r.cfg.TCPListenAddr)
+
+ for {
+ c, err := l.Accept()
+ if err != nil {
+ r.logger.Error("failed to accept connection", "err", err)
+ continue
+ }
+ go func() {
+ if err := r.handleConn(ctx, c); err != nil {
+ r.logger.Error("failed to handle TCP connection", "err", err)
+ }
+ }()
}
- return fmt.Errorf("not implemented")
+}
+
+// Conn returns an in-memory connection to the emulated radio.
+func (r *Radio) Conn(ctx context.Context) net.Conn {
+ clientConn, radioConn := net.Pipe()
+ go func() {
+ if err := r.handleConn(ctx, radioConn); err != nil {
+ r.logger.Error("failed to handle in-memory connection", "err", err)
+ }
+ }()
+ return clientConn
}