Commit 59317cc1 authored by hannahhoward's avatar hannahhoward

refactor(bitswap): add comments and extract testutils.go

Add comments to all exported functions, extract the utils for creating instances in testnet.go,
moves integration tests to bitswap_test

BREAKING CHANGE: removed one constant -- rebroadcastDelay -- which I believe was unused
parent 61f12234
package bitswap
package bitswap_test
import (
"context"
......@@ -10,19 +10,21 @@ import (
"time"
"github.com/ipfs/go-bitswap/testutil"
blocks "github.com/ipfs/go-block-format"
bitswap "github.com/ipfs/go-bitswap"
bssession "github.com/ipfs/go-bitswap/session"
testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
"github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)
type fetchFunc func(b *testing.B, bs *Bitswap, ks []cid.Cid)
type fetchFunc func(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid)
type distFunc func(b *testing.B, provs []Instance, blocks []blocks.Block)
type distFunc func(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block)
type runStats struct {
Dups uint64
......@@ -146,7 +148,7 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, d
start := time.Now()
net := tn.VirtualNetwork(mockrouting.NewServer(), d)
sg := NewTestSessionGenerator(net)
sg := testinstance.NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
......@@ -160,7 +162,7 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d
start := time.Now()
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
sg := NewTestSessionGenerator(net)
sg := testinstance.NewTestSessionGenerator(net)
defer sg.Close()
instances := sg.Instances(numnodes)
......@@ -169,7 +171,7 @@ func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d
runDistribution(b, instances, blocks, df, ff, start)
}
func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) {
func runDistribution(b *testing.B, instances []testinstance.Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) {
numnodes := len(instances)
......@@ -189,7 +191,7 @@ func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block,
b.Fatal(err)
}
nst := fetcher.Exchange.network.Stats()
nst := fetcher.Adapter.Stats()
stats := runStats{
Time: time.Now().Sub(start),
MsgRecd: nst.MessagesRecvd,
......@@ -204,7 +206,7 @@ func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block,
}
}
func allToAll(b *testing.B, provs []Instance, blocks []blocks.Block) {
func allToAll(b *testing.B, provs []testinstance.Instance, blocks []blocks.Block) {
for _, p := range provs {
if err := p.Blockstore().PutMany(blocks); err != nil {
b.Fatal(err)
......@@ -214,7 +216,7 @@ func allToAll(b *testing.B, provs []Instance, blocks []blocks.Block) {
// overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks
// to the second peer. This means both peers have the middle 50 blocks
func overlap1(b *testing.B, provs []Instance, blks []blocks.Block) {
func overlap1(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap1 only works with 2 provs")
}
......@@ -231,7 +233,7 @@ func overlap1(b *testing.B, provs []Instance, blks []blocks.Block) {
// overlap2 gives every even numbered block to the first peer, odd numbered
// blocks to the second. it also gives every third block to both peers
func overlap2(b *testing.B, provs []Instance, blks []blocks.Block) {
func overlap2(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap2 only works with 2 provs")
}
......@@ -252,7 +254,7 @@ func overlap2(b *testing.B, provs []Instance, blks []blocks.Block) {
}
}
func overlap3(b *testing.B, provs []Instance, blks []blocks.Block) {
func overlap3(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
if len(provs) != 2 {
b.Fatal("overlap3 only works with 2 provs")
}
......@@ -277,13 +279,13 @@ func overlap3(b *testing.B, provs []Instance, blks []blocks.Block) {
// onePeerPerBlock picks a random peer to hold each block
// with this layout, we shouldnt actually ever see any duplicate blocks
// but we're mostly just testing performance of the sync algorithm
func onePeerPerBlock(b *testing.B, provs []Instance, blks []blocks.Block) {
func onePeerPerBlock(b *testing.B, provs []testinstance.Instance, blks []blocks.Block) {
for _, blk := range blks {
provs[rand.Intn(len(provs))].Blockstore().Put(blk)
}
}
func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) {
func oneAtATime(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background()).(*bssession.Session)
for _, c := range ks {
_, err := ses.GetBlock(context.Background(), c)
......@@ -295,7 +297,7 @@ func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) {
}
// fetch data in batches, 10 at a time
func batchFetchBy10(b *testing.B, bs *Bitswap, ks []cid.Cid) {
func batchFetchBy10(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
for i := 0; i < len(ks); i += 10 {
out, err := ses.GetBlocks(context.Background(), ks[i:i+10])
......@@ -308,7 +310,7 @@ func batchFetchBy10(b *testing.B, bs *Bitswap, ks []cid.Cid) {
}
// fetch each block at the same time concurrently
func fetchAllConcurrent(b *testing.B, bs *Bitswap, ks []cid.Cid) {
func fetchAllConcurrent(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
var wg sync.WaitGroup
......@@ -325,7 +327,7 @@ func fetchAllConcurrent(b *testing.B, bs *Bitswap, ks []cid.Cid) {
wg.Wait()
}
func batchFetchAll(b *testing.B, bs *Bitswap, ks []cid.Cid) {
func batchFetchAll(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
out, err := ses.GetBlocks(context.Background(), ks)
if err != nil {
......@@ -336,7 +338,7 @@ func batchFetchAll(b *testing.B, bs *Bitswap, ks []cid.Cid) {
}
// simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible
func unixfsFileFetch(b *testing.B, bs *Bitswap, ks []cid.Cid) {
func unixfsFileFetch(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
_, err := ses.GetBlock(context.Background(), ks[0])
if err != nil {
......
// package bitswap implements the IPFS exchange interface with the BitSwap
// Package bitswap implements the IPFS exchange interface with the BitSwap
// bilateral exchange protocol.
package bitswap
......@@ -24,7 +24,6 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
delay "github.com/ipfs/go-ipfs-delay"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
logging "github.com/ipfs/go-log"
metrics "github.com/ipfs/go-metrics-interface"
......@@ -43,8 +42,14 @@ const (
)
var (
// ProvideEnabled is a variable that tells Bitswap whether or not
// to handle providing blocks (see experimental provider system)
ProvideEnabled = true
// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
// provideCollector even before they are actually provided.
// TODO: Does this need to be this large givent that?
HasBlockBufferSize = 256
provideKeysBufferSize = 2048
provideWorkerMax = 6
......@@ -53,12 +58,9 @@ var (
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
var rebroadcastDelay = delay.Fixed(time.Minute)
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
// delegate. Runs until context is cancelled or bitswap.Close is called.
func New(parent context.Context, network bsnet.BitSwapNetwork,
bstore blockstore.Blockstore) exchange.Interface {
......@@ -121,7 +123,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
network.SetDelegate(bs)
// Start up bitswaps async worker routines
bs.startWorkers(px, ctx)
bs.startWorkers(ctx, px)
// bind the context and process.
// do it over here to avoid closing before all setup is done.
......@@ -190,6 +192,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, er
return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
}
// WantlistForPeer returns the currently understood list of blocks requested by a
// given peer.
func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
var out []cid.Cid
for _, e := range bs.engine.WantlistForPeer(p) {
......@@ -198,6 +202,8 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
return out
}
// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
return bs.engine.LedgerForPeer(p)
}
......@@ -258,6 +264,8 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
return nil
}
// ReceiveMessage is called by the network interface when a new message is
// received.
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
bs.counterLk.Lock()
bs.counters.messagesRecvd++
......@@ -300,8 +308,6 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
wg.Wait()
}
var ErrAlreadyHaveBlock = errors.New("already have block")
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
blkLen := len(b.RawData())
has, err := bs.blockstore.Has(b.Cid())
......@@ -327,28 +333,34 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
}
}
// Connected/Disconnected warns bitswap about peer connections.
// PeerConnected is called by the network interface
// when a peer initiates a new connection to bitswap.
func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.wm.Connected(p)
bs.engine.PeerConnected(p)
}
// Connected/Disconnected warns bitswap about peer connections.
// PeerDisconnected is called by the network interface when a peer
// closes a connection
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.wm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}
// ReceiveError is called by the network interface when an error happens
// at the network layer. Currently just logs error.
func (bs *Bitswap) ReceiveError(err error) {
log.Infof("Bitswap ReceiveError: %s", err)
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
}
// Close is called to shutdown Bitswap
func (bs *Bitswap) Close() error {
return bs.process.Close()
}
// GetWantlist returns the current local wantlist.
func (bs *Bitswap) GetWantlist() []cid.Cid {
entries := bs.wm.CurrentWants()
out := make([]cid.Cid, 0, len(entries))
......@@ -358,10 +370,17 @@ func (bs *Bitswap) GetWantlist() []cid.Cid {
return out
}
// IsOnline is needed to match go-ipfs-exchange-interface
func (bs *Bitswap) IsOnline() bool {
return true
}
// NewSession generates a new Bitswap session. You should use this, rather
// that calling Bitswap.GetBlocks, any time you intend to do several related
// block requests in a row. The session returned will have it's own GetBlocks
// method, but the session will use the fact that the requests are related to
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
return bs.sm.NewSession(ctx)
}
package bitswap
package bitswap_test
import (
"bytes"
......@@ -8,11 +8,12 @@ import (
"testing"
"time"
bitswap "github.com/ipfs/go-bitswap"
decision "github.com/ipfs/go-bitswap/decision"
"github.com/ipfs/go-bitswap/message"
bssession "github.com/ipfs/go-bitswap/session"
testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
detectrace "github.com/ipfs/go-detect-race"
......@@ -35,7 +36,7 @@ func getVirtualNetwork() tn.Network {
func TestClose(t *testing.T) {
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
sesgen := testinstance.NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
......@@ -50,7 +51,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
rs := mockrouting.NewServer()
net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
g := NewTestSessionGenerator(net)
g := testinstance.NewTestSessionGenerator(net)
defer g.Close()
block := blocks.NewBlock([]byte("block"))
......@@ -73,7 +74,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
g := NewTestSessionGenerator(net)
g := testinstance.NewTestSessionGenerator(net)
defer g.Close()
peers := g.Instances(2)
......@@ -101,12 +102,12 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
}
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
ProvideEnabled = false
defer func() { ProvideEnabled = true }()
bitswap.ProvideEnabled = false
defer func() { bitswap.ProvideEnabled = true }()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
g := NewTestSessionGenerator(net)
g := testinstance.NewTestSessionGenerator(net)
defer g.Close()
hasBlock := g.Next()
......@@ -143,7 +144,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
bsMessage := message.New(true)
bsMessage.AddBlock(block)
g := NewTestSessionGenerator(net)
g := testinstance.NewTestSessionGenerator(net)
defer g.Close()
peers := g.Instances(2)
......@@ -162,7 +163,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
doesNotWantBlock.Exchange.ReceiveMessage(ctx, hasBlock.Peer, bsMessage)
blockInStore, err := doesNotWantBlock.blockstore.Has(block.Cid())
blockInStore, err := doesNotWantBlock.Blockstore().Has(block.Cid())
if err != nil || blockInStore {
t.Fatal("Unwanted block added to block store")
}
......@@ -200,18 +201,6 @@ func TestLargeFile(t *testing.T) {
PerformDistributionTest(t, numInstances, numBlocks)
}
func TestLargeFileNoRebroadcast(t *testing.T) {
rbd := rebroadcastDelay.Get()
rebroadcastDelay.Set(time.Hour * 24 * 365 * 10) // ten years should be long enough
if testing.Short() {
t.SkipNow()
}
numInstances := 10
numBlocks := 100
PerformDistributionTest(t, numInstances, numBlocks)
rebroadcastDelay.Set(rbd)
}
func TestLargeFileTwoPeers(t *testing.T) {
if testing.Short() {
t.SkipNow()
......@@ -227,7 +216,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.SkipNow()
}
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewTestSessionGenerator(net)
sg := testinstance.NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
......@@ -250,7 +239,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
for _, inst := range instances[1:] {
wg.Add(1)
go func(inst Instance) {
go func(inst testinstance.Instance) {
defer wg.Done()
outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
if err != nil {
......@@ -290,13 +279,10 @@ func TestSendToWantingPeer(t *testing.T) {
}
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewTestSessionGenerator(net)
sg := testinstance.NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
prev := rebroadcastDelay.Set(time.Second / 2)
defer func() { rebroadcastDelay.Set(prev) }()
peers := sg.Instances(2)
peerA := peers[0]
peerB := peers[1]
......@@ -335,7 +321,7 @@ func TestSendToWantingPeer(t *testing.T) {
func TestEmptyKey(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewTestSessionGenerator(net)
sg := testinstance.NewTestSessionGenerator(net)
defer sg.Close()
bs := sg.Instances(1)[0].Exchange
......@@ -348,7 +334,7 @@ func TestEmptyKey(t *testing.T) {
}
}
func assertStat(t *testing.T, st *Stat, sblks, rblks, sdata, rdata uint64) {
func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint64) {
if sblks != st.BlocksSent {
t.Errorf("mismatch in blocks sent: %d vs %d", sblks, st.BlocksSent)
}
......@@ -368,7 +354,7 @@ func assertStat(t *testing.T, st *Stat, sblks, rblks, sdata, rdata uint64) {
func TestBasicBitswap(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewTestSessionGenerator(net)
sg := testinstance.NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
......@@ -437,7 +423,7 @@ func TestBasicBitswap(t *testing.T) {
func TestDoubleGet(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewTestSessionGenerator(net)
sg := testinstance.NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
......@@ -505,7 +491,7 @@ func TestDoubleGet(t *testing.T) {
func TestWantlistCleanup(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewTestSessionGenerator(net)
sg := testinstance.NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
......@@ -616,7 +602,7 @@ func newReceipt(sent, recv, exchanged uint64) *decision.Receipt {
func TestBitswapLedgerOneWay(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewTestSessionGenerator(net)
sg := testinstance.NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
......@@ -668,7 +654,7 @@ func TestBitswapLedgerOneWay(t *testing.T) {
func TestBitswapLedgerTwoWay(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewTestSessionGenerator(net)
sg := testinstance.NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
......
package bitswap
package bitswap_test
import (
"context"
......@@ -7,6 +7,7 @@ import (
"time"
bssession "github.com/ipfs/go-bitswap/session"
testinstance "github.com/ipfs/go-bitswap/testinstance"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
......@@ -18,7 +19,7 @@ func TestBasicSessions(t *testing.T) {
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
sesgen := testinstance.NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
......@@ -66,7 +67,7 @@ func TestSessionBetweenPeers(t *testing.T) {
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
sesgen := testinstance.NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
......@@ -109,7 +110,7 @@ func TestSessionBetweenPeers(t *testing.T) {
t.Fatal(err)
}
if stat.MessagesReceived > 2 {
t.Fatal("uninvolved nodes should only receive two messages", is.Exchange.counters.messagesRecvd)
t.Fatal("uninvolved nodes should only receive two messages", stat.MessagesReceived)
}
}
}
......@@ -119,7 +120,7 @@ func TestSessionSplitFetch(t *testing.T) {
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
sesgen := testinstance.NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
......@@ -162,7 +163,7 @@ func TestFetchNotConnected(t *testing.T) {
bssession.SetProviderSearchDelay(10 * time.Millisecond)
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
sesgen := testinstance.NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
......@@ -202,7 +203,7 @@ func TestInterestCacheOverflow(t *testing.T) {
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
sesgen := testinstance.NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
......@@ -254,7 +255,7 @@ func TestPutAfterSessionCacheEvict(t *testing.T) {
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
sesgen := testinstance.NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
......@@ -294,7 +295,7 @@ func TestMultipleSessions(t *testing.T) {
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
sesgen := testinstance.NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
......@@ -337,7 +338,7 @@ func TestWantlistClearsOnCancel(t *testing.T) {
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
sesgen := testinstance.NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
......
// package decision implements the decision engine for the bitswap service.
// Package decision implements the decision engine for the bitswap service.
package decision
import (
......@@ -68,6 +68,7 @@ type Envelope struct {
Sent func()
}
// Engine manages sending requested blocks to peers.
type Engine struct {
// peerRequestQueue is a priority queue of requests received from peers.
// Requests are popped from the queue, packaged up, and placed in the
......@@ -94,6 +95,7 @@ type Engine struct {
ticker *time.Ticker
}
// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
......@@ -107,6 +109,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
return e
}
// WantlistForPeer returns the currently understood want list for a given peer
func (e *Engine) WantlistForPeer(p peer.ID) (out []wl.Entry) {
partner := e.findOrCreate(p)
partner.lk.Lock()
......@@ -114,6 +117,8 @@ func (e *Engine) WantlistForPeer(p peer.ID) (out []wl.Entry) {
return partner.wantList.SortedEntries()
}
// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
ledger := e.findOrCreate(p)
......@@ -295,6 +300,8 @@ func (e *Engine) addBlock(block blocks.Block) {
}
}
// AddBlock is called to when a new block is received and added to a block store
// meaning there may be peers who want that block that we should send it to.
func (e *Engine) AddBlock(block blocks.Block) {
e.lock.Lock()
defer e.lock.Unlock()
......@@ -308,6 +315,8 @@ func (e *Engine) AddBlock(block blocks.Block) {
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically
// MessageSent is called when a message has successfully been sent out, to record
// changes.
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
l := e.findOrCreate(p)
l.lk.Lock()
......@@ -321,6 +330,8 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
}
// PeerConnected is called when a new peer connects, meaning we should start
// sending blocks.
func (e *Engine) PeerConnected(p peer.ID) {
e.lock.Lock()
defer e.lock.Unlock()
......@@ -334,6 +345,7 @@ func (e *Engine) PeerConnected(p peer.ID) {
l.ref++
}
// PeerDisconnected is called when a peer disconnects.
func (e *Engine) PeerDisconnected(p peer.ID) {
e.lock.Lock()
defer e.lock.Unlock()
......
......@@ -47,6 +47,9 @@ type ledger struct {
lk sync.Mutex
}
// Receipt is a summary of the ledger for a given peer
// collecting various pieces of aggregated data for external
// reporting purposes.
type Receipt struct {
Peer string
Value float64
......
......@@ -30,4 +30,9 @@ require (
github.com/libp2p/go-libp2p-routing v0.0.1
github.com/libp2p/go-testutil v0.0.1
github.com/multiformats/go-multiaddr v0.0.1
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 // indirect
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c // indirect
golang.org/x/sys v0.0.0-20190509141414-a5b02f93d862 // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/tools v0.0.0-20190509153222-73554e0f7805 // indirect
)
......@@ -241,21 +241,37 @@ golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 h1:jsG6UpNLt9iAsb0S2AGW28DveNzzgmbXR+ENoPjUeIU=
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180524181706-dfa909b99c79/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190227160552-c95aed5357e7 h1:C2F/nMkR/9sfUTpvR3QrjBuTdvMUC/cFajkphs1YLQo=
golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e h1:ZytStCyV048ZqDsWHiYDdoI2Vd4msMcrDECFxS+tL9c=
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190509141414-a5b02f93d862 h1:rM0ROo5vb9AdYJi1110yjWGMej9ITfKddS89P3Fkhug=
golang.org/x/sys v0.0.0-20190509141414-a5b02f93d862/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190509153222-73554e0f7805 h1:1ufBXAsTpUhSmmPXEEs5PrGQSfnBhsjAd2SmVhp9xrY=
golang.org/x/tools v0.0.0-20190509153222-73554e0f7805/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
......
......@@ -13,9 +13,8 @@ import (
inet "github.com/libp2p/go-libp2p-net"
)
// TODO move message.go into the bitswap package
// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb
// BitSwapMessage is the basic interface for interacting building, encoding,
// and decoding messages sent on the BitSwap protocol.
type BitSwapMessage interface {
// Wantlist returns a slice of unique keys that represent data wanted by
// the sender.
......@@ -40,6 +39,8 @@ type BitSwapMessage interface {
Loggable() map[string]interface{}
}
// Exportable is an interface for structures than can be
// encoded in a bitswap protobuf.
type Exportable interface {
ToProtoV0() *pb.Message
ToProtoV1() *pb.Message
......@@ -53,6 +54,7 @@ type impl struct {
blocks map[cid.Cid]blocks.Block
}
// New returns a new, empty bitswap message
func New(full bool) BitSwapMessage {
return newMsg(full)
}
......@@ -65,6 +67,8 @@ func newMsg(full bool) *impl {
}
}
// Entry is an wantlist entry in a Bitswap message (along with whether it's an
// add or cancel).
type Entry struct {
wantlist.Entry
Cancel bool
......@@ -163,11 +167,13 @@ func (m *impl) AddBlock(b blocks.Block) {
m.blocks[b.Cid()] = b
}
// FromNet generates a new BitswapMessage from incoming data on an io.Reader.
func FromNet(r io.Reader) (BitSwapMessage, error) {
pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
return FromPBReader(pbr)
}
// FromPBReader generates a new Bitswap message from a gogo-protobuf reader
func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
pb := new(pb.Message)
if err := pbr.ReadMsg(pb); err != nil {
......
......@@ -12,10 +12,12 @@ import (
)
var (
// These two are equivalent, legacy
ProtocolBitswapOne protocol.ID = "/ipfs/bitswap/1.0.0"
// ProtocolBitswapOne is the prefix for the legacy bitswap protocol
ProtocolBitswapOne protocol.ID = "/ipfs/bitswap/1.0.0"
// ProtocolBitswapNoVers is equivalent to the legacy bitswap protocol
ProtocolBitswapNoVers protocol.ID = "/ipfs/bitswap"
// ProtocolBitswap is the current version of bitswap protocol, 1.1.0
ProtocolBitswap protocol.ID = "/ipfs/bitswap/1.1.0"
)
......@@ -38,18 +40,20 @@ type BitSwapNetwork interface {
ConnectionManager() ifconnmgr.ConnManager
Stats() NetworkStats
Stats() Stats
Routing
}
// MessageSender is an interface for sending a series of messages over the bitswap
// network
type MessageSender interface {
SendMsg(context.Context, bsmsg.BitSwapMessage) error
Close() error
Reset() error
}
// Implement Receiver to receive messages from the BitSwapNetwork.
// Receiver is an interface that can receive messages from the BitSwapNetwork.
type Receiver interface {
ReceiveMessage(
ctx context.Context,
......@@ -63,6 +67,8 @@ type Receiver interface {
PeerDisconnected(peer.ID)
}
// Routing is an interface to providing and finding providers on a bitswap
// network.
type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key.
FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID
......@@ -71,10 +77,10 @@ type Routing interface {
Provide(context.Context, cid.Cid) error
}
// NetworkStats is a container for statistics about the bitswap network
// Stats is a container for statistics about the bitswap network
// the numbers inside are specific to bitswap, and not any other protocols
// using the same underlying network.
type NetworkStats struct {
type Stats struct {
MessagesSent uint64
MessagesRecvd uint64
}
......@@ -49,7 +49,7 @@ type impl struct {
// inbound messages from the network are forwarded to the receiver
receiver Receiver
stats NetworkStats
stats Stats
}
type streamMessageSender struct {
......@@ -201,8 +201,8 @@ func (bsnet *impl) ConnectionManager() ifconnmgr.ConnManager {
return bsnet.host.ConnManager()
}
func (bsnet *impl) Stats() NetworkStats {
return NetworkStats{
func (bsnet *impl) Stats() Stats {
return Stats{
MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd),
MessagesSent: atomic.LoadUint64(&bsnet.stats.MessagesSent),
}
......
......@@ -11,12 +11,16 @@ import (
const bufferSize = 16
// PubSub is a simple interface for publishing blocks and being able to subscribe
// for cids. It's used internally by bitswap to decouple receiving blocks
// and actually providing them back to the GetBlocks caller.
type PubSub interface {
Publish(block blocks.Block)
Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block
Shutdown()
}
// New generates a new PubSub interface.
func New() PubSub {
return &impl{
wrapped: *pubsub.New(bufferSize),
......
......@@ -6,6 +6,7 @@ import (
cid "github.com/ipfs/go-cid"
)
// Stat is a struct that provides various statistics on bitswap operations
type Stat struct {
ProvideBufLen int
Wantlist []cid.Cid
......@@ -19,6 +20,7 @@ type Stat struct {
MessagesReceived uint64
}
// Stat returns aggregated statistics about bitswap operations
func (bs *Bitswap) Stat() (*Stat, error) {
st := new(Stat)
st.ProvideBufLen = len(bs.newBlocks)
......
package bitswap
package testsession
import (
"context"
"time"
bitswap "github.com/ipfs/go-bitswap"
bsnet "github.com/ipfs/go-bitswap/network"
tn "github.com/ipfs/go-bitswap/testnet"
ds "github.com/ipfs/go-datastore"
delayed "github.com/ipfs/go-datastore/delayed"
ds_sync "github.com/ipfs/go-datastore/sync"
......@@ -16,7 +17,8 @@ import (
testutil "github.com/libp2p/go-testutil"
)
// WARNING: this uses RandTestBogusIdentity DO NOT USE for NON TESTS!
// NewTestSessionGenerator generates a new SessionGenerator for the given
// testnet
func NewTestSessionGenerator(
net tn.Network) SessionGenerator {
ctx, cancel := context.WithCancel(context.Background())
......@@ -28,7 +30,7 @@ func NewTestSessionGenerator(
}
}
// TODO move this SessionGenerator to the core package and export it as the core generator
// SessionGenerator generates new test instances of bitswap+dependencies
type SessionGenerator struct {
seq int
net tn.Network
......@@ -36,11 +38,13 @@ type SessionGenerator struct {
cancel context.CancelFunc
}
// Close closes the clobal context, shutting down all test instances
func (g *SessionGenerator) Close() error {
g.cancel()
return nil // for Closer interface
}
// Next generates a new instance of bitswap + dependencies
func (g *SessionGenerator) Next() Instance {
g.seq++
p, err := p2ptestutil.RandTestBogusIdentity()
......@@ -50,6 +54,7 @@ func (g *SessionGenerator) Next() Instance {
return MkSession(g.ctx, g.net, p)
}
// Instances creates N test instances of bitswap + dependencies
func (g *SessionGenerator) Instances(n int) []Instance {
var instances []Instance
for j := 0; j < n; j++ {
......@@ -59,29 +64,33 @@ func (g *SessionGenerator) Instances(n int) []Instance {
for i, inst := range instances {
for j := i + 1; j < len(instances); j++ {
oinst := instances[j]
inst.Exchange.network.ConnectTo(context.Background(), oinst.Peer)
inst.Adapter.ConnectTo(context.Background(), oinst.Peer)
}
}
return instances
}
// Instance is a test instance of bitswap + dependencies for integration testing
type Instance struct {
Peer peer.ID
Exchange *Bitswap
blockstore blockstore.Blockstore
Peer peer.ID
Exchange *bitswap.Bitswap
blockstore blockstore.Blockstore
Adapter bsnet.BitSwapNetwork
blockstoreDelay delay.D
}
// Blockstore returns the block store for this test instance
func (i *Instance) Blockstore() blockstore.Blockstore {
return i.blockstore
}
// SetBlockstoreLatency customizes the artificial delay on receiving blocks
// from a blockstore test instance.
func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
return i.blockstoreDelay.Set(t)
}
// session creates a test bitswap instance.
// MkSession creates a test bitswap instance.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
......@@ -99,9 +108,10 @@ func MkSession(ctx context.Context, net tn.Network, p testutil.Identity) Instanc
panic(err.Error()) // FIXME perhaps change signature and return error.
}
bs := New(ctx, adapter, bstore).(*Bitswap)
bs := bitswap.New(ctx, adapter, bstore).(*bitswap.Bitswap)
return Instance{
Adapter: adapter,
Peer: p.ID(),
Exchange: bs,
blockstore: bstore,
......
......@@ -6,6 +6,8 @@ import (
"github.com/libp2p/go-testutil"
)
// Network is an interface for generating bitswap network interfaces
// based on a test network.
type Network interface {
Adapter(testutil.Identity) bsnet.BitSwapNetwork
......
......@@ -17,6 +17,7 @@ type peernet struct {
routingserver mockrouting.Server
}
// StreamNet is a testnet that uses libp2p's MockNet
func StreamNet(ctx context.Context, net mockpeernet.Mocknet, rs mockrouting.Server) (Network, error) {
return &peernet{net, rs}, nil
}
......
......@@ -24,6 +24,8 @@ import (
var log = logging.Logger("bstestnet")
// VirtualNetwork generates a new testnet instance - a fake network that
// is used to simulate sending messages.
func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
return &network{
latencies: make(map[peer.ID]map[peer.ID]time.Duration),
......@@ -36,10 +38,13 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
}
}
// RateLimitGenerator is an interface for generating rate limits across peers
type RateLimitGenerator interface {
NextRateLimit() float64
}
// RateLimitedVirtualNetwork generates a testnet instance where nodes are rate
// limited in the upload/download speed.
func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network {
return &network{
latencies: make(map[peer.ID]map[peer.ID]time.Duration),
......@@ -168,7 +173,7 @@ type networkClient struct {
bsnet.Receiver
network *network
routing routing.IpfsRouting
stats bsnet.NetworkStats
stats bsnet.Stats
}
func (nc *networkClient) SendMessage(
......@@ -182,8 +187,8 @@ func (nc *networkClient) SendMessage(
return nil
}
func (nc *networkClient) Stats() bsnet.NetworkStats {
return bsnet.NetworkStats{
func (nc *networkClient) Stats() bsnet.Stats {
return bsnet.Stats{
MessagesRecvd: atomic.LoadUint64(&nc.stats.MessagesRecvd),
MessagesSent: atomic.LoadUint64(&nc.stats.MessagesSent),
}
......@@ -234,11 +239,11 @@ func (mp *messagePasser) Reset() error {
return nil
}
func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) {
func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) {
return &messagePasser{
net: n,
net: nc,
target: p,
local: n.local,
local: nc.local,
ctx: ctx,
}, nil
}
......
......@@ -5,7 +5,7 @@ import (
bsmsg "github.com/ipfs/go-bitswap/message"
"github.com/ipfs/go-bitswap/wantlist"
"github.com/ipfs/go-block-format"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
peer "github.com/libp2p/go-libp2p-peer"
......
......@@ -8,14 +8,18 @@ import (
cid "github.com/ipfs/go-cid"
)
// SessionTrackedWantlist is a list of wants that also track which bitswap
// sessions have requested them
type SessionTrackedWantlist struct {
set map[cid.Cid]*sessionTrackedEntry
}
// Wantlist is a raw list of wanted blocks and their priorities
type Wantlist struct {
set map[cid.Cid]Entry
}
// Entry is an entry in a want list, consisting of a cid and its priority
type Entry struct {
Cid cid.Cid
Priority int
......@@ -40,12 +44,14 @@ func (es entrySlice) Len() int { return len(es) }
func (es entrySlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priority }
// NewSessionTrackedWantlist generates a new SessionTrackedWantList.
func NewSessionTrackedWantlist() *SessionTrackedWantlist {
return &SessionTrackedWantlist{
set: make(map[cid.Cid]*sessionTrackedEntry),
}
}
// New generates a new raw Wantlist
func New() *Wantlist {
return &Wantlist{
set: make(map[cid.Cid]Entry),
......@@ -116,6 +122,7 @@ func (w *SessionTrackedWantlist) Contains(k cid.Cid) (Entry, bool) {
return e.Entry, true
}
// Entries returns all wantlist entries for a given session tracked want list.
func (w *SessionTrackedWantlist) Entries() []Entry {
es := make([]Entry, 0, len(w.set))
for _, e := range w.set {
......@@ -124,16 +131,20 @@ func (w *SessionTrackedWantlist) Entries() []Entry {
return es
}
// SortedEntries returns wantlist entries ordered by priority.
func (w *SessionTrackedWantlist) SortedEntries() []Entry {
es := w.Entries()
sort.Sort(entrySlice(es))
return es
}
// Len returns the number of entries in a wantlist.
func (w *SessionTrackedWantlist) Len() int {
return len(w.set)
}
// CopyWants copies all wants from one SessionTrackWantlist to another (along with
// the session data)
func (w *SessionTrackedWantlist) CopyWants(to *SessionTrackedWantlist) {
for _, e := range w.set {
for k := range e.sesTrk {
......@@ -142,10 +153,12 @@ func (w *SessionTrackedWantlist) CopyWants(to *SessionTrackedWantlist) {
}
}
// Len returns the number of entries in a wantlist.
func (w *Wantlist) Len() int {
return len(w.set)
}
// Add adds an entry in a wantlist from CID & Priority, if not already present.
func (w *Wantlist) Add(c cid.Cid, priority int) bool {
if _, ok := w.set[c]; ok {
return false
......@@ -159,6 +172,7 @@ func (w *Wantlist) Add(c cid.Cid, priority int) bool {
return true
}
// AddEntry adds an entry to a wantlist if not already present.
func (w *Wantlist) AddEntry(e Entry) bool {
if _, ok := w.set[e.Cid]; ok {
return false
......@@ -167,6 +181,7 @@ func (w *Wantlist) AddEntry(e Entry) bool {
return true
}
// Remove removes the given cid from the wantlist.
func (w *Wantlist) Remove(c cid.Cid) bool {
_, ok := w.set[c]
if !ok {
......@@ -177,11 +192,14 @@ func (w *Wantlist) Remove(c cid.Cid) bool {
return true
}
// Contains returns the entry, if present, for the given CID, plus whether it
// was present.
func (w *Wantlist) Contains(c cid.Cid) (Entry, bool) {
e, ok := w.set[c]
return e, ok
}
// Entries returns all wantlist entries for a want list.
func (w *Wantlist) Entries() []Entry {
es := make([]Entry, 0, len(w.set))
for _, e := range w.set {
......@@ -190,6 +208,7 @@ func (w *Wantlist) Entries() []Entry {
return es
}
// SortedEntries returns wantlist entries ordered by priority.
func (w *Wantlist) SortedEntries() []Entry {
es := w.Entries()
sort.Sort(entrySlice(es))
......
......@@ -11,9 +11,11 @@ import (
procctx "github.com/jbenet/goprocess/context"
)
// TaskWorkerCount is the total number of simultaneous threads sending
// outgoing messages
var TaskWorkerCount = 8
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
func (bs *Bitswap) startWorkers(ctx context.Context, px process.Process) {
// Start up workers to handle requests from other nodes for the data on this node
for i := 0; i < TaskWorkerCount; i++ {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment