diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 757c9067eb9062bbaa9b19c2e18e14cc8a751a97..b8dcdab1e9b6e427e449748d8f288bdfd12a6bf8 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -91,7 +91,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), provideKeys: make(chan u.Key), + pm: NewPeerManager(network), } + go bs.pm.Run(ctx) network.SetDelegate(bs) // Start up bitswaps async worker routines @@ -108,6 +110,10 @@ type Bitswap struct { // network delivers messages on behalf of the session network bsnet.BitSwapNetwork + // the peermanager manages sending messages to peers in a way that + // wont block bitswap operation + pm *PeerManager + // blockstore is the local database // NB: ensure threadsafety blockstore blockstore.Blockstore @@ -217,7 +223,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. // HasBlock announces the existance of a block to this bitswap service. The // service will potentially notify its peers. func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { - log.Event(ctx, "hasBlock", blk) select { case <-bs.process.Closing(): return errors.New("bitswap is closed") @@ -227,6 +232,7 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { if err := bs.blockstore.Put(blk); err != nil { return err } + bs.wantlist.Remove(blk.Key()) bs.notifications.Publish(blk) select { @@ -239,7 +245,6 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { func (bs *Bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { set := pset.New() - wg := sync.WaitGroup{} loop: for { @@ -253,37 +258,22 @@ loop: continue } - wg.Add(1) - go func(p peer.ID) { - defer wg.Done() - if err := bs.send(ctx, p, m); err != nil { - log.Debug(err) // TODO remove if too verbose - } - }(peerToQuery) + bs.pm.Send(peerToQuery, m) case <-ctx.Done(): return nil } } - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - select { - case <-done: - case <-ctx.Done(): - // NB: we may be abandoning goroutines here before they complete - // this shouldnt be an issue because they will complete soon anyways - // we just don't want their being slow to impact bitswap transfer speeds - } return nil } func (bs *Bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error { + entries := bs.wantlist.Entries() + if len(entries) == 0 { + return nil + } message := bsmsg.New() message.SetFull(true) - for _, wanted := range bs.wantlist.Entries() { + for _, wanted := range entries { message.AddEntry(wanted.Key, wanted.Priority) } return bs.sendWantlistMsgToPeers(ctx, message, peers) @@ -326,7 +316,7 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli // TODO(brian): handle errors func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error { - defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() + //defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() // This call records changes to wantlists, blocks received, // and number of bytes transfered. @@ -356,6 +346,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerConnected(p peer.ID) { // TODO: add to clientWorker?? + bs.pm.Connected(p) peers := make(chan peer.ID, 1) peers <- p close(peers) @@ -367,6 +358,7 @@ func (bs *Bitswap) PeerConnected(p peer.ID) { // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerDisconnected(p peer.ID) { + bs.pm.Disconnected(p) bs.engine.PeerDisconnected(p) } @@ -381,19 +373,7 @@ func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { message.Cancel(k) } - wg := sync.WaitGroup{} - for _, p := range bs.engine.Peers() { - wg.Add(1) - go func(p peer.ID) { - defer wg.Done() - err := bs.send(ctx, p, message) - if err != nil { - log.Warningf("Error sending message: %s", err) - return - } - }(p) - } - wg.Wait() + bs.pm.Broadcast(message) return } @@ -408,29 +388,7 @@ func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { message.AddEntry(k, kMaxPriority-i) } - wg := sync.WaitGroup{} - for _, p := range bs.engine.Peers() { - wg.Add(1) - go func(p peer.ID) { - defer wg.Done() - err := bs.send(ctx, p, message) - if err != nil { - log.Debugf("Error sending message: %s", err) - } - }(p) - } - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - select { - case <-done: - case <-ctx.Done(): - // NB: we may be abandoning goroutines here before they complete - // this shouldnt be an issue because they will complete soon anyways - // we just don't want their being slow to impact bitswap transfer speeds - } + bs.pm.Broadcast(message) } func (bs *Bitswap) ReceiveError(err error) { @@ -439,16 +397,6 @@ func (bs *Bitswap) ReceiveError(err error) { // TODO bubble the network error up to the parent context/error logger } -// send strives to ensure that accounting is always performed when a message is -// sent -func (bs *Bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error { - defer log.EventBegin(ctx, "sendMessage", p, m).Done() - if err := bs.network.SendMessage(ctx, p, m); err != nil { - return err - } - return bs.engine.MessageSent(p, m) -} - func (bs *Bitswap) Close() error { return bs.process.Close() } diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 354eb73e5afeed20c81450fb73f1416f8a6af36c..c04946692ec8ee909fdd9ffd81853be4d8f4859c 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -13,7 +13,6 @@ import ( blocks "github.com/ipfs/go-ipfs/blocks" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" - p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util" mockrouting "github.com/ipfs/go-ipfs/routing/mock" delay "github.com/ipfs/go-ipfs/thirdparty/delay" u "github.com/ipfs/go-ipfs/util" @@ -36,30 +35,6 @@ func TestClose(t *testing.T) { bitswap.Exchange.GetBlock(context.Background(), block.Key()) } -func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this - - rs := mockrouting.NewServer() - net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) - g := NewTestSessionGenerator(net) - defer g.Close() - - block := blocks.NewBlock([]byte("block")) - pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t) - rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network - - solo := g.Next() - defer solo.Exchange.Close() - - ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond) - _, err := solo.Exchange.GetBlock(ctx, block.Key()) - - if err != context.DeadlineExceeded { - t.Fatal("Expected DeadlineExceeded error") - } -} - -// TestGetBlockAfterRequesting... - func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) @@ -67,14 +42,15 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { g := NewTestSessionGenerator(net) defer g.Close() - hasBlock := g.Next() + peers := g.Instances(2) + hasBlock := peers[0] defer hasBlock.Exchange.Close() if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil { t.Fatal(err) } - wantsBlock := g.Next() + wantsBlock := peers[1] defer wantsBlock.Exchange.Close() ctx, _ := context.WithTimeout(context.Background(), time.Second) @@ -196,8 +172,9 @@ func TestSendToWantingPeer(t *testing.T) { prev := rebroadcastDelay.Set(time.Second / 2) defer func() { rebroadcastDelay.Set(prev) }() - peerA := sg.Next() - peerB := sg.Next() + peers := sg.Instances(2) + peerA := peers[0] + peerB := peers[1] t.Logf("Session %v\n", peerA.Peer) t.Logf("Session %v\n", peerB.Peer) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 60b95e469b7ca9664d7b26c24975f3c88695c9fb..0b08a55fb44579a933818c9310e10966f95922cc 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -5,6 +5,7 @@ import ( "sync" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + blocks "github.com/ipfs/go-ipfs/blocks" bstore "github.com/ipfs/go-ipfs/blocks/blockstore" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" @@ -53,8 +54,9 @@ const ( type Envelope struct { // Peer is the intended recipient Peer peer.ID - // Message is the payload - Message bsmsg.BitSwapMessage + + // Block is the payload + Block *blocks.Block // A callback to notify the decision queue that the task is complete Sent func() @@ -151,12 +153,10 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { continue } - m := bsmsg.New() // TODO: maybe add keys from our wantlist? - m.AddBlock(block) return &Envelope{ - Peer: nextTask.Target, - Message: m, - Sent: nextTask.Done, + Peer: nextTask.Target, + Block: block, + Sent: nextTask.Done, }, nil } } @@ -185,7 +185,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { defer e.lock.Unlock() if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 { - log.Debug("received empty message from", p) + log.Debugf("received empty message from %s", p) } newWorkExists := false @@ -202,11 +202,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { for _, entry := range m.Wantlist() { if entry.Cancel { - log.Debug("cancel", entry.Key) + log.Debugf("cancel %s", entry.Key) l.CancelWant(entry.Key) e.peerRequestQueue.Remove(entry.Key, p) } else { - log.Debug("wants", entry.Key, entry.Priority) + log.Debugf("wants %s", entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) @@ -216,7 +216,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { } for _, block := range m.Blocks() { - log.Debug("got block %s %d bytes", block.Key(), len(block.Data)) + log.Debugf("got block %s %d bytes", block.Key(), len(block.Data)) l.ReceivedBytes(len(block.Data)) for _, l := range e.ledgerMap { if entry, ok := l.WantListContains(block.Key()); ok { diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go index afe6ba9adddf7e1f720b3dee9e330593720c50dc..31e46c7763571a91f3b03f91b61d322503e46b37 100644 --- a/exchange/bitswap/decision/engine_test.go +++ b/exchange/bitswap/decision/engine_test.go @@ -185,7 +185,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error { for _, k := range keys { next := <-e.Outbox() envelope := <-next - received := envelope.Message.Blocks()[0] + received := envelope.Block expected := blocks.NewBlock([]byte(k)) if received.Key() != expected.Key() { return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data))) diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go index 42928487dcc2e52e5f7aab37b40b4cb0a477e04e..15f52da745bbbce4d579b3c633ca285e35f36920 100644 --- a/exchange/bitswap/decision/peer_request_queue.go +++ b/exchange/bitswap/decision/peer_request_queue.go @@ -156,7 +156,7 @@ func (t *peerRequestTask) SetIndex(i int) { // taskKey returns a key that uniquely identifies a task. func taskKey(p peer.ID, k u.Key) string { - return string(p.String() + k.String()) + return string(p) + string(k) } // FIFO is a basic task comparator that returns tasks in the order created. diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 3a7d70aae0eb39ca9b31a82a575408b04e96d6a8..4e88e738c31564fc0065158b0619b46b5738c682 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -29,6 +29,8 @@ type BitSwapMessage interface { Cancel(key u.Key) + Empty() bool + // Sets whether or not the contained wantlist represents the entire wantlist // true = full wantlist // false = wantlist 'patch' @@ -51,7 +53,7 @@ type Exportable interface { type impl struct { full bool wantlist map[u.Key]Entry - blocks map[u.Key]*blocks.Block // map to detect duplicates + blocks map[u.Key]*blocks.Block } func New() BitSwapMessage { @@ -92,6 +94,10 @@ func (m *impl) Full() bool { return m.full } +func (m *impl) Empty() bool { + return len(m.blocks) == 0 && len(m.wantlist) == 0 +} + func (m *impl) Wantlist() []Entry { var out []Entry for _, e := range m.wantlist { @@ -101,7 +107,7 @@ func (m *impl) Wantlist() []Entry { } func (m *impl) Blocks() []*blocks.Block { - bs := make([]*blocks.Block, 0) + bs := make([]*blocks.Block, 0, len(m.blocks)) for _, block := range m.blocks { bs = append(bs, block) } @@ -109,6 +115,7 @@ func (m *impl) Blocks() []*blocks.Block { } func (m *impl) Cancel(k u.Key) { + delete(m.wantlist, k) m.addEntry(k, 0, true) } diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index a6ed070c03c8c9137e733407db44f881b770830a..849a1c28e1e4ccd35c09c1a04b8436187dea5060 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -23,6 +23,8 @@ type BitSwapNetwork interface { // network. SetDelegate(Receiver) + ConnectTo(context.Context, peer.ID) error + Routing } diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 97745e32da2e612efacc4d16f8fcf822dd126b2a..4e5a1317f58bcc5e1f033bdbd8bf928c39673d52 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -97,6 +97,10 @@ func (bsnet *impl) SetDelegate(r Receiver) { bsnet.receiver = r } +func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { + return bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}) +} + // FindProvidersAsync returns a channel of providers for the given key func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID { diff --git a/exchange/bitswap/peermanager.go b/exchange/bitswap/peermanager.go new file mode 100644 index 0000000000000000000000000000000000000000..ff3d9ab31f09323ae1db1b21ffeec3cd526f3480 --- /dev/null +++ b/exchange/bitswap/peermanager.go @@ -0,0 +1,203 @@ +package bitswap + +import ( + "sync" + + context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision" + bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" + bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" + peer "github.com/ipfs/go-ipfs/p2p/peer" + u "github.com/ipfs/go-ipfs/util" +) + +type PeerManager struct { + receiver bsnet.Receiver + + incoming chan *msgPair + connect chan peer.ID + disconnect chan peer.ID + + peers map[peer.ID]*msgQueue + + network bsnet.BitSwapNetwork +} + +func NewPeerManager(network bsnet.BitSwapNetwork) *PeerManager { + return &PeerManager{ + incoming: make(chan *msgPair, 10), + connect: make(chan peer.ID, 10), + disconnect: make(chan peer.ID, 10), + peers: make(map[peer.ID]*msgQueue), + network: network, + } +} + +type msgPair struct { + to peer.ID + msg bsmsg.BitSwapMessage +} + +type cancellation struct { + who peer.ID + blk u.Key +} + +type msgQueue struct { + p peer.ID + + lk sync.Mutex + wlmsg bsmsg.BitSwapMessage + + work chan struct{} + done chan struct{} +} + +func (pm *PeerManager) SendBlock(env *engine.Envelope) { + // Blocks need to be sent synchronously to maintain proper backpressure + // throughout the network stack + defer env.Sent() + + msg := bsmsg.New() + msg.AddBlock(env.Block) + err := pm.network.SendMessage(context.TODO(), env.Peer, msg) + if err != nil { + log.Error(err) + } +} + +func (pm *PeerManager) startPeerHandler(p peer.ID) { + _, ok := pm.peers[p] + if ok { + // TODO: log an error? + return + } + + mq := new(msgQueue) + mq.done = make(chan struct{}) + mq.work = make(chan struct{}, 1) + mq.p = p + + pm.peers[p] = mq + go pm.runQueue(mq) +} + +func (pm *PeerManager) stopPeerHandler(p peer.ID) { + pq, ok := pm.peers[p] + if !ok { + // TODO: log error? + return + } + + close(pq.done) + delete(pm.peers, p) +} + +func (pm *PeerManager) runQueue(mq *msgQueue) { + for { + select { + case <-mq.work: // there is work to be done + + // TODO: this might not need to be done every time, figure out + // a good heuristic + err := pm.network.ConnectTo(context.TODO(), mq.p) + if err != nil { + log.Error(err) + // TODO: cant connect, what now? + } + + // grab messages from queue + mq.lk.Lock() + wlm := mq.wlmsg + mq.wlmsg = nil + mq.lk.Unlock() + + if wlm != nil && !wlm.Empty() { + // send wantlist updates + err = pm.network.SendMessage(context.TODO(), mq.p, wlm) + if err != nil { + log.Error("bitswap send error: ", err) + // TODO: what do we do if this fails? + } + } + case <-mq.done: + return + } + } +} + +func (pm *PeerManager) Send(to peer.ID, msg bsmsg.BitSwapMessage) { + if len(msg.Blocks()) > 0 { + panic("no blocks here!") + } + pm.incoming <- &msgPair{to: to, msg: msg} +} + +func (pm *PeerManager) Broadcast(msg bsmsg.BitSwapMessage) { + pm.incoming <- &msgPair{msg: msg} +} + +func (pm *PeerManager) Connected(p peer.ID) { + pm.connect <- p +} + +func (pm *PeerManager) Disconnected(p peer.ID) { + pm.disconnect <- p +} + +// TODO: use goprocess here once i trust it +func (pm *PeerManager) Run(ctx context.Context) { + for { + select { + case msgp := <-pm.incoming: + + // Broadcast message to all if recipient not set + if msgp.to == "" { + for _, p := range pm.peers { + p.addMessage(msgp.msg) + } + continue + } + + p, ok := pm.peers[msgp.to] + if !ok { + //TODO: decide, drop message? or dial? + pm.startPeerHandler(msgp.to) + p = pm.peers[msgp.to] + } + + p.addMessage(msgp.msg) + case p := <-pm.connect: + pm.startPeerHandler(p) + case p := <-pm.disconnect: + pm.stopPeerHandler(p) + case <-ctx.Done(): + return + } + } +} + +func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { + mq.lk.Lock() + defer func() { + mq.lk.Unlock() + select { + case mq.work <- struct{}{}: + default: + } + }() + + if mq.wlmsg == nil || msg.Full() { + mq.wlmsg = msg + return + } + + // TODO: add a msg.Combine(...) method + for _, e := range msg.Wantlist() { + if e.Cancel { + mq.wlmsg.Cancel(e.Key) + } else { + mq.wlmsg.AddEntry(e.Key, e.Priority) + } + } +} diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index feb5fd722a955df895bde208fe6096a8bafc34e7..f2c814f819e023e64c1717e4e87241e15b6bb40a 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -119,3 +119,12 @@ func (nc *networkClient) Provide(ctx context.Context, k util.Key) error { func (nc *networkClient) SetDelegate(r bsnet.Receiver) { nc.Receiver = r } + +func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error { + if !nc.network.HasPeer(p) { + return errors.New("no such peer in network") + } + nc.network.clients[p].PeerConnected(nc.local) + nc.Receiver.PeerConnected(p) + return nil +} diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index 2ce035c3de8e323bb12be73704993e1be1be9175..47930de694d6a582a08e2da3a4012611a080647c 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -7,7 +7,6 @@ import ( ds_sync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" - exchange "github.com/ipfs/go-ipfs/exchange" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" peer "github.com/ipfs/go-ipfs/p2p/peer" p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util" @@ -56,12 +55,18 @@ func (g *SessionGenerator) Instances(n int) []Instance { inst := g.Next() instances = append(instances, inst) } + for i, inst := range instances { + for j := i + 1; j < len(instances); j++ { + oinst := instances[j] + inst.Exchange.PeerConnected(oinst.Peer) + } + } return instances } type Instance struct { Peer peer.ID - Exchange exchange.Interface + Exchange *Bitswap blockstore blockstore.Blockstore blockstoreDelay delay.D @@ -94,7 +99,7 @@ func session(ctx context.Context, net tn.Network, p testutil.Identity) Instance const alwaysSendToPeer = true - bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer) + bs := New(ctx, p.ID(), adapter, bstore, alwaysSendToPeer).(*Bitswap) return Instance{ Peer: p.ID(), diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index dff3d911c2088f8b61adcc53a926471d2da8f170..c6c2bbb25deadbe7d8711b147b0b4eda38918a2a 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -70,9 +70,9 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { if !ok { continue } - log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) - bs.send(ctx, envelope.Peer, envelope.Message) - envelope.Sent() + + //log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) + bs.pm.SendBlock(envelope) case <-ctx.Done(): return }