datapusher/jobs.py: 45 changes (34 additions & 11 deletions)
@@ -66,11 +66,16 @@
 if not SSL_VERIFY:
     requests.packages.urllib3.disable_warnings()
 
+POSTGRES_INT_MAX = 2147483647
+POSTGRES_INT_MIN = -2147483648
+POSTGRES_BIGINT_MAX = 9223372036854775807
+POSTGRES_BIGINT_MIN = -9223372036854775808
+
 _TYPE_MAPPING = {
     'String': 'text',
-    # 'int' may not be big enough,
-    # and type detection may not realize it needs to be big
-    'Integer': 'numeric',
+    # smartint looks at the min/max values of the qsv stats scan and
+    # chooses integer, bigint or numeric as appropriate
+    'Integer': 'smartint',
     'Float': 'numeric',
     'DateTime': 'timestamp',
     'Date': 'date',
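Note on the new constants: they are the bounds of PostgreSQL's 4-byte integer and 8-byte bigint types (2**31 - 1 and 2**63 - 1 at the high end), which is what makes the narrowing below safe. A minimal standalone sketch of the selection rule the 'smartint' placeholder stands for; the helper name is invented, and the merged code inlines this logic further down rather than calling a function:

    POSTGRES_INT_MAX = 2147483647               # 2**31 - 1
    POSTGRES_INT_MIN = -2147483648              # -2**31
    POSTGRES_BIGINT_MAX = 9223372036854775807   # 2**63 - 1
    POSTGRES_BIGINT_MIN = -9223372036854775808  # -2**63

    def narrowest_pg_int_type(col_min, col_max):
        # Pick the narrowest PostgreSQL type that can hold the column.
        if POSTGRES_INT_MIN <= col_min and col_max <= POSTGRES_INT_MAX:
            return 'integer'   # fits in 4 bytes
        if POSTGRES_BIGINT_MIN <= col_min and col_max <= POSTGRES_BIGINT_MAX:
            return 'bigint'    # fits in 8 bytes
        return 'numeric'       # arbitrary-precision fallback

    assert narrowest_pg_int_type(0, 100) == 'integer'
    assert narrowest_pg_int_type(0, 3_000_000_000) == 'bigint'
    assert narrowest_pg_int_type(0, 10**19) == 'numeric'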
@@ -586,6 +591,8 @@ def push_to_datastore(task_id, input, dry_run=False):
     # run qsv stats to get data types and descriptive statistics
     headers = []
     types = []
+    headers_min = []
+    headers_max = []
     qsv_stats_csv = tempfile.NamedTemporaryFile(suffix='.csv')
     qsv_stats_cmd = [QSV_BIN, 'stats', tmp.name,
                      '--output', qsv_stats_csv.name]
@@ -605,11 +612,12 @@ def push_to_datastore(task_id, input, dry_run=False):
             'Cannot infer data types and compile statistics: {}'.format(e)
         )
     with open(qsv_stats_csv.name, mode='r') as inp:
-        reader = csv.reader(inp)
-        next(reader) # skip first element, which is a header
-        for rows in reader:
-            headers.append(rows[0])
-            types.append(rows[1])
+        reader = csv.DictReader(inp)
+        for row in reader:
+            headers.append(row['field'])
+            types.append(row['type'])
+            headers_min.append(row['min'])
+            headers_max.append(row['max'])
 
     existing = datastore_resource_exists(resource_id, api_key, ckan_url)
     existing_info = None
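For reviewers unfamiliar with qsv: stats emits one CSV row per column of the input, and the DictReader loop above picks values out by header name (field, type, min, max) instead of by position, so additional statistics columns cannot silently shift the indexes the way they could with the old csv.reader approach. A sketch with an invented two-column stats file:

    import csv
    import io

    # Invented sample: real qsv stats output carries more columns
    # (sum, range, mean, stddev, ...) between type and min/max.
    sample_stats = io.StringIO(
        "field,type,min,max\n"
        "user_id,Integer,1,8000000000\n"
        "name,String,,\n"
    )
    for row in csv.DictReader(sample_stats):
        print(row['field'], row['type'], row['min'], row['max'])
    # prints: user_id Integer 1 8000000000
    #         name String (empty min/max)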
@@ -621,7 +629,7 @@ def push_to_datastore(task_id, input, dry_run=False):
     if existing_info:
         types = [{
             'text': 'String',
-            'numeric': 'Decimal',
+            'numeric': 'Float',
             'timestamp': 'DateTime',
         }.get(existing_info.get(h, {}).get('type_override'), t)
             for t, h in zip(types, headers)]
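Why 'Decimal' had to become 'Float': this dict maps DataStore types back to qsv type names, and the result is fed through TYPE_MAPPING again in the headers_dicts pass below. Assuming TYPE_MAPPING defaults to the _TYPE_MAPPING shown at the top of this diff, 'Decimal' is not a key there, so a numeric type_override would have failed the lookup, while 'Float' round-trips cleanly:

    # Assuming TYPE_MAPPING defaults to the _TYPE_MAPPING shown above:
    TYPE_MAPPING = {'String': 'text', 'Integer': 'smartint', 'Float': 'numeric'}

    TYPE_MAPPING['Float']        # 'numeric' -- the override round-trips
    try:
        TYPE_MAPPING['Decimal']  # pre-fix lookup
    except KeyError:
        print("KeyError: a numeric override would have raised this before the change")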
@@ -636,8 +644,23 @@ def push_to_datastore(task_id, input, dry_run=False):
             res_id=resource_id))
         delete_datastore_resource(resource_id, api_key, ckan_url)
 
-    headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])])
-                     for field in zip(headers, types)]
+    # 1st pass of building headers_dicts
+    temp_headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])])
+                          for field in zip(headers, types)]
+
+    # 2nd pass of building headers_dicts, checking for smartint types
+    headers_dicts = []
+    for idx, header in enumerate(temp_headers_dicts):
+        if header['type'] == 'smartint':
+            if int(headers_max[idx]) <= POSTGRES_INT_MAX and int(headers_min[idx]) >= POSTGRES_INT_MIN:
+                header_type = 'integer'
+            elif int(headers_max[idx]) <= POSTGRES_BIGINT_MAX and int(headers_min[idx]) >= POSTGRES_BIGINT_MIN:
+                header_type = 'bigint'
+            else:
+                header_type = 'numeric'
+        else:
+            header_type = header['type']
+        headers_dicts.append(dict(id=header['id'], type=header_type))
 
     # Maintain data dictionaries from matching column names
     if existing_info:
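Taken together, the two passes turn qsv's per-column stats into final DataStore field definitions. A self-contained walk-through of the 2nd pass with invented sample columns; note that qsv stats min/max values arrive as strings, hence the int() casts:

    # Standalone sketch of the 2nd pass above; the sample columns are invented.
    POSTGRES_INT_MAX, POSTGRES_INT_MIN = 2147483647, -2147483648
    POSTGRES_BIGINT_MAX, POSTGRES_BIGINT_MIN = 9223372036854775807, -9223372036854775808

    temp_headers_dicts = [{'id': 'count', 'type': 'smartint'},
                          {'id': 'user_id', 'type': 'smartint'},
                          {'id': 'name', 'type': 'text'}]
    headers_min = ['0', '1', '']            # strings, as read from the stats CSV
    headers_max = ['250', '8000000000', '']

    headers_dicts = []
    for idx, header in enumerate(temp_headers_dicts):
        if header['type'] == 'smartint':
            if (int(headers_max[idx]) <= POSTGRES_INT_MAX
                    and int(headers_min[idx]) >= POSTGRES_INT_MIN):
                header_type = 'integer'
            elif (int(headers_max[idx]) <= POSTGRES_BIGINT_MAX
                    and int(headers_min[idx]) >= POSTGRES_BIGINT_MIN):
                header_type = 'bigint'
            else:
                header_type = 'numeric'
        else:
            header_type = header['type']   # non-smartint types pass through untouched
        headers_dicts.append({'id': header['id'], 'type': header_type})

    print(headers_dicts)
    # [{'id': 'count', 'type': 'integer'},
    #  {'id': 'user_id', 'type': 'bigint'},
    #  {'id': 'name', 'type': 'text'}]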