// Package merkledag implements the IPFS Merkle DAG data structures.
package merkledag

import (
5
	"bytes"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8 9
	"time"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Chas Leichner's avatar
Chas Leichner committed
11

12
	mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
13
	blocks "github.com/jbenet/go-ipfs/blocks"
14
	bserv "github.com/jbenet/go-ipfs/blockservice"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18
// log is the logger shared by this package, obtained from u.Logger under
// the name "merkledag".
var log = u.Logger("merkledag")
19
// ErrNotFound is this package's "not found" sentinel error. In this file it
// is returned by Node.RemoveNodeLink when no link carries the requested name.
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
20

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
21 22 23
// NodeMap maps u.Keys to Nodes.
// We cannot use []byte/Multihash for keys :(
// so have to convert Multihash bytes to string (u.Key)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25
type NodeMap map[u.Key]*Node

Jeromy's avatar
Jeromy committed
26 27 28 29 30 31
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
	Add(*Node) (u.Key, error)
	AddRecursive(*Node) error
	Get(u.Key) (*Node, error)
	Remove(*Node) error
32 33 34 35

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
	GetDAG(context.Context, *Node) <-chan *Node
Jeromy's avatar
Jeromy committed
36 37 38 39 40 41
}

// NewDAGService wraps the given block service in a DAGService; every node
// operation of the returned service is backed by bs.
func NewDAGService(bs *bserv.BlockService) DAGService {
	svc := new(dagService)
	svc.Blocks = bs
	return svc
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
42
// Node represents a node in the IPFS Merkle DAG.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43
// nodes have opaque data and a set of navigable links.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44
type Node struct {
Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
45 46
	Links []*Link
	Data  []byte
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47 48 49

	// cache encoded/marshaled value
	encoded []byte
50 51

	cached mh.Multihash
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52 53
}

54 55 56 57 58 59 60 61 62 63 64 65 66 67
// NodeStat is a statistics object for a Node. Mostly sizes.
type NodeStat struct {
	NumLinks       int // number of links in link table
	BlockSize      int // size of the raw data
	LinksSize      int // size of the links segment
	DataSize       int // size of the data segment
	CumulativeSize int // cumulative size of object + all it references
}

// String renders every field of the stat as a single
// "NodeStat{Name: value, ...}" line.
func (ns NodeStat) String() string {
	return fmt.Sprintf(
		"NodeStat{NumLinks: %d, BlockSize: %d, LinksSize: %d, DataSize: %d, CumulativeSize: %d}",
		ns.NumLinks,
		ns.BlockSize,
		ns.LinksSize,
		ns.DataSize,
		ns.CumulativeSize,
	)
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
68
// Link represents an IPFS Merkle DAG Link between Nodes.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69
type Link struct {
Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
70 71
	// utf string name. should be unique per object
	Name string // utf8
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72

Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
73 74
	// cumulative size of target object
	Size uint64
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
75

Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
76 77
	// multihash of the target object
	Hash mh.Multihash
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78 79 80

	// a ptr to the actual node for graph manipulation
	Node *Node
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81 82
}

83 84 85 86 87 88
// LinkSlice is a slice of links that orders its elements by Name. Its
// Len/Swap/Less methods have the shape required by sort.Interface.
type LinkSlice []*Link

// Len reports how many links the slice holds.
func (ls LinkSlice) Len() int {
	return len(ls)
}

// Swap exchanges the links at positions a and b.
func (ls LinkSlice) Swap(a, b int) {
	ls[b], ls[a] = ls[a], ls[b]
}

// Less reports whether the name of the link at a sorts before the name of
// the link at b.
func (ls LinkSlice) Less(a, b int) bool {
	return ls[b].Name > ls[a].Name
}

Jeromy's avatar
Jeromy committed
89
// MakeLink creates a link to the given node
Jeromy's avatar
Jeromy committed
90 91
func MakeLink(n *Node) (*Link, error) {
	s, err := n.Size()
92
	if err != nil {
Jeromy's avatar
Jeromy committed
93
		return nil, err
94 95
	}

Jeromy's avatar
Jeromy committed
96
	h, err := n.Multihash()
97
	if err != nil {
Jeromy's avatar
Jeromy committed
98
		return nil, err
99
	}
Jeromy's avatar
Jeromy committed
100
	return &Link{
101 102
		Size: s,
		Hash: h,
Jeromy's avatar
Jeromy committed
103
	}, nil
104 105
}

Jeromy's avatar
Jeromy committed
106
// GetNode returns the MDAG Node that this link points to
107
func (l *Link) GetNode(serv DAGService) (*Node, error) {
108 109 110 111 112 113 114
	if l.Node != nil {
		return l.Node, nil
	}

	return serv.Get(u.Key(l.Hash))
}

Jeromy's avatar
Jeromy committed
115 116 117
// AddNodeLink adds a link to another node.
func (n *Node) AddNodeLink(name string, that *Node) error {
	lnk, err := MakeLink(that)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118 119 120
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
121 122 123 124 125 126
	lnk.Name = name
	lnk.Node = that

	n.Links = append(n.Links, lnk)
	return nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127

Jeromy's avatar
Jeromy committed
128 129 130 131
// AddNodeLink adds a link to another node. without keeping a reference to
// the child node
func (n *Node) AddNodeLinkClean(name string, that *Node) error {
	lnk, err := MakeLink(that)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132 133 134
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
135
	lnk.Name = name
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136

Jeromy's avatar
Jeromy committed
137
	n.Links = append(n.Links, lnk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138 139
	return nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
140

Jeromy's avatar
Jeromy committed
141
// Remove a link on this node by the given name
142 143 144 145 146 147 148
func (n *Node) RemoveNodeLink(name string) error {
	for i, l := range n.Links {
		if l.Name == name {
			n.Links = append(n.Links[:i], n.Links[i+1:]...)
			return nil
		}
	}
149
	return ErrNotFound
150 151
}

152 153
// Copy returns a copy of the node.
// NOTE: does not make copies of Node objects in the links.
Jeromy's avatar
Jeromy committed
154 155 156 157 158 159 160 161 162 163
func (n *Node) Copy() *Node {
	nnode := new(Node)
	nnode.Data = make([]byte, len(n.Data))
	copy(nnode.Data, n.Data)

	nnode.Links = make([]*Link, len(n.Links))
	copy(nnode.Links, n.Links)
	return nnode
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
164 165
// Size returns the total size of the data addressed by node,
// including the total sizes of references.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166
func (n *Node) Size() (uint64, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167
	b, err := n.Encoded(false)
Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
168 169 170 171
	if err != nil {
		return 0, err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
172
	s := uint64(len(b))
Juan Batiz-Benet's avatar
gofmt  
Juan Batiz-Benet committed
173 174 175 176
	for _, l := range n.Links {
		s += l.Size
	}
	return s, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
177
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178

179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
// Stat returns statistics on the node.
//
// BlockSize is the length of the encoded node, DataSize the length of its
// Data, and LinksSize the difference between the two. CumulativeSize is the
// result of Size converted to int. On any error a zero NodeStat is returned.
func (n *Node) Stat() (NodeStat, error) {
	encoded, err := n.Encoded(false)
	if err != nil {
		return NodeStat{}, err
	}

	total, err := n.Size()
	if err != nil {
		return NodeStat{}, err
	}

	blockSize := len(encoded)
	dataSize := len(n.Data)

	var stat NodeStat
	stat.NumLinks = len(n.Links)
	stat.BlockSize = blockSize
	stat.LinksSize = blockSize - dataSize // includes framing.
	stat.DataSize = dataSize
	stat.CumulativeSize = int(total)
	return stat, nil
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
200
// Multihash hashes the encoded data of this node.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
201
func (n *Node) Multihash() (mh.Multihash, error) {
202
	// Note: Encoded generates the hash and puts it in n.cached.
203
	_, err := n.Encoded(false)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204 205 206 207
	if err != nil {
		return nil, err
	}

208
	return n.cached, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
210

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
211
// Key returns the Multihash as a key, for maps.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212 213 214 215
func (n *Node) Key() (u.Key, error) {
	h, err := n.Multihash()
	return u.Key(h), err
}
216

217
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
218 219
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
220 221
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
222
type dagService struct {
223
	Blocks *bserv.BlockService
224 225
}

226 227
// Add adds a node to the dagService, storing the block in the BlockService
func (n *dagService) Add(nd *Node) (u.Key, error) {
228
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
229
		return "", fmt.Errorf("dagService is nil")
230 231 232 233 234 235 236
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

237 238 239
	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
240 241 242 243 244 245 246
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

Jeromy's avatar
Jeromy committed
247
// AddRecursive adds the given node and all child nodes to the BlockService
248
func (n *dagService) AddRecursive(nd *Node) error {
249 250
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
251
		log.Info("AddRecursive Error: %s\n", err)
252 253 254 255
		return err
	}

	for _, link := range nd.Links {
256 257 258 259 260
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
261 262 263 264 265 266
		}
	}

	return nil
}

267 268
// Get retrieves a node from the dagService, fetching the block in the BlockService
func (n *dagService) Get(k u.Key) (*Node, error) {
269
	if n == nil {
270
		return nil, fmt.Errorf("dagService is nil")
271 272
	}

273 274 275 276 277
	ctx, _ := context.WithTimeout(context.TODO(), time.Minute)
	// we shouldn't use an arbitrary timeout here.
	// since Get doesnt take in a context yet, we give a large upper bound.
	// think of an http request. we want it to go on as long as the client requests it.

Jeromy's avatar
Jeromy committed
278
	b, err := n.Blocks.GetBlock(ctx, k)
279 280 281 282 283 284
	if err != nil {
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
285

Jeromy's avatar
Jeromy committed
286
// Remove deletes the given node and all of its children from the BlockService
287
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
288 289 290 291 292 293 294 295 296 297 298
	for _, l := range nd.Links {
		if l.Node != nil {
			n.Remove(l.Node)
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
299

Jeromy's avatar
Jeromy committed
300 301
// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
Jeromy's avatar
Jeromy committed
302
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
303
	log.Warning("Untested.")
Jeromy's avatar
Jeromy committed
304 305 306
	var wg sync.WaitGroup
	done := make(chan struct{})

Jeromy's avatar
Jeromy committed
307
	for _, l := range root.Links {
Jeromy's avatar
Jeromy committed
308
		wg.Add(1)
Jeromy's avatar
Jeromy committed
309
		go func(lnk *Link) {
Jeromy's avatar
Jeromy committed
310 311 312

			// Signal child is done on way out
			defer wg.Done()
Jeromy's avatar
Jeromy committed
313 314 315 316 317 318 319 320 321 322
			select {
			case <-ctx.Done():
				return
			}

			nd, err := lnk.GetNode(serv)
			if err != nil {
				log.Error(err)
				return
			}
Jeromy's avatar
Jeromy committed
323 324 325

			// Wait for children to finish
			<-FetchGraph(ctx, nd, serv)
Jeromy's avatar
Jeromy committed
326 327
		}(l)
	}
Jeromy's avatar
Jeromy committed
328 329 330 331 332 333 334

	go func() {
		wg.Wait()
		done <- struct{}{}
	}()

	return done
Jeromy's avatar
Jeromy committed
335
}
336

Jeromy's avatar
Jeromy committed
337 338
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
339
func FindLinks(n *Node, k u.Key, start int) []int {
Jeromy's avatar
Jeromy committed
340
	var out []int
341
	keybytes := []byte(k)
Jeromy's avatar
Jeromy committed
342
	for i, lnk := range n.Links[start:] {
343
		if bytes.Equal([]byte(lnk.Hash), keybytes) {
Jeromy's avatar
Jeromy committed
344
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
345 346
		}
	}
Jeromy's avatar
Jeromy committed
347
	return out
Jeromy's avatar
Jeromy committed
348 349
}

350
// GetDAG will fill out all of the links of the given Node.
351 352
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
353
func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
Jeromy's avatar
Jeromy committed
354
	sig := make(chan *Node)
355
	go func() {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
356 357
		defer close(sig)

358
		var keys []u.Key
Jeromy's avatar
Jeromy committed
359 360 361 362 363
		for _, lnk := range root.Links {
			keys = append(keys, u.Key(lnk.Hash))
		}
		blkchan := ds.Blocks.GetBlocks(ctx, keys)

364
		nodes := make([]*Node, len(root.Links))
Jeromy's avatar
Jeromy committed
365
		next := 0
366
		for blk := range blkchan {
367 368
			nd, err := Decoded(blk.Data)
			if err != nil {
Jeromy's avatar
Jeromy committed
369
				// NB: can occur in normal situations, with improperly formatted
Brian Tiger Chow's avatar
Brian Tiger Chow committed
370
				// input data
371 372 373
				log.Error("Got back bad block!")
				break
			}
Jeromy's avatar
Jeromy committed
374
			is := FindLinks(root, blk.Key(), next)
Jeromy's avatar
Jeromy committed
375 376
			for _, i := range is {
				nodes[i] = nd
377
			}
378

Jeromy's avatar
Jeromy committed
379 380
			for ; next < len(nodes) && nodes[next] != nil; next++ {
				sig <- nodes[next]
381 382
			}
		}
383
		if next < len(nodes) {
Jeromy's avatar
Jeromy committed
384 385
			// TODO: bubble errors back up.
			log.Errorf("Did not receive correct number of nodes!")
386
		}
387
	}()
388

389
	return sig
390
}