summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore3
-rw-r--r--config.go53
-rw-r--r--config.json.example43
-rw-r--r--data/.gitkeep0
-rw-r--r--database.go187
-rw-r--r--datalogger.go96
-rw-r--r--go.mod63
-rw-r--r--go.sum160
-rw-r--r--helpers.go5
-rw-r--r--kava.go168
-rw-r--r--logs/.gitkeep0
-rw-r--r--lru.go24
-rw-r--r--main.go133
-rw-r--r--mestastic.go90
-rw-r--r--mqtt.go205
-rw-r--r--node.go598
-rw-r--r--nodeadmin.go392
-rw-r--r--noderxloop.go280
-rw-r--r--timers.go23
19 files changed, 2523 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2fdfb0b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+/data
+/config.json
+/logs \ No newline at end of file
diff --git a/config.go b/config.go
new file mode 100644
index 0000000..4698998
--- /dev/null
+++ b/config.go
@@ -0,0 +1,53 @@
+package main
+
+import (
+ "encoding/json"
+ "os"
+)
+
+type Config struct {
+ NodeID []byte `json:"nodeID"`
+ Name string `json:"name"`
+ ShortName string `json:"shortName"`
+ DefaultHops uint32 `json:"defaultHops"`
+ MaxHops uint32 `json:"maxHops"`
+ NodeBroadcastInterval float64 `json:"nodeBroadcastInterval"`
+ X25519Key []byte `json:"x25519Key"`
+ Channels []struct {
+ Name string `json:"name"`
+ PSK []byte `json:"psk"`
+ } `json:"channels"`
+ Position struct {
+ BroadcastInterval float64 `json:"broadcastInterval"`
+ Latitude float64 `json:"latitude"`
+ Longitude float64 `json:"longitude"`
+ Altitude float64 `json:"altitude"`
+ } `json:"position"`
+ DeviceMetrics struct {
+ BroadcastInterval float64 `json:"broadcastInterval"`
+ UPSAddress string `json:"upsAddress"`
+ } `json:"deviceMetrics"`
+ KavaModem struct {
+ Address string `json:"address"`
+ } `json:"kavaModem"`
+ MqttIf struct {
+ Address string `json:"address"`
+ Username string `json:"username"`
+ Password string `json:"password"`
+ ClientID string `json:"clientId"`
+ Root string `json:"root"`
+ } `json:"mqttIf"`
+}
+
+func ReadConfig(filename string) (*Config, error) {
+ cfg := Config{}
+ f, err := os.Open(filename)
+ if err != nil {
+ return nil, err
+ }
+ dec := json.NewDecoder(f)
+ if err := dec.Decode(&cfg); err != nil {
+ return nil, err
+ }
+ return &cfg, nil
+}
diff --git a/config.json.example b/config.json.example
new file mode 100644
index 0000000..14ab023
--- /dev/null
+++ b/config.json.example
@@ -0,0 +1,43 @@
+{
+ "nodeID": "7rpREQ==",
+ "name": "oti sho",
+ "shortName": "oti",
+ "defaultHops": 1,
+ "maxHops": 1,
+ "nodeBroadcastInterval": 1200,
+ "x25519Key": "",
+ "position": {
+ "broadcastInterval": 86400,
+ "latitude": 51.5014760,
+ "longitude": -0.1406340,
+ "altitude": 2.0
+ },
+ "deviceMetrics": {
+ "broadcastInterval": 300,
+ "upsAddress": "upsname@localhost"
+ },
+ "channels": [
+ {
+ "name": "ShortFast",
+ "psk": "1PG7OiApB1nwvP+rz05pAQ=="
+ },
+ {
+ "name": "Blagoevgrad",
+ "psk": "1PG7OiApB1nwvP+rz05pAQ=="
+ },
+ {
+ "name": "Bulgaria",
+ "psk": "1PG7OiApB1nwvP+rz05pAQ=="
+ }
+ ],
+ "kavaModem": {
+ "address": "127.0.0.1:3333"
+ },
+ "mqttIf": {
+ "address": "mqtt://127.0.0.1:1883",
+ "username": "meshdev",
+ "password": "large4cats",
+ "clientId": "motisho",
+ "root": "msh/Bulgaria"
+ }
+}
diff --git a/data/.gitkeep b/data/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/data/.gitkeep
diff --git a/database.go b/database.go
new file mode 100644
index 0000000..dc410d8
--- /dev/null
+++ b/database.go
@@ -0,0 +1,187 @@
+package main
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/cockroachdb/pebble/v2"
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ "google.golang.org/protobuf/proto"
+)
+
+type BucketEnum uint16
+
+const (
+ BucketState BucketEnum = iota
+ BucketNodeInfo
+ BucketFromRadio
+)
+
+type StateKeyEnum uint32
+
+const (
+ StateKeyNull StateKeyEnum = iota
+ StateKeyPhoneLastUpdate
+)
+
+var (
+ errNotExists = errors.New("not exist")
+)
+
+type Database struct {
+ p *pebble.DB
+}
+
+func NewDB(dbPath string) (*Database, error) {
+ p, err := pebble.Open(dbPath, &pebble.Options{})
+ if err != nil {
+ return nil, fmt.Errorf("open database: %w", err)
+ }
+ return &Database{
+ p: p,
+ }, nil
+}
+
+func (db *Database) Close() error {
+ return db.p.Close()
+}
+
+func (db *Database) keyUint32(bucket BucketEnum, k uint32) []byte {
+ var key [6]byte
+ binary.BigEndian.PutUint16(key[0:2], uint16(bucket))
+ binary.BigEndian.PutUint32(key[2:6], k)
+ return key[:]
+}
+
+func (db *Database) keyUint64(bucket BucketEnum, k uint64) []byte {
+ var key [10]byte
+ binary.BigEndian.PutUint16(key[0:2], uint16(bucket))
+ binary.BigEndian.PutUint64(key[2:10], k)
+ return key[:]
+}
+
+func (db *Database) Get(key []byte) ([]byte, error) {
+ data, rd, err := db.p.Get(key)
+ if err == pebble.ErrNotFound {
+ return nil, errNotExists
+ } else if err != nil {
+ return nil, err
+ }
+ defer rd.Close()
+ return data, nil
+}
+
+func (db *Database) Set(key []byte, value []byte) error {
+ return db.p.Set(key, value, pebble.Sync)
+}
+
+func (db *Database) GetState(key StateKeyEnum) ([]byte, error) {
+ data, rd, err := db.p.Get(db.keyUint32(BucketState, uint32(key)))
+ if err == pebble.ErrNotFound {
+ return nil, errNotExists
+ } else if err != nil {
+ return nil, err
+ }
+ defer rd.Close()
+ return data, nil
+}
+
+func (db *Database) SetState(key StateKeyEnum, value []byte) error {
+ return db.p.Set(db.keyUint32(BucketState, uint32(key)), value, pebble.Sync)
+}
+
+func (db *Database) SetNodeInfo(num uint32, ni *pb.NodeInfo) error {
+ key := db.keyUint32(BucketNodeInfo, num)
+ if num != 0 && ni.Num == 0 {
+ return db.p.Delete(key, pebble.Sync)
+ }
+ ni.Num = num
+ data, err := proto.Marshal(ni)
+ if err != nil {
+ return err
+ }
+ return db.p.Set(key, data, pebble.Sync)
+}
+
+func (db *Database) GetNodeInfo(num uint32) (*pb.NodeInfo, error) {
+ data, rd, err := db.p.Get(db.keyUint32(BucketNodeInfo, num))
+ if err == pebble.ErrNotFound {
+ return nil, errNotExists
+ } else if err != nil {
+ return nil, err
+ }
+ defer rd.Close()
+ var ni pb.NodeInfo
+ if err = proto.Unmarshal(data, &ni); err != nil {
+ return nil, err
+ }
+ return &ni, nil
+}
+
+func (db *Database) ListNodeInfo() ([]*pb.NodeInfo, error) {
+ it, err := db.p.NewIter(&pebble.IterOptions{
+ LowerBound: db.keyUint32(BucketNodeInfo, uint32(0)),
+ UpperBound: db.keyUint32(BucketNodeInfo+1, uint32(0)),
+ })
+ if err != nil {
+ return nil, err
+ }
+ defer it.Close()
+ var list []*pb.NodeInfo
+ for it.First(); it.Valid(); it.Next() {
+ key := it.Key()
+ data := it.Value()
+ ni := &pb.NodeInfo{}
+ if err = proto.Unmarshal(data, ni); err != nil {
+ return nil, err
+ }
+ ni.Num = binary.BigEndian.Uint32(key[2:6])
+ list = append(list, ni)
+ }
+ return list, nil
+}
+
+func (db *Database) StoreFromRadio(tsMicro int64, msg *pb.FromRadio) error {
+ b, err := proto.Marshal(msg)
+ if err != nil {
+ return err
+ }
+ return db.p.Set(db.keyUint64(BucketFromRadio, uint64(tsMicro)), b, pebble.Sync)
+}
+
+func (db *Database) ReadFromRadio(start, end time.Time) (<-chan *pb.FromRadio, <-chan error) {
+ c := make(chan *pb.FromRadio)
+ errc := make(chan error, 1)
+ lowerLimit := uint64(start.UnixMicro())
+ upperLimit := uint64(end.UnixMicro())
+ go func() {
+ defer close(c)
+ defer close(errc)
+ it, err := db.p.NewIter(&pebble.IterOptions{
+ LowerBound: db.keyUint64(BucketFromRadio, lowerLimit),
+ UpperBound: db.keyUint64(BucketFromRadio+1, upperLimit),
+ })
+ if err != nil {
+ errc <- err
+ return
+ }
+ defer it.Close()
+ for it.First(); it.Valid(); it.Next() {
+ key := it.Key()
+ data := it.Value()
+ tsMicro := binary.BigEndian.Uint64(key[2:])
+ if tsMicro > upperLimit {
+ continue
+ }
+ msg := &pb.FromRadio{}
+ if err = proto.Unmarshal(data, msg); err != nil {
+ errc <- err
+ return
+ }
+ c <- msg
+ }
+ }()
+ return c, errc
+}
diff --git a/datalogger.go b/datalogger.go
new file mode 100644
index 0000000..6cd5106
--- /dev/null
+++ b/datalogger.go
@@ -0,0 +1,96 @@
+package main
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/meshnet-gophers/meshtastic-go"
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+)
+
+type DataLogger struct {
+ data *os.File
+ text *os.File
+}
+
+func NewDataLogger(logsDir string) (*DataLogger, error) {
+ var err error
+ var d DataLogger
+ d.data, err = os.OpenFile(filepath.Join(logsDir, "data.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
+ if err != nil {
+ d.Close()
+ return nil, err
+ }
+ d.text, err = os.OpenFile(filepath.Join(logsDir, "text.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
+ if err != nil {
+ d.Close()
+ return nil, err
+ }
+
+ return &d, nil
+}
+
+func (d *DataLogger) Close() error {
+ if d.data != nil {
+ d.data.Close()
+ }
+ if d.text != nil {
+ d.text.Close()
+ }
+ return nil
+}
+
+func formatNodeInfo(ni *pb.NodeInfo) string {
+ var str string
+ if ni != nil {
+ if ni.User != nil && ni.User.Id != "" {
+ str = fmt.Sprintf("%s (%s) %s", ni.User.Id, ni.User.ShortName, ni.User.LongName)
+ } else {
+ str = meshtastic.NodeID(ni.Num).String()
+ }
+ } else {
+ str = "???"
+ }
+ return str
+}
+
+func (d *DataLogger) LogPacket(p *pb.MeshPacket, c *pb.ChannelSettings, src *pb.NodeInfo, dst *pb.NodeInfo, data *pb.Data, payload any) error {
+ source := formatNodeInfo(src)
+ destination := formatNodeInfo(dst)
+ header := fmt.Sprintf("%s | #%d:%s '%s' >> '%s' [rssi:%d snr:%f hs:%d, hl:%d]", time.Unix(int64(p.RxTime), 0), c.ChannelNum, c.Name, source, destination, p.RxRssi, p.RxSnr, p.HopStart, p.HopLimit)
+ var s fmt.Stringer
+ var log string
+ var logfile *os.File = d.data
+ switch data.Portnum {
+ case pb.PortNum_NODEINFO_APP:
+ s, _ = payload.(*pb.User)
+ case pb.PortNum_TELEMETRY_APP:
+ s, _ = payload.(*pb.Telemetry)
+ case pb.PortNum_POSITION_APP:
+ s, _ = payload.(*pb.Position)
+ case pb.PortNum_ROUTING_APP:
+ s, _ = payload.(*pb.Routing)
+ case pb.PortNum_TRACEROUTE_APP:
+ s, _ = payload.(*pb.RouteDiscovery)
+ case pb.PortNum_AUDIO_APP:
+ log = fmt.Sprintf("Audio message (%dB, %.2fs@800bps)", len(data.Payload), (float32(len(data.Payload)) / 100.0))
+ case pb.PortNum_TEXT_MESSAGE_APP:
+ log = string(data.Payload)
+ logfile = d.text
+ default:
+ log = p.String()
+ }
+ if log == "" && s != nil {
+ log = s.String()
+ }
+ if log != "" && logfile != nil {
+ output := fmt.Sprintf("%s : [%s] %T{ %s }\n", header, data.Portnum, payload, log)
+ if _, err := logfile.Write([]byte(output)); err != nil {
+ return err
+ }
+ return logfile.Sync()
+ }
+ return nil
+}
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..c6d5b18
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,63 @@
+module go.metala.org/gotishosdr
+
+go 1.24.1
+
+require (
+ github.com/charmbracelet/log v0.4.2
+ github.com/cockroachdb/pebble/v2 v2.0.6
+ github.com/eclipse/paho.golang v0.22.0
+ github.com/elastic/go-freelru v0.16.0
+ github.com/meshnet-gophers/meshtastic-go v0.1.7
+ github.com/pion/dtls/v3 v3.0.6
+ github.com/spf13/pflag v1.0.6
+ golang.org/x/crypto v0.39.0
+ golang.org/x/sync v0.15.0
+ google.golang.org/protobuf v1.36.6
+)
+
+require (
+ github.com/DataDog/zstd v1.5.7 // indirect
+ github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
+ github.com/beorn7/perks v1.0.1 // indirect
+ github.com/cespare/xxhash/v2 v2.3.0 // indirect
+ github.com/charmbracelet/colorprofile v0.3.1 // indirect
+ github.com/charmbracelet/lipgloss v1.1.0 // indirect
+ github.com/charmbracelet/x/ansi v0.9.3 // indirect
+ github.com/charmbracelet/x/cellbuf v0.0.13 // indirect
+ github.com/charmbracelet/x/term v0.2.1 // indirect
+ github.com/cockroachdb/crlib v0.0.0-20250617202621-0794c595bbe6 // indirect
+ github.com/cockroachdb/errors v1.12.0 // indirect
+ github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 // indirect
+ github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506 // indirect
+ github.com/cockroachdb/redact v1.1.6 // indirect
+ github.com/cockroachdb/swiss v0.0.0-20250624142022-d6e517c1d961 // indirect
+ github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb // indirect
+ github.com/getsentry/sentry-go v0.34.0 // indirect
+ github.com/go-logfmt/logfmt v0.6.0 // indirect
+ github.com/gogo/protobuf v1.3.2 // indirect
+ github.com/golang/snappy v1.0.0 // indirect
+ github.com/gorilla/websocket v1.5.3 // indirect
+ github.com/klauspost/compress v1.18.0 // indirect
+ github.com/kr/pretty v0.3.1 // indirect
+ github.com/kr/text v0.2.0 // indirect
+ github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
+ github.com/mattn/go-isatty v0.0.20 // indirect
+ github.com/mattn/go-runewidth v0.0.16 // indirect
+ github.com/muesli/termenv v0.16.0 // indirect
+ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
+ github.com/pkg/errors v0.9.1 // indirect
+ github.com/planetscale/vtprotobuf v0.6.0 // indirect
+ github.com/prometheus/client_golang v1.22.0 // indirect
+ github.com/prometheus/client_model v0.6.2 // indirect
+ github.com/prometheus/common v0.65.0 // indirect
+ github.com/prometheus/procfs v0.17.0 // indirect
+ github.com/rivo/uniseg v0.4.7 // indirect
+ github.com/rogpeppe/go-internal v1.14.1 // indirect
+ github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
+ golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
+ golang.org/x/net v0.40.0 // indirect
+ golang.org/x/sys v0.33.0 // indirect
+ golang.org/x/text v0.26.0 // indirect
+)
+
+replace github.com/meshnet-gophers/meshtastic-go v0.1.7 => ../meshtastic-go
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..d0c3070
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,160 @@
+github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE=
+github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
+github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo=
+github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f/go.mod h1:tMDTce/yLLN/SK8gMOxQfnyeMeCg8KGzp0D1cbECEeo=
+github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=
+github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/charmbracelet/colorprofile v0.3.1 h1:k8dTHMd7fgw4bnFd7jXTLZrSU/CQrKnL3m+AxCzDz40=
+github.com/charmbracelet/colorprofile v0.3.1/go.mod h1:/GkGusxNs8VB/RSOh3fu0TJmQ4ICMMPApIIVn0KszZ0=
+github.com/charmbracelet/lipgloss v1.1.0 h1:vYXsiLHVkK7fp74RkV7b2kq9+zDLoEU4MZoFqR/noCY=
+github.com/charmbracelet/lipgloss v1.1.0/go.mod h1:/6Q8FR2o+kj8rz4Dq0zQc3vYf7X+B0binUUBwA0aL30=
+github.com/charmbracelet/log v0.4.2 h1:hYt8Qj6a8yLnvR+h7MwsJv/XvmBJXiueUcI3cIxsyig=
+github.com/charmbracelet/log v0.4.2/go.mod h1:qifHGX/tc7eluv2R6pWIpyHDDrrb/AG71Pf2ysQu5nw=
+github.com/charmbracelet/x/ansi v0.9.3 h1:BXt5DHS/MKF+LjuK4huWrC6NCvHtexww7dMayh6GXd0=
+github.com/charmbracelet/x/ansi v0.9.3/go.mod h1:3RQDQ6lDnROptfpWuUVIUG64bD2g2BgntdxH0Ya5TeE=
+github.com/charmbracelet/x/cellbuf v0.0.13 h1:/KBBKHuVRbq1lYx5BzEHBAFBP8VcQzJejZ/IA3iR28k=
+github.com/charmbracelet/x/cellbuf v0.0.13/go.mod h1:xe0nKWGd3eJgtqZRaN9RjMtK7xUYchjzPr7q6kcvCCs=
+github.com/charmbracelet/x/term v0.2.1 h1:AQeHeLZ1OqSXhrAWpYUtZyX1T3zVxfpZuEQMIQaGIAQ=
+github.com/charmbracelet/x/term v0.2.1/go.mod h1:oQ4enTYFV7QN4m0i9mzHrViD7TQKvNEEkHUMCmsxdUg=
+github.com/cockroachdb/crlib v0.0.0-20250617202621-0794c595bbe6 h1:PZVolkXzVqPQQZ7Jm8/lGpI9ZM086mB55oOWqy0wG6E=
+github.com/cockroachdb/crlib v0.0.0-20250617202621-0794c595bbe6/go.mod h1:Gq51ZeKaFCXk6QwuGM0w1dnaOqc/F5zKT2zA9D6Xeac=
+github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056 h1:slXychO2uDM6hYRu4c0pD0udNI8uObfeKN6UInWViS8=
+github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU=
+github.com/cockroachdb/errors v1.12.0 h1:d7oCs6vuIMUQRVbi6jWWWEJZahLCfJpnJSVobd1/sUo=
+github.com/cockroachdb/errors v1.12.0/go.mod h1:SvzfYNNBshAVbZ8wzNc/UPK3w1vf0dKDUP41ucAIf7g=
+github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 h1:pU88SPhIFid6/k0egdR5V6eALQYq2qbSmukrkgIh/0A=
+github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M=
+github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506 h1:ASDL+UJcILMqgNeV5jiqR4j+sTuvQNHdf2chuKj1M5k=
+github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506/go.mod h1:Mw7HqKr2kdtu6aYGn3tPmAftiP3QPX63LdK/zcariIo=
+github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA=
+github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA=
+github.com/cockroachdb/pebble/v2 v2.0.6 h1:eL54kX2AKp1ePJ/8vq4IO3xIEPpvVjlSP12dlLYilyE=
+github.com/cockroachdb/pebble/v2 v2.0.6/go.mod h1:un1DXG73PKw3F7Ndd30YactyvsFviI9Fuhe0tENdnyA=
+github.com/cockroachdb/redact v1.1.6 h1:zXJBwDZ84xJNlHl1rMyCojqyIxv+7YUpQiJLQ7n4314=
+github.com/cockroachdb/redact v1.1.6/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
+github.com/cockroachdb/swiss v0.0.0-20250624142022-d6e517c1d961 h1:Nua446ru3juLHLZd4AwKNzClZgL1co3pUPGv3o8FlcA=
+github.com/cockroachdb/swiss v0.0.0-20250624142022-d6e517c1d961/go.mod h1:yBRu/cnL4ks9bgy4vAASdjIW+/xMlFwuHKqtmh3GZQg=
+github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb h1:3bCgBvB8PbJVMX1ouCcSIxvsqKPYM7gs72o0zC76n9g=
+github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/eclipse/paho.golang v0.22.0 h1:JhhUngr8TBlyUZDZw/L6WVayPi9qmSmdWeki48i5AVE=
+github.com/eclipse/paho.golang v0.22.0/go.mod h1:9ZiYJ93iEfGRJri8tErNeStPKLXIGBHiqbHV74t5pqI=
+github.com/elastic/go-freelru v0.16.0 h1:gG2HJ1WXN2tNl5/p40JS/l59HjvjRhjyAa+oFTRArYs=
+github.com/elastic/go-freelru v0.16.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I=
+github.com/getsentry/sentry-go v0.34.0 h1:1FCHBVp8TfSc8L10zqSwXUZNiOSF+10qw4czjarTiY4=
+github.com/getsentry/sentry-go v0.34.0/go.mod h1:C55omcY9ChRQIUcVcGcs+Zdy4ZpQGvNJ7JYHIoSWOtE=
+github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9 h1:r5GgOLGbza2wVHRzK7aAj6lWZjfbAwiu/RDCVOKjRyM=
+github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9/go.mod h1:106OIgooyS7OzLDOpUGgm9fA3bQENb/cFSyyBmMoJDs=
+github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
+github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
+github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
+github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
+github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
+github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
+github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
+github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
+github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
+github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
+github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
+github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
+github.com/muesli/termenv v0.16.0 h1:S5AlUN9dENB57rsbnkPyfdGuWIlkmzJjbFf0Tf5FWUc=
+github.com/muesli/termenv v0.16.0/go.mod h1:ZRfOIKPFDYQoDFF4Olj7/QJbW60Ol/kL1pU3VfY/Cnk=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
+github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
+github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
+github.com/pion/dtls/v3 v3.0.6 h1:7Hkd8WhAJNbRgq9RgdNh1aaWlZlGpYTzdqjy9x9sK2E=
+github.com/pion/dtls/v3 v3.0.6/go.mod h1:iJxNQ3Uhn1NZWOMWlLxEEHAN5yX7GyPvvKw04v9bzYU=
+github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/planetscale/vtprotobuf v0.6.0 h1:nBeETjudeJ5ZgBHUz1fVHvbqUKnYOXNhsIEabROxmNA=
+github.com/planetscale/vtprotobuf v0.6.0/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q=
+github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0=
+github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
+github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
+github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2VzE=
+github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
+github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0=
+github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw=
+github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
+github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
+github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
+github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
+github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
+github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
+github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
+github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
+github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no=
+github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM=
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
+golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
+golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
+golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
+golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY=
+golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
+golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
+golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
+golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
+google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/helpers.go b/helpers.go
new file mode 100644
index 0000000..882ddec
--- /dev/null
+++ b/helpers.go
@@ -0,0 +1,5 @@
+package main
+
+func Ptr[T any](v T) *T {
+ return &v
+}
diff --git a/kava.go b/kava.go
new file mode 100644
index 0000000..7c2ea65
--- /dev/null
+++ b/kava.go
@@ -0,0 +1,168 @@
+package main
+
+import (
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "time"
+
+ "github.com/charmbracelet/log"
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ "github.com/meshnet-gophers/meshtastic-go/radio"
+)
+
+var (
+ errInvalidOpcode = errors.New("invalid opcode")
+ errInvalidLength = errors.New("invalid length")
+)
+
+// Implements MeshIf
+type KavaIf struct {
+ Address string
+ KeepAlivePeriod time.Duration
+ Channels *pb.ChannelSet
+
+ logger *log.Logger
+ conn net.Conn
+ modemTimeOffset uint64
+}
+
+func NewKavaConn(address string, keepAlive time.Duration, channels *pb.ChannelSet) *KavaIf {
+ logger := log.WithPrefix("Kava")
+ return &KavaIf{
+ Address: address,
+ KeepAlivePeriod: keepAlive,
+ Channels: channels,
+ logger: logger,
+ }
+}
+
+func (k *KavaIf) Open() error {
+ conn, err := net.Dial("tcp", k.Address)
+ if err != nil {
+ return fmt.Errorf("connecting to modem: %w", err)
+ }
+ if k.KeepAlivePeriod > 0 {
+ tcpconn := conn.(*net.TCPConn)
+ tcpconn.SetKeepAlive(true)
+ tcpconn.SetKeepAlivePeriod(k.KeepAlivePeriod)
+ }
+
+ k.conn = conn
+ return nil
+}
+
+func (k *KavaIf) Close() error {
+ return k.conn.Close()
+}
+
+func (k *KavaIf) ReadMeshPacket() (*pb.MeshPacket, uint64, error) {
+ var header [4]byte
+ var err error
+ for {
+ if _, err = io.ReadFull(k.conn, header[0:4]); err != nil {
+ return nil, 0, err
+ }
+ opcode := binary.LittleEndian.Uint16(header[0:2])
+ length := binary.LittleEndian.Uint16(header[2:4])
+ if length <= 0 {
+ continue
+ } else if length > 255+12 {
+ return nil, 0, errInvalidLength
+ }
+ switch opcode {
+ case 0x1007:
+ var ts [8]byte
+ if _, err = io.ReadFull(k.conn, ts[:]); err != nil {
+ return nil, 0, err
+ }
+ uptime := binary.LittleEndian.Uint64(ts[:])
+ k.modemTimeOffset = uint64(time.Now().UnixMicro()) - uptime
+ k.logger.Info("}} Modem uptime", "µs", uptime, "Δ-to-now", fmt.Sprintf("%+d", k.modemTimeOffset))
+ continue
+ case 0x0001:
+ var message [272]byte
+ if _, err := io.ReadFull(k.conn, message[:length]); err != nil {
+ return nil, 0, err
+ }
+ if length < (12 /*kava*/ + 16 /*m8c*/) {
+ return nil, 0, errInvalidLength
+ }
+
+ modemTs := binary.LittleEndian.Uint64(message[0:8])
+ rssi := int16(binary.LittleEndian.Uint16(message[8:10]))
+ snr := int16(binary.LittleEndian.Uint16(message[10:12]))
+ ts := int64(modemTs + k.modemTimeOffset)
+ timestamp := time.UnixMicro(ts)
+ data := message[12:length]
+ flags := RadioHeaderFlag(data[12])
+ channelHash := data[13]
+ payload := data[16:]
+ p := pb.MeshPacket{
+ To: binary.LittleEndian.Uint32(data[0:4]),
+ From: binary.LittleEndian.Uint32(data[4:8]),
+ Id: binary.LittleEndian.Uint32(data[8:12]),
+ RxTime: uint32(timestamp.Unix()),
+ RxSnr: float32(snr),
+ RxRssi: int32(rssi),
+ HopLimit: uint32(flags.HopLimit()),
+ HopStart: uint32(flags.HopStart()),
+ WantAck: flags.WantACK() == 1,
+ ViaMqtt: flags.MQTT() == 1,
+ NextHop: uint32(data[14]),
+ RelayNode: uint32(data[15]),
+ Channel: uint32(channelHash),
+ PkiEncrypted: channelHash == 0x00,
+ PayloadVariant: &pb.MeshPacket_Encrypted{Encrypted: payload},
+ }
+ return &p, uint64(ts), nil
+ default:
+ return nil, 0, errInvalidOpcode
+ }
+
+ }
+}
+
+func (k *KavaIf) WriteMeshPacket(p *pb.MeshPacket) error {
+ viaMQTT := uint32(bool2Int(p.ViaMqtt))
+ wantAck := uint32(bool2Int(p.WantAck))
+ flags := (p.HopStart << 5) | (viaMQTT << 4) | (wantAck << 3) | p.HopLimit
+
+ var chash uint32
+ if p.PkiEncrypted {
+ chash = 0x00
+ } else {
+ channel := k.Channels.Settings[p.Channel]
+ chash, _ = radio.ChannelHash(channel.Name, channel.Psk)
+ }
+
+ buf := bytes.NewBuffer(nil)
+ binary.Write(buf, binary.LittleEndian, uint32(p.To))
+ binary.Write(buf, binary.LittleEndian, uint32(p.From))
+ binary.Write(buf, binary.LittleEndian, uint32(p.Id))
+ binary.Write(buf, binary.LittleEndian, uint8(flags))
+ binary.Write(buf, binary.LittleEndian, uint8(chash))
+ binary.Write(buf, binary.LittleEndian, uint8(p.NextHop))
+ binary.Write(buf, binary.LittleEndian, uint8(p.RelayNode))
+ buf.Write(p.GetEncrypted())
+ b := buf.Bytes()
+
+ modemCmd := append([]byte{1, 0, byte(len(b)), 0}, b...)
+ if _, err := k.conn.Write(modemCmd); err != nil {
+ return err
+ }
+ return nil
+}
+
+func bool2Int(b bool) int {
+ var i int
+ if b {
+ i = 1
+ } else {
+ i = 0
+ }
+ return i
+}
diff --git a/logs/.gitkeep b/logs/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/logs/.gitkeep
diff --git a/lru.go b/lru.go
new file mode 100644
index 0000000..e0a386f
--- /dev/null
+++ b/lru.go
@@ -0,0 +1,24 @@
+package main
+
+import "encoding/binary"
+
+const (
+ // FNV-1a
+ offset32 = uint32(2166136261)
+ prime32 = uint32(16777619)
+)
+
+func hashPacket(p PacketTruncated) uint32 {
+ h := offset32
+ b := make([]byte, 12)
+ binary.BigEndian.PutUint32(b[0:4], p.To)
+ binary.BigEndian.PutUint32(b[4:8], p.From)
+ binary.BigEndian.PutUint32(b[8:12], p.ID)
+ for ; len(b) >= 4; b = b[4:] {
+ h = (h ^ uint32(b[0])) * prime32
+ h = (h ^ uint32(b[1])) * prime32
+ h = (h ^ uint32(b[2])) * prime32
+ h = (h ^ uint32(b[3])) * prime32
+ }
+ return h
+}
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..710f195
--- /dev/null
+++ b/main.go
@@ -0,0 +1,133 @@
+package main
+
+import (
+ "context"
+ "encoding/base64"
+ "encoding/binary"
+ "fmt"
+ "os"
+ "os/signal"
+ "time"
+
+ "github.com/charmbracelet/log"
+ "github.com/meshnet-gophers/meshtastic-go"
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ flag "github.com/spf13/pflag"
+ "golang.org/x/crypto/curve25519"
+)
+
+func main() {
+ var args struct {
+ DbDir string
+ LogsDir string
+ ConfigPath string
+ Interface string
+ Relentless bool
+ }
+ flag.StringVarP(&args.DbDir, "dbdir", "", "./data/", "The path to the database directory")
+ flag.StringVarP(&args.LogsDir, "logsdir", "", "./logs/", "The path to the logs directory")
+ flag.StringVarP(&args.ConfigPath, "config", "c", "./config.json", "The path to the config.json")
+ flag.StringVarP(&args.Interface, "interface", "i", "kava", "Mesh interface (kava, mqtt)")
+ flag.BoolVarP(&args.Relentless, "relentless", "r", false, "Relentless reconnect to modem")
+ flag.Parse()
+
+ log.SetLevel(log.DebugLevel)
+ cfg, err := ReadConfig(args.ConfigPath)
+ if err != nil {
+ log.Fatal(err)
+ }
+ pbChannels := make([]*pb.ChannelSettings, len(cfg.Channels))
+ for i, channel := range cfg.Channels {
+ var pbChannelSettings = pb.ChannelSettings{
+ Name: channel.Name,
+ Psk: channel.PSK,
+ }
+ pbChannels[i] = &pbChannelSettings
+ }
+
+ var nodeID meshtastic.NodeID
+ if len(cfg.NodeID) == 4 {
+ nodeID = meshtastic.NodeID(binary.BigEndian.Uint32(cfg.NodeID))
+ } else if len(cfg.NodeID) == 0 {
+ nodeID, err = meshtastic.RandomNodeID()
+ if err != nil {
+ log.Fatal(err)
+ }
+ }
+ log.Print("Config:")
+ log.Print(" Database", "directory", args.DbDir)
+ log.Print(" Logs", "directory", args.LogsDir)
+ log.Print(" Node", "id", nodeID, "short", cfg.ShortName, "name", cfg.Name)
+ nodeConfig := NodeConfig{
+ DatabaseDir: args.DbDir,
+ LogsDir: args.LogsDir,
+ NodeID: nodeID,
+ LongName: cfg.Name,
+ ShortName: cfg.ShortName,
+ DefaultHops: cfg.DefaultHops,
+ MaxHops: cfg.MaxHops,
+ Channels: &pb.ChannelSet{Settings: pbChannels},
+ BroadcastNodeInfoInterval: time.Duration(cfg.NodeBroadcastInterval * float64(time.Second)),
+ BroadcastPositionInterval: time.Duration(cfg.Position.BroadcastInterval * float64(time.Second)),
+ // Position paramters
+ PositionLatitudeI: int32(cfg.Position.Latitude * 1.e7),
+ PositionLongitudeI: int32(cfg.Position.Longitude * 1.e7),
+ PositionAltitude: int32(cfg.Position.Altitude),
+ // Device Metrics
+ DeviceMetricsBroadcastInterval: time.Duration(cfg.DeviceMetrics.BroadcastInterval * float64(time.Second)),
+ DeviceMetricsUPSAddress: cfg.DeviceMetrics.UPSAddress,
+ TCPListenAddr: "0.0.0.0:4403",
+ }
+ if len(cfg.X25519Key) == 32 {
+ nodeConfig.X25519SecretKey = cfg.X25519Key
+ nodeConfig.X25519PublicKey, err = curve25519.X25519(nodeConfig.X25519SecretKey, curve25519.Basepoint)
+ if err != nil {
+ log.Fatal(err)
+ }
+ log.Print(" X25519PublicKey", "base64", base64.StdEncoding.EncodeToString(nodeConfig.X25519PublicKey), "hex", fmt.Sprintf("%x", nodeConfig.X25519PublicKey))
+ } else if len(cfg.X25519Key) != 0 {
+ log.Fatal("invalid x25519 key")
+ }
+
+ if args.Relentless {
+ log.Warn("Starting Node in 'relentless' mode...")
+ } else {
+ log.Info("Starting Node...")
+ }
+
+ n, err := NewNode(nodeConfig)
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer n.Close()
+ ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
+ for {
+ var iface MeshIf
+ switch args.Interface {
+ case "kava":
+ iface = NewKavaConn(cfg.KavaModem.Address, 10*time.Second, n.cfg.Channels)
+ case "mqtt":
+ iface = NewMqttIf(cfg.MqttIf.Address, cfg.MqttIf.Username, cfg.MqttIf.Password, cfg.MqttIf.Root, cfg.MqttIf.ClientID, meshtastic.NodeID(nodeID), n.cfg.Channels)
+ default:
+ log.Fatal("invalid argument", "interface", args.Interface)
+ }
+
+ if err := n.Run(ctx, iface); err != nil {
+ log.Error("!! Node.Run()", "err", err)
+ }
+ if err := ctx.Err(); err != nil {
+ log.Error("!! Caught interrupt!")
+ break
+ }
+ if !args.Relentless {
+ break
+ }
+ log.Warn("Reconnecting in 30 seconds...")
+ ticker := time.NewTicker(30 * time.Second)
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ }
+ }
+}
diff --git a/mestastic.go b/mestastic.go
new file mode 100644
index 0000000..d76165b
--- /dev/null
+++ b/mestastic.go
@@ -0,0 +1,90 @@
+package main
+
+import (
+ "encoding/binary"
+
+ "github.com/meshnet-gophers/meshtastic-go"
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ "google.golang.org/protobuf/proto"
+)
+
+const (
+ FlagViaMQTT = (1 << 4)
+ FlagWantACK = (1 << 3)
+)
+
+var (
+ broadcastID = meshtastic.BroadcastNodeID.Uint32()
+)
+
+var pkcChan = &pb.ChannelSettings{
+ ChannelNum: 0xFFFFFFFF,
+ Name: "[PKC]",
+ Psk: nil,
+}
+
+type MeshIf interface {
+ Open() error
+ ReadMeshPacket() (*pb.MeshPacket, uint64, error)
+ WriteMeshPacket(*pb.MeshPacket) error
+ Close() error
+}
+
+type RadioHeaderFlag uint8
+
+func (r RadioHeaderFlag) WantACK() uint8 {
+ return uint8(r>>3) & 1
+}
+
+func (r RadioHeaderFlag) MQTT() uint8 {
+ return uint8(r>>4) & 1
+}
+
+func (r RadioHeaderFlag) HopStart() uint8 {
+ return uint8(r>>5) & 0b111
+}
+
+func (r RadioHeaderFlag) HopLimit() uint8 {
+ return uint8(r & 0b111)
+}
+
+type PacketTruncated struct {
+ To uint32
+ From uint32
+ ID uint32
+}
+
+func buildRoutingReply(p *pb.MeshPacket, err pb.Routing_Error) *pb.MeshPacket {
+ cb, _ := proto.Marshal(&pb.Routing{
+ Variant: &pb.Routing_ErrorReason{ErrorReason: err},
+ })
+ return &pb.MeshPacket{
+ // Reversed
+ To: p.From,
+ From: p.To,
+
+ PkiEncrypted: p.PkiEncrypted,
+ PublicKey: p.PublicKey,
+
+ // (N)ACK
+ Priority: pb.MeshPacket_ACK,
+ HopLimit: p.HopStart,
+ Channel: p.Channel,
+ PayloadVariant: &pb.MeshPacket_Decoded{
+ Decoded: &pb.Data{
+ Portnum: pb.PortNum_ROUTING_APP,
+ RequestId: p.Id,
+ Payload: cb,
+ },
+ },
+ }
+}
+
+func buildPkcNonce(id uint32, extra uint32, from uint32) []byte {
+ var buf [13]byte // zero-init
+ binary.LittleEndian.PutUint32(buf[0:4], id)
+ binary.LittleEndian.PutUint32(buf[4:8], extra)
+ binary.LittleEndian.PutUint32(buf[8:12], from)
+ buf[12] = 0 // last byte is zero
+ return buf[:]
+}
diff --git a/mqtt.go b/mqtt.go
new file mode 100644
index 0000000..2e83e8a
--- /dev/null
+++ b/mqtt.go
@@ -0,0 +1,205 @@
+package main
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net/url"
+ "time"
+
+ "github.com/charmbracelet/log"
+ "github.com/eclipse/paho.golang/autopaho"
+ "github.com/eclipse/paho.golang/paho"
+ "github.com/meshnet-gophers/meshtastic-go"
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ "github.com/meshnet-gophers/meshtastic-go/radio"
+ "google.golang.org/protobuf/proto"
+)
+
+// Implements MeshIf
+type MqttIf struct {
+ Address string
+ Username string
+ Password string
+ ClientID string
+ Root string
+ GatewayID meshtastic.NodeID
+ Channels *pb.ChannelSet
+
+ logger *log.Logger
+ c *autopaho.ConnectionManager
+ ctx context.Context
+ stop context.CancelFunc
+ inputCh chan mqttServiceEnvelope
+ chIdMap map[string]uint8
+}
+
+type mqttServiceEnvelope struct {
+ timestamp time.Time
+ se *pb.ServiceEnvelope
+}
+
+func NewMqttIf(address string, username, password, root, clientID string, gatewayID meshtastic.NodeID, channels *pb.ChannelSet) *MqttIf {
+ logger := log.WithPrefix("MqttIf")
+ inputCh := make(chan mqttServiceEnvelope, 10)
+ return &MqttIf{
+ Address: address,
+ Username: username,
+ Password: password,
+ ClientID: clientID,
+ Root: root,
+ GatewayID: gatewayID,
+ Channels: channels,
+
+ logger: logger,
+ inputCh: inputCh,
+ }
+}
+
+func (m *MqttIf) channelPublishTopic(channelId string) string {
+ return fmt.Sprintf("%s/2/e/%s/%s", m.Root, channelId, m.GatewayID)
+}
+
+func (m *MqttIf) onConnectionUp(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
+ topics := []string{fmt.Sprintf("%s/2/e/PKI/#", m.Root)}
+ chIdMap := make(map[string]uint8)
+ for _, c := range m.Channels.Settings {
+ topics = append(topics, fmt.Sprintf("%s/2/e/%s/#", m.Root, c.Name))
+ channelHash, err := radio.ChannelHash(c.Name, c.Psk)
+ if err != nil {
+ m.logger.Errorf("failed to build channel hash: %s", err)
+ m.c.Disconnect(m.ctx)
+ return
+ }
+ chIdMap[c.Name] = uint8(channelHash)
+ }
+ m.chIdMap = chIdMap
+ subscriptions := make([]paho.SubscribeOptions, 0, len(topics))
+ for _, t := range topics {
+ subscriptions = append(subscriptions, paho.SubscribeOptions{Topic: t, QoS: 1})
+ }
+ if _, err := cm.Subscribe(m.ctx, &paho.Subscribe{Subscriptions: subscriptions}); err != nil {
+ m.logger.Errorf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
+ m.c.Disconnect(m.ctx)
+ return
+ }
+ m.logger.Info("Subscribed", "topics", topics)
+}
+
+func (m *MqttIf) onConnectionError(err error) {
+ m.logger.Warnf("error whilst attempting connection: %s\n", err)
+}
+
+func (m *MqttIf) onReceive(pr paho.PublishReceived) (bool, error) {
+ if pr.Packet.Duplicate() || pr.AlreadyHandled {
+ return false, nil
+ }
+ se := &pb.ServiceEnvelope{}
+ if err := proto.Unmarshal(pr.Packet.Payload, se); err != nil {
+ return false, fmt.Errorf("unmarshalling ServicePayload from Mqtt: %w", err)
+ }
+ if se.Packet.RxTime == 0 {
+ se.Packet.RxTime = uint32(time.Now().Unix())
+ }
+ m.inputCh <- mqttServiceEnvelope{timestamp: time.Now(), se: se}
+ return true, nil
+}
+
+func (m *MqttIf) onClientError(err error) {
+ m.logger.Warnf("client error: %s\n", err)
+}
+
+func (m *MqttIf) onServerDisconnect(d *paho.Disconnect) {
+ if d.Properties != nil {
+ fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString)
+ } else {
+ fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode)
+ }
+}
+
+func (m *MqttIf) Open() error {
+ m.ctx, m.stop = context.WithCancel(context.Background())
+ u, err := url.Parse(m.Address)
+ if err != nil {
+ return err
+ }
+ cliCfg := autopaho.ClientConfig{
+ ServerUrls: []*url.URL{u},
+ KeepAlive: 20, // Keepalive message should be sent every 20 seconds
+ CleanStartOnInitialConnection: false,
+ SessionExpiryInterval: 60,
+ OnConnectionUp: m.onConnectionUp,
+ OnConnectError: m.onConnectionError,
+ ConnectUsername: m.Username,
+ ConnectPassword: []byte(m.Password),
+ ClientConfig: paho.ClientConfig{
+ ClientID: m.ClientID,
+ OnPublishReceived: []func(paho.PublishReceived) (bool, error){m.onReceive},
+ OnClientError: m.onClientError,
+ OnServerDisconnect: m.onServerDisconnect,
+ },
+ }
+
+ m.c, err = autopaho.NewConnection(m.ctx, cliCfg) // starts process; will reconnect until context cancelled
+ if err != nil {
+ return err
+ }
+ if err = m.c.AwaitConnection(m.ctx); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (m *MqttIf) Close() error {
+ m.logger.Info("Closing MQTT connection...")
+ m.c.Disconnect(m.ctx)
+ m.stop()
+ close(m.inputCh)
+ <-m.ctx.Done()
+ return nil
+}
+
+func (m *MqttIf) ReadMeshPacket() (*pb.MeshPacket, uint64, error) {
+ if err := m.ctx.Err(); err != nil {
+ return nil, 0, err
+ }
+ p := <-m.inputCh
+ if p.se == nil || p.se.Packet == nil {
+ return nil, 0, errors.New("no input")
+ }
+ p.se.Packet.Channel = uint32(m.chIdMap[p.se.ChannelId])
+ return p.se.Packet, uint64(p.timestamp.UnixMicro()), nil
+}
+
+func (m *MqttIf) WriteMeshPacket(p *pb.MeshPacket) error {
+ var channelId string
+ if p.PkiEncrypted {
+ channelId = "PKI"
+ p.Priority = pb.MeshPacket_HIGH
+ } else {
+ channel := m.Channels.Settings[p.Channel]
+ channelId = channel.Name
+ }
+ if p.RxTime == 0 {
+ p.RxTime = uint32(time.Now().Unix())
+ }
+ p.RelayNode = p.RelayNode & 0xFF
+ p.NextHop = p.NextHop & 0xFF
+ p.Channel = 0 // Don't expose your channel number
+ se := &pb.ServiceEnvelope{
+ Packet: p,
+ ChannelId: channelId,
+ GatewayId: m.GatewayID.String(),
+ }
+ payload, err := proto.Marshal(se)
+ if err != nil {
+ return err
+ }
+ topic := m.channelPublishTopic(channelId)
+ m.c.Publish(m.ctx, &paho.Publish{
+ Topic: topic,
+ Payload: payload,
+ })
+ return nil
+}
diff --git a/node.go b/node.go
new file mode 100644
index 0000000..a2b8edb
--- /dev/null
+++ b/node.go
@@ -0,0 +1,598 @@
+package main
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "crypto/aes"
+ "crypto/rand"
+ "crypto/sha256"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "os/exec"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/charmbracelet/log"
+ "github.com/elastic/go-freelru"
+ "github.com/meshnet-gophers/meshtastic-go"
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ "github.com/meshnet-gophers/meshtastic-go/radio"
+ "github.com/meshnet-gophers/meshtastic-go/transport"
+ "github.com/pion/dtls/v3/pkg/crypto/ccm"
+ "golang.org/x/crypto/curve25519"
+ "golang.org/x/sync/errgroup"
+ "google.golang.org/protobuf/proto"
+)
+
+const (
+ // MinAppVersion is the minimum app version supported by the emulated radio.
+ MinAppVersion = 30200
+ ROUTE_MAX_SIZE = 8
+)
+
+var (
+ errNoAdminConnection = errors.New("no admin connection")
+)
+
+// NodeConfig is the configuration for the emulated Radio.
+type NodeConfig struct {
+ // Database
+ DatabaseDir string
+
+ // Logs directory
+ LogsDir string
+
+ // Node configuration
+ // NodeID is the ID of the node.
+ NodeID meshtastic.NodeID
+ // LongName is the long name of the node.
+ LongName string
+ // ShortName is the short name of the node.
+ ShortName string
+
+ // MaxHops sets the maximum value for value for HopStart/HopLimit on Tx
+ MaxHops uint32
+ // DefaultHops sets the default value for HopStart/HopLimit on Tx
+ DefaultHops uint32
+
+ // Channels is the set of channels the radio will listen and transmit on.
+ // The first channel in the set is considered the primary channel and is used for broadcasting NodeInfo and Position
+ Channels *pb.ChannelSet
+ // BroadcastNodeInfoInterval is the interval at which the radio will broadcast a NodeInfo on the Primary channel.
+ // The zero value disables broadcasting NodeInfo.
+ BroadcastNodeInfoInterval time.Duration
+ // BroadcastPositionInterval is the interval at which the radio will broadcast Position on the Primary channel.
+ // The zero value disables broadcasting NodeInfo.
+ BroadcastPositionInterval time.Duration
+ // PositionLatitudeI is the latitude of the position which will be regularly broadcasted.
+ // This is in degrees multiplied by 1e7.
+ PositionLatitudeI int32
+ // PositionLongitudeI is the longitude of the position which will be regularly broadcasted.
+ // This is in degrees multiplied by 1e7.
+ PositionLongitudeI int32
+ // PositionAltitude is the altitude of the position which will be regularly broadcasted.
+ // This is in meters above MSL.
+ PositionAltitude int32
+ // TCPListenAddr is the address the emulated radio will listen on for TCP connections and offer the Client API over.
+ TCPListenAddr string
+
+ DeviceMetricsBroadcastInterval time.Duration
+ DeviceMetricsUPSAddress string
+ X25519SecretKey []byte
+ X25519PublicKey []byte
+}
+
+func (c *NodeConfig) validate() error {
+ if c.DatabaseDir == "" {
+ return fmt.Errorf("DatabasePath is required")
+ }
+ if c.LogsDir == "" {
+ return fmt.Errorf("LogsDir is required")
+ }
+ if c.NodeID == 0 {
+ return fmt.Errorf("NodeID is required")
+ }
+ if c.LongName == "" {
+ c.LongName = c.NodeID.DefaultLongName()
+ }
+ if c.ShortName == "" {
+ c.ShortName = c.NodeID.DefaultShortName()
+ }
+ if c.Channels == nil {
+ //lint:ignore ST1005 we're referencing an actual field here.
+ return fmt.Errorf("Channels is required")
+ }
+ if len(c.Channels.Settings) == 0 {
+ return fmt.Errorf("Channels.Settings should be non-empty")
+ }
+ return nil
+}
+
+// Node emulates a meshtastic Node, communicating with a meshtastic network via MQTT.
+type Node struct {
+ cfg NodeConfig
+ db *Database
+ mesh MeshIf
+ logger *log.Logger
+ dl *DataLogger
+ start time.Time
+ nodeID uint32
+ user *pb.User
+ chLookup map[byte][]*pb.ChannelSettings
+ pktLru *freelru.LRU[PacketTruncated, struct{}]
+ packetID uint32
+ mu sync.Mutex
+ adminConn map[*transport.StreamConn]struct{}
+}
+
+// NewNode creates a new emulated radio.
+func NewNode(cfg NodeConfig) (*Node, error) {
+ if err := cfg.validate(); err != nil {
+ return nil, fmt.Errorf("validating config: %w", err)
+ }
+ db, err := NewDB(cfg.DatabaseDir)
+ if err != nil {
+ return nil, err
+ }
+ logger := log.WithPrefix("Node")
+ chLookup := make(map[byte][]*pb.ChannelSettings)
+ for i, c := range cfg.Channels.Settings {
+ c.ChannelNum = uint32(i)
+ ch, err := radio.ChannelHash(c.Name, c.Psk)
+ if err != nil {
+ return nil, err
+ }
+ chLookup[byte(ch)] = append(chLookup[byte(ch)], c)
+ logger.Infof("Channel[%d]: Name='%s', Hash=0x%02x", i, c.Name, ch)
+ }
+ dl, err := NewDataLogger(cfg.LogsDir)
+ if err != nil {
+ return nil, err
+ }
+ pktLru, err := freelru.New[PacketTruncated, struct{}](32, hashPacket)
+ if err != nil {
+ panic(err)
+ }
+ var nonce [4]byte
+ if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
+ return nil, err
+ }
+ adminConn := make(map[*transport.StreamConn]struct{})
+ return &Node{
+ cfg: cfg,
+ logger: logger,
+ db: db,
+ dl: dl,
+ start: time.Now(),
+ packetID: binary.LittleEndian.Uint32(nonce[:]),
+ chLookup: chLookup,
+ pktLru: pktLru,
+ adminConn: adminConn,
+ }, nil
+}
+
+func (n *Node) Close() error {
+ return n.db.Close()
+}
+
+// Run starts the radio. It blocks until the context is cancelled.
+func (n *Node) Run(ctx context.Context, meshConn MeshIf) error {
+ n.logger.Infof("** Connecting to Mesh...")
+ if err := meshConn.Open(); err != nil {
+ return fmt.Errorf("connecting to modem: %w", err)
+ }
+ n.mesh = meshConn
+ go func() {
+ <-ctx.Done()
+ meshConn.Close()
+ }()
+ n.logger.Infof("** Connected to Mesh...")
+ n.nodeID = n.cfg.NodeID.Uint32()
+ n.NodeInfo(n.nodeID, func(ni *pb.NodeInfo) bool {
+ var updated bool
+ n.user = &pb.User{
+ Id: n.cfg.NodeID.String(),
+ LongName: n.cfg.LongName,
+ ShortName: n.cfg.ShortName,
+ HwModel: pb.HardwareModel_PRIVATE_HW,
+ Role: pb.Config_DeviceConfig_CLIENT_MUTE,
+ IsLicensed: false,
+ IsUnmessagable: Ptr(false),
+ PublicKey: n.cfg.X25519PublicKey,
+ }
+ if !proto.Equal(ni.User, n.user) {
+ ni.User = n.user
+ updated = true
+ }
+ if n.cfg.DeviceMetricsBroadcastInterval > 0 {
+ deviceMetrics, err := n.getDeviceMetrics()
+ if err == nil {
+ ni.DeviceMetrics = deviceMetrics
+ updated = true
+ }
+ }
+ return updated
+ })
+
+ eg, egCtx := errgroup.WithContext(ctx)
+ eg.Go(func() error {
+ return n.RxLoop(egCtx)
+ })
+
+ if n.cfg.BroadcastNodeInfoInterval > 0 {
+ eg.Go(FuncTicker(n.cfg.BroadcastNodeInfoInterval, egCtx, n.broadcastNodeInfo))
+ }
+ if n.cfg.BroadcastPositionInterval > 0 {
+ eg.Go(FuncTicker(n.cfg.BroadcastPositionInterval, egCtx, n.broadcastPosition))
+ }
+ if n.cfg.DeviceMetricsBroadcastInterval > 0 {
+ eg.Go(FuncTicker(n.cfg.DeviceMetricsBroadcastInterval, egCtx, n.broadcastDeviceMetrics))
+ }
+ if n.cfg.TCPListenAddr != "" {
+ eg.Go(func() error {
+ return n.listenTCP(egCtx)
+ })
+ }
+ return eg.Wait()
+}
+
+func (n *Node) NodeInfo(nodeID uint32, updateFunc func(*pb.NodeInfo) bool) *pb.NodeInfo {
+ if nodeID == 0 {
+ return nil
+ }
+ ni, err := n.db.GetNodeInfo(nodeID)
+ if err == errNotExists {
+ ni = &pb.NodeInfo{User: &pb.User{}, Position: &pb.Position{}, DeviceMetrics: &pb.DeviceMetrics{}}
+ } else if err != nil {
+ n.logger.Error("!! NodeInfo error", "error", err)
+ return nil
+ }
+ ni.Num = nodeID
+ var needsUpdate bool
+ if updateFunc != nil {
+ needsUpdate = updateFunc(ni)
+ } else if err == nil {
+ return ni // no (updatefunc or new node)
+ }
+ if !needsUpdate {
+ return ni
+ }
+ ni.LastHeard = uint32(time.Now().Unix())
+ n.logger.Debug("** SetNodeInfo", "nodeInfo", ni)
+ if err = n.db.SetNodeInfo(nodeID, ni); err != nil {
+ n.logger.Error(err)
+ }
+ return ni
+}
+
+func (n *Node) tryDecryptPKC(p *pb.MeshPacket) ([]byte, []byte, error) {
+ ni, err := n.db.GetNodeInfo(p.From)
+ if err != nil {
+ return nil, nil, err
+ }
+ if ni.User == nil || ni.User.PublicKey == nil {
+ return nil, nil, errors.New("invalid user publickey")
+ }
+ pk := ni.User.PublicKey
+ sh, err := curve25519.X25519(n.cfg.X25519SecretKey, pk)
+ if err != nil {
+ return nil, nil, err
+ }
+ h := sha256.New()
+ h.Write(sh)
+ shk := h.Sum(nil)
+ c, err := aes.NewCipher(shk)
+ if err != nil {
+ return nil, nil, err
+ }
+ aead, err := ccm.NewCCM(c, 8, 13)
+ if err != nil {
+ return nil, nil, err
+ }
+ payload := p.GetEncrypted()
+ if len(payload) <= 4+8 {
+ return nil, nil, errors.New("too short")
+ }
+ extra := binary.LittleEndian.Uint32(payload[len(payload)-4:])
+ nonce := buildPkcNonce(p.Id, extra, p.From)
+ data, err := aead.Open(nil, nonce, payload[:len(payload)-4], nil)
+ if err != nil {
+ return nil, nil, err
+ }
+ return data, pk, nil
+}
+
+func (n *Node) tryDecryptPSK(p *pb.MeshPacket) ([]byte, *pb.ChannelSettings) {
+ channelCandidates, ok := n.chLookup[byte(p.Channel)]
+ if !ok {
+ return nil, nil // cannot find channel
+ }
+ var err error
+ var plaintext []byte
+ var channel *pb.ChannelSettings
+ for _, channel = range channelCandidates {
+ plaintext, err = radio.XOR(
+ p.GetEncrypted(),
+ channel.Psk,
+ p.Id,
+ p.From,
+ )
+ if err == nil {
+ break
+ }
+ }
+ if err != nil {
+ return nil, nil
+ }
+ return plaintext, channel
+}
+
+func (n *Node) nextPacketID() uint32 {
+ n.mu.Lock()
+ n.packetID += 1
+ n.mu.Unlock()
+ return n.packetID
+}
+
+func (n *Node) txPortnumMessage(channelID uint32, to uint32, portNum pb.PortNum, message proto.Message) error {
+ payload, err := proto.Marshal(message)
+ if err != nil {
+ return err
+ }
+ return n.txPacket(&pb.MeshPacket{
+ From: n.nodeID,
+ To: to,
+ Channel: channelID,
+ PayloadVariant: &pb.MeshPacket_Decoded{Decoded: &pb.Data{
+ Portnum: portNum,
+ Payload: payload,
+ }},
+ })
+}
+
+func (n *Node) txPacket(p *pb.MeshPacket) error {
+ var plaintext []byte
+ var decoded *pb.Data
+ switch payload := p.PayloadVariant.(type) {
+ case *pb.MeshPacket_Decoded:
+ var err error
+ plaintext, err = proto.Marshal(payload.Decoded)
+ if err != nil {
+ return fmt.Errorf("marshalling user: %w", err)
+ }
+ decoded = payload.Decoded
+ default:
+ panic("unexpected MeshPacket payload variant")
+ }
+
+ if p.To == 0 {
+ p.To = broadcastID
+ }
+ if p.Id == 0 {
+ p.Id = n.nextPacketID()
+ }
+ if p.RelayNode == 0 {
+ p.RelayNode = p.From
+ }
+ if p.RelayNode == 0 {
+ p.RelayNode = 0xFF
+ }
+ if p.HopStart == 0 {
+ p.HopStart = n.cfg.DefaultHops
+ p.HopLimit = n.cfg.DefaultHops
+ }
+ if p.HopLimit > n.cfg.MaxHops {
+ p.HopLimit = n.cfg.MaxHops
+ }
+ dstNi, _ := n.db.GetNodeInfo(p.To)
+ if dstNi == nil {
+ dstNi = &pb.NodeInfo{Num: p.To}
+ }
+
+ // PKC
+ if p.PkiEncrypted && p.PublicKey != nil {
+ // explicit PKC with a key, do nothing
+ } else if p.From == n.nodeID && decoded.Portnum != pb.PortNum_TRACEROUTE_APP && decoded.Portnum != pb.PortNum_NODEINFO_APP && decoded.Portnum != pb.PortNum_ROUTING_APP && decoded.Portnum != pb.PortNum_POSITION_APP {
+ // implicit PKC, or explicit PKC without a key
+ if dstNi.User != nil && dstNi.User.PublicKey != nil {
+ p.PublicKey = dstNi.User.PublicKey
+ }
+ }
+ // Ensure if PKC is selected, there is a recepient key
+ if p.PkiEncrypted && p.PublicKey == nil {
+ return errors.New("missing recepient public key")
+ }
+
+ // Encrypt payload
+ buf := bytes.NewBuffer(nil)
+ var encrChannel string
+ var channel *pb.ChannelSettings
+ if p.PkiEncrypted {
+ channel = pkcChan
+ encrChannel = pkcChan.Name
+ p.Channel = 0
+ sh, err := curve25519.X25519(n.cfg.X25519SecretKey, p.PublicKey)
+ if err != nil {
+ return err
+ }
+ h := sha256.New()
+ h.Write(sh)
+ shk := h.Sum(nil)
+ c, err := aes.NewCipher(shk)
+ if err != nil {
+ return err
+ }
+ aead, err := ccm.NewCCM(c, 8, 13)
+ if err != nil {
+ return err
+ }
+ var nonceExtra [4]byte
+ if _, err := rand.Read(nonceExtra[:]); err != nil {
+ return err
+ }
+ nonce := buildPkcNonce(p.Id, binary.LittleEndian.Uint32(nonceExtra[:]), p.From)
+ ciphertextAndTag := aead.Seal(nil, nonce, plaintext, nil)
+ buf.Write(ciphertextAndTag)
+ buf.Write(nonceExtra[:])
+ } else {
+ chIdx := p.Channel
+ if chIdx >= uint32(len(n.cfg.Channels.Settings)) {
+ return errors.New("invalid channel index")
+ }
+ channel = n.cfg.Channels.Settings[chIdx]
+ encrChannel = fmt.Sprintf("#%s", channel.Name)
+ ciphertext, err := radio.XOR(
+ plaintext,
+ channel.Psk,
+ p.Id,
+ p.From,
+ )
+ if err != nil {
+ return fmt.Errorf("failed to PSK encrypt: %w", err)
+ }
+ buf.Write(ciphertext)
+ }
+ b := buf.Bytes()
+ buf.Reset()
+ out := proto.Clone(p).(*pb.MeshPacket)
+ out.PayloadVariant = &pb.MeshPacket_Encrypted{Encrypted: b}
+ if err := n.mesh.WriteMeshPacket(out); err != nil {
+ n.logger.Debug("X( MeshPacket", "channel", encrChannel, "packet", p.String(), "out", out.String())
+ return err
+ }
+ n.dl.LogPacket(p, channel, &pb.NodeInfo{Num: n.nodeID, User: n.user}, dstNi, decoded, decoded.Payload)
+ pt := PacketTruncated{
+ To: p.To,
+ From: p.From,
+ ID: p.Id,
+ }
+ n.pktLru.Add(pt, struct{}{})
+ n.logger.Debug("(( MeshPacket", "channel", encrChannel, "packet", p.String(), "out", out.String())
+ return nil
+}
+
+func (n *Node) broadcastNodeInfo(ctx context.Context) error {
+ n.logger.Info("(( NodeInfo")
+ return n.txPortnumMessage(0, broadcastID, pb.PortNum_NODEINFO_APP, n.user)
+}
+
+func (n *Node) getPosition() *pb.Position {
+ return &pb.Position{
+ LatitudeI: &n.cfg.PositionLatitudeI,
+ LongitudeI: &n.cfg.PositionLongitudeI,
+ Altitude: &n.cfg.PositionAltitude,
+ Time: uint32(time.Now().Unix()),
+ LocationSource: pb.Position_LOC_MANUAL,
+ }
+}
+
+func (n *Node) broadcastPosition(ctx context.Context) error {
+ n.logger.Info("(( Position")
+ position := n.getPosition()
+ n.NodeInfo(n.nodeID, func(nodeInfo *pb.NodeInfo) bool {
+ nodeInfo.Position = position
+ return true
+ })
+ return n.txPortnumMessage(0, broadcastID, pb.PortNum_POSITION_APP, position)
+}
+
+func (n *Node) getDeviceMetrics() (*pb.DeviceMetrics, error) {
+ cmdStdout := bytes.NewBuffer(nil)
+ cmd := exec.Command("upsc", n.cfg.DeviceMetricsUPSAddress)
+ cmd.Stdout = cmdStdout
+ err := cmd.Run()
+ if err != nil {
+ n.logger.Error("failed to get UPS data", "err", err)
+ return nil, err
+ }
+ stdoutBytes := cmdStdout.Bytes()
+ stdout := bufio.NewReader(bytes.NewReader(stdoutBytes))
+ var batteryLevel uint32
+ var voltage float32
+ var channelUtilization float32
+ var airUtilTx float32
+ for {
+ line, _, err := stdout.ReadLine()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ lineValue := bytes.SplitN(line, []byte(": "), 2)
+ if len(lineValue) != 2 {
+ continue
+ }
+ if bytes.Equal(lineValue[0], []byte("battery.voltage")) {
+ v, err := strconv.ParseFloat(string(lineValue[1]), 64)
+ if err != nil {
+ continue
+ }
+ voltage = float32(v)
+ }
+ if bytes.Equal(lineValue[0], []byte("battery.charge")) {
+ charge, err := strconv.ParseUint(string(lineValue[1]), 10, 64)
+ if err != nil {
+ continue
+ }
+ batteryLevel = uint32(charge)
+ }
+ }
+ if batteryLevel == 100 && voltage > 0 {
+ batteryLevel = 101
+ }
+ uptime := uint32(time.Since(n.start).Seconds())
+ return &pb.DeviceMetrics{
+ BatteryLevel: &batteryLevel,
+ Voltage: &voltage,
+ ChannelUtilization: &channelUtilization,
+ AirUtilTx: &airUtilTx,
+ UptimeSeconds: &uptime,
+ }, nil
+}
+func (n *Node) broadcastDeviceMetrics(ctx context.Context) error {
+ n.logger.Info("(( DeviceMetrics")
+ deviceMetrics, err := n.getDeviceMetrics()
+ if err != nil {
+ return err
+ }
+ n.NodeInfo(n.nodeID, func(nodeInfo *pb.NodeInfo) bool {
+ nodeInfo.DeviceMetrics = deviceMetrics
+ return true
+ })
+ return n.txPortnumMessage(0, broadcastID, pb.PortNum_TELEMETRY_APP, &pb.Telemetry{
+ Variant: &pb.Telemetry_DeviceMetrics{
+ DeviceMetrics: deviceMetrics,
+ },
+ })
+}
+
+func (n *Node) getDeviceMetadata() *pb.DeviceMetadata {
+ return &pb.DeviceMetadata{
+ FirmwareVersion: "2.6.0-golang",
+ DeviceStateVersion: 24,
+ HwModel: pb.HardwareModel_PRIVATE_HW,
+ Role: pb.Config_DeviceConfig_CLIENT_MUTE,
+ HasPKC: true,
+ HasRemoteHardware: false,
+ HasWifi: false,
+ HasBluetooth: false,
+ HasEthernet: false,
+ CanShutdown: false,
+ }
+}
+
+// dispatchMessageToAdmin sends a FromRadio message to all current subscribers to
+// the FromRadio.
+func (n *Node) dispatchMessageToAdmin(msg *pb.FromRadio) error {
+ if len(n.adminConn) == 0 {
+ return errNoAdminConnection
+ }
+ for conn := range n.adminConn {
+ if err := conn.Write(msg); err != nil {
+ n.logger.Errorf("failed to send to admin: %w", err)
+ }
+ }
+ return nil
+}
diff --git a/nodeadmin.go b/nodeadmin.go
new file mode 100644
index 0000000..e6b88a5
--- /dev/null
+++ b/nodeadmin.go
@@ -0,0 +1,392 @@
+package main
+
+import (
+ "context"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "strings"
+ "time"
+
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ "github.com/meshnet-gophers/meshtastic-go/transport"
+ "google.golang.org/protobuf/proto"
+)
+
+func phoneLastUpdateKey(identifier string) []byte {
+ return []byte("phone-lastupdate-" + identifier)
+}
+
+func (n *Node) setPhoneLastUpdate(identifier string, t time.Time) error {
+ var lastUpdate [8]byte
+ binary.BigEndian.PutUint64(lastUpdate[:], uint64(t.UnixMicro()))
+ return n.db.Set(phoneLastUpdateKey(identifier), lastUpdate[:])
+}
+
+func (n *Node) handleToRadioWantConfigID(identifier string, conn *transport.StreamConn, req *pb.ToRadio_WantConfigId) error {
+ // Send MyInfo
+ err := conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_MyInfo{
+ MyInfo: &pb.MyNodeInfo{
+ MyNodeNum: n.nodeID,
+ RebootCount: 0, // use db nonce key?
+ // TODO: Track this as a const
+ MinAppVersion: MinAppVersion,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send Metadata
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Metadata{
+ Metadata: n.getDeviceMetadata(),
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send our node
+ nodeInfo := n.NodeInfo(n.nodeID, nil)
+ if nodeInfo.User == nil {
+ nodeInfo.User = n.user
+ }
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_NodeInfo{
+ NodeInfo: nodeInfo,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send Other Nodes
+ nodes, err := n.db.ListNodeInfo()
+ if err != nil {
+ return fmt.Errorf("listing nodes: %w", err)
+ }
+ for _, ni := range nodes {
+ if ni.Num == n.nodeID {
+ continue
+ }
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_NodeInfo{
+ NodeInfo: ni,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+
+ // Send Channels
+ for i, channel := range n.cfg.Channels.Settings {
+ role := pb.Channel_SECONDARY
+ if i == 0 {
+ role = pb.Channel_PRIMARY
+ }
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Channel{
+ Channel: &pb.Channel{
+ Index: int32(i),
+ Settings: &pb.ChannelSettings{
+ Name: channel.Name,
+ Psk: channel.Psk,
+ },
+ Role: role,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+
+ // Send Config: Device
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Config{
+ Config: &pb.Config{
+ PayloadVariant: &pb.Config_Device{
+ Device: &pb.Config_DeviceConfig{
+ NodeInfoBroadcastSecs: uint32(n.cfg.BroadcastNodeInfoInterval.Seconds()),
+ },
+ },
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send Config: LoRa
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_Config{
+ Config: &pb.Config{
+ PayloadVariant: &pb.Config_Lora{
+ Lora: &pb.Config_LoRaConfig{
+ UsePreset: true,
+ ModemPreset: pb.Config_LoRaConfig_SHORT_FAST,
+ Region: pb.Config_LoRaConfig_EU_868,
+ ConfigOkToMqtt: true,
+ },
+ },
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ // Send ConfigComplete to indicate we're done
+ err = conn.Write(&pb.FromRadio{
+ PayloadVariant: &pb.FromRadio_ConfigCompleteId{
+ ConfigCompleteId: req.WantConfigId,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+
+ start := uint64(0)
+ lastPhoneUpdate, _ := n.db.Get(phoneLastUpdateKey(identifier))
+ if len(lastPhoneUpdate) == 8 {
+ start = binary.BigEndian.Uint64(lastPhoneUpdate)
+ } else {
+ start = uint64(time.Now().UnixMicro())
+ }
+ startTime := time.UnixMicro(int64(start))
+ now := time.Now()
+ n.logger.Info("** Replaying FromRadio packets to admin...", "from", startTime, "to", now)
+ c, errc := n.db.ReadFromRadio(startTime, now)
+ for msg := range c {
+ pkt := msg.GetPacket()
+ // process only messages that are broadcast or to us.
+ if pkt == nil || (pkt.To != broadcastID && pkt.To != n.nodeID) {
+ continue
+ }
+ if err := conn.Write(msg); err != nil {
+ return fmt.Errorf("failed to send to admin: %w", err)
+ }
+
+ }
+ err = <-errc
+ n.logger.Info("** Finished WantConfigID", "err", err)
+
+ n.setPhoneLastUpdate(identifier, now)
+ return err
+}
+
+func (n *Node) handleConn(ctx context.Context, underlying io.ReadWriteCloser, identifier string) error {
+ streamConn := transport.NewRadioStreamConn(underlying)
+ n.adminConn[streamConn] = struct{}{}
+ defer func() {
+ delete(n.adminConn, streamConn)
+ if err := streamConn.Close(); err != nil {
+ n.logger.Error("failed to close streamConn", "err", err)
+ }
+ }()
+ go func() {
+ <-ctx.Done()
+ streamConn.Close()
+ }()
+
+ var configured bool
+ var lastPktTime time.Time
+ defer func() {
+ if lastPktTime.IsZero() {
+ return
+ }
+ n.setPhoneLastUpdate(identifier, lastPktTime)
+ }()
+ for {
+ msg := &pb.ToRadio{}
+ if err := streamConn.Read(msg); err != nil {
+ return fmt.Errorf("reading from streamConn: %w", err)
+ }
+ if configured {
+ lastPktTime = time.Now()
+ }
+ n.logger.Info(">> ToRadio", "msg", msg)
+ switch payload := msg.PayloadVariant.(type) {
+ case *pb.ToRadio_Disconnect:
+ // The meshtastic python client sends a Disconnect command and with the TCP implementation, it expects
+ // the radio to close the connection. So we end the read loop here, and return to close the connection.
+ streamConn.Close()
+ return nil
+ case *pb.ToRadio_WantConfigId:
+ if err := n.handleToRadioWantConfigID(identifier, streamConn, payload); err != nil {
+ return fmt.Errorf("handling WantConfigId: %w", err)
+ }
+ configured = true
+ case *pb.ToRadio_Packet:
+ var reply *pb.FromRadio_Packet
+ var sendAck bool = payload.Packet.WantAck
+ var errorCode pb.Routing_Error = pb.Routing_NONE
+ if decoded := payload.Packet.GetDecoded(); decoded != nil {
+ n.logger.Info(">> ToRadio_Packet", "payload", payload)
+ if payload.Packet.To == n.nodeID {
+ switch decoded.Portnum {
+ case pb.PortNum_ADMIN_APP:
+ admin := &pb.AdminMessage{}
+ if err := proto.Unmarshal(decoded.Payload, admin); err != nil {
+ return fmt.Errorf("unmarshalling admin: %w", err)
+ }
+ n.logger.Info(fmt.Sprintf(">> AdminMessage %T:", admin.PayloadVariant), "msg", admin.PayloadVariant)
+
+ switch adminPayload := admin.PayloadVariant.(type) {
+ case *pb.AdminMessage_GetChannelRequest:
+ n.logger.Info(">> AdminMessage.GetChannelRequest", "adminPayload", adminPayload, "packet", payload)
+ chIdx := adminPayload.GetChannelRequest - 1
+ var c *pb.ChannelSettings
+ role := pb.Channel_DISABLED
+ if chIdx > uint32(len(n.cfg.Channels.Settings)) {
+ c = &pb.ChannelSettings{}
+ } else {
+ c = n.cfg.Channels.Settings[chIdx]
+ role = pb.Channel_SECONDARY
+ if chIdx == 0 {
+ role = pb.Channel_PRIMARY
+ }
+ }
+ resp := &pb.AdminMessage{
+ PayloadVariant: &pb.AdminMessage_GetChannelResponse{
+ GetChannelResponse: &pb.Channel{
+ Index: int32(chIdx),
+ Settings: c,
+ Role: role,
+ },
+ },
+ }
+ respBytes, err := proto.Marshal(resp)
+ if err != nil {
+ return fmt.Errorf("marshalling GetChannelResponse: %w", err)
+ }
+ // Send GetChannelResponse
+ reply = &pb.FromRadio_Packet{
+ Packet: &pb.MeshPacket{
+ Id: n.nextPacketID(),
+ From: n.nodeID,
+ To: n.nodeID,
+ PayloadVariant: &pb.MeshPacket_Decoded{
+ Decoded: &pb.Data{
+ Portnum: pb.PortNum_ADMIN_APP,
+ Payload: respBytes,
+ RequestId: payload.Packet.Id,
+ },
+ },
+ },
+ }
+ case *pb.AdminMessage_SetFavoriteNode:
+ n.NodeInfo(adminPayload.SetFavoriteNode, func(ni *pb.NodeInfo) bool {
+ ni.IsFavorite = true
+ return true
+ })
+ case *pb.AdminMessage_RemoveFavoriteNode:
+ n.NodeInfo(adminPayload.RemoveFavoriteNode, func(ni *pb.NodeInfo) bool {
+ ni.IsFavorite = false
+ return true
+ })
+ case *pb.AdminMessage_RemoveByNodenum:
+ n.NodeInfo(adminPayload.RemoveByNodenum, func(ni *pb.NodeInfo) bool {
+ ni.Num = 0 // delete NodeInfo
+ return true
+ })
+ case *pb.AdminMessage_SetTimeOnly:
+ default:
+ errorCode = pb.Routing_BAD_REQUEST
+ n.logger.Warnf(">> !Unhandled AdminMessage of type '%T'", adminPayload)
+ }
+ default:
+ n.logger.Warnf(">> No handling of ToRadio.Packet to us w/ portNum '%s'", decoded.Portnum.String())
+ }
+ } else {
+ defaultWarning := true
+ switch decoded.Portnum {
+ case pb.PortNum_AUDIO_APP, pb.PortNum_NODEINFO_APP, pb.PortNum_TEXT_MESSAGE_APP, pb.PortNum_POSITION_APP, pb.PortNum_TRACEROUTE_APP:
+ defaultWarning = false
+ fallthrough
+ default:
+ if defaultWarning {
+ n.logger.Warnf(">> Default handling of ToRadio.Packet message w/ portNum '%s'", decoded.Portnum.String())
+ }
+ from := payload.Packet.From
+ if from == 0 {
+ from = n.nodeID
+ }
+ pkt := proto.Clone(payload.Packet).(*pb.MeshPacket)
+ pkt.From = from
+ decoded := pkt.GetDecoded()
+ if decoded.Bitfield != nil {
+ *decoded.Bitfield |= (1 << 0) // ok to mqtt
+ } else {
+ bitfield := uint32(1)
+ decoded.Bitfield = &bitfield // ok to mqtt
+ }
+ if err := n.txPacket(pkt); err != nil {
+ return fmt.Errorf("failed to tx packet: %w", err)
+ }
+ }
+ }
+ }
+ if errorCode != pb.Routing_NONE || sendAck {
+ if err := streamConn.Write(&pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: buildRoutingReply(payload.Packet, errorCode)}}); err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+ if reply != nil {
+ if err := streamConn.Write(&pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: reply.Packet}}); err != nil {
+ return fmt.Errorf("writing to streamConn: %w", err)
+ }
+ }
+ case *pb.ToRadio_Heartbeat:
+ default:
+ n.logger.Warnf(">> !Unhandled ToRadio message of type '%T'", payload)
+ }
+ }
+}
+
+func (n *Node) listenTCP(ctx context.Context) error {
+ l, err := net.Listen("tcp", n.cfg.TCPListenAddr)
+ if err != nil {
+ return fmt.Errorf("listening: %w", err)
+ }
+ defer l.Close()
+ n.logger.Info("** Listening for tcp connections", "addr", n.cfg.TCPListenAddr)
+
+ go func() {
+ <-ctx.Done()
+ l.Close()
+ }()
+ for {
+ c, err := l.Accept()
+ if err != nil {
+ n.logger.Error("!! Failed to accept connection", "err", err)
+ return err
+ }
+ go func(c net.Conn) {
+ defer c.Close()
+ connctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ tcpconn := c.(*net.TCPConn)
+ tcpconn.SetKeepAlive(true)
+ tcpconn.SetKeepAlivePeriod(25 * time.Second)
+ address := tcpconn.RemoteAddr().String()
+ ipString := strings.Split(address, ":")[0]
+ n.logger.Info("** Accepted connection", "addr", address)
+ if err := n.handleConn(connctx, c, ipString); err != nil {
+ if errors.Is(err, io.EOF) {
+ n.logger.Info("** Connection EOF", "addr", address)
+ } else {
+ n.logger.Error("!! Connection error", "err", err)
+ }
+ } else {
+ n.logger.Info("** Connection ended", "addr", address)
+ }
+ }(c)
+ }
+}
diff --git a/noderxloop.go b/noderxloop.go
new file mode 100644
index 0000000..f4ed602
--- /dev/null
+++ b/noderxloop.go
@@ -0,0 +1,280 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "encoding/hex"
+ "fmt"
+
+ "github.com/charmbracelet/log"
+ "github.com/meshnet-gophers/meshtastic-go"
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ "google.golang.org/protobuf/proto"
+)
+
+func (n *Node) RxLoop(ctx context.Context) error {
+ for {
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ p, ts, err := n.mesh.ReadMeshPacket()
+ if err != nil {
+ n.logger.Error("Read mesh packet failed", "err", err)
+ return err
+ }
+ if err := n.rxMessage(ts, p); err != nil {
+ n.logger.Error("Processing Radio Message", "error", err)
+ }
+ }
+}
+
+func (n *Node) rxMessage(ts uint64, p *pb.MeshPacket) error {
+ pt := PacketTruncated{To: p.To, From: p.From, ID: p.Id}
+ info := fmt.Sprintf("ts:%d rssi:%-4d snr:%-5.1f", p.RxTime, p.RxRssi, p.RxSnr)
+ header := fmt.Sprintf("from:%08x to:%08x id:%08x ch:%02x next:%02x last:%02x hs:%d hl:%d mq:%d wack:%d", p.From, p.To, p.Id, uint8(p.Channel), p.NextHop, p.RelayNode, p.HopStart, p.HopLimit, bool2Int(p.ViaMqtt), bool2Int(p.WantAck))
+ payloadstr := fmt.Sprintf("%x", p.GetEncrypted())
+ var dupe bool
+ if _, dupe = n.pktLru.Get(pt); dupe {
+ n.logger.Debug(") MeshPacket", "info", info, "header", header, "payload", payloadstr)
+ } else {
+ n.logger.Debug(")) MeshPacket", "info", info, "header", header, "payload", payloadstr)
+ n.pktLru.Add(pt, struct{}{})
+ }
+ if dupe { // XXX: it would need to be changed for the traceroute
+ return nil // skip duplicates
+ }
+ if p.From == n.nodeID {
+ return nil // ourself
+ }
+ if len(p.GetEncrypted()) == 0 {
+ n.logger.Warn(":: Skipping unencrypted or empty packet")
+ return nil // we skip unencrypted packets
+ }
+ var plaintext []byte
+ var pubKey []byte
+ var channel *pb.ChannelSettings
+ if p.To == n.nodeID && p.Channel == 0x00 {
+ var err error
+ if plaintext, pubKey, err = n.tryDecryptPKC(p); err != nil {
+ log.Printf("failed to decrypt PKC message: %s", err)
+ }
+ }
+ if plaintext != nil {
+ channel = pkcChan
+ p.Channel = 0
+ } else {
+ plaintext, channel = n.tryDecryptPSK(p)
+ if plaintext == nil {
+ return nil // failed decrypt
+ }
+ p.Channel = channel.ChannelNum
+ }
+ data := &pb.Data{}
+ if err := proto.Unmarshal(plaintext, data); err != nil {
+ n.logger.Debug(")) MeshPacket.Data", "channel", channel.Name, "mqtt", p.ViaMqtt, "raw", hex.EncodeToString(plaintext))
+ return fmt.Errorf("unmarshalling decrypted data: %w", err)
+ }
+
+ p.PublicKey = pubKey
+ p.PkiEncrypted = channel == pkcChan
+ p.PayloadVariant = &pb.MeshPacket_Decoded{Decoded: data}
+ msgFromRadio := &pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: p}}
+ source := data.Source
+ if source == 0 {
+ source = p.From
+ }
+ dest := data.Dest
+ if dest == 0 {
+ dest = p.To
+ }
+ toUs := dest == n.nodeID
+ if dest == broadcastID || toUs {
+ n.dispatchMessageToAdmin(msgFromRadio)
+ }
+ n.db.StoreFromRadio(int64(ts), msgFromRadio)
+ n.logger.Debug(")) MeshPacket.Data", "channel", channel.Name, "mqtt", p.ViaMqtt, "raw", hex.EncodeToString(plaintext), "data", data)
+
+ src := n.NodeInfo(source, func(ni *pb.NodeInfo) bool {
+ if ni.Num == n.nodeID {
+ return false // it's us
+ }
+ if p.HopLimit > p.HopStart {
+ return false // weird hoplimit value
+ }
+ hopsAway := uint32(p.HopStart - p.HopLimit)
+ if ni.HopsAway != nil && *ni.HopsAway <= hopsAway {
+ return false // skip update if these hops away are longer
+ }
+ ni.HopsAway = &hopsAway
+ return true
+ })
+ dst := n.NodeInfo(dest, nil)
+
+ var payload any
+ var reply proto.Message
+ replyPortNum := data.Portnum
+ switch data.Portnum {
+ case pb.PortNum_ADMIN_APP:
+ admin := &pb.AdminMessage{}
+ if err := proto.Unmarshal(data.Payload, admin); err != nil {
+ return fmt.Errorf("unmarshalling admin message: %w", err)
+ }
+ payload = admin
+ switch adminPayload := admin.PayloadVariant.(type) {
+ case *pb.AdminMessage_GetDeviceMetadataRequest:
+ if data.WantResponse && toUs {
+ reply = &pb.AdminMessage{
+ PayloadVariant: &pb.AdminMessage_GetDeviceMetadataResponse{
+ GetDeviceMetadataResponse: n.getDeviceMetadata(),
+ },
+ }
+ }
+ default:
+ n.logger.Warnf("unhandled admin message type: %T", adminPayload)
+ }
+ case pb.PortNum_NODEINFO_APP:
+ user := &pb.User{}
+ if err := proto.Unmarshal(data.Payload, user); err != nil {
+ return fmt.Errorf("unmarshalling user: %w", err)
+ }
+ payload = user
+ n.logger.Info(")) NodeInfo", "user", user)
+ n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool {
+ if nodeInfo.User != nil {
+ if len(nodeInfo.User.PublicKey) > 0 && !bytes.Equal(nodeInfo.User.PublicKey, user.PublicKey) {
+ n.logger.Error("NodeInfo w/ different key", "nodeInfo", nodeInfo, "new", user)
+ return false // do not update
+ }
+ }
+ nodeInfo.User = user
+ return true
+ })
+ if data.WantResponse && toUs {
+ reply = n.user
+ }
+ case pb.PortNum_TEXT_MESSAGE_APP:
+ n.logger.Info(")) TextMessage", "channel", channel.Name, "source", meshtastic.NodeID(source).String(), "text", string(data.Payload))
+ case pb.PortNum_ROUTING_APP:
+ routing := &pb.Routing{}
+ if err := proto.Unmarshal(data.Payload, routing); err != nil {
+ return fmt.Errorf("unmarshalling routingPayload: %w", err)
+ }
+ payload = routing
+ n.logger.Info(")) Routing", "routing", routing)
+ case pb.PortNum_TRACEROUTE_APP:
+ routeDiscovery := &pb.RouteDiscovery{}
+ if err := proto.Unmarshal(data.Payload, routeDiscovery); err != nil {
+ return fmt.Errorf("unmarshalling traceroute payload: %w", err)
+ }
+ payload = routeDiscovery
+ n.logger.Info(")) RouteDiscovery", "channel", channel.Name, "source", meshtastic.NodeID(source).String(), "payload", routeDiscovery)
+ if !toUs {
+ break
+ }
+ towardsDestination := data.ReplyId == 0
+ var route *[]uint32
+ var snr *[]int32
+ if towardsDestination {
+ route = &routeDiscovery.Route
+ snr = &routeDiscovery.SnrTowards
+ } else {
+ route = &routeDiscovery.RouteBack
+ snr = &routeDiscovery.SnrBack
+ }
+ // Only insert unknown hops if hop_start is valid
+ if p.HopStart != 0 && p.HopLimit <= p.HopStart {
+ var hopsTaken uint8 = uint8(p.HopStart - p.HopLimit)
+ diff := int(hopsTaken) - len(*route)
+ for i := 0; i < diff; i++ {
+ if len(*route) < ROUTE_MAX_SIZE {
+ *route = append(*route, broadcastID)
+ }
+ }
+ // Add unknown SNR values if necessary
+ diff = len(*route) - len(*snr)
+ for i := 0; i < diff; i++ {
+ if len(*snr) < ROUTE_MAX_SIZE {
+ *snr = append(*snr, 0xFF)
+ }
+ }
+ }
+
+ if len(*snr) < ROUTE_MAX_SIZE {
+ *snr = append(*snr, int32(p.RxSnr*4))
+ }
+ // We don't need to add route, if we are the dest
+ // // Length of route array can normally not be exceeded due to the max. hop_limit of 7
+ // if *route_count < ROUTE_SIZE {
+ // route[*route_count] = myNodeInfo.my_node_num
+ // *route_count += 1
+ // } else {
+ // LOG_WARN("Route exceeded maximum hop limit!") // Are you bridging networks?
+ // }
+ reply = routeDiscovery
+ case pb.PortNum_POSITION_APP:
+ position := &pb.Position{}
+ if err := proto.Unmarshal(data.Payload, position); err != nil {
+ return fmt.Errorf("unmarshalling positionPayload: %w", err)
+ }
+ payload = position
+ n.logger.Info(")) Position", "position", position)
+ if data.WantResponse && toUs {
+ reply = n.getPosition()
+ } else {
+ n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool {
+ nodeInfo.Position = position
+ return true
+ })
+ }
+ case pb.PortNum_TELEMETRY_APP:
+ telemetry := &pb.Telemetry{}
+ if err := proto.Unmarshal(data.Payload, telemetry); err != nil {
+ return fmt.Errorf("unmarshalling telemetryPayload: %w", err)
+ }
+ payload = telemetry
+ n.logger.Info(")) Telemetry deviceMetrics", "telemetry", telemetry)
+ if data.WantResponse && toUs {
+ metrics, err := n.getDeviceMetrics()
+ if err != nil {
+ break
+ }
+ reply = metrics
+ } else {
+ deviceMetrics := telemetry.GetDeviceMetrics()
+ if deviceMetrics == nil {
+ break
+ }
+ n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool {
+ nodeInfo.DeviceMetrics = deviceMetrics
+ return true
+ })
+ }
+ default:
+ n.logger.Debug(")) !Unhandled app payload", "data", data)
+ }
+ n.dl.LogPacket(p, channel, src, dst, data, payload)
+ if toUs && p.WantAck {
+ err := n.txPacket(buildRoutingReply(&pb.MeshPacket{
+ From: p.From,
+ To: p.To,
+ HopStart: uint32(p.HopStart),
+ Channel: channel.ChannelNum,
+ Id: p.Id,
+ PkiEncrypted: msgFromRadio.GetPacket().PkiEncrypted,
+ PublicKey: msgFromRadio.GetPacket().PublicKey,
+ }, pb.Routing_NONE))
+ if err != nil {
+ n.logger.Warn("failed to send text ack", "err", err)
+ }
+ }
+ if reply != nil {
+ err := n.txPortnumMessage(channel.ChannelNum, source, replyPortNum, reply)
+ if err != nil {
+ n.logger.Error("Failed to reply", "err", err, "reply", reply)
+ } else {
+ n.dl.LogPacket(p, channel, dst, src, &pb.Data{Portnum: replyPortNum}, payload)
+ }
+ }
+
+ return nil
+}
diff --git a/timers.go b/timers.go
new file mode 100644
index 0000000..095e42c
--- /dev/null
+++ b/timers.go
@@ -0,0 +1,23 @@
+package main
+
+import (
+ "context"
+ "time"
+)
+
+func FuncTicker(interval time.Duration, ctx context.Context, fn func(context.Context) error) func() error {
+ return func() error {
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ case <-ticker.C:
+ }
+ if err := fn(ctx); err != nil {
+ return err
+ }
+ }
+ }
+}