Commit 567b71a7 authored by hannahhoward's avatar hannahhoward

feat(unverifiedblockstore): create unverified block store

Create an in memory object for storing blocks received from the server temporarily
parent 783a69fb
...@@ -13,6 +13,12 @@ import ( ...@@ -13,6 +13,12 @@ import (
// Loader is an alias from ipld, in case it's renamed/moved. // Loader is an alias from ipld, in case it's renamed/moved.
type Loader = ipld.Loader type Loader = ipld.Loader
// Storer is an alias from ipld, in case it's renamed/moved.
type Storer = ipld.Storer
// StoreCommitter is an alias from ipld, in case it's renamed/moved.
type StoreCommitter = ipld.StoreCommitter
// AdvVisitFn is an alias from ipld, in case it's renamed/moved. // AdvVisitFn is an alias from ipld, in case it's renamed/moved.
type AdvVisitFn = ipldtraversal.AdvVisitFn type AdvVisitFn = ipldtraversal.AdvVisitFn
......
package unverifiedblockstore
import (
"fmt"
"github.com/ipfs/go-graphsync/ipldbridge"
ipld "github.com/ipld/go-ipld-prime"
)
// UnverifiedBlockStore holds an in memory cache of receied blocks from the network
// that have not been verified to be part of a traversal
type UnverifiedBlockStore struct {
inMemoryBlocks map[ipld.Link][]byte
storer ipldbridge.Storer
}
// New initializes a new unverified store with the given storer function for writing
// to permaneant storage if the block is verified
func New(storer ipldbridge.Storer) *UnverifiedBlockStore {
return &UnverifiedBlockStore{
inMemoryBlocks: make(map[ipld.Link][]byte),
storer: storer,
}
}
// AddUnverifiedBlock adds a new unverified block to the in memory cache as it
// comes in as part of a traversal.
func (ubs *UnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []byte) {
ubs.inMemoryBlocks[lnk] = data
}
// PruneBlocks removes blocks from the unverified store without committing them,
// if the passed in function returns true for the given link
func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) bool) {
for link := range ubs.inMemoryBlocks {
if shouldPrune(link) {
delete(ubs.inMemoryBlocks, link)
}
}
}
// VerifyBlock verifies the data for the given link as being part of a traversal,
// removes it from the unverified store, and writes it to permaneant storage.
func (ubs *UnverifiedBlockStore) VerifyBlock(lnk ipld.Link) ([]byte, error) {
data, ok := ubs.inMemoryBlocks[lnk]
if !ok {
return nil, fmt.Errorf("Block not found")
}
delete(ubs.inMemoryBlocks, lnk)
buffer, committer, err := ubs.storer(ipldbridge.LinkContext{})
if err != nil {
return nil, err
}
_, err = buffer.Write(data)
if err != nil {
return nil, err
}
err = committer(lnk)
if err != nil {
return nil, err
}
return data, nil
}
package unverifiedblockstore
import (
"bytes"
"io"
"reflect"
"testing"
"github.com/ipfs/go-graphsync/ipldbridge"
"github.com/ipld/go-ipld-prime"
"github.com/ipfs/go-graphsync/testbridge"
"github.com/ipfs/go-graphsync/testutil"
"github.com/ipld/go-ipld-prime/linking/cid"
)
func TestVerifyBlockPresent(t *testing.T) {
blocksWritten := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blocksWritten)
unverifiedBlockStore := New(storer)
block := testutil.GenerateBlocksOfSize(1, 100)[0]
reader, err := loader(cidlink.Link{Cid: block.Cid()}, ipldbridge.LinkContext{})
if reader != nil || err == nil {
t.Fatal("block should not be loadable till it's verified and stored")
}
data, err := unverifiedBlockStore.VerifyBlock(cidlink.Link{Cid: block.Cid()})
if data != nil || err == nil {
t.Fatal("block should not be verifiable till it's added as an unverifiable block")
}
unverifiedBlockStore.AddUnverifiedBlock(cidlink.Link{Cid: block.Cid()}, block.RawData())
reader, err = loader(cidlink.Link{Cid: block.Cid()}, ipldbridge.LinkContext{})
if reader != nil || err == nil {
t.Fatal("block should not be loadable till it's verified and stored")
}
data, err = unverifiedBlockStore.VerifyBlock(cidlink.Link{Cid: block.Cid()})
if !reflect.DeepEqual(data, block.RawData()) || err != nil {
t.Fatal("block should be returned on verification if added")
}
reader, err = loader(cidlink.Link{Cid: block.Cid()}, ipldbridge.LinkContext{})
var buffer bytes.Buffer
io.Copy(&buffer, reader)
if !reflect.DeepEqual(buffer.Bytes(), block.RawData()) || err != nil {
t.Fatal("block should be stored after verification and therefore loadable")
}
data, err = unverifiedBlockStore.VerifyBlock(cidlink.Link{Cid: block.Cid()})
if data != nil || err == nil {
t.Fatal("block cannot be verified twice")
}
}
package testbridge
import (
"bytes"
"fmt"
"io"
"sync"
"github.com/ipfs/go-graphsync/ipldbridge"
ipld "github.com/ipld/go-ipld-prime"
)
// NewMockStore provides a loader and storer for the given in memory link -> byte data map
func NewMockStore(blocksWritten map[ipld.Link][]byte) (ipldbridge.Loader, ipldbridge.Storer) {
var storeLk sync.RWMutex
storer := func(lnkCtx ipldbridge.LinkContext) (io.Writer, ipldbridge.StoreCommitter, error) {
var buffer bytes.Buffer
committer := func(lnk ipld.Link) error {
storeLk.Lock()
blocksWritten[lnk] = buffer.Bytes()
storeLk.Unlock()
return nil
}
return &buffer, committer, nil
}
loader := func(lnk ipld.Link, lnkCtx ipldbridge.LinkContext) (io.Reader, error) {
storeLk.RLock()
data, ok := blocksWritten[lnk]
storeLk.RUnlock()
if ok {
return bytes.NewReader(data), nil
}
return nil, fmt.Errorf("unable to load block")
}
return loader, storer
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment