diff options
| author | Marin Ivanov <[email protected]> | 2025-09-03 20:59:24 +0300 |
|---|---|---|
| committer | Marin Ivanov <[email protected]> | 2025-09-03 20:59:24 +0300 |
| commit | 220671ed46e10e4c67741286da493df1d45cdcfc (patch) | |
| tree | 0711f179f4619959fd937c2929c6247ed8ab124a | |
Initial release
| -rw-r--r-- | .gitignore | 3 | ||||
| -rw-r--r-- | config.go | 53 | ||||
| -rw-r--r-- | config.json.example | 43 | ||||
| -rw-r--r-- | data/.gitkeep | 0 | ||||
| -rw-r--r-- | database.go | 187 | ||||
| -rw-r--r-- | datalogger.go | 96 | ||||
| -rw-r--r-- | go.mod | 63 | ||||
| -rw-r--r-- | go.sum | 160 | ||||
| -rw-r--r-- | helpers.go | 5 | ||||
| -rw-r--r-- | kava.go | 168 | ||||
| -rw-r--r-- | logs/.gitkeep | 0 | ||||
| -rw-r--r-- | lru.go | 24 | ||||
| -rw-r--r-- | main.go | 133 | ||||
| -rw-r--r-- | mestastic.go | 90 | ||||
| -rw-r--r-- | mqtt.go | 205 | ||||
| -rw-r--r-- | node.go | 598 | ||||
| -rw-r--r-- | nodeadmin.go | 392 | ||||
| -rw-r--r-- | noderxloop.go | 280 | ||||
| -rw-r--r-- | timers.go | 23 |
19 files changed, 2523 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2fdfb0b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/data +/config.json +/logs
\ No newline at end of file diff --git a/config.go b/config.go new file mode 100644 index 0000000..4698998 --- /dev/null +++ b/config.go @@ -0,0 +1,53 @@ +package main + +import ( + "encoding/json" + "os" +) + +type Config struct { + NodeID []byte `json:"nodeID"` + Name string `json:"name"` + ShortName string `json:"shortName"` + DefaultHops uint32 `json:"defaultHops"` + MaxHops uint32 `json:"maxHops"` + NodeBroadcastInterval float64 `json:"nodeBroadcastInterval"` + X25519Key []byte `json:"x25519Key"` + Channels []struct { + Name string `json:"name"` + PSK []byte `json:"psk"` + } `json:"channels"` + Position struct { + BroadcastInterval float64 `json:"broadcastInterval"` + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Altitude float64 `json:"altitude"` + } `json:"position"` + DeviceMetrics struct { + BroadcastInterval float64 `json:"broadcastInterval"` + UPSAddress string `json:"upsAddress"` + } `json:"deviceMetrics"` + KavaModem struct { + Address string `json:"address"` + } `json:"kavaModem"` + MqttIf struct { + Address string `json:"address"` + Username string `json:"username"` + Password string `json:"password"` + ClientID string `json:"clientId"` + Root string `json:"root"` + } `json:"mqttIf"` +} + +func ReadConfig(filename string) (*Config, error) { + cfg := Config{} + f, err := os.Open(filename) + if err != nil { + return nil, err + } + dec := json.NewDecoder(f) + if err := dec.Decode(&cfg); err != nil { + return nil, err + } + return &cfg, nil +} diff --git a/config.json.example b/config.json.example new file mode 100644 index 0000000..14ab023 --- /dev/null +++ b/config.json.example @@ -0,0 +1,43 @@ +{ + "nodeID": "7rpREQ==", + "name": "oti sho", + "shortName": "oti", + "defaultHops": 1, + "maxHops": 1, + "nodeBroadcastInterval": 1200, + "x25519Key": "", + "position": { + "broadcastInterval": 86400, + "latitude": 51.5014760, + "longitude": -0.1406340, + "altitude": 2.0 + }, + "deviceMetrics": { + "broadcastInterval": 300, + "upsAddress": "upsname@localhost" + }, + "channels": [ + { + "name": "ShortFast", + "psk": "1PG7OiApB1nwvP+rz05pAQ==" + }, + { + "name": "Blagoevgrad", + "psk": "1PG7OiApB1nwvP+rz05pAQ==" + }, + { + "name": "Bulgaria", + "psk": "1PG7OiApB1nwvP+rz05pAQ==" + } + ], + "kavaModem": { + "address": "127.0.0.1:3333" + }, + "mqttIf": { + "address": "mqtt://127.0.0.1:1883", + "username": "meshdev", + "password": "large4cats", + "clientId": "motisho", + "root": "msh/Bulgaria" + } +} diff --git a/data/.gitkeep b/data/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/data/.gitkeep diff --git a/database.go b/database.go new file mode 100644 index 0000000..dc410d8 --- /dev/null +++ b/database.go @@ -0,0 +1,187 @@ +package main + +import ( + "encoding/binary" + "errors" + "fmt" + "time" + + "github.com/cockroachdb/pebble/v2" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "google.golang.org/protobuf/proto" +) + +type BucketEnum uint16 + +const ( + BucketState BucketEnum = iota + BucketNodeInfo + BucketFromRadio +) + +type StateKeyEnum uint32 + +const ( + StateKeyNull StateKeyEnum = iota + StateKeyPhoneLastUpdate +) + +var ( + errNotExists = errors.New("not exist") +) + +type Database struct { + p *pebble.DB +} + +func NewDB(dbPath string) (*Database, error) { + p, err := pebble.Open(dbPath, &pebble.Options{}) + if err != nil { + return nil, fmt.Errorf("open database: %w", err) + } + return &Database{ + p: p, + }, nil +} + +func (db *Database) Close() error { + return db.p.Close() +} + +func (db *Database) keyUint32(bucket BucketEnum, k uint32) []byte { + var key [6]byte + binary.BigEndian.PutUint16(key[0:2], uint16(bucket)) + binary.BigEndian.PutUint32(key[2:6], k) + return key[:] +} + +func (db *Database) keyUint64(bucket BucketEnum, k uint64) []byte { + var key [10]byte + binary.BigEndian.PutUint16(key[0:2], uint16(bucket)) + binary.BigEndian.PutUint64(key[2:10], k) + return key[:] +} + +func (db *Database) Get(key []byte) ([]byte, error) { + data, rd, err := db.p.Get(key) + if err == pebble.ErrNotFound { + return nil, errNotExists + } else if err != nil { + return nil, err + } + defer rd.Close() + return data, nil +} + +func (db *Database) Set(key []byte, value []byte) error { + return db.p.Set(key, value, pebble.Sync) +} + +func (db *Database) GetState(key StateKeyEnum) ([]byte, error) { + data, rd, err := db.p.Get(db.keyUint32(BucketState, uint32(key))) + if err == pebble.ErrNotFound { + return nil, errNotExists + } else if err != nil { + return nil, err + } + defer rd.Close() + return data, nil +} + +func (db *Database) SetState(key StateKeyEnum, value []byte) error { + return db.p.Set(db.keyUint32(BucketState, uint32(key)), value, pebble.Sync) +} + +func (db *Database) SetNodeInfo(num uint32, ni *pb.NodeInfo) error { + key := db.keyUint32(BucketNodeInfo, num) + if num != 0 && ni.Num == 0 { + return db.p.Delete(key, pebble.Sync) + } + ni.Num = num + data, err := proto.Marshal(ni) + if err != nil { + return err + } + return db.p.Set(key, data, pebble.Sync) +} + +func (db *Database) GetNodeInfo(num uint32) (*pb.NodeInfo, error) { + data, rd, err := db.p.Get(db.keyUint32(BucketNodeInfo, num)) + if err == pebble.ErrNotFound { + return nil, errNotExists + } else if err != nil { + return nil, err + } + defer rd.Close() + var ni pb.NodeInfo + if err = proto.Unmarshal(data, &ni); err != nil { + return nil, err + } + return &ni, nil +} + +func (db *Database) ListNodeInfo() ([]*pb.NodeInfo, error) { + it, err := db.p.NewIter(&pebble.IterOptions{ + LowerBound: db.keyUint32(BucketNodeInfo, uint32(0)), + UpperBound: db.keyUint32(BucketNodeInfo+1, uint32(0)), + }) + if err != nil { + return nil, err + } + defer it.Close() + var list []*pb.NodeInfo + for it.First(); it.Valid(); it.Next() { + key := it.Key() + data := it.Value() + ni := &pb.NodeInfo{} + if err = proto.Unmarshal(data, ni); err != nil { + return nil, err + } + ni.Num = binary.BigEndian.Uint32(key[2:6]) + list = append(list, ni) + } + return list, nil +} + +func (db *Database) StoreFromRadio(tsMicro int64, msg *pb.FromRadio) error { + b, err := proto.Marshal(msg) + if err != nil { + return err + } + return db.p.Set(db.keyUint64(BucketFromRadio, uint64(tsMicro)), b, pebble.Sync) +} + +func (db *Database) ReadFromRadio(start, end time.Time) (<-chan *pb.FromRadio, <-chan error) { + c := make(chan *pb.FromRadio) + errc := make(chan error, 1) + lowerLimit := uint64(start.UnixMicro()) + upperLimit := uint64(end.UnixMicro()) + go func() { + defer close(c) + defer close(errc) + it, err := db.p.NewIter(&pebble.IterOptions{ + LowerBound: db.keyUint64(BucketFromRadio, lowerLimit), + UpperBound: db.keyUint64(BucketFromRadio+1, upperLimit), + }) + if err != nil { + errc <- err + return + } + defer it.Close() + for it.First(); it.Valid(); it.Next() { + key := it.Key() + data := it.Value() + tsMicro := binary.BigEndian.Uint64(key[2:]) + if tsMicro > upperLimit { + continue + } + msg := &pb.FromRadio{} + if err = proto.Unmarshal(data, msg); err != nil { + errc <- err + return + } + c <- msg + } + }() + return c, errc +} diff --git a/datalogger.go b/datalogger.go new file mode 100644 index 0000000..6cd5106 --- /dev/null +++ b/datalogger.go @@ -0,0 +1,96 @@ +package main + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" +) + +type DataLogger struct { + data *os.File + text *os.File +} + +func NewDataLogger(logsDir string) (*DataLogger, error) { + var err error + var d DataLogger + d.data, err = os.OpenFile(filepath.Join(logsDir, "data.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + d.Close() + return nil, err + } + d.text, err = os.OpenFile(filepath.Join(logsDir, "text.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + d.Close() + return nil, err + } + + return &d, nil +} + +func (d *DataLogger) Close() error { + if d.data != nil { + d.data.Close() + } + if d.text != nil { + d.text.Close() + } + return nil +} + +func formatNodeInfo(ni *pb.NodeInfo) string { + var str string + if ni != nil { + if ni.User != nil && ni.User.Id != "" { + str = fmt.Sprintf("%s (%s) %s", ni.User.Id, ni.User.ShortName, ni.User.LongName) + } else { + str = meshtastic.NodeID(ni.Num).String() + } + } else { + str = "???" + } + return str +} + +func (d *DataLogger) LogPacket(p *pb.MeshPacket, c *pb.ChannelSettings, src *pb.NodeInfo, dst *pb.NodeInfo, data *pb.Data, payload any) error { + source := formatNodeInfo(src) + destination := formatNodeInfo(dst) + header := fmt.Sprintf("%s | #%d:%s '%s' >> '%s' [rssi:%d snr:%f hs:%d, hl:%d]", time.Unix(int64(p.RxTime), 0), c.ChannelNum, c.Name, source, destination, p.RxRssi, p.RxSnr, p.HopStart, p.HopLimit) + var s fmt.Stringer + var log string + var logfile *os.File = d.data + switch data.Portnum { + case pb.PortNum_NODEINFO_APP: + s, _ = payload.(*pb.User) + case pb.PortNum_TELEMETRY_APP: + s, _ = payload.(*pb.Telemetry) + case pb.PortNum_POSITION_APP: + s, _ = payload.(*pb.Position) + case pb.PortNum_ROUTING_APP: + s, _ = payload.(*pb.Routing) + case pb.PortNum_TRACEROUTE_APP: + s, _ = payload.(*pb.RouteDiscovery) + case pb.PortNum_AUDIO_APP: + log = fmt.Sprintf("Audio message (%dB, %.2fs@800bps)", len(data.Payload), (float32(len(data.Payload)) / 100.0)) + case pb.PortNum_TEXT_MESSAGE_APP: + log = string(data.Payload) + logfile = d.text + default: + log = p.String() + } + if log == "" && s != nil { + log = s.String() + } + if log != "" && logfile != nil { + output := fmt.Sprintf("%s : [%s] %T{ %s }\n", header, data.Portnum, payload, log) + if _, err := logfile.Write([]byte(output)); err != nil { + return err + } + return logfile.Sync() + } + return nil +} @@ -0,0 +1,63 @@ +module go.metala.org/gotishosdr + +go 1.24.1 + +require ( + github.com/charmbracelet/log v0.4.2 + github.com/cockroachdb/pebble/v2 v2.0.6 + github.com/eclipse/paho.golang v0.22.0 + github.com/elastic/go-freelru v0.16.0 + github.com/meshnet-gophers/meshtastic-go v0.1.7 + github.com/pion/dtls/v3 v3.0.6 + github.com/spf13/pflag v1.0.6 + golang.org/x/crypto v0.39.0 + golang.org/x/sync v0.15.0 + google.golang.org/protobuf v1.36.6 +) + +require ( + github.com/DataDog/zstd v1.5.7 // indirect + github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/charmbracelet/colorprofile v0.3.1 // indirect + github.com/charmbracelet/lipgloss v1.1.0 // indirect + github.com/charmbracelet/x/ansi v0.9.3 // indirect + github.com/charmbracelet/x/cellbuf v0.0.13 // indirect + github.com/charmbracelet/x/term v0.2.1 // indirect + github.com/cockroachdb/crlib v0.0.0-20250617202621-0794c595bbe6 // indirect + github.com/cockroachdb/errors v1.12.0 // indirect + github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 // indirect + github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506 // indirect + github.com/cockroachdb/redact v1.1.6 // indirect + github.com/cockroachdb/swiss v0.0.0-20250624142022-d6e517c1d961 // indirect + github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb // indirect + github.com/getsentry/sentry-go v0.34.0 // indirect + github.com/go-logfmt/logfmt v0.6.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/snappy v1.0.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/lucasb-eyer/go-colorful v1.2.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.16 // indirect + github.com/muesli/termenv v0.16.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/planetscale/vtprotobuf v0.6.0 // indirect + github.com/prometheus/client_golang v1.22.0 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.65.0 // indirect + github.com/prometheus/procfs v0.17.0 // indirect + github.com/rivo/uniseg v0.4.7 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect + golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect + golang.org/x/net v0.40.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.26.0 // indirect +) + +replace github.com/meshnet-gophers/meshtastic-go v0.1.7 => ../meshtastic-go @@ -0,0 +1,160 @@ +github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE= +github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo= +github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f/go.mod h1:tMDTce/yLLN/SK8gMOxQfnyeMeCg8KGzp0D1cbECEeo= +github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= +github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/charmbracelet/colorprofile v0.3.1 h1:k8dTHMd7fgw4bnFd7jXTLZrSU/CQrKnL3m+AxCzDz40= +github.com/charmbracelet/colorprofile v0.3.1/go.mod h1:/GkGusxNs8VB/RSOh3fu0TJmQ4ICMMPApIIVn0KszZ0= +github.com/charmbracelet/lipgloss v1.1.0 h1:vYXsiLHVkK7fp74RkV7b2kq9+zDLoEU4MZoFqR/noCY= +github.com/charmbracelet/lipgloss v1.1.0/go.mod h1:/6Q8FR2o+kj8rz4Dq0zQc3vYf7X+B0binUUBwA0aL30= +github.com/charmbracelet/log v0.4.2 h1:hYt8Qj6a8yLnvR+h7MwsJv/XvmBJXiueUcI3cIxsyig= +github.com/charmbracelet/log v0.4.2/go.mod h1:qifHGX/tc7eluv2R6pWIpyHDDrrb/AG71Pf2ysQu5nw= +github.com/charmbracelet/x/ansi v0.9.3 h1:BXt5DHS/MKF+LjuK4huWrC6NCvHtexww7dMayh6GXd0= +github.com/charmbracelet/x/ansi v0.9.3/go.mod h1:3RQDQ6lDnROptfpWuUVIUG64bD2g2BgntdxH0Ya5TeE= +github.com/charmbracelet/x/cellbuf v0.0.13 h1:/KBBKHuVRbq1lYx5BzEHBAFBP8VcQzJejZ/IA3iR28k= +github.com/charmbracelet/x/cellbuf v0.0.13/go.mod h1:xe0nKWGd3eJgtqZRaN9RjMtK7xUYchjzPr7q6kcvCCs= +github.com/charmbracelet/x/term v0.2.1 h1:AQeHeLZ1OqSXhrAWpYUtZyX1T3zVxfpZuEQMIQaGIAQ= +github.com/charmbracelet/x/term v0.2.1/go.mod h1:oQ4enTYFV7QN4m0i9mzHrViD7TQKvNEEkHUMCmsxdUg= +github.com/cockroachdb/crlib v0.0.0-20250617202621-0794c595bbe6 h1:PZVolkXzVqPQQZ7Jm8/lGpI9ZM086mB55oOWqy0wG6E= +github.com/cockroachdb/crlib v0.0.0-20250617202621-0794c595bbe6/go.mod h1:Gq51ZeKaFCXk6QwuGM0w1dnaOqc/F5zKT2zA9D6Xeac= +github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056 h1:slXychO2uDM6hYRu4c0pD0udNI8uObfeKN6UInWViS8= +github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= +github.com/cockroachdb/errors v1.12.0 h1:d7oCs6vuIMUQRVbi6jWWWEJZahLCfJpnJSVobd1/sUo= +github.com/cockroachdb/errors v1.12.0/go.mod h1:SvzfYNNBshAVbZ8wzNc/UPK3w1vf0dKDUP41ucAIf7g= +github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 h1:pU88SPhIFid6/k0egdR5V6eALQYq2qbSmukrkgIh/0A= +github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= +github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506 h1:ASDL+UJcILMqgNeV5jiqR4j+sTuvQNHdf2chuKj1M5k= +github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506/go.mod h1:Mw7HqKr2kdtu6aYGn3tPmAftiP3QPX63LdK/zcariIo= +github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA= +github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA= +github.com/cockroachdb/pebble/v2 v2.0.6 h1:eL54kX2AKp1ePJ/8vq4IO3xIEPpvVjlSP12dlLYilyE= +github.com/cockroachdb/pebble/v2 v2.0.6/go.mod h1:un1DXG73PKw3F7Ndd30YactyvsFviI9Fuhe0tENdnyA= +github.com/cockroachdb/redact v1.1.6 h1:zXJBwDZ84xJNlHl1rMyCojqyIxv+7YUpQiJLQ7n4314= +github.com/cockroachdb/redact v1.1.6/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/swiss v0.0.0-20250624142022-d6e517c1d961 h1:Nua446ru3juLHLZd4AwKNzClZgL1co3pUPGv3o8FlcA= +github.com/cockroachdb/swiss v0.0.0-20250624142022-d6e517c1d961/go.mod h1:yBRu/cnL4ks9bgy4vAASdjIW+/xMlFwuHKqtmh3GZQg= +github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb h1:3bCgBvB8PbJVMX1ouCcSIxvsqKPYM7gs72o0zC76n9g= +github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.golang v0.22.0 h1:JhhUngr8TBlyUZDZw/L6WVayPi9qmSmdWeki48i5AVE= +github.com/eclipse/paho.golang v0.22.0/go.mod h1:9ZiYJ93iEfGRJri8tErNeStPKLXIGBHiqbHV74t5pqI= +github.com/elastic/go-freelru v0.16.0 h1:gG2HJ1WXN2tNl5/p40JS/l59HjvjRhjyAa+oFTRArYs= +github.com/elastic/go-freelru v0.16.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I= +github.com/getsentry/sentry-go v0.34.0 h1:1FCHBVp8TfSc8L10zqSwXUZNiOSF+10qw4czjarTiY4= +github.com/getsentry/sentry-go v0.34.0/go.mod h1:C55omcY9ChRQIUcVcGcs+Zdy4ZpQGvNJ7JYHIoSWOtE= +github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9 h1:r5GgOLGbza2wVHRzK7aAj6lWZjfbAwiu/RDCVOKjRyM= +github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9/go.mod h1:106OIgooyS7OzLDOpUGgm9fA3bQENb/cFSyyBmMoJDs= +github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= +github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= +github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= +github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/muesli/termenv v0.16.0 h1:S5AlUN9dENB57rsbnkPyfdGuWIlkmzJjbFf0Tf5FWUc= +github.com/muesli/termenv v0.16.0/go.mod h1:ZRfOIKPFDYQoDFF4Olj7/QJbW60Ol/kL1pU3VfY/Cnk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= +github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pion/dtls/v3 v3.0.6 h1:7Hkd8WhAJNbRgq9RgdNh1aaWlZlGpYTzdqjy9x9sK2E= +github.com/pion/dtls/v3 v3.0.6/go.mod h1:iJxNQ3Uhn1NZWOMWlLxEEHAN5yX7GyPvvKw04v9bzYU= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/planetscale/vtprotobuf v0.6.0 h1:nBeETjudeJ5ZgBHUz1fVHvbqUKnYOXNhsIEabROxmNA= +github.com/planetscale/vtprotobuf v0.6.0/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= +github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2VzE= +github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= +github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0= +github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= +golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= +golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= +golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= +golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= +golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..882ddec --- /dev/null +++ b/helpers.go @@ -0,0 +1,5 @@ +package main + +func Ptr[T any](v T) *T { + return &v +} @@ -0,0 +1,168 @@ +package main + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "net" + "time" + + "github.com/charmbracelet/log" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "github.com/meshnet-gophers/meshtastic-go/radio" +) + +var ( + errInvalidOpcode = errors.New("invalid opcode") + errInvalidLength = errors.New("invalid length") +) + +// Implements MeshIf +type KavaIf struct { + Address string + KeepAlivePeriod time.Duration + Channels *pb.ChannelSet + + logger *log.Logger + conn net.Conn + modemTimeOffset uint64 +} + +func NewKavaConn(address string, keepAlive time.Duration, channels *pb.ChannelSet) *KavaIf { + logger := log.WithPrefix("Kava") + return &KavaIf{ + Address: address, + KeepAlivePeriod: keepAlive, + Channels: channels, + logger: logger, + } +} + +func (k *KavaIf) Open() error { + conn, err := net.Dial("tcp", k.Address) + if err != nil { + return fmt.Errorf("connecting to modem: %w", err) + } + if k.KeepAlivePeriod > 0 { + tcpconn := conn.(*net.TCPConn) + tcpconn.SetKeepAlive(true) + tcpconn.SetKeepAlivePeriod(k.KeepAlivePeriod) + } + + k.conn = conn + return nil +} + +func (k *KavaIf) Close() error { + return k.conn.Close() +} + +func (k *KavaIf) ReadMeshPacket() (*pb.MeshPacket, uint64, error) { + var header [4]byte + var err error + for { + if _, err = io.ReadFull(k.conn, header[0:4]); err != nil { + return nil, 0, err + } + opcode := binary.LittleEndian.Uint16(header[0:2]) + length := binary.LittleEndian.Uint16(header[2:4]) + if length <= 0 { + continue + } else if length > 255+12 { + return nil, 0, errInvalidLength + } + switch opcode { + case 0x1007: + var ts [8]byte + if _, err = io.ReadFull(k.conn, ts[:]); err != nil { + return nil, 0, err + } + uptime := binary.LittleEndian.Uint64(ts[:]) + k.modemTimeOffset = uint64(time.Now().UnixMicro()) - uptime + k.logger.Info("}} Modem uptime", "µs", uptime, "Δ-to-now", fmt.Sprintf("%+d", k.modemTimeOffset)) + continue + case 0x0001: + var message [272]byte + if _, err := io.ReadFull(k.conn, message[:length]); err != nil { + return nil, 0, err + } + if length < (12 /*kava*/ + 16 /*m8c*/) { + return nil, 0, errInvalidLength + } + + modemTs := binary.LittleEndian.Uint64(message[0:8]) + rssi := int16(binary.LittleEndian.Uint16(message[8:10])) + snr := int16(binary.LittleEndian.Uint16(message[10:12])) + ts := int64(modemTs + k.modemTimeOffset) + timestamp := time.UnixMicro(ts) + data := message[12:length] + flags := RadioHeaderFlag(data[12]) + channelHash := data[13] + payload := data[16:] + p := pb.MeshPacket{ + To: binary.LittleEndian.Uint32(data[0:4]), + From: binary.LittleEndian.Uint32(data[4:8]), + Id: binary.LittleEndian.Uint32(data[8:12]), + RxTime: uint32(timestamp.Unix()), + RxSnr: float32(snr), + RxRssi: int32(rssi), + HopLimit: uint32(flags.HopLimit()), + HopStart: uint32(flags.HopStart()), + WantAck: flags.WantACK() == 1, + ViaMqtt: flags.MQTT() == 1, + NextHop: uint32(data[14]), + RelayNode: uint32(data[15]), + Channel: uint32(channelHash), + PkiEncrypted: channelHash == 0x00, + PayloadVariant: &pb.MeshPacket_Encrypted{Encrypted: payload}, + } + return &p, uint64(ts), nil + default: + return nil, 0, errInvalidOpcode + } + + } +} + +func (k *KavaIf) WriteMeshPacket(p *pb.MeshPacket) error { + viaMQTT := uint32(bool2Int(p.ViaMqtt)) + wantAck := uint32(bool2Int(p.WantAck)) + flags := (p.HopStart << 5) | (viaMQTT << 4) | (wantAck << 3) | p.HopLimit + + var chash uint32 + if p.PkiEncrypted { + chash = 0x00 + } else { + channel := k.Channels.Settings[p.Channel] + chash, _ = radio.ChannelHash(channel.Name, channel.Psk) + } + + buf := bytes.NewBuffer(nil) + binary.Write(buf, binary.LittleEndian, uint32(p.To)) + binary.Write(buf, binary.LittleEndian, uint32(p.From)) + binary.Write(buf, binary.LittleEndian, uint32(p.Id)) + binary.Write(buf, binary.LittleEndian, uint8(flags)) + binary.Write(buf, binary.LittleEndian, uint8(chash)) + binary.Write(buf, binary.LittleEndian, uint8(p.NextHop)) + binary.Write(buf, binary.LittleEndian, uint8(p.RelayNode)) + buf.Write(p.GetEncrypted()) + b := buf.Bytes() + + modemCmd := append([]byte{1, 0, byte(len(b)), 0}, b...) + if _, err := k.conn.Write(modemCmd); err != nil { + return err + } + return nil +} + +func bool2Int(b bool) int { + var i int + if b { + i = 1 + } else { + i = 0 + } + return i +} diff --git a/logs/.gitkeep b/logs/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/logs/.gitkeep @@ -0,0 +1,24 @@ +package main + +import "encoding/binary" + +const ( + // FNV-1a + offset32 = uint32(2166136261) + prime32 = uint32(16777619) +) + +func hashPacket(p PacketTruncated) uint32 { + h := offset32 + b := make([]byte, 12) + binary.BigEndian.PutUint32(b[0:4], p.To) + binary.BigEndian.PutUint32(b[4:8], p.From) + binary.BigEndian.PutUint32(b[8:12], p.ID) + for ; len(b) >= 4; b = b[4:] { + h = (h ^ uint32(b[0])) * prime32 + h = (h ^ uint32(b[1])) * prime32 + h = (h ^ uint32(b[2])) * prime32 + h = (h ^ uint32(b[3])) * prime32 + } + return h +} @@ -0,0 +1,133 @@ +package main + +import ( + "context" + "encoding/base64" + "encoding/binary" + "fmt" + "os" + "os/signal" + "time" + + "github.com/charmbracelet/log" + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + flag "github.com/spf13/pflag" + "golang.org/x/crypto/curve25519" +) + +func main() { + var args struct { + DbDir string + LogsDir string + ConfigPath string + Interface string + Relentless bool + } + flag.StringVarP(&args.DbDir, "dbdir", "", "./data/", "The path to the database directory") + flag.StringVarP(&args.LogsDir, "logsdir", "", "./logs/", "The path to the logs directory") + flag.StringVarP(&args.ConfigPath, "config", "c", "./config.json", "The path to the config.json") + flag.StringVarP(&args.Interface, "interface", "i", "kava", "Mesh interface (kava, mqtt)") + flag.BoolVarP(&args.Relentless, "relentless", "r", false, "Relentless reconnect to modem") + flag.Parse() + + log.SetLevel(log.DebugLevel) + cfg, err := ReadConfig(args.ConfigPath) + if err != nil { + log.Fatal(err) + } + pbChannels := make([]*pb.ChannelSettings, len(cfg.Channels)) + for i, channel := range cfg.Channels { + var pbChannelSettings = pb.ChannelSettings{ + Name: channel.Name, + Psk: channel.PSK, + } + pbChannels[i] = &pbChannelSettings + } + + var nodeID meshtastic.NodeID + if len(cfg.NodeID) == 4 { + nodeID = meshtastic.NodeID(binary.BigEndian.Uint32(cfg.NodeID)) + } else if len(cfg.NodeID) == 0 { + nodeID, err = meshtastic.RandomNodeID() + if err != nil { + log.Fatal(err) + } + } + log.Print("Config:") + log.Print(" Database", "directory", args.DbDir) + log.Print(" Logs", "directory", args.LogsDir) + log.Print(" Node", "id", nodeID, "short", cfg.ShortName, "name", cfg.Name) + nodeConfig := NodeConfig{ + DatabaseDir: args.DbDir, + LogsDir: args.LogsDir, + NodeID: nodeID, + LongName: cfg.Name, + ShortName: cfg.ShortName, + DefaultHops: cfg.DefaultHops, + MaxHops: cfg.MaxHops, + Channels: &pb.ChannelSet{Settings: pbChannels}, + BroadcastNodeInfoInterval: time.Duration(cfg.NodeBroadcastInterval * float64(time.Second)), + BroadcastPositionInterval: time.Duration(cfg.Position.BroadcastInterval * float64(time.Second)), + // Position paramters + PositionLatitudeI: int32(cfg.Position.Latitude * 1.e7), + PositionLongitudeI: int32(cfg.Position.Longitude * 1.e7), + PositionAltitude: int32(cfg.Position.Altitude), + // Device Metrics + DeviceMetricsBroadcastInterval: time.Duration(cfg.DeviceMetrics.BroadcastInterval * float64(time.Second)), + DeviceMetricsUPSAddress: cfg.DeviceMetrics.UPSAddress, + TCPListenAddr: "0.0.0.0:4403", + } + if len(cfg.X25519Key) == 32 { + nodeConfig.X25519SecretKey = cfg.X25519Key + nodeConfig.X25519PublicKey, err = curve25519.X25519(nodeConfig.X25519SecretKey, curve25519.Basepoint) + if err != nil { + log.Fatal(err) + } + log.Print(" X25519PublicKey", "base64", base64.StdEncoding.EncodeToString(nodeConfig.X25519PublicKey), "hex", fmt.Sprintf("%x", nodeConfig.X25519PublicKey)) + } else if len(cfg.X25519Key) != 0 { + log.Fatal("invalid x25519 key") + } + + if args.Relentless { + log.Warn("Starting Node in 'relentless' mode...") + } else { + log.Info("Starting Node...") + } + + n, err := NewNode(nodeConfig) + if err != nil { + log.Fatal(err) + } + defer n.Close() + ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt) + for { + var iface MeshIf + switch args.Interface { + case "kava": + iface = NewKavaConn(cfg.KavaModem.Address, 10*time.Second, n.cfg.Channels) + case "mqtt": + iface = NewMqttIf(cfg.MqttIf.Address, cfg.MqttIf.Username, cfg.MqttIf.Password, cfg.MqttIf.Root, cfg.MqttIf.ClientID, meshtastic.NodeID(nodeID), n.cfg.Channels) + default: + log.Fatal("invalid argument", "interface", args.Interface) + } + + if err := n.Run(ctx, iface); err != nil { + log.Error("!! Node.Run()", "err", err) + } + if err := ctx.Err(); err != nil { + log.Error("!! Caught interrupt!") + break + } + if !args.Relentless { + break + } + log.Warn("Reconnecting in 30 seconds...") + ticker := time.NewTicker(30 * time.Second) + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + } +} diff --git a/mestastic.go b/mestastic.go new file mode 100644 index 0000000..d76165b --- /dev/null +++ b/mestastic.go @@ -0,0 +1,90 @@ +package main + +import ( + "encoding/binary" + + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "google.golang.org/protobuf/proto" +) + +const ( + FlagViaMQTT = (1 << 4) + FlagWantACK = (1 << 3) +) + +var ( + broadcastID = meshtastic.BroadcastNodeID.Uint32() +) + +var pkcChan = &pb.ChannelSettings{ + ChannelNum: 0xFFFFFFFF, + Name: "[PKC]", + Psk: nil, +} + +type MeshIf interface { + Open() error + ReadMeshPacket() (*pb.MeshPacket, uint64, error) + WriteMeshPacket(*pb.MeshPacket) error + Close() error +} + +type RadioHeaderFlag uint8 + +func (r RadioHeaderFlag) WantACK() uint8 { + return uint8(r>>3) & 1 +} + +func (r RadioHeaderFlag) MQTT() uint8 { + return uint8(r>>4) & 1 +} + +func (r RadioHeaderFlag) HopStart() uint8 { + return uint8(r>>5) & 0b111 +} + +func (r RadioHeaderFlag) HopLimit() uint8 { + return uint8(r & 0b111) +} + +type PacketTruncated struct { + To uint32 + From uint32 + ID uint32 +} + +func buildRoutingReply(p *pb.MeshPacket, err pb.Routing_Error) *pb.MeshPacket { + cb, _ := proto.Marshal(&pb.Routing{ + Variant: &pb.Routing_ErrorReason{ErrorReason: err}, + }) + return &pb.MeshPacket{ + // Reversed + To: p.From, + From: p.To, + + PkiEncrypted: p.PkiEncrypted, + PublicKey: p.PublicKey, + + // (N)ACK + Priority: pb.MeshPacket_ACK, + HopLimit: p.HopStart, + Channel: p.Channel, + PayloadVariant: &pb.MeshPacket_Decoded{ + Decoded: &pb.Data{ + Portnum: pb.PortNum_ROUTING_APP, + RequestId: p.Id, + Payload: cb, + }, + }, + } +} + +func buildPkcNonce(id uint32, extra uint32, from uint32) []byte { + var buf [13]byte // zero-init + binary.LittleEndian.PutUint32(buf[0:4], id) + binary.LittleEndian.PutUint32(buf[4:8], extra) + binary.LittleEndian.PutUint32(buf[8:12], from) + buf[12] = 0 // last byte is zero + return buf[:] +} @@ -0,0 +1,205 @@ +package main + +import ( + "context" + "errors" + "fmt" + "net/url" + "time" + + "github.com/charmbracelet/log" + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "github.com/meshnet-gophers/meshtastic-go/radio" + "google.golang.org/protobuf/proto" +) + +// Implements MeshIf +type MqttIf struct { + Address string + Username string + Password string + ClientID string + Root string + GatewayID meshtastic.NodeID + Channels *pb.ChannelSet + + logger *log.Logger + c *autopaho.ConnectionManager + ctx context.Context + stop context.CancelFunc + inputCh chan mqttServiceEnvelope + chIdMap map[string]uint8 +} + +type mqttServiceEnvelope struct { + timestamp time.Time + se *pb.ServiceEnvelope +} + +func NewMqttIf(address string, username, password, root, clientID string, gatewayID meshtastic.NodeID, channels *pb.ChannelSet) *MqttIf { + logger := log.WithPrefix("MqttIf") + inputCh := make(chan mqttServiceEnvelope, 10) + return &MqttIf{ + Address: address, + Username: username, + Password: password, + ClientID: clientID, + Root: root, + GatewayID: gatewayID, + Channels: channels, + + logger: logger, + inputCh: inputCh, + } +} + +func (m *MqttIf) channelPublishTopic(channelId string) string { + return fmt.Sprintf("%s/2/e/%s/%s", m.Root, channelId, m.GatewayID) +} + +func (m *MqttIf) onConnectionUp(cm *autopaho.ConnectionManager, connAck *paho.Connack) { + topics := []string{fmt.Sprintf("%s/2/e/PKI/#", m.Root)} + chIdMap := make(map[string]uint8) + for _, c := range m.Channels.Settings { + topics = append(topics, fmt.Sprintf("%s/2/e/%s/#", m.Root, c.Name)) + channelHash, err := radio.ChannelHash(c.Name, c.Psk) + if err != nil { + m.logger.Errorf("failed to build channel hash: %s", err) + m.c.Disconnect(m.ctx) + return + } + chIdMap[c.Name] = uint8(channelHash) + } + m.chIdMap = chIdMap + subscriptions := make([]paho.SubscribeOptions, 0, len(topics)) + for _, t := range topics { + subscriptions = append(subscriptions, paho.SubscribeOptions{Topic: t, QoS: 1}) + } + if _, err := cm.Subscribe(m.ctx, &paho.Subscribe{Subscriptions: subscriptions}); err != nil { + m.logger.Errorf("failed to subscribe (%s). This is likely to mean no messages will be received.", err) + m.c.Disconnect(m.ctx) + return + } + m.logger.Info("Subscribed", "topics", topics) +} + +func (m *MqttIf) onConnectionError(err error) { + m.logger.Warnf("error whilst attempting connection: %s\n", err) +} + +func (m *MqttIf) onReceive(pr paho.PublishReceived) (bool, error) { + if pr.Packet.Duplicate() || pr.AlreadyHandled { + return false, nil + } + se := &pb.ServiceEnvelope{} + if err := proto.Unmarshal(pr.Packet.Payload, se); err != nil { + return false, fmt.Errorf("unmarshalling ServicePayload from Mqtt: %w", err) + } + if se.Packet.RxTime == 0 { + se.Packet.RxTime = uint32(time.Now().Unix()) + } + m.inputCh <- mqttServiceEnvelope{timestamp: time.Now(), se: se} + return true, nil +} + +func (m *MqttIf) onClientError(err error) { + m.logger.Warnf("client error: %s\n", err) +} + +func (m *MqttIf) onServerDisconnect(d *paho.Disconnect) { + if d.Properties != nil { + fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString) + } else { + fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode) + } +} + +func (m *MqttIf) Open() error { + m.ctx, m.stop = context.WithCancel(context.Background()) + u, err := url.Parse(m.Address) + if err != nil { + return err + } + cliCfg := autopaho.ClientConfig{ + ServerUrls: []*url.URL{u}, + KeepAlive: 20, // Keepalive message should be sent every 20 seconds + CleanStartOnInitialConnection: false, + SessionExpiryInterval: 60, + OnConnectionUp: m.onConnectionUp, + OnConnectError: m.onConnectionError, + ConnectUsername: m.Username, + ConnectPassword: []byte(m.Password), + ClientConfig: paho.ClientConfig{ + ClientID: m.ClientID, + OnPublishReceived: []func(paho.PublishReceived) (bool, error){m.onReceive}, + OnClientError: m.onClientError, + OnServerDisconnect: m.onServerDisconnect, + }, + } + + m.c, err = autopaho.NewConnection(m.ctx, cliCfg) // starts process; will reconnect until context cancelled + if err != nil { + return err + } + if err = m.c.AwaitConnection(m.ctx); err != nil { + return err + } + + return nil +} + +func (m *MqttIf) Close() error { + m.logger.Info("Closing MQTT connection...") + m.c.Disconnect(m.ctx) + m.stop() + close(m.inputCh) + <-m.ctx.Done() + return nil +} + +func (m *MqttIf) ReadMeshPacket() (*pb.MeshPacket, uint64, error) { + if err := m.ctx.Err(); err != nil { + return nil, 0, err + } + p := <-m.inputCh + if p.se == nil || p.se.Packet == nil { + return nil, 0, errors.New("no input") + } + p.se.Packet.Channel = uint32(m.chIdMap[p.se.ChannelId]) + return p.se.Packet, uint64(p.timestamp.UnixMicro()), nil +} + +func (m *MqttIf) WriteMeshPacket(p *pb.MeshPacket) error { + var channelId string + if p.PkiEncrypted { + channelId = "PKI" + p.Priority = pb.MeshPacket_HIGH + } else { + channel := m.Channels.Settings[p.Channel] + channelId = channel.Name + } + if p.RxTime == 0 { + p.RxTime = uint32(time.Now().Unix()) + } + p.RelayNode = p.RelayNode & 0xFF + p.NextHop = p.NextHop & 0xFF + p.Channel = 0 // Don't expose your channel number + se := &pb.ServiceEnvelope{ + Packet: p, + ChannelId: channelId, + GatewayId: m.GatewayID.String(), + } + payload, err := proto.Marshal(se) + if err != nil { + return err + } + topic := m.channelPublishTopic(channelId) + m.c.Publish(m.ctx, &paho.Publish{ + Topic: topic, + Payload: payload, + }) + return nil +} @@ -0,0 +1,598 @@ +package main + +import ( + "bufio" + "bytes" + "context" + "crypto/aes" + "crypto/rand" + "crypto/sha256" + "encoding/binary" + "errors" + "fmt" + "io" + "os/exec" + "strconv" + "sync" + "time" + + "github.com/charmbracelet/log" + "github.com/elastic/go-freelru" + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "github.com/meshnet-gophers/meshtastic-go/radio" + "github.com/meshnet-gophers/meshtastic-go/transport" + "github.com/pion/dtls/v3/pkg/crypto/ccm" + "golang.org/x/crypto/curve25519" + "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/proto" +) + +const ( + // MinAppVersion is the minimum app version supported by the emulated radio. + MinAppVersion = 30200 + ROUTE_MAX_SIZE = 8 +) + +var ( + errNoAdminConnection = errors.New("no admin connection") +) + +// NodeConfig is the configuration for the emulated Radio. +type NodeConfig struct { + // Database + DatabaseDir string + + // Logs directory + LogsDir string + + // Node configuration + // NodeID is the ID of the node. + NodeID meshtastic.NodeID + // LongName is the long name of the node. + LongName string + // ShortName is the short name of the node. + ShortName string + + // MaxHops sets the maximum value for value for HopStart/HopLimit on Tx + MaxHops uint32 + // DefaultHops sets the default value for HopStart/HopLimit on Tx + DefaultHops uint32 + + // Channels is the set of channels the radio will listen and transmit on. + // The first channel in the set is considered the primary channel and is used for broadcasting NodeInfo and Position + Channels *pb.ChannelSet + // BroadcastNodeInfoInterval is the interval at which the radio will broadcast a NodeInfo on the Primary channel. + // The zero value disables broadcasting NodeInfo. + BroadcastNodeInfoInterval time.Duration + // BroadcastPositionInterval is the interval at which the radio will broadcast Position on the Primary channel. + // The zero value disables broadcasting NodeInfo. + BroadcastPositionInterval time.Duration + // PositionLatitudeI is the latitude of the position which will be regularly broadcasted. + // This is in degrees multiplied by 1e7. + PositionLatitudeI int32 + // PositionLongitudeI is the longitude of the position which will be regularly broadcasted. + // This is in degrees multiplied by 1e7. + PositionLongitudeI int32 + // PositionAltitude is the altitude of the position which will be regularly broadcasted. + // This is in meters above MSL. + PositionAltitude int32 + // TCPListenAddr is the address the emulated radio will listen on for TCP connections and offer the Client API over. + TCPListenAddr string + + DeviceMetricsBroadcastInterval time.Duration + DeviceMetricsUPSAddress string + X25519SecretKey []byte + X25519PublicKey []byte +} + +func (c *NodeConfig) validate() error { + if c.DatabaseDir == "" { + return fmt.Errorf("DatabasePath is required") + } + if c.LogsDir == "" { + return fmt.Errorf("LogsDir is required") + } + if c.NodeID == 0 { + return fmt.Errorf("NodeID is required") + } + if c.LongName == "" { + c.LongName = c.NodeID.DefaultLongName() + } + if c.ShortName == "" { + c.ShortName = c.NodeID.DefaultShortName() + } + if c.Channels == nil { + //lint:ignore ST1005 we're referencing an actual field here. + return fmt.Errorf("Channels is required") + } + if len(c.Channels.Settings) == 0 { + return fmt.Errorf("Channels.Settings should be non-empty") + } + return nil +} + +// Node emulates a meshtastic Node, communicating with a meshtastic network via MQTT. +type Node struct { + cfg NodeConfig + db *Database + mesh MeshIf + logger *log.Logger + dl *DataLogger + start time.Time + nodeID uint32 + user *pb.User + chLookup map[byte][]*pb.ChannelSettings + pktLru *freelru.LRU[PacketTruncated, struct{}] + packetID uint32 + mu sync.Mutex + adminConn map[*transport.StreamConn]struct{} +} + +// NewNode creates a new emulated radio. +func NewNode(cfg NodeConfig) (*Node, error) { + if err := cfg.validate(); err != nil { + return nil, fmt.Errorf("validating config: %w", err) + } + db, err := NewDB(cfg.DatabaseDir) + if err != nil { + return nil, err + } + logger := log.WithPrefix("Node") + chLookup := make(map[byte][]*pb.ChannelSettings) + for i, c := range cfg.Channels.Settings { + c.ChannelNum = uint32(i) + ch, err := radio.ChannelHash(c.Name, c.Psk) + if err != nil { + return nil, err + } + chLookup[byte(ch)] = append(chLookup[byte(ch)], c) + logger.Infof("Channel[%d]: Name='%s', Hash=0x%02x", i, c.Name, ch) + } + dl, err := NewDataLogger(cfg.LogsDir) + if err != nil { + return nil, err + } + pktLru, err := freelru.New[PacketTruncated, struct{}](32, hashPacket) + if err != nil { + panic(err) + } + var nonce [4]byte + if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil { + return nil, err + } + adminConn := make(map[*transport.StreamConn]struct{}) + return &Node{ + cfg: cfg, + logger: logger, + db: db, + dl: dl, + start: time.Now(), + packetID: binary.LittleEndian.Uint32(nonce[:]), + chLookup: chLookup, + pktLru: pktLru, + adminConn: adminConn, + }, nil +} + +func (n *Node) Close() error { + return n.db.Close() +} + +// Run starts the radio. It blocks until the context is cancelled. +func (n *Node) Run(ctx context.Context, meshConn MeshIf) error { + n.logger.Infof("** Connecting to Mesh...") + if err := meshConn.Open(); err != nil { + return fmt.Errorf("connecting to modem: %w", err) + } + n.mesh = meshConn + go func() { + <-ctx.Done() + meshConn.Close() + }() + n.logger.Infof("** Connected to Mesh...") + n.nodeID = n.cfg.NodeID.Uint32() + n.NodeInfo(n.nodeID, func(ni *pb.NodeInfo) bool { + var updated bool + n.user = &pb.User{ + Id: n.cfg.NodeID.String(), + LongName: n.cfg.LongName, + ShortName: n.cfg.ShortName, + HwModel: pb.HardwareModel_PRIVATE_HW, + Role: pb.Config_DeviceConfig_CLIENT_MUTE, + IsLicensed: false, + IsUnmessagable: Ptr(false), + PublicKey: n.cfg.X25519PublicKey, + } + if !proto.Equal(ni.User, n.user) { + ni.User = n.user + updated = true + } + if n.cfg.DeviceMetricsBroadcastInterval > 0 { + deviceMetrics, err := n.getDeviceMetrics() + if err == nil { + ni.DeviceMetrics = deviceMetrics + updated = true + } + } + return updated + }) + + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(func() error { + return n.RxLoop(egCtx) + }) + + if n.cfg.BroadcastNodeInfoInterval > 0 { + eg.Go(FuncTicker(n.cfg.BroadcastNodeInfoInterval, egCtx, n.broadcastNodeInfo)) + } + if n.cfg.BroadcastPositionInterval > 0 { + eg.Go(FuncTicker(n.cfg.BroadcastPositionInterval, egCtx, n.broadcastPosition)) + } + if n.cfg.DeviceMetricsBroadcastInterval > 0 { + eg.Go(FuncTicker(n.cfg.DeviceMetricsBroadcastInterval, egCtx, n.broadcastDeviceMetrics)) + } + if n.cfg.TCPListenAddr != "" { + eg.Go(func() error { + return n.listenTCP(egCtx) + }) + } + return eg.Wait() +} + +func (n *Node) NodeInfo(nodeID uint32, updateFunc func(*pb.NodeInfo) bool) *pb.NodeInfo { + if nodeID == 0 { + return nil + } + ni, err := n.db.GetNodeInfo(nodeID) + if err == errNotExists { + ni = &pb.NodeInfo{User: &pb.User{}, Position: &pb.Position{}, DeviceMetrics: &pb.DeviceMetrics{}} + } else if err != nil { + n.logger.Error("!! NodeInfo error", "error", err) + return nil + } + ni.Num = nodeID + var needsUpdate bool + if updateFunc != nil { + needsUpdate = updateFunc(ni) + } else if err == nil { + return ni // no (updatefunc or new node) + } + if !needsUpdate { + return ni + } + ni.LastHeard = uint32(time.Now().Unix()) + n.logger.Debug("** SetNodeInfo", "nodeInfo", ni) + if err = n.db.SetNodeInfo(nodeID, ni); err != nil { + n.logger.Error(err) + } + return ni +} + +func (n *Node) tryDecryptPKC(p *pb.MeshPacket) ([]byte, []byte, error) { + ni, err := n.db.GetNodeInfo(p.From) + if err != nil { + return nil, nil, err + } + if ni.User == nil || ni.User.PublicKey == nil { + return nil, nil, errors.New("invalid user publickey") + } + pk := ni.User.PublicKey + sh, err := curve25519.X25519(n.cfg.X25519SecretKey, pk) + if err != nil { + return nil, nil, err + } + h := sha256.New() + h.Write(sh) + shk := h.Sum(nil) + c, err := aes.NewCipher(shk) + if err != nil { + return nil, nil, err + } + aead, err := ccm.NewCCM(c, 8, 13) + if err != nil { + return nil, nil, err + } + payload := p.GetEncrypted() + if len(payload) <= 4+8 { + return nil, nil, errors.New("too short") + } + extra := binary.LittleEndian.Uint32(payload[len(payload)-4:]) + nonce := buildPkcNonce(p.Id, extra, p.From) + data, err := aead.Open(nil, nonce, payload[:len(payload)-4], nil) + if err != nil { + return nil, nil, err + } + return data, pk, nil +} + +func (n *Node) tryDecryptPSK(p *pb.MeshPacket) ([]byte, *pb.ChannelSettings) { + channelCandidates, ok := n.chLookup[byte(p.Channel)] + if !ok { + return nil, nil // cannot find channel + } + var err error + var plaintext []byte + var channel *pb.ChannelSettings + for _, channel = range channelCandidates { + plaintext, err = radio.XOR( + p.GetEncrypted(), + channel.Psk, + p.Id, + p.From, + ) + if err == nil { + break + } + } + if err != nil { + return nil, nil + } + return plaintext, channel +} + +func (n *Node) nextPacketID() uint32 { + n.mu.Lock() + n.packetID += 1 + n.mu.Unlock() + return n.packetID +} + +func (n *Node) txPortnumMessage(channelID uint32, to uint32, portNum pb.PortNum, message proto.Message) error { + payload, err := proto.Marshal(message) + if err != nil { + return err + } + return n.txPacket(&pb.MeshPacket{ + From: n.nodeID, + To: to, + Channel: channelID, + PayloadVariant: &pb.MeshPacket_Decoded{Decoded: &pb.Data{ + Portnum: portNum, + Payload: payload, + }}, + }) +} + +func (n *Node) txPacket(p *pb.MeshPacket) error { + var plaintext []byte + var decoded *pb.Data + switch payload := p.PayloadVariant.(type) { + case *pb.MeshPacket_Decoded: + var err error + plaintext, err = proto.Marshal(payload.Decoded) + if err != nil { + return fmt.Errorf("marshalling user: %w", err) + } + decoded = payload.Decoded + default: + panic("unexpected MeshPacket payload variant") + } + + if p.To == 0 { + p.To = broadcastID + } + if p.Id == 0 { + p.Id = n.nextPacketID() + } + if p.RelayNode == 0 { + p.RelayNode = p.From + } + if p.RelayNode == 0 { + p.RelayNode = 0xFF + } + if p.HopStart == 0 { + p.HopStart = n.cfg.DefaultHops + p.HopLimit = n.cfg.DefaultHops + } + if p.HopLimit > n.cfg.MaxHops { + p.HopLimit = n.cfg.MaxHops + } + dstNi, _ := n.db.GetNodeInfo(p.To) + if dstNi == nil { + dstNi = &pb.NodeInfo{Num: p.To} + } + + // PKC + if p.PkiEncrypted && p.PublicKey != nil { + // explicit PKC with a key, do nothing + } else if p.From == n.nodeID && decoded.Portnum != pb.PortNum_TRACEROUTE_APP && decoded.Portnum != pb.PortNum_NODEINFO_APP && decoded.Portnum != pb.PortNum_ROUTING_APP && decoded.Portnum != pb.PortNum_POSITION_APP { + // implicit PKC, or explicit PKC without a key + if dstNi.User != nil && dstNi.User.PublicKey != nil { + p.PublicKey = dstNi.User.PublicKey + } + } + // Ensure if PKC is selected, there is a recepient key + if p.PkiEncrypted && p.PublicKey == nil { + return errors.New("missing recepient public key") + } + + // Encrypt payload + buf := bytes.NewBuffer(nil) + var encrChannel string + var channel *pb.ChannelSettings + if p.PkiEncrypted { + channel = pkcChan + encrChannel = pkcChan.Name + p.Channel = 0 + sh, err := curve25519.X25519(n.cfg.X25519SecretKey, p.PublicKey) + if err != nil { + return err + } + h := sha256.New() + h.Write(sh) + shk := h.Sum(nil) + c, err := aes.NewCipher(shk) + if err != nil { + return err + } + aead, err := ccm.NewCCM(c, 8, 13) + if err != nil { + return err + } + var nonceExtra [4]byte + if _, err := rand.Read(nonceExtra[:]); err != nil { + return err + } + nonce := buildPkcNonce(p.Id, binary.LittleEndian.Uint32(nonceExtra[:]), p.From) + ciphertextAndTag := aead.Seal(nil, nonce, plaintext, nil) + buf.Write(ciphertextAndTag) + buf.Write(nonceExtra[:]) + } else { + chIdx := p.Channel + if chIdx >= uint32(len(n.cfg.Channels.Settings)) { + return errors.New("invalid channel index") + } + channel = n.cfg.Channels.Settings[chIdx] + encrChannel = fmt.Sprintf("#%s", channel.Name) + ciphertext, err := radio.XOR( + plaintext, + channel.Psk, + p.Id, + p.From, + ) + if err != nil { + return fmt.Errorf("failed to PSK encrypt: %w", err) + } + buf.Write(ciphertext) + } + b := buf.Bytes() + buf.Reset() + out := proto.Clone(p).(*pb.MeshPacket) + out.PayloadVariant = &pb.MeshPacket_Encrypted{Encrypted: b} + if err := n.mesh.WriteMeshPacket(out); err != nil { + n.logger.Debug("X( MeshPacket", "channel", encrChannel, "packet", p.String(), "out", out.String()) + return err + } + n.dl.LogPacket(p, channel, &pb.NodeInfo{Num: n.nodeID, User: n.user}, dstNi, decoded, decoded.Payload) + pt := PacketTruncated{ + To: p.To, + From: p.From, + ID: p.Id, + } + n.pktLru.Add(pt, struct{}{}) + n.logger.Debug("(( MeshPacket", "channel", encrChannel, "packet", p.String(), "out", out.String()) + return nil +} + +func (n *Node) broadcastNodeInfo(ctx context.Context) error { + n.logger.Info("(( NodeInfo") + return n.txPortnumMessage(0, broadcastID, pb.PortNum_NODEINFO_APP, n.user) +} + +func (n *Node) getPosition() *pb.Position { + return &pb.Position{ + LatitudeI: &n.cfg.PositionLatitudeI, + LongitudeI: &n.cfg.PositionLongitudeI, + Altitude: &n.cfg.PositionAltitude, + Time: uint32(time.Now().Unix()), + LocationSource: pb.Position_LOC_MANUAL, + } +} + +func (n *Node) broadcastPosition(ctx context.Context) error { + n.logger.Info("(( Position") + position := n.getPosition() + n.NodeInfo(n.nodeID, func(nodeInfo *pb.NodeInfo) bool { + nodeInfo.Position = position + return true + }) + return n.txPortnumMessage(0, broadcastID, pb.PortNum_POSITION_APP, position) +} + +func (n *Node) getDeviceMetrics() (*pb.DeviceMetrics, error) { + cmdStdout := bytes.NewBuffer(nil) + cmd := exec.Command("upsc", n.cfg.DeviceMetricsUPSAddress) + cmd.Stdout = cmdStdout + err := cmd.Run() + if err != nil { + n.logger.Error("failed to get UPS data", "err", err) + return nil, err + } + stdoutBytes := cmdStdout.Bytes() + stdout := bufio.NewReader(bytes.NewReader(stdoutBytes)) + var batteryLevel uint32 + var voltage float32 + var channelUtilization float32 + var airUtilTx float32 + for { + line, _, err := stdout.ReadLine() + if errors.Is(err, io.EOF) { + break + } + lineValue := bytes.SplitN(line, []byte(": "), 2) + if len(lineValue) != 2 { + continue + } + if bytes.Equal(lineValue[0], []byte("battery.voltage")) { + v, err := strconv.ParseFloat(string(lineValue[1]), 64) + if err != nil { + continue + } + voltage = float32(v) + } + if bytes.Equal(lineValue[0], []byte("battery.charge")) { + charge, err := strconv.ParseUint(string(lineValue[1]), 10, 64) + if err != nil { + continue + } + batteryLevel = uint32(charge) + } + } + if batteryLevel == 100 && voltage > 0 { + batteryLevel = 101 + } + uptime := uint32(time.Since(n.start).Seconds()) + return &pb.DeviceMetrics{ + BatteryLevel: &batteryLevel, + Voltage: &voltage, + ChannelUtilization: &channelUtilization, + AirUtilTx: &airUtilTx, + UptimeSeconds: &uptime, + }, nil +} +func (n *Node) broadcastDeviceMetrics(ctx context.Context) error { + n.logger.Info("(( DeviceMetrics") + deviceMetrics, err := n.getDeviceMetrics() + if err != nil { + return err + } + n.NodeInfo(n.nodeID, func(nodeInfo *pb.NodeInfo) bool { + nodeInfo.DeviceMetrics = deviceMetrics + return true + }) + return n.txPortnumMessage(0, broadcastID, pb.PortNum_TELEMETRY_APP, &pb.Telemetry{ + Variant: &pb.Telemetry_DeviceMetrics{ + DeviceMetrics: deviceMetrics, + }, + }) +} + +func (n *Node) getDeviceMetadata() *pb.DeviceMetadata { + return &pb.DeviceMetadata{ + FirmwareVersion: "2.6.0-golang", + DeviceStateVersion: 24, + HwModel: pb.HardwareModel_PRIVATE_HW, + Role: pb.Config_DeviceConfig_CLIENT_MUTE, + HasPKC: true, + HasRemoteHardware: false, + HasWifi: false, + HasBluetooth: false, + HasEthernet: false, + CanShutdown: false, + } +} + +// dispatchMessageToAdmin sends a FromRadio message to all current subscribers to +// the FromRadio. +func (n *Node) dispatchMessageToAdmin(msg *pb.FromRadio) error { + if len(n.adminConn) == 0 { + return errNoAdminConnection + } + for conn := range n.adminConn { + if err := conn.Write(msg); err != nil { + n.logger.Errorf("failed to send to admin: %w", err) + } + } + return nil +} diff --git a/nodeadmin.go b/nodeadmin.go new file mode 100644 index 0000000..e6b88a5 --- /dev/null +++ b/nodeadmin.go @@ -0,0 +1,392 @@ +package main + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "net" + "strings" + "time" + + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "github.com/meshnet-gophers/meshtastic-go/transport" + "google.golang.org/protobuf/proto" +) + +func phoneLastUpdateKey(identifier string) []byte { + return []byte("phone-lastupdate-" + identifier) +} + +func (n *Node) setPhoneLastUpdate(identifier string, t time.Time) error { + var lastUpdate [8]byte + binary.BigEndian.PutUint64(lastUpdate[:], uint64(t.UnixMicro())) + return n.db.Set(phoneLastUpdateKey(identifier), lastUpdate[:]) +} + +func (n *Node) handleToRadioWantConfigID(identifier string, conn *transport.StreamConn, req *pb.ToRadio_WantConfigId) error { + // Send MyInfo + err := conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_MyInfo{ + MyInfo: &pb.MyNodeInfo{ + MyNodeNum: n.nodeID, + RebootCount: 0, // use db nonce key? + // TODO: Track this as a const + MinAppVersion: MinAppVersion, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Metadata + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Metadata{ + Metadata: n.getDeviceMetadata(), + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send our node + nodeInfo := n.NodeInfo(n.nodeID, nil) + if nodeInfo.User == nil { + nodeInfo.User = n.user + } + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_NodeInfo{ + NodeInfo: nodeInfo, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Other Nodes + nodes, err := n.db.ListNodeInfo() + if err != nil { + return fmt.Errorf("listing nodes: %w", err) + } + for _, ni := range nodes { + if ni.Num == n.nodeID { + continue + } + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_NodeInfo{ + NodeInfo: ni, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + + // Send Channels + for i, channel := range n.cfg.Channels.Settings { + role := pb.Channel_SECONDARY + if i == 0 { + role = pb.Channel_PRIMARY + } + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Channel{ + Channel: &pb.Channel{ + Index: int32(i), + Settings: &pb.ChannelSettings{ + Name: channel.Name, + Psk: channel.Psk, + }, + Role: role, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + + // Send Config: Device + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Config{ + Config: &pb.Config{ + PayloadVariant: &pb.Config_Device{ + Device: &pb.Config_DeviceConfig{ + NodeInfoBroadcastSecs: uint32(n.cfg.BroadcastNodeInfoInterval.Seconds()), + }, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Config: LoRa + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Config{ + Config: &pb.Config{ + PayloadVariant: &pb.Config_Lora{ + Lora: &pb.Config_LoRaConfig{ + UsePreset: true, + ModemPreset: pb.Config_LoRaConfig_SHORT_FAST, + Region: pb.Config_LoRaConfig_EU_868, + ConfigOkToMqtt: true, + }, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send ConfigComplete to indicate we're done + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_ConfigCompleteId{ + ConfigCompleteId: req.WantConfigId, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + start := uint64(0) + lastPhoneUpdate, _ := n.db.Get(phoneLastUpdateKey(identifier)) + if len(lastPhoneUpdate) == 8 { + start = binary.BigEndian.Uint64(lastPhoneUpdate) + } else { + start = uint64(time.Now().UnixMicro()) + } + startTime := time.UnixMicro(int64(start)) + now := time.Now() + n.logger.Info("** Replaying FromRadio packets to admin...", "from", startTime, "to", now) + c, errc := n.db.ReadFromRadio(startTime, now) + for msg := range c { + pkt := msg.GetPacket() + // process only messages that are broadcast or to us. + if pkt == nil || (pkt.To != broadcastID && pkt.To != n.nodeID) { + continue + } + if err := conn.Write(msg); err != nil { + return fmt.Errorf("failed to send to admin: %w", err) + } + + } + err = <-errc + n.logger.Info("** Finished WantConfigID", "err", err) + + n.setPhoneLastUpdate(identifier, now) + return err +} + +func (n *Node) handleConn(ctx context.Context, underlying io.ReadWriteCloser, identifier string) error { + streamConn := transport.NewRadioStreamConn(underlying) + n.adminConn[streamConn] = struct{}{} + defer func() { + delete(n.adminConn, streamConn) + if err := streamConn.Close(); err != nil { + n.logger.Error("failed to close streamConn", "err", err) + } + }() + go func() { + <-ctx.Done() + streamConn.Close() + }() + + var configured bool + var lastPktTime time.Time + defer func() { + if lastPktTime.IsZero() { + return + } + n.setPhoneLastUpdate(identifier, lastPktTime) + }() + for { + msg := &pb.ToRadio{} + if err := streamConn.Read(msg); err != nil { + return fmt.Errorf("reading from streamConn: %w", err) + } + if configured { + lastPktTime = time.Now() + } + n.logger.Info(">> ToRadio", "msg", msg) + switch payload := msg.PayloadVariant.(type) { + case *pb.ToRadio_Disconnect: + // The meshtastic python client sends a Disconnect command and with the TCP implementation, it expects + // the radio to close the connection. So we end the read loop here, and return to close the connection. + streamConn.Close() + return nil + case *pb.ToRadio_WantConfigId: + if err := n.handleToRadioWantConfigID(identifier, streamConn, payload); err != nil { + return fmt.Errorf("handling WantConfigId: %w", err) + } + configured = true + case *pb.ToRadio_Packet: + var reply *pb.FromRadio_Packet + var sendAck bool = payload.Packet.WantAck + var errorCode pb.Routing_Error = pb.Routing_NONE + if decoded := payload.Packet.GetDecoded(); decoded != nil { + n.logger.Info(">> ToRadio_Packet", "payload", payload) + if payload.Packet.To == n.nodeID { + switch decoded.Portnum { + case pb.PortNum_ADMIN_APP: + admin := &pb.AdminMessage{} + if err := proto.Unmarshal(decoded.Payload, admin); err != nil { + return fmt.Errorf("unmarshalling admin: %w", err) + } + n.logger.Info(fmt.Sprintf(">> AdminMessage %T:", admin.PayloadVariant), "msg", admin.PayloadVariant) + + switch adminPayload := admin.PayloadVariant.(type) { + case *pb.AdminMessage_GetChannelRequest: + n.logger.Info(">> AdminMessage.GetChannelRequest", "adminPayload", adminPayload, "packet", payload) + chIdx := adminPayload.GetChannelRequest - 1 + var c *pb.ChannelSettings + role := pb.Channel_DISABLED + if chIdx > uint32(len(n.cfg.Channels.Settings)) { + c = &pb.ChannelSettings{} + } else { + c = n.cfg.Channels.Settings[chIdx] + role = pb.Channel_SECONDARY + if chIdx == 0 { + role = pb.Channel_PRIMARY + } + } + resp := &pb.AdminMessage{ + PayloadVariant: &pb.AdminMessage_GetChannelResponse{ + GetChannelResponse: &pb.Channel{ + Index: int32(chIdx), + Settings: c, + Role: role, + }, + }, + } + respBytes, err := proto.Marshal(resp) + if err != nil { + return fmt.Errorf("marshalling GetChannelResponse: %w", err) + } + // Send GetChannelResponse + reply = &pb.FromRadio_Packet{ + Packet: &pb.MeshPacket{ + Id: n.nextPacketID(), + From: n.nodeID, + To: n.nodeID, + PayloadVariant: &pb.MeshPacket_Decoded{ + Decoded: &pb.Data{ + Portnum: pb.PortNum_ADMIN_APP, + Payload: respBytes, + RequestId: payload.Packet.Id, + }, + }, + }, + } + case *pb.AdminMessage_SetFavoriteNode: + n.NodeInfo(adminPayload.SetFavoriteNode, func(ni *pb.NodeInfo) bool { + ni.IsFavorite = true + return true + }) + case *pb.AdminMessage_RemoveFavoriteNode: + n.NodeInfo(adminPayload.RemoveFavoriteNode, func(ni *pb.NodeInfo) bool { + ni.IsFavorite = false + return true + }) + case *pb.AdminMessage_RemoveByNodenum: + n.NodeInfo(adminPayload.RemoveByNodenum, func(ni *pb.NodeInfo) bool { + ni.Num = 0 // delete NodeInfo + return true + }) + case *pb.AdminMessage_SetTimeOnly: + default: + errorCode = pb.Routing_BAD_REQUEST + n.logger.Warnf(">> !Unhandled AdminMessage of type '%T'", adminPayload) + } + default: + n.logger.Warnf(">> No handling of ToRadio.Packet to us w/ portNum '%s'", decoded.Portnum.String()) + } + } else { + defaultWarning := true + switch decoded.Portnum { + case pb.PortNum_AUDIO_APP, pb.PortNum_NODEINFO_APP, pb.PortNum_TEXT_MESSAGE_APP, pb.PortNum_POSITION_APP, pb.PortNum_TRACEROUTE_APP: + defaultWarning = false + fallthrough + default: + if defaultWarning { + n.logger.Warnf(">> Default handling of ToRadio.Packet message w/ portNum '%s'", decoded.Portnum.String()) + } + from := payload.Packet.From + if from == 0 { + from = n.nodeID + } + pkt := proto.Clone(payload.Packet).(*pb.MeshPacket) + pkt.From = from + decoded := pkt.GetDecoded() + if decoded.Bitfield != nil { + *decoded.Bitfield |= (1 << 0) // ok to mqtt + } else { + bitfield := uint32(1) + decoded.Bitfield = &bitfield // ok to mqtt + } + if err := n.txPacket(pkt); err != nil { + return fmt.Errorf("failed to tx packet: %w", err) + } + } + } + } + if errorCode != pb.Routing_NONE || sendAck { + if err := streamConn.Write(&pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: buildRoutingReply(payload.Packet, errorCode)}}); err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + if reply != nil { + if err := streamConn.Write(&pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: reply.Packet}}); err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + case *pb.ToRadio_Heartbeat: + default: + n.logger.Warnf(">> !Unhandled ToRadio message of type '%T'", payload) + } + } +} + +func (n *Node) listenTCP(ctx context.Context) error { + l, err := net.Listen("tcp", n.cfg.TCPListenAddr) + if err != nil { + return fmt.Errorf("listening: %w", err) + } + defer l.Close() + n.logger.Info("** Listening for tcp connections", "addr", n.cfg.TCPListenAddr) + + go func() { + <-ctx.Done() + l.Close() + }() + for { + c, err := l.Accept() + if err != nil { + n.logger.Error("!! Failed to accept connection", "err", err) + return err + } + go func(c net.Conn) { + defer c.Close() + connctx, cancel := context.WithCancel(ctx) + defer cancel() + tcpconn := c.(*net.TCPConn) + tcpconn.SetKeepAlive(true) + tcpconn.SetKeepAlivePeriod(25 * time.Second) + address := tcpconn.RemoteAddr().String() + ipString := strings.Split(address, ":")[0] + n.logger.Info("** Accepted connection", "addr", address) + if err := n.handleConn(connctx, c, ipString); err != nil { + if errors.Is(err, io.EOF) { + n.logger.Info("** Connection EOF", "addr", address) + } else { + n.logger.Error("!! Connection error", "err", err) + } + } else { + n.logger.Info("** Connection ended", "addr", address) + } + }(c) + } +} diff --git a/noderxloop.go b/noderxloop.go new file mode 100644 index 0000000..f4ed602 --- /dev/null +++ b/noderxloop.go @@ -0,0 +1,280 @@ +package main + +import ( + "bytes" + "context" + "encoding/hex" + "fmt" + + "github.com/charmbracelet/log" + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "google.golang.org/protobuf/proto" +) + +func (n *Node) RxLoop(ctx context.Context) error { + for { + if err := ctx.Err(); err != nil { + return err + } + p, ts, err := n.mesh.ReadMeshPacket() + if err != nil { + n.logger.Error("Read mesh packet failed", "err", err) + return err + } + if err := n.rxMessage(ts, p); err != nil { + n.logger.Error("Processing Radio Message", "error", err) + } + } +} + +func (n *Node) rxMessage(ts uint64, p *pb.MeshPacket) error { + pt := PacketTruncated{To: p.To, From: p.From, ID: p.Id} + info := fmt.Sprintf("ts:%d rssi:%-4d snr:%-5.1f", p.RxTime, p.RxRssi, p.RxSnr) + header := fmt.Sprintf("from:%08x to:%08x id:%08x ch:%02x next:%02x last:%02x hs:%d hl:%d mq:%d wack:%d", p.From, p.To, p.Id, uint8(p.Channel), p.NextHop, p.RelayNode, p.HopStart, p.HopLimit, bool2Int(p.ViaMqtt), bool2Int(p.WantAck)) + payloadstr := fmt.Sprintf("%x", p.GetEncrypted()) + var dupe bool + if _, dupe = n.pktLru.Get(pt); dupe { + n.logger.Debug(") MeshPacket", "info", info, "header", header, "payload", payloadstr) + } else { + n.logger.Debug(")) MeshPacket", "info", info, "header", header, "payload", payloadstr) + n.pktLru.Add(pt, struct{}{}) + } + if dupe { // XXX: it would need to be changed for the traceroute + return nil // skip duplicates + } + if p.From == n.nodeID { + return nil // ourself + } + if len(p.GetEncrypted()) == 0 { + n.logger.Warn(":: Skipping unencrypted or empty packet") + return nil // we skip unencrypted packets + } + var plaintext []byte + var pubKey []byte + var channel *pb.ChannelSettings + if p.To == n.nodeID && p.Channel == 0x00 { + var err error + if plaintext, pubKey, err = n.tryDecryptPKC(p); err != nil { + log.Printf("failed to decrypt PKC message: %s", err) + } + } + if plaintext != nil { + channel = pkcChan + p.Channel = 0 + } else { + plaintext, channel = n.tryDecryptPSK(p) + if plaintext == nil { + return nil // failed decrypt + } + p.Channel = channel.ChannelNum + } + data := &pb.Data{} + if err := proto.Unmarshal(plaintext, data); err != nil { + n.logger.Debug(")) MeshPacket.Data", "channel", channel.Name, "mqtt", p.ViaMqtt, "raw", hex.EncodeToString(plaintext)) + return fmt.Errorf("unmarshalling decrypted data: %w", err) + } + + p.PublicKey = pubKey + p.PkiEncrypted = channel == pkcChan + p.PayloadVariant = &pb.MeshPacket_Decoded{Decoded: data} + msgFromRadio := &pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: p}} + source := data.Source + if source == 0 { + source = p.From + } + dest := data.Dest + if dest == 0 { + dest = p.To + } + toUs := dest == n.nodeID + if dest == broadcastID || toUs { + n.dispatchMessageToAdmin(msgFromRadio) + } + n.db.StoreFromRadio(int64(ts), msgFromRadio) + n.logger.Debug(")) MeshPacket.Data", "channel", channel.Name, "mqtt", p.ViaMqtt, "raw", hex.EncodeToString(plaintext), "data", data) + + src := n.NodeInfo(source, func(ni *pb.NodeInfo) bool { + if ni.Num == n.nodeID { + return false // it's us + } + if p.HopLimit > p.HopStart { + return false // weird hoplimit value + } + hopsAway := uint32(p.HopStart - p.HopLimit) + if ni.HopsAway != nil && *ni.HopsAway <= hopsAway { + return false // skip update if these hops away are longer + } + ni.HopsAway = &hopsAway + return true + }) + dst := n.NodeInfo(dest, nil) + + var payload any + var reply proto.Message + replyPortNum := data.Portnum + switch data.Portnum { + case pb.PortNum_ADMIN_APP: + admin := &pb.AdminMessage{} + if err := proto.Unmarshal(data.Payload, admin); err != nil { + return fmt.Errorf("unmarshalling admin message: %w", err) + } + payload = admin + switch adminPayload := admin.PayloadVariant.(type) { + case *pb.AdminMessage_GetDeviceMetadataRequest: + if data.WantResponse && toUs { + reply = &pb.AdminMessage{ + PayloadVariant: &pb.AdminMessage_GetDeviceMetadataResponse{ + GetDeviceMetadataResponse: n.getDeviceMetadata(), + }, + } + } + default: + n.logger.Warnf("unhandled admin message type: %T", adminPayload) + } + case pb.PortNum_NODEINFO_APP: + user := &pb.User{} + if err := proto.Unmarshal(data.Payload, user); err != nil { + return fmt.Errorf("unmarshalling user: %w", err) + } + payload = user + n.logger.Info(")) NodeInfo", "user", user) + n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool { + if nodeInfo.User != nil { + if len(nodeInfo.User.PublicKey) > 0 && !bytes.Equal(nodeInfo.User.PublicKey, user.PublicKey) { + n.logger.Error("NodeInfo w/ different key", "nodeInfo", nodeInfo, "new", user) + return false // do not update + } + } + nodeInfo.User = user + return true + }) + if data.WantResponse && toUs { + reply = n.user + } + case pb.PortNum_TEXT_MESSAGE_APP: + n.logger.Info(")) TextMessage", "channel", channel.Name, "source", meshtastic.NodeID(source).String(), "text", string(data.Payload)) + case pb.PortNum_ROUTING_APP: + routing := &pb.Routing{} + if err := proto.Unmarshal(data.Payload, routing); err != nil { + return fmt.Errorf("unmarshalling routingPayload: %w", err) + } + payload = routing + n.logger.Info(")) Routing", "routing", routing) + case pb.PortNum_TRACEROUTE_APP: + routeDiscovery := &pb.RouteDiscovery{} + if err := proto.Unmarshal(data.Payload, routeDiscovery); err != nil { + return fmt.Errorf("unmarshalling traceroute payload: %w", err) + } + payload = routeDiscovery + n.logger.Info(")) RouteDiscovery", "channel", channel.Name, "source", meshtastic.NodeID(source).String(), "payload", routeDiscovery) + if !toUs { + break + } + towardsDestination := data.ReplyId == 0 + var route *[]uint32 + var snr *[]int32 + if towardsDestination { + route = &routeDiscovery.Route + snr = &routeDiscovery.SnrTowards + } else { + route = &routeDiscovery.RouteBack + snr = &routeDiscovery.SnrBack + } + // Only insert unknown hops if hop_start is valid + if p.HopStart != 0 && p.HopLimit <= p.HopStart { + var hopsTaken uint8 = uint8(p.HopStart - p.HopLimit) + diff := int(hopsTaken) - len(*route) + for i := 0; i < diff; i++ { + if len(*route) < ROUTE_MAX_SIZE { + *route = append(*route, broadcastID) + } + } + // Add unknown SNR values if necessary + diff = len(*route) - len(*snr) + for i := 0; i < diff; i++ { + if len(*snr) < ROUTE_MAX_SIZE { + *snr = append(*snr, 0xFF) + } + } + } + + if len(*snr) < ROUTE_MAX_SIZE { + *snr = append(*snr, int32(p.RxSnr*4)) + } + // We don't need to add route, if we are the dest + // // Length of route array can normally not be exceeded due to the max. hop_limit of 7 + // if *route_count < ROUTE_SIZE { + // route[*route_count] = myNodeInfo.my_node_num + // *route_count += 1 + // } else { + // LOG_WARN("Route exceeded maximum hop limit!") // Are you bridging networks? + // } + reply = routeDiscovery + case pb.PortNum_POSITION_APP: + position := &pb.Position{} + if err := proto.Unmarshal(data.Payload, position); err != nil { + return fmt.Errorf("unmarshalling positionPayload: %w", err) + } + payload = position + n.logger.Info(")) Position", "position", position) + if data.WantResponse && toUs { + reply = n.getPosition() + } else { + n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool { + nodeInfo.Position = position + return true + }) + } + case pb.PortNum_TELEMETRY_APP: + telemetry := &pb.Telemetry{} + if err := proto.Unmarshal(data.Payload, telemetry); err != nil { + return fmt.Errorf("unmarshalling telemetryPayload: %w", err) + } + payload = telemetry + n.logger.Info(")) Telemetry deviceMetrics", "telemetry", telemetry) + if data.WantResponse && toUs { + metrics, err := n.getDeviceMetrics() + if err != nil { + break + } + reply = metrics + } else { + deviceMetrics := telemetry.GetDeviceMetrics() + if deviceMetrics == nil { + break + } + n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool { + nodeInfo.DeviceMetrics = deviceMetrics + return true + }) + } + default: + n.logger.Debug(")) !Unhandled app payload", "data", data) + } + n.dl.LogPacket(p, channel, src, dst, data, payload) + if toUs && p.WantAck { + err := n.txPacket(buildRoutingReply(&pb.MeshPacket{ + From: p.From, + To: p.To, + HopStart: uint32(p.HopStart), + Channel: channel.ChannelNum, + Id: p.Id, + PkiEncrypted: msgFromRadio.GetPacket().PkiEncrypted, + PublicKey: msgFromRadio.GetPacket().PublicKey, + }, pb.Routing_NONE)) + if err != nil { + n.logger.Warn("failed to send text ack", "err", err) + } + } + if reply != nil { + err := n.txPortnumMessage(channel.ChannelNum, source, replyPortNum, reply) + if err != nil { + n.logger.Error("Failed to reply", "err", err, "reply", reply) + } else { + n.dl.LogPacket(p, channel, dst, src, &pb.Data{Portnum: replyPortNum}, payload) + } + } + + return nil +} diff --git a/timers.go b/timers.go new file mode 100644 index 0000000..095e42c --- /dev/null +++ b/timers.go @@ -0,0 +1,23 @@ +package main + +import ( + "context" + "time" +) + +func FuncTicker(interval time.Duration, ctx context.Context, fn func(context.Context) error) func() error { + return func() error { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + } + if err := fn(ctx); err != nil { + return err + } + } + } +} |
