Commit b16d5d15 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub

Merge pull request #4315 from ipfs/feat/sessions-tagging

tag peers associated with a bitswap session
parents cc9028d1 f60995ea
...@@ -4,8 +4,10 @@ import ( ...@@ -4,8 +4,10 @@ import (
"context" "context"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
ifconnmgr "gx/ipfs/QmYkCrTwivapqdB3JbwvwvxymseahVkcm46ThRMAA24zCr/go-libp2p-interface-connmgr"
protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
) )
...@@ -34,6 +36,8 @@ type BitSwapNetwork interface { ...@@ -34,6 +36,8 @@ type BitSwapNetwork interface {
NewMessageSender(context.Context, peer.ID) (MessageSender, error) NewMessageSender(context.Context, peer.ID) (MessageSender, error)
ConnectionManager() ifconnmgr.ConnManager
Routing Routing
} }
......
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
ma "gx/ipfs/QmXY77cVe7rVRQXZZQRioukUM7aRW3BTcAgJe12MCtb3Ji/go-multiaddr" ma "gx/ipfs/QmXY77cVe7rVRQXZZQRioukUM7aRW3BTcAgJe12MCtb3Ji/go-multiaddr"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
ifconnmgr "gx/ipfs/QmYkCrTwivapqdB3JbwvwvxymseahVkcm46ThRMAA24zCr/go-libp2p-interface-connmgr"
ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io" ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
host "gx/ipfs/Qmc1XhrFEiSeBNn3mpfg6gEuYCt5im2gYmNVmncsvmpeAk/go-libp2p-host" host "gx/ipfs/Qmc1XhrFEiSeBNn3mpfg6gEuYCt5im2gYmNVmncsvmpeAk/go-libp2p-host"
) )
...@@ -204,6 +205,10 @@ func (bsnet *impl) handleNewStream(s inet.Stream) { ...@@ -204,6 +205,10 @@ func (bsnet *impl) handleNewStream(s inet.Stream) {
} }
} }
func (bsnet *impl) ConnectionManager() ifconnmgr.ConnManager {
return bsnet.host.ConnManager()
}
type netNotifiee impl type netNotifiee impl
func (nn *netNotifiee) impl() *impl { func (nn *netNotifiee) impl() *impl {
......
...@@ -2,6 +2,7 @@ package bitswap ...@@ -2,6 +2,7 @@ package bitswap
import ( import (
"context" "context"
"fmt"
"time" "time"
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
...@@ -44,7 +45,8 @@ type Session struct { ...@@ -44,7 +45,8 @@ type Session struct {
uuid logging.Loggable uuid logging.Loggable
id uint64 id uint64
tag string
} }
// NewSession creates a new bitswap session whose lifetime is bounded by the // NewSession creates a new bitswap session whose lifetime is bounded by the
...@@ -66,6 +68,8 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session { ...@@ -66,6 +68,8 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session {
id: bs.getNextSessionID(), id: bs.getNextSessionID(),
} }
s.tag = fmt.Sprint("bs-ses-", s.id)
cache, _ := lru.New(2048) cache, _ := lru.New(2048)
s.interest = cache s.interest = cache
...@@ -139,6 +143,9 @@ func (s *Session) addActivePeer(p peer.ID) { ...@@ -139,6 +143,9 @@ func (s *Session) addActivePeer(p peer.ID) {
if _, ok := s.activePeers[p]; !ok { if _, ok := s.activePeers[p]; !ok {
s.activePeers[p] = struct{}{} s.activePeers[p] = struct{}{}
s.activePeersArr = append(s.activePeersArr, p) s.activePeersArr = append(s.activePeersArr, p)
cmgr := s.bs.network.ConnectionManager()
cmgr.TagPeer(p, s.tag, 10)
} }
} }
...@@ -218,6 +225,11 @@ func (s *Session) run(ctx context.Context) { ...@@ -218,6 +225,11 @@ func (s *Session) run(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
s.tick.Stop() s.tick.Stop()
s.bs.removeSession(s) s.bs.removeSession(s)
cmgr := s.bs.network.ConnectionManager()
for _, p := range s.activePeersArr {
cmgr.UntagPeer(p, s.tag)
}
return return
} }
} }
......
...@@ -8,12 +8,13 @@ import ( ...@@ -8,12 +8,13 @@ import (
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
mockrouting "github.com/ipfs/go-ipfs/routing/mock" mockrouting "github.com/ipfs/go-ipfs/routing/mock"
delay "github.com/ipfs/go-ipfs/thirdparty/delay" delay "github.com/ipfs/go-ipfs/thirdparty/delay"
testutil "gx/ipfs/QmWRCn8vruNAzHx8i6SAXinuheRitKEGu8c7m26stKvsYx/go-testutil"
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing" routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
testutil "gx/ipfs/QmWRCn8vruNAzHx8i6SAXinuheRitKEGu8c7m26stKvsYx/go-testutil"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer" peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
ifconnmgr "gx/ipfs/QmYkCrTwivapqdB3JbwvwvxymseahVkcm46ThRMAA24zCr/go-libp2p-interface-connmgr"
) )
var log = logging.Logger("bstestnet") var log = logging.Logger("bstestnet")
...@@ -118,6 +119,10 @@ func (nc *networkClient) FindProvidersAsync(ctx context.Context, k *cid.Cid, max ...@@ -118,6 +119,10 @@ func (nc *networkClient) FindProvidersAsync(ctx context.Context, k *cid.Cid, max
return out return out
} }
func (nc *networkClient) ConnectionManager() ifconnmgr.ConnManager {
return &ifconnmgr.NullConnMgr{}
}
type messagePasser struct { type messagePasser struct {
net *network net *network
target peer.ID target peer.ID
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment