|
|
|
|
|
from smolagents import tool |
|
|
import requests |
|
|
from markdownify import markdownify as md |
|
|
from bs4 import BeautifulSoup |
|
|
from common.mylogger import save_file_with_timestamp, mylog |
|
|
|
|
|
@tool
def fetch_webpage(url: str, convert_to_markdown: bool = True) -> str:
    """
    Fetches the content of a given URL.

    If markdown conversion is enabled, script and style elements are removed
    and the text content is returned as Markdown; otherwise the raw,
    unfiltered HTML is returned.

    Args:
        url (str): The URL to fetch.
        convert_to_markdown (bool): If True, convert the HTML content to Markdown format. else return the raw HTML.

    Returns:
        str: The page content (Markdown or raw HTML).

    Raises:
        requests.HTTPError: If the server responds with a 4xx/5xx status.
    """
    response = requests.get(url, timeout=30)
    # Fail fast on HTTP errors instead of converting an error page to markdown.
    response.raise_for_status()

    if convert_to_markdown:
        soup = BeautifulSoup(response.text, "html.parser")

        # Drop non-content elements before conversion.
        for tag in soup(["script", "style"]):
            tag.extract()

        content = None
        if "wikipedia.org" in url:
            # Wikipedia wraps the article body in <main id="content">;
            # restricting to it strips navigation chrome from the output.
            main_content = soup.find("main", {"id": "content"})
            if main_content:
                content = md(str(main_content), strip=['script', 'style'], heading_style="ATX").strip()
        if content is None:
            # Fallback for non-wikipedia pages, or wikipedia pages whose
            # expected <main id="content"> node is missing (previously this
            # path could leave content as None).
            content = md(response.text, strip=['script', 'style'], heading_style="ATX").strip()
    else:
        content = response.text

    # Persist a timestamped copy for debugging/auditing of agent runs.
    save_file_with_timestamp(content, "webpage", ".md" if convert_to_markdown else ".html")

    return content
|
|
|
|
|
@tool
def search_web(query: str, num_results: int = 5) -> list:
    """
    Perform a web search using local SearXNG instance.

    Args:
        query (str): The search query.
        num_results (int): The number of results to return.

    Returns:
        list: A list of search results sorted by score with {url, title, content, score} for each result.
            Empty list if the request fails.
    """
    searxng_url = "http://localhost:8888/search"
    params = {"q": query, "format": 'json'}
    # Bound the request so a hung SearXNG instance cannot block the agent
    # forever (mirrors the timeout used by fetch_webpage).
    response = requests.get(searxng_url, params=params, timeout=30)

    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        return []

    # SearXNG returns results already sorted by score; keep the top N and
    # project each result down to the fields callers rely on.
    results = response.json().get("results", [])[:num_results]
    return [
        {
            "url": result.get("url"),
            "title": result.get("title"),
            "content": result.get("content"),
            "score": result.get("score"),
        }
        for result in results
    ]
|
|
|
|
|
if __name__ == "__main__":
    # Smoke-test the search tool against the local SearXNG instance.
    try:
        query = "What is the capital of France?"
        print(search_web(query, 3))
    except Exception as e:
        print(f"An error occurred: {e}")

    # Smoke-test page fetching with markdown conversion.
    try:
        video_id = "L1vXCYZAYYM"
        video_url = "https://www.youtube.com/watch?v=" + video_id
        url = "https://en.wikipedia.org/wiki/Malko_Competition"

        page_content = fetch_webpage(url, convert_to_markdown=True)
        print(page_content.encode("utf-8"))
    except Exception as e:
        print(f"An error occurred: {e}")
|
|
|
|
|
|
|
|
|