diff --git a/dvc.yaml b/dvc.yaml index 47d08b14..fc299acd 100644 --- a/dvc.yaml +++ b/dvc.yaml @@ -125,6 +125,14 @@ stages: outs: - loom/samples + + loom-featur-encoding: + cmd: ./bin/loom python -m loom cat loom/ingest/encoding.json.gz > data/loom-feature-encoding.json + deps: + - loom/ingest + outs: + - data/loom-feature-encoding.json + loom-dump-metadata: cmd: > mkdir -p data/cgpm/raw && @@ -132,11 +140,13 @@ stages: parallel ${parallel.flags} ./bin/loom python scripts/loom_dump.py {} --output data/cgpm/raw/{/}.json + --data data/numericalized.csv params: - parallel.flags deps: - loom/samples - scripts/loom_dump.py + - data/loom-feature-encoding.json outs: - data/cgpm/raw @@ -179,8 +189,8 @@ stages: sort | parallel ${parallel.flags} 'python scripts/cgpm_infer.py {} - --kernel alpha - --kernel view_alphas + --kernel structure_hypers + --kernel view_structure_hypers --kernel column_hypers --kernel rows --kernel columns @@ -553,7 +563,7 @@ stages: do: cmd: > mkdir -p data/discretized/ && - python scripts/discretize.py + nix develop github:InferenceQL/lpm.discretize -c python scripts/discretize.py --real data/ignored.csv --synthetic data/${item} --schema data/loom-schema.json @@ -572,7 +582,7 @@ stages: do: cmd: > mkdir -p data/fidelity/ && - assess-distance + nix develop github:InferenceQL/lpm.fidelity -c assess-distance --data-1 data/discretized/ignored.csv --data-2 data/discretized/${item} --bivariate diff --git a/pyproject.toml b/pyproject.toml index d789fddb..3699c4d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,15 +8,14 @@ readme = "README.md" [tool.poetry.dependencies] python = "~3.10.12" -cgpm = {git = "https://github.com/probcomp/cgpm.git", branch = "zane/hardcode-version"} +cgpm = {git = "https://github.com/probcomp/cgpm.git", rev = "schaechtle/loom-compatible-parametrization"} dvc = "^3.13.3" sppl = {git = "https://github.com/probsys/sppl.git"} edn-format = "^0.7.5" jsonschema = "^4.19.0" catboost = "^1.2" scikit-learn = "^1.3.0" -pytest = "^6.2" -dvc-s3= "^2.23" +pytest = "^6.2.1" [tool.poetry.group.dev.dependencies] black = "^23.7.0" diff --git a/scripts/ast_export.py b/scripts/ast_export.py index bfecff6c..934bd91a 100644 --- a/scripts/ast_export.py +++ b/scripts/ast_export.py @@ -20,7 +20,7 @@ def read_metadata(f): # as lists of pairs. When deserializing them we need to do the conversion # in the other direciton. - for k in ["view_alphas", "Zv", "Zrv"]: + for k in ["view_structure_hypers", "Zv", "Zrv"]: if k in metadata: metadata[k] = dict(metadata[k]) @@ -86,11 +86,11 @@ def export_primitive(output, cctype, hypers, suffstats, distargs, categorical_ma elif cctype == "categorical": k = distargs["k"] - alpha = hypers["alpha"] + alphas = [hypers[f"alpha_{i}"] for i in range(len(hypers))] N = suffstats["N"] counts = suffstats["counts"] # Compute the distribution. - weights = [alpha + counts[i] for i in range(k)] + weights = [alphas[i] + counts[i] for i in range(k)] norm = sum(weights) if categorical_mapping is None: keys = map(str, range(k)) @@ -107,20 +107,29 @@ def export_primitive(output, cctype, hypers, suffstats, distargs, categorical_ma elif cctype == "crp": alpha = hypers["alpha"] + discount = hypers["discount"] N = suffstats["N"] counts = dict(suffstats["counts"]) assert 1 <= len(counts) # Compute the distribution. tables = sorted(counts) - weights = [counts[t] for t in tables] + [alpha] - norm = sum(weights) + + total_count = counts.values() + logdenom = math.log(total_count + alpha) + new_table_prob = math.log(len(counts) * discount + alpha) - logdenom + probs = [new_table_prob - math.log(m) if count==0 else + log(count - self.discount) - logdenom + for count in counts.values() + ] + weight = probs + [new_table_prob] + keys = map(str, range(len(weights))) dist = { edn_format.Keyword("distribution/type"): edn_format.Keyword( "distribution.type/categorical" ), edn_format.Keyword("categorical/category->weight"): { - key: weight / norm for key, weight in zip(keys, weights) + key: math.exp(weight) for key, weight in zip(keys, weights) }, } @@ -247,7 +256,7 @@ def args(output, z): def export_view(metadata, categorical_mapping, variable_mapping): # Compute the CRP cluster weights using Zr and alpha. Zr = metadata["Zr"] - alpha = metadata["alpha"] + alpha = metadata["structure_hypers"]["alpha"] counts = Counter(Zr) tables_existing = sorted(set(Zr)) table_aux = max(tables_existing) + 1 if (len(tables_existing) > 0) else 0 @@ -324,7 +333,7 @@ def invert(d): "distargs": metadata["distargs"], "suffstats": metadata["suffstats"], "Zr": metadata["Zrv"][view_idx], - "alpha": metadata["view_alphas"][view_idx], + "structure_hypers": metadata["view_structure_hypers"][view_idx], } view = export_view(metadata_view, category_mapping, variable_mapping) views.append({edn_format.Keyword("view/clusters"): view}) diff --git a/scripts/cgpm_hydrate.py b/scripts/cgpm_hydrate.py index 258b2f24..cee5fd27 100644 --- a/scripts/cgpm_hydrate.py +++ b/scripts/cgpm_hydrate.py @@ -71,6 +71,7 @@ def distarg(column): base_metadata = dict( X=df.values, cctypes=cctypes, distargs=distargs, outputs=range(df.shape[1]) ) + base_metadata["Zv"] = {i: 0 for i in range(len(cctypes))} if args.model == "CrossCat": pass # don't constrain the model space further. elif args.model == "DPMM": diff --git a/scripts/cgpm_infer.py b/scripts/cgpm_infer.py index 602229e7..246d5513 100644 --- a/scripts/cgpm_infer.py +++ b/scripts/cgpm_infer.py @@ -45,7 +45,7 @@ def main(): action="append", type=str, help="Inference kernel.", - default=["alpha", "view_alphas", "column_hypers"], + default=["structure_hypers", "view_structure_hypers", "column_hypers"], metavar="KERNEL", dest="kernels", ) diff --git a/scripts/loom_dump.py b/scripts/loom_dump.py index 775e27c7..cd119b20 100644 --- a/scripts/loom_dump.py +++ b/scripts/loom_dump.py @@ -7,35 +7,85 @@ import loom.cFormat as cFormat import loom.schema_pb2 as schema_pb2 import os +import json import sys +import pandas as pd - -def loom_metadata(path): +def loom_metadata(path, data): model_in = os.path.join(path, "model.pb.gz") assign_in = os.path.join(path, "assign.pbs.gz") cross_cat = schema_pb2.CrossCat() - with stream.open_compressed(model_in, "rb") as f: cross_cat.ParseFromString(f.read()) - zv = list( - itertools.chain.from_iterable( - [ - [(loom_rank, k) for loom_rank in kind.featureids] - for k, kind in enumerate(cross_cat.kinds) - ] - ) - ) - num_kinds = len(cross_cat.kinds) - assignments = { - a.rowid: [a.groupids(k) for k in range(num_kinds)] - for a in cFormat.assignment_stream_load(assign_in) - } - rowids = sorted(assignments) - zrv = [ - [k, [assignments[rowid][k] for rowid in rowids]] for k in range(num_kinds) - ] - return {"Zv": zv, "Zrv": zrv, "hooked_cgpms": {}} + + # Map col name to index. + df = pd.read_csv(data) + col_mapping = {c:i for i,c in enumerate(df.columns)} + # Initialize hyper-parameter field. + hypers = [None for _ in df.columns] + + with open("data/loom-feature-encoding.json", "r") as f: + feature_encoding = json.load(f) + zv = list( + itertools.chain.from_iterable( + [ + [(loom_rank, k) for loom_rank in kind.featureids] + for k, kind in enumerate(cross_cat.kinds) + ] + ) + ) + num_kinds = len(cross_cat.kinds) + assignments = { + a.rowid: [a.groupids(k) for k in range(num_kinds)] + for a in cFormat.assignment_stream_load(assign_in) + } + rowids = sorted(assignments) + zrv = [ + [k, [assignments[rowid][k] for rowid in rowids]] for k in range(num_kinds) + ] + # From Loom docs: "'topology' hyperparameters for the Pitman-Yor categorization of features". + structure_hypers = { + "alpha": cross_cat.topology.alpha, + "discount": cross_cat.topology.d + } + view_structure_hypers = [] + schema = {} + # From Loom docs: "'clustering' hyperparameters for the Pitman-Yor + # categorization of rows". + for k, kind in enumerate(cross_cat.kinds): + # Keep track of ordering in different places. + dd_counter = 0 + nich_counter = 0 + view_structure_hypers.append({ + "alpha": kind.product_model.clustering.alpha, + "discount": kind.product_model.clustering.d + }) + for feature_id in kind.featureids: + # XXX: currently only deals with categoricals (dd) and normals + # (nich). + col_name = feature_encoding[feature_id]["name"] + col_type = feature_encoding[feature_id]["model"] + if col_type == "dd": + alphas = kind.product_model.dd[dd_counter].alphas + assert len(alphas) == len(feature_encoding[feature_id]["symbols"]), \ + "categories mapped incorrectly" + hypers[col_mapping[col_name]] = {"alpha_{i}".format(i=i):alpha for i, alpha in enumerate(alphas)} + dd_counter+=1 + if col_type == "nich": + m = kind.product_model.nich[nich_counter].mu + r = kind.product_model.nich[nich_counter].kappa + s = kind.product_model.nich[nich_counter].sigmasq + nu = kind.product_model.nich[nich_counter].nu + hypers[col_mapping[col_name]] = {"m":m, "r":r, "s":s, "nu":nu} + nich_counter+=1 + return { + "Zv": zv, + "Zrv": zrv, + "structure_hypers": structure_hypers, + "view_structure_hypers": view_structure_hypers, + "hypers":hypers, + "hooked_cgpms": {}} def dir_path(string): @@ -51,6 +101,9 @@ def main(): parser = argparse.ArgumentParser(description=description) parser.add_argument("sample", type=dir_path, help="Path to Loom sample directory.") + parser.add_argument( + "--data", type=argparse.FileType("r"), help="Path to numericalized CSV." + ) parser.add_argument( "-o", "--output", @@ -65,7 +118,7 @@ def main(): parser.print_help(sys.stderr) sys.exit(1) - metadata = loom_metadata(args.sample) + metadata = loom_metadata(args.sample, args.data) json.dump(metadata, args.output) diff --git a/scripts/loom_read.py b/scripts/loom_read.py new file mode 100644 index 00000000..8b015b58 --- /dev/null +++ b/scripts/loom_read.py @@ -0,0 +1,75 @@ +from distributions.io.stream import ( + open_compressed, protobuf_stream_load) +from loom.cFormat import assignment_stream_load +from loom.util import get_message, protobuf_to_dict +from parsable import parsable +import errno +import json +import os + +@parsable +def loom_to_json(filename): + ''' Convert a loom file to a json file. ''' + name = filename.split("/")[-1].split(".")[0] + output_dir = "/".join(filename.split("/")[:-1]) + output_dir = output_dir.replace("/samples", "/samples_json") + output_filename = output_dir + "/{}.json".format(name) + + try: + os.makedirs(output_dir) + except OSError as e: + if e.errno != errno.EEXIST: + raise + + parts = os.path.basename(filename).split('.') + if parts[-1] in ['gz', 'bz2']: + parts.pop() + + try: + if parts[0] == "assign": + message_dict = parse_assign(filename) + elif parts[-1] == "pb": + message_dict = parse_pb(filename) + elif parts[-1] == "pbs": + message_dict = parse_pbs(filename) + else: + raise ValueError('Unknown protocol: {}'.format(filename)) + + except AssertionError: + # empty message + return + + with open(output_filename, "w") as f: + json.dump(message_dict, f, indent=4) + +def parse_assign(filename): + stream = assignment_stream_load(filename) + + assignments = { + a.rowid: [a.groupids(k) for k in xrange(a.groupids_size())] + for a in stream + } + rowids = sorted(assignments) + n_k = len(next(iter(assignments.values()))) + return { + k: [assignments[rowid][k] for rowid in rowids] + for k in xrange(n_k) + } + + +def parse_pb(filename): + message = get_message(filename) + with open_compressed(filename) as f: + message.ParseFromString(f.read()) + return protobuf_to_dict(message) + +def parse_pbs(filename): + message = get_message(filename) + string_stream = [s for s in protobuf_stream_load(filename)] + string = ''.join(string_stream) + message.ParseFromString(string) + return protobuf_to_dict(message) + + +if __name__ == "__main__": + parsable() diff --git a/scripts/loom_sample.py b/scripts/loom_sample.py new file mode 100644 index 00000000..b4576915 --- /dev/null +++ b/scripts/loom_sample.py @@ -0,0 +1,66 @@ +import loom.tasks +import pandas as pd +from StringIO import StringIO +import argparse +import json + +def synthetic_sample_loom(num_samples): + server = loom.tasks.query("loom") + + row = {name: "" for name in server.feature_names} + csv_headers = map(str, row.iterkeys()) + csv_values = map(str, row.itervalues()) + + # Prepare streams for the server. + outfile = StringIO() + writer = loom.preql.CsvWriter(outfile, returns=outfile.getvalue) + reader = iter([csv_headers] + [csv_values]) + + # Obtain the prediction. + server._predict(reader, num_samples, writer, False) + output_csv = writer.result() + + csvStringIO = StringIO(output_csv) + + df = pd.read_csv(csvStringIO) + return df + +def main(): + description = "Write a Loom synthetic data to disk" + parser = argparse.ArgumentParser(description=description) + + parser.add_argument( + "--sample_count", + type=int, + nargs="?", + default=None, + help="Number of joint simulations for QC", + ) + parser.add_argument( + "-o", + "--output", + type=str, + help="File to write samples to.", + ) + parser.add_argument( + "-i", + "--inv-mapping-table", + type=str, + help="Path to inverted mapping table in JSON", + default=None, + ) + + args = parser.parse_args() + + with open(args.inv_mapping_table, "r") as f: + inv_mapping_table = json.load(f) + + df = synthetic_sample_loom(args.sample_count) + for c, mapping in inv_mapping_table.items(): + new_c = [mapping.get(str(v),None) for v in df[c]] + df[c] = new_c + df.to_csv(args.output, index=False) + + +if __name__ == "__main__": + main()