package todocounter

import (
	"sync"
)

// Counter records things remaining to process. It is needed for complicated
// cases where multiple goroutines are spawned to process items, and they may
// generate more items to process. For example, say a query over a set of nodes
// may yield either a result value, or more nodes to query. Signaling is subtly
// complicated, because the queue may be empty while items are being processed,
// that will end up adding more items to the queue.
//
// Use Counter like this:
//
//    todos := make(chan int, 10)
//    ctr := todoctr.NewCounter()
//
//    process := func(item int) {
//      fmt.Println("processing %d\n...", item)
//
//      // this task may randomly generate more tasks
//      if rand.Intn(5) == 0 {
//        todos<- item + 1
//        ctr.Increment(1) // increment counter for new task.
//      }
//
//      ctr.Decrement(1) // decrement one to signal the task being done.
//    }
//
//    // add some tasks.
//    todos<- 1
//    todos<- 2
//    todos<- 3
//    todos<- 4
//    ctr.Increment(4)
//
//    for {
//      select {
//      case item := <- todos:
//        go process(item)
//      case <-ctr.Done():
//        fmt.Println("done processing everything.")
//        close(todos)
//      }
//    }
type Counter interface {
	// Incrememnt adds a number of todos to track.
	// If the counter is **below** zero, it panics.
	Increment(i uint32)

	// Decrement removes a number of todos to track.
	// If the count drops to zero, signals done and destroys the counter.
	// If the count drops **below** zero, panics. It means you have tried to remove
	// more things than you added, i.e. sync issues.
	Decrement(i uint32)

	// Done returns a channel to wait upon. Use it in selects:
	//
	//  select {
	//  case <-ctr.Done():
	//    // done processing all items
	//  }
	//
	Done() <-chan struct{}
}

type todoCounter struct {
	count int32
	done  chan struct{}
	sync.RWMutex
}

// NewSyncCounter constructs a new counter
func NewSyncCounter() Counter {
	return &todoCounter{
		done: make(chan struct{}),
	}
}

func (c *todoCounter) Increment(i uint32) {
	c.Lock()
	defer c.Unlock()

	if c.count < 0 {
		panic("counter already signaled done. use a new counter.")
	}

	// increment count
	c.count += int32(i)
}

// Decrement removes a number of todos to track.
// If the count drops to zero, signals done and destroys the counter.
// If the count drops **below** zero, panics. It means you have tried to remove
// more things than you added, i.e. sync issues.
func (c *todoCounter) Decrement(i uint32) {
	c.Lock()
	defer c.Unlock()

	if c.count < 0 {
		panic("counter already signaled done. probably have sync issues.")
	}

	if int32(i) > c.count {
		panic("decrement amount creater than counter. sync issues.")
	}

	c.count -= int32(i)
	if c.count == 0 { // done! signal it.
		c.count--     // set it to -1 to prevent reuse
		close(c.done) // a closed channel will always return nil
	}
}

func (c *todoCounter) Done() <-chan struct{} {
	return c.done
}