merkledag.go 5.07 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
Jeromy's avatar
Jeromy committed
6
	"sync"
Jeromy's avatar
Jeromy committed
7 8
	"time"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Chas Leichner's avatar
Chas Leichner committed
10

11
	blocks "github.com/jbenet/go-ipfs/blocks"
12
	bserv "github.com/jbenet/go-ipfs/blockservice"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14 15
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16
var log = u.Logger("merkledag")
17
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
18

Jeromy's avatar
Jeromy committed
19 20 21 22 23 24
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
	Add(*Node) (u.Key, error)
	AddRecursive(*Node) error
	Get(u.Key) (*Node, error)
	Remove(*Node) error
25 26 27 28

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
	GetDAG(context.Context, *Node) <-chan *Node
Jeromy's avatar
Jeromy committed
29
	GetNodes(context.Context, []u.Key) <-chan *Node
Jeromy's avatar
Jeromy committed
30 31 32 33 34 35
}

// NewDAGService returns a DAGService that keeps its nodes in the given
// BlockService.
func NewDAGService(bs *bserv.BlockService) DAGService {
	return &dagService{Blocks: bs}
}

36
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
37 38
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
39 40
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
41
type dagService struct {
42
	Blocks *bserv.BlockService
43 44
}

45 46
// Add adds a node to the dagService, storing the block in the BlockService
func (n *dagService) Add(nd *Node) (u.Key, error) {
47
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
48
		return "", fmt.Errorf("dagService is nil")
49 50 51 52 53 54 55
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

56 57 58
	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
59 60 61 62 63 64 65
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

Jeromy's avatar
Jeromy committed
66
// AddRecursive adds the given node and all child nodes to the BlockService
67
func (n *dagService) AddRecursive(nd *Node) error {
68 69
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
70
		log.Info("AddRecursive Error: %s\n", err)
71 72 73 74
		return err
	}

	for _, link := range nd.Links {
75 76 77 78 79
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
80 81 82 83 84 85
		}
	}

	return nil
}

86 87
// Get retrieves a node from the dagService, fetching the block in the BlockService
func (n *dagService) Get(k u.Key) (*Node, error) {
88
	if n == nil {
89
		return nil, fmt.Errorf("dagService is nil")
90 91
	}

92 93 94 95 96
	ctx, _ := context.WithTimeout(context.TODO(), time.Minute)
	// we shouldn't use an arbitrary timeout here.
	// since Get doesnt take in a context yet, we give a large upper bound.
	// think of an http request. we want it to go on as long as the client requests it.

Jeromy's avatar
Jeromy committed
97
	b, err := n.Blocks.GetBlock(ctx, k)
98 99 100 101 102 103
	if err != nil {
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
104

Jeromy's avatar
Jeromy committed
105
// Remove deletes the given node and all of its children from the BlockService
106
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
107 108 109 110 111 112 113 114 115 116 117
	for _, l := range nd.Links {
		if l.Node != nil {
			n.Remove(l.Node)
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
118

Jeromy's avatar
Jeromy committed
119 120
// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
Jeromy's avatar
Jeromy committed
121
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
122
	log.Warning("Untested.")
Jeromy's avatar
Jeromy committed
123 124 125
	var wg sync.WaitGroup
	done := make(chan struct{})

Jeromy's avatar
Jeromy committed
126
	for _, l := range root.Links {
Jeromy's avatar
Jeromy committed
127
		wg.Add(1)
Jeromy's avatar
Jeromy committed
128
		go func(lnk *Link) {
Jeromy's avatar
Jeromy committed
129 130 131

			// Signal child is done on way out
			defer wg.Done()
Jeromy's avatar
Jeromy committed
132 133 134 135 136 137 138 139 140 141
			select {
			case <-ctx.Done():
				return
			}

			nd, err := lnk.GetNode(serv)
			if err != nil {
				log.Error(err)
				return
			}
Jeromy's avatar
Jeromy committed
142 143 144

			// Wait for children to finish
			<-FetchGraph(ctx, nd, serv)
Jeromy's avatar
Jeromy committed
145 146
		}(l)
	}
Jeromy's avatar
Jeromy committed
147 148 149 150 151 152 153

	go func() {
		wg.Wait()
		done <- struct{}{}
	}()

	return done
Jeromy's avatar
Jeromy committed
154
}
155

Jeromy's avatar
Jeromy committed
156 157
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
158
func FindLinks(links []u.Key, k u.Key, start int) []int {
Jeromy's avatar
Jeromy committed
159
	var out []int
Jeromy's avatar
Jeromy committed
160 161
	for i, lnk_k := range links[start:] {
		if k == lnk_k {
Jeromy's avatar
Jeromy committed
162
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
163 164
		}
	}
Jeromy's avatar
Jeromy committed
165
	return out
Jeromy's avatar
Jeromy committed
166 167
}

168
// GetDAG will fill out all of the links of the given Node.
169 170
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
171
func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
Jeromy's avatar
Jeromy committed
172 173 174 175 176 177 178 179 180
	var keys []u.Key
	for _, lnk := range root.Links {
		keys = append(keys, u.Key(lnk.Hash))
	}

	return ds.GetNodes(ctx, keys)
}

func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) <-chan *Node {
Jeromy's avatar
Jeromy committed
181
	sig := make(chan *Node)
182
	go func() {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
183
		defer close(sig)
Jeromy's avatar
Jeromy committed
184 185
		blkchan := ds.Blocks.GetBlocks(ctx, keys)

Jeromy's avatar
Jeromy committed
186
		nodes := make([]*Node, len(keys))
Jeromy's avatar
Jeromy committed
187
		next := 0
Jeromy's avatar
Jeromy committed
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
		for {
			select {
			case blk, ok := <-blkchan:
				if !ok {
					if next < len(nodes) {
						log.Errorf("Did not receive correct number of nodes!")
					}
					return
				}
				nd, err := Decoded(blk.Data)
				if err != nil {
					// NB: can occur in normal situations, with improperly formatted
					// input data
					log.Error("Got back bad block!")
					break
				}
				is := FindLinks(keys, blk.Key(), next)
				for _, i := range is {
					nodes[i] = nd
				}

				for ; next < len(nodes) && nodes[next] != nil; next++ {
					select {
					case sig <- nodes[next]:
					case <-ctx.Done():
						return
					}
				}
			case <-ctx.Done():
				return
218 219 220 221
			}
		}
	}()
	return sig
222
}