Spaces:
Runtime error
Runtime error
| from agentpro.tools import Tool | |
| from typing import Any, List, Tuple, Optional | |
| import datetime | |
| import dateutil.parser | |
| import re | |
| class ModifyEventTool(Tool): | |
| """ | |
| Tool to change the date, time, and/or duration of an existing Google Calendar event. | |
| Accepts natural‐language commands such as: | |
| • “Shift the first meeting on Saturday to Monday” | |
| • “Shift the first meeting on Saturday to 5 AM” | |
| • “Reschedule all my meetings from Saturday to Sunday.” | |
| • “Reschedule the second meeting from Monday to Wednesday at 10 AM.” | |
| • “Shift the first meeting on 7th June to 9th June.” | |
| • “Change my first meeting on June 10, 2025, to June 12, 2025, at 2 PM.” | |
| • “Reschedule all of my tomorrow meetings to Sunday.” | |
| • “Move my ‘Team Sync’ meeting today from 3 PM to 4 PM.” | |
| • “Change ‘Project Discussion’ meeting tomorrow to 2:30 PM.” | |
| • “Reschedule my third appointment tomorrow to Friday at 11 AM.” | |
| """ | |
| # ── 1) Class attributes ───────────────────────────────────────────────────────────────── | |
| name: str = "Modify Event" | |
| description: str = ( | |
| "Change the date, time, and/or duration of an existing Google Calendar event. " | |
| "Supports natural‐language like 'Shift the first meeting on Saturday to 5 AM.'" | |
| ) | |
| action_type: str = "modify_event" | |
| input_format: str = ( | |
| "Natural‐language command describing which event(s) to modify and how.\n" | |
| "Examples:\n" | |
| " • “Shift the first meeting on Saturday to Monday.”\n" | |
| " • “Shift the first meeting on Saturday to 5 AM.”\n" | |
| " • “Shift the first meeting on 7th June to 9th June.”\n" | |
| " • “Reschedule all of my tomorrow meetings to Sunday.”\n" | |
| " • “Move my ‘Team Sync’ meeting today from 3 PM to 4 PM.”\n" | |
| " • “Change ‘Project Discussion’ meeting tomorrow to 2:30 PM.”\n" | |
| " • “Reschedule the second meeting from Monday to Wednesday at 10 AM.”\n" | |
| " • “Change my first meeting on June 10, 2025, to June 12, 2025, at 2 PM.”\n" | |
| " • “Reschedule my third appointment tomorrow to Friday at 11 AM.”\n" | |
| ) | |
| # ── 2) We expect a Google Calendar “service” to be passed in at instantiation ─────────── | |
| service: Any | |
| # ── 3) Main entry point ───────────────────────────────────────────────────────────────── | |
| def run(self, input_text: Any) -> str: | |
| """ | |
| Parse a natural‐language modification command, identify target event(s), and update them. | |
| Returns a human‐readable confirmation or an error message if something goes wrong. | |
| """ | |
| text = str(input_text).strip() | |
| # 1) Split into “source” (which event(s) to modify) vs “target” (new date/time) | |
| source_part, target_part = self._split_source_target(text) | |
| if source_part is None or target_part is None: | |
| return ( | |
| "Sorry, I couldn't identify which part of your command specifies the modification. " | |
| "Please use a format like “Shift the first meeting on Saturday to 5 AM.”" | |
| ) | |
| # 2) From source_part, extract ordinal (“first”, “second”, “last”, “all”), title (if any), | |
| # and source_date_spec (weekday/explicit date/today/tomorrow). | |
| ordinal = self._extract_ordinal(source_part) | |
| title = self._extract_title(source_part) | |
| source_date_spec = self._extract_date_spec(source_part) | |
| if source_date_spec is None: | |
| return ( | |
| "Sorry, I couldn't determine which day or date you meant. " | |
| "Please specify a weekday (e.g., 'Monday'), 'today', 'tomorrow', or an explicit date (e.g., 'June 10, 2025')." | |
| ) | |
| # 3) Resolve source_date_spec → a concrete `datetime.date` | |
| source_date = self._resolve_date(source_date_spec) | |
| if source_date is None: | |
| return f"Sorry, I couldn’t parse the date '{source_date_spec}'." | |
| # 4) Fetch all non‐all‐day events on that source_date | |
| events = self._fetch_events_on_date(source_date) | |
| if not events: | |
| return f"You have no non‐all‐day events on {source_date.strftime('%B %d, %Y')}." | |
| # 5) Select which events to modify based on ordinal/title | |
| target_events = self._select_target_events(events, ordinal, title) | |
| if isinstance(target_events, str): | |
| # an error message string | |
| return target_events | |
| if not target_events: | |
| return f"No events found matching that specification on {source_date.strftime('%B %d, %Y')}." | |
| # 6) Parse the “target” spec to determine new_date_spec and new_time_spec | |
| new_date_spec, new_time_spec = self._parse_target_part(target_part) | |
| # 7) Resolve new_date_spec → `datetime.date` (if given). If omitted, keep original date. | |
| new_date: Optional[datetime.date] = None | |
| if new_date_spec: | |
| new_date = self._resolve_date(new_date_spec) | |
| if new_date is None: | |
| return f"Sorry, I couldn’t parse the target date '{new_date_spec}'." | |
| # 8) Resolve new_time_spec → either (new_start_time, new_end_time) or (new_start_time, None). | |
| new_start_time: Optional[datetime.time] = None | |
| new_end_time: Optional[datetime.time] = None | |
| if new_time_spec: | |
| parsed = self._resolve_time_spec(new_time_spec) | |
| if parsed is None: | |
| return ( | |
| f"Sorry, I couldn’t parse the target time '{new_time_spec}'. " | |
| f"Please specify like 'at 2 PM', 'to 4 PM', or 'from 3 PM to 5 PM'." | |
| ) | |
| new_start_time, new_end_time = parsed | |
| # 9) For each selected event, compute new start/end datetimes (preserving duration logic) | |
| updates: List[Tuple[str, datetime.datetime, datetime.datetime, datetime.datetime, datetime.datetime]] = [] | |
| # Each tuple: (event_summary, old_start_dt, old_end_dt, new_start_dt, new_end_dt) | |
| for ev in target_events: | |
| old_start_dt = datetime.datetime.fromisoformat( | |
| ev["start"]["dateTime"].replace("Z", "+00:00") | |
| ) | |
| old_end_dt = datetime.datetime.fromisoformat( | |
| ev["end"]["dateTime"].replace("Z", "+00:00") | |
| ) | |
| original_duration = old_end_dt - old_start_dt | |
| # Determine which date to apply: either new_date or old_start_dt.date() | |
| apply_date = new_date if new_date else old_start_dt.date() | |
| # Keep the same tzinfo as the original event | |
| tzinfo = old_start_dt.tzinfo | |
| if new_start_time: | |
| # Case A: New time is provided (could be start-only or start+end) | |
| new_start_dt = datetime.datetime.combine(apply_date, new_start_time, tzinfo=tzinfo) | |
| if new_end_time: | |
| new_end_dt = datetime.datetime.combine(apply_date, new_end_time, tzinfo=tzinfo) | |
| else: | |
| # Single new_start: preserve original duration | |
| new_end_dt = new_start_dt + original_duration | |
| else: | |
| # Case B: No new time provided → keep original start/end times but shift date if needed | |
| original_start_time = old_start_dt.time() | |
| original_end_time = old_end_dt.time() | |
| new_start_dt = datetime.datetime.combine(apply_date, original_start_time, tzinfo=tzinfo) | |
| new_end_dt = datetime.datetime.combine(apply_date, original_end_time, tzinfo=tzinfo) | |
| # 10) Update the event in Google Calendar | |
| _ = self._update_event( | |
| event_id=ev["id"], | |
| new_start_iso=new_start_dt.isoformat(), | |
| new_end_iso=new_end_dt.isoformat(), | |
| ) | |
| updates.append((ev.get("summary", "(no title)"), old_start_dt, old_end_dt, new_start_dt, new_end_dt)) | |
| # 11) Return a confirmation message | |
| return self._format_confirmation(updates) | |
| # ──────────────────────────────────────────────────────────────────────────────────────────── | |
| def _split_source_target(self, text: str) -> Tuple[Optional[str], Optional[str]]: | |
| """ | |
| Naively split the input_text into source_part (which events) vs target_part (new date/time). | |
| We look for the first occurrence of ' to ' that is not part of a 'from X to Y' time range. | |
| """ | |
| lowered = text.lower() | |
| # If there's a "from X to Y" time‐range, skip that " to " and find the next " to " | |
| time_range_match = re.search( | |
| r"\bfrom\s+\d{1,2}(:\d{2})?\s*(am|pm)?\s+to\s+\d{1,2}(:\d{2})?\s*(am|pm)?", | |
| lowered | |
| ) | |
| if time_range_match: | |
| span_start, span_end = time_range_match.span() | |
| next_to = lowered.find(" to ", span_end) | |
| if next_to != -1: | |
| before = text[:next_to] | |
| after = text[next_to + len(" to "):] | |
| return before.strip(), after.strip() | |
| # Fallback: split on the first ' to ' | |
| parts = re.split(r"\s+to\s+", text, maxsplit=1) | |
| if len(parts) == 2: | |
| return parts[0].strip(), parts[1].strip() | |
| return None, None | |
| def _extract_ordinal(self, text: str) -> Optional[str]: | |
| """ | |
| Extract ordinal keyword (first, second, third, ..., last, all). | |
| Returns the matched keyword (lowercased) or None if none found. | |
| """ | |
| match = re.search( | |
| r"\b(first|second|third|fourth|fifth|sixth|seventh|eighth|ninth|tenth|last|all)\b", | |
| text, | |
| re.IGNORECASE | |
| ) | |
| return match.group(1).lower() if match else None | |
| def _extract_title(self, text: str) -> Optional[str]: | |
| """ | |
| If the user specified an event title in quotes (single or double), | |
| return that substring (without quotes). Otherwise, None. | |
| """ | |
| match = re.search(r"[‘'“\"]([^‘'”\"]+)[’'”\"]", text) | |
| if match: | |
| return match.group(1).strip() | |
| return None | |
| def _extract_date_spec(self, text: str) -> Optional[str]: | |
| """ | |
| Return the substring that indicates a source date: 'today', 'tomorrow', | |
| a weekday name, or an explicit date. Otherwise, None. | |
| """ | |
| lowered = text.lower() | |
| if "today" in lowered: | |
| return "today" | |
| if "tomorrow" in lowered: | |
| return "tomorrow" | |
| # Weekday names | |
| weekdays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"] | |
| for wd in weekdays: | |
| if re.search(rf"\b{wd}\b", lowered): | |
| return wd | |
| # Check if there's an explicit date phrase: must contain a month name or ISO format | |
| if re.search(r"\b(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\b", lowered) \ | |
| or re.search(r"\b\d{4}-\d{2}-\d{2}\b", text) \ | |
| or re.search(r"\b\d{1,2}(st|nd|rd|th)?\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\b", lowered): | |
| # We'll hand the full text to dateutil.parse later | |
| return text | |
| return None | |
| def _resolve_date(self, spec: str) -> Optional[datetime.date]: | |
| """ | |
| Convert specifications like 'today', 'tomorrow', 'Saturday', | |
| '7th June', 'June 10, 2025' into a datetime.date object. | |
| """ | |
| spec_lower = spec.strip().lower() | |
| today = datetime.date.today() | |
| if spec_lower == "today": | |
| return today | |
| if spec_lower == "tomorrow": | |
| return today + datetime.timedelta(days=1) | |
| # Weekday resolution: find next occurrence (this week or next) | |
| weekdays_map = { | |
| "monday": 0, | |
| "tuesday": 1, | |
| "wednesday": 2, | |
| "thursday": 3, | |
| "friday": 4, | |
| "saturday": 5, | |
| "sunday": 6, | |
| } | |
| if spec_lower in weekdays_map: | |
| target_wd = weekdays_map[spec_lower] | |
| today_wd = today.weekday() # Monday=0 ... Sunday=6 | |
| if target_wd >= today_wd: | |
| delta = target_wd - today_wd | |
| else: | |
| delta = 7 - (today_wd - target_wd) | |
| return today + datetime.timedelta(days=delta) | |
| # Try explicit date parsing (assume current year if omitted) | |
| try: | |
| parsed = dateutil.parser.parse( | |
| spec, | |
| fuzzy=True, | |
| default=datetime.datetime(today.year, 1, 1), | |
| ) | |
| return parsed.date() | |
| except (ValueError, OverflowError): | |
| return None | |
| def _fetch_events_on_date(self, date_obj: datetime.date) -> List[dict]: | |
| """ | |
| Fetch all non‐all‐day events on the provided date (UTC midnight to next midnight). | |
| Returns a list of event dicts (as returned by Google Calendar API). | |
| """ | |
| start_of_day = datetime.datetime.combine(date_obj, datetime.time.min).isoformat() + "Z" | |
| end_of_day = (datetime.datetime.combine(date_obj, datetime.time.min) | |
| + datetime.timedelta(days=1)).isoformat() + "Z" | |
| events_res = ( | |
| self.service.events() | |
| .list( | |
| calendarId="primary", | |
| timeMin=start_of_day, | |
| timeMax=end_of_day, | |
| singleEvents=True, | |
| orderBy="startTime" | |
| ) | |
| .execute() | |
| ) | |
| items = events_res.get("items", []) | |
| # Filter out all‐day events (those have 'start.date' instead of 'start.dateTime') | |
| non_all_day = [ev for ev in items if ev.get("start", {}).get("dateTime")] | |
| return non_all_day | |
| def _select_target_events( | |
| self, | |
| events: List[dict], | |
| ordinal: Optional[str], | |
| title: Optional[str] | |
| ) -> Any: | |
| """ | |
| Given a list of events on the same date, choose which to modify based on: | |
| - If title is provided: filter by case‐insensitive substring match. | |
| • If exactly one match → return [that_event]. | |
| • If multiple: | |
| - If ordinal == 'first' or 'last' → pick earliest or latest among those matches. | |
| - Else → return an error string prompting clarification. | |
| • If no match → return []. | |
| - If no title but ordinal provided: | |
| - Sort all events by start time. | |
| - 'first' → [earliest], 'second' → [second-earliest], 'last' → [latest], 'all' → all. | |
| - If the specified ordinal index is out of range → return []. | |
| - If neither title nor ordinal → return an error string asking for clarification. | |
| """ | |
| # Title-based selection | |
| if title: | |
| matches = [ | |
| ev for ev in events | |
| if title.lower() in (ev.get("summary", "") or "").lower() | |
| ] | |
| if not matches: | |
| return [] | |
| if len(matches) == 1: | |
| return matches | |
| # Multiple matches and ordinal present? | |
| if ordinal in ("first", "last"): | |
| sorted_by_time = sorted( | |
| matches, | |
| key=lambda ev: datetime.datetime.fromisoformat( | |
| ev["start"]["dateTime"].replace("Z", "+00:00") | |
| ) | |
| ) | |
| return [sorted_by_time[0]] if ordinal == "first" else [sorted_by_time[-1]] | |
| return ( | |
| f"Multiple events match '{title}'. Which one did you mean? " | |
| f"You can say 'first {title}' or 'last {title}'." | |
| ) | |
| # No title; rely on ordinal | |
| if ordinal: | |
| sorted_all = sorted( | |
| events, | |
| key=lambda ev: datetime.datetime.fromisoformat( | |
| ev["start"]["dateTime"].replace("Z", "+00:00") | |
| ) | |
| ) | |
| if ordinal == "all": | |
| return sorted_all | |
| if ordinal == "first": | |
| return [sorted_all[0]] if sorted_all else [] | |
| if ordinal == "last": | |
| return [sorted_all[-1]] if sorted_all else [] | |
| ord_map = { | |
| "second": 1, | |
| "third": 2, | |
| "fourth": 3, | |
| "fifth": 4, | |
| "sixth": 5, | |
| "seventh": 6, | |
| "eighth": 7, | |
| "ninth": 8, | |
| "tenth": 9 | |
| } | |
| if ordinal in ord_map: | |
| idx = ord_map[ordinal] | |
| return [sorted_all[idx]] if idx < len(sorted_all) else [] | |
| return [] | |
| # Neither title nor ordinal → ambiguous | |
| return ( | |
| "Please specify which event(s) to modify (e.g., 'first meeting', " | |
| "'last appointment', 'all meetings', or include the title in quotes)." | |
| ) | |
| def _parse_target_part(self, text: str) -> Tuple[Optional[str], Optional[str]]: | |
| """ | |
| Given the target_part (everything after 'to'), determine: | |
| - new_date_spec (like 'Monday', 'June 12, 2025', 'Friday') OR None if no date. | |
| - new_time_spec (like '5 AM', '2:30 PM', '3 PM to 4 PM') OR None if no time. | |
| Strategy: | |
| 1) Look explicitly for date keywords first: | |
| • 'today' or 'tomorrow' | |
| • weekday names | |
| • explicit date phrases containing a month name or ISO format (YYYY-MM-DD) | |
| 2) Only if one of those appears do we set new_date_spec. Otherwise, new_date_spec stays None. | |
| 3) Independently, look for time‐range patterns or single times ("at 5 PM", "5 AM", etc.). | |
| 4) Return (date_spec, time_spec). If neither is found, return (None, None). | |
| """ | |
| lowered = text.lower() | |
| # 1) Identify new_date_spec (only if a date keyword is present) | |
| new_date_spec: Optional[str] = None | |
| if "today" in lowered: | |
| new_date_spec = "today" | |
| elif "tomorrow" in lowered: | |
| new_date_spec = "tomorrow" | |
| else: | |
| # Weekday names | |
| weekdays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"] | |
| for wd in weekdays: | |
| if re.search(rf"\b{wd}\b", lowered): | |
| new_date_spec = wd | |
| break | |
| # If still None, check for explicit date phrase: must contain a month name or ISO format | |
| if new_date_spec is None: | |
| if re.search(r"\b(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\b", lowered) \ | |
| or re.search(r"\b\d{4}-\d{2}-\d{2}\b", text) \ | |
| or re.search(r"\b\d{1,2}(st|nd|rd|th)?\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\b", lowered): | |
| new_date_spec = text | |
| # 2) Identify new_time_spec (if any) | |
| # Look for 'from X to Y' or single time patterns like 'at X', 'X AM/PM', etc. | |
| time_pattern = re.compile( | |
| r"(?P<from>from\s+)?" | |
| r"(?P<h1>\d{1,2}(:\d{2})?\s*(am|pm))" | |
| r"\s*(to\s*(?P<h2>\d{1,2}(:\d{2})?\s*(am|pm)))?", | |
| re.IGNORECASE | |
| ) | |
| time_match = time_pattern.search(text) | |
| new_time_spec: Optional[str] = time_match.group(0) if time_match else None | |
| return new_date_spec, new_time_spec | |
| def _resolve_time_spec(self, spec: str) -> Optional[Tuple[datetime.time, Optional[datetime.time]]]: | |
| """ | |
| Convert strings like '3 PM to 4 PM', 'at 2:30 PM', '2 PM', 'from 3 PM to 5 PM' | |
| into (new_start_time, new_end_time) where new_end_time may be None (meaning preserve original duration). | |
| """ | |
| spec = spec.strip().lower() | |
| # Find all time tokens in the spec | |
| times = re.findall(r"(\d{1,2}(:\d{2})?\s*(am|pm))", spec, re.IGNORECASE) | |
| parsed_times: List[datetime.time] = [] | |
| for ttuple in times: | |
| tstr = ttuple[0] | |
| try: | |
| dt = dateutil.parser.parse(tstr) | |
| parsed_times.append(dt.time()) | |
| except (ValueError, OverflowError): | |
| continue | |
| if not parsed_times: | |
| return None | |
| if len(parsed_times) == 1: | |
| # Single new time: assume new start; preserve duration later | |
| return parsed_times[0], None | |
| # Two times: first is new_start, second is new_end | |
| return parsed_times[0], parsed_times[1] | |
| def _update_event(self, event_id: str, new_start_iso: str, new_end_iso: str) -> dict: | |
| """ | |
| Call Google Calendar API to patch the event's start and end times. | |
| Returns the patched event resource. | |
| """ | |
| updated = ( | |
| self.service.events() | |
| .patch( | |
| calendarId="primary", | |
| eventId=event_id, | |
| body={ | |
| "start": {"dateTime": new_start_iso}, | |
| "end": {"dateTime": new_end_iso}, | |
| } | |
| ) | |
| .execute() | |
| ) | |
| return updated | |
| def _format_confirmation( | |
| self, | |
| updates: List[Tuple[str, datetime.datetime, datetime.datetime, datetime.datetime, datetime.datetime]] | |
| ) -> str: | |
| """ | |
| Given a list of tuples: | |
| (event_summary, old_start_dt, old_end_dt, new_start_dt, new_end_dt) | |
| produce a combined, human‐readable confirmation string. | |
| """ | |
| lines: List[str] = [] | |
| for summary, old_start, old_end, new_start, new_end in updates: | |
| old_fmt = ( | |
| f"{old_start.strftime('%B %d, %Y %I:%M %p').lstrip('0')}–" | |
| f"{old_end.strftime('%I:%M %p').lstrip('0')}" | |
| ) | |
| new_fmt = ( | |
| f"{new_start.strftime('%B %d, %Y %I:%M %p').lstrip('0')}–" | |
| f"{new_end.strftime('%I:%M %p').lstrip('0')}" | |
| ) | |
| lines.append( | |
| f"The meeting \"{summary}\" originally scheduled for {old_fmt} has been rescheduled to {new_fmt}." | |
| ) | |
| return " ".join(lines) |