Commit 7cb2f524 authored by Brian Tiger Chow's avatar Brian Tiger Chow

refac(exchange) bitswap -> exchange/bitswap

Move go-ipfs/bitswap package to go-ipfs/exchange/bitswap

* Delineates the difference between the generic exchange interface and
  implementations (eg. BitSwap protocol)

  Thus, the bitswap protocol can be refined without having to overthink
  how future exchanges will work. Aspects common to BitSwap and other
  exchanges can be extracted out to the exchange package in piecemeal.

  Future exchange implementations can be placed in sibling packages next
  to exchange/bitswap. (eg. exchange/multilateral)
parents
package bitswap
import (
"errors"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blockstore"
exchange "github.com/jbenet/go-ipfs/exchange"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
// TODO(brian): ensure messages are being received
// PartnerWantListMax is the bound for the number of keys we'll store per
// partner. These are usually taken from the top of the Partner's WantList
// advertisements. WantLists are sorted in terms of priority.
const PartnerWantListMax = 10
// bitswap instances implement the bitswap protocol.
type bitswap struct {
// peer is the identity of this (local) node.
peer *peer.Peer
// sender delivers messages on behalf of the session
sender bsnet.NetworkAdapter
// blockstore is the local database
// NB: ensure threadsafety
blockstore blockstore.Blockstore
// routing interface for communication
routing exchange.Directory
notifications notifications.PubSub
// strategy listens to network traffic and makes decisions about how to
// interact with partners.
// TODO(brian): save the strategy's state to the datastore
strategy strategy.Strategy
}
// NewSession initializes a bitswap session.
func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d ds.Datastore, directory exchange.Directory) exchange.Exchange {
// FIXME(brian): instantiate a concrete Strategist
receiver := bsnet.Forwarder{}
bs := &bitswap{
blockstore: blockstore.NewBlockstore(d),
notifications: notifications.New(),
strategy: strategy.New(),
peer: p,
routing: directory,
sender: bsnet.NewNetworkAdapter(s, &receiver),
}
receiver.Delegate(bs)
return bs
}
// GetBlock attempts to retrieve a particular block from peers, within timeout.
func (bs *bitswap) Block(k u.Key, timeout time.Duration) (
*blocks.Block, error) {
begin := time.Now()
tleft := timeout - time.Now().Sub(begin)
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
blockChannel := make(chan blocks.Block)
after := time.After(tleft)
// TODO: when the data is received, shut down this for loop ASAP
go func() {
for p := range provs_ch {
go func(pr *peer.Peer) {
blk, err := bs.getBlock(k, pr, tleft)
if err != nil {
return
}
select {
case blockChannel <- *blk:
default:
}
}(p)
}
}()
select {
case block := <-blockChannel:
close(blockChannel)
return &block, nil
case <-after:
return nil, u.ErrTimeout
}
}
func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) {
ctx, _ := context.WithTimeout(context.Background(), timeout)
blockChannel := bs.notifications.Subscribe(ctx, k)
message := bsmsg.New()
message.AppendWanted(k)
// FIXME(brian): register the accountant on the service wrapper to ensure
// that accounting is _always_ performed when SendMessage and
// ReceiveMessage are called
bs.sender.SendMessage(ctx, p, message)
bs.strategy.MessageSent(p, message)
block, ok := <-blockChannel
if !ok {
return nil, u.ErrTimeout
}
return &block, nil
}
func (bs *bitswap) sendToPeersThatWant(block blocks.Block) {
for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
go bs.send(p, block)
}
}
}
}
// HasBlock announces the existance of a block to bitswap, potentially sending
// it to peers (Partners) whose WantLists include it.
func (bs *bitswap) HasBlock(blk blocks.Block) error {
go bs.sendToPeersThatWant(blk)
return bs.routing.Provide(blk.Key())
}
// TODO(brian): get a return value
func (bs *bitswap) send(p *peer.Peer, b blocks.Block) {
message := bsmsg.New()
message.AppendBlock(b)
// FIXME(brian): pass ctx
bs.sender.SendMessage(context.Background(), p, message)
bs.strategy.MessageSent(p, message)
}
// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
bs.strategy.MessageReceived(sender, incoming)
if incoming.Blocks() != nil {
for _, block := range incoming.Blocks() {
go bs.blockstore.Put(block) // FIXME(brian): err ignored
go bs.notifications.Publish(block)
}
}
if incoming.Wantlist() != nil {
for _, key := range incoming.Wantlist() {
if bs.strategy.ShouldSendBlockToPeer(key, sender) {
block, errBlockNotFound := bs.blockstore.Get(key)
if errBlockNotFound != nil {
// TODO(brian): log/return the error
continue
}
go bs.send(sender, *block)
}
}
}
return nil, nil, errors.New("TODO implement")
}
func numBytes(b blocks.Block) int {
return len(b.Data)
}
# TODO(brian): add proto tasks
all: message.pb.go
message.pb.go: message.proto
protoc --gogo_out=. --proto_path=../../../../../:/usr/local/opt/protobuf/include:. $<
clean:
rm message.pb.go
package message
import (
"errors"
netmsg "github.com/jbenet/go-ipfs/net/message"
blocks "github.com/jbenet/go-ipfs/blocks"
nm "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
type BitSwapMessage interface {
Wantlist() []u.Key
Blocks() []blocks.Block
AppendWanted(k u.Key)
AppendBlock(b blocks.Block)
Exportable
}
type Exportable interface {
ToProto() *PBMessage
ToNet(p *peer.Peer) (nm.NetMessage, error)
}
// message wraps a proto message for convenience
type message struct {
pb PBMessage
}
func newMessageFromProto(pb PBMessage) *message {
return &message{pb: pb}
}
func New() *message {
return new(message)
}
// TODO(brian): convert these into keys
func (m *message) Wantlist() []u.Key {
wl := make([]u.Key, len(m.pb.Wantlist))
for _, str := range m.pb.Wantlist {
wl = append(wl, u.Key(str))
}
return wl
}
// TODO(brian): convert these into blocks
func (m *message) Blocks() []blocks.Block {
bs := make([]blocks.Block, len(m.pb.Blocks))
for _, data := range m.pb.Blocks {
b, err := blocks.NewBlock(data)
if err != nil {
continue
}
bs = append(bs, *b)
}
return bs
}
func (m *message) AppendWanted(k u.Key) {
m.pb.Wantlist = append(m.pb.Wantlist, string(k))
}
func (m *message) AppendBlock(b blocks.Block) {
m.pb.Blocks = append(m.pb.Blocks, b.Data)
}
func FromNet(nmsg netmsg.NetMessage) (BitSwapMessage, error) {
return nil, errors.New("TODO implement")
}
func (m *message) ToProto() *PBMessage {
cp := m.pb
return &cp
}
func (m *message) ToNet(p *peer.Peer) (nm.NetMessage, error) {
return nm.FromObject(p, m.ToProto())
}
// Code generated by protoc-gen-go.
// source: message.proto
// DO NOT EDIT!
/*
Package bitswap is a generated protocol buffer package.
It is generated from these files:
message.proto
It has these top-level messages:
PBMessage
*/
package message
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
type PBMessage struct {
Wantlist []string `protobuf:"bytes,1,rep,name=wantlist" json:"wantlist,omitempty"`
Blocks [][]byte `protobuf:"bytes,2,rep,name=blocks" json:"blocks,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *PBMessage) Reset() { *m = PBMessage{} }
func (m *PBMessage) String() string { return proto.CompactTextString(m) }
func (*PBMessage) ProtoMessage() {}
func (m *PBMessage) GetWantlist() []string {
if m != nil {
return m.Wantlist
}
return nil
}
func (m *PBMessage) GetBlocks() [][]byte {
if m != nil {
return m.Blocks
}
return nil
}
func init() {
}
package message;
message PBMessage {
repeated string wantlist = 1;
repeated bytes blocks = 2;
}
package message
import (
"bytes"
"testing"
u "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
func TestAppendWanted(t *testing.T) {
const str = "foo"
m := New()
m.AppendWanted(u.Key(str))
if !contains(m.ToProto().GetWantlist(), str) {
t.Fail()
}
}
func TestNewMessageFromProto(t *testing.T) {
const str = "a_key"
protoMessage := new(PBMessage)
protoMessage.Wantlist = []string{string(str)}
if !contains(protoMessage.Wantlist, str) {
t.Fail()
}
m := newMessageFromProto(*protoMessage)
if !contains(m.ToProto().GetWantlist(), str) {
t.Fail()
}
}
func TestAppendBlock(t *testing.T) {
strs := make([]string, 2)
strs = append(strs, "Celeritas")
strs = append(strs, "Incendia")
m := New()
for _, str := range strs {
block := testutil.NewBlockOrFail(t, str)
m.AppendBlock(block)
}
// assert strings are in proto message
for _, blockbytes := range m.ToProto().GetBlocks() {
s := bytes.NewBuffer(blockbytes).String()
if !contains(strs, s) {
t.Fail()
}
}
}
func TestCopyProtoByValue(t *testing.T) {
const str = "foo"
m := New()
protoBeforeAppend := m.ToProto()
m.AppendWanted(u.Key(str))
if contains(protoBeforeAppend.GetWantlist(), str) {
t.Fail()
}
}
func contains(s []string, x string) bool {
for _, a := range s {
if a == x {
return true
}
}
return false
}
package network
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
peer "github.com/jbenet/go-ipfs/peer"
)
// Forwarder receives messages and forwards them to the delegate.
//
// Forwarder breaks the circular dependency between the BitSwap Session and the
// Network Service.
type Forwarder struct {
delegate Receiver
}
func (r *Forwarder) ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
if r.delegate == nil {
return nil, nil, nil
}
return r.delegate.ReceiveMessage(ctx, sender, incoming)
}
func (r *Forwarder) Delegate(delegate Receiver) {
r.delegate = delegate
}
package network
import (
"testing"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
peer "github.com/jbenet/go-ipfs/peer"
)
func TestDoesntPanicIfDelegateNotPresent(t *testing.T) {
fwdr := Forwarder{}
fwdr.ReceiveMessage(context.Background(), &peer.Peer{}, bsmsg.New())
}
func TestForwardsMessageToDelegate(t *testing.T) {
fwdr := Forwarder{delegate: &EchoDelegate{}}
fwdr.ReceiveMessage(context.Background(), &peer.Peer{}, bsmsg.New())
}
type EchoDelegate struct{}
func (d *EchoDelegate) ReceiveMessage(ctx context.Context, p *peer.Peer,
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage, error) {
return p, incoming, nil
}
package network
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
netservice "github.com/jbenet/go-ipfs/net/service"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
)
// NetworkAdapter mediates the exchange's communication with the network.
type NetworkAdapter interface {
// SendMessage sends a BitSwap message to a peer.
SendMessage(
context.Context,
*peer.Peer,
bsmsg.BitSwapMessage) error
// SendRequest sends a BitSwap message to a peer and waits for a response.
SendRequest(
context.Context,
*peer.Peer,
bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error)
// SetDelegate registers the Reciver to handle messages received from the
// network.
SetDelegate(Receiver)
}
type Receiver interface {
ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
destination *peer.Peer, outgoing bsmsg.BitSwapMessage, err error)
}
// TODO(brian): move this to go-ipfs/net package
type NetworkService interface {
SendRequest(ctx context.Context, m netmsg.NetMessage) (netmsg.NetMessage, error)
SendMessage(ctx context.Context, m netmsg.NetMessage) error
SetHandler(netservice.Handler)
}
package network
import (
"errors"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
)
// NewSender wraps a network Service to perform translation between
// BitSwapMessage and NetMessage formats. This allows the BitSwap session to
// ignore these details.
func NewNetworkAdapter(s NetworkService, r Receiver) NetworkAdapter {
adapter := networkAdapter{
networkService: s,
receiver: r,
}
s.SetHandler(&adapter)
return &adapter
}
// networkAdapter implements NetworkAdapter
type networkAdapter struct {
networkService NetworkService
receiver Receiver
}
// HandleMessage marshals and unmarshals net messages, forwarding them to the
// BitSwapMessage receiver
func (adapter *networkAdapter) HandleMessage(
ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) {
if adapter.receiver == nil {
return nil, errors.New("No receiver. NetMessage dropped")
}
received, err := bsmsg.FromNet(incoming)
if err != nil {
return nil, err
}
p, bsmsg, err := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)
if err != nil {
return nil, err
}
// TODO(brian): put this in a helper function
if bsmsg == nil || p == nil {
return nil, nil
}
outgoing, err := bsmsg.ToNet(p)
if err != nil {
return nil, err
}
return outgoing, nil
}
func (adapter *networkAdapter) SendMessage(
ctx context.Context,
p *peer.Peer,
outgoing bsmsg.BitSwapMessage) error {
nmsg, err := outgoing.ToNet(p)
if err != nil {
return err
}
return adapter.networkService.SendMessage(ctx, nmsg)
}
func (adapter *networkAdapter) SendRequest(
ctx context.Context,
p *peer.Peer,
outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {
outgoingMsg, err := outgoing.ToNet(p)
if err != nil {
return nil, err
}
incomingMsg, err := adapter.networkService.SendRequest(ctx, outgoingMsg)
if err != nil {
return nil, err
}
return bsmsg.FromNet(incomingMsg)
}
func (adapter *networkAdapter) SetDelegate(r Receiver) {
adapter.receiver = r
}
package notifications
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/tuxychandru/pubsub"
blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
)
type PubSub interface {
Publish(block blocks.Block)
Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block
Shutdown()
}
func New() PubSub {
const bufferSize = 16
return &impl{*pubsub.New(bufferSize)}
}
type impl struct {
wrapped pubsub.PubSub
}
func (ps *impl) Publish(block blocks.Block) {
topic := string(block.Key())
ps.wrapped.Pub(block, topic)
}
// Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil
// if the |ctx| times out or is cancelled. Then channel is closed after the
// block given by |k| is sent.
func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block {
topic := string(k)
subChan := ps.wrapped.SubOnce(topic)
blockChannel := make(chan blocks.Block)
go func() {
defer close(blockChannel)
select {
case val := <-subChan:
block, ok := val.(blocks.Block)
if ok {
blockChannel <- block
}
case <-ctx.Done():
ps.wrapped.Unsub(subChan, topic)
}
}()
return blockChannel
}
func (ps *impl) Shutdown() {
ps.wrapped.Shutdown()
}
package notifications
import (
"bytes"
"testing"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
testutil "github.com/jbenet/go-ipfs/util/testutil"
blocks "github.com/jbenet/go-ipfs/blocks"
)
func TestPublishSubscribe(t *testing.T) {
blockSent := testutil.NewBlockOrFail(t, "Greetings from The Interval")
n := New()
defer n.Shutdown()
ch := n.Subscribe(context.Background(), blockSent.Key())
n.Publish(blockSent)
blockRecvd, ok := <-ch
if !ok {
t.Fail()
}
assertBlocksEqual(t, blockRecvd, blockSent)
}
func TestCarryOnWhenDeadlineExpires(t *testing.T) {
impossibleDeadline := time.Nanosecond
fastExpiringCtx, _ := context.WithTimeout(context.Background(), impossibleDeadline)
n := New()
defer n.Shutdown()
block := testutil.NewBlockOrFail(t, "A Missed Connection")
blockChannel := n.Subscribe(fastExpiringCtx, block.Key())
assertBlockChannelNil(t, blockChannel)
}
func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) {
_, ok := <-blockChannel
if ok {
t.Fail()
}
}
func assertBlocksEqual(t *testing.T, a, b blocks.Block) {
if !bytes.Equal(a.Data, b.Data) {
t.Fail()
}
if a.Key() != b.Key() {
t.Fail()
}
}
package bitswap
import (
"errors"
"time"
blocks "github.com/jbenet/go-ipfs/blocks"
exchange "github.com/jbenet/go-ipfs/exchange"
u "github.com/jbenet/go-ipfs/util"
)
func NewOfflineExchange() exchange.Exchange {
return &offlineExchange{}
}
// offlineExchange implements the Exchange interface but doesn't return blocks.
// For use in offline mode.
type offlineExchange struct {
}
// Block returns nil to signal that a block could not be retrieved for the
// given key.
// NB: This function may return before the timeout expires.
func (_ *offlineExchange) Block(k u.Key, timeout time.Duration) (*blocks.Block, error) {
return nil, errors.New("Block unavailable. Operating in offline mode")
}
// HasBlock always returns nil.
func (_ *offlineExchange) HasBlock(blocks.Block) error {
return nil
}
package bitswap
import (
"testing"
"time"
u "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
func TestBlockReturnsErr(t *testing.T) {
off := NewOfflineExchange()
_, err := off.Block(u.Key("foo"), time.Second)
if err != nil {
return // as desired
}
t.Fail()
}
func TestHasBlockReturnsNil(t *testing.T) {
off := NewOfflineExchange()
block := testutil.NewBlockOrFail(t, "data")
err := off.HasBlock(block)
if err != nil {
t.Fatal("")
}
}
package strategy
import (
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
type Strategy interface {
// Returns a slice of Peers that
Peers() []*peer.Peer
// WantList returns the WantList for the given Peer
BlockIsWantedByPeer(u.Key, *peer.Peer) bool
// ShouldSendTo(Peer) decides whether to send data to this Peer
ShouldSendBlockToPeer(u.Key, *peer.Peer) bool
// Seed initializes the decider to a deterministic state
Seed(int64)
// MessageReceived records receipt of message for accounting purposes
MessageReceived(*peer.Peer, bsmsg.BitSwapMessage) error
// MessageSent records sending of message for accounting purposes
MessageSent(*peer.Peer, bsmsg.BitSwapMessage) error
}
type WantList interface {
// Peer returns the owner of the WantList
Peer() *peer.Peer
// Intersection returns the keys common to both WantLists
Intersection(WantList) WantList
KeySet
}
// TODO(brian): potentially move this somewhere more generic. For now, it's
// useful in BitSwap operations.
type KeySet interface {
Contains(u.Key) bool
Keys() []u.Key
}
package strategy
import (
"sync"
"time"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
// keySet is just a convenient alias for maps of keys, where we only care
// access/lookups.
type keySet map[u.Key]struct{}
func newLedger(p *peer.Peer, strategy strategyFunc) *ledger {
return &ledger{
wantList: keySet{},
Strategy: strategy,
Partner: p,
}
}
// ledger stores the data exchange relationship between two peers.
type ledger struct {
lock sync.RWMutex
// Partner is the remote Peer.
Partner *peer.Peer
// Accounting tracks bytes sent and recieved.
Accounting debtRatio
// firstExchnage is the time of the first data exchange.
firstExchange time.Time
// lastExchange is the time of the last data exchange.
lastExchange time.Time
// exchangeCount is the number of exchanges with this peer
exchangeCount uint64
// wantList is a (bounded, small) set of keys that Partner desires.
wantList keySet
Strategy strategyFunc
}
func (l *ledger) ShouldSend() bool {
l.lock.Lock()
defer l.lock.Unlock()
return l.Strategy(l)
}
func (l *ledger) SentBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()
l.exchangeCount++
l.lastExchange = time.Now()
l.Accounting.BytesSent += uint64(n)
}
func (l *ledger) ReceivedBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()
l.exchangeCount++
l.lastExchange = time.Now()
l.Accounting.BytesRecv += uint64(n)
}
// TODO: this needs to be different. We need timeouts.
func (l *ledger) Wants(k u.Key) {
l.lock.Lock()
defer l.lock.Unlock()
l.wantList[k] = struct{}{}
}
func (l *ledger) WantListContains(k u.Key) bool {
l.lock.RLock()
defer l.lock.RUnlock()
_, ok := l.wantList[k]
return ok
}
func (l *ledger) ExchangeCount() uint64 {
l.lock.RLock()
defer l.lock.RUnlock()
return l.exchangeCount
}
package strategy
import (
"sync"
"testing"
)
func TestRaceConditions(t *testing.T) {
const numberOfExpectedExchanges = 10000
l := new(ledger)
var wg sync.WaitGroup
for i := 0; i < numberOfExpectedExchanges; i++ {
wg.Add(1)
go func() {
defer wg.Done()
l.ReceivedBytes(1)
}()
}
wg.Wait()
if l.ExchangeCount() != numberOfExpectedExchanges {
t.Fail()
}
}
package strategy
import (
"math"
"math/rand"
)
type strategyFunc func(*ledger) bool
func standardStrategy(l *ledger) bool {
return rand.Float64() <= probabilitySend(l.Accounting.Value())
}
func yesManStrategy(l *ledger) bool {
return true
}
func probabilitySend(ratio float64) float64 {
x := 1 + math.Exp(6-3*ratio)
y := 1 / x
return 1 - y
}
type debtRatio struct {
BytesSent uint64
BytesRecv uint64
}
func (dr *debtRatio) Value() float64 {
return float64(dr.BytesSent) / float64(dr.BytesRecv+1)
}
package strategy
import (
"testing"
)
func TestProbabilitySendDecreasesAsRatioIncreases(t *testing.T) {
grateful := debtRatio{BytesSent: 0, BytesRecv: 10000}
pWhenGrateful := probabilitySend(grateful.Value())
abused := debtRatio{BytesSent: 10000, BytesRecv: 0}
pWhenAbused := probabilitySend(abused.Value())
if pWhenGrateful < pWhenAbused {
t.Fail()
}
}
package strategy
import (
"errors"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
"github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
// TODO declare thread-safe datastore
func New() Strategy {
return &strategist{
ledgerMap: ledgerMap{},
strategyFunc: yesManStrategy,
}
}
type strategist struct {
ledgerMap
strategyFunc
}
// LedgerMap lists Ledgers by their Partner key.
type ledgerMap map[peerKey]*ledger
// FIXME share this externally
type peerKey u.Key
// Peers returns a list of peers
func (s *strategist) Peers() []*peer.Peer {
response := make([]*peer.Peer, 0)
for _, ledger := range s.ledgerMap {
response = append(response, ledger.Partner)
}
return response
}
func (s *strategist) BlockIsWantedByPeer(k u.Key, p *peer.Peer) bool {
ledger := s.ledger(p)
return ledger.WantListContains(k)
}
func (s *strategist) ShouldSendBlockToPeer(k u.Key, p *peer.Peer) bool {
ledger := s.ledger(p)
return ledger.ShouldSend()
}
func (s *strategist) Seed(int64) {
// TODO
}
func (s *strategist) MessageReceived(p *peer.Peer, m bsmsg.BitSwapMessage) error {
l := s.ledger(p)
for _, key := range m.Wantlist() {
l.Wants(key)
}
for _, block := range m.Blocks() {
// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
l.ReceivedBytes(len(block.Data))
}
return errors.New("TODO")
}
// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically
func (s *strategist) MessageSent(p *peer.Peer, m bsmsg.BitSwapMessage) error {
l := s.ledger(p)
for _, block := range m.Blocks() {
l.SentBytes(len(block.Data))
}
return nil
}
// ledger lazily instantiates a ledger
func (s *strategist) ledger(p *peer.Peer) *ledger {
l, ok := s.ledgerMap[peerKey(p.Key())]
if !ok {
l = newLedger(p, s.strategyFunc)
s.ledgerMap[peerKey(p.Key())] = l
}
return l
}
package strategy
import (
"testing"
message "github.com/jbenet/go-ipfs/bitswap/message"
"github.com/jbenet/go-ipfs/peer"
"github.com/jbenet/go-ipfs/util/testutil"
)
type peerAndStrategist struct {
*peer.Peer
Strategist
}
func newPeerAndStrategist(idStr string) peerAndStrategist {
return peerAndStrategist{
Peer: &peer.Peer{ID: peer.ID(idStr)},
Strategist: New(),
}
}
func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) {
beggar := newPeerAndStrategist("can't be chooser")
chooser := newPeerAndStrategist("chooses JIF")
block := testutil.NewBlockOrFail(t, "data wanted by beggar")
messageFromBeggarToChooser := message.New()
messageFromBeggarToChooser.AppendWanted(block.Key())
chooser.MessageReceived(beggar.Peer, messageFromBeggarToChooser)
// for this test, doesn't matter if you record that beggar sent
if !chooser.IsWantedByPeer(block.Key(), beggar.Peer) {
t.Fatal("chooser failed to record that beggar wants block")
}
}
func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
sanfrancisco := newPeerAndStrategist("sf")
seattle := newPeerAndStrategist("sea")
m := message.New()
sanfrancisco.MessageSent(seattle.Peer, m)
seattle.MessageReceived(sanfrancisco.Peer, m)
if seattle.Peer.Key() == sanfrancisco.Peer.Key() {
t.Fatal("Sanity Check: Peers have same Key!")
}
if !peerIsPartner(seattle.Peer, sanfrancisco.Strategist) {
t.Fatal("Peer wasn't added as a Partner")
}
if !peerIsPartner(sanfrancisco.Peer, seattle.Strategist) {
t.Fatal("Peer wasn't added as a Partner")
}
}
func peerIsPartner(p *peer.Peer, s Strategist) bool {
for _, partner := range s.Peers() {
if partner.Key() == p.Key() {
return true
}
}
return false
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment