diff --git a/docker/docker-compose-flinksql.yml b/docker/docker-compose-flinksql.yml index 49719c76..260955d4 100644 --- a/docker/docker-compose-flinksql.yml +++ b/docker/docker-compose-flinksql.yml @@ -1,61 +1,50 @@ services: broker: - image: confluentinc/cp-kafka:7.4.1 + image: confluentinc/cp-kafka:8.2.0 hostname: broker container_name: broker - ports: - - 29092:29092 environment: - KAFKA_BROKER_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_NODE_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092' + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_PROCESS_ROLES: broker,controller - KAFKA_NODE_ID: 1 - KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093 - KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_LOG_DIRS: /tmp/kraft-combined-logs - CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk + KAFKA_JMX_PORT: 9101 + KAFKA_JMX_HOSTNAME: 'localhost' + KAFKA_PROCESS_ROLES: 'broker,controller' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' + KAFKA_LISTENERS: 'PLAINTEXT://broker:9092,CONTROLLER://broker:29093' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" + # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh + CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' + schema-registry: - image: 
confluentinc/cp-schema-registry:7.3.0 + image: confluentinc/cp-schema-registry:8.2.0 hostname: schema-registry container_name: schema-registry depends_on: - broker - ports: - - 8081:8081 environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092 - flink-sql-client: - image: cnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17 - hostname: flink-sql-client - container_name: flink-sql-client - depends_on: - - flink-jobmanager - environment: - FLINK_JOBMANAGER_HOST: flink-jobmanager - volumes: - - ./settings/:/settings + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + flink-jobmanager: - image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17 + image: cnfldemos/flink-kafka:2.2.0-scala_2.12-java17 hostname: flink-jobmanager container_name: flink-jobmanager ports: - - 9081:9081 + - 8081:8081 command: jobmanager - environment: - - | - FLINK_PROPERTIES= - jobmanager.rpc.address: flink-jobmanager - rest.bind-port: 9081 + flink-taskmanager: - image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17 + image: cnfldemos/flink-kafka:2.2.0-scala_2.12-java17 hostname: flink-taskmanager container_name: flink-taskmanager depends_on: @@ -67,3 +56,18 @@ services: FLINK_PROPERTIES= jobmanager.rpc.address: flink-jobmanager taskmanager.numberOfTaskSlots: 10 + + flink-sql-client: + image: cnfldemos/flink-sql-client-kafka:2.2.0-scala_2.12-java17 + hostname: flink-sql-client + container_name: flink-sql-client + depends_on: + - flink-jobmanager + - flink-taskmanager + - broker + - schema-registry + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: flink-jobmanager + taskmanager.numberOfTaskSlots: 10 diff --git a/docker/flink/Dockerfile.flink-kafka b/docker/flink/Dockerfile.flink-kafka index a1dd0815..e8cd52e4 100644 --- a/docker/flink/Dockerfile.flink-kafka +++ b/docker/flink/Dockerfile.flink-kafka @@ -1,11 +1,10 @@ -FROM flink:1.19.1-scala_2.12-java11 +FROM 
flink:2.2.0-scala_2.12-java17 LABEL io.confluent.docker=true -# Download connector libraries -RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar; \ - wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.19.1/flink-json-1.19.1.jar; \ - wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.19.1/flink-sql-avro-confluent-registry-1.19.1.jar; +RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/4.0.1-2.0/flink-sql-connector-kafka-4.0.1-2.0.jar; \ + wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/2.2.0/flink-json-2.2.0.jar; \ + wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/2.2.0/flink-sql-avro-confluent-registry-2.2.0.jar; # Copy configuration -COPY ./conf/* /opt/flink/conf/ +COPY ./conf/* /opt/flink/conf/ diff --git a/docker/flink/Dockerfile.flink-sql-client-kafka b/docker/flink/Dockerfile.flink-sql-client-kafka index e284b6bf..b499b204 100644 --- a/docker/flink/Dockerfile.flink-sql-client-kafka +++ b/docker/flink/Dockerfile.flink-sql-client-kafka @@ -1,23 +1,13 @@ -FROM flink:1.19.1-scala_2.12-java11 +FROM flink:2.2.0-scala_2.12-java17 LABEL io.confluent.docker=true -# Create CLI lib folder -RUN mkdir -p /opt/sql-client/lib - -# Download connector libraries -RUN wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar; \ - wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.19.1/flink-json-1.19.1.jar; \ - wget -P /opt/sql-client/lib/ 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.19.1/flink-sql-avro-confluent-registry-1.19.1.jar; - -# Also copy to Flink lib so that, e.g., other catalog types can be used if desired -RUN cp /opt/sql-client/lib/* /opt/flink/lib/ +RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/4.0.1-2.0/flink-sql-connector-kafka-4.0.1-2.0.jar; \ + wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/2.2.0/flink-json-2.2.0.jar; \ + wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/2.2.0/flink-sql-avro-confluent-registry-2.2.0.jar; # Copy configuration COPY ./conf/* /opt/flink/conf/ -WORKDIR /opt/sql-client -ENV SQL_CLIENT_HOME=/opt/sql-client - COPY ./docker-entrypoint.sh / ENTRYPOINT ["/docker-entrypoint.sh"] diff --git a/docker/flink/Makefile b/docker/flink/Makefile index 2a947a57..b14e4bef 100644 --- a/docker/flink/Makefile +++ b/docker/flink/Makefile @@ -1,4 +1,4 @@ -FLINK_VERSION ?= 1.19.1-scala_2.12-java17 +FLINK_VERSION ?= 2.2.0-scala_2.12-java17 BUILD_PLATFORM ?= linux/amd64 PUSH_PREFIX ?= cnfldemos diff --git a/docker/flink/conf/config.yaml b/docker/flink/conf/config.yaml new file mode 100644 index 00000000..1cc2b3ec --- /dev/null +++ b/docker/flink/conf/config.yaml @@ -0,0 +1,30 @@ +blob: + server: + port: '6124' +taskmanager: + memory: + process: + size: 1728m + bind-host: 0.0.0.0 + numberOfTaskSlots: '10' +jobmanager: + execution: + failover-strategy: region + rpc: + address: flink-jobmanager + port: 6123 + memory: + process: + size: 1600m + bind-host: 0.0.0.0 +query: + server: + port: '6125' +parallelism: + default: 1 +rest: + address: flink-jobmanager +env: + java: + opts: + all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED \ No newline at end of file diff --git a/docker/flink/conf/flink-conf.yaml b/docker/flink/conf/flink-conf.yaml deleted file mode 100644 index 8a6c8b0a..00000000 --- a/docker/flink/conf/flink-conf.yaml +++ /dev/null @@ -1,312 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - - -#============================================================================== -# Common -#============================================================================== - -# The external address of the host on which the JobManager runs and can be -# reached by the TaskManagers and any clients which want to connect. This setting -# is only used in Standalone mode and may be overwritten on the JobManager side -# by specifying the --host parameter of the bin/jobmanager.sh executable. -# In high availability mode, if you use the bin/start-cluster.sh script and setup -# the conf/masters file, this will be taken care of automatically. Yarn -# automatically configure the host name based on the hostname of the node where the -# JobManager runs. - -jobmanager.rpc.address: flink-taskmanager - -# The RPC port where the JobManager is reachable. - -jobmanager.rpc.port: 6123 - -# The host interface the JobManager will bind to. By default, this is localhost, and will prevent -# the JobManager from communicating outside the machine/container it is running on. -# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0. -# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0. -# -# To enable this, set the bind-host address to one that has access to an outside facing network -# interface, such as 0.0.0.0. - -jobmanager.bind-host: 0.0.0.0 - - -# The total process memory size for the JobManager. -# -# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead. - -jobmanager.memory.process.size: 1600m - -# The host interface the TaskManager will bind to. By default, this is localhost, and will prevent -# the TaskManager from communicating outside the machine/container it is running on. 
-# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0. -# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0. -# -# To enable this, set the bind-host address to one that has access to an outside facing network -# interface, such as 0.0.0.0. - -taskmanager.bind-host: 0.0.0.0 - -# The address of the host on which the TaskManager runs and can be reached by the JobManager and -# other TaskManagers. If not specified, the TaskManager will try different strategies to identify -# the address. -# -# Note this address needs to be reachable by the JobManager and forward traffic to one of -# the interfaces the TaskManager is bound to (see 'taskmanager.bind-host'). -# -# Note also that unless all TaskManagers are running on the same machine, this address needs to be -# configured separately for each TaskManager. - - -# The total process memory size for the TaskManager. -# -# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead. - -taskmanager.memory.process.size: 1728m - -# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'. -# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory. -# -# taskmanager.memory.flink.size: 1280m - -# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. - -taskmanager.numberOfTaskSlots: 1 - -# The parallelism used for programs that did not specify and other parallelism. - -parallelism.default: 1 - -# The default file system scheme and authority. -# -# By default file paths without scheme are interpreted relative to the local -# root file system 'file:///'. 
Use this to override the default and interpret -# relative paths relative to a different file system, -# for example 'hdfs://mynamenode:12345' -# -# fs.default-scheme - -#============================================================================== -# High Availability -#============================================================================== - -# The high-availability mode. Possible options are 'NONE' or 'zookeeper'. -# -# high-availability: zookeeper - -# The path where metadata for master recovery is persisted. While ZooKeeper stores -# the small ground truth for checkpoint and leader election, this location stores -# the larger objects, like persisted dataflow graphs. -# -# Must be a durable file system that is accessible from all nodes -# (like HDFS, S3, Ceph, nfs, ...) -# -# high-availability.storageDir: hdfs:///flink/ha/ - -# The list of ZooKeeper quorum peers that coordinate the high-availability -# setup. This must be a list of the form: -# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181) -# -# high-availability.zookeeper.quorum: localhost:2181 - - -# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes -# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE) -# The default value is "open" and it can be changed to "creator" if ZK security is enabled -# -# high-availability.zookeeper.client.acl: open - -#============================================================================== -# Fault tolerance and checkpointing -#============================================================================== - -# The backend that will be used to store operator state checkpoints if -# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0. -# -# Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details. 
-# -# execution.checkpointing.interval: 3min -# execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION] -# execution.checkpointing.max-concurrent-checkpoints: 1 -# execution.checkpointing.min-pause: 0 -# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE] -# execution.checkpointing.timeout: 10min -# execution.checkpointing.tolerable-failed-checkpoints: 0 -# execution.checkpointing.unaligned: false -# -# Supported backends are 'hashmap', 'rocksdb', or the -# . -# -# state.backend: hashmap - -# Directory for checkpoints filesystem, when using any of the default bundled -# state backends. -# -# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints - -# Default target directory for savepoints, optional. -# -# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints - -# Flag to enable/disable incremental checkpoints for backends that -# support incremental checkpoints (like the RocksDB state backend). -# -# state.backend.incremental: false - -# The failover strategy, i.e., how the job computation recovers from task failures. -# Only restart tasks that may have been affected by the task failure, which typically includes -# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption. - -jobmanager.execution.failover-strategy: region - -#============================================================================== -# Rest & web frontend -#============================================================================== - -# The port to which the REST client connects to. If rest.bind-port has -# not been specified, then the server will bind to this port as well. -# -rest.port: 9081 - -# The address to which the REST client will connect to -# -rest.address: flink-jobmanager - -# Port range for the REST and web server to bind to. 
-# -#rest.bind-port: 8080-8090 - -# The address that the REST & web server binds to -# By default, this is localhost, which prevents the REST & web server from -# being able to communicate outside of the machine/container it is running on. -# -# To enable this, set the bind address to one that has access to outside-facing -# network interface, such as 0.0.0.0. -# -rest.bind-address: 0.0.0.0 - -# Flag to specify whether job submission is enabled from the web-based -# runtime monitor. Uncomment to disable. - -#web.submit.enable: false - -# Flag to specify whether job cancellation is enabled from the web-based -# runtime monitor. Uncomment to disable. - -#web.cancel.enable: false - -#============================================================================== -# Advanced -#============================================================================== - -# Override the directories for temporary files. If not specified, the -# system-specific Java temporary directory (java.io.tmpdir property) is taken. -# -# For framework setups on Yarn, Flink will automatically pick up the -# containers' temp directories without any need for configuration. -# -# Add a delimited list for multiple directories, using the system directory -# delimiter (colon ':' on unix) or a comma, e.g.: -# /data1/tmp:/data2/tmp:/data3/tmp -# -# Note: Each directory entry is read from and written to by a different I/O -# thread. You can include the same directory multiple times in order to create -# multiple I/O threads against that directory. This is for example relevant for -# high-throughput RAIDs. -# -# io.tmp.dirs: /tmp - -# The classloading resolve order. Possible values are 'child-first' (Flink's default) -# and 'parent-first' (Java's default). -# -# Child first classloading allows users to use different dependency/library -# versions in their application than those in the classpath. Switching back -# to 'parent-first' may help with debugging dependency issues. 
-# -# classloader.resolve-order: child-first - -# The amount of memory going to the network stack. These numbers usually need -# no tuning. Adjusting them may be necessary in case of an "Insufficient number -# of network buffers" error. The default min is 64MB, the default max is 1GB. -# -# taskmanager.memory.network.fraction: 0.1 -# taskmanager.memory.network.min: 64mb -# taskmanager.memory.network.max: 1gb - -#============================================================================== -# Flink Cluster Security Configuration -#============================================================================== - -# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors - -# may be enabled in four steps: -# 1. configure the local krb5.conf file -# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit) -# 3. make the credentials available to various JAAS login contexts -# 4. configure the connector to use JAAS/SASL - -# The below configure how Kerberos credentials are provided. A keytab will be used instead of -# a ticket cache if the keytab path and principal are set. 
- -# security.kerberos.login.use-ticket-cache: true -# security.kerberos.login.keytab: /path/to/kerberos/keytab -# security.kerberos.login.principal: flink-user - -# The configuration below defines which JAAS login contexts - -# security.kerberos.login.contexts: Client,KafkaClient - -#============================================================================== -# ZK Security Configuration -#============================================================================== - -# Below configurations are applicable if ZK ensemble is configured for security - -# Override below configuration to provide custom ZK service name if configured -# zookeeper.sasl.service-name: zookeeper - -# The configuration below must match one of the values set in "security.kerberos.login.contexts" -# zookeeper.sasl.login-context-name: Client - -#============================================================================== -# HistoryServer -#============================================================================== - -# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop) - -# Directory to upload completed jobs to. Add this directory to the list of -# monitored directories of the HistoryServer as well (see below). -#jobmanager.archive.fs.dir: hdfs:///completed-jobs/ - -# The address under which the web-based HistoryServer listens. -#historyserver.web.address: 0.0.0.0 - -# The port under which the web-based HistoryServer listens. -#historyserver.web.port: 8082 - -# Comma separated list of directories to monitor for completed jobs. -#historyserver.archive.fs.dir: hdfs:///completed-jobs/ - -# Interval in milliseconds for refreshing the monitored directories. -#historyserver.archive.fs.refresh-interval: 10000 - -blob.server.port: 6124 -query.server.port: 6125 - -jobmanager.rpc.address: flink-jobmanager -taskmanager.numberOfTaskSlots: 10 \ No newline at end of file