package main import ( "encoding/binary" "errors" "fmt" "time" "github.com/cockroachdb/pebble/v2" pb "github.com/meshnet-gophers/meshtastic-go/meshtastic" "google.golang.org/protobuf/proto" ) type BucketEnum uint16 const ( BucketState BucketEnum = iota BucketNodeInfo BucketFromRadio ) type StateKeyEnum uint32 const ( StateKeyNull StateKeyEnum = iota StateKeyPhoneLastUpdate ) var ( errNotExists = errors.New("not exist") ) type Database struct { p *pebble.DB } func NewDB(dbPath string) (*Database, error) { p, err := pebble.Open(dbPath, &pebble.Options{}) if err != nil { return nil, fmt.Errorf("open database: %w", err) } return &Database{ p: p, }, nil } func (db *Database) Close() error { return db.p.Close() } func (db *Database) keyUint32(bucket BucketEnum, k uint32) []byte { var key [6]byte binary.BigEndian.PutUint16(key[0:2], uint16(bucket)) binary.BigEndian.PutUint32(key[2:6], k) return key[:] } func (db *Database) keyUint64(bucket BucketEnum, k uint64) []byte { var key [10]byte binary.BigEndian.PutUint16(key[0:2], uint16(bucket)) binary.BigEndian.PutUint64(key[2:10], k) return key[:] } func (db *Database) Get(key []byte) ([]byte, error) { data, rd, err := db.p.Get(key) if err == pebble.ErrNotFound { return nil, errNotExists } else if err != nil { return nil, err } defer rd.Close() return data, nil } func (db *Database) Set(key []byte, value []byte) error { return db.p.Set(key, value, pebble.Sync) } func (db *Database) GetState(key StateKeyEnum) ([]byte, error) { data, rd, err := db.p.Get(db.keyUint32(BucketState, uint32(key))) if err == pebble.ErrNotFound { return nil, errNotExists } else if err != nil { return nil, err } defer rd.Close() return data, nil } func (db *Database) SetState(key StateKeyEnum, value []byte) error { return db.p.Set(db.keyUint32(BucketState, uint32(key)), value, pebble.Sync) } func (db *Database) SetNodeInfo(num uint32, ni *pb.NodeInfo) error { key := db.keyUint32(BucketNodeInfo, num) if num != 0 && ni.Num == 0 { return db.p.Delete(key, pebble.Sync) } ni.Num = num data, err := proto.Marshal(ni) if err != nil { return err } return db.p.Set(key, data, pebble.Sync) } func (db *Database) GetNodeInfo(num uint32) (*pb.NodeInfo, error) { data, rd, err := db.p.Get(db.keyUint32(BucketNodeInfo, num)) if err == pebble.ErrNotFound { return nil, errNotExists } else if err != nil { return nil, err } defer rd.Close() var ni pb.NodeInfo if err = proto.Unmarshal(data, &ni); err != nil { return nil, err } return &ni, nil } func (db *Database) ListNodeInfo() ([]*pb.NodeInfo, error) { it, err := db.p.NewIter(&pebble.IterOptions{ LowerBound: db.keyUint32(BucketNodeInfo, uint32(0)), UpperBound: db.keyUint32(BucketNodeInfo+1, uint32(0)), }) if err != nil { return nil, err } defer it.Close() var list []*pb.NodeInfo for it.First(); it.Valid(); it.Next() { key := it.Key() data := it.Value() ni := &pb.NodeInfo{} if err = proto.Unmarshal(data, ni); err != nil { return nil, err } ni.Num = binary.BigEndian.Uint32(key[2:6]) list = append(list, ni) } return list, nil } func (db *Database) StoreFromRadio(tsMicro int64, msg *pb.FromRadio) error { b, err := proto.Marshal(msg) if err != nil { return err } return db.p.Set(db.keyUint64(BucketFromRadio, uint64(tsMicro)), b, pebble.Sync) } func (db *Database) ReadFromRadio(start, end time.Time) (<-chan *pb.FromRadio, <-chan error) { c := make(chan *pb.FromRadio) errc := make(chan error, 1) lowerLimit := uint64(start.UnixMicro()) upperLimit := uint64(end.UnixMicro()) go func() { defer close(c) defer close(errc) it, err := db.p.NewIter(&pebble.IterOptions{ LowerBound: db.keyUint64(BucketFromRadio, lowerLimit), UpperBound: db.keyUint64(BucketFromRadio+1, upperLimit), }) if err != nil { errc <- err return } defer it.Close() for it.First(); it.Valid(); it.Next() { key := it.Key() data := it.Value() tsMicro := binary.BigEndian.Uint64(key[2:]) if tsMicro > upperLimit { continue } msg := &pb.FromRadio{} if err = proto.Unmarshal(data, msg); err != nil { errc <- err return } c <- msg } }() return c, errc }