diff --git a/bitswap.go b/bitswap.go index 8af786a80dcb481eb131494feb3ab8ece3acc925..e87157573418279d6e243c5e785b99e518259d0e 100644 --- a/bitswap.go +++ b/bitswap.go @@ -252,6 +252,9 @@ type Bitswap struct { allMetric metrics.Histogram sentHistogram metrics.Histogram + // External statistics interface + wiretap WireTap + // the SessionManager routes requests to interested sessions sm *bssm.SessionManager @@ -419,6 +422,10 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg // TODO: this is bad, and could be easily abused. // Should only track *useful* messages in ledger + if bs.wiretap != nil { + bs.wiretap.MessageReceived(p, incoming) + } + iblocks := incoming.Blocks() if len(iblocks) > 0 { diff --git a/bitswap_test.go b/bitswap_test.go index b95faa30d5d4f6f45857f2c1ff75b7ea5f1af3b3..2962394d19e33e44ad1a1ab4159662d22273f63f 100644 --- a/bitswap_test.go +++ b/bitswap_test.go @@ -13,6 +13,8 @@ import ( decision "github.com/ipfs/go-bitswap/internal/decision" bssession "github.com/ipfs/go-bitswap/internal/session" "github.com/ipfs/go-bitswap/message" + bsmsg "github.com/ipfs/go-bitswap/message" + pb "github.com/ipfs/go-bitswap/message/pb" testinstance "github.com/ipfs/go-bitswap/testinstance" tn "github.com/ipfs/go-bitswap/testnet" blocks "github.com/ipfs/go-block-format" @@ -468,7 +470,6 @@ func TestBasicBitswap(t *testing.T) { if err != nil { t.Fatal(err) } - st1, err := instances[1].Exchange.Stat() if err != nil { t.Fatal(err) @@ -860,3 +861,144 @@ func TestWithScoreLedger(t *testing.T) { t.Fatal("Expected the score ledger to be closed within 5s") } } + +type logItem struct { + dir byte + pid peer.ID + msg bsmsg.BitSwapMessage +} +type mockWireTap struct { + log []logItem +} + +func (m *mockWireTap) MessageReceived(p peer.ID, msg bsmsg.BitSwapMessage) { + m.log = append(m.log, logItem{'r', p, msg}) +} +func (m *mockWireTap) MessageSent(p peer.ID, msg bsmsg.BitSwapMessage) { + m.log = append(m.log, logItem{'s', p, msg}) +} + +func TestWireTap(t *testing.T) { + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + ig := testinstance.NewTestInstanceGenerator(net, nil, nil) + defer ig.Close() + bg := blocksutil.NewBlockGenerator() + + instances := ig.Instances(3) + blocks := bg.Blocks(2) + + // Install WireTap + wiretap := new(mockWireTap) + bitswap.EnableWireTap(wiretap)(instances[0].Exchange) + + // First peer has block + err := instances[0].Exchange.HasBlock(blocks[0]) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // Second peer broadcasts want for block CID + // (Received by first and third peers) + _, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid()) + if err != nil { + t.Fatal(err) + } + + // When second peer receives block, it should send out a cancel, so third + // peer should no longer keep second peer's want + if err = tu.WaitFor(ctx, func() error { + if len(instances[2].Exchange.WantlistForPeer(instances[1].Peer)) != 0 { + return fmt.Errorf("should have no items in other peers wantlist") + } + if len(instances[1].Exchange.GetWantlist()) != 0 { + return fmt.Errorf("shouldnt have anything in wantlist") + } + return nil + }); err != nil { + t.Fatal(err) + } + + // After communication, 3 messages should be logged via WireTap + if l := len(wiretap.log); l != 3 { + t.Fatal("expected 3 items logged via WireTap, found", l) + } + + // Received: 'Have' + if wiretap.log[0].dir != 'r' { + t.Error("expected message to be received") + } + if wiretap.log[0].pid != instances[1].Peer { + t.Error("expected peer", instances[1].Peer, ", found", wiretap.log[0].pid) + } + if l := len(wiretap.log[0].msg.Wantlist()); l != 1 { + t.Fatal("expected 1 entry in Wantlist, found", l) + } + if wiretap.log[0].msg.Wantlist()[0].WantType != pb.Message_Wantlist_Have { + t.Error("expected WantType equal to 'Have', found 'Block'") + } + + // Sent: Block + if wiretap.log[1].dir != 's' { + t.Error("expected message to be sent") + } + if wiretap.log[1].pid != instances[1].Peer { + t.Error("expected peer", instances[1].Peer, ", found", wiretap.log[1].pid) + } + if l := len(wiretap.log[1].msg.Blocks()); l != 1 { + t.Fatal("expected 1 entry in Blocks, found", l) + } + if wiretap.log[1].msg.Blocks()[0].Cid() != blocks[0].Cid() { + t.Error("wrong block Cid") + } + + // Received: 'Cancel' + if wiretap.log[2].dir != 'r' { + t.Error("expected message to be received") + } + if wiretap.log[2].pid != instances[1].Peer { + t.Error("expected peer", instances[1].Peer, ", found", wiretap.log[2].pid) + } + if l := len(wiretap.log[2].msg.Wantlist()); l != 1 { + t.Fatal("expected 1 entry in Wantlist, found", l) + } + if wiretap.log[2].msg.Wantlist()[0].WantType != pb.Message_Wantlist_Block { + t.Error("expected WantType equal to 'Block', found 'Have'") + } + if wiretap.log[2].msg.Wantlist()[0].Cancel != true { + t.Error("expected entry with Cancel set to 'true'") + } + + // After disabling WireTap, no new messages are logged + bitswap.DisableWireTap()(instances[0].Exchange) + + err = instances[0].Exchange.HasBlock(blocks[1]) + if err != nil { + t.Fatal(err) + } + _, err = instances[1].Exchange.GetBlock(ctx, blocks[1].Cid()) + if err != nil { + t.Fatal(err) + } + if err = tu.WaitFor(ctx, func() error { + if len(instances[1].Exchange.GetWantlist()) != 0 { + return fmt.Errorf("shouldnt have anything in wantlist") + } + return nil + }); err != nil { + t.Fatal(err) + } + + if l := len(wiretap.log); l != 3 { + t.Fatal("expected 3 items logged via WireTap, found", l) + } + + for _, inst := range instances { + err := inst.Exchange.Close() + if err != nil { + t.Fatal(err) + } + } +} diff --git a/wiretap.go b/wiretap.go new file mode 100644 index 0000000000000000000000000000000000000000..55cb21d3e626fff4ea2267c2cdbbc1f3b8b30a48 --- /dev/null +++ b/wiretap.go @@ -0,0 +1,27 @@ +package bitswap + +import ( + bsmsg "github.com/ipfs/go-bitswap/message" + peer "github.com/libp2p/go-libp2p-core/peer" +) + +// WireTap provides methods to access all messages sent and received by Bitswap. +// This interface can be used to implement various statistics (this is original intent). +type WireTap interface { + MessageReceived(peer.ID, bsmsg.BitSwapMessage) + MessageSent(peer.ID, bsmsg.BitSwapMessage) +} + +// Configures Bitswap to use given wiretap. +func EnableWireTap(tap WireTap) Option { + return func(bs *Bitswap) { + bs.wiretap = tap + } +} + +// Configures Bitswap not to use any wiretap. +func DisableWireTap() Option { + return func(bs *Bitswap) { + bs.wiretap = nil + } +} diff --git a/workers.go b/workers.go index 208c02bffc2c298a2a6c64bb232ab128bd06980f..5db5342317369dac769dee6627b39ede77276e45 100644 --- a/workers.go +++ b/workers.go @@ -56,6 +56,9 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { // Ideally, yes. But we'd need some way to trigger a retry and/or drop // the peer. bs.engine.MessageSent(envelope.Peer, envelope.Message) + if bs.wiretap != nil { + bs.wiretap.MessageSent(envelope.Peer, envelope.Message) + } bs.sendBlocks(ctx, envelope) case <-ctx.Done(): return