Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| import sqlite3 | |
| import re | |
| from datetime import datetime | |
| import hashlib | |
| import json | |
| import math | |
| import gradio as gr | |
| import torah # Assuming you have your torah module | |
| from utils import date_to_words, custom_normalize | |
| from gematria import calculate_gematria, strip_diacritics | |
| from urllib.parse import quote_plus | |
| from gradio_calendar import Calendar | |
# --- Logging setup ---
# Configure the root logger first so every debug trace emitted by the helpers
# below is shown, then grab this module's own logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)

# --- Constants and global variables ---
# SQLite file queried for Psalm matches.
DATABASE_FILE = 'gematria.db'
# SQLite file that stores cached ELS search results.
ELS_CACHE_DB = 'els_cache.db'
# Passed as ``timeout=`` to sqlite3.connect() for the cache database.
DATABASE_TIMEOUT = 60
# Divisor of the optional "sum + sum / constant" adjustment (the UI labels it
# as "/ 137"). NOTE(review): value looks like the inverse fine-structure
# constant -- confirm the intended source.
ADJUSTMENT_CONSTANT = 137.035999177

# --- Helper functions ---
def get_query_hash(func, args, kwargs):
    """Return a SHA-256 hex digest identifying a call to ``func``.

    The digest is computed over the JSON encoding of
    ``(func.__name__, args, kwargs)``.

    Args:
        func: Callable whose ``__name__`` becomes part of the key.
        args: JSON-serializable description of the positional arguments.
        kwargs: JSON-serializable description of the keyword arguments.

    Returns:
        The hexadecimal SHA-256 digest as a string.

    Raises:
        TypeError: If ``args`` or ``kwargs`` cannot be encoded as JSON.
    """
    key = (func.__name__, args, kwargs)
    logger.debug("Generating query hash for key: %s", key)
    # sort_keys makes the digest independent of dict insertion order, so two
    # calls with equal keyword arguments always map to the same cache entry.
    # Keys built from strings/lists only (as in this file) encode unchanged.
    serialized = json.dumps(key, sort_keys=True)
    hash_value = hashlib.sha256(serialized.encode()).hexdigest()
    logger.debug("Generated hash: %s", hash_value)
    return hash_value
def create_els_cache_table():
    """Ensure the ``els_cache`` table exists in the ELS cache database.

    The statement is ``CREATE TABLE IF NOT EXISTS``, so it is safe to run on
    every call. It is deliberately NOT guarded by a file-existence check: a
    database file can exist without the table (e.g. an empty or partially
    initialised file), and skipping creation in that case would make every
    later cache query fail.

    Raises:
        sqlite3.Error: If the database cannot be opened or the table cannot
            be created.
    """
    logger.debug("Ensuring ELS cache table exists in %s", ELS_CACHE_DB)
    conn = sqlite3.connect(ELS_CACHE_DB, timeout=DATABASE_TIMEOUT)
    try:
        # ``with conn`` only commits/rolls back the transaction; the explicit
        # close() in ``finally`` is what actually releases the connection.
        with conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS els_cache (
                    query_hash TEXT PRIMARY KEY,
                    function_name TEXT,
                    args TEXT,
                    kwargs TEXT,
                    results TEXT
                )
            ''')
    finally:
        conn.close()
    logger.debug("ELS cache table is present.")
def _load_cached_results(query_hash):
    """Look up ``query_hash`` in the cache; return ``(found, results)``."""
    try:
        conn = sqlite3.connect(ELS_CACHE_DB, timeout=DATABASE_TIMEOUT)
        try:
            row = conn.execute(
                "SELECT results FROM els_cache WHERE query_hash = ?",
                (query_hash,),
            ).fetchone()
        finally:
            # The sqlite3 context manager does not close connections.
            conn.close()
    except sqlite3.Error as e:
        logger.error(f"Database error checking cache: {e}")
        return False, None
    if not row:
        return False, None
    try:
        return True, json.loads(row[0])
    except (TypeError, ValueError) as e:
        # A NULL or corrupted payload is treated as a miss; the entry is
        # overwritten when the fresh results are stored.
        logger.error(f"Unreadable cache entry for query {query_hash}: {e}")
        return False, None


def _store_cached_results(query_hash, function_name, params_json, results):
    """Write ``results`` to the cache; failures are logged, never raised."""
    try:
        results_json = json.dumps(results)
    except (TypeError, ValueError) as e:
        logger.error(f"Results are not JSON-serializable, not caching: {e}")
        return
    try:
        conn = sqlite3.connect(ELS_CACHE_DB, timeout=DATABASE_TIMEOUT)
        try:
            with conn:  # commits on success, rolls back on error
                # OR REPLACE: another writer may have stored the same hash
                # in the meantime, or a corrupt row may need overwriting.
                conn.execute(
                    "INSERT OR REPLACE INTO els_cache (query_hash, function_name, args, kwargs, results) VALUES (?, ?, ?, ?, ?)",
                    (query_hash, function_name, params_json, json.dumps({}), results_json))
        finally:
            conn.close()
    except sqlite3.Error as e:
        logger.error(f"Database error caching results: {e}")
        return
    logger.debug("Cached results in database.")


def cached_process_json_files(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` through a SQLite-backed result cache.

    The cache key is a hash of a JSON object holding the function's qualified
    name plus every argument, with positional arguments keyed by the
    parameter names read from ``func.__code__``. Caching is best effort:
    database or serialization problems are logged and the function is still
    executed.

    Args:
        func: Plain Python function (its ``__code__`` is inspected).
        *args: Positional arguments for ``func``; must be JSON-serializable.
        **kwargs: Keyword arguments for ``func``; must be JSON-serializable.

    Returns:
        The cached results on a hit (as decoded from JSON, so e.g. tuples
        come back as lists), otherwise the value returned by ``func``.

    Raises:
        TypeError: If the arguments cannot be encoded as JSON for the key.
    """
    logger.debug(f"Entering cached_process_json_files with func: {func}, args: {args}, kwargs: {kwargs}")
    params = {
        "function": f"{func.__module__}.{func.__name__}"
    }
    arg_names = func.__code__.co_varnames[:func.__code__.co_argcount]
    params.update(zip(arg_names, args))
    params.update(kwargs)
    params_json = json.dumps(params)
    logger.debug(f"Parameters as JSON: {params_json}")
    query_hash = get_query_hash(func, params_json, "")

    try:
        create_els_cache_table()
    except sqlite3.Error as e:
        # The cache is optional; fall through and compute without it.
        logger.error(f"Database error creating cache table: {e}")

    found, cached = _load_cached_results(query_hash)
    if found:
        logger.info(f"Cache hit for query: {query_hash}")
        logger.debug(f"Cached result: {cached}")
        return cached

    logger.info(f"Cache miss for query: {query_hash}")
    results = func(*args, **kwargs)
    logger.debug(f"Results from function call: {results}")
    _store_cached_results(query_hash, params["function"], params_json, results)
    logger.debug(f"Exiting cached_process_json_files, returning: {results}")
    return results
def calculate_gematria_sum(text, date_words):
    """Return the gematria value of ``text`` and ``date_words`` combined.

    Runs of digits in the combined input are removed from the text and added
    to the total as integers; the remaining text is passed through
    ``strip_diacritics`` and ``calculate_gematria``.

    Args:
        text: Free text (e.g. a name); may be empty or None.
        date_words: The date spelled out in words; may be empty or None.

    Returns:
        The sum of the text gematria and all embedded numbers, or None when
        both inputs are empty.
    """
    logger.debug("Entering calculate_gematria_sum with text: '%s', date_words: '%s'", text, date_words)
    if not (text or date_words):
        logger.debug("No input text or date words provided. Returning None.")
        return None

    # A missing part must contribute nothing; formatting None directly would
    # put the literal word "None" into the string that gets evaluated.
    combined_input = f"{text or ''} {date_words or ''}"
    logger.info("Calculating Gematria sum for input: %s", combined_input)

    numbers = re.findall(r'\d+', combined_input)
    logger.debug("Extracted numbers: %s", numbers)
    text_without_numbers = re.sub(r'\d+', '', combined_input)
    logger.debug("Text without numbers: %s", text_without_numbers)

    number_sum = sum(int(number) for number in numbers)
    logger.debug("Sum of numbers: %s", number_sum)
    text_gematria = calculate_gematria(strip_diacritics(text_without_numbers))
    logger.debug("Gematria of text: %s", text_gematria)

    total_sum = text_gematria + number_sum
    logger.debug("Calculated Gematria sum: %s", total_sum)
    logger.debug("Exiting calculate_gematria_sum, returning: %s", total_sum)
    return total_sum
def get_first_els_result_genesis(gematria_sum, tlang="en"):
    """Return the first cached ELS search result for Genesis, or None.

    The search is run via ``cached_process_json_files`` around
    ``torah.process_json_files`` with ``gematria_sum`` as the step.

    Args:
        gematria_sum: Value used as the ELS step.
        tlang: Target language code forwarded to the search.

    Returns:
        The first element of the search results, or None when the search
        yields nothing.
    """
    logger.debug(f"Entering get_first_els_result_genesis with gematria_sum: {gematria_sum}, tlang: {tlang}")
    # Positional arguments for torah.process_json_files, in call order.
    search_args = (
        1,             # first book: Genesis
        1,             # last book: Genesis
        gematria_sum,  # step
        "1,-1",        # default rounds_combination
        0,             # length
        tlang,
        True,          # strip_spaces
        True,          # strip_in_braces
        True,          # strip_diacritics_chk
    )
    els_hits = cached_process_json_files(torah.process_json_files, *search_args)

    if not els_hits:
        logger.debug("No ELS results found in Genesis.")
        logger.debug(f"Exiting get_first_els_result_genesis, returning: None")
        return None

    first_hit = els_hits[0]
    logger.debug(f"First ELS result from Genesis: {first_hit}")
    logger.debug(f"Exiting get_first_els_result_genesis, returning: {first_hit}")
    return first_hit
def find_shortest_psalm_match(gematria_sum):
    """Find the shortest Psalms entry in gematria.db with the given sum.

    Args:
        gematria_sum: Gematria value to look up in the ``results`` table.

    Returns:
        A dict with keys ``words``, ``chapter``, ``verse`` and
        ``phrase_length`` (always None), or None when no Psalms row matches.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    logger.debug("Entering find_shortest_psalm_match with gematria_sum: %s", gematria_sum)
    # Same timeout as the cache database connections, so a briefly locked
    # file is waited on instead of failing after sqlite3's short default.
    conn = sqlite3.connect(DATABASE_FILE, timeout=DATABASE_TIMEOUT)
    try:
        result = conn.execute('''
            SELECT words, chapter, verse
            FROM results
            WHERE gematria_sum = ? AND book = 'Psalms'
            ORDER BY LENGTH(words) ASC
            LIMIT 1
        ''', (gematria_sum,)).fetchone()
    finally:
        # ``with sqlite3.connect(...)`` would only manage the transaction;
        # close explicitly so the connection is not leaked on every call.
        conn.close()

    if result:
        logger.debug("Shortest Psalm match found: %s", result)
        logger.debug("Exiting find_shortest_psalm_match, returning: %s", result)
        return {"words": result[0], "chapter": result[1], "verse": result[2], "phrase_length": None}

    logger.debug("No matching Psalm found.")
    logger.debug("Exiting find_shortest_psalm_match, returning: None")
    return None
def create_biblegateway_iframe(book_name, chapter, verse):
    """Build the HTML for an iframe showing a Bible Gateway passage.

    Only the URL-encoded book name and the chapter go into the link (CJB
    version); ``verse`` is logged but is not part of the generated URL.

    Returns:
        The ``<iframe>`` element as a string.
    """
    logger.debug(f"Entering create_biblegateway_iframe with book_name: {book_name}, chapter: {chapter}, verse: {verse}")
    passage_url = (
        "https://www.biblegateway.com/passage/"
        f"?search={quote_plus(book_name)}+{chapter}&version=CJB"
    )
    iframe = f'<iframe src="{passage_url}" width="800" height="600"></iframe>'
    logger.debug(f"Generated iframe: {iframe}")
    logger.debug(f"Exiting create_biblegateway_iframe, returning: {iframe}")
    return iframe
# --- Gradio Interface ---
with gr.Blocks() as app:
    gr.Markdown("# Daily Psalm Finder")
    with gr.Row():
        name_input = gr.Textbox(label="Name (optional)")
        date_input = Calendar(label="Date", type="date")
    with gr.Row():
        adjusted_sum_check = gr.Checkbox(label="Journal Sum + (Journal Sum / 137)", value=False)
    with gr.Row():
        run_button = gr.Button("Find Psalm")
    with gr.Row():
        iframe_output = gr.HTML(label="Psalm Display")

    def main_function(name, date, adjusted_sum_check):
        """Turn a name and date into an HTML snippet showing a Psalm.

        Pipeline: spell the date in words, compute the gematria sum of name
        plus date words (optionally adjusted), run an ELS search on Genesis
        with that sum as the step, compute the gematria of the ELS text, and
        look up the shortest Psalms entry with that value.

        Args:
            name: Optional free text from the name textbox.
            date: Selected date, either a ``YYYY-MM-DD`` string or an object
                with ``strftime``; None/empty when nothing was selected.
            adjusted_sum_check: When true, the sum becomes
                ``ceil(sum + sum / ADJUSTMENT_CONSTANT)``.

        Returns:
            The iframe HTML on success, otherwise a plain message describing
            which step produced no result.
        """
        logger.debug(f"Entering main_function with name: '{name}', date: '{date}', adjusted_sum: {adjusted_sum_check}")

        # Without a selected date there is nothing to call strftime() on;
        # report it to the user instead of raising inside the callback.
        if not date:
            logger.debug("No date provided. Returning error message.")
            return "Please select a date."
        if isinstance(date, str):
            try:
                date = datetime.strptime(date, "%Y-%m-%d")
            except ValueError:
                logger.debug(f"Could not parse date string: {date}")
                return "Invalid date. Expected format: YYYY-MM-DD."
            logger.debug(f"Converted date string to datetime: {date}")

        date_str = date.strftime("%Y-%m-%d")
        logger.debug(f"Formatted date string: {date_str}")
        date_words = date_to_words(date_str)
        logger.debug(f"Date words: {date_words}")

        initial_gematria_sum = calculate_gematria_sum(name, date_words)
        logger.debug(f"Initial Gematria Sum: {initial_gematria_sum}")
        if initial_gematria_sum is None:
            logger.debug("Initial Gematria sum is None. Returning error message.")
            return "Could not calculate initial Gematria sum."

        # Adjust gematria sum if checkbox is checked; the ELS step must be an
        # integer, hence the ceil().
        if adjusted_sum_check:
            adjusted_sum = initial_gematria_sum + (initial_gematria_sum / ADJUSTMENT_CONSTANT)
            initial_gematria_sum = math.ceil(adjusted_sum)
            logger.debug(f"Adjusted Gematria Sum (using formula): {initial_gematria_sum}")

        # The (possibly adjusted) sum is the ELS step; target language is English.
        els_result = get_first_els_result_genesis(initial_gematria_sum, tlang='en')
        if not els_result:
            logger.debug("No ELS result. Returning error message.")
            return "No ELS result found in Genesis."

        # NOTE(review): assumes each ELS result is a mapping with a
        # 'result_text' entry -- confirm against torah.process_json_files.
        els_gematria_sum = calculate_gematria(els_result['result_text'])
        logger.debug(f"ELS Gematria sum: {els_gematria_sum}")

        psalm_match = find_shortest_psalm_match(els_gematria_sum)
        logger.debug(f"Psalm Match result: {psalm_match}")
        if not psalm_match:
            logger.debug("No Psalm match. Returning error message.")
            return "No matching Psalm found for the ELS result."

        logger.debug("Creating Bible Gateway iframe...")
        iframe_html = create_biblegateway_iframe("Psalms", psalm_match["chapter"], psalm_match["verse"])
        logger.debug(f"Returning iframe HTML: {iframe_html}")
        return iframe_html

    # Event listeners have to be registered inside the Blocks context.
    run_button.click(
        main_function,
        inputs=[name_input, date_input, adjusted_sum_check],
        outputs=[iframe_output]
    )

if __name__ == "__main__":
    app.launch(share=False)