diff options
| author | Dustin Sallings <[email protected]> | 2024-02-17 09:44:52 -1000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-02-17 09:44:52 -1000 |
| commit | 3bfd8048b5a93e4ede740dbb51baa16e0262da25 (patch) | |
| tree | 2d7aa2871a0aae264a42e8d9b78102d51d98d151 | |
| parent | 3c4328b423ba54ec9484f9ec270a05e9a0bf4b35 (diff) | |
A bit structurally simpler deduplication (#18)
No strings and more tests
| -rw-r--r-- | dedupe/dedupe.go | 68 | ||||
| -rw-r--r-- | dedupe/dedupe_test.go | 55 |
2 files changed, 73 insertions, 50 deletions
diff --git a/dedupe/dedupe.go b/dedupe/dedupe.go index 44f02c5..c8ff5e9 100644 --- a/dedupe/dedupe.go +++ b/dedupe/dedupe.go @@ -1,9 +1,7 @@ -package dedupe +package dedup import ( - "encoding/hex" - "fmt" - "hash" + "hash/maphash" "sync" "time" ) @@ -11,23 +9,22 @@ import ( // PacketDeduplicator is a structure that prevents processing of duplicate packets. // It keeps a record of seen packets and the time they were last seen. type PacketDeduplicator struct { - hasher hash.Hash // hasher is used for generating packet identifiers. - expiresAfter time.Duration // expiresAfter defines the duration after which a seen packet record expires. - sync.RWMutex // RWMutex is used to protect the seen map from concurrent access. - seen map[string]time.Time // seen maps a packet identifier to the last time it was seen. + expiresAfter time.Duration // expiresAfter defines the duration after which a seen packet record expires. + seed maphash.Seed + sync.Mutex // protects the seen map from concurrent access. + seen map[uint64]time.Time // seen maps a packet identifier to the last time it was seen. } // NewDeduplicator creates a new PacketDeduplicator with a given hasher and expiration duration for packet records. // It starts a background goroutine to periodically clean up expired packet records. -func NewDeduplicator(hasher hash.Hash, expiresAfter time.Duration) *PacketDeduplicator { - pd := PacketDeduplicator{ - seen: make(map[string]time.Time), - hasher: hasher, +func NewDeduplicator(expiresAfter time.Duration) *PacketDeduplicator { + pd := &PacketDeduplicator{ + seen: map[uint64]time.Time{}, + seed: maphash.MakeSeed(), expiresAfter: expiresAfter, } go func() { - for { - time.Sleep(expiresAfter) + for range time.NewTicker(time.Second * 10).C { pd.Lock() for packet, timestamp := range pd.seen { if time.Since(timestamp) > pd.expiresAfter { @@ -38,39 +35,32 @@ func NewDeduplicator(hasher hash.Hash, expiresAfter time.Duration) *PacketDedupl } }() - return &pd + return pd } -// Seen checks whether a packet with the given sender and packetID has been seen before. -// If not, it records the packet as seen and returns false. Otherwise, it returns true. -func (p *PacketDeduplicator) Seen(sender, packetID uint32) bool { - asString := fmt.Sprintf("%d-%d", sender, packetID) - p.RLock() - if _, exists := p.seen[asString]; !exists { - p.RUnlock() - p.Lock() - defer p.Unlock() - p.seen[asString] = time.Now() +func (p *PacketDeduplicator) isSeen(k uint64) bool { + p.Lock() + defer p.Unlock() + d, exists := p.seen[k] + if !exists { + p.seen[k] = time.Now() + return false + } + if time.Since(d) > p.expiresAfter { + delete(p.seen, k) return false } - p.RUnlock() return true } +// Seen checks whether a packet with the given sender and packetID has been seen before. +// If not, it records the packet as seen and returns false. Otherwise, it returns true. +func (p *PacketDeduplicator) Seen(sender, packetID uint32) bool { + return p.isSeen((uint64(sender) << 32) | uint64(packetID)) +} + // SeenData checks whether the data has been seen before based on its hashed value. // If not, it records the data as seen and returns false. Otherwise, it returns true. func (p *PacketDeduplicator) SeenData(data []byte) bool { - hashed := p.hasher.Sum(data) - asHex := hex.EncodeToString(hashed) - p.RLock() - if _, exists := p.seen[asHex]; !exists { - p.RUnlock() - p.Lock() - defer p.Unlock() - p.seen[asHex] = time.Now() - return false - } - p.RUnlock() - - return true + return p.isSeen(maphash.Bytes(p.seed, data)) } diff --git a/dedupe/dedupe_test.go b/dedupe/dedupe_test.go index cc6ce70..5891429 100644 --- a/dedupe/dedupe_test.go +++ b/dedupe/dedupe_test.go @@ -1,16 +1,15 @@ -package dedupe_test +package dedup import ( - "crypto/md5" - "github.com/crypto-smoke/meshtastic-go/dedupe" "testing" + "testing/quick" "time" ) +const expiresAfter = time.Millisecond + func TestPacketDeduplicatorSeen(t *testing.T) { - hasher := md5.New() - expiresAfter := 100 * time.Millisecond - dedup := dedupe.NewDeduplicator(hasher, expiresAfter) + dedup := NewDeduplicator(expiresAfter) sender := uint32(1) packetID := uint32(1) @@ -26,7 +25,7 @@ func TestPacketDeduplicatorSeen(t *testing.T) { } // Wait for expiration - time.Sleep(expiresAfter + 100*time.Millisecond) + time.Sleep(expiresAfter + time.Millisecond) // Test Seen with same packetID after expiration if dedup.Seen(sender, packetID) { @@ -34,10 +33,44 @@ func TestPacketDeduplicatorSeen(t *testing.T) { } } +func TestDuplicatorProp(t *testing.T) { + if err := quick.Check(func(s, p uint32) bool { + dedup := NewDeduplicator(expiresAfter) + // Test Seen with new packetID + if dedup.Seen(s, p) { + t.Error("Expected the packet to not have been seen") + return false + } + + // Test Seen with same packetID again + if !dedup.Seen(s, p) { + t.Error("Expected the packet to have been seen") + return false + } + return true + }, nil); err != nil { + t.Error(err) + } +} + +func FuzzDup(f *testing.F) { + f.Fuzz(func(t *testing.T, s uint32, p uint32) { + dedup := NewDeduplicator(10*time.Second) + // Test Seen with new packetID + if dedup.Seen(s, p) { + t.Errorf("Expected the packet %v/%v to not have been seen the first time", s, p) + } + + // Test Seen with same packetID again + if !dedup.Seen(s, p) { + t.Errorf("Expected the packet %v/%v to have been seen", s, p) + } + + }) +} + func TestPacketDeduplicatorSeenData(t *testing.T) { - hasher := md5.New() - expiresAfter := 100 * time.Millisecond - dedup := dedupe.NewDeduplicator(hasher, expiresAfter) + dedup := NewDeduplicator(expiresAfter) data := []byte("test data") @@ -52,7 +85,7 @@ func TestPacketDeduplicatorSeenData(t *testing.T) { } // Wait for expiration - time.Sleep(expiresAfter + 100*time.Millisecond) + time.Sleep(expiresAfter + time.Millisecond) // Test SeenData with same data after expiration if dedup.SeenData(data) { |
