Commit ac1dd999 authored by Jeromy's avatar Jeromy

cid: integrate cid into bitswap and blockstores

License: MIT
Signed-off-by: default avatarJeromy <why@ipfs.io>
parent e5c0ecbe
......@@ -3,13 +3,12 @@
package bitswap
import (
"context"
"errors"
"math"
"sync"
"time"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
blocks "github.com/ipfs/go-ipfs/blocks"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"
......@@ -19,12 +18,12 @@ import (
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
flags "github.com/ipfs/go-ipfs/flags"
"github.com/ipfs/go-ipfs/thirdparty/delay"
loggables "gx/ipfs/QmTMy4hVSY28DdwJ9kBz6y7q6MuioFzPcpM3Ma3aPjo1i3/go-libp2p-loggables"
context "context"
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
loggables "gx/ipfs/QmTMy4hVSY28DdwJ9kBz6y7q6MuioFzPcpM3Ma3aPjo1i3/go-libp2p-loggables"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
......@@ -90,8 +89,8 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
network: network,
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan key.Key, HasBlockBufferSize),
provideKeys: make(chan key.Key, provideKeysBufferSize),
newBlocks: make(chan *cid.Cid, HasBlockBufferSize),
provideKeys: make(chan *cid.Cid, provideKeysBufferSize),
wm: NewWantManager(ctx, network),
}
go bs.wm.Run()
......@@ -137,9 +136,9 @@ type Bitswap struct {
process process.Process
newBlocks chan key.Key
newBlocks chan *cid.Cid
provideKeys chan key.Key
provideKeys chan *cid.Cid
counterLk sync.Mutex
blocksRecvd int
......@@ -148,14 +147,15 @@ type Bitswap struct {
}
type blockRequest struct {
Key key.Key
Cid *cid.Cid
Ctx context.Context
}
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) {
if k == "" {
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
if k == nil {
log.Error("nil cid in GetBlock")
return nil, blockstore.ErrNotFound
}
......@@ -165,18 +165,17 @@ func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, er
// functions called by this one. Otherwise those functions won't return
// when this context's cancel func is executed. This is difficult to
// enforce. May this comment keep you safe.
ctx, cancelFunc := context.WithCancel(parent)
ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
defer func() {
cancelFunc()
}()
promise, err := bs.GetBlocks(ctx, []key.Key{k})
promise, err := bs.GetBlocks(ctx, []*cid.Cid{k})
if err != nil {
return nil, err
}
......@@ -197,10 +196,10 @@ func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, er
}
}
func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
var out []key.Key
func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid {
var out []*cid.Cid
for _, e := range bs.engine.WantlistForPeer(p) {
out = append(out, e.Key)
out = append(out, e.Cid)
}
return out
}
......@@ -216,7 +215,7 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) {
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) {
if len(keys) == 0 {
out := make(chan blocks.Block)
close(out)
......@@ -231,7 +230,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
promise := bs.notifications.Subscribe(ctx, keys...)
for _, k := range keys {
log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
}
bs.wm.WantBlocks(ctx, keys)
......@@ -240,13 +239,13 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
req := &blockRequest{
Key: keys[0],
Cid: keys[0],
Ctx: ctx,
}
remaining := make(map[key.Key]struct{})
remaining := cid.NewSet()
for _, k := range keys {
remaining[k] = struct{}{}
remaining.Add(k)
}
out := make(chan blocks.Block)
......@@ -255,11 +254,8 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
defer cancel()
defer close(out)
defer func() {
var toCancel []key.Key
for k, _ := range remaining {
toCancel = append(toCancel, k)
}
bs.CancelWants(toCancel)
// can't just defer this call on its own, arguments are resolved *when* the defer is created
bs.CancelWants(remaining.Keys())
}()
for {
select {
......@@ -268,7 +264,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
return
}
delete(remaining, blk.Key())
remaining.Remove(blk.Cid())
select {
case out <- blk:
case <-ctx.Done():
......@@ -289,8 +285,8 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
}
// CancelWant removes a given key from the wantlist
func (bs *Bitswap) CancelWants(keys []key.Key) {
bs.wm.CancelWants(keys)
func (bs *Bitswap) CancelWants(cids []*cid.Cid) {
bs.wm.CancelWants(cids)
}
// HasBlock announces the existance of a block to this bitswap service. The
......@@ -318,7 +314,7 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
bs.engine.AddBlock(blk)
select {
case bs.newBlocks <- blk.Key():
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
......@@ -340,13 +336,13 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
}
// quickly send out cancels, reduces chances of duplicate block receives
var keys []key.Key
var keys []*cid.Cid
for _, block := range iblocks {
if _, found := bs.wm.wl.Contains(block.Key()); !found {
if _, found := bs.wm.wl.Contains(block.Cid()); !found {
log.Infof("received un-asked-for %s from %s", block, p)
continue
}
keys = append(keys, block.Key())
keys = append(keys, block.Cid())
}
bs.wm.CancelWants(keys)
......@@ -360,8 +356,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
return // ignore error, is either logged previously, or ErrAlreadyHaveBlock
}
k := b.Key()
log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
k := b.Cid()
log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
log.Debugf("got block %s from %s", b, p)
if err := bs.HasBlock(b); err != nil {
......@@ -378,7 +374,7 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
bs.counterLk.Lock()
defer bs.counterLk.Unlock()
bs.blocksRecvd++
has, err := bs.blockstore.Has(b.Key())
has, err := bs.blockstore.Has(b.Cid())
if err != nil {
log.Infof("blockstore.Has error: %s", err)
return err
......@@ -415,10 +411,10 @@ func (bs *Bitswap) Close() error {
return bs.process.Close()
}
func (bs *Bitswap) GetWantlist() []key.Key {
var out []key.Key
func (bs *Bitswap) GetWantlist() []*cid.Cid {
var out []*cid.Cid
for _, e := range bs.wm.wl.Entries() {
out = append(out, e.Key)
out = append(out, e.Cid)
}
return out
}
......
......@@ -2,21 +2,21 @@ package bitswap
import (
"bytes"
"context"
"sync"
"testing"
"time"
context "context"
detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
travis "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis"
blocks "github.com/ipfs/go-ipfs/blocks"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
delay "github.com/ipfs/go-ipfs/thirdparty/delay"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
travis "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis"
detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
p2ptestutil "gx/ipfs/QmcRa2qn6iCmap9bjp8jAwkvYAq13AUfxdY3rrYiaJbLum/go-libp2p/p2p/test/util"
)
......@@ -38,7 +38,7 @@ func TestClose(t *testing.T) {
bitswap := sesgen.Next()
bitswap.Exchange.Close()
bitswap.Exchange.GetBlock(context.Background(), block.Key())
bitswap.Exchange.GetBlock(context.Background(), block.Cid())
}
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
......@@ -57,7 +57,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
defer cancel()
_, err := solo.Exchange.GetBlock(ctx, block.Key())
_, err := solo.Exchange.GetBlock(ctx, block.Cid())
if err != context.DeadlineExceeded {
t.Fatal("Expected DeadlineExceeded error")
......@@ -84,7 +84,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
received, err := wantsBlock.Exchange.GetBlock(ctx, block.Cid())
if err != nil {
t.Log(err)
t.Fatal("Expected to succeed")
......@@ -176,10 +176,10 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
}
}
var blkeys []key.Key
var blkeys []*cid.Cid
first := instances[0]
for _, b := range blocks {
blkeys = append(blkeys, b.Key())
blkeys = append(blkeys, b.Cid())
first.Exchange.HasBlock(b)
}
......@@ -216,7 +216,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
for _, inst := range instances {
for _, b := range blocks {
if _, err := inst.Blockstore().Get(b.Key()); err != nil {
if _, err := inst.Blockstore().Get(b.Cid()); err != nil {
t.Fatal(err)
}
}
......@@ -224,8 +224,8 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
}
func getOrFail(bitswap Instance, b blocks.Block, t *testing.T, wg *sync.WaitGroup) {
if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
if _, err := bitswap.Blockstore().Get(b.Cid()); err != nil {
_, err := bitswap.Exchange.GetBlock(context.Background(), b.Cid())
if err != nil {
t.Fatal(err)
}
......@@ -260,7 +260,7 @@ func TestSendToWantingPeer(t *testing.T) {
// peerA requests and waits for block alpha
ctx, cancel := context.WithTimeout(context.Background(), waitTime)
defer cancel()
alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []key.Key{alpha.Key()})
alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []*cid.Cid{alpha.Cid()})
if err != nil {
t.Fatal(err)
}
......@@ -277,7 +277,7 @@ func TestSendToWantingPeer(t *testing.T) {
t.Fatal("context timed out and broke promise channel!")
}
if blkrecvd.Key() != alpha.Key() {
if !blkrecvd.Cid().Equals(alpha.Cid()) {
t.Fatal("Wrong block!")
}
......@@ -292,7 +292,7 @@ func TestEmptyKey(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err := bs.GetBlock(ctx, key.Key(""))
_, err := bs.GetBlock(ctx, nil)
if err != blockstore.ErrNotFound {
t.Error("empty str key should return ErrNotFound")
}
......@@ -315,7 +315,7 @@ func TestBasicBitswap(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
if err != nil {
t.Fatal(err)
}
......@@ -341,7 +341,7 @@ func TestDoubleGet(t *testing.T) {
blocks := bg.Blocks(1)
ctx1, cancel1 := context.WithCancel(context.Background())
blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []key.Key{blocks[0].Key()})
blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []*cid.Cid{blocks[0].Cid()})
if err != nil {
t.Fatal(err)
}
......@@ -349,7 +349,7 @@ func TestDoubleGet(t *testing.T) {
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []key.Key{blocks[0].Key()})
blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []*cid.Cid{blocks[0].Cid()})
if err != nil {
t.Fatal(err)
}
......@@ -396,9 +396,9 @@ func TestWantlistCleanup(t *testing.T) {
bswap := instances.Exchange
blocks := bg.Blocks(20)
var keys []key.Key
var keys []*cid.Cid
for _, b := range blocks {
keys = append(keys, b.Key())
keys = append(keys, b.Cid())
}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
......
package decision
import (
"fmt"
"math"
"testing"
"github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
"github.com/ipfs/go-ipfs/thirdparty/testutil"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
"gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
......@@ -21,6 +23,8 @@ func BenchmarkTaskQueuePush(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Push(&wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
q.Push(&wantlist.Entry{Cid: c, Priority: math.MaxInt32}, peers[i%len(peers)])
}
}
......@@ -169,8 +169,9 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
// with a task in hand, we're ready to prepare the envelope...
block, err := e.bs.Get(nextTask.Entry.Key)
block, err := e.bs.Get(nextTask.Entry.Cid)
if err != nil {
log.Errorf("tried to execute a task and errored fetching block: %s", err)
// If we don't have the block, don't hold that against the peer
// make sure to update that the task has been 'completed'
nextTask.Done()
......@@ -233,13 +234,13 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debugf("%s cancel %s", p, entry.Key)
l.CancelWant(entry.Key)
e.peerRequestQueue.Remove(entry.Key, p)
log.Debugf("%s cancel %s", p, entry.Cid)
l.CancelWant(entry.Cid)
e.peerRequestQueue.Remove(entry.Cid, p)
} else {
log.Debugf("wants %s - %d", entry.Key, entry.Priority)
l.Wants(entry.Key, entry.Priority)
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
log.Debugf("wants %s - %d", entry.Cid, entry.Priority)
l.Wants(entry.Cid, entry.Priority)
if exists, err := e.bs.Has(entry.Cid); err == nil && exists {
e.peerRequestQueue.Push(entry.Entry, p)
newWorkExists = true
}
......@@ -258,7 +259,7 @@ func (e *Engine) addBlock(block blocks.Block) {
for _, l := range e.ledgerMap {
l.lk.Lock()
if entry, ok := l.WantListContains(block.Key()); ok {
if entry, ok := l.WantListContains(block.Cid()); ok {
e.peerRequestQueue.Push(entry, l.Partner)
work = true
}
......@@ -287,8 +288,8 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
l := e.findOrCreate(p)
for _, block := range m.Blocks() {
l.SentBytes(len(block.RawData()))
l.wantList.Remove(block.Key())
e.peerRequestQueue.Remove(block.Key(), p)
l.wantList.Remove(block.Cid())
e.peerRequestQueue.Remove(block.Cid(), p)
}
return nil
......
......@@ -167,7 +167,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) {
add := message.New(false)
for i, letter := range keys {
block := blocks.NewBlock([]byte(letter))
add.AddEntry(block.Key(), math.MaxInt32-i)
add.AddEntry(block.Cid(), math.MaxInt32-i)
}
e.MessageReceived(partner, add)
}
......@@ -176,7 +176,7 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) {
cancels := message.New(false)
for _, k := range keys {
block := blocks.NewBlock([]byte(k))
cancels.Cancel(block.Key())
cancels.Cancel(block.Cid())
}
e.MessageReceived(partner, cancels)
}
......@@ -187,7 +187,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
envelope := <-next
received := envelope.Block
expected := blocks.NewBlock([]byte(k))
if received.Key() != expected.Key() {
if !received.Cid().Equals(expected.Cid()) {
return errors.New(fmt.Sprintln("received", string(received.RawData()), "expected", string(expected.RawData())))
}
}
......
......@@ -5,19 +5,16 @@ import (
"time"
wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
// keySet is just a convenient alias for maps of keys, where we only care
// access/lookups.
type keySet map[key.Key]struct{}
func newLedger(p peer.ID) *ledger {
return &ledger{
wantList: wl.New(),
Partner: p,
sentToPeer: make(map[key.Key]time.Time),
sentToPeer: make(map[string]time.Time),
}
}
......@@ -44,7 +41,7 @@ type ledger struct {
// sentToPeer is a set of keys to ensure we dont send duplicate blocks
// to a given peer
sentToPeer map[key.Key]time.Time
sentToPeer map[string]time.Time
lk sync.Mutex
}
......@@ -78,16 +75,16 @@ func (l *ledger) ReceivedBytes(n int) {
l.Accounting.BytesRecv += uint64(n)
}
func (l *ledger) Wants(k key.Key, priority int) {
func (l *ledger) Wants(k *cid.Cid, priority int) {
log.Debugf("peer %s wants %s", l.Partner, k)
l.wantList.Add(k, priority)
}
func (l *ledger) CancelWant(k key.Key) {
func (l *ledger) CancelWant(k *cid.Cid) {
l.wantList.Remove(k)
}
func (l *ledger) WantListContains(k key.Key) (*wl.Entry, bool) {
func (l *ledger) WantListContains(k *cid.Cid) (*wl.Entry, bool) {
return l.wantList.Contains(k)
}
......
......@@ -6,7 +6,8 @@ import (
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
pq "github.com/ipfs/go-ipfs/thirdparty/pq"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
......@@ -14,7 +15,7 @@ type peerRequestQueue interface {
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
Pop() *peerRequestTask
Push(entry *wantlist.Entry, to peer.ID)
Remove(k key.Key, p peer.ID)
Remove(k *cid.Cid, p peer.ID)
// NB: cannot expose simply expose taskQueue.Len because trashed elements
// may exist. These trashed elements should not contribute to the count.
......@@ -57,12 +58,11 @@ func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
partner.activelk.Lock()
defer partner.activelk.Unlock()
_, ok = partner.activeBlocks[entry.Key]
if ok {
if partner.activeBlocks.Has(entry.Cid) {
return
}
if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
if task, ok := tl.taskMap[taskKey(to, entry.Cid)]; ok {
task.Entry.Priority = entry.Priority
partner.taskQueue.Update(task.index)
return
......@@ -74,7 +74,7 @@ func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
created: time.Now(),
Done: func() {
tl.lock.Lock()
partner.TaskDone(entry.Key)
partner.TaskDone(entry.Cid)
tl.pQueue.Update(partner.Index())
tl.lock.Unlock()
},
......@@ -104,7 +104,7 @@ func (tl *prq) Pop() *peerRequestTask {
continue // discarding tasks that have been removed
}
partner.StartTask(out.Entry.Key)
partner.StartTask(out.Entry.Cid)
partner.requests--
break // and return |out|
}
......@@ -114,7 +114,7 @@ func (tl *prq) Pop() *peerRequestTask {
}
// Remove removes a task from the queue
func (tl *prq) Remove(k key.Key, p peer.ID) {
func (tl *prq) Remove(k *cid.Cid, p peer.ID) {
tl.lock.Lock()
t, ok := tl.taskMap[taskKey(p, k)]
if ok {
......@@ -181,7 +181,7 @@ type peerRequestTask struct {
// Key uniquely identifies a task.
func (t *peerRequestTask) Key() string {
return taskKey(t.Target, t.Entry.Key)
return taskKey(t.Target, t.Entry.Cid)
}
// Index implements pq.Elem
......@@ -195,8 +195,8 @@ func (t *peerRequestTask) SetIndex(i int) {
}
// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.ID, k key.Key) string {
return string(p) + string(k)
func taskKey(p peer.ID, k *cid.Cid) string {
return string(p) + k.KeyString()
}
// FIFO is a basic task comparator that returns tasks in the order created.
......@@ -226,7 +226,7 @@ type activePartner struct {
activelk sync.Mutex
active int
activeBlocks map[key.Key]struct{}
activeBlocks *cid.Set
// requests is the number of blocks this peer is currently requesting
// request need not be locked around as it will only be modified under
......@@ -245,7 +245,7 @@ type activePartner struct {
func newActivePartner() *activePartner {
return &activePartner{
taskQueue: pq.New(wrapCmp(V1)),
activeBlocks: make(map[key.Key]struct{}),
activeBlocks: cid.NewSet(),
}
}
......@@ -281,17 +281,17 @@ func partnerCompare(a, b pq.Elem) bool {
}
// StartTask signals that a task was started for this partner
func (p *activePartner) StartTask(k key.Key) {
func (p *activePartner) StartTask(k *cid.Cid) {
p.activelk.Lock()
p.activeBlocks[k] = struct{}{}
p.activeBlocks.Add(k)
p.active++
p.activelk.Unlock()
}
// TaskDone signals that a task was completed for this partner
func (p *activePartner) TaskDone(k key.Key) {
func (p *activePartner) TaskDone(k *cid.Cid) {
p.activelk.Lock()
delete(p.activeBlocks, k)
p.activeBlocks.Remove(k)
p.active--
if p.active < 0 {
panic("more tasks finished than started!")
......
package decision
import (
"fmt"
"math"
"math/rand"
"sort"
......@@ -9,7 +10,8 @@ import (
"github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
"github.com/ipfs/go-ipfs/thirdparty/testutil"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
)
func TestPushPop(t *testing.T) {
......@@ -41,10 +43,13 @@ func TestPushPop(t *testing.T) {
for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
letter := alphabet[index]
t.Log(partner.String())
prq.Push(&wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner)
c := cid.NewCidV0(u.Hash([]byte(letter)))
prq.Push(&wantlist.Entry{Cid: c, Priority: math.MaxInt32 - index}, partner)
}
for _, consonant := range consonants {
prq.Remove(key.Key(consonant), partner)
c := cid.NewCidV0(u.Hash([]byte(consonant)))
prq.Remove(c, partner)
}
prq.fullThaw()
......@@ -56,12 +61,13 @@ func TestPushPop(t *testing.T) {
break
}
out = append(out, string(received.Entry.Key))
out = append(out, received.Entry.Cid.String())
}
// Entries popped should already be in correct order
for i, expected := range vowels {
if out[i] != expected {
exp := cid.NewCidV0(u.Hash([]byte(expected))).String()
if out[i] != exp {
t.Fatal("received", out[i], "expected", expected)
}
}
......@@ -78,10 +84,11 @@ func TestPeerRepeats(t *testing.T) {
// Have each push some blocks
for i := 0; i < 5; i++ {
prq.Push(&wantlist.Entry{Key: key.Key(i)}, a)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, b)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, c)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, d)
elcid := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
prq.Push(&wantlist.Entry{Cid: elcid}, a)
prq.Push(&wantlist.Entry{Cid: elcid}, b)
prq.Push(&wantlist.Entry{Cid: elcid}, c)
prq.Push(&wantlist.Entry{Cid: elcid}, d)
}
// now, pop off four entries, there should be one from each
......
package message
import (
"fmt"
"io"
blocks "github.com/ipfs/go-ipfs/blocks"
pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/pb"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
inet "gx/ipfs/QmdXimY9QHaasZmw6hWojWnCJvfgxETjZQfg9g6ZrA9wMX/go-libp2p-net"
ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
inet "gx/ipfs/QmdXimY9QHaasZmw6hWojWnCJvfgxETjZQfg9g6ZrA9wMX/go-libp2p-net"
)
// TODO move message.go into the bitswap package
......@@ -25,9 +26,9 @@ type BitSwapMessage interface {
Blocks() []blocks.Block
// AddEntry adds an entry to the Wantlist.
AddEntry(key key.Key, priority int)
AddEntry(key *cid.Cid, priority int)
Cancel(key key.Key)
Cancel(key *cid.Cid)
Empty() bool
......@@ -47,8 +48,8 @@ type Exportable interface {
type impl struct {
full bool
wantlist map[key.Key]Entry
blocks map[key.Key]blocks.Block
wantlist map[string]Entry
blocks map[string]blocks.Block
}
func New(full bool) BitSwapMessage {
......@@ -57,8 +58,8 @@ func New(full bool) BitSwapMessage {
func newMsg(full bool) *impl {
return &impl{
blocks: make(map[key.Key]blocks.Block),
wantlist: make(map[key.Key]Entry),
blocks: make(map[string]blocks.Block),
wantlist: make(map[string]Entry),
full: full,
}
}
......@@ -68,16 +69,20 @@ type Entry struct {
Cancel bool
}
func newMessageFromProto(pbm pb.Message) BitSwapMessage {
func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) {
m := newMsg(pbm.GetWantlist().GetFull())
for _, e := range pbm.GetWantlist().GetEntries() {
m.addEntry(key.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
c, err := cid.Cast([]byte(e.GetBlock()))
if err != nil {
return nil, fmt.Errorf("incorrectly formatted cid in wantlist: %s", err)
}
m.addEntry(c, int(e.GetPriority()), e.GetCancel())
}
for _, d := range pbm.GetBlocks() {
b := blocks.NewBlock(d)
m.AddBlock(b)
}
return m
return m, nil
}
func (m *impl) Full() bool {
......@@ -104,16 +109,17 @@ func (m *impl) Blocks() []blocks.Block {
return bs
}
func (m *impl) Cancel(k key.Key) {
delete(m.wantlist, k)
func (m *impl) Cancel(k *cid.Cid) {
delete(m.wantlist, k.KeyString())
m.addEntry(k, 0, true)
}
func (m *impl) AddEntry(k key.Key, priority int) {
func (m *impl) AddEntry(k *cid.Cid, priority int) {
m.addEntry(k, priority, false)
}
func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
func (m *impl) addEntry(c *cid.Cid, priority int, cancel bool) {
k := c.KeyString()
e, exists := m.wantlist[k]
if exists {
e.Priority = priority
......@@ -121,7 +127,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
} else {
m.wantlist[k] = Entry{
Entry: &wantlist.Entry{
Key: k,
Cid: c,
Priority: priority,
},
Cancel: cancel,
......@@ -130,7 +136,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
}
func (m *impl) AddBlock(b blocks.Block) {
m.blocks[b.Key()] = b
m.blocks[b.Cid().KeyString()] = b
}
func FromNet(r io.Reader) (BitSwapMessage, error) {
......@@ -144,8 +150,7 @@ func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
return nil, err
}
m := newMessageFromProto(*pb)
return m, nil
return newMessageFromProto(*pb)
}
func (m *impl) ToProto() *pb.Message {
......@@ -153,7 +158,7 @@ func (m *impl) ToProto() *pb.Message {
pbm.Wantlist = new(pb.Message_Wantlist)
for _, e := range m.wantlist {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
Block: proto.String(string(e.Key)),
Block: proto.String(e.Cid.KeyString()),
Priority: proto.Int32(int32(e.Priority)),
Cancel: proto.Bool(e.Cancel),
})
......@@ -176,7 +181,7 @@ func (m *impl) ToNet(w io.Writer) error {
func (m *impl) Loggable() map[string]interface{} {
var blocks []string
for _, v := range m.blocks {
blocks = append(blocks, v.Key().B58String())
blocks = append(blocks, v.Cid().String())
}
return map[string]interface{}{
"blocks": blocks,
......
......@@ -8,13 +8,18 @@ import (
blocks "github.com/ipfs/go-ipfs/blocks"
pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/pb"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
)
func mkFakeCid(s string) *cid.Cid {
return cid.NewCidV0(u.Hash([]byte(s)))
}
func TestAppendWanted(t *testing.T) {
const str = "foo"
str := mkFakeCid("foo")
m := New(true)
m.AddEntry(key.Key(str), 1)
m.AddEntry(str, 1)
if !wantlistContains(m.ToProto().GetWantlist(), str) {
t.Fail()
......@@ -23,16 +28,20 @@ func TestAppendWanted(t *testing.T) {
}
func TestNewMessageFromProto(t *testing.T) {
const str = "a_key"
str := mkFakeCid("a_key")
protoMessage := new(pb.Message)
protoMessage.Wantlist = new(pb.Message_Wantlist)
protoMessage.Wantlist.Entries = []*pb.Message_Wantlist_Entry{
{Block: proto.String(str)},
{Block: proto.String(str.KeyString())},
}
if !wantlistContains(protoMessage.Wantlist, str) {
t.Fail()
}
m := newMessageFromProto(*protoMessage)
m, err := newMessageFromProto(*protoMessage)
if err != nil {
t.Fatal(err)
}
if !wantlistContains(m.ToProto().GetWantlist(), str) {
t.Fail()
}
......@@ -60,10 +69,10 @@ func TestAppendBlock(t *testing.T) {
}
func TestWantlist(t *testing.T) {
keystrs := []string{"foo", "bar", "baz", "bat"}
keystrs := []*cid.Cid{mkFakeCid("foo"), mkFakeCid("bar"), mkFakeCid("baz"), mkFakeCid("bat")}
m := New(true)
for _, s := range keystrs {
m.AddEntry(key.Key(s), 1)
m.AddEntry(s, 1)
}
exported := m.Wantlist()
......@@ -71,22 +80,22 @@ func TestWantlist(t *testing.T) {
present := false
for _, s := range keystrs {
if s == string(k.Key) {
if s.Equals(k.Cid) {
present = true
}
}
if !present {
t.Logf("%v isn't in original list", k.Key)
t.Logf("%v isn't in original list", k.Cid)
t.Fail()
}
}
}
func TestCopyProtoByValue(t *testing.T) {
const str = "foo"
str := mkFakeCid("foo")
m := New(true)
protoBeforeAppend := m.ToProto()
m.AddEntry(key.Key(str), 1)
m.AddEntry(str, 1)
if wantlistContains(protoBeforeAppend.GetWantlist(), str) {
t.Fail()
}
......@@ -94,11 +103,11 @@ func TestCopyProtoByValue(t *testing.T) {
func TestToNetFromNetPreservesWantList(t *testing.T) {
original := New(true)
original.AddEntry(key.Key("M"), 1)
original.AddEntry(key.Key("B"), 1)
original.AddEntry(key.Key("D"), 1)
original.AddEntry(key.Key("T"), 1)
original.AddEntry(key.Key("F"), 1)
original.AddEntry(mkFakeCid("M"), 1)
original.AddEntry(mkFakeCid("B"), 1)
original.AddEntry(mkFakeCid("D"), 1)
original.AddEntry(mkFakeCid("T"), 1)
original.AddEntry(mkFakeCid("F"), 1)
buf := new(bytes.Buffer)
if err := original.ToNet(buf); err != nil {
......@@ -110,13 +119,13 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
t.Fatal(err)
}
keys := make(map[key.Key]bool)
keys := make(map[string]bool)
for _, k := range copied.Wantlist() {
keys[k.Key] = true
keys[k.Cid.KeyString()] = true
}
for _, k := range original.Wantlist() {
if _, ok := keys[k.Key]; !ok {
if _, ok := keys[k.Cid.KeyString()]; !ok {
t.Fatalf("Key Missing: \"%v\"", k)
}
}
......@@ -140,21 +149,21 @@ func TestToAndFromNetMessage(t *testing.T) {
t.Fatal(err)
}
keys := make(map[key.Key]bool)
keys := make(map[string]bool)
for _, b := range m2.Blocks() {
keys[b.Key()] = true
keys[b.Cid().KeyString()] = true
}
for _, b := range original.Blocks() {
if _, ok := keys[b.Key()]; !ok {
if _, ok := keys[b.Cid().KeyString()]; !ok {
t.Fail()
}
}
}
func wantlistContains(wantlist *pb.Message_Wantlist, x string) bool {
func wantlistContains(wantlist *pb.Message_Wantlist, c *cid.Cid) bool {
for _, e := range wantlist.GetEntries() {
if e.GetBlock() == x {
if e.GetBlock() == c.KeyString() {
return true
}
}
......@@ -174,8 +183,8 @@ func TestDuplicates(t *testing.T) {
b := blocks.NewBlock([]byte("foo"))
msg := New(true)
msg.AddEntry(b.Key(), 1)
msg.AddEntry(b.Key(), 1)
msg.AddEntry(b.Cid(), 1)
msg.AddEntry(b.Cid(), 1)
if len(msg.Wantlist()) != 1 {
t.Fatal("Duplicate in BitSwapMessage")
}
......
package network
import (
context "context"
"context"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
......@@ -52,8 +53,8 @@ type Receiver interface {
type Routing interface {
// FindProvidersAsync returns a channel of providers for the given key
FindProvidersAsync(context.Context, key.Key, int) <-chan peer.ID
FindProvidersAsync(context.Context, *cid.Cid, int) <-chan peer.ID
// Provide provides the key to the network
Provide(context.Context, key.Key) error
Provide(context.Context, *cid.Cid) error
}
......@@ -10,7 +10,6 @@ import (
ma "gx/ipfs/QmUAQaWbKxGCUTuoQVvvicbQNZ9APF5pDGWyAZSe93AtKH/go-multiaddr"
routing "gx/ipfs/QmXKuGUzLcgoQvp8M6ZEJzupWUNmx8NoqXEbYLMDjL4rjj/go-libp2p-routing"
pstore "gx/ipfs/QmXXCcQ7CLg5a81Ui9TTR35QcR4y7ZyihxwfjqaHfUVcVo/go-libp2p-peerstore"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
host "gx/ipfs/QmdML3R42PRSwnt46jSuEts9bHSqLctVYEjJqMR3UYV8ki/go-libp2p-host"
......@@ -130,7 +129,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
}
// FindProvidersAsync returns a channel of providers for the given key
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k key.Key, max int) <-chan peer.ID {
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID {
// Since routing queries are expensive, give bitswap the peers to which we
// have open connections. Note that this may cause issues if bitswap starts
......@@ -147,12 +146,9 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k key.Key, max int) <
out <- id
}
// TEMPORARY SHIM UNTIL CID GETS PROPAGATED
c := cid.NewCidV0(k.ToMultihash())
go func() {
defer close(out)
providers := bsnet.routing.FindProvidersAsync(ctx, c, max)
providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
for info := range providers {
if info.ID == bsnet.host.ID() {
continue // ignore self as provider
......@@ -169,9 +165,8 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k key.Key, max int) <
}
// Provide provides the key to the network
func (bsnet *impl) Provide(ctx context.Context, k key.Key) error {
c := cid.NewCidV0(k.ToMultihash())
return bsnet.routing.Provide(ctx, c)
func (bsnet *impl) Provide(ctx context.Context, k *cid.Cid) error {
return bsnet.routing.Provide(ctx, k)
}
// handleNewStream receives a new stream from the network.
......
package notifications
import (
context "context"
pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
"context"
blocks "github.com/ipfs/go-ipfs/blocks"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
const bufferSize = 16
type PubSub interface {
Publish(block blocks.Block)
Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block
Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block
Shutdown()
}
......@@ -24,8 +26,7 @@ type impl struct {
}
func (ps *impl) Publish(block blocks.Block) {
topic := string(block.Key())
ps.wrapped.Pub(block, topic)
ps.wrapped.Pub(block, block.Cid().KeyString())
}
func (ps *impl) Shutdown() {
......@@ -35,7 +36,7 @@ func (ps *impl) Shutdown() {
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block {
func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block {
blocksCh := make(chan blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
......@@ -71,10 +72,10 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Bl
return blocksCh
}
func toStrings(keys []key.Key) []string {
func toStrings(keys []*cid.Cid) []string {
strs := make([]string, 0)
for _, key := range keys {
strs = append(strs, string(key))
strs = append(strs, key.KeyString())
}
return strs
}
......@@ -2,13 +2,13 @@ package notifications
import (
"bytes"
"context"
"testing"
"time"
context "context"
blocks "github.com/ipfs/go-ipfs/blocks"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
func TestDuplicates(t *testing.T) {
......@@ -17,7 +17,7 @@ func TestDuplicates(t *testing.T) {
n := New()
defer n.Shutdown()
ch := n.Subscribe(context.Background(), b1.Key(), b2.Key())
ch := n.Subscribe(context.Background(), b1.Cid(), b2.Cid())
n.Publish(b1)
blockRecvd, ok := <-ch
......@@ -41,7 +41,7 @@ func TestPublishSubscribe(t *testing.T) {
n := New()
defer n.Shutdown()
ch := n.Subscribe(context.Background(), blockSent.Key())
ch := n.Subscribe(context.Background(), blockSent.Cid())
n.Publish(blockSent)
blockRecvd, ok := <-ch
......@@ -59,7 +59,7 @@ func TestSubscribeMany(t *testing.T) {
n := New()
defer n.Shutdown()
ch := n.Subscribe(context.Background(), e1.Key(), e2.Key())
ch := n.Subscribe(context.Background(), e1.Cid(), e2.Cid())
n.Publish(e1)
r1, ok := <-ch
......@@ -83,8 +83,8 @@ func TestDuplicateSubscribe(t *testing.T) {
n := New()
defer n.Shutdown()
ch1 := n.Subscribe(context.Background(), e1.Key())
ch2 := n.Subscribe(context.Background(), e1.Key())
ch1 := n.Subscribe(context.Background(), e1.Cid())
ch2 := n.Subscribe(context.Background(), e1.Cid())
n.Publish(e1)
r1, ok := <-ch1
......@@ -118,7 +118,7 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {
n := New()
defer n.Shutdown()
block := blocks.NewBlock([]byte("A Missed Connection"))
blockChannel := n.Subscribe(fastExpiringCtx, block.Key())
blockChannel := n.Subscribe(fastExpiringCtx, block.Cid())
assertBlockChannelNil(t, blockChannel)
}
......@@ -132,10 +132,10 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
t.Log("generate a large number of blocks. exceed default buffer")
bs := g.Blocks(1000)
ks := func() []key.Key {
var keys []key.Key
ks := func() []*cid.Cid {
var keys []*cid.Cid
for _, b := range bs {
keys = append(keys, b.Key())
keys = append(keys, b.Cid())
}
return keys
}()
......@@ -162,7 +162,7 @@ func assertBlocksEqual(t *testing.T, a, b blocks.Block) {
if !bytes.Equal(a.RawData(), b.RawData()) {
t.Fatal("blocks aren't equal")
}
if a.Key() != b.Key() {
if a.Cid() != b.Cid() {
t.Fatal("block keys aren't equal")
}
}
package bitswap
import (
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
"sort"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
type Stat struct {
ProvideBufLen int
Wantlist []key.Key
Wantlist []*cid.Cid
Peers []string
BlocksReceived int
DupBlksReceived int
......
......@@ -10,7 +10,6 @@ import (
delay "github.com/ipfs/go-ipfs/thirdparty/delay"
testutil "github.com/ipfs/go-ipfs/thirdparty/testutil"
routing "gx/ipfs/QmXKuGUzLcgoQvp8M6ZEJzupWUNmx8NoqXEbYLMDjL4rjj/go-libp2p-routing"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
......@@ -92,18 +91,17 @@ func (nc *networkClient) SendMessage(
}
// FindProvidersAsync returns a channel of providers for the given key
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k key.Key, max int) <-chan peer.ID {
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID {
// NB: this function duplicates the PeerInfo -> ID transformation in the
// bitswap network adapter. Not to worry. This network client will be
// deprecated once the ipfsnet.Mock is added. The code below is only
// temporary.
c := cid.NewCidV0(k.ToMultihash())
out := make(chan peer.ID)
go func() {
defer close(out)
providers := nc.routing.FindProvidersAsync(ctx, c, max)
providers := nc.routing.FindProvidersAsync(ctx, k, max)
for info := range providers {
select {
case <-ctx.Done():
......@@ -139,9 +137,8 @@ func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.
}
// Provide provides the key to the network
func (nc *networkClient) Provide(ctx context.Context, k key.Key) error {
c := cid.NewCidV0(k.ToMultihash())
return nc.routing.Provide(ctx, c)
func (nc *networkClient) Provide(ctx context.Context, k *cid.Cid) error {
return nc.routing.Provide(ctx, k)
}
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
......
......@@ -6,7 +6,7 @@ import (
"sort"
"sync"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
type ThreadSafe struct {
......@@ -16,11 +16,11 @@ type ThreadSafe struct {
// not threadsafe
type Wantlist struct {
set map[key.Key]*Entry
set map[string]*Entry
}
type Entry struct {
Key key.Key
Cid *cid.Cid
Priority int
RefCnt int
......@@ -40,11 +40,11 @@ func NewThreadSafe() *ThreadSafe {
func New() *Wantlist {
return &Wantlist{
set: make(map[key.Key]*Entry),
set: make(map[string]*Entry),
}
}
func (w *ThreadSafe) Add(k key.Key, priority int) bool {
func (w *ThreadSafe) Add(k *cid.Cid, priority int) bool {
w.lk.Lock()
defer w.lk.Unlock()
return w.Wantlist.Add(k, priority)
......@@ -56,13 +56,13 @@ func (w *ThreadSafe) AddEntry(e *Entry) bool {
return w.Wantlist.AddEntry(e)
}
func (w *ThreadSafe) Remove(k key.Key) bool {
func (w *ThreadSafe) Remove(k *cid.Cid) bool {
w.lk.Lock()
defer w.lk.Unlock()
return w.Wantlist.Remove(k)
}
func (w *ThreadSafe) Contains(k key.Key) (*Entry, bool) {
func (w *ThreadSafe) Contains(k *cid.Cid) (*Entry, bool) {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.Contains(k)
......@@ -90,14 +90,15 @@ func (w *Wantlist) Len() int {
return len(w.set)
}
func (w *Wantlist) Add(k key.Key, priority int) bool {
func (w *Wantlist) Add(c *cid.Cid, priority int) bool {
k := c.KeyString()
if e, ok := w.set[k]; ok {
e.RefCnt++
return false
}
w.set[k] = &Entry{
Key: k,
Cid: c,
Priority: priority,
RefCnt: 1,
}
......@@ -106,15 +107,17 @@ func (w *Wantlist) Add(k key.Key, priority int) bool {
}
func (w *Wantlist) AddEntry(e *Entry) bool {
if ex, ok := w.set[e.Key]; ok {
k := e.Cid.KeyString()
if ex, ok := w.set[k]; ok {
ex.RefCnt++
return false
}
w.set[e.Key] = e
w.set[k] = e
return true
}
func (w *Wantlist) Remove(k key.Key) bool {
func (w *Wantlist) Remove(c *cid.Cid) bool {
k := c.KeyString()
e, ok := w.set[k]
if !ok {
return false
......@@ -128,8 +131,8 @@ func (w *Wantlist) Remove(k key.Key) bool {
return false
}
func (w *Wantlist) Contains(k key.Key) (*Entry, bool) {
e, ok := w.set[k]
func (w *Wantlist) Contains(k *cid.Cid) (*Entry, bool) {
e, ok := w.set[k.KeyString()]
return e, ok
}
......
package bitswap
import (
"context"
"sync"
"time"
context "context"
engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
......@@ -51,7 +51,7 @@ type msgPair struct {
type cancellation struct {
who peer.ID
blk key.Key
blk *cid.Cid
}
type msgQueue struct {
......@@ -69,23 +69,23 @@ type msgQueue struct {
done chan struct{}
}
func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) {
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) {
log.Infof("want blocks: %s", ks)
pm.addEntries(ctx, ks, false)
}
func (pm *WantManager) CancelWants(ks []key.Key) {
func (pm *WantManager) CancelWants(ks []*cid.Cid) {
log.Infof("cancel wants: %s", ks)
pm.addEntries(context.TODO(), ks, true)
}
func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool) {
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) {
var entries []*bsmsg.Entry
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
Cancel: cancel,
Entry: &wantlist.Entry{
Key: k,
Cid: k,
Priority: kMaxPriority - i,
RefCnt: 1,
},
......@@ -130,7 +130,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
// new peer, we will want to give them our full wantlist
fullwantlist := bsmsg.New(true)
for _, e := range pm.wl.Entries() {
fullwantlist.AddEntry(e.Key, e.Priority)
fullwantlist.AddEntry(e.Cid, e.Priority)
}
mq.out = fullwantlist
mq.work <- struct{}{}
......@@ -246,7 +246,7 @@ func (pm *WantManager) Run() {
var filtered []*bsmsg.Entry
for _, e := range entries {
if e.Cancel {
if pm.wl.Remove(e.Key) {
if pm.wl.Remove(e.Cid) {
filtered = append(filtered, e)
}
} else {
......@@ -323,9 +323,9 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
// one passed in
for _, e := range entries {
if e.Cancel {
mq.out.Cancel(e.Key)
mq.out.Cancel(e.Cid)
} else {
mq.out.AddEntry(e.Key, e.Priority)
mq.out.AddEntry(e.Cid, e.Priority)
}
}
}
package bitswap
import (
"context"
"math/rand"
"sync"
"time"
context "context"
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
......@@ -77,7 +77,7 @@ func (bs *Bitswap) provideWorker(px process.Process) {
limit := make(chan struct{}, provideWorkerMax)
limitedGoProvide := func(k key.Key, wid int) {
limitedGoProvide := func(k *cid.Cid, wid int) {
defer func() {
// replace token when done
<-limit
......@@ -85,7 +85,7 @@ func (bs *Bitswap) provideWorker(px process.Process) {
ev := logging.LoggableMap{"ID": wid}
ctx := procctx.OnClosingContext(px) // derive ctx from px
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done()
defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done()
ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx
defer cancel()
......@@ -121,9 +121,9 @@ func (bs *Bitswap) provideWorker(px process.Process) {
func (bs *Bitswap) provideCollector(ctx context.Context) {
defer close(bs.provideKeys)
var toProvide []key.Key
var nextKey key.Key
var keysOut chan key.Key
var toProvide []*cid.Cid
var nextKey *cid.Cid
var keysOut chan *cid.Cid
for {
select {
......@@ -181,7 +181,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
// for new providers for blocks.
i := rand.Intn(len(entries))
bs.findKeys <- &blockRequest{
Key: entries[i].Key,
Cid: entries[i].Cid,
Ctx: ctx,
}
case <-parent.Done():
......@@ -192,23 +192,23 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
func (bs *Bitswap) providerQueryManager(ctx context.Context) {
var activeLk sync.Mutex
kset := key.NewKeySet()
kset := cid.NewSet()
for {
select {
case e := <-bs.findKeys:
activeLk.Lock()
if kset.Has(e.Key) {
if kset.Has(e.Cid) {
activeLk.Unlock()
continue
}
kset.Add(e.Key)
kset.Add(e.Cid)
activeLk.Unlock()
go func(e *blockRequest) {
child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
defer cancel()
providers := bs.network.FindProvidersAsync(child, e.Key, maxProvidersPerRequest)
providers := bs.network.FindProvidersAsync(child, e.Cid, maxProvidersPerRequest)
wg := &sync.WaitGroup{}
for p := range providers {
wg.Add(1)
......@@ -222,7 +222,7 @@ func (bs *Bitswap) providerQueryManager(ctx context.Context) {
}
wg.Wait()
activeLk.Lock()
kset.Remove(e.Key)
kset.Remove(e.Cid)
activeLk.Unlock()
}(e)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment