Commit 4ab8ad56 authored by Brian Tiger Chow's avatar Brian Tiger Chow Committed by Juan Batiz-Benet

refactor(bitswap) bitswap.Network now abstracts ipfs.Network + ipfs.Routing

@jbenet @whyrusleeping

the next commit will change bitswap.Network.FindProviders to only deal
with IDs
parent f76fe2ad
...@@ -46,7 +46,7 @@ var ( ...@@ -46,7 +46,7 @@ var (
// BitSwapNetwork. This function registers the returned instance as the network // BitSwapNetwork. This function registers the returned instance as the network
// delegate. // delegate.
// Runs until context is cancelled. // Runs until context is cancelled.
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, routing bsnet.Routing, func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
bstore blockstore.Blockstore, nice bool) exchange.Interface { bstore blockstore.Blockstore, nice bool) exchange.Interface {
ctx, cancelFunc := context.WithCancel(parent) ctx, cancelFunc := context.WithCancel(parent)
...@@ -63,7 +63,6 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, routin ...@@ -63,7 +63,6 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, routin
cancelFunc: cancelFunc, cancelFunc: cancelFunc,
notifications: notif, notifications: notif,
engine: decision.NewEngine(ctx, bstore), engine: decision.NewEngine(ctx, bstore),
routing: routing,
network: network, network: network,
wantlist: wantlist.NewThreadSafe(), wantlist: wantlist.NewThreadSafe(),
batchRequests: make(chan []u.Key, sizeBatchRequestChan), batchRequests: make(chan []u.Key, sizeBatchRequestChan),
...@@ -85,9 +84,6 @@ type bitswap struct { ...@@ -85,9 +84,6 @@ type bitswap struct {
// NB: ensure threadsafety // NB: ensure threadsafety
blockstore blockstore.Blockstore blockstore blockstore.Blockstore
// routing interface for communication
routing bsnet.Routing
notifications notifications.PubSub notifications notifications.PubSub
// Requests for a set of related blocks // Requests for a set of related blocks
...@@ -165,7 +161,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { ...@@ -165,7 +161,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
} }
bs.wantlist.Remove(blk.Key()) bs.wantlist.Remove(blk.Key())
bs.notifications.Publish(blk) bs.notifications.Publish(blk)
return bs.routing.Provide(ctx, blk.Key()) return bs.network.Provide(ctx, blk.Key())
} }
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.PeerInfo) error { func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.PeerInfo) error {
...@@ -212,7 +208,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantli ...@@ -212,7 +208,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantli
go func(k u.Key) { go func(k u.Key) {
defer wg.Done() defer wg.Done()
child, _ := context.WithTimeout(ctx, providerRequestTimeout) child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest) providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers { for prov := range providers {
bs.network.Peerstore().AddAddresses(prov.ID, prov.Addrs) bs.network.Peerstore().AddAddresses(prov.ID, prov.Addrs)
if set.TryAdd(prov.ID) { //Do once per peer if set.TryAdd(prov.ID) { //Do once per peer
...@@ -265,7 +261,7 @@ func (bs *bitswap) clientWorker(parent context.Context) { ...@@ -265,7 +261,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
// it. Later, this assumption may not hold as true if we implement // it. Later, this assumption may not hold as true if we implement
// newer bitswap strategies. // newer bitswap strategies.
child, _ := context.WithTimeout(ctx, providerRequestTimeout) child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.routing.FindProvidersAsync(child, ks[0], maxProvidersPerRequest) providers := bs.network.FindProvidersAsync(child, ks[0], maxProvidersPerRequest)
err := bs.sendWantListTo(ctx, providers) err := bs.sendWantListTo(ctx, providers)
if err != nil { if err != nil {
log.Errorf("error sending wantlist: %s", err) log.Errorf("error sending wantlist: %s", err)
......
...@@ -24,9 +24,8 @@ const kNetworkDelay = 0 * time.Millisecond ...@@ -24,9 +24,8 @@ const kNetworkDelay = 0 * time.Millisecond
func TestClose(t *testing.T) { func TestClose(t *testing.T) {
// TODO // TODO
t.Skip("TODO Bitswap's Close implementation is a WIP") t.Skip("TODO Bitswap's Close implementation is a WIP")
vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
rout := mockrouting.NewServer() sesgen := NewSessionGenerator(vnet)
sesgen := NewSessionGenerator(vnet, rout)
defer sesgen.Close() defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator() bgen := blocksutil.NewBlockGenerator()
...@@ -39,9 +38,8 @@ func TestClose(t *testing.T) { ...@@ -39,9 +38,8 @@ func TestClose(t *testing.T) {
func TestGetBlockTimeout(t *testing.T) { func TestGetBlockTimeout(t *testing.T) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
rs := mockrouting.NewServer() g := NewSessionGenerator(net)
g := NewSessionGenerator(net, rs)
defer g.Close() defer g.Close()
self := g.Next() self := g.Next()
...@@ -55,11 +53,11 @@ func TestGetBlockTimeout(t *testing.T) { ...@@ -55,11 +53,11 @@ func TestGetBlockTimeout(t *testing.T) {
} }
} }
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
rs := mockrouting.NewServer() rs := mockrouting.NewServer()
g := NewSessionGenerator(net, rs) net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
g := NewSessionGenerator(net)
defer g.Close() defer g.Close()
block := blocks.NewBlock([]byte("block")) block := blocks.NewBlock([]byte("block"))
...@@ -81,10 +79,9 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { ...@@ -81,10 +79,9 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
rs := mockrouting.NewServer()
block := blocks.NewBlock([]byte("block")) block := blocks.NewBlock([]byte("block"))
g := NewSessionGenerator(net, rs) g := NewSessionGenerator(net)
defer g.Close() defer g.Close()
hasBlock := g.Next() hasBlock := g.Next()
...@@ -136,9 +133,8 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { ...@@ -136,9 +133,8 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
if testing.Short() { if testing.Short() {
t.SkipNow() t.SkipNow()
} }
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
rs := mockrouting.NewServer() sg := NewSessionGenerator(net)
sg := NewSessionGenerator(net, rs)
defer sg.Close() defer sg.Close()
bg := blocksutil.NewBlockGenerator() bg := blocksutil.NewBlockGenerator()
...@@ -152,10 +148,9 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { ...@@ -152,10 +148,9 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
var blkeys []u.Key var blkeys []u.Key
first := instances[0] first := instances[0]
for _, b := range blocks { for _, b := range blocks {
first.Blockstore().Put(b) first.Blockstore().Put(b) // TODO remove. don't need to do this. bitswap owns block
blkeys = append(blkeys, b.Key()) blkeys = append(blkeys, b.Key())
first.Exchange.HasBlock(context.Background(), b) first.Exchange.HasBlock(context.Background(), b)
rs.Client(peer.PeerInfo{ID: first.Peer}).Provide(context.Background(), b.Key())
} }
t.Log("Distribute!") t.Log("Distribute!")
...@@ -202,9 +197,8 @@ func TestSendToWantingPeer(t *testing.T) { ...@@ -202,9 +197,8 @@ func TestSendToWantingPeer(t *testing.T) {
t.SkipNow() t.SkipNow()
} }
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
rs := mockrouting.NewServer() sg := NewSessionGenerator(net)
sg := NewSessionGenerator(net, rs)
defer sg.Close() defer sg.Close()
bg := blocksutil.NewBlockGenerator() bg := blocksutil.NewBlockGenerator()
...@@ -248,9 +242,8 @@ func TestSendToWantingPeer(t *testing.T) { ...@@ -248,9 +242,8 @@ func TestSendToWantingPeer(t *testing.T) {
} }
func TestBasicBitswap(t *testing.T) { func TestBasicBitswap(t *testing.T) {
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
rs := mockrouting.NewServer() sg := NewSessionGenerator(net)
sg := NewSessionGenerator(net, rs)
bg := blocksutil.NewBlockGenerator() bg := blocksutil.NewBlockGenerator()
t.Log("Test a few nodes trying to get one file with a lot of blocks") t.Log("Test a few nodes trying to get one file with a lot of blocks")
......
...@@ -31,6 +31,8 @@ type BitSwapNetwork interface { ...@@ -31,6 +31,8 @@ type BitSwapNetwork interface {
// SetDelegate registers the Reciver to handle messages received from the // SetDelegate registers the Reciver to handle messages received from the
// network. // network.
SetDelegate(Receiver) SetDelegate(Receiver)
Routing
} }
// Implement Receiver to receive messages from the BitSwapNetwork // Implement Receiver to receive messages from the BitSwapNetwork
......
...@@ -13,9 +13,10 @@ var log = util.Logger("bitswap_network") ...@@ -13,9 +13,10 @@ var log = util.Logger("bitswap_network")
// NewFromIpfsNetwork returns a BitSwapNetwork supported by underlying IPFS // NewFromIpfsNetwork returns a BitSwapNetwork supported by underlying IPFS
// Dialer & Service // Dialer & Service
func NewFromIpfsNetwork(n inet.Network) BitSwapNetwork { func NewFromIpfsNetwork(n inet.Network, r Routing) BitSwapNetwork {
bitswapNetwork := impl{ bitswapNetwork := impl{
network: n, network: n,
routing: r,
} }
n.SetHandler(inet.ProtocolBitswap, bitswapNetwork.handleNewStream) n.SetHandler(inet.ProtocolBitswap, bitswapNetwork.handleNewStream)
return &bitswapNetwork return &bitswapNetwork
...@@ -25,6 +26,7 @@ func NewFromIpfsNetwork(n inet.Network) BitSwapNetwork { ...@@ -25,6 +26,7 @@ func NewFromIpfsNetwork(n inet.Network) BitSwapNetwork {
// NetMessage objects, into the bitswap network interface. // NetMessage objects, into the bitswap network interface.
type impl struct { type impl struct {
network inet.Network network inet.Network
routing Routing
// inbound messages from the network are forwarded to the receiver // inbound messages from the network are forwarded to the receiver
receiver Receiver receiver Receiver
...@@ -74,6 +76,16 @@ func (bsnet *impl) Peerstore() peer.Peerstore { ...@@ -74,6 +76,16 @@ func (bsnet *impl) Peerstore() peer.Peerstore {
return bsnet.Peerstore() return bsnet.Peerstore()
} }
// FindProvidersAsync returns a channel of providers for the given key
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.PeerInfo { // TODO change to return ID
return bsnet.routing.FindProvidersAsync(ctx, k, max)
}
// Provide provides the key to the network
func (bsnet *impl) Provide(ctx context.Context, k util.Key) error {
return bsnet.routing.Provide(ctx, k)
}
// handleNewStream receives a new stream from the network. // handleNewStream receives a new stream from the network.
func (bsnet *impl) handleNewStream(s inet.Stream) { func (bsnet *impl) handleNewStream(s inet.Stream) {
......
...@@ -5,10 +5,12 @@ import ( ...@@ -5,10 +5,12 @@ import (
"fmt" "fmt"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/routing/mock"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
"github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay" delay "github.com/jbenet/go-ipfs/util/delay"
) )
...@@ -33,16 +35,18 @@ type Network interface { ...@@ -33,16 +35,18 @@ type Network interface {
// network impl // network impl
func VirtualNetwork(d delay.D) Network { func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
return &network{ return &network{
clients: make(map[peer.ID]bsnet.Receiver), clients: make(map[peer.ID]bsnet.Receiver),
delay: d, delay: d,
routingserver: rs,
} }
} }
type network struct { type network struct {
clients map[peer.ID]bsnet.Receiver clients map[peer.ID]bsnet.Receiver
delay delay.D routingserver mockrouting.Server
delay delay.D
} }
func (n *network) Adapter(p peer.ID) bsnet.BitSwapNetwork { func (n *network) Adapter(p peer.ID) bsnet.BitSwapNetwork {
...@@ -50,6 +54,7 @@ func (n *network) Adapter(p peer.ID) bsnet.BitSwapNetwork { ...@@ -50,6 +54,7 @@ func (n *network) Adapter(p peer.ID) bsnet.BitSwapNetwork {
local: p, local: p,
network: n, network: n,
peerstore: peer.NewPeerstore(), peerstore: peer.NewPeerstore(),
routing: n.routingserver.Client(peer.PeerInfo{ID: p}),
} }
n.clients[p] = client n.clients[p] = client
return client return client
...@@ -151,6 +156,7 @@ type networkClient struct { ...@@ -151,6 +156,7 @@ type networkClient struct {
bsnet.Receiver bsnet.Receiver
network Network network Network
peerstore peer.Peerstore peerstore peer.Peerstore
routing bsnet.Routing
} }
func (nc *networkClient) SendMessage( func (nc *networkClient) SendMessage(
...@@ -167,6 +173,16 @@ func (nc *networkClient) SendRequest( ...@@ -167,6 +173,16 @@ func (nc *networkClient) SendRequest(
return nc.network.SendRequest(ctx, nc.local, to, message) return nc.network.SendRequest(ctx, nc.local, to, message)
} }
// FindProvidersAsync returns a channel of providers for the given key
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.PeerInfo { // TODO change to return ID
return nc.routing.FindProvidersAsync(ctx, k, max)
}
// Provide provides the key to the network
func (nc *networkClient) Provide(ctx context.Context, k util.Key) error {
return nc.routing.Provide(ctx, k)
}
func (nc *networkClient) DialPeer(ctx context.Context, p peer.ID) error { func (nc *networkClient) DialPeer(ctx context.Context, p peer.ID) error {
// no need to do anything because dialing isn't a thing in this test net. // no need to do anything because dialing isn't a thing in this test net.
if !nc.network.HasPeer(p) { if !nc.network.HasPeer(p) {
......
...@@ -11,10 +11,11 @@ import ( ...@@ -11,10 +11,11 @@ import (
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
delay "github.com/jbenet/go-ipfs/util/delay" delay "github.com/jbenet/go-ipfs/util/delay"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
) )
func TestSendRequestToCooperativePeer(t *testing.T) { func TestSendRequestToCooperativePeer(t *testing.T) {
net := VirtualNetwork(delay.Fixed(0)) net := VirtualNetwork(mockrouting.NewServer(),delay.Fixed(0))
idOfRecipient := peer.ID("recipient") idOfRecipient := peer.ID("recipient")
...@@ -65,7 +66,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { ...@@ -65,7 +66,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
} }
func TestSendMessageAsyncButWaitForResponse(t *testing.T) { func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
net := VirtualNetwork(delay.Fixed(0)) net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
idOfResponder := peer.ID("responder") idOfResponder := peer.ID("responder")
waiter := net.Adapter(peer.ID("waiter")) waiter := net.Adapter(peer.ID("waiter"))
responder := net.Adapter(idOfResponder) responder := net.Adapter(idOfResponder)
......
...@@ -10,18 +10,16 @@ import ( ...@@ -10,18 +10,16 @@ import (
exchange "github.com/jbenet/go-ipfs/exchange" exchange "github.com/jbenet/go-ipfs/exchange"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
datastore2 "github.com/jbenet/go-ipfs/util/datastore2" datastore2 "github.com/jbenet/go-ipfs/util/datastore2"
delay "github.com/jbenet/go-ipfs/util/delay" delay "github.com/jbenet/go-ipfs/util/delay"
) )
func NewSessionGenerator( func NewSessionGenerator(
net tn.Network, rs mockrouting.Server) SessionGenerator { net tn.Network) SessionGenerator {
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
return SessionGenerator{ return SessionGenerator{
ps: peer.NewPeerstore(), ps: peer.NewPeerstore(),
net: net, net: net,
rs: rs,
seq: 0, seq: 0,
ctx: ctx, // TODO take ctx as param to Next, Instances ctx: ctx, // TODO take ctx as param to Next, Instances
cancel: cancel, cancel: cancel,
...@@ -31,7 +29,6 @@ func NewSessionGenerator( ...@@ -31,7 +29,6 @@ func NewSessionGenerator(
type SessionGenerator struct { type SessionGenerator struct {
seq int seq int
net tn.Network net tn.Network
rs mockrouting.Server
ps peer.Peerstore ps peer.Peerstore
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
...@@ -44,7 +41,7 @@ func (g *SessionGenerator) Close() error { ...@@ -44,7 +41,7 @@ func (g *SessionGenerator) Close() error {
func (g *SessionGenerator) Next() Instance { func (g *SessionGenerator) Next() Instance {
g.seq++ g.seq++
return session(g.ctx, g.net, g.rs, g.ps, peer.ID(g.seq)) return session(g.ctx, g.net, g.ps, peer.ID(g.seq))
} }
func (g *SessionGenerator) Instances(n int) []Instance { func (g *SessionGenerator) Instances(n int) []Instance {
...@@ -77,10 +74,9 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { ...@@ -77,10 +74,9 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
// NB: It's easy make mistakes by providing the same peer ID to two different // NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's // sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea. // just a much better idea.
func session(ctx context.Context, net tn.Network, rs mockrouting.Server, ps peer.Peerstore, p peer.ID) Instance { func session(ctx context.Context, net tn.Network, ps peer.Peerstore, p peer.ID) Instance {
adapter := net.Adapter(p) adapter := net.Adapter(p)
htc := rs.Client(peer.PeerInfo{ID: p})
bsdelay := delay.Fixed(0) bsdelay := delay.Fixed(0)
const kWriteCacheElems = 100 const kWriteCacheElems = 100
...@@ -92,7 +88,7 @@ func session(ctx context.Context, net tn.Network, rs mockrouting.Server, ps peer ...@@ -92,7 +88,7 @@ func session(ctx context.Context, net tn.Network, rs mockrouting.Server, ps peer
const alwaysSendToPeer = true const alwaysSendToPeer = true
bs := New(ctx, p, adapter, htc, bstore, alwaysSendToPeer) bs := New(ctx, p, adapter, bstore, alwaysSendToPeer)
return Instance{ return Instance{
Peer: p, Peer: p,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment