added node search for video

This commit is contained in:
partisan 2024-08-09 10:39:08 +02:00
parent 4724e84059
commit 1443dec8e2
3 changed files with 94 additions and 36 deletions

View file

@ -103,6 +103,7 @@ func handleSearchVideoMessage(msg Message) {
Safe string `json:"safe"` Safe string `json:"safe"`
Lang string `json:"lang"` Lang string `json:"lang"`
Page int `json:"page"` Page int `json:"page"`
ResponseAddr string `json:"responseAddr"`
} }
err := json.Unmarshal([]byte(msg.Content), &searchParams) err := json.Unmarshal([]byte(msg.Content), &searchParams)
if err != nil { if err != nil {
@ -110,6 +111,8 @@ func handleSearchVideoMessage(msg Message) {
return return
} }
log.Printf("Received search-video request. ResponseAddr: %s", searchParams.ResponseAddr)
results := fetchVideoResults(searchParams.Query, searchParams.Safe, searchParams.Lang, searchParams.Page) results := fetchVideoResults(searchParams.Query, searchParams.Safe, searchParams.Lang, searchParams.Page)
resultsJSON, err := json.Marshal(results) resultsJSON, err := json.Marshal(results)
if err != nil { if err != nil {
@ -123,9 +126,16 @@ func handleSearchVideoMessage(msg Message) {
Content: string(resultsJSON), Content: string(resultsJSON),
} }
err = sendMessage(msg.ID, responseMsg) log.Printf("Sending video search results to %s", searchParams.ResponseAddr)
if searchParams.ResponseAddr == "" {
log.Printf("Error: Response address is empty")
return
}
err = sendMessage(searchParams.ResponseAddr, responseMsg)
if err != nil { if err != nil {
log.Printf("Error sending video search results to %s: %v", msg.ID, err) log.Printf("Error sending video search results to %s: %v", searchParams.ResponseAddr, err)
} }
} }

View file

@ -157,10 +157,10 @@ func interpretMessage(msg Message) {
handleTextResultsMessage(msg) // need to implement handleTextResultsMessage(msg) // need to implement
case "image-results": case "image-results":
handleImageResultsMessage(msg) // need to implement handleImageResultsMessage(msg) // need to implement
// case "video-results": case "video-results":
// handleVideoResultsMessage(msg) // need to implement handleVideoResultsMessage(msg) // need to implement
// case "file-results": case "file-results":
// handleFileResultsMessage(msg) // need to implement handleFileResultsMessage(msg) // need to implement
default: default:
fmt.Println("Received unknown message type:", msg.Type) fmt.Println("Received unknown message type:", msg.Type)
} }

100
video.go
View file

@ -30,6 +30,7 @@ var (
} }
disabledInstances = make(map[string]bool) disabledInstances = make(map[string]bool)
mu sync.Mutex mu sync.Mutex
videoResultsChan = make(chan []VideoResult) // Channel to receive video results from other nodes
) )
// VideoAPIResponse matches the structure of the JSON response from the Piped API // VideoAPIResponse matches the structure of the JSON response from the Piped API
@ -151,32 +152,10 @@ func makeHTMLRequest(query, safe, lang string, page int) (*VideoAPIResponse, err
func handleVideoSearch(w http.ResponseWriter, query, safe, lang string, page int) { func handleVideoSearch(w http.ResponseWriter, query, safe, lang string, page int) {
start := time.Now() start := time.Now()
apiResp, err := makeHTMLRequest(query, safe, lang, page) results := fetchVideoResults(query, safe, lang, page)
if err != nil { if len(results) == 0 {
log.Printf("Error fetching video results: %v", err) log.Printf("No results from primary search, trying other nodes")
http.Error(w, "Internal Server Error", http.StatusInternalServerError) results = tryOtherNodesForVideoSearch(query, safe, lang, page)
return
}
var results []VideoResult
for _, item := range apiResp.Items {
if item.Type == "channel" || item.Type == "playlist" {
continue
}
if item.UploadedDate == "" {
item.UploadedDate = "Now"
}
results = append(results, VideoResult{
Href: fmt.Sprintf("https://youtube.com%s", item.URL),
Title: item.Title,
Date: item.UploadedDate,
Views: formatViews(item.Views),
Creator: item.UploaderName,
Publisher: "Piped",
Image: fmt.Sprintf("/img_proxy?url=%s", url.QueryEscape(item.Thumbnail)),
Duration: formatDuration(item.Duration),
})
} }
elapsed := time.Since(start) elapsed := time.Since(start)
@ -230,3 +209,72 @@ func fetchVideoResults(query, safe, lang string, page int) []VideoResult {
} }
return results return results
} }
func tryOtherNodesForVideoSearch(query, safe, lang string, page int) []VideoResult {
for _, nodeAddr := range peers {
results, err := sendVideoSearchRequestToNode(nodeAddr, query, safe, lang, page)
if err != nil {
log.Printf("Error contacting node %s: %v", nodeAddr, err)
continue
}
if len(results) > 0 {
return results
}
}
return nil
}
func sendVideoSearchRequestToNode(nodeAddr, query, safe, lang string, page int) ([]VideoResult, error) {
searchParams := struct {
Query string `json:"query"`
Safe string `json:"safe"`
Lang string `json:"lang"`
Page int `json:"page"`
ResponseAddr string `json:"responseAddr"`
}{
Query: query,
Safe: safe,
Lang: lang,
Page: page,
ResponseAddr: fmt.Sprintf("http://localhost:%d/node", config.Port),
}
msgBytes, err := json.Marshal(searchParams)
if err != nil {
return nil, fmt.Errorf("failed to marshal search parameters: %v", err)
}
msg := Message{
ID: hostID,
Type: "search-video",
Content: string(msgBytes),
}
err = sendMessage(nodeAddr, msg)
if err != nil {
return nil, fmt.Errorf("failed to send search request to node %s: %v", nodeAddr, err)
}
// Wait for results
select {
case res := <-videoResultsChan:
return res, nil
case <-time.After(20 * time.Second):
return nil, fmt.Errorf("timeout waiting for results from node %s", nodeAddr)
}
}
func handleVideoResultsMessage(msg Message) {
var results []VideoResult
err := json.Unmarshal([]byte(msg.Content), &results)
if err != nil {
log.Printf("Error unmarshalling video results: %v", err)
return
}
log.Printf("Received video results: %+v", results)
// Send results to videoResultsChan
go func() {
videoResultsChan <- results
}()
}