From 5e2cd15223c07e60af5f62f53df20f892584c747 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Tue, 10 May 2022 16:14:41 -0400 Subject: [PATCH 1/2] smarter integer type mapping to postgres based on the min/max value of a column, we can infer the best postgres data type and choose appropriately - integer, bigint or numeric. --- datapusher/jobs.py | 43 ++++++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/datapusher/jobs.py b/datapusher/jobs.py index 18bb01a9..63f539c9 100644 --- a/datapusher/jobs.py +++ b/datapusher/jobs.py @@ -66,11 +66,16 @@ if not SSL_VERIFY: requests.packages.urllib3.disable_warnings() +POSTGRES_INT_MAX = 2147483647 +POSTGRES_INT_MIN = -2147483648 +POSTGRES_BIGINT_MAX = 9223372036854775807 +POSTGRES_BIGINT_MIN = -9223372036854775808 + _TYPE_MAPPING = { 'String': 'text', - # 'int' may not be big enough, - # and type detection may not realize it needs to be big - 'Integer': 'numeric', + # smartint looks at the min/max values of qsv stats scan and + # chooses integer, bigint or numeric as appropriate + 'Integer': 'smartint', 'Float': 'numeric', 'DateTime': 'timestamp', 'Date': 'date', @@ -586,6 +591,8 @@ def push_to_datastore(task_id, input, dry_run=False): # run qsv stats to get data types and descriptive statistics headers = [] types = [] + headers_min = [] + headers_max = [] qsv_stats_csv = tempfile.NamedTemporaryFile(suffix='.csv') qsv_stats_cmd = [QSV_BIN, 'stats', tmp.name, '--output', qsv_stats_csv.name] @@ -605,11 +612,12 @@ def push_to_datastore(task_id, input, dry_run=False): 'Cannot infer data types and compile statistics: {}'.format(e) ) with open(qsv_stats_csv.name, mode='r') as inp: - reader = csv.reader(inp) - next(reader) # skip first element, which is a header - for rows in reader: - headers.append(rows[0]) - types.append(rows[1]) + reader = csv.DictReader(inp) + for row in reader: + headers.append(row['field']) + types.append(row['type']) + headers_min.append(row['min']) + headers_max.append(row['max']) existing = datastore_resource_exists(resource_id, api_key, ckan_url) existing_info = None @@ -621,7 +629,7 @@ def push_to_datastore(task_id, input, dry_run=False): if existing_info: types = [{ 'text': 'String', - 'numeric': 'Decimal', + 'numeric': 'Float', 'timestamp': 'DateTime', }.get(existing_info.get(h, {}).get('type_override'), t) for t, h in zip(types, headers)] @@ -636,8 +644,21 @@ def push_to_datastore(task_id, input, dry_run=False): res_id=resource_id)) delete_datastore_resource(resource_id, api_key, ckan_url) - headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])]) - for field in zip(headers, types)] + # 1st pass of building headers_dict + temp_headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])]) + for field in zip(headers, types)] + + # 2nd pass header_dicts, checking for smartint types + headers_dicts = [] + for idx, header in enumerate(temp_headers_dicts): + if header['type'] == 'smartint': + if headers_max[idx] <= POSTGRES_INT_MAX and headers_min[idx] >= POSTGRES_INT_MIN: + header_type = 'integer' + elif headers_max[idx] <= POSTGRES_BIGINT_MAX and headers_min[idx] >= POSTGRES_BIGINT_MIN: + header_type = 'bigint' + else: + header_type = 'numeric' + headers_dicts.append(dict(id=header['id'], type=header_type)) # Maintain data dictionaries from matching column names if existing_info: From 6fdd1b842cf15e86b19e65bb56608da5444bf044 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Tue, 10 May 2022 16:42:12 -0400 Subject: [PATCH 2/2] smartint implemented --- datapusher/jobs.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datapusher/jobs.py b/datapusher/jobs.py index 63f539c9..3eda299f 100644 --- a/datapusher/jobs.py +++ b/datapusher/jobs.py @@ -652,12 +652,14 @@ def push_to_datastore(task_id, input, dry_run=False): headers_dicts = [] for idx, header in enumerate(temp_headers_dicts): if header['type'] == 'smartint': - if headers_max[idx] <= POSTGRES_INT_MAX and headers_min[idx] >= POSTGRES_INT_MIN: + if int(headers_max[idx]) <= POSTGRES_INT_MAX and int(headers_min[idx]) >= POSTGRES_INT_MIN: header_type = 'integer' - elif headers_max[idx] <= POSTGRES_BIGINT_MAX and headers_min[idx] >= POSTGRES_BIGINT_MIN: + elif int(headers_max[idx]) <= POSTGRES_BIGINT_MAX and int(headers_min[idx]) >= POSTGRES_BIGINT_MIN: header_type = 'bigint' else: header_type = 'numeric' + else: + header_type = header['type'] headers_dicts.append(dict(id=header['id'], type=header_type)) # Maintain data dictionaries from matching column names