Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import mimetypes | |
| import requests | |
| import time | |
| from yt_dlp import YoutubeDL | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.lib.styles import getSampleStyleSheet | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer | |
| from reportlab.lib.units import inch | |
| import gradio as gr | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain_openai import ChatOpenAI | |
| from openai import OpenAI, DefaultHttpxClient | |
| from langchain_chroma import Chroma | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_openai import OpenAIEmbeddings | |
| from langchain_community.document_loaders import WebBaseLoader | |
| from langchain_core.runnables import RunnableLambda | |
| from langchain_core.runnables.passthrough import RunnableAssign | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.output_parsers import StrOutputParser | |
| from langchain.output_parsers import PydanticOutputParser | |
| from langchain_core.pydantic_v1 import BaseModel, Field | |
| from typing import List | |
| from pprint import pprint | |
def download_youtube_video(youtube_url, download_path):
    """Download the audio of a YouTube video and have it converted to MP3.

    Args:
        youtube_url: URL of the video to fetch.
        download_path: Directory the audio file is written into.

    Returns:
        A ``(filename, title)`` tuple, where ``filename`` is the expected path
        of the MP3 written by the ``FFmpegExtractAudio`` post-processor.
        ``(None, None)`` is returned when anything fails; the error is printed
        rather than raised.
    """
    try:
        ydl_opts = {
            'cookiefile': "cookies.txt",
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(download_path, '%(title)s.%(ext)s'),
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
        }
        with YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(youtube_url, download=True)
            title = info_dict.get('title', None)
            source_name = ydl.prepare_filename(info_dict)
        # prepare_filename() reports the name with the *source* extension
        # (webm, m4a, opus, ...). Swap only the final extension for ".mp3" so
        # every container is handled and a title that happens to contain
        # ".webm" or ".m4a" is left intact.
        filename = os.path.splitext(source_name)[0] + '.mp3'
        return filename, title
    except Exception as e:
        # Best-effort by design: the caller skips videos that return None.
        print(f"Failed to download video from {youtube_url}: {e}")
        return None, None
def upload_file(filepath, api_key):
    """Upload a local file to MonsterAPI storage.

    The function first asks the upload endpoint for a pair of URLs (one to PUT
    the bytes to, one to read them back from) and then PUTs the file content.

    Args:
        filepath: Path of the local file to upload.
        api_key: Bearer token sent in the ``authorization`` header.

    Returns:
        The ``download_url`` reported by the API when both requests answer
        with HTTP 200, otherwise ``None`` (the failing status is printed).
    """
    url = "https://api.monsterapi.ai/v1/upload"
    headers = {
        "accept": "application/json",
        "authorization": f"Bearer {api_key}"
    }
    file_name = os.path.basename(filepath)
    # Pass the name through ``params`` so characters such as '&', '#' or '+'
    # in a video title are percent-encoded instead of corrupting the query.
    get_file_urls = requests.get(url, params={"filename": file_name}, headers=headers)
    if get_file_urls.status_code != 200:
        print(f"Failed to get upload URL: {get_file_urls.status_code}")
        return None
    response_json = get_file_urls.json()
    upload_url = response_json['upload_url']
    download_url = response_json['download_url']
    # Read inside a context manager so the handle is closed deterministically.
    with open(filepath, 'rb') as file_obj:
        data = file_obj.read()
    file_headers = {
        "Content-Type": mimetypes.guess_type(filepath)[0],
    }
    file_uploaded = requests.put(upload_url, data=data, headers=file_headers)
    if file_uploaded.status_code == 200:
        print(f"File successfully uploaded. Usable link is {download_url}")
        return download_url
    print(f"Failed to upload file: {file_uploaded.status_code}")
    return None
def generate_process_id(download_url, api_key):
    """Submit a Whisper transcription job and return its process id.

    Args:
        download_url: URL of the uploaded audio file to transcribe.
        api_key: Bearer token sent in the ``authorization`` header.

    Returns:
        The ``process_id`` field of the JSON response, or ``None`` when the
        API does not answer with HTTP 200 (the status code is printed).
    """
    endpoint = "https://api.monsterapi.ai/v1/generate/whisper"
    request_headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {api_key}"
    }
    body = {"file": f"{download_url}", "language": "en"}

    reply = requests.post(endpoint, json=body, headers=request_headers)
    if reply.status_code != 200:
        print(f"Failed to generate process ID: {reply.status_code}")
        return None

    process_id = reply.json().get("process_id")
    print(f"Process ID is: {process_id}")
    return process_id
def query_job_status(job_id, api_key, poll_interval=5, max_wait=None):
    """Poll a MonsterAPI job until it finishes and return its transcript.

    Args:
        job_id: Process id of the job to poll.
        api_key: Bearer token sent in the ``authorization`` header.
        poll_interval: Seconds to sleep between two status requests.
        max_wait: Optional upper bound, in seconds, on the total time spent
            polling. ``None`` (the default) polls until the job reports
            ``COMPLETED`` or ``FAILED``.

    Returns:
        The transcript text of a completed job. An empty string is returned
        when the job failed, the status request did not answer with HTTP 200,
        the result carried no text, or ``max_wait`` elapsed.
    """
    transcript = ""
    url = f"https://api.monsterapi.ai/v1/status/{job_id}"
    headers = {
        "accept": "application/json",
        "authorization": f"Bearer {api_key}"
    }
    deadline = None if max_wait is None else time.monotonic() + max_wait
    while True:
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            print(f"Failed to get status: {response.status_code}")
            return transcript
        body = response.json()  # parse once, reuse for status and result
        status = body.get("status")
        if status in ("COMPLETED", "FAILED"):
            print(f"Job status: {status}")
            if status == "COMPLETED":
                # A completed job without a result/text must not crash the
                # whole batch; fall back to the empty transcript instead.
                result = body.get("result") or {}
                transcript = result.get("text") or ""
            return transcript
        if deadline is not None and time.monotonic() >= deadline:
            print(f"Gave up waiting for job {job_id} after {max_wait} seconds")
            return transcript
        print(f"Job status: {status}, checking again in {poll_interval} seconds...")
        time.sleep(poll_interval)
def create_pdf(transcripts, file_path):
    """Write every (title, transcript) pair into a single PDF document.

    Args:
        transcripts: Iterable of ``(title, transcript)`` pairs.
        file_path: Destination path of the PDF.
    """
    # Local import keeps this block self-contained (standard library only).
    from xml.sax.saxutils import escape

    doc = SimpleDocTemplate(file_path, pagesize=letter)
    styles = getSampleStyleSheet()
    story = []
    for i, (title, transcript) in enumerate(transcripts, start=1):
        # Paragraph text is parsed as XML-like markup, so characters such as
        # '&' and '<' coming from titles or speech must be escaped first.
        # The <br/> tags are inserted afterwards so they stay real markup.
        safe_title = escape(str(title))
        safe_transcript = escape(transcript or "").replace('\n', '<br/>')
        story.append(Paragraph(f'YouTube Video {i} Title: {safe_title}', styles['Title']))
        story.append(Spacer(1, 12))
        story.append(Paragraph(f'YouTube Video {i} Transcript:', styles['Heading2']))
        story.append(Spacer(1, 12))
        story.append(Paragraph(safe_transcript, styles['BodyText']))
        story.append(Spacer(1, 24))
    doc.build(story)
| import gradio as gr | |
| import os | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain_openai import ChatOpenAI | |
| from openai import OpenAI, DefaultHttpxClient | |
| from langchain_chroma import Chroma | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_openai import OpenAIEmbeddings | |
| from langchain_community.document_loaders import WebBaseLoader | |
| from langchain_core.runnables import RunnableLambda | |
| from langchain_core.runnables.passthrough import RunnableAssign | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.output_parsers import StrOutputParser | |
| from langchain.output_parsers import PydanticOutputParser | |
| from langchain_core.pydantic_v1 import BaseModel, Field | |
| from typing import List | |
| from pprint import pprint | |
# SECURITY: credentials must never live in source control. The OpenAI key is
# expected to be supplied through the environment (for example as a deployment
# secret); an already configured value is no longer overwritten here.
if not os.environ.get("OPENAI_API_KEY"):
    print("Warning: OPENAI_API_KEY is not set; OpenAI embeddings will fail until it is configured.")
# Knowledge base that is threaded through the running-summary chain.
# NOTE(review): documented with comments rather than a class docstring on
# purpose -- the model is handed to PydanticOutputParser, and a docstring may
# end up in the generated format instructions; confirm before adding one.
class DocumentSummaryBase(BaseModel):
    # Summary text accumulated across chunks.
    running_summary: str = Field(
        default="",
        description="Running description of the document. Do not override; only update!",
    )
    # Key points extracted so far.
    main_ideas: List[str] = Field(
        default=[],
        description="Most important information from the document (max 3)",
    )
    # Questions still unanswered by the chunks seen so far.
    loose_ends: List[str] = Field(
        default=[],
        description="Open questions that would be good to incorporate into summary, but that are yet unknown (max 3)",
    )
def transcribe_and_save(youtube_urls, api_key=None):
    """Transcribe each YouTube video and save all transcripts into one PDF.

    For every URL the audio is downloaded, uploaded to MonsterAPI, submitted
    for transcription and polled until done. Videos that fail at any step are
    skipped. The PDF is written to ``transcripts.pdf`` in the current working
    directory.

    Args:
        youtube_urls: Iterable of YouTube video URLs.
        api_key: MonsterAPI bearer token. When omitted, the value of the
            ``MONSTER_API_KEY`` environment variable is used.

    Raises:
        RuntimeError: If no API key is passed and ``MONSTER_API_KEY`` is unset.
    """
    # SECURITY: the token is read from the environment instead of being
    # embedded in the source file.
    if api_key is None:
        api_key = os.environ.get("MONSTER_API_KEY")
    if not api_key:
        raise RuntimeError(
            "MonsterAPI key missing: pass api_key or set the MONSTER_API_KEY environment variable."
        )

    download_path = os.getcwd()
    pdf_output_path = os.path.join(download_path, "transcripts.pdf")
    transcripts = []
    for youtube_url in youtube_urls:
        filepath, title = download_youtube_video(youtube_url, download_path)
        if not (filepath and title):
            continue
        download_url = upload_file(filepath, api_key)
        if not download_url:
            continue
        process_id = generate_process_id(download_url, api_key)
        if not process_id:
            continue
        transcript = query_job_status(process_id, api_key)
        if not transcript:
            # A failed job yields an empty transcript; an empty section would
            # only add noise to the style examples.
            print(f"No transcript produced for {youtube_url}, skipping")
            continue
        transcripts.append((title, transcript))
    # Save all transcripts into a PDF file
    create_pdf(transcripts, pdf_output_path)
def RExtract(pydantic_class, llm, prompt):
    '''
    Runnable Extraction module
    Returns a knowledge dictionary populated by slot-filling extraction

    Builds the chain: add format instructions -> prompt -> llm ->
    text clean-up -> pydantic parser.

    Args:
        pydantic_class: Model class the LLM output is parsed into.
        llm: Runnable that yields the model output as a string.
        prompt: Prompt template; receives a format_instructions variable
            in addition to the incoming state.
    '''
    parser = PydanticOutputParser(pydantic_object=pydantic_class)
    instruct_merge = RunnableAssign({'format_instructions': lambda x: parser.get_format_instructions()})

    def preparse(string):
        # Add a missing opening/closing brace so the parser gets an object.
        if '{' not in string:
            string = '{' + string
        if '}' not in string:
            string = string + '}'
        # Undo backslash-escaping some models apply to '_', '[' and ']', and
        # flatten newlines. The backslashes are written as "\\" because "\]"
        # and "\[" are invalid escape sequences in a normal string literal.
        string = (string
            .replace("\\_", "_")
            .replace("\n", " ")
            .replace("\\]", "]")
            .replace("\\[", "[")
        )
        # print(string)  ## Good for diagnostics
        return string

    return instruct_merge | prompt | llm | preparse | parser
def RSummarizer(knowledge, llm, prompt, verbose=False):
    '''
    Build a runnable that folds a list of documents into a running summary.

    The returned runnable takes the documents, feeds them one at a time
    through the extraction chain (each step sees the knowledge base produced
    by the previous one) and returns the list of knowledge-base dicts, one
    per document, in processing order.
    '''
    def summarize_docs(docs):
        extractor = RExtract(type(knowledge), llm, prompt)
        step = RunnableAssign({"info_base": extractor})

        snapshots = []  # knowledge base after each document
        current = {"info_base": knowledge}
        for count, document in enumerate(docs, start=1):
            current['input'] = document.page_content
            current = step.invoke(current)
            snapshots.append(current['info_base'].dict())
            if not verbose:
                continue
            print(f"Considered {count} documents")
            pprint(current['info_base'].dict())
        return snapshots

    return RunnableLambda(summarize_docs)
def find_first_non_empty_summary(summaries):
    """Return the latest summary that carries any content.

    The list is scanned from its end; the first entry whose ``loose_ends``,
    ``main_ideas`` or ``running_summary`` is truthy is returned. ``None`` is
    returned when no entry qualifies.
    """
    fields = ('loose_ends', 'main_ideas', 'running_summary')
    candidates = (
        entry for entry in summaries[::-1]
        if any(entry[name] for name in fields)
    )
    return next(candidates, None)
def create_running_summary(url):
    """Summarize the web page at ``url`` chunk by chunk.

    The page is loaded, split into overlapping chunks and passed through the
    running summarizer driven by ``llm_1``. The last knowledge-base snapshot
    that has any content is returned as a dict, or ``None`` if none has.
    """
    template = """You are generating a running summary of the document. Make it readable by a technical user.
After this, the old knowledge base will be replaced by the new one. Make sure a reader can still understand everything.
Keep it short, but as dense and useful as possible! The information should flow from chunk to (loose ends or main ideas) to running_summary.
Strictly output a json and nothing else do not output any strings or explanations just the json is enough.
The updated knowledge base keep all of the information from running_summary here: {info_base}.
{format_instructions}. Follow the format precisely, including quotations and commas\n\n
{info_base}\nWithout losing any of the info, update the knowledge base with the following: {input}"""

    page_loader = WebBaseLoader(url)
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1200,
        chunk_overlap=100,
        separators=["\n\n", "\n", ".", ";", ",", " ", ""],
    )
    chunks = splitter.split_documents(page_loader.load())

    summary_prompt = ChatPromptTemplate.from_template(template)
    text_model = llm_1 | StrOutputParser()
    summarizer = RSummarizer(DocumentSummaryBase(), text_model, summary_prompt, verbose=True)

    return find_first_non_empty_summary(summarizer.invoke(chunks))
def setup_vectorstore():
    """Index ``transcripts.pdf`` in the Chroma store and return a retriever.

    The PDF is read from the current working directory, split on blank lines
    and added to the ``collection-1`` collection persisted under
    ``./vectorstore``.
    """
    # NOTE(review): documents are added on every call and the collection is
    # persisted, so repeated runs presumably accumulate duplicates -- confirm.
    store = Chroma(
        collection_name="collection-1",
        embedding_function=OpenAIEmbeddings(model="text-embedding-3-large"),
        persist_directory="./vectorstore",
    )

    pages = PyPDFLoader(os.getcwd()+"/transcripts.pdf").load()
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=250,
        chunk_overlap=0,
        separators=["\n\n"],
    )
    chunks = splitter.split_documents(pages)

    retriever = store.as_retriever()
    retriever.add_documents(chunks)
    return retriever
def generate(content,examples):
    """Ask ``llm_2`` for a video script about ``content`` in the examples' style.

    Args:
        content: Text the new script should explain.
        examples: Sample scripts; inserted into the prompt as-is.

    Returns:
        The model output as a string.
    """
    template = """Your are provided with a few sample youtube video scripts below.
your task is to create a similar script for the following content provided to you below.
Follow the style followd in the examples and create a similar script for the content givent to you.
Create me a script for a youtube video explaining the following content: {content}.
Here are a few example scripts of my previous videos that you have to adapt: {examples}."""
    script_prompt = ChatPromptTemplate.from_template(template)
    pipeline = script_prompt | llm_2 | StrOutputParser()
    variables = {"content": content, "examples": examples}
    return pipeline.invoke(variables)
def docs2str(docs, title="Document"):
    """Render documents as one string, one newline-terminated entry per doc.

    Each entry is the document's ``page_content`` (or ``str(doc)`` when the
    attribute is missing), prefixed with ``[Quote from <name>] `` where the
    name is the ``Title`` metadata value, falling back to ``title``. The
    prefix is omitted when the resolved name is falsy.
    """
    pieces = []
    for item in docs:
        label = getattr(item, 'metadata', {}).get('Title', title)
        body = getattr(item, 'page_content', str(item))
        prefix = f"[Quote from {label}] " if label else ""
        pieces.append(prefix)
        pieces.append(body + "\n")
    return "".join(pieces)
def _build_monster_chat_model(model):
    """Create a ChatOpenAI client for ``model`` served by MonsterAPI.

    The bearer token is read from the ``MONSTER_API_KEY`` environment variable
    so that no credential is stored in the source file.

    Raises:
        RuntimeError: If ``MONSTER_API_KEY`` is unset or empty. Failing early
            avoids building a client without an explicit key for this
            third-party endpoint.
    """
    api_key = os.environ.get("MONSTER_API_KEY")
    if not api_key:
        raise RuntimeError("MONSTER_API_KEY environment variable is not set.")
    return ChatOpenAI(
        model=model,
        temperature=0,
        max_tokens=None,
        timeout=None,
        max_retries=2,
        api_key=api_key,
        base_url="https://llm.monsterapi.ai/v1/",
        # NOTE(security): verify=False switches off TLS certificate checks, so
        # the token travels over an unauthenticated connection. Kept to
        # preserve current behaviour; remove once the endpoint's certificate
        # is confirmed to validate.
        http_client=DefaultHttpxClient(verify=False),
    )


# Model used for the running summary of the context page.
llm_1 = _build_monster_chat_model("google/gemma-2-9b-it")
# Model used to write the final script.
llm_2 = _build_monster_chat_model("meta-llama/Meta-Llama-3.1-8B-Instruct")
def process_links(style_links, context_link):
    """Gradio handler: write a script about a page in the style of given videos.

    Args:
        style_links: Comma-separated YouTube URLs whose transcripts serve as
            style examples.
        context_link: URL of the page the new script should explain.

    Returns:
        The generated script text.

    Raises:
        gr.Error: If no style link or no context link is given, or if the
            context page could not be summarized.
    """
    # Drop blanks produced by empty input, doubled or trailing commas so no
    # download is attempted for an empty URL.
    links = [link.strip() for link in (style_links or "").split(",")]
    links = [link for link in links if link]
    if not links:
        raise gr.Error("Please enter at least one style link.")
    context_link = (context_link or "").strip()
    if not context_link:
        raise gr.Error("Please enter a context link.")

    transcribe_and_save(links)
    retriever = setup_vectorstore()
    summary = create_running_summary(context_link)
    if summary is None:
        # No chunk produced a usable knowledge base; report that instead of
        # failing on the subscript below.
        raise gr.Error("Could not summarize the context link.")
    summary = summary['running_summary']
    print("Summarized the url successfully:", summary)
    examples = retriever.invoke(summary)
    return generate(summary,examples)
# Define the Gradio interface: two inputs (style links, context link), one
# output box, and a button that runs process_links on click.
with gr.Blocks() as demo:
    gr.Markdown("## Link Processor")
    style_links = gr.Textbox(lines=5, placeholder="Enter style links separated by commas", label="Style Links")
    context_link = gr.Textbox(lines=1, placeholder="Enter context link", label="Context Link")
    output = gr.Textbox(lines=2, label="Output")
    process_button = gr.Button("Process")
    process_button.click(process_links, inputs=[style_links, context_link], outputs=output)

# Start the server only when the file is executed as a script, so importing
# the module (tests, reload tooling) does not launch a web server.
if __name__ == "__main__":
    demo.launch()