Commit 3711d540 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet Committed by Brian Tiger Chow

getValueSingle using SendRequest

parent 52064084
......@@ -159,8 +159,37 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.
return rmes, nil
}
func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) {
pmes, err := dht.getValueSingle(p, key, timeout, level)
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) {
mes, err := msg.FromObject(p, pmes)
if err != nil {
return nil, err
}
start := time.Now()
rmes, err := dht.sender.SendRequest(ctx, mes)
if err != nil {
return nil, err
}
rtt := time.Since(start)
rmes.Peer().SetLatency(rtt)
rpmes := new(Message)
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
return nil, err
}
return rpmes, nil
}
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
key u.Key, level int) ([]byte, []*peer.Peer, error) {
pmes, err := dht.getValueSingle(ctx, p, key, level)
if err != nil {
return nil, nil, err
}
......@@ -202,39 +231,15 @@ func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Durati
}
// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration, level int) (*Message, error) {
pmes := Message{
Type: Message_GET_VALUE,
Key: string(key),
Value: []byte{byte(level)},
ID: swarm.GenerateMessageID(),
}
responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
key u.Key, level int) (*Message, error) {
mes := swarm.NewMessage(p, pmes.ToProtobuf())
t := time.Now()
dht.netChan.Outgoing <- mes
typ := Message_GET_VALUE
skey := string(key)
pmes := &Message{Type: &typ, Key: &skey}
pmes.SetClusterLevel(int32(level))
// Wait for either the response or a timeout
timeup := time.After(timeout)
select {
case <-timeup:
dht.listener.Unlisten(pmes.ID)
return nil, u.ErrTimeout
case resp, ok := <-responseChan:
if !ok {
u.PErr("response channel closed before timeout, please investigate.\n")
return nil, u.ErrTimeout
}
roundtrip := time.Since(t)
resp.Peer.SetLatency(roundtrip)
pmesOut := new(Message)
err := proto.Unmarshal(resp.Data, pmesOut)
if err != nil {
return nil, err
}
return pmesOut, nil
}
return dht.sendRequest(ctx, p, pmes)
}
// TODO: Im not certain on this implementation, we get a list of peers/providers
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment