set.go 1.96 KB
Newer Older
1 2
package cid

Łukasz Magiera's avatar
Łukasz Magiera committed
3 4 5 6
import (
	"context"
)

7 8
// Set is a implementation of a set of Cids, that is, a structure
// to which holds a single copy of every Cids that is added to it.
9 10 11 12
type Set struct {
	set map[string]struct{}
}

13
// NewSet initializes and returns a new Set.
14 15 16 17
func NewSet() *Set {
	return &Set{set: make(map[string]struct{})}
}

18
// Add puts a Cid in the Set.
19 20 21 22
func (s *Set) Add(c *Cid) {
	s.set[string(c.Bytes())] = struct{}{}
}

23
// Has returns if the Set contains a given Cid.
24 25 26 27 28
func (s *Set) Has(c *Cid) bool {
	_, ok := s.set[string(c.Bytes())]
	return ok
}

29
// Remove deletes a Cid from the Set.
30 31 32 33
func (s *Set) Remove(c *Cid) {
	delete(s.set, string(c.Bytes()))
}

34
// Len returns how many elements the Set has.
35 36 37 38
func (s *Set) Len() int {
	return len(s.set)
}

39
// Keys returns the Cids in the set.
40
func (s *Set) Keys() []*Cid {
41
	out := make([]*Cid, 0, len(s.set))
42
	for k := range s.set {
43 44 45 46 47
		c, _ := Cast([]byte(k))
		out = append(out, c)
	}
	return out
}
48

49 50
// Visit adds a Cid to the set only if it is
// not in it already.
51 52 53 54 55 56 57 58
func (s *Set) Visit(c *Cid) bool {
	if !s.Has(c) {
		s.Add(c)
		return true
	}

	return false
}
59

60 61
// ForEach allows to run a custom function on each
// Cid in the set.
62
func (s *Set) ForEach(f func(c *Cid) error) error {
63
	for cs := range s.set {
64 65 66 67 68 69 70 71
		c, _ := Cast([]byte(cs))
		err := f(c)
		if err != nil {
			return err
		}
	}
	return nil
}
Łukasz Magiera's avatar
Łukasz Magiera committed
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102

// StreamingSet is an extension of Set which allows to implement back-pressure
// for the Visit function
type StreamingSet struct {
	Set *Set
	New chan *Cid
}

// NewStreamingSet initializes and returns new Set.
func NewStreamingSet() *StreamingSet {
	return &StreamingSet{
		Set: NewSet(),
		New: make(chan *Cid),
	}
}

// Visitor creates new visitor which adds a Cids to the set and emits them to
// the set.New channel
func (s *StreamingSet) Visitor(ctx context.Context) func(c *Cid) bool {
	return func(c *Cid) bool {
		if s.Set.Visit(c) {
			select {
			case s.New <- c:
			case <-ctx.Done():
			}
			return true
		}

		return false
	}
}