org.slf4j
diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/loader/DTClassLoader.java b/flinkx-core/src/main/java/com/dtstack/flinkx/loader/DTClassLoader.java
index 1f8b8be494..f91b4fa48a 100644
--- a/flinkx-core/src/main/java/com/dtstack/flinkx/loader/DTClassLoader.java
+++ b/flinkx-core/src/main/java/com/dtstack/flinkx/loader/DTClassLoader.java
@@ -74,8 +74,9 @@ protected Class> loadClass(String name, boolean resolve) throws ClassNotFoundE
try {
clazz = findClass(name);
if (clazz != null) {
- if (resolve)
+ if (resolve){
resolveClass(clazz);
+ }
return (clazz);
}
} catch (ClassNotFoundException e) {
@@ -86,8 +87,9 @@ protected Class> loadClass(String name, boolean resolve) throws ClassNotFoundE
try {
clazz = Class.forName(name, false, parent);
if (clazz != null) {
- if (resolve)
+ if (resolve){
resolveClass(clazz);
+ }
return (clazz);
}
} catch (ClassNotFoundException e) {
diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/RichOutputFormat.java b/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/RichOutputFormat.java
index 7fc84d08bc..15dfca358a 100644
--- a/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/RichOutputFormat.java
+++ b/flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/RichOutputFormat.java
@@ -307,6 +307,7 @@ public void close() throws IOException {
}
if(errorLimiter != null) {
+ errorLimiter.acquire();
errorLimiter.stop();
}
diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/plugin/PluginLoader.java b/flinkx-core/src/main/java/com/dtstack/flinkx/plugin/PluginLoader.java
index 8a3064ec24..f7b7c0704c 100644
--- a/flinkx-core/src/main/java/com/dtstack/flinkx/plugin/PluginLoader.java
+++ b/flinkx-core/src/main/java/com/dtstack/flinkx/plugin/PluginLoader.java
@@ -22,7 +22,6 @@
import com.dtstack.flinkx.util.SysUtil;
import org.apache.flink.util.Preconditions;
import java.io.File;
-import java.io.FilenameFilter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/util/SysUtil.java b/flinkx-core/src/main/java/com/dtstack/flinkx/util/SysUtil.java
index 6c389008fa..4988b304d9 100644
--- a/flinkx-core/src/main/java/com/dtstack/flinkx/util/SysUtil.java
+++ b/flinkx-core/src/main/java/com/dtstack/flinkx/util/SysUtil.java
@@ -18,8 +18,6 @@
package com.dtstack.flinkx.util;
-import com.google.common.base.Preconditions;
-
import java.io.File;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
diff --git a/flinkx-core/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flinkx-core/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
new file mode 100644
index 0000000000..301375d82c
--- /dev/null
+++ b/flinkx-core/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -0,0 +1,1255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.OperatorInformation;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.api.java.io.CsvReader;
+import org.apache.flink.api.java.io.IteratorInputFormat;
+import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
+import org.apache.flink.api.java.io.PrimitiveInputFormat;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.io.TextValueInputFormat;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.Operator;
+import org.apache.flink.api.java.operators.OperatorTranslation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.NumberSequenceIterator;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SplittableIterator;
+import org.apache.flink.util.Visitor;
+
+import com.esotericsoftware.kryo.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The ExecutionEnvironment is the context in which a program is executed. A
+ * {@link LocalEnvironment} will cause execution in the current JVM, a
+ * {@link RemoteEnvironment} will cause execution on a remote setup.
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism)
+ * and to interact with the outside world (data access).
+ *
+ *
Please note that the execution environment needs strong type information for the input and return types
+ * of all operations that are executed. This means that the environments needs to know that the return
+ * value of an operation is for example a Tuple of String and Integer.
+ * Because the Java compiler throws much of the generic type information away, most methods attempt to re-
+ * obtain that information using reflection. In certain cases, it may be necessary to manually supply that
+ * information to some of the methods.
+ *
+ * @see LocalEnvironment
+ * @see RemoteEnvironment
+ */
+@Public
+public abstract class ExecutionEnvironment {
+
+ /** The logger used by the environment and its subclasses. */
+ protected static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
+
+ /** The environment of the context (local by default, cluster if invoked through command line). */
+ private static ThreadLocal contextEnvironmentFactory = new ThreadLocal<>();
+
+ /** The default parallelism used by local environments. */
+ private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
+
+ // --------------------------------------------------------------------------------------------
+
+ private final List> sinks = new ArrayList<>();
+
+ private final List> cacheFile = new ArrayList<>();
+
+ private final ExecutionConfig config = new ExecutionConfig();
+
+ /** Result from the latest execution, to make it retrievable when using eager execution methods. */
+ protected JobExecutionResult lastJobExecutionResult;
+
+ /** The ID of the session, defined by this execution environment. Sessions and Jobs are same in
+ * Flink, as Jobs can consist of multiple parts that are attached to the growing dataflow graph. */
+ protected JobID jobID;
+
+ /** The session timeout in seconds. */
+ protected long sessionTimeout;
+
+ /** Flag to indicate whether sinks have been cleared in previous executions. */
+ private boolean wasExecuted = false;
+
+ /**
+ * Creates a new Execution Environment.
+ */
+ protected ExecutionEnvironment() {
+ jobID = JobID.generate();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Properties
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Gets the config object that defines execution parameters.
+ *
+ * @return The environment's execution configuration.
+ */
+ public ExecutionConfig getConfig() {
+ return config;
+ }
+
+ /**
+ * Gets the parallelism with which operation are executed by default. Operations can
+ * individually override this value to use a specific parallelism via
+ * {@link Operator#setParallelism(int)}. Other operations may need to run with a different
+ * parallelism - for example calling
+ * {@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)} over the entire
+ * set will insert eventually an operation that runs non-parallel (parallelism of one).
+ *
+ * @return The parallelism used by operations, unless they override that value. This method
+ * returns {@link ExecutionConfig#PARALLELISM_DEFAULT}, if the environment's default parallelism should be used.
+ */
+ public int getParallelism() {
+ return config.getParallelism();
+ }
+
+ /**
+ * Sets the parallelism for operations executed through this environment.
+ * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
+ * x parallel instances.
+ *
+ * This method overrides the default parallelism for this environment.
+ * The {@link LocalEnvironment} uses by default a value equal to the number of hardware
+ * contexts (CPU cores / threads). When executing the program via the command line client
+ * from a JAR file, the default parallelism is the one configured for that setup.
+ *
+ * @param parallelism The parallelism
+ */
+ public void setParallelism(int parallelism) {
+ config.setParallelism(parallelism);
+ }
+
+ /**
+ * Sets the restart strategy configuration. The configuration specifies which restart strategy
+ * will be used for the execution graph in case of a restart.
+ *
+ * @param restartStrategyConfiguration Restart strategy configuration to be set
+ */
+ @PublicEvolving
+ public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
+ config.setRestartStrategy(restartStrategyConfiguration);
+ }
+
+ /**
+ * Returns the specified restart strategy configuration.
+ *
+ * @return The restart strategy configuration to be used
+ */
+ @PublicEvolving
+ public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
+ return config.getRestartStrategy();
+ }
+
+ /**
+ * Sets the number of times that failed tasks are re-executed. A value of zero
+ * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+ * default value (as defined in the configuration) should be used.
+ *
+ * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+ *
+ * @deprecated This method will be replaced by {@link #setRestartStrategy}. The
+ * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of
+ * execution retries.
+ */
+ @Deprecated
+ @PublicEvolving
+ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+ config.setNumberOfExecutionRetries(numberOfExecutionRetries);
+ }
+
+ /**
+ * Gets the number of times the system will try to re-execute failed tasks. A value
+ * of {@code -1} indicates that the system default value (as defined in the configuration)
+ * should be used.
+ *
+ * @return The number of times the system will try to re-execute failed tasks.
+ *
+ * @deprecated This method will be replaced by {@link #getRestartStrategy}. The
+ * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of
+ * execution retries.
+ */
+ @Deprecated
+ @PublicEvolving
+ public int getNumberOfExecutionRetries() {
+ return config.getNumberOfExecutionRetries();
+ }
+
+ /**
+ * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
+ *
+ * @return The execution result from the latest job execution.
+ */
+ public JobExecutionResult getLastJobExecutionResult(){
+ return this.lastJobExecutionResult;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Session Management
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Gets the JobID by which this environment is identified. The JobID sets the execution context
+ * in the cluster or local environment.
+ *
+ * @return The JobID of this environment.
+ * @see #getIdString()
+ */
+ @PublicEvolving
+ public JobID getId() {
+ return this.jobID;
+ }
+
+ /**
+ * Gets the JobID by which this environment is identified, as a string.
+ *
+ * @return The JobID as a string.
+ * @see #getId()
+ */
+ @PublicEvolving
+ public String getIdString() {
+ return this.jobID.toString();
+ }
+
+ /**
+ * Sets the session timeout to hold the intermediate results of a job. This only
+ * applies the updated timeout in future executions.
+ *
+ * @param timeout The timeout, in seconds.
+ */
+ @PublicEvolving
+ public void setSessionTimeout(long timeout) {
+ throw new IllegalStateException("Support for sessions is currently disabled. " +
+ "It will be enabled in future Flink versions.");
+ // Session management is disabled, revert this commit to enable
+ //if (timeout < 0) {
+ // throw new IllegalArgumentException("The session timeout must not be less than zero.");
+ //}
+ //this.sessionTimeout = timeout;
+ }
+
+ /**
+ * Gets the session timeout for this environment. The session timeout defines for how long
+ * after an execution, the job and its intermediate results will be kept for future
+ * interactions.
+ *
+ * @return The session timeout, in seconds.
+ */
+ @PublicEvolving
+ public long getSessionTimeout() {
+ return sessionTimeout;
+ }
+
+ /**
+ * Starts a new session, discarding the previous data flow and all of its intermediate results.
+ */
+ @PublicEvolving
+ public abstract void startNewSession() throws Exception;
+
+ // --------------------------------------------------------------------------------------------
+ // Registry for types and serializers
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Adds a new Kryo default serializer to the Runtime.
+ *
+ *
Note that the serializer instance must be serializable (as defined by java.io.Serializable),
+ * because it may be distributed to the worker nodes by java serialization.
+ *
+ * @param type The class of the types serialized with the given serializer.
+ * @param serializer The serializer to use.
+ */
+ public & Serializable>void addDefaultKryoSerializer(Class> type, T serializer) {
+ config.addDefaultKryoSerializer(type, serializer);
+ }
+
+ /**
+ * Adds a new Kryo default serializer to the Runtime.
+ *
+ * @param type The class of the types serialized with the given serializer.
+ * @param serializerClass The class of the serializer to use.
+ */
+ public void addDefaultKryoSerializer(Class> type, Class extends Serializer>> serializerClass) {
+ config.addDefaultKryoSerializer(type, serializerClass);
+ }
+
+ /**
+ * Registers the given type with a Kryo Serializer.
+ *
+ * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
+ * because it may be distributed to the worker nodes by java serialization.
+ *
+ * @param type The class of the types serialized with the given serializer.
+ * @param serializer The serializer to use.
+ */
+ public & Serializable>void registerTypeWithKryoSerializer(Class> type, T serializer) {
+ config.registerTypeWithKryoSerializer(type, serializer);
+ }
+
+ /**
+ * Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer.
+ *
+ * @param type The class of the types serialized with the given serializer.
+ * @param serializerClass The class of the serializer to use.
+ */
+ public void registerTypeWithKryoSerializer(Class> type, Class extends Serializer>> serializerClass) {
+ config.registerTypeWithKryoSerializer(type, serializerClass);
+ }
+
+ /**
+ * Registers the given type with the serialization stack. If the type is eventually
+ * serialized as a POJO, then the type is registered with the POJO serializer. If the
+ * type ends up being serialized with Kryo, then it will be registered at Kryo to make
+ * sure that only tags are written.
+ *
+ * @param type The class of the type to register.
+ */
+ public void registerType(Class> type) {
+ if (type == null) {
+ throw new NullPointerException("Cannot register null type class.");
+ }
+
+ TypeInformation> typeInfo = TypeExtractor.createTypeInfo(type);
+
+ if (typeInfo instanceof PojoTypeInfo) {
+ config.registerPojoType(type);
+ } else {
+ config.registerKryoType(type);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Data set creations
+ // --------------------------------------------------------------------------------------------
+
+ // ---------------------------------- Text Input Format ---------------------------------------
+
+ /**
+ * Creates a {@link DataSet} that represents the Strings produced by reading the given file line wise.
+ * The file will be read with the system's default character set.
+ *
+ * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+ * @return A {@link DataSet} that represents the data read from the given file as text lines.
+ */
+ public DataSource readTextFile(String filePath) {
+ Preconditions.checkNotNull(filePath, "The file path may not be null.");
+
+ return new DataSource<>(this, new TextInputFormat(new Path(filePath)), BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
+ }
+
+ /**
+ * Creates a {@link DataSet} that represents the Strings produced by reading the given file line wise.
+ * The {@link java.nio.charset.Charset} with the given name will be used to read the files.
+ *
+ * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+ * @param charsetName The name of the character set used to read the file.
+ * @return A {@link DataSet} that represents the data read from the given file as text lines.
+ */
+ public DataSource readTextFile(String filePath, String charsetName) {
+ Preconditions.checkNotNull(filePath, "The file path may not be null.");
+
+ TextInputFormat format = new TextInputFormat(new Path(filePath));
+ format.setCharsetName(charsetName);
+ return new DataSource<>(this, format, BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
+ }
+
+ // -------------------------- Text Input Format With String Value------------------------------
+
+ /**
+ * Creates a {@link DataSet} that represents the Strings produced by reading the given file line wise.
+ * This method is similar to {@link #readTextFile(String)}, but it produces a DataSet with mutable
+ * {@link StringValue} objects, rather than Java Strings. StringValues can be used to tune implementations
+ * to be less object and garbage collection heavy.
+ *
+ * The file will be read with the system's default character set.
+ *
+ * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+ * @return A {@link DataSet} that represents the data read from the given file as text lines.
+ */
+ public DataSource readTextFileWithValue(String filePath) {
+ Preconditions.checkNotNull(filePath, "The file path may not be null.");
+
+ return new DataSource<>(this, new TextValueInputFormat(new Path(filePath)), new ValueTypeInfo<>(StringValue.class), Utils.getCallLocationName());
+ }
+
+ /**
+ * Creates a {@link DataSet} that represents the Strings produced by reading the given file line wise.
+ * This method is similar to {@link #readTextFile(String, String)}, but it produces a DataSet with mutable
+ * {@link StringValue} objects, rather than Java Strings. StringValues can be used to tune implementations
+ * to be less object and garbage collection heavy.
+ *
+ * The {@link java.nio.charset.Charset} with the given name will be used to read the files.
+ *
+ * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+ * @param charsetName The name of the character set used to read the file.
+ * @param skipInvalidLines A flag to indicate whether to skip lines that cannot be read with the given character set.
+ *
+ * @return A DataSet that represents the data read from the given file as text lines.
+ */
+ public DataSource readTextFileWithValue(String filePath, String charsetName, boolean skipInvalidLines) {
+ Preconditions.checkNotNull(filePath, "The file path may not be null.");
+
+ TextValueInputFormat format = new TextValueInputFormat(new Path(filePath));
+ format.setCharsetName(charsetName);
+ format.setSkipInvalidLines(skipInvalidLines);
+ return new DataSource<>(this, format, new ValueTypeInfo<>(StringValue.class), Utils.getCallLocationName());
+ }
+
+ // ----------------------------------- Primitive Input Format ---------------------------------------
+
+ /**
+ * Creates a {@link DataSet} that represents the primitive type produced by reading the given file line wise.
+ * This method is similar to {@link #readCsvFile(String)} with single field, but it produces a DataSet not through
+ * {@link org.apache.flink.api.java.tuple.Tuple1}.
+ *
+ * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+ * @param typeClass The primitive type class to be read.
+ * @return A {@link DataSet} that represents the data read from the given file as primitive type.
+ */
+ public DataSource readFileOfPrimitives(String filePath, Class typeClass) {
+ Preconditions.checkNotNull(filePath, "The file path may not be null.");
+
+ return new DataSource<>(this, new PrimitiveInputFormat<>(new Path(filePath), typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
+ }
+
+ /**
+ * Creates a {@link DataSet} that represents the primitive type produced by reading the given file in delimited way.
+ * This method is similar to {@link #readCsvFile(String)} with single field, but it produces a DataSet not through
+ * {@link org.apache.flink.api.java.tuple.Tuple1}.
+ *
+ * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+ * @param delimiter The delimiter of the given file.
+ * @param typeClass The primitive type class to be read.
+ * @return A {@link DataSet} that represents the data read from the given file as primitive type.
+ */
+ public DataSource readFileOfPrimitives(String filePath, String delimiter, Class typeClass) {
+ Preconditions.checkNotNull(filePath, "The file path may not be null.");
+
+ return new DataSource<>(this, new PrimitiveInputFormat<>(new Path(filePath), delimiter, typeClass), TypeExtractor.getForClass(typeClass), Utils.getCallLocationName());
+ }
+
+ // ----------------------------------- CSV Input Format ---------------------------------------
+
+ /**
+ * Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
+ * define parameters and field types and will eventually produce the DataSet that corresponds to
+ * the read and parsed CSV input.
+ *
+ * @param filePath The path of the CSV file.
+ * @return A CsvReader that can be used to configure the CSV input.
+ */
+ public CsvReader readCsvFile(String filePath) {
+ return new CsvReader(filePath, this);
+ }
+
+ // ------------------------------------ File Input Format -----------------------------------------
+
+ public DataSource readFile(FileInputFormat inputFormat, String filePath) {
+ if (inputFormat == null) {
+ throw new IllegalArgumentException("InputFormat must not be null.");
+ }
+ if (filePath == null) {
+ throw new IllegalArgumentException("The file path must not be null.");
+ }
+
+ inputFormat.setFilePath(new Path(filePath));
+ try {
+ return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
+ }
+ catch (Exception e) {
+ throw new InvalidProgramException("The type returned by the input format could not be automatically determined. " +
+ "Please specify the TypeInformation of the produced type explicitly by using the " +
+ "'createInput(InputFormat, TypeInformation)' method instead.");
+ }
+ }
+
+ // ----------------------------------- Generic Input Format ---------------------------------------
+
+ /**
+ * Generic method to create an input {@link DataSet} with in {@link InputFormat}. The DataSet will not be
+ * immediately created - instead, this method returns a DataSet that will be lazily created from
+ * the input format once the program is executed.
+ *
+ * Since all data sets need specific information about their types, this method needs to determine
+ * the type of the data produced by the input format. It will attempt to determine the data type
+ * by reflection, unless the input format implements the {@link ResultTypeQueryable} interface.
+ * In the latter case, this method will invoke the {@link ResultTypeQueryable#getProducedType()}
+ * method to determine data type produced by the input format.
+ *
+ * @param inputFormat The input format used to create the data set.
+ * @return A {@link DataSet} that represents the data created by the input format.
+ *
+ * @see #createInput(InputFormat, TypeInformation)
+ */
+ public DataSource createInput(InputFormat inputFormat) {
+ if (inputFormat == null) {
+ throw new IllegalArgumentException("InputFormat must not be null.");
+ }
+
+ try {
+ return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
+ }
+ catch (Exception e) {
+ throw new InvalidProgramException("The type returned by the input format could not be automatically determined. " +
+ "Please specify the TypeInformation of the produced type explicitly by using the " +
+ "'createInput(InputFormat, TypeInformation)' method instead.", e);
+ }
+ }
+
+ /**
+ * Generic method to create an input DataSet with in {@link InputFormat}. The {@link DataSet} will not be
+ * immediately created - instead, this method returns a {@link DataSet} that will be lazily created from
+ * the input format once the program is executed.
+ *
+ * The {@link DataSet} is typed to the given TypeInformation. This method is intended for input formats that
+ * where the return type cannot be determined by reflection analysis, and that do not implement the
+ * {@link ResultTypeQueryable} interface.
+ *
+ * @param inputFormat The input format used to create the data set.
+ * @return A {@link DataSet} that represents the data created by the input format.
+ *
+ * @see #createInput(InputFormat)
+ */
+ public DataSource createInput(InputFormat inputFormat, TypeInformation producedType) {
+ if (inputFormat == null) {
+ throw new IllegalArgumentException("InputFormat must not be null.");
+ }
+
+ if (producedType == null) {
+ throw new IllegalArgumentException("Produced type information must not be null.");
+ }
+
+ return new DataSource<>(this, inputFormat, producedType, Utils.getCallLocationName());
+ }
+
+ // ----------------------------------- Collection ---------------------------------------
+
+ /**
+ * Creates a DataSet from the given non-empty collection. The type of the data set is that
+ * of the elements in the collection.
+ *
+ * The framework will try and determine the exact type from the collection elements.
+ * In case of generic elements, it may be necessary to manually supply the type information
+ * via {@link #fromCollection(Collection, TypeInformation)}.
+ *
+ *
Note that this operation will result in a non-parallel data source, i.e. a data source with
+ * a parallelism of one.
+ *
+ * @param data The collection of elements to create the data set from.
+ * @return A DataSet representing the given collection.
+ *
+ * @see #fromCollection(Collection, TypeInformation)
+ */
+ public DataSource fromCollection(Collection data) {
+ if (data == null) {
+ throw new IllegalArgumentException("The data must not be null.");
+ }
+ if (data.size() == 0) {
+ throw new IllegalArgumentException("The size of the collection must not be empty.");
+ }
+
+ X firstValue = data.iterator().next();
+
+ TypeInformation type = TypeExtractor.getForObject(firstValue);
+ CollectionInputFormat.checkCollection(data, type.getTypeClass());
+ return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, Utils.getCallLocationName());
+ }
+
+ /**
+ * Creates a DataSet from the given non-empty collection. Note that this operation will result
+ * in a non-parallel data source, i.e. a data source with a parallelism of one.
+ *
+ * The returned DataSet is typed to the given TypeInformation.
+ *
+ * @param data The collection of elements to create the data set from.
+ * @param type The TypeInformation for the produced data set.
+ * @return A DataSet representing the given collection.
+ *
+ * @see #fromCollection(Collection)
+ */
+ public DataSource fromCollection(Collection data, TypeInformation type) {
+ return fromCollection(data, type, Utils.getCallLocationName());
+ }
+
+ private DataSource fromCollection(Collection data, TypeInformation type, String callLocationName) {
+ CollectionInputFormat.checkCollection(data, type.getTypeClass());
+ return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, callLocationName);
+ }
+
+ /**
+ * Creates a DataSet from the given iterator. Because the iterator will remain unmodified until
+ * the actual execution happens, the type of data returned by the iterator must be given
+ * explicitly in the form of the type class (this is due to the fact that the Java compiler
+ * erases the generic type information).
+ *
+ * Note that this operation will result in a non-parallel data source, i.e. a data source with
+ * a parallelism of one.
+ *
+ * @param data The collection of elements to create the data set from.
+ * @param type The class of the data produced by the iterator. Must not be a generic class.
+ * @return A DataSet representing the elements in the iterator.
+ *
+ * @see #fromCollection(Iterator, TypeInformation)
+ */
+ public DataSource fromCollection(Iterator data, Class type) {
+ return fromCollection(data, TypeExtractor.getForClass(type));
+ }
+
+ /**
+ * Creates a DataSet from the given iterator. Because the iterator will remain unmodified until
+ * the actual execution happens, the type of data returned by the iterator must be given
+ * explicitly in the form of the type information. This method is useful for cases where the type
+ * is generic. In that case, the type class (as given in {@link #fromCollection(Iterator, Class)}
+ * does not supply all type information.
+ *
+ * Note that this operation will result in a non-parallel data source, i.e. a data source with
+ * a parallelism of one.
+ *
+ * @param data The collection of elements to create the data set from.
+ * @param type The TypeInformation for the produced data set.
+ * @return A DataSet representing the elements in the iterator.
+ *
+ * @see #fromCollection(Iterator, Class)
+ */
+ public DataSource fromCollection(Iterator data, TypeInformation type) {
+ return new DataSource<>(this, new IteratorInputFormat<>(data), type, Utils.getCallLocationName());
+ }
+
+ /**
+ * Creates a new data set that contains the given elements. The elements must all be of the same type,
+ * for example, all of the {@link String} or {@link Integer}. The sequence of elements must not be empty.
+ *
+ * The framework will try and determine the exact type from the collection elements.
+ * In case of generic elements, it may be necessary to manually supply the type information
+ * via {@link #fromCollection(Collection, TypeInformation)}.
+ *
+ *
Note that this operation will result in a non-parallel data source, i.e. a data source with
+ * a parallelism of one.
+ *
+ * @param data The elements to make up the data set.
+ * @return A DataSet representing the given list of elements.
+ */
+ @SafeVarargs
+ public final DataSource fromElements(X... data) {
+ if (data == null) {
+ throw new IllegalArgumentException("The data must not be null.");
+ }
+ if (data.length == 0) {
+ throw new IllegalArgumentException("The number of elements must not be zero.");
+ }
+
+ TypeInformation typeInfo;
+ try {
+ typeInfo = TypeExtractor.getForObject(data[0]);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
+ + "; please specify the TypeInformation manually via "
+ + "ExecutionEnvironment#fromElements(Collection, TypeInformation)");
+ }
+
+ return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
+ }
+
+ /**
+ * Creates a new data set that contains the given elements. The framework will determine the type according to the
+ * based type user supplied. The elements should be the same or be the subclass to the based type.
+ * The sequence of elements must not be empty.
+ * Note that this operation will result in a non-parallel data source, i.e. a data source with
+ * a parallelism of one.
+ *
+ * @param type The base class type for every element in the collection.
+ * @param data The elements to make up the data set.
+ * @return A DataSet representing the given list of elements.
+ */
+ @SafeVarargs
+ public final DataSource fromElements(Class type, X... data) {
+ if (data == null) {
+ throw new IllegalArgumentException("The data must not be null.");
+ }
+ if (data.length == 0) {
+ throw new IllegalArgumentException("The number of elements must not be zero.");
+ }
+
+ TypeInformation typeInfo;
+ try {
+ typeInfo = TypeExtractor.getForClass(type);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
+ + "; please specify the TypeInformation manually via "
+ + "ExecutionEnvironment#fromElements(Collection, TypeInformation)");
+ }
+
+ return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
+ }
+
+ /**
+ * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
+ * framework to create a parallel data source that returns the elements in the iterator.
+ *
+ * Because the iterator will remain unmodified until the actual execution happens, the type of data
+ * returned by the iterator must be given explicitly in the form of the type class (this is due to the
+ * fact that the Java compiler erases the generic type information).
+ *
+ * @param iterator The iterator that produces the elements of the data set.
+ * @param type The class of the data produced by the iterator. Must not be a generic class.
+ * @return A DataSet representing the elements in the iterator.
+ *
+ * @see #fromParallelCollection(SplittableIterator, TypeInformation)
+ */
+ public DataSource fromParallelCollection(SplittableIterator iterator, Class type) {
+ return fromParallelCollection(iterator, TypeExtractor.getForClass(type));
+ }
+
+ /**
+ * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
+ * framework to create a parallel data source that returns the elements in the iterator.
+ *
+ * Because the iterator will remain unmodified until the actual execution happens, the type of data
+ * returned by the iterator must be given explicitly in the form of the type information.
+ * This method is useful for cases where the type is generic. In that case, the type class
+ * (as given in {@link #fromParallelCollection(SplittableIterator, Class)} does not supply all type information.
+ *
+ * @param iterator The iterator that produces the elements of the data set.
+ * @param type The TypeInformation for the produced data set.
+ * @return A DataSet representing the elements in the iterator.
+ *
+ * @see #fromParallelCollection(SplittableIterator, Class)
+ */
+ public DataSource fromParallelCollection(SplittableIterator iterator, TypeInformation type) {
+ return fromParallelCollection(iterator, type, Utils.getCallLocationName());
+ }
+
+ // private helper for passing different call location names
+ private DataSource fromParallelCollection(SplittableIterator iterator, TypeInformation type, String callLocationName) {
+ return new DataSource<>(this, new ParallelIteratorInputFormat<>(iterator), type, callLocationName);
+ }
+
+ /**
+ * Creates a new data set that contains a sequence of numbers. The data set will be created in parallel,
+ * so there is no guarantee about the order of the elements.
+ *
+ * @param from The number to start at (inclusive).
+ * @param to The number to stop at (inclusive).
+ * @return A DataSet, containing all number in the {@code [from, to]} interval.
+ */
+ public DataSource generateSequence(long from, long to) {
+ return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, Utils.getCallLocationName());
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Executing
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Triggers the program execution. The environment will execute all parts of the program that have
+ * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
+ * writing results (e.g. {@link DataSet#writeAsText(String)},
+ * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
+ * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
+ *
+ * The program execution will be logged and displayed with a generated default name.
+ *
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ * @throws Exception Thrown, if the program executions fails.
+ */
+ public JobExecutionResult execute() throws Exception {
+ return execute(getDefaultName());
+ }
+
+ /**
+ * Triggers the program execution. The environment will execute all parts of the program that have
+ * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
+ * writing results (e.g. {@link DataSet#writeAsText(String)},
+ * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
+ * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
+ *
+ *
The program execution will be logged and displayed with the given job name.
+ *
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ * @throws Exception Thrown, if the program executions fails.
+ */
+ public abstract JobExecutionResult execute(String jobName) throws Exception;
+
+ /**
+ * Creates the plan with which the system will execute the program, and returns it as
+ * a String using a JSON representation of the execution data flow graph.
+ * Note that this needs to be called, before the plan is executed.
+ *
+ * @return The execution plan of the program, as a JSON String.
+ * @throws Exception Thrown, if the compiler could not be instantiated, or the master could not
+ * be contacted to retrieve information relevant to the execution planning.
+ */
+ public abstract String getExecutionPlan() throws Exception;
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+ * The runtime will copy the files temporarily to a local cache, if needed.
+ *
+ *
The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+ * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
+ * {@link org.apache.flink.api.common.cache.DistributedCache} via
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ */
+ public void registerCachedFile(String filePath, String name){
+ registerCachedFile(filePath, name, false);
+ }
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+ * The runtime will copy the files temporarily to a local cache, if needed.
+ *
+ *
The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+ * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
+ * {@link org.apache.flink.api.common.cache.DistributedCache} via
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ * @param executable flag indicating whether the file should be executable
+ */
+ public void registerCachedFile(String filePath, String name, boolean executable){
+ this.cacheFile.add(new Tuple2<>(name, new DistributedCacheEntry(filePath, executable)));
+ }
+
+ /**
+ * Registers all files that were registered at this execution environment's cache registry of the
+ * given plan's cache registry.
+ *
+ * @param p The plan to register files at.
+ * @throws IOException Thrown if checks for existence and sanity fail.
+ */
+ protected void registerCachedFilesWithPlan(Plan p) throws IOException {
+ for (Tuple2 entry : cacheFile) {
+ p.registerCachedFile(entry.f0, entry.f1);
+ }
+ }
+
+ /**
+ * Creates the program's {@link Plan}. The plan is a description of all data sources, data sinks,
+ * and operations and how they interact, as an isolated unit that can be executed with a
+ * {@link org.apache.flink.api.common.PlanExecutor}. Obtaining a plan and starting it with an
+ * executor is an alternative way to run a program and is only possible if the program consists
+ * only of distributed operations.
+ * This automatically starts a new stage of execution.
+ *
+ * @return The program's plan.
+ */
+ @Internal
+ public Plan createProgramPlan() {
+ return createProgramPlan(null);
+ }
+
+ /**
+ * Creates the program's {@link Plan}. The plan is a description of all data sources, data sinks,
+ * and operations and how they interact, as an isolated unit that can be executed with a
+ * {@link org.apache.flink.api.common.PlanExecutor}. Obtaining a plan and starting it with an
+ * executor is an alternative way to run a program and is only possible if the program consists
+ * only of distributed operations.
+ * This automatically starts a new stage of execution.
+ *
+ * @param jobName The name attached to the plan (displayed in logs and monitoring).
+ * @return The program's plan.
+ */
+ @Internal
+ public Plan createProgramPlan(String jobName) {
+ return createProgramPlan(jobName, true);
+ }
+
+ /**
+ * Creates the program's {@link Plan}. The plan is a description of all data sources, data sinks,
+ * and operations and how they interact, as an isolated unit that can be executed with a
+ * {@link org.apache.flink.api.common.PlanExecutor}. Obtaining a plan and starting it with an
+ * executor is an alternative way to run a program and is only possible if the program consists
+ * only of distributed operations.
+ *
+ * @param jobName The name attached to the plan (displayed in logs and monitoring).
+ * @param clearSinks Whether or not to start a new stage of execution.
+ * @return The program's plan.
+ */
+ @Internal
+ public Plan createProgramPlan(String jobName, boolean clearSinks) {
+ if (this.sinks.isEmpty()) {
+ if (wasExecuted) {
+ throw new RuntimeException("No new data sinks have been defined since the " +
+ "last execution. The last execution refers to the latest call to " +
+ "'execute()', 'count()', 'collect()', or 'print()'.");
+ } else {
+ throw new RuntimeException("No data sinks have been created yet. " +
+ "A program needs at least one sink that consumes data. " +
+ "Examples are writing the data set or printing it.");
+ }
+ }
+
+ if (jobName == null) {
+ jobName = getDefaultName();
+ }
+
+ OperatorTranslation translator = new OperatorTranslation();
+ Plan plan = translator.translateToPlan(this.sinks, jobName);
+
+ if (getParallelism() > 0) {
+ plan.setDefaultParallelism(getParallelism());
+ }
+ plan.setExecutionConfig(getConfig());
+
+ // Check plan for GenericTypeInfo's and register the types at the serializers.
+ if (!config.isAutoTypeRegistrationDisabled()) {
+ plan.accept(new Visitor>() {
+
+ private final HashSet> deduplicator = new HashSet<>();
+
+ @Override
+ public boolean preVisit(org.apache.flink.api.common.operators.Operator> visitable) {
+ OperatorInformation> opInfo = visitable.getOperatorInfo();
+ Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, deduplicator);
+ return true;
+ }
+
+ @Override
+ public void postVisit(org.apache.flink.api.common.operators.Operator> visitable) {}
+ });
+ }
+
+ try {
+ registerCachedFilesWithPlan(plan);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
+ }
+
+ // clear all the sinks such that the next execution does not redo everything
+ if (clearSinks) {
+ this.sinks.clear();
+ wasExecuted = true;
+ }
+
+ // All types are registered now. Print information.
+ int registeredTypes = config.getRegisteredKryoTypes().size() +
+ config.getRegisteredPojoTypes().size() +
+ config.getRegisteredTypesWithKryoSerializerClasses().size() +
+ config.getRegisteredTypesWithKryoSerializers().size();
+ int defaultKryoSerializers = config.getDefaultKryoSerializers().size() +
+ config.getDefaultKryoSerializerClasses().size();
+ LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers);
+
+ if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
+ LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer");
+ }
+ if (config.isForceKryoEnabled()) {
+ LOG.info("Using KryoSerializer for serializing POJOs");
+ }
+ if (config.isForceAvroEnabled()) {
+ LOG.info("Using AvroSerializer for serializing POJOs");
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
+ LOG.debug("Registered Kryo with Serializers types: {}", config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
+ LOG.debug("Registered Kryo with Serializer Classes types: {}", config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
+ LOG.debug("Registered Kryo default Serializers: {}", config.getDefaultKryoSerializers().entrySet().toString());
+ LOG.debug("Registered Kryo default Serializers Classes {}", config.getDefaultKryoSerializerClasses().entrySet().toString());
+ LOG.debug("Registered POJO types: {}", config.getRegisteredPojoTypes().toString());
+
+ // print information about static code analysis
+ LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
+ }
+
+ return plan;
+ }
+
+ /**
+ * Adds the given sink to this environment. Only sinks that have been added will be executed once
+ * the {@link #execute()} or {@link #execute(String)} method is called.
+ *
+ * @param sink The sink to add for execution.
+ */
+ @Internal
+ void registerDataSink(DataSink> sink) {
+ this.sinks.add(sink);
+ }
+
+ /**
+ * Gets a default job name, based on the timestamp when this method is invoked.
+ *
+ * @return A default job name.
+ */
+ private static String getDefaultName() {
+ return "Flink Java Job at " + Calendar.getInstance().getTime();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Instantiation of Execution Contexts
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates an execution environment that represents the context in which the program is currently executed.
+ * If the program is invoked standalone, this method returns a local execution environment, as returned by
+ * {@link #createLocalEnvironment()}. If the program is invoked from within the command line client to be
+ * submitted to a cluster, this method returns the execution environment of this cluster.
+ *
+ * @return The execution environment of the context in which the program is executed.
+ */
+ public static ExecutionEnvironment getExecutionEnvironment() {
+ return contextEnvironmentFactory.get() == null ?
+ createLocalEnvironment() : contextEnvironmentFactory.get().createExecutionEnvironment();
+ }
+
+ /**
+ * Creates a {@link CollectionEnvironment} that uses Java Collections underneath. This will execute in a
+ * single thread in the current JVM. It is very fast but will fail if the data does not fit into
+ * memory. parallelism will always be 1. This is useful during implementation and for debugging.
+ * @return A Collection Environment
+ */
+ @PublicEvolving
+ public static CollectionEnvironment createCollectionsEnvironment(){
+ CollectionEnvironment ce = new CollectionEnvironment();
+ ce.setParallelism(1);
+ return ce;
+ }
+
+ /**
+ * Creates a {@link LocalEnvironment}. The local execution environment will run the program in a
+ * multi-threaded fashion in the same JVM as the environment was created in. The default
+ * parallelism of the local environment is the number of hardware contexts (CPU cores / threads),
+ * unless it was specified differently by {@link #setDefaultLocalParallelism(int)}.
+ *
+ * @return A local execution environment.
+ */
+ public static LocalEnvironment createLocalEnvironment() {
+ return createLocalEnvironment(defaultLocalDop);
+ }
+
+ /**
+ * Creates a {@link LocalEnvironment}. The local execution environment will run the program in a
+ * multi-threaded fashion in the same JVM as the environment was created in. It will use the
+ * parallelism specified in the parameter.
+ *
+ * @param parallelism The parallelism for the local environment.
+ * @return A local execution environment with the specified parallelism.
+ */
+ public static LocalEnvironment createLocalEnvironment(int parallelism) {
+ LocalEnvironment lee = new LocalEnvironment();
+ lee.setParallelism(parallelism);
+ return lee;
+ }
+
+ /**
+ * Creates a {@link LocalEnvironment}. The local execution environment will run the program in a
+ * multi-threaded fashion in the same JVM as the environment was created in. It will use the
+ * parallelism specified in the parameter.
+ *
+ * @param customConfiguration Pass a custom configuration to the LocalEnvironment.
+ * @return A local execution environment with the specified parallelism.
+ */
+ public static LocalEnvironment createLocalEnvironment(Configuration customConfiguration) {
+ return new LocalEnvironment(customConfiguration);
+ }
+
+ /**
+ * Creates a {@link LocalEnvironment} for local program execution that also starts the
+ * web monitoring UI.
+ *
+ * The local execution environment will run the program in a multi-threaded fashion in
+ * the same JVM as the environment was created in. It will use the parallelism specified in the
+ * parameter.
+ *
+ *
If the configuration key 'jobmanager.web.port' was set in the configuration, that particular
+ * port will be used for the web UI. Otherwise, the default port (8081) will be used.
+ */
+ @PublicEvolving
+ public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
+ checkNotNull(conf, "conf");
+
+ conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+
+ LocalEnvironment localEnv = new LocalEnvironment(conf);
+ localEnv.setParallelism(defaultLocalDop);
+
+ return localEnv;
+ }
+
+ /**
+ * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
+ * to a cluster for execution. Note that all file paths used in the program must be accessible from the
+ * cluster. The execution will use the cluster's default parallelism, unless the parallelism is
+ * set explicitly via {@link ExecutionEnvironment#setParallelism(int)}.
+ *
+ * @param host The host name or address of the master (JobManager), where the program should be executed.
+ * @param port The port of the master (JobManager), where the program should be executed.
+ * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
+ * user-defined functions, user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
+ * @return A remote environment that executes the program on a cluster.
+ */
+ public static ExecutionEnvironment createRemoteEnvironment(String host, int port, String... jarFiles) {
+ return new RemoteEnvironment(host, port, jarFiles);
+ }
+
+ /**
+ * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
+ * to a cluster for execution. Note that all file paths used in the program must be accessible from the
+ * cluster. The custom configuration file is used to configure Akka specific configuration parameters
+ * for the Client only; Program parallelism can be set via {@link ExecutionEnvironment#setParallelism(int)}.
+ *
+ *
Cluster configuration has to be done in the remotely running Flink instance.
+ *
+ * @param host The host name or address of the master (JobManager), where the program should be executed.
+ * @param port The port of the master (JobManager), where the program should be executed.
+ * @param clientConfiguration Configuration used by the client that connects to the cluster.
+ * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
+ * user-defined functions, user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
+ * @return A remote environment that executes the program on a cluster.
+ */
+ public static ExecutionEnvironment createRemoteEnvironment(
+ String host, int port, Configuration clientConfiguration, String... jarFiles) {
+ return new RemoteEnvironment(host, port, clientConfiguration, jarFiles, null);
+ }
+
+ /**
+ * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
+ * to a cluster for execution. Note that all file paths used in the program must be accessible from the
+ * cluster. The execution will use the specified parallelism.
+ *
+ * @param host The host name or address of the master (JobManager), where the program should be executed.
+ * @param port The port of the master (JobManager), where the program should be executed.
+ * @param parallelism The parallelism to use during the execution.
+ * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
+ * user-defined functions, user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
+ * @return A remote environment that executes the program on a cluster.
+ */
+ public static ExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles) {
+ RemoteEnvironment rec = new RemoteEnvironment(host, port, jarFiles);
+ rec.setParallelism(parallelism);
+ return rec;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Default parallelism for local execution
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Gets the default parallelism that will be used for the local execution environment created by
+ * {@link #createLocalEnvironment()}.
+ *
+ * @return The default local parallelism
+ */
+ public static int getDefaultLocalParallelism() {
+ return defaultLocalDop;
+ }
+
+ /**
+ * Sets the default parallelism that will be used for the local execution environment created by
+ * {@link #createLocalEnvironment()}.
+ *
+ * @param parallelism The parallelism to use as the default local parallelism.
+ */
+ public static void setDefaultLocalParallelism(int parallelism) {
+ defaultLocalDop = parallelism;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Methods to control the context environment and creation of explicit environments other
+ // than the context environment
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Sets a context environment factory, that creates the context environment for running programs
+ * with pre-configured environments. Examples are running programs from the command line, and
+ * running programs in the Scala shell.
+ *
+ *
When the context environment factory is set, no other environments can be explicitly used.
+ *
+ * @param ctx The context environment factory.
+ */
+ protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
+ contextEnvironmentFactory.set(Preconditions.checkNotNull(ctx));
+ }
+
+ /**
+ * Un-sets the context environment factory. After this method is called, the call to
+ * {@link #getExecutionEnvironment()} will again return a default local execution environment, and
+ * it is possible to explicitly instantiate the LocalEnvironment and the RemoteEnvironment.
+ */
+ protected static void resetContextEnvironment() {
+ contextEnvironmentFactory.remove();
+ }
+
+ /**
+ * Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment
+ * or a RemoteEnvironment.
+ *
+ * @return True, if it is possible to explicitly instantiate a LocalEnvironment or a
+ * RemoteEnvironment, false otherwise.
+ */
+ @Internal
+ public static boolean areExplicitEnvironmentsAllowed() {
+ return contextEnvironmentFactory.get() == null;
+ }
+}
diff --git a/flinkx-core/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flinkx-core/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index d558fbb514..85ee6514ad 100644
--- a/flinkx-core/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flinkx-core/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -110,6 +110,7 @@ public AbstractMetricGroup(MetricRegistry registry, String[] scope, A parent) {
this.scopeStrings = new String[registry.getNumberReporters()];
}
+ @Override
public Map getAllVariables() {
if (variables == null) { // avoid synchronization for common case
synchronized (this) {
@@ -168,6 +169,7 @@ public String getLogicalScope(CharacterFilter filter, char delimiter) {
*
* @see #getMetricIdentifier(String)
*/
+ @Override
public String[] getScopeComponents() {
return scopeComponents;
}
@@ -200,6 +202,7 @@ public QueryScopeInfo getQueryServiceMetricInfo(CharacterFilter filter) {
* @param metricName metric name
* @return fully qualified metric name
*/
+ @Override
public String getMetricIdentifier(String metricName) {
return getMetricIdentifier(metricName, null);
}
@@ -212,6 +215,7 @@ public String getMetricIdentifier(String metricName) {
* @param filter character filter which is applied to the scope components if not null.
* @return fully qualified metric name
*/
+ @Override
public String getMetricIdentifier(String metricName, CharacterFilter filter) {
return getMetricIdentifier(metricName, filter, -1);
}
diff --git a/flinkx-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flinkx-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
new file mode 100644
index 0000000000..7737707c5a
--- /dev/null
+++ b/flinkx-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -0,0 +1,1823 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.FileReadFunction;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
+import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
+import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.operators.StoppableStreamSource;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SplittableIterator;
+
+import com.esotericsoftware.kryo.Serializer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The StreamExecutionEnvironment is the context in which a streaming program is executed. A
+ * {@link LocalStreamEnvironment} will cause execution in the current JVM, a
+ * {@link RemoteStreamEnvironment} will cause execution on a remote setup.
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism
+ * or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
+ *
+ * @see org.apache.flink.streaming.api.environment.LocalStreamEnvironment
+ * @see org.apache.flink.streaming.api.environment.RemoteStreamEnvironment
+ */
+@Public
+public abstract class StreamExecutionEnvironment {
+
+ /** The default name to use for a streaming job if no other name has been specified. */
+ public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
+
+ /** The time characteristic that is used if none other is set. */
+ private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
+
+ /** The default buffer timeout (max delay of records in the network stack). */
+ private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
+
+ /**
+ * The environment of the context (local by default, cluster if invoked through command line).
+ */
+ private static ThreadLocal contextEnvironmentFactory = new ThreadLocal<>();
+
+ /** The default parallelism used when creating a local environment. */
+ private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
+
+ // ------------------------------------------------------------------------
+
+ /** The execution configuration for this environment. */
+ private final ExecutionConfig config = new ExecutionConfig();
+
+ /** Settings that control the checkpointing behavior. */
+ private final CheckpointConfig checkpointCfg = new CheckpointConfig();
+
+ protected final List> transformations = new ArrayList<>();
+
+ private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
+
+ protected boolean isChainingEnabled = true;
+
+ /** The state backend used for storing k/v state and state snapshots. */
+ private AbstractStateBackend defaultStateBackend;
+
+ /** The time characteristic used by the data streams. */
+ private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
+
+ protected final List> cacheFile = new ArrayList<>();
+
+
+ // --------------------------------------------------------------------------------------------
+ // Constructor and Properties
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Gets the config object.
+ */
+ public ExecutionConfig getConfig() {
+ return config;
+ }
+
+ /**
+ * Get the list of cached files that were registered for distribution among the task managers.
+ */
+ public List> getCachedFiles() {
+ return cacheFile;
+ }
+
+ /**
+ * Sets the parallelism for operations executed through this environment.
+ * Setting a parallelism of x here will cause all operators (such as map,
+ * batchReduce) to run with x parallel instances. This method overrides the
+ * default parallelism for this environment. The
+ * {@link LocalStreamEnvironment} uses by default a value equal to the
+ * number of hardware contexts (CPU cores / threads). When executing the
+ * program via the command line client from a JAR file, the default degree
+ * of parallelism is the one configured for that setup.
+ *
+ * @param parallelism The parallelism
+ */
+ public StreamExecutionEnvironment setParallelism(int parallelism) {
+ if (parallelism < 1) {
+ throw new IllegalArgumentException("parallelism must be at least one.");
+ }
+ config.setParallelism(parallelism);
+ return this;
+ }
+
+ /**
+ * Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive)
+ * is Short.MAX_VALUE.
+ *
+ * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
+ * defines the number of key groups used for partitioned state.
+ *
+ * @param maxParallelism Maximum degree of parallelism to be used for the program.,
+ * with 0 < maxParallelism <= 2^15 - 1
+ */
+ public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
+ Preconditions.checkArgument(maxParallelism > 0 &&
+ maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+ "maxParallelism is out of bounds 0 < maxParallelism <= " +
+ KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
+
+ config.setMaxParallelism(maxParallelism);
+ return this;
+ }
+
+ /**
+ * Gets the parallelism with which operation are executed by default.
+ * Operations can individually override this value to use a specific
+ * parallelism.
+ *
+ * @return The parallelism used by operations, unless they override that
+ * value.
+ */
+ public int getParallelism() {
+ return config.getParallelism();
+ }
+
+ /**
+ * Gets the maximum degree of parallelism defined for the program.
+ *
+ *
The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
+ * defines the number of key groups used for partitioned state.
+ *
+ * @return Maximum degree of parallelism
+ */
+ public int getMaxParallelism() {
+ return config.getMaxParallelism();
+ }
+
+ /**
+ * Sets the maximum time frequency (milliseconds) for the flushing of the
+ * output buffers. By default the output buffers flush frequently to provide
+ * low latency and to aid smooth developer experience. Setting the parameter
+ * can result in three logical modes:
+ *
+ *
+ * - A positive integer triggers flushing periodically by that integer
+ * - 0 triggers flushing after every record thus minimizing latency
+ * - -1 triggers flushing only when the output buffer is full thus maximizing
+ * throughput
+ *
+ *
+ * @param timeoutMillis
+ * The maximum time between two output flushes.
+ */
+ public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
+ if (timeoutMillis < -1) {
+ throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
+ }
+
+ this.bufferTimeout = timeoutMillis;
+ return this;
+ }
+
+ /**
+ * Gets the maximum time frequency (milliseconds) for the flushing of the
+ * output buffers. For clarification on the extremal values see
+ * {@link #setBufferTimeout(long)}.
+ *
+ * @return The timeout of the buffer.
+ */
+ public long getBufferTimeout() {
+ return this.bufferTimeout;
+ }
+
+ /**
+ * Disables operator chaining for streaming operators. Operator chaining
+ * allows non-shuffle operations to be co-located in the same thread fully
+ * avoiding serialization and de-serialization.
+ *
+ * @return StreamExecutionEnvironment with chaining disabled.
+ */
+ @PublicEvolving
+ public StreamExecutionEnvironment disableOperatorChaining() {
+ this.isChainingEnabled = false;
+ return this;
+ }
+
+ /**
+ * Returns whether operator chaining is enabled.
+ *
+ * @return {@code true} if chaining is enabled, false otherwise.
+ */
+ @PublicEvolving
+ public boolean isChainingEnabled() {
+ return isChainingEnabled;
+ }
+
+ // ------------------------------------------------------------------------
+ // Checkpointing Settings
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the checkpoint config, which defines values like checkpoint interval, delay between
+ * checkpoints, etc.
+ *
+ * @return The checkpoint config.
+ */
+ public CheckpointConfig getCheckpointConfig() {
+ return checkpointCfg;
+ }
+
+ /**
+ * Enables checkpointing for the streaming job. The distributed state of the streaming
+ * dataflow will be periodically snapshotted. In case of a failure, the streaming
+ * dataflow will be restarted from the latest completed checkpoint. This method selects
+ * {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
+ *
+ * The job draws checkpoints periodically, in the given interval. The state will be
+ * stored in the configured state backend.
+ *
+ *
NOTE: Checkpointing iterative streaming dataflows in not properly supported at
+ * the moment. For that reason, iterative jobs will not be started if used
+ * with enabled checkpointing. To override this mechanism, use the
+ * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.
+ *
+ * @param interval Time interval between state checkpoints in milliseconds.
+ */
+ public StreamExecutionEnvironment enableCheckpointing(long interval) {
+ checkpointCfg.setCheckpointInterval(interval);
+ return this;
+ }
+
+ /**
+ * Enables checkpointing for the streaming job. The distributed state of the streaming
+ * dataflow will be periodically snapshotted. In case of a failure, the streaming
+ * dataflow will be restarted from the latest completed checkpoint.
+ *
+ *
The job draws checkpoints periodically, in the given interval. The system uses the
+ * given {@link CheckpointingMode} for the checkpointing ("exactly once" vs "at least once").
+ * The state will be stored in the configured state backend.
+ *
+ *
NOTE: Checkpointing iterative streaming dataflows in not properly supported at
+ * the moment. For that reason, iterative jobs will not be started if used
+ * with enabled checkpointing. To override this mechanism, use the
+ * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.
+ *
+ * @param interval
+ * Time interval between state checkpoints in milliseconds.
+ * @param mode
+ * The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed.
+ */
+ public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
+ checkpointCfg.setCheckpointingMode(mode);
+ checkpointCfg.setCheckpointInterval(interval);
+ return this;
+ }
+
+ /**
+ * Enables checkpointing for the streaming job. The distributed state of the streaming
+ * dataflow will be periodically snapshotted. In case of a failure, the streaming
+ * dataflow will be restarted from the latest completed checkpoint.
+ *
+ *
The job draws checkpoints periodically, in the given interval. The state will be
+ * stored in the configured state backend.
+ *
+ *
NOTE: Checkpointing iterative streaming dataflows in not properly supported at
+ * the moment. If the "force" parameter is set to true, the system will execute the
+ * job nonetheless.
+ *
+ * @param interval
+ * Time interval between state checkpoints in millis.
+ * @param mode
+ * The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed.
+ * @param force
+ * If true checkpointing will be enabled for iterative jobs as well.
+ *
+ * @deprecated Use {@link #enableCheckpointing(long, CheckpointingMode)} instead.
+ * Forcing checkpoints will be removed in the future.
+ */
+ @Deprecated
+ @SuppressWarnings("deprecation")
+ @PublicEvolving
+ public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
+ checkpointCfg.setCheckpointingMode(mode);
+ checkpointCfg.setCheckpointInterval(interval);
+ checkpointCfg.setForceCheckpointing(force);
+ return this;
+ }
+
+ /**
+ * Enables checkpointing for the streaming job. The distributed state of the streaming
+ * dataflow will be periodically snapshotted. In case of a failure, the streaming
+ * dataflow will be restarted from the latest completed checkpoint. This method selects
+ * {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
+ *
+ *
The job draws checkpoints periodically, in the default interval. The state will be
+ * stored in the configured state backend.
+ *
+ *
NOTE: Checkpointing iterative streaming dataflows in not properly supported at
+ * the moment. For that reason, iterative jobs will not be started if used
+ * with enabled checkpointing. To override this mechanism, use the
+ * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.
+ *
+ * @deprecated Use {@link #enableCheckpointing(long)} instead.
+ */
+ @Deprecated
+ @PublicEvolving
+ public StreamExecutionEnvironment enableCheckpointing() {
+ checkpointCfg.setCheckpointInterval(500);
+ return this;
+ }
+
+ /**
+ * Returns the checkpointing interval or -1 if checkpointing is disabled.
+ *
+ *
Shorthand for {@code getCheckpointConfig().getCheckpointInterval()}.
+ *
+ * @return The checkpointing interval or -1
+ */
+ public long getCheckpointInterval() {
+ return checkpointCfg.getCheckpointInterval();
+ }
+
+ /**
+ * Returns whether checkpointing is force-enabled.
+ *
+ * @deprecated Forcing checkpoints will be removed in future version.
+ */
+ @Deprecated
+ @SuppressWarnings("deprecation")
+ @PublicEvolving
+ public boolean isForceCheckpointing() {
+ return checkpointCfg.isForceCheckpointing();
+ }
+
+ /**
+ * Returns the checkpointing mode (exactly-once vs. at-least-once).
+ *
+ *
Shorthand for {@code getCheckpointConfig().getCheckpointingMode()}.
+ *
+ * @return The checkpoin
+ */
+ public CheckpointingMode getCheckpointingMode() {
+ return checkpointCfg.getCheckpointingMode();
+ }
+
+ /**
+ * Sets the state backend that describes how to store and checkpoint operator state. It defines in
+ * what form the key/value state ({@link ValueState}, accessible
+ * from operations on {@link org.apache.flink.streaming.api.datastream.KeyedStream}) is maintained
+ * (heap, managed memory, externally), and where state snapshots/checkpoints are stored, both for
+ * the key/value state, and for checkpointed functions (implementing the interface
+ * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}).
+ *
+ *
The {@link org.apache.flink.runtime.state.memory.MemoryStateBackend} for example
+ * maintains the state in heap memory, as objects. It is lightweight without extra dependencies,
+ * but can checkpoint only small states (some counters).
+ *
+ *
In contrast, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend}
+ * stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated
+ * file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee that state is not lost upon
+ * failures of individual nodes and that streaming program can be executed highly available and strongly
+ * consistent (assuming that Flink is run in high-availability mode).
+ *
+ * @return This StreamExecutionEnvironment itself, to allow chaining of function calls.
+ *
+ * @see #getStateBackend()
+ */
+ @PublicEvolving
+ public StreamExecutionEnvironment setStateBackend(AbstractStateBackend backend) {
+ this.defaultStateBackend = Preconditions.checkNotNull(backend);
+ return this;
+ }
+
+ /**
+ * Returns the state backend that defines how to store and checkpoint state.
+ * @return The state backend that defines how to store and checkpoint state.
+ *
+ * @see #setStateBackend(AbstractStateBackend)
+ */
+ @PublicEvolving
+ public AbstractStateBackend getStateBackend() {
+ return defaultStateBackend;
+ }
+
+ /**
+ * Sets the restart strategy configuration. The configuration specifies which restart strategy
+ * will be used for the execution graph in case of a restart.
+ *
+ * @param restartStrategyConfiguration Restart strategy configuration to be set
+ */
+ @PublicEvolving
+ public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
+ config.setRestartStrategy(restartStrategyConfiguration);
+ }
+
+ /**
+ * Returns the specified restart strategy configuration.
+ *
+ * @return The restart strategy configuration to be used
+ */
+ @PublicEvolving
+ public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
+ return config.getRestartStrategy();
+ }
+
+ /**
+ * Sets the number of times that failed tasks are re-executed. A value of
+ * zero effectively disables fault tolerance. A value of {@code -1}
+ * indicates that the system default value (as defined in the configuration)
+ * should be used.
+ *
+ * @param numberOfExecutionRetries
+ * The number of times the system will try to re-execute failed tasks.
+ *
+ * @deprecated This method will be replaced by {@link #setRestartStrategy}. The
+ * {@link RestartStrategies#fixedDelayRestart(int, Time)} contains the number of
+ * execution retries.
+ */
+ @Deprecated
+ @PublicEvolving
+ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+ config.setNumberOfExecutionRetries(numberOfExecutionRetries);
+ }
+
+ /**
+ * Gets the number of times the system will try to re-execute failed tasks.
+ * A value of {@code -1} indicates that the system default value (as defined
+ * in the configuration) should be used.
+ *
+ * @return The number of times the system will try to re-execute failed tasks.
+ *
+ * @deprecated This method will be replaced by {@link #getRestartStrategy}.
+ */
+ @Deprecated
+ @PublicEvolving
+ public int getNumberOfExecutionRetries() {
+ return config.getNumberOfExecutionRetries();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Registry for types and serializers
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Adds a new Kryo default serializer to the Runtime.
+ *
+ *
Note that the serializer instance must be serializable (as defined by
+ * java.io.Serializable), because it may be distributed to the worker nodes
+ * by java serialization.
+ *
+ * @param type
+ * The class of the types serialized with the given serializer.
+ * @param serializer
+ * The serializer to use.
+ */
+ public & Serializable>void addDefaultKryoSerializer(Class> type, T serializer) {
+ config.addDefaultKryoSerializer(type, serializer);
+ }
+
+ /**
+ * Adds a new Kryo default serializer to the Runtime.
+ *
+ * @param type
+ * The class of the types serialized with the given serializer.
+ * @param serializerClass
+ * The class of the serializer to use.
+ */
+ public void addDefaultKryoSerializer(Class> type, Class extends Serializer>> serializerClass) {
+ config.addDefaultKryoSerializer(type, serializerClass);
+ }
+
+ /**
+ * Registers the given type with a Kryo Serializer.
+ *
+ * Note that the serializer instance must be serializable (as defined by
+ * java.io.Serializable), because it may be distributed to the worker nodes
+ * by java serialization.
+ *
+ * @param type
+ * The class of the types serialized with the given serializer.
+ * @param serializer
+ * The serializer to use.
+ */
+ public & Serializable>void registerTypeWithKryoSerializer(Class> type, T serializer) {
+ config.registerTypeWithKryoSerializer(type, serializer);
+ }
+
+ /**
+ * Registers the given Serializer via its class as a serializer for the
+ * given type at the KryoSerializer.
+ *
+ * @param type
+ * The class of the types serialized with the given serializer.
+ * @param serializerClass
+ * The class of the serializer to use.
+ */
+ @SuppressWarnings("rawtypes")
+ public void registerTypeWithKryoSerializer(Class> type, Class extends Serializer> serializerClass) {
+ config.registerTypeWithKryoSerializer(type, serializerClass);
+ }
+
+ /**
+ * Registers the given type with the serialization stack. If the type is
+ * eventually serialized as a POJO, then the type is registered with the
+ * POJO serializer. If the type ends up being serialized with Kryo, then it
+ * will be registered at Kryo to make sure that only tags are written.
+ *
+ * @param type
+ * The class of the type to register.
+ */
+ public void registerType(Class> type) {
+ if (type == null) {
+ throw new NullPointerException("Cannot register null type class.");
+ }
+
+ TypeInformation> typeInfo = TypeExtractor.createTypeInfo(type);
+
+ if (typeInfo instanceof PojoTypeInfo) {
+ config.registerPojoType(type);
+ } else {
+ config.registerKryoType(type);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Time characteristic
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Sets the time characteristic for all streams create from this environment, e.g., processing
+ * time, event time, or ingestion time.
+ *
+ * If you set the characteristic to IngestionTime of EventTime this will set a default
+ * watermark update interval of 200 ms. If this is not applicable for your application
+ * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
+ *
+ * @param characteristic The time characteristic.
+ */
+ @PublicEvolving
+ public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
+ this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
+ if (characteristic == TimeCharacteristic.ProcessingTime) {
+ getConfig().setAutoWatermarkInterval(0);
+ } else {
+ getConfig().setAutoWatermarkInterval(200);
+ }
+ }
+
+ /**
+ * Gets the time characteristic.
+ *
+ * @see #setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
+ *
+ * @return The time characteristic.
+ */
+ @PublicEvolving
+ public TimeCharacteristic getStreamTimeCharacteristic() {
+ return timeCharacteristic;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Data stream creations
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new data stream that contains a sequence of numbers. This is a parallel source,
+ * if you manually set the parallelism to {@code 1}
+ * (using {@link org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setParallelism(int)})
+ * the generated sequence of elements is in order.
+ *
+ * @param from
+ * The number to start at (inclusive)
+ * @param to
+ * The number to stop at (inclusive)
+ * @return A data stream, containing all number in the [from, to] interval
+ */
+ public DataStreamSource generateSequence(long from, long to) {
+ if (from > to) {
+ throw new IllegalArgumentException("Start of sequence must not be greater than the end");
+ }
+ return addSource(new StatefulSequenceSource(from, to), "Sequence Source");
+ }
+
+ /**
+ * Creates a new data stream that contains the given elements. The elements must all be of the
+ * same type, for example, all of the {@link String} or {@link Integer}.
+ *
+ * The framework will try and determine the exact type from the elements. In case of generic
+ * elements, it may be necessary to manually supply the type information via
+ * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
+ *
+ *
Note that this operation will result in a non-parallel data stream source, i.e. a data
+ * stream source with a degree of parallelism one.
+ *
+ * @param data
+ * The array of elements to create the data stream from.
+ * @param
+ * The type of the returned data stream
+ * @return The data stream representing the given array of elements
+ */
+ @SafeVarargs
+ public final DataStreamSource fromElements(OUT... data) {
+ if (data.length == 0) {
+ throw new IllegalArgumentException("fromElements needs at least one element as argument");
+ }
+
+ TypeInformation typeInfo;
+ try {
+ typeInfo = TypeExtractor.getForObject(data[0]);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
+ + "; please specify the TypeInformation manually via "
+ + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
+ }
+ return fromCollection(Arrays.asList(data), typeInfo);
+ }
+
+ /**
+ * Creates a new data set that contains the given elements. The framework will determine the type according to the
+ * based type user supplied. The elements should be the same or be the subclass to the based type.
+ * The sequence of elements must not be empty.
+ * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+ * degree of parallelism one.
+ *
+ * @param type
+ * The based class type in the collection.
+ * @param data
+ * The array of elements to create the data stream from.
+ * @param
+ * The type of the returned data stream
+ * @return The data stream representing the given array of elements
+ */
+ @SafeVarargs
+ public final DataStreamSource fromElements(Class type, OUT... data) {
+ if (data.length == 0) {
+ throw new IllegalArgumentException("fromElements needs at least one element as argument");
+ }
+
+ TypeInformation typeInfo;
+ try {
+ typeInfo = TypeExtractor.getForClass(type);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
+ + "; please specify the TypeInformation manually via "
+ + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
+ }
+ return fromCollection(Arrays.asList(data), typeInfo);
+ }
+
+ /**
+ * Creates a data stream from the given non-empty collection. The type of the data stream is that of the
+ * elements in the collection.
+ *
+ * The framework will try and determine the exact type from the collection elements. In case of generic
+ * elements, it may be necessary to manually supply the type information via
+ * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
+ *
+ *
Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with
+ * parallelism one.
+ *
+ * @param data
+ * The collection of elements to create the data stream from.
+ * @param
+ * The generic type of the returned data stream.
+ * @return
+ * The data stream representing the given collection
+ */
+ public DataStreamSource fromCollection(Collection data) {
+ Preconditions.checkNotNull(data, "Collection must not be null");
+ if (data.isEmpty()) {
+ throw new IllegalArgumentException("Collection must not be empty");
+ }
+
+ OUT first = data.iterator().next();
+ if (first == null) {
+ throw new IllegalArgumentException("Collection must not contain null elements");
+ }
+
+ TypeInformation typeInfo;
+ try {
+ typeInfo = TypeExtractor.getForObject(first);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Could not create TypeInformation for type " + first.getClass()
+ + "; please specify the TypeInformation manually via "
+ + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
+ }
+ return fromCollection(data, typeInfo);
+ }
+
+ /**
+ * Creates a data stream from the given non-empty collection.
+ *
+ * Note that this operation will result in a non-parallel data stream source,
+ * i.e., a data stream source with parallelism one.
+ *
+ * @param data
+ * The collection of elements to create the data stream from
+ * @param typeInfo
+ * The TypeInformation for the produced data stream
+ * @param
+ * The type of the returned data stream
+ * @return The data stream representing the given collection
+ */
+ public DataStreamSource fromCollection(Collection data, TypeInformation typeInfo) {
+ Preconditions.checkNotNull(data, "Collection must not be null");
+
+ // must not have null elements and mixed elements
+ FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());
+
+ SourceFunction function;
+ try {
+ function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ return addSource(function, "Collection Source", typeInfo).setParallelism(1);
+ }
+
+ /**
+ * Creates a data stream from the given iterator.
+ *
+ * Because the iterator will remain unmodified until the actual execution happens,
+ * the type of data returned by the iterator must be given explicitly in the form of the type
+ * class (this is due to the fact that the Java compiler erases the generic type information).
+ *
+ *
Note that this operation will result in a non-parallel data stream source, i.e.,
+ * a data stream source with a parallelism of one.
+ *
+ * @param data
+ * The iterator of elements to create the data stream from
+ * @param type
+ * The class of the data produced by the iterator. Must not be a generic class.
+ * @param
+ * The type of the returned data stream
+ * @return The data stream representing the elements in the iterator
+ * @see #fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
+ */
+ public DataStreamSource fromCollection(Iterator data, Class type) {
+ return fromCollection(data, TypeExtractor.getForClass(type));
+ }
+
+ /**
+ * Creates a data stream from the given iterator.
+ *
+ * Because the iterator will remain unmodified until the actual execution happens,
+ * the type of data returned by the iterator must be given explicitly in the form of the type
+ * information. This method is useful for cases where the type is generic.
+ * In that case, the type class (as given in
+ * {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.
+ *
+ *
Note that this operation will result in a non-parallel data stream source, i.e.,
+ * a data stream source with parallelism one.
+ *
+ * @param data
+ * The iterator of elements to create the data stream from
+ * @param typeInfo
+ * The TypeInformation for the produced data stream
+ * @param
+ * The type of the returned data stream
+ * @return The data stream representing the elements in the iterator
+ */
+ public DataStreamSource fromCollection(Iterator data, TypeInformation typeInfo) {
+ Preconditions.checkNotNull(data, "The iterator must not be null");
+
+ SourceFunction function = new FromIteratorFunction<>(data);
+ return addSource(function, "Collection Source", typeInfo);
+ }
+
+ /**
+ * Creates a new data stream that contains elements in the iterator. The iterator is splittable,
+ * allowing the framework to create a parallel data stream source that returns the elements in
+ * the iterator.
+ *
+ * Because the iterator will remain unmodified until the actual execution happens, the type
+ * of data returned by the iterator must be given explicitly in the form of the type class
+ * (this is due to the fact that the Java compiler erases the generic type information).
+ *
+ * @param iterator
+ * The iterator that produces the elements of the data stream
+ * @param type
+ * The class of the data produced by the iterator. Must not be a generic class.
+ * @param
+ * The type of the returned data stream
+ * @return A data stream representing the elements in the iterator
+ */
+ public DataStreamSource fromParallelCollection(SplittableIterator iterator, Class type) {
+ return fromParallelCollection(iterator, TypeExtractor.getForClass(type));
+ }
+
+ /**
+ * Creates a new data stream that contains elements in the iterator. The iterator is splittable,
+ * allowing the framework to create a parallel data stream source that returns the elements in
+ * the iterator.
+ *
+ * Because the iterator will remain unmodified until the actual execution happens, the type
+ * of data returned by the iterator must be given explicitly in the form of the type
+ * information. This method is useful for cases where the type is generic. In that case, the
+ * type class (as given in
+ * {@link #fromParallelCollection(org.apache.flink.util.SplittableIterator, Class)} does not
+ * supply all type information.
+ *
+ * @param iterator
+ * The iterator that produces the elements of the data stream
+ * @param typeInfo
+ * The TypeInformation for the produced data stream.
+ * @param
+ * The type of the returned data stream
+ * @return A data stream representing the elements in the iterator
+ */
+ public DataStreamSource fromParallelCollection(SplittableIterator iterator, TypeInformation
+ typeInfo) {
+ return fromParallelCollection(iterator, typeInfo, "Parallel Collection Source");
+ }
+
+ // private helper for passing different names
+ private DataStreamSource fromParallelCollection(SplittableIterator iterator, TypeInformation
+ typeInfo, String operatorName) {
+ return addSource(new FromSplittableIteratorFunction<>(iterator), operatorName, typeInfo);
+ }
+
+ /**
+ * Reads the given file line-by-line and creates a data stream that contains a string with the
+ * contents of each such line. The file will be read with the system's default character set.
+ *
+ * NOTES ON CHECKPOINTING: The source monitors the path, creates the
+ * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
+ * them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+ * and exits, without waiting for the readers to finish reading. This implies that no more
+ * checkpoint barriers are going to be forwarded after the source exits, thus having no
+ * checkpoints after that point.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+ * @return The data stream that represents the data read from the given file as text lines
+ */
+ public DataStreamSource readTextFile(String filePath) {
+ return readTextFile(filePath, "UTF-8");
+ }
+
+ /**
+ * Reads the given file line-by-line and creates a data stream that contains a string with the
+ * contents of each such line. The {@link java.nio.charset.Charset} with the given name will be
+ * used to read the files.
+ *
+ * NOTES ON CHECKPOINTING: The source monitors the path, creates the
+ * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+ * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+ * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
+ * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param charsetName
+ * The name of the character set used to read the file
+ * @return The data stream that represents the data read from the given file as text lines
+ */
+ public DataStreamSource readTextFile(String filePath, String charsetName) {
+ Preconditions.checkNotNull(filePath, "The file path must not be null.");
+ Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
+
+ TextInputFormat format = new TextInputFormat(new Path(filePath));
+ format.setFilesFilter(FilePathFilter.createDefaultFilter());
+ TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+ format.setCharsetName(charsetName);
+
+ return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
+ }
+
+ /**
+ * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
+ *
+ * Since all data streams need specific information about their types, this method needs to determine the
+ * type of the data produced by the input format. It will attempt to determine the data type by reflection,
+ * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+ * In the latter case, this method will invoke the
+ * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
+ * type produced by the input format.
+ *
+ *
NOTES ON CHECKPOINTING: The source monitors the path, creates the
+ * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+ * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+ * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
+ * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @param
+ * The type of the returned data stream
+ * @return The data stream that represents the data read from the given file
+ */
+ public DataStreamSource readFile(FileInputFormat inputFormat,
+ String filePath) {
+ return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1);
+ }
+
+ /**
+ * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. Depending
+ * on the provided {@link FileProcessingMode}.
+ *
+ * See {@link #readFile(FileInputFormat, String, FileProcessingMode, long)}
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param watchType
+ * The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
+ * @param interval
+ * In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
+ * @param filter
+ * The files to be excluded from the processing
+ * @param
+ * The type of the returned data stream
+ * @return The data stream that represents the data read from the given file
+ *
+ * @deprecated Use {@link FileInputFormat#setFilesFilter(FilePathFilter)} to set a filter and
+ * {@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)}
+ *
+ */
+ @PublicEvolving
+ @Deprecated
+ public DataStreamSource readFile(FileInputFormat inputFormat,
+ String filePath,
+ FileProcessingMode watchType,
+ long interval,
+ FilePathFilter filter) {
+ inputFormat.setFilesFilter(filter);
+
+ TypeInformation typeInformation;
+ try {
+ typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
+ } catch (Exception e) {
+ throw new InvalidProgramException("The type returned by the input format could not be " +
+ "automatically determined. Please specify the TypeInformation of the produced type " +
+ "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
+ }
+ return readFile(inputFormat, filePath, watchType, interval, typeInformation);
+ }
+
+ /**
+ * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. Depending
+ * on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms) the path
+ * for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently in the path and
+ * exit ({@link FileProcessingMode#PROCESS_ONCE}). In addition, if the path contains files not to be processed, the user
+ * can specify a custom {@link FilePathFilter}. As a default implementation you can use
+ * {@link FilePathFilter#createDefaultFilter()}.
+ *
+ * Since all data streams need specific information about their types, this method needs to determine the
+ * type of the data produced by the input format. It will attempt to determine the data type by reflection,
+ * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+ * In the latter case, this method will invoke the
+ * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
+ * type produced by the input format.
+ *
+ *
NOTES ON CHECKPOINTING: If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE},
+ * the source monitors the path once, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
+ * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+ * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
+ * are going to be forwarded after the source exits, thus having no checkpoints after that point.
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param watchType
+ * The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
+ * @param interval
+ * In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
+ * @param
+ * The type of the returned data stream
+ * @return The data stream that represents the data read from the given file
+ */
+ @PublicEvolving
+ public DataStreamSource readFile(FileInputFormat inputFormat,
+ String filePath,
+ FileProcessingMode watchType,
+ long interval) {
+
+ TypeInformation typeInformation;
+ try {
+ typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
+ } catch (Exception e) {
+ throw new InvalidProgramException("The type returned by the input format could not be " +
+ "automatically determined. Please specify the TypeInformation of the produced type " +
+ "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
+ }
+ return readFile(inputFormat, filePath, watchType, interval, typeInformation);
+ }
+
+ /**
+ * Creates a data stream that contains the contents of file created while system watches the given path. The file
+ * will be read with the system's default character set.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path/")
+ * @param intervalMillis
+ * The interval of file watching in milliseconds
+ * @param watchType
+ * The watch type of file stream. When watchType is {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#ONLY_NEW_FILES}, the system processes
+ * only
+ * new files. {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#REPROCESS_WITH_APPENDED} means that the system re-processes all contents of
+ * appended file. {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#PROCESS_ONLY_APPENDED} means that the system processes only appended
+ * contents
+ * of files.
+ * @return The DataStream containing the given directory.
+ *
+ * @deprecated Use {@link #readFile(FileInputFormat, String, FileProcessingMode, long)} instead.
+ */
+ @Deprecated
+ @SuppressWarnings("deprecation")
+ public DataStream readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType) {
+ DataStream> source = addSource(new FileMonitoringFunction(
+ filePath, intervalMillis, watchType), "Read File Stream source");
+
+ return source.flatMap(new FileReadFunction());
+ }
+
+ /**
+ * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
+ * Depending on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms)
+ * the path for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently in the
+ * path and exit ({@link FileProcessingMode#PROCESS_ONCE}). In addition, if the path contains files not to be processed,
+ * the user can specify a custom {@link FilePathFilter}. As a default implementation you can use
+ * {@link FilePathFilter#createDefaultFilter()}.
+ *
+ * NOTES ON CHECKPOINTING: If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE},
+ * the source monitors the path once, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
+ * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+ * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
+ * are going to be forwarded after the source exits, thus having no checkpoints after that point.
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param watchType
+ * The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
+ * @param typeInformation
+ * Information on the type of the elements in the output stream
+ * @param interval
+ * In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
+ * @param
+ * The type of the returned data stream
+ * @return The data stream that represents the data read from the given file
+ */
+ @PublicEvolving
+ public DataStreamSource readFile(FileInputFormat inputFormat,
+ String filePath,
+ FileProcessingMode watchType,
+ long interval,
+ TypeInformation typeInformation) {
+
+ Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
+ Preconditions.checkNotNull(filePath, "The file path must not be null.");
+ Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
+
+ inputFormat.setFilePath(filePath);
+ return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval);
+ }
+
+ /**
+ * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
+ * decoded by the system's default character set. On the termination of the socket server connection retries can be
+ * initiated.
+ *
+ * Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when
+ * the socket was gracefully terminated.
+ *
+ * @param hostname
+ * The host name which a server socket binds
+ * @param port
+ * The port number which a server socket binds. A port number of 0 means that the port number is automatically
+ * allocated.
+ * @param delimiter
+ * A character which splits received strings into records
+ * @param maxRetry
+ * The maximal retry interval in seconds while the program waits for a socket that is temporarily down.
+ * Reconnection is initiated every second. A number of 0 means that the reader is immediately terminated,
+ * while
+ * a negative value ensures retrying forever.
+ * @return A data stream containing the strings received from the socket
+ *
+ * @deprecated Use {@link #socketTextStream(String, int, String, long)} instead.
+ */
+ @Deprecated
+ public DataStreamSource socketTextStream(String hostname, int port, char delimiter, long maxRetry) {
+ return socketTextStream(hostname, port, String.valueOf(delimiter), maxRetry);
+ }
+
+ /**
+ * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
+ * decoded by the system's default character set. On the termination of the socket server connection retries can be
+ * initiated.
+ *
+ * Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when
+ * the socket was gracefully terminated.
+ *
+ * @param hostname
+ * The host name which a server socket binds
+ * @param port
+ * The port number which a server socket binds. A port number of 0 means that the port number is automatically
+ * allocated.
+ * @param delimiter
+ * A string which splits received strings into records
+ * @param maxRetry
+ * The maximal retry interval in seconds while the program waits for a socket that is temporarily down.
+ * Reconnection is initiated every second. A number of 0 means that the reader is immediately terminated,
+ * while
+ * a negative value ensures retrying forever.
+ * @return A data stream containing the strings received from the socket
+ */
+ @PublicEvolving
+ public DataStreamSource socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
+ return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
+ "Socket Stream");
+ }
+
+ /**
+ * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
+ * decoded by the system's default character set. The reader is terminated immediately when the socket is down.
+ *
+ * @param hostname
+ * The host name which a server socket binds
+ * @param port
+ * The port number which a server socket binds. A port number of 0 means that the port number is automatically
+ * allocated.
+ * @param delimiter
+ * A character which splits received strings into records
+ * @return A data stream containing the strings received from the socket
+ *
+ * @deprecated Use {@link #socketTextStream(String, int, String)} instead.
+ */
+ @Deprecated
+ @SuppressWarnings("deprecation")
+ public DataStreamSource socketTextStream(String hostname, int port, char delimiter) {
+ return socketTextStream(hostname, port, delimiter, 0);
+ }
+
+ /**
+ * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
+ * decoded by the system's default character set. The reader is terminated immediately when the socket is down.
+ *
+ * @param hostname
+ * The host name which a server socket binds
+ * @param port
+ * The port number which a server socket binds. A port number of 0 means that the port number is automatically
+ * allocated.
+ * @param delimiter
+ * A string which splits received strings into records
+ * @return A data stream containing the strings received from the socket
+ */
+ @PublicEvolving
+ public DataStreamSource socketTextStream(String hostname, int port, String delimiter) {
+ return socketTextStream(hostname, port, delimiter, 0);
+ }
+
+ /**
+ * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
+ * decoded by the system's default character set, using"\n" as delimiter. The reader is terminated immediately when
+ * the socket is down.
+ *
+ * @param hostname
+ * The host name which a server socket binds
+ * @param port
+ * The port number which a server socket binds. A port number of 0 means that the port number is automatically
+ * allocated.
+ * @return A data stream containing the strings received from the socket
+ */
+ @PublicEvolving
+ public DataStreamSource socketTextStream(String hostname, int port) {
+ return socketTextStream(hostname, port, "\n");
+ }
+
+ /**
+ * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
+ *
+ * Since all data streams need specific information about their types, this method needs to determine the
+ * type of the data produced by the input format. It will attempt to determine the data type by reflection,
+ * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+ * In the latter case, this method will invoke the
+ * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
+ * type produced by the input format.
+ *
+ *
NOTES ON CHECKPOINTING: In the case of a {@link FileInputFormat}, the source
+ * (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the
+ * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
+ * them to the downstream {@link ContinuousFileReaderOperator} to read the actual data, and exits,
+ * without waiting for the readers to finish reading. This implies that no more checkpoint
+ * barriers are going to be forwarded after the source exits, thus having no checkpoints.
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @param
+ * The type of the returned data stream
+ * @return The data stream that represents the data created by the input format
+ */
+ @PublicEvolving
+ public DataStreamSource createInput(InputFormat inputFormat) {
+ return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
+ }
+
+ /**
+ * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
+ *
+ * The data stream is typed to the given TypeInformation. This method is intended for input formats
+ * where the return type cannot be determined by reflection analysis, and that do not implement the
+ * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+ *
+ *
NOTES ON CHECKPOINTING: In the case of a {@link FileInputFormat}, the source
+ * (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the
+ * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
+ * them to the downstream {@link ContinuousFileReaderOperator} to read the actual data, and exits,
+ * without waiting for the readers to finish reading. This implies that no more checkpoint
+ * barriers are going to be forwarded after the source exits, thus having no checkpoints.
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @param typeInfo
+ * The information about the type of the output type
+ * @param
+ * The type of the returned data stream
+ * @return The data stream that represents the data created by the input format
+ */
+ @PublicEvolving
+ public DataStreamSource createInput(InputFormat inputFormat, TypeInformation typeInfo) {
+ DataStreamSource source;
+
+ if (inputFormat instanceof FileInputFormat) {
+ @SuppressWarnings("unchecked")
+ FileInputFormat format = (FileInputFormat) inputFormat;
+
+ source = createFileInput(format, typeInfo, "Custom File source",
+ FileProcessingMode.PROCESS_ONCE, -1);
+ } else {
+ source = createInput(inputFormat, typeInfo, "Custom Source");
+ }
+ return source;
+ }
+
+ private DataStreamSource createInput(InputFormat inputFormat,
+ TypeInformation typeInfo,
+ String sourceName) {
+
+ InputFormatSourceFunction function = new InputFormatSourceFunction<>(inputFormat, typeInfo);
+ return addSource(function, sourceName, typeInfo);
+ }
+
+ private DataStreamSource createFileInput(FileInputFormat inputFormat,
+ TypeInformation typeInfo,
+ String sourceName,
+ FileProcessingMode monitoringMode,
+ long interval) {
+
+ Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
+ Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
+ Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
+ Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
+
+ Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
+ interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
+ "The path monitoring interval cannot be less than " +
+ ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
+
+ ContinuousFileMonitoringFunction monitoringFunction =
+ new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);
+
+ ContinuousFileReaderOperator reader =
+ new ContinuousFileReaderOperator<>(inputFormat);
+
+ SingleOutputStreamOperator source = addSource(monitoringFunction, sourceName)
+ .transform("Split Reader: " + sourceName, typeInfo, reader);
+
+ return new DataStreamSource<>(source);
+ }
+
+ /**
+ * Adds a Data Source to the streaming topology.
+ *
+ * By default sources have a parallelism of 1. To enable parallel execution, the user defined source should
+ * implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction} or extend {@link
+ * org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction}. In these cases the resulting source
+ * will have the parallelism of the environment. To change this afterwards call {@link
+ * org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism(int)}
+ *
+ * @param function
+ * the user defined function
+ * @param
+ * type of the returned stream
+ * @return the data stream constructed
+ */
+ public DataStreamSource addSource(SourceFunction function) {
+ return addSource(function, "Custom Source");
+ }
+
+ /**
+ * Ads a data source with a custom type information thus opening a
+ * {@link DataStream}. Only in very special cases does the user need to
+ * support type information. Otherwise use
+ * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
+ *
+ * @param function
+ * the user defined function
+ * @param sourceName
+ * Name of the data source
+ * @param
+ * type of the returned stream
+ * @return the data stream constructed
+ */
+ public DataStreamSource addSource(SourceFunction function, String sourceName) {
+ return addSource(function, sourceName, null);
+ }
+
+ /**
+ * Ads a data source with a custom type information thus opening a
+ * {@link DataStream}. Only in very special cases does the user need to
+ * support type information. Otherwise use
+ * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
+ *
+ * @param function
+ * the user defined function
+ * @param
+ * type of the returned stream
+ * @param typeInfo
+ * the user defined type information for the stream
+ * @return the data stream constructed
+ */
+ public DataStreamSource addSource(SourceFunction function, TypeInformation typeInfo) {
+ return addSource(function, "Custom Source", typeInfo);
+ }
+
+ /**
+ * Ads a data source with a custom type information thus opening a
+ * {@link DataStream}. Only in very special cases does the user need to
+ * support type information. Otherwise use
+ * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
+ *
+ * @param function
+ * the user defined function
+ * @param sourceName
+ * Name of the data source
+ * @param
+ * type of the returned stream
+ * @param typeInfo
+ * the user defined type information for the stream
+ * @return the data stream constructed
+ */
+ @SuppressWarnings("unchecked")
+ public DataStreamSource addSource(SourceFunction function, String sourceName, TypeInformation typeInfo) {
+
+ if (typeInfo == null) {
+ if (function instanceof ResultTypeQueryable) {
+ typeInfo = ((ResultTypeQueryable) function).getProducedType();
+ } else {
+ try {
+ typeInfo = TypeExtractor.createTypeInfo(
+ SourceFunction.class,
+ function.getClass(), 0, null, null);
+ } catch (final InvalidTypesException e) {
+ typeInfo = (TypeInformation) new MissingTypeInfo(sourceName, e);
+ }
+ }
+ }
+
+ boolean isParallel = function instanceof ParallelSourceFunction;
+
+ clean(function);
+ StreamSource sourceOperator;
+ if (function instanceof StoppableFunction) {
+ sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
+ } else {
+ sourceOperator = new StreamSource<>(function);
+ }
+
+ return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
+ }
+
+ /**
+ * Casts the source function into a SourceFunction implementing the StoppableFunction.
+ *
+ * This method should only be used if the source function was checked to implement the
+ * {@link StoppableFunction} interface.
+ *
+ * @param sourceFunction Source function to cast
+ * @param Output type of source function
+ * @param Union type of SourceFunction and StoppableFunction
+ * @return The casted source function so that it's type implements the StoppableFunction
+ */
+ @SuppressWarnings("unchecked")
+ private & StoppableFunction> T cast2StoppableSourceFunction(SourceFunction sourceFunction) {
+ return (T) sourceFunction;
+ }
+
+ /**
+ * Triggers the program execution. The environment will execute all parts of
+ * the program that have resulted in a "sink" operation. Sink operations are
+ * for example printing results or forwarding them to a message queue.
+ *
+ * The program execution will be logged and displayed with a generated
+ * default name.
+ *
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ * @throws Exception which occurs during job execution.
+ */
+ public JobExecutionResult execute() throws Exception {
+ return execute(DEFAULT_JOB_NAME);
+ }
+
+ /**
+ * Triggers the program execution. The environment will execute all parts of
+ * the program that have resulted in a "sink" operation. Sink operations are
+ * for example printing results or forwarding them to a message queue.
+ *
+ *
The program execution will be logged and displayed with the provided name
+ *
+ * @param jobName
+ * Desired name of the job
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ * @throws Exception which occurs during job execution.
+ */
+ public abstract JobExecutionResult execute(String jobName) throws Exception;
+
+ /**
+ * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
+ *
+ * @return The streamgraph representing the transformations
+ */
+ @Internal
+ public StreamGraph getStreamGraph() {
+ if (transformations.size() <= 0) {
+ throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
+ }
+ return StreamGraphGenerator.generate(this, transformations);
+ }
+
+ /**
+ * Creates the plan with which the system will execute the program, and
+ * returns it as a String using a JSON representation of the execution data
+ * flow graph. Note that this needs to be called, before the plan is
+ * executed.
+ *
+ * @return The execution plan of the program, as a JSON String.
+ */
+ public String getExecutionPlan() {
+ return getStreamGraph().getStreamingPlanAsJSON();
+ }
+
+ /**
+ * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+ * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+ */
+ @Internal
+ public F clean(F f) {
+ if (getConfig().isClosureCleanerEnabled()) {
+ ClosureCleaner.clean(f, true);
+ }
+ ClosureCleaner.ensureSerializable(f);
+ return f;
+ }
+
+ /**
+ * Adds an operator to the list of operators that should be executed when calling
+ * {@link #execute}.
+ *
+ * When calling {@link #execute()} only the operators that where previously added to the list
+ * are executed.
+ *
+ *
This is not meant to be used by users. The API methods that create operators must call
+ * this method.
+ */
+ @Internal
+ public void addOperator(StreamTransformation> transformation) {
+ Preconditions.checkNotNull(transformation, "transformation must not be null.");
+ this.transformations.add(transformation);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Factory methods for ExecutionEnvironments
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates an execution environment that represents the context in which the
+ * program is currently executed. If the program is invoked standalone, this
+ * method returns a local execution environment, as returned by
+ * {@link #createLocalEnvironment()}.
+ *
+ * @return The execution environment of the context in which the program is
+ * executed.
+ */
+ public static StreamExecutionEnvironment getExecutionEnvironment() {
+ if (contextEnvironmentFactory.get() != null) {
+ return contextEnvironmentFactory.get().createExecutionEnvironment();
+ }
+
+ // because the streaming project depends on "flink-clients" (and not the other way around)
+ // we currently need to intercept the data set environment and create a dependent stream env.
+ // this should be fixed once we rework the project dependencies
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ if (env instanceof ContextEnvironment) {
+ return new StreamContextEnvironment((ContextEnvironment) env);
+ } else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
+ return new StreamPlanEnvironment(env);
+ } else {
+ return createLocalEnvironment();
+ }
+ }
+
+ /**
+ * Creates a {@link LocalStreamEnvironment}. The local execution environment
+ * will run the program in a multi-threaded fashion in the same JVM as the
+ * environment was created in. The default parallelism of the local
+ * environment is the number of hardware contexts (CPU cores / threads),
+ * unless it was specified differently by {@link #setParallelism(int)}.
+ *
+ * @return A local execution environment.
+ */
+ public static LocalStreamEnvironment createLocalEnvironment() {
+ return createLocalEnvironment(defaultLocalParallelism);
+ }
+
+ /**
+ * Creates a {@link LocalStreamEnvironment}. The local execution environment
+ * will run the program in a multi-threaded fashion in the same JVM as the
+ * environment was created in. It will use the parallelism specified in the
+ * parameter.
+ *
+ * @param parallelism
+ * The parallelism for the local environment.
+ * @return A local execution environment with the specified parallelism.
+ */
+ public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
+ LocalStreamEnvironment env = new LocalStreamEnvironment();
+ env.setParallelism(parallelism);
+ return env;
+ }
+
+ /**
+ * Creates a {@link LocalStreamEnvironment}. The local execution environment
+ * will run the program in a multi-threaded fashion in the same JVM as the
+ * environment was created in. It will use the parallelism specified in the
+ * parameter.
+ *
+ * @param parallelism
+ * The parallelism for the local environment.
+ * @param configuration
+ * Pass a custom configuration into the cluster
+ * @return A local execution environment with the specified parallelism.
+ */
+ public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
+ LocalStreamEnvironment currentEnvironment = new LocalStreamEnvironment(configuration);
+ currentEnvironment.setParallelism(parallelism);
+ return currentEnvironment;
+ }
+
+ /**
+ * Creates a {@link LocalStreamEnvironment} for local program execution that also starts the
+ * web monitoring UI.
+ *
+ *
The local execution environment will run the program in a multi-threaded fashion in
+ * the same JVM as the environment was created in. It will use the parallelism specified in the
+ * parameter.
+ *
+ *
If the configuration key 'jobmanager.web.port' was set in the configuration, that particular
+ * port will be used for the web UI. Otherwise, the default port (8081) will be used.
+ */
+ @PublicEvolving
+ public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
+ checkNotNull(conf, "conf");
+
+ conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
+
+ LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);
+ localEnv.setParallelism(defaultLocalParallelism);
+
+ return localEnv;
+ }
+
+ /**
+ * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
+ * (parts of) the program to a cluster for execution. Note that all file
+ * paths used in the program must be accessible from the cluster. The
+ * execution will use no parallelism, unless the parallelism is set
+ * explicitly via {@link #setParallelism}.
+ *
+ * @param host
+ * The host name or address of the master (JobManager), where the
+ * program should be executed.
+ * @param port
+ * The port of the master (JobManager), where the program should
+ * be executed.
+ * @param jarFiles
+ * The JAR files with code that needs to be shipped to the
+ * cluster. If the program uses user-defined functions,
+ * user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
+ * @return A remote environment that executes the program on a cluster.
+ */
+ public static StreamExecutionEnvironment createRemoteEnvironment(
+ String host, int port, String... jarFiles) {
+ return new RemoteStreamEnvironment(host, port, jarFiles);
+ }
+
+ /**
+ * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
+ * (parts of) the program to a cluster for execution. Note that all file
+ * paths used in the program must be accessible from the cluster. The
+ * execution will use the specified parallelism.
+ *
+ * @param host
+ * The host name or address of the master (JobManager), where the
+ * program should be executed.
+ * @param port
+ * The port of the master (JobManager), where the program should
+ * be executed.
+ * @param parallelism
+ * The parallelism to use during the execution.
+ * @param jarFiles
+ * The JAR files with code that needs to be shipped to the
+ * cluster. If the program uses user-defined functions,
+ * user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
+ * @return A remote environment that executes the program on a cluster.
+ */
+ public static StreamExecutionEnvironment createRemoteEnvironment(
+ String host, int port, int parallelism, String... jarFiles) {
+ RemoteStreamEnvironment env = new RemoteStreamEnvironment(host, port, jarFiles);
+ env.setParallelism(parallelism);
+ return env;
+ }
+
+ /**
+ * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
+ * (parts of) the program to a cluster for execution. Note that all file
+ * paths used in the program must be accessible from the cluster. The
+ * execution will use the specified parallelism.
+ *
+ * @param host
+ * The host name or address of the master (JobManager), where the
+ * program should be executed.
+ * @param port
+ * The port of the master (JobManager), where the program should
+ * be executed.
+ * @param clientConfig
+ * The configuration used by the client that connects to the remote cluster.
+ * @param jarFiles
+ * The JAR files with code that needs to be shipped to the
+ * cluster. If the program uses user-defined functions,
+ * user-defined input formats, or any libraries, those must be
+ * provided in the JAR files.
+ * @return A remote environment that executes the program on a cluster.
+ */
+ public static StreamExecutionEnvironment createRemoteEnvironment(
+ String host, int port, Configuration clientConfig, String... jarFiles) {
+ return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles);
+ }
+
+ /**
+ * Gets the default parallelism that will be used for the local execution environment created by
+ * {@link #createLocalEnvironment()}.
+ *
+ * @return The default local parallelism
+ */
+ @PublicEvolving
+ public static int getDefaultLocalParallelism() {
+ return defaultLocalParallelism;
+ }
+
+ /**
+ * Sets the default parallelism that will be used for the local execution
+ * environment created by {@link #createLocalEnvironment()}.
+ *
+ * @param parallelism The parallelism to use as the default local parallelism.
+ */
+ @PublicEvolving
+ public static void setDefaultLocalParallelism(int parallelism) {
+ defaultLocalParallelism = parallelism;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Methods to control the context and local environments for execution from packaged programs
+ // --------------------------------------------------------------------------------------------
+
+ protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
+ contextEnvironmentFactory.set(ctx);
+ }
+
+ protected static void resetContextEnvironment() {
+ contextEnvironmentFactory.remove();
+ }
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+ * The runtime will copy the files temporarily to a local cache, if needed.
+ *
+ *
The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+ * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
+ * {@link org.apache.flink.api.common.cache.DistributedCache} via
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ */
+ public void registerCachedFile(String filePath, String name) {
+ registerCachedFile(filePath, name, false);
+ }
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
+ * The runtime will copy the files temporarily to a local cache, if needed.
+ *
+ *
The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+ * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
+ * {@link org.apache.flink.api.common.cache.DistributedCache} via
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ * @param executable flag indicating whether the file should be executable
+ */
+ public void registerCachedFile(String filePath, String name, boolean executable) {
+ this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
+ }
+}
diff --git a/flinkx-es/flinkx-es-reader/pom.xml b/flinkx-es/flinkx-es-reader/pom.xml
index 10377efeee..0658a53403 100644
--- a/flinkx-es/flinkx-es-reader/pom.xml
+++ b/flinkx-es/flinkx-es-reader/pom.xml
@@ -36,11 +36,6 @@
com.dtstack.flinkx:flinkx-core
-
-
- ch.qos.logback:*
- org.slf4j:slf4j-api
-
org.apache.flink:*
com.data-artisans:*
diff --git a/flinkx-es/flinkx-es-writer/pom.xml b/flinkx-es/flinkx-es-writer/pom.xml
index a44ff3e5dc..fcafa410e2 100644
--- a/flinkx-es/flinkx-es-writer/pom.xml
+++ b/flinkx-es/flinkx-es-writer/pom.xml
@@ -45,9 +45,6 @@
com.dtstack.flinkx:flinkx-core
com.google.code.gson:*
- ch.qos.logback:*
- org.slf4j:slf4j-api
-
org.apache.flink:*
com.data-artisans:*
diff --git a/flinkx-examples/src/main/resources/examples/ftp_to_mysql.json b/flinkx-examples/examples/ftp_to_mysql.json
similarity index 100%
rename from flinkx-examples/src/main/resources/examples/ftp_to_mysql.json
rename to flinkx-examples/examples/ftp_to_mysql.json
diff --git a/flinkx-examples/src/main/resources/examples/hive_orc_to_mysql.json b/flinkx-examples/examples/hive_orc_to_mysql.json
similarity index 100%
rename from flinkx-examples/src/main/resources/examples/hive_orc_to_mysql.json
rename to flinkx-examples/examples/hive_orc_to_mysql.json
diff --git a/flinkx-examples/src/main/resources/examples/mysql_to_ftp.json b/flinkx-examples/examples/mysql_to_ftp.json
similarity index 100%
rename from flinkx-examples/src/main/resources/examples/mysql_to_ftp.json
rename to flinkx-examples/examples/mysql_to_ftp.json
diff --git a/flinkx-examples/src/main/resources/examples/mysql_to_hdfs_orc.json b/flinkx-examples/examples/mysql_to_hdfs_orc.json
similarity index 100%
rename from flinkx-examples/src/main/resources/examples/mysql_to_hdfs_orc.json
rename to flinkx-examples/examples/mysql_to_hdfs_orc.json
diff --git a/flinkx-examples/src/main/resources/examples/mysql_to_mysql.json b/flinkx-examples/examples/mysql_to_mysql.json
similarity index 100%
rename from flinkx-examples/src/main/resources/examples/mysql_to_mysql.json
rename to flinkx-examples/examples/mysql_to_mysql.json
diff --git a/flinkx-examples/src/main/resources/examples/mysql_to_oracle.json b/flinkx-examples/examples/mysql_to_oracle.json
similarity index 100%
rename from flinkx-examples/src/main/resources/examples/mysql_to_oracle.json
rename to flinkx-examples/examples/mysql_to_oracle.json
diff --git a/flinkx-examples/src/main/resources/examples/mysql_to_sftp.json b/flinkx-examples/examples/mysql_to_sftp.json
similarity index 100%
rename from flinkx-examples/src/main/resources/examples/mysql_to_sftp.json
rename to flinkx-examples/examples/mysql_to_sftp.json
diff --git a/flinkx-examples/src/main/resources/examples/mysql_to_sqlserver.json b/flinkx-examples/examples/mysql_to_sqlserver.json
similarity index 100%
rename from flinkx-examples/src/main/resources/examples/mysql_to_sqlserver.json
rename to flinkx-examples/examples/mysql_to_sqlserver.json
diff --git a/flinkx-examples/src/main/resources/examples/oracle_to_hdfs_text.json b/flinkx-examples/examples/oracle_to_hdfs_text.json
similarity index 100%
rename from flinkx-examples/src/main/resources/examples/oracle_to_hdfs_text.json
rename to flinkx-examples/examples/oracle_to_hdfs_text.json
diff --git a/flinkx-examples/src/main/resources/examples/oracle_to_oracle.json b/flinkx-examples/examples/oracle_to_oracle.json
similarity index 100%
rename from flinkx-examples/src/main/resources/examples/oracle_to_oracle.json
rename to flinkx-examples/examples/oracle_to_oracle.json
diff --git a/flinkx-examples/src/main/resources/examples/sftp_to_mysql.json b/flinkx-examples/examples/sftp_to_mysql.json
similarity index 100%
rename from flinkx-examples/src/main/resources/examples/sftp_to_mysql.json
rename to flinkx-examples/examples/sftp_to_mysql.json
diff --git a/flinkx-examples/src/main/resources/examples/sqlserver_to_mysql.json b/flinkx-examples/examples/sqlserver_to_mysql.json
similarity index 100%
rename from flinkx-examples/src/main/resources/examples/sqlserver_to_mysql.json
rename to flinkx-examples/examples/sqlserver_to_mysql.json
diff --git a/flinkx-examples/src/main/java/com/dtstack/flinkx/examples/ExampleGenerator.java b/flinkx-examples/src/main/java/com/dtstack/flinkx/examples/ExampleGenerator.java
index 3908f46e1e..4339a2610a 100644
--- a/flinkx-examples/src/main/java/com/dtstack/flinkx/examples/ExampleGenerator.java
+++ b/flinkx-examples/src/main/java/com/dtstack/flinkx/examples/ExampleGenerator.java
@@ -20,14 +20,24 @@
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* The class used for generating data-transfer-task json files using variable substitution
@@ -38,15 +48,19 @@
public class ExampleGenerator {
private static final String OPTION_CONF_DIR = "c";
private static final String OPTION_TEMPLATE_DIR = "t";
+ private static final String OPTION_OUTPUT_DIR = "o";
private static final String DEFAULT_CONF_DIR = null;
private static final String DEFAULT_TEMPLATE_DIR;
private Properties substituteMap = new Properties();
private String confDir;
private String templateDir;
+ private String outputDir;
+ private List tempList = new ArrayList<>();
- public ExampleGenerator(String confDir, String templateDir) {
+ public ExampleGenerator(String confDir, String templateDir, String outputDir) {
this.confDir = confDir;
this.templateDir = templateDir;
+ this.outputDir = outputDir;
}
static {
@@ -56,48 +70,92 @@ public ExampleGenerator(String confDir, String templateDir) {
public void generate() throws IOException {
initVarMap();
- }
+ initTempList();
- private void initVarMap() throws IOException {
- if(confDir == null || confDir.trim().length() == 0) {
- substituteMap.load(getClass().getResourceAsStream("/examples.conf"));
- } else {
- File dir = new File(confDir);
- if(dir.exists() && dir.isDirectory()) {
- File[] confFiles = dir.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.toLowerCase().endsWith(".jar");
+ for(String tempFile : tempList) {
+ String[] part = tempFile.split(File.separator);
+ String outputPath = outputDir + File.separator + part[part.length - 1];
+ try(BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(tempFile)))) {
+ try(BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outputPath)))) {
+ String line;
+ while((line = br.readLine()) != null) {
+ bw.write(substituteVars(line));
+ bw.write("\n");
}
- });
- for(File confFile : confFiles) {
- substituteMap.load(new FileInputStream(confFile));
}
}
}
- if(templateDir == null || templateDir.trim().length() == 0) {
-
+ }
+
+ public void initTempList() throws IOException {
+ File dir = new File(templateDir);
+ if(dir.exists() && dir.isDirectory()) {
+ File[] tempFiles = dir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.toLowerCase().endsWith(".json");
+ }
+ });
+ if(tempFiles != null) {
+ for(File tempFile : tempFiles) {
+ tempList.add(tempFile.getPath());
+ }
+ }
+ }
+ }
+
+ public void initVarMap() throws IOException {
+ File dir = new File(confDir);
+ if(dir.exists() && dir.isDirectory()) {
+ File[] confFiles = dir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.toLowerCase().endsWith(".conf");
+ }
+ });
+ for(File confFile : confFiles) {
+ substituteMap.load(new FileInputStream(confFile));
+ }
}
}
- private String substituteVars(String str) {
- return null;
+ public String substituteVars(String str) {
+ StringBuilder sb = new StringBuilder();
+ String pattern = "\\$\\{(.+?)\\}";
+ Pattern r = Pattern.compile(pattern);
+ Matcher m = r.matcher(str);
+ int start = 0;
+ while(m.find()) {
+ String var = m.group(1);
+ if(substituteMap.containsKey(var)) {
+ sb.append(str.substring(start, m.start()));
+ sb.append(substituteMap.get(var));
+ start = m.end();
+ }
+ }
+ if(start < str.length()) {
+ sb.append(str.substring(start));
+ }
+ return sb.toString();
}
private static ExampleGenerator getInstance(String[] args) throws ParseException {
Options options = new Options();
options.addOption(OPTION_CONF_DIR, true, "Variable configuration directory");
options.addOption(OPTION_TEMPLATE_DIR, true, "Task template directory");
+ options.addOption(OPTION_OUTPUT_DIR, true, "Output Directory");
BasicParser parser = new BasicParser();
CommandLine cmdLine = parser.parse(options, args);
String confDir = cmdLine.getOptionValue(OPTION_CONF_DIR);
- String templateDir = cmdLine.getOptionValue(OPTION_CONF_DIR);
- return new ExampleGenerator(confDir,templateDir);
+ String templateDir = cmdLine.getOptionValue(OPTION_TEMPLATE_DIR);
+ String outputDir = cmdLine.getOptionValue(OPTION_OUTPUT_DIR);
+ return new ExampleGenerator(confDir, templateDir, outputDir);
}
public static void main(String[] args) throws IOException, ParseException {
ExampleGenerator generator = getInstance(args);
generator.generate();
}
+
}
diff --git a/flinkx-examples/src/main/resources/examples.conf b/flinkx-examples/src/main/resources/examples.conf
deleted file mode 100644
index 2f5002ef1a..0000000000
--- a/flinkx-examples/src/main/resources/examples.conf
+++ /dev/null
@@ -1 +0,0 @@
-xxx=111
\ No newline at end of file
diff --git a/flinkx-ftp/flinkx-ftp-reader/pom.xml b/flinkx-ftp/flinkx-ftp-reader/pom.xml
index 1b4eb8c8e3..1b4fba1547 100644
--- a/flinkx-ftp/flinkx-ftp-reader/pom.xml
+++ b/flinkx-ftp/flinkx-ftp-reader/pom.xml
@@ -54,8 +54,6 @@ under the License.
com.dtstack.flinkx:flinkx-core
- ch.qos.logback:*
- org.slf4j:slf4j-api
org.apache.flink:*
com.data-artisans:*
org.scala-lang:*
diff --git a/flinkx-ftp/flinkx-ftp-writer/pom.xml b/flinkx-ftp/flinkx-ftp-writer/pom.xml
index ee8b8001c4..15297d5c45 100644
--- a/flinkx-ftp/flinkx-ftp-writer/pom.xml
+++ b/flinkx-ftp/flinkx-ftp-writer/pom.xml
@@ -54,7 +54,6 @@ under the License.
com.dtstack.flinkx:flinkx-core
com.google.code.gson:*
- ch.qos.logback:*
org.apache.flink:*
com.data-artisans:*
org.scala-lang:*
diff --git a/flinkx-hbase/flinkx-hbase-core/src/main/java/com/dtstack/flinkx/hbase/HbaseHelper.java b/flinkx-hbase/flinkx-hbase-core/src/main/java/com/dtstack/flinkx/hbase/HbaseHelper.java
index 4c35795186..1e695893c7 100644
--- a/flinkx-hbase/flinkx-hbase-core/src/main/java/com/dtstack/flinkx/hbase/HbaseHelper.java
+++ b/flinkx-hbase/flinkx-hbase-core/src/main/java/com/dtstack/flinkx/hbase/HbaseHelper.java
@@ -142,8 +142,9 @@ public static void checkHbaseTable(Admin admin, TableName table) throws IOExce
public static void closeBufferedMutator(BufferedMutator bufferedMutator){
try {
- if(null != bufferedMutator)
+ if(null != bufferedMutator){
bufferedMutator.close();
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/flinkx-hbase/flinkx-hbase-reader/pom.xml b/flinkx-hbase/flinkx-hbase-reader/pom.xml
index ce49dc3e43..2c3507bf46 100644
--- a/flinkx-hbase/flinkx-hbase-reader/pom.xml
+++ b/flinkx-hbase/flinkx-hbase-reader/pom.xml
@@ -38,8 +38,6 @@
com.dtstack.flinkx:flinkx-core
- ch.qos.logback:*
- org.slf4j:slf4j-api
org.apache.flink:*
diff --git a/flinkx-hbase/flinkx-hbase-writer/pom.xml b/flinkx-hbase/flinkx-hbase-writer/pom.xml
index 7f49af89e0..e7ce7428a3 100644
--- a/flinkx-hbase/flinkx-hbase-writer/pom.xml
+++ b/flinkx-hbase/flinkx-hbase-writer/pom.xml
@@ -38,10 +38,6 @@
com.dtstack.flinkx:flinkx-core
com.google.code.gson:*
- ch.qos.logback:*
-
-
-
org.apache.flink:*
com.data-artisans:*
diff --git a/flinkx-hdfs/flinkx-hdfs-reader/pom.xml b/flinkx-hdfs/flinkx-hdfs-reader/pom.xml
index 04e99a9ef3..5f74f1c4e2 100644
--- a/flinkx-hdfs/flinkx-hdfs-reader/pom.xml
+++ b/flinkx-hdfs/flinkx-hdfs-reader/pom.xml
@@ -59,10 +59,6 @@ under the License.
com.dtstack.flinkx:flinkx-core
-
- ch.qos.logback:*
- org.slf4j:slf4j-api
-
org.apache.flink:*
com.data-artisans:*
diff --git a/flinkx-hdfs/flinkx-hdfs-writer/pom.xml b/flinkx-hdfs/flinkx-hdfs-writer/pom.xml
index c26aefdef1..0182476069 100644
--- a/flinkx-hdfs/flinkx-hdfs-writer/pom.xml
+++ b/flinkx-hdfs/flinkx-hdfs-writer/pom.xml
@@ -68,9 +68,6 @@ under the License.
com.dtstack.flinkx:flinkx-rdb
com.google.code.gson:*
- ch.qos.logback:*
- org.slf4j:slf4j-api
-
org.apache.flink:*
com.data-artisans:*
diff --git a/flinkx-launcher/.gitignore b/flinkx-launcher/.gitignore
new file mode 100644
index 0000000000..135190f227
--- /dev/null
+++ b/flinkx-launcher/.gitignore
@@ -0,0 +1,2 @@
+# Created by .ignore support plugin (hsz.mobi)
+dependency-reduced-pom.xml
\ No newline at end of file
diff --git a/flinkx-launcher/dependency-reduced-pom.xml b/flinkx-launcher/dependency-reduced-pom.xml
deleted file mode 100644
index bb81d28b3b..0000000000
--- a/flinkx-launcher/dependency-reduced-pom.xml
+++ /dev/null
@@ -1,89 +0,0 @@
-
-
-
- flinkx-all
- com.dtstack.flinkx
- 1.6
-
- 4.0.0
- flinkx-launcher
-
-
-
- maven-shade-plugin
- 3.0.0
-
-
- package
-
-