Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2821,21 +2821,23 @@ public class Config extends ConfigBase {
})
public static int autobucket_max_buckets = 128;

@ConfField(description = {"Arrow Flight Server中所有用户token的缓存上限,超过后LRU淘汰,默认值为512, "
+ "并强制限制小于 qe_max_connection/2, 避免`Reach limit of connections`, "
+ "因为arrow flight sql是无状态的协议,连接通常不会主动断开,"
@ConfField(description = {"单个 FE 的 Arrow Flight Server 的最大连接数。",
"Maximal number of connections of Arrow Flight Server per FE."})
public static int arrow_flight_max_connections = 4096;

@ConfField(description = {"(已弃用,被 arrow_flight_max_connection 替代) Arrow Flight Server中所有用户token的缓存上限,"
+ "超过后LRU淘汰, arrow flight sql是无状态的协议,连接通常不会主动断开,"
+ "bearer token 从 cache 淘汰的同时会 unregister Connection.",
"The cache limit of all user tokens in Arrow Flight Server. which will be eliminated by"
+ "LRU rules after exceeding the limit, the default value is 512, the mandatory limit is "
+ "less than qe_max_connection/2 to avoid `Reach limit of connections`, "
+ "because arrow flight sql is a stateless protocol, the connection is usually not actively "
+ "disconnected, bearer token is evict from the cache will unregister ConnectContext."})
public static int arrow_flight_token_cache_size = 512;

@ConfField(description = {"Arrow Flight Server中用户token的存活时间,自上次写入后过期时间,单位分钟,默认值为4320,即3天",
"The alive time of the user token in Arrow Flight Server, expire after write, unit minutes,"
+ "the default value is 4320, which is 3 days"})
public static int arrow_flight_token_alive_time = 4320;
"(Deprecated, replaced by arrow_flight_max_connection) The cache limit of all user tokens in "
+ "Arrow Flight Server. which will be eliminated by LRU rules after exceeding the limit, "
+ "arrow flight sql is a stateless protocol, the connection is usually not actively disconnected, "
+ "bearer token is evict from the cache will unregister ConnectContext."})
public static int arrow_flight_token_cache_size = 4096;

@ConfField(description = {"Arrow Flight Server中用户token的存活时间,自上次写入后过期时间,单位秒,默认值为86400,即1天",
"The alive time of the user token in Arrow Flight Server, expire after write, unit second,"
+ "the default value is 86400, which is 1 days"})
public static int arrow_flight_token_alive_time_second = 86400;

@ConfField(mutable = true, description = {
"Doris 为了兼用 mysql 周边工具生态,会内置一个名为 mysql 的数据库,如果该数据库与用户自建数据库冲突,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,16 @@ public void handleConnection(ConnectContext context, StreamConnection connection
if (!MysqlProto.negotiate(context)) {
throw new AfterConnectedException("mysql negotiate failed");
}
int res = connectScheduler.registerConnection(context);
int res = connectScheduler.getConnectPoolMgr().registerConnection(context);
if (res == -1) {
MysqlProto.sendResponsePacket(context);
connection.setCloseListener(
streamConnection -> connectScheduler.unregisterConnection(context));
streamConnection -> connectScheduler.getConnectPoolMgr().unregisterConnection(context));
} else {
long userConnLimit = context.getEnv().getAuth().getMaxConn(context.getQualifiedUser());
String errMsg = String.format(
"Reach limit of connections. Total: %d, User: %d, Current: %d",
connectScheduler.getMaxConnections(), userConnLimit, res);
connectScheduler.getConnectPoolMgr().getMaxConnections(), userConnLimit, res);
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, errMsg);
MysqlProto.sendResponsePacket(context);
throw new AfterConnectedException(errMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ public void setQueryId(TUniqueId queryId) {
}
this.queryId = queryId;
if (connectScheduler != null && !Strings.isNullOrEmpty(traceId)) {
connectScheduler.putTraceId2QueryId(traceId, queryId);
connectScheduler.getConnectPoolMgr().putTraceId2QueryId(traceId, queryId);
}
}

Expand Down
169 changes: 169 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext.ThreadInfo;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectPoolMgr {
private static final Logger LOG = LogManager.getLogger(ConnectPoolMgr.class);
protected final int maxConnections;
protected final AtomicInteger numberConnection;
protected final Map<Integer, ConnectContext> connectionMap = Maps.newConcurrentMap();
protected final Map<String, AtomicInteger> connByUser = Maps.newConcurrentMap();

// valid trace id -> query id
protected final Map<String, TUniqueId> traceId2QueryId = Maps.newConcurrentMap();

public ConnectPoolMgr(int maxConnections) {
this.maxConnections = maxConnections;
numberConnection = new AtomicInteger(0);
}

public void timeoutChecker(long now) {
for (ConnectContext connectContext : connectionMap.values()) {
connectContext.checkTimeout(now);
}
}

// Register one connection with its connection id.
// Return -1 means register OK
// Return >=0 means register failed, and return value is current connection num.
public int registerConnection(ConnectContext ctx) {
if (numberConnection.incrementAndGet() > maxConnections) {
numberConnection.decrementAndGet();
return numberConnection.get();
}
// Check user
connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0));
AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
if (conns.incrementAndGet() > ctx.getEnv().getAuth().getMaxConn(ctx.getQualifiedUser())) {
conns.decrementAndGet();
numberConnection.decrementAndGet();
return numberConnection.get();
}
connectionMap.put(ctx.getConnectionId(), ctx);
return -1;
}

public void unregisterConnection(ConnectContext ctx) {
ctx.closeTxn();
if (connectionMap.remove(ctx.getConnectionId()) != null) {
AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
if (conns != null) {
conns.decrementAndGet();
}
numberConnection.decrementAndGet();
}
}

public ConnectContext getContext(int connectionId) {
return connectionMap.get(connectionId);
}

public ConnectContext getContextWithQueryId(String queryId) {
for (ConnectContext context : connectionMap.values()) {
if (queryId.equals(DebugUtil.printId(context.queryId))) {
return context;
}
}
return null;
}

public boolean cancelQuery(String queryId, Status cancelReason) {
for (ConnectContext ctx : connectionMap.values()) {
TUniqueId qid = ctx.queryId();
if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
ctx.cancelQuery(cancelReason);
return true;
}
}
return false;
}

public int getConnectionNum() {
return numberConnection.get();
}

public List<ThreadInfo> listConnection(String user, boolean isFull) {
List<ConnectContext.ThreadInfo> infos = Lists.newArrayList();
for (ConnectContext ctx : connectionMap.values()) {
// Check auth
if (!ctx.getQualifiedUser().equals(user) && !Env.getCurrentEnv().getAccessManager()
.checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
continue;
}

infos.add(ctx.toThreadInfo(isFull));
}
return infos;
}

// used for thrift
public List<List<String>> listConnectionForRpc(UserIdentity userIdentity, boolean isShowFullSql,
Optional<String> timeZone) {
List<List<String>> list = new ArrayList<>();
long nowMs = System.currentTimeMillis();
for (ConnectContext ctx : connectionMap.values()) {
// Check auth
if (!ctx.getCurrentUserIdentity().equals(userIdentity) && !Env.getCurrentEnv().getAccessManager()
.checkGlobalPriv(userIdentity, PrivPredicate.GRANT)) {
continue;
}
list.add(ctx.toThreadInfo(isShowFullSql).toRow(-1, nowMs, timeZone));
}
return list;
}

public void putTraceId2QueryId(String traceId, TUniqueId queryId) {
traceId2QueryId.put(traceId, queryId);
}

public String getQueryIdByTraceId(String traceId) {
TUniqueId queryId = traceId2QueryId.get(traceId);
return queryId == null ? "" : DebugUtil.printId(queryId);
}

public Map<Integer, ConnectContext> getConnectionMap() {
return connectionMap;
}

public Map<String, AtomicInteger> getUserConnectionMap() {
return connByUser;
}

public int getMaxConnections() {
return maxConnections;
}
}
Loading
Loading