From f769f70ce77082dab7de3bee2abc58b87ed7a697 Mon Sep 17 00:00:00 2001 From: partisan Date: Fri, 5 Jul 2024 03:08:35 +0200 Subject: [PATCH] wip --- init.go | 73 +++++++++++++++++++++++++++++++++------ main.go | 24 ++++++------- node-master.go | 81 +++++++++++++++++++++++++++++++++++++++++++ node-update.go | 2 +- node.go | 93 +++++++++++++++++++++++++++++++++++++++++++------- 5 files changed, 237 insertions(+), 36 deletions(-) create mode 100644 node-master.go diff --git a/init.go b/init.go index 873b077..b3df270 100644 --- a/init.go +++ b/init.go @@ -10,14 +10,17 @@ import ( "os" "strconv" "strings" + "sync" + + "github.com/fsnotify/fsnotify" ) // Configuration structure type Config struct { - Port int - ConnectionCode string - Peers []string - OpenSearch OpenSearchConfig + Port int + AuthCode string + Peers []string + OpenSearch OpenSearchConfig } type OpenSearchConfig struct { @@ -34,18 +37,31 @@ var defaultConfig = Config{ const configFilePath = "config.json" +var config Config +var configLock sync.RWMutex + func main() { - // Run the initialization process err := initConfig() if err != nil { fmt.Println("Error during initialization:", err) return } - // This is stupid loadNodeConfig() + go startFileWatcher() + go checkMasterHeartbeat() + + if config.AuthCode == "" { + config.AuthCode = generateStrongRandomString(64) + fmt.Printf("Generated connection code: %s\n", config.AuthCode) + saveConfig(config) + } + + if len(config.Peers) > 0 { + go startNodeClient(config.Peers) + startElection() + } - // Start the main application runServer() } @@ -55,6 +71,7 @@ func initConfig() error { } fmt.Println("Configuration file already exists.") + config = loadConfig() return nil } @@ -94,9 +111,9 @@ func createConfig() error { } } - if config.ConnectionCode == "" { - config.ConnectionCode = generateStrongRandomString(32) - fmt.Printf("Generated connection code: %s\n", config.ConnectionCode) + if config.AuthCode == "" { + config.AuthCode = generateStrongRandomString(64) + fmt.Printf("Generated connection code: %s\n", config.AuthCode) } saveConfig(config) @@ -146,3 +163,39 @@ func generateStrongRandomString(length int) string { } return base64.URLEncoding.EncodeToString(bytes)[:length] } + +func startFileWatcher() { + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Fatal(err) + } + + go func() { + defer watcher.Close() + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + if event.Op&fsnotify.Write == fsnotify.Write { + log.Println("Modified file:", event.Name) + configLock.Lock() + config = loadConfig() + configLock.Unlock() + // Perform your logic here to handle the changes in the config file + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + log.Println("Error:", err) + } + } + }() + + err = watcher.Add(configFilePath) + if err != nil { + log.Fatal(err) + } +} diff --git a/main.go b/main.go index f05108a..0d9ac33 100644 --- a/main.go +++ b/main.go @@ -122,18 +122,18 @@ func parsePageParameter(pageStr string) int { } func runServer() { - http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static")))) - http.HandleFunc("/", handleSearch) - http.HandleFunc("/search", handleSearch) - http.HandleFunc("/img_proxy", handleImageProxy) - http.HandleFunc("/settings", func(w http.ResponseWriter, r *http.Request) { - http.ServeFile(w, r, "templates/settings.html") - }) - http.HandleFunc("/opensearch.xml", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/opensearchdescription+xml") - http.ServeFile(w, r, "static/opensearch.xml") - }) - initializeTorrentSites() + // http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static")))) + // http.HandleFunc("/", handleSearch) + // http.HandleFunc("/search", handleSearch) + // http.HandleFunc("/img_proxy", handleImageProxy) + // http.HandleFunc("/settings", func(w http.ResponseWriter, r *http.Request) { + // http.ServeFile(w, r, "templates/settings.html") + // }) + // http.HandleFunc("/opensearch.xml", func(w http.ResponseWriter, r *http.Request) { + // w.Header().Set("Content-Type", "application/opensearchdescription+xml") + // http.ServeFile(w, r, "static/opensearch.xml") + // }) + // initializeTorrentSites() http.HandleFunc("/node", handleNodeRequest) // Handle node requests diff --git a/node-master.go b/node-master.go new file mode 100644 index 0000000..4f5155b --- /dev/null +++ b/node-master.go @@ -0,0 +1,81 @@ +package main + +import ( + "log" + "sync" + "time" +) + +var ( + isMaster bool + masterNode string + masterNodeMux sync.RWMutex +) + +const ( + heartbeatInterval = 5 * time.Second + heartbeatTimeout = 15 * time.Second + electionTimeout = 10 * time.Second +) + +func sendHeartbeats() { + for { + if !isMaster { + return + } + for _, node := range peers { + err := sendMessage(node, authCode, "heartbeat", authCode) + if err != nil { + log.Printf("Error sending heartbeat to %s: %v", node, err) + } + } + time.Sleep(heartbeatInterval) + } +} + +func checkMasterHeartbeat() { + for { + time.Sleep(heartbeatTimeout) + masterNodeMux.RLock() + if masterNode == authCode || masterNode == "" { + masterNodeMux.RUnlock() + continue + } + masterNodeMux.RUnlock() + + masterNodeMux.Lock() + masterNode = "" + masterNodeMux.Unlock() + startElection() + } +} + +func startElection() { + masterNodeMux.Lock() + defer masterNodeMux.Unlock() + + for _, node := range peers { + err := sendMessage(node, authCode, "election", authCode) + if err != nil { + log.Printf("Error sending election message to %s: %v", node, err) + } + } + + isMaster = true + go sendHeartbeats() +} + +func handleHeartbeat(content string) { + masterNodeMux.Lock() + defer masterNodeMux.Unlock() + masterNode = content +} + +func handleElection(content string) { + masterNodeMux.Lock() + defer masterNodeMux.Unlock() + + if content < authCode { + masterNode = content + } +} diff --git a/node-update.go b/node-update.go index e67a160..0ef63f6 100644 --- a/node-update.go +++ b/node-update.go @@ -11,7 +11,7 @@ func nodeUpdateSync() { fmt.Println("Syncing updates across all nodes...") for _, peer := range peers { fmt.Printf("Notifying node %s about update...\n", peer) - err := sendMessage(peer, connCode, "update", "Start update process") + err := sendMessage(peer, authCode, "update", "Start update process") if err != nil { log.Printf("Failed to notify node %s: %v\n", peer, err) continue diff --git a/node.go b/node.go index 5c04e18..97862c7 100644 --- a/node.go +++ b/node.go @@ -2,18 +2,22 @@ package main import ( "bytes" + "crypto/sha256" + "encoding/hex" "encoding/json" "fmt" "io/ioutil" + "log" "net/http" "sync" "time" ) var ( - connCode string + authCode string peers []string - connCodeMutex sync.Mutex + authMutex sync.Mutex + authenticated = make(map[string]bool) ) type Message struct { @@ -22,9 +26,16 @@ type Message struct { Content string `json:"content"` } +type CrawlerConfig struct { + ID string + Host string + Port int + AuthCode string +} + func loadNodeConfig() { config := loadConfig() - connCode = config.ConnectionCode + authCode = config.AuthCode // nuh uh peers = config.Peers } @@ -34,9 +45,6 @@ func handleNodeRequest(w http.ResponseWriter, r *http.Request) { return } - connCodeMutex.Lock() - defer connCodeMutex.Unlock() - body, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, "Error reading request body", http.StatusInternalServerError) @@ -50,8 +58,8 @@ func handleNodeRequest(w http.ResponseWriter, r *http.Request) { return } - if msg.ID != connCode { - http.Error(w, "Authentication failed", http.StatusUnauthorized) + if !isAuthenticated(msg.ID) { + http.Error(w, "Authentication required", http.StatusUnauthorized) return } @@ -59,17 +67,55 @@ func handleNodeRequest(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "Message received") } +func handleAuth(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) + return + } + + body, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, "Error reading request body", http.StatusInternalServerError) + return + } + defer r.Body.Close() + + var authRequest CrawlerConfig + if err := json.Unmarshal(body, &authRequest); err != nil { + http.Error(w, "Error parsing JSON", http.StatusBadRequest) + return + } + + expectedCode := GenerateRegistrationCode(authRequest.Host, authRequest.Port, authCode) + if authRequest.AuthCode != expectedCode { + http.Error(w, "Invalid auth code", http.StatusUnauthorized) + return + } + + authMutex.Lock() + authenticated[authRequest.ID] = true + authMutex.Unlock() + + fmt.Fprintln(w, "Authenticated successfully") +} + +func isAuthenticated(id string) bool { + authMutex.Lock() + defer authMutex.Unlock() + return authenticated[id] +} + func interpretMessage(msg Message) { switch msg.Type { case "test": fmt.Println("Received test message:", msg.Content) - case "get-version": - fmt.Println("Received get-version message") - // Handle get-version logic here case "update": fmt.Println("Received update message:", msg.Content) - // Handle update logic here go update() + case "heartbeat": + handleHeartbeat(msg.Content) + case "election": + handleElection(msg.Content) default: fmt.Println("Received unknown message type:", msg.Type) } @@ -111,7 +157,7 @@ func startNodeClient(addresses []string) { for _, address := range addresses { go func(addr string) { for { - err := sendMessage(addr, connCode, "test", "This is a test message") + err := sendMessage(addr, authCode, "test", "This is a test message") if err != nil { fmt.Println("Error sending test message to", addr, ":", err) continue @@ -121,3 +167,24 @@ func startNodeClient(addresses []string) { }(address) } } + +func GenerateRegistrationCode(host string, port int, authCode string) string { + data := fmt.Sprintf("%s:%d:%s", host, port, authCode) + hash := sha256.Sum256([]byte(data)) + return hex.EncodeToString(hash[:]) +} + +func ParseRegistrationCode(code string, host string, port int, authCode string) (string, int, string, error) { + data := fmt.Sprintf("%s:%d:%s", host, port, authCode) + hash := sha256.Sum256([]byte(data)) + expectedCode := hex.EncodeToString(hash[:]) + + log.Printf("Parsing registration code: %s", code) + log.Printf("Expected registration code: %s", expectedCode) + + if expectedCode != code { + return "", 0, "", fmt.Errorf("invalid registration code") + } + + return host, port, authCode, nil +}