diff options
| -rw-r--r-- | example/main.go | 5 | ||||
| -rw-r--r-- | transport/client.go | 194 |
2 files changed, 170 insertions, 29 deletions
diff --git a/example/main.go b/example/main.go index 3986791..884ee36 100644 --- a/example/main.go +++ b/example/main.go @@ -9,6 +9,7 @@ import ( "google.golang.org/protobuf/proto" "os" "os/signal" + "time" ) func main() { @@ -37,7 +38,9 @@ func main() { pkt := msg.(*pb.MeshPacket) log.Info("Received message from radio", "msg", pkt) }) - if client.Connect() != nil { + ctxTimeout, cancelTimeout := context.WithTimeout(ctx, 10*time.Second) + defer cancelTimeout() + if client.Connect(ctxTimeout) != nil { panic("Failed to connect to the radio") } diff --git a/transport/client.go b/transport/client.go index 042d2ce..c638cf4 100644 --- a/transport/client.go +++ b/transport/client.go @@ -2,10 +2,17 @@ package transport import ( "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic" + "context" + "errors" "fmt" - "github.com/charmbracelet/log" "google.golang.org/protobuf/proto" + "log/slog" "math/rand" + "sync" +) + +var ( + ErrTimeout = errors.New("timeout connecting to radio") ) type HandlerFunc func(message proto.Message) @@ -13,23 +20,139 @@ type HandlerFunc func(message proto.Message) type Client struct { sc *StreamConn handlers *HandlerRegistry - log *log.Logger - - config struct { - complete bool - configID uint32 - *meshtastic.MyNodeInfo - *meshtastic.DeviceMetadata - nodes []*meshtastic.NodeInfo - channels []*meshtastic.Channel - config []*meshtastic.Config - modules []*meshtastic.ModuleConfig + log *slog.Logger + + State State +} + +type State struct { + sync.RWMutex + complete bool + configID uint32 + nodeInfo *meshtastic.MyNodeInfo + deviceMetadata *meshtastic.DeviceMetadata + nodes []*meshtastic.NodeInfo + channels []*meshtastic.Channel + configs []*meshtastic.Config + modules []*meshtastic.ModuleConfig +} + +func (s *State) Complete() bool { + s.RLock() + defer s.RUnlock() + return s.complete +} + +func (s *State) ConfigID() uint32 { + s.RLock() + defer s.RUnlock() + return s.configID +} + +func (s *State) NodeInfo() *meshtastic.MyNodeInfo { + s.RLock() + defer s.RUnlock() + return s.nodeInfo +} + +func (s *State) DeviceMetadata() *meshtastic.DeviceMetadata { + s.RLock() + defer s.RUnlock() + return proto.Clone(s.deviceMetadata).(*meshtastic.DeviceMetadata) +} + +func (s *State) Nodes() []*meshtastic.NodeInfo { + s.RLock() + defer s.RUnlock() + var nodeInfos []*meshtastic.NodeInfo + for _, n := range s.nodes { + nodeInfos = append(nodeInfos, proto.Clone(n).(*meshtastic.NodeInfo)) + } + return nodeInfos +} + +func (s *State) Channels() []*meshtastic.Channel { + s.RLock() + defer s.RUnlock() + var channels []*meshtastic.Channel + for _, n := range s.channels { + channels = append(channels, proto.Clone(n).(*meshtastic.Channel)) + } + return channels +} + +func (s *State) Configs() []*meshtastic.Config { + s.RLock() + defer s.RUnlock() + var configs []*meshtastic.Config + for _, n := range s.configs { + configs = append(configs, proto.Clone(n).(*meshtastic.Config)) + } + return configs +} + +func (s *State) Modules() []*meshtastic.ModuleConfig { + s.RLock() + defer s.RUnlock() + var configs []*meshtastic.ModuleConfig + for _, n := range s.modules { + configs = append(configs, proto.Clone(n).(*meshtastic.ModuleConfig)) } + return configs +} + +func (s *State) SetComplete(complete bool) { + s.Lock() + defer s.Unlock() + s.complete = complete +} + +func (s *State) SetConfigID(configID uint32) { + s.Lock() + defer s.Unlock() + s.configID = configID +} + +func (s *State) SetNodeInfo(nodeInfo *meshtastic.MyNodeInfo) { + s.Lock() + defer s.Unlock() + s.nodeInfo = nodeInfo +} + +func (s *State) SetDeviceMetadata(deviceMetadata *meshtastic.DeviceMetadata) { + s.Lock() + defer s.Unlock() + s.deviceMetadata = deviceMetadata +} + +func (s *State) AddNode(node *meshtastic.NodeInfo) { + s.Lock() + defer s.Unlock() + s.nodes = append(s.nodes, node) +} + +func (s *State) AddChannel(channel *meshtastic.Channel) { + s.Lock() + defer s.Unlock() + s.channels = append(s.channels, channel) +} + +func (s *State) AddConfig(config *meshtastic.Config) { + s.Lock() + defer s.Unlock() + s.configs = append(s.configs, config) +} + +func (s *State) AddModule(module *meshtastic.ModuleConfig) { + s.Lock() + defer s.Unlock() + s.modules = append(s.modules, module) } func NewClient(sc *StreamConn, errorOnNoHandler bool) *Client { return &Client{ - log: log.WithPrefix("client"), + // TODO: allow consumer to specify logger + log: slog.Default().WithGroup("client"), sc: sc, handlers: NewHandlerRegistry(errorOnNoHandler), } @@ -38,7 +161,7 @@ func NewClient(sc *StreamConn, errorOnNoHandler bool) *Client { // You have to send this first to get the radio into protobuf mode and have it accept and send packets via serial func (c *Client) sendGetConfig() error { r := rand.Uint32() - c.config.configID = r + c.State.configID = r msg := &meshtastic.ToRadio{ PayloadVariant: &meshtastic.ToRadio_WantConfigId{ WantConfigId: r, @@ -60,10 +183,11 @@ func (c *Client) SendToRadio(msg *meshtastic.ToRadio) error { return c.sc.Write(msg) } -func (c *Client) Connect() error { +func (c *Client) Connect(ctx context.Context) error { if err := c.sendGetConfig(); err != nil { return fmt.Errorf("requesting config: %w", err) } + cfgComplete := make(chan struct{}) go func() { for { msg := &meshtastic.FromRadio{} @@ -77,30 +201,34 @@ func (c *Client) Connect() error { switch msg.GetPayloadVariant().(type) { // These pbufs all get sent upon initial connection to the node case *meshtastic.FromRadio_MyInfo: - c.config.MyNodeInfo = msg.GetMyInfo() - variant = c.config.MyNodeInfo + c.State.SetNodeInfo(msg.GetMyInfo()) + variant = c.State.nodeInfo case *meshtastic.FromRadio_Metadata: - c.config.DeviceMetadata = msg.GetMetadata() - variant = c.config.DeviceMetadata + c.State.SetDeviceMetadata(msg.GetMetadata()) + variant = c.State.deviceMetadata case *meshtastic.FromRadio_NodeInfo: node := msg.GetNodeInfo() - c.config.nodes = append(c.config.nodes, node) + c.State.AddNode(node) variant = node case *meshtastic.FromRadio_Channel: channel := msg.GetChannel() - c.config.channels = append(c.config.channels, channel) + c.State.AddChannel(channel) variant = channel case *meshtastic.FromRadio_Config: cfg := msg.GetConfig() - c.config.config = append(c.config.config, cfg) + c.State.AddConfig(cfg) variant = cfg case *meshtastic.FromRadio_ModuleConfig: cfg := msg.GetModuleConfig() - c.config.modules = append(c.config.modules, cfg) + c.State.AddModule(cfg) variant = cfg case *meshtastic.FromRadio_ConfigCompleteId: - c.log.Info("config complete") - c.config.complete = true + // logged here because it's not an actual proto.Message that we can call handlers on + c.log.Debug("config complete") + if !c.State.Complete() { + close(cfgComplete) + } + c.State.SetComplete(true) continue // below are packets not part of initial connection @@ -112,17 +240,19 @@ func (c *Client) Connect() error { variant = msg.GetQueueStatus() case *meshtastic.FromRadio_Rebooted: // true if radio just rebooted + // logged here because it's not an actual proto.Message that we can call handlers on c.log.Debug("rebooted", "rebooted", msg.GetRebooted()) + continue case *meshtastic.FromRadio_XmodemPacket: variant = msg.GetXmodemPacket() - case *meshtastic.FromRadio_Packet: variant = msg.GetPacket() default: c.log.Warn("unhandled protobuf from radio") } - if !c.config.complete { + + if !c.State.Complete() { continue } err = c.handlers.HandleMessage(variant) @@ -131,5 +261,13 @@ func (c *Client) Connect() error { } } }() - return nil + + for { + select { + case <-ctx.Done(): + return ErrTimeout + case <-cfgComplete: + return nil + } + } } |
