Mirror of https://github.com/apernet/OpenGFW.git (synced 2024-11-14 14:29:22 +08:00)

feat: add support for pcap replay

Commit 94387450cf (parent 5723490a6c)
cmd/root.go (29 lines changed)
@@ -43,6 +43,7 @@ var logger *zap.Logger
 // Flags
 var (
     cfgFile   string
+    pcapFile  string
     logLevel  string
     logFormat string
 )
@@ -118,6 +119,7 @@ func init() {

 func initFlags() {
     rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file")
+    rootCmd.PersistentFlags().StringVarP(&pcapFile, "pcap", "p", "", "pcap file (optional)")
     rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", envOrDefaultString(appLogLevelEnv, "info"), "log level")
     rootCmd.PersistentFlags().StringVarP(&logFormat, "log-format", "f", envOrDefaultString(appLogFormatEnv, "console"), "log format")
 }
@@ -133,6 +135,9 @@ func initConfig() {
         viper.AddConfigPath("$HOME/.opengfw")
         viper.AddConfigPath("/etc/opengfw")
     }
+
+    viper.SetDefault("replay.realtime", true)
+    viper.SetDefault("replay.replayDelay", 10 * time.Millisecond)
 }

 func initLogger() {
@@ -167,6 +172,7 @@ type cliConfig struct {
     IO      cliConfigIO      `mapstructure:"io"`
     Workers cliConfigWorkers `mapstructure:"workers"`
     Ruleset cliConfigRuleset `mapstructure:"ruleset"`
+    Replay  cliConfigReplay  `mapstructure:"replay"`
 }

 type cliConfigIO struct {
@@ -177,6 +183,11 @@ type cliConfigIO struct {
     RST bool `mapstructure:"rst"`
 }

+type cliConfigReplay struct {
+    Realtime    bool          `mapstructure:"realtime"`
+    ReplayDelay time.Duration `mapstructure:"replayDelay"`
+}
+
 type cliConfigWorkers struct {
     Count     int `mapstructure:"count"`
     QueueSize int `mapstructure:"queueSize"`
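The replay defaults set in initConfig above and the cliConfigReplay struct are tied together by viper's mapstructure decoding: the dotted default keys become the nested "replay" section, and replayDelay decodes into a time.Duration. A minimal, self-contained sketch of that mechanism (illustrative only, not part of the commit; the struct below is a trimmed stand-in for cliConfig):

package main

import (
    "fmt"
    "time"

    "github.com/spf13/viper"
)

// Trimmed stand-in for the cliConfig / cliConfigReplay pair in the diff above.
type config struct {
    Replay struct {
        Realtime    bool          `mapstructure:"realtime"`
        ReplayDelay time.Duration `mapstructure:"replayDelay"`
    } `mapstructure:"replay"`
}

func main() {
    // Same defaults as initConfig sets; a "replay" section in the config file
    // would override them.
    viper.SetDefault("replay.realtime", true)
    viper.SetDefault("replay.replayDelay", 10*time.Millisecond)

    var cfg config
    if err := viper.Unmarshal(&cfg); err != nil {
        panic(err)
    }
    fmt.Println(cfg.Replay.Realtime, cfg.Replay.ReplayDelay) // true 10ms
}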
@@ -197,17 +208,31 @@ func (c *cliConfig) fillLogger(config *engine.Config) error {
 }

 func (c *cliConfig) fillIO(config *engine.Config) error {
-    nfio, err := io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{
+    var ioImpl io.PacketIO
+    var err error
+    if pcapFile != "" {
+        // Setup IO for pcap file replay
+        logger.Info("replaying from pcap file", zap.String("pcap file", pcapFile))
+        ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{
+            PcapFile:    pcapFile,
+            Realtime:    c.Replay.Realtime,
+            ReplayDelay: c.Replay.ReplayDelay,
+        })
+    } else {
+        // Setup IO for nfqueue
+        ioImpl, err = io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{
             QueueSize:   c.IO.QueueSize,
             ReadBuffer:  c.IO.ReadBuffer,
             WriteBuffer: c.IO.WriteBuffer,
             Local:       c.IO.Local,
             RST:         c.IO.RST,
         })
+    }

     if err != nil {
         return configError{Field: "io", Err: err}
     }
-    config.IO = nfio
+    config.IO = ioImpl
     return nil
 }
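With this change, a capture can be analyzed offline by adding the new -p/--pcap flag to the usual invocation (for example, -p capture.pcap; the file name is a placeholder). When the flag is left empty, fillIO takes the nfqueue path exactly as before, so live operation is unchanged.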
engine/engine.go

@@ -58,12 +58,17 @@ func (e *engine) UpdateRuleset(r ruleset.Ruleset) error {
 }

 func (e *engine) Run(ctx context.Context) error {
+    workerCtx, workerCancel := context.WithCancel(ctx)
+    defer workerCancel() // Stop workers
+
+    // Register IO shutdown
     ioCtx, ioCancel := context.WithCancel(ctx)
-    defer ioCancel() // Stop workers & IO
+    e.io.SetCancelFunc(ioCancel)
+    defer ioCancel() // Stop IO

     // Start workers
     for _, w := range e.workers {
-        go w.Run(ioCtx)
+        go w.Run(workerCtx)
     }

     // Register IO callback
@@ -85,6 +90,8 @@ func (e *engine) Run(ctx context.Context) error {
         return err
     case <-ctx.Done():
         return nil
+    case <-ioCtx.Done():
+        return nil
     }
 }
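The Run changes above split the worker context from the IO context and hand the IO's cancel function to the implementation, so an IO that runs out of input (such as a finite pcap file) can stop the engine on its own. A reduced, self-contained sketch of that wiring (illustrative only; fakeIO is a made-up stand-in, not an OpenGFW type):

package main

import (
    "context"
    "fmt"
    "time"
)

// fakeIO stands in for a PacketIO implementation: it keeps the cancel
// function it is given and calls it once its (simulated) input runs out.
type fakeIO struct{ cancel context.CancelFunc }

func (f *fakeIO) SetCancelFunc(c context.CancelFunc) { f.cancel = c }

func (f *fakeIO) run() {
    time.Sleep(100 * time.Millisecond) // pretend to replay a short capture
    f.cancel()                         // signal "no more packets"
}

func main() {
    ioCtx, ioCancel := context.WithCancel(context.Background())
    defer ioCancel()

    fio := &fakeIO{}
    fio.SetCancelFunc(ioCancel)
    go fio.run()

    <-ioCtx.Done() // corresponds to the new case <-ioCtx.Done() in Run
    fmt.Println("IO finished; engine run returns")
}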
io/interface.go

@@ -48,6 +48,9 @@ type PacketIO interface {
     ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error)
     // Close closes the packet IO.
     Close() error
+    // SetCancelFunc gives packet IO access to context cancel function, enabling it to
+    // trigger a shutdown
+    SetCancelFunc(cancelFunc context.CancelFunc) error
 }

 type ErrInvalidPacket struct {
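Every PacketIO implementation now has to provide SetCancelFunc: the nfqueue backend below implements it as a no-op, since live capture never terminates on its own, while the pcap backend stores the cancel function and invokes it once the capture has been fully replayed.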
io/nfqueue.go

@@ -281,6 +281,11 @@ func (n *nfqueuePacketIO) Close() error {
     return n.n.Close()
 }

+// nfqueue IO does not issue shutdown
+func (n *nfqueuePacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error {
+    return nil
+}
+
 func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error {
     rules, err := generateNftRules(local, rst)
     if err != nil {
io/pcap.go (new file, 126 lines)
@@ -0,0 +1,126 @@
+package io
+
+import (
+    "context"
+    "hash/crc32"
+    "net"
+    "sort"
+    "strings"
+    "time"
+
+    "github.com/google/gopacket"
+    "github.com/google/gopacket/pcap"
+)
+
+var _ PacketIO = (*pcapPacketIO)(nil)
+
+type pcapPacketIO struct {
+    pcap     *pcap.Handle
+    lastTime *time.Time
+    ioCancel context.CancelFunc
+    config   PcapPacketIOConfig
+}
+
+type PcapPacketIOConfig struct {
+    PcapFile    string
+    Realtime    bool
+    ReplayDelay time.Duration
+}
+
+func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) {
+    handle, err := pcap.OpenOffline(config.PcapFile)
+    if err != nil {
+        return nil, err
+    }
+
+    return &pcapPacketIO{
+        pcap:     handle,
+        lastTime: nil,
+        ioCancel: nil,
+        config:   config,
+    }, nil
+}
+
+func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error {
+    go func() {
+        packetSource := gopacket.NewPacketSource(p.pcap, p.pcap.LinkType())
+        for packet := range packetSource.Packets() {
+            p.wait(packet)
+
+            networkLayer := packet.NetworkLayer()
+            if networkLayer != nil {
+                src, dst := networkLayer.NetworkFlow().Endpoints()
+                endpoints := []string{src.String(), dst.String()}
+                sort.Strings(endpoints)
+                id := crc32.Checksum([]byte(strings.Join(endpoints, ",")), crc32.IEEETable)
+
+                cb(&pcapPacket{
+                    streamID: id,
+                    data:     packet.LinkLayer().LayerPayload(),
+                }, nil)
+            }
+        }
+        // Give the workers a chance to finish everything
+        time.Sleep(time.Second)
+        // Stop the engine when all packets are finished
+        p.ioCancel()
+    }()
+
+    return nil
+}
+
+func (p *pcapPacketIO) ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) {
+    return nil, nil
+}
+
+func (p *pcapPacketIO) SetVerdict(pkt Packet, v Verdict, newPacket []byte) error {
+    return nil
+}
+
+func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error {
+    p.ioCancel = cancelFunc
+    return nil
+}
+
+func (p *pcapPacketIO) Close() error {
+    return nil
+}
+
+// Intentionally slow down the replay.
+// In realtime mode, this matches the timestamps in the capture.
+// In non-realtime mode, this helps to avoid flooding the workers.
+func (p *pcapPacketIO) wait(packet gopacket.Packet) error {
+    if !p.config.Realtime {
+        time.Sleep(p.config.ReplayDelay)
+        return nil
+    }
+
+    if p.lastTime == nil {
+        p.lastTime = &packet.Metadata().Timestamp
+    } else {
+        t := packet.Metadata().Timestamp.Sub(*p.lastTime)
+        time.Sleep(t)
+        p.lastTime = &packet.Metadata().Timestamp
+    }
+
+    return nil
+}
+
+var _ Packet = (*pcapPacket)(nil)
+
+type pcapPacket struct {
+    streamID uint32
+    data     []byte
+}
+
+func (p *pcapPacket) StreamID() uint32 {
+    return p.streamID
+}
+
+func (p *pcapPacket) Data() []byte {
+    return p.data
+}
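Register derives a direction-independent stream ID by sorting the two endpoint addresses before hashing, so both halves of a flow are delivered to the same stream. A standalone illustration (not part of the commit; the addresses are arbitrary examples):

package main

import (
    "fmt"
    "hash/crc32"
    "sort"
    "strings"
)

// streamID mirrors the ID computation in pcapPacketIO.Register.
func streamID(src, dst string) uint32 {
    endpoints := []string{src, dst}
    sort.Strings(endpoints)
    return crc32.Checksum([]byte(strings.Join(endpoints, ",")), crc32.IEEETable)
}

func main() {
    a := streamID("10.0.0.1", "192.0.2.7")
    b := streamID("192.0.2.7", "10.0.0.1")
    fmt.Println(a == b) // true: both directions hash to the same stream ID
}

As for pacing, wait gives two behaviors: with realtime enabled, the sleep between consecutive packets reproduces the capture's own timestamp gaps; with it disabled, every packet is delayed by replayDelay, so at the default of 10 ms a 1,000-packet capture is fed to the workers over roughly 10 seconds regardless of its original timing.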