From f32ea689d84f2e4a7b2ace811d47984058cb7bd1 Mon Sep 17 00:00:00 2001 From: Smoke <86024507+crypto-smoke@users.noreply.github.com> Date: Wed, 31 Jan 2024 10:18:12 -1000 Subject: add packet deduplicator --- dedupe.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 dedupe.go diff --git a/dedupe.go b/dedupe.go new file mode 100644 index 0000000..2b103b8 --- /dev/null +++ b/dedupe.go @@ -0,0 +1,66 @@ +package meshtastic + +import ( + "encoding/hex" + "fmt" + "hash" + "sync" + "time" +) + +type PacketDeduplicator struct { + hasher hash.Hash + expiresAfter time.Duration + sync.RWMutex + seen map[string]time.Time +} + +func NewDeduplicator(hasher hash.Hash, expiresAfter time.Duration) *PacketDeduplicator { + pd := PacketDeduplicator{ + seen: make(map[string]time.Time), + hasher: hasher, + expiresAfter: expiresAfter, + } + go func() { + for { + time.Sleep(expiresAfter) + pd.Lock() + for packet, timestamp := range pd.seen { + if time.Since(timestamp) > pd.expiresAfter { + delete(pd.seen, packet) + } + } + pd.Unlock() + } + }() + + return &pd +} +func (p *PacketDeduplicator) Seen(sender, packetID uint32) bool { + asString := fmt.Sprintf("%d-%d", sender, packetID) + p.RLock() + if _, exists := p.seen[asString]; !exists { + p.RUnlock() + p.Lock() + defer p.Unlock() + p.seen[asString] = time.Now() + return false + } + p.RUnlock() + return true +} +func (p *PacketDeduplicator) SeenData(data []byte) bool { + hashed := p.hasher.Sum(data) + asHex := hex.EncodeToString(hashed) + p.RLock() + if _, exists := p.seen[asHex]; !exists { + p.RUnlock() + p.Lock() + defer p.Unlock() + p.seen[asHex] = time.Now() + return false + } + p.RUnlock() + + return true +} -- cgit v1.2.3 From d39dc4342489f012fc462413d14037e298597426 Mon Sep 17 00:00:00 2001 From: Smoke <86024507+crypto-smoke@users.noreply.github.com> Date: Wed, 31 Jan 2024 10:18:12 -1000 Subject: add packet deduplicator --- dedupe.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 dedupe.go diff --git a/dedupe.go b/dedupe.go new file mode 100644 index 0000000..2b103b8 --- /dev/null +++ b/dedupe.go @@ -0,0 +1,66 @@ +package meshtastic + +import ( + "encoding/hex" + "fmt" + "hash" + "sync" + "time" +) + +type PacketDeduplicator struct { + hasher hash.Hash + expiresAfter time.Duration + sync.RWMutex + seen map[string]time.Time +} + +func NewDeduplicator(hasher hash.Hash, expiresAfter time.Duration) *PacketDeduplicator { + pd := PacketDeduplicator{ + seen: make(map[string]time.Time), + hasher: hasher, + expiresAfter: expiresAfter, + } + go func() { + for { + time.Sleep(expiresAfter) + pd.Lock() + for packet, timestamp := range pd.seen { + if time.Since(timestamp) > pd.expiresAfter { + delete(pd.seen, packet) + } + } + pd.Unlock() + } + }() + + return &pd +} +func (p *PacketDeduplicator) Seen(sender, packetID uint32) bool { + asString := fmt.Sprintf("%d-%d", sender, packetID) + p.RLock() + if _, exists := p.seen[asString]; !exists { + p.RUnlock() + p.Lock() + defer p.Unlock() + p.seen[asString] = time.Now() + return false + } + p.RUnlock() + return true +} +func (p *PacketDeduplicator) SeenData(data []byte) bool { + hashed := p.hasher.Sum(data) + asHex := hex.EncodeToString(hashed) + p.RLock() + if _, exists := p.seen[asHex]; !exists { + p.RUnlock() + p.Lock() + defer p.Unlock() + p.seen[asHex] = time.Now() + return false + } + p.RUnlock() + + return true +} -- cgit v1.2.3 From 71719e1dbfad35985e20a65865caf98ab9cf28e4 Mon Sep 17 00:00:00 2001 From: Smoke <86024507+crypto-smoke@users.noreply.github.com> Date: Wed, 31 Jan 2024 14:59:09 -1000 Subject: Add dedupe and tests (#4) * document dedupe.go * add dedupe test --- dedupe.go | 18 +++++++++++++---- dedupe_test.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 4 deletions(-) create mode 100644 dedupe_test.go diff --git a/dedupe.go b/dedupe.go index 2b103b8..97934e0 100644 --- a/dedupe.go +++ b/dedupe.go @@ -8,13 +8,17 @@ import ( "time" ) +// PacketDeduplicator is a structure that prevents processing of duplicate packets. +// It keeps a record of seen packets and the time they were last seen. type PacketDeduplicator struct { - hasher hash.Hash - expiresAfter time.Duration - sync.RWMutex - seen map[string]time.Time + hasher hash.Hash // hasher is used for generating packet identifiers. + expiresAfter time.Duration // expiresAfter defines the duration after which a seen packet record expires. + sync.RWMutex // RWMutex is used to protect the seen map from concurrent access. + seen map[string]time.Time // seen maps a packet identifier to the last time it was seen. } +// NewDeduplicator creates a new PacketDeduplicator with a given hasher and expiration duration for packet records. +// It starts a background goroutine to periodically clean up expired packet records. func NewDeduplicator(hasher hash.Hash, expiresAfter time.Duration) *PacketDeduplicator { pd := PacketDeduplicator{ seen: make(map[string]time.Time), @@ -36,6 +40,9 @@ func NewDeduplicator(hasher hash.Hash, expiresAfter time.Duration) *PacketDedupl return &pd } + +// Seen checks whether a packet with the given sender and packetID has been seen before. +// If not, it records the packet as seen and returns false. Otherwise, it returns true. func (p *PacketDeduplicator) Seen(sender, packetID uint32) bool { asString := fmt.Sprintf("%d-%d", sender, packetID) p.RLock() @@ -49,6 +56,9 @@ func (p *PacketDeduplicator) Seen(sender, packetID uint32) bool { p.RUnlock() return true } + +// SeenData checks whether the data has been seen before based on its hashed value. +// If not, it records the data as seen and returns false. Otherwise, it returns true. func (p *PacketDeduplicator) SeenData(data []byte) bool { hashed := p.hasher.Sum(data) asHex := hex.EncodeToString(hashed) diff --git a/dedupe_test.go b/dedupe_test.go new file mode 100644 index 0000000..6388360 --- /dev/null +++ b/dedupe_test.go @@ -0,0 +1,61 @@ +package meshtastic_test + +import ( + "crypto/md5" + "github.com/crypto-smoke/meshtastic-go" + "testing" + "time" +) + +func TestPacketDeduplicatorSeen(t *testing.T) { + hasher := md5.New() + expiresAfter := 100 * time.Millisecond + dedup := meshtastic.NewDeduplicator(hasher, expiresAfter) + + sender := uint32(1) + packetID := uint32(1) + + // Test Seen with new packetID + if dedup.Seen(sender, packetID) { + t.Error("Expected the packet to not have been seen") + } + + // Test Seen with same packetID again + if !dedup.Seen(sender, packetID) { + t.Error("Expected the packet to have been seen") + } + + // Wait for expiration + time.Sleep(expiresAfter + 100*time.Millisecond) + + // Test Seen with same packetID after expiration + if dedup.Seen(sender, packetID) { + t.Error("Expected the packet to not have been seen after expiration") + } +} + +func TestPacketDeduplicatorSeenData(t *testing.T) { + hasher := md5.New() + expiresAfter := 100 * time.Millisecond + dedup := meshtastic.NewDeduplicator(hasher, expiresAfter) + + data := []byte("test data") + + // Test SeenData with new data + if dedup.SeenData(data) { + t.Error("Expected the data to not have been seen") + } + + // Test SeenData with same data again + if !dedup.SeenData(data) { + t.Error("Expected the data to have been seen") + } + + // Wait for expiration + time.Sleep(expiresAfter + 100*time.Millisecond) + + // Test SeenData with same data after expiration + if dedup.SeenData(data) { + t.Error("Expected the data to not have been seen after expiration") + } +} -- cgit v1.2.3