Commit 15a7d870 authored by Jeromy's avatar Jeromy

some bitswap cleanup

parent 2ee030b6
......@@ -16,11 +16,14 @@ import (
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/eventlog"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)
var log = eventlog.Logger("bitswap")
// Number of providers to request for sending a wantlist to
const maxProvidersPerRequest = 6
// New initializes a BitSwap instance that communicates over the
// provided BitSwapNetwork. This function registers the returned instance as
// the network delegate.
......@@ -97,7 +100,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
ctx, cancelFunc := context.WithCancel(parent)
ctx = eventlog.ContextWithMetadata(ctx, eventlog.Uuid("GetBlockRequest"))
ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
log.Event(ctx, "GetBlockRequestBegin", &k)
defer func() {
......@@ -176,14 +179,29 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
return nil
}
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) {
done := make(chan struct{})
for _, k := range ks {
go func(k u.Key) {
providers := bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest)
err := bs.sendWantListTo(ctx, providers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
}
done <- struct{}{}
}(k)
}
for _ = range ks {
<-done
}
}
// TODO ensure only one active request per key
func (bs *bitswap) loop(parent context.Context) {
ctx, cancel := context.WithCancel(parent)
// Every so often, we should resend out our current want list
rebroadcastTime := time.Second * 5
broadcastSignal := time.NewTicker(bs.strategy.GetRebroadcastDelay())
defer func() {
cancel() // signal to derived async functions
......@@ -193,15 +211,12 @@ func (bs *bitswap) loop(parent context.Context) {
for {
select {
case <-broadcastSignal.C:
for _, k := range bs.wantlist.Keys() {
providers := bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest)
err := bs.sendWantListTo(ctx, providers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
}
}
bs.sendWantlistToProviders(ctx, bs.wantlist.Keys())
case ks := <-bs.batchRequests:
// TODO: implement batching on len(ks) > X for some X
// i.e. if given 20 keys, fetch first five, then next
// five, and so on, so we are more likely to be able to
// effectively stream the data
if len(ks) == 0 {
log.Warning("Received batch request for zero blocks")
continue
......@@ -232,6 +247,18 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return bs.routing.Provide(ctx, blk.Key())
}
func (bs *bitswap) receiveBlock(ctx context.Context, block *blocks.Block) {
// TODO verify blocks?
if err := bs.blockstore.Put(block); err != nil {
log.Criticalf("error putting block: %s", err)
return
}
err := bs.HasBlock(ctx, block)
if err != nil {
log.Warningf("HasBlock errored: %s", err)
}
}
// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage) {
......@@ -255,15 +282,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
bs.strategy.MessageReceived(p, incoming) // FIRST
for _, block := range incoming.Blocks() {
// TODO verify blocks?
if err := bs.blockstore.Put(block); err != nil {
log.Criticalf("error putting block: %s", err)
continue // FIXME(brian): err ignored
}
err := bs.HasBlock(ctx, block)
if err != nil {
log.Warningf("HasBlock errored: %s", err)
}
go bs.receiveBlock(ctx, block)
}
for _, key := range incoming.Wantlist() {
......@@ -277,6 +296,8 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
blkmsg := bsmsg.New()
// TODO: only send this the first time
// no sense in sending our wantlist to the
// same peer multiple times
for _, k := range bs.wantlist.Keys() {
blkmsg.AddWanted(k)
}
......
......@@ -148,5 +148,5 @@ func (s *strategist) GetBatchSize() int {
}
func (s *strategist) GetRebroadcastDelay() time.Duration {
return time.Second * 2
return time.Second * 5
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment