Commit a6794230 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

bitswap: respond to peers connecting + disconnecting

With these notifications, bitswap can reclaim all resources
for any outstanding work for a peer.

cc @briantigerchow @whyrusleeping
parent d905de22
...@@ -339,6 +339,24 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -339,6 +339,24 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
return "", nil return "", nil
} }
// Connected/Disconnected warns bitswap about peer connections
func (bs *bitswap) PeerConnected(p peer.ID) {
// TODO: add to clientWorker??
peers := make(chan peer.ID)
err := bs.sendWantlistToPeers(context.TODO(), peers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
}
peers <- p
close(peers)
}
// Connected/Disconnected warns bitswap about peer connections
func (bs *bitswap) PeerDisconnected(peer.ID) {
// TODO: release resources.
}
func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
if len(bkeys) < 1 { if len(bkeys) < 1 {
return return
......
...@@ -40,6 +40,10 @@ type Receiver interface { ...@@ -40,6 +40,10 @@ type Receiver interface {
destination peer.ID, outgoing bsmsg.BitSwapMessage) destination peer.ID, outgoing bsmsg.BitSwapMessage)
ReceiveError(error) ReceiveError(error)
// Connected/Disconnected warns bitswap about peer connections
PeerConnected(peer.ID)
PeerDisconnected(peer.ID)
} }
type Routing interface { type Routing interface {
......
...@@ -21,6 +21,9 @@ func NewFromIpfsHost(host host.Host, r routing.IpfsRouting) BitSwapNetwork { ...@@ -21,6 +21,9 @@ func NewFromIpfsHost(host host.Host, r routing.IpfsRouting) BitSwapNetwork {
routing: r, routing: r,
} }
host.SetStreamHandler(ProtocolBitswap, bitswapNetwork.handleNewStream) host.SetStreamHandler(ProtocolBitswap, bitswapNetwork.handleNewStream)
host.Network().Notify((*netNotifiee)(&bitswapNetwork))
// TODO: StopNotify.
return &bitswapNetwork return &bitswapNetwork
} }
...@@ -139,3 +142,20 @@ func (bsnet *impl) handleNewStream(s inet.Stream) { ...@@ -139,3 +142,20 @@ func (bsnet *impl) handleNewStream(s inet.Stream) {
log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer()) log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
bsnet.receiver.ReceiveMessage(ctx, p, received) bsnet.receiver.ReceiveMessage(ctx, p, received)
} }
type netNotifiee impl
func (nn *netNotifiee) impl() *impl {
return (*impl)(nn)
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
nn.impl().receiver.PeerConnected(v.RemotePeer())
}
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
nn.impl().receiver.PeerDisconnected(v.RemotePeer())
}
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {}
...@@ -146,3 +146,10 @@ func (lam *lambdaImpl) ReceiveMessage(ctx context.Context, ...@@ -146,3 +146,10 @@ func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
func (lam *lambdaImpl) ReceiveError(err error) { func (lam *lambdaImpl) ReceiveError(err error) {
// TODO log error // TODO log error
} }
func (lam *lambdaImpl) PeerConnected(p peer.ID) {
// TODO
}
func (lam *lambdaImpl) PeerDisconnected(peer.ID) {
// TODO
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment