aboutsummaryrefslogtreecommitdiff
path: root/transport/stream_conn.go
diff options
context:
space:
mode:
Diffstat (limited to 'transport/stream_conn.go')
-rw-r--r--transport/stream_conn.go187
1 files changed, 187 insertions, 0 deletions
diff --git a/transport/stream_conn.go b/transport/stream_conn.go
new file mode 100644
index 0000000..e936318
--- /dev/null
+++ b/transport/stream_conn.go
@@ -0,0 +1,187 @@
+package transport
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "google.golang.org/protobuf/proto"
+ "io"
+ "sync"
+ "time"
+)
+
+const (
+ // WaitAfterWake is the amount of time to wait after sending the wake message before sending the first message.
+ WaitAfterWake = 100 * time.Millisecond
+ // Start1 is a magic byte used in the meshtastic stream protocol.
+ // Start1 is sent at the beginning of a message to indicate the start of a new message.
+ Start1 = 0x94
+ // Start2 is a magic byte used in the meshtastic stream protocol.
+ // It is sent after Start1 to indicate the start of a new message.
+ Start2 = 0xc3
+ // PacketMTU is the maximum size of the protobuf message which can be sent within the header.
+ PacketMTU = 512
+)
+
+// StreamConn implements the meshtastic client API stream protocol.
+// This protocol is used to send and receive protobuf messages over a serial or TCP connection.
+// See https://meshtastic.org/docs/development/device/client-api#streaming-version for additional information.
+type StreamConn struct {
+ conn io.ReadWriteCloser
+ // DebugWriter is an optional writer that is used when a non-protobuf message is sent over the connection.
+ DebugWriter io.Writer
+
+ readMu sync.Mutex
+ writeMu sync.Mutex
+}
+
+// NewClientStreamConn creates a new StreamConn with the provided io.ReadWriteCloser.
+// Once an io.ReadWriteCloser is provided, the StreamConn should be used read, write and close operations.
+func NewClientStreamConn(conn io.ReadWriteCloser) (*StreamConn, error) {
+ sConn := &StreamConn{conn: conn}
+ if err := sConn.writeWake(); err != nil {
+ return nil, fmt.Errorf("sending wake message: %w", err)
+ }
+ return sConn, nil
+}
+
+// NewRadioStreamConn creates a new StreamConn with the provided io.ReadWriteCloser.
+// Once an io.ReadWriteCloser is provided, the StreamConn should be used read, write and close operations.
+func NewRadioStreamConn(conn io.ReadWriteCloser) *StreamConn {
+ return &StreamConn{conn: conn}
+}
+
+// Close closes the connection.
+func (c *StreamConn) Close() (err error) {
+ return c.conn.Close()
+}
+
+// Read reads a protobuf message from the connection.
+func (c *StreamConn) Read(out proto.Message) error {
+ data, err := c.ReadBytes()
+ if err != nil {
+ return err
+ }
+ return proto.Unmarshal(data, out)
+}
+
+// ReadBytes reads a byte message from the connection.
+// Prefer using Read if you have a protobuf message.
+func (c *StreamConn) ReadBytes() ([]byte, error) {
+ c.readMu.Lock()
+ defer c.readMu.Unlock()
+ buf := make([]byte, 4)
+ for {
+ // Read the first byte, looking for Start1.
+ _, err := io.ReadFull(c.conn, buf[:1])
+ if err != nil {
+ return nil, err
+ }
+
+ // Check for Start1.
+ if buf[0] != Start1 {
+ if c.DebugWriter != nil {
+ c.DebugWriter.Write(buf[0:1])
+ }
+ continue
+ }
+
+ // Read the second byte, looking for Start2.
+ _, err = io.ReadFull(c.conn, buf[1:2])
+ if err != nil {
+ return nil, err
+ }
+
+ // Check for Start2.
+ if buf[1] != Start2 {
+ continue
+ }
+
+ // The next two bytes should be the length of the protobuf message.
+ _, err = io.ReadFull(c.conn, buf[2:])
+ if err != nil {
+ return nil, err
+ }
+
+ length := int(binary.BigEndian.Uint16(buf[2:]))
+ if length > PacketMTU {
+ //packet corrupt, start over
+ continue
+ }
+ data := make([]byte, length)
+
+ // Read the protobuf data.
+ _, err = io.ReadFull(c.conn, data)
+ if err != nil {
+ return nil, err
+ }
+
+ return data, nil
+ }
+}
+
+// writeStreamHeader writes the stream protocol header to the provided writer.
+// See https://meshtastic.org/docs/development/device/client-api#streaming-version
+func writeStreamHeader(w io.Writer, dataLen uint16) error {
+ header := bytes.NewBuffer(nil)
+ // First we write Start1, Start2
+ header.WriteByte(Start1)
+ header.WriteByte(Start2)
+ // Next we write the length of the protobuf message as a big-endian uint16
+ err := binary.Write(header, binary.BigEndian, dataLen)
+ if err != nil {
+ return fmt.Errorf("writing length to buffer: %w", err)
+ }
+
+ _, err = w.Write(header.Bytes())
+ return err
+}
+
+// Write writes a protobuf message to the connection.
+func (c *StreamConn) Write(in proto.Message) error {
+ protoBytes, err := proto.Marshal(in)
+ if err != nil {
+ return fmt.Errorf("marshalling proto message: %w", err)
+ }
+
+ if err := c.WriteBytes(protoBytes); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// WriteBytes writes a byte slice to the connection.
+// Prefer using Write if you have a protobuf message.
+func (c *StreamConn) WriteBytes(data []byte) error {
+ if len(data) > PacketMTU {
+ return fmt.Errorf("data length exceeds MTU: %d > %d", len(data), PacketMTU)
+ }
+ c.writeMu.Lock()
+ defer c.writeMu.Unlock()
+
+ if err := writeStreamHeader(c.conn, uint16(len(data))); err != nil {
+ return fmt.Errorf("writing stream header: %w", err)
+ }
+
+ if _, err := c.conn.Write(data); err != nil {
+ return fmt.Errorf("writing proto message: %w", err)
+ }
+ return nil
+}
+
+// writeWake writes a wake message to the radio.
+// This should only be called on the client side.
+//
+// TODO: Rather than just sending this on start, do we need to also send this after a long period of inactivity?
+func (c *StreamConn) writeWake() error {
+ // Send 32 bytes of Start2 to wake the radio if sleeping.
+ _, err := c.conn.Write(
+ bytes.Repeat([]byte{Start2}, 32),
+ )
+ if err != nil {
+ return err
+ }
+ time.Sleep(WaitAfterWake)
+ return nil
+}