Unverified Commit bc3df6bd authored by Tomasz Zdybał's avatar Tomasz Zdybał Committed by GitHub

Add WireTap interface (#444)

* Add WireTap interface

WireTap interface can be used to access all messages send and received
by Bitswap. This can be used to implement advanced statistics/analysis
logic, which is beyond scope of Bitswap, but can be implemented as IPFS
plugin.

Some examples of potential applications:
 - per CID bandwidth tracker (see: https://gitcoin.co/issue/PinataCloud/apollo/2/100023631)
 - detailed per peer stats
 - intrusion detection system (IDS) implementation

* Add test for WireTap
parent bcf85413
...@@ -252,6 +252,9 @@ type Bitswap struct { ...@@ -252,6 +252,9 @@ type Bitswap struct {
allMetric metrics.Histogram allMetric metrics.Histogram
sentHistogram metrics.Histogram sentHistogram metrics.Histogram
// External statistics interface
wiretap WireTap
// the SessionManager routes requests to interested sessions // the SessionManager routes requests to interested sessions
sm *bssm.SessionManager sm *bssm.SessionManager
...@@ -419,6 +422,10 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -419,6 +422,10 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
// TODO: this is bad, and could be easily abused. // TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger // Should only track *useful* messages in ledger
if bs.wiretap != nil {
bs.wiretap.MessageReceived(p, incoming)
}
iblocks := incoming.Blocks() iblocks := incoming.Blocks()
if len(iblocks) > 0 { if len(iblocks) > 0 {
......
...@@ -13,6 +13,8 @@ import ( ...@@ -13,6 +13,8 @@ import (
decision "github.com/ipfs/go-bitswap/internal/decision" decision "github.com/ipfs/go-bitswap/internal/decision"
bssession "github.com/ipfs/go-bitswap/internal/session" bssession "github.com/ipfs/go-bitswap/internal/session"
"github.com/ipfs/go-bitswap/message" "github.com/ipfs/go-bitswap/message"
bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
testinstance "github.com/ipfs/go-bitswap/testinstance" testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet" tn "github.com/ipfs/go-bitswap/testnet"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
...@@ -468,7 +470,6 @@ func TestBasicBitswap(t *testing.T) { ...@@ -468,7 +470,6 @@ func TestBasicBitswap(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
st1, err := instances[1].Exchange.Stat() st1, err := instances[1].Exchange.Stat()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -860,3 +861,144 @@ func TestWithScoreLedger(t *testing.T) { ...@@ -860,3 +861,144 @@ func TestWithScoreLedger(t *testing.T) {
t.Fatal("Expected the score ledger to be closed within 5s") t.Fatal("Expected the score ledger to be closed within 5s")
} }
} }
type logItem struct {
dir byte
pid peer.ID
msg bsmsg.BitSwapMessage
}
type mockWireTap struct {
log []logItem
}
func (m *mockWireTap) MessageReceived(p peer.ID, msg bsmsg.BitSwapMessage) {
m.log = append(m.log, logItem{'r', p, msg})
}
func (m *mockWireTap) MessageSent(p peer.ID, msg bsmsg.BitSwapMessage) {
m.log = append(m.log, logItem{'s', p, msg})
}
func TestWireTap(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()
instances := ig.Instances(3)
blocks := bg.Blocks(2)
// Install WireTap
wiretap := new(mockWireTap)
bitswap.EnableWireTap(wiretap)(instances[0].Exchange)
// First peer has block
err := instances[0].Exchange.HasBlock(blocks[0])
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
// Second peer broadcasts want for block CID
// (Received by first and third peers)
_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
if err != nil {
t.Fatal(err)
}
// When second peer receives block, it should send out a cancel, so third
// peer should no longer keep second peer's want
if err = tu.WaitFor(ctx, func() error {
if len(instances[2].Exchange.WantlistForPeer(instances[1].Peer)) != 0 {
return fmt.Errorf("should have no items in other peers wantlist")
}
if len(instances[1].Exchange.GetWantlist()) != 0 {
return fmt.Errorf("shouldnt have anything in wantlist")
}
return nil
}); err != nil {
t.Fatal(err)
}
// After communication, 3 messages should be logged via WireTap
if l := len(wiretap.log); l != 3 {
t.Fatal("expected 3 items logged via WireTap, found", l)
}
// Received: 'Have'
if wiretap.log[0].dir != 'r' {
t.Error("expected message to be received")
}
if wiretap.log[0].pid != instances[1].Peer {
t.Error("expected peer", instances[1].Peer, ", found", wiretap.log[0].pid)
}
if l := len(wiretap.log[0].msg.Wantlist()); l != 1 {
t.Fatal("expected 1 entry in Wantlist, found", l)
}
if wiretap.log[0].msg.Wantlist()[0].WantType != pb.Message_Wantlist_Have {
t.Error("expected WantType equal to 'Have', found 'Block'")
}
// Sent: Block
if wiretap.log[1].dir != 's' {
t.Error("expected message to be sent")
}
if wiretap.log[1].pid != instances[1].Peer {
t.Error("expected peer", instances[1].Peer, ", found", wiretap.log[1].pid)
}
if l := len(wiretap.log[1].msg.Blocks()); l != 1 {
t.Fatal("expected 1 entry in Blocks, found", l)
}
if wiretap.log[1].msg.Blocks()[0].Cid() != blocks[0].Cid() {
t.Error("wrong block Cid")
}
// Received: 'Cancel'
if wiretap.log[2].dir != 'r' {
t.Error("expected message to be received")
}
if wiretap.log[2].pid != instances[1].Peer {
t.Error("expected peer", instances[1].Peer, ", found", wiretap.log[2].pid)
}
if l := len(wiretap.log[2].msg.Wantlist()); l != 1 {
t.Fatal("expected 1 entry in Wantlist, found", l)
}
if wiretap.log[2].msg.Wantlist()[0].WantType != pb.Message_Wantlist_Block {
t.Error("expected WantType equal to 'Block', found 'Have'")
}
if wiretap.log[2].msg.Wantlist()[0].Cancel != true {
t.Error("expected entry with Cancel set to 'true'")
}
// After disabling WireTap, no new messages are logged
bitswap.DisableWireTap()(instances[0].Exchange)
err = instances[0].Exchange.HasBlock(blocks[1])
if err != nil {
t.Fatal(err)
}
_, err = instances[1].Exchange.GetBlock(ctx, blocks[1].Cid())
if err != nil {
t.Fatal(err)
}
if err = tu.WaitFor(ctx, func() error {
if len(instances[1].Exchange.GetWantlist()) != 0 {
return fmt.Errorf("shouldnt have anything in wantlist")
}
return nil
}); err != nil {
t.Fatal(err)
}
if l := len(wiretap.log); l != 3 {
t.Fatal("expected 3 items logged via WireTap, found", l)
}
for _, inst := range instances {
err := inst.Exchange.Close()
if err != nil {
t.Fatal(err)
}
}
}
package bitswap
import (
bsmsg "github.com/ipfs/go-bitswap/message"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// WireTap provides methods to access all messages sent and received by Bitswap.
// This interface can be used to implement various statistics (this is original intent).
type WireTap interface {
MessageReceived(peer.ID, bsmsg.BitSwapMessage)
MessageSent(peer.ID, bsmsg.BitSwapMessage)
}
// Configures Bitswap to use given wiretap.
func EnableWireTap(tap WireTap) Option {
return func(bs *Bitswap) {
bs.wiretap = tap
}
}
// Configures Bitswap not to use any wiretap.
func DisableWireTap() Option {
return func(bs *Bitswap) {
bs.wiretap = nil
}
}
...@@ -56,6 +56,9 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { ...@@ -56,6 +56,9 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
// Ideally, yes. But we'd need some way to trigger a retry and/or drop // Ideally, yes. But we'd need some way to trigger a retry and/or drop
// the peer. // the peer.
bs.engine.MessageSent(envelope.Peer, envelope.Message) bs.engine.MessageSent(envelope.Peer, envelope.Message)
if bs.wiretap != nil {
bs.wiretap.MessageSent(envelope.Peer, envelope.Message)
}
bs.sendBlocks(ctx, envelope) bs.sendBlocks(ctx, envelope)
case <-ctx.Done(): case <-ctx.Done():
return return
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment