blockservice.go 6.75 KB
Newer Older
1 2 3
// package blockservice implements a BlockService interface that provides
// a single GetBlock/AddBlock interface that seamlessly retrieves data either
// locally or from a remote peer through the exchange.
4 5 6
package blockservice

import (
7
	"context"
8
	"errors"
Jeromy's avatar
Jeromy committed
9
	"fmt"
10

11 12
	"github.com/ipfs/go-ipfs/blocks/blockstore"
	exchange "github.com/ipfs/go-ipfs/exchange"
13
	bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
Jeromy's avatar
Jeromy committed
14

15 16
	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
	blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
Jeromy's avatar
Jeromy committed
17
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
18 19
)

Jeromy's avatar
Jeromy committed
20
var log = logging.Logger("blockservice")
21

22
var ErrNotFound = errors.New("blockservice: key not found")
Jeromy's avatar
Jeromy committed
23

24 25
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
26
// It uses an internal `datastore.Datastore` instance to store values.
27 28 29 30 31 32 33 34 35 36 37 38 39 40
type BlockService interface {
	Blockstore() blockstore.Blockstore
	Exchange() exchange.Interface
	AddBlock(o blocks.Block) (*cid.Cid, error)
	AddBlocks(bs []blocks.Block) ([]*cid.Cid, error)
	GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
	GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
	DeleteBlock(o blocks.Block) error
	Close() error
}

type blockService struct {
	blockstore blockstore.Blockstore
	exchange   exchange.Interface
41 42 43
	// If checkFirst is true then first check that a block doesn't
	// already exist to avoid republishing the block on the exchange.
	checkFirst bool
44 45 46
}

// NewBlockService creates a BlockService with given datastore instance.
47
func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
Jeromy's avatar
Jeromy committed
48
	if rem == nil {
Jeromy's avatar
Jeromy committed
49
		log.Warning("blockservice running in local (offline) mode.")
Jeromy's avatar
Jeromy committed
50
	}
51

52 53 54
	return &blockService{
		blockstore: bs,
		exchange:   rem,
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
		checkFirst: true,
	}
}

// NewWriteThrough ceates a BlockService that guarantees writes will go
// through to the blockstore and are not skipped by cache checks.
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
	if rem == nil {
		log.Warning("blockservice running in local (offline) mode.")
	}

	return &blockService{
		blockstore: bs,
		exchange:   rem,
		checkFirst: false,
70
	}
71 72
}

73 74 75 76 77 78 79 80
func (bs *blockService) Blockstore() blockstore.Blockstore {
	return bs.blockstore
}

func (bs *blockService) Exchange() exchange.Interface {
	return bs.exchange
}

81 82
// NewSession creates a bitswap session that allows for controlled exchange of
// wantlists to decrease the bandwidth overhead.
83 84 85
func NewSession(ctx context.Context, bs BlockService) *Session {
	exchange := bs.Exchange()
	if bswap, ok := exchange.(*bitswap.Bitswap); ok {
86 87 88
		ses := bswap.NewSession(ctx)
		return &Session{
			ses: ses,
89
			bs:  bs.Blockstore(),
90 91 92
		}
	}
	return &Session{
93 94
		ses: exchange,
		bs:  bs.Blockstore(),
95 96 97
	}
}

98
// AddBlock adds a particular block to the service, Putting it into the datastore.
99
// TODO pass a context into this if the remote.HasBlock is going to remain here.
100
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
Jeromy's avatar
Jeromy committed
101
	c := o.Cid()
102 103 104 105 106
	if s.checkFirst {
		has, err := s.blockstore.Has(c)
		if err != nil {
			return nil, err
		}
Jeromy's avatar
Jeromy committed
107

108 109 110
		if has {
			return c, nil
		}
111 112
	}

113
	err := s.blockstore.Put(o)
Jeromy's avatar
Jeromy committed
114
	if err != nil {
Jeromy's avatar
Jeromy committed
115
		return nil, err
Jeromy's avatar
Jeromy committed
116
	}
Jeromy's avatar
Jeromy committed
117

118
	if err := s.exchange.HasBlock(o); err != nil {
Jeromy's avatar
Jeromy committed
119
		return nil, errors.New("blockservice is closed")
120
	}
Jeromy's avatar
Jeromy committed
121 122

	return c, nil
123 124
}

125
func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
126
	var toput []blocks.Block
127 128 129 130 131 132
	if s.checkFirst {
		for _, b := range bs {
			has, err := s.blockstore.Has(b.Cid())
			if err != nil {
				return nil, err
			}
133 134
			if !has {
				toput = append(toput, b)
135
			}
136
		}
137
	} else {
Jeromy's avatar
Jeromy committed
138
		toput = bs
139 140
	}

141
	err := s.blockstore.PutMany(toput)
142 143 144 145
	if err != nil {
		return nil, err
	}

Jeromy's avatar
Jeromy committed
146 147
	var ks []*cid.Cid
	for _, o := range toput {
148
		if err := s.exchange.HasBlock(o); err != nil {
Jeromy's avatar
Jeromy committed
149
			return nil, fmt.Errorf("blockservice is closed (%s)", err)
150
		}
Jeromy's avatar
Jeromy committed
151

152
		ks = append(ks, o.Cid())
153 154 155 156
	}
	return ks, nil
}

157 158
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
159
func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
Jeromy's avatar
Jeromy committed
160
	log.Debugf("BlockService GetBlock: '%s'", c)
jbenet's avatar
jbenet committed
161

162 163 164 165 166 167 168 169 170 171
	var f exchange.Fetcher
	if s.exchange != nil {
		f = s.exchange
	}

	return getBlock(ctx, c, s.blockstore, f)
}

func getBlock(ctx context.Context, c *cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) (blocks.Block, error) {
	block, err := bs.Get(c)
Jeromy's avatar
Jeromy committed
172
	if err == nil {
173
		return block, nil
Jeromy's avatar
Jeromy committed
174 175
	}

176
	if err == blockstore.ErrNotFound && f != nil {
177 178
		// TODO be careful checking ErrNotFound. If the underlying
		// implementation changes, this will break.
179
		log.Debug("Blockservice: Searching bitswap")
180
		blk, err := f.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
181
		if err != nil {
182 183 184
			if err == blockstore.ErrNotFound {
				return nil, ErrNotFound
			}
Jeromy's avatar
Jeromy committed
185 186 187
			return nil, err
		}
		return blk, nil
Jeromy's avatar
Jeromy committed
188 189
	}

190
	log.Debug("Blockservice GetBlock: Not found")
Jeromy's avatar
Jeromy committed
191
	if err == blockstore.ErrNotFound {
192
		return nil, ErrNotFound
193
	}
Jeromy's avatar
Jeromy committed
194 195

	return nil, err
196
}
Jeromy's avatar
Jeromy committed
197

198 199 200
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
201
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
202 203 204 205
	return getBlocks(ctx, ks, s.blockstore, s.exchange)
}

func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) <-chan blocks.Block {
206
	out := make(chan blocks.Block)
207
	go func() {
208
		defer close(out)
209
		var misses []*cid.Cid
Jeromy's avatar
Jeromy committed
210
		for _, c := range ks {
211
			hit, err := bs.Get(c)
212
			if err != nil {
213
				misses = append(misses, c)
214
				continue
215
			}
216
			log.Debug("Blockservice: Got data in datastore")
217 218 219 220 221
			select {
			case out <- hit:
			case <-ctx.Done():
				return
			}
222
		}
Jeromy's avatar
Jeromy committed
223

224 225 226 227
		if len(misses) == 0 {
			return
		}

228
		rblocks, err := f.GetBlocks(ctx, misses)
Jeromy's avatar
Jeromy committed
229
		if err != nil {
230
			log.Debugf("Error with GetBlocks: %s", err)
Jeromy's avatar
Jeromy committed
231 232
			return
		}
233

234 235 236 237 238 239
		for b := range rblocks {
			select {
			case out <- b:
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
240
		}
241 242
	}()
	return out
Jeromy's avatar
Jeromy committed
243 244
}

Jeromy's avatar
Jeromy committed
245
// DeleteBlock deletes a block in the blockservice from the datastore
246 247
func (s *blockService) DeleteBlock(o blocks.Block) error {
	return s.blockstore.DeleteBlock(o.Cid())
Jeromy's avatar
Jeromy committed
248
}
249

250
func (s *blockService) Close() error {
251
	log.Debug("blockservice is shutting down...")
252
	return s.exchange.Close()
253
}
254

Jeromy's avatar
Jeromy committed
255
// Session is a helper type to provide higher level access to bitswap sessions
256 257 258 259 260
type Session struct {
	bs  blockstore.Blockstore
	ses exchange.Fetcher
}

Jeromy's avatar
Jeromy committed
261
// GetBlock gets a block in the context of a request session
262 263 264 265
func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
	return getBlock(ctx, c, s.bs, s.ses)
}

Jeromy's avatar
Jeromy committed
266
// GetBlocks gets blocks in the context of a request session
267 268 269
func (s *Session) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
	return getBlocks(ctx, ks, s.bs, s.ses)
}