-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjson-to-csv.py
More file actions
130 lines (108 loc) · 4.17 KB
/
json-to-csv.py
File metadata and controls
130 lines (108 loc) · 4.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python3
"""
JSON to CSV Converter — Flatten nested JSON to CSV.

Usage:
    python json-to-csv.py <input.json> [--output output.csv]
    python json-to-csv.py <input.json> --flatten

Options:
    --output FILE       Output CSV file (default: the input path with its
                        extension replaced by .csv)
    --flatten           Flatten nested objects with dot notation (default: True)
    --delimiter CHAR    CSV delimiter (default: ",")
    --encoding ENC      File encoding (default: utf-8)
    --help              Show this help message and exit
"""
import os
import sys
import csv
import json
import argparse
from pathlib import Path
from collections.abc import Mapping
def flatten_dict(d: Mapping, parent_key: str = "", sep: str = ".") -> dict:
    """Recursively flatten a nested mapping into a single-level dict.

    Keys of nested mappings are joined to their parent key with ``sep``, so
    ``{"a": {"b": 1}}`` becomes ``{"a.b": 1}``.  Lists are not expanded; they
    are stored as their JSON text so that each one fits into a single cell.
    An empty nested mapping is stored as the JSON text ``"{}"`` so that its
    key is kept rather than disappearing from the result.

    Args:
        d: Mapping to flatten.
        parent_key: Prefix prepended (followed by ``sep``) to every key of
            ``d``; empty at the top level.
        sep: Separator placed between a parent key and a child key.

    Returns:
        A new flat ``dict`` in first-seen key order; ``d`` is not modified.
    """
    flat: dict = {}
    for key, value in d.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, Mapping):
            if value:
                flat.update(flatten_dict(value, new_key, sep=sep))
            else:
                # An empty object has no leaves to recurse into; without this
                # branch its key (and CSV column) would be silently dropped.
                flat[new_key] = json.dumps({})
        elif isinstance(value, list):
            # Lists are kept whole as JSON text rather than expanded.
            flat[new_key] = json.dumps(value, ensure_ascii=False)
        else:
            flat[new_key] = value
    return flat
def json_to_csv(input_path: Path, output_path: Path, flatten: bool,
                delimiter: str, encoding: str) -> int:
    """Convert a JSON file to CSV and return the number of data rows written.

    The JSON root must be either a single object (written as one row) or an
    array of objects (one row per element).  The CSV header is the union of
    all row keys in first-seen order; keys missing from a row are left empty.

    Args:
        input_path: JSON file to read.
        output_path: CSV file to create or overwrite.
        flatten: When true, each row is passed through ``flatten_dict`` so
            nested objects become dot-notation columns.
        delimiter: Field delimiter handed to ``csv.DictWriter``.
        encoding: Text encoding used for both the input and the output file.

    Returns:
        The number of rows written, or 0 when the JSON array is empty (in
        which case no output file is written).

    Raises:
        SystemExit: With status 1 when the input cannot be read or parsed,
            when the root or an array element is not a JSON object, or when
            the CSV cannot be written.  The message is printed to stderr.
    """
    try:
        with open(input_path, "r", encoding=encoding) as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: File '{input_path}' not found.", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON: {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # CLI boundary: report any other read failure instead of a traceback.
        print(f"Error reading JSON: {e}", file=sys.stderr)
        sys.exit(1)

    # Normalise the root to a list of rows.
    if isinstance(data, Mapping):
        data = [data]
    elif not isinstance(data, list):
        print("Error: JSON root must be an object or an array of objects.",
              file=sys.stderr)
        sys.exit(1)

    if not data:
        print("Warning: JSON data is empty.", file=sys.stderr)
        return 0

    # Every row must be an object; scalars or nested arrays have no keys to
    # turn into columns and would otherwise fail with an AttributeError.
    for index, row in enumerate(data):
        if not isinstance(row, Mapping):
            print(f"Error: Array element {index} is not a JSON object "
                  f"(got {type(row).__name__}).", file=sys.stderr)
            sys.exit(1)

    rows = [flatten_dict(row) for row in data] if flatten else data

    # A dict keeps insertion order, giving the unique keys in first-seen order.
    fieldnames: dict = {}
    for row in rows:
        for key in row:
            fieldnames.setdefault(key, None)

    try:
        # newline="" lets the csv module control line endings itself.
        with open(output_path, "w", encoding=encoding, newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(fieldnames),
                                    delimiter=delimiter)
            writer.writeheader()
            writer.writerows(rows)
    except Exception as e:
        print(f"Error writing CSV: {e}", file=sys.stderr)
        sys.exit(1)

    return len(rows)
def main():
    """Parse the command line and convert the given JSON file to CSV.

    Reads its arguments from ``sys.argv``.  The output path defaults to the
    input path with its extension replaced by ``.csv``.  Flattening is on by
    default; ``--no-flatten`` turns it off (``--flatten`` is still accepted
    for backward compatibility).

    Raises:
        SystemExit: With status 1 when the input path is not an existing
            file (or when the conversion itself fails), and with argparse's
            own status for invalid command-line arguments.
    """
    parser = argparse.ArgumentParser(
        description="Flatten nested JSON to CSV.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            "  python json-to-csv.py data.json\n"
            "  python json-to-csv.py data.json --output table.csv\n"
            "  python json-to-csv.py data.json --delimiter ';'\n"
            "  python json-to-csv.py data.json --no-flatten\n"
        ),
    )
    parser.add_argument("input", help="Input JSON file")
    parser.add_argument("--output", help="Output CSV file (default: <input>.csv)")
    # Both switches share one destination: a lone store_true flag that
    # defaults to True could never be switched off.
    parser.add_argument("--flatten", dest="flatten", action="store_true",
                        default=True,
                        help="Flatten nested objects (default: True)")
    parser.add_argument("--no-flatten", dest="flatten", action="store_false",
                        help="Do not flatten; write nested objects as-is")
    parser.add_argument("--delimiter", default=",", help="CSV delimiter (default: ',')")
    parser.add_argument("--encoding", default="utf-8", help="File encoding (default: utf-8)")
    args = parser.parse_args()

    input_path = Path(args.input)
    if not input_path.is_file():
        print(f"Error: '{args.input}' is not a valid file.", file=sys.stderr)
        sys.exit(1)

    output_path = Path(args.output) if args.output else input_path.with_suffix(".csv")
    rows = json_to_csv(input_path, output_path, args.flatten,
                       args.delimiter, args.encoding)
    print(f"Converted {rows} row(s) → {output_path}")
# Run the converter only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()