summaryrefslogtreecommitdiff
path: root/database.go
diff options
context:
space:
mode:
authorMarin Ivanov <[email protected]>2025-09-03 20:59:24 +0300
committerMarin Ivanov <[email protected]>2025-09-03 20:59:24 +0300
commit220671ed46e10e4c67741286da493df1d45cdcfc (patch)
tree0711f179f4619959fd937c2929c6247ed8ab124a /database.go
Initial release
Diffstat (limited to 'database.go')
-rw-r--r--database.go187
1 files changed, 187 insertions, 0 deletions
diff --git a/database.go b/database.go
new file mode 100644
index 0000000..dc410d8
--- /dev/null
+++ b/database.go
@@ -0,0 +1,187 @@
+package main
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/cockroachdb/pebble/v2"
+ pb "github.com/meshnet-gophers/meshtastic-go/meshtastic"
+ "google.golang.org/protobuf/proto"
+)
+
+type BucketEnum uint16
+
+const (
+ BucketState BucketEnum = iota
+ BucketNodeInfo
+ BucketFromRadio
+)
+
+type StateKeyEnum uint32
+
+const (
+ StateKeyNull StateKeyEnum = iota
+ StateKeyPhoneLastUpdate
+)
+
+var (
+ errNotExists = errors.New("not exist")
+)
+
+type Database struct {
+ p *pebble.DB
+}
+
+func NewDB(dbPath string) (*Database, error) {
+ p, err := pebble.Open(dbPath, &pebble.Options{})
+ if err != nil {
+ return nil, fmt.Errorf("open database: %w", err)
+ }
+ return &Database{
+ p: p,
+ }, nil
+}
+
+func (db *Database) Close() error {
+ return db.p.Close()
+}
+
+func (db *Database) keyUint32(bucket BucketEnum, k uint32) []byte {
+ var key [6]byte
+ binary.BigEndian.PutUint16(key[0:2], uint16(bucket))
+ binary.BigEndian.PutUint32(key[2:6], k)
+ return key[:]
+}
+
+func (db *Database) keyUint64(bucket BucketEnum, k uint64) []byte {
+ var key [10]byte
+ binary.BigEndian.PutUint16(key[0:2], uint16(bucket))
+ binary.BigEndian.PutUint64(key[2:10], k)
+ return key[:]
+}
+
+func (db *Database) Get(key []byte) ([]byte, error) {
+ data, rd, err := db.p.Get(key)
+ if err == pebble.ErrNotFound {
+ return nil, errNotExists
+ } else if err != nil {
+ return nil, err
+ }
+ defer rd.Close()
+ return data, nil
+}
+
+func (db *Database) Set(key []byte, value []byte) error {
+ return db.p.Set(key, value, pebble.Sync)
+}
+
+func (db *Database) GetState(key StateKeyEnum) ([]byte, error) {
+ data, rd, err := db.p.Get(db.keyUint32(BucketState, uint32(key)))
+ if err == pebble.ErrNotFound {
+ return nil, errNotExists
+ } else if err != nil {
+ return nil, err
+ }
+ defer rd.Close()
+ return data, nil
+}
+
+func (db *Database) SetState(key StateKeyEnum, value []byte) error {
+ return db.p.Set(db.keyUint32(BucketState, uint32(key)), value, pebble.Sync)
+}
+
+func (db *Database) SetNodeInfo(num uint32, ni *pb.NodeInfo) error {
+ key := db.keyUint32(BucketNodeInfo, num)
+ if num != 0 && ni.Num == 0 {
+ return db.p.Delete(key, pebble.Sync)
+ }
+ ni.Num = num
+ data, err := proto.Marshal(ni)
+ if err != nil {
+ return err
+ }
+ return db.p.Set(key, data, pebble.Sync)
+}
+
+func (db *Database) GetNodeInfo(num uint32) (*pb.NodeInfo, error) {
+ data, rd, err := db.p.Get(db.keyUint32(BucketNodeInfo, num))
+ if err == pebble.ErrNotFound {
+ return nil, errNotExists
+ } else if err != nil {
+ return nil, err
+ }
+ defer rd.Close()
+ var ni pb.NodeInfo
+ if err = proto.Unmarshal(data, &ni); err != nil {
+ return nil, err
+ }
+ return &ni, nil
+}
+
+func (db *Database) ListNodeInfo() ([]*pb.NodeInfo, error) {
+ it, err := db.p.NewIter(&pebble.IterOptions{
+ LowerBound: db.keyUint32(BucketNodeInfo, uint32(0)),
+ UpperBound: db.keyUint32(BucketNodeInfo+1, uint32(0)),
+ })
+ if err != nil {
+ return nil, err
+ }
+ defer it.Close()
+ var list []*pb.NodeInfo
+ for it.First(); it.Valid(); it.Next() {
+ key := it.Key()
+ data := it.Value()
+ ni := &pb.NodeInfo{}
+ if err = proto.Unmarshal(data, ni); err != nil {
+ return nil, err
+ }
+ ni.Num = binary.BigEndian.Uint32(key[2:6])
+ list = append(list, ni)
+ }
+ return list, nil
+}
+
+func (db *Database) StoreFromRadio(tsMicro int64, msg *pb.FromRadio) error {
+ b, err := proto.Marshal(msg)
+ if err != nil {
+ return err
+ }
+ return db.p.Set(db.keyUint64(BucketFromRadio, uint64(tsMicro)), b, pebble.Sync)
+}
+
+func (db *Database) ReadFromRadio(start, end time.Time) (<-chan *pb.FromRadio, <-chan error) {
+ c := make(chan *pb.FromRadio)
+ errc := make(chan error, 1)
+ lowerLimit := uint64(start.UnixMicro())
+ upperLimit := uint64(end.UnixMicro())
+ go func() {
+ defer close(c)
+ defer close(errc)
+ it, err := db.p.NewIter(&pebble.IterOptions{
+ LowerBound: db.keyUint64(BucketFromRadio, lowerLimit),
+ UpperBound: db.keyUint64(BucketFromRadio+1, upperLimit),
+ })
+ if err != nil {
+ errc <- err
+ return
+ }
+ defer it.Close()
+ for it.First(); it.Valid(); it.Next() {
+ key := it.Key()
+ data := it.Value()
+ tsMicro := binary.BigEndian.Uint64(key[2:])
+ if tsMicro > upperLimit {
+ continue
+ }
+ msg := &pb.FromRadio{}
+ if err = proto.Unmarshal(data, msg); err != nil {
+ errc <- err
+ return
+ }
+ c <- msg
+ }
+ }()
+ return c, errc
+}