|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. |
| 3 | + |
| 4 | + |
| 5 | +import os |
| 6 | +import tempfile |
| 7 | +import unittest |
| 8 | +from unittest.mock import patch |
| 9 | + |
| 10 | +from fq import FQ |
| 11 | +from fq.utils import is_valid_identifier |
| 12 | +from fq.exceptions import BadArgumentException, FQException |
| 13 | + |
| 14 | + |
class FakeCluster:
    """Stand-in for a clustered Redis client used by the cluster-init test.

    Records the constructor arguments FQ passes in and exposes the minimal
    async surface (``register_script`` and ``ping``) touched during
    initialization.
    """

    def __init__(self, startup_nodes=None, decode_responses=False, socket_timeout=None):
        # Mirror the real client's constructor; substitute an empty node
        # list when a falsy value is supplied.
        self.startup_nodes = startup_nodes if startup_nodes else []
        self.decode_responses = decode_responses
        self.socket_timeout = socket_timeout

    def register_script(self, _):
        """Return a no-op async script runner that resolves to an empty list."""

        async def noop_script(*_args, **_kwargs):
            return []

        return noop_script

    async def ping(self):
        """Pretend the cluster is reachable."""
        return True
| 29 | + |
| 30 | + |
class FakeRedisForClose:
    """Records which teardown methods FQ.close() invokes.

    ``connection_pool`` aliases the instance itself so that both the
    client-level (``close``/``wait_closed``) and pool-level (``disconnect``)
    calls are captured by this one object.
    """

    def __init__(self):
        # All teardown flags start unset.
        self.closed = self.waited = self.disconnected = False
        # Pool-level calls land on this same fake.
        self.connection_pool = self

    async def close(self):
        """Record that the client was closed."""
        self.closed = True

    async def wait_closed(self):
        """Record that the close was awaited to completion."""
        self.waited = True

    async def disconnect(self):
        """Record that the connection pool was disconnected."""
        self.disconnected = True
| 46 | + |
| 47 | + |
class FakeRedisForDeepStatus:
    """Captures the single key/value pair written by FQ.deep_status()."""

    def __init__(self):
        # Holds (key, value) after set() runs; None until then.
        self.key_set = None

    async def set(self, key, value):
        """Remember the write and report success like a real SET."""
        self.key_set = (key, value)
        return True
| 55 | + |
| 56 | + |
class FakeRedisConnectionFailure:
    """Redis stand-in whose ping() always fails.

    Drives FQ.initialize() down its connection-error path.
    """

    def __init__(self, *args, **kwargs):
        # Accept and discard whatever the real client constructor receives.
        pass

    async def ping(self):
        """Simulate an unreachable server."""
        raise ConnectionError("boom")

    def register_script(self, _):
        """Return a no-op async script runner (never awaited on this path)."""

        async def noop_script(*_args, **_kwargs):
            return []

        return noop_script
| 69 | + |
| 70 | + |
class FakeLuaDequeue:
    """Registered-script stand-in returning a dequeue result whose job
    payload slot is None, to exercise FQ's missing-payload branch."""

    def __init__(self):
        # Flips to True once the script has been invoked.
        self.called = False

    async def __call__(self, keys=None, args=None):
        """Record the invocation; return queue id, job id, None payload, ts."""
        self.called = True
        return [b"q1", b"j1", None, b"0"]
| 78 | + |
| 79 | + |
class FakePipe:
    """Pipeline stub recording HDEL/DELETE calls and whether execute() ran."""

    def __init__(self):
        self.hdel_calls = []  # positional-arg tuple of each hdel() call
        self.deleted = []  # positional-arg tuple of each delete() call
        self.executed = False

    def hdel(self, *args):
        """Queue an HDEL; only the arguments are recorded."""
        self.hdel_calls.append(args)

    def delete(self, *args):
        """Queue a DELETE; only the arguments are recorded."""
        self.deleted.append(args)

    async def execute(self):
        """Mark the queued commands as flushed."""
        self.executed = True
| 94 | + |
| 95 | + |
class FakeRedisForClear:
    """Redis stand-in for clear_queue(purge_all=True).

    ``lrange`` returns a mix of None, bytes, and str job ids so every
    type-handling branch of the purge loop is exercised; one shared
    FakePipe records the per-job cleanup commands.
    """

    def __init__(self):
        # A single pipeline instance is reused across pipeline() calls so
        # the test can inspect everything that was queued.
        self.pipe = FakePipe()
        # Keys removed via the top-level delete().
        self.deleted_keys = []

    async def zrem(self, _primary_set, _queue_id):
        """Pretend exactly one member was removed from the primary set."""
        return 1

    async def lrange(self, _key, _start, _end):
        """Return mixed payloads: a missing job, a bytes id, and a str id."""
        return [None, b"job-bytes", "job-str"]

    def pipeline(self):
        """Hand back the shared recording pipeline."""
        return self.pipe

    async def delete(self, key):
        """Record a key deletion."""
        self.deleted_keys.append(key)
| 112 | + |
| 113 | + |
class TestEdgeCases(unittest.IsolatedAsyncioTestCase):
    """Edge-case tests for FQ.

    Covers configuration errors, the clustered-Redis initialization path,
    dequeue/clear-queue branch handling, close() fallbacks, deep_status,
    and argument validation.  Most tests swap in the Fake* stand-ins defined
    above; tests that set ``self.fq_instance`` run against a real Redis
    described by test.conf and are cleaned up in asyncTearDown.
    """

    async def asyncSetUp(self):
        # Locate test.conf next to this file so the suite is CWD-independent.
        cwd = os.path.dirname(os.path.realpath(__file__))
        self.config_path = os.path.join(cwd, "test.conf")
        # Populated by tests that initialize FQ against real Redis so that
        # asyncTearDown can flush and close the connection.
        self.fq_instance = None

    async def asyncTearDown(self):
        """Clean up Redis state and close connections after each test."""
        # If a test initialized FQ with real Redis, clean up
        if self.fq_instance is not None:
            try:
                if self.fq_instance._r is not None:
                    await self.fq_instance._r.flushdb()
                await self.fq_instance.close()
            except Exception:
                # Ignore errors during cleanup - tests may have mocked or closed connections
                # This prevents tearDown failures from masking test failures
                pass
            self.fq_instance = None

    def test_missing_config_file_raises(self):
        # FQ's constructor validates the config path up front.
        with self.assertRaisesRegex(FQException, "Config file not found"):
            FQ("/tmp/does-not-exist.conf")

    async def test_initialize_fails_fast_on_bad_redis(self):
        # FakeRedisConnectionFailure.ping() raises ConnectionError, which
        # initialize() must translate into an FQException.
        # NOTE(review): this calls fq.initialize() while other tests call
        # fq._initialize() -- confirm both entry points exist in queue.py.
        with patch("fq.queue.Redis", FakeRedisConnectionFailure):
            fq = FQ(self.config_path)
            with self.assertRaisesRegex(FQException, "Failed to connect to Redis"):
                await fq.initialize()

    async def test_cluster_initialization(self):
        """Covers clustered Redis path (queue.py lines 69-75, 104-106)."""
        # Write a throwaway config with ``clustered : true`` so FQ takes the
        # RedisCluster branch; the cluster client itself is faked.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".conf", delete=False) as f:
            f.write(
                """[fq]
job_expire_interval : 5000
job_requeue_interval : 5000
default_job_requeue_limit : -1

[redis]
db : 0
key_prefix : test_fq_cluster
conn_type : tcp_sock
host : 127.0.0.1
port : 6379
clustered : true
password :
"""
            )
            config_path = f.name

        try:
            with patch("fq.queue.RedisCluster", FakeCluster):
                fq = FQ(config_path)
                await fq._initialize()
                self.assertIsInstance(fq.redis_client(), FakeCluster)
                await fq.close()
        finally:
            # Remove the temp config even if the assertions fail.
            os.unlink(config_path)

    async def test_dequeue_payload_none(self):
        """Covers dequeue branch where payload is None (queue.py line 212)."""
        fq = FQ(self.config_path)
        self.fq_instance = fq
        await fq._initialize()
        # Swap the registered Lua script for one that returns a None payload.
        fake_dequeue = FakeLuaDequeue()
        fq._lua_dequeue = fake_dequeue
        result = await fq.dequeue()
        self.assertEqual(result["status"], "failure")
        self.assertTrue(fake_dequeue.called)

    async def test_clear_queue_delete_only(self):
        """Covers clear_queue else branch (queue.py lines 499, 502)."""
        fq = FQ(self.config_path)
        self.fq_instance = fq
        await fq._initialize()
        await fq._r.flushdb()
        # Clearing a queue that does not exist should report Failure.
        response = await fq.clear_queue(queue_type="noqueue", queue_id="missing")
        self.assertEqual(response["status"], "Failure")

    async def test_close_fallback_paths(self):
        """Covers close() fallback paths (queue.py lines 528-549)."""
        fq = FQ(self.config_path)
        # Inject a fake client so close() exercises the close/wait_closed/
        # disconnect sequence without a live connection.
        fq._r = FakeRedisForClose()
        await fq.close()
        self.assertIsNone(fq._r)

    async def test_deep_status_calls_set(self):
        """Covers deep_status (queue.py line 420)."""
        fq = FQ(self.config_path)
        fq._key_prefix = fq._config.get("redis", "key_prefix")
        fq._r = FakeRedisForDeepStatus()
        await fq.deep_status()
        # NOTE(review): the stored value still reads "sharq_deep_status" --
        # presumably a legacy name; confirm it matches what queue.py writes.
        self.assertEqual(
            fq._r.key_set,
            ("fq:deep_status:{}".format(fq._key_prefix), "sharq_deep_status"),
        )

    def test_is_valid_identifier_non_string(self):
        """Covers utils.is_valid_identifier non-string check (utils.py line 22)."""
        self.assertFalse(is_valid_identifier(123))
        self.assertFalse(is_valid_identifier(None))
        self.assertFalse(is_valid_identifier(["a"]))

    async def test_reload_config_with_new_path(self):
        """Covers reload_config branch (queue.py lines 104-106)."""
        # A second config with a different key_prefix proves that reload
        # actually re-reads the file rather than keeping old settings.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".conf", delete=False) as f:
            f.write(
                """[fq]
job_expire_interval : 5000
job_requeue_interval : 5000
default_job_requeue_limit : -1

[redis]
db : 0
key_prefix : new_prefix
conn_type : tcp_sock
port : 6379
host : 127.0.0.1
clustered : false
password :
"""
            )
            new_config = f.name

        try:
            fq = FQ(self.config_path)
            fq.reload_config(new_config)
            self.assertEqual(fq.config_path, new_config)
            self.assertEqual(fq._config.get("redis", "key_prefix"), "new_prefix")
        finally:
            os.unlink(new_config)

    async def test_clear_queue_purge_all_with_mixed_job_ids(self):
        """Covers purge_all loop branches (queue.py lines 463-468, 474-479)."""
        fq = FQ(self.config_path)
        fq._key_prefix = fq._config.get("redis", "key_prefix")
        # FakeRedisForClear.lrange() yields None/bytes/str job ids to hit
        # every type-handling branch of the purge loop.
        fq._r = FakeRedisForClear()
        response = await fq.clear_queue("qt", "qid", purge_all=True)
        self.assertEqual(response["status"], "Success")
        self.assertTrue(fq._r.pipe.executed)

    async def test_get_queue_length_invalid_params(self):
        """Covers validation branches (queue.py lines 499, 502)."""
        # Identifiers containing spaces are invalid for both parameters.
        fq = FQ(self.config_path)
        with self.assertRaises(BadArgumentException):
            await fq.get_queue_length("bad type", "qid")
        with self.assertRaises(BadArgumentException):
            await fq.get_queue_length("qtype", "bad id")

    async def test_deep_status_real_redis(self):
        """Covers deep_status with real redis (queue.py line 420)."""
        fq = FQ(self.config_path)
        self.fq_instance = fq
        await fq._initialize()
        result = await fq.deep_status()
        self.assertTrue(result)
| 271 | + |
| 272 | + |
# Allow running this module directly (e.g. ``python test_edge_cases.py``).
if __name__ == "__main__":
    unittest.main()