Commit 78f0f5b0 authored by Brian Tiger Chow's avatar Brian Tiger Chow

refac(bs:msg) let msg.Blocks() return []blocks

discard erroneous values

wherever blocks cannot be nil, use value rather than pointer. only use
pointers when absolutely necessary.
parent a3487eb4
......@@ -93,7 +93,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
tleft := timeout - time.Now().Sub(begin)
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
blockChannel := make(chan *blocks.Block)
blockChannel := make(chan blocks.Block)
after := time.After(tleft)
// TODO: when the data is received, shut down this for loop ASAP
......@@ -106,7 +106,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
return
}
select {
case blockChannel <- blk:
case blockChannel <- *blk:
default:
}
}(p)
......@@ -116,7 +116,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
select {
case block := <-blockChannel:
close(blockChannel)
return block, nil
return &block, nil
case <-after:
return nil, u.ErrTimeout
}
......@@ -137,7 +137,7 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*bloc
u.PErr("getBlock for '%s' timed out.\n", k.Pretty())
return nil, u.ErrTimeout
}
return block, nil
return &block, nil
}
// HaveBlock announces the existance of a block to BitSwap, potentially sending
......@@ -173,12 +173,7 @@ func (bs *BitSwap) handleMessages() {
}
if bsmsg.Blocks() != nil {
for _, blkData := range bsmsg.Blocks() {
blk, err := blocks.NewBlock(blkData)
if err != nil {
u.PErr("%v\n", err)
continue
}
for _, blk := range bsmsg.Blocks() {
go bs.blockReceive(mes.Peer, blk)
}
}
......@@ -231,7 +226,7 @@ func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) {
}
}
func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) {
func (bs *BitSwap) blockReceive(p *peer.Peer, blk blocks.Block) {
u.DOut("blockReceive: %s\n", blk.Key().Pretty())
err := bs.datastore.Put(ds.NewKey(string(blk.Key())), blk.Data)
if err != nil {
......@@ -286,5 +281,16 @@ func (bs *BitSwap) SetStrategy(sf StrategyFunc) {
func (bs *BitSwap) ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
bsmsg.BitSwapMessage, *peer.Peer, error) {
if incoming.Blocks() != nil {
for _, block := range incoming.Blocks() {
go bs.blockReceive(sender, block)
}
}
if incoming.Wantlist() != nil {
for _, want := range incoming.Wantlist() {
go bs.peerWantsBlock(sender, want)
}
}
return nil, nil, errors.New("TODO implement")
}
......@@ -15,7 +15,7 @@ import (
type BitSwapMessage interface {
Wantlist() []string
Blocks() [][]byte
Blocks() []blocks.Block
AppendWanted(k u.Key)
AppendBlock(b *blocks.Block)
Exportable
......@@ -46,8 +46,16 @@ func (m *message) Wantlist() []string {
}
// TODO(brian): convert these into blocks
func (m *message) Blocks() [][]byte {
return m.pb.Blocks
func (m *message) Blocks() []blocks.Block {
bs := make([]blocks.Block, len(m.pb.Blocks))
for _, data := range m.pb.Blocks {
b, err := blocks.NewBlock(data)
if err != nil {
continue
}
bs = append(bs, *b)
}
return bs
}
func (m *message) AppendWanted(k u.Key) {
......
......@@ -9,8 +9,8 @@ import (
)
type PubSub interface {
Publish(block *blocks.Block)
Subscribe(ctx context.Context, k u.Key) <-chan *blocks.Block
Publish(block blocks.Block)
Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block
Shutdown()
}
......@@ -23,7 +23,7 @@ type impl struct {
wrapped pubsub.PubSub
}
func (ps *impl) Publish(block *blocks.Block) {
func (ps *impl) Publish(block blocks.Block) {
topic := string(block.Key())
ps.wrapped.Pub(block, topic)
}
......@@ -31,15 +31,15 @@ func (ps *impl) Publish(block *blocks.Block) {
// Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil
// if the |ctx| times out or is cancelled. Then channel is closed after the
// block given by |k| is sent.
func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan *blocks.Block {
func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block {
topic := string(k)
subChan := ps.wrapped.SubOnce(topic)
blockChannel := make(chan *blocks.Block)
blockChannel := make(chan blocks.Block)
go func() {
defer close(blockChannel)
select {
case val := <-subChan:
block, ok := val.(*blocks.Block)
block, ok := val.(blocks.Block)
if ok {
blockChannel <- block
}
......
......@@ -34,19 +34,20 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {
n := New()
defer n.Shutdown()
blockChannel := n.Subscribe(fastExpiringCtx, getBlockOrFail(t, "A Missed Connection").Key())
block := getBlockOrFail(t, "A Missed Connection")
blockChannel := n.Subscribe(fastExpiringCtx, block.Key())
assertBlockChannelNil(t, blockChannel)
}
func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) {
blockReceived := <-blockChannel
if blockReceived != nil {
func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) {
_, ok := <-blockChannel
if ok {
t.Fail()
}
}
func assertBlocksEqual(t *testing.T, a, b *blocks.Block) {
func assertBlocksEqual(t *testing.T, a, b blocks.Block) {
if !bytes.Equal(a.Data, b.Data) {
t.Fail()
}
......@@ -55,10 +56,10 @@ func assertBlocksEqual(t *testing.T, a, b *blocks.Block) {
}
}
func getBlockOrFail(t *testing.T, msg string) *blocks.Block {
func getBlockOrFail(t *testing.T, msg string) blocks.Block {
block, blockCreationErr := blocks.NewBlock([]byte(msg))
if blockCreationErr != nil {
t.Fail()
}
return block
return *block
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment