Commit c0311d7e authored by Steven Allen's avatar Steven Allen

NodePromise: replace interface with concrete type

Also:

1. Specify the threading guarantees.
2. Vastly simplify it to use a single channel for synchronization.
parent 5837bec5
......@@ -21,7 +21,7 @@ func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
// GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
func GetDAG(ctx context.Context, ds DAGService, root Node) []NodePromise {
func GetDAG(ctx context.Context, ds DAGService, root Node) []*NodePromise {
var cids []*cid.Cid
for _, lnk := range root.Links() {
cids = append(cids, lnk.Cid)
......@@ -32,16 +32,16 @@ func GetDAG(ctx context.Context, ds DAGService, root Node) []NodePromise {
// GetNodes returns an array of 'FutureNode' promises, with each corresponding
// to the key with the same index as the passed in keys
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodePromise {
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []*NodePromise {
// Early out if no work to do
if len(keys) == 0 {
return nil
}
promises := make([]NodePromise, len(keys))
promises := make([]*NodePromise, len(keys))
for i := range keys {
promises[i] = newNodePromise(ctx)
promises[i] = NewNodePromise(ctx)
}
dedupedKeys := dedupeKeys(keys)
......
......@@ -2,87 +2,65 @@ package format
import (
"context"
"sync"
)
// TODO: I renamed this to NodePromise because:
// 1. NodeGetter is a naming conflict.
// 2. It's a promise...
// TODO: Should this even be an interface? It seems like a simple struct would
// suffice.
// NodePromise provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
type NodePromise interface {
Get(context.Context) (Node, error)
Fail(err error)
Send(Node)
}
func newNodePromise(ctx context.Context) NodePromise {
return &nodePromise{
recv: make(chan Node, 1),
//
// Thread Safety: This is multiple-consumer/single-producer safe.
func NewNodePromise(ctx context.Context) *NodePromise {
return &NodePromise{
done: make(chan struct{}),
ctx: ctx,
err: make(chan error, 1),
}
}
type nodePromise struct {
cache Node
clk sync.Mutex
recv chan Node
type NodePromise struct {
value Node
err error
done chan struct{}
ctx context.Context
err chan error
}
func (np *nodePromise) Fail(err error) {
np.clk.Lock()
v := np.cache
np.clk.Unlock()
// if promise has a value, don't fail it
if v != nil {
// Call this function to fail a promise.
//
// Once a promise has been failed or fulfilled, further attempts to fail it will
// be silently dropped.
func (np *NodePromise) Fail(err error) {
if np.err != nil || np.value != nil {
// Already filled.
return
}
np.err <- err
np.err = err
close(np.done)
}
func (np *nodePromise) Send(nd Node) {
var already bool
np.clk.Lock()
if np.cache != nil {
already = true
}
np.cache = nd
np.clk.Unlock()
if already {
panic("sending twice to the same promise is an error!")
// Fulfill this promise.
//
// Once a promise has been fulfilled or failed, calling this function will
// panic.
func (np *NodePromise) Send(nd Node) {
// if promise has a value, don't fail it
if np.err != nil || np.value != nil {
panic("already filled")
}
np.recv <- nd
np.value = nd
close(np.done)
}
func (np *nodePromise) Get(ctx context.Context) (Node, error) {
np.clk.Lock()
c := np.cache
np.clk.Unlock()
if c != nil {
return c, nil
}
// Get the value of this promise.
//
// This function is safe to call concurrently from any number of goroutines.
func (np *NodePromise) Get(ctx context.Context) (Node, error) {
select {
case nd := <-np.recv:
return nd, nil
case <-np.done:
return np.value, np.err
case <-np.ctx.Done():
return nil, np.ctx.Err()
case <-ctx.Done():
return nil, ctx.Err()
case err := <-np.err:
return nil, err
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment