merkledag.go 6.75 KB
Newer Older
1
// Package merkledag implements the IPFS Merkle DAG data structures.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3 4
package merkledag

import (
5
	"fmt"
Jeromy's avatar
Jeromy committed
6
	"sync"
Jeromy's avatar
Jeromy committed
7

8 9
	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	blocks "github.com/ipfs/go-ipfs/blocks"
10
	key "github.com/ipfs/go-ipfs/blocks/key"
11 12
	bserv "github.com/ipfs/go-ipfs/blockservice"
	u "github.com/ipfs/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13 14
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
var log = u.Logger("merkledag")
16
var ErrNotFound = fmt.Errorf("merkledag: not found")
Jeromy's avatar
Jeromy committed
17

Jeromy's avatar
Jeromy committed
18 19
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
20
	Add(*Node) (key.Key, error)
Jeromy's avatar
Jeromy committed
21
	AddRecursive(*Node) error
22
	Get(context.Context, key.Key) (*Node, error)
Jeromy's avatar
Jeromy committed
23
	Remove(*Node) error
24 25 26

	// GetDAG returns, in order, all the single leve child
	// nodes of the passed in node.
Jeromy's avatar
Jeromy committed
27
	GetDAG(context.Context, *Node) []NodeGetter
28
	GetNodes(context.Context, []key.Key) []NodeGetter
29 30

	Batch() *Batch
Jeromy's avatar
Jeromy committed
31 32 33 34 35 36
}

// NewDAGService returns a DAGService that keeps its nodes in the given
// BlockService.
func NewDAGService(bs *bserv.BlockService) DAGService {
	svc := &dagService{Blocks: bs}
	return svc
}

37
// dagService is an IPFS Merkle DAG service.
Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
38 39
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
40 41
// TODO: should cache Nodes that are in memory, and be
//       able to free some of them when vm pressure is high
42
type dagService struct {
43
	Blocks *bserv.BlockService
44 45
}

46
// Add adds a node to the dagService, storing the block in the BlockService
47
func (n *dagService) Add(nd *Node) (key.Key, error) {
48
	if n == nil { // FIXME remove this assertion. protect with constructor invariant
49
		return "", fmt.Errorf("dagService is nil")
50 51 52 53 54 55 56
	}

	d, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

57 58 59
	b := new(blocks.Block)
	b.Data = d
	b.Multihash, err = nd.Multihash()
60 61 62 63 64 65 66
	if err != nil {
		return "", err
	}

	return n.Blocks.AddBlock(b)
}

67 68 69 70
// Batch returns a fresh Batch that writes through this service and whose
// MaxSize is 8 MiB.
func (ds *dagService) Batch() *Batch {
	const maxBatchBytes = 8 << 20 // 8 * 1024 * 1024
	return &Batch{
		ds:      ds,
		MaxSize: maxBatchBytes,
	}
}

Jeromy's avatar
Jeromy committed
71
// AddRecursive adds the given node and all child nodes to the BlockService
72
func (n *dagService) AddRecursive(nd *Node) error {
73 74
	_, err := n.Add(nd)
	if err != nil {
Jeromy's avatar
Jeromy committed
75
		log.Info("AddRecursive Error: %s\n", err)
76 77 78 79
		return err
	}

	for _, link := range nd.Links {
80 81 82 83 84
		if link.Node != nil {
			err := n.AddRecursive(link.Node)
			if err != nil {
				return err
			}
85 86 87 88 89 90
		}
	}

	return nil
}

91
// Get retrieves a node from the dagService, fetching the block in the BlockService
92
func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
93
	if n == nil {
94
		return nil, fmt.Errorf("dagService is nil")
95 96
	}

Jeromy's avatar
Jeromy committed
97
	b, err := n.Blocks.GetBlock(ctx, k)
98 99 100 101 102 103
	if err != nil {
		return nil, err
	}

	return Decoded(b.Data)
}
Jeromy's avatar
Jeromy committed
104

Jeromy's avatar
Jeromy committed
105
// Remove deletes the given node and all of its children from the BlockService
106
func (n *dagService) Remove(nd *Node) error {
Jeromy's avatar
Jeromy committed
107 108 109 110 111 112 113 114 115 116 117
	for _, l := range nd.Links {
		if l.Node != nil {
			n.Remove(l.Node)
		}
	}
	k, err := nd.Key()
	if err != nil {
		return err
	}
	return n.Blocks.DeleteBlock(k)
}
Jeromy's avatar
Jeromy committed
118

Jeromy's avatar
Jeromy committed
119 120
// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
Jeromy's avatar
Jeromy committed
121
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} {
122
	log.Warning("Untested.")
Jeromy's avatar
Jeromy committed
123 124 125
	var wg sync.WaitGroup
	done := make(chan struct{})

Jeromy's avatar
Jeromy committed
126
	for _, l := range root.Links {
Jeromy's avatar
Jeromy committed
127
		wg.Add(1)
Jeromy's avatar
Jeromy committed
128
		go func(lnk *Link) {
Jeromy's avatar
Jeromy committed
129 130 131

			// Signal child is done on way out
			defer wg.Done()
Jeromy's avatar
Jeromy committed
132 133 134 135 136
			select {
			case <-ctx.Done():
				return
			}

Jeromy's avatar
Jeromy committed
137
			nd, err := lnk.GetNode(ctx, serv)
Jeromy's avatar
Jeromy committed
138
			if err != nil {
139
				log.Debug(err)
Jeromy's avatar
Jeromy committed
140 141
				return
			}
Jeromy's avatar
Jeromy committed
142 143 144

			// Wait for children to finish
			<-FetchGraph(ctx, nd, serv)
Jeromy's avatar
Jeromy committed
145 146
		}(l)
	}
Jeromy's avatar
Jeromy committed
147 148 149 150 151 152 153

	go func() {
		wg.Wait()
		done <- struct{}{}
	}()

	return done
Jeromy's avatar
Jeromy committed
154
}
155

Jeromy's avatar
Jeromy committed
156 157
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
158
func FindLinks(links []key.Key, k key.Key, start int) []int {
Jeromy's avatar
Jeromy committed
159
	var out []int
Jeromy's avatar
Jeromy committed
160 161
	for i, lnk_k := range links[start:] {
		if k == lnk_k {
Jeromy's avatar
Jeromy committed
162
			out = append(out, i+start)
Jeromy's avatar
Jeromy committed
163 164
		}
	}
Jeromy's avatar
Jeromy committed
165
	return out
Jeromy's avatar
Jeromy committed
166 167
}

168
// GetDAG will fill out all of the links of the given Node.
169 170
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
Jeromy's avatar
Jeromy committed
171
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
172
	var keys []key.Key
Jeromy's avatar
Jeromy committed
173
	for _, lnk := range root.Links {
174
		keys = append(keys, key.Key(lnk.Hash))
Jeromy's avatar
Jeromy committed
175 176 177 178 179
	}

	return ds.GetNodes(ctx, keys)
}

Jeromy's avatar
Jeromy committed
180 181
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
182
func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter {
183 184 185 186 187 188

	// Early out if no work to do
	if len(keys) == 0 {
		return nil
	}

Jeromy's avatar
Jeromy committed
189 190
	promises := make([]NodeGetter, len(keys))
	sendChans := make([]chan<- *Node, len(keys))
rht's avatar
rht committed
191
	for i := range keys {
Jeromy's avatar
Jeromy committed
192 193 194
		promises[i], sendChans[i] = newNodePromise(ctx)
	}

195
	dedupedKeys := dedupeKeys(keys)
196
	go func() {
197 198 199
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

200
		blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)
Jeromy's avatar
Jeromy committed
201

202
		for count := 0; count < len(keys); {
Jeromy's avatar
Jeromy committed
203 204 205 206 207
			select {
			case blk, ok := <-blkchan:
				if !ok {
					return
				}
Jeromy's avatar
Jeromy committed
208

Jeromy's avatar
Jeromy committed
209 210
				nd, err := Decoded(blk.Data)
				if err != nil {
Jeromy's avatar
Jeromy committed
211
					// NB: can happen with improperly formatted input data
212
					log.Debug("Got back bad block!")
Jeromy's avatar
Jeromy committed
213
					return
Jeromy's avatar
Jeromy committed
214
				}
Jeromy's avatar
Jeromy committed
215
				is := FindLinks(keys, blk.Key(), 0)
Jeromy's avatar
Jeromy committed
216
				for _, i := range is {
217
					count++
218
					sendChans[i] <- nd
Jeromy's avatar
Jeromy committed
219 220 221
				}
			case <-ctx.Done():
				return
222 223 224
			}
		}
	}()
Jeromy's avatar
Jeromy committed
225 226 227
	return promises
}

228
// Remove duplicates from a list of keys
229 230 231
func dedupeKeys(ks []key.Key) []key.Key {
	kmap := make(map[key.Key]struct{})
	var out []key.Key
232 233 234 235 236 237 238 239 240
	for _, k := range ks {
		if _, ok := kmap[k]; !ok {
			kmap[k] = struct{}{}
			out = append(out, k)
		}
	}
	return out
}

Jeromy's avatar
Jeromy committed
241 242 243 244 245 246 247 248 249 250 251 252 253 254
// newNodePromise creates a promise bound to ctx and returns it together with
// the send side of its channel. The channel has a buffer of one, so a single
// node can be delivered without waiting for a receiver.
func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
	pipe := make(chan *Node, 1)
	promise := &nodePromise{ctx: ctx, recv: pipe}
	return promise, pipe
}

// nodePromise is the NodeGetter implementation created by newNodePromise.
type nodePromise struct {
	// cache holds the node once Get has received it; nil until then.
	cache *Node
	// recv is the receive side of the channel the node is delivered on.
	recv  <-chan *Node
	// ctx is the context supplied at construction; Get gives up when it is done.
	ctx   context.Context
}

Jeromy's avatar
Jeromy committed
255 256 257 258
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
Jeromy's avatar
Jeromy committed
259
type NodeGetter interface {
260
	Get(context.Context) (*Node, error)
Jeromy's avatar
Jeromy committed
261 262
}

263
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
Jeromy's avatar
Jeromy committed
264 265 266 267 268 269 270 271 272
	if np.cache != nil {
		return np.cache, nil
	}

	select {
	case blk := <-np.recv:
		np.cache = blk
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
273 274
	case <-ctx.Done():
		return nil, ctx.Err()
Jeromy's avatar
Jeromy committed
275 276
	}
	return np.cache, nil
277
}
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315

// Batch accumulates encoded nodes as blocks and writes them to the owning
// dagService's BlockService in bulk.
type Batch struct {
	// ds is the service whose BlockService receives the blocks on Commit.
	ds *dagService

	// blocks are the pending blocks not yet committed.
	blocks  []*blocks.Block
	// size is the summed data length of the pending blocks.
	size    int
	// MaxSize is the pending-size threshold; Add commits once size exceeds it.
	MaxSize int
}

// Add encodes nd into a block tagged with the node's multihash and appends it
// to the pending batch. When the accumulated data size exceeds MaxSize the
// batch is committed immediately and any commit error is returned alongside
// the key. The key is derived from the node's multihash.
func (t *Batch) Add(nd *Node) (key.Key, error) {
	encoded, err := nd.Encoded(false)
	if err != nil {
		return "", err
	}

	mh, err := nd.Multihash()
	if err != nil {
		return "", err
	}

	blk := &blocks.Block{Multihash: mh, Data: encoded}
	t.blocks = append(t.blocks, blk)
	t.size += len(blk.Data)

	k := key.Key(mh)
	if t.size <= t.MaxSize {
		return k, nil
	}
	return k, t.Commit()
}

// Commit hands all pending blocks to the BlockService in one call and resets
// the batch to empty. The pending blocks are dropped whether or not the write
// succeeds; the write's error is returned.
func (t *Batch) Commit() error {
	pending := t.blocks
	t.blocks, t.size = nil, 0

	_, err := t.ds.Blocks.AddBlocks(pending)
	return err
}