Commit 42f61ec0 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

peer change: peer.Peer -> peer.ID

this is a major refactor of the entire codebase
it changes the monolithic peer.Peer into using
a peer.ID and a peer.Peerstore.

Other changes:
- removed handshake3.
-	testutil vastly simplified peer
-	secio bugfix + debugging logs
-	testutil: RandKeyPair
-	backpressure bugfix: w.o.w.
-	peer: added hex enc/dec
-	peer: added a PeerInfo struct
  PeerInfo is a small struct used to pass around a peer with
 	a set of addresses and keys. This is not meant to be a
 	complete view of the system, but rather to model updates to
 	the peerstore. It is used by things like the routing system.
-	updated peer/queue + peerset
-	latency metrics
-	testutil: use crand for PeerID gen
 	RandPeerID generates random "valid" peer IDs. it does not
 	NEED to generate keys because it is as if we lost the key
 	right away. fine to read some randomness and hash it. to
 	generate proper keys and an ID, use:
 	  sk, pk, _ := testutil.RandKeyPair()
 	  id, _ := peer.IDFromPublicKey(pk)
 	Also added RandPeerIDFatal helper
- removed old spipe
- updated seccat
- core: cleanup initIdentity
- removed old getFromPeerList
parent fad1c7da
......@@ -8,6 +8,7 @@ import (
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
exchange "github.com/jbenet/go-ipfs/exchange"
......@@ -43,7 +44,7 @@ var (
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, routing bsnet.Routing,
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, routing bsnet.Routing,
bstore blockstore.Blockstore, nice bool) exchange.Interface {
ctx, cancelFunc := context.WithCancel(parent)
......@@ -165,7 +166,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return bs.routing.Provide(ctx, blk.Key())
}
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.PeerInfo) error {
if peers == nil {
panic("Cant send wantlist to nil peerchan")
}
......@@ -175,9 +176,9 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
}
wg := sync.WaitGroup{}
for peerToQuery := range peers {
log.Event(ctx, "PeerToQuery", peerToQuery)
log.Event(ctx, "PeerToQuery", peerToQuery.ID)
wg.Add(1)
go func(p peer.Peer) {
go func(p peer.ID) {
defer wg.Done()
log.Event(ctx, "DialPeer", p)
......@@ -196,7 +197,7 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
// communication fails. May require slightly different API to
// get better guarantees. May need shared sequence numbers.
bs.engine.MessageSent(p, message)
}(peerToQuery)
}(peerToQuery.ID)
}
wg.Wait()
return nil
......@@ -224,8 +225,8 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantli
providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers {
if ps.TryAdd(prov) { //Do once per peer
bs.send(ctx, prov, message)
if ps.TryAdd(prov.ID) { //Do once per peer
bs.send(ctx, prov.ID, message)
}
}
}(e.Key)
......@@ -287,19 +288,19 @@ func (bs *bitswap) clientWorker(parent context.Context) {
}
// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage) {
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage) {
log.Debugf("ReceiveMessage from %s", p)
if p == nil {
if p == "" {
log.Error("Received message from nil peer!")
// TODO propagate the error upward
return nil, nil
return "", nil
}
if incoming == nil {
log.Error("Got nil bitswap message!")
// TODO propagate the error upward
return nil, nil
return "", nil
}
// This call records changes to wantlists, blocks received,
......@@ -321,7 +322,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
bs.cancelBlocks(ctx, keys)
// TODO: consider changing this function to not return anything
return nil, nil
return "", nil
}
func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
......@@ -349,7 +350,7 @@ func (bs *bitswap) ReceiveError(err error) {
// send strives to ensure that accounting is always performed when a message is
// sent
func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) error {
func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
if err := bs.sender.SendMessage(ctx, p, m); err != nil {
return err
}
......
......@@ -7,13 +7,14 @@ import (
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
blocks "github.com/jbenet/go-ipfs/blocks"
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
peer "github.com/jbenet/go-ipfs/peer"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
u "github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
// FIXME the tests are really sensitive to the network delay. fix them to work
......@@ -62,7 +63,8 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
defer g.Close()
block := blocks.NewBlock([]byte("block"))
rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network
pinfo := peer.PeerInfo{ID: peer.ID("testing")}
rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
solo := g.Next()
defer solo.Exchange.Close()
......@@ -153,7 +155,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
first.Blockstore().Put(b)
blkeys = append(blkeys, b.Key())
first.Exchange.HasBlock(context.Background(), b)
rs.Client(first.Peer).Provide(context.Background(), b.Key())
rs.Client(peer.PeerInfo{ID: first.Peer}).Provide(context.Background(), b.Key())
}
t.Log("Distribute!")
......
......@@ -50,7 +50,7 @@ const (
// Envelope contains a message for a Peer
type Envelope struct {
// Peer is the intended recipient
Peer peer.Peer
Peer peer.ID
// Message is the payload
Message bsmsg.BitSwapMessage
}
......@@ -75,12 +75,12 @@ type Engine struct {
lock sync.RWMutex // protects the fields immediatly below
// ledgerMap lists Ledgers by their Partner key.
ledgerMap map[u.Key]*ledger
ledgerMap map[peer.ID]*ledger
}
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
e := &Engine{
ledgerMap: make(map[u.Key]*ledger),
ledgerMap: make(map[peer.ID]*ledger),
bs: bs,
peerRequestQueue: newTaskQueue(),
outbox: make(chan Envelope, sizeOutboxChan),
......@@ -126,11 +126,11 @@ func (e *Engine) Outbox() <-chan Envelope {
}
// Returns a slice of Peers with whom the local node has active sessions
func (e *Engine) Peers() []peer.Peer {
func (e *Engine) Peers() []peer.ID {
e.lock.RLock()
defer e.lock.RUnlock()
response := make([]peer.Peer, 0)
response := make([]peer.ID, 0)
for _, ledger := range e.ledgerMap {
response = append(response, ledger.Partner)
}
......@@ -139,7 +139,7 @@ func (e *Engine) Peers() []peer.Peer {
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
newWorkExists := false
defer func() {
if newWorkExists {
......@@ -189,7 +189,7 @@ func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically
func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
e.lock.Lock()
defer e.lock.Unlock()
......@@ -203,22 +203,22 @@ func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
return nil
}
func (e *Engine) numBytesSentTo(p peer.Peer) uint64 {
func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
// NB not threadsafe
return e.findOrCreate(p).Accounting.BytesSent
}
func (e *Engine) numBytesReceivedFrom(p peer.Peer) uint64 {
func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
// NB not threadsafe
return e.findOrCreate(p).Accounting.BytesRecv
}
// ledger lazily instantiates a ledger
func (e *Engine) findOrCreate(p peer.Peer) *ledger {
l, ok := e.ledgerMap[p.Key()]
func (e *Engine) findOrCreate(p peer.ID) *ledger {
l, ok := e.ledgerMap[p]
if !ok {
l = newLedger(p)
e.ledgerMap[p.Key()] = l
e.ledgerMap[p] = l
}
return l
}
......@@ -7,21 +7,21 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
message "github.com/jbenet/go-ipfs/exchange/bitswap/message"
peer "github.com/jbenet/go-ipfs/peer"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
type peerAndEngine struct {
peer.Peer
Peer peer.ID
Engine *Engine
}
func newPeerAndLedgermanager(idStr string) peerAndEngine {
return peerAndEngine{
Peer: testutil.NewPeerWithIDString(idStr),
Peer: peer.ID(idStr),
//Strategy: New(true),
Engine: NewEngine(context.TODO(),
blockstore.NewBlockstore(sync.MutexWrap(ds.NewMapDatastore()))),
......@@ -70,7 +70,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
sanfrancisco.Engine.MessageSent(seattle.Peer, m)
seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
if seattle.Peer.Key() == sanfrancisco.Peer.Key() {
if seattle.Peer == sanfrancisco.Peer {
t.Fatal("Sanity Check: Peers have same Key!")
}
......@@ -83,9 +83,9 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
}
}
func peerIsPartner(p peer.Peer, e *Engine) bool {
func peerIsPartner(p peer.ID, e *Engine) bool {
for _, partner := range e.Peers() {
if partner.Key() == p.Key() {
if partner == p {
return true
}
}
......
......@@ -12,7 +12,7 @@ import (
// access/lookups.
type keySet map[u.Key]struct{}
func newLedger(p peer.Peer) *ledger {
func newLedger(p peer.ID) *ledger {
return &ledger{
wantList: wl.New(),
Partner: p,
......@@ -24,7 +24,7 @@ func newLedger(p peer.Peer) *ledger {
// NOT threadsafe
type ledger struct {
// Partner is the remote Peer.
Partner peer.Peer
Partner peer.ID
// Accounting tracks bytes sent and recieved.
Accounting debtRatio
......
......@@ -26,12 +26,12 @@ func newTaskQueue() *taskQueue {
type task struct {
Entry wantlist.Entry
Target peer.Peer
Target peer.ID
Trash bool
}
// Push currently adds a new task to the end of the list
func (tl *taskQueue) Push(entry wantlist.Entry, to peer.Peer) {
func (tl *taskQueue) Push(entry wantlist.Entry, to peer.ID) {
tl.lock.Lock()
defer tl.lock.Unlock()
if task, ok := tl.taskmap[taskKey(to, entry.Key)]; ok {
......@@ -69,7 +69,7 @@ func (tl *taskQueue) Pop() *task {
}
// Remove lazily removes a task from the queue
func (tl *taskQueue) Remove(k u.Key, p peer.Peer) {
func (tl *taskQueue) Remove(k u.Key, p peer.ID) {
tl.lock.Lock()
t, ok := tl.taskmap[taskKey(p, k)]
if ok {
......@@ -79,6 +79,6 @@ func (tl *taskQueue) Remove(k u.Key, p peer.Peer) {
}
// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.Peer, k u.Key) string {
return string(p.Key() + k)
func taskKey(p peer.ID, k u.Key) string {
return string(p) + string(k)
}
......@@ -12,18 +12,18 @@ import (
type BitSwapNetwork interface {
// DialPeer ensures there is a connection to peer.
DialPeer(context.Context, peer.Peer) error
DialPeer(context.Context, peer.ID) error
// SendMessage sends a BitSwap message to a peer.
SendMessage(
context.Context,
peer.Peer,
peer.ID,
bsmsg.BitSwapMessage) error
// SendRequest sends a BitSwap message to a peer and waits for a response.
SendRequest(
context.Context,
peer.Peer,
peer.ID,
bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error)
// SetDelegate registers the Reciver to handle messages received from the
......@@ -34,15 +34,15 @@ type BitSwapNetwork interface {
// Implement Receiver to receive messages from the BitSwapNetwork
type Receiver interface {
ReceiveMessage(
ctx context.Context, sender peer.Peer, incoming bsmsg.BitSwapMessage) (
destination peer.Peer, outgoing bsmsg.BitSwapMessage)
ctx context.Context, sender peer.ID, incoming bsmsg.BitSwapMessage) (
destination peer.ID, outgoing bsmsg.BitSwapMessage)
ReceiveError(error)
}
type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key
FindProvidersAsync(context.Context, u.Key, int) <-chan peer.Peer
FindProvidersAsync(context.Context, u.Key, int) <-chan peer.PeerInfo
// Provide provides the key to the network
Provide(context.Context, u.Key) error
......
......@@ -53,13 +53,13 @@ func (bsnet *impl) handleNewStream(s inet.Stream) {
}
func (bsnet *impl) DialPeer(ctx context.Context, p peer.Peer) error {
func (bsnet *impl) DialPeer(ctx context.Context, p peer.ID) error {
return bsnet.network.DialPeer(ctx, p)
}
func (bsnet *impl) SendMessage(
ctx context.Context,
p peer.Peer,
p peer.ID,
outgoing bsmsg.BitSwapMessage) error {
s, err := bsnet.network.NewStream(inet.ProtocolBitswap, p)
......@@ -73,7 +73,7 @@ func (bsnet *impl) SendMessage(
func (bsnet *impl) SendRequest(
ctx context.Context,
p peer.Peer,
p peer.ID,
outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {
s, err := bsnet.network.NewStream(inet.ProtocolBitswap, p)
......
package bitswap
import (
"bytes"
"errors"
"fmt"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
peer "github.com/jbenet/go-ipfs/peer"
"github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay"
)
type Network interface {
Adapter(peer.Peer) bsnet.BitSwapNetwork
Adapter(peer.ID) bsnet.BitSwapNetwork
HasPeer(peer.Peer) bool
HasPeer(peer.ID) bool
SendMessage(
ctx context.Context,
from peer.Peer,
to peer.Peer,
from peer.ID,
to peer.ID,
message bsmsg.BitSwapMessage) error
SendRequest(
ctx context.Context,
from peer.Peer,
to peer.Peer,
from peer.ID,
to peer.ID,
message bsmsg.BitSwapMessage) (
incoming bsmsg.BitSwapMessage, err error)
}
......@@ -36,27 +35,27 @@ type Network interface {
func VirtualNetwork(d delay.D) Network {
return &network{
clients: make(map[util.Key]bsnet.Receiver),
clients: make(map[peer.ID]bsnet.Receiver),
delay: d,
}
}
type network struct {
clients map[util.Key]bsnet.Receiver
clients map[peer.ID]bsnet.Receiver
delay delay.D
}
func (n *network) Adapter(p peer.Peer) bsnet.BitSwapNetwork {
func (n *network) Adapter(p peer.ID) bsnet.BitSwapNetwork {
client := &networkClient{
local: p,
network: n,
}
n.clients[p.Key()] = client
n.clients[p] = client
return client
}
func (n *network) HasPeer(p peer.Peer) bool {
_, found := n.clients[p.Key()]
func (n *network) HasPeer(p peer.ID) bool {
_, found := n.clients[p]
return found
}
......@@ -64,11 +63,11 @@ func (n *network) HasPeer(p peer.Peer) bool {
// TODO what does the network layer do with errors received from services?
func (n *network) SendMessage(
ctx context.Context,
from peer.Peer,
to peer.Peer,
from peer.ID,
to peer.ID,
message bsmsg.BitSwapMessage) error {
receiver, ok := n.clients[to.Key()]
receiver, ok := n.clients[to]
if !ok {
return errors.New("Cannot locate peer on network")
}
......@@ -82,8 +81,8 @@ func (n *network) SendMessage(
}
func (n *network) deliver(
r bsnet.Receiver, from peer.Peer, message bsmsg.BitSwapMessage) error {
if message == nil || from == nil {
r bsnet.Receiver, from peer.ID, message bsmsg.BitSwapMessage) error {
if message == nil || from == "" {
return errors.New("Invalid input")
}
......@@ -91,15 +90,15 @@ func (n *network) deliver(
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
return errors.New("Malformed client request")
}
if nextPeer == nil && nextMsg == nil { // no response to send
if nextPeer == "" && nextMsg == nil { // no response to send
return nil
}
nextReceiver, ok := n.clients[nextPeer.Key()]
nextReceiver, ok := n.clients[nextPeer]
if !ok {
return errors.New("Cannot locate peer on network")
}
......@@ -110,32 +109,32 @@ func (n *network) deliver(
// TODO
func (n *network) SendRequest(
ctx context.Context,
from peer.Peer,
to peer.Peer,
from peer.ID,
to peer.ID,
message bsmsg.BitSwapMessage) (
incoming bsmsg.BitSwapMessage, err error) {
r, ok := n.clients[to.Key()]
r, ok := n.clients[to]
if !ok {
return nil, errors.New("Cannot locate peer on network")
}
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
// TODO dedupe code
if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
r.ReceiveError(errors.New("Malformed client request"))
return nil, nil
}
// TODO dedupe code
if nextPeer == nil && nextMsg == nil {
if nextPeer == "" && nextMsg == nil {
return nil, nil
}
// TODO test when receiver doesn't immediately respond to the initiator of the request
if !bytes.Equal(nextPeer.ID(), from.ID()) {
if nextPeer != from {
go func() {
nextReceiver, ok := n.clients[nextPeer.Key()]
nextReceiver, ok := n.clients[nextPeer]
if !ok {
// TODO log the error?
}
......@@ -147,26 +146,26 @@ func (n *network) SendRequest(
}
type networkClient struct {
local peer.Peer
local peer.ID
bsnet.Receiver
network Network
}
func (nc *networkClient) SendMessage(
ctx context.Context,
to peer.Peer,
to peer.ID,
message bsmsg.BitSwapMessage) error {
return nc.network.SendMessage(ctx, nc.local, to, message)
}
func (nc *networkClient) SendRequest(
ctx context.Context,
to peer.Peer,
to peer.ID,
message bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) {
return nc.network.SendRequest(ctx, nc.local, to, message)
}
func (nc *networkClient) DialPeer(ctx context.Context, p peer.Peer) error {
func (nc *networkClient) DialPeer(ctx context.Context, p peer.ID) error {
// no need to do anything because dialing isn't a thing in this test net.
if !nc.network.HasPeer(p) {
return fmt.Errorf("Peer not in network: %s", p)
......
......@@ -5,30 +5,30 @@ import (
"testing"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
blocks "github.com/jbenet/go-ipfs/blocks"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
peer "github.com/jbenet/go-ipfs/peer"
delay "github.com/jbenet/go-ipfs/util/delay"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
func TestSendRequestToCooperativePeer(t *testing.T) {
net := VirtualNetwork(delay.Fixed(0))
idOfRecipient := []byte("recipient")
idOfRecipient := peer.ID("recipient")
t.Log("Get two network adapters")
initiator := net.Adapter(testutil.NewPeerWithIDString("initiator"))
recipient := net.Adapter(testutil.NewPeerWithID(idOfRecipient))
initiator := net.Adapter(peer.ID("initiator"))
recipient := net.Adapter(idOfRecipient)
expectedStr := "response from recipient"
recipient.SetDelegate(lambda(func(
ctx context.Context,
from peer.Peer,
from peer.ID,
incoming bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage) {
peer.ID, bsmsg.BitSwapMessage) {
t.Log("Recipient received a message from the network")
......@@ -45,13 +45,17 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
message := bsmsg.New()
message.AddBlock(blocks.NewBlock([]byte("data")))
response, err := initiator.SendRequest(
context.Background(), testutil.NewPeerWithID(idOfRecipient), message)
context.Background(), idOfRecipient, message)
if err != nil {
t.Fatal(err)
}
t.Log("Check the contents of the response from recipient")
if response == nil {
t.Fatal("Should have received a response")
}
for _, blockFromRecipient := range response.Blocks() {
if string(blockFromRecipient.Data) == expectedStr {
return
......@@ -62,9 +66,9 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
net := VirtualNetwork(delay.Fixed(0))
idOfResponder := []byte("responder")
waiter := net.Adapter(testutil.NewPeerWithIDString("waiter"))
responder := net.Adapter(testutil.NewPeerWithID(idOfResponder))
idOfResponder := peer.ID("responder")
waiter := net.Adapter(peer.ID("waiter"))
responder := net.Adapter(idOfResponder)
var wg sync.WaitGroup
......@@ -74,9 +78,9 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
responder.SetDelegate(lambda(func(
ctx context.Context,
fromWaiter peer.Peer,
fromWaiter peer.ID,
msgFromWaiter bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage) {
peer.ID, bsmsg.BitSwapMessage) {
msgToWaiter := bsmsg.New()
msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
......@@ -86,9 +90,9 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
waiter.SetDelegate(lambda(func(
ctx context.Context,
fromResponder peer.Peer,
fromResponder peer.ID,
msgFromResponder bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage) {
peer.ID, bsmsg.BitSwapMessage) {
// TODO assert that this came from the correct peer and that the message contents are as expected
ok := false
......@@ -103,13 +107,13 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
t.Fatal("Message not received from the responder")
}
return nil, nil
return "", nil
}))
messageSentAsync := bsmsg.New()
messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
errSending := waiter.SendMessage(
context.Background(), testutil.NewPeerWithID(idOfResponder), messageSentAsync)
context.Background(), idOfResponder, messageSentAsync)
if errSending != nil {
t.Fatal(errSending)
}
......@@ -117,8 +121,8 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
wg.Wait() // until waiter delegate function is executed
}
type receiverFunc func(ctx context.Context, p peer.Peer,
incoming bsmsg.BitSwapMessage) (peer.Peer, bsmsg.BitSwapMessage)
type receiverFunc func(ctx context.Context, p peer.ID,
incoming bsmsg.BitSwapMessage) (peer.ID, bsmsg.BitSwapMessage)
// lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver {
......@@ -128,13 +132,13 @@ func lambda(f receiverFunc) bsnet.Receiver {
}
type lambdaImpl struct {
f func(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage)
f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage)
}
func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
p peer.Peer, incoming bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage) {
p peer.ID, incoming bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage) {
return lam.f(ctx, p, incoming)
}
......
......@@ -44,7 +44,7 @@ func (g *SessionGenerator) Close() error {
func (g *SessionGenerator) Next() Instance {
g.seq++
return session(g.ctx, g.net, g.rs, g.ps, []byte(string(g.seq)))
return session(g.ctx, g.net, g.rs, g.ps, peer.ID(g.seq))
}
func (g *SessionGenerator) Instances(n int) []Instance {
......@@ -57,7 +57,7 @@ func (g *SessionGenerator) Instances(n int) []Instance {
}
type Instance struct {
Peer peer.Peer
Peer peer.ID
Exchange exchange.Interface
blockstore blockstore.Blockstore
......@@ -77,11 +77,10 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
func session(ctx context.Context, net tn.Network, rs mockrouting.Server, ps peer.Peerstore, id peer.ID) Instance {
p := ps.WithID(id)
func session(ctx context.Context, net tn.Network, rs mockrouting.Server, ps peer.Peerstore, p peer.ID) Instance {
adapter := net.Adapter(p)
htc := rs.Client(p)
htc := rs.Client(peer.PeerInfo{ID: p})
bsdelay := delay.Fixed(0)
const kWriteCacheElems = 100
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment