aboutsummaryrefslogtreecommitdiff
path: root/mqtt
diff options
context:
space:
mode:
authorSmoke <[email protected]>2024-01-19 10:51:52 -1000
committerSmoke <[email protected]>2024-01-19 10:51:52 -1000
commit70bb2c77356d349165ba46ea98f8346284c2e44e (patch)
tree7a1f858ca12386f7bd9478550e29bf3c1af109b5 /mqtt
parent320bb2e1e7dfe5092ea1f6b65a9c6e53e58ce387 (diff)
updates
Diffstat (limited to 'mqtt')
-rw-r--r--mqtt/client.go138
-rw-r--r--mqtt/node.go14
-rw-r--r--mqtt/util.go23
3 files changed, 175 insertions, 0 deletions
diff --git a/mqtt/client.go b/mqtt/client.go
new file mode 100644
index 0000000..913b8af
--- /dev/null
+++ b/mqtt/client.go
@@ -0,0 +1,138 @@
+package mqtt
+
+import (
+ "errors"
+ "github.com/charmbracelet/log"
+ mqtt "github.com/eclipse/paho.mqtt.golang"
+ "strings"
+ "sync"
+ "time"
+)
+
+type Client struct {
+ server string
+ username string
+ password string
+ topicRoot string
+ clientID string
+ client mqtt.Client
+ sync.RWMutex
+ channelHandlers map[string][]HandlerFunc
+}
+
+type HandlerFunc func(message Message)
+
+var DefaultClient = Client{
+ server: "tcp://mqtt.meshtastic.org:1883",
+ username: "meshdev",
+ password: "large4cats",
+ topicRoot: "msh/2",
+
+ channelHandlers: make(map[string][]HandlerFunc),
+}
+
+func NewClient(url, username, password, rootTopic string) *Client {
+ return &Client{
+ server: url,
+ username: username,
+ password: password,
+ topicRoot: rootTopic,
+ channelHandlers: make(map[string][]HandlerFunc),
+ }
+}
+
+func (c *Client) TopicRoot() string {
+ return c.topicRoot
+}
+
+func (c *Client) Connect() error {
+ var alphabet = []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")
+ c.clientID = randomString(23, alphabet)
+
+ mqtt.DEBUG = log.StandardLog(log.StandardLogOptions{ForceLevel: log.DebugLevel})
+ mqtt.ERROR = log.StandardLog(log.StandardLogOptions{ForceLevel: log.ErrorLevel})
+ opts := mqtt.NewClientOptions().
+ AddBroker(c.server).
+ SetUsername(c.username).
+ SetOrderMatters(false).
+ SetPassword(c.password).
+ SetClientID(c.clientID).
+ SetCleanSession(false)
+ opts.SetKeepAlive(30 * time.Second)
+ opts.SetResumeSubs(true)
+ //opts.SetDefaultPublishHandler(f)
+ opts.SetPingTimeout(5 * time.Second)
+ opts.SetAutoReconnect(true)
+ opts.SetMaxReconnectInterval(1 * time.Minute)
+ opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
+ log.Error("mqtt connection lost", "err", err)
+ })
+ opts.SetReconnectingHandler(func(c mqtt.Client, options *mqtt.ClientOptions) {
+ log.Info("mqtt reconnecting")
+ })
+ opts.SetOnConnectHandler(func(client mqtt.Client) {
+ log.Info("connected to", "server", c.server)
+ })
+ c.client = mqtt.NewClient(opts)
+ if token := c.client.Connect(); token.Wait() && token.Error() != nil {
+ return token.Error()
+ }
+ return nil
+}
+
+// MQTT Message
+type Message struct {
+ Topic string
+ Payload []byte
+ Retained bool
+}
+
+// Publish a message to the broker
+func (c *Client) Publish(m *Message) error {
+ tok := c.client.Publish(m.Topic, 0, m.Retained, m.Payload)
+ if !tok.WaitTimeout(10 * time.Second) {
+ tok.Wait()
+ return errors.New("timeout on mqtt publish")
+ }
+ if tok.Error() != nil {
+ return tok.Error()
+ }
+ return nil
+}
+
+// Register a handler for messages on the specified channel
+func (c *Client) Handle(channel string, h HandlerFunc) {
+ c.Lock()
+ defer c.Unlock()
+ topic := c.GetFullTopicForChannel(channel)
+ c.channelHandlers[channel] = append(c.channelHandlers[channel], h)
+ c.client.Subscribe(topic+"/+", 0, c.handleBrokerMessage)
+}
+func (c *Client) GetFullTopicForChannel(channel string) string {
+ return c.topicRoot + "/c/" + channel
+}
+func (c *Client) GetChannelFromTopic(topic string) string {
+ trimmed := strings.TrimPrefix(topic, c.topicRoot+"/c/")
+ sepIndex := strings.Index(trimmed, "/")
+ if sepIndex > 0 {
+ return trimmed[:sepIndex]
+ }
+ return trimmed
+}
+func (c *Client) handleBrokerMessage(client mqtt.Client, message mqtt.Message) {
+ msg := Message{
+ Topic: message.Topic(),
+ Payload: message.Payload(),
+ Retained: message.Retained(),
+ }
+ c.RLock()
+ defer c.RUnlock()
+ channel := c.GetChannelFromTopic(msg.Topic)
+ chans := c.channelHandlers[channel]
+ if len(chans) == 0 {
+ log.Error("no handlers found", "topic", channel)
+ }
+ for _, ch := range chans {
+ go ch(msg)
+ }
+}
diff --git a/mqtt/node.go b/mqtt/node.go
new file mode 100644
index 0000000..3400307
--- /dev/null
+++ b/mqtt/node.go
@@ -0,0 +1,14 @@
+package mqtt
+
+import "buf.build/gen/go/meshtastic/protobufs/protocolbuffers/go/meshtastic"
+
+// Node implements a meshtastic node that connects only via MQTT
+type Node struct {
+ user *meshtastic.User
+}
+
+func NewNode(user *meshtastic.User) *Node {
+ return &Node{
+ user: user,
+ }
+}
diff --git a/mqtt/util.go b/mqtt/util.go
new file mode 100644
index 0000000..a934614
--- /dev/null
+++ b/mqtt/util.go
@@ -0,0 +1,23 @@
+package mqtt
+
+import (
+ "math/rand"
+ "strings"
+ "time"
+)
+
+// generates a random string for use as a client ID
+func randomString(n int, alphabet []rune) string {
+ var seededRand = rand.New(rand.NewSource(time.Now().UnixNano()))
+
+ alphabetSize := len(alphabet)
+ var sb strings.Builder
+
+ for i := 0; i < n; i++ {
+ ch := alphabet[seededRand.Intn(alphabetSize)]
+ sb.WriteRune(ch)
+ }
+
+ s := sb.String()
+ return s
+}