diff --git a/datapusher/jobs.py b/datapusher/jobs.py
index 18bb01a9..3eda299f 100644
--- a/datapusher/jobs.py
+++ b/datapusher/jobs.py
@@ -66,11 +66,16 @@
 if not SSL_VERIFY:
     requests.packages.urllib3.disable_warnings()
 
+POSTGRES_INT_MAX = 2147483647
+POSTGRES_INT_MIN = -2147483648
+POSTGRES_BIGINT_MAX = 9223372036854775807
+POSTGRES_BIGINT_MIN = -9223372036854775808
+
 _TYPE_MAPPING = {
     'String': 'text',
-    # 'int' may not be big enough,
-    # and type detection may not realize it needs to be big
-    'Integer': 'numeric',
+    # smartint looks at the min/max values of qsv stats scan and
+    # chooses integer, bigint or numeric as appropriate
+    'Integer': 'smartint',
     'Float': 'numeric',
     'DateTime': 'timestamp',
     'Date': 'date',
@@ -586,6 +591,8 @@ def push_to_datastore(task_id, input, dry_run=False):
     # run qsv stats to get data types and descriptive statistics
     headers = []
     types = []
+    headers_min = []
+    headers_max = []
     qsv_stats_csv = tempfile.NamedTemporaryFile(suffix='.csv')
     qsv_stats_cmd = [QSV_BIN, 'stats', tmp.name,
                      '--output', qsv_stats_csv.name]
@@ -605,11 +612,12 @@ def push_to_datastore(task_id, input, dry_run=False):
             'Cannot infer data types and compile statistics: {}'.format(e)
         )
     with open(qsv_stats_csv.name, mode='r') as inp:
-        reader = csv.reader(inp)
-        next(reader)  # skip first element, which is a header
-        for rows in reader:
-            headers.append(rows[0])
-            types.append(rows[1])
+        reader = csv.DictReader(inp)
+        for row in reader:
+            headers.append(row['field'])
+            types.append(row['type'])
+            headers_min.append(row['min'])
+            headers_max.append(row['max'])
 
     existing = datastore_resource_exists(resource_id, api_key, ckan_url)
     existing_info = None
@@ -621,7 +629,7 @@ def push_to_datastore(task_id, input, dry_run=False):
     if existing_info:
         types = [{
             'text': 'String',
-            'numeric': 'Decimal',
+            'numeric': 'Float',
             'timestamp': 'DateTime',
         }.get(existing_info.get(h, {}).get('type_override'), t)
             for t, h in zip(types, headers)]
@@ -636,8 +644,23 @@ def push_to_datastore(task_id, input, dry_run=False):
                 res_id=resource_id))
         delete_datastore_resource(resource_id, api_key, ckan_url)
 
-    headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])])
-                     for field in zip(headers, types)]
+    # 1st pass of building headers_dict
+    temp_headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])])
+                          for field in zip(headers, types)]
+
+    # 2nd pass header_dicts, checking for smartint types
+    headers_dicts = []
+    for idx, header in enumerate(temp_headers_dicts):
+        if header['type'] == 'smartint':
+            if int(headers_max[idx]) <= POSTGRES_INT_MAX and int(headers_min[idx]) >= POSTGRES_INT_MIN:
+                header_type = 'integer'
+            elif int(headers_max[idx]) <= POSTGRES_BIGINT_MAX and int(headers_min[idx]) >= POSTGRES_BIGINT_MIN:
+                header_type = 'bigint'
+            else:
+                header_type = 'numeric'
+        else:
+            header_type = header['type']
+        headers_dicts.append(dict(id=header['id'], type=header_type))
 
     # Maintain data dictionaries from matching column names
     if existing_info:
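
For reference, the smartint selection added in the last hunk can be exercised in isolation. Below is a minimal standalone sketch, assuming `qsv stats` reports a column's min/max as decimal integer strings for Integer-typed columns; the helper name `pick_integer_type` is hypothetical and not part of the patch.

```python
# Standalone sketch of the smartint second-pass logic above.
# Assumes min/max arrive as decimal integer strings, as produced by
# `qsv stats` for Integer columns. `pick_integer_type` is a
# hypothetical name for illustration only.

# PostgreSQL ranges for the 4-byte int and 8-byte bigint types.
POSTGRES_INT_MAX = 2**31 - 1      # 2147483647
POSTGRES_INT_MIN = -2**31         # -2147483648
POSTGRES_BIGINT_MAX = 2**63 - 1   # 9223372036854775807
POSTGRES_BIGINT_MIN = -2**63      # -9223372036854775808


def pick_integer_type(col_min: str, col_max: str) -> str:
    """Map a column's observed min/max to the narrowest safe Postgres type."""
    lo, hi = int(col_min), int(col_max)
    if POSTGRES_INT_MIN <= lo and hi <= POSTGRES_INT_MAX:
        return 'integer'
    if POSTGRES_BIGINT_MIN <= lo and hi <= POSTGRES_BIGINT_MAX:
        return 'bigint'
    # Anything wider falls back to numeric (arbitrary precision).
    return 'numeric'


assert pick_integer_type('0', '100') == 'integer'
assert pick_integer_type('0', '3000000000') == 'bigint'
assert pick_integer_type('0', '99999999999999999999') == 'numeric'
```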