Commit b5121d63 authored by Jeromy's avatar Jeromy

wire GetBlocks into blockservice

parent b90e5fb7
...@@ -108,7 +108,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ...@@ -108,7 +108,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
select { select {
case block := <-promise: case block := <-promise:
return &block, nil return block, nil
case <-parent.Done(): case <-parent.Done():
return nil, parent.Err() return nil, parent.Err()
} }
...@@ -122,7 +122,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ...@@ -122,7 +122,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
// NB: Your request remains open until the context expires. To conserve // NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one // resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server) // that lasts throughout the lifetime of the server)
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan blocks.Block, error) { func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
// TODO log the request // TODO log the request
promise := bs.notifications.Subscribe(ctx, keys...) promise := bs.notifications.Subscribe(ctx, keys...)
...@@ -213,7 +213,7 @@ func (bs *bitswap) loop(parent context.Context) { ...@@ -213,7 +213,7 @@ func (bs *bitswap) loop(parent context.Context) {
// HasBlock announces the existance of a block to this bitswap service. The // HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers. // service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
log.Debugf("Has Block %v", blk.Key()) log.Debugf("Has Block %v", blk.Key())
bs.wantlist.Remove(blk.Key()) bs.wantlist.Remove(blk.Key())
bs.sendToPeersThatWant(ctx, blk) bs.sendToPeersThatWant(ctx, blk)
...@@ -244,7 +244,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm ...@@ -244,7 +244,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
for _, block := range incoming.Blocks() { for _, block := range incoming.Blocks() {
// TODO verify blocks? // TODO verify blocks?
if err := bs.blockstore.Put(&block); err != nil { if err := bs.blockstore.Put(block); err != nil {
log.Criticalf("error putting block: %s", err) log.Criticalf("error putting block: %s", err)
continue // FIXME(brian): err ignored continue // FIXME(brian): err ignored
} }
...@@ -267,7 +267,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm ...@@ -267,7 +267,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil { if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
continue continue
} else { } else {
message.AddBlock(*block) message.AddBlock(block)
} }
} }
} }
...@@ -290,7 +290,7 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage ...@@ -290,7 +290,7 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage
bs.strategy.MessageSent(p, m) bs.strategy.MessageSent(p, m)
} }
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) {
log.Debugf("Sending %v to peers that want it", block.Key()) log.Debugf("Sending %v to peers that want it", block.Key())
for _, p := range bs.strategy.Peers() { for _, p := range bs.strategy.Peers() {
......
...@@ -83,7 +83,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { ...@@ -83,7 +83,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
if err := hasBlock.blockstore.Put(block); err != nil { if err := hasBlock.blockstore.Put(block); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := hasBlock.exchange.HasBlock(context.Background(), *block); err != nil { if err := hasBlock.exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -140,7 +140,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { ...@@ -140,7 +140,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
first := instances[0] first := instances[0]
for _, b := range blocks { for _, b := range blocks {
first.blockstore.Put(b) first.blockstore.Put(b)
first.exchange.HasBlock(context.Background(), *b) first.exchange.HasBlock(context.Background(), b)
rs.Announce(first.peer, b.Key()) rs.Announce(first.peer, b.Key())
} }
...@@ -212,7 +212,7 @@ func TestSendToWantingPeer(t *testing.T) { ...@@ -212,7 +212,7 @@ func TestSendToWantingPeer(t *testing.T) {
beta := bg.Next() beta := bg.Next()
t.Logf("Peer %v announes availability of %v\n", w.peer, beta.Key()) t.Logf("Peer %v announes availability of %v\n", w.peer, beta.Key())
ctx, _ = context.WithTimeout(context.Background(), timeout) ctx, _ = context.WithTimeout(context.Background(), timeout)
if err := w.blockstore.Put(&beta); err != nil { if err := w.blockstore.Put(beta); err != nil {
t.Fatal(err) t.Fatal(err)
} }
w.exchange.HasBlock(ctx, beta) w.exchange.HasBlock(ctx, beta)
...@@ -225,7 +225,7 @@ func TestSendToWantingPeer(t *testing.T) { ...@@ -225,7 +225,7 @@ func TestSendToWantingPeer(t *testing.T) {
t.Logf("%v announces availability of %v\n", o.peer, alpha.Key()) t.Logf("%v announces availability of %v\n", o.peer, alpha.Key())
ctx, _ = context.WithTimeout(context.Background(), timeout) ctx, _ = context.WithTimeout(context.Background(), timeout)
if err := o.blockstore.Put(&alpha); err != nil { if err := o.blockstore.Put(alpha); err != nil {
t.Fatal(err) t.Fatal(err)
} }
o.exchange.HasBlock(ctx, alpha) o.exchange.HasBlock(ctx, alpha)
...@@ -254,16 +254,16 @@ type BlockGenerator struct { ...@@ -254,16 +254,16 @@ type BlockGenerator struct {
seq int seq int
} }
func (bg *BlockGenerator) Next() blocks.Block { func (bg *BlockGenerator) Next() *blocks.Block {
bg.seq++ bg.seq++
return *blocks.NewBlock([]byte(string(bg.seq))) return blocks.NewBlock([]byte(string(bg.seq)))
} }
func (bg *BlockGenerator) Blocks(n int) []*blocks.Block { func (bg *BlockGenerator) Blocks(n int) []*blocks.Block {
blocks := make([]*blocks.Block, 0) blocks := make([]*blocks.Block, 0)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
b := bg.Next() b := bg.Next()
blocks = append(blocks, &b) blocks = append(blocks, b)
} }
return blocks return blocks
} }
......
...@@ -19,7 +19,7 @@ type BitSwapMessage interface { ...@@ -19,7 +19,7 @@ type BitSwapMessage interface {
Wantlist() []u.Key Wantlist() []u.Key
// Blocks returns a slice of unique blocks // Blocks returns a slice of unique blocks
Blocks() []blocks.Block Blocks() []*blocks.Block
// AddWanted adds the key to the Wantlist. // AddWanted adds the key to the Wantlist.
// //
...@@ -32,7 +32,7 @@ type BitSwapMessage interface { ...@@ -32,7 +32,7 @@ type BitSwapMessage interface {
// implies Priority(A) > Priority(B) // implies Priority(A) > Priority(B)
AddWanted(u.Key) AddWanted(u.Key)
AddBlock(blocks.Block) AddBlock(*blocks.Block)
Exportable Exportable
} }
...@@ -42,14 +42,14 @@ type Exportable interface { ...@@ -42,14 +42,14 @@ type Exportable interface {
} }
type impl struct { type impl struct {
existsInWantlist map[u.Key]struct{} // map to detect duplicates existsInWantlist map[u.Key]struct{} // map to detect duplicates
wantlist []u.Key // slice to preserve ordering wantlist []u.Key // slice to preserve ordering
blocks map[u.Key]blocks.Block // map to detect duplicates blocks map[u.Key]*blocks.Block // map to detect duplicates
} }
func New() BitSwapMessage { func New() BitSwapMessage {
return &impl{ return &impl{
blocks: make(map[u.Key]blocks.Block), blocks: make(map[u.Key]*blocks.Block),
existsInWantlist: make(map[u.Key]struct{}), existsInWantlist: make(map[u.Key]struct{}),
wantlist: make([]u.Key, 0), wantlist: make([]u.Key, 0),
} }
...@@ -62,7 +62,7 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage { ...@@ -62,7 +62,7 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage {
} }
for _, d := range pbm.GetBlocks() { for _, d := range pbm.GetBlocks() {
b := blocks.NewBlock(d) b := blocks.NewBlock(d)
m.AddBlock(*b) m.AddBlock(b)
} }
return m return m
} }
...@@ -71,8 +71,8 @@ func (m *impl) Wantlist() []u.Key { ...@@ -71,8 +71,8 @@ func (m *impl) Wantlist() []u.Key {
return m.wantlist return m.wantlist
} }
func (m *impl) Blocks() []blocks.Block { func (m *impl) Blocks() []*blocks.Block {
bs := make([]blocks.Block, 0) bs := make([]*blocks.Block, 0)
for _, block := range m.blocks { for _, block := range m.blocks {
bs = append(bs, block) bs = append(bs, block)
} }
...@@ -88,7 +88,7 @@ func (m *impl) AddWanted(k u.Key) { ...@@ -88,7 +88,7 @@ func (m *impl) AddWanted(k u.Key) {
m.wantlist = append(m.wantlist, k) m.wantlist = append(m.wantlist, k)
} }
func (m *impl) AddBlock(b blocks.Block) { func (m *impl) AddBlock(b *blocks.Block) {
m.blocks[b.Key()] = b m.blocks[b.Key()] = b
} }
......
...@@ -42,7 +42,7 @@ func TestAppendBlock(t *testing.T) { ...@@ -42,7 +42,7 @@ func TestAppendBlock(t *testing.T) {
m := New() m := New()
for _, str := range strs { for _, str := range strs {
block := blocks.NewBlock([]byte(str)) block := blocks.NewBlock([]byte(str))
m.AddBlock(*block) m.AddBlock(block)
} }
// assert strings are in proto message // assert strings are in proto message
...@@ -133,10 +133,10 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { ...@@ -133,10 +133,10 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
func TestToAndFromNetMessage(t *testing.T) { func TestToAndFromNetMessage(t *testing.T) {
original := New() original := New()
original.AddBlock(*blocks.NewBlock([]byte("W"))) original.AddBlock(blocks.NewBlock([]byte("W")))
original.AddBlock(*blocks.NewBlock([]byte("E"))) original.AddBlock(blocks.NewBlock([]byte("E")))
original.AddBlock(*blocks.NewBlock([]byte("F"))) original.AddBlock(blocks.NewBlock([]byte("F")))
original.AddBlock(*blocks.NewBlock([]byte("M"))) original.AddBlock(blocks.NewBlock([]byte("M")))
p := peer.WithIDString("X") p := peer.WithIDString("X")
netmsg, err := original.ToNet(p) netmsg, err := original.ToNet(p)
...@@ -180,8 +180,8 @@ func TestDuplicates(t *testing.T) { ...@@ -180,8 +180,8 @@ func TestDuplicates(t *testing.T) {
t.Fatal("Duplicate in BitSwapMessage") t.Fatal("Duplicate in BitSwapMessage")
} }
msg.AddBlock(*b) msg.AddBlock(b)
msg.AddBlock(*b) msg.AddBlock(b)
if len(msg.Blocks()) != 1 { if len(msg.Blocks()) != 1 {
t.Fatal("Duplicate in BitSwapMessage") t.Fatal("Duplicate in BitSwapMessage")
} }
......
...@@ -11,8 +11,8 @@ import ( ...@@ -11,8 +11,8 @@ import (
const bufferSize = 16 const bufferSize = 16
type PubSub interface { type PubSub interface {
Publish(block blocks.Block) Publish(block *blocks.Block)
Subscribe(ctx context.Context, keys ...u.Key) <-chan blocks.Block Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block
Shutdown() Shutdown()
} }
...@@ -24,7 +24,7 @@ type impl struct { ...@@ -24,7 +24,7 @@ type impl struct {
wrapped pubsub.PubSub wrapped pubsub.PubSub
} }
func (ps *impl) Publish(block blocks.Block) { func (ps *impl) Publish(block *blocks.Block) {
topic := string(block.Key()) topic := string(block.Key())
ps.wrapped.Pub(block, topic) ps.wrapped.Pub(block, topic)
} }
...@@ -32,18 +32,18 @@ func (ps *impl) Publish(block blocks.Block) { ...@@ -32,18 +32,18 @@ func (ps *impl) Publish(block blocks.Block) {
// Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil // Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil
// if the |ctx| times out or is cancelled. Then channel is closed after the // if the |ctx| times out or is cancelled. Then channel is closed after the
// blocks given by |keys| are sent. // blocks given by |keys| are sent.
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan blocks.Block { func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
topics := make([]string, 0) topics := make([]string, 0)
for _, key := range keys { for _, key := range keys {
topics = append(topics, string(key)) topics = append(topics, string(key))
} }
subChan := ps.wrapped.SubOnce(topics...) subChan := ps.wrapped.SubOnce(topics...)
blockChannel := make(chan blocks.Block, 1) // buffered so the sender doesn't wait on receiver blockChannel := make(chan *blocks.Block, 1) // buffered so the sender doesn't wait on receiver
go func() { go func() {
defer close(blockChannel) defer close(blockChannel)
select { select {
case val := <-subChan: case val := <-subChan:
block, ok := val.(blocks.Block) block, ok := val.(*blocks.Block)
if ok { if ok {
blockChannel <- block blockChannel <- block
} }
......
...@@ -16,13 +16,13 @@ func TestPublishSubscribe(t *testing.T) { ...@@ -16,13 +16,13 @@ func TestPublishSubscribe(t *testing.T) {
defer n.Shutdown() defer n.Shutdown()
ch := n.Subscribe(context.Background(), blockSent.Key()) ch := n.Subscribe(context.Background(), blockSent.Key())
n.Publish(*blockSent) n.Publish(blockSent)
blockRecvd, ok := <-ch blockRecvd, ok := <-ch
if !ok { if !ok {
t.Fail() t.Fail()
} }
assertBlocksEqual(t, blockRecvd, *blockSent) assertBlocksEqual(t, blockRecvd, blockSent)
} }
...@@ -39,14 +39,14 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) { ...@@ -39,14 +39,14 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {
assertBlockChannelNil(t, blockChannel) assertBlockChannelNil(t, blockChannel)
} }
func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) { func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) {
_, ok := <-blockChannel _, ok := <-blockChannel
if ok { if ok {
t.Fail() t.Fail()
} }
} }
func assertBlocksEqual(t *testing.T, a, b blocks.Block) { func assertBlocksEqual(t *testing.T, a, b *blocks.Block) {
if !bytes.Equal(a.Data, b.Data) { if !bytes.Equal(a.Data, b.Data) {
t.Fail() t.Fail()
} }
......
...@@ -30,7 +30,7 @@ func TestConsistentAccounting(t *testing.T) { ...@@ -30,7 +30,7 @@ func TestConsistentAccounting(t *testing.T) {
m := message.New() m := message.New()
content := []string{"this", "is", "message", "i"} content := []string{"this", "is", "message", "i"}
m.AddBlock(*blocks.NewBlock([]byte(strings.Join(content, " ")))) m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
sender.MessageSent(receiver.Peer, m) sender.MessageSent(receiver.Peer, m)
receiver.MessageReceived(sender.Peer, m) receiver.MessageReceived(sender.Peer, m)
......
...@@ -33,7 +33,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { ...@@ -33,7 +33,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
// TODO test contents of incoming message // TODO test contents of incoming message
m := bsmsg.New() m := bsmsg.New()
m.AddBlock(*blocks.NewBlock([]byte(expectedStr))) m.AddBlock(blocks.NewBlock([]byte(expectedStr)))
return from, m return from, m
})) }))
...@@ -41,7 +41,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { ...@@ -41,7 +41,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
t.Log("Build a message and send a synchronous request to recipient") t.Log("Build a message and send a synchronous request to recipient")
message := bsmsg.New() message := bsmsg.New()
message.AddBlock(*blocks.NewBlock([]byte("data"))) message.AddBlock(blocks.NewBlock([]byte("data")))
response, err := initiator.SendRequest( response, err := initiator.SendRequest(
context.Background(), peer.WithID(idOfRecipient), message) context.Background(), peer.WithID(idOfRecipient), message)
if err != nil { if err != nil {
...@@ -77,7 +77,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -77,7 +77,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
peer.Peer, bsmsg.BitSwapMessage) { peer.Peer, bsmsg.BitSwapMessage) {
msgToWaiter := bsmsg.New() msgToWaiter := bsmsg.New()
msgToWaiter.AddBlock(*blocks.NewBlock([]byte(expectedStr))) msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
return fromWaiter, msgToWaiter return fromWaiter, msgToWaiter
})) }))
...@@ -105,7 +105,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -105,7 +105,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
})) }))
messageSentAsync := bsmsg.New() messageSentAsync := bsmsg.New()
messageSentAsync.AddBlock(*blocks.NewBlock([]byte("data"))) messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
errSending := waiter.SendMessage( errSending := waiter.SendMessage(
context.Background(), peer.WithID(idOfResponder), messageSentAsync) context.Background(), peer.WithID(idOfResponder), messageSentAsync)
if errSending != nil { if errSending != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment