commit 98fd5f52f8472cc1c8c212f6d4562db45002ca0e Author: Darquise Date: Fri Sep 13 14:04:40 2024 +0900 Copy from LRZgitLab diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..8eaca0a --- /dev/null +++ b/.dockerignore @@ -0,0 +1,34 @@ +# Include any files or directories that you don't want to be copied to your +# container here (e.g., local build artifacts, temporary files, etc.). +# +# For more help, visit the .dockerignore file reference guide at +# https://docs.docker.com/go/build-context-dockerignore/ + +./shared_data + +**/.DS_Store +**/.classpath +**/.dockerignore +**/.env +**/.git +**/.gitignore +**/.project +**/.settings +**/.toolstarget +**/.vs +**/.vscode +**/*.*proj.user +**/*.dbmdl +**/*.jfm +**/bin +**/charts +**/docker-compose* +**/compose.y*ml +**/Dockerfile* +**/node_modules +**/npm-debug.log +**/obj +**/secrets.dev.yaml +**/values.dev.yaml +LICENSE +README.md diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ff53756 --- /dev/null +++ b/.gitignore @@ -0,0 +1,30 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +vendor/ + +# Go workspace file +go.work + +# Default ignored files +/shelf/ +.idea/ +build/ + +# Testing files +cmd/certs +shared_data/cert* +shared_data/key* +generate_compose.sh +temp.yaml diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..e53a379 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,15 @@ +run: + timeout: 5m +linters: + enable: + - revive + - staticcheck + - gofmt + - govet + - errcheck + - ineffassign + - unused +issues: + exclude-use-default: false + exclude: + - "could not import C (no metadata for C)" \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..5f29ddd --- /dev/null +++ b/Dockerfile @@ -0,0 +1,32 @@ +# syntax=docker/dockerfile:1 + +FROM golang:1.23 + +# Set destination for COPY +WORKDIR /app + +# Download Go modules +COPY go.mod go.sum ./ +RUN go mod download + +# Copy the source code. Note the slash at the end, as explained in +# https://docs.docker.com/reference/dockerfile/#copy +COPY ./cmd ./cmd/ +COPY ./pkg ./pkg/ + + +WORKDIR ./cmd +# Build + +RUN CGO_ENABLED=0 GOOS=linux go build -o docker-gs-ping + +# Optional: +# To bind to a TCP port, runtime parameters must be supplied to the docker command. +# But we can document in the Dockerfile what ports +# the application is going to listen on by default. +# https://docs.docker.com/reference/dockerfile/#expose +EXPOSE 7001/TCP +EXPOSE 6001/TCP + +# Run +CMD ["./docker-gs-ping","-c", "./shared_data/config1.ini"] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..044d375 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Simon Hanssen, Janin Chaib + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/cmd/api_server.go b/cmd/api_server.go new file mode 100644 index 0000000..7d59215 --- /dev/null +++ b/cmd/api_server.go @@ -0,0 +1,107 @@ +package main + +import ( + "encoding/binary" + kademlia "gitlab.lrz.de/netintum/teaching/p2psec_projects_2024/DHT-6/pkg" + "io" + "log" + "net" + "strconv" +) + +// Server configuration and message type identifiers + +type APIServer struct { + dht *kademlia.DHT +} + +func initializeAPIServer(config *kademlia.Config, dht *kademlia.DHT) { + server := APIServer{} + server.dht = dht + listener, err := net.Listen("tcp", config.APIAddress+":"+strconv.Itoa(int(config.APIPort))) + if err != nil { + log.Fatalln("[API Server] Error starting server: ", err) + } + defer listener.Close() + + for { + connection, err := listener.Accept() + if err != nil { + log.Println("[API Server] Error accepting connection: ", err) + continue + } + // Handle in seperate thread + go server.handleAPIRequest(connection) + } +} + +func (s *APIServer) handleAPIRequest(connection net.Conn) { + + buf := make([]byte, 4) + _, err := connection.Read(buf) + if err != nil { + log.Println("[API Server] Error reading message: ", err) + return + } + defer connection.Close() + + size := binary.BigEndian.Uint16(buf[:2]) + messageType := binary.BigEndian.Uint16(buf[2:]) + + // Subtract 4 bytes used for size and message type + messageBody := make([]byte, size-4) + io.ReadFull(connection, messageBody) + + // Combine body and head to get the full message again + message := append(buf, messageBody...) + + switch messageType { + case kademlia.PUT: + s.handlePut(message, connection) + case kademlia.GET: + s.handleGet(message, connection) + default: + log.Println("[API Server] Invalid message type") + } +} + +func (s *APIServer) handlePut(data []byte, connection net.Conn) { + packet, err := kademlia.ParsePutPacket(data) + if err != nil { + log.Printf("[API Server] Error receiving PUT request: %v", err) + return + } + + // Store the value in the network + s.dht.Store(packet.Key, packet.Value, packet.TTL, uint16(packet.Replication)) +} + +func (s *APIServer) handleGet(data []byte, connection net.Conn) { + packet, err := kademlia.ParseGetPacket(data) + if err != nil { + log.Printf("[API Server] Error receiving GET request: %v", err) + return + } + + // Query the network for the value + val, exists := s.dht.Get(packet.Key) + + if exists { + response := kademlia.DHTSuccessPacket{ + Key: packet.Key, + Value: val, + } + _, err := connection.Write(kademlia.SerialiseSuccessPacket(response)) + if err != nil { + log.Println("[API Server] Error sending GET response: ", err) + return + } + } else { + response := kademlia.DHTFailurePacket{Key: packet.Key} + _, err := connection.Write(kademlia.SerialiseFailurePacket(response)) + if err != nil { + log.Println("[API Server] Error sending GET response: ", err) + return + } + } +} diff --git a/cmd/config.ini b/cmd/config.ini new file mode 100644 index 0000000..4d3d33b --- /dev/null +++ b/cmd/config.ini @@ -0,0 +1,16 @@ +hostkey = /hostkey.pem + +[gossip] +cache_size = 50 +degree = 30 +bootstrapper = p2psec.net.in.tum.de:6001 +p2p_address = 131.159.15.61:6001 +api_address = 131.159.15.61:7001 + +[dht] +p2p_address = 10.0.0.10:6001 +api_address = 10.0.0.10:7001 +prefix_length = 5 +republish_interval = 3600 +key_file = "keyC.pem" +cert_file = "certC.pem" diff --git a/cmd/dht b/cmd/dht new file mode 100644 index 0000000..3ca99f3 Binary files /dev/null and b/cmd/dht differ diff --git a/cmd/dht_test.go b/cmd/dht_test.go new file mode 100644 index 0000000..3f1b62d --- /dev/null +++ b/cmd/dht_test.go @@ -0,0 +1,245 @@ +package main + +import ( + "fmt" + kademlia "gitlab.lrz.de/netintum/teaching/p2psec_projects_2024/DHT-6/pkg" + "strconv" + "testing" + "time" +) + +func startServer(config *kademlia.Config) *kademlia.DHT { + dht := kademlia.NewDHT(config) + + go initializeAPIServer(config, dht) + go initializeP2PServer(config, dht) + return dht +} + +func TestBasicStartAPI(t *testing.T) { + + config := &kademlia.Config{ + P2PPort: 6001, + APIPort: 7001, + P2PAddress: "127.0.0.2", + APIAddress: "127.0.0.2", + KeyFile: "certs/key.pem", + CertFile: "certs/cert.pem", + PrefixLength: 5, + RepublishInterval: 3600, + } + startServer(config) + + time.Sleep(1 * time.Second) + var key [32]byte + value := []byte("Hello World") + packet := kademlia.SerialisePutPacket(&kademlia.DHTPutPacket{ + TTL: 20, + Replication: 20, + Key: key, + Value: value, + }) + + conn, err := kademlia.ConnectToApi("127.0.0.2", 7001) + if err != nil { + t.Fatalf("Failed to connect %v", err) + } + conn.Write(packet) + conn.Close() + packet = kademlia.SerialiseGetPacket(&kademlia.DHTGetPacket{Key: key}) + + conn, err = kademlia.ConnectToApi("127.0.0.2", 7001) + if err != nil { + t.Fatalf("Failed to connect %v", err) + } + + conn.Write(packet) + response := make([]byte, 100) + n, _ := conn.Read(response) + t.Logf("Response: %v Num: %d", response, n) + +} + +/* +Testing whether or not the boostrapping process works as intended. By running the second node configured with +the first as boostrapper, they both should appear in each others routing tables. +*/ +func TestBoostrap(t *testing.T) { + config1 := &kademlia.Config{ + P2PPort: 6001, + APIPort: 7001, + P2PAddress: "127.0.0.2", + APIAddress: "127.0.0.2", + KeyFile: "certs/key.pem", + CertFile: "certs/cert.pem", + } + + config2 := &kademlia.Config{ + P2PPort: 6001, + APIPort: 7001, + P2PAddress: "127.0.0.1", + APIAddress: "127.0.0.1", + Bootstrapper: "127.0.0.2", + BootstrapperPort: 6001, + BootstrapperCert: "certs/cert.pem", + KeyFile: "certs/key1.pem", + CertFile: "certs/cert1.pem", + } + + dht := startServer(config1) + + time.Sleep(100 * time.Millisecond) + dht2 := startServer(config2) + time.Sleep(100 * time.Millisecond) + if !kademlia.ContainsNode(*dht.RoutingTable.FindClosest(dht2.SelfNode.NodeID, 10), dht2.SelfNode) { + t.Fatalf("Boostrapping failed, node1 did not add node2 to table") + } + if !kademlia.ContainsNode(*dht2.RoutingTable.FindClosest(dht.SelfNode.NodeID, 10), dht2.SelfNode) { + t.Fatalf("Boostrapping failed, node2 did not add node1 to table") + } +} + +/* +This test covers most of the DHTs functionality. One node acts as bootstrapper for the others, +two values are stored with different replication counts. It is tested whether finding these values works from +any node in the network and whether the replication works correctly. +*/ +func TestStoreReceiveReplication(t *testing.T) { + dhts := make([]kademlia.DHT, 0) + for i := 2; i < 40; i++ { + config := &kademlia.Config{ + P2PPort: 7001, + APIPort: 6001, + P2PAddress: "127.0.0." + strconv.Itoa(i), + APIAddress: "127.0.0." + strconv.Itoa(i), + Bootstrapper: "127.0.0.2", + BootstrapperPort: 7001, + BootstrapperCert: "certs/cert2.pem", + KeyFile: fmt.Sprintf("certs/key%d.pem", i), + CertFile: fmt.Sprintf("certs/cert%d.pem", i), + PrefixLength: 5, + RepublishInterval: 3600 * time.Second, + } + if i == 2 { + config.Bootstrapper = "" + } + time.Sleep(100 * time.Millisecond) + dht := startServer(config) + dhts = append(dhts, *dht) + } + time.Sleep(100 * time.Millisecond) + var key, key2 [32]byte + key2[0] = 1 + value := []byte("Hello World") + packet := kademlia.SerialisePutPacket(&kademlia.DHTPutPacket{ + TTL: 200, + Replication: 20, + Key: key, + Value: value, + }) + conn, err := kademlia.ConnectToApi("127.0.0.2", 6001) + if err != nil { + t.Fatalf("Failed to connect %v", err) + } + conn.Write(packet) + conn.Close() + + conn, err = kademlia.ConnectToApi("127.0.0.2", 6001) + packet = kademlia.SerialisePutPacket(&kademlia.DHTPutPacket{ + TTL: 200, + Replication: 5, + Key: key2, + Value: value, + }) + conn.Write(packet) + conn.Close() + time.Sleep(3 * time.Second) + + for n, client := range dhts { + v, ex := client.Get(key) + + if ex { + for i, b := range v { + if value[i] != b { + t.Fatalf("Incorrect value found by node %d", n) + } + } + t.Logf("found key one at node %d", n) + } else { + t.Fatalf("Value coud not be found by node %d", n) + } + } + c1, c2 := 0, 0 + for _, client := range dhts { + _, e := client.Storage.GetValue(key) + if e { + c1++ + } + _, e = client.Storage.GetValue(key2) + if e { + c2++ + } + } + t.Logf("Key 1 stored %d times and Key 2 stored %d times", c1, c2) +} + +func TestExpireRepublish(t *testing.T) { + configs := make([]kademlia.Config, 0) + for i := 2; i < 4; i++ { + config := &kademlia.Config{ + P2PPort: 7001, + APIPort: 6001, + P2PAddress: "127.0.0." + strconv.Itoa(i), + APIAddress: "127.0.0." + strconv.Itoa(i), + Bootstrapper: "127.0.0.2", + BootstrapperPort: 7001, + BootstrapperCert: "certs/cert2.pem", + KeyFile: fmt.Sprintf("certs/key%d.pem", i), + CertFile: fmt.Sprintf("certs/cert%d.pem", i), + PrefixLength: 5, + RepublishInterval: 10 * time.Second, + } + if i == 2 { + config.Bootstrapper = "" + } + time.Sleep(100 * time.Millisecond) + configs = append(configs, *config) + } + configs[0].Bootstrapper = "" + dht1 := startServer(&configs[0]) + + var key [32]byte + var key2 [32]byte + key2[0] = 1 + value := []byte("Hello World") + dht1.Store(key, value, 5, 10) + dht1.Store(key2, value, 1000, 10) + + dht2 := startServer(&configs[1]) + time.Sleep(100 * time.Millisecond) + if _, e := dht1.Storage.GetValue(key); !e { + t.Fatal("Key not stored!") + } + if _, e := dht1.Storage.GetValue(key2); !e { + t.Fatal("Key not stored!") + } + if _, e := dht2.Storage.GetValue(key2); e { + t.Fatal("Key should not be present yet!") + } + + // Trigger republishing + time.Sleep(10 * time.Second) + kademlia.Ping(dht1.SelfNode, dht2.SelfNode) + time.Sleep(1 * time.Second) + + if _, e := dht1.Storage.GetValue(key); e { + t.Fatal("Key should have been deleted!") + } + if _, e := dht1.Storage.GetValue(key2); !e { + t.Fatal("Key not stored!") + } + if _, e := dht2.Storage.GetValue(key2); !e { + t.Fatal("Key should have been republished!") + } + +} diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..eacc572 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,125 @@ +package main + +import ( + "errors" + "flag" + kademlia "gitlab.lrz.de/netintum/teaching/p2psec_projects_2024/DHT-6/pkg" + "gopkg.in/ini.v1" + "log" + "strconv" + "strings" + "time" +) + +// Config holds the config variables for the DHT module + +func main() { + + path := flag.String("c", "", "Path to configuration file") + flag.Parse() + + if *path == "" { + log.Fatalln("[Config Parser] Please provide a configuration file") + } + + dhtConfig := ParseConfig(*path) + + Setup(dhtConfig) +} + +func ParseConfig(configPath string) *kademlia.Config { + config, err := ini.Load(configPath) + if err != nil { + log.Fatalln("[Config Parser] Error reading file: ", err) + } + + var dhtConfig kademlia.Config + + section := config.Section("dht") + + ip, port, err := parseIP(section.Key("p2p_address").String()) + if err != nil { + // Without P2P address we cannot work + log.Fatalf("[Config Parser]: Invalid P2P address provided: %v", err) + } + dhtConfig.P2PAddress = ip + dhtConfig.P2PPort = port + + // If one wants to only provide an additional node to the network without interacting with it oneself, this might + // be fine + ip, port, err = parseIP(section.Key("api_address").String()) + if err != nil { + // Without API setup it would work in theory, one simply would add an additional node to the network + log.Printf("[Config Parser: Invalid API config: %v | API Server will be unreachable", err) + } + dhtConfig.APIAddress = ip + dhtConfig.APIPort = port + + // Invalid or empty bootstrapper field will result in no bootstrap. This is for example required for the first node + // To join/start the network + ip, port, err = parseIP(section.Key("bootstrapper").String()) + if err != nil { + log.Print("[Config Parser] Unable to parse bootstrapper, node will skip boostrap.") + } + dhtConfig.Bootstrapper = ip + dhtConfig.BootstrapperPort = port + + dhtConfig.PrefixLength, err = section.Key("prefix_length").Uint64() + if err != nil { + log.Printf("[Config Parser] Falied to parse prefix length, defaulting to 5: %v", err) + dhtConfig.PrefixLength = 5 + } else if dhtConfig.PrefixLength <= 0 || dhtConfig.PrefixLength > 64 { + log.Fatalf("[Config Parser] Prefix length must be in [1,64], but was %d. Values over 10 are not recommended.", dhtConfig.PrefixLength) + } + + dhtConfig.KeyFile = section.Key("key_file").String() + if len(dhtConfig.KeyFile) == 0 { + dhtConfig.KeyFile = "key.pem" + } + + dhtConfig.CertFile = section.Key("cert_file").String() + if len(dhtConfig.CertFile) == 0 { + dhtConfig.CertFile = "cert.pem" + } + + dhtConfig.BootstrapperCert = section.Key("bootstrapper_cert").String() + + interval, err := section.Key("republish_interval").Uint64() + if err != nil || interval == 0 { + log.Printf("[Config Parser] Falied to parse republish interval, defaulting to 1 hour, %v", err) + interval = 3600 + } + + dhtConfig.RepublishInterval = time.Duration(interval) * time.Second + + return &dhtConfig +} + +func Setup(dhtConfig *kademlia.Config) { + dht := kademlia.NewDHT(dhtConfig) + + go initializeAPIServer(dhtConfig, dht) + initializeP2PServer(dhtConfig, dht) +} + +/* +parseIP splits a string conforming to the syntax for ipv4 and ipv6 addresses into IP and port +*/ +func parseIP(line string) (string, uint64, error) { + var temp []string + if strings.HasPrefix(line, "[") { + temp = strings.Split(line[1:], "]:") + } else { + temp = strings.Split(line, ":") + } + if len(temp) != 2 { + return "", 0, errors.New("invalid IP format") + } + + portTemp, err := strconv.Atoi(temp[1]) + if err != nil { + return "", 0, err + } + port := uint64(portTemp) + return temp[0], port, nil +} diff --git a/cmd/main_test.go b/cmd/main_test.go new file mode 100644 index 0000000..f4e4d4d --- /dev/null +++ b/cmd/main_test.go @@ -0,0 +1,37 @@ +package main + +import ( + "testing" +) + +func TestParseIP(t *testing.T) { + + cases := []struct { + input string + host string + port uint64 + }{ + {"localhost:8080", "localhost", 8080}, + {"127.0.0.1:8080", "127.0.0.1", 8080}, + {"131.159.15.61:6001", "131.159.15.61", 6001}, + {"[::1]:8080", "::1", 8080}, + {"[2001:4ca0:2001:11:226:b9ff:fe7d:84ed]:6001", "2001:4ca0:2001:11:226:b9ff:fe7d:84ed", 6001}, + } + + for _, c := range cases { + var host string + var port uint64 + host, port, _ = parseIP(c.input) + if host != c.host || port != c.port { + t.Errorf("parseIP(%s): Expected %s, %d; got %s, %d", c.input, c.host, c.port, host, port) + } + } +} + +func TestParseConfig(t *testing.T) { + config := ParseConfig("testdata/test.ini") + + if config.PrefixLength != 5 { + t.Fatal("Read wrong value!") + } +} diff --git a/cmd/p2p_server.go b/cmd/p2p_server.go new file mode 100644 index 0000000..4629a79 --- /dev/null +++ b/cmd/p2p_server.go @@ -0,0 +1,175 @@ +package main + +import ( + "crypto/tls" + "encoding/binary" + kademlia "gitlab.lrz.de/netintum/teaching/p2psec_projects_2024/DHT-6/pkg" + "io" + "log" + "net" + "strconv" + "time" +) + +type P2PServer struct { + dhtInstance *kademlia.DHT +} + +func initializeP2PServer(config *kademlia.Config, dht *kademlia.DHT) { + + // Get the certificate ready to accept connections via TLS + cert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile) + if err != nil { + log.Fatal(err) + } + + tlsConfig := &tls.Config{Certificates: []tls.Certificate{cert}} + listener, err := tls.Listen("tcp", config.P2PAddress+":"+strconv.Itoa(int(config.P2PPort)), tlsConfig) + + if err != nil { + log.Fatalln("[P2P Server] Error starting server: ", err) + } + defer listener.Close() + server := &P2PServer{dhtInstance: dht} + + lastUpdated := time.Now() + + for { + connection, err := listener.Accept() + + // Expire and republish entries periodically + if time.Since(lastUpdated) > config.RepublishInterval { + go server.dhtInstance.Housekeeping(time.Now().Sub(lastUpdated)) + lastUpdated = time.Now() + } + + if err != nil { + log.Println("[P2P Server] Error accepting connection: ", err) + continue + } + + // Handle in separate thread and close when done + go func() { + server.handleP2PMessage(connection) + connection.Close() + }() + } +} + +func (p *P2PServer) handleP2PMessage(connection net.Conn) { + // Add a lenient timeout so connections cannot be held open forever. + err := connection.SetDeadline(time.Now().Add(10 * time.Second)) + if err != nil { + log.Println("[P2P Server] Error setting deadline: ", err) + return + } + + buf := make([]byte, 4) + _, err = io.ReadFull(connection, buf) + if err != nil { + log.Println("[P2P Server] Error reading message: ", err) + return + } + size := binary.BigEndian.Uint16(buf[:2]) + messageType := binary.BigEndian.Uint16(buf[2:]) + + // Subtract 4 bytes used for size and message type + buf2 := make([]byte, size-4) + _, err = io.ReadFull(connection, buf2) + if err != nil { + log.Println("[P2P Server] Error reading message: ", err) + return + } + + // combine header and body to get the full message again + data := append(buf, buf2...) + + switch messageType { + case kademlia.STORE: + packet, err := kademlia.ParseStorePacket(data) + if err != nil { + log.Println("[P2P Server] Error parsing store packet: ", err) + return + } + p.handleStore(packet) + case kademlia.PING: + packet, err := kademlia.ParsePingPacket(data) + if err != nil { + log.Println("[P2P Server] Error parsing ping packet: ", err) + return + } + p.handlePing(packet, connection) + case kademlia.FINDNODE: + packet, err := kademlia.ParseFindNodePacket(data) + if err != nil { + log.Println("[P2P Server] Error parsing find node packet: ", err) + return + } + p.handleFindNode(packet, connection) + case kademlia.FINDVALUE: + packet, err := kademlia.ParseFindValuePacket(data) + if err != nil { + log.Println("[P2P Server] Error parsing find value packet: ", err) + return + } + p.handleFindValue(packet, connection) + default: + log.Println("[P2P Server] Invalid message type") + } +} + +func (p *P2PServer) handleStore(packet *kademlia.P2PStorePacket) { + // Always add the sender to the routing table + senderNode := kademlia.NodeFromIpPort(packet.SourceIP, uint64(packet.SourcePort), packet.Certificate) + go p.dhtInstance.RoutingTable.Insert(senderNode) + + nodes := p.dhtInstance.RoutingTable.FindClosest(packet.Key, int(packet.RepCount)) + + if len(*nodes) < int(packet.RepCount) || kademlia.ContainsID((*nodes)[:packet.RepCount], p.dhtInstance.SelfNode.NodeID) { + log.Printf("Storing value...") + value := packet.Value + p.dhtInstance.Storage.Put(packet.Key, value, packet.TTL, packet.RepCount) + } else { + log.Printf("Stored at enough other nodes, no need to.") + } +} + +func (p *P2PServer) handlePing(packet *kademlia.P2PPingPacket, connection net.Conn) { + // Always add the sender to the routing table + senderNode := kademlia.NodeFromIpPort(packet.SourceIP, uint64(packet.SourcePort), packet.Certificate) + go p.dhtInstance.RoutingTable.Insert(senderNode) + + reply := kademlia.SerialisePongPacket(&kademlia.P2PPongPacket{}) + _, _ = connection.Write(reply) +} + +func (p *P2PServer) handleFindNode(packet *kademlia.P2PFindNodePacket, connection net.Conn) { + // Always add the sender to the routing table + senderNode := kademlia.NodeFromIpPort(packet.SourceIP, uint64(packet.SourcePort), packet.Certificate) + go p.dhtInstance.RoutingTable.Insert(senderNode) + + nodes := p.dhtInstance.RoutingTable.FindClosest(packet.Key, int(packet.Amount)) + + reply := kademlia.SerialiseFoundNodePacket(&kademlia.P2PFoundNodePacket{Nodes: *nodes}) + + _, _ = connection.Write(reply) +} + +func (p *P2PServer) handleFindValue(packet *kademlia.P2PFindValuePacket, connection net.Conn) { + // Always add the sender to the routing table + senderNode := kademlia.NodeFromIpPort(packet.SourceIP, uint64(packet.SourcePort), packet.Certificate) + go p.dhtInstance.RoutingTable.Insert(senderNode) + + val, exists := p.dhtInstance.Storage.GetValue(packet.Key) + + if exists { + reply := kademlia.P2PFoundValuePacket{ + Value: val, + } + _, _ = connection.Write(kademlia.SerialiseFoundValuePacket(&reply)) + } else { + reply := kademlia.SerialiseNotFoundValuePacket(&kademlia.P2PNotFoundPValuePacket{}) + _, _ = connection.Write(reply) + } + +} diff --git a/cmd/p2p_server_test.go b/cmd/p2p_server_test.go new file mode 100644 index 0000000..666f014 --- /dev/null +++ b/cmd/p2p_server_test.go @@ -0,0 +1,47 @@ +package main + +import ( + kademlia "gitlab.lrz.de/netintum/teaching/p2psec_projects_2024/DHT-6/pkg" + "io" + "testing" + "time" +) + +func TestPingP2PServer(t *testing.T) { + ip := "127.0.0.1" + port := uint64(8080) + + config := &kademlia.Config{ + P2PAddress: ip, + P2PPort: port, + KeyFile: "./certs/p2pkey.pem", + CertFile: "./certs/p2pcert.pem", + } + dht := kademlia.NewDHT(config) + go initializeP2PServer(config, dht) + + // Wait for server to start + time.Sleep(1 * time.Second) + + conn, err := kademlia.ConnectToNode(dht.SelfNode) + if err != nil { + t.Fatalf("Testing P2PServer: Failed to connect to server: %s", err) + } + defer conn.Close() + ping := kademlia.SerialisePingPacket(&kademlia.P2PPingPacket{ + SourceIP: dht.SelfNode.IPAddress, + SourcePort: uint16(dht.SelfNode.Port), + Certificate: dht.SelfNode.Certificate, + }) + + deadline := time.Now().Add(1 * time.Second) + conn.SetDeadline(deadline) + conn.Write(ping) + reply := make([]byte, 4) + io.ReadFull(conn, reply) + + if !kademlia.PacketsEqual(reply, kademlia.SerialisePongPacket(&kademlia.P2PPongPacket{})) { + t.Fatalf("Testing P2PServer: Failed, invalid Ping response") + } + +} diff --git a/cmd/testdata/test.ini b/cmd/testdata/test.ini new file mode 100644 index 0000000..e7d5fbb --- /dev/null +++ b/cmd/testdata/test.ini @@ -0,0 +1,15 @@ +hostkey = /hostkey.pem + +[gossip] +cache_size = 50 +degree = 30 +bootstrapper = p2psec.net.in.tum.de:6001 +p2p_address = 131.159.15.61:6001 +api_address = 131.159.15.61:7001 + +[dht] +bootstrapper = p2psec.net.in.tum.de:6001 +p2p_address = 131.159.15.61:6001 +api_address = 131.159.15.61:7001 +prefix_length = 5 +republish_interval = 3600 diff --git a/cmd/tls_test.go b/cmd/tls_test.go new file mode 100644 index 0000000..a485b85 --- /dev/null +++ b/cmd/tls_test.go @@ -0,0 +1,66 @@ +package main + +import ( + "crypto/tls" + "crypto/x509" + "gitlab.lrz.de/netintum/teaching/p2psec_projects_2024/DHT-6/pkg" + "io" + "log" + "os" + "testing" + "time" +) + +func TestBasicTLSConnection(t *testing.T) { + kademlia.GenerateCert("127.0.0.1", "cert.pem", "key.pem") + + go StartTLSServer(t) + + cert, err := os.ReadFile("cert.pem") + if err != nil { + t.Fatal(err) + } + + certPool := x509.NewCertPool() + if ok := certPool.AppendCertsFromPEM(cert); !ok { + t.Fatal("Unable to parse cert from file") + } + config := &tls.Config{RootCAs: certPool} + time.Sleep(time.Second) + conn, err := tls.Dial("tcp", "127.0.0.1:8001", config) + + _, err = io.WriteString(conn, "Hello World") + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + +} + +func StartTLSServer(t *testing.T) { + cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem") + if err != nil { + t.Fatal(err) + } + + config := &tls.Config{Certificates: []tls.Certificate{cert}} + + l, err := tls.Listen("tcp", ":8001", config) + if err != nil { + t.Fatal(err) + } + defer l.Close() + + for { + conn, err := l.Accept() + if err != nil { + t.Fatal(err) + } + log.Print("Accepted connection") + + buf := make([]byte, 20) + + conn.Read(buf) + t.Logf("%s", buf) + } +} diff --git a/commandlineDHT/commandlineDHT b/commandlineDHT/commandlineDHT new file mode 100644 index 0000000..f593897 Binary files /dev/null and b/commandlineDHT/commandlineDHT differ diff --git a/commandlineDHT/main.go b/commandlineDHT/main.go new file mode 100644 index 0000000..b964fde --- /dev/null +++ b/commandlineDHT/main.go @@ -0,0 +1,176 @@ +package main + +import ( + "encoding/binary" + "errors" + "flag" + kademlia "gitlab.lrz.de/netintum/teaching/p2psec_projects_2024/DHT-6/pkg" + "io" + "log" + "net" + "os" + "strconv" + "strings" +) + +// Config holds the config variables for the DHT module + +func main() { + + mode := flag.String("m", "", "Whether to send a put or get packet") + ttl := flag.Int("ttl", 3600, "TTL for sending a put or get packet") + replication := flag.Int("rep", 20, "Replication factor for put packet") + address := flag.String("ip", "", "IP and port of API Server in for 127.0.0.1:1234") + keyStr := flag.String("key", "", "Key to be stored or retrieved") + valueStr := flag.String("value", "", "Value to pe put") + certPath := flag.String("cert", "", "Path to certificate") + flag.Parse() + + var key [32]byte + copy(key[:], *keyStr) + value := []byte(*valueStr) + + switch *mode { + case "put": + conn, err := net.Dial("tcp", *address) + if err != nil { + log.Fatal(err) + } + packet := kademlia.DHTPutPacket{ + TTL: uint16(*ttl), + Replication: byte(*replication), + Key: key, + Value: value, + } + conn.Write(kademlia.SerialisePutPacket(&packet)) + conn.Close() + case "get": + conn, err := net.Dial("tcp", *address) + if err != nil { + log.Fatal(err) + } + packet := kademlia.DHTGetPacket{Key: key} + conn.Write(kademlia.SerialiseGetPacket(&packet)) + + header := make([]byte, 4) + _, err = io.ReadFull(conn, header) + if err != nil { + log.Fatal(err) + } + + size := binary.BigEndian.Uint16(header[0:2]) + messageType := binary.BigEndian.Uint16(header[2:4]) + body := make([]byte, size-4) + io.ReadFull(conn, body) + response := append(header, body...) + switch messageType { + case kademlia.FAILURE: + log.Print("No value found") + conn.Close() + + case kademlia.SUCCESS: + + responsePack, err := kademlia.ParseSucceessPacket(response) + if err != nil { + log.Fatal(err) + } + log.Printf("Found value %s", responsePack.Value) + conn.Close() + + default: + flag.PrintDefaults() + } + + case "ping": + cert, err := os.ReadFile(*certPath) + if err != nil { + log.Fatal(err) + } + + ip, port, err := parseIP(*address) + conn, err := kademlia.ConnectToNode(kademlia.P2PNode{ + NodeID: [32]byte{}, + IPAddress: net.ParseIP(ip), + Port: port, + Certificate: cert, + }) + if err != nil { + log.Fatal(err) + } + conn.Write(kademlia.SerialisePingPacket(&kademlia.P2PPingPacket{ + SourceIP: net.ParseIP(ip), + SourcePort: uint16(port), + Certificate: cert, + })) + + reply := make([]byte, 4) + _, err = io.ReadFull(conn, reply) + log.Print("Got Reply") + + case "find": + cert, err := os.ReadFile(*certPath) + if err != nil { + log.Fatal(err) + } + + ip, port, err := parseIP(*address) + conn, err := kademlia.ConnectToNode(kademlia.P2PNode{ + NodeID: [32]byte{}, + IPAddress: net.ParseIP(ip), + Port: port, + Certificate: cert, + }) + if err != nil { + log.Fatal(err) + } + conn.Write(kademlia.SerialiseFindValuePacket(&kademlia.P2PFindValuePacket{ + SourceIP: net.ParseIP(ip), + SourcePort: uint16(port), + Certificate: cert, + Key: key, + })) + + responseHeader := make([]byte, 4) + _, err = io.ReadFull(conn, responseHeader) + if err != nil { + log.Print(err) + return + } + size := binary.BigEndian.Uint16(responseHeader[0:2]) + responseType := binary.BigEndian.Uint16(responseHeader[2:4]) + + switch responseType { + case kademlia.FOUNDVALUE: + responseBody := make([]byte, size-4) + _, err = io.ReadFull(conn, responseBody) + if err != nil { + log.Print(err) + return + } + reply := kademlia.ParseFoundValuePacket(append(responseHeader, responseBody...)) + log.Printf("Found value %s", string(reply.Value)) + default: + log.Printf("Value not present") + } + } + +} + +func parseIP(line string) (string, uint64, error) { + var temp []string + if strings.HasPrefix(line, "[") { + temp = strings.Split(line[1:], "]:") + } else { + temp = strings.Split(line, ":") + } + if len(temp) != 2 { + return "", 0, errors.New("invalid IP format") + } + + portTemp, err := strconv.Atoi(temp[1]) + if err != nil { + return "", 0, err + } + port := uint64(portTemp) + return temp[0], port, nil +} diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..bd6563e --- /dev/null +++ b/compose.yaml @@ -0,0 +1,35 @@ +services: + boot_container: + image: dht:latest + container_name: bootstrapper + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.10 + ports: + - "6001:6001" + - "7001:7001" + command: ["./docker-gs-ping", "-c", "/shared/config_boot.ini"] + + container1: + image: dht:latest + container_name: container1 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.101 + ports: + - "6002:6001" + - "7002:7001" + command: ["./docker-gs-ping", "-c", "/shared/config1.ini"] + +networks: + my_network: + name: dhtnet + driver: bridge + ipam: + config: + - subnet: 10.0.0.0/16 + diff --git a/composeLarge.yaml b/composeLarge.yaml new file mode 100644 index 0000000..4360f99 --- /dev/null +++ b/composeLarge.yaml @@ -0,0 +1,543 @@ +services: + boot_container: + image: dht:latest + container_name: bootstrapper + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.10 + ports: + - "6001:6001" + - "7001:7001" + command: ["./docker-gs-ping", "-c", "/shared/config_boot.ini"] + + container1: + image: dht:latest + container_name: container1 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.101 + ports: + - "6002:6001" + - "7002:7001" + command: ["./docker-gs-ping", "-c", "/shared/config1.ini"] + + container2: + image: dht:latest + container_name: container2 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.102 + ports: + - "6003:6001" + - "7003:7001" + command: ["./docker-gs-ping", "-c", "/shared/config2.ini"] + + container3: + image: dht:latest + container_name: container3 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.103 + ports: + - "6004:6001" + - "7004:7001" + command: ["./docker-gs-ping", "-c", "/shared/config3.ini"] + + container4: + image: dht:latest + container_name: container4 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.104 + ports: + - "6005:6001" + - "7005:7001" + command: ["./docker-gs-ping", "-c", "/shared/config4.ini"] + + container5: + image: dht:latest + container_name: container5 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.105 + ports: + - "6006:6001" + - "7006:7001" + command: ["./docker-gs-ping", "-c", "/shared/config5.ini"] + + container6: + image: dht:latest + container_name: container6 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.106 + ports: + - "6007:6001" + - "7007:7001" + command: ["./docker-gs-ping", "-c", "/shared/config6.ini"] + + container7: + image: dht:latest + container_name: container7 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.107 + ports: + - "6008:6001" + - "7008:7001" + command: ["./docker-gs-ping", "-c", "/shared/config7.ini"] + + container8: + image: dht:latest + container_name: container8 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.108 + ports: + - "6009:6001" + - "7009:7001" + command: ["./docker-gs-ping", "-c", "/shared/config8.ini"] + + container9: + image: dht:latest + container_name: container9 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.109 + ports: + - "6010:6001" + - "7010:7001" + command: ["./docker-gs-ping", "-c", "/shared/config9.ini"] + + container10: + image: dht:latest + container_name: container10 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.110 + ports: + - "6011:6001" + - "7011:7001" + command: ["./docker-gs-ping", "-c", "/shared/config10.ini"] + + container11: + image: dht:latest + container_name: container11 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.111 + ports: + - "6012:6001" + - "7012:7001" + command: ["./docker-gs-ping", "-c", "/shared/config11.ini"] + + container12: + image: dht:latest + container_name: container12 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.112 + ports: + - "6013:6001" + - "7013:7001" + command: ["./docker-gs-ping", "-c", "/shared/config12.ini"] + + container13: + image: dht:latest + container_name: container13 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.113 + ports: + - "6014:6001" + - "7014:7001" + command: ["./docker-gs-ping", "-c", "/shared/config13.ini"] + + container14: + image: dht:latest + container_name: container14 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.114 + ports: + - "6015:6001" + - "7015:7001" + command: ["./docker-gs-ping", "-c", "/shared/config14.ini"] + + container15: + image: dht:latest + container_name: container15 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.115 + ports: + - "6016:6001" + - "7016:7001" + command: ["./docker-gs-ping", "-c", "/shared/config15.ini"] + + container16: + image: dht:latest + container_name: container16 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.116 + ports: + - "6017:6001" + - "7017:7001" + command: ["./docker-gs-ping", "-c", "/shared/config16.ini"] + + container17: + image: dht:latest + container_name: container17 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.117 + ports: + - "6018:6001" + - "7018:7001" + command: ["./docker-gs-ping", "-c", "/shared/config17.ini"] + + container18: + image: dht:latest + container_name: container18 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.118 + ports: + - "6019:6001" + - "7019:7001" + command: ["./docker-gs-ping", "-c", "/shared/config18.ini"] + + container19: + image: dht:latest + container_name: container19 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.119 + ports: + - "6020:6001" + - "7020:7001" + command: ["./docker-gs-ping", "-c", "/shared/config19.ini"] + + container20: + image: dht:latest + container_name: container20 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.120 + ports: + - "6021:6001" + - "7021:7001" + command: ["./docker-gs-ping", "-c", "/shared/config20.ini"] + + container21: + image: dht:latest + container_name: container21 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.121 + ports: + - "6022:6001" + - "7022:7001" + command: ["./docker-gs-ping", "-c", "/shared/config21.ini"] + + container22: + image: dht:latest + container_name: container22 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.122 + ports: + - "6023:6001" + - "7023:7001" + command: ["./docker-gs-ping", "-c", "/shared/config22.ini"] + + container23: + image: dht:latest + container_name: container23 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.123 + ports: + - "6024:6001" + - "7024:7001" + command: ["./docker-gs-ping", "-c", "/shared/config23.ini"] + + container24: + image: dht:latest + container_name: container24 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.124 + ports: + - "6025:6001" + - "7025:7001" + command: ["./docker-gs-ping", "-c", "/shared/config24.ini"] + + container25: + image: dht:latest + container_name: container25 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.125 + ports: + - "6026:6001" + - "7026:7001" + command: ["./docker-gs-ping", "-c", "/shared/config25.ini"] + + container26: + image: dht:latest + container_name: container26 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.126 + ports: + - "6027:6001" + - "7027:7001" + command: ["./docker-gs-ping", "-c", "/shared/config26.ini"] + + container27: + image: dht:latest + container_name: container27 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.127 + ports: + - "6028:6001" + - "7028:7001" + command: ["./docker-gs-ping", "-c", "/shared/config27.ini"] + + container28: + image: dht:latest + container_name: container28 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.128 + ports: + - "6029:6001" + - "7029:7001" + command: ["./docker-gs-ping", "-c", "/shared/config28.ini"] + + container29: + image: dht:latest + container_name: container29 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.129 + ports: + - "6030:6001" + - "7030:7001" + command: ["./docker-gs-ping", "-c", "/shared/config29.ini"] + + container30: + image: dht:latest + container_name: container30 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.130 + ports: + - "6031:6001" + - "7031:7001" + command: ["./docker-gs-ping", "-c", "/shared/config30.ini"] + + container31: + image: dht:latest + container_name: container31 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.131 + ports: + - "6032:6001" + - "7032:7001" + command: ["./docker-gs-ping", "-c", "/shared/config31.ini"] + + container32: + image: dht:latest + container_name: container32 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.132 + ports: + - "6033:6001" + - "7033:7001" + command: ["./docker-gs-ping", "-c", "/shared/config32.ini"] + + container33: + image: dht:latest + container_name: container33 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.133 + ports: + - "6034:6001" + - "7034:7001" + command: ["./docker-gs-ping", "-c", "/shared/config33.ini"] + + container34: + image: dht:latest + container_name: container34 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.134 + ports: + - "6035:6001" + - "7035:7001" + command: ["./docker-gs-ping", "-c", "/shared/config34.ini"] + + container35: + image: dht:latest + container_name: container35 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.135 + ports: + - "6036:6001" + - "7036:7001" + command: ["./docker-gs-ping", "-c", "/shared/config35.ini"] + + container36: + image: dht:latest + container_name: container36 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.136 + ports: + - "6037:6001" + - "7037:7001" + command: ["./docker-gs-ping", "-c", "/shared/config36.ini"] + + container37: + image: dht:latest + container_name: container37 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.137 + ports: + - "6038:6001" + - "7038:7001" + command: ["./docker-gs-ping", "-c", "/shared/config37.ini"] + + container38: + image: dht:latest + container_name: container38 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.138 + ports: + - "6039:6001" + - "7039:7001" + command: ["./docker-gs-ping", "-c", "/shared/config38.ini"] + + container39: + image: dht:latest + container_name: container39 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.139 + ports: + - "6040:6001" + - "7040:7001" + command: ["./docker-gs-ping", "-c", "/shared/config39.ini"] + + container40: + image: dht:latest + container_name: container40 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.140 + ports: + - "6041:6001" + - "7041:7001" + command: ["./docker-gs-ping", "-c", "/shared/config40.ini"] + + +networks: + my_network: + name: dhtnet + driver: bridge + ipam: + config: + - subnet: 10.0.0.0/16 + diff --git a/composeRep.yaml b/composeRep.yaml new file mode 100644 index 0000000..0541f0d --- /dev/null +++ b/composeRep.yaml @@ -0,0 +1,35 @@ +services: + boot_container: + image: dht:latest + container_name: bootstrapper + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.10 + ports: + - "6001:6001" + - "7001:7001" + command: ["./docker-gs-ping", "-c", "/shared/config_boot_short.ini"] + + container1: + image: dht:latest + container_name: container1 + volumes: + - ./shared_data:/shared + networks: + my_network: + ipv4_address: 10.0.0.101 + ports: + - "6002:6001" + - "7002:7001" + command: ["./docker-gs-ping", "-c", "/shared/config1.ini"] + +networks: + my_network: + name: dhtnet + driver: bridge + ipam: + config: + - subnet: 10.0.0.0/16 + diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..68b73fa --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module gitlab.lrz.de/netintum/teaching/p2psec_projects_2024/DHT-6 + +go 1.22.2 + +require ( + github.com/stretchr/testify v1.9.0 // indirect + gopkg.in/ini.v1 v1.67.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d44a430 --- /dev/null +++ b/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/dht_test_utilts.go b/pkg/dht_test_utilts.go new file mode 100644 index 0000000..e49df0f --- /dev/null +++ b/pkg/dht_test_utilts.go @@ -0,0 +1,22 @@ +package kademlia + +import ( + "net" + "strconv" +) + +func PacketsEqual(a, b []byte) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +func ConnectToApi(IP string, port int) (net.Conn, error) { + return net.Dial("tcp", IP+":"+strconv.Itoa(port)) +} diff --git a/pkg/dht_utils.go b/pkg/dht_utils.go new file mode 100644 index 0000000..8e05c16 --- /dev/null +++ b/pkg/dht_utils.go @@ -0,0 +1,162 @@ +package kademlia + +import ( + "crypto/sha256" + "crypto/tls" + "crypto/x509" + "encoding/binary" + "errors" + "net" + "slices" + "sort" + "strconv" + "time" +) + +func GetPeerIdFromAddr(addr net.Addr) [32]byte { + addrTCP := addr.(*net.TCPAddr) + ipPortConcat := binary.BigEndian.AppendUint16(addrTCP.IP.To16(), uint16(addrTCP.Port)) + peerId := sha256.Sum256(ipPortConcat) + return peerId +} + +func GetPeerId(ip net.IP, port uint16) [32]byte { + val := binary.BigEndian.AppendUint16(ip.To16(), port) + res := sha256.Sum256(val) + return res +} + +func CreateNode(ip string, port uint64, cert []byte) P2PNode { + ipA := net.ParseIP(ip) + return P2PNode{ + NodeID: GetPeerId(ipA, uint16(port)), + IPAddress: ipA, + Port: port, + Certificate: cert, + } +} + +func NodeFromIpPort(ip net.IP, port uint64, cert []byte) P2PNode { + return P2PNode{ + NodeID: GetPeerId(ip, uint16(port)), + IPAddress: ip, + Port: port, + Certificate: cert, + } +} + +func ContainsID(nodes []P2PNode, target [32]byte) bool { + for _, node := range nodes { + if node.NodeID == target { + return true + } + } + return false +} + +func SortNodesByDistanceToKey(key [32]byte, nodes []P2PNode) []P2PNode { + sort.SliceStable(nodes, func(i, j int) bool { + return IsCloserToKey(nodes[i].NodeID, nodes[j].NodeID, key) + }) + return nodes +} + +func RemoveDuplicates(nodes []P2PNode) []P2PNode { + return slices.CompactFunc(nodes, func(node P2PNode, node2 P2PNode) bool { + return node.NodeID == node2.NodeID + }) +} + +func NodeEquals(node P2PNode, node2 P2PNode) bool { + return node.NodeID == node2.NodeID +} + +// IsCloserToKey returns whether the first argument is closer to the key than the second one when using the XOR metric +// everything is closer to a key than nil. +func IsCloserToKey(a, b, key [32]byte) bool { + for i, _ := range key { + if a[i] == b[i] { + continue + } else { + return a[i]^key[i] < b[i]^key[i] + } + } + return false +} + +/* +ConnectToNode attempts to establish a connection to the provided node and puts a 30 second timeout on it. +*/ +func ConnectToNode(node P2PNode) (net.Conn, error) { + certPool := x509.NewCertPool() + if ok := certPool.AppendCertsFromPEM(node.Certificate); !ok { + return nil, errors.New("[Utils]: Unable to parse cert") + } + tlsConfig := &tls.Config{RootCAs: certPool} + + addr := node.IPAddress.String() + ":" + strconv.Itoa(int(node.Port)) + + conn, err := tls.Dial("tcp", addr, tlsConfig) + + if err == nil { + // Set lenient timeout in case of unresponsive other peer + _ = conn.SetDeadline(time.Now().Add(time.Second * 30)) + } + return conn, err +} + +func ContainsNode(bucket []P2PNode, node P2PNode) bool { + for _, n := range bucket { + if n.NodeID == node.NodeID { + return true + } + } + return false +} + +/* +getBitsAt retrieves a specified number of bits from a [32]byte. Beginning with the bit at index startBit, it +retrieves numBits bits. numBits cannot be greater than 64 and has to stay within bounds. +*/ +func getBitsAt(id [32]byte, startBit uint64, numBits uint64) uint64 { + if numBits <= 0 || startBit < 0 || startBit+numBits > 256 || numBits > 64 { + return uint64(0) + } + + var result uint64 + var bitPos uint64 + + for i := uint64(0); i < numBits; i++ { + byteIndex := (startBit + i) / 8 + bitIndex := (startBit + i) % 8 + + bit := (id[byteIndex] >> (7 - bitIndex)) & 1 + result = (result << 1) | uint64(bit) + + bitPos++ + } + + return result +} + +/* +setBitAt sets the bit of a [32]byte at the specified index to the specified value +*/ +func setBitAt(id [32]byte, index int, value bool) [32]byte { + if index >= 256 { + return id + } + + byteIndex := index / 8 + bitIndex := index % 8 + + if value { + // Set the bit to 1 + id[byteIndex] |= 1 << uint(7-bitIndex) + } else { + // Set the bit to 0 + id[byteIndex] &^= 1 << uint(7-bitIndex) + } + + return id +} diff --git a/pkg/kademlia.go b/pkg/kademlia.go new file mode 100644 index 0000000..931e81b --- /dev/null +++ b/pkg/kademlia.go @@ -0,0 +1,483 @@ +package kademlia + +import ( + "context" + "encoding/binary" + "errors" + "io" + "log" + "net" + "os" + "sync" + "time" +) + +const ( + ProofOfWorkBits = 2 + Parallelisation = 3 +) + +type Config struct { + Bootstrapper string // Bootstrapper IP + BootstrapperPort uint64 + P2PAddress string + P2PPort uint64 + APIAddress string + APIPort uint64 + RepublishInterval time.Duration + PrefixLength uint64 // Bits matched per bucket in routing table + CertFile string // Where to store the certificate + KeyFile string // Where to store the key + BootstrapperCert string // Path to the already present bootstrapper's certificate +} + +type DHT struct { + RoutingTable *RoutingTable + SelfNode P2PNode + Storage DHTStorage + config *Config +} + +/* +NewDHT Creates a new DHT object based on the configuration passed. If the bootstrap field of the config is left as empty +string, bootstrapping will be skipped. +*/ +func NewDHT(config *Config) *DHT { + nodeIP := net.ParseIP(config.P2PAddress) + GenerateCert(config.P2PAddress, config.CertFile, config.KeyFile) + + cert, err := os.ReadFile(config.CertFile) + if err != nil { + log.Fatal(err) + } + + selfNode := P2PNode{ + NodeID: GetPeerId(nodeIP, uint16(config.P2PPort)), + IPAddress: nodeIP, + Port: config.P2PPort, + Certificate: cert, + } + dht := &DHT{ + Storage: DHTStorage{ + //storageMap: sync.Map{}, + storageMap: map[[32]byte]ValueType{}, + }, + SelfNode: selfNode, + RoutingTable: NewRoutingTable(selfNode, config.PrefixLength), + config: config, + } + + dht.bootstrap() + return dht +} + +type P2PNode struct { + NodeID [32]byte + IPAddress net.IP + Port uint64 + Certificate []byte +} + +/* +bootstrap populates the routing table upon creation of the dht. +*/ +func (d *DHT) bootstrap() { + if len(d.config.Bootstrapper) == 0 { + return + } + + bootstrapperCert, err := os.ReadFile(d.config.BootstrapperCert) + if err != nil { + log.Printf("[Boostrap] Failed to load boostrapper certificate: %v Proceeding without boostrap!", err) + return + } + + bootstrapNode := CreateNode(d.config.Bootstrapper, d.config.BootstrapperPort, bootstrapperCert) + + d.RoutingTable.Insert(bootstrapNode) + + //By flipping bits of our ID, we can query for nodes at specific distances to us, filling up the respective + //Buckets of the routing table. + for i := 0; i < 256; i++ { + toQuery := setBitAt(d.SelfNode.NodeID, i, getBitsAt(d.SelfNode.NodeID, uint64(i), 1) == 0) //Flip the bit + targets := *d.RoutingTable.FindClosest(toQuery, bucketSize) + if len(targets) == 0 { + log.Printf("This should not have happened") + } + targets = SortNodesByDistanceToKey(d.SelfNode.NodeID, targets) + + // Once we find ourself, we will do so in further steps as well, so no need to go on. + if NodeEquals(targets[0], d.SelfNode) { + break + } + res, err := d.FindNodesAtNode(toQuery, targets[0], bucketSize) + if err != nil { + log.Printf("[DHT]: Error during bootstrap of node %v: %v", d.SelfNode.IPAddress, err) + continue + } + for _, node := range res { + d.RoutingTable.Insert(node) + } + } +} + +/* +Store stores an entry at the specified key in the network. For this it first finds the nodes responsible for the key and +then sends STORE requests to them. +*/ +func (d *DHT) Store(key [32]byte, value []byte, ttl uint16, repCount uint16) { + // Find the closest nodes to the key + targets, err := d.FindNode(key, repCount) + if err != nil { + return + } + + // If we are one of the closest nodes, we store it ourselves + if ContainsNode(targets, d.SelfNode) { + d.Storage.Put(key, value, ttl, repCount) + } + + //Do proof of work only once + pow := ProofWorkForStore(key, value) + + // Replicate the value to the closest nodes + for i := 0; i < int(repCount) && i < len(targets); i++ { + if targets[i].NodeID != d.SelfNode.NodeID { + // Send the value to the target + // log.Printf("Storing at %v, entry %d", targets[i].IPAddress, i) + go d.sendStore(key, value, repCount, ttl, targets[i], pow) + } + } +} + +func (d *DHT) Get(key [32]byte) ([]byte, bool) { + return d.FindValue(key) +} + +/* +Ping sends a PING message to the specified node and returns if it is still alive +*/ +func Ping(node P2PNode, selfNode P2PNode) bool { + connection, err := ConnectToNode(node) + if err != nil { + return false + } + defer connection.Close() + + packet := P2PPingPacket{ + SourceIP: selfNode.IPAddress, + SourcePort: uint16(selfNode.Port), + Certificate: selfNode.Certificate, + } + + _, err = connection.Write(SerialisePingPacket(&packet)) + if err != nil { + return false + } + + pingTimeout := 1 * time.Second + err = connection.SetDeadline(time.Now().Add(pingTimeout)) + if err != nil { + return false + } + + receiveBuffer := make([]byte, 4) + _, err = io.ReadFull(connection, receiveBuffer) + if err != nil { + return false + } + + recvSize := binary.BigEndian.Uint16(receiveBuffer[:2]) + recvType := binary.BigEndian.Uint16(receiveBuffer[2:]) + if recvType != PONG || recvSize != 4 { + return false + } + return true +} + +/* +FindValue queries the DHT network for the value related to the provided key. It does so in a highly parallel fashion, +querying the network for the closest nodes to the key first and then querying those for the key. +It returns the key, if found +*/ +func (d *DHT) FindValue(key [32]byte) ([]byte, bool) { + v, e := d.Storage.GetValue(key) + if e { + return v, true + } + + toSearch, err := d.FindNode(key, bucketSize) + if toSearch == nil || err != nil { + return nil, false + } + + resultChannel := make(chan []byte) + done := make(chan struct{}) + + var wg sync.WaitGroup + for _, node := range toSearch { + wg.Add(1) + go func() { + d.findValueAtNode(resultChannel, key, node) + wg.Done() + }() + } + go func() { + wg.Wait() + close(done) + }() + select { + case result := <-resultChannel: + return result, true + case <-done: + return nil, false + } +} + +/* +findValueAtNode connects to a node and sends a FINDVALUE request for the specified key. In case it is successful it +reports it result to the results channel. If not, it exits with no reply, allowing for parallel queries. +*/ +func (d *DHT) findValueAtNode(results chan<- []byte, key [32]byte, target P2PNode) { + connection, err := ConnectToNode(target) + if err != nil { + return + } + + defer connection.Close() + packet := P2PFindValuePacket{ + SourceIP: d.SelfNode.IPAddress, + SourcePort: uint16(d.SelfNode.Port), + Certificate: d.SelfNode.Certificate, + Key: key, + } + _, err = connection.Write(SerialiseFindValuePacket(&packet)) + if err != nil { + return + } + + responseHeader := make([]byte, 4) + _, err = io.ReadFull(connection, responseHeader) + if err != nil { + log.Print(err) + return + } + size := binary.BigEndian.Uint16(responseHeader[0:2]) + responseType := binary.BigEndian.Uint16(responseHeader[2:4]) + + switch responseType { + case FOUNDVALUE: + responseBody := make([]byte, size-4) + _, err = io.ReadFull(connection, responseBody) + if err != nil { + log.Print(err) + return + } + reply := ParseFoundValuePacket(append(responseHeader, responseBody...)) + results <- reply.Value + return + default: + return + } +} + +/* +sendStore connects to the target node and sends a STORE packet matching the parameters provided. +*/ +func (d *DHT) sendStore(key [32]byte, value []byte, repCount uint16, ttl uint16, target P2PNode, pow [32]byte) { + connection, err := ConnectToNode(target) + if err != nil { + return + } + + defer connection.Close() + + packet := P2PStorePacket{ + SourceIP: d.SelfNode.IPAddress, + SourcePort: uint16(d.SelfNode.Port), + Certificate: d.SelfNode.Certificate, + TTL: ttl, + RepCount: repCount, + PoW: pow, + Key: key, + Value: value, + } + _, err = connection.Write(SerialiseStorePacket(&packet)) + + if err != nil { + print("Error sending store message %v", err) + return + } +} + +// FindNode queries the DHT until it found the closest node to the given key. It then returns the reply +// of that node to a FINDNODE request for that key +// The search is performed on several nodes in parallel, depending on the Parallelisation setting +func (d *DHT) FindNode(key [32]byte, numberOfNodes uint16) ([]P2PNode, error) { + toSearch := *d.RoutingTable.FindClosest(key, Parallelisation*5) + + // If our routing table is empty, we apparently are alone + if len(toSearch) == 0 { + d.RoutingTable.Insert(d.SelfNode) + toSearch = append(toSearch, d.SelfNode) + return toSearch, nil + } + + // Create a context to stop any loose running threads once a result has been found + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resultChannel := make(chan []P2PNode) + done := make(chan struct{}) + + var wg sync.WaitGroup + for i := 0; i < Parallelisation; i++ { + wg.Add(1) + go func() { + // Start from different positions to not query the same node twice + startIndex := i * len(toSearch) / Parallelisation + _ = d.findClosestNodes(ctx, resultChannel, key, toSearch[startIndex:], numberOfNodes) + wg.Done() + }() + } + + // The done channel is only closed, if all routines calling findClosestNodes finished. If any of them does so by + // reporting a success, the first case below will be selected. Only if all failed to contact any nodes will + // the node search fail. + go func() { + wg.Wait() + close(done) + }() + select { + case result := <-resultChannel: + return result, nil + case <-done: + return toSearch, errors.New("node search unsuccessful") + } +} + +/* +findClosestNodes tries to find the closest nodes to the specified key in the network. If no further improvements +are made, it reports back its result to the results channel. If the context is ended, it stops prematurely without +reporting a result. This is used when another concurrent search is successful first. It is provided an array of nodes +that it will use as first nodes to contact. This allows starting several threads with different node arrays, so that +not all threads choose the same node as start point. +*/ +func (d *DHT) findClosestNodes(ctx context.Context, results chan<- []P2PNode, key [32]byte, nodes []P2PNode, amount uint16) error { + currentNode := nodes[0] + var closestSuccessfulNodeID [32]byte //Start of as far away as possible so everything is an improvement + for i, b := range key { + closestSuccessfulNodeID[i] = ^b + } + + index := 0 + for { + select { + case <-ctx.Done(): + return nil + default: + } + + result, err := d.FindNodesAtNode(key, currentNode, amount) + if err != nil || len(result) == 0 { + index++ + if index >= len(nodes) { + return errors.New("ran out of nodes") + } else { + currentNode = nodes[index] + continue + } + } + result = append(result, nodes...) + result = SortNodesByDistanceToKey(key, result) + result = RemoveDuplicates(result) + + if !IsCloserToKey(key, result[index].NodeID, closestSuccessfulNodeID) { + results <- result[index:] + return nil + } else { + nodes = result + currentNode = result[0] + index = 0 + continue + } + } +} + +/* +FindNodesAtNode connects to the specified node and issues a FINDNODE request to it for the key in question. It returns +the nodes sent back, if successful. numberOfNodes specifies, how many nodes should be tried to return. +*/ +func (d *DHT) FindNodesAtNode(key [32]byte, node P2PNode, numberOfNodes uint16) ([]P2PNode, error) { + connection, err := ConnectToNode(node) + if err != nil { + return nil, err + } + defer connection.Close() + packet := P2PFindNodePacket{ + SourceIP: d.SelfNode.IPAddress, + SourcePort: uint16(d.SelfNode.Port), + Key: key, + Certificate: d.SelfNode.Certificate, + Amount: numberOfNodes, + } + _, err = connection.Write(SerialiseFindNodePacket(&packet)) + if err != nil { + return nil, err + } + + responseHeader := make([]byte, 4) + + deadline := time.Now().Add(time.Second * 1) + err = connection.SetDeadline(deadline) + if err != nil { + return nil, err + } + + _, err = io.ReadFull(connection, responseHeader) + if err != nil { + return nil, err + } + + size := binary.BigEndian.Uint16(responseHeader) + messageType := binary.BigEndian.Uint16(responseHeader[2:]) + if messageType != FOUNDNODE { + return nil, errors.New("received invalid response") + } + + responseBody := make([]byte, size-4) + _, err = io.ReadFull(connection, responseBody) + if err != nil { + return nil, err + } + + foundNodes, err := ParseFoundNodesPacket(append(responseHeader, responseBody...)) + if err != nil { + return nil, err + } + + return foundNodes.Nodes, nil +} + +/* +Housekeeping needs to be passed the time that passed since the last time it has been called. It then expires any entries +in the in memory storage of the DHT whose ttl ran out and republishes entries due for republishing. +*/ +func (d *DHT) Housekeeping(elapsedTime time.Duration) { + d.Storage.UpdateAndExpireEntries(elapsedTime) + toBeRepublished := d.Storage.GetEntriesForRepublishing(d.config.RepublishInterval) + d.RepublishEntries(toBeRepublished) +} + +/* +RepublishEntries checks for all entries in the map provided, if they have not been republished for a long enough time +by now and republishes the ones due. +*/ +func (d *DHT) RepublishEntries(entries map[KeyType]ValueType) { + for key, entry := range entries { + if time.Since(entry.timeAdded) > d.config.RepublishInterval { + // Do in parallel + go d.Store(key, entry.Value, entry.TTL, entry.RepCount) + } + } +} diff --git a/pkg/packet_test.go b/pkg/packet_test.go new file mode 100644 index 0000000..1728788 --- /dev/null +++ b/pkg/packet_test.go @@ -0,0 +1,62 @@ +package kademlia + +import ( + "encoding/binary" + "strings" + "testing" +) + +func TestSuccessPacketSerialisation(t *testing.T) { + + key := []byte(strings.Repeat("k", 32)) + value := []byte("value") + size := uint16(4 + len(key) + len(value)) + + result := SerialiseSuccessPacket(DHTSuccessPacket{ + Key: [32]byte(key), + Value: value, + }) + if len(result) != int(size) { + t.Errorf("gExpected size %d, got %d", size, len(result)) + } + messageSize := binary.BigEndian.Uint16(result[:2]) + if messageSize != size { + t.Errorf("Expected size value in message to be %d, got %d", size, messageSize) + } + messageType := binary.BigEndian.Uint16(result[2:4]) + if messageType != SUCCESS { + t.Errorf("Expected message type to be %d, got %d", SUCCESS, messageType) + } + messageKey := result[4:36] + if string(messageKey) != string(key) { + t.Errorf("Expected key %s, got %s", key, messageKey) + } + messageValue := result[36:] + if string(messageValue) != string(value) { + t.Errorf("Expected value %s, got %s", value, messageValue) + } +} + +func TestFailurePacketSerialisation(t *testing.T) { + + key := []byte(strings.Repeat("k", 32)) + + size := uint16(4 + len(key)) + + result := SerialiseFailurePacket(DHTFailurePacket{Key: [32]byte(key)}) + if len(result) != int(size) { + t.Errorf("Expected size %d, got %d", size, len(result)) + } + messageSize := binary.BigEndian.Uint16(result[:2]) + if messageSize != size { + t.Errorf("Expected size value in message to be %d, got %d", size, messageSize) + } + messageType := binary.BigEndian.Uint16(result[2:4]) + if messageType != FAILURE { + t.Errorf("Expected message type to be %d, got %d", FAILURE, messageType) + } + messageKey := result[4:] + if string(messageKey) != string(key) { + t.Errorf("Expected key %s, got %s", key, messageKey) + } +} diff --git a/pkg/packets.go b/pkg/packets.go new file mode 100644 index 0000000..a2689cd --- /dev/null +++ b/pkg/packets.go @@ -0,0 +1,445 @@ +package kademlia + +import ( + "crypto/sha256" + "encoding/binary" + "errors" + "log" + "net" +) + +const ( + PUT = 650 + GET = 651 + SUCCESS = 652 + FAILURE = 653 + STORE = 660 + PING = 661 + FINDNODE = 662 + FINDVALUE = 663 + PONG = 671 + FOUNDNODE = 672 + FOUNDVALUE = 673 + NOTFOUNDVALUE = 674 +) + +type DHTPutPacket struct { + TTL uint16 + Replication byte + Key [32]byte + Value []byte +} + +type DHTGetPacket struct { + Key [32]byte +} + +type DHTSuccessPacket struct { + Key [32]byte + Value []byte +} + +type DHTFailurePacket struct { + Key [32]byte +} + +func SerialiseSuccessPacket(packet DHTSuccessPacket) []byte { + //Placeholder for size + buf := make([]byte, 2) + // Message type + buf = binary.BigEndian.AppendUint16(buf, uint16(SUCCESS)) + // Key + buf = append(buf, packet.Key[:]...) + // Value + buf = append(buf, packet.Value...) + // Size + binary.BigEndian.PutUint16(buf, uint16(len(buf))) + return buf +} + +func ParseSucceessPacket(buf []byte) (*DHTSuccessPacket, error) { + if len(buf) < 36 { + return nil, errors.New("missing fields") + } + + return &DHTSuccessPacket{ + Key: [32]byte(buf[4:36]), + Value: buf[36:], + }, nil +} + +func SerialiseFailurePacket(packet DHTFailurePacket) []byte { + //Placeholder for size + buf := make([]byte, 2) + // Message type + buf = binary.BigEndian.AppendUint16(buf, uint16(FAILURE)) + // Key + buf = append(buf, packet.Key[:]...) + // Size + binary.BigEndian.PutUint16(buf, uint16(len(buf))) + return buf +} + +func ParsePutPacket(buf []byte) (*DHTPutPacket, error) { + if len(buf) <= 40 { + return nil, errors.New("missing fields") + } + return &DHTPutPacket{ + TTL: binary.BigEndian.Uint16(buf[4:6]), + Replication: buf[6], + Key: [32]byte(buf[8:40]), + Value: buf[40:], + }, nil +} + +func SerialisePutPacket(packet *DHTPutPacket) []byte { + //Placeholder for size + buf := make([]byte, 2) + // Message type + buf = binary.BigEndian.AppendUint16(buf, uint16(PUT)) + // Time to live + buf = binary.BigEndian.AppendUint16(buf, packet.TTL) + // Replication and reserved + buf = append(buf, packet.Replication, 0) + // Key + buf = append(buf, packet.Key[:]...) + // Value + buf = append(buf, packet.Value...) + // Size + binary.BigEndian.PutUint16(buf, uint16(len(buf))) + return buf +} + +func ParseGetPacket(buf []byte) (*DHTGetPacket, error) { + if len(buf) < 36 { + return nil, errors.New("missing fields") + } + + return &DHTGetPacket{ + Key: [32]byte(buf[4:36]), + }, nil +} + +func SerialiseGetPacket(packet *DHTGetPacket) []byte { + //Placeholder for size + buf := make([]byte, 2) + // Message type + buf = binary.BigEndian.AppendUint16(buf, uint16(GET)) + // Key + buf = append(buf, packet.Key[:]...) + // Size + binary.BigEndian.PutUint16(buf, uint16(len(buf))) + return buf +} + +type P2PStorePacket struct { + SourceIP net.IP + SourcePort uint16 + TTL uint16 + RepCount uint16 + Certificate []byte + PoW [32]byte + Key [32]byte + Value []byte +} + +type P2PPingPacket struct { + SourceIP net.IP + SourcePort uint16 + Certificate []byte +} + +type P2PFindNodePacket struct { + SourceIP net.IP + SourcePort uint16 + Certificate []byte + Key [32]byte + Amount uint16 +} + +type P2PFindValuePacket struct { + SourceIP net.IP + SourcePort uint16 + Certificate []byte + Key [32]byte +} + +type P2PFoundNodePacket struct { + Nodes []P2PNode +} + +type P2PFoundValuePacket struct { + Value []byte +} + +type P2PPongPacket struct { +} + +type P2PNotFoundPValuePacket struct { +} + +func ProofWorkForStore(key [32]byte, value []byte) [32]byte { + payload := append(key[:], value...) + var hash [32]byte + log.Printf("Starting pow") + for { + res := sha256.Sum256(append(hash[:], payload...)) + if getBitsAt(res, 0, ProofOfWorkBits) == 0 { + break + } + hash = res + } + log.Printf("Ending pow") + return hash +} + +func SerialiseStorePacket(packet *P2PStorePacket) []byte { + //Placeholder for size + buf := make([]byte, 2) + // Message type + buf = binary.BigEndian.AppendUint16(buf, uint16(STORE)) + // IP + buf = append(buf, packet.SourceIP.To16()...) + // Port + buf = binary.BigEndian.AppendUint16(buf, packet.SourcePort) + // Certificate length + buf = binary.BigEndian.AppendUint16(buf, uint16(len(packet.Certificate))) + // Cert + buf = append(buf, packet.Certificate...) + // Rep Count + buf = binary.BigEndian.AppendUint16(buf, packet.RepCount) + // TTL + buf = binary.BigEndian.AppendUint16(buf, packet.TTL) + // ProofOfWork + buf = append(buf, packet.PoW[:]...) + // Key + buf = append(buf, packet.Key[:]...) + // Value + buf = append(buf, packet.Value...) + // Size + binary.BigEndian.PutUint16(buf, uint16(len(buf))) + return buf +} + +func ParseStorePacket(buf []byte) (*P2PStorePacket, error) { + if len(buf) <= 92 { + return nil, errors.New("packet length too short") + } + + certLen := binary.BigEndian.Uint16(buf[22:24]) + + packet := &P2PStorePacket{ + SourceIP: net.IP(buf[4:20]), + SourcePort: binary.BigEndian.Uint16(buf[20:22]), + Certificate: buf[24 : 24+certLen], + TTL: binary.BigEndian.Uint16(buf[24+certLen : 26+certLen]), + RepCount: binary.BigEndian.Uint16(buf[26+certLen : 28+certLen]), + PoW: [32]byte(buf[28+certLen : 60+certLen]), + Key: [32]byte(buf[60+certLen : 92+certLen]), + Value: buf[92+certLen:], + } + hash := sha256.Sum256(buf[28+certLen:]) + if getBitsAt(hash, 0, ProofOfWorkBits) != 0 { + return nil, errors.New("invalid proof of work") + } + return packet, nil +} + +func SerialisePingPacket(packet *P2PPingPacket) []byte { + //Placeholder for size + buf := make([]byte, 2) + // Message type + buf = binary.BigEndian.AppendUint16(buf, uint16(PING)) + // IP + buf = append(buf, packet.SourceIP.To16()...) + // Port + buf = binary.BigEndian.AppendUint16(buf, uint16(packet.SourcePort)) + // Certificate length + buf = binary.BigEndian.AppendUint16(buf, uint16(len(packet.Certificate))) + // Cert + buf = append(buf, packet.Certificate...) + // Size + binary.BigEndian.PutUint16(buf, uint16(len(buf))) + return buf +} + +func ParsePingPacket(buf []byte) (*P2PPingPacket, error) { + if len(buf) < 22 { + return nil, errors.New("packet length too short") + } + + certLen := binary.BigEndian.Uint16(buf[22:24]) + + packet := &P2PPingPacket{ + SourceIP: net.IP(buf[4:20]), + SourcePort: binary.BigEndian.Uint16(buf[20:22]), + Certificate: buf[24 : 24+certLen], + } + return packet, nil +} + +func SerialiseFindNodePacket(packet *P2PFindNodePacket) []byte { + //Placeholder for size + buf := make([]byte, 2) + // Message type + buf = binary.BigEndian.AppendUint16(buf, uint16(FINDNODE)) + // IP + buf = append(buf, packet.SourceIP.To16()...) + // Port + buf = binary.BigEndian.AppendUint16(buf, packet.SourcePort) + // Certificate length + buf = binary.BigEndian.AppendUint16(buf, uint16(len(packet.Certificate))) + // Cert + buf = append(buf, packet.Certificate...) + // Key + buf = append(buf, packet.Key[:]...) + // Amount + buf = binary.BigEndian.AppendUint16(buf, packet.Amount) + // Size + binary.BigEndian.PutUint16(buf, uint16(len(buf))) + return buf +} + +func ParseFindNodePacket(buf []byte) (*P2PFindNodePacket, error) { + if len(buf) < 56 { + return nil, errors.New("packet length too short") + } + + certLen := binary.BigEndian.Uint16(buf[22:24]) + + packet := &P2PFindNodePacket{ + SourceIP: net.IP(buf[4:20]), + SourcePort: binary.BigEndian.Uint16(buf[20:22]), + Certificate: buf[24 : 24+certLen], + Key: [32]byte(buf[24+certLen : 56+certLen]), + Amount: binary.BigEndian.Uint16(buf[56+certLen : 58+certLen]), + } + + return packet, nil +} + +func SerialiseFindValuePacket(packet *P2PFindValuePacket) []byte { + //Placeholder for size + buf := make([]byte, 2) + // Message type + buf = binary.BigEndian.AppendUint16(buf, uint16(FINDVALUE)) + // IP + buf = append(buf, packet.SourceIP.To16()...) + // Port + buf = binary.BigEndian.AppendUint16(buf, packet.SourcePort) + // Certificate length + buf = binary.BigEndian.AppendUint16(buf, uint16(len(packet.Certificate))) + // Cert + buf = append(buf, packet.Certificate...) + // Key + buf = append(buf, packet.Key[:]...) + // Size + binary.BigEndian.PutUint16(buf, uint16(len(buf))) + return buf +} + +func ParseFindValuePacket(buf []byte) (*P2PFindValuePacket, error) { + if len(buf) < 56 { + return nil, errors.New("packet length too short") + } + + certLen := binary.BigEndian.Uint16(buf[22:24]) + + packet := &P2PFindValuePacket{ + SourceIP: net.IP(buf[4:20]), + SourcePort: binary.BigEndian.Uint16(buf[20:22]), + Certificate: buf[24 : 24+certLen], + Key: [32]byte(buf[24+certLen : 56+certLen]), + } + + return packet, nil +} + +func generateNodesList(nodes []P2PNode) []byte { + nodeList := make([]byte, 0) + + for _, node := range nodes { + nodeList = append(nodeList, net.IP.To16(node.IPAddress)...) + nodeList = binary.BigEndian.AppendUint16(nodeList, uint16(node.Port)) + nodeList = binary.BigEndian.AppendUint16(nodeList, uint16(len(node.Certificate))) + nodeList = append(nodeList, node.Certificate...) + } + return nodeList +} + +func SerialiseFoundNodePacket(packet *P2PFoundNodePacket) []byte { + buf := make([]byte, 2) + // Type + buf = binary.BigEndian.AppendUint16(buf, FOUNDNODE) + // reserved + buf = binary.BigEndian.AppendUint16(buf, 0) + // number of nodes + buf = binary.BigEndian.AppendUint16(buf, uint16(len(packet.Nodes))) + // list of nodes + buf = append(buf, generateNodesList(packet.Nodes)...) + // Size + binary.BigEndian.PutUint16(buf, uint16(len(buf))) + return buf +} + +func ParseFoundNodesPacket(packetToParse []byte) (*P2PFoundNodePacket, error) { + if len(packetToParse) < 8 { + return nil, errors.New("Invalid FoundNodes header") + } + numberOfNodes := binary.BigEndian.Uint16(packetToParse[6:8]) + + nodes := make([]P2PNode, numberOfNodes) + + currentIndex := 8 + for i := 0; i < int(numberOfNodes); i++ { + if len(packetToParse) < currentIndex+20 { + return nil, errors.New("received invalid packet, fields not present") + } + ip := packetToParse[currentIndex : currentIndex+16] + port := uint64(binary.BigEndian.Uint16(packetToParse[currentIndex+16 : currentIndex+18])) + certLen := int(binary.BigEndian.Uint16(packetToParse[currentIndex+18 : currentIndex+20])) + currentIndex += 20 + if len(packetToParse) < currentIndex+certLen { + return nil, errors.New("received invalid packet, certificate too short") + } + cert := packetToParse[currentIndex : currentIndex+certLen] + currentIndex += certLen + nodes[i] = P2PNode{ + NodeID: GetPeerId(ip, uint16(port)), + IPAddress: ip, + Port: port, + Certificate: cert, + } + } + return &P2PFoundNodePacket{Nodes: nodes}, nil +} + +func SerialiseFoundValuePacket(packet *P2PFoundValuePacket) []byte { + //Placeholder for size + buf := make([]byte, 2) + // Message type + buf = binary.BigEndian.AppendUint16(buf, uint16(FOUNDVALUE)) + // Value + buf = append(buf, packet.Value...) + // Size + binary.BigEndian.PutUint16(buf, uint16(len(buf))) + return buf +} + +func ParseFoundValuePacket(buf []byte) *P2PFoundValuePacket { + return &P2PFoundValuePacket{Value: buf[4:]} +} + +func SerialiseNotFoundValuePacket(packet *P2PNotFoundPValuePacket) []byte { + buf := make([]byte, 2) + buf = binary.BigEndian.AppendUint16(buf, NOTFOUNDVALUE) + binary.BigEndian.PutUint16(buf, uint16(len(buf))) + return buf +} + +func SerialisePongPacket(packet *P2PPongPacket) []byte { + buf := make([]byte, 2) + buf = binary.BigEndian.AppendUint16(buf, PONG) + binary.BigEndian.PutUint16(buf, uint16(len(buf))) + return buf +} diff --git a/pkg/routing_table.go b/pkg/routing_table.go new file mode 100644 index 0000000..29aad4d --- /dev/null +++ b/pkg/routing_table.go @@ -0,0 +1,124 @@ +package kademlia + +// TODO: Tests +const ( + threshold = 10 + bucketSize = 20 +) + +// RoutingTableNode interface implemented by both InnerNode and Leaf +type RoutingTableNode interface { + isNode() + findClosest(id [32]byte, numberOfNodes int) []P2PNode + insert(node P2PNode) RoutingTableNode +} + +type Leaf struct { + bucket []P2PNode + hostClientNode P2PNode + depth uint64 + maxDepth uint64 + prefixLen uint64 +} + +// IsNode ensures Leaf and InnerNode implement RoutingTableNode +func (i *InnerNode) isNode() {} +func (l *Leaf) isNode() {} + +type RoutingTable struct { + root RoutingTableNode + hostClientNode P2PNode + matchedPrefixLength uint64 +} + +func NewRoutingTable(clientNode P2PNode, matchedPrefixLength uint64) *RoutingTable { + return &RoutingTable{ + root: &Leaf{bucket: make([]P2PNode, 0), depth: 0, maxDepth: 256, prefixLen: matchedPrefixLength, hostClientNode: clientNode}, + hostClientNode: clientNode, + matchedPrefixLength: matchedPrefixLength, + } + +} + +type InnerNode struct { + branches []RoutingTableNode + hostClientNode P2PNode + depth uint64 + prefixLength uint64 +} + +// FindClosest Return a list of nodes in the routing table closest to the provided ID +func (r *RoutingTable) FindClosest(targetID [32]byte, numberOfNodes int) *[]P2PNode { + ret := r.root.findClosest(targetID, numberOfNodes) + return &ret +} + +func (i *InnerNode) findClosest(id [32]byte, numberOfNodes int) []P2PNode { + distance := uint64(0) + result := make([]P2PNode, 0) + + // go through the buckets in increasing distance from the key until sufficiently many nodes were found + for distance < 1< threshold && getBitsAt(node.NodeID, 0, l.depth) == getBitsAt(l.hostClientNode.NodeID, 0, l.depth) { + branches := make([]RoutingTableNode, 1<= timeLimit { + toBeRepublished[key] = entry + m.storageMap.Delete(key) + } + return true + })*/ + + for key, entry := range m.storageMap { + if time.Now().Sub(entry.timeAdded) >= timeLimit { + toBeRepublished[key] = entry + delete(m.storageMap, key) + } + } + return toBeRepublished +} diff --git a/pkg/tls_helper.go b/pkg/tls_helper.go new file mode 100644 index 0000000..c4f66d6 --- /dev/null +++ b/pkg/tls_helper.go @@ -0,0 +1,126 @@ +package kademlia + +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Generate a self-signed X.509 certificate for a TLS server. Outputs to +// 'cert.pem' and 'key.pem' and will overwrite existing files. + +import ( + "crypto/ecdsa" + "crypto/ed25519" + "crypto/elliptic" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "flag" + "log" + "math/big" + "net" + "os" + "strings" + "time" +) + +func publicKey(priv any) any { + switch k := priv.(type) { + case *rsa.PrivateKey: + return &k.PublicKey + case *ecdsa.PrivateKey: + return &k.PublicKey + case ed25519.PrivateKey: + return k.Public().(ed25519.PublicKey) + default: + return nil + } +} + +func GenerateCert(host, certOutFile, keyOutFile string) { + flag.Parse() + + if len(host) == 0 { + log.Fatalf("Missing required --host parameter") + } + + var priv any + var err error + priv, err = ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + log.Fatalf("Failed to generate private key: %v", err) + } + + // ECDSA, ED25519 and RSA subject keys should have the DigitalSignature + // KeyUsage bits set in the x509.Certificate template + keyUsage := x509.KeyUsageDigitalSignature + + var notBefore time.Time + notBefore = time.Now() + + notAfter := notBefore.Add(365 * 24 * time.Hour) + + serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) + serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) + if err != nil { + log.Fatalf("Failed to generate serial number: %v", err) + } + + template := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + Organization: []string{"DHT inc"}, + }, + NotBefore: notBefore, + NotAfter: notAfter, + + KeyUsage: keyUsage, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + } + + hosts := strings.Split(host, ",") + for _, h := range hosts { + if ip := net.ParseIP(h); ip != nil { + template.IPAddresses = append(template.IPAddresses, ip) + } else { + template.DNSNames = append(template.DNSNames, h) + } + } + + template.IsCA = true + template.KeyUsage |= x509.KeyUsageCertSign + + derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, publicKey(priv), priv) + if err != nil { + log.Fatalf("Failed to create certificate: %v", err) + } + + certOut, err := os.Create(certOutFile) + if err != nil { + log.Fatalf("Failed to open %s for writing: %v", certOutFile, err) + } + if err := pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil { + log.Fatalf("Failed to write data to %s: %v", certOutFile, err) + } + if err := certOut.Close(); err != nil { + log.Fatalf("Error closing %s: %v", certOutFile, err) + } + + keyOut, err := os.OpenFile(keyOutFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + log.Fatalf("Failed to open %s for writing: %v", keyOutFile, err) + } + privBytes, err := x509.MarshalPKCS8PrivateKey(priv) + if err != nil { + log.Fatalf("Unable to marshal private key: %v", err) + } + if err := pem.Encode(keyOut, &pem.Block{Type: "PRIVATE KEY", Bytes: privBytes}); err != nil { + log.Fatalf("Failed to write data to %s: %v", keyOutFile, err) + } + if err := keyOut.Close(); err != nil { + log.Fatalf("Error closing %s: %v", keyOutFile, err) + } + +} diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..f87333c --- /dev/null +++ b/readme.md @@ -0,0 +1,27 @@ +This is an implementation of a distributed hash table (DHT) based on the Kademlia routing protocol. It is the final project submission for the lecture "Peer-to-peer Systems and Security" offered at the Technical University of Munich. More elaborate documentation is found in the final report handed in. + +This is my first time using GO, expect some yet unpolished code and style inconsistencies. + +This is a copy of the repository used for the submission, as the universities with all my projects gitLab is not publicly accessible. + +Further points to be implemented: +- activate hashing keys before storing them (currently removed for debugging) +- add more checks whether nodes are still alive or not other than routing table buckets filling + + +# Running +To run an instance of the DHT, in needs to be called with `-c /${i}/g;s//${ip}/g" "$template_file" > "$file_name" +done + + diff --git a/shared_data/template_config.ini b/shared_data/template_config.ini new file mode 100644 index 0000000..6cf4ee6 --- /dev/null +++ b/shared_data/template_config.ini @@ -0,0 +1,11 @@ +hostkey = /hostkey.pem + +[dht] +p2p_address = 10.0.0.:6001 +api_address = 10.0.0.:7001 +bootstrapper = 10.0.0.10:6001 +prefix_length = 5 +republish_interval = 3600 +key_file = "/shared/key.pem" +cert_file = "/shared/cert.pem" +bootstrapper_cert = "/shared/cert_boot.pem" diff --git a/testDisconnect.sh b/testDisconnect.sh new file mode 100644 index 0000000..b6d016a --- /dev/null +++ b/testDisconnect.sh @@ -0,0 +1,14 @@ +docker-compose -f composeLarge.yaml up boot_container -d +docker-compose -f composeLarge.yaml up -d + +./commandlineDHT/commandlineDHT -m put -ip 127.0.0.1:7003 -key NodeThree -value "Going" +./commandlineDHT/commandlineDHT -m put -ip 127.0.0.1:7002 -key NodeTwo -value "Offline" + +docker-compose -f composeLarge.yaml down container3 container2 + +./commandlineDHT/commandlineDHT -m get -ip 10.0.0.130:7001 -key NodeThree +./commandlineDHT/commandlineDHT -m get -ip 10.0.0.106:7001 -key NodeTwo + + +docker-compose -f composeLarge.yaml down + diff --git a/testExpire.sh b/testExpire.sh new file mode 100644 index 0000000..bea4415 --- /dev/null +++ b/testExpire.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +docker-compose -f composeRep.yaml up boot_container -d + + +echo "==============================================================================" + +echo "Storing 2 values: Long ttl and short ttl" +./commandlineDHT/commandlineDHT -m put -ip 10.0.0.10:7001 -key Short -value Short -ttl 5 +./commandlineDHT/commandlineDHT -m put -ip 10.0.0.10:7001 -key Long -value Long -ttl 5000 + +echo "Looking up the vaules" +echo "Short ttl:" +./commandlineDHT/commandlineDHT -m get -ip 10.0.0.10:7001 -key Short +echo "Long ttl:" +./commandlineDHT/commandlineDHT -m get -ip 10.0.0.10:7001 -key Long + +echo "Waiting until refres timer passes" + +sleep 20 + +echo "Triggering update" +./commandlineDHT/commandlineDHT -m find -ip 10.0.0.10:6001 -cert ./shared_data/cert_boot.pem -key test + +echo "Trying to find the values again" +echo "Short ttl:" +./commandlineDHT/commandlineDHT -m get -ip 10.0.0.10:7001 -key Short +echo "Long ttl:" +./commandlineDHT/commandlineDHT -m get -ip 10.0.0.10:7001 -key Long + +echo "==============================================================================" + +docker-compose -f composeRep.yaml down diff --git a/testLarge.sh b/testLarge.sh new file mode 100644 index 0000000..5ff0a1b --- /dev/null +++ b/testLarge.sh @@ -0,0 +1,21 @@ +docker-compose -f composeLarge.yaml up boot_container -d +docker-compose -f composeLarge.yaml up -d + +./commandlineDHT/commandlineDHT -m put -ip 127.0.0.1:7001 -key Hello -value "Big World" +./commandlineDHT/commandlineDHT -m put -ip 127.0.0.1:7001 -key Small -value "Small World" -rep 5 + +for i in {1..40} +do + ip=$((100+i)) + echo "Node ${i}:" + echo "On node:" + ./commandlineDHT/commandlineDHT -m find -ip 10.0.0.${ip}:6001 -cert ./shared_data/cert${i}.pem -key Hello + ./commandlineDHT/commandlineDHT -m find -ip 10.0.0.${ip}:6001 -cert ./shared_data/cert${i}.pem -key Small + echo "In network:" + ./commandlineDHT/commandlineDHT -m get -ip 10.0.0.${ip}:7001 -key Hello + ./commandlineDHT/commandlineDHT -m get -ip 10.0.0.${ip}:7001 -key Small + +done + +docker-compose -f composeLarge.yaml down +