This repository was archived by the owner on Sep 7, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 53
Expand file tree
/
Copy pathtest_copy.py
More file actions
121 lines (108 loc) · 4.17 KB
/
test_copy.py
File metadata and controls
121 lines (108 loc) · 4.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import unittest
import pg8000
from connection_settings import db_connect
from six import b, BytesIO, u, iteritems
from sys import exc_info
class Tests(unittest.TestCase):
def setUp(self):
self.db = pg8000.connect(**db_connect)
try:
cursor = self.db.cursor()
try:
cursor = self.db.cursor()
cursor.execute("DROP TABLE t1")
except pg8000.DatabaseError:
e = exc_info()[1]
# the only acceptable error is:
msg = e.args[0]
code = msg['C']
self.assertEqual(
code, '42P01', # table does not exist
"incorrect error for drop table: " + str(msg))
self.db.rollback()
cursor.execute(
"CREATE TEMPORARY TABLE t1 (f1 int primary key, "
"f2 int not null, f3 varchar(50) null)")
finally:
cursor.close()
def tearDown(self):
self.db.close()
def testCopyToWithTable(self):
try:
cursor = self.db.cursor()
cursor.execute(
"INSERT INTO t1 (f1, f2, f3) VALUES (%s, %s, %s)", (1, 1, 1))
cursor.execute(
"INSERT INTO t1 (f1, f2, f3) VALUES (%s, %s, %s)", (2, 2, 2))
cursor.execute(
"INSERT INTO t1 (f1, f2, f3) VALUES (%s, %s, %s)", (3, 3, 3))
stream = BytesIO()
cursor.execute("copy t1 to stdout", stream=stream)
self.assertEqual(
stream.getvalue(), b("1\t1\t1\n2\t2\t2\n3\t3\t3\n"))
self.assertEqual(cursor.rowcount, 3)
self.db.commit()
finally:
cursor.close()
def testCopyToWithQuery(self):
try:
cursor = self.db.cursor()
stream = BytesIO()
cursor.execute(
"COPY (SELECT 1 as One, 2 as Two) TO STDOUT WITH DELIMITER "
"'X' CSV HEADER QUOTE AS 'Y' FORCE QUOTE Two", stream=stream)
self.assertEqual(stream.getvalue(), b('oneXtwo\n1XY2Y\n'))
self.assertEqual(cursor.rowcount, 1)
self.db.rollback()
finally:
cursor.close()
def testCopyFromWithTable(self):
try:
cursor = self.db.cursor()
stream = BytesIO(b("1\t1\t1\n2\t2\t2\n3\t3\t3\n"))
cursor.execute("copy t1 from STDIN", stream=stream)
self.assertEqual(cursor.rowcount, 3)
cursor.execute("SELECT * FROM t1 ORDER BY f1")
retval = cursor.fetchall()
self.assertEqual(retval, ([1, 1, '1'], [2, 2, '2'], [3, 3, '3']))
self.db.rollback()
finally:
cursor.close()
def testCopyFromWithQuery(self):
try:
cursor = self.db.cursor()
stream = BytesIO(b("f1Xf2\n1XY1Y\n"))
cursor.execute(
"COPY t1 (f1, f2) FROM STDIN WITH DELIMITER 'X' CSV HEADER "
"QUOTE AS 'Y' FORCE NOT NULL f1", stream=stream)
self.assertEqual(cursor.rowcount, 1)
cursor.execute("SELECT * FROM t1 ORDER BY f1")
retval = cursor.fetchall()
self.assertEqual(retval, ([1, 1, None],))
self.db.commit()
finally:
cursor.close()
def testCopyFromWithError(self):
try:
cursor = self.db.cursor()
stream = BytesIO(b("f1Xf2\n\n1XY1Y\n"))
cursor.execute(
"COPY t1 (f1, f2) FROM STDIN WITH DELIMITER 'X' CSV HEADER "
"QUOTE AS 'Y' FORCE NOT NULL f1", stream=stream)
self.assertTrue(False, "Should have raised an exception")
except:
args_dict = {
'S': u('ERROR'),
'C': u('22P02'),
'M': u('invalid input syntax for integer: ""'),
'W': u('COPY t1, line 2, column f1: ""'),
'F': u('numutils.c'),
'R': u('pg_atoi')
}
args = exc_info()[1].args[0]
for k, v in iteritems(args_dict):
self.assertEqual(args[k], v)
finally:
cursor.close()
if __name__ == "__main__":
unittest.main()