Commit 3538a30e authored by Jeromy

implement bitswap sessions

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
parent 7cfa440e
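As a quick orientation before the diff: the sketch below is hypothetical and not part of the commit. The helper name and surrounding setup are invented, and it assumes the same context/blocks/cid imports used elsewhere in this package; only NewSession and GetBlocks come from the code added here.

// fetchWithSession is an illustrative helper (invented name) showing one
// plausible call pattern: create a session bound to ctx, then fetch blocks
// through it so later requests can target peers that have already responded.
func fetchWithSession(ctx context.Context, bs *Bitswap, keys []*cid.Cid) ([]blocks.Block, error) {
    ses := bs.NewSession(ctx)

    // The returned channel delivers blocks as they arrive and is closed when
    // the request completes or ctx is cancelled.
    ch, err := ses.GetBlocks(ctx, keys)
    if err != nil {
        return nil, err
    }

    var got []blocks.Block
    for b := range ch {
        got = append(got, b)
    }
    return got, nil
}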
......@@ -7,6 +7,7 @@ import (
"errors"
"math"
"sync"
"sync/atomic"
"time"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
......@@ -17,13 +18,13 @@ import (
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
flags "github.com/ipfs/go-ipfs/flags"
"github.com/ipfs/go-ipfs/thirdparty/delay"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables"
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
......@@ -159,10 +160,15 @@ type Bitswap struct {
blocksSent int
dataSent uint64
dataRecvd uint64
messagesRecvd uint64
// Metrics collected via the metrics interface
dupMetric metrics.Histogram
allMetric metrics.Histogram
// Sessions
sessions []*Session
sessLk sync.Mutex
}
type blockRequest struct {
......@@ -173,45 +179,7 @@ type blockRequest struct {
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
if k == nil {
log.Error("nil cid in GetBlock")
return nil, blockstore.ErrNotFound
}
// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
// listen on parent in this scope, but NOT okay to pass |parent| to
// functions called by this one. Otherwise those functions won't return
// when this context's cancel func is executed. This is difficult to
// enforce. May this comment keep you safe.
ctx, cancelFunc := context.WithCancel(parent)
// TODO: this request ID should come in from a higher layer so we can track
// across multiple 'GetBlock' invocations
ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
defer cancelFunc()
promise, err := bs.GetBlocks(ctx, []*cid.Cid{k})
if err != nil {
return nil, err
}
select {
case block, ok := <-promise:
if !ok {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, errors.New("promise channel was closed")
}
}
return block, nil
case <-parent.Done():
return nil, parent.Err()
}
return getBlock(parent, k, bs.GetBlocks)
}
func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid {
......@@ -251,7 +219,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
}
bs.wm.WantBlocks(ctx, keys)
bs.wm.WantBlocks(ctx, keys, nil)
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
......@@ -304,7 +272,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
// CancelWant removes a given key from the wantlist
func (bs *Bitswap) CancelWants(cids []*cid.Cid) {
bs.wm.CancelWants(cids)
bs.wm.CancelWants(context.Background(), cids, nil)
}
// HasBlock announces the existence of a block to this bitswap service. The
......@@ -340,7 +308,22 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return nil
}
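// SessionsForBlock returns all sessions that have expressed interest in the
// given cid.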
func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session {
bs.sessLk.Lock()
defer bs.sessLk.Unlock()
var out []*Session
for _, s := range bs.sessions {
if s.InterestedIn(c) {
out = append(out, s)
}
}
return out
}
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
atomic.AddUint64(&bs.messagesRecvd, 1)
// This call records changes to wantlists, blocks received,
// and number of bytes transferred.
bs.engine.MessageReceived(p, incoming)
......@@ -362,7 +345,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
}
keys = append(keys, block.Cid())
}
bs.wm.CancelWants(keys)
bs.wm.CancelWants(context.Background(), keys, nil)
wg := sync.WaitGroup{}
for _, block := range iblocks {
......@@ -375,6 +359,9 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
k := b.Cid()
log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
for _, ses := range bs.SessionsForBlock(k) {
ses.ReceiveBlock(p, b)
}
log.Debugf("got block %s from %s", b, p)
if err := bs.HasBlock(b); err != nil {
log.Warningf("ReceiveMessage HasBlock error: %s", err)
......
......@@ -370,6 +370,9 @@ func TestDoubleGet(t *testing.T) {
instances := sg.Instances(2)
blocks := bg.Blocks(1)
// NOTE: A race condition can happen here where these GetBlocks requests go
// through before the peers even get connected. This is okay, bitswap
// *should* be able to handle this.
ctx1, cancel1 := context.WithCancel(context.Background())
blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []*cid.Cid{blocks[0].Cid()})
if err != nil {
......@@ -385,7 +388,7 @@ func TestDoubleGet(t *testing.T) {
}
// ensure both requests make it into the wantlist at the same time
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Millisecond * 20)
cancel1()
_, ok := <-blkch1
......@@ -405,6 +408,14 @@ func TestDoubleGet(t *testing.T) {
}
t.Log(blk)
case <-time.After(time.Second * 5):
p1wl := instances[0].Exchange.WantlistForPeer(instances[1].Peer)
if len(p1wl) != 1 {
t.Logf("wantlist view didnt have 1 item (had %d)", len(p1wl))
} else if !p1wl[0].Equals(blocks[0].Cid()) {
t.Logf("had 1 item, it was wrong: %s %s", blocks[0].Cid(), p1wl[0])
} else {
t.Log("had correct wantlist, somehow")
}
t.Fatal("timed out waiting on block")
}
......
......@@ -2,10 +2,10 @@
package decision
import (
"context"
"sync"
"time"
context "context"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
......
package bitswap
import (
"context"
"errors"
blocks "github.com/ipfs/go-ipfs/blocks"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
)
type getBlocksFunc func(context.Context, []*cid.Cid) (<-chan blocks.Block, error)
func getBlock(p context.Context, k *cid.Cid, gb getBlocksFunc) (blocks.Block, error) {
if k == nil {
log.Error("nil cid in GetBlock")
return nil, blockstore.ErrNotFound
}
// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
// listen on parent in this scope, but NOT okay to pass |parent| to
// functions called by this one. Otherwise those functions won't return
// when this context's cancel func is executed. This is difficult to
// enforce. May this comment keep you safe.
ctx, cancel := context.WithCancel(p)
defer cancel()
promise, err := gb(ctx, []*cid.Cid{k})
if err != nil {
return nil, err
}
select {
case block, ok := <-promise:
if !ok {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, errors.New("promise channel was closed")
}
}
return block, nil
case <-p.Done():
return nil, p.Err()
}
}
type wantFunc func(context.Context, []*cid.Cid)
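// getBlocksImpl subscribes to the given keys, registers the want via `want`,
// and returns a channel on which matching blocks are delivered. Keys still
// outstanding when the request ends are cancelled through `cwants`.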
func getBlocksImpl(ctx context.Context, keys []*cid.Cid, notif notifications.PubSub, want wantFunc, cwants func([]*cid.Cid)) (<-chan blocks.Block, error) {
if len(keys) == 0 {
out := make(chan blocks.Block)
close(out)
return out, nil
}
remaining := cid.NewSet()
promise := notif.Subscribe(ctx, keys...)
for _, k := range keys {
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
remaining.Add(k)
}
want(ctx, keys)
out := make(chan blocks.Block)
go handleIncoming(ctx, remaining, promise, out, cwants)
return out, nil
}
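// handleIncoming forwards blocks from the subscription channel to `out`,
// removing each delivered cid from `remaining`. On exit it closes `out` and
// cancels any keys that are still outstanding via `cfun`.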
func handleIncoming(ctx context.Context, remaining *cid.Set, in <-chan blocks.Block, out chan blocks.Block, cfun func([]*cid.Cid)) {
ctx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
close(out)
// We can't just defer this call on its own: a deferred call's arguments are
// evaluated when the defer statement executes, not when the surrounding
// function returns. (See the short standalone example after this function.)
cfun(remaining.Keys())
}()
for {
select {
case blk, ok := <-in:
if !ok {
return
}
remaining.Remove(blk.Cid())
select {
case out <- blk:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
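The comment in handleIncoming above, about deferred-call arguments being resolved when the defer is created, is a general Go property; here is a standalone sketch (not from this commit, standard library only) of the difference:

package main

import "fmt"

func main() {
    remaining := []string{"a", "b", "c"}

    // Arguments to a plain deferred call are evaluated right here, when the
    // defer statement executes, so this call captures the three-element slice.
    defer fmt.Println("plain defer sees:", remaining)

    // A deferred closure reads `remaining` only when it runs, i.e. at return
    // time. Deferred calls run last-in-first-out, so this line prints first.
    defer func() { fmt.Println("deferred closure sees:", remaining) }()

    remaining = remaining[:1]
    // Prints:
    //   deferred closure sees: [a]
    //   plain defer sees: [a b c]
}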
package bitswap
import (
"context"
"time"
blocks "github.com/ipfs/go-ipfs/blocks"
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
)
const activeWantsLimit = 16
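// Session tracks the state of a related set of block requests: the peers
// that have already sent us blocks, the wants that are currently live, and a
// queue of cids still waiting to be requested.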
type Session struct {
ctx context.Context
tofetch []*cid.Cid
activePeers map[peer.ID]struct{}
activePeersArr []peer.ID
bs *Bitswap
incoming chan blkRecv
newReqs chan []*cid.Cid
cancelKeys chan []*cid.Cid
interest *lru.Cache
liveWants map[string]time.Time
liveCnt int
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
notif notifications.PubSub
uuid logging.Loggable
}
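// NewSession creates a new bitswap session and registers it with this
// Bitswap instance. Its event loop runs until the given context is cancelled.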
func (bs *Bitswap) NewSession(ctx context.Context) *Session {
s := &Session{
activePeers: make(map[peer.ID]struct{}),
liveWants: make(map[string]time.Time),
newReqs: make(chan []*cid.Cid),
cancelKeys: make(chan []*cid.Cid),
ctx: ctx,
bs: bs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
}
cache, _ := lru.New(2048)
s.interest = cache
bs.sessLk.Lock()
bs.sessions = append(bs.sessions, s)
bs.sessLk.Unlock()
go s.run(ctx)
return s
}
type blkRecv struct {
from peer.ID
blk blocks.Block
}
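// ReceiveBlock hands a block received from `from` to the session's event loop.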
func (s *Session) ReceiveBlock(from peer.ID, blk blocks.Block) {
s.incoming <- blkRecv{from: from, blk: blk}
}
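// InterestedIn reports whether the given cid has been requested through this
// session (tracked in an LRU cache of recently requested keys).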
func (s *Session) InterestedIn(c *cid.Cid) bool {
return s.interest.Contains(c.KeyString())
}
const provSearchDelay = time.Second * 10
func (s *Session) addActivePeer(p peer.ID) {
if _, ok := s.activePeers[p]; !ok {
s.activePeers[p] = struct{}{}
s.activePeersArr = append(s.activePeersArr, p)
}
}
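// resetTick re-arms the session timer: before any block has been fetched it
// waits the full provider-search delay, afterwards baseTickDelay plus three
// times the average observed block latency.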
func (s *Session) resetTick() {
if s.latTotal == 0 {
s.tick.Reset(provSearchDelay)
} else {
avLat := s.latTotal / time.Duration(s.fetchcnt)
s.tick.Reset(s.baseTickDelay + (3 * avLat))
}
}
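// run is the session's event loop. It records incoming blocks, admits new
// wants up to activeWantsLimit, handles cancellations, and on each tick
// re-broadcasts the live wants and searches for more providers.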
func (s *Session) run(ctx context.Context) {
s.tick = time.NewTimer(provSearchDelay)
newpeers := make(chan peer.ID, 16)
for {
select {
case blk := <-s.incoming:
s.tick.Stop()
s.addActivePeer(blk.from)
s.receiveBlock(ctx, blk.blk)
s.resetTick()
case keys := <-s.newReqs:
for _, k := range keys {
s.interest.Add(k.KeyString(), nil)
}
if s.liveCnt < activeWantsLimit {
toadd := activeWantsLimit - s.liveCnt
if toadd > len(keys) {
toadd = len(keys)
}
s.liveCnt += toadd
now := keys[:toadd]
keys = keys[toadd:]
s.wantBlocks(ctx, now)
}
s.tofetch = append(s.tofetch, keys...)
case keys := <-s.cancelKeys:
s.cancel(keys)
case <-s.tick.C:
var live []*cid.Cid
for c := range s.liveWants {
cs, _ := cid.Cast([]byte(c))
live = append(live, cs)
s.liveWants[c] = time.Now()
}
// Broadcast these keys to everyone we're connected to
s.bs.wm.WantBlocks(ctx, live, nil)
if len(live) > 0 {
go func() {
for p := range s.bs.network.FindProvidersAsync(ctx, live[0], 10) {
newpeers <- p
}
}()
}
s.resetTick()
case p := <-newpeers:
s.addActivePeer(p)
case <-ctx.Done():
return
}
}
}
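// receiveBlock handles a block that satisfies a live want: it updates the
// latency statistics, publishes the block to subscribers, and promotes the
// next queued cid (if any) to a live want.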
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
ks := blk.Cid().KeyString()
if _, ok := s.liveWants[ks]; ok {
s.liveCnt--
tval := s.liveWants[ks]
s.latTotal += time.Since(tval)
s.fetchcnt++
delete(s.liveWants, ks)
s.notif.Publish(blk)
if len(s.tofetch) > 0 {
next := s.tofetch[0:1]
s.tofetch = s.tofetch[1:]
s.wantBlocks(ctx, next)
}
}
}
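// wantBlocks marks the given keys as live and asks the want manager to send
// the wants to this session's active peers.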
func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) {
for _, c := range ks {
s.liveWants[c.KeyString()] = time.Now()
}
s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr)
}
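// cancel removes the given keys from the tofetch queue in place.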
func (s *Session) cancel(keys []*cid.Cid) {
sset := cid.NewSet()
for _, c := range keys {
sset.Add(c)
}
var i, j int
for ; j < len(s.tofetch); j++ {
if sset.Has(s.tofetch[j]) {
continue
}
s.tofetch[i] = s.tofetch[j]
i++
}
s.tofetch = s.tofetch[:i]
}
func (s *Session) cancelWants(keys []*cid.Cid) {
s.cancelKeys <- keys
}
func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) {
select {
case s.newReqs <- keys:
case <-ctx.Done():
}
}
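// GetBlocks fetches a set of blocks within the context of this session,
// returning them on the channel as they arrive (in no guaranteed order).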
func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) {
ctx = logging.ContextWithLoggable(ctx, s.uuid)
return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants)
}
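// GetBlock fetches a single block within the context of this session.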
func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
return getBlock(parent, k, s.GetBlocks)
}
package bitswap
import (
"context"
"fmt"
"testing"
"time"
blocks "github.com/ipfs/go-ipfs/blocks"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
)
func TestBasicSessions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
block := bgen.Next()
inst := sesgen.Instances(2)
a := inst[0]
b := inst[1]
if err := b.Blockstore().Put(block); err != nil {
t.Fatal(err)
}
sesa := a.Exchange.NewSession(ctx)
blkout, err := sesa.GetBlock(ctx, block.Cid())
if err != nil {
t.Fatal(err)
}
if !blkout.Cid().Equals(block.Cid()) {
t.Fatal("got wrong block")
}
}
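// assertBlockLists checks that got and exp have the same length and that
// every expected cid appears in got.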
func assertBlockLists(got, exp []blocks.Block) error {
if len(got) != len(exp) {
return fmt.Errorf("got wrong number of blocks, %d != %d", len(got), len(exp))
}
h := cid.NewSet()
for _, b := range got {
h.Add(b.Cid())
}
for _, b := range exp {
if !h.Has(b.Cid()) {
return fmt.Errorf("didnt have: %s", b.Cid())
}
}
return nil
}
func TestSessionBetweenPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
inst := sesgen.Instances(10)
blks := bgen.Blocks(101)
if err := inst[0].Blockstore().PutMany(blks); err != nil {
t.Fatal(err)
}
var cids []*cid.Cid
for _, blk := range blks {
cids = append(cids, blk.Cid())
}
ses := inst[1].Exchange.NewSession(ctx)
if _, err := ses.GetBlock(ctx, cids[0]); err != nil {
t.Fatal(err)
}
blks = blks[1:]
cids = cids[1:]
for i := 0; i < 10; i++ {
ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10])
if err != nil {
t.Fatal(err)
}
var got []blocks.Block
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil {
t.Fatal(err)
}
}
for _, is := range inst[2:] {
if mr := atomic.LoadUint64(&is.Exchange.messagesRecvd); mr > 2 {
t.Fatal("uninvolved nodes should only receive two messages", mr)
}
}
}
func TestSessionSplitFetch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
inst := sesgen.Instances(11)
blks := bgen.Blocks(100)
for i := 0; i < 10; i++ {
if err := inst[i].Blockstore().PutMany(blks[i*10 : (i+1)*10]); err != nil {
t.Fatal(err)
}
}
var cids []*cid.Cid
for _, blk := range blks {
cids = append(cids, blk.Cid())
}
ses := inst[10].Exchange.NewSession(ctx)
ses.baseTickDelay = time.Millisecond * 10
for i := 0; i < 10; i++ {
ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10])
if err != nil {
t.Fatal(err)
}
var got []blocks.Block
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil {
t.Fatal(err)
}
}
}
......@@ -47,7 +47,7 @@ func (g *SessionGenerator) Next() Instance {
if err != nil {
panic("FIXME") // TODO change signature
}
return Session(g.ctx, g.net, p)
return MkSession(g.ctx, g.net, p)
}
func (g *SessionGenerator) Instances(n int) []Instance {
......@@ -86,7 +86,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
// NB: It's easy to make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
func MkSession(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
bsdelay := delay.Fixed(0)
adapter := net.Adapter(p)
......
......@@ -71,13 +71,13 @@ type msgQueue struct {
done chan struct{}
}
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) {
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID) {
log.Infof("want blocks: %s", ks)
pm.addEntries(ctx, ks, false)
pm.addEntries(ctx, ks, peers, false)
}
func (pm *WantManager) CancelWants(ks []*cid.Cid) {
pm.addEntries(context.Background(), ks, true)
func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID) {
pm.addEntries(context.Background(), ks, peers, true)
}
type wantSet struct {
......@@ -85,7 +85,7 @@ type wantSet struct {
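// targets, when non-nil, is intended to scope these entries to the listed
// peers (see Session.wantBlocks); a nil slice keeps the broadcast behaviour.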
targets []peer.ID
}
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) {
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool) {
var entries []*bsmsg.Entry
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
......@@ -98,7 +98,7 @@ func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel boo
})
}
select {
case pm.incoming <- &wantSet{entries: entries}:
case pm.incoming <- &wantSet{entries: entries, targets: targets}:
case <-pm.ctx.Done():
case <-ctx.Done():
}
......
......@@ -49,7 +49,7 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
idmap := logging.LoggableMap{"ID": id}
defer log.Info("bitswap task worker shutting down...")
defer log.Debug("bitswap task worker shutting down...")
for {
log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
select {
......