Commit 677d790f authored by Brian Tiger Chow's avatar Brian Tiger Chow

Merge pull request #38 from jbenet/feat/bitswap-import-cleanup

chore(bitswap) tidy up bitswap, ledger
parents dbc3a05e ad303335
package bitswap
import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
"time"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
blocks "github.com/jbenet/go-ipfs/blocks"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
"time"
)
// PartnerWantListMax is the bound for the number of keys we'll store per
......@@ -44,7 +44,7 @@ type BitSwap struct {
// The Ledger has the peer.ID, and the peer connection works through net.
// Ledgers of known relationships (active or inactive) stored in datastore.
// Changes to the Ledger should be committed to the datastore.
partners map[u.Key]*Ledger
partners LedgerMap
// haveList is the set of keys we have values for. a map for fast lookups.
// haveList KeySet -- not needed. all values in datastore?
......@@ -136,7 +136,7 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byt
func (bs *BitSwap) HaveBlock(blk *blocks.Block) error {
go func() {
for _, ledger := range bs.partners {
if _, ok := ledger.WantList[blk.Key()]; ok {
if ledger.WantListContains(blk.Key()) {
//send block to node
if ledger.ShouldSend() {
bs.SendBlock(ledger.Partner, blk)
......@@ -189,14 +189,13 @@ func (bs *BitSwap) handleMessages() {
// and then if we do, check the ledger for whether or not we should send it.
func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) {
u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), u.Key(want).Pretty())
ledg := bs.GetLedger(p)
ledger := bs.getLedger(p)
dsk := ds.NewKey(want)
blk_i, err := bs.datastore.Get(dsk)
if err != nil {
if err == ds.ErrNotFound {
// TODO: this needs to be different. We need timeouts.
ledg.WantList[u.Key(want)] = struct{}{}
ledger.Wants(u.Key(want))
}
u.PErr("datastore get error: %v\n", err)
return
......@@ -208,7 +207,7 @@ func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) {
return
}
if ledg.ShouldSend() {
if ledger.ShouldSend() {
u.DOut("Sending block to peer.\n")
bblk, err := blocks.NewBlock(blk)
if err != nil {
......@@ -216,7 +215,7 @@ func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) {
return
}
bs.SendBlock(p, bblk)
ledg.SentBytes(len(blk))
ledger.SentBytes(len(blk))
} else {
u.DOut("Decided not to send block.")
}
......@@ -236,11 +235,11 @@ func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) {
}
bs.listener.Respond(string(blk.Key()), mes)
ledger := bs.GetLedger(p)
ledger := bs.getLedger(p)
ledger.ReceivedBytes(len(blk.Data))
}
func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger {
func (bs *BitSwap) getLedger(p *peer.Peer) *Ledger {
l, ok := bs.partners[p.Key()]
if ok {
return l
......@@ -273,7 +272,7 @@ func (bs *BitSwap) Halt() {
func (bs *BitSwap) SetStrategy(sf StrategyFunc) {
bs.strategy = sf
for _, ledg := range bs.partners {
ledg.Strategy = sf
for _, ledger := range bs.partners {
ledger.Strategy = sf
}
}
package bitswap
import (
"sync"
"time"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
"time"
)
// Ledger stores the data exchange relationship between two peers.
type Ledger struct {
lock sync.RWMutex
// Partner is the remote Peer.
Partner *peer.Peer
......@@ -16,17 +18,17 @@ type Ledger struct {
// Accounting tracks bytes sent and recieved.
Accounting debtRatio
// FirstExchnage is the time of the first data exchange.
FirstExchange time.Time
// firstExchnage is the time of the first data exchange.
firstExchange time.Time
// LastExchange is the time of the last data exchange.
LastExchange time.Time
// lastExchange is the time of the last data exchange.
lastExchange time.Time
// Number of exchanges with this peer
ExchangeCount uint64
// exchangeCount is the number of exchanges with this peer
exchangeCount uint64
// WantList is a (bounded, small) set of keys that Partner desires.
WantList KeySet
// wantList is a (bounded, small) set of keys that Partner desires.
wantList KeySet
Strategy StrategyFunc
}
......@@ -35,17 +37,48 @@ type Ledger struct {
type LedgerMap map[u.Key]*Ledger
func (l *Ledger) ShouldSend() bool {
l.lock.Lock()
defer l.lock.Unlock()
return l.Strategy(l)
}
func (l *Ledger) SentBytes(n int) {
l.ExchangeCount++
l.LastExchange = time.Now()
l.lock.Lock()
defer l.lock.Unlock()
l.exchangeCount++
l.lastExchange = time.Now()
l.Accounting.BytesSent += uint64(n)
}
func (l *Ledger) ReceivedBytes(n int) {
l.ExchangeCount++
l.LastExchange = time.Now()
l.lock.Lock()
defer l.lock.Unlock()
l.exchangeCount++
l.lastExchange = time.Now()
l.Accounting.BytesRecv += uint64(n)
}
// TODO: this needs to be different. We need timeouts.
func (l *Ledger) Wants(k u.Key) {
l.lock.Lock()
defer l.lock.Unlock()
l.wantList[k] = struct{}{}
}
func (l *Ledger) WantListContains(k u.Key) bool {
l.lock.RLock()
defer l.lock.RUnlock()
_, ok := l.wantList[k]
return ok
}
func (l *Ledger) ExchangeCount() uint64 {
l.lock.RLock()
defer l.lock.RUnlock()
return l.exchangeCount
}
package bitswap
import (
"sync"
"testing"
)
func TestRaceConditions(t *testing.T) {
const numberOfExpectedExchanges = 10000
l := new(Ledger)
var wg sync.WaitGroup
for i := 0; i < numberOfExpectedExchanges; i++ {
wg.Add(1)
go func() {
defer wg.Done()
l.ReceivedBytes(1)
}()
}
wg.Wait()
if l.ExchangeCount() != numberOfExpectedExchanges {
t.Fail()
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment