Commit a159e682 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

implement peermanager to control outgoing messages

Also more refactoring of bitswap in general, including some perf
improvements and eventlog removal.

clean up, and buffer channels

move some things around

correctly buffer work messages

more cleanup, and improve test perf

remove unneccessary test

revert changes to bitswap message, they werent necessary
parent f0276dc3
......@@ -91,7 +91,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
provideKeys: make(chan u.Key),
pm: NewPeerManager(network),
}
go bs.pm.Run(ctx)
network.SetDelegate(bs)
// Start up bitswaps async worker routines
......@@ -108,6 +110,10 @@ type Bitswap struct {
// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork
// the peermanager manages sending messages to peers in a way that
// wont block bitswap operation
pm *PeerManager
// blockstore is the local database
// NB: ensure threadsafety
blockstore blockstore.Blockstore
......@@ -217,7 +223,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
log.Event(ctx, "hasBlock", blk)
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
......@@ -227,6 +232,7 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
if err := bs.blockstore.Put(blk); err != nil {
return err
}
bs.wantlist.Remove(blk.Key())
bs.notifications.Publish(blk)
select {
......@@ -239,7 +245,6 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
set := pset.New()
wg := sync.WaitGroup{}
loop:
for {
......@@ -253,37 +258,22 @@ loop:
continue
}
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
if err := bs.send(ctx, p, m); err != nil {
log.Debug(err) // TODO remove if too verbose
}
}(peerToQuery)
bs.pm.Send(peerToQuery, m)
case <-ctx.Done():
return nil
}
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-ctx.Done():
// NB: we may be abandoning goroutines here before they complete
// this shouldnt be an issue because they will complete soon anyways
// we just don't want their being slow to impact bitswap transfer speeds
}
return nil
}
func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
entries := bs.wantlist.Entries()
if len(entries) == 0 {
return nil
}
message := bsmsg.New()
message.SetFull(true)
for _, wanted := range bs.wantlist.Entries() {
for _, wanted := range entries {
message.AddEntry(wanted.Key, wanted.Priority)
}
return bs.sendWantlistMsgToPeers(ctx, message, peers)
......@@ -326,7 +316,7 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
// TODO(brian): handle errors
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error {
defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
//defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
......@@ -356,6 +346,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
// Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerConnected(p peer.ID) {
// TODO: add to clientWorker??
bs.pm.Connected(p)
peers := make(chan peer.ID, 1)
peers <- p
close(peers)
......@@ -367,6 +358,7 @@ func (bs *Bitswap) PeerConnected(p peer.ID) {
// Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.pm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}
......@@ -381,19 +373,7 @@ func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
message.Cancel(k)
}
wg := sync.WaitGroup{}
for _, p := range bs.engine.Peers() {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
err := bs.send(ctx, p, message)
if err != nil {
log.Warningf("Error sending message: %s", err)
return
}
}(p)
}
wg.Wait()
bs.pm.Broadcast(message)
return
}
......@@ -408,29 +388,7 @@ func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
message.AddEntry(k, kMaxPriority-i)
}
wg := sync.WaitGroup{}
for _, p := range bs.engine.Peers() {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
err := bs.send(ctx, p, message)
if err != nil {
log.Debugf("Error sending message: %s", err)
}
}(p)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-ctx.Done():
// NB: we may be abandoning goroutines here before they complete
// this shouldnt be an issue because they will complete soon anyways
// we just don't want their being slow to impact bitswap transfer speeds
}
bs.pm.Broadcast(message)
}
func (bs *Bitswap) ReceiveError(err error) {
......@@ -439,16 +397,6 @@ func (bs *Bitswap) ReceiveError(err error) {
// TODO bubble the network error up to the parent context/error logger
}
// send strives to ensure that accounting is always performed when a message is
// sent
func (bs *Bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
defer log.EventBegin(ctx, "sendMessage", p, m).Done()
if err := bs.network.SendMessage(ctx, p, m); err != nil {
return err
}
return bs.engine.MessageSent(p, m)
}
func (bs *Bitswap) Close() error {
return bs.process.Close()
}
......
......@@ -13,7 +13,6 @@ import (
blocks "github.com/ipfs/go-ipfs/blocks"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
delay "github.com/ipfs/go-ipfs/thirdparty/delay"
u "github.com/ipfs/go-ipfs/util"
......@@ -36,30 +35,6 @@ func TestClose(t *testing.T) {
bitswap.Exchange.GetBlock(context.Background(), block.Key())
}
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
rs := mockrouting.NewServer()
net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
g := NewTestSessionGenerator(net)
defer g.Close()
block := blocks.NewBlock([]byte("block"))
pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
solo := g.Next()
defer solo.Exchange.Close()
ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
_, err := solo.Exchange.GetBlock(ctx, block.Key())
if err != context.DeadlineExceeded {
t.Fatal("Expected DeadlineExceeded error")
}
}
// TestGetBlockAfterRequesting...
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
......@@ -67,14 +42,15 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
g := NewTestSessionGenerator(net)
defer g.Close()
hasBlock := g.Next()
peers := g.Instances(2)
hasBlock := peers[0]
defer hasBlock.Exchange.Close()
if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
wantsBlock := g.Next()
wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()
ctx, _ := context.WithTimeout(context.Background(), time.Second)
......@@ -196,8 +172,9 @@ func TestSendToWantingPeer(t *testing.T) {
prev := rebroadcastDelay.Set(time.Second / 2)
defer func() { rebroadcastDelay.Set(prev) }()
peerA := sg.Next()
peerB := sg.Next()
peers := sg.Instances(2)
peerA := peers[0]
peerB := peers[1]
t.Logf("Session %v\n", peerA.Peer)
t.Logf("Session %v\n", peerB.Peer)
......
......@@ -5,6 +5,7 @@ import (
"sync"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blocks "github.com/ipfs/go-ipfs/blocks"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
......@@ -53,8 +54,9 @@ const (
type Envelope struct {
// Peer is the intended recipient
Peer peer.ID
// Message is the payload
Message bsmsg.BitSwapMessage
// Block is the payload
Block *blocks.Block
// A callback to notify the decision queue that the task is complete
Sent func()
......@@ -151,12 +153,10 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
continue
}
m := bsmsg.New() // TODO: maybe add keys from our wantlist?
m.AddBlock(block)
return &Envelope{
Peer: nextTask.Target,
Message: m,
Sent: nextTask.Done,
Peer: nextTask.Target,
Block: block,
Sent: nextTask.Done,
}, nil
}
}
......@@ -185,7 +185,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
defer e.lock.Unlock()
if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
log.Debug("received empty message from", p)
log.Debugf("received empty message from %s", p)
}
newWorkExists := false
......@@ -202,11 +202,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debug("cancel", entry.Key)
log.Debugf("cancel %s", entry.Key)
l.CancelWant(entry.Key)
e.peerRequestQueue.Remove(entry.Key, p)
} else {
log.Debug("wants", entry.Key, entry.Priority)
log.Debugf("wants %s", entry.Key, entry.Priority)
l.Wants(entry.Key, entry.Priority)
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
e.peerRequestQueue.Push(entry.Entry, p)
......@@ -216,7 +216,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
}
for _, block := range m.Blocks() {
log.Debug("got block %s %d bytes", block.Key(), len(block.Data))
log.Debugf("got block %s %d bytes", block.Key(), len(block.Data))
l.ReceivedBytes(len(block.Data))
for _, l := range e.ledgerMap {
if entry, ok := l.WantListContains(block.Key()); ok {
......
......@@ -185,7 +185,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
for _, k := range keys {
next := <-e.Outbox()
envelope := <-next
received := envelope.Message.Blocks()[0]
received := envelope.Block
expected := blocks.NewBlock([]byte(k))
if received.Key() != expected.Key() {
return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data)))
......
......@@ -156,7 +156,7 @@ func (t *peerRequestTask) SetIndex(i int) {
// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.ID, k u.Key) string {
return string(p.String() + k.String())
return string(p) + string(k)
}
// FIFO is a basic task comparator that returns tasks in the order created.
......
......@@ -29,6 +29,8 @@ type BitSwapMessage interface {
Cancel(key u.Key)
Empty() bool
// Sets whether or not the contained wantlist represents the entire wantlist
// true = full wantlist
// false = wantlist 'patch'
......@@ -51,7 +53,7 @@ type Exportable interface {
type impl struct {
full bool
wantlist map[u.Key]Entry
blocks map[u.Key]*blocks.Block // map to detect duplicates
blocks map[u.Key]*blocks.Block
}
func New() BitSwapMessage {
......@@ -92,6 +94,10 @@ func (m *impl) Full() bool {
return m.full
}
func (m *impl) Empty() bool {
return len(m.blocks) == 0 && len(m.wantlist) == 0
}
func (m *impl) Wantlist() []Entry {
var out []Entry
for _, e := range m.wantlist {
......@@ -101,7 +107,7 @@ func (m *impl) Wantlist() []Entry {
}
func (m *impl) Blocks() []*blocks.Block {
bs := make([]*blocks.Block, 0)
bs := make([]*blocks.Block, 0, len(m.blocks))
for _, block := range m.blocks {
bs = append(bs, block)
}
......@@ -109,6 +115,7 @@ func (m *impl) Blocks() []*blocks.Block {
}
func (m *impl) Cancel(k u.Key) {
delete(m.wantlist, k)
m.addEntry(k, 0, true)
}
......
......@@ -23,6 +23,8 @@ type BitSwapNetwork interface {
// network.
SetDelegate(Receiver)
ConnectTo(context.Context, peer.ID) error
Routing
}
......
......@@ -97,6 +97,10 @@ func (bsnet *impl) SetDelegate(r Receiver) {
bsnet.receiver = r
}
func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
return bsnet.host.Connect(ctx, peer.PeerInfo{ID: p})
}
// FindProvidersAsync returns a channel of providers for the given key
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
......
package bitswap
import (
"sync"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
peer "github.com/ipfs/go-ipfs/p2p/peer"
u "github.com/ipfs/go-ipfs/util"
)
type PeerManager struct {
receiver bsnet.Receiver
incoming chan *msgPair
connect chan peer.ID
disconnect chan peer.ID
peers map[peer.ID]*msgQueue
network bsnet.BitSwapNetwork
}
func NewPeerManager(network bsnet.BitSwapNetwork) *PeerManager {
return &PeerManager{
incoming: make(chan *msgPair, 10),
connect: make(chan peer.ID, 10),
disconnect: make(chan peer.ID, 10),
peers: make(map[peer.ID]*msgQueue),
network: network,
}
}
type msgPair struct {
to peer.ID
msg bsmsg.BitSwapMessage
}
type cancellation struct {
who peer.ID
blk u.Key
}
type msgQueue struct {
p peer.ID
lk sync.Mutex
wlmsg bsmsg.BitSwapMessage
work chan struct{}
done chan struct{}
}
func (pm *PeerManager) SendBlock(env *engine.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack
defer env.Sent()
msg := bsmsg.New()
msg.AddBlock(env.Block)
err := pm.network.SendMessage(context.TODO(), env.Peer, msg)
if err != nil {
log.Error(err)
}
}
func (pm *PeerManager) startPeerHandler(p peer.ID) {
_, ok := pm.peers[p]
if ok {
// TODO: log an error?
return
}
mq := new(msgQueue)
mq.done = make(chan struct{})
mq.work = make(chan struct{}, 1)
mq.p = p
pm.peers[p] = mq
go pm.runQueue(mq)
}
func (pm *PeerManager) stopPeerHandler(p peer.ID) {
pq, ok := pm.peers[p]
if !ok {
// TODO: log error?
return
}
close(pq.done)
delete(pm.peers, p)
}
func (pm *PeerManager) runQueue(mq *msgQueue) {
for {
select {
case <-mq.work: // there is work to be done
// TODO: this might not need to be done every time, figure out
// a good heuristic
err := pm.network.ConnectTo(context.TODO(), mq.p)
if err != nil {
log.Error(err)
// TODO: cant connect, what now?
}
// grab messages from queue
mq.lk.Lock()
wlm := mq.wlmsg
mq.wlmsg = nil
mq.lk.Unlock()
if wlm != nil && !wlm.Empty() {
// send wantlist updates
err = pm.network.SendMessage(context.TODO(), mq.p, wlm)
if err != nil {
log.Error("bitswap send error: ", err)
// TODO: what do we do if this fails?
}
}
case <-mq.done:
return
}
}
}
func (pm *PeerManager) Send(to peer.ID, msg bsmsg.BitSwapMessage) {
if len(msg.Blocks()) > 0 {
panic("no blocks here!")
}
pm.incoming <- &msgPair{to: to, msg: msg}
}
func (pm *PeerManager) Broadcast(msg bsmsg.BitSwapMessage) {
pm.incoming <- &msgPair{msg: msg}
}
func (pm *PeerManager) Connected(p peer.ID) {
pm.connect <- p
}
func (pm *PeerManager) Disconnected(p peer.ID) {
pm.disconnect <- p
}
// TODO: use goprocess here once i trust it
func (pm *PeerManager) Run(ctx context.Context) {
for {
select {
case msgp := <-pm.incoming:
// Broadcast message to all if recipient not set
if msgp.to == "" {
for _, p := range pm.peers {
p.addMessage(msgp.msg)
}
continue
}
p, ok := pm.peers[msgp.to]
if !ok {
//TODO: decide, drop message? or dial?
pm.startPeerHandler(msgp.to)
p = pm.peers[msgp.to]
}
p.addMessage(msgp.msg)
case p := <-pm.connect:
pm.startPeerHandler(p)
case p := <-pm.disconnect:
pm.stopPeerHandler(p)
case <-ctx.Done():
return
}
}
}
func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) {
mq.lk.Lock()
defer func() {
mq.lk.Unlock()
select {
case mq.work <- struct{}{}:
default:
}
}()
if mq.wlmsg == nil || msg.Full() {
mq.wlmsg = msg
return
}
// TODO: add a msg.Combine(...) method
for _, e := range msg.Wantlist() {
if e.Cancel {
mq.wlmsg.Cancel(e.Key)
} else {
mq.wlmsg.AddEntry(e.Key, e.Priority)
}
}
}
......@@ -119,3 +119,12 @@ func (nc *networkClient) Provide(ctx context.Context, k util.Key) error {
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
nc.Receiver = r
}
func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error {
if !nc.network.HasPeer(p) {
return errors.New("no such peer in network")
}
nc.network.clients[p].PeerConnected(nc.local)
nc.Receiver.PeerConnected(p)
return nil
}
......@@ -7,7 +7,6 @@ import (
ds_sync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
peer "github.com/ipfs/go-ipfs/p2p/peer"
p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
......@@ -56,12 +55,18 @@ func (g *SessionGenerator) Instances(n int) []Instance {
inst := g.Next()
instances = append(instances, inst)
}
for i, inst := range instances {
for j := i + 1; j < len(instances); j++ {
oinst := instances[j]
inst.Exchange.PeerConnected(oinst.Peer)
}
}
return instances
}
type Instance struct {
Peer peer.ID
Exchange exchange.Interface
Exchange *Bitswap
blockstore blockstore.Blockstore
blockstoreDelay delay.D
......@@ -94,7 +99,7 @@ func session(ctx context.Context, net tn.Network, p testutil.Identity) Instance
const alwaysSendToPeer = true
bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer)
bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer).(*Bitswap)
return Instance{
Peer: p.ID(),
......
......@@ -70,9 +70,9 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
if !ok {
continue
}
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
envelope.Sent()
//log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.pm.SendBlock(envelope)
case <-ctx.Done():
return
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment