From 220671ed46e10e4c67741286da493df1d45cdcfc Mon Sep 17 00:00:00 2001 From: Marin Ivanov Date: Wed, 3 Sep 2025 20:59:24 +0300 Subject: Initial release --- .gitignore | 3 + config.go | 53 +++++ config.json.example | 43 ++++ data/.gitkeep | 0 database.go | 187 ++++++++++++++++ datalogger.go | 96 +++++++++ go.mod | 63 ++++++ go.sum | 160 ++++++++++++++ helpers.go | 5 + kava.go | 168 +++++++++++++++ logs/.gitkeep | 0 lru.go | 24 +++ main.go | 133 ++++++++++++ mestastic.go | 90 ++++++++ mqtt.go | 205 ++++++++++++++++++ node.go | 598 ++++++++++++++++++++++++++++++++++++++++++++++++++++ nodeadmin.go | 392 ++++++++++++++++++++++++++++++++++ noderxloop.go | 280 ++++++++++++++++++++++++ timers.go | 23 ++ 19 files changed, 2523 insertions(+) create mode 100644 .gitignore create mode 100644 config.go create mode 100644 config.json.example create mode 100644 data/.gitkeep create mode 100644 database.go create mode 100644 datalogger.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 helpers.go create mode 100644 kava.go create mode 100644 logs/.gitkeep create mode 100644 lru.go create mode 100644 main.go create mode 100644 mestastic.go create mode 100644 mqtt.go create mode 100644 node.go create mode 100644 nodeadmin.go create mode 100644 noderxloop.go create mode 100644 timers.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2fdfb0b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/data +/config.json +/logs \ No newline at end of file diff --git a/config.go b/config.go new file mode 100644 index 0000000..4698998 --- /dev/null +++ b/config.go @@ -0,0 +1,53 @@ +package main + +import ( + "encoding/json" + "os" +) + +type Config struct { + NodeID []byte `json:"nodeID"` + Name string `json:"name"` + ShortName string `json:"shortName"` + DefaultHops uint32 `json:"defaultHops"` + MaxHops uint32 `json:"maxHops"` + NodeBroadcastInterval float64 `json:"nodeBroadcastInterval"` + X25519Key []byte `json:"x25519Key"` + Channels []struct { + Name string `json:"name"` + PSK []byte `json:"psk"` + } `json:"channels"` + Position struct { + BroadcastInterval float64 `json:"broadcastInterval"` + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Altitude float64 `json:"altitude"` + } `json:"position"` + DeviceMetrics struct { + BroadcastInterval float64 `json:"broadcastInterval"` + UPSAddress string `json:"upsAddress"` + } `json:"deviceMetrics"` + KavaModem struct { + Address string `json:"address"` + } `json:"kavaModem"` + MqttIf struct { + Address string `json:"address"` + Username string `json:"username"` + Password string `json:"password"` + ClientID string `json:"clientId"` + Root string `json:"root"` + } `json:"mqttIf"` +} + +func ReadConfig(filename string) (*Config, error) { + cfg := Config{} + f, err := os.Open(filename) + if err != nil { + return nil, err + } + dec := json.NewDecoder(f) + if err := dec.Decode(&cfg); err != nil { + return nil, err + } + return &cfg, nil +} diff --git a/config.json.example b/config.json.example new file mode 100644 index 0000000..14ab023 --- /dev/null +++ b/config.json.example @@ -0,0 +1,43 @@ +{ + "nodeID": "7rpREQ==", + "name": "oti sho", + "shortName": "oti", + "defaultHops": 1, + "maxHops": 1, + "nodeBroadcastInterval": 1200, + "x25519Key": "", + "position": { + "broadcastInterval": 86400, + "latitude": 51.5014760, + "longitude": -0.1406340, + "altitude": 2.0 + }, + "deviceMetrics": { + "broadcastInterval": 300, + "upsAddress": "upsname@localhost" + }, + "channels": [ + { + "name": "ShortFast", + "psk": "1PG7OiApB1nwvP+rz05pAQ==" + }, + { + "name": "Blagoevgrad", + "psk": "1PG7OiApB1nwvP+rz05pAQ==" + }, + { + "name": "Bulgaria", + "psk": "1PG7OiApB1nwvP+rz05pAQ==" + } + ], + "kavaModem": { + "address": "127.0.0.1:3333" + }, + "mqttIf": { + "address": "mqtt://127.0.0.1:1883", + "username": "meshdev", + "password": "large4cats", + "clientId": "motisho", + "root": "msh/Bulgaria" + } +} diff --git a/data/.gitkeep b/data/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/database.go b/database.go new file mode 100644 index 0000000..dc410d8 --- /dev/null +++ b/database.go @@ -0,0 +1,187 @@ +package main + +import ( + "encoding/binary" + "errors" + "fmt" + "time" + + "github.com/cockroachdb/pebble/v2" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "google.golang.org/protobuf/proto" +) + +type BucketEnum uint16 + +const ( + BucketState BucketEnum = iota + BucketNodeInfo + BucketFromRadio +) + +type StateKeyEnum uint32 + +const ( + StateKeyNull StateKeyEnum = iota + StateKeyPhoneLastUpdate +) + +var ( + errNotExists = errors.New("not exist") +) + +type Database struct { + p *pebble.DB +} + +func NewDB(dbPath string) (*Database, error) { + p, err := pebble.Open(dbPath, &pebble.Options{}) + if err != nil { + return nil, fmt.Errorf("open database: %w", err) + } + return &Database{ + p: p, + }, nil +} + +func (db *Database) Close() error { + return db.p.Close() +} + +func (db *Database) keyUint32(bucket BucketEnum, k uint32) []byte { + var key [6]byte + binary.BigEndian.PutUint16(key[0:2], uint16(bucket)) + binary.BigEndian.PutUint32(key[2:6], k) + return key[:] +} + +func (db *Database) keyUint64(bucket BucketEnum, k uint64) []byte { + var key [10]byte + binary.BigEndian.PutUint16(key[0:2], uint16(bucket)) + binary.BigEndian.PutUint64(key[2:10], k) + return key[:] +} + +func (db *Database) Get(key []byte) ([]byte, error) { + data, rd, err := db.p.Get(key) + if err == pebble.ErrNotFound { + return nil, errNotExists + } else if err != nil { + return nil, err + } + defer rd.Close() + return data, nil +} + +func (db *Database) Set(key []byte, value []byte) error { + return db.p.Set(key, value, pebble.Sync) +} + +func (db *Database) GetState(key StateKeyEnum) ([]byte, error) { + data, rd, err := db.p.Get(db.keyUint32(BucketState, uint32(key))) + if err == pebble.ErrNotFound { + return nil, errNotExists + } else if err != nil { + return nil, err + } + defer rd.Close() + return data, nil +} + +func (db *Database) SetState(key StateKeyEnum, value []byte) error { + return db.p.Set(db.keyUint32(BucketState, uint32(key)), value, pebble.Sync) +} + +func (db *Database) SetNodeInfo(num uint32, ni *pb.NodeInfo) error { + key := db.keyUint32(BucketNodeInfo, num) + if num != 0 && ni.Num == 0 { + return db.p.Delete(key, pebble.Sync) + } + ni.Num = num + data, err := proto.Marshal(ni) + if err != nil { + return err + } + return db.p.Set(key, data, pebble.Sync) +} + +func (db *Database) GetNodeInfo(num uint32) (*pb.NodeInfo, error) { + data, rd, err := db.p.Get(db.keyUint32(BucketNodeInfo, num)) + if err == pebble.ErrNotFound { + return nil, errNotExists + } else if err != nil { + return nil, err + } + defer rd.Close() + var ni pb.NodeInfo + if err = proto.Unmarshal(data, &ni); err != nil { + return nil, err + } + return &ni, nil +} + +func (db *Database) ListNodeInfo() ([]*pb.NodeInfo, error) { + it, err := db.p.NewIter(&pebble.IterOptions{ + LowerBound: db.keyUint32(BucketNodeInfo, uint32(0)), + UpperBound: db.keyUint32(BucketNodeInfo+1, uint32(0)), + }) + if err != nil { + return nil, err + } + defer it.Close() + var list []*pb.NodeInfo + for it.First(); it.Valid(); it.Next() { + key := it.Key() + data := it.Value() + ni := &pb.NodeInfo{} + if err = proto.Unmarshal(data, ni); err != nil { + return nil, err + } + ni.Num = binary.BigEndian.Uint32(key[2:6]) + list = append(list, ni) + } + return list, nil +} + +func (db *Database) StoreFromRadio(tsMicro int64, msg *pb.FromRadio) error { + b, err := proto.Marshal(msg) + if err != nil { + return err + } + return db.p.Set(db.keyUint64(BucketFromRadio, uint64(tsMicro)), b, pebble.Sync) +} + +func (db *Database) ReadFromRadio(start, end time.Time) (<-chan *pb.FromRadio, <-chan error) { + c := make(chan *pb.FromRadio) + errc := make(chan error, 1) + lowerLimit := uint64(start.UnixMicro()) + upperLimit := uint64(end.UnixMicro()) + go func() { + defer close(c) + defer close(errc) + it, err := db.p.NewIter(&pebble.IterOptions{ + LowerBound: db.keyUint64(BucketFromRadio, lowerLimit), + UpperBound: db.keyUint64(BucketFromRadio+1, upperLimit), + }) + if err != nil { + errc <- err + return + } + defer it.Close() + for it.First(); it.Valid(); it.Next() { + key := it.Key() + data := it.Value() + tsMicro := binary.BigEndian.Uint64(key[2:]) + if tsMicro > upperLimit { + continue + } + msg := &pb.FromRadio{} + if err = proto.Unmarshal(data, msg); err != nil { + errc <- err + return + } + c <- msg + } + }() + return c, errc +} diff --git a/datalogger.go b/datalogger.go new file mode 100644 index 0000000..6cd5106 --- /dev/null +++ b/datalogger.go @@ -0,0 +1,96 @@ +package main + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" +) + +type DataLogger struct { + data *os.File + text *os.File +} + +func NewDataLogger(logsDir string) (*DataLogger, error) { + var err error + var d DataLogger + d.data, err = os.OpenFile(filepath.Join(logsDir, "data.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + d.Close() + return nil, err + } + d.text, err = os.OpenFile(filepath.Join(logsDir, "text.log"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + d.Close() + return nil, err + } + + return &d, nil +} + +func (d *DataLogger) Close() error { + if d.data != nil { + d.data.Close() + } + if d.text != nil { + d.text.Close() + } + return nil +} + +func formatNodeInfo(ni *pb.NodeInfo) string { + var str string + if ni != nil { + if ni.User != nil && ni.User.Id != "" { + str = fmt.Sprintf("%s (%s) %s", ni.User.Id, ni.User.ShortName, ni.User.LongName) + } else { + str = meshtastic.NodeID(ni.Num).String() + } + } else { + str = "???" + } + return str +} + +func (d *DataLogger) LogPacket(p *pb.MeshPacket, c *pb.ChannelSettings, src *pb.NodeInfo, dst *pb.NodeInfo, data *pb.Data, payload any) error { + source := formatNodeInfo(src) + destination := formatNodeInfo(dst) + header := fmt.Sprintf("%s | #%d:%s '%s' >> '%s' [rssi:%d snr:%f hs:%d, hl:%d]", time.Unix(int64(p.RxTime), 0), c.ChannelNum, c.Name, source, destination, p.RxRssi, p.RxSnr, p.HopStart, p.HopLimit) + var s fmt.Stringer + var log string + var logfile *os.File = d.data + switch data.Portnum { + case pb.PortNum_NODEINFO_APP: + s, _ = payload.(*pb.User) + case pb.PortNum_TELEMETRY_APP: + s, _ = payload.(*pb.Telemetry) + case pb.PortNum_POSITION_APP: + s, _ = payload.(*pb.Position) + case pb.PortNum_ROUTING_APP: + s, _ = payload.(*pb.Routing) + case pb.PortNum_TRACEROUTE_APP: + s, _ = payload.(*pb.RouteDiscovery) + case pb.PortNum_AUDIO_APP: + log = fmt.Sprintf("Audio message (%dB, %.2fs@800bps)", len(data.Payload), (float32(len(data.Payload)) / 100.0)) + case pb.PortNum_TEXT_MESSAGE_APP: + log = string(data.Payload) + logfile = d.text + default: + log = p.String() + } + if log == "" && s != nil { + log = s.String() + } + if log != "" && logfile != nil { + output := fmt.Sprintf("%s : [%s] %T{ %s }\n", header, data.Portnum, payload, log) + if _, err := logfile.Write([]byte(output)); err != nil { + return err + } + return logfile.Sync() + } + return nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..c6d5b18 --- /dev/null +++ b/go.mod @@ -0,0 +1,63 @@ +module go.metala.org/gotishosdr + +go 1.24.1 + +require ( + github.com/charmbracelet/log v0.4.2 + github.com/cockroachdb/pebble/v2 v2.0.6 + github.com/eclipse/paho.golang v0.22.0 + github.com/elastic/go-freelru v0.16.0 + github.com/meshnet-gophers/meshtastic-go v0.1.7 + github.com/pion/dtls/v3 v3.0.6 + github.com/spf13/pflag v1.0.6 + golang.org/x/crypto v0.39.0 + golang.org/x/sync v0.15.0 + google.golang.org/protobuf v1.36.6 +) + +require ( + github.com/DataDog/zstd v1.5.7 // indirect + github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/charmbracelet/colorprofile v0.3.1 // indirect + github.com/charmbracelet/lipgloss v1.1.0 // indirect + github.com/charmbracelet/x/ansi v0.9.3 // indirect + github.com/charmbracelet/x/cellbuf v0.0.13 // indirect + github.com/charmbracelet/x/term v0.2.1 // indirect + github.com/cockroachdb/crlib v0.0.0-20250617202621-0794c595bbe6 // indirect + github.com/cockroachdb/errors v1.12.0 // indirect + github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 // indirect + github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506 // indirect + github.com/cockroachdb/redact v1.1.6 // indirect + github.com/cockroachdb/swiss v0.0.0-20250624142022-d6e517c1d961 // indirect + github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb // indirect + github.com/getsentry/sentry-go v0.34.0 // indirect + github.com/go-logfmt/logfmt v0.6.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/snappy v1.0.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/lucasb-eyer/go-colorful v1.2.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.16 // indirect + github.com/muesli/termenv v0.16.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/planetscale/vtprotobuf v0.6.0 // indirect + github.com/prometheus/client_golang v1.22.0 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.65.0 // indirect + github.com/prometheus/procfs v0.17.0 // indirect + github.com/rivo/uniseg v0.4.7 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect + golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect + golang.org/x/net v0.40.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.26.0 // indirect +) + +replace github.com/meshnet-gophers/meshtastic-go v0.1.7 => ../meshtastic-go diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d0c3070 --- /dev/null +++ b/go.sum @@ -0,0 +1,160 @@ +github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE= +github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo= +github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f/go.mod h1:tMDTce/yLLN/SK8gMOxQfnyeMeCg8KGzp0D1cbECEeo= +github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= +github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/charmbracelet/colorprofile v0.3.1 h1:k8dTHMd7fgw4bnFd7jXTLZrSU/CQrKnL3m+AxCzDz40= +github.com/charmbracelet/colorprofile v0.3.1/go.mod h1:/GkGusxNs8VB/RSOh3fu0TJmQ4ICMMPApIIVn0KszZ0= +github.com/charmbracelet/lipgloss v1.1.0 h1:vYXsiLHVkK7fp74RkV7b2kq9+zDLoEU4MZoFqR/noCY= +github.com/charmbracelet/lipgloss v1.1.0/go.mod h1:/6Q8FR2o+kj8rz4Dq0zQc3vYf7X+B0binUUBwA0aL30= +github.com/charmbracelet/log v0.4.2 h1:hYt8Qj6a8yLnvR+h7MwsJv/XvmBJXiueUcI3cIxsyig= +github.com/charmbracelet/log v0.4.2/go.mod h1:qifHGX/tc7eluv2R6pWIpyHDDrrb/AG71Pf2ysQu5nw= +github.com/charmbracelet/x/ansi v0.9.3 h1:BXt5DHS/MKF+LjuK4huWrC6NCvHtexww7dMayh6GXd0= +github.com/charmbracelet/x/ansi v0.9.3/go.mod h1:3RQDQ6lDnROptfpWuUVIUG64bD2g2BgntdxH0Ya5TeE= +github.com/charmbracelet/x/cellbuf v0.0.13 h1:/KBBKHuVRbq1lYx5BzEHBAFBP8VcQzJejZ/IA3iR28k= +github.com/charmbracelet/x/cellbuf v0.0.13/go.mod h1:xe0nKWGd3eJgtqZRaN9RjMtK7xUYchjzPr7q6kcvCCs= +github.com/charmbracelet/x/term v0.2.1 h1:AQeHeLZ1OqSXhrAWpYUtZyX1T3zVxfpZuEQMIQaGIAQ= +github.com/charmbracelet/x/term v0.2.1/go.mod h1:oQ4enTYFV7QN4m0i9mzHrViD7TQKvNEEkHUMCmsxdUg= +github.com/cockroachdb/crlib v0.0.0-20250617202621-0794c595bbe6 h1:PZVolkXzVqPQQZ7Jm8/lGpI9ZM086mB55oOWqy0wG6E= +github.com/cockroachdb/crlib v0.0.0-20250617202621-0794c595bbe6/go.mod h1:Gq51ZeKaFCXk6QwuGM0w1dnaOqc/F5zKT2zA9D6Xeac= +github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056 h1:slXychO2uDM6hYRu4c0pD0udNI8uObfeKN6UInWViS8= +github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= +github.com/cockroachdb/errors v1.12.0 h1:d7oCs6vuIMUQRVbi6jWWWEJZahLCfJpnJSVobd1/sUo= +github.com/cockroachdb/errors v1.12.0/go.mod h1:SvzfYNNBshAVbZ8wzNc/UPK3w1vf0dKDUP41ucAIf7g= +github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 h1:pU88SPhIFid6/k0egdR5V6eALQYq2qbSmukrkgIh/0A= +github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= +github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506 h1:ASDL+UJcILMqgNeV5jiqR4j+sTuvQNHdf2chuKj1M5k= +github.com/cockroachdb/logtags v0.0.0-20241215232642-bb51bb14a506/go.mod h1:Mw7HqKr2kdtu6aYGn3tPmAftiP3QPX63LdK/zcariIo= +github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA= +github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA= +github.com/cockroachdb/pebble/v2 v2.0.6 h1:eL54kX2AKp1ePJ/8vq4IO3xIEPpvVjlSP12dlLYilyE= +github.com/cockroachdb/pebble/v2 v2.0.6/go.mod h1:un1DXG73PKw3F7Ndd30YactyvsFviI9Fuhe0tENdnyA= +github.com/cockroachdb/redact v1.1.6 h1:zXJBwDZ84xJNlHl1rMyCojqyIxv+7YUpQiJLQ7n4314= +github.com/cockroachdb/redact v1.1.6/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/swiss v0.0.0-20250624142022-d6e517c1d961 h1:Nua446ru3juLHLZd4AwKNzClZgL1co3pUPGv3o8FlcA= +github.com/cockroachdb/swiss v0.0.0-20250624142022-d6e517c1d961/go.mod h1:yBRu/cnL4ks9bgy4vAASdjIW+/xMlFwuHKqtmh3GZQg= +github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb h1:3bCgBvB8PbJVMX1ouCcSIxvsqKPYM7gs72o0zC76n9g= +github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.golang v0.22.0 h1:JhhUngr8TBlyUZDZw/L6WVayPi9qmSmdWeki48i5AVE= +github.com/eclipse/paho.golang v0.22.0/go.mod h1:9ZiYJ93iEfGRJri8tErNeStPKLXIGBHiqbHV74t5pqI= +github.com/elastic/go-freelru v0.16.0 h1:gG2HJ1WXN2tNl5/p40JS/l59HjvjRhjyAa+oFTRArYs= +github.com/elastic/go-freelru v0.16.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I= +github.com/getsentry/sentry-go v0.34.0 h1:1FCHBVp8TfSc8L10zqSwXUZNiOSF+10qw4czjarTiY4= +github.com/getsentry/sentry-go v0.34.0/go.mod h1:C55omcY9ChRQIUcVcGcs+Zdy4ZpQGvNJ7JYHIoSWOtE= +github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9 h1:r5GgOLGbza2wVHRzK7aAj6lWZjfbAwiu/RDCVOKjRyM= +github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9/go.mod h1:106OIgooyS7OzLDOpUGgm9fA3bQENb/cFSyyBmMoJDs= +github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= +github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= +github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= +github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/muesli/termenv v0.16.0 h1:S5AlUN9dENB57rsbnkPyfdGuWIlkmzJjbFf0Tf5FWUc= +github.com/muesli/termenv v0.16.0/go.mod h1:ZRfOIKPFDYQoDFF4Olj7/QJbW60Ol/kL1pU3VfY/Cnk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= +github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pion/dtls/v3 v3.0.6 h1:7Hkd8WhAJNbRgq9RgdNh1aaWlZlGpYTzdqjy9x9sK2E= +github.com/pion/dtls/v3 v3.0.6/go.mod h1:iJxNQ3Uhn1NZWOMWlLxEEHAN5yX7GyPvvKw04v9bzYU= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/planetscale/vtprotobuf v0.6.0 h1:nBeETjudeJ5ZgBHUz1fVHvbqUKnYOXNhsIEabROxmNA= +github.com/planetscale/vtprotobuf v0.6.0/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= +github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2VzE= +github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= +github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0= +github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= +golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= +golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= +golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= +golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= +golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..882ddec --- /dev/null +++ b/helpers.go @@ -0,0 +1,5 @@ +package main + +func Ptr[T any](v T) *T { + return &v +} diff --git a/kava.go b/kava.go new file mode 100644 index 0000000..7c2ea65 --- /dev/null +++ b/kava.go @@ -0,0 +1,168 @@ +package main + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "net" + "time" + + "github.com/charmbracelet/log" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "github.com/meshnet-gophers/meshtastic-go/radio" +) + +var ( + errInvalidOpcode = errors.New("invalid opcode") + errInvalidLength = errors.New("invalid length") +) + +// Implements MeshIf +type KavaIf struct { + Address string + KeepAlivePeriod time.Duration + Channels *pb.ChannelSet + + logger *log.Logger + conn net.Conn + modemTimeOffset uint64 +} + +func NewKavaConn(address string, keepAlive time.Duration, channels *pb.ChannelSet) *KavaIf { + logger := log.WithPrefix("Kava") + return &KavaIf{ + Address: address, + KeepAlivePeriod: keepAlive, + Channels: channels, + logger: logger, + } +} + +func (k *KavaIf) Open() error { + conn, err := net.Dial("tcp", k.Address) + if err != nil { + return fmt.Errorf("connecting to modem: %w", err) + } + if k.KeepAlivePeriod > 0 { + tcpconn := conn.(*net.TCPConn) + tcpconn.SetKeepAlive(true) + tcpconn.SetKeepAlivePeriod(k.KeepAlivePeriod) + } + + k.conn = conn + return nil +} + +func (k *KavaIf) Close() error { + return k.conn.Close() +} + +func (k *KavaIf) ReadMeshPacket() (*pb.MeshPacket, uint64, error) { + var header [4]byte + var err error + for { + if _, err = io.ReadFull(k.conn, header[0:4]); err != nil { + return nil, 0, err + } + opcode := binary.LittleEndian.Uint16(header[0:2]) + length := binary.LittleEndian.Uint16(header[2:4]) + if length <= 0 { + continue + } else if length > 255+12 { + return nil, 0, errInvalidLength + } + switch opcode { + case 0x1007: + var ts [8]byte + if _, err = io.ReadFull(k.conn, ts[:]); err != nil { + return nil, 0, err + } + uptime := binary.LittleEndian.Uint64(ts[:]) + k.modemTimeOffset = uint64(time.Now().UnixMicro()) - uptime + k.logger.Info("}} Modem uptime", "µs", uptime, "Δ-to-now", fmt.Sprintf("%+d", k.modemTimeOffset)) + continue + case 0x0001: + var message [272]byte + if _, err := io.ReadFull(k.conn, message[:length]); err != nil { + return nil, 0, err + } + if length < (12 /*kava*/ + 16 /*m8c*/) { + return nil, 0, errInvalidLength + } + + modemTs := binary.LittleEndian.Uint64(message[0:8]) + rssi := int16(binary.LittleEndian.Uint16(message[8:10])) + snr := int16(binary.LittleEndian.Uint16(message[10:12])) + ts := int64(modemTs + k.modemTimeOffset) + timestamp := time.UnixMicro(ts) + data := message[12:length] + flags := RadioHeaderFlag(data[12]) + channelHash := data[13] + payload := data[16:] + p := pb.MeshPacket{ + To: binary.LittleEndian.Uint32(data[0:4]), + From: binary.LittleEndian.Uint32(data[4:8]), + Id: binary.LittleEndian.Uint32(data[8:12]), + RxTime: uint32(timestamp.Unix()), + RxSnr: float32(snr), + RxRssi: int32(rssi), + HopLimit: uint32(flags.HopLimit()), + HopStart: uint32(flags.HopStart()), + WantAck: flags.WantACK() == 1, + ViaMqtt: flags.MQTT() == 1, + NextHop: uint32(data[14]), + RelayNode: uint32(data[15]), + Channel: uint32(channelHash), + PkiEncrypted: channelHash == 0x00, + PayloadVariant: &pb.MeshPacket_Encrypted{Encrypted: payload}, + } + return &p, uint64(ts), nil + default: + return nil, 0, errInvalidOpcode + } + + } +} + +func (k *KavaIf) WriteMeshPacket(p *pb.MeshPacket) error { + viaMQTT := uint32(bool2Int(p.ViaMqtt)) + wantAck := uint32(bool2Int(p.WantAck)) + flags := (p.HopStart << 5) | (viaMQTT << 4) | (wantAck << 3) | p.HopLimit + + var chash uint32 + if p.PkiEncrypted { + chash = 0x00 + } else { + channel := k.Channels.Settings[p.Channel] + chash, _ = radio.ChannelHash(channel.Name, channel.Psk) + } + + buf := bytes.NewBuffer(nil) + binary.Write(buf, binary.LittleEndian, uint32(p.To)) + binary.Write(buf, binary.LittleEndian, uint32(p.From)) + binary.Write(buf, binary.LittleEndian, uint32(p.Id)) + binary.Write(buf, binary.LittleEndian, uint8(flags)) + binary.Write(buf, binary.LittleEndian, uint8(chash)) + binary.Write(buf, binary.LittleEndian, uint8(p.NextHop)) + binary.Write(buf, binary.LittleEndian, uint8(p.RelayNode)) + buf.Write(p.GetEncrypted()) + b := buf.Bytes() + + modemCmd := append([]byte{1, 0, byte(len(b)), 0}, b...) + if _, err := k.conn.Write(modemCmd); err != nil { + return err + } + return nil +} + +func bool2Int(b bool) int { + var i int + if b { + i = 1 + } else { + i = 0 + } + return i +} diff --git a/logs/.gitkeep b/logs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/lru.go b/lru.go new file mode 100644 index 0000000..e0a386f --- /dev/null +++ b/lru.go @@ -0,0 +1,24 @@ +package main + +import "encoding/binary" + +const ( + // FNV-1a + offset32 = uint32(2166136261) + prime32 = uint32(16777619) +) + +func hashPacket(p PacketTruncated) uint32 { + h := offset32 + b := make([]byte, 12) + binary.BigEndian.PutUint32(b[0:4], p.To) + binary.BigEndian.PutUint32(b[4:8], p.From) + binary.BigEndian.PutUint32(b[8:12], p.ID) + for ; len(b) >= 4; b = b[4:] { + h = (h ^ uint32(b[0])) * prime32 + h = (h ^ uint32(b[1])) * prime32 + h = (h ^ uint32(b[2])) * prime32 + h = (h ^ uint32(b[3])) * prime32 + } + return h +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..710f195 --- /dev/null +++ b/main.go @@ -0,0 +1,133 @@ +package main + +import ( + "context" + "encoding/base64" + "encoding/binary" + "fmt" + "os" + "os/signal" + "time" + + "github.com/charmbracelet/log" + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + flag "github.com/spf13/pflag" + "golang.org/x/crypto/curve25519" +) + +func main() { + var args struct { + DbDir string + LogsDir string + ConfigPath string + Interface string + Relentless bool + } + flag.StringVarP(&args.DbDir, "dbdir", "", "./data/", "The path to the database directory") + flag.StringVarP(&args.LogsDir, "logsdir", "", "./logs/", "The path to the logs directory") + flag.StringVarP(&args.ConfigPath, "config", "c", "./config.json", "The path to the config.json") + flag.StringVarP(&args.Interface, "interface", "i", "kava", "Mesh interface (kava, mqtt)") + flag.BoolVarP(&args.Relentless, "relentless", "r", false, "Relentless reconnect to modem") + flag.Parse() + + log.SetLevel(log.DebugLevel) + cfg, err := ReadConfig(args.ConfigPath) + if err != nil { + log.Fatal(err) + } + pbChannels := make([]*pb.ChannelSettings, len(cfg.Channels)) + for i, channel := range cfg.Channels { + var pbChannelSettings = pb.ChannelSettings{ + Name: channel.Name, + Psk: channel.PSK, + } + pbChannels[i] = &pbChannelSettings + } + + var nodeID meshtastic.NodeID + if len(cfg.NodeID) == 4 { + nodeID = meshtastic.NodeID(binary.BigEndian.Uint32(cfg.NodeID)) + } else if len(cfg.NodeID) == 0 { + nodeID, err = meshtastic.RandomNodeID() + if err != nil { + log.Fatal(err) + } + } + log.Print("Config:") + log.Print(" Database", "directory", args.DbDir) + log.Print(" Logs", "directory", args.LogsDir) + log.Print(" Node", "id", nodeID, "short", cfg.ShortName, "name", cfg.Name) + nodeConfig := NodeConfig{ + DatabaseDir: args.DbDir, + LogsDir: args.LogsDir, + NodeID: nodeID, + LongName: cfg.Name, + ShortName: cfg.ShortName, + DefaultHops: cfg.DefaultHops, + MaxHops: cfg.MaxHops, + Channels: &pb.ChannelSet{Settings: pbChannels}, + BroadcastNodeInfoInterval: time.Duration(cfg.NodeBroadcastInterval * float64(time.Second)), + BroadcastPositionInterval: time.Duration(cfg.Position.BroadcastInterval * float64(time.Second)), + // Position paramters + PositionLatitudeI: int32(cfg.Position.Latitude * 1.e7), + PositionLongitudeI: int32(cfg.Position.Longitude * 1.e7), + PositionAltitude: int32(cfg.Position.Altitude), + // Device Metrics + DeviceMetricsBroadcastInterval: time.Duration(cfg.DeviceMetrics.BroadcastInterval * float64(time.Second)), + DeviceMetricsUPSAddress: cfg.DeviceMetrics.UPSAddress, + TCPListenAddr: "0.0.0.0:4403", + } + if len(cfg.X25519Key) == 32 { + nodeConfig.X25519SecretKey = cfg.X25519Key + nodeConfig.X25519PublicKey, err = curve25519.X25519(nodeConfig.X25519SecretKey, curve25519.Basepoint) + if err != nil { + log.Fatal(err) + } + log.Print(" X25519PublicKey", "base64", base64.StdEncoding.EncodeToString(nodeConfig.X25519PublicKey), "hex", fmt.Sprintf("%x", nodeConfig.X25519PublicKey)) + } else if len(cfg.X25519Key) != 0 { + log.Fatal("invalid x25519 key") + } + + if args.Relentless { + log.Warn("Starting Node in 'relentless' mode...") + } else { + log.Info("Starting Node...") + } + + n, err := NewNode(nodeConfig) + if err != nil { + log.Fatal(err) + } + defer n.Close() + ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt) + for { + var iface MeshIf + switch args.Interface { + case "kava": + iface = NewKavaConn(cfg.KavaModem.Address, 10*time.Second, n.cfg.Channels) + case "mqtt": + iface = NewMqttIf(cfg.MqttIf.Address, cfg.MqttIf.Username, cfg.MqttIf.Password, cfg.MqttIf.Root, cfg.MqttIf.ClientID, meshtastic.NodeID(nodeID), n.cfg.Channels) + default: + log.Fatal("invalid argument", "interface", args.Interface) + } + + if err := n.Run(ctx, iface); err != nil { + log.Error("!! Node.Run()", "err", err) + } + if err := ctx.Err(); err != nil { + log.Error("!! Caught interrupt!") + break + } + if !args.Relentless { + break + } + log.Warn("Reconnecting in 30 seconds...") + ticker := time.NewTicker(30 * time.Second) + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + } +} diff --git a/mestastic.go b/mestastic.go new file mode 100644 index 0000000..d76165b --- /dev/null +++ b/mestastic.go @@ -0,0 +1,90 @@ +package main + +import ( + "encoding/binary" + + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "google.golang.org/protobuf/proto" +) + +const ( + FlagViaMQTT = (1 << 4) + FlagWantACK = (1 << 3) +) + +var ( + broadcastID = meshtastic.BroadcastNodeID.Uint32() +) + +var pkcChan = &pb.ChannelSettings{ + ChannelNum: 0xFFFFFFFF, + Name: "[PKC]", + Psk: nil, +} + +type MeshIf interface { + Open() error + ReadMeshPacket() (*pb.MeshPacket, uint64, error) + WriteMeshPacket(*pb.MeshPacket) error + Close() error +} + +type RadioHeaderFlag uint8 + +func (r RadioHeaderFlag) WantACK() uint8 { + return uint8(r>>3) & 1 +} + +func (r RadioHeaderFlag) MQTT() uint8 { + return uint8(r>>4) & 1 +} + +func (r RadioHeaderFlag) HopStart() uint8 { + return uint8(r>>5) & 0b111 +} + +func (r RadioHeaderFlag) HopLimit() uint8 { + return uint8(r & 0b111) +} + +type PacketTruncated struct { + To uint32 + From uint32 + ID uint32 +} + +func buildRoutingReply(p *pb.MeshPacket, err pb.Routing_Error) *pb.MeshPacket { + cb, _ := proto.Marshal(&pb.Routing{ + Variant: &pb.Routing_ErrorReason{ErrorReason: err}, + }) + return &pb.MeshPacket{ + // Reversed + To: p.From, + From: p.To, + + PkiEncrypted: p.PkiEncrypted, + PublicKey: p.PublicKey, + + // (N)ACK + Priority: pb.MeshPacket_ACK, + HopLimit: p.HopStart, + Channel: p.Channel, + PayloadVariant: &pb.MeshPacket_Decoded{ + Decoded: &pb.Data{ + Portnum: pb.PortNum_ROUTING_APP, + RequestId: p.Id, + Payload: cb, + }, + }, + } +} + +func buildPkcNonce(id uint32, extra uint32, from uint32) []byte { + var buf [13]byte // zero-init + binary.LittleEndian.PutUint32(buf[0:4], id) + binary.LittleEndian.PutUint32(buf[4:8], extra) + binary.LittleEndian.PutUint32(buf[8:12], from) + buf[12] = 0 // last byte is zero + return buf[:] +} diff --git a/mqtt.go b/mqtt.go new file mode 100644 index 0000000..2e83e8a --- /dev/null +++ b/mqtt.go @@ -0,0 +1,205 @@ +package main + +import ( + "context" + "errors" + "fmt" + "net/url" + "time" + + "github.com/charmbracelet/log" + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "github.com/meshnet-gophers/meshtastic-go/radio" + "google.golang.org/protobuf/proto" +) + +// Implements MeshIf +type MqttIf struct { + Address string + Username string + Password string + ClientID string + Root string + GatewayID meshtastic.NodeID + Channels *pb.ChannelSet + + logger *log.Logger + c *autopaho.ConnectionManager + ctx context.Context + stop context.CancelFunc + inputCh chan mqttServiceEnvelope + chIdMap map[string]uint8 +} + +type mqttServiceEnvelope struct { + timestamp time.Time + se *pb.ServiceEnvelope +} + +func NewMqttIf(address string, username, password, root, clientID string, gatewayID meshtastic.NodeID, channels *pb.ChannelSet) *MqttIf { + logger := log.WithPrefix("MqttIf") + inputCh := make(chan mqttServiceEnvelope, 10) + return &MqttIf{ + Address: address, + Username: username, + Password: password, + ClientID: clientID, + Root: root, + GatewayID: gatewayID, + Channels: channels, + + logger: logger, + inputCh: inputCh, + } +} + +func (m *MqttIf) channelPublishTopic(channelId string) string { + return fmt.Sprintf("%s/2/e/%s/%s", m.Root, channelId, m.GatewayID) +} + +func (m *MqttIf) onConnectionUp(cm *autopaho.ConnectionManager, connAck *paho.Connack) { + topics := []string{fmt.Sprintf("%s/2/e/PKI/#", m.Root)} + chIdMap := make(map[string]uint8) + for _, c := range m.Channels.Settings { + topics = append(topics, fmt.Sprintf("%s/2/e/%s/#", m.Root, c.Name)) + channelHash, err := radio.ChannelHash(c.Name, c.Psk) + if err != nil { + m.logger.Errorf("failed to build channel hash: %s", err) + m.c.Disconnect(m.ctx) + return + } + chIdMap[c.Name] = uint8(channelHash) + } + m.chIdMap = chIdMap + subscriptions := make([]paho.SubscribeOptions, 0, len(topics)) + for _, t := range topics { + subscriptions = append(subscriptions, paho.SubscribeOptions{Topic: t, QoS: 1}) + } + if _, err := cm.Subscribe(m.ctx, &paho.Subscribe{Subscriptions: subscriptions}); err != nil { + m.logger.Errorf("failed to subscribe (%s). This is likely to mean no messages will be received.", err) + m.c.Disconnect(m.ctx) + return + } + m.logger.Info("Subscribed", "topics", topics) +} + +func (m *MqttIf) onConnectionError(err error) { + m.logger.Warnf("error whilst attempting connection: %s\n", err) +} + +func (m *MqttIf) onReceive(pr paho.PublishReceived) (bool, error) { + if pr.Packet.Duplicate() || pr.AlreadyHandled { + return false, nil + } + se := &pb.ServiceEnvelope{} + if err := proto.Unmarshal(pr.Packet.Payload, se); err != nil { + return false, fmt.Errorf("unmarshalling ServicePayload from Mqtt: %w", err) + } + if se.Packet.RxTime == 0 { + se.Packet.RxTime = uint32(time.Now().Unix()) + } + m.inputCh <- mqttServiceEnvelope{timestamp: time.Now(), se: se} + return true, nil +} + +func (m *MqttIf) onClientError(err error) { + m.logger.Warnf("client error: %s\n", err) +} + +func (m *MqttIf) onServerDisconnect(d *paho.Disconnect) { + if d.Properties != nil { + fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString) + } else { + fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode) + } +} + +func (m *MqttIf) Open() error { + m.ctx, m.stop = context.WithCancel(context.Background()) + u, err := url.Parse(m.Address) + if err != nil { + return err + } + cliCfg := autopaho.ClientConfig{ + ServerUrls: []*url.URL{u}, + KeepAlive: 20, // Keepalive message should be sent every 20 seconds + CleanStartOnInitialConnection: false, + SessionExpiryInterval: 60, + OnConnectionUp: m.onConnectionUp, + OnConnectError: m.onConnectionError, + ConnectUsername: m.Username, + ConnectPassword: []byte(m.Password), + ClientConfig: paho.ClientConfig{ + ClientID: m.ClientID, + OnPublishReceived: []func(paho.PublishReceived) (bool, error){m.onReceive}, + OnClientError: m.onClientError, + OnServerDisconnect: m.onServerDisconnect, + }, + } + + m.c, err = autopaho.NewConnection(m.ctx, cliCfg) // starts process; will reconnect until context cancelled + if err != nil { + return err + } + if err = m.c.AwaitConnection(m.ctx); err != nil { + return err + } + + return nil +} + +func (m *MqttIf) Close() error { + m.logger.Info("Closing MQTT connection...") + m.c.Disconnect(m.ctx) + m.stop() + close(m.inputCh) + <-m.ctx.Done() + return nil +} + +func (m *MqttIf) ReadMeshPacket() (*pb.MeshPacket, uint64, error) { + if err := m.ctx.Err(); err != nil { + return nil, 0, err + } + p := <-m.inputCh + if p.se == nil || p.se.Packet == nil { + return nil, 0, errors.New("no input") + } + p.se.Packet.Channel = uint32(m.chIdMap[p.se.ChannelId]) + return p.se.Packet, uint64(p.timestamp.UnixMicro()), nil +} + +func (m *MqttIf) WriteMeshPacket(p *pb.MeshPacket) error { + var channelId string + if p.PkiEncrypted { + channelId = "PKI" + p.Priority = pb.MeshPacket_HIGH + } else { + channel := m.Channels.Settings[p.Channel] + channelId = channel.Name + } + if p.RxTime == 0 { + p.RxTime = uint32(time.Now().Unix()) + } + p.RelayNode = p.RelayNode & 0xFF + p.NextHop = p.NextHop & 0xFF + p.Channel = 0 // Don't expose your channel number + se := &pb.ServiceEnvelope{ + Packet: p, + ChannelId: channelId, + GatewayId: m.GatewayID.String(), + } + payload, err := proto.Marshal(se) + if err != nil { + return err + } + topic := m.channelPublishTopic(channelId) + m.c.Publish(m.ctx, &paho.Publish{ + Topic: topic, + Payload: payload, + }) + return nil +} diff --git a/node.go b/node.go new file mode 100644 index 0000000..a2b8edb --- /dev/null +++ b/node.go @@ -0,0 +1,598 @@ +package main + +import ( + "bufio" + "bytes" + "context" + "crypto/aes" + "crypto/rand" + "crypto/sha256" + "encoding/binary" + "errors" + "fmt" + "io" + "os/exec" + "strconv" + "sync" + "time" + + "github.com/charmbracelet/log" + "github.com/elastic/go-freelru" + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "github.com/meshnet-gophers/meshtastic-go/radio" + "github.com/meshnet-gophers/meshtastic-go/transport" + "github.com/pion/dtls/v3/pkg/crypto/ccm" + "golang.org/x/crypto/curve25519" + "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/proto" +) + +const ( + // MinAppVersion is the minimum app version supported by the emulated radio. + MinAppVersion = 30200 + ROUTE_MAX_SIZE = 8 +) + +var ( + errNoAdminConnection = errors.New("no admin connection") +) + +// NodeConfig is the configuration for the emulated Radio. +type NodeConfig struct { + // Database + DatabaseDir string + + // Logs directory + LogsDir string + + // Node configuration + // NodeID is the ID of the node. + NodeID meshtastic.NodeID + // LongName is the long name of the node. + LongName string + // ShortName is the short name of the node. + ShortName string + + // MaxHops sets the maximum value for value for HopStart/HopLimit on Tx + MaxHops uint32 + // DefaultHops sets the default value for HopStart/HopLimit on Tx + DefaultHops uint32 + + // Channels is the set of channels the radio will listen and transmit on. + // The first channel in the set is considered the primary channel and is used for broadcasting NodeInfo and Position + Channels *pb.ChannelSet + // BroadcastNodeInfoInterval is the interval at which the radio will broadcast a NodeInfo on the Primary channel. + // The zero value disables broadcasting NodeInfo. + BroadcastNodeInfoInterval time.Duration + // BroadcastPositionInterval is the interval at which the radio will broadcast Position on the Primary channel. + // The zero value disables broadcasting NodeInfo. + BroadcastPositionInterval time.Duration + // PositionLatitudeI is the latitude of the position which will be regularly broadcasted. + // This is in degrees multiplied by 1e7. + PositionLatitudeI int32 + // PositionLongitudeI is the longitude of the position which will be regularly broadcasted. + // This is in degrees multiplied by 1e7. + PositionLongitudeI int32 + // PositionAltitude is the altitude of the position which will be regularly broadcasted. + // This is in meters above MSL. + PositionAltitude int32 + // TCPListenAddr is the address the emulated radio will listen on for TCP connections and offer the Client API over. + TCPListenAddr string + + DeviceMetricsBroadcastInterval time.Duration + DeviceMetricsUPSAddress string + X25519SecretKey []byte + X25519PublicKey []byte +} + +func (c *NodeConfig) validate() error { + if c.DatabaseDir == "" { + return fmt.Errorf("DatabasePath is required") + } + if c.LogsDir == "" { + return fmt.Errorf("LogsDir is required") + } + if c.NodeID == 0 { + return fmt.Errorf("NodeID is required") + } + if c.LongName == "" { + c.LongName = c.NodeID.DefaultLongName() + } + if c.ShortName == "" { + c.ShortName = c.NodeID.DefaultShortName() + } + if c.Channels == nil { + //lint:ignore ST1005 we're referencing an actual field here. + return fmt.Errorf("Channels is required") + } + if len(c.Channels.Settings) == 0 { + return fmt.Errorf("Channels.Settings should be non-empty") + } + return nil +} + +// Node emulates a meshtastic Node, communicating with a meshtastic network via MQTT. +type Node struct { + cfg NodeConfig + db *Database + mesh MeshIf + logger *log.Logger + dl *DataLogger + start time.Time + nodeID uint32 + user *pb.User + chLookup map[byte][]*pb.ChannelSettings + pktLru *freelru.LRU[PacketTruncated, struct{}] + packetID uint32 + mu sync.Mutex + adminConn map[*transport.StreamConn]struct{} +} + +// NewNode creates a new emulated radio. +func NewNode(cfg NodeConfig) (*Node, error) { + if err := cfg.validate(); err != nil { + return nil, fmt.Errorf("validating config: %w", err) + } + db, err := NewDB(cfg.DatabaseDir) + if err != nil { + return nil, err + } + logger := log.WithPrefix("Node") + chLookup := make(map[byte][]*pb.ChannelSettings) + for i, c := range cfg.Channels.Settings { + c.ChannelNum = uint32(i) + ch, err := radio.ChannelHash(c.Name, c.Psk) + if err != nil { + return nil, err + } + chLookup[byte(ch)] = append(chLookup[byte(ch)], c) + logger.Infof("Channel[%d]: Name='%s', Hash=0x%02x", i, c.Name, ch) + } + dl, err := NewDataLogger(cfg.LogsDir) + if err != nil { + return nil, err + } + pktLru, err := freelru.New[PacketTruncated, struct{}](32, hashPacket) + if err != nil { + panic(err) + } + var nonce [4]byte + if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil { + return nil, err + } + adminConn := make(map[*transport.StreamConn]struct{}) + return &Node{ + cfg: cfg, + logger: logger, + db: db, + dl: dl, + start: time.Now(), + packetID: binary.LittleEndian.Uint32(nonce[:]), + chLookup: chLookup, + pktLru: pktLru, + adminConn: adminConn, + }, nil +} + +func (n *Node) Close() error { + return n.db.Close() +} + +// Run starts the radio. It blocks until the context is cancelled. +func (n *Node) Run(ctx context.Context, meshConn MeshIf) error { + n.logger.Infof("** Connecting to Mesh...") + if err := meshConn.Open(); err != nil { + return fmt.Errorf("connecting to modem: %w", err) + } + n.mesh = meshConn + go func() { + <-ctx.Done() + meshConn.Close() + }() + n.logger.Infof("** Connected to Mesh...") + n.nodeID = n.cfg.NodeID.Uint32() + n.NodeInfo(n.nodeID, func(ni *pb.NodeInfo) bool { + var updated bool + n.user = &pb.User{ + Id: n.cfg.NodeID.String(), + LongName: n.cfg.LongName, + ShortName: n.cfg.ShortName, + HwModel: pb.HardwareModel_PRIVATE_HW, + Role: pb.Config_DeviceConfig_CLIENT_MUTE, + IsLicensed: false, + IsUnmessagable: Ptr(false), + PublicKey: n.cfg.X25519PublicKey, + } + if !proto.Equal(ni.User, n.user) { + ni.User = n.user + updated = true + } + if n.cfg.DeviceMetricsBroadcastInterval > 0 { + deviceMetrics, err := n.getDeviceMetrics() + if err == nil { + ni.DeviceMetrics = deviceMetrics + updated = true + } + } + return updated + }) + + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(func() error { + return n.RxLoop(egCtx) + }) + + if n.cfg.BroadcastNodeInfoInterval > 0 { + eg.Go(FuncTicker(n.cfg.BroadcastNodeInfoInterval, egCtx, n.broadcastNodeInfo)) + } + if n.cfg.BroadcastPositionInterval > 0 { + eg.Go(FuncTicker(n.cfg.BroadcastPositionInterval, egCtx, n.broadcastPosition)) + } + if n.cfg.DeviceMetricsBroadcastInterval > 0 { + eg.Go(FuncTicker(n.cfg.DeviceMetricsBroadcastInterval, egCtx, n.broadcastDeviceMetrics)) + } + if n.cfg.TCPListenAddr != "" { + eg.Go(func() error { + return n.listenTCP(egCtx) + }) + } + return eg.Wait() +} + +func (n *Node) NodeInfo(nodeID uint32, updateFunc func(*pb.NodeInfo) bool) *pb.NodeInfo { + if nodeID == 0 { + return nil + } + ni, err := n.db.GetNodeInfo(nodeID) + if err == errNotExists { + ni = &pb.NodeInfo{User: &pb.User{}, Position: &pb.Position{}, DeviceMetrics: &pb.DeviceMetrics{}} + } else if err != nil { + n.logger.Error("!! NodeInfo error", "error", err) + return nil + } + ni.Num = nodeID + var needsUpdate bool + if updateFunc != nil { + needsUpdate = updateFunc(ni) + } else if err == nil { + return ni // no (updatefunc or new node) + } + if !needsUpdate { + return ni + } + ni.LastHeard = uint32(time.Now().Unix()) + n.logger.Debug("** SetNodeInfo", "nodeInfo", ni) + if err = n.db.SetNodeInfo(nodeID, ni); err != nil { + n.logger.Error(err) + } + return ni +} + +func (n *Node) tryDecryptPKC(p *pb.MeshPacket) ([]byte, []byte, error) { + ni, err := n.db.GetNodeInfo(p.From) + if err != nil { + return nil, nil, err + } + if ni.User == nil || ni.User.PublicKey == nil { + return nil, nil, errors.New("invalid user publickey") + } + pk := ni.User.PublicKey + sh, err := curve25519.X25519(n.cfg.X25519SecretKey, pk) + if err != nil { + return nil, nil, err + } + h := sha256.New() + h.Write(sh) + shk := h.Sum(nil) + c, err := aes.NewCipher(shk) + if err != nil { + return nil, nil, err + } + aead, err := ccm.NewCCM(c, 8, 13) + if err != nil { + return nil, nil, err + } + payload := p.GetEncrypted() + if len(payload) <= 4+8 { + return nil, nil, errors.New("too short") + } + extra := binary.LittleEndian.Uint32(payload[len(payload)-4:]) + nonce := buildPkcNonce(p.Id, extra, p.From) + data, err := aead.Open(nil, nonce, payload[:len(payload)-4], nil) + if err != nil { + return nil, nil, err + } + return data, pk, nil +} + +func (n *Node) tryDecryptPSK(p *pb.MeshPacket) ([]byte, *pb.ChannelSettings) { + channelCandidates, ok := n.chLookup[byte(p.Channel)] + if !ok { + return nil, nil // cannot find channel + } + var err error + var plaintext []byte + var channel *pb.ChannelSettings + for _, channel = range channelCandidates { + plaintext, err = radio.XOR( + p.GetEncrypted(), + channel.Psk, + p.Id, + p.From, + ) + if err == nil { + break + } + } + if err != nil { + return nil, nil + } + return plaintext, channel +} + +func (n *Node) nextPacketID() uint32 { + n.mu.Lock() + n.packetID += 1 + n.mu.Unlock() + return n.packetID +} + +func (n *Node) txPortnumMessage(channelID uint32, to uint32, portNum pb.PortNum, message proto.Message) error { + payload, err := proto.Marshal(message) + if err != nil { + return err + } + return n.txPacket(&pb.MeshPacket{ + From: n.nodeID, + To: to, + Channel: channelID, + PayloadVariant: &pb.MeshPacket_Decoded{Decoded: &pb.Data{ + Portnum: portNum, + Payload: payload, + }}, + }) +} + +func (n *Node) txPacket(p *pb.MeshPacket) error { + var plaintext []byte + var decoded *pb.Data + switch payload := p.PayloadVariant.(type) { + case *pb.MeshPacket_Decoded: + var err error + plaintext, err = proto.Marshal(payload.Decoded) + if err != nil { + return fmt.Errorf("marshalling user: %w", err) + } + decoded = payload.Decoded + default: + panic("unexpected MeshPacket payload variant") + } + + if p.To == 0 { + p.To = broadcastID + } + if p.Id == 0 { + p.Id = n.nextPacketID() + } + if p.RelayNode == 0 { + p.RelayNode = p.From + } + if p.RelayNode == 0 { + p.RelayNode = 0xFF + } + if p.HopStart == 0 { + p.HopStart = n.cfg.DefaultHops + p.HopLimit = n.cfg.DefaultHops + } + if p.HopLimit > n.cfg.MaxHops { + p.HopLimit = n.cfg.MaxHops + } + dstNi, _ := n.db.GetNodeInfo(p.To) + if dstNi == nil { + dstNi = &pb.NodeInfo{Num: p.To} + } + + // PKC + if p.PkiEncrypted && p.PublicKey != nil { + // explicit PKC with a key, do nothing + } else if p.From == n.nodeID && decoded.Portnum != pb.PortNum_TRACEROUTE_APP && decoded.Portnum != pb.PortNum_NODEINFO_APP && decoded.Portnum != pb.PortNum_ROUTING_APP && decoded.Portnum != pb.PortNum_POSITION_APP { + // implicit PKC, or explicit PKC without a key + if dstNi.User != nil && dstNi.User.PublicKey != nil { + p.PublicKey = dstNi.User.PublicKey + } + } + // Ensure if PKC is selected, there is a recepient key + if p.PkiEncrypted && p.PublicKey == nil { + return errors.New("missing recepient public key") + } + + // Encrypt payload + buf := bytes.NewBuffer(nil) + var encrChannel string + var channel *pb.ChannelSettings + if p.PkiEncrypted { + channel = pkcChan + encrChannel = pkcChan.Name + p.Channel = 0 + sh, err := curve25519.X25519(n.cfg.X25519SecretKey, p.PublicKey) + if err != nil { + return err + } + h := sha256.New() + h.Write(sh) + shk := h.Sum(nil) + c, err := aes.NewCipher(shk) + if err != nil { + return err + } + aead, err := ccm.NewCCM(c, 8, 13) + if err != nil { + return err + } + var nonceExtra [4]byte + if _, err := rand.Read(nonceExtra[:]); err != nil { + return err + } + nonce := buildPkcNonce(p.Id, binary.LittleEndian.Uint32(nonceExtra[:]), p.From) + ciphertextAndTag := aead.Seal(nil, nonce, plaintext, nil) + buf.Write(ciphertextAndTag) + buf.Write(nonceExtra[:]) + } else { + chIdx := p.Channel + if chIdx >= uint32(len(n.cfg.Channels.Settings)) { + return errors.New("invalid channel index") + } + channel = n.cfg.Channels.Settings[chIdx] + encrChannel = fmt.Sprintf("#%s", channel.Name) + ciphertext, err := radio.XOR( + plaintext, + channel.Psk, + p.Id, + p.From, + ) + if err != nil { + return fmt.Errorf("failed to PSK encrypt: %w", err) + } + buf.Write(ciphertext) + } + b := buf.Bytes() + buf.Reset() + out := proto.Clone(p).(*pb.MeshPacket) + out.PayloadVariant = &pb.MeshPacket_Encrypted{Encrypted: b} + if err := n.mesh.WriteMeshPacket(out); err != nil { + n.logger.Debug("X( MeshPacket", "channel", encrChannel, "packet", p.String(), "out", out.String()) + return err + } + n.dl.LogPacket(p, channel, &pb.NodeInfo{Num: n.nodeID, User: n.user}, dstNi, decoded, decoded.Payload) + pt := PacketTruncated{ + To: p.To, + From: p.From, + ID: p.Id, + } + n.pktLru.Add(pt, struct{}{}) + n.logger.Debug("(( MeshPacket", "channel", encrChannel, "packet", p.String(), "out", out.String()) + return nil +} + +func (n *Node) broadcastNodeInfo(ctx context.Context) error { + n.logger.Info("(( NodeInfo") + return n.txPortnumMessage(0, broadcastID, pb.PortNum_NODEINFO_APP, n.user) +} + +func (n *Node) getPosition() *pb.Position { + return &pb.Position{ + LatitudeI: &n.cfg.PositionLatitudeI, + LongitudeI: &n.cfg.PositionLongitudeI, + Altitude: &n.cfg.PositionAltitude, + Time: uint32(time.Now().Unix()), + LocationSource: pb.Position_LOC_MANUAL, + } +} + +func (n *Node) broadcastPosition(ctx context.Context) error { + n.logger.Info("(( Position") + position := n.getPosition() + n.NodeInfo(n.nodeID, func(nodeInfo *pb.NodeInfo) bool { + nodeInfo.Position = position + return true + }) + return n.txPortnumMessage(0, broadcastID, pb.PortNum_POSITION_APP, position) +} + +func (n *Node) getDeviceMetrics() (*pb.DeviceMetrics, error) { + cmdStdout := bytes.NewBuffer(nil) + cmd := exec.Command("upsc", n.cfg.DeviceMetricsUPSAddress) + cmd.Stdout = cmdStdout + err := cmd.Run() + if err != nil { + n.logger.Error("failed to get UPS data", "err", err) + return nil, err + } + stdoutBytes := cmdStdout.Bytes() + stdout := bufio.NewReader(bytes.NewReader(stdoutBytes)) + var batteryLevel uint32 + var voltage float32 + var channelUtilization float32 + var airUtilTx float32 + for { + line, _, err := stdout.ReadLine() + if errors.Is(err, io.EOF) { + break + } + lineValue := bytes.SplitN(line, []byte(": "), 2) + if len(lineValue) != 2 { + continue + } + if bytes.Equal(lineValue[0], []byte("battery.voltage")) { + v, err := strconv.ParseFloat(string(lineValue[1]), 64) + if err != nil { + continue + } + voltage = float32(v) + } + if bytes.Equal(lineValue[0], []byte("battery.charge")) { + charge, err := strconv.ParseUint(string(lineValue[1]), 10, 64) + if err != nil { + continue + } + batteryLevel = uint32(charge) + } + } + if batteryLevel == 100 && voltage > 0 { + batteryLevel = 101 + } + uptime := uint32(time.Since(n.start).Seconds()) + return &pb.DeviceMetrics{ + BatteryLevel: &batteryLevel, + Voltage: &voltage, + ChannelUtilization: &channelUtilization, + AirUtilTx: &airUtilTx, + UptimeSeconds: &uptime, + }, nil +} +func (n *Node) broadcastDeviceMetrics(ctx context.Context) error { + n.logger.Info("(( DeviceMetrics") + deviceMetrics, err := n.getDeviceMetrics() + if err != nil { + return err + } + n.NodeInfo(n.nodeID, func(nodeInfo *pb.NodeInfo) bool { + nodeInfo.DeviceMetrics = deviceMetrics + return true + }) + return n.txPortnumMessage(0, broadcastID, pb.PortNum_TELEMETRY_APP, &pb.Telemetry{ + Variant: &pb.Telemetry_DeviceMetrics{ + DeviceMetrics: deviceMetrics, + }, + }) +} + +func (n *Node) getDeviceMetadata() *pb.DeviceMetadata { + return &pb.DeviceMetadata{ + FirmwareVersion: "2.6.0-golang", + DeviceStateVersion: 24, + HwModel: pb.HardwareModel_PRIVATE_HW, + Role: pb.Config_DeviceConfig_CLIENT_MUTE, + HasPKC: true, + HasRemoteHardware: false, + HasWifi: false, + HasBluetooth: false, + HasEthernet: false, + CanShutdown: false, + } +} + +// dispatchMessageToAdmin sends a FromRadio message to all current subscribers to +// the FromRadio. +func (n *Node) dispatchMessageToAdmin(msg *pb.FromRadio) error { + if len(n.adminConn) == 0 { + return errNoAdminConnection + } + for conn := range n.adminConn { + if err := conn.Write(msg); err != nil { + n.logger.Errorf("failed to send to admin: %w", err) + } + } + return nil +} diff --git a/nodeadmin.go b/nodeadmin.go new file mode 100644 index 0000000..e6b88a5 --- /dev/null +++ b/nodeadmin.go @@ -0,0 +1,392 @@ +package main + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "net" + "strings" + "time" + + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "github.com/meshnet-gophers/meshtastic-go/transport" + "google.golang.org/protobuf/proto" +) + +func phoneLastUpdateKey(identifier string) []byte { + return []byte("phone-lastupdate-" + identifier) +} + +func (n *Node) setPhoneLastUpdate(identifier string, t time.Time) error { + var lastUpdate [8]byte + binary.BigEndian.PutUint64(lastUpdate[:], uint64(t.UnixMicro())) + return n.db.Set(phoneLastUpdateKey(identifier), lastUpdate[:]) +} + +func (n *Node) handleToRadioWantConfigID(identifier string, conn *transport.StreamConn, req *pb.ToRadio_WantConfigId) error { + // Send MyInfo + err := conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_MyInfo{ + MyInfo: &pb.MyNodeInfo{ + MyNodeNum: n.nodeID, + RebootCount: 0, // use db nonce key? + // TODO: Track this as a const + MinAppVersion: MinAppVersion, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Metadata + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Metadata{ + Metadata: n.getDeviceMetadata(), + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send our node + nodeInfo := n.NodeInfo(n.nodeID, nil) + if nodeInfo.User == nil { + nodeInfo.User = n.user + } + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_NodeInfo{ + NodeInfo: nodeInfo, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Other Nodes + nodes, err := n.db.ListNodeInfo() + if err != nil { + return fmt.Errorf("listing nodes: %w", err) + } + for _, ni := range nodes { + if ni.Num == n.nodeID { + continue + } + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_NodeInfo{ + NodeInfo: ni, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + + // Send Channels + for i, channel := range n.cfg.Channels.Settings { + role := pb.Channel_SECONDARY + if i == 0 { + role = pb.Channel_PRIMARY + } + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Channel{ + Channel: &pb.Channel{ + Index: int32(i), + Settings: &pb.ChannelSettings{ + Name: channel.Name, + Psk: channel.Psk, + }, + Role: role, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + + // Send Config: Device + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Config{ + Config: &pb.Config{ + PayloadVariant: &pb.Config_Device{ + Device: &pb.Config_DeviceConfig{ + NodeInfoBroadcastSecs: uint32(n.cfg.BroadcastNodeInfoInterval.Seconds()), + }, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send Config: LoRa + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_Config{ + Config: &pb.Config{ + PayloadVariant: &pb.Config_Lora{ + Lora: &pb.Config_LoRaConfig{ + UsePreset: true, + ModemPreset: pb.Config_LoRaConfig_SHORT_FAST, + Region: pb.Config_LoRaConfig_EU_868, + ConfigOkToMqtt: true, + }, + }, + }, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + // Send ConfigComplete to indicate we're done + err = conn.Write(&pb.FromRadio{ + PayloadVariant: &pb.FromRadio_ConfigCompleteId{ + ConfigCompleteId: req.WantConfigId, + }, + }) + if err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + + start := uint64(0) + lastPhoneUpdate, _ := n.db.Get(phoneLastUpdateKey(identifier)) + if len(lastPhoneUpdate) == 8 { + start = binary.BigEndian.Uint64(lastPhoneUpdate) + } else { + start = uint64(time.Now().UnixMicro()) + } + startTime := time.UnixMicro(int64(start)) + now := time.Now() + n.logger.Info("** Replaying FromRadio packets to admin...", "from", startTime, "to", now) + c, errc := n.db.ReadFromRadio(startTime, now) + for msg := range c { + pkt := msg.GetPacket() + // process only messages that are broadcast or to us. + if pkt == nil || (pkt.To != broadcastID && pkt.To != n.nodeID) { + continue + } + if err := conn.Write(msg); err != nil { + return fmt.Errorf("failed to send to admin: %w", err) + } + + } + err = <-errc + n.logger.Info("** Finished WantConfigID", "err", err) + + n.setPhoneLastUpdate(identifier, now) + return err +} + +func (n *Node) handleConn(ctx context.Context, underlying io.ReadWriteCloser, identifier string) error { + streamConn := transport.NewRadioStreamConn(underlying) + n.adminConn[streamConn] = struct{}{} + defer func() { + delete(n.adminConn, streamConn) + if err := streamConn.Close(); err != nil { + n.logger.Error("failed to close streamConn", "err", err) + } + }() + go func() { + <-ctx.Done() + streamConn.Close() + }() + + var configured bool + var lastPktTime time.Time + defer func() { + if lastPktTime.IsZero() { + return + } + n.setPhoneLastUpdate(identifier, lastPktTime) + }() + for { + msg := &pb.ToRadio{} + if err := streamConn.Read(msg); err != nil { + return fmt.Errorf("reading from streamConn: %w", err) + } + if configured { + lastPktTime = time.Now() + } + n.logger.Info(">> ToRadio", "msg", msg) + switch payload := msg.PayloadVariant.(type) { + case *pb.ToRadio_Disconnect: + // The meshtastic python client sends a Disconnect command and with the TCP implementation, it expects + // the radio to close the connection. So we end the read loop here, and return to close the connection. + streamConn.Close() + return nil + case *pb.ToRadio_WantConfigId: + if err := n.handleToRadioWantConfigID(identifier, streamConn, payload); err != nil { + return fmt.Errorf("handling WantConfigId: %w", err) + } + configured = true + case *pb.ToRadio_Packet: + var reply *pb.FromRadio_Packet + var sendAck bool = payload.Packet.WantAck + var errorCode pb.Routing_Error = pb.Routing_NONE + if decoded := payload.Packet.GetDecoded(); decoded != nil { + n.logger.Info(">> ToRadio_Packet", "payload", payload) + if payload.Packet.To == n.nodeID { + switch decoded.Portnum { + case pb.PortNum_ADMIN_APP: + admin := &pb.AdminMessage{} + if err := proto.Unmarshal(decoded.Payload, admin); err != nil { + return fmt.Errorf("unmarshalling admin: %w", err) + } + n.logger.Info(fmt.Sprintf(">> AdminMessage %T:", admin.PayloadVariant), "msg", admin.PayloadVariant) + + switch adminPayload := admin.PayloadVariant.(type) { + case *pb.AdminMessage_GetChannelRequest: + n.logger.Info(">> AdminMessage.GetChannelRequest", "adminPayload", adminPayload, "packet", payload) + chIdx := adminPayload.GetChannelRequest - 1 + var c *pb.ChannelSettings + role := pb.Channel_DISABLED + if chIdx > uint32(len(n.cfg.Channels.Settings)) { + c = &pb.ChannelSettings{} + } else { + c = n.cfg.Channels.Settings[chIdx] + role = pb.Channel_SECONDARY + if chIdx == 0 { + role = pb.Channel_PRIMARY + } + } + resp := &pb.AdminMessage{ + PayloadVariant: &pb.AdminMessage_GetChannelResponse{ + GetChannelResponse: &pb.Channel{ + Index: int32(chIdx), + Settings: c, + Role: role, + }, + }, + } + respBytes, err := proto.Marshal(resp) + if err != nil { + return fmt.Errorf("marshalling GetChannelResponse: %w", err) + } + // Send GetChannelResponse + reply = &pb.FromRadio_Packet{ + Packet: &pb.MeshPacket{ + Id: n.nextPacketID(), + From: n.nodeID, + To: n.nodeID, + PayloadVariant: &pb.MeshPacket_Decoded{ + Decoded: &pb.Data{ + Portnum: pb.PortNum_ADMIN_APP, + Payload: respBytes, + RequestId: payload.Packet.Id, + }, + }, + }, + } + case *pb.AdminMessage_SetFavoriteNode: + n.NodeInfo(adminPayload.SetFavoriteNode, func(ni *pb.NodeInfo) bool { + ni.IsFavorite = true + return true + }) + case *pb.AdminMessage_RemoveFavoriteNode: + n.NodeInfo(adminPayload.RemoveFavoriteNode, func(ni *pb.NodeInfo) bool { + ni.IsFavorite = false + return true + }) + case *pb.AdminMessage_RemoveByNodenum: + n.NodeInfo(adminPayload.RemoveByNodenum, func(ni *pb.NodeInfo) bool { + ni.Num = 0 // delete NodeInfo + return true + }) + case *pb.AdminMessage_SetTimeOnly: + default: + errorCode = pb.Routing_BAD_REQUEST + n.logger.Warnf(">> !Unhandled AdminMessage of type '%T'", adminPayload) + } + default: + n.logger.Warnf(">> No handling of ToRadio.Packet to us w/ portNum '%s'", decoded.Portnum.String()) + } + } else { + defaultWarning := true + switch decoded.Portnum { + case pb.PortNum_AUDIO_APP, pb.PortNum_NODEINFO_APP, pb.PortNum_TEXT_MESSAGE_APP, pb.PortNum_POSITION_APP, pb.PortNum_TRACEROUTE_APP: + defaultWarning = false + fallthrough + default: + if defaultWarning { + n.logger.Warnf(">> Default handling of ToRadio.Packet message w/ portNum '%s'", decoded.Portnum.String()) + } + from := payload.Packet.From + if from == 0 { + from = n.nodeID + } + pkt := proto.Clone(payload.Packet).(*pb.MeshPacket) + pkt.From = from + decoded := pkt.GetDecoded() + if decoded.Bitfield != nil { + *decoded.Bitfield |= (1 << 0) // ok to mqtt + } else { + bitfield := uint32(1) + decoded.Bitfield = &bitfield // ok to mqtt + } + if err := n.txPacket(pkt); err != nil { + return fmt.Errorf("failed to tx packet: %w", err) + } + } + } + } + if errorCode != pb.Routing_NONE || sendAck { + if err := streamConn.Write(&pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: buildRoutingReply(payload.Packet, errorCode)}}); err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + if reply != nil { + if err := streamConn.Write(&pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: reply.Packet}}); err != nil { + return fmt.Errorf("writing to streamConn: %w", err) + } + } + case *pb.ToRadio_Heartbeat: + default: + n.logger.Warnf(">> !Unhandled ToRadio message of type '%T'", payload) + } + } +} + +func (n *Node) listenTCP(ctx context.Context) error { + l, err := net.Listen("tcp", n.cfg.TCPListenAddr) + if err != nil { + return fmt.Errorf("listening: %w", err) + } + defer l.Close() + n.logger.Info("** Listening for tcp connections", "addr", n.cfg.TCPListenAddr) + + go func() { + <-ctx.Done() + l.Close() + }() + for { + c, err := l.Accept() + if err != nil { + n.logger.Error("!! Failed to accept connection", "err", err) + return err + } + go func(c net.Conn) { + defer c.Close() + connctx, cancel := context.WithCancel(ctx) + defer cancel() + tcpconn := c.(*net.TCPConn) + tcpconn.SetKeepAlive(true) + tcpconn.SetKeepAlivePeriod(25 * time.Second) + address := tcpconn.RemoteAddr().String() + ipString := strings.Split(address, ":")[0] + n.logger.Info("** Accepted connection", "addr", address) + if err := n.handleConn(connctx, c, ipString); err != nil { + if errors.Is(err, io.EOF) { + n.logger.Info("** Connection EOF", "addr", address) + } else { + n.logger.Error("!! Connection error", "err", err) + } + } else { + n.logger.Info("** Connection ended", "addr", address) + } + }(c) + } +} diff --git a/noderxloop.go b/noderxloop.go new file mode 100644 index 0000000..f4ed602 --- /dev/null +++ b/noderxloop.go @@ -0,0 +1,280 @@ +package main + +import ( + "bytes" + "context" + "encoding/hex" + "fmt" + + "github.com/charmbracelet/log" + "github.com/meshnet-gophers/meshtastic-go" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "google.golang.org/protobuf/proto" +) + +func (n *Node) RxLoop(ctx context.Context) error { + for { + if err := ctx.Err(); err != nil { + return err + } + p, ts, err := n.mesh.ReadMeshPacket() + if err != nil { + n.logger.Error("Read mesh packet failed", "err", err) + return err + } + if err := n.rxMessage(ts, p); err != nil { + n.logger.Error("Processing Radio Message", "error", err) + } + } +} + +func (n *Node) rxMessage(ts uint64, p *pb.MeshPacket) error { + pt := PacketTruncated{To: p.To, From: p.From, ID: p.Id} + info := fmt.Sprintf("ts:%d rssi:%-4d snr:%-5.1f", p.RxTime, p.RxRssi, p.RxSnr) + header := fmt.Sprintf("from:%08x to:%08x id:%08x ch:%02x next:%02x last:%02x hs:%d hl:%d mq:%d wack:%d", p.From, p.To, p.Id, uint8(p.Channel), p.NextHop, p.RelayNode, p.HopStart, p.HopLimit, bool2Int(p.ViaMqtt), bool2Int(p.WantAck)) + payloadstr := fmt.Sprintf("%x", p.GetEncrypted()) + var dupe bool + if _, dupe = n.pktLru.Get(pt); dupe { + n.logger.Debug(") MeshPacket", "info", info, "header", header, "payload", payloadstr) + } else { + n.logger.Debug(")) MeshPacket", "info", info, "header", header, "payload", payloadstr) + n.pktLru.Add(pt, struct{}{}) + } + if dupe { // XXX: it would need to be changed for the traceroute + return nil // skip duplicates + } + if p.From == n.nodeID { + return nil // ourself + } + if len(p.GetEncrypted()) == 0 { + n.logger.Warn(":: Skipping unencrypted or empty packet") + return nil // we skip unencrypted packets + } + var plaintext []byte + var pubKey []byte + var channel *pb.ChannelSettings + if p.To == n.nodeID && p.Channel == 0x00 { + var err error + if plaintext, pubKey, err = n.tryDecryptPKC(p); err != nil { + log.Printf("failed to decrypt PKC message: %s", err) + } + } + if plaintext != nil { + channel = pkcChan + p.Channel = 0 + } else { + plaintext, channel = n.tryDecryptPSK(p) + if plaintext == nil { + return nil // failed decrypt + } + p.Channel = channel.ChannelNum + } + data := &pb.Data{} + if err := proto.Unmarshal(plaintext, data); err != nil { + n.logger.Debug(")) MeshPacket.Data", "channel", channel.Name, "mqtt", p.ViaMqtt, "raw", hex.EncodeToString(plaintext)) + return fmt.Errorf("unmarshalling decrypted data: %w", err) + } + + p.PublicKey = pubKey + p.PkiEncrypted = channel == pkcChan + p.PayloadVariant = &pb.MeshPacket_Decoded{Decoded: data} + msgFromRadio := &pb.FromRadio{PayloadVariant: &pb.FromRadio_Packet{Packet: p}} + source := data.Source + if source == 0 { + source = p.From + } + dest := data.Dest + if dest == 0 { + dest = p.To + } + toUs := dest == n.nodeID + if dest == broadcastID || toUs { + n.dispatchMessageToAdmin(msgFromRadio) + } + n.db.StoreFromRadio(int64(ts), msgFromRadio) + n.logger.Debug(")) MeshPacket.Data", "channel", channel.Name, "mqtt", p.ViaMqtt, "raw", hex.EncodeToString(plaintext), "data", data) + + src := n.NodeInfo(source, func(ni *pb.NodeInfo) bool { + if ni.Num == n.nodeID { + return false // it's us + } + if p.HopLimit > p.HopStart { + return false // weird hoplimit value + } + hopsAway := uint32(p.HopStart - p.HopLimit) + if ni.HopsAway != nil && *ni.HopsAway <= hopsAway { + return false // skip update if these hops away are longer + } + ni.HopsAway = &hopsAway + return true + }) + dst := n.NodeInfo(dest, nil) + + var payload any + var reply proto.Message + replyPortNum := data.Portnum + switch data.Portnum { + case pb.PortNum_ADMIN_APP: + admin := &pb.AdminMessage{} + if err := proto.Unmarshal(data.Payload, admin); err != nil { + return fmt.Errorf("unmarshalling admin message: %w", err) + } + payload = admin + switch adminPayload := admin.PayloadVariant.(type) { + case *pb.AdminMessage_GetDeviceMetadataRequest: + if data.WantResponse && toUs { + reply = &pb.AdminMessage{ + PayloadVariant: &pb.AdminMessage_GetDeviceMetadataResponse{ + GetDeviceMetadataResponse: n.getDeviceMetadata(), + }, + } + } + default: + n.logger.Warnf("unhandled admin message type: %T", adminPayload) + } + case pb.PortNum_NODEINFO_APP: + user := &pb.User{} + if err := proto.Unmarshal(data.Payload, user); err != nil { + return fmt.Errorf("unmarshalling user: %w", err) + } + payload = user + n.logger.Info(")) NodeInfo", "user", user) + n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool { + if nodeInfo.User != nil { + if len(nodeInfo.User.PublicKey) > 0 && !bytes.Equal(nodeInfo.User.PublicKey, user.PublicKey) { + n.logger.Error("NodeInfo w/ different key", "nodeInfo", nodeInfo, "new", user) + return false // do not update + } + } + nodeInfo.User = user + return true + }) + if data.WantResponse && toUs { + reply = n.user + } + case pb.PortNum_TEXT_MESSAGE_APP: + n.logger.Info(")) TextMessage", "channel", channel.Name, "source", meshtastic.NodeID(source).String(), "text", string(data.Payload)) + case pb.PortNum_ROUTING_APP: + routing := &pb.Routing{} + if err := proto.Unmarshal(data.Payload, routing); err != nil { + return fmt.Errorf("unmarshalling routingPayload: %w", err) + } + payload = routing + n.logger.Info(")) Routing", "routing", routing) + case pb.PortNum_TRACEROUTE_APP: + routeDiscovery := &pb.RouteDiscovery{} + if err := proto.Unmarshal(data.Payload, routeDiscovery); err != nil { + return fmt.Errorf("unmarshalling traceroute payload: %w", err) + } + payload = routeDiscovery + n.logger.Info(")) RouteDiscovery", "channel", channel.Name, "source", meshtastic.NodeID(source).String(), "payload", routeDiscovery) + if !toUs { + break + } + towardsDestination := data.ReplyId == 0 + var route *[]uint32 + var snr *[]int32 + if towardsDestination { + route = &routeDiscovery.Route + snr = &routeDiscovery.SnrTowards + } else { + route = &routeDiscovery.RouteBack + snr = &routeDiscovery.SnrBack + } + // Only insert unknown hops if hop_start is valid + if p.HopStart != 0 && p.HopLimit <= p.HopStart { + var hopsTaken uint8 = uint8(p.HopStart - p.HopLimit) + diff := int(hopsTaken) - len(*route) + for i := 0; i < diff; i++ { + if len(*route) < ROUTE_MAX_SIZE { + *route = append(*route, broadcastID) + } + } + // Add unknown SNR values if necessary + diff = len(*route) - len(*snr) + for i := 0; i < diff; i++ { + if len(*snr) < ROUTE_MAX_SIZE { + *snr = append(*snr, 0xFF) + } + } + } + + if len(*snr) < ROUTE_MAX_SIZE { + *snr = append(*snr, int32(p.RxSnr*4)) + } + // We don't need to add route, if we are the dest + // // Length of route array can normally not be exceeded due to the max. hop_limit of 7 + // if *route_count < ROUTE_SIZE { + // route[*route_count] = myNodeInfo.my_node_num + // *route_count += 1 + // } else { + // LOG_WARN("Route exceeded maximum hop limit!") // Are you bridging networks? + // } + reply = routeDiscovery + case pb.PortNum_POSITION_APP: + position := &pb.Position{} + if err := proto.Unmarshal(data.Payload, position); err != nil { + return fmt.Errorf("unmarshalling positionPayload: %w", err) + } + payload = position + n.logger.Info(")) Position", "position", position) + if data.WantResponse && toUs { + reply = n.getPosition() + } else { + n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool { + nodeInfo.Position = position + return true + }) + } + case pb.PortNum_TELEMETRY_APP: + telemetry := &pb.Telemetry{} + if err := proto.Unmarshal(data.Payload, telemetry); err != nil { + return fmt.Errorf("unmarshalling telemetryPayload: %w", err) + } + payload = telemetry + n.logger.Info(")) Telemetry deviceMetrics", "telemetry", telemetry) + if data.WantResponse && toUs { + metrics, err := n.getDeviceMetrics() + if err != nil { + break + } + reply = metrics + } else { + deviceMetrics := telemetry.GetDeviceMetrics() + if deviceMetrics == nil { + break + } + n.NodeInfo(source, func(nodeInfo *pb.NodeInfo) bool { + nodeInfo.DeviceMetrics = deviceMetrics + return true + }) + } + default: + n.logger.Debug(")) !Unhandled app payload", "data", data) + } + n.dl.LogPacket(p, channel, src, dst, data, payload) + if toUs && p.WantAck { + err := n.txPacket(buildRoutingReply(&pb.MeshPacket{ + From: p.From, + To: p.To, + HopStart: uint32(p.HopStart), + Channel: channel.ChannelNum, + Id: p.Id, + PkiEncrypted: msgFromRadio.GetPacket().PkiEncrypted, + PublicKey: msgFromRadio.GetPacket().PublicKey, + }, pb.Routing_NONE)) + if err != nil { + n.logger.Warn("failed to send text ack", "err", err) + } + } + if reply != nil { + err := n.txPortnumMessage(channel.ChannelNum, source, replyPortNum, reply) + if err != nil { + n.logger.Error("Failed to reply", "err", err, "reply", reply) + } else { + n.dl.LogPacket(p, channel, dst, src, &pb.Data{Portnum: replyPortNum}, payload) + } + } + + return nil +} diff --git a/timers.go b/timers.go new file mode 100644 index 0000000..095e42c --- /dev/null +++ b/timers.go @@ -0,0 +1,23 @@ +package main + +import ( + "context" + "time" +) + +func FuncTicker(interval time.Duration, ctx context.Context, fn func(context.Context) error) func() error { + return func() error { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + } + if err := fn(ctx); err != nil { + return err + } + } + } +} -- cgit v1.2.3