Commit 8203d2c0 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #26 from jbenet/dht-p1

DHT merging part 1
parents 41a725c2 dd08e0ed
package bitswap
import (
"time"
mh "github.com/jbenet/go-multihash"
blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
mh "github.com/jbenet/go-multihash"
"time"
)
// aliases
type Ledger struct {
Owner mh.Multihash
Partner mh.Multihash
Owner mh.Multihash
Partner mh.Multihash
BytesSent uint64
BytesRecv uint64
Timestamp *time.Time
}
type BitSwap struct {
Ledgers map[u.Key]*Ledger // key is peer.ID
HaveList map[u.Key]*blocks.Block // key is multihash
WantList []*mh.Multihash
Ledgers map[u.Key]*Ledger // key is peer.ID
HaveList map[u.Key]*blocks.Block // key is multihash
WantList []*mh.Multihash
// todo
}
......@@ -51,7 +51,9 @@ func Load(filename string) (*Config, error) {
// if nothing is there, write first config file.
if _, err := os.Stat(filename); os.IsNotExist(err) {
WriteFile(filename, []byte(defaultConfigFile))
if err := WriteFile(filename, []byte(defaultConfigFile)); err != nil {
return nil, err
}
}
var cfg Config
......
......@@ -7,7 +7,7 @@ import (
func TestConfig(t *testing.T) {
cfg, err := Load("")
cfg, err := Load(".ipfsconfig")
if err != nil {
t.Error(err)
return
......
package dht
import (
"time"
mh "github.com/jbenet/go-multihash"
peer "github.com/jbenet/go-ipfs/peer"
"errors"
"net"
)
var NotFound = errors.New("Not Found")
var NotAvailable = errors.New("Not Available")
var TimeoutExceeded = errors.New("Timeout Exceeded")
// The IPFS DHT is an implementation of Kademlia with
// Coral and S/Kademlia modifications. It is used to
// implement the base IPFS Routing module.
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js
type DHT struct {
//Network
Network net.Conn
// DHT Configuration Settings
Config DHTConfig
//Republish
Republish *DHTRepublish
}
// TODO: not call this republish
type DHTRepublish struct {
Strict []*DHTObject
Sloppy []*DHTObject
}
type DHTObject struct {
Key string
Value *DHTValue
LastPublished *time.Time
}
func (o *DHTObject) ShouldRepublish(interval time.Duration) bool {
return (time.Now().Second() - o.LastPublished.Second()) > int(interval.Seconds())
}
// A struct representing a value in the DHT
type DHTValue struct {}
type DHTConfig struct {
// Time to wait between republishing intervals
RepublishInterval time.Duration
// Multihash hash function
HashType int
}
// Looks for a particular node
func (dht *DHT) FindNode(id *peer.ID /* and a callback? */) error {
panic("Not implemented.")
}
func (dht *DHT) PingNode(id *peer.ID, timeout time.Duration) error {
panic("Not implemented.")
}
// Retrieves a value for a given key
func (dht *DHT) GetValue(key string) *DHTValue {
panic("Not implemented.")
}
// Stores a value for a given key
func (dht *DHT) SetValue(key string, value *DHTValue) error {
panic("Not implemented.")
}
// GetSloppyValues finds (at least) a number of values for given key
func (dht *DHT) GetSloppyValues(key string, count int) ([]*DHTValue, error) {
panic("Not implemented.")
}
func (dht *DHT) SetSloppyValue(key string, value *DHTValue) error {
panic("Not implemented.")
}
func (dht *DHT) periodicRepublish() {
tick := time.NewTicker(time.Second * 5)
for {
select {
case <-tick.C:
for _,v := range dht.Republish.Strict {
if v.ShouldRepublish(dht.Config.RepublishInterval) {
dht.SetValue(v.Key, v.Value)
}
}
for _,v := range dht.Republish.Sloppy {
if v.ShouldRepublish(dht.Config.RepublishInterval) {
dht.SetSloppyValue(v.Key, v.Value)
}
}
}
}
}
func (dht *DHT) handleMessage(message []byte) {
}
func (dht *DHT) coerceMultihash(hash mh.Multihash) {
}
package dht
import (
"sync"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
ds "github.com/jbenet/datastore.go"
"code.google.com/p/goprotobuf/proto"
)
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js
// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
routes RoutingTable
network *swarm.Swarm
// Local peer (yourself)
self *peer.Peer
// Local data
datastore ds.Datastore
// map of channels waiting for reply messages
listeners map[uint64]chan *swarm.Message
listenLock sync.RWMutex
// Signal to shutdown dht
shutdown chan struct{}
}
func NewDHT(p *peer.Peer) *IpfsDHT {
dht := new(IpfsDHT)
dht.self = p
dht.network = swarm.NewSwarm(p)
dht.listeners = make(map[uint64]chan *swarm.Message)
dht.shutdown = make(chan struct{})
return dht
}
// Read in all messages from swarm and handle them appropriately
// NOTE: this function is just a quick sketch
func (dht *IpfsDHT) handleMessages() {
for {
select {
case mes := <-dht.network.Chan.Incoming:
pmes := new(DHTMessage)
err := proto.Unmarshal(mes.Data, pmes)
if err != nil {
u.PErr("Failed to decode protobuf message: %s", err)
continue
}
// Note: not sure if this is the correct place for this
if pmes.GetResponse() {
dht.listenLock.RLock()
ch, ok := dht.listeners[pmes.GetId()]
dht.listenLock.RUnlock()
if ok {
ch <- mes
}
// this is expected behaviour during a timeout
u.DOut("Received response with nobody listening...")
continue
}
//
switch pmes.GetType() {
case DHTMessage_GET_VALUE:
dht.handleGetValue(mes.Peer, pmes)
case DHTMessage_PUT_VALUE:
dht.handlePutValue(mes.Peer, pmes)
case DHTMessage_FIND_NODE:
dht.handleFindNode(mes.Peer, pmes)
case DHTMessage_ADD_PROVIDER:
case DHTMessage_GET_PROVIDERS:
case DHTMessage_PING:
dht.handleFindNode(mes.Peer, pmes)
}
case <-dht.shutdown:
return
}
}
}
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
i_val, err := dht.datastore.Get(dskey)
if err == nil {
isResponse := true
resp := new(DHTMessage)
resp.Response = &isResponse
resp.Id = pmes.Id
resp.Key = pmes.Key
val := i_val.([]byte)
resp.Value = val
mes := new(swarm.Message)
mes.Peer = p
mes.Data = []byte(resp.String())
} else if err == ds.ErrNotFound {
// Find closest node(s) to desired key and reply with that info
// TODO: this will need some other metadata in the protobuf message
// to signal to the querying node that the data its receiving
// is actually a list of other nodes
}
}
// Store a value in this nodes local storage
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
err := dht.datastore.Put(dskey, pmes.GetValue())
if err != nil {
// For now, just panic, handle this better later maybe
panic(err)
}
}
func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
panic("Not implemented.")
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
isResponse := true
resp := new(DHTMessage)
resp.Id = pmes.Id
resp.Response = &isResponse
mes := new(swarm.Message)
mes.Peer = p
mes.Data = []byte(resp.String())
dht.network.Chan.Outgoing <- mes
}
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
lchan := make(chan *swarm.Message)
dht.listenLock.Lock()
dht.listeners[mesid] = lchan
dht.listenLock.Unlock()
return lchan
}
func (dht *IpfsDHT) Unlisten(mesid uint64) {
dht.listenLock.Lock()
ch, ok := dht.listeners[mesid]
if ok {
delete(dht.listeners, mesid)
}
dht.listenLock.Unlock()
close(ch)
}
// Stop all communications from this node and shut down
func (dht *IpfsDHT) Halt() {
dht.shutdown <- struct{}{}
dht.network.Close()
}
// Code generated by protoc-gen-go.
// source: messages.proto
// DO NOT EDIT!
/*
Package dht is a generated protocol buffer package.
It is generated from these files:
messages.proto
It has these top-level messages:
DHTMessage
*/
package dht
import proto "code.google.com/p/goprotobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
type DHTMessage_MessageType int32
const (
DHTMessage_PUT_VALUE DHTMessage_MessageType = 0
DHTMessage_GET_VALUE DHTMessage_MessageType = 1
DHTMessage_ADD_PROVIDER DHTMessage_MessageType = 2
DHTMessage_GET_PROVIDERS DHTMessage_MessageType = 3
DHTMessage_FIND_NODE DHTMessage_MessageType = 4
DHTMessage_PING DHTMessage_MessageType = 5
)
var DHTMessage_MessageType_name = map[int32]string{
0: "PUT_VALUE",
1: "GET_VALUE",
2: "ADD_PROVIDER",
3: "GET_PROVIDERS",
4: "FIND_NODE",
5: "PING",
}
var DHTMessage_MessageType_value = map[string]int32{
"PUT_VALUE": 0,
"GET_VALUE": 1,
"ADD_PROVIDER": 2,
"GET_PROVIDERS": 3,
"FIND_NODE": 4,
"PING": 5,
}
func (x DHTMessage_MessageType) Enum() *DHTMessage_MessageType {
p := new(DHTMessage_MessageType)
*p = x
return p
}
func (x DHTMessage_MessageType) String() string {
return proto.EnumName(DHTMessage_MessageType_name, int32(x))
}
func (x *DHTMessage_MessageType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(DHTMessage_MessageType_value, data, "DHTMessage_MessageType")
if err != nil {
return err
}
*x = DHTMessage_MessageType(value)
return nil
}
type DHTMessage struct {
Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"`
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
// Unique ID of this message, used to match queries with responses
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
// Signals whether or not this message is a response to another message
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *DHTMessage) Reset() { *m = DHTMessage{} }
func (m *DHTMessage) String() string { return proto.CompactTextString(m) }
func (*DHTMessage) ProtoMessage() {}
func (m *DHTMessage) GetType() DHTMessage_MessageType {
if m != nil && m.Type != nil {
return *m.Type
}
return DHTMessage_PUT_VALUE
}
func (m *DHTMessage) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
}
return ""
}
func (m *DHTMessage) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *DHTMessage) GetId() uint64 {
if m != nil && m.Id != nil {
return *m.Id
}
return 0
}
func (m *DHTMessage) GetResponse() bool {
if m != nil && m.Response != nil {
return *m.Response
}
return false
}
func init() {
proto.RegisterEnum("dht.DHTMessage_MessageType", DHTMessage_MessageType_name, DHTMessage_MessageType_value)
}
package dht;
//run `protoc --go_out=. *.proto` to generate
message DHTMessage {
enum MessageType {
PUT_VALUE = 0;
GET_VALUE = 1;
ADD_PROVIDER = 2;
GET_PROVIDERS = 3;
FIND_NODE = 4;
PING = 5;
}
required MessageType type = 1;
optional string key = 2;
optional bytes value = 3;
// Unique ID of this message, used to match queries with responses
required uint64 id = 4;
// Signals whether or not this message is a response to another message
optional bool response = 5;
}
package dht
import (
"math/rand"
"time"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
)
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
return uint64(rand.Uint32()) << 32 & uint64(rand.Uint32())
}
// This file implements the Routing interface for the IpfsDHT struct.
// Basic Put/Get
// PutValue adds value corresponding to given Key.
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
var p *peer.Peer
p = s.routes.NearestNode(key)
pmes_type := DHTMessage_PUT_VALUE
str_key := string(key)
mes_id := GenerateMessageID()
pmes := new(DHTMessage)
pmes.Type = &pmes_type
pmes.Key = &str_key
pmes.Value = value
pmes.Id = &mes_id
mes := new(swarm.Message)
mes.Data = []byte(pmes.String())
mes.Peer = p
s.network.Chan.Outgoing <- mes
return nil
}
// GetValue searches for the value corresponding to given Key.
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
var p *peer.Peer
p = s.routes.NearestNode(key)
str_key := string(key)
mes_type := DHTMessage_GET_VALUE
mes_id := GenerateMessageID()
// protobuf structure
pmes := new(DHTMessage)
pmes.Type = &mes_type
pmes.Key = &str_key
pmes.Id = &mes_id
mes := new(swarm.Message)
mes.Data = []byte(pmes.String())
mes.Peer = p
response_chan := s.ListenFor(*pmes.Id)
// Wait for either the response or a timeout
timeup := time.After(timeout)
select {
case <-timeup:
// TODO: unregister listener
return nil, u.ErrTimeout
case resp := <-response_chan:
return resp.Data, nil
}
}
// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
// Announce that this node can provide value for given key
func (s *IpfsDHT) Provide(key u.Key) error {
return u.ErrNotImplemented
}
// FindProviders searches for peers who can provide the value for given key.
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error) {
return nil, u.ErrNotImplemented
}
// Find specific Peer
// FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
return nil, u.ErrNotImplemented
}
package dht
import (
"bytes"
"container/list"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
// ID for IpfsDHT should be a byte slice, to allow for simpler operations
// (xor). DHT ids are based on the peer.IDs.
//
// NOTE: peer.IDs are biased because they are multihashes (first bytes
// biased). Thus, may need to re-hash keys (uniform dist). TODO(jbenet)
type ID []byte
// Bucket holds a list of peers.
type Bucket []*list.List
// RoutingTable defines the routing table.
type RoutingTable struct {
// kBuckets define all the fingers to other nodes.
Buckets []Bucket
}
//TODO: make this accept an ID, requires method of converting keys to IDs
func (rt *RoutingTable) NearestNode(key u.Key) *peer.Peer {
panic("Function not implemented.")
}
func (id ID) Equal(other ID) bool {
return bytes.Equal(id, other)
}
func (id ID) Less(other interface{}) bool {
a, b := equalizeSizes(id, other.(ID))
for i := 0; i < len(a); i++ {
if a[i] != b[i] {
return a[i] < b[i]
}
}
return len(a) < len(b)
}
func (id ID) commonPrefixLen() int {
for i := 0; i < len(id); i++ {
for j := 0; j < 8; j++ {
if (id[i]>>uint8(7-j))&0x1 != 0 {
return i*8 + j
}
}
}
return len(id)*8 - 1
}
func xor(a, b ID) ID {
a, b = equalizeSizes(a, b)
c := make(ID, len(a))
for i := 0; i < len(a); i++ {
c[i] = a[i] ^ b[i]
}
return c
}
func equalizeSizes(a, b ID) (ID, ID) {
la := len(a)
lb := len(b)
if la < lb {
na := make([]byte, lb)
copy(na, a)
a = na
} else if lb < la {
nb := make([]byte, la)
copy(nb, b)
b = nb
}
return a, b
}
package routing
import (
"time"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
"time"
)
// IpfsRouting is the routing module interface
// It is implemented by things like DHTs, etc.
type IpfsRouting interface {
// Basic Put/Get
// Basic Put/Get
// PutValue adds value corresponding to given Key.
PutValue(key u.Key, value []byte) (error)
// PutValue adds value corresponding to given Key.
PutValue(key u.Key, value []byte) error
// GetValue searches for the value corresponding to given Key.
GetValue(key u.Key, timeout time.Duration) ([]byte, error)
// GetValue searches for the value corresponding to given Key.
GetValue(key u.Key, timeout time.Duration) ([]byte, error)
// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
// Announce that this node can provide value for given key
Provide(key u.Key) error
// Announce that this node can provide value for given key
Provide(key u.Key) (error)
// FindProviders searches for peers who can provide the value for given key.
FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error)
// FindProviders searches for peers who can provide the value for given key.
FindProviders(key u.Key, timeout time.Duration) (*peer.Peer, error)
// Find specific Peer
// Find specific Peer
// FindPeer searches for a peer with given ID.
FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error)
// FindPeer searches for a peer with given ID.
FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error)
}
......@@ -44,29 +44,36 @@ func Dial(network string, peer *peer.Peer) (*Conn, error) {
return nil, err
}
out := msgio.NewChan(10)
inc := msgio.NewChan(10)
conn := &Conn{
Peer: peer,
Addr: addr,
Conn: nconn,
}
Outgoing: out,
Incoming: inc,
Closed: make(chan bool, 1),
newConnChans(conn)
return conn, nil
}
// Construct new channels for given Conn.
func newConnChans(c *Conn) error {
if c.Outgoing != nil || c.Incoming != nil {
return fmt.Errorf("Conn already initialized")
}
go out.WriteTo(nconn)
go inc.ReadFrom(nconn, 1<<12)
c.Outgoing = msgio.NewChan(10)
c.Incoming = msgio.NewChan(10)
c.Closed = make(chan bool, 1)
return conn, nil
go c.Outgoing.WriteTo(c.Conn)
go c.Incoming.ReadFrom(c.Conn, 1<<12)
return nil
}
// Close closes the connection, and associated channels.
func (s *Conn) Close() error {
if s.Conn == nil {
return fmt.Errorf("Already closed.") // already closed
return fmt.Errorf("Already closed") // already closed
}
// closing net connection
......
......@@ -3,6 +3,9 @@ package swarm
import (
"fmt"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
"net"
"sync"
)
......@@ -18,8 +21,8 @@ type Message struct {
// Chan is a swam channel, which provides duplex communication and errors.
type Chan struct {
Outgoing chan Message
Incoming chan Message
Outgoing chan *Message
Incoming chan *Message
Errors chan error
Close chan bool
}
......@@ -27,8 +30,8 @@ type Chan struct {
// NewChan constructs a Chan instance, with given buffer size bufsize.
func NewChan(bufsize int) *Chan {
return &Chan{
Outgoing: make(chan Message, bufsize),
Incoming: make(chan Message, bufsize),
Outgoing: make(chan *Message, bufsize),
Incoming: make(chan *Message, bufsize),
Errors: make(chan error),
Close: make(chan bool, bufsize),
}
......@@ -42,19 +45,64 @@ type Swarm struct {
Chan *Chan
conns ConnMap
connsLock sync.RWMutex
local *peer.Peer
}
// NewSwarm constructs a Swarm, with a Chan.
func NewSwarm() *Swarm {
func NewSwarm(local *peer.Peer) *Swarm {
s := &Swarm{
Chan: NewChan(10),
conns: ConnMap{},
local: local,
}
go s.fanOut()
return s
}
// Close closes a swam.
// Open listeners for each network the swarm should listen on
func (s *Swarm) Listen() {
for _, addr := range s.local.Addresses {
err := s.connListen(addr)
if err != nil {
u.PErr("Failed to listen on: %s [%s]", addr, err)
}
}
}
// Listen for new connections on the given multiaddr
func (s *Swarm) connListen(maddr *ma.Multiaddr) error {
netstr, addr, err := maddr.DialArgs()
if err != nil {
return err
}
list, err := net.Listen(netstr, addr)
if err != nil {
return err
}
// Accept and handle new connections on this listener until it errors
go func() {
for {
nconn, err := list.Accept()
if err != nil {
u.PErr("Failed to accept connection: %s - %s", netstr, addr)
return
}
go s.handleNewConn(nconn)
}
}()
return nil
}
// Handle getting ID from this peer and adding it into the map
func (s *Swarm) handleNewConn(nconn net.Conn) {
panic("Not yet implemented!")
}
// Close closes a swarm.
func (s *Swarm) Close() {
s.connsLock.RLock()
l := len(s.conns)
......@@ -149,7 +197,7 @@ Loop:
}
// wrap it for consumers.
msg := Message{Peer: conn.Peer, Data: data}
msg := &Message{Peer: conn.Peer, Data: data}
s.Chan.Incoming <- msg
}
}
......
......@@ -42,7 +42,7 @@ func pong(c net.Conn, peer *peer.Peer) {
func TestSwarm(t *testing.T) {
swarm := NewSwarm()
swarm := NewSwarm(nil)
peers := []*peer.Peer{}
listeners := []*net.Listener{}
peerNames := map[string]string{
......@@ -84,7 +84,7 @@ func TestSwarm(t *testing.T) {
MsgNum := 1000
for k := 0; k < MsgNum; k++ {
for _, p := range peers {
swarm.Chan.Outgoing <- Message{Peer: p, Data: []byte("ping")}
swarm.Chan.Outgoing <- &Message{Peer: p, Data: []byte("ping")}
}
}
......
......@@ -14,6 +14,9 @@ var Debug bool
// ErrNotImplemented signifies a function has not been implemented yet.
var ErrNotImplemented = fmt.Errorf("Error: not implemented yet.")
// ErrTimeout implies that a timeout has been triggered
var ErrTimeout = fmt.Errorf("Error: Call timed out.")
// Key is a string representation of multihash for use with maps.
type Key string
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment