forked from enne2/qbit-agent
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
215 lines
9.4 KiB
215 lines
9.4 KiB
import os |
|
from langchain.tools.base import BaseTool |
|
from langchain.callbacks.manager import CallbackManagerForToolRun |
|
import requests |
|
from typing import Optional |
|
from langchain_community.tools import DuckDuckGoSearchRun |
|
|
|
class QbitDownloadListTool(BaseTool): |
|
name: str = "get_downloads_list" |
|
description: str = '''Useful for getting a list of current downloads from the qBittorrent API and |
|
information about them. The response will include the name, size, and status of each download. |
|
''' |
|
|
|
def _run(self, query: str = "", run_manager: Optional[CallbackManagerForToolRun] = None) -> str: |
|
"""Get the list of downloads from qBittorrent API.""" |
|
try: |
|
# Configuration for qBittorrent API |
|
QBIT_HOST = os.environ.get("QBIT_HOST", "http://localhost:8080") |
|
QBIT_USERNAME = os.environ.get("QBIT_USERNAME", "admin") |
|
QBIT_PASSWORD = os.environ.get("QBIT_PASSWORD", "adminadmin") |
|
|
|
# First login to get the auth cookie |
|
login_url = f"{QBIT_HOST}/api/v2/auth/login" |
|
login_data = {"username": QBIT_USERNAME, "password": QBIT_PASSWORD} |
|
|
|
session = requests.Session() |
|
login_response = session.post(login_url, data=login_data) |
|
|
|
if login_response.status_code != 200: |
|
return f"Failed to login to qBittorrent API: {login_response.text}" |
|
|
|
# Get the list of torrents |
|
torrents_url = f"{QBIT_HOST}/api/v2/torrents/info" |
|
response = session.get(torrents_url) |
|
|
|
if response.status_code != 200: |
|
return f"Failed to fetch downloads: {response.text}" |
|
|
|
torrents = response.json() |
|
|
|
# Format the response |
|
if not torrents: |
|
return "No active downloads found." |
|
|
|
result = "Current downloads:\n" |
|
for i, torrent in enumerate(torrents, 1): |
|
result += f"{i}:" |
|
for key, value in torrent.items(): |
|
if key in ["name", "progress", "size", "state", "eta"]: |
|
result += f" {key}: {value}," |
|
result += "\n" |
|
result += "\n" |
|
result += "Total downloads: " + str(len(torrents)) |
|
|
|
return result |
|
|
|
except Exception as e: |
|
return f"Error getting downloads list: {str(e)}" |
|
|
|
class QbitSearchTool(BaseTool): |
|
name: str = "qbittorrent_search" |
|
description: str = '''Useful for searching torrents using qBittorrent's search functionality. |
|
Input should be a search query for content the user wants to find. |
|
The tool will return a list of matching torrents ordered by the number of seeders (highest first). |
|
''' |
|
|
|
def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None) -> str: |
|
"""Search for torrents using qBittorrent's search functionality.""" |
|
try: |
|
# Configuration for qBittorrent API |
|
QBIT_HOST = os.environ.get("QBIT_HOST", "http://localhost:8080") |
|
QBIT_USERNAME = os.environ.get("QBIT_USERNAME", "admin") |
|
QBIT_PASSWORD = os.environ.get("QBIT_PASSWORD", "adminadmin") |
|
|
|
# First login to get the auth cookie |
|
login_url = f"{QBIT_HOST}/api/v2/auth/login" |
|
login_data = {"username": QBIT_USERNAME, "password": QBIT_PASSWORD} |
|
|
|
session = requests.Session() |
|
login_response = session.post(login_url, data=login_data) |
|
|
|
if login_response.status_code != 200: |
|
return f"Failed to login to qBittorrent API: {login_response.text}" |
|
|
|
# Start a search |
|
start_search_url = f"{QBIT_HOST}/api/v2/search/start" |
|
search_data = {"pattern": query, "plugins": "all", "category": "all", "limit": 5, "sort": "seeders", "order": "desc"} |
|
|
|
search_response = session.post(start_search_url, data=search_data) |
|
|
|
if search_response.status_code != 200: |
|
return f"Failed to start search: {search_response.text}" |
|
|
|
search_id = search_response.json().get("id") |
|
|
|
if not search_id: |
|
return "Failed to get search ID" |
|
|
|
# Wait for results (simple implementation, can be improved) |
|
import time |
|
max_wait = 5 # seconds |
|
wait_time = 0 |
|
step = 1 |
|
|
|
while wait_time < max_wait: |
|
time.sleep(step) |
|
wait_time += step |
|
|
|
# Get search status |
|
status_url = f"{QBIT_HOST}/api/v2/search/status" |
|
status_params = {"id": search_id} |
|
status_response = session.get(status_url, params=status_params) |
|
|
|
if status_response.status_code != 200: |
|
return f"Failed to get search status: {status_response.text}" |
|
|
|
status_data = status_response.json() |
|
if status_data[0].get("status") == "Stopped": |
|
break |
|
|
|
# Get search results |
|
results_url = f"{QBIT_HOST}/api/v2/search/results" |
|
results_params = {"id": search_id, "limit": 5} # Increased limit to find more seeders |
|
|
|
results_response = session.get(results_url, params=results_params) |
|
|
|
if results_response.status_code != 200: |
|
return f"Failed to get search results: {results_response.text}" |
|
|
|
results_data = results_response.json() |
|
results = results_data.get("results", []) |
|
|
|
# Stop the search |
|
stop_url = f"{QBIT_HOST}/api/v2/search/stop" |
|
stop_params = {"id": search_id} |
|
session.post(stop_url, params=stop_params) |
|
|
|
print(results) |
|
# Limit to top 10 results after sorting |
|
results = results[:10] |
|
|
|
# Format the response |
|
if not results: |
|
return f"No results found for '{query}'." |
|
|
|
response = f"Search results for '{query}' (sorted by seeders):\n\n" |
|
|
|
for i, result in enumerate(results, 1): |
|
name = result.get("fileName", "Unknown") |
|
size = result.get("fileSize", "Unknown") |
|
seeds = result.get("nbSeeders", 0) |
|
leech = result.get("nbLeechers", 0) |
|
magnet = result.get("fileUrl", "") |
|
|
|
# Convert size to human-readable format |
|
if isinstance(size, (int, float)): |
|
units = ["B", "KB", "MB", "GB", "TB"] |
|
size_index = 0 |
|
while size >= 1024 and size_index < len(units) - 1: |
|
size /= 1024 |
|
size_index += 1 |
|
size = f"{size:.2f} {units[size_index]}" |
|
|
|
response += f"{i}. {name}\n" |
|
response += f" Size: {size}, Seeds: {seeds}, Leechers: {leech}\n" |
|
if magnet: |
|
response += f" Magnet: {magnet}\n" |
|
|
|
return response |
|
|
|
except Exception as e: |
|
return f"Error searching torrents: {str(e)}" |
|
|
|
class QbitDownloadTorrentTool(BaseTool): |
|
name: str = "download_torrent" |
|
description: str = '''Useful for starting a new torrent download in qBittorrent. |
|
Input should be a magnet link or a torrent URL that the user wants to download. |
|
The tool will add the torrent to qBittorrent's download queue and return status. |
|
''' |
|
|
|
def _run(self, torrent_url: str, run_manager: Optional[CallbackManagerForToolRun] = None) -> str: |
|
"""Start downloading a torrent by adding it to qBittorrent.""" |
|
try: |
|
# Check if the input is a valid torrent URL or magnet link |
|
if not (torrent_url.startswith("http") or torrent_url.startswith("magnet:")): |
|
return "Error: Please provide a valid torrent URL or magnet link." |
|
|
|
# Configuration for qBittorrent API |
|
QBIT_HOST = os.environ.get("QBIT_HOST", "http://localhost:8080") |
|
QBIT_USERNAME = os.environ.get("QBIT_USERNAME", "admin") |
|
QBIT_PASSWORD = os.environ.get("QBIT_PASSWORD", "adminadmin") |
|
|
|
# First login to get the auth cookie |
|
login_url = f"{QBIT_HOST}/api/v2/auth/login" |
|
login_data = {"username": QBIT_USERNAME, "password": QBIT_PASSWORD} |
|
|
|
session = requests.Session() |
|
login_response = session.post(login_url, data=login_data) |
|
|
|
if login_response.status_code != 200: |
|
return f"Failed to login to qBittorrent API: {login_response.text}" |
|
|
|
# Add torrent to download queue |
|
add_url = f"{QBIT_HOST}/api/v2/torrents/add" |
|
add_data = {"urls": torrent_url} |
|
|
|
add_response = session.post(add_url, data=add_data) |
|
|
|
if add_response.status_code != 200: |
|
return f"Failed to add torrent: {add_response.text}" |
|
|
|
return f"Torrent has been added to download queue successfully. Check downloads list for status." |
|
|
|
except Exception as e: |
|
return f"Error adding torrent: {str(e)}"
|
|
|