Retry failed connection attempts, include peers in statistics, and transmit in a separate thread

This commit is contained in:
maride 2021-04-26 12:22:11 +02:00
parent c7dff7c496
commit 0b18b19fe5
4 changed files with 62 additions and 13 deletions

View File

@ -3,7 +3,6 @@ package net
import ( import (
"fmt" "fmt"
"github.com/maride/afl-transmit/stats" "github.com/maride/afl-transmit/stats"
"log"
"net" "net"
"regexp" "regexp"
"strings" "strings"
@ -36,35 +35,32 @@ func CreatePeer(address string) Peer {
} }
// Sends the given content to the peer // Sends the given content to the peer
func (p *Peer) SendToPeer(content []byte) { func (p *Peer) SendToPeer(content []byte) error {
// Encrypt content if desired // Encrypt content if desired
if CryptApplicable() { if CryptApplicable() {
// Encrypt packet // Encrypt packet
var encryptErr error var encryptErr error
content, encryptErr = Encrypt(content) content, encryptErr = Encrypt(content)
if encryptErr != nil { if encryptErr != nil {
log.Printf("Failed to decrypt packet from %s: %s", p.Address, encryptErr) return fmt.Errorf("Failed to decrypt packet from %s: %s", p.Address, encryptErr)
return
} }
} }
// Build up a connection // Build up a connection
tcpConn, dialErr := net.Dial("tcp", p.Address) tcpConn, dialErr := net.Dial("tcp", p.Address)
if dialErr != nil { if dialErr != nil {
log.Printf("Unable to connect to peer %s: %s", p.Address, dialErr) return fmt.Errorf("Unable to connect to peer %s: %s", p.Address, dialErr)
return
} }
// Send // Send
written, writeErr := tcpConn.Write(content) written, writeErr := tcpConn.Write(content)
if writeErr != nil { if writeErr != nil {
log.Printf("Unable to write to peer %s: %s", tcpConn.RemoteAddr().String(), writeErr) return fmt.Errorf("Unable to write to peer %s: %s", tcpConn.RemoteAddr().String(), writeErr)
return
} }
// Push written bytes to stats // Push written bytes to stats
stats.PushStat(stats.Stat{SentBytes: uint64(written)}) stats.PushStat(stats.Stat{SentBytes: uint64(written)})
// Close connection // Close connection
tcpConn.Close() return tcpConn.Close()
} }

View File

@ -2,10 +2,12 @@ package net
import ( import (
"flag" "flag"
"github.com/maride/afl-transmit/stats"
"io/ioutil" "io/ioutil"
"log" "log"
"net" "net"
"strings" "strings"
"time"
) )
var ( var (
@ -24,9 +26,45 @@ func RegisterSenderFlags() {
// Send the given content to all peers // Send the given content to all peers
func SendToPeers(content []byte) { func SendToPeers(content []byte) {
for _, p := range peers { // Reset stats
p.SendToPeer(content) alivePeers := uint8(0)
// Peers where the sending process initially failed
var failedPeers []Peer
for i := 0; i < len(peers); i++ {
// Send to that peer
sendErr := peers[i].SendToPeer(content)
if sendErr != nil {
// Sending failed, retry in a second
failedPeers = append(failedPeers, peers[i])
continue
} }
alivePeers++
}
// Set stats now - we might be updating it in a few minutes again
stats.SetAlivePeers(alivePeers)
// Sleep so our peers maybe come up again
if len(failedPeers) > 0 {
time.Sleep(10 * time.Second)
}
// Retry
for i := 0; i < len(failedPeers); i++ {
// Send to that peer
sendErr := failedPeers[i].SendToPeer(content)
if sendErr != nil {
// Sending failed - inform user
log.Printf("Transmission failed after retry: %s", sendErr)
continue
}
alivePeers++
}
// Finally update it again, now including those peers where we retried it
stats.SetAlivePeers(alivePeers)
} }
// Parses both peerString and peerFile, and adds all the peers to an internal array. // Parses both peerString and peerFile, and adds all the peers to an internal array.
@ -54,6 +92,11 @@ func ReadPeers() {
removeLocalPeers() removeLocalPeers()
} }
// Update stats, include registered peers
stats.PushStat(stats.Stat{
RegisteredPeers: uint8(len(peers)),
})
log.Printf("Configured %d unique peers.", len(peers)) log.Printf("Configured %d unique peers.", len(peers))
} }

View File

@ -11,6 +11,8 @@ import (
type Stat struct { type Stat struct {
SentBytes uint64 SentBytes uint64
ReceivedBytes uint64 ReceivedBytes uint64
RegisteredPeers uint8
AlivePeer uint8
} }
// statPipe is a channel used to // statPipe is a channel used to
@ -25,9 +27,17 @@ func RegisterStatsFlags() {
} }
// PushStat pushes the given stat // PushStat pushes the given stat
// Note that SentBytes, ReceivedBytes and RegisteredPeers are added to the current number,
// while AlivePeer is interfaced with SetAlivePeers and is left ignored by PushStat
func PushStat(s Stat) { func PushStat(s Stat) {
stats.SentBytes += s.SentBytes stats.SentBytes += s.SentBytes
stats.ReceivedBytes += s.ReceivedBytes stats.ReceivedBytes += s.ReceivedBytes
stats.RegisteredPeers += s.RegisteredPeers
}
// SetAlivePeers sets the number of alive peers, means peers we could connect to
func SetAlivePeers(n uint8) {
stats.AlivePeer = n
} }
// PrintStats periodically prints the collected statistics // PrintStats periodically prints the collected statistics
@ -47,6 +57,6 @@ func PrintStats() {
bIn := humanize.Bytes(stats.ReceivedBytes) bIn := humanize.Bytes(stats.ReceivedBytes)
bOut := humanize.Bytes(stats.SentBytes) bOut := humanize.Bytes(stats.SentBytes)
fmt.Printf("%s in / %s out\r", bIn, bOut) fmt.Printf("Traffic: %s in / %s out | Peers: %d seen / %d registered\t\r", bIn, bOut, stats.AlivePeer, stats.RegisteredPeers)
} }
} }

View File

@ -34,7 +34,7 @@ func WatchFuzzers(outputDirectory string) {
} }
// and send it to our peers // and send it to our peers
net.SendToPeers(packedFuzzers) go net.SendToPeers(packedFuzzers)
// Sleep a bit // Sleep a bit
time.Sleep(time.Duration(rescan) * time.Minute) time.Sleep(time.Duration(rescan) * time.Minute)