Unverified commit 6338b068, authored by Steven Allen, committed by GitHub

Merge pull request #19 from ipfs/feat/bitswap-pr-improvements

Feat/bitswap pr improvements
parents 04f55f38 d6144d9e
@@ -30,6 +30,8 @@ import (
var log = logging.Logger("bitswap")

+var _ exchange.SessionExchange = (*Bitswap)(nil)

const (
    // maxProvidersPerRequest specifies the maximum number of providers desired
    // from the network. This value is specified because the network streams
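The added blank-identifier assignment is the standard Go compile-time check that *Bitswap satisfies the exchange.SessionExchange interface: if the implementation and the interface drift apart, the package stops compiling.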
package bitswap
import (
"context"
"encoding/json"
"io/ioutil"
"math/rand"
"sync"
"testing"
"time"
tn "github.com/ipfs/go-bitswap/testnet"
"github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)
type fetchFunc func(b *testing.B, bs *Bitswap, ks []cid.Cid)
type distFunc func(b *testing.B, provs []Instance, blocks []blocks.Block)
type runStats struct {
Dups uint64
MsgSent uint64
MsgRecd uint64
Time time.Duration
Name string
}
var benchmarkLog []runStats
func BenchmarkDups2Nodes(b *testing.B) {
b.Run("AllToAll-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, allToAll, oneAtATime)
})
b.Run("AllToAll-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, allToAll, batchFetchAll)
})
b.Run("Overlap1-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap1, oneAtATime)
})
b.Run("Overlap2-BatchBy10", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap2, batchFetchBy10)
})
b.Run("Overlap3-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, oneAtATime)
})
b.Run("Overlap3-BatchBy10", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchBy10)
})
b.Run("Overlap3-AllConcurrent", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, fetchAllConcurrent)
})
b.Run("Overlap3-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchAll)
})
b.Run("Overlap3-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, unixfsFileFetch)
})
b.Run("10Nodes-AllToAll-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, oneAtATime)
})
b.Run("10Nodes-AllToAll-BatchFetchBy10", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchBy10)
})
b.Run("10Nodes-AllToAll-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchAll)
})
b.Run("10Nodes-AllToAll-AllConcurrent", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, fetchAllConcurrent)
})
b.Run("10Nodes-AllToAll-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, unixfsFileFetch)
})
b.Run("10Nodes-OnePeerPerBlock-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, oneAtATime)
})
b.Run("10Nodes-OnePeerPerBlock-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, batchFetchAll)
})
b.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, unixfsFileFetch)
})
b.Run("200Nodes-AllToAll-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 200, 20, allToAll, batchFetchAll)
})
out, _ := json.MarshalIndent(benchmarkLog, "", " ")
ioutil.WriteFile("benchmark.json", out, 0666)
}
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, df distFunc, ff fetchFunc) {
start := time.Now()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond))
sg := NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
instances := sg.Instances(numnodes)
blocks := bg.Blocks(numblks)
fetcher := instances[numnodes-1]
df(b, instances[:numnodes-1], blocks)
var ks []cid.Cid
for _, blk := range blocks {
ks = append(ks, blk.Cid())
}
ff(b, fetcher.Exchange, ks)
st, err := fetcher.Exchange.Stat()
if err != nil {
b.Fatal(err)
}
nst := fetcher.Exchange.network.Stats()
stats := runStats{
Time: time.Now().Sub(start),
MsgRecd: nst.MessagesRecvd,
MsgSent: nst.MessagesSent,
Dups: st.DupBlksReceived,
Name: b.Name(),
}
benchmarkLog = append(benchmarkLog, stats)
b.Logf("send/recv: %d / %d", nst.MessagesSent, nst.MessagesRecvd)
if st.DupBlksReceived != 0 {
b.Fatalf("got %d duplicate blocks!", st.DupBlksReceived)
}
}
func allToAll(b *testing.B, provs []Instance, blocks []blocks.Block) {
for _, p := range provs {
if err := p.Blockstore().PutMany(blocks); err != nil {
b.Fatal(err)
}
}
}
// overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks
// to the second peer. This means both peers have the middle 50 blocks
func overlap1(b *testing.B, provs []Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap1 only works with 2 provs")
}
bill := provs[0]
jeff := provs[1]
if err := bill.Blockstore().PutMany(blks[:75]); err != nil {
b.Fatal(err)
}
if err := jeff.Blockstore().PutMany(blks[25:]); err != nil {
b.Fatal(err)
}
}
// overlap2 gives every even numbered block to the first peer, odd numbered
// blocks to the second. it also gives every third block to both peers
func overlap2(b *testing.B, provs []Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap2 only works with 2 provs")
}
bill := provs[0]
jeff := provs[1]
bill.Blockstore().Put(blks[0])
jeff.Blockstore().Put(blks[0])
for i, blk := range blks {
if i%3 == 0 {
bill.Blockstore().Put(blk)
jeff.Blockstore().Put(blk)
} else if i%2 == 1 {
bill.Blockstore().Put(blk)
} else {
jeff.Blockstore().Put(blk)
}
}
}
func overlap3(b *testing.B, provs []Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap3 only works with 2 provs")
}
bill := provs[0]
jeff := provs[1]
bill.Blockstore().Put(blks[0])
jeff.Blockstore().Put(blks[0])
for i, blk := range blks {
if i%3 == 0 {
bill.Blockstore().Put(blk)
jeff.Blockstore().Put(blk)
} else if i%2 == 1 {
bill.Blockstore().Put(blk)
} else {
jeff.Blockstore().Put(blk)
}
}
}
// onePeerPerBlock picks a random peer to hold each block
// with this layout, we shouldn't actually ever see any duplicate blocks
// but we're mostly just testing performance of the sync algorithm
func onePeerPerBlock(b *testing.B, provs []Instance, blks []blocks.Block) {
for _, blk := range blks {
provs[rand.Intn(len(provs))].Blockstore().Put(blk)
}
}
func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background()).(*Session)
for _, c := range ks {
_, err := ses.GetBlock(context.Background(), c)
if err != nil {
b.Fatal(err)
}
}
b.Logf("Session fetch latency: %s", ses.latTotal/time.Duration(ses.fetchcnt))
}
// fetch data in batches, 10 at a time
func batchFetchBy10(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
for i := 0; i < len(ks); i += 10 {
out, err := ses.GetBlocks(context.Background(), ks[i:i+10])
if err != nil {
b.Fatal(err)
}
for range out {
}
}
}
// fetch each block at the same time concurrently
func fetchAllConcurrent(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
var wg sync.WaitGroup
for _, c := range ks {
wg.Add(1)
go func(c cid.Cid) {
defer wg.Done()
_, err := ses.GetBlock(context.Background(), c)
if err != nil {
b.Fatal(err)
}
}(c)
}
wg.Wait()
}
func batchFetchAll(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
out, err := ses.GetBlocks(context.Background(), ks)
if err != nil {
b.Fatal(err)
}
for range out {
}
}
// simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible
func unixfsFileFetch(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
_, err := ses.GetBlock(context.Background(), ks[0])
if err != nil {
b.Fatal(err)
}
out, err := ses.GetBlocks(context.Background(), ks[1:11])
if err != nil {
b.Fatal(err)
}
for range out {
}
out, err = ses.GetBlocks(context.Background(), ks[11:])
if err != nil {
b.Fatal(err)
}
for range out {
}
}
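BenchmarkDups2Nodes above appends one runStats entry per sub-benchmark and dumps the whole log to benchmark.json. A small, hypothetical helper (not part of this change) for loading that file back, for example to compare message and duplicate-block counts across runs, could sit in the same package and reuse the runStats type and the encoding/json and io/ioutil imports already present:

```go
// readBenchmarkLog loads the benchmark.json file written by
// BenchmarkDups2Nodes and returns the recorded per-run statistics.
func readBenchmarkLog(path string) ([]runStats, error) {
	data, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var stats []runStats
	if err := json.Unmarshal(data, &stats); err != nil {
		return nil, err
	}
	return stats, nil
}
```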
@@ -38,6 +38,8 @@ type BitSwapNetwork interface {
    ConnectionManager() ifconnmgr.ConnManager

+   Stats() NetworkStats

    Routing
}
@@ -68,3 +70,11 @@ type Routing interface {
    // Provide provides the key to the network
    Provide(context.Context, cid.Cid) error
}

+// NetworkStats is a container for statistics about the bitswap network
+// the numbers inside are specific to bitswap, and not any other protocols
+// using the same underlying network.
+type NetworkStats struct {
+   MessagesSent uint64
+   MessagesRecvd uint64
+}
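The interface method and struct above are the new public surface for these counters. As a rough usage sketch (not part of this diff; it assumes the network package import path used by the testnet code further down, github.com/ipfs/go-bitswap/network), a caller could poll them like this:

```go
package main

import (
	"fmt"

	bsnet "github.com/ipfs/go-bitswap/network"
)

// printNetworkStats is a hypothetical helper: it takes any BitSwapNetwork and
// prints the bitswap-specific message counters exposed by the new Stats()
// method. Each field is read with an atomic load inside Stats().
func printNetworkStats(n bsnet.BitSwapNetwork) {
	st := n.Stats()
	fmt.Printf("bitswap messages sent=%d recvd=%d\n", st.MessagesSent, st.MessagesRecvd)
}
```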
@@ -5,6 +5,7 @@ import (
    "context"
    "fmt"
    "io"
+   "sync/atomic"
    "time"

    bsmsg "github.com/ipfs/go-bitswap/message"
@@ -48,6 +49,8 @@ type impl struct {
    // inbound messages from the network are forwarded to the receiver
    receiver Receiver

+   stats NetworkStats
}

type streamMessageSender struct {
@@ -130,6 +133,8 @@ func (bsnet *impl) SendMessage(
        s.Reset()
        return err
    }
+   atomic.AddUint64(&bsnet.stats.MessagesSent, 1)

    // TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
    go inet.AwaitEOF(s)
    return s.Close()
@@ -210,6 +215,7 @@ func (bsnet *impl) handleNewStream(s inet.Stream) {
        ctx := context.Background()
        log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
        bsnet.receiver.ReceiveMessage(ctx, p, received)
+       atomic.AddUint64(&bsnet.stats.MessagesRecvd, 1)
    }
}
@@ -217,6 +223,13 @@ func (bsnet *impl) ConnectionManager() ifconnmgr.ConnManager {
    return bsnet.host.ConnManager()
}

+func (bsnet *impl) Stats() NetworkStats {
+   return NetworkStats{
+       MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd),
+       MessagesSent:  atomic.LoadUint64(&bsnet.stats.MessagesSent),
+   }
+}

type netNotifiee impl

func (nn *netNotifiee) impl() *impl {
@@ -87,9 +87,9 @@
    },
    {
        "author": "hsanjuan",
-       "hash": "QmR1nncPsZR14A4hWr39mq8Lm7BGgS68bHVT9nop8NpWEM",
+       "hash": "QmUdh9184Bozfinyn5YDhgPRg33E3KR3btfZXcVoFgTxD4",
        "name": "go-ipfs-exchange-interface",
-       "version": "0.1.0"
+       "version": "0.1.1"
    },
    {
        "author": "whyrusleeping",
@@ -10,6 +10,7 @@ import (
    lru "github.com/hashicorp/golang-lru"
    blocks "github.com/ipfs/go-block-format"
    cid "github.com/ipfs/go-cid"
+   exchange "github.com/ipfs/go-ipfs-exchange-interface"
    logging "github.com/ipfs/go-log"
    loggables "github.com/libp2p/go-libp2p-loggables"
    peer "github.com/libp2p/go-libp2p-peer"
@@ -51,7 +52,7 @@ type Session struct {
// NewSession creates a new bitswap session whose lifetime is bounded by the
// given context
-func (bs *Bitswap) NewSession(ctx context.Context) *Session {
+func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
    s := &Session{
        activePeers: make(map[peer.ID]struct{}),
        liveWants: make(map[cid.Cid]time.Time),
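With this change NewSession returns the exchange.Fetcher interface rather than the concrete *Session, so callers that need session internals (the session test below, and the benchmarks above) now type-assert the result back to *Session.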
@@ -132,7 +132,7 @@ func TestSessionSplitFetch(t *testing.T) {
        cids = append(cids, blk.Cid())
    }

-   ses := inst[10].Exchange.NewSession(ctx)
+   ses := inst[10].Exchange.NewSession(ctx).(*Session)
    ses.baseTickDelay = time.Millisecond * 10

    for i := 0; i < 10; i++ {
@@ -7,15 +7,16 @@ import (
)

type Stat struct {
    ProvideBufLen int
    Wantlist []cid.Cid
    Peers []string
    BlocksReceived uint64
    DataReceived uint64
    BlocksSent uint64
    DataSent uint64
    DupBlksReceived uint64
    DupDataReceived uint64
+   MessagesReceived uint64
}

func (bs *Bitswap) Stat() (*Stat, error) {
@@ -30,6 +31,7 @@ func (bs *Bitswap) Stat() (*Stat, error) {
    st.BlocksSent = c.blocksSent
    st.DataSent = c.dataSent
    st.DataReceived = c.dataRecvd
+   st.MessagesReceived = c.messagesRecvd
    bs.counterLk.Unlock()

    peers := bs.engine.Peers()
@@ -4,6 +4,7 @@ import (
    "context"
    "errors"
    "sync"
+   "sync/atomic"
    "time"

    bsmsg "github.com/ipfs/go-bitswap/message"
@@ -48,7 +49,7 @@ type message struct {
// order* with their delays respected as much as sending them in order allows
// for
type receiverQueue struct {
-   receiver bsnet.Receiver
+   receiver *networkClient
    queue []*message
    active bool
    lk sync.Mutex
@@ -104,30 +105,30 @@ func (n *network) SendMessage(
    return nil
}

-func (n *network) deliver(
-   r bsnet.Receiver, from peer.ID, message bsmsg.BitSwapMessage) error {
-   if message == nil || from == "" {
-       return errors.New("invalid input")
-   }
-   n.delay.Wait()
-   r.ReceiveMessage(context.TODO(), from, message)
-   return nil
-}

type networkClient struct {
    local peer.ID
    bsnet.Receiver
    network *network
    routing routing.IpfsRouting
+   stats bsnet.NetworkStats
}

func (nc *networkClient) SendMessage(
    ctx context.Context,
    to peer.ID,
    message bsmsg.BitSwapMessage) error {
-   return nc.network.SendMessage(ctx, nc.local, to, message)
+   if err := nc.network.SendMessage(ctx, nc.local, to, message); err != nil {
+       return err
+   }
+   atomic.AddUint64(&nc.stats.MessagesSent, 1)
+   return nil
+}
+
+func (nc *networkClient) Stats() bsnet.NetworkStats {
+   return bsnet.NetworkStats{
+       MessagesRecvd: atomic.LoadUint64(&nc.stats.MessagesRecvd),
+       MessagesSent:  atomic.LoadUint64(&nc.stats.MessagesSent),
+   }
}

// FindProvidersAsync returns a channel of providers for the given key
@@ -157,14 +158,14 @@ func (nc *networkClient) ConnectionManager() ifconnmgr.ConnManager {
}

type messagePasser struct {
-   net *network
+   net *networkClient
    target peer.ID
    local peer.ID
    ctx context.Context
}

func (mp *messagePasser) SendMsg(ctx context.Context, m bsmsg.BitSwapMessage) error {
-   return mp.net.SendMessage(ctx, mp.local, mp.target, m)
+   return mp.net.SendMessage(ctx, mp.target, m)
}

func (mp *messagePasser) Close() error {
@@ -177,7 +178,7 @@ func (mp *messagePasser) Reset() error {
func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) {
    return &messagePasser{
-       net: n.network,
+       net: n,
        target: p,
        local: n.local,
        ctx: ctx,
@@ -241,6 +242,7 @@ func (rq *receiverQueue) process() {
        rq.lk.Unlock()

        time.Sleep(time.Until(m.shouldSend))
+       atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
        rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
    }
}
@@ -81,7 +81,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
    return i.blockstoreDelay.Set(t)
}

-// session creates a test bitswap session.
+// session creates a test bitswap instance.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
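As a companion to the comment above, here is a minimal, hypothetical test (not part of this diff) showing the intended SessionGenerator usage, mirroring how the new benchmarks build their instances over a virtual network:

```go
package bitswap

import (
	"testing"
	"time"

	tn "github.com/ipfs/go-bitswap/testnet"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)

// TestInstancesFromGenerator spins up two bitswap instances over a virtual
// network via the SessionGenerator, which hands each instance its own peer ID.
func TestInstancesFromGenerator(t *testing.T) {
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()

	instances := sg.Instances(2)
	if len(instances) != 2 {
		t.Fatalf("expected 2 instances, got %d", len(instances))
	}
}
```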