aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--dedupe/dedupe.go68
-rw-r--r--dedupe/dedupe_test.go55
2 files changed, 73 insertions, 50 deletions
diff --git a/dedupe/dedupe.go b/dedupe/dedupe.go
index 44f02c5..c8ff5e9 100644
--- a/dedupe/dedupe.go
+++ b/dedupe/dedupe.go
@@ -1,9 +1,7 @@
-package dedupe
+package dedup
import (
- "encoding/hex"
- "fmt"
- "hash"
+ "hash/maphash"
"sync"
"time"
)
@@ -11,23 +9,22 @@ import (
// PacketDeduplicator is a structure that prevents processing of duplicate packets.
// It keeps a record of seen packets and the time they were last seen.
type PacketDeduplicator struct {
- hasher hash.Hash // hasher is used for generating packet identifiers.
- expiresAfter time.Duration // expiresAfter defines the duration after which a seen packet record expires.
- sync.RWMutex // RWMutex is used to protect the seen map from concurrent access.
- seen map[string]time.Time // seen maps a packet identifier to the last time it was seen.
+ expiresAfter time.Duration // expiresAfter defines the duration after which a seen packet record expires.
+ seed maphash.Seed
+ sync.Mutex // protects the seen map from concurrent access.
+ seen map[uint64]time.Time // seen maps a packet identifier to the last time it was seen.
}
// NewDeduplicator creates a new PacketDeduplicator with a given hasher and expiration duration for packet records.
// It starts a background goroutine to periodically clean up expired packet records.
-func NewDeduplicator(hasher hash.Hash, expiresAfter time.Duration) *PacketDeduplicator {
- pd := PacketDeduplicator{
- seen: make(map[string]time.Time),
- hasher: hasher,
+func NewDeduplicator(expiresAfter time.Duration) *PacketDeduplicator {
+ pd := &PacketDeduplicator{
+ seen: map[uint64]time.Time{},
+ seed: maphash.MakeSeed(),
expiresAfter: expiresAfter,
}
go func() {
- for {
- time.Sleep(expiresAfter)
+ for range time.NewTicker(time.Second * 10).C {
pd.Lock()
for packet, timestamp := range pd.seen {
if time.Since(timestamp) > pd.expiresAfter {
@@ -38,39 +35,32 @@ func NewDeduplicator(hasher hash.Hash, expiresAfter time.Duration) *PacketDedupl
}
}()
- return &pd
+ return pd
}
-// Seen checks whether a packet with the given sender and packetID has been seen before.
-// If not, it records the packet as seen and returns false. Otherwise, it returns true.
-func (p *PacketDeduplicator) Seen(sender, packetID uint32) bool {
- asString := fmt.Sprintf("%d-%d", sender, packetID)
- p.RLock()
- if _, exists := p.seen[asString]; !exists {
- p.RUnlock()
- p.Lock()
- defer p.Unlock()
- p.seen[asString] = time.Now()
+func (p *PacketDeduplicator) isSeen(k uint64) bool {
+ p.Lock()
+ defer p.Unlock()
+ d, exists := p.seen[k]
+ if !exists {
+ p.seen[k] = time.Now()
+ return false
+ }
+ if time.Since(d) > p.expiresAfter {
+ delete(p.seen, k)
return false
}
- p.RUnlock()
return true
}
+// Seen checks whether a packet with the given sender and packetID has been seen before.
+// If not, it records the packet as seen and returns false. Otherwise, it returns true.
+func (p *PacketDeduplicator) Seen(sender, packetID uint32) bool {
+ return p.isSeen((uint64(sender) << 32) | uint64(packetID))
+}
+
// SeenData checks whether the data has been seen before based on its hashed value.
// If not, it records the data as seen and returns false. Otherwise, it returns true.
func (p *PacketDeduplicator) SeenData(data []byte) bool {
- hashed := p.hasher.Sum(data)
- asHex := hex.EncodeToString(hashed)
- p.RLock()
- if _, exists := p.seen[asHex]; !exists {
- p.RUnlock()
- p.Lock()
- defer p.Unlock()
- p.seen[asHex] = time.Now()
- return false
- }
- p.RUnlock()
-
- return true
+ return p.isSeen(maphash.Bytes(p.seed, data))
}
diff --git a/dedupe/dedupe_test.go b/dedupe/dedupe_test.go
index cc6ce70..5891429 100644
--- a/dedupe/dedupe_test.go
+++ b/dedupe/dedupe_test.go
@@ -1,16 +1,15 @@
-package dedupe_test
+package dedup
import (
- "crypto/md5"
- "github.com/crypto-smoke/meshtastic-go/dedupe"
"testing"
+ "testing/quick"
"time"
)
+const expiresAfter = time.Millisecond
+
func TestPacketDeduplicatorSeen(t *testing.T) {
- hasher := md5.New()
- expiresAfter := 100 * time.Millisecond
- dedup := dedupe.NewDeduplicator(hasher, expiresAfter)
+ dedup := NewDeduplicator(expiresAfter)
sender := uint32(1)
packetID := uint32(1)
@@ -26,7 +25,7 @@ func TestPacketDeduplicatorSeen(t *testing.T) {
}
// Wait for expiration
- time.Sleep(expiresAfter + 100*time.Millisecond)
+ time.Sleep(expiresAfter + time.Millisecond)
// Test Seen with same packetID after expiration
if dedup.Seen(sender, packetID) {
@@ -34,10 +33,44 @@ func TestPacketDeduplicatorSeen(t *testing.T) {
}
}
+func TestDuplicatorProp(t *testing.T) {
+ if err := quick.Check(func(s, p uint32) bool {
+ dedup := NewDeduplicator(expiresAfter)
+ // Test Seen with new packetID
+ if dedup.Seen(s, p) {
+ t.Error("Expected the packet to not have been seen")
+ return false
+ }
+
+ // Test Seen with same packetID again
+ if !dedup.Seen(s, p) {
+ t.Error("Expected the packet to have been seen")
+ return false
+ }
+ return true
+ }, nil); err != nil {
+ t.Error(err)
+ }
+}
+
+func FuzzDup(f *testing.F) {
+ f.Fuzz(func(t *testing.T, s uint32, p uint32) {
+ dedup := NewDeduplicator(10*time.Second)
+ // Test Seen with new packetID
+ if dedup.Seen(s, p) {
+ t.Errorf("Expected the packet %v/%v to not have been seen the first time", s, p)
+ }
+
+ // Test Seen with same packetID again
+ if !dedup.Seen(s, p) {
+ t.Errorf("Expected the packet %v/%v to have been seen", s, p)
+ }
+
+ })
+}
+
func TestPacketDeduplicatorSeenData(t *testing.T) {
- hasher := md5.New()
- expiresAfter := 100 * time.Millisecond
- dedup := dedupe.NewDeduplicator(hasher, expiresAfter)
+ dedup := NewDeduplicator(expiresAfter)
data := []byte("test data")
@@ -52,7 +85,7 @@ func TestPacketDeduplicatorSeenData(t *testing.T) {
}
// Wait for expiration
- time.Sleep(expiresAfter + 100*time.Millisecond)
+ time.Sleep(expiresAfter + time.Millisecond)
// Test SeenData with same data after expiration
if dedup.SeenData(data) {