merkledag.go 5.38 KB
Newer Older
1
// package merkledag implements the ipfs Merkle DAG datastructures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
Jeromy's avatar
Jeromy committed
6
	"sync"
Jeromy's avatar
Jeromy committed
7 8
	"time"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Chas Leichner's avatar
Chas Leichner committed
10

11
	blocks "github.com/jbenet/go-ipfs/blocks"
12
	bserv "github.com/jbenet/go-ipfs/blockservice"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14 15
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16
var log = u.Logger("merkledag")
17
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
18

Jeromy's avatar
Jeromy committed
19 20 21 22 23 24
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
	Add(*Node) (u.Key, error)
	AddRecursive(*Node) error
	Get(u.Key) (*Node, error)
	Remove(*Node) error
25 26 27

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
28 29
	GetDAG(context.Context, *Node) []NodeGetter
	GetNodes(context.Context, []u.Key) []NodeGetter
Jeromy's avatar
Jeromy committed
30 31 32 33 34 35
}

func NewDAGService(bs *bserv.BlockService) DAGService {
	return &dagService{bs}
}

36
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
37 38
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
39 40
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
41
type dagService struct {
42
	Blocks *bserv.BlockService
43 44
}

45 46
// Add adds a node to the dagService, storing the block in the BlockService
func (n *dagService) Add(nd *Node) (u.Key, error) {
47
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
48
		return "", fmt.Errorf("dagService is nil")
49 50 51 52 53 54 55
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

56 57 58
	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
59 60 61 62 63 64 65
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

Jeromy's avatar
Jeromy committed
66
// AddRecursive adds the given node and all child nodes to the BlockService
67
func (n *dagService) AddRecursive(nd *Node) error {
68 69
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
70
		log.Info("AddRecursive Error: %s\n", err)
71 72 73 74
		return err
	}

	for _, link := range nd.Links {
75 76 77 78 79
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
80 81 82 83 84 85
		}
	}

	return nil
}

86 87
// Get retrieves a node from the dagService, fetching the block in the BlockService
func (n *dagService) Get(k u.Key) (*Node, error) {
88
	if n == nil {
89
		return nil, fmt.Errorf("dagService is nil")
90 91
	}

92 93 94 95 96
	ctx, _ := context.WithTimeout(context.TODO(), time.Minute)
	// we shouldn't use an arbitrary timeout here.
	// since Get doesnt take in a context yet, we give a large upper bound.
	// think of an http request. we want it to go on as long as the client requests it.

Jeromy's avatar
Jeromy committed
97
	b, err := n.Blocks.GetBlock(ctx, k)
98 99 100 101 102 103
	if err != nil {
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
104

Jeromy's avatar
Jeromy committed
105
// Remove deletes the given node and all of its children from the BlockService
106
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
107 108 109 110 111 112 113 114 115 116 117
	for _, l := range nd.Links {
		if l.Node != nil {
			n.Remove(l.Node)
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
118

Jeromy's avatar
Jeromy committed
119 120
// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
Jeromy's avatar
Jeromy committed
121
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
122
	log.Warning("Untested.")
Jeromy's avatar
Jeromy committed
123 124 125
	var wg sync.WaitGroup
	done := make(chan struct{})

Jeromy's avatar
Jeromy committed
126
	for _, l := range root.Links {
Jeromy's avatar
Jeromy committed
127
		wg.Add(1)
Jeromy's avatar
Jeromy committed
128
		go func(lnk *Link) {
Jeromy's avatar
Jeromy committed
129 130 131

			// Signal child is done on way out
			defer wg.Done()
Jeromy's avatar
Jeromy committed
132 133 134 135 136 137 138 139 140 141
			select {
			case <-ctx.Done():
				return
			}

			nd, err := lnk.GetNode(serv)
			if err != nil {
				log.Error(err)
				return
			}
Jeromy's avatar
Jeromy committed
142 143 144

			// Wait for children to finish
			<-FetchGraph(ctx, nd, serv)
Jeromy's avatar
Jeromy committed
145 146
		}(l)
	}
Jeromy's avatar
Jeromy committed
147 148 149 150 151 152 153

	go func() {
		wg.Wait()
		done <- struct{}{}
	}()

	return done
Jeromy's avatar
Jeromy committed
154
}
155

Jeromy's avatar
Jeromy committed
156 157
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
Jeromy's avatar
Jeromy committed
158
func FindLinks(links []u.Key, k u.Key, start int) []int {
Jeromy's avatar
Jeromy committed
159
	var out []int
Jeromy's avatar
Jeromy committed
160 161
	for i, lnk_k := range links[start:] {
		if k == lnk_k {
Jeromy's avatar
Jeromy committed
162
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
163 164
		}
	}
Jeromy's avatar
Jeromy committed
165
	return out
Jeromy's avatar
Jeromy committed
166 167
}

168
// GetDAG will fill out all of the links of the given Node.
169 170
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
171
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
Jeromy's avatar
Jeromy committed
172 173 174 175 176 177 178 179
	var keys []u.Key
	for _, lnk := range root.Links {
		keys = append(keys, u.Key(lnk.Hash))
	}

	return ds.GetNodes(ctx, keys)
}

Jeromy's avatar
Jeromy committed
180 181 182 183 184 185 186
func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) []NodeGetter {
	promises := make([]NodeGetter, len(keys))
	sendChans := make([]chan<- *Node, len(keys))
	for i, _ := range keys {
		promises[i], sendChans[i] = newNodePromise(ctx)
	}

187
	go func() {
Jeromy's avatar
Jeromy committed
188 189
		blkchan := ds.Blocks.GetBlocks(ctx, keys)

Jeromy's avatar
Jeromy committed
190 191 192 193 194 195
		for {
			select {
			case blk, ok := <-blkchan:
				if !ok {
					return
				}
Jeromy's avatar
Jeromy committed
196

Jeromy's avatar
Jeromy committed
197 198
				nd, err := Decoded(blk.Data)
				if err != nil {
Jeromy's avatar
Jeromy committed
199
					// NB: can happen with improperly formatted input data
Jeromy's avatar
Jeromy committed
200
					log.Error("Got back bad block!")
Jeromy's avatar
Jeromy committed
201
					return
Jeromy's avatar
Jeromy committed
202
				}
Jeromy's avatar
Jeromy committed
203
				is := FindLinks(keys, blk.Key(), 0)
Jeromy's avatar
Jeromy committed
204
				for _, i := range is {
Jeromy's avatar
Jeromy committed
205
					sendChans[i] <- nd
Jeromy's avatar
Jeromy committed
206 207 208
				}
			case <-ctx.Done():
				return
209 210 211
			}
		}
	}()
Jeromy's avatar
Jeromy committed
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
	return promises
}

func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
	ch := make(chan *Node, 1)
	return &nodePromise{
		recv: ch,
		ctx:  ctx,
	}, ch
}

type nodePromise struct {
	cache *Node
	recv  <-chan *Node
	ctx   context.Context
}

type NodeGetter interface {
	Get() (*Node, error)
}

func (np *nodePromise) Get() (*Node, error) {
	if np.cache != nil {
		return np.cache, nil
	}

	select {
	case blk := <-np.recv:
		np.cache = blk
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
	}
	return np.cache, nil
245
}