package decision

import (
	"context"
	"fmt"
	"sync"

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	bstore "github.com/ipfs/go-ipfs-blockstore"
	process "github.com/jbenet/goprocess"
)

// blockstoreManager maintains a pool of workers that make requests to the blockstore.
// Work is handed to the workers as closures sent over the jobs channel.
type blockstoreManager struct {
	// bs is the blockstore queried by getBlockSizes and getBlocks.
	bs          bstore.Blockstore
	// workerCount is the number of worker goroutines launched by start.
	workerCount int
	// jobs carries work from addJob to the workers. It is created unbuffered,
	// so a send completes only once a worker has received the job.
	jobs        chan func()
	// px is the process passed to start; its Closing channel tells the
	// workers and addJob to stop. It is not set until start is called.
	px          process.Process
}

// newBlockstoreManager creates a new blockstoreManager with the given context
// and number of workers
24
func newBlockstoreManager(bs bstore.Blockstore, workerCount int) *blockstoreManager {
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
	return &blockstoreManager{
		bs:          bs,
		workerCount: workerCount,
		jobs:        make(chan func()),
	}
}

// start records px as the manager's process and launches workerCount workers
// through px.Go. A non-positive workerCount launches no workers.
func (bsm *blockstoreManager) start(px process.Process) {
	// Must be assigned before any worker runs: workers read bsm.px.
	bsm.px = px

	for remaining := bsm.workerCount; remaining > 0; remaining-- {
		px.Go(func(process.Process) {
			bsm.worker()
		})
	}
}

// worker runs jobs received on the jobs channel, one at a time, until the
// manager's process signals that it is closing.
func (bsm *blockstoreManager) worker() {
	for {
		select {
		case run := <-bsm.jobs:
			run()
		case <-bsm.px.Closing():
			return
		}
	}
}

54
func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) error {
55 56
	select {
	case <-ctx.Done():
57
		return ctx.Err()
58
	case <-bsm.px.Closing():
59
		return fmt.Errorf("shutting down")
60
	case bsm.jobs <- job:
61
		return nil
62 63 64
	}
}

65
func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) (map[cid.Cid]int, error) {
66 67
	res := make(map[cid.Cid]int)
	if len(ks) == 0 {
68
		return res, nil
69 70 71
	}

	var lk sync.Mutex
72
	return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
73 74 75
		size, err := bsm.bs.GetSize(c)
		if err != nil {
			if err != bstore.ErrNotFound {
76
				// Note: this isn't a fatal error. We shouldn't abort the request
77 78 79 80 81 82 83 84 85 86
				log.Errorf("blockstore.GetSize(%s) error: %s", c, err)
			}
		} else {
			lk.Lock()
			res[c] = size
			lk.Unlock()
		}
	})
}

87
func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) {
88 89
	res := make(map[cid.Cid]blocks.Block)
	if len(ks) == 0 {
90
		return res, nil
91 92 93
	}

	var lk sync.Mutex
94
	return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
95 96 97
		blk, err := bsm.bs.Get(c)
		if err != nil {
			if err != bstore.ErrNotFound {
98
				// Note: this isn't a fatal error. We shouldn't abort the request
99 100 101 102 103 104 105 106 107 108
				log.Errorf("blockstore.Get(%s) error: %s", c, err)
			}
		} else {
			lk.Lock()
			res[c] = blk
			lk.Unlock()
		}
	})
}

109 110
func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) error {
	var err error
111 112 113 114
	wg := sync.WaitGroup{}
	for _, k := range ks {
		c := k
		wg.Add(1)
115
		err = bsm.addJob(ctx, func() {
116 117 118
			jobFn(c)
			wg.Done()
		})
119 120 121 122
		if err != nil {
			wg.Done()
			break
		}
123 124
	}
	wg.Wait()
125
	return err
126
}