aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--emulated/emulated.go290
-rw-r--r--emulated/example/main.go38
-rw-r--r--example/main.go46
-rw-r--r--go.mod7
-rw-r--r--go.sum4
-rw-r--r--transport/client.go135
-rw-r--r--transport/serial/serial.go269
-rw-r--r--transport/serial/usb.go8
-rw-r--r--transport/stream_conn.go187
-rw-r--r--transport/stream_conn_test.go69
10 files changed, 743 insertions, 310 deletions
diff --git a/emulated/emulated.go b/emulated/emulated.go
index 2001276..de62c2a 100644
--- a/emulated/emulated.go
+++ b/emulated/emulated.go
@@ -8,12 +8,20 @@ import (
"github.com/crypto-smoke/meshtastic-go"
"github.com/crypto-smoke/meshtastic-go/mqtt"
"github.com/crypto-smoke/meshtastic-go/radio"
+ "github.com/crypto-smoke/meshtastic-go/transport"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
+ "io"
+ "net"
"sync"
"time"
)
+const (
+ // MinAppVersion is the minimum app version supported by the emulated radio.
+ MinAppVersion = 30200
+)
+
// Config is the configuration for the emulated Radio.
type Config struct {
// Dependencies
@@ -45,6 +53,9 @@ type Config struct {
// PositionAltitude is the altitude of the position which will be regularly broadcasted.
// This is in meters above MSL.
PositionAltitude int32
+
+ // TCPListenAddr is the address the emulated radio will listen on for TCP connections and offer the Client API over.
+ TCPListenAddr string
}
func (c *Config) validate() error {
@@ -149,6 +160,11 @@ func (r *Radio) Run(ctx context.Context) error {
}
})
}
+ if r.cfg.TCPListenAddr != "" {
+ eg.Go(func() error {
+ return r.listenTCP(egCtx)
+ })
+ }
return eg.Wait()
}
@@ -175,6 +191,17 @@ func (r *Radio) updateNodeDB(nodeID uint32, updateFunc func(*pb.NodeInfo)) {
r.nodeDB[nodeID] = nodeInfo
}
+func (r *Radio) getNodeDB() []*pb.NodeInfo {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ nodes := make([]*pb.NodeInfo, 0, len(r.nodeDB))
+ for _, node := range r.nodeDB {
+ clonedNode := proto.Clone(node).(*pb.NodeInfo)
+ nodes = append(nodes, clonedNode)
+ }
+ return nodes
+}
+
func (r *Radio) tryHandleMQTTMessage(msg mqtt.Message) error {
serviceEnvelope := &pb.ServiceEnvelope{}
if err := proto.Unmarshal(msg.Payload, serviceEnvelope); err != nil {
@@ -365,25 +392,254 @@ func (r *Radio) dispatchMessageToFromRadio(msg *pb.FromRadio) error {
return nil
}
-// FromRadio subscribes to messages from the radio.
-func (r *Radio) FromRadio(ctx context.Context, ch chan<- *pb.FromRadio) error {
- r.mu.Lock()
- defer r.mu.Unlock()
- r.fromRadioSubscribers[ch] = struct{}{}
- // TODO: Unsubscribe from the channel when the context is cancelled??
+func (r *Radio) handleToRadioWantConfigID(conn *transport.StreamConn, req *pb.ToRadio_WantConfigId) error {
+ // Send MyInfo
+ err := conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_MyInfo{
+ MyInfo: &pb.MyNodeInfo{
+ MyNodeNum: r.cfg.NodeID.Uint32(),
+ RebootCount: 0,
+ // TODO: Track this as a const
+ MinAppVersion: MinAppVersion,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send Metadata
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Metadata{
+ Metadata: &pb.DeviceMetadata{
+ // TODO: Establish firmwareVersion/deviceStateVersion to fake here
+ FirmwareVersion: "2.2.19-fake",
+ DeviceStateVersion: 22,
+ CanShutdown: true,
+ HasWifi: true,
+ HasBluetooth: true,
+ // PositionFlags?
+ HwModel: pb.HardwareModel_PRIVATE_HW,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send all NodeDB entries - plus myself.
+ // TODO: Our own node info entry should be in the DB to avoid the special case here.
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_NodeInfo{
+ NodeInfo: &pb.NodeInfo{
+ Num: r.cfg.NodeID.Uint32(),
+ User: &pb.User{
+ Id: r.cfg.NodeID.String(),
+ LongName: r.cfg.LongName,
+ ShortName: r.cfg.ShortName,
+ },
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ for _, nodeInfo := range r.getNodeDB() {
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_NodeInfo{
+ NodeInfo: nodeInfo,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+
+ // TODO: Send all channels
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Channel{
+ Channel: &pb.Channel{
+ Index: 0,
+ Settings: &pb.ChannelSettings{
+ Psk: nil,
+ },
+ Role: pb.Channel_PRIMARY,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send Config: Device
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Config{
+ Config: &pb.Config{
+ PayloadVariant: &pb.Config_Device{
+ Device: &pb.Config_DeviceConfig{
+ SerialEnabled: true,
+ NodeInfoBroadcastSecs: uint32(r.cfg.BroadcastNodeInfoInterval.Seconds()),
+ },
+ },
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send ConfigComplete to indicate we're done
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_ConfigCompleteId{
+ ConfigCompleteId: req.WantConfigId,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
return nil
}
-// ToRadio sends a message to the radio.
-func (r *Radio) ToRadio(ctx context.Context, msg *pb.ToRadio) error {
- switch payload := msg.PayloadVariant.(type) {
- case *pb.ToRadio_Disconnect:
- r.logger.Info("received Disconnect from ToRadio")
- case *pb.ToRadio_Packet:
- r.logger.Info("received Packet from ToRadio")
- return r.sendPacket(ctx, payload.Packet)
- default:
- r.logger.Debug("unknown payload variant", "payload", payload)
+func (r *Radio) handleConn(ctx context.Context, underlying io.ReadWriteCloser) error {
+ streamConn := transport.NewRadioStreamConn(underlying)
+ defer func() {
+ if err := streamConn.Close(); err != nil {
+ r.logger.Error("failed to close streamConn", "err", err)
+ }
+ }()
+
+ eg, egCtx := errgroup.WithContext(ctx)
+ // Handling messages coming from client
+ eg.Go(func() error {
+ for {
+ select {
+ case <-egCtx.Done():
+ return nil
+ default:
+ }
+ msg := &pb.ToRadio{}
+ if err := streamConn.Read(msg); err != nil {
+ return fmt.Errorf("reading from streamConn: %w", err)
+ }
+ r.logger.Info("received ToRadio from streamConn", "msg", msg)
+ switch payload := msg.PayloadVariant.(type) {
+ case *pb.ToRadio_Disconnect:
+ // The meshtastic python client sends a Disconnect command and with the TCP implementation, it expects
+ // the radio to close the connection. So we end the read loop here, and return to close the connection.
+ return nil
+ case *pb.ToRadio_WantConfigId:
+ if err := r.handleToRadioWantConfigID(streamConn, payload); err != nil {
+ return fmt.Errorf("handling WantConfigId: %w", err)
+ }
+ case *pb.ToRadio_Packet:
+ if decoded := payload.Packet.GetDecoded(); decoded != nil {
+ if decoded.Portnum == pb.PortNum_ADMIN_APP {
+ admin := &pb.AdminMessage{}
+ if err := proto.Unmarshal(decoded.Payload, admin); err != nil {
+ return fmt.Errorf("unmarshalling admin: %w", err)
+ }
+
+ switch adminPayload := admin.PayloadVariant.(type) {
+ // TODO: Properly handle channel listing, this hack is just so the Python CLI thinks
+ // it's connected
+ case *pb.AdminMessage_GetChannelRequest:
+ r.logger.Info("received GetChannelRequest", "adminPayload", adminPayload, "packet", payload)
+ resp := &pb.AdminMessage{
+ PayloadVariant: &pb.AdminMessage_GetChannelResponse{
+ GetChannelResponse: &pb.Channel{
+ Index: 0,
+ Settings: &pb.ChannelSettings{
+ Psk: nil,
+ },
+ Role: pb.Channel_DISABLED,
+ },
+ },
+ }
+ respBytes, err := proto.Marshal(resp)
+ if err != nil {
+ return fmt.Errorf("marshalling GetChannelResponse: %w", err)
+ }
+ // Send GetChannelResponse
+ if err := streamConn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Packet{
+ Packet: &pb.MeshPacket{
+ Id: r.nextPacketID(),
+ From: r.cfg.NodeID.Uint32(),
+ To: r.cfg.NodeID.Uint32(),
+ PayloadVariant: &pb.MeshPacket_Decoded{
+ Decoded: &pb.Data{
+ Portnum: pb.PortNum_ADMIN_APP,
+ Payload: respBytes,
+ RequestId: payload.Packet.Id,
+ },
+ },
+ },
+ },
+ }); err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+ }
+ }
+ }
+ }
+ })
+ // Handle sending messages to client
+ eg.Go(func() error {
+ ch := make(chan *pb.FromRadio)
+ r.mu.Lock()
+ r.fromRadioSubscribers[ch] = struct{}{}
+ r.mu.Unlock()
+ defer func() {
+ r.mu.Lock()
+ delete(r.fromRadioSubscribers, ch)
+ r.mu.Unlock()
+ }()
+
+ for {
+ select {
+ case <-egCtx.Done():
+ return nil
+ case msg := <-ch:
+ if err := streamConn.Write(msg); err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+ }
+ })
+
+ return eg.Wait()
+}
+
+func (r *Radio) listenTCP(ctx context.Context) error {
+ l, err := net.Listen("tcp", r.cfg.TCPListenAddr)
+ if err != nil {
+ return fmt.Errorf("listening: %w", err)
+ }
+ r.logger.Info("listening for tcp connections", "addr", r.cfg.TCPListenAddr)
+
+ for {
+ c, err := l.Accept()
+ if err != nil {
+ r.logger.Error("failed to accept connection", "err", err)
+ continue
+ }
+ go func() {
+ if err := r.handleConn(ctx, c); err != nil {
+ r.logger.Error("failed to handle TCP connection", "err", err)
+ }
+ }()
}
- return fmt.Errorf("not implemented")
+}
+
+// Conn returns an in-memory connection to the emulated radio.
+func (r *Radio) Conn(ctx context.Context) net.Conn {
+ clientConn, radioConn := net.Pipe()
+ go func() {
+ if err := r.handleConn(ctx, radioConn); err != nil {
+ r.logger.Error("failed to handle in-memory connection", "err", err)
+ }
+ }()
+ return clientConn
}
diff --git a/emulated/example/main.go b/emulated/example/main.go
index 1e9b82b..e2978e2 100644
--- a/emulated/example/main.go
+++ b/emulated/example/main.go
@@ -9,6 +9,7 @@ import (
"github.com/crypto-smoke/meshtastic-go/emulated"
"github.com/crypto-smoke/meshtastic-go/mqtt"
"github.com/crypto-smoke/meshtastic-go/radio"
+ "github.com/crypto-smoke/meshtastic-go/transport"
"golang.org/x/sync/errgroup"
"time"
)
@@ -42,13 +43,14 @@ func main() {
PositionLatitudeI: 515014760,
PositionLongitudeI: -1406340,
PositionAltitude: 2,
+
+ TCPListenAddr: "127.0.0.1:4403",
})
if err != nil {
panic(err)
}
eg, egCtx := errgroup.WithContext(ctx)
-
eg.Go(func() error {
if err := r.Run(egCtx); err != nil {
return fmt.Errorf("running radio: %w", err)
@@ -57,15 +59,12 @@ func main() {
})
eg.Go(func() error {
- // Forgive me, for I have sinned.
- // TODO: We need a way of knowing the radio has come up and is ready that's better than waiting ten seconds.
- select {
- case <-egCtx.Done():
- return nil
- case <-time.After(10 * time.Second):
+ conn, err := transport.NewClientStreamConn(r.Conn(egCtx))
+ if err != nil {
+ return fmt.Errorf("creating connection: %w", err)
}
- err := r.ToRadio(egCtx, &pb.ToRadio{
+ msg := &pb.ToRadio{
PayloadVariant: &pb.ToRadio_Packet{
Packet: &pb.MeshPacket{
From: nodeID.Uint32(),
@@ -79,29 +78,22 @@ func main() {
},
},
},
- })
- if err != nil {
- return fmt.Errorf("sending to radio: %w", err)
}
-
- return nil
- })
-
- eg.Go(func() error {
- ch := make(chan *pb.FromRadio)
- defer close(ch)
- err := r.FromRadio(egCtx, ch)
- if err != nil {
- return fmt.Errorf("setting up FromRadio subscriber: %w", err)
+ if err := conn.Write(msg); err != nil {
+ return fmt.Errorf("writing to radio: %w", err)
}
for {
select {
case <-egCtx.Done():
return nil
- case fromRadio := <-ch:
- log.Info("FromRadio!!", "packet", fromRadio)
+ default:
+ }
+ msg := &pb.FromRadio{}
+ if err := conn.Read(msg); err != nil {
+ return fmt.Errorf("reading from radio: %w", err)
}
+ log.Info("FromRadio!!", "packet", msg)
}
})
diff --git a/example/main.go b/example/main.go
new file mode 100644
index 0000000..3986791
--- /dev/null
+++ b/example/main.go
@@ -0,0 +1,46 @@
+package main
+
+import (
+ pb "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic"
+ "context"
+ "github.com/charmbracelet/log"
+ "github.com/crypto-smoke/meshtastic-go/transport"
+ "github.com/crypto-smoke/meshtastic-go/transport/serial"
+ "google.golang.org/protobuf/proto"
+ "os"
+ "os/signal"
+)
+
+func main() {
+ ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
+ defer cancel()
+
+ log.SetLevel(log.DebugLevel)
+
+ ports := serial.GetPorts()
+ serialConn, err := serial.Connect(ports[0])
+ if err != nil {
+ panic(err)
+ }
+ streamConn, err := transport.NewClientStreamConn(serialConn)
+ if err != nil {
+ panic(err)
+ }
+ defer func() {
+ if err := streamConn.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ client := transport.NewClient(streamConn, false)
+ client.Handle(new(pb.MeshPacket), func(msg proto.Message) {
+ pkt := msg.(*pb.MeshPacket)
+ log.Info("Received message from radio", "msg", pkt)
+ })
+ if client.Connect() != nil {
+ panic("Failed to connect to the radio")
+ }
+
+ log.Info("Waiting for interrupt signal")
+ <-ctx.Done()
+}
diff --git a/go.mod b/go.mod
index faec524..50bfafc 100644
--- a/go.mod
+++ b/go.mod
@@ -6,8 +6,9 @@ require (
buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go v1.32.0-20240117225219-a9940c43223e.1
github.com/charmbracelet/log v0.3.1
github.com/eclipse/paho.mqtt.golang v1.4.3
- github.com/kylelemons/godebug v1.1.0
+ github.com/stretchr/testify v1.8.4
go.bug.st/serial v1.6.1
+ golang.org/x/sync v0.1.0
google.golang.org/protobuf v1.32.0
)
@@ -15,6 +16,7 @@ require (
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/charmbracelet/lipgloss v0.9.1 // indirect
github.com/creack/goselect v0.1.2 // indirect
+ github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
@@ -22,9 +24,10 @@ require (
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/muesli/reflow v0.3.0 // indirect
github.com/muesli/termenv v0.15.2 // indirect
+ github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/net v0.8.0 // indirect
- golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.13.0 // indirect
+ gopkg.in/yaml.v3 v3.0.1 // indirect
)
diff --git a/go.sum b/go.sum
index fe4e795..4dcd71b 100644
--- a/go.sum
+++ b/go.sum
@@ -20,8 +20,6 @@ github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
-github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
-github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98=
@@ -55,5 +53,7 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/transport/client.go b/transport/client.go
new file mode 100644
index 0000000..042d2ce
--- /dev/null
+++ b/transport/client.go
@@ -0,0 +1,135 @@
+package transport
+
+import (
+ "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic"
+ "fmt"
+ "github.com/charmbracelet/log"
+ "google.golang.org/protobuf/proto"
+ "math/rand"
+)
+
+type HandlerFunc func(message proto.Message)
+
+type Client struct {
+ sc *StreamConn
+ handlers *HandlerRegistry
+ log *log.Logger
+
+ config struct {
+ complete bool
+ configID uint32
+ *meshtastic.MyNodeInfo
+ *meshtastic.DeviceMetadata
+ nodes []*meshtastic.NodeInfo
+ channels []*meshtastic.Channel
+ config []*meshtastic.Config
+ modules []*meshtastic.ModuleConfig
+ }
+}
+
+func NewClient(sc *StreamConn, errorOnNoHandler bool) *Client {
+ return &Client{
+ log: log.WithPrefix("client"),
+ sc: sc,
+ handlers: NewHandlerRegistry(errorOnNoHandler),
+ }
+}
+
+// You have to send this first to get the radio into protobuf mode and have it accept and send packets via serial
+func (c *Client) sendGetConfig() error {
+ r := rand.Uint32()
+ c.config.configID = r
+ msg := &meshtastic.ToRadio{
+ PayloadVariant: &meshtastic.ToRadio_WantConfigId{
+ WantConfigId: r,
+ },
+ }
+ c.log.Debug("sending want config", "id", r)
+ if err := c.sc.Write(msg); err != nil {
+ return fmt.Errorf("writing want config command: %w", err)
+ }
+ c.log.Debug("sent want config")
+ return nil
+}
+
+func (c *Client) Handle(kind proto.Message, handler MessageHandler) {
+ c.handlers.RegisterHandler(kind, handler)
+}
+
+func (c *Client) SendToRadio(msg *meshtastic.ToRadio) error {
+ return c.sc.Write(msg)
+}
+
+func (c *Client) Connect() error {
+ if err := c.sendGetConfig(); err != nil {
+ return fmt.Errorf("requesting config: %w", err)
+ }
+ go func() {
+ for {
+ msg := &meshtastic.FromRadio{}
+ err := c.sc.Read(msg)
+ if err != nil {
+ c.log.Error("error reading from radio", "err", err)
+ continue
+ }
+ c.log.Debug("received message from radio", "msg", msg)
+ var variant proto.Message
+ switch msg.GetPayloadVariant().(type) {
+ // These pbufs all get sent upon initial connection to the node
+ case *meshtastic.FromRadio_MyInfo:
+ c.config.MyNodeInfo = msg.GetMyInfo()
+ variant = c.config.MyNodeInfo
+ case *meshtastic.FromRadio_Metadata:
+ c.config.DeviceMetadata = msg.GetMetadata()
+ variant = c.config.DeviceMetadata
+ case *meshtastic.FromRadio_NodeInfo:
+ node := msg.GetNodeInfo()
+ c.config.nodes = append(c.config.nodes, node)
+ variant = node
+ case *meshtastic.FromRadio_Channel:
+ channel := msg.GetChannel()
+ c.config.channels = append(c.config.channels, channel)
+ variant = channel
+ case *meshtastic.FromRadio_Config:
+ cfg := msg.GetConfig()
+ c.config.config = append(c.config.config, cfg)
+ variant = cfg
+ case *meshtastic.FromRadio_ModuleConfig:
+ cfg := msg.GetModuleConfig()
+ c.config.modules = append(c.config.modules, cfg)
+ variant = cfg
+ case *meshtastic.FromRadio_ConfigCompleteId:
+ c.log.Info("config complete")
+ c.config.complete = true
+ continue
+ // below are packets not part of initial connection
+
+ case *meshtastic.FromRadio_LogRecord:
+ variant = msg.GetLogRecord()
+ case *meshtastic.FromRadio_MqttClientProxyMessage:
+ variant = msg.GetMqttClientProxyMessage()
+ case *meshtastic.FromRadio_QueueStatus:
+ variant = msg.GetQueueStatus()
+ case *meshtastic.FromRadio_Rebooted:
+ // true if radio just rebooted
+ c.log.Debug("rebooted", "rebooted", msg.GetRebooted())
+ continue
+ case *meshtastic.FromRadio_XmodemPacket:
+ variant = msg.GetXmodemPacket()
+
+ case *meshtastic.FromRadio_Packet:
+ variant = msg.GetPacket()
+ default:
+ c.log.Warn("unhandled protobuf from radio")
+ }
+ if !c.config.complete {
+ continue
+ }
+ err = c.handlers.HandleMessage(variant)
+ if err != nil {
+ c.log.Error("error handling message", "err", err)
+ }
+ }
+ }()
+ return nil
+}
diff --git a/transport/serial/serial.go b/transport/serial/serial.go
index 5e0cb9f..2ace0a8 100644
--- a/transport/serial/serial.go
+++ b/transport/serial/serial.go
@@ -1,279 +1,20 @@
package serial
import (
- "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic"
- "encoding/binary"
- "fmt"
- "github.com/charmbracelet/log"
- "github.com/crypto-smoke/meshtastic-go/transport"
"go.bug.st/serial"
- "google.golang.org/protobuf/proto"
- "io"
- "math/rand"
- "time"
)
const (
- WAIT_AFTER_WAKE = 100 * time.Millisecond
- START1 = 0x94
- START2 = 0xc3
- PACKET_MTU = 512
- PORT_SPEED = 115200 //921600
+ PORT_SPEED = 115200 //921600
)
-type HandlerFunc func(message proto.Message)
-
-// Serial connection to a node
-type Conn struct {
- serialPort string
- serialConn serial.Port
- handlers *transport.HandlerRegistry
-
- config struct {
- complete bool
- configID uint32
- *meshtastic.MyNodeInfo
- *meshtastic.DeviceMetadata
- nodes []*meshtastic.NodeInfo
- channels []*meshtastic.Channel
- config []*meshtastic.Config
- modules []*meshtastic.ModuleConfig
- }
-}
-
-func NewConn(port string, errorOnNoHandler bool) *Conn {
- var c = Conn{serialPort: port,
- handlers: transport.NewHandlerRegistry(errorOnNoHandler)}
- return &c
-}
-
-// You have to send this first to get the radio into protobuf mode and have it accept and send packets via serial
-func (c *Conn) sendGetConfig() {
- r := rand.Uint32()
- c.config.configID = r
- //log.Info("want config id", r)
- msg := &meshtastic.ToRadio{
- PayloadVariant: &meshtastic.ToRadio_WantConfigId{
- WantConfigId: r,
- },
- }
- c.SendToRadio(msg)
-}
-func (c *Conn) Handle(kind proto.Message, handler transport.MessageHandler) {
- c.handlers.RegisterHandler(kind, handler)
-}
-
-func (c *Conn) Connect() error {
- mode := &serial.Mode{
- BaudRate: PORT_SPEED,
- }
- port, err := serial.Open(c.serialPort, mode)
- if err != nil {
- return err
- }
- c.serialConn = port
- ch := make(chan *meshtastic.FromRadio)
- go c.decodeProtos(false, ch)
- go func() {
- for {
- msg := <-ch
- var variant proto.Message
- switch msg.GetPayloadVariant().(type) {
- // These pbufs all get sent upon initial connection to the node
- case *meshtastic.FromRadio_MyInfo:
- c.config.MyNodeInfo = msg.GetMyInfo()
- variant = c.config.MyNodeInfo
- case *meshtastic.FromRadio_Metadata:
- c.config.DeviceMetadata = msg.GetMetadata()
- variant = c.config.DeviceMetadata
- case *meshtastic.FromRadio_NodeInfo:
- node := msg.GetNodeInfo()
- c.config.nodes = append(c.config.nodes, node)
- variant = node
- case *meshtastic.FromRadio_Channel:
- channel := msg.GetChannel()
- c.config.channels = append(c.config.channels, channel)
- variant = channel
- case *meshtastic.FromRadio_Config:
- cfg := msg.GetConfig()
- c.config.config = append(c.config.config, cfg)
- variant = cfg
- case *meshtastic.FromRadio_ModuleConfig:
- cfg := msg.GetModuleConfig()
- c.config.modules = append(c.config.modules, cfg)
- variant = cfg
- case *meshtastic.FromRadio_ConfigCompleteId:
- // done getting config info
- //fmt.Println("config complete")
- c.config.complete = true
- /*
- out, err := json.MarshalIndent(c.config, "", " ")
- if err != nil {
- log.Error("failed marshalling", "err", err)
- continue
- }
- fmt.Println(string(out))
- out, err = json.MarshalIndent(c.config.config, "", " ")
- if err != nil {
- log.Error("failed marshalling", "err", err)
- continue
- }
- fmt.Println(string(out))
-
- */
- continue
- // below are packets not part of initial connection
-
- case *meshtastic.FromRadio_LogRecord:
- variant = msg.GetLogRecord()
- case *meshtastic.FromRadio_MqttClientProxyMessage:
- variant = msg.GetMqttClientProxyMessage()
- case *meshtastic.FromRadio_QueueStatus:
- variant = msg.GetQueueStatus()
- case *meshtastic.FromRadio_Rebooted:
- // true if radio just rebooted
- fmt.Print("rebooted", msg.GetRebooted())
- continue
- case *meshtastic.FromRadio_XmodemPacket:
- variant = msg.GetXmodemPacket()
-
- case *meshtastic.FromRadio_Packet:
- variant = msg.GetPacket()
- default:
- log.Error("unhandled protobuf from radio")
- }
- if !c.config.complete {
- continue
- }
- err = c.handlers.HandleMessage(variant)
- if err != nil {
- log.Error("error handling message", "err", err)
- }
- }
- }()
-
- c.sendGetConfig()
- return nil
-}
-func (c *Conn) ConnectOld(ch chan *meshtastic.FromRadio, ch2 chan *meshtastic.ToRadio) error {
+func Connect(port string) (serial.Port, error) {
mode := &serial.Mode{
BaudRate: PORT_SPEED,
}
- port, err := serial.Open(c.serialPort, mode)
- if err != nil {
- return err
- }
- c.serialConn = port
-
- go c.decodeProtos(false, ch)
- go func() {
- for {
- msg := <-ch2
- c.SendToRadio(msg)
- }
- }()
- c.sendGetConfig()
- return nil
-}
-
-func (c *Conn) decodeProtos(printDebug bool, ch chan *meshtastic.FromRadio) {
- for {
- data, err := readUntilProtobuf(c.serialConn, printDebug)
- if err != nil {
- log.Info("error:", err)
- continue
- }
- //log.Info("read from serial and got proto")
- var msg2 meshtastic.FromRadio
- err = proto.Unmarshal(data, &msg2)
- if err != nil {
- log.Fatal(err)
- }
- ch <- &msg2
- }
-}
-func readUntilProtobuf(reader io.Reader, printDebug bool) ([]byte, error) {
- buf := make([]byte, 4)
- for {
- // Read the first byte, looking for START1.
- _, err := io.ReadFull(reader, buf[:1])
- if err != nil {
- return nil, err
- }
-
- // Check for START1.
- if buf[0] != 0x94 {
- if printDebug {
- fmt.Print(string(buf[0]))
- }
- continue
- }
-
- // Read the second byte, looking for START2.
- _, err = io.ReadFull(reader, buf[1:2])
- if err != nil {
- return nil, err
- }
-
- // Check for START2.
- if buf[1] != 0xc3 {
- continue
- }
-
- // The next two bytes should be the length of the protobuf message.
- _, err = io.ReadFull(reader, buf[2:])
- if err != nil {
- return nil, err
- }
-
- length := int(binary.BigEndian.Uint16(buf[2:]))
- if length > PACKET_MTU {
- //packet corrupt, start over
- continue
- }
- //fmt.Println("got packet from node with length", length)
- data := make([]byte, length)
-
- // Read the protobuf data.
- _, err = io.ReadFull(reader, data)
- if err != nil {
- return nil, err
- }
-
- return data, nil
- }
-}
-
-func (c *Conn) flushPort() error {
- flush := make([]byte, 32)
- for j := 0; j < len(flush); j++ {
- flush[j] = START2
- }
- _, err := c.serialConn.Write(flush)
- if err != nil {
- return err
- }
- return nil
-}
-func (c *Conn) SendToRadio(msg *meshtastic.ToRadio) error {
- err := c.flushPort()
- if err != nil {
- return err
- }
- //fmt.Printf("Sent %v bytes\n", n)
- data, err := proto.Marshal(msg)
- if err != nil {
- panic(err)
- }
- time.Sleep(WAIT_AFTER_WAKE)
-
- datalen := len(data)
- header := []byte{START1, START2, byte(datalen >> 8), byte(datalen)}
- data = append(header, data...)
- _, err = c.serialConn.Write(data)
+ p, err := serial.Open(port, mode)
if err != nil {
- log.Fatal(err)
+ return nil, err
}
- //fmt.Printf("Sent %v bytes\n", n)
- return nil
+ return p, nil
}
diff --git a/transport/serial/usb.go b/transport/serial/usb.go
index b341321..7656fa4 100644
--- a/transport/serial/usb.go
+++ b/transport/serial/usb.go
@@ -12,7 +12,11 @@ type usbDevice struct {
}
var knownDevices = []usbDevice{
- {VID: "239A", PID: "8029"}, // rak4631_19003
+ // rak4631_19003
+ {VID: "239A", PID: "8029"},
+ // CP210x UART Bridge
+ // Commonly found on Heltec and other devices.
+ {VID: "10C4", PID: "EA60"},
}
func GetPorts() []string {
@@ -26,7 +30,7 @@ func GetPorts() []string {
return nil
}
for _, port := range ports {
- //fmt.Printf("Found port: %s\n", port.SettingName)
+ // fmt.Printf("Found port: %s %s\n", port.PID, port.VID)
if port.IsUSB {
for _, device := range knownDevices {
if device.VID != port.VID {
diff --git a/transport/stream_conn.go b/transport/stream_conn.go
new file mode 100644
index 0000000..e936318
--- /dev/null
+++ b/transport/stream_conn.go
@@ -0,0 +1,187 @@
+package transport
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "google.golang.org/protobuf/proto"
+ "io"
+ "sync"
+ "time"
+)
+
+const (
+ // WaitAfterWake is the amount of time to wait after sending the wake message before sending the first message.
+ WaitAfterWake = 100 * time.Millisecond
+ // Start1 is a magic byte used in the meshtastic stream protocol.
+ // Start1 is sent at the beginning of a message to indicate the start of a new message.
+ Start1 = 0x94
+ // Start2 is a magic byte used in the meshtastic stream protocol.
+ // It is sent after Start1 to indicate the start of a new message.
+ Start2 = 0xc3
+ // PacketMTU is the maximum size of the protobuf message which can be sent within the header.
+ PacketMTU = 512
+)
+
+// StreamConn implements the meshtastic client API stream protocol.
+// This protocol is used to send and receive protobuf messages over a serial or TCP connection.
+// See https://meshtastic.org/docs/development/device/client-api#streaming-version for additional information.
+type StreamConn struct {
+ conn io.ReadWriteCloser
+ // DebugWriter is an optional writer that is used when a non-protobuf message is sent over the connection.
+ DebugWriter io.Writer
+
+ readMu sync.Mutex
+ writeMu sync.Mutex
+}
+
+// NewClientStreamConn creates a new StreamConn with the provided io.ReadWriteCloser.
+// Once an io.ReadWriteCloser is provided, the StreamConn should be used read, write and close operations.
+func NewClientStreamConn(conn io.ReadWriteCloser) (*StreamConn, error) {
+ sConn := &StreamConn{conn: conn}
+ if err := sConn.writeWake(); err != nil {
+ return nil, fmt.Errorf("sending wake message: %w", err)
+ }
+ return sConn, nil
+}
+
+// NewRadioStreamConn creates a new StreamConn with the provided io.ReadWriteCloser.
+// Once an io.ReadWriteCloser is provided, the StreamConn should be used read, write and close operations.
+func NewRadioStreamConn(conn io.ReadWriteCloser) *StreamConn {
+ return &StreamConn{conn: conn}
+}
+
+// Close closes the connection.
+func (c *StreamConn) Close() (err error) {
+ return c.conn.Close()
+}
+
+// Read reads a protobuf message from the connection.
+func (c *StreamConn) Read(out proto.Message) error {
+ data, err := c.ReadBytes()
+ if err != nil {
+ return err
+ }
+ return proto.Unmarshal(data, out)
+}
+
+// ReadBytes reads a byte message from the connection.
+// Prefer using Read if you have a protobuf message.
+func (c *StreamConn) ReadBytes() ([]byte, error) {
+ c.readMu.Lock()
+ defer c.readMu.Unlock()
+ buf := make([]byte, 4)
+ for {
+ // Read the first byte, looking for Start1.
+ _, err := io.ReadFull(c.conn, buf[:1])
+ if err != nil {
+ return nil, err
+ }
+
+ // Check for Start1.
+ if buf[0] != Start1 {
+ if c.DebugWriter != nil {
+ c.DebugWriter.Write(buf[0:1])
+ }
+ continue
+ }
+
+ // Read the second byte, looking for Start2.
+ _, err = io.ReadFull(c.conn, buf[1:2])
+ if err != nil {
+ return nil, err
+ }
+
+ // Check for Start2.
+ if buf[1] != Start2 {
+ continue
+ }
+
+ // The next two bytes should be the length of the protobuf message.
+ _, err = io.ReadFull(c.conn, buf[2:])
+ if err != nil {
+ return nil, err
+ }
+
+ length := int(binary.BigEndian.Uint16(buf[2:]))
+ if length > PacketMTU {
+ //packet corrupt, start over
+ continue
+ }
+ data := make([]byte, length)
+
+ // Read the protobuf data.
+ _, err = io.ReadFull(c.conn, data)
+ if err != nil {
+ return nil, err
+ }
+
+ return data, nil
+ }
+}
+
+// writeStreamHeader writes the stream protocol header to the provided writer.
+// See https://meshtastic.org/docs/development/device/client-api#streaming-version
+func writeStreamHeader(w io.Writer, dataLen uint16) error {
+ header := bytes.NewBuffer(nil)
+ // First we write Start1, Start2
+ header.WriteByte(Start1)
+ header.WriteByte(Start2)
+ // Next we write the length of the protobuf message as a big-endian uint16
+ err := binary.Write(header, binary.BigEndian, dataLen)
+ if err != nil {
+ return fmt.Errorf("writing length to buffer: %w", err)
+ }
+
+ _, err = w.Write(header.Bytes())
+ return err
+}
+
+// Write writes a protobuf message to the connection.
+func (c *StreamConn) Write(in proto.Message) error {
+ protoBytes, err := proto.Marshal(in)
+ if err != nil {
+ return fmt.Errorf("marshalling proto message: %w", err)
+ }
+
+ if err := c.WriteBytes(protoBytes); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// WriteBytes writes a byte slice to the connection.
+// Prefer using Write if you have a protobuf message.
+func (c *StreamConn) WriteBytes(data []byte) error {
+ if len(data) > PacketMTU {
+ return fmt.Errorf("data length exceeds MTU: %d > %d", len(data), PacketMTU)
+ }
+ c.writeMu.Lock()
+ defer c.writeMu.Unlock()
+
+ if err := writeStreamHeader(c.conn, uint16(len(data))); err != nil {
+ return fmt.Errorf("writing stream header: %w", err)
+ }
+
+ if _, err := c.conn.Write(data); err != nil {
+ return fmt.Errorf("writing proto message: %w", err)
+ }
+ return nil
+}
+
+// writeWake writes a wake message to the radio.
+// This should only be called on the client side.
+//
+// TODO: Rather than just sending this on start, do we need to also send this after a long period of inactivity?
+func (c *StreamConn) writeWake() error {
+ // Send 32 bytes of Start2 to wake the radio if sleeping.
+ _, err := c.conn.Write(
+ bytes.Repeat([]byte{Start2}, 32),
+ )
+ if err != nil {
+ return err
+ }
+ time.Sleep(WaitAfterWake)
+ return nil
+}
diff --git a/transport/stream_conn_test.go b/transport/stream_conn_test.go
new file mode 100644
index 0000000..c0d996f
--- /dev/null
+++ b/transport/stream_conn_test.go
@@ -0,0 +1,69 @@
+package transport
+
+import (
+ pb "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic"
+ "bytes"
+ "github.com/stretchr/testify/require"
+ "golang.org/x/sync/errgroup"
+ "google.golang.org/protobuf/proto"
+ "net"
+ "testing"
+)
+
+func TestStreamConn(t *testing.T) {
+ radioNetConn, clientNetConn := net.Pipe()
+ var client *StreamConn
+ var radio *StreamConn
+
+ // Test client -> radio
+ sent := &pb.ToRadio{
+ PayloadVariant: &pb.ToRadio_WantConfigId{
+ WantConfigId: 123,
+ },
+ }
+ received := &pb.ToRadio{}
+ eg := errgroup.Group{}
+ eg.Go(func() error {
+ var err error
+ client, err = NewClientStreamConn(clientNetConn)
+ require.NoError(t, err)
+ return client.Write(sent)
+ })
+ eg.Go(func() error {
+ radio = NewRadioStreamConn(radioNetConn)
+ return radio.Read(received)
+ })
+ require.NoError(t, eg.Wait())
+ require.True(t, proto.Equal(sent, received))
+
+ // Test radio -> client
+ replySent := &pb.FromRadio{
+ Id: 123,
+ PayloadVariant: &pb.FromRadio_Config{
+ Config: &pb.Config{
+ PayloadVariant: &pb.Config_Device{
+ Device: &pb.Config_DeviceConfig{
+ Role: pb.Config_DeviceConfig_ROUTER,
+ },
+ },
+ },
+ },
+ }
+ replyReceived := &pb.FromRadio{}
+ eg = errgroup.Group{}
+ eg.Go(func() error {
+ return radio.Write(replySent)
+ })
+ eg.Go(func() error {
+ return client.Read(replyReceived)
+ })
+ require.NoError(t, eg.Wait())
+ require.True(t, proto.Equal(replySent, replyReceived))
+}
+
+func Test_writeStreamHeader(t *testing.T) {
+ out := bytes.NewBuffer(nil)
+ err := writeStreamHeader(out, 257)
+ require.NoError(t, err)
+ require.Equal(t, []byte{Start1, Start2, 0x01, 0x01}, out.Bytes())
+}