diff --git a/tests/test_003_connection.py b/tests/test_003_connection.py index 422445c8..c71e769b 100644 --- a/tests/test_003_connection.py +++ b/tests/test_003_connection.py @@ -22,6 +22,7 @@ import pytest import time from mssql_python import Connection, connect, pooling +import threading def drop_table_if_exists(cursor, table_name): """Drop the table if it exists""" @@ -225,31 +226,245 @@ def test_connection_close(conn_str): temp_conn.close() def test_connection_pooling_speed(conn_str): - # No pooling - start_no_pool = time.perf_counter() + # Disable pooling first + pooling(enabled=False) + + # Warm up - establish initial connection to avoid first-connection overhead + warmup = connect(conn_str) + warmup.close() + + # Measure multiple non-pooled connections + non_pooled_times = [] + for _ in range(5): + start = time.perf_counter() + conn = connect(conn_str) + conn.close() + end = time.perf_counter() + non_pooled_times.append(end - start) + + avg_no_pool = sum(non_pooled_times) / len(non_pooled_times) + + # Enable pooling + pooling(max_size=5, idle_timeout=30) + + # Prime the pool with a connection + primer = connect(conn_str) + primer.close() + + # Small delay to ensure connection is properly returned to pool + time.sleep(0.1) + + # Measure multiple pooled connections + pooled_times = [] + for _ in range(5): + start = time.perf_counter() + conn = connect(conn_str) + conn.close() + end = time.perf_counter() + pooled_times.append(end - start) + + avg_pooled = sum(pooled_times) / len(pooled_times) + + # Pooled should be significantly faster than non-pooled + assert avg_pooled < avg_no_pool, \ + f"Pooled connections ({avg_pooled:.6f}s) not significantly faster than non-pooled ({avg_no_pool:.6f}s)" + + # Clean up - disable pooling for other tests + pooling(enabled=False) + +def test_connection_pooling_reuse_spid(conn_str): + """Test that connections are actually reused from the pool""" + # Enable pooling + pooling(max_size=1, idle_timeout=30) + + # Create and close a connection conn1 = connect(conn_str) + cursor1 = conn1.cursor() + cursor1.execute("SELECT @@SPID") # Get SQL Server process ID + spid1 = cursor1.fetchone()[0] conn1.close() - end_no_pool = time.perf_counter() - no_pool_duration = end_no_pool - start_no_pool - - # Second connection - start2 = time.perf_counter() + + # Get another connection - should be the same one from pool conn2 = connect(conn_str) + cursor2 = conn2.cursor() + cursor2.execute("SELECT @@SPID") + spid2 = cursor2.fetchone()[0] conn2.close() - end2 = time.perf_counter() - duration2 = end2 - start2 + + # The SPID should be the same, indicating connection reuse + assert spid1 == spid2, "Connections not reused - different SPIDs" + + # Clean up + +def test_pool_exhaustion_max_size_1(conn_str): + """Test pool exhaustion when max_size=1 and multiple concurrent connections are requested.""" + pooling(max_size=1, idle_timeout=30) + conn1 = connect(conn_str) + results = [] + + def try_connect(): + try: + conn2 = connect(conn_str) + results.append("success") + conn2.close() + except Exception as e: + results.append(str(e)) + + # Start a thread that will attempt to get a second connection while the first is open + t = threading.Thread(target=try_connect) + t.start() + t.join(timeout=2) + conn1.close() + + # Depending on implementation, either blocks, raises, or times out + assert results, "Second connection attempt did not complete" + # If pool blocks, the thread may not finish until conn1 is closed, so allow both outcomes + assert results[0] == "success" or "pool" in results[0].lower() or "timeout" in results[0].lower(), \ + f"Unexpected pool exhaustion result: {results[0]}" + pooling(enabled=False) + +def test_pool_idle_timeout_removes_connections(conn_str): + """Test that idle_timeout removes connections from the pool after the timeout.""" + pooling(max_size=2, idle_timeout=2) + conn1 = connect(conn_str) + spid_list = [] + cursor1 = conn1.cursor() + cursor1.execute("SELECT @@SPID") + spid1 = cursor1.fetchone()[0] + spid_list.append(spid1) + conn1.close() - # Pooling enabled - pooling(max_size=2, idle_timeout=10) - connect(conn_str).close() + # Wait for longer than idle_timeout + time.sleep(3) - # Pooled connection (should be reused, hence faster) - start_pool = time.perf_counter() + # Get a new connection, which should not reuse the previous SPID conn2 = connect(conn_str) + cursor2 = conn2.cursor() + cursor2.execute("SELECT @@SPID") + spid2 = cursor2.fetchone()[0] + spid_list.append(spid2) conn2.close() - end_pool = time.perf_counter() - pool_duration = end_pool - start_pool - assert pool_duration < no_pool_duration, "Expected faster connection with pooling" + + assert spid1 != spid2, "Idle timeout did not remove connection from pool" + +def test_connection_timeout_invalid_password(conn_str): + """Test that connecting with an invalid password raises an exception quickly (timeout).""" + # Modify the connection string to use an invalid password + if "Pwd=" in conn_str: + bad_conn_str = conn_str.replace("Pwd=", "Pwd=wrongpassword") + elif "Password=" in conn_str: + bad_conn_str = conn_str.replace("Password=", "Password=wrongpassword") + else: + pytest.skip("No password found in connection string to modify") + start = time.perf_counter() + with pytest.raises(Exception): + connect(bad_conn_str) + elapsed = time.perf_counter() - start + # Should fail quickly (within 10 seconds) + assert elapsed < 10, f"Connection with invalid password took too long: {elapsed:.2f}s" + +def test_connection_timeout_invalid_host(conn_str): + """Test that connecting to an invalid host fails with a timeout.""" + # Replace server/host with an invalid one + if "Server=" in conn_str: + bad_conn_str = conn_str.replace("Server=", "Server=invalidhost12345;") + elif "host=" in conn_str: + bad_conn_str = conn_str.replace("host=", "host=invalidhost12345;") + else: + pytest.skip("No server/host found in connection string to modify") + start = time.perf_counter() + with pytest.raises(Exception): + connect(bad_conn_str) + elapsed = time.perf_counter() - start + # Should fail within a reasonable time (30s) + # Note: This may vary based on network conditions, so adjust as needed + # but generally, a connection to an invalid host should not take too long + # to fail. + # If it takes too long, it may indicate a misconfiguration or network issue. + assert elapsed < 30, f"Connection to invalid host took too long: {elapsed:.2f}s" + +def test_pool_removes_invalid_connections(conn_str): + """Test that the pool removes connections that become invalid (simulate by closing underlying connection).""" + pooling(max_size=1, idle_timeout=30) + conn = connect(conn_str) + cursor = conn.cursor() + cursor.execute("SELECT 1") + # Simulate invalidation by forcibly closing the connection at the driver level + try: + # Try to access a private attribute or method to forcibly close the underlying connection + # This is implementation-specific; if not possible, skip + if hasattr(conn, "_conn") and hasattr(conn._conn, "close"): + conn._conn.close() + else: + pytest.skip("Cannot forcibly close underlying connection for this driver") + except Exception: + pass + # Safely close the connection, ignoring errors due to forced invalidation + try: + conn.close() + except RuntimeError as e: + if "not initialized" not in str(e): + raise + # Now, get a new connection from the pool and ensure it works + new_conn = connect(conn_str) + new_cursor = new_conn.cursor() + try: + new_cursor.execute("SELECT 1") + result = new_cursor.fetchone() + assert result is not None and result[0] == 1, "Pool did not remove invalid connection" + finally: + new_conn.close() + pooling(enabled=False) + +def test_pool_recovery_after_failed_connection(conn_str): + """Test that the pool recovers after a failed connection attempt.""" + pooling(max_size=1, idle_timeout=30) + # First, try to connect with a bad password (should fail) + if "Pwd=" in conn_str: + bad_conn_str = conn_str.replace("Pwd=", "Pwd=wrongpassword") + elif "Password=" in conn_str: + bad_conn_str = conn_str.replace("Password=", "Password=wrongpassword") + else: + pytest.skip("No password found in connection string to modify") + with pytest.raises(Exception): + connect(bad_conn_str) + # Now, connect with the correct string and ensure it works + conn = connect(conn_str) + cursor = conn.cursor() + cursor.execute("SELECT 1") + result = cursor.fetchone() + assert result is not None and result[0] == 1, "Pool did not recover after failed connection" + conn.close() + pooling(enabled=False) + +def test_pool_capacity_limit_and_overflow(conn_str): + """Test that pool does not grow beyond max_size and handles overflow gracefully.""" + pooling(max_size=2, idle_timeout=30) + conns = [] + try: + # Open up to max_size connections + conns.append(connect(conn_str)) + conns.append(connect(conn_str)) + # Try to open a third connection, which should fail or block + overflow_result = [] + def try_overflow(): + try: + c = connect(conn_str) + overflow_result.append("success") + c.close() + except Exception as e: + overflow_result.append(str(e)) + t = threading.Thread(target=try_overflow) + t.start() + t.join(timeout=2) + assert overflow_result, "Overflow connection attempt did not complete" + # Accept either block, error, or success if pool implementation allows overflow + assert overflow_result[0] == "success" or "pool" in overflow_result[0].lower() or "timeout" in overflow_result[0].lower(), \ + f"Unexpected pool overflow result: {overflow_result[0]}" + finally: + for c in conns: + c.close() + pooling(enabled=False) def test_connection_pooling_basic(conn_str): # Enable pooling with small pool size