Commit be8e0867 authored by Brian Tiger Chow's avatar Brian Tiger Chow

fix(bitswap) implement, test concrete strategist

parent 043c09e1
......@@ -55,7 +55,7 @@ func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d
bs := &bitswap{
blockstore: blockstore.NewBlockstore(d),
notifications: notifications.New(),
strategist: strategy.New(d),
strategist: strategy.New(),
peer: p,
routing: directory,
sender: bsnet.NewNetworkAdapter(s, &receiver),
......
......@@ -12,6 +12,13 @@ import (
// access/lookups.
type keySet map[u.Key]struct{}
func newLedger(p *peer.Peer, strategy strategyFunc) *ledger {
return &ledger{
Strategy: strategy,
Partner: p,
}
}
// ledger stores the data exchange relationship between two peers.
type ledger struct {
lock sync.RWMutex
......@@ -37,9 +44,6 @@ type ledger struct {
Strategy strategyFunc
}
// LedgerMap lists Ledgers by their Partner key.
type ledgerMap map[u.Key]*ledger
func (l *ledger) ShouldSend() bool {
l.lock.Lock()
defer l.lock.Unlock()
......
......@@ -3,56 +3,85 @@ package strategy
import (
"errors"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
bsmsg "github.com/jbenet/go-ipfs/bitswap/message"
"github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
// TODO declare thread-safe datastore
func New(d ds.Datastore) Strategist {
func New() Strategist {
return &strategist{
datastore: d,
peers: ledgerMap{},
ledgerMap: ledgerMap{},
strategyFunc: yesManStrategy,
}
}
type strategist struct {
datastore ds.Datastore // FIXME(brian): enforce thread-safe datastore
peers ledgerMap
ledgerMap
strategyFunc
}
// Peers returns a list of this instance is connected to
// LedgerMap lists Ledgers by their Partner key.
type ledgerMap map[peerKey]*ledger
// FIXME share this externally
type peerKey u.Key
// Peers returns a list of peers
func (s *strategist) Peers() []*peer.Peer {
response := make([]*peer.Peer, 0) // TODO
response := make([]*peer.Peer, 0)
for _, ledger := range s.ledgerMap {
response = append(response, ledger.Partner)
}
return response
}
func (s *strategist) IsWantedByPeer(u.Key, *peer.Peer) bool {
return true // TODO
func (s *strategist) IsWantedByPeer(k u.Key, p *peer.Peer) bool {
ledger := s.ledger(p)
return ledger.WantListContains(k)
}
func (s *strategist) ShouldSendToPeer(u.Key, *peer.Peer) bool {
return true // TODO
func (s *strategist) ShouldSendToPeer(k u.Key, p *peer.Peer) bool {
ledger := s.ledger(p)
return ledger.ShouldSend()
}
func (s *strategist) Seed(int64) {
// TODO
}
func (s *strategist) MessageReceived(*peer.Peer, bsmsg.BitSwapMessage) error {
// TODO add peer to partners if doesn't already exist.
// TODO initialize ledger for peer if doesn't already exist
// TODO get wantlist from message and update contents in local wantlist for peer
// TODO acknowledge receipt of blocks and do accounting in ledger
func (s *strategist) MessageReceived(p *peer.Peer, m bsmsg.BitSwapMessage) error {
l := s.ledger(p)
for _, key := range m.Wantlist() {
l.Wants(key)
}
for _, block := range m.Blocks() {
// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
l.ReceivedBytes(len(block.Data))
}
return errors.New("TODO")
}
func (s *strategist) MessageSent(*peer.Peer, bsmsg.BitSwapMessage) error {
// TODO add peer to partners if doesn't already exist.
// TODO initialize ledger for peer if doesn't already exist
// TODO add block to my wantlist
// TODO acknowledge receipt of blocks and do accounting in ledger
return errors.New("TODO")
// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically
func (s *strategist) MessageSent(p *peer.Peer, m bsmsg.BitSwapMessage) error {
l := s.ledger(p)
for _, block := range m.Blocks() {
l.SentBytes(len(block.Data))
}
return nil
}
// ledger lazily instantiates a ledger
func (s *strategist) ledger(p *peer.Peer) *ledger {
l, ok := s.ledgerMap[peerKey(p.Key())]
if !ok {
l = newLedger(p, s.strategyFunc)
s.ledgerMap[peerKey(p.Key())] = l
}
return l
}
package strategy
import (
"testing"
message "github.com/jbenet/go-ipfs/bitswap/message"
"github.com/jbenet/go-ipfs/peer"
)
type peerAndStrategist struct {
*peer.Peer
Strategist
}
func newPeerAndStrategist(idStr string) peerAndStrategist {
return peerAndStrategist{
Peer: &peer.Peer{ID: peer.ID(idStr)},
Strategist: New(),
}
}
func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
sanfrancisco := newPeerAndStrategist("sf")
seattle := newPeerAndStrategist("sea")
m := message.New()
sanfrancisco.MessageSent(seattle.Peer, m)
seattle.MessageReceived(sanfrancisco.Peer, m)
if seattle.Peer.Key() == sanfrancisco.Peer.Key() {
t.Fatal("Sanity Check: Peers have same Key!")
}
if !peerIsPartner(seattle.Peer, sanfrancisco.Strategist) {
t.Fatal("Peer wasn't added as a Partner")
}
if !peerIsPartner(sanfrancisco.Peer, seattle.Strategist) {
t.Fatal("Peer wasn't added as a Partner")
}
}
func peerIsPartner(p *peer.Peer, s Strategist) bool {
for _, partner := range s.Peers() {
if partner.Key() == p.Key() {
return true
}
}
return false
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment