From 598d6e45b2a79b169056c5abe7a6266aa6d5cb23 Mon Sep 17 00:00:00 2001 From: Noah Stride Date: Mon, 5 Feb 2024 15:32:49 +0000 Subject: Refactor Meshtastic "Stream" Protocol handling and implement TCP listener (#5) * Start hacking on a "StreamConn" * Tidy up write side * Write basic send/receive test * Add support for "wake" Start2 spam * Add test case for reply * Add TCP listener to meshtastic stream conn * Very ugly basic impl that supports `meshtastic --nodes` * Support graceful disconnection command from client * Refactor handling for handleToRadioWantConfigID into it's own method * Send FromRadio messages to clients * Refactor client logic into own type * Fix up serial support for new client * Fix eample * Remove datadump * Make TCP listener optional * Add locking for reading/writing from the connection * Explain knownDevices * Properly close streamConn in example --- emulated/emulated.go | 290 +++++++++++++++++++++++++++++++++++++++--- emulated/example/main.go | 38 +++--- example/main.go | 46 +++++++ go.mod | 7 +- go.sum | 4 +- transport/client.go | 135 ++++++++++++++++++++ transport/serial/serial.go | 269 +-------------------------------------- transport/serial/usb.go | 8 +- transport/stream_conn.go | 187 +++++++++++++++++++++++++++ transport/stream_conn_test.go | 69 ++++++++++ 10 files changed, 743 insertions(+), 310 deletions(-) create mode 100644 example/main.go create mode 100644 transport/client.go create mode 100644 transport/stream_conn.go create mode 100644 transport/stream_conn_test.go diff --git a/emulated/emulated.go b/emulated/emulated.go index 2001276..de62c2a 100644 --- a/emulated/emulated.go +++ b/emulated/emulated.go @@ -8,12 +8,20 @@ import ( "github.com/crypto-smoke/meshtastic-go" "github.com/crypto-smoke/meshtastic-go/mqtt" "github.com/crypto-smoke/meshtastic-go/radio" + "github.com/crypto-smoke/meshtastic-go/transport" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" + "io" + "net" "sync" "time" ) +const ( + // MinAppVersion is the minimum app version supported by the emulated radio. + MinAppVersion = 30200 +) + // Config is the configuration for the emulated Radio. type Config struct { // Dependencies @@ -45,6 +53,9 @@ type Config struct { // PositionAltitude is the altitude of the position which will be regularly broadcasted. // This is in meters above MSL. PositionAltitude int32 + + // TCPListenAddr is the address the emulated radio will listen on for TCP connections and offer the Client API over. + TCPListenAddr string } func (c *Config) validate() error { @@ -149,6 +160,11 @@ func (r *Radio) Run(ctx context.Context) error { } }) } + if r.cfg.TCPListenAddr != "" { + eg.Go(func() error { + return r.listenTCP(egCtx) + }) + } return eg.Wait() } @@ -175,6 +191,17 @@ func (r *Radio) updateNodeDB(nodeID uint32, updateFunc func(*pb.NodeInfo)) { r.nodeDB[nodeID] = nodeInfo } +func (r *Radio) getNodeDB() []*pb.NodeInfo { + r.mu.Lock() + defer r.mu.Unlock() + nodes := make([]*pb.NodeInfo, 0, len(r.nodeDB)) + for _, node := range r.nodeDB { + clonedNode := proto.Clone(node).(*pb.NodeInfo) + nodes = append(nodes, clonedNode) + } + return nodes +} + func (r *Radio) tryHandleMQTTMessage(msg mqtt.Message) error { serviceEnvelope := &pb.ServiceEnvelope{} if err := proto.Unmarshal(msg.Payload, serviceEnvelope); err != nil { @@ -365,25 +392,254 @@ func (r *Radio) dispatchMessageToFromRadio(msg *pb.FromRadio) error { return nil } -// FromRadio subscribes to messages from the radio. -func (r *Radio) FromRadio(ctx context.Context, ch chan<- *pb.FromRadio) error { - r.mu.Lock() - defer r.mu.Unlock() - r.fromRadioSubscribers[ch] = struct{}{} - // TODO: Unsubscribe from the channel when the context is cancelled?? +func (r *Radio) handleToRadioWantConfigID(conn *transport.StreamConn, req *pb.ToRadio_WantConfigId) error { + // Send MyInfo + err := conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_MyInfo{ + MyInfo: &pb.MyNodeInfo{ + MyNodeNum: r.cfg.NodeID.Uint32(), + RebootCount: 0, + // TODO: Track this as a const + MinAppVersion: MinAppVersion, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Metadata + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Metadata{ + Metadata: &pb.DeviceMetadata{ + // TODO: Establish firmwareVersion/deviceStateVersion to fake here + FirmwareVersion: "2.2.19-fake", + DeviceStateVersion: 22, + CanShutdown: true, + HasWifi: true, + HasBluetooth: true, + // PositionFlags? + HwModel: pb.HardwareModel_PRIVATE_HW, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send all NodeDB entries - plus myself. + // TODO: Our own node info entry should be in the DB to avoid the special case here. + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_NodeInfo{ + NodeInfo: &pb.NodeInfo{ + Num: r.cfg.NodeID.Uint32(), + User: &pb.User{ + Id: r.cfg.NodeID.String(), + LongName: r.cfg.LongName, + ShortName: r.cfg.ShortName, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + for _, nodeInfo := range r.getNodeDB() { + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_NodeInfo{ + NodeInfo: nodeInfo, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + + // TODO: Send all channels + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Channel{ + Channel: &pb.Channel{ + Index: 0, + Settings: &pb.ChannelSettings{ + Psk: nil, + }, + Role: pb.Channel_PRIMARY, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Config: Device + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Config{ + Config: &pb.Config{ + PayloadVariant: &pb.Config_Device{ + Device: &pb.Config_DeviceConfig{ + SerialEnabled: true, + NodeInfoBroadcastSecs: uint32(r.cfg.BroadcastNodeInfoInterval.Seconds()), + }, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send ConfigComplete to indicate we're done + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_ConfigCompleteId{ + ConfigCompleteId: req.WantConfigId, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + return nil } -// ToRadio sends a message to the radio. -func (r *Radio) ToRadio(ctx context.Context, msg *pb.ToRadio) error { - switch payload := msg.PayloadVariant.(type) { - case *pb.ToRadio_Disconnect: - r.logger.Info("received Disconnect from ToRadio") - case *pb.ToRadio_Packet: - r.logger.Info("received Packet from ToRadio") - return r.sendPacket(ctx, payload.Packet) - default: - r.logger.Debug("unknown payload variant", "payload", payload) +func (r *Radio) handleConn(ctx context.Context, underlying io.ReadWriteCloser) error { + streamConn := transport.NewRadioStreamConn(underlying) + defer func() { + if err := streamConn.Close(); err != nil { + r.logger.Error("failed to close streamConn", "err", err) + } + }() + + eg, egCtx := errgroup.WithContext(ctx) + // Handling messages coming from client + eg.Go(func() error { + for { + select { + case <-egCtx.Done(): + return nil + default: + } + msg := &pb.ToRadio{} + if err := streamConn.Read(msg); err != nil { + return fmt.Errorf("reading from streamConn: %w", err) + } + r.logger.Info("received ToRadio from streamConn", "msg", msg) + switch payload := msg.PayloadVariant.(type) { + case *pb.ToRadio_Disconnect: + // The meshtastic python client sends a Disconnect command and with the TCP implementation, it expects + // the radio to close the connection. So we end the read loop here, and return to close the connection. + return nil + case *pb.ToRadio_WantConfigId: + if err := r.handleToRadioWantConfigID(streamConn, payload); err != nil { + return fmt.Errorf("handling WantConfigId: %w", err) + } + case *pb.ToRadio_Packet: + if decoded := payload.Packet.GetDecoded(); decoded != nil { + if decoded.Portnum == pb.PortNum_ADMIN_APP { + admin := &pb.AdminMessage{} + if err := proto.Unmarshal(decoded.Payload, admin); err != nil { + return fmt.Errorf("unmarshalling admin: %w", err) + } + + switch adminPayload := admin.PayloadVariant.(type) { + // TODO: Properly handle channel listing, this hack is just so the Python CLI thinks + // it's connected + case *pb.AdminMessage_GetChannelRequest: + r.logger.Info("received GetChannelRequest", "adminPayload", adminPayload, "packet", payload) + resp := &pb.AdminMessage{ + PayloadVariant: &pb.AdminMessage_GetChannelResponse{ + GetChannelResponse: &pb.Channel{ + Index: 0, + Settings: &pb.ChannelSettings{ + Psk: nil, + }, + Role: pb.Channel_DISABLED, + }, + }, + } + respBytes, err := proto.Marshal(resp) + if err != nil { + return fmt.Errorf("marshalling GetChannelResponse: %w", err) + } + // Send GetChannelResponse + if err := streamConn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Packet{ + Packet: &pb.MeshPacket{ + Id: r.nextPacketID(), + From: r.cfg.NodeID.Uint32(), + To: r.cfg.NodeID.Uint32(), + PayloadVariant: &pb.MeshPacket_Decoded{ + Decoded: &pb.Data{ + Portnum: pb.PortNum_ADMIN_APP, + Payload: respBytes, + RequestId: payload.Packet.Id, + }, + }, + }, + }, + }); err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + } + } + } + } + }) + // Handle sending messages to client + eg.Go(func() error { + ch := make(chan *pb.FromRadio) + r.mu.Lock() + r.fromRadioSubscribers[ch] = struct{}{} + r.mu.Unlock() + defer func() { + r.mu.Lock() + delete(r.fromRadioSubscribers, ch) + r.mu.Unlock() + }() + + for { + select { + case <-egCtx.Done(): + return nil + case msg := <-ch: + if err := streamConn.Write(msg); err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + } + }) + + return eg.Wait() +} + +func (r *Radio) listenTCP(ctx context.Context) error { + l, err := net.Listen("tcp", r.cfg.TCPListenAddr) + if err != nil { + return fmt.Errorf("listening: %w", err) + } + r.logger.Info("listening for tcp connections", "addr", r.cfg.TCPListenAddr) + + for { + c, err := l.Accept() + if err != nil { + r.logger.Error("failed to accept connection", "err", err) + continue + } + go func() { + if err := r.handleConn(ctx, c); err != nil { + r.logger.Error("failed to handle TCP connection", "err", err) + } + }() } - return fmt.Errorf("not implemented") +} + +// Conn returns an in-memory connection to the emulated radio. +func (r *Radio) Conn(ctx context.Context) net.Conn { + clientConn, radioConn := net.Pipe() + go func() { + if err := r.handleConn(ctx, radioConn); err != nil { + r.logger.Error("failed to handle in-memory connection", "err", err) + } + }() + return clientConn } diff --git a/emulated/example/main.go b/emulated/example/main.go index 1e9b82b..e2978e2 100644 --- a/emulated/example/main.go +++ b/emulated/example/main.go @@ -9,6 +9,7 @@ import ( "github.com/crypto-smoke/meshtastic-go/emulated" "github.com/crypto-smoke/meshtastic-go/mqtt" "github.com/crypto-smoke/meshtastic-go/radio" + "github.com/crypto-smoke/meshtastic-go/transport" "golang.org/x/sync/errgroup" "time" ) @@ -42,13 +43,14 @@ func main() { PositionLatitudeI: 515014760, PositionLongitudeI: -1406340, PositionAltitude: 2, + + TCPListenAddr: "127.0.0.1:4403", }) if err != nil { panic(err) } eg, egCtx := errgroup.WithContext(ctx) - eg.Go(func() error { if err := r.Run(egCtx); err != nil { return fmt.Errorf("running radio: %w", err) @@ -57,15 +59,12 @@ func main() { }) eg.Go(func() error { - // Forgive me, for I have sinned. - // TODO: We need a way of knowing the radio has come up and is ready that's better than waiting ten seconds. - select { - case <-egCtx.Done(): - return nil - case <-time.After(10 * time.Second): + conn, err := transport.NewClientStreamConn(r.Conn(egCtx)) + if err != nil { + return fmt.Errorf("creating connection: %w", err) } - err := r.ToRadio(egCtx, &pb.ToRadio{ + msg := &pb.ToRadio{ PayloadVariant: &pb.ToRadio_Packet{ Packet: &pb.MeshPacket{ From: nodeID.Uint32(), @@ -79,29 +78,22 @@ func main() { }, }, }, - }) - if err != nil { - return fmt.Errorf("sending to radio: %w", err) } - - return nil - }) - - eg.Go(func() error { - ch := make(chan *pb.FromRadio) - defer close(ch) - err := r.FromRadio(egCtx, ch) - if err != nil { - return fmt.Errorf("setting up FromRadio subscriber: %w", err) + if err := conn.Write(msg); err != nil { + return fmt.Errorf("writing to radio: %w", err) } for { select { case <-egCtx.Done(): return nil - case fromRadio := <-ch: - log.Info("FromRadio!!", "packet", fromRadio) + default: + } + msg := &pb.FromRadio{} + if err := conn.Read(msg); err != nil { + return fmt.Errorf("reading from radio: %w", err) } + log.Info("FromRadio!!", "packet", msg) } }) diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..3986791 --- /dev/null +++ b/example/main.go @@ -0,0 +1,46 @@ +package main + +import ( + pb "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic" + "context" + "github.com/charmbracelet/log" + "github.com/crypto-smoke/meshtastic-go/transport" + "github.com/crypto-smoke/meshtastic-go/transport/serial" + "google.golang.org/protobuf/proto" + "os" + "os/signal" +) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + log.SetLevel(log.DebugLevel) + + ports := serial.GetPorts() + serialConn, err := serial.Connect(ports[0]) + if err != nil { + panic(err) + } + streamConn, err := transport.NewClientStreamConn(serialConn) + if err != nil { + panic(err) + } + defer func() { + if err := streamConn.Close(); err != nil { + panic(err) + } + }() + + client := transport.NewClient(streamConn, false) + client.Handle(new(pb.MeshPacket), func(msg proto.Message) { + pkt := msg.(*pb.MeshPacket) + log.Info("Received message from radio", "msg", pkt) + }) + if client.Connect() != nil { + panic("Failed to connect to the radio") + } + + log.Info("Waiting for interrupt signal") + <-ctx.Done() +} diff --git a/go.mod b/go.mod index faec524..50bfafc 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,9 @@ require ( buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go v1.32.0-20240117225219-a9940c43223e.1 github.com/charmbracelet/log v0.3.1 github.com/eclipse/paho.mqtt.golang v1.4.3 - github.com/kylelemons/godebug v1.1.0 + github.com/stretchr/testify v1.8.4 go.bug.st/serial v1.6.1 + golang.org/x/sync v0.1.0 google.golang.org/protobuf v1.32.0 ) @@ -15,6 +16,7 @@ require ( github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/charmbracelet/lipgloss v0.9.1 // indirect github.com/creack/goselect v0.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect @@ -22,9 +24,10 @@ require ( github.com/mattn/go-runewidth v0.0.15 // indirect github.com/muesli/reflow v0.3.0 // indirect github.com/muesli/termenv v0.15.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/net v0.8.0 // indirect - golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.13.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index fe4e795..4dcd71b 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,6 @@ github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= -github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98= @@ -55,5 +53,7 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/transport/client.go b/transport/client.go new file mode 100644 index 0000000..042d2ce --- /dev/null +++ b/transport/client.go @@ -0,0 +1,135 @@ +package transport + +import ( + "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic" + "fmt" + "github.com/charmbracelet/log" + "google.golang.org/protobuf/proto" + "math/rand" +) + +type HandlerFunc func(message proto.Message) + +type Client struct { + sc *StreamConn + handlers *HandlerRegistry + log *log.Logger + + config struct { + complete bool + configID uint32 + *meshtastic.MyNodeInfo + *meshtastic.DeviceMetadata + nodes []*meshtastic.NodeInfo + channels []*meshtastic.Channel + config []*meshtastic.Config + modules []*meshtastic.ModuleConfig + } +} + +func NewClient(sc *StreamConn, errorOnNoHandler bool) *Client { + return &Client{ + log: log.WithPrefix("client"), + sc: sc, + handlers: NewHandlerRegistry(errorOnNoHandler), + } +} + +// You have to send this first to get the radio into protobuf mode and have it accept and send packets via serial +func (c *Client) sendGetConfig() error { + r := rand.Uint32() + c.config.configID = r + msg := &meshtastic.ToRadio{ + PayloadVariant: &meshtastic.ToRadio_WantConfigId{ + WantConfigId: r, + }, + } + c.log.Debug("sending want config", "id", r) + if err := c.sc.Write(msg); err != nil { + return fmt.Errorf("writing want config command: %w", err) + } + c.log.Debug("sent want config") + return nil +} + +func (c *Client) Handle(kind proto.Message, handler MessageHandler) { + c.handlers.RegisterHandler(kind, handler) +} + +func (c *Client) SendToRadio(msg *meshtastic.ToRadio) error { + return c.sc.Write(msg) +} + +func (c *Client) Connect() error { + if err := c.sendGetConfig(); err != nil { + return fmt.Errorf("requesting config: %w", err) + } + go func() { + for { + msg := &meshtastic.FromRadio{} + err := c.sc.Read(msg) + if err != nil { + c.log.Error("error reading from radio", "err", err) + continue + } + c.log.Debug("received message from radio", "msg", msg) + var variant proto.Message + switch msg.GetPayloadVariant().(type) { + // These pbufs all get sent upon initial connection to the node + case *meshtastic.FromRadio_MyInfo: + c.config.MyNodeInfo = msg.GetMyInfo() + variant = c.config.MyNodeInfo + case *meshtastic.FromRadio_Metadata: + c.config.DeviceMetadata = msg.GetMetadata() + variant = c.config.DeviceMetadata + case *meshtastic.FromRadio_NodeInfo: + node := msg.GetNodeInfo() + c.config.nodes = append(c.config.nodes, node) + variant = node + case *meshtastic.FromRadio_Channel: + channel := msg.GetChannel() + c.config.channels = append(c.config.channels, channel) + variant = channel + case *meshtastic.FromRadio_Config: + cfg := msg.GetConfig() + c.config.config = append(c.config.config, cfg) + variant = cfg + case *meshtastic.FromRadio_ModuleConfig: + cfg := msg.GetModuleConfig() + c.config.modules = append(c.config.modules, cfg) + variant = cfg + case *meshtastic.FromRadio_ConfigCompleteId: + c.log.Info("config complete") + c.config.complete = true + continue + // below are packets not part of initial connection + + case *meshtastic.FromRadio_LogRecord: + variant = msg.GetLogRecord() + case *meshtastic.FromRadio_MqttClientProxyMessage: + variant = msg.GetMqttClientProxyMessage() + case *meshtastic.FromRadio_QueueStatus: + variant = msg.GetQueueStatus() + case *meshtastic.FromRadio_Rebooted: + // true if radio just rebooted + c.log.Debug("rebooted", "rebooted", msg.GetRebooted()) + continue + case *meshtastic.FromRadio_XmodemPacket: + variant = msg.GetXmodemPacket() + + case *meshtastic.FromRadio_Packet: + variant = msg.GetPacket() + default: + c.log.Warn("unhandled protobuf from radio") + } + if !c.config.complete { + continue + } + err = c.handlers.HandleMessage(variant) + if err != nil { + c.log.Error("error handling message", "err", err) + } + } + }() + return nil +} diff --git a/transport/serial/serial.go b/transport/serial/serial.go index 5e0cb9f..2ace0a8 100644 --- a/transport/serial/serial.go +++ b/transport/serial/serial.go @@ -1,279 +1,20 @@ package serial import ( - "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic" - "encoding/binary" - "fmt" - "github.com/charmbracelet/log" - "github.com/crypto-smoke/meshtastic-go/transport" "go.bug.st/serial" - "google.golang.org/protobuf/proto" - "io" - "math/rand" - "time" ) const ( - WAIT_AFTER_WAKE = 100 * time.Millisecond - START1 = 0x94 - START2 = 0xc3 - PACKET_MTU = 512 - PORT_SPEED = 115200 //921600 + PORT_SPEED = 115200 //921600 ) -type HandlerFunc func(message proto.Message) - -// Serial connection to a node -type Conn struct { - serialPort string - serialConn serial.Port - handlers *transport.HandlerRegistry - - config struct { - complete bool - configID uint32 - *meshtastic.MyNodeInfo - *meshtastic.DeviceMetadata - nodes []*meshtastic.NodeInfo - channels []*meshtastic.Channel - config []*meshtastic.Config - modules []*meshtastic.ModuleConfig - } -} - -func NewConn(port string, errorOnNoHandler bool) *Conn { - var c = Conn{serialPort: port, - handlers: transport.NewHandlerRegistry(errorOnNoHandler)} - return &c -} - -// You have to send this first to get the radio into protobuf mode and have it accept and send packets via serial -func (c *Conn) sendGetConfig() { - r := rand.Uint32() - c.config.configID = r - //log.Info("want config id", r) - msg := &meshtastic.ToRadio{ - PayloadVariant: &meshtastic.ToRadio_WantConfigId{ - WantConfigId: r, - }, - } - c.SendToRadio(msg) -} -func (c *Conn) Handle(kind proto.Message, handler transport.MessageHandler) { - c.handlers.RegisterHandler(kind, handler) -} - -func (c *Conn) Connect() error { - mode := &serial.Mode{ - BaudRate: PORT_SPEED, - } - port, err := serial.Open(c.serialPort, mode) - if err != nil { - return err - } - c.serialConn = port - ch := make(chan *meshtastic.FromRadio) - go c.decodeProtos(false, ch) - go func() { - for { - msg := <-ch - var variant proto.Message - switch msg.GetPayloadVariant().(type) { - // These pbufs all get sent upon initial connection to the node - case *meshtastic.FromRadio_MyInfo: - c.config.MyNodeInfo = msg.GetMyInfo() - variant = c.config.MyNodeInfo - case *meshtastic.FromRadio_Metadata: - c.config.DeviceMetadata = msg.GetMetadata() - variant = c.config.DeviceMetadata - case *meshtastic.FromRadio_NodeInfo: - node := msg.GetNodeInfo() - c.config.nodes = append(c.config.nodes, node) - variant = node - case *meshtastic.FromRadio_Channel: - channel := msg.GetChannel() - c.config.channels = append(c.config.channels, channel) - variant = channel - case *meshtastic.FromRadio_Config: - cfg := msg.GetConfig() - c.config.config = append(c.config.config, cfg) - variant = cfg - case *meshtastic.FromRadio_ModuleConfig: - cfg := msg.GetModuleConfig() - c.config.modules = append(c.config.modules, cfg) - variant = cfg - case *meshtastic.FromRadio_ConfigCompleteId: - // done getting config info - //fmt.Println("config complete") - c.config.complete = true - /* - out, err := json.MarshalIndent(c.config, "", " ") - if err != nil { - log.Error("failed marshalling", "err", err) - continue - } - fmt.Println(string(out)) - out, err = json.MarshalIndent(c.config.config, "", " ") - if err != nil { - log.Error("failed marshalling", "err", err) - continue - } - fmt.Println(string(out)) - - */ - continue - // below are packets not part of initial connection - - case *meshtastic.FromRadio_LogRecord: - variant = msg.GetLogRecord() - case *meshtastic.FromRadio_MqttClientProxyMessage: - variant = msg.GetMqttClientProxyMessage() - case *meshtastic.FromRadio_QueueStatus: - variant = msg.GetQueueStatus() - case *meshtastic.FromRadio_Rebooted: - // true if radio just rebooted - fmt.Print("rebooted", msg.GetRebooted()) - continue - case *meshtastic.FromRadio_XmodemPacket: - variant = msg.GetXmodemPacket() - - case *meshtastic.FromRadio_Packet: - variant = msg.GetPacket() - default: - log.Error("unhandled protobuf from radio") - } - if !c.config.complete { - continue - } - err = c.handlers.HandleMessage(variant) - if err != nil { - log.Error("error handling message", "err", err) - } - } - }() - - c.sendGetConfig() - return nil -} -func (c *Conn) ConnectOld(ch chan *meshtastic.FromRadio, ch2 chan *meshtastic.ToRadio) error { +func Connect(port string) (serial.Port, error) { mode := &serial.Mode{ BaudRate: PORT_SPEED, } - port, err := serial.Open(c.serialPort, mode) - if err != nil { - return err - } - c.serialConn = port - - go c.decodeProtos(false, ch) - go func() { - for { - msg := <-ch2 - c.SendToRadio(msg) - } - }() - c.sendGetConfig() - return nil -} - -func (c *Conn) decodeProtos(printDebug bool, ch chan *meshtastic.FromRadio) { - for { - data, err := readUntilProtobuf(c.serialConn, printDebug) - if err != nil { - log.Info("error:", err) - continue - } - //log.Info("read from serial and got proto") - var msg2 meshtastic.FromRadio - err = proto.Unmarshal(data, &msg2) - if err != nil { - log.Fatal(err) - } - ch <- &msg2 - } -} -func readUntilProtobuf(reader io.Reader, printDebug bool) ([]byte, error) { - buf := make([]byte, 4) - for { - // Read the first byte, looking for START1. - _, err := io.ReadFull(reader, buf[:1]) - if err != nil { - return nil, err - } - - // Check for START1. - if buf[0] != 0x94 { - if printDebug { - fmt.Print(string(buf[0])) - } - continue - } - - // Read the second byte, looking for START2. - _, err = io.ReadFull(reader, buf[1:2]) - if err != nil { - return nil, err - } - - // Check for START2. - if buf[1] != 0xc3 { - continue - } - - // The next two bytes should be the length of the protobuf message. - _, err = io.ReadFull(reader, buf[2:]) - if err != nil { - return nil, err - } - - length := int(binary.BigEndian.Uint16(buf[2:])) - if length > PACKET_MTU { - //packet corrupt, start over - continue - } - //fmt.Println("got packet from node with length", length) - data := make([]byte, length) - - // Read the protobuf data. - _, err = io.ReadFull(reader, data) - if err != nil { - return nil, err - } - - return data, nil - } -} - -func (c *Conn) flushPort() error { - flush := make([]byte, 32) - for j := 0; j < len(flush); j++ { - flush[j] = START2 - } - _, err := c.serialConn.Write(flush) - if err != nil { - return err - } - return nil -} -func (c *Conn) SendToRadio(msg *meshtastic.ToRadio) error { - err := c.flushPort() - if err != nil { - return err - } - //fmt.Printf("Sent %v bytes\n", n) - data, err := proto.Marshal(msg) - if err != nil { - panic(err) - } - time.Sleep(WAIT_AFTER_WAKE) - - datalen := len(data) - header := []byte{START1, START2, byte(datalen >> 8), byte(datalen)} - data = append(header, data...) - _, err = c.serialConn.Write(data) + p, err := serial.Open(port, mode) if err != nil { - log.Fatal(err) + return nil, err } - //fmt.Printf("Sent %v bytes\n", n) - return nil + return p, nil } diff --git a/transport/serial/usb.go b/transport/serial/usb.go index b341321..7656fa4 100644 --- a/transport/serial/usb.go +++ b/transport/serial/usb.go @@ -12,7 +12,11 @@ type usbDevice struct { } var knownDevices = []usbDevice{ - {VID: "239A", PID: "8029"}, // rak4631_19003 + // rak4631_19003 + {VID: "239A", PID: "8029"}, + // CP210x UART Bridge + // Commonly found on Heltec and other devices. + {VID: "10C4", PID: "EA60"}, } func GetPorts() []string { @@ -26,7 +30,7 @@ func GetPorts() []string { return nil } for _, port := range ports { - //fmt.Printf("Found port: %s\n", port.SettingName) + // fmt.Printf("Found port: %s %s\n", port.PID, port.VID) if port.IsUSB { for _, device := range knownDevices { if device.VID != port.VID { diff --git a/transport/stream_conn.go b/transport/stream_conn.go new file mode 100644 index 0000000..e936318 --- /dev/null +++ b/transport/stream_conn.go @@ -0,0 +1,187 @@ +package transport + +import ( + "bytes" + "encoding/binary" + "fmt" + "google.golang.org/protobuf/proto" + "io" + "sync" + "time" +) + +const ( + // WaitAfterWake is the amount of time to wait after sending the wake message before sending the first message. + WaitAfterWake = 100 * time.Millisecond + // Start1 is a magic byte used in the meshtastic stream protocol. + // Start1 is sent at the beginning of a message to indicate the start of a new message. + Start1 = 0x94 + // Start2 is a magic byte used in the meshtastic stream protocol. + // It is sent after Start1 to indicate the start of a new message. + Start2 = 0xc3 + // PacketMTU is the maximum size of the protobuf message which can be sent within the header. + PacketMTU = 512 +) + +// StreamConn implements the meshtastic client API stream protocol. +// This protocol is used to send and receive protobuf messages over a serial or TCP connection. +// See https://meshtastic.org/docs/development/device/client-api#streaming-version for additional information. +type StreamConn struct { + conn io.ReadWriteCloser + // DebugWriter is an optional writer that is used when a non-protobuf message is sent over the connection. + DebugWriter io.Writer + + readMu sync.Mutex + writeMu sync.Mutex +} + +// NewClientStreamConn creates a new StreamConn with the provided io.ReadWriteCloser. +// Once an io.ReadWriteCloser is provided, the StreamConn should be used read, write and close operations. +func NewClientStreamConn(conn io.ReadWriteCloser) (*StreamConn, error) { + sConn := &StreamConn{conn: conn} + if err := sConn.writeWake(); err != nil { + return nil, fmt.Errorf("sending wake message: %w", err) + } + return sConn, nil +} + +// NewRadioStreamConn creates a new StreamConn with the provided io.ReadWriteCloser. +// Once an io.ReadWriteCloser is provided, the StreamConn should be used read, write and close operations. +func NewRadioStreamConn(conn io.ReadWriteCloser) *StreamConn { + return &StreamConn{conn: conn} +} + +// Close closes the connection. +func (c *StreamConn) Close() (err error) { + return c.conn.Close() +} + +// Read reads a protobuf message from the connection. +func (c *StreamConn) Read(out proto.Message) error { + data, err := c.ReadBytes() + if err != nil { + return err + } + return proto.Unmarshal(data, out) +} + +// ReadBytes reads a byte message from the connection. +// Prefer using Read if you have a protobuf message. +func (c *StreamConn) ReadBytes() ([]byte, error) { + c.readMu.Lock() + defer c.readMu.Unlock() + buf := make([]byte, 4) + for { + // Read the first byte, looking for Start1. + _, err := io.ReadFull(c.conn, buf[:1]) + if err != nil { + return nil, err + } + + // Check for Start1. + if buf[0] != Start1 { + if c.DebugWriter != nil { + c.DebugWriter.Write(buf[0:1]) + } + continue + } + + // Read the second byte, looking for Start2. + _, err = io.ReadFull(c.conn, buf[1:2]) + if err != nil { + return nil, err + } + + // Check for Start2. + if buf[1] != Start2 { + continue + } + + // The next two bytes should be the length of the protobuf message. + _, err = io.ReadFull(c.conn, buf[2:]) + if err != nil { + return nil, err + } + + length := int(binary.BigEndian.Uint16(buf[2:])) + if length > PacketMTU { + //packet corrupt, start over + continue + } + data := make([]byte, length) + + // Read the protobuf data. + _, err = io.ReadFull(c.conn, data) + if err != nil { + return nil, err + } + + return data, nil + } +} + +// writeStreamHeader writes the stream protocol header to the provided writer. +// See https://meshtastic.org/docs/development/device/client-api#streaming-version +func writeStreamHeader(w io.Writer, dataLen uint16) error { + header := bytes.NewBuffer(nil) + // First we write Start1, Start2 + header.WriteByte(Start1) + header.WriteByte(Start2) + // Next we write the length of the protobuf message as a big-endian uint16 + err := binary.Write(header, binary.BigEndian, dataLen) + if err != nil { + return fmt.Errorf("writing length to buffer: %w", err) + } + + _, err = w.Write(header.Bytes()) + return err +} + +// Write writes a protobuf message to the connection. +func (c *StreamConn) Write(in proto.Message) error { + protoBytes, err := proto.Marshal(in) + if err != nil { + return fmt.Errorf("marshalling proto message: %w", err) + } + + if err := c.WriteBytes(protoBytes); err != nil { + return err + } + + return nil +} + +// WriteBytes writes a byte slice to the connection. +// Prefer using Write if you have a protobuf message. +func (c *StreamConn) WriteBytes(data []byte) error { + if len(data) > PacketMTU { + return fmt.Errorf("data length exceeds MTU: %d > %d", len(data), PacketMTU) + } + c.writeMu.Lock() + defer c.writeMu.Unlock() + + if err := writeStreamHeader(c.conn, uint16(len(data))); err != nil { + return fmt.Errorf("writing stream header: %w", err) + } + + if _, err := c.conn.Write(data); err != nil { + return fmt.Errorf("writing proto message: %w", err) + } + return nil +} + +// writeWake writes a wake message to the radio. +// This should only be called on the client side. +// +// TODO: Rather than just sending this on start, do we need to also send this after a long period of inactivity? +func (c *StreamConn) writeWake() error { + // Send 32 bytes of Start2 to wake the radio if sleeping. + _, err := c.conn.Write( + bytes.Repeat([]byte{Start2}, 32), + ) + if err != nil { + return err + } + time.Sleep(WaitAfterWake) + return nil +} diff --git a/transport/stream_conn_test.go b/transport/stream_conn_test.go new file mode 100644 index 0000000..c0d996f --- /dev/null +++ b/transport/stream_conn_test.go @@ -0,0 +1,69 @@ +package transport + +import ( + pb "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic" + "bytes" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/proto" + "net" + "testing" +) + +func TestStreamConn(t *testing.T) { + radioNetConn, clientNetConn := net.Pipe() + var client *StreamConn + var radio *StreamConn + + // Test client -> radio + sent := &pb.ToRadio{ + PayloadVariant: &pb.ToRadio_WantConfigId{ + WantConfigId: 123, + }, + } + received := &pb.ToRadio{} + eg := errgroup.Group{} + eg.Go(func() error { + var err error + client, err = NewClientStreamConn(clientNetConn) + require.NoError(t, err) + return client.Write(sent) + }) + eg.Go(func() error { + radio = NewRadioStreamConn(radioNetConn) + return radio.Read(received) + }) + require.NoError(t, eg.Wait()) + require.True(t, proto.Equal(sent, received)) + + // Test radio -> client + replySent := &pb.FromRadio{ + Id: 123, + PayloadVariant: &pb.FromRadio_Config{ + Config: &pb.Config{ + PayloadVariant: &pb.Config_Device{ + Device: &pb.Config_DeviceConfig{ + Role: pb.Config_DeviceConfig_ROUTER, + }, + }, + }, + }, + } + replyReceived := &pb.FromRadio{} + eg = errgroup.Group{} + eg.Go(func() error { + return radio.Write(replySent) + }) + eg.Go(func() error { + return client.Read(replyReceived) + }) + require.NoError(t, eg.Wait()) + require.True(t, proto.Equal(replySent, replyReceived)) +} + +func Test_writeStreamHeader(t *testing.T) { + out := bytes.NewBuffer(nil) + err := writeStreamHeader(out, 257) + require.NoError(t, err) + require.Equal(t, []byte{Start1, Start2, 0x01, 0x01}, out.Bytes()) +} -- cgit v1.2.3