package main import ( "context" "errors" "fmt" "net/url" "time" "github.com/charmbracelet/log" "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" "github.com/meshnet-gophers/meshtastic-go" pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" "github.com/meshnet-gophers/meshtastic-go/radio" "google.golang.org/protobuf/proto" ) // Implements MeshIf type MqttIf struct { Address string Username string Password string ClientID string Root string GatewayID meshtastic.NodeID Channels *pb.ChannelSet logger *log.Logger c *autopaho.ConnectionManager ctx context.Context stop context.CancelFunc inputCh chan mqttServiceEnvelope chIdMap map[string]uint8 } type mqttServiceEnvelope struct { timestamp time.Time se *pb.ServiceEnvelope } func NewMqttIf(address string, username, password, root, clientID string, gatewayID meshtastic.NodeID, channels *pb.ChannelSet) *MqttIf { logger := log.WithPrefix("MqttIf") inputCh := make(chan mqttServiceEnvelope, 10) return &MqttIf{ Address: address, Username: username, Password: password, ClientID: clientID, Root: root, GatewayID: gatewayID, Channels: channels, logger: logger, inputCh: inputCh, } } func (m *MqttIf) channelPublishTopic(channelId string) string { return fmt.Sprintf("%s/2/e/%s/%s", m.Root, channelId, m.GatewayID) } func (m *MqttIf) onConnectionUp(cm *autopaho.ConnectionManager, connAck *paho.Connack) { topics := []string{fmt.Sprintf("%s/2/e/PKI/#", m.Root)} chIdMap := make(map[string]uint8) for _, c := range m.Channels.Settings { topics = append(topics, fmt.Sprintf("%s/2/e/%s/#", m.Root, c.Name)) channelHash, err := radio.ChannelHash(c.Name, c.Psk) if err != nil { m.logger.Errorf("failed to build channel hash: %s", err) m.c.Disconnect(m.ctx) return } chIdMap[c.Name] = uint8(channelHash) } m.chIdMap = chIdMap subscriptions := make([]paho.SubscribeOptions, 0, len(topics)) for _, t := range topics { subscriptions = append(subscriptions, paho.SubscribeOptions{Topic: t, QoS: 1}) } if _, err := cm.Subscribe(m.ctx, &paho.Subscribe{Subscriptions: subscriptions}); err != nil { m.logger.Errorf("failed to subscribe (%s). This is likely to mean no messages will be received.", err) m.c.Disconnect(m.ctx) return } m.logger.Info("Subscribed", "topics", topics) } func (m *MqttIf) onConnectionError(err error) { m.logger.Warnf("error whilst attempting connection: %s\n", err) } func (m *MqttIf) onReceive(pr paho.PublishReceived) (bool, error) { if pr.Packet.Duplicate() || pr.AlreadyHandled { return false, nil } se := &pb.ServiceEnvelope{} if err := proto.Unmarshal(pr.Packet.Payload, se); err != nil { return false, fmt.Errorf("unmarshalling ServicePayload from Mqtt: %w", err) } if se.Packet.RxTime == 0 { se.Packet.RxTime = uint32(time.Now().Unix()) } m.inputCh <- mqttServiceEnvelope{timestamp: time.Now(), se: se} return true, nil } func (m *MqttIf) onClientError(err error) { m.logger.Warnf("client error: %s\n", err) } func (m *MqttIf) onServerDisconnect(d *paho.Disconnect) { if d.Properties != nil { fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString) } else { fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode) } } func (m *MqttIf) Open() error { m.ctx, m.stop = context.WithCancel(context.Background()) u, err := url.Parse(m.Address) if err != nil { return err } cliCfg := autopaho.ClientConfig{ ServerUrls: []*url.URL{u}, KeepAlive: 20, // Keepalive message should be sent every 20 seconds CleanStartOnInitialConnection: false, SessionExpiryInterval: 60, OnConnectionUp: m.onConnectionUp, OnConnectError: m.onConnectionError, ConnectUsername: m.Username, ConnectPassword: []byte(m.Password), ClientConfig: paho.ClientConfig{ ClientID: m.ClientID, OnPublishReceived: []func(paho.PublishReceived) (bool, error){m.onReceive}, OnClientError: m.onClientError, OnServerDisconnect: m.onServerDisconnect, }, } m.c, err = autopaho.NewConnection(m.ctx, cliCfg) // starts process; will reconnect until context cancelled if err != nil { return err } if err = m.c.AwaitConnection(m.ctx); err != nil { return err } return nil } func (m *MqttIf) Close() error { m.logger.Info("Closing MQTT connection...") m.c.Disconnect(m.ctx) m.stop() close(m.inputCh) <-m.ctx.Done() return nil } func (m *MqttIf) ReadMeshPacket() (*pb.MeshPacket, uint64, error) { if err := m.ctx.Err(); err != nil { return nil, 0, err } p := <-m.inputCh if p.se == nil || p.se.Packet == nil { return nil, 0, errors.New("no input") } p.se.Packet.Channel = uint32(m.chIdMap[p.se.ChannelId]) return p.se.Packet, uint64(p.timestamp.UnixMicro()), nil } func (m *MqttIf) WriteMeshPacket(p *pb.MeshPacket) error { var channelId string if p.PkiEncrypted { channelId = "PKI" p.Priority = pb.MeshPacket_HIGH } else { channel := m.Channels.Settings[p.Channel] channelId = channel.Name } if p.RxTime == 0 { p.RxTime = uint32(time.Now().Unix()) } p.RelayNode = p.RelayNode & 0xFF p.NextHop = p.NextHop & 0xFF p.Channel = 0 // Don't expose your channel number se := &pb.ServiceEnvelope{ Packet: p, ChannelId: channelId, GatewayId: m.GatewayID.String(), } payload, err := proto.Marshal(se) if err != nil { return err } topic := m.channelPublishTopic(channelId) m.c.Publish(m.ctx, &paho.Publish{ Topic: topic, Payload: payload, }) return nil }