Commit 8b6458eb authored by hannahhoward's avatar hannahhoward

feat(graphsync): support extension protocol

Per https://github.com/ipld/specs/pull/205, metadata is now an extension, and further extensions
will be added soon
parent 412bd0ea
package message
import (
"errors"
"fmt"
"io"
......@@ -21,8 +22,29 @@ type GraphSyncPriority int32
// GraphSyncResponseStatusCode is a status returned for a GraphSync Request.
type GraphSyncResponseStatusCode int32
// GraphSyncExtensionName is a name for a GraphSync extension
type GraphSyncExtensionName string
// GraphSyncExtension is a name/data pair for a graphsync extension
type GraphSyncExtension struct {
Name GraphSyncExtensionName
Data []byte
}
const (
// Known Graphsync Extensions
// ExtensionMetadata provides response metadata for a Graphsync request and is
// documented at
// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
ExtensionMetadata = GraphSyncExtensionName("graphsync/response-metadata")
// ExtensionDoNotSendCIDs tells the responding peer not to send certain blocks if they
// are encountered in a traversal and is documented at
// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
ExtensionDoNotSendCIDs = GraphSyncExtensionName("graphsync/do-not-send-cids")
// GraphSync Response Status Codes
// Informational Response Codes (partial)
......@@ -66,6 +88,11 @@ const (
RequestFailedContentNotFound = GraphSyncResponseStatusCode(34)
)
var (
// ErrExtensionNotPresent means the looked up extension was not found
ErrExtensionNotPresent = errors.New("Extension is missing from this message")
)
// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
func IsTerminalSuccessCode(status GraphSyncResponseStatusCode) bool {
......@@ -119,19 +146,20 @@ type Exportable interface {
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
root cid.Cid
selector []byte
priority GraphSyncPriority
id GraphSyncRequestID
isCancel bool
root cid.Cid
selector []byte
priority GraphSyncPriority
id GraphSyncRequestID
extensions map[string][]byte
isCancel bool
}
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
requestID GraphSyncRequestID
status GraphSyncResponseStatusCode
extra []byte
requestID GraphSyncRequestID
status GraphSyncResponseStatusCode
extensions map[string][]byte
}
type graphSyncMessage struct {
......@@ -157,40 +185,58 @@ func newMsg() *graphSyncMessage {
func NewRequest(id GraphSyncRequestID,
root cid.Cid,
selector []byte,
priority GraphSyncPriority) GraphSyncRequest {
return newRequest(id, root, selector, priority, false)
priority GraphSyncPriority,
extensions ...GraphSyncExtension) GraphSyncRequest {
return newRequest(id, root, selector, priority, false, toExtensionsMap(extensions))
}
// CancelRequest request generates a request to cancel an in progress request
func CancelRequest(id GraphSyncRequestID) GraphSyncRequest {
return newRequest(id, cid.Cid{}, nil, 0, true)
return newRequest(id, cid.Cid{}, nil, 0, true, nil)
}
func toExtensionsMap(extensions []GraphSyncExtension) (extensionsMap map[string][]byte) {
if len(extensions) > 0 {
extensionsMap = make(map[string][]byte, len(extensions))
for _, extension := range extensions {
extensionsMap[string(extension.Name)] = extension.Data
}
}
return
}
func newRequest(id GraphSyncRequestID,
root cid.Cid,
selector []byte,
priority GraphSyncPriority,
isCancel bool) GraphSyncRequest {
isCancel bool,
extensions map[string][]byte) GraphSyncRequest {
return GraphSyncRequest{
id: id,
root: root,
selector: selector,
priority: priority,
isCancel: isCancel,
id: id,
root: root,
selector: selector,
priority: priority,
isCancel: isCancel,
extensions: extensions,
}
}
// NewResponse builds a new Graphsync response
func NewResponse(requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode,
extra []byte) GraphSyncResponse {
extensions ...GraphSyncExtension) GraphSyncResponse {
return newResponse(requestID, status, toExtensionsMap(extensions))
}
func newResponse(requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode, extensions map[string][]byte) GraphSyncResponse {
return GraphSyncResponse{
requestID: requestID,
status: status,
extra: extra,
requestID: requestID,
status: status,
extensions: extensions,
}
}
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
gsm := newMsg()
for _, req := range pbm.Requests {
......@@ -198,11 +244,11 @@ func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
if err != nil {
return nil, err
}
gsm.AddRequest(newRequest(GraphSyncRequestID(req.Id), root, req.Selector, GraphSyncPriority(req.Priority), req.Cancel))
gsm.AddRequest(newRequest(GraphSyncRequestID(req.Id), root, req.Selector, GraphSyncPriority(req.Priority), req.Cancel, req.GetExtensions()))
}
for _, res := range pbm.Responses {
gsm.AddResponse(NewResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), res.Extra))
gsm.AddResponse(newResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), res.GetExtensions()))
}
for _, b := range pbm.GetData() {
......@@ -288,20 +334,21 @@ func (gsm *graphSyncMessage) ToProto() *pb.Message {
pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
for _, request := range gsm.requests {
pbm.Requests = append(pbm.Requests, pb.Message_Request{
Id: int32(request.id),
Root: request.root.Bytes(),
Selector: request.selector,
Priority: int32(request.priority),
Cancel: request.isCancel,
Id: int32(request.id),
Root: request.root.Bytes(),
Selector: request.selector,
Priority: int32(request.priority),
Cancel: request.isCancel,
Extensions: request.extensions,
})
}
pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
for _, response := range gsm.responses {
pbm.Responses = append(pbm.Responses, pb.Message_Response{
Id: int32(response.requestID),
Status: int32(response.status),
Extra: response.extra,
Id: int32(response.requestID),
Status: int32(response.status),
Extensions: response.extensions,
})
}
......@@ -349,6 +396,19 @@ func (gsr GraphSyncRequest) Selector() []byte { return gsr.selector }
// Priority returns the priority of this request
func (gsr GraphSyncRequest) Priority() GraphSyncPriority { return gsr.priority }
// Extension returns the content for an extension on a response, or errors
// if extension is not present
func (gsr GraphSyncRequest) Extension(name GraphSyncExtensionName) ([]byte, error) {
if gsr.extensions == nil {
return nil, ErrExtensionNotPresent
}
val, ok := gsr.extensions[string(name)]
if !ok {
return nil, ErrExtensionNotPresent
}
return val, nil
}
// IsCancel returns true if this particular request is being cancelled
func (gsr GraphSyncRequest) IsCancel() bool { return gsr.isCancel }
......@@ -358,5 +418,16 @@ func (gsr GraphSyncResponse) RequestID() GraphSyncRequestID { return gsr.request
// Status returns the status for a response
func (gsr GraphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status }
// Extra returns any metadata on a response
func (gsr GraphSyncResponse) Extra() []byte { return gsr.extra }
// Extension returns the content for an extension on a response, or errors
// if extension is not present
func (gsr GraphSyncResponse) Extension(name GraphSyncExtensionName) ([]byte, error) {
if gsr.extensions == nil {
return nil, ErrExtensionNotPresent
}
val, ok := gsr.extensions[string(name)]
if !ok {
return nil, ErrExtensionNotPresent
}
return val, nil
}
......@@ -13,23 +13,31 @@ import (
)
func TestAppendingRequests(t *testing.T) {
extensionName := GraphSyncExtensionName("graphsync/awesome")
extension := GraphSyncExtension{
Name: extensionName,
Data: testutil.RandomBytes(100),
}
root := testutil.GenerateCids(1)[0]
selector := testutil.RandomBytes(100)
id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31())
gsm := New()
gsm.AddRequest(NewRequest(id, root, selector, priority))
gsm.AddRequest(NewRequest(id, root, selector, priority, extension))
requests := gsm.Requests()
if len(requests) != 1 {
t.Fatal("Did not add request to message")
}
request := requests[0]
extensionData, err := request.Extension(extensionName)
if request.ID() != id ||
request.IsCancel() != false ||
request.Priority() != priority ||
request.Root().String() != root.String() ||
!reflect.DeepEqual(request.Selector(), selector) {
!reflect.DeepEqual(request.Selector(), selector) ||
err != nil ||
!reflect.DeepEqual(extension.Data, extensionData) {
t.Fatal("Did not properly add request to message")
}
......@@ -39,7 +47,8 @@ func TestAppendingRequests(t *testing.T) {
pbRequest.Priority != int32(priority) ||
pbRequest.Cancel != false ||
!reflect.DeepEqual(pbRequest.Root, root.Bytes()) ||
!reflect.DeepEqual(pbRequest.Selector, selector) {
!reflect.DeepEqual(pbRequest.Selector, selector) ||
!reflect.DeepEqual(pbRequest.Extensions, map[string][]byte{"graphsync/awesome": extension.Data}) {
t.Fatal("Did not properly serialize message to protobuf")
}
......@@ -52,30 +61,39 @@ func TestAppendingRequests(t *testing.T) {
t.Fatal("Did not add request to deserialized message")
}
deserializedRequest := deserializedRequests[0]
extensionData, err = deserializedRequest.Extension(extensionName)
if deserializedRequest.ID() != id ||
deserializedRequest.IsCancel() != false ||
deserializedRequest.Priority() != priority ||
deserializedRequest.Root().String() != root.String() ||
!reflect.DeepEqual(deserializedRequest.Selector(), selector) {
!reflect.DeepEqual(deserializedRequest.Selector(), selector) ||
err != nil ||
!reflect.DeepEqual(extension.Data, extensionData) {
t.Fatal("Did not properly deserialize protobuf messages so requests are equal")
}
}
func TestAppendingResponses(t *testing.T) {
extra := testutil.RandomBytes(100)
extensionName := GraphSyncExtensionName("graphsync/awesome")
extension := GraphSyncExtension{
Name: extensionName,
Data: testutil.RandomBytes(100),
}
requestID := GraphSyncRequestID(rand.Int31())
status := RequestAcknowledged
gsm := New()
gsm.AddResponse(NewResponse(requestID, status, extra))
gsm.AddResponse(NewResponse(requestID, status, extension))
responses := gsm.Responses()
if len(responses) != 1 {
t.Fatal("Did not add response to message")
}
response := responses[0]
extensionData, err := response.Extension(extensionName)
if response.RequestID() != requestID ||
response.Status() != status ||
!reflect.DeepEqual(response.Extra(), extra) {
err != nil ||
!reflect.DeepEqual(extension.Data, extensionData) {
t.Fatal("Did not properly add response to message")
}
......@@ -83,7 +101,7 @@ func TestAppendingResponses(t *testing.T) {
pbResponse := pbMessage.Responses[0]
if pbResponse.Id != int32(requestID) ||
pbResponse.Status != int32(status) ||
!reflect.DeepEqual(pbResponse.Extra, extra) {
!reflect.DeepEqual(pbResponse.Extensions, map[string][]byte{"graphsync/awesome": extension.Data}) {
t.Fatal("Did not properly serialize message to protobuf")
}
......@@ -96,9 +114,11 @@ func TestAppendingResponses(t *testing.T) {
t.Fatal("Did not add response to message")
}
deserializedResponse := deserializedResponses[0]
if deserializedResponse.RequestID() != requestID ||
deserializedResponse.Status() != status ||
!reflect.DeepEqual(deserializedResponse.Extra(), extra) {
extensionData, err = deserializedResponse.Extension(extensionName)
if deserializedResponse.RequestID() != response.RequestID() ||
deserializedResponse.Status() != response.Status() ||
err != nil ||
!reflect.DeepEqual(extensionData, extension.Data) {
t.Fatal("Did not properly deserialize protobuf messages so responses are equal")
}
}
......@@ -158,14 +178,18 @@ func TestRequestCancel(t *testing.T) {
func TestToNetFromNetEquivalency(t *testing.T) {
root := testutil.GenerateCids(1)[0]
selector := testutil.RandomBytes(100)
extra := testutil.RandomBytes(100)
extensionName := GraphSyncExtensionName("graphsync/awesome")
extension := GraphSyncExtension{
Name: extensionName,
Data: testutil.RandomBytes(100),
}
id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31())
status := RequestAcknowledged
gsm := New()
gsm.AddRequest(NewRequest(id, root, selector, priority))
gsm.AddResponse(NewResponse(id, status, extra))
gsm.AddRequest(NewRequest(id, root, selector, priority, extension))
gsm.AddResponse(NewResponse(id, status, extension))
gsm.AddBlock(blocks.NewBlock([]byte("W")))
gsm.AddBlock(blocks.NewBlock([]byte("E")))
......@@ -192,11 +216,14 @@ func TestToNetFromNetEquivalency(t *testing.T) {
t.Fatal("Did not add request to deserialized message")
}
deserializedRequest := deserializedRequests[0]
extensionData, err := deserializedRequest.Extension(extensionName)
if deserializedRequest.ID() != request.ID() ||
deserializedRequest.IsCancel() != request.IsCancel() ||
deserializedRequest.Priority() != request.Priority() ||
deserializedRequest.Root().String() != request.Root().String() ||
!reflect.DeepEqual(deserializedRequest.Selector(), request.Selector()) {
!reflect.DeepEqual(deserializedRequest.Selector(), request.Selector()) ||
err != nil ||
!reflect.DeepEqual(extensionData, extension.Data) {
t.Fatal("Did not keep requests when writing to stream and back")
}
......@@ -210,9 +237,11 @@ func TestToNetFromNetEquivalency(t *testing.T) {
t.Fatal("Did not add response to message")
}
deserializedResponse := deserializedResponses[0]
extensionData, err = deserializedResponse.Extension(extensionName)
if deserializedResponse.RequestID() != response.RequestID() ||
deserializedResponse.Status() != response.Status() ||
!reflect.DeepEqual(deserializedResponse.Extra(), response.Extra()) {
err != nil ||
!reflect.DeepEqual(extensionData, extension.Data) {
t.Fatal("Did not keep responses when writing to stream and back")
}
......
......@@ -33,7 +33,7 @@ func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) {
return fileDescriptor_message_0f03446033f13d35, []int{0}
return fileDescriptor_message_c5788c4e9f6c17be, []int{0}
}
func (m *Message) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
......@@ -91,19 +91,19 @@ func (m *Message) GetData() []Message_Block {
}
type Message_Request struct {
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Root []byte `protobuf:"bytes,2,opt,name=root,proto3" json:"root,omitempty"`
Selector []byte `protobuf:"bytes,3,opt,name=selector,proto3" json:"selector,omitempty"`
Extra []byte `protobuf:"bytes,4,opt,name=extra,proto3" json:"extra,omitempty"`
Priority int32 `protobuf:"varint,5,opt,name=priority,proto3" json:"priority,omitempty"`
Cancel bool `protobuf:"varint,6,opt,name=cancel,proto3" json:"cancel,omitempty"`
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Root []byte `protobuf:"bytes,2,opt,name=root,proto3" json:"root,omitempty"`
Selector []byte `protobuf:"bytes,3,opt,name=selector,proto3" json:"selector,omitempty"`
Extensions map[string][]byte `protobuf:"bytes,4,rep,name=extensions" json:"extensions,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Priority int32 `protobuf:"varint,5,opt,name=priority,proto3" json:"priority,omitempty"`
Cancel bool `protobuf:"varint,6,opt,name=cancel,proto3" json:"cancel,omitempty"`
}
func (m *Message_Request) Reset() { *m = Message_Request{} }
func (m *Message_Request) String() string { return proto.CompactTextString(m) }
func (*Message_Request) ProtoMessage() {}
func (*Message_Request) Descriptor() ([]byte, []int) {
return fileDescriptor_message_0f03446033f13d35, []int{0, 0}
return fileDescriptor_message_c5788c4e9f6c17be, []int{0, 0}
}
func (m *Message_Request) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
......@@ -153,9 +153,9 @@ func (m *Message_Request) GetSelector() []byte {
return nil
}
func (m *Message_Request) GetExtra() []byte {
func (m *Message_Request) GetExtensions() map[string][]byte {
if m != nil {
return m.Extra
return m.Extensions
}
return nil
}
......@@ -175,16 +175,16 @@ func (m *Message_Request) GetCancel() bool {
}
type Message_Response struct {
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Status int32 `protobuf:"varint,2,opt,name=status,proto3" json:"status,omitempty"`
Extra []byte `protobuf:"bytes,3,opt,name=extra,proto3" json:"extra,omitempty"`
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Status int32 `protobuf:"varint,2,opt,name=status,proto3" json:"status,omitempty"`
Extensions map[string][]byte `protobuf:"bytes,3,rep,name=extensions" json:"extensions,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
func (m *Message_Response) Reset() { *m = Message_Response{} }
func (m *Message_Response) String() string { return proto.CompactTextString(m) }
func (*Message_Response) ProtoMessage() {}
func (*Message_Response) Descriptor() ([]byte, []int) {
return fileDescriptor_message_0f03446033f13d35, []int{0, 1}
return fileDescriptor_message_c5788c4e9f6c17be, []int{0, 1}
}
func (m *Message_Response) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
......@@ -227,9 +227,9 @@ func (m *Message_Response) GetStatus() int32 {
return 0
}
func (m *Message_Response) GetExtra() []byte {
func (m *Message_Response) GetExtensions() map[string][]byte {
if m != nil {
return m.Extra
return m.Extensions
}
return nil
}
......@@ -243,7 +243,7 @@ func (m *Message_Block) Reset() { *m = Message_Block{} }
func (m *Message_Block) String() string { return proto.CompactTextString(m) }
func (*Message_Block) ProtoMessage() {}
func (*Message_Block) Descriptor() ([]byte, []int) {
return fileDescriptor_message_0f03446033f13d35, []int{0, 2}
return fileDescriptor_message_c5788c4e9f6c17be, []int{0, 2}
}
func (m *Message_Block) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
......@@ -289,7 +289,9 @@ func (m *Message_Block) GetData() []byte {
func init() {
proto.RegisterType((*Message)(nil), "graphsync.message.pb.Message")
proto.RegisterType((*Message_Request)(nil), "graphsync.message.pb.Message.Request")
proto.RegisterMapType((map[string][]byte)(nil), "graphsync.message.pb.Message.Request.ExtensionsEntry")
proto.RegisterType((*Message_Response)(nil), "graphsync.message.pb.Message.Response")
proto.RegisterMapType((map[string][]byte)(nil), "graphsync.message.pb.Message.Response.ExtensionsEntry")
proto.RegisterType((*Message_Block)(nil), "graphsync.message.pb.Message.Block")
}
func (m *Message) Marshal() (dAtA []byte, err error) {
......@@ -388,11 +390,28 @@ func (m *Message_Request) MarshalTo(dAtA []byte) (int, error) {
i = encodeVarintMessage(dAtA, i, uint64(len(m.Selector)))
i += copy(dAtA[i:], m.Selector)
}
if len(m.Extra) > 0 {
dAtA[i] = 0x22
i++
i = encodeVarintMessage(dAtA, i, uint64(len(m.Extra)))
i += copy(dAtA[i:], m.Extra)
if len(m.Extensions) > 0 {
for k, _ := range m.Extensions {
dAtA[i] = 0x22
i++
v := m.Extensions[k]
byteSize := 0
if len(v) > 0 {
byteSize = 1 + len(v) + sovMessage(uint64(len(v)))
}
mapSize := 1 + len(k) + sovMessage(uint64(len(k))) + byteSize
i = encodeVarintMessage(dAtA, i, uint64(mapSize))
dAtA[i] = 0xa
i++
i = encodeVarintMessage(dAtA, i, uint64(len(k)))
i += copy(dAtA[i:], k)
if len(v) > 0 {
dAtA[i] = 0x12
i++
i = encodeVarintMessage(dAtA, i, uint64(len(v)))
i += copy(dAtA[i:], v)
}
}
}
if m.Priority != 0 {
dAtA[i] = 0x28
......@@ -437,11 +456,28 @@ func (m *Message_Response) MarshalTo(dAtA []byte) (int, error) {
i++
i = encodeVarintMessage(dAtA, i, uint64(m.Status))
}
if len(m.Extra) > 0 {
dAtA[i] = 0x1a
i++
i = encodeVarintMessage(dAtA, i, uint64(len(m.Extra)))
i += copy(dAtA[i:], m.Extra)
if len(m.Extensions) > 0 {
for k, _ := range m.Extensions {
dAtA[i] = 0x1a
i++
v := m.Extensions[k]
byteSize := 0
if len(v) > 0 {
byteSize = 1 + len(v) + sovMessage(uint64(len(v)))
}
mapSize := 1 + len(k) + sovMessage(uint64(len(k))) + byteSize
i = encodeVarintMessage(dAtA, i, uint64(mapSize))
dAtA[i] = 0xa
i++
i = encodeVarintMessage(dAtA, i, uint64(len(k)))
i += copy(dAtA[i:], k)
if len(v) > 0 {
dAtA[i] = 0x12
i++
i = encodeVarintMessage(dAtA, i, uint64(len(v)))
i += copy(dAtA[i:], v)
}
}
}
return i, nil
}
......@@ -532,9 +568,17 @@ func (m *Message_Request) Size() (n int) {
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
}
l = len(m.Extra)
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
if len(m.Extensions) > 0 {
for k, v := range m.Extensions {
_ = k
_ = v
l = 0
if len(v) > 0 {
l = 1 + len(v) + sovMessage(uint64(len(v)))
}
mapEntrySize := 1 + len(k) + sovMessage(uint64(len(k))) + l
n += mapEntrySize + 1 + sovMessage(uint64(mapEntrySize))
}
}
if m.Priority != 0 {
n += 1 + sovMessage(uint64(m.Priority))
......@@ -557,9 +601,17 @@ func (m *Message_Response) Size() (n int) {
if m.Status != 0 {
n += 1 + sovMessage(uint64(m.Status))
}
l = len(m.Extra)
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
if len(m.Extensions) > 0 {
for k, v := range m.Extensions {
_ = k
_ = v
l = 0
if len(v) > 0 {
l = 1 + len(v) + sovMessage(uint64(len(v)))
}
mapEntrySize := 1 + len(k) + sovMessage(uint64(len(k))) + l
n += mapEntrySize + 1 + sovMessage(uint64(mapEntrySize))
}
}
return n
}
......@@ -869,9 +921,9 @@ func (m *Message_Request) Unmarshal(dAtA []byte) error {
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Extra", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field Extensions", wireType)
}
var byteLen int
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
......@@ -881,22 +933,110 @@ func (m *Message_Request) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
if msglen < 0 {
return ErrInvalidLengthMessage
}
postIndex := iNdEx + byteLen
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Extra = append(m.Extra[:0], dAtA[iNdEx:postIndex]...)
if m.Extra == nil {
m.Extra = []byte{}
if m.Extensions == nil {
m.Extensions = make(map[string][]byte)
}
var mapkey string
mapvalue := []byte{}
for iNdEx < postIndex {
entryPreIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
if fieldNum == 1 {
var stringLenmapkey uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLenmapkey |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLenmapkey := int(stringLenmapkey)
if intStringLenmapkey < 0 {
return ErrInvalidLengthMessage
}
postStringIndexmapkey := iNdEx + intStringLenmapkey
if postStringIndexmapkey > l {
return io.ErrUnexpectedEOF
}
mapkey = string(dAtA[iNdEx:postStringIndexmapkey])
iNdEx = postStringIndexmapkey
} else if fieldNum == 2 {
var mapbyteLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
mapbyteLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intMapbyteLen := int(mapbyteLen)
if intMapbyteLen < 0 {
return ErrInvalidLengthMessage
}
postbytesIndex := iNdEx + intMapbyteLen
if postbytesIndex > l {
return io.ErrUnexpectedEOF
}
mapvalue = make([]byte, mapbyteLen)
copy(mapvalue, dAtA[iNdEx:postbytesIndex])
iNdEx = postbytesIndex
} else {
iNdEx = entryPreIndex
skippy, err := skipMessage(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMessage
}
if (iNdEx + skippy) > postIndex {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
m.Extensions[mapkey] = mapvalue
iNdEx = postIndex
case 5:
if wireType != 0 {
......@@ -1027,9 +1167,9 @@ func (m *Message_Response) Unmarshal(dAtA []byte) error {
}
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Extra", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field Extensions", wireType)
}
var byteLen int
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
......@@ -1039,22 +1179,110 @@ func (m *Message_Response) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
if msglen < 0 {
return ErrInvalidLengthMessage
}
postIndex := iNdEx + byteLen
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Extra = append(m.Extra[:0], dAtA[iNdEx:postIndex]...)
if m.Extra == nil {
m.Extra = []byte{}
if m.Extensions == nil {
m.Extensions = make(map[string][]byte)
}
var mapkey string
mapvalue := []byte{}
for iNdEx < postIndex {
entryPreIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
if fieldNum == 1 {
var stringLenmapkey uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLenmapkey |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLenmapkey := int(stringLenmapkey)
if intStringLenmapkey < 0 {
return ErrInvalidLengthMessage
}
postStringIndexmapkey := iNdEx + intStringLenmapkey
if postStringIndexmapkey > l {
return io.ErrUnexpectedEOF
}
mapkey = string(dAtA[iNdEx:postStringIndexmapkey])
iNdEx = postStringIndexmapkey
} else if fieldNum == 2 {
var mapbyteLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
mapbyteLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intMapbyteLen := int(mapbyteLen)
if intMapbyteLen < 0 {
return ErrInvalidLengthMessage
}
postbytesIndex := iNdEx + intMapbyteLen
if postbytesIndex > l {
return io.ErrUnexpectedEOF
}
mapvalue = make([]byte, mapbyteLen)
copy(mapvalue, dAtA[iNdEx:postbytesIndex])
iNdEx = postbytesIndex
} else {
iNdEx = entryPreIndex
skippy, err := skipMessage(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMessage
}
if (iNdEx + skippy) > postIndex {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
m.Extensions[mapkey] = mapvalue
iNdEx = postIndex
default:
iNdEx = preIndex
......@@ -1294,32 +1522,36 @@ var (
ErrIntOverflowMessage = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("message.proto", fileDescriptor_message_0f03446033f13d35) }
func init() { proto.RegisterFile("message.proto", fileDescriptor_message_c5788c4e9f6c17be) }
var fileDescriptor_message_0f03446033f13d35 = []byte{
// 380 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x92, 0xcf, 0x4e, 0xf2, 0x40,
0x14, 0xc5, 0xfb, 0x9f, 0x7e, 0xf3, 0xa1, 0x8b, 0x91, 0x90, 0x49, 0x17, 0x95, 0x68, 0x34, 0x6c,
0x2c, 0x46, 0xd6, 0x6e, 0xd8, 0x68, 0x8c, 0x6e, 0xfa, 0x06, 0x6d, 0x19, 0x4a, 0x63, 0x61, 0xea,
0xcc, 0x34, 0x81, 0xb7, 0x30, 0xf1, 0x15, 0x7c, 0x18, 0x96, 0x2c, 0x5d, 0x19, 0x03, 0x2f, 0x62,
0x7a, 0x3b, 0x16, 0x13, 0x89, 0xee, 0xee, 0xe9, 0x9d, 0xf3, 0xbb, 0xe7, 0xde, 0x14, 0x1d, 0xcc,
0xa8, 0x10, 0x51, 0x4a, 0x83, 0x82, 0x33, 0xc9, 0x70, 0x27, 0xe5, 0x51, 0x31, 0x15, 0xcb, 0x79,
0x12, 0x34, 0x8d, 0xd8, 0xbb, 0x48, 0x33, 0x39, 0x2d, 0xe3, 0x20, 0x61, 0xb3, 0x41, 0xca, 0x52,
0x36, 0x80, 0xc7, 0x71, 0x39, 0x01, 0x05, 0x02, 0xaa, 0x1a, 0x72, 0xf2, 0x6a, 0xa1, 0xd6, 0x43,
0xed, 0xc6, 0x97, 0xe8, 0x28, 0x61, 0xb3, 0x22, 0xa7, 0x92, 0x86, 0xf4, 0xa9, 0xa4, 0x42, 0xde,
0x67, 0x42, 0x12, 0xbd, 0xa7, 0xf7, 0xdd, 0x70, 0x5f, 0x0b, 0xdf, 0x20, 0x97, 0xd7, 0x52, 0x10,
0xa3, 0x67, 0xf6, 0xff, 0x5f, 0x9d, 0x05, 0xfb, 0x52, 0x05, 0x6a, 0x44, 0xa0, 0xcc, 0x23, 0x6b,
0xf5, 0x7e, 0xac, 0x85, 0x8d, 0x19, 0xdf, 0xa1, 0x7f, 0x9c, 0x8a, 0x82, 0xcd, 0x05, 0x15, 0xc4,
0x04, 0xd2, 0xf9, 0x5f, 0xa4, 0xfa, 0xb9, 0x42, 0xed, 0xec, 0xf8, 0x1a, 0x59, 0xe3, 0x48, 0x46,
0xc4, 0x02, 0xcc, 0xe9, 0xef, 0x98, 0x51, 0xce, 0x92, 0x47, 0xc5, 0x00, 0x9b, 0xf7, 0xa2, 0xa3,
0x96, 0x8a, 0x89, 0x0f, 0x91, 0x91, 0x8d, 0xe1, 0x00, 0x76, 0x68, 0x64, 0x63, 0x8c, 0x91, 0xc5,
0x19, 0x93, 0xc4, 0xe8, 0xe9, 0xfd, 0x76, 0x08, 0x35, 0xf6, 0x90, 0x2b, 0x68, 0x4e, 0x13, 0xc9,
0x38, 0x31, 0xe1, 0x7b, 0xa3, 0x71, 0x07, 0xd9, 0x74, 0x21, 0x79, 0x95, 0xa5, 0x6a, 0xd4, 0xa2,
0x72, 0x14, 0x3c, 0x63, 0x3c, 0x93, 0x4b, 0x62, 0x03, 0xbb, 0xd1, 0xb8, 0x8b, 0x9c, 0x24, 0x9a,
0x27, 0x34, 0x27, 0x0e, 0x9c, 0x5d, 0x29, 0xef, 0x16, 0xb9, 0x5f, 0x1b, 0xff, 0x48, 0xd5, 0x45,
0x8e, 0x90, 0x91, 0x2c, 0x05, 0xe4, 0xb2, 0x43, 0xa5, 0x76, 0xd3, 0xcd, 0x6f, 0xd3, 0xbd, 0x21,
0xb2, 0x61, 0xe9, 0xca, 0x56, 0x70, 0x3a, 0xc9, 0x16, 0x80, 0x6a, 0x87, 0x4a, 0x55, 0x4b, 0xc2,
0xfd, 0xd4, 0x92, 0x55, 0x3d, 0x22, 0xab, 0x8d, 0xaf, 0xaf, 0x37, 0xbe, 0xfe, 0xb1, 0xf1, 0xf5,
0xe7, 0xad, 0xaf, 0xad, 0xb7, 0xbe, 0xf6, 0xb6, 0xf5, 0xb5, 0xd8, 0x81, 0xff, 0x68, 0xf8, 0x19,
0x00, 0x00, 0xff, 0xff, 0xde, 0x2e, 0x80, 0x5d, 0x9d, 0x02, 0x00, 0x00,
var fileDescriptor_message_c5788c4e9f6c17be = []byte{
// 447 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x5d, 0x6b, 0xd4, 0x40,
0x14, 0xdd, 0x64, 0x37, 0xe9, 0xf6, 0x5a, 0x3f, 0x18, 0x4b, 0x19, 0xf2, 0x10, 0x17, 0x45, 0xd9,
0x17, 0x53, 0xb1, 0x28, 0x22, 0xf4, 0x65, 0xa1, 0x08, 0xa2, 0x2f, 0x03, 0xfa, 0x9e, 0xcd, 0xde,
0xa6, 0x43, 0xb3, 0x99, 0x38, 0x33, 0x91, 0xe6, 0x5f, 0x08, 0xfe, 0x07, 0x7f, 0x4b, 0x7d, 0xeb,
0xa3, 0x4f, 0x22, 0xbb, 0x7f, 0x44, 0x72, 0x33, 0xc6, 0xaf, 0x52, 0x17, 0x7c, 0xbb, 0x67, 0x67,
0xce, 0x99, 0x73, 0xee, 0xd9, 0xc0, 0xf5, 0x25, 0x1a, 0x93, 0xe6, 0x98, 0x54, 0x5a, 0x59, 0xc5,
0x76, 0x73, 0x9d, 0x56, 0x27, 0xa6, 0x29, 0xb3, 0xa4, 0x3f, 0x98, 0x47, 0x0f, 0x73, 0x69, 0x4f,
0xea, 0x79, 0x92, 0xa9, 0xe5, 0x7e, 0xae, 0x72, 0xb5, 0x4f, 0x97, 0xe7, 0xf5, 0x31, 0x21, 0x02,
0x34, 0x75, 0x22, 0x77, 0x3f, 0x85, 0xb0, 0xf5, 0xba, 0x63, 0xb3, 0x47, 0x70, 0x3b, 0x53, 0xcb,
0xaa, 0x40, 0x8b, 0x02, 0xdf, 0xd5, 0x68, 0xec, 0x2b, 0x69, 0x2c, 0xf7, 0x26, 0xde, 0x74, 0x2c,
0x2e, 0x3b, 0x62, 0x2f, 0x60, 0xac, 0x3b, 0x68, 0xb8, 0x3f, 0x19, 0x4e, 0xaf, 0x3d, 0xbe, 0x9f,
0x5c, 0xe6, 0x2a, 0x71, 0x4f, 0x24, 0x8e, 0x3c, 0x1b, 0x9d, 0x7f, 0xbd, 0x33, 0x10, 0x3d, 0x99,
0xbd, 0x84, 0x6d, 0x8d, 0xa6, 0x52, 0xa5, 0x41, 0xc3, 0x87, 0xa4, 0xf4, 0xe0, 0x5f, 0x4a, 0xdd,
0x75, 0x27, 0xf5, 0x93, 0xce, 0x0e, 0x61, 0xb4, 0x48, 0x6d, 0xca, 0x47, 0x24, 0x73, 0xef, 0x6a,
0x99, 0x59, 0xa1, 0xb2, 0x53, 0xa7, 0x41, 0xb4, 0xe8, 0xa3, 0x0f, 0x5b, 0xce, 0x26, 0xbb, 0x01,
0xbe, 0x5c, 0xd0, 0x02, 0x02, 0xe1, 0xcb, 0x05, 0x63, 0x30, 0xd2, 0x4a, 0x59, 0xee, 0x4f, 0xbc,
0xe9, 0x8e, 0xa0, 0x99, 0x45, 0x30, 0x36, 0x58, 0x60, 0x66, 0x95, 0xe6, 0x43, 0xfa, 0xbd, 0xc7,
0xec, 0x0d, 0x00, 0x9e, 0x59, 0x2c, 0x8d, 0x54, 0xa5, 0x71, 0x86, 0x9e, 0x6c, 0xb4, 0xa1, 0xe4,
0xa8, 0xe7, 0x1d, 0x95, 0x56, 0x37, 0xe2, 0x17, 0xa1, 0xf6, 0xc9, 0x4a, 0x4b, 0xa5, 0xa5, 0x6d,
0x78, 0x40, 0xe6, 0x7a, 0xcc, 0xf6, 0x20, 0xcc, 0xd2, 0x32, 0xc3, 0x82, 0x87, 0xd4, 0x9b, 0x43,
0xd1, 0x21, 0xdc, 0xfc, 0x43, 0x92, 0xdd, 0x82, 0xe1, 0x29, 0x36, 0x14, 0x6f, 0x5b, 0xb4, 0x23,
0xdb, 0x85, 0xe0, 0x7d, 0x5a, 0xd4, 0xe8, 0x02, 0x76, 0xe0, 0xb9, 0xff, 0xcc, 0x8b, 0x3e, 0x7b,
0x30, 0xfe, 0xb1, 0xf2, 0xbf, 0xd6, 0xb2, 0x07, 0xa1, 0xb1, 0xa9, 0xad, 0x0d, 0xf1, 0x02, 0xe1,
0x10, 0x7b, 0xfb, 0x5b, 0xfc, 0xae, 0xd6, 0xa7, 0x9b, 0xd5, 0x7a, 0x55, 0xfe, 0xff, 0xcd, 0x72,
0x00, 0x01, 0xd5, 0xde, 0xfa, 0xae, 0x34, 0x1e, 0xcb, 0x33, 0xe2, 0xed, 0x08, 0x87, 0xda, 0x9a,
0xe9, 0x1f, 0xe4, 0x6a, 0x6e, 0xe7, 0x19, 0x3f, 0x5f, 0xc5, 0xde, 0xc5, 0x2a, 0xf6, 0xbe, 0xad,
0x62, 0xef, 0xc3, 0x3a, 0x1e, 0x5c, 0xac, 0xe3, 0xc1, 0x97, 0x75, 0x3c, 0x98, 0x87, 0xf4, 0x25,
0x1d, 0x7c, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x8a, 0x3b, 0xe8, 0x33, 0x9f, 0x03, 0x00, 0x00,
}
......@@ -10,7 +10,7 @@ message Message {
int32 id = 1; // unique id set on the requester side
bytes root = 2; // a CID for the root node in the query
bytes selector = 3; // ipld selector to retrieve
bytes extra = 4; // aux information. useful for other protocols
map<string, bytes> extensions = 4; // aux information. useful for other protocols
int32 priority = 5; // the priority (normalized). default to 1
bool cancel = 6; // whether this cancels a request
}
......@@ -18,7 +18,7 @@ message Message {
message Response {
int32 id = 1; // the request id
int32 status = 2; // a status code.
bytes extra = 3; // additional data
map<string, bytes> extensions = 3; // additional data
}
message Block {
......
......@@ -175,9 +175,13 @@ func TestProcessingNotification(t *testing.T) {
newMessage := gsmsg.New()
responseID := gsmsg.GraphSyncRequestID(rand.Int31())
extra := testutil.RandomBytes(100)
extensionName := gsmsg.GraphSyncExtensionName("graphsync/awesome")
extension := gsmsg.GraphSyncExtension{
Name: extensionName,
Data: testutil.RandomBytes(100),
}
status := gsmsg.RequestCompletedFull
newMessage.AddResponse(gsmsg.NewResponse(responseID, status, extra))
newMessage.AddResponse(gsmsg.NewResponse(responseID, status, extension))
processing := messageQueue.AddResponses(newMessage.Responses(), blks)
select {
case <-processing:
......@@ -205,9 +209,11 @@ func TestProcessingNotification(t *testing.T) {
}
}
firstResponse := message.Responses()[0]
extensionData, err := firstResponse.Extension(extensionName)
if responseID != firstResponse.RequestID() ||
status != firstResponse.Status() ||
!reflect.DeepEqual(firstResponse.Extra(), extra) {
err != nil ||
!reflect.DeepEqual(extension.Data, extensionData) {
t.Fatal("Send incorrect response")
}
}
......
......@@ -73,14 +73,18 @@ func TestMessageSendAndReceive(t *testing.T) {
root := testutil.GenerateCids(1)[0]
selector := testutil.RandomBytes(100)
extra := testutil.RandomBytes(100)
extensionName := gsmsg.GraphSyncExtensionName("graphsync/awesome")
extension := gsmsg.GraphSyncExtension{
Name: extensionName,
Data: testutil.RandomBytes(100),
}
id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31())
status := gsmsg.RequestAcknowledged
sent := gsmsg.New()
sent.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
sent.AddResponse(gsmsg.NewResponse(id, status, extra))
sent.AddResponse(gsmsg.NewResponse(id, status, extension))
err = gsnet1.ConnectTo(ctx, host2.ID())
if err != nil {
......@@ -129,9 +133,11 @@ func TestMessageSendAndReceive(t *testing.T) {
t.Fatal("Did not add response to received message")
}
receivedResponse := receivedResponses[0]
extensionData, err := receivedResponse.Extension(extensionName)
if receivedResponse.RequestID() != sentResponse.RequestID() ||
receivedResponse.Status() != sentResponse.Status() ||
!reflect.DeepEqual(receivedResponse.Extra(), sentResponse.Extra()) {
err != nil ||
!reflect.DeepEqual(extension.Data, extensionData) {
t.Fatal("Sent message responses did not match received message responses")
}
......
......@@ -191,13 +191,16 @@ func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
return md
}
func encodedMetadataForBlocks(t *testing.T, ipldBridge ipldbridge.IPLDBridge, blks []blocks.Block, present bool) []byte {
func encodedMetadataForBlocks(t *testing.T, ipldBridge ipldbridge.IPLDBridge, blks []blocks.Block, present bool) gsmsg.GraphSyncExtension {
md := metadataForBlocks(blks, present)
metadataEncoded, err := metadata.EncodeMetadata(md, ipldBridge)
if err != nil {
t.Fatal("did not encode metadata")
}
return metadataEncoded
return gsmsg.GraphSyncExtension{
Name: gsmsg.ExtensionMetadata,
Data: metadataEncoded,
}
}
func TestNormalSimultaneousFetch(t *testing.T) {
......@@ -254,8 +257,14 @@ func TestNormalSimultaneousFetch(t *testing.T) {
t.Fatal("did not encode metadata")
}
firstResponses := []gsmsg.GraphSyncResponse{
gsmsg.NewResponse(requestRecords[0].gsr.ID(), gsmsg.RequestCompletedFull, firstMetadataEncoded1),
gsmsg.NewResponse(requestRecords[1].gsr.ID(), gsmsg.PartialResponse, firstMetadataEncoded2),
gsmsg.NewResponse(requestRecords[0].gsr.ID(), gsmsg.RequestCompletedFull, gsmsg.GraphSyncExtension{
Name: gsmsg.ExtensionMetadata,
Data: firstMetadataEncoded1,
}),
gsmsg.NewResponse(requestRecords[1].gsr.ID(), gsmsg.PartialResponse, gsmsg.GraphSyncExtension{
Name: gsmsg.ExtensionMetadata,
Data: firstMetadataEncoded2,
}),
}
requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
......@@ -279,7 +288,10 @@ func TestNormalSimultaneousFetch(t *testing.T) {
t.Fatal("did not encode metadata")
}
moreResponses := []gsmsg.GraphSyncResponse{
gsmsg.NewResponse(requestRecords[1].gsr.ID(), gsmsg.RequestCompletedFull, moreMetadataEncoded),
gsmsg.NewResponse(requestRecords[1].gsr.ID(), gsmsg.RequestCompletedFull, gsmsg.GraphSyncExtension{
Name: gsmsg.ExtensionMetadata,
Data: moreMetadataEncoded,
}),
}
requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
......@@ -469,7 +481,7 @@ func TestFailedRequest(t *testing.T) {
rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
failedResponses := []gsmsg.GraphSyncResponse{
gsmsg.NewResponse(rr.gsr.ID(), gsmsg.RequestFailedContentNotFound, nil),
gsmsg.NewResponse(rr.gsr.ID(), gsmsg.RequestFailedContentNotFound),
}
requestManager.ProcessResponses(peers[0], failedResponses, nil)
......@@ -506,7 +518,7 @@ func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
// failure comes in later over network
failedResponses := []gsmsg.GraphSyncResponse{
gsmsg.NewResponse(rr.gsr.ID(), gsmsg.RequestFailedContentNotFound, nil),
gsmsg.NewResponse(rr.gsr.ID(), gsmsg.RequestFailedContentNotFound),
}
requestManager.ProcessResponses(peers[0], failedResponses, nil)
......
......@@ -27,7 +27,12 @@ func visitToChannel(ctx context.Context, inProgressChan chan types.ResponseProgr
func metadataForResponses(responses []gsmsg.GraphSyncResponse, ipldBridge ipldbridge.IPLDBridge) map[gsmsg.GraphSyncRequestID]metadata.Metadata {
responseMetadata := make(map[gsmsg.GraphSyncRequestID]metadata.Metadata, len(responses))
for _, response := range responses {
md, err := metadata.DecodeMetadata(response.Extra(), ipldBridge)
mdRaw, err := response.Extension(gsmsg.ExtensionMetadata)
if err != nil {
log.Warningf("Unable to decode metadata in response for request id: %d", response.RequestID())
continue
}
md, err := metadata.DecodeMetadata(mdRaw, ipldBridge)
if err != nil {
log.Warningf("Unable to decode metadata in response for request id: %d", response.RequestID())
continue
......
......@@ -64,12 +64,16 @@ func (rb *ResponseBuilder) Empty() bool {
func (rb *ResponseBuilder) Build(ipldBridge ipldbridge.IPLDBridge) ([]gsmsg.GraphSyncResponse, []blocks.Block, error) {
responses := make([]gsmsg.GraphSyncResponse, 0, len(rb.outgoingResponses))
for requestID, linkMap := range rb.outgoingResponses {
extra, err := metadata.EncodeMetadata(linkMap, ipldBridge)
mdRaw, err := metadata.EncodeMetadata(linkMap, ipldBridge)
if err != nil {
return nil, nil, err
}
md := gsmsg.GraphSyncExtension{
Name: gsmsg.ExtensionMetadata,
Data: mdRaw,
}
status, isComplete := rb.completedResponses[requestID]
responses = append(responses, gsmsg.NewResponse(requestID, responseCode(status, isComplete), extra))
responses = append(responses, gsmsg.NewResponse(requestID, responseCode(status, isComplete), md))
}
return responses, rb.outgoingBlocks, nil
}
......
......@@ -66,7 +66,11 @@ func TestMessageBuilding(t *testing.T) {
t.Fatal("did not generate completed partial response")
}
response1Metadata, err := metadata.DecodeMetadata(response1.Extra(), ipldBridge)
response1MetadataRaw, err := response1.Extension(gsmsg.ExtensionMetadata)
if err != nil {
t.Fatal("Metadata not included in response")
}
response1Metadata, err := metadata.DecodeMetadata(response1MetadataRaw, ipldBridge)
if err != nil || !reflect.DeepEqual(response1Metadata, metadata.Metadata{
metadata.Item{Link: links[0], BlockPresent: true},
metadata.Item{Link: links[1], BlockPresent: false},
......@@ -79,7 +83,11 @@ func TestMessageBuilding(t *testing.T) {
if err != nil || response2.Status() != gsmsg.RequestCompletedFull {
t.Fatal("did not generate completed partial response")
}
response2Metadata, err := metadata.DecodeMetadata(response2.Extra(), ipldBridge)
response2MetadataRaw, err := response2.Extension(gsmsg.ExtensionMetadata)
if err != nil {
t.Fatal("Metadata not included in response")
}
response2Metadata, err := metadata.DecodeMetadata(response2MetadataRaw, ipldBridge)
if err != nil || !reflect.DeepEqual(response2Metadata, metadata.Metadata{
metadata.Item{Link: links[1], BlockPresent: true},
metadata.Item{Link: links[2], BlockPresent: true},
......@@ -92,7 +100,11 @@ func TestMessageBuilding(t *testing.T) {
if err != nil || response3.Status() != gsmsg.PartialResponse {
t.Fatal("did not generate completed partial response")
}
response3Metadata, err := metadata.DecodeMetadata(response3.Extra(), ipldBridge)
response3MetadataRaw, err := response3.Extension(gsmsg.ExtensionMetadata)
if err != nil {
t.Fatal("Metadata not included in response")
}
response3Metadata, err := metadata.DecodeMetadata(response3MetadataRaw, ipldBridge)
if err != nil || !reflect.DeepEqual(response3Metadata, metadata.Metadata{
metadata.Item{Link: links[0], BlockPresent: true},
metadata.Item{Link: links[1], BlockPresent: true},
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment