Commit aa8e4cd7 authored by Kevin Atkinson's avatar Kevin Atkinson

Make blocks.Block an interface.

License: MIT
Signed-off-by: default avatarKevin Atkinson <k@kevina.org>
parent f4350456
...@@ -90,7 +90,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, ...@@ -90,7 +90,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
network: network, network: network,
findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan), findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan),
process: px, process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize), newBlocks: make(chan blocks.Block, HasBlockBufferSize),
provideKeys: make(chan key.Key, provideKeysBufferSize), provideKeys: make(chan key.Key, provideKeysBufferSize),
wm: NewWantManager(ctx, network), wm: NewWantManager(ctx, network),
} }
...@@ -137,7 +137,7 @@ type Bitswap struct { ...@@ -137,7 +137,7 @@ type Bitswap struct {
process process.Process process process.Process
newBlocks chan *blocks.Block newBlocks chan blocks.Block
provideKeys chan key.Key provideKeys chan key.Key
...@@ -154,7 +154,7 @@ type blockRequest struct { ...@@ -154,7 +154,7 @@ type blockRequest struct {
// GetBlock attempts to retrieve a particular block from peers within the // GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context. // deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, error) { func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) {
// Any async work initiated by this function must end when this function // Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to // returns. To ensure this, derive a new context. Note that it is okay to
...@@ -209,9 +209,9 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key { ...@@ -209,9 +209,9 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
// NB: Your request remains open until the context expires. To conserve // NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one // resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server) // that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) { func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) {
if len(keys) == 0 { if len(keys) == 0 {
out := make(chan *blocks.Block) out := make(chan blocks.Block)
close(out) close(out)
return out, nil return out, nil
} }
...@@ -251,7 +251,7 @@ func (bs *Bitswap) CancelWants(ks []key.Key) { ...@@ -251,7 +251,7 @@ func (bs *Bitswap) CancelWants(ks []key.Key) {
// HasBlock announces the existance of a block to this bitswap service. The // HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers. // service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk *blocks.Block) error { func (bs *Bitswap) HasBlock(blk blocks.Block) error {
select { select {
case <-bs.process.Closing(): case <-bs.process.Closing():
return errors.New("bitswap is closed") return errors.New("bitswap is closed")
...@@ -277,7 +277,7 @@ func (bs *Bitswap) HasBlock(blk *blocks.Block) error { ...@@ -277,7 +277,7 @@ func (bs *Bitswap) HasBlock(blk *blocks.Block) error {
return nil return nil
} }
func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error { func (bs *Bitswap) tryPutBlock(blk blocks.Block, attempts int) error {
var err error var err error
for i := 0; i < attempts; i++ { for i := 0; i < attempts; i++ {
if err = bs.blockstore.Put(blk); err == nil { if err = bs.blockstore.Put(blk); err == nil {
...@@ -316,7 +316,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -316,7 +316,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for _, block := range iblocks { for _, block := range iblocks {
wg.Add(1) wg.Add(1)
go func(b *blocks.Block) { go func(b blocks.Block) {
defer wg.Done() defer wg.Done()
if err := bs.updateReceiveCounters(b); err != nil { if err := bs.updateReceiveCounters(b); err != nil {
...@@ -337,7 +337,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -337,7 +337,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
var ErrAlreadyHaveBlock = errors.New("already have block") var ErrAlreadyHaveBlock = errors.New("already have block")
func (bs *Bitswap) updateReceiveCounters(b *blocks.Block) error { func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
bs.counterLk.Lock() bs.counterLk.Lock()
defer bs.counterLk.Unlock() defer bs.counterLk.Unlock()
bs.blocksRecvd++ bs.blocksRecvd++
...@@ -348,7 +348,7 @@ func (bs *Bitswap) updateReceiveCounters(b *blocks.Block) error { ...@@ -348,7 +348,7 @@ func (bs *Bitswap) updateReceiveCounters(b *blocks.Block) error {
} }
if err == nil && has { if err == nil && has {
bs.dupBlocksRecvd++ bs.dupBlocksRecvd++
bs.dupDataRecvd += uint64(len(b.Data)) bs.dupDataRecvd += uint64(len(b.Data()))
} }
if has { if has {
......
...@@ -85,7 +85,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { ...@@ -85,7 +85,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
t.Fatal("Expected to succeed") t.Fatal("Expected to succeed")
} }
if !bytes.Equal(block.Data, received.Data) { if !bytes.Equal(block.Data(), received.Data()) {
t.Fatal("Data doesn't match") t.Fatal("Data doesn't match")
} }
} }
...@@ -218,7 +218,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { ...@@ -218,7 +218,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
} }
} }
func getOrFail(bitswap Instance, b *blocks.Block, t *testing.T, wg *sync.WaitGroup) { func getOrFail(bitswap Instance, b blocks.Block, t *testing.T, wg *sync.WaitGroup) {
if _, err := bitswap.Blockstore().Get(b.Key()); err != nil { if _, err := bitswap.Blockstore().Get(b.Key()); err != nil {
_, err := bitswap.Exchange.GetBlock(context.Background(), b.Key()) _, err := bitswap.Exchange.GetBlock(context.Background(), b.Key())
if err != nil { if err != nil {
......
...@@ -58,7 +58,7 @@ type Envelope struct { ...@@ -58,7 +58,7 @@ type Envelope struct {
Peer peer.ID Peer peer.ID
// Block is the payload // Block is the payload
Block *blocks.Block Block blocks.Block
// A callback to notify the decision queue that the task is complete // A callback to notify the decision queue that the task is complete
Sent func() Sent func()
...@@ -226,13 +226,13 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { ...@@ -226,13 +226,13 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
} }
for _, block := range m.Blocks() { for _, block := range m.Blocks() {
log.Debugf("got block %s %d bytes", block.Key(), len(block.Data)) log.Debugf("got block %s %d bytes", block.Key(), len(block.Data()))
l.ReceivedBytes(len(block.Data)) l.ReceivedBytes(len(block.Data()))
} }
return nil return nil
} }
func (e *Engine) addBlock(block *blocks.Block) { func (e *Engine) addBlock(block blocks.Block) {
work := false work := false
for _, l := range e.ledgerMap { for _, l := range e.ledgerMap {
...@@ -247,7 +247,7 @@ func (e *Engine) addBlock(block *blocks.Block) { ...@@ -247,7 +247,7 @@ func (e *Engine) addBlock(block *blocks.Block) {
} }
} }
func (e *Engine) AddBlock(block *blocks.Block) { func (e *Engine) AddBlock(block blocks.Block) {
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
...@@ -266,7 +266,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error { ...@@ -266,7 +266,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
l := e.findOrCreate(p) l := e.findOrCreate(p)
for _, block := range m.Blocks() { for _, block := range m.Blocks() {
l.SentBytes(len(block.Data)) l.SentBytes(len(block.Data()))
l.wantList.Remove(block.Key()) l.wantList.Remove(block.Key())
e.peerRequestQueue.Remove(block.Key(), p) e.peerRequestQueue.Remove(block.Key(), p)
} }
......
...@@ -188,7 +188,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error { ...@@ -188,7 +188,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
received := envelope.Block received := envelope.Block
expected := blocks.NewBlock([]byte(k)) expected := blocks.NewBlock([]byte(k))
if received.Key() != expected.Key() { if received.Key() != expected.Key() {
return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data))) return errors.New(fmt.Sprintln("received", string(received.Data()), "expected", string(expected.Data())))
} }
} }
return nil return nil
......
...@@ -22,7 +22,7 @@ type BitSwapMessage interface { ...@@ -22,7 +22,7 @@ type BitSwapMessage interface {
Wantlist() []Entry Wantlist() []Entry
// Blocks returns a slice of unique blocks // Blocks returns a slice of unique blocks
Blocks() []*blocks.Block Blocks() []blocks.Block
// AddEntry adds an entry to the Wantlist. // AddEntry adds an entry to the Wantlist.
AddEntry(key key.Key, priority int) AddEntry(key key.Key, priority int)
...@@ -34,7 +34,7 @@ type BitSwapMessage interface { ...@@ -34,7 +34,7 @@ type BitSwapMessage interface {
// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set // A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Full() bool Full() bool
AddBlock(*blocks.Block) AddBlock(blocks.Block)
Exportable Exportable
Loggable() map[string]interface{} Loggable() map[string]interface{}
...@@ -48,7 +48,7 @@ type Exportable interface { ...@@ -48,7 +48,7 @@ type Exportable interface {
type impl struct { type impl struct {
full bool full bool
wantlist map[key.Key]Entry wantlist map[key.Key]Entry
blocks map[key.Key]*blocks.Block blocks map[key.Key]blocks.Block
} }
func New(full bool) BitSwapMessage { func New(full bool) BitSwapMessage {
...@@ -57,7 +57,7 @@ func New(full bool) BitSwapMessage { ...@@ -57,7 +57,7 @@ func New(full bool) BitSwapMessage {
func newMsg(full bool) *impl { func newMsg(full bool) *impl {
return &impl{ return &impl{
blocks: make(map[key.Key]*blocks.Block), blocks: make(map[key.Key]blocks.Block),
wantlist: make(map[key.Key]Entry), wantlist: make(map[key.Key]Entry),
full: full, full: full,
} }
...@@ -96,8 +96,8 @@ func (m *impl) Wantlist() []Entry { ...@@ -96,8 +96,8 @@ func (m *impl) Wantlist() []Entry {
return out return out
} }
func (m *impl) Blocks() []*blocks.Block { func (m *impl) Blocks() []blocks.Block {
bs := make([]*blocks.Block, 0, len(m.blocks)) bs := make([]blocks.Block, 0, len(m.blocks))
for _, block := range m.blocks { for _, block := range m.blocks {
bs = append(bs, block) bs = append(bs, block)
} }
...@@ -129,7 +129,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) { ...@@ -129,7 +129,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
} }
} }
func (m *impl) AddBlock(b *blocks.Block) { func (m *impl) AddBlock(b blocks.Block) {
m.blocks[b.Key()] = b m.blocks[b.Key()] = b
} }
...@@ -156,7 +156,7 @@ func (m *impl) ToProto() *pb.Message { ...@@ -156,7 +156,7 @@ func (m *impl) ToProto() *pb.Message {
}) })
} }
for _, b := range m.Blocks() { for _, b := range m.Blocks() {
pbm.Blocks = append(pbm.Blocks, b.Data) pbm.Blocks = append(pbm.Blocks, b.Data())
} }
return pbm return pbm
} }
......
...@@ -10,8 +10,8 @@ import ( ...@@ -10,8 +10,8 @@ import (
const bufferSize = 16 const bufferSize = 16
type PubSub interface { type PubSub interface {
Publish(block *blocks.Block) Publish(block blocks.Block)
Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block
Shutdown() Shutdown()
} }
...@@ -23,7 +23,7 @@ type impl struct { ...@@ -23,7 +23,7 @@ type impl struct {
wrapped pubsub.PubSub wrapped pubsub.PubSub
} }
func (ps *impl) Publish(block *blocks.Block) { func (ps *impl) Publish(block blocks.Block) {
topic := string(block.Key()) topic := string(block.Key())
ps.wrapped.Pub(block, topic) ps.wrapped.Pub(block, topic)
} }
...@@ -35,9 +35,9 @@ func (ps *impl) Shutdown() { ...@@ -35,9 +35,9 @@ func (ps *impl) Shutdown() {
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel| // Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys) // is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks. // blocks.
func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block { func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block {
blocksCh := make(chan *blocks.Block, len(keys)) blocksCh := make(chan blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
if len(keys) == 0 { if len(keys) == 0 {
close(blocksCh) close(blocksCh)
...@@ -55,7 +55,7 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.B ...@@ -55,7 +55,7 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.B
if !ok { if !ok {
return return
} }
block, ok := val.(*blocks.Block) block, ok := val.(blocks.Block)
if !ok { if !ok {
return return
} }
......
...@@ -151,15 +151,15 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) { ...@@ -151,15 +151,15 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
t.Log("publishing the large number of blocks to the ignored channel must not deadlock") t.Log("publishing the large number of blocks to the ignored channel must not deadlock")
} }
func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) { func assertBlockChannelNil(t *testing.T, blockChannel <-chan blocks.Block) {
_, ok := <-blockChannel _, ok := <-blockChannel
if ok { if ok {
t.Fail() t.Fail()
} }
} }
func assertBlocksEqual(t *testing.T, a, b *blocks.Block) { func assertBlocksEqual(t *testing.T, a, b blocks.Block) {
if !bytes.Equal(a.Data, b.Data) { if !bytes.Equal(a.Data(), b.Data()) {
t.Fatal("blocks aren't equal") t.Fatal("blocks aren't equal")
} }
if a.Key() != b.Key() { if a.Key() != b.Key() {
......
...@@ -44,7 +44,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -44,7 +44,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
// TODO assert that this came from the correct peer and that the message contents are as expected // TODO assert that this came from the correct peer and that the message contents are as expected
ok := false ok := false
for _, b := range msgFromResponder.Blocks() { for _, b := range msgFromResponder.Blocks() {
if string(b.Data) == expectedStr { if string(b.Data()) == expectedStr {
wg.Done() wg.Done()
ok = true ok = true
} }
......
...@@ -61,7 +61,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { ...@@ -61,7 +61,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableMap{ log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableMap{
"ID": id, "ID": id,
"Target": envelope.Peer.Pretty(), "Target": envelope.Peer.Pretty(),
"Block": envelope.Block.Multihash.B58String(), "Block": envelope.Block.Multihash().B58String(),
}) })
bs.wm.SendBlock(ctx, envelope) bs.wm.SendBlock(ctx, envelope)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment