diff options
| author | Smoke <[email protected]> | 2024-01-19 10:51:52 -1000 |
|---|---|---|
| committer | Smoke <[email protected]> | 2024-01-19 10:51:52 -1000 |
| commit | 70bb2c77356d349165ba46ea98f8346284c2e44e (patch) | |
| tree | 7a1f858ca12386f7bd9478550e29bf3c1af109b5 /mqtt | |
| parent | 320bb2e1e7dfe5092ea1f6b65a9c6e53e58ce387 (diff) | |
updates
Diffstat (limited to 'mqtt')
| -rw-r--r-- | mqtt/client.go | 138 | ||||
| -rw-r--r-- | mqtt/node.go | 14 | ||||
| -rw-r--r-- | mqtt/util.go | 23 |
3 files changed, 175 insertions, 0 deletions
diff --git a/mqtt/client.go b/mqtt/client.go new file mode 100644 index 0000000..913b8af --- /dev/null +++ b/mqtt/client.go @@ -0,0 +1,138 @@ +package mqtt + +import ( + "errors" + "github.com/charmbracelet/log" + mqtt "github.com/eclipse/paho.mqtt.golang" + "strings" + "sync" + "time" +) + +type Client struct { + server string + username string + password string + topicRoot string + clientID string + client mqtt.Client + sync.RWMutex + channelHandlers map[string][]HandlerFunc +} + +type HandlerFunc func(message Message) + +var DefaultClient = Client{ + server: "tcp://mqtt.meshtastic.org:1883", + username: "meshdev", + password: "large4cats", + topicRoot: "msh/2", + + channelHandlers: make(map[string][]HandlerFunc), +} + +func NewClient(url, username, password, rootTopic string) *Client { + return &Client{ + server: url, + username: username, + password: password, + topicRoot: rootTopic, + channelHandlers: make(map[string][]HandlerFunc), + } +} + +func (c *Client) TopicRoot() string { + return c.topicRoot +} + +func (c *Client) Connect() error { + var alphabet = []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789") + c.clientID = randomString(23, alphabet) + + mqtt.DEBUG = log.StandardLog(log.StandardLogOptions{ForceLevel: log.DebugLevel}) + mqtt.ERROR = log.StandardLog(log.StandardLogOptions{ForceLevel: log.ErrorLevel}) + opts := mqtt.NewClientOptions(). + AddBroker(c.server). + SetUsername(c.username). + SetOrderMatters(false). + SetPassword(c.password). + SetClientID(c.clientID). + SetCleanSession(false) + opts.SetKeepAlive(30 * time.Second) + opts.SetResumeSubs(true) + //opts.SetDefaultPublishHandler(f) + opts.SetPingTimeout(5 * time.Second) + opts.SetAutoReconnect(true) + opts.SetMaxReconnectInterval(1 * time.Minute) + opts.SetConnectionLostHandler(func(c mqtt.Client, err error) { + log.Error("mqtt connection lost", "err", err) + }) + opts.SetReconnectingHandler(func(c mqtt.Client, options *mqtt.ClientOptions) { + log.Info("mqtt reconnecting") + }) + opts.SetOnConnectHandler(func(client mqtt.Client) { + log.Info("connected to", "server", c.server) + }) + c.client = mqtt.NewClient(opts) + if token := c.client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + return nil +} + +// MQTT Message +type Message struct { + Topic string + Payload []byte + Retained bool +} + +// Publish a message to the broker +func (c *Client) Publish(m *Message) error { + tok := c.client.Publish(m.Topic, 0, m.Retained, m.Payload) + if !tok.WaitTimeout(10 * time.Second) { + tok.Wait() + return errors.New("timeout on mqtt publish") + } + if tok.Error() != nil { + return tok.Error() + } + return nil +} + +// Register a handler for messages on the specified channel +func (c *Client) Handle(channel string, h HandlerFunc) { + c.Lock() + defer c.Unlock() + topic := c.GetFullTopicForChannel(channel) + c.channelHandlers[channel] = append(c.channelHandlers[channel], h) + c.client.Subscribe(topic+"/+", 0, c.handleBrokerMessage) +} +func (c *Client) GetFullTopicForChannel(channel string) string { + return c.topicRoot + "/c/" + channel +} +func (c *Client) GetChannelFromTopic(topic string) string { + trimmed := strings.TrimPrefix(topic, c.topicRoot+"/c/") + sepIndex := strings.Index(trimmed, "/") + if sepIndex > 0 { + return trimmed[:sepIndex] + } + return trimmed +} +func (c *Client) handleBrokerMessage(client mqtt.Client, message mqtt.Message) { + msg := Message{ + Topic: message.Topic(), + Payload: message.Payload(), + Retained: message.Retained(), + } + c.RLock() + defer c.RUnlock() + channel := c.GetChannelFromTopic(msg.Topic) + chans := c.channelHandlers[channel] + if len(chans) == 0 { + log.Error("no handlers found", "topic", channel) + } + for _, ch := range chans { + go ch(msg) + } +} diff --git a/mqtt/node.go b/mqtt/node.go new file mode 100644 index 0000000..3400307 --- /dev/null +++ b/mqtt/node.go @@ -0,0 +1,14 @@ +package mqtt + +import "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic" + +// Node implements a meshtastic node that connects only via MQTT +type Node struct { + user *meshtastic.User +} + +func NewNode(user *meshtastic.User) *Node { + return &Node{ + user: user, + } +} diff --git a/mqtt/util.go b/mqtt/util.go new file mode 100644 index 0000000..a934614 --- /dev/null +++ b/mqtt/util.go @@ -0,0 +1,23 @@ +package mqtt + +import ( + "math/rand" + "strings" + "time" +) + +// generates a random string for use as a client ID +func randomString(n int, alphabet []rune) string { + var seededRand = rand.New(rand.NewSource(time.Now().UnixNano())) + + alphabetSize := len(alphabet) + var sb strings.Builder + + for i := 0; i < n; i++ { + ch := alphabet[seededRand.Intn(alphabetSize)] + sb.WriteRune(ch) + } + + s := sb.String() + return s +} |
