aboutsummaryrefslogtreecommitdiff
path: root/emulated
diff options
context:
space:
mode:
authorNoah Stride <[email protected]>2024-02-05 15:32:49 +0000
committerGitHub <[email protected]>2024-02-05 05:32:49 -1000
commit598d6e45b2a79b169056c5abe7a6266aa6d5cb23 (patch)
tree5a628024bd06a73fda221712a87eaf20b30ccf29 /emulated
parent95a38401baeb4df14d701405609a3dd26ffd747f (diff)
Refactor Meshtastic "Stream" Protocol handling and implement TCP listener (#5)
* Start hacking on a "StreamConn" * Tidy up write side * Write basic send/receive test * Add support for "wake" Start2 spam * Add test case for reply * Add TCP listener to meshtastic stream conn * Very ugly basic impl that supports `meshtastic --nodes` * Support graceful disconnection command from client * Refactor handling for handleToRadioWantConfigID into it's own method * Send FromRadio messages to clients * Refactor client logic into own type * Fix up serial support for new client * Fix eample * Remove datadump * Make TCP listener optional * Add locking for reading/writing from the connection * Explain knownDevices * Properly close streamConn in example
Diffstat (limited to 'emulated')
-rw-r--r--emulated/emulated.go290
-rw-r--r--emulated/example/main.go38
2 files changed, 288 insertions, 40 deletions
diff --git a/emulated/emulated.go b/emulated/emulated.go
index 2001276..de62c2a 100644
--- a/emulated/emulated.go
+++ b/emulated/emulated.go
@@ -8,12 +8,20 @@ import (
"github.com/crypto-smoke/meshtastic-go"
"github.com/crypto-smoke/meshtastic-go/mqtt"
"github.com/crypto-smoke/meshtastic-go/radio"
+ "github.com/crypto-smoke/meshtastic-go/transport"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
+ "io"
+ "net"
"sync"
"time"
)
+const (
+ // MinAppVersion is the minimum app version supported by the emulated radio.
+ MinAppVersion = 30200
+)
+
// Config is the configuration for the emulated Radio.
type Config struct {
// Dependencies
@@ -45,6 +53,9 @@ type Config struct {
// PositionAltitude is the altitude of the position which will be regularly broadcasted.
// This is in meters above MSL.
PositionAltitude int32
+
+ // TCPListenAddr is the address the emulated radio will listen on for TCP connections and offer the Client API over.
+ TCPListenAddr string
}
func (c *Config) validate() error {
@@ -149,6 +160,11 @@ func (r *Radio) Run(ctx context.Context) error {
}
})
}
+ if r.cfg.TCPListenAddr != "" {
+ eg.Go(func() error {
+ return r.listenTCP(egCtx)
+ })
+ }
return eg.Wait()
}
@@ -175,6 +191,17 @@ func (r *Radio) updateNodeDB(nodeID uint32, updateFunc func(*pb.NodeInfo)) {
r.nodeDB[nodeID] = nodeInfo
}
+func (r *Radio) getNodeDB() []*pb.NodeInfo {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ nodes := make([]*pb.NodeInfo, 0, len(r.nodeDB))
+ for _, node := range r.nodeDB {
+ clonedNode := proto.Clone(node).(*pb.NodeInfo)
+ nodes = append(nodes, clonedNode)
+ }
+ return nodes
+}
+
func (r *Radio) tryHandleMQTTMessage(msg mqtt.Message) error {
serviceEnvelope := &pb.ServiceEnvelope{}
if err := proto.Unmarshal(msg.Payload, serviceEnvelope); err != nil {
@@ -365,25 +392,254 @@ func (r *Radio) dispatchMessageToFromRadio(msg *pb.FromRadio) error {
return nil
}
-// FromRadio subscribes to messages from the radio.
-func (r *Radio) FromRadio(ctx context.Context, ch chan<- *pb.FromRadio) error {
- r.mu.Lock()
- defer r.mu.Unlock()
- r.fromRadioSubscribers[ch] = struct{}{}
- // TODO: Unsubscribe from the channel when the context is cancelled??
+func (r *Radio) handleToRadioWantConfigID(conn *transport.StreamConn, req *pb.ToRadio_WantConfigId) error {
+ // Send MyInfo
+ err := conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_MyInfo{
+ MyInfo: &pb.MyNodeInfo{
+ MyNodeNum: r.cfg.NodeID.Uint32(),
+ RebootCount: 0,
+ // TODO: Track this as a const
+ MinAppVersion: MinAppVersion,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send Metadata
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Metadata{
+ Metadata: &pb.DeviceMetadata{
+ // TODO: Establish firmwareVersion/deviceStateVersion to fake here
+ FirmwareVersion: "2.2.19-fake",
+ DeviceStateVersion: 22,
+ CanShutdown: true,
+ HasWifi: true,
+ HasBluetooth: true,
+ // PositionFlags?
+ HwModel: pb.HardwareModel_PRIVATE_HW,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send all NodeDB entries - plus myself.
+ // TODO: Our own node info entry should be in the DB to avoid the special case here.
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_NodeInfo{
+ NodeInfo: &pb.NodeInfo{
+ Num: r.cfg.NodeID.Uint32(),
+ User: &pb.User{
+ Id: r.cfg.NodeID.String(),
+ LongName: r.cfg.LongName,
+ ShortName: r.cfg.ShortName,
+ },
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ for _, nodeInfo := range r.getNodeDB() {
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_NodeInfo{
+ NodeInfo: nodeInfo,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+
+ // TODO: Send all channels
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Channel{
+ Channel: &pb.Channel{
+ Index: 0,
+ Settings: &pb.ChannelSettings{
+ Psk: nil,
+ },
+ Role: pb.Channel_PRIMARY,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send Config: Device
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Config{
+ Config: &pb.Config{
+ PayloadVariant: &pb.Config_Device{
+ Device: &pb.Config_DeviceConfig{
+ SerialEnabled: true,
+ NodeInfoBroadcastSecs: uint32(r.cfg.BroadcastNodeInfoInterval.Seconds()),
+ },
+ },
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send ConfigComplete to indicate we're done
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_ConfigCompleteId{
+ ConfigCompleteId: req.WantConfigId,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
return nil
}
-// ToRadio sends a message to the radio.
-func (r *Radio) ToRadio(ctx context.Context, msg *pb.ToRadio) error {
- switch payload := msg.PayloadVariant.(type) {
- case *pb.ToRadio_Disconnect:
- r.logger.Info("received Disconnect from ToRadio")
- case *pb.ToRadio_Packet:
- r.logger.Info("received Packet from ToRadio")
- return r.sendPacket(ctx, payload.Packet)
- default:
- r.logger.Debug("unknown payload variant", "payload", payload)
+func (r *Radio) handleConn(ctx context.Context, underlying io.ReadWriteCloser) error {
+ streamConn := transport.NewRadioStreamConn(underlying)
+ defer func() {
+ if err := streamConn.Close(); err != nil {
+ r.logger.Error("failed to close streamConn", "err", err)
+ }
+ }()
+
+ eg, egCtx := errgroup.WithContext(ctx)
+ // Handling messages coming from client
+ eg.Go(func() error {
+ for {
+ select {
+ case <-egCtx.Done():
+ return nil
+ default:
+ }
+ msg := &pb.ToRadio{}
+ if err := streamConn.Read(msg); err != nil {
+ return fmt.Errorf("reading from streamConn: %w", err)
+ }
+ r.logger.Info("received ToRadio from streamConn", "msg", msg)
+ switch payload := msg.PayloadVariant.(type) {
+ case *pb.ToRadio_Disconnect:
+ // The meshtastic python client sends a Disconnect command and with the TCP implementation, it expects
+ // the radio to close the connection. So we end the read loop here, and return to close the connection.
+ return nil
+ case *pb.ToRadio_WantConfigId:
+ if err := r.handleToRadioWantConfigID(streamConn, payload); err != nil {
+ return fmt.Errorf("handling WantConfigId: %w", err)
+ }
+ case *pb.ToRadio_Packet:
+ if decoded := payload.Packet.GetDecoded(); decoded != nil {
+ if decoded.Portnum == pb.PortNum_ADMIN_APP {
+ admin := &pb.AdminMessage{}
+ if err := proto.Unmarshal(decoded.Payload, admin); err != nil {
+ return fmt.Errorf("unmarshalling admin: %w", err)
+ }
+
+ switch adminPayload := admin.PayloadVariant.(type) {
+ // TODO: Properly handle channel listing, this hack is just so the Python CLI thinks
+ // it's connected
+ case *pb.AdminMessage_GetChannelRequest:
+ r.logger.Info("received GetChannelRequest", "adminPayload", adminPayload, "packet", payload)
+ resp := &pb.AdminMessage{
+ PayloadVariant: &pb.AdminMessage_GetChannelResponse{
+ GetChannelResponse: &pb.Channel{
+ Index: 0,
+ Settings: &pb.ChannelSettings{
+ Psk: nil,
+ },
+ Role: pb.Channel_DISABLED,
+ },
+ },
+ }
+ respBytes, err := proto.Marshal(resp)
+ if err != nil {
+ return fmt.Errorf("marshalling GetChannelResponse: %w", err)
+ }
+ // Send GetChannelResponse
+ if err := streamConn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Packet{
+ Packet: &pb.MeshPacket{
+ Id: r.nextPacketID(),
+ From: r.cfg.NodeID.Uint32(),
+ To: r.cfg.NodeID.Uint32(),
+ PayloadVariant: &pb.MeshPacket_Decoded{
+ Decoded: &pb.Data{
+ Portnum: pb.PortNum_ADMIN_APP,
+ Payload: respBytes,
+ RequestId: payload.Packet.Id,
+ },
+ },
+ },
+ },
+ }); err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+ }
+ }
+ }
+ }
+ })
+ // Handle sending messages to client
+ eg.Go(func() error {
+ ch := make(chan *pb.FromRadio)
+ r.mu.Lock()
+ r.fromRadioSubscribers[ch] = struct{}{}
+ r.mu.Unlock()
+ defer func() {
+ r.mu.Lock()
+ delete(r.fromRadioSubscribers, ch)
+ r.mu.Unlock()
+ }()
+
+ for {
+ select {
+ case <-egCtx.Done():
+ return nil
+ case msg := <-ch:
+ if err := streamConn.Write(msg); err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+ }
+ })
+
+ return eg.Wait()
+}
+
+func (r *Radio) listenTCP(ctx context.Context) error {
+ l, err := net.Listen("tcp", r.cfg.TCPListenAddr)
+ if err != nil {
+ return fmt.Errorf("listening: %w", err)
+ }
+ r.logger.Info("listening for tcp connections", "addr", r.cfg.TCPListenAddr)
+
+ for {
+ c, err := l.Accept()
+ if err != nil {
+ r.logger.Error("failed to accept connection", "err", err)
+ continue
+ }
+ go func() {
+ if err := r.handleConn(ctx, c); err != nil {
+ r.logger.Error("failed to handle TCP connection", "err", err)
+ }
+ }()
}
- return fmt.Errorf("not implemented")
+}
+
+// Conn returns an in-memory connection to the emulated radio.
+func (r *Radio) Conn(ctx context.Context) net.Conn {
+ clientConn, radioConn := net.Pipe()
+ go func() {
+ if err := r.handleConn(ctx, radioConn); err != nil {
+ r.logger.Error("failed to handle in-memory connection", "err", err)
+ }
+ }()
+ return clientConn
}
diff --git a/emulated/example/main.go b/emulated/example/main.go
index 1e9b82b..e2978e2 100644
--- a/emulated/example/main.go
+++ b/emulated/example/main.go
@@ -9,6 +9,7 @@ import (
"github.com/crypto-smoke/meshtastic-go/emulated"
"github.com/crypto-smoke/meshtastic-go/mqtt"
"github.com/crypto-smoke/meshtastic-go/radio"
+ "github.com/crypto-smoke/meshtastic-go/transport"
"golang.org/x/sync/errgroup"
"time"
)
@@ -42,13 +43,14 @@ func main() {
PositionLatitudeI: 515014760,
PositionLongitudeI: -1406340,
PositionAltitude: 2,
+
+ TCPListenAddr: "127.0.0.1:4403",
})
if err != nil {
panic(err)
}
eg, egCtx := errgroup.WithContext(ctx)
-
eg.Go(func() error {
if err := r.Run(egCtx); err != nil {
return fmt.Errorf("running radio: %w", err)
@@ -57,15 +59,12 @@ func main() {
})
eg.Go(func() error {
- // Forgive me, for I have sinned.
- // TODO: We need a way of knowing the radio has come up and is ready that's better than waiting ten seconds.
- select {
- case <-egCtx.Done():
- return nil
- case <-time.After(10 * time.Second):
+ conn, err := transport.NewClientStreamConn(r.Conn(egCtx))
+ if err != nil {
+ return fmt.Errorf("creating connection: %w", err)
}
- err := r.ToRadio(egCtx, &pb.ToRadio{
+ msg := &pb.ToRadio{
PayloadVariant: &pb.ToRadio_Packet{
Packet: &pb.MeshPacket{
From: nodeID.Uint32(),
@@ -79,29 +78,22 @@ func main() {
},
},
},
- })
- if err != nil {
- return fmt.Errorf("sending to radio: %w", err)
}
-
- return nil
- })
-
- eg.Go(func() error {
- ch := make(chan *pb.FromRadio)
- defer close(ch)
- err := r.FromRadio(egCtx, ch)
- if err != nil {
- return fmt.Errorf("setting up FromRadio subscriber: %w", err)
+ if err := conn.Write(msg); err != nil {
+ return fmt.Errorf("writing to radio: %w", err)
}
for {
select {
case <-egCtx.Done():
return nil
- case fromRadio := <-ch:
- log.Info("FromRadio!!", "packet", fromRadio)
+ default:
+ }
+ msg := &pb.FromRadio{}
+ if err := conn.Read(msg); err != nil {
+ return fmt.Errorf("reading from radio: %w", err)
}
+ log.Info("FromRadio!!", "packet", msg)
}
})