Commit 87012196 authored by hannahhoward's avatar hannahhoward

Add context cancelling logic

parent 0e51ad49
......@@ -428,8 +428,10 @@ func (ds *Shard) ForEachLink(ctx context.Context, f func(*ipld.Link) error) erro
// as they are enumerated, where order is not gauranteed
func (ds *Shard) EnumLinksAsync(ctx context.Context) (<-chan format.LinkResult, error) {
linkResults := make(chan format.LinkResult)
ctx, cancel := context.WithCancel(ctx)
go func() {
defer close(linkResults)
defer cancel()
getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults)
cset := cid.NewSet()
dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit)
......@@ -445,12 +447,12 @@ func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format
return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) {
node, err := dagService.Get(ctx, currentCid)
if err != nil {
linkResults <- format.LinkResult{Link: nil, Err: err}
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
return nil, err
}
directoryShard, err := NewHamtFromDag(dagService, node)
if err != nil {
linkResults <- format.LinkResult{Link: nil, Err: err}
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
return nil, err
}
......@@ -461,19 +463,26 @@ func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format
lnkLinkType, err := directoryShard.childLinkType(lnk)
if err != nil {
linkResults <- format.LinkResult{Link: nil, Err: err}
emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
return nil, err
}
if lnkLinkType == shardLink {
childShards = append(childShards, lnk)
} else {
linkResults <- format.LinkResult{Link: lnk, Err: nil}
emitResult(ctx, linkResults, format.LinkResult{Link: lnk, Err: nil})
}
}
return childShards, nil
}
}
func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) {
select {
case linkResults <- r:
case <-ctx.Done():
}
}
func (ds *Shard) walkTrie(ctx context.Context, cb func(*Shard) error) error {
for idx := range ds.children {
c, err := ds.getChild(ctx, idx)
......
......@@ -153,9 +153,13 @@ func (d *BasicDirectory) EnumLinksAsync(ctx context.Context) (<-chan format.Link
go func() {
defer close(linkResults)
for _, l := range d.node.Links() {
linkResults <- format.LinkResult{
select {
case linkResults <- format.LinkResult{
Link: l,
Err: nil,
}:
case <-ctx.Done():
return
}
}
}()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment