484 lines
13 KiB
Go
484 lines
13 KiB
Go
|
package kademlia
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/binary"
|
||
|
"errors"
|
||
|
"io"
|
||
|
"log"
|
||
|
"net"
|
||
|
"os"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// ProofOfWorkBits is the proof-of-work difficulty setting of this package.
// NOTE(review): it is not referenced in this file; presumably it is consumed by
// the proof-of-work helpers (e.g. ProofWorkForStore) — confirm.
const ProofOfWorkBits = 2

// Parallelisation is the number of concurrent lookups FindNode starts; FindNode
// also seeds them from the Parallelisation*5 closest nodes of the routing table.
const Parallelisation = 3
|
||
|
|
||
|
type Config struct {
|
||
|
Bootstrapper string // Bootstrapper IP
|
||
|
BootstrapperPort uint64
|
||
|
P2PAddress string
|
||
|
P2PPort uint64
|
||
|
APIAddress string
|
||
|
APIPort uint64
|
||
|
RepublishInterval time.Duration
|
||
|
PrefixLength uint64 // Bits matched per bucket in routing table
|
||
|
CertFile string // Where to store the certificate
|
||
|
KeyFile string // Where to store the key
|
||
|
BootstrapperCert string // Path to the already present bootstrapper's certificate
|
||
|
}
|
||
|
|
||
|
type DHT struct {
|
||
|
RoutingTable *RoutingTable
|
||
|
SelfNode P2PNode
|
||
|
Storage DHTStorage
|
||
|
config *Config
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
NewDHT Creates a new DHT object based on the configuration passed. If the bootstrap field of the config is left as empty
|
||
|
string, bootstrapping will be skipped.
|
||
|
*/
|
||
|
func NewDHT(config *Config) *DHT {
|
||
|
nodeIP := net.ParseIP(config.P2PAddress)
|
||
|
GenerateCert(config.P2PAddress, config.CertFile, config.KeyFile)
|
||
|
|
||
|
cert, err := os.ReadFile(config.CertFile)
|
||
|
if err != nil {
|
||
|
log.Fatal(err)
|
||
|
}
|
||
|
|
||
|
selfNode := P2PNode{
|
||
|
NodeID: GetPeerId(nodeIP, uint16(config.P2PPort)),
|
||
|
IPAddress: nodeIP,
|
||
|
Port: config.P2PPort,
|
||
|
Certificate: cert,
|
||
|
}
|
||
|
dht := &DHT{
|
||
|
Storage: DHTStorage{
|
||
|
//storageMap: sync.Map{},
|
||
|
storageMap: map[[32]byte]ValueType{},
|
||
|
},
|
||
|
SelfNode: selfNode,
|
||
|
RoutingTable: NewRoutingTable(selfNode, config.PrefixLength),
|
||
|
config: config,
|
||
|
}
|
||
|
|
||
|
dht.bootstrap()
|
||
|
return dht
|
||
|
}
|
||
|
|
||
|
type P2PNode struct {
|
||
|
NodeID [32]byte
|
||
|
IPAddress net.IP
|
||
|
Port uint64
|
||
|
Certificate []byte
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
bootstrap populates the routing table upon creation of the dht.
|
||
|
*/
|
||
|
func (d *DHT) bootstrap() {
|
||
|
if len(d.config.Bootstrapper) == 0 {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
bootstrapperCert, err := os.ReadFile(d.config.BootstrapperCert)
|
||
|
if err != nil {
|
||
|
log.Printf("[Boostrap] Failed to load boostrapper certificate: %v Proceeding without boostrap!", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
bootstrapNode := CreateNode(d.config.Bootstrapper, d.config.BootstrapperPort, bootstrapperCert)
|
||
|
|
||
|
d.RoutingTable.Insert(bootstrapNode)
|
||
|
|
||
|
//By flipping bits of our ID, we can query for nodes at specific distances to us, filling up the respective
|
||
|
//Buckets of the routing table.
|
||
|
for i := 0; i < 256; i++ {
|
||
|
toQuery := setBitAt(d.SelfNode.NodeID, i, getBitsAt(d.SelfNode.NodeID, uint64(i), 1) == 0) //Flip the bit
|
||
|
targets := *d.RoutingTable.FindClosest(toQuery, bucketSize)
|
||
|
if len(targets) == 0 {
|
||
|
log.Printf("This should not have happened")
|
||
|
}
|
||
|
targets = SortNodesByDistanceToKey(d.SelfNode.NodeID, targets)
|
||
|
|
||
|
// Once we find ourself, we will do so in further steps as well, so no need to go on.
|
||
|
if NodeEquals(targets[0], d.SelfNode) {
|
||
|
break
|
||
|
}
|
||
|
res, err := d.FindNodesAtNode(toQuery, targets[0], bucketSize)
|
||
|
if err != nil {
|
||
|
log.Printf("[DHT]: Error during bootstrap of node %v: %v", d.SelfNode.IPAddress, err)
|
||
|
continue
|
||
|
}
|
||
|
for _, node := range res {
|
||
|
d.RoutingTable.Insert(node)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
Store stores an entry at the specified key in the network. For this it first finds the nodes responsible for the key and
|
||
|
then sends STORE requests to them.
|
||
|
*/
|
||
|
func (d *DHT) Store(key [32]byte, value []byte, ttl uint16, repCount uint16) {
|
||
|
// Find the closest nodes to the key
|
||
|
targets, err := d.FindNode(key, repCount)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// If we are one of the closest nodes, we store it ourselves
|
||
|
if ContainsNode(targets, d.SelfNode) {
|
||
|
d.Storage.Put(key, value, ttl, repCount)
|
||
|
}
|
||
|
|
||
|
//Do proof of work only once
|
||
|
pow := ProofWorkForStore(key, value)
|
||
|
|
||
|
// Replicate the value to the closest nodes
|
||
|
for i := 0; i < int(repCount) && i < len(targets); i++ {
|
||
|
if targets[i].NodeID != d.SelfNode.NodeID {
|
||
|
// Send the value to the target
|
||
|
// log.Printf("Storing at %v, entry %d", targets[i].IPAddress, i)
|
||
|
go d.sendStore(key, value, repCount, ttl, targets[i], pow)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (d *DHT) Get(key [32]byte) ([]byte, bool) {
|
||
|
return d.FindValue(key)
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
Ping sends a PING message to the specified node and returns if it is still alive
|
||
|
*/
|
||
|
func Ping(node P2PNode, selfNode P2PNode) bool {
|
||
|
connection, err := ConnectToNode(node)
|
||
|
if err != nil {
|
||
|
return false
|
||
|
}
|
||
|
defer connection.Close()
|
||
|
|
||
|
packet := P2PPingPacket{
|
||
|
SourceIP: selfNode.IPAddress,
|
||
|
SourcePort: uint16(selfNode.Port),
|
||
|
Certificate: selfNode.Certificate,
|
||
|
}
|
||
|
|
||
|
_, err = connection.Write(SerialisePingPacket(&packet))
|
||
|
if err != nil {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
pingTimeout := 1 * time.Second
|
||
|
err = connection.SetDeadline(time.Now().Add(pingTimeout))
|
||
|
if err != nil {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
receiveBuffer := make([]byte, 4)
|
||
|
_, err = io.ReadFull(connection, receiveBuffer)
|
||
|
if err != nil {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
recvSize := binary.BigEndian.Uint16(receiveBuffer[:2])
|
||
|
recvType := binary.BigEndian.Uint16(receiveBuffer[2:])
|
||
|
if recvType != PONG || recvSize != 4 {
|
||
|
return false
|
||
|
}
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
FindValue queries the DHT network for the value related to the provided key. It does so in a highly parallel fashion,
|
||
|
querying the network for the closest nodes to the key first and then querying those for the key.
|
||
|
It returns the key, if found
|
||
|
*/
|
||
|
func (d *DHT) FindValue(key [32]byte) ([]byte, bool) {
|
||
|
v, e := d.Storage.GetValue(key)
|
||
|
if e {
|
||
|
return v, true
|
||
|
}
|
||
|
|
||
|
toSearch, err := d.FindNode(key, bucketSize)
|
||
|
if toSearch == nil || err != nil {
|
||
|
return nil, false
|
||
|
}
|
||
|
|
||
|
resultChannel := make(chan []byte)
|
||
|
done := make(chan struct{})
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
for _, node := range toSearch {
|
||
|
wg.Add(1)
|
||
|
go func() {
|
||
|
d.findValueAtNode(resultChannel, key, node)
|
||
|
wg.Done()
|
||
|
}()
|
||
|
}
|
||
|
go func() {
|
||
|
wg.Wait()
|
||
|
close(done)
|
||
|
}()
|
||
|
select {
|
||
|
case result := <-resultChannel:
|
||
|
return result, true
|
||
|
case <-done:
|
||
|
return nil, false
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
findValueAtNode connects to a node and sends a FINDVALUE request for the specified key. In case it is successful it
|
||
|
reports it result to the results channel. If not, it exits with no reply, allowing for parallel queries.
|
||
|
*/
|
||
|
func (d *DHT) findValueAtNode(results chan<- []byte, key [32]byte, target P2PNode) {
|
||
|
connection, err := ConnectToNode(target)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
defer connection.Close()
|
||
|
packet := P2PFindValuePacket{
|
||
|
SourceIP: d.SelfNode.IPAddress,
|
||
|
SourcePort: uint16(d.SelfNode.Port),
|
||
|
Certificate: d.SelfNode.Certificate,
|
||
|
Key: key,
|
||
|
}
|
||
|
_, err = connection.Write(SerialiseFindValuePacket(&packet))
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
responseHeader := make([]byte, 4)
|
||
|
_, err = io.ReadFull(connection, responseHeader)
|
||
|
if err != nil {
|
||
|
log.Print(err)
|
||
|
return
|
||
|
}
|
||
|
size := binary.BigEndian.Uint16(responseHeader[0:2])
|
||
|
responseType := binary.BigEndian.Uint16(responseHeader[2:4])
|
||
|
|
||
|
switch responseType {
|
||
|
case FOUNDVALUE:
|
||
|
responseBody := make([]byte, size-4)
|
||
|
_, err = io.ReadFull(connection, responseBody)
|
||
|
if err != nil {
|
||
|
log.Print(err)
|
||
|
return
|
||
|
}
|
||
|
reply := ParseFoundValuePacket(append(responseHeader, responseBody...))
|
||
|
results <- reply.Value
|
||
|
return
|
||
|
default:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
sendStore connects to the target node and sends a STORE packet matching the parameters provided.
|
||
|
*/
|
||
|
func (d *DHT) sendStore(key [32]byte, value []byte, repCount uint16, ttl uint16, target P2PNode, pow [32]byte) {
|
||
|
connection, err := ConnectToNode(target)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
defer connection.Close()
|
||
|
|
||
|
packet := P2PStorePacket{
|
||
|
SourceIP: d.SelfNode.IPAddress,
|
||
|
SourcePort: uint16(d.SelfNode.Port),
|
||
|
Certificate: d.SelfNode.Certificate,
|
||
|
TTL: ttl,
|
||
|
RepCount: repCount,
|
||
|
PoW: pow,
|
||
|
Key: key,
|
||
|
Value: value,
|
||
|
}
|
||
|
_, err = connection.Write(SerialiseStorePacket(&packet))
|
||
|
|
||
|
if err != nil {
|
||
|
print("Error sending store message %v", err)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// FindNode queries the DHT until it found the closest node to the given key. It then returns the reply
|
||
|
// of that node to a FINDNODE request for that key
|
||
|
// The search is performed on several nodes in parallel, depending on the Parallelisation setting
|
||
|
func (d *DHT) FindNode(key [32]byte, numberOfNodes uint16) ([]P2PNode, error) {
|
||
|
toSearch := *d.RoutingTable.FindClosest(key, Parallelisation*5)
|
||
|
|
||
|
// If our routing table is empty, we apparently are alone
|
||
|
if len(toSearch) == 0 {
|
||
|
d.RoutingTable.Insert(d.SelfNode)
|
||
|
toSearch = append(toSearch, d.SelfNode)
|
||
|
return toSearch, nil
|
||
|
}
|
||
|
|
||
|
// Create a context to stop any loose running threads once a result has been found
|
||
|
ctx, cancel := context.WithCancel(context.Background())
|
||
|
defer cancel()
|
||
|
|
||
|
resultChannel := make(chan []P2PNode)
|
||
|
done := make(chan struct{})
|
||
|
|
||
|
var wg sync.WaitGroup
|
||
|
for i := 0; i < Parallelisation; i++ {
|
||
|
wg.Add(1)
|
||
|
go func() {
|
||
|
// Start from different positions to not query the same node twice
|
||
|
startIndex := i * len(toSearch) / Parallelisation
|
||
|
_ = d.findClosestNodes(ctx, resultChannel, key, toSearch[startIndex:], numberOfNodes)
|
||
|
wg.Done()
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
// The done channel is only closed, if all routines calling findClosestNodes finished. If any of them does so by
|
||
|
// reporting a success, the first case below will be selected. Only if all failed to contact any nodes will
|
||
|
// the node search fail.
|
||
|
go func() {
|
||
|
wg.Wait()
|
||
|
close(done)
|
||
|
}()
|
||
|
select {
|
||
|
case result := <-resultChannel:
|
||
|
return result, nil
|
||
|
case <-done:
|
||
|
return toSearch, errors.New("node search unsuccessful")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
findClosestNodes tries to find the closest <amount> nodes to the specified key in the network. If no further improvements
|
||
|
are made, it reports back its result to the results channel. If the context is ended, it stops prematurely without
|
||
|
reporting a result. This is used when another concurrent search is successful first. It is provided an array of nodes
|
||
|
that it will use as first nodes to contact. This allows starting several threads with different node arrays, so that
|
||
|
not all threads choose the same node as start point.
|
||
|
*/
|
||
|
func (d *DHT) findClosestNodes(ctx context.Context, results chan<- []P2PNode, key [32]byte, nodes []P2PNode, amount uint16) error {
|
||
|
currentNode := nodes[0]
|
||
|
var closestSuccessfulNodeID [32]byte //Start of as far away as possible so everything is an improvement
|
||
|
for i, b := range key {
|
||
|
closestSuccessfulNodeID[i] = ^b
|
||
|
}
|
||
|
|
||
|
index := 0
|
||
|
for {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return nil
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
result, err := d.FindNodesAtNode(key, currentNode, amount)
|
||
|
if err != nil || len(result) == 0 {
|
||
|
index++
|
||
|
if index >= len(nodes) {
|
||
|
return errors.New("ran out of nodes")
|
||
|
} else {
|
||
|
currentNode = nodes[index]
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
result = append(result, nodes...)
|
||
|
result = SortNodesByDistanceToKey(key, result)
|
||
|
result = RemoveDuplicates(result)
|
||
|
|
||
|
if !IsCloserToKey(key, result[index].NodeID, closestSuccessfulNodeID) {
|
||
|
results <- result[index:]
|
||
|
return nil
|
||
|
} else {
|
||
|
nodes = result
|
||
|
currentNode = result[0]
|
||
|
index = 0
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
FindNodesAtNode connects to the specified node and issues a FINDNODE request to it for the key in question. It returns
|
||
|
the nodes sent back, if successful. numberOfNodes specifies, how many nodes should be tried to return.
|
||
|
*/
|
||
|
func (d *DHT) FindNodesAtNode(key [32]byte, node P2PNode, numberOfNodes uint16) ([]P2PNode, error) {
|
||
|
connection, err := ConnectToNode(node)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
defer connection.Close()
|
||
|
packet := P2PFindNodePacket{
|
||
|
SourceIP: d.SelfNode.IPAddress,
|
||
|
SourcePort: uint16(d.SelfNode.Port),
|
||
|
Key: key,
|
||
|
Certificate: d.SelfNode.Certificate,
|
||
|
Amount: numberOfNodes,
|
||
|
}
|
||
|
_, err = connection.Write(SerialiseFindNodePacket(&packet))
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
responseHeader := make([]byte, 4)
|
||
|
|
||
|
deadline := time.Now().Add(time.Second * 1)
|
||
|
err = connection.SetDeadline(deadline)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
_, err = io.ReadFull(connection, responseHeader)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
size := binary.BigEndian.Uint16(responseHeader)
|
||
|
messageType := binary.BigEndian.Uint16(responseHeader[2:])
|
||
|
if messageType != FOUNDNODE {
|
||
|
return nil, errors.New("received invalid response")
|
||
|
}
|
||
|
|
||
|
responseBody := make([]byte, size-4)
|
||
|
_, err = io.ReadFull(connection, responseBody)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
foundNodes, err := ParseFoundNodesPacket(append(responseHeader, responseBody...))
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return foundNodes.Nodes, nil
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
Housekeeping needs to be passed the time that passed since the last time it has been called. It then expires any entries
|
||
|
in the in memory storage of the DHT whose ttl ran out and republishes entries due for republishing.
|
||
|
*/
|
||
|
func (d *DHT) Housekeeping(elapsedTime time.Duration) {
|
||
|
d.Storage.UpdateAndExpireEntries(elapsedTime)
|
||
|
toBeRepublished := d.Storage.GetEntriesForRepublishing(d.config.RepublishInterval)
|
||
|
d.RepublishEntries(toBeRepublished)
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
RepublishEntries checks for all entries in the map provided, if they have not been republished for a long enough time
|
||
|
by now and republishes the ones due.
|
||
|
*/
|
||
|
func (d *DHT) RepublishEntries(entries map[KeyType]ValueType) {
|
||
|
for key, entry := range entries {
|
||
|
if time.Since(entry.timeAdded) > d.config.RepublishInterval {
|
||
|
// Do in parallel
|
||
|
go d.Store(key, entry.Value, entry.TTL, entry.RepCount)
|
||
|
}
|
||
|
}
|
||
|
}
|