// Package blockservice implements a BlockService interface that provides
// a single GetBlock/AddBlock interface that seamlessly retrieves data either
// locally or from a remote peer through the exchange.
package blockservice

import (
7
	"errors"
Jeromy's avatar
Jeromy committed
8
	"fmt"
9

10 11
	blocks "github.com/ipfs/go-ipfs/blocks"
	"github.com/ipfs/go-ipfs/blocks/blockstore"
12
	key "github.com/ipfs/go-ipfs/blocks/key"
13
	exchange "github.com/ipfs/go-ipfs/exchange"
Jeromy's avatar
Jeromy committed
14

Jeromy's avatar
Jeromy committed
15
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
Jeromy's avatar
Jeromy committed
16
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Jeromy's avatar
Jeromy committed
17
	cid "gx/ipfs/QmfSc2xehWmWLnwwYR91Y8QF4xdASypTFVknutoKQS3GHp/go-cid"
18 19
)

Jeromy's avatar
Jeromy committed
20
// log is the package-level logger, registered under the name "blockservice".
var log = logging.Logger("blockservice")
21

22
// ErrNotFound is the error GetBlock returns when the requested block is
// reported as not found by the local blockstore and, if one is consulted,
// by the exchange as well.
var ErrNotFound = errors.New("blockservice: key not found")
Jeromy's avatar
Jeromy committed
23

24 25
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
26 27
// It uses an internal `datastore.Datastore` instance to store values.
type BlockService struct {
28 29
	// TODO don't expose underlying impl details
	Blockstore blockstore.Blockstore
30
	Exchange   exchange.Interface
31 32
}

Jeromy's avatar
Jeromy committed
33 34 35 36 37 38
// Object is simply a typed block: it embeds blocks.Block and additionally
// exposes a *cid.Cid through its Cid method.
type Object interface {
	Cid() *cid.Cid
	blocks.Block
}

39
// NewBlockService creates a BlockService with given datastore instance.
40
func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
Jeromy's avatar
Jeromy committed
41
	if rem == nil {
Jeromy's avatar
Jeromy committed
42
		log.Warning("blockservice running in local (offline) mode.")
Jeromy's avatar
Jeromy committed
43
	}
44 45

	return &BlockService{
46 47
		Blockstore: bs,
		Exchange:   rem,
48
	}
49 50 51
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
52
// TODO pass a context into this if the remote.HasBlock is going to remain here.
Jeromy's avatar
Jeromy committed
53 54 55 56 57 58 59 60
func (s *BlockService) AddObject(o Object) (*cid.Cid, error) {
	// TODO: while this is a great optimization, we should think about the
	// possibility of streaming writes directly to disk. If we can pass this object
	// all the way down to the datastore without having to 'buffer' its data,
	// we could implement a `WriteTo` method on it that could do a streaming write
	// of the content, saving us (probably) considerable memory.
	c := o.Cid()
	has, err := s.Blockstore.Has(key.Key(c.Hash()))
61
	if err != nil {
Jeromy's avatar
Jeromy committed
62
		return nil, err
63
	}
Jeromy's avatar
Jeromy committed
64

65
	if has {
Jeromy's avatar
Jeromy committed
66
		return c, nil
67 68
	}

Jeromy's avatar
Jeromy committed
69
	err = s.Blockstore.Put(o)
Jeromy's avatar
Jeromy committed
70
	if err != nil {
Jeromy's avatar
Jeromy committed
71
		return nil, err
Jeromy's avatar
Jeromy committed
72
	}
Jeromy's avatar
Jeromy committed
73 74 75

	if err := s.Exchange.HasBlock(o); err != nil {
		return nil, errors.New("blockservice is closed")
76
	}
Jeromy's avatar
Jeromy committed
77 78

	return c, nil
79 80
}

Jeromy's avatar
Jeromy committed
81
func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) {
82
	var toput []blocks.Block
Jeromy's avatar
Jeromy committed
83
	var toputcids []*cid.Cid
84
	for _, b := range bs {
Jeromy's avatar
Jeromy committed
85 86 87
		c := b.Cid()

		has, err := s.Blockstore.Has(key.Key(c.Hash()))
88 89 90 91 92 93 94 95 96
		if err != nil {
			return nil, err
		}

		if has {
			continue
		}

		toput = append(toput, b)
Jeromy's avatar
Jeromy committed
97
		toputcids = append(toputcids, c)
98 99 100
	}

	err := s.Blockstore.PutMany(toput)
101 102 103 104
	if err != nil {
		return nil, err
	}

Jeromy's avatar
Jeromy committed
105 106 107 108
	var ks []*cid.Cid
	for _, o := range toput {
		if err := s.Exchange.HasBlock(o); err != nil {
			return nil, fmt.Errorf("blockservice is closed (%s)", err)
109
		}
Jeromy's avatar
Jeromy committed
110 111 112

		c := o.(Object).Cid() // cast is safe, we created these
		ks = append(ks, c)
113 114 115 116
	}
	return ks, nil
}

117 118
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
Jeromy's avatar
Jeromy committed
119 120
func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
	log.Debugf("BlockService GetBlock: '%s'", c)
jbenet's avatar
jbenet committed
121

Jeromy's avatar
Jeromy committed
122
	block, err := s.Blockstore.Get(key.Key(c.Hash()))
Jeromy's avatar
Jeromy committed
123
	if err == nil {
124
		return block, nil
Jeromy's avatar
Jeromy committed
125 126 127
	}

	if err == blockstore.ErrNotFound && s.Exchange != nil {
128 129
		// TODO be careful checking ErrNotFound. If the underlying
		// implementation changes, this will break.
130
		log.Debug("Blockservice: Searching bitswap")
Jeromy's avatar
Jeromy committed
131
		blk, err := s.Exchange.GetBlock(ctx, key.Key(c.Hash()))
Jeromy's avatar
Jeromy committed
132
		if err != nil {
133 134 135
			if err == blockstore.ErrNotFound {
				return nil, ErrNotFound
			}
Jeromy's avatar
Jeromy committed
136 137 138
			return nil, err
		}
		return blk, nil
Jeromy's avatar
Jeromy committed
139 140
	}

141
	log.Debug("Blockservice GetBlock: Not found")
Jeromy's avatar
Jeromy committed
142
	if err == blockstore.ErrNotFound {
143
		return nil, ErrNotFound
144
	}
Jeromy's avatar
Jeromy committed
145 146

	return nil, err
147
}
Jeromy's avatar
Jeromy committed
148

149 150 151
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
Jeromy's avatar
Jeromy committed
152
func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
153
	out := make(chan blocks.Block, 0)
154
	go func() {
155
		defer close(out)
156
		var misses []key.Key
Jeromy's avatar
Jeromy committed
157 158
		for _, c := range ks {
			k := key.Key(c.Hash())
159
			hit, err := s.Blockstore.Get(k)
160
			if err != nil {
161
				misses = append(misses, k)
162
				continue
163
			}
164
			log.Debug("Blockservice: Got data in datastore")
165 166 167 168 169
			select {
			case out <- hit:
			case <-ctx.Done():
				return
			}
170
		}
Jeromy's avatar
Jeromy committed
171

172 173 174 175
		if len(misses) == 0 {
			return
		}

176
		rblocks, err := s.Exchange.GetBlocks(ctx, misses)
Jeromy's avatar
Jeromy committed
177
		if err != nil {
178
			log.Debugf("Error with GetBlocks: %s", err)
Jeromy's avatar
Jeromy committed
179 180
			return
		}
181

182 183 184 185 186 187
		for b := range rblocks {
			select {
			case out <- b:
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
188
		}
189 190
	}()
	return out
Jeromy's avatar
Jeromy committed
191 192
}

Jeromy's avatar
Jeromy committed
193
// DeleteBlock deletes a block in the blockservice from the datastore
Jeromy's avatar
Jeromy committed
194 195
func (s *BlockService) DeleteObject(o Object) error {
	return s.Blockstore.DeleteBlock(o.Key())
Jeromy's avatar
Jeromy committed
196
}
197 198 199

// Close shuts the service down by closing its exchange and returns the
// exchange's Close error. When no exchange is configured (offline mode)
// there is nothing to close and nil is returned.
func (s *BlockService) Close() error {
	log.Debug("blockservice is shutting down...")
	// New accepts a nil exchange; calling Close on the nil interface would panic.
	if s.Exchange == nil {
		return nil
	}
	return s.Exchange.Close()
}