diff --git a/api/libs/db_migration_lock.py b/api/libs/db_migration_lock.py index 1d3a81e0a2..ca8956e397 100644 --- a/api/libs/db_migration_lock.py +++ b/api/libs/db_migration_lock.py @@ -14,9 +14,15 @@ from __future__ import annotations import logging import threading -from typing import Any +from typing import TYPE_CHECKING, Any +import redis +from redis.cluster import RedisCluster from redis.exceptions import LockNotOwnedError, RedisError +from redis.lock import Lock + +if TYPE_CHECKING: + from extensions.ext_redis import RedisClientWrapper logger = logging.getLogger(__name__) @@ -38,21 +44,21 @@ class DbMigrationAutoRenewLock: primary error/exit code. """ - _redis_client: Any + _redis_client: redis.Redis | RedisCluster | RedisClientWrapper _name: str _ttl_seconds: float _renew_interval_seconds: float _log_context: str | None _logger: logging.Logger - _lock: Any + _lock: Lock | None _stop_event: threading.Event | None _thread: threading.Thread | None _acquired: bool def __init__( self, - redis_client: Any, + redis_client: redis.Redis | RedisCluster | RedisClientWrapper, name: str, ttl_seconds: float = 60, renew_interval_seconds: float | None = None, @@ -127,7 +133,7 @@ class DbMigrationAutoRenewLock: ) self._thread.start() - def _heartbeat_loop(self, lock: Any, stop_event: threading.Event) -> None: + def _heartbeat_loop(self, lock: Lock, stop_event: threading.Event) -> None: while not stop_event.wait(self._renew_interval_seconds): try: lock.reacquire()