Commit b5f0a42b authored by Juan Batiz-Benet, committed by Brian Tiger Chow

better query processing (runner)

parent 75b1212d
......@@ -177,6 +177,9 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message
if err != nil {
return nil, err
}
if rmes == nil {
return nil, errors.New("no response to request")
}
rtt := time.Since(start)
rmes.Peer().SetLatency(rtt)
......@@ -218,19 +221,22 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
return nil, nil, err
}
u.POut("pmes.GetValue() %v\n", pmes.GetValue())
if value := pmes.GetValue(); value != nil {
// Success! We were given the value
u.POut("getValueOrPeers: got value\n")
return value, nil, nil
}
// TODO decide on providers. This probably shouldn't be happening.
// if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
// val, err := dht.getFromPeerList(key, timeout,, level)
// if err != nil {
// return nil, nil, err
// }
// return val, nil, nil
// }
if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
val, err := dht.getFromPeerList(ctx, key, prv, level)
if err != nil {
return nil, nil, err
}
u.POut("getValueOrPeers: get from providers\n")
return val, nil, nil
}
// Perhaps we were given closer peers
var peers []*peer.Peer
......@@ -256,10 +262,12 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
}
if len(peers) > 0 {
u.POut("getValueOrPeers: peers\n")
return nil, peers, nil
}
return nil, nil, errors.New("NotFound. did not get value or closer peers.")
u.POut("getValueOrPeers: u.ErrNotFound\n")
return nil, nil, u.ErrNotFound
}
// getValueSingle simply performs the get value RPC with the given parameters
......
......@@ -18,14 +18,23 @@ import (
"time"
)
// mesHandleFunc is a function that takes in outgoing messages
// and can respond to them, simulating other peers on the network.
// returning nil will choose not to respond and pass the message on to the
// next registered handler
type mesHandleFunc func(msg.NetMessage) msg.NetMessage
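For illustration, a handler registered via AddHandler might look like the sketch below: it answers every decodable request with a canned value and declines (returns nil) otherwise, letting the next handler or the forced timeout take over. This is only a sketch; the GetType/GetKey getters and the payload are assumptions layered on the Message type shown in this diff.

fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
	pmes := new(Message)
	if err := proto.Unmarshal(mes.Data(), pmes); err != nil {
		return nil // can't decode: decline, let the next handler (or the timeout) take it
	}

	resp := newMessage(pmes.GetType(), pmes.GetKey(), 0) // echo the request's type and key (getters assumed)
	resp.Value = []byte("canned-reply")                  // hypothetical payload

	rmes, err := msg.FromObject(mes.Peer(), resp) // wrap the reply in a NetMessage addressed to the asker
	if err != nil {
		return nil
	}
	return rmes
})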
// fauxSender is a stand-in for a message sender in order to more easily recreate
// different testing scenarios
type fauxSender struct {
handlers []mesHandleFunc
}
func (f *fauxSender) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
func (f *fauxSender) AddHandler(fn func(msg.NetMessage) msg.NetMessage) {
f.handlers = append(f.handlers, fn)
}
func (f *fauxSender) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
for _, h := range f.handlers {
reply := h(m)
if reply != nil {
......@@ -33,7 +42,12 @@ func (f *fauxSender) SendRequest(ctx context.Context, m msg.NetMessage) (msg.Net
}
}
return nil, nil
// no reply? ok force a timeout
select {
case <-ctx.Done():
}
return nil, ctx.Err()
}
func (f *fauxSender) SendMessage(ctx context.Context, m msg.NetMessage) error {
......@@ -49,17 +63,6 @@ func (f *fauxSender) SendMessage(ctx context.Context, m msg.NetMessage) error {
// fauxNet is a standin for a swarm.Network in order to more easily recreate
// different testing scenarios
type fauxNet struct {
handlers []mesHandleFunc
}
// mesHandleFunc is a function that takes in outgoing messages
// and can respond to them, simulating other peers on the network.
// returning nil will chose not to respond and pass the message onto the
// next registered handler
type mesHandleFunc func(msg.NetMessage) msg.NetMessage
func (f *fauxNet) AddHandler(fn func(msg.NetMessage) msg.NetMessage) {
f.handlers = append(f.handlers, fn)
}
// DialPeer attempts to establish a connection to a given peer
......@@ -98,25 +101,23 @@ func TestGetFailures(t *testing.T) {
local.ID = peer.ID("test_peer")
d := NewDHT(local, peerstore, fn, fs, ds.NewMapDatastore())
other := &peer.Peer{ID: peer.ID("other_peer")}
d.Start()
d.Update(other)
// This one should time out
// u.POut("Timeout Test\n")
_, err := d.GetValue(u.Key("test"), time.Millisecond*10)
if err != nil {
if err != u.ErrTimeout {
t.Fatal("Got different error than we expected.")
if err != context.DeadlineExceeded {
t.Fatal("Got different error than we expected", err)
}
} else {
t.Fatal("Did not get expected error!")
}
// u.POut("NotFound Test\n")
// Reply with failures to every message
fn.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
......@@ -140,18 +141,7 @@ func TestGetFailures(t *testing.T) {
t.Fatal("expected error, got none.")
}
success := make(chan struct{})
fn.handlers = nil
fn.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
resp := new(Message)
err := proto.Unmarshal(mes.Data(), resp)
if err != nil {
t.Fatal(err)
}
success <- struct{}{}
return nil
})
fs.handlers = nil
// Now we test this DHT's handleGetValue failure
typ := Message_GET_VALUE
str := "hello"
......@@ -161,17 +151,32 @@ func TestGetFailures(t *testing.T) {
Value: []byte{0},
}
// u.POut("handleGetValue Test\n")
mes, err := msg.FromObject(other, &req)
if err != nil {
t.Error(err)
}
mes, err = fs.SendRequest(ctx, mes)
mes, err = d.HandleMessage(ctx, mes)
if err != nil {
t.Error(err)
}
<-success
pmes := new(Message)
err = proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}
if pmes.GetValue() != nil {
t.Fatal("shouldn't have value")
}
if pmes.GetCloserPeers() != nil {
t.Fatal("shouldn't have closer peers")
}
if pmes.GetProviderPeers() != nil {
t.Fatal("shouldn't have provider peers")
}
}
// TODO: Maybe put these in some sort of "ipfs_testutil" package
......@@ -192,7 +197,6 @@ func TestNotFound(t *testing.T) {
peerstore := peer.NewPeerstore()
d := NewDHT(local, peerstore, fn, fs, ds.NewMapDatastore())
d.Start()
var ps []*peer.Peer
for i := 0; i < 5; i++ {
......@@ -201,7 +205,7 @@ func TestNotFound(t *testing.T) {
}
// Reply with random peers to every message
fn.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
......@@ -228,7 +232,8 @@ func TestNotFound(t *testing.T) {
})
_, err := d.GetValue(u.Key("hello"), time.Second*30)
v, err := d.GetValue(u.Key("hello"), time.Second*5)
u.POut("get value got %v\n", v)
if err != nil {
switch err {
case u.ErrNotFound:
......@@ -254,7 +259,6 @@ func TestLessThanKResponses(t *testing.T) {
local.ID = peer.ID("test_peer")
d := NewDHT(local, peerstore, fn, fs, ds.NewMapDatastore())
d.Start()
var ps []*peer.Peer
for i := 0; i < 5; i++ {
......@@ -264,7 +268,7 @@ func TestLessThanKResponses(t *testing.T) {
other := _randPeer()
// Reply with random peers to every message
fn.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
......
......@@ -58,7 +58,10 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
return nil, err
}
// if we have the value, respond with it!
// Note: changed the behavior here to return _as much_ info as possible
// (potentially all of {value, closer peers, provider})
// if we have the value, send it back
if err == nil {
u.DOut("handleGetValue success!\n")
......@@ -68,7 +71,6 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
}
resp.Value = byts
return resp, nil
}
// if we know any providers for the requested value, return those.
......@@ -76,20 +78,16 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
if len(provs) > 0 {
u.DOut("handleGetValue returning %d provider[s]\n", len(provs))
resp.ProviderPeers = peersToPBPeers(provs)
return resp, nil
}
// Find closest peer on given cluster to desired key and reply with that info
closer := dht.betterPeerToQuery(pmes)
if closer == nil {
u.DOut("handleGetValue could not find a closer node than myself.\n")
resp.CloserPeers = nil
if closer != nil {
u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
return resp, nil
}
// we got a closer peer, it seems. return it.
u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
return resp, nil
}
......
package dht
import (
"sync"
peer "github.com/jbenet/go-ipfs/peer"
queue "github.com/jbenet/go-ipfs/peer/queue"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"
todoctr "github.com/jbenet/go-ipfs/util/todocounter"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
const maxQueryConcurrency = 5
type dhtQuery struct {
// a PeerQueue
peers queue.PeerQueue
// the key we're querying for
key u.Key
// the function to execute per peer
qfunc queryFunc
// the concurrency parameter
concurrency int
}
type dhtQueryResult struct {
value []byte // GetValue
peer *peer.Peer // FindPeer
providerPeers []*peer.Peer // GetProviders
closerPeers []*peer.Peer // *
success bool
}
// constructs query
func newQuery(k u.Key, f queryFunc) *dhtQuery {
return &dhtQuery{
key: k,
qfunc: f,
concurrency: maxQueryConcurrency,
}
}
// QueryFunc is a function that runs a particular query with a given peer.
......@@ -21,65 +47,170 @@ type dhtQuery struct {
// - the value
// - a list of peers potentially better able to serve the query
// - an error
type queryFunc func(context.Context, *peer.Peer) (interface{}, []*peer.Peer, error)
type queryFunc func(context.Context, *peer.Peer) (*dhtQueryResult, error)
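A queryFunc now packs its outcome into a dhtQueryResult instead of the old (value, closer peers, error) triple. A hedged sketch of the shape the runner expects, where askPeer is a hypothetical stand-in for an RPC such as getValueOrPeers:

qf := func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
	val, closer, err := askPeer(ctx, p) // askPeer: hypothetical RPC, e.g. getValueOrPeers
	if err != nil {
		return nil, err // the runner records the error and moves on to other peers
	}
	if val != nil {
		return &dhtQueryResult{value: val, success: true}, nil // success cancels the whole query
	}
	return &dhtQueryResult{closerPeers: closer}, nil // not done: feed closer peers back into the queue
}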
// Run runs the query at hand. pass in a list of peers to use first.
func (q *dhtQuery) Run(ctx context.Context, peers []*peer.Peer) (*dhtQueryResult, error) {
runner := newQueryRunner(ctx, q)
return runner.Run(peers)
}
type dhtQueryRunner struct {
// the query to run
query *dhtQuery
// peersToQuery is a list of peers remaining to query
peersToQuery *queue.ChanQueue
// peersSeen are all the peers queried. used to prevent querying same peer 2x
peersSeen peer.Map
func (q *dhtQuery) Run(ctx context.Context, concurrency int) (interface{}, error) {
// get own cancel function to signal when we've found the value
// rateLimit is a channel used to rate limit our processing (semaphore)
rateLimit chan struct{}
// peersRemaining is a counter of peers remaining (toQuery + processing)
peersRemaining todoctr.Counter
// context
ctx context.Context
cancel context.CancelFunc
// result
result *dhtQueryResult
// result errors
errs []error
// lock for concurrent access to fields
sync.RWMutex
}
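The rateLimit field is a buffered channel used as a counting semaphore: Run pre-fills it with one token per allowed worker, and queryPeer takes a token before querying and returns it when it finishes. A minimal standalone sketch of that pattern, with illustrative names and counts that are not part of this diff:

package main

import "sync"

func main() {
	const concurrency = 3
	rateLimit := make(chan struct{}, concurrency)
	for i := 0; i < concurrency; i++ {
		rateLimit <- struct{}{} // pre-fill with tokens, as dhtQueryRunner.Run does
	}

	var wg sync.WaitGroup
	for job := 0; job < 10; job++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-rateLimit // take a token; blocks while `concurrency` jobs are already in flight
			defer func() { rateLimit <- struct{}{} }() // hand the token back, as queryPeer does
			// ... rate-limited work would go here ...
		}()
	}
	wg.Wait()
}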
func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
ctx, cancel := context.WithCancel(ctx)
// the variable waiting to be populated upon success
var result interface{}
// chanQueue is how workers receive their work
chanQueue := queue.NewChanQueue(ctx, q.peers)
// worker
worker := func() {
for {
select {
case p := <-chanQueue.DeqChan:
val, closer, err := q.qfunc(ctx, p)
if err != nil {
u.PErr("error running query: %v\n", err)
continue
}
if val != nil {
result = val
cancel() // signal we're done.
return
}
if closer != nil {
for _, p := range closer {
select {
case chanQueue.EnqChan <- p:
case <-ctx.Done():
return
}
}
}
case <-ctx.Done():
return
}
}
return &dhtQueryRunner{
ctx: ctx,
cancel: cancel,
query: q,
peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
peersRemaining: todoctr.NewSyncCounter(),
peersSeen: peer.Map{},
rateLimit: make(chan struct{}, q.concurrency),
}
}
func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) {
// setup concurrency rate limiting
for i := 0; i < r.query.concurrency; i++ {
r.rateLimit <- struct{}{}
}
// launch all workers
for i := 0; i < concurrency; i++ {
go worker()
// add all the peers we got first.
for _, p := range peers {
r.addPeerToQuery(p, nil) // don't have access to self here...
}
// wait until we're done. yep.
select {
case <-ctx.Done():
case <-r.peersRemaining.Done():
r.cancel() // ran all and nothing. cancel all outstanding workers.
r.RLock()
defer r.RUnlock()
if len(r.errs) > 0 {
return nil, r.errs[0]
}
return nil, u.ErrNotFound
case <-r.ctx.Done():
r.RLock()
defer r.RUnlock()
if r.result != nil && r.result.success {
return r.result, nil
}
return nil, r.ctx.Err()
}
}
func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
if next == nil {
// wtf why are peers nil?!?
u.PErr("Query getting nil peers!!!\n")
return
}
// if the new peer is further away than the one we got it from, don't bother (avoids loops)
if benchmark != nil && kb.Closer(benchmark.ID, next.ID, r.query.key) {
return
}
// if already seen, no need.
r.Lock()
_, found := r.peersSeen[next.Key()]
if found {
r.Unlock()
return
}
r.peersSeen[next.Key()] = next
r.Unlock()
// do this after unlocking to prevent possible deadlocks.
r.peersRemaining.Increment(1)
select {
case r.peersToQuery.EnqChan <- next:
case <-r.ctx.Done():
}
}
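peersRemaining is the todocounter that tells the runner when the query has drained: each peer added here Increments it, each finished queryPeer Decrements it, and Done() returns a channel the runner selects on. A minimal sketch of that usage, assuming (as the code above suggests) that Done fires once the count falls back to zero:

c := todoctr.NewSyncCounter()
c.Increment(2) // two peers queued for querying
go func() {
	c.Decrement(1) // first peer processed
	c.Decrement(1) // second peer processed
}()
<-c.Done() // unblocks once everything queued has been marked done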
func (r *dhtQueryRunner) spawnWorkers(p *peer.Peer) {
for {
select {
case <-r.peersRemaining.Done():
return
case <-r.ctx.Done():
return
case p := <-r.peersToQuery.DeqChan:
go r.queryPeer(p)
}
}
}
if result != nil {
return result, nil
func (r *dhtQueryRunner) queryPeer(p *peer.Peer) {
// make sure we rate limit concurrency.
select {
case <-r.rateLimit:
case <-r.ctx.Done():
r.peersRemaining.Decrement(1)
return
}
// finally, run the query against this peer
res, err := r.query.qfunc(r.ctx, p)
if err != nil {
r.Lock()
r.errs = append(r.errs, err)
r.Unlock()
} else if res.success {
r.Lock()
r.result = res
r.Unlock()
r.cancel() // signal to everyone that we're done.
} else if res.closerPeers != nil {
for _, next := range res.closerPeers {
r.addPeerToQuery(next, p)
}
}
return nil, ctx.Err()
// signal we're done processing peer p
r.peersRemaining.Decrement(1)
r.rateLimit <- struct{}{}
}
......@@ -3,14 +3,11 @@ package dht
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
peer "github.com/jbenet/go-ipfs/peer"
queue "github.com/jbenet/go-ipfs/peer/queue"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"
)
......@@ -24,28 +21,23 @@ import (
func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error {
ctx := context.TODO()
query := &dhtQuery{}
query.peers = queue.NewXORDistancePQ(key)
peers := []*peer.Peer{}
// get the peers we need to announce to
for _, route := range dht.routingTables {
peers := route.NearestPeers(kb.ConvertKey(key), KValue)
for _, p := range peers {
if p == nil {
// this shouldn't be happening.
panic("p should not be nil")
}
query.peers.Enqueue(p)
}
npeers := route.NearestPeers(kb.ConvertKey(key), KValue)
peers = append(peers, npeers...)
}
query.qfunc = func(ctx context.Context, p *peer.Peer) (interface{}, []*peer.Peer, error) {
dht.putValueToNetwork(ctx, p, string(key), value)
return nil, nil, nil
}
query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
err := dht.putValueToNetwork(ctx, p, string(key), value)
if err != nil {
return nil, err
}
return &dhtQueryResult{success: true}, nil
})
_, err := query.Run(ctx, query.peers.Len())
_, err := query.Run(ctx, peers)
return err
}
......@@ -63,7 +55,6 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
val, err := dht.getLocal(key)
if err == nil {
ll.Success = true
u.DOut("Found local, returning.\n")
return val, nil
}
......@@ -74,30 +65,33 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
return nil, kb.ErrLookupFailure
}
query := &dhtQuery{}
query.peers = queue.NewXORDistancePQ(key)
// setup the Query
query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
// get the peers we need to announce to
for _, p := range closest {
query.peers.Enqueue(p)
}
val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel)
if err != nil {
return nil, err
}
// setup the Query Function
query.qfunc = func(ctx context.Context, p *peer.Peer) (interface{}, []*peer.Peer, error) {
return dht.getValueOrPeers(ctx, p, key, routeLevel)
}
res := &dhtQueryResult{value: val, closerPeers: peers}
if val != nil {
res.success = true
}
return res, nil
})
// run it!
result, err := query.Run(ctx, query.peers.Len())
result, err := query.Run(ctx, closest)
if err != nil {
return nil, err
}
byt, ok := result.([]byte)
if !ok {
return nil, fmt.Errorf("received non-byte slice value")
if result.value == nil {
return nil, u.ErrNotFound
}
return byt, nil
return result.value, nil
}
// Value provider layer of indirection.
......@@ -278,25 +272,19 @@ func (dht *IpfsDHT) findPeerMultiple(id peer.ID, timeout time.Duration) (*peer.P
return p, nil
}
query := &dhtQuery{}
query.peers = queue.NewXORDistancePQ(u.Key(id))
// get the peers we need to announce to
routeLevel := 0
peers := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if len(peers) == 0 {
return nil, kb.ErrLookupFailure
}
for _, p := range peers {
query.peers.Enqueue(p)
}
// setup query function
query.qfunc = func(ctx context.Context, p *peer.Peer) (interface{}, []*peer.Peer, error) {
query := newQuery(u.Key(id), func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
if err != nil {
u.DErr("getPeer error: %v\n", err)
return nil, nil, err
return nil, err
}
plist := pmes.GetCloserPeers()
......@@ -313,25 +301,24 @@ func (dht *IpfsDHT) findPeerMultiple(id peer.ID, timeout time.Duration) (*peer.P
}
if nxtp.ID.Equal(id) {
return nxtp, nil, nil
return &dhtQueryResult{peer: nxtp, success: true}, nil
}
nxtprs[i] = nxtp
}
return nil, nxtprs, nil
}
return &dhtQueryResult{closerPeers: nxtprs}, nil
})
p5, err := query.Run(ctx, query.peers.Len())
result, err := query.Run(ctx, peers)
if err != nil {
return nil, err
}
p6, ok := p5.(*peer.Peer)
if !ok {
return nil, errors.New("received non peer object")
if result.peer == nil {
return nil, u.ErrNotFound
}
return p6, nil
return result.peer, nil
}
// Ping a peer, log the time it took
......@@ -350,21 +337,14 @@ func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
ctx, _ := context.WithTimeout(context.TODO(), timeout)
u.DOut("Begin Diagnostic")
query := &dhtQuery{}
query.peers = queue.NewXORDistancePQ(u.Key(dht.self.ID))
targets := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
for _, p := range targets {
query.peers.Enqueue(p)
}
peers := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
var out []*diagInfo
query.qfunc = func(ctx context.Context, p *peer.Peer) (interface{}, []*peer.Peer, error) {
query := newQuery(dht.self.Key(), func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
pmes := newMessage(Message_DIAGNOSTIC, "", 0)
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
return nil, nil, err
return nil, err
}
dec := json.NewDecoder(bytes.NewBuffer(rpmes.GetValue()))
......@@ -377,9 +357,9 @@ func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
out = append(out, di)
}
return nil, nil, nil
}
return &dhtQueryResult{success: true}, nil
})
_, err := query.Run(ctx, query.peers.Len())
_, err := query.Run(ctx, peers)
return out, err
}