Skip to content

Commit 245af97

Browse files
authored
Merge pull request data-integrations#112 from data-integrations/PLUGIN-1515-fix-http-sink-1-2
PLUGIN-1515 fix http sink when batch size > 1
2 parents e72fde3 + d437f8c commit 245af97

File tree

6 files changed

+321
-207
lines changed

6 files changed

+321
-207
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<name>HTTP Plugins</name>
2222
<groupId>io.cdap</groupId>
2323
<artifactId>http-plugins</artifactId>
24-
<version>1.2.5</version>
24+
<version>1.2.6</version>
2525

2626
<licenses>
2727
<license>
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright © 2023 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.http.sink.batch;
18+
19+
import com.google.gson.Gson;
20+
import io.cdap.cdap.api.data.format.StructuredRecord;
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.mapreduce.JobContext;
23+
import org.apache.hadoop.mapreduce.OutputCommitter;
24+
import org.apache.hadoop.mapreduce.OutputFormat;
25+
import org.apache.hadoop.mapreduce.RecordWriter;
26+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
27+
28+
/**
29+
* OutputFormat for HTTP writing
30+
*/
31+
public class HTTPOutputFormat extends OutputFormat<StructuredRecord, StructuredRecord> {
32+
private static final Gson GSON = new Gson();
33+
static final String CONFIG_KEY = "http.sink.config";
34+
35+
@Override
36+
public RecordWriter<StructuredRecord, StructuredRecord> getRecordWriter(TaskAttemptContext context) {
37+
Configuration hConf = context.getConfiguration();
38+
HTTPSinkConfig config = GSON.fromJson(hConf.get(CONFIG_KEY), HTTPSinkConfig.class);
39+
return new HTTPRecordWriter(config);
40+
}
41+
42+
@Override
43+
public void checkOutputSpecs(JobContext jobContext) {
44+
45+
}
46+
47+
@Override
48+
public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
49+
return new OutputCommitter() {
50+
@Override
51+
public void setupJob(JobContext jobContext) {
52+
53+
}
54+
55+
@Override
56+
public void setupTask(TaskAttemptContext taskAttemptContext) {
57+
58+
}
59+
60+
@Override
61+
public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
62+
return false;
63+
}
64+
65+
@Override
66+
public void commitTask(TaskAttemptContext taskAttemptContext) {
67+
68+
}
69+
70+
@Override
71+
public void abortTask(TaskAttemptContext taskAttemptContext) {
72+
73+
}
74+
};
75+
}
76+
}
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/*
2+
* Copyright © 2023 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.http.sink.batch;
18+
19+
import io.cdap.cdap.api.data.format.StructuredRecord;
20+
import io.cdap.cdap.api.data.schema.Schema;
21+
import io.cdap.cdap.format.StructuredRecordStringConverter;
22+
import org.apache.hadoop.mapreduce.RecordWriter;
23+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import java.io.IOException;
28+
import java.io.OutputStream;
29+
import java.io.UnsupportedEncodingException;
30+
import java.net.HttpURLConnection;
31+
import java.net.MalformedURLException;
32+
import java.net.ProtocolException;
33+
import java.net.URL;
34+
import java.net.URLEncoder;
35+
import java.security.KeyManagementException;
36+
import java.security.NoSuchAlgorithmException;
37+
import java.security.cert.X509Certificate;
38+
import java.util.HashMap;
39+
import java.util.Map;
40+
import java.util.StringTokenizer;
41+
import java.util.regex.Matcher;
42+
import java.util.regex.Pattern;
43+
import javax.net.ssl.HostnameVerifier;
44+
import javax.net.ssl.HttpsURLConnection;
45+
import javax.net.ssl.SSLContext;
46+
import javax.net.ssl.SSLSession;
47+
import javax.net.ssl.TrustManager;
48+
import javax.net.ssl.X509TrustManager;
49+
50+
/**
51+
* RecordWriter for HTTP.
52+
*/
53+
public class HTTPRecordWriter extends RecordWriter<StructuredRecord, StructuredRecord> {
54+
private static final Logger LOG = LoggerFactory.getLogger(HTTPRecordWriter.class);
55+
private static final String REGEX_HASHED_VAR = "#s*(\\w+)";
56+
57+
private final HTTPSinkConfig config;
58+
private StringBuilder messages = new StringBuilder();
59+
private String contentType;
60+
61+
HTTPRecordWriter(HTTPSinkConfig config) {
62+
this.config = config;
63+
}
64+
65+
@Override
66+
public void write(StructuredRecord input, StructuredRecord unused) throws IOException {
67+
String message = null;
68+
if (config.getMethod().equals("POST") || config.getMethod().equals("PUT")) {
69+
if (config.getMessageFormat().equals("JSON")) {
70+
message = StructuredRecordStringConverter.toJsonString(input);
71+
contentType = "application/json";
72+
} else if (config.getMessageFormat().equals("Form")) {
73+
message = createFormMessage(input);
74+
contentType = " application/x-www-form-urlencoded";
75+
} else if (config.getMessageFormat().equals("Custom")) {
76+
message = createCustomMessage(config.getBody(), input);
77+
contentType = " text/plain";
78+
}
79+
messages.append(message).append(config.getDelimiterForMessages());
80+
}
81+
StringTokenizer tokens = new StringTokenizer(messages.toString().trim(), config.getDelimiterForMessages());
82+
if (config.getBatchSize() == 1 || tokens.countTokens() == config.getBatchSize()) {
83+
executeHTTPService();
84+
}
85+
}
86+
87+
@Override
88+
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
89+
// Process remaining messages after batch executions.
90+
if (!messages.toString().isEmpty()) {
91+
try {
92+
executeHTTPService();
93+
} catch (Exception e) {
94+
throw new RuntimeException("Error while executing http request for remaining input messages " +
95+
"after the batch execution. " + e);
96+
}
97+
}
98+
}
99+
100+
private void executeHTTPService() throws IOException {
101+
int responseCode;
102+
int retries = 0;
103+
IOException exception = null;
104+
do {
105+
HttpURLConnection conn = null;
106+
Map<String, String> headers = config.getRequestHeadersMap();
107+
try {
108+
URL url = new URL(config.getUrl());
109+
conn = (HttpURLConnection) url.openConnection();
110+
if (conn instanceof HttpsURLConnection) {
111+
//Disable SSLv3
112+
System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2");
113+
if (config.getDisableSSLValidation()) {
114+
disableSSLValidation();
115+
}
116+
}
117+
conn.setRequestMethod(config.getMethod().toUpperCase());
118+
conn.setConnectTimeout(config.getConnectTimeout());
119+
conn.setReadTimeout(config.getReadTimeout());
120+
conn.setInstanceFollowRedirects(config.getFollowRedirects());
121+
conn.addRequestProperty("charset", config.getCharset());
122+
for (Map.Entry<String, String> propertyEntry : headers.entrySet()) {
123+
conn.addRequestProperty(propertyEntry.getKey(), propertyEntry.getValue());
124+
}
125+
//Default contentType value would be added in the request properties if user has not added in the headers.
126+
if (config.getMethod().equals("POST") || config.getMethod().equals("PUT")) {
127+
if (!headers.containsKey("Content-Type")) {
128+
conn.addRequestProperty("Content-Type", contentType);
129+
}
130+
}
131+
if (messages.length() > 0) {
132+
conn.setDoOutput(true);
133+
try (OutputStream outputStream = conn.getOutputStream()) {
134+
outputStream.write(messages.toString().trim().getBytes(config.getCharset()));
135+
}
136+
}
137+
responseCode = conn.getResponseCode();
138+
messages.setLength(0);
139+
if (config.getFailOnNon200Response() && !(responseCode >= 200 && responseCode < 300)) {
140+
exception = new IOException("Received error response. Response code: " + responseCode);
141+
}
142+
break;
143+
} catch (MalformedURLException | ProtocolException e) {
144+
throw new IllegalStateException("Error opening url connection. Reason: " + e.getMessage(), e);
145+
} catch (IOException e) {
146+
LOG.warn("Error making {} request to url {} with headers {}.", config.getMethod(), config.getMethod(), headers);
147+
exception = e;
148+
} finally {
149+
if (conn != null) {
150+
conn.disconnect();
151+
}
152+
}
153+
retries++;
154+
} while (retries < config.getNumRetries());
155+
if (exception != null) {
156+
throw exception;
157+
}
158+
}
159+
160+
private String createFormMessage(StructuredRecord input) {
161+
boolean first = true;
162+
String formMessage = null;
163+
StringBuilder sb = new StringBuilder("");
164+
for (Schema.Field field : input.getSchema().getFields()) {
165+
if (first) {
166+
first = false;
167+
} else {
168+
sb.append("&");
169+
}
170+
sb.append(field.getName());
171+
sb.append("=");
172+
sb.append((String) input.get(field.getName()));
173+
}
174+
try {
175+
formMessage = URLEncoder.encode(sb.toString(), config.getCharset());
176+
} catch (UnsupportedEncodingException e) {
177+
throw new IllegalStateException("Error encoding Form message. Reason: " + e.getMessage(), e);
178+
}
179+
return formMessage;
180+
}
181+
182+
private String createCustomMessage(String body, StructuredRecord input) {
183+
String customMessage = body;
184+
Matcher matcher = Pattern.compile(REGEX_HASHED_VAR).matcher(customMessage);
185+
HashMap<String, String> findReplaceMap = new HashMap();
186+
while (matcher.find()) {
187+
if (input.get(matcher.group(1)) != null) {
188+
findReplaceMap.put(matcher.group(1), (String) input.get(matcher.group(1)));
189+
} else {
190+
throw new IllegalArgumentException(String.format(
191+
"Field %s doesnt exist in the input schema.", matcher.group(1)));
192+
}
193+
}
194+
Matcher replaceMatcher = Pattern.compile(REGEX_HASHED_VAR).matcher(customMessage);
195+
while (replaceMatcher.find()) {
196+
String val = replaceMatcher.group().replace("#", "");
197+
customMessage = (customMessage.replace(replaceMatcher.group(), findReplaceMap.get(val)));
198+
}
199+
return customMessage;
200+
}
201+
202+
private void disableSSLValidation() {
203+
TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() {
204+
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
205+
return null;
206+
}
207+
208+
public void checkClientTrusted(X509Certificate[] certs, String authType) {
209+
}
210+
211+
public void checkServerTrusted(X509Certificate[] certs, String authType) {
212+
}
213+
}
214+
};
215+
SSLContext sslContext = null;
216+
try {
217+
sslContext = SSLContext.getInstance("SSL");
218+
sslContext.init(null, trustAllCerts, new java.security.SecureRandom());
219+
} catch (KeyManagementException | NoSuchAlgorithmException e) {
220+
throw new IllegalStateException("Error while installing the trust manager: " + e.getMessage(), e);
221+
}
222+
HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.getSocketFactory());
223+
HostnameVerifier allHostsValid = new HostnameVerifier() {
224+
public boolean verify(String hostname, SSLSession session) {
225+
return true;
226+
}
227+
};
228+
HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);
229+
}
230+
}

0 commit comments

Comments
 (0)