1- from dbt . contracts . graph . unparsed import UnparsedNode
1+
22from dbt .contracts .graph .parsed import ParsedArchiveNode
33from dbt .node_types import NodeType
4- from dbt .parser .base import MacrosKnownParser
54from dbt .parser .base_sql import BaseSqlParser , SQLParseResult
6- from dbt .adapters .factory import get_adapter
75import dbt .clients .jinja
86import dbt .exceptions
97import dbt .utils
108
11- import os
12-
139
1410def set_archive_attributes (node ):
1511 config_keys = {
@@ -24,93 +20,6 @@ def set_archive_attributes(node):
2420 return node
2521
2622
class ArchiveParser(MacrosKnownParser):
    """Parses archives declared under the ``archive:`` key of
    dbt_project.yml into archive nodes."""

    @classmethod
    def parse_archives_from_project(cls, config):
        """Return a list of dicts, one per archive table declared in
        *config* (a project config object).

        Each dict has the shape of an UnparsedNode plus an extra
        'config' key carrying the per-table archive configuration.
        Archive stanzas with no 'tables' entry contribute nothing.
        """
        found = []

        for stanza in config.archive:
            tables = stanza.get('tables')
            if tables is None:
                continue

            for table in tables:
                table_config = table.copy()

                # Source and target database both fall back to the
                # connection's database when not given explicitly.
                source_database = stanza.get(
                    'source_database', config.credentials.database)
                table_config['target_database'] = stanza.get(
                    'target_database', config.credentials.database)

                source_schema = stanza['source_schema']
                table_config['target_schema'] = stanza.get('target_schema')
                # project-defined archives always use the 'timestamp'
                # strategy.
                table_config['strategy'] = 'timestamp'

                # These archives have no backing file on disk, so
                # synthesize a path from the target coordinates.
                fake_path = [table_config['target_database'],
                             table_config['target_schema'],
                             table_config['target_table']]

                source_relation = get_adapter(config).Relation.create(
                    database=source_database,
                    schema=source_schema,
                    identifier=table['source_table'],
                    type='table')

                raw_sql = '{{ config(materialized="archive") }}' + \
                    'select * from {!s}'.format(source_relation)

                found.append({
                    'name': table.get('target_table'),
                    'root_path': config.project_root,
                    'resource_type': NodeType.Archive,
                    'path': os.path.join('archive', *fake_path),
                    'original_file_path': 'dbt_project.yml',
                    'package_name': config.project_name,
                    'config': table_config,
                    'raw_sql': raw_sql,
                })

        return found

    def load_and_parse(self):
        """Load and parse archives in a list of projects. Returns a dict
        that maps unique ids onto ParsedNodes"""
        unparsed = []
        for project in self.all_projects.values():
            unparsed.extend(self.parse_archives_from_project(project))

        # We're going to have a similar issue with parsed nodes, if we
        # want to make parse_node return those.
        parsed = {}
        for raw in unparsed:
            # archives carry a config, but that would make for an
            # invalid UnparsedNode, so pop it off and hand it to
            # parse_node as a separate argument.
            archive_config = raw.pop('config')
            node = UnparsedNode(**raw)
            unique_id = self.get_path(node.resource_type,
                                      node.package_name,
                                      node.name)

            parsed_node = self.parse_node(
                node,
                unique_id,
                self.all_projects.get(node.package_name),
                archive_config=archive_config)

            parsed[unique_id] = set_archive_attributes(parsed_node)

        return parsed
112-
113-
11423class ArchiveBlockParser (BaseSqlParser ):
11524 def parse_archives_from_file (self , file_node , tags = None ):
11625 # the file node has a 'raw_sql' field that contains the jinja data with
0 commit comments