diff options
Diffstat (limited to 'emulated')
| -rw-r--r-- | emulated/emulated.go | 290 | ||||
| -rw-r--r-- | emulated/example/main.go | 38 |
2 files changed, 288 insertions, 40 deletions
diff --git a/emulated/emulated.go b/emulated/emulated.go index 2001276..de62c2a 100644 --- a/emulated/emulated.go +++ b/emulated/emulated.go @@ -8,12 +8,20 @@ import ( "github.com/crypto-smoke/meshtastic-go" "github.com/crypto-smoke/meshtastic-go/mqtt" "github.com/crypto-smoke/meshtastic-go/radio" + "github.com/crypto-smoke/meshtastic-go/transport" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" + "io" + "net" "sync" "time" ) +const ( + // MinAppVersion is the minimum app version supported by the emulated radio. + MinAppVersion = 30200 +) + // Config is the configuration for the emulated Radio. type Config struct { // Dependencies @@ -45,6 +53,9 @@ type Config struct { // PositionAltitude is the altitude of the position which will be regularly broadcasted. // This is in meters above MSL. PositionAltitude int32 + + // TCPListenAddr is the address the emulated radio will listen on for TCP connections and offer the Client API over. + TCPListenAddr string } func (c *Config) validate() error { @@ -149,6 +160,11 @@ func (r *Radio) Run(ctx context.Context) error { } }) } + if r.cfg.TCPListenAddr != "" { + eg.Go(func() error { + return r.listenTCP(egCtx) + }) + } return eg.Wait() } @@ -175,6 +191,17 @@ func (r *Radio) updateNodeDB(nodeID uint32, updateFunc func(*pb.NodeInfo)) { r.nodeDB[nodeID] = nodeInfo } +func (r *Radio) getNodeDB() []*pb.NodeInfo { + r.mu.Lock() + defer r.mu.Unlock() + nodes := make([]*pb.NodeInfo, 0, len(r.nodeDB)) + for _, node := range r.nodeDB { + clonedNode := proto.Clone(node).(*pb.NodeInfo) + nodes = append(nodes, clonedNode) + } + return nodes +} + func (r *Radio) tryHandleMQTTMessage(msg mqtt.Message) error { serviceEnvelope := &pb.ServiceEnvelope{} if err := proto.Unmarshal(msg.Payload, serviceEnvelope); err != nil { @@ -365,25 +392,254 @@ func (r *Radio) dispatchMessageToFromRadio(msg *pb.FromRadio) error { return nil } -// FromRadio subscribes to messages from the radio. -func (r *Radio) FromRadio(ctx context.Context, ch chan<- *pb.FromRadio) error { - r.mu.Lock() - defer r.mu.Unlock() - r.fromRadioSubscribers[ch] = struct{}{} - // TODO: Unsubscribe from the channel when the context is cancelled?? +func (r *Radio) handleToRadioWantConfigID(conn *transport.StreamConn, req *pb.ToRadio_WantConfigId) error { + // Send MyInfo + err := conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_MyInfo{ + MyInfo: &pb.MyNodeInfo{ + MyNodeNum: r.cfg.NodeID.Uint32(), + RebootCount: 0, + // TODO: Track this as a const + MinAppVersion: MinAppVersion, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Metadata + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Metadata{ + Metadata: &pb.DeviceMetadata{ + // TODO: Establish firmwareVersion/deviceStateVersion to fake here + FirmwareVersion: "2.2.19-fake", + DeviceStateVersion: 22, + CanShutdown: true, + HasWifi: true, + HasBluetooth: true, + // PositionFlags? + HwModel: pb.HardwareModel_PRIVATE_HW, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send all NodeDB entries - plus myself. + // TODO: Our own node info entry should be in the DB to avoid the special case here. + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_NodeInfo{ + NodeInfo: &pb.NodeInfo{ + Num: r.cfg.NodeID.Uint32(), + User: &pb.User{ + Id: r.cfg.NodeID.String(), + LongName: r.cfg.LongName, + ShortName: r.cfg.ShortName, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + for _, nodeInfo := range r.getNodeDB() { + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_NodeInfo{ + NodeInfo: nodeInfo, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + + // TODO: Send all channels + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Channel{ + Channel: &pb.Channel{ + Index: 0, + Settings: &pb.ChannelSettings{ + Psk: nil, + }, + Role: pb.Channel_PRIMARY, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Config: Device + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Config{ + Config: &pb.Config{ + PayloadVariant: &pb.Config_Device{ + Device: &pb.Config_DeviceConfig{ + SerialEnabled: true, + NodeInfoBroadcastSecs: uint32(r.cfg.BroadcastNodeInfoInterval.Seconds()), + }, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send ConfigComplete to indicate we're done + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_ConfigCompleteId{ + ConfigCompleteId: req.WantConfigId, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + return nil } -// ToRadio sends a message to the radio. -func (r *Radio) ToRadio(ctx context.Context, msg *pb.ToRadio) error { - switch payload := msg.PayloadVariant.(type) { - case *pb.ToRadio_Disconnect: - r.logger.Info("received Disconnect from ToRadio") - case *pb.ToRadio_Packet: - r.logger.Info("received Packet from ToRadio") - return r.sendPacket(ctx, payload.Packet) - default: - r.logger.Debug("unknown payload variant", "payload", payload) +func (r *Radio) handleConn(ctx context.Context, underlying io.ReadWriteCloser) error { + streamConn := transport.NewRadioStreamConn(underlying) + defer func() { + if err := streamConn.Close(); err != nil { + r.logger.Error("failed to close streamConn", "err", err) + } + }() + + eg, egCtx := errgroup.WithContext(ctx) + // Handling messages coming from client + eg.Go(func() error { + for { + select { + case <-egCtx.Done(): + return nil + default: + } + msg := &pb.ToRadio{} + if err := streamConn.Read(msg); err != nil { + return fmt.Errorf("reading from streamConn: %w", err) + } + r.logger.Info("received ToRadio from streamConn", "msg", msg) + switch payload := msg.PayloadVariant.(type) { + case *pb.ToRadio_Disconnect: + // The meshtastic python client sends a Disconnect command and with the TCP implementation, it expects + // the radio to close the connection. So we end the read loop here, and return to close the connection. + return nil + case *pb.ToRadio_WantConfigId: + if err := r.handleToRadioWantConfigID(streamConn, payload); err != nil { + return fmt.Errorf("handling WantConfigId: %w", err) + } + case *pb.ToRadio_Packet: + if decoded := payload.Packet.GetDecoded(); decoded != nil { + if decoded.Portnum == pb.PortNum_ADMIN_APP { + admin := &pb.AdminMessage{} + if err := proto.Unmarshal(decoded.Payload, admin); err != nil { + return fmt.Errorf("unmarshalling admin: %w", err) + } + + switch adminPayload := admin.PayloadVariant.(type) { + // TODO: Properly handle channel listing, this hack is just so the Python CLI thinks + // it's connected + case *pb.AdminMessage_GetChannelRequest: + r.logger.Info("received GetChannelRequest", "adminPayload", adminPayload, "packet", payload) + resp := &pb.AdminMessage{ + PayloadVariant: &pb.AdminMessage_GetChannelResponse{ + GetChannelResponse: &pb.Channel{ + Index: 0, + Settings: &pb.ChannelSettings{ + Psk: nil, + }, + Role: pb.Channel_DISABLED, + }, + }, + } + respBytes, err := proto.Marshal(resp) + if err != nil { + return fmt.Errorf("marshalling GetChannelResponse: %w", err) + } + // Send GetChannelResponse + if err := streamConn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Packet{ + Packet: &pb.MeshPacket{ + Id: r.nextPacketID(), + From: r.cfg.NodeID.Uint32(), + To: r.cfg.NodeID.Uint32(), + PayloadVariant: &pb.MeshPacket_Decoded{ + Decoded: &pb.Data{ + Portnum: pb.PortNum_ADMIN_APP, + Payload: respBytes, + RequestId: payload.Packet.Id, + }, + }, + }, + }, + }); err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + } + } + } + } + }) + // Handle sending messages to client + eg.Go(func() error { + ch := make(chan *pb.FromRadio) + r.mu.Lock() + r.fromRadioSubscribers[ch] = struct{}{} + r.mu.Unlock() + defer func() { + r.mu.Lock() + delete(r.fromRadioSubscribers, ch) + r.mu.Unlock() + }() + + for { + select { + case <-egCtx.Done(): + return nil + case msg := <-ch: + if err := streamConn.Write(msg); err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + } + }) + + return eg.Wait() +} + +func (r *Radio) listenTCP(ctx context.Context) error { + l, err := net.Listen("tcp", r.cfg.TCPListenAddr) + if err != nil { + return fmt.Errorf("listening: %w", err) + } + r.logger.Info("listening for tcp connections", "addr", r.cfg.TCPListenAddr) + + for { + c, err := l.Accept() + if err != nil { + r.logger.Error("failed to accept connection", "err", err) + continue + } + go func() { + if err := r.handleConn(ctx, c); err != nil { + r.logger.Error("failed to handle TCP connection", "err", err) + } + }() } - return fmt.Errorf("not implemented") +} + +// Conn returns an in-memory connection to the emulated radio. +func (r *Radio) Conn(ctx context.Context) net.Conn { + clientConn, radioConn := net.Pipe() + go func() { + if err := r.handleConn(ctx, radioConn); err != nil { + r.logger.Error("failed to handle in-memory connection", "err", err) + } + }() + return clientConn } diff --git a/emulated/example/main.go b/emulated/example/main.go index 1e9b82b..e2978e2 100644 --- a/emulated/example/main.go +++ b/emulated/example/main.go @@ -9,6 +9,7 @@ import ( "github.com/crypto-smoke/meshtastic-go/emulated" "github.com/crypto-smoke/meshtastic-go/mqtt" "github.com/crypto-smoke/meshtastic-go/radio" + "github.com/crypto-smoke/meshtastic-go/transport" "golang.org/x/sync/errgroup" "time" ) @@ -42,13 +43,14 @@ func main() { PositionLatitudeI: 515014760, PositionLongitudeI: -1406340, PositionAltitude: 2, + + TCPListenAddr: "127.0.0.1:4403", }) if err != nil { panic(err) } eg, egCtx := errgroup.WithContext(ctx) - eg.Go(func() error { if err := r.Run(egCtx); err != nil { return fmt.Errorf("running radio: %w", err) @@ -57,15 +59,12 @@ func main() { }) eg.Go(func() error { - // Forgive me, for I have sinned. - // TODO: We need a way of knowing the radio has come up and is ready that's better than waiting ten seconds. - select { - case <-egCtx.Done(): - return nil - case <-time.After(10 * time.Second): + conn, err := transport.NewClientStreamConn(r.Conn(egCtx)) + if err != nil { + return fmt.Errorf("creating connection: %w", err) } - err := r.ToRadio(egCtx, &pb.ToRadio{ + msg := &pb.ToRadio{ PayloadVariant: &pb.ToRadio_Packet{ Packet: &pb.MeshPacket{ From: nodeID.Uint32(), @@ -79,29 +78,22 @@ func main() { }, }, }, - }) - if err != nil { - return fmt.Errorf("sending to radio: %w", err) } - - return nil - }) - - eg.Go(func() error { - ch := make(chan *pb.FromRadio) - defer close(ch) - err := r.FromRadio(egCtx, ch) - if err != nil { - return fmt.Errorf("setting up FromRadio subscriber: %w", err) + if err := conn.Write(msg); err != nil { + return fmt.Errorf("writing to radio: %w", err) } for { select { case <-egCtx.Done(): return nil - case fromRadio := <-ch: - log.Info("FromRadio!!", "packet", fromRadio) + default: + } + msg := &pb.FromRadio{} + if err := conn.Read(msg); err != nil { + return fmt.Errorf("reading from radio: %w", err) } + log.Info("FromRadio!!", "packet", msg) } }) |
