Skip to content

Commit 3626bae

Browse files
authored
Merge pull request cdapio#15986 from cdapio/execute_mutation_branch-2
[CDAP-21172] Implement CRUD Operations for Metadata Tables
2 parents 52b7e01 + de60c7c commit 3626bae

File tree

4 files changed

+693
-50
lines changed

4 files changed

+693
-50
lines changed
Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.metadata.spanner;
18+
19+
import io.cdap.cdap.api.data.schema.Schema;
20+
import io.cdap.cdap.api.data.schema.SchemaWalker;
21+
import io.cdap.cdap.api.metadata.MetadataEntity;
22+
import io.cdap.cdap.api.metadata.MetadataScope;
23+
import io.cdap.cdap.spi.metadata.Metadata;
24+
import io.cdap.cdap.spi.metadata.MetadataConstants;
25+
import io.cdap.cdap.spi.metadata.ScopedName;
26+
import java.io.IOException;
27+
import java.util.ArrayList;
28+
import java.util.Collections;
29+
import java.util.HashMap;
30+
import java.util.HashSet;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.Objects;
34+
import java.util.Optional;
35+
import java.util.Set;
36+
import java.util.stream.Collectors;
37+
import java.util.stream.Stream;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
40+
41+
/**
42+
* A helper class to process and prepare metadata fields for optimized storage.
43+
* This class encapsulates the logic for flattening text fields, parsing schemas,
44+
* and extracting specific properties for relational storage.
45+
*/
46+
public class FormattedMetadata {
47+
48+
private static final Logger LOG = LoggerFactory.getLogger(FormattedMetadata.class);
49+
50+
// Fields directly mapped to metadata table columns
51+
private final String namespace;
52+
private final String type;
53+
private final String name;
54+
private final Long created;
55+
private final String userText;
56+
private final String systemText;
57+
private final Set<Property> metadataProps;
58+
59+
/**
60+
* Creates an instance of FormattedMetadata from a MetadataEntity and Metadata.
61+
* This is the public entry point for creating formatted metadata.
62+
*
63+
* @param entity the metadata entity being processed.
64+
* @param metadata the metadata to be associated with the entity.
65+
* @return a new instance of FormattedMetadata.
66+
* @throws IOException if schema parsing fails.
67+
*/
68+
public static FormattedMetadata from(MetadataEntity entity, Metadata metadata) throws IOException {
69+
return new FormattedMetadata(entity, metadata);
70+
}
71+
72+
private FormattedMetadata(MetadataEntity entity, Metadata metadata) throws IOException {
73+
this.namespace = entity.getValue("namespace");
74+
this.type = entity.getType().toLowerCase();
75+
this.name = Objects.requireNonNull(entity.getValue(entity.getType())).toLowerCase();
76+
77+
Map<ScopedName, String> properties = metadata.getProperties();
78+
String schemaJson = properties.get(new ScopedName(MetadataScope.SYSTEM, MetadataConstants.SCHEMA_KEY));
79+
Map<String, Set<Property>> schemaProperty = reformatSchemaProperty(schemaJson);
80+
Map<String, Set<Property>> reformatedProperties = reformatProperties(properties);
81+
Set<Property> reformatedPropertiesWithValue = reformatedProperties
82+
.getOrDefault("extractedProperties", Collections.emptySet());
83+
reformatedPropertiesWithValue.addAll(schemaProperty.getOrDefault("schemaAndFieldNames",
84+
Collections.emptySet()));
85+
Set<ScopedName> tags = metadata.getTags();
86+
Set<Property> reformatedTags = reformatTags(tags);
87+
88+
this.metadataProps = new HashSet<>();
89+
this.metadataProps.addAll(reformatedPropertiesWithValue);
90+
this.metadataProps.addAll(reformatedTags);
91+
this.metadataProps.add(new Property(MetadataScope.SYSTEM.name(), this.type, this.name));
92+
this.systemText = buildText(reformatedPropertiesWithValue, tags, MetadataScope.SYSTEM) + " " + type;
93+
this.metadataProps.addAll(reformatedProperties.getOrDefault("propertyNames", Collections.emptySet()));
94+
this.metadataProps.addAll(schemaProperty.getOrDefault("fieldProperties", Collections.emptySet()));
95+
this.userText = buildText(reformatedPropertiesWithValue, tags, MetadataScope.USER);
96+
this.created = parseCreationTime(reformatedPropertiesWithValue).orElse(null);
97+
}
98+
99+
public String getNamespace() {
100+
return namespace;
101+
}
102+
103+
public String getType() {
104+
return type;
105+
}
106+
107+
public String getName() {
108+
return name;
109+
}
110+
111+
public Optional<Long> getCreated() {
112+
return Optional.ofNullable(created);
113+
}
114+
115+
public String getUserText() {
116+
return userText;
117+
}
118+
119+
public String getSystemText() {
120+
return systemText;
121+
}
122+
123+
public Set<Property> getMetadataProps() {
124+
return metadataProps;
125+
}
126+
127+
/**
128+
* Processes properties and extract property names into a Set.
129+
*
130+
* @param properties The original properties from the metadata object.
131+
* @return A Set containing Property objects.
132+
*/
133+
private Map<String, Set<Property>> reformatProperties(Map<ScopedName, String> properties) {
134+
if (properties == null || properties.isEmpty()) {
135+
return Collections.emptyMap();
136+
}
137+
138+
Set<Property> extracted = new HashSet<>(properties.size());
139+
Set<String> propertyNames = new HashSet<>();
140+
for (Map.Entry<ScopedName, String> entry : properties.entrySet()) {
141+
ScopedName key = entry.getKey();
142+
String name = key.getName().toLowerCase();
143+
String scope = key.getScope().name();
144+
String value = entry.getValue().toLowerCase();
145+
146+
// If it's a schema key, reformat it.
147+
if (MetadataConstants.SCHEMA_KEY.equals(name)) {
148+
continue;
149+
}
150+
151+
extracted.add(new Property(scope, name, value));
152+
propertyNames.add(name);
153+
}
154+
155+
String allPropertiesValue = String.join(" ", propertyNames);
156+
Set<Property> propertyValue = new HashSet<>();
157+
propertyValue.add(new Property(MetadataScope.SYSTEM.name(),
158+
MetadataConstants.PROPERTIES_KEY, allPropertiesValue));
159+
160+
Map<String, Set<Property>> result = new HashMap<>();
161+
result.put("extractedProperties", extracted);
162+
result.put("propertyNames", propertyValue);
163+
164+
return result;
165+
166+
}
167+
168+
/**
169+
* Extracts tags into the set format.
170+
*/
171+
private Set<Property> reformatTags(Set<ScopedName> tags) {
172+
return tags.stream()
173+
.map(tag -> new Property(tag.getScope().name(), MetadataConstants.TAGS_KEY, tag.getName()))
174+
.collect(Collectors.toSet());
175+
}
176+
177+
/**
178+
* Builds a single, space-delimited string of text from properties and tags
179+
* for a given scope.
180+
*/
181+
private String buildText(Set<Property> properties, Set<ScopedName> tags, MetadataScope scope) {
182+
String scopeProperties = properties.stream()
183+
.filter(property -> Objects.equals(property.getScope(), scope.toString()))
184+
.map(property -> property.getValue().toLowerCase())
185+
.collect(Collectors.joining(" "));
186+
187+
String scopeTags = tags.stream()
188+
.filter(tag -> tag.getScope() == scope)
189+
.map(tag -> tag.getName().toLowerCase())
190+
.collect(Collectors.joining(" "));
191+
192+
return Stream.of(scopeProperties, scopeTags)
193+
.filter(s -> !s.isEmpty())
194+
.collect(Collectors.joining(" "));
195+
}
196+
197+
/**
198+
* Finds and parses the creation time from the processed properties.
199+
*/
200+
private Optional<Long> parseCreationTime(Set<Property> properties) {
201+
if (properties == null || properties.isEmpty()) {
202+
return Optional.empty();
203+
}
204+
205+
String expectedScope = MetadataScope.SYSTEM.name();
206+
String expectedName = MetadataConstants.CREATION_TIME_KEY;
207+
for (Property property : properties) {
208+
if (expectedScope.equalsIgnoreCase(property.getScope()) && expectedName.equalsIgnoreCase(property.getName())) {
209+
try {
210+
return Optional.of(Long.parseLong(property.getValue()));
211+
} catch (NumberFormatException e) {
212+
LOG.warn("Unable to parse property value '{}' as a long for the creation time key. Skipping.",
213+
property.getValue(), e);
214+
return Optional.empty();
215+
}
216+
}
217+
}
218+
219+
return Optional.empty();
220+
}
221+
222+
/**
223+
* Parses a schema JSON string into a concise, human-readable format.
224+
*
225+
* @param schemaStr The raw JSON string representing the schema.
226+
* @return A formatted string (e.g., "schemaname:TYPE field1:TYPE1 field2:TYPE2").
227+
* @throws IOException if the schema string cannot be parsed.
228+
*/
229+
private Map<String, Set<Property>> reformatSchemaProperty(String schemaStr) throws IOException {
230+
if (schemaStr == null) {
231+
return Collections.emptyMap();
232+
}
233+
234+
Set<Property> fieldProperties = new HashSet<>();
235+
Set<Property> schemaAndFieldNames = new HashSet<>();
236+
Schema schema = Schema.parseJson(schemaStr);
237+
238+
List<String> formattedFields = new ArrayList<>();
239+
SchemaWalker.walk(schema, (fieldName, fieldSchema) -> {
240+
if (fieldName != null) {
241+
Schema nonNullableSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
242+
String typeName = nonNullableSchema.getType().toString().toLowerCase();
243+
fieldProperties.add(new Property(
244+
MetadataScope.SYSTEM.name(),
245+
fieldName.toLowerCase(),
246+
typeName));
247+
formattedFields.add(fieldName.toLowerCase() + ":" + typeName.toLowerCase());
248+
}
249+
});
250+
251+
String schemaProperties = formattedFields.isEmpty() ? "" : String.join(" ", formattedFields);
252+
schemaAndFieldNames.add(new Property(
253+
MetadataScope.SYSTEM.name(),
254+
"schema",
255+
schemaProperties));
256+
257+
Map<String, Set<Property>> result = new HashMap<>();
258+
result.put("fieldProperties", fieldProperties);
259+
result.put("schemaAndFieldNames", schemaAndFieldNames);
260+
261+
return result;
262+
}
263+
264+
/**
265+
* Represents a property for the Spanner metadata_props table.
266+
*/
267+
static class Property {
268+
private final String scope;
269+
private final String name;
270+
private final String value;
271+
272+
Property(String scope, String name, String value) {
273+
this.scope = scope;
274+
this.name = name;
275+
this.value = value;
276+
}
277+
278+
public String getScope() {
279+
return scope;
280+
}
281+
282+
public String getName() {
283+
return name;
284+
}
285+
286+
public String getValue() {
287+
return value;
288+
}
289+
290+
/**
291+
* Checks if this Property is equal to another object.
292+
*/
293+
public boolean equals(Object o) {
294+
if (this == o) {
295+
return true;
296+
}
297+
if (o == null || getClass() != o.getClass()) {
298+
return false;
299+
}
300+
301+
Property property = (Property) o;
302+
return Objects.equals(scope, property.scope)
303+
&& Objects.equals(name, property.name)
304+
&& Objects.equals(value, property.value);
305+
}
306+
307+
public int hashCode() {
308+
return Objects.hash(scope, name, value);
309+
}
310+
311+
public String toString() {
312+
return scope + ':' + name + ':' + value;
313+
}
314+
}
315+
}
316+

0 commit comments

Comments
 (0)