aboutsummaryrefslogtreecommitdiff
path: root/dedupe/dedupe.go
diff options
context:
space:
mode:
Diffstat (limited to 'dedupe/dedupe.go')
-rw-r--r--dedupe/dedupe.go68
1 files changed, 29 insertions, 39 deletions
diff --git a/dedupe/dedupe.go b/dedupe/dedupe.go
index 44f02c5..c8ff5e9 100644
--- a/dedupe/dedupe.go
+++ b/dedupe/dedupe.go
@@ -1,9 +1,7 @@
-package dedupe
+package dedup
import (
- "encoding/hex"
- "fmt"
- "hash"
+ "hash/maphash"
"sync"
"time"
)
@@ -11,23 +9,22 @@ import (
// PacketDeduplicator is a structure that prevents processing of duplicate packets.
// It keeps a record of seen packets and the time they were last seen.
type PacketDeduplicator struct {
- hasher hash.Hash // hasher is used for generating packet identifiers.
- expiresAfter time.Duration // expiresAfter defines the duration after which a seen packet record expires.
- sync.RWMutex // RWMutex is used to protect the seen map from concurrent access.
- seen map[string]time.Time // seen maps a packet identifier to the last time it was seen.
+ expiresAfter time.Duration // expiresAfter defines the duration after which a seen packet record expires.
+ seed maphash.Seed
+ sync.Mutex // protects the seen map from concurrent access.
+ seen map[uint64]time.Time // seen maps a packet identifier to the last time it was seen.
}
// NewDeduplicator creates a new PacketDeduplicator with a given hasher and expiration duration for packet records.
// It starts a background goroutine to periodically clean up expired packet records.
-func NewDeduplicator(hasher hash.Hash, expiresAfter time.Duration) *PacketDeduplicator {
- pd := PacketDeduplicator{
- seen: make(map[string]time.Time),
- hasher: hasher,
+func NewDeduplicator(expiresAfter time.Duration) *PacketDeduplicator {
+ pd := &PacketDeduplicator{
+ seen: map[uint64]time.Time{},
+ seed: maphash.MakeSeed(),
expiresAfter: expiresAfter,
}
go func() {
- for {
- time.Sleep(expiresAfter)
+ for range time.NewTicker(time.Second * 10).C {
pd.Lock()
for packet, timestamp := range pd.seen {
if time.Since(timestamp) > pd.expiresAfter {
@@ -38,39 +35,32 @@ func NewDeduplicator(hasher hash.Hash, expiresAfter time.Duration) *PacketDedupl
}
}()
- return &pd
+ return pd
}
-// Seen checks whether a packet with the given sender and packetID has been seen before.
-// If not, it records the packet as seen and returns false. Otherwise, it returns true.
-func (p *PacketDeduplicator) Seen(sender, packetID uint32) bool {
- asString := fmt.Sprintf("%d-%d", sender, packetID)
- p.RLock()
- if _, exists := p.seen[asString]; !exists {
- p.RUnlock()
- p.Lock()
- defer p.Unlock()
- p.seen[asString] = time.Now()
+func (p *PacketDeduplicator) isSeen(k uint64) bool {
+ p.Lock()
+ defer p.Unlock()
+ d, exists := p.seen[k]
+ if !exists {
+ p.seen[k] = time.Now()
+ return false
+ }
+ if time.Since(d) > p.expiresAfter {
+ delete(p.seen, k)
return false
}
- p.RUnlock()
return true
}
+// Seen checks whether a packet with the given sender and packetID has been seen before.
+// If not, it records the packet as seen and returns false. Otherwise, it returns true.
+func (p *PacketDeduplicator) Seen(sender, packetID uint32) bool {
+ return p.isSeen((uint64(sender) << 32) | uint64(packetID))
+}
+
// SeenData checks whether the data has been seen before based on its hashed value.
// If not, it records the data as seen and returns false. Otherwise, it returns true.
func (p *PacketDeduplicator) SeenData(data []byte) bool {
- hashed := p.hasher.Sum(data)
- asHex := hex.EncodeToString(hashed)
- p.RLock()
- if _, exists := p.seen[asHex]; !exists {
- p.RUnlock()
- p.Lock()
- defer p.Unlock()
- p.seen[asHex] = time.Now()
- return false
- }
- p.RUnlock()
-
- return true
+ return p.isSeen(maphash.Bytes(p.seed, data))
}