From 94387450cff92fe54fe86736290ae594a2544832 Mon Sep 17 00:00:00 2001 From: eddc005 Date: Mon, 6 May 2024 20:28:36 +0100 Subject: [PATCH 1/7] feat: add support for pcap replay --- cmd/root.go | 41 ++++++++++++--- engine/engine.go | 11 ++++- io/interface.go | 3 ++ io/nfqueue.go | 5 ++ io/pcap.go | 126 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 176 insertions(+), 10 deletions(-) create mode 100644 io/pcap.go diff --git a/cmd/root.go b/cmd/root.go index 288e3d7..d6c0083 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -43,6 +43,7 @@ var logger *zap.Logger // Flags var ( cfgFile string + pcapFile string logLevel string logFormat string ) @@ -118,6 +119,7 @@ func init() { func initFlags() { rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file") + rootCmd.PersistentFlags().StringVarP(&pcapFile, "pcap", "p", "", "pcap file (optional)") rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", envOrDefaultString(appLogLevelEnv, "info"), "log level") rootCmd.PersistentFlags().StringVarP(&logFormat, "log-format", "f", envOrDefaultString(appLogFormatEnv, "console"), "log format") } @@ -133,6 +135,9 @@ func initConfig() { viper.AddConfigPath("$HOME/.opengfw") viper.AddConfigPath("/etc/opengfw") } + + viper.SetDefault("replay.realtime", true) + viper.SetDefault("replay.replayDelay", 10 * time.Millisecond) } func initLogger() { @@ -167,6 +172,7 @@ type cliConfig struct { IO cliConfigIO `mapstructure:"io"` Workers cliConfigWorkers `mapstructure:"workers"` Ruleset cliConfigRuleset `mapstructure:"ruleset"` + Replay cliConfigReplay `mapstructure:"replay"` } type cliConfigIO struct { @@ -177,6 +183,11 @@ type cliConfigIO struct { RST bool `mapstructure:"rst"` } +type cliConfigReplay struct { + Realtime bool `mapstructure:"realtime"` + ReplayDelay time.Duration `mapstructure:"replayDelay"` +} + type cliConfigWorkers struct { Count int `mapstructure:"count"` QueueSize int `mapstructure:"queueSize"` @@ -197,17 +208,31 @@ func (c *cliConfig) fillLogger(config *engine.Config) error { } func (c *cliConfig) fillIO(config *engine.Config) error { - nfio, err := io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{ - QueueSize: c.IO.QueueSize, - ReadBuffer: c.IO.ReadBuffer, - WriteBuffer: c.IO.WriteBuffer, - Local: c.IO.Local, - RST: c.IO.RST, - }) + var ioImpl io.PacketIO + var err error + if pcapFile != "" { + // Setup IO for pcap file replay + logger.Info("replaying from pcap file", zap.String("pcap file", pcapFile)) + ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{ + PcapFile: pcapFile, + Realtime: c.Replay.Realtime, + ReplayDelay: c.Replay.ReplayDelay, + }) + } else { + // Setup IO for nfqueue + ioImpl, err = io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{ + QueueSize: c.IO.QueueSize, + ReadBuffer: c.IO.ReadBuffer, + WriteBuffer: c.IO.WriteBuffer, + Local: c.IO.Local, + RST: c.IO.RST, + }) + } + if err != nil { return configError{Field: "io", Err: err} } - config.IO = nfio + config.IO = ioImpl return nil } diff --git a/engine/engine.go b/engine/engine.go index 56f5ed3..1270efb 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -58,12 +58,17 @@ func (e *engine) UpdateRuleset(r ruleset.Ruleset) error { } func (e *engine) Run(ctx context.Context) error { + workerCtx, workerCancel := context.WithCancel(ctx) + defer workerCancel() // Stop workers + + // Register IO shutdown ioCtx, ioCancel := context.WithCancel(ctx) - defer ioCancel() // Stop workers & IO + e.io.SetCancelFunc(ioCancel) + defer ioCancel() // Stop IO // Start workers for _, w := range e.workers { - go w.Run(ioCtx) + go w.Run(workerCtx) } // Register IO callback @@ -85,6 +90,8 @@ func (e *engine) Run(ctx context.Context) error { return err case <-ctx.Done(): return nil + case <-ioCtx.Done(): + return nil } } diff --git a/io/interface.go b/io/interface.go index af7e1e7..f996789 100644 --- a/io/interface.go +++ b/io/interface.go @@ -48,6 +48,9 @@ type PacketIO interface { ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) // Close closes the packet IO. Close() error + // SetCancelFunc gives packet IO access to context cancel function, enabling it to + // trigger a shutdown + SetCancelFunc(cancelFunc context.CancelFunc) error } type ErrInvalidPacket struct { diff --git a/io/nfqueue.go b/io/nfqueue.go index e84a0bb..f1a64df 100644 --- a/io/nfqueue.go +++ b/io/nfqueue.go @@ -281,6 +281,11 @@ func (n *nfqueuePacketIO) Close() error { return n.n.Close() } +// nfqueue IO does not issue shutdown +func (n *nfqueuePacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { + return nil +} + func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error { rules, err := generateNftRules(local, rst) if err != nil { diff --git a/io/pcap.go b/io/pcap.go new file mode 100644 index 0000000..36bbbaa --- /dev/null +++ b/io/pcap.go @@ -0,0 +1,126 @@ +package io + +import ( + "context" + "hash/crc32" + "net" + "sort" + "strings" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/pcap" +) + +var _ PacketIO = (*pcapPacketIO)(nil) + +type pcapPacketIO struct { + pcap *pcap.Handle + lastTime *time.Time + ioCancel context.CancelFunc + config PcapPacketIOConfig +} + +type PcapPacketIOConfig struct { + PcapFile string + Realtime bool + ReplayDelay time.Duration +} + +func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { + handle, err := pcap.OpenOffline(config.PcapFile) + + if err != nil { + return nil, err + } + + print(config.ReplayDelay) + + return &pcapPacketIO{ + pcap: handle, + lastTime: nil, + ioCancel: nil, + config: config, + }, nil +} + +func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error { + go func() { + packetSource := gopacket.NewPacketSource(p.pcap, p.pcap.LinkType()) + for packet := range packetSource.Packets() { + p.wait(packet) + + networkLayer := packet.NetworkLayer() + if networkLayer != nil { + src, dst := networkLayer.NetworkFlow().Endpoints() + endpoints := []string{src.String(), dst.String()} + sort.Strings(endpoints) + id := crc32.Checksum([]byte(strings.Join(endpoints, ",")), crc32.IEEETable) + + cb(&pcapPacket{ + streamID: id, + data: packet.LinkLayer().LayerPayload(), + }, nil) + } + } + // Give the workers a chance to finish everything + time.Sleep(time.Second) + // Stop the engine when all packets are finished + p.ioCancel() + }() + + return nil +} + +func (p *pcapPacketIO) ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) { + return nil, nil +} + +func (p *pcapPacketIO) SetVerdict(pkt Packet, v Verdict, newPacket []byte) error { + return nil +} + +func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { + p.ioCancel = cancelFunc + return nil +} + +func (p *pcapPacketIO) Close() error { + return nil +} + +// Intentionally slow down the replay +// In realtime mode, this is to match the timestamps in the capture +// In non realtime mode, this helps to avoid flooding the workers +func (p *pcapPacketIO) wait(packet gopacket.Packet) error { + if !p.config.Realtime { + time.Sleep(p.config.ReplayDelay) + return nil + } + + if p.lastTime == nil { + p.lastTime = &packet.Metadata().Timestamp + } else { + t := packet.Metadata().Timestamp.Sub(*p.lastTime) + time.Sleep(t) + p.lastTime = &packet.Metadata().Timestamp + } + + return nil +} + +var _ Packet = (*pcapPacket)(nil) + +type pcapPacket struct { + streamID uint32 + data []byte +} + +func (p *pcapPacket) StreamID() uint32 { + return p.streamID +} + +func (p *pcapPacket) Data() []byte { + return p.data +} + From f01b79e6255751bf2fe7a5db22226d205748e2b6 Mon Sep 17 00:00:00 2001 From: eddc005 Date: Mon, 6 May 2024 23:04:54 +0100 Subject: [PATCH 2/7] rebase and remove replayDelay --- cmd/root.go | 3 --- io/pcap.go | 20 ++++++++++---------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index d6c0083..136f456 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -137,7 +137,6 @@ func initConfig() { } viper.SetDefault("replay.realtime", true) - viper.SetDefault("replay.replayDelay", 10 * time.Millisecond) } func initLogger() { @@ -185,7 +184,6 @@ type cliConfigIO struct { type cliConfigReplay struct { Realtime bool `mapstructure:"realtime"` - ReplayDelay time.Duration `mapstructure:"replayDelay"` } type cliConfigWorkers struct { @@ -216,7 +214,6 @@ func (c *cliConfig) fillIO(config *engine.Config) error { ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{ PcapFile: pcapFile, Realtime: c.Replay.Realtime, - ReplayDelay: c.Replay.ReplayDelay, }) } else { // Setup IO for nfqueue diff --git a/io/pcap.go b/io/pcap.go index 36bbbaa..995332b 100644 --- a/io/pcap.go +++ b/io/pcap.go @@ -24,7 +24,6 @@ type pcapPacketIO struct { type PcapPacketIOConfig struct { PcapFile string Realtime bool - ReplayDelay time.Duration } func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { @@ -34,8 +33,6 @@ func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { return nil, err } - print(config.ReplayDelay) - return &pcapPacketIO{ pcap: handle, lastTime: nil, @@ -58,8 +55,9 @@ func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error { id := crc32.Checksum([]byte(strings.Join(endpoints, ",")), crc32.IEEETable) cb(&pcapPacket{ - streamID: id, - data: packet.LinkLayer().LayerPayload(), + streamID: id, + timestamp: packet.Metadata().Timestamp, + data: packet.LinkLayer().LayerPayload(), }, nil) } } @@ -91,10 +89,8 @@ func (p *pcapPacketIO) Close() error { // Intentionally slow down the replay // In realtime mode, this is to match the timestamps in the capture -// In non realtime mode, this helps to avoid flooding the workers func (p *pcapPacketIO) wait(packet gopacket.Packet) error { if !p.config.Realtime { - time.Sleep(p.config.ReplayDelay) return nil } @@ -112,15 +108,19 @@ func (p *pcapPacketIO) wait(packet gopacket.Packet) error { var _ Packet = (*pcapPacket)(nil) type pcapPacket struct { - streamID uint32 - data []byte + streamID uint32 + timestamp time.Time + data []byte } func (p *pcapPacket) StreamID() uint32 { return p.streamID } +func (p *pcapPacket) Timestamp() time.Time { + return p.timestamp +} + func (p *pcapPacket) Data() []byte { return p.data } - From abd7725feda4909931df90ae62461ea08428eaa0 Mon Sep 17 00:00:00 2001 From: eddc005 Date: Tue, 7 May 2024 21:50:06 +0100 Subject: [PATCH 3/7] close pcap properly and implement ProtectedDialContext --- io/pcap.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/io/pcap.go b/io/pcap.go index 995332b..c2dfae0 100644 --- a/io/pcap.go +++ b/io/pcap.go @@ -19,6 +19,8 @@ type pcapPacketIO struct { lastTime *time.Time ioCancel context.CancelFunc config PcapPacketIOConfig + + dialer *net.Dialer } type PcapPacketIOConfig struct { @@ -38,6 +40,7 @@ func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { lastTime: nil, ioCancel: nil, config: config, + dialer: &net.Dialer{}, }, nil } @@ -70,8 +73,9 @@ func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error { return nil } +// A normal dialer is sufficient as pcap IO does not mess up with the networking func (p *pcapPacketIO) ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) { - return nil, nil + return p.dialer.DialContext(ctx, network, address) } func (p *pcapPacketIO) SetVerdict(pkt Packet, v Verdict, newPacket []byte) error { @@ -84,6 +88,7 @@ func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { } func (p *pcapPacketIO) Close() error { + p.pcap.Close() return nil } From 70fee141033dbf743e76a419a596ffe22ebc1898 Mon Sep 17 00:00:00 2001 From: Toby Date: Tue, 7 May 2024 22:50:58 -0700 Subject: [PATCH 4/7] chore: format --- cmd/root.go | 6 +++--- io/pcap.go | 7 +++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 136f456..79078ea 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -183,7 +183,7 @@ type cliConfigIO struct { } type cliConfigReplay struct { - Realtime bool `mapstructure:"realtime"` + Realtime bool `mapstructure:"realtime"` } type cliConfigWorkers struct { @@ -212,8 +212,8 @@ func (c *cliConfig) fillIO(config *engine.Config) error { // Setup IO for pcap file replay logger.Info("replaying from pcap file", zap.String("pcap file", pcapFile)) ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{ - PcapFile: pcapFile, - Realtime: c.Replay.Realtime, + PcapFile: pcapFile, + Realtime: c.Replay.Realtime, }) } else { // Setup IO for nfqueue diff --git a/io/pcap.go b/io/pcap.go index c2dfae0..520da17 100644 --- a/io/pcap.go +++ b/io/pcap.go @@ -20,17 +20,16 @@ type pcapPacketIO struct { ioCancel context.CancelFunc config PcapPacketIOConfig - dialer *net.Dialer + dialer *net.Dialer } type PcapPacketIOConfig struct { - PcapFile string - Realtime bool + PcapFile string + Realtime bool } func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { handle, err := pcap.OpenOffline(config.PcapFile) - if err != nil { return nil, err } From 76c0f47832140dac528aba42b6d80bf551b4f7a8 Mon Sep 17 00:00:00 2001 From: Toby Date: Tue, 7 May 2024 23:05:06 -0700 Subject: [PATCH 5/7] chore: do not default replay.realtime to true --- cmd/root.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 79078ea..1ccf025 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -135,8 +135,6 @@ func initConfig() { viper.AddConfigPath("$HOME/.opengfw") viper.AddConfigPath("/etc/opengfw") } - - viper.SetDefault("replay.realtime", true) } func initLogger() { From 5e15fd6dd937fa040254fe368ee2b6645d3f50ff Mon Sep 17 00:00:00 2001 From: Toby Date: Tue, 7 May 2024 23:12:24 -0700 Subject: [PATCH 6/7] ci: install pcap for build --- .github/workflows/check.yaml | 3 +++ .github/workflows/release.yaml | 3 +++ 2 files changed, 6 insertions(+) diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index ac9e66d..352ac7c 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -23,6 +23,9 @@ jobs: with: go-version: 'stable' + - name: Install pcap + run: sudo apt install -y libpcap-dev + - run: go vet ./... - name: staticcheck diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 0da1054..b227e16 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -24,6 +24,9 @@ jobs: with: go-version: "1.22" + - name: Install pcap + run: sudo apt install -y libpcap-dev + - name: Build env: GOOS: ${{ matrix.goos }} From 0daaa32fc6c77fa4566266986d9a6f84722e6903 Mon Sep 17 00:00:00 2001 From: Toby Date: Tue, 7 May 2024 23:13:58 -0700 Subject: [PATCH 7/7] ci: install pcap for build 2 --- .github/workflows/check.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index 352ac7c..a24f198 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -47,4 +47,7 @@ jobs: with: go-version: 'stable' + - name: Install pcap + run: sudo apt install -y libpcap-dev + - run: go test ./...