diff options
Diffstat (limited to 'transport/stream_conn.go')
| -rw-r--r-- | transport/stream_conn.go | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/transport/stream_conn.go b/transport/stream_conn.go new file mode 100644 index 0000000..e936318 --- /dev/null +++ b/transport/stream_conn.go @@ -0,0 +1,187 @@ +package transport + +import ( + "bytes" + "encoding/binary" + "fmt" + "google.golang.org/protobuf/proto" + "io" + "sync" + "time" +) + +const ( + // WaitAfterWake is the amount of time to wait after sending the wake message before sending the first message. + WaitAfterWake = 100 * time.Millisecond + // Start1 is a magic byte used in the meshtastic stream protocol. + // Start1 is sent at the beginning of a message to indicate the start of a new message. + Start1 = 0x94 + // Start2 is a magic byte used in the meshtastic stream protocol. + // It is sent after Start1 to indicate the start of a new message. + Start2 = 0xc3 + // PacketMTU is the maximum size of the protobuf message which can be sent within the header. + PacketMTU = 512 +) + +// StreamConn implements the meshtastic client API stream protocol. +// This protocol is used to send and receive protobuf messages over a serial or TCP connection. +// See https://meshtastic.org/docs/development/device/client-api#streaming-version for additional information. +type StreamConn struct { + conn io.ReadWriteCloser + // DebugWriter is an optional writer that is used when a non-protobuf message is sent over the connection. + DebugWriter io.Writer + + readMu sync.Mutex + writeMu sync.Mutex +} + +// NewClientStreamConn creates a new StreamConn with the provided io.ReadWriteCloser. +// Once an io.ReadWriteCloser is provided, the StreamConn should be used read, write and close operations. +func NewClientStreamConn(conn io.ReadWriteCloser) (*StreamConn, error) { + sConn := &StreamConn{conn: conn} + if err := sConn.writeWake(); err != nil { + return nil, fmt.Errorf("sending wake message: %w", err) + } + return sConn, nil +} + +// NewRadioStreamConn creates a new StreamConn with the provided io.ReadWriteCloser. +// Once an io.ReadWriteCloser is provided, the StreamConn should be used read, write and close operations. +func NewRadioStreamConn(conn io.ReadWriteCloser) *StreamConn { + return &StreamConn{conn: conn} +} + +// Close closes the connection. +func (c *StreamConn) Close() (err error) { + return c.conn.Close() +} + +// Read reads a protobuf message from the connection. +func (c *StreamConn) Read(out proto.Message) error { + data, err := c.ReadBytes() + if err != nil { + return err + } + return proto.Unmarshal(data, out) +} + +// ReadBytes reads a byte message from the connection. +// Prefer using Read if you have a protobuf message. +func (c *StreamConn) ReadBytes() ([]byte, error) { + c.readMu.Lock() + defer c.readMu.Unlock() + buf := make([]byte, 4) + for { + // Read the first byte, looking for Start1. + _, err := io.ReadFull(c.conn, buf[:1]) + if err != nil { + return nil, err + } + + // Check for Start1. + if buf[0] != Start1 { + if c.DebugWriter != nil { + c.DebugWriter.Write(buf[0:1]) + } + continue + } + + // Read the second byte, looking for Start2. + _, err = io.ReadFull(c.conn, buf[1:2]) + if err != nil { + return nil, err + } + + // Check for Start2. + if buf[1] != Start2 { + continue + } + + // The next two bytes should be the length of the protobuf message. + _, err = io.ReadFull(c.conn, buf[2:]) + if err != nil { + return nil, err + } + + length := int(binary.BigEndian.Uint16(buf[2:])) + if length > PacketMTU { + //packet corrupt, start over + continue + } + data := make([]byte, length) + + // Read the protobuf data. + _, err = io.ReadFull(c.conn, data) + if err != nil { + return nil, err + } + + return data, nil + } +} + +// writeStreamHeader writes the stream protocol header to the provided writer. +// See https://meshtastic.org/docs/development/device/client-api#streaming-version +func writeStreamHeader(w io.Writer, dataLen uint16) error { + header := bytes.NewBuffer(nil) + // First we write Start1, Start2 + header.WriteByte(Start1) + header.WriteByte(Start2) + // Next we write the length of the protobuf message as a big-endian uint16 + err := binary.Write(header, binary.BigEndian, dataLen) + if err != nil { + return fmt.Errorf("writing length to buffer: %w", err) + } + + _, err = w.Write(header.Bytes()) + return err +} + +// Write writes a protobuf message to the connection. +func (c *StreamConn) Write(in proto.Message) error { + protoBytes, err := proto.Marshal(in) + if err != nil { + return fmt.Errorf("marshalling proto message: %w", err) + } + + if err := c.WriteBytes(protoBytes); err != nil { + return err + } + + return nil +} + +// WriteBytes writes a byte slice to the connection. +// Prefer using Write if you have a protobuf message. +func (c *StreamConn) WriteBytes(data []byte) error { + if len(data) > PacketMTU { + return fmt.Errorf("data length exceeds MTU: %d > %d", len(data), PacketMTU) + } + c.writeMu.Lock() + defer c.writeMu.Unlock() + + if err := writeStreamHeader(c.conn, uint16(len(data))); err != nil { + return fmt.Errorf("writing stream header: %w", err) + } + + if _, err := c.conn.Write(data); err != nil { + return fmt.Errorf("writing proto message: %w", err) + } + return nil +} + +// writeWake writes a wake message to the radio. +// This should only be called on the client side. +// +// TODO: Rather than just sending this on start, do we need to also send this after a long period of inactivity? +func (c *StreamConn) writeWake() error { + // Send 32 bytes of Start2 to wake the radio if sleeping. + _, err := c.conn.Write( + bytes.Repeat([]byte{Start2}, 32), + ) + if err != nil { + return err + } + time.Sleep(WaitAfterWake) + return nil +} |
