diff options
Diffstat (limited to 'database.go')
| -rw-r--r-- | database.go | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/database.go b/database.go new file mode 100644 index 0000000..dc410d8 --- /dev/null +++ b/database.go @@ -0,0 +1,187 @@ +package main + +import ( + "encoding/binary" + "errors" + "fmt" + "time" + + "github.com/cockroachdb/pebble/v2" + pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" + "google.golang.org/protobuf/proto" +) + +type BucketEnum uint16 + +const ( + BucketState BucketEnum = iota + BucketNodeInfo + BucketFromRadio +) + +type StateKeyEnum uint32 + +const ( + StateKeyNull StateKeyEnum = iota + StateKeyPhoneLastUpdate +) + +var ( + errNotExists = errors.New("not exist") +) + +type Database struct { + p *pebble.DB +} + +func NewDB(dbPath string) (*Database, error) { + p, err := pebble.Open(dbPath, &pebble.Options{}) + if err != nil { + return nil, fmt.Errorf("open database: %w", err) + } + return &Database{ + p: p, + }, nil +} + +func (db *Database) Close() error { + return db.p.Close() +} + +func (db *Database) keyUint32(bucket BucketEnum, k uint32) []byte { + var key [6]byte + binary.BigEndian.PutUint16(key[0:2], uint16(bucket)) + binary.BigEndian.PutUint32(key[2:6], k) + return key[:] +} + +func (db *Database) keyUint64(bucket BucketEnum, k uint64) []byte { + var key [10]byte + binary.BigEndian.PutUint16(key[0:2], uint16(bucket)) + binary.BigEndian.PutUint64(key[2:10], k) + return key[:] +} + +func (db *Database) Get(key []byte) ([]byte, error) { + data, rd, err := db.p.Get(key) + if err == pebble.ErrNotFound { + return nil, errNotExists + } else if err != nil { + return nil, err + } + defer rd.Close() + return data, nil +} + +func (db *Database) Set(key []byte, value []byte) error { + return db.p.Set(key, value, pebble.Sync) +} + +func (db *Database) GetState(key StateKeyEnum) ([]byte, error) { + data, rd, err := db.p.Get(db.keyUint32(BucketState, uint32(key))) + if err == pebble.ErrNotFound { + return nil, errNotExists + } else if err != nil { + return nil, err + } + defer rd.Close() + return data, nil +} + +func (db *Database) SetState(key StateKeyEnum, value []byte) error { + return db.p.Set(db.keyUint32(BucketState, uint32(key)), value, pebble.Sync) +} + +func (db *Database) SetNodeInfo(num uint32, ni *pb.NodeInfo) error { + key := db.keyUint32(BucketNodeInfo, num) + if num != 0 && ni.Num == 0 { + return db.p.Delete(key, pebble.Sync) + } + ni.Num = num + data, err := proto.Marshal(ni) + if err != nil { + return err + } + return db.p.Set(key, data, pebble.Sync) +} + +func (db *Database) GetNodeInfo(num uint32) (*pb.NodeInfo, error) { + data, rd, err := db.p.Get(db.keyUint32(BucketNodeInfo, num)) + if err == pebble.ErrNotFound { + return nil, errNotExists + } else if err != nil { + return nil, err + } + defer rd.Close() + var ni pb.NodeInfo + if err = proto.Unmarshal(data, &ni); err != nil { + return nil, err + } + return &ni, nil +} + +func (db *Database) ListNodeInfo() ([]*pb.NodeInfo, error) { + it, err := db.p.NewIter(&pebble.IterOptions{ + LowerBound: db.keyUint32(BucketNodeInfo, uint32(0)), + UpperBound: db.keyUint32(BucketNodeInfo+1, uint32(0)), + }) + if err != nil { + return nil, err + } + defer it.Close() + var list []*pb.NodeInfo + for it.First(); it.Valid(); it.Next() { + key := it.Key() + data := it.Value() + ni := &pb.NodeInfo{} + if err = proto.Unmarshal(data, ni); err != nil { + return nil, err + } + ni.Num = binary.BigEndian.Uint32(key[2:6]) + list = append(list, ni) + } + return list, nil +} + +func (db *Database) StoreFromRadio(tsMicro int64, msg *pb.FromRadio) error { + b, err := proto.Marshal(msg) + if err != nil { + return err + } + return db.p.Set(db.keyUint64(BucketFromRadio, uint64(tsMicro)), b, pebble.Sync) +} + +func (db *Database) ReadFromRadio(start, end time.Time) (<-chan *pb.FromRadio, <-chan error) { + c := make(chan *pb.FromRadio) + errc := make(chan error, 1) + lowerLimit := uint64(start.UnixMicro()) + upperLimit := uint64(end.UnixMicro()) + go func() { + defer close(c) + defer close(errc) + it, err := db.p.NewIter(&pebble.IterOptions{ + LowerBound: db.keyUint64(BucketFromRadio, lowerLimit), + UpperBound: db.keyUint64(BucketFromRadio+1, upperLimit), + }) + if err != nil { + errc <- err + return + } + defer it.Close() + for it.First(); it.Valid(); it.Next() { + key := it.Key() + data := it.Value() + tsMicro := binary.BigEndian.Uint64(key[2:]) + if tsMicro > upperLimit { + continue + } + msg := &pb.FromRadio{} + if err = proto.Unmarshal(data, msg); err != nil { + errc <- err + return + } + c <- msg + } + }() + return c, errc +} |
