Skip to content

tux.cogs.services.reminders

Classes:

Name Description
ScheduledReminder

A scheduled reminder entry for the priority queue.

ReminderService

Classes

ScheduledReminder

Bases: NamedTuple

A scheduled reminder entry for the priority queue.

ReminderService(bot: Tux)

Bases: Cog

Methods:

Name Description
cog_unload

Clean up resources when the cog is unloaded.

reminder_processor

Process reminders from the priority queue.

before_reminder_processor

Wait until the bot is ready.

on_reminder_processor_error

Handles errors in the reminder processor loop.

schedule_reminder

Add a reminder to the priority queue.

Source code in tux/cogs/services/reminders.py
Python
def __init__(self, bot: Tux) -> None:
    self.bot = bot
    self.db = DatabaseController()
    self._initialized = False
    # Use a heap as a priority queue for reminders
    self._reminder_queue: list[ScheduledReminder] = []
    self._queue_lock = asyncio.Lock()
    # Store task references to prevent garbage collection
    self._reminder_tasks: set[asyncio.Task[Any]] = set()
    self.reminder_processor.start()

Functions

_create_reminder_task(coro: Coroutine[Any, Any, Any], name: str) -> None

Create a task and store its reference to prevent garbage collection.

Source code in tux/cogs/services/reminders.py
Python
def _create_reminder_task(self, coro: Coroutine[Any, Any, Any], name: str) -> None:
    """Create a task and store its reference to prevent garbage collection."""
    task = asyncio.create_task(coro, name=name)
    self._reminder_tasks.add(task)
    task.add_done_callback(self._reminder_tasks.discard)
cog_unload() -> None async

Clean up resources when the cog is unloaded.

Source code in tux/cogs/services/reminders.py
Python
async def cog_unload(self) -> None:
    """Clean up resources when the cog is unloaded."""
    self.reminder_processor.cancel()
    # Cancel any pending reminder tasks
    for task in self._reminder_tasks.copy():
        task.cancel()
    await asyncio.gather(*self._reminder_tasks, return_exceptions=True)
    self._reminder_tasks.clear()
reminder_processor() -> None async

Process reminders from the priority queue.

Source code in tux/cogs/services/reminders.py
Python
@tasks.loop(seconds=5, name="reminder_processor")
async def reminder_processor(self) -> None:
    """Process reminders from the priority queue."""
    current_time = datetime.datetime.now(datetime.UTC).timestamp()

    async with self._queue_lock:
        # Process all reminders that are due
        while self._reminder_queue and self._reminder_queue[0].timestamp <= current_time:
            scheduled_reminder = heapq.heappop(self._reminder_queue)
            # Schedule the reminder sending as a separate task to avoid blocking the loop
            self._create_reminder_task(
                self.send_reminder(scheduled_reminder.reminder),
                f"send_reminder_{scheduled_reminder.reminder.reminder_id}",
            )
before_reminder_processor() -> None async

Wait until the bot is ready.

Source code in tux/cogs/services/reminders.py
Python
@reminder_processor.before_loop
async def before_reminder_processor(self) -> None:
    """Wait until the bot is ready."""
    await self.bot.wait_until_ready()
on_reminder_processor_error(error: BaseException) -> None async

Handles errors in the reminder processor loop.

Source code in tux/cogs/services/reminders.py
Python
@reminder_processor.error
async def on_reminder_processor_error(self, error: BaseException) -> None:
    """Handles errors in the reminder processor loop."""
    logger.error(f"Error in reminder processor loop: {error}")
    if isinstance(error, Exception):
        self.bot.sentry_manager.capture_exception(error)
    else:
        raise error
schedule_reminder(reminder: Reminder) -> None async

Add a reminder to the priority queue.

Source code in tux/cogs/services/reminders.py
Python
async def schedule_reminder(self, reminder: Reminder) -> None:
    """Add a reminder to the priority queue."""
    scheduled = ScheduledReminder(timestamp=reminder.reminder_expires_at.timestamp(), reminder=reminder)

    async with self._queue_lock:
        heapq.heappush(self._reminder_queue, scheduled)