From 0b18b19fe5dbb6f606d2b2875b45e9ae8372779f Mon Sep 17 00:00:00 2001 From: maride Date: Mon, 26 Apr 2021 12:22:11 +0200 Subject: [PATCH] Retry failed connection attempts, include peers in statistics, and transmit in a separate thread --- net/peer.go | 14 +++++-------- net/sender.go | 47 ++++++++++++++++++++++++++++++++++++++++++-- stats/stats.go | 12 ++++++++++- watchdog/watchdog.go | 2 +- 4 files changed, 62 insertions(+), 13 deletions(-) diff --git a/net/peer.go b/net/peer.go index 26228f0..66bcaca 100644 --- a/net/peer.go +++ b/net/peer.go @@ -3,7 +3,6 @@ package net import ( "fmt" "github.com/maride/afl-transmit/stats" - "log" "net" "regexp" "strings" @@ -36,35 +35,32 @@ func CreatePeer(address string) Peer { } // Sends the given content to the peer -func (p *Peer) SendToPeer(content []byte) { +func (p *Peer) SendToPeer(content []byte) error { // Encrypt content if desired if CryptApplicable() { // Encrypt packet var encryptErr error content, encryptErr = Encrypt(content) if encryptErr != nil { - log.Printf("Failed to decrypt packet from %s: %s", p.Address, encryptErr) - return + return fmt.Errorf("Failed to decrypt packet from %s: %s", p.Address, encryptErr) } } // Build up a connection tcpConn, dialErr := net.Dial("tcp", p.Address) if dialErr != nil { - log.Printf("Unable to connect to peer %s: %s", p.Address, dialErr) - return + return fmt.Errorf("Unable to connect to peer %s: %s", p.Address, dialErr) } // Send written, writeErr := tcpConn.Write(content) if writeErr != nil { - log.Printf("Unable to write to peer %s: %s", tcpConn.RemoteAddr().String(), writeErr) - return + return fmt.Errorf("Unable to write to peer %s: %s", tcpConn.RemoteAddr().String(), writeErr) } // Push written bytes to stats stats.PushStat(stats.Stat{SentBytes: uint64(written)}) // Close connection - tcpConn.Close() + return tcpConn.Close() } diff --git a/net/sender.go b/net/sender.go index d63544e..e5a4b89 100644 --- a/net/sender.go +++ b/net/sender.go @@ -2,10 +2,12 @@ package net import ( "flag" + "github.com/maride/afl-transmit/stats" "io/ioutil" "log" "net" "strings" + "time" ) var ( @@ -24,9 +26,45 @@ func RegisterSenderFlags() { // Send the given content to all peers func SendToPeers(content []byte) { - for _, p := range peers { - p.SendToPeer(content) + // Reset stats + alivePeers := uint8(0) + + // Peers where the sending process initially failed + var failedPeers []Peer + + for i := 0; i < len(peers); i++ { + // Send to that peer + sendErr := peers[i].SendToPeer(content) + if sendErr != nil { + // Sending failed, retry in a second + failedPeers = append(failedPeers, peers[i]) + continue + } + alivePeers++ } + + // Set stats now - we might be updating it in a few minutes again + stats.SetAlivePeers(alivePeers) + + // Sleep so our peers maybe come up again + if len(failedPeers) > 0 { + time.Sleep(10 * time.Second) + } + + // Retry + for i := 0; i < len(failedPeers); i++ { + // Send to that peer + sendErr := failedPeers[i].SendToPeer(content) + if sendErr != nil { + // Sending failed - inform user + log.Printf("Transmission failed after retry: %s", sendErr) + continue + } + alivePeers++ + } + + // Finally update it again, now including those peers where we retried it + stats.SetAlivePeers(alivePeers) } // Parses both peerString and peerFile, and adds all the peers to an internal array. @@ -54,6 +92,11 @@ func ReadPeers() { removeLocalPeers() } + // Update stats, include registered peers + stats.PushStat(stats.Stat{ + RegisteredPeers: uint8(len(peers)), + }) + log.Printf("Configured %d unique peers.", len(peers)) } diff --git a/stats/stats.go b/stats/stats.go index f6b6605..519a40d 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -11,6 +11,8 @@ import ( type Stat struct { SentBytes uint64 ReceivedBytes uint64 + RegisteredPeers uint8 + AlivePeer uint8 } // statPipe is a channel used to @@ -25,9 +27,17 @@ func RegisterStatsFlags() { } // PushStat pushes the given stat +// Note that SentBytes, ReceivedBytes and RegisteredPeers are added to the current number, +// while AlivePeer is interfaced with SetAlivePeers and is left ignored by PushStat func PushStat(s Stat) { stats.SentBytes += s.SentBytes stats.ReceivedBytes += s.ReceivedBytes + stats.RegisteredPeers += s.RegisteredPeers +} + +// SetAlivePeers sets the number of alive peers, means peers we could connect to +func SetAlivePeers(n uint8) { + stats.AlivePeer = n } // PrintStats periodically prints the collected statistics @@ -47,6 +57,6 @@ func PrintStats() { bIn := humanize.Bytes(stats.ReceivedBytes) bOut := humanize.Bytes(stats.SentBytes) - fmt.Printf("%s in / %s out\r", bIn, bOut) + fmt.Printf("Traffic: %s in / %s out | Peers: %d seen / %d registered\t\r", bIn, bOut, stats.AlivePeer, stats.RegisteredPeers) } } \ No newline at end of file diff --git a/watchdog/watchdog.go b/watchdog/watchdog.go index cf8232c..376e2fd 100644 --- a/watchdog/watchdog.go +++ b/watchdog/watchdog.go @@ -34,7 +34,7 @@ func WatchFuzzers(outputDirectory string) { } // and send it to our peers - net.SendToPeers(packedFuzzers) + go net.SendToPeers(packedFuzzers) // Sleep a bit time.Sleep(time.Duration(rescan) * time.Minute)