Commit af2f04ae authored by Jeromy's avatar Jeromy

fixing up some bitswap stuff after the PR

parent cfdf01d5
......@@ -32,14 +32,13 @@ type BitSwap struct {
net swarm.Network
meschan *swarm.Chan
// datastore is the local database
// Ledgers of known
// datastore is the local database // Ledgers of known
datastore ds.Datastore
// routing interface for communication
routing *dht.IpfsDHT
listener *swarm.MesListener
listener *swarm.MessageListener
// partners is a map of currently active bitswap relationships.
// The Ledger has the peer.ID, and the peer connection works through net.
......@@ -67,7 +66,7 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR
routing: r.(*dht.IpfsDHT),
meschan: net.GetChannel(swarm.PBWrapper_BITSWAP),
haltChan: make(chan struct{}),
listener: swarm.NewMesListener(),
listener: swarm.NewMessageListener(),
}
go bs.handleMessages()
......@@ -84,11 +83,11 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
valchan := make(chan []byte)
after := time.After(tleft)
// TODO: when the data is received, shut down this for loop
// TODO: when the data is received, shut down this for loop ASAP
go func() {
for p := range provs_ch {
go func(pr *peer.Peer) {
ledger := bs.GetLedger(pr.Key())
ledger := bs.GetLedger(pr)
blk, err := bs.getBlock(k, pr, tleft)
if err != nil {
u.PErr("getBlock returned: %v\n", err)
......@@ -170,6 +169,8 @@ func (bs *BitSwap) handleMessages() {
switch pmes.GetType() {
case PBMessage_GET_BLOCK:
go bs.handleGetBlock(mes.Peer, pmes)
case PBMessage_WANT_BLOCK:
go bs.handleWantBlock(mes.Peer, pmes)
default:
u.PErr("Invalid message type.\n")
}
......@@ -179,9 +180,18 @@ func (bs *BitSwap) handleMessages() {
}
}
func (bs *BitSwap) handleWantBlock(p *peer.Peer, pmes *PBMessage) {
wants := pmes.GetWantlist()
ledg := bs.GetLedger(p)
for _, s := range wants {
// TODO: this needs to be different. We need timeouts.
ledg.WantList[u.Key(s)] = struct{}{}
}
}
func (bs *BitSwap) handleGetBlock(p *peer.Peer, pmes *PBMessage) {
u.DOut("handleGetBlock.\n")
ledger := bs.GetLedger(p.Key())
ledger := bs.GetLedger(p)
u.DOut("finding [%s] in datastore.\n", u.Key(pmes.GetKey()).Pretty())
idata, err := bs.datastore.Get(ds.NewKey(pmes.GetKey()))
......@@ -216,19 +226,35 @@ func (bs *BitSwap) handleGetBlock(p *peer.Peer, pmes *PBMessage) {
}
}
func (bs *BitSwap) GetLedger(k u.Key) *Ledger {
l, ok := bs.partners[k]
func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger {
l, ok := bs.partners[p.Key()]
if ok {
return l
}
l = new(Ledger)
l.Strategy = StandardStrategy
l.Partner = peer.ID(k)
bs.partners[k] = l
l.Partner = p
bs.partners[p.Key()] = l
return l
}
func (bs *BitSwap) SendWantList(wl KeySet) error {
mes := Message{
ID: swarm.GenerateMessageID(),
Type: PBMessage_WANT_BLOCK,
WantList: bs.wantList,
}
pbmes := mes.ToProtobuf()
// Lets just ping everybody all at once
for _, ledger := range bs.partners {
bs.meschan.Outgoing <- swarm.NewMessage(ledger.Partner, pbmes)
}
return nil
}
func (bs *BitSwap) Halt() {
bs.haltChan <- struct{}{}
}
......@@ -10,17 +10,17 @@ import (
// Ledger stores the data exchange relationship between two peers.
type Ledger struct {
// Partner is the ID of the remote Peer.
Partner peer.ID
// Partner is the remote Peer.
Partner *peer.Peer
// Accounting tracks bytes sent and recieved.
Accounting debtRatio
// FirstExchnage is the time of the first data exchange.
FirstExchange *time.Time
FirstExchange time.Time
// LastExchange is the time of the last data exchange.
LastExchange *time.Time
LastExchange time.Time
// WantList is a (bounded, small) set of keys that Partner desires.
WantList KeySet
......@@ -36,9 +36,11 @@ func (l *Ledger) ShouldSend() bool {
}
func (l *Ledger) SentBytes(n uint64) {
l.LastExchange = time.Now()
l.Accounting.BytesSent += n
}
func (l *Ledger) ReceivedBytes(n uint64) {
l.LastExchange = time.Now()
l.Accounting.BytesRecv += n
}
......@@ -12,6 +12,7 @@ type Message struct {
Key u.Key
Value []byte
Success bool
WantList KeySet
}
func (m *Message) ToProtobuf() *PBMessage {
......@@ -26,6 +27,14 @@ func (m *Message) ToProtobuf() *PBMessage {
pmes.Success = proto.Bool(true)
}
if m.WantList != nil {
var swant []string
for k, _ := range m.WantList {
swant = append(swant, string(k))
}
pmes.Wantlist = swant
}
pmes.Key = proto.String(string(m.Key))
pmes.Value = m.Value
return pmes
......
......@@ -23,14 +23,17 @@ var _ = math.Inf
type PBMessage_MessageType int32
const (
PBMessage_GET_BLOCK PBMessage_MessageType = 0
PBMessage_GET_BLOCK PBMessage_MessageType = 0
PBMessage_WANT_BLOCK PBMessage_MessageType = 1
)
var PBMessage_MessageType_name = map[int32]string{
0: "GET_BLOCK",
1: "WANT_BLOCK",
}
var PBMessage_MessageType_value = map[string]int32{
"GET_BLOCK": 0,
"GET_BLOCK": 0,
"WANT_BLOCK": 1,
}
func (x PBMessage_MessageType) Enum() *PBMessage_MessageType {
......@@ -57,6 +60,7 @@ type PBMessage struct {
Value []byte `protobuf:"bytes,4,opt,name=value" json:"value,omitempty"`
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"`
Wantlist []string `protobuf:"bytes,7,rep,name=wantlist" json:"wantlist,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
......@@ -106,6 +110,13 @@ func (m *PBMessage) GetSuccess() bool {
return false
}
func (m *PBMessage) GetWantlist() []string {
if m != nil {
return m.Wantlist
}
return nil
}
func init() {
proto.RegisterEnum("bitswap.PBMessage_MessageType", PBMessage_MessageType_name, PBMessage_MessageType_value)
}
......@@ -3,6 +3,7 @@ package bitswap;
message PBMessage {
enum MessageType {
GET_BLOCK = 0;
WANT_BLOCK = 1;
}
required MessageType Type = 1;
......@@ -11,4 +12,5 @@ message PBMessage {
optional bytes value = 4;
optional bool response = 5;
optional bool success = 6;
repeated string wantlist = 7;
}
......@@ -2,7 +2,6 @@ package blockservice
import (
"bytes"
"fmt"
"testing"
ds "github.com/jbenet/datastore.go"
......@@ -11,9 +10,8 @@ import (
)
func TestBlocks(t *testing.T) {
d := ds.NewMapDatastore()
bs, err := NewBlockService(d)
bs, err := NewBlockService(d, nil)
if err != nil {
t.Error("failed to construct block service", err)
return
......@@ -62,7 +60,4 @@ func TestBlocks(t *testing.T) {
if !bytes.Equal(b.Data, b2.Data) {
t.Error("Block data is not equal.")
}
fmt.Printf("key: %s\n", b.Key())
fmt.Printf("data: %v\n", b.Data)
}
......@@ -25,7 +25,7 @@ func NewBlockService(d ds.Datastore, rem *bitswap.BitSwap) (*BlockService, error
return nil, fmt.Errorf("BlockService requires valid datastore")
}
if rem == nil {
return nil, fmt.Errorf("BlockService requires a valid bitswap")
u.PErr("Caution: blockservice running in local (offline) mode.\n")
}
return &BlockService{Datastore: d, Remote: rem}, nil
}
......@@ -39,7 +39,9 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
if err != nil {
return k, err
}
err = s.Remote.HaveBlock(b.Key())
if s.Remote != nil {
err = s.Remote.HaveBlock(b.Key())
}
return k, err
}
......@@ -57,7 +59,7 @@ func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
Multihash: mh.Multihash(k),
Data: bdata,
}, nil
} else if err == ds.ErrNotFound {
} else if err == ds.ErrNotFound && s.Remote != nil {
blk, err := s.Remote.GetBlock(k, time.Second*5)
if err != nil {
return nil, err
......
......@@ -49,7 +49,7 @@ type IpfsDHT struct {
diaglock sync.Mutex
// listener is a server to register to listen for responses to messages
listener *swarm.MesListener
listener *swarm.MessageListener
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
......@@ -66,7 +66,7 @@ func NewDHT(p *peer.Peer, net swarm.Network, dstore ds.Datastore) *IpfsDHT {
dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
dht.listener = swarm.NewMesListener()
dht.listener = swarm.NewMessageListener()
dht.birth = time.Now()
return dht
}
......
......@@ -78,7 +78,7 @@ func TestTableUpdate(t *testing.T) {
for i := 0; i < 10000; i++ {
p := rt.Update(peers[rand.Intn(len(peers))])
if p != nil {
t.Log("evicted peer.")
//t.Log("evicted peer.")
}
}
......
......@@ -8,7 +8,7 @@ import (
u "github.com/jbenet/go-ipfs/util"
)
type MesListener struct {
type MessageListener struct {
listeners map[uint64]*listenInfo
haltchan chan struct{}
unlist chan uint64
......@@ -41,8 +41,8 @@ type listenInfo struct {
id uint64
}
func NewMesListener() *MesListener {
ml := new(MesListener)
func NewMessageListener() *MessageListener {
ml := new(MessageListener)
ml.haltchan = make(chan struct{})
ml.listeners = make(map[uint64]*listenInfo)
ml.nlist = make(chan *listenInfo, 16)
......@@ -52,7 +52,7 @@ func NewMesListener() *MesListener {
return ml
}
func (ml *MesListener) Listen(id uint64, count int, timeout time.Duration) <-chan *Message {
func (ml *MessageListener) Listen(id uint64, count int, timeout time.Duration) <-chan *Message {
li := new(listenInfo)
li.count = count
li.eol = time.Now().Add(timeout)
......@@ -62,7 +62,7 @@ func (ml *MesListener) Listen(id uint64, count int, timeout time.Duration) <-cha
return li.resp
}
func (ml *MesListener) Unlisten(id uint64) {
func (ml *MessageListener) Unlisten(id uint64) {
ml.unlist <- id
}
......@@ -71,18 +71,18 @@ type respMes struct {
mes *Message
}
func (ml *MesListener) Respond(id uint64, mes *Message) {
func (ml *MessageListener) Respond(id uint64, mes *Message) {
ml.send <- &respMes{
id: id,
mes: mes,
}
}
func (ml *MesListener) Halt() {
func (ml *MessageListener) Halt() {
ml.haltchan <- struct{}{}
}
func (ml *MesListener) run() {
func (ml *MessageListener) run() {
for {
select {
case <-ml.haltchan:
......
......@@ -8,8 +8,8 @@ import (
)
// Ensure that the Message Listeners basic functionality works
func TestMesListenerBasic(t *testing.T) {
ml := NewMesListener()
func TestMessageListener(t *testing.T) {
ml := NewMessageListener()
a := GenerateMessageID()
resp := ml.Listen(a, 1, time.Minute)
......@@ -20,7 +20,7 @@ func TestMesListenerBasic(t *testing.T) {
go ml.Respond(a, mes)
del := time.After(time.Millisecond * 10)
del := time.After(time.Millisecond * 100)
select {
case get := <-resp:
if string(get.Data) != string(mes.Data) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment