Commit 8cb50134 authored by Jeromy's avatar Jeromy

move util.Key into its own package under blocks

parent 8cd12955
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blocks "github.com/ipfs/go-ipfs/blocks" blocks "github.com/ipfs/go-ipfs/blocks"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
key "github.com/ipfs/go-ipfs/blocks/key"
exchange "github.com/ipfs/go-ipfs/exchange" exchange "github.com/ipfs/go-ipfs/exchange"
decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision" decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
...@@ -21,7 +22,6 @@ import ( ...@@ -21,7 +22,6 @@ import (
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
"github.com/ipfs/go-ipfs/thirdparty/delay" "github.com/ipfs/go-ipfs/thirdparty/delay"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
u "github.com/ipfs/go-ipfs/util"
) )
var log = eventlog.Logger("bitswap") var log = eventlog.Logger("bitswap")
...@@ -85,7 +85,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, ...@@ -85,7 +85,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
findKeys: make(chan *blockRequest, sizeBatchRequestChan), findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px, process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize), newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
provideKeys: make(chan u.Key), provideKeys: make(chan key.Key),
wm: NewWantManager(ctx, network), wm: NewWantManager(ctx, network),
} }
go bs.wm.Run() go bs.wm.Run()
...@@ -124,7 +124,7 @@ type Bitswap struct { ...@@ -124,7 +124,7 @@ type Bitswap struct {
newBlocks chan *blocks.Block newBlocks chan *blocks.Block
provideKeys chan u.Key provideKeys chan key.Key
counterLk sync.Mutex counterLk sync.Mutex
blocksRecvd int blocksRecvd int
...@@ -132,13 +132,13 @@ type Bitswap struct { ...@@ -132,13 +132,13 @@ type Bitswap struct {
} }
type blockRequest struct { type blockRequest struct {
keys []u.Key keys []key.Key
ctx context.Context ctx context.Context
} }
// GetBlock attempts to retrieve a particular block from peers within the // GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context. // deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) { func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, error) {
// Any async work initiated by this function must end when this function // Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to // returns. To ensure this, derive a new context. Note that it is okay to
...@@ -156,7 +156,7 @@ func (bs *Bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ...@@ -156,7 +156,7 @@ func (bs *Bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
cancelFunc() cancelFunc()
}() }()
promise, err := bs.GetBlocks(ctx, []u.Key{k}) promise, err := bs.GetBlocks(ctx, []key.Key{k})
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -177,8 +177,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ...@@ -177,8 +177,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
} }
} }
func (bs *Bitswap) WantlistForPeer(p peer.ID) []u.Key { func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
var out []u.Key var out []key.Key
for _, e := range bs.engine.WantlistForPeer(p) { for _, e := range bs.engine.WantlistForPeer(p) {
out = append(out, e.Key) out = append(out, e.Key)
} }
...@@ -192,7 +192,7 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []u.Key { ...@@ -192,7 +192,7 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []u.Key {
// NB: Your request remains open until the context expires. To conserve // NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one // resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server) // that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) { func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) {
select { select {
case <-bs.process.Closing(): case <-bs.process.Closing():
return nil, errors.New("bitswap is closed") return nil, errors.New("bitswap is closed")
...@@ -246,7 +246,7 @@ func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.En ...@@ -246,7 +246,7 @@ func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.En
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for _, e := range entries { for _, e := range entries {
wg.Add(1) wg.Add(1)
go func(k u.Key) { go func(k key.Key) {
defer wg.Done() defer wg.Done()
child, cancel := context.WithTimeout(ctx, providerRequestTimeout) child, cancel := context.WithTimeout(ctx, providerRequestTimeout)
...@@ -277,7 +277,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -277,7 +277,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
} }
// quickly send out cancels, reduces chances of duplicate block receives // quickly send out cancels, reduces chances of duplicate block receives
var keys []u.Key var keys []key.Key
for _, block := range iblocks { for _, block := range iblocks {
if _, found := bs.wm.wl.Contains(block.Key()); !found { if _, found := bs.wm.wl.Contains(block.Key()); !found {
log.Notice("received un-asked-for block: %s", block) log.Notice("received un-asked-for block: %s", block)
...@@ -342,8 +342,8 @@ func (bs *Bitswap) Close() error { ...@@ -342,8 +342,8 @@ func (bs *Bitswap) Close() error {
return bs.process.Close() return bs.process.Close()
} }
func (bs *Bitswap) GetWantlist() []u.Key { func (bs *Bitswap) GetWantlist() []key.Key {
var out []u.Key var out []key.Key
for _, e := range bs.wm.wl.Entries() { for _, e := range bs.wm.wl.Entries() {
out = append(out, e.Key) out = append(out, e.Key)
} }
......
...@@ -12,11 +12,11 @@ import ( ...@@ -12,11 +12,11 @@ import (
blocks "github.com/ipfs/go-ipfs/blocks" blocks "github.com/ipfs/go-ipfs/blocks"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
key "github.com/ipfs/go-ipfs/blocks/key"
tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util" p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
mockrouting "github.com/ipfs/go-ipfs/routing/mock" mockrouting "github.com/ipfs/go-ipfs/routing/mock"
delay "github.com/ipfs/go-ipfs/thirdparty/delay" delay "github.com/ipfs/go-ipfs/thirdparty/delay"
u "github.com/ipfs/go-ipfs/util"
) )
// FIXME the tests are really sensitive to the network delay. fix them to work // FIXME the tests are really sensitive to the network delay. fix them to work
...@@ -155,7 +155,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { ...@@ -155,7 +155,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.Log("Give the blocks to the first instance") t.Log("Give the blocks to the first instance")
var blkeys []u.Key var blkeys []key.Key
first := instances[0] first := instances[0]
for _, b := range blocks { for _, b := range blocks {
blkeys = append(blkeys, b.Key()) blkeys = append(blkeys, b.Key())
...@@ -227,7 +227,7 @@ func TestSendToWantingPeer(t *testing.T) { ...@@ -227,7 +227,7 @@ func TestSendToWantingPeer(t *testing.T) {
alpha := bg.Next() alpha := bg.Next()
// peerA requests and waits for block alpha // peerA requests and waits for block alpha
ctx, _ := context.WithTimeout(context.TODO(), waitTime) ctx, _ := context.WithTimeout(context.TODO(), waitTime)
alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()}) alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []key.Key{alpha.Key()})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -4,9 +4,9 @@ import ( ...@@ -4,9 +4,9 @@ import (
"math" "math"
"testing" "testing"
key "github.com/ipfs/go-ipfs/blocks/key"
"github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
"github.com/ipfs/go-ipfs/p2p/peer" "github.com/ipfs/go-ipfs/p2p/peer"
"github.com/ipfs/go-ipfs/util"
"github.com/ipfs/go-ipfs/util/testutil" "github.com/ipfs/go-ipfs/util/testutil"
) )
...@@ -21,6 +21,6 @@ func BenchmarkTaskQueuePush(b *testing.B) { ...@@ -21,6 +21,6 @@ func BenchmarkTaskQueuePush(b *testing.B) {
} }
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
q.Push(wantlist.Entry{Key: util.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)]) q.Push(wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
} }
} }
...@@ -3,20 +3,20 @@ package decision ...@@ -3,20 +3,20 @@ package decision
import ( import (
"time" "time"
key "github.com/ipfs/go-ipfs/blocks/key"
wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
u "github.com/ipfs/go-ipfs/util"
) )
// keySet is just a convenient alias for maps of keys, where we only care // keySet is just a convenient alias for maps of keys, where we only care
// access/lookups. // access/lookups.
type keySet map[u.Key]struct{} type keySet map[key.Key]struct{}
func newLedger(p peer.ID) *ledger { func newLedger(p peer.ID) *ledger {
return &ledger{ return &ledger{
wantList: wl.New(), wantList: wl.New(),
Partner: p, Partner: p,
sentToPeer: make(map[u.Key]time.Time), sentToPeer: make(map[key.Key]time.Time),
} }
} }
...@@ -43,7 +43,7 @@ type ledger struct { ...@@ -43,7 +43,7 @@ type ledger struct {
// sentToPeer is a set of keys to ensure we dont send duplicate blocks // sentToPeer is a set of keys to ensure we dont send duplicate blocks
// to a given peer // to a given peer
sentToPeer map[u.Key]time.Time sentToPeer map[key.Key]time.Time
} }
type debtRatio struct { type debtRatio struct {
...@@ -68,16 +68,16 @@ func (l *ledger) ReceivedBytes(n int) { ...@@ -68,16 +68,16 @@ func (l *ledger) ReceivedBytes(n int) {
} }
// TODO: this needs to be different. We need timeouts. // TODO: this needs to be different. We need timeouts.
func (l *ledger) Wants(k u.Key, priority int) { func (l *ledger) Wants(k key.Key, priority int) {
log.Debugf("peer %s wants %s", l.Partner, k) log.Debugf("peer %s wants %s", l.Partner, k)
l.wantList.Add(k, priority) l.wantList.Add(k, priority)
} }
func (l *ledger) CancelWant(k u.Key) { func (l *ledger) CancelWant(k key.Key) {
l.wantList.Remove(k) l.wantList.Remove(k)
} }
func (l *ledger) WantListContains(k u.Key) (wl.Entry, bool) { func (l *ledger) WantListContains(k key.Key) (wl.Entry, bool) {
return l.wantList.Contains(k) return l.wantList.Contains(k)
} }
......
...@@ -4,17 +4,17 @@ import ( ...@@ -4,17 +4,17 @@ import (
"sync" "sync"
"time" "time"
key "github.com/ipfs/go-ipfs/blocks/key"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
pq "github.com/ipfs/go-ipfs/thirdparty/pq" pq "github.com/ipfs/go-ipfs/thirdparty/pq"
u "github.com/ipfs/go-ipfs/util"
) )
type peerRequestQueue interface { type peerRequestQueue interface {
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty. // Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
Pop() *peerRequestTask Pop() *peerRequestTask
Push(entry wantlist.Entry, to peer.ID) Push(entry wantlist.Entry, to peer.ID)
Remove(k u.Key, p peer.ID) Remove(k key.Key, p peer.ID)
// NB: cannot expose simply expose taskQueue.Len because trashed elements // NB: cannot expose simply expose taskQueue.Len because trashed elements
// may exist. These trashed elements should not contribute to the count. // may exist. These trashed elements should not contribute to the count.
} }
...@@ -110,7 +110,7 @@ func (tl *prq) Pop() *peerRequestTask { ...@@ -110,7 +110,7 @@ func (tl *prq) Pop() *peerRequestTask {
} }
// Remove removes a task from the queue // Remove removes a task from the queue
func (tl *prq) Remove(k u.Key, p peer.ID) { func (tl *prq) Remove(k key.Key, p peer.ID) {
tl.lock.Lock() tl.lock.Lock()
t, ok := tl.taskMap[taskKey(p, k)] t, ok := tl.taskMap[taskKey(p, k)]
if ok { if ok {
...@@ -155,7 +155,7 @@ func (t *peerRequestTask) SetIndex(i int) { ...@@ -155,7 +155,7 @@ func (t *peerRequestTask) SetIndex(i int) {
} }
// taskKey returns a key that uniquely identifies a task. // taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.ID, k u.Key) string { func taskKey(p peer.ID, k key.Key) string {
return string(p) + string(k) return string(p) + string(k)
} }
...@@ -186,7 +186,7 @@ type activePartner struct { ...@@ -186,7 +186,7 @@ type activePartner struct {
activelk sync.Mutex activelk sync.Mutex
active int active int
activeBlocks map[u.Key]struct{} activeBlocks map[key.Key]struct{}
// requests is the number of blocks this peer is currently requesting // requests is the number of blocks this peer is currently requesting
// request need not be locked around as it will only be modified under // request need not be locked around as it will only be modified under
...@@ -203,7 +203,7 @@ type activePartner struct { ...@@ -203,7 +203,7 @@ type activePartner struct {
func newActivePartner() *activePartner { func newActivePartner() *activePartner {
return &activePartner{ return &activePartner{
taskQueue: pq.New(wrapCmp(V1)), taskQueue: pq.New(wrapCmp(V1)),
activeBlocks: make(map[u.Key]struct{}), activeBlocks: make(map[key.Key]struct{}),
} }
} }
...@@ -230,7 +230,7 @@ func partnerCompare(a, b pq.Elem) bool { ...@@ -230,7 +230,7 @@ func partnerCompare(a, b pq.Elem) bool {
} }
// StartTask signals that a task was started for this partner // StartTask signals that a task was started for this partner
func (p *activePartner) StartTask(k u.Key) { func (p *activePartner) StartTask(k key.Key) {
p.activelk.Lock() p.activelk.Lock()
p.activeBlocks[k] = struct{}{} p.activeBlocks[k] = struct{}{}
p.active++ p.active++
...@@ -238,7 +238,7 @@ func (p *activePartner) StartTask(k u.Key) { ...@@ -238,7 +238,7 @@ func (p *activePartner) StartTask(k u.Key) {
} }
// TaskDone signals that a task was completed for this partner // TaskDone signals that a task was completed for this partner
func (p *activePartner) TaskDone(k u.Key) { func (p *activePartner) TaskDone(k key.Key) {
p.activelk.Lock() p.activelk.Lock()
delete(p.activeBlocks, k) delete(p.activeBlocks, k)
p.active-- p.active--
......
...@@ -7,8 +7,8 @@ import ( ...@@ -7,8 +7,8 @@ import (
"strings" "strings"
"testing" "testing"
key "github.com/ipfs/go-ipfs/blocks/key"
"github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
"github.com/ipfs/go-ipfs/util"
"github.com/ipfs/go-ipfs/util/testutil" "github.com/ipfs/go-ipfs/util/testutil"
) )
...@@ -41,10 +41,10 @@ func TestPushPop(t *testing.T) { ...@@ -41,10 +41,10 @@ func TestPushPop(t *testing.T) {
for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
letter := alphabet[index] letter := alphabet[index]
t.Log(partner.String()) t.Log(partner.String())
prq.Push(wantlist.Entry{Key: util.Key(letter), Priority: math.MaxInt32 - index}, partner) prq.Push(wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner)
} }
for _, consonant := range consonants { for _, consonant := range consonants {
prq.Remove(util.Key(consonant), partner) prq.Remove(key.Key(consonant), partner)
} }
var out []string var out []string
...@@ -76,10 +76,10 @@ func TestPeerRepeats(t *testing.T) { ...@@ -76,10 +76,10 @@ func TestPeerRepeats(t *testing.T) {
// Have each push some blocks // Have each push some blocks
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
prq.Push(wantlist.Entry{Key: util.Key(i)}, a) prq.Push(wantlist.Entry{Key: key.Key(i)}, a)
prq.Push(wantlist.Entry{Key: util.Key(i)}, b) prq.Push(wantlist.Entry{Key: key.Key(i)}, b)
prq.Push(wantlist.Entry{Key: util.Key(i)}, c) prq.Push(wantlist.Entry{Key: key.Key(i)}, c)
prq.Push(wantlist.Entry{Key: util.Key(i)}, d) prq.Push(wantlist.Entry{Key: key.Key(i)}, d)
} }
// now, pop off four entries, there should be one from each // now, pop off four entries, there should be one from each
......
...@@ -4,10 +4,10 @@ import ( ...@@ -4,10 +4,10 @@ import (
"io" "io"
blocks "github.com/ipfs/go-ipfs/blocks" blocks "github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key"
pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/internal/pb" pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/internal/pb"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
inet "github.com/ipfs/go-ipfs/p2p/net" inet "github.com/ipfs/go-ipfs/p2p/net"
u "github.com/ipfs/go-ipfs/util"
ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/io" ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/io"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
...@@ -25,9 +25,9 @@ type BitSwapMessage interface { ...@@ -25,9 +25,9 @@ type BitSwapMessage interface {
Blocks() []*blocks.Block Blocks() []*blocks.Block
// AddEntry adds an entry to the Wantlist. // AddEntry adds an entry to the Wantlist.
AddEntry(key u.Key, priority int) AddEntry(key key.Key, priority int)
Cancel(key u.Key) Cancel(key key.Key)
Empty() bool Empty() bool
...@@ -47,8 +47,8 @@ type Exportable interface { ...@@ -47,8 +47,8 @@ type Exportable interface {
type impl struct { type impl struct {
full bool full bool
wantlist map[u.Key]Entry wantlist map[key.Key]Entry
blocks map[u.Key]*blocks.Block blocks map[key.Key]*blocks.Block
} }
func New(full bool) BitSwapMessage { func New(full bool) BitSwapMessage {
...@@ -57,8 +57,8 @@ func New(full bool) BitSwapMessage { ...@@ -57,8 +57,8 @@ func New(full bool) BitSwapMessage {
func newMsg(full bool) *impl { func newMsg(full bool) *impl {
return &impl{ return &impl{
blocks: make(map[u.Key]*blocks.Block), blocks: make(map[key.Key]*blocks.Block),
wantlist: make(map[u.Key]Entry), wantlist: make(map[key.Key]Entry),
full: full, full: full,
} }
} }
...@@ -71,7 +71,7 @@ type Entry struct { ...@@ -71,7 +71,7 @@ type Entry struct {
func newMessageFromProto(pbm pb.Message) BitSwapMessage { func newMessageFromProto(pbm pb.Message) BitSwapMessage {
m := newMsg(pbm.GetWantlist().GetFull()) m := newMsg(pbm.GetWantlist().GetFull())
for _, e := range pbm.GetWantlist().GetEntries() { for _, e := range pbm.GetWantlist().GetEntries() {
m.addEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel()) m.addEntry(key.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
} }
for _, d := range pbm.GetBlocks() { for _, d := range pbm.GetBlocks() {
b := blocks.NewBlock(d) b := blocks.NewBlock(d)
...@@ -104,16 +104,16 @@ func (m *impl) Blocks() []*blocks.Block { ...@@ -104,16 +104,16 @@ func (m *impl) Blocks() []*blocks.Block {
return bs return bs
} }
func (m *impl) Cancel(k u.Key) { func (m *impl) Cancel(k key.Key) {
delete(m.wantlist, k) delete(m.wantlist, k)
m.addEntry(k, 0, true) m.addEntry(k, 0, true)
} }
func (m *impl) AddEntry(k u.Key, priority int) { func (m *impl) AddEntry(k key.Key, priority int) {
m.addEntry(k, priority, false) m.addEntry(k, priority, false)
} }
func (m *impl) addEntry(k u.Key, priority int, cancel bool) { func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
e, exists := m.wantlist[k] e, exists := m.wantlist[k]
if exists { if exists {
e.Priority = priority e.Priority = priority
......
...@@ -7,14 +7,14 @@ import ( ...@@ -7,14 +7,14 @@ import (
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
blocks "github.com/ipfs/go-ipfs/blocks" blocks "github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key"
pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/internal/pb" pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/internal/pb"
u "github.com/ipfs/go-ipfs/util"
) )
func TestAppendWanted(t *testing.T) { func TestAppendWanted(t *testing.T) {
const str = "foo" const str = "foo"
m := New(true) m := New(true)
m.AddEntry(u.Key(str), 1) m.AddEntry(key.Key(str), 1)
if !wantlistContains(m.ToProto().GetWantlist(), str) { if !wantlistContains(m.ToProto().GetWantlist(), str) {
t.Fail() t.Fail()
...@@ -63,7 +63,7 @@ func TestWantlist(t *testing.T) { ...@@ -63,7 +63,7 @@ func TestWantlist(t *testing.T) {
keystrs := []string{"foo", "bar", "baz", "bat"} keystrs := []string{"foo", "bar", "baz", "bat"}
m := New(true) m := New(true)
for _, s := range keystrs { for _, s := range keystrs {
m.AddEntry(u.Key(s), 1) m.AddEntry(key.Key(s), 1)
} }
exported := m.Wantlist() exported := m.Wantlist()
...@@ -86,7 +86,7 @@ func TestCopyProtoByValue(t *testing.T) { ...@@ -86,7 +86,7 @@ func TestCopyProtoByValue(t *testing.T) {
const str = "foo" const str = "foo"
m := New(true) m := New(true)
protoBeforeAppend := m.ToProto() protoBeforeAppend := m.ToProto()
m.AddEntry(u.Key(str), 1) m.AddEntry(key.Key(str), 1)
if wantlistContains(protoBeforeAppend.GetWantlist(), str) { if wantlistContains(protoBeforeAppend.GetWantlist(), str) {
t.Fail() t.Fail()
} }
...@@ -94,11 +94,11 @@ func TestCopyProtoByValue(t *testing.T) { ...@@ -94,11 +94,11 @@ func TestCopyProtoByValue(t *testing.T) {
func TestToNetFromNetPreservesWantList(t *testing.T) { func TestToNetFromNetPreservesWantList(t *testing.T) {
original := New(true) original := New(true)
original.AddEntry(u.Key("M"), 1) original.AddEntry(key.Key("M"), 1)
original.AddEntry(u.Key("B"), 1) original.AddEntry(key.Key("B"), 1)
original.AddEntry(u.Key("D"), 1) original.AddEntry(key.Key("D"), 1)
original.AddEntry(u.Key("T"), 1) original.AddEntry(key.Key("T"), 1)
original.AddEntry(u.Key("F"), 1) original.AddEntry(key.Key("F"), 1)
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
if err := original.ToNet(buf); err != nil { if err := original.ToNet(buf); err != nil {
...@@ -110,7 +110,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { ...@@ -110,7 +110,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
keys := make(map[u.Key]bool) keys := make(map[key.Key]bool)
for _, k := range copied.Wantlist() { for _, k := range copied.Wantlist() {
keys[k.Key] = true keys[k.Key] = true
} }
...@@ -140,7 +140,7 @@ func TestToAndFromNetMessage(t *testing.T) { ...@@ -140,7 +140,7 @@ func TestToAndFromNetMessage(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
keys := make(map[u.Key]bool) keys := make(map[key.Key]bool)
for _, b := range m2.Blocks() { for _, b := range m2.Blocks() {
keys[b.Key()] = true keys[b.Key()] = true
} }
......
...@@ -2,10 +2,10 @@ package network ...@@ -2,10 +2,10 @@ package network
import ( import (
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
protocol "github.com/ipfs/go-ipfs/p2p/protocol" protocol "github.com/ipfs/go-ipfs/p2p/protocol"
u "github.com/ipfs/go-ipfs/util"
) )
var ProtocolBitswap protocol.ID = "/ipfs/bitswap" var ProtocolBitswap protocol.ID = "/ipfs/bitswap"
...@@ -44,8 +44,8 @@ type Receiver interface { ...@@ -44,8 +44,8 @@ type Receiver interface {
type Routing interface { type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key // FindProvidersAsync returns a channel of providers for the given key
FindProvidersAsync(context.Context, u.Key, int) <-chan peer.ID FindProvidersAsync(context.Context, key.Key, int) <-chan peer.ID
// Provide provides the key to the network // Provide provides the key to the network
Provide(context.Context, u.Key) error Provide(context.Context, key.Key) error
} }
...@@ -3,13 +3,13 @@ package network ...@@ -3,13 +3,13 @@ package network
import ( import (
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
host "github.com/ipfs/go-ipfs/p2p/host" host "github.com/ipfs/go-ipfs/p2p/host"
inet "github.com/ipfs/go-ipfs/p2p/net" inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
routing "github.com/ipfs/go-ipfs/routing" routing "github.com/ipfs/go-ipfs/routing"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
util "github.com/ipfs/go-ipfs/util"
) )
var log = eventlog.Logger("bitswap_network") var log = eventlog.Logger("bitswap_network")
...@@ -102,7 +102,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { ...@@ -102,7 +102,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
} }
// FindProvidersAsync returns a channel of providers for the given key // FindProvidersAsync returns a channel of providers for the given key
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID { func (bsnet *impl) FindProvidersAsync(ctx context.Context, k key.Key, max int) <-chan peer.ID {
// Since routing queries are expensive, give bitswap the peers to which we // Since routing queries are expensive, give bitswap the peers to which we
// have open connections. Note that this may cause issues if bitswap starts // have open connections. Note that this may cause issues if bitswap starts
...@@ -138,7 +138,7 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) ...@@ -138,7 +138,7 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int)
} }
// Provide provides the key to the network // Provide provides the key to the network
func (bsnet *impl) Provide(ctx context.Context, k util.Key) error { func (bsnet *impl) Provide(ctx context.Context, k key.Key) error {
return bsnet.routing.Provide(ctx, k) return bsnet.routing.Provide(ctx, k)
} }
......
...@@ -4,14 +4,14 @@ import ( ...@@ -4,14 +4,14 @@ import (
pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub" pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blocks "github.com/ipfs/go-ipfs/blocks" blocks "github.com/ipfs/go-ipfs/blocks"
u "github.com/ipfs/go-ipfs/util" key "github.com/ipfs/go-ipfs/blocks/key"
) )
const bufferSize = 16 const bufferSize = 16
type PubSub interface { type PubSub interface {
Publish(block *blocks.Block) Publish(block *blocks.Block)
Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block
Shutdown() Shutdown()
} }
...@@ -35,7 +35,7 @@ func (ps *impl) Shutdown() { ...@@ -35,7 +35,7 @@ func (ps *impl) Shutdown() {
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel| // Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys) // is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks. // blocks.
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block { func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block {
blocksCh := make(chan *blocks.Block, len(keys)) blocksCh := make(chan *blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
...@@ -71,7 +71,7 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Blo ...@@ -71,7 +71,7 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Blo
return blocksCh return blocksCh
} }
func toStrings(keys []u.Key) []string { func toStrings(keys []key.Key) []string {
strs := make([]string, 0) strs := make([]string, 0)
for _, key := range keys { for _, key := range keys {
strs = append(strs, string(key)) strs = append(strs, string(key))
......
...@@ -8,7 +8,7 @@ import ( ...@@ -8,7 +8,7 @@ import (
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blocks "github.com/ipfs/go-ipfs/blocks" blocks "github.com/ipfs/go-ipfs/blocks"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
"github.com/ipfs/go-ipfs/util" key "github.com/ipfs/go-ipfs/blocks/key"
) )
func TestDuplicates(t *testing.T) { func TestDuplicates(t *testing.T) {
...@@ -131,8 +131,8 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) { ...@@ -131,8 +131,8 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
t.Log("generate a large number of blocks. exceed default buffer") t.Log("generate a large number of blocks. exceed default buffer")
bs := g.Blocks(1000) bs := g.Blocks(1000)
ks := func() []util.Key { ks := func() []key.Key {
var keys []util.Key var keys []key.Key
for _, b := range bs { for _, b := range bs {
keys = append(keys, b.Key()) keys = append(keys, b.Key())
} }
......
package bitswap package bitswap
import ( import (
u "github.com/ipfs/go-ipfs/util" key "github.com/ipfs/go-ipfs/blocks/key"
"sort" "sort"
) )
type Stat struct { type Stat struct {
ProvideBufLen int ProvideBufLen int
Wantlist []u.Key Wantlist []key.Key
Peers []string Peers []string
BlocksReceived int BlocksReceived int
DupBlksReceived int DupBlksReceived int
......
...@@ -4,13 +4,13 @@ import ( ...@@ -4,13 +4,13 @@ import (
"errors" "errors"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
routing "github.com/ipfs/go-ipfs/routing" routing "github.com/ipfs/go-ipfs/routing"
mockrouting "github.com/ipfs/go-ipfs/routing/mock" mockrouting "github.com/ipfs/go-ipfs/routing/mock"
delay "github.com/ipfs/go-ipfs/thirdparty/delay" delay "github.com/ipfs/go-ipfs/thirdparty/delay"
util "github.com/ipfs/go-ipfs/util"
testutil "github.com/ipfs/go-ipfs/util/testutil" testutil "github.com/ipfs/go-ipfs/util/testutil"
) )
...@@ -91,7 +91,7 @@ func (nc *networkClient) SendMessage( ...@@ -91,7 +91,7 @@ func (nc *networkClient) SendMessage(
} }
// FindProvidersAsync returns a channel of providers for the given key // FindProvidersAsync returns a channel of providers for the given key
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID { func (nc *networkClient) FindProvidersAsync(ctx context.Context, k key.Key, max int) <-chan peer.ID {
// NB: this function duplicates the PeerInfo -> ID transformation in the // NB: this function duplicates the PeerInfo -> ID transformation in the
// bitswap network adapter. Not to worry. This network client will be // bitswap network adapter. Not to worry. This network client will be
...@@ -113,7 +113,7 @@ func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max ...@@ -113,7 +113,7 @@ func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max
} }
// Provide provides the key to the network // Provide provides the key to the network
func (nc *networkClient) Provide(ctx context.Context, k util.Key) error { func (nc *networkClient) Provide(ctx context.Context, k key.Key) error {
return nc.routing.Provide(ctx, k) return nc.routing.Provide(ctx, k)
} }
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
package wantlist package wantlist
import ( import (
u "github.com/ipfs/go-ipfs/util" key "github.com/ipfs/go-ipfs/blocks/key"
"sort" "sort"
"sync" "sync"
) )
...@@ -15,14 +15,14 @@ type ThreadSafe struct { ...@@ -15,14 +15,14 @@ type ThreadSafe struct {
// not threadsafe // not threadsafe
type Wantlist struct { type Wantlist struct {
set map[u.Key]Entry set map[key.Key]Entry
// TODO provide O(1) len accessor if cost becomes an issue // TODO provide O(1) len accessor if cost becomes an issue
} }
type Entry struct { type Entry struct {
// TODO consider making entries immutable so they can be shared safely and // TODO consider making entries immutable so they can be shared safely and
// slices can be copied efficiently. // slices can be copied efficiently.
Key u.Key Key key.Key
Priority int Priority int
} }
...@@ -40,25 +40,25 @@ func NewThreadSafe() *ThreadSafe { ...@@ -40,25 +40,25 @@ func NewThreadSafe() *ThreadSafe {
func New() *Wantlist { func New() *Wantlist {
return &Wantlist{ return &Wantlist{
set: make(map[u.Key]Entry), set: make(map[key.Key]Entry),
} }
} }
func (w *ThreadSafe) Add(k u.Key, priority int) { func (w *ThreadSafe) Add(k key.Key, priority int) {
// TODO rm defer for perf // TODO rm defer for perf
w.lk.Lock() w.lk.Lock()
defer w.lk.Unlock() defer w.lk.Unlock()
w.Wantlist.Add(k, priority) w.Wantlist.Add(k, priority)
} }
func (w *ThreadSafe) Remove(k u.Key) { func (w *ThreadSafe) Remove(k key.Key) {
// TODO rm defer for perf // TODO rm defer for perf
w.lk.Lock() w.lk.Lock()
defer w.lk.Unlock() defer w.lk.Unlock()
w.Wantlist.Remove(k) w.Wantlist.Remove(k)
} }
func (w *ThreadSafe) Contains(k u.Key) (Entry, bool) { func (w *ThreadSafe) Contains(k key.Key) (Entry, bool) {
// TODO rm defer for perf // TODO rm defer for perf
w.lk.RLock() w.lk.RLock()
defer w.lk.RUnlock() defer w.lk.RUnlock()
...@@ -87,7 +87,7 @@ func (w *Wantlist) Len() int { ...@@ -87,7 +87,7 @@ func (w *Wantlist) Len() int {
return len(w.set) return len(w.set)
} }
func (w *Wantlist) Add(k u.Key, priority int) { func (w *Wantlist) Add(k key.Key, priority int) {
if _, ok := w.set[k]; ok { if _, ok := w.set[k]; ok {
return return
} }
...@@ -97,11 +97,11 @@ func (w *Wantlist) Add(k u.Key, priority int) { ...@@ -97,11 +97,11 @@ func (w *Wantlist) Add(k u.Key, priority int) {
} }
} }
func (w *Wantlist) Remove(k u.Key) { func (w *Wantlist) Remove(k key.Key) {
delete(w.set, k) delete(w.set, k)
} }
func (w *Wantlist) Contains(k u.Key) (Entry, bool) { func (w *Wantlist) Contains(k key.Key) (Entry, bool) {
e, ok := w.set[k] e, ok := w.set[k]
return e, ok return e, ok
} }
......
...@@ -5,12 +5,12 @@ import ( ...@@ -5,12 +5,12 @@ import (
"time" "time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision" engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
u "github.com/ipfs/go-ipfs/util"
) )
type WantManager struct { type WantManager struct {
...@@ -46,7 +46,7 @@ type msgPair struct { ...@@ -46,7 +46,7 @@ type msgPair struct {
type cancellation struct { type cancellation struct {
who peer.ID who peer.ID
blk u.Key blk key.Key
} }
type msgQueue struct { type msgQueue struct {
...@@ -60,16 +60,16 @@ type msgQueue struct { ...@@ -60,16 +60,16 @@ type msgQueue struct {
done chan struct{} done chan struct{}
} }
func (pm *WantManager) WantBlocks(ks []u.Key) { func (pm *WantManager) WantBlocks(ks []key.Key) {
log.Infof("want blocks: %s", ks) log.Infof("want blocks: %s", ks)
pm.addEntries(ks, false) pm.addEntries(ks, false)
} }
func (pm *WantManager) CancelWants(ks []u.Key) { func (pm *WantManager) CancelWants(ks []key.Key) {
pm.addEntries(ks, true) pm.addEntries(ks, true)
} }
func (pm *WantManager) addEntries(ks []u.Key, cancel bool) { func (pm *WantManager) addEntries(ks []key.Key, cancel bool) {
var entries []*bsmsg.Entry var entries []*bsmsg.Entry
for i, k := range ks { for i, k := range ks {
entries = append(entries, &bsmsg.Entry{ entries = append(entries, &bsmsg.Entry{
......
...@@ -7,7 +7,7 @@ import ( ...@@ -7,7 +7,7 @@ import (
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
u "github.com/ipfs/go-ipfs/util" key "github.com/ipfs/go-ipfs/blocks/key"
) )
var TaskWorkerCount = 8 var TaskWorkerCount = 8
...@@ -104,9 +104,9 @@ func (bs *Bitswap) provideWorker(ctx context.Context) { ...@@ -104,9 +104,9 @@ func (bs *Bitswap) provideWorker(ctx context.Context) {
func (bs *Bitswap) provideCollector(ctx context.Context) { func (bs *Bitswap) provideCollector(ctx context.Context) {
defer close(bs.provideKeys) defer close(bs.provideKeys)
var toProvide []u.Key var toProvide []key.Key
var nextKey u.Key var nextKey key.Key
var keysOut chan u.Key var keysOut chan key.Key
for { for {
select { select {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment