Unverified Commit a3eae7f1 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Merge pull request #39 from ipfs/features/streaming-ls-5600

feat(Directory): Add EnumLinksAsync method
parents e8af7a6b c54d0e47
...@@ -24,7 +24,6 @@ import ( ...@@ -24,7 +24,6 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"sync"
bitfield "github.com/Stebalien/go-bitfield" bitfield "github.com/Stebalien/go-bitfield"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
...@@ -400,21 +399,16 @@ func (ds *Shard) getValue(ctx context.Context, hv *hashBits, key string, cb func ...@@ -400,21 +399,16 @@ func (ds *Shard) getValue(ctx context.Context, hv *hashBits, key string, cb func
// EnumLinks collects all links in the Shard. // EnumLinks collects all links in the Shard.
func (ds *Shard) EnumLinks(ctx context.Context) ([]*ipld.Link, error) { func (ds *Shard) EnumLinks(ctx context.Context) ([]*ipld.Link, error) {
var links []*ipld.Link var links []*ipld.Link
var setlk sync.Mutex
getLinks := makeAsyncTrieGetLinks(ds.dserv, func(sv *Shard) error { linkResults := ds.EnumLinksAsync(ctx)
lnk := sv.val
lnk.Name = sv.key
setlk.Lock()
links = append(links, lnk)
setlk.Unlock()
return nil
})
cset := cid.NewSet()
err := dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit) for linkResult := range linkResults {
return links, err if linkResult.Err != nil {
return links, linkResult.Err
}
links = append(links, linkResult.Link)
}
return links, nil
} }
// ForEachLink walks the Shard and calls the given function. // ForEachLink walks the Shard and calls the given function.
...@@ -427,10 +421,28 @@ func (ds *Shard) ForEachLink(ctx context.Context, f func(*ipld.Link) error) erro ...@@ -427,10 +421,28 @@ func (ds *Shard) ForEachLink(ctx context.Context, f func(*ipld.Link) error) erro
}) })
} }
// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
linkResults := make(chan format.LinkResult)
ctx, cancel := context.WithCancel(ctx)
go func() {
defer close(linkResults)
defer cancel()
getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults)
cset := cid.NewSet()
err := dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit)
if err != nil {
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
}
}()
return linkResults
}
// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync // makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync
// to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called // to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called
// on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation // on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation
func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(shard *Shard) error) dag.GetLinks { func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format.LinkResult) dag.GetLinks {
return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) { return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) {
node, err := dagService.Get(ctx, currentCid) node, err := dagService.Get(ctx, currentCid)
...@@ -458,16 +470,31 @@ func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(shard * ...@@ -458,16 +470,31 @@ func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(shard *
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = onShardValue(sv) formattedLink := sv.val
if err != nil { formattedLink.Name = sv.key
return nil, err emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil})
}
} }
} }
return childShards, nil return childShards, nil
} }
} }
func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) {
// make sure that context cancel is processed first
// the reason is due to the concurrency of EnumerateChildrenAsync
// it's possible for EnumLinksAsync to complete and close the linkResults
// channel before this code runs
select {
case <-ctx.Done():
return
default:
}
select {
case linkResults <- r:
case <-ctx.Done():
}
}
func (ds *Shard) walkTrie(ctx context.Context, cb func(*Shard) error) error { func (ds *Shard) walkTrie(ctx context.Context, cb func(*Shard) error) error {
for idx := range ds.children { for idx := range ds.children {
c, err := ds.getChild(ctx, idx) c, err := ds.getChild(ctx, idx)
......
...@@ -74,28 +74,7 @@ func assertLink(s *Shard, name string, found bool) error { ...@@ -74,28 +74,7 @@ func assertLink(s *Shard, name string, found bool) error {
} }
} }
func assertSerializationWorks(ds ipld.DAGService, s *Shard) error { func assertLinksEqual(linksA []*ipld.Link, linksB []*ipld.Link) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nd, err := s.Node()
if err != nil {
return err
}
nds, err := NewHamtFromDag(ds, nd)
if err != nil {
return err
}
linksA, err := s.EnumLinks(ctx)
if err != nil {
return err
}
linksB, err := nds.EnumLinks(ctx)
if err != nil {
return err
}
if len(linksA) != len(linksB) { if len(linksA) != len(linksB) {
return fmt.Errorf("links arrays are different sizes") return fmt.Errorf("links arrays are different sizes")
...@@ -121,6 +100,32 @@ func assertSerializationWorks(ds ipld.DAGService, s *Shard) error { ...@@ -121,6 +100,32 @@ func assertSerializationWorks(ds ipld.DAGService, s *Shard) error {
return nil return nil
} }
func assertSerializationWorks(ds ipld.DAGService, s *Shard) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nd, err := s.Node()
if err != nil {
return err
}
nds, err := NewHamtFromDag(ds, nd)
if err != nil {
return err
}
linksA, err := s.EnumLinks(ctx)
if err != nil {
return err
}
linksB, err := nds.EnumLinks(ctx)
if err != nil {
return err
}
return assertLinksEqual(linksA, linksB)
}
func TestBasicSet(t *testing.T) { func TestBasicSet(t *testing.T) {
ds := mdtest.Mock() ds := mdtest.Mock()
for _, w := range []int{128, 256, 512, 1024, 2048, 4096} { for _, w := range []int{128, 256, 512, 1024, 2048, 4096} {
...@@ -309,6 +314,46 @@ func TestSetAfterMarshal(t *testing.T) { ...@@ -309,6 +314,46 @@ func TestSetAfterMarshal(t *testing.T) {
} }
} }
func TestEnumLinksAsync(t *testing.T) {
ds := mdtest.Mock()
_, s, err := makeDir(ds, 300)
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
nd, err := s.Node()
if err != nil {
t.Fatal(err)
}
nds, err := NewHamtFromDag(ds, nd)
if err != nil {
t.Fatal(err)
}
linksA, err := nds.EnumLinks(ctx)
if err != nil {
t.Fatal(err)
}
linkResults := nds.EnumLinksAsync(ctx)
var linksB []*ipld.Link
for linkResult := range linkResults {
if linkResult.Err != nil {
t.Fatal(linkResult.Err)
}
linksB = append(linksB, linkResult.Link)
}
err = assertLinksEqual(linksA, linksB)
if err != nil {
t.Fatal(err)
}
}
func TestDuplicateAddShard(t *testing.T) { func TestDuplicateAddShard(t *testing.T) {
ds := mdtest.Mock() ds := mdtest.Mock()
dir, _ := NewShard(ds, 256) dir, _ := NewShard(ds, 256)
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"os" "os"
mdag "github.com/ipfs/go-merkledag" mdag "github.com/ipfs/go-merkledag"
format "github.com/ipfs/go-unixfs" format "github.com/ipfs/go-unixfs"
hamt "github.com/ipfs/go-unixfs/hamt" hamt "github.com/ipfs/go-unixfs/hamt"
...@@ -38,6 +39,10 @@ type Directory interface { ...@@ -38,6 +39,10 @@ type Directory interface {
// ForEachLink applies the given function to Links in the directory. // ForEachLink applies the given function to Links in the directory.
ForEachLink(context.Context, func(*ipld.Link) error) error ForEachLink(context.Context, func(*ipld.Link) error) error
// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
EnumLinksAsync(context.Context) <-chan format.LinkResult
// Links returns the all the links in the directory node. // Links returns the all the links in the directory node.
Links(context.Context) ([]*ipld.Link, error) Links(context.Context) ([]*ipld.Link, error)
...@@ -141,6 +146,26 @@ func (d *BasicDirectory) AddChild(ctx context.Context, name string, node ipld.No ...@@ -141,6 +146,26 @@ func (d *BasicDirectory) AddChild(ctx context.Context, name string, node ipld.No
return d.node.AddNodeLink(name, node) return d.node.AddNodeLink(name, node)
} }
// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
func (d *BasicDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
linkResults := make(chan format.LinkResult)
go func() {
defer close(linkResults)
for _, l := range d.node.Links() {
select {
case linkResults <- format.LinkResult{
Link: l,
Err: nil,
}:
case <-ctx.Done():
return
}
}
}()
return linkResults
}
// ForEachLink implements the `Directory` interface. // ForEachLink implements the `Directory` interface.
func (d *BasicDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error { func (d *BasicDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error {
for _, l := range d.node.Links() { for _, l := range d.node.Links() {
...@@ -226,6 +251,12 @@ func (d *HAMTDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) erro ...@@ -226,6 +251,12 @@ func (d *HAMTDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) erro
return d.shard.ForEachLink(ctx, f) return d.shard.ForEachLink(ctx, f)
} }
// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
func (d *HAMTDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
return d.shard.EnumLinksAsync(ctx)
}
// Links implements the `Directory` interface. // Links implements the `Directory` interface.
func (d *HAMTDirectory) Links(ctx context.Context) ([]*ipld.Link, error) { func (d *HAMTDirectory) Links(ctx context.Context) ([]*ipld.Link, error) {
return d.shard.EnumLinks(ctx) return d.shard.EnumLinks(ctx)
......
...@@ -5,7 +5,9 @@ import ( ...@@ -5,7 +5,9 @@ import (
"fmt" "fmt"
"testing" "testing"
ipld "github.com/ipfs/go-ipld-format"
mdtest "github.com/ipfs/go-merkledag/test" mdtest "github.com/ipfs/go-merkledag/test"
ft "github.com/ipfs/go-unixfs" ft "github.com/ipfs/go-unixfs"
) )
...@@ -155,4 +157,28 @@ func TestDirBuilder(t *testing.T) { ...@@ -155,4 +157,28 @@ func TestDirBuilder(t *testing.T) {
if len(links) != count { if len(links) != count {
t.Fatal("wrong number of links", len(links), count) t.Fatal("wrong number of links", len(links), count)
} }
linkResults := dir.EnumLinksAsync(ctx)
asyncNames := make(map[string]bool)
var asyncLinks []*ipld.Link
for linkResult := range linkResults {
if linkResult.Err != nil {
t.Fatal(linkResult.Err)
}
asyncNames[linkResult.Link.Name] = true
asyncLinks = append(asyncLinks, linkResult.Link)
}
for i := 0; i < count; i++ {
n := fmt.Sprintf("entry %d", i)
if !asyncNames[n] {
t.Fatal("COULDNT FIND: ", n)
}
}
if len(asyncLinks) != count {
t.Fatal("wrong number of links", len(asyncLinks), count)
}
} }
...@@ -9,9 +9,18 @@ import ( ...@@ -9,9 +9,18 @@ import (
proto "github.com/gogo/protobuf/proto" proto "github.com/gogo/protobuf/proto"
dag "github.com/ipfs/go-merkledag" dag "github.com/ipfs/go-merkledag"
ipld "github.com/ipfs/go-ipld-format"
pb "github.com/ipfs/go-unixfs/pb" pb "github.com/ipfs/go-unixfs/pb"
) )
// A LinkResult for any parallel enumeration of links
// TODO: Should this live in go-ipld-format?
type LinkResult struct {
Link *ipld.Link
Err error
}
// Shorthands for protobuffer types // Shorthands for protobuffer types
const ( const (
TRaw = pb.Data_Raw TRaw = pb.Data_Raw
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment