Skip to content

Commit 451ed2f

Browse files
committed
Address review comments: use 2x CPU cores, add units to vars, enhance comments, add normal load test
1 parent d5bc7d3 commit 451ed2f

File tree

1 file changed

+99
-24
lines changed

1 file changed

+99
-24
lines changed

tests/test_020_multithreaded_stress.py

Lines changed: 99 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
Multi-threaded stress tests for mssql-python driver.
33
44
These tests verify the driver's behavior under multi-threaded conditions:
5-
- Concurrent connections with 2, 5, 10, 50, 100, and 1000 threads
5+
- Concurrent connections with 2, 5, 10, 50, 100 threads, and 2x CPU cores
66
- Connection pooling under stress
77
- Thread safety of query execution
88
- Memory and resource usage under load
@@ -502,13 +502,15 @@ def worker(thread_id: int):
502502
table_name = f"#stress_t{thread_id}"
503503
drop_table_if_exists(cursor, table_name)
504504

505-
cursor.execute(f"""
505+
cursor.execute(
506+
f"""
506507
CREATE TABLE {table_name} (
507508
id INT PRIMARY KEY,
508509
thread_id INT,
509510
data NVARCHAR(100)
510511
)
511-
""")
512+
"""
513+
)
512514
conn.commit()
513515

514516
# Perform iterations
@@ -572,7 +574,71 @@ def worker(thread_id: int):
572574

573575

574576
# ============================================================================
575-
# High Load Tests (100, 1000 threads)
577+
# Normal/Realistic Load Test (CPU core count)
578+
# ============================================================================
579+
580+
581+
@pytest.mark.stress_threading
def test_normal_load_cpu_cores(stress_conn_str):
    """
    Exercise the driver under a realistic, production-like load.

    The thread count equals the machine's CPU core count (the typical sizing
    for CPU-bound workloads), and each thread repeatedly runs a moderately
    heavy catalog query so every thread stays busy for the whole test.
    """
    core_count = os.cpu_count() or 4  # fall back to 4 when detection fails
    thread_count = core_count
    per_thread_iterations = 50  # enough work to keep each thread occupied

    # A catalog join returning a larger result set than a trivial SELECT,
    # so each iteration takes a non-trivial amount of time to process.
    query = """
        SELECT
            o.name,
            o.object_id,
            o.type,
            o.type_desc,
            o.create_date,
            o.modify_date,
            s.name as schema_name
        FROM sys.objects o
        INNER JOIN sys.schemas s ON o.schema_id = s.schema_id
        WHERE o.type IN ('U', 'V', 'P', 'FN')
        ORDER BY o.create_date
    """

    query_runner = MultiThreadedQueryRunner(
        conn_str=stress_conn_str,
        query=query,
        enable_pooling=True,
        timeout_seconds=300,
    )

    outcome = query_runner.run_parallel(
        num_threads=thread_count, iterations_per_thread=per_thread_iterations
    )

    assert not outcome.hung, f"Test hung with {thread_count} threads (CPU cores)"

    # Under a normal load we expect near-perfect reliability.
    completion_rate = outcome.total_iterations / (thread_count * per_thread_iterations)
    if outcome.total_iterations > 0:
        error_rate = outcome.total_errors / outcome.total_iterations
    else:
        error_rate = 1  # nothing completed at all -> treat as total failure

    print(f"\nNormal load test (CPU cores = {core_count}) results:")
    print(f"  Threads: {thread_count}")
    print(f"  Iterations per thread: {per_thread_iterations}")
    print(f"  Completion rate: {completion_rate*100:.1f}%")
    print(f"  Error rate: {error_rate*100:.1f}%")
    print(f"  Throughput: {outcome.throughput_qps:.1f} qps")
    print(f"  Avg latency: {outcome.avg_latency_ms:.1f}ms")

    assert (
        completion_rate >= 0.95
    ), f"Completion rate too low for normal load: {completion_rate*100:.1f}%"
    assert error_rate < 0.05, f"Error rate too high for normal load: {error_rate*100:.1f}%"
    print(f"[PASSED] Normal load test with {thread_count} threads (CPU cores)")
638+
639+
640+
# ============================================================================
641+
# High Load Tests (100 threads, 2x CPU cores)
576642
# ============================================================================
577643

578644

@@ -612,39 +678,42 @@ def test_high_load_100_threads(stress_conn_str, num_threads, iterations):
612678

613679
@pytest.mark.stress_threading
@pytest.mark.slow
def test_extreme_load_2x_cpu_cores(stress_conn_str):
    """
    Test extreme load with 2x CPU core count threads.

    Uses a pragmatic thread count (twice the processor cores) rather than an
    unrealistically high one, and verifies graceful degradation under load.
    """
    core_count = os.cpu_count() or 4  # fall back to 4 when detection fails
    thread_count = core_count * 2
    per_thread_iterations = 10  # reasonable now that the thread count is modest

    stress_runner = MultiThreadedQueryRunner(
        conn_str=stress_conn_str,
        query="SELECT 1",
        enable_pooling=True,
        timeout_seconds=300,  # 5 minutes
    )

    outcome = stress_runner.run_parallel(
        num_threads=thread_count, iterations_per_thread=per_thread_iterations
    )

    assert not outcome.hung, f"Test hung with {thread_count} threads (2x {core_count} cores)"

    completion_rate = outcome.total_iterations / (thread_count * per_thread_iterations)
    if outcome.total_iterations > 0:
        error_rate = outcome.total_errors / outcome.total_iterations
    else:
        error_rate = 1  # nothing completed at all -> treat as total failure

    print(f"\n2x CPU cores ({thread_count} threads) stress test results:")
    print(f"  CPU cores: {core_count}")
    print(f"  Threads: {thread_count}")
    print(f"  Completion rate: {completion_rate*100:.1f}%")
    print(f"  Error rate: {error_rate*100:.1f}%")
    print(f"  Throughput: {outcome.throughput_qps:.1f} qps")

    # Reasonable expectations for 2x CPU count: solid but not perfect.
    assert completion_rate >= 0.85, f"Completion rate too low: {completion_rate*100:.1f}%"
    assert error_rate < 0.15, f"Error rate too high: {error_rate*100:.1f}%"
    print(f"[PASSED] 2x CPU cores ({thread_count} threads) extreme stress test completed")
648717

649718

650719
# ============================================================================
@@ -814,18 +883,18 @@ def memory_monitor():
814883
if len(memory_samples) >= 2:
815884
initial_mem = memory_samples[0]["rss_mb"]
816885
final_mem = memory_samples[-1]["rss_mb"]
817-
mem_growth = final_mem - initial_mem
886+
mem_growth_mb = final_mem - initial_mem
818887

819888
print(f"\nSustained load test results:")
820889
print(f" Duration: {elapsed:.1f}s")
821890
print(f" Total iterations: {total_iterations}")
822891
print(f" Total errors: {total_errors}")
823892
print(f" Throughput: {total_iterations/elapsed:.1f} qps")
824-
print(f" Memory: {initial_mem:.1f}MB -> {final_mem:.1f}MB (delta: {mem_growth:+.1f}MB)")
893+
print(f" Memory: {initial_mem:.1f}MB -> {final_mem:.1f}MB (delta: {mem_growth_mb:+.1f}MB)")
825894

826895
# Check for excessive memory growth (potential leak)
827896
# Allow up to 100MB growth for long test
828-
assert mem_growth < 100, f"Potential memory leak: {mem_growth:.1f}MB growth"
897+
assert mem_growth_mb < 100, f"Potential memory leak: {mem_growth_mb:.1f}MB growth"
829898

830899
error_rate = total_errors / total_iterations if total_iterations > 0 else 1
831900
assert error_rate < 0.05, f"Error rate too high in sustained test: {error_rate*100:.1f}%"
@@ -844,9 +913,10 @@ def test_complex_queries_multithreaded(stress_conn_str):
844913
Test multi-threaded execution with complex queries.
845914
846915
Tests with JOINs, aggregations, and larger result sets.
916+
Each thread processes all results to keep the driver busy.
847917
"""
848918
complex_query = """
849-
SELECT TOP 50
919+
SELECT
850920
o.name as object_name,
851921
o.type_desc,
852922
s.name as schema_name,
@@ -904,11 +974,11 @@ def test_resource_cleanup_after_stress(stress_conn_str):
904974
after = get_resource_usage()
905975
print(f"After stress RSS: {after['rss_mb']} MB")
906976

907-
mem_delta = after["rss_mb"] - baseline["rss_mb"]
908-
print(f"Memory delta: {mem_delta:+.1f} MB")
977+
mem_delta_mb = after["rss_mb"] - baseline["rss_mb"]
978+
print(f"Memory delta: {mem_delta_mb:+.1f} MB")
909979

910980
# Allow some memory growth but not excessive
911-
assert mem_delta < 50, f"Memory not properly released: {mem_delta:.1f}MB retained"
981+
assert mem_delta_mb < 50, f"Memory not properly released: {mem_delta_mb:.1f}MB retained"
912982
print(f"[PASSED] Resource cleanup test")
913983

914984

@@ -948,6 +1018,11 @@ def test_comprehensive_thread_scaling(stress_conn_str, num_threads, iterations,
9481018
assert not result.hung, f"Test hung: {num_threads} threads, pooling={pooling}"
9491019

9501020
# Adaptive expectations based on thread count
1021+
# Lower thread counts (<=10) should have near-perfect reliability (95%+ completion, <5% errors)
1022+
# Medium loads (11-50 threads) may see some contention (90%+ completion, <10% errors)
1023+
# Higher loads (>50 threads) can experience resource pressure and timing issues,
1024+
# so we allow more graceful degradation (80%+ completion, <20% errors)
1025+
# This approach validates that the driver handles load proportionally without brittleness.
9511026
if num_threads <= 10:
9521027
min_completion = 0.95
9531028
max_error_rate = 0.05

0 commit comments

Comments
 (0)