Commit fe76ad85 authored by tavit ohanian's avatar tavit ohanian

reference basis

parents 70e1c52f 58d4a764
Pipeline #614 failed with stages
in 0 seconds
os:
- linux
language: go
go:
- 1.13.x
env:
global:
- GOTFLAGS="-race"
matrix:
- BUILD_DEPTYPE=gomod
# disable travis install
install:
- true
script:
- bash <(curl -s https://raw.githubusercontent.com/ipfs/ci-helpers/master/travis-ci/run-standard-tests.sh)
cache:
directories:
- $GOPATH/pkg/mod
- $HOME/.cache/go-build
notifications:
email: false
The MIT License (MIT)
Copyright (c) 2018 Protocol Labs
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# go-p2p-pubsub-router # go-libp2p-pubsub-router
dms3 p2p go-libp2p-pubsub-router [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
\ No newline at end of file [![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](https://libp2p.io/)
[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![GoDoc](https://godoc.org/github.com/libp2p/go-libp2p-pubsub-router?status.svg)](https://godoc.org/github.com/libp2p/go-libp2p-pubsub-router)
[![Coverage Status](https://img.shields.io/codecov/c/github/libp2p/go-libp2p-pubsub-router.svg?style=flat-square&branch=master)](https://codecov.io/github/libp2p/go-libp2p-pubsub-router?branch=master)
[![Build Status](https://travis-ci.org/libp2p/go-libp2p-pubsub-router.svg?branch=master)](https://travis-ci.org/libp2p/go-libp2p-pubsub-router)
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
> A libp2p router that uses pubsub.
We currently only use this for IPNS over PubSub.
## Documenation
See https://godoc.org/github.com/libp2p/go-libp2p-pubsub-router.
## Contribute
Feel free to join in. All welcome. Open an [issue](https://github.com/libp2p/go-libp2p-pubsub-router/issues)!
This repository falls under the libp2p [Code of Conduct](https://github.com/libp2p/community/blob/master/code-of-conduct.md).
### Want to hack on libp2p?
[![](https://cdn.rawgit.com/libp2p/community/master/img/contribute.gif)](https://github.com/libp2p/community/blob/master/CONTRIBUTE.md)
## License
MIT
---
The last gx published version of this module was: 0.5.18: QmaHVH3EqQD6DsE1yPgwfCThvFCwfkJ396uyrRSo3Ku1kH
package namesys
import (
"context"
"errors"
"time"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-msgio/protoio"
pb "github.com/libp2p/go-libp2p-pubsub-router/pb"
"github.com/gogo/protobuf/proto"
)
const FetchProtoID = protocol.ID("/libp2p/fetch/0.0.1")
type fetchProtocol struct {
ctx context.Context
host host.Host
}
type getValue func(key string) ([]byte, error)
func newFetchProtocol(ctx context.Context, host host.Host, getData getValue) *fetchProtocol {
p := &fetchProtocol{ctx, host}
host.SetStreamHandler(FetchProtoID, func(s network.Stream) {
p.receive(s, getData)
})
return p
}
func (p *fetchProtocol) receive(s network.Stream, getData getValue) {
defer s.Close()
msg := &pb.FetchRequest{}
if err := readMsg(p.ctx, s, msg); err != nil {
log.Infof("error reading request from %s: %s", s.Conn().RemotePeer(), err)
s.Reset()
return
}
response, err := getData(msg.Identifier)
var respProto pb.FetchResponse
if err != nil {
respProto = pb.FetchResponse{Status: pb.FetchResponse_NOT_FOUND}
} else {
respProto = pb.FetchResponse{Data: response}
}
if err := writeMsg(p.ctx, s, &respProto); err != nil {
s.Reset()
return
}
}
func (p *fetchProtocol) Fetch(ctx context.Context, pid peer.ID, key string) ([]byte, error) {
peerCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
s, err := p.host.NewStream(peerCtx, pid, FetchProtoID)
if err != nil {
return nil, err
}
defer s.Close()
msg := &pb.FetchRequest{Identifier: key}
if err := writeMsg(ctx, s, msg); err != nil {
_ = s.Reset()
return nil, err
}
if err := s.CloseWrite(); err != nil {
_ = s.Reset()
return nil, err
}
response := &pb.FetchResponse{}
if err := readMsg(ctx, s, response); err != nil {
_ = s.Reset()
return nil, err
}
switch response.Status {
case pb.FetchResponse_OK:
return response.Data, nil
case pb.FetchResponse_NOT_FOUND:
return nil, nil
default:
return nil, errors.New("fetch: received unknown status code")
}
}
func writeMsg(ctx context.Context, s network.Stream, msg proto.Message) error {
done := make(chan error, 1)
go func() {
wc := protoio.NewDelimitedWriter(s)
if err := wc.WriteMsg(msg); err != nil {
done <- err
return
}
done <- nil
}()
var retErr error
select {
case retErr = <-done:
case <-ctx.Done():
retErr = ctx.Err()
}
if retErr != nil {
log.Infof("error writing response to %s: %s", s.Conn().RemotePeer(), retErr)
}
return retErr
}
func readMsg(ctx context.Context, s network.Stream, msg proto.Message) error {
done := make(chan error, 1)
go func() {
r := protoio.NewDelimitedReader(s, 1<<20)
if err := r.ReadMsg(msg); err != nil {
done <- err
return
}
done <- nil
}()
select {
case err := <-done:
return err
case <-ctx.Done():
s.Reset()
return ctx.Err()
}
}
package namesys
import (
"bytes"
"context"
"errors"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/host"
)
func connect(t *testing.T, a, b host.Host) {
pinfo := a.Peerstore().PeerInfo(a.ID())
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}
type datastore struct {
data map[string][]byte
}
func (d *datastore) Lookup(key string) ([]byte, error) {
v, ok := d.data[key]
if !ok {
return nil, errors.New("key not found")
}
return v, nil
}
func TestFetchProtocolTrip(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := newNetHosts(ctx, t, 2)
connect(t, hosts[0], hosts[1])
// wait for hosts to get connected
time.Sleep(time.Millisecond * 100)
d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup)
d2 := &datastore{map[string][]byte{"key": []byte("value2")}}
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup)
fetchCheck(ctx, t, h1, h2, "key", []byte("value2"))
fetchCheck(ctx, t, h2, h1, "key", []byte("value1"))
}
func TestFetchProtocolNotFound(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := newNetHosts(ctx, t, 2)
connect(t, hosts[0], hosts[1])
// wait for hosts to get connected
time.Sleep(time.Millisecond * 100)
d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup)
d2 := &datastore{make(map[string][]byte)}
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup)
fetchCheck(ctx, t, h1, h2, "key", nil)
fetchCheck(ctx, t, h2, h1, "key", []byte("value1"))
}
func TestFetchProtocolRepeated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := newNetHosts(ctx, t, 2)
connect(t, hosts[0], hosts[1])
// wait for hosts to get connected
time.Sleep(time.Millisecond * 100)
d1 := &datastore{map[string][]byte{"key": []byte("value1")}}
h1 := newFetchProtocol(ctx, hosts[0], d1.Lookup)
d2 := &datastore{make(map[string][]byte)}
h2 := newFetchProtocol(ctx, hosts[1], d2.Lookup)
for i := 0; i < 10; i++ {
fetchCheck(ctx, t, h1, h2, "key", nil)
fetchCheck(ctx, t, h2, h1, "key", []byte("value1"))
}
}
func fetchCheck(ctx context.Context, t *testing.T,
requester *fetchProtocol, responder *fetchProtocol, key string, expected []byte) {
data, err := requester.Fetch(ctx, responder.host.ID(), key)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(data, expected) {
t.Fatalf("expected: %v, received: %v", string(expected), string(data))
}
if (data == nil && expected != nil) || (data != nil && expected == nil) {
t.Fatalf("expected []byte{} or nil and received the opposite")
}
}
This diff is collapsed.
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --proto_path="$(GOPATH)/src" --proto_path="." --gogofast_out=. $<
clean:
rm -f *.pb.go
rm -f *.go
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: message.proto
package namesys_pb
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type FetchResponse_StatusCode int32
const (
FetchResponse_OK FetchResponse_StatusCode = 0
FetchResponse_NOT_FOUND FetchResponse_StatusCode = 1
FetchResponse_ERROR FetchResponse_StatusCode = 2
)
var FetchResponse_StatusCode_name = map[int32]string{
0: "OK",
1: "NOT_FOUND",
2: "ERROR",
}
var FetchResponse_StatusCode_value = map[string]int32{
"OK": 0,
"NOT_FOUND": 1,
"ERROR": 2,
}
func (x FetchResponse_StatusCode) String() string {
return proto.EnumName(FetchResponse_StatusCode_name, int32(x))
}
func (FetchResponse_StatusCode) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_33c57e4bae7b9afd, []int{1, 0}
}
type FetchRequest struct {
Identifier string `protobuf:"bytes,1,opt,name=identifier,proto3" json:"identifier,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *FetchRequest) Reset() { *m = FetchRequest{} }
func (m *FetchRequest) String() string { return proto.CompactTextString(m) }
func (*FetchRequest) ProtoMessage() {}
func (*FetchRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_33c57e4bae7b9afd, []int{0}
}
func (m *FetchRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *FetchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_FetchRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *FetchRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_FetchRequest.Merge(m, src)
}
func (m *FetchRequest) XXX_Size() int {
return m.Size()
}
func (m *FetchRequest) XXX_DiscardUnknown() {
xxx_messageInfo_FetchRequest.DiscardUnknown(m)
}
var xxx_messageInfo_FetchRequest proto.InternalMessageInfo
func (m *FetchRequest) GetIdentifier() string {
if m != nil {
return m.Identifier
}
return ""
}
type FetchResponse struct {
Status FetchResponse_StatusCode `protobuf:"varint,1,opt,name=status,proto3,enum=namesys.pb.FetchResponse_StatusCode" json:"status,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *FetchResponse) Reset() { *m = FetchResponse{} }
func (m *FetchResponse) String() string { return proto.CompactTextString(m) }
func (*FetchResponse) ProtoMessage() {}
func (*FetchResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_33c57e4bae7b9afd, []int{1}
}
func (m *FetchResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *FetchResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_FetchResponse.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *FetchResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_FetchResponse.Merge(m, src)
}
func (m *FetchResponse) XXX_Size() int {
return m.Size()
}
func (m *FetchResponse) XXX_DiscardUnknown() {
xxx_messageInfo_FetchResponse.DiscardUnknown(m)
}
var xxx_messageInfo_FetchResponse proto.InternalMessageInfo
func (m *FetchResponse) GetStatus() FetchResponse_StatusCode {
if m != nil {
return m.Status
}
return FetchResponse_OK
}
func (m *FetchResponse) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func init() {
proto.RegisterEnum("namesys.pb.FetchResponse_StatusCode", FetchResponse_StatusCode_name, FetchResponse_StatusCode_value)
proto.RegisterType((*FetchRequest)(nil), "namesys.pb.FetchRequest")
proto.RegisterType((*FetchResponse)(nil), "namesys.pb.FetchResponse")
}
func init() { proto.RegisterFile("message.proto", fileDescriptor_33c57e4bae7b9afd) }
var fileDescriptor_33c57e4bae7b9afd = []byte{
// 212 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcd, 0x4d, 0x2d, 0x2e,
0x4e, 0x4c, 0x4f, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0x4b, 0xcc, 0x4d, 0x2d,
0xae, 0x2c, 0xd6, 0x2b, 0x48, 0x52, 0xd2, 0xe3, 0xe2, 0x71, 0x4b, 0x2d, 0x49, 0xce, 0x08, 0x4a,
0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x92, 0xe3, 0xe2, 0xca, 0x4c, 0x49, 0xcd, 0x2b, 0xc9, 0x4c,
0xcb, 0x4c, 0x2d, 0x92, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x42, 0x12, 0x51, 0x9a, 0xc8, 0xc8,
0xc5, 0x0b, 0xd5, 0x50, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x2a, 0x64, 0xc3, 0xc5, 0x56, 0x5c, 0x92,
0x58, 0x52, 0x5a, 0x0c, 0x56, 0xcd, 0x67, 0xa4, 0xa2, 0x87, 0x30, 0x5e, 0x0f, 0x45, 0xa9, 0x5e,
0x30, 0x58, 0x9d, 0x73, 0x7e, 0x4a, 0x6a, 0x10, 0x54, 0x8f, 0x90, 0x10, 0x17, 0x4b, 0x4a, 0x62,
0x49, 0xa2, 0x04, 0x93, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x98, 0xad, 0xa4, 0xc7, 0xc5, 0x85, 0x50,
0x29, 0xc4, 0xc6, 0xc5, 0xe4, 0xef, 0x2d, 0xc0, 0x20, 0xc4, 0xcb, 0xc5, 0xe9, 0xe7, 0x1f, 0x12,
0xef, 0xe6, 0x1f, 0xea, 0xe7, 0x22, 0xc0, 0x28, 0xc4, 0xc9, 0xc5, 0xea, 0x1a, 0x14, 0xe4, 0x1f,
0x24, 0xc0, 0xe4, 0xc4, 0x73, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9,
0x31, 0x26, 0xb1, 0x81, 0x3d, 0x69, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0xc8, 0x6e, 0x00, 0xfe,
0xf5, 0x00, 0x00, 0x00,
}
func (m *FetchRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *FetchRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *FetchRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Identifier) > 0 {
i -= len(m.Identifier)
copy(dAtA[i:], m.Identifier)
i = encodeVarintMessage(dAtA, i, uint64(len(m.Identifier)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *FetchResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *FetchResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *FetchResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Data) > 0 {
i -= len(m.Data)
copy(dAtA[i:], m.Data)
i = encodeVarintMessage(dAtA, i, uint64(len(m.Data)))
i--
dAtA[i] = 0x12
}
if m.Status != 0 {
i = encodeVarintMessage(dAtA, i, uint64(m.Status))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func encodeVarintMessage(dAtA []byte, offset int, v uint64) int {
offset -= sovMessage(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *FetchRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Identifier)
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *FetchResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Status != 0 {
n += 1 + sovMessage(uint64(m.Status))
}
l = len(m.Data)
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovMessage(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozMessage(x uint64) (n int) {
return sovMessage(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *FetchRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: FetchRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: FetchRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Identifier", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMessage
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthMessage
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Identifier = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipMessage(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMessage
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthMessage
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *FetchResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: FetchResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: FetchResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType)
}
m.Status = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Status |= FetchResponse_StatusCode(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthMessage
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthMessage
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
if m.Data == nil {
m.Data = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipMessage(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMessage
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthMessage
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipMessage(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowMessage
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowMessage
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowMessage
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthMessage
}
iNdEx += length
if iNdEx < 0 {
return 0, ErrInvalidLengthMessage
}
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupMessage
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthMessage = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowMessage = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupMessage = fmt.Errorf("proto: unexpected end of group")
)
syntax = "proto3";
package namesys.pb;
message FetchRequest {
string identifier = 1;
}
message FetchResponse {
StatusCode status = 1;
enum StatusCode {
OK = 0;
NOT_FOUND = 1;
ERROR = 2;
}
bytes data = 2;
}
\ No newline at end of file
package namesys
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
record "github.com/libp2p/go-libp2p-record"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
dshelp "github.com/ipfs/go-ipfs-ds-help"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("pubsub-valuestore")
// Pubsub is the minimal subset of the pubsub interface required by the pubsub
// value store. This way, users can wrap the underlying pubsub implementation
// without re-exporting/implementing the entire interface.
type Pubsub interface {
RegisterTopicValidator(topic string, validator interface{}, opts ...pubsub.ValidatorOpt) error
Join(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error)
}
type watchGroup struct {
// Note: this chan must be buffered, see notifyWatchers
listeners map[chan []byte]struct{}
}
type PubsubValueStore struct {
ctx context.Context
ds ds.Datastore
ps Pubsub
host host.Host
fetch *fetchProtocol
rebroadcastInitialDelay time.Duration
rebroadcastInterval time.Duration
// Map of keys to topics
mx sync.Mutex
topics map[string]*topicInfo
watchLk sync.Mutex
watching map[string]*watchGroup
Validator record.Validator
}
type topicInfo struct {
topic *pubsub.Topic
evts *pubsub.TopicEventHandler
sub *pubsub.Subscription
cancel context.CancelFunc
finished chan struct{}
dbWriteMx sync.Mutex
}
// KeyToTopic converts a binary record key to a pubsub topic key.
func KeyToTopic(key string) string {
// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs.
// Encodes to "/record/base64url(key)"
return "/record/" + base64.RawURLEncoding.EncodeToString([]byte(key))
}
// Option is a function that configures a PubsubValueStore during initialization
type Option func(*PubsubValueStore) error
// NewPubsubValueStore constructs a new ValueStore that gets and receives records through pubsub.
func NewPubsubValueStore(ctx context.Context, host host.Host, ps Pubsub, validator record.Validator, opts ...Option) (*PubsubValueStore, error) {
psValueStore := &PubsubValueStore{
ctx: ctx,
ds: dssync.MutexWrap(ds.NewMapDatastore()),
ps: ps,
host: host,
rebroadcastInitialDelay: 100 * time.Millisecond,
rebroadcastInterval: time.Minute * 10,
topics: make(map[string]*topicInfo),
watching: make(map[string]*watchGroup),
Validator: validator,
}
for _, opt := range opts {
err := opt(psValueStore)
if err != nil {
return nil, err
}
}
psValueStore.fetch = newFetchProtocol(ctx, host, psValueStore.getLocal)
go psValueStore.rebroadcast(ctx)
return psValueStore, nil
}
// PutValue publishes a record through pubsub
func (p *PubsubValueStore) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error {
if err := p.Subscribe(key); err != nil {
return err
}
log.Debugf("PubsubPublish: publish value for key", key)
p.mx.Lock()
ti, ok := p.topics[key]
p.mx.Unlock()
if !ok {
return errors.New("could not find topic handle")
}
ti.dbWriteMx.Lock()
defer ti.dbWriteMx.Unlock()
recCmp, err := p.putLocal(ti, key, value)
if err != nil {
return err
}
if recCmp < 0 {
return nil
}
select {
case err := <-p.psPublishChannel(ctx, ti.topic, value):
return err
case <-ctx.Done():
return ctx.Err()
}
}
// compare compares the input value with the current value.
// First return value is 0 if equal, greater than 0 if better, less than 0 if worse.
// Second return value is true if valid.
//
func (p *PubsubValueStore) compare(key string, val []byte) (int, bool) {
if p.Validator.Validate(key, val) != nil {
return -1, false
}
old, err := p.getLocal(key)
if err != nil {
// If the old one is invalid, the new one is *always* better.
return 1, true
}
// Same record is not better
if old != nil && bytes.Equal(old, val) {
return 0, true
}
i, err := p.Validator.Select(key, [][]byte{val, old})
if err == nil && i == 0 {
return 1, true
}
return -1, true
}
func (p *PubsubValueStore) Subscribe(key string) error {
p.mx.Lock()
defer p.mx.Unlock()
// see if we already have a pubsub subscription; if not, subscribe
ti, ok := p.topics[key]
if ok {
return nil
}
topic := KeyToTopic(key)
// Ignore the error. We have to check again anyways to make sure the
// record hasn't expired.
//
// Also, make sure to do this *before* subscribing.
myID := p.host.ID()
_ = p.ps.RegisterTopicValidator(topic, func(
ctx context.Context,
src peer.ID,
msg *pubsub.Message,
) pubsub.ValidationResult {
cmp, valid := p.compare(key, msg.GetData())
if !valid {
return pubsub.ValidationReject
}
if cmp > 0 || cmp == 0 && src == myID {
return pubsub.ValidationAccept
}
return pubsub.ValidationIgnore
})
ti, err := p.createTopicHandler(topic)
if err != nil {
return err
}
p.topics[key] = ti
ctx, cancel := context.WithCancel(p.ctx)
ti.cancel = cancel
go p.handleSubscription(ctx, ti, key)
log.Debugf("PubsubResolve: subscribed to %s", key)
return nil
}
// createTopicHandler creates an internal topic object. Must be called with p.mx held
func (p *PubsubValueStore) createTopicHandler(topic string) (*topicInfo, error) {
t, err := p.ps.Join(topic)
if err != nil {
return nil, err
}
sub, err := t.Subscribe()
if err != nil {
_ = t.Close()
return nil, err
}
evts, err := t.EventHandler()
if err != nil {
sub.Cancel()
_ = t.Close()
}
ti := &topicInfo{
topic: t,
evts: evts,
sub: sub,
finished: make(chan struct{}, 1),
}
return ti, nil
}
func (p *PubsubValueStore) rebroadcast(ctx context.Context) {
select {
case <-time.After(p.rebroadcastInitialDelay):
case <-ctx.Done():
return
}
ticker := time.NewTicker(p.rebroadcastInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.mx.Lock()
keys := make([]string, 0, len(p.topics))
topics := make([]*topicInfo, 0, len(p.topics))
for k, ti := range p.topics {
keys = append(keys, k)
topics = append(topics, ti)
}
p.mx.Unlock()
if len(topics) > 0 {
for i, k := range keys {
val, err := p.getLocal(k)
if err == nil {
topic := topics[i].topic
select {
case <-p.psPublishChannel(ctx, topic, val):
case <-ctx.Done():
return
}
}
}
}
case <-ctx.Done():
return
}
}
}
func (p *PubsubValueStore) psPublishChannel(ctx context.Context, topic *pubsub.Topic, value []byte) chan error {
done := make(chan error, 1)
go func() {
done <- topic.Publish(ctx, value)
}()
return done
}
// putLocal tries to put the key-value pair into the local datastore
// Requires that the ti.dbWriteMx is held when called
// Returns true if the value is better then what is currently in the datastore
// Returns any errors from putting the data in the datastore
func (p *PubsubValueStore) putLocal(ti *topicInfo, key string, value []byte) (int, error) {
cmp, valid := p.compare(key, value)
if valid && cmp > 0 {
return cmp, p.ds.Put(dshelp.NewKeyFromBinary([]byte(key)), value)
}
return cmp, nil
}
func (p *PubsubValueStore) getLocal(key string) ([]byte, error) {
val, err := p.ds.Get(dshelp.NewKeyFromBinary([]byte(key)))
if err != nil {
// Don't invalidate due to ds errors.
if err == ds.ErrNotFound {
err = routing.ErrNotFound
}
return nil, err
}
// If the old one is invalid, the new one is *always* better.
if err := p.Validator.Validate(key, val); err != nil {
return nil, err
}
return val, nil
}
func (p *PubsubValueStore) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
if err := p.Subscribe(key); err != nil {
return nil, err
}
return p.getLocal(key)
}
func (p *PubsubValueStore) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
if err := p.Subscribe(key); err != nil {
return nil, err
}
p.watchLk.Lock()
defer p.watchLk.Unlock()
out := make(chan []byte, 1)
lv, err := p.getLocal(key)
if err == nil {
out <- lv
close(out)
return out, nil
}
wg, ok := p.watching[key]
if !ok {
wg = &watchGroup{
listeners: map[chan []byte]struct{}{},
}
p.watching[key] = wg
}
proxy := make(chan []byte, 1)
ctx, cancel := context.WithCancel(ctx)
wg.listeners[proxy] = struct{}{}
go func() {
defer func() {
cancel()
p.watchLk.Lock()
delete(wg.listeners, proxy)
if _, ok := p.watching[key]; len(wg.listeners) == 0 && ok {
delete(p.watching, key)
}
p.watchLk.Unlock()
close(out)
}()
for {
select {
case val, ok := <-proxy:
if !ok {
return
}
// outCh is buffered, so we just put the value or swap it for the newer one
select {
case out <- val:
case <-out:
out <- val
}
// 1 is good enough
return
case <-ctx.Done():
return
}
}
}()
return out, nil
}
// GetSubscriptions retrieves a list of active topic subscriptions
func (p *PubsubValueStore) GetSubscriptions() []string {
p.mx.Lock()
defer p.mx.Unlock()
var res []string
for sub := range p.topics {
res = append(res, sub)
}
return res
}
// Cancel cancels a topic subscription; returns true if an active
// subscription was canceled
func (p *PubsubValueStore) Cancel(name string) (bool, error) {
p.mx.Lock()
defer p.mx.Unlock()
p.watchLk.Lock()
if _, wok := p.watching[name]; wok {
p.watchLk.Unlock()
return false, fmt.Errorf("key has active subscriptions")
}
p.watchLk.Unlock()
ti, ok := p.topics[name]
if ok {
p.closeTopic(name, ti)
<-ti.finished
}
return ok, nil
}
// closeTopic must be called under the PubSubValueStore's mutex
func (p *PubsubValueStore) closeTopic(key string, ti *topicInfo) {
ti.cancel()
ti.sub.Cancel()
ti.evts.Cancel()
_ = ti.topic.Close()
delete(p.topics, key)
}
func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo, key string) {
defer func() {
close(ti.finished)
p.mx.Lock()
defer p.mx.Unlock()
p.closeTopic(key, ti)
}()
newMsg := make(chan []byte)
go func() {
defer close(newMsg)
for {
data, err := p.handleNewMsgs(ctx, ti.sub, key)
if err != nil {
return
}
select {
case newMsg <- data:
case <-ctx.Done():
return
}
}
}()
newPeerData := make(chan []byte)
go func() {
defer close(newPeerData)
for {
data, err := p.handleNewPeer(ctx, ti.evts, key)
if err == nil {
if data != nil {
select {
case newPeerData <- data:
case <-ctx.Done():
return
}
}
} else {
select {
case <-ctx.Done():
return
default:
log.Errorf("PubsubPeerJoin: error interacting with new peer: %s", err)
}
}
}
}()
for {
var data []byte
var ok bool
select {
case data, ok = <-newMsg:
if !ok {
return
}
case data, ok = <-newPeerData:
if !ok {
return
}
case <-ctx.Done():
return
}
ti.dbWriteMx.Lock()
recCmp, err := p.putLocal(ti, key, data)
ti.dbWriteMx.Unlock()
if recCmp > 0 {
if err != nil {
log.Warnf("PubsubResolve: error writing update for %s: %s", key, err)
}
p.notifyWatchers(key, data)
}
}
}
func (p *PubsubValueStore) handleNewMsgs(ctx context.Context, sub *pubsub.Subscription, key string) ([]byte, error) {
msg, err := sub.Next(ctx)
if err != nil {
if err != context.Canceled {
log.Warnf("PubsubResolve: subscription error in %s: %s", key, err.Error())
}
return nil, err
}
return msg.GetData(), nil
}
func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pubsub.TopicEventHandler, key string) ([]byte, error) {
for ctx.Err() == nil {
peerEvt, err := peerEvtHandler.NextPeerEvent(ctx)
if err != nil {
if err != context.Canceled {
log.Warnf("PubsubNewPeer: subscription error in %s: %s", key, err.Error())
}
return nil, err
}
if peerEvt.Type != pubsub.PeerJoin {
continue
}
pid := peerEvt.Peer
value, err := p.fetch.Fetch(ctx, pid, key)
if err == nil {
return value, nil
}
log.Debugf("failed to fetch latest pubsub value for key '%s' from peer '%s': %s", key, pid, err)
}
return nil, ctx.Err()
}
func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {
p.watchLk.Lock()
defer p.watchLk.Unlock()
sg, ok := p.watching[key]
if !ok {
return
}
for watcher := range sg.listeners {
select {
case <-watcher:
watcher <- data
case watcher <- data:
}
}
}
func WithRebroadcastInterval(duration time.Duration) Option {
return func(store *PubsubValueStore) error {
store.rebroadcastInterval = duration
return nil
}
}
func WithRebroadcastInitialDelay(duration time.Duration) Option {
return func(store *PubsubValueStore) error {
store.rebroadcastInitialDelay = duration
return nil
}
}
// WithDatastore returns an option that overrides the default datastore.
func WithDatastore(datastore ds.Datastore) Option {
return func(store *PubsubValueStore) error {
store.ds = datastore
return nil
}
}
package namesys
import (
"bytes"
"context"
"fmt"
"testing"
"time"
"golang.org/x/sync/errgroup"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
bhost "github.com/libp2p/go-libp2p-blankhost"
pubsub "github.com/libp2p/go-libp2p-pubsub"
record "github.com/libp2p/go-libp2p-record"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
)
func newNetHost(ctx context.Context, t *testing.T) host.Host {
netw := swarmt.GenSwarm(t, ctx)
return bhost.NewBlankHost(netw)
}
func newNetHosts(ctx context.Context, t *testing.T, n int) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
h := newNetHost(ctx, t)
out = append(out, h)
}
return out
}
type testValidator struct{}
func (testValidator) Validate(key string, value []byte) error {
ns, k, err := record.SplitKey(key)
if err != nil {
return err
}
if ns != "namespace" {
return record.ErrInvalidRecordType
}
if !bytes.Contains(value, []byte(k)) {
return record.ErrInvalidRecordType
}
if bytes.Contains(value, []byte("invalid")) {
return record.ErrInvalidRecordType
}
return nil
}
func (testValidator) Select(key string, vals [][]byte) (int, error) {
if len(vals) == 0 {
panic("selector with no values")
}
var best []byte
idx := 0
for i, val := range vals {
if bytes.Compare(best, val) < 0 {
best = val
idx = i
}
}
return idx, nil
}
func setupTest(ctx context.Context, t *testing.T) (*PubsubValueStore, []*PubsubValueStore) {
key := "/namespace/key"
hosts := newNetHosts(ctx, t, 5)
vss := make([]*PubsubValueStore, len(hosts))
for i := 0; i < len(vss); i++ {
fs, err := pubsub.NewFloodSub(ctx, hosts[i])
if err != nil {
t.Fatal(err)
}
vss[i], err = NewPubsubValueStore(ctx, hosts[i], fs, testValidator{})
if err != nil {
t.Fatal(err)
}
}
pub := vss[0]
vss = vss[1:]
pubinfo := hosts[0].Peerstore().PeerInfo(hosts[0].ID())
for _, h := range hosts[1:] {
if err := h.Connect(ctx, pubinfo); err != nil {
t.Fatal(err)
}
}
time.Sleep(time.Millisecond * 100)
for i, vs := range vss {
checkNotFound(ctx, t, i, vs, key)
// delay to avoid connection storms
time.Sleep(time.Millisecond * 100)
}
// let the bootstrap finish
time.Sleep(time.Second * 1)
return pub, vss
}
// tests
func TestEarlyPublish(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := newNetHosts(ctx, t, 5)
key := "/namespace/key"
val := []byte("valid for key 1")
vss := make([]*PubsubValueStore, len(hosts))
for i := 0; i < len(vss); i++ {
fs, err := pubsub.NewFloodSub(ctx, hosts[i])
if err != nil {
t.Fatal(err)
}
vss[i], err = NewPubsubValueStore(ctx, hosts[i], fs, testValidator{})
if err != nil {
t.Fatal(err)
}
}
pub := vss[0]
vss = vss[1:]
if err := pub.PutValue(ctx, key, val); err != nil {
t.Fatal(err)
}
for i, vs := range vss {
connect(t, hosts[i], hosts[i+1])
if err := vs.Subscribe(key); err != nil {
t.Fatal(err)
}
}
// Wait for Fetch protocol to retrieve data
time.Sleep(time.Second * 1)
for i, vs := range vss {
checkValue(ctx, t, i, vs, key, val)
}
}
func TestPubsubPublishSubscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pub, vss := setupTest(ctx, t)
key := "/namespace/key"
key2 := "/namespace/key2"
val := []byte("valid for key 1")
err := pub.PutValue(ctx, key, val)
if err != nil {
t.Fatal(err)
}
// let the flood propagate
time.Sleep(time.Second * 1)
for i, vs := range vss {
checkValue(ctx, t, i, vs, key, val)
}
val = []byte("valid for key 2")
err = pub.PutValue(ctx, key, val)
if err != nil {
t.Fatal(err)
}
// let the flood propagate
time.Sleep(time.Second * 1)
for i, vs := range vss {
checkValue(ctx, t, i, vs, key, val)
}
// Check selector.
nval := []byte("valid for key 1")
err = pub.PutValue(ctx, key, val)
if err != nil {
t.Fatal(err)
}
// let the flood propagate
time.Sleep(time.Second * 1)
for i, vs := range vss {
checkValue(ctx, t, i, vs, key, val)
}
// Check validator.
nval = []byte("valid for key 9999 invalid")
err = pub.PutValue(ctx, key, val)
if err != nil {
t.Fatal(err)
}
// let the flood propagate
time.Sleep(time.Second * 1)
for i, vs := range vss {
checkValue(ctx, t, i, vs, key, val)
}
// Different key?
// subscribe to the second key
for i, vs := range vss {
checkNotFound(ctx, t, i, vs, key2)
}
time.Sleep(time.Second * 1)
// Put to the second key
nval = []byte("valid for key2")
err = pub.PutValue(ctx, key2, nval)
if err != nil {
t.Fatal(err)
}
// let the flood propagate
time.Sleep(time.Second * 1)
for i, vs := range vss {
checkValue(ctx, t, i, vs, key2, nval)
checkValue(ctx, t, i, vs, key, val)
}
// cancel subscriptions
for _, vs := range vss {
vs.Cancel(key)
}
time.Sleep(time.Millisecond * 100)
// Get missed value?
nval = []byte("valid for key 3")
err = pub.PutValue(ctx, key, nval)
if err != nil {
t.Fatal(err)
}
// resubscribe
for _, vs := range vss {
if err := vs.Subscribe(key); err != nil {
t.Fatal(err)
}
}
// check that we get the new value
time.Sleep(time.Second * 1)
for i, vs := range vss {
checkValue(ctx, t, i, vs, key, nval)
}
}
func TestWatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pub, vss := setupTest(ctx, t)
key := "/namespace/key"
key2 := "/namespace/key2"
ch, err := vss[1].SearchValue(ctx, key)
if err != nil {
t.Fatal(err)
}
val := []byte("valid for key 1")
err = pub.PutValue(ctx, key, val)
if err != nil {
t.Fatal(err)
}
v := string(<-ch)
if v != "valid for key 1" {
t.Errorf("got unexpected value: %s", v)
}
val = []byte("valid for key 2")
err = pub.PutValue(ctx, key, val)
if err != nil {
t.Fatal(err)
}
ch, err = vss[1].SearchValue(ctx, key2)
if err != nil {
t.Fatal(err)
}
_, err = vss[1].Cancel(key2)
if err.Error() != "key has active subscriptions" {
t.Fatal("cancel should have failed")
}
// let the flood propagate
time.Sleep(time.Second * 1)
ch, err = vss[1].SearchValue(ctx, key)
if err != nil {
t.Fatal(err)
}
v = string(<-ch)
if v != "valid for key 2" {
t.Errorf("got unexpected value: %s", v)
}
}
func TestPutMany(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := newNetHosts(ctx, t, 5)
vss := make([]*PubsubValueStore, len(hosts))
for i := 0; i < len(vss); i++ {
fs, err := pubsub.NewFloodSub(ctx, hosts[i])
if err != nil {
t.Fatal(err)
}
vss[i], err = NewPubsubValueStore(ctx, hosts[i], fs, testValidator{})
if err != nil {
t.Fatal(err)
}
}
for i := 1; i < len(hosts); i++ {
connect(t, hosts[0], hosts[i])
}
const numRuns = 10
const numRoutines = 1000 // Note: if changing the numRoutines also change the number of digits in the fmtString
const fmtString = "%s-%04d"
const baseKey = "/namespace/key"
for i := 0; i < numRuns; i++ {
key := fmt.Sprintf("%s/%d", baseKey, i)
var eg errgroup.Group
for j := 0; j < numRoutines; j++ {
rtNum := j
eg.Go(func() error {
return vss[0].PutValue(ctx, key, []byte(fmt.Sprintf(fmtString, key, rtNum)))
})
}
if err := eg.Wait(); err != nil {
t.Fatal(err)
}
finalValue := []byte(fmt.Sprintf(fmtString, key, numRoutines-1))
for j := 0; j < len(hosts); j++ {
for {
v, err := vss[j].GetValue(ctx, key)
if err != routing.ErrNotFound {
if err != nil {
t.Fatal(err)
}
if bytes.Equal(v, finalValue) {
break
}
}
time.Sleep(time.Millisecond * 100)
}
}
}
}
func checkNotFound(ctx context.Context, t *testing.T, i int, vs routing.ValueStore, key string) {
t.Helper()
_, err := vs.GetValue(ctx, key)
if err != routing.ErrNotFound {
t.Fatalf("[vssolver %d] unexpected error: %s", i, err.Error())
}
}
func checkValue(ctx context.Context, t *testing.T, i int, vs routing.ValueStore, key string, val []byte) {
t.Helper()
xval, err := vs.GetValue(ctx, key)
if err != nil {
t.Fatalf("[ValueStore %d] vssolve failed: %s", i, err.Error())
}
if !bytes.Equal(xval, val) {
t.Fatalf("[ValueStore %d] unexpected value: expected '%s', got '%s'", i, val, xval)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment