diff options
Diffstat (limited to 'mqtt.go')
| -rw-r--r-- | mqtt.go | 205 |
1 files changed, 205 insertions, 0 deletions
@@ -0,0 +1,205 @@ +package main + +import ( + "context" + "errors" + "fmt" + "net/url" + "time" + + "github.com/charmbracelet/log" + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "github.com/meshnet-gophers/meshtastic-go/radio" + "google.golang.org/protobuf/proto" +) + +// Implements MeshIf +type MqttIf struct { + Address string + Username string + Password string + ClientID string + Root string + GatewayID meshtastic.NodeID + Channels *pb.ChannelSet + + logger *log.Logger + c *autopaho.ConnectionManager + ctx context.Context + stop context.CancelFunc + inputCh chan mqttServiceEnvelope + chIdMap map[string]uint8 +} + +type mqttServiceEnvelope struct { + timestamp time.Time + se *pb.ServiceEnvelope +} + +func NewMqttIf(address string, username, password, root, clientID string, gatewayID meshtastic.NodeID, channels *pb.ChannelSet) *MqttIf { + logger := log.WithPrefix("MqttIf") + inputCh := make(chan mqttServiceEnvelope, 10) + return &MqttIf{ + Address: address, + Username: username, + Password: password, + ClientID: clientID, + Root: root, + GatewayID: gatewayID, + Channels: channels, + + logger: logger, + inputCh: inputCh, + } +} + +func (m *MqttIf) channelPublishTopic(channelId string) string { + return fmt.Sprintf("%s/2/e/%s/%s", m.Root, channelId, m.GatewayID) +} + +func (m *MqttIf) onConnectionUp(cm *autopaho.ConnectionManager, connAck *paho.Connack) { + topics := []string{fmt.Sprintf("%s/2/e/PKI/#", m.Root)} + chIdMap := make(map[string]uint8) + for _, c := range m.Channels.Settings { + topics = append(topics, fmt.Sprintf("%s/2/e/%s/#", m.Root, c.Name)) + channelHash, err := radio.ChannelHash(c.Name, c.Psk) + if err != nil { + m.logger.Errorf("failed to build channel hash: %s", err) + m.c.Disconnect(m.ctx) + return + } + chIdMap[c.Name] = uint8(channelHash) + } + m.chIdMap = chIdMap + subscriptions := make([]paho.SubscribeOptions, 0, len(topics)) + for _, t := range topics { + subscriptions = append(subscriptions, paho.SubscribeOptions{Topic: t, QoS: 1}) + } + if _, err := cm.Subscribe(m.ctx, &paho.Subscribe{Subscriptions: subscriptions}); err != nil { + m.logger.Errorf("failed to subscribe (%s). This is likely to mean no messages will be received.", err) + m.c.Disconnect(m.ctx) + return + } + m.logger.Info("Subscribed", "topics", topics) +} + +func (m *MqttIf) onConnectionError(err error) { + m.logger.Warnf("error whilst attempting connection: %s\n", err) +} + +func (m *MqttIf) onReceive(pr paho.PublishReceived) (bool, error) { + if pr.Packet.Duplicate() || pr.AlreadyHandled { + return false, nil + } + se := &pb.ServiceEnvelope{} + if err := proto.Unmarshal(pr.Packet.Payload, se); err != nil { + return false, fmt.Errorf("unmarshalling ServicePayload from Mqtt: %w", err) + } + if se.Packet.RxTime == 0 { + se.Packet.RxTime = uint32(time.Now().Unix()) + } + m.inputCh <- mqttServiceEnvelope{timestamp: time.Now(), se: se} + return true, nil +} + +func (m *MqttIf) onClientError(err error) { + m.logger.Warnf("client error: %s\n", err) +} + +func (m *MqttIf) onServerDisconnect(d *paho.Disconnect) { + if d.Properties != nil { + fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString) + } else { + fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode) + } +} + +func (m *MqttIf) Open() error { + m.ctx, m.stop = context.WithCancel(context.Background()) + u, err := url.Parse(m.Address) + if err != nil { + return err + } + cliCfg := autopaho.ClientConfig{ + ServerUrls: []*url.URL{u}, + KeepAlive: 20, // Keepalive message should be sent every 20 seconds + CleanStartOnInitialConnection: false, + SessionExpiryInterval: 60, + OnConnectionUp: m.onConnectionUp, + OnConnectError: m.onConnectionError, + ConnectUsername: m.Username, + ConnectPassword: []byte(m.Password), + ClientConfig: paho.ClientConfig{ + ClientID: m.ClientID, + OnPublishReceived: []func(paho.PublishReceived) (bool, error){m.onReceive}, + OnClientError: m.onClientError, + OnServerDisconnect: m.onServerDisconnect, + }, + } + + m.c, err = autopaho.NewConnection(m.ctx, cliCfg) // starts process; will reconnect until context cancelled + if err != nil { + return err + } + if err = m.c.AwaitConnection(m.ctx); err != nil { + return err + } + + return nil +} + +func (m *MqttIf) Close() error { + m.logger.Info("Closing MQTT connection...") + m.c.Disconnect(m.ctx) + m.stop() + close(m.inputCh) + <-m.ctx.Done() + return nil +} + +func (m *MqttIf) ReadMeshPacket() (*pb.MeshPacket, uint64, error) { + if err := m.ctx.Err(); err != nil { + return nil, 0, err + } + p := <-m.inputCh + if p.se == nil || p.se.Packet == nil { + return nil, 0, errors.New("no input") + } + p.se.Packet.Channel = uint32(m.chIdMap[p.se.ChannelId]) + return p.se.Packet, uint64(p.timestamp.UnixMicro()), nil +} + +func (m *MqttIf) WriteMeshPacket(p *pb.MeshPacket) error { + var channelId string + if p.PkiEncrypted { + channelId = "PKI" + p.Priority = pb.MeshPacket_HIGH + } else { + channel := m.Channels.Settings[p.Channel] + channelId = channel.Name + } + if p.RxTime == 0 { + p.RxTime = uint32(time.Now().Unix()) + } + p.RelayNode = p.RelayNode & 0xFF + p.NextHop = p.NextHop & 0xFF + p.Channel = 0 // Don't expose your channel number + se := &pb.ServiceEnvelope{ + Packet: p, + ChannelId: channelId, + GatewayId: m.GatewayID.String(), + } + payload, err := proto.Marshal(se) + if err != nil { + return err + } + topic := m.channelPublishTopic(channelId) + m.c.Publish(m.ctx, &paho.Publish{ + Topic: topic, + Payload: payload, + }) + return nil +} |
