// Package merkledag implements the IPFS Merkle DAG data structures.
package merkledag

import (
5
	"fmt"
Jeromy's avatar
Jeromy committed
6
	"sync"
Jeromy's avatar
Jeromy committed
7 8
	"time"

9
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
10
	blocks "github.com/jbenet/go-ipfs/blocks"
11
	bserv "github.com/jbenet/go-ipfs/blockservice"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13 14
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
var log = u.Logger("merkledag")
16
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
17

Jeromy's avatar
Jeromy committed
18 19 20 21 22 23
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
	Add(*Node) (u.Key, error)
	AddRecursive(*Node) error
	Get(u.Key) (*Node, error)
	Remove(*Node) error
24 25 26

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
27 28
	GetDAG(context.Context, *Node) []NodeGetter
	GetNodes(context.Context, []u.Key) []NodeGetter
Jeromy's avatar
Jeromy committed
29 30 31 32 33 34
}

// NewDAGService returns a DAGService that stores its nodes as blocks in bs.
func NewDAGService(bs *bserv.BlockService) DAGService {
	svc := &dagService{Blocks: bs}
	return svc
}

35
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
36 37
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
38 39
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
40
type dagService struct {
41
	Blocks *bserv.BlockService
42 43
}

44 45
// Add encodes nd, wraps the encoding and nd's multihash in a block, and
// stores that block in the BlockService. It returns whatever key (or error)
// AddBlock reports.
func (n *dagService) Add(nd *Node) (u.Key, error) {
	// FIXME remove this assertion. protect with constructor invariant
	if n == nil {
		return "", fmt.Errorf("dagService is nil")
	}

	data, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

	mh, err := nd.Multihash()
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(&blocks.Block{Data: data, Multihash: mh})
}

Jeromy's avatar
Jeromy committed
65
// AddRecursive adds the given node and all child nodes to the BlockService
66
func (n *dagService) AddRecursive(nd *Node) error {
67 68
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
69
		log.Info("AddRecursive Error: %s\n", err)
70 71 72 73
		return err
	}

	for _, link := range nd.Links {
74 75 76 77 78
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
79 80 81 82 83 84
		}
	}

	return nil
}

85 86
// Get retrieves a node from the dagService, fetching the block in the BlockService
func (n *dagService) Get(k u.Key) (*Node, error) {
87
	if n == nil {
88
		return nil, fmt.Errorf("dagService is nil")
89 90
	}

91 92
	ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
	defer cancel()
93 94 95 96
	// we shouldn't use an arbitrary timeout here.
	// since Get doesnt take in a context yet, we give a large upper bound.
	// think of an http request. we want it to go on as long as the client requests it.

Jeromy's avatar
Jeromy committed
97
	b, err := n.Blocks.GetBlock(ctx, k)
98 99 100 101 102 103
	if err != nil {
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
104

Jeromy's avatar
Jeromy committed
105
// Remove deletes the given node and all of its children from the BlockService
106
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
107 108 109 110 111 112 113 114 115 116 117
	for _, l := range nd.Links {
		if l.Node != nil {
			n.Remove(l.Node)
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
118

Jeromy's avatar
Jeromy committed
119 120
// FetchGraph asynchronously fetches every node reachable from root's links,
// recursing into each fetched child. It returns a channel that is closed once
// all of those fetches have finished, so it may be waited upon any number of
// times.
//
// Links that have not started fetching when ctx is done are skipped. Errors
// from individual link fetches are logged at debug level and only that branch
// is abandoned; they are not reported to the caller.
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
	log.Warning("Untested.")
	var wg sync.WaitGroup
	done := make(chan struct{})

	for _, l := range root.Links {
		wg.Add(1)
		go func(lnk *Link) {
			// Signal child is done on way out
			defer wg.Done()

			// Bail out early if the caller has already given up. The default
			// case is essential: without it this select blocks until ctx is
			// cancelled and then returns, so no link would ever be fetched.
			select {
			case <-ctx.Done():
				return
			default:
			}

			nd, err := lnk.GetNode(serv)
			if err != nil {
				log.Debug(err)
				return
			}

			// Wait for children to finish
			<-FetchGraph(ctx, nd, serv)
		}(l)
	}

	go func() {
		wg.Wait()
		// Closing (instead of sending on the unbuffered channel) never blocks,
		// so this goroutine cannot leak when nobody receives from done.
		close(done)
	}()

	return done
}
155

Jeromy's avatar
Jeromy committed
156 157
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
158
func FindLinks(links []u.Key, k u.Key, start int) []int {
Jeromy's avatar
Jeromy committed
159
	var out []int
Jeromy's avatar
Jeromy committed
160 161
	for i, lnk_k := range links[start:] {
		if k == lnk_k {
Jeromy's avatar
Jeromy committed
162
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
163 164
		}
	}
Jeromy's avatar
Jeromy committed
165
	return out
Jeromy's avatar
Jeromy committed
166 167
}

168
// GetDAG will fill out all of the links of the given Node.
169 170
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
171
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
172 173 174 175 176 177 178 179
	var keys []u.Key
	for _, lnk := range root.Links {
		keys = append(keys, u.Key(lnk.Hash))
	}

	return ds.GetNodes(ctx, keys)
}

Jeromy's avatar
Jeromy committed
180 181
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
Jeromy's avatar
Jeromy committed
182
func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) []NodeGetter {
183 184 185 186 187 188

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
189 190 191 192 193 194
	promises := make([]NodeGetter, len(keys))
	sendChans := make([]chan<- *Node, len(keys))
	for i, _ := range keys {
		promises[i], sendChans[i] = newNodePromise(ctx)
	}

195
	dedupedKeys := dedupeKeys(keys)
196
	go func() {
197 198 199
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

200
		blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
201

202
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
203 204 205 206 207
			select {
			case blk, ok := <-blkchan:
				if !ok {
					return
				}
Jeromy's avatar
Jeromy committed
208

Jeromy's avatar
Jeromy committed
209 210
				nd, err := Decoded(blk.Data)
				if err != nil {
Jeromy's avatar
Jeromy committed
211
					// NB: can happen with improperly formatted input data
212
					log.Debug("Got back bad block!")
Jeromy's avatar
Jeromy committed
213
					return
Jeromy's avatar
Jeromy committed
214
				}
Jeromy's avatar
Jeromy committed
215
				is := FindLinks(keys, blk.Key(), 0)
Jeromy's avatar
Jeromy committed
216
				for _, i := range is {
217
					count++
218
					sendChans[i] <- nd
Jeromy's avatar
Jeromy committed
219 220 221
				}
			case <-ctx.Done():
				return
222 223 224
			}
		}
	}()
Jeromy's avatar
Jeromy committed
225 226 227
	return promises
}

228 229 230 231 232 233 234 235 236 237 238 239 240
// dedupeKeys returns the keys of ks with repeats dropped. The first
// occurrence of each key is kept, in its original order. An empty ks yields
// nil.
func dedupeKeys(ks []u.Key) []u.Key {
	seen := make(map[u.Key]bool)
	var out []u.Key
	for _, k := range ks {
		if seen[k] {
			continue
		}
		seen[k] = true
		out = append(out, k)
	}
	return out
}

Jeromy's avatar
Jeromy committed
241 242 243 244 245 246 247 248 249 250 251 252 253 254
func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
	ch := make(chan *Node, 1)
	return &nodePromise{
		recv: ch,
		ctx:  ctx,
	}, ch
}

type nodePromise struct {
	cache *Node
	recv  <-chan *Node
	ctx   context.Context
}

Jeromy's avatar
Jeromy committed
255 256 257 258
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
259
type NodeGetter interface {
260
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
261 262
}

263
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
Jeromy's avatar
Jeromy committed
264 265 266 267 268 269 270 271 272
	if np.cache != nil {
		return np.cache, nil
	}

	select {
	case blk := <-np.recv:
		np.cache = blk
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
273 274
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
275 276
	}
	return np.cache, nil
277
}