Commit 551c4093 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet Committed by Brian Tiger Chow

chan queue

parent ae1f7688
package queue
import (
"fmt"
"testing"
"time"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func newPeer(id string) *peer.Peer {
return &peer.Peer{ID: peer.ID(id)}
}
func TestPeerstore(t *testing.T) {
func TestQueue(t *testing.T) {
p1 := newPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31")
p2 := newPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32")
......@@ -60,3 +64,57 @@ func TestPeerstore(t *testing.T) {
}
}
func newPeerTime(t time.Time) *peer.Peer {
s := fmt.Sprintf("hmmm time: %v", t)
h, _ := u.Hash([]byte(s))
return &peer.Peer{ID: peer.ID(h)}
}
func TestSyncQueue(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
pq := NewXORDistancePQ(u.Key("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31"))
cq := NewChanQueue(ctx, pq)
countIn := 0
countOut := 0
produce := func() {
tick := time.Tick(time.Millisecond)
for {
select {
case tim := <-tick:
countIn++
cq.EnqChan <- newPeerTime(tim)
case <-ctx.Done():
return
}
}
}
consume := func() {
for {
select {
case <-cq.DeqChan:
countOut++
case <-ctx.Done():
return
}
}
}
for i := 0; i < 10; i++ {
go produce()
go produce()
go consume()
}
select {
case <-ctx.Done():
}
if countIn != countOut {
t.Errorf("didnt get them all out: %d/%d", countOut, countIn)
}
}
package queue
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
peer "github.com/jbenet/go-ipfs/peer"
)
// ChanQueue makes any PeerQueue synchronizable through channels.
type ChanQueue struct {
Queue PeerQueue
EnqChan chan *peer.Peer
DeqChan chan *peer.Peer
}
// NewChanQueue creates a ChanQueue by wrapping pq.
func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
cq := &ChanQueue{
Queue: pq,
EnqChan: make(chan *peer.Peer, 10),
DeqChan: make(chan *peer.Peer, 10),
}
go cq.process(ctx)
return cq
}
func (cq *ChanQueue) process(ctx context.Context) {
var next *peer.Peer
for {
if cq.Queue.Len() == 0 {
select {
case next = <-cq.EnqChan:
case <-ctx.Done():
close(cq.DeqChan)
return
}
} else {
next = cq.Queue.Dequeue()
}
select {
case item := <-cq.EnqChan:
cq.Queue.Enqueue(item)
cq.Queue.Enqueue(next)
next = nil
case cq.DeqChan <- next:
next = nil
case <-ctx.Done():
close(cq.DeqChan)
return
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment