forked from pymssql/pymssql
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_bulk_copy.py
More file actions
137 lines (105 loc) · 5.24 KB
/
test_bulk_copy.py
File metadata and controls
137 lines (105 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# -*- coding: utf-8 -*-
"""
Test bulk copy.
"""
import unittest
import datetime
from pymssql import _mssql
from tests.helpers import drop_table, pymssqlconn, mssql_server_required
tablename = "pymssql"
simple_table = "CREATE TABLE %s (a1 INT, a2 INT, a3 INT)" % tablename
complex_table = """
CREATE TABLE %s (
pk_id int IDENTITY (1, 1) NOT NULL,
uuid uniqueidentifier DEFAULT newsequentialid(),
col_real real UNIQUE,
col_float float,
col_datetime datetime,
col_bit bit,
col_varchar varchar(50),
col_varbinary varbinary(50)
)
""" % tablename
@mssql_server_required
class TestTypes(unittest.TestCase):
def setUp(self):
self.conn = pymssqlconn()
drop_table(self.conn._conn, tablename)
def tearDown(self):
self.conn.close()
def expect_simple_table_content(self, query, content):
self.conn._conn.execute_query(query)
assert [(row[0], row[1], row[2]) for row in self.conn._conn] == content
def expect_row_count(self, expected_row_count):
self.conn._conn.execute_query('select count(*) from pymssql')
assert tuple(self.conn._conn)[0][0] == expected_row_count
def simple_table_test(self, content, **kwargs):
self.conn._conn.execute_non_query(simple_table)
self.conn.bulk_copy(tablename, content, **kwargs)
self.expect_simple_table_content('select * from pymssql', content)
def test_simple_table_bulk_copy(self):
self.simple_table_test([(1, 2, 3), (4, 5, 6)])
def test_lots_of_rows_single_batch(self):
self.conn._conn.execute_non_query(simple_table)
self.conn.bulk_copy(tablename, [(1, 2, 3), (4, 5, 6)] * 100000, batch_size=1000000)
self.expect_simple_table_content('select top 2 * from pymssql', [(1, 2, 3), (4, 5, 6)])
self.expect_row_count(200000)
def test_batches(self):
self.conn._conn.execute_non_query(simple_table)
self.conn.bulk_copy(tablename, [(1, 2, 3), (4, 5, 6)] * 100000, batch_size=1000)
self.expect_simple_table_content('select top 2 * from pymssql', [(1, 2, 3), (4, 5, 6)])
self.expect_row_count(200000)
def test_exact_batch_size(self):
self.conn._conn.execute_non_query(simple_table)
self.conn.bulk_copy(tablename, [(1, 2, 3), (4, 5, 6)] * 500, batch_size=1000)
self.expect_simple_table_content('select top 2 * from pymssql', [(1, 2, 3), (4, 5, 6)])
self.expect_row_count(1000)
def test_tablock_hint(self):
self.simple_table_test([(1, 2, 3), (4, 5, 6)], tablock=True)
def test_check_constraints_hint(self):
self.simple_table_test([(1, 2, 3), (4, 5, 6)], check_constraints=True)
def test_fire_triggers_hint(self):
self.simple_table_test([(1, 2, 3), (4, 5, 6)], fire_triggers=True)
def test_null_values(self):
self.simple_table_test([(1, None, 3), (None, None, None), (1, 2, 3)])
def test_column_ids(self):
self.conn._conn.execute_non_query(simple_table)
self.conn.bulk_copy(tablename, [(1, 2, 3), (4, 5, 6)], column_ids=[1, 3, 2])
self.expect_simple_table_content('select * from pymssql', [(1, 3, 2), (4, 6, 5)])
def test_too_many_columns(self):
self.conn._conn.execute_non_query(simple_table)
with self.assertRaises(_mssql.MSSQLDatabaseException):
self.conn.bulk_copy(tablename, [(7, 7, 7, 7)])
def test_bad_value(self):
self.conn._conn.execute_non_query(simple_table)
with self.assertRaises(_mssql.MSSQLDatabaseException):
self.conn.bulk_copy(tablename, [("Hello", 7, 7)])
def test_too_few_column_ids(self):
self.conn._conn.execute_non_query(simple_table)
caught_exception = False
try:
self.conn.bulk_copy(tablename, [(1, 2, 3)], column_ids=[1])
except ValueError:
caught_exception = True
assert caught_exception
def test_invalid_column_ids(self):
self.conn._conn.execute_non_query(simple_table)
with self.assertRaises(_mssql.MSSQLDatabaseException):
self.conn.bulk_copy(tablename, [(1, 2, 3)], column_ids=[1, 2, 9])
def test_complex_table(self):
self.conn._conn.execute_non_query(complex_table)
rows = [
(1.2000000476837158, 3.4, datetime.datetime(year=2020, month=1, day=2, hour=3, minute=4, second=5), True, "Hello World", b'\x02\x03\x05\x07'),
(5.599999904632568, 7.8, datetime.datetime(year=2021, month=2, day=3, hour=4, minute=5, second=6), False, "Hello World!", bytearray([2, 3, 5, 7])),
]
self.conn.bulk_copy(tablename, rows, [3, 4, 5, 6, 7, 8])
self.conn._conn.execute_query('select * from pymssql')
assert [tuple(row[i] for i in range(2, 8)) for row in self.conn._conn] == rows
def test_uniqueness_failure(self):
self.conn._conn.execute_non_query(complex_table)
rows = [
(1.2000000476837158, 3.4, datetime.datetime(year=2020, month=1, day=2, hour=3, minute=4, second=5), True, "Hello World"),
(1.2000000476837158, 7.8, datetime.datetime(year=2021, month=2, day=3, hour=4, minute=5, second=6), False, "Hello World!"),
]
with self.assertRaises(_mssql.MSSQLDatabaseException):
self.conn.bulk_copy(tablename, rows, [3, 4, 5, 6, 7])