summaryrefslogtreecommitdiff
path: root/mqtt.go
diff options
context:
space:
mode:
Diffstat (limited to 'mqtt.go')
-rw-r--r--mqtt.go205
1 files changed, 205 insertions, 0 deletions
diff --git a/mqtt.go b/mqtt.go
new file mode 100644
index 0000000..2e83e8a
--- /dev/null
+++ b/mqtt.go
@@ -0,0 +1,205 @@
+package main
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net/url"
+ "time"
+
+ "github.com/charmbracelet/log"
+ "github.com/eclipse/paho.golang/autopaho"
+ "github.com/eclipse/paho.golang/paho"
+ "github.com/meshnet-gophers/meshtastic-go"
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ "github.com/meshnet-gophers/meshtastic-go/radio"
+ "google.golang.org/protobuf/proto"
+)
+
+// Implements MeshIf
+type MqttIf struct {
+ Address string
+ Username string
+ Password string
+ ClientID string
+ Root string
+ GatewayID meshtastic.NodeID
+ Channels *pb.ChannelSet
+
+ logger *log.Logger
+ c *autopaho.ConnectionManager
+ ctx context.Context
+ stop context.CancelFunc
+ inputCh chan mqttServiceEnvelope
+ chIdMap map[string]uint8
+}
+
+type mqttServiceEnvelope struct {
+ timestamp time.Time
+ se *pb.ServiceEnvelope
+}
+
+func NewMqttIf(address string, username, password, root, clientID string, gatewayID meshtastic.NodeID, channels *pb.ChannelSet) *MqttIf {
+ logger := log.WithPrefix("MqttIf")
+ inputCh := make(chan mqttServiceEnvelope, 10)
+ return &MqttIf{
+ Address: address,
+ Username: username,
+ Password: password,
+ ClientID: clientID,
+ Root: root,
+ GatewayID: gatewayID,
+ Channels: channels,
+
+ logger: logger,
+ inputCh: inputCh,
+ }
+}
+
+func (m *MqttIf) channelPublishTopic(channelId string) string {
+ return fmt.Sprintf("%s/2/e/%s/%s", m.Root, channelId, m.GatewayID)
+}
+
+func (m *MqttIf) onConnectionUp(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
+ topics := []string{fmt.Sprintf("%s/2/e/PKI/#", m.Root)}
+ chIdMap := make(map[string]uint8)
+ for _, c := range m.Channels.Settings {
+ topics = append(topics, fmt.Sprintf("%s/2/e/%s/#", m.Root, c.Name))
+ channelHash, err := radio.ChannelHash(c.Name, c.Psk)
+ if err != nil {
+ m.logger.Errorf("failed to build channel hash: %s", err)
+ m.c.Disconnect(m.ctx)
+ return
+ }
+ chIdMap[c.Name] = uint8(channelHash)
+ }
+ m.chIdMap = chIdMap
+ subscriptions := make([]paho.SubscribeOptions, 0, len(topics))
+ for _, t := range topics {
+ subscriptions = append(subscriptions, paho.SubscribeOptions{Topic: t, QoS: 1})
+ }
+ if _, err := cm.Subscribe(m.ctx, &paho.Subscribe{Subscriptions: subscriptions}); err != nil {
+ m.logger.Errorf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
+ m.c.Disconnect(m.ctx)
+ return
+ }
+ m.logger.Info("Subscribed", "topics", topics)
+}
+
+func (m *MqttIf) onConnectionError(err error) {
+ m.logger.Warnf("error whilst attempting connection: %s\n", err)
+}
+
+func (m *MqttIf) onReceive(pr paho.PublishReceived) (bool, error) {
+ if pr.Packet.Duplicate() || pr.AlreadyHandled {
+ return false, nil
+ }
+ se := &pb.ServiceEnvelope{}
+ if err := proto.Unmarshal(pr.Packet.Payload, se); err != nil {
+ return false, fmt.Errorf("unmarshalling ServicePayload from Mqtt: %w", err)
+ }
+ if se.Packet.RxTime == 0 {
+ se.Packet.RxTime = uint32(time.Now().Unix())
+ }
+ m.inputCh <- mqttServiceEnvelope{timestamp: time.Now(), se: se}
+ return true, nil
+}
+
+func (m *MqttIf) onClientError(err error) {
+ m.logger.Warnf("client error: %s\n", err)
+}
+
+func (m *MqttIf) onServerDisconnect(d *paho.Disconnect) {
+ if d.Properties != nil {
+ fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString)
+ } else {
+ fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode)
+ }
+}
+
+func (m *MqttIf) Open() error {
+ m.ctx, m.stop = context.WithCancel(context.Background())
+ u, err := url.Parse(m.Address)
+ if err != nil {
+ return err
+ }
+ cliCfg := autopaho.ClientConfig{
+ ServerUrls: []*url.URL{u},
+ KeepAlive: 20, // Keepalive message should be sent every 20 seconds
+ CleanStartOnInitialConnection: false,
+ SessionExpiryInterval: 60,
+ OnConnectionUp: m.onConnectionUp,
+ OnConnectError: m.onConnectionError,
+ ConnectUsername: m.Username,
+ ConnectPassword: []byte(m.Password),
+ ClientConfig: paho.ClientConfig{
+ ClientID: m.ClientID,
+ OnPublishReceived: []func(paho.PublishReceived) (bool, error){m.onReceive},
+ OnClientError: m.onClientError,
+ OnServerDisconnect: m.onServerDisconnect,
+ },
+ }
+
+ m.c, err = autopaho.NewConnection(m.ctx, cliCfg) // starts process; will reconnect until context cancelled
+ if err != nil {
+ return err
+ }
+ if err = m.c.AwaitConnection(m.ctx); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (m *MqttIf) Close() error {
+ m.logger.Info("Closing MQTT connection...")
+ m.c.Disconnect(m.ctx)
+ m.stop()
+ close(m.inputCh)
+ <-m.ctx.Done()
+ return nil
+}
+
+func (m *MqttIf) ReadMeshPacket() (*pb.MeshPacket, uint64, error) {
+ if err := m.ctx.Err(); err != nil {
+ return nil, 0, err
+ }
+ p := <-m.inputCh
+ if p.se == nil || p.se.Packet == nil {
+ return nil, 0, errors.New("no input")
+ }
+ p.se.Packet.Channel = uint32(m.chIdMap[p.se.ChannelId])
+ return p.se.Packet, uint64(p.timestamp.UnixMicro()), nil
+}
+
+func (m *MqttIf) WriteMeshPacket(p *pb.MeshPacket) error {
+ var channelId string
+ if p.PkiEncrypted {
+ channelId = "PKI"
+ p.Priority = pb.MeshPacket_HIGH
+ } else {
+ channel := m.Channels.Settings[p.Channel]
+ channelId = channel.Name
+ }
+ if p.RxTime == 0 {
+ p.RxTime = uint32(time.Now().Unix())
+ }
+ p.RelayNode = p.RelayNode & 0xFF
+ p.NextHop = p.NextHop & 0xFF
+ p.Channel = 0 // Don't expose your channel number
+ se := &pb.ServiceEnvelope{
+ Packet: p,
+ ChannelId: channelId,
+ GatewayId: m.GatewayID.String(),
+ }
+ payload, err := proto.Marshal(se)
+ if err != nil {
+ return err
+ }
+ topic := m.channelPublishTopic(channelId)
+ m.c.Publish(m.ctx, &paho.Publish{
+ Topic: topic,
+ Payload: payload,
+ })
+ return nil
+}