merkledag.go 4.84 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"bytes"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8 9
	"time"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Chas Leichner's avatar
Chas Leichner committed
11

12
	blocks "github.com/jbenet/go-ipfs/blocks"
13
	bserv "github.com/jbenet/go-ipfs/blockservice"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15 16
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17
var log = u.Logger("merkledag")
18
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
19

Jeromy's avatar
Jeromy committed
20 21 22 23 24 25
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
	Add(*Node) (u.Key, error)
	AddRecursive(*Node) error
	Get(u.Key) (*Node, error)
	Remove(*Node) error
26 27 28 29

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
	GetDAG(context.Context, *Node) <-chan *Node
Jeromy's avatar
Jeromy committed
30 31 32 33 34 35
}

// NewDAGService returns a DAGService backed by the given BlockService.
func NewDAGService(bs *bserv.BlockService) DAGService {
	svc := &dagService{Blocks: bs}
	return svc
}

36
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
37 38
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
39 40
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
41
type dagService struct {
42
	Blocks *bserv.BlockService
43 44
}

45 46
// Add adds a node to the dagService, storing the block in the BlockService
func (n *dagService) Add(nd *Node) (u.Key, error) {
47
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
48
		return "", fmt.Errorf("dagService is nil")
49 50 51 52 53 54 55
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

56 57 58
	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
59 60 61 62 63 64 65
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

Jeromy's avatar
Jeromy committed
66
// AddRecursive adds the given node and all child nodes to the BlockService
67
func (n *dagService) AddRecursive(nd *Node) error {
68 69
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
70
		log.Info("AddRecursive Error: %s\n", err)
71 72 73 74
		return err
	}

	for _, link := range nd.Links {
75 76 77 78 79
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
80 81 82 83 84 85
		}
	}

	return nil
}

86 87
// Get retrieves a node from the dagService, fetching the block in the BlockService
func (n *dagService) Get(k u.Key) (*Node, error) {
88
	if n == nil {
89
		return nil, fmt.Errorf("dagService is nil")
90 91
	}

92 93 94 95 96
	ctx, _ := context.WithTimeout(context.TODO(), time.Minute)
	// we shouldn't use an arbitrary timeout here.
	// since Get doesnt take in a context yet, we give a large upper bound.
	// think of an http request. we want it to go on as long as the client requests it.

Jeromy's avatar
Jeromy committed
97
	b, err := n.Blocks.GetBlock(ctx, k)
98 99 100 101 102 103
	if err != nil {
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
104

Jeromy's avatar
Jeromy committed
105
// Remove deletes the given node and all of its children from the BlockService
106
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
107 108 109 110 111 112 113 114 115 116 117
	for _, l := range nd.Links {
		if l.Node != nil {
			n.Remove(l.Node)
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
118

Jeromy's avatar
Jeromy committed
119 120
// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
Jeromy's avatar
Jeromy committed
121
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
122
	log.Warning("Untested.")
Jeromy's avatar
Jeromy committed
123 124 125
	var wg sync.WaitGroup
	done := make(chan struct{})

Jeromy's avatar
Jeromy committed
126
	for _, l := range root.Links {
Jeromy's avatar
Jeromy committed
127
		wg.Add(1)
Jeromy's avatar
Jeromy committed
128
		go func(lnk *Link) {
Jeromy's avatar
Jeromy committed
129 130 131

			// Signal child is done on way out
			defer wg.Done()
Jeromy's avatar
Jeromy committed
132 133 134 135 136 137 138 139 140 141
			select {
			case <-ctx.Done():
				return
			}

			nd, err := lnk.GetNode(serv)
			if err != nil {
				log.Error(err)
				return
			}
Jeromy's avatar
Jeromy committed
142 143 144

			// Wait for children to finish
			<-FetchGraph(ctx, nd, serv)
Jeromy's avatar
Jeromy committed
145 146
		}(l)
	}
Jeromy's avatar
Jeromy committed
147 148 149 150 151 152 153

	go func() {
		wg.Wait()
		done <- struct{}{}
	}()

	return done
Jeromy's avatar
Jeromy committed
154
}
155

Jeromy's avatar
Jeromy committed
156 157
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
158
func FindLinks(n *Node, k u.Key, start int) []int {
Jeromy's avatar
Jeromy committed
159
	var out []int
160
	keybytes := []byte(k)
Jeromy's avatar
Jeromy committed
161
	for i, lnk := range n.Links[start:] {
162
		if bytes.Equal([]byte(lnk.Hash), keybytes) {
Jeromy's avatar
Jeromy committed
163
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
164 165
		}
	}
Jeromy's avatar
Jeromy committed
166
	return out
Jeromy's avatar
Jeromy committed
167 168
}

169
// GetDAG will fill out all of the links of the given Node.
170 171
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
172
func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
Jeromy's avatar
Jeromy committed
173
	sig := make(chan *Node)
174
	go func() {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
175 176
		defer close(sig)

177
		var keys []u.Key
Jeromy's avatar
Jeromy committed
178 179 180 181 182
		for _, lnk := range root.Links {
			keys = append(keys, u.Key(lnk.Hash))
		}
		blkchan := ds.Blocks.GetBlocks(ctx, keys)

183
		nodes := make([]*Node, len(root.Links))
Jeromy's avatar
Jeromy committed
184
		next := 0
185
		for blk := range blkchan {
186 187
			nd, err := Decoded(blk.Data)
			if err != nil {
Jeromy's avatar
Jeromy committed
188
				// NB: can occur in normal situations, with improperly formatted
Brian Tiger Chow's avatar
Brian Tiger Chow committed
189
				// input data
190 191 192
				log.Error("Got back bad block!")
				break
			}
Jeromy's avatar
Jeromy committed
193
			is := FindLinks(root, blk.Key(), next)
Jeromy's avatar
Jeromy committed
194 195
			for _, i := range is {
				nodes[i] = nd
196
			}
197

Jeromy's avatar
Jeromy committed
198 199
			for ; next < len(nodes) && nodes[next] != nil; next++ {
				sig <- nodes[next]
200 201
			}
		}
202
		if next < len(nodes) {
Jeromy's avatar
Jeromy committed
203 204
			// TODO: bubble errors back up.
			log.Errorf("Did not receive correct number of nodes!")
205
		}
206
	}()
207

208
	return sig
209
}